openRtpServer接口新增only_audio参数,优化语音对讲场景

This commit is contained in:
xiongziliang 2023-02-17 22:43:45 +08:00
parent 5cdaf982f3
commit 8f0ba6988b
10 changed files with 80 additions and 17 deletions

View File

@ -1404,6 +1404,12 @@
"value": "0", "value": "0",
"description": "是否指定收流的rtp ssrc, 十进制数字不指定或指定0时则不过滤rtp非必选参数", "description": "是否指定收流的rtp ssrc, 十进制数字不指定或指定0时则不过滤rtp非必选参数",
"disabled": true "disabled": true
},
{
"key": "only_audio",
"value": "1",
"description": "是否为单音频track用于语音对讲",
"disabled": true
} }
] ]
} }

View File

@ -391,7 +391,7 @@ Value makeMediaSourceJson(MediaSource &media){
} }
#if defined(ENABLE_RTPPROXY) #if defined(ENABLE_RTPPROXY)
uint16_t openRtpServer(uint16_t local_port, const string &stream_id, int tcp_mode, const string &local_ip, bool re_use_port, uint32_t ssrc) { uint16_t openRtpServer(uint16_t local_port, const string &stream_id, int tcp_mode, const string &local_ip, bool re_use_port, uint32_t ssrc, bool only_audio) {
lock_guard<recursive_mutex> lck(s_rtpServerMapMtx); lock_guard<recursive_mutex> lck(s_rtpServerMapMtx);
if (s_rtpServerMap.find(stream_id) != s_rtpServerMap.end()) { if (s_rtpServerMap.find(stream_id) != s_rtpServerMap.end()) {
//为了防止RtpProcess所有权限混乱的问题不允许重复添加相同的stream_id //为了防止RtpProcess所有权限混乱的问题不允许重复添加相同的stream_id
@ -399,7 +399,7 @@ uint16_t openRtpServer(uint16_t local_port, const string &stream_id, int tcp_mod
} }
RtpServer::Ptr server = std::make_shared<RtpServer>(); RtpServer::Ptr server = std::make_shared<RtpServer>();
server->start(local_port, stream_id, (RtpServer::TcpMode)tcp_mode, local_ip.c_str(), re_use_port, ssrc); server->start(local_port, stream_id, (RtpServer::TcpMode)tcp_mode, local_ip.c_str(), re_use_port, ssrc, only_audio);
server->setOnDetach([stream_id]() { server->setOnDetach([stream_id]() {
//设置rtp超时移除事件 //设置rtp超时移除事件
lock_guard<recursive_mutex> lck(s_rtpServerMapMtx); lock_guard<recursive_mutex> lck(s_rtpServerMapMtx);
@ -1140,7 +1140,7 @@ void installWebApi() {
tcp_mode = 1; tcp_mode = 1;
} }
auto port = openRtpServer(allArgs["port"], stream_id, tcp_mode, "::", allArgs["re_use_port"].as<bool>(), auto port = openRtpServer(allArgs["port"], stream_id, tcp_mode, "::", allArgs["re_use_port"].as<bool>(),
allArgs["ssrc"].as<uint32_t>()); allArgs["ssrc"].as<uint32_t>(), allArgs["only_audio"].as<bool>());
if (port == 0) { if (port == 0) {
throw InvalidArgsException("该stream_id已存在"); throw InvalidArgsException("该stream_id已存在");
} }

View File

@ -17,16 +17,20 @@ using namespace std;
namespace mediakit{ namespace mediakit{
bool MediaSink::addTrack(const Track::Ptr &track_in) { bool MediaSink::addTrack(const Track::Ptr &track_in) {
if (_only_audio && track_in->getTrackType() != TrackAudio) {
InfoL << "Only audio enabled, track ignored: " << track_in->getCodecName();
return false;
}
if (!_enable_audio) { if (!_enable_audio) {
//关闭音频时,加快单视频流注册速度 // 关闭音频时,加快单视频流注册速度
_max_track_size = 1;
if (track_in->getTrackType() == TrackAudio) { if (track_in->getTrackType() == TrackAudio) {
//音频被全局忽略 // 音频被全局忽略
InfoL << "Audio disabled, audio track ignored";
return false; return false;
} }
} }
if (_all_track_ready) { if (_all_track_ready) {
WarnL << "all track is ready, add this track too late!"; WarnL << "All track is ready, add track too late: " << track_in->getCodecName();
return false; return false;
} }
//克隆Track只拷贝其数据不拷贝其数据转发关系 //克隆Track只拷贝其数据不拷贝其数据转发关系
@ -48,7 +52,7 @@ bool MediaSink::addTrack(const Track::Ptr &track_in) {
if (frame_unread.size() > kMaxUnreadyFrame) { if (frame_unread.size() > kMaxUnreadyFrame) {
//未就绪的的track不能缓存太多的帧否则可能内存溢出 //未就绪的的track不能缓存太多的帧否则可能内存溢出
frame_unread.clear(); frame_unread.clear();
WarnL << "cached frame of unready track(" << frame->getCodecName() << ") is too much, now cleared"; WarnL << "Cached frame of unready track(" << frame->getCodecName() << ") is too much, now cleared";
} }
//还有Track未就绪先缓存之 //还有Track未就绪先缓存之
frame_unread.emplace_back(Frame::getCacheAbleFrame(frame)); frame_unread.emplace_back(Frame::getCacheAbleFrame(frame));
@ -124,8 +128,16 @@ void MediaSink::checkTrackIfReady(){
} }
} }
void MediaSink::addTrackCompleted(){ void MediaSink::addTrackCompleted() {
_max_track_size = _track_map.size(); setMaxTrackCount(_track_map.size());
}
void MediaSink::setMaxTrackCount(size_t i) {
if (_all_track_ready) {
WarnL << "All track is ready, set max track count ignored";
return;
}
_max_track_size = MAX(MIN(i, 2), 1);
checkTrackIfReady(); checkTrackIfReady();
} }
@ -134,14 +146,14 @@ void MediaSink::emitAllTrackReady() {
return; return;
} }
DebugL << "all track ready use " << _ticker.elapsedTime() << "ms"; DebugL << "All track ready use " << _ticker.elapsedTime() << "ms";
if (!_track_ready_callback.empty()) { if (!_track_ready_callback.empty()) {
//这是超时强制忽略未准备好的Track //这是超时强制忽略未准备好的Track
_track_ready_callback.clear(); _track_ready_callback.clear();
//移除未准备好的Track //移除未准备好的Track
for (auto it = _track_map.begin(); it != _track_map.end();) { for (auto it = _track_map.begin(); it != _track_map.end();) {
if (!it->second.second || !it->second.first->ready()) { if (!it->second.second || !it->second.first->ready()) {
WarnL << "track not ready for a long time, ignored: " << it->second.first->getCodecName(); WarnL << "Track not ready for a long time, ignored: " << it->second.first->getCodecName();
it = _track_map.erase(it); it = _track_map.erase(it);
continue; continue;
} }
@ -256,7 +268,7 @@ bool MediaSink::addMuteAudioTrack() {
return audio->inputFrame(frame); return audio->inputFrame(frame);
}); });
onTrackReady(audio); onTrackReady(audio);
TraceL << "mute aac track added"; TraceL << "Mute aac track added";
return true; return true;
} }
@ -266,6 +278,14 @@ bool MediaSink::isAllTrackReady() const {
void MediaSink::enableAudio(bool flag) { void MediaSink::enableAudio(bool flag) {
_enable_audio = flag; _enable_audio = flag;
_max_track_size = flag ? 2 : 1;
}
void MediaSink::setOnlyAudio(){
_only_audio = true;
_enable_audio = true;
_add_mute_audio = false;
_max_track_size = 1;
} }
void MediaSink::enableMuteAudio(bool flag) { void MediaSink::enableMuteAudio(bool flag) {

View File

@ -94,6 +94,12 @@ public:
*/ */
void addTrackCompleted() override; void addTrackCompleted() override;
/**
* track数1~2addTrackCompleted类型
* track时
*/
void setMaxTrackCount(size_t i);
/** /**
* track * track
*/ */
@ -115,6 +121,11 @@ public:
*/ */
void enableAudio(bool flag); void enableAudio(bool flag);
/**
*
*/
void setOnlyAudio();
/** /**
* *
*/ */
@ -157,6 +168,7 @@ private:
private: private:
bool _enable_audio = true; bool _enable_audio = true;
bool _only_audio = false;
bool _add_mute_audio = true; bool _add_mute_audio = true;
bool _all_track_ready = false; bool _all_track_ready = false;
size_t _max_track_size = 2; size_t _max_track_size = 2;

View File

@ -187,6 +187,10 @@ void RtpProcess::setStopCheckRtp(bool is_check){
} }
} }
void RtpProcess::setOnlyAudio(bool only_audio){
_only_audio = only_audio;
}
void RtpProcess::onDetach() { void RtpProcess::onDetach() {
if (_on_detach) { if (_on_detach) {
_on_detach(); _on_detach();
@ -247,6 +251,9 @@ void RtpProcess::emitOnPublish() {
strong_self->_media_info._app, strong_self->_media_info._app,
strong_self->_media_info._streamid,0.0f, strong_self->_media_info._streamid,0.0f,
option); option);
if (strong_self->_only_audio) {
strong_self->_muxer->setOnlyAudio();
}
strong_self->_muxer->setMediaListener(strong_self); strong_self->_muxer->setMediaListener(strong_self);
strong_self->doCachedFunc(); strong_self->doCachedFunc();
InfoP(strong_self) << "允许RTP推流"; InfoP(strong_self) << "允许RTP推流";

View File

@ -57,6 +57,12 @@ public:
*/ */
void setStopCheckRtp(bool is_check=false); void setStopCheckRtp(bool is_check=false);
/**
* track
* inputRtp前调用此方法
*/
void setOnlyAudio(bool only_audio);
/** /**
* flush输出缓存 * flush输出缓存
*/ */
@ -87,6 +93,7 @@ private:
void doCachedFunc(); void doCachedFunc();
private: private:
bool _only_audio = false;
uint64_t _dts = 0; uint64_t _dts = 0;
uint64_t _total_bytes = 0; uint64_t _total_bytes = 0;
std::unique_ptr<sockaddr_storage> _addr; std::unique_ptr<sockaddr_storage> _addr;

View File

@ -42,11 +42,12 @@ public:
} }
} }
void setRtpServerInfo(uint16_t local_port,RtpServer::TcpMode mode,bool re_use_port,uint32_t ssrc){ void setRtpServerInfo(uint16_t local_port,RtpServer::TcpMode mode,bool re_use_port,uint32_t ssrc, bool only_audio) {
_local_port = local_port; _local_port = local_port;
_tcp_mode = mode; _tcp_mode = mode;
_re_use_port = re_use_port; _re_use_port = re_use_port;
_ssrc = ssrc; _ssrc = ssrc;
_only_audio = only_audio;
} }
void setOnDetach(function<void()> cb) { void setOnDetach(function<void()> cb) {
@ -60,6 +61,7 @@ public:
void onRecvRtp(const Socket::Ptr &sock, const Buffer::Ptr &buf, struct sockaddr *addr) { void onRecvRtp(const Socket::Ptr &sock, const Buffer::Ptr &buf, struct sockaddr *addr) {
if (!_process) { if (!_process) {
_process = RtpSelector::Instance().getProcess(_stream_id, true); _process = RtpSelector::Instance().getProcess(_stream_id, true);
_process->setOnlyAudio(_only_audio);
_process->setOnDetach(std::move(_on_detach)); _process->setOnDetach(std::move(_on_detach));
cancelDelayTask(); cancelDelayTask();
} }
@ -137,6 +139,7 @@ private:
private: private:
bool _re_use_port = false; bool _re_use_port = false;
bool _only_audio = false;
uint16_t _local_port = 0; uint16_t _local_port = 0;
uint32_t _ssrc = 0; uint32_t _ssrc = 0;
RtpServer::TcpMode _tcp_mode = RtpServer::NONE; RtpServer::TcpMode _tcp_mode = RtpServer::NONE;
@ -150,7 +153,7 @@ private:
EventPoller::DelayTask::Ptr _delay_task; EventPoller::DelayTask::Ptr _delay_task;
}; };
void RtpServer::start(uint16_t local_port, const string &stream_id, TcpMode tcp_mode, const char *local_ip, bool re_use_port, uint32_t ssrc) { void RtpServer::start(uint16_t local_port, const string &stream_id, TcpMode tcp_mode, const char *local_ip, bool re_use_port, uint32_t ssrc, bool only_audio) {
//创建udp服务器 //创建udp服务器
Socket::Ptr rtp_socket = Socket::createSocket(nullptr, true); Socket::Ptr rtp_socket = Socket::createSocket(nullptr, true);
Socket::Ptr rtcp_socket = Socket::createSocket(nullptr, true); Socket::Ptr rtcp_socket = Socket::createSocket(nullptr, true);
@ -176,6 +179,7 @@ void RtpServer::start(uint16_t local_port, const string &stream_id, TcpMode tcp_
tcp_server = std::make_shared<TcpServer>(rtp_socket->getPoller()); tcp_server = std::make_shared<TcpServer>(rtp_socket->getPoller());
(*tcp_server)[RtpSession::kStreamID] = stream_id; (*tcp_server)[RtpSession::kStreamID] = stream_id;
(*tcp_server)[RtpSession::kSSRC] = ssrc; (*tcp_server)[RtpSession::kSSRC] = ssrc;
(*tcp_server)[RtpSession::kOnlyAudio] = only_audio;
if (tcp_mode == PASSIVE) { if (tcp_mode == PASSIVE) {
tcp_server->start<RtpSession>(rtp_socket->get_local_port(), local_ip); tcp_server->start<RtpSession>(rtp_socket->get_local_port(), local_ip);
} else if (stream_id.empty()) { } else if (stream_id.empty()) {
@ -191,7 +195,7 @@ void RtpServer::start(uint16_t local_port, const string &stream_id, TcpMode tcp_
//指定了流id那么一个端口一个流(不管是否包含多个ssrc的多个流绑定rtp源后会筛选掉ip端口不匹配的流) //指定了流id那么一个端口一个流(不管是否包含多个ssrc的多个流绑定rtp源后会筛选掉ip端口不匹配的流)
helper = std::make_shared<RtcpHelper>(std::move(rtcp_socket), stream_id); helper = std::make_shared<RtcpHelper>(std::move(rtcp_socket), stream_id);
helper->startRtcp(); helper->startRtcp();
helper->setRtpServerInfo(local_port,tcp_mode,re_use_port,ssrc); helper->setRtpServerInfo(local_port, tcp_mode, re_use_port, ssrc, only_audio);
bool bind_peer_addr = false; bool bind_peer_addr = false;
rtp_socket->setOnRead([rtp_socket, helper, ssrc, bind_peer_addr](const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) mutable { rtp_socket->setOnRead([rtp_socket, helper, ssrc, bind_peer_addr](const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) mutable {
RtpHeader *header = (RtpHeader *)buf->data(); RtpHeader *header = (RtpHeader *)buf->data();
@ -211,6 +215,7 @@ void RtpServer::start(uint16_t local_port, const string &stream_id, TcpMode tcp_
#if 1 #if 1
//单端口多线程接收多个流根据ssrc区分流 //单端口多线程接收多个流根据ssrc区分流
udp_server = std::make_shared<UdpServer>(rtp_socket->getPoller()); udp_server = std::make_shared<UdpServer>(rtp_socket->getPoller());
(*udp_server)[RtpSession::kOnlyAudio] = only_audio;
udp_server->start<RtpSession>(rtp_socket->get_local_port(), local_ip); udp_server->start<RtpSession>(rtp_socket->get_local_port(), local_ip);
rtp_socket = nullptr; rtp_socket = nullptr;
#else #else

View File

@ -44,7 +44,7 @@ public:
* @param ssrc ssrc * @param ssrc ssrc
*/ */
void start(uint16_t local_port, const std::string &stream_id = "", TcpMode tcp_mode = PASSIVE, void start(uint16_t local_port, const std::string &stream_id = "", TcpMode tcp_mode = PASSIVE,
const char *local_ip = "::", bool re_use_port = true, uint32_t ssrc = 0); const char *local_ip = "::", bool re_use_port = true, uint32_t ssrc = 0, bool only_audio = false);
/** /**
* tcp服务(tcp主动模式) * tcp服务(tcp主动模式)
@ -75,6 +75,7 @@ protected:
std::shared_ptr<RtcpHelper> _rtcp_helper; std::shared_ptr<RtcpHelper> _rtcp_helper;
std::function<void()> _on_cleanup; std::function<void()> _on_cleanup;
bool _only_audio = false;
//用于tcp主动模式 //用于tcp主动模式
TcpMode _tcp_mode = NONE; TcpMode _tcp_mode = NONE;
}; };

View File

@ -22,6 +22,7 @@ namespace mediakit{
const string RtpSession::kStreamID = "stream_id"; const string RtpSession::kStreamID = "stream_id";
const string RtpSession::kSSRC = "ssrc"; const string RtpSession::kSSRC = "ssrc";
const string RtpSession::kOnlyAudio = "only_audio";
void RtpSession::attachServer(const Server &server) { void RtpSession::attachServer(const Server &server) {
setParams(const_cast<Server &>(server)); setParams(const_cast<Server &>(server));
@ -30,6 +31,7 @@ void RtpSession::attachServer(const Server &server) {
void RtpSession::setParams(mINI &ini) { void RtpSession::setParams(mINI &ini) {
_stream_id = ini[kStreamID]; _stream_id = ini[kStreamID];
_ssrc = ini[kSSRC]; _ssrc = ini[kSSRC];
_only_audio = ini[kOnlyAudio];
} }
RtpSession::RtpSession(const Socket::Ptr &sock) : Session(sock) { RtpSession::RtpSession(const Socket::Ptr &sock) : Session(sock) {
@ -101,6 +103,7 @@ void RtpSession::onRtpPacket(const char *data, size_t len) {
} }
//tcp情况下一个tcp链接只可能是一路流不需要通过多个ssrc来区分所以不需要频繁getProcess //tcp情况下一个tcp链接只可能是一路流不需要通过多个ssrc来区分所以不需要频繁getProcess
_process = RtpSelector::Instance().getProcess(_stream_id, true); _process = RtpSelector::Instance().getProcess(_stream_id, true);
_process->setOnlyAudio(_only_audio);
_process->setDelegate(dynamic_pointer_cast<RtpSession>(shared_from_this())); _process->setDelegate(dynamic_pointer_cast<RtpSession>(shared_from_this()));
} }
try { try {

View File

@ -24,6 +24,7 @@ class RtpSession : public toolkit::Session, public RtpSplitter, public MediaSour
public: public:
static const std::string kStreamID; static const std::string kStreamID;
static const std::string kSSRC; static const std::string kSSRC;
static const std::string kOnlyAudio;
RtpSession(const toolkit::Socket::Ptr &sock); RtpSession(const toolkit::Socket::Ptr &sock);
~RtpSession() override; ~RtpSession() override;
@ -45,6 +46,7 @@ private:
bool _is_udp = false; bool _is_udp = false;
bool _search_rtp = false; bool _search_rtp = false;
bool _search_rtp_finished = false; bool _search_rtp_finished = false;
bool _only_audio = false;
uint32_t _ssrc = 0; uint32_t _ssrc = 0;
toolkit::Ticker _ticker; toolkit::Ticker _ticker;
std::string _stream_id; std::string _stream_id;