Loading... 介绍如何使用Docker-compose快速部署一个单节点单副本 RocketMQ 服务,并完成简单的消息收发。 ## docker-compose.yml ```java version: '3.8' services: # NameServer 服务 namesrv: image: apache/rocketmq:5.3.1 container_name: rmqnamesrv ports: - "9876:9876" networks: - rocketmq environment: - JAVA_OPTS=-Duser.home=/opt command: sh mqnamesrv restart: always # Broker 服务 broker: image: apache/rocketmq:5.3.1 container_name: rmqbroker ports: - "10909:10909" - "10911:10911" - "10912:10912" networks: - rocketmq environment: - NAMESRV_ADDR=rmqnamesrv:9876 # 使用容器名称连接 NameServer depends_on: - namesrv command: sh mqbroker restart: always # RocketMQ 控制台 rocketmq-dashboard: image: apacherocketmq/rocketmq-dashboard:latest container_name: rocketmq-dashboard ports: - "8088:8080" networks: - rocketmq environment: - JAVA_OPTS=-Drocketmq.namesrv.addr=rmqnamesrv:9876 # 配置 NameServer 地址 depends_on: - namesrv - broker restart: always proxy: image: apache/rocketmq:5.3.1 container_name: rmqproxy networks: - rocketmq depends_on: - broker - namesrv ports: - 8080:8080 - 8081:8081 restart: on-failure environment: - NAMESRV_ADDR=rmqnamesrv:9876 command: sh mqproxy networks: rocketmq: driver: bridge ``` ### 启动RocketMQ ```java docker-compose up -d ``` ### 查看启动日志 ```java docker-compose logs -f ``` ## 新增主题 ### 控制台新增主题  ### 服务中新增主题 进入broker容器,通过mqadmin创建 Topic ```java docker exec -it rmqbroker bash sh mqadmin updatetopic -t TestTopic -c DefaultCluster ``` ## 测试消息生产和发送 ### 引入依赖 ```java <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client-java</artifactId> <version>5.0.6</version> </dependency> ``` ### 测试生产者 ```java package cn.bdmcom.product; import org.apache.rocketmq.client.apis.ClientConfiguration; import org.apache.rocketmq.client.apis.ClientConfigurationBuilder; import org.apache.rocketmq.client.apis.ClientException; import org.apache.rocketmq.client.apis.ClientServiceProvider; import org.apache.rocketmq.client.apis.message.Message; import org.apache.rocketmq.client.apis.producer.Producer; import org.apache.rocketmq.client.apis.producer.SendReceipt; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ProducerExample { private static final Logger logger = LoggerFactory.getLogger(ProducerExample.class); public static void main(String[] args) throws ClientException { // 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8080;xxx:8081 String endpoint = "192.168.59.129:8080"; // 消息发送的目标Topic名称,需要提前创建。 String topic = "TestTopic"; ClientServiceProvider provider = ClientServiceProvider.loadService(); ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint); ClientConfiguration configuration = builder.enableSsl(false).build(); // 初始化Producer时需要设置通信配置以及预绑定的Topic。 Producer producer = provider.newProducerBuilder() .setTopics(topic) .setClientConfiguration(configuration) .build(); // 普通消息发送。 Message message = provider.newMessageBuilder() .setTopic(topic) // 设置消息索引键,可根据关键字精确查找某条消息。 .setKeys("messageKey") // 设置消息Tag,用于消费端根据指定Tag过滤消息。 .setTag("messageTag") // 消息体。 .setBody("messageBody".getBytes()) .build(); try { // 发送消息,需要关注发送结果,并捕获失败等异常。 SendReceipt sendReceipt = producer.send(message); logger.info("Send message successfully, messageId={}", sendReceipt.getMessageId()); } catch (ClientException e) { logger.error("Failed to send message", e); } // producer.close(); } } ``` ### 测试消费者 ```java package cn.bdmcom.consumer; import java.io.IOException; import java.util.Collections; import org.apache.rocketmq.client.apis.ClientConfiguration; import org.apache.rocketmq.client.apis.ClientException; import org.apache.rocketmq.client.apis.ClientServiceProvider; import org.apache.rocketmq.client.apis.consumer.ConsumeResult; import org.apache.rocketmq.client.apis.consumer.FilterExpression; import org.apache.rocketmq.client.apis.consumer.FilterExpressionType; import org.apache.rocketmq.client.apis.consumer.PushConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class PushConsumerExample { private static final Logger logger = LoggerFactory.getLogger(PushConsumerExample.class); private PushConsumerExample() { } public static void main(String[] args) throws ClientException, IOException, InterruptedException { final ClientServiceProvider provider = ClientServiceProvider.loadService(); // 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8080;xxx:8081 String endpoint = "192.168.59.129:8080"; ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() .setEndpoints(endpoint) .build(); // 订阅消息的过滤规则,表示订阅所有Tag的消息。 String tag = "*"; FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG); // 为消费者指定所属的消费者分组,Group需要提前创建。 String consumerGroup = "DefaultCluster"; // 指定需要订阅哪个目标Topic,Topic需要提前创建。 String topic = "TestTopic"; // 初始化PushConsumer,需要绑定消费者分组ConsumerGroup、通信参数以及订阅关系。 PushConsumer pushConsumer = provider.newPushConsumerBuilder() .setClientConfiguration(clientConfiguration) // 设置消费者分组。 .setConsumerGroup(consumerGroup) // 设置预绑定的订阅关系。 .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) // 设置消费监听器。 .setMessageListener(messageView -> { // 处理消息并返回消费结果。 logger.info("Consume message successfully, messageId={}", messageView.getMessageId()); return ConsumeResult.SUCCESS; }) .build(); Thread.sleep(Long.MAX_VALUE); // 如果不需要再使用 PushConsumer,可关闭该实例。 // pushConsumer.close(); } } ``` ## 问题解决 ### 镜像不可用 镜像源拉不下来,可以使用以下参考: ```java vim /etc/docker/daemon.json ``` ```java { "registry-mirrors": [ "https://dockerpull.org", "https://docker.1panel.dev", "https://docker.fxxk.dedyn.io", "https://docker.xn--6oq72ry9d5zx.cn", "https://docker.zhai.cm", "https://docker.5z5f.com", "https://a.ussh.net", "https://docker.cloudlayer.icu", "https://hub.littlediary.cn", "https://hub.crdz.gq", "https://docker.unsee.tech", "https://docker.kejilion.pro", "https://registry.dockermirror.com", "https://hub.rat.dev", "https://dhub.kubesre.xyz", "https://docker.nastool.de", "https://docker.udayun.com", "https://docker.rainbond.cc", "https://hub.geekery.cn", "https://docker.1panelproxy.com", "https://atomhub.openatom.cn", "https://docker.m.daocloud.io" ] } ``` ### 虚拟机内存不足 假如内存飙升100%,需要进行扩容  可以扩展卷组来使用新的磁盘空间: ```java sudo vgextend ubuntu-vg /dev/sda ``` 接下来,扩展逻辑卷,以便利用新增的空间: ```java sudo lvextend -l +100%FREE /dev/ubuntu-vg/ubuntu-lv ``` 完成上述操作后,检查文件系统和逻辑卷的大小: ```java lvdisplay /dev/ubuntu-vg/ubuntu-lv ``` 最后修改:2024 年 12 月 01 日 © 允许规范转载 打赏 赞赏作者 支付宝微信 赞 如果文章有用,请随意打赏。