diff --git a/tests/test_rtcp_nack.cpp b/tests/test_rtcp_nack.cpp index f3f8a6c2..864c3b2f 100644 --- a/tests/test_rtcp_nack.cpp +++ b/tests/test_rtcp_nack.cpp @@ -15,20 +15,38 @@ using namespace std; using namespace toolkit; using namespace mediakit; -extern void testFCI(); - int main() { Logger::Instance().add(std::make_shared()); + Logger::Instance().setWriter(std::make_shared()); - srand((unsigned) time(NULL)); - + srand((unsigned)time(NULL)); NackContext ctx; - for (int i = 1; i < 1000; ++i) { - if (i % (1 + (rand() % 30)) == 0) { - DebugL << "drop:" << i; + ctx.setOnNack([](const FCI_NACK &nack){ + InfoL << nack.dumpString(); + }); + auto drop_start = 0; + auto drop_len = 0; + uint16_t offset = 0xFFFF - 200 - 50; + for (int i = 1; i < 10000; ++i) { + if (i % 100 == 0) { + drop_start = i + rand() % 16; + drop_len = 4 + rand() % 16; + InfoL << "start drop:" << (uint16_t)(drop_start + offset) << " -> " + << (uint16_t)(drop_start + offset + drop_len); + } + uint16_t seq = i + offset; + if ((i >= drop_start && i <= drop_start + drop_len) || seq == 65535 || seq == 0 || seq == 1) { + TraceL << "drop:" << (uint16_t)(i + offset); } else { - ctx.received(i); - + static auto last_seq = seq; + if (seq - last_seq > 16) { + ctx.received(last_seq); + ctx.received(seq); + DebugL << "seq reduce:" << last_seq; + last_seq = seq; + } else { + ctx.received(seq); + } } } sleep(1); diff --git a/webrtc/Nack.cpp b/webrtc/Nack.cpp index 63e615b5..72662058 100644 --- a/webrtc/Nack.cpp +++ b/webrtc/Nack.cpp @@ -27,7 +27,7 @@ void NackList::pushBack(RtpPacket::Ptr rtp) { } _cache_ms_check = 0; while (getCacheMS() >= kMaxNackMS) { - //需要清除部分nack缓存 + // 需要清除部分nack缓存 popFront(); } } @@ -36,7 +36,7 @@ void NackList::forEach(const FCI_NACK &nack, const function= front_stamp) { return back_stamp - front_stamp; } - //很有可能回环了 + // 很有可能回环了 return back_stamp + (UINT32_MAX - front_stamp); } return 0; @@ -95,90 +95,122 @@ int64_t NackList::getRtpStamp(uint16_t seq) { //////////////////////////////////////////////////////////////////////////////////////////////// +NackContext::NackContext() { + setOnNack(nullptr); +} + void NackContext::received(uint16_t seq, bool is_rtx) { - if (!_last_max_seq && _seq.empty()) { - _last_max_seq = seq - 1; + if (!_started) { + // 记录第一个seq + _started = true; + _nack_seq = seq - 1; } - if (is_rtx || (seq < _last_max_seq && !(seq < 1024 && _last_max_seq > UINT16_MAX - 1024))) { - //重传包或 - // seq回退,且非回环,那么这个应该是重传包 - onRtx(seq); + + if (seq < _nack_seq && _nack_seq != UINT16_MAX && seq < 1024 && _nack_seq > UINT16_MAX - 1024) { + // seq回环,清空回环前状态 + makeNack(UINT16_MAX, true); + // 等待下一个seq为0的包 + _nack_seq = UINT16_MAX; + _seq.emplace(seq); + return; + } + + if (is_rtx || (seq < _nack_seq && _nack_seq != UINT16_MAX)) { + // seq非回环回退包,猜测其为重传包,清空其nack状态 + clearNackStatus(seq); + return; + } + + auto pr = _seq.emplace(seq); + if (!pr.second) { + // seq重复, 忽略 return; } - _seq.emplace(seq); auto max_seq = *_seq.rbegin(); auto min_seq = *_seq.begin(); auto diff = max_seq - min_seq; - if (!diff) { + if (diff > (UINT16_MAX >> 1)) { + // 回环后,收到回环前的大值seq, 忽略掉 + _seq.erase(max_seq); return; } - - if (diff > UINT16_MAX / 2) { - //回环 + if (min_seq == (uint16_t)(_nack_seq + 1) && _seq.size() == diff + 1) { + // 都是连续的seq,未丢包 _seq.clear(); - _last_max_seq = min_seq; - _nack_send_status.clear(); - return; - } - - if (_seq.size() == (size_t)diff + 1 && _last_max_seq + 1 == min_seq) { - //都是连续的seq,未丢包 - _seq.clear(); - _last_max_seq = max_seq; + _nack_seq = max_seq; } else { // seq不连续,有丢包 - if (min_seq == _last_max_seq + 1) { - //前面部分seq是连续的,未丢包,移除之 - eraseFrontSeq(); - } + makeNack(max_seq, false); + } +} - //有丢包,丢包从_last_max_seq开始 - auto nack_rtp_count = FCI_NACK::kBitSize; - if (max_seq > nack_rtp_count + _last_max_seq) { - vector vec; - vec.resize(FCI_NACK::kBitSize, false); - for (size_t i = 0; i < nack_rtp_count; ++i) { - vec[i] = _seq.find(_last_max_seq + i + 2) == _seq.end(); - } - doNack(FCI_NACK(_last_max_seq + 1, vec), true); - _last_max_seq += nack_rtp_count + 1; - if (_last_max_seq >= max_seq) { - _seq.clear(); - } else { - auto it = _seq.emplace_hint(_seq.begin(), _last_max_seq + 1); - _seq.erase(_seq.begin(), it); - } +void NackContext::makeNack(uint16_t max_seq, bool flush) { + // 尝试移除前面部分连续的seq + eraseFrontSeq(); + + if (max_seq == _nack_seq) { + // 完成所有seq丢包判断 + return; + } + + // 有丢包,丢包从_last_max_seq开始统计丢包 + uint16_t nack_rtp_count; + if (flush) { + nack_rtp_count = max_seq - _nack_seq - 1; + nack_rtp_count = MIN(FCI_NACK::kBitSize, nack_rtp_count); + } else { + nack_rtp_count = FCI_NACK::kBitSize; + } + auto max_nack = 5u; + // 最多生成5个nack包,防止seq大幅跳跃导致一直循环 + while (max_nack-- && max_seq > (uint16_t)(nack_rtp_count + _nack_seq)) { + vector vec; + vec.resize(nack_rtp_count, false); + for (size_t i = 0; i < nack_rtp_count; ++i) { + vec[i] = _seq.find((uint16_t)(_nack_seq + i + 2)) == _seq.end(); + } + doNack(FCI_NACK(_nack_seq + 1, vec), true); + _nack_seq += nack_rtp_count + 1; + if (_nack_seq >= max_seq) { + _seq.clear(); + } else { + // 返回第一个比_last_max_seq大的元素 + auto it = _seq.upper_bound(_nack_seq); + // 移除 <=_last_max_seq 的seq + _seq.erase(_seq.begin(), it); } } } void NackContext::setOnNack(onNack cb) { - _cb = std::move(cb); + if (cb) { + _cb = std::move(cb); + } else { + _cb = [](const FCI_NACK &nack) {}; + } } void NackContext::doNack(const FCI_NACK &nack, bool record_nack) { if (record_nack) { recordNack(nack); } - if (_cb) { - _cb(nack); - } + _cb(nack); } void NackContext::eraseFrontSeq() { - //前面部分seq是连续的,未丢包,移除之 + // 前面部分seq是连续的,未丢包,移除之 for (auto it = _seq.begin(); it != _seq.end();) { - if (*it != _last_max_seq + 1) { + if (*it != (uint16_t)(_nack_seq + 1)) { // seq不连续,丢包了 break; } - _last_max_seq = *it; + _nack_seq = *it; it = _seq.erase(it); } } -void NackContext::onRtx(uint16_t seq) { +void NackContext::clearNackStatus(uint16_t seq) { auto it = _nack_send_status.find(seq); if (it == _nack_send_status.end()) { return; @@ -187,9 +219,8 @@ void NackContext::onRtx(uint16_t seq) { _nack_send_status.erase(it); if (rtt >= 0) { - // rtt不肯小于0 + // rtt不能小于0 _rtt = rtt; - // InfoL << "rtt:" << rtt; } } @@ -205,7 +236,7 @@ void NackContext::recordNack(const FCI_NACK &nack) { } ++i; } - //记录太多了,移除一部分早期的记录 + // 记录太多了,移除一部分早期的记录 while (_nack_send_status.size() > kNackMaxSize) { _nack_send_status.erase(_nack_send_status.begin()); } @@ -216,18 +247,18 @@ uint64_t NackContext::reSendNack() { auto now = getCurrentMillisecond(); for (auto it = _nack_send_status.begin(); it != _nack_send_status.end();) { if (now - it->second.first_stamp > kNackMaxMS) { - //该rtp丢失太久了,不再要求重传 + // 该rtp丢失太久了,不再要求重传 it = _nack_send_status.erase(it); continue; } if (now - it->second.update_stamp < kNackIntervalRatio * _rtt) { - //距离上次nack不足2倍的rtt,不用再发送nack + // 距离上次nack不足2倍的rtt,不用再发送nack ++it; continue; } - //此rtp需要请求重传 + // 此rtp需要请求重传 nack_rtp.emplace(it->first); - //更新nack发送时间戳 + // 更新nack发送时间戳 it->second.update_stamp = now; if (++(it->second.nack_count) == kNackMaxCount) { // nack次数太多,移除之 @@ -238,7 +269,7 @@ uint64_t NackContext::reSendNack() { } if (_nack_send_status.empty()) { - //不需要再发送nack + // 不需要再发送nack return 0; } @@ -253,12 +284,12 @@ uint64_t NackContext::reSendNack() { } auto inc = *it - pid; if (inc > (ssize_t)FCI_NACK::kBitSize) { - //新的nack包 + // 新的nack包 doNack(FCI_NACK(pid, vec), false); pid = -1; continue; } - //这个包丢了 + // 这个包丢了 vec[inc - 1] = true; ++it; } @@ -266,7 +297,7 @@ uint64_t NackContext::reSendNack() { doNack(FCI_NACK(pid, vec), false); } - //重传间隔不得低于5ms + // 重传间隔不得低于5ms return max(_rtt, 5); } diff --git a/webrtc/Nack.h b/webrtc/Nack.h index 2b38e6a3..0867823e 100644 --- a/webrtc/Nack.h +++ b/webrtc/Nack.h @@ -53,7 +53,7 @@ public: // nack重传频率,rtt的倍数 static constexpr auto kNackIntervalRatio = 1.0f; - NackContext() = default; + NackContext(); ~NackContext() = default; void received(uint16_t seq, bool is_rtx = false); @@ -64,13 +64,16 @@ private: void eraseFrontSeq(); void doNack(const FCI_NACK &nack, bool record_nack); void recordNack(const FCI_NACK &nack); - void onRtx(uint16_t seq); + void clearNackStatus(uint16_t seq); + void makeNack(uint16_t max, bool flush = false); private: + bool _started = false; int _rtt = 50; onNack _cb; std::set _seq; - uint16_t _last_max_seq = 0; + // 最新nack包中的rtp seq值 + uint16_t _nack_seq = 0; struct NackStatus { uint64_t first_stamp;