消息队列入门知识


1. 什么是消息队列?

从操作系统的层面上讲:消息队列是在操作系统中维护的一个内核结构,这个结构可以被用于做一个进程间通信,具体的原理是在内核中生成一条消息队列,然后供订阅它的进程取数据和放数据,但是它的缺点就是存放的消息格式具有一定的限制,同时大小也具有一定的限制,在每次存取数据的时候,还需要发生内核态和用户态的切换

开发中用的分布式消息队列:首先它是一种先进先出的队列(数据结构),部署在专门服务器上,属于中间件的一种,工作的方式是这样的:

远端服务器上部署一个消费者进程,然后消费者进程不断往消息队列中指定的topic中取数据

远端服务器上部署一个生产者进程,然后生产者进程不断往消息队列中指定的topic中放数据

以此来达到生产者和消费者之间的一个解耦

2. 消息队列的作用是什么?

使用消息队列可以带来以下的好处:

  • 通过异步处理来提高QPS,比如说在项目中使用的Redis的消费者组的功能,通过这个功能,我们的用户不需要等到全部数据入库到MySQL中才返回秒杀结果,而是在判断库存后,生产一条消息,然后将这条消息投递到消息队列中,然后我们的后端服务器保证这个消息被可靠消费,这样的话就可以实现用户的响应得到了加快,提高了系统的性能
  • 削峰/限流:这个问题在腾讯一面的时候被问过,这里的话再深入理解一下

原问题:如果你服务端的QPS无限制地提高,会不会大量的请求到来,然后把你的服务端打宕机呢?

会的。这种情况通常是因为我们没有对我们服务器能够承受的QPS做一个预估,当超过的时候,有两种思路:

(1)将超过能力范围的请求拒绝掉,返回限流错误到客户端,限流

(2)将超过能力范围的请求忽略掉,直接返回服务端错误,削峰

这两种策略本质上都是为了降低QPS到我们服务器能够承受的范围之内,概括起来就三点,错峰限流削峰

下面简要说明一下这三种东西的实现原理

什么是错峰策略?

错峰:将请求峰值错开,服务器的QPS峰值可以看做是一系列的QPS峰值请求的叠加,因此可以通过一些技术手段来错开这些小峰值,从而降低最大的QPS,可以从服务端和客户端两个角色进行错峰

  • 服务端错峰策略:如果触发客户端的请求动作是通过服务端主动下发实现的,那么服务端就可以分批下发给客户端
  • 客户端错峰策略:常用的有一个随机算法,客户端采用随机的延迟,每个客户端在一定时间范围内,以一定的延迟去触发请求动作

什么是限流策略?

限流策略主要是为了消除频繁或者不必要的请求,常见的限流策略有令牌桶等

什么是削峰策略?

削峰从本质上来说就是更多地延缓用户的请求,但是不拒绝用户的请求,以及层层过滤用户的访问需求,遵从一个最后落地到数据库的请求数在单位时间内要尽可能少

消息队列解决削峰:用消息队列来缓冲瞬时流量,把同步的直接调用转换成异步的间接推送,中间通过一个队列在一端承接瞬时的流量洪峰,另一端平滑地将消息推送出去

典型的就是项目中将用户的请求存储到消息队列之后就立即返回结果,然后系统再对消息进行消费,因为用户请求数据写入消息队列之后就立即返回给用户了,但是请求数据在后续的业务校验,数据库操作中可能会失败,因此在使用消息队列进行处理之后,需要定义一种半成功的状态,这种状态就是后端服务器收到了请求,但是没有处理完毕,一种可行的方案就是在处理完这个请求之后,通过回调电子邮件或者是短信通信的方式来让用户确认结果

过滤掉一些无用的请求:它就像一个漏斗一样,通过在不同层过滤无效的请求,通过CDN过滤掉大量的图片,静态资源的请求,然后再通过Redis这样的分布式缓存,将一些读请求拦截在上游,避免落到下游数据库中

