这周忙于部门的技术分享会,整理了一份消息队列中间件的技术知识文档
消息队列中间件
队列:先进先出
消息队列:把要传输的数据放在队列里
为什么使用MQ?
应用解耦
在这个场景中,A 系统跟其它各种乱七八糟的系统严重耦合,A 系统产生一条比较关键的数据,很多系统都需要 A 系统将这个数据发送过来。A 系统要时时刻刻考虑 BCDE 四个系统如果挂了该咋办?要不要重发,要不要把消息存起来?
一个系统或者一个模块,调用了多个系统或者模块,互相之间的调用很复杂,维护起来很麻烦。但是其实这个调用是不需要直接同步调用接口的,如果用 MQ 给它异步化解耦,也是可以的,你就需要去考虑在你的项目里,是不是可以运用这个 MQ 去进行系统的解耦。
总结:通过一个 MQ,Pub/Sub 发布订阅消息这么一个模型,A 系统就跟其它系统彻底解耦了。
异步处理
再来看一个场景,A 系统接收一个请求,需要在自己本地写库,还需要在 BCD 三个系统写库,自己本地写库要 3ms,BCD 三个系统分别写库要 300ms、450ms、200ms。最终请求总延时是 3 + 300 + 450 + 200 = 953ms,接近 1s
一般要求是每个请求都必须在 200 ms 以内完成,对用户几乎是无感知的。
如果使用 MQ,那么 A 系统连续发送 3 条消息到 MQ 队列中,假如耗时 5ms,A 系统从接受一个请求到返回响应给用户,总时长是 3 + 5 = 8ms,对于用户而言,其实感觉上就是点个按钮,8ms 以后就直接返回了
流量削峰
每天 0:00 到 12:00,系统风平浪静,每秒并发请求数量就 50 个。结果每次一到 12:00 ~ 13:00 ,每秒并发请求数量暴增,大量的请求涌入 MySQL,可能就直接把 MySQL 给打死了,导致系统崩溃。但是高峰期一过,到了下午的时候,就成了低峰期,对整个系统几乎没有任何的压力
如果使用 MQ,A 系统从 MQ 中慢慢拉取请求,每秒钟就拉取 2k 个请求,不要超过自己每秒能处理的最大请求数量就 ok,这样下来,哪怕是高峰期的时候,A 系统也绝对不会挂掉。只要高峰期一过,A 系统就会快速将积压的消息给解决掉。
使用MQ的劣势
系统可用性降低
系统引入的外部依赖越多,越容易挂掉。MQ 一挂,整套系统崩溃,你不就完了?如何保证消息队列的高可用?
系统复杂度提高
怎么保证消息没有重复消费?怎么处理消息丢失的情况?怎么保证消息传递的顺序性?
一致性问题
数据写入多个系统时需要保证一致性
订单系统、如果用户下单支付成功,需要更新订单状态、减少库存、赠送积分。
深入了解RabbitMQ
AMQP
AMQP协议
在OSI七层模型中属于应用层的协议(类似的还有HTTP、DNS、FTP)
一个AMQP服务器类似于邮件服务器,exchange类似于消息传输代理(email里的概念),message queue类似于邮箱。Binding定义了每一个传输代理中的消息路由表,发布者将消息发给特定的传输代理,然后传输代理将这些消息路由到邮箱中,消费者从这些邮箱中取出消息。
AMQP术语
Channel(信道): 在AMQP模型中,我们不需要通过建立太多的TCP连接来实现。假如针对每一个AMQP连接都建立一个TCP连接的话,会占用大量的系统资源。对此,AMQP提供了通道(channel)机制。即,共享一个TCP连接,可创建多个通道。
在多线程/进程的应用程序中正确做法是,对于每一个线程/进程,应分别建立一个通道,而不是多个线程/进程之间去共享一个通道。
Exchange(交换器):用于接受、分配消息;可以有好几种模式、相当于邮箱
Queue(队列):用于存储生产者的消息;
RoutingKey(路由键):用于把生成者的数据分配到交换器上;
BindingKey(绑定键):用于把交换器的消息绑定到队列上;
Broker(消息代理):消息代理会接收来自生产者(publishers/producers)生产的消息,并将它们路由(route,可以理解成按指定规则转发)给相应的消费者(consumers)手中。
VHOST(虚拟主机):为了在一个单独的代理上实现多个隔离的环境(用户、用户组、交换机、队列 等),AMQP 提供了一个虚拟主机(virtual hosts - vhosts)的概念。这跟 Web servers 虚拟主机概念非常相似,这为 AMQP 实体提供了完全隔离的环境。当连接被建立的时候,AMQP 客户端来指定使用哪个虚拟主机。
Exchange交换机
交换机是用来发送消息的 AMQP 实体。
交换机拿到一个消息之后将它路由给一个或零个队列。
它使用哪种路由算法是由交换机类型和绑定(Bindings)规则所决定的。
默认交换机
type:Default
自动命名的直交换机
直连交换机
type:Direct
Routing Key==Binding Key,严格匹配
扇形交换机
type:Fanout
把发送到该 Exchange 的消息路由到所有与它绑定的 Queue 中
主题交换机
type:Topic
Routing Key==Binding Key,模糊匹配
头交换机
type:Headers
根据发送的消息内容中的 headers 属性进行匹配
Queue队列
- 队列属性 todo
- 队列创建 todo
- 队列持久化 todo
Consumer消费者
- 将消息投递给应用(push 模型)
- 应用主动获取消息(pull 模型)
消息机制
1)自动确认模式:当消息代理(broker)将消息发送给应用后立即删除。(使用 AMQP 方法:basic.deliver 或 basic.get-ok)
2)显式确认模式:待应用(application)发送一个确认回执(acknowledgement)后再删除消息。(使用 AMQP 方法:basic.ack)
RabbitMQ
使用MQ可能遇到的各种情况
消息丢失怎么办?
发生在哪里,出现问题如何去预防排查解决。
Producer丢失
就是生产者发送数据之前开启 RabbitMQ 事务
channel.txSelect
,然后发送消息,如果消息没有成功被 RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务channel.txRollback
,然后重试发送消息;如果收到了消息,那么可以提交事务channel.txCommit
。开启
confirm
模式,在生产者那里设置开启confirm
模式之后,你每次写的消息都会分配一个唯一的 id,然后如果写入了 RabbitMQ 中,RabbitMQ 会给你回传一个ack
消息,告诉你说这个消息 ok 了。如果 RabbitMQ 没能处理这个消息,会回调你的一个nack
接口,告诉你这个消息接收失败,你可以重试。MQ内存中丢失
开启 RabbitMQ 的持久化
注意,哪怕是你给 RabbitMQ 开启了持久化机制,也有一种可能,就是这个消息写到了 RabbitMQ 中,但是还没来得及持久化到磁盘上,结果不巧,此时 RabbitMQ 挂了,就会导致内存里的一点点数据丢失。
所以,持久化可以跟生产者那边的
confirm
机制配合起来,只有消息被持久化到磁盘之后,才会通知生产者ack
了,所以哪怕是在持久化到磁盘之前,RabbitMQ 挂了,数据丢了,生产者收不到ack
,你也是可以自己重发的。Comsumer丢失
- 关闭自动ack,开启应用系统发送ack回执。
大量消息积压如何处理?
仅仅只修复consumer重新消费可能需要几小时
- 修复consumer
- 建立多个consumer进程去处理queue积压的数据
- 恢复原有部署
- 讨论:假如出现死循环Producer,也要去考虑是否是生产者的问题。
消息过期失效?
- RabbitMQ可以设置TTL过期时间
- 在用户低峰期将过期的数据查出后重新写入MQ
如何保证消息的顺序性?
一个queue,多个consumer
- 每个consumer对应一个queue,consumer可以用队列排序
如何保证消息不被重复消费?即保证数据幂等性
结合业务,多个层面控制
- 设置每个请求的全局id
- 数据库设置唯一键
- 先对数据校验再做操作
会后复习资料
讨论
要保证库存一致性的时候,即库存商品不会超买超卖,MQ很难做到这一点。MQ不适用于秒杀业务
当消费端处于unack时,消费端会锁定队列中前3(默认值)个的消息。
$ch->setPrefetchCount($count); //设定最大unacked消息的条数,默认为3;值得注意的是,设置为0时表示无限制
消息不被重复消费:
- 使用uuid保证请求唯一性
- Redis天然的幂等性
关于Channel信道:
在AMQP模型中,我们不需要通过建立太多的TCP连接来实现。假如针对每一个AMQP连接都建立一个TCP连接的话,会占用大量的系统资源。对此,AMQP提供了通道(channel)机制。即,共享一个TCP连接,可创建多个通道。
关于vhost虚拟主机:
为了在一个单独的代理上实现多个隔离的环境(用户、用户组、交换机、队列 等)
关于消息丢失的三种情况,生产者发送消息丢失过程中使用confirm模式的解释
代码
consumer.php
1 | $conn = new AMQPConnection($config); |