// Source path: Older/ToolKit/Poller/EventPoller.h
// Repository web-view snapshot: 289 lines, 8.1 KiB, listed as C,
// revision dated 2024-09-28 23:55:00 +08:00.
/*
* Copyright (c) 2016 The ZLToolKit project authors. All Rights Reserved.
*
* This file is part of ZLToolKit(https://github.com/ZLMediaKit/ZLToolKit).
*
* Use of this source code is governed by MIT license that can be found in the
* LICENSE file in the root of the source tree. All contributing project authors
* may be found in the AUTHORS file in the root of the source tree.
*/
#ifndef EventPoller_h
#define EventPoller_h
#include <cstdint>
#include <exception>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <unordered_map>
#include <unordered_set>
#include "PipeWrap.h"
#include "Util/logger.h"
#include "Util/List.h"
#include "Thread/TaskExecutor.h"
#include "Thread/ThreadPool.h"
#include "Network/Buffer.h"
#include "Network/BufferSock.h"
// Platform feature switches. HAS_EPOLL / HAS_KQUEUE are tested further down in
// this header to choose between the epoll/kqueue data members and the select()
// fallback members of EventPoller.
// Linux targets: enable the epoll code path.
#if defined(__linux__) || defined(__linux)
#define HAS_EPOLL
#endif // __linux__ || __linux
// Apple and BSD targets: enable the kqueue code path.
#if defined(__APPLE__) || defined(__FreeBSD__) || defined(__NetBSD__) || defined(__OpenBSD__)
#define HAS_KQUEUE
#endif // __APPLE__ || __FreeBSD__ || __NetBSD__ || __OpenBSD__
namespace toolkit {
/**
 * File-descriptor event poller that is also a TaskExecutor.
 *
 * The object keeps a map from fd to callback, a list of tasks handed over from
 * other threads, a map of delayed (timer) tasks, an internal pipe and a pointer
 * to the thread that runs the event loop. The bookkeeping members differ per
 * backend: epoll/kqueue when HAS_EPOLL or HAS_KQUEUE is defined, select()
 * otherwise.
 *
 * The constructor is private; per the original note instances are meant to be
 * created only from EventPollerPool (TaskExecutorGetterImp is a friend).
 *
 * NOTE(review): several of the original doc comments were damaged; statements
 * marked "presumably" below are reconstructed from names and signatures and
 * should be confirmed against the implementation file.
 */
class EventPoller : public TaskExecutor, public AnyStorage, public std::enable_shared_from_this<EventPoller> {
public:
    friend class TaskExecutorGetterImp;

    using Ptr = std::shared_ptr<EventPoller>;
    /** Callback invoked for an fd; receives the triggered event bits. */
    using PollEventCB = std::function<void(int event)>;
    /** Completion callback; receives whether the operation succeeded. */
    using PollCompleteCB = std::function<void(bool success)>;
    /** Cancelable delayed task whose callable returns a uint64_t. */
    using DelayTask = TaskCancelableImp<uint64_t(void)>;

    /** Event bit flags; combine with bitwise OR. */
    enum Poll_Event {
        Event_Read = 1 << 0,  // read event
        Event_Write = 1 << 1, // write event
        Event_Error = 1 << 2, // error event
        Event_LT = 1 << 3,    // level-triggered
    };

    ~EventPoller();

    /**
     * Returns the first EventPoller instance held by the EventPollerPool singleton.
     * @return reference to that poller
     */
    static EventPoller &Instance();

    /**
     * Adds an event listener for a file descriptor.
     * @param fd file descriptor to watch
     * @param event event mask, e.g. Event_Read | Event_Write
     * @param cb callback invoked with the triggered event bits
     * @return status code; the original note lists -1 and 0
     *         (presumably -1 = failure, 0 = success; confirm in the implementation)
     */
    int addEvent(int fd, int event, PollEventCB cb);

    /**
     * Removes the event listener of a file descriptor.
     * @param fd file descriptor to stop watching
     * @param cb optional completion callback, defaults to nullptr
     * @return status code; the original note lists -1 and 0
     *         (presumably -1 = failure, 0 = success; confirm in the implementation)
     */
    int delEvent(int fd, PollCompleteCB cb = nullptr);

