From 04aa3ef41f72debe7c29eb6f15092660405199a0 Mon Sep 17 00:00:00 2001 From: custompal <1040801756@qq.com> Date: Tue, 30 Aug 2022 21:05:19 +0800 Subject: [PATCH 1/6] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E8=8E=B7=E5=8F=96?= =?UTF-8?q?=E5=AA=92=E4=BD=93=E6=B5=81=E6=92=AD=E6=94=BE=E5=99=A8=E5=88=97?= =?UTF-8?q?=E8=A1=A8=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server/WebApi.cpp | 26 ++++++++++++++++++ src/Common/MediaSource.h | 54 +++++--------------------------------- src/FMP4/FMP4MediaSource.h | 5 ++++ src/Http/HttpSession.cpp | 6 +++++ src/Rtmp/FlvMuxer.cpp | 3 +++ src/Rtmp/RtmpMediaSource.h | 5 ++++ src/Rtmp/RtmpSession.cpp | 1 + src/Rtsp/RtspMediaSource.h | 5 ++++ src/Rtsp/RtspSession.cpp | 3 +++ src/TS/TSMediaSource.h | 5 ++++ srt/SrtTransportImp.cpp | 2 ++ webrtc/WebRtcPlayer.cpp | 2 ++ 12 files changed, 70 insertions(+), 47 deletions(-) diff --git a/server/WebApi.cpp b/server/WebApi.cpp index 4de4449b..4546657d 100755 --- a/server/WebApi.cpp +++ b/server/WebApi.cpp @@ -741,6 +741,32 @@ void installWebApi() { val["online"] = (bool) (MediaSource::find(allArgs["schema"],allArgs["vhost"],allArgs["app"],allArgs["stream"])); }); + api_regist("/index/api/getMediaPlayerList",[](API_ARGS_MAP_ASYNC){ + CHECK_SECRET(); + CHECK_ARGS("schema", "vhost", "app", "stream"); + auto src = MediaSource::find(allArgs["schema"], allArgs["vhost"], allArgs["app"], allArgs["stream"]); + if (!src) { + throw ApiRetException("can not find the stream", API::NotFound); + } + src->getPlayerList( + [=](const std::list> &info_list) mutable { + val["code"] = API::Success; + auto &data = val["data"]; + for (auto &info : info_list) { + auto obj = reinterpret_pointer_cast(info); + data.append(std::move(*obj)); + } + invoker(200, headerOut, val.toStyledString()); + }, + [](std::shared_ptr &&info) -> std::shared_ptr { + auto obj = std::make_shared(); + auto session = reinterpret_pointer_cast(info); + (*obj)["peer_ip"] = session->get_peer_ip(); + (*obj)["peer_port"] = session->get_peer_port(); + return obj; + }); + }); + //测试url http://127.0.0.1/index/api/getMediaInfo?schema=rtsp&vhost=__defaultVhost__&app=live&stream=obs api_regist("/index/api/getMediaInfo",[](API_ARGS_MAP_ASYNC){ CHECK_SECRET(); diff --git a/src/Common/MediaSource.h b/src/Common/MediaSource.h index 2ea5204d..fbeaa115 100644 --- a/src/Common/MediaSource.h +++ b/src/Common/MediaSource.h @@ -192,52 +192,6 @@ public: std::string _param_strs; }; -class BytesSpeed { -public: - BytesSpeed() = default; - ~BytesSpeed() = default; - - /** - * 添加统计字节 - */ - BytesSpeed& operator += (size_t bytes) { - _bytes += bytes; - if (_bytes > 1024 * 1024) { - //数据大于1MB就计算一次网速 - computeSpeed(); - } - return *this; - } - - /** - * 获取速度,单位bytes/s - */ - int getSpeed() { - if (_ticker.elapsedTime() < 1000) { - //获取频率小于1秒,那么返回上次计算结果 - return _speed; - } - return computeSpeed(); - } - -private: - int computeSpeed() { - auto elapsed = _ticker.elapsedTime(); - if (!elapsed) { - return _speed; - } - _speed = (int)(_bytes * 1000 / elapsed); - _ticker.resetTime(); - _bytes = 0; - return _speed; - } - -private: - int _speed = 0; - size_t _bytes = 0; - toolkit::Ticker _ticker; -}; - /** * 媒体源,任何rtsp/rtmp的直播流都源自该对象 */ @@ -293,6 +247,12 @@ public: virtual int readerCount() = 0; // 观看者个数,包括(hls/rtsp/rtmp) virtual int totalReaderCount(); + // 获取播放器列表 + virtual void getPlayerList(const std::function> &info_list)> &cb, + const std::function(std::shared_ptr &&info)> &on_change) { + assert(cb); + cb(std::list>()); + } // 获取媒体源类型 MediaOriginType getOriginType() const; @@ -350,7 +310,7 @@ private: void emitEvent(bool regist); protected: - BytesSpeed _speed[TrackMax]; + toolkit::BytesSpeed _speed[TrackMax]; private: std::atomic_flag _owned { false }; diff --git a/src/FMP4/FMP4MediaSource.h b/src/FMP4/FMP4MediaSource.h index 65fc7172..e4c6a15f 100644 --- a/src/FMP4/FMP4MediaSource.h +++ b/src/FMP4/FMP4MediaSource.h @@ -51,6 +51,11 @@ public: return _ring; } + void getPlayerList(const std::function> &info_list)> &cb, + const std::function(std::shared_ptr &&info)> &on_change) override { + _ring->getInfoList(cb, on_change); + } + /** * 获取fmp4 init segment */ diff --git a/src/Http/HttpSession.cpp b/src/Http/HttpSession.cpp index c90544b3..88a3be4b 100644 --- a/src/Http/HttpSession.cpp +++ b/src/Http/HttpSession.cpp @@ -285,6 +285,9 @@ bool HttpSession::checkLiveStreamFMP4(const function &cb){ } strong_self->shutdown(SockException(Err_shutdown, "fmp4 ring buffer detached")); }); + + _fmp4_reader->setGetInfoCB([weak_self]() { return weak_self.lock(); }); + _fmp4_reader->setReadCB([weak_self](const FMP4MediaSource::RingDataType &fmp4_list) { auto strong_self = weak_self.lock(); if (!strong_self) { @@ -326,6 +329,9 @@ bool HttpSession::checkLiveStreamTS(const function &cb){ } strong_self->shutdown(SockException(Err_shutdown,"ts ring buffer detached")); }); + + _ts_reader->setGetInfoCB([weak_self]() { return weak_self.lock(); }); + _ts_reader->setReadCB([weak_self](const TSMediaSource::RingDataType &ts_list) { auto strong_self = weak_self.lock(); if (!strong_self) { diff --git a/src/Rtmp/FlvMuxer.cpp b/src/Rtmp/FlvMuxer.cpp index a58dd665..7e54d0f7 100644 --- a/src/Rtmp/FlvMuxer.cpp +++ b/src/Rtmp/FlvMuxer.cpp @@ -54,6 +54,9 @@ void FlvMuxer::start(const EventPoller::Ptr &poller, const RtmpMediaSource::Ptr }); bool check = start_pts > 0; + + _ring_reader->setGetInfoCB([weakSelf]() { return weakSelf.lock(); }); + _ring_reader->setReadCB([weakSelf, start_pts, check](const RtmpMediaSource::RingDataType &pkt) mutable { auto strongSelf = weakSelf.lock(); if (!strongSelf) { diff --git a/src/Rtmp/RtmpMediaSource.h b/src/Rtmp/RtmpMediaSource.h index a08b4e49..f92246c8 100644 --- a/src/Rtmp/RtmpMediaSource.h +++ b/src/Rtmp/RtmpMediaSource.h @@ -69,6 +69,11 @@ public: return _ring; } + void getPlayerList(const std::function> &info_list)> &cb, + const std::function(std::shared_ptr &&info)> &on_change) override { + _ring->getInfoList(cb, on_change); + } + /** * 获取播放器个数 * @return diff --git a/src/Rtmp/RtmpSession.cpp b/src/Rtmp/RtmpSession.cpp index 2135fe9c..e69d9391 100644 --- a/src/Rtmp/RtmpSession.cpp +++ b/src/Rtmp/RtmpSession.cpp @@ -308,6 +308,7 @@ void RtmpSession::sendPlayResponse(const string &err, const RtmpMediaSource::Ptr src->pause(false); _ring_reader = src->getRing()->attach(getPoller()); weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); + _ring_reader->setGetInfoCB([weakSelf]() { return weakSelf.lock(); }); _ring_reader->setReadCB([weakSelf](const RtmpMediaSource::RingDataType &pkt) { auto strongSelf = weakSelf.lock(); if (!strongSelf) { diff --git a/src/Rtsp/RtspMediaSource.h b/src/Rtsp/RtspMediaSource.h index 03fdd1b2..69e3f93b 100644 --- a/src/Rtsp/RtspMediaSource.h +++ b/src/Rtsp/RtspMediaSource.h @@ -65,6 +65,11 @@ public: return _ring; } + void getPlayerList(const std::function> &info_list)> &cb, + const std::function(std::shared_ptr &&info)> &on_change) override { + _ring->getInfoList(cb, on_change); + } + /** * 获取播放器个数 */ diff --git a/src/Rtsp/RtspSession.cpp b/src/Rtsp/RtspSession.cpp index 5d7d675b..f7a2af93 100644 --- a/src/Rtsp/RtspSession.cpp +++ b/src/Rtsp/RtspSession.cpp @@ -836,6 +836,9 @@ void RtspSession::handleReq_Play(const Parser &parser) { if (!_play_reader && _rtp_type != Rtsp::RTP_MULTICAST) { weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); _play_reader = play_src->getRing()->attach(getPoller(), useGOP); + + _play_reader->setGetInfoCB([weakSelf]() { return weakSelf.lock(); }); + _play_reader->setDetachCB([weakSelf]() { auto strongSelf = weakSelf.lock(); if (!strongSelf) { diff --git a/src/TS/TSMediaSource.h b/src/TS/TSMediaSource.h index bbd2f498..101dda9c 100644 --- a/src/TS/TSMediaSource.h +++ b/src/TS/TSMediaSource.h @@ -51,6 +51,11 @@ public: return _ring; } + void getPlayerList(const std::function> &info_list)> &cb, + const std::function(std::shared_ptr &&info)> &on_change) override { + _ring->getInfoList(cb, on_change); + } + /** * 获取播放器个数 */ diff --git a/srt/SrtTransportImp.cpp b/srt/SrtTransportImp.cpp index aebb6cbf..564bc17e 100644 --- a/srt/SrtTransportImp.cpp +++ b/srt/SrtTransportImp.cpp @@ -234,6 +234,8 @@ void SrtTransportImp::doPlay() { } strong_self->onShutdown(SockException(Err_shutdown)); }); + weak_ptr weak_session = strong_self->getSession(); + strong_self->_ts_reader->setGetInfoCB([weak_session]() { return weak_session.lock(); }); strong_self->_ts_reader->setReadCB([weak_self](const TSMediaSource::RingDataType &ts_list) { auto strong_self = weak_self.lock(); if (!strong_self) { diff --git a/webrtc/WebRtcPlayer.cpp b/webrtc/WebRtcPlayer.cpp index 69a595d4..b9a8eb5e 100644 --- a/webrtc/WebRtcPlayer.cpp +++ b/webrtc/WebRtcPlayer.cpp @@ -39,6 +39,8 @@ void WebRtcPlayer::onStartWebRTC() { _play_src->pause(false); _reader = _play_src->getRing()->attach(getPoller(), true); weak_ptr weak_self = static_pointer_cast(shared_from_this()); + weak_ptr weak_session = getSession(); + _reader->setGetInfoCB([weak_session]() { return weak_session.lock(); }); _reader->setReadCB([weak_self](const RtspMediaSource::RingDataType &pkt) { auto strongSelf = weak_self.lock(); if (!strongSelf) { From 0d6509ce03e07c07ba586e25fdc6445e9cd3e337 Mon Sep 17 00:00:00 2001 From: custompal <1040801756@qq.com> Date: Tue, 30 Aug 2022 21:24:25 +0800 Subject: [PATCH 2/6] =?UTF-8?q?=E7=BB=9F=E4=B8=80=E9=83=A8=E5=88=86?= =?UTF-8?q?=E5=8F=98=E9=87=8F=E5=90=8D=E5=92=8C=E4=BB=A3=E7=A0=81=E6=A0=B7?= =?UTF-8?q?=E5=BC=8F=E9=A3=8E=E6=A0=BC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/Http/HttpSession.cpp | 44 +++++---- src/Rtmp/FlvMuxer.cpp | 32 ++++--- src/Rtmp/RtmpSession.cpp | 40 ++++----- src/Rtp/RtpSelector.h | 2 +- src/Rtsp/RtspSession.cpp | 186 +++++++++++++++++++-------------------- srt/SrtTransportImp.cpp | 4 +- webrtc/WebRtcPlayer.cpp | 12 +-- 7 files changed, 156 insertions(+), 164 deletions(-) diff --git a/src/Http/HttpSession.cpp b/src/Http/HttpSession.cpp index 88a3be4b..413bf9dd 100644 --- a/src/Http/HttpSession.cpp +++ b/src/Http/HttpSession.cpp @@ -245,8 +245,8 @@ bool HttpSession::checkLiveStream(const string &schema, const string &url_suffi }; Broadcast::AuthInvoker invoker = [weak_self, onRes](const string &err) { - if (auto strongSelf = weak_self.lock()) { - strongSelf->async([onRes, err]() { onRes(err); }); + if (auto strong_self = weak_self.lock()) { + strong_self->async([onRes, err]() { onRes(err); }); } }; @@ -277,6 +277,7 @@ bool HttpSession::checkLiveStreamFMP4(const function &cb){ weak_ptr weak_self = dynamic_pointer_cast(shared_from_this()); fmp4_src->pause(false); _fmp4_reader = fmp4_src->getRing()->attach(getPoller()); + _fmp4_reader->setGetInfoCB([weak_self]() { return weak_self.lock(); }); _fmp4_reader->setDetachCB([weak_self]() { auto strong_self = weak_self.lock(); if (!strong_self) { @@ -285,9 +286,6 @@ bool HttpSession::checkLiveStreamFMP4(const function &cb){ } strong_self->shutdown(SockException(Err_shutdown, "fmp4 ring buffer detached")); }); - - _fmp4_reader->setGetInfoCB([weak_self]() { return weak_self.lock(); }); - _fmp4_reader->setReadCB([weak_self](const FMP4MediaSource::RingDataType &fmp4_list) { auto strong_self = weak_self.lock(); if (!strong_self) { @@ -321,6 +319,7 @@ bool HttpSession::checkLiveStreamTS(const function &cb){ weak_ptr weak_self = dynamic_pointer_cast(shared_from_this()); ts_src->pause(false); _ts_reader = ts_src->getRing()->attach(getPoller()); + _ts_reader->setGetInfoCB([weak_self]() { return weak_self.lock(); }); _ts_reader->setDetachCB([weak_self](){ auto strong_self = weak_self.lock(); if (!strong_self) { @@ -329,9 +328,6 @@ bool HttpSession::checkLiveStreamTS(const function &cb){ } strong_self->shutdown(SockException(Err_shutdown,"ts ring buffer detached")); }); - - _ts_reader->setGetInfoCB([weak_self]() { return weak_self.lock(); }); - _ts_reader->setReadCB([weak_self](const TSMediaSource::RingDataType &ts_list) { auto strong_self = weak_self.lock(); if (!strong_self) { @@ -418,19 +414,19 @@ void HttpSession::Handle_Req_GET_l(ssize_t &content_len, bool sendBody) { } bool bClose = !strcasecmp(_parser["Connection"].data(),"close"); - weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); - HttpFileManager::onAccessPath(*this, _parser, [weakSelf, bClose](int code, const string &content_type, + weak_ptr weak_self = dynamic_pointer_cast(shared_from_this()); + HttpFileManager::onAccessPath(*this, _parser, [weak_self, bClose](int code, const string &content_type, const StrCaseMap &responseHeader, const HttpBody::Ptr &body) { - auto strongSelf = weakSelf.lock(); - if (!strongSelf) { + auto strong_self = weak_self.lock(); + if (!strong_self) { return; } - strongSelf->async([weakSelf, bClose, code, content_type, responseHeader, body]() { - auto strongSelf = weakSelf.lock(); - if (!strongSelf) { + strong_self->async([weak_self, bClose, code, content_type, responseHeader, body]() { + auto strong_self = weak_self.lock(); + if (!strong_self) { return; } - strongSelf->sendResponse(code, bClose, content_type.data(), responseHeader, body); + strong_self->sendResponse(code, bClose, content_type.data(), responseHeader, body); }); }); } @@ -651,19 +647,19 @@ void HttpSession::urlDecode(Parser &parser){ bool HttpSession::emitHttpEvent(bool doInvoke){ bool bClose = !strcasecmp(_parser["Connection"].data(),"close"); /////////////////////异步回复Invoker/////////////////////////////// - weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); - HttpResponseInvoker invoker = [weakSelf,bClose](int code, const KeyValue &headerOut, const HttpBody::Ptr &body){ - auto strongSelf = weakSelf.lock(); - if(!strongSelf) { + weak_ptr weak_self = dynamic_pointer_cast(shared_from_this()); + HttpResponseInvoker invoker = [weak_self,bClose](int code, const KeyValue &headerOut, const HttpBody::Ptr &body){ + auto strong_self = weak_self.lock(); + if(!strong_self) { return; } - strongSelf->async([weakSelf, bClose, code, headerOut, body]() { - auto strongSelf = weakSelf.lock(); - if (!strongSelf) { + strong_self->async([weak_self, bClose, code, headerOut, body]() { + auto strong_self = weak_self.lock(); + if (!strong_self) { //本对象已经销毁 return; } - strongSelf->sendResponse(code, bClose, nullptr, headerOut, body); + strong_self->sendResponse(code, bClose, nullptr, headerOut, body); }); }; ///////////////////广播HTTP事件/////////////////////////// diff --git a/src/Rtmp/FlvMuxer.cpp b/src/Rtmp/FlvMuxer.cpp index 7e54d0f7..8439e670 100644 --- a/src/Rtmp/FlvMuxer.cpp +++ b/src/Rtmp/FlvMuxer.cpp @@ -28,12 +28,12 @@ void FlvMuxer::start(const EventPoller::Ptr &poller, const RtmpMediaSource::Ptr throw std::runtime_error("RtmpMediaSource 无效"); } if (!poller->isCurrentThread()) { - weak_ptr weakSelf = getSharedPtr(); + weak_ptr weak_self = getSharedPtr(); //延时两秒启动录制,目的是为了等待config帧收集完毕 - poller->doDelayTask(2000, [weakSelf, poller, media, start_pts]() { - auto strongSelf = weakSelf.lock(); - if (strongSelf) { - strongSelf->start(poller, media, start_pts); + poller->doDelayTask(2000, [weak_self, poller, media, start_pts]() { + auto strong_self = weak_self.lock(); + if (strong_self) { + strong_self->start(poller, media, start_pts); } return 0; }); @@ -42,24 +42,22 @@ void FlvMuxer::start(const EventPoller::Ptr &poller, const RtmpMediaSource::Ptr onWriteFlvHeader(media); - std::weak_ptr weakSelf = getSharedPtr(); + std::weak_ptr weak_self = getSharedPtr(); media->pause(false); _ring_reader = media->getRing()->attach(poller); - _ring_reader->setDetachCB([weakSelf]() { - auto strongSelf = weakSelf.lock(); - if (!strongSelf) { + _ring_reader->setGetInfoCB([weak_self]() { return weak_self.lock(); }); + _ring_reader->setDetachCB([weak_self]() { + auto strong_self = weak_self.lock(); + if (!strong_self) { return; } - strongSelf->onDetach(); + strong_self->onDetach(); }); bool check = start_pts > 0; - - _ring_reader->setGetInfoCB([weakSelf]() { return weakSelf.lock(); }); - - _ring_reader->setReadCB([weakSelf, start_pts, check](const RtmpMediaSource::RingDataType &pkt) mutable { - auto strongSelf = weakSelf.lock(); - if (!strongSelf) { + _ring_reader->setReadCB([weak_self, start_pts, check](const RtmpMediaSource::RingDataType &pkt) mutable { + auto strong_self = weak_self.lock(); + if (!strong_self) { return; } @@ -72,7 +70,7 @@ void FlvMuxer::start(const EventPoller::Ptr &poller, const RtmpMediaSource::Ptr } check = false; } - strongSelf->onWriteRtmp(rtmp, ++i == size); + strong_self->onWriteRtmp(rtmp, ++i == size); }); }); } diff --git a/src/Rtmp/RtmpSession.cpp b/src/Rtmp/RtmpSession.cpp index e69d9391..aa9d5e9c 100644 --- a/src/Rtmp/RtmpSession.cpp +++ b/src/Rtmp/RtmpSession.cpp @@ -199,13 +199,13 @@ void RtmpSession::onCmd_publish(AMFDecoder &dec) { } Broadcast::PublishAuthInvoker invoker = [weak_self, on_res, pToken](const string &err, const ProtocolOption &option) { - auto strongSelf = weak_self.lock(); - if (!strongSelf) { + auto strong_self = weak_self.lock(); + if (!strong_self) { return; } - strongSelf->async([weak_self, on_res, err, pToken, option]() { - auto strongSelf = weak_self.lock(); - if (!strongSelf) { + strong_self->async([weak_self, on_res, err, pToken, option]() { + auto strong_self = weak_self.lock(); + if (!strong_self) { return; } on_res(err, option); @@ -307,29 +307,29 @@ void RtmpSession::sendPlayResponse(const string &err, const RtmpMediaSource::Ptr src->pause(false); _ring_reader = src->getRing()->attach(getPoller()); - weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); - _ring_reader->setGetInfoCB([weakSelf]() { return weakSelf.lock(); }); - _ring_reader->setReadCB([weakSelf](const RtmpMediaSource::RingDataType &pkt) { - auto strongSelf = weakSelf.lock(); - if (!strongSelf) { + weak_ptr weak_self = dynamic_pointer_cast(shared_from_this()); + _ring_reader->setGetInfoCB([weak_self]() { return weak_self.lock(); }); + _ring_reader->setReadCB([weak_self](const RtmpMediaSource::RingDataType &pkt) { + auto strong_self = weak_self.lock(); + if (!strong_self) { return; } size_t i = 0; auto size = pkt->size(); - strongSelf->setSendFlushFlag(false); + strong_self->setSendFlushFlag(false); pkt->for_each([&](const RtmpPacket::Ptr &rtmp){ if(++i == size){ - strongSelf->setSendFlushFlag(true); + strong_self->setSendFlushFlag(true); } - strongSelf->onSendMedia(rtmp); + strong_self->onSendMedia(rtmp); }); }); - _ring_reader->setDetachCB([weakSelf]() { - auto strongSelf = weakSelf.lock(); - if (!strongSelf) { + _ring_reader->setDetachCB([weak_self]() { + auto strong_self = weak_self.lock(); + if (!strong_self) { return; } - strongSelf->shutdown(SockException(Err_shutdown,"rtmp ring buffer detached")); + strong_self->shutdown(SockException(Err_shutdown,"rtmp ring buffer detached")); }); src->pause(false); _play_src = src; @@ -361,9 +361,9 @@ void RtmpSession::doPlay(AMFDecoder &dec){ std::shared_ptr ticker(new Ticker); weak_ptr weak_self = dynamic_pointer_cast(shared_from_this()); std::shared_ptr token(new onceToken(nullptr, [ticker,weak_self](){ - auto strongSelf = weak_self.lock(); - if (strongSelf) { - DebugP(strongSelf.get()) << "play 回复时间:" << ticker->elapsedTime() << "ms"; + auto strong_self = weak_self.lock(); + if (strong_self) { + DebugP(strong_self.get()) << "play 回复时间:" << ticker->elapsedTime() << "ms"; } })); Broadcast::AuthInvoker invoker = [weak_self,token](const string &err){ diff --git a/src/Rtp/RtpSelector.h b/src/Rtp/RtpSelector.h index 0befef6b..ae796ac6 100644 --- a/src/Rtp/RtpSelector.h +++ b/src/Rtp/RtpSelector.h @@ -36,7 +36,7 @@ protected: private: std::string _stream_id; RtpProcess::Ptr _process; - std::weak_ptr _parent; + std::weak_ptr _parent; }; class RtpSelector : public std::enable_shared_from_this{ diff --git a/src/Rtsp/RtspSession.cpp b/src/Rtsp/RtspSession.cpp index f7a2af93..91a51dc7 100644 --- a/src/Rtsp/RtspSession.cpp +++ b/src/Rtsp/RtspSession.cpp @@ -283,15 +283,15 @@ void RtspSession::handleReq_ANNOUNCE(const Parser &parser) { sendRtspResponse("200 OK"); }; - weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); - Broadcast::PublishAuthInvoker invoker = [weakSelf, onRes](const string &err, const ProtocolOption &option) { - auto strongSelf = weakSelf.lock(); - if (!strongSelf) { + weak_ptr weak_self = dynamic_pointer_cast(shared_from_this()); + Broadcast::PublishAuthInvoker invoker = [weak_self, onRes](const string &err, const ProtocolOption &option) { + auto strong_self = weak_self.lock(); + if (!strong_self) { return; } - strongSelf->async([weakSelf, onRes, err, option]() { - auto strongSelf = weakSelf.lock(); - if (!strongSelf) { + strong_self->async([weak_self, onRes, err, option]() { + auto strong_self = weak_self.lock(); + if (!strong_self) { return; } onRes(err, option); @@ -330,28 +330,28 @@ void RtspSession::handleReq_RECORD(const Parser &parser){ } void RtspSession::emitOnPlay(){ - weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); + weak_ptr weak_self = dynamic_pointer_cast(shared_from_this()); //url鉴权回调 - auto onRes = [weakSelf](const string &err) { - auto strongSelf = weakSelf.lock(); - if (!strongSelf) { + auto onRes = [weak_self](const string &err) { + auto strong_self = weak_self.lock(); + if (!strong_self) { return; } if (!err.empty()) { //播放url鉴权失败 - strongSelf->sendRtspResponse("401 Unauthorized", {"Content-Type", "text/plain"}, err); - strongSelf->shutdown(SockException(Err_shutdown, StrPrinter << "401 Unauthorized:" << err)); + strong_self->sendRtspResponse("401 Unauthorized", {"Content-Type", "text/plain"}, err); + strong_self->shutdown(SockException(Err_shutdown, StrPrinter << "401 Unauthorized:" << err)); return; } - strongSelf->onAuthSuccess(); + strong_self->onAuthSuccess(); }; - Broadcast::AuthInvoker invoker = [weakSelf, onRes](const string &err) { - auto strongSelf = weakSelf.lock(); - if (!strongSelf) { + Broadcast::AuthInvoker invoker = [weak_self, onRes](const string &err) { + auto strong_self = weak_self.lock(); + if (!strong_self) { return; } - strongSelf->async([onRes, err, weakSelf]() { + strong_self->async([onRes, err, weak_self]() { onRes(err); }); }; @@ -369,29 +369,29 @@ void RtspSession::emitOnPlay(){ void RtspSession::handleReq_Describe(const Parser &parser) { //该请求中的认证信息 auto authorization = parser["Authorization"]; - weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); + weak_ptr weak_self = dynamic_pointer_cast(shared_from_this()); //rtsp专属鉴权是否开启事件回调 - onGetRealm invoker = [weakSelf, authorization](const string &realm) { - auto strongSelf = weakSelf.lock(); - if (!strongSelf) { + onGetRealm invoker = [weak_self, authorization](const string &realm) { + auto strong_self = weak_self.lock(); + if (!strong_self) { //本对象已经销毁 return; } //切换到自己的线程然后执行 - strongSelf->async([weakSelf, realm, authorization]() { - auto strongSelf = weakSelf.lock(); - if (!strongSelf) { + strong_self->async([weak_self, realm, authorization]() { + auto strong_self = weak_self.lock(); + if (!strong_self) { //本对象已经销毁 return; } if (realm.empty()) { //无需rtsp专属认证, 那么继续url通用鉴权认证(on_play) - strongSelf->emitOnPlay(); + strong_self->emitOnPlay(); return; } //该流需要rtsp专属认证,开启rtsp专属认证后,将不再触发url通用鉴权认证(on_play) - strongSelf->_rtsp_realm = realm; - strongSelf->onAuthUser(realm, authorization); + strong_self->_rtsp_realm = realm; + strong_self->onAuthUser(realm, authorization); }); }; @@ -408,43 +408,43 @@ void RtspSession::handleReq_Describe(const Parser &parser) { void RtspSession::onAuthSuccess() { TraceP(this); - weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); - MediaSource::findAsync(_media_info, weakSelf.lock(), [weakSelf](const MediaSource::Ptr &src){ - auto strongSelf = weakSelf.lock(); - if(!strongSelf){ + weak_ptr weak_self = dynamic_pointer_cast(shared_from_this()); + MediaSource::findAsync(_media_info, weak_self.lock(), [weak_self](const MediaSource::Ptr &src){ + auto strong_self = weak_self.lock(); + if(!strong_self){ return; } auto rtsp_src = dynamic_pointer_cast(src); if (!rtsp_src) { //未找到相应的MediaSource - string err = StrPrinter << "no such stream:" << strongSelf->_media_info._vhost << " " << strongSelf->_media_info._app << " " << strongSelf->_media_info._streamid; - strongSelf->send_StreamNotFound(); - strongSelf->shutdown(SockException(Err_shutdown,err)); + string err = StrPrinter << "no such stream:" << strong_self->_media_info._vhost << " " << strong_self->_media_info._app << " " << strong_self->_media_info._streamid; + strong_self->send_StreamNotFound(); + strong_self->shutdown(SockException(Err_shutdown,err)); return; } //找到了相应的rtsp流 - strongSelf->_sdp_track = SdpParser(rtsp_src->getSdp()).getAvailableTrack(); - if (strongSelf->_sdp_track.empty()) { + strong_self->_sdp_track = SdpParser(rtsp_src->getSdp()).getAvailableTrack(); + if (strong_self->_sdp_track.empty()) { //该流无效 WarnL << "sdp中无有效track,该流无效:" << rtsp_src->getSdp(); - strongSelf->send_StreamNotFound(); - strongSelf->shutdown(SockException(Err_shutdown,"can not find any available track in sdp")); + strong_self->send_StreamNotFound(); + strong_self->shutdown(SockException(Err_shutdown,"can not find any available track in sdp")); return; } - strongSelf->_rtcp_context.clear(); - for (auto &track : strongSelf->_sdp_track) { - strongSelf->_rtcp_context.emplace_back(std::make_shared()); + strong_self->_rtcp_context.clear(); + for (auto &track : strong_self->_sdp_track) { + strong_self->_rtcp_context.emplace_back(std::make_shared()); } - strongSelf->_sessionid = makeRandStr(12); - strongSelf->_play_src = rtsp_src; - for(auto &track : strongSelf->_sdp_track){ + strong_self->_sessionid = makeRandStr(12); + strong_self->_play_src = rtsp_src; + for(auto &track : strong_self->_sdp_track){ track->_ssrc = rtsp_src->getSsrc(track->_type); track->_seq = rtsp_src->getSeqence(track->_type); track->_time_stamp = rtsp_src->getTimeStamp(track->_type); } - strongSelf->sendRtspResponse("200 OK", - {"Content-Base", strongSelf->_content_base + "/", + strong_self->sendRtspResponse("200 OK", + {"Content-Base", strong_self->_content_base + "/", "x-Accept-Retransmit","our-retransmit", "x-Accept-Dynamic-Rate","1" },rtsp_src->getSdp()); @@ -482,28 +482,28 @@ void RtspSession::onAuthBasic(const string &realm,const string &auth_base64){ } auto user = user_pwd_vec[0]; auto pwd = user_pwd_vec[1]; - weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); - onAuth invoker = [pwd, realm, weakSelf](bool encrypted, const string &good_pwd) { - auto strongSelf = weakSelf.lock(); - if (!strongSelf) { + weak_ptr weak_self = dynamic_pointer_cast(shared_from_this()); + onAuth invoker = [pwd, realm, weak_self](bool encrypted, const string &good_pwd) { + auto strong_self = weak_self.lock(); + if (!strong_self) { //本对象已经销毁 return; } //切换到自己的线程执行 - strongSelf->async([weakSelf, good_pwd, pwd, realm]() { - auto strongSelf = weakSelf.lock(); - if (!strongSelf) { + strong_self->async([weak_self, good_pwd, pwd, realm]() { + auto strong_self = weak_self.lock(); + if (!strong_self) { //本对象已经销毁 return; } //base64忽略encrypted参数,上层必须传入明文密码 if (pwd == good_pwd) { //提供的密码且匹配正确 - strongSelf->onAuthSuccess(); + strong_self->onAuthSuccess(); return; } //密码错误 - strongSelf->onAuthFailed(realm, StrPrinter << "password mismatch when base64 auth:" << pwd << " != " << good_pwd); + strong_self->onAuthFailed(realm, StrPrinter << "password mismatch when base64 auth:" << pwd << " != " << good_pwd); }); }; @@ -575,16 +575,16 @@ void RtspSession::onAuthDigest(const string &realm,const string &auth_md5){ } }; - weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); - onAuth invoker = [realInvoker,weakSelf](bool encrypted,const string &good_pwd){ - auto strongSelf = weakSelf.lock(); - if(!strongSelf){ + weak_ptr weak_self = dynamic_pointer_cast(shared_from_this()); + onAuth invoker = [realInvoker,weak_self](bool encrypted,const string &good_pwd){ + auto strong_self = weak_self.lock(); + if(!strong_self){ return; } //切换到自己的线程确保realInvoker执行时,this指针有效 - strongSelf->async([realInvoker,weakSelf,encrypted,good_pwd](){ - auto strongSelf = weakSelf.lock(); - if(!strongSelf){ + strong_self->async([realInvoker,weak_self,encrypted,good_pwd](){ + auto strong_self = weak_self.lock(); + if(!strong_self){ return; } realInvoker(false,encrypted,good_pwd); @@ -733,13 +733,13 @@ void RtspSession::handleReq_Setup(const Parser &parser) { send_NotAcceptable(); throw SockException(Err_shutdown, "can not get a available udp multicast socket"); } - weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); - _multicaster->setDetachCB(this, [weakSelf]() { - auto strongSelf = weakSelf.lock(); - if(!strongSelf) { + weak_ptr weak_self = dynamic_pointer_cast(shared_from_this()); + _multicaster->setDetachCB(this, [weak_self]() { + auto strong_self = weak_self.lock(); + if(!strong_self) { return; } - strongSelf->safeShutdown(SockException(Err_shutdown,"ring buffer detached")); + strong_self->safeShutdown(SockException(Err_shutdown,"ring buffer detached")); }); } int iSrvPort = _multicaster->getMultiCasterPort(trackRef->_type); @@ -781,7 +781,7 @@ void RtspSession::handleReq_Play(const Parser &parser) { return; } - bool useGOP = true; + bool use_gop = true; auto &strScale = parser["Scale"]; auto &strRange = parser["Range"]; StrCaseMap res_header; @@ -801,7 +801,7 @@ void RtspSession::handleReq_Play(const Parser &parser) { strStart = "0"; } auto iStartTime = 1000 * (float) atof(strStart.data()); - useGOP = !play_src->seekTo((uint32_t) iStartTime); + use_gop = !play_src->seekTo((uint32_t) iStartTime); InfoP(this) << "rtsp seekTo(ms):" << iStartTime; } @@ -834,24 +834,22 @@ void RtspSession::handleReq_Play(const Parser &parser) { setSocketFlags(); if (!_play_reader && _rtp_type != Rtsp::RTP_MULTICAST) { - weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); - _play_reader = play_src->getRing()->attach(getPoller(), useGOP); - - _play_reader->setGetInfoCB([weakSelf]() { return weakSelf.lock(); }); - - _play_reader->setDetachCB([weakSelf]() { - auto strongSelf = weakSelf.lock(); - if (!strongSelf) { + weak_ptr weak_self = dynamic_pointer_cast(shared_from_this()); + _play_reader = play_src->getRing()->attach(getPoller(), use_gop); + _play_reader->setGetInfoCB([weak_self]() { return weak_self.lock(); }); + _play_reader->setDetachCB([weak_self]() { + auto strong_self = weak_self.lock(); + if (!strong_self) { return; } - strongSelf->shutdown(SockException(Err_shutdown, "rtsp ring buffer detached")); + strong_self->shutdown(SockException(Err_shutdown, "rtsp ring buffer detached")); }); - _play_reader->setReadCB([weakSelf](const RtspMediaSource::RingDataType &pack) { - auto strongSelf = weakSelf.lock(); - if (!strongSelf) { + _play_reader->setReadCB([weak_self](const RtspMediaSource::RingDataType &pack) { + auto strong_self = weak_self.lock(); + if (!strong_self) { return; } - strongSelf->sendRtpPacket(pack); + strong_self->sendRtpPacket(pack); }); } } @@ -975,32 +973,32 @@ void RtspSession::onRcvPeerUdpData(int interleaved, const Buffer::Ptr &buf, cons } void RtspSession::startListenPeerUdpData(int track_idx) { - weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); + weak_ptr weak_self = dynamic_pointer_cast(shared_from_this()); auto peer_ip = get_peer_ip(); - auto onUdpData = [weakSelf,peer_ip](const Buffer::Ptr &buf, struct sockaddr *peer_addr, int interleaved){ - auto strongSelf = weakSelf.lock(); - if (!strongSelf) { + auto onUdpData = [weak_self,peer_ip](const Buffer::Ptr &buf, struct sockaddr *peer_addr, int interleaved){ + auto strong_self = weak_self.lock(); + if (!strong_self) { return false; } if (SockUtil::inet_ntoa(peer_addr) != peer_ip) { - WarnP(strongSelf.get()) << ((interleaved % 2 == 0) ? "收到其他地址的rtp数据:" : "收到其他地址的rtcp数据:") + WarnP(strong_self.get()) << ((interleaved % 2 == 0) ? "收到其他地址的rtp数据:" : "收到其他地址的rtcp数据:") << SockUtil::inet_ntoa(peer_addr); return true; } struct sockaddr_storage addr = *((struct sockaddr_storage *)peer_addr); - strongSelf->async([weakSelf, buf, addr, interleaved]() { - auto strongSelf = weakSelf.lock(); - if (!strongSelf) { + strong_self->async([weak_self, buf, addr, interleaved]() { + auto strong_self = weak_self.lock(); + if (!strong_self) { return; } try { - strongSelf->onRcvPeerUdpData(interleaved, buf, addr); + strong_self->onRcvPeerUdpData(interleaved, buf, addr); } catch (SockException &ex) { - strongSelf->shutdown(ex); + strong_self->shutdown(ex); } catch (std::exception &ex) { - strongSelf->shutdown(SockException(Err_other, ex.what())); + strong_self->shutdown(SockException(Err_other, ex.what())); } }); return true; diff --git a/srt/SrtTransportImp.cpp b/srt/SrtTransportImp.cpp index 564bc17e..036aa0b7 100644 --- a/srt/SrtTransportImp.cpp +++ b/srt/SrtTransportImp.cpp @@ -226,6 +226,8 @@ void SrtTransportImp::doPlay() { assert(ts_src); ts_src->pause(false); strong_self->_ts_reader = ts_src->getRing()->attach(strong_self->getPoller()); + weak_ptr weak_session = strong_self->getSession(); + strong_self->_ts_reader->setGetInfoCB([weak_session]() { return weak_session.lock(); }); strong_self->_ts_reader->setDetachCB([weak_self]() { auto strong_self = weak_self.lock(); if (!strong_self) { @@ -234,8 +236,6 @@ void SrtTransportImp::doPlay() { } strong_self->onShutdown(SockException(Err_shutdown)); }); - weak_ptr weak_session = strong_self->getSession(); - strong_self->_ts_reader->setGetInfoCB([weak_session]() { return weak_session.lock(); }); strong_self->_ts_reader->setReadCB([weak_self](const TSMediaSource::RingDataType &ts_list) { auto strong_self = weak_self.lock(); if (!strong_self) { diff --git a/webrtc/WebRtcPlayer.cpp b/webrtc/WebRtcPlayer.cpp index b9a8eb5e..520ca79d 100644 --- a/webrtc/WebRtcPlayer.cpp +++ b/webrtc/WebRtcPlayer.cpp @@ -42,22 +42,22 @@ void WebRtcPlayer::onStartWebRTC() { weak_ptr weak_session = getSession(); _reader->setGetInfoCB([weak_session]() { return weak_session.lock(); }); _reader->setReadCB([weak_self](const RtspMediaSource::RingDataType &pkt) { - auto strongSelf = weak_self.lock(); - if (!strongSelf) { + auto strong_self = weak_self.lock(); + if (!strong_self) { return; } size_t i = 0; pkt->for_each([&](const RtpPacket::Ptr &rtp) { //TraceL<<"send track type:"<type<<" ts:"<getStamp()<<" ntp:"<ntp_stamp<<" size:"<getPayloadSize()<<" i:"<onSendRtp(rtp, ++i == pkt->size()); + strong_self->onSendRtp(rtp, ++i == pkt->size()); }); }); _reader->setDetachCB([weak_self]() { - auto strongSelf = weak_self.lock(); - if (!strongSelf) { + auto strong_self = weak_self.lock(); + if (!strong_self) { return; } - strongSelf->onShutdown(SockException(Err_shutdown, "rtsp ring buffer detached")); + strong_self->onShutdown(SockException(Err_shutdown, "rtsp ring buffer detached")); }); } //使用完毕后,释放强引用,这样确保推流器断开后能及时注销媒体 From 640ed13d67f19c75c9ae84b10f965cf00d5a1243 Mon Sep 17 00:00:00 2001 From: custompal <1040801756@qq.com> Date: Thu, 1 Sep 2022 17:03:16 +0800 Subject: [PATCH 3/6] =?UTF-8?q?=E6=9B=B4=E6=96=B0ZLToolKit?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- 3rdpart/ZLToolKit | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/3rdpart/ZLToolKit b/3rdpart/ZLToolKit index 658271fd..23575ba8 160000 --- a/3rdpart/ZLToolKit +++ b/3rdpart/ZLToolKit @@ -1 +1 @@ -Subproject commit 658271fdf4fb497b0665c06544627789844a003a +Subproject commit 23575ba82d00641f8f33851b1d85391da310c378 From 38170c702e06754c10522d07e3ef0e65669f0d11 Mon Sep 17 00:00:00 2001 From: custompal <1040801756@qq.com> Date: Thu, 1 Sep 2022 17:33:36 +0800 Subject: [PATCH 4/6] =?UTF-8?q?=E4=BF=AE=E6=AD=A3gcc4.8=E7=BC=96=E8=AF=91?= =?UTF-8?q?=E9=94=99=E8=AF=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server/WebApi.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/WebApi.cpp b/server/WebApi.cpp index 4546657d..83409998 100755 --- a/server/WebApi.cpp +++ b/server/WebApi.cpp @@ -753,14 +753,14 @@ void installWebApi() { val["code"] = API::Success; auto &data = val["data"]; for (auto &info : info_list) { - auto obj = reinterpret_pointer_cast(info); + auto obj = static_pointer_cast(info); data.append(std::move(*obj)); } invoker(200, headerOut, val.toStyledString()); }, [](std::shared_ptr &&info) -> std::shared_ptr { auto obj = std::make_shared(); - auto session = reinterpret_pointer_cast(info); + auto session = static_pointer_cast(info); (*obj)["peer_ip"] = session->get_peer_ip(); (*obj)["peer_port"] = session->get_peer_port(); return obj; From d0214a13e1a953c2ff2036635b671dcd42a80581 Mon Sep 17 00:00:00 2001 From: custompal <1040801756@qq.com> Date: Thu, 1 Sep 2022 17:45:06 +0800 Subject: [PATCH 5/6] =?UTF-8?q?=E9=98=B2=E6=AD=A2getPlayerList=E8=BF=94?= =?UTF-8?q?=E5=9B=9E=E7=9A=84json=E6=95=B0=E6=8D=AE=E4=B8=BAnull?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server/WebApi.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/server/WebApi.cpp b/server/WebApi.cpp index 83409998..78706e4d 100755 --- a/server/WebApi.cpp +++ b/server/WebApi.cpp @@ -752,6 +752,7 @@ void installWebApi() { [=](const std::list> &info_list) mutable { val["code"] = API::Success; auto &data = val["data"]; + data = Value(arrayValue); for (auto &info : info_list) { auto obj = static_pointer_cast(info); data.append(std::move(*obj)); From 33e1e6b88d0d5996b57ab04182ddb49b8ddb88e9 Mon Sep 17 00:00:00 2001 From: custompal <1040801756@qq.com> Date: Thu, 1 Sep 2022 21:52:43 +0800 Subject: [PATCH 6/6] =?UTF-8?q?getMediaPlayerList=E8=BF=94=E5=9B=9E?= =?UTF-8?q?=E6=92=AD=E6=94=BE=E5=99=A8id=E4=BB=A5=E5=8F=8A=E4=BC=9A?= =?UTF-8?q?=E8=AF=9D=E7=B1=BB=E5=9E=8B=E5=90=8D=E7=A7=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server/WebApi.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/server/WebApi.cpp b/server/WebApi.cpp index 78706e4d..49f04537 100755 --- a/server/WebApi.cpp +++ b/server/WebApi.cpp @@ -764,6 +764,8 @@ void installWebApi() { auto session = static_pointer_cast(info); (*obj)["peer_ip"] = session->get_peer_ip(); (*obj)["peer_port"] = session->get_peer_port(); + (*obj)["id"] = session->getIdentifier(); + (*obj)["typeid"] = toolkit::demangle(typeid(*session).name()); return obj; }); });