Rabbit MQ之——概述、入门、AMQP、Spring Boot整合邮件发送

第一章 MQ概述

1.1.什么是MQ

MQ全称 Message Queue(消息队列) , 是在消息的传输过程中保存消息的容器. 多用于分布式系统之间进行通信.

消息队列是典型例子: 生产者消费者模型.

A系统向B系统发送消息, A系统先将消息一条一条存放到MQ中, B系统从MQ中一条一条读取消息

image-20201122194125554image-20201123091502006

1
2
3
4
5
MQ,消息队列, 存储消息的中间件
分布式系统通信方式:
直接远程调用
借助第三方完成间接通信
发送方称为生产者, 接收方称为消费者

1.2 MQ的优势和劣势

1.2.1 MQ的优势

1
2
3
应用解耦
快速应用变更与维护
削峰填谷

应用解耦

1
2
3
4
订单系统依赖 ---> 库存系统,支付系统,物流系统
当库存系统出现问题时,导致订单系统没有办法正常工作.

系统耦合性越高, 容错性就越低, 可维护性也就越低.

image-20201122195130351

1
使用MQ使得应用解耦,提升容错性和可维护性.

image-20201122195308230

异步提速

1
2
一个下单操作需要耗时: 20 + 300 + 300 + 300 = 920
执行效率太慢

image-20201122195651775

1
2
用户点击下单按钮时,后台只需要将信息写入数据库,并将消息存入MQ,响应给客户端即可
真正处理的时间用户时感知不到了.

image-20201122195912997

削峰填谷

image-20201122200036139

image-20201122200105031

1
使用了MQ之后, 限制消费消息的速度为1000, 这样以来, 高峰期产生的数据势必会被积压在MQ中, 高峰期就给"削"掉了, 但是因为消息积压, 在高峰期过后的一段时间内, 消费消息的速度还是会维持在1000, 直到消费完积压的消息, 这就是"填谷"

image-20201122200151829

小结

1
2
3
应用解耦: 提高系统容错性和可维护性
异步提速: 提升用户体验和系统吞吐量
削峰填谷: 增加系统稳定性

1.2.2 MQ的劣势

image-20201122201936018

1
2
3
4
5
6
系统可用性降低:
系统引入的外部依赖越多,系统的稳定性越差. 一旦MQ宕机, 将会对业务造成影响.
系统复杂度提高:
MQ的加入大大增加了系统的复杂度, 以前系统之间是同步的远程调用, 现在是通过MQ进行异步调用. 如何保证消息没有被重复消费? 怎么处理消息丢失情况? 怎么保证消息传递的顺序性?
一致性问题:
A系统处理完业务, 通过MQ给B,C,D三个系统发数据, 如果B系统,C系统处理成功, D系统处理失败. 如何保证消息数据处理的一致性?

1.2.3 MQ小结

1
2
3
4
既然MQ有优势也有劣势, 那么使用MQ需要满足什么条件呢?
1.生产者不需要从消费者处获取反馈. 引入消息队列之前的直接调用,其接口返回值必须为空.这样上层才能继续往后执行.
2.允许短暂的不一致性
3.确实是用了有效果. 既解耦,提速,削峰这些方法的收益,超过加入MQ,管理MQ的成本.

1.3 常见的MQ产品

image-20201122201358945

1.4 AMQP和JMS

AMQP

AMQP, 既Advanced Message Queuing Protocol(高级消息队列协议), 是一个网络协议, 是应用层协议的一个开发标准, 为面向消息的中间件设计. 基于此协议的客户端与消息中间件就可以传递消息了. 2006年,AMQP规范发布. 类似HTTP.

image-20201122223753472

image-20201123103039508

JMS(了解)

JMS 既 Java消息服务(JavaMessage Service) 应用程序接口, 是一个java平台中关于面向消息中间件的API.

JMS是JavaEE 13种规范中的一种, 类比JDBC.

很多消息中间件都实现了JMS规范, 例如: ActiveMQ.

RabbitMQ官方没有提供JMS的实现包, 但是开源社区有相关实现.

1
2
AMQP是协议,是消息中间件通讯的协议, 类比HTTP
JMS是API规范接口,类比JDBC

1.5 RabbitMQ简介

概述

2007年, Rabbit技术公司基于AMQP标准开发的RabbitMQ1.0版本发布了. RabbitMQ采用Erlang语言开发.Erlang语言是针开发高并发和分布式系统的一种语言, 在电信领域应用广泛.

RabbitMQ是基于AMQP的一款消息管理系统

官网: http://www.rabbitmq.com/

官方教程:http://www.rabbitmq.com/getstarted.html

image-20201122211225425

image-20201123105057894

相关概念

image-20201122211433093

image-20201122211449618

工作模式

RabbitMQ提供了6种消息模型,但是第6种其实是RPC,并不是MQ,因此不予学习。那么也就剩下5种。

但是其实3、4、5这三种都属于订阅模型,只不过进行路由的方式不同。

1527068544487

小结

1
2
3
4
5
6
7
8
9
10
11
12
rabbitMQ下载方式: windows Linux mac docker
rabbitMQ工作模式:
rabbitMQ相关概念:
Broker: 代理,指rabbitMQ服务器
虚拟机: 相当于mysql的Database
交换机: 判断消息类型,分发消息到不同的队列中
队列: 存放消息的最终位置
Connection: 连接
channel: 通道/甬道
生成者: 往MQ中存放消息的一方
消费者: 从MQ中拉取消息的一方

第二章 RabbitMQ下载与安装

2.1 下载与安装

官网下载地址:http://www.rabbitmq.com/download.html

快速入门地址:https://www.rabbitmq.com/getstarted.html

window安装

rabbit的安装依赖于erlang,所以得先安装erlang:

