从零开始实现一个简单的RPC框架(2)


1. Socket通信实战

什么是Socket

在计算机网络中,Socket被描述为一个抽象的关键字,由IP+port组成,它描述了TCP连接的一个对等端

如果要通过互联网进行通信,则至少需要一对套接字

  • 运行于服务器的Server Socket
  • 运行于客户端的Client Server

这个图描述了TCP三次握手->传输数据->TCP四次挥手的过程

client实际上并不需要指定端口,因为有默认端口的设定,它会在TCP的首部中进行描述,而它需要指定的是,你这个消息要发往哪个目的主机和哪个端口,这样对方的服务器才能够通过此端口接收数据

server需要选择监听哪个端口,才能正确接收数据,由于发过来的TCP报文段中具有对方的源地址和源端口信息,因此在回送的时候可以直接根据头部信息获取回送的路径

BIO:简单的EchoServer实现

BIO的实现思路是使用阻塞式的IO输入,只有当双方都完成IO后,连接才会断开,阻塞的时机是在accept()中,只有当接收到数据后,线程才会继续向下执行代码

ServerSocket serverSocket = new ServerSocket(port);
//2. 通过accept监听客户端发送过来的请求
Socket socket;
while ((socket = serverSocket.accept())!=null){
    //3. 通过输入流读取客户端发送过来的信息
    log.info("客户端已经连接");
    InputStream inputStream = socket.getInputStream();
    OutputStream outputStream = socket.getOutputStream();
    ObjectInputStream objectInputStream = new ObjectInputStream(inputStream);
    ObjectOutputStream objectOutputStream = new ObjectOutputStream(outputStream);
    Object message = objectInputStream.readObject();
    log.info("接收到的信息是{}",message);
    //4. 通过输出流回送信息
    objectOutputStream.writeObject("end");
    objectOutputStream.flush();

如果要进行修正,那么可以引入一个线程池

public static final Logger log = LoggerFactory.getLogger(EchoServerTest.class);
/**
 * 服务端开始监听
 * @param port
 */
public static void start(int port){
    //1. 创建ServerSocket,并且监听一个端口
    try {
        ServerSocket serverSocket = new ServerSocket(port);
        //2. 通过accept监听客户端发送过来的请求
        Socket socket;
        while ((socket = serverSocket.accept())!=null){
            log.info("处理的线程ID是{}",Thread.currentThread().getId());
            //3. 通过输入流读取客户端发送过来的信息
            log.info("客户端已经连接");
            InputStream inputStream = socket.getInputStream();
            OutputStream outputStream = socket.getOutputStream();
            ObjectInputStream objectInputStream = new ObjectInputStream(inputStream);
            ObjectOutputStream objectOutputStream = new ObjectOutputStream(outputStream);
            Object message = objectInputStream.readObject();
            log.info("接收到的信息是{}",message);
            //4. 通过输出流回送信息
            objectOutputStream.writeObject("end");
            objectOutputStream.flush();
        }
    } catch (IOException e) {
        e.printStackTrace();
    } catch (ClassNotFoundException e) {
        e.printStackTrace();
    }
}

public static void main(String[] args) {
    //1.创建线程池
    ThreadFactory threadFactory = Executors.defaultThreadFactory();
    ExecutorService threadPool = new ThreadPoolExecutor(10,100,1,
            TimeUnit.MINUTES,new ArrayBlockingQueue<>(100),threadFactory);
    threadPool.execute(()->{
        log.info("创建新的线程,线程ID:{}",Thread.currentThread().getId());
        start(80);
    });
}

BIO的最大痛点是:处理线程和请求数是1:1的出现的,其关键在于监听请求和处理请求都是同一个线程完成的

尽管引入了线程池,但是当线程数量过多的时候,也会导致程序的崩溃,因此我们应该尽可能的减少线程的使用,一个办法是将监听请求的线程和处理请求的线程给解耦,有一个线程专门负责请求的接收,然后将此请求分发到下面的子线程进行处理

threadPool.submit(()->{
    log.info("处理的线程ID是{}",Thread.currentThread().getId());
    //3. 通过输入流读取客户端发送过来的信息
    log.info("客户端已经连接");
    InputStream inputStream = null;
    try {
        inputStream = socket.getInputStream();
        OutputStream outputStream = socket.getOutputStream();
        ObjectInputStream objectInputStream = new ObjectInputStream(inputStream);
        ObjectOutputStream objectOutputStream = new ObjectOutputStream(outputStream);
        Object message = objectInputStream.readObject();
        log.info("接收到的信息是{}",message);
        //4. 通过输出流回送信息
        objectOutputStream.writeObject("end");
        objectOutputStream.flush();
    } catch (IOException e) {
        e.printStackTrace();
    } catch (ClassNotFoundException e) {
        e.printStackTrace();
    }
});

经过我自己的测试,当线程数达到十万级别的时候,CPU飙到了百分之90,而这仅仅只是传送一个end而已,而且还有很多请求被丢掉了,性能实在太差

于是引入了NIO

2. Netty入门实战

Netty成功找到了一种方式,使得在不妥协可维护性和性能情况下实现易于开发,性能,稳定性和灵活性的方式

首先定义RPC通信对象

@Data
public class RpcRequest {
    private String interfaceName;
    private String methodName;
}
@Data
public class RpcResponse {
    private String message;
}

初始化客户端程序

客户端的职责:用一个向服务端发送消息的sendMessage()方法,通过此方法可以将消息也就是RpcRequest对象发送到服务端,并且可以同步获取到服务端返回的结构,也就是RpcResponse

我们来看看初始化的时候,都做了一些什么:

private static final Bootstrap b;

//初始化相关资源
static {
    EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
    b = new Bootstrap();
    //初始化序列化器
    KryoSerializer kryoSerializer = new KryoSerializer();
    b.group(eventLoopGroup)
            .channel(NioSocketChannel.class).handler(new LoggingHandler(LogLevel.INFO))
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS,5000)
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    //自定义序列化编码器
                    //RpcResponse->ByteBuf
                    socketChannel.pipeline().addLast();
                    //ByteBuf->RpcRequest
                    socketChannel.pipeline().addLast();
                    socketChannel.pipeline().addLast();
                }
            });
}

有很多不认识的东西,我们先来认识一下这些东西

Bootstrap:它是启动引导类,它是客户端/服务端启动的时候的辅助类,它的作用是规定了连接的端口以及处理I/O,接收I/O请求的规则,

EventLoopGroup:它相当于一个线程组,EventLoop相当于一个处理线程,那么group就相当于一个线程池,Netty用来处理请求的线程就来自于这个参数,就相当于,它是一个施工队,当需要干活的人从里面挑工人出去干活

channel:是代表连接的类型,比如说是TCP连接?还是UDP

handler:消息处理器,代表当请求到来的时候如何处理请求,它就类似于工程师,专门设计方案的,设计出来后,交给施工队进行处理,施工队拿着图纸开始施工

初始化通道处理器的过程之后再讲,这里先看编码器

public class NettyKryoEncoder extends MessageToByteEncoder<Object> {
    private final Serializer serializer;
    private final Class<?> genericClass;

    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception {
        if(genericClass.isInstance(o)){
            //1.将对象转化为Byte
            byte[] serialize = serializer.serialize(o);
            //2. 读取消息的长度
            int dataLength = serialize.length;
            //3. 写入消息长度
            byteBuf.writeInt(dataLength);
            //4. 写入消息
            byteBuf.writeBytes(serialize);
        }
    }
}
public class NettyKryoDecoder extends ByteToMessageDecoder {

