// NOTE(review): the reviewed copy of this file had lost everything written between angle
// brackets (system include names and template arguments). They are reconstructed below from
// how the values are used; confirm them against Helpers.h and the build configuration.
#include "Streamer.h"
#include "Core/Logger.h"
#include "Helpers.h"
#include <boost/asio/io_context.hpp>
#include <boost/asio/post.hpp>
#include <boost/asio/strand.hpp>
#include <boost/json.hpp>
#include <rtc/rtc.hpp>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <exception>
#include <functional>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>
#include <variant>

namespace Danki {

namespace {

/// Wall-clock time elapsed since the system_clock epoch, in seconds (millisecond resolution).
double secondsSinceEpoch() {
    using namespace std::chrono;
    return static_cast<double>(duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count()) / 1000;
}

/// Returns the JSON string stored under @p key, or nullptr when the key is missing or the
/// value is not a string. Never throws, unlike `at(key).as_string()`.
const boost::json::string *stringField(const boost::json::object &object, const char *key) {
    const boost::json::value *value = object.if_contains(key);
    return value != nullptr ? value->if_string() : nullptr;
}

} // namespace

/// Private implementation of Streamer: owns the signaling WebSocket, the ICE configuration
/// and the table of connected clients. `m_clients` is only touched from handlers posted to
/// `strand`.
class WebRTCStreamerPrivate {
public:
    WebRTCStreamerPrivate(boost::asio::io_context &ioContext) : strand{ioContext.get_executor()} {
    }

    /// Adds a send-side H.264 video track to @p pc and installs its RTP/RTCP handler chain.
    ///
    /// @param pc          peer connection that receives the track
    /// @param payloadType RTP payload type advertised for H.264
    /// @param ssrc        RTP synchronization source identifier
    /// @param cname       passed to the media description, the SSRC entry and the RTP config
    /// @param msid        media stream id passed to the SSRC entry
    /// @param onOpen      callback registered on the track's open event
    /// @return the track together with its RTCP sender-report handler
    std::shared_ptr<ClientTrackData> addVideo(const std::shared_ptr<rtc::PeerConnection> pc, const uint8_t payloadType,
                                              const uint32_t ssrc, const std::string cname, const std::string msid,
                                              const std::function<void(void)> onOpen) {
        auto video = rtc::Description::Video(cname);
        video.addH264Codec(payloadType);
        video.addSSRC(ssrc, cname, msid, cname);
        auto track = pc->addTrack(video);

        // create RTP configuration
        auto rtpConfig = std::make_shared<rtc::RtpPacketizationConfig>(ssrc, cname, payloadType,
                                                                       rtc::H264RtpPacketizer::defaultClockRate);
        rtpConfig->startTimestamp = rtpConfig->secondsToTimestamp(secondsSinceEpoch());

        // create packetizer; NAL units are split on start sequences
        auto packetizer = std::make_shared<rtc::H264RtpPacketizer>(rtc::NalUnit::Separator::StartSequence, rtpConfig);

        // add RTCP SR handler
        auto srReporter = std::make_shared<rtc::RtcpSrReporter>(rtpConfig);
        packetizer->addToChain(srReporter);

        // add RTCP NACK handler
        auto nackResponder = std::make_shared<rtc::RtcpNackResponder>();
        packetizer->addToChain(nackResponder);

        // set handler
        track->setMediaHandler(packetizer);
        track->onOpen(onOpen);

        auto trackData = std::make_shared<ClientTrackData>();
        trackData->track = track;
        trackData->sender = srReporter;
        return trackData;
    }

