From e3580ec8f49d0a0648fa372eea2a61bbb71b1439 Mon Sep 17 00:00:00 2001 From: amass Date: Wed, 15 Jan 2025 15:24:47 +0800 Subject: [PATCH] add ws ping/pong. --- Server/WebRTC/WebSocketSignalSession.cpp | 50 +++++++++++++++--------- Server/WebRTC/WebSocketSignalSession.h | 6 +-- Server/conf/nginx.conf | 2 + Server/conf/server.conf | 3 -- UnitTest/WebRTCClient/Client.cpp | 2 + WebApplication/js/WebRTCClient.js | 48 ++++++++++++++++++++--- 6 files changed, 81 insertions(+), 30 deletions(-) diff --git a/Server/WebRTC/WebSocketSignalSession.cpp b/Server/WebRTC/WebSocketSignalSession.cpp index 2ee22df..b1fbc0a 100644 --- a/Server/WebRTC/WebSocketSignalSession.cpp +++ b/Server/WebRTC/WebSocketSignalSession.cpp @@ -3,6 +3,7 @@ #include "StringUtility.h" #include #include +#include WebSocketSignalSession::WebSocketSignalSession(boost::asio::ip::tcp::socket &&socket, SignalServer &server, const std::string &id) : m_ws(std::move(socket)), m_server(server), m_id(id) { @@ -31,8 +32,16 @@ void WebSocketSignalSession::onRead(const boost::beast::error_code &error, std:: LOG(error) << error << ": " << error.message(); return; } + + boost::scope::scope_exit raii([this] { + m_buffer.consume(m_buffer.size()); // Clear the buffer + m_ws.async_read(m_buffer, boost::beast::bind_front_handler(&WebSocketSignalSession::onRead, shared_from_this())); + }); + if (!m_ws.got_text()) { + LOG(warning) << "current not supported binary message."; + return; + } auto message = boost::beast::buffers_to_string(m_buffer.data()); - LOG(info) << message; auto rootObject = boost::json::parse(message); auto &root = rootObject.as_object(); if (root.contains("id")) { @@ -50,31 +59,36 @@ void WebSocketSignalSession::onRead(const boost::beast::error_code &error, std:: auto reply = std::make_shared(boost::json::serialize(root)); destination->send(reply); } + LOG(info) << message; + } else if (root.contains("type")) { + auto &type = root.at("type").as_string(); + if (type == "ping") { + boost::json::object object; + object["type"] = "pong"; + send(boost::json::serialize(object)); + } + } else { + LOG(info) << message; } - m_buffer.consume(m_buffer.size()); // Clear the buffer - m_ws.async_read(m_buffer, boost::beast::bind_front_handler(&WebSocketSignalSession::onRead, shared_from_this())); } -void WebSocketSignalSession::send(std::shared_ptr const &ss) { - m_ws.text(); - boost::asio::post(m_ws.get_executor(), - boost::beast::bind_front_handler(&WebSocketSignalSession::onSend, shared_from_this(), ss)); +void WebSocketSignalSession::send(const std::shared_ptr &ss) { + boost::asio::post(m_ws.get_executor(), [ptr = weak_from_this(), ss]() { + if (ptr.expired()) return; + auto self = ptr.lock(); + self->m_queue.push_back(ss); + if (self->m_queue.size() > 1) return; // 之前已经有发送了 + self->m_ws.text(); + self->m_ws.async_write(boost::asio::buffer(*self->m_queue.front()), + boost::beast::bind_front_handler(&WebSocketSignalSession::on_write, self->shared_from_this())); + }); } -void WebSocketSignalSession::onSend(std::shared_ptr const &ss) { - // Always add to queue - m_queue.push_back(ss); - - // Are we already writing? - if (m_queue.size() > 1) return; - - // We are not currently writing, so send this immediately - m_ws.async_write(boost::asio::buffer(*m_queue.front()), - boost::beast::bind_front_handler(&WebSocketSignalSession::on_write, shared_from_this())); +void WebSocketSignalSession::send(std::string &&message) { + return send(std::make_shared(std::move(message))); } void WebSocketSignalSession::on_write(boost::beast::error_code ec, std::size_t) { - // Handle the error, if any if (ec) { // Don't report these if (ec == boost::asio::error::operation_aborted || ec == boost::beast::websocket::error::closed) return; diff --git a/Server/WebRTC/WebSocketSignalSession.h b/Server/WebRTC/WebSocketSignalSession.h index 5dd8ee3..67dd7a4 100644 --- a/Server/WebRTC/WebSocketSignalSession.h +++ b/Server/WebRTC/WebSocketSignalSession.h @@ -36,13 +36,13 @@ public: } // Send a message - void send(std::shared_ptr const &ss); + void send(const std::shared_ptr &ss); + void send(std::string &&message); protected: void onAccept(boost::beast::error_code ec); void onRead(const boost::beast::error_code &error, std::size_t bytesTransferred); void on_write(boost::beast::error_code ec, std::size_t bytes_transferred); - void onSend(std::shared_ptr const &ss); private: boost::beast::websocket::stream m_ws; @@ -50,7 +50,7 @@ private: std::string m_id; std::string m_target; boost::beast::flat_buffer m_buffer; - std::vector> m_queue; + std::vector> m_queue; }; #endif // __WEBSOCKETSIGNALSESSION_H__ \ No newline at end of file diff --git a/Server/conf/nginx.conf b/Server/conf/nginx.conf index 1ad306c..6eae70d 100644 --- a/Server/conf/nginx.conf +++ b/Server/conf/nginx.conf @@ -398,6 +398,7 @@ http { server { listen 443 ssl; server_name amass.fun; + proxy_http_version 1.1; ssl_certificate cert/amass.fun.pem; ssl_certificate_key cert/amass.fun.key; include server.conf; @@ -406,6 +407,7 @@ http { server { listen 443 ssl; server_name mirror.amass.fun; + proxy_http_version 1.1; ssl_certificate cert/mirror.amass.fun.pem; ssl_certificate_key cert/mirror.amass.fun.key; include server.conf; diff --git a/Server/conf/server.conf b/Server/conf/server.conf index 5c4e0f9..9adec8b 100644 --- a/Server/conf/server.conf +++ b/Server/conf/server.conf @@ -29,7 +29,6 @@ location ^~ /api/v1/search/ { } location ^~ /api/v1/auth { - proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection "upgrade"; proxy_set_header X-Real-IP $remote_addr; @@ -53,7 +52,6 @@ location /wt { } location = /wt/app.js { - proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection "upgrade"; proxy_set_header X-Real-IP $remote_addr; @@ -78,7 +76,6 @@ location /freedom { } proxy_redirect off; proxy_pass http://127.0.0.1:8089; - proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection "upgrade"; proxy_set_header Host $host; diff --git a/UnitTest/WebRTCClient/Client.cpp b/UnitTest/WebRTCClient/Client.cpp index 2fbd374..1109be7 100644 --- a/UnitTest/WebRTCClient/Client.cpp +++ b/UnitTest/WebRTCClient/Client.cpp @@ -116,6 +116,8 @@ std::string Client::randomId(size_t length) { Client::Client() : m_d{new ClientPrivate(this)} { rtc::InitLogger(rtc::LogLevel::Info); + m_d->config.iceServers.emplace_back("stun:amass.fun:5439"); + m_d->config.iceServers.emplace_back(rtc::IceServer("amass.fun", 5439, "amass", "88888888")); m_id = randomId(4); LOG(info) << "The local ID is " << m_id; log(std::format("The local ID is {}", m_id)); diff --git a/WebApplication/js/WebRTCClient.js b/WebApplication/js/WebRTCClient.js index e9bba25..ca3a63e 100644 --- a/WebApplication/js/WebRTCClient.js +++ b/WebApplication/js/WebRTCClient.js @@ -1,7 +1,19 @@ WT_DECLARE_WT_MEMBER(1, JavaScriptConstructor, "WebRTCClient", function (WT, client, offerId, offerBtn, sendMsg, sendBtn, textBrowser, localId, url) { this.peerConnectionMap = {}; this.dataChannelMap = {}; - this.config = {}; + this.ws = null; + this.config = { + iceServers: [ + { + urls: "stun:amass.fun:5439" + }, + { + urls: "turns:amass.fun", + username: 'amass', + credential: '88888888' + }, + ], + }; this.log = (text) => { textBrowser.value = `${textBrowser.value}\n${text}`; }; @@ -89,12 +101,26 @@ WT_DECLARE_WT_MEMBER(1, JavaScriptConstructor, "WebRTCClient", function (WT, cli return new Promise((resolve, reject) => { const ws = new WebSocket(url); ws.onopen = () => resolve(ws); - ws.onerror = () => reject(new Error('WebSocket error')); - ws.onclose = () => console.error('WebSocket disconnected'); + ws.onerror = () => { + if (this.pingTimer) { + clearInterval(this.pingTimer); + this.pingTimer = 0; + } + reject(new Error('WebSocket error')); + }; + ws.onclose = () => { + if (this.pingTimer) { + clearInterval(this.pingTimer); + this.pingTimer = 0; + } + console.error('WebSocket disconnected'); + }; ws.onmessage = (e) => { if (typeof (e.data) != 'string') return; const message = JSON.parse(e.data); - console.log(message); + if (message.type == undefined || message.type != "pong") { + console.log(message); + } const { id, type } = message; let pc = this.peerConnectionMap[id]; if (!pc) { @@ -115,7 +141,6 @@ WT_DECLARE_WT_MEMBER(1, JavaScriptConstructor, "WebRTCClient", function (WT, cli } }); break; - case 'candidate': pc.addIceCandidate({ candidate: message.candidate, @@ -134,6 +159,17 @@ WT_DECLARE_WT_MEMBER(1, JavaScriptConstructor, "WebRTCClient", function (WT, cli offerBtn.disabled = false; offerBtn.onclick = () => { this.offerPeerConnection(ws, offerId.value); - } + }; + this.ws = ws; + this.pingTimer = setInterval(() => { + if (this.ws.readyState === WebSocket.OPEN) { + this.ws.send(JSON.stringify({ + type: "ping", + })); + } else { + clearInterval(this.pingTimer); + this.pingTimer = 0; + } + }, 3000); }); }); \ No newline at end of file