0%

Trluper框架

1 框架需求分析

本框架主要为适用于IO密集类型的服务器开发而设计,同时作为框架,一是对于开发者而言应该是简单容易上手层次分明,二是框架本身而言容易扩展,能够依据开发人员的需求添加业务处理。

为了保证框架能够实现基本的要求,其总体需求如下:

  • 框架能够较好的应对并发场景。
  • 开发人员基于该框架能够轻松的实现自己的业务处理逻辑。

接下来我们会一步一步的构建起这个框架,告诉大家这个框架是怎么实现的,而不是一下子出来一个完整的框架,让大家不知道怎么入手,同时也在这个过程中提出一些问题,让大家对框架更加熟悉,面试中能够回答出来一些关键点。

2 实现服务器内核

对于C++来说,实现网络通信服务器,离不开socketepoll。关于epoll大家可以看我的对epoll的专题解析。这里简单简单介绍一些,服务器通过使用epoll,将对文件描述符的监听工作交给内核出处理,而不用自己另启一个线程取监听,一旦发送变换,就会返回文件描述符变化的数量和已经变化过的文件描述吗,后续我们只需遍历这个集合,判断是读事件还是写事件后进行处理

总结来说,epoll的引入使服务器能够及时的发现是否有数据需要处理,在一定程度上提高了并发度。

在这里我们通过socketepoll构建起一个简单CS架构:

  • 功能
    • 服务端能够监听来自客户端的连接请求,并且建立链接,在建立后向客户端会送一个connected!消息
    • 服务器能够对来自于已建立客户端的信息进行回显。
  • 目录结构介绍:首先我们先建立如下的结构目录,
    • include:里面是我们框架声明的头文件,后续所有的抽象类和继承类头文件
    • src:放置我们的框架的源文件
    • build:放置框架构建好的文件
    • test:放置测试代码
    • TEST:放置测试的debug文件
    • client存放客户端代码
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      .
      ├── CMakeLists.txt
      ├── client
      │   ├── CMakeLists.txt
      │   └── build
      ├── TEST
      ├── build
      ├── include
      ├── lib
      ├── src
      └── test

代码实现

首先我们在includesrc处创建对应的Server头文件和源文件,负责客户端的连接请求和信息回显功能。首先,服务器示例应该是一个单例为好,这样后续扩展需要用到Server也能通过添加静态接口来实现

a)创建Server.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
/include/Server.h

#ifndef _SERVER_H_
#define _SERVER_H_

#include <string>
#include <list>
#include <iostream>
//epoll和socket头文件
#include <sys/epoll.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <thread>
#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <string.h>
#include <unistd.h>
#include <sys/wait.h>
#include <iostream>
#include <future>
#include <unordered_map>

/*
!v0.1版本
*Server应该以单例的模式运行,因此应该只提供实例化一个服务器对象,因此提供静态方法
*/
class Server{
public:
/*初始化一个服务器单例对象,提供服务的开始,内部其实纠错创建绑定监听socket、并将socker上树*/
static Server* ServerInit(std::string&& ip,int&& port);
/*停止服务器,在进程退出前建议调用该函数,回收相关的资源*/
static void ServerStop();
/*开始运行服务器*/
static void ServerRun();
/*断开一个连接*/
static void ServerDelConn(struct epoll_event& ev);
/*设置server_exit的状态*/
static void ServerExit();

private:
//构造、拷贝构造、赋值私有化
Server(std::string ip,int port);
Server(const Server& s){}
const Server& operator=(const Server& s){return *this;}
~Server();
bool init();
void run();
void ctlAcceptFd();
void ctlCloseFd(struct epoll_event& ev);
//epoll句柄
int epollfd = -1;
//listenfd的文件描述符
int listenfd = -1;
//serveraddr存储服务器的ip地址、端口号
struct sockaddr_in serveraddr;
//判断server的状态,当为true时,说明需要关闭服务器,那么执行ServerStop操作
bool server_exit=false;
//服务器单例对象
static Server* singleServer;

};

#endif

b)Server.cc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
/src/Server.cpp

#include "Server.h"
Server* Server::singleServer = nullptr;

/// @brief调用构造函数Server()和Init(),创建服务器socket,并执行绑定、监听功能,调用init()进行epoll上树操作
/// @param ip :ip地址
/// @param port :端口号
Server *Server::ServerInit(std::string &&ip, int &&port)
{
if(nullptr==singleServer){
//完美转发
singleServer = new Server(std::forward<std::string>(ip),std::forward<int>(port));
}
return singleServer;
}

/// @brief 停止服务器前应该调用的函数,此时需要回收资源,内部调用end函数
void Server::ServerStop()
{
singleServer->~Server();
delete singleServer;
}



void Server::ServerRun()
{
if(nullptr!=singleServer){
singleServer->run();
}else{
std::cout<<"ServerRun failed, the value of singleServer is nullptr."<<std::endl;
}
}

void Server::ServerDelConn(epoll_event &ev)
{
if(nullptr!=singleServer) singleServer->ctlCloseFd(ev);
}

void Server::ServerExit()
{
if(nullptr!=singleServer) singleServer->server_exit=true;
}

/// @brief 创建服务器socket,并执行绑定、监听功能,调用init()进行epoll上树操作
/// @param ip :ip地址
/// @param port :端口号
Server::Server(std::string ip, int port)
{
const char* _ip=ip.c_str();
int sokcetfd=socket(AF_INET,SOCK_STREAM,0);
std::cout<<"Server start at IP:"<<ip<<" ,Port:"<<port<<std::endl;
if(0<=sokcetfd){
memset(&serveraddr, 0, sizeof(serveraddr));
serveraddr.sin_port=htons(port);
serveraddr.sin_family=AF_INET;
inet_pton(AF_INET,_ip,&serveraddr.sin_addr.s_addr);
//设置地址复用,确保服务器在time_wait也能重启
int opt=1;
setsockopt(sokcetfd,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));
if(0==bind(sokcetfd,(struct sockaddr *)&serveraddr,sizeof(serveraddr))){
if(0==listen(sokcetfd,10)){
listenfd=sokcetfd;
init();
}
else{
perror("Server start error: Listen");
}
}else{
perror("Server start error: Bind");
}

}else{
perror("Server start error: Get socket failed");
}
}

Server::~Server()
{
if(0<=epollfd){
close(epollfd);
}
close(listenfd);
std::cout<<"GoodBye!"<<std::endl;
}

/// @brief 执行epoll句柄创建和listenfd的上树操作
/// @return 成功返回true,失败返回false
bool Server::init()
{
epollfd=epoll_create(1);
bool bRet=false;
struct epoll_event listenev;
//listenfd使用默认的水平触发
listenev.events=EPOLLIN|EPOLLOUT;
listenev.data.fd=listenfd;
if(0==epoll_ctl(epollfd,EPOLL_CTL_ADD,listenfd,&listenev)){
bRet=true;
}else{
perror("Server start error: Epoll init");
}
return bRet;
}

