[JAVA]Spring Boot + RabbitMQ:轻松掌握五种基本工作模式
本文旨在介绍如何利用Spring Boot高效集成RabbitMQ,并简要概述其五大基础工作模式。若您对RabbitMQ的功能尚不熟悉或尚未完成安装,建议先进行初步了解后再继续阅读本文,以便更好地掌握相关内容。
工作模式
接下来,我们将逐一探索各种模式之间的差异。
在正式开始之前,让我们先简要介绍一些 RabbitMQ
中的核心概念,通过类比写信的过程来帮助理解:
- 生产者(写信人):负责生成信息或数据的一方。
- 交换机(邮局):接收来自生产者的信息,并根据特定规则将这些信息分发到一个或多个队列中。
- 队列(邮箱):存储待处理的信息,等待被消费者取走。
- 消费者(收件人):接收并处理队列中的信息的一方。
写信人写完信,然后投递到邮局,根据地址投递到收件人附近小区的快递点/邮箱 收件人接收到信息去获取邮件
我们将对内容进行整合,并在后续部分依据代码进行详细描述。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
然后就可以直接使用RabbitTemplate
来进行发送信息
@Autowired
private RabbitTemplate rabbitTemplate;
yml文件中的简单配置
rabbitmq:
host: localhost
password: guest
username: guest
virtual-host: /
简单模式
可以看出,生产者直接将信息发送至队列,随后由消费者进行处理。
这里需要特别注意的是,尽管图中未明确标示出交换机,但实际上存在一个默认交换机。当我们在控制面板中添加队列时,默认情况下该队列即与默认交换机绑定。对于那些不熟悉如何在控制台创建队列的朋友们,接下来我将为大家演示具体步骤(由于无法上传视频,因此以GIF格式呈现,画质可能稍显模糊。如果有人知晓更佳的视频上传方法,欢迎告知)。
在测试类中直接编写
生产者端
@Test
void testNormal() {
rabbitTemplate.convertAndSend("", "test_normal", "test_normal message"); // ①
// rabbitTemplate.convertAndSend("test_normal", "test_normal no exchange message"); // ②
}
- 首个参数为交换机,鉴于我们使用的是默认交换机(即为空字符串 “”),因此这里直接采用默认值。第二个参数为路由键,由于我们并未设置特定的路由键,而是通过队列名称进行绑定,因此此处应填写相应的队列名称。第三个参数则是需要传递的消息内容。
- 由于采用了默认交换机,此参数实际上可省略。您可以通过查阅相关方法的具体实现来进一步了解这一点。
当消息成功发送后,您可以在控制面板中观察到相关信息的变化。
消费者端
@RabbitListener(queues = {"test_normal"})
public void getNormalMessage(String object,Message message, Channel channel) {
System.out.println("接收到信息啦~~~~" + object);
}
注意: 第一个参数:用于接收信息。如果生产者发送的消息为对象(例如 Student
),则可以将第一个参数调整为 Student student
以实现接收。
第二个参数:这是AMQP协议中的消息对象,包含了消息体(body)以及一系列的属性(properties)。消息体可以是任何二进制数据,而属性则包括路由键(routing key)、交换器(exchange)等元数据信息。在Spring AMQP中,Message
类封装了这些信息,使得你可以方便地访问它们。
第三个参数:Channel:在RabbitMQ中,所有的操作几乎都是通过Channel
完成的。Channel
是客户端与RabbitMQ服务器之间的通信通道。通过这个通道,你可以执行各种命令,如发布消息、订阅队列等。在@RabbitListener
方法中提供Channel
参数,可以让你在需要的时候直接与RabbitMQ交互,比如手动确认消息(acknowledge messages)。
首先,请确保消费者端的代码已准备就绪,随后启动项目。您可以在测试类或 RabbitMQ 控制台中发送消息,并通过设置断点来检查消费者是否成功接收到了消息。
工作模式
其实也就是多了一个消费者监听该队列罢了,所以我们在后台再写一个消费者,监听该队列即可。
在这里笔者没有使用后台发送消息的方式,而是通过了rabbitmq控制面板直接发送消息,更加方便快捷。
根据打印信息可以清楚地看出,一旦某个消费者处理了一条消息,其他消费者就无法再处理该消息,这表明它们之间存在竞争关系。那么,有人可能会问:“如果我希望所有消费者都能接收到这条消息,应该怎么办呢?”针对这一需求,我们可以探讨另一种工作模式——发布/订阅模式。
订阅发布模式
从该模式开始,官方提供的示意图可以看出,已经明确标注了交换机的位置。接下来,我将采用两种不同的方法完成上述绑定操作:一种是在RabbitMQ管理控制台中进行设置;另一种则是通过后端代码实现绑定。具体来说,我们将创建一个名为 test_work
的交换机,以及两个队列(分别命名为 queue_work_1
和 queue_work_2
)。
创建队列的方法可参照之前简单模式下的动态演示图。接下来,我将详细展示如何创建交换机及其与队列之间的绑定关系。
创建交换机
绑定
注意:这种模式下的交换机是扇形交换机(Fanout Exchange),广播模式,即凡是订阅了该交换机的队列都接收到信息。
后台绑定方式(绑定+消费者)
@RabbitListener(bindings =
@QueueBinding(
value = @Queue(value = "queue_work_3"),
exchange = @Exchange(name = "test_work_1", type = ExchangeTypes.FANOUT)
))
public void getWork3Message(String object,Message message, Channel channel) {
System.out.println("getWork3Message fanout queue_work_3 message ~" + object);
}
// 注意交换机的类型,默认是direct
@RabbitListener(bindings =
@QueueBinding(
value = @Queue(value = "queue_work_4"),
exchange = @Exchange(name = "test_work_1", type = ExchangeTypes.FANOUT)
))
public void getWork4Message(String object,Message message, Channel channel) {
System.out.println("getWork4Message fanout queue_work_3 message ~" + object);
}
QueueBinding能够帮助我们创建交换机和队列(如果它们尚不存在),并将其绑定在一起。
重新启动之后查看rabbitmq控制台后,可以看到成功绑定。
消费者端跟上面的消费者端一样的写法,就是监听其队列即可。
@RabbitListener(queues = {"queue_work_1"})
public void getWorkMessage1(String object,Message message, Channel channel) {
System.out.println("getWorkMessage1 接收到信息啦~~~~" + object);
}
@RabbitListener(queues = {"queue_work_2"})
public void getWorkMessage2(String object,Message message, Channel channel) {
System.out.println("getWorkMessage2 接收到信息啦~~~~" + object);
}
生产者端
rabbitTemplate.convertAndSend("test_work","", "test_work message 222");
当然,您也可以通过 RabbitMQ 控制面板来发送消息。与之前直接在队列中发送信息的方式不同,这次我们将通过交换机来发送消息。
此时解决了如果我想要该消息被所有消费者消费的难题了,那么问题又抛出来了,假如此时有一条私密信息来了,我又不想让所有消费者读取到该信息,使用
工作模式
又要面临竞争关系,那万一被另一个消费者读取到了呢?此时有机智的小伙伴想到了,那我就一个消费者不就行了吗。好好好~,出院!不过,为了更高效、更安全地处理这类情况,我们可以考虑采用另一种模式(路由模式),轻松应对这一难题。
路由模式
从图表中可以看出,该模式在原有模式的基础上引入了路由键。那么,路由键究竟起到了什么作用呢?接下来,我将通过一个实例来具体阐述这一模式的应用及其功能。
场景描述
假设你有一个新闻系统,生产者会发送不同类型的消息,例如:
- 明星绯闻
- 体育竞技
- 科技前沿
你有两个队列,分别关注不同的新闻类别:
queue_sports_gossip
:监听明星绯闻&监听体育竞技queue_tech
:监听科技前沿
交换机和路由键
- 交换机:使用一个
direct
类型的交换机,命名为news_exchange
。 - 路由键:每种新闻类型都有一个对应的路由键:
sports
:体育竞技tech
:科技前沿gossip
:明星绯闻
绑定关系
queue_sports_gossip
绑定到news_exchange
,路由键为sports
/gossip
。queue_tech
绑定到news_exchange
,路由键为tech
。
rabbitmq控制台进行绑定就是在上述订阅发布模式
绑定过程中,添加上对应的路由键
即可,绑定结果图如下所示
此时的效果是,当我们将路由键指定为 sports
或 gossip
时,监听 queue_sports_gossip
队列即可接收到相应信息。
此外,我们也可以通过 RabbitMQ 控制台发送消息,只要标记指定路由键
即可。具体操作如图所示。
此时我们也可以通过后端来发送信息,代码如下
@Test
void contextLoads() {
rabbitTemplate.convertAndSend( "news_exchange","tech", "test route to tech message");
}
消费端就跟上述消费端代码是一致的,只是更改了监听队列名
注意:交换机类型哦!
此时再来回顾上面抛出的问题,使用该模式是不是能够很轻松的完成该需求呢?我们只需创建一个私有队列,并正确配置路由键与交换机绑定,即可实现对私密消息的精确获取(妈妈再也不担心了)。
主题模式
其实主题模式是跟路由模式很像的,不仅可以涵盖路由模式的功能还可以解决该模式的痛点——可以使用通配符。
路由模式:只支持精确匹配,这意味着每个队列只能接收特定类型的消息,如果业务需求发生变化,可能需要重新定义队列和绑定关系,这在大规模系统中可能会比较麻烦
主题模式:使用通配符(* 和 #)来匹配路由键,使得一个队列可以订阅多个相关主题的消息。当业务需求变化时,可以通过修改路由键或添加新的队列来适应,而不需要改变现有的队列结构
我们将继续以路由模式为例,进行适当的调整与优化。
场景描述
假设你有一个新闻系统,生产者会发送不同类型的消息,例如:
- 明星绯闻
- 体育竞技
- 科技前沿
你有三个队列,分别关注不同的新闻类别:
queue_entertainment
:监听所有新闻queue_sports
:监听体育竞技queue_techs
:监听科技前沿
交换机和路由键
- 交换机:使用一个
topic
类型的交换机,命名为news_topic_exchange
。 - 路由键:每种新闻类型都有一个对应的路由键:
news.sports
:体育竞技news.techs
:科技前沿news.entertainment
:娱乐新闻
绑定关系
queue_entertainment
绑定到news_topic_exchange
,路由键为news.*
(匹配所有以news.
开头的消息)。queue_sports
绑定到news_exchange
,路由键为news.sports
。queue_techs
绑定到news_exchange
,路由键为news.tech
。
注意:无论您是在 RabbitMQ 控制台还是通过后台动态创建此模式下的交换机时,请务必注意选择正确的交换机类型。
通配符说明:
*
(星号):用于匹配单个单词。#
(井号):用于匹配零个或多个单词。
这里引出一个问题:如果消息的主题为 news.sports.boll
,它是否能够被成功匹配呢?欢迎各位小伙伴自行测试体验。
当消费者订阅了 queue_entertainment
队列后,将能够接收到所有以 news.
开头的消息。
在rabbitmq控制台绑定跟路由模式绑定手法一致,只是路由键可以写成这种带
通配符(news.*)
的而已
生产者端
@Test
void contextLoads() {
rabbitTemplate.convertAndSend("news_topic_exchange","news.*", "test topic to tech message");
}
小伙伴们可以查看,是否可以消费到消息哦。
此时此刻,可能有些小伙伴会有疑问:在这种模式下,我们如何在后台动态地进行绑定呢?别着急,答案马上揭晓。
@RabbitListener(bindings =
@QueueBinding(
value = @Queue(value = "queue_top_1"),
exchange = @Exchange(name = "news_topic_exchange_1", type = ExchangeTypes.TOPIC),
key = {"news.*"}
))
public void getTopicMessage(String object,Message message, Channel channel) {
System.out.println("news_topic_exchange_1 fanout queue_top_1 message ~" + object);
}
小伙伴们,不妨将上述动态绑定的方法与之前的方式进行对比,细心体会其中的差异之处吧~
来看看控制面板绑定成功没
发表评论