盛夏的果实 发表于 2021-7-21 17:45:49

RocketMQ集群NameServer高可用测试

一、环境说明

ip地址主机名操作系统版本RocketMQ版本JDK版本maven版本备注
172.16.7.91nameserver01centos 7.64.8.01.8.0_2913.6Name Server集群
172.16.7.92nameserver03centos 7.64.8.01.8.0_2913.6Name Server集群
172.16.7.93master01centos 7.64.8.01.8.0_2913.6Broker集群1
172.16.7.94slave01centos 7.64.8.01.8.0_2913.6Broker集群1
172.16.7.95master02centos 7.64.8.01.8.0_2913.6Broker集群2
172.16.7.96slave02centos 7.64.8.01.8.0_2913.6Broker集群2

二、部署概况


三、消息正常发送
1.Producer代码
package com.my.maven.rocketmq;​import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.common.message.Message;import org.apache.rocketmq.remoting.common.RemotingHelper;​public class Producer {    public static void main(String[] args) throws Exception {      //Instantiate with a producer group name.      DefaultMQProducer producer = new            DefaultMQProducer("group_test_123");      // Specify name server addresses.      producer.setNamesrvAddr("172.16.7.91:9876;172.16.7.92:9876");      producer.setRetryTimesWhenSendAsyncFailed(2);      //Launch the instance.      producer.start();      for (int i = 0; i < 100; i++) {            //Create a message instance, specifying topic, tag and message body.            Message msg = new Message("topic_test_123" /* Topic */,                "TagA" /* Tag */,                ("NameServer Test" +                  i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */            );            //Call send message to deliver message to one of brokers.            SendResult sendResult = producer.send(msg);            System.out.printf("%s%n", sendResult);      }      //Shut down once the producer instance is not longer in use.      producer.shutdown();    }}发送消息NameServer Test0--NameServer Test99
2.运行Producer

3.发送查看
日志:
SendResult , queueOffset=0]SendResult , queueOffset=0]SendResult , queueOffset=0]SendResult , queueOffset=0]SendResult , queueOffset=0]SendResult , queueOffset=0]……一共100条发送记录
console查看

broker-a和broker-b两个分片各发送了52和48条消息
4.Consumer代码
package com.my.maven.rocketmq;​import java.util.List;​import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.common.consumer.ConsumeFromWhere;import org.apache.rocketmq.common.message.MessageExt;​public class Consumer {​    public static void main(String[] args) throws InterruptedException,            MQClientException {​      DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(                "group_test_123");      consumer.setNamesrvAddr("172.16.7.91:9876;172.16.7.92:9876");​      consumer.subscribe("topic_test_123", "TagA || TagB");      consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);      consumer.registerMessageListener(new MessageListenerConcurrently() {​            public ConsumeConcurrentlyStatus consumeMessage(                  List<MessageExt> msgs, ConsumeConcurrentlyContext context) {                System.out.println(Thread.currentThread().getName()                        + " Receive New Messages: " + msgs);                MessageExt msg = msgs.get(0);                if (msg.getTopic().equals("topic_test_123")) {                  if (msg.getTags() != null && msg.getTags().equals("TagA")) {                        // 获取消息体                        String message = new String(msg.getBody());                        System.out.println("receive TagA message:" + message);                  } else if (msg.getTags() != null                            && msg.getTags().equals("TagB")) {                        // 获取消息体                        String message = new String(msg.getBody());                        System.out.println("receive TagB message:" + message);                  }​                }                // 成功                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;            }      });      consumer.start();      System.out.println("Consumer Started.");    }​}5.运行Consumer

6.消费查看
消费日志:

Consumer Started.ConsumeMessageThread_6 Receive New Messages: , transactionId='null'}]]receive TagA message:NameServer Test85ConsumeMessageThread_1 Receive New Messages: , transactionId='null'}]]receive TagA message:NameServer Test21ConsumeMessageThread_9 Receive New Messages: , transactionId='null'}]]receive TagA message:NameServer Test45ConsumeMessageThread_13 Receive New Messages: , transactionId='null'}]]receive TagA message:NameServer Test81ConsumeMessageThread_4 Receive New Messages: , transactionId='null'}]]receive TagA message:NameServer Test13ConsumeMessageThread_13 Receive New Messages: , transactionId='null'}]]receive TagA message:NameServer Test52……一共消费了100条消息
console查看

发现消息消费一共也是100条

四、关闭一台nameserver节点
1.关闭nameserver02
# init 02.发送消息
再次发送100条消息


查看日志和console,发现新增消息100条
3.消息消费


消息消费也新增100条
4.结论
结论一:当一个nameserver节点宕机时,不影响消息发送和消费。

五、关闭所有nameserver节点
1.消息发送
继续发送10000条消息,发送的同时继续关闭nameserver01
# init 0

发送10000条消息,发送的同时关闭nameserver01,发现消息只发送了367条
2.消息消费

发现无法消费,无消费记录
3.结论
结论二:当所有nameserver宕机时,消息发送和接收都会无法进行。

六、开启nameserver01
1.nameserver01开机
消息发送和消费会恢复,但是会丢消息



2.结论
结论三:当nameserver集群恢复时,部分消息会恢复发送和消费,同时出现部分消息丢失情况。

七、总结
总结:为保证RocketMQ集群能正常对外提供服务,需至少保证有一台nameserver服务器处于运行状态;当所有nameserver服务器宕机时,消息无法发送和消费。

本文所有代码和配置文件已上传github:RocketMQ_NameServer_HA_Test
单机版RocketMQ搭建详见:Centos7.6搭建RocketMQ4.8全纪录
集群版RocketMQ搭建详见:RocketMQ4.8集群搭建全纪录
集群启停详见:RocketMQ集群启停手册
集群消息收发测试:RocketMQ集群消息收发测试全纪录



文档来源:51CTO技术博客https://blog.51cto.com/u_3241766/3155755
页: [1]
查看完整版本: RocketMQ集群NameServer高可用测试