From 24ad9c9b9e3ec19ddf967af86962e43e752e550d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=A4=8F=E6=A5=9A?= <771730766@qq.com> Date: Sat, 2 Mar 2024 16:53:53 +0800 Subject: [PATCH] Support mpegts rtp payload in startSendRtp (#3335) --- server/WebApi.cpp | 18 +++++++++++++----- src/Common/MediaSource.h | 5 +++-- src/Rtp/PSEncoder.cpp | 12 ++++++++++-- src/Rtp/PSEncoder.h | 14 +++++++++++--- src/Rtp/RtpCache.h | 7 +++++-- src/Rtp/RtpSender.cpp | 9 +++++---- 6 files changed, 47 insertions(+), 18 deletions(-) diff --git a/server/WebApi.cpp b/server/WebApi.cpp index f13fd4f4..8b53e948 100755 --- a/server/WebApi.cpp +++ b/server/WebApi.cpp @@ -1289,7 +1289,10 @@ void installWebApi() { if (!src) { throw ApiRetException("can not find the source stream", API::NotFound); } - + if (!allArgs["use_ps"].empty()) { + // 兼容之前的use_ps参数 + allArgs["type"] = allArgs["use_ps"].as(); + } MediaSourceEvent::SendRtpArgs args; args.passive = false; args.dst_url = allArgs["dst_url"]; @@ -1299,11 +1302,11 @@ void installWebApi() { args.is_udp = allArgs["is_udp"]; args.src_port = allArgs["src_port"]; args.pt = allArgs["pt"].empty() ? 96 : allArgs["pt"].as(); - args.use_ps = allArgs["use_ps"].empty() ? true : allArgs["use_ps"].as(); + args.type = (MediaSourceEvent::SendRtpArgs::Type)(allArgs["type"].as()); args.only_audio = allArgs["only_audio"].as(); args.udp_rtcp_timeout = allArgs["udp_rtcp_timeout"]; args.recv_stream_id = allArgs["recv_stream_id"]; - TraceL << "startSendRtp, pt " << int(args.pt) << " ps " << args.use_ps << " audio " << args.only_audio; + TraceL << "startSendRtp, pt " << int(args.pt) << " rtp type " << args.type << " audio " << args.only_audio; src->getOwnerPoller()->async([=]() mutable { src->startSendRtp(args, [val, headerOut, invoker](uint16_t local_port, const SockException &ex) mutable { @@ -1326,18 +1329,23 @@ void installWebApi() { throw ApiRetException("can not find the source stream", API::NotFound); } + if (!allArgs["use_ps"].empty()) { + // 兼容之前的use_ps参数 + allArgs["type"] = allArgs["use_ps"].as(); + } + MediaSourceEvent::SendRtpArgs args; args.passive = true; args.ssrc = allArgs["ssrc"]; args.is_udp = false; args.src_port = allArgs["src_port"]; args.pt = allArgs["pt"].empty() ? 96 : allArgs["pt"].as(); - args.use_ps = allArgs["use_ps"].empty() ? true : allArgs["use_ps"].as(); + args.type = (MediaSourceEvent::SendRtpArgs::Type)(allArgs["type"].as()); args.only_audio = allArgs["only_audio"].as(); args.recv_stream_id = allArgs["recv_stream_id"]; //tcp被动服务器等待链接超时时间 args.tcp_passive_close_delay_ms = allArgs["close_delay_ms"]; - TraceL << "startSendRtpPassive, pt " << int(args.pt) << " ps " << args.use_ps << " audio " << args.only_audio; + TraceL << "startSendRtpPassive, pt " << int(args.pt) << " rtp type " << args.type << " audio " << args.only_audio; src->getOwnerPoller()->async([=]() mutable { src->startSendRtp(args, [val, headerOut, invoker](uint16_t local_port, const SockException &ex) mutable { diff --git a/src/Common/MediaSource.h b/src/Common/MediaSource.h index 4c6b33e9..1ce0d8c5 100644 --- a/src/Common/MediaSource.h +++ b/src/Common/MediaSource.h @@ -92,10 +92,11 @@ public: class SendRtpArgs { public: + enum Type { kRtpRAW = 0, kRtpPS = 1, kRtpTS = 2 }; // 是否采用udp方式发送rtp bool is_udp = true; - // rtp采用ps还是es方式 - bool use_ps = true; + // rtp类型 + Type type = kRtpPS; //发送es流时指定是否只发送纯音频流 bool only_audio = false; //tcp被动方式 diff --git a/src/Rtp/PSEncoder.cpp b/src/Rtp/PSEncoder.cpp index 66e39bc7..9de7f43b 100644 --- a/src/Rtp/PSEncoder.cpp +++ b/src/Rtp/PSEncoder.cpp @@ -19,9 +19,17 @@ using namespace toolkit; namespace mediakit{ -PSEncoderImp::PSEncoderImp(uint32_t ssrc, uint8_t payload_type) : MpegMuxer(true) { - GET_CONFIG(uint32_t,video_mtu,Rtp::kVideoMtuSize); +PSEncoderImp::PSEncoderImp(uint32_t ssrc, uint8_t payload_type, bool ps_or_ts) : MpegMuxer(ps_or_ts) { + GET_CONFIG(uint32_t, s_video_mtu, Rtp::kVideoMtuSize); _rtp_encoder = std::make_shared(); + auto video_mtu = s_video_mtu; + if (!ps_or_ts) { + // 确保ts rtp负载部分长度是188的倍数 + video_mtu = RtpPacket::kRtpHeaderSize + (s_video_mtu - (s_video_mtu % 188)); + if (video_mtu > s_video_mtu) { + video_mtu -= 188; + } + } _rtp_encoder->setRtpInfo(ssrc, video_mtu, 90000, payload_type); auto ring = std::make_shared(); ring->setDelegate(std::make_shared([this](RtpPacket::Ptr rtp, bool is_key) { onRTP(std::move(rtp), is_key); })); diff --git a/src/Rtp/PSEncoder.h b/src/Rtp/PSEncoder.h index 16bbcd4f..e195913a 100644 --- a/src/Rtp/PSEncoder.h +++ b/src/Rtp/PSEncoder.h @@ -16,11 +16,19 @@ #include "Record/MPEG.h" #include "Common/MediaSink.h" -namespace mediakit{ +namespace mediakit { + class CommonRtpEncoder; -class PSEncoderImp : public MpegMuxer{ + +class PSEncoderImp : public MpegMuxer { public: - PSEncoderImp(uint32_t ssrc, uint8_t payload_type = 96); + /** + * 创建psh或ts rtp编码器 + * @param ssrc rtp的ssrc + * @param payload_type rtp的pt + * @param ps_or_ts true: ps, false: ts + */ + PSEncoderImp(uint32_t ssrc, uint8_t payload_type = 96, bool ps_or_ts = true); ~PSEncoderImp() override; protected: diff --git a/src/Rtp/RtpCache.h b/src/Rtp/RtpCache.h index c8778e18..bd570c47 100644 --- a/src/Rtp/RtpCache.h +++ b/src/Rtp/RtpCache.h @@ -40,7 +40,9 @@ private: class RtpCachePS : public RtpCache, public PSEncoderImp { public: - RtpCachePS(onFlushed cb, uint32_t ssrc, uint8_t payload_type = 96) : RtpCache(std::move(cb)), PSEncoderImp(ssrc, payload_type) {}; + RtpCachePS(onFlushed cb, uint32_t ssrc, uint8_t payload_type = 96, bool ps_or_ts = true) : + RtpCache(std::move(cb)), PSEncoderImp(ssrc, ps_or_ts ? payload_type : Rtsp::PT_MP2T, ps_or_ts) {}; + void flush() override; protected: @@ -56,6 +58,7 @@ protected: void onRTP(toolkit::Buffer::Ptr rtp, bool is_key = false) override; }; -}//namespace mediakit +} //namespace mediakit + #endif//ENABLE_RTPPROXY #endif //ZLMEDIAKIT_RTPCACHE_H diff --git a/src/Rtp/RtpSender.cpp b/src/Rtp/RtpSender.cpp index 28b7770b..ed0d836e 100644 --- a/src/Rtp/RtpSender.cpp +++ b/src/Rtp/RtpSender.cpp @@ -40,10 +40,11 @@ void RtpSender::startSend(const MediaSourceEvent::SendRtpArgs &args, const funct if (!_interface) { //重连时不重新创建对象 auto lam = [this](std::shared_ptr> list) { onFlushRtpList(std::move(list)); }; - if (args.use_ps) { - _interface = std::make_shared(lam, atoi(args.ssrc.data()), args.pt); - } else { - _interface = std::make_shared(lam, atoi(args.ssrc.data()), args.pt, args.only_audio); + switch (args.type) { + case MediaSourceEvent::SendRtpArgs::kRtpPS: _interface = std::make_shared(lam, atoi(args.ssrc.data()), args.pt, true); break; + case MediaSourceEvent::SendRtpArgs::kRtpTS: _interface = std::make_shared(lam, atoi(args.ssrc.data()), args.pt, false); break; + case MediaSourceEvent::SendRtpArgs::kRtpRAW: _interface = std::make_shared(lam, atoi(args.ssrc.data()), args.pt, args.only_audio); break; + default: CHECK(0, "invalid rtp type:" + to_string(args.type)); break; } }