/// @brief 启动服务器,即while(true)调用epoll_wait
void Server::run()
{
while(false==server_exit){
struct epoll_event hashChange[100];
//阻塞监听
int iEpollRet = 0;
//!这里有一个特点,每次accept后都会再一次触发epoll_wait,这是因为datafd采用边缘触发,刚建立连接,datafd可写就会出现这种二次触发现象
iEpollRet = epoll_wait(epollfd, hashChange, 100, -1);
//!V0.1版本
for(int i=0;i<iEpollRet;++i){
//若是listenfd有变化且为读事件
if(hashChange[i].data.fd==listenfd&&hashChange[i].events&EPOLLIN){
ctlAcceptFd();
}else{
//执行回显
int currentfd=hashChange[i].data.fd;
char buf[1024];
//LT+非阻塞读取,需要循环读
bool continueRead=true;
std::string s("");
while(continueRead){
int ret=recv(currentfd,buf,sizeof(buf),MSG_DONTWAIT);
//*ret<0说明,errno三个状态是正常的:
//!EAGAIN:套接字currentfd设置是非阻塞的,但recv是阻塞的或者接受超时是出现
//!EWOULDBLOCK:用于非阻塞模式,不需要重新读或者写
//!EINTR:指操作被中断唤醒,需要重新读/写
if(0>ret){
if(errno==EAGAIN||errno==EWOULDBLOCK) break;
else if(errno==EINTR) continue;
else{
perror("recv:");
continueRead=false;
}
}
//*客户端关闭了连接,此时服务器应该也下树并关闭
else if(0==ret){
ctlCloseFd(hashChange[i]);
break;
}
else s.append(buf);
}
//!bug:buf不能用new、malloc分配,这样可能使得接收到的数据为空
char _outBuf[s.size()] = {0};
s.copy(_outBuf,s.size());
send(currentfd,_outBuf,sizeof(_outBuf),MSG_WAITALL);
memset(buf,0,1024);
}
}
}
}



/// @brief 对epoll_wait监听到的建立连接请求,进行上树
/// @param dataFd 建立链接后的文件描述符
void Server::ctlAcceptFd()
{
struct sockaddr_in clientSocket;
int dataFd = -1;
socklen_t lAddrlen=sizeof(clientSocket);
dataFd = accept(listenfd,(struct sockaddr*)&clientSocket,&lAddrlen);
if(dataFd>=0){
char ip[16];
inet_ntop(AF_INET,&clientSocket.sin_addr.s_addr,ip,16);
std::cout<<"新的客户端连接,ip:"<<ip<<std::endl;
struct epoll_event client;
//边缘触发
client.events=EPOLLIN|EPOLLOUT|EPOLLET;
//设置datafd为非阻塞
//int flags = fcntl(dataFd,F_GETFL);
//flags|=O_NONBLOCK;
//fcntl(dataFd,F_SETFD,flags);
client.data.fd=dataFd;
//*client.data.ptr可以作为dataFd的绑定通道,后续扩展
if(-1==epoll_ctl(epollfd,EPOLL_CTL_ADD,dataFd,&client))
perror("epoll_ctl error: client");
}else{
perror("accept:");
}

}

/// @brief 监听到客户的关闭信息,执行epoll下树操作和被动关闭socket
/// @param dataFd:需要关闭的套接字
void Server::ctlCloseFd(struct epoll_event& ev)
{
epoll_ctl(epollfd,EPOLL_CTL_DEL,ev.data.fd,&ev);
close(ev.data.fd);
}

c) 错误总结

笔者在写上面的代码的时候碰到了2个有意思的bug。这了做了一些总结:

  • 第一处:我们设置了dataFd为边缘触发,那么第一次连接请求到来后,会触发一次epoll_wait来进行accept。之后debug的时候,它还会再一次触发epoll_wait,网上给出的解释是:
  • 因为datafd采用边缘触发且设置了EPOLLOUT,刚建立连接,datafd可写的可写缓冲区由不可写变为可写,就会出现这种二次触发现象。
  • 第二处:我们代码每次会在触发epoll_wait后创建接受缓存,但是由于编译器的优化,导致这一步骤可能被优化,导致数据染脏。因此最后加入了memset(buf,0,1024)。读者可以去掉这句看看会发送什么结果。

d)测试

为了测试我们服务能够正常运行,我们写了如下的客户端代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
/client/client.h

#include <sys/socket.h>
#include <sys/types.h>
#include <sys/bitypes.h>
#include <string.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <string.h>
#include <unistd.h>
#include <sys/wait.h>
#include <string>
#include <iostream>
#define messageLen 1024

class client{
public:
client(std::string _ip,int port):_serverIP(_ip),_port(port){
_clientFd=socket(AF_INET,SOCK_STREAM,0);
addr.sin_family = AF_INET;
addr.sin_port = htons(_port);
const char* ip=_serverIP.c_str();
inet_pton(AF_INET,ip,&addr.sin_addr.s_addr);
}
void run(){
int ero = connect(_clientFd,(struct sockaddr*)&addr,sizeof(addr));
if(0!=ero){
perror("connect:");
}
while(true){
std::string s("");
std::cin>>s;
int len=s.size();
//!bug:buf不能用new、malloc分配,这样可能使得介收到的数据为空
char buf[messageLen] = {0};
s.copy(buf,len);
s.clear();
//flags=0为阻塞
send(_clientFd,buf,len,0);
memset(buf,0,len);
recv(_clientFd,buf,messageLen,0);
s=buf;
std::cout<<s<<std::endl;
}
}

private:
struct sockaddr_in addr;
std::string _serverIP;
int _port;
int _clientFd;
};

运行下面的源文件main.cppclient.cpp

1
2
3
4
5
6
7
8
/test/main.cpp

#include "Server.h"

int main(){
Server::ServerInit("172.19.189.128",8080);
Server::ServerRun();
}
1
2
3
4
5
6
7
8
9
10
/client/client.cpp

#include "client.h"

int main(){
std::string serverip("172.19.189.128");
int port=8080;
client _client(serverip,port);
_client.run();
}
测试结果:
1
2
3
4
5
6
7
8
9
10
11
//Server
trluper@DESKTOP-67ADUGH:/home/project/Trluper/TEST$ ./Server
Server start at IP:172.19.189.128,Port:8080
新的客户端连接,ip:1172.19.189.128

//client
trluper@DESKTOP-67ADUGH:/home/project/Trluper/client/build$ ./client
niha
niha
dad
dad

3 实现连接层:为每一个连接都提供自己的datafd管理读写分发