    private final Serializer serializer;
    private final Class<?> genericClass;

    private static final int BODY_LENGTH = 4;


    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        if(byteBuf == null || byteBuf.readableBytes()<BODY_LENGTH){
            return;
        }
        //1.读取消息的长度
        byteBuf.markReaderIndex();
        int dataLength = byteBuf.readInt();
        //2.参数校验
        if(dataLength <0 || byteBuf.readableBytes()<0){
            return ;
        }
        //3.如果可读的字节数小于消息长度的话也是不合理的
        if(dataLength > byteBuf.readableBytes()){
            //那么这时候只能放弃解码,重置读指针,还原本次的修改,等待下一次的I/O
            byteBuf.resetReaderIndex();
            return;
        }
        byte[] body = new byte[dataLength];
        byteBuf.readBytes(body);
        //反序列化
        Object target = serializer.deserialize(body, genericClass);
        list.add(target);
        log.info("反序列化完成");
    }
}

技术点:这里解决了粘包和半包的问题

粘包的话会导致消息的长度<字节数组的长度,这里的话用的方案是提供读取长度的方案。

那么为什么Handler上要注册这几个东西呢?

我们先来回顾一下,在连接上,我们到底在做什么?

首先对于客户端而言,它的工作是将自己产生的RpcRequest发送到服务端对吧?

那么在这个过程中,通道上需要有一个方法支持,它支持将这个对象编码为二进制字符串,因此需要注册Decoder、Encoder方法

第二个,客户端还需要解码来自于服务端的消息

因此通道上需要注册一个将二进制字符串转换为对象的方法

第三个,客户端获取到了一个响应,那这个响应要放到哪里去?

一个通道内的数据通常应该是共享的,因此设计了通道域,将通道中的信息放到一个容器中,对于调用方,只要访问这个容器就能取到数据了

我们可以看到,这上面的三个环节实际上是环环相扣的,也就是每一部分的工作都由专人负责

技术点:使用责任链模式解耦合编码/解码/数据存取工作

至此,客户端就初始完毕了

接下来是核心部分,就是规定客户端如何收发数据

public RpcResponse sendMessage(RpcRequest rpcRequest){
    //1.建立连接,并且获取连接对象数据

    //2.建立连接成功,向通道中写入数据

    //3.监听返回状态

    //4.返回信息成功,从通道中取出返回值

    //5.返回信息失败,显示错误信息
}

大致流程如上,接下来我们借助于netty进行实现

public RpcResponse sendMessage(RpcRequest rpcRequest){
    try {
        //1.建立连接,并且获取连接对象数据
        ChannelFuture channelFuture = b.connect(host, port).sync();
        //1.1 获取通道描述对象
        Channel channel = channelFuture.channel();
        if(channel != null) {
            //2.建立连接成功,向通道中写入数据,并且监听发送状态
            channel.writeAndFlush(rpcRequest).addListener(future -> {
                if(future.isSuccess()){
                    log.info("客户端成功发送了信息,请求信息是:{}",rpcRequest.toString());
                }else{
                    log.error("客户端发送信息失败!");
                }
            });
            //3.等待服务端返回信息,当信息传递完毕后通道关闭,因此我们监听通道的存活状态
            channel.closeFuture().sync();
            //4.通道关闭,数据应当已经写入到了通道域中
            AttributeKey<RpcResponse> key = AttributeKey.valueOf("rpcResponse");
            //4.返回信息成功,取出返回值
            return channel.attr(key).get();
        }
    } catch (InterruptedException e) {
        //5.返回信息失败,显示错误信息
        e.printStackTrace();
    }
    return null;
}

服务端开发

首先服务端的架构是基于IO多路复用的架构,这个架构的特点是

  • 有一个主线程,该主线程负责接收请求,并且把请求分发到下属的工作线程
  • 工作线程完成任务后,将任务结果通知主线程,主线程将结果返回给客户端
private void run() {
    //1.主线程
    EventLoopGroup bossGroup = new NioEventLoopGroup();
    //2.工作者线程
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    //3.序列化器实现
    KryoSerializer kryoSerializer = new KryoSerializer();
    try{
        //4.初始化引导器
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup,workerGroup)
                .channel(NioServerSocketChannel.class)//注册通道类型
                .childOption(ChannelOption.TCP_NODELAY,true)//注册每一个channel的TCP策略
                .childOption(ChannelOption.SO_KEEPALIVE,true)
                .option(ChannelOption.SO_BACKLOG,128)
                .handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(new ChannelInitializer<SocketChannel>() {//每一个channel的处理策略
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast();
                        socketChannel.pipeline().addLast();
                        socketChannel.pipeline().addLast();
                    }
                });
    }finally {

    }
}

依然的是有很多东西看不懂,下面一个个的进行解读

为什么客户端的初始化是在静态块中进行初始化,那到服务器上就变成了实例了?

这个与功能有关

客户端的引导器在初始化的时候并不需要知道对方的IP和port,只有当发起调用的时候才需要知道,因此引导器的初始化放在static实例中均能够完成初始化,选择在static中能够加速首次调用,但是会降低整个服务的启动速度

服务端的引导器在初始化的时候就必须绑定端口号,但是端口号是程序设定的,一般来说,只有在程序准备完毕后才会执行开启服务的调用,如果在类初始化时就开启调用,可能导致错误的发生

child和非child有什么区别?

从架构上来看,是一个主线程下分配若干个子线程去真正的在Channel中处理数据,因此需要额外配置每个子线程的和子通道的处理策略

ChannelOption.TCP_NODELAY

它意味着开启了Nagle算法,该算法的作用是尽可能快的发送大数据块,减少网络的传输量,这个参数的作用当为true的时候,就开启了Nagle算法

ChannelOption.SO_KEEPALIVE

它意味着开启了TCP底层的保活机制,避免客户端关闭后占用线程资源

ChannelOption.SO_BACKLOG

它表示TCP连接队列的数量,如果连接建立频繁,服务器处理新连接比较慢,可以适当调大这个参数

然后是子线程的处理机制我们要怎么设计?

从通道中接收到一个数据,工作是

  • 将二进制串解码,还原对象
  • 执行方法
  • 将产生的响应对象编码,发送到通道中

于是设计为:

.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        socketChannel.pipeline().addLast(new NettyKryoDecoder(kryoSerializer, RpcRequest.class));
        socketChannel.pipeline().addLast(new NettyKryoEncoder(kryoSerializer, RpcResponse.class));
        socketChannel.pipeline().addLast(new NettyServerHandler());
    }

初始化完毕,开启服务端

//绑定监听端口,同步等待绑定成功
ChannelFuture future = b.bind(port).sync();
//等待服务端监听端口关闭
future.channel().closeFuture().sync();

完整代码

package netty.server;
/**
 * Netty发起远程调用的服务端
 *
 * @author: 张庭杰
 * @date: 2023年02月03日 16:40
 */
