From 978143c86db1d393049c056576ba33ae1cf2ca6a Mon Sep 17 00:00:00 2001 From: Dw9 Date: Mon, 17 Apr 2023 12:19:24 +0800 Subject: [PATCH] =?UTF-8?q?rtp=E7=BA=A7=E8=81=94(ps/ts/es)=E6=96=B0?= =?UTF-8?q?=E5=A2=9E=E6=94=AF=E6=8C=81gop=E7=BC=93=E5=AD=98=E5=8A=9F?= =?UTF-8?q?=E8=83=BD=20(#2395)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 该修改主要解决rtp级联(调用startSendRtp接口)未做gop缓存导致上级无法秒开的问题。 同时通过RingBuffer对象线程隔离的特性,实现了在断连续推场景下归属线程切换导致的线程安全问题。 用户如未使用rtp级联功能,请修改配置文件关闭GOP缓存(rtp_proxy.gop_cache=0)以便节省内存。 --------- Co-authored-by: 夏楚 <771730766@qq.com> --- conf/config.ini | 3 + src/Common/MediaSink.cpp | 6 ++ src/Common/MediaSink.h | 6 ++ src/Common/MediaSource.cpp | 2 - src/Common/MultiMediaSourceMuxer.cpp | 151 ++++++++++++++++----------- src/Common/MultiMediaSourceMuxer.h | 10 +- src/Common/config.cpp | 2 + src/Common/config.h | 2 + src/Rtp/RtpCache.cpp | 14 --- src/Rtp/RtpCache.h | 3 - 10 files changed, 115 insertions(+), 84 deletions(-) diff --git a/conf/config.ini b/conf/config.ini index 86b403ea..8950b299 100644 --- a/conf/config.ini +++ b/conf/config.ini @@ -295,6 +295,9 @@ h265_pt=99 ps_pt=96 #rtp opus 负载的pt opus_pt=100 +#RtpSender相关功能是否提前开启gop缓存优化级联秒开体验,默认开启 +#如果不调用startSendRtp相关接口,可以置0节省内存 +gop_cache=1 [rtc] #rtc播放推流、播放超时时间 diff --git a/src/Common/MediaSink.cpp b/src/Common/MediaSink.cpp index 66062b97..d90d5798 100644 --- a/src/Common/MediaSink.cpp +++ b/src/Common/MediaSink.cpp @@ -63,6 +63,7 @@ bool MediaSink::addTrack(const Track::Ptr &track_in) { void MediaSink::resetTracks() { _all_track_ready = false; + _have_video = false; _track_map.clear(); _track_ready_callback.clear(); _ticker.resetTime(); @@ -186,6 +187,7 @@ void MediaSink::onAllTrackReady_l() { } onAllTrackReady(); _all_track_ready = true; + _have_video = (bool)getTrack(TrackVideo); } vector MediaSink::getTracks(bool ready) const{ @@ -292,6 +294,10 @@ void MediaSink::enableMuteAudio(bool flag) { _add_mute_audio = flag; } +bool MediaSink::haveVideo() const { + return _have_video; +} + ///////////////////////////DemuxerSink////////////////////////////// void MediaSinkDelegate::setTrackListener(TrackListener *listener) { diff --git a/src/Common/MediaSink.h b/src/Common/MediaSink.h index 88420593..ea207bae 100644 --- a/src/Common/MediaSink.h +++ b/src/Common/MediaSink.h @@ -131,6 +131,11 @@ public: */ void enableMuteAudio(bool flag); + /** + * 是否有视频track + */ + bool haveVideo() const; + protected: /** * 某track已经准备好,其ready()状态返回true, @@ -171,6 +176,7 @@ private: bool _only_audio = false; bool _add_mute_audio = true; bool _all_track_ready = false; + bool _have_video = false; size_t _max_track_size = 2; std::unordered_map > _track_map; std::unordered_map > _frame_unread; diff --git a/src/Common/MediaSource.cpp b/src/Common/MediaSource.cpp index 66d333c1..5e378124 100644 --- a/src/Common/MediaSource.cpp +++ b/src/Common/MediaSource.cpp @@ -153,8 +153,6 @@ std::shared_ptr MediaSource::getOwnership() { //已经被所有 return nullptr; } - // 关闭所有rtp推流,确保线程安全 - stopSendRtp(""); weak_ptr weak_self = shared_from_this(); //确保返回的Ownership智能指针不为空,0x01无实际意义 return std::shared_ptr((void *) 0x01, [weak_self](void *ptr) { diff --git a/src/Common/MultiMediaSourceMuxer.cpp b/src/Common/MultiMediaSourceMuxer.cpp index 389da30f..33e5f3c0 100644 --- a/src/Common/MultiMediaSourceMuxer.cpp +++ b/src/Common/MultiMediaSourceMuxer.cpp @@ -21,6 +21,17 @@ namespace toolkit { namespace mediakit { +namespace { +class MediaSourceForMuxer : public MediaSource { +public: + MediaSourceForMuxer(const MultiMediaSourceMuxer::Ptr &muxer) + : MediaSource("muxer", muxer->getVhost(), muxer->getApp(), muxer->getStreamId()) { + MediaSource::setListener(muxer); + } + int readerCount() override { return 0; } +}; +} // namespace + static std::shared_ptr makeRecorder(MediaSource &sender, const vector &tracks, Recorder::type type, const ProtocolOption &option){ auto recorder = Recorder::createRecorder(type, sender.getVhost(), sender.getApp(), sender.getId(), option); for (auto &track : tracks) { @@ -144,20 +155,15 @@ void MultiMediaSourceMuxer::setTrackListener(const std::weak_ptr &list int MultiMediaSourceMuxer::totalReaderCount() const { auto hls = _hls; - auto ret = (_rtsp ? _rtsp->readerCount() : 0) + - (_rtmp ? _rtmp->readerCount() : 0) + - (_ts ? _ts->readerCount() : 0) + - #if defined(ENABLE_MP4) - (_fmp4 ? _fmp4->readerCount() : 0) + - #endif - (_mp4 ? _option.mp4_as_player : 0) + - (hls ? hls->readerCount() : 0); - -#if defined(ENABLE_RTPPROXY) - return ret + (int)_rtp_sender.size(); -#else - return ret; -#endif + return (_rtsp ? _rtsp->readerCount() : 0) + + (_rtmp ? _rtmp->readerCount() : 0) + + (_ts ? _ts->readerCount() : 0) + + #if defined(ENABLE_MP4) + (_fmp4 ? _fmp4->readerCount() : 0) + + #endif + (_mp4 ? _option.mp4_as_player : 0) + + (hls ? hls->readerCount() : 0) + + (_ring ? _ring->readerCount() : 0); } void MultiMediaSourceMuxer::setTimeStamp(uint32_t stamp) { @@ -237,42 +243,45 @@ bool MultiMediaSourceMuxer::isRecording(MediaSource &sender, Recorder::type type void MultiMediaSourceMuxer::startSendRtp(MediaSource &sender, const MediaSourceEvent::SendRtpArgs &args, const std::function cb) { #if defined(ENABLE_RTPPROXY) + createGopCacheIfNeed(); + + auto ring = _ring; + auto ssrc = args.ssrc; + auto tracks = getTracks(false); auto rtp_sender = std::make_shared(getOwnerPoller(sender)); - weak_ptr weak_sender = sender.shared_from_this(); weak_ptr weak_self = shared_from_this(); - rtp_sender->startSend(args, [args, weak_self, rtp_sender, cb, weak_sender](uint16_t local_port, const SockException &ex) mutable { + + rtp_sender->startSend(args, [ssrc, weak_self, rtp_sender, cb, tracks, ring](uint16_t local_port, const SockException &ex) mutable { cb(local_port, ex); auto strong_self = weak_self.lock(); if (!strong_self || ex) { return; } - if (!strong_self->getOwnerPoller(MediaSource::NullMediaSource())->isCurrentThread()) { - // poller线程发生变更了 - return; - } - for (auto &track : strong_self->getTracks(false)) { + + for (auto &track : tracks) { rtp_sender->addTrack(track); } rtp_sender->addTrackCompleted(); - - auto ssrc = args.ssrc; - rtp_sender->setOnClose([weak_self, ssrc, weak_sender](const toolkit::SockException &ex) { + rtp_sender->setOnClose([weak_self, ssrc](const toolkit::SockException &ex) { if (auto strong_self = weak_self.lock()) { - WarnL << "stream:" << strong_self->shortUrl() << " stop send rtp:" << ssrc << ", reason:" << ex.what(); - strong_self->_rtp_sender.erase(ssrc); - //触发观看人数统计 - auto strong_sender = weak_sender.lock(); - if (strong_sender) { - strong_self->onReaderChanged(*strong_sender, strong_self->totalReaderCount()); - } - NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastSendRtpStopped, *strong_self, ssrc, ex); + // 可能归属线程发生变更 + strong_self->getOwnerPoller(MediaSource::NullMediaSource())->async([=]() { + WarnL << "stream:" << strong_self->shortUrl() << " stop send rtp:" << ssrc << ", reason:" << ex.what(); + strong_self->_rtp_sender.erase(ssrc); + NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastSendRtpStopped, *strong_self, ssrc, ex); + }); } }); - strong_self->_rtp_sender[args.ssrc] = std::move(rtp_sender); - auto strong_sender = weak_sender.lock(); - if (strong_sender) { - strong_self->onReaderChanged(*strong_sender, strong_self->totalReaderCount()); - } + + auto reader = ring->attach(EventPoller::getCurrentPoller()); + reader->setReadCB([rtp_sender](const Frame::Ptr &frame) { + rtp_sender->inputFrame(frame); + }); + + // 可能归属线程发生变更 + strong_self->getOwnerPoller(MediaSource::NullMediaSource())->async([=]() { + strong_self->_rtp_sender[ssrc] = std::move(reader); + }); }); #else cb(0, SockException(Err_other, "该功能未启用,编译时请打开ENABLE_RTPPROXY宏")); @@ -281,10 +290,6 @@ void MultiMediaSourceMuxer::startSendRtp(MediaSource &sender, const MediaSourceE bool MultiMediaSourceMuxer::stopSendRtp(MediaSource &sender, const string &ssrc) { #if defined(ENABLE_RTPPROXY) - onceToken token(nullptr, [&]() { - //关闭rtp推流,可能触发无人观看事件 - onReaderChanged(sender, totalReaderCount()); - }); if (ssrc.empty()) { //关闭全部 auto size = _rtp_sender.size(); @@ -369,9 +374,33 @@ void MultiMediaSourceMuxer::onAllTrackReady() { if (listener) { listener->onAllTrackReady(); } + +#if defined(ENABLE_RTPPROXY) + GET_CONFIG(bool, gop_cache, RtpProxy::kGopCache); + if (gop_cache) { + createGopCacheIfNeed(); + } +#endif InfoL << "stream: " << shortUrl() << " , codec info: " << getTrackInfoStr(this); } +void MultiMediaSourceMuxer::createGopCacheIfNeed() { + if (_ring) { + return; + } + weak_ptr weak_self = shared_from_this(); + _ring = std::make_shared(1024, [weak_self](int size) { + auto strong_self = weak_self.lock(); + if (strong_self) { + // 切换到归属线程 + strong_self->getOwnerPoller(MediaSource::NullMediaSource())->async([=]() { + auto src = std::make_shared(strong_self); + strong_self->onReaderChanged(*src, strong_self->totalReaderCount()); + }); + } + }); +} + void MultiMediaSourceMuxer::resetTracks() { MediaSink::resetTracks(); @@ -390,12 +419,6 @@ void MultiMediaSourceMuxer::resetTracks() { } #endif -#if defined(ENABLE_RTPPROXY) - for (auto &pr : _rtp_sender) { - pr.second->resetTracks(); - } -#endif - //拷贝智能指针,目的是为了防止跨线程调用设置录像相关api导致的线程竞争问题 auto hls = _hls; if (hls) { @@ -443,11 +466,17 @@ bool MultiMediaSourceMuxer::onTrackFrame(const Frame::Ptr &frame_in) { } #endif -#if defined(ENABLE_RTPPROXY) - for (auto &pr : _rtp_sender) { - ret = pr.second->inputFrame(frame) ? true : ret; + if (_ring) { + if (frame->getTrackType() == TrackVideo) { + // 视频时,遇到第一帧配置帧或关键帧则标记为gop开始处 + auto video_key_pos = frame->keyFrame() || frame->configFrame(); + _ring->write(frame, video_key_pos && !_video_key_pos); + _video_key_pos = video_key_pos; + } else { + // 没有视频时,设置is_key为true,目的是关闭gop缓存 + _ring->write(frame, !haveVideo()); + } } -#endif //ENABLE_RTPPROXY return ret; } @@ -457,19 +486,15 @@ bool MultiMediaSourceMuxer::isEnabled(){ //无人观看时,每次检查是否真的无人观看 //有人观看时,则延迟一定时间检查一遍是否无人观看了(节省性能) auto hls = _hls; - auto flag = (_rtmp ? _rtmp->isEnabled() : false) || - (_rtsp ? _rtsp->isEnabled() : false) || - (_ts ? _ts->isEnabled() : false) || - #if defined(ENABLE_MP4) - (_fmp4 ? _fmp4->isEnabled() : false) || - #endif - (hls ? hls->isEnabled() : false) || _mp4; + _is_enable = (_rtmp ? _rtmp->isEnabled() : false) || + (_rtsp ? _rtsp->isEnabled() : false) || + (_ts ? _ts->isEnabled() : false) || + #if defined(ENABLE_MP4) + (_fmp4 ? _fmp4->isEnabled() : false) || + #endif + (_ring ? (bool)_ring->readerCount() : false) || + (hls ? hls->isEnabled() : false) || _mp4; -#if defined(ENABLE_RTPPROXY) - _is_enable = flag || _rtp_sender.size(); -#else - _is_enable = flag; -#endif //ENABLE_RTPPROXY if (_is_enable) { //无人观看时,不刷新计时器,因为无人观看时每次都会检查一遍,所以刷新计数器无意义且浪费cpu _last_check.resetTime(); diff --git a/src/Common/MultiMediaSourceMuxer.h b/src/Common/MultiMediaSourceMuxer.h index 7bedef98..77585f84 100644 --- a/src/Common/MultiMediaSourceMuxer.h +++ b/src/Common/MultiMediaSourceMuxer.h @@ -28,6 +28,7 @@ namespace mediakit { class MultiMediaSourceMuxer : public MediaSourceEventInterceptor, public MediaSink, public std::enable_shared_from_this{ public: using Ptr = std::shared_ptr; + using RingType = toolkit::RingBuffer; class Listener { public: @@ -46,7 +47,7 @@ public: void setMediaListener(const std::weak_ptr &listener); /** - * 随着Track就绪事件监听器 + * 设置Track就绪事件监听器 * @param listener 事件监听器 */ void setTrackListener(const std::weak_ptr &listener); @@ -151,9 +152,13 @@ protected: */ bool onTrackFrame(const Frame::Ptr &frame) override; +private: + void createGopCacheIfNeed(); + private: bool _is_enable = false; bool _create_in_poller = false; + bool _video_key_pos = false; std::string _vhost; std::string _app; std::string _stream_id; @@ -162,7 +167,7 @@ private: Stamp _stamp[2]; std::weak_ptr _track_listener; #if defined(ENABLE_RTPPROXY) - std::unordered_map _rtp_sender; + std::unordered_map _rtp_sender; #endif //ENABLE_RTPPROXY #if defined(ENABLE_MP4) @@ -174,6 +179,7 @@ private: MediaSinkInterface::Ptr _mp4; HlsRecorder::Ptr _hls; toolkit::EventPoller::Ptr _poller; + RingType::Ptr _ring; //对象个数统计 toolkit::ObjectStatistic _statistic; diff --git a/src/Common/config.cpp b/src/Common/config.cpp index bf8213f1..cae98d55 100644 --- a/src/Common/config.cpp +++ b/src/Common/config.cpp @@ -318,6 +318,7 @@ const string kH264PT = RTP_PROXY_FIELD "h264_pt"; const string kH265PT = RTP_PROXY_FIELD "h265_pt"; const string kPSPT = RTP_PROXY_FIELD "ps_pt"; const string kOpusPT = RTP_PROXY_FIELD "opus_pt"; +const string kGopCache = RTP_PROXY_FIELD "gop_cache"; static onceToken token([]() { mINI::Instance()[kDumpDir] = ""; @@ -327,6 +328,7 @@ static onceToken token([]() { mINI::Instance()[kH265PT] = 99; mINI::Instance()[kPSPT] = 96; mINI::Instance()[kOpusPT] = 100; + mINI::Instance()[kGopCache] = 1; }); } // namespace RtpProxy diff --git a/src/Common/config.h b/src/Common/config.h index cc281f6e..146c6067 100644 --- a/src/Common/config.h +++ b/src/Common/config.h @@ -352,6 +352,8 @@ extern const std::string kH265PT; extern const std::string kPSPT; // rtp server opus 的pt extern const std::string kOpusPT; +// RtpSender相关功能是否提前开启gop缓存优化级联秒开体验,默认开启 +extern const std::string kGopCache; } // namespace RtpProxy /** diff --git a/src/Rtp/RtpCache.cpp b/src/Rtp/RtpCache.cpp index 7a897847..0e723b57 100644 --- a/src/Rtp/RtpCache.cpp +++ b/src/Rtp/RtpCache.cpp @@ -20,14 +20,6 @@ RtpCache::RtpCache(onFlushed cb) { _cb = std::move(cb); } -bool RtpCache::firstKeyReady(bool in) { - if (_first_key) { - return _first_key; - } - _first_key = in; - return _first_key; -} - void RtpCache::onFlush(std::shared_ptr> rtp_list, bool) { _cb(std::move(rtp_list)); } @@ -42,9 +34,6 @@ void RtpCachePS::flush() { } void RtpCachePS::onRTP(Buffer::Ptr buffer, bool is_key) { - if (!firstKeyReady(is_key)) { - return; - } auto rtp = std::static_pointer_cast(buffer); auto stamp = rtp->getStampMS(); input(stamp, std::move(buffer), is_key); @@ -56,9 +45,6 @@ void RtpCacheRaw::flush() { } void RtpCacheRaw::onRTP(Buffer::Ptr buffer, bool is_key) { - if (!firstKeyReady(is_key)) { - return; - } auto rtp = std::static_pointer_cast(buffer); auto stamp = rtp->getStampMS(); input(stamp, std::move(buffer), is_key); diff --git a/src/Rtp/RtpCache.h b/src/Rtp/RtpCache.h index c03329cd..a3882222 100644 --- a/src/Rtp/RtpCache.h +++ b/src/Rtp/RtpCache.h @@ -32,13 +32,10 @@ protected: */ void input(uint64_t stamp, toolkit::Buffer::Ptr buffer,bool is_key = false); - bool firstKeyReady(bool in); - protected: void onFlush(std::shared_ptr > rtp_list, bool) override; private: - bool _first_key = false; onFlushed _cb; };