Support multiple audio/video tracks

夏楚 2023-12-09 22:34:22 +08:00 committed by GitHub
parent bbe8f4a018
commit 64f15202de
36 changed files with 366 additions and 278 deletions


@ -36,6 +36,7 @@
- Ultimate performance and experience, [exclusive features](https://github.com/ZLMediaKit/ZLMediaKit/wiki/ZLMediakit%E7%8B%AC%E5%AE%B6%E7%89%B9%E6%80%A7%E4%BB%8B%E7%BB%8D)
- [Who is using ZLMediaKit?](https://github.com/ZLMediaKit/ZLMediaKit/issues/511)
- Full IPv6 network support
- Multi-track mode support (multiple video/audio tracks in one stream)
## Project positioning
@ -76,16 +77,19 @@
- HLS playback can be tracked via cookies and treated as a long connection, enabling on-demand HLS pulling, playback statistics, and similar features
- HLS player support: pulled HLS streams can be converted to rtsp/rtmp/mp4
- Supports H264/H265/AAC/G711/OPUS codecs
- Multi-track mode support
- TS
- Supports http[s]-ts live streaming
- Supports ws[s]-ts live streaming
- Supports H264/H265/AAC/G711/OPUS codecs
- Multi-track mode support
- fMP4
- Supports http[s]-fmp4 live streaming
- Supports ws[s]-fmp4 live streaming
- Supports H264/H265/AAC/G711/OPUS/MJPEG codecs
- Multi-track mode support
- HTTP[S] and WebSocket
- The server supports `directory index generation`, `file download`, and `form submission requests`
@ -104,11 +108,13 @@
- Supports ES/PS RTP forwarding
- Supports GB28181 active stream pulling mode
- Supports two-way voice intercom
- Multi-track mode support
- MP4 VOD and recording
- Supports recording to FLV/HLS/MP4
- MP4 file VOD over RTSP/RTMP/HTTP-FLV/WS-FLV, with seek support
- Supports H264/H265/AAC/G711/OPUS codecs
- Multi-track mode support
- WebRTC
- Supports WebRTC publishing, with conversion to other protocols


@ -1818,6 +1818,8 @@ void installWebApi() {
CHECK_ARGS("vhost", "app", "stream", "file_path");
ProtocolOption option;
// mp4 supports multiple tracks
option.max_track = 16;
// by default, demuxing an mp4 does not produce a new mp4 recording
option.enable_mp4 = false;
// but if the parameters explicitly enable mp4 recording, allow it


@ -12,6 +12,8 @@
#include "Common/config.h"
#include "Extension/Factory.h"
#define MUTE_AUDIO_INDEX 0xFFFF
using namespace std;
namespace mediakit{
@ -33,28 +35,30 @@ bool MediaSink::addTrack(const Track::Ptr &track_in) {
WarnL << "All track is ready, add track too late: " << track_in->getCodecName();
return false;
}
// Clone the track: only its data is copied, not its data-forwarding relationships
auto track = track_in->clone();
auto track_type = track->getTrackType();
_track_map[track_type] = std::make_pair(track, false);
_track_ready_callback[track_type] = [this, track]() {
onTrackReady(track);
};
auto index = track->getIndex();
if (!_track_map.emplace(index, std::make_pair(track, false)).second) {
WarnL << "Already add a same track: " << track->getIndex() << ", codec: " << track->getCodecName();
return false;
}
_ticker.resetTime();
_audio_add = track->getTrackType() == TrackAudio ? true : _audio_add;
_track_ready_callback[index] = [this, track]() { onTrackReady(track); };
track->addDelegate([this](const Frame::Ptr &frame) {
if (_all_track_ready) {
return onTrackFrame(frame);
}
auto &frame_unread = _frame_unread[frame->getTrackType()];
auto &frame_unread = _frame_unread[frame->getIndex()];
GET_CONFIG(uint32_t, kMaxUnreadyFrame, General::kUnreadyFrameCache);
if (frame_unread.size() > kMaxUnreadyFrame) {
// An unready track must not cache too many frames, otherwise memory may overflow
frame_unread.clear();
WarnL << "Cached frame of unready track(" << frame->getCodecName() << ") is too much, now cleared";
}
// Some tracks are still not ready, cache the frame for now
frame_unread.emplace_back(Frame::getCacheAbleFrame(frame));
return true;
});
@ -62,36 +66,37 @@ bool MediaSink::addTrack(const Track::Ptr &track_in) {
}
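The hunk above replaces the old type-keyed `_track_map[track_type]` with a map keyed by `track->getIndex()`, so any number of tracks can coexist and a duplicate index is rejected. A minimal, self-contained sketch of that registry pattern; `DemoTrack`/`DemoSink` are placeholder types invented for illustration, not the real `Track`/`MediaSink` API:

```cpp
#include <iostream>
#include <list>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>

// Placeholder track: only the fields needed to illustrate the index-keyed registry.
struct DemoTrack {
    int index;          // unique per-track index (e.g. mp4 track id or rtp payload type)
    std::string codec;
};

class DemoSink {
public:
    // Returns false when a track with the same index was already added.
    bool addTrack(const std::shared_ptr<DemoTrack> &track) {
        if (!_tracks.emplace(track->index, std::make_pair(track, false)).second) {
            std::cerr << "Already added a track with index " << track->index << "\n";
            return false;
        }
        return true;
    }

    // Frames are routed by the same index; frames of unknown tracks are dropped.
    bool inputFrame(int index, const std::string &frame) {
        auto it = _tracks.find(index);
        if (it == _tracks.end()) {
            return false;
        }
        it->second.second = true;      // mark "got frame"
        _frames[index].push_back(frame);
        return true;
    }

private:
    // index -> (track, got-frame flag), mirroring _track_map in the diff above
    std::unordered_map<int, std::pair<std::shared_ptr<DemoTrack>, bool>> _tracks;
    std::unordered_map<int, std::list<std::string>> _frames;
};

int main() {
    DemoSink sink;
    sink.addTrack(std::make_shared<DemoTrack>(DemoTrack{0, "H264"}));
    sink.addTrack(std::make_shared<DemoTrack>(DemoTrack{1, "AAC"}));
    sink.addTrack(std::make_shared<DemoTrack>(DemoTrack{2, "AAC"}));   // a second audio track is allowed now
    sink.inputFrame(2, "aac-frame");
    return 0;
}
```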
void MediaSink::resetTracks() {
_all_track_ready = false;
_audio_add = false;
_have_video = false;
_track_map.clear();
_track_ready_callback.clear();
_all_track_ready = false;
_mute_audio_maker = nullptr;
_ticker.resetTime();
_max_track_size = 2;
_track_map.clear();
_frame_unread.clear();
_track_ready_callback.clear();
}
bool MediaSink::inputFrame(const Frame::Ptr &frame) {
auto it = _track_map.find(frame->getTrackType());
auto it = _track_map.find(frame->getIndex());
if (it == _track_map.end()) {
return false;
}
//got frame
// got frame
it->second.second = true;
auto ret = it->second.first->inputFrame(frame);
if (_mute_audio_maker && frame->getTrackType() == TrackVideo) {
// video frames drive the generation of mute audio frames
_mute_audio_maker->inputFrame(frame);
}
checkTrackIfReady();
return ret;
}
void MediaSink::checkTrackIfReady(){
void MediaSink::checkTrackIfReady() {
if (!_all_track_ready && !_track_ready_callback.empty()) {
for (auto &pr : _track_map) {
if (pr.second.second && pr.second.first->ready()) {
// When a track transitions from unready to ready, trigger its onTrackReady callback
auto it = _track_ready_callback.find(pr.first);
if (it != _track_ready_callback.end()) {
it->second();
@ -101,28 +106,34 @@ void MediaSink::checkTrackIfReady(){
}
}
if(!_all_track_ready){
if (!_all_track_ready) {
GET_CONFIG(uint32_t, kMaxWaitReadyMS, General::kWaitTrackReadyMS);
if (_ticker.elapsedTime() > kMaxWaitReadyMS) {
// If the timeout has been exceeded, stop waiting and ignore the tracks that are still not ready
emitAllTrackReady();
return;
}
if (!_track_ready_callback.empty()) {
// Within the timeout window, keep waiting while some tracks are still not ready
return;
}
if (_only_audio && _audio_add) {
// audio-only mode is enabled
emitAllTrackReady();
return;
}
if (_track_map.size() == _max_track_size) {
// All expected tracks have been added and none is still unready, so all tracks are ready
emitAllTrackReady();
return;
}
GET_CONFIG(uint32_t, kMaxAddTrackMS, General::kWaitAddTrackMS);
if (_track_map.size() == 1 && _ticker.elapsedTime() > kMaxAddTrackMS) {
// If only one track has been added, wait at most this long after it was added (more tracks may still arrive)
emitAllTrackReady();
return;
}
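The readiness logic above boils down to a small decision: give up after `General::kWaitTrackReadyMS`, short-circuit for audio-only sinks, fire once the configured maximum track count is reached, and otherwise give a lone track a grace period of `General::kWaitAddTrackMS`. Below is a standalone sketch of that decision as a pure function; the struct, names, and threshold values are assumptions for illustration only:

```cpp
#include <cstdint>
#include <cstdio>

// Inputs mirroring the state checked by MediaSink::checkTrackIfReady() above.
struct ReadyState {
    bool only_audio;          // setOnlyAudio() was called
    bool audio_added;         // at least one audio track was added
    size_t tracks_added;      // total tracks added so far
    size_t tracks_unready;    // tracks whose ready-callback has not fired yet
    size_t max_track_size;    // configured via setMaxTrackCount()
    uint64_t elapsed_ms;      // time since the last track was added
};

// Hypothetical thresholds standing in for General::kWaitTrackReadyMS / kWaitAddTrackMS.
static const uint64_t kMaxWaitReadyMS = 10000;
static const uint64_t kMaxAddTrackMS = 3000;

// Returns true when "all tracks ready" should be emitted (possibly ignoring unready tracks).
bool shouldEmitAllTrackReady(const ReadyState &s) {
    if (s.elapsed_ms > kMaxWaitReadyMS) {
        return true;                      // waited too long: give up on unready tracks
    }
    if (s.tracks_unready != 0) {
        return false;                     // still within the timeout, keep waiting
    }
    if (s.only_audio && s.audio_added) {
        return true;                      // audio-only stream: one ready audio track is enough
    }
    if (s.tracks_added == s.max_track_size) {
        return true;                      // every expected track has been added and is ready
    }
    // a single ready track: wait a little longer in case more tracks show up
    return s.tracks_added == 1 && s.elapsed_ms > kMaxAddTrackMS;
}

int main() {
    ReadyState s{false, true, 2, 0, 2, 500};
    std::printf("%d\n", shouldEmitAllTrackReady(s));   // prints 1
    return 0;
}
```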
@ -138,7 +149,7 @@ void MediaSink::setMaxTrackCount(size_t i) {
WarnL << "All track is ready, set max track count ignored";
return;
}
_max_track_size = MAX(MIN(i, 2), 1);
_max_track_size = MAX(i, 1);
checkTrackIfReady();
}
@ -149,9 +160,9 @@ void MediaSink::emitAllTrackReady() {
DebugL << "All track ready use " << _ticker.elapsedTime() << "ms";
if (!_track_ready_callback.empty()) {
// Timed out: forcibly ignore the tracks that are still not ready
_track_ready_callback.clear();
// Remove the tracks that are not ready
for (auto it = _track_map.begin(); it != _track_map.end();) {
if (!it->second.second || !it->second.first->ready()) {
WarnL << "Track not ready for a long time, ignored: " << it->second.first->getCodecName();
@ -163,25 +174,23 @@ void MediaSink::emitAllTrackReady() {
}
if (!_track_map.empty()) {
// At least one valid track remains
onAllTrackReady_l();
// All tracks are ready; flush the previously cached frames in one go
for (auto &pr : _frame_unread) {
if (_track_map.find(pr.first) == _track_map.end()) {
// This track has been removed
continue;
}
pr.second.for_each([&](const Frame::Ptr &frame) {
MediaSink::inputFrame(frame);
});
pr.second.for_each([&](const Frame::Ptr &frame) { MediaSink::inputFrame(frame); });
}
_frame_unread.clear();
}
}
void MediaSink::onAllTrackReady_l() {
// Whether to add a mute audio track
if (_add_mute_audio) {
addMuteAudioTrack();
}
@ -190,10 +199,10 @@ void MediaSink::onAllTrackReady_l() {
_have_video = (bool)getTrack(TrackVideo);
}
vector<Track::Ptr> MediaSink::getTracks(bool ready) const{
vector<Track::Ptr> MediaSink::getTracks(bool ready) const {
vector<Track::Ptr> ret;
for (auto &pr : _track_map){
if(ready && !pr.second.first->ready()){
for (auto &pr : _track_map) {
if (ready && !pr.second.first->ready()) {
continue;
}
ret.emplace_back(pr.second.first);
@ -230,14 +239,20 @@ static uint8_t s_mute_adts[] = {0xff, 0xf1, 0x6c, 0x40, 0x2d, 0x3f, 0xfc, 0x00,
static uint8_t ADTS_CONFIG[2] = { 0x15, 0x88 };
bool MuteAudioMaker::inputFrame(const Frame::Ptr &frame) {
if (frame->getTrackType() == TrackVideo) {
auto audio_idx = frame->dts() / MUTE_ADTS_DATA_MS;
if (_audio_idx != audio_idx) {
_audio_idx = audio_idx;
auto aacFrame = std::make_shared<FrameToCache<FrameFromPtr>>(CodecAAC, (char *) MUTE_ADTS_DATA, sizeof(s_mute_adts),
_audio_idx * MUTE_ADTS_DATA_MS, 0, 7);
return FrameDispatcher::inputFrame(aacFrame);
}
if (_track_index == -1) {
// lock onto this track
_track_index = frame->getIndex();
}
if (frame->getIndex() != _track_index) {
// not the locked track
return false;
}
auto audio_idx = frame->dts() / MUTE_ADTS_DATA_MS;
if (_audio_idx != audio_idx) {
_audio_idx = audio_idx;
auto aacFrame = std::make_shared<FrameToCache<FrameFromPtr>>(CodecAAC, (char *)MUTE_ADTS_DATA, sizeof(s_mute_adts), _audio_idx * MUTE_ADTS_DATA_MS, 0, 7);
aacFrame->setIndex(MUTE_AUDIO_INDEX);
return FrameDispatcher::inputFrame(aacFrame);
}
return false;
}
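`MuteAudioMaker` now locks onto the index of the first video track it sees, ignores frames from any other track, and emits one silent AAC frame per `MUTE_ADTS_DATA_MS` interval tagged with the reserved `MUTE_AUDIO_INDEX`. A rough sketch of that lock-and-generate pattern with placeholder types; the interval value below is an assumption, not the real constant:

```cpp
#include <cstdint>

// Placeholder frame: index identifies the source track, dts is in milliseconds.
struct DemoFrame {
    int index;
    uint64_t dts;
};

static const int kMuteAudioIndex = 0xFFFF;   // mirrors MUTE_AUDIO_INDEX above
static const uint64_t kMuteFrameMS = 128;    // stands in for MUTE_ADTS_DATA_MS (assumed value)

class DemoMuteAudioMaker {
public:
    // Feed video frames; returns true and fills silence_out when a new interval starts.
    // Frames from tracks other than the first one seen are ignored.
    bool inputFrame(const DemoFrame &video, DemoFrame &silence_out) {
        if (_locked_index == -1) {
            _locked_index = video.index;    // lock onto the first video track
        }
        if (video.index != _locked_index) {
            return false;                   // not the locked track
        }
        auto slot = video.dts / kMuteFrameMS;
        if (slot == _last_slot) {
            return false;                   // still inside the current interval
        }
        _last_slot = slot;
        silence_out = DemoFrame{kMuteAudioIndex, slot * kMuteFrameMS};
        return true;
    }

private:
    int _locked_index = -1;
    uint64_t _last_slot = UINT64_MAX;
};

int main() {
    DemoMuteAudioMaker maker;
    DemoFrame out{0, 0};
    maker.inputFrame(DemoFrame{5, 0}, out);                   // locks onto track 5, emits a silent frame
    bool emitted = maker.inputFrame(DemoFrame{7, 10}, out);   // other track: ignored
    (void)emitted;
    return 0;
}
```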
@ -246,19 +261,18 @@ bool MediaSink::addMuteAudioTrack() {
if (!_enable_audio) {
return false;
}
if (_track_map.find(TrackAudio) != _track_map.end()) {
return false;
for (auto &pr : _track_map) {
if (pr.second.first->getTrackType() == TrackAudio) {
return false;
}
}
auto audio = Factory::getTrackByCodecId(CodecAAC);
audio->setIndex(MUTE_AUDIO_INDEX);
audio->setExtraData(ADTS_CONFIG, 2);
_track_map[audio->getTrackType()] = std::make_pair(audio, true);
audio->addDelegate([this](const Frame::Ptr &frame) {
return onTrackFrame(frame);
});
_track_map[MUTE_AUDIO_INDEX] = std::make_pair(audio, true);
audio->addDelegate([this](const Frame::Ptr &frame) { return onTrackFrame(frame); });
_mute_audio_maker = std::make_shared<MuteAudioMaker>();
_mute_audio_maker->addDelegate([audio](const Frame::Ptr &frame) {
return audio->inputFrame(frame);
});
_mute_audio_maker->addDelegate([audio](const Frame::Ptr &frame) { return audio->inputFrame(frame); });
onTrackReady(audio);
TraceL << "Mute aac track added";
return true;
@ -270,14 +284,12 @@ bool MediaSink::isAllTrackReady() const {
void MediaSink::enableAudio(bool flag) {
_enable_audio = flag;
_max_track_size = flag ? 2 : 1;
}
void MediaSink::setOnlyAudio(){
void MediaSink::setOnlyAudio() {
_only_audio = true;
_enable_audio = true;
_add_mute_audio = false;
_max_track_size = 1;
}
void MediaSink::enableMuteAudio(bool flag) {
@ -332,9 +344,7 @@ bool Demuxer::addTrack(const Track::Ptr &track) {
}
if (_sink->addTrack(track)) {
track->addDelegate([this](const Frame::Ptr &frame) {
return _sink->inputFrame(frame);
});
track->addDelegate([this](const Frame::Ptr &frame) { return _sink->inputFrame(frame); });
return true;
}
return false;
@ -370,4 +380,4 @@ vector<Track::Ptr> Demuxer::getTracks(bool ready) const {
}
return ret;
}
}//namespace mediakit
} // namespace mediakit


@ -55,6 +55,7 @@ public:
bool inputFrame(const Frame::Ptr &frame) override;
private:
int _track_index = -1;
uint64_t _audio_idx = 0;
};
@ -86,7 +87,7 @@ public:
void addTrackCompleted() override;
/**
* Set the maximum number of tracks (now >= 1, previously limited to 1~2);
* reaching that count has the same effect as addTrackCompleted.
* Ignored once all tracks are already ready.
*/
void setMaxTrackCount(size_t i);
@ -163,17 +164,20 @@ private:
bool addMuteAudioTrack();
private:
bool _audio_add = false;
bool _have_video = false;
bool _enable_audio = true;
bool _only_audio = false;
bool _add_mute_audio = true;
bool _all_track_ready = false;
bool _have_video = false;
size_t _max_track_size = 2;
std::unordered_map<int, std::pair<Track::Ptr, bool/*got frame*/> > _track_map;
std::unordered_map<int, toolkit::List<Frame::Ptr> > _frame_unread;
std::unordered_map<int, std::function<void()> > _track_ready_callback;
toolkit::Ticker _ticker;
MuteAudioMaker::Ptr _mute_audio_maker;
std::unordered_map<int, toolkit::List<Frame::Ptr> > _frame_unread;
std::unordered_map<int, std::function<void()> > _track_ready_callback;
std::unordered_map<int, std::pair<Track::Ptr, bool/*got frame*/> > _track_map;
};


@ -202,6 +202,9 @@ public:
// 支持通过on_publish返回值替换stream_id
std::string stream_replace;
// 最大track数
size_t max_track = 2;
template <typename MAP>
ProtocolOption(const MAP &allArgs) : ProtocolOption() {
load(allArgs);
@ -237,6 +240,7 @@ public:
GET_OPT_VALUE(hls_save_path);
GET_OPT_VALUE(stream_replace);
GET_OPT_VALUE(max_track);
}
private:
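Because `max_track` is read through `GET_OPT_VALUE(max_track)`, it can be overridden per stream through the same name-keyed argument map as every other `ProtocolOption` field (for example via URL parameters). A simplified sketch of how such a loader behaves, using a plain struct and `std::map` in place of the real macro machinery:

```cpp
#include <cstddef>
#include <iostream>
#include <map>
#include <string>

// Simplified stand-in for ProtocolOption: only the fields relevant here.
struct DemoOption {
    bool enable_mp4 = false;
    size_t max_track = 2;      // default: one audio plus one video

    // Mimics ProtocolOption::load(): every field can be overridden by a same-named argument.
    void load(const std::map<std::string, std::string> &args) {
        auto it = args.find("enable_mp4");
        if (it != args.end()) {
            enable_mp4 = (it->second == "1" || it->second == "true");
        }
        it = args.find("max_track");
        if (it != args.end()) {
            max_track = static_cast<size_t>(std::stoul(it->second));
        }
    }
};

int main() {
    // e.g. arguments parsed from a publish/play URL query string (illustrative only)
    std::map<std::string, std::string> args = {{"max_track", "16"}, {"enable_mp4", "1"}};
    DemoOption option;
    option.load(args);
    std::cout << option.max_track << " " << option.enable_mp4 << "\n";   // 16 1
    return 0;
}
```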


@ -177,11 +177,8 @@ MultiMediaSourceMuxer::MultiMediaSourceMuxer(const MediaTuple& tuple, float dur_
_poller = EventPollerPool::Instance().getPoller();
_create_in_poller = _poller->isCurrentThread();
_option = option;
if (dur_sec > 0.01) {
// VOD (playback) mode
_stamp[TrackVideo].setPlayBack();
_stamp[TrackAudio].setPlayBack();
}
_dur_sec = dur_sec;
setMaxTrackCount(option.max_track);
if (option.enable_rtmp) {
_rtmp = std::make_shared<RtmpMediaSourceMuxer>(_tuple, option, std::make_shared<TitleMeta>(dur_sec));
@ -464,6 +461,12 @@ std::shared_ptr<MultiMediaSourceMuxer> MultiMediaSourceMuxer::getMuxer(MediaSour
}
bool MultiMediaSourceMuxer::onTrackReady(const Track::Ptr &track) {
auto &stamp = _stamps[track->getIndex()];
if (_dur_sec > 0.01) {
// VOD (playback) mode
stamp.setPlayBack();
}
bool ret = false;
if (_rtmp) {
ret = _rtmp->addTrack(track) ? true : ret;
@ -536,10 +539,14 @@ void MultiMediaSourceMuxer::onAllTrackReady() {
createGopCacheIfNeed();
}
#endif
auto tracks = getTracks(false);
if (tracks.size() >= 2) {
// Sync the audio timestamp to the video timestamp; modifying the audio timestamp does not affect playback
_stamp[TrackAudio].syncTo(_stamp[TrackVideo]);
Stamp *first = nullptr;
for (auto &pr : _stamps) {
if (!first) {
first = &pr.second;
} else {
pr.second.syncTo(*first);
}
}
InfoL << "stream: " << shortUrl() << " , codec info: " << getTrackInfoStr(this);
}
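Instead of always syncing audio to video, the loop above now picks whichever track comes first in the map as the reference `Stamp` and syncs every other track to it. The sketch below shows that sync-to-first idea with a toy stamp class that merely aligns base offsets; the real `Stamp::syncTo` does considerably more (jitter smoothing, rollback handling), and which track ends up as the reference depends on map iteration order, as in the diff:

```cpp
#include <cstdint>
#include <iostream>
#include <unordered_map>

// Toy timestamp aligner: syncTo() shifts this stamp so that its base lines up
// with the reference stamp's base.
class DemoStamp {
public:
    void setBase(int64_t base) { _base = base; }
    void syncTo(const DemoStamp &ref) { _offset = ref._base - _base; }
    int64_t revise(int64_t ts) const { return ts + _offset; }

private:
    int64_t _base = 0;
    int64_t _offset = 0;
};

int main() {
    // one stamp per track index, like _stamps in MultiMediaSourceMuxer
    std::unordered_map<int, DemoStamp> stamps;
    stamps[0].setBase(1000);   // video starts at 1000ms
    stamps[1].setBase(1300);   // first audio starts at 1300ms
    stamps[2].setBase(900);    // second audio starts at 900ms

    // sync every stamp to the first one found, mirroring onAllTrackReady() above
    DemoStamp *first = nullptr;
    for (auto &pr : stamps) {
        if (!first) {
            first = &pr.second;
        } else {
            pr.second.syncTo(*first);
        }
    }
    std::cout << stamps[1].revise(1300) << "\n";   // 1300 shifted onto the reference track's timeline
    return 0;
}
```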
@ -589,7 +596,7 @@ void MultiMediaSourceMuxer::resetTracks() {
bool MultiMediaSourceMuxer::onTrackFrame(const Frame::Ptr &frame) {
if (_option.modify_stamp != ProtocolOption::kModifyStampOff) {
// Do not use the original absolute timestamps
const_cast<Frame::Ptr&>(frame) = std::make_shared<FrameStamp>(frame, _stamp[frame->getTrackType()], _option.modify_stamp);
const_cast<Frame::Ptr&>(frame) = std::make_shared<FrameStamp>(frame, _stamps[frame->getIndex()], _option.modify_stamp);
}
return _paced_sender ? _paced_sender->inputFrame(frame) : onTrackFrame_l(frame);
}


@ -162,11 +162,12 @@ private:
bool _is_enable = false;
bool _create_in_poller = false;
bool _video_key_pos = false;
float _dur_sec;
std::shared_ptr<class FramePacedSender> _paced_sender;
MediaTuple _tuple;
ProtocolOption _option;
toolkit::Ticker _last_check;
Stamp _stamp[2];
std::unordered_map<int, Stamp> _stamps;
std::weak_ptr<Listener> _track_listener;
std::unordered_multimap<std::string, RingType::RingReader::Ptr> _rtp_sender;
FMP4MediaSourceMuxer::Ptr _fmp4;


@ -275,6 +275,7 @@ static Frame::Ptr addADTSHeader(const Frame::Ptr &frame_in, const std::string &a
frame->_dts = frame_in->dts();
frame->_buffer.assign(adts_header, size);
frame->_buffer.append(frame_in->data(), frame_in->size());
frame->setIndex(frame_in->getIndex());
return frame;
}


@ -42,6 +42,7 @@ Frame::Ptr Frame::getCacheAbleFrame(const Frame::Ptr &frame){
FrameStamp::FrameStamp(Frame::Ptr frame, Stamp &stamp, int modify_stamp)
{
setIndex(frame->getIndex());
_frame = std::move(frame);
// kModifyStampSystem uses the system clock; kModifyStampRelative uses relative timestamps
stamp.revise(_frame->dts(), _frame->pts(), _dts, _pts, modify_stamp == ProtocolOption::kModifyStampSystem);


@ -123,7 +123,24 @@ public:
* Get the track type
*/
TrackType getTrackType() const;
/**
* Get the track type as a string
*/
std::string getTrackTypeStr() const;
/**
* Set the track index, used to distinguish tracks in multi-track mode
*/
void setIndex(int index) { _index = index; }
/**
* Get the track index, used to distinguish tracks in multi-track mode
*/
int getIndex() const { return _index < 0 ? (int)getTrackType() : _index; }
private:
int _index = -1;
};
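`setIndex`/`getIndex` keep older single-audio/single-video code paths working: while no index has been assigned, `getIndex()` falls back to the numeric track type. A tiny self-contained illustration of that fallback; `DemoCodecInfo` and the enum are placeholders, not the real classes:

```cpp
#include <cassert>

enum DemoTrackType { DemoTrackVideo = 0, DemoTrackAudio = 1 };

class DemoCodecInfo {
public:
    explicit DemoCodecInfo(DemoTrackType type) : _type(type) {}

    void setIndex(int index) { _index = index; }

    // An unassigned index (-1) degrades to the track type, matching getIndex() above.
    int getIndex() const { return _index < 0 ? static_cast<int>(_type) : _index; }

private:
    DemoTrackType _type;
    int _index = -1;
};

int main() {
    DemoCodecInfo audio(DemoTrackAudio);
    assert(audio.getIndex() == 1);   // falls back to the track type

    DemoCodecInfo second_audio(DemoTrackAudio);
    second_audio.setIndex(7);        // e.g. an mp4 track id or an rtp payload type
    assert(second_audio.getIndex() == 7);
    return 0;
}
```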
/**
@ -302,6 +319,7 @@ public:
FrameInternalBase(Frame::Ptr parent_frame, char *ptr, size_t size, uint64_t dts, uint64_t pts = 0, size_t prefix_size = 0)
: Parent(parent_frame->getCodecId(), ptr, size, dts, pts, prefix_size) {
_parent_frame = std::move(parent_frame);
this->setIndex(_parent_frame->getIndex());
}
bool cacheAble() const override { return _parent_frame->cacheAble(); }
@ -353,6 +371,7 @@ public:
using Ptr = std::shared_ptr<FrameCacheAble>;
FrameCacheAble(const Frame::Ptr &frame, bool force_key_frame = false, toolkit::Buffer::Ptr buf = nullptr) {
setIndex(frame->getIndex());
if (frame->cacheAble()) {
_ptr = frame->data();
_buffer = frame;


@ -285,6 +285,7 @@ void H264Track::insertConfigFrame(const Frame::Ptr &frame) {
spsFrame->_buffer.assign("\x00\x00\x00\x01", 4);
spsFrame->_buffer.append(_sps);
spsFrame->_dts = frame->dts();
spsFrame->setIndex(frame->getIndex());
VideoTrack::inputFrame(spsFrame);
}
@ -294,6 +295,7 @@ void H264Track::insertConfigFrame(const Frame::Ptr &frame) {
ppsFrame->_buffer.assign("\x00\x00\x00\x01", 4);
ppsFrame->_buffer.append(_pps);
ppsFrame->_dts = frame->dts();
ppsFrame->setIndex(frame->getIndex());
VideoTrack::inputFrame(ppsFrame);
}
}


@ -195,6 +195,7 @@ void H265Track::insertConfigFrame(const Frame::Ptr &frame) {
vpsFrame->_buffer.assign("\x00\x00\x00\x01", 4);
vpsFrame->_buffer.append(_vps);
vpsFrame->_dts = frame->dts();
vpsFrame->setIndex(frame->getIndex());
VideoTrack::inputFrame(vpsFrame);
}
if (!_sps.empty()) {
@ -203,6 +204,7 @@ void H265Track::insertConfigFrame(const Frame::Ptr &frame) {
spsFrame->_buffer.assign("\x00\x00\x00\x01", 4);
spsFrame->_buffer.append(_sps);
spsFrame->_dts = frame->dts();
spsFrame->setIndex(frame->getIndex());
VideoTrack::inputFrame(spsFrame);
}
@ -212,6 +214,7 @@ void H265Track::insertConfigFrame(const Frame::Ptr &frame) {
ppsFrame->_buffer.assign("\x00\x00\x00\x01", 4);
ppsFrame->_buffer.append(_pps);
ppsFrame->_dts = frame->dts();
ppsFrame->setIndex(frame->getIndex());
VideoTrack::inputFrame(ppsFrame);
}
}


@ -34,7 +34,10 @@ public:
* Copy constructor: copies track metadata such as the bit rate and index,
* but not the data-forwarding relationships
*/
Track(const Track &that) { _bit_rate = that._bit_rate; }
Track(const Track &that) {
_bit_rate = that._bit_rate;
setIndex(that.getIndex());
}
/**
* Information such as sps and pps


@ -96,7 +96,16 @@ void PlayerProxy::setTranslationInfo()
}
}
static int getMaxTrackSize(const std::string &url) {
if (url.find(".m3u8") != std::string::npos || url.find(".ts") != std::string::npos) {
// multi-track support is only enabled for the hls and ts protocols
return 16;
}
return 2;
}
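`getMaxTrackSize()` widens the limit to 16 tracks only when the pulled URL looks like HLS or TS; every other protocol keeps the previous audio-plus-video limit of 2. A small usage check against that same policy (the URLs are made up for illustration):

```cpp
#include <cassert>
#include <string>

// Same policy as PlayerProxy above: only HLS/TS sources get the wider track limit.
static int getMaxTrackSize(const std::string &url) {
    if (url.find(".m3u8") != std::string::npos || url.find(".ts") != std::string::npos) {
        return 16;
    }
    return 2;
}

int main() {
    assert(getMaxTrackSize("http://example.com/live/stream.m3u8") == 16);
    assert(getMaxTrackSize("http://example.com/live/stream.live.ts") == 16);
    assert(getMaxTrackSize("rtsp://example.com/live/stream") == 2);
    return 0;
}
```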
void PlayerProxy::play(const string &strUrlTmp) {
_option.max_track = getMaxTrackSize(strUrlTmp);
weak_ptr<PlayerProxy> weakSelf = shared_from_this();
std::shared_ptr<int> piFailedCnt(new int(0)); // number of consecutive playback failures
setOnPlayResult([weakSelf, strUrlTmp, piFailedCnt](const SockException &err) {


@ -61,7 +61,8 @@ void MP4Demuxer::onVideoTrack(uint32_t track, uint8_t object, int width, int hei
if (!video) {
return;
}
_track_to_codec.emplace(track, video);
video->setIndex(track);
_tracks.emplace(track, video);
if (extra && bytes) {
video->setExtraData((uint8_t *)extra, bytes);
}
@ -72,7 +73,8 @@ void MP4Demuxer::onAudioTrack(uint32_t track, uint8_t object, int channel_count,
if (!audio) {
return;
}
_track_to_codec.emplace(track, audio);
audio->setIndex(track);
_tracks.emplace(track, audio);
if (extra && bytes) {
audio->setExtraData((uint8_t *)extra, bytes);
}
@ -134,8 +136,8 @@ Frame::Ptr MP4Demuxer::readFrame(bool &keyFrame, bool &eof) {
}
Frame::Ptr MP4Demuxer::makeFrame(uint32_t track_id, const Buffer::Ptr &buf, int64_t pts, int64_t dts) {
auto it = _track_to_codec.find(track_id);
if (it == _track_to_codec.end()) {
auto it = _tracks.find(track_id);
if (it == _tracks.end()) {
return nullptr;
}
Frame::Ptr ret;
@ -166,15 +168,16 @@ Frame::Ptr MP4Demuxer::makeFrame(uint32_t track_id, const Buffer::Ptr &buf, int6
}
}
if (ret) {
ret->setIndex(track_id);
it->second->inputFrame(ret);
}
return ret;
}
vector<Track::Ptr> MP4Demuxer::getTracks(bool trackReady) const {
vector<Track::Ptr> MP4Demuxer::getTracks(bool ready) const {
vector<Track::Ptr> ret;
for (auto &pr : _track_to_codec) {
if (trackReady && !pr.second->ready()) {
for (auto &pr : _tracks) {
if (ready && !pr.second->ready()) {
continue;
}
ret.push_back(pr.second);


@ -71,7 +71,7 @@ private:
MP4FileDisk::Ptr _mp4_file;
MP4FileDisk::Reader _mov_reader;
uint64_t _duration_ms = 0;
std::map<int, Track::Ptr> _track_to_codec;
std::unordered_map<int, Track::Ptr> _tracks;
toolkit::ResourcePool<toolkit::BufferRaw> _buffer_pool;
};


@ -60,7 +60,7 @@ bool MP4MuxerInterface::haveVideo() const {
uint64_t MP4MuxerInterface::getDuration() const {
uint64_t ret = 0;
for (auto &pr : _codec_to_trackid) {
for (auto &pr : _tracks) {
if (pr.second.stamp.getRelativeStamp() > (int64_t)ret) {
ret = pr.second.stamp.getRelativeStamp();
}
@ -72,61 +72,50 @@ void MP4MuxerInterface::resetTracks() {
_started = false;
_have_video = false;
_mov_writter = nullptr;
_frame_merger.clear();
_codec_to_trackid.clear();
_tracks.clear();
}
void MP4MuxerInterface::flush() {
_frame_merger.flush();
for (auto &pr : _tracks) {
pr.second.merger.flush();
}
}
bool MP4MuxerInterface::inputFrame(const Frame::Ptr &frame) {
auto it = _codec_to_trackid.find(frame->getCodecId());
if (it == _codec_to_trackid.end()) {
//the track does not exist or failed to initialize
auto it = _tracks.find(frame->getIndex());
if (it == _tracks.end()) {
// the track does not exist or failed to initialize
return false;
}
if (!_started) {
// Ensure that when video is present, the first written frame is a key frame
if (_have_video && !frame->keyFrame()) {
// Video is present but this is not a key frame, so drop the preceding frames
return false;
}
// Start writing the file
_started = true;
}
//mp4 file timestamps must start from 0
auto &track_info = it->second;
// mp4 file timestamps must start from 0
auto &track = it->second;
switch (frame->getCodecId()) {
case CodecH264:
case CodecH265: {
//Merge SPS/PPS/IDR frames that share the same timestamp and treat them as a single frame
_frame_merger.inputFrame(frame, [this, &track_info](uint64_t dts, uint64_t pts, const Buffer::Ptr &buffer, bool have_idr) {
// Merge SPS/PPS/IDR frames that share the same timestamp and treat them as a single frame
track.merger.inputFrame(frame, [&](uint64_t dts, uint64_t pts, const Buffer::Ptr &buffer, bool have_idr) {
int64_t dts_out, pts_out;
track_info.stamp.revise(dts, pts, dts_out, pts_out);
mp4_writer_write(_mov_writter.get(),
track_info.track_id,
buffer->data(),
buffer->size(),
pts_out,
dts_out,
have_idr ? MOV_AV_FLAG_KEYFREAME : 0);
track.stamp.revise(dts, pts, dts_out, pts_out);
mp4_writer_write(_mov_writter.get(), track.track_id, buffer->data(), buffer->size(), pts_out, dts_out, have_idr ? MOV_AV_FLAG_KEYFREAME : 0);
});
break;
}
default: {
int64_t dts_out, pts_out;
track_info.stamp.revise(frame->dts(), frame->pts(), dts_out, pts_out);
mp4_writer_write(_mov_writter.get(),
track_info.track_id,
frame->data() + frame->prefixSize(),
frame->size() - frame->prefixSize(),
pts_out,
dts_out,
frame->keyFrame() ? MOV_AV_FLAG_KEYFREAME : 0);
track.stamp.revise(frame->dts(), frame->pts(), dts_out, pts_out);
mp4_writer_write(_mov_writter.get(), track.track_id, frame->data() + frame->prefixSize(), frame->size() - frame->prefixSize(), pts_out, dts_out, frame->keyFrame() ? MOV_AV_FLAG_KEYFREAME : 0);
break;
}
}
@ -134,23 +123,14 @@ bool MP4MuxerInterface::inputFrame(const Frame::Ptr &frame) {
}
void MP4MuxerInterface::stampSync() {
if (_codec_to_trackid.size() < 2) {
return;
}
Stamp *audio = nullptr, *video = nullptr;
for(auto &pr : _codec_to_trackid){
switch (getTrackType((CodecId) pr.first)){
case TrackAudio : audio = &pr.second.stamp; break;
case TrackVideo : video = &pr.second.stamp; break;
default : break;
Stamp *first = nullptr;
for (auto &pr : _tracks) {
if (!first) {
first = &pr.second.stamp;
} else {
pr.second.stamp.syncTo(*first);
}
}
if (audio && video) {
//Sync the audio timestamp to the video timestamp; modifying the audio timestamp does not affect playback
audio->syncTo(*video);
}
}
bool MP4MuxerInterface::addTrack(const Track::Ptr &track) {
@ -164,7 +144,7 @@ bool MP4MuxerInterface::addTrack(const Track::Ptr &track) {
}
if (!track->ready()) {
WarnL << "Track[" << track->getCodecName() << "] unready";
WarnL << "Track " << track->getCodecName() << " unready";
return false;
}
@ -175,36 +155,26 @@ bool MP4MuxerInterface::addTrack(const Track::Ptr &track) {
auto extra_size = extra ? extra->size() : 0;
if (track->getTrackType() == TrackVideo) {
auto video_track = dynamic_pointer_cast<VideoTrack>(track);
if (!video_track) {
WarnL << "不是VideoTrack";
return false;
}
CHECK(video_track);
auto track_id = mp4_writer_add_video(_mov_writter.get(), mp4_object, video_track->getVideoWidth(), video_track->getVideoHeight(), extra_data, extra_size);
if (track_id < 0) {
WarnL << "添加Video Track失败:" << video_track->getCodecName();
WarnL << "mp4_writer_add_video failed: " << video_track->getCodecName();
return false;
}
_codec_to_trackid[track->getCodecId()].track_id = track_id;
_tracks[track->getIndex()].track_id = track_id;
_have_video = true;
} else if (track->getTrackType() == TrackAudio) {
auto audio_track = dynamic_pointer_cast<AudioTrack>(track);
if (!audio_track) {
WarnL << "不是音频Track:" << track->getCodecName();
return false;
}
auto track_id = mp4_writer_add_audio(_mov_writter.get(), mp4_object, audio_track->getAudioChannel(),
audio_track->getAudioSampleBit() * audio_track->getAudioChannel(),
audio_track->getAudioSampleRate(), extra_data, extra_size);
CHECK(audio_track);
auto track_id = mp4_writer_add_audio(_mov_writter.get(), mp4_object, audio_track->getAudioChannel(), audio_track->getAudioSampleBit() * audio_track->getAudioChannel(), audio_track->getAudioSampleRate(), extra_data, extra_size);
if (track_id < 0) {
WarnL << "添加Track[" << track->getCodecName() << "]失败:" << track_id;
WarnL << "mp4_writer_add_audio failed: " << audio_track->getCodecName();
return false;
}
_codec_to_trackid[track->getCodecId()].track_id = track_id;
_tracks[track->getIndex()].track_id = track_id;
}
// Try to synchronize the audio/video timestamps
stampSync();
return true;
}
@ -236,7 +206,7 @@ void MP4MuxerMemory::resetTracks() {
bool MP4MuxerMemory::inputFrame(const Frame::Ptr &frame) {
if (_init_segment.empty()) {
// The init segment has not been generated yet
return false;
}
@ -259,5 +229,5 @@ bool MP4MuxerMemory::inputFrame(const Frame::Ptr &frame) {
return MP4MuxerInterface::inputFrame(frame);
}
}//namespace mediakit
#endif //defined(ENABLE_MP4)
} // namespace mediakit
#endif // defined(ENABLE_MP4)


@ -72,12 +72,18 @@ private:
bool _started = false;
bool _have_video = false;
MP4FileIO::Writer _mov_writter;
struct track_info {
class FrameMergerImp : public FrameMerger {
public:
FrameMergerImp() : FrameMerger(FrameMerger::mp4_nal_size) {}
};
struct MP4Track {
int track_id = -1;
Stamp stamp;
FrameMergerImp merger;
};
std::unordered_map<int, track_info> _codec_to_trackid;
FrameMerger _frame_merger { FrameMerger::mp4_nal_size };
std::unordered_map<int, MP4Track> _tracks;
};
class MP4Muxer : public MP4MuxerInterface{


@ -27,7 +27,8 @@ MP4Reader::MP4Reader(const std::string &vhost, const std::string &app, const std
option.enable_mp4 = false;
option.enable_hls = false;
option.enable_hls_fmp4 = false;
// mp4 supports multiple tracks
option.max_track = 16;
setup(vhost, app, stream_id, file_path, option, std::move(poller));
}


@ -18,7 +18,7 @@
using namespace toolkit;
namespace mediakit{
namespace mediakit {
MpegMuxer::MpegMuxer(bool is_ps) {
_is_ps = is_ps;
@ -40,27 +40,27 @@ bool MpegMuxer::addTrack(const Track::Ptr &track) {
if (track->getTrackType() == TrackVideo) {
_have_video = true;
}
_codec_to_trackid[track->getCodecId()] = mpeg_muxer_add_stream((::mpeg_muxer_t *)_context, mpeg_id, nullptr, 0);
_tracks[track->getIndex()].track_id = mpeg_muxer_add_stream((::mpeg_muxer_t *)_context, mpeg_id, nullptr, 0);
return true;
}
bool MpegMuxer::inputFrame(const Frame::Ptr &frame) {
auto it = _codec_to_trackid.find(frame->getCodecId());
if (it == _codec_to_trackid.end()) {
auto it = _tracks.find(frame->getIndex());
if (it == _tracks.end()) {
return false;
}
auto track_id = it->second;
auto &track = it->second;
_key_pos = !_have_video;
switch (frame->getCodecId()) {
case CodecH264:
case CodecH265: {
//Merge SPS/PPS/IDR frames that share the same timestamp and treat them as a single frame
return _frame_merger.inputFrame(frame,[this, track_id](uint64_t dts, uint64_t pts, const Buffer::Ptr &buffer, bool have_idr) {
// Merge SPS/PPS/IDR frames that share the same timestamp and treat them as a single frame
return track.merger.inputFrame(frame, [&](uint64_t dts, uint64_t pts, const Buffer::Ptr &buffer, bool have_idr) {
_key_pos = have_idr;
// Use the video timestamp as the TS timestamp
_timestamp = dts;
_max_cache_size = 512 + 1.2 * buffer->size();
mpeg_muxer_input((::mpeg_muxer_t *)_context, track_id, have_idr ? 0x0001 : 0, pts * 90LL,dts * 90LL, buffer->data(), buffer->size());
mpeg_muxer_input((::mpeg_muxer_t *)_context, track.track_id, have_idr ? 0x0001 : 0, pts * 90LL, dts * 90LL, buffer->data(), buffer->size());
flushCache();
});
}
@ -80,7 +80,7 @@ bool MpegMuxer::inputFrame(const Frame::Ptr &frame) {
_timestamp = frame->dts();
}
_max_cache_size = 512 + 1.2 * frame->size();
mpeg_muxer_input((::mpeg_muxer_t *)_context, track_id, frame->keyFrame() ? 0x0001 : 0, frame->pts() * 90LL, frame->dts() * 90LL, frame->data(), frame->size());
mpeg_muxer_input((::mpeg_muxer_t *)_context, track.track_id, frame->keyFrame() ? 0x0001 : 0, frame->pts() * 90LL, frame->dts() * 90LL, frame->data(), frame->size());
flushCache();
return true;
}
@ -103,7 +103,6 @@ void MpegMuxer::createContext() {
if (!thiz->_current_buffer
|| thiz->_current_buffer->size() + bytes > thiz->_current_buffer->getCapacity()) {
if (thiz->_current_buffer) {
//WarnL << "need realloc mpeg buffer" << thiz->_current_buffer->size() + bytes << " > " << thiz->_current_buffer->getCapacity();
thiz->flushCache();
}
thiz->_current_buffer = thiz->_buffer_pool.obtain2();
@ -143,12 +142,13 @@ void MpegMuxer::releaseContext() {
mpeg_muxer_destroy((::mpeg_muxer_t *)_context);
_context = nullptr;
}
_codec_to_trackid.clear();
_frame_merger.clear();
_tracks.clear();
}
void MpegMuxer::flush() {
_frame_merger.flush();
for (auto &pr : _tracks) {
pr.second.merger.flush();
}
}
}//mediakit


@ -70,8 +70,17 @@ private:
uint32_t _max_cache_size = 0;
uint64_t _timestamp = 0;
struct mpeg_muxer_t *_context = nullptr;
std::unordered_map<int, int/*track_id*/> _codec_to_trackid;
FrameMerger _frame_merger{FrameMerger::h264_prefix};
class FrameMergerImp : public FrameMerger {
public:
FrameMergerImp() : FrameMerger(FrameMerger::h264_prefix) {}
};
struct MP4Track {
int track_id = -1;
FrameMergerImp merger;
};
std::unordered_map<int, MP4Track> _tracks;
toolkit::BufferRaw::Ptr _current_buffer;
toolkit::ResourcePool<toolkit::BufferRaw> _buffer_pool;
};


@ -23,17 +23,22 @@ RtmpMuxer::RtmpMuxer(const TitleMeta::Ptr &title) {
}
bool RtmpMuxer::addTrack(const Track::Ptr &track) {
auto &encoder = _encoder[track->getTrackType()];
if (encoder) {
WarnL << "Already add a track kind of: " << track->getTrackTypeStr()
<< ", ignore track: " << track->getCodecName();
if (_track_existed[track->getTrackType()]) {
// rtmp does not support multiple tracks of the same type
WarnL << "Already add a track kind of: " << track->getTrackTypeStr() << ", ignore track: " << track->getCodecName();
return false;
}
auto &encoder = _encoders[track->getIndex()];
CHECK(!encoder);
encoder = Factory::getRtmpEncoderByTrack(track);
if (!encoder) {
return false;
}
// mark that a track of this type now exists
_track_existed[track->getTrackType()] = true;
// set the rtmp output ring buffer
encoder->setRtmpRing(_rtmp_ring);
@ -43,22 +48,22 @@ bool RtmpMuxer::addTrack(const Track::Ptr &track) {
}
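RTMP can still carry at most one audio and one video stream, so the muxer keeps a per-type `_track_existed` guard while storing the encoders themselves by track index. A stripped-down sketch of that double bookkeeping with placeholder types:

```cpp
#include <iostream>
#include <memory>
#include <unordered_map>

enum DemoTrackType { DemoVideo = 0, DemoAudio = 1, DemoTrackMax = 2 };

struct DemoEncoder {                 // placeholder for RtmpCodec
    int track_index;
};

class DemoRtmpMuxer {
public:
    // Accept at most one track per type, but key the encoder by track index.
    bool addTrack(DemoTrackType type, int index) {
        if (_track_existed[type]) {
            std::cerr << "Already added a track of this type, ignoring index " << index << "\n";
            return false;
        }
        _track_existed[type] = true;
        _encoders[index] = std::make_shared<DemoEncoder>(DemoEncoder{index});
        return true;
    }

    bool inputFrame(int index) {
        auto it = _encoders.find(index);
        return it != _encoders.end();   // the real code forwards the frame to the encoder here
    }

private:
    bool _track_existed[DemoTrackMax] = {false, false};
    std::unordered_map<int, std::shared_ptr<DemoEncoder>> _encoders;
};

int main() {
    DemoRtmpMuxer muxer;
    muxer.addTrack(DemoVideo, 0);
    muxer.addTrack(DemoAudio, 3);
    muxer.addTrack(DemoAudio, 4);               // rejected: rtmp cannot carry a second audio track
    std::cout << muxer.inputFrame(3) << "\n";   // 1
    return 0;
}
```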
bool RtmpMuxer::inputFrame(const Frame::Ptr &frame) {
auto &encoder = _encoder[frame->getTrackType()];
auto &encoder = _encoders[frame->getIndex()];
return encoder ? encoder->inputFrame(frame) : false;
}
void RtmpMuxer::flush() {
for (auto &encoder : _encoder) {
if (encoder) {
encoder->flush();
for (auto &pr : _encoders) {
if (pr.second) {
pr.second->flush();
}
}
}
void RtmpMuxer::makeConfigPacket() {
for (auto &encoder : _encoder) {
if (encoder) {
encoder->makeConfigPacket();
for (auto &pr : _encoders) {
if (pr.second) {
pr.second->makeConfigPacket();
}
}
}
@ -73,9 +78,8 @@ RtmpRing::RingType::Ptr RtmpMuxer::getRtmpRing() const {
void RtmpMuxer::resetTracks() {
_metadata.clear();
for (auto &encoder : _encoder) {
encoder = nullptr;
}
_encoders.clear();
CLEAR_ARR(_track_existed);
}
} /* namespace mediakit */


@ -64,10 +64,13 @@ public:
* Generate the config packet
*/
void makeConfigPacket();
private:
RtmpRing::RingType::Ptr _rtmp_ring;
bool _track_existed[2] = { false, false };
AMFValue _metadata;
RtmpCodec::Ptr _encoder[TrackMax];
RtmpRing::RingType::Ptr _rtmp_ring;
std::unordered_map<int, RtmpCodec::Ptr> _encoders;
};


@ -62,7 +62,9 @@ DecoderImp::Ptr DecoderImp::createDecoder(Type type, MediaSinkInterface *sink){
}
void DecoderImp::flush() {
_merger.flush();
for (auto &pr : _tracks) {
pr.second.second.flush();
}
}
ssize_t DecoderImp::input(const uint8_t *data, size_t bytes){
@ -88,9 +90,9 @@ void DecoderImp::onStream(int stream, int codecid, const void *extra, size_t byt
if (!track) {
return;
}
onTrack(std::move(track));
onTrack(stream, std::move(track));
// Guard against completing before the video track is obtained, which would drop later video; kept for compatibility with non-standard ps streams
if (finish && _tracks[TrackVideo]) {
if (finish && _have_video) {
_sink->addTrackCompleted();
InfoL << "Add track finished";
}
@ -104,37 +106,43 @@ void DecoderImp::onDecode(int stream, int codecid, int flags, int64_t pts, int64
if (codec == CodecInvalid) {
return;
}
if (!_tracks[getTrackType(codec)]) {
onTrack(Factory::getTrackByCodecId(codec, 8000, 1, 16));
auto &ref = _tracks[stream];
if (!ref.first) {
onTrack(stream, Factory::getTrackByCodecId(codec, 8000, 1, 16));
}
// TODO: support multiple tracks
auto frame = Factory::getFrameFromPtr(codec, (char *) data, bytes, dts, pts);
if (getTrackType(codec) == TrackVideo) {
_merger.inputFrame(frame, [&](uint64_t dts, uint64_t pts, const Buffer::Ptr &buffer, bool) {
onFrame(Factory::getFrameFromBuffer(codec, buffer, dts, pts));
});
} else {
onFrame(frame);
auto frame = Factory::getFrameFromPtr(codec, (char *)data, bytes, dts, pts);
if (getTrackType(codec) != TrackVideo) {
onFrame(stream, frame);
return;
}
ref.second.inputFrame(frame, [&](uint64_t dts, uint64_t pts, const Buffer::Ptr &buffer, bool) {
onFrame(stream, Factory::getFrameFromBuffer(codec, buffer, dts, pts));
});
}
#else
void DecoderImp::onDecode(int stream,int codecid,int flags,int64_t pts,int64_t dts,const void *data,size_t bytes) {}
void DecoderImp::onStream(int stream,int codecid,const void *extra,size_t bytes,int finish) {}
#endif
void DecoderImp::onTrack(const Track::Ptr &track) {
void DecoderImp::onTrack(int index, const Track::Ptr &track) {
if (!track) {
return;
}
if (!_tracks[track->getTrackType()]) {
_tracks[track->getTrackType()] = track;
_sink->addTrack(track);
InfoL << "got track: " << track->getCodecName();
track->setIndex(index);
auto &ref = _tracks[index];
if (ref.first) {
WarnL << "Already existed a same track: " << index << ", codec: " << track->getCodecName();
return;
}
ref.first = track;
_sink->addTrack(track);
InfoL << "Got track: " << track->getCodecName();
_have_video = track->getTrackType() == TrackVideo ? true : _have_video;
}
void DecoderImp::onFrame(const Frame::Ptr &frame) {
void DecoderImp::onFrame(int index, const Frame::Ptr &frame) {
if (frame) {
frame->setIndex(index);
_sink->inputFrame(frame);
}
}
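In the PS/TS decoder, each elementary stream id now owns its own track and its own frame merger, and every frame is stamped with that stream id before it reaches the sink. A compact sketch of keeping such per-stream state in a single map; the types are illustrative, and the stream ids in `main` are just the conventional MPEG-PS video/audio ids (0xE0/0xC0):

```cpp
#include <iostream>
#include <string>
#include <unordered_map>

// Placeholder per-stream state: the codec name stands in for a Track,
// the counter stands in for a FrameMerger.
struct DemoStreamState {
    std::string codec;
    int merged_frames = 0;
};

class DemoDecoder {
public:
    // One entry per PS/TS elementary stream id, like _tracks in DecoderImp.
    void onTrack(int stream, const std::string &codec) {
        auto &ref = _streams[stream];
        if (!ref.codec.empty()) {
            std::cerr << "Stream " << stream << " already has a track\n";
            return;
        }
        ref.codec = codec;
    }

    // Every frame is tagged with its stream id before being forwarded.
    void onFrame(int stream, const std::string &payload) {
        auto &ref = _streams[stream];
        ++ref.merged_frames;
        std::cout << "stream " << stream << " (" << ref.codec << "): " << payload << "\n";
    }

private:
    std::unordered_map<int, DemoStreamState> _streams;
};

int main() {
    DemoDecoder decoder;
    decoder.onTrack(0xE0, "H264");   // conventional PS video stream id
    decoder.onTrack(0xC0, "AAC");    // conventional PS audio stream id
    decoder.onFrame(0xE0, "keyframe");
    decoder.onFrame(0xC0, "adts frame");
    return 0;
}
```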


@ -48,8 +48,8 @@ public:
void flush();
protected:
void onTrack(const Track::Ptr &track);
void onFrame(const Frame::Ptr &frame);
void onTrack(int index, const Track::Ptr &track);
void onFrame(int index, const Frame::Ptr &frame);
private:
DecoderImp(const Decoder::Ptr &decoder, MediaSinkInterface *sink);
@ -57,10 +57,15 @@ private:
void onStream(int stream, int codecid, const void *extra, size_t bytes, int finish);
private:
bool _have_video = false;
Decoder::Ptr _decoder;
MediaSinkInterface *_sink;
FrameMerger _merger{FrameMerger::none};
Track::Ptr _tracks[TrackMax];
class FrameMergerImp : public FrameMerger {
public:
FrameMergerImp() : FrameMerger(FrameMerger::none) {}
};
std::unordered_map<int, std::pair<Track::Ptr, FrameMergerImp> > _tracks;
};
}//namespace mediakit


@ -78,7 +78,7 @@ bool GB28181Process::inputRtp(bool, const char *data, size_t data_len) {
if (!ref) {
if (_rtp_receiver.size() > 2) {
// prevent memory overflow caused by too many payload types
throw std::invalid_argument("No more than 2 rtp payload types are allowed!");
WarnL << "Rtp payload type more than 2 types: " << _rtp_receiver.size();
}
switch (pt) {
case Rtsp::PT_PCMA:
@ -87,6 +87,7 @@ bool GB28181Process::inputRtp(bool, const char *data, size_t data_len) {
ref = std::make_shared<RtpReceiverImp>(8000, [this](RtpPacket::Ptr rtp) { onRtpSorted(std::move(rtp)); });
auto track = Factory::getTrackByCodecId(pt == Rtsp::PT_PCMU ? CodecG711U : CodecG711A, 8000, 1, 16);
CHECK(track);
track->setIndex(pt);
_interface->addTrack(track);
_rtp_decoder[pt] = Factory::getRtpDecoderByTrack(track);
break;
@ -96,6 +97,7 @@ bool GB28181Process::inputRtp(bool, const char *data, size_t data_len) {
ref = std::make_shared<RtpReceiverImp>(90000, [this](RtpPacket::Ptr rtp) { onRtpSorted(std::move(rtp)); });
auto track = Factory::getTrackByCodecId(CodecJPEG);
CHECK(track);
track->setIndex(pt);
_interface->addTrack(track);
_rtp_decoder[pt] = Factory::getRtpDecoderByTrack(track);
break;
@ -106,23 +108,28 @@ bool GB28181Process::inputRtp(bool, const char *data, size_t data_len) {
ref = std::make_shared<RtpReceiverImp>(48000, [this](RtpPacket::Ptr rtp) { onRtpSorted(std::move(rtp)); });
auto track = Factory::getTrackByCodecId(CodecOpus);
CHECK(track);
track->setIndex(pt);
_interface->addTrack(track);
_rtp_decoder[pt] = Factory::getRtpDecoderByTrack(track);
} else if (pt == h265_pt) {
// H265 payload
ref = std::make_shared<RtpReceiverImp>(90000, [this](RtpPacket::Ptr rtp) { onRtpSorted(std::move(rtp)); });
auto track = Factory::getTrackByCodecId(CodecH265);
CHECK(track);
track->setIndex(pt);
_interface->addTrack(track);
_rtp_decoder[pt] = Factory::getRtpDecoderByTrack(track);
} else if (pt == h264_pt) {
// H264 payload
ref = std::make_shared<RtpReceiverImp>(90000, [this](RtpPacket::Ptr rtp) { onRtpSorted(std::move(rtp)); });
auto track = Factory::getTrackByCodecId(CodecH264);
CHECK(track);
track->setIndex(pt);
_interface->addTrack(track);
_rtp_decoder[pt] = Factory::getRtpDecoderByTrack(track);
} else {
if (pt != Rtsp::PT_MP2T && pt != ps_pt) {
WarnL << "rtp payload type未识别(" << (int)pt << "),已按ts或ps负载处理";
WarnL << "Unknown rtp payload type(" << (int)pt << "), decode it as mpeg-ps or mpeg-ts";
}
ref = std::make_shared<RtpReceiverImp>(90000, [this](RtpPacket::Ptr rtp) { onRtpSorted(std::move(rtp)); });
// ts or ps payload
@ -142,7 +149,8 @@ bool GB28181Process::inputRtp(bool, const char *data, size_t data_len) {
}
}
// set the frame callback
_rtp_decoder[pt]->addDelegate([this](const Frame::Ptr &frame) {
_rtp_decoder[pt]->addDelegate([this, pt](const Frame::Ptr &frame) {
frame->setIndex(pt);
onRtpDecode(frame);
return true;
});
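Here every RTP payload type gets its own track and decoder, and the decoder's delegate stamps each output frame with that payload type (`frame->setIndex(pt)`), so downstream sinks can tell the tracks apart. A reduced, callback-based sketch of that per-payload-type wiring with placeholder types:

```cpp
#include <functional>
#include <iostream>
#include <unordered_map>
#include <utility>

struct DemoFrame {
    int index = -1;            // filled in with the rtp payload type
    const char *codec = "";
};

class DemoGB28181Process {
public:
    using FrameCallback = std::function<void(const DemoFrame &)>;

    explicit DemoGB28181Process(FrameCallback on_frame) : _on_frame(std::move(on_frame)) {}

    // Register a decoder for one rtp payload type; its frames inherit pt as the track index.
    void addPayloadType(int pt, const char *codec) {
        _decoders[pt] = [this, pt, codec](DemoFrame frame) {
            frame.index = pt;          // mirrors frame->setIndex(pt) in the delegate above
            frame.codec = codec;
            _on_frame(frame);
        };
    }

    void onRtpDecoded(int pt, DemoFrame frame) {
        auto it = _decoders.find(pt);
        if (it != _decoders.end()) {
            it->second(frame);
        }
    }

private:
    FrameCallback _on_frame;
    std::unordered_map<int, std::function<void(DemoFrame)>> _decoders;
};

int main() {
    DemoGB28181Process process([](const DemoFrame &f) {
        std::cout << "frame from track " << f.index << " codec " << f.codec << "\n";
    });
    process.addPayloadType(98, "H264");
    process.addPayloadType(8, "G711A");
    process.onRtpDecoded(98, DemoFrame{});
    process.onRtpDecoded(8, DemoFrame{});
    return 0;
}
```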


@ -22,7 +22,7 @@ namespace mediakit{
PSEncoderImp::PSEncoderImp(uint32_t ssrc, uint8_t payload_type) : MpegMuxer(true) {
GET_CONFIG(uint32_t,video_mtu,Rtp::kVideoMtuSize);
_rtp_encoder = std::make_shared<CommonRtpEncoder>();
_rtp_encoder->setRtpInfo(ssrc, video_mtu, 90000, payload_type, 0);
_rtp_encoder->setRtpInfo(ssrc, video_mtu, 90000, payload_type);
auto ring = std::make_shared<RtpRing::RingType>();
ring->setDelegate(std::make_shared<RingDelegateHelper>([this](RtpPacket::Ptr rtp, bool is_key) { onRTP(std::move(rtp), is_key); }));
_rtp_encoder->setRtpRing(std::move(ring));


@ -72,7 +72,7 @@ RtpCodec::Ptr RawEncoderImp::createRtpEncoder(const Track::Ptr &track) {
sample_rate = std::static_pointer_cast<AudioTrack>(track)->getAudioSampleRate();
}
auto ret = Factory::getRtpEncoderByCodecId(track->getCodecId(), _payload_type);
ret->setRtpInfo(_ssrc, mtu, sample_rate, _payload_type, 2 * track->getTrackType());
ret->setRtpInfo(_ssrc, mtu, sample_rate, _payload_type);
return ret;
}


@ -254,8 +254,7 @@ void RtpProcess::emitOnPublish() {
return;
}
if (err.empty()) {
strong_self->_muxer = std::make_shared<MultiMediaSourceMuxer>(strong_self->_media_info, 0.0f,
option);
strong_self->_muxer = std::make_shared<MultiMediaSourceMuxer>(strong_self->_media_info, 0.0f, option);
if (strong_self->_only_audio) {
strong_self->_muxer->setOnlyAudio();
}


@ -19,6 +19,7 @@ RtpPacket::Ptr RtpInfo::makeRtp(TrackType type, const void* data, size_t len, bo
rtp->setSize(payload_len + RtpPacket::kRtpTcpHeaderSize);
rtp->sample_rate = _sample_rate;
rtp->type = type;
rtp->track_index = _track_index;
//rtsp-over-tcp header
auto ptr = (uint8_t *) rtp->data();


@ -54,7 +54,7 @@ class RtpInfo {
public:
using Ptr = std::shared_ptr<RtpInfo>;
RtpInfo(uint32_t ssrc, size_t mtu_size, uint32_t sample_rate, uint8_t pt, uint8_t interleaved) {
RtpInfo(uint32_t ssrc, size_t mtu_size, uint32_t sample_rate, uint8_t pt, uint8_t interleaved, int track_index) {
if (ssrc == 0) {
ssrc = ((uint64_t) this) & 0xFFFFFFFF;
}
@ -63,6 +63,7 @@ public:
_mtu_size = mtu_size;
_sample_rate = sample_rate;
_interleaved = interleaved;
_track_index = track_index;
}
//returns the maximum rtp payload length
@ -78,6 +79,7 @@ private:
uint16_t _seq = 0;
uint32_t _ssrc;
uint32_t _sample_rate;
int _track_index;
size_t _mtu_size;
};
@ -85,8 +87,8 @@ class RtpCodec : public RtpRing, public FrameDispatcher {
public:
using Ptr = std::shared_ptr<RtpCodec>;
void setRtpInfo(uint32_t ssrc, size_t mtu_size, uint32_t sample_rate, uint8_t pt, uint8_t interleaved) {
_rtp_info.reset(new RtpInfo(ssrc, mtu_size, sample_rate, pt, interleaved));
void setRtpInfo(uint32_t ssrc, size_t mtu_size, uint32_t sample_rate, uint8_t pt, uint8_t interleaved = 0, int track_index = 0) {
_rtp_info.reset(new RtpInfo(ssrc, mtu_size, sample_rate, pt, interleaved, track_index));
}
RtpInfo &getRtpInfo() { return *_rtp_info; }
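`RtpInfo` now remembers a `track_index` and copies it into every `RtpPacket` it builds, which is what lets `RtspMuxer::onRtp()` key its per-track stamp/NTP state by `in->track_index` rather than by `in->type`. A bare-bones sketch of threading an index from the packetizer into the packets it produces (all names are placeholders):

```cpp
#include <cstdint>
#include <iostream>
#include <vector>

// Minimal rtp packet: just the fields needed to show the index plumbing.
struct DemoRtpPacket {
    uint32_t stamp = 0;
    int track_index = 0;
};

// Stand-in for RtpInfo: every packet it creates carries the configured track index.
class DemoRtpInfo {
public:
    DemoRtpInfo(uint32_t sample_rate, int track_index)
        : _sample_rate(sample_rate), _track_index(track_index) {}

    DemoRtpPacket makeRtp(uint64_t stamp_ms) const {
        DemoRtpPacket rtp;
        rtp.stamp = static_cast<uint32_t>(stamp_ms * (_sample_rate / 1000));
        rtp.track_index = _track_index;   // mirrors rtp->track_index = _track_index above
        return rtp;
    }

private:
    uint32_t _sample_rate;
    int _track_index;
};

int main() {
    DemoRtpInfo video(90000, 0);
    DemoRtpInfo audio(8000, 1);
    std::vector<DemoRtpPacket> packets{video.makeRtp(40), audio.makeRtp(40)};
    for (auto &p : packets) {
        // downstream code (e.g. an rtsp muxer) can now group packets by track_index
        std::cout << "index " << p.track_index << " stamp " << p.stamp << "\n";
    }
    return 0;
}
```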


@ -166,7 +166,7 @@ public:
// ntp timestamp
uint64_t ntp_stamp;
bool disable_ntp = false;
int track_index;
static Ptr create();


@ -19,18 +19,19 @@ namespace mediakit {
void RtspMuxer::onRtp(RtpPacket::Ptr in, bool is_key) {
if (_live) {
if (_rtp_stamp[in->type] != in->getHeader()->stamp) {
auto &ref = _tracks[in->track_index];
if (ref.rtp_stamp != in->getHeader()->stamp) {
// compute the ntp timestamp only when the rtp timestamp changes, to save CPU
int64_t stamp_ms = in->getStamp() * uint64_t(1000) / in->sample_rate;
int64_t stamp_ms_inc;
// compute the rtp timestamp increment
_stamp[in->type].revise(stamp_ms, stamp_ms, stamp_ms_inc, stamp_ms_inc);
_rtp_stamp[in->type] = in->getHeader()->stamp;
_ntp_stamp[in->type] = stamp_ms_inc + _ntp_stamp_start;
ref.stamp.revise(stamp_ms, stamp_ms, stamp_ms_inc, stamp_ms_inc);
ref.rtp_stamp = in->getHeader()->stamp;
ref.ntp_stamp = stamp_ms_inc + _ntp_stamp_start;
}
// rtp interception entry point; assign the ntp timestamp uniformly here
in->ntp_stamp = _ntp_stamp[in->type];
in->ntp_stamp = ref.ntp_stamp;
} else {
// For VOD, the ntp timestamp is the rtp timestamp plus the base ntp timestamp
in->ntp_stamp = _ntp_stamp_start + (in->getStamp() * uint64_t(1000) / in->sample_rate);
@ -55,16 +56,20 @@ RtspMuxer::RtspMuxer(const TitleSdp::Ptr &title) {
}
bool RtspMuxer::addTrack(const Track::Ptr &track) {
auto &encoder = _encoder[track->getTrackType()];
if (encoder) {
WarnL << "Already add a track kind of: " << track->getTrackTypeStr()
<< ", ignore track: " << track->getCodecName();
if (_track_existed[track->getTrackType()]) {
// rtsp does not support multiple tracks of the same type
WarnL << "Already add a track kind of: " << track->getTrackTypeStr() << ", ignore track: " << track->getCodecName();
return false;
}
auto &ref = _tracks[track->getIndex()];
auto &encoder = ref.encoder;
CHECK(!encoder);
// payload types from 96 onward are dynamic
Sdp::Ptr sdp = track->getSdp(96 + _index);
if (!sdp) {
WarnL << "rtsp复用器不支持该编码:" << track->getCodecName();
WarnL << "Unsupported codec: " << track->getCodecName();
return false;
}
@ -73,6 +78,9 @@ bool RtspMuxer::addTrack(const Track::Ptr &track) {
return false;
}
// mark that a track of this type now exists
_track_existed[track->getTrackType()] = true;
{
static atomic<uint32_t> s_ssrc(0);
uint32_t ssrc = s_ssrc++;
@ -90,7 +98,7 @@ bool RtspMuxer::addTrack(const Track::Ptr &track) {
GET_CONFIG(uint32_t, audio_mtu, Rtp::kAudioMtuSize);
GET_CONFIG(uint32_t, video_mtu, Rtp::kVideoMtuSize);
auto mtu = track->getTrackType() == TrackVideo ? video_mtu : audio_mtu;
encoder->setRtpInfo(ssrc, mtu, sdp->getSampleRate(), sdp->getPayloadType(), 2 * track->getTrackType());
encoder->setRtpInfo(ssrc, mtu, sdp->getSampleRate(), sdp->getPayloadType(), 2 * track->getTrackType(), track->getIndex());
}
// set the rtp output ring buffer
@ -106,28 +114,33 @@ bool RtspMuxer::addTrack(const Track::Ptr &track) {
trySyncTrack();
// the rtp timestamp is the pts, so rollback is allowed
_stamp[TrackVideo].enableRollback(true);
if (track->getTrackType() == TrackVideo) {
ref.stamp.enableRollback(true);
}
++_index;
return true;
}
void RtspMuxer::trySyncTrack() {
if (_encoder[TrackAudio] && _encoder[TrackVideo]) {
// Sync audio timestamps to video; modifying the audio timestamp does not affect playback
_stamp[TrackAudio].syncTo(_stamp[TrackVideo]);
Stamp *first = nullptr;
for (auto &pr : _tracks) {
if (!first) {
first = &pr.second.stamp;
} else {
pr.second.stamp.syncTo(*first);
}
}
}
bool RtspMuxer::inputFrame(const Frame::Ptr &frame) {
auto &encoder = _encoder[frame->getTrackType()];
auto &encoder = _tracks[frame->getIndex()].encoder;
return encoder ? encoder->inputFrame(frame) : false;
}
void RtspMuxer::flush() {
for (auto &encoder : _encoder) {
if (encoder) {
encoder->flush();
for (auto &pr : _tracks) {
if (pr.second.encoder) {
pr.second.encoder->flush();
}
}
}
@ -142,9 +155,8 @@ RtpRing::RingType::Ptr RtspMuxer::getRtpRing() const {
void RtspMuxer::resetTracks() {
_sdp.clear();
for (auto &encoder : _encoder) {
encoder = nullptr;
}
_tracks.clear();
CLEAR_ARR(_track_existed);
}
} /* namespace mediakit */


@ -85,13 +85,20 @@ private:
private:
bool _live = true;
bool _track_existed[2] = { false, false };
uint8_t _index {0};
uint32_t _rtp_stamp[TrackMax]{0};
uint64_t _ntp_stamp[TrackMax]{0};
uint64_t _ntp_stamp_start;
std::string _sdp;
Stamp _stamp[TrackMax];
RtpCodec::Ptr _encoder[TrackMax];
struct TrackInfo {
Stamp stamp;
uint32_t rtp_stamp { 0 };
uint64_t ntp_stamp { 0 };
RtpCodec::Ptr encoder;
};
std::unordered_map<int, TrackInfo> _tracks;
RtpRing::RingType::Ptr _rtpRing;
RtpRing::RingType::Ptr _rtpInterceptor;
};


@ -287,18 +287,7 @@ std::string SrtTransportImp::getIdentifier() const {
bool SrtTransportImp::inputFrame(const Frame::Ptr &frame) {
if (_muxer) {
//TraceL<<"before type "<<frame->getCodecName()<<" dts "<<frame->dts()<<" pts "<<frame->pts();
auto frame_tmp = std::make_shared<FrameStamp>(frame, _type_to_stamp[frame->getTrackType()],false);
if(_type_to_stamp.size()>1){
// both audio and video are present; check whether their timestamps drift too far apart
auto diff = _type_to_stamp[TrackType::TrackVideo].getRelativeStamp() - _type_to_stamp[TrackType::TrackAudio].getRelativeStamp();
if(std::abs(diff) > 5000){
// more than 5s apart, should be re-synchronized TODO
WarnL << _media_info.full_url <<" video or audio not sync : "<<diff;
}
}
//TraceL<<"after type "<<frame_tmp->getCodecName()<<" dts "<<frame_tmp->dts()<<" pts "<<frame_tmp->pts();
return _muxer->inputFrame(frame_tmp);
return _muxer->inputFrame(frame);
}
if (_cached_func.size() > 200) {
WarnL << "cached frame of track(" << frame->getCodecName() << ") is too much, now dropped";
@ -306,17 +295,11 @@ bool SrtTransportImp::inputFrame(const Frame::Ptr &frame) {
}
auto frame_cached = Frame::getCacheAbleFrame(frame);
lock_guard<recursive_mutex> lck(_func_mtx);
_cached_func.emplace_back([this, frame_cached]() {
//TraceL<<"before type "<<frame_cached->getCodecName()<<" dts "<<frame_cached->dts()<<" pts "<<frame_cached->pts();
auto frame_tmp = std::make_shared<FrameStamp>(frame_cached, _type_to_stamp[frame_cached->getTrackType()],false);
//TraceL<<"after type "<<frame_tmp->getCodecName()<<" dts "<<frame_tmp->dts()<<" pts "<<frame_tmp->pts();
_muxer->inputFrame(frame_tmp);
});
_cached_func.emplace_back([this, frame_cached]() { _muxer->inputFrame(frame_cached); });
return true;
}
bool SrtTransportImp::addTrack(const Track::Ptr &track) {
_type_to_stamp.emplace(track->getTrackType(),Stamp());
if (_muxer) {
return _muxer->addTrack(track);
}
@ -333,9 +316,6 @@ void SrtTransportImp::addTrackCompleted() {
lock_guard<recursive_mutex> lck(_func_mtx);
_cached_func.emplace_back([this]() { _muxer->addTrackCompleted(); });
}
if(_type_to_stamp.size() >1){
_type_to_stamp[TrackType::TrackAudio].syncTo(_type_to_stamp[TrackType::TrackVideo]);
}
}
void SrtTransportImp::doCachedFunc() {


@ -86,8 +86,6 @@ private:
DecoderImp::Ptr _decoder;
std::recursive_mutex _func_mtx;
std::deque<std::function<void()>> _cached_func;
std::unordered_map<int, Stamp> _type_to_stamp;
};
} // namespace SRT