很好,我们已经构建了能够正常通信的基础服务器。但是上面的服务器中我们将读写在写在了Server中,每个客户端都执行相同的逻辑,而且业务扩展极难,这不符合我们的程序设计规则。

因此我们希望每一个·dataFd都绑定有一个专门负责自己读写的类Connections,同时实现读写分离,便于后续的扩展。因此现在我们的框架架构应该是这样

因此,我们可以创建一个Connections类来维护每个客户端的通道,在Connections中执行消息的接收和发送。同时为了后续自定义消息类型的实现,我们还创建了Message.h头文件。那么有了

代码实现

A) AbstarctHandle.h

创建抽象处理类AbstarctHandle.h,后续的连接层、数据层和业务层都继承与该抽象类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/include/AbstractHandle.h

#ifndef _ABSTRACTHANDLE_H_
#define _ABSTRACTHANDLE_H_

#include<string>
#include "Message.h"


/*定义一个支持动态类型转换的宏,判断orig数据能否转为type类型的指针,若转换失败则执行返回NULL,成功则返回相应的指针*/
#define DYNAMIC_GETREF(type, ref, orig) type * ref = dynamic_cast<type *>(&orig); if (nullptr == ref) {return nullptr;}

/// @brief 所有带处理操作的基类,后续的Connections、
class AbstractHandle{
public:
AbstractHandle(){}
virtual ~AbstractHandle(){}
//所有继承AbstractHandle共同接口提供给外部调用的接口,内部会调用currentHandle和GetNxetHanlder
virtual void Handle(AbstractMsg& msg);
protected:
//后续连接层、数据处理层和业务应用层都需要实现地函数,本阶段真正的处理函数和下一个处理阶段
virtual AbstractMsg* currentHandle(AbstractMsg& msg) = 0;
virtual AbstractHandle* GetNextHanlder(AbstractMsg& msg) =0;
};

#endif

B) Connections.h

Connections.h规定了连接层的大部分函数,其中许多共同处理已经写定。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
/include/Connections.h

#ifndef _CONNECTIONS_H_
#define _CONNECTIONS_H_

#include "AbstractHandle.h"
#include <list>
#include <memory>
#include <stdio.h>
#include <sys/socket.h>
#include <string.h>
#include <iostream>
#include "Server.h"


class Server;
class UserMsg;
/*
* @brief:当一个连接到来时就会创建Connections对象,与相应的dataFd执行绑定,后续的监听到读写事件会转到Connections对象执行。一般开发者只需要重写类内的GetDataProcessObj虚函数即可和添加一个数据处理对象指针成员
* @memberof dataFd:与dataFd绑定
* @memberof NeedClose:判断这个连接是否被客户端关闭了
* @memberof writeBuffer:当服务器要发送数据时,会将数据存储在这里,这里充当服务器的写缓存
*/
class Connections : public AbstractHandle{
public:
Connections(int _dataFd);
virtual ~Connections();
//获得当前连接的文件描述符
int GetFd(){return dataFd;}
//由Server调用,进行写回,内部调用writeFd
void FlushOut();
//当writebBuffer空时,调用HashOutPut说明已经有数据要发送,此时要告诉Server的可写缓存区变化:有不可写变为可写,来触发epoll_wait
bool HashOutPut(){return false==writerBuffer.empty();}
//当readFd返回值为0时,就会调用该函数,NeedClose置true,执行关闭
void SetConnectionClose(){NeedClose=true;}
//返回NeedClose的状态
bool ConnectionNeedClose(){return NeedClose;}
//!得到数据层对象,开发者需重写这个函数,返回一个DataProcess类型对象指针
virtual DataProcess* GetDataProcessObj(Message& msg) = 0;
protected:
//当writerBuffer有数据时,会调用writeFd将数据写回给客户端
bool writeFd(std::string& _output);
//从dataFd中读取数据
bool ReadFd(std::string& _input);
//当前连接层的处理逻辑在此实现,对于IN返回的是Message对象,对于OUT,将消息写入writerBuffer,返回nullptr
virtual AbstractMsg* currentHandle(AbstractMsg& msg)override;
//位于连接层,因此只处理IN的下一个处理者,对于OUT没有下一个处理者
virtual AbstractHandle* GetNextHanlder(AbstractMsg& msg)override;
protected:
int dataFd; //绑定dataFD
bool NeedClose = false; //指示当前dataFd的开关状态,通知Server应该关闭这个连接
std::list<std::shared_ptr<std::string>> writerBuffer; //当需要主动发送信息给客户端时,这里作为写缓存存储数据
Message* _msg_; //维护一个堆式的读缓存,客户端断开连接后释放
};

#endif

C) Message.h

Message.h内部定义了框架会使用到的一些基本消息,也为后续用户消息的自定义打下基础

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
/include/Message.h


#ifndef _MESSAGE_H_
#define _MESSAGE_H_

#include <string>
#include <memory>

//!IO方向,IN指示方向为连接层-->数据层-->业务层;OUT为业务层-->数据层-->连接层
enum IO_Direction{
IN,OUT
};
class Request;

//!所有消息类的父类
class AbstractMsg{
public:
AbstractMsg();
virtual ~AbstractMsg();
};

//!IO状态类,指示该消息是IN<--->read或OUT<--->write状态,不需要继承
class IOState:public AbstractMsg{
public:
IO_Direction IO_DIC;
IOState(IO_Direction dic);
virtual ~IOState();
};
//!在连接层会构造的传输的消息封装类,不需要继承重写
class Message:public IOState{
public:
Message(IO_Direction dic);
virtual ~Message(){}
std::string message;
};

//!用于协议层解析请求后,递交给应用层的消息封装类,不需要继承重写
class UserMsg:public IOState
{
public:
UserMsg(IO_Direction dic);
virtual ~UserMsg();
Request* getRequest();
void SetRequest(Request* _request);
private:
Request* request = nullptr;
};

//!用户请求类,由用户来添加相关的业务字段,比如业务的请求类型定义(开发者必须重写实现的类)
class Request{
public:
Request(){}
virtual ~Request(){}
};

inline Request* UserMsg::getRequest()
{
return request;
}

inline void UserMsg::SetRequest(Request *_request)
{
request = _request;
}

#endif

D) AbstractHandle.cc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/src/AbstractHandle.cc

#include "AbstractHandle.h"

void AbstractHandle::Handle(AbstractMsg &msg)
{
AbstractMsg* _msg = nullptr;
AbstractHandle* nextHanlder = nullptr;
_msg = currentHandle(msg);
if(nullptr != _msg){
nextHanlder = GetNextHanlder(*_msg);
if(nullptr != nextHanlder){
nextHanlder->Handle(*_msg);
}
}
}

E)Connection.cc

完成上述的Connections.h的声明后,我们可以实现它的内部函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
/src/Connections.cc

#include "Connections.h"

//6/12
Connections::Connections(int _dataFd):dataFd(_dataFd),_msg_(new Message(IO_Direction::IN))
{

}

