第一章 MQ概述
1.1.什么是MQ
MQ全称 Message Queue(消息队列) , 是在消息的传输过程中保存消息的容器. 多用于分布式系统之间进行通信.
消息队列是典型例子: 生产者消费者模型.
A系统向B系统发送消息, A系统先将消息一条一条存放到MQ中, B系统从MQ中一条一条读取消息
1 2 3 4 5
| MQ,消息队列, 存储消息的中间件 分布式系统通信方式: 直接远程调用 借助第三方完成间接通信 发送方称为生产者, 接收方称为消费者
|
1.2 MQ的优势和劣势
1.2.1 MQ的优势
应用解耦
1 2 3 4
| 订单系统依赖 ---> 库存系统,支付系统,物流系统 当库存系统出现问题时,导致订单系统没有办法正常工作. 系统耦合性越高, 容错性就越低, 可维护性也就越低.
|
异步提速
1 2
| 一个下单操作需要耗时: 20 + 300 + 300 + 300 = 920 执行效率太慢
|
1 2
| 用户点击下单按钮时,后台只需要将信息写入数据库,并将消息存入MQ,响应给客户端即可 真正处理的时间用户时感知不到了.
|
削峰填谷
1
| 使用了MQ之后, 限制消费消息的速度为1000, 这样以来, 高峰期产生的数据势必会被积压在MQ中, 高峰期就给"削"掉了, 但是因为消息积压, 在高峰期过后的一段时间内, 消费消息的速度还是会维持在1000, 直到消费完积压的消息, 这就是"填谷"
|
小结
1 2 3
| 应用解耦: 提高系统容错性和可维护性 异步提速: 提升用户体验和系统吞吐量 削峰填谷: 增加系统稳定性
|
1.2.2 MQ的劣势
1 2 3 4 5 6
| 系统可用性降低: 系统引入的外部依赖越多,系统的稳定性越差. 一旦MQ宕机, 将会对业务造成影响. 系统复杂度提高: MQ的加入大大增加了系统的复杂度, 以前系统之间是同步的远程调用, 现在是通过MQ进行异步调用. 如何保证消息没有被重复消费? 怎么处理消息丢失情况? 怎么保证消息传递的顺序性? 一致性问题: A系统处理完业务, 通过MQ给B,C,D三个系统发数据, 如果B系统,C系统处理成功, D系统处理失败. 如何保证消息数据处理的一致性?
|
1.2.3 MQ小结
1 2 3 4
| 既然MQ有优势也有劣势, 那么使用MQ需要满足什么条件呢? 1.生产者不需要从消费者处获取反馈. 引入消息队列之前的直接调用,其接口返回值必须为空.这样上层才能继续往后执行. 2.允许短暂的不一致性 3.确实是用了有效果. 既解耦,提速,削峰这些方法的收益,超过加入MQ,管理MQ的成本.
|
1.3 常见的MQ产品
1.4 AMQP和JMS
AMQP
AMQP, 既Advanced Message Queuing Protocol(高级消息队列协议), 是一个网络协议, 是应用层协议的一个开发标准, 为面向消息的中间件设计. 基于此协议的客户端与消息中间件就可以传递消息了. 2006年,AMQP规范发布. 类似HTTP.
JMS(了解)
JMS 既 Java消息服务(JavaMessage Service) 应用程序接口, 是一个java平台中关于面向消息中间件的API.
JMS是JavaEE 13种规范中的一种, 类比JDBC.
很多消息中间件都实现了JMS规范, 例如: ActiveMQ.
RabbitMQ官方没有提供JMS的实现包, 但是开源社区有相关实现.
1 2
| AMQP是协议,是消息中间件通讯的协议, 类比HTTP JMS是API规范接口,类比JDBC
|
1.5 RabbitMQ简介
概述
2007年, Rabbit技术公司基于AMQP标准开发的RabbitMQ1.0版本发布了. RabbitMQ采用Erlang语言开发.Erlang语言是针开发高并发和分布式系统的一种语言, 在电信领域应用广泛.
RabbitMQ是基于AMQP的一款消息管理系统
官网: http://www.rabbitmq.com/
官方教程:http://www.rabbitmq.com/getstarted.html
相关概念
工作模式
RabbitMQ提供了6种消息模型,但是第6种其实是RPC,并不是MQ,因此不予学习。那么也就剩下5种。
但是其实3、4、5这三种都属于订阅模型,只不过进行路由的方式不同。
小结
1 2 3 4 5 6 7 8 9 10 11 12
| rabbitMQ下载方式: windows Linux mac docker rabbitMQ工作模式: rabbitMQ相关概念: Broker: 代理,指rabbitMQ服务器 虚拟机: 相当于mysql的Database 交换机: 判断消息类型,分发消息到不同的队列中 队列: 存放消息的最终位置 Connection: 连接 channel: 通道/甬道 生成者: 往MQ中存放消息的一方 消费者: 从MQ中拉取消息的一方
|
第二章 RabbitMQ下载与安装
2.1 下载与安装
官网下载地址:http://www.rabbitmq.com/download.html
快速入门地址:https://www.rabbitmq.com/getstarted.html
window安装
rabbit的安装依赖于erlang,所以得先安装erlang:
erlang安装
erlang下载:https://www.erlang.org/downloads
可以直接拷贝《资料/otp_win64_23.0.exe》
下载好的exe文件直接双击运行:
设置环境变量:
验证erlang环境:
rabbit安装
rabbitmq下载:http://www.rabbitmq.com/download.html
可以直接拷贝《资料/rabbitmq-server-3.8.6.exe》
双击exe文件直接安装:
cmd进入安装目录:C:\Program Files\RabbitMQ Server\rabbitmq_server-3.8.6\sbin
安装web管理插件:
rabbitmq-plugins.bat enable rabbitmq_management
用guest/guest登入:
到此rabbitmq就安装好了!
Linux安装
erlang安装
安装erlang需要的依赖:
1
| # yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel unixODBC-devel xz
|
下载erlang源文件(可以直接拷贝《资料/otp_src_23.0.tar.gz》):
解压:
1
| # tar -xvzf otp_src_23.0.tar.gz
|
执行安装:
1 2 3 4
| # cd otp_src_23.0/ # ./configure --prefix=/usr/local/erlang --with-ssl -enable-threads -enable-smmp-support -enable-kernel-poll --enable-hipe --without-javac # make # make install
|
安装erlang,运行./configure提示如下错误
configure: error: No curses library functions found
configure: error: /bin/sh ‘/home/jiayi/otp_src_18.2.1/erts/configure’ failed for erts
解决:
yum -y install ncurses-devel
然后./configure
make
make install
配置环境变量:
1 2 3 4 5 6 7 8 9
|
ERLANG_HOME=/usr/local/erlang PATH=$PATH:$JAVA_HOME/bin:$ERLANG_HOME/bin
|
验证erlang环境:
以上步骤就安装完了erlang的环境了!
rabbitmq安装
下载安装包:
1
| # wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.6/rabbitmq-server-generic-unix-3.8.6.tar.xz
|
解压下载的xz文件
1 2 3
| # xz -d rabbitmq-server-generic-unix-3.8.6.tar.xz # 生成的rabbitmq-server-generic-unix-3.8.6.tar文件继续解压 # tar -xvf rabbitmq-server-generic-unix-3.8.6.tar
|
启用web管理界面:./rabbitmq-plugins enable rabbitmq_management
如果出现{:query, :rabbit@server1, {:badrpc, :timeout}}错误,那么需要将主机名在hosts文件中进行配置本机的IP地址:
再次启用web管理:
启动rabbitmq:
./rabbitmq-server -detached
访问控制台:
http://192.168.148.139:15672/
使用guest/guest访问:
guest从3.3版本开始就禁用访问了,咋们可以添加一个用户然后设置权限:
1 2 3
| # 添加一个用户admin 密码是 admin # ./rabbitmqctl add_user admin admin # ./rabbitmqctl set_user_tags admin administrator
|
注意:如果访问不了,出现以下错误就需要检查防火墙是否关闭。
linux防火墙关闭(centos7)
1:查看防火墙状态
systemctl status firewalld
2:停止防火墙
systemctl stop firewalld.service
3:禁止开机启动
systemctl disable firewalld.service
4:重启防火墙
systemctl enable firewalld
Docker安装
1 2 3 4 5 6 7 8 9 10 11 12
| docker run --restart=always \ --network leyou-network \ -e RABBITMQ_DEFAULT_USER=leyou \ -e RABBITMQ_DEFAULT_PASS=123321 \ --name mq \ --hostname mq1 \ -p 15672:15672 \ -p 5672:5672 \ -d \ rabbitmq:3-management
|
2.2 使用管理界面
1)界面总览
2)用户权限管理
我们新建的itcast用户,不具备访问权限,如图:
我们需要添加对virtual hosts的访问权限,点击itcast用户,进入用户管理界面,然后添加权限:
添加权限以后,可以看到用户已经具备了访问权:
1 2 3 4 5 6 7 8 9 10
| docker run \ -e RABBITMQ_DEFAULT_USER=leyou \ -e RABBITMQ_DEFAULT_PASS=123321 \ --name mq \ --hostname mq1 \ -p 15672:15672 \ -p 5672:5672 \ -d \ rabbitmq:3-management
|
第三章 RabbitMQ开发
3.1 Rabbit快速入门
需求:
步骤:
1 2 3 4
| 1.创建工程(生产者,消费者) 2.分别添加依赖 3.编写生产者发送消息 4.编写消费者接收消息
|
pom
1 2 3 4 5 6 7
| <dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.7.3</version> </dependency> </dependencies>
|
生产者:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
| package com.itheima.demo1;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException; import java.util.concurrent.TimeoutException;
public class Demo1Provider {
public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.190.153"); factory.setPort(5672); factory.setVirtualHost("/itheima129"); factory.setUsername("leyou"); factory.setPassword("123321"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
channel.queueDeclare("hello_world", true, false, false, null); for (int i =1;i<=10;i++){ String message = "Hello RabbitMQ! "+i;
channel.basicPublish("", "hello_world", null, message.getBytes()); }
System.out.println("发布消息成功..."); channel.close(); connection.close(); } }
|
消费者:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
| package com.itheima.demo1;
import com.rabbitmq.client.*;
import java.io.IOException; import java.util.concurrent.TimeoutException;
public class Demo1Consumer { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.190.153"); factory.setPort(5672); factory.setVirtualHost("/itheima129"); factory.setUsername("leyou"); factory.setPassword("123321"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
channel.queueDeclare("hello_world", true, false, false, null); Consumer consumer = new DefaultConsumer(channel){
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try { System.out.println("数据: "+new String(body)); channel.basicAck(envelope.getDeliveryTag(),false); System.out.println("============"); } catch (Exception e) { channel.basicNack(envelope.getDeliveryTag(),false,true); e.printStackTrace();
} } }; channel.basicConsume("hello_world",false,consumer); System.in.read(); } }
|
小结
官方文档说明:
RabbitMQ是一个消息的代理者(Message Broker):它接收消息并且传递消息。
你可以认为它是一个邮局:当你投递邮件到一个邮箱,你很肯定邮递员会终究会将邮件递交给你的收件人。与此类似,RabbitMQ 可以是一个邮箱、邮局、同时还有邮递员。
不同之处在于:RabbitMQ不是传递纸质邮件,而是二进制的数据
基本消息模型图:
1 2 3 4
| 在上图的模型中,有以下概念: - P:生产者,也就是要发送消息的程序 - C:消费者:消息的接受者,会一直等待消息到来。 - queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。
|
3.2 work消息模型
3.2.1.说明
在刚才的基本模型中,一个生产者,一个消费者,生产的消息直接被消费者消费。比较简单。
Work queues,也被称为(Task queues),任务模型。
当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用work 模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。
角色:
- P:生产者:任务的发布者
- C1:消费者,领取任务并且完成任务,假设完成速度较慢
- C2:消费者2:领取任务并完成任务,假设完成速度快
3.2.2.生产者
工具类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| package com.itheima.utils;
import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException; import java.util.concurrent.TimeoutException;
public class ConnectionUtil { public static Connection getConnection() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.190.153"); factory.setPort(5672); factory.setVirtualHost("/itheima129"); factory.setUsername("leyou"); factory.setPassword("123321"); Connection connection = factory.newConnection(); return connection; } }
|
生产者与模式1中的几乎一样:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| package com.itheima.task_queue;
import com.itheima.utils.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection;
public class ProviderDemo2 { public static void main(String[] argv) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel();
channel.queueDeclare("work-queue", false, false, false, null); for (int i = 0; i < 5000; i++) { String message = "task .. " + i; channel.basicPublish("","work-queue", null, message.getBytes()); System.out.println(" 发布了 " + message + " ... "); } channel.close(); connection.close(); } }
|
我们循环发送50条消息。
3.2.3.消费者
消费者1: 睡一秒 较慢
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| package com.itheima.task_queue;
import com.itheima.utils.ConnectionUtil; import com.rabbitmq.client.*;
import java.io.IOException;
public class ConsumerDemo2 {
public static void main(String[] argv) throws Exception { Connection connection = ConnectionUtil.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare("work-queue", false, false, false, null); channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try { String msg = new String(body); System.out.println(" [消费者1] 消费了 : " + msg + "!"); Thread.sleep(1000); channel.basicAck(envelope.getDeliveryTag(), false); } catch (Exception e) { channel.basicNack(envelope.getDeliveryTag(),false,true); }
} }; channel.basicConsume("work-queue", false, consumer); } }
|
消费者2: 较快一方
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| package com.itheima.task_queue;
import com.itheima.utils.ConnectionUtil; import com.rabbitmq.client.*;
import java.io.IOException;
public class ConsumerDemo2_2 {
public static void main(String[] argv) throws Exception { Connection connection = ConnectionUtil.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare("work-queue", false, false, false, null); channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try { String msg = new String(body); System.out.println(" [消费者2] 消费了 : " + msg + "!"); channel.basicAck(envelope.getDeliveryTag(), false); } catch (Exception e) { channel.basicNack(envelope.getDeliveryTag(),false,true); }
} }; channel.basicConsume("work-queue", false, consumer); } }
|
与消费者1基本类似,就是没有设置消费耗时时间。
这里是模拟有些消费者快,有些比较慢。
接下来,两个消费者一同启动,然后发送50条消息:
可以发现,两个消费者各自消费了25条消息,而且各不相同,这就实现了任务的分发。
3.2.4.能者多劳原则
刚才的实现有问题吗?
- 消费者1比消费者2的效率要低,一次任务的耗时较长
- 然而两人最终消费的消息数量是一样的
- 消费者2大量时间处于空闲状态,消费者1一直忙碌
现在的状态属于是把任务平均分配,正确的做法应该是消费越快的人,消费的越多。
怎么实现呢?
我们可以修改设置,让消费者同一时间只接收一条消息,这样处理完成之前,就不会接收更多消息,就可以让处理快的人,接收更多消息 :
再次测试:
3.3 订阅模型分类
订阅模型示意图:
前面2个案例中,只有3个角色:
- P:生产者,也就是要发送消息的程序
- C:消费者:消息的接受者,会一直等待消息到来。
- queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。
而在订阅模型中,多了一个exchange角色,而且过程略有变化:
- P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
- C:消费者,消息的接受者,会一直等待消息到来。
- Queue:消息队列,接收消息、缓存消息。
- Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有以下3种类型:
- Fanout:广播,将消息交给所有绑定到交换机的队列
- Direct:定向,把消息交给符合指定routing key 的队列
- Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
3.4.广播模型-Fanout
Fanout,英文翻译是扇出,我觉得在MQ中叫广播更合适。
3.4.1.流程说明
流程图:
在广播模式下,消息发送流程是这样的:
- 1) 可以有多个消费者
- 2) 每个消费者有自己的queue(队列)
- 3) 每个队列都要绑定到Exchange(交换机)
- 4) 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
- 5) 交换机把消息发送给绑定过的所有队列
- 6) 队列的消费者都能拿到消息。实现一条消息被多个消费者消费
3.4.2.生产者
两个变化:
- 1) 声明Exchange,不再声明Queue
- 2) 发送消息到Exchange,不再发送到Queue
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| package com.itheima.demo3;
import com.itheima.utils.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection;
public class FanoutProducer {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel();
String exchangeName = "exchange-fanout"; channel.exchangeDeclare(exchangeName,"fanout",true); String fanoutQueue1 = "fanout_queue1"; channel.queueDeclare(fanoutQueue1, false, false, false,null); String fanoutQueue2 = "fanout_queue2"; channel.queueDeclare(fanoutQueue2, false, false, false,null);
channel.queueBind(fanoutQueue1,exchangeName,""); channel.queueBind(fanoutQueue2,exchangeName,""); for(int i=0;i<10;i++){ channel.basicPublish(exchangeName,"", true,null,"hello fanout!".getBytes()); } channel.close(); connection.close();
}
}
|
扇出方式 两个队列都会接受交换机 传递过来的消息!
3.4.3.消费者
消费者1
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| package com.itheima.demo3;
import com.itheima.utils.ConnectionUtil; import com.rabbitmq.client.*;
import java.io.IOException;
public class FanoutConsumer1 {
public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); final Channel channel = connection.createChannel();
String queueName = "fanout_queue1"; channel.queueBind(queueName, "exchange-fanout", "");
DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try { String msg = new String(body);
System.out.println(" fanout-consumer 1 : [x] received : " + msg + "!"); channel.basicAck(envelope.getDeliveryTag(),false); } catch (Exception e){ e.printStackTrace(); channel.basicNack(envelope.getDeliveryTag(),false,true); } } }; channel.basicConsume(queueName, false, consumer);
}
}
|
要注意代码中:队列需要和交换机绑定
1 2 3 4
| String queueName = "fanout_queue1";
channel.queueBind(queueName, "exchange-fanout", "");
|
消费者2
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| package com.itheima.demo3;
import com.itheima.utils.ConnectionUtil; import com.rabbitmq.client.*;
import java.io.IOException;
public class FanoutConsumer2 {
public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); final Channel channel = connection.createChannel();
String queueName = "fanout_queue2"; channel.queueBind(queueName, "exchange-fanout", "");
DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try { String msg = new String(body);
System.out.println(" fanout-consumer 2 : [x] received : " + msg + "!"); channel.basicAck(envelope.getDeliveryTag(),false); } catch (Exception e){ e.printStackTrace(); channel.basicNack(envelope.getDeliveryTag(),false,true); } } }; channel.basicConsume(queueName, false, consumer);
}
}
|
测试: 先启动生产者发送消息
然后 我们再运行消费者1,然后再运行消费者2:
扇出方式: 两个队列彼此独立,互不干扰!
3.5.定向模型-Direct
3.5.1.说明
在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
在Direct模型下:
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个
RoutingKey
(路由key) - 消息的发送方在 向 Exchange发送消息时,也必须指定消息的
RoutingKey
。 - Exchange不再把消息交给每一个绑定的队列,而是根据消息的
Routing Key
进行判断,只有队列的Routingkey
与消息的 Routing key
完全一致,才会接收到消息
流程图:
图解:
- P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
- X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列
- C1:消费者,其所在队列指定了需要routing key 为 error 的消息
- C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息
3.5.2.生产者
此处我们模拟商品的增删改,发送消息的RoutingKey分别是:insert、update、delete
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| package com.itheima.demo4;
import com.itheima.utils.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection;
public class DirectProducer {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); String exchangeName = "exchange-direct"; channel.exchangeDeclare(exchangeName,"direct",false); String directQueue1 = "direct_queue1"; channel.queueDeclare(directQueue1, false, false, false,null); String directQueue2 = "direct_queue2"; channel.queueDeclare(directQueue2, false, false, false,null);
channel.queueBind(directQueue1,exchangeName,"insert"); channel.queueBind(directQueue2,exchangeName,"delete"); channel.queueBind(directQueue2,exchangeName,"update"); for(int i=0;i<10;i++){ channel.basicPublish(exchangeName,"insert", true,null,"hello direct insert!".getBytes()); channel.basicPublish(exchangeName,"delete", true,null,"hello direct delete!".getBytes()); channel.basicPublish(exchangeName,"update", true,null,"hello direct update!".getBytes()); }
channel.close(); connection.close(); }
}
|
3.5.3.消费者
我们此处假设消费者1只接收1种类型的消息:例如 接受添加商品信息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| package com.itheima.demo4; import com.itheima.utils.ConnectionUtil; import com.rabbitmq.client.*;
import java.io.IOException;
public class DirectConsumer1 { public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection(); final Channel channel = connection.createChannel();
String queueName = "direct_queue1"; channel.queueBind(queueName, "exchange-direct", "insert");
DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try { String msg = new String(body);
System.out.println(" direct-consumer 1 : [x] received : " + msg + "!"); channel.basicAck(envelope.getDeliveryTag(),false); } catch (Exception e){ e.printStackTrace(); channel.basicNack(envelope.getDeliveryTag(),false,true); } } }; channel.basicConsume(queueName, false, consumer);
} }
|
消费者2
我们此处假设消费者2接收2种类型的消息:更新商品和删除商品。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| package com.itheima.demo4; import com.itheima.utils.ConnectionUtil; import com.rabbitmq.client.*;
import java.io.IOException; public class DirectConsumer2 { public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection(); final Channel channel = connection.createChannel();
String queueName2 = "direct_queue2"; channel.queueBind(queueName2, "exchange-direct", "delete"); channel.queueBind(queueName2, "exchange-direct", "update");
DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try { String msg = new String(body);
System.out.println(" direct-consumer 2 : [x] received : " + msg + "!"); channel.basicAck(envelope.getDeliveryTag(),false); } catch (Exception e){ e.printStackTrace(); channel.basicNack(envelope.getDeliveryTag(),false,true); } } }; channel.basicConsume(queueName2, false, consumer);
} }
|
3.6.通配符模型-Topic
3.6.1.说明
Topic
类型的Exchange
与Direct
相比,都是可以根据RoutingKey
把消息路由到不同的队列。只不过Topic
类型Exchange
可以让队列在绑定Routing key
的时候使用通配符!
Routingkey
一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
通配符规则:
#
:匹配一个或多个词
*
:匹配不多不少恰好1个词
举例:
item.#
:能够匹配item.spu.insert
或者 item.spu
item.*
:只能匹配item.spu
图示:
解释:
- 红色Queue:绑定的是
usa.#
,因此凡是以 usa.
开头的routing key
都会被匹配到 - 黄色Queue:绑定的是
#.news
,因此凡是以 .news
结尾的 routing key
都会被匹配
3.6.2.生产者
使用topic类型的Exchange,发送消息的routing key
注意: 此案例 先启动消费端 按照指定路由监听: 再启动服务端,再投递消息!
可以根据 user.update product.insert user.insert 测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| package com.itheima.topic;
import com.itheima.utils.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection;
public class Demo5Provider { public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); String exchangeName = "exchange-topic"; channel.exchangeDeclare(exchangeName,"topic",false); String topicQueue1 = "topic_queue1"; channel.queueDeclare(topicQueue1, false, false, false,null); String topicQueue2 = "topic_queue2"; channel.queueDeclare(topicQueue2, false, false, false,null);
channel.queueBind(topicQueue1,exchangeName,"abc.#"); channel.queueBind(topicQueue2,exchangeName,"#.hello"); channel.queueBind(topicQueue2,exchangeName,"*.world"); for(int i=0;i<10;i++){ channel.basicPublish(exchangeName,"abc.abc", true,null,"abc前缀的值!".getBytes()); channel.basicPublish(exchangeName,"abc.demo.hello", true,null,"hello ".getBytes()); channel.basicPublish(exchangeName,"update.world", true,null,"update_world!".getBytes()); }
channel.close(); connection.close(); }
}
|
3.6.3.消费者1
我们此处假设消费者1只接收的RoutingKey为 user.#
,代表所有以user开头的key
注意: 本案例 :交换机和队列 均由 消费者创建 ,生产者不创建 交换机和队列来演示
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| package com.itheima.topic;
import com.itheima.utils.ConnectionUtil; import com.rabbitmq.client.*;
import java.io.IOException;
public class Demo5Consumer1 { public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection(); final Channel channel = connection.createChannel();
String queueName = "topic_queue1"; channel.queueBind(queueName, "exchange-topic", "abc.#");
DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try { String msg = new String(body);
System.out.println(" 获取 : " + msg + "!"); channel.basicAck(envelope.getDeliveryTag(),false); } catch (Exception e){ e.printStackTrace(); channel.basicNack(envelope.getDeliveryTag(),false,true); } } }; channel.basicConsume(queueName, false, consumer);
} }
|
3.6.4.消费者2
我们此处假设消费者2接收的消息key:*.insert
,与新增有关。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| package com.itheima.topic;
import com.itheima.utils.ConnectionUtil; import com.rabbitmq.client.*;
import java.io.IOException;
public class Demo5Consumer2 { public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection(); final Channel channel = connection.createChannel();
String queueName = "topic_queue2"; channel.queueBind(queueName, "exchange-topic", "#.hello"); channel.queueBind(queueName, "exchange-topic", "*.world");
DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try { String msg = new String(body);
System.out.println(" 获取 : " + msg + "!"); channel.basicAck(envelope.getDeliveryTag(),false); } catch (Exception e){ e.printStackTrace(); channel.basicNack(envelope.getDeliveryTag(),false,true); } } }; channel.basicConsume(queueName, false, consumer);
} }
|
3.7.持久化
为了提高并发能力,MQ的数据默认是在内存中存储的,包括交换机、队列、消息。
这样就会出现数据安全问题,如果服务宕机,存储在MQ中未被消费的消息都会丢失。
所以我们需要将交换机、队列、消息持久化到硬盘,以防服务宕机。
交换机持久化:
队列持久化:
消息持久化:
第四章 SpringBoot AMQP(重点掌握)
4.1.简介
Sprin有很多不同的项目,其中就有对AMQP的支持:
Spring AMQP的页面:http://projects.spring.io/spring-amqp/
注意这里一段描述:
Spring-amqp是对AMQP协议的抽象实现,而spring-rabbit 是对协议的具体实现,也是目前的唯一实现。底层使用的就是RabbitMQ。
4.2.依赖和配置
添加AMQP的启动器:
1 2 3 4
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
|
在application.yml
中添加RabbitMQ地址: 手动在控制台 新建一个虚拟机 /heima
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| server: port: 8081 spring: application: name: producer-application rabbitmq: virtual-host: /itheima129 username: leyou password: 123321 addresses: 192.168.190.153:5672 template: retry: enabled: true initial-interval: 10000ms max-interval: 80000ms multiplier: 2 exchange: publisher-confirms: true
|
4.3.快速入门
我们以直连 direct 为例,看看Spring中如何发送消息、接收消息。
4.3.1.生成者发送消息
Spring为AMQP提供了统一的消息处理模板:AmqpTemplate,非常方便的发送消息,其发送方法:
比较常用的3个方法,分别是:
- 指定交换机、RoutingKey和消息体
- 指定消息(默认队列)
- 指定RoutingKey和消息,这里的RoutingKey其实是队列名称
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion>
<groupId>com.itheima</groupId> <artifactId>day08-rabbit-producer</artifactId> <version>1.0-SNAPSHOT</version> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.10.RELEASE</version> <relativePath/> </parent>
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> </dependencies> </project>
|
代码如下: 向指定队列直接发送消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| package com.itheima.controller;
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController;
@RestController public class ProducerController { @Autowired private RabbitTemplate rabbitTemplate;
@RequestMapping("/send/{msg}") public String sendMsg(@PathVariable("msg") String msg){ for (int i = 1; i <= 10; i++) { rabbitTemplate.convertAndSend("boot-queue",msg); } return "OK"; } }
|
启动main 浏览器 输入地址 :
http://localhost:8081/send
结果:
4.3.2.接受消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| @Component public class TestConsumer {
@RabbitListener(queues = "heima-queue") public void receive(String msg, Channel channel, Message message) throws IOException { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { System.out.println("收到消息id:" + deliveryTag); Thread.sleep(1000); System.out.println("message:" +message.toString()); }catch (Exception e){ } }
}
|
补充:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
| package com.leyou.search.mq;
import com.leyou.common.constants.MQConstants; import com.leyou.common.exception.LyException; import com.leyou.search.entity.Goods; import com.leyou.search.repository.GoodsRepository; import com.leyou.search.repository.SearchService; import com.leyou.starter.elastic.handler.DataResponseHandler; import org.apache.http.HttpStatus; import org.apache.lucene.search.TotalHits; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;
@Component public class ItemListener { private final SearchService searchService;
public ItemListener(SearchService searchService) { this.searchService = searchService; }
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = MQConstants.QueueConstants.SEARCH_ITEM_UP, durable = "true"), exchange = @Exchange(value = MQConstants.ExchangeConstants.ITEM_EXCHANGE_NAME, type = ExchangeTypes.TOPIC), key = MQConstants.RoutingKeyConstants.ITEM_UP_KEY )) public void listenerUp(Long spuId) {
searchService.saveGoodsById(spuId); }
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = MQConstants.QueueConstants.SEARCH_ITEM_DOWN, durable = "true"), exchange = @Exchange(value = MQConstants.ExchangeConstants.ITEM_EXCHANGE_NAME, type = ExchangeTypes.TOPIC), key = MQConstants.RoutingKeyConstants.ITEM_DOWN_KEY )) public void listenerDown(Long spuId) { searchService.deleteGoodsById(spuId); }
}
|
运行后查看日志:
springboot默认也是ack自动确认!
4.4.消息转换器
之前说过,Spring会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。
只不过,默认情况下Spring采用的序列化方式是JDK序列化。总所周知,JDK序列化存在下列问题:
我们来测试一下。
4.4.1.测试发送Java对象
我们修改消息发送的代码,发送一个Map对象:
1 2 3 4 5 6 7 8
| @GetMapping("/sendMap") public String testSendMap() throws InterruptedException { Map<String,Object> msg = new HashMap<>(); msg.put("name", "Jack"); msg.put("age", 21); rabbitTemplate.convertAndSend("heima-queue", msg); return "mapok"; }
|
暂时关闭消费者的监听:
重新运行测试类。
然后在MQ的控制台可以查看到消息,如下:
4.4.2.配置JSON转换器 (生产者 消费者都要配置, 否则序列化的数据会出错)
显然,JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化和反序列化。
引入依赖:
1 2 3 4 5
| <dependency> <groupId>com.fasterxml.jackson.dataformat</groupId> <artifactId>jackson-dataformat-xml</artifactId> <version>2.9.10</version> </dependency>
|
配置消息转换器。
在启动类中添加一个Bean即可:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| package com.leyou.item.config;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
@Configuration public class RabbitConfig {
@Bean public Jackson2JsonMessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } }
|
重新启动 再次查看结果
4.4.3.测试
再次发送消息,查看效果:
此时,消费者也可以用Map来接收消息了:Map接受即可,消费者端 别忘记导入依赖:和转换器
1 2 3 4 5 6 7 8 9 10 11 12 13
| <dependency> <groupId>com.fasterxml.jackson.dataformat</groupId> <artifactId>jackson-dataformat-xml</artifactId> <version>2.9.10</version> </dependency>
@Bean public MessageConverter jsonMessageConverter(){ // 创建JSON的消息转换器 return new Jackson2JsonMessageConverter(); }
|
** **
结果:
第五章 SpringBoot整合邮件发送
SpringBoot 完成邮件的投递
准备工作: 登录自己的开通邮件的 以163为例
生成自己唯一的授权码
- 基于springboot环境导入email发送依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.4.0</version> <relativePath/> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-mail</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies>
|
1 2 3 4 5 6 7
| # # spring.mail.username=fanqixxxx@163.com # # spring.mail.password=xxxxxxx # # spring.mail.host=smtp.163.com # spring.mail.properties.mail.smtp.ssl.enable=true
|
1 2 3 4 5 6 7 8 9 10 11 12 13
| spring: mail: username: xsitheima163@163.com password: AKELOGXEFRJZOAWU host: smtp.163.com properties: mail: smtp: ssl: enable: true
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| @SpringBootTest @SpringBootApplication @RunWith(SpringRunner.class) public class EmailSend {
@Autowired private JavaMailSenderImpl javaMailSender; @Test public void send(){ try { SimpleMailMessage message = new SimpleMailMessage(); message.setSubject("传智健康官方邮件"); message.setText("你好,请保持好验证码:7788,打死都不能泄露给你的同桌"); message.setTo("tps520tps@163.com"); message.setFrom("tps520wx@163.com"); javaMailSender.send(message); } catch (MailException e) { e.printStackTrace(); } } }
|
查收邮件:
邮件发送示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| package com.itheima;
import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.mail.MailException; import org.springframework.mail.SimpleMailMessage; import org.springframework.mail.javamail.JavaMailSenderImpl;
@SpringBootTest class SendEmailApplicationTests { @Autowired private JavaMailSenderImpl javaMailSender; @Test public void sendMail() { try { SimpleMailMessage message = new SimpleMailMessage(); message.setSubject("表白邮件"); message.setText("rose 你好, 我想....."); message.setTo("xsitheima126@126.com"); message.setFrom("xsitheima163@163.com"); javaMailSender.send(message); System.out.println("========发送成功"); } catch (MailException e) { e.printStackTrace(); } }
}
|
预备账号
1 2 3 4 5
| xsitheima126@126.com a123456 授权码: YKJOGNHCCFDLIJKO xsitheima163@163.com a12345678 授权码: AKELOGXEFRJZOAWU
|
总结
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| MQ: 消息队列 消息在传输过程中保存消息的容器.主要使用在分布式环境下 MQ优势: 应用解耦 快速应用变更与维护 削峰填谷 MQ劣势: 可用性降低 系统复杂度提高 短时间内无法保证一致性 在什么情况下使用MQ? 访问的服务无返回值的情况下 允许短暂性不一致的情况下 使用MQ的优势大于维护成本 AMQP协议: 高级消息队列协议,约束消息生产者和消费者与MQ传递数据时的规范 生产者 ----> 交换机 ----> 路由 ----> 队列 <---- 消费者 JMS: java提供的操作消息中间件的API RabbitMQ概述: erlang语言基于AMQP协议开发的一个消息队列软件. 五种工作机制: 直连式: 生产者 ----> 队列 <---- 消费者 work模式(任务模式): 生产者 ----> 队列 <---- 多个消费者(平分队列中的消息) 订阅模式: 路由模式: topics:
|