diff --git a/api/include/mk_events.h b/api/include/mk_events.h index e9a4b07a..c9373af3 100644 --- a/api/include/mk_events.h +++ b/api/include/mk_events.h @@ -166,8 +166,7 @@ typedef struct { void (API_CALL *on_mk_flow_report)(const mk_media_info url_info, uint64_t total_bytes, uint64_t total_seconds, - int is_player, - const mk_tcp_session sender); + int is_player); } mk_events; diff --git a/api/source/mk_events.cpp b/api/source/mk_events.cpp index d44413e7..5e693587 100644 --- a/api/source/mk_events.cpp +++ b/api/source/mk_events.cpp @@ -151,8 +151,7 @@ API_EXPORT void API_CALL mk_events_listen(const mk_events *events){ s_events.on_mk_flow_report((mk_media_info) &args, totalBytes, totalDuration, - isPlayer, - (mk_tcp_session) &sender); + isPlayer); } }); diff --git a/api/tests/server.c b/api/tests/server.c index 72afc1b0..85714d50 100644 --- a/api/tests/server.c +++ b/api/tests/server.c @@ -376,14 +376,14 @@ void API_CALL on_mk_shell_login(const char *user_name, void API_CALL on_mk_flow_report(const mk_media_info url_info, uint64_t total_bytes, uint64_t total_seconds, - int is_player, - const mk_tcp_session sender) { - log_printf(LOG_LEV,"client info, local: %s:%d, peer: %s:%d\n" + int is_player) { + log_printf(LOG_LEV,"%s/%s/%s/%s, url params: %s," "total_bytes: %d, total_seconds: %d, is_player: %d", - mk_tcp_session_local_ip(sender), - mk_tcp_session_local_port(sender), - mk_tcp_session_peer_ip(sender), - mk_tcp_session_peer_port(sender), + mk_media_info_get_schema(url_info), + mk_media_info_get_vhost(url_info), + mk_media_info_get_app(url_info), + mk_media_info_get_stream(url_info), + mk_media_info_get_params(url_info), (int)total_bytes, (int)total_seconds, (int)is_player); } diff --git a/server/WebHook.cpp b/server/WebHook.cpp index 87ab067f..058fee39 100644 --- a/server/WebHook.cpp +++ b/server/WebHook.cpp @@ -269,16 +269,20 @@ void installWebHook(){ }); NoticeCenter::Instance().addListener(nullptr,Broadcast::kBroadcastFlowReport,[](BroadcastFlowReportArgs){ - if(!hook_enable || args._param_strs == hook_adminparams || hook_flowreport.empty() || sender.get_peer_ip() == "127.0.0.1"){ + if(!hook_enable || args._param_strs == hook_adminparams || hook_flowreport.empty()){ return; } auto body = make_json(args); - body["ip"] = sender.get_peer_ip(); - body["port"] = sender.get_peer_port(); - body["id"] = sender.getIdentifier(); body["totalBytes"] = (Json::UInt64)totalBytes; body["duration"] = (Json::UInt64)totalDuration; body["player"] = isPlayer; + + body["schema"] = args._schema; + body["vhost"] = args._vhost; + body["app"] = args._app; + body["stream"] = args._streamid; + body["params"] = args._param_strs; + //执行hook do_http_hook(hook_flowreport,body, nullptr); }); diff --git a/src/Common/config.h b/src/Common/config.h index 53ad8e3c..7cadbf70 100644 --- a/src/Common/config.h +++ b/src/Common/config.h @@ -127,7 +127,7 @@ extern const string kBroadcastShellLogin; //停止rtsp/rtmp/http-flv会话后流量汇报事件广播 extern const string kBroadcastFlowReport; -#define BroadcastFlowReportArgs const MediaInfo &args,const uint64_t &totalBytes,const uint64_t &totalDuration,const bool &isPlayer,TcpSession &sender +#define BroadcastFlowReportArgs const MediaInfo &args,const uint64_t &totalBytes,const uint64_t &totalDuration,const bool &isPlayer //未找到流后会广播该事件,请在监听该事件后去拉流或其他方式产生流,这样就能按需拉流了 extern const string kBroadcastNotFoundStream; diff --git a/src/Http/HttpFileManager.cpp b/src/Http/HttpFileManager.cpp index 367e6dc1..430df39f 100644 --- a/src/Http/HttpFileManager.cpp +++ b/src/Http/HttpFileManager.cpp @@ -46,6 +46,7 @@ static const string kAccessErrKey = "kAccessErrKey"; static const string kAccessHls = "kAccessHls"; static const string kHlsSuffix = "/hls.m3u8"; static const string kHlsData = "kHlsData"; +static const string kHlsHaveFindMediaSource = "kHlsHaveFindMediaSource"; static const string &getContentType(const char *name) { const char *dot; @@ -306,6 +307,8 @@ static void canAccessPath(TcpSession &sender, const Parser &parser, const MediaI if(is_hls){ //hls相关信息 (*cookie)[kHlsData].set(mediaInfo); + //hls未查找MediaSource + (*cookie)[kHlsHaveFindMediaSource].set(false); } callback(errMsg, cookie); }else{ @@ -370,49 +373,64 @@ static void accessFile(TcpSession &sender, const Parser &parser, const MediaInfo weak_ptr weakSession = sender.shared_from_this(); //判断是否有权限访问该文件 canAccessPath(sender, parser, mediaInfo, false, [cb, strFile, parser, is_hls, mediaInfo, weakSession , file_exist](const string &errMsg, const HttpServerCookie::Ptr &cookie) { - if (!errMsg.empty()) { - //文件鉴权失败 - StrCaseMap headerOut; + auto strongSession = weakSession.lock(); + if(!strongSession){ + //http客户端已经断开,不需要回复 + return; + } + if (!errMsg.empty()) { + //文件鉴权失败 + StrCaseMap headerOut; + if (cookie) { + headerOut["Set-Cookie"] = cookie->getCookie((*cookie)[kCookiePathKey].get()); + } + cb("401 Unauthorized", "text/html", headerOut, std::make_shared(errMsg)); + return; + } + + auto response_file = [](const HttpServerCookie::Ptr &cookie, const HttpFileManager::invoker &cb, const string &strFile, const Parser &parser) { + StrCaseMap httpHeader; + if (cookie) { + httpHeader["Set-Cookie"] = cookie->getCookie((*cookie)[kCookiePathKey].get()); + } + HttpSession::HttpResponseInvoker invoker = [&](const string &codeOut, const StrCaseMap &headerOut, const HttpBody::Ptr &body) { if (cookie) { - headerOut["Set-Cookie"] = cookie->getCookie((*cookie)[kCookiePathKey].get()); + cookie->getLock(); + auto is_hls = (*cookie)[kAccessHls].get(); + if (is_hls) { + (*cookie)[kHlsData].get().addByteUsage(body->remainSize()); + } } - cb("401 Unauthorized", "text/html", headerOut, std::make_shared(errMsg)); + cb(codeOut.data(), getContentType(strFile.data()), headerOut, body); + }; + invoker.responseFile(parser.getValues(), httpHeader, strFile); + }; + + //如果程序未正常退出,会残余上次的hls文件,所以判断hls直播是否存在的关键不是文件存在与否 + //而是应该判断HlsMediaSource是否已注册,但是这样会每次获取m3u8文件时都会用MediaSource::findAsync判断一次 + //会导致程序性能低下,所以我们应该在cookie声明周期的第一次判断HlsMediaSource是否已经注册,后续通过文件存在与否判断 + if (!is_hls) { + //不是hls,直接回复文件或404 + response_file(cookie, cb, strFile, parser); + } else { + bool have_find_media_src = false; + if(cookie){ + have_find_media_src = (*cookie)[kHlsHaveFindMediaSource].get(); + if(!have_find_media_src){ + (*cookie)[kHlsHaveFindMediaSource].set(true); + } + } + if(have_find_media_src){ + //之前该cookie已经通过MediaSource::findAsync查找过了,所以现在只以文件系统查找结果为准 + response_file(cookie, cb, strFile, parser); return; } - - auto response_file = [](const HttpServerCookie::Ptr &cookie, const HttpFileManager::invoker &cb, const string &strFile, const Parser &parser) { - StrCaseMap httpHeader; - if (cookie) { - httpHeader["Set-Cookie"] = cookie->getCookie((*cookie)[kCookiePathKey].get()); - } - HttpSession::HttpResponseInvoker invoker = [&](const string &codeOut, const StrCaseMap &headerOut, const HttpBody::Ptr &body) { - if (cookie) { - cookie->getLock(); - auto is_hls = (*cookie)[kAccessHls].get(); - if (is_hls) { - (*cookie)[kHlsData].get().addByteUsage(body->remainSize()); - } - } - cb(codeOut.data(), getContentType(strFile.data()), headerOut, body); - }; - invoker.responseFile(parser.getValues(), httpHeader, strFile); - }; - - if (file_exist || !is_hls) { - //不是hls或者文件存在,直接回复文件或404 + //hls文件不存在,我们等待其生成并延后回复 + MediaSource::findAsync(mediaInfo, strongSession, [response_file, cookie, cb, strFile, parser](const MediaSource::Ptr &src) { + //hls已经生成或者超时后仍未生成,那么不管怎么样都返回客户端 response_file(cookie, cb, strFile, parser); - } else { - //hls文件不存在,我们等待其生成并延后回复 - auto strongSession = weakSession.lock(); - if(!strongSession){ - //http客户端已经断开,不需要回复 - return; - } - MediaSource::findAsync(mediaInfo, strongSession, [response_file, cookie, cb, strFile, parser](const MediaSource::Ptr &src) { - //hls已经生成或者超时后仍未生成,那么不管怎么样都返回客户端 - response_file(cookie, cb, strFile, parser); - }); - } + }); + } }); } diff --git a/src/Http/HttpSession.cpp b/src/Http/HttpSession.cpp index 4b8daf0d..320e7338 100644 --- a/src/Http/HttpSession.cpp +++ b/src/Http/HttpSession.cpp @@ -104,7 +104,7 @@ void HttpSession::onRecv(const Buffer::Ptr &pBuf) { void HttpSession::onError(const SockException& err) { if(_is_flv_stream){ //flv播放器 - WarnP(this) << "播放器(" + WarnP(this) << "FLV播放器(" << _mediaInfo._vhost << "/" << _mediaInfo._app << "/" << _mediaInfo._streamid @@ -112,12 +112,7 @@ void HttpSession::onError(const SockException& err) { GET_CONFIG(uint32_t,iFlowThreshold,General::kFlowThreshold); if(_ui64TotalBytes > iFlowThreshold * 1024){ - NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, - _mediaInfo, - _ui64TotalBytes, - _ticker.createdTime()/1000, - true, - *this); + NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _mediaInfo, _ui64TotalBytes, _ticker.createdTime()/1000, true); } return; } diff --git a/src/Record/HlsMediaSource.cpp b/src/Record/HlsMediaSource.cpp index e5b0af23..9ff3f010 100644 --- a/src/Record/HlsMediaSource.cpp +++ b/src/Record/HlsMediaSource.cpp @@ -42,21 +42,27 @@ void HlsCookieData::addReaderCount(){ _src = src; } } - } HlsCookieData::~HlsCookieData() { - if(_added){ + if (_added) { auto src = _src.lock(); - if(src){ + if (src) { src->modifyReaderCount(false); } + auto duration = (_ticker.createdTime() - _ticker.elapsedTime()) / 1000; + WarnL << "HLS播放器(" << _info._vhost << "/" << _info._app << "/" << _info._streamid << ")断开,播放时间:" << duration; + GET_CONFIG(uint32_t, iFlowThreshold, General::kFlowThreshold); + if (_bytes > iFlowThreshold * 1024) { + NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _info, _bytes, duration, true); + } } } void HlsCookieData::addByteUsage(uint64_t bytes) { addReaderCount(); _bytes += bytes; + _ticker.resetTime(); } diff --git a/src/Record/HlsMediaSource.h b/src/Record/HlsMediaSource.h index a402b1cb..760676e1 100644 --- a/src/Record/HlsMediaSource.h +++ b/src/Record/HlsMediaSource.h @@ -27,6 +27,7 @@ #define ZLMEDIAKIT_HLSMEDIASOURCE_H #include +#include "Util/TimeTicker.h" #include "Common/MediaSource.h" namespace mediakit{ @@ -43,6 +44,7 @@ private: MediaInfo _info; bool _added = false; weak_ptr _src; + Ticker _ticker; }; class HlsMediaSource : public MediaSource { diff --git a/src/Rtmp/RtmpSession.cpp b/src/Rtmp/RtmpSession.cpp index 42518016..e7e93688 100644 --- a/src/Rtmp/RtmpSession.cpp +++ b/src/Rtmp/RtmpSession.cpp @@ -45,7 +45,7 @@ RtmpSession::~RtmpSession() { void RtmpSession::onError(const SockException& err) { bool isPlayer = !_pPublisherSrc; - WarnP(this) << (isPlayer ? "播放器(" : "推流器(") + WarnP(this) << (isPlayer ? "RTMP播放器(" : "RTMP推流器(") << _mediaInfo._vhost << "/" << _mediaInfo._app << "/" << _mediaInfo._streamid @@ -55,12 +55,7 @@ void RtmpSession::onError(const SockException& err) { GET_CONFIG(uint32_t,iFlowThreshold,General::kFlowThreshold); if(_ui64TotalBytes > iFlowThreshold * 1024){ - NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, - _mediaInfo, - _ui64TotalBytes, - _ticker.createdTime()/1000, - isPlayer, - *this); + NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _mediaInfo, _ui64TotalBytes, _ticker.createdTime()/1000, isPlayer); } } diff --git a/src/Rtsp/RtspSession.cpp b/src/Rtsp/RtspSession.cpp index 7d874cd8..b9a3c424 100644 --- a/src/Rtsp/RtspSession.cpp +++ b/src/Rtsp/RtspSession.cpp @@ -86,7 +86,7 @@ RtspSession::~RtspSession() { void RtspSession::onError(const SockException& err) { bool isPlayer = !_pushSrc; - WarnP(this) << (isPlayer ? "播放器(" : "推流器(") + WarnP(this) << (isPlayer ? "RTSP播放器(" : "RTSP推流器(") << _mediaInfo._vhost << "/" << _mediaInfo._app << "/" << _mediaInfo._streamid @@ -106,12 +106,7 @@ void RtspSession::onError(const SockException& err) { //流量统计事件广播 GET_CONFIG(uint32_t,iFlowThreshold,General::kFlowThreshold); if(_ui64TotalBytes > iFlowThreshold * 1024){ - NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, - _mediaInfo, - _ui64TotalBytes, - _ticker.createdTime()/1000, - isPlayer, - *this); + NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _mediaInfo, _ui64TotalBytes, _ticker.createdTime()/1000, isPlayer); } }