0%

RabbitMQ

这周忙于部门的技术分享会,整理了一份消息队列中间件的技术知识文档

  1. 什么是消息队列(MQ)?
  2. 使用MQ的优劣势?
  3. RabbitMQ构成
  4. AMQP协议
  5. 深入了解RabbitMQ
  6. 代码

消息队列中间件

队列:先进先出

消息队列:把要传输的数据放在队列里

为什么使用MQ?

  • 应用解耦

    在这个场景中,A 系统跟其它各种乱七八糟的系统严重耦合,A 系统产生一条比较关键的数据,很多系统都需要 A 系统将这个数据发送过来。A 系统要时时刻刻考虑 BCDE 四个系统如果挂了该咋办?要不要重发,要不要把消息存起来?

    一个系统或者一个模块,调用了多个系统或者模块,互相之间的调用很复杂,维护起来很麻烦。但是其实这个调用是不需要直接同步调用接口的,如果用 MQ 给它异步化解耦,也是可以的,你就需要去考虑在你的项目里,是不是可以运用这个 MQ 去进行系统的解耦。

    总结:通过一个 MQ,Pub/Sub 发布订阅消息这么一个模型,A 系统就跟其它系统彻底解耦了。

  • 异步处理

    再来看一个场景,A 系统接收一个请求,需要在自己本地写库,还需要在 BCD 三个系统写库,自己本地写库要 3ms,BCD 三个系统分别写库要 300ms、450ms、200ms。最终请求总延时是 3 + 300 + 450 + 200 = 953ms,接近 1s

    一般要求是每个请求都必须在 200 ms 以内完成,对用户几乎是无感知的。

    如果使用 MQ,那么 A 系统连续发送 3 条消息到 MQ 队列中,假如耗时 5ms,A 系统从接受一个请求到返回响应给用户,总时长是 3 + 5 = 8ms,对于用户而言,其实感觉上就是点个按钮,8ms 以后就直接返回了

  • 流量削峰

    每天 0:00 到 12:00,系统风平浪静,每秒并发请求数量就 50 个。结果每次一到 12:00 ~ 13:00 ,每秒并发请求数量暴增,大量的请求涌入 MySQL,可能就直接把 MySQL 给打死了,导致系统崩溃。但是高峰期一过,到了下午的时候,就成了低峰期,对整个系统几乎没有任何的压力

    如果使用 MQ,A 系统从 MQ 中慢慢拉取请求,每秒钟就拉取 2k 个请求,不要超过自己每秒能处理的最大请求数量就 ok,这样下来,哪怕是高峰期的时候,A 系统也绝对不会挂掉。只要高峰期一过,A 系统就会快速将积压的消息给解决掉。

使用MQ的劣势

  • 系统可用性降低

    系统引入的外部依赖越多,越容易挂掉。MQ 一挂,整套系统崩溃,你不就完了?如何保证消息队列的高可用?

  • 系统复杂度提高

    怎么保证消息没有重复消费?怎么处理消息丢失的情况?怎么保证消息传递的顺序性?

  • 一致性问题

    数据写入多个系统时需要保证一致性

    订单系统、如果用户下单支付成功,需要更新订单状态、减少库存、赠送积分。

深入了解RabbitMQ

AMQP

AMQP协议

  • 在OSI七层模型中属于应用层的协议(类似的还有HTTP、DNS、FTP)

  • 一个AMQP服务器类似于邮件服务器,exchange类似于消息传输代理(email里的概念),message queue类似于邮箱。Binding定义了每一个传输代理中的消息路由表,发布者将消息发给特定的传输代理,然后传输代理将这些消息路由到邮箱中,消费者从这些邮箱中取出消息。

  • AMQP术语

    • Channel(信道): 在AMQP模型中,我们不需要通过建立太多的TCP连接来实现。假如针对每一个AMQP连接都建立一个TCP连接的话,会占用大量的系统资源。对此,AMQP提供了通道(channel)机制。即,共享一个TCP连接,可创建多个通道。

      ​ 在多线程/进程的应用程序中正确做法是,对于每一个线程/进程,应分别建立一个通道,而不是多个线程/进程之间去共享一个通道。

    • Exchange(交换器):用于接受、分配消息;可以有好几种模式、相当于邮箱

    • Queue(队列):用于存储生产者的消息;

    • RoutingKey(路由键):用于把生成者的数据分配到交换器上;

    • BindingKey(绑定键):用于把交换器的消息绑定到队列上;

    • Broker(消息代理):消息代理会接收来自生产者(publishers/producers)生产的消息,并将它们路由(route,可以理解成按指定规则转发)给相应的消费者(consumers)手中。

    • VHOST(虚拟主机):为了在一个单独的代理上实现多个隔离的环境(用户、用户组、交换机、队列 等),AMQP 提供了一个虚拟主机(virtual hosts - vhosts)的概念。这跟 Web servers 虚拟主机概念非常相似,这为 AMQP 实体提供了完全隔离的环境。当连接被建立的时候,AMQP 客户端来指定使用哪个虚拟主机。

Exchange交换机

​ 交换机是用来发送消息的 AMQP 实体。

​ 交换机拿到一个消息之后将它路由给一个或零个队列。

​ 它使用哪种路由算法是由交换机类型绑定(Bindings)规则所决定的。

默认交换机

​ type:Default

​ 自动命名的直交换机

直连交换机

​ type:Direct

​ Routing Key==Binding Key,严格匹配

扇形交换机

​ type:Fanout

​ 把发送到该 Exchange 的消息路由到所有与它绑定的 Queue 中

主题交换机

​ type:Topic

​ Routing Key==Binding Key,模糊匹配

头交换机

​ type:Headers

​ 根据发送的消息内容中的 headers 属性进行匹配