public class NettyServer {
    private static final Logger log = LoggerFactory.getLogger(NettyServer.class);
    private final int port;
    public NettyServer(int port) {//监听端口
        this.port = port;
    }
    private void run() {
        //1.主线程
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        //2.工作者线程
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        //3.序列化器实现
        KryoSerializer kryoSerializer = new KryoSerializer();
        try{
            //4.初始化引导器
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup,workerGroup)
                    .channel(NioServerSocketChannel.class)//注册通道类型
                    .childOption(ChannelOption.TCP_NODELAY,true)
                    .childOption(ChannelOption.SO_KEEPALIVE,true)
                    .option(ChannelOption.SO_BACKLOG,128)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new NettyKryoDecoder(kryoSerializer, RpcRequest.class));
                            socketChannel.pipeline().addLast(new NettyKryoEncoder(kryoSerializer, RpcResponse.class));
                            socketChannel.pipeline().addLast(new NettyServerHandler());
                        }
                    });
            //绑定监听端口,同步等待绑定成功
            ChannelFuture future = b.bind(port).sync();
            //等待服务端监听端口关闭
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            //关闭后,执行关闭
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

将处理消息的逻辑处理

try {
    //1.获取请求信息
    RpcRequest rpcRequest = (RpcRequest) msg;
    log.info("接收到了信息{},次数:{}",rpcRequest.toString(),COUNTER.incrementAndGet());
    //2.返回调用信息
    //TODO:在本地发起调用...
    RpcResponse rpcResponse = new RpcResponse();
    rpcResponse.setMessage("complete!");
    //3.向通道中写回信息
    ChannelFuture future = ctx.writeAndFlush(rpcResponse);
    future.addListener(ChannelFutureListener.CLOSE);
}finally {
    ReferenceCountUtil.release(msg);
}
ReferenceCountUtil.release(msg);

这一句是对不需要消息的收尾,防止触发GC的时候大量消息在JVM中,导致GC缓慢

测试代码

new NettyServer(6666).run();
for (int i = 0; i < 10; i++) {
    RpcRequest rpcRequest = new RpcRequest();
    rpcRequest.setInterfaceName("haha");
    rpcRequest.setMethodName("haha");
    NettyClient nettyClient = new NettyClient("127.0.0.1",6666);
    RpcResponse rpcResponse = nettyClient.sendMessage(rpcRequest);
    System.out.println(rpcResponse);
}

测试通过

3. 网络传输模块分析

上面通过一个简单的Netty案例实现了一个远程调用,只不过我们还没有真正意义上的去调用远程的方法,因此下面我们进行改造

3.1 完善实体类信息

我们想要调用一个远程方法,要知道的必选元素有:

  • 暴露出来的接口名称
  • 暴露出来的方法名称
  • 方法所需要的参数
  • 方法所需要的参数的类型

这些调用一个方法所必须的元素,但是这个实体类有个问题

当多请求同时发送的时候,这时候响应是几乎同时响应的,那么我怎么知道哪次调用是谁的呢

如果是BIO的话我们不会有这个问题,但是我们目前是NIO。

这里的话我们借鉴HTTP/2中的StreamId的思想,给它添加一个requestId,用来作为一对请求和响应的标识。

第二个问题,就是当我调用的这个接口,如果有多个实现类,那么我怎么调用到准确的那个实体类呢?

也是添加一个标识字段,名字任意,这里运用了最佳实践中的group

第三个问题,当接口的实现类修改了它的实现,而且是一次重大升级,为了保证兼容,服务端提供了两个接口,一个是旧版本的,一个是新版本的,而他们是一个实现类,这时候也要添加一个标识符,我把它叫做version

于是,最终设计出来的结果是,UUID是生成的当前的时间戳

public class RpcRequest {
    private static final long serialVersionUID = 1675615710L;
    /** 必选参数类型 **/
    private String interfaceName;//暴露出来的接口名称
    private String methodName;//暴露出来的方法名称
    private Object[] parameters;//调用该方法的参数实参
    private Class<?> paraTypes;//这些参数的类型

    /** 辅助参数类型 **/
    private String group;//接口的实现类标识
    private String version;//接口实现方法版本标识
    private String requestId;//请求ID,类似于HTTP/2中的StreamId
}

接着要完成的是响应类的设计

HTTP响应一样,响应需要有响应头和响应体

  • 关于响应头

响应头应该包含了常见的信息,比如说:

状态码code:本次请求的结果,如200,403,304等

请求Id:代表本次响应对应的是哪一次请求

消息message:代表本次响应的消息,比如是成功,还是失败

  • 关于响应体

响应体就是封装回来的数据,由于不确定,我们使用泛型

public class RpcResponse <T>{
    private static final long serialVersionUID = 1675616430L;
    /*必备参数*/
    private String code;//响应状态码
    private String message;//自定义响应消息,ok?fail?
    private T data;//响应体数据
}

为了便于结果的封装,我们这里提供了响应成功和响应失败的便捷方法

public static <T> RpcResponse<T> ok(T data,String requestId){
    //封装数据
    RpcResponse<T> rpcResponse = new RpcResponse<>();
    rpcResponse.setCode(RpcResponseCode.SUCCESSS.getCode());
    rpcResponse.setMessage(RpcResponseCode.SUCCESSS.getMessage());
    rpcResponse.setRequestId(requestId);
    if (data != null) {
        rpcResponse.setData(data);
    }
    return rpcResponse;
}

public static <T> RpcResponse<T> fail(String requestId,RpcResponseCode rpcResponseCode){
    RpcResponse<T> rpcResponse = new RpcResponse<>();
    rpcResponse.setCode(rpcResponse.getCode());
    rpcResponse.setMessage(rpcResponse.getMessage());
    rpcResponse.setRequestId(requestId);
    return rpcResponse;
}

3.2 基于Socket实现的RPC远程调用

首先,我们先将暴露出来的接口准备好

/**
 *
 * @param rpcRequest 请求对象
 * @return Object:返回请求回来的数据
 */
Object sendRprRequest(RpcRequest rpcRequest);

然后我们只需要去实现这个接口就可以了

基于Socket实现客户端

首先我们分析Socket要做什么,它要做的就是将request对象传输到服务端,然后服务端解析,获取解析后的结果再返回就可以了

那么在这个过程中,我们如果要访问一个服务,那么必然就是要从注册中去查找这个服务,于是在做实际的服务之前,我们必须先定义好查找服务的接口

第一步:首先规定服务名称的格式,服务名称应该能够起唯一标识的作用,我们根据

/** 必选参数类型 **/
private String interfaceName;//暴露出来的接口名称
private String methodName;//暴露出来的方法名称
private Object[] parameters;//调用该方法的参数实参
private Class<?> paraTypes;//这些参数的类型

/** 辅助参数类型 **/
private String group;//接口的实现类标识
private String version;//接口实现方法版本标识
private String requestId;//请求ID,类似于HTTP/2中的StreamId

初步规定服务名称的格式为

interfaceName/group/version

为什么不用method?这是因为这里只是访问它的服务类,方法的调用当信息传递到远程后再决定

于是我们新定义一个方法,能够组装起每个请求所想要访问的服务名称:

public String toServiceName(){
    return this.getInterfaceName() + File.separator +
            this.getGroup()+ File.separator+
            this.getVersion();
}

第二步:定义服务发现的接口与服务查找相关服务

/**
 * 通过远程服务名称查找服务
 * @param rpcServiceName
 * @return
 */
InetSocketAddress lookUpService(String rpcServiceName);

第三步:实现客户端接口

