RabbitMQ基础学习


https://www.rabbitmq.com/

1. MQ产生的问题

系统的可用性降低:这是因为系统中引入了多一个消息队列服务,一旦消息队列服务挂掉,那么整个系统就会挂掉

系统引入的外部依赖越多,系统的稳定性越差

如何保证MQ的高可用?

系统的复杂度提高

  • 如何保证消息没有被重复消费
  • 怎么处理消息丢失的情况
  • 如何保证消息传递的顺序性

一致性问题

一个事务,需要ABC三个系统共同协作工作完成后才能完成,假设通过MQ给ABC发送消息,AB成功了,但是C失败了,那么如何保证消息数据处理的一致性

可以使用MQ的场景如下

(1)生产者不需要从消费者处得到反馈,也就是说,这个过程不是强同步的,比如说A系统需要调用BC系统,如果处理逻辑是:A系统必须调用完了B系统,得到B系统的返回值之后才能去调用C系统,这样的场景不适用

(2)可以容忍短暂的不一致,消息队列实现的是最终一致性

(3)可以实现解耦、提速、削峰

2. RabbitMQ概述

AMQP(Advanced Message Queuing Protocol)高级消息队列协议,它是一个网络协议,是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件传递消息,本质上是一个应用层协议

Publisher:消息发布者

Exchange:消息分发者

Queue:消息的具体存储载体

Routes:通过一定的规则,Exchange将消息分发到具体的Queue中

Broker:接收和分发消息的应用,RabbitMQ Server就是Messgae Broker

Vitrual Host:出于多租户和安全因素设计的,将AMQP的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概念,当多个不同的用户使用同一个RabbitMQ提供的服务的时候,可以划分出多个vhost,每个用户在主机的vhost创建exchange/queue

Connection:publisher/consumer和broker之间的TCP连接

Channel:相当于在一个TCP连接上跑多个channel,类似与HTTP2中的stream流

Exchange:message到达broker的第一站,根据分发规则,匹配查询表中的routing key,分发消息到queue中去,常用的类型有direct(Point to Point)topc(publish-subscribe)fanout(multicast)

Message:消息暂存的地方,等待被consumer取走

Binding:exchange和queue之间的虚拟连接,简单来说就是一个路由表信息,当exchange需要将message转发到具体的queue的时候,需要访问这个Binding

6种工作模式:

  • 简单模式
  • work queues
  • Publish/Subscribe发布与订阅模式
  • Routing路由模式
  • Topics主题模式
  • RPC远程调用模式

JMS(Java Message Service):规定了Java平台中关于面向消息中间件的API,类似于一个接口规范

3. Rabbit-Client基本操作

生产者代码-简单模式


// 4. 创建Channel
/**
 * queue – 队列的名称
 * durable – 是否持久化?
 * exclusive - 是否只能够有一个消费者监听这个队列,当Connection关闭时,是否删除这个队列?
 * autoDelete - 是否自动删除?当没有consumer时,自动删除此队列
 * arguments - 参数信息
 */
// 如果没有名为是testQueue队列,则会创建,否则会创建
channel.queueDeclare("testQueue", true, false, false, null);
/**
 * exchange:交换机的名称,简单情况下,会使用默认的
 * routingKey:路由键
 * props:配置信息
 * body:发送的消息数据
 */
String body = "test";
channel.basicPublish("", "testQueue", null, body.getBytes());
connection.close();

消费者代码

public static void main(String[] args) throws IOException, TimeoutException {
    channel.basicConsume("testQueue", true, new DefaultConsumer(channel) {
        // 收到消息后,会自动执行该回调方法

        /**
         *
         * @param consumerTag 消息的唯一标识
         * @param envelope 获取一些信息,交换机信息,路由key等
         * @param properties 配置信息
         * @param body 收到的真实数据
         * @throws IOException 抛出的异常
         */
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println("consumerTag:" + consumerTag + "envelope:" + envelope.toString() + "properties:" + properties + "body:" + new String(body));
        }
    });
    // 监听程序,需要关闭吗?
    // 不需要
    // connection.close();

}

4. Rabbit工作模式详解

4.1 工作队列模式

这种模式相对于简单模式,多了一个或者一些消费端,多个消费端共同消费同一个队列中的消息,但是要注意,同一条信息只能被一个consumer消费,因此这些消费者是处于一个竞争的状态

应用状态:对于任务过重的模式,可以使用这种模式,可以由多个消费者进程同时处理任务

4.2 Pub/Sub 发布订阅模式

多了一个Exchange角色,而且过程略有变化

  • Exchange:交换机(X),一方面,接收生产者发送的消息,另一方面,知道如何处理消息,例如递交给某个特别的队列,递交给所有的队列,或者是将消息丢弃,至于如何操作,需要取决于Exchange的类,常见的有以下三种类型
    • Fanout:广播,将消息交给所有绑定到交换机的队列
    • Direct:定向,把消息交给符合指定routing key的队列
    • Topic:通配符,把消息交给符合routing pattern(路由模式)的队列

Exchange只负责转发消息,不具备存储消息的能力,因此没有任何队列与Exchange绑定,或者没有命中路由规则,那么消息就会丢失

/**
 * 1. exchange:交换机的名称
 * 2. type:交换机类型,BuiltinExchangeType.DIRECT等
 * 3. durable:是否持久化
 * 4. autoDelete:是否自动删除交换机
 * 5. internal:是否内部使用,一般false
 * 6. arguments:参数
 */
channel.exchangeDeclare();

4.3 Routing

  • 队列与交换机绑定,不能是任意绑定,而是需要指定一个RoutingKey
  • 消息的发送方在向Exchange发送消息的时候,也必须要执行消息的RoutingKey
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的RoutingKey进行判断,只有队列的RoutingKey与消息的RoutingKey完全一致才会收到消息

文章作者: 穿山甲
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 穿山甲 !
  目录