websocket服务器支持根据url选择不同的运行逻辑

This commit is contained in:
xiongziliang 2020-01-15 15:11:42 +08:00
parent 34e3e9f720
commit caa870c37e
2 changed files with 139 additions and 53 deletions

View File

@ -30,16 +30,80 @@
#include "HttpSession.h"
#include "Network/TcpServer.h"
/**
*
*/
class SendInterceptor{
public:
typedef function<int(const Buffer::Ptr &buf)> onBeforeSendCB;
SendInterceptor() = default;
virtual ~SendInterceptor() = default;
virtual void setOnBeforeSendCB(const onBeforeSendCB &cb) = 0;
};
/**
* TcpSession派生类发送数据的截取
* websocket协议的打包
*/
template <typename TcpSessionType>
class TcpSessionTypeImp : public TcpSessionType, public SendInterceptor{
public:
typedef std::shared_ptr<TcpSessionTypeImp> Ptr;
TcpSessionTypeImp(const Parser &header, const HttpSession &parent, const Socket::Ptr &pSock) :
_identifier(parent.getIdentifier()), TcpSessionType(pSock) {}
~TcpSessionTypeImp() {}
/**
*
* @param cb
*/
void setOnBeforeSendCB(const onBeforeSendCB &cb) override {
_beforeSendCB = cb;
}
protected:
/**
* send函数截取数据
* @param buf
* @return
*/
int send(const Buffer::Ptr &buf) override {
if (_beforeSendCB) {
return _beforeSendCB(buf);
}
return TcpSessionType::send(buf);
}
string getIdentifier() const override {
return _identifier;
}
private:
onBeforeSendCB _beforeSendCB;
string _identifier;
};
template <typename TcpSessionType>
class TcpSessionCreator {
public:
//返回的TcpSession必须派生于SendInterceptor可以返回null
TcpSession::Ptr operator()(const Parser &header, const HttpSession &parent, const Socket::Ptr &pSock){
return std::make_shared<TcpSessionTypeImp<TcpSessionType> >(header,parent,pSock);
}
};
/**
* WebSocket协议
* WebSock协议下的具体业务协议WebSocket协议的Rtmp协议等
* @tparam SessionType TcpSession类
*/
template <class SessionType,class HttpSessionType = HttpSession,WebSocketHeader::Type DataType = WebSocketHeader::TEXT>
class WebSocketSession : public HttpSessionType {
template<typename Creator, typename HttpSessionType = HttpSession, WebSocketHeader::Type DataType = WebSocketHeader::TEXT>
class WebSocketSessionBase : public HttpSessionType {
public:
WebSocketSession(const Socket::Ptr &pSock) : HttpSessionType(pSock){}
virtual ~WebSocketSession(){}
WebSocketSessionBase(const Socket::Ptr &pSock) : HttpSessionType(pSock){}
virtual ~WebSocketSessionBase(){}
//收到eof或其他导致脱离TcpServer事件的回调
void onError(const SockException &err) override{
@ -69,23 +133,27 @@ protected:
*/
bool onWebSocketConnect(const Parser &header) override{
//创建websocket session类
_session = std::make_shared<SessionImp>(HttpSessionType::getIdentifier(),HttpSessionType::_sock);
_session = _creator(header, *this,HttpSessionType::_sock);
if(!_session){
//此url不允许创建websocket连接
return false;
}
auto strongServer = _weakServer.lock();
if(strongServer){
_session->attachServer(*strongServer);
}
//此处截取数据并进行websocket协议打包
weak_ptr<WebSocketSession> weakSelf = dynamic_pointer_cast<WebSocketSession>(HttpSessionType::shared_from_this());
_session->setOnBeforeSendCB([weakSelf](const Buffer::Ptr &buf){
weak_ptr<WebSocketSessionBase> weakSelf = dynamic_pointer_cast<WebSocketSessionBase>(HttpSessionType::shared_from_this());
dynamic_pointer_cast<SendInterceptor>(_session)->setOnBeforeSendCB([weakSelf](const Buffer::Ptr &buf) {
auto strongSelf = weakSelf.lock();
if(strongSelf){
if (strongSelf) {
WebSocketHeader header;
header._fin = true;
header._reserved = 0;
header._opcode = DataType;
header._mask_flag = false;
strongSelf->WebSocketSplitter::encode(header,buf);
strongSelf->WebSocketSplitter::encode(header, buf);
}
return buf->size();
});
@ -155,50 +223,19 @@ protected:
void onWebSocketEncodeData(const Buffer::Ptr &buffer) override{
SocketHelper::send(buffer);
}
private:
typedef function<int(const Buffer::Ptr &buf)> onBeforeSendCB;
/**
* TcpSession派生类发送数据的截取
* websocket协议的打包
*/
class SessionImp : public SessionType{
public:
SessionImp(const string &identifier,const Socket::Ptr &pSock) :
_identifier(identifier),SessionType(pSock){}
~SessionImp(){}
/**
*
* @param cb
*/
void setOnBeforeSendCB(const onBeforeSendCB &cb){
_beforeSendCB = cb;
}
protected:
/**
* send函数截取数据
* @param buf
* @return
*/
int send(const Buffer::Ptr &buf) override {
if(_beforeSendCB){
return _beforeSendCB(buf);
}
return SessionType::send(buf);
}
string getIdentifier() const override{
return _identifier;
}
private:
onBeforeSendCB _beforeSendCB;
string _identifier;
};
private:
string _remian_data;
weak_ptr<TcpServer> _weakServer;
std::shared_ptr<SessionImp> _session;
TcpSession::Ptr _session;
Creator _creator;
};
template<typename TcpSessionType,typename HttpSessionType = HttpSession,WebSocketHeader::Type DataType = WebSocketHeader::TEXT>
class WebSocketSession : public WebSocketSessionBase<TcpSessionCreator<TcpSessionType>,HttpSessionType,DataType>{
public:
WebSocketSession(const Socket::Ptr &pSock) : WebSocketSessionBase<TcpSessionCreator<TcpSessionType>,HttpSessionType,DataType>(pSock){}
virtual ~WebSocketSession(){}
};
#endif //ZLMEDIAKIT_WEBSOCKETSESSION_H

View File

@ -51,6 +51,7 @@ public:
}
void onRecv(const Buffer::Ptr &buffer) override {
//回显数据
send("from EchoSession:");
send(buffer);
}
void onError(const SockException &err) override{
@ -62,6 +63,48 @@ public:
}
};
class EchoSessionWithUrl : public TcpSession {
public:
EchoSessionWithUrl(const Socket::Ptr &pSock) : TcpSession(pSock){
DebugL;
}
virtual ~EchoSessionWithUrl(){
DebugL;
}
void attachServer(const TcpServer &server) override{
DebugL << getIdentifier() << " " << TcpSession::getIdentifier();
}
void onRecv(const Buffer::Ptr &buffer) override {
//回显数据
send("from EchoSessionWithUrl:");
send(buffer);
}
void onError(const SockException &err) override{
WarnL << err.what();
}
//每隔一段时间触发,用来做超时管理
void onManager() override{
DebugL;
}
};
/**
* websocket 访url选择创建不同的对象
*/
struct EchoSessionCreator {
//返回的TcpSession必须派生于SendInterceptor可以返回null(拒绝连接)
TcpSession::Ptr operator()(const Parser &header, const HttpSession &parent, const Socket::Ptr &pSock) {
// return nullptr;
if (header.Url() == "/") {
return std::make_shared<TcpSessionTypeImp<EchoSession> >(header, parent, pSock);
}
return std::make_shared<TcpSessionTypeImp<EchoSessionWithUrl> >(header, parent, pSock);
}
};
int main(int argc, char *argv[]) {
//设置日志
Logger::Instance().add(std::make_shared<ConsoleChannel>());
@ -71,13 +114,19 @@ int main(int argc, char *argv[]) {
TcpServer::Ptr httpSrv(new TcpServer());
//http服务器,支持websocket
httpSrv->start<WebSocketSession<EchoSession,HttpSession>>(80);//默认80
httpSrv->start<WebSocketSessionBase<EchoSessionCreator,HttpSession> >(80);//默认80
TcpServer::Ptr httpsSrv(new TcpServer());
//https服务器,支持websocket
httpsSrv->start<WebSocketSession<EchoSession,HttpsSession>>(443);//默认443
httpsSrv->start<WebSocketSessionBase<EchoSessionCreator,HttpsSession> >(443);//默认443
TcpServer::Ptr httpSrvOld(new TcpServer());
//兼容之前的代码(但是不支持根据url选择生成TcpSession类型)
httpSrvOld->start<WebSocketSession<EchoSession,HttpSession> >(8080);
DebugL << "请打开网页:http://www.websocket-test.com/,进行测试";
DebugL << "连接 ws://127.0.0.1/xxxxws://127.0.0.1/ 测试的效果将不同支持根据url选择不同的处理逻辑";
DebugL << "请打开网页:http://www.websocket-test.com/,连接 ws://127.0.0.1/测试";
//设置退出信号处理函数
static semaphore sem;