#include "WebsocketSession.h"

#include <iostream>
#include <string>
#include <utility>

// NOTE(review): the template arguments of the std::shared_ptr parameters below
// (`SharedState`, `std::string const`) are not visible in this file and are
// assumed here -- confirm them against the declarations in WebsocketSession.h.

namespace
{

/// Returns true for the errors that the completion handlers in this file
/// deliberately do not report: an aborted operation or a closed WebSocket.
bool isUnreportedError(boost::beast::error_code const &ec)
{
    return ec == boost::asio::error::operation_aborted || ec == boost::beast::websocket::error::closed;
}

} // namespace

/// Wraps an already-connected TCP socket in a WebSocket stream and keeps a
/// reference to the shared state that tracks the active sessions.
WebSocketSession::WebSocketSession(boost::asio::ip::tcp::socket &&socket, std::shared_ptr<SharedState> const &state)
    : m_ws(std::move(socket)), m_state(state)
{
}

/// Removes this session from the shared state's list of active sessions.
WebSocketSession::~WebSocketSession()
{
    m_state->leave(this);
}

/// Completion handler for the WebSocket accept operation.
///
/// On success the session joins the shared state and starts the first read.
/// On failure the session does nothing further; the error is printed to
/// std::cerr unless it is one of the unreported ones.
///
/// @param ec Result of the accept operation.
void WebSocketSession::onAccept(boost::beast::error_code ec)
{
    if (ec)
    {
        if (!isUnreportedError(ec))
            std::cerr << "accept: " << ec.message() << "\n";
        return;
    }

    // Add this session to the list of active sessions
    m_state->join(this);

    // Read a message. shared_from_this() keeps the session alive until the
    // handler has run.
    m_ws.async_read(m_buffer, boost::beast::bind_front_handler(&WebSocketSession::on_read, shared_from_this()));
}

/// Completion handler for async_read.
///
/// Logs the received message, hands it to the shared state for sending to all
/// connections, empties the read buffer and starts the next read. On error no
/// further read is started.
///
/// @param ec Result of the read operation.
void WebSocketSession::on_read(boost::beast::error_code ec, std::size_t)
{
    if (ec)
    {
        if (!isUnreportedError(ec))
            LOG(error) << "read: " << ec.message();
        return;
    }

    // Convert the buffer contents once and reuse the string for both the log
    // line and the broadcast.
    std::string message = boost::beast::buffers_to_string(m_buffer.data());
    LOG(info) << message;

    // Send to all connections
    m_state->send(std::move(message));

    // Clear the buffer
    m_buffer.consume(m_buffer.size());

    // Read another message
    m_ws.async_read(m_buffer, boost::beast::bind_front_handler(&WebSocketSession::on_read, shared_from_this()));
}

/// Queues a message for delivery to this session's peer.
///
/// The work is posted to the stream's executor so that the members of `this`
/// are only touched from there; this function itself does not access the
/// stream's state.
/// NOTE(review): this only serialises access if the socket was created on a
/// strand -- confirm where the socket is constructed.
///
/// @param ss Shared message payload; a reference is held until it is written.
void WebSocketSession::send(std::shared_ptr<std::string const> const &ss)
{
    boost::asio::post(m_ws.get_executor(),
                      boost::beast::bind_front_handler(&WebSocketSession::onSend, shared_from_this(), ss));
}

/// Runs on the stream's executor: appends the message to the outgoing queue
/// and starts a write if none is in progress.
///
/// @param ss Shared message payload to enqueue.
void WebSocketSession::onSend(std::shared_ptr<std::string const> const &ss)
{
    // Always add to queue
    m_queue.push_back(ss);

    // More than one entry means a write is already in flight; on_write will
    // pick this message up when the earlier ones have been sent.
    if (m_queue.size() > 1)
        return;

    // Select text frames explicitly. text() without an argument only queries
    // the option, so the mode has to be set with text(true); doing it here
    // keeps the access on the stream's executor and outside any active write.
    m_ws.text(true);

    // We are not currently writing, so send this immediately
    m_ws.async_write(boost::asio::buffer(*m_queue.front()),
                     boost::beast::bind_front_handler(&WebSocketSession::on_write, shared_from_this()));
}

/// Completion handler for async_write.
///
/// Drops the message that was just written from the front of the queue and,
/// if more messages are pending, starts writing the next one. On error no
/// further write is started; the error is printed to std::cerr unless it is
/// one of the unreported ones.
///
/// @param ec Result of the write operation.
void WebSocketSession::on_write(boost::beast::error_code ec, std::size_t)
{
    if (ec)
    {
        if (!isUnreportedError(ec))
            std::cerr << "write: " << ec.message() << "\n";
        return;
    }

    // Remove the string from the queue
    m_queue.erase(m_queue.begin());

    // Send the next message if any
    if (!m_queue.empty())
        m_ws.async_write(boost::asio::buffer(*m_queue.front()),
                         boost::beast::bind_front_handler(&WebSocketSession::on_write, shared_from_this()));
}