SpringCloudz之—— Stream的使用

Spring Cloud Stream是什么?

官网翻译:

Spring Cloud Stream是一个框架,用于构建与共享消息传递系统连接的高度可扩展的事件驱动型微服务

该框架提供了一个灵活的编程模型,该模型建立在已经建立并熟悉的Spring习惯用法和最佳实践的基础上,包括对持久性pub / sub语义,使用者组和有状态分区的支持。

网上看到的一句话理解: 屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型

目前仅支持: RabbitMQApache Kafka

Spring Cloud Stream的核心构建块是: ()

  • Destination Binders:负责与外部消息传递系统集成的组件。
  • Destination Bindings:外部消息传递系统和应用程序之间提供的消息的生成者和消费者(由目标绑定程序创建)之间的桥梁。
  • Message:生产者和消费者使用的规范数据结构,用于与目标绑定程序(以及通过外部消息传递系统进行的其他应用程序)进行通信。

Stream就是通过Binder绑定器作为中间层实现了应用程序与中间件的隔离:

流程变成了, 生产者输出消息到绑定层, 由绑定层在转发到具体的中间件, 消息的消费也是由中间件到绑定层输入到具体的消费者。

**这样做的好处在于API一样, 可以简单的切换中间件 (目前只支持2种)**。

核心概念:
Barista接口:Barista接口是定义来作为后面类的参数,这一接口定义来通道类型和通道名称,通道名称是作为配置用,通道类型则决定了app会使用这一通道进行发送消息还是从中接收消息。

通道接口如何定义:
@Output:输出注解,用于定义发送消息接口
@Input:输入注解,用于定义消息的消费者接口
@StreamListener:用于定义监听方法的注解

使用Spring Cloud Stream 非常简单,只需要使用好这3个注解即可

一.SpringCloud Stream 消息驱动

1.使用MQ的场景

系统解耦防止级联失败
异步处理节省耗时,避免链路调用耗时(支付==>优惠券,积分,短信)
故障隔离不影响主体业务
削峰填谷存入消息队列,利用多线程去取(自己设置,可控,防止宕机)
日志处理收集,日志服务去读(保证性能,保存)

注意:上一步需要的结果,下一步需要则不能使用MQ

2.MQ问题概述==>SpringCloud Stream

MQ在项目中的问题:

1、开发人员的学习成本 和项目开发的成本

2、系统和MQ之间的耦合

SpringCloud 官方出SpringCloud Stream 技术解决系统和MQ之间的耦合问题(封装了各种操作Api)

3.基本概述

Spring Cloud Stream的binder对象负责与消息中间件交互(发布-订阅、消费组、分区)

目前仅支持RabbitMQ、Kafka,或自定义

img

3.1、工作原理

通过binder对象完成应用程序与消息中间件解耦,只需要替换相应的binder对象即可

img

4.Stream消息驱动入门

4.1、快速入门

①引入依赖

img

②生产者配置yaml

img

③生产者开启@EbableBinding(Source.class)

④发消息

img

⑤配置接受消息:@EbableBinding(Sink.class);将配置文件中output==>input

img

消息对象:

img

4.2、自定义消息通道

①参考Source接口:修改OUTPUT值,@Output(xxx)

②配置类@EnableBinding({Source.class,MySource.class}) //添加自定义配置

③修改配置文件

④发消息

img

⑤接收消息

img

img

4.3、分组消息

问题:防止MQ在集群部署的情况下,分发到每个服务下,造成过量消费;下单(-1)扣减库存(-3)

1
2
3
4
5
bindings:
# 消息接收通道
stream_input:
destination: myexchange.message # 绑定的交换机名称
group: group1 #分组名称,解决集群消费消息问题

分组消息:在 Stream 中处于同一个 group 中的多个消费者是竞争关系,只有一个消费方

4.4、分区消息

同时有多条同一个用户的多个数据发送过来,我们需要根据用户统计;

根据hash算法使得该用户消息,落在固定的消费方

生产方配置

img

img