Connections::~Connections()
{
delete _msg_;
}




inline void Connections::FlushOut()
{
//!bug:Segmentation fault
while(writerBuffer.size()>0){
if(true == writeFd(*writerBuffer.front()))
writerBuffer.pop_front();
else break;
}
}

inline bool Connections::writeFd(std::string &_output)
{
bool ret = false;
char *_outbuf = (char*)calloc(1UL,_output.size());
_output.copy(_outbuf,_output.size());
if(dataFd>=0 && (_output.size() == send(dataFd,_outbuf,_output.size(),0))){
std::cout<<"send to client:"<<dataFd<<", packetSize: "<<_output.size()<<std::endl;
ret = true;
}
free(_outbuf);
return ret;
}

inline bool Connections::ReadFd(std::string& _input)
{
bool ret = false;
char buf[1024] = {0};
while(true){
ssize_t rcvLen = recv(dataFd,buf,sizeof(buf),MSG_DONTWAIT);
if(0 < rcvLen){
_input.append(buf,rcvLen);
ret = true;
}
//客户端关闭,将NeedClose置为true,服务器执行被动关闭前,需要将writerBffer的数据发送出去
else if(0 == rcvLen){
FlushOut();
SetConnectionClose();
break;
}
else{
//!EAGAIN:套接字currentfd设置是非阻塞的,但recv是阻塞的或者接受超时是出现
//!EWOULDBLOCK:用于非阻塞模式,不需要重新读或者写
//!EINTR:指操作被中断唤醒,需要重新读/写
if(errno == EAGAIN || errno == EWOULDBLOCK) break;
else if(errno == EINTR) continue;
else{
perror("Recv:");
break;
}
}
}
return ret;
}

inline AbstractMsg *Connections::currentHandle(AbstractMsg &msg)
{
AbstractMsg* _msg = nullptr;
DYNAMIC_GETREF(IOState,state,msg);
if(state->IO_DIC==IO_Direction::IN){
if(true == ReadFd(_msg_->message)){
_msg = _msg_;
}
}
/*若是业务层有数据要发送,会调用Handle函数,数据是Message形式的额,同时不会马上发送,而是存储在writeBuffer里面;同时我们还需要让服务器能够察觉到有数据要发送触发epoll_wait的写事件
因此服务器必须提供一个静态函数来设置相应dataFd的event,是不可写状态变为可写*/
else if(state->IO_DIC == IO_Direction::OUT){
DYNAMIC_GETREF(Message,_outMsg,msg);
if(false == HashOutPut()){
Server::ServerSetConectionOut(*this);
}
std::shared_ptr<std::string> str = std::make_shared<std::string>(_outMsg->message);
writerBuffer.push_back(str);
//对来自数据层的Message释放
delete _outMsg;
_outMsg = nullptr;
}
return _msg;
}

inline AbstractHandle *Connections::GetNextHanlder(AbstractMsg &msg)
{
AbstractHandle* nextHanlder = nullptr;
DYNAMIC_GETREF(Message,_msg,msg);
if(_msg->IO_DIC==IO_Direction::IN){
nextHanlder = GetDataProcessObj(*_msg);
}
return nextHanlder;
}

很好,我们已经大体完成了连接层,上面我们能够成功的完成连接请求的响应和接收数据,但是还没有完成数据交由数据层和业务层处理后发挥响应给客户端端,后续我们会依次实现,现在我们先只要做好连接层的事。

Server代码变动

添加了ConnectionsMessage后,Server.h应该增加一个成员变量std::unordered_map<int,Connections*> m_ConnctionsMap;作为Server的连接统计管理。同时增加一个静态函数ServerDelConn能够给提供关闭连接的接口,代码如下

a) Server.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
/include/Server.h


#ifndef _SERVER_H_
#define _SERVER_H_

#include <string>
#include <list>
#include <iostream>
//epoll和socket头文件
#include <sys/epoll.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <thread>
#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <string.h>
#include <unistd.h>
#include <sys/wait.h>
#include <iostream>
#include <future>
#include <unordered_map>
#include "Connections.h"

/*
* @Author: wuwenjie wenjiewu2017@163.com
* @Date: 2023-09-14 20:11:48
* @LastEditors: wuwenjie wenjiewu2017@163.com
* @LastEditTime: 2023-09-14 22:07:07
*/

!v0.2版本
*Server应该以单例的模式运行,因此应该只提供实例化一个服务器对象,因此提供静态方法
*/
class Connections;

class Server{
public:
/*初始化一个服务器单例对象,提供服务的开始,内部其实纠错创建绑定监听socket、并将socker上树*/
static Server* ServerInit(std::string&& ip,int&& port);
/*停止服务器,在进程退出前建议调用该函数,回收相关的资源*/
static void ServerStop();
/*开始运行服务器*/
static void ServerRun();
/*断开一个连接*/
static void ServerDelConn(struct epoll_event& ev);
/*设置server_exit的状态*/
static void ServerExit();
/*
设置dataFd由不可写状态变为可写状态,该静态函数是在连接层writeBuffer由没数据变为有数据时触发
*static void ServerSetConectionOut(Connections& conn);
在写完后,重新设置dataFd由可写变为不可写状态
*static void ServerUnSetConnectionOut(Connections& conn);
*/

private:
//构造、拷贝构造、赋值私有化
Server(std::string ip,int port);
Server(const Server& s){}
const Server& operator=(const Server& s){return *this;}
~Server();
bool init();
void run();
//listenfd调用该函数,执行accept、创建Connctions与dataFd绑定、dataFd上树
void ctlAcceptFd();
//当客户端关闭时会调用此函数
void ctlCloseFd(struct epoll_event& ev);
//epoll句柄
int epollfd = -1;
//listenfd的文件描述符
int listenfd = -1;
//serveraddr存储服务器的ip地址、端口号
struct sockaddr_in serveraddr;
//判断server的状态,当为true时,说明需要关闭服务器,那么执行ServerStop操作
bool server_exit=false;
//管理每个dataFd绑定的Connections对象
std::unordered_map<int,Connections*> m_ConnctionsMap;
//服务器单例对象
static Server* singleServer;

};

#endif

c) Server.cc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
src/Server.cpp

//Trluper框架
#include "Server.h"

Server* Server::singleServer = nullptr;

/// @brief调用构造函数Server()和Init(),创建服务器socket,并执行绑定、监听功能,调用init()进行epoll上树操作
/// @param ip :ip地址
/// @param port :端口号
Server *Server::ServerInit(std::string &&ip, int &&port,AbstractFactory* _singleFactory)
{
if(nullptr==singleServer){
//完美转发
singleServer = new Server(std::forward<std::string>(ip),std::forward<int>(port), _singleFactory);//9/12
}
return singleServer;
}

