1. MQ产生的问题
系统的可用性降低:这是因为系统中引入了多一个消息队列服务,一旦消息队列服务挂掉,那么整个系统就会挂掉
系统引入的外部依赖越多,系统的稳定性越差
如何保证MQ的高可用?
系统的复杂度提高
- 如何保证消息没有被重复消费
- 怎么处理消息丢失的情况
- 如何保证消息传递的顺序性
一致性问题
一个事务,需要ABC三个系统共同协作工作完成后才能完成,假设通过MQ给ABC发送消息,AB成功了,但是C失败了,那么如何保证消息数据处理的一致性
可以使用MQ的场景如下
(1)生产者不需要从消费者处得到反馈,也就是说,这个过程不是强同步的,比如说A系统需要调用BC系统,如果处理逻辑是:A系统必须调用完了B系统,得到B系统的返回值之后才能去调用C系统,这样的场景不适用
(2)可以容忍短暂的不一致,消息队列实现的是最终一致性
(3)可以实现解耦、提速、削峰
2. RabbitMQ概述
AMQP(Advanced Message Queuing Protocol)高级消息队列协议
,它是一个网络协议,是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件传递消息,本质上是一个应用层协议
Publisher:消息发布者
Exchange:消息分发者
Queue:消息的具体存储载体
Routes:通过一定的规则,Exchange将消息分发到具体的Queue中
Broker:接收和分发消息的应用,RabbitMQ Server就是Messgae Broker
Vitrual Host:出于多租户和安全因素设计的,将AMQP的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概念,当多个不同的用户使用同一个RabbitMQ提供的服务的时候,可以划分出多个vhost,每个用户在主机的vhost创建exchange/queue
Connection:publisher/consumer和broker之间的TCP连接
Channel:相当于在一个TCP连接上跑多个channel,类似与HTTP2中的stream流
Exchange:message到达broker的第一站,根据分发规则,匹配查询表中的routing key,分发消息到queue中去,常用的类型有direct(Point to Point)
、topc(publish-subscribe)
、fanout(multicast)
Message:消息暂存的地方,等待被consumer取走
Binding:exchange和queue之间的虚拟连接,简单来说就是一个路由表信息,当exchange需要将message转发到具体的queue的时候,需要访问这个Binding
6种工作模式:
- 简单模式
- work queues
- Publish/Subscribe发布与订阅模式
- Routing路由模式
- Topics主题模式
- RPC远程调用模式
JMS(Java Message Service)
:规定了Java平台中关于面向消息中间件的API,类似于一个接口规范
3. Rabbit-Client基本操作
生产者代码-简单模式
// 4. 创建Channel
/**
* queue – 队列的名称
* durable – 是否持久化?
* exclusive - 是否只能够有一个消费者监听这个队列,当Connection关闭时,是否删除这个队列?
* autoDelete - 是否自动删除?当没有consumer时,自动删除此队列
* arguments - 参数信息
*/
// 如果没有名为是testQueue队列,则会创建,否则会创建
channel.queueDeclare("testQueue", true, false, false, null);
/**
* exchange:交换机的名称,简单情况下,会使用默认的
* routingKey:路由键
* props:配置信息
* body:发送的消息数据
*/
String body = "test";
channel.basicPublish("", "testQueue", null, body.getBytes());
connection.close();
消费者代码
public static void main(String[] args) throws IOException, TimeoutException {
channel.basicConsume("testQueue", true, new DefaultConsumer(channel) {
// 收到消息后,会自动执行该回调方法
/**
*
* @param consumerTag 消息的唯一标识
* @param envelope 获取一些信息,交换机信息,路由key等
* @param properties 配置信息
* @param body 收到的真实数据
* @throws IOException 抛出的异常
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag:" + consumerTag + "envelope:" + envelope.toString() + "properties:" + properties + "body:" + new String(body));
}
});
// 监听程序,需要关闭吗?
// 不需要
// connection.close();
}
4. Rabbit工作模式详解
4.1 工作队列模式
这种模式相对于简单模式,多了一个或者一些消费端,多个消费端共同消费同一个队列中的消息,但是要注意,同一条信息只能被一个consumer消费,因此这些消费者是处于一个竞争的状态
应用状态:对于任务过重的模式,可以使用这种模式,可以由多个消费者进程同时处理任务
4.2 Pub/Sub 发布订阅模式
多了一个Exchange角色,而且过程略有变化
- Exchange:交换机(X),一方面,接收生产者发送的消息,另一方面,知道如何处理消息,例如递交给某个特别的队列,递交给所有的队列,或者是将消息丢弃,至于如何操作,需要取决于Exchange的类,常见的有以下三种类型
- Fanout:广播,将消息交给所有绑定到交换机的队列
- Direct:定向,把消息交给符合指定routing key的队列
- Topic:通配符,把消息交给符合routing pattern(路由模式)的队列
Exchange
只负责转发消息,不具备存储消息的能力,因此没有任何队列与Exchange绑定,或者没有命中路由规则,那么消息就会丢失
/**
* 1. exchange:交换机的名称
* 2. type:交换机类型,BuiltinExchangeType.DIRECT等
* 3. durable:是否持久化
* 4. autoDelete:是否自动删除交换机
* 5. internal:是否内部使用,一般false
* 6. arguments:参数
*/
channel.exchangeDeclare();
4.3 Routing
- 队列与交换机绑定,不能是任意绑定,而是需要指定一个RoutingKey
- 消息的发送方在向Exchange发送消息的时候,也必须要执行消息的RoutingKey
- Exchange不再把消息交给每一个绑定的队列,而是根据消息的RoutingKey进行判断,只有队列的RoutingKey与消息的RoutingKey完全一致才会收到消息