    /**
     * Changes the event mask of a watched file descriptor.
     * @param fd file descriptor being watched
     * @param event new event mask, e.g. Event_Read | Event_Write
     * @param cb optional completion callback, defaults to nullptr
     * @return status code; the original note lists -1 and 0
     *         (presumably -1 = failure, 0 = success; confirm in the implementation)
     */
    int modifyEvent(int fd, int event, PollCompleteCB cb = nullptr);

    /**
     * Submits a task to this poller (TaskExecutor override).
     * @param task task to run
     * @param may_sync per the original note, when true the task is executed
     *        synchronously; presumably this only applies when the caller is
     *        already on the poller thread (confirm)
     * @return task handle; NOTE(review): the original note only says "true",
     *         so the exact meaning of the returned pointer needs confirming
     */
    Task::Ptr async(TaskIn task, bool may_sync = true) override;

    /**
     * Variant of async() (TaskExecutor override). The name, together with the
     * `first` flag of async_l(), suggests the task is placed ahead of the tasks
     * already queued (confirm in the implementation).
     * @param task task to run
     * @param may_sync same meaning as in async()
     * @return task handle, see async()
     */
    Task::Ptr async_first(TaskIn task, bool may_sync = true) override;

    /**
     * Thread check; presumably reports whether the calling thread is this
     * poller's loop thread (confirm in the implementation).
     * @return result of the check
     */
    bool isCurrentThread();

    /**
     * Schedules a delayed task.
     * @param delay_ms delay, in milliseconds going by the parameter name
     * @param task callable to run; the original note singles out the return
     *        value 0 (presumably 0 stops repetition and any other value is the
     *        next delay; confirm in the implementation)
     * @return cancelable handle to the scheduled task
     */
    DelayTask::Ptr doDelayTask(uint64_t delay_ms, std::function<uint64_t()> task);

    /**
     * Returns the Poller instance associated with the current thread
     * (reconstructed from a damaged note; the result when no poller is
     * associated is not visible here).
     */
    static EventPoller::Ptr getCurrentPoller();

    /**
     * Returns the read buffer shared by the sockets of this thread.
     * @param is_udp presumably selects one of the two cached buffers kept in
     *        _shared_buffer (confirm in the implementation)
     */
    SocketRecvBuffer::Ptr getSharedBuffer(bool is_udp);

    /**
     * Returns the id of the poller thread.
     */
    std::thread::id getThreadId() const;

    /**
     * Returns the thread name (presumably the value stored in _name).
     */
    const std::string& getThreadName() const;

private:
    /**
     * Private constructor; per the original note this object is only
     * constructed from EventPollerPool.
     * @param name thread name
     */
    EventPoller(std::string name);

    /**
     * Runs the event loop.
     * @param blocked presumably whether the loop runs on (and blocks) the
     *        calling thread instead of a separate thread (confirm)
     * @param ref_self relates to recording this object in a thread-local
     *        variable (reconstructed from a damaged note; confirm)
     */
    void runLoop(bool blocked, bool ref_self);

    /**
     * Internal pipe event handler; presumably runs the tasks queued from other
     * threads (confirm in the implementation).
     * @param flush not documented in the original
     */
    void onPipeEvent(bool flush = false);

    /**
     * Presumably the shared implementation behind async() and async_first().
     * @param task task to run
     * @param may_sync same meaning as in async()
     * @param first presumably whether the task is queued at the front (confirm)
     * @return task handle; the original note mentions nullptr as a possible result
     */
    Task::Ptr async_l(TaskIn task, bool may_sync = true, bool first = false);

    /**
     * Shuts the poller down; presumably ends the loop thread (confirm in the
     * implementation).
     */
    void shutdown();

    /**
     * Processes the delayed-task map.
     * @param now current timestamp (unit not visible in this header)
     * @return NOTE(review): meaning not documented; presumably the wait time
     *         until the next pending delayed task (confirm)
     */
    uint64_t flushDelayTask(uint64_t now);

    /**
     * Returns the sleep time to use for select or epoll.
     */
    uint64_t getMinDelay();

    /**
     * Presumably registers the internal pipe with the poller so that other
     * threads can wake the loop (confirm in the implementation).
     */
    void addEventPipe();

