diff --git a/src/Rtp/GB28181Process.cpp b/src/Rtp/GB28181Process.cpp index 6400c98f..c4dc8c24 100644 --- a/src/Rtp/GB28181Process.cpp +++ b/src/Rtp/GB28181Process.cpp @@ -24,36 +24,23 @@ static inline bool checkTS(const uint8_t *packet, size_t bytes){ return bytes % TS_PACKET_SIZE == 0 && packet[0] == TS_SYNC_BYTE; } -class RtpReceiverImp : public RtpReceiver { +class RtpReceiverImp : public RtpTrackImp { public: using Ptr = std::shared_ptr; - RtpReceiverImp(int sample_rate, function cb, function cb_before = nullptr){ + RtpReceiverImp(int sample_rate, RtpTrackImp::OnSorted cb, RtpTrackImp::BeforeSorted cb_before = nullptr){ _sample_rate = sample_rate; - _on_sort = std::move(cb); - _on_before_sort = std::move(cb_before); + setOnSorted(std::move(cb)); + setBeforeSorted(std::move(cb_before)); } ~RtpReceiverImp() override = default; bool inputRtp(TrackType type, uint8_t *ptr, size_t len){ - return handleOneRtp((int) type, type, _sample_rate, ptr, len); - } - -protected: - void onRtpSorted(RtpPacket::Ptr rtp, int track_index) override { - _on_sort(std::move(rtp)); - } - - void onBeforeRtpSorted(const RtpPacket::Ptr &rtp, int track_index) override { - if (_on_before_sort) { - _on_before_sort(rtp); - } + return RtpTrack::inputRtp(type, _sample_rate, ptr, len); } private: int _sample_rate; - function _on_sort; - function _on_before_sort; }; /////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/Rtp/RtpSession.cpp b/src/Rtp/RtpSession.cpp index 880fd6cc..c685114c 100644 --- a/src/Rtp/RtpSession.cpp +++ b/src/Rtp/RtpSession.cpp @@ -103,7 +103,7 @@ void RtpSession::onRtpPacket(const char *data, size_t len) { } try { _process->inputRtp(false, getSock(), data, len, &_addr); - } catch (RtpReceiver::BadRtpException &ex) { + } catch (RtpTrack::BadRtpException &ex) { if (!_is_udp) { WarnL << ex.what() << ",开始搜索ssrc以便恢复上下文"; _search_rtp = true; diff --git a/src/Rtsp/RtpReceiver.cpp b/src/Rtsp/RtpReceiver.cpp index 3cc50dbb..5819dd48 100644 --- a/src/Rtsp/RtpReceiver.cpp +++ b/src/Rtsp/RtpReceiver.cpp @@ -15,19 +15,23 @@ namespace mediakit { -RtpReceiver::RtpReceiver() { - int index = 0; - for (auto &sortor : _rtp_sortor) { - sortor.setOnSort([this, index](uint16_t seq, RtpPacket::Ptr &packet) { - onRtpSorted(std::move(packet), index); - }); - ++index; - } +RtpTrack::RtpTrack() { + setOnSort([this](uint16_t seq, RtpPacket::Ptr &packet) { + onRtpSorted(std::move(packet)); + }); } -RtpReceiver::~RtpReceiver() {} +uint32_t RtpTrack::getSSRC() const { + return _ssrc; +} -bool RtpReceiver::handleOneRtp(int index, TrackType type, int sample_rate, uint8_t *ptr, size_t len) { +void RtpTrack::clear() { + _ssrc = 0; + _ssrc_alive.resetTime(); + PacketSortor::clear(); +} + +bool RtpTrack::inputRtp(TrackType type, int sample_rate, uint8_t *ptr, size_t len) { if (len < RtpPacket::kRtpHeaderSize) { WarnL << "rtp包太小:" << len; return false; @@ -52,23 +56,23 @@ bool RtpReceiver::handleOneRtp(int index, TrackType type, int sample_rate, uint8 //比对缓存ssrc auto ssrc = ntohl(header->ssrc); - if (!_ssrc[index]) { + if (!_ssrc) { //记录并锁定ssrc - _ssrc[index] = ssrc; - _ssrc_alive[index].resetTime(); - } else if (_ssrc[index] == ssrc) { + _ssrc = ssrc; + _ssrc_alive.resetTime(); + } else if (_ssrc == ssrc) { //ssrc匹配正确,刷新计时器 - _ssrc_alive[index].resetTime(); + _ssrc_alive.resetTime(); } else { //ssrc错误 - if (_ssrc_alive[index].elapsedTime() < 3 * 1000) { + if (_ssrc_alive.elapsedTime() < 3 * 1000) { //接受正确ssrc的rtp在10秒内,那么我们认为存在多路rtp,忽略掉ssrc不匹配的rtp - WarnL << "ssrc不匹配,rtp已丢弃:" << ssrc << " != " << _ssrc[index]; + WarnL << "ssrc不匹配,rtp已丢弃:" << ssrc << " != " << _ssrc; return false; } - InfoL << "rtp流ssrc切换:" << _ssrc[index] << " -> " << ssrc; - _ssrc[index] = ssrc; - _ssrc_alive[index].resetTime(); + InfoL << "rtp流ssrc切换:" << _ssrc << " -> " << ssrc; + _ssrc = ssrc; + _ssrc_alive.resetTime(); } auto rtp = RtpPacket::create(); @@ -87,29 +91,32 @@ bool RtpReceiver::handleOneRtp(int index, TrackType type, int sample_rate, uint8 //拷贝rtp memcpy(&data[4], ptr, len); - onBeforeRtpSorted(rtp, index); + onBeforeRtpSorted(rtp); auto seq = rtp->getSeq(); - _rtp_sortor[index].sortPacket(seq, std::move(rtp)); + sortPacket(seq, std::move(rtp)); return true; } -void RtpReceiver::clear() { - CLEAR_ARR(_ssrc); - for (auto &sortor : _rtp_sortor) { - sortor.clear(); +//////////////////////////////////////////////////////////////////////////////////// + +void RtpTrackImp::setOnSorted(OnSorted cb) { + _on_sorted = std::move(cb); +} + +void RtpTrackImp::setBeforeSorted(BeforeSorted cb) { + _on_before_sorted = std::move(cb); +} + +void RtpTrackImp::onRtpSorted(RtpPacket::Ptr rtp) { + if (_on_sorted) { + _on_sorted(std::move(rtp)); } } -size_t RtpReceiver::getJitterSize(int index) const{ - return _rtp_sortor[index].getJitterSize(); -} - -size_t RtpReceiver::getCycleCount(int index) const{ - return _rtp_sortor[index].getCycleCount(); -} - -uint32_t RtpReceiver::getSSRC(int index) const{ - return _ssrc[index]; +void RtpTrackImp::onBeforeRtpSorted(const RtpPacket::Ptr &rtp) { + if (_on_before_sorted) { + _on_before_sorted(rtp); + } } }//namespace mediakit diff --git a/src/Rtsp/RtpReceiver.h b/src/Rtsp/RtpReceiver.h index 24cf8fb8..608fb3cd 100644 --- a/src/Rtsp/RtpReceiver.h +++ b/src/Rtsp/RtpReceiver.h @@ -160,11 +160,8 @@ private: function _cb; }; -class RtpReceiver { +class RtpTrack : private PacketSortor{ public: - RtpReceiver(); - virtual ~RtpReceiver(); - class BadRtpException : public invalid_argument { public: template @@ -172,7 +169,60 @@ public: ~BadRtpException() = default; }; + RtpTrack(); + virtual ~RtpTrack() = default; + + void clear(); + uint32_t getSSRC() const; + bool inputRtp(TrackType type, int sample_rate, uint8_t *ptr, size_t len); + protected: + virtual void onRtpSorted(RtpPacket::Ptr rtp) {} + virtual void onBeforeRtpSorted(const RtpPacket::Ptr &rtp) {} + +private: + uint32_t _ssrc = 0; + Ticker _ssrc_alive; +}; + +class RtpTrackImp : public RtpTrack{ +public: + using OnSorted = function; + using BeforeSorted = function; + + RtpTrackImp() = default; + ~RtpTrackImp() override = default; + + void setOnSorted(OnSorted cb); + void setBeforeSorted(BeforeSorted cb); + +protected: + void onRtpSorted(RtpPacket::Ptr rtp) override; + void onBeforeRtpSorted(const RtpPacket::Ptr &rtp) override; + +private: + OnSorted _on_sorted; + BeforeSorted _on_before_sorted; +}; + +template +class RtpMultiReceiver { +public: + RtpMultiReceiver() { + int index = 0; + for (auto &track : _track) { + track.setOnSorted([this, index](RtpPacket::Ptr rtp) { + onRtpSorted(std::move(rtp), index); + }); + track.setBeforeSorted([this, index](const RtpPacket::Ptr &rtp) { + onBeforeRtpSorted(rtp, index); + }); + ++index; + } + } + + virtual ~RtpMultiReceiver() = default; + /** * 输入数据指针生成并排序rtp包 * @param index track下标索引 @@ -182,34 +232,49 @@ protected: * @param len rtp数据指针长度 * @return 解析成功返回true */ - bool handleOneRtp(int index, TrackType type, int samplerate, uint8_t *ptr, size_t len); + bool handleOneRtp(int index, TrackType type, int sample_rate, uint8_t *ptr, size_t len){ + return _track[index].inputRtp(type, sample_rate, ptr, len); + } + void clear() { + for (auto &track : _track) { + track.clear(); + } + } + + size_t getJitterSize(int index) const { + return _track[index].getJitterSize(); + } + + size_t getCycleCount(int index) const { + return _track[index].getCycleCount(); + } + + uint32_t getSSRC(int index) const { + return _track[index].getSSRC(); + } + +protected: /** * rtp数据包排序后输出 * @param rtp rtp数据包 * @param track_index track索引 */ - virtual void onRtpSorted(RtpPacket::Ptr rtp, int track_index) {} + virtual void onRtpSorted(RtpPacket::Ptr rtp, int index) {} /** * 解析出rtp但还未排序 * @param rtp rtp数据包 * @param track_index track索引 */ - virtual void onBeforeRtpSorted(const RtpPacket::Ptr &rtp, int track_index) {} - - void clear(); - size_t getJitterSize(int track_index) const; - size_t getCycleCount(int track_index) const; - uint32_t getSSRC(int track_index) const; + virtual void onBeforeRtpSorted(const RtpPacket::Ptr &rtp, int index) {} private: - uint32_t _ssrc[2] = {0, 0}; - Ticker _ssrc_alive[2]; - //rtp排序缓存,根据seq排序 - PacketSortor _rtp_sortor[2]; + RtpTrackImp _track[kCount]; }; +using RtpReceiver = RtpMultiReceiver<2>; + }//namespace mediakit diff --git a/webrtc/WebRtcTransport.cpp b/webrtc/WebRtcTransport.cpp index 5b2ce735..b2f095d1 100644 --- a/webrtc/WebRtcTransport.cpp +++ b/webrtc/WebRtcTransport.cpp @@ -552,14 +552,14 @@ SdpAttrCandidate::Ptr WebRtcTransportImp::getIceCandidate() const{ /////////////////////////////////////////////////////////////////// -class RtpChannel : public RtpReceiver { +class RtpChannel : public RtpTrackImp { public: uint32_t rtp_ssrc; public: - RtpChannel(function on_rtp, function on_nack) { - _on_sort = std::move(on_rtp); - nack_ctx.setOnNack(std::move(on_nack)); + RtpChannel(RtpTrackImp::OnSorted cb, function on_nack) { + setOnSorted(std::move(cb)); + _nack_ctx.setOnNack(std::move(on_nack)); } ~RtpChannel() override = default; @@ -569,27 +569,21 @@ public: RtpHeader *rtp = (RtpHeader *) ptr; auto seq = ntohs(rtp->seq); //统计rtp接受情况,便于生成nack rtcp包 - nack_ctx.received(seq); + _nack_ctx.received(seq); //统计rtp收到的情况,好做rr汇报 - rtcp_context.onRtp(seq, ntohl(rtp->stamp), len); + _rtcp_context.onRtp(seq, ntohl(rtp->stamp), len); } - return handleOneRtp((int) type, type, sample_rate, ptr, len); + return RtpTrack::inputRtp(type, sample_rate, ptr, len); } Buffer::Ptr createRtcpRR(RtcpHeader *sr, uint32_t ssrc) { - rtcp_context.onRtcp(sr); - return rtcp_context.createRtcpRR(ssrc, rtp_ssrc); - } - -protected: - void onRtpSorted(RtpPacket::Ptr rtp, int track_index) override { - _on_sort(std::move(rtp)); + _rtcp_context.onRtcp(sr); + return _rtcp_context.createRtcpRR(ssrc, rtp_ssrc); } private: - NackContext nack_ctx; - RtcpContext rtcp_context{true}; - function _on_sort; + NackContext _nack_ctx; + RtcpContext _rtcp_context{true}; }; std::shared_ptr MediaTrack::getRtpChannel(uint32_t ssrc) const{