private final ServiceDiscovery serviceDiscovery;
//先手动实现注入
public SocketRpcClient(ServiceDiscovery serviceDiscovery){
    this.serviceDiscovery = serviceDiscovery;
}

/***
 * 通过Socket请求远程服务
 * @param rpcRequest 请求对象
 * @return
 */
@Override
public Object sendRprRequest(RpcRequest rpcRequest) {
    //1. 获取远程服务名称

    //2. 通过注册中心查找服务

    //3. 查找到服务后,通过socket连接发送对象数据

    //此时server解析rpcRequest数据后,写入到outputStream中

    //4. 获取返回后的结果

    return null;
}

然后就是按照这个思路来实现代码了

public Object sendRprRequest(RpcRequest rpcRequest) {
    Object result = null;
    //1. 获取远程服务名称
    String rpcServiceName = rpcRequest.toServiceName();
    //2. 通过注册中心查找服务
    InetSocketAddress inetSocketAddress = serviceDiscovery.lookUpService(rpcServiceName);
    //3. 查找到服务后,通过socket连接发送对象数据
    try(Socket socket = new Socket()){
        //3.1 连接到远程服务器,并且将当前对象写入到对方的缓存区中
        socket.connect(inetSocketAddress);
        //3.2 获取输出流,输出对象到对方
        OutputStream outputStream = socket.getOutputStream();
        ObjectOutputStream objectOutputStream = new ObjectOutputStream(outputStream);
        objectOutputStream.writeObject(rpcRequest);
        //3.3 此时server解析rpcRequest数据后,写入到outputStream中
        InputStream inputStream = socket.getInputStream();
        ObjectInputStream objectInputStream = new ObjectInputStream(inputStream);
        //4. 获取返回后的结果
        result = objectInputStream.readObject();
    } catch (IOException e) {
        e.printStackTrace();
    } catch (ClassNotFoundException e) {
        e.printStackTrace();
    }
    return result;
}

基于Socket实现服务端

要实现服务端,首先这个服务端要先注册自己的服务到注册中心上并且发布,于是我们定义一个服务注册的实现类

这一步比较复杂,涉及到整个注册中心以及服务调用的流程

3.3 定义注册中心基本结构

这一步非常关键,是我们理解整个注册中心机制的过程,我们先画个图理一下思路

我们建立的过程是自顶向下的,因此从应用的最高层开始建立

  • 最高层的是一个注册中心的实现,注册中心的功能是:作为根目录服务器,提供键值对映射,远程服务名称->(IP,端口),然后发送数据到此服务器上,服务器执行监听即可
/**
 * 存储键值对:rpcServiceName->socketAddress
 * @param rpcServiceName
 * @param socketAddress
 */
void registerService(String rpcServiceName, InetSocketAddress socketAddress);

那么我们如果想要注册服务,那么就必须知道服务器的Ip和端口对吧,然后知道了之后,我们怎么去调用呢?

使用socket进行通信的核心思路是:远程服务器开启一个主线程监听端口,接收端口的数据,然后根据远程调用的请求信息调用方法,最后返回对象

  • 第二层就是让你实现这样一个服务器

分析:首先需要开启一个主线程监听服务,然后在主线程中提交rpc请求。

于是需要:工作者线程池主线程的业务逻辑

于是写出框架如下:

/**
 * 主线程开始监听客户端的请求
 * @param port:监听端口号
 */
public void start(int port){
    try(ServerSocket socket = new ServerSocket()){
        //1.开启socket,监听port端口
        String host = InetAddress.getLocalHost().getHostAddress();
        //这里使用这个对象是为了更好的获得hostname和port,便于服务的注册
        socket.bind(new InetSocketAddress(host,port));
        //2.开始监听
        Socket clientSocket;
        while ((clientSocket = socket.accept())!=null){
            log.info("客户端已经连接,客户端ip:{}",clientSocket.getInetAddress().getHostAddress());
            workerGroup.execute(...);
        }
        workerGroup.shutdown();
    } catch (IOException e) {
        e.printStackTrace();
    }
}
  • 第三步:当客户端和服务端的连接打开后,服务端开始处理客户端发过来的请求,那么我们就需要定义一套逻辑专门用来处理这个连接上的参数

于是定义:

public class SocketRpcRequestHandler implements Runnable{
    private Socket socket;
    public SocketRpcRequestHandler(Socket socket) {
        this.socket = socket;
    }
    @Override
    public void run() {

    }
}

这里带socket是为了让这个线程专门去处理socket

//1.从client中获取对方发送的请求

//2.将这个请求发送给专门的处理器

//3.处理器返回结果

//4.将结果写回输出流

还需要额外定义一个处理器类来完成结果返回与结果处理的解耦,于是定义

/**
 * 处理请求对象
 * @param rpcRequest
 * @return
 */
public static Object handle(RpcRequest rpcRequest){
    //1.根据服务名称获取具体的服务实现类
    
    //2.获取到具体的服务实现类后,根据方法以及参数调用方法

}

private Object invokeMethod(RpcRequest rpcRequest,Object service){
    Object result = null;
    try{
        //1.获取方法
        String methodName = rpcRequest.getMethodName();
        Method method = service.getClass().getMethod(methodName);
        //2.使用上参数,调用方法
        result = method.invoke(service,rpcRequest.getParameters());
    } catch (NoSuchMethodException e) {
        e.printStackTrace();
    } catch (InvocationTargetException e) {
        e.printStackTrace();
    } catch (IllegalAccessException e) {
        e.printStackTrace();
    }
    //3.返回结果
    return result;
}

于是到了这一步,如何根据服务名称获得对应的实现类?

思路:封装一个服务提供者,里面缓存了当前实例所提供的所有远程服务名称->服务实现类,然后get出来即可

为什么要额外封装?这是因为我们的这个类是向外提供服务的,然而这个服务提供者还需要完成服务的注册与推送的功能,因此在此解耦合

public interface ServiceProvider {
    /**
     * 发布新的服务,将服务注册
     * @param rpcServiceConfig
     */
    void publishService(RpcServiceConfig rpcServiceConfig);

    /**
     * 通过远程调用名称获取服务的实现
     * @param rpcServiceName
     * @return
     */
    Object getService(String rpcServiceName);
}

然后完善处理器代码

public Object handle(RpcRequest rpcRequest){
    //1.根据服务名称获取具体的服务实现类
    Object service = serviceProvider.getService(rpcRequest.toServiceName());
    //2.获取到具体的服务实现类后,根据方法以及参数调用方法
    return invokeMethod(rpcRequest,service);
}

接着回去编写线程任务代码

我们注意到,任务处理器不必每一次都处理一个新的,因为它没有线程安全问题,因此在此创建一个单例工厂

//存储单例对象
private static final Map<String,Object> singletonMap = new ConcurrentHashMap<>();
private SingletonFactory(){}

public static Object getInstance(Class<?> clazz){
    if(clazz == null){
        throw new IllegalArgumentException();
    }
    String key = clazz.toString();
    if(singletonMap.containsKey(key)){
        return clazz.cast(singletonMap.get(key));
    }else{
        try {
            singletonMap.put(key,clazz.getDeclaredConstructor().newInstance());
        } catch (InstantiationException e) {
            e.printStackTrace();
        } catch (IllegalAccessException e) {
            e.printStackTrace();
        } catch (InvocationTargetException e) {
            e.printStackTrace();
        } catch (NoSuchMethodException e) {
            e.printStackTrace();
        }
    }
    return singletonMap.get(key);
}

