消息队列_01消息队列入门

RabbitMQ 从入门到精通 (一):https://www.cnblogs.com/dwlovelife/p/10982735.html
Ubuntu安装RabbitMQ:https://www.cnblogs.com/kingsonfu/p/10579366.html

介绍

将 MQ 掰开了揉碎了来看,都是「一发一存一消费」,再直白点就是一个「转发器」。
生产者先将消息投递一个叫做「队列」的容器中,然后再从这个容器中取出消息,最后再转发给消费者,仅此而已。

上面这个图便是消息队列最原始的模型,它包含了两个关键词:消息和队列。
1、消息:就是要传输的数据,可以是最简单的文本字符串,也可以是自定义的复杂格式(只要能按预定格式解析出来即可)。
2、队列:大家应该再熟悉不过了,是一种先进先出数据结构。它是存放消息的容器,消息从队尾入队,从队头出队,入队即发消息的过程,出队即收消息的过程。

原始模型的进化

再看今天我们最常用的消息队列产品(RocketMQ、Kafka 等等),你会发现:它们都在最原始的消息模型上做了扩展,同时提出了一些新名词,比如:主题(topic)、分区(partition)、队列(queue)等等。
要彻底理解这些五花八门的新概念,我们化繁为简,先从消息模型的演进说起(道理好比:架构从来不是设计出来的,而是演进而来的

队列模型

最初的消息队列就是上一节讲的原始模型,它是一个严格意义上的队列(Queue)。消息按照什么顺序写进去,就按照什么顺序读出来。不过,队列没有 “读” 这个操作,读就是出队,从队头中 “删除” 这个消息。

这便是队列模型:它允许多个生产者往同一个队列发送消息。但是,如果有多个消费者,实际上是竞争的关系,也就是一条消息只能被其中一个消费者接收到,读完即被删除。

发布-订阅模型

如果需要将一份消息数据分发给多个消费者,并且每个消费者都要求收到全量的消息。很显然,队列模型无法满足这个需求。
一个可行的方案是:为每个消费者创建一个单独的队列,让生产者发送多份。这种做法比较笨,而且同一份数据会被复制多份,也很浪费空间。
为了解决这个问题,就演化出了另外一种消息模型:发布-订阅模型。
在发布-订阅模型中,存放消息的容器变成了 “主题”,订阅者在接收消息之前需要先 “订阅主题”。最终,每个订阅者都可以收到同一个主题的全量消息。
仔细对比下它和 “队列模式” 的异同:生产者就是发布者,队列就是主题,消费者就是订阅者,无本质区别。唯一的不同点在于:一份消息数据是否可以被多次消费。

透过模型看 MQ 的应用场景

目前,MQ 的应用场景非常多,大家能倒背如流的是:系统解耦、异步通信和流量削峰。除此之外,还有延迟通知、最终一致性保证、顺序消息、流式处理等等。
那到底是先有消息模型,还是先有应用场景呢?答案肯定是:先有应用场景(也就是先有问题),再有消息模型,因为消息模型只是解决方案的抽象而已。
MQ 经过 30 多年的发展,能从最原始的队列模型发展到今天百花齐放的各种消息中间件(平台级的解决方案),我觉得万变不离其宗,还是得益于:消息模型的适配性很广。
我们试着重新理解下消息队列的模型。它其实解决的是:生产者和消费者的通信问题。那它对比 RPC 有什么联系和区别呢?

通过对比,能很明显地看出两点差异:
1、引入 MQ 后,由之前的一次 RPC 变成了现在的两次 RPC,而且生产者只跟队列耦合,它根本无需知道消费者的存在。
2、多了一个中间节点「队列」进行消息转储,相当于将同步变成了异步。

再返过来思考 MQ 的所有应用场景,就不难理解 MQ 为什么适用了?因为这些应用场景无外乎都利用了上面两个特性。
举一个实际例子,比如说电商业务中最常见的「订单支付」场景:在订单支付成功后,需要更新订单状态、更新用户积分、通知商家有新订单、更新推荐系统中的用户画像等等。


引入 MQ 后,订单支付现在只需要关注它最重要的流程:更新订单状态即可。其他不重要的事情全部交给 MQ 来通知。这便是 MQ 解决的最核心的问题:系统解耦。
改造前订单系统依赖 3 个外部系统,改造后仅仅依赖 MQ,而且后续业务再扩展(比如:营销系统打算针对支付用户奖励优惠券),也不涉及订单系统的修改,从而保证了核心流程的稳定性,降低了维护成本。
这个改造还带来了另外一个好处:因为 MQ 的引入,更新用户积分、通知商家、更新用户画像这些步骤全部变成了异步执行,能减少订单支付的整体耗时,提升订单系统的吞吐量。这便是 MQ 的另一个典型应用场景:异步通信。
除此以外,由于队列能转储消息,对于超出系统承载能力的场景,可以用 MQ 作为 “漏斗” 进行限流保护,即所谓的流量削峰。
我们还可以利用队列本身的顺序性,来满足消息必须按顺序投递的场景;利用队列 + 定时任务来实现消息的延时消费 ……
MQ 其他的应用场景基本类似,都能回归到消息模型的特性上,找到它适用的原因,这里就不一一分析了。
总之,就是建议大家多从复杂多变的实践场景再回归到理论层面进行思考和抽象,这样能吃得更透。

三类角色


另外,将「一发一存一消费」这个核心流程进一步细化后,比较完整的数据流如下:

基于上面两个图,我们可以很快明确出 3 类角色的作用,分别如下:
1、Broker(服务端):MQ 中最核心的部分,是 MQ 的服务端,核心逻辑几乎全在这里,它为生产者和消费者提供 RPC 接口,负责消息的存储、备份和删除,以及消费关系的维护等。
2、Producer(生产者):MQ 的客户端之一,调用 Broker 提供的 RPC 接口发送消息。
3、Consumer(消费者):MQ 的另外一个客户端,调用 Broker 提供的 RPC 接口接收消息,同时完成消费确认。

MQ解决什么问题

MQ是一直存在,不过随着微服务架构的流行,成了解决微服务之间问题的常用工具。

应用解耦

以电商应用为例,应用中有订单系统、库存系统、物流系统、支付系统。用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障,都会造成下单操作异常。
当转变成基于消息队列的方式后,系统间调用的问题会减少很多,比如物流系统因为发生故障,需要几分钟来修复。在这几分钟的时间里,物流系统要处理的内存被缓存在消息队列中,用户的下单操作可以正常完成。当物流系统恢复后,继续处理订单信息即可,中单用户感受不到物流系统的故障。提升系统的可用性

流量消峰

举个栗子,如果订单系统最多能处理一万次订单,这个处理能力应付正常时段的下单时绰绰有余,正常时段我们下单一秒后就能返回结果。但是在高峰期,如果有两万次下单操作系统是处理不了的,只能限制订单超过一万后不允许用户下单。
使用消息队列做缓冲,我们可以取消这个限制,把一秒内下的订单分散成一段时间来处理,这事有些用户可能在下单十几秒后才能收到下单成功的操作,但是比不能下单的体验要好。

消息分发

多个服务队数据感兴趣,只需要监听同一类消息即可处理。

例如A产生数据,B对数据感兴趣。如果没有消息的队列A每次处理完需要调用一下B服务。过了一段时间C对数据也感性,A就需要改代码,调用B服务,调用C服务。只要有服务需要,A服务都要改动代码。很不方便。

有了消息队列后,A只管发送一次消息,B对消息感兴趣,只需要监听消息。C感兴趣,C也去监听消息。A服务作为基础服务完全不需要有改动

异步消息


有些服务间调用是异步的,例如A调用B,B需要花费很长时间执行,但是A需要知道B什么时候可以执行完,以前一般有两种方式,A过一段时间去调用B的查询api查询。或者A提供一个callback api,B执行完之后调用api通知A服务。这两种方式都不是很优雅

使用消息总线,可以很方便解决这个问题,A调用B服务后,只需要监听B处理完成的消息,当B处理完成后,会发送一条消息给MQ,MQ会将此消息转发给A服务。
这样A服务既不用循环调用B的查询api,也不用提供callback api。同样B服务也不用做这些操作。A服务还能及时的得到异步处理成功的消息

常见消息队列及比较

消息丢失,消息重复,消息积压一些处理方式

https://zhuanlan.zhihu.com/p/112681372

总结下来也就是消息丢失,消息重复,消息积压这三大类,只要了解它的原理这些问题我们就能处理个七七八八了。我们知道一条消息大致都要经历这么几个过程:生产者产生消息–>消息队列存储消息–>消费者消费消息。
消息丢失
1,Producer直到收到Broker的确认响应后才会停止重试消息发送
2,Broker收到来自Producer的消息,持久化到磁盘(存储阶段):在这个阶段只要Broker正常工作基本上都能序列化正常,但是在集群化的Broker中发生概率还是比较高的。因此我们要对刷盘机制根据情况做一些调整,在单节点的情况下,我们可以设置当Broker接收到消息后先刷盘,当刷盘成功后再给Producer成功响应。
3,消费者通过网络,从Broker上读取消息(消费阶段):在这个阶段我们需要注意的一定要当消费者处理完你自身业务逻辑后给Broker发送消费确认。
消息重复
上面我们说到了消息丢失的处理方式,大家有没有发现有好几个地方是有问题呢?问题就是:当我们保证了不丢消息的同时,又引入了消息重复的问题
在一定情况下当消费者消费成功后再ack时异常或者超,Broker此时会认为此消息未消费成功,进入重复消费阶段。其实现在解决这个问题就显得非常简单了,我们不管是生产者重复,还是Broker重复我们只要在消费端保证幂等性就可以(任意多次执行所产生的影响均与一次执行的影响相同称为幂等操作),
下面我就介绍几种常用的保证幂等性的方法:
1,通过数据库的为唯一键实现幂等:
2,通过版本号/数据快照实现幂等:其实这种方式有点类似于乐观锁的实现方式,就是需要消息中带有此业务当前一个瞬时状态的值,通过这个值与业务当前数据比较来判断是否执行更新操作。
消息积压
两种情况:要么生产者消息数量增加导致的积压;要么就是消费者消费变慢导致的消息积压。对于一个消息队列我们肯定在上线前就预估好,单节点最大承受流量与系统目前最大峰值流量的数据,一般情况下消息队列收发性能是远大于业务处理性能的,一旦出现的话问题也很显而易见:要么就是流量突然增加,要么就是业务逻辑异常。我能应该从三个方面来查找问题:
01.生产端:一般当生产端发生积压(Broker正常的情况下)就要查看你的业务逻辑是否有异常的耗时步骤导致的。是否需要改并行化操作等。
02,Broker端:当Broker端发生积压我们首先要查看,消息队列内存使用情况,如果有分区的的话还得看每个分区积压的消息数量差异。当每个分区的消息积压数据量相对均匀的话,我们大致可以认为是流量激增。需要在消费端做优化,或者同时需要增加Broker节点(相当于存储扩容),如果分区加压消息数量差异很大的话(有的队列满了,有的队列可能还是空闲状态),我们这时候就要检查我们的路由转发规则是否合理,
03。消费端:在使用消息队列的时候大部分的问题都出在消费端,当消费速度小于生产速度很快就会出现积压,导致消息延迟,以至于丢失。这里需要重点说明一点的是,当消费速度小于生产速度的时候,仅增加消费者是没有用处的,因为多个消费者在同一个分区上实际是单线程资源竞争关系(当然还有一些冒险的单队列多消费者并行方式就是:消费者接到消息就ack成功再去处理业务逻辑,这样你就要承受消息丢失的代价),我们需要同时增加Broker上的分区数量才能解决这一问题。
那么上面我们说到消息积压的问题所在,那么遇到这样问题我们怎么能够快速的解决呢?我们需要查看是否有无限重发的消息或者有进入死锁的程序等等,当确定是流量激增的话,我们需要评估是否需要增加资源还是通过限流的方式解决,当短时间大量消息需要处理时,在资源允许的情况下,我们可以新启一批消费者与消息队列,将原来的消费者中的消息直接作为生产者转发到临时应急队列中,这样大概率的能够快速解决消息积压。与其事后处理不如我们在设计之初就要把积压考虑进来,对于数据量非常大,但是实时性要求不高的场景,可以设计出批量消息发送,当队列积累到一定阀值再做批量消费消费,这里需要注意的就是重复消费带来的影响,设计不好就是一场灾难。

题外话:RocketMQ消息何时删除?

今天写到这里我突然想到一个漏掉的问题。在RocketMQ中存在多个消费组时,消费者只记录自己的消费位置offset,那么过消息啥时候过期?过期后啥时候删除?怎么删除呢?前面在《什么是消息模型?》中我们说到了RocketMQ通过一个主题包含多个队列的方式实现并行生产与消费,并且一个队列给每个消费者提供offset标示消费位置。我们知道它的消息是顺序写在在一个叫CommitLog的文件中,然后会把里面的消息分发对应到主题上的多个Consume Queue,消费端根据offset跳过历史记录按次序获取每个Queue上最新的历史记录。默认情况下CommitLog在每天4点删除超过48小时的文件或者当磁盘水位线达到75%

参考

消息队列(mq)是什么?:https://blog.csdn.net/weixin_43958556/article/details/116381853
RabbitMQ从入门到精通:https://www.cnblogs.com/heirenxilou/p/13632438.html

架构设计和高并发系列
读书_大型网站技术架构01_李智慧
读书_大型网站技术架构02_李智慧
读书_大型网站技术架构03_李智慧
读书_高并发设计40问之一基础
读书_高并发设计40问之二数据库
读书_高并发设计40问之三缓存
读书_高并发设计40问之四消息队列
读书_高并发设计40问之五分布式服务
读书_w3c架构师01通用设计与方法论
读书_w3c架构师02典型架构实践
读书_w3c架构师03数据库与缓存
分布式事务
高并发之缓存
高并发之降级
高并发之限流
数据库_读写分离
消息队列_01消息队列入门
消息队列_02rabbitMQ入门
消息队列_03rabbitMQ安装和使用

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×