【十 三】Netty 私有协议栈开发
Netty 私有协议栈开发
- 私有协议介绍
- 跨节点通信
- 私有协议栈设计
- 私有协议栈的网络拓扑图
- 协议栈功能描述
- 协议的通信模型
- 消息体定义
- 消息定义表
- 私有协议消息头定义
- 私有协议支持的字段类型
- 私有协议的编解码规范
- 私有协议的编码
- 私有协议的解码
- 链路的建立
- 客户端握手请求
- 握手请求消息定义
- 服务端握手请求
- 握手应答消息定义
- 链路的关闭
- 可靠性设计
- 心跳机制
- 设计思路
- 重连机制
- 重复登录保护
- 消息缓存重发
- 安全设计
- 可扩展性设计
- 私有协议栈开发
- maven 依赖
- 数据结构定义
- NettyMessage 类
- 消息头 Header
- 消息类型 MessageType
- 消息工厂类 NettyMessageFactory
- 消息编解码
- Netty消息编码 NettyMessageEncoder
- JBoss Marshalling 编码类 MarshallingEncoder
- MarshallingEncoder 依赖类ChannelBufferByteOutput
- 消息解码类 NettyMessageDecoder
- JBoss Marshalling 解码类 MarshallingDecoderPrivate
- MarshallingDecoderPrivate 依赖类 ChannelBufferByteInput
- 握手和安全认证
- 客户端握手业务处理类 LoginAuthRespHandler
- 服务端握手业务处理类 LoginAuthRespHandler
- 心跳检测逻辑
- 客户端心跳业务处理类 HeartBeatReqHandler
- 服务端心跳业务处理类 HeartBeatRespHandler
- 启动类
- 客户端启动类 NettyClient
- 服务端启动类 NettyServer
- 测试
- 正常场景
- 服务端打印截图:
- 客户端打印截图
- 模拟服务端宕机
- 服务端关闭后截图
- 机器内存使用情况截图:
- 服务端启动后截图
- 模拟客户端宕机
- 关闭客户端之后的服务端截图
- 重启客户端之后的服务端截图
- 常见bug
- 问题现象 服务端接收不到数据
- NettyMessageDecoder
- 问题现象 客户端无法发送数据
- NettyMessageEncoder
- bug分析指导
- 总结
- 参考内容:
私有协议介绍
通信协议从广义上区分,可以分为公有协议和私有协议。由于私有协议的灵活性,它往往会在某个公司或组织内部使用,按需定制,升级起来毕竟方便,灵活性较好。绝大多数的私有协议传输层都是基于TCP/IP。所以利用Netty 的NIO TCP协议栈可以非常方便地进行私有协议的定制和开发。
私有协议本质上是厂商内部发展和采用的标准,除非授权,其他厂商一般无权使用该协议。
由于现代软件系统的复杂性,一个大型软件往往被拆分为多个模块,随着移动互联网的兴起,网址的规模也越来越大,业务的功能越来越多,为了能够支撑业务的发展,往往需要集群和分布式部署。这样,各个模块之前就需要跨节点通信。
跨节点通信
传统的跨节点通信
(1)通过RMI进行远程服务调用
(2)通过Java 的Socket+Java 序列化的方式进行跨节点调用
(3)利用一些开源的RPC 框架进行远程服务调用,例如FaceBook的Thrift,Apache 的Avro, 阿里巴巴的 Dubbo 等
(4)利用标准的共有协议进行跨节点服务调用。例如 HTTP+XML,RESTFUL+JSON 或者 WebService。
跨节点的远程服务调用,除了链路层的物理连接外,还需要对请求和响应消息进行编解码。在请求和应答消息本身以外,也需要携带一些其他控制和管理类指令,例如链路建立的握手请求和响应消息,链路检查的心跳消息等。这些功能组合到一起,就会形成私有协议。
私有协议栈设计
该协议用于内部模块之间的通信,基于TCP/IP 协议栈,是一个类HTTP协议的应用层协议栈,相比于传统的标准协议栈,它更加轻巧,灵活和实用。
私有协议栈的网络拓扑图
如上图所示,每个Netty节点(Netty进程)之间建立长连接,使用Netty协议进行通信,Netty节点并没有服务端和客户端的区分,谁首先发起连接,谁就是客户端,另一方自然就成为服务端。一个Netty节点既可以作为客户端连接其他的Netty节点,也可以作为Netty服务端被其他Netty节点连接,完全取决于使用者的业务场景。
协议栈功能描述
该私有协议承载了业务内部各模块之间的消息较好和服务调用,主要功能如下:
(1)基于Netty的NIO通信框架,提供高性能POJO的序列化和反序列化
(2)提供消息的编码解码框架,可以实现POJO的序列化和反序列化
(3)提供基于IP地址的白名单计入认知机制
(4)链路的有效性校验机制
(5)链路的断连重连机制
协议的通信模型
该私有协议 双方链路建立成功之后,双方可以进行全双工通信,无论客户端还是服务端,都可以主动发送请求消息给对方,通信方式可以是TWO WAY 或者 ONE WAY。双方之间的心跳采用Ping-Pong机制,当链路处于空闲状态时,客户端主动发送Ping消息给服务端,服务端接收到消息后发送应答消息Pong给客户端,如果客户端连续发送N条消息都没有接收到服务端返回的Pong消息,说明链路已经挂死或者对方处于异常状态,客户端主动关闭连接,间隔周期T后发起重连操作。
消息体定义
私有协议栈消息体定义分为两部分:
(1)消息头
(2)消息体
消息定义表
私有协议消息头定义
私有协议支持的字段类型
私有协议的编解码规范
私有协议的编码
私有协议 NettyMessage 的编码规范
(1)crcCode:java.nio.ByteBuffer.putInt(int value)。如果采用其他缓存,必须于其等价。
(2)length:java.nio.ByteBuffer.putInt(int value)。如果采用其他缓存,必须于其等价。
(3)sessionID:java.nio.ByteBuffer.putInt(int value)。如果采用其他缓存,必须于其等价。
(4)type:java.nio.ByteBuffer.putLong(long value)。 如果采用其他缓存,必须于其等价。
(5)priority:java.nio.ByteBuffer.put(byte b)。如果采用其他缓存,必须于其等价。
(6)attachment:它的编码规则为:如果attachment 长度为0,表示没有可选附件,则将长度编码设置为0,
java.nio.ByteBuffer.putInt(0);如果大于0,说明有附件需要编码,具体的编码规则如下
一:首先对附件的个数进行编码,java.nio.ByteBuffer.putInt(attachment.size());
二:然后对Key进行编码,先编码长度,再将它转换成数组之后编码内容。
(7)body的编码:通过JBoss Marshallng 将其序列化为byte数组,然后调用java.nio.ByteBuffer.put(byte[]src)将其写入ByteBuffer缓冲区中。
由于整个消息的长度必须等全部字段都编码完成之后才能确认,所以最后需要更新消息头的length字段,将其重新写入ByteBuffer中。
私有协议的解码
相对于NettyMessage的编码,仍旧以java.nio.ByteBuffer为例,给出Netty协议的解码规范
(1)crcCode:通过java.nio.ByteBuffer.getInt()获取校验码字段,其他缓冲区需要于其等价
(2)length:通过java.nio.ByteBuffer.getInt()获取Netty消息的长度,其他缓冲区需要于其等价
(3)sessionID:通过java.nio.ByteBuffer.getLong()获取会话ID,其他缓冲区需要于其等价
(4)type:通过java.nio.ByteBuffer.get()获取消息类型,其他缓冲区需要于其等价
(5)priority:通过java.nio.ByteBuffer.get获取消息优先级,其他缓冲区需要于其等价
(6)attachment:它的解码规则为,首先创建一个新的attachment对象,调用java.nio.ByteBuffer.getInt()获取附件长度,如果为0,说明附件为空。解码结束,继续解码消息体,如果非空,则根据长度通过for循环进行解码。
(7)body:通过JBoss 的marshaller进行解码
链路的建立
该私有协议栈支持服务端和客户端,对于使用该私有协议栈的应用程序而言,不需要刻意区分到底是客户端还是服务端,在分布式组网环境中,一个节点可能既是服务端也是客户端。
考虑到安全,链路建立需要通过基于IP地址或者号段的黑白名单 安全认证机制,本协议使用基于IP地址的安全认证,如果有多个IP,通过逗号进行分割。
客户端握手请求
客户端与服务端链路建立成功之后,由客户端发送握手请求消息。
握手请求消息定义
(1)消息头的type 字段值为3
(2)可选附件个数为0
(3)消息体为空
(4)握手消息的长度为22个字节
服务端握手请求
服务端收到客户端的握手请求之后,如果IP校验通过,返回握手成功应答消息给客户端。应用层链路建立成功。
握手应答消息定义
(1)消息头的type字段值为4
(2)可选附件个数为0
(3)消息体为byte类型的结果,0 :认证成功;1:认证失败
链路建立成功之后,客户端和服务端就可以互相发送业务消息了。
链路的关闭
由于采用长连接通信,在正常的业务运行期间,双方通过心跳和业务消息维持链路,任何一方都不需要主动关闭连接。但是,下面清空服务端和客户端需要关闭连接
(1)当对方宕机或者重启,会主动关闭链路,另一方读取到操作系统的通知信号,得知对方REST链路,需要关闭链路,释放自身的句柄等资源。由于采用TCP全双工通信,通信双方都需要关闭连接,释放资源
(2)消息读写过程中,发送了I/O异常,需要主动关闭连接
(3)心跳消息读写过程中发生了I/O异常,需要主动关闭连接
(4)心跳超时,需要主动关闭连接
(5)发送编码异常等不可恢复错误时,需要主动关闭连接
可靠性设计
该私有协议栈可能会运行在非常恶劣的网络环境中,网络超时,闪断,对方进程僵死或者处理缓慢等情况都有可能发送。为了包装在这些极端异常场景下 私有协议栈能够正常工作或者自动恢复,需要对可靠性进行规划和设计。
心跳机制
在凌晨等业务低谷期时段,如果发生网络问题,连接被Hang住时,由于没有业务消息,很难发现问题。到了白天业务高峰期间,会发生大量的网络通信失败,严重的会导致一段时间进程内无法处理业务消息。为了解决该问题,在网络空闲时采用心跳机制来检测链路的互通性,一旦发生网络故障,立即关闭链路,主动重连。以此来保证 连接可用。
设计思路
(1)当网络处于空闲状态持续时间达到T(连续周期T 没有读写消息)时,客户端主动发起Ping 心跳消息给服务端
(2)如果在下一个周期T到来时客户端没有收到对方发送的Pong心跳应答消息或者读取到服务端发送的其他业务消息,则心跳失败计数器加1.
(3)每当客户端接收到服务端的业务消息或者Pong应答消息,将心跳失败计数器清0。当连续N次没有接收到服务端的Pong消息或者业务消息,则关闭链路。间隔 INTERVAL时间发起重连操作。
(4)服务端网络空闲状态持续时间达到T后,服务端将心跳失败计数器加1,只要接收到客户端发送的Ping消息或者其他业务消息,计数器清零。
(5)服务端濑尿虾N次没有接收到客户端的Ping消息或者其他业务消息,则关闭链路,释放资源,等待客户端重连。
通过Ping-Pong双向心跳机制,可用保证无论通信哪一方出现网络故障,都能被及时地检测出来,为了防止由于短时间内繁忙没有及时返回应答造成的误判,设计 只有连续N次心跳检测都失败才认定链路已经损害,需要关闭链路并重建链路。
当读或者写心跳消息发送I/O异常时,说明链路已经中断,此时需要立即关闭链路,如果时客户端,需要重新发起连接,如果时服务端,需要清空缓冲的半包信息,等待客户端重连。
重连机制
如果链路中断,等待INTERVAL周期时间后,由客户端发起重连操作,如果重连失败,间隔周期INTERVAL后发起重连,直到重连成功。
为了保证服务端能够有充足的时间释放句柄资源,在首次断连时客户端需要 等待INTERVAL 时间后再发起重连,而不是失败后就立即重连。
为了保证句柄资源能够及时释放,无论什么场景下的重连失败,客户端都必须保证自身的资源被及时释放,包括不仅限于SocketChannel,Sokcet。 否则大量连接失败 可能导致内存泄漏。
重连失败后,需要大于异常堆栈信息,方便定位跟踪问题。
重复登录保护
当客户端握手成功之后,在链路处于正常状态下,不允许客户端重复登录,以防止客户端在异常状态下反复重连导致句柄资源被耗尽。
服务端接收到客户端的握手请求消息之后,首先对IP地址进行合法性校验,如果校验成功,在缓存的地址表中查看客户端是否已经登录,如果已经登录,则拒绝重复登录,返回错误码-1.同时关闭TCP链路,并在服务端的日志打印握手失败的原因。
客户端接收到握手失败的应答消息之后,关闭客户端的TCP连接,等待INTERVAL时间之后,再次发起TCP连接,直到认证成功。
为了防止由服务端和客户端对链路状态理解不一致导致的客户端无法握手成功的问题,当服务端连续N次心跳超时之后需要主动关闭链路,清空该客户端的地址缓存信息,以保证后续该客户端可用重连成功,防止被重复登录保护机制拒绝掉。
消息缓存重发
无论客户端还是服务端,当发送链路中断之后,在链路恢复之前,缓存在消息队列中待发送的消息不能丢失,等链路恢复之后,重新发送这些消息,保证链路中断期间消息不丢失。
考虑到内存溢出的风险,可用设置消息缓存队列的上限,当达到上限后,拒绝继续向该队列添加新的消息。
安全设计
为了保证整个集群环境的安全,内部长连接采用基于IP地址的安全认证机制,服务端对握手请求消息的IP地址进行合法性校验,如果在白名单之内,则校验通过,否则,拒绝对方连接。
如果将该私有协议放入到公网使用,需要采用更加严格的安全认知机制,例如基于密码和AES加密的用户+密码认证机制,也可以采用SSL/TSL安全传输。
可扩展性设计
该私有协议需要具备一定的扩展能力,业务可以在消息头中自定义业务域字段,通过Netty 消息头中的可选附件attachment字段,业务可以方便地进行自定义扩展。
私有协议栈开发
maven 依赖
<dependency>
<groupId>org.jboss.marshalling</groupId>
<artifactId>jboss-marshalling</artifactId>
<version>1.4.11.Final</version>
</dependency>
<dependency>
<groupId>org.jboss.marshalling</groupId>
<artifactId>jboss-marshalling-serial</artifactId>
<version>1.4.11.Final</version>
</dependency>
<dependency>
<groupId>org.jibx</groupId>
<artifactId>jibx-run</artifactId>
<version>1.4.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.jibx/jibx-extras -->
<dependency>
<groupId>org.jibx</groupId>
<artifactId>jibx-extras</artifactId>
<version>1.4.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.jibx/jibx-bind -->
<dependency>
<groupId>org.jibx</groupId>
<artifactId>jibx-bind</artifactId>
<version>1.4.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.jibx/jibx-tools -->
<dependency>
<groupId>org.jibx</groupId>
<artifactId>jibx-tools</artifactId>
<version>1.4.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.jibx/jibx-schema -->
<dependency>
<groupId>org.jibx</groupId>
<artifactId>jibx-schema</artifactId>
<version>1.4.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.bcel/bcel -->
<dependency>
<groupId>org.apache.bcel</groupId>
<artifactId>bcel</artifactId>
<version>6.7.0</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId> <!-- Use 'netty5-all' for 5.0-->
<version>5.0.0.Alpha1</version>
<scope>compile</scope>
</dependency>
数据结构定义
NettyMessage 类
public class NettyMessage {
//消息头
private Header header;
//消息体
private Object body;
public Header getHeader() {
return header;
}
public void setHeader(Header header) {
this.header = header;
}
public Object getBody() {
return body;
}
public void setBody(Object body) {
this.body = body;
}
@Override
public String toString() {
return "NettyMessage{" +
"header=" + header +
", body=" + body +
'}';
}
}
消息头 Header
public class Header {
private int crcCode = 0xabef0101;
//消息长度
private int length;
//会话ID
private long sessionID;
//消息类型
private byte type;
//消息优先级
private byte priority;
//附件
private Map<String, Object> attachment = new HashMap<String, Object>();
public int getCrcCode() {
return crcCode;
}
public void setCrcCode(int crcCode) {
this.crcCode = crcCode;
}
public int getLength() {
return length;
}
public void setLength(int length) {
this.length = length;
}
public long getSessionID() {
return sessionID;
}
public void setSessionID(long sessionID) {
this.sessionID = sessionID;
}
public byte getType() {
return type;
}
public void setType(byte type) {
this.type = type;
}
public byte getPriority() {
return priority;
}
public void setPriority(byte priority) {
this.priority = priority;
}
public Map<String, Object> getAttachment() {
return attachment;
}
public void setAttachment(Map<String, Object> attachment) {
this.attachment = attachment;
}
@Override
public String toString() {
return "Header{" +
"crcCode=" + crcCode +
", length=" + length +
", sessionID=" + sessionID +
", type=" + type +
", priority=" + priority +
", attachment=" + attachment +
'}';
}
}
消息类型 MessageType
public enum MessageType {
//业务请求消息
SERVICE_REQ((byte)0),
//业务响应消息
SERVICE_RESP((byte)1),
//业务ONE WAY 消息
ONE_WAY((byte)2),
//握手请求消息
LOGIN_REQ((byte)3),
//握手响应消息
LOGIN_RESP((byte)4),
//心跳请求消息
HEARTBEAT_REQ((byte)5),
//心跳响应消息
HEARTBEAT_RESP((byte)6);
private byte value;
MessageType (byte value){
this.value=value;
}
public byte value(){
return value;
}
}
消息工厂类 NettyMessageFactory
抽取公共的方法
public class NettyMessageFactory {
public static NettyMessage buildNettyMessage(byte value){
NettyMessage message=new NettyMessage();
Header header=new Header();
header.setType(value);
message.setHeader(header);
return message;
}
}
消息编解码
分别定义了NettyMessageDecoder和NettyMessageEncoder。用于NettyMessage消息的编解码。
Netty消息编码 NettyMessageEncoder
public final class NettyMessageEncoder extends MessageToByteEncoder<NettyMessage> {
MarshallingEncoder marshallingEncoder;
public NettyMessageEncoder() throws IOException {
System.out.println("NettyMessageEncoder 构造");
this.marshallingEncoder=new MarshallingEncoder();
}
/**
* 编码
* @param context
* @param nettyMessage
* @param sendBuf
* @throws Exception
*/
@Override
protected void encode(ChannelHandlerContext context,
NettyMessage nettyMessage, ByteBuf sendBuf) throws Exception {
System.out.println("NettyMessageEncoder.encode");
if (nettyMessage ==null || nettyMessage.getHeader()==null){
throw new Exception("The encode message is null");
}
// ByteBuf sendBuf= Unpooled.buffer();
sendBuf.writeInt(nettyMessage.getHeader().getCrcCode());//校验码
sendBuf.writeInt(nettyMessage.getHeader().getLength());//总长度
sendBuf.writeLong(nettyMessage.getHeader().getSessionID());//回话id
sendBuf.writeByte(nettyMessage.getHeader().getType());//消息类型
sendBuf.writeByte(nettyMessage.getHeader().getPriority());//优先级
//附件长度编码
//编码规则为:如果attachment的长度为0,表示没有可选附件,则将长度 编码设置为0
//如果attachment长度大于0,则需要编码,规则:
//首先对附件的个数进行编码
sendBuf.writeInt(nettyMessage.getHeader().getAttachment().size());
String key=null;
byte[] keyArray=null;
Object value= null;
//然后循环对每个附件进行编码
for (Map.Entry<String,Object> param:nettyMessage.getHeader().getAttachment().entrySet()){
key=param.getKey();
keyArray=key.getBytes(StandardCharsets.UTF_8);
sendBuf.writeInt(keyArray.length);
sendBuf.writeBytes(keyArray);
//采用JBoss 的Marshaling 进行编码
marshallingEncoder.encode(value,sendBuf);
}
key =null;
keyArray=null;
value=null;
if (nettyMessage.getBody()!=null){
//body 也是采用JBoss 的Marshalling进行编码
marshallingEncoder.encode(nettyMessage.getBody(),sendBuf);
}else {
//如果没有数据,则进行补位,方便后续的decoder操作
sendBuf.writeInt(0);
}
//最后我们要获取整个数据包的总长度,也就是header + body
//这里需要减掉 8个字节,是因为 要把CRC 和长度本身占的减掉
//总长度是在header 协议的第二个标记字段中
//第一个参数是长度属性的索引位置
sendBuf.setInt(4,sendBuf.readableBytes()-8);
}
}
JBoss Marshalling 编码类 MarshallingEncoder
public class MarshallingEncoder {
//空白占位:用于预留设置 body 的数据长度包长度
private static final byte[] LENGTH_PLACEHOLDER=new byte[4];
Marshaller marshaller;
public MarshallingEncoder () throws IOException {
marshaller=MarshallingCodeCFactory.buildMarshalling();
}
public void encode(Object msg, ByteBuf out) throws IOException {
try {
//必须要知道当前的数据位置在哪,起始位置
int lengthPos=out.writerIndex();
//占位写操作,先写一个4个字节的空的内容,记录起始数据位置,用于设置内容长度
out.writeBytes(LENGTH_PLACEHOLDER);
ChannelBufferByteOutput output=new ChannelBufferByteOutput(out);
marshaller.start(output);
marshaller.writeObject(msg);
marshaller.finish();
//总长度(结束位置)-初始化长度(起始位置)-预留的长度 = body 数据长度
out.setInt(lengthPos,out.writerIndex()-lengthPos-4);
} catch (IOException e) {
e.printStackTrace();
}finally {
marshaller.close();
}
}
}
MarshallingEncoder 依赖类ChannelBufferByteOutput
public class ChannelBufferByteOutput implements ByteOutput {
private final ByteBuf byteBuf;
public ChannelBufferByteOutput(ByteBuf byteBuf){
this.byteBuf=byteBuf;
}
@Override
public void write(int i) throws IOException {
byteBuf.writeByte(i);
}
@Override
public void write(byte[] bytes) throws IOException {
byteBuf.writeBytes(bytes);
}
@Override
public void write(byte[] bytes, int srcIndex, int length) throws IOException {
byteBuf.writeBytes(bytes,srcIndex,length);
}
@Override
public void close() throws IOException {
}
@Override
public void flush() throws IOException {
}
public ByteBuf getByteBuf() {
return byteBuf;
}
}
消息解码类 NettyMessageDecoder
public class NettyMessageDecoder extends LengthFieldBasedFrameDecoder {
MarshallingDecoderPrivate marshallingDecoder;
/**
*
* @param maxFrameLength 代表最大的序列化长度
* @param lengthFieldoffset 长度属性的偏移量,简单来说就是message中,
* 总长度的起始位置(Header中的Length属性的起始位置),本例中是 4
* @param lengthFieLength 代表长度属性的长度,整个属性占多长 4
* @throws IOException
*/
public NettyMessageDecoder(int maxFrameLength,int lengthFieldoffset,int lengthFieLength) throws IOException {
super(maxFrameLength,lengthFieldoffset,lengthFieLength);
System.out.println("NettyMessageDecoder 构造方法");
marshallingDecoder=new MarshallingDecoderPrivate();
}
protected Object decode(ChannelHandlerContext context, ByteBuf in) throws IOException {
System.out.println("NettyMessageDecoder decode.....");
ByteBuf frame= null;
try {
frame = (ByteBuf) super.decode(context,in);
} catch (Exception e) {
e.printStackTrace();
}
if (frame==null){
return null;
}
NettyMessage message=new NettyMessage();
Header header=new Header();
//代码写错了,将frame 写成了in .排查问题 2小时。
//此时 in.readInt 报错。校验的时候 报数组下标越界.
header.setCrcCode(frame.readInt());
header.setLength(frame.readInt());
header.setSessionID(frame.readLong());
header.setType(frame.readByte());
header.setPriority(frame.readByte());
int size=frame.readInt();
//附件个数大于0,则需要解码操作
if (size>0){
Map<String,Object> attachment=new HashMap<>(size);
int keySize=0;
byte[]keyArray=null;
String key =null;
for (int i=0;i<size;i++){
keySize=in.readInt();
keyArray=new byte[keySize];
in.readBytes(keyArray);
key=new String(keyArray,"UTF-8");
attachment.put(key,marshallingDecoder.decode(frame));
}
keyArray=null;
key=null;
header.setAttachment(attachment);
}
if (in.readableBytes()>4){
message.setBody(marshallingDecoder.decode(in));
}
//对于ByteBuf来说,对一个数据,就少一个,所以读完header,生效就是body了。
message.setHeader(header);
return message;
}
}
JBoss Marshalling 解码类 MarshallingDecoderPrivate
public class MarshallingDecoderPrivate {
private final Unmarshaller unmarshaller;
public MarshallingDecoderPrivate() throws IOException {
unmarshaller = MarshallingCodeCFactory.buildMUnMarshaller();
}
public Object decode(ByteBuf in) throws IOException {
//首先读取4个长度
int objectSize = in.readInt();
//获取实际body 的缓冲内容
ByteBuf buf = in.slice(in.readerIndex(), objectSize);
//转换
ByteInput input = new ChannelBufferByteInput(buf);
try {
//读取
unmarshaller.start(input);
Object obj = unmarshaller.readObject();
unmarshaller.finish();
//读取完成后,更新当前读取起始位置
in.readerIndex(in.readerIndex() + objectSize);
return obj;
} catch (IOException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
} finally {
unmarshaller.close();
}
return null;
}
}
MarshallingDecoderPrivate 依赖类 ChannelBufferByteInput
public class ChannelBufferByteInput implements ByteInput {
private final ByteBuf byteBuf;
public ChannelBufferByteInput(ByteBuf byteBuf) {
this.byteBuf = byteBuf;
}
@Override
public int read() throws IOException {
if (byteBuf.isReadable()) {
return byteBuf.readByte() & 0xff;
}
return -1;
}
@Override
public int read(byte[] bytes) throws IOException {
return read(bytes,0,bytes.length);
}
@Override
public int read(byte[] bytes, int dstIndex, int length) throws IOException {
int available=available();
if (available==0){
return -1;
}
length=Math.min(available,length);
byteBuf.readBytes(bytes,dstIndex,length);
return length;
}
@Override
public int available() throws IOException {
return byteBuf.readableBytes();
}
@Override
public long skip(long bytes) throws IOException {
int readAble=byteBuf.readableBytes();
if (readAble<bytes){
bytes=readAble;
}
byteBuf.readerIndex((int)(byteBuf.readerIndex()+bytes));
return bytes;
}
@Override
public void close() throws IOException {
}
}
握手和安全认证
客户端握手业务处理类 LoginAuthRespHandler
public class LoginAuthReqHandler extends ChannelHandlerAdapter {
public void channelActive(ChannelHandlerContext context){
System.out.println("LoginAuthReqHandler .channelActive");
//握手成功后,客户端给服务端发送消息,类型为3
NettyMessage message=NettyMessageFactory.buildNettyMessage(MessageType.LOGIN_REQ.value());
System.out.println("握手成功后,客户端给服务端发送心跳消息 --->"+message);
context.writeAndFlush(message);
}
public void channelRead(ChannelHandlerContext context,Object msg){
System.out.println("LoginAuthReqHandler.channelRead");
NettyMessage message=(NettyMessage)msg;
//如果是握手应答消息,需要判断是否认证成功
if (message.getHeader()!=null&&message.getHeader().getType()== MessageType.LOGIN_REQ.value()){
byte loginResult=(byte) message.getBody();
//非0 表示认证失败
if (loginResult!= (byte) 0){
//握手失败,关闭连接,关闭链路。等会发现连接
context.close();
}else {
System.out.println("Login is ok : "+message);
context.fireChannelRead(msg);
}
}
else {
//如果不是握手应答消息,则直接传给后面的ChannelHandler进行错了
context.fireChannelRead(msg);
}
}
public void exceptionCaught(ChannelHandlerContext context,Throwable cause){
context.fireExceptionCaught(cause);
}
}
服务端握手业务处理类 LoginAuthRespHandler
public class LoginAuthRespHandler extends ChannelHandlerAdapter {
private Map<String,Boolean> nodeCheck=new ConcurrentHashMap<>();
//定义白名单
private String[] whiteList={"127.0.0.1","172.21.27.85"};
public void channelRead(ChannelHandlerContext context,Object msg){
System.out.println("LoginAuthRespHandler.channelRead");
NettyMessage message =(NettyMessage) msg;
//如果是握手请求消息,处理
if (message.getHeader()!=null&&message.getHeader().getType()== MessageType.LOGIN_REQ.value()){
String nodeIndex=context.channel().remoteAddress().toString();
NettyMessage loginResp=null;
//重复登录,拒绝,防止由于客户端重复登录导致的句柄泄露(即这个对象 无法被回收导致内存无效使用)
if (nodeCheck.containsKey(nodeIndex)){
loginResp=buildResponse((byte)-1);
}else {
//校验地址是否在白名单中,如果在,则通过握手成功
InetSocketAddress address=(InetSocketAddress) context.channel().remoteAddress();
String ip=address.getAddress().getHostAddress();
boolean isOk=false;
for (String whiteIp:whiteList){
if (whiteIp.equals(ip)){
isOk=true;
break;
}
}
//握手成功后,需要返回握手应答消息
loginResp= isOk?buildResponse((byte)0):buildResponse((byte)-1);
if (isOk){
nodeCheck.put(nodeIndex,true);
}
}
System.out.println("The login response is : "+loginResp+" body ["+loginResp.getBody()+" ]");
context.writeAndFlush(loginResp);
}else {
context.fireChannelRead(msg);
}
}
private NettyMessage buildResponse(byte result){
NettyMessage message = NettyMessageFactory.buildNettyMessage(MessageType.LOGIN_RESP.value());
message.setBody(result);
return message;
}
public void exceptionCaught(ChannelHandlerContext context,Throwable cause){
//发生异常的时候,删除缓存中 的地址信息,保证后续客户端能重连成功
nodeCheck.remove(context.channel().remoteAddress().toString());
context.close();
context.fireExceptionCaught(cause);
}
}
心跳检测逻辑
客户端心跳业务处理类 HeartBeatReqHandler
public class HeartBeatReqHandler extends ChannelHandlerAdapter {
private volatile ScheduledFuture<?> heartBeat;
public void channelRead(ChannelHandlerContext context,Object msg){
System.out.println("HeartBeatReqHandler.channelRead");
NettyMessage message=(NettyMessage) msg;
//握手成功,主动发送心跳消息
if (message.getHeader()!=null&& message.getHeader().getType()== MessageType.LOGIN_RESP.value()){
System.out.println("客户端发送消息给服务端.... ");
//如果握手成功,循环给服务端发送心跳
heartBeat =context.executor().scheduleAtFixedRate(
new HeartBeatTask(context),0,5000, TimeUnit.MILLISECONDS);
//如果握手消息,则 打印服务端的消息
}else if (message.getHeader()!=null&& MessageType.HEARTBEAT_RESP.value()==message.getHeader().getType()){
System.out.println("Client receive server heart beat message : --->"+message);
}else {
context.fireChannelRead(msg);
}
}
private class HeartBeatTask implements Runnable{
private final ChannelHandlerContext context;
public HeartBeatTask(ChannelHandlerContext context){
this.context=context;
}
@Override
public void run() {
//构造NettyMessage 实体,然后通过ChannelHandlerContext 发送心跳
NettyMessage beatMessage= NettyMessageFactory.buildNettyMessage(MessageType.HEARTBEAT_REQ.value());
System.out.println("Client send start beat message to server : -->"+beatMessage);
context.writeAndFlush(beatMessage);
}
}
public void exceptionCaught(ChannelHandlerContext context,Throwable cause){
if (heartBeat!=null){
heartBeat.cancel(true);
heartBeat=null;
}
context.fireExceptionCaught(cause);
}
}
服务端心跳业务处理类 HeartBeatRespHandler
public class HeartBeatRespHandler extends ChannelHandlerAdapter {
public void channelActive(ChannelHandlerContext context){
System.out.println("channel is connected");
}
public void channelRead(ChannelHandlerContext context,Object msg){
System.out.println("HeartBeatRespHandler.channelRead");
NettyMessage message =(NettyMessage) msg;
//返回心跳应答消息,构造NettyMessage 直接返回即可
//然后打印 接收和发送的心跳消息
if (message.getHeader()!=null&&message.getHeader().getType()== MessageType.HEARTBEAT_REQ.value()){
System.out.println("receive client heart beat message : -->"+message);
NettyMessage heartBeat= NettyMessageFactory.buildNettyMessage(MessageType.HEARTBEAT_RESP.value());
System.out.println("Send heart beat response message to client : --> "+heartBeat);
context.writeAndFlush(heartBeat);
}else {
context.fireChannelRead(msg);
}
}
}
启动类
客户端启动类 NettyClient
public class NettyClient {
private ScheduledExecutorService executorService= Executors.newScheduledThreadPool(1);
EventLoopGroup group=new NioEventLoopGroup();
public void connect(String host,int port){
try {
//配置客户端NIO线程组
Bootstrap bootstrap=new Bootstrap();
bootstrap.group(group)
.option(ChannelOption.TCP_NODELAY,true)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//NettyMessageDecoder 用于消息解码,为了防止由于单条消息过大导致的内存
//溢出或者畸形码流导致解码错位引起内存分配失败,对单条消息的最大长度做了上限限制
socketChannel.pipeline()
.addLast(new NettyMessageDecoder(1024*1024,4,4));
//NettyMessageEncoder 用于协议消息的自动编码
socketChannel.pipeline()
.addLast(new NettyMessageEncoder());
//ReadTimeoutHandler用于处理超时
socketChannel.pipeline()
.addLast("readTimeoutHandler",new ReadTimeoutHandler(50));
//LoginAuthReqHandler 握手处理
socketChannel.pipeline()
.addLast("LoginAuthHandler",new LoginAuthReqHandler());
//HeartBeatReqHandler 心跳处理
socketChannel.pipeline()
.addLast("HeartBeatHandler",new HeartBeatReqHandler());
}
});
ChannelFuture future= bootstrap.connect(host,port).sync();
System.out.println("The client is connect to server with port : "+port);
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
System.out.println("链接失败....."+e.getMessage());
}finally {
//所有资源释放完成之后,清空资源,再次发起重连操作
executorService.execute(new Runnable() {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(1);
//重连操作
System.out.println("发起重连操作");
connect("127.0.0.1",8080);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
public static void main(String[] args) {
new NettyClient().connect("127.0.0.1",8080);
}
}
服务端启动类 NettyServer
public class NettyServer {
public void bind(){
//配置服务端的NIO线程组
EventLoopGroup bossGroup=new NioEventLoopGroup();
EventLoopGroup workGroup=new NioEventLoopGroup();
try {
ServerBootstrap bootstrap=new ServerBootstrap();
bootstrap.group(bossGroup,workGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline()
.addLast(new NettyMessageDecoder(1024*1024,4,4));
socketChannel.pipeline().addLast(new NettyMessageEncoder());
socketChannel.pipeline()
.addLast("readTimeoutHandler",new ReadTimeoutHandler(50));
socketChannel.pipeline()
.addLast(new LoginAuthRespHandler());
socketChannel.pipeline()
.addLast("heartBeatHandler",new HeartBeatRespHandler());
/* socketChannel.pipeline()
.addLast(new ServerHandler());*/
}
});
ChannelFuture future=bootstrap.bind("127.0.0.1",8080).sync();
System.out.println("the server is started ...");
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
public static void main(String[] args) {
new NettyServer().bind();
}
}
测试
正常场景
依次启动服务端和客户端。
服务端打印截图:
客户端打印截图
客户端和服务端都能正常打印数据,发送心跳请求和握手请求。没有发送丢包和粘包现象。
模拟服务端宕机
在服务端和客户端正常连接发送消息的过程中。重启服务端。
观察如下功能是否正常
(1)客户端能否正常发起重连
(2)重连成功后,不再发起重连
(3)短链期间,心跳定时器停止工作,不再发送心跳请求消息
(4)服务端重启成功后,允许客户端重新登录
(5)服务端重启成功过之后,客户端端能重连并握手成功
(6)重连成功之后,双方的心跳能够正常互发
(7)性能指标,重连期间,客户端资源得到正确回收,不会导致句柄资源泄漏
服务端关闭后截图
机器内存使用情况截图:
截图可以看出,内存没有随着发起 重连请求的次数增加而增加,说明没有发送句柄泄漏。
但是本机模拟的数据较小,可能不太准确,小伙伴们可以多弄几个客户端测试下。
服务端启动后截图
通过截图能看出,上述功能均正常。
模拟客户端宕机
客户端宕机后,服务端需要能够清除缓存信息。允许客户端重新登录。
关闭客户端之后的服务端截图
重启客户端之后的服务端截图
常见bug
我本人遇到的问题最多 的还是编码解码那块的代码。错了之后,系统把异常吃掉了,只能是去debugger跟踪。
很是麻烦。在此讲下自己遇到的一些问题。
问题现象 服务端接收不到数据
问题简单说明如截图。问题的现象是服务端接收不到数据。捋清楚的思路,想了下 应该是服务端解码出现了问题。然后就一直debugg 看了,还好找到了。
NettyMessageDecoder
问题现象 客户端无法发送数据
也是分析了很久,发现是客户端无法发送数据。大概率是发送数据的时候,编码出现了问题。
NettyMessageEncoder
错误的版本 错误的版本中,NettyMessageEncoder 是继承的MessageToMessageEncoder。
还是重写的encode方法。但是 入参不一样。错误的版本中,有个Listout 参数。需要我们 把编码后的ByteBuf 对象加入到 out 属性中继续传递下。但是没有加一行代码。也是找了半天问题。关键是要定位是什么问题,
比如客户端无法发出数据还是客户端无法接收数据,还是服务端无法发出数据,服务端无法接收数据。
我刚好遇到了客户端无法发出数据和服务端无法接收数据 两个问题。
public final class NettyMessageEncoder extends MessageToMessageEncoder<NettyMessage> {
MarshallingEncoder marshallingEncoder;
public NettyMessageEncoder() throws IOException {
System.out.println("NettyMessageEncoder 构造");
this.marshallingEncoder=new MarshallingEncoder();
}
/**
* 编码
* @param context
* @param nettyMessage
* @param list
* @throws Exception
*/
@Override
protected void encode(ChannelHandlerContext context,
NettyMessage nettyMessage, List<Object> list) throws Exception {
//System.out.println("NettyMessageEncoder.encode");
if (nettyMessage ==null || nettyMessage.getHeader()==null){
throw new Exception("The encode message is null");
}
ByteBuf sendBuf= Unpooled.buffer();
sendBuf.writeInt(nettyMessage.getHeader().getCrcCode());//校验码
sendBuf.writeInt(nettyMessage.getHeader().getLength());//总长度
sendBuf.writeLong(nettyMessage.getHeader().getSessionID());//回话id
sendBuf.writeByte(nettyMessage.getHeader().getType());//消息类型
sendBuf.writeByte(nettyMessage.getHeader().getPriority());//优先级
//附件长度编码
//编码规则为:如果attachment的长度为0,表示没有可选附件,则将长度 编码设置为0
//如果attachment长度大于0,则需要编码,规则:
//首先对附件的个数进行编码
sendBuf.writeInt(nettyMessage.getHeader().getAttachment().size());
String key=null;
byte[] keyArray=null;
Object value= null;
//然后循环对每个附件进行编码
for (Map.Entry<String,Object> param:nettyMessage.getHeader().getAttachment().entrySet()){
key=param.getKey();
keyArray=key.getBytes(StandardCharsets.UTF_8);
sendBuf.writeInt(keyArray.length);
sendBuf.writeBytes(keyArray);
//采用JBoss 的Marshaling 进行编码
marshallingEncoder.encode(value,sendBuf);
}
key =null;
keyArray=null;
value=null;
if (nettyMessage.getBody()!=null){
//body 也是采用JBoss 的Marshalling进行编码
marshallingEncoder.encode(nettyMessage.getBody(),sendBuf);
}else {
//如果没有数据,则进行补位,方便后续的decoder操作
sendBuf.writeInt(0);
}
//最后我们要获取整个数据包的总长度,也就是header + body
//这里需要减掉 8个字节,是因为 要把CRC 和长度本身占的减掉
//总长度是在header 协议的第二个标记字段中
sendBuf.setInt(4,sendBuf.readableBytes()-8);
}
}
bug分析指导
当出现了问题较难排查时,可以将客户端和服务端分开。用自己的客户端 连接 正确的服务端,来判断客户端是否有问题。用自己的服务端来连接正确的客户端。判断服务端是否有问题。
当然了,本博客的代码基本流程时正确的,大家可以模拟去验证下自己写的代码。
总结
本章的内容比较多,从内容介绍,协议栈的设计,各种场景的考虑以及实现细节的分析到代码的落地。都有详细的说明。
如果我们需要自己在工作中开发协议栈,这个博客是个很好的例子供我们学习然后逐步深入。
参考内容:
<<netty权威指南>> 需要可以评论区留下邮箱
博客:https://blog.csdn.net/qq_42651904/article/details/106484685