From cc63303cbbd6912b71b002bb847c516433f1c2bc Mon Sep 17 00:00:00 2001 From: amass <168062547@qq.com> Date: Fri, 14 Mar 2025 17:04:28 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E4=BF=A1=E4=BB=A4=E5=A4=84?= =?UTF-8?q?=E7=90=86=E4=BB=A3=E7=A0=81=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .vscode/settings.json | 4 ++ Application.cpp | 3 + Application.h | 2 + CMakeLists.txt | 6 ++ WebRTC/SignalServer.cpp | 39 +++++++++++ WebRTC/SignalServer.h | 25 +++++++ WebRTC/WebSocketSignalSession.cpp | 106 ++++++++++++++++++++++++++++++ WebRTC/WebSocketSignalSession.h | 58 ++++++++++++++++ 8 files changed, 243 insertions(+) create mode 100644 .vscode/settings.json create mode 100644 WebRTC/SignalServer.cpp create mode 100644 WebRTC/SignalServer.h create mode 100644 WebRTC/WebSocketSignalSession.cpp create mode 100644 WebRTC/WebSocketSignalSession.h diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..293fcec --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,4 @@ +{ + "editor.formatOnSaveMode": "modificationsIfAvailable", + "editor.formatOnSave": true +} \ No newline at end of file diff --git a/Application.cpp b/Application.cpp index 7f2669f..cf585c3 100644 --- a/Application.cpp +++ b/Application.cpp @@ -10,6 +10,7 @@ #include "SessionStore.h" #include "Settings.h" #include "WeChat/Corporation/Context.h" +#include "WebRTC/SignalServer.h" #include namespace Older { @@ -37,6 +38,8 @@ Application::Application() : m_d{new ApplicationPrivate()} { m_corporationContext = Singleton::construct(*m_ioContext->ioContext()); m_corporationContext->start(); + m_signalServer = std::make_shared(*this); + m_d->router->insert("/api/v1/notify", [this](HttpSession &session, const Request &request, const matches &matches) { auto manager = Singleton::instance(); if (manager) { diff --git a/Application.h b/Application.h index 5a4f743..771d756 100644 --- a/Application.h +++ b/Application.h @@ -24,6 +24,7 @@ class ApplicationPrivate; class HttpSession; class Database; class SessionStore; +class SignalServer; class Application : public std::enable_shared_from_this { public: @@ -47,6 +48,7 @@ private: std::shared_ptr m_sessionStore; std::shared_ptr m_database; std::shared_ptr m_corporationContext; + std::shared_ptr m_signalServer; }; } // namespace Older #endif // __APPLICATION_H__ \ No newline at end of file diff --git a/CMakeLists.txt b/CMakeLists.txt index 4be7c48..a3236d7 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,3 +1,6 @@ +set(CMAKE_CXX_STANDARD 20) +set(CMAKE_CXX_STANDARD_REQUIRED ON) + find_package(Boost REQUIRED COMPONENTS json) find_package(OpenSSL REQUIRED) @@ -6,6 +9,9 @@ add_subdirectory(Base) add_subdirectory(UnitTest) add_executable(Older main.cpp + WebRTC/SignalServer.h WebRTC/SignalServer.cpp + WebRTC/WebSocketSignalSession.h WebRTC/WebSocketSignalSession.cpp + WeChat/Corporation/Context.h WeChat/Corporation/Context.cpp Application.h Application.cpp diff --git a/WebRTC/SignalServer.cpp b/WebRTC/SignalServer.cpp new file mode 100644 index 0000000..970ecb6 --- /dev/null +++ b/WebRTC/SignalServer.cpp @@ -0,0 +1,39 @@ +#include "SignalServer.h" +#include "../Application.h" +#include "../HttpSession.h" +#include "Core/Logger.h" +#include "WebSocketSignalSession.h" +#include + +namespace Older { +SignalServer::SignalServer(Application &app) { + using namespace boost::urls; + // clang-format off + app.insertUrl("/api/v1/webrtc/signal/{id}", [this](HttpSession &session, const Application::Request &request, const matches &matches) { + auto id = matches.at("id"); + if (boost::beast::websocket::is_upgrade(request)) { + auto ws = std::make_shared(session.releaseSocket(), *this, id); + ws->run(request); + } + }); + // clang-format on +} + +void SignalServer::join(const std::string &id, WebSocketSignalSession *client) { + m_clients.insert({id, client}); +} + +void SignalServer::leave(const std::string &id) { + if (m_clients.contains(id)) { + m_clients.erase(id); + } +} + +WebSocketSignalSession *SignalServer::client(const std::string &id) { + WebSocketSignalSession *ret = nullptr; + if (m_clients.contains(id)) { + ret = m_clients.at(id); + } + return ret; +} +} \ No newline at end of file diff --git a/WebRTC/SignalServer.h b/WebRTC/SignalServer.h new file mode 100644 index 0000000..c015d23 --- /dev/null +++ b/WebRTC/SignalServer.h @@ -0,0 +1,25 @@ +#ifndef __SIGNALSERVER_H__ +#define __SIGNALSERVER_H__ + +#include +#include + + +namespace Older { + +class Application; +class WebSocketSignalSession; + +class SignalServer { +public: + SignalServer(Application &app); + void join(const std::string &id, WebSocketSignalSession *client); + void leave(const std::string &id); + WebSocketSignalSession *client(const std::string &id); + +private: + std::unordered_map m_clients; +}; +} // namespace Older + +#endif // __SIGNALSERVER_H__ \ No newline at end of file diff --git a/WebRTC/WebSocketSignalSession.cpp b/WebRTC/WebSocketSignalSession.cpp new file mode 100644 index 0000000..bce4c70 --- /dev/null +++ b/WebRTC/WebSocketSignalSession.cpp @@ -0,0 +1,106 @@ +#include "WebSocketSignalSession.h" +#include "Core/Logger.h" +#include "SignalServer.h" +#include +#include +#include + +namespace Older { +WebSocketSignalSession::WebSocketSignalSession(boost::asio::ip::tcp::socket &&socket, SignalServer &server, const std::string &id) + : m_ws(std::move(socket)), m_server(server), m_id(id) { + m_server.join(m_id, this); +} + +WebSocketSignalSession::~WebSocketSignalSession() { + m_server.leave(m_id); +} + +void WebSocketSignalSession::onAccept(boost::beast::error_code ec) { + if (ec) { + if (ec == boost::asio::error::operation_aborted || ec == boost::beast::websocket::error::closed) return; + LOG(error) << "accept: " << ec.message() << "\n"; + return; + } + LOG(info) << "accept websocket target: " << m_target << ", id: " << m_id; + + // Read a message + m_ws.async_read(m_buffer, boost::beast::bind_front_handler(&WebSocketSignalSession::onRead, shared_from_this())); +} + +void WebSocketSignalSession::onRead(const boost::beast::error_code &error, std::size_t bytesTransferred) { + if (error) { + if (error == boost::asio::error::operation_aborted || error == boost::beast::websocket::error::closed) return; + LOG(error) << error << ": " << error.message(); + return; + } + + boost::scope::scope_exit raii([this] { + m_buffer.consume(m_buffer.size()); // Clear the buffer + m_ws.async_read(m_buffer, boost::beast::bind_front_handler(&WebSocketSignalSession::onRead, shared_from_this())); + }); + if (!m_ws.got_text()) { + LOG(warning) << "current not supported binary message."; + return; + } + auto message = boost::beast::buffers_to_string(m_buffer.data()); + auto rootObject = boost::json::parse(message); + auto &root = rootObject.as_object(); + if (root.contains("id")) { + if (!root.at("id").is_string()) { + LOG(warning) << "wrong format."; + m_ws.close(boost::beast::websocket::close_code::normal); + return; + } + auto destinationId = std::string(root["id"].as_string()); + auto destination = m_server.client(destinationId); + if (destination == nullptr) { + LOG(info) << "client " << destinationId << " not found."; + } else { + root["id"] = m_id; + auto reply = std::make_shared(boost::json::serialize(root)); + destination->send(reply); + } + LOG(info) << message; + } else if (root.contains("type")) { + auto &type = root.at("type").as_string(); + if (type == "ping") { + boost::json::object object; + object["type"] = "pong"; + send(boost::json::serialize(object)); + } + } else { + LOG(info) << message; + } +} + +void WebSocketSignalSession::send(const std::shared_ptr &ss) { + boost::asio::post(m_ws.get_executor(), [ptr = weak_from_this(), ss]() { + if (ptr.expired()) return; + auto self = ptr.lock(); + self->m_queue.push_back(ss); + if (self->m_queue.size() > 1) return; // 之前已经有发送了 + self->m_ws.text(); + self->m_ws.async_write(boost::asio::buffer(*self->m_queue.front()), + boost::beast::bind_front_handler(&WebSocketSignalSession::on_write, self->shared_from_this())); + }); +} + +void WebSocketSignalSession::send(std::string &&message) { + return send(std::make_shared(std::move(message))); +} + +void WebSocketSignalSession::on_write(boost::beast::error_code ec, std::size_t) { + if (ec) { + // Don't report these + if (ec == boost::asio::error::operation_aborted || ec == boost::beast::websocket::error::closed) return; + std::cerr << "write: " << ec.message() << "\n"; + return; + } + + // Remove the string from the queue + m_queue.erase(m_queue.begin()); + if (!m_queue.empty()) + m_ws.async_write(boost::asio::buffer(*m_queue.front()), + boost::beast::bind_front_handler(&WebSocketSignalSession::on_write, shared_from_this())); +} +} \ No newline at end of file diff --git a/WebRTC/WebSocketSignalSession.h b/WebRTC/WebSocketSignalSession.h new file mode 100644 index 0000000..020ef00 --- /dev/null +++ b/WebRTC/WebSocketSignalSession.h @@ -0,0 +1,58 @@ +#ifndef __WEBSOCKETSIGNALSESSION_H__ +#define __WEBSOCKETSIGNALSESSION_H__ + +#include +#include +#include +#include +#include + +namespace Older { + + +class SignalServer; + +/** + * @brief Represents an active WebSocket connection to the server + */ +class WebSocketSignalSession : public std::enable_shared_from_this { +public: + WebSocketSignalSession(boost::asio::ip::tcp::socket &&socket, SignalServer &server, const std::string &id); + + ~WebSocketSignalSession(); + + template + void run(boost::beast::http::request> request) { + using namespace boost::beast::http; + using namespace boost::beast::websocket; + // Set suggested timeout settings for the websocket + m_ws.set_option(stream_base::timeout::suggested(boost::beast::role_type::server)); + m_target = request.target(); + // Set a decorator to change the Server of the handshake + m_ws.set_option(stream_base::decorator([](response_type &response) { + response.set(field::server, std::string(BOOST_BEAST_VERSION_STRING) + " websocket-chat-multi"); + })); + // LOG(info) << request.base().target(); //get path + // Accept the websocket handshake + m_ws.async_accept(request, [self{shared_from_this()}](boost::beast::error_code ec) { self->onAccept(ec); }); + } + + // Send a message + void send(const std::shared_ptr &ss); + void send(std::string &&message); + +protected: + void onAccept(boost::beast::error_code ec); + void onRead(const boost::beast::error_code &error, std::size_t bytesTransferred); + void on_write(boost::beast::error_code ec, std::size_t bytes_transferred); + +private: + boost::beast::websocket::stream m_ws; + SignalServer &m_server; + std::string m_id; + std::string m_target; + boost::beast::flat_buffer m_buffer; + std::vector> m_queue; +};} + +#endif // __WEBSOCKETSIGNALSESSION_H__ \ No newline at end of file