Loading... ## SpringAMQP简介: SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配,使用起来非常方便。 ### SpringAMQP提供了三个功能: * 自动声明队列、交换机及其绑定关系 * 基于注解的监听器模式,异步接收消息 * 封装了RabbitTemplate工具,用于发送消息 ## 使用前准备: ### 父工程引入依赖: ```xml <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.example</groupId> <artifactId>mq-demo</artifactId> <version>1.0-SNAPSHOT</version> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> </properties> <modules> <module>consumer</module> <module>publisher</module> </modules> <packaging>pom</packaging> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.3.9.RELEASE</version> <relativePath/> </parent> <dependencies> <!--消息转换器--> <dependency> <groupId>com.fasterxml.jackson.dataformat</groupId> <artifactId>jackson-dataformat-xml</artifactId> </dependency> <!--spring AMQ核心依赖--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>3.0.2</version> </dependency> <!--springboot启动依赖--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <version>2.3.9.RELEASE</version> </dependency> </dependencies> </project> ``` ### 使用到MQ的子工程yml配置文件 ```yaml spring: rabbitmq: host: 192.168.231.130 # rabbitMQ的ip地址 port: 5672 # 端口 username: rabbitmq # rabbitmq后台的登录账号 password: rabbitmq # rabbitmq后台的登录密码 virtual-host: / ``` ## SimpleQueue简单队列使用: 生产者和消费者使用的队列必须存在,否则消息会丢失 ### 添加队列的方法: #### 第一种:  #### 第二种: 在代码中声明bean ```java package cn.bdm.consumer.config; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @code Description * @code author 本当迷 * @code date 2023/2/8-16:03 */ @Configuration public class FanoutConfig { /*声明队列*/ @Bean public Queue objectQueue(){ return new Queue("simple.queue"); } } ``` ### publisher消息生产者 ```java package cn.bdm.publisher; import org.junit.jupiter.api.Test; import org.junit.runner.RunWith; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import java.util.HashMap; import java.util.Map; @SpringBootTest class PublisherApplicationTests { @Autowired private RabbitTemplate rabbitTemplate; @Test void contextLoads() { String queueName = "simple.queue"; String message = "hello, rabbitmq"; rabbitTemplate.convertAndSend(queueName, message); } } ``` ### consumer消息消费者 ```java @Component public class SpringRabbitListener { @RabbitListener(queues = "simple.queue") public void listenSimpleQueueMessage(String msg) { System.out.println("spring 消费者接受到的消息" + msg); } } ``` ## TopicExchange主题交换机使用: ### publisher消息生产者 ```java package cn.bdm.publisher; import org.junit.jupiter.api.Test; import org.junit.runner.RunWith; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import java.util.HashMap; import java.util.Map; @SpringBootTest class PublisherApplicationTests { @Autowired private RabbitTemplate rabbitTemplate; @Test void rabbitTopicQueue(){ // 交换机名称 String exchangeName = "bdm.topic"; // 消息 String msg = "hello, topic"; // topic String topic = "A.C"; // 发送消息 rabbitTemplate.convertAndSend(exchangeName, topic, msg); } } ``` ### consumer消息消费者 ```java ork.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.text.SimpleDateFormat; import java.util.Date; import java.util.Map; /** * @code Description * @code author 本当迷 * @code date 2023/2/8-15:16 */ @Component public class SpringRabbitListener { @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "topic.queue1"), exchange = @Exchange(name = "bdm.topic", type = ExchangeTypes.TOPIC), key = "A.*" )) public void listenTopicQueue1(String msg){ System.out.println("消费者1接受到topic的消息" + msg); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "topic.queue2"), exchange = @Exchange(name = "bdm.topic", type = ExchangeTypes.TOPIC), key = "*.C" )) public void listenTopicQueue2(String msg){ System.out.println("消费者2接受到topic的消息" + msg); } } ``` ### 运行结果:  ## SpringAMQP序列化对象存在的问题: <div class="tip inlineBlock error"> > Spring会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。 只不过,默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题: * **数据体积过大** * **有安全漏洞** * **可读性差** </div> ### 配置JSON转换器 **显然,JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化和反序列化。** **在publisher和consumer两个服务中都引入依赖:** ```xml <dependency> <groupId>com.fasterxml.jackson.dataformat</groupId> <artifactId>jackson-dataformat-xml</artifactId> <version>2.9.10</version> </dependency> ``` **配置消息转换器。** **在启动类中添加一个Bean即可:** ```java @Bean public MessageConverter jsonMessageConverter(){ return new Jackson2JsonMessageConverter(); } ``` 最后修改:2023 年 02 月 08 日 © 允许规范转载 打赏 赞赏作者 支付宝微信 赞 1 如果文章有用,请随意打赏。