[JAVA]Spring Boot + RabbitMQ:轻松掌握五种基本工作模式

本文旨在介绍如何利用Spring Boot高效集成RabbitMQ,并简要概述其五大基础工作模式。若您对RabbitMQ的功能尚不熟悉或尚未完成安装,建议先进行初步了解后再继续阅读本文,以便更好地掌握相关内容。

工作模式

接下来,我们将逐一探索各种模式之间的差异。

在正式开始之前,让我们先简要介绍一些 RabbitMQ 中的核心概念,通过类比写信的过程来帮助理解:

  • 生产者(写信人):负责生成信息或数据的一方。
  • 交换机(邮局):接收来自生产者的信息,并根据特定规则将这些信息分发到一个或多个队列中。
  • 队列(邮箱):存储待处理的信息,等待被消费者取走。
  • 消费者(收件人):接收并处理队列中的信息的一方。

写信人写完信,然后投递到邮局,根据地址投递到收件人附近小区的快递点/邮箱 收件人接收到信息去获取邮件

我们将对内容进行整合,并在后续部分依据代码进行详细描述。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

然后就可以直接使用RabbitTemplate 来进行发送信息

@Autowired
private RabbitTemplate rabbitTemplate;

yml文件中的简单配置

rabbitmq:
  host: localhost
  password: guest
  username: guest
  virtual-host: /

简单模式

可以看出,生产者直接将信息发送至队列,随后由消费者进行处理。

这里需要特别注意的是,尽管图中未明确标示出交换机,但实际上存在一个默认交换机。当我们在控制面板中添加队列时,默认情况下该队列即与默认交换机绑定。对于那些不熟悉如何在控制台创建队列的朋友们,接下来我将为大家演示具体步骤(由于无法上传视频,因此以GIF格式呈现,画质可能稍显模糊。如果有人知晓更佳的视频上传方法,欢迎告知)。

在测试类中直接编写

生产者端

@Test
void testNormal() {
    rabbitTemplate.convertAndSend("", "test_normal", "test_normal message"); // ①
    //  rabbitTemplate.convertAndSend("test_normal", "test_normal no exchange message"); // ②
}
  1. 首个参数为交换机,鉴于我们使用的是默认交换机(即为空字符串 “”),因此这里直接采用默认值。第二个参数为路由键,由于我们并未设置特定的路由键,而是通过队列名称进行绑定,因此此处应填写相应的队列名称。第三个参数则是需要传递的消息内容
  2. 由于采用了默认交换机,此参数实际上可省略。您可以通过查阅相关方法的具体实现来进一步了解这一点。

当消息成功发送后,您可以在控制面板中观察到相关信息的变化。

消费者端

@RabbitListener(queues = {"test_normal"})
public void getNormalMessage(String object,Message message, Channel channel) {
    System.out.println("接收到信息啦~~~~" + object);
}

注意: 第一个参数:用于接收信息。如果生产者发送的消息为对象(例如 Student),则可以将第一个参数调整为 Student student 以实现接收。

第二个参数:这是AMQP协议中的消息对象,包含了消息体(body)以及一系列的属性(properties)。消息体可以是任何二进制数据,而属性则包括路由键(routing key)、交换器(exchange)等元数据信息。在Spring AMQP中,Message类封装了这些信息,使得你可以方便地访问它们。

第三个参数:Channel:在RabbitMQ中,所有的操作几乎都是通过Channel完成的。Channel是客户端与RabbitMQ服务器之间的通信通道。通过这个通道,你可以执行各种命令,如发布消息、订阅队列等。在@RabbitListener方法中提供Channel参数,可以让你在需要的时候直接与RabbitMQ交互,比如手动确认消息(acknowledge messages)。

首先,请确保消费者端的代码已准备就绪,随后启动项目。您可以在测试类或 RabbitMQ 控制台中发送消息,并通过设置断点来检查消费者是否成功接收到了消息。

工作模式

其实也就是多了一个消费者监听该队列罢了,所以我们在后台再写一个消费者,监听该队列即可。

在这里笔者没有使用后台发送消息的方式,而是通过了rabbitmq控制面板直接发送消息,更加方便快捷。

