整理rtsp服务器相关代码

This commit is contained in:
xiongziliang 2020-07-10 10:42:23 +08:00
parent bc5931dce9
commit b588053571
2 changed files with 335 additions and 358 deletions

View File

@ -56,12 +56,12 @@ static unordered_map<string, weak_ptr<RtspSession> > g_mapGetter;
//对g_mapGetter上锁保护 //对g_mapGetter上锁保护
static recursive_mutex g_mtxGetter; static recursive_mutex g_mtxGetter;
RtspSession::RtspSession(const Socket::Ptr &pSock) : TcpSession(pSock) { RtspSession::RtspSession(const Socket::Ptr &sock) : TcpSession(sock) {
DebugP(this); DebugP(this);
GET_CONFIG(uint32_t,keep_alive_sec,Rtsp::kKeepAliveSecond); GET_CONFIG(uint32_t,keep_alive_sec,Rtsp::kKeepAliveSecond);
pSock->setSendTimeOutSecond(keep_alive_sec); sock->setSendTimeOutSecond(keep_alive_sec);
//起始接收buffer缓存设置为4K节省内存 //起始接收buffer缓存设置为4K节省内存
pSock->setReadBuffer(std::make_shared<BufferRaw>(4 * 1024)); sock->setReadBuffer(std::make_shared<BufferRaw>(4 * 1024));
} }
RtspSession::~RtspSession() { RtspSession::~RtspSession() {
@ -69,16 +69,16 @@ RtspSession::~RtspSession() {
} }
void RtspSession::onError(const SockException &err) { void RtspSession::onError(const SockException &err) {
bool isPlayer = !_pushSrc; bool isPlayer = !_push_src;
uint64_t duration = _ticker.createdTime()/1000; uint64_t duration = _alive_ticker.createdTime() / 1000;
WarnP(this) << (isPlayer ? "RTSP播放器(" : "RTSP推流器(") WarnP(this) << (isPlayer ? "RTSP播放器(" : "RTSP推流器(")
<< _mediaInfo._vhost << "/" << _media_info._vhost << "/"
<< _mediaInfo._app << "/" << _media_info._app << "/"
<< _mediaInfo._streamid << _media_info._streamid
<< ")断开:" << err.what() << ")断开:" << err.what()
<< ",耗时(s):" << duration; << ",耗时(s):" << duration;
if (_rtpType == Rtsp::RTP_MULTICAST) { if (_rtp_type == Rtsp::RTP_MULTICAST) {
//取消UDP端口监听 //取消UDP端口监听
UDPServer::Instance().stopListenPeer(get_peer_ip().data(), this); UDPServer::Instance().stopListenPeer(get_peer_ip().data(), this);
} }
@ -91,8 +91,8 @@ void RtspSession::onError(const SockException& err) {
//流量统计事件广播 //流量统计事件广播
GET_CONFIG(uint32_t,iFlowThreshold,General::kFlowThreshold); GET_CONFIG(uint32_t,iFlowThreshold,General::kFlowThreshold);
if(_ui64TotalBytes > iFlowThreshold * 1024){ if(_bytes_usage > iFlowThreshold * 1024){
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _mediaInfo, _ui64TotalBytes, duration, isPlayer, static_cast<SockInfo &>(*this)); NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, _bytes_usage, duration, isPlayer, static_cast<SockInfo &>(*this));
} }
} }
@ -101,30 +101,28 @@ void RtspSession::onManager() {
GET_CONFIG(uint32_t,handshake_sec,Rtsp::kHandshakeSecond); GET_CONFIG(uint32_t,handshake_sec,Rtsp::kHandshakeSecond);
GET_CONFIG(uint32_t,keep_alive_sec,Rtsp::kKeepAliveSecond); GET_CONFIG(uint32_t,keep_alive_sec,Rtsp::kKeepAliveSecond);
if (_ticker.createdTime() > handshake_sec * 1000) { if (_alive_ticker.createdTime() > handshake_sec * 1000) {
if (_strSession.size() == 0) { if (_sessionid.size() == 0) {
shutdown(SockException(Err_timeout,"illegal connection")); shutdown(SockException(Err_timeout,"illegal connection"));
return; return;
} }
} }
if ((_rtp_type == Rtsp::RTP_UDP || _push_src ) && _alive_ticker.elapsedTime() > keep_alive_sec * 1000) {
if ((_rtpType == Rtsp::RTP_UDP || _pushSrc ) && _ticker.elapsedTime() > keep_alive_sec * 1000) {
//如果是推流端或者rtp over udp类型的播放端那么就做超时检测 //如果是推流端或者rtp over udp类型的播放端那么就做超时检测
shutdown(SockException(Err_timeout,"rtp over udp session timeouted")); shutdown(SockException(Err_timeout,"rtp over udp session timeouted"));
return; return;
} }
} }
void RtspSession::onRecv(const Buffer::Ptr &pBuf) { void RtspSession::onRecv(const Buffer::Ptr &buf) {
_ticker.resetTime(); _alive_ticker.resetTime();
_ui64TotalBytes += pBuf->size(); _bytes_usage += buf->size();
if (_onRecv) { if (_on_recv) {
//http poster的请求数据转发给http getter处理 //http poster的请求数据转发给http getter处理
_onRecv(pBuf); _on_recv(buf);
} else { } else {
// TraceP(this) << pBuf->size() << "\r\n" << pBuf->data(); input(buf->data(), buf->size());
input(pBuf->data(),pBuf->size());
} }
} }
@ -132,15 +130,15 @@ void RtspSession::onRecv(const Buffer::Ptr &pBuf) {
static inline bool end_of(const string &str, const string &substr){ static inline bool end_of(const string &str, const string &substr){
auto pos = str.rfind(substr); auto pos = str.rfind(substr);
return pos != string::npos && pos == str.size() - substr.size(); return pos != string::npos && pos == str.size() - substr.size();
}; }
void RtspSession::onWholeRtspPacket(Parser &parser) { void RtspSession::onWholeRtspPacket(Parser &parser) {
string strCmd = parser.Method(); //提取出请求命令字 string method = parser.Method(); //提取出请求命令字
_iCseq = atoi(parser["CSeq"].data()); _cseq = atoi(parser["CSeq"].data());
if(_strContentBase.empty() && strCmd != "GET"){ if(_content_base.empty() && method != "GET"){
_strContentBase = parser.Url(); _content_base = parser.Url();
_mediaInfo.parse(parser.FullUrl()); _media_info.parse(parser.FullUrl());
_mediaInfo._schema = RTSP_SCHEMA; _media_info._schema = RTSP_SCHEMA;
} }
typedef void (RtspSession::*rtsp_request_handler)(const Parser &parser); typedef void (RtspSession::*rtsp_request_handler)(const Parser &parser);
@ -160,10 +158,10 @@ void RtspSession::onWholeRtspPacket(Parser &parser) {
s_cmd_functions.emplace("GET_PARAMETER",&RtspSession::handleReq_SET_PARAMETER); s_cmd_functions.emplace("GET_PARAMETER",&RtspSession::handleReq_SET_PARAMETER);
}, []() {}); }, []() {});
auto it = s_cmd_functions.find(strCmd); auto it = s_cmd_functions.find(method);
if (it == s_cmd_functions.end()) { if (it == s_cmd_functions.end()) {
sendRtspResponse("403 Forbidden"); sendRtspResponse("403 Forbidden");
shutdown(SockException(Err_shutdown,StrPrinter << "403 Forbidden:" << strCmd)); shutdown(SockException(Err_shutdown,StrPrinter << "403 Forbidden:" << method));
return; return;
} }
@ -181,24 +179,22 @@ void RtspSession::onWholeRtspPacket(Parser &parser) {
} }
void RtspSession::onRtpPacket(const char *data, uint64_t len) { void RtspSession::onRtpPacket(const char *data, uint64_t len) {
if(!_pushSrc){ if(!_push_src){
return; return;
} }
int trackIdx = -1;
uint8_t interleaved = data[1]; uint8_t interleaved = data[1];
if(interleaved %2 == 0){ if(interleaved %2 == 0){
trackIdx = getTrackIndexByInterleaved(interleaved); auto track_idx = getTrackIndexByInterleaved(interleaved);
handleOneRtp(trackIdx, _aTrackInfo[trackIdx]->_type, _aTrackInfo[trackIdx]->_samplerate, (unsigned char *) data + 4, len - 4); handleOneRtp(track_idx, _sdp_track[track_idx]->_type, _sdp_track[track_idx]->_samplerate, (unsigned char *) data + 4, len - 4);
}else{ }else{
trackIdx = getTrackIndexByInterleaved(interleaved - 1); auto track_idx = getTrackIndexByInterleaved(interleaved - 1);
onRtcpPacket(trackIdx, _aTrackInfo[trackIdx], (unsigned char *) data + 4, len - 4); onRtcpPacket(track_idx, _sdp_track[track_idx], (unsigned char *) data + 4, len - 4);
} }
} }
void RtspSession::onRtcpPacket(int iTrackidx, SdpTrack::Ptr &track, unsigned char *pucData, unsigned int uiLen){ void RtspSession::onRtcpPacket(int track_idx, SdpTrack::Ptr &track, unsigned char *data, unsigned int len){}
}
int64_t RtspSession::getContentLength(Parser &parser) { int64_t RtspSession::getContentLength(Parser &parser) {
if(parser.Method() == "POST"){ if(parser.Method() == "POST"){
//http post请求的content数据部分是base64编码后的rtsp请求信令包 //http post请求的content数据部分是base64编码后的rtsp请求信令包
@ -207,7 +203,6 @@ int64_t RtspSession::getContentLength(Parser &parser) {
return RtspSplitter::getContentLength(parser); return RtspSplitter::getContentLength(parser);
} }
void RtspSession::handleReq_Options(const Parser &parser) { void RtspSession::handleReq_Options(const Parser &parser) {
//支持这些命令 //支持这些命令
sendRtspResponse("200 OK",{"Public" , "OPTIONS, DESCRIBE, SETUP, TEARDOWN, PLAY, PAUSE, ANNOUNCE, RECORD, SET_PARAMETER, GET_PARAMETER"}); sendRtspResponse("200 OK",{"Public" , "OPTIONS, DESCRIBE, SETUP, TEARDOWN, PLAY, PAUSE, ANNOUNCE, RECORD, SET_PARAMETER, GET_PARAMETER"});
@ -215,16 +210,16 @@ void RtspSession::handleReq_Options(const Parser &parser) {
void RtspSession::handleReq_ANNOUNCE(const Parser &parser) { void RtspSession::handleReq_ANNOUNCE(const Parser &parser) {
auto src = dynamic_pointer_cast<RtspMediaSource>(MediaSource::find(RTSP_SCHEMA, auto src = dynamic_pointer_cast<RtspMediaSource>(MediaSource::find(RTSP_SCHEMA,
_mediaInfo._vhost, _media_info._vhost,
_mediaInfo._app, _media_info._app,
_mediaInfo._streamid)); _media_info._streamid));
if(src){ if(src){
sendRtspResponse("406 Not Acceptable", {"Content-Type", "text/plain"}, "Already publishing."); sendRtspResponse("406 Not Acceptable", {"Content-Type", "text/plain"}, "Already publishing.");
string err = StrPrinter << "ANNOUNCE:" string err = StrPrinter << "ANNOUNCE:"
<< "Already publishing:" << "Already publishing:"
<< _mediaInfo._vhost << " " << _media_info._vhost << " "
<< _mediaInfo._app << " " << _media_info._app << " "
<< _mediaInfo._streamid << endl; << _media_info._streamid << endl;
throw SockException(Err_shutdown,err); throw SockException(Err_shutdown,err);
} }
@ -232,30 +227,30 @@ void RtspSession::handleReq_ANNOUNCE(const Parser &parser) {
if(end_of(full_url,".sdp")){ if(end_of(full_url,".sdp")){
//去除.sdp后缀防止EasyDarwin推流器强制添加.sdp后缀 //去除.sdp后缀防止EasyDarwin推流器强制添加.sdp后缀
full_url = full_url.substr(0,full_url.length() - 4); full_url = full_url.substr(0,full_url.length() - 4);
_mediaInfo.parse(full_url); _media_info.parse(full_url);
} }
if(_mediaInfo._app.empty() || _mediaInfo._streamid.empty()){ if(_media_info._app.empty() || _media_info._streamid.empty()){
//推流rtsp url必须最少两级(rtsp://host/app/stream_id)不允许莫名其妙的推流url //推流rtsp url必须最少两级(rtsp://host/app/stream_id)不允许莫名其妙的推流url
sendRtspResponse("403 Forbidden", {"Content-Type", "text/plain"}, "rtsp推流url非法,最少确保两级rtsp url"); sendRtspResponse("403 Forbidden", {"Content-Type", "text/plain"}, "rtsp推流url非法,最少确保两级rtsp url");
throw SockException(Err_shutdown,StrPrinter << "rtsp推流url非法:" << full_url); throw SockException(Err_shutdown,StrPrinter << "rtsp推流url非法:" << full_url);
} }
SdpParser sdpParser(parser.Content()); SdpParser sdpParser(parser.Content());
_strSession = makeRandStr(12); _sessionid = makeRandStr(12);
_aTrackInfo = sdpParser.getAvailableTrack(); _sdp_track = sdpParser.getAvailableTrack();
_pushSrc = std::make_shared<RtspMediaSourceImp>(_mediaInfo._vhost,_mediaInfo._app,_mediaInfo._streamid); _push_src = std::make_shared<RtspMediaSourceImp>(_media_info._vhost, _media_info._app, _media_info._streamid);
_pushSrc->setListener(dynamic_pointer_cast<MediaSourceEvent>(shared_from_this())); _push_src->setListener(dynamic_pointer_cast<MediaSourceEvent>(shared_from_this()));
_pushSrc->setSdp(sdpParser.toString()); _push_src->setSdp(sdpParser.toString());
sendRtspResponse("200 OK",{"Content-Base",_strContentBase + "/"}); sendRtspResponse("200 OK",{"Content-Base", _content_base + "/"});
} }
void RtspSession::handleReq_RECORD(const Parser &parser){ void RtspSession::handleReq_RECORD(const Parser &parser){
if (_aTrackInfo.empty() || parser["Session"] != _strSession) { if (_sdp_track.empty() || parser["Session"] != _sessionid) {
send_SessionNotFound(); send_SessionNotFound();
throw SockException(Err_shutdown,_aTrackInfo.empty() ? "can not find any availabe track when record" : "session not found when record"); throw SockException(Err_shutdown, _sdp_track.empty() ? "can not find any availabe track when record" : "session not found when record");
} }
auto onRes = [this](const string &err,bool enableRtxp,bool enableHls,bool enableMP4){ auto onRes = [this](const string &err,bool enableRtxp,bool enableHls,bool enableMP4){
bool authSuccess = err.empty(); bool authSuccess = err.empty();
@ -266,21 +261,21 @@ void RtspSession::handleReq_RECORD(const Parser &parser){
} }
//设置转协议 //设置转协议
_pushSrc->setProtocolTranslation(enableRtxp,enableHls,enableMP4); _push_src->setProtocolTranslation(enableRtxp, enableHls, enableMP4);
_StrPrinter rtp_info; _StrPrinter rtp_info;
for(auto &track : _aTrackInfo){ for(auto &track : _sdp_track){
if (track->_inited == false) { if (track->_inited == false) {
//还有track没有setup //还有track没有setup
shutdown(SockException(Err_shutdown,"track not setuped")); shutdown(SockException(Err_shutdown,"track not setuped"));
return; return;
} }
rtp_info << "url=" << _strContentBase << "/" << track->_control_surffix << ","; rtp_info << "url=" << _content_base << "/" << track->_control_surffix << ",";
} }
rtp_info.pop_back(); rtp_info.pop_back();
sendRtspResponse("200 OK", {"RTP-Info",rtp_info}); sendRtspResponse("200 OK", {"RTP-Info",rtp_info});
if(_rtpType == Rtsp::RTP_TCP){ if(_rtp_type == Rtsp::RTP_TCP){
//如果是rtsp推流服务器并且是TCP推流那么加大TCP接收缓存这样能提升接收性能 //如果是rtsp推流服务器并且是TCP推流那么加大TCP接收缓存这样能提升接收性能
_sock->setReadBuffer(std::make_shared<BufferRaw>(256 * 1024)); _sock->setReadBuffer(std::make_shared<BufferRaw>(256 * 1024));
setSocketFlags(); setSocketFlags();
@ -303,7 +298,7 @@ void RtspSession::handleReq_RECORD(const Parser &parser){
}; };
//rtsp推流需要鉴权 //rtsp推流需要鉴权
auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPublish,_mediaInfo,invoker,static_cast<SockInfo &>(*this)); auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPublish, _media_info, invoker, static_cast<SockInfo &>(*this));
if(!flag){ if(!flag){
//该事件无人监听,默认不鉴权 //该事件无人监听,默认不鉴权
GET_CONFIG(bool,toRtxp,General::kPublishToRtxp); GET_CONFIG(bool,toRtxp,General::kPublishToRtxp);
@ -341,7 +336,7 @@ void RtspSession::emitOnPlay(){
}; };
//广播通用播放url鉴权事件 //广播通用播放url鉴权事件
auto flag = _emit_on_play ? false : NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed, _mediaInfo, invoker, static_cast<SockInfo &>(*this)); auto flag = _emit_on_play ? false : NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed, _media_info, invoker, static_cast<SockInfo &>(*this));
if (!flag) { if (!flag) {
//该事件无人监听,默认不鉴权 //该事件无人监听,默认不鉴权
onRes(""); onRes("");
@ -381,7 +376,7 @@ void RtspSession::handleReq_Describe(const Parser &parser) {
if(_rtsp_realm.empty()){ if(_rtsp_realm.empty()){
//广播是否需要rtsp专属认证事件 //广播是否需要rtsp专属认证事件
if (!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnGetRtspRealm, _mediaInfo, invoker, static_cast<SockInfo &>(*this))) { if (!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnGetRtspRealm, _media_info, invoker, static_cast<SockInfo &>(*this))) {
//无人监听此事件,说明无需认证 //无人监听此事件,说明无需认证
invoker(""); invoker("");
} }
@ -389,10 +384,11 @@ void RtspSession::handleReq_Describe(const Parser &parser) {
invoker(_rtsp_realm); invoker(_rtsp_realm);
} }
} }
void RtspSession::onAuthSuccess() { void RtspSession::onAuthSuccess() {
TraceP(this); TraceP(this);
weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this()); weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
MediaSource::findAsync(_mediaInfo,weakSelf.lock(),[weakSelf](const MediaSource::Ptr &src){ MediaSource::findAsync(_media_info, weakSelf.lock(), [weakSelf](const MediaSource::Ptr &src){
auto strongSelf = weakSelf.lock(); auto strongSelf = weakSelf.lock();
if(!strongSelf){ if(!strongSelf){
return; return;
@ -400,43 +396,44 @@ void RtspSession::onAuthSuccess() {
auto rtsp_src = dynamic_pointer_cast<RtspMediaSource>(src); auto rtsp_src = dynamic_pointer_cast<RtspMediaSource>(src);
if (!rtsp_src) { if (!rtsp_src) {
//未找到相应的MediaSource //未找到相应的MediaSource
string err = StrPrinter << "no such stream:" << strongSelf->_mediaInfo._vhost << " " << strongSelf->_mediaInfo._app << " " << strongSelf->_mediaInfo._streamid; string err = StrPrinter << "no such stream:" << strongSelf->_media_info._vhost << " " << strongSelf->_media_info._app << " " << strongSelf->_media_info._streamid;
strongSelf->send_StreamNotFound(); strongSelf->send_StreamNotFound();
strongSelf->shutdown(SockException(Err_shutdown,err)); strongSelf->shutdown(SockException(Err_shutdown,err));
return; return;
} }
//找到了相应的rtsp流 //找到了相应的rtsp流
strongSelf->_aTrackInfo = SdpParser(rtsp_src->getSdp()).getAvailableTrack(); strongSelf->_sdp_track = SdpParser(rtsp_src->getSdp()).getAvailableTrack();
if (strongSelf->_aTrackInfo.empty()) { if (strongSelf->_sdp_track.empty()) {
//该流无效 //该流无效
DebugL << "无trackInfo该流无效"; DebugL << "无trackInfo该流无效";
strongSelf->send_StreamNotFound(); strongSelf->send_StreamNotFound();
strongSelf->shutdown(SockException(Err_shutdown,"can not find any available track in sdp")); strongSelf->shutdown(SockException(Err_shutdown,"can not find any available track in sdp"));
return; return;
} }
strongSelf->_strSession = makeRandStr(12); strongSelf->_sessionid = makeRandStr(12);
strongSelf->_pMediaSrc = rtsp_src; strongSelf->_play_src = rtsp_src;
for(auto &track : strongSelf->_aTrackInfo){ for(auto &track : strongSelf->_sdp_track){
track->_ssrc = rtsp_src->getSsrc(track->_type); track->_ssrc = rtsp_src->getSsrc(track->_type);
track->_seq = rtsp_src->getSeqence(track->_type); track->_seq = rtsp_src->getSeqence(track->_type);
track->_time_stamp = rtsp_src->getTimeStamp(track->_type); track->_time_stamp = rtsp_src->getTimeStamp(track->_type);
} }
strongSelf->sendRtspResponse("200 OK", strongSelf->sendRtspResponse("200 OK",
{"Content-Base",strongSelf->_strContentBase + "/", {"Content-Base", strongSelf->_content_base + "/",
"x-Accept-Retransmit","our-retransmit", "x-Accept-Retransmit","our-retransmit",
"x-Accept-Dynamic-Rate","1" "x-Accept-Dynamic-Rate","1"
},rtsp_src->getSdp()); },rtsp_src->getSdp());
}); });
} }
void RtspSession::onAuthFailed(const string &realm,const string &why,bool close) { void RtspSession::onAuthFailed(const string &realm,const string &why,bool close) {
GET_CONFIG(bool,authBasic,Rtsp::kAuthBasic); GET_CONFIG(bool,authBasic,Rtsp::kAuthBasic);
if (!authBasic) { if (!authBasic) {
//我们需要客户端优先以md5方式认证 //我们需要客户端优先以md5方式认证
_strNonce = makeRandStr(32); _auth_nonce = makeRandStr(32);
sendRtspResponse("401 Unauthorized", sendRtspResponse("401 Unauthorized",
{"WWW-Authenticate", {"WWW-Authenticate",
StrPrinter << "Digest realm=\"" << realm << "\",nonce=\"" << _strNonce << "\"" }); StrPrinter << "Digest realm=\"" << realm << "\",nonce=\"" << _auth_nonce << "\"" });
}else { }else {
//当然我们也支持base64认证,但是我们不建议这样做 //当然我们也支持base64认证,但是我们不建议这样做
sendRtspResponse("401 Unauthorized", sendRtspResponse("401 Unauthorized",
@ -448,10 +445,10 @@ void RtspSession::onAuthFailed(const string &realm,const string &why,bool close)
} }
} }
void RtspSession::onAuthBasic(const string &realm,const string &strBase64){ void RtspSession::onAuthBasic(const string &realm,const string &auth_base64){
//base64认证 //base64认证
char user_pwd_buf[512]; char user_pwd_buf[512];
av_base64_decode((uint8_t *)user_pwd_buf,strBase64.data(),strBase64.size()); av_base64_decode((uint8_t *) user_pwd_buf, auth_base64.data(), auth_base64.size());
auto user_pwd_vec = split(user_pwd_buf, ":"); auto user_pwd_vec = split(user_pwd_buf, ":");
if (user_pwd_vec.size() < 2) { if (user_pwd_vec.size() < 2) {
//认证信息格式不合法回复401 Unauthorized //认证信息格式不合法回复401 Unauthorized
@ -486,7 +483,7 @@ void RtspSession::onAuthBasic(const string &realm,const string &strBase64){
}; };
//此时必须提供明文密码 //此时必须提供明文密码
if(!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnRtspAuth,_mediaInfo,realm,user, true,invoker,static_cast<SockInfo &>(*this))){ if (!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnRtspAuth, _media_info, realm, user, true, invoker, static_cast<SockInfo &>(*this))) {
//表明该流需要认证却没监听请求密码事件,这一般是大意的程序所为,警告之 //表明该流需要认证却没监听请求密码事件,这一般是大意的程序所为,警告之
WarnP(this) << "请监听kBroadcastOnRtspAuth事件"; WarnP(this) << "请监听kBroadcastOnRtspAuth事件";
//但是我们还是忽略认证以便完成播放 //但是我们还是忽略认证以便完成播放
@ -495,9 +492,9 @@ void RtspSession::onAuthBasic(const string &realm,const string &strBase64){
} }
} }
void RtspSession::onAuthDigest(const string &realm,const string &strMd5){ void RtspSession::onAuthDigest(const string &realm,const string &auth_md5){
DebugP(this) << strMd5; DebugP(this) << auth_md5;
auto mapTmp = Parser::parseArgs(strMd5,",","="); auto mapTmp = Parser::parseArgs(auth_md5, ",", "=");
decltype(mapTmp) map; decltype(mapTmp) map;
for(auto &pr : mapTmp){ for(auto &pr : mapTmp){
map[trim(string(pr.first)," \"")] = trim(pr.second," \""); map[trim(string(pr.first)," \"")] = trim(pr.second," \"");
@ -509,8 +506,8 @@ void RtspSession::onAuthDigest(const string &realm,const string &strMd5){
} }
//check nonce //check nonce
auto nonce = map["nonce"]; auto nonce = map["nonce"];
if(_strNonce != nonce){ if(_auth_nonce != nonce){
onAuthFailed(realm,StrPrinter << "nonce not mached:" << nonce << " != " << _strNonce); onAuthFailed(realm,StrPrinter << "nonce not mached:" << nonce << " != " << _auth_nonce);
return ; return ;
} }
//check username and uri //check username and uri
@ -570,7 +567,7 @@ void RtspSession::onAuthDigest(const string &realm,const string &strMd5){
}; };
//此时可以提供明文或md5加密的密码 //此时可以提供明文或md5加密的密码
if(!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnRtspAuth,_mediaInfo,realm,username, false,invoker,static_cast<SockInfo &>(*this))){ if(!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnRtspAuth, _media_info, realm, username, false, invoker, static_cast<SockInfo &>(*this))){
//表明该流需要认证却没监听请求密码事件,这一般是大意的程序所为,警告之 //表明该流需要认证却没监听请求密码事件,这一般是大意的程序所为,警告之
WarnP(this) << "请监听kBroadcastOnRtspAuth事件"; WarnP(this) << "请监听kBroadcastOnRtspAuth事件";
//但是我们还是忽略认证以便完成播放 //但是我们还是忽略认证以便完成播放
@ -602,9 +599,11 @@ void RtspSession::onAuthUser(const string &realm,const string &authorization){
onAuthFailed(realm,StrPrinter << "unsupported auth type:" << authType); onAuthFailed(realm,StrPrinter << "unsupported auth type:" << authType);
} }
} }
inline void RtspSession::send_StreamNotFound() { inline void RtspSession::send_StreamNotFound() {
sendRtspResponse("404 Stream Not Found",{"Connection","Close"}); sendRtspResponse("404 Stream Not Found",{"Connection","Close"});
} }
inline void RtspSession::send_UnsupportedTransport() { inline void RtspSession::send_UnsupportedTransport() {
sendRtspResponse("461 Unsupported Transport",{"Connection","Close"}); sendRtspResponse("461 Unsupported Transport",{"Connection","Close"});
} }
@ -615,35 +614,35 @@ inline void RtspSession::send_SessionNotFound() {
void RtspSession::handleReq_Setup(const Parser &parser) { void RtspSession::handleReq_Setup(const Parser &parser) {
//处理setup命令该函数可能进入多次 //处理setup命令该函数可能进入多次
auto controlSuffix = split(parser.FullUrl(),"/").back();// parser.FullUrl().substr(_strContentBase.size()); auto controlSuffix = split(parser.FullUrl(),"/").back();
if(controlSuffix.front() == '/'){ if(controlSuffix.front() == '/'){
controlSuffix = controlSuffix.substr(1); controlSuffix = controlSuffix.substr(1);
} }
int trackIdx = getTrackIndexByControlSuffix(controlSuffix); int trackIdx = getTrackIndexByControlSuffix(controlSuffix);
SdpTrack::Ptr &trackRef = _aTrackInfo[trackIdx]; SdpTrack::Ptr &trackRef = _sdp_track[trackIdx];
if (trackRef->_inited) { if (trackRef->_inited) {
//已经初始化过该Track //已经初始化过该Track
throw SockException(Err_shutdown, "can not setup one track twice"); throw SockException(Err_shutdown, "can not setup one track twice");
} }
trackRef->_inited = true; //现在初始化 trackRef->_inited = true; //现在初始化
if(_rtpType == Rtsp::RTP_Invalid){ if(_rtp_type == Rtsp::RTP_Invalid){
auto &strTransport = parser["Transport"]; auto &strTransport = parser["Transport"];
if(strTransport.find("TCP") != string::npos){ if(strTransport.find("TCP") != string::npos){
_rtpType = Rtsp::RTP_TCP; _rtp_type = Rtsp::RTP_TCP;
}else if(strTransport.find("multicast") != string::npos){ }else if(strTransport.find("multicast") != string::npos){
_rtpType = Rtsp::RTP_MULTICAST; _rtp_type = Rtsp::RTP_MULTICAST;
}else{ }else{
_rtpType = Rtsp::RTP_UDP; _rtp_type = Rtsp::RTP_UDP;
} }
} }
//允许接收rtp、rtcp包 //允许接收rtp、rtcp包
RtspSplitter::enableRecvRtp(_rtpType == Rtsp::RTP_TCP); RtspSplitter::enableRecvRtp(_rtp_type == Rtsp::RTP_TCP);
switch (_rtpType) { switch (_rtp_type) {
case Rtsp::RTP_TCP: { case Rtsp::RTP_TCP: {
if(_pushSrc){ if(_push_src){
//rtsp推流时interleaved由推流者决定 //rtsp推流时interleaved由推流者决定
auto key_values = Parser::parseArgs(parser["Transport"],";","="); auto key_values = Parser::parseArgs(parser["Transport"],";","=");
int interleaved_rtp = -1 , interleaved_rtcp = -1; int interleaved_rtp = -1 , interleaved_rtcp = -1;
@ -658,13 +657,15 @@ void RtspSession::handleReq_Setup(const Parser &parser) {
} }
sendRtspResponse("200 OK", sendRtspResponse("200 OK",
{"Transport", StrPrinter << "RTP/AVP/TCP;unicast;" {"Transport", StrPrinter << "RTP/AVP/TCP;unicast;"
<< "interleaved=" << (int)trackRef->_interleaved << "-" << (int)trackRef->_interleaved + 1 << ";" << "interleaved=" << (int) trackRef->_interleaved << "-"
<< (int) trackRef->_interleaved + 1 << ";"
<< "ssrc=" << printSSRC(trackRef->_ssrc), << "ssrc=" << printSSRC(trackRef->_ssrc),
"x-Transport-Options", "late-tolerance=1.400000", "x-Transport-Options", "late-tolerance=1.400000",
"x-Dynamic-Rate", "1" "x-Dynamic-Rate", "1"
}); });
} }
break; break;
case Rtsp::RTP_UDP: { case Rtsp::RTP_UDP: {
std::pair<Socket::Ptr, Socket::Ptr> pr; std::pair<Socket::Ptr, Socket::Ptr> pr;
try{ try{
@ -675,8 +676,8 @@ void RtspSession::handleReq_Setup(const Parser &parser) {
throw SockException(Err_shutdown, ex.what()); throw SockException(Err_shutdown, ex.what());
} }
_apRtpSock[trackIdx] = pr.first; _rtp_socks[trackIdx] = pr.first;
_apRtcpSock[trackIdx] = pr.second; _rtcp_socks[trackIdx] = pr.second;
//设置客户端内网端口信息 //设置客户端内网端口信息
string strClientPort = FindField(parser["Transport"].data(), "client_port=", NULL); string strClientPort = FindField(parser["Transport"].data(), "client_port=", NULL);
@ -705,14 +706,15 @@ void RtspSession::handleReq_Setup(const Parser &parser) {
sendRtspResponse("200 OK", sendRtspResponse("200 OK",
{"Transport", StrPrinter << "RTP/AVP/UDP;unicast;" {"Transport", StrPrinter << "RTP/AVP/UDP;unicast;"
<< "client_port=" << strClientPort << ";" << "client_port=" << strClientPort << ";"
<< "server_port=" << pr.first->get_local_port() << "-" << pr.second->get_local_port() << ";" << "server_port=" << pr.first->get_local_port() << "-"
<< pr.second->get_local_port() << ";"
<< "ssrc=" << printSSRC(trackRef->_ssrc) << "ssrc=" << printSSRC(trackRef->_ssrc)
}); });
} }
break; break;
case Rtsp::RTP_MULTICAST: { case Rtsp::RTP_MULTICAST: {
if(!_multicaster){ if(!_multicaster){
_multicaster = RtpMultiCaster::get(getPoller(),get_local_ip(),_mediaInfo._vhost, _mediaInfo._app, _mediaInfo._streamid); _multicaster = RtpMultiCaster::get(getPoller(), get_local_ip(), _media_info._vhost, _media_info._app, _media_info._streamid);
if (!_multicaster) { if (!_multicaster) {
send_NotAcceptable(); send_NotAcceptable();
throw SockException(Err_shutdown, "can not get a available udp multicast socket"); throw SockException(Err_shutdown, "can not get a available udp multicast socket");
@ -754,19 +756,19 @@ void RtspSession::handleReq_Setup(const Parser &parser) {
} }
void RtspSession::handleReq_Play(const Parser &parser) { void RtspSession::handleReq_Play(const Parser &parser) {
if (_aTrackInfo.empty() || parser["Session"] != _strSession) { if (_sdp_track.empty() || parser["Session"] != _sessionid) {
send_SessionNotFound(); send_SessionNotFound();
throw SockException(Err_shutdown,_aTrackInfo.empty() ? "can not find any available track when play" : "session not found when play"); throw SockException(Err_shutdown, _sdp_track.empty() ? "can not find any available track when play" : "session not found when play");
} }
auto pMediaSrc = _pMediaSrc.lock(); auto play_src = _play_src.lock();
if(!pMediaSrc){ if(!play_src){
send_StreamNotFound(); send_StreamNotFound();
shutdown(SockException(Err_shutdown,"rtsp stream released")); shutdown(SockException(Err_shutdown,"rtsp stream released"));
return; return;
} }
bool useBuf = true; bool useGOP = true;
_enableSendRtp = false; _enable_send_rtp = false;
float iStartTime = 0; float iStartTime = 0;
auto strRange = parser["Range"]; auto strRange = parser["Range"];
if (strRange.size()) { if (strRange.size()) {
@ -777,53 +779,53 @@ void RtspSession::handleReq_Play(const Parser &parser) {
} }
iStartTime = 1000 * atof(strStart.data()); iStartTime = 1000 * atof(strStart.data());
InfoP(this) << "rtsp seekTo(ms):" << iStartTime; InfoP(this) << "rtsp seekTo(ms):" << iStartTime;
useBuf = !pMediaSrc->seekTo(iStartTime); useGOP = !play_src->seekTo(iStartTime);
} else if (pMediaSrc->totalReaderCount() == 0) { } else if (play_src->totalReaderCount() == 0) {
//第一个消费者 //第一个消费者
pMediaSrc->seekTo(0); play_src->seekTo(0);
} }
_StrPrinter rtp_info; _StrPrinter rtp_info;
for (auto &track : _aTrackInfo) { for (auto &track : _sdp_track) {
if (track->_inited == false) { if (track->_inited == false) {
//还有track没有setup //还有track没有setup
shutdown(SockException(Err_shutdown, "track not setuped")); shutdown(SockException(Err_shutdown, "track not setuped"));
return; return;
} }
track->_ssrc = pMediaSrc->getSsrc(track->_type); track->_ssrc = play_src->getSsrc(track->_type);
track->_seq = pMediaSrc->getSeqence(track->_type); track->_seq = play_src->getSeqence(track->_type);
track->_time_stamp = pMediaSrc->getTimeStamp(track->_type); track->_time_stamp = play_src->getTimeStamp(track->_type);
rtp_info << "url=" << _strContentBase << "/" << track->_control_surffix << ";" rtp_info << "url=" << _content_base << "/" << track->_control_surffix << ";"
<< "seq=" << track->_seq << ";" << "seq=" << track->_seq << ";"
<< "rtptime=" << (int) (track->_time_stamp * (track->_samplerate / 1000)) << ","; << "rtptime=" << (int) (track->_time_stamp * (track->_samplerate / 1000)) << ",";
} }
rtp_info.pop_back(); rtp_info.pop_back();
sendRtspResponse("200 OK", sendRtspResponse("200 OK",
{"Range", StrPrinter << "npt=" << setiosflags(ios::fixed) << setprecision(2) << (useBuf? pMediaSrc->getTimeStamp(TrackInvalid) / 1000.0 : iStartTime / 1000), {"Range", StrPrinter << "npt=" << setiosflags(ios::fixed) << setprecision(2) << (useGOP ? play_src->getTimeStamp(TrackInvalid) / 1000.0 : iStartTime / 1000),
"RTP-Info",rtp_info "RTP-Info",rtp_info
}); });
_enableSendRtp = true; _enable_send_rtp = true;
setSocketFlags(); setSocketFlags();
if (!_pRtpReader && _rtpType != Rtsp::RTP_MULTICAST) { if (!_play_reader && _rtp_type != Rtsp::RTP_MULTICAST) {
weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this()); weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
_pRtpReader = pMediaSrc->getRing()->attach(getPoller(), useBuf); _play_reader = play_src->getRing()->attach(getPoller(), useGOP);
_pRtpReader->setDetachCB([weakSelf]() { _play_reader->setDetachCB([weakSelf]() {
auto strongSelf = weakSelf.lock(); auto strongSelf = weakSelf.lock();
if (!strongSelf) { if (!strongSelf) {
return; return;
} }
strongSelf->shutdown(SockException(Err_shutdown, "rtsp ring buffer detached")); strongSelf->shutdown(SockException(Err_shutdown, "rtsp ring buffer detached"));
}); });
_pRtpReader->setReadCB([weakSelf](const RtspMediaSource::RingDataType &pack) { _play_reader->setReadCB([weakSelf](const RtspMediaSource::RingDataType &pack) {
auto strongSelf = weakSelf.lock(); auto strongSelf = weakSelf.lock();
if (!strongSelf) { if (!strongSelf) {
return; return;
} }
if (strongSelf->_enableSendRtp) { if (strongSelf->_enable_send_rtp) {
strongSelf->sendRtpPacket(pack); strongSelf->sendRtpPacket(pack);
} }
}); });
@ -831,13 +833,13 @@ void RtspSession::handleReq_Play(const Parser &parser) {
} }
void RtspSession::handleReq_Pause(const Parser &parser) { void RtspSession::handleReq_Pause(const Parser &parser) {
if (parser["Session"] != _strSession) { if (parser["Session"] != _sessionid) {
send_SessionNotFound(); send_SessionNotFound();
throw SockException(Err_shutdown,"session not found when pause"); throw SockException(Err_shutdown,"session not found when pause");
} }
sendRtspResponse("200 OK"); sendRtspResponse("200 OK");
_enableSendRtp = false; _enable_send_rtp = false;
} }
void RtspSession::handleReq_Teardown(const Parser &parser) { void RtspSession::handleReq_Teardown(const Parser &parser) {
@ -873,7 +875,7 @@ void RtspSession::handleReq_Post(const Parser &parser) {
g_mapGetter.erase(sessioncookie); g_mapGetter.erase(sessioncookie);
//http poster收到请求后转发给http getter处理 //http poster收到请求后转发给http getter处理
_onRecv = [this,httpGetterWeak](const Buffer::Ptr &pBuf){ _on_recv = [this,httpGetterWeak](const Buffer::Ptr &buf){
auto httpGetterStrong = httpGetterWeak.lock(); auto httpGetterStrong = httpGetterWeak.lock();
if(!httpGetterStrong){ if(!httpGetterStrong){
shutdown(SockException(Err_shutdown,"http getter released")); shutdown(SockException(Err_shutdown,"http getter released"));
@ -881,18 +883,18 @@ void RtspSession::handleReq_Post(const Parser &parser) {
} }
//切换到http getter的线程 //切换到http getter的线程
httpGetterStrong->async([pBuf,httpGetterWeak](){ httpGetterStrong->async([buf,httpGetterWeak](){
auto httpGetterStrong = httpGetterWeak.lock(); auto httpGetterStrong = httpGetterWeak.lock();
if(!httpGetterStrong){ if(!httpGetterStrong){
return; return;
} }
httpGetterStrong->onRecv(std::make_shared<BufferString>(decodeBase64(string(pBuf->data(),pBuf->size())))); httpGetterStrong->onRecv(std::make_shared<BufferString>(decodeBase64(string(buf->data(), buf->size()))));
}); });
}; };
if(!parser.Content().empty()){ if(!parser.Content().empty()){
//http poster后面的粘包 //http poster后面的粘包
_onRecv(std::make_shared<BufferString>(parser.Content())); _on_recv(std::make_shared<BufferString>(parser.Content()));
} }
sendRtspResponse("200 OK", sendRtspResponse("200 OK",
@ -911,82 +913,82 @@ inline void RtspSession::send_NotAcceptable() {
sendRtspResponse("406 Not Acceptable",{"Connection","Close"}); sendRtspResponse("406 Not Acceptable",{"Connection","Close"});
} }
void RtspSession::onRtpSorted(const RtpPacket::Ptr &rtp, int) {
void RtspSession::onRtpSorted(const RtpPacket::Ptr &rtppt, int trackidx) { _push_src->onWrite(rtp, false);
_pushSrc->onWrite(rtppt, false);
} }
inline void RtspSession::onRcvPeerUdpData(int intervaled, const Buffer::Ptr &pBuf, const struct sockaddr& addr) {
//这是rtcp心跳包说明播放器还存活
_ticker.resetTime();
if(intervaled % 2 == 0){ inline void RtspSession::onRcvPeerUdpData(int interleaved, const Buffer::Ptr &buf, const struct sockaddr &addr) {
if(_pushSrc){ //这是rtcp心跳包说明播放器还存活
_alive_ticker.resetTime();
if (interleaved % 2 == 0) {
if (_push_src) {
//这是rtsp推流上来的rtp包 //这是rtsp推流上来的rtp包
auto &ref = _aTrackInfo[intervaled / 2]; auto &ref = _sdp_track[interleaved / 2];
handleOneRtp(intervaled / 2, ref->_type, ref->_samplerate, (unsigned char *) pBuf->data(), pBuf->size()); handleOneRtp(interleaved / 2, ref->_type, ref->_samplerate, (unsigned char *) buf->data(), buf->size());
}else if(!_udpSockConnected.count(intervaled)){ } else if (!_udp_connected_flags.count(interleaved)) {
//这是rtsp播放器的rtp打洞包 //这是rtsp播放器的rtp打洞包
_udpSockConnected.emplace(intervaled); _udp_connected_flags.emplace(interleaved);
_apRtpSock[intervaled / 2]->setSendPeerAddr(&addr); _rtp_socks[interleaved / 2]->setSendPeerAddr(&addr);
} }
} else { } else {
//rtcp包 //rtcp包
if(!_udpSockConnected.count(intervaled)){ if (!_udp_connected_flags.count(interleaved)) {
_udpSockConnected.emplace(intervaled); _udp_connected_flags.emplace(interleaved);
_apRtcpSock[(intervaled - 1) / 2]->setSendPeerAddr(&addr); _rtcp_socks[(interleaved - 1) / 2]->setSendPeerAddr(&addr);
} }
onRtcpPacket((intervaled - 1) / 2, _aTrackInfo[(intervaled - 1) / 2], (unsigned char *) pBuf->data(),pBuf->size()); onRtcpPacket((interleaved - 1) / 2, _sdp_track[(interleaved - 1) / 2], (unsigned char *) buf->data(),
buf->size());
} }
} }
inline void RtspSession::startListenPeerUdpData(int track_idx) {
inline void RtspSession::startListenPeerUdpData(int trackIdx) {
weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this()); weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
auto srcIP = inet_addr(get_peer_ip().data()); auto srcIP = inet_addr(get_peer_ip().data());
auto onUdpData = [weakSelf,srcIP](const Buffer::Ptr &pBuf, struct sockaddr *pPeerAddr,int intervaled){ auto onUdpData = [weakSelf,srcIP](const Buffer::Ptr &buf, struct sockaddr *peer_addr, int interleaved){
auto strongSelf = weakSelf.lock(); auto strongSelf = weakSelf.lock();
if (!strongSelf) { if (!strongSelf) {
return false; return false;
} }
if (((struct sockaddr_in *) pPeerAddr)->sin_addr.s_addr != srcIP) { if (((struct sockaddr_in *) peer_addr)->sin_addr.s_addr != srcIP) {
WarnP(strongSelf.get()) << ((intervaled % 2 == 0) ? "收到其他地址的rtp数据:" : "收到其他地址的rtcp数据:") WarnP(strongSelf.get()) << ((interleaved % 2 == 0) ? "收到其他地址的rtp数据:" : "收到其他地址的rtcp数据:")
<< SockUtil::inet_ntoa(((struct sockaddr_in *) pPeerAddr)->sin_addr); << SockUtil::inet_ntoa(((struct sockaddr_in *) peer_addr)->sin_addr);
return true; return true;
} }
struct sockaddr addr=*pPeerAddr; struct sockaddr addr = *peer_addr;
strongSelf->async([weakSelf,pBuf,addr,intervaled]() { strongSelf->async([weakSelf, buf, addr, interleaved]() {
auto strongSelf = weakSelf.lock(); auto strongSelf = weakSelf.lock();
if (!strongSelf) { if (!strongSelf) {
return; return;
} }
strongSelf->onRcvPeerUdpData(intervaled,pBuf,addr); strongSelf->onRcvPeerUdpData(interleaved, buf, addr);
}); });
return true; return true;
}; };
switch (_rtpType){ switch (_rtp_type){
case Rtsp::RTP_MULTICAST:{ case Rtsp::RTP_MULTICAST:{
//组播使用的共享rtcp端口 //组播使用的共享rtcp端口
UDPServer::Instance().listenPeer(get_peer_ip().data(), this, [onUdpData]( UDPServer::Instance().listenPeer(get_peer_ip().data(), this,
int intervaled, const Buffer::Ptr &pBuf, struct sockaddr *pPeerAddr) { [onUdpData]( int interleaved, const Buffer::Ptr &buf, struct sockaddr *peer_addr) {
return onUdpData(pBuf,pPeerAddr,intervaled); return onUdpData(buf, peer_addr, interleaved);
}); });
} }
break; break;
case Rtsp::RTP_UDP:{ case Rtsp::RTP_UDP:{
auto setEvent = [&](Socket::Ptr &sock,int intervaled){ auto setEvent = [&](Socket::Ptr &sock,int interleaved){
if(!sock){ if(!sock){
WarnP(this) << "udp端口为空:" << intervaled; WarnP(this) << "udp端口为空:" << interleaved;
return; return;
} }
sock->setOnRead([onUdpData,intervaled](const Buffer::Ptr &pBuf, struct sockaddr *pPeerAddr , int addr_len){ sock->setOnRead([onUdpData,interleaved](const Buffer::Ptr &pBuf, struct sockaddr *pPeerAddr , int addr_len){
onUdpData(pBuf,pPeerAddr,intervaled); onUdpData(pBuf, pPeerAddr, interleaved);
}); });
}; };
setEvent(_apRtpSock[trackIdx], 2*trackIdx ); setEvent(_rtp_socks[track_idx], 2 * track_idx );
setEvent(_apRtcpSock[trackIdx], 2*trackIdx + 1 ); setEvent(_rtcp_socks[track_idx], 2 * track_idx + 1 );
} }
break; break;
@ -1003,14 +1005,11 @@ static string dateStr(){
return buf; return buf;
} }
bool RtspSession::sendRtspResponse(const string &res_code, bool RtspSession::sendRtspResponse(const string &res_code, const StrCaseMap &header_const, const string &sdp, const char *protocol){
const StrCaseMap &header_const,
const string &sdp,
const char *protocol){
auto header = header_const; auto header = header_const;
header.emplace("CSeq",StrPrinter << _iCseq); header.emplace("CSeq",StrPrinter << _cseq);
if(!_strSession.empty()){ if(!_sessionid.empty()){
header.emplace("Session",_strSession); header.emplace("Session", _sessionid);
} }
header.emplace("Server",SERVER_NAME); header.emplace("Server",SERVER_NAME);
@ -1040,14 +1039,11 @@ int RtspSession::send(const Buffer::Ptr &pkt){
// if(!_enableSendRtp){ // if(!_enableSendRtp){
// DebugP(this) << pkt->data(); // DebugP(this) << pkt->data();
// } // }
_ui64TotalBytes += pkt->size(); _bytes_usage += pkt->size();
return TcpSession::send(pkt); return TcpSession::send(pkt);
} }
bool RtspSession::sendRtspResponse(const string &res_code, bool RtspSession::sendRtspResponse(const string &res_code, const std::initializer_list<string> &header, const string &sdp, const char *protocol) {
const std::initializer_list<string> &header,
const string &sdp,
const char *protocol) {
string key; string key;
StrCaseMap header_map; StrCaseMap header_map;
int i = 0; int i = 0;
@ -1062,35 +1058,36 @@ bool RtspSession::sendRtspResponse(const string &res_code,
} }
inline int RtspSession::getTrackIndexByTrackType(TrackType type) { inline int RtspSession::getTrackIndexByTrackType(TrackType type) {
for (unsigned int i = 0; i < _aTrackInfo.size(); i++) { for (unsigned int i = 0; i < _sdp_track.size(); i++) {
if (type == _aTrackInfo[i]->_type) { if (type == _sdp_track[i]->_type) {
return i; return i;
} }
} }
if(_aTrackInfo.size() == 1){ if(_sdp_track.size() == 1){
return 0; return 0;
} }
throw SockException(Err_shutdown, StrPrinter << "no such track with type:" << (int) type); throw SockException(Err_shutdown, StrPrinter << "no such track with type:" << (int) type);
} }
inline int RtspSession::getTrackIndexByControlSuffix(const string &controlSuffix) { inline int RtspSession::getTrackIndexByControlSuffix(const string &controlSuffix) {
for (unsigned int i = 0; i < _aTrackInfo.size(); i++) { for (unsigned int i = 0; i < _sdp_track.size(); i++) {
if (controlSuffix == _aTrackInfo[i]->_control_surffix) { if (controlSuffix == _sdp_track[i]->_control_surffix) {
return i; return i;
} }
} }
if(_aTrackInfo.size() == 1){ if(_sdp_track.size() == 1){
return 0; return 0;
} }
throw SockException(Err_shutdown, StrPrinter << "no such track with suffix:" << controlSuffix); throw SockException(Err_shutdown, StrPrinter << "no such track with suffix:" << controlSuffix);
} }
inline int RtspSession::getTrackIndexByInterleaved(int interleaved){ inline int RtspSession::getTrackIndexByInterleaved(int interleaved){
for (unsigned int i = 0; i < _aTrackInfo.size(); i++) { for (unsigned int i = 0; i < _sdp_track.size(); i++) {
if (_aTrackInfo[i]->_interleaved == interleaved) { if (_sdp_track[i]->_interleaved == interleaved) {
return i; return i;
} }
} }
if(_aTrackInfo.size() == 1){ if(_sdp_track.size() == 1){
return 0; return 0;
} }
throw SockException(Err_shutdown, StrPrinter << "no such track with interleaved:" << interleaved); throw SockException(Err_shutdown, StrPrinter << "no such track with interleaved:" << interleaved);
@ -1098,7 +1095,7 @@ inline int RtspSession::getTrackIndexByInterleaved(int interleaved){
bool RtspSession::close(MediaSource &sender, bool force) { bool RtspSession::close(MediaSource &sender, bool force) {
//此回调在其他线程触发 //此回调在其他线程触发
if(!_pushSrc || (!force && _pushSrc->totalReaderCount())){ if(!_push_src || (!force && _push_src->totalReaderCount())){
return false; return false;
} }
string err = StrPrinter << "close media:" << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId() << " " << force; string err = StrPrinter << "close media:" << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId() << " " << force;
@ -1107,17 +1104,34 @@ bool RtspSession::close(MediaSource &sender,bool force) {
} }
int RtspSession::totalReaderCount(MediaSource &sender) { int RtspSession::totalReaderCount(MediaSource &sender) {
return _pushSrc ? _pushSrc->totalReaderCount() : sender.readerCount(); return _push_src ? _push_src->totalReaderCount() : sender.readerCount();
}
inline void RtspSession::onSendRtpPacket(const RtpPacket::Ptr &pkt){
#if RTSP_SERVER_SEND_RTCP
int track_index = getTrackIndexByTrackType(pkt->type);
RtcpCounter &counter = _rtcp_counter[track_index];
counter.pktCnt += 1;
counter.octCount += (pkt->size() - pkt->offset);
auto &ticker = _rtcp_send_tickers[track_index];
if (ticker.elapsedTime() > 5 * 1000) {
//send rtcp every 5 second
ticker.resetTime();
//直接保存网络字节序
memcpy(&counter.timeStamp, pkt->data() + 8, 4);
sendSenderReport(_rtp_type == Rtsp::RTP_TCP, track_index);
}
#endif
} }
void RtspSession::sendRtpPacket(const RtspMediaSource::RingDataType &pkt) { void RtspSession::sendRtpPacket(const RtspMediaSource::RingDataType &pkt) {
//InfoP(this) <<(int)pkt.Interleaved; switch (_rtp_type) {
switch (_rtpType) {
case Rtsp::RTP_TCP: { case Rtsp::RTP_TCP: {
int i = 0; int i = 0;
int size = pkt->size(); int size = pkt->size();
setSendFlushFlag(false); setSendFlushFlag(false);
pkt->for_each([&](const RtpPacket::Ptr &rtp) { pkt->for_each([&](const RtpPacket::Ptr &rtp) {
onSendRtpPacket(rtp);
if (++i == size) { if (++i == size) {
setSendFlushFlag(true); setSendFlushFlag(true);
} }
@ -1129,15 +1143,15 @@ void RtspSession::sendRtpPacket(const RtspMediaSource::RingDataType &pkt) {
int i = 0; int i = 0;
int size = pkt->size(); int size = pkt->size();
pkt->for_each([&](const RtpPacket::Ptr &rtp) { pkt->for_each([&](const RtpPacket::Ptr &rtp) {
int iTrackIndex = getTrackIndexByTrackType(rtp->type); onSendRtpPacket(rtp);
auto &pSock = _apRtpSock[iTrackIndex]; int track_index = getTrackIndexByTrackType(rtp->type);
auto &pSock = _rtp_socks[track_index];
if (!pSock) { if (!pSock) {
shutdown(SockException(Err_shutdown, "udp sock not opened yet")); shutdown(SockException(Err_shutdown, "udp sock not opened yet"));
return; return;
} }
BufferRtp::Ptr buffer(new BufferRtp(rtp, 4)); BufferRtp::Ptr buffer(new BufferRtp(rtp, 4));
_ui64TotalBytes += buffer->size(); _bytes_usage += buffer->size();
pSock->send(buffer, nullptr, 0, ++i == size); pSock->send(buffer, nullptr, 0, ++i == size);
}); });
} }
@ -1145,42 +1159,27 @@ void RtspSession::sendRtpPacket(const RtspMediaSource::RingDataType &pkt) {
default: default:
break; break;
} }
#if RTSP_SERVER_SEND_RTCP
int iTrackIndex = getTrackIndexByTrackType(pkt->type);
RtcpCounter &counter = _aRtcpCnt[iTrackIndex];
counter.pktCnt += 1;
counter.octCount += (pkt->length - pkt->offset);
auto &ticker = _aRtcpTicker[iTrackIndex];
if (ticker.elapsedTime() > 5 * 1000) {
//send rtcp every 5 second
ticker.resetTime();
//直接保存网络字节序
memcpy(&counter.timeStamp, pkt->payload + 8 , 4);
sendSenderReport(_rtpType == Rtsp::RTP_TCP,iTrackIndex);
}
#endif
} }
void RtspSession::sendSenderReport(bool overTcp,int iTrackIndex) { void RtspSession::sendSenderReport(bool over_tcp, int track_index) {
static const char s_cname[] = "ZLMediaKitRtsp"; static const char s_cname[] = "ZLMediaKitRtsp";
uint8_t aui8Rtcp[4 + 28 + 10 + sizeof(s_cname) + 1] = {0}; uint8_t rtcp_buf[4 + 28 + 10 + sizeof(s_cname) + 1] = {0};
uint8_t *pui8Rtcp_SR = aui8Rtcp + 4, *pui8Rtcp_SDES = pui8Rtcp_SR + 28; uint8_t *rtcp_sr = rtcp_buf + 4, *rtcp_sdes = rtcp_sr + 28;
auto &track = _aTrackInfo[iTrackIndex]; auto &track = _sdp_track[track_index];
auto &counter = _aRtcpCnt[iTrackIndex]; auto &counter = _rtcp_counter[track_index];
aui8Rtcp[0] = '$'; rtcp_buf[0] = '$';
aui8Rtcp[1] = track->_interleaved + 1; rtcp_buf[1] = track->_interleaved + 1;
aui8Rtcp[2] = (sizeof(aui8Rtcp) - 4) >> 8; rtcp_buf[2] = (sizeof(rtcp_buf) - 4) >> 8;
aui8Rtcp[3] = (sizeof(aui8Rtcp) - 4) & 0xFF; rtcp_buf[3] = (sizeof(rtcp_buf) - 4) & 0xFF;
pui8Rtcp_SR[0] = 0x80; rtcp_sr[0] = 0x80;
pui8Rtcp_SR[1] = 0xC8; rtcp_sr[1] = 0xC8;
pui8Rtcp_SR[2] = 0x00; rtcp_sr[2] = 0x00;
pui8Rtcp_SR[3] = 0x06; rtcp_sr[3] = 0x06;
uint32_t ssrc = htonl(track->_ssrc); uint32_t ssrc = htonl(track->_ssrc);
memcpy(&pui8Rtcp_SR[4], &ssrc, 4); memcpy(&rtcp_sr[4], &ssrc, 4);
uint64_t msw; uint64_t msw;
uint64_t lsw; uint64_t lsw;
@ -1190,35 +1189,35 @@ void RtspSession::sendSenderReport(bool overTcp,int iTrackIndex) {
lsw = (uint32_t) ((double) tv.tv_usec * (double) (((uint64_t) 1) << 32) * 1.0e-6); lsw = (uint32_t) ((double) tv.tv_usec * (double) (((uint64_t) 1) << 32) * 1.0e-6);
msw = htonl(msw); msw = htonl(msw);
memcpy(&pui8Rtcp_SR[8], &msw, 4); memcpy(&rtcp_sr[8], &msw, 4);
lsw = htonl(lsw); lsw = htonl(lsw);
memcpy(&pui8Rtcp_SR[12], &lsw, 4); memcpy(&rtcp_sr[12], &lsw, 4);
//直接使用网络字节序 //直接使用网络字节序
memcpy(&pui8Rtcp_SR[16], &counter.timeStamp, 4); memcpy(&rtcp_sr[16], &counter.timeStamp, 4);
uint32_t pktCnt = htonl(counter.pktCnt); uint32_t pktCnt = htonl(counter.pktCnt);
memcpy(&pui8Rtcp_SR[20], &pktCnt, 4); memcpy(&rtcp_sr[20], &pktCnt, 4);
uint32_t octCount = htonl(counter.octCount); uint32_t octCount = htonl(counter.octCount);
memcpy(&pui8Rtcp_SR[24], &octCount, 4); memcpy(&rtcp_sr[24], &octCount, 4);
pui8Rtcp_SDES[0] = 0x81; rtcp_sdes[0] = 0x81;
pui8Rtcp_SDES[1] = 0xCA; rtcp_sdes[1] = 0xCA;
pui8Rtcp_SDES[2] = 0x00; rtcp_sdes[2] = 0x00;
pui8Rtcp_SDES[3] = 0x06; rtcp_sdes[3] = 0x06;
memcpy(&pui8Rtcp_SDES[4], &ssrc, 4); memcpy(&rtcp_sdes[4], &ssrc, 4);
pui8Rtcp_SDES[8] = 0x01; rtcp_sdes[8] = 0x01;
pui8Rtcp_SDES[9] = 0x0f; rtcp_sdes[9] = 0x0f;
memcpy(&pui8Rtcp_SDES[10], s_cname, sizeof(s_cname)); memcpy(&rtcp_sdes[10], s_cname, sizeof(s_cname));
pui8Rtcp_SDES[10 + sizeof(s_cname)] = 0x00; rtcp_sdes[10 + sizeof(s_cname)] = 0x00;
if(overTcp){ if (over_tcp) {
send(obtainBuffer((char *) aui8Rtcp, sizeof(aui8Rtcp))); send(obtainBuffer((char *) rtcp_buf, sizeof(rtcp_buf)));
} else { } else {
_apRtcpSock[iTrackIndex]->send((char *) aui8Rtcp + 4, sizeof(aui8Rtcp) - 4); _rtcp_socks[track_index]->send((char *) rtcp_buf + 4, sizeof(rtcp_buf) - 4);
} }
} }
@ -1234,4 +1233,3 @@ void RtspSession::setSocketFlags(){
} }
/* namespace mediakit */ /* namespace mediakit */

View File

@ -59,51 +59,31 @@ public:
//在请求明文密码时如果提供md5密码者则会导致认证失败 //在请求明文密码时如果提供md5密码者则会导致认证失败
typedef std::function<void(bool encrypted,const string &pwd_or_md5)> onAuth; typedef std::function<void(bool encrypted,const string &pwd_or_md5)> onAuth;
RtspSession(const Socket::Ptr &pSock); RtspSession(const Socket::Ptr &sock);
virtual ~RtspSession(); virtual ~RtspSession();
////TcpSession override//// ////TcpSession override////
void onRecv(const Buffer::Ptr &pBuf) override; void onRecv(const Buffer::Ptr &buf) override;
void onError(const SockException &err) override; void onError(const SockException &err) override;
void onManager() override; void onManager() override;
protected: protected:
//RtspSplitter override /////RtspSplitter override/////
/** //收到完整的rtsp包回调包括sdp等content数据
* rtsp包回调sdp等content数据
* @param parser rtsp包
*/
void onWholeRtspPacket(Parser &parser) override; void onWholeRtspPacket(Parser &parser) override;
//收到rtp包回调
/**
* rtp包回调
* @param data
* @param len
*/
void onRtpPacket(const char *data, uint64_t len) override; void onRtpPacket(const char *data, uint64_t len) override;
//从rtsp头中获取Content长度
/**
* rtsp头中获取Content长度
* @param parser
* @return
*/
int64_t getContentLength(Parser &parser) override; int64_t getContentLength(Parser &parser) override;
////RtpReceiver override////
//RtpReceiver override void onRtpSorted(const RtpPacket::Ptr &rtp, int track_idx) override;
void onRtpSorted(const RtpPacket::Ptr &rtppt, int trackidx) override; ////MediaSourceEvent override////
//MediaSourceEvent override
bool close(MediaSource &sender, bool force) override ; bool close(MediaSource &sender, bool force) override ;
int totalReaderCount(MediaSource &sender) override; int totalReaderCount(MediaSource &sender) override;
/////TcpSession override////
//TcpSession override
int send(const Buffer::Ptr &pkt) override; int send(const Buffer::Ptr &pkt) override;
//收到RTCP包回调
virtual void onRtcpPacket(int track_idx, SdpTrack::Ptr &track, unsigned char *data, unsigned int len);
/**
* RTCP包回调
* @param iTrackidx
* @param track
* @param pucData
* @param uiLen
*/
virtual void onRtcpPacket(int iTrackidx, SdpTrack::Ptr &track, unsigned char *pucData, unsigned int uiLen);
private: private:
//处理options方法,获取服务器能力 //处理options方法,获取服务器能力
void handleReq_Options(const Parser &parser); void handleReq_Options(const Parser &parser);
@ -127,26 +107,22 @@ private:
void handleReq_Post(const Parser &parser); void handleReq_Post(const Parser &parser);
//处理SET_PARAMETER、GET_PARAMETER方法一般用于心跳 //处理SET_PARAMETER、GET_PARAMETER方法一般用于心跳
void handleReq_SET_PARAMETER(const Parser &parser); void handleReq_SET_PARAMETER(const Parser &parser);
//rtsp资源未找到 //rtsp资源未找到
void inline send_StreamNotFound(); void send_StreamNotFound();
//不支持的传输模式 //不支持的传输模式
void inline send_UnsupportedTransport(); void send_UnsupportedTransport();
//会话id错误 //会话id错误
void inline send_SessionNotFound(); void send_SessionNotFound();
//一般rtsp服务器打开端口失败时触发 //一般rtsp服务器打开端口失败时触发
void inline send_NotAcceptable(); void send_NotAcceptable();
//获取track下标 //获取track下标
inline int getTrackIndexByTrackType(TrackType type); int getTrackIndexByTrackType(TrackType type);
inline int getTrackIndexByControlSuffix(const string &controlSuffix); int getTrackIndexByControlSuffix(const string &control_suffix);
inline int getTrackIndexByInterleaved(int interleaved); int getTrackIndexByInterleaved(int interleaved);
//一般用于接收udp打洞包也用于rtsp推流 //一般用于接收udp打洞包也用于rtsp推流
inline void onRcvPeerUdpData(int intervaled, const Buffer::Ptr &pBuf, const struct sockaddr &addr); void onRcvPeerUdpData(int interleaved, const Buffer::Ptr &buf, const struct sockaddr &addr);
//配合onRcvPeerUdpData使用 //配合onRcvPeerUdpData使用
inline void startListenPeerUdpData(int iTrackIdx); void startListenPeerUdpData(int track_idx);
////rtsp专有认证相关//// ////rtsp专有认证相关////
//认证成功 //认证成功
void onAuthSuccess(); void onAuthSuccess();
@ -155,73 +131,76 @@ private:
//开始走rtsp专有认证流程 //开始走rtsp专有认证流程
void onAuthUser(const string &realm, const string &authorization); void onAuthUser(const string &realm, const string &authorization);
//校验base64方式的认证加密 //校验base64方式的认证加密
void onAuthBasic(const string &realm,const string &strBase64); void onAuthBasic(const string &realm, const string &auth_base64);
//校验md5方式的认证加密 //校验md5方式的认证加密
void onAuthDigest(const string &realm,const string &strMd5); void onAuthDigest(const string &realm, const string &auth_md5);
//触发url鉴权事件 //触发url鉴权事件
void emitOnPlay(); void emitOnPlay();
//发送rtp给客户端 //发送rtp给客户端
void sendRtpPacket(const RtspMediaSource::RingDataType &pkt); void sendRtpPacket(const RtspMediaSource::RingDataType &pkt);
//触发rtcp发送
void onSendRtpPacket(const RtpPacket::Ptr &rtp);
//回复客户端 //回复客户端
bool sendRtspResponse(const string &res_code, const std::initializer_list<string> &header, const string &sdp = "", const char *protocol = "RTSP/1.0"); bool sendRtspResponse(const string &res_code, const std::initializer_list<string> &header, const string &sdp = "", const char *protocol = "RTSP/1.0");
bool sendRtspResponse(const string &res_code, const StrCaseMap &header = StrCaseMap(), const string &sdp = "", const char *protocol = "RTSP/1.0"); bool sendRtspResponse(const string &res_code, const StrCaseMap &header = StrCaseMap(), const string &sdp = "", const char *protocol = "RTSP/1.0");
//服务器发送rtcp //服务器发送rtcp
void sendSenderReport(bool overTcp,int iTrackIndex); void sendSenderReport(bool over_tcp, int track_idx);
//设置socket标志 //设置socket标志
void setSocketFlags(); void setSocketFlags();
private: private:
//用于判断客户端是否超时
Ticker _ticker;
//收到的seq回复时一致
int _iCseq = 0;
//ContentBase
string _strContentBase;
//Session号
string _strSession;
//记录是否需要rtsp专属鉴权防止重复触发事件
string _rtsp_realm;
//是否已经触发on_play事件 //是否已经触发on_play事件
bool _emit_on_play = false; bool _emit_on_play = false;
//url解析后保存的相关信息 //是否开始发送rtp
MediaInfo _mediaInfo; bool _enable_send_rtp;
//rtsp播放器绑定的直播源
std::weak_ptr<RtspMediaSource> _pMediaSrc;
//直播源读取器
RtspMediaSource::RingType::RingReader::Ptr _pRtpReader;
//推流或拉流客户端采用的rtp传输方式 //推流或拉流客户端采用的rtp传输方式
Rtsp::eRtpType _rtpType = Rtsp::RTP_Invalid; Rtsp::eRtpType _rtp_type = Rtsp::RTP_Invalid;
//收到的seq回复时一致
int _cseq = 0;
//消耗的总流量
uint64_t _bytes_usage = 0;
//ContentBase
string _content_base;
//Session号
string _sessionid;
//记录是否需要rtsp专属鉴权防止重复触发事件
string _rtsp_realm;
//登录认证
string _auth_nonce;
//用于判断客户端是否超时
Ticker _alive_ticker;
//url解析后保存的相关信息
MediaInfo _media_info;
//rtsp推流相关绑定的源
RtspMediaSourceImp::Ptr _push_src;
//rtsp播放器绑定的直播源
std::weak_ptr<RtspMediaSource> _play_src;
//直播源读取器
RtspMediaSource::RingType::RingReader::Ptr _play_reader;
//sdp里面有效的track,包含音频或视频 //sdp里面有效的track,包含音频或视频
vector<SdpTrack::Ptr> _aTrackInfo; vector<SdpTrack::Ptr> _sdp_track;
//rtcp统计,trackid idx 为数组下标
RtcpCounter _rtcp_counter[2];
//rtcp发送时间,trackid idx 为数组下标
Ticker _rtcp_send_tickers[2];
////////RTP over udp//////// ////////RTP over udp////////
//RTP端口,trackid idx 为数组下标 //RTP端口,trackid idx 为数组下标
Socket::Ptr _apRtpSock[2]; Socket::Ptr _rtp_socks[2];
//RTCP端口,trackid idx 为数组下标 //RTCP端口,trackid idx 为数组下标
Socket::Ptr _apRtcpSock[2]; Socket::Ptr _rtcp_socks[2];
//标记是否收到播放的udp打洞包,收到播放的udp打洞包后才能知道其外网udp端口号 //标记是否收到播放的udp打洞包,收到播放的udp打洞包后才能知道其外网udp端口号
unordered_set<int> _udpSockConnected; unordered_set<int> _udp_connected_flags;
////////RTP over udp_multicast//////// ////////RTP over udp_multicast////////
//共享的rtp组播对象 //共享的rtp组播对象
RtpMultiCaster::Ptr _multicaster; RtpMultiCaster::Ptr _multicaster;
////////RTSP over HTTP ////////
//登录认证
string _strNonce;
//消耗的总流量
uint64_t _ui64TotalBytes = 0;
//RTSP over HTTP
//quicktime 请求rtsp会产生两次tcp连接 //quicktime 请求rtsp会产生两次tcp连接
//一次发送 get 一次发送post需要通过x-sessioncookie关联起来 //一次发送 get 一次发送post需要通过x-sessioncookie关联起来
string _http_x_sessioncookie; string _http_x_sessioncookie;
function<void(const Buffer::Ptr &pBuf)> _onRecv; function<void(const Buffer::Ptr &)> _on_recv;
//是否开始发送rtp
bool _enableSendRtp;
//rtsp推流相关
RtspMediaSourceImp::Ptr _pushSrc;
//rtcp统计,trackid idx 为数组下标
RtcpCounter _aRtcpCnt[2];
//rtcp发送时间,trackid idx 为数组下标
Ticker _aRtcpTicker[2];
}; };
/** /**