添加流量统计广播、整理广播参数类型

This commit is contained in:
xiongziliang 2018-02-06 15:28:27 +08:00
parent fa5f90599c
commit 991715fc93
11 changed files with 79 additions and 39 deletions

View File

@ -82,10 +82,10 @@ bool MediaSource::regist() {
InfoL << m_strSchema << " " << m_strVhost << " " << m_strApp << " " << m_strId;
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaChanged,
true,
m_strSchema.data(),
m_strVhost.data(),
m_strApp.data(),
m_strId.data());
m_strSchema,
m_strVhost,
m_strApp,
m_strId);
}
return success;
}
@ -111,10 +111,10 @@ void MediaSource::unregisted(){
InfoL << "" << m_strSchema << " " << m_strVhost << " " << m_strApp << " " << m_strId;
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaChanged,
false,
m_strSchema.data(),
m_strVhost.data(),
m_strApp.data(),
m_strId.data());
m_strSchema,
m_strVhost,
m_strApp,
m_strId);
}
void MediaInfo::parse(const string &url){

View File

@ -54,6 +54,11 @@ const char kBroadcastOnGetRtspRealm[] = "kBroadcastOnGetRtspRealm";
const char kBroadcastOnRtspAuth[] = "kBroadcastOnRtspAuth";
const char kBroadcastMediaPlayed[] = "kBroadcastMediaPlayed";
const char kBroadcastRtmpPublish[] = "kBroadcastRtmpPublish";
const char kBroadcastFlowReport[] = "kBroadcastFlowReport";
const char kFlowThreshold[] = "Broadcast.flowThreshold";
onceToken token([](){
mINI::Instance()[kFlowThreshold] = 1024;
},nullptr);
} //namespace Broadcast
//代理失败最大重试次数

View File

@ -65,7 +65,7 @@ namespace Broadcast {
//注册或反注册MediaSource事件广播
extern const char kBroadcastMediaChanged[];
#define BroadcastMediaChangedArgs bool bRegist, const char *schema,const char *vhost,const char *app,const char *stream
#define BroadcastMediaChangedArgs const bool &bRegist, const string &schema,const string &vhost,const string &app,const string &stream
//录制mp4文件成功后广播
extern const char kBroadcastRecordMP4[];
@ -73,17 +73,16 @@ extern const char kBroadcastRecordMP4[];
//收到http api请求广播
extern const char kBroadcastHttpRequest[];
#define BroadcastHttpRequestArgs const Parser &parser,HttpSession::HttpResponseInvoker &invoker,bool &consumed
#define BroadcastHttpRequestArgs const Parser &parser,const HttpSession::HttpResponseInvoker &invoker,bool &consumed
//该流是否需要认证是的话调用invoker并传入realm,否则传入空的realm.如果该事件不监听则不认证
extern const char kBroadcastOnGetRtspRealm[];
#define BroadcastOnGetRtspRealmArgs const char *app,const char *stream,const RtspSession::onGetRealm &invoker
#define BroadcastOnGetRtspRealmArgs const string &app,const string &stream,const RtspSession::onGetRealm &invoker
//请求认证用户密码事件user_name为用户名must_no_encrypt如果为true则必须提供明文密码(因为此时是base64认证方式),否则会导致认证失败
//获取到密码后请调用invoker并输入对应类型的密码和密码类型invoker执行时会匹配密码
extern const char kBroadcastOnRtspAuth[];
#define BroadcastOnRtspAuthArgs const char *user_name,bool must_no_encrypt,const RtspSession::onAuth &invoker
#define BroadcastOnRtspAuthArgs const string &user_name,const bool &must_no_encrypt,const RtspSession::onAuth &invoker
//鉴权结果回调对象
typedef std::function<void(bool success, const string &errMessage)> AuthInvoker;
@ -92,10 +91,16 @@ typedef std::function<void(bool success, const string &errMessage)> AuthInvoker;
extern const char kBroadcastRtmpPublish[];
#define BroadcastRtmpPublishArgs const MediaInfo &args,const Broadcast::AuthInvoker &invoker
//播放rtsp或rtmp事件广播,通过该事件控制播放鉴权
//播放rtsp/rtmp/http-flv事件广播,通过该事件控制播放鉴权
extern const char kBroadcastMediaPlayed[];
#define BroadcastMediaPlayedArgs const MediaInfo &args,const Broadcast::AuthInvoker &invoker
//停止rtsp/rtmp/http-flv会话后流量汇报事件广播
extern const char kBroadcastFlowReport[];
#define BroadcastFlowReportArgs const MediaInfo &args,const uint64_t &totalBytes
//流量汇报事件流量阈值,单位KB默认1MB
extern const char kFlowThreshold[];
} //namespace Broadcast
//代理失败最大重试次数

