我们以创建容器的服务为例,将项目转为dubbo的服务
1. 改造服务提供者
第一步:定义服务接口
public interface ContainerService {
/**
* 在宿主机中创建容器
* @param vmContainer:容器端口信息
* @param imageName:镜像名称
* @param containerName:容器名称
* @return 当ID为null的时候代表容器创建失败,否则就是创建成功
*/
VmContainer createContainer(VmContainer vmContainer,String imageName,String containerName) throws ContainerException, IOException;
}
第二步:导入依赖
<!-- https://mvnrepository.com/artifact/org.apache.dubbo/dubbo -->
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo</artifactId>
<version>2.7.3</version>
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-registry-redis</artifactId>
</dependency>
第三步:编写具体实现类
public class ContainerServiceImpl implements ContainerService {...省略}
第四步:配置注册中心
<dependency>
<groupId>com.alibaba.boot</groupId>
<artifactId>dubbo-spring-boot-starter</artifactId>
<version>0.2.0</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.7.1</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.11.1</version>
</dependency>
第五步:添加服务注解
@Service
public class ContainerServiceImpl implements ContainerService {}
在这个过程中,其实完成了两件事情
- 将一个类型为
ContainerService
的Bean然后将这个实现类设置为这个bean的具体实现类 - 将这个bean注入到了IoC容器中
第六步:开启启动类注解
@EnableDubbo
public class VmNodeApplication {}
它实际上就是扫描当前包下所有具有这个DubboService
的所有类,并且完成那两步操作
2. 改造服务消费者
第一步:添加相关的依赖
第二步:添加配置文件的相关信息
第三步:编写测试用例
@Reference
private ContainerService containerService;
@Test
public void testHello(){
System.out.println(containerService.helloWorld());
}
第四步:启动项目
3. 负载均衡方案
梳理目前的需求:
- 平台上部署了多台物理节点,并且是以
Redis
作为注册中心进行注册的,要做的是将容器的创建请求通过权重轮询
的方法,分配到当前时刻负载最低的那一台节点上 - 此时就产生了问题,将容器具体分配到哪个节点是RPC框架帮我们完成的,并不知道这个容器在哪台服务器上
- 又希望能够以一个
一致性哈希
的方法,将删除请求、查询请求等这些需要精准知道容器的位置的请求所对应的机器给对应上
API
定义如下
public interface ContainerService {
//创建容器请求
VmContainer createContainer(String imageName, String containerName) ;
void stopContainer(String containerId) throws IOException;
void startContainer(String containerId) throws IOException;
void removeContainer(String containerId) throws IOException;
}
样例说明:比如说现在有节点A192.168.132.1
节点B192.168.132.2
、节点C192.168.132.3
然后通过调用ContainerService
的创建容器的请求,分别将:
容器1分配到了节点A,容器2、3分配到了节点B
现在通过调用removeContainer(节点A.ID)
,希望能够找到这个容器所对应的机器并且删除
第一个方案:将当前的服务拆开,分别分为两类:
- 创建容器类:这个请求是依据权重轮询算法进行分配的,具体的步骤就是通过采集各台物理机上的CPU、硬盘、内存等参数,然后对URL做个排序,然后选择这些URL即可,在选择完毕后,将当前
容器ID
作为key,然后物理节点的地址
作为IP,存储到Redis
中
或者使用Mysql也可以
- 删除/停止容器类:由于这一类的请求需要精确到每一台机器上,因此这种就是属于一种
一致性哈希
的算法,具体算法流程就是:当拿到一个容器ID的时候,从Redis
中去拿value,然后就可以精确地定位到这台物理机上
4. 负载均衡落地实现
- 第一步:将服务方法进行拆分
public interface ContainerCreateService {
VmContainer createContainer(VmContainer vmContainer, String imageName, String containerName) throws ContainerException, IOException, InterruptedException ;
String ping();
}
public interface ContainerOperationService {
void stopContainer(String containerId) throws IOException;
void startContainer(String containerId) throws IOException;
void removeContainer(String containerId) throws IOException;
String ping();
}
第二步:将服务类的方法进行复原,省略
5. 自定义负载均衡算法实战
编写加权轮询的负载均衡算法
第一步:先定义一个LoadBalance
,并且实现相关方法
public class LoadBalanceWithWeights implements LoadBalance {
@Override
public <T> Invoker<T> select(List<Invoker<T>> list, URL url, Invocation invocation) throws RpcException {
return null;
}
}
第二步,编写具体算法
@Component
@Slf4j
public class LoadBalanceWithWeights implements LoadBalance {
@Resource
private StringRedisTemplate stringRedisTemplate;
@Override
public <T> Invoker<T> select(List<Invoker<T>> list,
URL url, Invocation invocation) throws RpcException {
//1. 获取所有的键值对:IpEntry<Ip,LoadData>,并且定义排序规则
TreeMap<IPEntry,Integer> treeMap = new TreeMap<>(this::getSort);
//2.开始填充LoadData
//2.1 获取长度
int length = list.size();
for (int i = 0;i<length;i++) {
//2.2 组装key,获取当前的LoadData
String ip = list.get(i).getUrl().getIp();
String key = RedisConst.LOAD_DATA_KEY + ip;
//2.3 获取当前的负载信息
String jsonStr = Objects.requireNonNull(stringRedisTemplate.opsForZSet().popMax(key)).getValue();
//2.4 解析为LoadData的对象
LoadData loadData = JSONUtil.toBean(jsonStr, LoadData.class);
treeMap.put(new IPEntry(ip, loadData),i);
}
//3. 取出最合适的
IPEntry res = treeMap.lastEntry().getKey();
Integer index = treeMap.lastEntry().getValue();
log.info("当前的ip提供者是:{},负载是:{}",res.getIp(),res.getLoadData());
return list.get(index);
}
private int getSort(IPEntry e1, IPEntry e2) {
LoadData d1 = e1.getLoadData();
LoadData d2 = e2.getLoadData();
//1. 比较内存
if (d1.getRamFree() - d2.getRamFree() > 0.0) {
return 1;
} else if (d1.getRamFree() - d2.getRamFree() < 0.0) {
return -1;
}
//2. 比较磁盘
if (d1.getDiskFree() - d2.getDiskFree() > 0.0) {
return 1;
} else if (d1.getDiskFree() - d2.getDiskFree() < 0.0) {
return -1;
}
//3. 比较CPU的利用率
if (d1.getCpuFree() - d2.getCpuFree() > 0.0) {
return 1;
} else if (d1.getCpuFree() - d2.getCpuFree() < 0.0) {
return -1;
}
return 0;
}
}
第三步,在resources
文件夹下创建META-INF.dubbo
文件夹,然后创建org.apache.dubbo.rpc.cluster.LoadBalance
文本文件,输入一个键值对
key是你定义的名字,value的类名
loadBalanceWithWeight=org.jeecg.vm.loadBalance.LoadBalanceWithWeights
第四步,在需要使用负载均衡的接口上使用你自己的负载均衡算法名称即可
@Reference(version = "1.0",loadbalance = "loadBalanceWithWeight")
private ContainerCreateService containerCreateService;
同理,编写删除等操作的接口也是如法炮制,下面直接贴代码
@Override
public <T> Invoker<T> select(List<Invoker<T>> list, URL url, Invocation invocation) throws RpcException {
//1. 将containerId拿出来
Object[] arguments = invocation.getArguments();
if(!(arguments[0] instanceof String)){
throw new RpcException("参数有误!");
}
//2. 从redis中取出nodeId
String containerId = (String) arguments[0];
String ip = stringRedisTemplate.opsForValue().get(CONTAINER_ALIVE + containerId);
log.info("对应的IP地址是{}",ip);
//3. 获取对应的服务
Invoker<T> invoker = null;
for (Invoker<T> tInvoker : list) {
if(tInvoker.getUrl().getIp().equals(ip)){
invoker = tInvoker;
break;
}
}
return invoker;
}
至此,自定义负载均衡就完成了
6. 其他相关服务的改动
首先先来设计一下关于容器负载的key-value
,它首先是一个hash结构,外层的key是:container_alive:id
key | value |
---|---|
node_ip | 该容器所在的物理机的IP地址 |
load_data | 该容器在当前时刻的负载信息 |
改动1:容器在被删除后,要删除掉对应的key
//在此处修改,将key给删除掉
String key = CONTAINER_ALIVE + containerId;
stringRedisTemplate.opsForHash().delete(key);
log.info("{}已经被删除了",containerId);
改动2:容器在创建的时候,需要在redis中添加对应的key
//在redis中添加相关代码
String key = CONTAINER_ALIVE+containerId;
String ip = nodeLoadInfoService.getIPv4();
stringRedisTemplate.opsForHash().put(key+vmContainer.getId(),"node_ip",ip);
log.info("在redis中注册容器信息{}:{}",key,ip);
改动3:容器相关接口替换为RPC的实现
//通过RPC调用来使用计算节点的功能
cn.edu.scau.vm.entity.VmContainer rpcContainer = BeanUtil.copyProperties(vmContainer, cn.edu.scau.vm.entity.VmContainer.class);
try {
containerCreateService.createContainer(rpcContainer,bindImage.getHubUrl(),containerName);
} catch (ContainerException | IOException | InterruptedException e) {
e.printStackTrace();
}
改动4:完善controller
这里的动作比较多,略
改动5:关于容器负载的具体实现
@Scheduled(cron = "0/10 * * * * ?")
private void containerLoadData() {
List<Container> containers = dockerClient.listContainersCmd().exec();
for (Container container : containers) {
String containerId = container.getId();
String cmd = CONTAINER_LOAD_CMD_PRE+containerId+CONTAINER_LOAD_CMD_SUF;
String key = RedisConst.CONTAINER_ALIVE+containerId;
String ret = null;
try {
ret = ExecUtils.execSingleCmd(ExecUtils.getConnectWithPassword(nodeLoadInfoService.getIPv4(), nodeLoadInfoService.getPort(), nodeLoadInfoService.getHost(), nodeLoadInfoService.getPassword()), cmd);
} catch (SSHException | IOException e) {
e.printStackTrace();
}
List<String> allInfo = getAllInfo(ret);
//[36afcd8d8e0a] [0.13%] [6.926MiB / 972.1MiB] [656B / 0B] [0B / 0B] [0.71%]
//[{{.Container}}] [{{.CPUPerc}}] [{{.MemUsage}}] [{{.NetIO}}] [{{.BlockIO}}] [{{.MemPerc}}]
//写入对象
//覆盖写入
stringRedisTemplate.opsForHash().put(key,RedisConst.CONTAINER_LOAD_DATA+containerId,jsonStr);
}
思路就是获取当前及其上的所有容器ID,然后查询这些容器的具体负载,然后丢到redis上去
改动6:负载均衡修改数据结构