Queue队列

  • 队列属性 todo
  • 队列创建 todo
  • 队列持久化 todo

Consumer消费者

  • 将消息投递给应用(push 模型)
  • 应用主动获取消息(pull 模型)

消息机制

1)自动确认模式:当消息代理(broker)将消息发送给应用后立即删除。(使用 AMQP 方法:basic.deliver 或 basic.get-ok)

2)显式确认模式:待应用(application)发送一个确认回执(acknowledgement)后再删除消息。(使用 AMQP 方法:basic.ack)

RabbitMQ

使用MQ可能遇到的各种情况

消息丢失怎么办?

发生在哪里,出现问题如何去预防排查解决。

  • Producer丢失

    就是生产者发送数据之前开启 RabbitMQ 事务channel.txSelect,然后发送消息,如果消息没有成功被 RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务channel.txRollback,然后重试发送消息;如果收到了消息,那么可以提交事务channel.txCommit

    开启 confirm 模式,在生产者那里设置开启 confirm 模式之后,你每次写的消息都会分配一个唯一的 id,然后如果写入了 RabbitMQ 中,RabbitMQ 会给你回传一个 ack 消息,告诉你说这个消息 ok 了。如果 RabbitMQ 没能处理这个消息,会回调你的一个 nack 接口,告诉你这个消息接收失败,你可以重试。

  • MQ内存中丢失

    开启 RabbitMQ 的持久化

    注意,哪怕是你给 RabbitMQ 开启了持久化机制,也有一种可能,就是这个消息写到了 RabbitMQ 中,但是还没来得及持久化到磁盘上,结果不巧,此时 RabbitMQ 挂了,就会导致内存里的一点点数据丢失。

    所以,持久化可以跟生产者那边的 confirm 机制配合起来,只有消息被持久化到磁盘之后,才会通知生产者 ack 了,所以哪怕是在持久化到磁盘之前,RabbitMQ 挂了,数据丢了,生产者收不到 ack,你也是可以自己重发的。

  • Comsumer丢失

    • 关闭自动ack,开启应用系统发送ack回执。

大量消息积压如何处理?

仅仅只修复consumer重新消费可能需要几小时

  1. 修复consumer
  2. 建立多个consumer进程去处理queue积压的数据
  3. 恢复原有部署
  • 讨论:假如出现死循环Producer,也要去考虑是否是生产者的问题。

消息过期失效?

  • RabbitMQ可以设置TTL过期时间
  • 在用户低峰期将过期的数据查出后重新写入MQ

如何保证消息的顺序性?

一个queue,多个consumer

  • 每个consumer对应一个queue,consumer可以用队列排序

如何保证消息不被重复消费?即保证数据幂等性

结合业务,多个层面控制

  • 设置每个请求的全局id
  • 数据库设置唯一键
  • 先对数据校验再做操作

会后复习资料

讨论

  • 要保证库存一致性的时候,即库存商品不会超买超卖,MQ很难做到这一点。MQ不适用于秒杀业务

  • 当消费端处于unack时,消费端会锁定队列中前3(默认值)个的消息。

    • $ch->setPrefetchCount($count); //设定最大unacked消息的条数,默认为3;值得注意的是,设置为0时表示无限制
  • 消息不被重复消费:

    • 使用uuid保证请求唯一性
    • Redis天然的幂等性
  • 关于Channel信道:

    在AMQP模型中,我们不需要通过建立太多的TCP连接来实现。假如针对每一个AMQP连接都建立一个TCP连接的话,会占用大量的系统资源。对此,AMQP提供了通道(channel)机制。即,共享一个TCP连接,可创建多个通道。

  • 关于vhost虚拟主机:

    为了在一个单独的代理上实现多个隔离的环境(用户、用户组、交换机、队列 等)

  • 关于消息丢失的三种情况,生产者发送消息丢失过程中使用confirm模式的解释

    RabbitMQ系列四

代码

consumer.php

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
$conn = new AMQPConnection($config);
try {
$conn->connect();
} catch (\AMQPConnectionException $AMQPConnectionException) {
exit('connect failed');
}

//创建一个channel
$ch = new AMQPChannel($conn);
$ch.basic_qos(prefetch_count=1)

//创建一个交换机
$ex = new AMQPExchange($ch);

//声明路由键
$routingKey = 'key_1';

//声明交换机名称
$exchangeName = 'exchange_1';

//设置交换机名称
$ex->setName($exchangeName);

//设置交换机类型
//AMQP_EX_TYPE_DIRECT:直连交换机
//AMQP_EX_TYPE_FANOUT:扇形交换机
//AMQP_EX_TYPE_HEADERS:头交换机
//AMQP_EX_TYPE_TOPIC:主题交换机
$ex->setType(AMQP_EX_TYPE_DIRECT);

//设置交换机持久
$ex->setFlags(AMQP_DURABLE);

//声明交换机
$ex->declareExchange();

//创建一个消息队列
$q = new AMQPQueue($ch);

//设置队列名称
$q->setName('queue_1');

//设置队列持久
$q->setFlags(AMQP_DURABLE);

//声明消息队列
$q->declareQueue();

//交换机和队列通过$routingKey进行绑定
$q->bind($ex->getName(), $routingKey);


//接收消息并进行处理的回调方法
function receive($envelope, $queue)
{
//休眠两秒,
sleep(2);
//echo消息内容
echo $envelope->getBody() . "\n";
//显式确认,队列收到消费者显式确认后,会删除该消息
echo 'without ack';
//$queue->ack($envelope->getDeliveryTag());
}

//设置消息队列消费者回调方法,并进行阻塞
$q->consume("receive");
//$q->consume("receive", AMQP_AUTOACK);//隐式确认,不推荐