From 7606dd7110c729dd4ef70d5d7b755a0df963c760 Mon Sep 17 00:00:00 2001 From: xiongguangjie Date: Fri, 3 Jun 2022 20:18:34 +0800 Subject: [PATCH] for srt push fix ack paramter error result in pkt lost --- srt/Ack.cpp | 9 +++++ srt/Ack.hpp | 2 +- srt/Common.hpp | 36 ++++++++++++++++++++ srt/SrtTransport.cpp | 79 ++++++++++++++++++++++++-------------------- srt/SrtTransport.hpp | 13 ++++---- srt/Statistic.cpp | 51 ++++++++++++++++++---------- srt/Statistic.hpp | 21 +++++++----- 7 files changed, 142 insertions(+), 69 deletions(-) diff --git a/srt/Ack.cpp b/srt/Ack.cpp index 7a61971e..7be5a08d 100644 --- a/srt/Ack.cpp +++ b/srt/Ack.cpp @@ -71,4 +71,13 @@ bool ACKPacket::storeToData() { return true; } + +std::string ACKPacket::dump(){ + _StrPrinter printer; + printer << "last_ack_pkt_seq_number="<(); res->dst_socket_id = _peer_socket_id; - res->timestamp = DurationCountMicroseconds(SteadyClock::now() - _start_timestamp); + res->timestamp = DurationCountMicroseconds(_now - _start_timestamp); res->mtu = _mtu; res->max_flow_window_size = _max_window_size; res->initial_packet_sequence_number = _init_seq_number; @@ -194,6 +197,7 @@ void SrtTransport::handleHandshakeConclusion(HandshakePacket &pkt, struct sockad TraceL << getIdentifier() << " CONCLUSION handle repeate "; sendControlPacket(_handleshake_res, true); } + _last_ack_pkt_seq_num = _init_seq_number; } void SrtTransport::handleHandshake(uint8_t *buf, int len, struct sockaddr_storage *addr){ HandshakePacket pkt; @@ -206,21 +210,29 @@ void SrtTransport::handleHandshake(uint8_t *buf, int len, struct sockaddr_storag }else{ WarnL<<" not support handshake type = "<< pkt.handshake_type; } - _ack_ticker.resetTime(); - _nak_ticker.resetTime(); + _ack_ticker.resetTime(_now); + _nak_ticker.resetTime(_now); } void SrtTransport::handleKeeplive(uint8_t *buf, int len, struct sockaddr_storage *addr){ TraceL; + sendKeepLivePacket(); } + + void SrtTransport::sendKeepLivePacket(){ + KeepLivePacket::Ptr pkt = std::make_shared(); + pkt->dst_socket_id = _peer_socket_id; + pkt->timestamp = DurationCountMicroseconds(_now -_start_timestamp); + pkt->storeToData(); + sendControlPacket(pkt,true); + } void SrtTransport::handleACK(uint8_t *buf, int len, struct sockaddr_storage *addr){ TraceL; - auto now = SteadyClock::now(); ACKPacket ack; ack.loadFromData(buf,len); ACKACKPacket::Ptr pkt = std::make_shared(); pkt->dst_socket_id = _peer_socket_id; - pkt->timestamp = DurationCountMicroseconds(now -_start_timestamp); + pkt->timestamp = DurationCountMicroseconds(_now -_start_timestamp); pkt->ack_number = ack.ack_number; pkt->storeToData(); sendControlPacket(pkt,true); @@ -247,13 +259,12 @@ void SrtTransport::handleUserDefinedType(uint8_t *buf, int len, struct sockaddr_ void SrtTransport::handleACKACK(uint8_t *buf, int len, struct sockaddr_storage *addr){ //TraceL; - auto now = SteadyClock::now(); ACKACKPacket::Ptr pkt = std::make_shared(); pkt->loadFromData(buf,len); - uint32_t rtt = DurationCountMicroseconds(now - _ack_send_timestamp[pkt->ack_number]); - _rtt_variance = 3*_rtt_variance/4+abs(_rtt - rtt); - _rtt = 7*rtt/8+_rtt/8; + uint32_t rtt = DurationCountMicroseconds(_now - _ack_send_timestamp[pkt->ack_number]); + _rtt_variance = (3*_rtt_variance+abs(_rtt - rtt))/4; + _rtt = (7*rtt+_rtt)/8; _ack_send_timestamp.erase(pkt->ack_number); } @@ -268,31 +279,30 @@ void SrtTransport::sendACKPacket() { } ACKPacket::Ptr pkt=std::make_shared(); - auto now = SteadyClock::now(); pkt->dst_socket_id = _peer_socket_id; - pkt->timestamp = DurationCountMicroseconds(now - _start_timestamp); + pkt->timestamp = DurationCountMicroseconds(_now - _start_timestamp); pkt->ack_number = ++_ack_number_count; pkt->last_ack_pkt_seq_number = _recv_buf->getExpectedSeq(); pkt->rtt = _rtt; pkt->rtt_variance = _rtt_variance; pkt->available_buf_size = _recv_buf->getAvailableBufferSize(); - pkt->pkt_recv_rate = _pkt_recv_rate_context.getPacketRecvRate(); - pkt->estimated_link_capacity = _estimated_link_capacity_context.getEstimatedLinkCapacity(); - pkt->recv_rate = _recv_rate_context.getRecvRate(); + pkt->pkt_recv_rate = _pkt_recv_rate_context->getPacketRecvRate(); + pkt->estimated_link_capacity = _estimated_link_capacity_context->getEstimatedLinkCapacity(); + pkt->recv_rate = _recv_rate_context->getRecvRate(); pkt->storeToData(); - _ack_send_timestamp[pkt->ack_number] = now; + _ack_send_timestamp[pkt->ack_number] = _now; _last_ack_pkt_seq_num = pkt->last_ack_pkt_seq_number; sendControlPacket(pkt,true); - TraceL<<"send ack"; + //TraceL<<"send ack "<dump(); } void SrtTransport::sendLightACKPacket() { if(_last_ack_pkt_seq_num == _recv_buf->getExpectedSeq()){ return; } ACKPacket::Ptr pkt=std::make_shared(); - auto now = SteadyClock::now(); + pkt->dst_socket_id = _peer_socket_id; - pkt->timestamp = DurationCountMicroseconds(now - _start_timestamp); + pkt->timestamp = DurationCountMicroseconds(_now - _start_timestamp); pkt->ack_number = 0; pkt->last_ack_pkt_seq_number = _recv_buf->getExpectedSeq(); pkt->rtt = 0; @@ -304,15 +314,14 @@ void SrtTransport::sendLightACKPacket() { pkt->storeToData(); _last_ack_pkt_seq_num = pkt->last_ack_pkt_seq_number; sendControlPacket(pkt,true); - TraceL<<"send light ack"; + TraceL<<"send ack "<dump(); } void SrtTransport::sendNAKPacket(std::list& lost_list){ NAKPacket::Ptr pkt = std::make_shared(); - auto now = SteadyClock::now(); pkt->dst_socket_id = _peer_socket_id; - pkt->timestamp = DurationCountMicroseconds(now - _start_timestamp); + pkt->timestamp = DurationCountMicroseconds(_now - _start_timestamp); pkt->lost_list = lost_list; pkt->storeToData(); @@ -325,7 +334,7 @@ void SrtTransport::handleDataPacket(uint8_t *buf, int len, struct sockaddr_stora pkt->loadFromData(buf,len); //TraceL<<" seq="<< pkt->packet_seq_number<<" ts="<timestamp<<" size="<payloadSize()<<\ - " PP="<<(int)pkt->PP<<" O="<<(int)pkt->O<<" kK="<<(int)pkt->KK<<" R="<<(int)pkt->R; + //" PP="<<(int)pkt->PP<<" O="<<(int)pkt->O<<" kK="<<(int)pkt->KK<<" R="<<(int)pkt->R; #if 1 _recv_buf->inputPacket(pkt); #else @@ -344,19 +353,19 @@ void SrtTransport::handleDataPacket(uint8_t *buf, int len, struct sockaddr_stora onSRTData(std::move(data),addr); } - auto nak_interval = (_rtt+_rtt_variance*4)/2/1000; - if(_nak_ticker.elapsedTime()>20 && _nak_ticker.elapsedTime()>nak_interval){ + auto nak_interval = (_rtt+_rtt_variance*4)/2; + if(_nak_ticker.elapsedTime(_now)>20*1000 && _nak_ticker.elapsedTime(_now)>nak_interval){ auto lost = _recv_buf->getLostSeq(); if(!lost.empty()){ sendNAKPacket(lost); //TraceL<<"send NAK"; } - _nak_ticker.resetTime(); + _nak_ticker.resetTime(_now); } - if(_ack_ticker.elapsedTime()>=10){ + if(_ack_ticker.elapsedTime(_now)>10*1000){ _light_ack_pkt_count = 0; - _ack_ticker.resetTime(); + _ack_ticker.resetTime(_now); // send a ack per 10 ms for receiver sendACKPacket(); }else{ diff --git a/srt/SrtTransport.hpp b/srt/SrtTransport.hpp index 6e9f4b7d..6d2b9e48 100644 --- a/srt/SrtTransport.hpp +++ b/srt/SrtTransport.hpp @@ -8,7 +8,6 @@ #include "Network/Session.h" #include "Poller/EventPoller.h" -#include "Util/TimeTicker.h" #include "Common.hpp" #include "Packet.hpp" @@ -72,6 +71,7 @@ private: void sendNAKPacket(std::list& lost_list); void sendACKPacket(); void sendLightACKPacket(); + void sendKeepLivePacket(); protected: void sendDataPacket(DataPacket::Ptr pkt,char* buf,int len,bool flush = false); void sendControlPacket(ControlPacket::Ptr pkt,bool flush = true); @@ -87,6 +87,7 @@ private: uint32_t _peer_socket_id; uint32_t _socket_id = 0; + TimePoint _now; TimePoint _start_timestamp; uint32_t _mtu = 1500; @@ -102,14 +103,14 @@ private: uint32_t _light_ack_pkt_count = 0; uint32_t _ack_number_count = 0; uint32_t _last_ack_pkt_seq_num = 0; - Ticker _ack_ticker; + UTicker _ack_ticker; std::map _ack_send_timestamp; - PacketRecvRateContext _pkt_recv_rate_context; - EstimatedLinkCapacityContext _estimated_link_capacity_context; - RecvRateContext _recv_rate_context; + std::shared_ptr _pkt_recv_rate_context; + std::shared_ptr _estimated_link_capacity_context; + std::shared_ptr _recv_rate_context; - Ticker _nak_ticker; + UTicker _nak_ticker; //保持发送的握手消息,防止丢失重发 HandshakePacket::Ptr _handleshake_res; diff --git a/srt/Statistic.cpp b/srt/Statistic.cpp index c836b56e..9fc13cd5 100644 --- a/srt/Statistic.cpp +++ b/srt/Statistic.cpp @@ -2,33 +2,47 @@ #include "Statistic.hpp" namespace SRT { -void PacketRecvRateContext::inputPacket(TimePoint ts) { +void PacketRecvRateContext::inputPacket(TimePoint& ts) { if(_pkt_map.size()>100){ _pkt_map.erase(_pkt_map.begin()); } - _pkt_map.emplace(ts,ts); + auto tmp = DurationCountMicroseconds(ts - _start); + _pkt_map.emplace(tmp,tmp); } uint32_t PacketRecvRateContext::getPacketRecvRate() { - if(_pkt_map.size()<2){ - return 0; + if (_pkt_map.size() < 2) { + return 50000; + } + int64_t dur = 1000; + for (auto it = _pkt_map.begin(); it != _pkt_map.end(); ++it) { + auto next = it; + ++next; + if (next != _pkt_map.end()) { + if ((next->first - it->first) < dur) { + dur = next->first - it->first; + } + } else { + break; + } } - auto first = _pkt_map.begin(); - auto last = _pkt_map.rbegin(); - double dur = DurationCountMicroseconds(last->first - first->first)/1000000.0; - double rate = _pkt_map.size()/dur; - return (uint32_t)rate; + double rate = 1e6 / (double)dur; + if(rate <=1000){ + return 50000; + } + return rate; } -void EstimatedLinkCapacityContext::inputPacket(TimePoint ts) { - if(_pkt_map.size()>16){ +void EstimatedLinkCapacityContext::inputPacket(TimePoint& ts) { + if (_pkt_map.size() > 16) { _pkt_map.erase(_pkt_map.begin()); } - _pkt_map.emplace(ts,ts); + auto tmp = DurationCountMicroseconds(ts - _start); + _pkt_map.emplace(tmp, tmp); } uint32_t EstimatedLinkCapacityContext::getEstimatedLinkCapacity() { decltype(_pkt_map.begin()) next; - std::vector tmp; + std::vector tmp; for(auto it = _pkt_map.begin();it != _pkt_map.end();++it){ next = it; @@ -47,19 +61,20 @@ uint32_t EstimatedLinkCapacityContext::getEstimatedLinkCapacity() { if(tmp.size()<16){ return 1000; } - return 1000; - double dur =DurationCountMicroseconds(tmp[tmp.size()/2])/1e6; + double dur =tmp[0]/1e6; return (uint32_t)(1.0/dur); } -void RecvRateContext::inputPacket(TimePoint ts, size_t size ) { +void RecvRateContext::inputPacket(TimePoint& ts, size_t size ) { if (_pkt_map.size() > 100) { _pkt_map.erase(_pkt_map.begin()); } - _pkt_map.emplace(ts, size); + auto tmp = DurationCountMicroseconds(ts - _start); + + _pkt_map.emplace(tmp, tmp); } uint32_t RecvRateContext::getRecvRate() { if(_pkt_map.size()<2){ @@ -68,7 +83,7 @@ uint32_t RecvRateContext::getRecvRate() { auto first = _pkt_map.begin(); auto last = _pkt_map.rbegin(); - double dur = DurationCountMicroseconds(last->first - first->first)/1000000.0; + double dur = (last->first - first->first)/1000000.0; size_t bytes = 0; for(auto it : _pkt_map){ diff --git a/srt/Statistic.hpp b/srt/Statistic.hpp index 283d4546..4524aebe 100644 --- a/srt/Statistic.hpp +++ b/srt/Statistic.hpp @@ -8,33 +8,36 @@ namespace SRT { class PacketRecvRateContext { public: - PacketRecvRateContext() = default; + PacketRecvRateContext(TimePoint start):_start(start){}; ~PacketRecvRateContext() = default; - void inputPacket(TimePoint ts); + void inputPacket(TimePoint& ts); uint32_t getPacketRecvRate(); private: - std::map _pkt_map; + std::map _pkt_map; + TimePoint _start; }; class EstimatedLinkCapacityContext { public: - EstimatedLinkCapacityContext() = default; + EstimatedLinkCapacityContext(TimePoint start):_start(start){}; ~EstimatedLinkCapacityContext() = default; - void inputPacket(TimePoint ts); + void inputPacket(TimePoint& ts); uint32_t getEstimatedLinkCapacity(); private: - std::map _pkt_map; + std::map _pkt_map; + TimePoint _start; }; class RecvRateContext { public: - RecvRateContext() = default; + RecvRateContext(TimePoint start):_start(start){}; ~RecvRateContext() = default; - void inputPacket(TimePoint ts,size_t size); + void inputPacket(TimePoint& ts,size_t size); uint32_t getRecvRate(); private: - std::map _pkt_map; + std::map _pkt_map; + TimePoint _start; };