diff --git a/src/Rtsp/RtpReceiver.h b/src/Rtsp/RtpReceiver.h index 5a2e4da6..fad10684 100644 --- a/src/Rtsp/RtpReceiver.h +++ b/src/Rtsp/RtpReceiver.h @@ -26,6 +26,8 @@ template class PacketSortor { public: static constexpr SEQ SEQ_MAX = (std::numeric_limits::max)(); + using iterator = typename std::map::iterator; + PacketSortor() = default; ~PacketSortor() = default; @@ -36,7 +38,7 @@ public: */ void clear() { _started = false; - _seq_cycle_count = 0; + _ticker.resetTime(); _pkt_sort_cache_map.clear(); } @@ -45,96 +47,129 @@ public: */ size_t getJitterSize() const { return _pkt_sort_cache_map.size(); } - /** - * 获取seq回环次数 - */ - size_t getCycleCount() const { return _seq_cycle_count; } - /** * 输入并排序 * @param seq 序列号 * @param packet 包负载 */ void sortPacket(SEQ seq, T packet) { + _latest_seq = seq; if (!_started) { // 记录第一个seq _started = true; _last_seq_out = seq - 1; } - if (seq == static_cast(_last_seq_out + 1)) { + auto next_seq = static_cast(_last_seq_out + 1); + if (seq == next_seq) { // 收到下一个seq output(seq, std::move(packet)); + // 清空连续包列表 + flushPacket(); return; } - if (seq < _last_seq_out && _last_seq_out != SEQ_MAX && seq < 1024 && _last_seq_out > SEQ_MAX - 1024) { - // seq回环,清空回环前缓存 - flush(); - _last_seq_out = SEQ_MAX; - ++_seq_cycle_count; - sortPacket(seq, std::move(packet)); + if (seq < next_seq && !mayLooped(next_seq, seq)) { + // 无回环风险, 过滤seq回退包 return; } - - if (seq <= _last_seq_out && _last_seq_out != SEQ_MAX) { - // 这个回退包已经不再等待 - return; - } - _pkt_sort_cache_map.emplace(seq, std::move(packet)); - auto it_min = _pkt_sort_cache_map.begin(); - auto it_max = _pkt_sort_cache_map.rbegin(); - if (it_max->first - it_min->first > (SEQ_MAX >> 1)) { - // 回环后,收到回环前的大值seq, 忽略掉 - _pkt_sort_cache_map.erase((++it_max).base()); - return; - } - tryFlushFrontPacket(); - - if (_pkt_sort_cache_map.size() > _max_buffer_size || (_ticker.elapsedTime() > _max_buffer_ms && !_pkt_sort_cache_map.empty())) { - // buffer太长,强行减小 - WarnL << "packet dropped: " << static_cast(_last_seq_out + 1) << " -> " - << static_cast(_pkt_sort_cache_map.begin()->first - 1) - << ", jitter buffer size: " << _pkt_sort_cache_map.size() - << ", jitter buffer ms: " << _ticker.elapsedTime(); - popIterator(_pkt_sort_cache_map.begin()); + if (needForceFlush(seq)) { + forceFlush(next_seq); } } void flush() { - // 清空缓存 - while (!_pkt_sort_cache_map.empty()) { - popIterator(_pkt_sort_cache_map.begin()); + if (!_pkt_sort_cache_map.empty()) { + forceFlush(static_cast(_last_seq_out + 1)); + _pkt_sort_cache_map.clear(); } } + void setParams(size_t max_buffer_size, size_t max_buffer_ms, size_t max_distance) { + _max_buffer_size = max_buffer_size; + _max_buffer_ms = max_buffer_ms; + _max_distance = max_distance; + } + private: - void tryFlushFrontPacket() { - while (!_pkt_sort_cache_map.empty()) { - auto it = _pkt_sort_cache_map.begin(); - auto next_seq = static_cast(_last_seq_out + 1); - if (it->first < next_seq) { - _pkt_sort_cache_map.erase(it); - continue; + SEQ distance(SEQ seq) { + SEQ ret; + auto next_seq = static_cast(_last_seq_out + 1); + if (seq > next_seq) { + ret = seq - next_seq; + } else { + ret = next_seq - seq; + } + if (ret > SEQ_MAX >> 1) { + return SEQ_MAX - ret; + } + return ret; + } + + bool needForceFlush(SEQ seq) { + return !_pkt_sort_cache_map.empty() && (_pkt_sort_cache_map.size() > _max_buffer_size || + distance(seq) > _max_distance || _ticker.elapsedTime() > _max_buffer_ms); + } + + //外部调用代码确保_pkt_sort_cache_map不为空 + void forceFlush(SEQ next_seq) { + // 寻找距离比next_seq大的最近的seq + auto it = _pkt_sort_cache_map.lower_bound(next_seq); + if (it == _pkt_sort_cache_map.end()) { + // 没有比next_seq更大的seq,应该是回环时丢包导致 + it = _pkt_sort_cache_map.begin(); + } + // 丢包无法恢复,把这个包当做next_seq + popIterator(it); + // 清空连续包列表 + flushPacket(); + // 删除距离next_seq太大的包 + for (auto it = _pkt_sort_cache_map.begin(); it != _pkt_sort_cache_map.end();) { + if (distance(it->first) > _max_distance) { + it = _pkt_sort_cache_map.erase(it); + } else { + ++it; } - if (it->first == next_seq) { - // 连续的seq - popIterator(it); + } + } + + bool mayLooped(SEQ last_seq, SEQ now_seq) { return last_seq > SEQ_MAX - _max_distance || now_seq < _max_distance; } + + void flushPacket() { + if (_pkt_sort_cache_map.empty()) { + return; + } + auto next_seq = static_cast(_last_seq_out + 1); + auto it = _pkt_sort_cache_map.lower_bound(next_seq); + if (!mayLooped(next_seq, next_seq)) { + // 无回环风险, 清空 < next_seq的值 + it = _pkt_sort_cache_map.erase(_pkt_sort_cache_map.begin(), it); + } + + while (it != _pkt_sort_cache_map.end()) { + // 找到下一个包 + if (it->first == static_cast(_last_seq_out + 1)) { + it = popIterator(it); continue; } break; } } - void popIterator(typename std::map::iterator it) { - auto seq = it->first; - auto data = std::move(it->second); - _pkt_sort_cache_map.erase(it); - output(seq, std::move(data)); + iterator popIterator(iterator it) { + output(it->first, std::move(it->second)); + return _pkt_sort_cache_map.erase(it); } void output(SEQ seq, T packet) { + auto next_seq = static_cast(_last_seq_out + 1); + if (seq != next_seq) { + WarnL << "packet dropped: " << next_seq << " -> " << static_cast(seq - 1) + << ", latest seq: " << _latest_seq + << ", jitter buffer size: " << _pkt_sort_cache_map.size() + << ", jitter buffer ms: " << _ticker.elapsedTime(); + } _last_seq_out = seq; _cb(seq, std::move(packet)); _ticker.resetTime(); @@ -142,23 +177,25 @@ private: private: bool _started = false; - //排序缓存最大保存数据长度,单位毫秒 - size_t _max_buffer_ms = 3000; - //排序缓存最大保存数据个数 + // 排序缓存最大保存数据长度,单位毫秒 + size_t _max_buffer_ms = 1000; + // 排序缓存最大保存数据个数 size_t _max_buffer_size = 1024; - //记录上次output至今的时间 + // seq最大跳跃距离 + size_t _max_distance = 256; + // 记录上次output至今的时间 toolkit::Ticker _ticker; - //下次应该输出的SEQ + // 最近输入的seq + SEQ _latest_seq = 0; + // 下次应该输出的SEQ SEQ _last_seq_out = 0; - //seq回环次数计数 - size_t _seq_cycle_count = 0; - //pkt排序缓存,根据seq排序 + // pkt排序缓存,根据seq排序 std::map _pkt_sort_cache_map; - //回调 + // 回调 std::function _cb; }; -class RtpTrack : private PacketSortor { +class RtpTrack : public PacketSortor { public: class BadRtpException : public std::invalid_argument { public: @@ -268,11 +305,6 @@ public: return _track[index].getJitterSize(); } - size_t getCycleCount(int index) const { - assert(index < kCount && index >= 0); - return _track[index].getCycleCount(); - } - uint32_t getSSRC(int index) const { assert(index < kCount && index >= 0); return _track[index].getSSRC(); diff --git a/tests/test_sortor.cpp b/tests/test_sortor.cpp index 921f248c..7968a2c9 100644 --- a/tests/test_sortor.cpp +++ b/tests/test_sortor.cpp @@ -111,7 +111,6 @@ void test_real() { } cout << "输入数据个数:" << input_list.size() - << " 回环次数:" << sortor.getCycleCount() << " 抖动缓冲区大小:" << sortor.getJitterSize(); //清空缓存 @@ -206,7 +205,6 @@ void test_rand(){ i += (count + 1); } cout << "输入数据个数:" << input_list.size() - << " 回环次数:" << sortor.getCycleCount() << " 抖动缓冲区大小:" << sortor.getJitterSize() << " 丢包个数:" << drop_list.size() << " 重复包个数:" << repeat_list.size(); diff --git a/webrtc/WebRtcTransport.cpp b/webrtc/WebRtcTransport.cpp index 99f9f55d..3551f00a 100644 --- a/webrtc/WebRtcTransport.cpp +++ b/webrtc/WebRtcTransport.cpp @@ -652,15 +652,14 @@ void WebRtcTransportImp::onRtcConfigure(RtcConfigure &configure) const { /////////////////////////////////////////////////////////////////// -class RtpChannel - : public RtpTrackImp - , public std::enable_shared_from_this { +class RtpChannel : public RtpTrackImp, public std::enable_shared_from_this { public: RtpChannel(EventPoller::Ptr poller, RtpTrackImp::OnSorted cb, function on_nack) { _poller = std::move(poller); _on_nack = std::move(on_nack); setOnSorted(std::move(cb)); - + //设置jitter buffer参数 + RtpTrackImp::setParams(1024, NackContext::kNackMaxMS, 512); _nack_ctx.setOnNack([this](const FCI_NACK &nack) { onNack(nack); }); }