/* * Copyright (c) 2016 The ZLToolKit project authors. All Rights Reserved. * * This file is part of ZLToolKit(https://github.com/ZLMediaKit/ZLToolKit). * * Use of this source code is governed by MIT license that can be found in the * LICENSE file in the root of the source tree. All contributing project authors * may be found in the AUTHORS file in the root of the source tree. */ #ifndef UTIL_RINGBUFFER_H_ #define UTIL_RINGBUFFER_H_ #include #include #include #include #include #include #include #include "util.h" #include "List.h" #include "Poller/EventPoller.h" // GOP缓存最大长度下限值 #define RING_MIN_SIZE 32 #define LOCK_GUARD(mtx) std::lock_guard lck(mtx) namespace toolkit { template class RingDelegate { public: using Ptr = std::shared_ptr; RingDelegate() = default; virtual ~RingDelegate() = default; virtual void onWrite(T in, bool is_key = true) = 0; }; template class _RingStorage; template class _RingReaderDispatcher; /** * 环形缓存读取器 * 该对象的事件触发都会在绑定的poller线程中执行 * 所以把锁去掉了 * 对该对象的一切操作都应该在poller线程中执行 */ template class _RingReader { public: using Ptr = std::shared_ptr<_RingReader>; friend class _RingReaderDispatcher; _RingReader(std::shared_ptr<_RingStorage> storage) { _storage = std::move(storage); setReadCB(nullptr); setDetachCB(nullptr); setGetInfoCB(nullptr); setMessageCB(nullptr); } ~_RingReader() = default; void setReadCB(std::function cb) { if (!cb) { _read_cb = [](const T &) {}; } else { _read_cb = std::move(cb); flushGop(); } } void setDetachCB(std::function cb) { _detach_cb = cb ? std::move(cb) : []() {}; } void setGetInfoCB(std::function cb) { _info_cb = cb ? std::move(cb) : []() { return Any(); }; } void setMessageCB(std::function cb) { _msg_cb = cb ? std::move(cb) : [](const Any &data) {}; } private: void onRead(const T &data, bool /*is_key*/) { _read_cb(data); } void onMessage(const Any &data) { _msg_cb(data); } void onDetach() const { _detach_cb(); } Any getInfo() { return _info_cb(); } void flushGop() { if (!_storage) { return; } _storage->getCache().for_each([this](const List> &lst) { lst.for_each([this](const std::pair &pr) { onRead(pr.second, pr.first); }); }); } private: std::shared_ptr<_RingStorage> _storage; std::function _detach_cb; std::function _read_cb; std::function _info_cb; std::function _msg_cb; }; template class _RingStorage { public: using Ptr = std::shared_ptr<_RingStorage>; using GopType = List>>; _RingStorage(size_t max_size, size_t max_gop_size) { // gop缓存个数不能小于32 if (max_size < RING_MIN_SIZE) { max_size = RING_MIN_SIZE; } _max_size = max_size; _max_gop_size = max_gop_size; clearCache(); } ~_RingStorage() = default; /** * 写入环形缓存数据 * @param in 数据 * @param is_key 是否为关键帧 * @return 是否触发重置环形缓存大小 */ void write(T in, bool is_key = true) { if (is_key) { _have_idr = true; _started = true; if (!_data_cache.back().empty()) { //当前gop列队还没收到任意缓存 _data_cache.emplace_back(); } if (_data_cache.size() > _max_gop_size) { // GOP个数超过限制,那么移除最早的GOP popFrontGop(); } } if (!_have_idr && _started) { //缓存中没有关键帧,那么gop缓存无效 return; } _data_cache.back().emplace_back(std::make_pair(is_key, std::move(in))); if (++_size > _max_size) { // GOP缓存溢出 while (_data_cache.size() > 1) { //先尝试清除老的GOP缓存 popFrontGop(); } if (_size > _max_size) { //还是大于最大缓冲限制,那么清空所有GOP clearCache(); } } } Ptr clone() const { Ptr ret(new _RingStorage()); ret->_size = _size; ret->_have_idr = _have_idr; ret->_started = _started; ret->_max_size = _max_size; ret->_max_gop_size = _max_gop_size; ret->_data_cache = _data_cache; return ret; } const GopType &getCache() const { return _data_cache; } void clearCache() { _size = 0; _have_idr = false; _data_cache.clear(); _data_cache.emplace_back(); } private: _RingStorage() = default; void popFrontGop() { if (!_data_cache.empty()) { _size -= _data_cache.front().size(); _data_cache.pop_front(); if (_data_cache.empty()) { _data_cache.emplace_back(); } } } private: bool _started = false; bool _have_idr; size_t _size; size_t _max_size; size_t _max_gop_size; GopType _data_cache; }; template class RingBuffer; /** * 环形缓存事件派发器,只能一个poller线程操作它 * @tparam T */ template class _RingReaderDispatcher : public std::enable_shared_from_this<_RingReaderDispatcher> { public: using Ptr = std::shared_ptr<_RingReaderDispatcher>; using RingReader = _RingReader; using RingStorage = _RingStorage; using onChangeInfoCB = std::function; friend class RingBuffer; ~_RingReaderDispatcher() { decltype(_reader_map) reader_map; reader_map.swap(_reader_map); for (auto &pr : reader_map) { auto reader = pr.second.lock(); if (reader) { reader->onDetach(); } } } private: _RingReaderDispatcher( const typename RingStorage::Ptr &storage, std::function onSizeChanged) { _reader_size = 0; _storage = storage; _on_size_changed = std::move(onSizeChanged); assert(_on_size_changed); } void write(T in, bool is_key = true) { for (auto it = _reader_map.begin(); it != _reader_map.end();) { auto reader = it->second.lock(); if (!reader) { it = _reader_map.erase(it); --_reader_size; onSizeChanged(false); continue; } reader->onRead(in, is_key); ++it; } _storage->write(std::move(in), is_key); } void sendMessage(const Any &data) { for (auto it = _reader_map.begin(); it != _reader_map.end();) { auto reader = it->second.lock(); if (!reader) { it = _reader_map.erase(it); --_reader_size; onSizeChanged(false); continue; } reader->onMessage(data); ++it; } } std::shared_ptr attach(const EventPoller::Ptr &poller, bool use_cache) { if (!poller->isCurrentThread()) { throw std::runtime_error("You can attach RingBuffer only in it's poller thread"); } std::weak_ptr<_RingReaderDispatcher> weak_self = this->shared_from_this(); auto on_dealloc = [weak_self, poller](RingReader *ptr) { poller->async([weak_self, ptr]() { auto strong_self = weak_self.lock(); if (strong_self && strong_self->_reader_map.erase(ptr)) { --strong_self->_reader_size; strong_self->onSizeChanged(false); } delete ptr; }); }; std::shared_ptr reader(new RingReader(use_cache ? _storage : nullptr), on_dealloc); _reader_map[reader.get()] = reader; ++_reader_size; onSizeChanged(true); return reader; } void onSizeChanged(bool add_flag) { _on_size_changed(_reader_size, add_flag); } void clearCache() { if (_reader_size == 0) { _storage->clearCache(); } } std::list getInfoList(const onChangeInfoCB &on_change) { std::list ret; for (auto &pr : _reader_map) { auto reader = pr.second.lock(); if (!reader) { continue; } auto info = reader->getInfo(); if (!info) { continue; } ret.emplace_back(on_change(std::move(info))); } return ret; } private: std::atomic_int _reader_size; std::function _on_size_changed; typename RingStorage::Ptr _storage; std::unordered_map> _reader_map; }; template class RingBuffer : public std::enable_shared_from_this> { public: using Ptr = std::shared_ptr; using RingReader = _RingReader; using RingStorage = _RingStorage; using RingReaderDispatcher = _RingReaderDispatcher; using onReaderChanged = std::function; using onGetInfoCB = std::function &info_list)>; RingBuffer(size_t max_size = 1024, onReaderChanged cb = nullptr, size_t max_gop_size = 1) { _storage = std::make_shared(max_size, max_gop_size); _on_reader_changed = cb ? std::move(cb) : [](int size) {}; //先触发无人观看 _on_reader_changed(0); } ~RingBuffer() = default; void write(T in, bool is_key = true) { if (_delegate) { _delegate->onWrite(std::move(in), is_key); return; } LOCK_GUARD(_mtx_map); for (auto &pr : _dispatcher_map) { auto &second = pr.second; //切换线程后触发onRead事件 pr.first->async([second, in, is_key]() mutable { second->write(std::move(in), is_key); }, false); } _storage->write(std::move(in), is_key); } void sendMessage(const Any &data) { LOCK_GUARD(_mtx_map); for (auto &pr : _dispatcher_map) { auto &second = pr.second; // 切换线程后触发sendMessage pr.first->async([second, data]() { second->sendMessage(data); }, false); } } void setDelegate(const typename RingDelegate::Ptr &delegate) { _delegate = delegate; } std::shared_ptr attach(const EventPoller::Ptr &poller, bool use_cache = true) { typename RingReaderDispatcher::Ptr dispatcher; { LOCK_GUARD(_mtx_map); auto &ref = _dispatcher_map[poller]; if (!ref) { std::weak_ptr weak_self = this->shared_from_this(); auto onSizeChanged = [weak_self, poller](int size, bool add_flag) { if (auto strong_self = weak_self.lock()) { strong_self->onSizeChanged(poller, size, add_flag); } }; auto onDealloc = [poller](RingReaderDispatcher *ptr) { poller->async([ptr]() { delete ptr; }); }; ref.reset(new RingReaderDispatcher(_storage->clone(), std::move(onSizeChanged)), std::move(onDealloc)); } dispatcher = ref; } return dispatcher->attach(poller, use_cache); } int readerCount() { return _total_count; } void clearCache() { LOCK_GUARD(_mtx_map); _storage->clearCache(); for (auto &pr : _dispatcher_map) { auto &second = pr.second; //切换线程后清空缓存 pr.first->async([second]() { second->clearCache(); }, false); } } void getInfoList(const onGetInfoCB &cb, const typename RingReaderDispatcher::onChangeInfoCB &on_change = nullptr) { if (!cb) { return; } if (!on_change) { const_cast(on_change) = [](Any &&info) { return std::move(info); }; } LOCK_GUARD(_mtx_map); auto info_vec = std::make_shared>>(); // 1、最少确保一个元素 info_vec->resize(_dispatcher_map.empty() ? 1 : _dispatcher_map.size()); std::shared_ptr on_finished(nullptr, [cb, info_vec](void *) mutable { // 2、防止这里为空 auto &lst = *info_vec->begin(); for (auto &item : *info_vec) { if (&lst != &item) { lst.insert(lst.end(), item.begin(), item.end()); } } cb(lst); }); auto i = 0U; for (auto &pr : _dispatcher_map) { auto &second = pr.second; pr.first->async([second, info_vec, on_finished, i, on_change]() { (*info_vec)[i] = second->getInfoList(on_change); }); ++i; } } private: void onSizeChanged(const EventPoller::Ptr &poller, int size, bool add_flag) { if (size == 0) { LOCK_GUARD(_mtx_map); _dispatcher_map.erase(poller); } if (add_flag) { ++_total_count; } else { --_total_count; } _on_reader_changed(_total_count); } private: struct HashOfPtr { std::size_t operator()(const EventPoller::Ptr &key) const { return (std::size_t)key.get(); } }; private: std::mutex _mtx_map; std::atomic_int _total_count { 0 }; typename RingStorage::Ptr _storage; typename RingDelegate::Ptr _delegate; onReaderChanged _on_reader_changed; std::unordered_map _dispatcher_map; }; } /* namespace toolkit */ #endif /* UTIL_RINGBUFFER_H_ */