diff --git a/src/Extension/AACRtmp.cpp b/src/Extension/AACRtmp.cpp index b028763f..e9bb8699 100644 --- a/src/Extension/AACRtmp.cpp +++ b/src/Extension/AACRtmp.cpp @@ -95,9 +95,7 @@ void AACRtmpEncoder::inputFrame(const Frame::Ptr &frame) { } if(!_aac_cfg.empty()){ - RtmpPacket::Ptr rtmpPkt = ResourcePoolHelper::obtainObj(); - rtmpPkt->buffer.clear(); - + auto rtmpPkt = RtmpPacket::create(); //header uint8_t is_config = false; rtmpPkt->buffer.push_back(_audio_flv_flags); @@ -117,8 +115,7 @@ void AACRtmpEncoder::inputFrame(const Frame::Ptr &frame) { void AACRtmpEncoder::makeAudioConfigPkt() { _audio_flv_flags = getAudioRtmpFlags(std::make_shared(_aac_cfg)); - RtmpPacket::Ptr rtmpPkt = ResourcePoolHelper::obtainObj(); - rtmpPkt->buffer.clear(); + auto rtmpPkt = RtmpPacket::create(); //header uint8_t is_config = true; diff --git a/src/Extension/AACRtmp.h b/src/Extension/AACRtmp.h index e00a9f71..cc574bed 100644 --- a/src/Extension/AACRtmp.h +++ b/src/Extension/AACRtmp.h @@ -47,7 +47,7 @@ private: /** * aac adts转Rtmp类 */ -class AACRtmpEncoder : public AACRtmpDecoder , public ResourcePoolHelper { +class AACRtmpEncoder : public AACRtmpDecoder{ public: typedef std::shared_ptr Ptr; diff --git a/src/Extension/CommonRtmp.cpp b/src/Extension/CommonRtmp.cpp index 858228ca..2a881854 100644 --- a/src/Extension/CommonRtmp.cpp +++ b/src/Extension/CommonRtmp.cpp @@ -49,8 +49,7 @@ void CommonRtmpEncoder::inputFrame(const Frame::Ptr &frame) { if (!_audio_flv_flags) { return; } - RtmpPacket::Ptr rtmp = ResourcePoolHelper::obtainObj(); - rtmp->buffer.clear(); + auto rtmp = RtmpPacket::create(); //header rtmp->buffer.push_back(_audio_flv_flags); //data diff --git a/src/Extension/CommonRtmp.h b/src/Extension/CommonRtmp.h index 48f64d22..465c893f 100644 --- a/src/Extension/CommonRtmp.h +++ b/src/Extension/CommonRtmp.h @@ -53,7 +53,7 @@ private: /** * 通用 rtmp编码类 */ -class CommonRtmpEncoder : public CommonRtmpDecoder , public ResourcePoolHelper { +class CommonRtmpEncoder : public CommonRtmpDecoder { public: typedef std::shared_ptr Ptr; diff --git a/src/Extension/H264Rtmp.cpp b/src/Extension/H264Rtmp.cpp index 98b12abc..97981343 100644 --- a/src/Extension/H264Rtmp.cpp +++ b/src/Extension/H264Rtmp.cpp @@ -196,8 +196,7 @@ void H264RtmpEncoder::inputFrame(const Frame::Ptr &frame) { bool is_config = false; flags |= (((frame->configFrame() || frame->keyFrame()) ? FLV_KEY_FRAME : FLV_INTER_FRAME) << 4); - _lastPacket = ResourcePoolHelper::obtainObj(); - _lastPacket->buffer.clear(); + _lastPacket = RtmpPacket::create(); _lastPacket->buffer.push_back(flags); _lastPacket->buffer.push_back(!is_config); int32_t cts = frame->pts() - frame->dts(); @@ -224,9 +223,7 @@ void H264RtmpEncoder::makeVideoConfigPkt() { flags |= (FLV_KEY_FRAME << 4); bool is_config = true; - RtmpPacket::Ptr rtmpPkt = ResourcePoolHelper::obtainObj(); - rtmpPkt->buffer.clear(); - + auto rtmpPkt = RtmpPacket::create(); //header rtmpPkt->buffer.push_back(flags); rtmpPkt->buffer.push_back(!is_config); diff --git a/src/Extension/H264Rtmp.h b/src/Extension/H264Rtmp.h index 92b37dfc..cc20d98d 100644 --- a/src/Extension/H264Rtmp.h +++ b/src/Extension/H264Rtmp.h @@ -52,7 +52,7 @@ protected: /** * 264 Rtmp打包类 */ -class H264RtmpEncoder : public H264RtmpDecoder, public ResourcePoolHelper { +class H264RtmpEncoder : public H264RtmpDecoder{ public: typedef std::shared_ptr Ptr; diff --git a/src/Extension/H265Rtmp.cpp b/src/Extension/H265Rtmp.cpp index 8b91b059..38842fe7 100644 --- a/src/Extension/H265Rtmp.cpp +++ b/src/Extension/H265Rtmp.cpp @@ -182,8 +182,7 @@ void H265RtmpEncoder::inputFrame(const Frame::Ptr &frame) { bool is_config = false; flags |= (((frame->configFrame() || frame->keyFrame()) ? FLV_KEY_FRAME : FLV_INTER_FRAME) << 4); - _lastPacket = ResourcePoolHelper::obtainObj(); - _lastPacket->buffer.clear(); + _lastPacket = RtmpPacket::create(); _lastPacket->buffer.push_back(flags); _lastPacket->buffer.push_back(!is_config); auto cts = frame->pts() - frame->dts(); @@ -208,9 +207,7 @@ void H265RtmpEncoder::makeVideoConfigPkt() { flags |= (FLV_KEY_FRAME << 4); bool is_config = true; - RtmpPacket::Ptr rtmpPkt = ResourcePoolHelper::obtainObj(); - rtmpPkt->buffer.clear(); - + auto rtmpPkt = RtmpPacket::create(); //header rtmpPkt->buffer.push_back(flags); rtmpPkt->buffer.push_back(!is_config); diff --git a/src/Extension/H265Rtmp.h b/src/Extension/H265Rtmp.h index ef2f96ba..59ba842f 100644 --- a/src/Extension/H265Rtmp.h +++ b/src/Extension/H265Rtmp.h @@ -50,7 +50,7 @@ protected: /** * 265 Rtmp打包类 */ -class H265RtmpEncoder : public H265RtmpDecoder, public ResourcePoolHelper { +class H265RtmpEncoder : public H265RtmpDecoder{ public: typedef std::shared_ptr Ptr; diff --git a/src/Rtmp/Rtmp.cpp b/src/Rtmp/Rtmp.cpp index cd497114..4be4f77d 100644 --- a/src/Rtmp/Rtmp.cpp +++ b/src/Rtmp/Rtmp.cpp @@ -131,4 +131,21 @@ void Metadata::addTrack(AMFValue &metadata, const Track::Ptr &track) { metadata.set(key, value); }); } + +RtmpPacket::Ptr RtmpPacket::create(){ +#if 0 + static ResourcePool packet_pool; + static onceToken token([]() { + packet_pool.setSize(1024); + }); + auto ret = packet_pool.obtain(); + ret->clear(); + return ret; +#else + auto ret = Ptr(new RtmpPacket); + ret->clear(); + return ret; +#endif +} + }//namespace mediakit \ No newline at end of file diff --git a/src/Rtmp/Rtmp.h b/src/Rtmp/Rtmp.h index fa331e79..15eec7f1 100644 --- a/src/Rtmp/Rtmp.h +++ b/src/Rtmp/Rtmp.h @@ -128,17 +128,19 @@ public: class RtmpPacket : public Buffer{ public: - typedef std::shared_ptr Ptr; - bool is_abs_stamp = false; + using Ptr = std::shared_ptr; + bool is_abs_stamp; uint8_t type_id; - uint32_t time_stamp = 0; - uint32_t ts_field = 0; + uint32_t time_stamp; + uint32_t ts_field; uint32_t stream_index; uint32_t chunk_id; - size_t body_size = 0; + size_t body_size; BufferLikeString buffer; public: + static Ptr create(); + char *data() const override{ return (char*)buffer.data(); } @@ -146,21 +148,12 @@ public: return buffer.size(); } -public: - RtmpPacket() = default; - RtmpPacket(const RtmpPacket &that) = delete; - RtmpPacket &operator=(const RtmpPacket &that) = delete; - RtmpPacket &operator=(RtmpPacket &&that) = delete; - - RtmpPacket(RtmpPacket &&that){ - type_id = that.type_id; - body_size = that.body_size; - time_stamp = that.time_stamp; - is_abs_stamp = that.is_abs_stamp; - ts_field = that.ts_field; - stream_index = that.stream_index; - chunk_id = that.chunk_id; - buffer = std::move(that.buffer); + void clear(){ + is_abs_stamp = false; + time_stamp = 0; + ts_field = 0; + body_size = 0; + buffer.clear(); } bool isVideoKeyFrame() const { @@ -214,6 +207,12 @@ public: const static int channel[] = { 1, 2 }; return channel[flvStereoOrMono]; } + +private: + friend class ResourcePool_l; + RtmpPacket(){ + clear(); + } }; /** diff --git a/src/Rtmp/RtmpPlayer.cpp b/src/Rtmp/RtmpPlayer.cpp index 3852780e..30c2ef5a 100644 --- a/src/Rtmp/RtmpPlayer.cpp +++ b/src/Rtmp/RtmpPlayer.cpp @@ -319,7 +319,7 @@ void RtmpPlayer::onStreamDry(uint32_t stream_index) { onPlayResult_l(SockException(Err_other, "rtmp stream dry"), true); } -void RtmpPlayer::onMediaData_l(const RtmpPacket::Ptr &chunk_data) { +void RtmpPlayer::onMediaData_l(RtmpPacket::Ptr chunk_data) { _rtmp_recv_ticker.resetTime(); if (!_play_timer) { //已经触发了onPlayResult事件,直接触发onMediaData事件 @@ -338,8 +338,8 @@ void RtmpPlayer::onMediaData_l(const RtmpPacket::Ptr &chunk_data) { } } - -void RtmpPlayer::onRtmpChunk(RtmpPacket &chunk_data) { +void RtmpPlayer::onRtmpChunk(RtmpPacket::Ptr packet) { + auto &chunk_data = *packet; typedef void (RtmpPlayer::*rtmp_func_ptr)(AMFDecoder &dec); static unordered_map s_func_map; static onceToken token([]() { @@ -379,7 +379,7 @@ void RtmpPlayer::onRtmpChunk(RtmpPacket &chunk_data) { } _metadata_got = true; } - onMediaData_l(std::make_shared(std::move(chunk_data))); + onMediaData_l(std::move(packet)); break; } diff --git a/src/Rtmp/RtmpPlayer.h b/src/Rtmp/RtmpPlayer.h index 81a5ef19..fd24560a 100644 --- a/src/Rtmp/RtmpPlayer.h +++ b/src/Rtmp/RtmpPlayer.h @@ -42,12 +42,12 @@ public: protected: virtual bool onCheckMeta(const AMFValue &val) = 0; - virtual void onMediaData(const RtmpPacket::Ptr &chunk_data) = 0; + virtual void onMediaData(RtmpPacket::Ptr chunk_data) = 0; uint32_t getProgressMilliSecond() const; void seekToMilliSecond(uint32_t ms); protected: - void onMediaData_l(const RtmpPacket::Ptr &chunk_data); + void onMediaData_l(RtmpPacket::Ptr chunk_data); //在获取config帧后才触发onPlayResult_l(而不是收到play命令回复),所以此时所有track都初始化完毕了 void onPlayResult_l(const SockException &ex, bool handshake_done); @@ -56,7 +56,7 @@ protected: void onConnect(const SockException &err) override; void onErr(const SockException &ex) override; //from RtmpProtocol - void onRtmpChunk(RtmpPacket &chunk_data) override; + void onRtmpChunk(RtmpPacket::Ptr chunk_data) override; void onStreamDry(uint32_t stream_index) override; void onSendRawData(Buffer::Ptr buffer) override { send(std::move(buffer)); diff --git a/src/Rtmp/RtmpPlayerImp.h b/src/Rtmp/RtmpPlayerImp.h index af243acd..393b0197 100644 --- a/src/Rtmp/RtmpPlayerImp.h +++ b/src/Rtmp/RtmpPlayerImp.h @@ -63,19 +63,20 @@ private: return true; } - void onMediaData(const RtmpPacket::Ptr &chunkData) override { - if (_rtmp_src) { - if (!_set_meta_data && !chunkData->isCfgFrame()) { - _set_meta_data = true; - _rtmp_src->setMetaData(TitleMeta().getMetadata()); - } - _rtmp_src->onWrite(chunkData); - } + void onMediaData(RtmpPacket::Ptr chunkData) override { if (!_delegate) { //这个流没有metadata _delegate.reset(new RtmpDemuxer()); } _delegate->inputRtmp(chunkData); + + if (_rtmp_src) { + if (!_set_meta_data && !chunkData->isCfgFrame()) { + _set_meta_data = true; + _rtmp_src->setMetaData(TitleMeta().getMetadata()); + } + _rtmp_src->onWrite(std::move(chunkData)); + } } private: diff --git a/src/Rtmp/RtmpProtocol.cpp b/src/Rtmp/RtmpProtocol.cpp index f92ba994..ef66d7d2 100644 --- a/src/Rtmp/RtmpProtocol.cpp +++ b/src/Rtmp/RtmpProtocol.cpp @@ -13,6 +13,7 @@ #include "Util/util.h" #include "Util/onceToken.h" #include "Thread/ThreadPool.h" +#include "RtmpMediaSource.h" using namespace toolkit; #define C1_DIGEST_SIZE 32 @@ -569,7 +570,11 @@ const char* RtmpProtocol::handle_rtmp(const char *data, size_t len) { } RtmpHeader &header = *((RtmpHeader *) (ptr + offset)); - auto &chunk_data = _map_chunk_data[_now_chunk_id]; + auto &packet_ptr = _map_chunk_data[_now_chunk_id]; + if (!packet_ptr) { + packet_ptr = RtmpPacket::create(); + } + auto &chunk_data = *packet_ptr; chunk_data.chunk_id = _now_chunk_id; switch (header_len) { case 12: @@ -611,16 +616,17 @@ const char* RtmpProtocol::handle_rtmp(const char *data, size_t len) { _now_stream_index = chunk_data.stream_index; chunk_data.time_stamp = time_stamp + (chunk_data.is_abs_stamp ? 0 : chunk_data.time_stamp); if (chunk_data.body_size) { - handle_chunk(chunk_data); + handle_chunk(std::move(packet_ptr)); + } else { + packet_ptr = nullptr; } - chunk_data.buffer.clear(); - chunk_data.is_abs_stamp = false; } } return ptr; } -void RtmpProtocol::handle_chunk(RtmpPacket& chunk_data) { +void RtmpProtocol::handle_chunk(RtmpPacket::Ptr packet) { + auto &chunk_data = *packet; switch (chunk_data.type_id) { case MSG_ACK: { if (chunk_data.buffer.size() < 4) { @@ -736,20 +742,21 @@ void RtmpProtocol::handle_chunk(RtmpPacket& chunk_data) { if (ptr + size > ptr_tail) { break; } - RtmpPacket sub_packet; + auto sub_packet_ptr = RtmpPacket::create(); + auto &sub_packet = *sub_packet_ptr; sub_packet.buffer.assign((char *)ptr, size); sub_packet.type_id = type; sub_packet.body_size = size; sub_packet.time_stamp = ts; sub_packet.stream_index = chunk_data.stream_index; sub_packet.chunk_id = chunk_data.chunk_id; - handle_chunk(sub_packet); + handle_chunk(std::move(sub_packet_ptr)); ptr += size; } break; } - default: onRtmpChunk(chunk_data); break; + default: onRtmpChunk(std::move(packet)); break; } } diff --git a/src/Rtmp/RtmpProtocol.h b/src/Rtmp/RtmpProtocol.h index cdfbc59b..f6beff7d 100644 --- a/src/Rtmp/RtmpProtocol.h +++ b/src/Rtmp/RtmpProtocol.h @@ -39,7 +39,7 @@ public: protected: virtual void onSendRawData(Buffer::Ptr buffer) = 0; - virtual void onRtmpChunk(RtmpPacket &chunk_data) = 0; + virtual void onRtmpChunk(RtmpPacket::Ptr chunk_data) = 0; virtual void onStreamBegin(uint32_t stream_index){ _stream_index = stream_index; } @@ -85,7 +85,7 @@ private: const char* handle_C0C1(const char *data, size_t len); const char* handle_C2(const char *data, size_t len); const char* handle_rtmp(const char *data, size_t len); - void handle_chunk(RtmpPacket &chunk_data); + void handle_chunk(RtmpPacket::Ptr chunk_data); protected: int _send_req_id = 0; @@ -108,7 +108,7 @@ private: //////////Rtmp parser////////// function _next_step_func; ////////////Chunk//////////// - unordered_map _map_chunk_data; + unordered_map _map_chunk_data; }; } /* namespace mediakit */ diff --git a/src/Rtmp/RtmpPusher.cpp b/src/Rtmp/RtmpPusher.cpp index 1267d49b..bb0cf6fb 100644 --- a/src/Rtmp/RtmpPusher.cpp +++ b/src/Rtmp/RtmpPusher.cpp @@ -279,7 +279,8 @@ void RtmpPusher::onCmd_onStatus(AMFDecoder &dec) { } } -void RtmpPusher::onRtmpChunk(RtmpPacket &chunk_data) { +void RtmpPusher::onRtmpChunk(RtmpPacket::Ptr packet) { + auto &chunk_data = *packet; switch (chunk_data.type_id) { case MSG_CMD: case MSG_CMD3: { diff --git a/src/Rtmp/RtmpPusher.h b/src/Rtmp/RtmpPusher.h index 8e237911..0e5428be 100644 --- a/src/Rtmp/RtmpPusher.h +++ b/src/Rtmp/RtmpPusher.h @@ -42,7 +42,7 @@ protected: void onErr(const SockException &ex) override; //for RtmpProtocol override - void onRtmpChunk(RtmpPacket &chunk_data) override; + void onRtmpChunk(RtmpPacket::Ptr chunk_data) override; void onSendRawData(Buffer::Ptr buffer) override{ send(std::move(buffer)); } diff --git a/src/Rtmp/RtmpSession.cpp b/src/Rtmp/RtmpSession.cpp index bad3d8cf..0c4f1bb0 100644 --- a/src/Rtmp/RtmpSession.cpp +++ b/src/Rtmp/RtmpSession.cpp @@ -458,7 +458,8 @@ void RtmpSession::onProcessCmd(AMFDecoder &dec) { (this->*fun)(dec); } -void RtmpSession::onRtmpChunk(RtmpPacket &chunk_data) { +void RtmpSession::onRtmpChunk(RtmpPacket::Ptr packet) { + auto &chunk_data = *packet; switch (chunk_data.type_id) { case MSG_CMD: case MSG_CMD3: { @@ -495,7 +496,7 @@ void RtmpSession::onRtmpChunk(RtmpPacket &chunk_data) { _set_meta_data = true; _publisher_src->setMetaData(TitleMeta().getMetadata()); } - _publisher_src->onWrite(std::make_shared(std::move(chunk_data))); + _publisher_src->onWrite(std::move(packet)); break; } diff --git a/src/Rtmp/RtmpSession.h b/src/Rtmp/RtmpSession.h index 1ecec693..0939ee80 100644 --- a/src/Rtmp/RtmpSession.h +++ b/src/Rtmp/RtmpSession.h @@ -60,7 +60,7 @@ private: _total_bytes += buffer->size(); send(std::move(buffer)); } - void onRtmpChunk(RtmpPacket &chunk_data) override; + void onRtmpChunk(RtmpPacket::Ptr chunk_data) override; template inline void sendReply(const char *str, const first &reply, const second &status) {