完整代码

public class SocketRpcRequestHandlerTask implements Runnable{
    private Socket socket;
    private SocketRpcRequestHandler socketRpcRequestHandler;
    public SocketRpcRequestHandlerTask(Socket socket) {
        this.socket = socket;
        socketRpcRequestHandler = SingletonFactory.getInstance(SocketRpcRequestHandler.class);
    }

    @Override
    public void run() {
        log.info("当前处理任务线程:{}",Thread.currentThread().getId());
        try{
            //1.从client中获取对方发送的请求
            InputStream inputStream = socket.getInputStream();
            ObjectInputStream objectInputStream = new ObjectInputStream(inputStream);
            RpcRequest rpcRequest = (RpcRequest) objectInputStream.readObject();
            //2.将这个请求发送给专门的处理器进行解析和方法的调用
            Object result = socketRpcRequestHandler.handle(rpcRequest);
            //3.将结果写回输出流
            OutputStream outputStream = socket.getOutputStream();
            ObjectOutputStream objectOutputStream = new ObjectOutputStream(outputStream);
            objectOutputStream.writeObject(result);
            //4.完成响应
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }
}

至此,基于Socket实现的RPC远程调用就完成了

3.4 基于Netty实现的RPC远程调用(客户端)

在前面Netty入门实战中,我们已经实现了一个基础可用的RPC远程调用程序,但是存在问题

  • 没有提供注册中心
  • 客户端和服务器的通信机制单一,无法获得其他服务

基于此,我们要做的就是将我们刚才搭的注册中心的框架引入进去。

理顺执行的流程:

第一步:是客户端要通过用户的远程调用服务名称,从中取出服务器的地址

第二步:与对方服务器建立连接

第三步:使用封装好的请求对象进行数据交互

因此,我们需要引入注册中心:

private final ServiceDiscovery serviceDiscovery;
this.serviceDiscovery = serviceDiscovery;

在连接之前,通过服务端提交过来的请求参数确定服务地址, 然后连接到远程,将东西发送过去就行了

初步修改:

InetSocketAddress inetSocketAddress = serviceDiscovery.lookUpService(rpcRequest.toServiceName());
try {
    //1.建立连接,并且获取连接对象数据
    ChannelFuture channelFuture = b.connect(inetSocketAddress).sync();

这样的话就把我们的注册中心引入进来了

3.5 客户端性能分析与优化

我们先来分析一下当前的代码有什么问题:

第一个问题:通道连接的浪费,目前我们的客户端处理响应是这样的

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    try{
        RpcResponse rpcResponse = (RpcResponse) msg;
        log.info("客户端读取到的信息为:{}",rpcResponse.getMessage());
        AttributeKey<RpcResponse> key = AttributeKey.valueOf("rpcResponse");
        ctx.channel().attr(key).set(rpcResponse);
        ctx.channel().close();
    }finally {
        ReferenceCountUtil.release(msg);
    }
}

也就是说,当服务端传过来一个响应之后,我们就立即关闭掉这个通道了,然后我们看看我们的通道是如何建立的

//1.建立连接,并且获取连接对象数据
ChannelFuture channelFuture = b.connect(inetSocketAddress).sync();
//1.1 获取通道描述对象
Channel channel = channelFuture.channel();
if(channel != null) {}

首先我们可以看出,建立通道是一个耗时的过程,如果不是耗时的过程也不会需要用到同步阻塞,同时,连接的连接可能会不成功

既然连接的建立那么费劲,那么我们为什么不复用连接呢?

在这里,我们的实现思路也是一个基于pooling的基本思路,如果我们发现一条连接依然是有效的,那么我们就复用它,否则删除重建

