#include "WebSocketSignalSession.h"
#include "SignalServer.h"
#include "StringUtility.h"
// NOTE(review): the three angle-bracket include targets were lost when this file
// was pasted (only bare `#include` tokens survived). The list below is inferred
// from the names used in this translation unit -- confirm against the original.
#include <boost/json.hpp>
#include <boost/scope/scope_exit.hpp>
#include <iostream>
#include <memory>
#include <string>
#include <utility>

namespace {

// Returns true for the error codes that merely signal that the connection is
// going away (cancelled operation or closed WebSocket); these are not reported.
bool isExpectedShutdown(const boost::beast::error_code &ec) {
    return ec == boost::asio::error::operation_aborted || ec == boost::beast::websocket::error::closed;
}

} // namespace

// Takes ownership of the accepted TCP socket and registers this session with
// the server under `id`.
WebSocketSignalSession::WebSocketSignalSession(boost::asio::ip::tcp::socket &&socket, SignalServer &server,
                                               const std::string &id)
    : m_ws(std::move(socket)), m_server(server), m_id(id) {
    m_server.join(m_id, this);
}

// Removes this session's id from the server.
WebSocketSignalSession::~WebSocketSignalSession() {
    m_server.leave(m_id);
}

// Completion handler for the WebSocket accept. On success it starts the first
// asynchronous read; on failure it logs (unless the error is an expected
// shutdown) and returns without scheduling further work.
void WebSocketSignalSession::onAccept(boost::beast::error_code ec) {
    if (ec) {
        if (isExpectedShutdown(ec)) return;
        // Reported through LOG like every other error path in this file.
        LOG(error) << "accept: " << ec.message();
        return;
    }

    LOG(info) << "accept websocket target: " << m_target << ", id: " << m_id;

    // Read a message
    m_ws.async_read(m_buffer,
                    boost::beast::bind_front_handler(&WebSocketSignalSession::onRead, shared_from_this()));
}

// Completion handler for async_read. Handles one text message and then re-arms
// the read:
//   - a message with a string "id" is forwarded to the client registered under
//     that id, with "id" rewritten to this session's id;
//   - a message with "type" == "ping" is answered with {"type":"pong"};
//   - anything else is only logged.
// Input that is not valid JSON, or whose root is not an object, is logged and
// dropped instead of letting a Boost.JSON exception escape the handler.
void WebSocketSignalSession::onRead(const boost::beast::error_code &error, std::size_t /*bytesTransferred*/) {
    if (error) {
        if (isExpectedShutdown(error)) return;
        LOG(error) << error << ": " << error.message();
        return;
    }

    // On every exit path below: drop the consumed message and wait for the next
    // one. Deactivated when this handler closes the connection itself.
    boost::scope::scope_exit rearmRead([this] {
        m_buffer.consume(m_buffer.size()); // Clear the buffer
        m_ws.async_read(m_buffer,
                        boost::beast::bind_front_handler(&WebSocketSignalSession::onRead, shared_from_this()));
    });

    if (!m_ws.got_text()) {
        LOG(warning) << "current not supported binary message.";
        return;
    }

    const auto message = boost::beast::buffers_to_string(m_buffer.data());

    // Non-throwing parse: the payload comes from the remote peer.
    boost::beast::error_code parseError;
    auto rootValue = boost::json::parse(message, parseError);
    if (parseError) {
        LOG(warning) << "invalid json message: " << parseError.message();
        return;
    }
    if (!rootValue.is_object()) {
        LOG(warning) << "wrong format.";
        return;
    }
    auto &root = rootValue.as_object();

    if (const auto *idValue = root.if_contains("id")) {
        if (!idValue->is_string()) {
            LOG(warning) << "wrong format.";
            // The stream is being closed, so do not queue another read on it.
            rearmRead.set_active(false);
            boost::beast::error_code closeError;
            m_ws.close(boost::beast::websocket::close_code::normal, closeError);
            if (closeError && !isExpectedShutdown(closeError)) {
                LOG(warning) << "close: " << closeError.message();
            }
            return;
        }
        const auto &idString = idValue->as_string();
        const std::string destinationId(idString.data(), idString.size());
        auto destination = m_server.client(destinationId);
        if (destination == nullptr) {
            LOG(info) << "client " << destinationId << " not found.";
        } else {
            // Replace the destination id with the sender id before forwarding.
            root["id"] = m_id;
            // NOTE(review): template argument reconstructed (stripped in the paste).
            auto reply = std::make_shared<std::string>(boost::json::serialize(root));
            destination->send(reply);
        }
        LOG(info) << message;
    } else if (const auto *typeValue = root.if_contains("type")) {
        // A non-string "type" is ignored rather than throwing from as_string().
        if (typeValue->is_string() && typeValue->as_string() == "ping") {
            boost::json::object pong;
            pong["type"] = "pong";
            send(boost::json::serialize(pong));
        }
    } else {
        LOG(info) << message;
    }
}

// Queues `ss` for transmission. The work is posted to the stream's executor;
// if the session no longer exists when the posted task runs, the message is
// dropped. A null `ss` is ignored.
// NOTE(review): the shared_ptr template argument was stripped in the paste;
// std::string is inferred from usage -- confirm it matches the header
// declaration (it may be `const std::string`).
void WebSocketSignalSession::send(const std::shared_ptr<std::string> &ss) {
    if (!ss) return; // nothing to send; would otherwise be dereferenced below

    boost::asio::post(m_ws.get_executor(), [weakSelf = weak_from_this(), ss]() {
        // lock() once instead of expired()+lock(): the session could be
        // destroyed between the two calls.
        const auto self = weakSelf.lock();
        if (!self) return;

        self->m_queue.push_back(ss);
        if (self->m_queue.size() > 1) return; // a previous write is already in progress

        // text(true) selects text frames; the zero-argument text() is only a getter.
        self->m_ws.text(true);
        self->m_ws.async_write(boost::asio::buffer(*self->m_queue.front()),
                               boost::beast::bind_front_handler(&WebSocketSignalSession::on_write, self));
    });
}

// Convenience overload: moves `message` into a shared string and queues it.
void WebSocketSignalSession::send(std::string &&message) {
    return send(std::make_shared<std::string>(std::move(message)));
}

// Completion handler for async_write. Pops the message that was just written
// and, if more are queued, starts writing the next one.
void WebSocketSignalSession::on_write(boost::beast::error_code ec, std::size_t) {
    if (ec) {
        // Don't report these
        if (isExpectedShutdown(ec)) return;
        LOG(error) << "write: " << ec.message();
        return;
    }

    // Remove the string from the queue
    m_queue.erase(m_queue.begin());

    if (!m_queue.empty()) {
        m_ws.async_write(boost::asio::buffer(*m_queue.front()),
                         boost::beast::bind_front_handler(&WebSocketSignalSession::on_write, shared_from_this()));
    }
}