void Server::ServerExceptionStop()
{
struct sigaction SigEvent;
SigEvent.sa_sigaction = ServerStop;
SigEvent.sa_flags = SA_NOCLDWAIT|SA_NOCLDSTOP;
sigaction(SIGINT,&SigEvent,NULL);
sigaction(SIGQUIT,&SigEvent,NULL);
sigaction(SIGSTOP,&SigEvent,NULL);
}

/// @brief 停止服务器前应该调用的函数,此时需要回收资源,内部调用end函数
void Server::ServerStop(int signal,siginfo_t* info,void* context)
{
singleServer->~Server();
delete singleServer;

}



void Server::ServerRun()
{
if(nullptr!=singleServer){
singleServer->run();
}else{
std::cout<<"ServerRun failed, the value of singleServer is nullptr."<<std::endl;
}
}

void Server::ServerDelConn(epoll_event &ev)
{
if(nullptr!=singleServer) singleServer->ctlCloseFd(ev);
}

void Server::ServerExit()
{
if(nullptr!=singleServer) singleServer->server_exit=true;
}

#ifdef _OLD_CODE_

void Server::ServerSetConectionOut(Connections &conn)
{
int dataFd = conn.GetFd();
struct epoll_event ev;
ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
ev.data.ptr = &conn;
//!若不注释,这里会出现神奇的bug,dataFd的赋值会导致ptr地址值产生变化
//ev.data.fd = dataFd;
epoll_ctl(singleServer->epollfd,EPOLL_CTL_MOD,dataFd,&ev);

}

void Server::ServerUnSetConnectionOut(Connections &conn)
{
struct epoll_event ev;
ev.events = EPOLLIN | EPOLLET;
ev.data.ptr = &conn;
//ev.data.fd = conn.GetFd();
epoll_ctl(singleServer->epollfd,EPOLL_CTL_MOD,conn.GetFd(),&ev);
}
#endif

void Server::ServerUseHandleOfDataProcess(DataProcess &process, Request* request)
{
UserMsg* _UMsg = new UserMsg(IO_Direction::OUT);
_UMsg->SetRequest(request);
process.Handle(*_UMsg);
}


/// @brief 创建服务器socket,并执行绑定、监听功能,调用init()进行epoll上树操作
/// @param ip :ip地址
/// @param port :端口号
Server::Server(std::string ip, int port,AbstractFactory* _singleFactory):singleFactory(_singleFactory)
{
const char* _ip=ip.c_str();
int sokcetfd=socket(AF_INET,SOCK_STREAM,0);
std::cout<<"Server start at IP:"<<ip<<" ,Port:"<<port<<std::endl;
if(0<=sokcetfd){
memset(&serveraddr, 0, sizeof(serveraddr));
serveraddr.sin_port=htons(port);
serveraddr.sin_family=AF_INET;
inet_pton(AF_INET,_ip,&serveraddr.sin_addr.s_addr);
//设置地址复用,确保服务器在time_wait也能重启
int opt=1;
setsockopt(sokcetfd,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));
if(0==bind(sokcetfd,(struct sockaddr *)&serveraddr,sizeof(serveraddr))){
if(0==listen(sokcetfd,10)){
listenfd=sokcetfd;
init();
}
else{
perror("Server start error: Listen");
}
}else{
perror("Server start error: Bind");
}

}else{
perror("Server start error: Get socket failed");
}
}

Server::~Server()
{
//释放工厂对象
delete singleFactory;
//释放连接
auto it=m_ConnctionsMap.begin();
while(it!=m_ConnctionsMap.end()){
close(it->first);
delete it->second;
m_ConnctionsMap.erase(it->first);
it=m_ConnctionsMap.begin();
}
if(0<=epollfd){
close(epollfd);
}
close(listenfd);
std::cout<<"GoodBye!"<<std::endl;
}

/// @brief 执行epoll句柄创建和listenfd的上树操作
/// @return 成功返回true,失败返回false
bool Server::init()
{
epollfd=epoll_create(1);
bool bRet=false;
struct epoll_event listenev;
//listenfd使用默认的水平触发
listenev.events=EPOLLIN|EPOLLOUT;
listenev.data.fd=listenfd;
if(0==epoll_ctl(epollfd,EPOLL_CTL_ADD,listenfd,&listenev)){
bRet=true;
}else{
perror("Server start error: Epoll init");
}
return bRet;
}

/// @brief 启动服务器
void Server::run()
{
while(false==server_exit){
struct epoll_event hashChange[100];
//阻塞监听
int iEpollRet = 0;
//!这里有一个bug,每次accept后都会再一次触发epoll_wait,这是因为datafd采用边缘触发,刚建立连接,datafd由不可写变为可写就会出现这种二次触发现象(已修正)
iEpollRet = epoll_wait(epollfd, hashChange, 100, -1);
//!V0.3版本
for(int i=0;i<iEpollRet;++i){
//*listenfd读事件
if(hashChange[i].data.fd==listenfd&&hashChange[i].events&EPOLLIN){
ctlAcceptFd();
}
//*dataFd读事件,这里读写事件混合再一起会出现问题,后续需要改变
else if(hashChange[i].events&EPOLLIN){
IOState state(IO_Direction::IN);
Connections* conn =(Connections*)hashChange[i].data.ptr;
conn->Handle(state);
//判断客户端的连接状态
if(conn->ConnectionNeedClose()==true){
Server::ServerDelConn(hashChange[i]);
}
}
#ifdef _OLD_CODE_
//*dataFd写事件,服务器主动发送时就会触发
else{
IOState state(IO_Direction::OUT);
Connections* conn =(Connections*)hashChange[i].data.ptr;
conn->FlushOut();
if(false == conn->HashOutPut()) ServerUnSetConnectionOut(*conn);

}
#endif
}
}
}



/// @brief 对epoll_wait监听到的建立连接请求,进行上树
/// @param dataFd 建立链接后的文件描述符
void Server::ctlAcceptFd()
{
struct sockaddr_in clientSocket;
int dataFd = -1;
socklen_t lAddrlen=sizeof(clientSocket);
dataFd = accept(listenfd,(struct sockaddr*)&clientSocket,&lAddrlen);
if(dataFd>=0){
char ip[16];
inet_ntop(AF_INET,&clientSocket.sin_addr.s_addr,ip,16);
std::cout<<"新的客户端连接,ip:"<<ip<<std::endl;
std::cout<<"<------------------------------------------------------------->"<<std::endl;
//创建相应的连接层、数据层和业务层对象
Connections* conn = singleFactory->CreateAllObjWhenAccept(dataFd);
if(nullptr!=conn){
m_ConnctionsMap[dataFd] = conn; //3/12
struct epoll_event client;
//边缘触发+EPOLLIN,EPOLLOUT后续要send的时候添加,使epoll_wait能够触发写操作
client.events=EPOLLIN|EPOLLET;
//*client.data.ptr可以作为dataFd的绑定通道
client.data.ptr = conn;
if(-1==epoll_ctl(epollfd,EPOLL_CTL_ADD,dataFd,&client))
perror("epoll_ctl error: client");
}
//创建conn失败,向客户端发送“Unknow error!”,直接close
else{
char buf[]="Unknow error!";
send(dataFd,buf,sizeof(buf),MSG_DONTWAIT);
close(dataFd);
}

}else{
perror("accept:");
}

}