这样的削峰策略会带来什么问题?

  • 对写数据进行基于时间的合理分片,过滤掉过期的失效请求
  • 对写请求做限流的保护,将超出系统承载能力的请求给过滤掉
  • 设计到的读数据不做强一致性校验,减少因为一致性校验而产生的性能瓶颈

3. 消息队列是如何降低系统的耦合性的?

使用消息队列可以降低系统的耦合性,如果模块之间不存在直接调用,那么新增模块/修改模块就对其他模块的影响较小,这样的话对系统的可扩展性就会更好一些

生产者(客户端)发送消息到消息队列中去,接收者(服务端)处理消息,需要消费的系统直接去消息队列中取消息进行消费而可以不需要和其他系统耦合,这也提高了系统的扩张性

消息队列使用一个发布-订阅模式/点对点订阅模式(一个消息只有一个消费者)进行工作,消息发送者(生产者)发布消息,一个或者多个消息接收者订阅消息

对于新增的业务,只要对该类消息感兴趣的话,就可以订阅这个消息,对原有系统和业务没有任何的影响,从而实现网站业务的可扩展性的设计

消息接收者对消息进行过滤,处理,包装之后,构建一个新的消息类型,将消息继续发送出去,等待其他消息接收者订阅这个消息,因此这是一个基于事件驱动的模型

为了避免消息队列服务器宕机造成消息的丢失,会将成功发送到消息队列的消息存储在消息生产者服务器上,等到消息被真正消费者服务器处理后才会删除消息,在消息队列服务器宕机之后,生产者服务器会选择分布式消息队列服务器集群中的其他服务器发布消息

4. 如何基于消息队列实现分布式事务?

分布式事务的解决方案之一是MQ

5. 使用消息队列会带来哪些问题?

  • 系统的可用性会降低:系统可用性在某种程度上会降低,在加入MQ之前,不需要考虑消息丢失或者MQ挂掉的情况,但是在引入之后就需要考虑
  • 系统的复杂性提高:加入MQ之后,需要保证消息没有被重复消费,还要处理消息丢失,保证消息传递的顺序问题
  • 一致性问题:万一消息的真正消费者没有正确消费消息怎么办?(指的是没有按照规定的协议处理消息),这样就可能导致数据不一致了

6. 什么是JMS?

Java(JavaMessageServiceJava)消息服务,是Java的消息服务,JMS的客户端之间可以通过JMS服务进行异步的消息传输。JMSAPI是一个消息服务的标准或者规范,允许应用程序基于JavaEE平台创建、发送、接收和读取消息,它使得分布式通信耦合度更低,消息服务更加可靠以及实现异步性

点对点(P2P)模型:使用队列(Queue)使用消息通信的载体,满足生产者与消费者模式,一条消息只能够被一个消费者使用,未被消费的消息在队列中保留知道消费或者超时。

发布/订阅(pub/sub)模型:发布订阅模型(Pub/Sub)使用主题(Topic)作为消息通信载体,类似于广播模式,发布者发布一条消息,这个消息通过主题传递给所有的订阅者,在一条消息广播之后才订阅的用户则是收不到这条消息的

7. AMQP是什么?

Advanced Message Queuing Protocol,一个提供统一消息服务应用层标准(高级消息队列协议,二进制应用层协议),是应用层协议的一个开发标准,为面向消息的中间件设计的,兼容JMS,基于这个协议的客户端和消息中间件可以传递消息,不接收客户端/中间件同产品,不同开发语言等条件的限制

AMQP为消息定义了线路层(wire-level protocol)的协议,而JMS所定义的是API规范

8. RPC和消息队列有什么区别?

