Spring Cloud Stream是什么?
官网翻译:
Spring Cloud Stream是一个框架,用于构建与共享消息传递系统连接的高度可扩展的事件驱动型微服务。
该框架提供了一个灵活的编程模型,该模型建立在已经建立并熟悉的Spring习惯用法和最佳实践的基础上,包括对持久性pub / sub语义,使用者组和有状态分区的支持。
网上看到的一句话理解: 屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型
目前仅支持: RabbitMQ
、Apache Kafka
Spring Cloud Stream的核心构建块是: ()
- Destination Binders:负责与外部消息传递系统集成的组件。
- Destination Bindings:外部消息传递系统和应用程序之间提供的消息的生成者和消费者(由目标绑定程序创建)之间的桥梁。
- Message:生产者和消费者使用的规范数据结构,用于与目标绑定程序(以及通过外部消息传递系统进行的其他应用程序)进行通信。
Stream
就是通过Binder
绑定器作为中间层实现了应用程序与中间件的隔离:
流程变成了, 生产者输出消息到绑定层, 由绑定层在转发到具体的中间件, 消息的消费也是由中间件到绑定层输入到具体的消费者。
**这样做的好处在于API
一样, 可以简单的切换中间件 (目前只支持2种)**。
核心概念:
Barista接口
:Barista接口是定义来作为后面类的参数,这一接口定义来通道类型和通道名称,通道名称是作为配置用,通道类型则决定了app会使用这一通道进行发送消息还是从中接收消息。
通道接口如何定义:
@Output
:输出注解,用于定义发送消息接口
@Input
:输入注解,用于定义消息的消费者接口
@StreamListener
:用于定义监听方法的注解
使用Spring Cloud Stream 非常简单,只需要使用好这3个注解即可
一.SpringCloud Stream 消息驱动
1.使用MQ的场景
系统解耦 | 防止级联失败 |
---|
异步处理 | 节省耗时,避免链路调用耗时(支付==>优惠券,积分,短信) |
故障隔离 | 不影响主体业务 |
削峰填谷 | 存入消息队列,利用多线程去取(自己设置,可控,防止宕机) |
日志处理 | 收集,日志服务去读(保证性能,保存) |
注意:上一步需要的结果,下一步需要则不能使用MQ
2.MQ问题概述==>SpringCloud Stream
MQ在项目中的问题:
1、开发人员的学习成本 和项目开发的成本
2、系统和MQ之间的耦合
SpringCloud 官方出SpringCloud Stream 技术解决系统和MQ之间的耦合问题(封装了各种操作Api)
3.基本概述
Spring Cloud Stream的binder对象负责与消息中间件交互(发布-订阅、消费组、分区)
目前仅支持RabbitMQ、Kafka,或自定义
3.1、工作原理
通过binder对象完成应用程序与消息中间件解耦,只需要替换相应的binder对象即可
4.Stream消息驱动入门
4.1、快速入门
①引入依赖
②生产者配置yaml
③生产者开启@EbableBinding(Source.class)
④发消息
⑤配置接受消息:@EbableBinding(Sink.class);将配置文件中output==>input
消息对象:
4.2、自定义消息通道
①参考Source接口:修改OUTPUT值,@Output(xxx)
②配置类@EnableBinding({Source.class,MySource.class}) //添加自定义配置
③修改配置文件
④发消息
⑤接收消息
4.3、分组消息
问题:防止MQ在集群部署的情况下,分发到每个服务下,造成过量消费;下单(-1)扣减库存(-3)
1 2 3 4 5
| bindings:
stream_input: destination: myexchange.message group: group1
|
分组消息:在 Stream 中处于同一个 group 中的多个消费者是竞争关系,只有一个消费方
4.4、分区消息
同时有多条同一个用户的多个数据发送过来,我们需要根据用户统计;
根据hash算法使得该用户消息,落在固定的消费方
生产方配置
消费方配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| stream: instance-count: 2 instance-index: 1 binders: bindings: stream_input: destination: myexchange.message group: group1 consumer: partitioned: true
|
同一分区下,两个分组也是竞争关系
4.5、延迟队列
安装rabbitmq-delayed-message-exchange 插件
配置:
设置过期时间:message.setHeader(“x-delay”, 5000)
应用场景:
- 订单超时未支付,30分钟后取消订单
- 添加购物车扣减库存,30分钟后删除购物车回滚库存
优点:
1、维护性增强、扩展性增强
2、MQ和系统完全解耦
3、屏蔽底层MQ使用
二. 配置示例(短信发送,消费者消息确认机制)
生产者配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| spring: stream: rabbit: bindings: sms-output: producer: routing-key-expression: headers.type exchange-type: direct binders: defaultRabbit: type: rabbit environment: spring: rabbitmq: host: 192.168.200.129 port: 5672 username: admin password: pass virtual-host: / bindings: sms-output: destination: sms-exchange content-type: application/json
|
生产者代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| public class SmsSendFaceImpl implements SmsSendFace {
@Autowired SmsSendAdapter smsSendAdapter;
@Autowired SmsSource smsSource;
@Autowired IdentifierGenerator identifierGenerator;
@Value("${spring.application.name}") private String applicationName;
@Value("${server.port}") private String port;
@Override public Boolean SendSms(SendMessageVo sendMessageVo) throws ProjectException { String sendMessageVoString = JSONObject.toJSONString(sendMessageVo); MqMessage mqMessage = MqMessage.builder() .id((Long)identifierGenerator.nextId(sendMessageVo)) .title("sms-message") .content(sendMessageVoString) .messageType("sms-request") .produceTime(Timestamp.valueOf(LocalDateTime.now())) .sender(applicationName+":"+port) .build(); Message<MqMessage> message = MessageBuilder.withPayload(mqMessage) .setHeader("type", "sms-key") .build(); Boolean flag = smsSource.smsOutput().send(message); log.info("发送:{}结果:{}",mqMessage.toString(),flag); return flag; }
@Override public Boolean querySendSms(SmsSendRecordVo smsSendRecordVo) throws ProjectException { SmsSendRecord smsSendRecord = BeanConv.toBean(smsSendRecordVo, SmsSendRecord.class); return smsSendAdapter.querySendSms(smsSendRecord); }
@Override public Boolean retrySendSms(SmsSendRecordVo smsSendRecord) throws ProjectException { return null; } }
|
消费者配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| spring: stream: rabbit: bindings: log-input: consumer: acknowledge-mode: MANUAL durable-subscription: true binding-routing-key: log-key prefetch: 100 max-concurrency: 10 exchange-type: direct sms-input: consumer: acknowledge-mode: MANUAL durable-subscription: true binding-routing-key: sms-key prefetch: 100 max-concurrency: 10 exchange-type: direct binders: defaultRabbit: type: rabbit environment: spring: rabbitmq: host: 192.168.200.129 port: 5672 username: admin password: pass virtual-host: / bindings: log-input: destination: log-exchange content-type: application/json group: log-queue binder: defaultRabbit consumer: concurrency: 3 max-attempts: 6 sms-input: destination: sms-exchange content-type: application/json group: sms-queue binder: defaultRabbit consumer: concurrency: 3 max-attempts: 6
|
消费者代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| import com.alibaba.fastjson.JSONObject; import com.itheima.restkeeper.adapter.SmsSendAdapter; import com.itheima.restkeeper.exception.ProjectException; import com.itheima.restkeeper.pojo.MqMessage; import com.itheima.restkeeper.req.SendMessageVo; import com.itheima.restkeeper.sink.SmsSink; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.messaging.handler.annotation.Header; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component;
import java.io.IOException;
public class SmsListen {
@Autowired SmsSendAdapter smsSendAdapter;
@StreamListener(SmsSink.SMS_INPUT) public void onMessage(@Payload MqMessage message, @Header(AmqpHeaders.CHANNEL) Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag) throws IOException { try { String jsonConten = message.getContent(); log.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message); SendMessageVo sendMessageVo = JSONObject.parseObject(jsonConten, SendMessageVo.class); boolean flag = smsSendAdapter.SendSms( sendMessageVo.getTemplateNo(), sendMessageVo.getSginNo(), sendMessageVo.getLoadBalancerType(), sendMessageVo.getMobiles(), sendMessageVo.getTemplateParam()); if (flag){ channel.basicAck(deliveryTag,false); } } catch (ProjectException | IOException ex) { ex.printStackTrace(); channel.basicReject(deliveryTag, true); } } }
|
三. 面试题
1、如何解决消息堆积的问题
增加如下两个配置
1 2
| prefetch: 100 max-concurrency: 10
|
2、如何保证消息的可靠性
- 生产者消息发送确认机制
- 消息持久化机制:Spring Stream开启了分组就开启了持久化,否则不开启
- 消费者确认机制:如上面的短信发送配置示例
- 失败重试机制