diff --git a/3rdpart/ZLToolKit b/3rdpart/ZLToolKit index 07d21ac6..ebd96d98 160000 --- a/3rdpart/ZLToolKit +++ b/3rdpart/ZLToolKit @@ -1 +1 @@ -Subproject commit 07d21ac61be6c7a4eba90a5d2d26b15daa882cf7 +Subproject commit ebd96d983d8dd3268e3e77ed08fb57d67666061c diff --git a/src/Rtp/RtpProcess.cpp b/src/Rtp/RtpProcess.cpp index 784e7b8e..b0f23721 100644 --- a/src/Rtp/RtpProcess.cpp +++ b/src/Rtp/RtpProcess.cpp @@ -68,13 +68,11 @@ RtpProcess::RtpProcess(uint32_t ssrc) { _track->_samplerate = 90000; _track->_type = TrackVideo; _track->_ssrc = _ssrc; - DebugL << printSSRC(_ssrc); - GET_CONFIG(bool,toRtxp,General::kPublishToRtxp); - GET_CONFIG(bool,toHls,General::kPublishToHls); - GET_CONFIG(bool,toMP4,General::kPublishToMP4); - - _muxer = std::make_shared(DEFAULT_VHOST,RTP_APP_NAME,printSSRC(_ssrc),0,toRtxp,toRtxp,toHls,toMP4); + _media_info._schema = RTP_APP_NAME; + _media_info._vhost = DEFAULT_VHOST; + _media_info._app = RTP_APP_NAME; + _media_info._streamid = printSSRC(_ssrc); GET_CONFIG(string,dump_dir,RtpProxy::kDumpDir); { @@ -107,11 +105,22 @@ RtpProcess::RtpProcess(uint32_t ssrc) { } RtpProcess::~RtpProcess() { - if(_addr){ - DebugL << printSSRC(_ssrc) << " " << printAddress(_addr); + DebugP(this); + if (_addr) { delete _addr; - }else{ - DebugL << printSSRC(_ssrc); + } + + uint64_t duration = (_last_rtp_time.createdTime() - _last_rtp_time.elapsedTime()) / 1000; + WarnP(this) << "RTP推流器(" + << _media_info._vhost << "/" + << _media_info._app << "/" + << _media_info._streamid + << ")断开,耗时(s):" << duration; + + //流量统计事件广播 + GET_CONFIG(uint32_t, iFlowThreshold, General::kFlowThreshold); + if (_ui64TotalBytes > iFlowThreshold * 1024) { + NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, _ui64TotalBytes, duration, false, static_cast(*this)); } } @@ -121,14 +130,22 @@ bool RtpProcess::inputRtp(const char *data, int data_len,const struct sockaddr * if(!_addr){ _addr = new struct sockaddr; memcpy(_addr,addr, sizeof(struct sockaddr)); - DebugL << "RtpProcess(" << printSSRC(_ssrc) << ") bind to address:" << printAddress(_addr); + DebugP(this) << "bind to address:" << printAddress(_addr); + //推流鉴权 + emitOnPublish(); } - if(check_source && memcmp(_addr,addr,sizeof(struct sockaddr)) != 0){ - DebugL << "RtpProcess(" << printSSRC(_ssrc) << ") address dismatch:" << printAddress(addr) << " != " << printAddress(_addr); + if(!_muxer){ + //无权限推流 return false; } + if(check_source && memcmp(_addr,addr,sizeof(struct sockaddr)) != 0){ + DebugP(this) << "address dismatch:" << printAddress(addr) << " != " << printAddress(_addr); + return false; + } + + _ui64TotalBytes += data_len; _last_rtp_time.resetTime(); bool ret = handleOneRtp(0,_track,(unsigned char *)data,data_len); if(dts_out){ @@ -144,7 +161,7 @@ static inline bool checkTS(const uint8_t *packet, int bytes){ void RtpProcess::onRtpSorted(const RtpPacket::Ptr &rtp, int) { if(rtp->sequence != _sequence + 1){ - WarnL << rtp->sequence << " != " << _sequence << "+1"; + WarnP(this) << rtp->sequence << " != " << _sequence << "+1"; } _sequence = rtp->sequence; if(_save_file_rtp){ @@ -165,11 +182,11 @@ void RtpProcess::onRtpDecode(const uint8_t *packet, int bytes, uint32_t timestam //创建解码器 if(checkTS(packet, bytes)){ //猜测是ts负载 - InfoL << "judged to be TS: " << printSSRC(_ssrc); + InfoP(this) << "judged to be TS: " << printSSRC(_ssrc); _decoder = Decoder::createDecoder(Decoder::decoder_ts); }else{ //猜测是ps负载 - InfoL << "judged to be PS: " << printSSRC(_ssrc); + InfoP(this) << "judged to be PS: " << printSSRC(_ssrc); _decoder = Decoder::createDecoder(Decoder::decoder_ps); } _decoder->setOnDecode([this](int stream,int codecid,int flags,int64_t pts,int64_t dts,const void *data,int bytes){ @@ -179,7 +196,7 @@ void RtpProcess::onRtpDecode(const uint8_t *packet, int bytes, uint32_t timestam auto ret = _decoder->input((uint8_t *)packet,bytes); if(ret != bytes){ - WarnL << ret << " != " << bytes << " " << flags; + WarnP(this) << ret << " != " << bytes << " " << flags; } } @@ -213,13 +230,13 @@ void RtpProcess::onDecode(int stream,int codecid,int flags,int64_t pts,int64_t d if (!_codecid_video) { //获取到视频 _codecid_video = codecid; - InfoL << "got video track: H264"; + InfoP(this) << "got video track: H264"; auto track = std::make_shared(); _muxer->addTrack(track); } if (codecid != _codecid_video) { - WarnL << "video track change to H264 from codecid:" << getCodecName(_codecid_video); + WarnP(this) << "video track change to H264 from codecid:" << getCodecName(_codecid_video); return; } @@ -238,12 +255,12 @@ void RtpProcess::onDecode(int stream,int codecid,int flags,int64_t pts,int64_t d if (!_codecid_video) { //获取到视频 _codecid_video = codecid; - InfoL << "got video track: H265"; + InfoP(this) << "got video track: H265"; auto track = std::make_shared(); _muxer->addTrack(track); } if (codecid != _codecid_video) { - WarnL << "video track change to H265 from codecid:" << getCodecName(_codecid_video); + WarnP(this) << "video track change to H265 from codecid:" << getCodecName(_codecid_video); return; } if(_save_file_video){ @@ -261,13 +278,13 @@ void RtpProcess::onDecode(int stream,int codecid,int flags,int64_t pts,int64_t d if (!_codecid_audio) { //获取到音频 _codecid_audio = codecid; - InfoL << "got audio track: AAC"; + InfoP(this) << "got audio track: AAC"; auto track = std::make_shared(); _muxer->addTrack(track); } if (codecid != _codecid_audio) { - WarnL << "audio track change to AAC from codecid:" << getCodecName(_codecid_audio); + WarnP(this) << "audio track change to AAC from codecid:" << getCodecName(_codecid_audio); return; } _muxer->inputFrame(std::make_shared((char *) data, bytes, dts, 0, 7)); @@ -281,14 +298,14 @@ void RtpProcess::onDecode(int stream,int codecid,int flags,int64_t pts,int64_t d if (!_codecid_audio) { //获取到音频 _codecid_audio = codecid; - InfoL << "got audio track: G711"; + InfoP(this) << "got audio track: G711"; //G711传统只支持 8000/1/16的规格,FFmpeg貌似做了扩展,但是这里不管它了 auto track = std::make_shared(codec, 8000, 1, 16); _muxer->addTrack(track); } if (codecid != _codecid_audio) { - WarnL << "audio track change to G711 from codecid:" << getCodecName(_codecid_audio); + WarnP(this) << "audio track change to G711 from codecid:" << getCodecName(_codecid_audio); return; } _muxer->inputFrame(std::make_shared(codec, (char *) data, bytes, dts)); @@ -296,7 +313,7 @@ void RtpProcess::onDecode(int stream,int codecid,int flags,int64_t pts,int64_t d } default: if(codecid != 0){ - WarnL << "unsupported codec type:" << getCodecName(codecid) << " " << (int)codecid; + WarnP(this) << "unsupported codec type:" << getCodecName(codecid) << " " << (int)codecid; } return; } @@ -310,20 +327,74 @@ bool RtpProcess::alive() { return false; } -string RtpProcess::get_peer_ip() { - return SockUtil::inet_ntoa(((struct sockaddr_in *) _addr)->sin_addr); +const string& RtpProcess::get_peer_ip() { + if(_peer_ip.empty() && _addr){ + _peer_ip = SockUtil::inet_ntoa(((struct sockaddr_in *) _addr)->sin_addr); + } + return _peer_ip; } uint16_t RtpProcess::get_peer_port() { + if(!_addr){ + return 0; + } return ntohs(((struct sockaddr_in *) _addr)->sin_port); } +const string& RtpProcess::get_local_ip() { + //todo + return _local_ip; +} + +uint16_t RtpProcess::get_local_port() { + //todo + return 0; +} + +string RtpProcess::getIdentifier() const{ + return _media_info._streamid; +} + int RtpProcess::totalReaderCount(){ - return _muxer->totalReaderCount(); + return _muxer ? _muxer->totalReaderCount() : 0; } void RtpProcess::setListener(const std::weak_ptr &listener){ - _muxer->setMediaListener(listener); + if(_muxer){ + _muxer->setMediaListener(listener); + }else{ + _listener = listener; + } +} + +void RtpProcess::emitOnPublish() { + weak_ptr weak_self = shared_from_this(); + Broadcast::PublishAuthInvoker invoker = [weak_self](const string &err, bool enableRtxp, bool enableHls, bool enableMP4) { + auto strongSelf = weak_self.lock(); + if (!strongSelf) { + return; + } + if (err.empty()) { + strongSelf->_muxer = std::make_shared(strongSelf->_media_info._vhost, + strongSelf->_media_info._app, + strongSelf->_media_info._streamid, 0, + enableRtxp, enableRtxp, enableHls, enableMP4); + strongSelf->_muxer->setMediaListener(strongSelf->_listener); + InfoP(strongSelf) << "允许RTP推流"; + } else { + WarnP(strongSelf) << "禁止RTP推流:" << err; + } + }; + + //触发推流鉴权事件 + auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPublish, _media_info, invoker, static_cast(*this)); + if(!flag){ + //该事件无人监听,默认不鉴权 + GET_CONFIG(bool, toRtxp, General::kPublishToRtxp); + GET_CONFIG(bool, toHls, General::kPublishToHls); + GET_CONFIG(bool, toMP4, General::kPublishToMP4); + invoker("", toRtxp, toHls, toMP4); + } } diff --git a/src/Rtp/RtpProcess.h b/src/Rtp/RtpProcess.h index 50f0a6bb..b8841402 100644 --- a/src/Rtp/RtpProcess.h +++ b/src/Rtp/RtpProcess.h @@ -24,21 +24,31 @@ namespace mediakit{ string printSSRC(uint32_t ui32Ssrc); class FrameMerger; -class RtpProcess : public RtpReceiver , public RtpDecoder{ +class RtpProcess : public RtpReceiver , public RtpDecoder, public SockInfo, public std::enable_shared_from_this{ public: typedef std::shared_ptr Ptr; RtpProcess(uint32_t ssrc); ~RtpProcess(); bool inputRtp(const char *data,int data_len, const struct sockaddr *addr , uint32_t *dts_out = nullptr); bool alive(); - string get_peer_ip(); - uint16_t get_peer_port(); + + const string &get_local_ip() override; + uint16_t get_local_port() override; + const string &get_peer_ip() override; + uint16_t get_peer_port() override; + string getIdentifier() const override; + int totalReaderCount(); void setListener(const std::weak_ptr &listener); + protected: void onRtpSorted(const RtpPacket::Ptr &rtp, int track_index) override ; void onRtpDecode(const uint8_t *packet, int bytes, uint32_t timestamp, int flags) override; void onDecode(int stream,int codecid,int flags,int64_t pts,int64_t dts, const void *data,int bytes); + +private: + void emitOnPublish(); + private: std::shared_ptr _save_file_rtp; std::shared_ptr _save_file_ps; @@ -55,6 +65,11 @@ private: unordered_map _stamps; uint32_t _dts = 0; Decoder::Ptr _decoder; + string _peer_ip; + string _local_ip; + std::weak_ptr _listener; + MediaInfo _media_info; + uint64_t _ui64TotalBytes = 0; }; }//namespace mediakit