目前主流的消息中间件有activemq,rabbitmq,rocketmq,kafka,我们要根据实际的业务场景来选择一款合适的消息中间件,关注的主要指标有,消息投递的可靠性,可维护性,吞吐量以及中间件的特色等重要指标来选择,大数据领域肯定是kafka,那么传统的业务场景就是解耦,异步,削峰。那么就在剩下的3款产品中选择一款,从吞吐量,社区的活跃度,消息的可靠性出发,一般的中小型公司选择rabbitmq来说可能更为合适。那么我们就来看看如何使用它吧。
环境准备
本案例基于springboot集成rabbitmq,本案例主要侧重要实际的code,对于基础理论知识请自行百度。
jdk-version:1.8
rabbitmq-version:3.7
springboot-version:2.1.4.RELEASE
pom文件
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
yml配置文件
spring: rabbitmq: password: guest username: guest port: 5672 addresses: 127.0.0.1 #开启发送失败返回 publisher-returns: true #开启发送确认 publisher-confirms: true listener: simple: #指定最小的消费者数量. concurrency: 2 #指定最大的消费者数量. max-concurrency: 2 #开启ack acknowledge-mode: auto #开启ack direct: acknowledge-mode: auto #支持消息的确认与返回 template: mandatory: true
配置rabbitMq的姿势
姿势一
基于javaconfig
package com.lly.order.message; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @ClassName RabbitMqConfig * @Description rabbitMq配置类 * @Author lly * @Date 2019-05-13 15:05 * @Version 1.0 **/ @Configuration public class RabbitMqConfig { public final static String DIRECT_QUEUE = "directQueue"; public final static String TOPIC_QUEUE_ONE = "topic_queue_one"; public final static String TOPIC_QUEUE_TWO = "topic_queue_two"; public final static String FANOUT_QUEUE_ONE = "fanout_queue_one"; public final static String FANOUT_QUEUE_TWO = "fanout_queue_two"; public final static String TOPIC_EXCHANGE = "topic_exchange"; public final static String FANOUT_EXCHANGE = "fanout_exchange"; public final static String TOPIC_ROUTINGKEY_ONE = "common_key"; public final static String TOPIC_ROUTINGKEY_TWO = "*.key"; // direct模式队列 @Bean public Queue directQueue() { return new Queue(DIRECT_QUEUE, true); } // topic 订阅者模式队列 @Bean public Queue topicQueueOne() { return new Queue(TOPIC_QUEUE_ONE, true); } @Bean public Queue topicQueueTwo() { return new Queue(TOPIC_QUEUE_TWO, true); } // fanout 广播者模式队列 @Bean public Queue fanoutQueueOne() { return new Queue(FANOUT_QUEUE_ONE, true); } @Bean public Queue fanoutQueueTwo() { return new Queue(FANOUT_QUEUE_TWO, true); } // topic 交换器 @Bean public TopicExchange topExchange() { return new TopicExchange(TOPIC_EXCHANGE); } // fanout 交换器 @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange(FANOUT_EXCHANGE); } // <div>本文来源gaodai.ma#com搞##代!^码7网</div>订阅者模式绑定 @Bean public Binding topExchangeBingingOne() { return BindingBuilder.bind(topicQueueOne()).to(topExchange()).with(TOPIC_ROUTINGKEY_ONE); } @Bean public Binding topicExchangeBingingTwo() { return BindingBuilder.bind(topicQueueTwo()).to(topExchange()).with(TOPIC_ROUTINGKEY_TWO); } // 广播模式绑定 @Bean public Binding fanoutExchangeBingingOne() { return BindingBuilder.bind(fanoutQueueOne()).to(fanoutExchange()); } @Bean public Binding fanoutExchangeBingingTwo() { return BindingBuilder.bind(fanoutQueueTwo()).to(fanoutExchange()); } }