diff --git a/src/Rtp/RtpProcess.h b/src/Rtp/RtpProcess.h index 0c55e375..1f53451b 100644 --- a/src/Rtp/RtpProcess.h +++ b/src/Rtp/RtpProcess.h @@ -27,9 +27,24 @@ public: typedef std::shared_ptr Ptr; RtpProcess(const string &stream_id); ~RtpProcess(); + + /** + * 输入rtp + * @param sock 本地监听的socket + * @param data rtp数据指针 + * @param data_len rtp数据长度 + * @param addr 数据源地址 + * @param dts_out 解析出最新的dts + * @return 是否解析成功 + */ bool inputRtp(const Socket::Ptr &sock, const char *data,int data_len, const struct sockaddr *addr , uint32_t *dts_out = nullptr); + + /** + * 是否超时,用于超时移除对象 + */ bool alive(); + /// SockInfo override string get_local_ip() override; uint16_t get_local_port() override; string get_peer_ip() override; diff --git a/src/Rtp/RtpSelector.cpp b/src/Rtp/RtpSelector.cpp index 36d8f8bc..06979972 100644 --- a/src/Rtp/RtpSelector.cpp +++ b/src/Rtp/RtpSelector.cpp @@ -15,20 +15,17 @@ namespace mediakit{ INSTANCE_IMP(RtpSelector); -bool RtpSelector::inputRtp(const Socket::Ptr &sock, string &stream_id, const char *data, int data_len, +bool RtpSelector::inputRtp(const Socket::Ptr &sock, const char *data, int data_len, const struct sockaddr *addr,uint32_t *dts_out) { - if (stream_id.empty()) { - //未指定流id,那么使用ssrc为流id - uint32_t ssrc = 0; - if (!getSSRC(data, data_len, ssrc)) { - WarnL << "get ssrc from rtp failed:" << data_len; - return false; - } - stream_id = printSSRC(ssrc); + //使用ssrc为流id + uint32_t ssrc = 0; + if (!getSSRC(data, data_len, ssrc)) { + WarnL << "get ssrc from rtp failed:" << data_len; + return false; } //假定指定了流id,那么通过流id来区分是否为一路流(哪怕可能同时收到多路流) - auto process = getProcess(stream_id, true); + auto process = getProcess(printSSRC(ssrc), true); if (process) { return process->inputRtp(sock, data, data_len, addr, dts_out); } diff --git a/src/Rtp/RtpSelector.h b/src/Rtp/RtpSelector.h index adb6f1a5..83f8bd2e 100644 --- a/src/Rtp/RtpSelector.h +++ b/src/Rtp/RtpSelector.h @@ -49,10 +49,31 @@ public: static bool getSSRC(const char *data,int data_len, uint32_t &ssrc); static RtpSelector &Instance(); - bool inputRtp(const Socket::Ptr &sock, string &stream_id, const char *data, int data_len, + /** + * 输入多个rtp流,根据ssrc分流 + * @param sock 本地socket + * @param data 收到的数据 + * @param data_len 收到的数据长度 + * @param addr rtp流源地址 + * @param dts_out 解析出最新的dts + * @return 是否成功 + */ + bool inputRtp(const Socket::Ptr &sock, const char *data, int data_len, const struct sockaddr *addr, uint32_t *dts_out = nullptr); + /** + * 获取一个rtp处理器 + * @param stream_id 流id + * @param makeNew 不存在时是否新建 + * @return rtp处理器 + */ RtpProcess::Ptr getProcess(const string &stream_id, bool makeNew); + + /** + * 删除rtp处理器 + * @param stream_id 流id + * @param ptr rtp处理器指针 + */ void delProcess(const string &stream_id, const RtpProcess *ptr); private: diff --git a/src/Rtp/RtpServer.cpp b/src/Rtp/RtpServer.cpp index 466b6a0c..fd006c50 100644 --- a/src/Rtp/RtpServer.cpp +++ b/src/Rtp/RtpServer.cpp @@ -17,25 +17,20 @@ RtpServer::RtpServer() { } RtpServer::~RtpServer() { - if(_udp_server){ - _udp_server->setOnRead(nullptr); + if(_on_clearup){ + _on_clearup(); } } void RtpServer::start(uint16_t local_port, const string &stream_id, bool enable_tcp, const char *local_ip) { _udp_server.reset(new Socket(nullptr, false)); - auto &ref = RtpSelector::Instance(); - auto sock = _udp_server; - _udp_server->setOnRead([&ref, sock, stream_id](const Buffer::Ptr &buf, struct sockaddr *addr, int) { - ref.inputRtp(sock, const_cast(stream_id), buf->data(), buf->size(), addr); - }); - //创建udp服务器 if (!_udp_server->bindUdpSock(local_port, local_ip)) { _udp_server = nullptr; string err = (StrPrinter << "bindUdpSock on " << local_ip << ":" << local_port << " failed:" << get_uv_errmsg(true)); throw std::runtime_error(err); } + //设置udp socket读缓存 SockUtil::setRecvBuf(_udp_server->rawFD(), 4 * 1024 * 1024); @@ -51,6 +46,31 @@ void RtpServer::start(uint16_t local_port, const string &stream_id, bool enable throw; } } + + auto sock = _udp_server; + RtpProcess::Ptr process; + if (!stream_id.empty()) { + //指定了流id,那么一个端口一个流(不管是否包含多个ssrc的多个流,绑定rtp源后,会筛选掉ip端口不匹配的流) + process = RtpSelector::Instance().getProcess(stream_id, true); + _udp_server->setOnRead([sock, process](const Buffer::Ptr &buf, struct sockaddr *addr, int) { + process->inputRtp(sock, buf->data(), buf->size(), addr); + }); + } else { + //未指定流id,一个端口多个流,通过ssrc来分流 + auto &ref = RtpSelector::Instance(); + _udp_server->setOnRead([&ref, sock](const Buffer::Ptr &buf, struct sockaddr *addr, int) { + ref.inputRtp(sock, buf->data(), buf->size(), addr); + }); + } + + _on_clearup = [sock, process, stream_id]() { + //去除循环引用 + sock->setOnRead(nullptr); + if (process) { + //删除rtp处理器 + RtpSelector::Instance().delProcess(stream_id, process.get()); + } + }; } EventPoller::Ptr RtpServer::getPoller() { diff --git a/src/Rtp/RtpServer.h b/src/Rtp/RtpServer.h index 7c406903..2623ae2b 100644 --- a/src/Rtp/RtpServer.h +++ b/src/Rtp/RtpServer.h @@ -55,6 +55,7 @@ public: protected: Socket::Ptr _udp_server; TcpServer::Ptr _tcp_server; + function _on_clearup; }; }//namespace mediakit diff --git a/tests/test_rtp.cpp b/tests/test_rtp.cpp index 7d6d0b42..24d9ac83 100644 --- a/tests/test_rtp.cpp +++ b/tests/test_rtp.cpp @@ -38,7 +38,6 @@ static bool loadFile(const char *path){ uint16_t len; char rtp[2 * 1024]; struct sockaddr addr = {0}; - string stream_id; while (true) { if (2 != fread(&len, 1, 2, fp)) { WarnL; @@ -56,7 +55,7 @@ static bool loadFile(const char *path){ } uint32_t timeStamp; - RtpSelector::Instance().inputRtp(nullptr, stream_id, rtp, len, &addr, &timeStamp); + RtpSelector::Instance().inputRtp(nullptr, rtp, len, &addr, &timeStamp); if(timeStamp_last){ auto diff = timeStamp - timeStamp_last; if(diff > 0 && diff < 500){