  • 第一步:建立一个Channel管理类,它封装了对Channel的相关操作,包括:判断是否存活,存取Channel等操作

编写如下:

public interface ChannelProvider {
    Channel get(InetSocketAddress inetSocketAddress) ;
    void set(InetSocketAddress inetSocketAddress, Channel channel);
    public void remove(InetSocketAddress inetSocketAddress);
}
//内部缓存
private final Map<String,Channel> channelMap;

public ChannelProvider() {
    channelMap = new ConcurrentHashMap<String, Channel>();
}

/**
 * 通过inetSocketAddress来查找当前是否存在这个隧道
 * @param inetSocketAddress
 * @return
 */
public Channel get(InetSocketAddress inetSocketAddress){
    String key = inetSocketAddress.toString();
    Channel channel;
    if(channelMap.containsKey(key)){
        //如果存在这个key
        //判断隧道还是否有效
        channel = channelMap.get(key);
        if(channel!=null && channel.isActive()){
            return channel;
        }else{//否则就已经失效了
            channelMap.remove(key);
        }
    }
    //否则的话就是压根没这个key
    return null;
}
public void set(InetSocketAddress inetSocketAddress, Channel channel){
    channelMap.put(inetSocketAddress.toString(),channel);
}

public void remove(InetSocketAddress inetSocketAddress){
    channelMap.remove(inetSocketAddress.toString());
}
  • 第二步,在客户端中注入这个对象,并且对获取通道的过程再次封装
/**
 * 获取连接对象
 * @param inetSocketAddress
 * @return
 */
private Channel doConnect(InetSocketAddress inetSocketAddress) throws ExecutionException, InterruptedException {
    CompletableFuture<Channel> completableFuture = new CompletableFuture<>();
    b.connect(inetSocketAddress).addListener((ChannelFutureListener)future -> {
        if(future.isSuccess()){
            log.info("客户端连接服务器成功!");
            completableFuture.complete(future.channel());
        }else{
            log.error("客户端连接服务器失败!");
            throw new RpcException("客户端连接服务器失败!");
        }
    });
    return completableFuture.get();
}

private Channel getChannel(InetSocketAddress inetSocketAddress){
    Channel channel = provider.get(inetSocketAddress);
    if(channel == null){//重建连接
        try {
            channel = doConnect(inetSocketAddress);
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //然后这个隧道入池
        provider.set(inetSocketAddress,channel);
    }
    return channel;
}
  • 第三步:在业务中引入我们设计的池子,要替换的是这一段
//1.建立连接,并且获取连接对象数据
ChannelFuture channelFuture = b.connect(inetSocketAddress).sync();
//1.1 获取通道描述对象
Channel channel = channelFuture.channel();

于是修改为

Channel channel = getChannel(inetSocketAddress);

于是这个优化点就结束了

技术点:通过异步回调的方式获取连接后的通道,避免阻塞式连接,提高性能

此时,我们的客户端代码也需要修改,我们先来回顾一下之前写的逻辑

if(channel!=null && channel.isActive()) {
    //2.建立连接成功,向通道中写入数据,并且监听发送状态
    channel.writeAndFlush(rpcRequest).addListener(future -> {
        if(future.isSuccess()){
            log.info("客户端成功发送了信息,请求信息是:{}",rpcRequest.toString());
        }else{
            log.error("客户端发送信息失败!");
        }
    });
    //3.等待服务端返回信息,当信息传递完毕后通道关闭,因此我们监听通道的存活状态
    channel.closeFuture().sync();
    //4.通道关闭,数据应当已经写入到了通道域中
    AttributeKey<RpcResponse> key = AttributeKey.valueOf("rpcResponse");
    //4.返回信息成功,取出返回值
    return channel.attr(key).get();
}
//1.获取请求信息
RpcRequest rpcRequest = (RpcRequest) msg;
log.info("接收到了信息{},次数:{}",rpcRequest.toString(),COUNTER.incrementAndGet());
//2.返回调用信息
//TODO:在本地发起调用...
RpcResponse rpcResponse = new RpcResponse();
rpcResponse.setMessage("complete!");
//3.向通道中写回信息
ChannelFuture future = ctx.writeAndFlush(rpcResponse);
future.addListener(ChannelFutureListener.CLOSE);

在之前,我们判断数据是否被写入是根据客户端是否正确接收到数据,现在我们选用复用通道,那么就肯定不是一次连接一次关闭了,因此我们的策略是修改通道为不直接关闭

于是先初步修改为

//1.获取请求信息
RpcRequest rpcRequest = (RpcRequest) msg;
log.info("接收到了信息{},次数:{}",rpcRequest.toString(),COUNTER.incrementAndGet());
//2.返回调用信息
//TODO:在本地发起调用...
RpcResponse rpcResponse = new RpcResponse();
rpcResponse.setMessage("complete!");
//3.向通道中写回信息
ChannelFuture future = ctx.writeAndFlush(rpcResponse);

但是这里的话就有一个问题了,就是怎么拿对应的数据出来?

第一种方案:利用我们提供的requestId作为key,存到通道域里面去,代码如下

AttributeKey<RpcResponse> key = AttributeKey.valueOf(String.valueOf(((RpcResponse<?>) msg).getRequestId()));
ctx.channel().attr(key).set(rpcResponse);

然后对应的客户端从中取出key即可

分析:这种方式存在什么问题?

它的问题主要是串行执行,当客户端想要获取数据的时候,必须串行执行任务,也就是说,当通信线程需要从通道中get到数据的时候,都会被迫因为sync()而被阻塞,但是这是避免不了的,因为只有当对方发出关闭通道的信号的时候,我才能确保这个key被写入了,否则的话必须不断轮询map

如何解决?

解决串行同步执行的一般方法是异步执行,那么对谁异步处理呢?

对处理的结果,也就是这个key-value的键值对,我们想要的是RpcResponse这个对象

  • 第一步:在客户端发送数据的时候,给这个RpcResponse创建异步回调任务,并且使用容器存储起来
  • 第二步:服务端处理数据,返回RpcResponse到客户端
  • 第三步:客户端接收到这个RpcResponse后,将异步回调任务设置为complete

由于RpcResponse对象中是否有值,取决于客户端是否处理完这个数据,而只有当客户端去强行get这个对象的时候,如果数据还没有准备好,才会将客户端的线程挂起。

因此是十分灵活的,客户端只需要将请求发送出去,然后返回这个响应所对应的异步任务,当客户端真正需要这个异步任务的结果的时候,才将自己阻塞。

为此,我们使用一个封装对象,这个对象封装了对应的请求的Id和对应的future对象,这个类还应该要提供

  • 当客户端能够接收完对象后,能够将这个futurecomplete,同时删除键值对
private static final Map<String, CompletableFuture<RpcResponse>> requestMap = new ConcurrentHashMap<>();

public void complete(RpcResponse rpcResponse) throws RpcException {
    CompletableFuture<RpcResponse> future = requestMap.remove(String.valueOf(rpcResponse.getRequestId()));
    if(future == null){
        throw new RpcException("响应并发错误!");
    }
    //否则的话就将这个future完成掉
    future.complete(rpcResponse);
}

public void put(int requestId,CompletableFuture<RpcResponse> future){
    requestMap.put(String.valueOf(requestId),future);
}

然后我们对客户端进行改造

public CompletableFuture<RpcResponse> sendMessage(RpcRequest rpcRequest) throws RpcException {
    CompletableFuture<RpcResponse> future = new CompletableFuture<>();
    InetSocketAddress inetSocketAddress = serviceDiscovery.lookUpService(rpcRequest.toServiceName());
    Channel channel = getChannel(inetSocketAddress);
    if(channel!=null && channel.isActive()) {
        //将当前的请求-响应任务加入到队列中
        UnprocessedRequests.put(rpcRequest.getRequestId(),future);
        //2.建立连接成功,向通道中写入数据,并且监听发送状态
        channel.writeAndFlush(rpcRequest).addListener((ChannelFutureListener)f-> {
            if(f.isSuccess()){
                log.info("客户端发送数据成功{}",rpcRequest.toServiceName());
            }else{
                f.channel().close();
                future.completeExceptionally(f.cause());
                log.error("客户端发送失败,原因:{}",f.cause().toString());
            }
        });
    }else{
        throw new RpcException("通道关闭异常!");
    }
    return future;
}

接着是这个客户端处理线程的改动,它要做的是接收到来自服务端的响应,然后把它complete掉

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    try{
        if(msg instanceof RpcResponse){
            RpcResponse rpcResponse = (RpcResponse) msg;
            log.info("客户端读取到的信息为:{}",rpcResponse.getMessage());
            //将它complete掉即可
            UnprocessedRequests.complete(rpcResponse);
        }else{
            log.info("响应格式不合法!");
        }
    }finally {
        ReferenceCountUtil.release(msg);
    }
}

3.6 基于Netty实现的RPC远程调用(服务端)

基于3.4和3.5我们对整个网络传输模块进行了异步调用的优化,接着我们完成服务端的改动

服务端是一个操作对象,因此它应该具有暴露一个服务注册的功能,于是我们添加:

public void registryService(RpcServiceConfig rpcServiceConfig){
    serviceProvider.publishService(rpcServiceConfig);
}
//1.获取请求信息
RpcRequest rpcRequest = (RpcRequest) msg;
log.info("接收到了信息{},次数:{}",rpcRequest.toString(),COUNTER.incrementAndGet());
//2.返回调用信息
//TODO:在本地发起调用...
RpcResponse rpcResponse = new RpcResponse();
rpcResponse.setMessage("complete!");
//3.向通道中写回信息
ChannelFuture future = ctx.writeAndFlush(rpcResponse);
future.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);

4. 自定义通信协议

4.1 通信协议设计

目前来说,我们所实现的这个rpc框架还不需要用到通信协议

但是我们的rpc框架最终是在分布式的环境下进行通信的,假设是Java客户端与golang客户端进行通信,JDK所提供的序列化方式是golang客户端所不知道的,因此我们必须在通信协议的头部添加一个标记,规定双方使用什么样的方式来对数据进行解释,这就是通信协议存在的意义

我们设计的通信协议如下:

  • 魔数:magicCode:四字节,32bit,用来生成一个协议标志,拿到数据包后会先检查这个魔数,如果魔数合法,那么就继续解析,否则丢弃数据包

  • 版本号version:占1个字节,表示通信协议的版本号,当协议发生变化时,对等端的协议解析器也要修改相关解析代码,因此需要校验版本号后才能进行解析,否则丢弃数据包

  • 消息长度full-length:占4个字节,表示当前消息的body的长度,这个问题解决了底层TCP的半包和粘包的问题

  • 消息类型message-type,占1个字节:分别为

    • 请求消息类型
    • 响应消息类型
    • 心跳检测消息类型
  • 序列化方式codec,占1个字节,其中不同的编码代表不同解析方式状态机

