0%

消息队列

1. 消息队列的优缺点?

  • 异步处理 - 相比于传统的串行、并行方式,消息队列使服务端能够异步处理请求,提高了系统吞吐量。
  • 应用解耦 - 系统间通过消息通信,不用关心其他系统的处理。
  • 流量削锋 - 可以通过消息队列长度控制请求量;可以缓解短时间内的高并发请求。
  • 日志处理 - 解决大量日志传输。
  • 消息通讯 - 消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯。比如实现点对点消息队列,或者聊天室等。

解耦、异步、削峰是什么?

  • 解耦:A 系统发送数据到 BCD 三个系统,通过接口调用发送。如果 E 系统也要这个数据呢?那如果 C 系统现在不需要了呢?A 系统负责人几乎崩溃…A 系统跟其它各种乱七八糟的系统严重耦合,A 系统产生一条比较关键的数据,很多系统都需要 A 系统将这个数据发送过来。如果使用 MQ,A 系统产生一条数据,发送到 MQ 里面去,哪个系统需要数据自己去 MQ 里面消费。如果新系统需要数据,直接从 MQ 里消费即可;如果某个系统不需要这条数据了,就取消对 MQ 消息的消费即可。这样下来,A 系统压根儿不需要去考虑要给谁发送数据,不需要维护这个代码,也不需要考虑人家是否调用成功、失败超时等情况。

-** 异步:**A 系统接收一个请求,需要在自己本地写库,还需要在 BCD 三个系统写库,自己本地写库要 3ms,BCD 三个系统分别写库要 300ms、450ms、200ms。最终请求总延时是 3 + 300 + 450 + 200 = 953ms,接近 1s,用户感觉搞个什么东西,慢死了慢死了。用户通过浏览器发起请求。如果使用 MQ,那么 A 系统连续发送 3 条消息到 MQ 队列中,假如耗时 5ms,A 系统从接受一个请求到返回响应给用户,总时长是 3 + 5 = 8ms

  • 削峰:将请求统统放到消息队列里去,服务器从消息队列中取任务,这样可以减少高峰时期对服务器压力。

缺点

  • 系统可用性降低:加入个消息队列进去,那消息队列宕机了了,你的系统也就无法使用。因此,系统可用性会降低;
  • 系统复杂度提高:加入了消息队列,要多考虑很多方面的问题,比如如何保证消息不被重复消费、如何保证消息可靠性传输等。因此,需要考虑的东西更多,复杂性增大。

2. RabbitMQ怎么保证消息的可靠性?

RabbitMQ消息的传输会经过三个阶段从生成者到Rabbit服务器的队列再到消费者,因此我们在三个阶段采取来保证消息的可靠性:

  • 生产者丢失数据:在消息还未到达RabbitMQ服务器上时,未防止RabbitMQ服务器宕机导致的消息丢失,我们可以将信道设置成 confirm 模式(发送方确认模式)一旦channel进入confirm模式,所有在该信道上发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后;rabbitMQ就会发送一个ACK给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了;如果rabbitMQ没能处理该消息,则会发送一个Nack消息给你,你可以进行重试操作。 >- 另外一个方案是走事务机制,送消息前,开启事务(channel.txSelect()),然后发送消息,如果发送过程中出现什么异常,事务就会回滚(channel.txRollback()),如果发送成功则提交事务(channel.txCommit())。然而,这种方式有个缺点:吞吐量下降;

  • 消息队列丢数据:开启RabbitMQ的持久化策略,将消息持久化进磁盘。和confirm机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个Ack信号。这样,如果消息持久化磁盘之前,rabbitMQ阵亡了,那么生产者收不到Ack信号,生产者会自动重发。 >操作和简单:1、将queue的持久化标识durable设置为true,则代表是一个持久的队列;2、 发送消息的时候将deliveryMode=2

  • 消费者丢失数据:消费者丢数据一般是因为采用了自动确认消息模式,我们将它改为手动确认即可

3. 交换机Exchange有哪些模式

  • fanout交换器:它会把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中;
  • direct交换器:direct类型的交换器路由规则很简单,它会把消息路由到那些Routing_Key完全匹配的队列中;
  • topic交换器:匹配规则比direct更灵活。
  • headers交换器:根据发送消息内容的headers属性进⾏匹配(由于性能很差,不实⽤)