    /// Creates a peer connection for client @p id, wires up its callbacks, adds the video
    /// track and a "ping-pong" data channel, then starts local description generation.
    ///
    /// The returned client is not inserted into `m_clients`; the caller does that.
    std::shared_ptr<Client> createPeerConnection(const rtc::Configuration &config, std::string id) {
        auto pc = std::make_shared<rtc::PeerConnection>(config);
        auto client = std::make_shared<Client>(pc);

        pc->onStateChange([this, id](rtc::PeerConnection::State state) {
            LOG(info) << "State: " << state;
            if (state == rtc::PeerConnection::State::Disconnected || state == rtc::PeerConnection::State::Failed ||
                state == rtc::PeerConnection::State::Closed) {
                // remove disconnected client
                boost::asio::post(strand, [this, id]() { m_clients.erase(id); });
            }
        });

        pc->onGatheringStateChange([this, wpc = make_weak_ptr(pc), id](rtc::PeerConnection::GatheringState state) {
            LOG(info) << "Gathering State: " << state;
            if (state != rtc::PeerConnection::GatheringState::Complete) return;

            auto pc = wpc.lock();
            if (!pc) return;

            auto description = pc->localDescription();
            if (!description) {
                LOG(error) << "Gathering complete for " << id << " but no local description is available";
                return;
            }

            // Gathering complete, send the local description to the peer
            boost::json::object message;
            message["id"] = id;
            message["type"] = description->typeString();
            message["sdp"] = std::string(description.value());

            // `websocket` is reassigned by Streamer::start() when it reconnects on the strand,
            // so the member is read on the strand as well instead of from this callback.
            boost::asio::post(strand, [this, text = boost::json::serialize(message)]() {
                if (websocket) {
                    websocket->send(text);
                }
            });
        });

        client->video = addVideo(pc, 102, 1, "video-stream", "stream", [this, id, wc = make_weak_ptr(client)]() {
            boost::asio::post(strand, [wc]() {
                if (auto c = wc.lock()) {
                    c->setState(Client::State::Ready);
                }
            });
            LOG(info) << "Video from " << id << " opened";
        });

        auto dc = pc->createDataChannel("ping-pong");
        dc->onOpen([wdc = make_weak_ptr(dc)]() {
            if (auto dc = wdc.lock()) {
                dc->send("Ping");
            }
        });
        // No binary handler; every text message is logged and answered with another "Ping".
        dc->onMessage(nullptr, [id, wdc = make_weak_ptr(dc)](std::string msg) {
            LOG(info) << "Message from " << id << " received: " << msg;
            if (auto dc = wdc.lock()) {
                dc->send("Ping");
            }
        });
        client->dataChannel = dc;

        pc->setLocalDescription();
        return client;
    }

    /// Handles one signaling message. Expected to run on `strand`.
    ///
    /// Messages need string fields "id" and "type". A "request" creates a peer connection for
    /// that id; an "answer" applies the "sdp" field as the remote description of an existing
    /// client. Anything malformed is logged and dropped instead of throwing, because the
    /// content comes straight from the network.
    void onWebSocketMesssage(const boost::json::object &message) {
        const boost::json::string *idField = stringField(message, "id");
        const boost::json::string *typeField = stringField(message, "type");
        if (idField == nullptr || typeField == nullptr) return;

        const std::string id(idField->data(), idField->size());
        const std::string type(typeField->data(), typeField->size());

        if (type == "request") {
            // Check first: emplace() would not insert a duplicate key, but its argument would
            // already have built (and then discarded) a complete peer connection.
            if (m_clients.find(id) != m_clients.end()) {
                LOG(info) << "Ignoring request from " << id << ": client already exists";
                return;
            }
            m_clients.emplace(id, createPeerConnection(configuration, id));
        } else if (type == "answer") {
            const auto found = m_clients.find(id);
            if (found == m_clients.end()) return;

            const boost::json::string *sdpField = stringField(message, "sdp");
            if (sdpField == nullptr) {
                LOG(error) << "Answer from " << id << " has no string \"sdp\" field";
                return;
            }

            try {
                const std::string sdp(sdpField->data(), sdpField->size());
                auto pc = found->second->peerConnection();
                auto description = rtc::Description(sdp, type);
                pc->setRemoteDescription(description);
            } catch (const std::exception &e) {
                LOG(error) << "Rejected answer from " << id << ": " << e.what();
            }
        }
    }

