Older/Server/ChatRoom/WebSocketChatSession.cpp
2024-01-24 23:19:53 +08:00

89 lines
3.3 KiB
C++

#include "WebSocketChatSession.h"
#include "ChatRoom.h"
#include "StringUtility.h"
#include <boost/json/parse.hpp>
#include <boost/json/serialize.hpp>
#include <iostream>
/// Constructs a session by moving the given TCP socket into the `m_ws` member.
///
/// @param socket Socket to take over; it is moved from and must not be used
///               by the caller afterwards.
WebSocketSession::WebSocketSession(boost::asio::ip::tcp::socket &&socket)
    : m_ws(std::move(socket)) {
    // No further setup is performed at construction time.
}
/// Unregisters this session from the ChatRoom singleton on destruction.
WebSocketSession::~WebSocketSession() {
    // The room is handed the raw `this` pointer, matching what join() received.
    Amass::Singleton<ChatRoom>::instance()->leave(this);
}
/// Completion handler invoked with the result of the WebSocket accept.
///
/// On success it derives the session id from the request target (only for
/// targets that start with "/webrtc"), registers the session with the
/// ChatRoom singleton and issues the first asynchronous read.
///
/// @param ec Result of the accept operation. `operation_aborted` and
///           `websocket::error::closed` are silently ignored; any other
///           error is logged and the handler returns without joining.
void WebSocketSession::onAccept(boost::beast::error_code ec) {
    if (ec) {
        const bool expectedShutdown =
            ec == boost::asio::error::operation_aborted || ec == boost::beast::websocket::error::closed;
        if (!expectedShutdown) {
            // Use the same logger as the rest of this file instead of std::cerr.
            LOG(error) << "accept: " << ec.message();
        }
        return;
    }
    LOG(info) << "accept websocket target: " << m_target;
    // rfind(prefix, 0) can only match at position 0, so this is a pure prefix
    // test and never scans the remainder of the target.
    if (m_target.rfind("/webrtc", 0) == 0) {
        // The last "/"-separated component of the target becomes the id.
        auto splits = Amass::StringUtility::split(m_target, "/");
        if (!splits.empty()) m_id = splits.back();
    }
    auto chatRoom = Amass::Singleton<ChatRoom>::instance();
    chatRoom->join(this);
    // Start the read loop; shared_from_this() keeps the session alive while
    // the read is outstanding.
    m_ws.async_read(m_buffer, boost::beast::bind_front_handler(&WebSocketSession::on_read, shared_from_this()));
}
/// Completion handler for an asynchronous read on `m_ws`.
///
/// A successfully received message is logged, forwarded to the ChatRoom
/// singleton, and the next read is started. On error the read loop stops;
/// `operation_aborted` and `websocket::error::closed` are not logged.
///
/// @param ec Result of the read operation.
/// (The byte-count argument is unused.)
void WebSocketSession::on_read(boost::beast::error_code ec, std::size_t) {
    if (ec) {
        const bool expectedShutdown =
            ec == boost::asio::error::operation_aborted || ec == boost::beast::websocket::error::closed;
        if (!expectedShutdown) {
            LOG(error) << "read: " << ec.message();
        }
        return;
    }

    // Copy the received bytes out of the buffer before it is emptied below.
    auto payload = boost::beast::buffers_to_string(m_buffer.data());
    LOG(info) << payload;

    // Hand the message to the room, which distributes it to the connections.
    Amass::Singleton<ChatRoom>::instance()->send(payload);

    // Drop everything that was read so the next message starts from an empty buffer.
    m_buffer.consume(m_buffer.size());

    // Re-arm the read; shared_from_this() keeps the session alive meanwhile.
    m_ws.async_read(m_buffer, boost::beast::bind_front_handler(&WebSocketSession::on_read, shared_from_this()));
}
/// Schedules a message for delivery to this session's peer.
///
/// The function does not write anything itself: it posts onSend() to the
/// executor of `m_ws`, so the queue and the stream are only manipulated from
/// handlers running on that executor.
/// NOTE(review): this serializes access only if that executor is a strand
/// (or the io_context runs on a single thread) -- confirm where the socket
/// is created.
///
/// @param ss Shared, immutable message text. The posted handler stores its
///           own copy of the pointer, so the caller may release its copy.
void WebSocketSession::send(std::shared_ptr<std::string const> const &ss) {
    // No stream option is read or changed here. Assuming m_ws is a Boost.Beast
    // websocket::stream, the argument-less text() is only a getter, so calling
    // it for effect does nothing; frames go out in the stream's current
    // mode (text unless changed elsewhere).
    boost::asio::post(m_ws.get_executor(),
                      boost::beast::bind_front_handler(&WebSocketSession::onSend, shared_from_this(), ss));
}
/// Enqueues a message and starts writing it if no write is already pending.
///
/// @param ss Shared, immutable message text to append to `m_queue`.
void WebSocketSession::onSend(std::shared_ptr<std::string const> const &ss) {
    // If the queue already holds an entry, a write is pending and this
    // message only needs to be appended.
    const bool writePending = !m_queue.empty();

    // The message is queued unconditionally.
    m_queue.push_back(ss);
    if (writePending) {
        return;
    }

    // Nothing was pending: start writing the message that was just queued.
    const auto &outgoing = *m_queue.front();
    m_ws.async_write(boost::asio::buffer(outgoing),
                     boost::beast::bind_front_handler(&WebSocketSession::on_write, shared_from_this()));
}
/// Completion handler for an asynchronous write on `m_ws`.
///
/// On success the message that was just written is removed from the front of
/// `m_queue` and, if more messages are waiting, the next one is written.
/// On error nothing is removed and no further write is started;
/// `operation_aborted` and `websocket::error::closed` are not logged.
///
/// @param ec Result of the write operation.
/// (The byte-count argument is unused.)
void WebSocketSession::on_write(boost::beast::error_code ec, std::size_t) {
    if (ec) {
        const bool expectedShutdown =
            ec == boost::asio::error::operation_aborted || ec == boost::beast::websocket::error::closed;
        if (!expectedShutdown) {
            // Report through the same logger on_read uses instead of std::cerr.
            LOG(error) << "write: " << ec.message();
        }
        return;
    }

    // The front entry is the message whose write just completed.
    m_queue.erase(m_queue.begin());
    if (m_queue.empty()) {
        return;
    }

    // Continue with the next queued message.
    m_ws.async_write(boost::asio::buffer(*m_queue.front()),
                     boost::beast::bind_front_handler(&WebSocketSession::on_write, shared_from_this()));
}