RPC是一个应用框架,或者说是一种思想,而消息队列是一个中间件

  • 从用途来看:RPC通过屏蔽底层复杂的网络通信细节以及提供自定义协议接口来实现两个服务之间的远程调用通信,这个通信过程就好像调用本地方法一样简单,它本身不涉及存储,存储的数据仅仅只是远端服务器之间的在线和离线情况,基于注册中心实现。而消息队列主要用来降低系统的耦合性,实现任务的异步,有效地进行流量削峰
  • 从通信方式上来看:RPC是直接双向网络通信,也就是远程端底层通过Socket这样的套接字来描述连接,而消息队列则是通过服务端<=>消息队列<=>客户端这样的方式来实现的
  • 从架构上来看,消息队列需要把消息存储起来,RPC没有这个要求
  • 从请求处理的时效性来看,通过RPC发出的调用一般来说会被立即处理,但是存放在消息队列中的消息不一定也会被立即处理

9. 什么是Kafka?主要的应用场景有哪些?

Kafka是一个分布式流式计算平台,流平台具有三个关键的功能:

  • 消息队列:发布和订阅消息流,这个功能类似于消息队列,因此它可以作为一个消息队列来进行使用
  • 容错的持久方式存储记录消息流:Kafka会把消息持久化到磁盘,有效避免了消息丢失的封信
  • 流式处理平台:在消息发布的时候进行处理,Kafka提供了一个完整的流式处理类库

Kafka主要有两大的应用场景:

  • 消息队列:建立实时流数据管道,以可靠地在系统或者应用程序之间获取数据
  • 数据处理:构建实时的流数据处理程序来转换或者处理数据流

10. Kafka的优势在哪里?

Kafka的优势主要如下:

  • 极致的性能:基于ScalaJava语言进行开发,设计中大量使用了处理和异步的思想,最高可以每秒处理前往级别的消息
  • 生态系统的兼容性无可匹敌:Kafka与周边生态系统的兼容性是最好的,尤其是在大数据和流计算领域

Kafka在过去曾经是不完善的,而通过逐步的迭代,目前已经相对完善了

11. 简述队列模型?Kafka的消息模型是怎么样的?

队列模型:使用队列Queue作为消息通信的载体,满足生产者与消费者模式,一条消息只能够被一个消费者使用,没有被消费的消息在队列中保留知道被消费或者超时,但是这种模型有一定的局限性:

  • 需要将生产者产生的消息分发给多个消费者,并且每个消费者都能够接收到完整的消息内容

一种解决办法是这样的:为每个消费建立一个单独的队列,让生产者发送多份,这是一种非常愚蠢的做法。

  • 浪费空间
  • 使用消息队列,是为了解耦生产者和消费者之间的关系,如果是按照上面这种做法的话,那么生产者就要实时去感知消费者的上下线情况,这样就无法解耦了

发布-订阅模型(Pub-Sub):通过使用主题Topic作为消息通信的载体,类似于一种关闭模式,发布者发布一条消息,这个消息通过主题传递给所有的订阅者。

在发布-订阅模型中,如果只有一个订阅者,那么它和队列模型就基本上就是一样的了,因此发布订阅模型可以兼容队列模型。

12. Kafka中的核心概念

总体上来看,Kafka将生产者发布消息发送到Topic中,需要这些消息的消费者可以订阅这些Topic

Producer

产生消息的一方,这个应用程序将生产出来的消息发送到指定Broker一个Topic

Consumer

消费消息的一方,从Topic中取出数据,然后消费者从Topic中读取数据进行消费

Broker

可以看做是一个独立的kafka实例,多个Kafka Broker组成一个Kafka Cluster,由一个唯一的ID进行表示,负责消息的存储和转发,而每个Kafka集群中会选举一个Broker作为Controller,它负责执行分区、副本的分配、replica leader选举,数据的调度复制和迁移

每个Broker包含了有TopicPartition这些概念

Topic

Producer将消息发送到特定的主题,Consumer通过订阅特定的Topic来消费消息,Kafka维护消息的种类,每一类消息由Topic进行标识

Partition

Partition(分区):它属于Topic的一部分,一个Topic中可以有多个Partition,并且在同一个Topic下的Partition可以分布在不同的Broker上,这也就表明一个Topic可以跨越多个Broker,它被称为是Topic物理上的分组

Zookeeper

Zookeeper负责维护整个kafka集群的状态,存储Kafka各个节点的信息以及状态,实现Kafka集群的高可用,协调Kafka的工作内容

13. Zookpeer在Kafka中扮演了什么样的角色

Broker和Consumer有使用到Zookpper,而Producer没有使用到Zookpper,这是因为Producer不需要根据Zookpeer来获取集群的状态

在底层的实现中,Producer不需要从ZooKeeper来获取集群的状态,而是在Producer的配置信息中指定多个Broker节点来进行信息的发送,同 时和指定的Broker进行连接的建立,然后通过相关指令来获取集群中的状态信息。通过这个info就可以知道这时候Producer可以知道集群中有多少个Broker是否在存活状态,然后每个Broker上的Topic上有多少个Partition,然后Producer会将这些元信息读取到Producer的内存中

如果Producer向集群中一台Broker节点发送信息超时等故障,那么Producer会主动刷新该内存中的元信息,以此来获取当前Broker集群中的最新状态。

关键:leader选举和follower信息同步

从这个图可以看出,在Kafka集群中,操作数据的基本单位是Partition(分片),每个topic下设置了若干个分片,总体来说,这个图是干这样的事情:leader处理partition的所有读写请求,与此同时,follower会被动定期地去复制leader上的数据,Topic的分区被放在不同的Broker中,保证ProducerConsumer错开访问Broker,避免访问单个Broker造成过度的IO压力,使得负载均衡,注意这边涉及到了一定的负载均衡算法,通过这个负载均衡算法可以使得这些topic上的分区分布在不同的Broker上,可以以一定的概率限制生产者和消费者同时访问同一个Broker

下面来讲讲Zookeeper的具体作用:

关于Broker的注册

Broker是一个kafka的实例,如果我们要实现一个topic形成不同的分片,然后将这些分片分配到不同的Broker上的时候还要能够保证我们在访问这个kafka集群的时候,能够获取到这个topic上的完整数据,这就要求我们有一个根目录服务器,这个根目录服务器就充当一个注册系统,就可以将整个集群中Broker管理起来,这时候就用到了Zookeeper,在ZK上会有一个专门用来进行Broker服务器列表记录的节点,这样的话就可以知道当前集群中有几个Broker实例

每个Kakfa实例启动的时候,都会连接到ZK上,也就是到/brokers/ids下创建属于自己的节点,Kakfa使用了全局唯一性ID来指代每一个Broker服务器,创建完节点之后,每个Broker就会将自己的IP地址和端口信息记录到这个节点上去,其中Broker创建的节点类型是临时节点,一旦Broker宕机,那么对应的临时节点也会被删除

这个就很像项目中,我们启动一个vm_client,然后向redis中写入一个ID=>{IP,port}这样的东西!

关于Topic的注册

Kafka中,同一个topic的消息会被分成多个分区并且将其分布在多个Broker上,这个分区信息以及和Broker的对应关系也都是由Zookpeer在维护,由专门的节点来维护的,比如说/borkers/topics

kafka中每个topic都会以/brokers/topic/[topic]的形式被记录下来,比如说我们起了一个topic叫做login,当这个Broker服务器启动之后,会对应到Topic节点(/brokers/topics)上,这时候会生产一个全局唯一性的ID然后写入针对该Topic的分区总数,比如说我们会看到/brokers/topics/login/3->2,它表示意思讲,对于login这个topic的消息,提供了两个分区进行消息的存储,同样,这个分区节点也是临时节点

可以实现生产者的负载均衡

由于同一个Topic消息会被分区并且将其分布在多个Broker上,因此生产者需要将消息合理地发送到这些分布式的Broker上,也就说说,当一个Topic可以选择若干个分区发送的时候,这时候应该要基于一定的负载均衡算法,将相关的分片发送到合理的Broker上来保证负载均衡,在负载均衡层面,Kafka支持传统的四层负载均衡算法,也支持ZK方式来实现负载均衡

(1)四层的负载均衡,根据生产者的IP地址和端口来为其确定一个相关联的Broker,通常来说,一个生产者只会对应单个Broker,然后这个生产者产生的消息都会发送到这个Broker上,这种方式的逻辑十分简单,每个上生产者不需要和其他系统建立额外的TCP连接,只需要和Broker维护单个TCP连接就可以了,但是无法做到真正的负载均衡,因为实际系统中的每个生产者产生的消息量以及每个Broker的消息存储量都是不同的,如果有些生产者产生的的消息远多于其他生产者的话,那么就会导致不同的Broker接收到的消息总数差异绝大,同时生产者也无法实时感知到Broker的上线和下线

(2)基于ZK实现的负载均衡,当每个Broker启动的时候,都会完成Broker注册过程,生产者会通过这个节点的变化来动态感知到Broker服务器列表的变更,这样就可以实现动态的负载均衡机制

可以实现消费者的负载均衡

与生产者类似,Kafka中的消费者同样需要进行负载均衡来实现多个消费者合理地从对应的Broker服务器上接收消息,每个消费者分组包含若干个消费者,每条消息都会发送给分组中一个消费者,不同的消费者分组消费自己特定的Topic下的消息,互不干扰

分区和消费者的关系:在Kafka中,规定了每个消息分区只能被同组的一个消费者进行消费

消费组(Consumer Group):consumer group下有多个Consumer,对于每个消费者组Consumer Group,Kafka都会为其分配一个全局唯一的Group ID,Group内部的所有消费者共享该ID,订阅topic下的每个分区只能分配给某个group下的一个consumer(当然这个分区还可以被分配给其他的group)

同时,kafka为消费者分配一个ConsumerID,通常会采用Hostname:UUID的形式来表示

需要在ZK上记录了消息分区和Consumer之间的关系,每个消费者一旦确定了对一个消息分区的消费权利的时候,就需要将这个consumerID写入到ZK对应消息分区的临时节点上

消息 消费进度Offset记录

在消费者对指定消息分区进行消息消费的过程中,需要定时地将分区消息的消费进度Offset记录到Zookpeer上,以便在该消费者进行重启或者其他消费者重新接管这个消息分区的消息消费后,能够从从之前的进度开始消费,Offset在ZK中由一个专门的节点进行计入

消费者注册

消费者服务器在初始化启动的时候加入消费者分组的步骤如下:

(1)注册到消费者分组,每个消费者服务器启动的时候,都要到ZK的指定节点下创建一个属于自己的消费者节点,完成节点的创建之后,消费者就会将自己订阅的Topic信息写入这个临时节点

(2)对消费者分组中的消费者的变化注册监听,每个消费者都需要关注所属的消费者分组中其他消费者服务器的变化情况,也就是说对/consumer/[group_id]/ids节点注册子节点变化的Watcher监听,一旦发现了消费者新增或者减少,就会触发消费的负载均衡

(3)对Broker服务器变化注册监听,如果发现了Broker服务器列表发生了变化,那么就根据具体情况来决定是否需要进行消费者负载均衡

(4)进行消费者负载均衡。为了让同一个Topic下不同分区的消息尽量均衡地被多个 消费者 消费而进行 消费者 与 消息 分区分配的过程,通常,对于一个消费者分组,如果组内的消费者服务器发生变更或Broker服务器发生变更,会发出消费者负载均衡

14. Kafaka的多分区以及多副本机制有什么用?

Kafaka为分区Partition引入了多副本Replica机制,愤分区中的多个副本之间会有一个leader,其他副本称为是follower,发送的消息被发送到leader副本,然后follower副本才能从leader副本中拉取消息进行同步。你可以这样理解:

  • 生产者和消费者只和leader副本交互,而其他副本只是leader副本的拷贝,他们的存在只是为了保证消息存储的安全性,当leader副本发生故障的时候,会从follower中选举出一个leader,但是follower中如果有和leader同步程度达不到要求的参加不了leader的选举