3. 消息基于什么传输?

由于TCP连接的创建和销毁开销较大,且并发数受系统资源限制,会造成性能瓶颈。 RabbitMQ使⽤信道的方式来传输数据。信道是建⽴在真实的TCP连接内的虚拟连接,且每条 TCP连接上的信道数量没有限制。

  • RabbitMQ采用类似非阻塞IO(Non-blocking I/O)做法,选择TCP连接复⽤,不仅可以减少性能开销,同时也便于管理。
  • 每个线程把持一个信道,所以信道复用了Connection的TCP连接。同时RabbitMQ可以确保每个线程的私密性,就像拥有独立的连接一样。

4. 如何避免消息重复投递或重复消费?

消费端处理消息的业务逻辑保持幂等性。只要保持幂等性,不管来多少条重复消息,最后处理的结果都一样。如保证每条消息都有唯一编号且保证消息处理成功与去重表的日志同时出现。利用一张日志表来记录已经处理成功的消息的 ID,如果新到的消息 ID 已经在日志表中,那么就不再处理这条消息。

5. 如何保证消息的顺序

  • (1)保证生产者 - MQServer - 消费者是一对一对一的关系。 缺陷:
    • 并行度就会成为消息系统的瓶颈(吞吐量不够)
    • 更多的异常处理,比如:只要消费端出现问题,就会导致整个处理流程阻塞,我们不得不花费更多的精力来解决阻塞的问题。
  • (2)通过合理的设计或者将问题分解来规避。
    • 不关注乱序的应用实际大量存在
    • 队列无序并不意味着消息无序 所以从业务层面来保证消息的顺序而不仅仅是依赖于消息系统,是一种更合理的方式。

6. 如果有一个消息一直得不到消费,比如客户端宕机且设置手动确认,然后一直未手动确认,此时怎么办?

在RabbitMQ中,当消费者未正常消费消息,由于未收到来自消费者的确认,就会进行重试策略,但重试策略必须有一个次数,不然一直有这样的消息得不到消费,一直占用队列资源就造成吞吐量的低下。

因此当一个消息在队列中达到一定的重试次数后仍然无法被成功处理,或者因为某些原因(如TTL过期、队列达到最大长度等)被队列拒绝返回nack时,RabbitMQ可以将这些消息发送到另一个交换机(Dead Letter Exchange),进而路由到一个专门的队列(Dead Letter Queue, DLQ),即死信队列

DLQ中的消息代表了那些经过多次尝试仍无法被成功处理的消息。你可以有一个专门的消费者来监听DLQ,并对这些消息进行进一步的处理,例如记录日志、发送告警或者进行人工干预。

7. RabbitMQ基本概念

  • Broker: 简单来说就是消息队列服务器实体
  • Exchange: 消息交换机,它指定消息按什么规则,路由到哪个队列
  • Queue: 消息队列载体,每个消息都会被投入到一个或多个队列
  • Binding: 绑定,它的作用就是把exchange和queue按照路由规则绑定起来
  • Routing Key: 路由关键字,exchange根据这个关键字进行消息投递
  • VHost: vhost 可以理解为虚拟 broker ,即 mini-RabbitMQ server。其内部均含有独立的 queue、exchange 和 binding 等,但最最重要的是,其拥有独立的权限系统,可以做到 vhost 范围的用户控制。当然,从 RabbitMQ 的全局角度,vhost 可以作为不同权限隔离的手段(一个典型的例子就是不同的应用可以跑在不同的 vhost 中)。
  • Producer: 消息生产者,就是投递消息的程序
  • Consumer: 消息消费者,就是接受消息的程序
  • Channel: 消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务

8. 什么原因会导致 MQ 消息积压?

MQ消息积压是指生产者发送的消息在Broker端大量堆积,无法被消费者及时消费,导致业务功能无法正常使用。以下是一些导致MQ消息积压的常见原因:

-** 流量变大而服务器配置偏低:当消息的产生速度大于消费速度时,如果RabbitMQ服务器配置较低,就可能导致消息积压。 - 消费者故障:如果消费者出现宕机或网络问题,导致无法及时消费消息,消息会持续堆积。 - 程序逻辑设计问题:如果生产者持续生产消息,但消费者由于某种原因(如处理逻辑耗时过长)消费能力不足,也会造成消息积压。 - 新上线的消费者功能存在BUG:新上线的消费者功能如果有缺陷,可能导致消息无法被正常消费,从而引发消息堆积。 - 配置不合理:消息队列的容量设置过小或消费者的线程数设置过少,都可能导致消息积压。 - 生产者推送大量消息**:在特定场景下,如大促活动,生产者可能短时间内推送大量消息至Broker,如果消费者的消费能力不足以应对这种突发流量,也会导致消息堆积。

了解决MQ消息积压问题,可以采取以下策略:

  • 扩容:纵向扩容,增加服务器资源,如内存和CPU;横向扩容,将单机改为集群模式,增加集群节点,并增加消费者数量。
  • 优化程序逻辑:确保生产者和消费者的逻辑设计合理,避免生产者过快生产消息或消费者处理消息过慢。
  • 监控和报警:建立有效的监控和报警机制,及时发现并解决消息积压问题。

8. 有几百万消息持续积压几小时,怎么办?

消息积压处理办法:临时紧急扩容

  • 1.先修复 consumer 的问题,确保其恢复消费速度,然后将现有 cnosumer 都停掉。
  • 2.新建一个topic,临时建立好原先 10 倍的 queue 数量。
  • 3.然后写一个临时的分发数据的 consumer 程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的 10 倍数量的 queue。
  • 4.接着临时征用 10 倍的机器来部署 consumer,每一批 consumer 消费一个临时 queue 的数据。这种做法相当于是临时将 queue 资源和 consumer 资源扩大 10 倍,以正常的 10 倍速度来消费数据
  • 5.等快速消费完积压数据之后,得恢复原先部署的架构,重新用原先的 consumer 机器来消费消息。

9 与其他消息队列的区别比较?

ctiveMQ:的社区算是比较成熟,但是较目前来说,ActiveMQ 的性能比较差,而且版本迭代很慢,不推荐使用。

RabbitMQ:在吞吐量方面虽然稍逊于 Kafka 和 RocketMQ ,但是由于它基于 erlang 开发,所以并发能力很强,性能极其好,延时很低,达到微秒级。但是也因为 RabbitMQ 基于 erlang 开发,所以国内很少有公司有实力做erlang源码级别的研究和定制。如果业务场景对并发量要求不是太高(十万级、百万级),那这四种消息队列中,RabbitMQ 一定是你的首选。如果是大数据领域的实时计算、日志采集等场景,用 Kafka 是业内标准的,绝对没问题,社区活跃度很高,绝对不会黄,何况几乎是全世界这个领域的事实性规范。

RocketMQ:阿里出品,Java 系开源项目,源代码我们可以直接阅读,然后可以定制自己公司的 MQ,并且 RocketMQ 有阿里巴巴的实际业务场景的实战考验。RocketMQ 社区活跃度相对较为一般,不过也还可以,文档相对来说简单一些,然后接口这块不是按照标准 JMS 规范走的有些系统要迁移需要修改大量代码。还有就是阿里出台的技术,你得做好这个技术万一被抛弃,社区黄掉的风险,那如果你们公司有技术实力我觉得用RocketMQ 挺好的。

kafka:特点其实很明显,就是仅仅提供较少的核心功能,但是提供超高的吞吐量,ms 级的延迟,极高的可用性以及可靠性,而且分布式可以任意扩展。同时 kafka 最好是支撑较少的 topic 数量即可,保证其超高吞吐量。kafka 唯一的一点劣势是有可能消息重复消费,那么对数据准确性会造成极其轻微的影响,在大数据领域中以及日志采集中,这点轻微影响可以忽略这个特性天然适合大数据实时计算以及日志收集。

10 RabbitMQ 上的⼀个 queue 中存放的 message 是否有数量限制?

可以认为是没有限制,因为限制取决于机器的内存,但是消息过多会导致处理效率的下降。