tcp是一种基于流的应用层协议,其“可靠的数据传输”现实的原理就是,“塞控制”的滑动窗口机制,该机制包含的算法法主要有“缓慢启动”,“塞避免”,“快速重传”。
linux环境下,应用层TCP消息体定义如下:
typedef结构TcpMsg_s { TcpMsgHeader 头; 无效*味精; }TcpMsg;
其中,head表示自定义的TCP消息头,它的自定义如下:
//TCP消息类型,根据业务需要请求定义
typedef enum MSGTYPE _e { EP_REG_REQ = 0 , EP_REQ_RSP = 1 , }消息类型;
//TCP消息头定义的通用框架 typedef结构TcpMsgHead_s { 国际长度;//消息长度(使用TCP粘包处理) MSGTYPE类型;//消息类型(使用接收端消息的解析) }TcpMsgHead;
TCP客户端和服务端都采用linux提供的epoll机制(epoll_create(),epoll_wait(),epoll_ctl())对socket实现监听(可读,可写事件等)。
开源事件驱动库lievent对socket事件的监听也是通过对epoll事件的封装实现的。
基础原理:利用linux网络通信API(scoket(),bind(),listen())来创建服务器端socket;
代码如下:输入参数:localip,本地ip;port:服务端本地的监听端口号;输出:返回-1,表示失败;返回>0的fd,表示socket建立成功;
1 int TcpServer(uint32_t lcoalip, int port) 2 { 3 int fd; 4 结构sockaddr_in 地址; 5 6 // socket建立 7 if ((fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0 ) 8 { 9 printf( " IN TcpServer() scoket created failed,errno is %d, strerror is %s\n " ,错误号,strerror(错误号)); 10 返回- 1 ; 11 } 12 13 //设置socket为非阻塞模型 14 int flags = fcntl(fd, F_GETFL, 0 ); 15 fcntl(fd, F_SETFL, 标志 | O_NONBLOCK); 16 17 memset(&addr, 0 , sizeof (addr)); 18 addr.sin_family = AF_INET; 19 addr.sin_addr.s_addr = localip; 20 addr.sin_port =端口; 21 22 //绑定本端口和IP 23 if (bind(fd, ( struct sockaddr_in)&addr, sizeof (addr) < 0 )) 24 { 25 printf( " IN TcpServer() 绑定失败,fd 是 %d,errno 是 %d,strerror 是 %s\n " , fd, errno, strerror(errno)); 26 返回- 1 ; 27 } 28 29 if (listen(fd, 20 < 0 )) 30 { 31 printf( " IN TcpServer() listen failed,fd is %d, errno is %d, strerror is %s\n " , fd, errno,错误(错误号)); 32 返回- 1 ; 33 } 34 35 //添加套接字到 epoll 事件 36 if (SubscribeFd(fd, SOCKET_EV) != 0)
{
return -1;
} 37 返回 fd; 38 }
实际情况如下:
输入参数:fd,待监听的fd;type,个举型变量,表示TCP类,是客户端还是服务端;端口:服务端的监听端口号;输出:返回-1,表示监听失败;返回0,表示将socket成功添加到维护在全局变化g_epoll(TCP_EPOLL类型结构)中的监听事件中;其中TCP_TYPE个数变化和TCP_EPOLL结构的定义义如下:
类型定义枚举 { 客户= 0 , 服务器= 1 , }TCP_TYPE; #define MAX_NUM_EPOLL 1000 //最多可监听的socket数目 typedef struct TCP_EPOLL_s { 结构epoll_event * p_event; int nb_evnet;
int nb_client;//用于 tcp 服务器 int epoll_fd; int sock_listen;//用于 tcp 服务器 int sock[MAX_NUM_EPOLL]; TCP_NL_MSG * p_tcp_nl_msg; // TCP粘包处理数据结构 }TCP_EPOLL;
int SubscribeFd ( int fd, TCP_TYPE 类型) { 结构epoll_event事件; 如果(客户==类型) { 事件.events = EPOLLOUT | 埃波莱特;//监听类型为可写事件 } else if (SERVER == type) { event.events = EPOLLIN | EPOLLET;//监听类型为可读事件 } event.date.u64 = 0; evnet.data.fd = fd; g_epoll.nb_event++; g_epoll.p_event = realloc(g_epoll.p_event, g_epoll.nb_event * sizeof(struct epoll_event)); //add epoll control event if (epoll_ctl(g_epoll.epoll_fd, EPOLL_CTL_ADD, fd, &event) != 0) { printf( " epoll_ctl failed for fd %d, errno is %d, strerror is %s\n " , fd, errno, strerror(errno)); 返回- 1 ; } printf( "成功订阅 fd %d\n " , fd); 返回 0 ; }
基础原理:利用linux网络通信API(scoket(),connect())来创建客户端socket;
代码如下:输入参数:peerip,服务端IP;localip,本地ip;port:服务端的监听端口号;输入:返回-1,表示失败;返回>0的fd,表示socket建立成功;
1 int TCPClient(uint32_t peerip,uint32_t localip,uint16_t port) 2 { 3 int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); 4 if (fd < 0) 5 { 6 printf("TCPClient() socket failed"); 7 return -1; 8 } 9 10 struct sockaddr_in localaddr = {0}; 11 localaddr.sin_family = AF_INET; 12 localaddr.sin_addr.s_addr = localip; 13 //localaddr.sin_port = htons(port); 14 15 int ret = bind(fd, (struct sockaddr *)&localaddr, sizeof(localaddr)); 16 if (ret < 0) 17 { 18 printf("TCPClient() bind failed localip %u", localip); 19 return -1; 20 } 21 22 int flags = fcntl(fd, F_GETFL, 0); 23 fcntl(fd, F_SETFL, flags | O_NONBLOCK); 24 25 struct sockaddr_in servaddr = {0}; 26 servaddr.sin_family = AF_INET; 27 servaddr.sin_addr.s_addr = peerip; 28 servaddr.sin_port = htons(port); 29 30 ret = connect(fd, (sockaddr *)&servaddr, sizeof(servaddr)); 31 if(ret < 0) 32 { 33 if (errno != EINPROGRESS) 34 { 35 printf("TCPClient() connect failed, peerip %u, port %u", peerip, port); 36 return -1; 37 } 38 } 39 40 printf("TCPClient() connect success, fd = %u,peerip %u, port %u",fd, peerip, port); 41 42 return fd; 43 }
TCP服务端监听到监听socket有EPOLLIN事件到来时,调用int accept_fd = accept();接收此连接请求,然后服务端要利用epoll_create()为accept_fd创建新的监听事件;
1 void tcp_thread() 2 { 3 CreateEpoll(); 4 CreateSocketFdEpoll(g_tcp_type); 5 6 while (1) 7 { 8 //wait for a message 9 EpollRecvMsg(); 10 } 11 }
1 TCP_EPOLL g_epoll;//全局Epoll变量 2 3 //EPOLL事件的建立 4 void CreateEpoll() 5 { 6 g_epoll.epoll_fd = epoll_create1(0); 7 g_epoll.nb_event = 0; 8 }
1 int CreateSocketFdEpoll(TCP_TYPE type) 2 { 3 uint32_t server_ip = inet_addr(SERVER_IP); 4 uint32_t local_ip = inet_addr(LOCAL_IP); 5 6 int fd; 7 if (CLIENT == type) 8 { 9 fd = TcpClient(server_ip, SERVER_PORT, local_ip); 10 g_epoll.sock = fd; 11 } 12 else if (SERVER == type) 13 { 14 fd = TcpServer(local_ip, LOCAL_PORT); 15 g_epoll.sock_listen = fd; 16 } 17 18 g_epoll.p_tcpNLMsg = (TCP_NL_MSG)malloc(sizeof(TCP_NL_MSG)); 19 20 InitTcpNLMsg(g_epoll.p_tcpNLMsg); 21 }
1 void InitTcpNLMsg(TCP_NL_MSG* pTcpNLMsg) 2 { 3 pTcpNLMsg->g_recv_len = 0; 4 pTcpNLMsg->flag_in_NL_proc = FALSE; 5 memset(pTcpNLMsg->g_recv_buff, 0, MAX_MSG_LEN); 6 }
其中,TCP粘包处理的数据结构设计和处理逻辑分析详见另一篇博文:
1 void EpollRecvMsg() 2 { 3 int epoll_ret = 0; 4 int epoll_timeout = -1; 5 6 do 7 { 8 epoll_ret = epoll_wait(g_epoll.epoll_fd, g_epoll.p_event, g_epoll.nb_event, epoll_timeout); 9 }while(epoll_ret < 0 && errno == EINTR); 10 11 if (epoll_ret < 0) 12 { 13 printf("epoll_wait failed: %s\n", strerror(errno)); 14 return; 15 } 16 17 //遍历处理每一个当前监听到的事件 18 for (int i=0;i<epoll_ret;++i) 19 { 20 int fd = g_epoll.p_event[i].data.fd; 21 22 if (CLIENT == g_tcp_type) 23 { 24 if (g_epoll.p_event[i].events & EPOLLOUT) //the socket is writable,socket可写,表明服务端已accept该客户端的connect请求 25 { 26 if (JudgeIfConnSucc(fd) == 0)//判断TCP连接是否建立成功 27 { 28 struct epoll_event* p_ev = &(g_epoll.p_event[i]); 29 p_ev ->events = EPOLLIN | EPOLLET; 30 31 epoll_ctl(g_epoll.epoll_fd, EPOLL_CTL_MOD,fd, p_ev );//对TCP客户端socket修改其监听类型,由可写改为可读 32 33 printf("tcp_fd_client %d can be written\n", fd); 34 } 35 } 36 else if(g_epoll.p_event[i].events & EPOLLIN) //the socket is readable 37 { 38 RecvTcpMsg(fd); 39 } 40 } 41 else if (SERVER== g_tcp_type) 42 { if (g_epoll.p_event[i].events & EPOLLIN) //the socket is readable,服务端socket可读 43 { 44 if (fd == g_epoll.sock_listen)//服务端接收到一个TCP连接请求 45 { 46 struct sockaddr s_addr; 47 socklen_t length = sizeof(struct sockaddr); 48 49 int conn_fd = accept(fd, &s_addr, &length);//服务端接收来自客户端的连接请求 50 51 int flags = fcntl(conn_fd, F_GETFL, 0); 52 fcmt(conn_fd, F_SETFL, flags | O_NONBLOCK); 53 54 g_epoll.sock[g_epoll.nb_client++] = conn_fd; 55 56 SubscribeFd(conn_fd, SERVER);//服务端将新建立的TCP连接建立新的epoll监听事件,并维护在全局变量中 57 58 printf("Receive a tcp conn request, conn_fd is %d\n", fd); 59 } 60 else //support multi tcp client 61 { 62 RecvTcpMsg(fd);//接收TCP消息(先进行粘包处理,然后根据消息类型进入不同的处理分支) 63 } 64 } 65 } 66 } 67 }
函数实现如下:
输入:fd,发送socket;type,业务定义的tcp消息类型;msg指针:指向待发送的消息地址;length,待发送的msg的字节数;
输出:成功,返回发送的字节数;失败,返回-1;
#define MAX_LEN_BUFF 65535
int SendTcpMsg(int fd, MSGTYPE type, void* msg, int length) { uint8_t buf[MAX_LEN_BUFF]; memset(buf,0,MAX_LEN_BUFF); uint32_t bsize = 0; TcpMsgHead* head = (TcpMsgHead*)buf; bsize += sizeof(TcpMsgHead);
//将待发送消息内容拷贝到待发送缓存中 memcpy(buf+bsize, msg, length); bsize += length;
//封装TCP消息头,指明消息类型(用作接收端消息的解析)和消息长度(用作TCP粘包处理) head->type = type; head->msglen = bsize; int ret = send(fd,(const void*)buf,bsize,0); if(ret != bsize) { printf( "发送 tcp 消息失败,errno=%u,ret=%d, strerror 是 %s\n " , errno, ret, strerror(errno)); 返回- 1 ; } printf( "发送 tcp 消息成功,消息类型为 %d\n " , type); 返还;
联系客服