Merge pull request #11 from xiongziliang/master

update
This commit is contained in:
baiyfcu 2020-03-16 18:05:15 +08:00 committed by GitHub
commit 935aea057a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 198 additions and 145 deletions

@ -1 +1 @@
Subproject commit 8d40dad3dbdce171756691d4511aca49fcf2a231
Subproject commit 737b8d852eeb1a36bc77854f327fbbef7cfb81be

View File

@ -67,7 +67,7 @@ git submodule update --init
- 支持http文件访问鉴权
- GB28181
- 支持UDP/TCP国标RTP推流可以转换成RTSP/RTMP/HLS等协议
- 支持UDP/TCP国标RTP(PS或TS)推流可以转换成RTSP/RTMP/HLS等协议
- 点播
- 支持录制为FLV/HLS/MP4

View File

@ -51,7 +51,7 @@
- Auto close stream when nobody played.
- Play and push authentication.
- Pull stream on Demand.
- Support TS / PS streaming push through RTP,and it can be converted to RTSP / RTMP / HLS / FLV.
- Protocol conversion:

View File

@ -109,7 +109,20 @@ API_EXPORT const char* API_CALL mk_media_source_get_stream(const mk_media_source
API_EXPORT int API_CALL mk_media_source_get_reader_count(const mk_media_source ctx);
//MediaSource::totalReaderCount()
API_EXPORT int API_CALL mk_media_source_get_total_reader_count(const mk_media_source ctx);
//MediaSource::close()
/**
* ZLMediaKit中被称作为MediaSource
* 3RtmpMediaSourceRtspMediaSourceHlsMediaSource
* :
* rtsp/rtmp/rtp推流mp4点播
* mk_media_create创建的对象(DevChannel)mk_proxy_player_create创建的对象(PlayerProxy)
* ZLMediaKit已经默认适配了MediaSource::close()
*
* mk_proxy_player_set_on_closemk_media_set_on_close函数可以设置回调,
* mk_media_source_close函数
* @param ctx
* @param force
* @return 01
*/
API_EXPORT int API_CALL mk_media_source_close(const mk_media_source ctx,int force);
//MediaSource::seekTo()
API_EXPORT int API_CALL mk_media_source_seek_to(const mk_media_source ctx,uint32_t stamp);

View File

@ -130,17 +130,18 @@ API_EXPORT void API_CALL mk_media_input_aac(mk_media ctx, void *data, int len, u
API_EXPORT void API_CALL mk_media_input_aac1(mk_media ctx, void *data, int len, uint32_t dts, void *adts);
/**
* MediaSource.close()
* mk_media_release函数销毁该对象
* MediaSource.close()
* MediaSource时
* mk_media_release函数并且释放其他资源
* mk_media_release函数MediaSource.close()
* @param user_data mk_media_set_on_close函数设置
* @return 00
*/
typedef int(API_CALL *on_mk_media_close)(void *user_data);
typedef void(API_CALL *on_mk_media_close)(void *user_data);
/**
* MediaSource.close()
* MediaSource时
*
* MediaSource时
* mk_media_release函数并且释放其他资源
* @param ctx
* @param cb
* @param user_data

View File

@ -68,6 +68,32 @@ API_EXPORT void API_CALL mk_proxy_player_set_option(mk_proxy_player ctx, const c
*/
API_EXPORT void API_CALL mk_proxy_player_play(mk_proxy_player ctx, const char *url);
/**
* MediaSource.close()
* MediaSource时
* mk_proxy_player_release函数并且释放其他资源
* mk_proxy_player_release函数MediaSource.close()
* @param user_data mk_proxy_player_set_on_close函数设置
*/
typedef void(API_CALL *on_mk_proxy_player_close)(void *user_data);
/**
* MediaSource.close()
* MediaSource时
* mk_proxy_player_release函数并且释放其他资源
* @param ctx
* @param cb
* @param user_data
*/
API_EXPORT void API_CALL mk_proxy_player_set_on_close(mk_proxy_player ctx, on_mk_proxy_player_close cb, void *user_data);
/**
*
* @param ctx
* @return
*/
API_EXPORT int API_CALL mk_proxy_player_total_reader_count(mk_proxy_player ctx);
#ifdef __cplusplus
}
#endif

View File

@ -62,14 +62,11 @@ protected:
}
if(!_cb){
//未设置回调,没法关闭
WarnL << "请使用mk_media_set_on_close函数设置回调函数!";
return false;
}
if(!_cb(_user_data)){
//回调选择返回不关闭该视频
return false;
}
//回调中已经关闭该视频
//请在回调中调用mk_media_release函数释放资源,否则MediaSource::close()操作不会生效
_cb(_user_data);
WarnL << "close media:" << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId() << " " << force;
return true;
}