erlang安装

erlang下载https://www.erlang.org/downloads

可以直接拷贝《资料/otp_win64_23.0.exe》

下载好的exe文件直接双击运行:

设置环境变量:

验证erlang环境:

rabbit安装

rabbitmq下载:http://www.rabbitmq.com/download.html

可以直接拷贝《资料/rabbitmq-server-3.8.6.exe》

双击exe文件直接安装:

cmd进入安装目录:C:\Program Files\RabbitMQ Server\rabbitmq_server-3.8.6\sbin

安装web管理插件:

rabbitmq-plugins.bat enable rabbitmq_management

guest/guest登入:

到此rabbitmq就安装好了!

Linux安装

erlang安装

安装erlang需要的依赖:

1
# yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel unixODBC-devel xz

下载erlang源文件(可以直接拷贝《资料/otp_src_23.0.tar.gz》):

1
# wget http://erlang.org/download/otp_src_23.0.tar.gz

解压:

1
# tar -xvzf otp_src_23.0.tar.gz

执行安装:

1
2
3
4
# cd otp_src_23.0/
# ./configure --prefix=/usr/local/erlang --with-ssl -enable-threads -enable-smmp-support -enable-kernel-poll --enable-hipe --without-javac
# make
# make install

安装erlang,运行./configure提示如下错误

configure: error: No curses library functions found

configure: error: /bin/sh ‘/home/jiayi/otp_src_18.2.1/erts/configure’ failed for erts

解决:

yum -y install ncurses-devel

然后./configure

make

make install

配置环境变量:

1
2
3
4
5
6
7
8
9
# vi /etc/profile

# 最后一行添加以下内容:
ERLANG_HOME=/usr/local/erlang
PATH=$PATH:$JAVA_HOME/bin:$ERLANG_HOME/bin

#保存并退出
#以下命令生效配置
# source /etc/profile

验证erlang环境:

1
# erl

以上步骤就安装完了erlang的环境了!

rabbitmq安装

下载安装包:

1
# wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.6/rabbitmq-server-generic-unix-3.8.6.tar.xz

解压下载的xz文件

1
2
3
# xz -d rabbitmq-server-generic-unix-3.8.6.tar.xz
# 生成的rabbitmq-server-generic-unix-3.8.6.tar文件继续解压
# tar -xvf rabbitmq-server-generic-unix-3.8.6.tar

启用web管理界面:./rabbitmq-plugins enable rabbitmq_management

如果出现{:query, :rabbit@server1, {:badrpc, :timeout}}错误,那么需要将主机名在hosts文件中进行配置本机的IP地址:

再次启用web管理:

启动rabbitmq:

./rabbitmq-server -detached

访问控制台:

http://192.168.148.139:15672/

使用guest/guest访问:

guest从3.3版本开始就禁用访问了,咋们可以添加一个用户然后设置权限:

1
2
3
# 添加一个用户admin 密码是 admin
# ./rabbitmqctl add_user admin admin
# ./rabbitmqctl set_user_tags admin administrator

注意:如果访问不了,出现以下错误就需要检查防火墙是否关闭。

linux防火墙关闭(centos7)

1:查看防火墙状态

systemctl status firewalld

2:停止防火墙

systemctl stop firewalld.service

3:禁止开机启动

systemctl disable firewalld.service

4:重启防火墙

systemctl enable firewalld

Docker安装

1
2
3
4
5
6
7
8
9
10
11
12
docker run --restart=always \
--network leyou-network \
-e RABBITMQ_DEFAULT_USER=leyou \
-e RABBITMQ_DEFAULT_PASS=123321 \
--name mq \
--hostname mq1 \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3-management

# Linux和docker版没有内置账号,需要自己手动设置

2.2 使用管理界面

1)界面总览

1570592912783

2)用户权限管理

我们新建的itcast用户,不具备访问权限,如图:

1569411819639

我们需要添加对virtual hosts的访问权限,点击itcast用户,进入用户管理界面,然后添加权限:

1569411896139

添加权限以后,可以看到用户已经具备了访问权:

1569411955659

1
2
3
4
5
6
7
8
9
10
docker run \
-e RABBITMQ_DEFAULT_USER=leyou \
-e RABBITMQ_DEFAULT_PASS=123321 \
--name mq \
--hostname mq1 \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3-management

第三章 RabbitMQ开发

3.1 Rabbit快速入门

1527070619131

需求:

1
使用简单模式完消息传递

步骤:

1
2
3
4
1.创建工程(生产者,消费者)
2.分别添加依赖
3.编写生产者发送消息
4.编写消费者接收消息

pom

1
2
3
4
5
6
7
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.3</version>
</dependency>
</dependencies>

生产者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package com.itheima.demo1;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* TODO:消息的生产者
*/
public class Demo1Provider {

public static void main(String[] args) throws IOException, TimeoutException {
//TODO:1.创建连接的工厂对象
ConnectionFactory factory = new ConnectionFactory();
//TODO:2.设置连接参数
//设置服务地址,默认localhost
factory.setHost("192.168.190.153");
//设置端口,默认5672
factory.setPort(5672);
//设置账号信息,用户名、密码、虚拟机
factory.setVirtualHost("/itheima129"); // 先在 web 管理平台创建虚拟机
factory.setUsername("leyou");
factory.setPassword("123321");
//TODO:3.通过工厂获取连接
Connection connection = factory.newConnection();
//TODO:4.创建通道,channel 使用通道才能完成消息相关的操作
Channel channel = connection.createChannel();
//TODO:5.使用通道创建队列
/**
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
参数:
参数1.queue 队列名称
参数2.durable 是否持久化,当MQ重启后是否存在
参数3.exclusive
是否独占. 只能有一个消费者监听此队列
参数4.autoDelete 是否自动删除, 没有consumer时自动删除
参数5.arguments 其他参数
*/
// 如果没有名称叫做hello_world的对象则创建,如果有则不创建
channel.queueDeclare("hello_world", true, false, false, null);
// TODO:6. 定义发送到mq的消息内容
for (int i =1;i<=10;i++){
String message = "Hello RabbitMQ! "+i;
/**
* basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
* 参数1: exchange 交换机名称,简单模式下使用默认的 写""即可
* 参数2: routingKey 路由名称
* 当前没有交换机,我们直接指定队列
* 参数3: props 配置信息
* 参数4: body 发送的消息数据
*/
channel.basicPublish("", "hello_world", null, message.getBytes());
}

System.out.println("发布消息成功...");
//TODO:7.关闭通道和连接
channel.close();
connection.close();
}
}

消费者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package com.itheima.demo1;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* TODO:消息的消费者
*/
public class Demo1Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
//TODO:1.创建连接工厂对象
ConnectionFactory factory = new ConnectionFactory();
//TODO:2.设置参数
factory.setHost("192.168.190.153");
factory.setPort(5672);
factory.setVirtualHost("/itheima129");
factory.setUsername("leyou");
factory.setPassword("123321");
//TODO:3.获取连接
Connection connection = factory.newConnection();
//TODO:4.创建通道
Channel channel = connection.createChannel();
//TODO:找到队列(订阅队列)
/**
* 参数1: 队列名称
* 参数2: 是否持久化
* 参数3: 是否独有
* 参数4: 是否自动删除
* 参数5: 其他参数
*/
channel.queueDeclare("hello_world", true, false, false, null);
// TODO:创建消费者对象
// 当目标队列中有数据时,调用此消费者进行消费
Consumer consumer = new DefaultConsumer(channel){
/**
* 回调交付
* @param consumerTag 标记
* @param envelope 获取一些信息,交换机,路由
* @param properties 配置信息
* @param body 从队列中获取的数据
* @throws IOException
*/
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

try {
//System.out.println("唯一标识: "+consumerTag);
System.out.println("数据: "+new String(body));
// 获取交换机信息
//System.out.println("交换机: "+envelope.getExchange());
//System.out.println("routingkey: "+envelope.getRoutingKey());
//System.out.println(1/0);
// 手动确认: 交付成功,确认交付
// 删除当前的数据 false : 单个删除
channel.basicAck(envelope.getDeliveryTag(),false);
System.out.println("============");
} catch (Exception e) {
// 如果程序出现了异常,将取出的归还
// 参数1: 确认字符串 参数2: 是否批量处理 参数3: 当前消费失败时,将消息重新存放到队列中
channel.basicNack(envelope.getDeliveryTag(),false,true);
e.printStackTrace();

}
}
};
// 消费(监听)
// 参数1: 队列的名称
// 参数2: 是否自动确认(一旦自动确认,该数据就会从队列中移除)
channel.basicConsume("hello_world",false,consumer);
// 阻塞线程
System.in.read();
}
}

小结

官方文档说明:

RabbitMQ是一个消息的代理者(Message Broker):它接收消息并且传递消息。

你可以认为它是一个邮局:当你投递邮件到一个邮箱,你很肯定邮递员会终究会将邮件递交给你的收件人。与此类似,RabbitMQ 可以是一个邮箱、邮局、同时还有邮递员。

不同之处在于:RabbitMQ不是传递纸质邮件,而是二进制的数据

基本消息模型图:

1527070619131

1
2
3
4
在上图的模型中,有以下概念:
- P:生产者,也就是要发送消息的程序
- C:消费者:消息的接受者,会一直等待消息到来。
- queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。

3.2 work消息模型

3.2.1.说明

在刚才的基本模型中,一个生产者,一个消费者,生产的消息直接被消费者消费。比较简单。

Work queues,也被称为(Task queues),任务模型。

当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用work 模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。

1527078437166

角色:

  • P:生产者:任务的发布者
  • C1:消费者,领取任务并且完成任务,假设完成速度较慢
  • C2:消费者2:领取任务并完成任务,假设完成速度快

3.2.2.生产者

工具类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package com.itheima.utils;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* 工具类封装
*/
public class ConnectionUtil {
public static Connection getConnection() throws IOException, TimeoutException {
//TODO:1.创建连接工厂对象
ConnectionFactory factory = new ConnectionFactory();
//TODO:2.设置参数
factory.setHost("192.168.190.153");
factory.setPort(5672);
factory.setVirtualHost("/itheima129");
factory.setUsername("leyou");
factory.setPassword("123321");
//TODO:3.获取连接
Connection connection = factory.newConnection();
return connection;
}
}

生产者与模式1中的几乎一样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package com.itheima.task_queue;

import com.itheima.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
* 消息生成者
*/
public class ProviderDemo2 {
public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明队列
/**
queueDeclare(String queue, boolean durable, boolean exclusive,
boolean autoDelete, Map<String, Object> arguments)
参数:
参数1.queue 队列名称
参数2.durable 是否持久化,当MQ重启后是否存在
参数3.exclusive
是否独占. 只能有一个消费者监听此队列
参数4.autoDelete 是否自动删除, 没有consumer时自动删除
参数5.arguments 其他参数
*/
// 当前方法执行完后,如果有队列就使用对应的队列
// 如果没有则创建一个队列
channel.queueDeclare("work-queue", false, false, false, null);
// 循环发布任务
for (int i = 0; i < 5000; i++) {
// 消息内容
String message = "task .. " + i;
channel.basicPublish("","work-queue", null, message.getBytes());
System.out.println(" 发布了 " + message + " ... ");
}
// 关闭通道和连接
channel.close();
connection.close();
}
}

我们循环发送50条消息。

1599876563452

3.2.3.消费者

消费者1: 睡一秒 较慢

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package com.itheima.task_queue;

import com.itheima.utils.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
* 消息消费者
*/
public class ConsumerDemo2 {

public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 获取通道
final Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare("work-queue", false, false, false, null);
channel.basicQos(1); // 设置每一个消费者 同时只能处理一个消息

// 定义队列的消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
try {
// body 即消息体
String msg = new String(body);
System.out.println(" [消费者1] 消费了 : " + msg + "!");
Thread.sleep(1000);
// 手动ACK
channel.basicAck(envelope.getDeliveryTag(), false);
} catch (Exception e) {
channel.basicNack(envelope.getDeliveryTag(),false,true);
}

}
};
// 监听队列。
channel.basicConsume("work-queue", false, consumer);
}
}

消费者2: 较快一方

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package com.itheima.task_queue;

import com.itheima.utils.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
* 消息消费者
*/
public class ConsumerDemo2_2 {

public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 获取通道
final Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare("work-queue", false, false, false, null);
channel.basicQos(1); // 设置每一个消费者 同时只能处理一个消息

// 定义队列的消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
try {
// body 即消息体
String msg = new String(body);
//Thread.sleep(1000);
System.out.println(" [消费者2] 消费了 : " + msg + "!");
// 手动ACK
channel.basicAck(envelope.getDeliveryTag(), false);
} catch (Exception e) {
channel.basicNack(envelope.getDeliveryTag(),false,true);
}

}
};
// 监听队列。
channel.basicConsume("work-queue", false, consumer);
}
}

与消费者1基本类似,就是没有设置消费耗时时间。

这里是模拟有些消费者快,有些比较慢。

接下来,两个消费者一同启动,然后发送50条消息:

1527085826462

可以发现,两个消费者各自消费了25条消息,而且各不相同,这就实现了任务的分发。

3.2.4.能者多劳原则

刚才的实现有问题吗?

  • 消费者1比消费者2的效率要低,一次任务的耗时较长
  • 然而两人最终消费的消息数量是一样的
  • 消费者2大量时间处于空闲状态,消费者1一直忙碌

现在的状态属于是把任务平均分配,正确的做法应该是消费越快的人,消费的越多

怎么实现呢?

我们可以修改设置,让消费者同一时间只接收一条消息,这样处理完成之前,就不会接收更多消息,就可以让处理快的人,接收更多消息 :

1527086103576

再次测试:

1527086159534

3.3 订阅模型分类

订阅模型示意图:

1527086284940

前面2个案例中,只有3个角色:

  • P:生产者,也就是要发送消息的程序
  • C:消费者:消息的接受者,会一直等待消息到来。
  • queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。

而在订阅模型中,多了一个exchange角色,而且过程略有变化:

  • P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
  • C:消费者,消息的接受者,会一直等待消息到来。
  • Queue:消息队列,接收消息、缓存消息。
  • Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有以下3种类型:
    • Fanout:广播,将消息交给所有绑定到交换机的队列
    • Direct:定向,把消息交给符合指定routing key 的队列
    • Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

3.4.广播模型-Fanout

Fanout,英文翻译是扇出,我觉得在MQ中叫广播更合适。

3.4.1.流程说明

流程图:

1527086564505

在广播模式下,消息发送流程是这样的:

  • 1) 可以有多个消费者
  • 2) 每个消费者有自己的queue(队列)
  • 3) 每个队列都要绑定到Exchange(交换机)
  • 4) 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
  • 5) 交换机把消息发送给绑定过的所有队列
  • 6) 队列的消费者都能拿到消息。实现一条消息被多个消费者消费

3.4.2.生产者

两个变化:

  • 1) 声明Exchange,不再声明Queue
  • 2) 发送消息到Exchange,不再发送到Queue
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package com.itheima.demo3;


import com.itheima.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class FanoutProducer {

public static void main(String[] args) throws Exception {

// 1. 建立和mq的连接
Connection connection = ConnectionUtil.getConnection();
// 2. 从连接中创建通道,channel 使用通道才能完成消息相关的操作
Channel channel = connection.createChannel();

//3.声明交换器和队列
String exchangeName = "exchange-fanout";
channel.exchangeDeclare(exchangeName,"fanout",true); //扇形交换机类型-fanout
// 创建多个队列
String fanoutQueue1 = "fanout_queue1";
//队列1
channel.queueDeclare(fanoutQueue1, false, false, false,null);
//队列2
String fanoutQueue2 = "fanout_queue2";
channel.queueDeclare(fanoutQueue2, false, false, false,null);

//4.同一个交换机与2个队列绑定 参数3: 扇出形式 不需要路由规则 两个队列都会接受到生产投递的消息
channel.queueBind(fanoutQueue1,exchangeName,"");//绑定第一个队列
channel.queueBind(fanoutQueue2,exchangeName,"");//绑定第二个队列
//5.生产消息
for(int i=0;i<10;i++){
channel.basicPublish(exchangeName,"", true,null,"hello fanout!".getBytes());
}
//6.关闭channel和连接
channel.close();
//关闭连接
connection.close();

}

}

1599880559042

1599880583045

扇出方式 两个队列都会接受交换机 传递过来的消息!

3.4.3.消费者

消费者1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package com.itheima.demo3;

import com.itheima.utils.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class FanoutConsumer1 {

public static void main(String[] args) throws Exception {
// 1. 建立和mq的连接
Connection connection = ConnectionUtil.getConnection();
// 2. 从连接中创建通道,channel 使用通道才能完成消息相关的操作
final Channel channel = connection.createChannel();

// 3. 队列名称
String queueName = "fanout_queue1";
// 绑定队列到交换机
channel.queueBind(queueName, "exchange-fanout", "");

// 4. 定义队列的消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
try {
// body 即消息体
String msg = new String(body);
// System.out.println(1/0); // 模拟异常 表示消息未正常处理
System.out.println(" fanout-consumer 1 : [x] received : " + msg + "!");
channel.basicAck(envelope.getDeliveryTag(),false);// 代码没有异常 手动通知队列 删除消息即可
} catch (Exception e){
e.printStackTrace(); // 第三个参数 false 直接删除消息 true 表示 :把消息重回队列
channel.basicNack(envelope.getDeliveryTag(),false,true);
}
}
};
// 5. 监听队列,第二个参数:是否自动进行消息确认。 false 告诉队列不要删除消息
channel.basicConsume(queueName, false, consumer);

}

}

要注意代码中:队列需要和交换机绑定

1
2
3
4
// 3. 队列名称
String queueName = "fanout_queue1";
// 绑定队列到交换机
channel.queueBind(queueName, "exchange-fanout", "");

消费者2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package com.itheima.demo3;


import com.itheima.utils.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class FanoutConsumer2 {

public static void main(String[] args) throws Exception {
// 1. 建立和mq的连接
Connection connection = ConnectionUtil.getConnection();
// 2. 从连接中创建通道,channel 使用通道才能完成消息相关的操作
final Channel channel = connection.createChannel();

// 3. 队列名称
String queueName = "fanout_queue2";
// 绑定队列到交换机
channel.queueBind(queueName, "exchange-fanout", "");

// 4. 定义队列的消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
try {
// body 即消息体
String msg = new String(body);
// System.out.println(1/0); // 模拟异常 表示消息未正常处理
System.out.println(" fanout-consumer 2 : [x] received : " + msg + "!");
channel.basicAck(envelope.getDeliveryTag(),false);// 代码没有异常 手动通知队列 删除消息即可
} catch (Exception e){
e.printStackTrace(); // 第三个参数 false 直接删除消息 true 表示 :把消息重回队列
channel.basicNack(envelope.getDeliveryTag(),false,true);
}
}
};
// 5. 监听队列,第二个参数:是否自动进行消息确认。 false 告诉队列不要删除消息
channel.basicConsume(queueName, false, consumer);

}

}

测试: 先启动生产者发送消息

然后 我们再运行消费者1,然后再运行消费者2:

1599881321174

扇出方式: 两个队列彼此独立,互不干扰!

3.5.定向模型-Direct

3.5.1.说明

在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。

在Direct模型下:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
  • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

流程图:

1527087677192

图解:

  • P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
  • X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列
  • C1:消费者,其所在队列指定了需要routing key 为 error 的消息
  • C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息

3.5.2.生产者

此处我们模拟商品的增删改,发送消息的RoutingKey分别是:insert、update、delete

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package com.itheima.demo4;

import com.itheima.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class DirectProducer {

public static void main(String[] args) throws Exception {

// 1. 建立和mq的连接
Connection connection = ConnectionUtil.getConnection();
// 2. 从连接中创建通道,channel 使用通道才能完成消息相关的操作
Channel channel = connection.createChannel();
//3.声明交换器和队列
String exchangeName = "exchange-direct";
channel.exchangeDeclare(exchangeName,"direct",false); //交换机类型-direct
// 创建2个队列
//队列1
String directQueue1 = "direct_queue1";
channel.queueDeclare(directQueue1, false, false, false,null);
//队列2
String directQueue2 = "direct_queue2";
channel.queueDeclare(directQueue2, false, false, false,null);

//4.同一个交换机与2个队列绑定 参数3: 直连direct形式 定义路由规则 两个队列按照路由 接受对应的消息
channel.queueBind(directQueue1,exchangeName,"insert");//绑定第一个队列
channel.queueBind(directQueue2,exchangeName,"delete");//绑定第二个队列
channel.queueBind(directQueue2,exchangeName,"update");//绑定第二个队列
//5.生产消息 向指定的队列投递消息
for(int i=0;i<10;i++){
channel.basicPublish(exchangeName,"insert", true,null,"hello direct insert!".getBytes());
channel.basicPublish(exchangeName,"delete", true,null,"hello direct delete!".getBytes());
channel.basicPublish(exchangeName,"update", true,null,"hello direct update!".getBytes());
}

//6.关闭channel和连接
channel.close();
//关闭连接
connection.close();
}

}

3.5.3.消费者

我们此处假设消费者1只接收1种类型的消息:例如 接受添加商品信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package com.itheima.demo4;
import com.itheima.utils.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class DirectConsumer1 {
public static void main(String[] args) throws Exception {

// 1. 建立和mq的连接
Connection connection = ConnectionUtil.getConnection();
// 2. 从连接中创建通道,channel 使用通道才能完成消息相关的操作
final Channel channel = connection.createChannel();

// 3. 队列名称 insert
String queueName = "direct_queue1";
// 绑定队列到交换机
channel.queueBind(queueName, "exchange-direct", "insert");

// 4. 定义队列的消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
try {
// body 即消息体
String msg = new String(body);
// System.out.println(1/0); // 模拟异常 表示消息未正常处理
System.out.println(" direct-consumer 1 : [x] received : " + msg + "!");
channel.basicAck(envelope.getDeliveryTag(),false);// 代码没有异常 手动通知队列 删除消息即可
} catch (Exception e){
e.printStackTrace(); // 第三个参数 false 直接删除消息 true 表示 :把消息重回队列
channel.basicNack(envelope.getDeliveryTag(),false,true);
}
}
};
// 5. 监听队列,第二个参数:是否自动进行消息确认。 false 告诉队列不要删除消息
channel.basicConsume(queueName, false, consumer);

}
}

消费者2

我们此处假设消费者2接收2种类型的消息:更新商品和删除商品。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package com.itheima.demo4;
import com.itheima.utils.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
public class DirectConsumer2 {
public static void main(String[] args) throws Exception {

// 1. 建立和mq的连接
Connection connection = ConnectionUtil.getConnection();
// 2. 从连接中创建通道,channel 使用通道才能完成消息相关的操作
final Channel channel = connection.createChannel();

// 3. 队列名称
String queueName2 = "direct_queue2";
// 绑定队列到交换机
channel.queueBind(queueName2, "exchange-direct", "delete");
channel.queueBind(queueName2, "exchange-direct", "update");

// 4. 定义队列的消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
try {
// body 即消息体
String msg = new String(body);
// System.out.println(1/0); // 模拟异常 表示消息未正常处理
System.out.println(" direct-consumer 2 : [x] received : " + msg + "!");
channel.basicAck(envelope.getDeliveryTag(),false);// 代码没有异常 手动通知队列 删除消息即可
} catch (Exception e){
e.printStackTrace(); // 第三个参数 false 直接删除消息 true 表示 :把消息重回队列
channel.basicNack(envelope.getDeliveryTag(),false,true);
}
}
};
// 5. 监听队列,第二个参数:是否自动进行消息确认。 false 告诉队列不要删除消息
channel.basicConsume(queueName2, false, consumer);

}
}

3.6.通配符模型-Topic

3.6.1.说明

Topic类型的ExchangeDirect相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!

Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert

通配符规则:

#:匹配一个或多个词

*:匹配不多不少恰好1个词

举例:

item.#:能够匹配item.spu.insert 或者 item.spu

item.*:只能匹配item.spu

image-20201123163445058

图示:

1527088518574

解释:

  • 红色Queue:绑定的是usa.# ,因此凡是以 usa.开头的routing key 都会被匹配到
  • 黄色Queue:绑定的是#.news ,因此凡是以 .news结尾的 routing key 都会被匹配

3.6.2.生产者

使用topic类型的Exchange,发送消息的routing key

注意: 此案例 先启动消费端 按照指定路由监听: 再启动服务端,再投递消息!

可以根据 user.update product.insert user.insert 测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package com.itheima.topic;

import com.itheima.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Demo5Provider {
public static void main(String[] args) throws Exception {

// 1. 建立和mq的连接
Connection connection = ConnectionUtil.getConnection();
// 2. 从连接中创建通道,channel 使用通道才能完成消息相关的操作
Channel channel = connection.createChannel();
//3.声明交换器和队列
//3.声明交换器和队列
String exchangeName = "exchange-topic";
channel.exchangeDeclare(exchangeName,"topic",false); //交换机类型-direct
// 创建2个队列
//队列1
String topicQueue1 = "topic_queue1";
channel.queueDeclare(topicQueue1, false, false, false,null);
//队列2
String topicQueue2 = "topic_queue2";
channel.queueDeclare(topicQueue2, false, false, false,null);

//4.同一个交换机与2个队列绑定 参数3: 直连direct形式 定义路由规则 两个队列按照路由 接受对应的消息
channel.queueBind(topicQueue1,exchangeName,"abc.#");//绑定第一个队列
channel.queueBind(topicQueue2,exchangeName,"#.hello");//绑定第二个队列
channel.queueBind(topicQueue2,exchangeName,"*.world");//绑定第二个队列
//5.生产消息 向指定的队列投递消息
for(int i=0;i<10;i++){
channel.basicPublish(exchangeName,"abc.abc", true,null,"abc前缀的值!".getBytes());
channel.basicPublish(exchangeName,"abc.demo.hello", true,null,"hello ".getBytes());
channel.basicPublish(exchangeName,"update.world", true,null,"update_world!".getBytes());
}

//6.关闭channel和连接
channel.close();
//关闭连接
connection.close();
}

}

3.6.3.消费者1

我们此处假设消费者1只接收的RoutingKey为 user.#,代表所有以user开头的key

注意: 本案例 :交换机和队列 均由 消费者创建 ,生产者不创建 交换机和队列来演示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package com.itheima.topic;

import com.itheima.utils.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Demo5Consumer1 {
public static void main(String[] args) throws Exception {

// 1. 建立和mq的连接
Connection connection = ConnectionUtil.getConnection();
// 2. 从连接中创建通道,channel 使用通道才能完成消息相关的操作
final Channel channel = connection.createChannel();

// 3. 队列名称 insert
String queueName = "topic_queue1";
// 绑定队列到交换机
channel.queueBind(queueName, "exchange-topic", "abc.#");

// 4. 定义队列的消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
try {
// body 即消息体
String msg = new String(body);
// System.out.println(1/0); // 模拟异常 表示消息未正常处理
System.out.println(" 获取 : " + msg + "!");
channel.basicAck(envelope.getDeliveryTag(),false);// 代码没有异常 手动通知队列 删除消息即可
} catch (Exception e){
e.printStackTrace(); // 第三个参数 false 直接删除消息 true 表示 :把消息重回队列
channel.basicNack(envelope.getDeliveryTag(),false,true);
}
}
};
// 5. 监听队列,第二个参数:是否自动进行消息确认。 false 告诉队列不要删除消息
channel.basicConsume(queueName, false, consumer);

}
}

3.6.4.消费者2

我们此处假设消费者2接收的消息key:*.insert,与新增有关。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package com.itheima.topic;

import com.itheima.utils.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Demo5Consumer2 {
public static void main(String[] args) throws Exception {

// 1. 建立和mq的连接
Connection connection = ConnectionUtil.getConnection();
// 2. 从连接中创建通道,channel 使用通道才能完成消息相关的操作
final Channel channel = connection.createChannel();

// 3. 队列名称 insert
String queueName = "topic_queue2";
// 绑定队列到交换机
channel.queueBind(queueName, "exchange-topic", "#.hello");
channel.queueBind(queueName, "exchange-topic", "*.world");

// 4. 定义队列的消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
try {
// body 即消息体
String msg = new String(body);
// System.out.println(1/0); // 模拟异常 表示消息未正常处理
System.out.println(" 获取 : " + msg + "!");
channel.basicAck(envelope.getDeliveryTag(),false);// 代码没有异常 手动通知队列 删除消息即可
} catch (Exception e){
e.printStackTrace(); // 第三个参数 false 直接删除消息 true 表示 :把消息重回队列
channel.basicNack(envelope.getDeliveryTag(),false,true);
}
}
};
// 5. 监听队列,第二个参数:是否自动进行消息确认。 false 告诉队列不要删除消息
channel.basicConsume(queueName, false, consumer);

}
}

3.7.持久化

为了提高并发能力,MQ的数据默认是在内存中存储的,包括交换机、队列、消息。

这样就会出现数据安全问题,如果服务宕机,存储在MQ中未被消费的消息都会丢失。

所以我们需要将交换机、队列、消息持久化到硬盘,以防服务宕机。

交换机持久化:

1527088933255

队列持久化:

1527088960059

消息持久化:

1527088984029

第四章 SpringBoot AMQP(重点掌握)

4.1.简介

Sprin有很多不同的项目,其中就有对AMQP的支持:

1527089338661

Spring AMQP的页面:http://projects.spring.io/spring-amqp/

1527089365281

注意这里一段描述:

​ Spring-amqp是对AMQP协议的抽象实现,而spring-rabbit 是对协议的具体实现,也是目前的唯一实现。底层使用的就是RabbitMQ。

4.2.依赖和配置

添加AMQP的启动器:

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

application.yml中添加RabbitMQ地址: 手动在控制台 新建一个虚拟机 /heima

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
server:
port: 8081
spring:
application:
name: producer-application
rabbitmq:
virtual-host: /itheima129 # 虚拟机名称
username: leyou # rabbitMQ用户名
password: 123321 # rabbitMQ密码
addresses: 192.168.190.153:5672

# 以下配置如果在只接收消息的服务上, 那么不需要配置
template:
retry: # 失败重试
enabled: true # 失败重试
initial-interval: 10000ms # 第一次重试的间隔时长
max-interval: 80000ms # 最长重试间隔,超过这个间隔将不再重试
multiplier: 2 # 下次重试间隔的倍数,此处是2即下次重试间隔是上次的2倍
exchange: # 缺省的交换机名称,此处配置后,发送消息如果不指定交换机就会使用这个
publisher-confirms: true # 生产者确认机制,确保消息会正确发送,如果发送失败会有错误回执,从而触发重试

4.3.快速入门

我们以直连 direct 为例,看看Spring中如何发送消息、接收消息。

1599889782488

4.3.1.生成者发送消息

Spring为AMQP提供了统一的消息处理模板:AmqpTemplate,非常方便的发送消息,其发送方法:

1599899378264

比较常用的3个方法,分别是:

  • 指定交换机、RoutingKey和消息体
  • 指定消息(默认队列)
  • 指定RoutingKey和消息,这里的RoutingKey其实是队列名称
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.itheima</groupId>
<artifactId>day08-rabbit-producer</artifactId>
<version>1.0-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.10.RELEASE</version>
<relativePath/>
</parent>

<dependencies>
<!-- web环境启动器 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- rabbitMQ启动器 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
</project>

代码如下: 向指定队列直接发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package com.itheima.controller;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ProducerController {
// rabbit客户端对象
@Autowired
private RabbitTemplate rabbitTemplate;

@RequestMapping("/send/{msg}")
public String sendMsg(@PathVariable("msg") String msg){
// 参数1: 队列名称,操作的队列必须实现存在
// 参数2: 存放的消息信息
for (int i = 1; i <= 10; i++) {
rabbitTemplate.convertAndSend("boot-queue",msg);
}
return "OK";
}
}


启动main 浏览器 输入地址 :

http://localhost:8081/send

结果:

1599899718932

4.3.2.接受消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Component
public class TestConsumer {

@RabbitListener(queues = "heima-queue")// 注意只需要 队列名称一致即可!
public void receive(String msg, Channel channel, Message message) throws IOException {
//会话唯一ID
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
System.out.println("收到消息id:" + deliveryTag);
Thread.sleep(1000);
System.out.println("message:" +message.toString());
}catch (Exception e){
}
}

}

补充:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package com.leyou.search.mq;


import com.leyou.common.constants.MQConstants;
import com.leyou.common.exception.LyException;
import com.leyou.search.entity.Goods;
import com.leyou.search.repository.GoodsRepository;
import com.leyou.search.repository.SearchService;
import com.leyou.starter.elastic.handler.DataResponseHandler;
import org.apache.http.HttpStatus;
import org.apache.lucene.search.TotalHits;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class ItemListener {
private final SearchService searchService;

public ItemListener(SearchService searchService) {
this.searchService = searchService;
}

/**
* 监听MQ的spu上架消息
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = MQConstants.QueueConstants.SEARCH_ITEM_UP, durable = "true"),
exchange = @Exchange(value = MQConstants.ExchangeConstants.ITEM_EXCHANGE_NAME, type = ExchangeTypes.TOPIC),
key = MQConstants.RoutingKeyConstants.ITEM_UP_KEY
))
public void listenerUp(Long spuId) {

searchService.saveGoodsById(spuId);
}

/**
* 监听MQ的spu下架消息
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = MQConstants.QueueConstants.SEARCH_ITEM_DOWN, durable = "true"),
exchange = @Exchange(value = MQConstants.ExchangeConstants.ITEM_EXCHANGE_NAME, type = ExchangeTypes.TOPIC),
key = MQConstants.RoutingKeyConstants.ITEM_DOWN_KEY
))
public void listenerDown(Long spuId) {
searchService.deleteGoodsById(spuId);
}


}

运行后查看日志:

1599899772187

springboot默认也是ack自动确认!

1599899784341

4.4.消息转换器

之前说过,Spring会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。

image-20200525170410401

只不过,默认情况下Spring采用的序列化方式是JDK序列化。总所周知,JDK序列化存在下列问题:

  • 数据体积过大
  • 有安全漏洞
  • 可读性差

我们来测试一下。

4.4.1.测试发送Java对象

我们修改消息发送的代码,发送一个Map对象:

1
2
3
4
5
6
7
8
@GetMapping("/sendMap")
public String testSendMap() throws InterruptedException {
Map<String,Object> msg = new HashMap<>();
msg.put("name", "Jack");
msg.put("age", 21);
rabbitTemplate.convertAndSend("heima-queue", msg);
return "mapok";
}

暂时关闭消费者的监听:

重新运行测试类。

然后在MQ的控制台可以查看到消息,如下:

1599900081223

4.4.2.配置JSON转换器 (生产者 消费者都要配置, 否则序列化的数据会出错)

显然,JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化和反序列化。

引入依赖:

1
2
3
4
5
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>

配置消息转换器。

在启动类中添加一个Bean即可:

1599900131536

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package com.leyou.item.config;

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class RabbitConfig {

@Bean
public Jackson2JsonMessageConverter messageConverter(){
// 创建JSON的消息转换器
return new Jackson2JsonMessageConverter();
}
}

重新启动 再次查看结果

4.4.3.测试

再次发送消息,查看效果:

1599900288272

此时,消费者也可以用Map来接收消息了:Map接受即可,消费者端 别忘记导入依赖:和转换器

1
2
3
4
5
6
7
8
9
10
11
12
13
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>



@Bean
public MessageConverter jsonMessageConverter(){
// 创建JSON的消息转换器
return new Jackson2JsonMessageConverter();
}

**1599900675393 **

结果:

1599900642675

image-20201123080006082

第五章 SpringBoot整合邮件发送

image-20210708162108584

SpringBoot 完成邮件的投递

准备工作: 登录自己的开通邮件的 以163为例

1600574189645

生成自己唯一的授权码

1600574220514

  • 基于springboot环境导入email发送依赖
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.0</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>

<!--springboot邮件发送-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-mail</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
  • 配置邮件发送参数 yaml文件
1
2
3
4
5
6
7
##    邮件发送配置
# spring.mail.username=fanqixxxx@163.com
# #spring.mail.password 填写授权码
# spring.mail.password=xxxxxxx
# #spring.mail.host 填写邮箱供应的SMTP地址
# spring.mail.host=smtp.163.com
# spring.mail.properties.mail.smtp.ssl.enable=true
1
2
3
4
5
6
7
8
9
10
11
12
13
spring:
mail:
# 发送者的邮箱地址
username: xsitheima163@163.com
# 此密码并非登录的密码,而是使用java代码发送邮件时使用的授权码
password: AKELOGXEFRJZOAWU
host: smtp.163.com
properties:
mail:
smtp:
ssl:
enable: true

  • 编写测试类 完成邮件发送
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@SpringBootTest
@SpringBootApplication
@RunWith(SpringRunner.class)
public class EmailSend {

@Autowired
private JavaMailSenderImpl javaMailSender;
@Test
public void send(){
try {
SimpleMailMessage message = new SimpleMailMessage();
message.setSubject("传智健康官方邮件");
message.setText("你好,请保持好验证码:7788,打死都不能泄露给你的同桌");
message.setTo("tps520tps@163.com");
message.setFrom("tps520wx@163.com");
javaMailSender.send(message);
} catch (MailException e) {
e.printStackTrace();
}
}
}

查收邮件:

1600574300195

邮件发送示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package com.itheima;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.mail.MailException;
import org.springframework.mail.SimpleMailMessage;
import org.springframework.mail.javamail.JavaMailSenderImpl;

@SpringBootTest
class SendEmailApplicationTests {
@Autowired
private JavaMailSenderImpl javaMailSender;
@Test
public void sendMail() {
try {
SimpleMailMessage message = new SimpleMailMessage();
message.setSubject("表白邮件");
message.setText("rose 你好, 我想.....");
message.setTo("xsitheima126@126.com");
message.setFrom("xsitheima163@163.com");
javaMailSender.send(message);
System.out.println("========发送成功");
} catch (MailException e) {
e.printStackTrace();
}
}

}

预备账号

1
2
3
4
5
xsitheima126@126.com    a123456
授权码: YKJOGNHCCFDLIJKO
xsitheima163@163.com a12345678
授权码: AKELOGXEFRJZOAWU

总结

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
MQ: 消息队列
消息在传输过程中保存消息的容器.主要使用在分布式环境下
MQ优势:
应用解耦
快速应用变更与维护
削峰填谷
MQ劣势:
可用性降低
系统复杂度提高
短时间内无法保证一致性
在什么情况下使用MQ?
访问的服务无返回值的情况下
允许短暂性不一致的情况下
使用MQ的优势大于维护成本
AMQP协议:
高级消息队列协议,约束消息生产者和消费者与MQ传递数据时的规范
生产者 ----> 交换机 ----> 路由 ----> 队列 <---- 消费者
JMS: java提供的操作消息中间件的API
RabbitMQ概述:
erlang语言基于AMQP协议开发的一个消息队列软件.
五种工作机制:
直连式:
生产者 ----> 队列 <---- 消费者
work模式(任务模式):
生产者 ----> 队列 <---- 多个消费者(平分队列中的消息)
订阅模式:

路由模式:

topics: