diff --git a/server/WebApi.cpp b/server/WebApi.cpp index 17639b7c..c010bc40 100644 --- a/server/WebApi.cpp +++ b/server/WebApi.cpp @@ -343,7 +343,7 @@ void installWebApi() { }); //主动关断流,包括关断拉流、推流 - //测试url http://127.0.0.1/index/api/close_stream?schema=rtsp&vhost=__defaultVhost__&app=live&stream=obs + //测试url http://127.0.0.1/index/api/close_stream?schema=rtsp&vhost=__defaultVhost__&app=live&stream=obs&force=1 API_REGIST(api,close_stream,{ CHECK_SECRET(); CHECK_ARGS("schema","vhost","app","stream"); @@ -353,7 +353,7 @@ void installWebApi() { allArgs["app"], allArgs["stream"]); if(src){ - bool flag = src->close(); + bool flag = src->close(allArgs["force"].as()); val["code"] = flag ? 0 : -1; val["msg"] = flag ? "success" : "close failed"; }else{ @@ -511,6 +511,13 @@ void installWebApi() { //shell登录调试事件 throw SuccessException(); }); + + API_REGIST(hook,on_stream_none_reader,{ + //无人观看流默认关闭 + val["close"] = true; + }); + + } void unInstallWebApi(){ diff --git a/server/WebHook.cpp b/server/WebHook.cpp index 63565b14..59a271e9 100644 --- a/server/WebHook.cpp +++ b/server/WebHook.cpp @@ -42,6 +42,7 @@ const char kOnStreamChanged[] = HOOK_FIELD"on_stream_changed"; const char kOnStreamNotFound[] = HOOK_FIELD"on_stream_not_found"; const char kOnRecordMp4[] = HOOK_FIELD"on_record_mp4"; const char kOnShellLogin[] = HOOK_FIELD"on_shell_login"; +const char kOnStreamNoneReader[] = HOOK_FIELD"on_stream_none_reader"; const char kAdminParams[] = HOOK_FIELD"admin_params"; onceToken token([](){ @@ -56,6 +57,7 @@ onceToken token([](){ mINI::Instance()[kOnStreamNotFound] = "https://127.0.0.1/index/hook/on_stream_not_found"; mINI::Instance()[kOnRecordMp4] = "https://127.0.0.1/index/hook/on_record_mp4"; mINI::Instance()[kOnShellLogin] = "https://127.0.0.1/index/hook/on_shell_login"; + mINI::Instance()[kOnStreamNoneReader] = "https://127.0.0.1/index/hook/on_stream_none_reader"; mINI::Instance()[kAdminParams] = "secret=035c73f7-bb6b-4889-a715-d9eb2d1925cc"; },nullptr); }//namespace Hook @@ -159,7 +161,7 @@ void installWebHook(){ GET_CONFIG_AND_REGISTER(string,hook_stream_not_found,Hook::kOnStreamNotFound); GET_CONFIG_AND_REGISTER(string,hook_record_mp4,Hook::kOnRecordMp4); GET_CONFIG_AND_REGISTER(string,hook_shell_login,Hook::kOnShellLogin); - + GET_CONFIG_AND_REGISTER(string,hook_stream_none_reader,Hook::kOnStreamNoneReader); NoticeCenter::Instance().addListener(nullptr,Broadcast::kBroadcastMediaPublish,[](BroadcastMediaPublishArgs){ if(!hook_enable || args._param_strs == hook_adminparams || hook_publish.empty()){ @@ -325,6 +327,29 @@ void installWebHook(){ }); }); + NoticeCenter::Instance().addListener(nullptr,Broadcast::kBroadcastStreamNoneReader,[](BroadcastStreamNoneReaderArgs){ + if(!hook_enable || hook_stream_none_reader.empty()){ + return; + } + + ArgsType body; + body["schema"] = sender.getSchema(); + body["vhost"] = sender.getVhost(); + body["app"] = sender.getApp(); + body["stream"] = sender.getId(); + weak_ptr weakSrc = sender.shared_from_this(); + //执行hook + do_http_hook(hook_stream_none_reader,body, [weakSrc](const Value &obj,const string &err){ + bool flag = obj["close"].asBool(); + auto strongSrc = weakSrc.lock(); + if(!flag || !err.empty() || !strongSrc){ + return; + } + strongSrc->close(false); + }); + + }); + } void unInstallWebHook(){ diff --git a/src/Common/MediaSource.h b/src/Common/MediaSource.h index 3c6a0e38..bc49b9cf 100644 --- a/src/Common/MediaSource.h +++ b/src/Common/MediaSource.h @@ -44,8 +44,8 @@ using namespace toolkit; namespace mediakit { -class MediaSourceEvent -{ +class MediaSource; +class MediaSourceEvent{ public: MediaSourceEvent(){}; virtual ~MediaSourceEvent(){}; @@ -55,15 +55,18 @@ public: return false; } - virtual bool close() { + virtual bool close(bool force) { //通知其停止推流 return false; } - virtual void onReaderChanged(const EventPoller::Ptr &poller,int size,bool add_flag){} + virtual void onNoneReader(MediaSource &sender){ + //没有任何读取器消费该源,表明该源可以关闭了 + NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastStreamNoneReader,sender); + } }; -class MediaInfo -{ + +class MediaInfo{ public: MediaInfo(){} MediaInfo(const string &url){ @@ -85,7 +88,6 @@ public: string _streamid; StrCaseMap _params; string _param_strs; - }; @@ -144,12 +146,12 @@ public: virtual uint32_t getTimeStamp(TrackType trackType) = 0; - bool close() { + bool close(bool force) { auto listener = _listener.lock(); if(!listener){ return false; } - return listener->close(); + return listener->close(force); } virtual void setListener(const std::weak_ptr &listener){ _listener = listener; diff --git a/src/Common/config.cpp b/src/Common/config.cpp index 6562d399..d8cc3b47 100644 --- a/src/Common/config.cpp +++ b/src/Common/config.cpp @@ -65,11 +65,15 @@ const char kBroadcastFlowReport[] = "kBroadcastFlowReport"; const char kBroadcastReloadConfig[] = "kBroadcastReloadConfig"; const char kBroadcastShellLogin[] = "kBroadcastShellLogin"; const char kBroadcastNotFoundStream[] = "kBroadcastNotFoundStream"; +const char kBroadcastStreamNoneReader[] = "kBroadcastStreamNoneReader"; const char kFlowThreshold[] = "broadcast.flowThreshold"; +const char kStreamNoneReaderDelayMS[] = "broadcast.streamNoneReaderDelayMS"; + onceToken token([](){ mINI::Instance()[kFlowThreshold] = 1024; + mINI::Instance()[kStreamNoneReaderDelayMS] = 5 * 1000; },nullptr); } //namespace Broadcast diff --git a/src/Common/config.h b/src/Common/config.h index 034fba05..24c98208 100644 --- a/src/Common/config.h +++ b/src/Common/config.h @@ -116,9 +116,18 @@ extern const char kBroadcastFlowReport[]; extern const char kBroadcastNotFoundStream[]; #define BroadcastNotFoundStreamArgs const MediaInfo &args,TcpSession &sender +//某个流无人消费时触发,目的为了实现无人观看时主动断开拉流等业务逻辑 +extern const char kBroadcastStreamNoneReader[]; +#define BroadcastStreamNoneReaderArgs MediaSource &sender + //流量汇报事件流量阈值,单位KB,默认1MB extern const char kFlowThreshold[]; +//流无人观看并且超过若干时间后才触发kBroadcastStreamNoneReader事件 +//默认连续5秒无人观看然后触发kBroadcastStreamNoneReader事件 +extern const char kStreamNoneReaderDelayMS[]; + + //更新配置文件事件广播,执行loadIniConfig函数加载配置文件成功后会触发该广播 extern const char kBroadcastReloadConfig[]; #define BroadcastReloadConfigArgs void diff --git a/src/MediaFile/MediaReader.cpp b/src/MediaFile/MediaReader.cpp index 07c421b3..e6e30514 100644 --- a/src/MediaFile/MediaReader.cpp +++ b/src/MediaFile/MediaReader.cpp @@ -166,11 +166,21 @@ void MediaReader::startReadMP4() { seek(ui32Stamp); return true; } -bool MediaReader::close(){ +bool MediaReader::close(bool force){ + if(!force && _mediaMuxer->readerCount() != 0 ){ + return false; + } _timer.reset(); return true; } +void MediaReader::onNoneReader(MediaSource &sender) { + if(_mediaMuxer->readerCount() != 0){ + return; + } + MediaSourceEvent::onNoneReader(sender); +} + bool MediaReader::readSample(int iTimeInc,bool justSeekSyncFrame) { TimeTicker(); lock_guard lck(_mtx); diff --git a/src/MediaFile/MediaReader.h b/src/MediaFile/MediaReader.h index 58113131..7399844d 100644 --- a/src/MediaFile/MediaReader.h +++ b/src/MediaFile/MediaReader.h @@ -68,7 +68,7 @@ public: * 关闭MediaReader的流化进程,会触发该对象放弃自持有 * @return */ - bool close() override; + bool close(bool force) override; /** * 自动生成MediaReader对象然后查找相关的MediaSource对象 @@ -86,9 +86,11 @@ public: const string &strId, const string &filePath = "", bool checkApp = true); -#ifdef ENABLE_MP4V2 + private: - void seek(uint32_t iSeekTime,bool bReStart = true); + void onNoneReader(MediaSource &sender) override; +#ifdef ENABLE_MP4V2 + void seek(uint32_t iSeekTime,bool bReStart = true); inline void setSeekTime(uint32_t iSeekTime); inline uint32_t getVideoCurrentTime(); inline MP4SampleId getVideoSampleId(int iTimeInc = 0); diff --git a/src/Player/PlayerProxy.cpp b/src/Player/PlayerProxy.cpp index 4013620c..38e306d4 100644 --- a/src/Player/PlayerProxy.cpp +++ b/src/Player/PlayerProxy.cpp @@ -145,7 +145,11 @@ void PlayerProxy::rePlay(const string &strUrl,int iFailedCnt){ return false; }, getPoller()); } -bool PlayerProxy::close() { +bool PlayerProxy::close(bool force) { + if(!force && _mediaMuxer->readerCount() != 0){ + return false; + } + //通知其停止推流 weak_ptr weakSlef = dynamic_pointer_cast(shared_from_this()); getPoller()->async_first([weakSlef]() { @@ -161,6 +165,12 @@ bool PlayerProxy::close() { return true; } +void PlayerProxy::onNoneReader(MediaSource &sender) { + if(_mediaMuxer->readerCount() != 0){ + return; + } + MediaSourceEvent::onNoneReader(sender); +} class MuteAudioMaker : public FrameRingInterfaceDelegate{ public: diff --git a/src/Player/PlayerProxy.h b/src/Player/PlayerProxy.h index 2ca79380..8cad05ed 100644 --- a/src/Player/PlayerProxy.h +++ b/src/Player/PlayerProxy.h @@ -79,8 +79,9 @@ public: * 被主动关闭 * @return */ - bool close() override; + bool close(bool force) override; private: + void onNoneReader(MediaSource &sender) override; void rePlay(const string &strUrl,int iFailedCnt); void onPlaySuccess(); private: diff --git a/src/Rtmp/RtmpMediaSource.h b/src/Rtmp/RtmpMediaSource.h index b5818791..ea868703 100644 --- a/src/Rtmp/RtmpMediaSource.h +++ b/src/Rtmp/RtmpMediaSource.h @@ -53,10 +53,13 @@ public: typedef std::shared_ptr Ptr; typedef RingBuffer RingType; - RtmpMediaSource(const string &vhost,const string &strApp, const string &strId,int ringSize = 0) : + RtmpMediaSource(const string &vhost, + const string &strApp, + const string &strId, + int ringSize = 0) : MediaSource(RTMP_SCHEMA,vhost,strApp,strId), - _pRing(new RingBuffer(ringSize)) { - } + _ringSize(ringSize) {} + virtual ~RtmpMediaSource() {} const RingType::Ptr &getRing() const { @@ -65,7 +68,7 @@ public: } int readerCount() override { - return _pRing->readerCount(); + return _pRing ? _pRing->readerCount() : 0; } const AMFValue &getMetaData() const { @@ -89,15 +92,26 @@ public: lock_guard lock(_mtxMap); if (pkt->isCfgFrame()) { _mapCfgFrame[pkt->typeId] = pkt; - } else{ - if(!_bRegisted){ - regist(); - _bRegisted = true; - } - _mapStamp[pkt->typeId] = pkt->timeStamp; - _pRing->write(pkt,pkt->isVideoKeyFrame()); + return; } - } + + _mapStamp[pkt->typeId] = pkt->timeStamp; + + if(!_pRing){ + weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); + _pRing = std::make_shared(_ringSize,[weakSelf](const EventPoller::Ptr &,int size,bool){ + auto strongSelf = weakSelf.lock(); + if(!strongSelf){ + return; + } + strongSelf->onReaderChanged(size); + }); + onReaderChanged(0); + regist(); + } + _pRing->write(pkt,pkt->isVideoKeyFrame()); + checkNoneReader(); + } uint32_t getTimeStamp(TrackType trackType) override { lock_guard lock(_mtxMap); @@ -110,13 +124,38 @@ public: return MAX(_mapStamp[MSG_VIDEO],_mapStamp[MSG_AUDIO]); } } + +private: + void onReaderChanged(int size){ + if(size != 0 || readerCount() != 0){ + //还有消费者正在观看该流,我们记录最后一次活动时间 + _readerTicker.resetTime(); + _asyncEmitNoneReader = false; + return; + } + _asyncEmitNoneReader = true; + } + + void checkNoneReader(){ + GET_CONFIG_AND_REGISTER(int,stream_none_reader_delay,Broadcast::kStreamNoneReaderDelayMS); + if(_asyncEmitNoneReader && _readerTicker.elapsedTime() > stream_none_reader_delay){ + _asyncEmitNoneReader = false; + auto listener = _listener.lock(); + if(!listener){ + return; + } + listener->onNoneReader(*this); + } + } protected: AMFValue _metadata; unordered_map _mapCfgFrame; unordered_map _mapStamp; mutable recursive_mutex _mtxMap; RingBuffer::Ptr _pRing; //rtp环形缓冲 - bool _bRegisted = false; + int _ringSize; + Ticker _readerTicker; + bool _asyncEmitNoneReader = false; }; } /* namespace mediakit */ diff --git a/src/Rtmp/RtmpSession.cpp b/src/Rtmp/RtmpSession.cpp index 95d6ab4b..a5a2bdd5 100644 --- a/src/Rtmp/RtmpSession.cpp +++ b/src/Rtmp/RtmpSession.cpp @@ -360,7 +360,7 @@ void RtmpSession::sendPlayResponse(const string &err,const RtmpMediaSource::Ptr strongSelf->shutdown(); }); _pPlayerSrc = src; - if (src->getRing()->readerCount() == 1) { + if (src->readerCount() == 1) { src->seekTo(0); } diff --git a/src/Rtmp/RtmpSession.h b/src/Rtmp/RtmpSession.h index 2d234563..1a4b1133 100644 --- a/src/Rtmp/RtmpSession.h +++ b/src/Rtmp/RtmpSession.h @@ -81,7 +81,10 @@ private: sendResponse(MSG_CMD, invoke.data()); } - bool close() override { + bool close(bool force) override { + if(!force && _pPublisherSrc->readerCount() != 0){ + return false; + } InfoL << "kick out:" << _mediaInfo._vhost << " " << _mediaInfo._app << " " << _mediaInfo._streamid; safeShutdown(); return true; diff --git a/src/Rtmp/RtmpToRtspMediaSource.h b/src/Rtmp/RtmpToRtspMediaSource.h index c4eff001..aa258c90 100644 --- a/src/Rtmp/RtmpToRtspMediaSource.h +++ b/src/Rtmp/RtmpToRtspMediaSource.h @@ -91,7 +91,7 @@ public: } int readerCount() override { - return RtmpMediaSource::readerCount() + _rtspMuxer->readerCount(); + return RtmpMediaSource::readerCount() + (_rtspMuxer ? _rtspMuxer->readerCount() : 0); } private: RtmpDemuxer::Ptr _rtmpDemuxer; diff --git a/src/RtmpMuxer/RtmpMediaSourceMuxer.h b/src/RtmpMuxer/RtmpMediaSourceMuxer.h index 48eb8e43..c9855d7f 100644 --- a/src/RtmpMuxer/RtmpMediaSourceMuxer.h +++ b/src/RtmpMuxer/RtmpMediaSourceMuxer.h @@ -49,7 +49,7 @@ public: _mediaSouce->setListener(listener); } int readerCount() const{ - return _mediaSouce->getRing()->readerCount(); + return _mediaSouce->readerCount(); } private: void onAllTrackReady() override { diff --git a/src/Rtsp/RtspMediaSource.h b/src/Rtsp/RtspMediaSource.h index aef2e6b6..56743f85 100644 --- a/src/Rtsp/RtspMediaSource.h +++ b/src/Rtsp/RtspMediaSource.h @@ -55,10 +55,13 @@ public: typedef std::shared_ptr Ptr; typedef RingBuffer RingType; - RtspMediaSource(const string &strVhost,const string &strApp, const string &strId,int ringSize = 0) : + RtspMediaSource(const string &strVhost, + const string &strApp, + const string &strId, + int ringSize = 0) : MediaSource(RTSP_SCHEMA,strVhost,strApp,strId), - _pRing(new RingBuffer(ringSize)) { - } + _ringSize(ringSize){} + virtual ~RtspMediaSource() {} const RingType::Ptr &getRing() const { @@ -67,7 +70,7 @@ public: } int readerCount() override { - return _pRing->readerCount(); + return _pRing ? _pRing->readerCount() : 0; } const string& getSdp() const { @@ -114,7 +117,6 @@ public: //派生类设置该媒体源媒体描述信息 _strSdp = sdp; _sdpAttr.load(sdp); - regist(); } void onWrite(const RtpPacket::Ptr &rtppt, bool keyPos) override { @@ -124,12 +126,50 @@ public: track->_time_stamp = rtppt->timeStamp; track->_ssrc = rtppt->ssrc; } + if(!_pRing){ + weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); + _pRing = std::make_shared(_ringSize,[weakSelf](const EventPoller::Ptr &,int size,bool){ + auto strongSelf = weakSelf.lock(); + if(!strongSelf){ + return; + } + strongSelf->onReaderChanged(size); + }); + onReaderChanged(0); + regist(); + } _pRing->write(rtppt,keyPos); + checkNoneReader(); + } +private: + void onReaderChanged(int size){ + if(size != 0 || readerCount() != 0){ + //还有消费者正在观看该流,我们记录最后一次活动时间 + _readerTicker.resetTime(); + _asyncEmitNoneReader = false; + return; + } + _asyncEmitNoneReader = true; + } + + void checkNoneReader(){ + GET_CONFIG_AND_REGISTER(int,stream_none_reader_delay,Broadcast::kStreamNoneReaderDelayMS); + if(_asyncEmitNoneReader && _readerTicker.elapsedTime() > stream_none_reader_delay){ + _asyncEmitNoneReader = false; + auto listener = _listener.lock(); + if(!listener){ + return; + } + listener->onNoneReader(*this); + } } protected: SdpAttr _sdpAttr; string _strSdp; //媒体描述信息 RingType::Ptr _pRing; //rtp环形缓冲 + int _ringSize; + Ticker _readerTicker; + bool _asyncEmitNoneReader = false; }; } /* namespace mediakit */ diff --git a/src/Rtsp/RtspSession.cpp b/src/Rtsp/RtspSession.cpp index c0672114..3ba0d5e1 100644 --- a/src/Rtsp/RtspSession.cpp +++ b/src/Rtsp/RtspSession.cpp @@ -1161,7 +1161,10 @@ inline int RtspSession::getTrackIndexByInterleaved(int interleaved){ return -1; } -bool RtspSession::close() { +bool RtspSession::close(bool force) { + if(!force && _pushSrc->readerCount() != 0){ + return false; + } InfoL << "kick out:" << _mediaInfo._vhost << " " << _mediaInfo._app << " " << _mediaInfo._streamid; safeShutdown(); return true; diff --git a/src/Rtsp/RtspSession.h b/src/Rtsp/RtspSession.h index 137ddb22..0a0e72b5 100644 --- a/src/Rtsp/RtspSession.h +++ b/src/Rtsp/RtspSession.h @@ -105,7 +105,7 @@ protected: //RtpReceiver override void onRtpSorted(const RtpPacket::Ptr &rtppt, int trackidx) override; //MediaSourceEvent override - bool close() override ; + bool close(bool force) override ; //TcpSession override int send(const Buffer::Ptr &pkt) override; diff --git a/src/Rtsp/RtspToRtmpMediaSource.h b/src/Rtsp/RtspToRtmpMediaSource.h index f5d28770..a724e8cb 100644 --- a/src/Rtsp/RtspToRtmpMediaSource.h +++ b/src/Rtsp/RtspToRtmpMediaSource.h @@ -65,8 +65,7 @@ public: _rtmpMuxer = std::make_shared(getVhost(), getApp(), getId(), - std::make_shared( - _rtspDemuxer->getDuration())); + std::make_shared(_rtspDemuxer->getDuration())); for (auto &track : _rtspDemuxer->getTracks(false)) { _rtmpMuxer->addTrack(track); _recorder->addTrack(track); @@ -86,7 +85,7 @@ public: } } int readerCount() override { - return RtspMediaSource::readerCount() + _rtmpMuxer->readerCount(); + return RtspMediaSource::readerCount() + (_rtmpMuxer ? _rtmpMuxer->readerCount() : 0); } private: RtspDemuxer::Ptr _rtspDemuxer; diff --git a/src/RtspMuxer/RtspMediaSourceMuxer.h b/src/RtspMuxer/RtspMediaSourceMuxer.h index 5e8d607d..6d21ac8f 100644 --- a/src/RtspMuxer/RtspMediaSourceMuxer.h +++ b/src/RtspMuxer/RtspMediaSourceMuxer.h @@ -39,8 +39,9 @@ public: RtspMediaSourceMuxer(const string &vhost, const string &strApp, const string &strId, - const TitleSdp::Ptr &title = nullptr) : RtspMuxer(title){ - _mediaSouce = std::make_shared(vhost,strApp,strId); + const TitleSdp::Ptr &title = nullptr, + bool masterSrc = true) : RtspMuxer(title){ + _mediaSouce = std::make_shared(vhost,strApp,strId,0,masterSrc); getRtpRing()->setDelegate(_mediaSouce); } virtual ~RtspMediaSourceMuxer(){} @@ -49,7 +50,7 @@ public: _mediaSouce->setListener(listener); } int readerCount() const{ - return _mediaSouce->getRing()->readerCount(); + return _mediaSouce->readerCount(); } void setTimeStamp(uint32_t stamp){ _mediaSouce->setTimeStamp(stamp); diff --git a/src/Shell/ShellCMD.cpp b/src/Shell/ShellCMD.cpp index 01ad7f42..eacc6c9f 100644 --- a/src/Shell/ShellCMD.cpp +++ b/src/Shell/ShellCMD.cpp @@ -51,7 +51,7 @@ public: if(!media) { break; } - if(!media->close()) { + if(!media->close(true)) { break; } (*stream) << "\t踢出成功:" @@ -86,9 +86,12 @@ public: } }; -static onceToken s_token([]() { - REGIST_CMD(media); -}, nullptr); +void installShellCMD(){ + static onceToken s_token([]() { + REGIST_CMD(media); + }, nullptr); +} + } /* namespace mediakit */ \ No newline at end of file diff --git a/src/Shell/ShellSession.cpp b/src/Shell/ShellSession.cpp index e3a5837c..531f1d6d 100644 --- a/src/Shell/ShellSession.cpp +++ b/src/Shell/ShellSession.cpp @@ -33,7 +33,10 @@ using namespace toolkit; namespace mediakit { +extern void installShellCMD(); + ShellSession::ShellSession(const Socket::Ptr &_sock) : TcpSession(_sock) { + installShellCMD(); pleaseInputUser(); }