• 欢迎访问搞代码网站,推荐使用最新版火狐浏览器和Chrome浏览器访问本网站!
  • 如果您觉得本站非常有看点,那么赶紧使用Ctrl+D 收藏搞代码吧

Spring Boot教程之利用ActiveMQ实现延迟消息

java 搞代码 4年前 (2022-01-05) 18次浏览 已收录 0个评论

这篇文章主要给大家介绍了关于Spring Boot教程之利用ActiveMQ实现延迟消息的相关资料,文中通过示例代码介绍的非常详细,对大家学习或者使用Spring Boot具有一定的参考学习价值,需要的朋友们下面来一起学习学习吧

一、安装activeMQ

Linux环境ActiveMQ部署方法:https://www.gaodaima.com/article/162320.htm

安装步骤参照上面这篇文章,本文不做介绍

Windows下安装ActiveMQ:

到官网(http://activemq.apache.org/download-archives.html)下载最新发布的压缩包(我下的是5.15.9)到本地后解压(我解压到D盘Dev目录下)即可。进入解压后的bin目录,我是64位机器,再进入win64目录后,双击activemq.bat启动:

 wrapper | --> Wrapper Started as Console wrapper | Launching a JVM... jvm 1 | Wrapper (Version 3.2.3) http://wrapper.tanukisoftware.org jvm 1 | Copyright 1999-2006 Tanuki Software, Inc. All Rights Reserved. jvm 1 | jvm 1 | Java Runtime: Oracle Corporation 1.8.0_181 C:\Program Files\Java\jre1.8.0_181 jvm 1 | Heap sizes: current=125952k free=115299k max=932352k jvm 1 | JVM args: -Dactivemq.home=../.. -Dactivemq.base=../.. -Djavax.net.ssl.keyStorePassword=password -Djavax.net.ssl.trustStorePassword=password -Djavax.net.ssl.keyStore=../../conf/broker.ks -Djavax.net.ssl.trustStore=../../conf/broker.ts -Dcom.sun.management.jmxremote -Dorg.apache.activemq.UseDedicatedTaskRunner=true -Djava.util.logging.config.file=logging.properties -Dactivemq.conf=../../conf -Dactivemq.data=../../data -Djava.security.auth.login.config=../../conf/login.config -Xmx1024m -Djava.library.path=../../bin/win64 -Dwrapper.key=mChNCWMZ2FoXhZ9g -Dwrapper.port=32000 -Dwrapper.jvm.port.min=31000 -Dwrapper.jvm.port.max=31999 -Dwrapper.pid=3500 -Dwrapper.version=3.2.3 -Dwrapper.native_library=wrapper -Dwrapper.cpu.timeout=10 -Dwrapper.jvmid=1 jvm 1 | Extensions classpath: jvm 1 | [..\..\lib,..\..\lib\camel,..\..\lib\optional,..\..\lib\web,..\..\lib\extra] jvm 1 | ACTIVEMQ_HOME: ..\.. jvm 1 | ACTIVEMQ_BASE: ..\.. jvm 1 | ACTIVEMQ_CONF: ..\..\conf jvm 1 | ACTIVEMQ_DATA: ..\..\data jvm 1 | Loading message broker from: xbean:activemq.xml jvm 1 | INFO | Refreshing org.apache.activemq.xbean.XBeanBrokerFactory$1@f0ef68d: startup date [Fri May 24 15:16:21 CST 2019]; root of context hierarchy jvm 1 | INFO | Using Persistence Adapter: KahaDBPersistenceAdapter[D:\Dev\apache-activemq-5.15.9\bin\win64\..\..\data\kahadb] jvm 1 | INFO | PListStore:[D:\Dev\apache-activemq-5.15.9\bin\win64\..\..\data\localhost\tmp_storage] started jvm 1 | INFO | Apache ActiveMQ 5.15.9 (localhost, ID:wulf00-51057-1558682182909-0:1) is starting jvm 1 | INFO | Listening for connections at: tcp://wulf00:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600 jvm 1 | INFO | Connector openwire started jvm 1 | INFO | Listening for connections at: amqp://wulf00:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600 jvm 1 | INFO | Connector amqp started jvm 1 | INFO | Listening for connections at: stomp://wulf00:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600 jvm 1 | INFO | Connector stomp started jvm 1 | INFO | Listening for connections at: mqtt://wulf00:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600 jvm 1 | INFO | Connector mqtt started jvm 1 | INFO | Starting Jetty server jvm 1 | INFO | Creating Jetty connector jvm 1 | WARN | [email protected]@17bc7c8a{/,null,STARTING} has uncovered http methods for path: / jvm 1 | INFO | Listening for connections at ws://wulf00:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600 jvm 1 | INFO | Connector ws started jvm 1 | INFO | Apache ActiveMQ 5.15.9 (localhost, ID:wulf00-51057-1558682182909-0:1) started jvm 1 | INFO | For help or more information please see: http://activemq.apache.org jvm 1 | WARN | Store limit is 102400 mb (current store usage is 0 mb). The data directory: D:\Dev\apache-activemq-5.15.9\bin\win64\..\..\data\kahadb only has 92649 mb of usable space. - resetting to maximum available disk space: 92649 mb jvm 1 | INFO | No Spring WebApplicationInitializer types detected on classpath jvm 1 | INFO | ActiveMQ WebConsole available at http://0.0.0.0:8161/ jvm 1 | INFO | ActiveMQ Jolokia REST API available at http://0.0.0.0:8161/api/jolokia/ jvm 1 | INFO | Initializing Spring FrameworkServlet 'dispatcher' jvm 1 | INFO | No Spring WebApplicationInitializer types detected on classpath jvm 1 | INFO | jolokia-agent: Using policy access restrictor classpath:/jolokia-access.xml

默认端口8161,访问下http://localhost:8161/admin,用户名密码都是admin,进入控制台页面:

我们用坐上方的Queues来创建一个叫vboxlog的队列:

 

二、修改activeMQ配置文件

broker新增配置信息 schedulerSupport=”true”

     "> <!-- The constantPendingMessageLimitStrategy is used to prevent slow topic consumers to block producers and affect other consumers by limiting the number of messages that are retained For more information, see: http://activemq.apache.org/slow-consumer-handling.html --> 

三、创建SpringBoot工程

1、配置ActiveMQ工厂信息,信任包必须配置否则会报错

 package com.example.demoactivemq.config; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.RedeliveryPolicy; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.ArrayList; import java.util.List; /** * @author shanks on 2019-11-12 */ @Configuration public class ActiveMqConfig { @Bean public ActiveMQConnectionFactory factory(@Value("${spring.activemq.broker-url}") String url){ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url); // 设置信任序列化包集合 List models = new ArrayList(); models.add("com.example.demoactivemq.domain"); factory.setTrustedPackages(models); return factory; } }

消息实体类

 package com.example.demoactivemq.domain; import lombok.Builder; import lombok.Data; import java.io.Serializable; /** * @author shanks on 2019-11-12 */ @Builder @Data public class MessageModel implements Serializable { private String titile; private String message; }

生产者

 package com.example.demoactivemq.producer; import lombok.extern.slf4j.Slf4j; import org.apache.activemq.ScheduledMessage; import org.apache.activemq.command.ActiveMQQueue; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.jms.JmsProperties; import org.springframework.jms.core.JmsMessagingTemplate; import org.springframework.stereotype.Service; import javax.jms.*; import java.io.Serializable; /** * 消息生产者 * * @author shanks */ @Service @Slf4j public class Producer { public static final Destination DEFAULT_QUEUE = new ActiveMQQueue("delay.queue"); @Autowired private JmsMessagingTemplate template; /** * 发送消息 * * @param destination destination是发送到的队列 * @param message message是待发送的消息 */ public  void send(Destination destination, T message) { template.convertAndSend(destination, message); } /** * 延时发送 * * @param destination 发送的队列 * @param data 发送的消息 * @param time 延迟时间 */ public  void delaySend(Destination destination, T data, Long time) { Connection connection = null; Session session = null; MessageProducer producer = null; // 获取连接工厂 ConnectionFactory connectionFactory = template.getConnectionFactory(); try { // 获取连接 connection = connectionFactory.createConnection(); connection.start(); // 获取session,true开启事务,false关闭事务 session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 创建一个消息队列 producer = session.createProducer(destination); producer.setDeliveryMode(JmsProperties.DeliveryMode.PERSISTENT.getValue()); ObjectMessage message = session.createObjectMessage(data); //设置延迟时间 message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time); // 发送消息 producer.send(message); log.info("发送消息:{}", data); session.commit(); } catch (Exception e) { e.printStackTrace(); } finally { try { if (producer != null) { producer.close(); } if (session != null) { session.close(); } if (connection != null) { connection.close(); } } catch (Exception e) { e.printStackTrace(); } } } }

消费者

 package com.example.demoactivemq.producer; import com.example.demoactivemq.domain.MessageModel; import lombok.extern.slf4j.Slf4j; import org.springframework.jms.annotation.JmsListener; import org.springframework.stereotype.Component; /** * 消费者 */ @Component @Slf4j public class Consumer { @JmsListener(destination = "delay.queue") public void receiveQueue(MessageModel message) { log.info("收到消息:{}", message); } }

application.yml

 spring: activ<i style="color:transparent">来源gaodai$ma#com搞$代*码网</i>emq: broker-url: tcp://localhost:61616

测试类

 package com.example.demoactivemq; import com.example.demoactivemq.domain.MessageModel; import com.example.demoactivemq.producer.Producer; import org.junit.jupiter.api.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; @SpringBootTest(classes = DemoActivemqApplication.class) @RunWith(SpringRunner.class) class DemoActivemqApplicationTests { /** * 消息生产者 */ @Autowired private Producer producer; /** * 及时消息队列测试 */ @Test public void test() { MessageModel messageModel = MessageModel.builder() .message("测试消息") .titile("消息000") .build(); // 发送消息 producer.send(Producer.DEFAULT_QUEUE, messageModel); } /** * 延时消息队列测试 */ @Test public void test2() { for (int i = 0; i <5; i++) { MessageModel messageModel = MessageModel.builder() .titile("延迟10秒执行") .message("测试消息" + i) .build(); // 发送延迟消息 producer.delaySend(Producer.DEFAULT_QUEUE, messageModel, 10000L); } try { // 休眠100秒,等等消息执行 Thread.currentThread().sleep(100000L); } catch (InterruptedException e) { e.printStackTrace(); } } }

执行结果

2019-11-12 22:18:52.939  INFO 17263 — [           main] c.e.demoactivemq.producer.Producer       : 发送消息:MessageModel(titile=延迟10秒执行, message=测试消息0)
2019-11-12 22:18:52.953  INFO 17263 — [           main] c.e.demoactivemq.producer.Producer       : 发送消息:MessageModel(titile=延迟10秒执行, message=测试消息1)
2019-11-12 22:18:52.958  INFO 17263 — [           main] c.e.demoactivemq.producer.Producer       : 发送消息:MessageModel(titile=延迟10秒执行, message=测试消息2)
2019-11-12 22:18:52.964  INFO 17263 — [           main] c.e.demoactivemq.producer.Producer       : 发送消息:MessageModel(titile=延迟10秒执行, message=测试消息3)
2019-11-12 22:18:52.970  INFO 17263 — [           main] c.e.demoactivemq.producer.Producer       : 发送消息:MessageModel(titile=延迟10秒执行, message=测试消息4)
2019-11-12 22:19:03.012  INFO 17263 — [enerContainer-1] c.e.demoactivemq.producer.Consumer       : 收到消息:MessageModel(titile=延迟10秒执行, message=测试消息0)
2019-11-12 22:19:03.017  INFO 17263 — [enerContainer-1] c.e.demoactivemq.producer.Consumer       : 收到消息:MessageModel(titile=延迟10秒执行, message=测试消息1)
2019-11-12 22:19:03.019  INFO 17263 — [enerContainer-1] c.e.demoactivemq.producer.Consumer       : 收到消息:MessageModel(titile=延迟10秒执行, message=测试消息2)
2019-11-12 22:19:03.020  INFO 17263 — [enerContainer-1] c.e.demoactivemq.producer.Consumer       : 收到消息:MessageModel(titile=延迟10秒执行, message=测试消息3)
2019-11-12 22:19:03.021  INFO 17263 — [enerContainer-1] c.e.demoactivemq.producer.Consumer       : 收到消息:MessageModel(titile=延迟10秒执行, message=测试消息4)

比你优秀的人比你还努力,你有什么资格不去奋斗!!!

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,谢谢大家对gaodaima搞代码网的支持。

以上就是Spring Boot教程之利用ActiveMQ实现延迟消息的详细内容,更多请关注gaodaima搞代码网其它相关文章!


搞代码网(gaodaima.com)提供的所有资源部分来自互联网,如果有侵犯您的版权或其他权益,请说明详细缘由并提供版权或权益证明然后发送到邮箱[email protected],我们会在看到邮件的第一时间内为您处理,或直接联系QQ:872152909。本网站采用BY-NC-SA协议进行授权
转载请注明原文链接:Spring Boot教程之利用ActiveMQ实现延迟消息

喜欢 (0)
[搞代码]
分享 (0)
发表我的评论
取消评论

表情 贴图 加粗 删除线 居中 斜体 签到

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址