/// @brief 监听到客户的关闭信息,执行epoll下树操作和被动关闭socket
/// @param dataFd:需要关闭的套接字
void Server::ctlCloseFd(struct epoll_event& ev)
{
Connections* conn = (Connections*)ev.data.ptr;
m_ConnctionsMap.erase(ev.data.fd);
epoll_ctl(epollfd,EPOLL_CTL_DEL,ev.data.fd,&ev);
close(ev.data.fd);
//释放堆区
delete conn;
}

我们对服务器的内核做了最后的变动,后面就很少去改动原因的代码了,因为连接层作为三层结构中与与服务器内核直接联系的一层,目前已经确定,因此不需要去变动原因的代码。后续有改动只需要添加一下新函数和代码就可以了。从这里可以看出,这样的三层结构能够很好地解耦合我们地框架

测试

由于我们还没有写出数据层、业务层,因此指令直接修改readFd函数测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/src/Connections.cpp/ReadFd

bool Connections::ReadFd(std::string& _input)
{
bool ret = false;
char buf[1024] = {0};
while(true){
ssize_t rcvLen = recv(dataFd,buf,sizeof(buf),MSG_DONTWAIT);
if(0 < rcvLen){
_input.append(buf,rcvLen);
ret = true;
}
//客户端关闭,将NeedClose置为true
else if(0 == rcvLen){
SetConnectionClose();
break;
}
else{
//!EAGAIN:套接字currentfd设置是非阻塞的,但recv是阻塞的或者接受超时是出现
//!EWOULDBLOCK:用于非阻塞模式,不需要重新读或者写
//!EINTR:指操作被中断唤醒,需要重新读/写
if(errno == EAGAIN || errno == EWOULDBLOCK) break;
else if(errno == EINTR) continue;
else{
perror("Recv:");
break;
}
}
}
//测试代码
#ifndef _TEST_
#define _TEST_
std::cout<<this<<std::endl;
Server::ServerSetConectionOut(*this);
char * senBuf =(char*)calloc(1UL,_input.size());
_input.copy(senBuf,_input.size());
send(dataFd,senBuf,_input.size(),0);
#endif
return ret;
}

  • 然后进入:/home/project/Trluper/client/build$进行make

    1
    2
    3
    trluper@DESKTOP-67ADUGH:/home/project/Trluper/client/build$ make
    Consolidate compiler generated dependencies of target client
    [100%] Built target client

  • 进入/home/project/Trluper/TEST$进行make

    1
    2
    3
    4
    5
    trluper@DESKTOP-67ADUGH:/home/project/Trluper/TEST$ make
    Consolidate compiler generated dependencies of target Server
    [ 20%] Building CXX object CMakeFiles/Server.dir/src/Server.cpp.o
    [ 40%] Linking CXX executable Server
    [100%] Built target Server

  • 执行可执行文件进行测试

bug总结

  • 上面过程遇到一个很神奇的bug,在:ServerSetConectionOutServerUnSetConnectionOut中,因为ev.data.fd = dataFd;这条语句的存在,会使得程序在FlushOut()函数的while(writerBuffer.size()>0)报出段错误Segmentation fault
    • 原因:在ev.data.fd = dataFd;执行后,会使得ev.data.ptr的存储的conn对象地址发生改变,导致报错。(我也不知道为啥会影响到ev.data.ptr,有知道的可以告诉我一下)

4 逻辑解放:添加工厂类

上面我们引入了Connections类实现了每一个连接都会创建自己的Connections对象来管理读写,但是作为框架,Connections一般作为开发者需要继承的实现类,在Connections的基础上增加开发者的东西:

  • 问题1:因此我们不能在ServerctlAcceptFd()中,将实现逻辑写成直接实例化一个原Connections对象,而是应该实例化开发者继承实现的连接对象
    1
    2
    3
    4
    /src/Server.cpp/CtlAcceptFd()

    //创建相应的Connections对象,需要修改
    Connections* conn = new Connections(dataFd);
  • 问题2:同样的,Connections类中应该包含至少一个纯虚函数,告诉编译器Connections不可以实例化,需要开发者继承实现这些纯虚函数,声明自己的连接类。(后续对于数据层、业务层也同样如此)

抽象工厂模式

为了解决问题1,我们声明一个抽象工厂,内部提供一个纯虚函数接口CreateAllObjWhenAccept();开发者在后续只需要继承实现这个CreateAllObjWhenAccept(),在连接到了时创建这个dataFd的所有对象,并返回最底层的Connections对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/include/Server.h


//抽象工厂,开发者要继承实现这个抽象工厂
class AbstractFactory{
protected:
AbstractFactory(){}
AbstractFactory(const AbstractFactory& obj){}
AbstractFactory& operator=(const AbstractFactory& obj){}
public:
virtual ~AbstractFactory(){};
//开发者需要重写的函数,一个是覆盖静态函数GetSingle,一个是重写虚函数CreateAllObjWhenAccept
static AbstractFactory* GetSingle(){}
virtual Connections* CreateAllObjWhenAccept(int _dataFd) = 0;
};

修改Server

那么引入抽象工厂AbstractFactory以后,就需要对Server进行修改,之前我你们之前使用Connections实例化对象,现在我们使用AbsractFactory来实例化对象,因此做如下修改:

  • 修改1:在Server.h中添加成员singleFactory;

    1
    2
    3
    /include/Server.h
    //工厂的单例对象,需要开发者初始化
    AbstractFactory* singleFactory;

  • 修改2:修改构造函数和ServerInit静态函数如下

    1
    2
    3
    4
      /*初始化一个服务器单例对象,提供服务的开始,内部其实纠错创建绑定监听socket、并将socker上树*/
    static Server* ServerInit(std::string&& ip,int&& port, AbstractFactory* _singleFactory);
    //构造、拷贝构造、赋值私有化
    Server(std::string ip,int port,AbstractFactory* _singleFactory);

  • 修改3:将CtlAcceptFd函数的new Connectons(dataFd)修改为:

    1
    2
    //创建相应的连接层、数据层和业务层对象
    Connections* conn = singleFactory->CreateAllObjWhenAccept(dataFd);

测试

为了测试引入的抽象工厂是否能够正常工作,我们需要继承AbstractFactory实现里面的函数,同时也要继承实现Connections。我们在test目录下创建两个头文件Factor.hMyConnections.h:

