From cd77b8d1e13b7c6cec8cf3cd02da092b90147250 Mon Sep 17 00:00:00 2001 From: amass <168062547@qq.com> Date: Wed, 24 Jan 2024 23:19:53 +0800 Subject: [PATCH] Update server source, --- CMakeLists.txt | 2 + Server/{SharedState.cpp => Application.cpp} | 405 +++++++++--------- Server/Application.h | 45 ++ Server/CMakeLists.txt | 5 +- Server/ChatRoom/CMakeLists.txt | 10 + Server/ChatRoom/ChatRoom.cpp | 31 ++ Server/ChatRoom/ChatRoom.h | 23 + .../WebSocketChatSession.cpp} | 37 +- .../WebSocketChatSession.h} | 11 +- Server/HttpSession.cpp | 19 +- Server/HttpSession.h | 4 +- Server/Listener.cpp | 7 +- Server/Listener.h | 10 +- Server/ServiceLogic.h | 13 +- Server/ServiceLogic.inl | 2 +- Server/SharedState.h | 65 --- Server/WebRTC/CMakeLists.txt | 8 + Server/WebRTC/SignalServer.cpp | 19 + Server/WebRTC/SignalServer.h | 20 + Server/WebRTC/WebSocketSignalSession.cpp | 102 +++++ Server/WebRTC/WebSocketSignalSession.h | 53 +++ Server/main.cpp | 121 ++---- resource/{deploy.sh => build.sh} | 0 23 files changed, 575 insertions(+), 437 deletions(-) rename Server/{SharedState.cpp => Application.cpp} (64%) create mode 100644 Server/Application.h create mode 100644 Server/ChatRoom/CMakeLists.txt create mode 100644 Server/ChatRoom/ChatRoom.cpp create mode 100644 Server/ChatRoom/ChatRoom.h rename Server/{WebsocketSession.cpp => ChatRoom/WebSocketChatSession.cpp} (71%) rename Server/{WebsocketSession.h => ChatRoom/WebSocketChatSession.h} (87%) delete mode 100644 Server/SharedState.h create mode 100644 Server/WebRTC/CMakeLists.txt create mode 100644 Server/WebRTC/SignalServer.cpp create mode 100644 Server/WebRTC/SignalServer.h create mode 100644 Server/WebRTC/WebSocketSignalSession.cpp create mode 100644 Server/WebRTC/WebSocketSignalSession.h rename resource/{deploy.sh => build.sh} (100%) diff --git a/CMakeLists.txt b/CMakeLists.txt index 6781dc8..2d264e9 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -5,6 +5,8 @@ project(Older) set(CMAKE_CXX_STANDARD 20) set(CMAKE_CXX_STANDARD_REQUIRED ON) +set(OPENSSL_LIBRARIES ssl crypto) + include(FetchContent) FetchContent_Declare(Kylin GIT_REPOSITORY https://gitea.amass.fun/amass/Kylin.git diff --git a/Server/SharedState.cpp b/Server/Application.cpp similarity index 64% rename from Server/SharedState.cpp rename to Server/Application.cpp index aa44e96..8525ca9 100644 --- a/Server/SharedState.cpp +++ b/Server/Application.cpp @@ -1,214 +1,191 @@ -#include "SharedState.h" -#include "Database.h" -#include "DateTime.h" -#include "HttpSession.h" -#include "ServiceLogic.h" -#include "ServiceManager.h" -#include "WeChatContext/CorporationContext.h" -#include "WebsocketSession.h" - -SharedState::SharedState(boost::asio::io_context &ioContext, std::string doc_root) - : m_ioContext(ioContext), m_router{std::make_shared>()}, - m_docRoot(std::move(doc_root)), m_timer(ioContext) { - alarmTask(); - - m_router->insert("/{path*}",[this](HttpSession &session, const Request &request, const boost::urls::matches &matches) { - using namespace boost::beast; - boost::urls::url_view view(request.target()); - auto target = view.path(); - LOG(info) << target; - if (target.find("..") != boost::beast::string_view::npos) { - session.reply(ServiceLogic::badRequest(request, "Illegal request-target")); - return; - } - std::string path = ResponseUtility::pathCat(m_docRoot, target); - if (target.back() == '/') path.append("index.html"); - if (std::filesystem::is_directory(path)) path.append("/index.html"); - boost::beast::error_code ec; - http::file_body::value_type body; - body.open(path.c_str(), boost::beast::file_mode::scan, ec); - if (ec == boost::beast::errc::no_such_file_or_directory) { - std::ostringstream oss; - oss << "The resource '" << target << "' was not found."; - LOG(error) << oss.str(); - session.errorReply(request, http::status::not_found, oss.str()); - return; - } else if (ec) { - session.reply(ServiceLogic::serverError(request, ec.message())); - return; - } - auto const size = body.size(); - http::response res{std::piecewise_construct, std::make_tuple(std::move(body)), - std::make_tuple(http::status::ok, request.version())}; - res.set(http::field::server, BOOST_BEAST_VERSION_STRING); - res.set(http::field::content_type, ResponseUtility::mimeType(path)); - res.content_length(size); - res.keep_alive(request.keep_alive()); - session.reply(std::move(res)); - }); - - m_router->insert("/wechat/{session*}", - [this](HttpSession &session, const Request &request, const boost::urls::matches &matches) { - ServiceLogic::onWechat(shared_from_this(), request, - [&session](auto &&response) { session.reply(std::move(response)); }); - }); - - m_router->insert("/api/v1/tasklist",[this](HttpSession &session, const Request &request, const boost::urls::matches &matches) { - using namespace boost::beast; - auto database = Amass::Singleton::instance(); - auto tasks = database->tasks(); - - http::response s{boost::beast::http::status::ok, request.version()}; - s.set(http::field::server, BOOST_BEAST_VERSION_STRING); - s.set(http::field::content_type, "application/json;charset=UTF-8"); - s.keep_alive(request.keep_alive()); - s.body() = boost::json::serialize(tasks); - s.prepare_payload(); - session.reply(std::move(s)); - }); - - m_router->insert("/api/v1/task/add", [this](HttpSession &session, const Request &request, const boost::urls::matches &matches) { - using namespace boost::beast; - LOG(info) << "add task: " << request.body(); - auto database = Amass::Singleton::instance(); - auto rootJson = boost::json::parse(request.body()); - auto &root = rootJson.as_object(); - - std::string content; - if (root.contains("content")) { - content = root.at("content").as_string(); - } - bool ret = database->addTask(root.at("createTime").as_int64(), content, - std::string(root.at("comment").as_string()), root.at("parentId").as_int64()); - boost::json::object reply; - reply["status"] = ret ? 0 : -1; - http::response s{boost::beast::http::status::ok, request.version()}; - s.set(http::field::server, BOOST_BEAST_VERSION_STRING); - s.set(http::field::content_type, "application/json;charset=UTF-8"); - s.keep_alive(request.keep_alive()); - s.body() = boost::json::serialize(reply); - s.prepare_payload(); - session.reply(std::move(s)); - }); - - m_router->insert("/api/v1/task/delete/{id}", [this](HttpSession &session, const Request &request, const boost::urls::matches &matches) { - using namespace boost::beast; - LOG(info) << "delete task: " << matches.at("id"); - auto database = Amass::Singleton::instance(); - auto status = database->removeTask(std::stoi(matches.at("id")) ); - - boost::json::object reply; - reply["status"] = status? 0 : -1; - http::response s{boost::beast::http::status::ok, request.version()}; - s.set(http::field::server, BOOST_BEAST_VERSION_STRING); - s.set(http::field::content_type, "application/json;charset=UTF-8"); - s.keep_alive(request.keep_alive()); - s.body() = boost::json::serialize(reply); - s.prepare_payload(); - session.reply(std::move(s)); - }); - - m_router->insert("/trigger-ci.hook", [this](HttpSession &session, const Request &request, const boost::urls::matches &matches) { - LOG(info) << "webhook: " << request; - session.reply(ServiceLogic::make_200(request, "Main page\n", "text/html")); - }); - - m_router->insert("/notify", [this](HttpSession &session, const Request &request, const boost::urls::matches &matches) { - auto corp = Amass::Singleton::instance(); - corp->notify(request); - session.reply( - ServiceLogic::make_200(request, "notify successed.\n", "text/html")); - }); -} - -const SharedState::Handler *SharedState::find(boost::urls::segments_encoded_view path, - boost::urls::matches_base &matches) const noexcept { - return m_router->find(path, matches); -} - -std::string_view SharedState::galleryRoot() const noexcept { - return m_galleryRoot; -} - -void SharedState::setFileRoot(const std::string_view &root) { - if (m_fileRoot == root) return; - m_fileRoot = root; -} - -void SharedState::join(WebSocketSession *session) { - std::lock_guard lock(mutex_); - sessions_.insert(session); -} - -void SharedState::leave(WebSocketSession *session) { - std::lock_guard lock(mutex_); - sessions_.erase(session); -} - -void SharedState::send(std::string message) { - // Put the message in a shared pointer so we can re-use it for each client - auto const ss = std::make_shared(std::move(message)); - - // Make a local list of all the weak pointers representing - // the sessions, so we can do the actual sending without - // holding the mutex: - std::vector> v; - { - std::lock_guard lock(mutex_); - v.reserve(sessions_.size()); - for (auto p : sessions_) v.emplace_back(p->weak_from_this()); - } - - // For each session in our local list, try to acquire a strong - // pointer. If successful, then send the message on that session. - for (auto const &wp : v) - if (auto sp = wp.lock()) sp->send(ss); -} - -void SharedState::alarmTask() { - int hour = 10; - int minute = 30; - auto alarmTime = DateTime::currentDateTime(); - alarmTime.setHour(hour); - alarmTime.setMinute(minute); - if (std::chrono::system_clock::now() > alarmTime()) { - alarmTime = alarmTime.tomorrow(); - } - m_timer.expires_at(alarmTime()); - m_timer.async_wait([this](const boost::system::error_code &error) mutable { - if (error) { - LOG(error) << error.message(); - return; - } - auto database = Amass::Singleton::instance(); - auto tasks = database->tasks(); - bool founded = false; - std::string content; - for (auto &task : tasks) { - if (founded) break; - for (auto &child : task.children) { - if (!child.finished) { - content = child.content; - founded = true; - break; - } - } - if (!founded && !task.finished) { - content = task.content; - founded = true; - } - } - if (founded) { - std::ostringstream oss; - oss << "待完成事项:" << std::endl; - oss << "==========" << std::endl; - oss << content << std::endl; - oss << "==========" << std::endl; - oss << "每天都要过得充实开心哦~"; - auto manager = Amass::Singleton::instance(); - if (manager) manager->sendMessage(NotifyServerChan, oss.str()); - } - - alarmTask(); - }); -} +#include "Application.h" +#include "Database.h" +#include "DateTime.h" +#include "HttpSession.h" +#include "IoContext.h" +#include "ServiceLogic.h" +#include "ServiceManager.h" +#include "WeChatContext/CorporationContext.h" + +Application::Application(const std::string &path) + : ApplicationSettings(path), m_router{std::make_shared>()} { + + // clang-format off + m_router->insert("/{path*}",[this](HttpSession &session, const Request &request, const boost::urls::matches &matches) { + using namespace boost::beast; + boost::urls::url_view view(request.target()); + auto target = view.path(); + LOG(info) << target; + if (target.find("..") != boost::beast::string_view::npos) { + session.reply(ServiceLogic::badRequest(request, "Illegal request-target")); + return; + } + std::string path = ResponseUtility::pathCat(getDocumentRoot(), target); + if (target.back() == '/') path.append("index.html"); + if (std::filesystem::is_directory(path)) path.append("/index.html"); + boost::beast::error_code ec; + http::file_body::value_type body; + body.open(path.c_str(), boost::beast::file_mode::scan, ec); + if (ec == boost::beast::errc::no_such_file_or_directory) { + std::ostringstream oss; + oss << "The resource '" << target << "' was not found."; + LOG(error) << oss.str(); + session.errorReply(request, http::status::not_found, oss.str()); + return; + } else if (ec) { + session.reply(ServiceLogic::serverError(request, ec.message())); + return; + } + auto const size = body.size(); + http::response res{std::piecewise_construct, std::make_tuple(std::move(body)), + std::make_tuple(http::status::ok, request.version())}; + res.set(http::field::server, BOOST_BEAST_VERSION_STRING); + res.set(http::field::content_type, ResponseUtility::mimeType(path)); + res.content_length(size); + res.keep_alive(request.keep_alive()); + session.reply(std::move(res)); + }); + + m_router->insert("/wechat/{session*}",[this](HttpSession &session, const Request &request, const boost::urls::matches &matches) { + ServiceLogic::onWechat(shared_from_this(), request, + [&session](auto &&response) { session.reply(std::move(response)); }); + }); + + m_router->insert("/api/v1/tasklist", [this](HttpSession &session, const Request &request, const boost::urls::matches &matches) { + using namespace boost::beast; + auto database = Amass::Singleton::instance(); + auto tasks = database->tasks(); + + http::response s{boost::beast::http::status::ok, request.version()}; + s.set(http::field::server, BOOST_BEAST_VERSION_STRING); + s.set(http::field::content_type, "application/json;charset=UTF-8"); + s.keep_alive(request.keep_alive()); + s.body() = boost::json::serialize(tasks); + s.prepare_payload(); + session.reply(std::move(s)); + }); + + m_router->insert("/api/v1/task/add", [this](HttpSession &session, const Request &request, const boost::urls::matches &matches) { + using namespace boost::beast; + LOG(info) << "add task: " << request.body(); + auto database = Amass::Singleton::instance(); + auto rootJson = boost::json::parse(request.body()); + auto &root = rootJson.as_object(); + + std::string content; + if (root.contains("content")) { + content = root.at("content").as_string(); + } + bool ret = database->addTask(root.at("createTime").as_int64(), content, + std::string(root.at("comment").as_string()), root.at("parentId").as_int64()); + boost::json::object reply; + reply["status"] = ret ? 0 : -1; + http::response s{boost::beast::http::status::ok, request.version()}; + s.set(http::field::server, BOOST_BEAST_VERSION_STRING); + s.set(http::field::content_type, "application/json;charset=UTF-8"); + s.keep_alive(request.keep_alive()); + s.body() = boost::json::serialize(reply); + s.prepare_payload(); + session.reply(std::move(s)); + }); + + m_router->insert("/api/v1/task/delete/{id}", [this](HttpSession &session, const Request &request,const boost::urls::matches &matches) { + using namespace boost::beast; + LOG(info) << "delete task: " << matches.at("id"); + auto database = Amass::Singleton::instance(); + auto status = database->removeTask(std::stoi(matches.at("id"))); + + boost::json::object reply; + reply["status"] = status ? 0 : -1; + http::response s{boost::beast::http::status::ok, request.version()}; + s.set(http::field::server, BOOST_BEAST_VERSION_STRING); + s.set(http::field::content_type, "application/json;charset=UTF-8"); + s.keep_alive(request.keep_alive()); + s.body() = boost::json::serialize(reply); + s.prepare_payload(); + session.reply(std::move(s)); + }); + + m_router->insert("/trigger-ci.hook", [this](HttpSession &session, const Request &request, const boost::urls::matches &matches) { + LOG(info) << "webhook: " << request; + session.reply(ServiceLogic::make_200(request, "Main page\n", "text/html")); + }); + + m_router->insert("/notify", [this](HttpSession &session, const Request &request, const boost::urls::matches &matches) { + auto corp = Amass::Singleton::instance(); + corp->notify(request); + session.reply( + ServiceLogic::make_200(request, "notify successed.\n", "text/html")); + }); + // clang-format on + + m_ioContext = Amass::Singleton::instance(getThreads()); + m_timer = std::make_shared(*m_ioContext->ioContext()); + + alarmTask(); +} + +boost::asio::io_context &Application::ioContext() { + return *m_ioContext->ioContext(); +} + +const Application::RequestHandler *Application::find(boost::urls::segments_encoded_view path, + boost::urls::matches_base &matches) const noexcept { + return m_router->find(path, matches); +} + +int Application::exec() { + LOG(info) << "application start successful ..."; + startCheckInterval(*m_ioContext->ioContext(), 2); + m_ioContext->run(); + LOG(info) << "application exit successful ..."; + return m_status; +} + +void Application::alarmTask() { + int hour = 10; + int minute = 30; + auto alarmTime = DateTime::currentDateTime(); + alarmTime.setHour(hour); + alarmTime.setMinute(minute); + if (std::chrono::system_clock::now() > alarmTime()) { + alarmTime = alarmTime.tomorrow(); + } + m_timer->expires_at(alarmTime()); + m_timer->async_wait([this](const boost::system::error_code &error) mutable { + if (error) { + LOG(error) << error.message(); + return; + } + auto database = Amass::Singleton::instance(); + auto tasks = database->tasks(); + bool founded = false; + std::string content; + for (auto &task : tasks) { + if (founded) break; + for (auto &child : task.children) { + if (!child.finished) { + content = child.content; + founded = true; + break; + } + } + if (!founded && !task.finished) { + content = task.content; + founded = true; + } + } + if (founded) { + std::ostringstream oss; + oss << "待完成事项:" << std::endl; + oss << "==========" << std::endl; + oss << content << std::endl; + oss << "==========" << std::endl; + oss << "每天都要过得充实开心哦~"; + auto manager = Amass::Singleton::instance(); + if (manager) manager->sendMessage(NotifyServerChan, oss.str()); + } + + alarmTask(); + }); +} \ No newline at end of file diff --git a/Server/Application.h b/Server/Application.h new file mode 100644 index 0000000..69d05bd --- /dev/null +++ b/Server/Application.h @@ -0,0 +1,45 @@ +#ifndef __SETTINGS_H__ +#define __SETTINGS_H__ + +#include "ApplicationSettings.h" +#include "Singleton.h" +#include "router.hpp" +#include +#include +#include + +class HttpSession; +class ChatRoom; +class IoContext; + +class Application : public ApplicationSettings, public std::enable_shared_from_this { +public: + using Pointer = std::shared_ptr; + using Request = boost::beast::http::request; + using RequestHandler = std::function; + + BUILD_SETTING(std::string, Server, "0.0.0.0"); + BUILD_SETTING(uint16_t, Port, 8081); + BUILD_SETTING(uint32_t, Threads, std::thread::hardware_concurrency()); + BUILD_SETTING(std::string, DocumentRoot, "."); + + INITIALIZE_FIELDS(Server, Port, Threads, DocumentRoot); + Application(const std::string &path); + boost::asio::io_context &ioContext(); + int exec(); + + const RequestHandler *find(boost::urls::segments_encoded_view path, + boost::urls::matches_base &matches) const noexcept; + +protected: + void alarmTask(); + +private: + int m_status = 0; + std::shared_ptr m_ioContext; + std::shared_ptr> m_router; + std::shared_ptr m_timer; + std::shared_ptr m_charRoom; +}; + +#endif // __SETTINGS_H__ \ No newline at end of file diff --git a/Server/CMakeLists.txt b/Server/CMakeLists.txt index 49f8c73..1560e18 100644 --- a/Server/CMakeLists.txt +++ b/Server/CMakeLists.txt @@ -1,16 +1,17 @@ find_package(Boost COMPONENTS program_options json REQUIRED) +add_subdirectory(ChatRoom) add_subdirectory(Database) +add_subdirectory(WebRTC) add_executable(Server main.cpp + Application.h Application.cpp HttpSession.h HttpSession.cpp Listener.h Listener.cpp ResponseUtility.h ResponseUtility.cpp ServiceLogic.h ServiceLogic.inl ServiceLogic.cpp ServiceManager.h - SharedState.h SharedState.cpp UdpServer.h UdpServer.cpp - WebsocketSession.h WebsocketSession.cpp WeChatContext/CorporationContext.h WeChatContext/CorporationContext.cpp WeChatContext/WeChatContext.h WeChatContext/WeChatContext.cpp WeChatContext/WeChatSession.h WeChatContext/WeChatSession.cpp diff --git a/Server/ChatRoom/CMakeLists.txt b/Server/ChatRoom/CMakeLists.txt new file mode 100644 index 0000000..d3d95aa --- /dev/null +++ b/Server/ChatRoom/CMakeLists.txt @@ -0,0 +1,10 @@ + + +add_library(ChatRoom + ChatRoom.h ChatRoom.cpp + WebSocketChatSession.h WebSocketChatSession.cpp +) + +target_link_libraries(ChatRoom + PUBLIC Universal +) \ No newline at end of file diff --git a/Server/ChatRoom/ChatRoom.cpp b/Server/ChatRoom/ChatRoom.cpp new file mode 100644 index 0000000..f85e430 --- /dev/null +++ b/Server/ChatRoom/ChatRoom.cpp @@ -0,0 +1,31 @@ +#include "ChatRoom.h" +#include "WebSocketChatSession.h" + +void ChatRoom::join(WebSocketSession *session) { + std::lock_guard lock(m_mutex); + m_sessions.insert(session); +} + +void ChatRoom::leave(WebSocketSession *session) { + std::lock_guard lock(m_mutex); + m_sessions.erase(session); +} + +void ChatRoom::send(std::string message) { + auto const ss = std::make_shared(std::move(message)); + + // Make a local list of all the weak pointers representing + // the sessions, so we can do the actual sending without + // holding the mutex: + std::vector> v; + { + std::lock_guard lock(m_mutex); + v.reserve(m_sessions.size()); + for (auto p : m_sessions) v.emplace_back(p->weak_from_this()); + } + + // For each session in our local list, try to acquire a strong + // pointer. If successful, then send the message on that session. + for (auto const &wp : v) + if (auto sp = wp.lock()) sp->send(ss); +} \ No newline at end of file diff --git a/Server/ChatRoom/ChatRoom.h b/Server/ChatRoom/ChatRoom.h new file mode 100644 index 0000000..186c6db --- /dev/null +++ b/Server/ChatRoom/ChatRoom.h @@ -0,0 +1,23 @@ +#ifndef __CHATROOM_H__ +#define __CHATROOM_H__ + +#include "Singleton.h" +#include +#include + +class WebSocketSession; + +class ChatRoom { +public: + void join(WebSocketSession *session); + void leave(WebSocketSession *session); + /** + * @brief Broadcast a message to all websocket client sessions + */ + void send(std::string message); + +private: + std::mutex m_mutex; + std::unordered_set m_sessions; +}; +#endif // __CHATROOM_H__ \ No newline at end of file diff --git a/Server/WebsocketSession.cpp b/Server/ChatRoom/WebSocketChatSession.cpp similarity index 71% rename from Server/WebsocketSession.cpp rename to Server/ChatRoom/WebSocketChatSession.cpp index 09c528e..8080403 100644 --- a/Server/WebsocketSession.cpp +++ b/Server/ChatRoom/WebSocketChatSession.cpp @@ -1,45 +1,50 @@ -#include "WebsocketSession.h" +#include "WebSocketChatSession.h" +#include "ChatRoom.h" +#include "StringUtility.h" +#include +#include #include -WebSocketSession::WebSocketSession(boost::asio::ip::tcp::socket &&socket, std::shared_ptr const &state) - : m_ws(std::move(socket)), m_state(state) {} +WebSocketSession::WebSocketSession(boost::asio::ip::tcp::socket &&socket) : m_ws(std::move(socket)) { +} WebSocketSession::~WebSocketSession() { - // Remove this session from the list of active sessions - m_state->leave(this); + auto chatRoom = Amass::Singleton::instance(); + chatRoom->leave(this); } void WebSocketSession::onAccept(boost::beast::error_code ec) { - // Handle the error, if any if (ec) { if (ec == boost::asio::error::operation_aborted || ec == boost::beast::websocket::error::closed) return; std::cerr << "accept: " << ec.message() << "\n"; return; } + LOG(info) << "accept websocket target: " << m_target; + if (m_target.find("/webrtc") == 0) { + auto splits = Amass::StringUtility::split(m_target, "/"); + if (!splits.empty()) m_id = splits.back(); + } - // Add this session to the list of active sessions - m_state->join(this); + auto chatRoom = Amass::Singleton::instance(); + chatRoom->join(this); // Read a message m_ws.async_read(m_buffer, boost::beast::bind_front_handler(&WebSocketSession::on_read, shared_from_this())); } void WebSocketSession::on_read(boost::beast::error_code ec, std::size_t) { - // Handle the error, if any if (ec) { // Don't report these if (ec == boost::asio::error::operation_aborted || ec == boost::beast::websocket::error::closed) return; LOG(error) << "read: " << ec.message(); return; } - LOG(info) << boost::beast::buffers_to_string(m_buffer.data()); - // Send to all connections - m_state->send(boost::beast::buffers_to_string(m_buffer.data())); + auto message = boost::beast::buffers_to_string(m_buffer.data()); + LOG(info) << message; + auto chatRoom = Amass::Singleton::instance(); + chatRoom->send(message); // Send to all connections - // Clear the buffer - m_buffer.consume(m_buffer.size()); - - // Read another message + m_buffer.consume(m_buffer.size()); // Clear the buffer m_ws.async_read(m_buffer, boost::beast::bind_front_handler(&WebSocketSession::on_read, shared_from_this())); } diff --git a/Server/WebsocketSession.h b/Server/ChatRoom/WebSocketChatSession.h similarity index 87% rename from Server/WebsocketSession.h rename to Server/ChatRoom/WebSocketChatSession.h index 7f56bd3..05f3a27 100644 --- a/Server/WebsocketSession.h +++ b/Server/ChatRoom/WebSocketChatSession.h @@ -2,22 +2,18 @@ #define WEBSOCKETSESSION_H #include "BoostLog.h" -#include "SharedState.h" #include #include #include #include #include -class SharedState; - /** * @brief Represents an active WebSocket connection to the server */ class WebSocketSession : public std::enable_shared_from_this { - public: - WebSocketSession(boost::asio::ip::tcp::socket &&socket, std::shared_ptr const &state); + WebSocketSession(boost::asio::ip::tcp::socket &&socket); ~WebSocketSession(); @@ -27,7 +23,7 @@ public: using namespace boost::beast::websocket; // Set suggested timeout settings for the websocket m_ws.set_option(stream_base::timeout::suggested(boost::beast::role_type::server)); - + m_target = request.target(); // Set a decorator to change the Server of the handshake m_ws.set_option(stream_base::decorator([](response_type &response) { response.set(field::server, std::string(BOOST_BEAST_VERSION_STRING) + " websocket-chat-multi"); @@ -47,10 +43,11 @@ protected: void onSend(std::shared_ptr const &ss); private: + std::string m_target; boost::beast::flat_buffer m_buffer; boost::beast::websocket::stream m_ws; - std::shared_ptr m_state; std::vector> m_queue; + std::string m_id; }; #endif // WEBSOCKETSESSION_H diff --git a/Server/HttpSession.cpp b/Server/HttpSession.cpp index 364be5f..6b8f134 100644 --- a/Server/HttpSession.cpp +++ b/Server/HttpSession.cpp @@ -1,14 +1,12 @@ #include "HttpSession.h" -#include "WebsocketSession.h" +#include "Application.h" #include #include #include #include #include -HttpSession::HttpSession(boost::asio::ip::tcp::socket &&socket, const std::shared_ptr &state) - : m_stream(std::move(socket)), m_state(state) { - // m_buffer.reserve(1000 * 1000 * 1000); +HttpSession::HttpSession(boost::asio::ip::tcp::socket &&socket) : m_stream(std::move(socket)) { } void HttpSession::run() { @@ -62,22 +60,17 @@ void HttpSession::onRead(boost::beast::error_code ec, std::size_t) { } auto &request = m_parser->get(); - // See if it is a WebSocket Upgrade - if (websocket::is_upgrade(request)) { - // Create a websocket session, transferring ownership - // of both the socket and the HTTP request. - auto session = std::make_shared(m_stream.release_socket(), m_state); - session->run(m_parser->release()); - return; - } auto path = boost::urls::parse_path(request.target()); if (!path) { LOG(error) << request.target() << "failed, error: " << path.error().message(); errorReply(request, http::status::bad_request, "Illegal request-target"); return; } + + auto application = Amass::Singleton::instance(); + boost::urls::matches matches; - auto handler = m_state->find(*path, matches); + auto handler = application->find(*path, matches); if (handler) { (*handler)(*this, request, matches); } else { diff --git a/Server/HttpSession.h b/Server/HttpSession.h index d7d249d..1f58398 100644 --- a/Server/HttpSession.h +++ b/Server/HttpSession.h @@ -1,7 +1,6 @@ #ifndef HTTPSESSION_H #define HTTPSESSION_H -#include "SharedState.h" #include "boost/beast.hpp" #include #include @@ -18,7 +17,7 @@ class HttpSession : public std::enable_shared_from_this { public: using Request = boost::beast::http::request; - HttpSession(boost::asio::ip::tcp::socket &&socket, std::shared_ptr const &state); + HttpSession(boost::asio::ip::tcp::socket &&socket); template void reply(Response &&response) { using ResponseType = typename std::decay_t; @@ -35,7 +34,6 @@ public: private: boost::beast::tcp_stream m_stream; boost::beast::flat_buffer m_buffer{std::numeric_limits::max()}; - SharedStatePtr m_state; std::optional> m_parser; }; diff --git a/Server/Listener.cpp b/Server/Listener.cpp index c2de3c0..55aae03 100644 --- a/Server/Listener.cpp +++ b/Server/Listener.cpp @@ -3,9 +3,8 @@ #include #include -Listener::Listener(boost::asio::io_context &ioc, boost::asio::ip::tcp::endpoint endpoint, - std::shared_ptr const &state) - : m_ioContext(ioc), m_acceptor(ioc), m_state(state) { +Listener::Listener(boost::asio::io_context &ioc, boost::asio::ip::tcp::endpoint endpoint) + : m_ioContext(ioc), m_acceptor(ioc) { boost::beast::error_code ec; // Open the acceptor @@ -57,7 +56,7 @@ void Listener::onAccept(boost::beast::error_code ec, std::shared_ptr(std::move(*socket), m_state); + auto session = std::make_shared(std::move(*socket)); session->run(); } startAccept(); diff --git a/Server/Listener.h b/Server/Listener.h index 6357222..081e746 100644 --- a/Server/Listener.h +++ b/Server/Listener.h @@ -6,19 +6,12 @@ #include #include -class SharedState; - -// Accepts incoming connections and launches the sessions class Listener : public std::enable_shared_from_this { public: - Listener(boost::asio::io_context &ioc, boost::asio::ip::tcp::endpoint endpoint, - std::shared_ptr const &state); + Listener(boost::asio::io_context &ioc, boost::asio::ip::tcp::endpoint endpoint); // Start accepting incoming connections void startAccept(); - inline std::shared_ptr state() const { - return m_state; - } protected: void fail(boost::beast::error_code ec, char const *what); @@ -27,7 +20,6 @@ protected: private: boost::asio::io_context &m_ioContext; boost::asio::ip::tcp::acceptor m_acceptor; - std::shared_ptr m_state; }; #endif // LISTENER_H diff --git a/Server/ServiceLogic.h b/Server/ServiceLogic.h index 9da759b..4fa8ddb 100644 --- a/Server/ServiceLogic.h +++ b/Server/ServiceLogic.h @@ -1,8 +1,8 @@ #ifndef SERVICELOGIC_H #define SERVICELOGIC_H +#include "Application.h" #include "ResponseUtility.h" -#include "SharedState.h" #include "StringUtility.h" #include #include @@ -17,16 +17,7 @@ using StringRequest = boost::beast::http::request -static void onGetBlogList(SharedStatePtr state, StringRequest &&request, Send &&send); - -template -static void onGetBlogContent(SharedStatePtr state, StringRequest &&request, Send &&send, const std::string &prefix); - -template -static void onGetBlogImage(SharedStatePtr state, StringRequest &&request, Send &&send); - -template -static void onWechat(SharedStatePtr state, StringRequest &&request, Send &&send); +static void onWechat(const Application::Pointer &app, StringRequest &&request, Send &&send); // Returns a server error response boost::beast::http::response diff --git a/Server/ServiceLogic.inl b/Server/ServiceLogic.inl index 71f5371..2869352 100644 --- a/Server/ServiceLogic.inl +++ b/Server/ServiceLogic.inl @@ -15,7 +15,7 @@ namespace ServiceLogic { template -static void onWechat(SharedStatePtr state, const StringRequest &request, Send &&send) { +static void onWechat(const Application::Pointer &app, const StringRequest &request, Send &&send) { using namespace boost::beast; boost::urls::url url(request.target()); auto context = Amass::Singleton::instance(); diff --git a/Server/SharedState.h b/Server/SharedState.h deleted file mode 100644 index 4a68a1f..0000000 --- a/Server/SharedState.h +++ /dev/null @@ -1,65 +0,0 @@ -#ifndef SHAREDSTATE_H -#define SHAREDSTATE_H - -#include "router.hpp" -#include -#include -#include -#include -#include -#include -#include -#include - -class HttpSession; -class WebSocketSession; - -// Represents the shared server state -class SharedState : public std::enable_shared_from_this { - - // This mutex synchronizes all access to sessions_ - std::mutex mutex_; - - // Keep a list of all the connected clients - std::unordered_set sessions_; - -public: - using Request = boost::beast::http::request; - using Handler = std::function; - SharedState(boost::asio::io_context &ioContext, std::string doc_root); - - const Handler *find(boost::urls::segments_encoded_view path, boost::urls::matches_base &matches) const noexcept; - - inline std::string_view docRoot() const noexcept { - return m_docRoot; - } - std::string_view galleryRoot() const noexcept; - - inline std::string_view fileRoot() const { - return m_fileRoot; - } - void setFileRoot(const std::string_view &root); - - void join(WebSocketSession *session); - void leave(WebSocketSession *session); - - /** - * @brief Broadcast a message to all websocket client sessions - */ - void send(std::string message); - -protected: - void alarmTask(); - -private: - boost::asio::io_context &m_ioContext; - std::shared_ptr> m_router; - std::string m_docRoot; - std::string m_galleryRoot = "/root/photos"; - std::string m_fileRoot; - boost::asio::system_timer m_timer; -}; - -using SharedStatePtr = std::shared_ptr; - -#endif // SHAREDSTATE_H diff --git a/Server/WebRTC/CMakeLists.txt b/Server/WebRTC/CMakeLists.txt new file mode 100644 index 0000000..c6b041f --- /dev/null +++ b/Server/WebRTC/CMakeLists.txt @@ -0,0 +1,8 @@ + +add_library(WebRTC + WebSocketSignalSession.h WebSocketSignalSession.cpp +) + +target_link_libraries(WebRTC + PUBLIC Universal +) \ No newline at end of file diff --git a/Server/WebRTC/SignalServer.cpp b/Server/WebRTC/SignalServer.cpp new file mode 100644 index 0000000..6e3a1e0 --- /dev/null +++ b/Server/WebRTC/SignalServer.cpp @@ -0,0 +1,19 @@ +#include "SignalServer.h" + +void SignalServer::join(const std::string &id, WebSocketSignalSession *client) { + m_clients.insert({id, client}); +} + +void SignalServer::leave(const std::string &id) { + if (m_clients.contains(id)) { + m_clients.erase(id); + } +} + +WebSocketSignalSession *SignalServer::client(const std::string &id) { + WebSocketSignalSession *ret; + if (m_clients.contains(id)) { + ret = m_clients.at(id); + } + return ret; +} \ No newline at end of file diff --git a/Server/WebRTC/SignalServer.h b/Server/WebRTC/SignalServer.h new file mode 100644 index 0000000..6d513bc --- /dev/null +++ b/Server/WebRTC/SignalServer.h @@ -0,0 +1,20 @@ +#ifndef __SIGNALSERVER_H__ +#define __SIGNALSERVER_H__ + +#include "Singleton.h" +#include +#include + +class WebSocketSignalSession; + +class SignalServer { +public: + void join(const std::string &id, WebSocketSignalSession *client); + void leave(const std::string &id); + WebSocketSignalSession *client(const std::string &id); + +private: + std::unordered_map m_clients; +}; + +#endif // __SIGNALSERVER_H__ \ No newline at end of file diff --git a/Server/WebRTC/WebSocketSignalSession.cpp b/Server/WebRTC/WebSocketSignalSession.cpp new file mode 100644 index 0000000..4acc2ba --- /dev/null +++ b/Server/WebRTC/WebSocketSignalSession.cpp @@ -0,0 +1,102 @@ +#include "WebSocketSignalSession.h" +#include "SignalServer.h" +#include "StringUtility.h" +#include +#include + +WebSocketSignalSession::WebSocketSignalSession(boost::asio::ip::tcp::socket &&socket) : m_ws(std::move(socket)) { +} + +WebSocketSignalSession::~WebSocketSignalSession() { + auto server = Amass::Singleton::instance(); + server->leave(m_id); +} + +void WebSocketSignalSession::onAccept(boost::beast::error_code ec) { + if (ec) { + if (ec == boost::asio::error::operation_aborted || ec == boost::beast::websocket::error::closed) return; + std::cerr << "accept: " << ec.message() << "\n"; + return; + } + LOG(info) << "accept websocket target: " << m_target; + if (m_target.find("/webrtc") == 0) { + auto splits = Amass::StringUtility::split(m_target, "/"); + if (!splits.empty()) m_id = splits.back(); + } + + auto server = Amass::Singleton::instance(); + server->join(m_id, this); + + // Read a message + m_ws.async_read(m_buffer, boost::beast::bind_front_handler(&WebSocketSignalSession::on_read, shared_from_this())); +} + +void WebSocketSignalSession::on_read(boost::beast::error_code ec, std::size_t) { + // Handle the error, if any + if (ec) { + // Don't report these + if (ec == boost::asio::error::operation_aborted || ec == boost::beast::websocket::error::closed) return; + LOG(error) << "read: " << ec.message(); + return; + } + auto message = boost::beast::buffers_to_string(m_buffer.data()); + LOG(info) << message; + auto rootObject = boost::json::parse(message); + auto &root = rootObject.as_object(); + if (root.contains("id")) { + if (!root.at("id").is_string()) { + LOG(warning) << "wrong format."; + m_ws.close(boost::beast::websocket::close_code::normal); + return; + } + auto server = Amass::Singleton::instance(); + auto destinationId = std::string(root["id"].as_string()); + auto destination = server->client(destinationId); + if (destination == nullptr) { + LOG(info) << "client " << destinationId << " not found."; + } else { + root["id"] = m_id; + auto reply = std::make_shared(boost::json::serialize(root)); + destination->send(reply); + } + } + m_buffer.consume(m_buffer.size()); // Clear the buffer + m_ws.async_read(m_buffer, boost::beast::bind_front_handler(&WebSocketSignalSession::on_read, shared_from_this())); +} + +void WebSocketSignalSession::send(std::shared_ptr const &ss) { + // Post our work to the strand, this ensures + // that the members of `this` will not be + // accessed concurrently. + m_ws.text(); + boost::asio::post(m_ws.get_executor(), + boost::beast::bind_front_handler(&WebSocketSignalSession::onSend, shared_from_this(), ss)); +} + +void WebSocketSignalSession::onSend(std::shared_ptr const &ss) { + // Always add to queue + m_queue.push_back(ss); + + // Are we already writing? + if (m_queue.size() > 1) return; + + // We are not currently writing, so send this immediately + m_ws.async_write(boost::asio::buffer(*m_queue.front()), + boost::beast::bind_front_handler(&WebSocketSignalSession::on_write, shared_from_this())); +} + +void WebSocketSignalSession::on_write(boost::beast::error_code ec, std::size_t) { + // Handle the error, if any + if (ec) { + // Don't report these + if (ec == boost::asio::error::operation_aborted || ec == boost::beast::websocket::error::closed) return; + std::cerr << "write: " << ec.message() << "\n"; + return; + } + + // Remove the string from the queue + m_queue.erase(m_queue.begin()); + if (!m_queue.empty()) + m_ws.async_write(boost::asio::buffer(*m_queue.front()), + boost::beast::bind_front_handler(&WebSocketSignalSession::on_write, shared_from_this())); +} diff --git a/Server/WebRTC/WebSocketSignalSession.h b/Server/WebRTC/WebSocketSignalSession.h new file mode 100644 index 0000000..179fcfd --- /dev/null +++ b/Server/WebRTC/WebSocketSignalSession.h @@ -0,0 +1,53 @@ +#ifndef __WEBSOCKETSIGNALSESSION_H__ +#define __WEBSOCKETSIGNALSESSION_H__ + +#include "BoostLog.h" +#include +#include +#include +#include +#include + +/** + * @brief Represents an active WebSocket connection to the server + */ +class WebSocketSignalSession : public std::enable_shared_from_this { +public: + WebSocketSignalSession(boost::asio::ip::tcp::socket &&socket); + + ~WebSocketSignalSession(); + + template + void run(boost::beast::http::request> request) { + using namespace boost::beast::http; + using namespace boost::beast::websocket; + // Set suggested timeout settings for the websocket + m_ws.set_option(stream_base::timeout::suggested(boost::beast::role_type::server)); + m_target = request.target(); + // Set a decorator to change the Server of the handshake + m_ws.set_option(stream_base::decorator([](response_type &response) { + response.set(field::server, std::string(BOOST_BEAST_VERSION_STRING) + " websocket-chat-multi"); + })); + // LOG(info) << request.base().target(); //get path + // Accept the websocket handshake + m_ws.async_accept(request, [self{shared_from_this()}](boost::beast::error_code ec) { self->onAccept(ec); }); + } + + // Send a message + void send(std::shared_ptr const &ss); + +protected: + void onAccept(boost::beast::error_code ec); + void on_read(boost::beast::error_code ec, std::size_t bytes_transferred); + void on_write(boost::beast::error_code ec, std::size_t bytes_transferred); + void onSend(std::shared_ptr const &ss); + +private: + std::string m_target; + boost::beast::flat_buffer m_buffer; + boost::beast::websocket::stream m_ws; + std::vector> m_queue; + std::string m_id; +}; + +#endif // __WEBSOCKETSIGNALSESSION_H__ \ No newline at end of file diff --git a/Server/main.cpp b/Server/main.cpp index f6bdb99..b434c8a 100644 --- a/Server/main.cpp +++ b/Server/main.cpp @@ -1,9 +1,10 @@ +#include "Application.h" #include "BoostLog.h" +#include "Database.h" #include "IoContext.h" #include "Listener.h" #include "ProxyListener.h" #include "ServiceManager.h" -#include "SharedState.h" #include "UdpServer.h" #include "WeChatContext/CorporationContext.h" #include "WeChatContext/WeChatContext.h" @@ -11,13 +12,11 @@ #include #include #include -#include "Database.h" - -void initSettings(); int main(int argc, char const *argv[]) { + using namespace Amass; boost::log::initialize("logs/HttpServer"); - auto manager = Amass::Singleton::instance(); + auto manager = Singleton::instance(); boost::program_options::options_description description("Allowed options"); // clang-format off @@ -55,115 +54,53 @@ int main(int argc, char const *argv[]) { std::filesystem::current_path(prefix, error); LOG_IF(fatal, error) << "cannot set current path,reason: " << error.message(); } - initSettings(); - auto database = Amass::Singleton::instance(); + auto application = Singleton::instance("settings.ini"); + + auto database = Singleton::instance(); database->open("database.sqlite"); - std::string server = "0.0.0.0"; - - boost::property_tree::ptree ptree; - boost::property_tree::read_ini("settings.ini", ptree); - if (ptree.count("server") > 0) { - server = ptree.get("server"); - } - - auto port = ptree.get_optional("port"); - - auto docRoot = ptree.get("docRoot"); - if (docRoot.empty()) { - LOG(fatal) << "please set docRoot."; - std::exit(101); - } else if (!std::filesystem::exists(docRoot)) { - LOG(fatal) << "document root: " << docRoot << " is not exists..."; + if (!std::filesystem::exists(application->getDocumentRoot())) { + LOG(fatal) << "document root: " << application->getDocumentRoot() << " is not exists..."; std::exit(102); } + BOOST_ASSERT_MSG(!application->getServer().empty(), "server.empty() == true"); - auto fileRoot = ptree.get("fileRoot"); - if (fileRoot.empty()) { - LOG(fatal) << "please set fileRoot."; - std::exit(103); - } else if (!std::filesystem::exists(fileRoot)) { - LOG(fatal) << "file root: " << fileRoot << " is not exists..."; - std::exit(104); - } - - if (!port) { - LOG(fatal) << "port is not a number."; - std::exit(255); - } - auto threads = ptree.get("threads"); - if (threads <= 0 || threads > std::thread::hardware_concurrency()) threads = std::thread::hardware_concurrency(); - - BOOST_ASSERT_MSG(!server.empty(), "server.empty() == true"); - - auto ioContext = Amass::Singleton::instance(threads); - auto address = boost::asio::ip::make_address(server); - auto listener = std::make_shared(*ioContext->ioContext(), boost::asio::ip::tcp::endpoint{address, *port}, - std::make_shared(*ioContext->ioContext(), docRoot)); + auto address = boost::asio::ip::make_address(application->getServer()); + auto listener = std::make_shared(application->ioContext(), + boost::asio::ip::tcp::endpoint{address, application->getPort()}); listener->startAccept(); - auto state = listener->state(); - state->setFileRoot(fileRoot); + auto wechatContext = Singleton::instance(application->ioContext()); + auto corpContext = Singleton::instance(application->ioContext()); - auto wechatContext = Amass::Singleton::instance(*ioContext->ioContext()); - auto corpContext = Amass::Singleton::instance(*ioContext->ioContext()); - - LOG(info) << "hardware_concurrency: " << std::thread::hardware_concurrency() << ",threads: " << threads; + LOG(info) << "hardware_concurrency: " << std::thread::hardware_concurrency() + << ",threads: " << application->getThreads(); LOG(info) << "working directory: " << prefix.generic_string(); - LOG(info) << "server: " << server << ",port: " << *port; - LOG(info) << "document root: " << state->docRoot(); + LOG(info) << "server: " << application->getServer() << ",port: " << application->getPort(); + LOG(info) << "document root: " << application->getDocumentRoot(); // Capture SIGINT and SIGTERM to perform a clean shutdown #ifndef WIN32 - boost::asio::signal_set signals(*ioContext->ioContext(), SIGINT, SIGTERM, SIGHUP); + boost::asio::signal_set signals(application->ioContext(), SIGINT, SIGTERM, SIGHUP); #else - boost::asio::signal_set signals(*ioContext->ioContext(), SIGINT, SIGTERM); + boost::asio::signal_set signals(application->ioContext(), SIGINT, SIGTERM); #endif - signals.async_wait([&ioContext](boost::system::error_code const &, int signal) { + signals.async_wait([&application](boost::system::error_code const &, int signal) { // Stop the io_context. This will cause run() // to return immediately, eventually destroying the // io_context and any remaining handlers in it. LOG(info) << "capture " << (signal == SIGINT ? "SIGINT" : "SIGTERM") << ",stop!"; - ioContext->ioContext()->stop(); + application->ioContext().stop(); }); - auto udpServer = std::make_shared(*ioContext->ioContext()); + auto udpServer = std::make_shared(application->ioContext()); - auto proxyAddress = boost::asio::ip::make_address(server); + using namespace boost::asio::ip; + auto proxyAddress = make_address(application->getServer()); uint16_t proxyPort = 41091; - auto proxy = std::make_shared(*ioContext->ioContext(), - boost::asio::ip::tcp::endpoint{proxyAddress, proxyPort}); + auto proxy = std::make_shared(application->ioContext(), tcp::endpoint{proxyAddress, proxyPort}); boost::system::error_code perror; proxy->run(perror); - - LOG(info) << "server start successful ..."; - ioContext->run(); - LOG(info) << "server exit successful ..."; - return EXIT_SUCCESS; -} - -void initSettings() { - boost::property_tree::ptree ptree; - - if (std::filesystem::exists(std::filesystem::path("settings.ini"))) - boost::property_tree::read_ini("settings.ini", ptree); - - if (ptree.find("server") == ptree.not_found()) { - ptree.put("server", "0.0.0.0"); - } - if (ptree.find("port") == ptree.not_found()) { - ptree.put("port", 8083); - } - if (ptree.find("docRoot") == ptree.not_found()) { - ptree.put("docRoot", "."); - } - if (ptree.find("fileRoot") == ptree.not_found()) { - ptree.put("fileRoot", "."); - } - if (ptree.find("threads") == ptree.not_found()) { - ptree.put("threads", std::thread::hardware_concurrency()); - } - - boost::property_tree::write_ini("settings.ini", ptree); -} + return application->exec(); +} \ No newline at end of file diff --git a/resource/deploy.sh b/resource/build.sh similarity index 100% rename from resource/deploy.sh rename to resource/build.sh