评论

收藏

[Linux] RocketMQ集群消息收发测试全纪录

服务系统 服务系统 发布于:2021-07-13 09:36 | 阅读数:426 | 评论:0

  
一、环境说明
ip地址主机名操作系统版本RocketMQ版本JDK版本maven版本备注172.16.7.91nameserver01centos 7.64.8.01.8.0_2913.6Name Server集群172.16.7.92nameserver03centos 7.64.8.01.8.0_2913.6Name Server集群172.16.7.93master01centos 7.64.8.01.8.0_2913.6Broker集群1172.16.7.94slave01centos 7.64.8.01.8.0_2913.6Broker集群1172.16.7.95master02centos 7.64.8.01.8.0_2913.6Broker集群2172.16.7.96slave02centos 7.64.8.01.8.0_2913.6Broker集群2
二、部署概况
DSC0000.png

三、创建Maven Project
1.新建Maven project
DSC0001.png
DSC0002.png
选择Maven Project
DSC0003.png
配置目录
DSC0004.png
选择原型
DSC0005.png
自定义group id和artifact id,完成maven project的创建。
DSC0006.png
2.导入依赖库
修改pom.xml,加入如下代码
   <dependency>      <groupId>org.apache.rocketmq</groupId>      <artifactId>rocketmq-client</artifactId>      <version>4.3.0</version>    </dependency>
DSC0007.png
会发现多了很多依赖包
DSC0008.png

四、生产者测试
1.测试前集群查看
启动各节点服务,查看集群状态
DSC0009.png
测试前无消息生产和消费
2.新建topic
2.1新增主题topic_test_123
DSC00010.png
主题配置如下:
DSC00011.png
集群名为MyRocketmq,BROKER_NAME两个broker都选择
2.2查看新增的主题
DSC00012.png
4.新建订阅组
4.1新建订阅组group_test_123
DSC00013.png
配置如下:
DSC00014.png
4.2查看新建的订阅组
DSC00015.png
5.新建类Producer
DSC00016.png
DSC00017.png
DSC00018.png
新建类Producer
DSC00019.png
生产者消息发送代码:
package com.my.maven.rocketmq;​import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.common.message.Message;import org.apache.rocketmq.remoting.common.RemotingHelper;​public class Producer {    public static void main(String[] args) throws Exception {        //Instantiate with a producer group name.        DefaultMQProducer producer = new            DefaultMQProducer("group_test_123");        // Specify name server addresses.        producer.setNamesrvAddr("172.16.7.91:9876;172.16.7.92:9876");        producer.setRetryTimesWhenSendAsyncFailed(2);        //Launch the instance.        producer.start();        for (int i = 0; i < 100; i++) {            //Create a message instance, specifying topic, tag and message body.            Message msg = new Message("topic_test_123" /* Topic */,                "TagA" /* Tag */,                ("Message Test" +                    i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */            );            //Call send message to deliver message to one of brokers.            SendResult sendResult = producer.send(msg);            System.out.printf("%s%n", sendResult);        }        //Shut down once the producer instance is not longer in use.        producer.shutdown();    }}​
生产者配置项 retryTimesWhenSendAsyncFailed 表示异步重试的次数,默认为 2 次,加上正常发送的1次,总共有3次发送机会。
发送消息Message Test0--Message Test99,共100条消息。
6.运行报错
运行Produce发送消息时报错,如图:
DSC00020.png
解决:
DSC00021.png
由于测试是在本地电脑虚机上进行的,同时开多个虚机和eclipse应用会占用很多内存,解决办法是进入eclipse的安装目录,修改文件eclipse.ini,将参数-Xms和-Xmx改小点即可。
7.运行Produce
DSC00022.png
8.发送消息状态查看
8.1集群查看
DSC00023.png
可以看到broker-a和broker-b各产生了50条消息
8.2消息查看
DSC00024.png
消息详情:
DSC00025.png
8.3消费者查看
DSC00026.png
此时还未消费

五、消费者测试
1.新建类Consumer
DSC00027.png
消费代码:
package com.my.maven.rocketmq;​import java.util.List;​import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.common.consumer.ConsumeFromWhere;import org.apache.rocketmq.common.message.MessageExt;​public class Consumer {​    public static void main(String[] args) throws InterruptedException,            MQClientException {​        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(                "group_test_123");        consumer.setNamesrvAddr("172.16.7.91:9876;172.16.7.92:9876");​        consumer.subscribe("topic_test_123", "TagA || TagB");        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);        consumer.registerMessageListener(new MessageListenerConcurrently() {​            public ConsumeConcurrentlyStatus consumeMessage(                    List<MessageExt> msgs, ConsumeConcurrentlyContext context) {                System.out.println(Thread.currentThread().getName()                        + " Receive New Messages: " + msgs);                MessageExt msg = msgs.get(0);                if (msg.getTopic().equals("topic_test_123")) {                    if (msg.getTags() != null && msg.getTags().equals("TagA")) {                        // 获取消息体                        String message = new String(msg.getBody());                        System.out.println("receive TagA message:" + message);                    } else if (msg.getTags() != null                            && msg.getTags().equals("TagB")) {                        // 获取消息体                        String message = new String(msg.getBody());                        System.out.println("receive TagB message:" + message);                    }​                }                // 成功                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;            }        });        consumer.start();        System.out.println("Consumer Started.");    }​}
2.运行Consumer
DSC00028.png
3.消费消息状态查看
3.1消费者查看
DSC00029.png
3.2查看消费详情
DSC00030.png
3.3集群查看
DSC00031.png
3.4消息详情查看
DSC00032.png
发现消息已被消费
4消费者console日志
DSC00033.png
一共100条消息被消费
 
本文所有代码和配置文件已上传github:RocketMQ_Message_Test
单机版RocketMQ搭建详见:Centos7.6搭建RocketMQ4.8全纪录
集群版RocketMQ搭建详见:RocketMQ4.8集群搭建全纪录
集群启停详见:RocketMQ集群启停手册
DSC00034.png

  
关注下面的标签,发现更多相似文章