View File

@ -83,7 +83,7 @@ void HttpClient::sendRequest(const string &strUrl){
_isHttps = isHttps;
if(!alive() || bChanged){
InfoL << "reconnet:" << _lastHost;
//InfoL << "reconnet:" << _lastHost;
startConnect(host, port);
}else{
SockException ex;

View File

@ -161,6 +161,10 @@ inline HttpSession::HttpCode HttpSession::parserHttpReq(const string &str) {
}
void HttpSession::onError(const SockException& err) {
//WarnL << err.what();
static uint64_t iFlowThreshold = mINI::Instance()[Broadcast::kFlowThreshold];
if(m_previousTagSize > iFlowThreshold * 1024){
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport,m_mediaInfo,m_previousTagSize);
}
}
void HttpSession::onManager() {
@ -185,10 +189,10 @@ inline bool HttpSession::checkLiveFlvStream(){
}
//拼接成完整url
auto fullUrl = string(HTTP_SCHEMA) + "://" + m_parser["Host"] + m_parser.FullUrl();
MediaInfo info(fullUrl);
info.m_streamid.erase(info.m_streamid.size() - 4);//去除.flv后缀
m_mediaInfo.parse(fullUrl);
m_mediaInfo.m_streamid.erase(m_mediaInfo.m_streamid.size() - 4);//去除.flv后缀
auto mediaSrc = dynamic_pointer_cast<RtmpMediaSource>(MediaSource::find(RTMP_SCHEMA,info.m_vhost,info.m_app,info.m_streamid));
auto mediaSrc = dynamic_pointer_cast<RtmpMediaSource>(MediaSource::find(RTMP_SCHEMA,m_mediaInfo.m_vhost,m_mediaInfo.m_app,m_mediaInfo.m_streamid));
if(!mediaSrc){
//该rtmp源不存在
sendNotFound(true);
@ -288,7 +292,7 @@ inline bool HttpSession::checkLiveFlvStream(){
onRes(authSuccess,err);
});
};
auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed,info,invoker);
auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed,m_mediaInfo,invoker);
if(!flag){
//该事件无人监听,默认不鉴权
onRes(true,"");
@ -306,9 +310,9 @@ inline HttpSession::HttpCode HttpSession::Handle_Req_GET() {
//事件未被拦截则认为是http下载请求
auto fullUrl = string(HTTP_SCHEMA) + "://" + m_parser["Host"] + m_parser.FullUrl();
MediaInfo info(fullUrl);
m_mediaInfo.parse(fullUrl);
string strFile = m_strPath + "/" + info.m_vhost + m_parser.Url();
string strFile = m_strPath + "/" + m_mediaInfo.m_vhost + m_parser.Url();
/////////////HTTP连接是否需要被关闭////////////////
static uint32_t reqCnt = mINI::Instance()[Config::Http::kMaxReqCount].as<uint32_t>();
bool bClose = (strcasecmp(m_parser["Connection"].data(),"close") == 0) && ( ++m_iReqCnt < reqCnt);
@ -317,7 +321,7 @@ inline HttpSession::HttpCode HttpSession::Handle_Req_GET() {
if (strFile.back() == '/') {
//生成文件夹菜单索引
string strMeun;
if (!makeMeun(strFile,info.m_vhost, strMeun)) {
if (!makeMeun(strFile,m_mediaInfo.m_vhost, strMeun)) {
//文件夹不存在
sendNotFound(bClose);
return eHttpCode;
@ -600,7 +604,7 @@ inline bool HttpSession::emitHttpEvent(bool doInvoke){
};
///////////////////广播HTTP事件///////////////////////////
bool consumed = false;//该事件是否被消费
NoticeCenter::Instance().emitEvent(Config::Broadcast::kBroadcastHttpRequest,m_parser,invoker,(bool &)consumed);
NoticeCenter::Instance().emitEvent(Config::Broadcast::kBroadcastHttpRequest,m_parser,invoker,consumed);
if(!consumed && doInvoke){
//该事件无人消费所以返回404
invoker("404 Not Found",KeyValue(),"");

View File

@ -76,7 +76,9 @@ private:
//flv over http
uint32_t m_aui32FirstStamp[2] = {0};
uint32_t m_previousTagSize = 0;
MediaInfo m_mediaInfo;
RingBuffer<RtmpPacket::Ptr>::RingReader::Ptr m_pRingReader;
void onSendMedia(const RtmpPacket::Ptr &pkt);
void sendRtmp(const RtmpPacket::Ptr &pkt, uint32_t ui32TimeStamp);
void sendRtmp(uint8_t ui8Type, const std::string& strBuf, uint32_t ui32TimeStamp);

View File

@ -219,7 +219,7 @@ void Mp4Maker::closeFile() {
stat(m_strFile.data(), &fileData);
m_info.ui64FileSize = fileData.st_size;
//----record 业务逻辑----//
NoticeCenter::Instance().emitEvent(Config::Broadcast::kBroadcastRecordMP4,(const Mp4Info &)m_info);
NoticeCenter::Instance().emitEvent(Config::Broadcast::kBroadcastRecordMP4,m_info);
}
}

View File

@ -52,9 +52,12 @@ RtmpSession::~RtmpSession() {
void RtmpSession::onError(const SockException& err) {
DebugL << err.what();
if (m_pPublisherSrc) {
m_pPublisherSrc.reset();
}
//流量统计事件广播
static uint64_t iFlowThreshold = mINI::Instance()[Broadcast::kFlowThreshold];
if(m_ui64TotalBytes > iFlowThreshold * 1024){
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport,m_mediaInfo,m_ui64TotalBytes);
}
}
void RtmpSession::onManager() {
@ -76,6 +79,7 @@ void RtmpSession::onManager() {
void RtmpSession::onRecv(const Socket::Buffer::Ptr &pBuf) {
m_ticker.resetTime();
try {
m_ui64TotalBytes += pBuf->size();
onParseRtmp(pBuf->data(), pBuf->size());
} catch (exception &e) {
WarnL << e.what();

View File

@ -65,7 +65,8 @@ private:
bool m_bPublisherSrcRegisted = false;
std::weak_ptr<RtmpMediaSource> m_pPlayerSrc;
uint32_t m_aui32FirstStamp[2] = {0};
//消耗的总流量
uint64_t m_ui64TotalBytes = 0;
void onProcessCmd(AMFDecoder &dec);
void onCmd_connect(AMFDecoder &dec);
void onCmd_createStream(AMFDecoder &dec);
@ -82,10 +83,12 @@ private:
void onSendMedia(const RtmpPacket::Ptr &pkt);
void onSendRawData(const char *pcRawData,int iSize) override{
m_ui64TotalBytes += iSize;
send(pcRawData, iSize);
}
void onSendRawData(const Socket::Buffer::Ptr &buffer,int flags) override{
sock->send(buffer,flags);
m_ui64TotalBytes += buffer->size();
sock->send(buffer,flags);
}
void onRtmpChunk(RtmpPacket &chunkData) override;

View File

@ -125,6 +125,12 @@ void RtspSession::onError(const SockException& err) {
g_mapPostter.emplace(this, dynamic_pointer_cast<RtspSession>(shared_from_this()));
TraceL << "quickTime will not send request any more!";
}
//流量统计事件广播
static uint64_t iFlowThreshold = mINI::Instance()[Broadcast::kFlowThreshold];
if(m_ui64TotalBytes > iFlowThreshold * 1024){
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport,m_mediaInfo,m_ui64TotalBytes);
}
}
void RtspSession::onManager() {
@ -144,9 +150,11 @@ void RtspSession::onManager() {
void RtspSession::onRecv(const Socket::Buffer::Ptr &pBuf) {
m_ticker.resetTime();
char tmp[2 * 1024];
m_pcBuf = tmp;
if (m_bBase64need) {
char tmp[2 * 1024];
m_pcBuf = tmp;
m_ui64TotalBytes += pBuf->size();
if (m_bBase64need) {
//quicktime 加密后的rtsp请求需要解密
av_base64_decode((uint8_t *) m_pcBuf, pBuf->data(), sizeof(tmp));
m_parser.Parse(m_pcBuf); //rtsp请求解析
@ -217,8 +225,8 @@ bool RtspSession::handleReq_Describe() {
//广播是否需要认证事件
if(!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnGetRtspRealm,
m_mediaInfo.m_app.data(),
m_mediaInfo.m_streamid.data(),
m_mediaInfo.m_app,
m_mediaInfo.m_streamid,
invoker)){
//无人监听此事件,说明无需认证
invoker("");
@ -322,7 +330,7 @@ void RtspSession::onAuthBasic(const weak_ptr<RtspSession> &weakSelf,const string
};
//此时必须提供明文密码
bool must_no_encrypt = true;
if(!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnRtspAuth,user.data(),must_no_encrypt,invoker)){
if(!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnRtspAuth,user,must_no_encrypt,invoker)){
//表明该流需要认证却没监听请求密码事件,这一般是大意的程序所为,警告之
WarnL << "请监听kBroadcastOnRtspAuth事件";
//但是我们还是忽略认证以便完成播放
@ -404,7 +412,7 @@ void RtspSession::onAuthDigest(const weak_ptr<RtspSession> &weakSelf,const strin
//此时可以提供明文或md5加密的密码
bool must_no_encrypt = false;
if(!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnRtspAuth,username.data(),must_no_encrypt,invoker)){
if(!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnRtspAuth,username,must_no_encrypt,invoker)){
//表明该流需要认证却没监听请求密码事件,这一般是大意的程序所为,警告之
WarnL << "请监听kBroadcastOnRtspAuth事件";
//但是我们还是忽略认证以便完成播放
@ -908,6 +916,7 @@ inline void RtspSession::sendRtpPacket(const RtpPacket::Ptr & pkt) {
return;
}
BufferRtp::Ptr buffer(new BufferRtp(pkt,4));
m_ui64TotalBytes += buffer->size();
pSock->send(buffer,SOCKET_DEFAULE_FLAGS, peerAddr.get());
}
break;

View File

@ -83,16 +83,20 @@ public:
private:
typedef bool (RtspSession::*rtspCMDHandle)();
int send(const string &strBuf) override {
m_ui64TotalBytes += strBuf.size();
return m_pSender->send(strBuf);
}
int send(string &&strBuf) override {
return m_pSender->send(std::move(strBuf));
m_ui64TotalBytes += strBuf.size();
return m_pSender->send(std::move(strBuf));
}
int send(const char *pcBuf, int iSize) override {
return m_pSender->send(pcBuf, iSize);
m_ui64TotalBytes += iSize;
return m_pSender->send(pcBuf, iSize);
}
int send(const Socket::Buffer::Ptr &pkt) override{
return m_pSender->send(pkt,SOCKET_DEFAULE_FLAGS | FLAG_MORE);
m_ui64TotalBytes += pkt->size();
return m_pSender->send(pkt,SOCKET_DEFAULE_FLAGS | FLAG_MORE);
}
void shutdown() override;
bool handleReq_Options(); //处理options方法
@ -185,6 +189,10 @@ private:
//quicktime 请求rtsp会产生两次tcp连接
//一次发送 get 一次发送post需要通过sessioncookie关联起来
string m_strSessionCookie;
//消耗的总流量
uint64_t m_ui64TotalBytes = 0;
static recursive_mutex g_mtxGetter; //对quicktime上锁保护
static recursive_mutex g_mtxPostter; //对quicktime上锁保护
static unordered_map<string, weak_ptr<RtspSession> > g_mapGetter;