TCP半关闭状态分析和skynet对半关闭状态的支持
TCP半关闭状态分析
- 一、背景
- 二、TCP四次挥手流程
- 三、发送FIN包的场景
- 四、skynet 网络封装支持半关闭状态
- 4.1、连接的建立
- 4.2、连接断开
- 4.3、消息到达
- 4.4、消息发送完毕
- 五、测试skynet对半关闭的支持
- 5.1、测试直接关闭进程
- 5.2、测试关闭读端
- 5.2、测试关闭写端
- 总结
- 后言
一、背景
TCP四次挥手中的半关闭状态是否需要解决,依赖于使用场景,大多数场景不解决也不会有影响,但有些场景(特别是游戏服务器)还是需要关注这个半关闭状态的。
所谓半关闭,就是只关闭读端或写端;半关闭状态是接收到FIN包并返回ACK包时 到 发送FIN包前的状态。
对半关闭状态进行了解决的有JAVA的netty、skynet开源框架。
大多数网络连接程序在read=0时即调用close()关闭TCP连接;但是,在read=0到调用close()之间,可能还有很多数据需要发送(send),如果read=0时即调用close()那么对端就收不到这些数据了。
对TCP三次握手和四次挥手不了解的,可以查阅前面的文章《Linux网络设计之TCP网络协议栈》。
二、TCP四次挥手流程
需要四次挥手的原因:希望将FIN包控制权交给应用程序去处理,以便处理断开连接的善后工作,即对端可能还有数据要发送。
四次挥手流程:
- 服务器收到FIN包,自动回复ACK包,进入CLOSE_WAIT状态。
- 此时TCP协议栈会为FIN包插入一个文件描述符EOF到内核态网络读缓冲区。
- 应用程序通过read=0来感知这个FIN包。
- 如果此时应用程序有数据要发送,则发送数据后调用关闭连接操作(发送FIN包到对端)。
- 双端关闭都需要一个FIN和一个ACK流程。
三、发送FIN包的场景
- 关闭读端:调用shutdown(fd,SHUT_RD)。
- 关闭写端:调用shutdown(fd,SHUT_WR);半关闭连接,仍旧接收数据,继而等待对方关闭。
- 关闭双端:调用close(fd) 或者shoutdown(fd,SHUT_RDWR)。
- 关闭进程:内核协议栈会自动发送FIN包。
如果客户端想进入半关闭状态,需要关闭写端,服务端会把自己的读端关闭,从而进入半关闭状态。
如果客户端是关闭读端或者把读写端都关闭了(调用close(fd) 或者shoutdown(fd,SHUT_RDWR))或者是关闭了进程,那么会进入快速关闭流程:
即接受到对端回复的FIN包之后回复RST包,直接进入CLOSE状态,而没有TIME_WAIT状态,这是解决有大量TIME_WAIT现象的常用方法。
思考:四次挥手中,为什么存在TIME_WAIT状态?
因为ACK不会重传,防止没有LAST_ACK或LAST_ACK丢失,对端没有收到LAST_ACK而一直重发FIN包。一直重发已经不存在的socket,可能会对新建立的连接造成干扰。
四、skynet 网络封装支持半关闭状态
skynet 网络层支持半关闭状态。skynet 采用 reactor 网络编程模型;reactor 网络模型是一种异步事件模型。
通过 epoll_ctl 设置 struct epoll_event 中 data.ptr =(struct socket *)ud; 来完成 fd 与 actor绑定。skynet 通过 socket.start(fd, func) 来完成 actor 与 fd 的绑定。
4.1、连接的建立
客户端与服务端建立连接处理:建立成功的标识是 listenfd,有读事件触发。
服务端与第三方服务建立连接:建立成功的标识是 connect 返回的fd,有可写事件触发。
4.2、连接断开
skynet 网络层支持半关闭状态。
(1)读写端都关闭,epoll的事件EPOLLHUP 读写段都关闭:
(socket_epoll.h)
static int
sp_wait(int efd, struct event *e, int max) {
struct epoll_event ev[max];
int n = epoll_wait(efd , ev, max, -1);
int i;
for (i=0;i<n;i++) {
e[i].s = ev[i].data.ptr;
unsigned flag = ev[i].events;
e[i].write = (flag & EPOLLOUT) != 0;
e[i].read = (flag & EPOLLIN) != 0;
e[i].error = (flag & EPOLLERR) != 0;
e[i].eof = (flag & EPOLLHUP) != 0;
}
return n;
}
(socket_server.c)
// 处理:直接关闭并向 actor 返回事件 SOCKET_CLOSE
if (e->eof) {
// For epoll (at least), FIN packets are exchanged both ways.
// See: https://stackoverflow.com/questions/52976152/tcp-when-is-epollhup-generated
int halfclose = halfclose_read(s);
force_close(ss, s, &l, result);
// 如果前面因为关闭读端已经发送SOCKET_CLOSE,在这里避免重复 SOCKET_CLOSE
if (!halfclose) {
return SOCKET_CLOSE;
}
}
(2)检测读端关闭:
(socket_server.c)
// return -1 (ignore) when error
static int
forward_message_tcp(struct socket_server *ss, struct socket *s, struct socket_lock *l, struct socket_message * result) {
int sz = s->p.size;
char * buffer = MALLOC(sz);
int n = (int)read(s->fd, buffer, sz);
if (n<0) {
FREE(buffer);
switch(errno) {
case EINTR:
case AGAIN_WOULDBLOCK:
break;
default:
return report_error(s, result, strerror(errno));
}
return -1;
}
if (n==0) {
FREE(buffer);
if (s->closing) { // 如果该连接的 socket 已经关闭
// Rare case : if s->closing is true, reading event is disable, and SOCKET_CLOSE is raised.
if (nomore_sending_data(s)) {
force_close(ss,s,l,result);
}
return -1;
}
int t = ATOM_LOAD(&s->type);
if (t == SOCKET_TYPE_HALFCLOSE_READ) { // 如果已经处理过读端关闭
// Rare case : Already shutdown read.
return -1;
}
if (t == SOCKET_TYPE_HALFCLOSE_WRITE) { // 如果之前已经处理过写端关闭,则直接 close
// Remote shutdown read (write error) before.
force_close(ss,s,l,result);
} else {
close_read(ss, s, result);
}
return SOCKET_CLOSE;
}
if (halfclose_read(s)) {
// discard recv data (Rare case : if socket is HALFCLOSE_READ, reading event is disable.)
FREE(buffer);
return -1;
}
stat_read(ss,s,n);
result->opaque = s->opaque;
result->id = s->id;
result->ud = n;
result->data = buffer;
if (n == sz) {
s->p.size *= 2;
return SOCKET_MORE;
} else if (sz > MIN_READ_BUFFER && n*2 < sz) {
s->p.size /= 2;
}
return SOCKET_DATA;
}
(3)检测写端关闭:write < 0 && errno == EPIPE,能检测对端读端关闭。
(socket_server.c)
static int
send_list_tcp(struct socket_server *ss, struct socket *s, struct wb_list *list, struct socket_lock *l, struct socket_message *result) {
while (list->head) {
struct write_buffer * tmp = list->head;
for (;;) {
ssize_t sz = write(s->fd, tmp->ptr, tmp->sz);
if (sz < 0) {
switch(errno) {
case EINTR:
continue;
case AGAIN_WOULDBLOCK:
return -1;
}
return close_write(ss, s, l, result);
}
stat_write(ss,s,(int)sz);
s->wb_size -= sz;
if (sz != tmp->sz) {
tmp->ptr += sz;
tmp->sz -= sz;
return -1;
}
break;
}
list->head = tmp->next;
write_buffer_free(ss,tmp);
}
list->tail = NULL;
return -1;
}
4.3、消息到达
单线程读。读策略:
int n = read(fd, buf, sz);
sz 初始值为 64,根据从网络接收数据情况,动态调整 sz 的大小。
(socket_server.c)
// return -1 (ignore) when error
static int
forward_message_tcp(struct socket_server *ss, struct socket *s, struct socket_lock *l, struct socket_message * result) {
int sz = s->p.size;
char * buffer = MALLOC(sz);
int n = (int)read(s->fd, buffer, sz);
if (n<0) {
FREE(buffer);
switch(errno) {
case EINTR:
case AGAIN_WOULDBLOCK:
break;
default:
return report_error(s, result, strerror(errno));
}
return -1;
}
if (n==0) {
FREE(buffer);
if (s->closing) {
// Rare case : if s->closing is true, reading event is disable, and SOCKET_CLOSE is raised.
if (nomore_sending_data(s)) {
force_close(ss,s,l,result);
}
return -1;
}
int t = ATOM_LOAD(&s->type);
if (t == SOCKET_TYPE_HALFCLOSE_READ) {
// Rare case : Already shutdown read.
return -1;
}
if (t == SOCKET_TYPE_HALFCLOSE_WRITE) {
// Remote shutdown read (write error) before.
force_close(ss,s,l,result);
} else {
close_read(ss, s, result);
}
return SOCKET_CLOSE;
}
if (halfclose_read(s)) {
// discard recv data (Rare case : if socket is HALFCLOSE_READ, reading event is disable.)
FREE(buffer);
return -1;
}
stat_read(ss,s,n);
result->opaque = s->opaque;
result->id = s->id;
result->ud = n;
result->data = buffer;
if (n == sz) {
s->p.size *= 2;
return SOCKET_MORE;
} else if (sz > MIN_READ_BUFFER && n*2 < sz) {
s->p.size /= 2;
}
return SOCKET_DATA;
}
4.4、消息发送完毕
多线程写。同一个 fd 可以在不同的 actor 中发送数据。skynet底层通过加锁确保数据正确发送到 socket 的写缓冲区。
int
socket_server_send(struct socket_server *ss, struct socket_sendbuffer *buf) {
int id = buf->id;
struct socket * s = &ss->slot[HASH_ID(id)];
if (socket_invalid(s, id) || s->closing) {
free_buffer(ss, buf);
return -1;
}
struct socket_lock l;
socket_lock_init(s, &l);
// 确保能在fd上写数据,连接可用状态,检测 socket 线程是否在对该fd操作
if (can_direct_write(s,id) && socket_trylock(&l)) {
// may be we can send directly, double check
if (can_direct_write(s,id)) { // 双检测
// send directly
struct send_object so;
send_object_init_from_sendbuffer(ss, &so, buf);
ssize_t n;
if (s->protocol == PROTOCOL_TCP) {
// 尝试在 work 线程直接写,如果n >0,则写成功
n = write(s->fd, so.buffer, so.sz);
} else {
union sockaddr_all sa;
socklen_t sasz = udp_socket_address(s, s->p.udp_address, &sa);
if (sasz == 0) {
skynet_error(NULL, "socket-server : set udp (%d) address first.", id);
socket_unlock(&l);
so.free_func((void *)buf->buffer);
return -1;
}
n = sendto(s->fd, so.buffer, so.sz, 0, &sa.s, sasz);
}
if (n<0) {
// ignore error, let socket thread try again
// 如果失败,不要在 work 线程中处理异常,所有网络异常在 socket 线程中处理
n = 0;
}
stat_write(ss,s,n);
if (n == so.sz) {
// write done
socket_unlock(&l);
so.free_func((void *)buf->buffer);
return 0;
}
// write failed, put buffer into s->dw_* , and let socket thread send it. see send_buffer()
s->dw_buffer = clone_buffer(buf, &s->dw_size);
s->dw_offset = n;
socket_unlock(&l);
struct request_package request;
request.u.send.id = id;
request.u.send.sz = 0;
request.u.send.buffer = NULL;
// 如果写失败,可能写缓冲区满,或被中断打断,直接交由 socket 线程去重试。
// 这里通过 pipe 来与 socket 线程通信。
// let socket thread enable write event
send_request(ss, &request, 'W', sizeof(request.u.send));
return 0;
}
socket_unlock(&l);
}
inc_sending_ref(s, id);
struct request_package request;
request.u.send.id = id;
request.u.send.buffer = clone_buffer(buf, &request.u.send.sz);
send_request(ss, &request, 'D', sizeof(request.u.send));
return 0;
}
注意:在 work 线程中调用的。如果work线程写失败,可能写缓冲区满,或被中断打断,直接交由 socket 线程去重试。这里通过 pipe 来与 socket 线程通信。
五、测试skynet对半关闭的支持
main.lua
local skynet = require "skynet"
local socket = require "skynet.socket"
skynet.start(function ()
local listenfd = socket.listen("0.0.0.0", 8888)
socket.start(listenfd, function (clientfd, addr)
print("receive a client:", clientfd, addr)
socket.start(clientfd)
while true do
local data = socket.readline(clientfd, "\n")
if not data then -- read = 0
-- for i=1,10 do
socket.write(clientfd, "FINAL\n")
-- end
socket.close(clientfd)
print("closed", clientfd)
return
end
if data == "quit" then
print("recv quit", clientfd)
socket.close(clientfd)
return
else
print("S recv:", data)
socket.write(clientfd, "=> "..data.."\n")
end
end
end)
end)
client.lua
package.cpath = "./skynet/luaclib/?.so"
package.path = "./skynet/lualib/?.lua;./?.lua"
if _VERSION ~= "Lua 5.4" then
error "Use lua 5.4"
end
local socket = require "client.socket"
local fd = assert(socket.connect("127.0.0.1", 8888))
local function send_package(pack)
socket.send(fd, pack .. "\n")
end
local function unpack_package(text)
local size = #text
if size < 1 then
return nil, text
end
local pos = text:find("\n", 1, true)
if not pos then
return nil, text
end
return text:sub(1,pos-1), text:sub(pos+1)
end
local function recv_package(last)
local result
result, last = unpack_package(last)
if result then
return result, last
end
local r = socket.recv(fd)
if not r then
return nil, last
end
if r == "" then
error "Server closed"
end
return unpack_package(last .. r)
end
local last = ""
local function dispatch_package()
while true do
local v
v, last = recv_package(last)
if not v then
break
end
print(v)
end
end
while true do
dispatch_package()
local cmd = socket.readstdin()
if cmd then
if cmd == "quit" then
send_package("quit")
elseif cmd == "shut_r" then
-- send_package("shut_r")
socket.shutdown(fd, "r")
-- send_package("shut_r")
elseif cmd == "shut_w" then
-- send_package("shut_w")
send_package("shut_w1")
send_package("shut_w2")
send_package("shut_w3")
send_package("shut_w4")
socket.shutdown(fd, "w")
else
send_package(cmd)
end
else
socket.usleep(100)
end
end
config
thread=4
logger=nil
harbor=0
start="main" -- 启动第一个服务
lua_path="./skynet/lualib/?.lua;".."./skynet/lualib/?/init.lua;".."./lualib/?.lua;"
luaservice="./skynet/service/?.lua;./app/?.lua"
lualoader="./skynet/lualib/loader.lua"
cpath="./skynet/cservice/?.so"
lua_cpath="./skynet/luaclib/?.so"
Makefile
SKYNET_PATH?=./skynet
all:
cd $(SKYNET_PATH) && $(MAKE) PLAT='linux'
clean:
cd $(SKYNET_PATH) && $(MAKE) clean
项目结构:
-- mytest
----+ skynet
----| app
-------- main.lua
-------- client.lua
----+ config
----+ Makefile
编译和运行:
make
./skynet/skynet config
启动client.lua的方式:
./skynet/3rd/lua/lua ./app/client.lua
5.1、测试直接关闭进程
启动client.lua,然后CTL+C直接关闭进程。
./skynet/3rd/lua/lua ./app/client.lua
- 此时内核协议栈会收到一个FIN包,内核协议栈的读缓冲区插入EOF,并触发读事件。
- skynet开始读数据,读到EOF(即read()==0);开始关闭读事件和读端。
- 然后进入main.lua的while循环,尝试发送消息;但是消息是无法发送出去了,因为客户端进程已经关闭,资源已经被释放,协议栈会把其丢弃。
- 注意,读数据、写数据、业务逻辑都在线程池执行,socket网络线程只负责监听触发事件;socket网络线程和线程池通过pipeline消息交互,epoll可以监听这个消息,然后注册读、写事件。
5.2、测试关闭读端
启动client.lua,输入shut_r命令。
$ ./skynet/3rd/lua/lua ./app/client.lua
shut_r
- 关闭读端,会走RST流程,立即进入CLOSE状态,服务端会报出“Connection reset by peer”错误。
- 服务端关闭写端,但仍然能接受到客户端发送的数据。
- 接受完数据后才调用关闭读端,结束连接。
receive a client: 4 127.0.0.1:35384
S recv: shut_r
S recv: shut_r2
closed 4
[:00000008] socket: error on 4 Connection reset by peer
5.2、测试关闭写端
启动client.lua,输入shut_w命令。
$ ./skynet/3rd/lua/lua ./app/client.lua
shut_w
- 关闭写端,服务器会关闭读端。
- 在完全关闭连接之前,服务器还可以发送数据到客户端。
server端:
receive a client: 5 127.0.0.1:35486
S recv: shut_w1
S recv: shut_w2
S recv: shut_w3
S recv: shut_w4
closed 5
client端:
shut_w
SHUTDOWN 3 1
=> shut_w1
=> shut_w2
=> shut_w3
=> shut_w4
FINAL
总结
ACK包是不会重发的,如果ACK包丢失了,那么发送端在一定时间内接受不到数据就会重发FIN包,一般三次FIN包都没有收到ACK就会直接进入close。
后言
本专栏知识点是通过<零声教育>的系统学习,进行梳理总结写下文章,对c/c++linux系统提升感兴趣的读者,可以点击链接查看详细的服务:C/C++服务器开发 。