首页
消息队列
使用场景
- 异步处理减少响应时间
- 填谷削峰
- 系统解耦
消息丢失
可能发生在生产者端、MQ端、消费者端,总结来说是生产者保证消息最少成功发送一次,MQ保证消息落盘后才返回成功,MQ保证消费者至少成功消费一次。
生产者丢消息
应该选择有回调方式,即不选择单向发送,同步发送不会出现丢消息问题(但有网络可靠性问题),在有本地事务的场景下应该使用事务消息(时效性严格场景)或本地消息表方案。
本地消息表方案
- 保存消息至本地消息表,与本地事务一起提交;
- 发送异步消息,注册监听器;
- MQ回调通知发送结果,更新本地消息表消息状态;
- 定时检查消息状态,一段时间后如仍未收到MQ回调则执行重试。
- 如何保证有序发送:保存每条消息的前置消息ID,判断前置消息发送成功再发送。
有本地事务的场景下同步发送的问题
- 消息发送成功后提交本地事务:本地事务回滚后需要回滚消息,本地事务还未提交消息就已经被消费;
- 提交本地事务后发送消息:消息发送失败需要回滚本地事务。
网络可靠性问题
网络可靠性问题是容易被忽视的一环,网络超时可能导致生产者可能无法接受到来自MQ端的响应。则消息生产者会认为消息发送失败,但实际上MQ已经持久化了消息,如果因此回滚了本地事务则会造成严重影响。
- 消费消息时从业务角度判断当前消息是否是有效,实现难度较高;
- 去MQ端查询消息是否发送成功,MQ中间件可能不支持,且理论上的并不完全可靠,因为查询时可能消息还在持久化过程中;
- 最可靠方案:发送端通过重试保证至少成功发送一次,消费端通过幂等保证最多消费一次。
MQ丢消息
MQ接受到消息后先保存到内存,还没来得及保存到磁盘或同步给从节点即宕机,MQ可以保证消息不丢失,但是会损失性能。
- RocketMQ:选择同步刷盘。
- Kafka:可以通过调整配置保证。
- producer配置:ack=all,保证所有参与复制的broker(即所有partition的副本)都返回ack才视作发送成功;设置retries,生成者消息发送失败后重试;
- broker配置:replication.factor设置大于1,即要求每个partition至少有两个副本,保证消息冗余;min.insync.replicas设置大于1,要求每个partition的leader至少感知到一个副本存在才可用,要小于replication.factor值,否则一旦一个副本离线整个partition都不可用了。
消费者丢消息
消费者消费消息不能自动ack,否则消费者接受到消息后宕机,而MQ自动认为消费成功。
- RocketMQ:默认即是消费者手动ack。
- Kafka:调整消费者端配置,enable.auto.commit设为false。
消费模型
消费者数量要小于等于topic中队列(或分区)数量,集群模式下消费模型分为顺序消费和并发消费,并行消费允许多个消费者线程从同一个队列中拉取消息,顺序消费则是只允许一个。
-
RocketMQ:默认并发消费,每个Consumer会维护一个消费线程池,每个线程都可以从同一个消息队列拉取消息消费,互相不影响消费进度,顺序消费实现原理是拉取消息前向Broker申请独占锁;并发消费如果消费者拉取的消息堆积太多则会延迟拉取,顺序消费如果申请独占锁失败,也会延迟一段时间再次尝试。
-
kafka:一个consumer实例只对应一个worker线程,即天然就是顺序消费,如果要提高消费能力不推荐盲目增加partition数量的方式,因为会带来更大的网络开销。
重复消费
在所有消息系统中消费消息有三种模式:at-most-once(最多一次)、at-least-once(最少一次)和 exactly-only-once(精确仅一次),RocketMQ和Kafka都是保证at-lease-once,如果要保证消息不丢失则会由重复消费的可能,需要自行在业务层面保证消费的幂等性。
- RocketMQ通过ACK来确保消息至少被消费一次,即消费者消费完成后向服务器返回ACK。
- Kafka在Broker端会记录消息消费的offset,消费者消费完消息后,会定时提交已消费的offset。
顺序消息
顺序消息分为局部有序和全局有序,前提是保证有序的发送,同时消费者端一般也需要顺序消费。
- 局部有序:只要发送到同一个队列(kafka则是partition)中则是天然有序的,则在发送时指定发送到同一Topic下的同一个队列即能保证有序;
- 全局有序:只要设置Topic的队列为1即是全局有序,通常不推荐。
记录消费顺序 + 本地消息表
生产者发送消息时记录消费顺序,消费者并发消费消息,持久化消息到本地消息表后提交ack,之后按照消息的消费顺序进行异步消费即可。
消息积压
如果由于程序问题导致积压,使得topic内其他消息也无法被消费,则需要新增一个临时消费者,消费掉堆积的消息,转发到新的topic或保存到数据库等。
RocketMQ
- NameServer:为无状态的,每个实例相互独立,轻量级,支持横向拓展;
- Broker:RocketMQ实例,支持主从架构,启动后向所有NameServer注册,并维持长连接,每30秒发送一次心跳;
- Producer:从NameServer获取Broker地址,根据负载均衡策略选择一台发送消息;
- Consumer:从NameServer获取Broker地址,并主动拉取消息消费。
消息保存流程
MQ接收到消息,首先使用顺序IO的方式保存到commitlog文件,之后执行重投递,投递到ConsumeQueue文件和IndexFile文件。
- 每个topic对应多个ConsumeQueue文件(相当于kafka的partition),其不保存消息内容,只保存消息在commitlog中的偏移量、消息大小和消息tag的哈希值;
- IndexFile用于提供按key或时间查询功能,以时间戳命名,也不保存消息内容,只记录消息Key和在commitlog中偏移量的对应关系,使用哈希槽的设计。
事务消息
消息发送成功后才提交本地事务的问题是无法随本地事务一起回滚,所以需要事务消息,使用本地消息表解决方案也可以实现事务消息。
- 生产者向MQ发送‘半消息’,此时消息对消费者不可见;
- ‘半消息’发送成功后MQ回调生产者执行本地事务,本地事务执行完成后发送提交或回滚请求;
- 由于MQ和生成者都有故障的可能,所以MQ需要能向生成者确认本地事务的执行结果,一般将本地事务执行结果保存到数据库或分布式缓存中。
延迟消息
不支持任意时间的延时,只支持几个固定的延时等级,所有的延迟消息由producer发出之后,都会存放到同一个topic下(SCHEDULE_TOPIC_XXXX),不同的延迟级别会对应不同的队列序号,每个队列对应一个定时线程,定时处理到期的消息并重新投递到真正指定的topic下,此时消息才对于consumer端可见。
消费模式
- 集群模式:每条消息只能被同一个consumer组内一个consumer消费。
- 广播模式:每个consumer都会消费消息一次。
pull & push
RocketMQ并没有真正实现Push模式,而是对Pull模式进行一层包装,通过Pull不断轮询Broker获取消息,当不存在新消息时,Broker会挂起请求,直到有新消息产生,取消挂起,返回新消息。
- 为什么不支持Push? 因为broker段不知道consumer的实际消费能力,如果集群消费模式下,push大量消息到某个consumer,而该consumer无法消费,则会导致消息堆积,所以应该是consumer去主动pull。
Kafka
由多个broker组成,每个broker是一个节点;每个topic可以划分为多个 partition,每个partition可以存在于不同的broker上;每个partition又可以有多个replica,选举出一个leader,其他作为follower。
吞吐量高的原因
- 顺序读写硬盘:消息顺序写入每个partition文件,读取时维护offset顺序读取。
- Page Cache:通过mmap(内存映射文件)提高IO速度。
- 零拷贝:使用直接内存,让网卡直接读取文件数据。
- 并行:每个topic拆分为多个partition。
- 批量传输:支持批量操作,消息达到一定阈值才发送到broker,但这样也导致了存在一定延时。
再平衡
消费者集群中新增或删除消费者、topic组下新增topic、topic的分区调整,会触发再平衡,即重新分配分区到消费者。
- Round Robin:轮询式分配,会导致消费者承载的分区数不一致;
- Range:计算总分区数量,平均分配给消费者;
- Sticky:保证每个消费者消费的分区数量大致相同,同时已分配过的分区尽量分配给原消费者。
ZooKeeper作用
使用ZooKeeper作为分布式协调,2.8版本开始会支持使用Raft协议的Quorum。
- broker注册:broker启动后会在ZooKeeper上注册一个临时节点;
- topic注册:使用ZooKeeper保存topic下partition和broker的对应关系,使用其选举leader;
- consumer注册:消费者组需要在ZooKeeper上注册,记录消费者集合,以及分区和消费者的关系。
- Controller选举:使用ZooKeeper选举出一个broker作为Controller,负责跟Zookeeper进行交互,它负责管理整个Kafka集群中所有分区和副本的状态,其他broker监听Controller节点的数据变化。