1 框架需求分析
本框架主要为适用于IO密集类型的服务器开发而设计,同时作为框架,一是对于开发者而言应该是简单容易上手层次分明,二是框架本身而言容易扩展,能够依据开发人员的需求添加业务处理。
为了保证框架能够实现基本的要求,其总体需求如下:
框架能够较好的应对并发场景。
开发人员基于该框架能够轻松的实现自己的业务处理逻辑。
接下来我们会一步一步的构建起这个框架,告诉大家这个框架是怎么实现的,而不是一下子出来一个完整的框架,让大家不知道怎么入手,同时也在这个过程中提出一些问题,让大家对框架更加熟悉,面试中能够回答出来一些关键点。
2 实现服务器内核
对于C++来说,实现网络通信服务器,离不开socket和epoll。关于epoll大家可以看我的对epoll 的专题解析。这里简单简单介绍一些,服务器通过使用epoll,将对文件描述符的监听工作交给内核出处理,而不用自己另启一个线程取监听,一旦发送变换,就会返回文件描述符变化的数量和已经变化过的文件描述吗,后续我们只需遍历这个集合,判断是读事件还是写事件后进行处理 。
总结来说,epoll的引入使服务器能够及时的发现是否有数据需要处理,在一定程度上提高了并发度。
在这里我们通过socket和epoll构建起一个简单CS架构:
功能 :
服务端能够监听来自客户端的连接请求,并且建立链接,在建立后向客户端会送一个connected!消息
服务器能够对来自于已建立客户端的信息进行回显。
目录结构介绍: 首先我们先建立如下的结构目录,
include:里面是我们框架声明的头文件,后续所有的抽象类和继承类头文件
src:放置我们的框架的源文件
build:放置框架构建好的文件
test:放置测试代码
TEST:放置测试的debug文件
client存放客户端代码 1 2 3 4 5 6 7 8 9 10 11 . ├── CMakeLists.txt ├── client │ ├── CMakeLists.txt │ └── build ├── TEST ├── build ├── include ├── lib ├── src └── test
代码实现
首先我们在include和src处创建对应的Server头文件和源文件,负责客户端的连接请求和信息回显功能。首先,服务器示例应该是一个单例为好,这样后续扩展需要用到Server也能通过添加静态接口来实现
a)创建Server.h
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 /include/Server.h #ifndef _SERVER_H_ #define _SERVER_H_ #include <string> #include <list> #include <iostream> #include <sys/epoll.h> #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <netinet/in.h> #include <thread> #include <stdio.h> #include <stdlib.h> #include <fcntl.h> #include <string.h> #include <unistd.h> #include <sys/wait.h> #include <iostream> #include <future> #include <unordered_map> class Server {public : static Server* ServerInit (std::string&& ip,int && port) ; static void ServerStop () ; static void ServerRun () ; static void ServerDelConn (struct epoll_event& ev) ; static void ServerExit () ; private : Server (std::string ip,int port); Server (const Server& s){} const Server& operator =(const Server& s){return *this ;} ~Server (); bool init () ; void run () ; void ctlAcceptFd () ; void ctlCloseFd (struct epoll_event& ev) ; int epollfd = -1 ; int listenfd = -1 ; struct sockaddr_in serveraddr; bool server_exit=false ; static Server* singleServer; }; #endif
b)Server.cc
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 /src/Server.cpp #include "Server.h" Server* Server::singleServer = nullptr ; Server *Server::ServerInit (std::string &&ip, int &&port) { if (nullptr ==singleServer){ singleServer = new Server (std::forward<std::string>(ip),std::forward<int >(port)); } return singleServer; } void Server::ServerStop () { singleServer->~Server (); delete singleServer; } void Server::ServerRun () { if (nullptr !=singleServer){ singleServer->run (); }else { std::cout<<"ServerRun failed, the value of singleServer is nullptr." <<std::endl; } } void Server::ServerDelConn (epoll_event &ev) { if (nullptr !=singleServer) singleServer->ctlCloseFd (ev); } void Server::ServerExit () { if (nullptr !=singleServer) singleServer->server_exit=true ; } Server::Server (std::string ip, int port) { const char * _ip=ip.c_str (); int sokcetfd=socket (AF_INET,SOCK_STREAM,0 ); std::cout<<"Server start at IP:" <<ip<<" ,Port:" <<port<<std::endl; if (0 <=sokcetfd){ memset (&serveraddr, 0 , sizeof (serveraddr)); serveraddr.sin_port=htons (port); serveraddr.sin_family=AF_INET; inet_pton (AF_INET,_ip,&serveraddr.sin_addr.s_addr); int opt=1 ; setsockopt (sokcetfd,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof (opt)); if (0 ==bind (sokcetfd,(struct sockaddr *)&serveraddr,sizeof (serveraddr))){ if (0 ==listen (sokcetfd,10 )){ listenfd=sokcetfd; init (); } else { perror ("Server start error: Listen" ); } }else { perror ("Server start error: Bind" ); } }else { perror ("Server start error: Get socket failed" ); } } Server::~Server () { if (0 <=epollfd){ close (epollfd); } close (listenfd); std::cout<<"GoodBye!" <<std::endl; } bool Server::init () { epollfd=epoll_create (1 ); bool bRet=false ; struct epoll_event listenev; listenev.events=EPOLLIN|EPOLLOUT; listenev.data.fd=listenfd; if (0 ==epoll_ctl (epollfd,EPOLL_CTL_ADD,listenfd,&listenev)){ bRet=true ; }else { perror ("Server start error: Epoll init" ); } return bRet; } void Server::run () { while (false ==server_exit){ struct epoll_event hashChange[100 ]; int iEpollRet = 0 ; iEpollRet = epoll_wait (epollfd, hashChange, 100 , -1 ); for (int i=0 ;i<iEpollRet;++i){ if (hashChange[i].data.fd==listenfd&&hashChange[i].events&EPOLLIN){ ctlAcceptFd (); }else { int currentfd=hashChange[i].data.fd; char buf[1024 ]; bool continueRead=true ; std::string s ("" ) ; while (continueRead){ int ret=recv (currentfd,buf,sizeof (buf),MSG_DONTWAIT); if (0 >ret){ if (errno==EAGAIN||errno==EWOULDBLOCK) break ; else if (errno==EINTR) continue ; else { perror ("recv:" ); continueRead=false ; } } else if (0 ==ret){ ctlCloseFd (hashChange[i]); break ; } else s.append (buf); } char _outBuf[s.size ()] = {0 }; s.copy (_outBuf,s.size ()); send (currentfd,_outBuf,sizeof (_outBuf),MSG_WAITALL); memset (buf,0 ,1024 ); } } } } void Server::ctlAcceptFd () { struct sockaddr_in clientSocket; int dataFd = -1 ; socklen_t lAddrlen=sizeof (clientSocket); dataFd = accept (listenfd,(struct sockaddr*)&clientSocket,&lAddrlen); if (dataFd>=0 ){ char ip[16 ]; inet_ntop (AF_INET,&clientSocket.sin_addr.s_addr,ip,16 ); std::cout<<"新的客户端连接,ip:" <<ip<<std::endl; struct epoll_event client; client.events=EPOLLIN|EPOLLOUT|EPOLLET; client.data.fd=dataFd; if (-1 ==epoll_ctl (epollfd,EPOLL_CTL_ADD,dataFd,&client)) perror ("epoll_ctl error: client" ); }else { perror ("accept:" ); } } void Server::ctlCloseFd (struct epoll_event& ev) { epoll_ctl (epollfd,EPOLL_CTL_DEL,ev.data.fd,&ev); close (ev.data.fd); }
c) 错误总结
笔者在写上面的代码的时候碰到了2个有意思的bug。这了做了一些总结:
第一处:我们设置了dataFd为边缘触发,那么第一次连接请求到来后,会触发一次epoll_wait来进行accept。之后debug的时候,它还会再一次触发epoll_wait,网上给出的解释是:
因为datafd采用边缘触发且设置了EPOLLOUT,刚建立连接,datafd可写的可写缓冲区由不可写变为可写,就会出现这种二次触发现象。
第二处:我们代码每次会在触发epoll_wait后创建接受缓存,但是由于编译器的优化,导致这一步骤可能被优化,导致数据染脏。因此最后加入了memset(buf,0,1024)。读者可以去掉这句看看会发送什么结果。
d)测试
为了测试我们服务能够正常运行,我们写了如下的客户端代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 /client/client.h #include <sys/socket.h> #include <sys/types.h> #include <sys/bitypes.h> #include <string.h> #include <netinet/in.h> #include <arpa/inet.h> #include <netinet/in.h> #include <stdio.h> #include <stdlib.h> #include <fcntl.h> #include <string.h> #include <unistd.h> #include <sys/wait.h> #include <string> #include <iostream> #define messageLen 1024 class client {public : client (std::string _ip,int port):_serverIP(_ip),_port(port){ _clientFd=socket (AF_INET,SOCK_STREAM,0 ); addr.sin_family = AF_INET; addr.sin_port = htons (_port); const char * ip=_serverIP.c_str (); inet_pton (AF_INET,ip,&addr.sin_addr.s_addr); } void run () { int ero = connect (_clientFd,(struct sockaddr*)&addr,sizeof (addr)); if (0 !=ero){ perror ("connect:" ); } while (true ){ std::string s ("" ) ; std::cin>>s; int len=s.size (); char buf[messageLen] = {0 }; s.copy (buf,len); s.clear (); send (_clientFd,buf,len,0 ); memset (buf,0 ,len); recv (_clientFd,buf,messageLen,0 ); s=buf; std::cout<<s<<std::endl; } } private : struct sockaddr_in addr; std::string _serverIP; int _port; int _clientFd; };
运行下面的源文件main.cpp和client.cpp:
1 2 3 4 5 6 7 8 /test/main.cpp #include "Server.h" int main () { Server::ServerInit ("172.19.189.128" ,8080 ); Server::ServerRun (); }
1 2 3 4 5 6 7 8 9 10 /client/client.cpp #include "client.h" int main () { std::string serverip ("172.19.189.128" ) ; int port=8080 ; client _client(serverip,port); _client.run (); }
测试结果: 1 2 3 4 5 6 7 8 9 10 11 trluper@DESKTOP-67 ADUGH:/home/project/Trluper/TEST$ ./Server Server start at IP:172.19 .189 .128 ,Port:8080 新的客户端连接,ip:1172.19 .189 .128 trluper@DESKTOP-67 ADUGH:/home/project/Trluper/client/build$ ./client niha niha dad dad
3
实现连接层:为每一个连接都提供自己的datafd管理读写分发
很好,我们已经构建了能够正常通信的基础服务器。但是上面的服务器中我们将读写 在写在了Server中,每个客户端都执行相同的逻辑,而且业务扩展极难,这不符合我们的程序设计规则。
因此我们希望每一个·dataFd都绑定有一个专门负责自己读写的类Connections,同时实现读写分离,便于后续的扩展。因此现在我们的框架架构应该是这样
因此,我们可以创建一个Connections类来维护每个客户端的通道,在Connections中执行消息的接收和发送。同时为了后续自定义消息类型的实现,我们还创建了Message.h头文件。那么有了
代码实现
A) AbstarctHandle.h
创建抽象处理类AbstarctHandle.h,后续的连接层、数据层和业务层都继承与该抽象类。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 /include/AbstractHandle.h #ifndef _ABSTRACTHANDLE_H_ #define _ABSTRACTHANDLE_H_ #include <string> #include "Message.h" #define DYNAMIC_GETREF(type, ref, orig) type * ref = dynamic_cast<type *> (&orig); if (nullptr == ref) {return nullptr;} class AbstractHandle {public : AbstractHandle (){} virtual ~AbstractHandle (){} virtual void Handle (AbstractMsg& msg) ; protected : virtual AbstractMsg* currentHandle (AbstractMsg& msg) = 0 ; virtual AbstractHandle* GetNextHanlder (AbstractMsg& msg) =0 ; }; #endif
B) Connections.h
Connections.h规定了连接层的大部分函数,其中许多共同处理已经写定。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 /include/Connections.h #ifndef _CONNECTIONS_H_ #define _CONNECTIONS_H_ #include "AbstractHandle.h" #include <list> #include <memory> #include <stdio.h> #include <sys/socket.h> #include <string.h> #include <iostream> #include "Server.h" class Server ;class UserMsg ;class Connections : public AbstractHandle{public : Connections (int _dataFd); virtual ~Connections (); int GetFd () {return dataFd;} void FlushOut () ; bool HashOutPut () {return false ==writerBuffer.empty ();} void SetConnectionClose () {NeedClose=true ;} bool ConnectionNeedClose () {return NeedClose;} virtual DataProcess* GetDataProcessObj (Message& msg) = 0 ; protected : bool writeFd (std::string& _output) ; bool ReadFd (std::string& _input) ; virtual AbstractMsg* currentHandle (AbstractMsg& msg) override ; virtual AbstractHandle* GetNextHanlder (AbstractMsg& msg) override ; protected : int dataFd; bool NeedClose = false ; std::list<std::shared_ptr<std::string>> writerBuffer; Message* _msg_; }; #endif
C) Message.h
Message.h内部定义了框架会使用到的一些基本消息,也为后续用户消息的自定义打下基础
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 /include/Message.h #ifndef _MESSAGE_H_ #define _MESSAGE_H_ #include <string> #include <memory> enum IO_Direction { IN,OUT }; class Request ; class AbstractMsg {public : AbstractMsg (); virtual ~AbstractMsg (); }; class IOState :public AbstractMsg{public : IO_Direction IO_DIC; IOState (IO_Direction dic); virtual ~IOState (); }; class Message :public IOState{public : Message (IO_Direction dic); virtual ~Message (){} std::string message; }; class UserMsg :public IOState{ public : UserMsg (IO_Direction dic); virtual ~UserMsg (); Request* getRequest () ; void SetRequest (Request* _request) ; private : Request* request = nullptr ; }; class Request {public : Request (){} virtual ~Request (){} }; inline Request* UserMsg::getRequest () { return request; } inline void UserMsg::SetRequest (Request *_request) { request = _request; } #endif
D) AbstractHandle.cc
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 /src/AbstractHandle.cc #include "AbstractHandle.h" void AbstractHandle::Handle (AbstractMsg &msg) { AbstractMsg* _msg = nullptr ; AbstractHandle* nextHanlder = nullptr ; _msg = currentHandle (msg); if (nullptr != _msg){ nextHanlder = GetNextHanlder (*_msg); if (nullptr != nextHanlder){ nextHanlder->Handle (*_msg); } } }
E)Connection.cc
完成上述的Connections.h的声明后,我们可以实现它的内部函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 /src/Connections.cc #include "Connections.h" Connections::Connections (int _dataFd):dataFd (_dataFd),_msg_(new Message (IO_Direction::IN)) { } Connections::~Connections () { delete _msg_; } inline void Connections::FlushOut () { while (writerBuffer.size ()>0 ){ if (true == writeFd (*writerBuffer.front ())) writerBuffer.pop_front (); else break ; } } inline bool Connections::writeFd (std::string &_output) { bool ret = false ; char *_outbuf = (char *)calloc (1UL ,_output.size ()); _output.copy (_outbuf,_output.size ()); if (dataFd>=0 && (_output.size () == send (dataFd,_outbuf,_output.size (),0 ))){ std::cout<<"send to client:" <<dataFd<<", packetSize: " <<_output.size ()<<std::endl; ret = true ; } free (_outbuf); return ret; } inline bool Connections::ReadFd (std::string& _input) { bool ret = false ; char buf[1024 ] = {0 }; while (true ){ ssize_t rcvLen = recv (dataFd,buf,sizeof (buf),MSG_DONTWAIT); if (0 < rcvLen){ _input.append (buf,rcvLen); ret = true ; } else if (0 == rcvLen){ FlushOut (); SetConnectionClose (); break ; } else { if (errno == EAGAIN || errno == EWOULDBLOCK) break ; else if (errno == EINTR) continue ; else { perror ("Recv:" ); break ; } } } return ret; } inline AbstractMsg *Connections::currentHandle (AbstractMsg &msg) { AbstractMsg* _msg = nullptr ; DYNAMIC_GETREF (IOState,state,msg); if (state->IO_DIC==IO_Direction::IN){ if (true == ReadFd (_msg_->message)){ _msg = _msg_; } } else if (state->IO_DIC == IO_Direction::OUT){ DYNAMIC_GETREF (Message,_outMsg,msg); if (false == HashOutPut ()){ Server::ServerSetConectionOut (*this ); } std::shared_ptr<std::string> str = std::make_shared <std::string>(_outMsg->message); writerBuffer.push_back (str); delete _outMsg; _outMsg = nullptr ; } return _msg; } inline AbstractHandle *Connections::GetNextHanlder (AbstractMsg &msg) { AbstractHandle* nextHanlder = nullptr ; DYNAMIC_GETREF (Message,_msg,msg); if (_msg->IO_DIC==IO_Direction::IN){ nextHanlder = GetDataProcessObj (*_msg); } return nextHanlder; }
很好,我们已经大体完成了连接层,上面我们能够成功的完成连接请求的响应和接收数据 ,但是还没有完成数据交由数据层和业务层处理后发挥响应给客户端端,后续我们会依次实现,现在我们先只要做好连接层的事。
Server代码变动
添加了Connections和Message后,Server.h应该增加一个成员变量std::unordered_map<int,Connections*> m_ConnctionsMap; 作为Server的连接统计管理。同时增加一个静态函数ServerDelConn能够给提供关闭连接的接口,代码如下
a) Server.h
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 /include/Server.h #ifndef _SERVER_H_ #define _SERVER_H_ #include <string> #include <list> #include <iostream> #include <sys/epoll.h> #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <netinet/in.h> #include <thread> #include <stdio.h> #include <stdlib.h> #include <fcntl.h> #include <string.h> #include <unistd.h> #include <sys/wait.h> #include <iostream> #include <future> #include <unordered_map> #include "Connections.h" !v0.2 版本 *Server应该以单例的模式运行,因此应该只提供实例化一个服务器对象,因此提供静态方法 */ class Connections ;class Server {public : static Server* ServerInit (std::string&& ip,int && port) ; static void ServerStop () ; static void ServerRun () ; static void ServerDelConn (struct epoll_event& ev) ; static void ServerExit () ; private : Server (std::string ip,int port); Server (const Server& s){} const Server& operator =(const Server& s){return *this ;} ~Server (); bool init () ; void run () ; void ctlAcceptFd () ; void ctlCloseFd (struct epoll_event& ev) ; int epollfd = -1 ; int listenfd = -1 ; struct sockaddr_in serveraddr; bool server_exit=false ; std::unordered_map<int ,Connections*> m_ConnctionsMap; static Server* singleServer; }; #endif
c) Server.cc
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 src/Server.cpp #include "Server.h" Server* Server::singleServer = nullptr ; Server *Server::ServerInit (std::string &&ip, int &&port,AbstractFactory* _singleFactory) { if (nullptr ==singleServer){ singleServer = new Server (std::forward<std::string>(ip),std::forward<int >(port), _singleFactory); } return singleServer; } void Server::ServerExceptionStop () { struct sigaction SigEvent; SigEvent.sa_sigaction = ServerStop; SigEvent.sa_flags = SA_NOCLDWAIT|SA_NOCLDSTOP; sigaction (SIGINT,&SigEvent,NULL ); sigaction (SIGQUIT,&SigEvent,NULL ); sigaction (SIGSTOP,&SigEvent,NULL ); } void Server::ServerStop (int signal,siginfo_t * info,void * context) { singleServer->~Server (); delete singleServer; } void Server::ServerRun () { if (nullptr !=singleServer){ singleServer->run (); }else { std::cout<<"ServerRun failed, the value of singleServer is nullptr." <<std::endl; } } void Server::ServerDelConn (epoll_event &ev) { if (nullptr !=singleServer) singleServer->ctlCloseFd (ev); } void Server::ServerExit () { if (nullptr !=singleServer) singleServer->server_exit=true ; } #ifdef _OLD_CODE_ void Server::ServerSetConectionOut (Connections &conn) { int dataFd = conn.GetFd (); struct epoll_event ev; ev.events = EPOLLIN | EPOLLOUT | EPOLLET; ev.data.ptr = &conn; epoll_ctl (singleServer->epollfd,EPOLL_CTL_MOD,dataFd,&ev); } void Server::ServerUnSetConnectionOut (Connections &conn) { struct epoll_event ev; ev.events = EPOLLIN | EPOLLET; ev.data.ptr = &conn; epoll_ctl (singleServer->epollfd,EPOLL_CTL_MOD,conn.GetFd (),&ev); } #endif void Server::ServerUseHandleOfDataProcess (DataProcess &process, Request* request) { UserMsg* _UMsg = new UserMsg (IO_Direction::OUT); _UMsg->SetRequest (request); process.Handle (*_UMsg); } Server::Server (std::string ip, int port,AbstractFactory* _singleFactory):singleFactory (_singleFactory) { const char * _ip=ip.c_str (); int sokcetfd=socket (AF_INET,SOCK_STREAM,0 ); std::cout<<"Server start at IP:" <<ip<<" ,Port:" <<port<<std::endl; if (0 <=sokcetfd){ memset (&serveraddr, 0 , sizeof (serveraddr)); serveraddr.sin_port=htons (port); serveraddr.sin_family=AF_INET; inet_pton (AF_INET,_ip,&serveraddr.sin_addr.s_addr); int opt=1 ; setsockopt (sokcetfd,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof (opt)); if (0 ==bind (sokcetfd,(struct sockaddr *)&serveraddr,sizeof (serveraddr))){ if (0 ==listen (sokcetfd,10 )){ listenfd=sokcetfd; init (); } else { perror ("Server start error: Listen" ); } }else { perror ("Server start error: Bind" ); } }else { perror ("Server start error: Get socket failed" ); } } Server::~Server () { delete singleFactory; auto it=m_ConnctionsMap.begin (); while (it!=m_ConnctionsMap.end ()){ close (it->first); delete it->second; m_ConnctionsMap.erase (it->first); it=m_ConnctionsMap.begin (); } if (0 <=epollfd){ close (epollfd); } close (listenfd); std::cout<<"GoodBye!" <<std::endl; } bool Server::init () { epollfd=epoll_create (1 ); bool bRet=false ; struct epoll_event listenev; listenev.events=EPOLLIN|EPOLLOUT; listenev.data.fd=listenfd; if (0 ==epoll_ctl (epollfd,EPOLL_CTL_ADD,listenfd,&listenev)){ bRet=true ; }else { perror ("Server start error: Epoll init" ); } return bRet; } void Server::run () { while (false ==server_exit){ struct epoll_event hashChange[100 ]; int iEpollRet = 0 ; iEpollRet = epoll_wait (epollfd, hashChange, 100 , -1 ); for (int i=0 ;i<iEpollRet;++i){ if (hashChange[i].data.fd==listenfd&&hashChange[i].events&EPOLLIN){ ctlAcceptFd (); } else if (hashChange[i].events&EPOLLIN){ IOState state (IO_Direction::IN); Connections* conn =(Connections*)hashChange[i].data.ptr; conn->Handle (state); if (conn->ConnectionNeedClose ()==true ){ Server::ServerDelConn (hashChange[i]); } } #ifdef _OLD_CODE_ else { IOState state (IO_Direction::OUT); Connections* conn =(Connections*)hashChange[i].data.ptr; conn->FlushOut (); if (false == conn->HashOutPut ()) ServerUnSetConnectionOut (*conn); } #endif } } } void Server::ctlAcceptFd () { struct sockaddr_in clientSocket; int dataFd = -1 ; socklen_t lAddrlen=sizeof (clientSocket); dataFd = accept (listenfd,(struct sockaddr*)&clientSocket,&lAddrlen); if (dataFd>=0 ){ char ip[16 ]; inet_ntop (AF_INET,&clientSocket.sin_addr.s_addr,ip,16 ); std::cout<<"新的客户端连接,ip:" <<ip<<std::endl; std::cout<<"<------------------------------------------------------------->" <<std::endl; Connections* conn = singleFactory->CreateAllObjWhenAccept (dataFd); if (nullptr !=conn){ m_ConnctionsMap[dataFd] = conn; struct epoll_event client; client.events=EPOLLIN|EPOLLET; client.data.ptr = conn; if (-1 ==epoll_ctl (epollfd,EPOLL_CTL_ADD,dataFd,&client)) perror ("epoll_ctl error: client" ); } else { char buf[]="Unknow error!" ; send (dataFd,buf,sizeof (buf),MSG_DONTWAIT); close (dataFd); } }else { perror ("accept:" ); } } void Server::ctlCloseFd (struct epoll_event& ev) { Connections* conn = (Connections*)ev.data.ptr; m_ConnctionsMap.erase (ev.data.fd); epoll_ctl (epollfd,EPOLL_CTL_DEL,ev.data.fd,&ev); close (ev.data.fd); delete conn; }
我们对服务器的内核做了最后的变动,后面就很少去改动原因的代码了,因为连接层作为三层结构中与与服务器内核直接联系的一层,目前已经确定,因此不需要去变动原因的代码。后续有改动只需要添加一下新函数和代码就可以了。从这里可以看出,这样的三层结构能够很好地解耦合我们地框架
测试
由于我们还没有写出数据层、业务层,因此指令直接修改readFd函数测试:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 /src/Connections.cpp/ReadFd bool Connections::ReadFd (std::string& _input) { bool ret = false ; char buf[1024 ] = {0 }; while (true ){ ssize_t rcvLen = recv (dataFd,buf,sizeof (buf),MSG_DONTWAIT); if (0 < rcvLen){ _input.append (buf,rcvLen); ret = true ; } else if (0 == rcvLen){ SetConnectionClose (); break ; } else { if (errno == EAGAIN || errno == EWOULDBLOCK) break ; else if (errno == EINTR) continue ; else { perror ("Recv:" ); break ; } } } #ifndef _TEST_ #define _TEST_ std::cout<<this <<std::endl; Server::ServerSetConectionOut (*this ); char * senBuf =(char *)calloc (1UL ,_input.size ()); _input.copy (senBuf,_input.size ()); send (dataFd,senBuf,_input.size (),0 ); #endif return ret; }
然后进入:/home/project/Trluper/client/build$进行make
1 2 3 trluper@DESKTOP-67 ADUGH:/home/ project /Trluper/ client/build$ make Consolidate compiler generated dependencies of target client [100 %] Built target client
进入/home/project/Trluper/TEST$进行make
1 2 3 4 5 trluper@DESKTOP-67 ADUGH:/home/project/Trluper/TEST$ make Consolidate compiler generated dependencies of target Server [ 20 %] Building CXX object CMakeFiles/Server.dir/src/Server.cpp.o [ 40 %] Linking CXX executable Server [100 %] Built target Server
执行可执行文件进行测试
bug总结
上面过程遇到一个很神奇的bug,在:ServerSetConectionOut和ServerUnSetConnectionOut中,因为ev.data.fd = dataFd;这条语句的存在,会使得程序在FlushOut()函数的while(writerBuffer.size()>0)报出段错误Segmentation fault
原因:在ev.data.fd = dataFd;执行后,会使得ev.data.ptr的存储的conn对象地址发生改变,导致报错 。(我也不知道为啥会影响到ev.data.ptr,有知道的可以告诉我一下)
4 逻辑解放:添加工厂类
上面我们引入了Connections类实现了每一个连接都会创建自己的Connections对象来管理读写,但是作为框架,Connections一般作为开发者需要继承的实现类,在Connections的基础上增加开发者的东西:
问题1 :因此我们不能在Server的ctlAcceptFd()中,将实现逻辑写成直接实例化一个原Connections对象,而是应该实例化开发者继承实现的连接对象 。
1 2 3 4 /src/Server.cpp/CtlAcceptFd () Connections* conn = new Connections (dataFd);
问题2 :同样的,Connections类中应该包含至少一个纯虚函数,告诉编译器Connections不可以实例化,需要开发者继承实现这些纯虚函数,声明自己的连接类。(后续对于数据层、业务层也同样如此)
抽象工厂模式
为了解决问题1,我们声明一个抽象工厂,内部提供一个纯虚函数接口CreateAllObjWhenAccept();开发者在后续只需要继承实现这个CreateAllObjWhenAccept(),在连接到了时创建这个dataFd的所有对象,并返回最底层的Connections对象
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 /include/Server.h class AbstractFactory {protected : AbstractFactory (){} AbstractFactory (const AbstractFactory& obj){} AbstractFactory& operator =(const AbstractFactory& obj){} public : virtual ~AbstractFactory (){}; static AbstractFactory* GetSingle () {} virtual Connections* CreateAllObjWhenAccept (int _dataFd) = 0 ; };
修改Server
那么引入抽象工厂AbstractFactory以后,就需要对Server进行修改,之前我你们之前使用Connections实例化对象,现在我们使用AbsractFactory来实例化对象,因此做如下修改:
修改1:在Server.h中添加成员singleFactory;
1 2 3 /include/Server.h AbstractFactory* singleFactory;
修改2:修改构造函数和ServerInit静态函数如下
1 2 3 4 static Server* ServerInit (std::string&& ip,int && port, AbstractFactory* _singleFactory) ; Server (std::string ip,int port,AbstractFactory* _singleFactory);
修改3:将CtlAcceptFd函数的new Connectons(dataFd)修改为:
1 2 Connections* conn = singleFactory->CreateAllObjWhenAccept (dataFd);
测试
为了测试引入的抽象工厂是否能够正常工作,我们需要继承AbstractFactory实现里面的函数,同时也要继承实现Connections。我们在test目录下创建两个头文件Factor.h和MyConnections.h:
a) Factory.h文件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 /test/Factory.h #include "Server.h" #include "MyConnections.h" class Factory :public AbstractFactory{ private : Factory (); ~Factory (); Factory (const Factory& obj){} Factory& operator =(const Factory& obj){} public : static AbstractFactory* GetSingle (); virtual Connections* CreateAllObjWhenAccept (int _dataFd) override ; private : static AbstractFactory* FactorySingle; }; AbstractFactory* Factory::FactorySingle = new Factory (); Factory::Factory () { } Factory::~Factory () { } AbstractFactory *Factory::GetSingle () { return FactorySingle; } Connections *Factory::CreateAllObjWhenAccept (int _dataFd) { Connections* conn = new MyConnections (_dataFd); return conn; }
b)MyConnections.h文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 #include "Connections.h" class MyConnections :public Connections{ private : public : MyConnections (int _dataFd); ~MyConnections (); }; MyConnections::MyConnections (int _dataFd):Connections (_dataFd) { } MyConnections::~MyConnections () { }
c) main.cpp文件
1 2 3 4 5 6 7 8 9 10 11 #include "Server.h" #include "Factory.h" int main () { AbstractFactory* factorySingle = Factory::GetSingle (); Server::ServerInit ("172.19.189.128" ,8080 ,factorySingle); Server::ServerRun (); }
现在我们的框架的目录结构如下: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 . ├── CMakeLists.txt ├── TEST ├── build ├── client │ ├── CMakeLists.txt │ ├── build │ ├── client.cpp │ └── client.h ├── include │ ├── AbstractHandle.h │ ├── Connections.h │ ├── DataProcess.h │ ├── Message.h │ └── Server.h ├── lib ├── src │ ├── AbstractHandle.cpp │ ├── Connections.cpp │ ├── DataProcess.cpp │ ├── Message.cpp │ └── Server.cpp └── test ├── Factory.h ├── MyConnections.h └── main.cpp
我们分别 进入TEST和client\build目录 执行下述命令生成可执行文件Server和client
执行可执行文件后,运行结果: 1 2 3 4 trluper@DESKTOP-67 ADUGH:/home/project/Trluper/TEST$ ./Server Server start at IP:172.19 .189 .128 ,Port:8080 新的客户端连接,ip:172.19 .189 .128 <------------------------------------------------------------->
1 2 3 trluper@DESKTOP-67 ADUGH:/home/project/Trluper/client/build$ ./client 你好,我是客户端1 你好,我是客户端1
5 实现数据层
截至目前为止,我们已经实现了服务器对于客户端数据的接收过程,能够通过Server、Connections、Message对请求做接收和简单处理。但实际的服务器过程中,我们对来自于客户端的请求,应该会经过接收-->数据处理-->业务的过程,处理完后再由业务-->数据处理-->发送过程。因此本服务器框架对一个请求的响应应如下如所示:
因此,我们现在需要实现数据层DataProcess处理来自连接层的消息和来自业务层的用户数据:
对于连接层的消息 :目前服务器仅支持TCP,需要在数据层做粘包处理后,将一个完整请求打包成用户数据给业务层(反序列化)。
对于业务层的数据 :主要做序列化处理
因此数据处理层的工作就是对于来自连接层的数据进行粘包处理,然后转化为业务层能够识别的消息递交给业务应用层;能够对业务层下来的消息进行封装成请求给连接层发送;同时数据处理层能够很方便的支持protobuf的序列化和反序列化操作
代码实现
我们在include和src创建相应的DataProcess头文件和源文件实现数据处理层。
a) DataProcess.h
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 /include/DataProcess.h #include <unordered_map> #include "AbstractHandle.h" #include "Connections.h" #include "FApplycations.h" class Connections ;class FApplycations ;class UserMsg ;class DataProcess :public AbstractHandle{ public : DataProcess (); virtual ~DataProcess (); virtual Connections* GetConnectionsObj (AbstractMsg& msg) = 0 ; virtual FApplycations* GetApplycationsObj (AbstractMsg& msg) = 0 ; virtual Request* MsgToRequest (Message& msg) = 0 ; virtual Message* RequestToMsg (Request& request) = 0 ; protected : virtual AbstractMsg* currentHandle (AbstractMsg& msg) override ; virtual AbstractHandle* GetNextHanlder (AbstractMsg& msg) override ; }; #endif
上述头文件中,主要由四个主要待实现函数:
GetApplycationsObj:由开发者实现,其返回一个业务层对象指针。这个表明连接层的消息给数据层处理后得到的请求,应该交给这个业务对象处理。
GetConnectionsObj:由开发者实现,其返回一个连接层对象指针。这个表明业务层的请求响应给数据层处理后得到的信息,应该交给这个连接对象发送给客户端。
MsgToRequest:将连接层递交上来的message处理为Request类型,这里就要开发者自己依据自定义额数据格式来重写该函数返回一个Request对象指针
RequestToMsg:将应用层递交下来的Request类型的数据转为Message类型,这里就要开发者自己依据自定义额数据格式来重写该函数
b) DataProcess.cc
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 /src/DataProcess.cc #include "DataProcess.h" DataProcess::DataProcess () { } DataProcess::~DataProcess () { } inline AbstractMsg *DataProcess::currentHandle (AbstractMsg &msg) { AbstractMsg* _UMmsg = nullptr ; DYNAMIC_GETREF (IOState,state,msg); if (IO_Direction::IN==state->IO_DIC){ DYNAMIC_GETREF (Message,_msg,msg); Request *request = MsgToRequest (*_msg); if (nullptr != request){ UserMsg* _msg_ = new UserMsg (IO_Direction::IN); _msg_->SetRequest (request); _UMmsg = _msg_; } } else if (IO_Direction::OUT==state->IO_DIC){ DYNAMIC_GETREF (UserMsg,_userMsg,msg); Message* response = RequestToMsg (*(_userMsg->getRequest ())); if (nullptr != response){ _UMmsg = response; } delete _userMsg; _userMsg = nullptr ; } return _UMmsg; } inline AbstractHandle *DataProcess::GetNextHanlder (AbstractMsg &msg) { AbstractHandle* nextHanlder = nullptr ; DYNAMIC_GETREF (IOState,state,msg); if (IO_Direction::IN == state->IO_DIC){ nextHanlder = GetApplycationsObj (msg); } else { nextHanlder = GetConnectionsObj (msg); } return nextHanlder; }
Connections的代码变动
因为加入了DataProcess,那么Connections必须知道接收到的数据给谁处理,因此必须增加一个待实现的纯虚函数,返回一个DataProcess对象指针。
1 2 3 4 /include/Connections.h virtual DataProcess* GetDataProcessObj (Message& msg) = 0 ;
实现业务层
上面我们已经完成了服务器的内核、连接层和数据处理层,已经能够成果地与客户端建立连接,同时接收客户端地消息并转化为请求指令,那么现在缺少的是对请求指令的业务处理。因此我们现在来实现我们Trluper框架的业务层
代码实现
a)FApplycations.h
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 /include/FApplycations.h #include "AbstractHandle.h" #include <list> class UserMsg ;class FApplycations :public AbstractHandle{public : FApplycations (); virtual ~FApplycations (); virtual Request* ProcRequest (Request& _request) = 0 ; virtual FApplycations* GetNextApplycationsObj (AbstractMsg& msg) = 0 ; protected : virtual AbstractMsg* currentHandle (AbstractMsg& msg) override ; virtual AbstractHandle* GetNextHanlder (AbstractMsg& msg) override ; }; #endif
b)FApplycations.cc
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 /src/FApplycations.cc #include "FApplycations.h" FApplycations::FApplycations () { } FApplycations::~FApplycations () { } AbstractMsg *FApplycations::currentHandle (AbstractMsg &msg) { AbstractMsg* _UMsg = nullptr ; DYNAMIC_GETREF (IOState,state,msg); if (IO_Direction::IN==state->IO_DIC){ DYNAMIC_GETREF (UserMsg,_umsg_,msg); Request* _request = ProcRequest (*(_umsg_->getRequest ())); if (nullptr != _request){ UserMsg* _UMsg_ = new UserMsg (IO_Direction::IN); _UMsg_->SetRequest (_request); _UMsg = _UMsg_; } delete _umsg_; _umsg_ = nullptr ; } return _UMsg; } AbstractHandle *FApplycations::GetNextHanlder (AbstractMsg &msg) { return GetNextApplycationsObj (msg); }
Server内核添加
上面我们建立了服务器框架从连接层到业务层的传递链路,主要通过责任链的形式,即获得下个层的对象指针,然后调用Handle函数来传递处理,但是目前从业务层到连接层的链路好像没有建立,因此我们需要在Server处添加一个静态函数,这个静态函数由开发者在实现业务逻辑的时候调用,将请求传递给数据处理层的对象process:
1 2 3 4 /include/Server.h static void ServerUseHandleOfDataProcess (DataProcess& process,Request* request) ;
1 2 3 4 5 6 7 8 9 /src/Server.cc void Server::ServerUseHandleOfDataProcess (DataProcess &process, Request* request) { UserMsg* _UMsg = new UserMsg (IO_Direction::OUT); _UMsg->SetRequest (request); process.Handle (*_UMsg); }
程序异常退出时的信号捕捉
当程序异常退出时,比如ctrl+z、ctrl+c或者ctrl+\时,服务器应该能够捕捉到这些异常信号,执行资源回收后再退出。这里我们通过给服务器内核增加一个函数ServerExceptionStop(),再服务器启动前,将信号的处理程序改为资源回收然后退出。
1 2 3 4 5 6 7 8 9 10 11 /include/Server.cc void Server::ServerExceptionStop () { struct sigaction SigEvent; SigEvent.sa_sigaction = ServerStop; SigEvent.sa_flags = SA_NOCLDWAIT|SA_NOCLDSTOP; sigaction (SIGINT,&SigEvent,NULL ); sigaction (SIGQUIT,&SigEvent,NULL ); sigaction (SIGSTOP,&SigEvent,NULL ); }
路由模块
上面我们已经完成了框架的整体,一个来自客户端的消息能够顺利被服务器接收、然后经过连接层、数据处理层和业务层的处理后得到响应,响应能够通过依此通过数据处理层、连接层将响应发送出去。然而,上面还存在以下缺陷:
第一点是,上面的的逻辑中我们可以发现,当我们的业务场景很庞大时,我们不可能依照责任链去识别哪个业务者去处理这个请求,我们应该再数据处理后,应该有一个路由表来识别这个由数据处理后的请求应该首先交给哪个业务处理。
第二点是,每次一个客户端连接到来,对于那些每个连接都分配一个对象的业务场景来说,我们每次都要在CreateAllObjWhenAccept函数中一下子将所有业务对象都创建下来,当业务对象多、连接多少,这回造成大量的内存无用销耗,因为不是每一个业务都会被客户端访问到,因此对于业务对象来说,应该在使用的时候去创建,也就是延时创建。
因此我们可在业务应用层和数据处理层直接插入一个路由模块,数据处理层后经过路由选择对应的业务处理者入口
代码实现
a) Router.h
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 /include/Router.h #ifndef _ROUTER_H_ #define _ROUTER_H_ #include "FApplycations.h" #include "Message.h" class FApplycations ;class Request ;class Router {public : Router (){}; virtual ~Router (){}; virtual FApplycations* GetFApplycationsObj (Request* request) = 0 ; virtual void CreatApplycationsObj (Request* request) = 0 ; }; #endif
使用Router与否,完全看框架使用者是否需要。在使用时,要到实现的DataProcess类内声明Router对象,并增加相关的RouterOfApp函数来获得指定业务对象;同时在ROuter中你需要自己实现两个纯虚函数。