From ee1aaa1bf3c2635f71583db99331d4f288403282 Mon Sep 17 00:00:00 2001 From: amass <168062547@qq.com> Date: Mon, 17 Mar 2025 20:52:00 +0800 Subject: [PATCH] =?UTF-8?q?=E9=AA=8C=E8=AF=81=E6=9C=AC=E5=9C=B0http?= =?UTF-8?q?=E5=8F=AF=E4=BB=A5=E6=9F=A5=E7=9C=8Bwebrtc.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Frontend/package.json | 4 +- Frontend/src/App.jsx | 8 +- Frontend/src/WebRTCClient.jsx | 210 +++++++++++++++++++++++++ Main/Application.cpp | 88 +++++++++++ Main/Application.h | 44 ++++++ Main/CMakeLists.txt | 8 + Main/HttpSession.cpp | 120 ++++++++++++++ Main/HttpSession.h | 52 ++++++ Main/ResponseUtility.cpp | 51 ++++++ Main/ResponseUtility.h | 19 +++ Main/RtspServer.cpp | 8 +- Main/ServiceLogic.cpp | 100 ++++++++++++ Main/ServiceLogic.h | 43 +++++ Main/Settings.cpp | 70 +++++++++ Main/Settings.h | 30 ++++ Main/WebRTC/SignalServer.cpp | 41 +++++ Main/WebRTC/SignalServer.h | 24 +++ Main/WebRTC/Streamer.cpp | 15 +- Main/WebRTC/Streamer.h | 3 + Main/WebRTC/WebSocketSignalSession.cpp | 109 +++++++++++++ Main/WebRTC/WebSocketSignalSession.h | 58 +++++++ Main/main.cpp | 63 ++++---- resources/build.sh | 2 +- 23 files changed, 1131 insertions(+), 39 deletions(-) create mode 100644 Frontend/src/WebRTCClient.jsx create mode 100644 Main/Application.cpp create mode 100644 Main/Application.h create mode 100644 Main/HttpSession.cpp create mode 100644 Main/HttpSession.h create mode 100644 Main/ResponseUtility.cpp create mode 100644 Main/ResponseUtility.h create mode 100644 Main/ServiceLogic.cpp create mode 100644 Main/ServiceLogic.h create mode 100644 Main/Settings.cpp create mode 100644 Main/Settings.h create mode 100644 Main/WebRTC/SignalServer.cpp create mode 100644 Main/WebRTC/SignalServer.h create mode 100644 Main/WebRTC/WebSocketSignalSession.cpp create mode 100644 Main/WebRTC/WebSocketSignalSession.h diff --git a/Frontend/package.json b/Frontend/package.json index a8eb219..321e4b0 100644 --- a/Frontend/package.json +++ b/Frontend/package.json @@ -17,11 +17,13 @@ "@babel/preset-env": "^7.26.9", "@babel/preset-react": "^7.26.3", "@babel/runtime": "^7.26.10", + "antd": "^5.24.4", "babel-loader": "^10.0.0", "react": "^18.3.1", "react-dom": "^18.3.1", + "react-router": "^7.3.0", "webpack": "^5.98.0", "webpack-cli": "^6.0.1", "webpack-dev-server": "^5.2.0" } -} \ No newline at end of file +} diff --git a/Frontend/src/App.jsx b/Frontend/src/App.jsx index eff294f..1739e41 100644 --- a/Frontend/src/App.jsx +++ b/Frontend/src/App.jsx @@ -1,9 +1,13 @@ import React from "react"; +import WebRTCClient from "./WebRTCClient"; export default function App() { - return ( + return

从 Webpack 和 Babel 开始搭建 React 项目

- ) + +
+ + } \ No newline at end of file diff --git a/Frontend/src/WebRTCClient.jsx b/Frontend/src/WebRTCClient.jsx new file mode 100644 index 0000000..1d35089 --- /dev/null +++ b/Frontend/src/WebRTCClient.jsx @@ -0,0 +1,210 @@ +import React, { useState, useRef, useEffect } from 'react'; +import { Button, Card, Row, Col, Typography, List, message } from 'antd'; +const { Text } = Typography; + +// 生成随机ID +const randomId = (length) => { + return Array.from({ length }, () => Math.random().toString(36).charAt(2)).join(''); +}; + +const WebRTCClient = () => { + const [connectionStates, setConnectionStates] = useState({ + iceConnection: 'new', + iceGathering: 'new', + signaling: 'stable', + dataChannel: [] + }); + const [sdpInfo, setSdpInfo] = useState({ offer: '', answer: '' }); + const [mediaState, setMediaState] = useState({ started: false }); + + const clientId = useRef(randomId(10)); + const websocket = useRef(null); + const pc = useRef(null); + const dc = useRef(null); + const videoRef = useRef(null); + const startTime = useRef(null); + + // WebSocket 初始化 + useEffect(() => { + websocket.current = new WebSocket(`wss://amass.fun/api/v1/webrtc/signal/${clientId.current}`); + + websocket.current.onopen = () => { + message.success('信令服务器连接成功'); + }; + + websocket.current.onmessage = async (evt) => { + if (typeof evt.data !== 'string') return; + const message = JSON.parse(evt.data); + if (message.type === "offer") { + setSdpInfo(prev => ({ ...prev, offer: message.sdp })); + handleOffer(message); + } + }; + + return () => websocket.current?.close(); + }, []); + + // 状态更新方法 + const updateConnectionState = (type, state) => { + setConnectionStates(prev => ({ + ...prev, + [type]: [...prev[type], state] + })); + }; + + // 创建PeerConnection + const createPeerConnection = () => { + const config = { + bundlePolicy: "max-bundle", + iceServers: [{ urls: ['stun:stun.l.google.com:19302'] }] + }; + + const newPc = new RTCPeerConnection(config); + + // ICE状态监听 + newPc.addEventListener('iceconnectionstatechange', () => { + updateConnectionState('iceConnection', newPc.iceConnectionState); + }); + + newPc.addEventListener('icegatheringstatechange', () => { + updateConnectionState('iceGathering', newPc.iceGatheringState); + }); + + newPc.addEventListener('signalingstatechange', () => { + updateConnectionState('signaling', newPc.signalingState); + }); + + // 媒体流处理(网页2关键逻辑) + newPc.ontrack = (evt) => { + if (videoRef.current) { + videoRef.current.srcObject = evt.streams[0]; + videoRef.current.play().catch(err => message.error('视频播放失败')); + } + }; + + // 数据通道处理(网页1实现参考) + newPc.ondatachannel = (evt) => { + dc.current = evt.channel; + dc.current.onmessage = (event) => { + if (typeof event.data === 'string') { + setConnectionStates(prev => ({ + ...prev, + dataChannel: [...prev.dataChannel, `< ${event.data}`] + })); + } + }; + }; + + return newPc; + }; + + // 处理Offer + const handleOffer = async (offer) => { + pc.current = createPeerConnection(); + await pc.current.setRemoteDescription(offer); + await sendAnswer(); + }; + + // 发送Answer + const sendAnswer = async () => { + const answer = await pc.current.createAnswer(); + await pc.current.setLocalDescription(answer); + + setSdpInfo(prev => ({ ...prev, answer: answer.sdp })); + + websocket.current.send(JSON.stringify({ + id: "server", + type: answer.type, + sdp: answer.sdp + })); + }; + + // 控制方法 + const startCall = () => { + setMediaState({ started: true }); + websocket.current.send(JSON.stringify({ id: "server", type: "request" })); + startTime.current = Date.now(); + }; + + const stopCall = () => { + if (dc.current) dc.current.close(); + if (pc.current) pc.current.close(); + setMediaState({ started: false }); + }; + + return ( + + + {/* 视频区域 */} + + + + ); +}; + +export default WebRTCClient; \ No newline at end of file diff --git a/Main/Application.cpp b/Main/Application.cpp new file mode 100644 index 0000000..4048723 --- /dev/null +++ b/Main/Application.cpp @@ -0,0 +1,88 @@ +#include "Application.h" +#include "Core/IoContext.h" +#include "Core/Logger.h" +#include "Core/Singleton.h" +#include "HttpSession.h" +#include "Router/router.hpp" +#include "ServiceLogic.h" +#include "Settings.h" +#include "WebRTC/SignalServer.h" +#include +#include + +namespace Danki { + +class ApplicationPrivate { +public: + std::shared_ptr> router; + std::shared_ptr acceptor; +}; + +Application::Application() : m_d{new ApplicationPrivate()} { + using namespace boost::urls; + using namespace Core; + m_settings = Singleton::construct(); + m_ioContext = Singleton::construct(m_settings->threads()); + m_d->router = std::make_shared>(); + m_signalServer = std::make_shared(*this); +} + +void Application::insertUrl(std::string_view url, RequestHandler &&handler) { + m_d->router->insert(url, std::move(handler)); +} + +boost::asio::io_context &Application::ioContext() { + return *m_ioContext->ioContext(); +} + +int Application::exec() { + using namespace Core; + auto settings = Singleton::instance(); + ServiceLogic::staticFilesDeploy(); + startAcceptHttpConnections(settings->server(), settings->port()); + m_ioContext->run(); + return 0; +} + +void Application::startAcceptHttpConnections(const std::string &address, uint16_t port) { + m_d->acceptor = std::make_shared(*m_ioContext->ioContext()); + boost::beast::error_code error; + boost::asio::ip::tcp::endpoint endpoint(boost::asio::ip::make_address(address), port); + m_d->acceptor->open(endpoint.protocol(), error); + if (error) { + LOG(error) << error.message(); + return; + } + m_d->acceptor->set_option(boost::asio::socket_base::reuse_address(true), error); + if (error) { + LOG(error) << error.message(); + return; + } + m_d->acceptor->bind(endpoint, error); + if (error) { + LOG(error) << error.message(); + return; + } + m_d->acceptor->listen(boost::asio::socket_base::max_listen_connections, error); + if (error) { + LOG(error) << error.message(); + return; + } + asyncAcceptHttpConnections(); +} + +void Application::asyncAcceptHttpConnections() { + auto socket = std::make_shared(boost::asio::make_strand(*m_ioContext->ioContext())); + m_d->acceptor->async_accept(*socket, [self{shared_from_this()}, socket](const boost::system::error_code &error) { + if (error) { + if (error == boost::asio::error::operation_aborted) return; + LOG(error) << error.message(); + } else { + auto session = std::make_shared(std::move(*socket), self->m_d->router); + session->run(); + } + self->asyncAcceptHttpConnections(); + }); +} + +} // namespace Danki \ No newline at end of file diff --git a/Main/Application.h b/Main/Application.h new file mode 100644 index 0000000..3e837ff --- /dev/null +++ b/Main/Application.h @@ -0,0 +1,44 @@ +#ifndef __APPLICATION_H__ +#define __APPLICATION_H__ + +#include "Router/matches.hpp" +#include + +namespace Core { +class IoContext; +} // namespace Core + +namespace boost { +namespace asio { +class io_context; +} +} // namespace boost + +namespace Danki { + +class ApplicationPrivate; +class Settings; +class HttpSession; +class SignalServer; + +class Application : public std::enable_shared_from_this { +public: + using Request = boost::beast::http::request; + using RequestHandler = std::function; + Application(); + boost::asio::io_context &ioContext(); + void insertUrl(std::string_view url, RequestHandler &&handler); + int exec(); + void startAcceptHttpConnections(const std::string &address, uint16_t port); + +protected: + void asyncAcceptHttpConnections(); + +private: + ApplicationPrivate *m_d = nullptr; + std::shared_ptr m_settings; + std::shared_ptr m_ioContext; + std::shared_ptr m_signalServer; +}; +} // namespace Danki +#endif // __APPLICATION_H__ \ No newline at end of file diff --git a/Main/CMakeLists.txt b/Main/CMakeLists.txt index 6d44bf0..c7cdec7 100644 --- a/Main/CMakeLists.txt +++ b/Main/CMakeLists.txt @@ -4,13 +4,20 @@ find_package(LibDataChannel REQUIRED) find_package(Boost COMPONENTS json REQUIRED) add_executable(PassengerStatistics main.cpp + Application.h Application.cpp Camera.h Camera.cpp + HttpSession.h HttpSession.cpp ImageUtilities.h ImageUtilities.cpp RtspServer.h RtspServer.cpp + ResponseUtility.h ResponseUtility.cpp + ServiceLogic.h ServiceLogic.cpp + Settings.h Settings.cpp VideoInput.h VideoInput.cpp WebRTC/Streamer.h WebRTC/Streamer.cpp WebRTC/Helpers.h WebRTC/Helpers.cpp + WebRTC/SignalServer.h WebRTC/SignalServer.cpp + WebRTC/WebSocketSignalSession.h WebRTC/WebSocketSignalSession.cpp ) target_include_directories(PassengerStatistics @@ -28,6 +35,7 @@ target_link_directories(PassengerStatistics target_link_libraries(PassengerStatistics PRIVATE Kylin::Core + PRIVATE Kylin::Router PRIVATE LibDataChannel::LibDataChannel PRIVATE OpenSSL::SSL PRIVATE OpenSSL::Crypto diff --git a/Main/HttpSession.cpp b/Main/HttpSession.cpp new file mode 100644 index 0000000..926e382 --- /dev/null +++ b/Main/HttpSession.cpp @@ -0,0 +1,120 @@ +#include "HttpSession.h" +#include "Core/Logger.h" +#include +#include +#include +#include +#include +#include +#include +#include + +namespace Danki { +HttpSession::HttpSession(boost::asio::ip::tcp::socket &&socket, + const std::shared_ptr> &router) + : m_stream(std::move(socket)), m_router(router) { +} + +void HttpSession::run() { + doRead(); +} + +boost::beast::tcp_stream::executor_type HttpSession::executor() { + return m_stream.get_executor(); +} + +boost::asio::ip::tcp::socket HttpSession::releaseSocket() { + return m_stream.release_socket(); +} + +void HttpSession::errorReply(const Request &request, boost::beast::http::status status, + boost::beast::string_view message) { + using namespace boost::beast; + // invalid route + http::response res{status, request.version()}; + res.set(http::field::server, BOOST_BEAST_VERSION_STRING); + res.set(http::field::content_type, "text/html"); + res.keep_alive(request.keep_alive()); + res.body() = message; + res.prepare_payload(); + + reply(std::move(res)); +} + +void HttpSession::doRead() { + // Construct a new parser for each message + m_parser.emplace(); + + // Apply a reasonable limit to the allowed size + // of the body in bytes to prevent abuse. + m_parser->body_limit(std::numeric_limits::max()); + m_parser->header_limit(std::numeric_limits::max()); + m_buffer.clear(); + + // Set the timeout. + m_stream.expires_after(std::chrono::seconds(30)); + // clang-format off + boost::beast::http::async_read(m_stream, m_buffer, *m_parser, [self{shared_from_this()}](const boost::system::error_code &ec, std::size_t bytes_transferred) { + self->onRead(ec, bytes_transferred); + }); + // clang-format on +} + +void HttpSession::onRead(const boost::beast::error_code &error, std::size_t) { + using namespace boost::beast; + if (error) { + if (error == http::error::end_of_stream) { + boost::beast::error_code e; + m_stream.socket().shutdown(boost::asio::ip::tcp::socket::shutdown_send, e); + } else if (error != boost::asio::error::operation_aborted) { + LOG(info) << error << " : " << error.message(); + } + return; + } else if (m_router.expired()) { + LOG(error) << "router is null."; + return; + } + + auto &request = m_parser->get(); + auto path = boost::urls::parse_path(request.target()); + if (!path) { + LOG(error) << request.target() << "failed, error: " << path.error().message(); + errorReply(request, http::status::bad_request, "Illegal request-target"); + return; + } + auto router = m_router.lock(); + boost::urls::matches matches; + auto handler = router->find(*path, matches); + if (handler) { + try { + (*handler)(*this, request, matches); + } catch (const std::exception &e) { + boost::stacktrace::stacktrace trace = boost::stacktrace::stacktrace::from_current_exception(); + LOG(error) << e.what() << ", trace:\n" << trace; + } + } else { + std::ostringstream oss; + oss << "The resource '" << request.target() << "' was not found."; + auto message = oss.str(); + errorReply(request, http::status::not_found, message); + LOG(error) << message; + } +} + +void HttpSession::onWrite(boost::beast::error_code ec, std::size_t, bool close) { + if (ec) { + if (ec == boost::asio::error::operation_aborted) return; + std::cerr << "write: " << ec.message() << "\n"; + } + + if (close) { + // This means we should close the connection, usually because + // the response indicated the "Connection: close" semantic. + m_stream.socket().shutdown(boost::asio::ip::tcp::socket::shutdown_send, ec); + return; + } + + // Read another request + doRead(); +} +} // namespace Danki diff --git a/Main/HttpSession.h b/Main/HttpSession.h new file mode 100644 index 0000000..1f69aae --- /dev/null +++ b/Main/HttpSession.h @@ -0,0 +1,52 @@ +#ifndef HTTPSESSION_H +#define HTTPSESSION_H + +#include "Router/router.hpp" +#include +#include +#include +#include +#include +#include +#include +#include + +namespace Danki { + +/** Represents an established HTTP connection + */ +class HttpSession : public std::enable_shared_from_this { + void doRead(); + void onWrite(boost::beast::error_code ec, std::size_t, bool close); + +public: + using Request = boost::beast::http::request; + using RequestHandler = std::function; + HttpSession(boost::asio::ip::tcp::socket &&socket, + const std::shared_ptr> &router); + template + void reply(Response &&response) { + using ResponseType = typename std::decay_t; + auto sp = std::make_shared(std::forward(response)); + boost::beast::http::async_write( + m_stream, *sp, [self = shared_from_this(), sp](boost::beast::error_code ec, std::size_t bytes) { + self->onWrite(ec, bytes, sp->need_eof()); + }); + } + void errorReply(const Request &request, boost::beast::http::status status, boost::beast::string_view message); + boost::beast::tcp_stream::executor_type executor(); + boost::asio::ip::tcp::socket releaseSocket(); + void run(); + +protected: + void onRead(const boost::beast::error_code &error, std::size_t); + +private: + boost::beast::tcp_stream m_stream; + std::weak_ptr> m_router; + boost::beast::flat_buffer m_buffer{std::numeric_limits::max()}; + std::optional> m_parser; +}; +} // namespace Danki + +#endif // HTTPSESSION_H diff --git a/Main/ResponseUtility.cpp b/Main/ResponseUtility.cpp new file mode 100644 index 0000000..dee717e --- /dev/null +++ b/Main/ResponseUtility.cpp @@ -0,0 +1,51 @@ +#include "ResponseUtility.h" +#include "boost/beast.hpp" + +namespace ResponseUtility { + +std::string_view mimeType(std::string_view path) { + using boost::beast::iequals; + auto const ext = [&path] { + auto const pos = path.rfind("."); + if (pos == std::string_view::npos) return std::string_view{}; + return path.substr(pos); + }(); + if (iequals(ext, ".pdf")) return "Application/pdf"; + if (iequals(ext, ".htm")) return "text/html"; + if (iequals(ext, ".html")) return "text/html"; + if (iequals(ext, ".php")) return "text/html"; + if (iequals(ext, ".css")) return "text/css"; + if (iequals(ext, ".txt")) return "text/plain"; + if (iequals(ext, ".js")) return "application/javascript"; + if (iequals(ext, ".json")) return "application/json"; + if (iequals(ext, ".xml")) return "application/xml"; + if (iequals(ext, ".swf")) return "application/x-shockwave-flash"; + if (iequals(ext, ".flv")) return "video/x-flv"; + if (iequals(ext, ".png")) return "image/png"; + if (iequals(ext, ".jpe")) return "image/jpeg"; + if (iequals(ext, ".jpeg")) return "image/jpeg"; + if (iequals(ext, ".jpg")) return "image/jpeg"; + if (iequals(ext, ".gif")) return "image/gif"; + if (iequals(ext, ".bmp")) return "image/bmp"; + if (iequals(ext, ".ico")) return "image/vnd.microsoft.icon"; + if (iequals(ext, ".tiff")) return "image/tiff"; + if (iequals(ext, ".tif")) return "image/tiff"; + if (iequals(ext, ".svg")) return "image/svg+xml"; + if (iequals(ext, ".svgz")) return "image/svg+xml"; + return "application/text"; +} + +std::string pathCat(std::string_view base, std::string_view path) { + if (base.empty()) return std::string(path); + std::string result(base); + char constexpr path_separator = '/'; + if (result.back() == path_separator && path.front() == path_separator) { + result.resize(result.size() - 1); + } else if (result.back() != path_separator && path.front() != path_separator) { + result.append("/"); + } + result.append(path.data(), path.size()); + + return result; +} +} // namespace ResponseUtility diff --git a/Main/ResponseUtility.h b/Main/ResponseUtility.h new file mode 100644 index 0000000..c0ff08b --- /dev/null +++ b/Main/ResponseUtility.h @@ -0,0 +1,19 @@ +#ifndef RESPONSEUTILITY_H +#define RESPONSEUTILITY_H + +#include + +namespace ResponseUtility { +/** + * @brief Return a reasonable mime type based on the extension of a file. + */ +std::string_view mimeType(std::string_view path); + +/** + * @brief Append an HTTP rel-path to a local filesystem path.The returned path is normalized for the + * platform. + */ +std::string pathCat(std::string_view base, std::string_view path); +} // namespace ResponseUtility + +#endif // RESPONSEUTILITY_H diff --git a/Main/RtspServer.cpp b/Main/RtspServer.cpp index 8fece4b..20499f6 100644 --- a/Main/RtspServer.cpp +++ b/Main/RtspServer.cpp @@ -1,4 +1,5 @@ #include "RtspServer.h" +#include "Core/Logger.h" #include #include #include @@ -39,7 +40,12 @@ RtspServer::RtspServer(boost::asio::io_context &ioContext) : m_d(new RtspServerP config.ssl_is_path = 1; mk_env_init(&config); - mk_rtsp_server_start(554, 0); + uint16_t status = mk_rtsp_server_start(554, 0); + + status = mk_rtc_server_start(7764); + if (status == 0) { + LOG(error) << "mk_rtc_server_start() failed."; + } m_d->media = mk_media_create("__defaultVhost__", "live", "video", 0, 0, 0); diff --git a/Main/ServiceLogic.cpp b/Main/ServiceLogic.cpp new file mode 100644 index 0000000..ef3cc96 --- /dev/null +++ b/Main/ServiceLogic.cpp @@ -0,0 +1,100 @@ +#include "ServiceLogic.h" +#include "Core/Logger.h" +#include "Core/Singleton.h" +#include "HttpSession.h" +#include "Settings.h" +#include +#include +#include +#include + +namespace ServiceLogic { +using namespace boost::beast; + +std::string extractToken(const std::string &cookieHeader, const std::string &tokenName = "access_token") { + // 格式示例:"access_token=abc123; Path=/; Expires=Wed, 21 Oct 2023 07:28:00 GMT" + size_t startPos = cookieHeader.find(tokenName + "="); + if (startPos == std::string::npos) { + return ""; + } + startPos += tokenName.size() + 1; // 跳过 "token_name=" + size_t endPos = cookieHeader.find(';', startPos); + if (endPos == std::string::npos) { + endPos = cookieHeader.size(); + } + std::string token = cookieHeader.substr(startPos, endPos - startPos); + + // 移除可能的引号和空格 + token.erase(std::remove(token.begin(), token.end(), '"'), token.end()); + token.erase(std::remove(token.begin(), token.end(), ' '), token.end()); + return token; +} + +boost::beast::http::response +serverError(const boost::beast::http::request &request, + std::string_view errorMessage) { + using namespace boost::beast; + http::response res{http::status::internal_server_error, request.version()}; + res.set(http::field::server, BOOST_BEAST_VERSION_STRING); + res.set(http::field::content_type, "text/html"); + res.keep_alive(request.keep_alive()); + std::ostringstream oss; + oss << "An error occurred: '" << errorMessage << "'"; + res.body() = oss.str(); + res.prepare_payload(); + return res; +} + +http::response badRequest(const http::request &request, std::string_view why) { + http::response res{http::status::bad_request, request.version()}; + res.set(http::field::server, BOOST_BEAST_VERSION_STRING); + res.set(http::field::content_type, "text/html"); + res.keep_alive(request.keep_alive()); + res.body() = std::string(why); + res.prepare_payload(); + return res; +} + +void staticFilesDeploy() { + using namespace Core; + using namespace boost::urls; + auto application = Singleton::instance(); + // clang-format off + application->insertUrl("/{path*}", [](Danki::HttpSession &session, const Danki::Application::Request &request, const matches &matches) { + using namespace boost::beast; + url_view view(request.target()); + auto target = view.path(); + LOG(info) << target; + if (target.find("..") != boost::beast::string_view::npos) { + session.reply(ServiceLogic::badRequest(request, "Illegal request-target")); + return; + } + auto settings = Singleton::instance(); + std::string path = ResponseUtility::pathCat(settings->documentRoot(), target); + if (target.back() == '/') path.append("index.html"); + if (std::filesystem::is_directory(path)) path.append("/index.html"); + boost::beast::error_code ec; + http::file_body::value_type body; + body.open(path.c_str(), boost::beast::file_mode::scan, ec); + if (ec == boost::beast::errc::no_such_file_or_directory) { + std::ostringstream oss; + oss << "The resource '" << target << "' was not found."; + LOG(error) << oss.str(); + session.errorReply(request, http::status::not_found, oss.str()); + return; + } else if (ec) { + session.reply(ServiceLogic::serverError(request, ec.message())); + return; + } + auto const size = body.size(); + http::response res{std::piecewise_construct, std::make_tuple(std::move(body)), + std::make_tuple(http::status::ok, request.version())}; + res.set(http::field::server, BOOST_BEAST_VERSION_STRING); + res.set(http::field::content_type, ResponseUtility::mimeType(path)); + res.content_length(size); + res.keep_alive(request.keep_alive()); + session.reply(std::move(res)); + }); + // clang-format on +} +} // namespace ServiceLogic diff --git a/Main/ServiceLogic.h b/Main/ServiceLogic.h new file mode 100644 index 0000000..51c3e43 --- /dev/null +++ b/Main/ServiceLogic.h @@ -0,0 +1,43 @@ +#ifndef SERVICELOGIC_H +#define SERVICELOGIC_H + +#include "Application.h" +#include "ResponseUtility.h" +#include +#include +#include +#include +#include +#include +#include + +using StringRequest = boost::beast::http::request; + +namespace ServiceLogic { + +// Returns a server error response +boost::beast::http::response +serverError(const boost::beast::http::request &request, std::string_view errorMessage); + +boost::beast::http::response +badRequest(const boost::beast::http::request &request, std::string_view why); + +template +boost::beast::http::response make_200(const boost::beast::http::request &request, + typename ResponseBody::value_type body, + boost::beast::string_view content) { + boost::beast::http::response response{boost::beast::http::status::ok, request.version()}; + response.set(boost::beast::http::field::server, BOOST_BEAST_VERSION_STRING); + response.set(boost::beast::http::field::content_type, content); + response.body() = body; + response.prepare_payload(); + response.keep_alive(request.keep_alive()); + + return response; +} + +void staticFilesDeploy(); + +}; // namespace ServiceLogic + +#endif // SERVICELOGIC_H diff --git a/Main/Settings.cpp b/Main/Settings.cpp new file mode 100644 index 0000000..78d0d50 --- /dev/null +++ b/Main/Settings.cpp @@ -0,0 +1,70 @@ +#include "Settings.h" +#include "Core/Logger.h" +#include +#include +#include +#include +#include + +constexpr auto SettingsFilePath = "settings.xml"; + +namespace Danki { +Settings::Settings() { + if (!std::filesystem::exists(SettingsFilePath)) { + save(); + } + load(); +} + +void Settings::save() { + using namespace boost::property_tree; + ptree ptree; + ptree.put("Application.Threads", std::thread::hardware_concurrency()); + ptree.put("Application.SqlitePath", m_sqlitePath); + + ptree.put("Application.HttpServer.Address", m_server); + ptree.put("Application.HttpServer.DocumentRoot", m_documentRoot); + ptree.put("Application.HttpServer.DocumentRoot.", + "静态网页文件存放位置,为空时网站统计将不判断页面页面是否存在"); + + xml_writer_settings settings('\t', 1); + write_xml(SettingsFilePath, ptree, std::locale(), settings); +} + +void Settings::load() { + using namespace boost::property_tree; + ptree ptree; + try { + read_xml(SettingsFilePath, ptree); + m_sqlitePath = ptree.get("Application.SqlitePath"); + m_threads = ptree.get("Application.Threads"); + + m_server = ptree.get("Application.HttpServer.Address"); + m_documentRoot = ptree.get("Application.HttpServer.DocumentRoot"); + boost::algorithm::trim(m_documentRoot); + } catch (const xml_parser_error &error) { + LOG(error) << "parse " << SettingsFilePath << " failed: " << error.message(); + } +} + +uint32_t Settings::threads() const { + return m_threads; +} + +std::string Settings::server() const { + return m_server; +} + +uint16_t Settings::port() const { + return m_port; +} + +std::string Settings::documentRoot() const { + return m_documentRoot; +} + +std::string Settings::sqlitePath() const { + return m_sqlitePath; +} + +} // namespace Danki \ No newline at end of file diff --git a/Main/Settings.h b/Main/Settings.h new file mode 100644 index 0000000..c83dde0 --- /dev/null +++ b/Main/Settings.h @@ -0,0 +1,30 @@ +#ifndef __SETTINGS_H__ +#define __SETTINGS_H__ + +#include +#include + +namespace Danki { +class Settings { +public: + Settings(); + void save(); + void load(); + + uint32_t threads() const; + std::string server() const; + uint16_t port() const; + std::string documentRoot() const; + std::string sqlitePath() const; + +private: + uint32_t m_threads = 1; + std::string m_server = "0.0.0.0"; + uint16_t m_port = 80; + + std::string m_documentRoot = "/data/sdcard/PassengerStatistics/web"; + std::string m_sqlitePath = "database.sqlite"; +}; +} // namespace Danki + +#endif // __SETTINGS_H__ \ No newline at end of file diff --git a/Main/WebRTC/SignalServer.cpp b/Main/WebRTC/SignalServer.cpp new file mode 100644 index 0000000..350e8d0 --- /dev/null +++ b/Main/WebRTC/SignalServer.cpp @@ -0,0 +1,41 @@ +#include "SignalServer.h" +#include "../Application.h" +#include "../HttpSession.h" +#include "Core/Logger.h" +#include "WebSocketSignalSession.h" +#include + +namespace Danki { +SignalServer::SignalServer(Application &app) { + using namespace boost::urls; + // clang-format off + app.insertUrl("/api/v1/webrtc/signal/{id}", [this](HttpSession &session, const Application::Request &request, const matches &matches) { + auto id = matches.at("id"); + if (boost::beast::websocket::is_upgrade(request)) { + auto ws = std::make_shared(session.releaseSocket(), *this, id); + ws->run(request); + } else { + LOG(error) << "webrtc client[" << id << "] not upgrade connection, request: " << std::endl << request; + } + }); + // clang-format on +} + +void SignalServer::join(const std::string &id, WebSocketSignalSession *client) { + m_clients.insert({id, client}); +} + +void SignalServer::leave(const std::string &id) { + if (m_clients.count(id) > 0) { + m_clients.erase(id); + } +} + +WebSocketSignalSession *SignalServer::client(const std::string &id) { + WebSocketSignalSession *ret = nullptr; + if (m_clients.count(id) > 0) { + ret = m_clients.at(id); + } + return ret; +} +} // namespace Danki \ No newline at end of file diff --git a/Main/WebRTC/SignalServer.h b/Main/WebRTC/SignalServer.h new file mode 100644 index 0000000..7f5c496 --- /dev/null +++ b/Main/WebRTC/SignalServer.h @@ -0,0 +1,24 @@ +#ifndef __SIGNALSERVER_H__ +#define __SIGNALSERVER_H__ + +#include +#include + +namespace Danki { + +class Application; +class WebSocketSignalSession; + +class SignalServer { +public: + SignalServer(Application &app); + void join(const std::string &id, WebSocketSignalSession *client); + void leave(const std::string &id); + WebSocketSignalSession *client(const std::string &id); + +private: + std::unordered_map m_clients; +}; +} // namespace Danki + +#endif // __SIGNALSERVER_H__ \ No newline at end of file diff --git a/Main/WebRTC/Streamer.cpp b/Main/WebRTC/Streamer.cpp index 253a0b8..25cfda9 100644 --- a/Main/WebRTC/Streamer.cpp +++ b/Main/WebRTC/Streamer.cpp @@ -7,6 +7,8 @@ #include #include +namespace Danki { + class WebRTCStreamerPrivate { public: WebRTCStreamerPrivate(boost::asio::io_context &ioContext) : strand{ioContext.get_executor()} { @@ -135,7 +137,13 @@ void Streamer::start(const std::string &signalServerAddress, uint16_t signalServ m_d->websocket = std::make_shared(c); m_d->websocket->onOpen([]() { LOG(info) << "WebSocket connected, signaling ready"; }); m_d->websocket->onClosed([]() { LOG(info) << "WebSocket closed"; }); - m_d->websocket->onError([](const std::string &error) { LOG(error) << "WebSocket failed: " << error; }); + m_d->websocket->onError([this, signalServerAddress, signalServerPort](const std::string &error) { + LOG(error) << "WebSocket failed: " << error; + + boost::asio::post(m_d->strand, [this, signalServerAddress, signalServerPort]() { + start(signalServerAddress, signalServerPort); + }); + }); m_d->websocket->onMessage([this](std::variant data) { if (!std::holds_alternative(data)) return; @@ -147,7 +155,7 @@ void Streamer::start(const std::string &signalServerAddress, uint16_t signalServ }); const std::string url = - "wss://" + signalServerAddress + ":" + std::to_string(signalServerPort) + "/api/v1/webrtc/signal/" + localId; + "ws://" + signalServerAddress + ":" + std::to_string(signalServerPort) + "/api/v1/webrtc/signal/" + localId; LOG(info) << "URL is " << url; m_d->websocket->open(url); LOG(info) << "Waiting for signaling to be connected..."; @@ -175,7 +183,7 @@ void Streamer::push(const uint8_t *data, uint32_t size) { } Streamer::Streamer(boost::asio::io_context &ioContext) : m_d{new WebRTCStreamerPrivate(ioContext)} { - rtc::InitLogger(rtc::LogLevel::Debug); + rtc::InitLogger(rtc::LogLevel::Info); std::string stunServer = "stun:amass.fun:5349"; // ssl m_d->configuration.iceServers.emplace_back(stunServer); LOG(info) << "STUN server is " << stunServer; @@ -185,3 +193,4 @@ Streamer::Streamer(boost::asio::io_context &ioContext) : m_d{new WebRTCStreamerP m_d->configuration.disableAutoNegotiation = true; } +} \ No newline at end of file diff --git a/Main/WebRTC/Streamer.h b/Main/WebRTC/Streamer.h index d91ca75..9c25737 100644 --- a/Main/WebRTC/Streamer.h +++ b/Main/WebRTC/Streamer.h @@ -9,6 +9,8 @@ namespace asio { class io_context; } } // namespace boost + +namespace Danki { class WebRTCStreamerPrivate; class Streamer { @@ -21,5 +23,6 @@ public: private: WebRTCStreamerPrivate *m_d = nullptr; }; +} // namespace Danki #endif // __WEBRTCSTREAMER_H__ \ No newline at end of file diff --git a/Main/WebRTC/WebSocketSignalSession.cpp b/Main/WebRTC/WebSocketSignalSession.cpp new file mode 100644 index 0000000..f723353 --- /dev/null +++ b/Main/WebRTC/WebSocketSignalSession.cpp @@ -0,0 +1,109 @@ +#include "WebSocketSignalSession.h" +#include "Core/Logger.h" +#include "SignalServer.h" +#include +#include +#include + +namespace Danki { +WebSocketSignalSession::WebSocketSignalSession(boost::asio::ip::tcp::socket &&socket, SignalServer &server, + const std::string &id) + : m_ws(std::move(socket)), m_server(server), m_id(id) { + m_server.join(m_id, this); +} + +WebSocketSignalSession::~WebSocketSignalSession() { + m_server.leave(m_id); +} + +void WebSocketSignalSession::onAccept(boost::beast::error_code ec) { + if (ec) { + if (ec == boost::asio::error::operation_aborted || ec == boost::beast::websocket::error::closed) return; + LOG(error) << "accept: " << ec.message() << "\n"; + return; + } + LOG(info) << "accept websocket target: " << m_target << ", id: " << m_id; + + // Read a message + m_ws.async_read(m_buffer, boost::beast::bind_front_handler(&WebSocketSignalSession::onRead, shared_from_this())); +} + +void WebSocketSignalSession::onRead(const boost::beast::error_code &error, std::size_t bytesTransferred) { + if (error) { + if (error == boost::asio::error::operation_aborted || error == boost::beast::websocket::error::closed) return; + LOG(error) << error << ": " << error.message(); + return; + } + + boost::scope::scope_exit raii([this] { + m_buffer.consume(m_buffer.size()); // Clear the buffer + m_ws.async_read(m_buffer, + boost::beast::bind_front_handler(&WebSocketSignalSession::onRead, shared_from_this())); + }); + if (!m_ws.got_text()) { + LOG(warning) << "current not supported binary message."; + return; + } + auto message = boost::beast::buffers_to_string(m_buffer.data()); + auto rootObject = boost::json::parse(message); + auto &root = rootObject.as_object(); + if (root.contains("id")) { + if (!root.at("id").is_string()) { + LOG(warning) << "wrong format."; + m_ws.close(boost::beast::websocket::close_code::normal); + return; + } + auto destinationId = std::string(root["id"].as_string()); + auto destination = m_server.client(destinationId); + if (destination == nullptr) { + LOG(info) << "client " << destinationId << " not found."; + } else { + root["id"] = m_id; + auto reply = std::make_shared(boost::json::serialize(root)); + destination->send(reply); + } + LOG(info) << message; + } else if (root.contains("type")) { + auto &type = root.at("type").as_string(); + if (type == "ping") { + boost::json::object object; + object["type"] = "pong"; + send(boost::json::serialize(object)); + } + } else { + LOG(info) << message; + } +} + +void WebSocketSignalSession::send(const std::shared_ptr &ss) { + boost::asio::post(m_ws.get_executor(), [ptr = weak_from_this(), ss]() { + if (ptr.expired()) return; + auto self = ptr.lock(); + self->m_queue.push_back(ss); + if (self->m_queue.size() > 1) return; // 之前已经有发送了 + self->m_ws.text(); + self->m_ws.async_write( + boost::asio::buffer(*self->m_queue.front()), + boost::beast::bind_front_handler(&WebSocketSignalSession::on_write, self->shared_from_this())); + }); +} + +void WebSocketSignalSession::send(std::string &&message) { + return send(std::make_shared(std::move(message))); +} + +void WebSocketSignalSession::on_write(boost::beast::error_code ec, std::size_t) { + if (ec) { + // Don't report these + if (ec == boost::asio::error::operation_aborted || ec == boost::beast::websocket::error::closed) return; + std::cerr << "write: " << ec.message() << "\n"; + return; + } + + // Remove the string from the queue + m_queue.erase(m_queue.begin()); + if (!m_queue.empty()) + m_ws.async_write(boost::asio::buffer(*m_queue.front()), + boost::beast::bind_front_handler(&WebSocketSignalSession::on_write, shared_from_this())); +} +} // namespace Danki \ No newline at end of file diff --git a/Main/WebRTC/WebSocketSignalSession.h b/Main/WebRTC/WebSocketSignalSession.h new file mode 100644 index 0000000..c1f9a8f --- /dev/null +++ b/Main/WebRTC/WebSocketSignalSession.h @@ -0,0 +1,58 @@ +#ifndef __WEBSOCKETSIGNALSESSION_H__ +#define __WEBSOCKETSIGNALSESSION_H__ + +#include +#include +#include +#include +#include + +namespace Danki { + +class SignalServer; + +/** + * @brief Represents an active WebSocket connection to the server + */ +class WebSocketSignalSession : public std::enable_shared_from_this { +public: + WebSocketSignalSession(boost::asio::ip::tcp::socket &&socket, SignalServer &server, const std::string &id); + + ~WebSocketSignalSession(); + + template + void run(boost::beast::http::request> request) { + using namespace boost::beast::http; + using namespace boost::beast::websocket; + // Set suggested timeout settings for the websocket + m_ws.set_option(stream_base::timeout::suggested(boost::beast::role_type::server)); + m_target = request.target(); + // Set a decorator to change the Server of the handshake + m_ws.set_option(stream_base::decorator([](response_type &response) { + response.set(field::server, std::string(BOOST_BEAST_VERSION_STRING) + " websocket-chat-multi"); + })); + // LOG(info) << request.base().target(); //get path + // Accept the websocket handshake + m_ws.async_accept(request, [self{shared_from_this()}](boost::beast::error_code ec) { self->onAccept(ec); }); + } + + // Send a message + void send(const std::shared_ptr &ss); + void send(std::string &&message); + +protected: + void onAccept(boost::beast::error_code ec); + void onRead(const boost::beast::error_code &error, std::size_t bytesTransferred); + void on_write(boost::beast::error_code ec, std::size_t bytes_transferred); + +private: + boost::beast::websocket::stream m_ws; + SignalServer &m_server; + std::string m_id; + std::string m_target; + boost::beast::flat_buffer m_buffer; + std::vector> m_queue; +}; +} // namespace Danki + +#endif // __WEBSOCKETSIGNALSESSION_H__ \ No newline at end of file diff --git a/Main/main.cpp b/Main/main.cpp index d9dcaad..7f26ef5 100644 --- a/Main/main.cpp +++ b/Main/main.cpp @@ -1,3 +1,4 @@ +#include "Application.h" #include "Camera.h" #include "Core/DateTime.h" #include "Core/IoContext.h" @@ -8,47 +9,47 @@ #include "WebRTC/Streamer.h" #include "rw_mpp_api.h" #include +#include #include -int main(int argc, char const *argv[]) { +int main(int argc, char const *argv[]) try { using namespace Core; + using namespace Danki; LOG(info) << "app start..."; int status = rw_mpp__init(); if (status != 0) { LOG(error) << "rw_mpp__init() failed, status: " << status; return -1; } - try { - auto camera = Singleton::construct(); - auto ioContext = Singleton::construct(std::thread::hardware_concurrency()); - auto rtsp = std::make_shared(*ioContext->ioContext()); - auto streamer = std::make_shared(*ioContext->ioContext()); - streamer->start("amass.fun", 443); + boost::scope::scope_exit raii([] { + LOG(info) << "app exit."; + rw_mpp__finalize(); + }); - auto video = std::make_shared(2592, 1536); - video->setPacketHandler([&](const uint8_t *data, uint32_t size) { - rtsp->push(data, size); - streamer->push(data, size); - }); - video->start(); - // video->startFileInput("/data/sdcard/HM1.264", 1280, 720); - video->startEncode(); - boost::asio::signal_set signals(*ioContext->ioContext(), SIGINT); - signals.async_wait([&](boost::system::error_code const &, int signal) { - LOG(info) << "capture " << (signal == SIGINT ? "SIGINT" : "SIGTERM") << ",stop!"; - video.reset(); - rw_mpp__finalize(); - ioContext->ioContext()->stop(); - }); + auto application = Singleton::construct(); - ioContext->run(true); - } catch (const boost::exception &e) { - LOG(error) << "error"; - } catch (const std::exception &e) { - LOG(error) << e.what(); - } + auto camera = Singleton::construct(); + auto rtsp = std::make_shared(application->ioContext()); + auto streamer = std::make_shared(application->ioContext()); + streamer->start("127.0.0.1", 80); - rw_mpp__finalize(); - LOG(info) << "app exit."; - return 0; + auto video = std::make_shared(2592, 1536); + video->setPacketHandler([&](const uint8_t *data, uint32_t size) { + rtsp->push(data, size); + streamer->push(data, size); + }); + video->start(); + video->startEncode(); + boost::asio::signal_set signals(application->ioContext(), SIGINT); + signals.async_wait([&](boost::system::error_code const &, int signal) { + LOG(info) << "capture " << (signal == SIGINT ? "SIGINT" : "SIGTERM") << ",stop!"; + video.reset(); + rw_mpp__finalize(); + application->ioContext().stop(); + }); + return application->exec(); +} catch (const boost::exception &e) { + LOG(error) << "error"; +} catch (const std::exception &e) { + LOG(error) << e.what(); } diff --git a/resources/build.sh b/resources/build.sh index c7f9ade..6a0fc08 100755 --- a/resources/build.sh +++ b/resources/build.sh @@ -81,7 +81,7 @@ function init() { echo "put ${BOOST_LIBDIR}/libboost_system.so.1.87.0 /data/sdcard/PassengerStatistics/lib" | sftp danki echo "put ${BOOST_LIBDIR}/libboost_filesystem.so.1.87.0 /data/sdcard/PassengerStatistics/lib" | sftp danki echo "put ${BOOST_LIBDIR}/libboost_thread.so.1.87.0 /data/sdcard/PassengerStatistics/lib" | sftp danki - # echo "put ${BOOST_LIBDIR}/libboost_url.so.1.84.0 /system/lib" | sftp -i resources/ssh_host_rsa_key_ok root@${TARGET_IP} + echo "put ${BOOST_LIBDIR}/libboost_url.so.1.87.0 /data/sdcard/PassengerStatistics/lib" | sftp danki echo "put ${BOOST_LIBDIR}/libboost_json.so.1.87.0 /data/sdcard/PassengerStatistics/lib" | sftp danki # echo "put ${BOOST_LIBDIR}/libboost_program_options.so.1.84.0 /system/lib" | sftp -i resources/ssh_host_rsa_key_ok root@${TARGET_IP}