diff --git a/src/Rtp/RtpProcess.cpp b/src/Rtp/RtpProcess.cpp index c0e16d64..1d95f6b0 100644 --- a/src/Rtp/RtpProcess.cpp +++ b/src/Rtp/RtpProcess.cpp @@ -11,6 +11,7 @@ #if defined(ENABLE_RTPPROXY) #include "GB28181Process.h" #include "RtpProcess.h" +#include "RtpSelector.h" #include "Http/HttpTSPlayer.h" #include "Util/File.h" #include "Common/config.h" @@ -255,6 +256,9 @@ void RtpProcess::emitOnPublish() { } if (err.empty()) { strong_self->_muxer = std::make_shared(strong_self->_media_info, 0.0f, option); + if (!option.stream_replace.empty()) { + RtpSelector::Instance().addStreamReplace(strong_self->_media_info.stream, option.stream_replace); + } if (strong_self->_only_audio) { strong_self->_muxer->setOnlyAudio(); } diff --git a/src/Rtp/RtpSelector.cpp b/src/Rtp/RtpSelector.cpp index 8d165124..1eb3058a 100644 --- a/src/Rtp/RtpSelector.cpp +++ b/src/Rtp/RtpSelector.cpp @@ -23,6 +23,7 @@ INSTANCE_IMP(RtpSelector); void RtpSelector::clear(){ lock_guard lck(_mtx_map); _map_rtp_process.clear(); + _map_stream_replace.clear(); } bool RtpSelector::getSSRC(const char *data, size_t data_len, uint32_t &ssrc){ @@ -36,17 +37,23 @@ bool RtpSelector::getSSRC(const char *data, size_t data_len, uint32_t &ssrc){ RtpProcess::Ptr RtpSelector::getProcess(const string &stream_id,bool makeNew) { lock_guard lck(_mtx_map); - auto it = _map_rtp_process.find(stream_id); + string stream_id_origin = stream_id; + auto it_replace = _map_stream_replace.find(stream_id); + if (it_replace != _map_stream_replace.end()) { + stream_id_origin = it_replace->second; + } + + auto it = _map_rtp_process.find(stream_id_origin); if (it == _map_rtp_process.end() && !makeNew) { return nullptr; } if (it != _map_rtp_process.end() && makeNew) { //已经被其他线程持有了,不得再被持有,否则会存在线程安全的问题 - throw ProcessExisted(StrPrinter << "RtpProcess(" << stream_id << ") already existed"); + throw ProcessExisted(StrPrinter << "RtpProcess(" << stream_id_origin << ") already existed"); } - RtpProcessHelper::Ptr &ref = _map_rtp_process[stream_id]; + RtpProcessHelper::Ptr &ref = _map_rtp_process[stream_id_origin]; if (!ref) { - ref = std::make_shared(stream_id, shared_from_this()); + ref = std::make_shared(stream_id_origin, shared_from_this()); ref->attachEvent(); createTimer(); } @@ -81,10 +88,25 @@ void RtpSelector::delProcess(const string &stream_id,const RtpProcess *ptr) { } process = it->second->getProcess(); _map_rtp_process.erase(it); + delStreamReplace(stream_id); } process->onDetach(); } +void RtpSelector::addStreamReplace(const string &stream_id, const std::string &stream_replace) { + lock_guard lck(_mtx_map); + _map_stream_replace[stream_replace] = stream_id; +} + +void RtpSelector::delStreamReplace(const string &stream_id) { + for (auto it = _map_stream_replace.begin(); it != _map_stream_replace.end(); ++it) { + if (it->second == stream_id) { + _map_stream_replace.erase(it); + break; + } + } +} + void RtpSelector::onManager() { List clear_list; { @@ -96,6 +118,7 @@ void RtpSelector::onManager() { } WarnL << "RtpProcess timeout:" << it->first; clear_list.emplace_back(it->second->getProcess()); + delStreamReplace(it->first); it = _map_rtp_process.erase(it); } } diff --git a/src/Rtp/RtpSelector.h b/src/Rtp/RtpSelector.h index e48622d5..4f46e8dc 100644 --- a/src/Rtp/RtpSelector.h +++ b/src/Rtp/RtpSelector.h @@ -70,14 +70,18 @@ public: */ void delProcess(const std::string &stream_id, const RtpProcess *ptr); + void addStreamReplace(const std::string &stream_id, const std::string &stream_replace); + private: void onManager(); void createTimer(); + void delStreamReplace(const std::string &stream_id); private: toolkit::Timer::Ptr _timer; std::recursive_mutex _mtx_map; std::unordered_map _map_rtp_process; + std::unordered_map _map_stream_replace; }; }//namespace mediakit