From 4c1db38c8ee3cc38529f2a24bfc95ea43887e3ab Mon Sep 17 00:00:00 2001 From: luocai Date: Fri, 21 Jul 2023 14:07:27 +0800 Subject: [PATCH] add library 'AsioZeroMQ'. --- AsioZeroMQ/BasicSocket.h | 94 ++++++++++ AsioZeroMQ/BasicSocket.inl | 331 +++++++++++++++++++++++++++++++++++ AsioZeroMQ/CMakeLists.txt | 29 +++ AsioZeroMQ/ErrorCode.cpp | 13 ++ AsioZeroMQ/ErrorCode.h | 18 ++ AsioZeroMQ/Message.cpp | 85 +++++++++ AsioZeroMQ/Message.h | 84 +++++++++ AsioZeroMQ/Options.h | 65 +++++++ AsioZeroMQ/SocketService.cpp | 62 +++++++ AsioZeroMQ/SocketService.h | 63 +++++++ AsioZeroMQ/ZeroMQSocket.h | 10 ++ CMakeLists.txt | 4 + resource/deploy.sh | 3 +- 13 files changed, 860 insertions(+), 1 deletion(-) create mode 100644 AsioZeroMQ/BasicSocket.h create mode 100644 AsioZeroMQ/BasicSocket.inl create mode 100644 AsioZeroMQ/CMakeLists.txt create mode 100644 AsioZeroMQ/ErrorCode.cpp create mode 100644 AsioZeroMQ/ErrorCode.h create mode 100644 AsioZeroMQ/Message.cpp create mode 100644 AsioZeroMQ/Message.h create mode 100644 AsioZeroMQ/Options.h create mode 100644 AsioZeroMQ/SocketService.cpp create mode 100644 AsioZeroMQ/SocketService.h create mode 100644 AsioZeroMQ/ZeroMQSocket.h diff --git a/AsioZeroMQ/BasicSocket.h b/AsioZeroMQ/BasicSocket.h new file mode 100644 index 0000000..62c5ef4 --- /dev/null +++ b/AsioZeroMQ/BasicSocket.h @@ -0,0 +1,94 @@ +#ifndef BASICSOCKET_H +#define BASICSOCKET_H + +#include "Message.h" +#include "Options.h" +#include +#include +#include +#include + +namespace ZeroMQ { + +template +class BasicSocket { +public: + using ImplementationType = typename Service::ImplementationType; + BasicSocket(boost::asio::io_context &context, SocketType type); + void connect(std::string_view address, boost::system::error_code &error); + void connect(std::string_view address); + + /** + * @brief only see the instance is nullptr or not. + */ + bool connected() const; + + void bind(std::string_view address, boost::system::error_code &error); + void bind(std::string_view address); + + template + void setOption(IntegralOption, const T &value, boost::system::error_code &error); + + template + void setOption(IntegralOption, const T &value); + + template + void setOption(ArrayOption, const std::string_view &buffer, boost::system::error_code &error); + + template + T option(IntegralOption, boost::system::error_code &error) const; + + template + T option(IntegralOption) const; + + size_t send(boost::asio::const_buffer buffer, SendFlags flags, boost::system::error_code &error); + + template + typename boost::enable_if, size_t>::type + send(const ConstBufferSequence &buffers, SendFlags flags, boost::system::error_code &error); + + template + typename boost::enable_if, size_t>::type + send(const ConstBufferSequence &buffers, SendFlags flags = SendFlags::None); + + size_t send(Message &&message, SendFlags flags, boost::system::error_code &error); + size_t send(Message &&message, SendFlags flags); + + std::size_t receive(Message &message, RecvFlags flags, boost::system::error_code &error); + std::size_t receive(Message &message, RecvFlags flags = RecvFlags::None); + + size_t receive(boost::asio::mutable_buffer buffer, RecvFlags flags, boost::system::error_code &error); + size_t receive(const boost::asio::mutable_buffer &buffer, RecvFlags flags = RecvFlags::None); + + template + typename boost::enable_if, std::vector>::type + receive(const MutableBufferSequence &buffers, RecvFlags flags, boost::system::error_code &error); + + template + typename boost::enable_if, std::vector>::type + receive(const MutableBufferSequence &buffers, RecvFlags flags = RecvFlags::None); + + // void read_handler(const boost::system::error_code& ec,std::size_t bytes_transferred) + template + void asyncReceive(Message &message, ReadHandler &&handler); + + template + void asyncReceive(const MutableBufferSequence &buffers, ReadHandler &&handler); + + template + size_t receiveMultipart(OutputIt &out, size_t n, RecvFlags flags, boost::system::error_code &error); + + template + void asyncReceiveMultipart(OutputIt out, ReadHandler &&handler); + + boost::asio::io_context &ioContext() const; + ~BasicSocket(); + +private: + Service &m_service; + ImplementationType m_impl; +}; + +} // namespace ZeroMQ +#include "BasicSocket.inl" +#endif // BASICSOCKET_H diff --git a/AsioZeroMQ/BasicSocket.inl b/AsioZeroMQ/BasicSocket.inl new file mode 100644 index 0000000..b9953e5 --- /dev/null +++ b/AsioZeroMQ/BasicSocket.inl @@ -0,0 +1,331 @@ +#ifndef BASICSOCKET_INL +#define BASICSOCKET_INL + +#include "BasicSocket.h" +#include "ErrorCode.h" +#include "SocketService.h" +#include +#include +#include + +#include "BoostLog.h" + +namespace ZeroMQ { + +template +BasicSocket::BasicSocket(boost::asio::io_context &context, SocketType type) + : m_service(boost::asio::use_service(context)) { + m_service.construct(m_impl, type); +} + +template +BasicSocket::~BasicSocket() { + m_service.destroy(m_impl); +} + +template +void BasicSocket::connect(std::string_view address, boost::system::error_code &error) { + m_service.connect(m_impl, std::move(address), error); +} + +template +void BasicSocket::connect(std::string_view address) { + boost::system::error_code error; + connect(std::move(address), error); + if (error) throw error; +} + +template +bool BasicSocket::connected() const { + return m_impl != nullptr && m_impl->socket != nullptr; +} + +template +void BasicSocket::bind(std::string_view address, boost::system::error_code &error) { + std::lock_guard lock_guard(m_impl->mutex); + auto status = zmq_bind(m_impl->socket, address.data()); + if (status < 0) { + error = makeErrorCode(); + return; + } +} + +template +void BasicSocket::bind(std::string_view address) { + boost::system::error_code error; + bind(std::move(address), error); + if (error) throw error; +} + +template +template +void BasicSocket::setOption(IntegralOption, const T &value, + boost::system::error_code &error) { + static_assert(std::is_integral::value, "T must be integral"); + m_service.setOption(m_impl, Option, &value, sizeof(value), error); +} + +template +template +void BasicSocket::setOption(IntegralOption, const T &value) { + boost::system::error_code error; + setOption(IntegralOption(), value, error); + if (error) throw error; +} + +template +template +void BasicSocket::setOption(ArrayOption, const std::string_view &buffer, + boost::system::error_code &error) { + m_service.setOption(m_impl, Option, buffer.data(), buffer.size(), error); +} + +template +template +T BasicSocket::option(IntegralOption, boost::system::error_code &error) const { + static_assert(std::is_integral::value, "T must be integral"); + T value; + size_t size = sizeof value; + m_service.option(m_impl, Option, &value, &size, error); + + assert(size == sizeof value); + return value; +} + +template +template +T BasicSocket::option(IntegralOption) const { + boost::system::error_code error; + auto ret = option(IntegralOption(), error); + if (error) throw error; + return ret; +} + +template +boost::asio::io_context &BasicSocket::ioContext() const { + return m_service.get_io_context(); +} + +template +size_t BasicSocket::send(boost::asio::const_buffer buffer, SendFlags flags, boost::system::error_code &error) { + const int nbytes = zmq_send(m_impl->socket, buffer.data(), buffer.size(), static_cast(flags)); + if (nbytes >= 0) return static_cast(nbytes); + error = makeErrorCode(); + return nbytes; +} + +template +template +typename boost::enable_if, size_t>::type +BasicSocket::send(const ConstBufferSequence &buffers, SendFlags flags, boost::system::error_code &error) { + size_t res = 0; + auto last = std::distance(std::begin(buffers), std::end(buffers)) - 1; + auto index = 0u; + for (auto it = std::begin(buffers); it != std::end(buffers); ++it, ++index) { + auto f = index == last ? static_cast(flags) : static_cast(flags) | ZMQ_SNDMORE; + res += send(*it, static_cast(f), error); + if (error) return 0u; + } + return res; +} + +template +template +typename boost::enable_if, size_t>::type +BasicSocket::send(const ConstBufferSequence &buffers, SendFlags flags) { + boost::system::error_code error; + auto size = send(buffers, flags, error); + if (error) throw error; + return size; +} + +template +size_t BasicSocket::send(Message &&message, SendFlags flags, boost::system::error_code &error) { + int nbytes = zmq_msg_send(message.handle(), m_impl->socket, static_cast(flags)); + if (nbytes >= 0) return static_cast(nbytes); + error = makeErrorCode(); + return nbytes; +} + +template +size_t BasicSocket::send(Message &&message, SendFlags flags) { + boost::system::error_code error; + auto size = send(std::move(message), flags, error); + if (error) throw error; + return size; +} + +template +std::size_t BasicSocket::receive(Message &message, RecvFlags flags, boost::system::error_code &error) { + std::lock_guard lock_guard(m_impl->mutex); + BOOST_ASSERT_MSG(m_impl->socket, "Invalid socket"); + auto size = zmq_msg_recv(message.handle(), m_impl->socket, static_cast(flags)); + if (size < 0) error = makeErrorCode(); + return size; +} + +template +std::size_t BasicSocket::receive(Message &message, RecvFlags flags) { + boost::system::error_code error; + auto size = receive(message, flags, error); + if (error) throw error; + return size; +} + +template +size_t BasicSocket::receive(boost::asio::mutable_buffer buffer, RecvFlags flags, + boost::system::error_code &error) { + const int nbytes = zmq_recv(m_impl->socket, buffer.data(), buffer.size(), static_cast(flags)); + if (nbytes >= 0) return nbytes; + + error = makeErrorCode(); + return nbytes; +} + +template +template +typename boost::enable_if, std::vector>::type +BasicSocket::receive(const MutableBufferSequence &buffers, RecvFlags flags, boost::system::error_code &error) { + std::vector ret; + auto iterator = std::begin(buffers); + auto f = static_cast(flags); + do { + auto size = receive(*iterator, flags, error); + if (error) return ret; + ret.push_back(size); + + f |= ZMQ_RCVMORE; + ++iterator; + } while ((iterator != std::end(buffers)) && option(ReceiveMore)); + if (option(ReceiveMore)) error = makeErrorCode(boost::system::errc::no_buffer_space); + return ret; +} + +template +template +typename boost::enable_if, std::vector>::type +BasicSocket::receive(const MutableBufferSequence &buffers, RecvFlags flags) { + boost::system::error_code error; + auto size = receive(buffers, flags, error); + if (error) throw error; + return size; +} + +template +size_t BasicSocket::receive(const boost::asio::mutable_buffer &buffer, RecvFlags flags) { + boost::system::error_code error; + auto size = receive(std::move(buffer), flags, error); + if (error) throw error; + return size; +} + +template +template +void BasicSocket::asyncReceive(const MutableBufferSequence &buffers, ReadHandler &&handler) { + // using namespace boost::asio::posix; + if (option(Events) & ZMQ_POLLIN) { + boost::asio::post(m_service.get_io_context(), [&buffers, this, handler{std::move(handler)}]() { + boost::system::error_code error; + auto size = receive(buffers, RecvFlags::Dontwait, error); + handler(error, size); + }); + return; + } + m_impl->descriptor->async_wait(StreamType::wait_read, [this, &buffers, handler{std::move(handler)}]( + const boost::system::error_code &waitError) { + if (waitError) { + handler(waitError, {}); + return; + } + if (option(Events) & ZMQ_POLLIN) { + boost::system::error_code error; + auto size = receive(buffers, RecvFlags::Dontwait, error); + return handler(error, size); + } else { + asyncReceive(buffers, handler); + } + }); +} + +template +template +void BasicSocket::asyncReceive(Message &message, ReadHandler &&handler) { + // using namespace boost::asio::posix; + + if (option(Events) & ZMQ_POLLIN) { + boost::asio::post(m_service.get_io_context(), [&message, this, handler{std::move(handler)}]() { + boost::system::error_code error; + auto size = receive(message, RecvFlags::Dontwait, error); + handler(error, size); + }); + return; + } + + m_impl->descriptor->async_wait(StreamType::wait_read, [this, &message, handler{std::move(handler)}]( + const boost::system::error_code &waitError) { + if (waitError) { + handler(waitError, 0); + return; + } + if (option(Events) & ZMQ_POLLIN) { + boost::system::error_code error; + auto size = receive(message, RecvFlags::Dontwait, error); + return handler(error, size); + } else { + asyncReceive(message, handler); + } + }); +} + +template +template +size_t BasicSocket::receiveMultipart(OutputIt &out, size_t n, RecvFlags flags, + boost::system::error_code &error) { + size_t msg_count = 0; + Message message; + while (true) { + if (CheckN) { + if (msg_count >= n) throw std::runtime_error("Too many message parts in recv_multipart_n"); + } + receive(message, flags, error); + if (error) return msg_count; + + ++msg_count; + const bool more = message.more(); + *out++ = std::move(message); + if (!more) break; + } + + return msg_count; +} + +template +template +void BasicSocket::asyncReceiveMultipart(OutputIt out, ReadHandler &&handler) { + // using namespace boost::asio::posix; + boost::system::error_code error; + + if (option(Events) & ZMQ_POLLIN) { + auto size = receiveMultipart(out, 0, RecvFlags::Dontwait, error); + return handler(error, size); + } + m_impl->descriptor->async_wait(StreamType::wait_read, [this, out, handler{std::move(handler)}]( + const boost::system::error_code &waitError) mutable { + if (waitError) { + handler(waitError, 0); + return; + } + if ((option(Events) & ZMQ_POLLIN) == 0) { + asyncReceiveMultipart(out, handler); + return; + } + size_t size = 0; + boost::system::error_code error; + size += receiveMultipart(out, 0, RecvFlags::Dontwait, error); + if (!error || error.value() != EAGAIN) return handler(error, size); + }); +} + +} // namespace ZeroMQ + +#endif // BASICSOCKET_INL diff --git a/AsioZeroMQ/CMakeLists.txt b/AsioZeroMQ/CMakeLists.txt new file mode 100644 index 0000000..648e488 --- /dev/null +++ b/AsioZeroMQ/CMakeLists.txt @@ -0,0 +1,29 @@ +add_library(AsioZeroMQ + BasicSocket.h BasicSocket.inl + ErrorCode.h ErrorCode.cpp + Message.h Message.cpp + Options.h + SocketService.h SocketService.cpp + ZeroMQSocket.h +) + +target_include_directories(AsioZeroMQ + INTERFACE ${CMAKE_CURRENT_SOURCE_DIR} + PUBLIC ${ZeroMQ_INCLUDE_DIR} +) + +target_link_directories(AsioZeroMQ + PUBLIC ${ZeroMQ_LIBRARY_DIRS} +) + +target_link_libraries(AsioZeroMQ + PUBLIC ${Boost_LIBRARIES} + PUBLIC Universal + PUBLIC ${ZeroMQ_LIBRARIES} +) + +if(UNIX) +target_compile_options(AsioZeroMQ + PRIVATE -fPIC +) +endif() diff --git a/AsioZeroMQ/ErrorCode.cpp b/AsioZeroMQ/ErrorCode.cpp new file mode 100644 index 0000000..ac93689 --- /dev/null +++ b/AsioZeroMQ/ErrorCode.cpp @@ -0,0 +1,13 @@ +#include "ErrorCode.h" + +namespace ZeroMQ { +boost::system::error_code makeErrorCode(int ev) { + static ErrorCategory cat; + + return boost::system::error_code(ev, cat); +} + +const char *ErrorCategory::name() const noexcept { return "ZeroMQ"; } + +std::string ErrorCategory::message(int ev) const { return std::string(zmq_strerror(ev)); } +} // namespace ZeroMQ diff --git a/AsioZeroMQ/ErrorCode.h b/AsioZeroMQ/ErrorCode.h new file mode 100644 index 0000000..04b6063 --- /dev/null +++ b/AsioZeroMQ/ErrorCode.h @@ -0,0 +1,18 @@ +#ifndef ERRORCODE_H +#define ERRORCODE_H + +#include +#include + +namespace ZeroMQ { + +class ErrorCategory : public boost::system::error_category { +public: + const char *name() const noexcept override; + std::string message(int ev) const override; +}; + +boost::system::error_code makeErrorCode(int ev = zmq_errno()); +} // namespace ZeroMQ + +#endif // ERRORCODE_H diff --git a/AsioZeroMQ/Message.cpp b/AsioZeroMQ/Message.cpp new file mode 100644 index 0000000..47be3d9 --- /dev/null +++ b/AsioZeroMQ/Message.cpp @@ -0,0 +1,85 @@ +#include "Message.h" +#include +#include +#include + +namespace ZeroMQ { +Message::Message() noexcept { zmq_msg_init(&m_message); } + +Message::~Message() noexcept { + int rc = zmq_msg_close(&m_message); + BOOST_ASSERT_MSG(rc == 0, "init failed."); +} + +Message::Message(Message &&rhs) noexcept : m_message(rhs.m_message) { + int rc = zmq_msg_init(&rhs.m_message); + BOOST_ASSERT(rc == 0); +} + +Message &Message::operator=(Message &&rhs) noexcept { + std::swap(m_message, rhs.m_message); + return *this; +} + +bool Message::operator==(const Message &other) const noexcept { + const size_t my_size = size(); + return my_size == other.size() && 0 == memcmp(data(), other.data(), my_size); +} + +bool Message::operator!=(const Message &other) const noexcept { return !(*this == other); } + +Message::Message(const void *dataSrc, size_t size) { + int rc = zmq_msg_init_size(&m_message, size); + if (rc != 0) throw makeErrorCode(); + if (size) { + // this constructor allows (nullptr, 0), memcpy with a null pointer is UB + memcpy(data(), dataSrc, size); + } +} + +void Message::copy(Message &message) { + int rc = zmq_msg_copy(&m_message, message.handle()); + if (rc != 0) throw makeErrorCode(); +} + +std::string Message::dump() const { + + // Partly mutuated from the same method in zmq::multipart_t + std::ostringstream os; + + const unsigned char *msg_data = this->data(); + unsigned char byte; + size_t size = this->size(); + int is_ascii[2] = {0, 0}; + + os << "Message [size " << std::dec << std::setw(3) << std::setfill('0') << size << "] ("; + // Totally arbitrary + if (size >= 1000) { + os << "... too big to print)"; + } else { + while (size--) { + byte = *msg_data++; + + is_ascii[1] = (byte >= 32 && byte < 127); + if (is_ascii[1] != is_ascii[0]) os << " "; // Separate text/non text + + if (is_ascii[1]) { + os << byte; + } else { + os << std::hex << std::uppercase << std::setw(2) << std::setfill('0') << static_cast(byte); + } + is_ascii[0] = is_ascii[1]; + } + os << ")"; + } + return os.str(); +} + +} // namespace ZeroMQ + +namespace std { +ostream &operator<<(ostream &stream, const ZeroMQ::Message &message) { + stream << message.dump(); + return stream; +} +} // namespace std diff --git a/AsioZeroMQ/Message.h b/AsioZeroMQ/Message.h new file mode 100644 index 0000000..c0f5249 --- /dev/null +++ b/AsioZeroMQ/Message.h @@ -0,0 +1,84 @@ +#ifndef MESSAGE_H +#define MESSAGE_H + +#include "ErrorCode.h" +#include +#include + +namespace ZeroMQ { + +class Message { +public: + Message() noexcept; + ~Message() noexcept; + Message(Message &&rhs) noexcept; + Message &operator=(Message &&rhs) noexcept; + bool operator==(const Message &other) const noexcept; + + bool operator!=(const Message &other) const noexcept; + + Message(size_t size) { + int rc = zmq_msg_init_size(&m_message, size); + if (rc != 0) throw makeErrorCode(); + } + Message(const void *dataSrc, size_t size); + Message(std::string_view string) : Message(string.data(), string.size()) { + } + + void *data() noexcept { + return zmq_msg_data(&m_message); + } + const void *data() const noexcept { + return zmq_msg_data(const_cast(&m_message)); + } + + template + T const *data() const noexcept { + return static_cast(data()); + } + + size_t size() const noexcept { + return zmq_msg_size(const_cast(&m_message)); + } + zmq_msg_t *handle() noexcept { + return &m_message; + } + const zmq_msg_t *handle() const noexcept { + return &m_message; + } + bool more() const noexcept { + int rc = zmq_msg_more(const_cast(&m_message)); + return rc != 0; + } + + /** + * @brief copy from message + */ + void copy(Message &message); + + /** + * @brief zeromq的字符串不是以\0结束的,所以调用std::string_view::data()可能会导致错误 + * + * @return std::string_view + */ + std::string_view toStringView() const noexcept { + return std::string_view(static_cast(data()), size()); + } + + std::string dump() const; + +protected: + Message(const Message &) = delete; + void operator=(const Message &) = delete; + +private: + // The underlying message + zmq_msg_t m_message; +}; +} // namespace ZeroMQ + +namespace std { +ostream &operator<<(ostream &stream, const ZeroMQ::Message &message); +} + +#endif // MESSAGE_H diff --git a/AsioZeroMQ/Options.h b/AsioZeroMQ/Options.h new file mode 100644 index 0000000..5e80977 --- /dev/null +++ b/AsioZeroMQ/Options.h @@ -0,0 +1,65 @@ +#ifndef OPTIONS_H +#define OPTIONS_H + +#include + +namespace ZeroMQ { +enum class SocketType : int { + Req = ZMQ_REQ, + Rep = ZMQ_REP, + Dealer = ZMQ_DEALER, + Router = ZMQ_ROUTER, + Pub = ZMQ_PUB, + Sub = ZMQ_SUB, + Xpub = ZMQ_XPUB, + Xsub = ZMQ_XSUB, + Push = ZMQ_PUSH, + Pull = ZMQ_PULL, + Stream = ZMQ_STREAM, + Pair = ZMQ_PAIR +}; + +enum class SendFlags : int { + None = 0, + Dontwait = ZMQ_DONTWAIT, + Sndmore = ZMQ_SNDMORE, +}; + +enum class RecvFlags : int { + None = 0, + Dontwait = ZMQ_DONTWAIT, +}; + +// BoolUnit: if true accepts values of type bool (but passed as T into libzmq) +template +struct IntegralOption {}; + +// NullTerm: +// 0: binary data +// 1: null-terminated string (`getsockopt` size includes null) +// 2: binary (size 32) or Z85 encoder string of size 41 (null included) +template +struct ArrayOption {}; + +// ZMQ_DEFINE_INTEGRAL_OPT(ZMQ_LINGER, linger, int); +using LingerType = IntegralOption; +inline constexpr LingerType Linger; + +// ZMQ_DEFINE_INTEGRAL_OPT(ZMQ_RCVHWM, rcvhwm, int); +using ReceiveHighWaterMarkType = IntegralOption; +inline constexpr ReceiveHighWaterMarkType ReceiveHighWaterMark; + +// ZMQ_DEFINE_INTEGRAL_OPT(ZMQ_EVENTS, events, int); +using EventsType = IntegralOption; +inline constexpr EventsType Events; + +// ZMQ_DEFINE_INTEGRAL_BOOL_UNIT_OPT(ZMQ_RCVMORE, rcvmore, int); +using ReceiveMoreType = IntegralOption; +inline constexpr ReceiveMoreType ReceiveMore; + +// ZMQ_DEFINE_ARRAY_OPT(ZMQ_SUBSCRIBE, subscribe); +using SubscribeType = ArrayOption; +inline constexpr SubscribeType Subscribe; + +} // namespace ZeroMQ +#endif // OPTIONS_H diff --git a/AsioZeroMQ/SocketService.cpp b/AsioZeroMQ/SocketService.cpp new file mode 100644 index 0000000..2aa5e2b --- /dev/null +++ b/AsioZeroMQ/SocketService.cpp @@ -0,0 +1,62 @@ +#include "SocketService.h" +#include "ErrorCode.h" +#include +#include + +#include "BoostLog.h" + +namespace ZeroMQ { +SocketService::SocketService(boost::asio::io_context &context) : boost::asio::io_context::service(context) { + m_context = zmq_ctx_new(); +} + +SocketService::~SocketService() { zmq_ctx_term(m_context); } + +void SocketService::construct(ImplementationType &impl, SocketType type) { + impl = std::make_shared(m_context, type, get_io_context()); +} + +void SocketService::destroy(ImplementationType &impl) { impl.reset(); } + +void SocketService::connect(ImplementationType &impl, std::string_view endpoint, boost::system::error_code &error) { + std::lock_guard lock_guard(impl->mutex); + BOOST_ASSERT_MSG(impl->socket, "invalid socket"); + auto status = zmq_connect(impl->socket, endpoint.data()); + if (status < 0) error = makeErrorCode(); +} + +void SocketService::setOption(SocketService::ImplementationType &impl, int option, const void *optval, size_t optvallen, + boost::system::error_code &error) { + int status = zmq_setsockopt(impl->socket, option, optval, optvallen); + if (status < 0) error = makeErrorCode(); +} + +void SocketService::option(const ImplementationType &impl, int option, void *optval, size_t *optvallen, + boost::system::error_code &error) { + int rc = zmq_getsockopt(impl->socket, option, optval, optvallen); + if (rc != 0) error = makeErrorCode(); +} + +SocketService::Implementation::Implementation(void *context, SocketType type, boost::asio::io_context &ioContext) { + socket = zmq_socket(context, static_cast(type)); + NativeHandleType handle = 0; + auto size = sizeof(handle); + auto rc = zmq_getsockopt(socket, ZMQ_FD, &handle, &size); + if (rc < 0) { + throw makeErrorCode(); + } +#if !defined BOOST_ASIO_WINDOWS + descriptor.reset(new StreamType(ioContext, handle)); +#else + // Use duplicated SOCKET, because ASIO socket takes ownership over it so destroys one in dtor. + ::WSAPROTOCOL_INFOW pi; + ::WSADuplicateSocketW(handle, ::GetCurrentProcessId(), &pi); + handle = ::WSASocketW(pi.iAddressFamily /*AF_INET*/, pi.iSocketType /*SOCK_STREAM*/, pi.iProtocol /*IPPROTO_TCP*/, + &pi, 0, 0); + descriptor.reset(new boost::asio::ip::tcp::socket(ioContext, boost::asio::ip::tcp::v4(), handle)); +#endif +} + +SocketService::Implementation::~Implementation() { zmq_close(socket); } + +} // namespace ZeroMQ diff --git a/AsioZeroMQ/SocketService.h b/AsioZeroMQ/SocketService.h new file mode 100644 index 0000000..ceb2f1b --- /dev/null +++ b/AsioZeroMQ/SocketService.h @@ -0,0 +1,63 @@ +#ifndef SOCKETSERVICE_H +#define SOCKETSERVICE_H + +#include "Options.h" +#include +#include +#include +#include + +namespace ZeroMQ { + +#if !defined BOOST_ASIO_WINDOWS +struct StreamDescriptorClose { + void operator()(boost::asio::posix::stream_descriptor *descriptor) { + descriptor->release(); + delete descriptor; + } +}; +using StreamDescriptor = std::unique_ptr; +using StreamType = boost::asio::posix::stream_descriptor; +using NativeHandleType = boost::asio::posix::stream_descriptor::native_handle_type; +#else +using StreamDescriptor = std::unique_ptr; +using StreamType = boost::asio::ip::tcp::socket; +using NativeHandleType = boost::asio::ip::tcp::socket::native_handle_type; +#endif + +class SocketService : public boost::asio::io_context::service { +public: + inline static boost::asio::execution_context::id id; + SocketService(boost::asio::io_context &context); + ~SocketService(); + + class Implementation { + public: + Implementation(void *context, SocketType type, boost::asio::io_context &ioContext); + ~Implementation(); + + void *socket; + StreamDescriptor descriptor; + std::mutex mutex; + }; + using ImplementationType = std::shared_ptr; + + void construct(ImplementationType &impl, SocketType type); + + void destroy(ImplementationType &impl); + + void connect(ImplementationType &impl, std::string_view endpoint, boost::system::error_code &error); + void setOption(ImplementationType &impl, int option, const void *optval, size_t optvallen, + boost::system::error_code &error); + void option(const ImplementationType &impl, int option, void *optval, size_t *optvallen, + boost::system::error_code &error); + + /// Destroy all user-defined handler objects owned by the service. + void shutdown() {} + +private: + void *m_context; +}; +} // namespace ZeroMQ + +#endif // SOCKETSERVICE_H diff --git a/AsioZeroMQ/ZeroMQSocket.h b/AsioZeroMQ/ZeroMQSocket.h new file mode 100644 index 0000000..53f5343 --- /dev/null +++ b/AsioZeroMQ/ZeroMQSocket.h @@ -0,0 +1,10 @@ +#ifndef SOCKET_H +#define SOCKET_H + +#include "BasicSocket.h" +#include "SocketService.h" + +namespace ZeroMQ { +using Socket = BasicSocket; +} +#endif // SOCKET_H diff --git a/CMakeLists.txt b/CMakeLists.txt index fd1baa0..80a7f1c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,5 +2,9 @@ cmake_minimum_required(VERSION 3.15) project(Kylin) +set(ZeroMQ_INCLUDE_DIR ${ZeroMQ_ROOT}/include) +set(ZeroMQ_LIBRARY_DIRS ${ZeroMQ_ROOT}/lib) + +add_subdirectory(AsioZeroMQ) add_subdirectory(HttpProxy) add_subdirectory(Universal) \ No newline at end of file diff --git a/resource/deploy.sh b/resource/deploy.sh index 53b46d6..1c511ea 100644 --- a/resource/deploy.sh +++ b/resource/deploy.sh @@ -18,7 +18,8 @@ function cmake_scan() { -S ${base_path} \ -B ${build_path} \ -DCMAKE_BUILD_TYPE=Debug \ - -DBOOST_ROOT=${libraries_root}/boost_1_82_0 + -DBOOST_ROOT=${libraries_root}/boost_1_82_0 \ + -DZeroMQ_ROOT=${libraries_root}/zeromq-4.3.4_debug } function build() {