根据打印信息可以清楚地看出,一旦某个消费者处理了一条消息,其他消费者就无法再处理该消息,这表明它们之间存在竞争关系。那么,有人可能会问:“如果我希望所有消费者都能接收到这条消息,应该怎么办呢?”针对这一需求,我们可以探讨另一种工作模式——发布/订阅模式。

订阅发布模式

从该模式开始,官方提供的示意图可以看出,已经明确标注了交换机的位置。接下来,我将采用两种不同的方法完成上述绑定操作:一种是在RabbitMQ管理控制台中进行设置;另一种则是通过后端代码实现绑定。具体来说,我们将创建一个名为 test_work 的交换机,以及两个队列(分别命名为 queue_work_1queue_work_2)。

创建队列的方法可参照之前简单模式下的动态演示图。接下来,我将详细展示如何创建交换机及其与队列之间的绑定关系

创建交换机

绑定

注意:这种模式下的交换机是扇形交换机(Fanout Exchange),广播模式,即凡是订阅了该交换机的队列都接收到信息。

后台绑定方式(绑定+消费者)

@RabbitListener(bindings =
@QueueBinding(
        value = @Queue(value = "queue_work_3"),
        exchange = @Exchange(name = "test_work_1", type = ExchangeTypes.FANOUT)
))
public void getWork3Message(String object,Message message, Channel channel) {
    System.out.println("getWork3Message fanout queue_work_3 message ~" + object);
}
// 注意交换机的类型,默认是direct
@RabbitListener(bindings =
@QueueBinding(
        value = @Queue(value = "queue_work_4"),
        exchange = @Exchange(name = "test_work_1", type = ExchangeTypes.FANOUT)
))
public void getWork4Message(String object,Message message, Channel channel) {
    System.out.println("getWork4Message fanout queue_work_3 message ~" + object);
}

QueueBinding能够帮助我们创建交换机和队列(如果它们尚不存在),并将其绑定在一起。

重新启动之后查看rabbitmq控制台后,可以看到成功绑定。

消费者端跟上面的消费者端一样的写法,就是监听其队列即可。

@RabbitListener(queues = {"queue_work_1"})
public void getWorkMessage1(String object,Message message, Channel channel) {
    System.out.println("getWorkMessage1 接收到信息啦~~~~" + object);
}
@RabbitListener(queues = {"queue_work_2"})
public void getWorkMessage2(String object,Message message, Channel channel) {
    System.out.println("getWorkMessage2 接收到信息啦~~~~" + object);
}

生产者端

rabbitTemplate.convertAndSend("test_work","", "test_work message 222");

当然,您也可以通过 RabbitMQ 控制面板来发送消息。与之前直接在队列中发送信息的方式不同,这次我们将通过交换机来发送消息。

此时解决了如果我想要该消息被所有消费者消费的难题了,那么问题又抛出来了,假如此时有一条私密信息来了,我又不想让所有消费者读取到该信息,使用工作模式又要面临竞争关系,那万一被另一个消费者读取到了呢?此时有机智的小伙伴想到了,那我就一个消费者不就行了吗。好好好~,出院!不过,为了更高效、更安全地处理这类情况,我们可以考虑采用另一种模式(路由模式),轻松应对这一难题。

路由模式

从图表中可以看出,该模式在原有模式的基础上引入了路由键。那么,路由键究竟起到了什么作用呢?接下来,我将通过一个实例来具体阐述这一模式的应用及其功能。

场景描述

假设你有一个新闻系统,生产者会发送不同类型的消息,例如:

  • 明星绯闻
  • 体育竞技
  • 科技前沿

你有两个队列,分别关注不同的新闻类别:

  • queue_sports_gossip:监听明星绯闻&监听体育竞技
  • queue_tech:监听科技前沿

交换机和路由键

  1. 交换机:使用一个direct类型的交换机,命名为news_exchange
  2. 路由键:每种新闻类型都有一个对应的路由键:
    • sports:体育竞技
    • tech:科技前沿
    • gossip:明星绯闻

绑定关系

  • queue_sports_gossip 绑定到 news_exchange,路由键为 sports/ gossip
  • queue_tech 绑定到 news_exchange,路由键为 tech