    /** Exception type local to the poller; presumably thrown to leave the loop (confirm). */
    class ExitException : public std::exception {};

    // Marks whether the loop thread exits. Initialized in-class so the flag
    // never holds an indeterminate value before the loop assigns it.
    bool _exit_flag = false;
    // Thread name
    std::string _name;
    // Read buffers shared by all sockets under the current thread
    std::weak_ptr<SocketRecvBuffer> _shared_buffer[2];
    // Thread that executes the event loop
    std::thread *_loop_thread = nullptr;
    // Signals that the event-loop thread has started
    semaphore _sem_run_started;
    // Internal event pipe
    PipeWrap _pipe;
    // Tasks switched over from other threads, guarded by _mtx_task
    std::mutex _mtx_task;
    List<Task::Ptr> _list_task;
    // Keeps the logger usable
    Logger::Ptr _logger;
#if defined(HAS_EPOLL) || defined(HAS_KQUEUE)
    // epoll / kqueue related
    int _event_fd = -1;
    std::unordered_map<int, std::shared_ptr<PollEventCB> > _event_map;
#else
    // select related
    struct Poll_Record {
        using Ptr = std::shared_ptr<Poll_Record>;
        int fd;
        int event;
        int attach;
        PollEventCB call_back;
    };
    std::unordered_map<int, Poll_Record::Ptr> _event_map;
#endif // HAS_EPOLL || HAS_KQUEUE
    std::unordered_set<int> _event_cache_expired;
    // Timer related: delayed tasks keyed by a uint64_t (presumably the due time)
    std::multimap<uint64_t, DelayTask::Ptr> _delay_task_map;
};
/**
 * Pool of EventPoller instances, used through EventPollerPool::Instance().
 * The constructor is private, so the pool cannot be constructed directly by
 * outside code.
 *
 * NOTE(review): several of the original doc comments were damaged; statements
 * marked "presumably" are reconstructed from names and signatures and should
 * be confirmed against the implementation file.
 */
class EventPollerPool : public std::enable_shared_from_this<EventPollerPool>, public TaskExecutorGetterImp {
public:
    using Ptr = std::shared_ptr<EventPollerPool>;

    // Presumably the name of the notification emitted once the pool has started;
    // EventPollerPoolOnStartedArgs spells out the matching handler parameters (confirm).
    static const std::string kOnStarted;
#define EventPollerPoolOnStartedArgs EventPollerPool &pool, size_t &size

    ~EventPollerPool() = default;

    /**
     * Accessor for the pool (presumably a process-wide singleton, going by the
     * private constructor and the notes on setPoolSize()).
     * @return reference to the pool
     */
    static EventPollerPool &Instance();

    /**
     * Sets the number of EventPoller instances. Per the original note this is
     * only effective before the EventPollerPool singleton is created.
     * @param size number of EventPollers; per the original note, 0 corresponds
     *        to thread::hardware_concurrency()
     */
    static void setPoolSize(size_t size = 0);

    /**
     * Turns CPU affinity for the poller threads on or off (reconstructed from a
     * damaged note that mentions thread CPU affinity).
     * @param enable whether CPU affinity is enabled
     */
    static void enableCpuAffinity(bool enable);

    /**
     * Returns a poller; presumably the first one in the pool, going by the name
     * (confirm in the implementation).
     */
    EventPoller::Ptr getFirstPoller();

    /**
     * Returns a poller from the pool. The original note on the selection policy
     * is damaged; presumably the poller of the calling thread is preferred when
     * prefer_current_thread is true (confirm in the implementation).
     * @param prefer_current_thread whether the current thread is preferred
     */
    EventPoller::Ptr getPoller(bool prefer_current_thread = true);

    /**
     * Adjusts the thread preference used by getPoller(); the original note
     * mentions this in connection with creating Socket objects. Presumably
     * stores the flag in _prefer_current_thread (confirm in the implementation).
     * @param flag whether the current thread is preferred
     */
    void preferCurrentThread(bool flag = true);

private:
    EventPollerPool();

    // Current-thread preference; defaults to true
    bool _prefer_current_thread = true;
};
} // namespace toolkit
#endif /* EventPoller_h */