以下是基于HTTP/2实现的头部/首部压缩的添加字段

  • 消息压缩方式compress:占1个字节
  • 请求ID(StreamId)requestID:4个字节

4.2 添加客户端封装成帧(包)代码

首先我们先将一些定义添加到我们的常量池中

public static final int MAGIC_NUMBER = 114514;
public static final byte VERSION = 0x1;
public static final byte RESPONSE_MESSAGE = 0x1;
public static final byte REQUEST_MESSAGE  = 0x2;
public static final byte HEARTBEAT_MESSAGE = 0x3;
public static final byte KRYO_CODEC = 0x1;
public static final byte GZIP_COMPRESS = 0x1;

然后封装一个协议包的对象

public class RpcMessage {

    /**
     * 消息类型
     */
    private byte messageType;


    /**
     * 序列化类型
     */
    private byte codec;

    /**
     * 压缩方式
     */

    private byte compress;

    /**
     * request id
     */
    private int requestId;
    /**
     * request data
     */
    private Object data;
}

由于剩下的字段都是在运行时计算得出的,或者是固定常量,因此在encoder里面写

@Override
protected void encode(ChannelHandlerContext channelHandlerContext,RpcPackage rpcPackage, ByteBuf out) throws Exception {
    if(genericClass.isInstance(rpcPackage)){
        //1.写入魔数(4B->INT)
        out.writeInt(MAGIC_NUMBER);
        //2.写入版本号1B
        out.writeInt(VERSION);
        //3.写入数据长度4B
        out.markWriterIndex();
        out.writerIndex(out.writerIndex() + 4);
        //4.写入数据类型message,1B
        out.writeByte(rpcPackage.getMessageType());
        //5.写入序列化方式,1B
        out.writeByte(rpcPackage.getCodec());
        //6.写入消息压缩格式,1B
        out.writeByte(rpcPackage.getCompress());
        //7.写入请求ID(4B->INT)
        out.writeInt(rpcPackage.getRequestId());
        //TODO:数据压缩未完成
        //TODO:这里还有数据的长度没有设置的,由于存在数据的压缩,因此这里还需要做一个数据的压缩(包体)
        byte[] serialize = serializer.serialize(rpcPackage);
        int fullLength = serialize.length+HEADER_LENGTH;
        out.writeBytes(serialize);
        //3.回去写入这个数据
        out.resetWriterIndex().writeInt(fullLength);
        //序列化完成
    }
}

接下来,在客户端中封装这个帧即可

//重新封装
            //消息类型、序列化协议、压缩类型、请求Id、请求数据:包体
            RpcPackage rpcPackage = RpcPackage.builder()
                    .messageType(REQUEST_MESSAGE)
                    .codec(KRYO_CODEC)
                    .compress(GZIP_COMPRESS)
                    .requestId(rpcRequest.getRequestId())
                    .data(rpcRequest)
                    .build();
            channel.writeAndFlush(rpcPackage).addListener((ChannelFutureListener)f-> {

4.3 添加服务端解析包头代码

服务端首先接收到数据后要先进行包的拆解和对象的还原,因此修改Encoder

if(byteBuf == null || byteBuf.readableBytes()<BODY_LENGTH){
    throw new IllegalArgumentException("参数不合法!");
}
byteBuf.markReaderIndex();
//1.检查魔数
int magicNumber = byteBuf.readInt();
if(magicNumber != MAGIC_NUMBER){
    throw new RpcException("魔数错误!不是当前协议的包");
}
//2.检查版本
byte version = byteBuf.readByte();
if(version != VERSION){
    throw new RpcException("协议版本号不一致");
}
//3.读取数据长度
int fullLength = byteBuf.readInt();
//4.读取数据的类型
byte messageType = byteBuf.readByte();
//5.读取序列化方式
byte codec = byteBuf.readByte();
//6.读取消息压缩格式
byte compress = byteBuf.readByte();
//7.获取请求ID
int requestId = byteBuf.readInt();
RpcPackage rpcPackage = RpcPackage.builder()
        .codec(codec)
        .messageType(messageType)
        .compress(compress)
        .requestId(requestId).build();

/*处理body部分*/
int bodyLength = fullLength - HEADER_LENGTH;
//1.如果body是空体
if(bodyLength == 0){
    return;
}
//2.如果body中的字节数小于规定的数量
if(bodyLength < 0){
    byteBuf.resetReaderIndex();
    log.info("出现半包现象,等待下一次I/O读取完整数据");
    return ;
}
//3.正常情况或者整包
byte[] data = new byte[bodyLength];
byteBuf.readBytes(data);
//TODO:这里可以做一个包的展开
Object result = serializer.deserialize(data,genericClass);
list.add(result);
log.info("反序列化完成");

然后修改发送响应的逻辑

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    try {
        if(msg instanceof RpcPackage){
            RpcPackage rpcPackage = (RpcPackage) msg;
            RpcRequest rpcRequest = (RpcRequest) rpcPackage.getData();
            log.info("接收到了信息{},次数:{}",rpcRequest.toString(),COUNTER.incrementAndGet());
            RpcPackage rpcResponse = RpcPackage.builder()
                    .requestId(rpcRequest.getRequestId())
                    .codec(KRYO_CODEC)
                    .compress(GZIP_COMPRESS)
                    .messageType(RESPONSE_MESSAGE).build();
            //3.向通道中写回信息
            Object result = rpcRequestHandler.handle(rpcRequest);
            rpcResponse.setData(result);
            ChannelFuture future = ctx.writeAndFlush(rpcResponse);
            future.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
        }
    }finally {
        ReferenceCountUtil.release(msg);
    }
}

至此,一个完整的具有自定义通信协议的RPC远程调用就被完成了

5. 心跳机制

5.1 心跳机制简述(为什么)

什么是心跳

在TCP连接中,在客户端和服务端建立连接之后,需要定期发送数据包,来通知对方自己还在线,以确保连接的有效性,如果一个连接上时间没有心跳,需要及时断开,否则服务端会维护很多很多无用的连接,浪费服务器的资源

简单来说,我们在之前做客户端性能分析与优化的时候,我们并没有设置通道啥时候关闭,它是否关闭取决于这条TCP连接什么时候失效,因此为了避免连接被操作系统回收,那么我们必须定时在这条连接上发送数据包,否则这条连接因为闲置就会被回收。

心跳的好处

  • 提供了保活的思路,避免连接被频繁回收导致高成本的连接与释放的开销
  • 提供了检测条件,能够检测连接是否还存在

IdleStateHandler

Netty 已经为我们提供了心跳的 Handler,当连接的空闲时间太长的时候,就会触发一个IdleStateEvent事件,传递到下一个Handler,可以通过PipleHandler中重写userEventTrigged来处理该事件,注意我们自己的 Handler 需要在 IdleStateHandler 后面。

5.2 IdleStateHandler

IdleStateHandler构造函数解读

public IdleStateHandler(boolean observeOutput,
            long readerIdleTime, long writerIdleTime, long allIdleTime,
            TimeUnit unit) {
}
  • observeOutput:是否考虑出站较慢的情况,如果 true:当出站时间太长,超过空闲时间,那么将不触发此次事件。如果 false,超过空闲时间就会触发事件。默认 false。
  • readerIdleTime:读空闲的时间,0 表示禁用读空闲事件。
  • writerIdleTime:写空闲的时间,0 表示禁用写空闲事件。
  • allIdleTime:读或写空闲的时间,0 表示禁用事件。
  • unit:单位

事件处理机制

IdleStateHandler 继承 ChannelDuplexHandler,重写了出站和入站的事件,我们来看看代码。
当 channel 添加、注册、活跃的时候,会初始化 initialize(ctx),删除、不活跃的时候销毁 destroy(),读写的时候设置 lastReadTimelastWriteTime 字段。

初始化

当 channel 添加、注册、活跃的时候,会初始化 initialize(ctx),下面我们就来看看初始化的代码:

其实初始化很简单,就是根据构造函数给的 读写空闲时间 去决定初始化哪些定时任务,分别是:ReaderIdleTimeoutTask(读空闲超时任务)、WriterIdleTimeoutTask(写空闲超时任务)、AllIdleTimeoutTask(读写空闲超时任务)。

定时任务
① 如果读空闲超时了,则重新起一个定时器,然后触发事件
② 如果读空闲未超时,则新起一个时间更短(readerIdleTimeNanos - ticksInNanos() - lastReadTime)的定时器

触发事件

private void invokeUserEventTriggered(Object event) {
    if (invokeHandler()) {
        try {
            // 触发事件,说白了,就是直接调用 userEventTriggered 方法而已
            ((ChannelInboundHandler) handler()).userEventTriggered(this, event);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireUserEventTriggered(event);
    }
}

其实触发事件,就是把事件传给下一个 Handler (next),就是调用 userEventTriggered 方法而已。所以我们处理心跳的 Handler 一定要写到 IdleStateHandler

5.3 客户端实现

首先我们先将这个闲置时间检测的处理器注册到引导器上

socketChannel.pipeline().addLast(new IdleStateHandler(0,5,0, TimeUnit.SECONDS));

然后当这个事件触发的时候,调用的是handler中的userEventTriggered方法,因此我们进行重写

public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    if(evt instanceof IdleStateEvent){
        //当传来一个事件的时候
        IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
        //写空闲
        if(((IdleStateEvent) evt).state().equals(IdleState.WRITER_IDLE)){
            log.info("写空闲时间触发 [{}]", ctx.channel().remoteAddress());
            Channel channel = ctx.channel();
            //向通道中发送心跳
            //...
        }
    }
}

这里的话我们思考一下,心跳到底要如何交流?

public static final byte HEARTBEAT_REQUEST_MESSAGE = 0x3;
public static final byte HEARTBEAT_RESPONSE_MESSAGE = 0x4;

public static final String PING = "PING";
public static final String PONG = "PONG";

思路是这样的,发送心跳的一方为心跳请求方,它发送一个PING出去,如果对方给我发来PONG,就说明这条连接是可用的

补充事件处理器

if(((IdleStateEvent) evt).state().equals(IdleState.WRITER_IDLE)){
    log.info("写空闲时间触发 [{}]", ctx.channel().remoteAddress());
    Channel channel = ctx.channel();
    //向通道中发送心跳
    RpcPackage rpcPackage = RpcPackage.builder()
            .compress(DUMMY_COMPRESS)
            .codec(KRYO_CODEC)
            .data(PING)
            .messageType(HEARTBEAT_REQUEST_MESSAGE).build();
    channel.writeAndFlush(rpcPackage);
}else{
    super.userEventTriggered(ctx,evt);
}

补充线程处理器

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    try{
        if(msg instanceof RpcPackage){
            RpcPackage rpcMessage = (RpcPackage) msg;
            byte messageType = rpcMessage.getMessageType();
            if(messageType == RESPONSE_MESSAGE){
                log.info("客户端读取到的信息为:{}",rpcMessage.getData());
                //将它complete掉即可
                UnprocessedRequests.complete(((RpcResponse) rpcMessage.getData()));
            }else if(messageType == HEARTBEAT_RESPONSE_MESSAGE){
                log.info("接收到心跳{}",rpcMessage.getData());
            }
        }else{
            log.info("响应格式不合法!");
        }
    }finally {
        ReferenceCountUtil.release(msg);
    }
}