15. Kafka如何保证消息的消费顺序?

在使用消息队列的过程中经常有业务需要严格保证消息的消费顺序的,比如说同时发布了两个消息,有:

  • 更改用户会员登记
  • 根据会员登记计算订单的价格

假设这两条消息的消费顺序不一样,那么就会造成最终结果的不同

Kafka中,一次消息的投递的关联是这样的:当发送一个消息的时候,这时候会发送一个Partition到指定的Topic中,于是在一次性发送多条消息的时候,实际上就是给特定的Topic发送多个Partition

每次添加消息到Partition的分区的时候都会采用尾插法,Kafka只能够为我们保证Partition中的消息是有序的,消息在被追加到Partition(分区)的时候都会分配一个特定的偏移量offset,Kafka通过偏移量来保证消息在分片内的顺序性,而无法保证Topic的有序性

于是,我们就有一个很简单的办法,就是一个Topic只对应一个Partition,但是这样破坏了Kafka的初衷

其次,我们可以人为规定将Partition发送到哪个Partition上,我们可以指定topicpartitionkeydata,于是,我们可以在发送消息的时候,指定你的这个key/Partition

16. Kafka如何保证消息不丢失?

如果生产者丢失消息了怎么办?

生产者调用send方法发送消息之后,消息可能因为网络问题没有发送过去,所以不能在调用send()之后就默认消息发送成功了,为了确保消息发送是成功的,要判断发送的结果,要注意,在Java中调用send()方法实际上是异步的操作,也就是说我们用get()会被阻塞,但是我们可以针对这个future添加一个callbackfunction来实现

另外可以推荐为Producer的重试次数设置一个比较合理的值,一般来说是3,但是为了保证消息不丢失的话,这里一般会设置大一些

如果消费者丢失消息了怎么办?

消息在被追加到Partition的时候都会分配一个特定的offset,这个offset表示你这个consumer消费到的分区所在的位置,当消费者拉取到了这个分区的某个消息之后,消费者自动提交了offset,自动提交就会有一个问题,不能保证可靠消费,但是关闭自动提交,非原子性的操作又会造成重复消费

如果kafka丢失消息了怎么办?

我们知道Kafka为分区引入了多副本的机制,分区中有两个角色,分别是leader和follwer,接收消息和处理消息都是在leader副本上实现的,然后follower副本不断从leader上拉取最新的数据。

假设有这样的情况,假设leader副本所在的broker突然挂掉了,那么这就意味着我们就要从follower上重新选出一个leader,但是leader上的数据还要一些没有被follower副本的同步的话,就会造成丢失

解决方法如下:

设置acks=all:通过设置这个参数,当我们配置acks=all表示只有所有的副本全部收到消息完成同步的时候,这时候生产者才会收到一个ack

设置replication.factor >=3:通过设置这个参数,这样可以保证至少有3个副本是完整同步的

设置min.insync.replicas>1:这样的配置代表消息至少要被写入到2个副本才算被成功发送

为了保证整个Kafka服务的高可用,必须要确保replication.factor > min.insync.replicas

反例:replication.factor <= min.insybc.replicas,如果我们只是有一个副本挂掉的话,整个分区就无法工作了

17. Kafka如何保证消息不重复消费?

Kafka出现消息重复消费的原因

  • 服务端已经消费的数据没有提交offset
  • Kafka侧因为服务端处理业务时间长或者网络连接等问题,将服务判断为死亡,触发了分区rebalance

解决方案:

  • 将消费消息做一个幂等校验,比如说Redis中的set,MySQL主键等天然幂等功能
  • 关闭自动提交,然后在代码中手动提交offset

如果处理完消息再提交

非原子性操作,容易造成消费的重复消费

拉取到消息就提交:会有消息丢失的风险,允许消息延时的场景,一般会采用这种方式,然后通过定时任务在业务不繁忙的时候左数据兜底


文章作者: 穿山甲
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 穿山甲 !
  目录