    boost::asio::strand<boost::asio::io_context::executor_type> strand;
    rtc::Configuration configuration;
    std::shared_ptr<rtc::WebSocket> websocket;
    std::unordered_map<std::string, std::shared_ptr<Client>> m_clients;
};

Streamer::~Streamer() {
    delete m_d; // deleting a null pointer is a no-op, so no check is needed
}

/// Opens the signaling WebSocket to `ws://<address>:<port>/api/v1/webrtc/signal/server` and
/// installs its handlers. On a WebSocket error the whole procedure is re-run from the strand.
void Streamer::start(const std::string &signalServerAddress, uint16_t signalServerPort) {
    std::string localId = "server";
    LOG(info) << "The local ID is: " << localId;

    rtc::WebSocket::Configuration c;
    c.disableTlsVerification = true;
    m_d->websocket = std::make_shared<rtc::WebSocket>(c);

    m_d->websocket->onOpen([]() { LOG(info) << "WebSocket connected, signaling ready"; });
    m_d->websocket->onClosed([]() { LOG(info) << "WebSocket closed"; });

    m_d->websocket->onError([this, signalServerAddress, signalServerPort](const std::string &error) {
        LOG(error) << "WebSocket failed: " << error;
        // Reconnect from the strand so the socket is not replaced from inside its own callback.
        boost::asio::post(m_d->strand,
                          [this, signalServerAddress, signalServerPort]() { start(signalServerAddress, signalServerPort); });
    });

    m_d->websocket->onMessage([this](std::variant<rtc::binary, std::string> data) {
        if (!std::holds_alternative<std::string>(data)) return;
        auto &text = std::get<std::string>(data);
        LOG(info) << "ws received: " << std::endl << text;

        // The payload is untrusted: a parse failure must not escape this callback.
        boost::json::value value;
        try {
            value = boost::json::parse(text);
        } catch (const std::exception &e) {
            LOG(error) << "Ignoring malformed signaling message: " << e.what();
            return;
        }
        if (!value.is_object()) {
            LOG(error) << "Ignoring signaling message that is not a JSON object";
            return;
        }

        boost::asio::post(m_d->strand,
                          [this, value = std::move(value)]() { m_d->onWebSocketMesssage(value.as_object()); });
    });

    const std::string url = "ws://" + signalServerAddress + ":" + std::to_string(signalServerPort) +
                            "/api/v1/webrtc/signal/" + localId;
    LOG(info) << "URL is " << url;
    m_d->websocket->open(url);
    LOG(info) << "Waiting for signaling to be connected...";
}

/// Queues one encoded video frame for delivery to every client in the Ready state.
///
/// The bytes are copied before this function returns, so the caller may reuse @p data
/// immediately. Null or empty input is ignored.
/// NOTE(review): the packetizer splits on start sequences, so frames are presumably expected
/// in H.264 Annex-B form — confirm with the producer.
void Streamer::push(const uint8_t *data, uint32_t size) {
    if (data == nullptr || size == 0) return;

    const auto *begin = reinterpret_cast<const std::byte *>(data);
    boost::asio::post(m_d->strand, [this, frame = rtc::binary(begin, begin + size)]() {
        for (auto &[id, client] : m_d->m_clients) {
            if (client->state() != Client::State::Ready) continue;

            auto sender = (*client->video)->sender;
            auto track = (*client->video)->track;

            sender->rtpConfig->timestamp = sender->rtpConfig->secondsToTimestamp(secondsSinceEpoch());

            // Request a sender report once more than one second has passed since the last one.
            auto reportElapsedTimestamp = sender->rtpConfig->timestamp - sender->lastReportedTimestamp();
            if (sender->rtpConfig->timestampToSeconds(reportElapsedTimestamp) > 1) {
                sender->setNeedsToReport();
            }

            track->send(frame);
        }
    });
}

/// Creates the private implementation and fills in the ICE configuration (one STUN URL and
/// one TURN server), with automatic negotiation disabled.
Streamer::Streamer(boost::asio::io_context &ioContext) : m_d{new WebRTCStreamerPrivate(ioContext)} {
    rtc::InitLogger(rtc::LogLevel::Info);

    std::string stunServer = "stun:amass.fun:5349"; // ssl
    m_d->configuration.iceServers.emplace_back(stunServer);
    LOG(info) << "STUN server is " << stunServer;

    // NOTE(review): TURN credentials are hard-coded in source; consider loading them from
    // configuration instead.
    rtc::IceServer turnServer("amass.fun", 5349, "amass", "88888888");
    m_d->configuration.iceServers.emplace_back(turnServer);
    m_d->configuration.disableAutoNegotiation = true;
}

} // namespace Danki