消费方配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
stream:
#分区配置---------------------------------------
instance-count: 2 # 消费者总数
instance-index: 1 # 当前消费者的索引【修改每个消息消费方的索引,从0开始】
#分区配置---------------------------------------
# Binder 配置项,对应 BinderProperties Map
binders:
#其它配置省略
bindings:
stream_input:
destination: myexchange.message # 绑定的交换机名称
group: group1 #分组名称,解决集群消费消息问题
consumer:
#分区配置---------------------------------------
partitioned: true # ##开启分区支持

同一分区下,两个分组也是竞争关系

4.5、延迟队列

安装rabbitmq-delayed-message-exchange 插件

配置:

设置过期时间:message.setHeader(“x-delay”, 5000) img

应用场景:

  • 订单超时未支付,30分钟后取消订单
  • 添加购物车扣减库存,30分钟后删除购物车回滚库存

优点:

1、维护性增强、扩展性增强

2、MQ和系统完全解耦

3、屏蔽底层MQ使用

二. 配置示例(短信发送,消费者消息确认机制)

生产者配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
spring:
stream:
rabbit:
bindings:
sms-output:
producer:
routing-key-expression: headers.type # 生产者配置RabbitMq的动态路由键
exchange-type: direct # 指定交换机类型
binders: #需要绑定的rabbitmq的服务信息
defaultRabbit: #定义的名称,用于bidding整合
type: rabbit #消息组件类型
environment: #配置rabbimq连接环境
spring:
rabbitmq:
host: 192.168.200.129 #rabbitmq 服务器的地址
port: 5672 #rabbitmq 服务器端口
username: admin #rabbitmq 用户名
password: pass #rabbitmq 密码
virtual-host: / #虚拟路径
bindings: #服务的整合处理
sms-output: #这个是消息通道的名称
destination: sms-exchange #exchange名称,交换模式默认是topic;把SpringCloud stream的消息输出通道绑定到RabbitMQ的sms-exchange交换器。
content-type: application/json #设置消息的类型,本次为json

生产者代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class SmsSendFaceImpl implements SmsSendFace {

@Autowired
SmsSendAdapter smsSendAdapter;

@Autowired
SmsSource smsSource;

@Autowired
IdentifierGenerator identifierGenerator;

@Value("${spring.application.name}")
private String applicationName;

@Value("${server.port}")
private String port;

@Override
public Boolean SendSms(SendMessageVo sendMessageVo) throws ProjectException {
String sendMessageVoString = JSONObject.toJSONString(sendMessageVo);
//发送队列信息
MqMessage mqMessage = MqMessage.builder()
.id((Long)identifierGenerator.nextId(sendMessageVo))
.title("sms-message")
.content(sendMessageVoString)
.messageType("sms-request")
.produceTime(Timestamp.valueOf(LocalDateTime.now()))
.sender(applicationName+":"+port)
.build();
Message<MqMessage> message = MessageBuilder.withPayload(mqMessage)
.setHeader("type", "sms-key")
.build();
Boolean flag = smsSource.smsOutput().send(message);
log.info("发送:{}结果:{}",mqMessage.toString(),flag);
return flag;
}

@Override
public Boolean querySendSms(SmsSendRecordVo smsSendRecordVo) throws ProjectException {
SmsSendRecord smsSendRecord = BeanConv.toBean(smsSendRecordVo, SmsSendRecord.class);
return smsSendAdapter.querySendSms(smsSendRecord);
}

@Override
public Boolean retrySendSms(SmsSendRecordVo smsSendRecord) throws ProjectException {
return null;
}
}

