#include "WebSocketSignalSession.h"
#include "SignalServer.h"
#include "StringUtility.h"
// NOTE(review): the two angle-bracket include targets were missing from the
// reviewed copy of this file. These two cover the names used below
// (boost::json::* and std::cerr) -- confirm against the original.
#include <boost/json.hpp>
#include <iostream>

// NOTE(review): template arguments were also missing from the reviewed copy.
// `Amass::Singleton<SignalServer>`, `std::shared_ptr<std::string const>` and
// `std::make_shared<std::string>` are reconstructions -- confirm them against
// WebSocketSignalSession.h and SignalServer.h.

// Takes ownership of an already-connected TCP socket and wraps it in the
// WebSocket stream.
WebSocketSignalSession::WebSocketSignalSession(boost::asio::ip::tcp::socket &&socket) : m_ws(std::move(socket)) {
}

// Removes this session's id from the signal server when the session dies.
WebSocketSignalSession::~WebSocketSignalSession() {
    auto server = Amass::Singleton<SignalServer>::instance();
    server->leave(m_id);
}

// Completion handler for the WebSocket accept.
// On success: derives the session id from the last "/"-separated segment of
// the request target (only when the target starts with "/webrtc"), registers
// the session with the signal server, and starts the first read.
void WebSocketSignalSession::onAccept(boost::beast::error_code ec) {
    if (ec) {
        // Aborted operations and an orderly close are not worth reporting.
        if (ec == boost::asio::error::operation_aborted || ec == boost::beast::websocket::error::closed) return;
        std::cerr << "accept: " << ec.message() << "\n";
        return;
    }
    LOG(info) << "accept websocket target: " << m_target;

    if (m_target.find("/webrtc") == 0) {
        auto splits = Amass::StringUtility::split(m_target, "/");
        if (!splits.empty()) m_id = splits.back();
    }

    auto server = Amass::Singleton<SignalServer>::instance();
    server->join(m_id, this);

    // Read a message
    m_ws.async_read(m_buffer,
                    boost::beast::bind_front_handler(&WebSocketSignalSession::on_read, shared_from_this()));
}

// Completion handler for a read.
// The payload is expected to be a JSON object. When it carries a string "id",
// that id names the destination client: the message is forwarded to it with
// "id" rewritten to this session's own id. A payload that is not valid JSON,
// is not an object, or has a non-string "id" closes the connection.
void WebSocketSignalSession::on_read(boost::beast::error_code ec, std::size_t) {
    // Handle the error, if any
    if (ec) {
        // Don't report these
        if (ec == boost::asio::error::operation_aborted || ec == boost::beast::websocket::error::closed) return;
        LOG(error) << "read: " << ec.message();
        return;
    }

    // Closes asynchronously: the throwing, blocking close() overload must not
    // run inside a completion handler. The captured shared_ptr keeps the
    // session alive until the close handshake finishes.
    auto rejectMessage = [this] {
        LOG(warning) << "wrong format.";
        m_ws.async_close(boost::beast::websocket::close_code::normal,
                         [self = shared_from_this()](boost::beast::error_code) {});
    };

    auto message = boost::beast::buffers_to_string(m_buffer.data());
    LOG(info) << message;

    // The payload comes from the peer, so parse with the non-throwing
    // overload: an exception escaping this handler would propagate out of the
    // I/O loop.
    boost::system::error_code parseError;
    auto rootObject = boost::json::parse(message, parseError);
    if (parseError || !rootObject.is_object()) {
        rejectMessage();
        return;
    }

    auto &root = rootObject.as_object();
    if (root.contains("id")) {
        if (!root.at("id").is_string()) {
            rejectMessage();
            return;
        }
        auto server = Amass::Singleton<SignalServer>::instance();
        auto destinationId = std::string(root["id"].as_string());
        auto destination = server->client(destinationId);
        if (destination == nullptr) {
            LOG(info) << "client " << destinationId << " not found.";
        } else {
            // Tell the receiver who the message came from.
            root["id"] = m_id;
            auto reply = std::make_shared<std::string>(boost::json::serialize(root));
            destination->send(reply);
        }
    }

    m_buffer.consume(m_buffer.size()); // Clear the buffer
    m_ws.async_read(m_buffer,
                    boost::beast::bind_front_handler(&WebSocketSignalSession::on_read, shared_from_this()));
}

// Queues `ss` for delivery to this session's peer. May be called from another
// session's handler, so the actual queue manipulation is posted to this
// stream's executor instead of being done inline.
void WebSocketSignalSession::send(std::shared_ptr<std::string const> const &ss) {
    // A bare `m_ws.text()` call used to sit here. That overload is the getter
    // and its result was discarded, so it had no effect; it also touched the
    // stream before the work was posted. It has been removed.
    boost::asio::post(m_ws.get_executor(),
                      boost::beast::bind_front_handler(&WebSocketSignalSession::onSend, shared_from_this(), ss));
}

// Runs on the stream's executor: appends the message to the outgoing queue
// and starts a write when none is in flight.
void WebSocketSignalSession::onSend(std::shared_ptr<std::string const> const &ss) {
    // Always add to queue
    m_queue.push_back(ss);

    // Are we already writing? A non-empty queue before this push means a
    // write is in flight and on_write will pick this message up.
    if (m_queue.size() > 1) return;

    // We are not currently writing, so send this immediately
    m_ws.async_write(boost::asio::buffer(*m_queue.front()),
                     boost::beast::bind_front_handler(&WebSocketSignalSession::on_write, shared_from_this()));
}

// Completion handler for a write: drops the message that was just sent and
// starts writing the next queued one, if any.
void WebSocketSignalSession::on_write(boost::beast::error_code ec, std::size_t) {
    // Handle the error, if any
    if (ec) {
        // Don't report these
        if (ec == boost::asio::error::operation_aborted || ec == boost::beast::websocket::error::closed) return;
        std::cerr << "write: " << ec.message() << "\n";
        return;
    }

    // Remove the string from the queue
    m_queue.erase(m_queue.begin());

    if (!m_queue.empty())
        m_ws.async_write(boost::asio::buffer(*m_queue.front()),
                         boost::beast::bind_front_handler(&WebSocketSignalSession::on_write, shared_from_this()));
}