从零开始实现一个简单的RPC框架(3)


在本次开发中,将基于Zookeeper实现一个简易的注册中心服务

1. 注册中心概述

在第二节中,我们将注册中心的主要服务接口进行了定义,我们现在来专门梳理注册中心:

RPC中,消费者需要请求服务提供方的接口,于是我们必须要找到服务者的地址,那么,consumer要从哪里获取这个地址呢?

能不能consumer配置provider的地址?

从实现的角度上来看,是完全可以的,但是事实上没人会这样做。

  • consumer每次引用一次接口,都需要配置一次Provider的服务地址

  • consumer引用其他业务组的服务,需要跨团队进行沟通

  • Provider如果换服务器,挂掉,新增,都需要通知到consumer去修改服务地址,配置修改不及时将会导致服务异常

  • consumer如果引用了很多服务,配置将会非常麻烦

从上面的缺点来看,最好的方式是找个地方把配置管理起来
例如,把配置放到统一的数据库中,Provider 启动的时候,把自己的地址和接口写到表中; Consumer 在请求接口之前,就可以从表里获取该接口对应的Provider地址。
其实,这种把配置统一管理的地方,就叫 注册中心

注册中心 只是 Provider 感知 Consumer 的一种方式而已,最终 Provider 调用 Consumer 接口还是以直连的方式进行。
Provider 注册或者取消注册,注册中心会通知 Consumer,保证 Consumer 感知服务状态的及时性。

注册中心的特性

存储:可以简单的将注册中心理解为一个存储系统,存储着服务与服务提供方的映射表,一般对注册中心没有太多特别的要求。

高可用:注册中心一旦挂掉,Consumer将无法获取Provider的地址,整个服务链条就断掉了

健康检测:Provider向注册中心注册服务之后,注册中心需要定期向Provider发起健康检查,当Provider宕机的时候,注册中心能够更快的发现,

监听状态:当服务增加,减少Provider的时候,还要主动通知consumer,以便consumer能够快速更新本地缓存,减少错误请求的次数,

2. 注册中心的设计与实现

注册中心接口定义

注册中心的核心业务就是注册业务查找业务,这里的话我们可以解耦合,将这两个功能定义到不同的实体上

public interface ServiceDiscovery {
    /**
     * 通过远程服务名称查找服务
     * @param rpcServiceName
     * @return
     */
    InetSocketAddress lookUpService(String rpcServiceName);
}
public interface ServiceRegistry {
    /**
     * 存储键值对:rpcServiceName->socketAddress
     * @param rpcServiceName
     * @param socketAddress
     */
    void registerService(String rpcServiceName, InetSocketAddress socketAddress);
}

本地缓存

private static final Map<String, List<String>> LOCAL_REGISTRY
        = new ConcurrentHashMap<>();

public static List<String> putOrReset(String serviceName,List<String> list){
    LOCAL_REGISTRY.put(serviceName,list);
    return list;
}

public static List<String> getForPath(String serviceName){
    return LOCAL_REGISTRY.get(serviceName);
}

3. Zookpeer实战

基于Zookpeer实现服务查找功能

封装功能

public static List<String> getChildrenNodes(CuratorFramework zkClient, String rpcServiceName){
    //1.先走缓存
    if(LocalRegistry.getForPath(rpcServiceName)!=null){
        return LocalRegistry.getForPath(rpcServiceName);
    }
    //否则的话就要先请求数据
    List<String> result = null;
    String servicePath = ZK_REGISTER_ROOT_PATH + "/" + rpcServiceName;
    try {
        result = zkClient.getChildren().forPath(servicePath);
        LocalRegistry.putOrReset(rpcServiceName, result);
    } catch (Exception e) {
        log.error("get children nodes for path [{}] fail", servicePath);
    }
    return result;
}
@Override
public InetSocketAddress lookUpService(String rpcServiceName) throws RpcException {
    List<String> childrenNodes = CuratorUtils.getChildrenNodes(zkClient, rpcServiceName);
    if (CollectionUtil.isEmpty(childrenNodes)) {
        throw new RpcException("服务已经过期或者不存在!");
    }
    //现在还无法做一个负载均衡,因此我们先默认给第一个连接
    String socket = childrenNodes.get(0);
    String[] split = socket.split(":");
    String host = split[0];
    int port = Integer.parseInt(split[1]);
    return new InetSocketAddress(host,port);
}
public static void createPersistentNode(CuratorFramework zkClient, String servicePath) {
    try {
        if (REGISTERED_PATH_SET.contains(servicePath) || zkClient.checkExists().forPath(servicePath) != null) {
            log.info("The node already exists. The node is:[{}]", servicePath);
        } else {
            //eg: /my-rpc/github.javaguide.HelloService/127.0.0.1:9999
            zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(servicePath);
            log.info("The node was created successfully. The node is:[{}]", servicePath);
        }
        REGISTERED_PATH_SET.add(servicePath);
    } catch (Exception e) {
        log.error("create persistent node for path [{}] fail", servicePath);
    }
}

文章作者: 穿山甲
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 穿山甲 !
  目录