diff --git a/3rdpart/media-server b/3rdpart/media-server index 8d40dad3..737b8d85 160000 --- a/3rdpart/media-server +++ b/3rdpart/media-server @@ -1 +1 @@ -Subproject commit 8d40dad3dbdce171756691d4511aca49fcf2a231 +Subproject commit 737b8d852eeb1a36bc77854f327fbbef7cfb81be diff --git a/README.md b/README.md index 1377e922..555289f3 100644 --- a/README.md +++ b/README.md @@ -67,7 +67,7 @@ git submodule update --init - 支持http文件访问鉴权 - GB28181 - - 支持UDP/TCP国标RTP推流,可以转换成RTSP/RTMP/HLS等协议 + - 支持UDP/TCP国标RTP(PS或TS)推流,可以转换成RTSP/RTMP/HLS等协议 - 点播 - 支持录制为FLV/HLS/MP4 diff --git a/README_en.md b/README_en.md index d9947c01..b92cfd15 100644 --- a/README_en.md +++ b/README_en.md @@ -51,7 +51,7 @@ - Auto close stream when nobody played. - Play and push authentication. - Pull stream on Demand. - + - Support TS / PS streaming push through RTP,and it can be converted to RTSP / RTMP / HLS / FLV. - Protocol conversion: diff --git a/api/include/mk_events_objects.h b/api/include/mk_events_objects.h index a385fdfb..730f1dc0 100644 --- a/api/include/mk_events_objects.h +++ b/api/include/mk_events_objects.h @@ -109,7 +109,20 @@ API_EXPORT const char* API_CALL mk_media_source_get_stream(const mk_media_source API_EXPORT int API_CALL mk_media_source_get_reader_count(const mk_media_source ctx); //MediaSource::totalReaderCount() API_EXPORT int API_CALL mk_media_source_get_total_reader_count(const mk_media_source ctx); -//MediaSource::close() +/** + * 直播源在ZLMediaKit中被称作为MediaSource, + * 目前支持3种,分别是RtmpMediaSource、RtspMediaSource、HlsMediaSource + * 源的产生有被动和主动方式: + * 被动方式分别是rtsp/rtmp/rtp推流、mp4点播 + * 主动方式包括mk_media_create创建的对象(DevChannel)、mk_proxy_player_create创建的对象(PlayerProxy) + * 被动方式你不用做任何处理,ZLMediaKit已经默认适配了MediaSource::close()事件,都会关闭直播流 + * 主动方式你要设置这个事件的回调,你要自己选择删除对象 + * 通过mk_proxy_player_set_on_close、mk_media_set_on_close函数可以设置回调, + * 请在回调中删除对象来完成媒体的关闭,否则又为什么要调用mk_media_source_close函数? + * @param ctx 对象 + * @param force 是否强制关闭,如果强制关闭,在有人观看的情况下也会关闭 + * @return 0代表失败,1代表成功 + */ API_EXPORT int API_CALL mk_media_source_close(const mk_media_source ctx,int force); //MediaSource::seekTo() API_EXPORT int API_CALL mk_media_source_seek_to(const mk_media_source ctx,uint32_t stamp); diff --git a/api/include/mk_media.h b/api/include/mk_media.h index 4f56a4f8..1ec00bd5 100755 --- a/api/include/mk_media.h +++ b/api/include/mk_media.h @@ -130,17 +130,18 @@ API_EXPORT void API_CALL mk_media_input_aac(mk_media ctx, void *data, int len, u API_EXPORT void API_CALL mk_media_input_aac1(mk_media ctx, void *data, int len, uint32_t dts, void *adts); /** - * 在调用对应的MediaSource.close()时会触发该回调 - * 你可以在该事件中做清理工作(比如说关闭摄像头,同时调用mk_media_release函数销毁该对象) + * MediaSource.close()回调事件 + * 在选择关闭一个关联的MediaSource时,将会最终触发到该回调 + * 你应该通过该事件调用mk_media_release函数并且释放其他资源 + * 如果你不调用mk_media_release函数,那么MediaSource.close()操作将无效 * @param user_data 用户数据指针,通过mk_media_set_on_close函数设置 - * @return 返回0告知事件触发者关闭媒体失败,非0代表成功 */ -typedef int(API_CALL *on_mk_media_close)(void *user_data); +typedef void(API_CALL *on_mk_media_close)(void *user_data); /** * 监听MediaSource.close()事件 - * 在选择关闭一个MediaSource时,将会最终触发到该回调 - * 你可以通过该事件选择删除对象,当然你在该事件中也可以什么都不做 + * 在选择关闭一个关联的MediaSource时,将会最终触发到该回调 + * 你应该通过该事件调用mk_media_release函数并且释放其他资源 * @param ctx 对象指针 * @param cb 回调指针 * @param user_data 用户数据指针 diff --git a/api/include/mk_proxyplayer.h b/api/include/mk_proxyplayer.h index a8495eda..96bb5bf8 100644 --- a/api/include/mk_proxyplayer.h +++ b/api/include/mk_proxyplayer.h @@ -68,6 +68,32 @@ API_EXPORT void API_CALL mk_proxy_player_set_option(mk_proxy_player ctx, const c */ API_EXPORT void API_CALL mk_proxy_player_play(mk_proxy_player ctx, const char *url); +/** + * MediaSource.close()回调事件 + * 在选择关闭一个关联的MediaSource时,将会最终触发到该回调 + * 你应该通过该事件调用mk_proxy_player_release函数并且释放其他资源 + * 如果你不调用mk_proxy_player_release函数,那么MediaSource.close()操作将无效 + * @param user_data 用户数据指针,通过mk_proxy_player_set_on_close函数设置 + */ +typedef void(API_CALL *on_mk_proxy_player_close)(void *user_data); + +/** + * 监听MediaSource.close()事件 + * 在选择关闭一个关联的MediaSource时,将会最终触发到该回调 + * 你应该通过该事件调用mk_proxy_player_release函数并且释放其他资源 + * @param ctx 对象指针 + * @param cb 回调指针 + * @param user_data 用户数据指针 + */ +API_EXPORT void API_CALL mk_proxy_player_set_on_close(mk_proxy_player ctx, on_mk_proxy_player_close cb, void *user_data); + +/** + * 获取总的观看人数 + * @param ctx 对象指针 + * @return 观看人数 + */ +API_EXPORT int API_CALL mk_proxy_player_total_reader_count(mk_proxy_player ctx); + #ifdef __cplusplus } #endif diff --git a/api/source/mk_media.cpp b/api/source/mk_media.cpp index d513af54..210034d3 100755 --- a/api/source/mk_media.cpp +++ b/api/source/mk_media.cpp @@ -62,14 +62,11 @@ protected: } if(!_cb){ //未设置回调,没法关闭 + WarnL << "请使用mk_media_set_on_close函数设置回调函数!"; return false; } - if(!_cb(_user_data)){ - //回调选择返回不关闭该视频 - return false; - } - - //回调中已经关闭该视频 + //请在回调中调用mk_media_release函数释放资源,否则MediaSource::close()操作不会生效 + _cb(_user_data); WarnL << "close media:" << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId() << " " << force; return true; } diff --git a/api/source/mk_proxyplayer.cpp b/api/source/mk_proxyplayer.cpp index 23702a9d..00cb341a 100644 --- a/api/source/mk_proxyplayer.cpp +++ b/api/source/mk_proxyplayer.cpp @@ -61,3 +61,22 @@ API_EXPORT void API_CALL mk_proxy_player_play(mk_proxy_player ctx, const char *u obj->play(url_str); }); } + +API_EXPORT void API_CALL mk_proxy_player_set_on_close(mk_proxy_player ctx, on_mk_proxy_player_close cb, void *user_data){ + assert(ctx); + PlayerProxy::Ptr &obj = *((PlayerProxy::Ptr *) ctx); + obj->getPoller()->async([obj,cb,user_data](){ + //切换线程再操作 + obj->setOnClose([cb,user_data](){ + if(cb){ + cb(user_data); + } + }); + }); +} + +API_EXPORT int API_CALL mk_proxy_player_total_reader_count(mk_proxy_player ctx){ + assert(ctx); + PlayerProxy::Ptr &obj = *((PlayerProxy::Ptr *) ctx); + return obj->totalReaderCount(); +} diff --git a/src/Common/MediaSink.cpp b/src/Common/MediaSink.cpp index 9d224ac2..90cb1797 100644 --- a/src/Common/MediaSink.cpp +++ b/src/Common/MediaSink.cpp @@ -137,6 +137,7 @@ void MediaSink::emitAllTrackReady() { //移除未准备好的Track for (auto it = _track_map.begin(); it != _track_map.end();) { if (!it->second->ready()) { + WarnL << "该track长时间未被初始化,已忽略:" << it->second->getCodecName(); it = _track_map.erase(it); continue; } diff --git a/src/Common/MediaSource.cpp b/src/Common/MediaSource.cpp index 75d6c269..9d15ab09 100644 --- a/src/Common/MediaSource.cpp +++ b/src/Common/MediaSource.cpp @@ -340,6 +340,12 @@ void MediaInfo::parse(const string &url){ } else{ _host = _vhost = vhost; } + + if(_vhost == "localhost" || INADDR_NONE != inet_addr(_vhost.data())){ + //如果访问的是localhost或ip,那么则为默认虚拟主机 + _vhost = DEFAULT_VHOST; + } + } if(split_vec.size() > 1){ _app = split_vec[1]; @@ -366,7 +372,8 @@ void MediaInfo::parse(const string &url){ } GET_CONFIG(bool,enableVhost,General::kEnableVhost); - if(!enableVhost || _vhost.empty() || _vhost == "localhost" || INADDR_NONE != inet_addr(_vhost.data())){ + if(!enableVhost || _vhost.empty()){ + //如果关闭虚拟主机或者虚拟主机为空,则设置虚拟主机为默认 _vhost = DEFAULT_VHOST; } } diff --git a/src/Common/MultiMediaSourceMuxer.h b/src/Common/MultiMediaSourceMuxer.h index 0ec1f2ed..c0a79adb 100644 --- a/src/Common/MultiMediaSourceMuxer.h +++ b/src/Common/MultiMediaSourceMuxer.h @@ -33,6 +33,10 @@ #include "Record/HlsMediaSource.h" #include "Record/HlsRecorder.h" +/** + * 使用该对象时,应该使用setListener方法来绑定MediaSource相关的事件 + * 否则多种不同类型的MediaSource(rtsp/rtmp/hls)将无法产生关联 + */ class MultiMediaSourceMuxer : public MediaSink , public std::enable_shared_from_this{ public: class Listener{ diff --git a/src/Extension/AACRtp.cpp b/src/Extension/AACRtp.cpp index a84c22be..cc2feb42 100644 --- a/src/Extension/AACRtp.cpp +++ b/src/Extension/AACRtp.cpp @@ -24,6 +24,7 @@ * SOFTWARE. */ #include "AACRtp.h" +#define ADTS_HEADER_LEN 7 namespace mediakit{ @@ -91,8 +92,8 @@ AACRtpDecoder::AACRtpDecoder() { AACFrame::Ptr AACRtpDecoder::obtainFrame() { //从缓存池重新申请对象,防止覆盖已经写入环形缓存的对象 auto frame = ResourcePoolHelper::obtainObj(); - frame->aac_frame_length = 7; - frame->iPrefixSize = 7; + frame->aac_frame_length = ADTS_HEADER_LEN; + frame->iPrefixSize = ADTS_HEADER_LEN; if(frame->syncword == 0 && !_aac_cfg.empty()) { makeAdtsHeader(_aac_cfg,*frame); } @@ -100,70 +101,48 @@ AACFrame::Ptr AACRtpDecoder::obtainFrame() { } bool AACRtpDecoder::inputRtp(const RtpPacket::Ptr &rtppack, bool key_pos) { - // 获取rtp数据长度 - int length = rtppack->size() - rtppack->offset; + //rtp数据开始部分 + uint8_t *ptr = (uint8_t *) rtppack->data() + rtppack->offset; + //rtp数据末尾 + const uint8_t *end = (uint8_t *) rtppack->data() + rtppack->size(); - // 获取rtp数据 - const uint8_t *rtp_packet_buf = (uint8_t *)rtppack->data() + rtppack->offset; - - do - { - // 查询头部的偏移,每次2字节 - uint32_t au_header_offset = 0; - //首2字节表示Au-Header的长度,单位bit,所以除以16得到Au-Header字节数 - const uint16_t au_header_length = (((rtp_packet_buf[au_header_offset] << 8) | rtp_packet_buf[au_header_offset + 1]) >> 4); - au_header_offset += 2; - - //assert(length > (2 + au_header_length * 2)); - if (length < (2 + au_header_length * 2)) - break; - - // 存放每一个aac帧长度 - std::vector vec_aac_len; - for (int i = 0; i < au_header_length; ++i) - { - // 之后的2字节是AU_HEADER - const uint16_t au_header = ((rtp_packet_buf[au_header_offset] << 8) | rtp_packet_buf[au_header_offset + 1]); - // 其中高13位表示一帧AAC负载的字节长度,低3位无用 - uint32_t nAac = (au_header >> 3); - vec_aac_len.push_back(nAac); - au_header_offset += 2; - } + //首2字节表示Au-Header的个数,单位bit,所以除以16得到Au-Header个数 + const uint16_t au_header_count = ((ptr[0] << 8) | ptr[1]) >> 4; + //忽略Au-Header区 + ptr += 2 + au_header_count * 2; - // 真正aac负载开始处 - const uint8_t *rtp_packet_payload = rtp_packet_buf + au_header_offset; - // 载荷查找 - uint32_t next_aac_payload_offset = 0; - for (int j = 0; j < au_header_length; ++j) - { - // 当前aac包长度 - const uint32_t cur_aac_payload_len = vec_aac_len.at(j); + static const uint32_t max_size = sizeof(AACFrame::buffer) - ADTS_HEADER_LEN; + while (ptr < end) { + auto size = (uint32_t) (end - ptr); + if(size > max_size){ + size = max_size; + } + if (_adts->aac_frame_length + size > sizeof(AACFrame::buffer)) { + //数据太多了,先清空 + flushData(); + } + //追加aac数据 + memcpy(_adts->buffer + _adts->aac_frame_length, ptr, size); + _adts->aac_frame_length += size; + _adts->timeStamp = rtppack->timeStamp; + ptr += size; + } - if (_adts->aac_frame_length + cur_aac_payload_len > sizeof(AACFrame::buffer)) { - _adts->aac_frame_length = 7; - WarnL << "aac负载数据太长"; - return false; - } - - // 提取每一包aac载荷数据 - memcpy(_adts->buffer + _adts->aac_frame_length, rtp_packet_payload + next_aac_payload_offset, cur_aac_payload_len); - _adts->aac_frame_length += (cur_aac_payload_len); - if (rtppack->mark == true) { - _adts->timeStamp = rtppack->timeStamp; - writeAdtsHeader(*_adts, _adts->buffer); - onGetAAC(_adts); - } - - next_aac_payload_offset += cur_aac_payload_len; - } - } while (0); - + if (rtppack->mark) { + //最后一个rtp分片 + flushData(); + } return false; } -void AACRtpDecoder::onGetAAC(const AACFrame::Ptr &frame) { - //写入环形缓存 - RtpCodec::inputFrame(frame); + +void AACRtpDecoder::flushData() { + if(_adts->aac_frame_length == ADTS_HEADER_LEN){ + //没有有效数据 + return; + } + writeAdtsHeader(*_adts, _adts->buffer); + RtpCodec::inputFrame(_adts); _adts = obtainFrame(); } diff --git a/src/Extension/AACRtp.h b/src/Extension/AACRtp.h index 14cfdd30..89f0b279 100644 --- a/src/Extension/AACRtp.h +++ b/src/Extension/AACRtp.h @@ -56,8 +56,8 @@ public: protected: AACRtpDecoder(); private: - void onGetAAC(const AACFrame::Ptr &frame); AACFrame::Ptr obtainFrame(); + void flushData(); private: AACFrame::Ptr _adts; string _aac_cfg; diff --git a/src/Extension/Factory.cpp b/src/Extension/Factory.cpp index cff9a4c0..6aedc4b5 100644 --- a/src/Extension/Factory.cpp +++ b/src/Extension/Factory.cpp @@ -155,7 +155,7 @@ RtpCodec::Ptr Factory::getRtpDecoderByTrack(const Track::Ptr &track) { case CodecAAC: return std::make_shared(track->clone()); default: - WarnL << "暂不支持该CodecId:" << track->getCodecId(); + WarnL << "暂不支持该CodecId:" << track->getCodecName(); return nullptr; } } @@ -212,7 +212,7 @@ RtmpCodec::Ptr Factory::getRtmpCodecByTrack(const Track::Ptr &track) { case CodecAAC: return std::make_shared(track); default: - WarnL << "暂不支持该CodecId:" << track->getCodecId(); + WarnL << "暂不支持该CodecId:" << track->getCodecName(); return nullptr; } } diff --git a/src/Extension/Frame.cpp b/src/Extension/Frame.cpp index 0b6f5232..2c204e19 100644 --- a/src/Extension/Frame.cpp +++ b/src/Extension/Frame.cpp @@ -38,5 +38,16 @@ Frame::Ptr Frame::getCacheAbleFrame(const Frame::Ptr &frame){ return std::make_shared(frame); } +#define SWITCH_CASE(codec_id) case codec_id : return #codec_id +const char *CodecInfo::getCodecName() { + switch (getCodecId()) { + SWITCH_CASE(CodecH264); + SWITCH_CASE(CodecH265); + SWITCH_CASE(CodecAAC); + default: + return "unknown codec"; + } +} + }//namespace mediakit diff --git a/src/Extension/Frame.h b/src/Extension/Frame.h index d4cbca6b..66218b0c 100644 --- a/src/Extension/Frame.h +++ b/src/Extension/Frame.h @@ -72,6 +72,12 @@ public: * 获取编解码器类型 */ virtual CodecId getCodecId() const = 0; + + /** + * 获取编码器名称 + * @return 编码器名称 + */ + const char *getCodecName(); }; /** diff --git a/src/Http/HttpSession.cpp b/src/Http/HttpSession.cpp index d9144d11..a1bae44f 100644 --- a/src/Http/HttpSession.cpp +++ b/src/Http/HttpSession.cpp @@ -53,20 +53,29 @@ HttpSession::~HttpSession() { TraceP(this); } +void HttpSession::Handle_Req_HEAD(int64_t &content_len){ + //暂时全部返回200 OK,因为HTTP GET存在按需生成流的操作,所以不能按照HTTP GET的流程返回 + //如果直接返回404,那么又会导致按需生成流的逻辑失效,所以HTTP HEAD在静态文件或者已存在资源时才有效 + //对于按需生成流的直播场景并不适用 + sendResponse("200 OK", true); +} + int64_t HttpSession::onRecvHeader(const char *header,uint64_t len) { typedef void (HttpSession::*HttpCMDHandle)(int64_t &); static unordered_map s_func_map; static onceToken token([]() { s_func_map.emplace("GET",&HttpSession::Handle_Req_GET); s_func_map.emplace("POST",&HttpSession::Handle_Req_POST); - }, nullptr); + s_func_map.emplace("HEAD",&HttpSession::Handle_Req_HEAD); + }, nullptr); _parser.Parse(header); urlDecode(_parser); string cmd = _parser.Method(); auto it = s_func_map.find(cmd); if (it == s_func_map.end()) { - sendResponse("403 Forbidden", true); + WarnL << "不支持该命令:" << cmd; + sendResponse("405 Not Allowed", true); return 0; } @@ -256,8 +265,11 @@ bool HttpSession::checkLiveFlvStream(const function &cb){ return true; } - void HttpSession::Handle_Req_GET(int64_t &content_len) { + Handle_Req_GET_l(content_len, true); +} + +void HttpSession::Handle_Req_GET_l(int64_t &content_len, bool sendBody) { //先看看是否为WebSocket请求 if(checkWebSocket()){ content_len = -1; diff --git a/src/Http/HttpSession.h b/src/Http/HttpSession.h index 0d9ce18e..072681d3 100644 --- a/src/Http/HttpSession.h +++ b/src/Http/HttpSession.h @@ -107,8 +107,11 @@ protected: void onWebSocketEncodeData(const Buffer::Ptr &buffer) override; private: void Handle_Req_GET(int64_t &content_len); - void Handle_Req_POST(int64_t &content_len); - bool checkLiveFlvStream(const function &cb = nullptr); + void Handle_Req_GET_l(int64_t &content_len, bool sendBody); + void Handle_Req_POST(int64_t &content_len); + void Handle_Req_HEAD(int64_t &content_len); + + bool checkLiveFlvStream(const function &cb = nullptr); bool checkWebSocket(); bool emitHttpEvent(bool doInvoke); void urlDecode(Parser &parser); diff --git a/src/Http/strCoding.cpp b/src/Http/strCoding.cpp index 4b5ea07b..2a01f73e 100644 --- a/src/Http/strCoding.cpp +++ b/src/Http/strCoding.cpp @@ -69,24 +69,19 @@ char StrToBin(const char *str) } string strCoding::UrlEncode(const string &str) { - string dd; - size_t len = str.size(); - for (size_t i = 0; i < len; i++) { - if (isalnum((uint8_t)str[i])) { - char tempbuff[2]; - sprintf(tempbuff, "%c", str[i]); - dd.append(tempbuff); - } - else if (isspace((uint8_t)str[i])) { - dd.append("+"); - } - else { - char tempbuff[4]; - sprintf(tempbuff, "%%%X%X", (uint8_t)str[i] >> 4,(uint8_t)str[i] % 16); - dd.append(tempbuff); + string out; + size_t len = str.size(); + for (size_t i = 0; i < len; ++i) { + char ch = str[i]; + if (isalnum((uint8_t)ch)) { + out.push_back(ch); + }else { + char buf[4]; + sprintf(buf, "%%%X%X", (uint8_t)ch >> 4,(uint8_t)ch & 0x0F); + out.append(buf); } } - return dd; + return out; } string strCoding::UrlDecode(const string &str) { string output = ""; @@ -94,16 +89,18 @@ string strCoding::UrlDecode(const string &str) { int i = 0, len = str.length(); while (i < len) { if (str[i] == '%') { + if(i > len - 3){ + //防止内存溢出 + break; + } tmp[0] = str[i + 1]; tmp[1] = str[i + 2]; output += StrToBin(tmp); i = i + 3; - } - else if (str[i] == '+') { + } else if (str[i] == '+') { output += ' '; i++; - } - else { + } else { output += str[i]; i++; } diff --git a/src/Player/PlayerProxy.h b/src/Player/PlayerProxy.h index e4ed9807..7560e4c3 100644 --- a/src/Player/PlayerProxy.h +++ b/src/Player/PlayerProxy.h @@ -75,13 +75,16 @@ public: * @param strUrl */ void play(const string &strUrl) override; + + /** + * 获取观看总人数 + */ + int totalReaderCount() ; private: //MediaSourceEvent override bool close(MediaSource &sender,bool force) override; void onNoneReader(MediaSource &sender) override; int totalReaderCount(MediaSource &sender) override; - int totalReaderCount() ; - void rePlay(const string &strUrl,int iFailedCnt); void onPlaySuccess(); private: diff --git a/src/Record/MP4Muxer.cpp b/src/Record/MP4Muxer.cpp index 785fedbd..a8dd967a 100644 --- a/src/Record/MP4Muxer.cpp +++ b/src/Record/MP4Muxer.cpp @@ -258,7 +258,7 @@ void MP4Muxer::addTrack(const Track::Ptr &track) { } break; default: - WarnL << "MP4录制不支持该编码格式:" << track->getCodecId(); + WarnL << "MP4录制不支持该编码格式:" << track->getCodecName(); break; } } diff --git a/src/Rtp/RtpDecoder.cpp b/src/Rtp/RtpDecoder.cpp index af64f0d5..7bcd23e1 100644 --- a/src/Rtp/RtpDecoder.cpp +++ b/src/Rtp/RtpDecoder.cpp @@ -25,7 +25,6 @@ */ #if defined(ENABLE_RTPPROXY) -#include #include "Util/logger.h" #include "RtpDecoder.h" #include "rtp-payload.h" @@ -44,13 +43,7 @@ RtpDecoder::~RtpDecoder() { } } -void RtpDecoder::decodeRtp(const void *data, int bytes,const string &type_name) { - if(_rtp_type != type_name && _rtp_decoder){ - //rtp类型发生变化,切换之 - rtp_payload_decode_destroy(_rtp_decoder); - _rtp_decoder = nullptr; - } - +void RtpDecoder::decodeRtp(const void *data, int bytes) { if(!_rtp_decoder){ static rtp_payload_t s_func= { [](void* param, int bytes){ @@ -69,11 +62,9 @@ void RtpDecoder::decodeRtp(const void *data, int bytes,const string &type_name) uint8_t rtp_type = 0x7F & ((uint8_t *) data)[1]; InfoL << "rtp type:" << (int) rtp_type; - _rtp_decoder = rtp_payload_decode_create(rtp_type, type_name.data(), &s_func, this); + _rtp_decoder = rtp_payload_decode_create(rtp_type, "MP2P", &s_func, this); if (!_rtp_decoder) { WarnL << "unsupported rtp type:" << (int) rtp_type << ",size:" << bytes << ",hexdump" << hexdump(data, bytes > 16 ? 16 : bytes); - }else{ - _rtp_type = type_name; } } diff --git a/src/Rtp/RtpDecoder.h b/src/Rtp/RtpDecoder.h index a6d78a65..52a32022 100644 --- a/src/Rtp/RtpDecoder.h +++ b/src/Rtp/RtpDecoder.h @@ -38,12 +38,11 @@ public: RtpDecoder(); virtual ~RtpDecoder(); protected: - void decodeRtp(const void *data, int bytes,const string &type_name); + void decodeRtp(const void *data, int bytes); virtual void onRtpDecode(const uint8_t *packet, int bytes, uint32_t timestamp, int flags) = 0; private: void *_rtp_decoder = nullptr; BufferRaw::Ptr _buffer; - string _rtp_type; }; }//namespace mediakit diff --git a/src/Rtp/RtpProcess.cpp b/src/Rtp/RtpProcess.cpp index 32986f98..654ac363 100644 --- a/src/Rtp/RtpProcess.cpp +++ b/src/Rtp/RtpProcess.cpp @@ -33,8 +33,6 @@ namespace mediakit{ -static const vector kRtpTypes = {"MP2P","MP4V-ES"}; - /** * 合并一些时间戳相同的frame */ @@ -85,7 +83,6 @@ RtpProcess::RtpProcess(uint32_t ssrc) { _track->_samplerate = 90000; _track->_type = TrackVideo; _track->_ssrc = _ssrc; - getNextRtpType(); DebugL << printSSRC(_ssrc); GET_CONFIG(bool,toRtxp,General::kPublishToRtxp); @@ -155,12 +152,9 @@ bool RtpProcess::inputRtp(const char *data, int data_len,const struct sockaddr * return ret; } -void RtpProcess::getNextRtpType(){ - _rtp_type = kRtpTypes[_rtp_type_idx++]; - _rtp_dec_failed_cnt = 0; - if(_rtp_type_idx == kRtpTypes.size()){ - _rtp_type_idx = 0; - } +//判断是否为ts负载 +static inline bool checkTS(const uint8_t *packet, int bytes){ + return bytes % 188 == 0 && packet[0] == 0x47; } void RtpProcess::onRtpSorted(const RtpPacket::Ptr &rtp, int) { @@ -168,29 +162,29 @@ void RtpProcess::onRtpSorted(const RtpPacket::Ptr &rtp, int) { WarnL << rtp->sequence << " != " << _sequence << "+1"; } _sequence = rtp->sequence; - if(_save_file_rtp){ uint16_t size = rtp->size() - 4; size = htons(size); fwrite((uint8_t *) &size, 2, 1, _save_file_rtp.get()); fwrite((uint8_t *) rtp->data() + 4, rtp->size() - 4, 1, _save_file_rtp.get()); } - - decodeRtp(rtp->data() + 4 ,rtp->size() - 4,_rtp_type); + decodeRtp(rtp->data() + 4 ,rtp->size() - 4); } -void RtpProcess::onRtpDecode(const uint8_t *packet, int bytes, uint32_t, int flags) { +void RtpProcess::onRtpDecode(const uint8_t *packet, int bytes, uint32_t timestamp, int flags) { if(_save_file_ps){ fwrite((uint8_t *)packet,bytes, 1, _save_file_ps.get()); } if(!_decoder){ //创建解码器 - if(bytes % 188 == 0 || packet[0] == 0x47){ + if(checkTS(packet, bytes)){ //猜测是ts负载 + InfoL << "judged to be TS: " << printSSRC(_ssrc); _decoder = Decoder::createDecoder(Decoder::decoder_ts); }else{ //猜测是ps负载 + InfoL << "judged to be PS: " << printSSRC(_ssrc); _decoder = Decoder::createDecoder(Decoder::decoder_ps); } _decoder->setOnDecode([this](int stream,int codecid,int flags,int64_t pts,int64_t dts,const void *data,int bytes){ @@ -201,12 +195,6 @@ void RtpProcess::onRtpDecode(const uint8_t *packet, int bytes, uint32_t, int fla auto ret = _decoder->input((uint8_t *)packet,bytes); if(ret != bytes){ WarnL << ret << " != " << bytes << " " << flags; - if(++_rtp_dec_failed_cnt == 10){ - getNextRtpType(); - InfoL << "rtp of ssrc " << printSSRC(_ssrc) << " change to type: " << _rtp_type ; - } - } else{ - _rtp_dec_failed_cnt = 0; } } diff --git a/src/Rtp/RtpProcess.h b/src/Rtp/RtpProcess.h index e8295969..d4395ecf 100644 --- a/src/Rtp/RtpProcess.h +++ b/src/Rtp/RtpProcess.h @@ -49,7 +49,6 @@ public: bool alive(); string get_peer_ip(); uint16_t get_peer_port(); - int totalReaderCount(); void setListener(const std::weak_ptr &listener); protected: @@ -73,9 +72,6 @@ private: Ticker _last_rtp_time; map _stamps; uint32_t _dts = 0; - int _rtp_type_idx = 0; - string _rtp_type; - int _rtp_dec_failed_cnt = 0; Decoder::Ptr _decoder; };