评论

收藏

[Java] Springboot 配置RabbitMQ文档的方法步骤

编程语言 编程语言 发布于:2021-09-18 12:19 | 阅读数:578 | 评论:0

这篇文章主要介绍了Springboot 配置RabbitMQ文档的方法步骤,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
简介
RabbitMQ是实现AMQP(高级消息队列协议)的消息中间件的一种,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
概念:

  • 生产者 消息的产生方,负责将消息推送到消息队列
  • 消费者 消息的最终接收方,负责监听队列中的对应消息,消费消息
  • 队列 消息的寄存器,负责存放生产者发送的消息
  • 交换机 负责根据一定规则分发生产者产生的消息
  • 绑定 完成交换机和队列之间的绑定
模式:

  • direct:直连模式,用于实例间的任务分发
  • topic:话题模式,通过可配置的规则分发给绑定在该exchange上的队列
  • headers:适用规则复杂的分发,用headers里的参数表达规则
  • fanout:分发给所有绑定到该exchange上的队列,忽略routing key
Spring Boot集成RabbitMQ
一、引入maven依赖
<!-- Spring Boot starter that pulls in Spring AMQP / RabbitMQ support. -->
<dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-amqp</artifactId>
 <version>1.5.2.RELEASE</version>
</dependency>
二、配置application.properties
# RabbitMQ connection settings: broker host, port, credentials and virtual host.
# These values are read through RabbitProperties by the configuration class.
# NOTE(review): Spring Boot documents the last key as spring.rabbitmq.virtual-host;
# confirm that relaxed binding accepts the spelling used here.
spring.rabbitmq.host = dev-mq.a.pa.com
spring.rabbitmq.port = 5672
spring.rabbitmq.username = admin
spring.rabbitmq.password = admin
spring.rabbitmq.virtualhost = /message-test/
三、编写amqpconfiguration配置文件
package message.test.configuration;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.amqp.RabbitProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ wiring for the message module.
 *
 * <p>Declares the queues, the two direct exchanges and the bindings between them,
 * plus the connection factory, listener container factory and template beans
 * that are built from the injected {@link RabbitProperties}.
 */
@Configuration
public class AmqpConfiguration {

    /** Charset name used when converting message payloads to bytes. */
    public static final String MESSAGE_ENCODING = "utf-8";

    public static final String EXCHANGE_ISSUE = "exchange_message_issue";
    public static final String QUEUE_ISSUE_USER = "queue_message_issue_user";
    public static final String QUEUE_ISSUE_ALL_USER = "queue_message_issue_all_user";
    public static final String QUEUE_ISSUE_ALL_DEVICE = "queue_message_issue_all_device";
    public static final String QUEUE_ISSUE_CITY = "queue_message_issue_city";
    public static final String ROUTING_KEY_ISSUE_USER = "routing_key_message_issue_user";
    public static final String ROUTING_KEY_ISSUE_ALL_USER = "routing_key_message_issue_all_user";
    public static final String ROUTING_KEY_ISSUE_ALL_DEVICE = "routing_key_message_issue_all_device";
    public static final String ROUTING_KEY_ISSUE_CITY = "routing_key_message_issue_city";
    public static final String EXCHANGE_PUSH = "exchange_message_push";
    public static final String QUEUE_PUSH_RESULT = "queue_message_push_result";

    /** Connection settings bound from the {@code spring.rabbitmq.*} properties. */
    @Autowired
    private RabbitProperties rabbitProperties;

    /** Queue named {@link #QUEUE_ISSUE_USER}. */
    @Bean
    public Queue issueUserQueue() {
        return new Queue(QUEUE_ISSUE_USER);
    }

    /** Queue named {@link #QUEUE_ISSUE_ALL_USER}. */
    @Bean
    public Queue issueAllUserQueue() {
        return new Queue(QUEUE_ISSUE_ALL_USER);
    }

    /** Queue named {@link #QUEUE_ISSUE_ALL_DEVICE}. */
    @Bean
    public Queue issueAllDeviceQueue() {
        return new Queue(QUEUE_ISSUE_ALL_DEVICE);
    }

    /** Queue named {@link #QUEUE_ISSUE_CITY}. */
    @Bean
    public Queue issueCityQueue() {
        return new Queue(QUEUE_ISSUE_CITY);
    }

    /** Queue named {@link #QUEUE_PUSH_RESULT}. */
    @Bean
    public Queue pushResultQueue() {
        return new Queue(QUEUE_PUSH_RESULT);
    }