View File

@ -61,3 +61,22 @@ API_EXPORT void API_CALL mk_proxy_player_play(mk_proxy_player ctx, const char *u
obj->play(url_str);
});
}
API_EXPORT void API_CALL mk_proxy_player_set_on_close(mk_proxy_player ctx, on_mk_proxy_player_close cb, void *user_data){
assert(ctx);
PlayerProxy::Ptr &obj = *((PlayerProxy::Ptr *) ctx);
obj->getPoller()->async([obj,cb,user_data](){
//切换线程再操作
obj->setOnClose([cb,user_data](){
if(cb){
cb(user_data);
}
});
});
}
API_EXPORT int API_CALL mk_proxy_player_total_reader_count(mk_proxy_player ctx){
assert(ctx);
PlayerProxy::Ptr &obj = *((PlayerProxy::Ptr *) ctx);
return obj->totalReaderCount();
}

View File

@ -137,6 +137,7 @@ void MediaSink::emitAllTrackReady() {
//移除未准备好的Track
for (auto it = _track_map.begin(); it != _track_map.end();) {
if (!it->second->ready()) {
WarnL << "该track长时间未被初始化,已忽略:" << it->second->getCodecName();
it = _track_map.erase(it);
continue;
}

View File

@ -340,6 +340,12 @@ void MediaInfo::parse(const string &url){
} else{
_host = _vhost = vhost;
}
if(_vhost == "localhost" || INADDR_NONE != inet_addr(_vhost.data())){
//如果访问的是localhost或ip那么则为默认虚拟主机
_vhost = DEFAULT_VHOST;
}
}
if(split_vec.size() > 1){
_app = split_vec[1];
@ -366,7 +372,8 @@ void MediaInfo::parse(const string &url){
}
GET_CONFIG(bool,enableVhost,General::kEnableVhost);
if(!enableVhost || _vhost.empty() || _vhost == "localhost" || INADDR_NONE != inet_addr(_vhost.data())){
if(!enableVhost || _vhost.empty()){
//如果关闭虚拟主机或者虚拟主机为空,则设置虚拟主机为默认
_vhost = DEFAULT_VHOST;
}
}

View File

@ -33,6 +33,10 @@
#include "Record/HlsMediaSource.h"
#include "Record/HlsRecorder.h"
/**
* 使使setListener方法来绑定MediaSource相关的事件
* MediaSource(rtsp/rtmp/hls)
*/
class MultiMediaSourceMuxer : public MediaSink , public std::enable_shared_from_this<MultiMediaSourceMuxer>{
public:
class Listener{

View File

@ -24,6 +24,7 @@
* SOFTWARE.
*/
#include "AACRtp.h"
#define ADTS_HEADER_LEN 7
namespace mediakit{
@ -91,8 +92,8 @@ AACRtpDecoder::AACRtpDecoder() {
AACFrame::Ptr AACRtpDecoder::obtainFrame() {
//从缓存池重新申请对象,防止覆盖已经写入环形缓存的对象
auto frame = ResourcePoolHelper<AACFrame>::obtainObj();
frame->aac_frame_length = 7;
frame->iPrefixSize = 7;
frame->aac_frame_length = ADTS_HEADER_LEN;
frame->iPrefixSize = ADTS_HEADER_LEN;
if(frame->syncword == 0 && !_aac_cfg.empty()) {
makeAdtsHeader(_aac_cfg,*frame);
}
@ -100,70 +101,48 @@ AACFrame::Ptr AACRtpDecoder::obtainFrame() {
}
bool AACRtpDecoder::inputRtp(const RtpPacket::Ptr &rtppack, bool key_pos) {
// 获取rtp数据长度
int length = rtppack->size() - rtppack->offset;
//rtp数据开始部分
uint8_t *ptr = (uint8_t *) rtppack->data() + rtppack->offset;
//rtp数据末尾
const uint8_t *end = (uint8_t *) rtppack->data() + rtppack->size();
// 获取rtp数据
const uint8_t *rtp_packet_buf = (uint8_t *)rtppack->data() + rtppack->offset;
do
{
// 查询头部的偏移每次2字节
uint32_t au_header_offset = 0;
//首2字节表示Au-Header的长度单位bit所以除以16得到Au-Header字节数
const uint16_t au_header_length = (((rtp_packet_buf[au_header_offset] << 8) | rtp_packet_buf[au_header_offset + 1]) >> 4);
au_header_offset += 2;
//assert(length > (2 + au_header_length * 2));
if (length < (2 + au_header_length * 2))
break;
// 存放每一个aac帧长度
std::vector<uint32_t > vec_aac_len;
for (int i = 0; i < au_header_length; ++i)
{
// 之后的2字节是AU_HEADER
const uint16_t au_header = ((rtp_packet_buf[au_header_offset] << 8) | rtp_packet_buf[au_header_offset + 1]);
// 其中高13位表示一帧AAC负载的字节长度低3位无用
uint32_t nAac = (au_header >> 3);
vec_aac_len.push_back(nAac);
au_header_offset += 2;
}
//首2字节表示Au-Header的个数单位bit所以除以16得到Au-Header个数
const uint16_t au_header_count = ((ptr[0] << 8) | ptr[1]) >> 4;
//忽略Au-Header区
ptr += 2 + au_header_count * 2;
// 真正aac负载开始处
const uint8_t *rtp_packet_payload = rtp_packet_buf + au_header_offset;
// 载荷查找
uint32_t next_aac_payload_offset = 0;
for (int j = 0; j < au_header_length; ++j)
{
// 当前aac包长度
const uint32_t cur_aac_payload_len = vec_aac_len.at(j);
static const uint32_t max_size = sizeof(AACFrame::buffer) - ADTS_HEADER_LEN;
while (ptr < end) {
auto size = (uint32_t) (end - ptr);
if(size > max_size){
size = max_size;
}
if (_adts->aac_frame_length + size > sizeof(AACFrame::buffer)) {
//数据太多了,先清空
flushData();
}
//追加aac数据
memcpy(_adts->buffer + _adts->aac_frame_length, ptr, size);
_adts->aac_frame_length += size;
_adts->timeStamp = rtppack->timeStamp;
ptr += size;
}
if (_adts->aac_frame_length + cur_aac_payload_len > sizeof(AACFrame::buffer)) {
_adts->aac_frame_length = 7;
WarnL << "aac负载数据太长";
return false;
}
// 提取每一包aac载荷数据
memcpy(_adts->buffer + _adts->aac_frame_length, rtp_packet_payload + next_aac_payload_offset, cur_aac_payload_len);
_adts->aac_frame_length += (cur_aac_payload_len);
if (rtppack->mark == true) {
_adts->timeStamp = rtppack->timeStamp;
writeAdtsHeader(*_adts, _adts->buffer);
onGetAAC(_adts);
}
next_aac_payload_offset += cur_aac_payload_len;
}
} while (0);
if (rtppack->mark) {
//最后一个rtp分片
flushData();
}
return false;
}
void AACRtpDecoder::onGetAAC(const AACFrame::Ptr &frame) {
//写入环形缓存
RtpCodec::inputFrame(frame);
void AACRtpDecoder::flushData() {
if(_adts->aac_frame_length == ADTS_HEADER_LEN){
//没有有效数据
return;
}
writeAdtsHeader(*_adts, _adts->buffer);
RtpCodec::inputFrame(_adts);
_adts = obtainFrame();
}

View File

@ -56,8 +56,8 @@ public:
protected:
AACRtpDecoder();
private:
void onGetAAC(const AACFrame::Ptr &frame);
AACFrame::Ptr obtainFrame();
void flushData();
private:
AACFrame::Ptr _adts;
string _aac_cfg;

View File

@ -155,7 +155,7 @@ RtpCodec::Ptr Factory::getRtpDecoderByTrack(const Track::Ptr &track) {
case CodecAAC:
return std::make_shared<AACRtpDecoder>(track->clone());
default:
WarnL << "暂不支持该CodecId:" << track->getCodecId();
WarnL << "暂不支持该CodecId:" << track->getCodecName();
return nullptr;
}
}
@ -212,7 +212,7 @@ RtmpCodec::Ptr Factory::getRtmpCodecByTrack(const Track::Ptr &track) {
case CodecAAC:
return std::make_shared<AACRtmpEncoder>(track);
default:
WarnL << "暂不支持该CodecId:" << track->getCodecId();
WarnL << "暂不支持该CodecId:" << track->getCodecName();
return nullptr;
}
}

View File

@ -38,5 +38,16 @@ Frame::Ptr Frame::getCacheAbleFrame(const Frame::Ptr &frame){
return std::make_shared<FrameCacheAble>(frame);
}
#define SWITCH_CASE(codec_id) case codec_id : return #codec_id
const char *CodecInfo::getCodecName() {
switch (getCodecId()) {
SWITCH_CASE(CodecH264);
SWITCH_CASE(CodecH265);
SWITCH_CASE(CodecAAC);
default:
return "unknown codec";
}
}
}//namespace mediakit

View File

@ -72,6 +72,12 @@ public:
*
*/
virtual CodecId getCodecId() const = 0;
/**
*
* @return
*/
const char *getCodecName();
};
/**

View File

@ -53,20 +53,29 @@ HttpSession::~HttpSession() {
TraceP(this);
}
void HttpSession::Handle_Req_HEAD(int64_t &content_len){
//暂时全部返回200 OK因为HTTP GET存在按需生成流的操作所以不能按照HTTP GET的流程返回
//如果直接返回404那么又会导致按需生成流的逻辑失效所以HTTP HEAD在静态文件或者已存在资源时才有效
//对于按需生成流的直播场景并不适用
sendResponse("200 OK", true);
}
int64_t HttpSession::onRecvHeader(const char *header,uint64_t len) {
typedef void (HttpSession::*HttpCMDHandle)(int64_t &);
static unordered_map<string, HttpCMDHandle> s_func_map;
static onceToken token([]() {
s_func_map.emplace("GET",&HttpSession::Handle_Req_GET);
s_func_map.emplace("POST",&HttpSession::Handle_Req_POST);
}, nullptr);
s_func_map.emplace("HEAD",&HttpSession::Handle_Req_HEAD);
}, nullptr);
_parser.Parse(header);
urlDecode(_parser);
string cmd = _parser.Method();
auto it = s_func_map.find(cmd);
if (it == s_func_map.end()) {
sendResponse("403 Forbidden", true);
WarnL << "不支持该命令:" << cmd;
sendResponse("405 Not Allowed", true);
return 0;
}
@ -256,8 +265,11 @@ bool HttpSession::checkLiveFlvStream(const function<void()> &cb){
return true;
}
void HttpSession::Handle_Req_GET(int64_t &content_len) {
Handle_Req_GET_l(content_len, true);
}
void HttpSession::Handle_Req_GET_l(int64_t &content_len, bool sendBody) {
//先看看是否为WebSocket请求
if(checkWebSocket()){
content_len = -1;

View File

@ -107,8 +107,11 @@ protected:
void onWebSocketEncodeData(const Buffer::Ptr &buffer) override;
private:
void Handle_Req_GET(int64_t &content_len);
void Handle_Req_POST(int64_t &content_len);
bool checkLiveFlvStream(const function<void()> &cb = nullptr);
void Handle_Req_GET_l(int64_t &content_len, bool sendBody);
void Handle_Req_POST(int64_t &content_len);
void Handle_Req_HEAD(int64_t &content_len);
bool checkLiveFlvStream(const function<void()> &cb = nullptr);
bool checkWebSocket();
bool emitHttpEvent(bool doInvoke);
void urlDecode(Parser &parser);

View File

@ -69,24 +69,19 @@ char StrToBin(const char *str)
}
string strCoding::UrlEncode(const string &str) {
string dd;
size_t len = str.size();
for (size_t i = 0; i < len; i++) {
if (isalnum((uint8_t)str[i])) {
char tempbuff[2];
sprintf(tempbuff, "%c", str[i]);
dd.append(tempbuff);
}
else if (isspace((uint8_t)str[i])) {
dd.append("+");
}
else {
char tempbuff[4];
sprintf(tempbuff, "%%%X%X", (uint8_t)str[i] >> 4,(uint8_t)str[i] % 16);
dd.append(tempbuff);
string out;
size_t len = str.size();
for (size_t i = 0; i < len; ++i) {
char ch = str[i];
if (isalnum((uint8_t)ch)) {
out.push_back(ch);
}else {
char buf[4];
sprintf(buf, "%%%X%X", (uint8_t)ch >> 4,(uint8_t)ch & 0x0F);
out.append(buf);
}
}
return dd;
return out;
}
string strCoding::UrlDecode(const string &str) {
string output = "";
@ -94,16 +89,18 @@ string strCoding::UrlDecode(const string &str) {
int i = 0, len = str.length();
while (i < len) {
if (str[i] == '%') {
if(i > len - 3){
//防止内存溢出
break;
}
tmp[0] = str[i + 1];
tmp[1] = str[i + 2];
output += StrToBin(tmp);
i = i + 3;
}
else if (str[i] == '+') {
} else if (str[i] == '+') {
output += ' ';
i++;
}
else {
} else {
output += str[i];
i++;
}

View File

@ -75,13 +75,16 @@ public:
* @param strUrl
*/
void play(const string &strUrl) override;
/**
*
*/
int totalReaderCount() ;
private:
//MediaSourceEvent override
bool close(MediaSource &sender,bool force) override;
void onNoneReader(MediaSource &sender) override;
int totalReaderCount(MediaSource &sender) override;
int totalReaderCount() ;
void rePlay(const string &strUrl,int iFailedCnt);
void onPlaySuccess();
private:

View File

@ -258,7 +258,7 @@ void MP4Muxer::addTrack(const Track::Ptr &track) {
}
break;
default:
WarnL << "MP4录制不支持该编码格式:" << track->getCodecId();
WarnL << "MP4录制不支持该编码格式:" << track->getCodecName();
break;
}
}

View File

@ -25,7 +25,6 @@
*/
#if defined(ENABLE_RTPPROXY)
#include <assert.h>
#include "Util/logger.h"
#include "RtpDecoder.h"
#include "rtp-payload.h"
@ -44,13 +43,7 @@ RtpDecoder::~RtpDecoder() {
}
}
void RtpDecoder::decodeRtp(const void *data, int bytes,const string &type_name) {
if(_rtp_type != type_name && _rtp_decoder){
//rtp类型发生变化切换之
rtp_payload_decode_destroy(_rtp_decoder);
_rtp_decoder = nullptr;
}
void RtpDecoder::decodeRtp(const void *data, int bytes) {
if(!_rtp_decoder){
static rtp_payload_t s_func= {
[](void* param, int bytes){
@ -69,11 +62,9 @@ void RtpDecoder::decodeRtp(const void *data, int bytes,const string &type_name)
uint8_t rtp_type = 0x7F & ((uint8_t *) data)[1];
InfoL << "rtp type:" << (int) rtp_type;
_rtp_decoder = rtp_payload_decode_create(rtp_type, type_name.data(), &s_func, this);
_rtp_decoder = rtp_payload_decode_create(rtp_type, "MP2P", &s_func, this);
if (!_rtp_decoder) {
WarnL << "unsupported rtp type:" << (int) rtp_type << ",size:" << bytes << ",hexdump" << hexdump(data, bytes > 16 ? 16 : bytes);
}else{
_rtp_type = type_name;
}
}

View File

@ -38,12 +38,11 @@ public:
RtpDecoder();
virtual ~RtpDecoder();
protected:
void decodeRtp(const void *data, int bytes,const string &type_name);
void decodeRtp(const void *data, int bytes);
virtual void onRtpDecode(const uint8_t *packet, int bytes, uint32_t timestamp, int flags) = 0;
private:
void *_rtp_decoder = nullptr;
BufferRaw::Ptr _buffer;
string _rtp_type;
};
}//namespace mediakit

View File

@ -33,8 +33,6 @@
namespace mediakit{
static const vector<string> kRtpTypes = {"MP2P","MP4V-ES"};
/**
* frame
*/
@ -85,7 +83,6 @@ RtpProcess::RtpProcess(uint32_t ssrc) {
_track->_samplerate = 90000;
_track->_type = TrackVideo;
_track->_ssrc = _ssrc;
getNextRtpType();
DebugL << printSSRC(_ssrc);
GET_CONFIG(bool,toRtxp,General::kPublishToRtxp);
@ -155,12 +152,9 @@ bool RtpProcess::inputRtp(const char *data, int data_len,const struct sockaddr *
return ret;
}
void RtpProcess::getNextRtpType(){
_rtp_type = kRtpTypes[_rtp_type_idx++];
_rtp_dec_failed_cnt = 0;
if(_rtp_type_idx == kRtpTypes.size()){
_rtp_type_idx = 0;
}
//判断是否为ts负载
static inline bool checkTS(const uint8_t *packet, int bytes){
return bytes % 188 == 0 && packet[0] == 0x47;
}
void RtpProcess::onRtpSorted(const RtpPacket::Ptr &rtp, int) {
@ -168,29 +162,29 @@ void RtpProcess::onRtpSorted(const RtpPacket::Ptr &rtp, int) {
WarnL << rtp->sequence << " != " << _sequence << "+1";
}
_sequence = rtp->sequence;
if(_save_file_rtp){
uint16_t size = rtp->size() - 4;
size = htons(size);
fwrite((uint8_t *) &size, 2, 1, _save_file_rtp.get());
fwrite((uint8_t *) rtp->data() + 4, rtp->size() - 4, 1, _save_file_rtp.get());
}
decodeRtp(rtp->data() + 4 ,rtp->size() - 4,_rtp_type);
decodeRtp(rtp->data() + 4 ,rtp->size() - 4);
}
void RtpProcess::onRtpDecode(const uint8_t *packet, int bytes, uint32_t, int flags) {
void RtpProcess::onRtpDecode(const uint8_t *packet, int bytes, uint32_t timestamp, int flags) {
if(_save_file_ps){
fwrite((uint8_t *)packet,bytes, 1, _save_file_ps.get());
}
if(!_decoder){
//创建解码器
if(bytes % 188 == 0 || packet[0] == 0x47){
if(checkTS(packet, bytes)){
//猜测是ts负载
InfoL << "judged to be TS: " << printSSRC(_ssrc);
_decoder = Decoder::createDecoder(Decoder::decoder_ts);
}else{
//猜测是ps负载
InfoL << "judged to be PS: " << printSSRC(_ssrc);
_decoder = Decoder::createDecoder(Decoder::decoder_ps);
}
_decoder->setOnDecode([this](int stream,int codecid,int flags,int64_t pts,int64_t dts,const void *data,int bytes){
@ -201,12 +195,6 @@ void RtpProcess::onRtpDecode(const uint8_t *packet, int bytes, uint32_t, int fla
auto ret = _decoder->input((uint8_t *)packet,bytes);
if(ret != bytes){
WarnL << ret << " != " << bytes << " " << flags;
if(++_rtp_dec_failed_cnt == 10){
getNextRtpType();
InfoL << "rtp of ssrc " << printSSRC(_ssrc) << " change to type: " << _rtp_type ;
}
} else{
_rtp_dec_failed_cnt = 0;
}
}

View File

@ -49,7 +49,6 @@ public:
bool alive();
string get_peer_ip();
uint16_t get_peer_port();
int totalReaderCount();
void setListener(const std::weak_ptr<MediaSourceEvent> &listener);
protected:
@ -73,9 +72,6 @@ private:
Ticker _last_rtp_time;
map<int,Stamp> _stamps;
uint32_t _dts = 0;
int _rtp_type_idx = 0;
string _rtp_type;
int _rtp_dec_failed_cnt = 0;
Decoder::Ptr _decoder;
};