diff --git a/src/Common/MediaSource.cpp b/src/Common/MediaSource.cpp index d489a217..07b81a5a 100644 --- a/src/Common/MediaSource.cpp +++ b/src/Common/MediaSource.cpp @@ -80,6 +80,19 @@ vector MediaSource::getTracks(bool trackReady) const { void MediaSource::setTrackSource(const std::weak_ptr &track_src) { _track_source = track_src; + weak_ptr weakPtr = shared_from_this(); + EventPollerPool::Instance().getPoller()->async([weakPtr,this](){ + auto strongPtr = weakPtr.lock(); + if (!strongPtr) { + return; + } + NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaResetTracks, + _strSchema, + _strVhost, + _strApp, + _strId, + *this); + },false); } void MediaSource::setListener(const std::weak_ptr &listener){ @@ -293,13 +306,20 @@ void MediaSource::regist() { g_mapMediaSrc[_strSchema][_strVhost][_strApp][_strId] = shared_from_this(); } InfoL << _strSchema << " " << _strVhost << " " << _strApp << " " << _strId; - NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaChanged, - true, - _strSchema, - _strVhost, - _strApp, - _strId, - *this); + weak_ptr weakPtr = shared_from_this(); + EventPollerPool::Instance().getPoller()->async([weakPtr,this](){ + auto strongPtr = weakPtr.lock(); + if (!strongPtr) { + return; + } + NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaChanged, + true, + _strSchema, + _strVhost, + _strApp, + _strId, + *this); + },false); } bool MediaSource::unregist() { //反注册该源 @@ -328,6 +348,21 @@ void MediaSource::unregisted(){ _strApp, _strId, *this); + + weak_ptr weakPtr = shared_from_this(); + EventPollerPool::Instance().getPoller()->async([weakPtr,this](){ + auto strongPtr = weakPtr.lock(); + if (!strongPtr) { + return; + } + NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaChanged, + true, + _strSchema, + _strVhost, + _strApp, + _strId, + *this); + },false); } diff --git a/src/Common/MultiMediaSourceMuxer.h b/src/Common/MultiMediaSourceMuxer.h index 431f609c..0cda92dc 100644 --- a/src/Common/MultiMediaSourceMuxer.h +++ b/src/Common/MultiMediaSourceMuxer.h @@ -49,14 +49,32 @@ public: if (bEanbleRtsp) { _rtsp = std::make_shared(vhost, strApp, strId, std::make_shared(dur_sec)); } - if(bEanbleHls){ - _hls.reset(Recorder::createHlsRecorder(vhost, strApp, strId)); - } - if(bEnableMp4){ - _mp4.reset(Recorder::createMP4Recorder(vhost, strApp, strId)); + + _recordFunc = [bEanbleHls,bEnableMp4,vhost, strApp, strId](bool start){ + if(bEanbleHls){ + if(start){ + Recorder::startRecord(Recorder::type_hls,vhost, strApp, strId, true, false); + }else{ + Recorder::stopRecord(Recorder::type_hls,vhost, strApp, strId); + } + } + + if(bEnableMp4){ + if(start){ + Recorder::startRecord(Recorder::type_mp4,vhost, strApp, strId, true, false); + }else{ + Recorder::stopRecord(Recorder::type_mp4,vhost, strApp, strId); + } + } + }; + + _recordFunc(true); + } + virtual ~MultiMediaSourceMuxer(){ + if(_recordFunc){ + _recordFunc(false); } } - virtual ~MultiMediaSourceMuxer(){} /** * 重置音视频媒体 @@ -68,12 +86,6 @@ public: if(_rtsp){ _rtsp->resetTracks(); } - if(_hls){ - _hls->resetTracks(); - } - if(_mp4){ - _mp4->resetTracks(); - } } /** @@ -115,12 +127,6 @@ protected: if(_rtsp){ _rtsp->addTrack(track); } - if(_hls){ - _hls->addTrack(track); - } - if(_mp4){ - _mp4->addTrack(track); - } } /** @@ -134,12 +140,6 @@ protected: if(_rtsp) { _rtsp->inputFrame(frame); } - if(_hls){ - _hls->inputFrame(frame); - } - if(_mp4){ - _mp4->inputFrame(frame); - } } /** @@ -158,8 +158,7 @@ protected: private: RtmpMediaSourceMuxer::Ptr _rtmp; RtspMediaSourceMuxer::Ptr _rtsp; - MediaSinkInterface::Ptr _hls; - MediaSinkInterface::Ptr _mp4; + function _recordFunc; }; diff --git a/src/Common/config.cpp b/src/Common/config.cpp index 37597b96..426bb082 100644 --- a/src/Common/config.cpp +++ b/src/Common/config.cpp @@ -55,6 +55,7 @@ bool loadIniConfig(const char *ini_path){ ////////////广播名称/////////// namespace Broadcast { const string kBroadcastMediaChanged = "kBroadcastMediaChanged"; +const string kBroadcastMediaResetTracks = "kBroadcastMediaResetTracks"; const string kBroadcastRecordMP4 = "kBroadcastRecordMP4"; const string kBroadcastHttpRequest = "kBroadcastHttpRequest"; const string kBroadcastHttpAccess = "kBroadcastHttpAccess"; diff --git a/src/Common/config.h b/src/Common/config.h index 3f597c5a..9a090144 100644 --- a/src/Common/config.h +++ b/src/Common/config.h @@ -71,9 +71,13 @@ namespace Broadcast { extern const string kBroadcastMediaChanged; #define BroadcastMediaChangedArgs const bool &bRegist, const string &schema,const string &vhost,const string &app,const string &stream,MediaSource &sender +//MediaSource重置Track事件 +extern const string kBroadcastMediaResetTracks; +#define BroadcastMediaResetTracksArgs const string &schema,const string &vhost,const string &app,const string &stream,MediaSource &sender + //录制mp4文件成功后广播 extern const string kBroadcastRecordMP4; -#define BroadcastRecordMP4Args const Mp4Info &info +#define BroadcastRecordMP4Args const MP4Info &info //收到http api请求广播 extern const string kBroadcastHttpRequest; diff --git a/src/Record/MP4Recorder.cpp b/src/Record/MP4Recorder.cpp index 092258b4..e0332a56 100644 --- a/src/Record/MP4Recorder.cpp +++ b/src/Record/MP4Recorder.cpp @@ -107,7 +107,7 @@ void MP4Recorder::asyncClose() { auto info = _info; WorkThreadPool::Instance().getExecutor()->async([muxer,strFileTmp,strFile,info]() { //获取文件录制时间,放在关闭mp4之前是为了忽略关闭mp4执行时间 - const_cast(info).ui64TimeLen = ::time(NULL) - info.ui64StartedTime; + const_cast(info).ui64TimeLen = ::time(NULL) - info.ui64StartedTime; //关闭mp4非常耗时,所以要放在后台线程执行 const_cast(muxer).reset(); //临时文件名改成正式文件名,防止mp4未完成时被访问 @@ -115,7 +115,7 @@ void MP4Recorder::asyncClose() { //获取文件大小 struct stat fileData; stat(strFile.data(), &fileData); - const_cast(info).ui64FileSize = fileData.st_size; + const_cast(info).ui64FileSize = fileData.st_size; /////record 业务逻辑////// NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastRecordMP4,info); }); diff --git a/src/Record/MP4Recorder.h b/src/Record/MP4Recorder.h index 2d9e84e9..3204fb51 100644 --- a/src/Record/MP4Recorder.h +++ b/src/Record/MP4Recorder.h @@ -42,7 +42,7 @@ using namespace toolkit; namespace mediakit { -class Mp4Info { +class MP4Info { public: time_t ui64StartedTime; //GMT标准时间,单位秒 time_t ui64TimeLen;//录像长度,单位秒 @@ -55,13 +55,15 @@ public: string strStreamId;//流ID string strVhost;//vhost }; + class MP4Recorder : public MediaSinkInterface{ public: typedef std::shared_ptr Ptr; + MP4Recorder(const string &strPath, - const string &strVhost , - const string &strApp, - const string &strStreamId); + const string &strVhost, + const string &strApp, + const string &strStreamId); virtual ~MP4Recorder(); /** @@ -87,7 +89,7 @@ private: string _strFile; string _strFileTmp; Ticker _createFileTicker; - Mp4Info _info; + MP4Info _info; bool _haveVideo = false; MP4MuxerFile::Ptr _muxer; list _tracks; diff --git a/src/Record/Recorder.cpp b/src/Record/Recorder.cpp index cc73324c..0627d318 100644 --- a/src/Record/Recorder.cpp +++ b/src/Record/Recorder.cpp @@ -26,13 +26,6 @@ #include "Recorder.h" #include "Common/config.h" -#include "Http/HttpSession.h" -#include "Util/util.h" -#include "Util/mini.h" -#include "Network/sockutil.h" -#include "HlsMakerImp.h" -#include "Player/PlayerBase.h" -#include "Common/MediaSink.h" #include "MP4Recorder.h" #include "HlsRecorder.h" @@ -40,7 +33,7 @@ using namespace toolkit; namespace mediakit { -MediaSinkInterface *Recorder::createHlsRecorder(const string &strVhost_tmp, const string &strApp, const string &strId) { +MediaSinkInterface *createHlsRecorder(const string &strVhost_tmp, const string &strApp, const string &strId) { #if defined(ENABLE_HLS) GET_CONFIG(bool, enableVhost, General::kEnableVhost); GET_CONFIG(string, hlsPath, Hls::kFilePath); @@ -66,7 +59,7 @@ MediaSinkInterface *Recorder::createHlsRecorder(const string &strVhost_tmp, cons #endif //defined(ENABLE_HLS) } -MediaSinkInterface *Recorder::createMP4Recorder(const string &strVhost_tmp, const string &strApp, const string &strId) { +MediaSinkInterface *createMP4Recorder(const string &strVhost_tmp, const string &strApp, const string &strId) { #if defined(ENABLE_MP4RECORD) GET_CONFIG(bool, enableVhost, General::kEnableVhost); GET_CONFIG(string, recordPath, Record::kFilePath); @@ -91,5 +84,286 @@ MediaSinkInterface *Recorder::createMP4Recorder(const string &strVhost_tmp, cons #endif //defined(ENABLE_MP4RECORD) } +//////////////////////////////////////////////////////////////////////////////////////// + +class RecorderHelper { +public: + typedef std::shared_ptr Ptr; + + /** + * 构建函数 + * @param bContinueRecord false表明hls录制从头开始录制(意味着hls临时文件在媒体反注册时会被删除) + */ + RecorderHelper(const MediaSinkInterface::Ptr &recorder, vector &&tracks , bool bContinueRecord, const string &schema) { + _recorder = recorder; + _continueRecord = bContinueRecord; + _schema = schema; + attachTracks(std::move(tracks)); + } + + ~RecorderHelper() { + resetTracks(); + } + + // 附则于track上 + void attachTracks(vector &&tracks){ + if(isTracksSame(tracks)){ + return; + } + resetTracks(); + _tracks = std::move(tracks); + for (auto &track : _tracks) { + _recorder->addTrack(track); + track->addDelegate(_recorder); + } + } + + + // 判断新的tracks是否与之前的一致 + bool isTracksSame(const vector &tracks){ + if(tracks.size() != _tracks.size()) { + return false; + } + int i = 0; + for(auto &track : tracks){ + if(track != _tracks[i++]){ + return false; + } + } + return true; + } + + // 重置所有track + void resetTracks(){ + if(_tracks.empty()){ + return; + } + for (auto &track : _tracks) { + track->delDelegate(_recorder.get()); + } + _tracks.clear(); + _recorder->resetTracks(); + } + + // 返回false表明hls录制从头开始录制(意味着hls临时文件在媒体反注册时会被删除) + bool continueRecord(){ + return _continueRecord; + } + + bool isRecording() { + return !_tracks.empty(); + } + + const string &getSchema() const{ + return _schema; + } +private: + MediaSinkInterface::Ptr _recorder; + vector _tracks; + bool _continueRecord; + string _schema; +}; + + +template +class MediaSourceWatcher { +public: + static MediaSourceWatcher& Instance(){ + static MediaSourceWatcher instance; + return instance; + } + + Recorder::status getRecordStatus(const string &vhost, const string &app, const string &stream_id) { + return getRecordStatus_l(getRecorderKey(vhost, app, stream_id)); + } + + int startRecord(const string &vhost, const string &app, const string &stream_id, bool waitForRecord, bool continueRecord) { + auto key = getRecorderKey(vhost, app, stream_id); + lock_guard lck(_recorder_mtx); + if (getRecordStatus_l(key) != Recorder::status_not_record) { + // 已经在录制了 + return 0; + } + + string schema; + auto tracks = findTracks(vhost, app, stream_id,schema); + if (!waitForRecord && tracks.empty()) { + // 暂时无法开启录制 + return -1; + } + + auto recorder = MediaSinkInterface::Ptr(createRecorder(vhost, app, stream_id)); + if (!recorder) { + // 创建录制器失败 + return -2; + } + _recorder_map[key] = std::make_shared(recorder, std::move(tracks), continueRecord, schema); + return 0; + } + + void stopRecord(const string &vhost, const string &app, const string &stream_id) { + lock_guard lck(_recorder_mtx); + _recorder_map.erase(getRecorderKey(vhost, app, stream_id)); + } + +private: + MediaSourceWatcher(){ + NoticeCenter::Instance().addListener(this,Broadcast::kBroadcastMediaChanged,[this](BroadcastMediaChangedArgs){ + if(bRegist){ + onRegist(schema,vhost,app,stream,sender); + }else{ + onUnRegist(schema,vhost,app,stream,sender); + } + }); + NoticeCenter::Instance().addListener(this,Broadcast::kBroadcastMediaResetTracks,[this](BroadcastMediaResetTracksArgs){ + onRegist(schema,vhost,app,stream,sender); + }); + } + + ~MediaSourceWatcher(){ + NoticeCenter::Instance().delListener(this,Broadcast::kBroadcastMediaChanged); + NoticeCenter::Instance().delListener(this,Broadcast::kBroadcastMediaResetTracks); + } + + void onRegist(const string &schema,const string &vhost,const string &app,const string &stream,MediaSource &sender){ + auto key = getRecorderKey(vhost,app,stream); + lock_guard lck(_recorder_mtx); + auto it = _recorder_map.find(key); + if(it == _recorder_map.end()){ + // 录像记录不存在 + return; + } + + auto tracks = sender.getTracks(true); + if (tracks.empty()) { + // 无有效的tracks + return; + } + + auto &helper = it->second; + if(!helper){ + // 对象不存在,创建之 + auto recorder = MediaSinkInterface::Ptr(createRecorder(vhost, app, stream)); + if (recorder) { + _recorder_map[key] = std::make_shared(recorder, std::move(tracks), false, schema); + } + return; + } + + if(helper->getSchema() == schema){ + // 对象存在且绑定的协议一致,替换tracks + helper->attachTracks(std::move(tracks)); + } + + } + + void onUnRegist(const string &schema,const string &vhost,const string &app,const string &stream,MediaSource &sender){ + auto key = getRecorderKey(vhost,app,stream); + lock_guard lck(_recorder_mtx); + auto it = _recorder_map.find(key); + if(it == _recorder_map.end()){ + // 录像记录不存在 + return; + } + + if(!it->second){ + // 录像对象为空,已经停止录制 + return; + } + if(it->second->continueRecord()){ + // 如果可以继续录制,那么只重置tracks,不删除对象 + it->second->resetTracks(); + }else{ + // 删除对象(意味着可能删除hls临时文件) + it->second.reset(); + } + } + + Recorder::status getRecordStatus_l(const string &key) { + auto it = _recorder_map.find(key); + if (it == _recorder_map.end()) { + return Recorder::status_not_record; + } + if (it->second && it->second->isRecording()) { + return Recorder::status_recording; + } + + return Recorder::status_wait_record; + } + + // 查找MediaSource以便录制 + vector findTracks(const string &vhost, const string &app, const string &stream_id,string &schema) { + auto src = MediaSource::find(RTMP_SCHEMA, vhost, app, stream_id); + if (src) { + auto ret = src->getTracks(true); + if (!ret.empty()) { + schema = RTMP_SCHEMA; + return std::move(ret); + } + } + + src = MediaSource::find(RTSP_SCHEMA, vhost, app, stream_id); + if (src) { + schema = RTSP_SCHEMA; + return src->getTracks(true); + } + return vector(); + } + + string getRecorderKey(const string &vhost, const string &app, const string &stream_id) { + return vhost + "/" + app + "/" + stream_id; + } + + MediaSinkInterface *createRecorder(const string &vhost, const string &app, const string &stream_id) { + MediaSinkInterface *ret = nullptr; + switch (type) { + case Recorder::type_hls: + ret = createHlsRecorder(vhost, app, stream_id); + break; + case Recorder::type_mp4: + ret = createMP4Recorder(vhost, app, stream_id); + break; + default: + break; + } + if(!ret){ + WarnL << "can not recorder of: " << type; + } + return ret; + } +private: + recursive_mutex _recorder_mtx; + unordered_map _recorder_map; +}; + + +Recorder::status Recorder::getRecordStatus(Recorder::type type, const string &vhost, const string &app, const string &stream_id) { + switch (type){ + case type_mp4: + return MediaSourceWatcher::Instance().getRecordStatus(vhost,app,stream_id); + case type_hls: + return MediaSourceWatcher::Instance().getRecordStatus(vhost,app,stream_id); + } + return status_not_record; +} + +int Recorder::startRecord(Recorder::type type, const string &vhost, const string &app, const string &stream_id, bool waitForRecord, bool continueRecord) { + switch (type){ + case type_mp4: + return MediaSourceWatcher::Instance().startRecord(vhost,app,stream_id,waitForRecord,continueRecord); + case type_hls: + return MediaSourceWatcher::Instance().startRecord(vhost,app,stream_id,waitForRecord,continueRecord); + } + return -3; +} + +void Recorder::stopRecord(Recorder::type type, const string &vhost, const string &app, const string &stream_id) { + switch (type){ + case type_mp4: + return MediaSourceWatcher::Instance().stopRecord(vhost,app,stream_id); + case type_hls: + return MediaSourceWatcher::Instance().stopRecord(vhost,app,stream_id); + } +} } /* namespace mediakit */ diff --git a/src/Record/Recorder.h b/src/Record/Recorder.h index 63fdc147..b170e77a 100644 --- a/src/Record/Recorder.h +++ b/src/Record/Recorder.h @@ -37,8 +37,25 @@ class MediaSinkInterface; class Recorder{ public: - static MediaSinkInterface *createHlsRecorder(const string &strVhost, const string &strApp, const string &strId); - static MediaSinkInterface *createMP4Recorder(const string &strVhost, const string &strApp, const string &strId); + typedef enum { + // 未录制 + status_not_record = 0, + // 等待MediaSource注册,注册成功后立即开始录制 + status_wait_record = 1, + // MediaSource已注册,并且正在录制 + status_recording = 2, + } status; + + typedef enum { + // 录制hls + type_hls = 0, + // 录制MP4 + type_mp4 = 1 + } type; + + static status getRecordStatus(type type, const string &vhost, const string &app, const string &stream_id); + static int startRecord(type type, const string &vhost, const string &app, const string &stream_id,bool waitForRecord, bool continueRecord); + static void stopRecord(type type, const string &vhost, const string &app, const string &stream_id); private: Recorder() = delete; ~Recorder() = delete;