    /** Direct exchange named {@link #EXCHANGE_ISSUE}, created with default settings. */
    @Bean
    public DirectExchange issueExchange() {
        return new DirectExchange(EXCHANGE_ISSUE);
    }

    /** Direct exchange named {@link #EXCHANGE_PUSH}. */
    @Bean
    public DirectExchange pushExchange() {
        // Argument 1: exchange name
        // Argument 2: durable
        // Argument 3: auto-delete
        return new DirectExchange(EXCHANGE_PUSH, true, true);
    }

    /** Binds the per-user issue queue to the issue exchange under {@link #ROUTING_KEY_ISSUE_USER}. */
    @Bean
    public Binding issueUserQueueBinding(@Qualifier("issueUserQueue") Queue queue,
            @Qualifier("issueExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_ISSUE_USER);
    }

    /** Binds the all-users issue queue to the issue exchange under {@link #ROUTING_KEY_ISSUE_ALL_USER}. */
    @Bean
    public Binding issueAllUserQueueBinding(@Qualifier("issueAllUserQueue") Queue queue,
            @Qualifier("issueExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_ISSUE_ALL_USER);
    }

    /** Binds the all-devices issue queue to the issue exchange under {@link #ROUTING_KEY_ISSUE_ALL_DEVICE}. */
    @Bean
    public Binding issueAllDeviceQueueBinding(@Qualifier("issueAllDeviceQueue") Queue queue,
            @Qualifier("issueExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_ISSUE_ALL_DEVICE);
    }

    /** Binds the city issue queue to the issue exchange under {@link #ROUTING_KEY_ISSUE_CITY}. */
    @Bean
    public Binding issueCityQueueBinding(@Qualifier("issueCityQueue") Queue queue,
            @Qualifier("issueExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_ISSUE_CITY);
    }

    /** Binds the push-result queue to the push exchange, using the queue name as the routing key. */
    @Bean
    public Binding pushResultQueueBinding(@Qualifier("pushResultQueue") Queue queue,
            @Qualifier("pushExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).withQueueName();
    }

    /**
     * Builds a caching connection factory from the injected {@link RabbitProperties}
     * (host, port, username, password and virtual host).
     */
    @Bean
    public ConnectionFactory defaultConnectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(rabbitProperties.getHost());
        connectionFactory.setPort(rabbitProperties.getPort());
        connectionFactory.setUsername(rabbitProperties.getUsername());
        connectionFactory.setPassword(rabbitProperties.getPassword());
        connectionFactory.setVirtualHost(rabbitProperties.getVirtualHost());
        return connectionFactory;
    }

    /**
     * Listener container factory that uses {@code defaultConnectionFactory} and
     * manual acknowledgement mode.
     *
     * @param connectionFactory the connection factory the containers will use
     * @return the configured container factory
     */
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
            @Qualifier("defaultConnectionFactory") ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return factory;
    }

    /**
     * Template used by producers to publish messages over {@code defaultConnectionFactory}.
     *
     * @param connectionFactory the connection factory the template will use
     * @return a {@link RabbitTemplate} exposed as {@link AmqpTemplate}
     */
    @Bean
    public AmqpTemplate rabbitTemplate(
            @Qualifier("defaultConnectionFactory") ConnectionFactory connectionFactory) {
        return new RabbitTemplate(connectionFactory);
    }
}
四、编写生产者
// Fragment: body, issueMessage and rabbitTemplate are declared outside this snippet.
// Serialize the message to JSON and encode it with the configured charset name.
// NOTE(review): JSON is presumably fastjson's com.alibaba.fastjson.JSON; confirm against the project.
body = JSON.toJSONString(issueMessage).getBytes(AmqpConfiguration.MESSAGE_ENCODING);
// Publish to the issue exchange with the per-user routing key.
rabbitTemplate.convertAndSend(AmqpConfiguration.EXCHANGE_ISSUE,
        AmqpConfiguration.ROUTING_KEY_ISSUE_USER, body);
五、编写消费者
/**
 * Listener for messages arriving on the push-result queue.
 *
 * @param data        raw message payload
 * @param channel     channel the message was delivered on
 * @param deliveryTag delivery tag taken from the message headers
 */
@RabbitListener(queues = AmqpConfiguration.QUEUE_PUSH_RESULT)
public void handlePushResult(@Payload byte[] data, Channel channel,
        @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
    // Intentionally left empty in the article.
    // NOTE(review): the container factory is configured for manual acknowledgement,
    // so a real handler presumably needs to ack/nack via channel and deliveryTag.
}
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持CodeAE代码之家
原文链接:https://segmentfault.com/a/1190000018555963

关注下面的标签,发现更多相似文章