diff --git a/3rdpart/ZLToolKit b/3rdpart/ZLToolKit index d4d4ac1e..17c445fd 160000 --- a/3rdpart/ZLToolKit +++ b/3rdpart/ZLToolKit @@ -1 +1 @@ -Subproject commit d4d4ac1e9866ef4d025b8189264efe5ec8fa201c +Subproject commit 17c445fd1f42cddeeccdc6a06aa56cf2ffcfc99f diff --git a/src/Extension/Frame.h b/src/Extension/Frame.h index 8f2ab8a3..a967392c 100644 --- a/src/Extension/Frame.h +++ b/src/Extension/Frame.h @@ -161,7 +161,7 @@ public: public: CodecId _codec_id = CodecInvalid; - string _buffer; + BufferLikeString _buffer; uint32_t _dts = 0; uint32_t _pts = 0; uint32_t _prefix_size = 0; diff --git a/src/Http/HttpChunkedSplitter.cpp b/src/Http/HttpChunkedSplitter.cpp index e99c4f98..fd074491 100644 --- a/src/Http/HttpChunkedSplitter.cpp +++ b/src/Http/HttpChunkedSplitter.cpp @@ -13,7 +13,7 @@ namespace mediakit{ -const char *HttpChunkedSplitter::onSearchPacketTail(const char *data, int len) { +const char *HttpChunkedSplitter::onSearchPacketTail(const char *data, uint64_t len) { auto pos = strstr(data,"\r\n"); if(!pos){ return nullptr; diff --git a/src/Http/HttpChunkedSplitter.h b/src/Http/HttpChunkedSplitter.h index 5368c4da..5ba586a4 100644 --- a/src/Http/HttpChunkedSplitter.h +++ b/src/Http/HttpChunkedSplitter.h @@ -30,7 +30,7 @@ public: protected: int64_t onRecvHeader(const char *data,uint64_t len) override; void onRecvContent(const char *data,uint64_t len) override; - const char *onSearchPacketTail(const char *data,int len) override; + const char *onSearchPacketTail(const char *data,uint64_t len) override; protected: virtual void onRecvChunk(const char *data,uint64_t len){ if(_onChunkData){ diff --git a/src/Http/HttpClient.cpp b/src/Http/HttpClient.cpp index 27549e6a..706bd294 100644 --- a/src/Http/HttpClient.cpp +++ b/src/Http/HttpClient.cpp @@ -99,9 +99,6 @@ void HttpClient::onConnect(const SockException &ex) { return; } - //先假设http客户端只会接收一点点数据(只接受http头,节省内存) - getSock()->setReadBuffer(std::make_shared(1 * 1024)); - _totalBodySize = 0; _recvedBodySize = 0; HttpRequestSplitter::reset(); @@ -156,9 +153,6 @@ int64_t HttpClient::onRecvHeader(const char *data, uint64_t len) { } if(_parser["Transfer-Encoding"] == "chunked"){ - //我们认为这种情况下后面应该有大量的数据过来,加大接收缓存提高性能 - getSock()->setReadBuffer(std::make_shared(256 * 1024)); - //如果Transfer-Encoding字段等于chunked,则认为后续的content是不限制长度的 _totalBodySize = -1; _chunkedSplitter = std::make_shared([this](const char *data,uint64_t len){ @@ -183,13 +177,6 @@ int64_t HttpClient::onRecvHeader(const char *data, uint64_t len) { //但是由于我们没必要等content接收完毕才回调onRecvContent(因为这样浪费内存并且要多次拷贝数据) //所以返回-1代表我们接下来分段接收content _recvedBodySize = 0; - if(_totalBodySize > 0){ - //根据_totalBodySize设置接收缓存大小 - getSock()->setReadBuffer(std::make_shared(MIN(_totalBodySize + 1,256 * 1024))); - }else{ - getSock()->setReadBuffer(std::make_shared(256 * 1024)); - } - return -1; } diff --git a/src/Http/HttpRequestSplitter.cpp b/src/Http/HttpRequestSplitter.cpp index 44a2f040..160d7243 100644 --- a/src/Http/HttpRequestSplitter.cpp +++ b/src/Http/HttpRequestSplitter.cpp @@ -39,6 +39,9 @@ void HttpRequestSplitter::input(const char *data,uint64_t len) { const char *index = nullptr; _remain_data_size = len; while (_content_len == 0 && _remain_data_size > 0 && (index = onSearchPacketTail(ptr,_remain_data_size)) != nullptr) { + if(index == ptr){ + break; + } //_content_len == 0,这是请求头 const char *header_ptr = ptr; int64_t header_size = index - ptr; @@ -61,8 +64,7 @@ void HttpRequestSplitter::input(const char *data,uint64_t len) { if(_content_len == 0){ //尚未找到http头,缓存定位到剩余数据部分 - string str(ptr,_remain_data_size); - _remain_data = str; + _remain_data.assign(ptr,_remain_data_size); return; } @@ -71,8 +73,7 @@ void HttpRequestSplitter::input(const char *data,uint64_t len) { //数据按照固定长度content处理 if(_remain_data_size < _content_len){ //数据不够,缓存定位到剩余数据部分 - string str(ptr,_remain_data_size); - _remain_data = str; + _remain_data.assign(ptr, _remain_data_size); return; } //收到content数据,并且接受content完毕 @@ -85,9 +86,7 @@ void HttpRequestSplitter::input(const char *data,uint64_t len) { if(_remain_data_size > 0){ //还有数据没有处理完毕 - string str(ptr,_remain_data_size); - _remain_data = str; - + _remain_data.assign(ptr,_remain_data_size); data = ptr = (char *)_remain_data.data(); len = _remain_data.size(); goto splitPacket; @@ -112,7 +111,7 @@ void HttpRequestSplitter::reset() { _remain_data.clear(); } -const char *HttpRequestSplitter::onSearchPacketTail(const char *data,int len) { +const char *HttpRequestSplitter::onSearchPacketTail(const char *data,uint64_t len) { auto pos = strstr(data,"\r\n\r\n"); if(pos == nullptr){ return nullptr; diff --git a/src/Http/HttpRequestSplitter.h b/src/Http/HttpRequestSplitter.h index 4359e130..d832a230 100644 --- a/src/Http/HttpRequestSplitter.h +++ b/src/Http/HttpRequestSplitter.h @@ -12,7 +12,9 @@ #define ZLMEDIAKIT_HTTPREQUESTSPLITTER_H #include +#include "Network/Buffer.h" using namespace std; +using namespace toolkit; namespace mediakit { @@ -54,7 +56,7 @@ protected: * @param len 数据长度 * @return nullptr代表未找到包位,否则返回包尾指针 */ - virtual const char *onSearchPacketTail(const char *data,int len); + virtual const char *onSearchPacketTail(const char *data, uint64_t len); /** * 设置content len @@ -71,7 +73,7 @@ protected: */ int64_t remainDataSize(); private: - string _remain_data; + BufferLikeString _remain_data; int64_t _content_len = 0; int64_t _remain_data_size = 0; }; diff --git a/src/Http/HttpSession.cpp b/src/Http/HttpSession.cpp index ba996d93..2d9740cc 100644 --- a/src/Http/HttpSession.cpp +++ b/src/Http/HttpSession.cpp @@ -24,8 +24,6 @@ HttpSession::HttpSession(const Socket::Ptr &pSock) : TcpSession(pSock) { TraceP(this); GET_CONFIG(uint32_t,keep_alive_sec,Http::kKeepAliveSecond); pSock->setSendTimeOutSecond(keep_alive_sec); - //起始接收buffer缓存设置为4K,节省内存 - pSock->setReadBuffer(std::make_shared(4 * 1024)); } HttpSession::~HttpSession() { @@ -638,14 +636,6 @@ void HttpSession::Handle_Req_POST(int64_t &content_len) { return; } - //根据Content-Length设置接收缓存大小 - if(totalContentLen > 0){ - getSock()->setReadBuffer(std::make_shared(MIN(totalContentLen + 1,256 * 1024))); - }else{ - //不定长度的Content-Length - getSock()->setReadBuffer(std::make_shared(256 * 1024)); - } - if(totalContentLen > 0 && totalContentLen < maxReqSize ){ //返回固定长度的content content_len = totalContentLen; @@ -738,9 +728,9 @@ void HttpSession::onWrite(const Buffer::Ptr &buffer, bool flush) { } } -void HttpSession::onWebSocketEncodeData(const Buffer::Ptr &buffer){ +void HttpSession::onWebSocketEncodeData(Buffer::Ptr buffer){ _total_bytes_usage += buffer->size(); - send(buffer); + send(std::move(buffer)); } void HttpSession::onWebSocketDecodeComplete(const WebSocketHeader &header_in){ diff --git a/src/Http/HttpSession.h b/src/Http/HttpSession.h index f1df5a8d..d9d2047c 100644 --- a/src/Http/HttpSession.h +++ b/src/Http/HttpSession.h @@ -92,7 +92,7 @@ protected: * 发送数据进行websocket协议打包后回调 * @param buffer websocket协议数据 */ - void onWebSocketEncodeData(const Buffer::Ptr &buffer) override; + void onWebSocketEncodeData(Buffer::Ptr buffer) override; /** * 接收到完整的一个webSocket数据包后回调 diff --git a/src/Http/WebSocketClient.h b/src/Http/WebSocketClient.h index 4ecce5cb..17a0febc 100644 --- a/src/Http/WebSocketClient.h +++ b/src/Http/WebSocketClient.h @@ -43,11 +43,11 @@ protected: /** * 发送前拦截并打包为websocket协议 */ - int send(const Buffer::Ptr &buf) override{ + int send(Buffer::Ptr buf) override{ if(_beforeSendCB){ return _beforeSendCB(buf); } - return ClientType::send(buf); + return ClientType::send(std::move(buf)); } /** @@ -287,8 +287,8 @@ protected: * @param ptr 数据指针 * @param len 数据指针长度 */ - void onWebSocketEncodeData(const Buffer::Ptr &buffer) override{ - HttpClientImp::send(buffer); + void onWebSocketEncodeData(Buffer::Ptr buffer) override{ + HttpClientImp::send(std::move(buffer)); } private: diff --git a/src/Http/WebSocketSession.h b/src/Http/WebSocketSession.h index 69b691f1..0a8d7163 100644 --- a/src/Http/WebSocketSession.h +++ b/src/Http/WebSocketSession.h @@ -53,11 +53,11 @@ protected: * @param buf 需要截取的数据 * @return 数据字节数 */ - int send(const Buffer::Ptr &buf) override { + int send(Buffer::Ptr buf) override { if (_beforeSendCB) { return _beforeSendCB(buf); } - return TcpSessionType::send(buf); + return TcpSessionType::send(std::move(buf)); } string getIdentifier() const override { @@ -219,8 +219,8 @@ protected: /** * 发送数据进行websocket协议打包后回调 */ - void onWebSocketEncodeData(const Buffer::Ptr &buffer) override{ - HttpSessionType::send(buffer); + void onWebSocketEncodeData(Buffer::Ptr buffer) override{ + HttpSessionType::send(std::move(buffer)); } private: diff --git a/src/Http/WebSocketSplitter.h b/src/Http/WebSocketSplitter.h index e241406c..02d5d2eb 100644 --- a/src/Http/WebSocketSplitter.h +++ b/src/Http/WebSocketSplitter.h @@ -132,7 +132,7 @@ protected: * @param ptr 数据指针 * @param len 数据指针长度 */ - virtual void onWebSocketEncodeData(const Buffer::Ptr &buffer){}; + virtual void onWebSocketEncodeData(Buffer::Ptr buffer){}; private: void onPayloadData(uint8_t *data, uint64_t len); diff --git a/src/Record/MP4Muxer.cpp b/src/Record/MP4Muxer.cpp index 471b6e09..f5b79032 100644 --- a/src/Record/MP4Muxer.cpp +++ b/src/Record/MP4Muxer.cpp @@ -101,7 +101,8 @@ void MP4MuxerInterface::inputFrame(const Frame::Ptr &frame) { if (_frameCached.size() != 1) { //缓存中有多帧,需要按照mp4格式合并一起 - string merged; + BufferLikeString merged; + merged.reserve(back->size() + 1024); _frameCached.for_each([&](const Frame::Ptr &frame) { uint32_t nalu_size = frame->size() - frame->prefixSize(); nalu_size = htonl(nalu_size); diff --git a/src/Record/TsMuxer.cpp b/src/Record/TsMuxer.cpp index f6e64fc2..84273fc6 100644 --- a/src/Record/TsMuxer.cpp +++ b/src/Record/TsMuxer.cpp @@ -107,7 +107,8 @@ void TsMuxer::inputFrame(const Frame::Ptr &frame) { Frame::Ptr back = _frameCached.back(); Buffer::Ptr merged_frame = back; if (_frameCached.size() != 1) { - string merged; + BufferLikeString merged; + merged.reserve(back->size() + 1024); _frameCached.for_each([&](const Frame::Ptr &frame) { if (frame->prefixSize()) { merged.append(frame->data(), frame->size()); @@ -119,7 +120,7 @@ void TsMuxer::inputFrame(const Frame::Ptr &frame) { _is_idr_fast_packet = true; } }); - merged_frame = std::make_shared(std::move(merged)); + merged_frame = std::make_shared >(std::move(merged)); } track_info.stamp.revise(back->dts(), back->pts(), dts_out, pts_out); //取视频时间戳为TS的时间戳 diff --git a/src/Rtmp/Rtmp.h b/src/Rtmp/Rtmp.h index 0d2e5fb3..1f8d1e3e 100644 --- a/src/Rtmp/Rtmp.h +++ b/src/Rtmp/Rtmp.h @@ -136,7 +136,7 @@ public: uint32_t ts_field = 0; uint32_t stream_index; uint32_t chunk_id; - std::string buffer; + BufferLikeString buffer; public: char *data() const override{ diff --git a/src/Rtmp/RtmpPlayer.h b/src/Rtmp/RtmpPlayer.h index 08c0bde0..4a712a67 100644 --- a/src/Rtmp/RtmpPlayer.h +++ b/src/Rtmp/RtmpPlayer.h @@ -58,8 +58,8 @@ protected: //from RtmpProtocol void onRtmpChunk(RtmpPacket &chunk_data) override; void onStreamDry(uint32_t stream_index) override; - void onSendRawData(const Buffer::Ptr &buffer) override { - send(buffer); + void onSendRawData(Buffer::Ptr buffer) override { + send(std::move(buffer)); } template diff --git a/src/Rtmp/RtmpProtocol.cpp b/src/Rtmp/RtmpProtocol.cpp index 8c658bc1..55f45092 100644 --- a/src/Rtmp/RtmpProtocol.cpp +++ b/src/Rtmp/RtmpProtocol.cpp @@ -57,8 +57,8 @@ static string openssl_HMACsha256(const void *key, unsigned int key_len, const vo namespace mediakit { RtmpProtocol::RtmpProtocol() { - _next_step_func = [this]() { - handle_C0C1(); + _next_step_func = [this](const char *data, uint64_t len) { + return handle_C0C1(data, len); }; } @@ -84,10 +84,10 @@ void RtmpProtocol::reset() { //////////Invoke Request////////// _send_req_id = 0; //////////Rtmp parser////////// - _recv_data_buf.clear(); + HttpRequestSplitter::reset(); _stream_index = STREAM_CONTROL; - _next_step_func = [this]() { - handle_C0C1(); + _next_step_func = [this](const char *data, uint64_t len) { + return handle_C0C1(data, len); }; } @@ -218,7 +218,7 @@ void RtmpProtocol::sendRtmp(uint8_t type, uint32_t stream_index, const Buffer::P set_be24(header->body_size, buf->size()); set_le32(header->stream_index, stream_index); //发送rtmp头 - onSendRawData(buffer_header); + onSendRawData(std::move(buffer_header)); //扩展时间戳字段 BufferRaw::Ptr buffer_ext_stamp; @@ -260,17 +260,20 @@ void RtmpProtocol::sendRtmp(uint8_t type, uint32_t stream_index, const Buffer::P } } -void RtmpProtocol::onParseRtmp(const char *data, int size) { - _recv_data_buf.append(data, size); - //移动拷贝提高性能 - function next_step_func(std::move(_next_step_func)); - //执行下一步 - next_step_func(); +void RtmpProtocol::onParseRtmp(const char *data, uint64_t size) { + input(data, size); +} +const char *RtmpProtocol::onSearchPacketTail(const char *data,uint64_t len){ + //移动拷贝提高性能 + auto next_step_func(std::move(_next_step_func)); + //执行下一步 + auto ret = next_step_func(data, len); if (!_next_step_func) { //为设置下一步,恢复之 next_step_func.swap(_next_step_func); } + return ret; } ////for client//// @@ -280,57 +283,57 @@ void RtmpProtocol::startClientSession(const function &func) { onSendRawData(obtainBuffer(&handshake_head, 1)); RtmpHandshake c1(0); onSendRawData(obtainBuffer((char *) (&c1), sizeof(c1))); - _next_step_func = [this, func]() { + _next_step_func = [this, func](const char *data, uint64_t len) { //等待 S0+S1+S2 - handle_S0S1S2(func); + return handle_S0S1S2(data, len, func); }; } -void RtmpProtocol::handle_S0S1S2(const function &func) { - if (_recv_data_buf.size() < 1 + 2 * C1_HANDSHARK_SIZE) { +const char* RtmpProtocol::handle_S0S1S2(const char *data, uint64_t len, const function &func) { + if (len < 1 + 2 * C1_HANDSHARK_SIZE) { //数据不够 - return; + return nullptr; } - if (_recv_data_buf[0] != HANDSHAKE_PLAINTEXT) { + if (data[0] != HANDSHAKE_PLAINTEXT) { throw std::runtime_error("only plaintext[0x03] handshake supported"); } //发送 C2 - const char *pcC2 = _recv_data_buf.data() + 1; + const char *pcC2 = data + 1; onSendRawData(obtainBuffer(pcC2, C1_HANDSHARK_SIZE)); - _recv_data_buf.erase(0, 1 + 2 * C1_HANDSHARK_SIZE); //握手结束 - _next_step_func = [this]() { + _next_step_func = [this](const char *data, uint64_t len) { //握手结束并且开始进入解析命令模式 - handle_rtmp(); + return handle_rtmp(data, len); }; func(); + return data + 1 + 2 * C1_HANDSHARK_SIZE; } ////for server //// -void RtmpProtocol::handle_C0C1() { - if (_recv_data_buf.size() < 1 + C1_HANDSHARK_SIZE) { +const char * RtmpProtocol::handle_C0C1(const char *data, uint64_t len) { + if (len < 1 + C1_HANDSHARK_SIZE) { //need more data! - return; + return nullptr; } - if (_recv_data_buf[0] != HANDSHAKE_PLAINTEXT) { + if (data[0] != HANDSHAKE_PLAINTEXT) { throw std::runtime_error("only plaintext[0x03] handshake supported"); } - if (memcmp(_recv_data_buf.data() + 5, "\x00\x00\x00\x00", 4) == 0) { + if (memcmp(data + 5, "\x00\x00\x00\x00", 4) == 0) { //simple handsharke - handle_C1_simple(); + handle_C1_simple(data); } else { #ifdef ENABLE_OPENSSL //complex handsharke - handle_C1_complex(); + handle_C1_complex(data); #else WarnL << "未打开ENABLE_OPENSSL宏,复杂握手采用简单方式处理,flash播放器可能无法播放!"; - handle_C1_simple(); + handle_C1_simple(data); #endif//ENABLE_OPENSSL } - _recv_data_buf.erase(0, 1 + C1_HANDSHARK_SIZE); + return data + 1 + C1_HANDSHARK_SIZE; } -void RtmpProtocol::handle_C1_simple(){ +void RtmpProtocol::handle_C1_simple(const char *data){ //发送S0 char handshake_head = HANDSHAKE_PLAINTEXT; onSendRawData(obtainBuffer(&handshake_head, 1)); @@ -338,18 +341,19 @@ void RtmpProtocol::handle_C1_simple(){ RtmpHandshake s1(0); onSendRawData(obtainBuffer((char *) &s1, C1_HANDSHARK_SIZE)); //发送S2 - onSendRawData(obtainBuffer(_recv_data_buf.data() + 1, C1_HANDSHARK_SIZE)); + onSendRawData(obtainBuffer(data + 1, C1_HANDSHARK_SIZE)); //等待C2 - _next_step_func = [this]() { - handle_C2(); + _next_step_func = [this](const char *data, uint64_t len) { + //握手结束并且开始进入解析命令模式 + return handle_C2(data, len); }; } #ifdef ENABLE_OPENSSL -void RtmpProtocol::handle_C1_complex(){ +void RtmpProtocol::handle_C1_complex(const char *data){ //参考自:http://blog.csdn.net/win_lin/article/details/13006803 //skip c0,time,version - const char *c1_start = _recv_data_buf.data() + 1; + const char *c1_start = data + 1; const char *schema_start = c1_start + 8; char *digest_start; try { @@ -385,7 +389,7 @@ void RtmpProtocol::handle_C1_complex(){ // InfoL << "schema1"; } catch (std::exception &ex) { // WarnL << "try rtmp complex schema1 failed:" << ex.what(); - handle_C1_simple(); + handle_C1_simple(data); } } } @@ -502,44 +506,43 @@ void RtmpProtocol::send_complex_S0S1S2(int schemeType,const string &digest){ memcpy((char *) &s2 + C1_HANDSHARK_SIZE - C1_DIGEST_SIZE, s2_digest.data(), C1_DIGEST_SIZE); onSendRawData(obtainBuffer((char *) &s2, sizeof(s2))); //等待C2 - _next_step_func = [this]() { - handle_C2(); + _next_step_func = [this](const char *data, uint64_t len) { + return handle_C2(data, len); }; } #endif //ENABLE_OPENSSL -void RtmpProtocol::handle_C2() { - if (_recv_data_buf.size() < C1_HANDSHARK_SIZE) { +const char* RtmpProtocol::handle_C2(const char *data, uint64_t len) { + if (len < C1_HANDSHARK_SIZE) { //need more data! - return; + return nullptr; } - _recv_data_buf.erase(0, C1_HANDSHARK_SIZE); - //握手结束,进入命令模式 - if (!_recv_data_buf.empty()) { - handle_rtmp(); - } - _next_step_func = [this]() { - handle_rtmp(); + _next_step_func = [this](const char *data, uint64_t len) { + return handle_rtmp(data, len); }; + + //握手结束,进入命令模式 + return handle_rtmp(data + C1_HANDSHARK_SIZE, len - C1_HANDSHARK_SIZE); } static const size_t HEADER_LENGTH[] = {12, 8, 4, 1}; -void RtmpProtocol::handle_rtmp() { - while (!_recv_data_buf.empty()) { +const char* RtmpProtocol::handle_rtmp(const char *data, uint64_t len) { + auto ptr = data; + while (len) { int offset = 0; - uint8_t flags = _recv_data_buf[0]; + uint8_t flags = ptr[0]; size_t header_len = HEADER_LENGTH[flags >> 6]; _now_chunk_id = flags & 0x3f; switch (_now_chunk_id) { case 0: { //0 值表示二字节形式,并且 ID 范围 64 - 319 //(第二个字节 + 64)。 - if (_recv_data_buf.size() < 2) { + if (len < 2) { //need more data - return; + return ptr; } - _now_chunk_id = 64 + (uint8_t) (_recv_data_buf[1]); + _now_chunk_id = 64 + (uint8_t) (ptr[1]); offset = 1; break; } @@ -547,11 +550,11 @@ void RtmpProtocol::handle_rtmp() { case 1: { //1 值表示三字节形式,并且 ID 范围为 64 - 65599 //((第三个字节) * 256 + 第二个字节 + 64)。 - if (_recv_data_buf.size() < 3) { + if (len < 3) { //need more data - return; + return ptr; } - _now_chunk_id = 64 + ((uint8_t) (_recv_data_buf[2]) << 8) + (uint8_t) (_recv_data_buf[1]); + _now_chunk_id = 64 + ((uint8_t) (ptr[2]) << 8) + (uint8_t) (ptr[1]); offset = 2; break; } @@ -560,12 +563,12 @@ void RtmpProtocol::handle_rtmp() { default : break; } - if (_recv_data_buf.size() < header_len + offset) { + if (len < header_len + offset) { //need more data - return; + return ptr; } - RtmpHeader &header = *((RtmpHeader *) (_recv_data_buf.data() + offset)); + RtmpHeader &header = *((RtmpHeader *) (ptr + offset)); auto &chunk_data = _map_chunk_data[_now_chunk_id]; chunk_data.chunk_id = _now_chunk_id; switch (header_len) { @@ -581,11 +584,11 @@ void RtmpProtocol::handle_rtmp() { auto time_stamp = chunk_data.ts_field; if (chunk_data.ts_field == 0xFFFFFF) { - if (_recv_data_buf.size() < header_len + offset + 4) { + if (len < header_len + offset + 4) { //need more data - return; + return ptr; } - time_stamp = load_be32(_recv_data_buf.data() + offset + header_len); + time_stamp = load_be32(ptr + offset + header_len); offset += 4; } @@ -593,27 +596,29 @@ void RtmpProtocol::handle_rtmp() { throw std::runtime_error("非法的bodySize"); } - auto iMore = min(_chunk_size_in, chunk_data.body_size - chunk_data.buffer.size()); - if (_recv_data_buf.size() < header_len + offset + iMore) { + auto more = min(_chunk_size_in, (size_t)(chunk_data.body_size - chunk_data.buffer.size())); + if (len < header_len + offset + more) { //need more data - return; + return ptr; } - chunk_data.buffer.append(_recv_data_buf, header_len + offset, iMore); - _recv_data_buf.erase(0, header_len + offset + iMore); + chunk_data.buffer.append(ptr + header_len + offset, more); + ptr += header_len + offset + more; + len -= header_len + offset + more; if (chunk_data.buffer.size() == chunk_data.body_size) { //frame is ready _now_stream_index = chunk_data.stream_index; chunk_data.time_stamp = time_stamp + (chunk_data.is_abs_stamp ? 0 : chunk_data.time_stamp); if (chunk_data.body_size) { - handle_rtmpChunk(chunk_data); + handle_chunk(chunk_data); } chunk_data.buffer.clear(); chunk_data.is_abs_stamp = false; } } + return ptr; } -void RtmpProtocol::handle_rtmpChunk(RtmpPacket& chunk_data) { +void RtmpProtocol::handle_chunk(RtmpPacket& chunk_data) { switch (chunk_data.type_id) { case MSG_ACK: { if (chunk_data.buffer.size() < 4) { @@ -713,7 +718,7 @@ void RtmpProtocol::handle_rtmpChunk(RtmpPacket& chunk_data) { case MSG_AGGREGATE: { auto ptr = (uint8_t *) chunk_data.buffer.data(); - auto ptr_tail = ptr + chunk_data.buffer.length(); + auto ptr_tail = ptr + chunk_data.buffer.size(); while (ptr + 8 + 3 < ptr_tail) { auto type = *ptr; ptr += 1; @@ -730,14 +735,13 @@ void RtmpProtocol::handle_rtmpChunk(RtmpPacket& chunk_data) { break; } RtmpPacket sub_packet; - sub_packet.buffer.resize(size); - memcpy((char *) sub_packet.buffer.data(), ptr, size); + sub_packet.buffer.assign((char *)ptr, size); sub_packet.type_id = type; sub_packet.body_size = size; sub_packet.time_stamp = ts; sub_packet.stream_index = chunk_data.stream_index; sub_packet.chunk_id = chunk_data.chunk_id; - handle_rtmpChunk(sub_packet); + handle_chunk(sub_packet); ptr += size; } break; diff --git a/src/Rtmp/RtmpProtocol.h b/src/Rtmp/RtmpProtocol.h index 6f2df9dd..e20fce30 100644 --- a/src/Rtmp/RtmpProtocol.h +++ b/src/Rtmp/RtmpProtocol.h @@ -21,23 +21,24 @@ #include "Util/TimeTicker.h" #include "Network/Socket.h" #include "Util/ResourcePool.h" +#include "Http/HttpRequestSplitter.h" using namespace std; using namespace toolkit; namespace mediakit { -class RtmpProtocol { +class RtmpProtocol : public HttpRequestSplitter{ public: RtmpProtocol(); virtual ~RtmpProtocol(); - void onParseRtmp(const char *data, int size); + void onParseRtmp(const char *data, uint64_t size); //作为客户端发送c0c1,等待s0s1s2并且回调 void startClientSession(const function &cb); protected: - virtual void onSendRawData(const Buffer::Ptr &buffer) = 0; + virtual void onSendRawData(Buffer::Ptr buffer) = 0; virtual void onRtmpChunk(RtmpPacket &chunk_data) = 0; virtual void onStreamBegin(uint32_t stream_index){ _stream_index = stream_index; @@ -45,6 +46,11 @@ protected: virtual void onStreamEof(uint32_t stream_index){}; virtual void onStreamDry(uint32_t stream_index){}; +protected: + //// HttpRequestSplitter override //// + int64_t onRecvHeader(const char *data,uint64_t len) override { return 0; } + const char *onSearchPacketTail(const char *data,uint64_t len) override; + protected: void reset(); BufferRaw::Ptr obtainBuffer(); @@ -66,20 +72,20 @@ protected: void sendRtmp(uint8_t type, uint32_t stream_index, const Buffer::Ptr &buffer, uint32_t stamp, int chunk_id); private: - void handle_S0S1S2(const function &func); - void handle_C0C1(); - void handle_C1_simple(); + void handle_C1_simple(const char *data); #ifdef ENABLE_OPENSSL - void handle_C1_complex(); + void handle_C1_complex(const char *data); string get_C1_digest(const uint8_t *ptr,char **digestPos); string get_C1_key(const uint8_t *ptr); void check_C1_Digest(const string &digest,const string &data); void send_complex_S0S1S2(int schemeType,const string &digest); #endif //ENABLE_OPENSSL - void handle_C2(); - void handle_rtmp(); - void handle_rtmpChunk(RtmpPacket &chunk_data); + const char* handle_S0S1S2(const char *data, uint64_t len, const function &func); + const char* handle_C0C1(const char *data, uint64_t len); + const char* handle_C2(const char *data, uint64_t len); + const char* handle_rtmp(const char *data, uint64_t len); + void handle_chunk(RtmpPacket &chunk_data); protected: int _send_req_id = 0; @@ -100,8 +106,7 @@ private: uint32_t _bandwidth = 2500000; uint8_t _band_limit_type = 2; //////////Rtmp parser////////// - string _recv_data_buf; - function _next_step_func; + function _next_step_func; ////////////Chunk//////////// unordered_map _map_chunk_data; }; diff --git a/src/Rtmp/RtmpPusher.cpp b/src/Rtmp/RtmpPusher.cpp index 201d6f9e..a7d08cf5 100644 --- a/src/Rtmp/RtmpPusher.cpp +++ b/src/Rtmp/RtmpPusher.cpp @@ -119,9 +119,6 @@ void RtmpPusher::onConnect(const SockException &err){ onPublishResult(err, false); return; } - //推流器不需要多大的接收缓存,节省内存占用 - getSock()->setReadBuffer(std::make_shared(1 * 1024)); - weak_ptr weak_self = dynamic_pointer_cast(shared_from_this()); startClientSession([weak_self]() { auto strong_self = weak_self.lock(); diff --git a/src/Rtmp/RtmpPusher.h b/src/Rtmp/RtmpPusher.h index 3d7e7044..a1ac0d9c 100644 --- a/src/Rtmp/RtmpPusher.h +++ b/src/Rtmp/RtmpPusher.h @@ -43,8 +43,8 @@ protected: //for RtmpProtocol override void onRtmpChunk(RtmpPacket &chunk_data) override; - void onSendRawData(const Buffer::Ptr &buffer) override{ - send(buffer); + void onSendRawData(Buffer::Ptr buffer) override{ + send(std::move(buffer)); } private: diff --git a/src/Rtmp/RtmpSession.cpp b/src/Rtmp/RtmpSession.cpp index f992385c..9b82600f 100644 --- a/src/Rtmp/RtmpSession.cpp +++ b/src/Rtmp/RtmpSession.cpp @@ -17,8 +17,6 @@ RtmpSession::RtmpSession(const Socket::Ptr &sock) : TcpSession(sock) { DebugP(this); GET_CONFIG(uint32_t,keep_alive_sec,Rtmp::kKeepAliveSecond); sock->setSendTimeOutSecond(keep_alive_sec); - //起始接收buffer缓存设置为4K,节省内存 - sock->setReadBuffer(std::make_shared(4 * 1024)); } RtmpSession::~RtmpSession() { @@ -151,9 +149,6 @@ void RtmpSession::onCmd_publish(AMFDecoder &dec) { _publisher_src->setListener(dynamic_pointer_cast(shared_from_this())); //设置转协议 _publisher_src->setProtocolTranslation(enableHls, enableMP4); - - //如果是rtmp推流客户端,那么加大TCP接收缓存,这样能提升接收性能 - getSock()->setReadBuffer(std::make_shared(256 * 1024)); setSocketFlags(); }; diff --git a/src/Rtmp/RtmpSession.h b/src/Rtmp/RtmpSession.h index 239a7781..e608ca4c 100644 --- a/src/Rtmp/RtmpSession.h +++ b/src/Rtmp/RtmpSession.h @@ -56,9 +56,9 @@ private: void setMetaData(AMFDecoder &dec); void onSendMedia(const RtmpPacket::Ptr &pkt); - void onSendRawData(const Buffer::Ptr &buffer) override{ + void onSendRawData(Buffer::Ptr buffer) override{ _total_bytes += buffer->size(); - send(buffer); + send(std::move(buffer)); } void onRtmpChunk(RtmpPacket &chunk_data) override; diff --git a/src/Rtmp/amf.cpp b/src/Rtmp/amf.cpp index ca4c8077..c2db49a3 100644 --- a/src/Rtmp/amf.cpp +++ b/src/Rtmp/amf.cpp @@ -539,7 +539,7 @@ std::string AMFDecoder::load() { if (pos + str_len > buf.size()) { throw std::runtime_error("Not enough data"); } - std::string s(buf, pos, str_len); + std::string s = buf.substr(pos, str_len); pos += str_len; return s; } @@ -612,7 +612,7 @@ std::string AMFDecoder::load_key() { if (pos + str_len > buf.size()) { throw std::runtime_error("Not enough data"); } - std::string s(buf, pos, str_len); + std::string s = buf.substr(pos, str_len); pos += str_len; return s; @@ -680,7 +680,7 @@ AMFValue AMFDecoder::load_arr() { return object; } -AMFDecoder::AMFDecoder(const std::string &buf_in, size_t pos_in, int version_in) : +AMFDecoder::AMFDecoder(const BufferLikeString &buf_in, size_t pos_in, int version_in) : buf(buf_in), pos(pos_in), version(version_in) { } diff --git a/src/Rtmp/amf.h b/src/Rtmp/amf.h index 9b29b11e..44153da3 100644 --- a/src/Rtmp/amf.h +++ b/src/Rtmp/amf.h @@ -18,7 +18,9 @@ #include #include #include +#include "Network/Buffer.h" using namespace std; +using namespace toolkit; enum AMFType { AMF_NUMBER, @@ -81,7 +83,7 @@ private: class AMFDecoder { public: - AMFDecoder(const std::string &buf, size_t pos, int version = 0); + AMFDecoder(const BufferLikeString &buf, size_t pos, int version = 0); template TP load(); private: @@ -92,7 +94,7 @@ private: uint8_t front(); uint8_t pop_front(); private: - const std::string &buf; + const BufferLikeString &buf; size_t pos; int version; }; diff --git a/src/Rtp/Decoder.cpp b/src/Rtp/Decoder.cpp index 09090d76..9df23287 100644 --- a/src/Rtp/Decoder.cpp +++ b/src/Rtp/Decoder.cpp @@ -102,11 +102,12 @@ void FrameMerger::inputFrame(const Frame::Ptr &frame,const functionsize() + 1024); _frameCached.for_each([&](const Frame::Ptr &frame){ merged.append(frame->data(),frame->size()); }); - merged_frame = std::make_shared(std::move(merged)); + merged_frame = std::make_shared >(std::move(merged)); } cb(back->dts(),back->pts(),merged_frame); _frameCached.clear(); diff --git a/src/Rtp/GB28181Process.cpp b/src/Rtp/GB28181Process.cpp index 0006d2eb..27947533 100644 --- a/src/Rtp/GB28181Process.cpp +++ b/src/Rtp/GB28181Process.cpp @@ -69,7 +69,7 @@ void GB28181Process::onRtpSorted(const RtpPacket::Ptr &rtp, int) { _rtp_decoder->inputRtp(rtp, false); } -const char *GB28181Process::onSearchPacketTail(const char *packet,int bytes){ +const char *GB28181Process::onSearchPacketTail(const char *packet,uint64_t bytes){ try { auto ret = _decoder->input((uint8_t *) packet, bytes); if (ret > 0) { diff --git a/src/Rtp/GB28181Process.h b/src/Rtp/GB28181Process.h index 536af856..76da3f1d 100644 --- a/src/Rtp/GB28181Process.h +++ b/src/Rtp/GB28181Process.h @@ -37,7 +37,7 @@ public: protected: void onRtpSorted(const RtpPacket::Ptr &rtp, int track_index) override ; - const char *onSearchPacketTail(const char *data,int len) override; + const char *onSearchPacketTail(const char *data,uint64_t len) override; int64_t onRecvHeader(const char *data,uint64_t len) override { return 0; }; private: diff --git a/src/Rtp/PSEncoder.cpp b/src/Rtp/PSEncoder.cpp index f2b47693..7bd55d3f 100644 --- a/src/Rtp/PSEncoder.cpp +++ b/src/Rtp/PSEncoder.cpp @@ -129,7 +129,8 @@ void PSEncoder::inputFrame(const Frame::Ptr &frame) { Frame::Ptr back = _frameCached.back(); Buffer::Ptr merged_frame = back; if (_frameCached.size() != 1) { - string merged; + BufferLikeString merged; + merged.reserve(back->size() + 1024); _frameCached.for_each([&](const Frame::Ptr &frame) { if (frame->prefixSize()) { merged.append(frame->data(), frame->size()); @@ -138,7 +139,7 @@ void PSEncoder::inputFrame(const Frame::Ptr &frame) { merged.append(frame->data(), frame->size()); } }); - merged_frame = std::make_shared(std::move(merged)); + merged_frame = std::make_shared >(std::move(merged)); } track_info.stamp.revise(back->dts(), back->pts(), dts_out, pts_out); _timestamp = dts_out; diff --git a/src/Rtp/RtpSplitter.cpp b/src/Rtp/RtpSplitter.cpp index 74b20c2e..0bb76d59 100644 --- a/src/Rtp/RtpSplitter.cpp +++ b/src/Rtp/RtpSplitter.cpp @@ -9,7 +9,6 @@ */ #if defined(ENABLE_RTPPROXY) -#include #include #include "RtpSplitter.h" namespace mediakit{ @@ -35,14 +34,14 @@ int64_t RtpSplitter::onRecvHeader(const char *data,uint64_t len){ return 0; } -static bool isEhome(const char *data, int len){ +static bool isEhome(const char *data, uint64_t len){ if (len < 4) { return false; } return memcmp(data, kEHOME_MAGIC, sizeof(kEHOME_MAGIC) - 1) == 0; } -const char *RtpSplitter::onSearchPacketTail(const char *data, int len) { +const char *RtpSplitter::onSearchPacketTail(const char *data, uint64_t len) { if (len < 4) { //数据不够 return nullptr; @@ -70,7 +69,7 @@ const char *RtpSplitter::onSearchPacketTail(const char *data, int len) { return onSearchPacketTail_l(data, len); } -const char *RtpSplitter::onSearchPacketTail_l(const char *data, int len) { +const char *RtpSplitter::onSearchPacketTail_l(const char *data, uint64_t len) { //这是rtp包 uint16_t length = (((uint8_t *) data)[0] << 8) | ((uint8_t *) data)[1]; if (len < length + 2) { diff --git a/src/Rtp/RtpSplitter.h b/src/Rtp/RtpSplitter.h index d81cdd0e..9f895c21 100644 --- a/src/Rtp/RtpSplitter.h +++ b/src/Rtp/RtpSplitter.h @@ -31,8 +31,8 @@ protected: protected: int64_t onRecvHeader(const char *data, uint64_t len) override; - const char *onSearchPacketTail(const char *data, int len) override; - const char *onSearchPacketTail_l(const char *data, int len); + const char *onSearchPacketTail(const char *data, uint64_t len) override; + const char *onSearchPacketTail_l(const char *data, uint64_t len); private: int _offset = 0; diff --git a/src/Rtp/TSDecoder.cpp b/src/Rtp/TSDecoder.cpp index 0f25c20e..92fb26dd 100644 --- a/src/Rtp/TSDecoder.cpp +++ b/src/Rtp/TSDecoder.cpp @@ -28,7 +28,7 @@ int64_t TSSegment::onRecvHeader(const char *data, uint64_t len) { return 0; } -const char *TSSegment::onSearchPacketTail(const char *data, int len) { +const char *TSSegment::onSearchPacketTail(const char *data, uint64_t len) { if (len < _size + 1) { if (len == _size && ((uint8_t *) data)[0] == TS_SYNC_BYTE) { return data + _size; diff --git a/src/Rtp/TSDecoder.h b/src/Rtp/TSDecoder.h index 42841404..db6e8b4b 100644 --- a/src/Rtp/TSDecoder.h +++ b/src/Rtp/TSDecoder.h @@ -31,7 +31,7 @@ public: static bool isTSPacket(const char *data, int len); protected: int64_t onRecvHeader(const char *data, uint64_t len) override ; - const char *onSearchPacketTail(const char *data, int len) override ; + const char *onSearchPacketTail(const char *data, uint64_t len) override ; private: int _size; onSegment _onSegment; diff --git a/src/Rtsp/RtspPlayer.cpp b/src/Rtsp/RtspPlayer.cpp index dc899a5d..747de9e5 100644 --- a/src/Rtsp/RtspPlayer.cpp +++ b/src/Rtsp/RtspPlayer.cpp @@ -704,7 +704,8 @@ void RtspPlayer::sendRtspRequest(const string &cmd, const string &url,const StrC for (auto &pr : header){ printer << pr.first << ": " << pr.second << "\r\n"; } - SockSender::send(printer << "\r\n"); + printer << "\r\n"; + SockSender::send(std::move(printer)); } void RtspPlayer::onRecvRTP_l(const RtpPacket::Ptr &rtp, const SdpTrack::Ptr &track) { diff --git a/src/Rtsp/RtspPusher.cpp b/src/Rtsp/RtspPusher.cpp index 4728c8b4..0172ca5b 100644 --- a/src/Rtsp/RtspPusher.cpp +++ b/src/Rtsp/RtspPusher.cpp @@ -121,8 +121,6 @@ void RtspPusher::onConnect(const SockException &err) { onPublishResult(err, false); return; } - //推流器不需要多大的接收缓存,节省内存占用 - getSock()->setReadBuffer(std::make_shared(1 * 1024)); sendAnnounce(); } @@ -318,7 +316,7 @@ inline void RtspPusher::sendRtpPacket(const RtspMediaSource::RingDataType &pkt) setSendFlushFlag(true); } BufferRtp::Ptr buffer(new BufferRtp(rtp)); - send(buffer); + send(std::move(buffer)); }); break; } @@ -335,7 +333,7 @@ inline void RtspPusher::sendRtpPacket(const RtspMediaSource::RingDataType &pkt) } BufferRtp::Ptr buffer(new BufferRtp(rtp, 4)); - pSock->send(buffer, nullptr, 0, ++i == size); + pSock->send(std::move(buffer), nullptr, 0, ++i == size); }); break; } @@ -475,7 +473,7 @@ void RtspPusher::sendRtspRequest(const string &cmd, const string &url,const StrC if (!sdp.empty()) { printer << sdp; } - SockSender::send(printer); + SockSender::send(std::move(printer)); } diff --git a/src/Rtsp/RtspSession.cpp b/src/Rtsp/RtspSession.cpp index e010bd3c..40c42f47 100644 --- a/src/Rtsp/RtspSession.cpp +++ b/src/Rtsp/RtspSession.cpp @@ -60,8 +60,6 @@ RtspSession::RtspSession(const Socket::Ptr &sock) : TcpSession(sock) { DebugP(this); GET_CONFIG(uint32_t,keep_alive_sec,Rtsp::kKeepAliveSecond); sock->setSendTimeOutSecond(keep_alive_sec); - //起始接收buffer缓存设置为4K,节省内存 - sock->setReadBuffer(std::make_shared(4 * 1024)); } RtspSession::~RtspSession() { @@ -270,8 +268,7 @@ void RtspSession::handleReq_RECORD(const Parser &parser){ rtp_info.pop_back(); sendRtspResponse("200 OK", {"RTP-Info",rtp_info}); if(_rtp_type == Rtsp::RTP_TCP){ - //如果是rtsp推流服务器,并且是TCP推流,那么加大TCP接收缓存,这样能提升接收性能 - getSock()->setReadBuffer(std::make_shared(256 * 1024)); + //如果是rtsp推流服务器,并且是TCP推流,设置socket flags,,这样能提升接收性能 setSocketFlags(); } }; @@ -1030,15 +1027,15 @@ bool RtspSession::sendRtspResponse(const string &res_code, const StrCaseMap &hea printer << sdp; } // DebugP(this) << printer; - return send(std::make_shared(printer)) > 0 ; + return send(std::make_shared(std::move(printer))) > 0 ; } -int RtspSession::send(const Buffer::Ptr &pkt){ +int RtspSession::send(Buffer::Ptr pkt){ // if(!_enableSendRtp){ // DebugP(this) << pkt->data(); // } _bytes_usage += pkt->size(); - return TcpSession::send(pkt); + return TcpSession::send(std::move(pkt)); } bool RtspSession::sendRtspResponse(const string &res_code, const std::initializer_list &header, const string &sdp, const char *protocol) { @@ -1162,7 +1159,7 @@ void RtspSession::sendRtpPacket(const RtspMediaSource::RingDataType &pkt) { } BufferRtp::Ptr buffer(new BufferRtp(rtp, 4)); _bytes_usage += buffer->size(); - pSock->send(buffer, nullptr, 0, ++i == size); + pSock->send(std::move(buffer), nullptr, 0, ++i == size); }); } break; diff --git a/src/Rtsp/RtspSession.h b/src/Rtsp/RtspSession.h index cd691c7b..23dd52bb 100644 --- a/src/Rtsp/RtspSession.h +++ b/src/Rtsp/RtspSession.h @@ -93,7 +93,7 @@ protected: std::shared_ptr getOriginSock(MediaSource &sender) const override; /////TcpSession override//// - int send(const Buffer::Ptr &pkt) override; + int send(Buffer::Ptr pkt) override; //收到RTCP包回调 virtual void onRtcpPacket(int track_idx, SdpTrack::Ptr &track, unsigned char *data, unsigned int len); diff --git a/src/Rtsp/RtspSplitter.cpp b/src/Rtsp/RtspSplitter.cpp index 1c97f369..527659af 100644 --- a/src/Rtsp/RtspSplitter.cpp +++ b/src/Rtsp/RtspSplitter.cpp @@ -15,7 +15,7 @@ namespace mediakit{ -const char *RtspSplitter::onSearchPacketTail(const char *data, int len) { +const char *RtspSplitter::onSearchPacketTail(const char *data, uint64_t len) { auto ret = onSearchPacketTail_l(data, len); if(ret){ return ret; @@ -32,7 +32,7 @@ const char *RtspSplitter::onSearchPacketTail(const char *data, int len) { return ret; } -const char *RtspSplitter::onSearchPacketTail_l(const char *data, int len) { +const char *RtspSplitter::onSearchPacketTail_l(const char *data, uint64_t len) { if(!_enableRecvRtp || data[0] != '$'){ //这是rtsp包 _isRtpPacket = false; diff --git a/src/Rtsp/RtspSplitter.h b/src/Rtsp/RtspSplitter.h index 3cd35821..a1a443c2 100644 --- a/src/Rtsp/RtspSplitter.h +++ b/src/Rtsp/RtspSplitter.h @@ -47,8 +47,8 @@ protected: */ virtual int64_t getContentLength(Parser &parser); protected: - const char *onSearchPacketTail(const char *data,int len) override ; - const char *onSearchPacketTail_l(const char *data,int len) ; + const char *onSearchPacketTail(const char *data,uint64_t len) override ; + const char *onSearchPacketTail_l(const char *data,uint64_t len) ; int64_t onRecvHeader(const char *data,uint64_t len) override; void onRecvContent(const char *data,uint64_t len) override; private: