# 整合消息队列ActiveMq
常见的消息队列有activemq、rabbitmq、socketmq、kafka 这里选用最常见的activemq来完成项目
# 1.什么是MQ
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位
# 2.消息队列可以做什么
顾名思义,mq主要还是为了提高服务器响应速度,提高客户体验.举个例子大家就应该明白了。 比如当用户登录某商城后,点击某商品要求发送邮件或者发送短信,此时邮件和短信是有可能失败的,如果同步的话必然会引起客户长时间等待,不利于客户体验(mq是异步).所以mq可以设置一个定时器,每隔一段时间对于发送失败的邮件和短信重新发送。
# 3.下载安装MQ
下载地址:http://activemq.apache.org/components/classic/download/
将下载好的zip文件解压就可以直接使用了,选择自己电脑的版本,点击activemq.bat文件,类似与tomcat的使用,打开成功之后显示了MQ的端口
在浏览器打开查看,用户名和密码在conf/users.properties文件中可以查看,其他的文件解释:
bin目录:用于存放activemq启动,停止的批处理
conf目录:用于activemq的配置文件
data目录:用于存放activemq的日志
docs目录:用于存放activemq的说明文档
examples目录:用于存放activemq对外开放接口示例
lib目录:用于存放相关jar包
webapps/webapps-demo目录:启动activemq ,访问ActiveMQ的管理页面
到这里我们的MQ就可以使用了,接下来就可以使用java代码去集成
# 4.SpringBoot整合MQ的步骤
1.添加maven依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
2
3
4
2.yml配置
##指定队列名称
queue: mq-test
spring:
##mq配置
activemq:
broker-url: tcp://127.0.0.1:61616
user: admin
password: admin
2
3
4
5
6
7
8
9
# 3.定义生产者代码
1.定义一个测试的实体类
/**
* @ Author :gjy
* @ Date :Created in 16:34 20122/05/08
* @ Description:mq测试
* @ Modified By:
* @Version: $
*/
public class MqTest {
private Long id;
private String name;
private Integer age;
public MqTest(Long id, String name, Integer age) {
super();
this.id = id;
this.name = name;
this.age = age;
}
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Integer getAge() {
return age;
}
public void setAge(Integer age) {
this.age = age;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
2.定义生产者的代码
/**
* @ Author :gjy
* @ Date :Created in 16:32 2022/05/08
* @ Description:创建Producer,生产者,往消息队列中发送消息,为了演示明显,加入了定时任务
* @ Modified By:
* @Version: $
*/
import java.util.UUID;
import javax.jms.Queue;
import com.example.echart.entity.MqTest;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import com.alibaba.fastjson.JSONObject;
/**
* @classDesc: 功能描述:(生产者代码)
*/
@Component//将Producer注入到容器
@EnableScheduling//定时任务的注解
public class Producer {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Autowired//注入
private Queue queue;
private int age = 18;
@Scheduled(fixedDelay = 5000)//每隔5秒钟执行这个方法
public void send() {
System.out.println("---------mq定时器运行---------");
age++;
MqTest mqTest = new MqTest(System.currentTimeMillis(), UUID.randomUUID().toString(), age);
String json = new JSONObject().toJSONString(mqTest);//将实体类转换成json字符串
System.out.println("json:" + json);
jmsMessagingTemplate.convertAndSend(queue, json);//向指定队列中发送消息
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
这是启动项目,定时器每隔5秒向我们定义的mq-test消息队列中插入数据,这里的MqTest对象是自己定义的一个实体类,可以随便写,启动项目之后我们在Queue中可以看到我们定义的队列,并且接收到了多少条数据,发送了多少条数据
随着定时器的执行,还会不停的向该队列中插入数据,放到真实项目中,应该就是结合业务放入对应的信息,然后等待消费者去获取
3.定义消费者的代码
/**
* @ Author :gjy
* @ Date :Created in 16:41 2022/05/08
* @ Description:消费者,接收mq的信息
* @ Modified By:
* @Version: $
*/
import com.example.echart.entity.MqTest;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import com.alibaba.fastjson.JSONObject;
@Component
public class Consumer {
//activeMq监听监听接收消息队列
@JmsListener(destination = "${queue}")
public void receive(String msg){//这个msg就是从消息队列获得到的参数
System.out.println(msg);
JSONObject jsonObject = new JSONObject();
MqTest userEntity = jsonObject.parseObject(msg,MqTest.class);//将json转换成实体类
System.out.println(userEntity.getName()+"---"+userEntity.getId());
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
运行项目,这是会将我们刚刚放入队列中的信息发送出去
← Linux笔记专栏 利用docker容器 →