解决流名被替换后,getRtpInfo等接口无法使用新流名的问题 (#3265)

This commit is contained in:
chdahuzi 2024-01-27 20:55:57 +08:00 committed by GitHub
parent fd1ebb1a51
commit 56bdb14baf
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 35 additions and 4 deletions

View File

@ -11,6 +11,7 @@
#if defined(ENABLE_RTPPROXY) #if defined(ENABLE_RTPPROXY)
#include "GB28181Process.h" #include "GB28181Process.h"
#include "RtpProcess.h" #include "RtpProcess.h"
#include "RtpSelector.h"
#include "Http/HttpTSPlayer.h" #include "Http/HttpTSPlayer.h"
#include "Util/File.h" #include "Util/File.h"
#include "Common/config.h" #include "Common/config.h"
@ -255,6 +256,9 @@ void RtpProcess::emitOnPublish() {
} }
if (err.empty()) { if (err.empty()) {
strong_self->_muxer = std::make_shared<MultiMediaSourceMuxer>(strong_self->_media_info, 0.0f, option); strong_self->_muxer = std::make_shared<MultiMediaSourceMuxer>(strong_self->_media_info, 0.0f, option);
if (!option.stream_replace.empty()) {
RtpSelector::Instance().addStreamReplace(strong_self->_media_info.stream, option.stream_replace);
}
if (strong_self->_only_audio) { if (strong_self->_only_audio) {
strong_self->_muxer->setOnlyAudio(); strong_self->_muxer->setOnlyAudio();
} }

View File

@ -23,6 +23,7 @@ INSTANCE_IMP(RtpSelector);
void RtpSelector::clear(){ void RtpSelector::clear(){
lock_guard<decltype(_mtx_map)> lck(_mtx_map); lock_guard<decltype(_mtx_map)> lck(_mtx_map);
_map_rtp_process.clear(); _map_rtp_process.clear();
_map_stream_replace.clear();
} }
bool RtpSelector::getSSRC(const char *data, size_t data_len, uint32_t &ssrc){ bool RtpSelector::getSSRC(const char *data, size_t data_len, uint32_t &ssrc){
@ -36,17 +37,23 @@ bool RtpSelector::getSSRC(const char *data, size_t data_len, uint32_t &ssrc){
RtpProcess::Ptr RtpSelector::getProcess(const string &stream_id,bool makeNew) { RtpProcess::Ptr RtpSelector::getProcess(const string &stream_id,bool makeNew) {
lock_guard<decltype(_mtx_map)> lck(_mtx_map); lock_guard<decltype(_mtx_map)> lck(_mtx_map);
auto it = _map_rtp_process.find(stream_id); string stream_id_origin = stream_id;
auto it_replace = _map_stream_replace.find(stream_id);
if (it_replace != _map_stream_replace.end()) {
stream_id_origin = it_replace->second;
}
auto it = _map_rtp_process.find(stream_id_origin);
if (it == _map_rtp_process.end() && !makeNew) { if (it == _map_rtp_process.end() && !makeNew) {
return nullptr; return nullptr;
} }
if (it != _map_rtp_process.end() && makeNew) { if (it != _map_rtp_process.end() && makeNew) {
//已经被其他线程持有了,不得再被持有,否则会存在线程安全的问题 //已经被其他线程持有了,不得再被持有,否则会存在线程安全的问题
throw ProcessExisted(StrPrinter << "RtpProcess(" << stream_id << ") already existed"); throw ProcessExisted(StrPrinter << "RtpProcess(" << stream_id_origin << ") already existed");
} }
RtpProcessHelper::Ptr &ref = _map_rtp_process[stream_id]; RtpProcessHelper::Ptr &ref = _map_rtp_process[stream_id_origin];
if (!ref) { if (!ref) {
ref = std::make_shared<RtpProcessHelper>(stream_id, shared_from_this()); ref = std::make_shared<RtpProcessHelper>(stream_id_origin, shared_from_this());
ref->attachEvent(); ref->attachEvent();
createTimer(); createTimer();
} }
@ -81,10 +88,25 @@ void RtpSelector::delProcess(const string &stream_id,const RtpProcess *ptr) {
} }
process = it->second->getProcess(); process = it->second->getProcess();
_map_rtp_process.erase(it); _map_rtp_process.erase(it);
delStreamReplace(stream_id);
} }
process->onDetach(); process->onDetach();
} }
void RtpSelector::addStreamReplace(const string &stream_id, const std::string &stream_replace) {
lock_guard<decltype(_mtx_map)> lck(_mtx_map);
_map_stream_replace[stream_replace] = stream_id;
}
void RtpSelector::delStreamReplace(const string &stream_id) {
for (auto it = _map_stream_replace.begin(); it != _map_stream_replace.end(); ++it) {
if (it->second == stream_id) {
_map_stream_replace.erase(it);
break;
}
}
}
void RtpSelector::onManager() { void RtpSelector::onManager() {
List<RtpProcess::Ptr> clear_list; List<RtpProcess::Ptr> clear_list;
{ {
@ -96,6 +118,7 @@ void RtpSelector::onManager() {
} }
WarnL << "RtpProcess timeout:" << it->first; WarnL << "RtpProcess timeout:" << it->first;
clear_list.emplace_back(it->second->getProcess()); clear_list.emplace_back(it->second->getProcess());
delStreamReplace(it->first);
it = _map_rtp_process.erase(it); it = _map_rtp_process.erase(it);
} }
} }

View File

@ -70,14 +70,18 @@ public:
*/ */
void delProcess(const std::string &stream_id, const RtpProcess *ptr); void delProcess(const std::string &stream_id, const RtpProcess *ptr);
void addStreamReplace(const std::string &stream_id, const std::string &stream_replace);
private: private:
void onManager(); void onManager();
void createTimer(); void createTimer();
void delStreamReplace(const std::string &stream_id);
private: private:
toolkit::Timer::Ptr _timer; toolkit::Timer::Ptr _timer;
std::recursive_mutex _mtx_map; std::recursive_mutex _mtx_map;
std::unordered_map<std::string,RtpProcessHelper::Ptr> _map_rtp_process; std::unordered_map<std::string,RtpProcessHelper::Ptr> _map_rtp_process;
std::unordered_map<std::string,std::string> _map_stream_replace;
}; };
}//namespace mediakit }//namespace mediakit