Support mpegts rtp payload in startSendRtp (#3335)

This commit is contained in:
夏楚 2024-03-02 16:53:53 +08:00 committed by GitHub
parent 5a6364bae2
commit 24ad9c9b9e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 47 additions and 18 deletions

View File

@ -1289,7 +1289,10 @@ void installWebApi() {
if (!src) { if (!src) {
throw ApiRetException("can not find the source stream", API::NotFound); throw ApiRetException("can not find the source stream", API::NotFound);
} }
if (!allArgs["use_ps"].empty()) {
// 兼容之前的use_ps参数
allArgs["type"] = allArgs["use_ps"].as<int>();
}
MediaSourceEvent::SendRtpArgs args; MediaSourceEvent::SendRtpArgs args;
args.passive = false; args.passive = false;
args.dst_url = allArgs["dst_url"]; args.dst_url = allArgs["dst_url"];
@ -1299,11 +1302,11 @@ void installWebApi() {
args.is_udp = allArgs["is_udp"]; args.is_udp = allArgs["is_udp"];
args.src_port = allArgs["src_port"]; args.src_port = allArgs["src_port"];
args.pt = allArgs["pt"].empty() ? 96 : allArgs["pt"].as<int>(); args.pt = allArgs["pt"].empty() ? 96 : allArgs["pt"].as<int>();
args.use_ps = allArgs["use_ps"].empty() ? true : allArgs["use_ps"].as<bool>(); args.type = (MediaSourceEvent::SendRtpArgs::Type)(allArgs["type"].as<int>());
args.only_audio = allArgs["only_audio"].as<bool>(); args.only_audio = allArgs["only_audio"].as<bool>();
args.udp_rtcp_timeout = allArgs["udp_rtcp_timeout"]; args.udp_rtcp_timeout = allArgs["udp_rtcp_timeout"];
args.recv_stream_id = allArgs["recv_stream_id"]; args.recv_stream_id = allArgs["recv_stream_id"];
TraceL << "startSendRtp, pt " << int(args.pt) << " ps " << args.use_ps << " audio " << args.only_audio; TraceL << "startSendRtp, pt " << int(args.pt) << " rtp type " << args.type << " audio " << args.only_audio;
src->getOwnerPoller()->async([=]() mutable { src->getOwnerPoller()->async([=]() mutable {
src->startSendRtp(args, [val, headerOut, invoker](uint16_t local_port, const SockException &ex) mutable { src->startSendRtp(args, [val, headerOut, invoker](uint16_t local_port, const SockException &ex) mutable {
@ -1326,18 +1329,23 @@ void installWebApi() {
throw ApiRetException("can not find the source stream", API::NotFound); throw ApiRetException("can not find the source stream", API::NotFound);
} }
if (!allArgs["use_ps"].empty()) {
// 兼容之前的use_ps参数
allArgs["type"] = allArgs["use_ps"].as<int>();
}
MediaSourceEvent::SendRtpArgs args; MediaSourceEvent::SendRtpArgs args;
args.passive = true; args.passive = true;
args.ssrc = allArgs["ssrc"]; args.ssrc = allArgs["ssrc"];
args.is_udp = false; args.is_udp = false;
args.src_port = allArgs["src_port"]; args.src_port = allArgs["src_port"];
args.pt = allArgs["pt"].empty() ? 96 : allArgs["pt"].as<int>(); args.pt = allArgs["pt"].empty() ? 96 : allArgs["pt"].as<int>();
args.use_ps = allArgs["use_ps"].empty() ? true : allArgs["use_ps"].as<bool>(); args.type = (MediaSourceEvent::SendRtpArgs::Type)(allArgs["type"].as<int>());
args.only_audio = allArgs["only_audio"].as<bool>(); args.only_audio = allArgs["only_audio"].as<bool>();
args.recv_stream_id = allArgs["recv_stream_id"]; args.recv_stream_id = allArgs["recv_stream_id"];
//tcp被动服务器等待链接超时时间 //tcp被动服务器等待链接超时时间
args.tcp_passive_close_delay_ms = allArgs["close_delay_ms"]; args.tcp_passive_close_delay_ms = allArgs["close_delay_ms"];
TraceL << "startSendRtpPassive, pt " << int(args.pt) << " ps " << args.use_ps << " audio " << args.only_audio; TraceL << "startSendRtpPassive, pt " << int(args.pt) << " rtp type " << args.type << " audio " << args.only_audio;
src->getOwnerPoller()->async([=]() mutable { src->getOwnerPoller()->async([=]() mutable {
src->startSendRtp(args, [val, headerOut, invoker](uint16_t local_port, const SockException &ex) mutable { src->startSendRtp(args, [val, headerOut, invoker](uint16_t local_port, const SockException &ex) mutable {

View File

@ -92,10 +92,11 @@ public:
class SendRtpArgs { class SendRtpArgs {
public: public:
enum Type { kRtpRAW = 0, kRtpPS = 1, kRtpTS = 2 };
// 是否采用udp方式发送rtp // 是否采用udp方式发送rtp
bool is_udp = true; bool is_udp = true;
// rtp采用ps还是es方式 // rtp类型
bool use_ps = true; Type type = kRtpPS;
//发送es流时指定是否只发送纯音频流 //发送es流时指定是否只发送纯音频流
bool only_audio = false; bool only_audio = false;
//tcp被动方式 //tcp被动方式

View File

@ -19,9 +19,17 @@ using namespace toolkit;
namespace mediakit{ namespace mediakit{
PSEncoderImp::PSEncoderImp(uint32_t ssrc, uint8_t payload_type) : MpegMuxer(true) { PSEncoderImp::PSEncoderImp(uint32_t ssrc, uint8_t payload_type, bool ps_or_ts) : MpegMuxer(ps_or_ts) {
GET_CONFIG(uint32_t,video_mtu,Rtp::kVideoMtuSize); GET_CONFIG(uint32_t, s_video_mtu, Rtp::kVideoMtuSize);
_rtp_encoder = std::make_shared<CommonRtpEncoder>(); _rtp_encoder = std::make_shared<CommonRtpEncoder>();
auto video_mtu = s_video_mtu;
if (!ps_or_ts) {
// 确保ts rtp负载部分长度是188的倍数
video_mtu = RtpPacket::kRtpHeaderSize + (s_video_mtu - (s_video_mtu % 188));
if (video_mtu > s_video_mtu) {
video_mtu -= 188;
}
}
_rtp_encoder->setRtpInfo(ssrc, video_mtu, 90000, payload_type); _rtp_encoder->setRtpInfo(ssrc, video_mtu, 90000, payload_type);
auto ring = std::make_shared<RtpRing::RingType>(); auto ring = std::make_shared<RtpRing::RingType>();
ring->setDelegate(std::make_shared<RingDelegateHelper>([this](RtpPacket::Ptr rtp, bool is_key) { onRTP(std::move(rtp), is_key); })); ring->setDelegate(std::make_shared<RingDelegateHelper>([this](RtpPacket::Ptr rtp, bool is_key) { onRTP(std::move(rtp), is_key); }));

View File

@ -16,11 +16,19 @@
#include "Record/MPEG.h" #include "Record/MPEG.h"
#include "Common/MediaSink.h" #include "Common/MediaSink.h"
namespace mediakit{ namespace mediakit {
class CommonRtpEncoder; class CommonRtpEncoder;
class PSEncoderImp : public MpegMuxer{
class PSEncoderImp : public MpegMuxer {
public: public:
PSEncoderImp(uint32_t ssrc, uint8_t payload_type = 96); /**
* psh或ts rtp编码器
* @param ssrc rtp的ssrc
* @param payload_type rtp的pt
* @param ps_or_ts true: ps, false: ts
*/
PSEncoderImp(uint32_t ssrc, uint8_t payload_type = 96, bool ps_or_ts = true);
~PSEncoderImp() override; ~PSEncoderImp() override;
protected: protected:

View File

@ -40,7 +40,9 @@ private:
class RtpCachePS : public RtpCache, public PSEncoderImp { class RtpCachePS : public RtpCache, public PSEncoderImp {
public: public:
RtpCachePS(onFlushed cb, uint32_t ssrc, uint8_t payload_type = 96) : RtpCache(std::move(cb)), PSEncoderImp(ssrc, payload_type) {}; RtpCachePS(onFlushed cb, uint32_t ssrc, uint8_t payload_type = 96, bool ps_or_ts = true) :
RtpCache(std::move(cb)), PSEncoderImp(ssrc, ps_or_ts ? payload_type : Rtsp::PT_MP2T, ps_or_ts) {};
void flush() override; void flush() override;
protected: protected:
@ -56,6 +58,7 @@ protected:
void onRTP(toolkit::Buffer::Ptr rtp, bool is_key = false) override; void onRTP(toolkit::Buffer::Ptr rtp, bool is_key = false) override;
}; };
}//namespace mediakit } //namespace mediakit
#endif//ENABLE_RTPPROXY #endif//ENABLE_RTPPROXY
#endif //ZLMEDIAKIT_RTPCACHE_H #endif //ZLMEDIAKIT_RTPCACHE_H

View File

@ -40,10 +40,11 @@ void RtpSender::startSend(const MediaSourceEvent::SendRtpArgs &args, const funct
if (!_interface) { if (!_interface) {
//重连时不重新创建对象 //重连时不重新创建对象
auto lam = [this](std::shared_ptr<List<Buffer::Ptr>> list) { onFlushRtpList(std::move(list)); }; auto lam = [this](std::shared_ptr<List<Buffer::Ptr>> list) { onFlushRtpList(std::move(list)); };
if (args.use_ps) { switch (args.type) {
_interface = std::make_shared<RtpCachePS>(lam, atoi(args.ssrc.data()), args.pt); case MediaSourceEvent::SendRtpArgs::kRtpPS: _interface = std::make_shared<RtpCachePS>(lam, atoi(args.ssrc.data()), args.pt, true); break;
} else { case MediaSourceEvent::SendRtpArgs::kRtpTS: _interface = std::make_shared<RtpCachePS>(lam, atoi(args.ssrc.data()), args.pt, false); break;
_interface = std::make_shared<RtpCacheRaw>(lam, atoi(args.ssrc.data()), args.pt, args.only_audio); case MediaSourceEvent::SendRtpArgs::kRtpRAW: _interface = std::make_shared<RtpCacheRaw>(lam, atoi(args.ssrc.data()), args.pt, args.only_audio); break;
default: CHECK(0, "invalid rtp type:" + to_string(args.type)); break;
} }
} }