1. Socket通信实战
什么是Socket
在计算机网络中,Socket
被描述为一个抽象的关键字,由IP+port
组成,它描述了TCP
连接的一个对等端
如果要通过互联网进行通信,则至少需要一对套接字
- 运行于服务器的
Server Socket
- 运行于客户端的
Client Server
这个图描述了TCP
三次握手->传输数据->TCP
四次挥手的过程
client
实际上并不需要指定端口,因为有默认端口的设定,它会在TCP
的首部中进行描述,而它需要指定的是,你这个消息要发往哪个目的主机和哪个端口,这样对方的服务器才能够通过此端口接收数据
server
需要选择监听哪个端口,才能正确接收数据,由于发过来的TCP
报文段中具有对方的源地址和源端口信息,因此在回送的时候可以直接根据头部信息获取回送的路径
BIO:简单的EchoServer实现
BIO的实现思路是使用阻塞式的IO输入,只有当双方都完成IO后,连接才会断开,阻塞的时机是在accept()
中,只有当接收到数据后,线程才会继续向下执行代码
ServerSocket serverSocket = new ServerSocket(port);
//2. 通过accept监听客户端发送过来的请求
Socket socket;
while ((socket = serverSocket.accept())!=null){
//3. 通过输入流读取客户端发送过来的信息
log.info("客户端已经连接");
InputStream inputStream = socket.getInputStream();
OutputStream outputStream = socket.getOutputStream();
ObjectInputStream objectInputStream = new ObjectInputStream(inputStream);
ObjectOutputStream objectOutputStream = new ObjectOutputStream(outputStream);
Object message = objectInputStream.readObject();
log.info("接收到的信息是{}",message);
//4. 通过输出流回送信息
objectOutputStream.writeObject("end");
objectOutputStream.flush();
如果要进行修正,那么可以引入一个线程池
public static final Logger log = LoggerFactory.getLogger(EchoServerTest.class);
/**
* 服务端开始监听
* @param port
*/
public static void start(int port){
//1. 创建ServerSocket,并且监听一个端口
try {
ServerSocket serverSocket = new ServerSocket(port);
//2. 通过accept监听客户端发送过来的请求
Socket socket;
while ((socket = serverSocket.accept())!=null){
log.info("处理的线程ID是{}",Thread.currentThread().getId());
//3. 通过输入流读取客户端发送过来的信息
log.info("客户端已经连接");
InputStream inputStream = socket.getInputStream();
OutputStream outputStream = socket.getOutputStream();
ObjectInputStream objectInputStream = new ObjectInputStream(inputStream);
ObjectOutputStream objectOutputStream = new ObjectOutputStream(outputStream);
Object message = objectInputStream.readObject();
log.info("接收到的信息是{}",message);
//4. 通过输出流回送信息
objectOutputStream.writeObject("end");
objectOutputStream.flush();
}
} catch (IOException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
//1.创建线程池
ThreadFactory threadFactory = Executors.defaultThreadFactory();
ExecutorService threadPool = new ThreadPoolExecutor(10,100,1,
TimeUnit.MINUTES,new ArrayBlockingQueue<>(100),threadFactory);
threadPool.execute(()->{
log.info("创建新的线程,线程ID:{}",Thread.currentThread().getId());
start(80);
});
}
BIO的最大痛点是:处理线程和请求数是1:1的出现的,其关键在于监听请求和处理请求都是同一个线程完成的
尽管引入了线程池,但是当线程数量过多的时候,也会导致程序的崩溃,因此我们应该尽可能的减少线程的使用,一个办法是将监听请求的线程和处理请求的线程给解耦,有一个线程专门负责请求的接收,然后将此请求分发到下面的子线程进行处理
threadPool.submit(()->{
log.info("处理的线程ID是{}",Thread.currentThread().getId());
//3. 通过输入流读取客户端发送过来的信息
log.info("客户端已经连接");
InputStream inputStream = null;
try {
inputStream = socket.getInputStream();
OutputStream outputStream = socket.getOutputStream();
ObjectInputStream objectInputStream = new ObjectInputStream(inputStream);
ObjectOutputStream objectOutputStream = new ObjectOutputStream(outputStream);
Object message = objectInputStream.readObject();
log.info("接收到的信息是{}",message);
//4. 通过输出流回送信息
objectOutputStream.writeObject("end");
objectOutputStream.flush();
} catch (IOException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
});
经过我自己的测试,当线程数达到十万级别的时候,CPU飙到了百分之90,而这仅仅只是传送一个end
而已,而且还有很多请求被丢掉了,性能实在太差
于是引入了NIO
2. Netty入门实战
Netty
成功找到了一种方式,使得在不妥协可维护性和性能情况下实现易于开发,性能,稳定性和灵活性的方式
首先定义RPC
通信对象
@Data
public class RpcRequest {
private String interfaceName;
private String methodName;
}
@Data
public class RpcResponse {
private String message;
}
初始化客户端程序
客户端的职责:用一个向服务端发送消息的sendMessage()
方法,通过此方法可以将消息也就是RpcRequest
对象发送到服务端,并且可以同步获取到服务端返回的结构,也就是RpcResponse
我们来看看初始化的时候,都做了一些什么:
private static final Bootstrap b;
//初始化相关资源
static {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
b = new Bootstrap();
//初始化序列化器
KryoSerializer kryoSerializer = new KryoSerializer();
b.group(eventLoopGroup)
.channel(NioSocketChannel.class).handler(new LoggingHandler(LogLevel.INFO))
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,5000)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//自定义序列化编码器
//RpcResponse->ByteBuf
socketChannel.pipeline().addLast();
//ByteBuf->RpcRequest
socketChannel.pipeline().addLast();
socketChannel.pipeline().addLast();
}
});
}
有很多不认识的东西,我们先来认识一下这些东西
Bootstrap
:它是启动引导类,它是客户端/服务端启动的时候的辅助类,它的作用是规定了连接的端口以及处理I/O
,接收I/O
请求的规则,
EventLoopGroup
:它相当于一个线程组,EventLoop
相当于一个处理线程,那么group
就相当于一个线程池,Netty用来处理请求的线程就来自于这个参数,就相当于,它是一个施工队,当需要干活的人从里面挑工人出去干活
channel
:是代表连接的类型,比如说是TCP
连接?还是UDP
等
handler
:消息处理器,代表当请求到来的时候如何处理请求,它就类似于工程师,专门设计方案的,设计出来后,交给施工队进行处理,施工队拿着图纸开始施工
初始化通道处理器的过程之后再讲
,这里先看编码器
public class NettyKryoEncoder extends MessageToByteEncoder<Object> {
private final Serializer serializer;
private final Class<?> genericClass;
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception {
if(genericClass.isInstance(o)){
//1.将对象转化为Byte
byte[] serialize = serializer.serialize(o);
//2. 读取消息的长度
int dataLength = serialize.length;
//3. 写入消息长度
byteBuf.writeInt(dataLength);
//4. 写入消息
byteBuf.writeBytes(serialize);
}
}
}
public class NettyKryoDecoder extends ByteToMessageDecoder {
private final Serializer serializer;
private final Class<?> genericClass;
private static final int BODY_LENGTH = 4;
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
if(byteBuf == null || byteBuf.readableBytes()<BODY_LENGTH){
return;
}
//1.读取消息的长度
byteBuf.markReaderIndex();
int dataLength = byteBuf.readInt();
//2.参数校验
if(dataLength <0 || byteBuf.readableBytes()<0){
return ;
}
//3.如果可读的字节数小于消息长度的话也是不合理的
if(dataLength > byteBuf.readableBytes()){
//那么这时候只能放弃解码,重置读指针,还原本次的修改,等待下一次的I/O
byteBuf.resetReaderIndex();
return;
}
byte[] body = new byte[dataLength];
byteBuf.readBytes(body);
//反序列化
Object target = serializer.deserialize(body, genericClass);
list.add(target);
log.info("反序列化完成");
}
}
技术点:这里解决了粘包和半包的问题
粘包的话会导致消息的长度<字节数组的长度,这里的话用的方案是提供读取长度的方案。
那么为什么Handler
上要注册这几个东西呢?
我们先来回顾一下,在连接上,我们到底在做什么?
首先对于客户端而言,它的工作是将自己产生的RpcRequest
发送到服务端对吧?
那么在这个过程中,通道上需要有一个方法支持,它支持将这个对象编码为二进制字符串,因此需要注册
Decoder、Encoder
方法
第二个,客户端还需要解码来自于服务端的消息
因此通道上需要注册一个将二进制字符串转换为对象的方法
第三个,客户端获取到了一个响应,那这个响应要放到哪里去?
一个通道内的数据通常应该是共享的,因此设计了
通道域
,将通道中的信息放到一个容器中,对于调用方,只要访问这个容器就能取到数据了
我们可以看到,这上面的三个环节实际上是环环相扣的,也就是每一部分的工作都由专人负责
技术点:使用责任链模式解耦合编码/解码/数据存取工作
至此,客户端就初始完毕了
接下来是核心部分,就是规定客户端如何收发数据
public RpcResponse sendMessage(RpcRequest rpcRequest){
//1.建立连接,并且获取连接对象数据
//2.建立连接成功,向通道中写入数据
//3.监听返回状态
//4.返回信息成功,从通道中取出返回值
//5.返回信息失败,显示错误信息
}
大致流程如上,接下来我们借助于netty进行实现
public RpcResponse sendMessage(RpcRequest rpcRequest){
try {
//1.建立连接,并且获取连接对象数据
ChannelFuture channelFuture = b.connect(host, port).sync();
//1.1 获取通道描述对象
Channel channel = channelFuture.channel();
if(channel != null) {
//2.建立连接成功,向通道中写入数据,并且监听发送状态
channel.writeAndFlush(rpcRequest).addListener(future -> {
if(future.isSuccess()){
log.info("客户端成功发送了信息,请求信息是:{}",rpcRequest.toString());
}else{
log.error("客户端发送信息失败!");
}
});
//3.等待服务端返回信息,当信息传递完毕后通道关闭,因此我们监听通道的存活状态
channel.closeFuture().sync();
//4.通道关闭,数据应当已经写入到了通道域中
AttributeKey<RpcResponse> key = AttributeKey.valueOf("rpcResponse");
//4.返回信息成功,取出返回值
return channel.attr(key).get();
}
} catch (InterruptedException e) {
//5.返回信息失败,显示错误信息
e.printStackTrace();
}
return null;
}
服务端开发
首先服务端的架构是基于IO多路复用
的架构,这个架构的特点是
- 有一个主线程,该主线程负责接收请求,并且把请求分发到下属的工作线程
- 工作线程完成任务后,将任务结果通知主线程,主线程将结果返回给客户端
private void run() {
//1.主线程
EventLoopGroup bossGroup = new NioEventLoopGroup();
//2.工作者线程
EventLoopGroup workerGroup = new NioEventLoopGroup();
//3.序列化器实现
KryoSerializer kryoSerializer = new KryoSerializer();
try{
//4.初始化引导器
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)//注册通道类型
.childOption(ChannelOption.TCP_NODELAY,true)//注册每一个channel的TCP策略
.childOption(ChannelOption.SO_KEEPALIVE,true)
.option(ChannelOption.SO_BACKLOG,128)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {//每一个channel的处理策略
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast();
socketChannel.pipeline().addLast();
socketChannel.pipeline().addLast();
}
});
}finally {
}
}
依然的是有很多东西看不懂,下面一个个的进行解读
为什么客户端的初始化是在静态块中进行初始化,那到服务器上就变成了实例了?
这个与功能有关
客户端的引导器在初始化的时候并不需要知道对方的IP和port,只有当发起调用的时候才需要知道,因此引导器的初始化放在static
和实例中
均能够完成初始化,选择在static
中能够加速首次调用,但是会降低整个服务的启动速度
服务端的引导器在初始化的时候就必须绑定端口号,但是端口号是程序设定的,一般来说,只有在程序准备完毕后才会执行开启服务的调用,如果在类初始化时就开启调用,可能导致错误的发生
child和非child有什么区别?
从架构上来看,是一个主线程下分配若干个子线程去真正的在Channel
中处理数据,因此需要额外配置每个子线程的和子通道的处理策略
ChannelOption.TCP_NODELAY
它意味着开启了Nagle算法,该算法的作用是尽可能快的发送大数据块,减少网络的传输量,这个参数的作用当为true的时候,就开启了Nagle
算法
ChannelOption.SO_KEEPALIVE
它意味着开启了TCP底层的保活机制,避免客户端关闭后占用线程资源
ChannelOption.SO_BACKLOG
它表示TCP连接队列的数量,如果连接建立频繁,服务器处理新连接比较慢,可以适当调大这个参数
然后是子线程的处理机制我们要怎么设计?
从通道中接收到一个数据,工作是
- 将二进制串解码,还原对象
- 执行方法
- 将产生的响应对象编码,发送到通道中
于是设计为:
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new NettyKryoDecoder(kryoSerializer, RpcRequest.class));
socketChannel.pipeline().addLast(new NettyKryoEncoder(kryoSerializer, RpcResponse.class));
socketChannel.pipeline().addLast(new NettyServerHandler());
}
初始化完毕,开启服务端
//绑定监听端口,同步等待绑定成功
ChannelFuture future = b.bind(port).sync();
//等待服务端监听端口关闭
future.channel().closeFuture().sync();
完整代码
package netty.server;
/**
* Netty发起远程调用的服务端
*
* @author: 张庭杰
* @date: 2023年02月03日 16:40
*/
public class NettyServer {
private static final Logger log = LoggerFactory.getLogger(NettyServer.class);
private final int port;
public NettyServer(int port) {//监听端口
this.port = port;
}
private void run() {
//1.主线程
EventLoopGroup bossGroup = new NioEventLoopGroup();
//2.工作者线程
EventLoopGroup workerGroup = new NioEventLoopGroup();
//3.序列化器实现
KryoSerializer kryoSerializer = new KryoSerializer();
try{
//4.初始化引导器
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)//注册通道类型
.childOption(ChannelOption.TCP_NODELAY,true)
.childOption(ChannelOption.SO_KEEPALIVE,true)
.option(ChannelOption.SO_BACKLOG,128)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new NettyKryoDecoder(kryoSerializer, RpcRequest.class));
socketChannel.pipeline().addLast(new NettyKryoEncoder(kryoSerializer, RpcResponse.class));
socketChannel.pipeline().addLast(new NettyServerHandler());
}
});
//绑定监听端口,同步等待绑定成功
ChannelFuture future = b.bind(port).sync();
//等待服务端监听端口关闭
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//关闭后,执行关闭
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
将处理消息的逻辑处理
try {
//1.获取请求信息
RpcRequest rpcRequest = (RpcRequest) msg;
log.info("接收到了信息{},次数:{}",rpcRequest.toString(),COUNTER.incrementAndGet());
//2.返回调用信息
//TODO:在本地发起调用...
RpcResponse rpcResponse = new RpcResponse();
rpcResponse.setMessage("complete!");
//3.向通道中写回信息
ChannelFuture future = ctx.writeAndFlush(rpcResponse);
future.addListener(ChannelFutureListener.CLOSE);
}finally {
ReferenceCountUtil.release(msg);
}
ReferenceCountUtil.release(msg);
这一句是对不需要消息的收尾,防止触发GC
的时候大量消息在JVM
中,导致GC缓慢
测试代码
new NettyServer(6666).run();
for (int i = 0; i < 10; i++) {
RpcRequest rpcRequest = new RpcRequest();
rpcRequest.setInterfaceName("haha");
rpcRequest.setMethodName("haha");
NettyClient nettyClient = new NettyClient("127.0.0.1",6666);
RpcResponse rpcResponse = nettyClient.sendMessage(rpcRequest);
System.out.println(rpcResponse);
}
测试通过
3. 网络传输模块分析
上面通过一个简单的Netty
案例实现了一个远程调用,只不过我们还没有真正意义上的去调用远程的方法,因此下面我们进行改造
3.1 完善实体类信息
我们想要调用一个远程方法,要知道的必选元素有:
- 暴露出来的接口名称
- 暴露出来的方法名称
- 方法所需要的参数
- 方法所需要的参数的类型
这些调用一个方法所必须的元素,但是这个实体类有个问题
当多请求同时发送的时候,这时候响应是几乎同时响应的,那么我怎么知道哪次调用是谁的呢
如果是BIO的话我们不会有这个问题,但是我们目前是NIO。
这里的话我们借鉴
HTTP/2
中的StreamId
的思想,给它添加一个requestId
,用来作为一对请求和响应的标识。
第二个问题,就是当我调用的这个接口,如果有多个实现类,那么我怎么调用到准确的那个实体类呢?
也是添加一个标识字段,名字任意,这里运用了最佳实践中的group
第三个问题,当接口的实现类修改了它的实现,而且是一次重大升级,为了保证兼容,服务端提供了两个接口,一个是旧版本的,一个是新版本的,而他们是一个实现类,这时候也要添加一个标识符,我把它叫做version
于是,最终设计出来的结果是,UUID是生成的当前的时间戳
public class RpcRequest {
private static final long serialVersionUID = 1675615710L;
/** 必选参数类型 **/
private String interfaceName;//暴露出来的接口名称
private String methodName;//暴露出来的方法名称
private Object[] parameters;//调用该方法的参数实参
private Class<?> paraTypes;//这些参数的类型
/** 辅助参数类型 **/
private String group;//接口的实现类标识
private String version;//接口实现方法版本标识
private String requestId;//请求ID,类似于HTTP/2中的StreamId
}
接着要完成的是响应类的设计
像HTTP
响应一样,响应需要有响应头和响应体
- 关于响应头
响应头应该包含了常见的信息,比如说:
状态码code
:本次请求的结果,如200,403,304等
请求Id
:代表本次响应对应的是哪一次请求
消息message
:代表本次响应的消息,比如是成功,还是失败
- 关于响应体
响应体就是封装回来的数据,由于不确定,我们使用泛型
public class RpcResponse <T>{
private static final long serialVersionUID = 1675616430L;
/*必备参数*/
private String code;//响应状态码
private String message;//自定义响应消息,ok?fail?
private T data;//响应体数据
}
为了便于结果的封装,我们这里提供了响应成功和响应失败的便捷方法
public static <T> RpcResponse<T> ok(T data,String requestId){
//封装数据
RpcResponse<T> rpcResponse = new RpcResponse<>();
rpcResponse.setCode(RpcResponseCode.SUCCESSS.getCode());
rpcResponse.setMessage(RpcResponseCode.SUCCESSS.getMessage());
rpcResponse.setRequestId(requestId);
if (data != null) {
rpcResponse.setData(data);
}
return rpcResponse;
}
public static <T> RpcResponse<T> fail(String requestId,RpcResponseCode rpcResponseCode){
RpcResponse<T> rpcResponse = new RpcResponse<>();
rpcResponse.setCode(rpcResponse.getCode());
rpcResponse.setMessage(rpcResponse.getMessage());
rpcResponse.setRequestId(requestId);
return rpcResponse;
}
3.2 基于Socket实现的RPC远程调用
首先,我们先将暴露出来的接口准备好
/**
*
* @param rpcRequest 请求对象
* @return Object:返回请求回来的数据
*/
Object sendRprRequest(RpcRequest rpcRequest);
然后我们只需要去实现这个接口就可以了
基于Socket实现客户端
首先我们分析Socket
要做什么,它要做的就是将request
对象传输到服务端,然后服务端解析,获取解析后的结果再返回就可以了
那么在这个过程中,我们如果要访问一个服务,那么必然就是要从注册中去查找这个服务,于是在做实际的服务之前,我们必须先定义好查找服务的接口
第一步:首先规定服务名称的格式,服务名称应该能够起唯一标识的作用,我们根据
/** 必选参数类型 **/
private String interfaceName;//暴露出来的接口名称
private String methodName;//暴露出来的方法名称
private Object[] parameters;//调用该方法的参数实参
private Class<?> paraTypes;//这些参数的类型
/** 辅助参数类型 **/
private String group;//接口的实现类标识
private String version;//接口实现方法版本标识
private String requestId;//请求ID,类似于HTTP/2中的StreamId
初步规定服务名称的格式为
interfaceName/group/version
为什么不用
method
?这是因为这里只是访问它的服务类,方法的调用当信息传递到远程后再决定
于是我们新定义一个方法,能够组装起每个请求所想要访问的服务名称:
public String toServiceName(){
return this.getInterfaceName() + File.separator +
this.getGroup()+ File.separator+
this.getVersion();
}
第二步:定义服务发现的接口与服务查找相关服务
/**
* 通过远程服务名称查找服务
* @param rpcServiceName
* @return
*/
InetSocketAddress lookUpService(String rpcServiceName);
第三步:实现客户端接口
private final ServiceDiscovery serviceDiscovery;
//先手动实现注入
public SocketRpcClient(ServiceDiscovery serviceDiscovery){
this.serviceDiscovery = serviceDiscovery;
}
/***
* 通过Socket请求远程服务
* @param rpcRequest 请求对象
* @return
*/
@Override
public Object sendRprRequest(RpcRequest rpcRequest) {
//1. 获取远程服务名称
//2. 通过注册中心查找服务
//3. 查找到服务后,通过socket连接发送对象数据
//此时server解析rpcRequest数据后,写入到outputStream中
//4. 获取返回后的结果
return null;
}
然后就是按照这个思路来实现代码了
public Object sendRprRequest(RpcRequest rpcRequest) {
Object result = null;
//1. 获取远程服务名称
String rpcServiceName = rpcRequest.toServiceName();
//2. 通过注册中心查找服务
InetSocketAddress inetSocketAddress = serviceDiscovery.lookUpService(rpcServiceName);
//3. 查找到服务后,通过socket连接发送对象数据
try(Socket socket = new Socket()){
//3.1 连接到远程服务器,并且将当前对象写入到对方的缓存区中
socket.connect(inetSocketAddress);
//3.2 获取输出流,输出对象到对方
OutputStream outputStream = socket.getOutputStream();
ObjectOutputStream objectOutputStream = new ObjectOutputStream(outputStream);
objectOutputStream.writeObject(rpcRequest);
//3.3 此时server解析rpcRequest数据后,写入到outputStream中
InputStream inputStream = socket.getInputStream();
ObjectInputStream objectInputStream = new ObjectInputStream(inputStream);
//4. 获取返回后的结果
result = objectInputStream.readObject();
} catch (IOException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
return result;
}
基于Socket实现服务端
要实现服务端,首先这个服务端要先注册自己的服务到注册中心上并且发布,于是我们定义一个服务注册的实现类
这一步比较复杂,涉及到整个注册中心以及服务调用的流程
3.3 定义注册中心基本结构
这一步非常关键,是我们理解整个注册中心机制的过程,我们先画个图理一下思路
我们建立的过程是自顶向下的,因此从应用的最高层开始建立
- 最高层的是一个注册中心的实现,注册中心的功能是:
作为根目录服务器,提供键值对映射,远程服务名称->(IP,端口)
,然后发送数据到此服务器上,服务器执行监听即可
/**
* 存储键值对:rpcServiceName->socketAddress
* @param rpcServiceName
* @param socketAddress
*/
void registerService(String rpcServiceName, InetSocketAddress socketAddress);
那么我们如果想要注册服务,那么就必须知道服务器的Ip和端口对吧,然后知道了之后,我们怎么去调用呢?
使用socket进行通信的核心思路是:远程服务器开启一个主线程监听端口,接收端口的数据,然后根据远程调用的请求信息调用方法,最后返回对象
- 第二层就是让你实现这样一个服务器
分析:首先需要开启一个主线程监听服务,然后在主线程中提交rpc
请求。
于是需要:工作者线程池
、主线程的业务逻辑
于是写出框架如下:
/**
* 主线程开始监听客户端的请求
* @param port:监听端口号
*/
public void start(int port){
try(ServerSocket socket = new ServerSocket()){
//1.开启socket,监听port端口
String host = InetAddress.getLocalHost().getHostAddress();
//这里使用这个对象是为了更好的获得hostname和port,便于服务的注册
socket.bind(new InetSocketAddress(host,port));
//2.开始监听
Socket clientSocket;
while ((clientSocket = socket.accept())!=null){
log.info("客户端已经连接,客户端ip:{}",clientSocket.getInetAddress().getHostAddress());
workerGroup.execute(...);
}
workerGroup.shutdown();
} catch (IOException e) {
e.printStackTrace();
}
}
- 第三步:当客户端和服务端的连接打开后,服务端开始处理客户端发过来的请求,那么我们就需要定义一套逻辑专门用来处理这个连接上的参数
于是定义:
public class SocketRpcRequestHandler implements Runnable{
private Socket socket;
public SocketRpcRequestHandler(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
}
}
这里带socket是为了让这个线程专门去处理socket
//1.从client中获取对方发送的请求
//2.将这个请求发送给专门的处理器
//3.处理器返回结果
//4.将结果写回输出流
还需要额外定义一个处理器类来完成结果返回与结果处理的解耦,于是定义
/**
* 处理请求对象
* @param rpcRequest
* @return
*/
public static Object handle(RpcRequest rpcRequest){
//1.根据服务名称获取具体的服务实现类
//2.获取到具体的服务实现类后,根据方法以及参数调用方法
}
private Object invokeMethod(RpcRequest rpcRequest,Object service){
Object result = null;
try{
//1.获取方法
String methodName = rpcRequest.getMethodName();
Method method = service.getClass().getMethod(methodName);
//2.使用上参数,调用方法
result = method.invoke(service,rpcRequest.getParameters());
} catch (NoSuchMethodException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
}
//3.返回结果
return result;
}
于是到了这一步,如何根据服务名称获得对应的实现类?
思路:封装一个服务提供者,里面缓存了当前实例所提供的所有
远程服务名称->服务实现类
,然后get出来即可
为什么要额外封装?这是因为我们的这个类是向外提供服务的,然而这个服务提供者还需要完成服务的注册与推送的功能,因此在此解耦合
public interface ServiceProvider {
/**
* 发布新的服务,将服务注册
* @param rpcServiceConfig
*/
void publishService(RpcServiceConfig rpcServiceConfig);
/**
* 通过远程调用名称获取服务的实现
* @param rpcServiceName
* @return
*/
Object getService(String rpcServiceName);
}
然后完善处理器代码
public Object handle(RpcRequest rpcRequest){
//1.根据服务名称获取具体的服务实现类
Object service = serviceProvider.getService(rpcRequest.toServiceName());
//2.获取到具体的服务实现类后,根据方法以及参数调用方法
return invokeMethod(rpcRequest,service);
}
接着回去编写线程任务代码
我们注意到,任务处理器不必每一次都处理一个新的,因为它没有线程安全问题,因此在此创建一个单例工厂
//存储单例对象
private static final Map<String,Object> singletonMap = new ConcurrentHashMap<>();
private SingletonFactory(){}
public static Object getInstance(Class<?> clazz){
if(clazz == null){
throw new IllegalArgumentException();
}
String key = clazz.toString();
if(singletonMap.containsKey(key)){
return clazz.cast(singletonMap.get(key));
}else{
try {
singletonMap.put(key,clazz.getDeclaredConstructor().newInstance());
} catch (InstantiationException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
e.printStackTrace();
} catch (NoSuchMethodException e) {
e.printStackTrace();
}
}
return singletonMap.get(key);
}
完整代码
public class SocketRpcRequestHandlerTask implements Runnable{
private Socket socket;
private SocketRpcRequestHandler socketRpcRequestHandler;
public SocketRpcRequestHandlerTask(Socket socket) {
this.socket = socket;
socketRpcRequestHandler = SingletonFactory.getInstance(SocketRpcRequestHandler.class);
}
@Override
public void run() {
log.info("当前处理任务线程:{}",Thread.currentThread().getId());
try{
//1.从client中获取对方发送的请求
InputStream inputStream = socket.getInputStream();
ObjectInputStream objectInputStream = new ObjectInputStream(inputStream);
RpcRequest rpcRequest = (RpcRequest) objectInputStream.readObject();
//2.将这个请求发送给专门的处理器进行解析和方法的调用
Object result = socketRpcRequestHandler.handle(rpcRequest);
//3.将结果写回输出流
OutputStream outputStream = socket.getOutputStream();
ObjectOutputStream objectOutputStream = new ObjectOutputStream(outputStream);
objectOutputStream.writeObject(result);
//4.完成响应
} catch (IOException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
}
至此,基于Socket
实现的RPC远程调用就完成了
3.4 基于Netty实现的RPC远程调用(客户端)
在前面Netty
入门实战中,我们已经实现了一个基础可用的RPC
远程调用程序,但是存在问题
- 没有提供注册中心
- 客户端和服务器的通信机制单一,无法获得其他服务
基于此,我们要做的就是将我们刚才搭的注册中心的框架引入进去。
理顺执行的流程:
第一步:是客户端要通过用户的远程调用服务名称,从中取出服务器的地址
第二步:与对方服务器建立连接
第三步:使用封装好的请求对象进行数据交互
因此,我们需要引入注册中心:
private final ServiceDiscovery serviceDiscovery;
this.serviceDiscovery = serviceDiscovery;
在连接之前,通过服务端提交过来的请求
参数确定服务地址, 然后连接到远程,将东西发送过去就行了
初步修改:
InetSocketAddress inetSocketAddress = serviceDiscovery.lookUpService(rpcRequest.toServiceName());
try {
//1.建立连接,并且获取连接对象数据
ChannelFuture channelFuture = b.connect(inetSocketAddress).sync();
这样的话就把我们的注册中心引入进来了
3.5 客户端性能分析与优化
我们先来分析一下当前的代码有什么问题:
第一个问题:通道连接的浪费,目前我们的客户端处理响应是这样的
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try{
RpcResponse rpcResponse = (RpcResponse) msg;
log.info("客户端读取到的信息为:{}",rpcResponse.getMessage());
AttributeKey<RpcResponse> key = AttributeKey.valueOf("rpcResponse");
ctx.channel().attr(key).set(rpcResponse);
ctx.channel().close();
}finally {
ReferenceCountUtil.release(msg);
}
}
也就是说,当服务端传过来一个响应之后,我们就立即关闭掉这个通道了,然后我们看看我们的通道是如何建立的
//1.建立连接,并且获取连接对象数据
ChannelFuture channelFuture = b.connect(inetSocketAddress).sync();
//1.1 获取通道描述对象
Channel channel = channelFuture.channel();
if(channel != null) {}
首先我们可以看出,建立通道是一个耗时的过程,如果不是耗时的过程也不会需要用到同步阻塞,同时,连接的连接可能会不成功
既然连接的建立那么费劲,那么我们为什么不复用连接呢?
在这里,我们的实现思路也是一个基于pooling
的基本思路,如果我们发现一条连接依然是有效的,那么我们就复用它,否则删除重建
- 第一步:建立一个
Channel
管理类,它封装了对Channel
的相关操作,包括:判断是否存活,存取Channel
等操作
编写如下:
public interface ChannelProvider {
Channel get(InetSocketAddress inetSocketAddress) ;
void set(InetSocketAddress inetSocketAddress, Channel channel);
public void remove(InetSocketAddress inetSocketAddress);
}
//内部缓存
private final Map<String,Channel> channelMap;
public ChannelProvider() {
channelMap = new ConcurrentHashMap<String, Channel>();
}
/**
* 通过inetSocketAddress来查找当前是否存在这个隧道
* @param inetSocketAddress
* @return
*/
public Channel get(InetSocketAddress inetSocketAddress){
String key = inetSocketAddress.toString();
Channel channel;
if(channelMap.containsKey(key)){
//如果存在这个key
//判断隧道还是否有效
channel = channelMap.get(key);
if(channel!=null && channel.isActive()){
return channel;
}else{//否则就已经失效了
channelMap.remove(key);
}
}
//否则的话就是压根没这个key
return null;
}
public void set(InetSocketAddress inetSocketAddress, Channel channel){
channelMap.put(inetSocketAddress.toString(),channel);
}
public void remove(InetSocketAddress inetSocketAddress){
channelMap.remove(inetSocketAddress.toString());
}
- 第二步,在客户端中注入这个对象,并且对获取通道的过程再次封装
/**
* 获取连接对象
* @param inetSocketAddress
* @return
*/
private Channel doConnect(InetSocketAddress inetSocketAddress) throws ExecutionException, InterruptedException {
CompletableFuture<Channel> completableFuture = new CompletableFuture<>();
b.connect(inetSocketAddress).addListener((ChannelFutureListener)future -> {
if(future.isSuccess()){
log.info("客户端连接服务器成功!");
completableFuture.complete(future.channel());
}else{
log.error("客户端连接服务器失败!");
throw new RpcException("客户端连接服务器失败!");
}
});
return completableFuture.get();
}
private Channel getChannel(InetSocketAddress inetSocketAddress){
Channel channel = provider.get(inetSocketAddress);
if(channel == null){//重建连接
try {
channel = doConnect(inetSocketAddress);
} catch (ExecutionException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
//然后这个隧道入池
provider.set(inetSocketAddress,channel);
}
return channel;
}
- 第三步:在业务中引入我们设计的池子,要替换的是这一段
//1.建立连接,并且获取连接对象数据
ChannelFuture channelFuture = b.connect(inetSocketAddress).sync();
//1.1 获取通道描述对象
Channel channel = channelFuture.channel();
于是修改为
Channel channel = getChannel(inetSocketAddress);
于是这个优化点就结束了
技术点:通过异步回调的方式获取连接后的通道,避免阻塞式连接,提高性能
此时,我们的客户端代码也需要修改,我们先来回顾一下之前写的逻辑
if(channel!=null && channel.isActive()) {
//2.建立连接成功,向通道中写入数据,并且监听发送状态
channel.writeAndFlush(rpcRequest).addListener(future -> {
if(future.isSuccess()){
log.info("客户端成功发送了信息,请求信息是:{}",rpcRequest.toString());
}else{
log.error("客户端发送信息失败!");
}
});
//3.等待服务端返回信息,当信息传递完毕后通道关闭,因此我们监听通道的存活状态
channel.closeFuture().sync();
//4.通道关闭,数据应当已经写入到了通道域中
AttributeKey<RpcResponse> key = AttributeKey.valueOf("rpcResponse");
//4.返回信息成功,取出返回值
return channel.attr(key).get();
}
//1.获取请求信息
RpcRequest rpcRequest = (RpcRequest) msg;
log.info("接收到了信息{},次数:{}",rpcRequest.toString(),COUNTER.incrementAndGet());
//2.返回调用信息
//TODO:在本地发起调用...
RpcResponse rpcResponse = new RpcResponse();
rpcResponse.setMessage("complete!");
//3.向通道中写回信息
ChannelFuture future = ctx.writeAndFlush(rpcResponse);
future.addListener(ChannelFutureListener.CLOSE);
在之前,我们判断数据是否被写入是根据客户端是否正确接收到数据,现在我们选用复用通道,那么就肯定不是一次连接一次关闭了,因此我们的策略是修改通道为不直接关闭
于是先初步修改为
//1.获取请求信息
RpcRequest rpcRequest = (RpcRequest) msg;
log.info("接收到了信息{},次数:{}",rpcRequest.toString(),COUNTER.incrementAndGet());
//2.返回调用信息
//TODO:在本地发起调用...
RpcResponse rpcResponse = new RpcResponse();
rpcResponse.setMessage("complete!");
//3.向通道中写回信息
ChannelFuture future = ctx.writeAndFlush(rpcResponse);
但是这里的话就有一个问题了,就是怎么拿对应的数据出来?
第一种方案:利用我们提供的requestId
作为key,存到通道域里面去,代码如下
AttributeKey<RpcResponse> key = AttributeKey.valueOf(String.valueOf(((RpcResponse<?>) msg).getRequestId()));
ctx.channel().attr(key).set(rpcResponse);
然后对应的客户端从中取出key即可
分析:这种方式存在什么问题?
它的问题主要是串行执行,当客户端想要获取数据的时候,必须串行执行任务,也就是说,当通信线程需要从通道中get到数据的时候,都会被迫因为sync()
而被阻塞,但是这是避免不了的,因为只有当对方发出关闭通道的信号的时候,我才能确保这个key被写入了,否则的话必须不断轮询map
如何解决?
解决串行同步执行的一般方法是异步执行,那么对谁异步处理呢?
对处理的结果,也就是这个key-value
的键值对,我们想要的是RpcResponse
这个对象
- 第一步:在客户端发送数据的时候,给这个
RpcResponse
创建异步回调任务,并且使用容器存储起来 - 第二步:服务端处理数据,返回
RpcResponse
到客户端 - 第三步:客户端接收到这个
RpcResponse
后,将异步回调任务设置为complete
由于RpcResponse
对象中是否有值,取决于客户端是否处理完这个数据,而只有当客户端去强行get
这个对象的时候,如果数据还没有准备好,才会将客户端的线程挂起。
因此是十分灵活的,客户端只需要将请求发送出去,然后返回这个响应所对应的异步任务,当客户端真正需要这个异步任务的结果的时候,才将自己阻塞。
为此,我们使用一个封装对象,这个对象封装了对应的请求的Id和对应的future
对象,这个类还应该要提供
- 当客户端能够接收完对象后,能够将这个
future
给complete
,同时删除键值对
private static final Map<String, CompletableFuture<RpcResponse>> requestMap = new ConcurrentHashMap<>();
public void complete(RpcResponse rpcResponse) throws RpcException {
CompletableFuture<RpcResponse> future = requestMap.remove(String.valueOf(rpcResponse.getRequestId()));
if(future == null){
throw new RpcException("响应并发错误!");
}
//否则的话就将这个future完成掉
future.complete(rpcResponse);
}
public void put(int requestId,CompletableFuture<RpcResponse> future){
requestMap.put(String.valueOf(requestId),future);
}
然后我们对客户端进行改造
public CompletableFuture<RpcResponse> sendMessage(RpcRequest rpcRequest) throws RpcException {
CompletableFuture<RpcResponse> future = new CompletableFuture<>();
InetSocketAddress inetSocketAddress = serviceDiscovery.lookUpService(rpcRequest.toServiceName());
Channel channel = getChannel(inetSocketAddress);
if(channel!=null && channel.isActive()) {
//将当前的请求-响应任务加入到队列中
UnprocessedRequests.put(rpcRequest.getRequestId(),future);
//2.建立连接成功,向通道中写入数据,并且监听发送状态
channel.writeAndFlush(rpcRequest).addListener((ChannelFutureListener)f-> {
if(f.isSuccess()){
log.info("客户端发送数据成功{}",rpcRequest.toServiceName());
}else{
f.channel().close();
future.completeExceptionally(f.cause());
log.error("客户端发送失败,原因:{}",f.cause().toString());
}
});
}else{
throw new RpcException("通道关闭异常!");
}
return future;
}
接着是这个客户端处理线程的改动,它要做的是接收到来自服务端的响应,然后把它complete掉
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try{
if(msg instanceof RpcResponse){
RpcResponse rpcResponse = (RpcResponse) msg;
log.info("客户端读取到的信息为:{}",rpcResponse.getMessage());
//将它complete掉即可
UnprocessedRequests.complete(rpcResponse);
}else{
log.info("响应格式不合法!");
}
}finally {
ReferenceCountUtil.release(msg);
}
}
3.6 基于Netty实现的RPC远程调用(服务端)
基于3.4和3.5我们对整个网络传输模块进行了异步调用的优化,接着我们完成服务端的改动
服务端是一个操作对象,因此它应该具有暴露一个服务注册的功能,于是我们添加:
public void registryService(RpcServiceConfig rpcServiceConfig){
serviceProvider.publishService(rpcServiceConfig);
}
//1.获取请求信息
RpcRequest rpcRequest = (RpcRequest) msg;
log.info("接收到了信息{},次数:{}",rpcRequest.toString(),COUNTER.incrementAndGet());
//2.返回调用信息
//TODO:在本地发起调用...
RpcResponse rpcResponse = new RpcResponse();
rpcResponse.setMessage("complete!");
//3.向通道中写回信息
ChannelFuture future = ctx.writeAndFlush(rpcResponse);
future.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
4. 自定义通信协议
4.1 通信协议设计
目前来说,我们所实现的这个rpc框架还不需要用到通信协议
但是我们的rpc
框架最终是在分布式的环境下进行通信的,假设是Java
客户端与golang
客户端进行通信,JDK
所提供的序列化方式是golang
客户端所不知道的,因此我们必须在通信协议的头部添加一个标记,规定双方使用什么样的方式来对数据进行解释,这就是通信协议存在的意义
我们设计的通信协议如下:
魔数:magicCode:四字节,32bit,用来生成一个协议标志,拿到数据包后会先检查这个魔数,如果魔数合法,那么就继续解析,否则丢弃数据包
版本号version:占1个字节,表示通信协议的版本号,当协议发生变化时,对等端的协议解析器也要修改相关解析代码,因此需要校验版本号后才能进行解析,否则丢弃数据包
消息长度full-length:占4个字节,表示当前消息的body的长度,这个问题解决了底层TCP的半包和粘包的问题
消息类型message-type,占1个字节:分别为
- 请求消息类型
- 响应消息类型
- 心跳检测消息类型
序列化方式codec,占1个字节,其中不同的编码代表不同解析方式状态机
以下是基于HTTP/2实现的头部/首部压缩的添加字段
- 消息压缩方式compress:占1个字节
- 请求ID(StreamId)requestID:4个字节
4.2 添加客户端封装成帧(包)代码
首先我们先将一些定义添加到我们的常量池中
public static final int MAGIC_NUMBER = 114514;
public static final byte VERSION = 0x1;
public static final byte RESPONSE_MESSAGE = 0x1;
public static final byte REQUEST_MESSAGE = 0x2;
public static final byte HEARTBEAT_MESSAGE = 0x3;
public static final byte KRYO_CODEC = 0x1;
public static final byte GZIP_COMPRESS = 0x1;
然后封装一个协议包的对象
public class RpcMessage {
/**
* 消息类型
*/
private byte messageType;
/**
* 序列化类型
*/
private byte codec;
/**
* 压缩方式
*/
private byte compress;
/**
* request id
*/
private int requestId;
/**
* request data
*/
private Object data;
}
由于剩下的字段都是在运行时计算得出的,或者是固定常量,因此在encoder
里面写
@Override
protected void encode(ChannelHandlerContext channelHandlerContext,RpcPackage rpcPackage, ByteBuf out) throws Exception {
if(genericClass.isInstance(rpcPackage)){
//1.写入魔数(4B->INT)
out.writeInt(MAGIC_NUMBER);
//2.写入版本号1B
out.writeInt(VERSION);
//3.写入数据长度4B
out.markWriterIndex();
out.writerIndex(out.writerIndex() + 4);
//4.写入数据类型message,1B
out.writeByte(rpcPackage.getMessageType());
//5.写入序列化方式,1B
out.writeByte(rpcPackage.getCodec());
//6.写入消息压缩格式,1B
out.writeByte(rpcPackage.getCompress());
//7.写入请求ID(4B->INT)
out.writeInt(rpcPackage.getRequestId());
//TODO:数据压缩未完成
//TODO:这里还有数据的长度没有设置的,由于存在数据的压缩,因此这里还需要做一个数据的压缩(包体)
byte[] serialize = serializer.serialize(rpcPackage);
int fullLength = serialize.length+HEADER_LENGTH;
out.writeBytes(serialize);
//3.回去写入这个数据
out.resetWriterIndex().writeInt(fullLength);
//序列化完成
}
}
接下来,在客户端中封装这个帧即可
//重新封装
//消息类型、序列化协议、压缩类型、请求Id、请求数据:包体
RpcPackage rpcPackage = RpcPackage.builder()
.messageType(REQUEST_MESSAGE)
.codec(KRYO_CODEC)
.compress(GZIP_COMPRESS)
.requestId(rpcRequest.getRequestId())
.data(rpcRequest)
.build();
channel.writeAndFlush(rpcPackage).addListener((ChannelFutureListener)f-> {
4.3 添加服务端解析包头代码
服务端首先接收到数据后要先进行包的拆解和对象的还原,因此修改Encoder
if(byteBuf == null || byteBuf.readableBytes()<BODY_LENGTH){
throw new IllegalArgumentException("参数不合法!");
}
byteBuf.markReaderIndex();
//1.检查魔数
int magicNumber = byteBuf.readInt();
if(magicNumber != MAGIC_NUMBER){
throw new RpcException("魔数错误!不是当前协议的包");
}
//2.检查版本
byte version = byteBuf.readByte();
if(version != VERSION){
throw new RpcException("协议版本号不一致");
}
//3.读取数据长度
int fullLength = byteBuf.readInt();
//4.读取数据的类型
byte messageType = byteBuf.readByte();
//5.读取序列化方式
byte codec = byteBuf.readByte();
//6.读取消息压缩格式
byte compress = byteBuf.readByte();
//7.获取请求ID
int requestId = byteBuf.readInt();
RpcPackage rpcPackage = RpcPackage.builder()
.codec(codec)
.messageType(messageType)
.compress(compress)
.requestId(requestId).build();
/*处理body部分*/
int bodyLength = fullLength - HEADER_LENGTH;
//1.如果body是空体
if(bodyLength == 0){
return;
}
//2.如果body中的字节数小于规定的数量
if(bodyLength < 0){
byteBuf.resetReaderIndex();
log.info("出现半包现象,等待下一次I/O读取完整数据");
return ;
}
//3.正常情况或者整包
byte[] data = new byte[bodyLength];
byteBuf.readBytes(data);
//TODO:这里可以做一个包的展开
Object result = serializer.deserialize(data,genericClass);
list.add(result);
log.info("反序列化完成");
然后修改发送响应的逻辑
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
if(msg instanceof RpcPackage){
RpcPackage rpcPackage = (RpcPackage) msg;
RpcRequest rpcRequest = (RpcRequest) rpcPackage.getData();
log.info("接收到了信息{},次数:{}",rpcRequest.toString(),COUNTER.incrementAndGet());
RpcPackage rpcResponse = RpcPackage.builder()
.requestId(rpcRequest.getRequestId())
.codec(KRYO_CODEC)
.compress(GZIP_COMPRESS)
.messageType(RESPONSE_MESSAGE).build();
//3.向通道中写回信息
Object result = rpcRequestHandler.handle(rpcRequest);
rpcResponse.setData(result);
ChannelFuture future = ctx.writeAndFlush(rpcResponse);
future.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
}finally {
ReferenceCountUtil.release(msg);
}
}
至此,一个完整的具有自定义通信协议的RPC远程调用就被完成了
5. 心跳机制
5.1 心跳机制简述(为什么)
什么是心跳
在TCP连接中,在客户端和服务端建立连接之后,需要定期发送数据包,来通知对方自己还在线,以确保连接的有效性,如果一个连接上时间没有心跳,需要及时断开,否则服务端会维护很多很多无用的连接,浪费服务器的资源
简单来说,我们在之前做客户端性能分析与优化的时候,我们并没有设置通道啥时候关闭,它是否关闭取决于这条TCP连接什么时候失效,因此为了避免连接被操作系统回收,那么我们必须定时在这条连接上发送数据包,否则这条连接因为闲置就会被回收。
心跳的好处
- 提供了保活的思路,避免连接被频繁回收导致高成本的连接与释放的开销
- 提供了检测条件,能够检测连接是否还存在
IdleStateHandler
Netty 已经为我们提供了心跳的 Handler,当连接的空闲时间太长的时候,就会触发一个IdleStateEvent
事件,传递到下一个Handler
,可以通过PipleHandler
中重写userEventTrigged
来处理该事件,注意我们自己的 Handler 需要在 IdleStateHandler
后面。
5.2 IdleStateHandler
IdleStateHandler构造函数解读
public IdleStateHandler(boolean observeOutput,
long readerIdleTime, long writerIdleTime, long allIdleTime,
TimeUnit unit) {
}
- observeOutput:是否考虑出站较慢的情况,如果 true:当出站时间太长,超过空闲时间,那么将不触发此次事件。如果 false,超过空闲时间就会触发事件。默认 false。
- readerIdleTime:读空闲的时间,0 表示禁用读空闲事件。
- writerIdleTime:写空闲的时间,0 表示禁用写空闲事件。
- allIdleTime:读或写空闲的时间,0 表示禁用事件。
- unit:单位
事件处理机制
IdleStateHandler
继承 ChannelDuplexHandler
,重写了出站和入站的事件,我们来看看代码。
当 channel 添加、注册、活跃的时候,会初始化 initialize(ctx)
,删除、不活跃的时候销毁 destroy()
,读写的时候设置 lastReadTime
和 lastWriteTime
字段。
初始化
当 channel 添加、注册、活跃的时候,会初始化 initialize(ctx)
,下面我们就来看看初始化的代码:
其实初始化很简单,就是根据构造函数给的 读写空闲时间 去决定初始化哪些定时任务,分别是:ReaderIdleTimeoutTask
(读空闲超时任务)、WriterIdleTimeoutTask
(写空闲超时任务)、AllIdleTimeoutTask
(读写空闲超时任务)。
定时任务
① 如果读空闲超时了,则重新起一个定时器,然后触发事件
② 如果读空闲未超时,则新起一个时间更短(readerIdleTimeNanos - ticksInNanos() - lastReadTime
)的定时器
触发事件
private void invokeUserEventTriggered(Object event) {
if (invokeHandler()) {
try {
// 触发事件,说白了,就是直接调用 userEventTriggered 方法而已
((ChannelInboundHandler) handler()).userEventTriggered(this, event);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireUserEventTriggered(event);
}
}
其实触发事件,就是把事件传给下一个 Handler (next
),就是调用 userEventTriggered
方法而已。所以我们处理心跳的 Handler 一定要写到 IdleStateHandler
。
5.3 客户端实现
首先我们先将这个闲置时间检测
的处理器注册到引导器上
socketChannel.pipeline().addLast(new IdleStateHandler(0,5,0, TimeUnit.SECONDS));
然后当这个事件触发的时候,调用的是handler
中的userEventTriggered
方法,因此我们进行重写
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if(evt instanceof IdleStateEvent){
//当传来一个事件的时候
IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
//写空闲
if(((IdleStateEvent) evt).state().equals(IdleState.WRITER_IDLE)){
log.info("写空闲时间触发 [{}]", ctx.channel().remoteAddress());
Channel channel = ctx.channel();
//向通道中发送心跳
//...
}
}
}
这里的话我们思考一下,心跳到底要如何交流?
public static final byte HEARTBEAT_REQUEST_MESSAGE = 0x3;
public static final byte HEARTBEAT_RESPONSE_MESSAGE = 0x4;
public static final String PING = "PING";
public static final String PONG = "PONG";
思路是这样的,发送心跳的一方为心跳请求方,它发送一个PING
出去,如果对方给我发来PONG
,就说明这条连接是可用的
补充事件处理器
if(((IdleStateEvent) evt).state().equals(IdleState.WRITER_IDLE)){
log.info("写空闲时间触发 [{}]", ctx.channel().remoteAddress());
Channel channel = ctx.channel();
//向通道中发送心跳
RpcPackage rpcPackage = RpcPackage.builder()
.compress(DUMMY_COMPRESS)
.codec(KRYO_CODEC)
.data(PING)
.messageType(HEARTBEAT_REQUEST_MESSAGE).build();
channel.writeAndFlush(rpcPackage);
}else{
super.userEventTriggered(ctx,evt);
}
补充线程处理器
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try{
if(msg instanceof RpcPackage){
RpcPackage rpcMessage = (RpcPackage) msg;
byte messageType = rpcMessage.getMessageType();
if(messageType == RESPONSE_MESSAGE){
log.info("客户端读取到的信息为:{}",rpcMessage.getData());
//将它complete掉即可
UnprocessedRequests.complete(((RpcResponse) rpcMessage.getData()));
}else if(messageType == HEARTBEAT_RESPONSE_MESSAGE){
log.info("接收到心跳{}",rpcMessage.getData());
}
}else{
log.info("响应格式不合法!");
}
}finally {
ReferenceCountUtil.release(msg);
}
}
5.4 服务端实现
服务端的话,实际上有两种情况,一种是收到心跳包后响应,一种是收到心跳包后不响应
首先第一种是就是强维护,这种心跳包是为了保证连接一直存活不断开
第二种是弱维护,这种心跳包就是为了释放资源,找到合适的时机将连接断开
这里的话我们实现第一种,PING-PONG
try {
if(msg instanceof RpcPackage){
RpcPackage rpcPackage = (RpcPackage) msg;
RpcRequest rpcRequest = (RpcRequest) rpcPackage.getData();
log.info("接收到了信息{},次数:{}",rpcRequest.toString(),COUNTER.incrementAndGet());
//分情况讨论,分别是心跳包和非心跳包
RpcPackage response = new RpcPackage();
response.setCodec(KRYO_CODEC);
response.setCompress(GZIP_COMPRESS);
byte messageType = ((RpcPackage) msg).getMessageType();
if(messageType == HEARTBEAT_REQUEST_MESSAGE){
//响应pong
response.setData(PONG);
response.setMessageType(HEARTBEAT_RESPONSE_MESSAGE);
}else if(messageType == REQUEST_MESSAGE){
response.setRequestId(rpcRequest.getRequestId());
response.setMessageType(RESPONSE_MESSAGE);
Object result = rpcRequestHandler.handle(rpcRequest);
if(ctx.channel().isActive() && ctx.channel().isWritable()){
RpcResponse<Object> rpcResponse = RpcResponse.ok(result,rpcRequest.getRequestId());
rpcPackage.setData(rpcResponse);
}else{
RpcResponse<Object> rpcResponse = RpcResponse.fail(rpcRequest.getRequestId(),FAIL);
rpcPackage.setData(rpcResponse);
}
ctx.writeAndFlush(rpcPackage).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
}
}finally {
ReferenceCountUtil.release(msg);
}
socketChannel.pipeline().addLast(new IdleStateHandler(30,0,0, TimeUnit.SECONDS));
实际上第二种也能够保证连接不断开,因为连接是否要断开取决于服务端
这时候,如果客户端的包长时间到不来服务端,但是实际上这条连接还是在工作的,那么这时候就要使用重连机制了
private Channel doConnect(InetSocketAddress inetSocketAddress) throws ExecutionException, InterruptedException {
CompletableFuture<Channel> completableFuture = new CompletableFuture<>();
b.connect(inetSocketAddress).addListener((ChannelFutureListener)future -> {
if(future.isSuccess()){
log.info("客户端连接服务器成功!");
completableFuture.complete(future.channel());
}else{
log.error("客户端连接服务器失败!");
throw new RpcException("客户端连接服务器失败!");
}
});
return completableFuture.get();
}
private Channel getChannel(InetSocketAddress inetSocketAddress){
Channel channel = provider.get(inetSocketAddress);
if(channel == null){//重建连接
try {
channel = doConnect(inetSocketAddress);
} catch (ExecutionException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
//然后这个隧道入池
provider.set(inetSocketAddress,channel);
}
return channel;
}