diff --git a/src/Common/MediaSink.cpp b/src/Common/MediaSink.cpp index 64049dc5..eb7cf533 100644 --- a/src/Common/MediaSink.cpp +++ b/src/Common/MediaSink.cpp @@ -26,7 +26,11 @@ #include "MediaSink.h" //最多等待未初始化的Track 10秒,超时之后会忽略未初始化的Track -#define MAX_WAIT_MS 10000 +#define MAX_WAIT_MS_READY 10000 + +//如果添加Track,最多等待3秒 +#define MAX_WAIT_MS_ADD_TRACK 3000 + namespace mediakit{ @@ -34,23 +38,16 @@ void MediaSink::addTrack(const Track::Ptr &track_in) { lock_guard lck(_mtx); //克隆Track,只拷贝其数据,不拷贝其数据转发关系 auto track = track_in->clone(); - auto codec_id = track->getCodecId(); _track_map[codec_id] = track; - auto lam = [this,track](){ + _allTrackReady = false; + _trackReadyCallback[codec_id] = [this, track]() { onTrackReady(track); }; - if(track->ready()){ - lam(); - }else{ - _anyTrackUnReady = true; - _allTrackReady = false; - _trackReadyCallback[codec_id] = lam; - _ticker.resetTime(); - } + _ticker.resetTime(); - track->addDelegate(std::make_shared([this](const Frame::Ptr &frame){ - if(!_anyTrackUnReady){ + track->addDelegate(std::make_shared([this](const Frame::Ptr &frame) { + if (_allTrackReady) { onTrackFrame(frame); } })); @@ -58,7 +55,6 @@ void MediaSink::addTrack(const Track::Ptr &track_in) { void MediaSink::resetTracks() { lock_guard lck(_mtx); - _anyTrackUnReady = false; _allTrackReady = false; _track_map.clear(); _trackReadyCallback.clear(); @@ -83,26 +79,50 @@ void MediaSink::inputFrame(const Frame::Ptr &frame) { } } - if(!_allTrackReady && (_trackReadyCallback.empty() || _ticker.elapsedTime() > MAX_WAIT_MS)){ - _allTrackReady = true; - _anyTrackUnReady = false; - if(!_trackReadyCallback.empty()){ - //这是超时强制忽略未准备好的Track - _trackReadyCallback.clear(); - //移除未准备好的Track - for(auto it = _track_map.begin() ; it != _track_map.end() ; ){ - if(!it->second->ready()){ - it = _track_map.erase(it); - continue; - } - ++it; - } + if(!_allTrackReady){ + if(_ticker.elapsedTime() > MAX_WAIT_MS_READY){ + //如果超过规定时间,那么不再等待并忽略未准备好的Track + emitAllTrackReady(); + return; } - if(!_track_map.empty()){ - //最少有一个有效的Track - onAllTrackReady(); + if(!_trackReadyCallback.empty()){ + //在超时时间内,如果存在未准备好的Track,那么继续等待 + return; } + + if(_track_map.size() == 2){ + //如果已经添加了音视频Track,并且不存在未准备好的Track,那么说明所有Track都准备好了 + emitAllTrackReady(); + return; + } + + if(_track_map.size() == 1 && _ticker.elapsedTime() > MAX_WAIT_MS_ADD_TRACK){ + //如果只有一个Track,那么在该Track添加后,我们最多还等待若干时间(可能后面还会添加Track) + emitAllTrackReady(); + return; + } + } +} + +void MediaSink::emitAllTrackReady() { + _allTrackReady = true; + if(!_trackReadyCallback.empty()){ + //这是超时强制忽略未准备好的Track + _trackReadyCallback.clear(); + //移除未准备好的Track + for(auto it = _track_map.begin() ; it != _track_map.end() ; ){ + if(!it->second->ready()){ + it = _track_map.erase(it); + continue; + } + ++it; + } + } + + if(!_track_map.empty()){ + //最少有一个有效的Track + onAllTrackReady(); } } diff --git a/src/Common/MediaSink.h b/src/Common/MediaSink.h index 64317385..e37980e1 100644 --- a/src/Common/MediaSink.h +++ b/src/Common/MediaSink.h @@ -109,12 +109,13 @@ protected: * @param frame */ virtual void onTrackFrame(const Frame::Ptr &frame) {}; +private: + void emitAllTrackReady(); private: mutable recursive_mutex _mtx; map _track_map; map > _trackReadyCallback; bool _allTrackReady = false; - bool _anyTrackUnReady = false; Ticker _ticker; }; diff --git a/src/Common/Stamp.cpp b/src/Common/Stamp.cpp index 7330de21..e01887e3 100644 --- a/src/Common/Stamp.cpp +++ b/src/Common/Stamp.cpp @@ -26,7 +26,8 @@ #include "Stamp.h" -#define MAX_DELTA_STAMP 300 +#define MAX_DELTA_STAMP 1000 +#define MAX_CTS 500 #define ABS(x) ((x) > 0 ? (x) : (-x)) namespace mediakit { @@ -77,7 +78,7 @@ void Stamp::revise(int64_t dts, int64_t pts, int64_t &dts_out, int64_t &pts_out, dts_out = _relativeStamp; //////////////以下是播放时间戳的计算////////////////// - if(pts_dts_diff > MAX_DELTA_STAMP || pts_dts_diff < -MAX_DELTA_STAMP){ + if(ABS(pts_dts_diff) > MAX_CTS){ //如果差值太大,则认为由于回环导致时间戳错乱了 pts_dts_diff = 0; } diff --git a/src/Rtmp/RtmpMediaSource.h b/src/Rtmp/RtmpMediaSource.h index 982d4401..928c9066 100644 --- a/src/Rtmp/RtmpMediaSource.h +++ b/src/Rtmp/RtmpMediaSource.h @@ -72,7 +72,6 @@ public: const string &stream_id, int ring_size = 0) : MediaSource(RTMP_SCHEMA, vhost, app, stream_id), _ring_size(ring_size) { - _metadata = TitleMeta().getMetadata(); } virtual ~RtmpMediaSource() {} @@ -117,6 +116,9 @@ public: virtual void setMetaData(const AMFValue &metadata) { lock_guard lock(_mtx); _metadata = metadata; + if(_ring){ + regist(); + } } /** @@ -143,10 +145,9 @@ public: _ring = std::make_shared(_ring_size, std::move(lam)); onReaderChanged(0); - //如果输入了非config帧, - //那么说明不再可能获取config帧以及metadata, - //所以我们强制其为已注册 - regist(); + if(_metadata){ + regist(); + } } _track_stamps_map[pkt->typeId] = pkt->timeStamp; _ring->write(pkt, pkt->isVideoKeyFrame()); diff --git a/src/Rtmp/RtmpPlayerImp.h b/src/Rtmp/RtmpPlayerImp.h index b06be303..e90acc43 100644 --- a/src/Rtmp/RtmpPlayerImp.h +++ b/src/Rtmp/RtmpPlayerImp.h @@ -66,6 +66,7 @@ private: _pRtmpMediaSrc = dynamic_pointer_cast(_pMediaSrc); if(_pRtmpMediaSrc){ _pRtmpMediaSrc->setMetaData(val); + _set_meta_data = true; } _delegate.reset(new RtmpDemuxer); _delegate->loadMetaData(val); @@ -73,6 +74,10 @@ private: } void onMediaData(const RtmpPacket::Ptr &chunkData) override { if(_pRtmpMediaSrc){ + if(!_set_meta_data && !chunkData->isCfgFrame()){ + _set_meta_data = true; + _pRtmpMediaSrc->setMetaData(TitleMeta().getMetadata()); + } _pRtmpMediaSrc->onWrite(chunkData); } if(!_delegate){ @@ -83,6 +88,7 @@ private: } private: RtmpMediaSource::Ptr _pRtmpMediaSrc; + bool _set_meta_data = false; }; diff --git a/src/Rtmp/RtmpSession.cpp b/src/Rtmp/RtmpSession.cpp index 6f067fea..061e512a 100644 --- a/src/Rtmp/RtmpSession.cpp +++ b/src/Rtmp/RtmpSession.cpp @@ -434,6 +434,7 @@ void RtmpSession::setMetaData(AMFDecoder &dec) { auto metadata = dec.load(); // dumpMetadata(metadata); _pPublisherSrc->setMetaData(metadata); + _set_meta_data = true; } void RtmpSession::onProcessCmd(AMFDecoder &dec) { @@ -491,6 +492,11 @@ void RtmpSession::onRtmpChunk(RtmpPacket &chunkData) { _stamp[chunkData.typeId % 2].revise(chunkData.timeStamp, chunkData.timeStamp, dts_out, dts_out, true); chunkData.timeStamp = dts_out; } + + if(!_set_meta_data && !chunkData.isCfgFrame()){ + _set_meta_data = true; + _pPublisherSrc->setMetaData(TitleMeta().getMetadata()); + } _pPublisherSrc->onWrite(std::make_shared(std::move(chunkData))); } break; diff --git a/src/Rtmp/RtmpSession.h b/src/Rtmp/RtmpSession.h index 572d40f8..f5b6df79 100644 --- a/src/Rtmp/RtmpSession.h +++ b/src/Rtmp/RtmpSession.h @@ -95,6 +95,7 @@ private: std::string _strTcUrl; MediaInfo _mediaInfo; double _dNowReqID = 0; + bool _set_meta_data = false; Ticker _ticker;//数据接收时间 RingBuffer::RingReader::Ptr _pRingReader; std::shared_ptr _pPublisherSrc;