整理MediaSource代码

This commit is contained in:
xiongziliang 2019-12-03 16:10:02 +08:00
parent 68718953d4
commit 5249c24430
18 changed files with 323 additions and 263 deletions

View File

@ -395,25 +395,21 @@ void installWebApi() {
API_REGIST(api,getMediaList,{
CHECK_SECRET();
//获取所有MediaSource列表
MediaSource::for_each_media([&](const string &schema,
const string &vhost,
const string &app,
const string &stream,
const MediaSource::Ptr &media){
if(!allArgs["schema"].empty() && allArgs["schema"] != schema){
MediaSource::for_each_media([&](const MediaSource::Ptr &media){
if(!allArgs["schema"].empty() && allArgs["schema"] != media->getSchema()){
return;
}
if(!allArgs["vhost"].empty() && allArgs["vhost"] != vhost){
if(!allArgs["vhost"].empty() && allArgs["vhost"] != media->getVhost()){
return;
}
if(!allArgs["app"].empty() && allArgs["app"] != app){
if(!allArgs["app"].empty() && allArgs["app"] != media->getApp()){
return;
}
Value item;
item["schema"] = schema;
item["vhost"] = vhost;
item["app"] = app;
item["stream"] = stream;
item["schema"] = media->getSchema();
item["vhost"] = media->getVhost();
item["app"] = media->getApp();
item["stream"] = media->getId();
val["data"].append(item);
});
});
@ -453,21 +449,17 @@ void installWebApi() {
int count_hit = 0;
int count_closed = 0;
list<MediaSource::Ptr> media_list;
MediaSource::for_each_media([&](const string &schema,
const string &vhost,
const string &app,
const string &stream,
const MediaSource::Ptr &media){
if(!allArgs["schema"].empty() && allArgs["schema"] != schema){
MediaSource::for_each_media([&](const MediaSource::Ptr &media){
if(!allArgs["schema"].empty() && allArgs["schema"] != media->getSchema()){
return;
}
if(!allArgs["vhost"].empty() && allArgs["vhost"] != vhost){
if(!allArgs["vhost"].empty() && allArgs["vhost"] != media->getVhost()){
return;
}
if(!allArgs["app"].empty() && allArgs["app"] != app){
if(!allArgs["app"].empty() && allArgs["app"] != media->getApp()){
return;
}
if(!allArgs["stream"].empty() && allArgs["stream"] != stream){
if(!allArgs["stream"].empty() && allArgs["stream"] != media->getId()){
return;
}
++count_hit;

View File

@ -38,8 +38,142 @@ namespace mediakit {
recursive_mutex MediaSource::g_mtxMediaSrc;
MediaSource::SchemaVhostAppStreamMap MediaSource::g_mapMediaSrc;
MediaSource::MediaSource(const string &strSchema, const string &strVhost, const string &strApp, const string &strId) :
_strSchema(strSchema),
_strApp(strApp),
_strId(strId) {
if (strVhost.empty()) {
_strVhost = DEFAULT_VHOST;
} else {
_strVhost = strVhost;
}
}
void MediaSource::findAsync(const MediaInfo &info,
MediaSource::~MediaSource() {
unregist();
}
const string& MediaSource::getSchema() const {
return _strSchema;
}
const string& MediaSource::getVhost() const {
return _strVhost;
}
const string& MediaSource::getApp() const {
//获取该源的id
return _strApp;
}
const string& MediaSource::getId() const {
return _strId;
}
vector<Track::Ptr> MediaSource::getTracks(bool trackReady) const {
auto strongPtr = _track_source.lock();
if(strongPtr){
return strongPtr->getTracks(trackReady);
}
return vector<Track::Ptr>();
}
void MediaSource::setTrackSource(const std::weak_ptr<TrackSource> &track_src) {
_track_source = track_src;
}
void MediaSource::setListener(const std::weak_ptr<MediaSourceEvent> &listener){
_listener = listener;
}
const std::weak_ptr<MediaSourceEvent>& MediaSource::getListener() const{
return _listener;
}
bool MediaSource::seekTo(uint32_t ui32Stamp) {
auto listener = _listener.lock();
if(!listener){
return false;
}
return listener->seekTo(*this,ui32Stamp);
}
bool MediaSource::close(bool force) {
auto listener = _listener.lock();
if(!listener){
return false;
}
return listener->close(*this,force);
}
void MediaSource::onNoneReader(){
auto listener = _listener.lock();
if(!listener){
return;
}
listener->onNoneReader(*this);
}
void MediaSource::for_each_media(const function<void(const MediaSource::Ptr &src)> &cb) {
lock_guard<recursive_mutex> lock(g_mtxMediaSrc);
for (auto &pr0 : g_mapMediaSrc) {
for (auto &pr1 : pr0.second) {
for (auto &pr2 : pr1.second) {
for (auto &pr3 : pr2.second) {
auto src = pr3.second.lock();
if(src){
cb(src);
}
}
}
}
}
}
template<typename MAP, typename FUNC>
static bool searchMedia(MAP &map,
const string &schema,
const string &vhost,
const string &app,
const string &id,
FUNC &&func) {
auto it0 = map.find(schema);
if (it0 == map.end()) {
//未找到协议
return false;
}
auto it1 = it0->second.find(vhost);
if (it1 == it0->second.end()) {
//未找到vhost
return false;
}
auto it2 = it1->second.find(app);
if (it2 == it1->second.end()) {
//未找到app
return false;
}
auto it3 = it2->second.find(id);
if (it3 == it2->second.end()) {
//未找到streamId
return false;
}
return func(it0, it1, it2, it3);
}
template<typename MAP, typename IT0, typename IT1, typename IT2>
static void eraseIfEmpty(MAP &map, IT0 it0, IT1 it1, IT2 it2) {
if (it2->second.empty()) {
it1->second.erase(it2);
if (it1->second.empty()) {
it0->second.erase(it1);
if (it0->second.empty()) {
map.erase(it0);
}
}
}
};
void findAsync_l(const MediaInfo &info,
const std::shared_ptr<TcpSession> &session,
bool retry,
const function<void(const MediaSource::Ptr &src)> &cb){
@ -99,12 +233,17 @@ void MediaSource::findAsync(const MediaInfo &info,
}
DebugL << "收到媒体注册事件,回复播放器:" << info._schema << "/" << info._vhost << "/" << info._app << "/" << info._streamid;
//再找一遍媒体源,一般能找到
findAsync(info,strongSession,false,cb);
findAsync_l(info,strongSession,false,cb);
}, false);
};
//监听媒体注册事件
NoticeCenter::Instance().addListener(listener_tag, Broadcast::kBroadcastMediaChanged, onRegist);
}
void MediaSource::findAsync(const MediaInfo &info, const std::shared_ptr<TcpSession> &session,const function<void(const Ptr &src)> &cb){
return findAsync_l(info, session, true, cb);
}
MediaSource::Ptr MediaSource::find(
const string &schema,
const string &vhost_tmp,
@ -124,20 +263,19 @@ MediaSource::Ptr MediaSource::find(
lock_guard<recursive_mutex> lock(g_mtxMediaSrc);
MediaSource::Ptr ret;
//查找某一媒体源,找到后返回
searchMedia(schema, vhost, app, id,
[&](SchemaVhostAppStreamMap::iterator &it0 ,
VhostAppStreamMap::iterator &it1,
AppStreamMap::iterator &it2,
StreamMap::iterator &it3){
ret = it3->second.lock();
if(!ret){
//该对象已经销毁
it2->second.erase(it3);
eraseIfEmpty(it0,it1,it2);
return false;
}
return true;
});
searchMedia(g_mapMediaSrc, schema, vhost, app, id, [&](SchemaVhostAppStreamMap::iterator &it0,
VhostAppStreamMap::iterator &it1,
AppStreamMap::iterator &it2,
StreamMap::iterator &it3) {
ret = it3->second.lock();
if (!ret) {
//该对象已经销毁
it2->second.erase(it3);
eraseIfEmpty(g_mapMediaSrc,it0, it1, it2);
return false;
}
return true;
});
if(!ret && bMake){
//未查找媒体源,则创建一个
ret = MediaReader::onMakeMediaSource(schema, vhost,app,id);
@ -166,17 +304,17 @@ void MediaSource::regist() {
bool MediaSource::unregist() {
//反注册该源
lock_guard<recursive_mutex> lock(g_mtxMediaSrc);
return searchMedia(_strSchema, _strVhost, _strApp, _strId, [&](SchemaVhostAppStreamMap::iterator &it0 ,
VhostAppStreamMap::iterator &it1,
AppStreamMap::iterator &it2,
StreamMap::iterator &it3){
return searchMedia(g_mapMediaSrc, _strSchema, _strVhost, _strApp, _strId,[&](SchemaVhostAppStreamMap::iterator &it0,
VhostAppStreamMap::iterator &it1,
AppStreamMap::iterator &it2,
StreamMap::iterator &it3) {
auto strongMedia = it3->second.lock();
if(strongMedia && this != strongMedia.get()){
if (strongMedia && this != strongMedia.get()) {
//不是自己,不允许反注册
return false;
}
it2->second.erase(it3);
eraseIfEmpty(it0,it1,it2);
eraseIfEmpty(g_mapMediaSrc, it0, it1, it2);
unregisted();
return true;
});
@ -192,6 +330,9 @@ void MediaSource::unregisted(){
*this);
}
/////////////////////////////////////MediaInfo//////////////////////////////////////
void MediaInfo::parse(const string &url){
//string url = "rtsp://127.0.0.1:8554/live/id?key=val&a=1&&b=2&vhost=vhost.com";
auto schema_pos = url.find("://");
@ -241,6 +382,8 @@ void MediaInfo::parse(const string &url){
}
}
/////////////////////////////////////MediaSourceEvent//////////////////////////////////////
void MediaSourceEvent::onNoneReader(MediaSource &sender){
//没有任何读取器消费该源,表明该源可以关闭了
WarnL << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId();

View File

@ -45,7 +45,7 @@ using namespace toolkit;
namespace toolkit{
class TcpSession;
}//namespace toolkit
}// namespace toolkit
namespace mediakit {
@ -54,17 +54,18 @@ class MediaSourceEvent{
public:
MediaSourceEvent(){};
virtual ~MediaSourceEvent(){};
public:
// 通知拖动进度条
virtual bool seekTo(MediaSource &sender,uint32_t ui32Stamp){
//拖动进度条
return false;
}
// 通知其停止推流
virtual bool close(MediaSource &sender,bool force) {
//通知其停止推流
return false;
}
// 通知无人观看
virtual void onNoneReader(MediaSource &sender);
};
@ -92,6 +93,9 @@ public:
string _param_strs;
};
/**
* rtsp/rtmp的直播流都源自该对象
*/
class MediaSource: public TrackSource, public enable_shared_from_this<MediaSource> {
public:
typedef std::shared_ptr<MediaSource> Ptr;
@ -100,152 +104,59 @@ public:
typedef unordered_map<string, AppStreamMap > VhostAppStreamMap;
typedef unordered_map<string, VhostAppStreamMap > SchemaVhostAppStreamMap;
MediaSource(const string &strSchema,
const string &strVhost,
const string &strApp,
const string &strId) :
_strSchema(strSchema),
_strApp(strApp),
_strId(strId) {
if(strVhost.empty()){
_strVhost = DEFAULT_VHOST;
}else{
_strVhost = strVhost;
}
}
virtual ~MediaSource() {
unregist();
}
MediaSource(const string &strSchema, const string &strVhost, const string &strApp, const string &strId) ;
virtual ~MediaSource() ;
static Ptr find(const string &schema,
const string &vhost,
const string &app,
const string &id,
bool bMake = true) ;
static void findAsync(const MediaInfo &info,
const std::shared_ptr<TcpSession> &session,
bool retry,
const function<void(const MediaSource::Ptr &src)> &cb);
const string& getSchema() const {
return _strSchema;
}
const string& getVhost() const {
return _strVhost;
}
const string& getApp() const {
//获取该源的id
return _strApp;
}
const string& getId() const {
return _strId;
}
bool seekTo(uint32_t ui32Stamp) {
auto listener = _listener.lock();
if(!listener){
return false;
}
return listener->seekTo(*this,ui32Stamp);
}
// 获取协议类型
const string& getSchema() const;
// 虚拟主机
const string& getVhost() const;
// 应用名
const string& getApp() const;
// 流id
const string& getId() const;
// 获取所有Track
vector<Track::Ptr> getTracks(bool trackReady = true) const override;
// 获取监听者
const std::weak_ptr<MediaSourceEvent>& getListener() const;
// 设置TrackSource
void setTrackSource(const std::weak_ptr<TrackSource> &track_src);
// 设置监听者
virtual void setListener(const std::weak_ptr<MediaSourceEvent> &listener);
// 获取观看者个数
virtual int readerCount() = 0;
// 获取流当前时间戳
virtual uint32_t getTimeStamp(TrackType trackType) = 0;
bool close(bool force) {
auto listener = _listener.lock();
if(!listener){
return false;
}
return listener->close(*this,force);
}
// 拖动进度条
bool seekTo(uint32_t ui32Stamp);
// 关闭该流
bool close(bool force);
// 该流无人观看
void onNoneReader();
void onNoneReader(){
auto listener = _listener.lock();
if(!listener){
return;
}
listener->onNoneReader(*this);
}
// 同步查找流
static Ptr find(const string &schema, const string &vhost, const string &app, const string &id, bool bMake = true) ;
// 异步查找流
static void findAsync(const MediaInfo &info, const std::shared_ptr<TcpSession> &session, const function<void(const Ptr &src)> &cb);
// 遍历所有流
static void for_each_media(const function<void(const Ptr &src)> &cb);
virtual void setListener(const std::weak_ptr<MediaSourceEvent> &listener){
_listener = listener;
}
std::weak_ptr<MediaSourceEvent> getListener(){
return _listener;
}
template <typename FUN>
static void for_each_media(FUN && fun){
lock_guard<recursive_mutex> lock(g_mtxMediaSrc);
for (auto &pr0 : g_mapMediaSrc){
for(auto &pr1 : pr0.second){
for(auto &pr2 : pr1.second){
for(auto &pr3 : pr2.second){
fun(pr0.first,pr1.first,pr2.first,pr3.first,pr3.second.lock());
}
}
}
}
}
virtual int readerCount() = 0;
protected:
void regist() ;
bool unregist() ;
private:
template <typename FUN>
static bool searchMedia(const string &schema,
const string &vhost,
const string &app,
const string &id,
FUN &&fun){
auto it0 = g_mapMediaSrc.find(schema);
if (it0 == g_mapMediaSrc.end()) {
//未找到协议
return false;
}
auto it1 = it0->second.find(vhost);
if(it1 == it0->second.end()){
//未找到vhost
return false;
}
auto it2 = it1->second.find(app);
if(it2 == it1->second.end()){
//未找到app
return false;
}
auto it3 = it2->second.find(id);
if(it3 == it2->second.end()){
//未找到streamId
return false;
}
return fun(it0,it1,it2,it3);
}
template <typename IT0,typename IT1,typename IT2>
static void eraseIfEmpty(IT0 it0,IT1 it1,IT2 it2){
if(it2->second.empty()){
it1->second.erase(it2);
if(it1->second.empty()){
it0->second.erase(it1);
if(it0->second.empty()){
g_mapMediaSrc.erase(it0);
}
}
}
};
void unregisted();
protected:
std::weak_ptr<MediaSourceEvent> _listener;
private:
string _strSchema;//协议类型
string _strVhost; //vhost
string _strApp; //媒体app
string _strId; //媒体id
static SchemaVhostAppStreamMap g_mapMediaSrc; //静态的媒体源表
static recursive_mutex g_mtxMediaSrc; //访问静态的媒体源表的互斥锁
string _strSchema;
string _strVhost;
string _strApp;
string _strId;
std::weak_ptr<MediaSourceEvent> _listener;
weak_ptr<TrackSource> _track_source;
static SchemaVhostAppStreamMap g_mapMediaSrc;
static recursive_mutex g_mtxMediaSrc;
};
} /* namespace mediakit */

View File

@ -31,7 +31,7 @@
#include "Rtmp/RtmpMediaSourceMuxer.h"
#include "MediaFile/MediaRecorder.h"
class MultiMediaSourceMuxer : public MediaSink{
class MultiMediaSourceMuxer : public MediaSink , public std::enable_shared_from_this<MultiMediaSourceMuxer>{
public:
typedef std::shared_ptr<MultiMediaSourceMuxer> Ptr;
@ -128,9 +128,11 @@ protected:
*/
void onAllTrackReady() override{
if(_rtmp) {
_rtmp->setTrackSource(shared_from_this());
_rtmp->onAllTrackReady();
}
if(_rtsp) {
_rtmp->setTrackSource(shared_from_this());
_rtsp->onAllTrackReady();
}
}

View File

@ -141,9 +141,7 @@ public:
* @param trackReady Track
* @return
*/
virtual vector<Track::Ptr> getTracks(bool trackReady = true) const {
return vector<Track::Ptr>();
}
virtual vector<Track::Ptr> getTracks(bool trackReady = true) const = 0;
/**
* Track

View File

@ -197,7 +197,7 @@ bool HttpSession::checkLiveFlvStream(const function<void()> &cb){
bool bClose = !strcasecmp(_parser["Connection"].data(),"close");
weak_ptr<HttpSession> weakSelf = dynamic_pointer_cast<HttpSession>(shared_from_this());
MediaSource::findAsync(_mediaInfo,weakSelf.lock(), true,[weakSelf,bClose,this,cb](const MediaSource::Ptr &src){
MediaSource::findAsync(_mediaInfo,weakSelf.lock(),[weakSelf,bClose,this,cb](const MediaSource::Ptr &src){
auto strongSelf = weakSelf.lock();
if(!strongSelf){
//本对象已经销毁

View File

@ -42,13 +42,13 @@ MediaPlayer::MediaPlayer(const EventPoller::Ptr &poller) {
MediaPlayer::~MediaPlayer() {
}
void MediaPlayer::play(const string &strUrl) {
_parser = PlayerBase::createPlayer(_poller,strUrl);
_parser->setOnShutdown(_shutdownCB);
_parser->setOnPlayResult(_playResultCB);
_parser->setOnResume(_resumeCB);
_parser->setMediaSouce(_pMediaSrc);
_parser->mINI::operator=(*this);
_parser->play(strUrl);
_delegate = PlayerBase::createPlayer(_poller,strUrl);
_delegate->setOnShutdown(_shutdownCB);
_delegate->setOnPlayResult(_playResultCB);
_delegate->setOnResume(_resumeCB);
_delegate->setMediaSouce(_pMediaSrc);
_delegate->mINI::operator=(*this);
_delegate->play(strUrl);
}
EventPoller::Ptr MediaPlayer::getPoller(){
@ -56,14 +56,14 @@ EventPoller::Ptr MediaPlayer::getPoller(){
}
void MediaPlayer::pause(bool bPause) {
if (_parser) {
_parser->pause(bPause);
if (_delegate) {
_delegate->pause(bPause);
}
}
void MediaPlayer::teardown() {
if (_parser) {
_parser->teardown();
if (_delegate) {
_delegate->teardown();
}
}

View File

@ -127,6 +127,13 @@ public:
* @return
*/
virtual float getPacketLossRate(TrackType trackType) const {return 0; }
/**
* track
*/
vector<Track::Ptr> getTracks(bool trackReady = true) const override{
return vector<Track::Ptr>();
}
protected:
virtual void onShutdown(const SockException &ex) {}
virtual void onPlayResult(const SockException &ex) {}
@ -136,9 +143,8 @@ protected:
virtual void onResume(){};
};
template<typename Parent,typename Parser>
class PlayerImp : public Parent
{
template<typename Parent,typename Delegate>
class PlayerImp : public Parent {
public:
typedef std::shared_ptr<PlayerImp> Ptr;
@ -147,62 +153,62 @@ public:
virtual ~PlayerImp(){}
void setOnShutdown(const function<void(const SockException &)> &cb) override {
if (_parser) {
_parser->setOnShutdown(cb);
if (_delegate) {
_delegate->setOnShutdown(cb);
}
_shutdownCB = cb;
}
void setOnPlayResult(const function<void(const SockException &ex)> &cb) override {
if (_parser) {
_parser->setOnPlayResult(cb);
if (_delegate) {
_delegate->setOnPlayResult(cb);
}
_playResultCB = cb;
}
void setOnResume(const function<void()> &cb) override {
if (_parser) {
_parser->setOnResume(cb);
if (_delegate) {
_delegate->setOnResume(cb);
}
_resumeCB = cb;
}
bool isInited(int analysisMs) override{
if (_parser) {
return _parser->isInited(analysisMs);
if (_delegate) {
return _delegate->isInited(analysisMs);
}
return PlayerBase::isInited(analysisMs);
return Parent::isInited(analysisMs);
}
float getDuration() const override {
if (_parser) {
return _parser->getDuration();
if (_delegate) {
return _delegate->getDuration();
}
return PlayerBase::getDuration();
return Parent::getDuration();
}
float getProgress() const override{
if (_parser) {
return _parser->getProgress();
if (_delegate) {
return _delegate->getProgress();
}
return PlayerBase::getProgress();
return Parent::getProgress();
}
void seekTo(float fProgress) override{
if (_parser) {
return _parser->seekTo(fProgress);
if (_delegate) {
return _delegate->seekTo(fProgress);
}
return PlayerBase::seekTo(fProgress);
return Parent::seekTo(fProgress);
}
void setMediaSouce(const MediaSource::Ptr & src) override {
if (_parser) {
_parser->setMediaSouce(src);
if (_delegate) {
_delegate->setMediaSouce(src);
}
_pMediaSrc = src;
}
vector<Track::Ptr> getTracks(bool trackReady = true) const override{
if (_parser) {
return _parser->getTracks(trackReady);
if (_delegate) {
return _delegate->getTracks(trackReady);
}
return PlayerBase::getTracks(trackReady);
return Parent::getTracks(trackReady);
}
protected:
void onShutdown(const SockException &ex) override {
@ -228,7 +234,7 @@ protected:
function<void(const SockException &ex)> _shutdownCB;
function<void(const SockException &ex)> _playResultCB;
function<void()> _resumeCB;
std::shared_ptr<Parser> _parser;
std::shared_ptr<Delegate> _delegate;
MediaSource::Ptr _pMediaSrc;
};

View File

@ -138,13 +138,13 @@ void PlayerProxy::play(const string &strUrlTmp) {
MediaPlayer::play(strUrlTmp);
MediaSource::Ptr mediaSource;
if(dynamic_pointer_cast<RtspPlayer>(_parser)){
if(dynamic_pointer_cast<RtspPlayer>(_delegate)){
//rtsp拉流
GET_CONFIG(bool,directProxy,Rtsp::kDirectProxy);
if(directProxy && _bEnableRtsp){
mediaSource = std::make_shared<RtspMediaSource>(_strVhost,_strApp,_strSrc);
}
} else if(dynamic_pointer_cast<RtmpPlayer>(_parser)){
} else if(dynamic_pointer_cast<RtmpPlayer>(_delegate)){
//rtmp拉流
if(_bEnableRtmp){
mediaSource = std::make_shared<RtmpMediaSource>(_strVhost,_strApp,_strSrc);

View File

@ -56,6 +56,11 @@ public:
void onAllTrackReady(){
_mediaSouce->onGetMetaData(getMetadata());
}
// 设置TrackSource
void setTrackSource(const std::weak_ptr<TrackSource> &track_src){
_mediaSouce->setTrackSource(track_src);
}
private:
RtmpMediaSource::Ptr _mediaSouce;
};

View File

@ -67,18 +67,18 @@ private:
if(_pRtmpMediaSrc){
_pRtmpMediaSrc->onGetMetaData(val);
}
_parser.reset(new RtmpDemuxer(val));
_delegate.reset(new RtmpDemuxer(val));
return true;
}
void onMediaData(const RtmpPacket::Ptr &chunkData) override {
if(_pRtmpMediaSrc){
_pRtmpMediaSrc->onWrite(chunkData);
}
if(!_parser){
if(!_delegate){
//这个流没有metadata
_parser.reset(new RtmpDemuxer());
_delegate.reset(new RtmpDemuxer());
}
_parser->inputRtmp(chunkData);
_delegate->inputRtmp(chunkData);
}
private:
RtmpMediaSource::Ptr _pRtmpMediaSrc;

View File

@ -319,7 +319,7 @@ void RtmpSession::doPlayResponse(const string &err,const std::function<void(bool
//鉴权成功,查找媒体源并回复
weak_ptr<RtmpSession> weakSelf = dynamic_pointer_cast<RtmpSession>(shared_from_this());
MediaSource::findAsync(_mediaInfo,weakSelf.lock(), true,[weakSelf,cb](const MediaSource::Ptr &src){
MediaSource::findAsync(_mediaInfo,weakSelf.lock(),[weakSelf,cb](const MediaSource::Ptr &src){
auto rtmp_src = dynamic_pointer_cast<RtmpMediaSource>(src);
auto strongSelf = weakSelf.lock();
if(strongSelf){

View File

@ -52,7 +52,8 @@ public:
RtmpToRtspMediaSource(const string &vhost,
const string &app,
const string &id,
int ringSize = 0) : RtmpMediaSource(vhost, app, id,ringSize){
int ringSize = 0) :
RtmpMediaSource(vhost, app, id,ringSize){
}
virtual ~RtmpToRtspMediaSource(){}
@ -83,7 +84,7 @@ public:
_muxer->addTrack(track);
track->addDelegate(_muxer);
}
_muxer->setListener(_listener);
_muxer->setListener(getListener());
}
RtmpMediaSource::onWrite(pkt,key_pos);
}

View File

@ -60,6 +60,11 @@ public:
void onAllTrackReady(){
_mediaSouce->onGetSDP(getSdp());
}
// 设置TrackSource
void setTrackSource(const std::weak_ptr<TrackSource> &track_src){
_mediaSouce->setTrackSource(track_src);
}
private:
RtspMediaSource::Ptr _mediaSouce;
};

View File

@ -66,16 +66,16 @@ private:
if(_pRtspMediaSrc){
_pRtspMediaSrc->onGetSDP(sdp);
}
_parser.reset(new RtspDemuxer(sdp));
_delegate.reset(new RtspDemuxer(sdp));
return true;
}
void onRecvRTP(const RtpPacket::Ptr &rtp, const SdpTrack::Ptr &track) override {
if(_pRtspMediaSrc){
_pRtspMediaSrc->onWrite(rtp,true);
}
_parser->inputRtp(rtp);
_delegate->inputRtp(rtp);
if(_maxAnalysisMS && _parser->isInited(_maxAnalysisMS)){
if(_maxAnalysisMS && _delegate->isInited(_maxAnalysisMS)){
PlayerImp<RtspPlayer,RtspDemuxer>::onPlayResult(SockException(Err_success,"play rtsp success"));
_maxAnalysisMS = 0;
}
@ -87,7 +87,7 @@ private:
//如果超过这个时间还未获取成功那么会强制触发onPlayResult事件(虽然此时有些track还未初始化成功)
void onPlayResult(const SockException &ex) override {
//isInited判断条件无超时
if(ex || _parser->isInited(0)){
if(ex || _delegate->isInited(0)){
//已经初始化成功说明sdp里面有完善的信息
PlayerImp<RtspPlayer,RtspDemuxer>::onPlayResult(ex);
}else{

View File

@ -371,7 +371,7 @@ void RtspSession::handleReq_Describe(const Parser &parser) {
void RtspSession::onAuthSuccess() {
TraceP(this);
weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
MediaSource::findAsync(_mediaInfo,weakSelf.lock(), true,[weakSelf](const MediaSource::Ptr &src){
MediaSource::findAsync(_mediaInfo,weakSelf.lock(),[weakSelf](const MediaSource::Ptr &src){
auto strongSelf = weakSelf.lock();
if(!strongSelf){
return;

View File

@ -43,7 +43,8 @@ public:
RtspToRtmpMediaSource(const string &vhost,
const string &app,
const string &id,
int ringSize = 0) : RtspMediaSource(vhost, app, id,ringSize) {
int ringSize = 0)
: RtspMediaSource(vhost, app, id,ringSize) {
}
virtual ~RtspToRtmpMediaSource() {}
@ -69,7 +70,7 @@ public:
_muxer->addTrack(track);
track->addDelegate(_muxer);
}
_muxer->setListener(_listener);
_muxer->setListener(getListener());
}
}
RtspMediaSource::onWrite(rtp, bKeyPos);

View File

@ -16,39 +16,35 @@ class CMD_media: public CMD {
public:
CMD_media(){
_parser.reset(new OptionParser([](const std::shared_ptr<ostream> &stream,mINI &ini){
MediaSource::for_each_media([&](const string &schema,
const string &vhost,
const string &app,
const string &streamid,
const MediaSource::Ptr &media){
if(!ini["schema"].empty() && ini["schema"] != schema){
MediaSource::for_each_media([&](const MediaSource::Ptr &media){
if(!ini["schema"].empty() && ini["schema"] != media->getSchema()){
//筛选协议不匹配
return;
}
if(!ini["vhost"].empty() && ini["vhost"] != vhost){
if(!ini["vhost"].empty() && ini["vhost"] != media->getVhost()){
//筛选虚拟主机不匹配
return;
}
if(!ini["app"].empty() && ini["app"] != app){
if(!ini["app"].empty() && ini["app"] != media->getApp()){
//筛选应用名不匹配
return;
}
if(!ini["stream"].empty() && ini["stream"] != streamid){
if(!ini["stream"].empty() && ini["stream"] != media->getId()){
//流id不匹配
return;
}
if(ini.find("list") != ini.end()){
//列出源
(*stream) << "\t"
<< schema << "/"
<< vhost << "/"
<< app << "/"
<< streamid
<< media->getSchema() << "/"
<< media->getVhost() << "/"
<< media->getApp() << "/"
<< media->getId()
<< "\r\n";
return;
}
EventPollerPool::Instance().getPoller()->async([ini,media,stream,schema,vhost,app,streamid](){
EventPollerPool::Instance().getPoller()->async([ini,media,stream](){
if(ini.find("kick") != ini.end()){
//踢出源
do{
@ -59,18 +55,18 @@ public:
break;
}
(*stream) << "\t踢出成功:"
<< schema << "/"
<< vhost << "/"
<< app << "/"
<< streamid
<< media->getSchema() << "/"
<< media->getVhost() << "/"
<< media->getApp() << "/"
<< media->getId()
<< "\r\n";
return;
}while(0);
(*stream) << "\t踢出失败:"
<< schema << "/"
<< vhost << "/"
<< app << "/"
<< streamid
<< media->getSchema() << "/"
<< media->getVhost() << "/"
<< media->getApp() << "/"
<< media->getId()
<< "\r\n";
}
},false);