188 lines
8.2 KiB
C++
188 lines
8.2 KiB
C++
#include "Streamer.h"
#include "Core/Logger.h"
#include "Helpers.h"
#include <boost/asio/io_context.hpp>
#include <boost/asio/strand.hpp>
#include <boost/json/parse.hpp>
#include <boost/json/serialize.hpp>
#include <boost/system/error_code.hpp>
#include <rtc/rtc.hpp>
#include <exception>
|
|
|
|
class WebRTCStreamerPrivate {
|
|
public:
|
|
WebRTCStreamerPrivate(boost::asio::io_context &ioContext) : strand{ioContext.get_executor()} {
|
|
}
|
|
|
|
std::shared_ptr<ClientTrackData> addVideo(const std::shared_ptr<rtc::PeerConnection> pc, const uint8_t payloadType,
|
|
const uint32_t ssrc, const std::string cname, const std::string msid,
|
|
const std::function<void(void)> onOpen) {
|
|
using namespace std::chrono;
|
|
auto video = rtc::Description::Video(cname);
|
|
video.addH264Codec(payloadType);
|
|
video.addSSRC(ssrc, cname, msid, cname);
|
|
auto track = pc->addTrack(video);
|
|
|
|
auto rtpConfig = std::make_shared<rtc::RtpPacketizationConfig>(
|
|
ssrc, cname, payloadType, rtc::H264RtpPacketizer::defaultClockRate); // create RTP configuration
|
|
rtpConfig->startTimestamp = rtpConfig->secondsToTimestamp(
|
|
static_cast<double>(duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count()) / 1000);
|
|
|
|
auto packetizer = std::make_shared<rtc::H264RtpPacketizer>(rtc::NalUnit::Separator::StartSequence,
|
|
rtpConfig); // create packetizer
|
|
|
|
auto srReporter = std::make_shared<rtc::RtcpSrReporter>(rtpConfig); // add RTCP SR handler
|
|
packetizer->addToChain(srReporter);
|
|
|
|
auto nackResponder = std::make_shared<rtc::RtcpNackResponder>(); // add RTCP NACK handler
|
|
packetizer->addToChain(nackResponder);
|
|
|
|
track->setMediaHandler(packetizer); // set handler
|
|
track->onOpen(onOpen);
|
|
auto trackData = std::make_shared<ClientTrackData>();
|
|
trackData->track = track;
|
|
trackData->sender = srReporter;
|
|
return trackData;
|
|
}
|
|
|
|
std::shared_ptr<Client> createPeerConnection(const rtc::Configuration &config, std::string id) {
|
|
auto pc = std::make_shared<rtc::PeerConnection>(config);
|
|
auto client = std::make_shared<Client>(pc);
|
|
|
|
pc->onStateChange([this, id](rtc::PeerConnection::State state) {
|
|
LOG(info) << "State: " << state;
|
|
if (state == rtc::PeerConnection::State::Disconnected || state == rtc::PeerConnection::State::Failed ||
|
|
state == rtc::PeerConnection::State::Closed) {
|
|
boost::asio::post(strand, [this, id]() { m_clients.erase(id); }); // remove disconnected client
|
|
}
|
|
});
|
|
|
|
pc->onGatheringStateChange([this, wpc = make_weak_ptr(pc), id](rtc::PeerConnection::GatheringState state) {
|
|
LOG(info) << "Gathering State: " << state;
|
|
if (state == rtc::PeerConnection::GatheringState::Complete) {
|
|
if (auto pc = wpc.lock()) {
|
|
auto description = pc->localDescription();
|
|
boost::json::object message;
|
|
message["id"] = id;
|
|
message["type"] = description->typeString();
|
|
message["sdp"] = std::string(description.value());
|
|
websocket->send(boost::json::serialize(message)); // Gathering complete, send answer
|
|
}
|
|
}
|
|
});
|
|
|
|
client->video = addVideo(pc, 102, 1, "video-stream", "stream", [this, id, wc = make_weak_ptr(client)]() {
|
|
boost::asio::post(strand, [this, wc]() {
|
|
if (auto c = wc.lock()) {
|
|
c->setState(Client::State::Ready);
|
|
}
|
|
});
|
|
LOG(info) << "Video from " << id << " opened";
|
|
});
|
|
|
|
auto dc = pc->createDataChannel("ping-pong");
|
|
dc->onOpen([id, wdc = make_weak_ptr(dc)]() {
|
|
if (auto dc = wdc.lock()) {
|
|
dc->send("Ping");
|
|
}
|
|
});
|
|
|
|
dc->onMessage(nullptr, [id, wdc = make_weak_ptr(dc)](std::string msg) {
|
|
LOG(info) << "Message from " << id << " received: " << msg;
|
|
if (auto dc = wdc.lock()) {
|
|
dc->send("Ping");
|
|
}
|
|
});
|
|
client->dataChannel = dc;
|
|
|
|
pc->setLocalDescription();
|
|
return client;
|
|
}
|
|
void onWebSocketMesssage(const boost::json::object &message) {
|
|
if (!message.contains("id") || !message.contains("type")) return;
|
|
std::string id = std::string(message.at("id").as_string());
|
|
std::string type = std::string(message.at("type").as_string());
|
|
if (type == "request") {
|
|
m_clients.emplace(id, createPeerConnection(configuration, id));
|
|
} else if (type == "answer") {
|
|
if (m_clients.count(id)) {
|
|
auto &peer = m_clients.at(id);
|
|
std::string sdp = std::string(message.at("sdp").as_string());
|
|
auto pc = peer->peerConnection();
|
|
auto description = rtc::Description(sdp, type);
|
|
pc->setRemoteDescription(description);
|
|
}
|
|
}
|
|
}
|
|
boost::asio::strand<boost::asio::io_context::executor_type> strand;
|
|
rtc::Configuration configuration;
|
|
std::shared_ptr<rtc::WebSocket> websocket;
|
|
std::unordered_map<std::string, std::shared_ptr<Client>> m_clients;
|
|
};
|
|
|
|
/// Releases the private implementation object owned through `m_d`.
Streamer::~Streamer() {
    // Deleting a null pointer is a no-op, so no explicit null check is required.
    delete m_d;
}
|
|
|
|
void Streamer::start(const std::string &signalServerAddress, uint16_t signalServerPort) {
|
|
using namespace std::chrono_literals;
|
|
|
|
std::string localId = "server";
|
|
LOG(info) << "The local ID is: " << localId;
|
|
|
|
rtc::WebSocket::Configuration c;
|
|
c.disableTlsVerification = true;
|
|
m_d->websocket = std::make_shared<rtc::WebSocket>(c);
|
|
m_d->websocket->onOpen([]() { LOG(info) << "WebSocket connected, signaling ready"; });
|
|
m_d->websocket->onClosed([]() { LOG(info) << "WebSocket closed"; });
|
|
m_d->websocket->onError([](const std::string &error) { LOG(error) << "WebSocket failed: " << error; });
|
|
|
|
m_d->websocket->onMessage([this](std::variant<rtc::binary, std::string> data) {
|
|
if (!std::holds_alternative<std::string>(data)) return;
|
|
auto &text = std::get<std::string>(data);
|
|
LOG(info) << "ws received: " << std::endl << text;
|
|
auto value = boost::json::parse(text);
|
|
boost::asio::post(m_d->strand,
|
|
[this, value = std::move(value)]() { m_d->onWebSocketMesssage(value.as_object()); });
|
|
});
|
|
|
|
const std::string url =
|
|
"wss://" + signalServerAddress + ":" + std::to_string(signalServerPort) + "/api/v1/webrtc/signal/" + localId;
|
|
LOG(info) << "URL is " << url;
|
|
m_d->websocket->open(url);
|
|
LOG(info) << "Waiting for signaling to be connected...";
|
|
}
|
|
|
|
void Streamer::push(const uint8_t *data, uint32_t size) {
|
|
using namespace std::chrono;
|
|
boost::asio::post(m_d->strand, [this, frame = rtc::binary(reinterpret_cast<const rtc::byte *>(data),
|
|
reinterpret_cast<const rtc::byte *>(data) + size)]() {
|
|
for (auto &[id, client] : m_d->m_clients) {
|
|
if (client->state() != Client::State::Ready) continue;
|
|
auto sender = (*client->video)->sender;
|
|
auto track = (*client->video)->track;
|
|
|
|
sender->rtpConfig->timestamp = sender->rtpConfig->secondsToTimestamp(
|
|
static_cast<double>(duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count()) /
|
|
1000);
|
|
auto reportElapsedTimestamp = sender->rtpConfig->timestamp - sender->lastReportedTimestamp();
|
|
if (sender->rtpConfig->timestampToSeconds(reportElapsedTimestamp) > 1) {
|
|
sender->setNeedsToReport();
|
|
}
|
|
track->send(frame);
|
|
}
|
|
});
|
|
}
|
|
|
|
/// Creates the private implementation and fills in its peer-connection configuration.
///
/// Enables debug-level rtc logging, registers one ICE server given as a URL and
/// one given as host/port/credentials, and turns automatic negotiation off.
///
/// @param ioContext I/O context handed to the private implementation.
Streamer::Streamer(boost::asio::io_context &ioContext) : m_d{new WebRTCStreamerPrivate(ioContext)} {
    rtc::InitLogger(rtc::LogLevel::Debug);

    auto &settings = m_d->configuration;

    // First ICE server: a STUN URL (the original comment marked this endpoint as "ssl").
    const std::string stunServer = "stun:amass.fun:5349";
    settings.iceServers.emplace_back(stunServer);
    LOG(info) << "STUN server is " << stunServer;

    // Second ICE server: host, port, user name and password.
    // NOTE(review): the credentials are hard-coded in source -- consider loading them from configuration.
    const rtc::IceServer turnServer("amass.fun", 5349, "amass", "88888888");
    settings.iceServers.emplace_back(turnServer);

    // Negotiation is started explicitly elsewhere, so automatic negotiation is switched off.
    settings.disableAutoNegotiation = true;
}
|