/* * Copyright (c) 2016-present The ZLMediaKit project authors. All Rights Reserved. * * This file is part of ZLMediaKit(https://github.com/ZLMediaKit/ZLMediaKit). * * Use of this source code is governed by MIT-like license that can be found in the * LICENSE file in the root of the source tree. All contributing project authors * may be found in the AUTHORS file in the root of the source tree. */ #include "WebRtcSession.h" #include "Util/util.h" #include "Network/TcpServer.h" #include "Common/config.h" #include "IceServer.hpp" #include "WebRtcTransport.h" using namespace std; namespace mediakit { static string getUserName(const char *buf, size_t len) { if (!RTC::StunPacket::IsStun((const uint8_t *) buf, len)) { return ""; } std::unique_ptr packet(RTC::StunPacket::Parse((const uint8_t *) buf, len)); if (!packet) { return ""; } if (packet->GetClass() != RTC::StunPacket::Class::REQUEST || packet->GetMethod() != RTC::StunPacket::Method::BINDING) { return ""; } // 收到binding request请求 [AUTO-TRANSLATED:eff4d773] // Received binding request auto vec = split(packet->GetUsername(), ":"); return vec[0]; } EventPoller::Ptr WebRtcSession::queryPoller(const Buffer::Ptr &buffer) { auto user_name = getUserName(buffer->data(), buffer->size()); if (user_name.empty()) { return nullptr; } auto ret = WebRtcTransportManager::Instance().getItem(user_name); return ret ? ret->getPoller() : nullptr; } //////////////////////////////////////////////////////////////////////////////// WebRtcSession::WebRtcSession(const Socket::Ptr &sock) : Session(sock) { _over_tcp = sock->sockType() == SockNum::Sock_TCP; } void WebRtcSession::attachServer(const Server &server) { _server = std::static_pointer_cast(const_cast(server).shared_from_this()); } void WebRtcSession::onRecv_l(const char *data, size_t len) { if (_find_transport) { // 只允许寻找一次transport [AUTO-TRANSLATED:446fae53] // Only allow searching for transport once _find_transport = false; auto user_name = getUserName(data, len); auto transport = WebRtcTransportManager::Instance().getItem(user_name); CHECK(transport); // WebRtcTransport在其他poller线程上,需要切换poller线程并重新创建WebRtcSession对象 [AUTO-TRANSLATED:7e5534cf] // WebRtcTransport is on another poller thread, need to switch poller thread and recreate WebRtcSession object if (!transport->getPoller()->isCurrentThread()) { auto sock = Socket::createSocket(transport->getPoller(), false); // 1、克隆socket(fd不变),切换poller线程到WebRtcTransport所在线程 [AUTO-TRANSLATED:f930bfab] // 1. Clone socket (fd remains unchanged), switch poller thread to the thread where WebRtcTransport is located sock->cloneSocket(*(getSock())); auto server = _server; std::string str(data, len); sock->getPoller()->async([sock, server, str](){ auto strong_server = server.lock(); if (strong_server) { auto session = static_pointer_cast(strong_server->createSession(sock)); // 2、创建新的WebRtcSession对象(绑定到WebRtcTransport所在线程),重新处理一遍ice binding request命令 [AUTO-TRANSLATED:c75203bb] // 2. Create a new WebRtcSession object (bound to the thread where WebRtcTransport is located), reprocess the ice binding request command session->onRecv_l(str.data(), str.size()); } }); // 3、销毁原先的socket和WebRtcSession(原先的对象跟WebRtcTransport不在同一条线程) [AUTO-TRANSLATED:a6d6d63f] // 3. Destroy the original socket and WebRtcSession (the original object is not on the same thread as WebRtcTransport) throw std::runtime_error("webrtc over tcp change poller: " + getPoller()->getThreadName() + " -> " + sock->getPoller()->getThreadName()); } _transport = std::move(transport); InfoP(this); } _ticker.resetTime(); CHECK(_transport); _transport->inputSockData((char *)data, len, this); } void WebRtcSession::onRecv(const Buffer::Ptr &buffer) { if (_over_tcp) { input(buffer->data(), buffer->size()); } else { onRecv_l(buffer->data(), buffer->size()); } } void WebRtcSession::onError(const SockException &err) { // udp链接超时,但是rtc链接不一定超时,因为可能存在链接迁移的情况 [AUTO-TRANSLATED:aaa9672f] // UDP connection timeout, but RTC connection may not timeout, because there may be connection migration // 在udp链接迁移时,新的WebRtcSession对象将接管WebRtcTransport对象的生命周期 [AUTO-TRANSLATED:7e7d19df] // When UDP connection migrates, the new WebRtcSession object will take over the life cycle of the WebRtcTransport object // 本WebRtcSession对象将在超时后自动销毁 [AUTO-TRANSLATED:bc903a06] // This WebRtcSession object will be automatically destroyed after timeout WarnP(this) << err; if (!_transport) { return; } auto self = static_pointer_cast(shared_from_this()); auto transport = std::move(_transport); getPoller()->async([transport, self]() mutable { // 延时减引用,防止使用transport对象时,销毁对象 [AUTO-TRANSLATED:09dd6609] // Delay decrementing the reference count to prevent the object from being destroyed when using the transport object transport->removeTuple(self.get()); // 确保transport在Session对象前销毁,防止WebRtcTransport::onDestory()时获取不到Session对象 [AUTO-TRANSLATED:acd8bd77] // Ensure that the transport is destroyed before the Session object to prevent WebRtcTransport::onDestory() from not being able to get the Session object transport = nullptr; }, false); } void WebRtcSession::onManager() { GET_CONFIG(float, timeoutSec, Rtc::kTimeOutSec); if (!_transport && _ticker.createdTime() > timeoutSec * 1000) { shutdown(SockException(Err_timeout, "illegal webrtc connection")); return; } if (_ticker.elapsedTime() > timeoutSec * 1000) { shutdown(SockException(Err_timeout, "webrtc connection timeout")); return; } } ssize_t WebRtcSession::onRecvHeader(const char *data, size_t len) { onRecv_l(data + 2, len - 2); return 0; } const char *WebRtcSession::onSearchPacketTail(const char *data, size_t len) { if (len < 2) { // 数据不够 [AUTO-TRANSLATED:830a2785] // Not enough data return nullptr; } uint16_t length = (((uint8_t *)data)[0] << 8) | ((uint8_t *)data)[1]; if (len < (size_t)(length + 2)) { // 数据不够 [AUTO-TRANSLATED:830a2785] // Not enough data return nullptr; } // 返回rtp包末尾 [AUTO-TRANSLATED:5134cf6f] // Return the end of the RTP packet return data + 2 + length; } }// namespace mediakit