消费者配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
spring:
stream:
rabbit:
bindings:
log-input:
consumer:
acknowledge-mode: MANUAL #签收模式
durable-subscription: true #是否持久化队
binding-routing-key: log-key
prefetch: 100 #默认:1,限制consumer在消费消息时,一次能同时获取的消息数量,。
max-concurrency: 10 # 默认:1,queue的消费者的最大数量。当前消费者数量不足以及时消费消息时, 会动态增加消费者数量, 直到到达最大数量, 即该配置的值.
exchange-type: direct # 指定交换机类型
sms-input:
consumer:
acknowledge-mode: MANUAL #签收模式
durable-subscription: true #是否持久化队
binding-routing-key: sms-key
prefetch: 100 #默认:1,限制consumer在消费消息时,一次能同时获取的消息数量,。
max-concurrency: 10 # 默认:1,queue的消费者的最大数量。当前消费者数量不足以及时消费消息时, 会动态增加消费者数量, 直到到达最大数量, 即该配置的值.
exchange-type: direct # 指定交换机类型
binders: #需要绑定的rabbitmq的服务信息
defaultRabbit: #定义的名称,用于bidding整合
type: rabbit #消息组件类型
environment: #配置rabbimq连接环境
spring:
rabbitmq:
host: 192.168.200.129 #rabbitmq 服务器的地址
port: 5672 #rabbitmq 服务器端口
username: admin #rabbitmq 用户名
password: pass #rabbitmq 密码
virtual-host: / #虚拟路径
# Binding 配置项,对应 BindingProperties Map
bindings:
log-input:
destination: log-exchange # 目的地。这里使用 Topic
content-type: application/json # 内容格式。这里使用 JSON
group: log-queue # 消费者分组,队列名称:destination+group,此时队列为持久化的
binder: defaultRabbit
consumer:
concurrency: 3 # 初始/最少/空闲时 消费者数量。默认1
max-attempts: 6 # 重试次数
sms-input:
destination: sms-exchange # 目的地。这里使用 Topic
content-type: application/json # 内容格式。这里使用 JSON
group: sms-queue # 消费者分组,队列名称:destination+group,此时队列为持久化的
binder: defaultRabbit
consumer:
concurrency: 3 # 初始/最少/空闲时 消费者数量。默认1
max-attempts: 6 # 重试次数

消费者代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import com.alibaba.fastjson.JSONObject;
import com.itheima.restkeeper.adapter.SmsSendAdapter;
import com.itheima.restkeeper.exception.ProjectException;
import com.itheima.restkeeper.pojo.MqMessage;
import com.itheima.restkeeper.req.SendMessageVo;
import com.itheima.restkeeper.sink.SmsSink;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.io.IOException;

public class SmsListen {

@Autowired
SmsSendAdapter smsSendAdapter;

@StreamListener(SmsSink.SMS_INPUT)
public void onMessage(@Payload MqMessage message,
@Header(AmqpHeaders.CHANNEL) Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag) throws IOException {
try {
String jsonConten = message.getContent();
log.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
SendMessageVo sendMessageVo = JSONObject.parseObject(jsonConten, SendMessageVo.class);
boolean flag = smsSendAdapter.SendSms(
sendMessageVo.getTemplateNo(),
sendMessageVo.getSginNo(),
sendMessageVo.getLoadBalancerType(),
sendMessageVo.getMobiles(),
sendMessageVo.getTemplateParam());
if (flag){
channel.basicAck(deliveryTag,false);
}
} catch (ProjectException | IOException ex) {
ex.printStackTrace();
//第一个参数deliveryTag:发布的每一条消息都会获得一个唯一的deliveryTag,deliveryTag在channel范围内是唯一的
//第二个参数requeue:表示如何处理这条消息,如果值为true,则重新放入RabbitMQ的发送队列,如果值为false,则通知RabbitMQ销毁这条消息
channel.basicReject(deliveryTag, true);
}
}
}

三. 面试题

1、如何解决消息堆积的问题

增加如下两个配置

1
2
prefetch: 100 #默认:1,限制consumer在消费消息时,一次能同时获取的消息数量,。
max-concurrency: 10 # 默认:1,queue的消费者的最大数量。当前消费者数量不足以及时消费消息时, 会动态增加消费者数量, 直到到达最大数量,

2、如何保证消息的可靠性

  • 生产者消息发送确认机制
  • 消息持久化机制:Spring Stream开启了分组就开启了持久化,否则不开启
  • 消费者确认机制:如上面的短信发送配置示例
  • 失败重试机制