diff --git a/README.md b/README.md index 38c1c4c3..1702e4d5 100644 --- a/README.md +++ b/README.md @@ -118,6 +118,7 @@ - 支持FFmpeg拉流代理任意格式的流 - 支持http api生成并返回实时截图 - 支持按需解复用、转协议,当有人观看时才开启转协议 + - 支持溯源模式的集群部署,溯源方式支持rtsp/rtmp/hls/http-ts ## 编译以及测试 diff --git a/conf/config.ini b/conf/config.ini index c064e21b..77441f9f 100644 --- a/conf/config.ini +++ b/conf/config.ini @@ -153,6 +153,16 @@ timeoutSec=10 #keepalive hook触发间隔,单位秒,float类型 alive_interval=10.0 +[cluster] +#设置源站拉流url模板, 格式跟printf类似,第一个%s指定app,第二个%s指定stream_id, +#开启集群模式后,on_stream_not_found和on_stream_none_reader hook将无效. +#溯源模式支持以下类型: +#rtmp方式: rtmp://127.0.0.1:1935/%s/%s +#rtsp方式: rtsp://127.0.0.1:554/%s/%s +#hls方式: http://127.0.0.1:80/%s/%s/hls.m3u8 +#http-ts方式: http://127.0.0.1:80/%s/%s.live.ts +origin_url= + [http] #http服务器字符编码,windows上默认gb2312 charSet=utf-8 diff --git a/server/WebApi.cpp b/server/WebApi.cpp index 581fcc9b..e9fce6f9 100755 --- a/server/WebApi.cpp +++ b/server/WebApi.cpp @@ -446,6 +446,45 @@ void getStatisticJson(const function &cb) { #endif } +void addStreamProxy(const string &vhost, const string &app, const string &stream, const string &url, int retry_count, + bool enable_hls, bool enable_mp4, int rtp_type, float timeout_sec, + const function &cb) { + auto key = getProxyKey(vhost, app, stream); + lock_guard lck(s_proxyMapMtx); + if (s_proxyMap.find(key) != s_proxyMap.end()) { + //已经在拉流了 + cb(SockException(Err_success), key); + return; + } + //添加拉流代理 + auto player = std::make_shared(vhost, app, stream, enable_hls, enable_mp4, retry_count ? retry_count : -1); + s_proxyMap[key] = player; + + //指定RTP over TCP(播放rtsp时有效) + (*player)[kRtpType] = rtp_type; + + if (timeout_sec > 0.1) { + //播放握手超时时间 + (*player)[kTimeoutMS] = timeout_sec * 1000; + } + + //开始播放,如果播放失败或者播放中止,将会自动重试若干次,默认一直重试 + player->setPlayCallbackOnce([cb, key](const SockException &ex) { + if (ex) { + lock_guard lck(s_proxyMapMtx); + s_proxyMap.erase(key); + } + cb(ex, key); + }); + + //被主动关闭拉流 + player->setOnClose([key](const SockException &ex) { + lock_guard lck(s_proxyMapMtx); + s_proxyMap.erase(key); + }); + player->play(url); +}; + /** * 安装api接口 * 所有api都支持GET和POST两种方式 @@ -861,52 +900,6 @@ void installWebApi() { val["data"]["flag"] = s_proxyPusherMap.erase(allArgs["key"]) == 1; }); - static auto addStreamProxy = [](const string &vhost, - const string &app, - const string &stream, - const string &url, - int retry_count, - bool enable_hls, - bool enable_mp4, - int rtp_type, - float timeout_sec, - const function &cb){ - auto key = getProxyKey(vhost,app,stream); - lock_guard lck(s_proxyMapMtx); - if(s_proxyMap.find(key) != s_proxyMap.end()){ - //已经在拉流了 - cb(SockException(Err_success),key); - return; - } - //添加拉流代理 - PlayerProxy::Ptr player(new PlayerProxy(vhost, app, stream, enable_hls, enable_mp4, retry_count ? retry_count : -1)); - s_proxyMap[key] = player; - - //指定RTP over TCP(播放rtsp时有效) - (*player)[kRtpType] = rtp_type; - - if (timeout_sec > 0.1) { - //播放握手超时时间 - (*player)[kTimeoutMS] = timeout_sec * 1000; - } - - //开始播放,如果播放失败或者播放中止,将会自动重试若干次,默认一直重试 - player->setPlayCallbackOnce([cb,key](const SockException &ex){ - if(ex){ - lock_guard lck(s_proxyMapMtx); - s_proxyMap.erase(key); - } - cb(ex,key); - }); - - //被主动关闭拉流 - player->setOnClose([key](const SockException &ex){ - lock_guard lck(s_proxyMapMtx); - s_proxyMap.erase(key); - }); - player->play(url); - }; - //动态添加rtsp/rtmp拉流代理 //测试url http://127.0.0.1/index/api/addStreamProxy?vhost=__defaultVhost__&app=proxy&enable_rtsp=1&enable_rtmp=1&stream=0&url=rtmp://127.0.0.1/live/obs api_regist("/index/api/addStreamProxy",[](API_ARGS_MAP_ASYNC){ diff --git a/server/WebApi.h b/server/WebApi.h index cc02535c..10d2a9a2 100755 --- a/server/WebApi.h +++ b/server/WebApi.h @@ -236,4 +236,7 @@ void installWebApi(); void unInstallWebApi(); Value makeMediaSourceJson(MediaSource &media); void getStatisticJson(const function &cb); +void addStreamProxy(const string &vhost, const string &app, const string &stream, const string &url, int retry_count, + bool enable_hls, bool enable_mp4, int rtp_type, float timeout_sec, + const function &cb); #endif //ZLMEDIAKIT_WEBAPI_H diff --git a/server/WebHook.cpp b/server/WebHook.cpp index 497bfc66..451c46c1 100755 --- a/server/WebHook.cpp +++ b/server/WebHook.cpp @@ -69,9 +69,16 @@ onceToken token([](){ },nullptr); }//namespace Hook +namespace Cluster { +#define CLUSTER_FIELD "cluster." +const string kOriginUrl = CLUSTER_FIELD "origin_url"; +static onceToken token([]() { + mINI::Instance()[kOriginUrl] = ""; +}); -static void parse_http_response(const SockException &ex, - const Parser &res, +}//namespace Cluster + +static void parse_http_response(const SockException &ex, const Parser &res, const function &fun){ if (ex) { auto errStr = StrPrinter << "[network err]:" << ex.what() << endl; @@ -358,12 +365,41 @@ void installWebHook(){ do_http_hook(hook_stream_chaned,body, nullptr); }); + static auto getPullUrl = [](const string &origin_fmt, const MediaInfo &info) -> string { + char url[1024] = { 0 }; + if (origin_fmt.size() > snprintf(url, sizeof(url), origin_fmt.data(), info._app.data(), info._streamid.data())) { + WarnL << "get origin url failed, origin_fmt:" << origin_fmt; + return ""; + } + + return string(url) + '?' + VHOST_KEY + '=' + info._vhost + '&' + info._param_strs; + }; + //监听播放失败(未找到特定的流)事件 - NoticeCenter::Instance().addListener(nullptr,Broadcast::kBroadcastNotFoundStream,[](BroadcastNotFoundStreamArgs){ - GET_CONFIG(string,hook_stream_not_found,Hook::kOnStreamNotFound); - if(!hook_enable || hook_stream_not_found.empty()){ + NoticeCenter::Instance().addListener(nullptr, Broadcast::kBroadcastNotFoundStream, [](BroadcastNotFoundStreamArgs) { + GET_CONFIG(string, origin_url, Cluster::kOriginUrl); + if (!origin_url.empty()) { + //设置了源站 + auto url = getPullUrl(origin_url, args); + if (url.empty()) { + closePlayer(); + return; + } + InfoL << "start pull stream from origin:" << url; + GET_CONFIG(float, hook_timeout_sec, Hook::kTimeoutSec); + addStreamProxy(args._vhost, args._app, args._streamid, url, -1, args._schema == HLS_SCHEMA, false, + Rtsp::RTP_TCP, hook_timeout_sec, [closePlayer](const SockException &ex, const string &key) { + if (ex) { + closePlayer(); + } + }); + return; + } + + GET_CONFIG(string, hook_stream_not_found, Hook::kOnStreamNotFound); + if (!hook_enable || hook_stream_not_found.empty()) { //如果确定这个流不存在,可以closePlayer()返回播放器流不存在 - //closePlayer(); + // closePlayer(); return; } auto body = make_json(args); @@ -371,7 +407,7 @@ void installWebHook(){ body["port"] = sender.get_peer_port(); body["id"] = sender.getIdentifier(); //执行hook - do_http_hook(hook_stream_not_found,body, nullptr); + do_http_hook(hook_stream_not_found, body, nullptr); }); static auto getRecordInfo = [](const RecordInfo &info) { @@ -429,7 +465,13 @@ void installWebHook(){ }); }); - NoticeCenter::Instance().addListener(nullptr,Broadcast::kBroadcastStreamNoneReader,[](BroadcastStreamNoneReaderArgs){ + NoticeCenter::Instance().addListener(nullptr,Broadcast::kBroadcastStreamNoneReader,[](BroadcastStreamNoneReaderArgs) { + GET_CONFIG(string, origin_url, Cluster::kOriginUrl); + if (!origin_url.empty()) { + sender.close(false); + WarnL << "无人观看主动关闭流:" << sender.getOriginUrl(); + return; + } GET_CONFIG(string,hook_stream_none_reader,Hook::kOnStreamNoneReader); if(!hook_enable || hook_stream_none_reader.empty()){ return; @@ -449,11 +491,7 @@ void installWebHook(){ return; } strongSrc->close(false); - WarnL << "无人观看主动关闭流:" - << strongSrc->getSchema() << "/" - << strongSrc->getVhost() << "/" - << strongSrc->getApp() << "/" - << strongSrc->getId(); + WarnL << "无人观看主动关闭流:" << strongSrc->getOriginUrl(); }); });