Older/Server/WebRTC/WebSocketSignalSession.cpp

105 lines
4.0 KiB
C++
Raw Normal View History

2024-01-24 23:19:53 +08:00
#include "WebSocketSignalSession.h"
#include "SignalServer.h"
#include "StringUtility.h"
#include <boost/json/parse.hpp>
#include <boost/json/serialize.hpp>
2025-01-15 15:24:47 +08:00
#include <boost/scope/scope_exit.hpp>
2024-01-24 23:19:53 +08:00
2025-01-12 00:46:14 +08:00
/// Builds a session around an accepted TCP socket and registers it with the
/// signal server.
///
/// @param socket Connected TCP socket; moved into the WebSocket stream `m_ws`.
/// @param server Signal server the session registers with via `join`.
/// @param id     Identifier under which this session is registered.
WebSocketSignalSession::WebSocketSignalSession(boost::asio::ip::tcp::socket &&socket,
                                               SignalServer &server,
                                               const std::string &id)
    : m_ws(std::move(socket)),
      m_server(server),
      m_id(id) {
    // The server receives a raw `this` pointer; the destructor undoes this
    // registration with `leave(m_id)`.
    m_server.join(m_id, this);
}
/// Removes this session's id from the signal server, mirroring the `join`
/// call made in the constructor.
WebSocketSignalSession::~WebSocketSignalSession() {
    // NOTE(review): `leave` is keyed by id only; if the server lets a newer
    // session reuse the same id, confirm this does not drop the newer entry.
    m_server.leave(m_id);
}
/// Handles the outcome of the WebSocket accept step (presumably bound as the
/// accept completion handler by code outside this section — confirm).
///
/// On success the first asynchronous read is started; on failure the error is
/// logged unless it merely signals cancellation or an orderly close.
///
/// @param ec Result of the accept operation.
void WebSocketSignalSession::onAccept(boost::beast::error_code ec) {
    if (ec) {
        // Cancellation and a closed stream are expected; do not report them.
        const bool expected = ec == boost::asio::error::operation_aborted ||
                              ec == boost::beast::websocket::error::closed;
        if (!expected) {
            // Use the same logging facility as the rest of this file instead of std::cerr.
            LOG(error) << "accept: " << ec.message();
        }
        return;
    }
    LOG(info) << "accept websocket target: " << m_target << ", id: " << m_id;
    // Read a message; the bound shared_ptr keeps the session alive while the read is pending.
    m_ws.async_read(m_buffer, boost::beast::bind_front_handler(&WebSocketSignalSession::onRead, shared_from_this()));
}
2025-01-12 00:46:14 +08:00
/// Completion handler for `async_read`: interprets one text message as JSON
/// and either forwards it to another client or answers a ping.
///
/// Message handling:
///  - An object with a string "id": looked up via `m_server.client(id)`. If
///    found, "id" is rewritten to this session's id and the serialized object
///    is sent to that client.
///  - An object with "type" == "ping": a {"type":"pong"} reply is sent back.
///  - Anything else that parses as an object is only logged.
///
/// The payload comes from the remote peer, so malformed JSON or unexpected
/// value types are logged and ignored instead of throwing out of the handler.
///
/// @param error            Result of the read operation.
/// @param bytesTransferred Number of bytes read (unused).
void WebSocketSignalSession::onRead(const boost::beast::error_code &error, std::size_t bytesTransferred) {
    if (error) {
        // Cancellation and a closed stream are expected; do not report them.
        if (error == boost::asio::error::operation_aborted || error == boost::beast::websocket::error::closed) return;
        LOG(error) << error << ": " << error.message();
        return;
    }
    // On every exit path below: drop the consumed message and start the next read.
    boost::scope::scope_exit raii([this] {
        m_buffer.consume(m_buffer.size()); // Clear the buffer
        m_ws.async_read(m_buffer, boost::beast::bind_front_handler(&WebSocketSignalSession::onRead, shared_from_this()));
    });
    if (!m_ws.got_text()) {
        LOG(warning) << "current not supported binary message.";
        return;
    }
    auto message = boost::beast::buffers_to_string(m_buffer.data());

    // Non-throwing parse: the peer controls this text.
    boost::beast::error_code parseError;
    auto rootValue = boost::json::parse(message, parseError);
    if (parseError) {
        LOG(warning) << "invalid json message: " << parseError.message();
        return;
    }
    if (!rootValue.is_object()) {
        LOG(warning) << "wrong format.";
        return;
    }
    auto &root = rootValue.as_object();

    if (root.contains("id")) {
        auto &idValue = root.at("id");
        if (!idValue.is_string()) {
            LOG(warning) << "wrong format.";
            // error_code overload so a failed close cannot throw out of the handler.
            boost::beast::error_code closeError;
            m_ws.close(boost::beast::websocket::close_code::normal, closeError);
            if (closeError) {
                LOG(warning) << "close: " << closeError.message();
            }
            return;
        }
        auto destinationId = std::string(idValue.as_string());
        auto destination = m_server.client(destinationId);
        if (destination == nullptr) {
            LOG(info) << "client " << destinationId << " not found.";
        } else {
            // Replace the destination id with the sender's id before forwarding.
            root["id"] = m_id;
            auto reply = std::make_shared<std::string>(boost::json::serialize(root));
            destination->send(reply);
        }
        LOG(info) << message;
    } else if (root.contains("type")) {
        auto &typeValue = root.at("type");
        if (!typeValue.is_string()) {
            LOG(warning) << "wrong format.";
            return;
        }
        if (typeValue.as_string() == "ping") {
            boost::json::object object;
            object["type"] = "pong";
            send(boost::json::serialize(object));
        }
    } else {
        LOG(info) << message;
    }
}
2025-01-15 15:24:47 +08:00
void WebSocketSignalSession::send(const std::shared_ptr<std::string> &ss) {
boost::asio::post(m_ws.get_executor(), [ptr = weak_from_this(), ss]() {
if (ptr.expired()) return;
auto self = ptr.lock();
self->m_queue.push_back(ss);
if (self->m_queue.size() > 1) return; // 之前已经有发送了
self->m_ws.text();
self->m_ws.async_write(boost::asio::buffer(*self->m_queue.front()),
boost::beast::bind_front_handler(&WebSocketSignalSession::on_write, self->shared_from_this()));
});
2024-01-24 23:19:53 +08:00
}
2025-01-15 15:24:47 +08:00
void WebSocketSignalSession::send(std::string &&message) {
return send(std::make_shared<std::string>(std::move(message)));
2024-01-24 23:19:53 +08:00
}
/// Completion handler for `async_write`: drops the message that was just
/// written and starts writing the next queued one, if any.
///
/// @param ec Result of the write operation; the byte count is unused.
void WebSocketSignalSession::on_write(boost::beast::error_code ec, std::size_t) {
    if (ec) {
        // Cancellation and a closed stream are expected; do not report them.
        const bool expected = ec == boost::asio::error::operation_aborted ||
                              ec == boost::beast::websocket::error::closed;
        if (!expected) {
            // Use the same logging facility as the rest of this file instead of std::cerr.
            LOG(error) << "write: " << ec.message();
        }
        return;
    }
    // Remove the string from the queue
    m_queue.erase(m_queue.begin());
    if (m_queue.empty()) return;
    // The front entry stays in the queue so its buffer outlives the write.
    m_ws.async_write(boost::asio::buffer(*m_queue.front()),
                     boost::beast::bind_front_handler(&WebSocketSignalSession::on_write, shared_from_this()));
}