diff --git a/src/Rtsp/RtpReceiver.cpp b/src/Rtsp/RtpReceiver.cpp index 0b10063c..6268c358 100644 --- a/src/Rtsp/RtpReceiver.cpp +++ b/src/Rtsp/RtpReceiver.cpp @@ -11,11 +11,6 @@ #include "Common/config.h" #include "RtpReceiver.h" -#define POP_HEAD(trackidx) \ - auto it = _rtp_sort_cache_map[trackidx].begin(); \ - onRtpSorted(it->second, trackidx); \ - _rtp_sort_cache_map[trackidx].erase(it); - #define AV_RB16(x) \ ((((const uint8_t*)(x))[0] << 8) | \ ((const uint8_t*)(x))[1]) @@ -24,7 +19,18 @@ namespace mediakit { -RtpReceiver::RtpReceiver() {} +RtpReceiver::RtpReceiver() { + GET_CONFIG(uint32_t, clearCount, Rtp::kClearCount); + GET_CONFIG(uint32_t, maxRtpCount, Rtp::kMaxRtpCount); + int index = 0; + for (auto &sortor : _rtp_sortor) { + sortor.setup(maxRtpCount, clearCount); + sortor.setOnSort([this, index](uint16_t seq, const RtpPacket::Ptr &packet) { + onRtpSorted(packet, index); + }); + ++index; + } +} RtpReceiver::~RtpReceiver() {} bool RtpReceiver::handleOneRtp(int track_index, TrackType type, int samplerate, unsigned char *rtp_raw_ptr, unsigned int rtp_raw_len) { @@ -80,7 +86,7 @@ bool RtpReceiver::handleOneRtp(int track_index, TrackType type, int samplerate, if (_ssrc_err_count[track_index]++ > 10) { //ssrc切换后清除老数据 WarnL << "ssrc更换:" << _ssrc[track_index] << " -> " << rtp.ssrc; - _rtp_sort_cache_map[track_index].clear(); + _rtp_sortor[track_index].clear(); _ssrc[track_index] = rtp.ssrc; } return false; @@ -127,56 +133,15 @@ bool RtpReceiver::handleOneRtp(int track_index, TrackType type, int samplerate, } void RtpReceiver::sortRtp(const RtpPacket::Ptr &rtp,int track_index){ - if(rtp->sequence != _last_seq[track_index] + 1 && _last_seq[track_index] != 0){ - //包乱序或丢包 - _seq_ok_count[track_index] = 0; - _sort_started[track_index] = true; - if(_last_seq[track_index] > rtp->sequence && _last_seq[track_index] - rtp->sequence > 0xFF){ - //sequence回环,清空所有排序缓存 - while (_rtp_sort_cache_map[track_index].size()) { - POP_HEAD(track_index) - } - ++_seq_cycle_count[track_index]; - } - }else{ - //正确序列的包 - _seq_ok_count[track_index]++; - } - - _last_seq[track_index] = rtp->sequence; - - //开始排序缓存 - if (_sort_started[track_index]) { - _rtp_sort_cache_map[track_index].emplace(rtp->sequence, rtp); - GET_CONFIG(uint32_t,clearCount,Rtp::kClearCount); - GET_CONFIG(uint32_t,maxRtpCount,Rtp::kMaxRtpCount); - if (_seq_ok_count[track_index] >= clearCount) { - //网络环境改善,需要清空排序缓存 - _seq_ok_count[track_index] = 0; - _sort_started[track_index] = false; - while (_rtp_sort_cache_map[track_index].size()) { - POP_HEAD(track_index) - } - } else if (_rtp_sort_cache_map[track_index].size() >= maxRtpCount) { - //排序缓存溢出 - POP_HEAD(track_index) - } - }else{ - //正确序列 - onRtpSorted(rtp, track_index); - } + _rtp_sortor[track_index].sortPacket(rtp->sequence, rtp); } void RtpReceiver::clear() { - CLEAR_ARR(_last_seq); CLEAR_ARR(_ssrc); CLEAR_ARR(_ssrc_err_count); - CLEAR_ARR(_seq_ok_count); - CLEAR_ARR(_sort_started); - CLEAR_ARR(_seq_cycle_count); - - _rtp_sort_cache_map[0].clear(); - _rtp_sort_cache_map[1].clear(); + for (auto &sortor : _rtp_sortor) { + sortor.clear(); + } } void RtpReceiver::setPoolSize(int size) { @@ -184,11 +149,11 @@ void RtpReceiver::setPoolSize(int size) { } int RtpReceiver::getJitterSize(int track_index){ - return _rtp_sort_cache_map[track_index].size(); + return _rtp_sortor[track_index].getJitterSize(); } int RtpReceiver::getCycleCount(int track_index){ - return _seq_cycle_count[track_index]; + return _rtp_sortor[track_index].getCycleCount(); } diff --git a/src/Rtsp/RtpReceiver.h b/src/Rtsp/RtpReceiver.h index 243fef20..f4c7e94b 100644 --- a/src/Rtsp/RtpReceiver.h +++ b/src/Rtsp/RtpReceiver.h @@ -11,24 +11,141 @@ #ifndef ZLMEDIAKIT_RTPRECEIVER_H #define ZLMEDIAKIT_RTPRECEIVER_H - #include #include #include #include "RtpCodec.h" #include "RtspMediaSource.h" - using namespace std; using namespace toolkit; namespace mediakit { +template +class PacketSortor { +public: + PacketSortor() = default; + ~PacketSortor() = default; + + /** + * 设置参数 + * @param max_sort_size 最大排序缓存长度 + * @param clear_sort_size seq连续次数超过该值后,清空并关闭排序缓存 + */ + void setup(uint32_t max_sort_size, uint32_t clear_sort_size) { + _max_sort_size = max_sort_size; + _clear_sort_size = clear_sort_size; + } + + void setOnSort(function cb){ + _cb = std::move(cb); + } + + /** + * 清空状态 + */ + void clear() { + _last_seq = 0; + _seq_ok_count = 0; + _sort_started = 0; + _seq_cycle_count = 0; + _rtp_sort_cache_map.clear(); + } + + /** + * 获取排序缓存长度 + */ + int getJitterSize(){ + return _rtp_sort_cache_map.size(); + } + + /** + * 获取seq回环次数 + */ + int getCycleCount(){ + return _seq_cycle_count; + } + + /** + * 输入并排序 + * @param seq 序列号 + * @param packet 包负载 + */ + void sortPacket(SEQ seq, const T &packet){ + if (seq != _last_seq + 1 && _last_seq != 0) { + //包乱序或丢包 + _seq_ok_count = 0; + _sort_started = true; + if (_last_seq > seq && _last_seq - seq > 0xFF) { + //sequence回环,清空所有排序缓存 + while (_rtp_sort_cache_map.size()) { + popPacket(); + } + ++_seq_cycle_count; + } + } else { + //正确序列的包 + _seq_ok_count++; + } + + _last_seq = seq; + + //开始排序缓存 + if (_sort_started) { + _rtp_sort_cache_map.emplace(seq, packet); + if (_seq_ok_count >= _clear_sort_size) { + //网络环境改善,需要清空排序缓存 + _seq_ok_count = 0; + _sort_started = false; + while (_rtp_sort_cache_map.size()) { + popPacket(); + } + } else if (_rtp_sort_cache_map.size() >= _max_sort_size) { + //排序缓存溢出 + popPacket(); + } + } else { + //正确序列 + onPacketSorted(seq, packet); + } + } + +private: + void popPacket() { + auto it = _rtp_sort_cache_map.begin(); + onPacketSorted(it->first, it->second); + _rtp_sort_cache_map.erase(it); + } + + void onPacketSorted(SEQ seq, const T &packet) { + _cb(seq, packet); + } + +private: + //是否开始seq排序 + bool _sort_started = false; + //上次seq + SEQ _last_seq = 0; + //seq连续次数计数 + uint32_t _seq_ok_count = 0; + //seq回环次数计数 + uint32_t _seq_cycle_count = 0; + //排序缓存长度 + uint32_t _max_sort_size; + //seq连续次数超过该值后,清空并关闭排序缓存 + uint32_t _clear_sort_size; + //rtp排序缓存,根据seq排序 + map _rtp_sort_cache_map; + //回调 + function _cb; +}; + class RtpReceiver { public: RtpReceiver(); virtual ~RtpReceiver(); -protected: +protected: /** * 输入数据指针生成并排序rtp包 * @param track_index track下标索引 @@ -46,6 +163,7 @@ protected: * @param track_index track索引 */ virtual void onRtpSorted(const RtpPacket::Ptr &rtp, int track_index){} + void clear(); void setPoolSize(int size); int getJitterSize(int track_index); @@ -58,16 +176,8 @@ private: uint32_t _ssrc[2] = { 0, 0 }; //ssrc不匹配计数 uint32_t _ssrc_err_count[2] = { 0, 0 }; - //上次seq - uint16_t _last_seq[2] = { 0 , 0 }; - //seq连续次数计数 - uint32_t _seq_ok_count[2] = { 0 , 0}; - //seq回环次数计数 - uint32_t _seq_cycle_count[2] = { 0 , 0}; - //是否开始seq排序 - bool _sort_started[2] = { 0 , 0}; //rtp排序缓存,根据seq排序 - map _rtp_sort_cache_map[2]; + PacketSortor _rtp_sortor[2]; //rtp循环池 RtspMediaSource::PoolType _rtp_pool; };