a) Factory.h文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/test/Factory.h

#include "Server.h"
#include "MyConnections.h"

class Factory:public AbstractFactory
{
private:
Factory();
~Factory();
Factory(const Factory& obj){}
Factory& operator=(const Factory& obj){}
public:
static AbstractFactory* GetSingle();
virtual Connections* CreateAllObjWhenAccept(int _dataFd) override;
private:
static AbstractFactory* FactorySingle;
};

AbstractFactory* Factory::FactorySingle = new Factory();
Factory::Factory()
{
}

Factory::~Factory()
{
}

AbstractFactory *Factory::GetSingle()
{
return FactorySingle;
}

Connections *Factory::CreateAllObjWhenAccept(int _dataFd)
{
Connections* conn = new MyConnections(_dataFd);
return conn;
}

b)MyConnections.h文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#include "Connections.h"
class MyConnections:public Connections
{
private:
/* data */
public:
MyConnections(int _dataFd);
~MyConnections();
};

MyConnections::MyConnections(int _dataFd):Connections(_dataFd)
{
}

MyConnections::~MyConnections()
{
}

c) main.cpp文件

1
2
3
4
5
6
7
8
9
10
11
#include "Server.h"
#include "Factory.h"




int main(){
AbstractFactory* factorySingle = Factory::GetSingle();
Server::ServerInit("172.19.189.128",8080,factorySingle);
Server::ServerRun();
}

现在我们的框架的目录结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
.
├── CMakeLists.txt
├── TEST
├── build
├── client
│   ├── CMakeLists.txt
│   ├── build
│   ├── client.cpp
│   └── client.h
├── include
│   ├── AbstractHandle.h
│   ├── Connections.h
│   ├── DataProcess.h
│   ├── Message.h
│   └── Server.h
├── lib
├── src
│   ├── AbstractHandle.cpp
│   ├── Connections.cpp
│   ├── DataProcess.cpp
│   ├── Message.cpp
│   └── Server.cpp
└── test
├── Factory.h
├── MyConnections.h
└── main.cpp

我们分别进入TESTclient\build目录执行下述命令生成可执行文件Serverclient

1
2
cmake ..
make

执行可执行文件后,运行结果:

1
2
3
4
trluper@DESKTOP-67ADUGH:/home/project/Trluper/TEST$ ./Server
Server start at IP:172.19.189.128 ,Port:8080
新的客户端连接,ip:172.19.189.128
<------------------------------------------------------------->
1
2
3
trluper@DESKTOP-67ADUGH:/home/project/Trluper/client/build$ ./client
你好,我是客户端1
你好,我是客户端1

5 实现数据层

截至目前为止,我们已经实现了服务器对于客户端数据的接收过程,能够通过Server、Connections、Message对请求做接收和简单处理。但实际的服务器过程中,我们对来自于客户端的请求,应该会经过接收-->数据处理-->业务的过程,处理完后再由业务-->数据处理-->发送过程。因此本服务器框架对一个请求的响应应如下如所示:

因此,我们现在需要实现数据层DataProcess处理来自连接层的消息和来自业务层的用户数据:

  • 对于连接层的消息:目前服务器仅支持TCP,需要在数据层做粘包处理后,将一个完整请求打包成用户数据给业务层(反序列化)。
  • 对于业务层的数据:主要做序列化处理

因此数据处理层的工作就是对于来自连接层的数据进行粘包处理,然后转化为业务层能够识别的消息递交给业务应用层;能够对业务层下来的消息进行封装成请求给连接层发送;同时数据处理层能够很方便的支持protobuf的序列化和反序列化操作

代码实现

我们在includesrc创建相应的DataProcess头文件和源文件实现数据处理层。

a) DataProcess.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/include/DataProcess.h

#include <unordered_map>
#include "AbstractHandle.h"
#include "Connections.h"
#include "FApplycations.h"


class Connections;
class FApplycations;
class UserMsg;


class DataProcess:public AbstractHandle
{
public:
DataProcess();
virtual ~DataProcess();
//!开发需要重写该函数获得绑定的conn对象,返回一个Connections对象指针,
virtual Connections* GetConnectionsObj(AbstractMsg& msg) = 0;
//!该函数获得绑定的app对象,内部调用RouterOfApp函数获得FApplycations对象指针(添加路由模块后弃用)
virtual FApplycations* GetApplycationsObj(AbstractMsg& msg) = 0;
//*将连接层递交上来的message处理为Request类型,这里就要开发者自己依据自定义额数据格式来重写该函数返回一个Request对象指针
virtual Request* MsgToRequest(Message& msg) = 0;
//*将应用层递交下来的Request类型的数据转为Message类型,这里就要开发者自己依据自定义额数据格式来重写该函数
virtual Message* RequestToMsg(Request& request) = 0;
protected:
virtual AbstractMsg* currentHandle(AbstractMsg& msg)override;
virtual AbstractHandle* GetNextHanlder(AbstractMsg& msg)override;
};

#endif

上述头文件中,主要由四个主要待实现函数:

  • GetApplycationsObj:由开发者实现,其返回一个业务层对象指针。这个表明连接层的消息给数据层处理后得到的请求,应该交给这个业务对象处理。

  • GetConnectionsObj:由开发者实现,其返回一个连接层对象指针。这个表明业务层的请求响应给数据层处理后得到的信息,应该交给这个连接对象发送给客户端。

  • MsgToRequest:将连接层递交上来的message处理为Request类型,这里就要开发者自己依据自定义额数据格式来重写该函数返回一个Request对象指针

  • RequestToMsg:将应用层递交下来的Request类型的数据转为Message类型,这里就要开发者自己依据自定义额数据格式来重写该函数

b) DataProcess.cc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
/src/DataProcess.cc

#include "DataProcess.h"

DataProcess::DataProcess()
{
}

DataProcess::~DataProcess()
{

}


//!msg的真实类型是Message或者UserMsg
inline AbstractMsg *DataProcess::currentHandle(AbstractMsg &msg)
{
AbstractMsg* _UMmsg = nullptr;
DYNAMIC_GETREF(IOState,state,msg);
if(IO_Direction::IN==state->IO_DIC){//IN
DYNAMIC_GETREF(Message,_msg,msg);
Request *request = MsgToRequest(*_msg);
if(nullptr != request){
//需要在业务层释放
UserMsg* _msg_ = new UserMsg(IO_Direction::IN);
_msg_->SetRequest(request);
_UMmsg = _msg_;
}
}
else if(IO_Direction::OUT==state->IO_DIC){
DYNAMIC_GETREF(UserMsg,_userMsg,msg);
Message* response = RequestToMsg(*(_userMsg->getRequest()));
if(nullptr != response){
//需要在连接层释放
_UMmsg = response;
}
//释放业务层下来的msg
delete _userMsg;
_userMsg = nullptr;
}
return _UMmsg;
}

