优化rtmp性能

This commit is contained in:
xia-chu 2021-02-04 17:58:51 +08:00
parent 827158af73
commit bc6286553a
19 changed files with 88 additions and 72 deletions

View File

@ -95,9 +95,7 @@ void AACRtmpEncoder::inputFrame(const Frame::Ptr &frame) {
}
if(!_aac_cfg.empty()){
RtmpPacket::Ptr rtmpPkt = ResourcePoolHelper<RtmpPacket>::obtainObj();
rtmpPkt->buffer.clear();
auto rtmpPkt = RtmpPacket::create();
//header
uint8_t is_config = false;
rtmpPkt->buffer.push_back(_audio_flv_flags);
@ -117,8 +115,7 @@ void AACRtmpEncoder::inputFrame(const Frame::Ptr &frame) {
void AACRtmpEncoder::makeAudioConfigPkt() {
_audio_flv_flags = getAudioRtmpFlags(std::make_shared<AACTrack>(_aac_cfg));
RtmpPacket::Ptr rtmpPkt = ResourcePoolHelper<RtmpPacket>::obtainObj();
rtmpPkt->buffer.clear();
auto rtmpPkt = RtmpPacket::create();
//header
uint8_t is_config = true;

View File

@ -47,7 +47,7 @@ private:
/**
* aac adts转Rtmp类
*/
class AACRtmpEncoder : public AACRtmpDecoder , public ResourcePoolHelper<RtmpPacket> {
class AACRtmpEncoder : public AACRtmpDecoder{
public:
typedef std::shared_ptr<AACRtmpEncoder> Ptr;

View File

@ -49,8 +49,7 @@ void CommonRtmpEncoder::inputFrame(const Frame::Ptr &frame) {
if (!_audio_flv_flags) {
return;
}
RtmpPacket::Ptr rtmp = ResourcePoolHelper<RtmpPacket>::obtainObj();
rtmp->buffer.clear();
auto rtmp = RtmpPacket::create();
//header
rtmp->buffer.push_back(_audio_flv_flags);
//data

View File

@ -53,7 +53,7 @@ private:
/**
* rtmp编码类
*/
class CommonRtmpEncoder : public CommonRtmpDecoder , public ResourcePoolHelper<RtmpPacket> {
class CommonRtmpEncoder : public CommonRtmpDecoder {
public:
typedef std::shared_ptr<CommonRtmpEncoder> Ptr;

View File

@ -196,8 +196,7 @@ void H264RtmpEncoder::inputFrame(const Frame::Ptr &frame) {
bool is_config = false;
flags |= (((frame->configFrame() || frame->keyFrame()) ? FLV_KEY_FRAME : FLV_INTER_FRAME) << 4);
_lastPacket = ResourcePoolHelper<RtmpPacket>::obtainObj();
_lastPacket->buffer.clear();
_lastPacket = RtmpPacket::create();
_lastPacket->buffer.push_back(flags);
_lastPacket->buffer.push_back(!is_config);
int32_t cts = frame->pts() - frame->dts();
@ -224,9 +223,7 @@ void H264RtmpEncoder::makeVideoConfigPkt() {
flags |= (FLV_KEY_FRAME << 4);
bool is_config = true;
RtmpPacket::Ptr rtmpPkt = ResourcePoolHelper<RtmpPacket>::obtainObj();
rtmpPkt->buffer.clear();
auto rtmpPkt = RtmpPacket::create();
//header
rtmpPkt->buffer.push_back(flags);
rtmpPkt->buffer.push_back(!is_config);

View File

@ -52,7 +52,7 @@ protected:
/**
* 264 Rtmp打包类
*/
class H264RtmpEncoder : public H264RtmpDecoder, public ResourcePoolHelper<RtmpPacket> {
class H264RtmpEncoder : public H264RtmpDecoder{
public:
typedef std::shared_ptr<H264RtmpEncoder> Ptr;

View File

@ -182,8 +182,7 @@ void H265RtmpEncoder::inputFrame(const Frame::Ptr &frame) {
bool is_config = false;
flags |= (((frame->configFrame() || frame->keyFrame()) ? FLV_KEY_FRAME : FLV_INTER_FRAME) << 4);
_lastPacket = ResourcePoolHelper<RtmpPacket>::obtainObj();
_lastPacket->buffer.clear();
_lastPacket = RtmpPacket::create();
_lastPacket->buffer.push_back(flags);
_lastPacket->buffer.push_back(!is_config);
auto cts = frame->pts() - frame->dts();
@ -208,9 +207,7 @@ void H265RtmpEncoder::makeVideoConfigPkt() {
flags |= (FLV_KEY_FRAME << 4);
bool is_config = true;
RtmpPacket::Ptr rtmpPkt = ResourcePoolHelper<RtmpPacket>::obtainObj();
rtmpPkt->buffer.clear();
auto rtmpPkt = RtmpPacket::create();
//header
rtmpPkt->buffer.push_back(flags);
rtmpPkt->buffer.push_back(!is_config);

View File

@ -50,7 +50,7 @@ protected:
/**
* 265 Rtmp打包类
*/
class H265RtmpEncoder : public H265RtmpDecoder, public ResourcePoolHelper<RtmpPacket> {
class H265RtmpEncoder : public H265RtmpDecoder{
public:
typedef std::shared_ptr<H265RtmpEncoder> Ptr;

View File

@ -131,4 +131,21 @@ void Metadata::addTrack(AMFValue &metadata, const Track::Ptr &track) {
metadata.set(key, value);
});
}
RtmpPacket::Ptr RtmpPacket::create(){
#if 0
static ResourcePool<RtmpPacket> packet_pool;
static onceToken token([]() {
packet_pool.setSize(1024);
});
auto ret = packet_pool.obtain();
ret->clear();
return ret;
#else
auto ret = Ptr(new RtmpPacket);
ret->clear();
return ret;
#endif
}
}//namespace mediakit

View File

@ -128,17 +128,19 @@ public:
class RtmpPacket : public Buffer{
public:
typedef std::shared_ptr<RtmpPacket> Ptr;
bool is_abs_stamp = false;
using Ptr = std::shared_ptr<RtmpPacket>;
bool is_abs_stamp;
uint8_t type_id;
uint32_t time_stamp = 0;
uint32_t ts_field = 0;
uint32_t time_stamp;
uint32_t ts_field;
uint32_t stream_index;
uint32_t chunk_id;
size_t body_size = 0;
size_t body_size;
BufferLikeString buffer;
public:
static Ptr create();
char *data() const override{
return (char*)buffer.data();
}
@ -146,21 +148,12 @@ public:
return buffer.size();
}
public:
RtmpPacket() = default;
RtmpPacket(const RtmpPacket &that) = delete;
RtmpPacket &operator=(const RtmpPacket &that) = delete;
RtmpPacket &operator=(RtmpPacket &&that) = delete;
RtmpPacket(RtmpPacket &&that){
type_id = that.type_id;
body_size = that.body_size;
time_stamp = that.time_stamp;
is_abs_stamp = that.is_abs_stamp;
ts_field = that.ts_field;
stream_index = that.stream_index;
chunk_id = that.chunk_id;
buffer = std::move(that.buffer);
void clear(){
is_abs_stamp = false;
time_stamp = 0;
ts_field = 0;
body_size = 0;
buffer.clear();
}
bool isVideoKeyFrame() const {
@ -214,6 +207,12 @@ public:
const static int channel[] = { 1, 2 };
return channel[flvStereoOrMono];
}
private:
friend class ResourcePool_l<RtmpPacket>;
RtmpPacket(){
clear();
}
};
/**

View File

@ -319,7 +319,7 @@ void RtmpPlayer::onStreamDry(uint32_t stream_index) {
onPlayResult_l(SockException(Err_other, "rtmp stream dry"), true);
}
void RtmpPlayer::onMediaData_l(const RtmpPacket::Ptr &chunk_data) {
void RtmpPlayer::onMediaData_l(RtmpPacket::Ptr chunk_data) {
_rtmp_recv_ticker.resetTime();
if (!_play_timer) {
//已经触发了onPlayResult事件直接触发onMediaData事件
@ -338,8 +338,8 @@ void RtmpPlayer::onMediaData_l(const RtmpPacket::Ptr &chunk_data) {
}
}
void RtmpPlayer::onRtmpChunk(RtmpPacket &chunk_data) {
void RtmpPlayer::onRtmpChunk(RtmpPacket::Ptr packet) {
auto &chunk_data = *packet;
typedef void (RtmpPlayer::*rtmp_func_ptr)(AMFDecoder &dec);
static unordered_map<string, rtmp_func_ptr> s_func_map;
static onceToken token([]() {
@ -379,7 +379,7 @@ void RtmpPlayer::onRtmpChunk(RtmpPacket &chunk_data) {
}
_metadata_got = true;
}
onMediaData_l(std::make_shared<RtmpPacket>(std::move(chunk_data)));
onMediaData_l(std::move(packet));
break;
}

View File

@ -42,12 +42,12 @@ public:
protected:
virtual bool onCheckMeta(const AMFValue &val) = 0;
virtual void onMediaData(const RtmpPacket::Ptr &chunk_data) = 0;
virtual void onMediaData(RtmpPacket::Ptr chunk_data) = 0;
uint32_t getProgressMilliSecond() const;
void seekToMilliSecond(uint32_t ms);
protected:
void onMediaData_l(const RtmpPacket::Ptr &chunk_data);
void onMediaData_l(RtmpPacket::Ptr chunk_data);
//在获取config帧后才触发onPlayResult_l(而不是收到play命令回复)所以此时所有track都初始化完毕了
void onPlayResult_l(const SockException &ex, bool handshake_done);
@ -56,7 +56,7 @@ protected:
void onConnect(const SockException &err) override;
void onErr(const SockException &ex) override;
//from RtmpProtocol
void onRtmpChunk(RtmpPacket &chunk_data) override;
void onRtmpChunk(RtmpPacket::Ptr chunk_data) override;
void onStreamDry(uint32_t stream_index) override;
void onSendRawData(Buffer::Ptr buffer) override {
send(std::move(buffer));

View File

@ -63,19 +63,20 @@ private:
return true;
}
void onMediaData(const RtmpPacket::Ptr &chunkData) override {
if (_rtmp_src) {
if (!_set_meta_data && !chunkData->isCfgFrame()) {
_set_meta_data = true;
_rtmp_src->setMetaData(TitleMeta().getMetadata());
}
_rtmp_src->onWrite(chunkData);
}
void onMediaData(RtmpPacket::Ptr chunkData) override {
if (!_delegate) {
//这个流没有metadata
_delegate.reset(new RtmpDemuxer());
}
_delegate->inputRtmp(chunkData);
if (_rtmp_src) {
if (!_set_meta_data && !chunkData->isCfgFrame()) {
_set_meta_data = true;
_rtmp_src->setMetaData(TitleMeta().getMetadata());
}
_rtmp_src->onWrite(std::move(chunkData));
}
}
private:

View File

@ -13,6 +13,7 @@
#include "Util/util.h"
#include "Util/onceToken.h"
#include "Thread/ThreadPool.h"
#include "RtmpMediaSource.h"
using namespace toolkit;
#define C1_DIGEST_SIZE 32
@ -569,7 +570,11 @@ const char* RtmpProtocol::handle_rtmp(const char *data, size_t len) {
}
RtmpHeader &header = *((RtmpHeader *) (ptr + offset));
auto &chunk_data = _map_chunk_data[_now_chunk_id];
auto &packet_ptr = _map_chunk_data[_now_chunk_id];
if (!packet_ptr) {
packet_ptr = RtmpPacket::create();
}
auto &chunk_data = *packet_ptr;
chunk_data.chunk_id = _now_chunk_id;
switch (header_len) {
case 12:
@ -611,16 +616,17 @@ const char* RtmpProtocol::handle_rtmp(const char *data, size_t len) {
_now_stream_index = chunk_data.stream_index;
chunk_data.time_stamp = time_stamp + (chunk_data.is_abs_stamp ? 0 : chunk_data.time_stamp);
if (chunk_data.body_size) {
handle_chunk(chunk_data);
handle_chunk(std::move(packet_ptr));
} else {
packet_ptr = nullptr;
}
chunk_data.buffer.clear();
chunk_data.is_abs_stamp = false;
}
}
return ptr;
}
void RtmpProtocol::handle_chunk(RtmpPacket& chunk_data) {
void RtmpProtocol::handle_chunk(RtmpPacket::Ptr packet) {
auto &chunk_data = *packet;
switch (chunk_data.type_id) {
case MSG_ACK: {
if (chunk_data.buffer.size() < 4) {
@ -736,20 +742,21 @@ void RtmpProtocol::handle_chunk(RtmpPacket& chunk_data) {
if (ptr + size > ptr_tail) {
break;
}
RtmpPacket sub_packet;
auto sub_packet_ptr = RtmpPacket::create();
auto &sub_packet = *sub_packet_ptr;
sub_packet.buffer.assign((char *)ptr, size);
sub_packet.type_id = type;
sub_packet.body_size = size;
sub_packet.time_stamp = ts;
sub_packet.stream_index = chunk_data.stream_index;
sub_packet.chunk_id = chunk_data.chunk_id;
handle_chunk(sub_packet);
handle_chunk(std::move(sub_packet_ptr));
ptr += size;
}
break;
}
default: onRtmpChunk(chunk_data); break;
default: onRtmpChunk(std::move(packet)); break;
}
}

View File

@ -39,7 +39,7 @@ public:
protected:
virtual void onSendRawData(Buffer::Ptr buffer) = 0;
virtual void onRtmpChunk(RtmpPacket &chunk_data) = 0;
virtual void onRtmpChunk(RtmpPacket::Ptr chunk_data) = 0;
virtual void onStreamBegin(uint32_t stream_index){
_stream_index = stream_index;
}
@ -85,7 +85,7 @@ private:
const char* handle_C0C1(const char *data, size_t len);
const char* handle_C2(const char *data, size_t len);
const char* handle_rtmp(const char *data, size_t len);
void handle_chunk(RtmpPacket &chunk_data);
void handle_chunk(RtmpPacket::Ptr chunk_data);
protected:
int _send_req_id = 0;
@ -108,7 +108,7 @@ private:
//////////Rtmp parser//////////
function<const char * (const char *data, size_t len)> _next_step_func;
////////////Chunk////////////
unordered_map<int, RtmpPacket> _map_chunk_data;
unordered_map<int, RtmpPacket::Ptr> _map_chunk_data;
};
} /* namespace mediakit */

View File

@ -279,7 +279,8 @@ void RtmpPusher::onCmd_onStatus(AMFDecoder &dec) {
}
}
void RtmpPusher::onRtmpChunk(RtmpPacket &chunk_data) {
void RtmpPusher::onRtmpChunk(RtmpPacket::Ptr packet) {
auto &chunk_data = *packet;
switch (chunk_data.type_id) {
case MSG_CMD:
case MSG_CMD3: {

View File

@ -42,7 +42,7 @@ protected:
void onErr(const SockException &ex) override;
//for RtmpProtocol override
void onRtmpChunk(RtmpPacket &chunk_data) override;
void onRtmpChunk(RtmpPacket::Ptr chunk_data) override;
void onSendRawData(Buffer::Ptr buffer) override{
send(std::move(buffer));
}

View File

@ -458,7 +458,8 @@ void RtmpSession::onProcessCmd(AMFDecoder &dec) {
(this->*fun)(dec);
}
void RtmpSession::onRtmpChunk(RtmpPacket &chunk_data) {
void RtmpSession::onRtmpChunk(RtmpPacket::Ptr packet) {
auto &chunk_data = *packet;
switch (chunk_data.type_id) {
case MSG_CMD:
case MSG_CMD3: {
@ -495,7 +496,7 @@ void RtmpSession::onRtmpChunk(RtmpPacket &chunk_data) {
_set_meta_data = true;
_publisher_src->setMetaData(TitleMeta().getMetadata());
}
_publisher_src->onWrite(std::make_shared<RtmpPacket>(std::move(chunk_data)));
_publisher_src->onWrite(std::move(packet));
break;
}

View File

@ -60,7 +60,7 @@ private:
_total_bytes += buffer->size();
send(std::move(buffer));
}
void onRtmpChunk(RtmpPacket &chunk_data) override;
void onRtmpChunk(RtmpPacket::Ptr chunk_data) override;
template<typename first, typename second>
inline void sendReply(const char *str, const first &reply, const second &status) {