5.4 服务端实现

服务端的话,实际上有两种情况,一种是收到心跳包后响应,一种是收到心跳包后不响应

首先第一种是就是强维护,这种心跳包是为了保证连接一直存活不断开

第二种是弱维护,这种心跳包就是为了释放资源,找到合适的时机将连接断开

这里的话我们实现第一种,PING-PONG

try {
    if(msg instanceof RpcPackage){
        RpcPackage rpcPackage = (RpcPackage) msg;
        RpcRequest rpcRequest = (RpcRequest) rpcPackage.getData();
        log.info("接收到了信息{},次数:{}",rpcRequest.toString(),COUNTER.incrementAndGet());
        //分情况讨论,分别是心跳包和非心跳包
        RpcPackage response = new RpcPackage();
        response.setCodec(KRYO_CODEC);
        response.setCompress(GZIP_COMPRESS);
        byte messageType = ((RpcPackage) msg).getMessageType();
        if(messageType == HEARTBEAT_REQUEST_MESSAGE){
            //响应pong
            response.setData(PONG);
            response.setMessageType(HEARTBEAT_RESPONSE_MESSAGE);
        }else if(messageType == REQUEST_MESSAGE){
            response.setRequestId(rpcRequest.getRequestId());
            response.setMessageType(RESPONSE_MESSAGE);
            Object result = rpcRequestHandler.handle(rpcRequest);
            if(ctx.channel().isActive() && ctx.channel().isWritable()){
                RpcResponse<Object> rpcResponse = RpcResponse.ok(result,rpcRequest.getRequestId());
                rpcPackage.setData(rpcResponse);
            }else{
                RpcResponse<Object> rpcResponse = RpcResponse.fail(rpcRequest.getRequestId(),FAIL);
                rpcPackage.setData(rpcResponse);
            }
            ctx.writeAndFlush(rpcPackage).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
        }
    }
}finally {
    ReferenceCountUtil.release(msg);
}
socketChannel.pipeline().addLast(new IdleStateHandler(30,0,0, TimeUnit.SECONDS));

实际上第二种也能够保证连接不断开,因为连接是否要断开取决于服务端

这时候,如果客户端的包长时间到不来服务端,但是实际上这条连接还是在工作的,那么这时候就要使用重连机制了

private Channel doConnect(InetSocketAddress inetSocketAddress) throws ExecutionException, InterruptedException {
    CompletableFuture<Channel> completableFuture = new CompletableFuture<>();
    b.connect(inetSocketAddress).addListener((ChannelFutureListener)future -> {
        if(future.isSuccess()){
            log.info("客户端连接服务器成功!");
            completableFuture.complete(future.channel());
        }else{
            log.error("客户端连接服务器失败!");
            throw new RpcException("客户端连接服务器失败!");
        }
    });
    return completableFuture.get();
}

private Channel getChannel(InetSocketAddress inetSocketAddress){
    Channel channel = provider.get(inetSocketAddress);
    if(channel == null){//重建连接
        try {
            channel = doConnect(inetSocketAddress);
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //然后这个隧道入池
        provider.set(inetSocketAddress,channel);
    }
    return channel;
}

文章作者: 穿山甲
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 穿山甲 !
  目录