inline AbstractHandle *DataProcess::GetNextHanlder(AbstractMsg &msg)
{
AbstractHandle* nextHanlder = nullptr;
DYNAMIC_GETREF(IOState,state,msg);
if(IO_Direction::IN == state->IO_DIC){
//msg为UserMsg
nextHanlder = GetApplycationsObj(msg);
}
else{
//msg为Message
nextHanlder = GetConnectionsObj(msg);
}
return nextHanlder;
}

Connections的代码变动

因为加入了DataProcess,那么Connections必须知道接收到的数据给谁处理,因此必须增加一个待实现的纯虚函数,返回一个DataProcess对象指针。

1
2
3
4
/include/Connections.h

//!得到数据层对象,开发者需重写这个函数,返回一个DataProcess类型对象指针
virtual DataProcess* GetDataProcessObj(Message& msg) = 0;

实现业务层

上面我们已经完成了服务器的内核、连接层和数据处理层,已经能够成果地与客户端建立连接,同时接收客户端地消息并转化为请求指令,那么现在缺少的是对请求指令的业务处理。因此我们现在来实现我们Trluper框架的业务层

代码实现

a)FApplycations.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/include/FApplycations.h

#include "AbstractHandle.h"
#include <list>

class UserMsg;

class FApplycations:public AbstractHandle{
public:
FApplycations();
virtual ~FApplycations();
//该业务对象处理请求的函数,內部实现业务功能,若还需要进一步处理,则需要返回相关的请求信息(堆对象)
virtual Request* ProcRequest(Request& _request) = 0;
//下一个业务处理者处理
virtual FApplycations* GetNextApplycationsObj(AbstractMsg& msg) = 0;

protected:
//本阶段真正的处理函数,
virtual AbstractMsg* currentHandle(AbstractMsg& msg)override;
virtual AbstractHandle* GetNextHanlder(AbstractMsg& msg)override;
};

#endif

b)FApplycations.cc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/src/FApplycations.cc

#include "FApplycations.h"

FApplycations::FApplycations()
{
}

FApplycations::~FApplycations()
{
}

AbstractMsg *FApplycations::currentHandle(AbstractMsg &msg)
{
AbstractMsg* _UMsg = nullptr;
DYNAMIC_GETREF(IOState,state,msg);
if(IO_Direction::IN==state->IO_DIC){
DYNAMIC_GETREF(UserMsg,_umsg_,msg);
Request* _request = ProcRequest(*(_umsg_->getRequest()));
if(nullptr != _request){
UserMsg* _UMsg_ = new UserMsg(IO_Direction::IN);
_UMsg_->SetRequest(_request);
_UMsg = _UMsg_;
}
//释放Dataprocess阶段的的请求
delete _umsg_;
_umsg_ = nullptr;
}
return _UMsg;
}

AbstractHandle *FApplycations::GetNextHanlder(AbstractMsg &msg)
{
return GetNextApplycationsObj(msg);
}

Server内核添加

上面我们建立了服务器框架从连接层到业务层的传递链路,主要通过责任链的形式,即获得下个层的对象指针,然后调用Handle函数来传递处理,但是目前从业务层到连接层的链路好像没有建立,因此我们需要在Server处添加一个静态函数,这个静态函数由开发者在实现业务逻辑的时候调用,将请求传递给数据处理层的对象process:

1
2
3
4
/include/Server.h

/*当业务应用层处理完后,应该调用数据层的额Handle函数进行响应,该函数在业务层处理后调用,由开发者调用*/
static void ServerUseHandleOfDataProcess(DataProcess& process,Request* request);

1
2
3
4
5
6
7
8
9
/src/Server.cc

void Server::ServerUseHandleOfDataProcess(DataProcess &process, Request* request)
{
UserMsg* _UMsg = new UserMsg(IO_Direction::OUT);
_UMsg->SetRequest(request);
process.Handle(*_UMsg);
}

程序异常退出时的信号捕捉

当程序异常退出时,比如ctrl+zctrl+c或者ctrl+\时,服务器应该能够捕捉到这些异常信号,执行资源回收后再退出。这里我们通过给服务器内核增加一个函数ServerExceptionStop(),再服务器启动前,将信号的处理程序改为资源回收然后退出。

1
2
3
4
5
6
7
8
9
10
11
/include/Server.cc

void Server::ServerExceptionStop()
{
struct sigaction SigEvent;
SigEvent.sa_sigaction = ServerStop;
SigEvent.sa_flags = SA_NOCLDWAIT|SA_NOCLDSTOP;
sigaction(SIGINT,&SigEvent,NULL);
sigaction(SIGQUIT,&SigEvent,NULL);
sigaction(SIGSTOP,&SigEvent,NULL);
}

路由模块

上面我们已经完成了框架的整体,一个来自客户端的消息能够顺利被服务器接收、然后经过连接层、数据处理层和业务层的处理后得到响应,响应能够通过依此通过数据处理层、连接层将响应发送出去。然而,上面还存在以下缺陷:

  • 第一点是,上面的的逻辑中我们可以发现,当我们的业务场景很庞大时,我们不可能依照责任链去识别哪个业务者去处理这个请求,我们应该再数据处理后,应该有一个路由表来识别这个由数据处理后的请求应该首先交给哪个业务处理。

  • 第二点是,每次一个客户端连接到来,对于那些每个连接都分配一个对象的业务场景来说,我们每次都要在CreateAllObjWhenAccept函数中一下子将所有业务对象都创建下来,当业务对象多、连接多少,这回造成大量的内存无用销耗,因为不是每一个业务都会被客户端访问到,因此对于业务对象来说,应该在使用的时候去创建,也就是延时创建。

此我们可在业务应用层和数据处理层直接插入一个路由模块,数据处理层后经过路由选择对应的业务处理者入口

代码实现

a) Router.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/include/Router.h

#ifndef _ROUTER_H_
#define _ROUTER_H_


#include "FApplycations.h"
#include "Message.h"

/// @brief 路由模块,当框架应用于多业务常见时,提供给框架在数据处理层时能够依据请求类型来解决交给哪个业务对象处理
class FApplycations;
class Request;

class Router{
public:
Router(){};
virtual ~Router(){};
/*依据数据处理层转化成的请求,获得业务对象*/
virtual FApplycations* GetFApplycationsObj(Request* request) = 0;
/*r如果路由表中改业务对象为nullptr,则需要创建*/
virtual void CreatApplycationsObj(Request* request) = 0;
};

#endif

使用Router与否,完全看框架使用者是否需要。在使用时,要到实现的DataProcess类内声明Router对象,并增加相关的RouterOfApp函数来获得指定业务对象;同时在ROuter中你需要自己实现两个纯虚函数。