rabbitmq控制台进行绑定就是在上述订阅发布模式绑定过程中,添加上对应的路由键即可,绑定结果图如下所示

此时的效果是,当我们将路由键指定为 sportsgossip 时,监听 queue_sports_gossip 队列即可接收到相应信息。

此外,我们也可以通过 RabbitMQ 控制台发送消息,只要标记指定路由键即可。具体操作如图所示。

此时我们也可以通过后端来发送信息,代码如下

@Test
void contextLoads() {
    rabbitTemplate.convertAndSend( "news_exchange","tech", "test route to tech message");
}

消费端就跟上述消费端代码是一致的,只是更改了监听队列名

注意:交换机类型哦!

此时再来回顾上面抛出的问题,使用该模式是不是能够很轻松的完成该需求呢?我们只需创建一个私有队列,并正确配置路由键与交换机绑定,即可实现对私密消息的精确获取(妈妈再也不担心了)。

主题模式

其实主题模式是跟路由模式很像的,不仅可以涵盖路由模式的功能还可以解决该模式的痛点——可以使用通配符。

路由模式:只支持精确匹配,这意味着每个队列只能接收特定类型的消息,如果业务需求发生变化,可能需要重新定义队列和绑定关系,这在大规模系统中可能会比较麻烦

主题模式:使用通配符(* 和 #)来匹配路由键,使得一个队列可以订阅多个相关主题的消息。当业务需求变化时,可以通过修改路由键或添加新的队列来适应,而不需要改变现有的队列结构

我们将继续以路由模式为例,进行适当的调整与优化。

场景描述

假设你有一个新闻系统,生产者会发送不同类型的消息,例如:

  • 明星绯闻
  • 体育竞技
  • 科技前沿

你有三个队列,分别关注不同的新闻类别:

  • queue_entertainment:监听所有新闻
  • queue_sports:监听体育竞技
  • queue_techs:监听科技前沿

交换机和路由键

  1. 交换机:使用一个topic类型的交换机,命名为news_topic_exchange
  2. 路由键:每种新闻类型都有一个对应的路由键:
    • news.sports:体育竞技
    • news.techs:科技前沿
    • news.entertainment:娱乐新闻

绑定关系

  • queue_entertainment 绑定到 news_topic_exchange,路由键为 news.*(匹配所有以news.开头的消息)。
  • queue_sports 绑定到 news_exchange,路由键为 news.sports
  • queue_techs 绑定到 news_exchange,路由键为 news.tech

注意:无论您是在 RabbitMQ 控制台还是通过后台动态创建此模式下的交换机时,请务必注意选择正确的交换机类型

通配符说明

  • *(星号):用于匹配单个单词。
  • #(井号):用于匹配零个或多个单词。

这里引出一个问题:如果消息的主题为 news.sports.boll,它是否能够被成功匹配呢?欢迎各位小伙伴自行测试体验。

当消费者订阅了 queue_entertainment 队列后,将能够接收到所有以 news. 开头的消息。

在rabbitmq控制台绑定跟路由模式绑定手法一致,只是路由键可以写成这种带通配符(news.*)的而已

生产者端

@Test
void contextLoads() {
    rabbitTemplate.convertAndSend("news_topic_exchange","news.*", "test topic to tech message");
}

小伙伴们可以查看,是否可以消费到消息哦。

此时此刻,可能有些小伙伴会有疑问:在这种模式下,我们如何在后台动态地进行绑定呢?别着急,答案马上揭晓。

@RabbitListener(bindings =
@QueueBinding(
        value = @Queue(value = "queue_top_1"),
        exchange = @Exchange(name = "news_topic_exchange_1", type = ExchangeTypes.TOPIC),
        key = {"news.*"}
))
public void getTopicMessage(String object,Message message, Channel channel) {
    System.out.println("news_topic_exchange_1 fanout queue_top_1 message ~" + object);
}

小伙伴们,不妨将上述动态绑定的方法与之前的方式进行对比,细心体会其中的差异之处吧~

来看看控制面板绑定成功没

标签

发表评论