diff --git a/srt/PacketQueue.cpp b/srt/PacketQueue.cpp index cb43bd13..87f078d6 100644 --- a/srt/PacketQueue.cpp +++ b/srt/PacketQueue.cpp @@ -9,6 +9,21 @@ static inline bool isSeqEdge(uint32_t seq, uint32_t cap) { return false; } +static inline bool isSeqCycle(uint32_t first, uint32_t second) { + uint32_t diff; + if (first > second) { + diff = first - second; + } else { + diff = second - first; + } + + if (diff > (MAX_SEQ >> 1)) { + return true; + } else { + return false; + } +} + static inline bool isTSCycle(uint32_t first, uint32_t second) { uint32_t diff; if (first > second) { @@ -220,4 +235,283 @@ std::string PacketQueue::dump() { return std::move(printer); } +//////////////////// PacketRecvQueue ////////////////////////////////// + +PacketRecvQueue::PacketRecvQueue(uint32_t max_size, uint32_t init_seq, uint32_t latency) + : _pkt_cap(max_size) + , _pkt_latency(latency) + , _pkt_expected_seq(init_seq) + , _pkt_buf(max_size) {} +bool PacketRecvQueue::inputPacket(DataPacket::Ptr pkt, std::list &out) { + while (_size > 0 && _start == _end) { + if (_pkt_buf[_start]) { + out.push_back(_pkt_buf[_start]); + _size--; + _pkt_buf[_start] = nullptr; + } + _start = (_start + 1) % _pkt_cap; + _pkt_expected_seq = genExpectedSeq(_pkt_expected_seq + 1); + } + + tryInsertPkt(pkt); + + DataPacket::Ptr it = _pkt_buf[_start]; + while (it) { + out.push_back(it); + _size--; + _pkt_buf[_start] = nullptr; + _pkt_expected_seq = genExpectedSeq(_pkt_expected_seq + 1); + _start = (_start + 1) % _pkt_cap; + it = _pkt_buf[_start]; + } + while (timeLatency() > _pkt_latency) { + it = _pkt_buf[_start]; + if (it) { + _pkt_buf[_start] = nullptr; + out.push_back(it); + _size--; + } + _pkt_expected_seq = genExpectedSeq(_pkt_expected_seq + 1); + _start = (_start + 1) % _pkt_cap; + } + return true; +} + +uint32_t PacketRecvQueue::timeLatency() { + if (_size <= 0) { + return 0; + } + + auto first = getFirst()->timestamp; + auto last = getLast()->timestamp; + + uint32_t dur; + if (last > first) { + dur = last - first; + } else { + dur = first - last; + } + + if (dur > 0x80000000) { + dur = MAX_TS - dur; + WarnL << "cycle dur " << dur; + } + + return dur; +} +std::list PacketRecvQueue::getLostSeq() { + std::list re; + if (_size <= 0) { + return re; + } + + if (getExpectedSize() == getSize()) { + return re; + } + + LostPair lost; + uint32_t steup = 0; + bool finish = true; + + for (uint32_t i = _start; i != _end;) { + if (!_pkt_buf[i]) { + if (finish) { + finish = false; + lost.first = _pkt_expected_seq + steup; + lost.second = genExpectedSeq(lost.first + 1); + } else { + lost.second = genExpectedSeq(_pkt_expected_seq + steup + 1); + } + } else { + if (!finish) { + finish = true; + re.push_back(lost); + } + } + i = (i + 1) % _pkt_cap; + steup++; + } + return re; +} + +size_t PacketRecvQueue::getSize() { + return _size; +} +size_t PacketRecvQueue::getExpectedSize() { + if (_size <= 0) { + return 0; + } + + uint32_t max, min; + auto first = _pkt_expected_seq; + auto last = getLast()->packet_seq_number; + if (last >= first) { + max = last; + min = first; + } else { + max = first; + min = last; + } + if ((max - min) >= (MAX_SEQ >> 1)) { + TraceL << "cycle " + << "expected seq " << _pkt_expected_seq << " min " << min << " max " << max << " size " << _size; + return MAX_SEQ - _pkt_expected_seq + min + 1; + } else { + return max - _pkt_expected_seq + 1; + } +} +size_t PacketRecvQueue::getAvailableBufferSize() { + auto size = getExpectedSize(); + if (_pkt_cap > size) { + return _pkt_cap - size; + } + + if (_pkt_cap > _size) { + return _pkt_cap - _size; + } + WarnL << " cap " << _pkt_cap << " expected size " << size << " map size " << _size; + return _pkt_cap; +} +uint32_t PacketRecvQueue::getExpectedSeq() { + return _pkt_expected_seq; +} + +std::string PacketRecvQueue::dump() { + _StrPrinter printer; + if (_size <= 0) { + printer << " expected seq :" << _pkt_expected_seq; + } else { + printer << " expected seq :" << _pkt_expected_seq << " size:" << _size + << " first:" << getFirst()->packet_seq_number; + printer << " last:" << getLast()->packet_seq_number; + printer << " latency:" << timeLatency() / 1e3; + } + return std::move(printer); +} +bool PacketRecvQueue::drop(uint32_t first, uint32_t last, std::list &out) { + uint32_t diff = 0; + if (isSeqCycle(_pkt_expected_seq, last)) { + if (last < _pkt_expected_seq) { + diff = MAX_SEQ - _pkt_expected_seq + last + 1; + } else { + WarnL << "drop first " << first << " last " << last << " expected " << _pkt_expected_seq; + return false; + } + } else { + if (last < _pkt_expected_seq) { + WarnL << "drop first " << first << " last " << last << " expected " << _pkt_expected_seq; + return false; + } + diff = last - _pkt_expected_seq + 1; + } + + if (diff > getExpectedSize()) { + WarnL << " diff " << diff << " expected size " << getExpectedSize(); + return false; + } + + for (uint32_t i = 0; i < diff; i++) { + auto pos = (i + _start) % _pkt_cap; + if (_pkt_buf[pos]) { + out.push_back(_pkt_buf[pos]); + _pkt_buf[pos] = nullptr; + _size--; + } + } + + _pkt_expected_seq = genExpectedSeq(last + 1); + _start = (diff + _start) % _pkt_cap; + if (_size <= 0) { + _end = _start; + WarnL; + } + return true; +} + +void PacketRecvQueue::insertToCycleBuf(DataPacket::Ptr pkt, uint32_t diff) { + auto pos = (_start + diff) % _pkt_cap; + + if (!_pkt_buf[pos]) { + _size++; + } else { + // WarnL << "repate packet " << pkt->packet_seq_number; + return; + } + _pkt_buf[pos] = pkt; + if (pos >= _end && (_start <= _end || pos < _start)) { + _end = (pos + 1) % _pkt_cap; + } +} +void PacketRecvQueue::tryInsertPkt(DataPacket::Ptr pkt) { + if (_pkt_expected_seq <= pkt->packet_seq_number) { + auto diff = pkt->packet_seq_number - _pkt_expected_seq; + if (diff >= (MAX_SEQ >> 1)) { + TraceL << "drop packet too later for cycle " + << "expected seq=" << _pkt_expected_seq << " pkt seq=" << pkt->packet_seq_number; + return; + } else { + if (diff >= _pkt_cap) { + WarnL << "too new " + << "expected seq=" << _pkt_expected_seq << " pkt seq=" << pkt->packet_seq_number << " cap " + << _pkt_cap; + return; + } + + insertToCycleBuf(pkt, diff); + } + } else { + auto diff = _pkt_expected_seq - pkt->packet_seq_number; + if (diff >= (MAX_SEQ >> 1)) { + diff = MAX_SEQ - diff; + if (diff >= _pkt_cap) { + WarnL << "too new " + << "expected seq=" << _pkt_expected_seq << " pkt seq=" << pkt->packet_seq_number << " cap " + << _pkt_cap; + return; + } + + insertToCycleBuf(pkt, diff); + + TraceL << " cycle packet " + << "expected seq=" << _pkt_expected_seq << " pkt seq=" << pkt->packet_seq_number; + } else { + // TraceL << "drop packet too later " + //<< "expected seq=" << _pkt_expected_seq << " pkt seq=" << pkt->packet_seq_number; + } + } +} +DataPacket::Ptr PacketRecvQueue::getFirst() { + if (_size <= 0) { + return nullptr; + } + + uint32_t i = _start; + while (1) { + if (_pkt_buf[i]) { + return _pkt_buf[i]; + } + i = (i + 1) % _pkt_cap; + } +} +DataPacket::Ptr PacketRecvQueue::getLast() { + if (_size <= 0) { + return nullptr; + } + uint32_t steup = 1; + uint32_t i = (_end + _pkt_cap - steup) % _pkt_cap; + /* + while (1) { + if (_pkt_buf[i]) { + _end = (i + 1) % _pkt_cap; + return _pkt_buf[i]; + } + i = (_end + _pkt_cap - steup) % _pkt_cap; + steup++; + } + */ + if (!_pkt_buf[i]) { + WarnL << "start " << _start << " end" << _end << " size " << _size; + } + return _pkt_buf[i]; +} } // namespace SRT \ No newline at end of file diff --git a/srt/PacketQueue.hpp b/srt/PacketQueue.hpp index 58d5f18a..d3e224c6 100644 --- a/srt/PacketQueue.hpp +++ b/srt/PacketQueue.hpp @@ -7,14 +7,34 @@ #include #include #include +#include namespace SRT { +class PacketQueueInterface { +public: + using Ptr = std::shared_ptr; + using LostPair = std::pair; + + PacketQueueInterface() = default; + virtual ~PacketQueueInterface() = default; + virtual bool inputPacket(DataPacket::Ptr pkt, std::list &out) = 0; + + virtual uint32_t timeLatency() = 0; + virtual std::list getLostSeq() = 0; + + virtual size_t getSize() = 0; + virtual size_t getExpectedSize() = 0; + virtual size_t getAvailableBufferSize() = 0; + virtual uint32_t getExpectedSeq() = 0; + + virtual std::string dump() = 0; + virtual bool drop(uint32_t first, uint32_t last, std::list &out) = 0; +}; // for recv -class PacketQueue { +class PacketQueue : public PacketQueueInterface { public: using Ptr = std::shared_ptr; - using LostPair = std::pair; PacketQueue(uint32_t max_size, uint32_t init_seq, uint32_t latency); ~PacketQueue() = default; @@ -37,10 +57,46 @@ private: private: uint32_t _pkt_cap; uint32_t _pkt_latency; - uint32_t _pkt_expected_seq = 0; + uint32_t _pkt_expected_seq; std::map _pkt_map; }; +class PacketRecvQueue : public PacketQueueInterface { +public: + using Ptr = std::shared_ptr; + + PacketRecvQueue(uint32_t max_size, uint32_t init_seq, uint32_t latency); + ~PacketRecvQueue() = default; + bool inputPacket(DataPacket::Ptr pkt, std::list &out); + + uint32_t timeLatency(); + std::list getLostSeq(); + + size_t getSize(); + size_t getExpectedSize(); + size_t getAvailableBufferSize(); + uint32_t getExpectedSeq(); + + std::string dump(); + bool drop(uint32_t first, uint32_t last, std::list &out); + +private: + void tryInsertPkt(DataPacket::Ptr pkt); + void insertToCycleBuf(DataPacket::Ptr pkt, uint32_t diff); + DataPacket::Ptr getFirst(); + DataPacket::Ptr getLast(); + +private: + uint32_t _pkt_cap; + uint32_t _pkt_latency; + uint32_t _pkt_expected_seq; + + std::vector _pkt_buf; + uint32_t _start = 0; + uint32_t _end = 0; + size_t _size = 0; +}; + } // namespace SRT #endif // ZLMEDIAKIT_SRT_PACKET_QUEUE_H \ No newline at end of file diff --git a/srt/SrtTransport.cpp b/srt/SrtTransport.cpp index 8089c168..ba90b8ab 100644 --- a/srt/SrtTransport.cpp +++ b/srt/SrtTransport.cpp @@ -213,7 +213,7 @@ void SrtTransport::handleHandshakeConclusion(HandshakePacket &pkt, struct sockad sendControlPacket(res, true); TraceL << " buf size = " << res->max_flow_window_size << " init seq =" << _init_seq_number << " latency=" << delay; - _recv_buf = std::make_shared(getPktBufSize(), _init_seq_number, delay * 1e3); + _recv_buf = std::make_shared(getPktBufSize(), _init_seq_number, delay * 1e3); _send_buf = std::make_shared(getPktBufSize(), delay * 1e3); _send_packet_seq_number = _init_seq_number; _buf_delay = delay; diff --git a/srt/SrtTransport.hpp b/srt/SrtTransport.hpp index eb7180fc..2fbd8064 100644 --- a/srt/SrtTransport.hpp +++ b/srt/SrtTransport.hpp @@ -120,7 +120,7 @@ private: PacketSendQueue::Ptr _send_buf; uint32_t _buf_delay = 120; - PacketQueue::Ptr _recv_buf; + PacketQueueInterface::Ptr _recv_buf; // NackContext _recv_nack; uint32_t _rtt = 100 * 1000; uint32_t _rtt_variance = 50 * 1000;