这周忙于部门的技术分享会,整理了一份消息队列中间件的技术知识文档
消息队列中间件
队列:先进先出
消息队列:把要传输的数据放在队列里
为什么使用MQ?
应用解耦
在这个场景中,A 系统跟其它各种乱七八糟的系统严重耦合,A 系统产生一条比较关键的数据,很多系统都需要 A 系统将这个数据发送过来。A 系统要时时刻刻考虑 BCDE 四个系统如果挂了该咋办?要不要重发,要不要把消息存起来?
一个系统或者一个模块,调用了多个系统或者模块,互相之间的调用很复杂,维护起来很麻烦。但是其实这个调用是不需要直接同步调用接口的,如果用 MQ 给它异步化解耦,也是可以的,你就需要去考虑在你的项目里,是不是可以运用这个 MQ 去进行系统的解耦。
总结:通过一个 MQ,Pub/Sub 发布订阅消息这么一个模型,A 系统就跟其它系统彻底解耦了。
异步处理
再来看一个场景,A 系统接收一个请求,需要在自己本地写库,还需要在 BCD 三个系统写库,自己本地写库要 3ms,BCD 三个系统分别写库要 300ms、450ms、200ms。最终请求总延时是 3 + 300 + 450 + 200 = 953ms,接近 1s
一般要求是每个请求都必须在 200 ms 以内完成,对用户几乎是无感知的。
如果使用 MQ,那么 A 系统连续发送 3 条消息到 MQ 队列中,假如耗时 5ms,A 系统从接受一个请求到返回响应给用户,总时长是 3 + 5 = 8ms,对于用户而言,其实感觉上就是点个按钮,8ms 以后就直接返回了
流量削峰
每天 0:00 到 12:00,系统风平浪静,每秒并发请求数量就 50 个。结果每次一到 12:00 ~ 13:00 ,每秒并发请求数量暴增,大量的请求涌入 MySQL,可能就直接把 MySQL 给打死了,导致系统崩溃。但是高峰期一过,到了下午的时候,就成了低峰期,对整个系统几乎没有任何的压力
如果使用 MQ,A 系统从 MQ 中慢慢拉取请求,每秒钟就拉取 2k 个请求,不要超过自己每秒能处理的最大请求数量就 ok,这样下来,哪怕是高峰期的时候,A 系统也绝对不会挂掉。只要高峰期一过,A 系统就会快速将积压的消息给解决掉。
使用MQ的劣势
系统可用性降低
系统引入的外部依赖越多,越容易挂掉。MQ 一挂,整套系统崩溃,你不就完了?如何保证消息队列的高可用?
系统复杂度提高
怎么保证消息没有重复消费?怎么处理消息丢失的情况?怎么保证消息传递的顺序性?
一致性问题
数据写入多个系统时需要保证一致性
订单系统、如果用户下单支付成功,需要更新订单状态、减少库存、赠送积分。
深入了解RabbitMQ
AMQP
AMQP协议
在OSI七层模型中属于应用层的协议(类似的还有HTTP、DNS、FTP)
一个AMQP服务器类似于邮件服务器,exchange类似于消息传输代理(email里的概念),message queue类似于邮箱。Binding定义了每一个传输代理中的消息路由表,发布者将消息发给特定的传输代理,然后传输代理将这些消息路由到邮箱中,消费者从这些邮箱中取出消息。
AMQP术语
Channel(信道): 在AMQP模型中,我们不需要通过建立太多的TCP连接来实现。假如针对每一个AMQP连接都建立一个TCP连接的话,会占用大量的系统资源。对此,AMQP提供了通道(channel)机制。即,共享一个TCP连接,可创建多个通道。
在多线程/进程的应用程序中正确做法是,对于每一个线程/进程,应分别建立一个通道,而不是多个线程/进程之间去共享一个通道。
Exchange(交换器):用于接受、分配消息;可以有好几种模式、相当于邮箱
Queue(队列):用于存储生产者的消息;
RoutingKey(路由键):用于把生成者的数据分配到交换器上;
BindingKey(绑定键):用于把交换器的消息绑定到队列上;
Broker(消息代理):消息代理会接收来自生产者(publishers/producers)生产的消息,并将它们路由(route,可以理解成按指定规则转发)给相应的消费者(consumers)手中。
VHOST(虚拟主机):为了在一个单独的代理上实现多个隔离的环境(用户、用户组、交换机、队列 等),AMQP 提供了一个虚拟主机(virtual hosts - vhosts)的概念。这跟 Web servers 虚拟主机概念非常相似,这为 AMQP 实体提供了完全隔离的环境。当连接被建立的时候,AMQP 客户端来指定使用哪个虚拟主机。
Exchange交换机
交换机是用来发送消息的 AMQP 实体。
交换机拿到一个消息之后将它路由给一个或零个队列。
它使用哪种路由算法是由交换机类型和绑定(Bindings)规则所决定的。
默认交换机
type:Default
自动命名的直交换机
直连交换机
type:Direct
Routing Key==Binding Key,严格匹配
扇形交换机
type:Fanout
把发送到该 Exchange 的消息路由到所有与它绑定的 Queue 中
主题交换机
type:Topic
Routing Key==Binding Key,模糊匹配
头交换机
type:Headers
根据发送的消息内容中的 headers 属性进行匹配
Queue队列
- 队列属性 todo
- 队列创建 todo
- 队列持久化 todo
Consumer消费者
- 将消息投递给应用(push 模型)
- 应用主动获取消息(pull 模型)
消息机制
1)自动确认模式:当消息代理(broker)将消息发送给应用后立即删除。(使用 AMQP 方法:basic.deliver 或 basic.get-ok)
2)显式确认模式:待应用(application)发送一个确认回执(acknowledgement)后再删除消息。(使用 AMQP 方法:basic.ack)
RabbitMQ
使用MQ可能遇到的各种情况
消息丢失怎么办?
发生在哪里,出现问题如何去预防排查解决。
Producer丢失
就是生产者发送数据之前开启 RabbitMQ 事务
channel.txSelect,然后发送消息,如果消息没有成功被 RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务channel.txRollback,然后重试发送消息;如果收到了消息,那么可以提交事务channel.txCommit。开启
confirm模式,在生产者那里设置开启confirm模式之后,你每次写的消息都会分配一个唯一的 id,然后如果写入了 RabbitMQ 中,RabbitMQ 会给你回传一个ack消息,告诉你说这个消息 ok 了。如果 RabbitMQ 没能处理这个消息,会回调你的一个nack接口,告诉你这个消息接收失败,你可以重试。MQ内存中丢失
开启 RabbitMQ 的持久化
注意,哪怕是你给 RabbitMQ 开启了持久化机制,也有一种可能,就是这个消息写到了 RabbitMQ 中,但是还没来得及持久化到磁盘上,结果不巧,此时 RabbitMQ 挂了,就会导致内存里的一点点数据丢失。
所以,持久化可以跟生产者那边的
confirm机制配合起来,只有消息被持久化到磁盘之后,才会通知生产者ack了,所以哪怕是在持久化到磁盘之前,RabbitMQ 挂了,数据丢了,生产者收不到ack,你也是可以自己重发的。Comsumer丢失
- 关闭自动ack,开启应用系统发送ack回执。
大量消息积压如何处理?
仅仅只修复consumer重新消费可能需要几小时
- 修复consumer
- 建立多个consumer进程去处理queue积压的数据
- 恢复原有部署
- 讨论:假如出现死循环Producer,也要去考虑是否是生产者的问题。
消息过期失效?
- RabbitMQ可以设置TTL过期时间
- 在用户低峰期将过期的数据查出后重新写入MQ
如何保证消息的顺序性?
一个queue,多个consumer
- 每个consumer对应一个queue,consumer可以用队列排序
如何保证消息不被重复消费?即保证数据幂等性
结合业务,多个层面控制
- 设置每个请求的全局id
- 数据库设置唯一键
- 先对数据校验再做操作
会后复习资料
讨论
要保证库存一致性的时候,即库存商品不会超买超卖,MQ很难做到这一点。MQ不适用于秒杀业务
当消费端处于unack时,消费端会锁定队列中前3(默认值)个的消息。
$ch->setPrefetchCount($count); //设定最大unacked消息的条数,默认为3;值得注意的是,设置为0时表示无限制
消息不被重复消费:
- 使用uuid保证请求唯一性
- Redis天然的幂等性
关于Channel信道:
在AMQP模型中,我们不需要通过建立太多的TCP连接来实现。假如针对每一个AMQP连接都建立一个TCP连接的话,会占用大量的系统资源。对此,AMQP提供了通道(channel)机制。即,共享一个TCP连接,可创建多个通道。
关于vhost虚拟主机:
为了在一个单独的代理上实现多个隔离的环境(用户、用户组、交换机、队列 等)
关于消息丢失的三种情况,生产者发送消息丢失过程中使用confirm模式的解释
代码
consumer.php
1 | $conn = new AMQPConnection($config); |