微服务高级

微服务保护

雪崩问题

微服务中,服务间调用关系错综复杂,一个微服务往往依赖于多个其它微服务。

如果服务提供者I发生了故障,当前的应用的部分业务因为依赖于服务I,因此也会被阻塞。此时,其它不依赖于服务I的业务似乎不受影响。
但是,依赖服务I的业务请求被阻塞,用户不会得到响应,则tomcat的这个线程不会释放,于是越来越多的用户请求到来,越来越多的线程会阻塞,服务器支持的线程和并发数有限,请求一直阻塞,会导致服务器资源耗尽,从而导致所有其它服务都不可用,那么当前服务也就不可用了。
那么,依赖于当前服务的其它服务随着时间的推移,最终也都会变的不可用,形成级联失败,雪崩就发生了。

雪崩问题解决方案

解决雪崩问题的常见方式有四种:

超时处理

超时处理:设定超时时间,请求超过一定时间没有响应就返回错误信息,不会无休止等待
(用的很少了)

仓壁模式

仓壁模式:限定每个业务能使用的线程数,避免耗尽整个tomcat的资源,因此也叫线程隔离。

断路器(降级熔断)

断路器模式:由断路器统计业务执行的异常比例,如果超出阈值则会熔断该业务,拦截访问该业务的一切请求。

当发现访问服务D的请求异常比例过高时,认为服务D有导致雪崩的风险,会拦截访问服务D的一切请求,形成熔断:

限流

流量控制:限制业务访问的QPS,避免服务因流量的突增而故障。

总结

什么是雪崩问题?

  • 微服务之间相互调用,因为调用链中的一个服务故障,引起整个链路都无法访问的情况。

可以认为:

限流是对服务的保护,避免因瞬间高并发流量而导致服务故障,进而避免雪崩。是一种预防措施。

超时处理、线程隔离、降级熔断是在部分服务故障时,将故障控制在一定范围,避免雪崩。是一种补救措施。

服务保护技术对比

早期比较流行的是Hystrix框架,但目前国内实用最广泛的还是阿里巴巴的Sentinel框架,这里我们做下对比:

Sentinel Hystrix
隔离策略 信号量隔离 线程池隔离/信号量隔离
熔断降级策略 基于慢调用比例或异常比例 基于失败比率
实时指标实现 滑动窗口 滑动窗口(基于 RxJava)
规则配置 支持多种数据源 支持多种数据源
扩展性 多个扩展点 插件的形式
基于注解的支持 支持 支持
限流 基于 QPS,支持基于调用关系的限流 有限的支持
流量整形 支持慢启动、匀速排队模式 不支持
系统自适应保护 支持 不支持
控制台 开箱即用,可配置规则、查看秒级监控、机器发现等 不完善
常见框架的适配 Servlet、Spring Cloud、Dubbo、gRPC 等 Servlet、Spring Cloud Netflix

Sentinel介绍和安装

初识Sentinel

Sentinel是阿里巴巴开源的一款微服务流量控制组件。官网地址:https://sentinelguard.io/zh-cn/index.html

Sentinel 具有以下特征:

丰富的应用场景:Sentinel 承接了阿里巴巴近 10 年的双十一大促流量的核心场景,例如秒杀(即突发流量控制在系统容量可以承受的范围)、消息削峰填谷、集群流量控制、实时熔断下游不可用应用等。

完备的实时监控:Sentinel 同时提供实时的监控功能。您可以在控制台中看到接入应用的单台机器秒级数据,甚至 500 台以下规模的集群的汇总运行情况。

广泛的开源生态:Sentinel 提供开箱即用的与其它开源框架/库的整合模块,例如与 Spring Cloud、Dubbo、gRPC 的整合。您只需要引入相应的依赖并进行简单的配置即可快速地接入 Sentinel。

完善的 SPI 扩展点:Sentinel 提供简单易用、完善的 SPI 扩展接口。您可以通过实现扩展接口来快速地定制逻辑。例如定制规则管理、适配动态数据源等。

运行Sentinel

1)下载

sentinel官方提供了UI控制台,方便我们对系统做限流设置。大家可以在GitHub下载。

2)运行

将jar包放到任意非中文目录,执行命令:

1
java -jar sentinel-dashboard-1.8.1.jar

如果要修改Sentinel的默认端口、账户、密码,可以通过下列配置:

配置项 默认值 说明
server.port 8080 服务端口
sentinel.dashboard.auth.username sentinel 默认用户名
sentinel.dashboard.auth.password sentinel 默认密码

例如,修改端口:

1
java -Dserver.port=8090 -jar sentinel-dashboard-1.8.1.jar

3)访问

访问http://localhost:8080页面,需要输入账号和密码,默认都是:sentinel

微服务整合Sentinel

我们在order-service中整合sentinel,并连接sentinel的控制台,步骤如下:

1)引入sentinel依赖

1
2
3
4
5
<!--sentinel-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
</dependency>

2)配置控制台

修改application.yaml文件,添加下面内容:

1
2
3
4
5
6
7
server:
port: 8088
spring:
cloud:
sentinel:
transport:
dashboard: localhost:8080

3)访问order-service的任意端点

打开浏览器,访问http://localhost:8088/order/101,这样才能触发sentinel的监控。

流量控制(限流)

雪崩问题虽然有四种方案,但是限流是避免服务因突发的流量而发生故障,是对微服务雪崩问题的预防。

测试软件可以采用:jmeter

簇点链路

当请求进入微服务时,首先会访问DispatcherServlet,然后进入Controller、Service、Mapper,这样的一个调用链就叫做簇点链路。簇点链路中被监控的每一个接口就是一个资源

默认情况下sentinel会监控SpringMVC的每一个端点(Endpoint,也就是controller中的方法),因此SpringMVC的每一个端点(Endpoint)就是调用链路中的一个资源。

流控、熔断等都是针对簇点链路中的资源来设置的,因此我们可以点击对应资源后面的按钮来设置规则:

  • 流控:流量控制
  • 降级:降级熔断
  • 热点:热点参数限流,是限流的一种
  • 授权:请求的权限控制

流控模式

在添加限流规则时,点击高级选项,可以选择三种流控模式

  • 直接:统计当前资源的请求,触发阈值时对当前资源直接限流,也是默认的模式
  • 关联:统计与当前资源相关的另一个资源,触发阈值时,对当前资源限流
  • 链路:统计从指定链路访问到本资源的请求,触发阈值时,对指定链路限流

阈值一般采用QPS—每秒的请求数量

直接模式是默认模式,比较简单就不详细描述了

关联模式

关联模式:统计与当前资源相关的另一个资源,触发阈值时,对当前资源限流

注意:!!!
对谁做限流就对谁增加规则

配置规则

语法说明:当/write资源访问量触发阈值时,就会对/read资源限流,避免影响/write资源。

使用场景:比如用户支付时需要修改订单状态,同时用户要查询订单。查询和修改操作会争抢数据库锁,产生竞争。业务需求是优先支付和更新订单的业务,因此当修改订单业务触发阈值时,需要对查询订单业务限流。

需求说明

  • 在OrderController新建两个端点:/order/query和/order/update,无需实现业务

  • 配置流控规则,当/order/ update资源被访问的QPS超过5时,对/order/query请求限流

1)定义/order/query端点,模拟订单查询

1
2
3
4
@GetMapping("/query")
public String queryOrder() {
return "查询订单成功";
}

2)定义/order/update端点,模拟订单更新

1
2
3
4
@GetMapping("/update")
public String updateOrder() {
return "更新订单成功";
}

3)配置流控规则

对哪个端点限流,就点击哪个端点后面的按钮。我们是对订单查询/order/query限流,因此点击它后面的按钮:

在表单中填写流控规则:

4)在Jmeter测试

链路模式

链路模式:只针对从指定链路访问到本资源的请求做统计,判断是否超过阈值。

配置示例

例如有两条请求链路:

  • /test1 –> /common

  • /test2 –> /common

如果只希望统计从/test2进入到/common的请求,则可以这样配置

实战案例

需求:有查询订单和创建订单业务,两者都需要查询商品。针对从查询订单进入到查询商品的请求统计,并设置限流。防止高并发查询影响到创建订单业务

步骤:

  1. 在OrderService中添加一个queryGoods方法,不用实现业务

  2. 在OrderController中,改造/order/query端点,调用OrderService中的queryGoods方法

  3. 在OrderController中添加一个/order/save的端点,调用OrderService的queryGoods方法

  4. 给queryGoods设置限流规则,从/order/query进入queryGoods的方法限制QPS必须小于2

实现:

1)添加查询商品方法

在order-service服务中,给OrderService类添加一个queryGoods方法:

1
2
3
public void queryGoods(){
System.err.println("查询商品");
}

2)查询订单时,查询商品

在order-service的OrderController中,修改/order/query端点的业务逻辑:

1
2
3
4
5
6
7
8
@GetMapping("/query")
public String queryOrder() {
// 查询商品
orderService.queryGoods();
// 查询订单
System.out.println("查询订单");
return "查询订单成功";
}

3)新增订单,查询商品

在order-service的OrderController中,修改/order/save端点,模拟新增订单:

1
2
3
4
5
6
7
8
@GetMapping("/save")
public String saveOrder() {
// 查询商品
orderService.queryGoods();
// 查询订单
System.err.println("新增订单");
return "新增订单成功";
}

4)给查询商品添加资源标记

默认情况下,OrderService中的方法是不被Sentinel监控的,需要我们自己通过注解来标记要监控的方法。

给OrderService的queryGoods方法添加@SentinelResource注解:

1
2
3
4
@SentinelResource("goods")
public void queryGoods(){
System.err.println("查询商品");
}

链路模式中,是对不同来源的两个链路做监控。但是sentinel默认会给进入SpringMVC的所有请求设置同一个root资源,会导致链路模式失效。

我们需要关闭这种对SpringMVC的资源聚合,修改order-service服务的application.yml文件:

1
2
3
4
spring:
cloud:
sentinel:
web-context-unify: false # 关闭context整合

重启服务,访问/order/query和/order/save,可以查看到sentinel的簇点链路规则中,出现了新的资源:

5)添加流控规则

点击goods资源后面的流控按钮,在弹出的表单中填写下面信息:

只统计从/order/query进入/goods的资源,QPS阈值为2,超出则被限流。

6)Jmeter测试

总结

流控模式有哪些?

•直接:对当前资源限流

•关联:高优先级资源触发阈值,对低优先级资源限流。

•链路:阈值统计时,只统计从指定资源进入当前资源的请求,是对请求来源的限流

关联与链路模型对比:
关联用于业务具有优先级并且竞争相同资源时,通过对低优先级的业务增加监听高优先级业务的规则,当高优先级业务达到阈值,意味着高优先级业务需要更多的资源,这个时候,低优先级业务就会限流,从而为高优先级业务让出更多的资源
链路模型则用于存在不同链路,只对低优先级业务的进行监听,达到阈值就限流,防止影响到高优先级的业务,值得注意的是,一般链路模型设立规则在不同链路的汇流处(可能在service层)。

流控效果

在流控的高级选项中,还有一个流控效果选项:

流控效果是指请求达到流控阈值时应该采取的措施,包括三种:

  • 快速失败:达到阈值后,新的请求会被立即拒绝并抛出FlowException异常。是默认的处理方式。

  • warm up:预热模式,对超出阈值的请求同样是拒绝并抛出异常。但这种模式阈值会动态变化,从一个较小值逐渐增加到最大阈值。

  • 排队等待:让所有的请求按照先后次序排队执行,两个请求的间隔不能小于指定时长,超时仍然会拒绝访问

warm up

阈值一般是一个微服务能承担的最大QPS,但是一个服务刚刚启动时,一切资源尚未初始化(冷启动),如果直接将QPS跑到最大值,可能导致服务瞬间宕机。

warm up也叫预热模式,是应对服务冷启动的一种方案。请求阈值初始值是 maxThreshold / coldFactor,持续指定时长后,逐渐提高到maxThreshold值。而coldFactor的默认值是3.

例如,我设置QPS的maxThreshold为10,预热时间为5秒,那么初始阈值就是 10 / 3 ,也就是3,然后在5秒后逐渐增长到10.

排队等待

当请求超过QPS阈值时,快速失败和warm up 会拒绝新的请求并抛出异常。

而排队等待则是让所有请求进入一个队列中,然后按照阈值允许的时间间隔依次执行。后来的请求必须等待前面执行完成,如果请求预期的等待时间超出最大时长,则会被拒绝。

工作原理

例如:QPS = 5,意味着每200ms处理一个队列中的请求;timeout = 2000,意味着预期等待时长超过2000ms的请求会被拒绝并抛出异常。

那什么叫做预期等待时长呢?

比如现在一下子来了12 个请求,因为每200ms执行一个请求,那么:

  • 第6个请求的预期等待时长 = 200 * (6 - 1) = 1000ms
  • 第12个请求的预期等待时长 = 200 * (12-1) = 2200ms

如果使用队列模式做流控,所有进入的请求都要排队,以固定的200ms的间隔执行,QPS会变的很平滑:

平滑的QPS曲线,对于服务器来说是更友好的。(特别是突然的高并发)

总结

流控效果有哪些?

  • 快速失败:QPS超过阈值时,拒绝新的请求

  • warm up: QPS超过阈值时,拒绝新的请求;QPS阈值是逐渐提升的,可以避免冷启动时高并发导致服务宕机。

  • 排队等待:请求会进入队列,按照阈值允许的时间间隔依次执行请求;如果请求预期等待时长大于超时时间,直接拒绝

热点参数限流

之前的限流是统计访问某个资源的所有请求,判断是否超过QPS阈值。而热点参数限流是分别统计参数值相同的请求,判断是否超过QPS阈值。

例如,一个根据id查询商品的接口:

访问/goods/{id}的请求中,id参数值会有变化,热点参数限流会根据参数值分别统计QPS,统计结果:当id=1的请求触发阈值被限流时,id值不为1的请求不受影响。

在实际开发中,可能部分商品是热点商品,例如秒杀商品,我们希望这部分商品的QPS限制与其它商品不一样,高一些。那就需要配置热点参数限流的高级选项了

可以对多个id分别做限流规则

注意事项:热点参数限流对默认的SpringMVC资源无效,需要利用@SentinelResource注解标记资源(对应的controller方法)

隔离和降级

限流是一种预防措施,虽然限流可以尽量避免因高并发而引起的服务故障,但服务还会因为其它原因而故障。

而要将这些故障控制在一定范围,避免雪崩,就要靠线程隔离(舱壁模式)和熔断降级手段了。

线程隔离之前讲到过:调用者在调用服务提供者时,给每个调用的请求分配独立线程池,出现故障时,最多消耗这个线程池内资源,避免把调用者的所有资源耗尽。

熔断降级:是在调用方这边加入断路器,统计对服务提供者的调用,如果调用的失败比例过高,则熔断该业务,不允许访问该服务的提供者了。

注意:这两种手段在sentinel中都可以对阻塞服务做降级处理

可以看到,不管是线程隔离还是熔断降级,都是对客户端(调用方)的保护。需要在调用方 发起远程调用时做线程隔离、或者服务熔断。

而我们的微服务远程调用都是基于Feign来完成的,因此我们需要将Feign与Sentinel整合,在Feign里面实现线程隔离和服务熔断。

FeignClient整合Sentinel(重要 隔离和降级都必须要做整合)

SpringCloud中,微服务调用都是通过Feign来实现的,因此做客户端保护必须整合Feign和Sentinel。

修改配置,开启sentinel功能

修改OrderService的application.yml文件,开启Feign的Sentinel功能:

1
2
3
feign:
sentinel:
enabled: true # 开启feign对sentinel的支持

编写失败降级逻辑

业务失败后,不能直接报错,而应该返回用户一个友好提示或者默认结果,这个就是失败降级逻辑。

给FeignClient编写失败后的降级逻辑

①方式一:FallbackClass,无法对远程调用的异常做处理

②方式二:FallbackFactory,可以对远程调用的异常做处理,我们选择这种

这里我们演示方式二的失败降级处理。

步骤一:在feing-api项目中定义类,实现FallbackFactory:

代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

@Slf4j
public class UserClientFallbackFactory implements FallbackFactory<UserClient> {
@Override
public UserClient create(Throwable throwable) {
//这里编写的就是降级处理返回请求
//一旦读取到降级处理就不走controller 而进入这里进行响应
return new UserClient() {
@Override
public User findById(Long id) {
log.error("查询用户异常", throwable);
return new User();
}
};
}
}

步骤二:在feign-api项目中的DefaultFeignConfiguration类中将UserClientFallbackFactory注册为一个Bean:(在feign的配置类中)

1
2
3
4
@Bean
public UserClientFallbackFactory userClientFallbackFactory(){
return new UserClientFallbackFactory();
}

步骤三:在feing-api项目中的UserClient接口中使用UserClientFallbackFactory:

1
2
3
4
5
6
7

@FeignClient(value = "userservice", fallbackFactory = UserClientFallbackFactory.class)
public interface UserClient {

@GetMapping("/user/{id}")
User findById(@PathVariable("id") Long id);
}

总结
Feign整合Sentinel的步骤:

  • 在application.yml中配置:feign.sentienl.enable=true
  • 给FeignClient编写FallbackFactory并注册为Bean
  • 将FallbackFactory配置到FeignClient

线程隔离(舱壁模式)

线程隔离的实现方式

线程隔离有两种方式实现:

  • 线程池隔离

  • 信号量隔离(Sentinel默认采用)

线程隔离两种方式.png

线程池隔离:给每个服务调用业务分配一个线程池,利用线程池本身实现隔离效果

信号量隔离:不创建线程池,而是计数器模式,记录业务使用的线程数量,达到信号量上限时,禁止新的请求。

线程隔离两种方式优缺点对比.png

sentinel的线程隔离

用法说明

在sentinel控制台限流处理中:

  • QPS:就是每秒的请求数,在快速入门中已经演示过

  • 线程数:是该资源能使用用的tomcat线程数的最大值。也就是通过限制线程数量,实现线程隔离(舱壁模式)。

熔断降级

熔断降级是解决雪崩问题的重要手段。其思路是由断路器统计服务调用的异常比例、慢请求比例,如果超出阈值则会熔断该服务。即拦截访问该服务的一切请求;而当服务恢复时,断路器会放行访问该服务的请求。

断路器控制熔断和放行是通过状态机来完成的:

熔断降级状态机.png

状态机包括三个状态:

  • closed:关闭状态,断路器放行所有请求,并开始统计异常比例、慢请求比例。超过阈值则切换到open状态
  • open:打开状态,服务调用被熔断,访问被熔断服务的请求会被拒绝,快速失败,直接走降级逻辑。Open状态5秒后会进入half-open状态
  • half-open:半开状态,放行一次请求,根据执行结果来判断接下来的操作。
    • 请求成功:则切换到closed状态
    • 请求失败:则切换到open状态

断路器熔断策略有三种:慢调用、异常比例、异常数

慢调用

慢调用:业务的响应时长(RT)大于指定时长的请求认定为慢调用请求。在指定时间内,如果请求数量超过设定的最小数量,慢调用比例大于设定的阈值,则触发熔断。

异常比例、异常数

异常比例或异常数:统计指定时间内的调用,如果调用次数超过指定请求数,并且出现异常的比例达到设定的比例阈值(或超过指定异常数),则触发熔断。

授权规则

授权规则可以对请求方来源做判断和控制。

基本规则

授权规则可以对调用方的来源做控制,有白名单和黑名单两种方式。

  • 白名单:来源(origin)在白名单内的调用者允许访问

  • 黑名单:来源(origin)在黑名单内的调用者不允许访问

  • 来源(origin)一般放在请求头里面进行传输

  • 资源名:就是受保护的资源,例如/order/{orderId}

  • 流控应用:是来源者的名单,

    • 如果是勾选白名单,则名单中的来源被许可访问。
    • 如果是勾选黑名单,则名单中的来源被禁止访问。

我们允许请求从gateway到order-service,不允许浏览器访问order-service,那么白名单中就要填写网关的来源名称(origin)

如何获取origin

Sentinel是通过RequestOriginParser这个接口的parseOrigin来获取请求的来源的。

1
2
3
4
5
6
public interface RequestOriginParser {
/**
* 从请求request对象中获取origin,获取方式自定义
*/
String parseOrigin(HttpServletRequest request);
}

这个方法的作用就是从request对象中,获取请求者的origin值并返回。
默认情况下,sentinel不管请求者从哪里来,返回值永远是default,也就是说一切请求的来源都被认为是一样的值default。
因此,我们需要自定义这个接口的实现,让不同的请求,返回不同的origin

例如order-service服务中,我们定义一个RequestOriginParser的实现类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14

@Component
public class HeaderOriginParser implements RequestOriginParser {
@Override
public String parseOrigin(HttpServletRequest request) {
// 1.获取请求头
String origin = request.getHeader("origin");
// 2.非空判断
if (StringUtils.isEmpty(origin)) {
origin = "blank";
}
return origin;
}
}

我们会尝试从request-header中获取origin值。

给网关添加请求头

既然获取请求origin的方式是从reques-header中获取origin值,我们必须让所有从gateway路由到微服务的请求都带上origin头

这个需要利用之前学习的一个GatewayFilter来实现,AddRequestHeaderGatewayFilter。

修改gateway服务中的application.yml,添加一个defaultFilter:

1
2
3
4
5
6
7
spring:
cloud:
gateway:
default-filters:
- AddRequestHeader=origin,gateway
routes:
# ...略

这样,从gateway路由的所有请求都会带上origin头,值为gateway。而从其它地方到达微服务的请求则没有这个头。

自定义异常结果

默认情况下,发生限流、降级、授权拦截时,都会抛出异常到调用方。异常结果都是flow limmiting(限流)。这样不够友好,无法得知是限流还是降级还是授权拦截。

异常类型

而如果要自定义异常时的返回结果,需要实现BlockExceptionHandler接口:

1
2
3
4
5
6
public interface BlockExceptionHandler {
/**
* 处理请求被限流、降级、授权拦截时抛出的异常:BlockException
*/
void handle(HttpServletRequest request, HttpServletResponse response, BlockException e) throws Exception;
}

这个方法有三个参数:

  • HttpServletRequest request:request对象
  • HttpServletResponse response:response对象
  • BlockException e:被sentinel拦截时抛出的异常

这里的BlockException包含多个不同的子类:

异常 说明
FlowException 限流异常
ParamFlowException 热点参数限流的异常
DegradeException 降级异常
AuthorityException 授权规则异常
SystemBlockException 系统规则异常

自定义异常处理

下面,我们就在order-service定义一个自定义异常处理类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

@Component
public class SentinelExceptionHandler implements BlockExceptionHandler {
@Override
public void handle(HttpServletRequest request, HttpServletResponse response, BlockException e) throws Exception {
String msg = "未知异常";
int status = 429;

if (e instanceof FlowException) {
msg = "请求被限流了";
} else if (e instanceof ParamFlowException) {
msg = "请求被热点参数限流";
} else if (e instanceof DegradeException) {
msg = "请求被降级了";
} else if (e instanceof AuthorityException) {
msg = "没有权限访问";
status = 401;
}

response.setContentType("application/json;charset=utf-8");
response.setStatus(status);
response.getWriter().println("{\"msg\": " + msg + ", \"status\": " + status + "}");
}
}

注意!!!:
此实现方式似乎只对路径形式的请求有作用比如限流和授权规则
热点参数 和 降级的阻塞并不能会被这个方法处理
降级不能被阻塞个人理解是因为异常并不能跨服务抛出,默认sentinel检测降级异常是在feign-userservice客户端检测的 所以在orderservice的异常处理肯定是识别不到的(不过这一点到无伤大雅,因为我们是做了降级处理,如果涉及到其他异常结果处理都可以在那个地方进行修改)
但是对于热点参数的异常无法识别可能是因为热点参数采取的是@SentinelResource(value = “hot”),这种注册名称hot的方式 换了如下的解决方案就可以结果热点参数异常返回自定义处理结果的需求了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//注意:!!!
//1两个方法必须在用一个类中
//2在@SentinelResource(value = "hot",blockHandler = "testSentinelException")指定异常处理方法
//3异常处理方法的参数列表必须保持原方法参数列表并在末尾多一个BlockException
//4异常处理方法的返回类型必须与原方法返回类型一致
@SentinelResource(value = "hot",blockHandler = "testSentinelException")
@GetMapping("{orderId}")
public Order queryOrderByUserId(@PathVariable("orderId") Long orderId) {
// 根据id查询订单并返回
return orderService.queryOrderById(orderId);
}

public Order testSentinelException(Long orderId,BlockException e){
Order order = new Order();
order.setName("热点参数限流");
System.err.println(e);
return order;
}

规则持久化

现在,sentinel的所有规则都是内存存储,重启后所有规则都会丢失。在生产环境下,我们必须确保这些规则的持久化,避免丢失。

规则管理模式

规则是否能持久化,取决于规则管理模式,sentinel支持三种规则管理模式:

  • 原始模式:Sentinel的默认模式,将规则保存在内存,重启服务会丢失。
  • pull模式
  • push模式(目前主要采用这种 可以做到实时更新)

pull模式

pull模式:控制台将配置的规则推送到Sentinel客户端,而客户端会将配置规则保存在本地文件或数据库中。以后会定时去本地文件或数据库中查询,更新本地规则。

push模式

push模式:控制台将配置规则推送到远程配置中心,例如Nacos。Sentinel客户端监听Nacos,获取配置变更的推送消息,完成本地配置更新。

实现push模式

修改OrderService,让其监听Nacos中的sentinel规则配置。

具体步骤如下:

1.引入依赖

在order-service中引入sentinel监听nacos的依赖:

1
2
3
4
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-datasource-nacos</artifactId>
</dependency>

2.配置nacos地址

在order-service中的application.yml文件配置nacos地址及监听的配置信息:

1
2
3
4
5
6
7
8
9
10
spring:
cloud:
sentinel:
datasource:
flow:
nacos:
server-addr: localhost:8848 # nacos地址
dataId: orderservice-flow-rules
groupId: SENTINEL_GROUP
rule-type: flow # 还可以是:degrade、authority、param-flow

3.修改sentinel启动文件

修改sentinel启动文件较为复杂 在企业中一般都采取购买现成的云框架
不过也有一般的实现方式 这里就不赘述 在博客中应该会上传有关的方法

分布式事务

分布式事务问题

本地事务

本地事务,也就是传统的单机事务。在传统数据库事务中,必须要满足四个原则:ACID

ACID.png

分布式事务

分布式事务,就是指不是在单个服务或单个数据库架构下,产生的事务,例如:

  • 跨数据源的分布式事务
  • 跨服务的分布式事务
  • 综合情况

在数据库水平拆分、服务垂直拆分之后,一个业务操作通常要跨多个数据库、服务才能完成。例如电商行业中比较常见的下单付款案例,包括下面几个行为:

  • 创建新订单
  • 扣减商品库存
  • 从用户账户余额扣除金额

完成上面的操作需要访问三个不同的微服务和三个不同的数据库。

订单的创建、库存的扣减、账户扣款在每一个服务和数据库内是一个本地事务,可以保证ACID原则。

但是当我们把三件事情看做一个”业务”,要满足保证“业务”的原子性,要么所有操作全部成功,要么全部失败,不允许出现部分成功部分失败的现象,这就是分布式系统下的事务了。

此时ACID难以满足,这是分布式事务要解决的问题

理论基础

解决分布式事务问题,需要一些分布式系统的基础知识作为理论指导。

CAP定理

1998年,加州大学的计算机科学家 Eric Brewer 提出,分布式系统有三个指标。

  • Consistency(一致性)
  • Availability(可用性)
  • Partition tolerance (分区容错性)

这三个指标不可能同时做到。这个结论就叫做 CAP 定理。

一致性

Consistency(一致性):用户访问分布式系统中的任意节点,得到的数据必须一致。

比如现在包含两个节点,其中的初始数据是一致的

当我们修改其中一个节点的数据时,两者的数据产生了差异

要想保住一致性,就必须实现node01 到 node02的数据 同步

可用性

Availability (可用性):用户访问集群中的任意健康节点,必须能得到响应,而不是超时或拒绝。

有三个节点的集群,访问任何一个都可以及时得到响应:

当有部分节点因为网络故障或其它原因无法访问时,代表节点不可用:

分区容错

Partition(分区):因为网络故障或其它原因导致分布式系统中的部分节点与其它节点失去连接,形成独立分区。

Tolerance(容错):在集群出现分区时,整个系统也要持续对外提供服务

矛盾

在分布式系统中,系统间的网络不能100%保证健康,一定会有故障的时候,而服务有必须对外保证服务。因此Partition Tolerance不可避免。

当节点接收到新的数据变更时,就会出现问题了

如果此时要保证一致性,就必须等待网络恢复,完成数据同步后,整个集群才对外提供服务,服务处于阻塞状态,不可用。

如果此时要保证可用性,就不能等待网络恢复,那node01、node02与node03之间就会出现数据不一致。

也就是说,在P一定会出现的情况下,A和C之间只能实现一个。

BASE理论

BASE理论是对CAP的一种解决思路,包含三个思想:

  • Basically Available (基本可用):分布式系统在出现故障时,允许损失部分可用性,即保证核心可用。
  • Soft State(软状态):在一定时间内,允许出现中间状态,比如临时的不一致状态。
  • Eventually Consistent(最终一致性):虽然无法保证强一致性,但是在软状态结束后,最终达到数据一致。

解决分布式事务的思路

分布式事务最大的问题是各个子事务的一致性问题,因此可以借鉴CAP定理和BASE理论,有两种解决思路:

  • AP模式:各子事务分别执行和提交,允许出现结果不一致,然后采用弥补措施恢复数据即可,实现最终一致。

  • CP模式:各个子事务执行后互相等待,同时提交,同时回滚,达成强一致。但事务等待过程中,处于弱可用状态。

但不管是哪一种模式,都需要在子系统事务之间互相通讯,协调事务状态,也就是需要一个**事务协调者(TC)**:

这里的子系统事务,称为分支事务;有关联的各个分支事务在一起称为全局事务

初识Seata

官网地址:http://seata.io/,其中的文档、播客中提供了大量的使用说明、源码分析。

Seata的架构

Seata事务管理中有三个重要的角色:

  • TC (Transaction Coordinator) - 事务协调者:维护全局和分支事务的状态,协调全局事务提交或回滚。

  • TM (Transaction Manager) - 事务管理器:定义全局事务的范围、开始全局事务、提交或回滚全局事务。

  • RM (Resource Manager) - 资源管理器:管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。

整体的架构如图:

seata架构.png

Seata基于上述架构提供了四种不同的分布式事务解决方案:

  • XA模式:强一致性分阶段事务模式,牺牲了一定的可用性,无业务侵入
  • TCC模式:最终一致的分阶段事务模式,有业务侵入
  • AT模式:最终一致的分阶段事务模式,无业务侵入,也是Seata的默认模式
  • SAGA模式:长事务模式,有业务侵入

无论哪种方案,都离不开TC,也就是事务的协调者。

部署TC服务

先去下载seata官网下载压缩包 解压

修改配置

修改conf目录下的registry.conf文件:

内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
registry {
# tc服务的注册中心类,这里选择nacos,也可以是eureka、zookeeper等
type = "nacos"

nacos {
# seata tc 服务注册到 nacos的服务名称,可以自定义
application = "seata-tc-server" #自定义服务名
serverAddr = "127.0.0.1:8848" #nacos服务地址
group = "DEFAULT_GROUP" #分组位置
namespace = ""
cluster = "SH" #集群
username = "nacos" #nacos账号密码
password = "nacos"
}
}

config {
# 读取tc服务端的配置文件的方式,这里是从nacos配置中心读取,这样如果tc是集群,可以共享配置
type = "nacos"
# 配置nacos地址等信息
nacos {
serverAddr = "127.0.0.1:8848"
namespace = ""
group = "SEATA_GROUP"
username = "nacos"
password = "nacos"
dataId = "seataServer.properties"
}
}

在nacos添加配置

特别注意,为了让tc服务的集群可以共享配置,我们选择了nacos作为统一配置中心。因此服务端配置文件seataServer.properties文件需要在nacos中配好。

配置内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# 数据存储方式,db代表数据库
store.mode=db
store.db.datasource=druid
store.db.dbType=mysql
store.db.driverClassName=com.mysql.jdbc.Driver
store.db.url=jdbc:mysql://127.0.0.1:3306/seata?useUnicode=true&rewriteBatchedStatements=true
store.db.user=root
store.db.password=123
store.db.minConn=5
store.db.maxConn=30
store.db.globalTable=global_table
store.db.branchTable=branch_table
store.db.queryLimit=100
store.db.lockTable=lock_table
store.db.maxWait=5000
# 事务、日志等配置
server.recovery.committingRetryPeriod=1000
server.recovery.asynCommittingRetryPeriod=1000
server.recovery.rollbackingRetryPeriod=1000
server.recovery.timeoutRetryPeriod=1000
server.maxCommitRetryTimeout=-1
server.maxRollbackRetryTimeout=-1
server.rollbackRetryTimeoutUnlockEnable=false
server.undo.logSaveDays=7
server.undo.logDeletePeriod=86400000

# 客户端与服务端传输方式
transport.serialization=seata
transport.compressor=none
# 关闭metrics功能,提高性能
metrics.enabled=false
metrics.registryType=compact
metrics.exporterList=prometheus
metrics.exporterPrometheusPort=9898

==其中的数据库地址、用户名、密码都需要修改成你自己的数据库信息。==

创建数据库表

特别注意:tc服务在管理分布式事务时,需要记录事务相关数据到数据库中,你需要提前创建好这些表。

这些表主要记录全局事务、分支事务、全局锁信息:(基本表结构)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- 分支事务表
-- ----------------------------
DROP TABLE IF EXISTS `branch_table`;
CREATE TABLE `branch_table` (
`branch_id` bigint(20) NOT NULL,
`xid` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
`transaction_id` bigint(20) NULL DEFAULT NULL,
`resource_group_id` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`resource_id` varchar(256) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`branch_type` varchar(8) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`status` tinyint(4) NULL DEFAULT NULL,
`client_id` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`application_data` varchar(2000) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`gmt_create` datetime(6) NULL DEFAULT NULL,
`gmt_modified` datetime(6) NULL DEFAULT NULL,
PRIMARY KEY (`branch_id`) USING BTREE,
INDEX `idx_xid`(`xid`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact;

-- ----------------------------
-- 全局事务表
-- ----------------------------
DROP TABLE IF EXISTS `global_table`;
CREATE TABLE `global_table` (
`xid` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
`transaction_id` bigint(20) NULL DEFAULT NULL,
`status` tinyint(4) NOT NULL,
`application_id` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`transaction_service_group` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`transaction_name` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`timeout` int(11) NULL DEFAULT NULL,
`begin_time` bigint(20) NULL DEFAULT NULL,
`application_data` varchar(2000) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`gmt_create` datetime NULL DEFAULT NULL,
`gmt_modified` datetime NULL DEFAULT NULL,
PRIMARY KEY (`xid`) USING BTREE,
INDEX `idx_gmt_modified_status`(`gmt_modified`, `status`) USING BTREE,
INDEX `idx_transaction_id`(`transaction_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact;

SET FOREIGN_KEY_CHECKS = 1;

启动TC服务

进入bin目录,运行其中的seata-server.bat

微服务集成Seata

引入依赖

首先,在需要注册给RM管理的微服务中(分支事务)中引入依赖:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
<!--seata-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
<exclusions>
<!--版本较低,1.3.0,因此排除-->
<exclusion>
<artifactId>seata-spring-boot-starter</artifactId>
<groupId>io.seata</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
<!--seata starter 采用1.4.2版本-->
<version>${seata.version}</version>
</dependency>

配置TC地址

在分支事务中的application.yml中,配置TC服务信息,通过注册中心nacos,结合服务名称获取TC地址:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
seata:
registry: # TC服务注册中心的配置,微服务根据这些信息去注册中心获取tc服务地址
type: nacos # 注册中心类型 nacos
nacos:
server-addr: 127.0.0.1:8848 # nacos地址
namespace: "" # namespace,默认为空
group: DEFAULT_GROUP # 分组,默认是DEFAULT_GROUP
application: seata-tc-server # seata服务名称
username: nacos
password: nacos
tx-service-group: seata-demo # 事务组名称
service:
vgroup-mapping: # 事务组与cluster的映射关系
seata-demo: SH

微服务如何根据这些配置寻找TC的地址呢?

我们知道注册到Nacos中的微服务,确定一个具体实例需要四个信息:

  • namespace:命名空间
  • group:分组
  • application:服务名
  • cluster:集群名

以上四个信息,在刚才的yaml文件中都能找到:

seata微服务注册.png

namespace为空,就是默认的public

结合起来,TC服务的信息就是:public@DEFAULT_GROUP@seata-tc-server@SH,这样就能确定TC服务集群了。然后就可以去Nacos拉取对应的实例信息了。

seata四种事务模式

XA模式

XA 规范 是 X/Open 组织定义的分布式事务处理(DTP,Distributed Transaction Processing)标准,XA 规范 描述了全局的TM与局部的RM之间的接口,几乎所有主流的数据库都对 XA 规范 提供了支持。

两阶段提交

XA是规范,目前主流数据库都实现了这种规范,实现的原理都是基于两阶段提交。

一阶段:

  • 事务协调者通知每个事物参与者执行本地事务
  • 本地事务执行完成后报告事务执行状态给事务协调者,此时事务不提交,继续持有数据库锁

二阶段:

  • 事务协调者基于一阶段的报告来判断下一步操作
    • 如果一阶段都成功,则通知所有事务参与者,提交事务
    • 如果一阶段任意一个参与者失败,则通知所有事务参与者回滚事务

Seata的XA模型

Seata对原始的XA模式做了简单的封装和改造,以适应自己的事务模型,基本架构如图:

XA模型.png

TM一阶段的工作:

需要找到整个事务入口使用注解@GlobalTransactional开启全局事务

RM一阶段的工作:

① 注册分支事务到TC

② 执行分支业务sql但不提交

③ 报告执行状态到TC

TC二阶段的工作:

  • TC检测各分支事务执行状态

    a.如果都成功,通知所有RM提交事务

    b.如果有失败,通知所有RM回滚事务

RM二阶段的工作:

  • 接收TC指令,提交或回滚事务

优缺点

XA模式的优点是什么?

  • 事务的强一致性,满足ACID原则。
  • 常用数据库都支持,实现简单,并且没有代码侵入

XA模式的缺点是什么?

  • 因为一阶段需要锁定数据库资源,等待二阶段结束才释放,性能较差
  • 依赖关系型数据库实现事务

实现XA模式

Seata的starter已经完成了XA模式的自动装配,实现非常简单,步骤如下:

1)修改application.yml文件(每个参与事务的微服务),开启XA模式:

1
2
seata:
data-source-proxy-mode: XA

2)给发起全局事务的入口方法添加@GlobalTransactional注解:

本例中是OrderServiceImpl中的create方法.

AT模式

AT模式同样是分阶段提交的事务模型,不过缺弥补了XA模型中资源锁定周期过长的缺陷。

Seata的AT模型

基本流程图:

AT模型.png

TM一阶段的工作:

需要找到整个事务入口使用注解@GlobalTransactional开启全局事务

阶段一RM的工作:

  • 注册分支事务
  • 记录undo-log(数据快照)
  • 执行业务sql并提交
  • 报告事务状态

阶段二提交时RM的工作:

  • 删除undo-log即可

阶段二回滚时RM的工作:

  • 根据undo-log恢复数据到更新前

流程梳理

我们用一个真实的业务来梳理下AT模式的原理。

比如,现在又一个数据库表,记录用户余额:

id money
1 100

其中一个分支业务要执行的SQL为:

1
update tb_account set money = money - 10 where id = 1

AT模式下,当前分支事务执行流程如下:

一阶段:

1)TM发起并注册全局事务到TC

2)TM调用分支事务

3)分支事务准备执行业务SQL

4)RM拦截业务SQL,根据where条件查询原始数据,形成快照。

1
2
3
{
"id": 1, "money": 100
}

5)RM执行业务SQL,提交本地事务,释放数据库锁。此时 money = 90

6)RM报告本地事务状态给TC

二阶段:

1)TM通知TC事务结束

2)TC检查分支事务状态

a)如果都成功,则立即删除快照

b)如果有分支事务失败,需要回滚。读取快照数据({"id": 1, "money": 100}),将快照恢复到数据库。此时数据库再次恢复为100

流程图:

AT模型过程.png

AT与XA的区别

简述AT模式与XA模式最大的区别是什么?

  • XA模式一阶段不提交事务,锁定资源;AT模式一阶段直接提交,不锁定资源。
  • XA模式依赖数据库机制实现回滚;AT模式利用数据快照实现数据回滚。
  • XA模式强一致;AT模式最终一致

脏写问题

在多线程并发访问AT模式的分布式事务时,有可能出现脏写问题,如图:

AT脏写问题.png

解决思路就是引入了全局锁的概念。在释放DB锁之前,先拿到全局锁。避免同一时刻有另外一个事务来操作当前数据。

AT全局锁解决脏写问题.png

seata事务管理与非seata事务管理脏写问题

ATseata与非seata事务隔离.png

需要注意的是:
这种情况并不多见,因为一般在代码基本架构设计逻辑中,会尽量避免出现同一个表并发性业务出现seata和非seata事务同时操作的情况
但是尽管很少见 AT模式依然借用先后快照给出了处理方法(虽然处理方法并不友好)

优缺点

AT模式的优点:

  • 一阶段完成直接提交事务,释放数据库资源,性能比较好
  • 利用全局锁实现读写隔离
  • 没有代码侵入,框架自动完成回滚和提交

AT模式的缺点:

  • 两阶段之间属于软状态,属于最终一致
  • 框架的快照功能会影响性能,但比XA模式要好很多

实现AT模式

AT模式中的快照生成、回滚等动作都是由框架自动完成,没有任何代码侵入,因此实现非常简单。

只不过,AT模式需要一个表来记录全局锁、另一张表来记录数据快照undo_log。

1)导入数据库表,记录全局锁

导入Sql文件:seata-at.sql,其中lock_table导入到TC服务关联的数据库,undo_log表导入到微服务关联的数据库:

2)修改application.yml文件,将事务模式修改为AT模式即可:

1
2
seata:
data-source-proxy-mode: AT # 默认就是AT

TCC模式

TCC模式与AT模式非常相似,每阶段都是独立事务,不同的是TCC通过人工编码来实现数据恢复。需要实现三个方法:

  • Try:资源的检测和预留; (其实try中执行的就是无事务模式下的原有逻辑 confirm和cancel是为了保持最终一致性新增的代码)

  • Confirm:完成资源操作业务;要求 Try 成功 Confirm 一定要能成功。

  • Cancel:预留资源释放,可以理解为try的反向操作。

流程分析

举例,一个扣减用户余额的业务。假设账户A原来余额是100,需要余额扣减30元。

  • 阶段一( Try ):检查余额是否充足,如果充足则冻结金额增加30元,可用余额扣除30

此时,总金额 = 冻结金额 + 可用金额,数量依然是100不变。事务直接提交无需等待其它事务。

  • **阶段二(Confirm)**:假如要提交(Confirm),则冻结金额扣减30

确认可以提交,不过之前可用金额已经扣减过了,这里只要清除冻结金额就好了:

此时,总金额 = 冻结金额 + 可用金额 = 0 + 70 = 70元

  • **阶段二(Canncel)**:如果要回滚(Cancel),则冻结金额扣减30,可用余额增加30

需要回滚,那么就要释放冻结金额,恢复可用金额:

Seata的TCC模型

Seata中的TCC模型依然延续之前的事务架构,如图:

TCC模型.png

优缺点

TCC模式的每个阶段是做什么的?

  • Try:资源检查和预留
  • Confirm:业务执行和提交
  • Cancel:预留资源的释放

TCC的优点是什么?

  • 一阶段完成直接提交事务,释放数据库资源,性能好
  • 相比AT模型,无需生成快照,无需使用全局锁,性能最强
  • 不依赖数据库事务,而是依赖补偿操作,可以用于非事务型数据库

TCC的缺点是什么?

  • 有代码侵入,需要人为编写try、Confirm和Cancel接口,太麻烦
  • 软状态,事务是最终一致
  • 需要考虑Confirm和Cancel的失败情况,做好幂等处理

事务悬挂和空回滚和幂等处理

1)空回滚

当某分支事务的try阶段阻塞时,可能导致全局事务超时而触发二阶段的cancel操作。在未执行try操作时先执行了cancel操作,这时cancel不能做回滚,就是空回滚

如图:

TCC空回滚.png

执行cancel操作时,应当判断try是否已经执行,如果尚未执行,则应该空回滚。

2)业务悬挂

对于已经空回滚的业务,之前被阻塞的try操作恢复,继续执行try,就永远不可能confirm或cancel ,事务一直处于中间状态,这就是业务悬挂

执行try操作时,应当判断cancel是否已经执行过了,如果已经执行,应当阻止空回滚后的try操作,避免悬挂

3)幂等处理

在业务中,可能因为网络延迟等问题,出现重复要求confirm和try和cancel,我们需要保证多次执行这些操作的结果是一致的,换句话说,当执行这些操作之前,判断一下之前是否执行过相同操作

在try中因为我们要做防业务悬挂处理,这里面其实包含了幂等处理(会查找冻结表中是否有此事务) 不用再幂等处理

在confirm操作中,自维持了幂等性,(根据xid删除冻结表中的事务记录) 也不用幂等处理

在cancel操作中,需要做幂等处理,所以cancel中需要做两个判断 空回滚和幂等处理

实现TCC模式

解决空回滚和业务悬挂问题,必须要记录当前事务状态,是在try、还是cancel?

1)思路分析

这里我们定义一张表:(对应于原来的account表 表示冻结的资金)

1
2
3
4
5
6
7
8
CREATE TABLE `account_freeze_tbl` (
  `xid` varchar(128NOT NULL,
  `user_id` varchar(255DEFAULT NULL COMMENT '用户id',
  `freeze_money` int(11) unsigned DEFAULT '0' COMMENT '冻结金额',
  `state` int(1DEFAULT NULL COMMENT '事务状态,0:try,1:confirm,2:cancel',
  PRIMARY KEY (`xid`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=COMPACT;

其中:

  • xid:是全局事务id
  • freeze_money:用来记录用户冻结金额
  • state:用来记录事务状态

那此时,我们的业务开怎么做呢?

  • Try业务:
    • 记录冻结金额和事务状态到account_freeze表
    • 扣减account表可用金额
  • Confirm业务
    • 根据xid删除account_freeze表的冻结记录
  • Cancel业务
    • 修改account_freeze表,冻结金额为0,state为2
    • 修改account表,恢复可用金额
  • 如何判断是否空回滚?
    • cancel业务中,根据xid查询account_freeze,如果为null则说明try还没做,需要空回滚
  • 如何避免业务悬挂?
    • try业务中,根据xid查询account_freeze ,如果已经存在则证明Cancel已经执行,拒绝执行try业务

接下来,我们改造account-service,利用TCC实现余额扣减功能。

2)声明TCC接口

TCC的Try、Confirm、Cancel方法都需要在接口中基于注解来声明,

我们在account-service项目中的cn.itcast.account.service包中新建一个接口,声明TCC三个接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14

//指定TCC事务
@LocalTCC
public interface AccountTCCService {
//try
@TwoPhaseBusinessAction(name = "deduct",commitMethod = "confirm",rollbackMethod = "cancel")
void deduct(@BusinessActionContextParameter(paramName = "userId") String userId,
@BusinessActionContextParameter(paramName = "nomey") int money); //@BusinessActionContextParameter指定携带参数 可在confirm和cancel的ctx中获取
//cofirm

boolean confirm(BusinessActionContext ctx);
//cancel
boolean cancel(BusinessActionContext ctx);
}

3)编写实现类

TCC实现思路.png

在account-service服务中的cn.itcast.account.service.impl包下新建一个类,实现TCC业务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76

@Slf4j
@Service
public class AccountTCCServiceImpl implements AccountTCCService {

@Autowired
private AccountMapper accountMapper;

@Autowired
private AccountFreezeMapper accountFreezeMapper;

@Override
public void deduct(String userId, int money) {

//0 获取事务id
String xid = RootContext.getXID();
//悬挂判断
//判断表中是否有冻结记录,如果有,则已经执行过cancel了,不能执行try了
AccountFreeze accountFreeze1 = accountFreezeMapper.selectById(xid);
if(accountFreeze1 != null){
return;
}
//1 扣减可用余额(数据库字段定义了非负 所以不用判断了)
accountMapper.deduct(userId,money);
//记录冻结金额,事务状态
AccountFreeze accountFreeze = new AccountFreeze();
accountFreeze.setXid(xid);
accountFreeze.setFreezeMoney(money);
accountFreeze.setState(AccountFreeze.State.TRY);
accountFreeze.setUserId(userId);
accountFreezeMapper.insert(accountFreeze);
}

@Override
public boolean confirm(BusinessActionContext ctx) {
//获取事务id
String xid = ctx.getXid();
//根据事务id删除事务
int count = accountFreezeMapper.deleteById(xid);
//只删除了一条记录表示删除成功
return count == 1;
}

@Override
public boolean cancel(BusinessActionContext ctx) {
//查询冻结记录
String xid = ctx.getXid();
AccountFreeze accountFreeze = accountFreezeMapper.selectById(xid);
//获取用户id
String userId = ctx.getActionContext("userId").toString();
//空回滚的判断
if(accountFreeze == null){
//证明try没有执行 直接进入第二阶段空回滚
accountFreeze = new AccountFreeze();
accountFreeze.setXid(xid);
accountFreeze.setUserId(userId);
accountFreeze.setFreezeMoney(0);
accountFreeze.setState(AccountFreeze.State.CANCEL);
accountFreezeMapper.insert(accountFreeze);
return true;
}
//幂等判断
if(accountFreeze.getState() == AccountFreeze.State.CANCEL){
//表中已经处理过一次cancel了
return true;
}
//恢复删除金额
accountMapper.refund(accountFreeze.getUserId(),accountFreeze.getFreezeMoney());
//将冻结金额清0 状态更改
accountFreeze.setState(AccountFreeze.State.CANCEL);
accountFreeze.setFreezeMoney(0);
int count = accountFreezeMapper.updateById(accountFreeze);
return count == 1;
}
}

TCC模式适用场景

使用场景:高并发,要求效率必须高,操作的数据库字段应该是降值而不是升值(很重要)
不适用场景:操作的数据库字段如为新增一条记录,或者数量上升无法使用,可改用AT(注意:不同微服务可以采用不同seata模式 可以共存的)

为什么只能降值?
一业务增值 一业务降值这种模式极端情况会出现问题
比如:
一业务:
基础100元
充值30元
总额:130元 –》冻结30元
二业务:(在一业务confirm之前)
扣款130元
总额0元–》confirm 冻结0元
一业务回滚试图扣30元但是此时账户中已经没钱了 扣款失败–》联系人工(cancel)

SAGA模式

原理

在 Saga 模式下,分布式事务内有多个参与者,每一个参与者都是一个冲正补偿服务,需要用户根据业务场景实现其正向操作和逆向回滚操作。

分布式事务执行过程中,依次执行各参与者的正向操作,如果所有正向操作均执行成功,那么分布式事务提交。如果任何一个正向操作执行失败,那么分布式事务会去退回去执行前面各参与者的逆向回滚操作,回滚已提交的参与者,使分布式事务回到初始状态。

SAGA模式.png

Saga也分为两个阶段:

  • 一阶段:直接提交本地事务
  • 二阶段:成功则什么都不做;失败则通过编写补偿业务来回滚

优缺点

优点:

  • 事务参与者可以基于事件驱动实现异步调用,吞吐高
  • 一阶段直接提交事务,无锁,性能好
  • 不用编写TCC中的三个阶段,实现简单

缺点:

  • 软状态持续时间不确定,时效性差
  • 没有锁,没有事务隔离,会有脏写

四种模式对比

值得注意的是 不同模式基于seata可以共存,也就是不同微服务可以采取不同模式

我们从以下几个方面来对比四种实现:

  • 一致性:能否保证事务的一致性?强一致还是最终一致?
  • 隔离性:事务之间的隔离性如何?
  • 代码侵入:是否需要对业务代码改造?
  • 性能:有无性能损耗?
  • 场景:常见的业务场景

如图:

四种seata模式对比.png

高可用

Seata的TC服务作为分布式事务核心,一定要保证集群的高可用性。

高可用架构模型

搭建TC服务集群非常简单,启动多个TC服务,注册到nacos即可。

但集群并不能确保100%安全,万一集群所在机房故障怎么办?所以如果要求较高,一般都会做异地多机房容灾。

微服务基于事务组(tx-service-group)与TC集群的映射关系,来查找当前应该使用哪个TC集群。当SH集群故障时,只需要将vgroup-mapping中的映射关系改成HZ。则所有微服务就会切换到HZ的TC集群了。

实现高可用

模拟异地容灾的TC集群

计划启动两台seata的tc服务节点:

节点名称 ip地址 端口号 集群名称
seata 127.0.0.1 8091 SH
seata2 127.0.0.1 8092 HZ

之前我们已经启动了一台seata服务,端口是8091,集群名为SH。

现在,将seata目录复制一份,起名为seata2

修改seata2/conf/registry.conf内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
registry {
# tc服务的注册中心类,这里选择nacos,也可以是eureka、zookeeper等
type = "nacos"

nacos {
# seata tc 服务注册到 nacos的服务名称,可以自定义
application = "seata-tc-server"
serverAddr = "127.0.0.1:8848"
group = "DEFAULT_GROUP"
namespace = ""
cluster = "HZ"
username = "nacos"
password = "nacos"
}
}

config {
# 读取tc服务端的配置文件的方式,这里是从nacos配置中心读取,这样如果tc是集群,可以共享配置
type = "nacos"
# 配置nacos地址等信息
nacos {
serverAddr = "127.0.0.1:8848"
namespace = ""
group = "SEATA_GROUP"
username = "nacos"
password = "nacos"
dataId = "seataServer.properties"
}
}

进入seata2/bin目录,然后运行命令:

1
seata-server.bat -p 8092

将事务组映射配置到nacos

接下来,我们需要将tx-service-group与cluster的映射关系都配置到nacos配置中心。

新建一个配置,注意groupid和名字应该与后面的微服务配置文件中的一致

配置的内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# 事务组映射关系
service.vgroupMapping.seata-demo=SH

service.enableDegrade=false
service.disableGlobalTransaction=false
# 与TC服务的通信配置
transport.type=TCP
transport.server=NIO
transport.heartbeat=true
transport.enableClientBatchSendRequest=false
transport.threadFactory.bossThreadPrefix=NettyBoss
transport.threadFactory.workerThreadPrefix=NettyServerNIOWorker
transport.threadFactory.serverExecutorThreadPrefix=NettyServerBizHandler
transport.threadFactory.shareBossWorker=false
transport.threadFactory.clientSelectorThreadPrefix=NettyClientSelector
transport.threadFactory.clientSelectorThreadSize=1
transport.threadFactory.clientWorkerThreadPrefix=NettyClientWorkerThread
transport.threadFactory.bossThreadSize=1
transport.threadFactory.workerThreadSize=default
transport.shutdown.wait=3
# RM配置
client.rm.asyncCommitBufferLimit=10000
client.rm.lock.retryInterval=10
client.rm.lock.retryTimes=30
client.rm.lock.retryPolicyBranchRollbackOnConflict=true
client.rm.reportRetryCount=5
client.rm.tableMetaCheckEnable=false
client.rm.tableMetaCheckerInterval=60000
client.rm.sqlParserType=druid
client.rm.reportSuccessEnable=false
client.rm.sagaBranchRegisterEnable=false
# TM配置
client.tm.commitRetryCount=5
client.tm.rollbackRetryCount=5
client.tm.defaultGlobalTransactionTimeout=60000
client.tm.degradeCheck=false
client.tm.degradeCheckAllowTimes=10
client.tm.degradeCheckPeriod=2000

# undo日志配置
client.undo.dataValidation=true
client.undo.logSerialization=jackson
client.undo.onlyCareUpdateColumns=true
client.undo.logTable=undo_log
client.undo.compress.enable=true
client.undo.compress.type=zip
client.undo.compress.threshold=64k
client.log.exceptionRate=100

微服务读取nacos配置

接下来,需要修改每一个微服务的application.yml文件,让微服务读取nacos中的client.properties文件:

1
2
3
4
5
6
7
8
9
seata:
config:
type: nacos
nacos:
server-addr: 127.0.0.1:8848
username: nacos
password: nacos
group: SEATA_GROUP
data-id: client.properties #配置文件名字

重启微服务,现在微服务到底是连接tc的SH集群,还是tc的HZ集群,都统一由nacos的client.properties来决定了。(热更新)

分布式缓存

– 基于Redis集群解决单机Redis存在的问题

单机的Redis存在四大问题:

单机redis四大问题.png

Redis持久化

Redis有两种持久化方案:

  • RDB持久化
  • AOF持久化

RDB持久化

RDB全称Redis Database Backup file(Redis数据备份文件),也被叫做Redis数据快照。简单来说就是把内存中的所有数据都记录到磁盘中。当Redis实例故障重启后,从磁盘读取快照文件,恢复数据。快照文件称为RDB文件,默认是保存在当前运行目录。

执行时机

RDB持久化在四种情况下会执行:

  • 执行save命令
  • 执行bgsave命令
  • Redis停机时
  • 触发RDB条件时

1)save命令

save命令会导致主进程执行RDB,这个过程中其它所有命令都会被阻塞。只有在数据迁移时可能用到。

2)bgsave命令

这个命令执行后会开启独立进程(子进程)完成RDB,主进程可以持续处理用户请求,不受影响。

3)停机时

Redis停机时会执行一次save命令,实现RDB持久化。

4)触发RDB条件

Redis内部有触发RDB的机制,可以在redis.conf文件中找到,格式如下:

1
2
3
4
# 900秒内,如果至少有1个key被修改,则执行bgsave , 如果是save "" 则表示禁用RDB
save 900 1
save 300 10
save 60 10000

RDB的其它配置也可以在redis.conf文件中设置:

1
2
3
4
5
6
7
8
# 是否压缩 ,建议不开启,压缩也会消耗cpu,磁盘的话不值钱
rdbcompression yes

# RDB文件名称
dbfilename dump.rdb

# 文件保存的路径目录
dir ./

RDB原理

bgsave开始时会fork主进程得到子进程,子进程共享主进程的内存数据。完成fork后读取内存数据并写入 RDB 文件。(如果之前有RDB文件 会删除之前的生成新的)

bgsaveRDB原理.png

fork采用的是copy-on-write技术:

  • 当主进程执行读操作时,访问共享内存;
  • 当主进程执行写操作时,则会拷贝一份数据,执行写操作。

小结

RDB方式bgsave的基本流程?

  • fork主进程得到一个子进程,共享内存空间
  • 子进程读取内存数据并写入新的RDB文件
  • 用新RDB文件替换旧的RDB文件

RDB会在什么时候执行?save 60 1000代表什么含义?

  • 默认是服务停止时
  • 代表60秒内至少执行1000次修改则触发RDB

RDB的缺点?

  • RDB执行间隔时间长,两次RDB之间写入数据有丢失的风险
  • fork子进程、压缩、写出RDB文件都比较耗时

AOF持久化

AOF原理

AOF全称为Append Only File(追加文件)。Redis处理的每一个写命令都会记录在AOF文件,可以看做是命令日志文件。

AOF配置

AOF默认是关闭的,需要修改redis.conf配置文件来开启AOF:

1
2
3
4
# 是否开启AOF功能,默认是no
appendonly yes
# AOF文件的名称
appendfilename "appendonly.aof"

AOF的命令记录的频率也可以通过redis.conf文件来配:

1
2
3
4
5
6
# 表示每执行一次写命令,立即记录到AOF文件
appendfsync always
# 写命令执行完先放入AOF缓冲区,然后表示每隔1秒将缓冲区数据写到AOF文件,是默认方案
appendfsync everysec
# 写命令执行完先放入AOF缓冲区,由操作系统决定何时将缓冲区内容写回磁盘
appendfsync no

三种策略对比:

AOF三种执行策略比较.png

AOF文件重写

因为是记录命令,AOF文件会比RDB文件大的多。而且AOF会记录对同一个key的多次写操作,但只有最后一次写操作才有意义。通过执行BGREWRITEAOF命令,可以让AOF文件执行重写功能,用最少的命令达到相同效果。

Redis也会在触发阈值时自动去重写AOF文件。阈值也可以在redis.conf中配置:

1
2
3
4
# AOF文件比上次文件 增长超过多少百分比则触发重写
auto-aof-rewrite-percentage 100
# AOF文件体积最小多大以上才触发重写
auto-aof-rewrite-min-size 64mb

RDB与AOF对比

RDB和AOF各有自己的优缺点,如果对数据安全性要求较高,在实际开发中往往会结合两者来使用。

RDB与AOF对比.png

RDB的快照文件保存对象是redis当前数据,所以每次更新时会删除已有的上次文件,AOF的文件保存对象是redis所有的操作,所以每次更新时会在原appendonly.aof文件上增加

Redis主从

搭建主从架构

虚拟机shell可视化采取mobaXterm

单节点Redis的并发能力是有上限的,要进一步提高Redis的并发能力,就需要搭建主从集群,实现读写分离。

集群结构

Redis主从架构.png

三个节点,一个主节点,两个从节点。

这里我们会在同一台虚拟机中开启3个redis实例,模拟主从集群,信息如下:

IP PORT 角色
192.168.150.101 7001 master
192.168.150.101 7002 slave
192.168.150.101 7003 slave

准备实例和配置

要在同一台虚拟机开启3个实例,必须准备三份不同的配置文件和目录,配置文件所在目录也就是工作目录。

1)创建目录

我们创建三个文件夹,名字分别叫7001、7002、7003:

1
2
3
4
# 进入/tmp目录
cd /tmp
# 创建目录
mkdir 7001 7002 7003

2)恢复原始配置

修改redis-6.2.4/redis.conf文件,将其中的持久化模式改为默认的RDB模式,AOF保持关闭状态。

1
2
3
4
5
6
7
8
# 开启RDB
# save ""
save 3600 1
save 300 100
save 60 10000

# 关闭AOF
appendonly no

3)拷贝配置文件到每个实例目录

然后将redis-6.2.4/redis.conf文件拷贝到三个目录中(在/tmp目录执行下列命令):

1
2
3
4
5
6
# 方式一:逐个拷贝
cp redis-6.2.4/redis.conf 7001
cp redis-6.2.4/redis.conf 7002
cp redis-6.2.4/redis.conf 7003
# 方式二:管道组合命令,一键拷贝
echo 7001 7002 7003 | xargs -t -n 1 cp redis-6.2.4/redis.conf

4)修改每个实例的端口、工作目录

修改每个文件夹内的配置文件,将端口分别修改为7001、7002、7003,将rdb文件保存位置都修改为自己所在目录(在/tmp目录执行下列命令):

1
2
3
sed -i -e 's/6379/7001/g' -e 's/dir .\//dir \/tmp\/7001\//g' 7001/redis.conf
sed -i -e 's/6379/7002/g' -e 's/dir .\//dir \/tmp\/7002\//g' 7002/redis.conf
sed -i -e 's/6379/7003/g' -e 's/dir .\//dir \/tmp\/7003\//g' 7003/redis.conf

5)修改每个实例的声明IP

虚拟机本身有多个IP,为了避免将来混乱,我们需要在redis.conf文件中指定每一个实例的绑定ip信息,格式如下:

1
2
# redis实例的声明 IP
replica-announce-ip 192.168.150.101

每个目录都要改,我们一键完成修改(在/tmp目录执行下列命令):

1
2
3
4
5
6
7
# 逐一执行 要用自己的虚拟机ip
sed -i '1a replica-announce-ip 192.168.150.101' 7001/redis.conf
sed -i '1a replica-announce-ip 192.168.150.101' 7002/redis.conf
sed -i '1a replica-announce-ip 192.168.150.101' 7003/redis.conf

# 或者一键修改
printf '%s\n' 7001 7002 7003 | xargs -I{} -t sed -i '1a replica-announce-ip 192.168.150.101' {}/redis.conf

启动

为了方便查看日志,我们打开3个ssh窗口,分别启动3个redis实例,启动命令:

1
2
3
4
5
6
# 第1个
redis-server 7001/redis.conf
# 第2个
redis-server 7002/redis.conf
# 第3个
redis-server 7003/redis.conf

如果要一键停止,可以运行下面命令:

1
printf '%s\n' 7001 7002 7003 | xargs -I{} -t redis-cli -p {} shutdown

开启主从关系

现在三个实例还没有任何关系,要配置主从可以使用replicaof 或者slaveof(5.0以前)命令。

有临时和永久两种模式:

  • 修改配置文件(永久生效)

    • 在redis.conf中添加一行配置:slaveof <masterip> <masterport>
  • 使用redis-cli客户端连接到redis服务,执行slaveof命令(重启后失效):

1
slaveof <masterip> <masterport>

注意:在5.0以后新增命令replicaof,与salveof效果一致。

演示方式二

通过redis-cli命令连接7002,执行下面命令:

1
2
3
4
# 连接 7002
redis-cli -p 7002
# 执行slaveof
slaveof 192.168.150.101 7001

通过redis-cli命令连接7003,执行下面命令:

1
2
3
4
# 连接 7003
redis-cli -p 7003
# 执行slaveof
slaveof 192.168.150.101 7001

然后连接 7001节点,查看集群状态:

1
2
3
4
# 连接 7001
redis-cli -p 7001
# 查看状态
info replication

主从数据同步原理

全量同步

主从第一次建立连接时,会执行全量同步,将master节点的所有数据都拷贝给slave节点,流程:

这里有一个问题,master如何得知salve是第一次来连接呢??

redis全量同步原理.png

有几个概念,可以作为判断依据:

  • Replication Id:简称replid,是数据集的标记,id一致则说明是同一数据集。每一个master都有唯一的replid,slave则会继承master节点的replid
  • offset:偏移量,随着记录在repl_baklog中的数据增多而逐渐增大。slave完成同步时也会记录当前同步的offset。如果slave的offset小于master的offset,说明slave数据落后于master,需要更新。

因此slave做数据同步,必须向master声明自己的replication id 和offset,master才可以判断到底需要同步哪些数据。

因为slave原本也是一个master,有自己的replid和offset,当第一次变成slave,与master建立连接时,发送的replid和offset是自己的replid和offset。

master判断发现slave发送来的replid与自己的不一致,说明这是一个全新的slave,就知道要做全量同步了。

master会将自己的replid和offset都发送给这个slave,slave保存这些信息。以后slave的replid就与master一致了。

因此,master判断一个节点是否是第一次同步的依据,就是看replid是否一致

完整流程描述:

  • slave节点请求增量同步
  • master节点判断replid,发现不一致,拒绝增量同步
  • master将完整内存数据生成RDB,这里会在硬盘生成对应的RDB文件,发送RDB到slave
  • slave清空本地数据,加载master的RDB
  • master将RDB期间的命令记录在repl_baklog,并持续将log中的命令发送给slave
  • slave执行接收到的命令,保持与master之间的同步

增量同步

全量同步需要先做RDB,然后将RDB文件通过网络传输个slave,成本太高了。因此除了第一次做全量同步,其它大多数时候slave与master都是做增量同步

什么是增量同步?就是只更新slave与master存在差异的部分数据。

repl_backlog原理

master怎么知道slave与自己的数据差异在哪里呢?

这就要说到全量同步时的repl_baklog文件了。

这个文件是一个固定大小的数组,只不过数组是环形,也就是说角标到达数组末尾后,会再次从0开始读写,这样数组头部的数据就会被覆盖。

repl_baklog中会记录Redis处理过的命令日志及offset,包括master当前的offset,和slave已经拷贝到的offset

slave与master的offset之间的差异,就是salve需要增量拷贝的数据了。

随着不断有数据写入,master的offset逐渐变大,slave也不断的拷贝,追赶master的offset

直到数组被填满

此时,如果有新的数据写入,就会覆盖数组中的旧数据。不过,旧的数据只要是绿色的,说明是已经被同步到slave的数据,即便被覆盖了也没什么影响。因为未同步的仅仅是红色部分。

但是,如果slave出现网络阻塞,导致master的offset远远超过了slave的offset

如果master继续写入新数据,其offset就会覆盖旧的数据,直到将slave现在的offset也覆盖:

redis增量同步上限.png

棕色框中的红色部分,就是尚未同步,但是却已经被覆盖的数据。此时如果slave恢复,需要同步,却发现自己的offset都没有了,无法完成增量同步了。只能做全量同步。

主从同步优化

主从同步可以保证主从数据的一致性,非常重要。

可以从以下几个方面来优化Redis主从就集群:

  • 在master中配置repl-diskless-sync yes启用无磁盘复制,避免全量同步时的磁盘IO。
  • Redis单节点上的内存占用不要太大,减少RDB导致的过多磁盘IO
  • 适当提高repl_baklog的大小,发现slave宕机时尽快实现故障恢复,尽可能避免全量同步
  • 限制一个master上的slave节点数量,如果实在是太多slave,则可以采用主-从-从链式结构,减少master压力

主从从架构图:

redis主从从架构.png

小结

简述全量同步和增量同步区别?

  • 全量同步:master将完整内存数据生成RDB,发送RDB到slave。后续命令则记录在repl_baklog,逐个发送给slave。
  • 增量同步:slave提交自己的offset到master,master获取repl_baklog中从offset之后的命令给slave

什么时候执行全量同步?

  • slave节点第一次连接master节点时
  • slave节点断开时间太久,repl_baklog中的offset已经被覆盖时

什么时候执行增量同步?

  • slave节点断开又恢复,并且在repl_baklog中能找到offset时

Redis哨兵

Redis提供了哨兵(Sentinel)机制来实现主从集群的自动故障恢复。

哨兵原理

集群结构和作用

哨兵的结构如图:

redis哨兵集群.png

哨兵的作用如下:

  • 监控:Sentinel 会不断检查您的master和slave是否按预期工作
  • 自动故障恢复:如果master故障,Sentinel会将一个slave提升为master。当故障实例恢复后也以新的master为主
  • 通知:Sentinel充当Redis客户端的服务发现来源,当集群发生故障转移时,会将最新信息推送给Redis的客户端

集群监控原理

Sentinel基于心跳机制监测服务状态,每隔1秒向集群的每个实例发送ping命令:

•主观下线:如果某sentinel节点发现某实例未在规定时间响应,则认为该实例主观下线

•客观下线:若超过指定数量(quorum)的sentinel都认为该实例主观下线,则该实例客观下线。quorum值最好超过Sentinel实例数量的一半。

集群故障恢复原理

一旦发现master故障,sentinel需要在salve中选择一个作为新的master,选择依据是这样的:

  • 首先会判断slave节点与master节点断开时间长短,如果超过指定值(down-after-milliseconds * 10)则会排除该slave节点
  • 然后判断slave节点的slave-priority值,越小优先级越高,如果是0则永不参与选举
  • 如果slave-prority一样,则判断slave节点的offset值,越大说明数据越新,优先级越高
  • 最后是判断slave节点的运行id大小,越小优先级越高。

当选出一个新的master后,该如何实现切换呢?

流程如下:

  • sentinel给备选的slave1节点发送slaveof no one命令,让该节点成为master
  • sentinel给所有其它slave发送slaveof 192.168.150.101 7002 命令,让这些slave成为新master的从节点,开始从新的master上同步数据。
  • 最后,sentinel将故障节点标记为slave,(标记原理是修改故障节点的redis.conf文件,添加SLAVEOF 新的master的ip 端口)当故障节点恢复后会自动成为新的master的slave节点

小结

Sentinel的三个作用是什么?

  • 监控
  • 故障转移
  • 通知

Sentinel如何判断一个redis实例是否健康?

  • 每隔1秒发送一次ping命令,如果超过一定时间没有相向则认为是主观下线
  • 如果大多数sentinel都认为实例主观下线,则判定服务下线

故障转移步骤有哪些?

  • 首先选定一个slave作为新的master,执行slaveof no one
  • 然后让所有节点都执行slaveof 新master
  • 修改故障节点配置,添加slaveof 新master

搭建哨兵集群

集群结构

这里我们搭建一个三节点形成的Sentinel集群,来监管之前的Redis主从集群。如图:

三个sentinel实例信息如下:

节点 IP PORT
s1 192.168.150.101 27001
s2 192.168.150.101 27002
s3 192.168.150.101 27003

准备实例和配置

要在同一台虚拟机开启3个实例,必须准备三份不同的配置文件和目录,配置文件所在目录也就是工作目录。

我们创建三个文件夹,名字分别叫s1、s2、s3:

1
2
3
4
# 进入/tmp目录
cd /tmp
# 创建目录
mkdir s1 s2 s3

然后我们在s1目录创建一个sentinel.conf文件,添加下面的内容:

1
2
3
4
5
6
port 27001
sentinel announce-ip 192.168.150.101
sentinel monitor mymaster 192.168.150.101 7001 2
sentinel down-after-milliseconds mymaster 5000
sentinel failover-timeout mymaster 60000
dir "/tmp/s1"

解读:

  • port 27001:是当前sentinel实例的端口
  • sentinel monitor mymaster 192.168.150.101 7001 2:指定主节点信息
    • mymaster:主节点名称,自定义,任意写
    • 192.168.150.101 7001:主节点的ip和端口
    • 2:选举master时的quorum值

然后将s1/sentinel.conf文件拷贝到s2、s3两个目录中(在/tmp目录执行下列命令):

1
2
3
4
5
# 方式一:逐个拷贝
cp s1/sentinel.conf s2
cp s1/sentinel.conf s3
# 方式二:管道组合命令,一键拷贝
echo s2 s3 | xargs -t -n 1 cp s1/sentinel.conf

修改s2、s3两个文件夹内的配置文件,将端口分别修改为27002、27003:

1
2
sed -i -e 's/27001/27002/g' -e 's/s1/s2/g' s2/sentinel.conf
sed -i -e 's/27001/27003/g' -e 's/s1/s3/g' s3/sentinel.conf

启动

为了方便查看日志,我们打开3个ssh窗口,分别启动3个redis实例,启动命令:

1
2
3
4
5
6
# 第1个
redis-sentinel s1/sentinel.conf
# 第2个
redis-sentinel s2/sentinel.conf
# 第3个
redis-sentinel s3/sentinel.conf

RedisTemplate

在Sentinel集群监管下的Redis主从集群,其节点会因为自动故障转移而发生变化,Redis的客户端必须感知这种变化,及时更新连接信息。Spring的RedisTemplate底层利用lettuce实现了节点的感知和自动切换。

引入依赖

在项目的pom文件中引入依赖:

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

配置Redis地址

然后在配置文件application.yml中指定redis的sentinel相关信息:

1
2
3
4
5
6
7
8
spring:
redis:
sentinel:
master: mymaster
nodes:
- 192.168.150.101:27001
- 192.168.150.101:27002
- 192.168.150.101:27003

配置读写分离

在项目的启动类中,添加一个新的bean:

1
2
3
4
@Bean
public LettuceClientConfigurationBuilderCustomizer clientConfigurationBuilderCustomizer(){
return clientConfigurationBuilder -> clientConfigurationBuilder.readFrom(ReadFrom.REPLICA_PREFERRED);
}

这个bean中配置的就是读写策略,包括四种:

  • MASTER:从主节点读取
  • MASTER_PREFERRED:优先从master节点读取,master不可用才读取replica
  • REPLICA:从slave(replica)节点读取
  • REPLICA _PREFERRED:优先从slave(replica)节点读取,所有的slave都不可用才读取master

Redis分片集群

搭建分片集群

主从和哨兵可以解决高可用、高并发读的问题。但是依然有两个问题没有解决:

  • 海量数据存储问题

  • 高并发写的问题

使用分片集群可以解决上述问题,如图:

redis分片集群架构.png

分片集群特征:

  • 集群中有多个master,每个master保存不同数据

  • 每个master都可以有多个slave节点

  • master之间通过ping监测彼此健康状态(互为哨兵)

  • 客户端请求可以访问集群任意节点,最终都会被转发到正确节点

集群结构

分片集群需要的节点数量较多,这里我们搭建一个最小的分片集群,包含3个master节点,每个master包含一个slave节点,结构如下:

这里我们会在同一台虚拟机中开启6个redis实例,模拟分片集群,信息如下:

IP PORT 角色
192.168.150.101 7001 master
192.168.150.101 7002 master
192.168.150.101 7003 master
192.168.150.101 8001 slave
192.168.150.101 8002 slave
192.168.150.101 8003 slave

准备实例和配置

删除之前的7001、7002、7003这几个目录,重新创建出7001、7002、7003、8001、8002、8003目录:

1
2
3
4
5
6
# 进入/tmp目录
cd /tmp
# 删除旧的,避免配置干扰
rm -rf 7001 7002 7003
# 创建目录
mkdir 7001 7002 7003 8001 8002 8003

在/tmp下准备一个新的redis.conf文件,内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
port 6379
# 开启集群功能
cluster-enabled yes
# 集群的配置文件名称,不需要我们创建,由redis自己维护
cluster-config-file /tmp/6379/nodes.conf
# 节点心跳失败的超时时间
cluster-node-timeout 5000
# 持久化文件存放目录
dir /tmp/6379
# 绑定地址
bind 0.0.0.0
# 让redis后台运行
daemonize yes
# 注册的实例ip
replica-announce-ip 192.168.174.128
# 保护模式
protected-mode no
# 数据库数量
databases 1
# 日志
logfile /tmp/6379/run.log

将这个文件拷贝到每个目录下:

1
2
3
4
# 进入/tmp目录
cd /tmp
# 执行拷贝
echo 7001 7002 7003 8001 8002 8003 | xargs -t -n 1 cp redis.conf

修改每个目录下的redis.conf,将其中的6379修改为与所在目录一致:

1
2
3
4
# 进入/tmp目录
cd /tmp
# 修改配置文件
printf '%s\n' 7001 7002 7003 8001 8002 8003 | xargs -I{} -t sed -i 's/6379/{}/g' {}/redis.conf

启动

因为已经配置了后台启动模式,所以可以直接启动服务:

1
2
3
4
# 进入/tmp目录
cd /tmp
# 一键启动所有服务
printf '%s\n' 7001 7002 7003 8001 8002 8003 | xargs -I{} -t redis-server {}/redis.conf

通过ps查看状态:

1
ps -ef | grep redis

如果要关闭所有进程,可以执行命令:

1
ps -ef | grep redis | awk '{print $2}' | xargs kill

或者(推荐这种方式):

1
printf '%s\n' 7001 7002 7003 8001 8002 8003 | xargs -I{} -t redis-cli -p {} shutdown

创建集群

虽然服务启动了,但是目前每个服务之间都是独立的,没有任何关联。

我们需要执行命令来创建集群,在Redis5.0之前创建集群比较麻烦,5.0之后集群管理命令都集成到了redis-cli中。

1)Redis5.0之前

Redis5.0之前集群命令都是用redis安装包下的src/redis-trib.rb来实现的。因为redis-trib.rb是有ruby语言编写的所以需要安装ruby环境。

1
2
3
# 安装依赖
yum -y install zlib ruby rubygems
gem install redis

然后通过命令来管理集群:

1
2
3
4
# 进入redis的src目录
cd /tmp/redis-6.2.4/src
# 创建集群
./redis-trib.rb create --replicas 1 192.168.174.128:7001 192.168.174.128:7002 192.168.174.128:7003 192.168.174.128:8001 192.168.174.128:8002 192.168.174.128:8003

2)Redis5.0以后

我们使用的是Redis6.2.4版本,集群管理以及集成到了redis-cli中,格式如下:

1
redis-cli --cluster create --cluster-replicas 1 192.168.174.128:7001 192.168.174.128:7002 192.168.174.128:7003 192.168.174.128:8001 192.168.174.128:8002 192.168.174.128:8003

命令说明:

  • redis-cli --cluster或者./redis-trib.rb:代表集群操作命令
  • create:代表是创建集群
  • --replicas 1或者--cluster-replicas 1 :指定集群中每个master的副本个数为1,此时节点总数 ÷ (replicas + 1) 得到的就是master的数量。因此节点列表中的前n个就是master,其它节点都是slave节点,随机分配到不同master

通过命令可以查看集群状态:

1
redis-cli -p 7001 cluster nodes

集群操作时,需要给redis-cli加上-c参数才可以:

1
redis-cli -c -p 7001

分片集群依然满足基本的主写从读原则和哨兵监控故障转移原则

散列插槽

插槽原理

Redis会把每一个master节点映射到0~16383共16384个插槽(hash slot)上,查看集群信息时就能看到:

数据key不是与节点绑定,而是与插槽绑定。redis会根据key的有效部分计算插槽值,分两种情况:

  • key中包含”{}”,且“{}”中至少包含1个字符,“{}”中的部分是有效部分
  • key中不包含“{}”,整个key都是有效部分

例如:key是num,那么就根据num计算,如果是{itcast}num,则根据itcast计算。计算方式是利用CRC16算法得到一个hash值,然后对16384取余,得到的结果就是slot值。

小结

Redis如何判断某个key应该在哪个实例?

  • 将16384个插槽分配到不同的实例
  • 根据key的有效部分计算哈希值,对16384取余
  • 余数作为插槽,寻找插槽所在实例即可

如何将同一类数据固定的保存在同一个Redis实例?

  • 这一类数据使用相同的有效部分,例如key都以{typeId}为前缀
  • 注意:插槽表示的是位置,所以一个插槽可以放多个键值对

集群伸缩

需求分析

需求:向集群中添加一个新的master节点,并向其中存储 num = 10

  • 启动一个新的redis实例,端口为7004
  • 添加7004到之前的集群,并作为一个master节点
  • 给7004节点分配插槽,使得num这个key可以存储到7004实例

这里需要两个新的功能:

  • 添加一个节点到集群中
  • 将部分插槽分配到新插槽

创建新的redis实例

创建一个文件夹:

1
mkdir 7004

拷贝配置文件:

1
cp redis.conf /7004

修改配置文件:

1
sed /s/6379/7004/g 7004/redis.conf

启动

1
redis-server 7004/redis.conf

添加新节点到redis

执行命令:

1
redis-cli --cluster add-node  192.168.150.101:7004 192.168.150.101:7001

通过命令查看集群状态:

1
redis-cli -p 7001 cluster nodes

转移插槽

1
redis-cli --cluster reshard 192.168.150.101:7001 #转移7001的槽数

后面的一系列指令根据提示执行即可

故障转移

自动故障转移

当集群中有一个master宕机会发生什么呢?

直接停止一个redis实例,例如7002:

1
redis-cli -p 7002 shutdown

1)首先是该实例与其它实例失去连接

2)然后是疑似宕机:

3)最后是确定下线,自动提升一个slave为新的master:

4)当7002再次启动,就会变为一个slave节点了:

原理和哨兵实现故障转移是一样的

手动故障转移

利用cluster failover命令可以手动让集群中的slave的master宕机,让master切换到执行cluster failover命令的这个slave节点,实现无感知的数据迁移。其流程如下:

redisFAILOVER.png

这种failover命令可以指定三种模式:

  • 缺省:默认的流程,如图1~6歩
  • force:省略了对offset的一致性校验
  • takeover:直接执行第5歩,忽略数据一致性、忽略master状态和其它master的意见

案例需求:在7002这个slave节点执行手动故障转移,重新夺回master地位

步骤如下:

1)利用redis-cli连接7002这个节点

2)执行cluster failover命令

RedisTemplate访问分片集群

RedisTemplate底层同样基于lettuce实现了分片集群的支持,而使用的步骤与哨兵模式基本一致:

1)引入redis的starter依赖

2)配置分片集群地址

3)配置读写分离

与哨兵模式相比,其中只有分片集群的配置方式略有差异,如下:

1
2
3
4
5
6
7
8
9
10
spring:
redis:
cluster:
nodes:
- 192.168.150.101:7001
- 192.168.150.101:7002
- 192.168.150.101:7003
- 192.168.150.101:8001
- 192.168.150.101:8002
- 192.168.150.101:8003

多级缓存

什么是多级缓存

传统的缓存策略一般是请求到达Tomcat后,先查询Redis,如果未命中则查询数据库

存在下面的问题:

•请求要经过Tomcat处理,Tomcat的性能成为整个系统的瓶颈

•Redis缓存失效时,会对数据库产生冲击

多级缓存就是充分利用请求处理的每个环节,分别添加缓存,减轻Tomcat压力,提升服务性能:

  • 浏览器访问静态资源时,优先读取浏览器本地缓存
  • 访问非静态资源(ajax查询数据)时,访问服务端
  • 请求到达Nginx后,优先读取Nginx本地缓存
  • 如果Nginx本地缓存未命中,则去直接查询Redis(不经过Tomcat)
  • 如果Redis查询未命中,则查询Tomcat
  • 请求进入Tomcat后,优先查询JVM进程缓存
  • 如果JVM进程缓存未命中,则查询数据库

在多级缓存架构中,Nginx内部需要编写本地缓存查询、Redis查询、Tomcat查询的业务逻辑,因此这样的nginx服务不再是一个反向代理服务器,而是一个编写业务的Web服务器了

因此这样的业务Nginx服务也需要搭建集群来提高并发,再有专门的nginx服务来做反向代理,如图:

另外,我们的Tomcat服务将来也会部署为集群模式:

多级缓存架构.png

可见,多级缓存的关键有两个:

  • 一个是在nginx中编写业务,实现nginx本地缓存、Redis、Tomcat的查询

  • 另一个就是在Tomcat中实现JVM进程缓存

其中Nginx编程则会用到OpenResty框架结合Lua这样的语言。

JVM进程缓存

初识Caffeine

缓存在日常开发中启动至关重要的作用,由于是存储在内存中,数据的读取速度是非常快的,能大量减少对数据库的访问,减少数据库的压力。我们把缓存分为两类:

  • 分布式缓存,例如Redis:
    • 优点:存储容量更大、可靠性更好、可以在集群间共享
    • 缺点:访问缓存有网络开销
    • 场景:缓存数据量较大、可靠性要求较高、需要在集群间共享
  • 进程本地缓存,例如HashMap、GuavaCache:
    • 优点:读取本地内存,没有网络开销,速度更快
    • 缺点:存储容量有限、可靠性较低、无法共享
    • 场景:性能要求较高,缓存数据量较小

我们今天会利用Caffeine框架来实现JVM进程缓存。

Caffeine是一个基于Java8开发的,提供了近乎最佳命中率的高性能的本地缓存库。目前Spring内部的缓存使用的就是Caffeine。GitHub地址:https://github.com/ben-manes/caffeine

1
2
3
4
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
</dependency>

缓存使用的基本API:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Test
void testBasicOps() {
// 构建cache对象
Cache<String, String> cache = Caffeine.newBuilder().build();

// 存数据
cache.put("gf", "迪丽热巴");

// 取数据 不存在就返回null
String gf = cache.getIfPresent("gf");
System.out.println("gf = " + gf);

// 取数据,包含两个参数:
// 参数一:缓存的key
// 参数二:Lambda表达式,表达式参数就是缓存的key,方法体是查询数据库的逻辑
// 优先根据key查询JVM缓存,如果未命中,则执行参数二的Lambda表达式
String defaultGF = cache.get("defaultGF", key -> {
// 根据key去数据库查询数据
return "柳岩";
});
System.out.println("defaultGF = " + defaultGF);
}

Caffeine既然是缓存的一种,肯定需要有缓存的清除策略,不然的话内存总会有耗尽的时候。

Caffeine提供了三种缓存驱逐策略:

  • 基于容量:设置缓存的数量上限
1
2
3
4
// 创建缓存对象
Cache<String, String> cache = Caffeine.newBuilder()
.maximumSize(1) // 设置缓存大小上限为 1 丢弃策略采取的是LRU
.build();
  • 基于时间:设置缓存的有效时间
1
2
3
4
5
6
// 创建缓存对象
Cache<String, String> cache = Caffeine.newBuilder()
// 设置缓存有效期为 10 秒,从最后一次写入开始计时
.expireAfterWrite(Duration.ofSeconds(10))
.build();

  • 基于引用:设置缓存为软引用或弱引用,利用GC来回收缓存数据。性能较差,不建议使用。

注意:在默认情况下,当一个缓存元素过期的时候,Caffeine不会自动立即将其清理和驱逐。而是在一次读或写操作后,或者在空闲时间完成对失效数据的驱逐。

实现JVM进程缓存

需求

利用Caffeine实现下列需求:

  • 给根据id查询商品的业务添加缓存,缓存未命中时查询数据库
  • 给根据id查询商品库存的业务添加缓存,缓存未命中时查询数据库
  • 缓存初始大小为100
  • 缓存上限为10000

实现

首先,我们需要定义两个Caffeine的缓存对象,分别保存商品、库存的缓存数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

@Configuration
public class CaffeineConfig {

@Bean
public Cache<Long, Item> itemCache(){
return Caffeine.newBuilder()
.initialCapacity(100)//指定初始化缓存大小
.maximumSize(10_000)//最大上限 超出就LRU淘汰
.build();
}

@Bean
public Cache<Long, ItemStock> stockCache(){
return Caffeine.newBuilder()
.initialCapacity(100)
.maximumSize(10_000)
.build();
}
}

然后,修改item-service中的com.heima.item.web包下的ItemController类,添加缓存逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@RestController
@RequestMapping("item")
public class ItemController {

@Autowired
private IItemService itemService;
@Autowired
private IItemStockService stockService;

@Autowired
private Cache<Long, Item> itemCache;
@Autowired
private Cache<Long, ItemStock> stockCache;

// ...其它略

@GetMapping("/{id}")
public Item findById(@PathVariable("id") Long id) {
return itemCache.get(id, key -> itemService.query()
.ne("status", 3).eq("id", key)
.one()
);
}

@GetMapping("/stock/{id}")
public ItemStock findStockById(@PathVariable("id") Long id) {
return stockCache.get(id, key -> stockService.getById(key));
}
}

Lua语法入门

Nginx编程需要用到Lua语言,因此我们必须先入门Lua的基本语法。

初识Lua

Lua 是一种轻量小巧的脚本语言,用标准C语言编写并以源代码形式开放, 其设计目的是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功能。官网:https://www.lua.org/

Lua经常嵌入到C语言开发的程序中,例如游戏开发、游戏插件等。

Nginx本身也是C语言开发,因此也允许基于Lua做拓展。

变量和循环

学习任何语言必然离不开变量,而变量的声明必须先知道数据的类型。

Lua的数据类型

Lua中支持的常见数据类型包括:

lua数据类型.png

另外,Lua提供了type()函数来判断一个变量的数据类型

声明变量

Lua声明变量的时候无需指定数据类型,而是用local来声明变量为局部变量:

1
2
3
4
5
6
7
8
-- 声明字符串,可以用单引号或双引号,
local str = 'hello'
-- 字符串拼接可以使用 ..
local str2 = 'hello' .. 'world'
-- 声明数字
local num = 21
-- 声明布尔类型
local flag = true

Lua中的table类型既可以作为数组,又可以作为Java中的map来使用。数组就是特殊的table,key是数组角标而已:

1
2
3
4
-- 声明数组 ,key为角标的 table
local arr = {'java', 'python', 'lua'}
-- 声明table,类似java的map
local map = {name='Jack', age=21}

Lua中的数组角标是从1开始,访问的时候与Java中类似:

1
2
-- 访问数组,lua数组的角标从1开始
print(arr[1])

Lua中的table可以用key来访问:

1
2
3
-- 访问table
print(map['name'])
print(map.name)

循环

对于table,我们可以利用for循环来遍历。不过数组和普通table遍历略有差异。

遍历数组:

1
2
3
4
5
6
-- 声明数组 key为索引的 table
local arr = {'java', 'python', 'lua'}
-- 遍历数组
for index,value in ipairs(arr) do
print(index, value)
end

遍历普通table

1
2
3
4
5
6
-- 声明map,也就是table
local map = {name='Jack', age=21}
-- 遍历table
for key,value in pairs(map) do
print(key, value)
end

条件控制、函数

Lua中的条件控制和函数声明与Java类似。

函数

定义函数的语法:

1
2
3
4
function 函数名( argument1, argument2..., argumentn)
-- 函数体
return 返回值
end

例如,定义一个函数,用来打印数组:

1
2
3
4
5
function printArr(arr)
for index, value in ipairs(arr) do
print(value)
end
end

条件控制

类似Java的条件控制,例如if、else语法:

1
2
3
4
5
6
7
if(布尔表达式)
then
--[ 布尔表达式为 true 时执行该语句块 --]
else
--[ 布尔表达式为 false 时执行该语句块 --]
end

与java不同,布尔表达式中的逻辑运算是基于英文单词:

lua逻辑运算符.png

案例

需求:自定义一个函数,可以打印table,当参数为nil时,打印错误信息

1
2
3
4
5
6
7
8
function printArr(arr) #如果这里传入的是map则不能打印(虽然都是table
if not arr then
print('数组不能为空!')
end
for index, value in ipairs(arr) do
print(value)
end
end

实现多级缓存

多级缓存的实现离不开Nginx编程,而Nginx编程又离不开OpenResty。

安装OpenResty

OpenResty® 是一个基于 Nginx的高性能 Web 平台,用于方便地搭建能够处理超高并发、扩展性极高的动态 Web 应用、Web 服务和动态网关。具备下列特点:

  • 具备Nginx的完整功能
  • 基于Lua语言进行扩展,集成了大量精良的 Lua 库、第三方模块
  • 允许使用Lua自定义业务逻辑自定义库

官方网站: https://openresty.org/cn/

1.安装

1)安装开发库

首先要安装OpenResty的依赖开发库,执行命令:

1
yum install -y pcre-devel openssl-devel gcc --skip-broken

2)安装OpenResty仓库

你可以在你的 CentOS 系统中添加 openresty 仓库,这样就可以便于未来安装或更新我们的软件包(通过 yum check-update 命令)。运行下面的命令就可以添加我们的仓库:

1
yum-config-manager --add-repo https://openresty.org/package/centos/openresty.repo

如果提示说命令不存在,则运行:

1
yum install -y yum-utils 

然后再重复上面的命令

3)安装OpenResty

然后就可以像下面这样安装软件包,比如 openresty

1
yum install -y openresty

4)安装opm工具

opm是OpenResty的一个管理工具,可以帮助我们安装一个第三方的Lua模块。

如果你想安装命令行工具 opm,那么可以像下面这样安装 openresty-opm 包:

1
yum install -y openresty-opm

5)目录结构

默认情况下,OpenResty安装的目录是:/usr/local/openresty

OpenResty就是在Nginx基础上集成了一些Lua模块。

6)配置nginx的环境变量

打开配置文件:

1
vi /etc/profile

在最下面加入两行:

1
2
export NGINX_HOME=/usr/local/openresty/nginx
export PATH=${NGINX_HOME}/sbin:$PATH

NGINX_HOME:后面是OpenResty安装目录下的nginx的目录

然后让配置生效:

1
source /etc/profile

2.启动和运行

OpenResty底层是基于Nginx的,查看OpenResty目录的nginx目录,结构与windows中安装的nginx基本一致

所以运行方式与nginx基本一致:

1
2
3
4
5
6
# 启动nginx
nginx
# 重新加载配置
nginx -s reload
# 停止
nginx -s stop

nginx的默认配置文件注释太多,这里将nginx.conf中的注释部分删除,保留有效部分。

修改/usr/local/openresty/nginx/conf/nginx.conf文件,内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

#user nobody;
worker_processes 1;
error_log logs/error.log;

events {
worker_connections 1024;
}

http {
include mime.types;
default_type application/octet-stream;
sendfile on;
keepalive_timeout 65;

server {
listen 8081;
server_name localhost;
location / {
root html;
index index.html index.htm;
}
error_page 500 502 503 504 /50x.html;
location = /50x.html {
root html;
}
}
}

在Linux的控制台输入命令以启动nginx:

1
nginx

3.备注

加载OpenResty的lua模块:

1
2
3
4
#lua 模块
lua_package_path "/usr/local/openresty/lualib/?.lua;;";
#c模块
lua_package_cpath "/usr/local/openresty/lualib/?.so;;";

OpenResty快速入门

我们希望达到的多级缓存架构如图:

多级缓存架构.png

其中:

  • windows上的nginx用来做反向代理服务,将前端的查询商品的ajax请求代理到OpenResty集群

  • OpenResty集群用来编写多级缓存业务

反向代理流程

现在,商品详情页使用的是假的商品数据。不过在浏览器中,可以看到页面有发起ajax请求查询真实商品数据。

请求地址是localhost,端口是80,就被windows上安装的Nginx服务给接收到了。然后代理给了OpenResty集群:

nginx反向代理给openResty.png

我们需要在OpenResty中编写业务,查询商品数据并返回到浏览器。

OpenResty监听请求

OpenResty的很多功能都依赖于其目录下的Lua库,需要在nginx.conf中指定依赖库的目录,并导入依赖:

1)添加对OpenResty的Lua模块的加载

修改/usr/local/openresty/nginx/conf/nginx.conf文件,在其中的http下面,添加下面代码:

1
2
3
4
#lua 模块
lua_package_path "/usr/local/openresty/lualib/?.lua;;";
#c模块
lua_package_cpath "/usr/local/openresty/lualib/?.so;;";

2)监听/api/item路径

修改/usr/local/openresty/nginx/conf/nginx.conf文件,在nginx.conf的server下面,添加对/api/item这个路径的监听:

1
2
3
4
5
6
location  /api/item {
# 默认的响应类型
default_type application/json;
# 响应结果由lua/item.lua文件来决定
content_by_lua_file lua/item.lua;
}

这个监听,就类似于SpringMVC中的@GetMapping("/api/item")做路径映射。

content_by_lua_file lua/item.lua则相当于调用item.lua这个文件,执行其中的业务,把结果返回给用户。相当于java中调用service。

编写item.lua

1)在/usr/loca/openresty/nginx目录创建文件夹:lua

2)在/usr/loca/openresty/nginx/lua文件夹下,新建文件:item.lua

3)编写item.lua,返回假数据item.lua中,利用ngx.say()函数返回数据到Response中

1
ngx.say('{"id":10001,"name":"SALSA AIR","title":"RIMOWA 21寸托运箱拉杆箱 SALSA AIR系列果绿色 820.70.36.4","price":17900,"image":"https://m.360buyimg.com/mobilecms/s720x720_jfs/t6934/364/1195375010/84676/e9f2c55f/597ece38N0ddcbc77.jpg!q70.jpg.webp","category":"拉杆箱","brand":"RIMOWA","spec":"","status":1,"createTime":"2019-04-30T16:00:00.000+00:00","updateTime":"2019-04-30T16:00:00.000+00:00","stock":2999,"sold":31290}')

4)重新加载配置

1
nginx -s reload

请求参数处理

要返回真实数据,必须根据前端传递来的商品id,查询商品信息才可以。

那么如何获取前端传递的商品参数呢?

获取参数的API

OpenResty中提供了一些API用来获取不同类型的前端请求参数:

OpenRestyAPI.png

获取参数并返回

1)获取商品id

修改/usr/loca/openresty/nginx/nginx.conf文件中监听/api/item的代码,利用正则表达式获取ID:

1
2
3
4
5
6
location ~ /api/item/(\d+) {
# 默认的响应类型
default_type application/json;
# 响应结果由lua/item.lua文件来决定
content_by_lua_file lua/item.lua;
}

2)拼接ID并返回

修改/usr/loca/openresty/nginx/lua/item.lua文件,获取id并拼接到结果中返回:

1
2
3
4
-- 获取商品id
local id = ngx.var[1]
-- 拼接并返回
ngx.say('{"id":' .. id .. ',"name":"SALSA AIR","title":"RIMOWA 21寸托运箱拉杆箱 SALSA AIR系列果绿色 820.70.36.4","price":17900,"image":"https://m.360buyimg.com/mobilecms/s720x720_jfs/t6934/364/1195375010/84676/e9f2c55f/597ece38N0ddcbc77.jpg!q70.jpg.webp","category":"拉杆箱","brand":"RIMOWA","spec":"","status":1,"createTime":"2019-04-30T16:00:00.000+00:00","updateTime":"2019-04-30T16:00:00.000+00:00","stock":2999,"sold":31290}')

3)重新加载并测试

运行命令以重新加载OpenResty配置:

1
nginx -s reload

查询Tomcat

拿到商品ID后,本应去缓存中查询商品信息,不过目前我们还未建立nginx、redis缓存。因此,这里我们先根据商品id去tomcat查询商品信息。我们实现如图部分:(暂时去掉了中间的redis)

openResty访问Tomcat.png

需要注意的是,我们的OpenResty是在虚拟机,Tomcat是在Windows电脑上。两者IP一定不要搞错了。

发送http请求的API

nginx提供了内部API用以发送http请求:

1
2
3
4
local resp = ngx.location.capture("/path",{
method = ngx.HTTP_GET, -- 请求方式
args = {a=1,b=2}, -- get方式传参数
})

返回的响应内容包括:

  • resp.status:响应状态码
  • resp.header:响应头,是一个table
  • resp.body:响应体,就是响应数据

注意:这里的path是路径,并不包含IP和端口。这个请求会被nginx内部的server监听并处理。

但是我们希望这个请求发送到Tomcat服务器,所以还需要编写一个server来对这个路径做反向代理:

1
2
3
4
location /path {
# 这里是windows电脑的ip和Java服务端口,需要确保windows防火墙处于关闭状态
proxy_pass http://192.168.150.1:8081;
}

原理如图:

openResty访问TomCat通过反向代理.png

封装http工具

下面,我们封装一个发送Http请求的工具,基于ngx.location.capture来实现查询tomcat。

1)添加反向代理,到windows的Java服务

因为item-service中的接口都是/item开头,所以我们监听/item路径,代理到windows上的tomcat服务。

修改 /usr/local/openresty/nginx/conf/nginx.conf文件,添加一个location:

1
2
3
location /item {
proxy_pass http://192.168.150.1:8081;
}

以后,只要我们调用ngx.location.capture("/item"),就一定能发送请求到windows的tomcat服务。

2)封装工具类

之前我们说过,OpenResty启动时会加载以下两个目录中的工具文件,所以,自定义的http工具也需要放到这个目录下。(/lualib)

/usr/local/openresty/lualib目录下,新建一个common.lua文件:

1
vi /usr/local/openresty/lualib/common.lua

内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
-- 封装函数,发送http请求,并解析响应
local function read_http(path, params)
local resp = ngx.location.capture(path,{
method = ngx.HTTP_GET,
args = params,
})
if not resp then
-- 记录错误信息,返回404
ngx.log(ngx.ERR, "http请求查询失败, path: ", path , ", args: ", args)
ngx.exit(404)
end
return resp.body
end
-- 将方法导出
local _M = {
read_http = read_http
}
return _M

这个工具将read_http函数封装到_M这个table类型的变量中,并且返回,这类似于导出。

使用的时候,可以利用require('common')来导入该函数库,这里的common是函数库的文件名。

3)实现商品查询

最后,我们修改/usr/local/openresty/lua/item.lua文件,利用刚刚封装的函数库实现对tomcat的查询:

1
2
3
4
5
6
7
8
9
10
-- 引入自定义common工具模块,返回值是common中返回的 _M
local common = require("common")
-- 从 common中获取read_http这个函数
local read_http = common.read_http
-- 获取路径参数
local id = ngx.var[1]
-- 根据id查询商品
local itemJSON = read_http("/item/".. id, nil)
-- 根据id查询商品库存
local itemStockJSON = read_http("/item/stock/".. id, nil)

这里查询到的结果是json字符串,并且包含商品、库存两个json字符串,页面最终需要的是把两个json拼接为一个json

这就需要我们先把JSON变为lua的table,完成数据整合后,再转为JSON。

CJSON工具类

OpenResty提供了一个cjson的模块用来处理JSON的序列化和反序列化。

官方地址: https://github.com/openresty/lua-cjson/

1)引入cjson模块:(默认在lualib默认库里面 直接引入就可以了)

1
local cjson = require "cjson"

2)序列化:

1
2
3
4
5
6
local obj = {
name = 'jack',
age = 21
}
-- 把 table 序列化为 json
local json = cjson.encode(obj)

3)反序列化:

1
2
3
4
local json = '{"name": "jack", "age": 21}'
-- 反序列化 json为 table
local obj = cjson.decode(json);
print(obj.name)

实现Tomcat查询

下面,我们修改之前的item.lua中的业务,添加json处理功能:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
-- 导入common函数库
local common = require('common')
local read_http = common.read_http
-- 导入cjson库
local cjson = require('cjson')

-- 获取路径参数
local id = ngx.var[1]
-- 根据id查询商品
local itemJSON = read_http("/item/".. id, nil)
-- 根据id查询商品库存
local itemStockJSON = read_http("/item/stock/".. id, nil)

-- JSON转化为lua的table
local item = cjson.decode(itemJSON)
local stock = cjson.decode(stockJSON)

-- 组合数据
item.stock = stock.stock
item.sold = stock.sold

-- 把item序列化为json 返回结果
ngx.say(cjson.encode(item))

基于ID负载均衡

刚才的代码中,我们的tomcat是单机部署。而实际开发中,tomcat一定是集群模式

因此,OpenResty需要对tomcat集群做负载均衡。

而默认的负载均衡规则是轮询模式,当我们查询/item/10001时:

  • 第一次会访问8081端口的tomcat服务,在该服务内部就形成了JVM进程缓存
  • 第二次会访问8082端口的tomcat服务,该服务内部没有JVM缓存(因为JVM缓存无法共享),会查询数据库

你看,因为轮询的原因,第一次查询8081形成的JVM缓存并未生效,直到下一次再次访问到8081时才可以生效,缓存命中率太低了。

也就是说,我们需要根据商品id做负载均衡,而不是轮询。

1)原理

nginx提供了基于请求路径做负载均衡的算法:

nginx根据请求路径做hash运算,把得到的数值对tomcat服务的数量取余,余数是几,就访问第几个服务,实现负载均衡。

例如:

  • 我们的请求路径是 /item/10001
  • tomcat总数为2台(8081、8082)
  • 对请求路径/item/10001做hash运算求余的结果为1
  • 则访问第一个tomcat服务,也就是8081

只要id不变,每次hash运算结果也不会变,那就可以保证同一个商品,一直访问同一个tomcat服务,确保JVM缓存生效。

2)实现

修改/usr/local/openresty/nginx/conf/nginx.conf文件,实现基于ID做负载均衡。

首先,定义tomcat集群,并设置基于路径做负载均衡:

1
2
3
4
5
upstream tomcat-cluster {
hash $request_uri;
server 192.168.150.1:8081;
server 192.168.150.1:8082;
}

然后,修改对tomcat服务的反向代理,目标指向tomcat集群:

1
2
3
location /item {
proxy_pass http://tomcat-cluster;
}

重新加载OpenResty

1
nginx -s reload

Redis缓存预热

Redis缓存会面临冷启动问题:

冷启动:服务刚刚启动时,Redis中并没有缓存,如果所有商品数据都在第一次查询时添加缓存,可能会给数据库带来较大压力。

缓存预热:在实际开发中,我们可以利用大数据统计用户访问的热点数据,在项目启动时将这些热点数据提前查询并保存到Redis中。

1)利用Docker启动Redis(需要先安装redis镜像)

1
docker run --name redis -p 6379:6379 -d redis redis-server --appendonly yes

2)在item-service服务中引入Redis依赖

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

3)配置Redis地址

1
2
3
spring:
redis:
host: 192.168.150.101

4)编写初始化类

缓存预热需要在项目启动时完成,并且必须是拿到RedisTemplate之后。

这里我们利用InitializingBean接口来实现,因为InitializingBean可以在对象被Spring创建并且成员变量全部注入后执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

@Component
public class RedisHandler implements InitializingBean {

@Autowired
private StringRedisTemplate redisTemplate;

@Autowired
private IItemService itemService;
@Autowired
private IItemStockService stockService;

private static final ObjectMapper MAPPER = new ObjectMapper();

@Override
public void afterPropertiesSet() throws Exception {
// 初始化缓存
// 1.查询商品信息
List<Item> itemList = itemService.list();
// 2.放入缓存
for (Item item : itemList) {
// 2.1.item序列化为JSON
String json = MAPPER.writeValueAsString(item);
// 2.2.存入redis
redisTemplate.opsForValue().set("item:id:" + item.getId(), json);
}

// 3.查询商品库存信息
List<ItemStock> stockList = stockService.list();
// 4.放入缓存
for (ItemStock stock : stockList) {
// 2.1.item序列化为JSON
String json = MAPPER.writeValueAsString(stock);
// 2.2.存入redis
redisTemplate.opsForValue().set("item:stock:id:" + stock.getId(), json);
}
}
}

查询Redis缓存

现在,Redis缓存已经准备就绪,我们可以再OpenResty中实现查询Redis的逻辑了。

当请求进入OpenResty之后:

  • 优先查询Redis缓存
  • 如果Redis缓存未命中,再查询Tomcat

封装Redis工具

OpenResty提供了操作Redis的模块,我们只要引入该模块就能直接使用。但是为了方便,我们将Redis操作封装到之前的common.lua工具库中。

修改/usr/local/openresty/lualib/common.lua文件:

1)引入Redis模块,并初始化Redis对象

1
2
3
4
5
-- 导入redis
local redis = require('resty.redis')
-- 初始化redis
local red = redis:new()
red:set_timeouts(1000, 1000, 1000)

2)封装函数,用来释放Redis连接,其实是放入连接池

1
2
3
4
5
6
7
8
9
-- 关闭redis连接的工具方法,其实是放入连接池
local function close_redis(red)
local pool_max_idle_time = 10000 -- 连接的空闲时间,单位是毫秒
local pool_size = 100 --连接池大小
local ok, err = red:set_keepalive(pool_max_idle_time, pool_size)
if not ok then
ngx.log(ngx.ERR, "放入redis连接池失败: ", err)
end
end

3)封装函数,根据key查询Redis数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
-- 查询redis的方法 ip和port是redis地址,key是查询的key
local function read_redis(ip, port, key)
-- 获取一个连接
local ok, err = red:connect(ip, port)
if not ok then
ngx.log(ngx.ERR, "连接redis失败 : ", err)
return nil
end
-- 查询redis
local resp, err = red:get(key)
-- 查询失败处理
if not resp then
ngx.log(ngx.ERR, "查询Redis失败: ", err, ", key = " , key)
end
--得到的数据为空处理
if resp == ngx.null then
resp = nil
ngx.log(ngx.ERR, "查询Redis数据为空, key = ", key)
end
close_redis(red)
return resp
end

4)导出

1
2
3
4
5
6
-- 将方法导出
local _M = {
read_http = read_http,
read_redis = read_redis
}
return _M

完整的common.lua:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
-- 导入redis
local redis = require('resty.redis')
-- 初始化redis
local red = redis:new()
red:set_timeouts(1000, 1000, 1000)

-- 关闭redis连接的工具方法,其实是放入连接池
local function close_redis(red)
local pool_max_idle_time = 10000 -- 连接的空闲时间,单位是毫秒
local pool_size = 100 --连接池大小
local ok, err = red:set_keepalive(pool_max_idle_time, pool_size)
if not ok then
ngx.log(ngx.ERR, "放入redis连接池失败: ", err)
end
end

-- 查询redis的方法 ip和port是redis地址,key是查询的key
local function read_redis(ip, port, key)
-- 获取一个连接
local ok, err = red:connect(ip, port)
if not ok then
ngx.log(ngx.ERR, "连接redis失败 : ", err)
return nil
end
-- 查询redis
local resp, err = red:get(key)
-- 查询失败处理
if not resp then
ngx.log(ngx.ERR, "查询Redis失败: ", err, ", key = " , key)
end
--得到的数据为空处理
if resp == ngx.null then
resp = nil
ngx.log(ngx.ERR, "查询Redis数据为空, key = ", key)
end
close_redis(red)
return resp
end

-- 封装函数,发送http请求,并解析响应
local function read_http(path, params)
local resp = ngx.location.capture(path,{
method = ngx.HTTP_GET,
args = params,
})
if not resp then
-- 记录错误信息,返回404
ngx.log(ngx.ERR, "http查询失败, path: ", path , ", args: ", args)
ngx.exit(404)
end
return resp.body
end
-- 将方法导出
local _M = {
read_http = read_http,
read_redis = read_redis
}
return _M

实现Redis查询

接下来,我们就可以去修改item.lua文件,实现对Redis的查询了。

查询逻辑是:

  • 根据id查询Redis
  • 如果查询失败则继续查询Tomcat
  • 将查询结果返回

1)修改/usr/local/openresty/lua/item.lua文件,添加一个查询函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
-- 导入common函数库
local common = require('common')
local read_http = common.read_http
local read_redis = common.read_redis
-- 封装查询函数
function read_data(key, path, params)
-- 查询本地缓存
local val = read_redis("127.0.0.1", 6379, key)
-- 判断查询结果
if not val then
ngx.log(ngx.ERR, "redis查询失败,尝试查询http, key: ", key)
-- redis查询失败,去查询http
val = read_http(path, params)
end
-- 返回数据
return val
end

2)而后修改商品查询、库存查询的业务

3)完整的item.lua代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
-- 导入common函数库
local common = require('common')
local read_http = common.read_http
local read_redis = common.read_redis
-- 导入cjson库
local cjson = require('cjson')

-- 封装查询函数
function read_data(key, path, params)
-- 查询本地缓存
local val = read_redis("127.0.0.1", 6379, key)
-- 判断查询结果
if not val then
ngx.log(ngx.ERR, "redis查询失败,尝试查询http, key: ", key)
-- redis查询失败,去查询http
val = read_http(path, params)
end
-- 返回数据
return val
end

-- 获取路径参数
local id = ngx.var[1]

-- 查询商品信息
local itemJSON = read_data("item:id:" .. id, "/item/" .. id, nil)
-- 查询库存信息
local stockJSON = read_data("item:stock:id:" .. id, "/item/stock/" .. id, nil)

-- JSON转化为lua的table
local item = cjson.decode(itemJSON)
local stock = cjson.decode(stockJSON)
-- 组合数据
item.stock = stock.stock
item.sold = stock.sold

-- 把item序列化为json 返回结果
ngx.say(cjson.encode(item))

Nginx本地缓存

现在,整个多级缓存中只差最后一环,也就是nginx的本地缓存了。

本地缓存API

OpenResty为Nginx提供了shard dict的功能,可以在nginx的多个worker之间共享数据,实现缓存功能。

1)开启共享字典,在nginx.conf的http下添加配置:

1
2
# 共享字典,也就是本地缓存,名称叫做:item_cache,大小150m
lua_shared_dict item_cache 150m;

2)操作共享字典:

1
2
3
4
5
6
-- 获取本地缓存对象
local item_cache = ngx.shared.item_cache
-- 存储, 指定key、value、过期时间,单位s,默认为0代表永不过期
item_cache:set('key', 'value', 1000)
-- 读取
local val = item_cache:get('key')

实现本地缓存查询

1)修改/usr/local/openresty/lua/item.lua文件,修改read_data查询函数,添加本地缓存逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
-- 导入共享词典,本地缓存
local item_cache = ngx.shared.item_cache

-- 封装查询函数
function read_data(key, expire, path, params)
-- 查询本地缓存
local val = item_cache:get(key)
if not val then
ngx.log(ngx.ERR, "本地缓存查询失败,尝试查询Redis, key: ", key)
-- 查询redis
val = read_redis("127.0.0.1", 6379, key)
-- 判断查询结果
if not val then
ngx.log(ngx.ERR, "redis查询失败,尝试查询http, key: ", key)
-- redis查询失败,去查询http
val = read_http(path, params)
end
end
-- 查询成功,把数据写入本地缓存
item_cache:set(key, val, expire)
-- 返回数据
return val
end

2)修改item.lua中查询商品和库存的业务,实现最新的read_data函数:

其实就是多了缓存时间参数,过期后nginx缓存会自动删除,下次访问即可更新缓存。

这里给商品基本信息设置超时时间为30分钟,库存为1分钟。

因为库存更新频率较高,如果缓存时间过长,可能与数据库差异较大。

3)完整的item.lua文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
-- 导入common函数库
local common = require('common')
local read_http = common.read_http
local read_redis = common.read_redis
-- 导入cjson库
local cjson = require('cjson')
-- 导入共享词典,本地缓存
local item_cache = ngx.shared.item_cache

-- 封装查询函数
function read_data(key, expire, path, params)
-- 查询本地缓存
local val = item_cache:get(key)
if not val then
ngx.log(ngx.ERR, "本地缓存查询失败,尝试查询Redis, key: ", key)
-- 查询redis
val = read_redis("127.0.0.1", 6379, key)
-- 判断查询结果
if not val then
ngx.log(ngx.ERR, "redis查询失败,尝试查询http, key: ", key)
-- redis查询失败,去查询http
val = read_http(path, params)
end
end
-- 查询成功,把数据写入本地缓存
item_cache:set(key, val, expire)
-- 返回数据
return val
end

-- 获取路径参数
local id = ngx.var[1]

-- 查询商品信息
local itemJSON = read_data("item:id:" .. id, 1800, "/item/" .. id, nil)
-- 查询库存信息
local stockJSON = read_data("item:stock:id:" .. id, 60, "/item/stock/" .. id, nil)

-- JSON转化为lua的table
local item = cjson.decode(itemJSON)
local stock = cjson.decode(stockJSON)
-- 组合数据
item.stock = stock.stock
item.sold = stock.sold

-- 把item序列化为json 返回结果
ngx.say(cjson.encode(item))

缓存同步

大多数情况下,浏览器查询到的都是缓存数据,如果缓存数据与数据库数据存在较大差异,可能会产生比较严重的后果。

所以我们必须保证数据库数据、缓存数据的一致性,这就是缓存与数据库的同步。

数据同步策略

缓存数据同步的常见方式有三种:

设置有效期:给缓存设置有效期,到期后自动删除。再次查询时更新

  • 优势:简单、方便
  • 缺点:时效性差,缓存过期之前可能不一致
  • 场景:更新频率较低,时效性要求低的业务

同步双写:在修改数据库的同时,直接修改缓存

  • 优势:时效性强,缓存与数据库强一致
  • 缺点:有代码侵入,耦合度高;
  • 场景:对一致性、时效性要求较高的缓存数据

异步通知:修改数据库时发送事件通知,相关服务监听到通知后修改缓存数据

  • 优势:低耦合,可以同时通知多个缓存服务
  • 缺点:时效性一般,可能存在中间不一致状态
  • 场景:时效性要求一般,有多个服务需要同步

而异步实现又可以基于MQ或者Canal来实现:

1)基于MQ的异步通知:

基于mq的缓存同步.png

解读:

  • 商品服务完成对数据的修改后,只需要发送一条消息到MQ中。
  • 缓存服务监听MQ消息,然后完成对缓存的更新

依然有少量的代码侵入。

2)基于Canal的通知

基于canal的缓存同步.png

解读:

  • 商品服务完成商品修改后,业务直接结束,没有任何代码侵入
  • Canal监听MySQL变化,当发现变化后,立即通知缓存服务
  • 缓存服务接收到canal通知,更新缓存

代码零侵入

安装Canal

认识Canal

**Canal [kə’næl]**,译意为水道/管道/沟渠,canal是阿里巴巴旗下的一款开源项目,基于Java开发。基于数据库增量日志解析,提供增量数据订阅&消费。GitHub的地址:https://github.com/alibaba/canal

Canal是基于mysql的主从同步来实现的,MySQL主从同步的原理如下:

mysql主从同步原理.png

  • 1)MySQL master 将数据变更写入二进制日志( binary log),其中记录的数据叫做binary log events
  • 2)MySQL slave 将 master 的 binary log events拷贝到它的中继日志(relay log)
  • 3)MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

而Canal就是把自己伪装成MySQL的一个slave节点,从而监听master的binary log变化。再把得到的变化信息通知给Canal的客户端,进而完成对其它数据库的同步。

安装Canal

开启MySQL主从

Canal是基于MySQL的主从同步功能,因此必须先开启MySQL的主从功能才可以。

这里以之前用Docker运行的mysql为例:

开启binlog

打开mysql容器挂载的日志文件,我的在/tmp/mysql/conf目录:

修改文件:

1
vi /tmp/mysql/conf/my.cnf

添加内容:

1
2
log-bin=/var/lib/mysql/mysql-bin
binlog-do-db=heima

注意 必须文件内容开头加上[mysqld]

配置解读:

  • log-bin=/var/lib/mysql/mysql-bin:设置binary log文件的存放地址和文件名,叫做mysql-bin
  • binlog-do-db=heima:指定对哪个database记录binary log events,这里记录heima这个库

最终效果:

1
2
3
4
5
6
7
[mysqld]
skip-name-resolve
character_set_server=utf8
datadir=/var/lib/mysql
server-id=1000
log-bin=/var/lib/mysql/mysql-bin
binlog-do-db=heima

设置用户权限

接下来添加一个仅用于数据同步的账户,出于安全考虑,这里仅提供对heima这个库的操作权限。

1
2
3
create user canal@'%' IDENTIFIED by 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%' identified by 'canal';
FLUSH PRIVILEGES;

重启mysql容器即可

1
docker restart mysql

测试设置是否成功:在mysql控制台,或者Navicat中,输入命令:

1
show master status;

安装Canal

创建网络

我们需要创建一个网络,将MySQL、Canal、MQ放到同一个Docker网络中:

网络内部的docker可以相互访问,否则docker是相互隔离不可访问的

1
docker network create heima

让mysql加入这个网络:

1
docker network connect heima mysql

安装Canal

下载canal的镜像压缩包:

上传到虚拟机,然后通过命令导入:

1
docker load -i canal.tar

然后运行命令创建Canal容器:

1
2
3
4
5
6
7
8
9
10
11
docker run -p 11111:11111 --name canal \
-e canal.destinations=heima \
-e canal.instance.master.address=mysql:3306 \
-e canal.instance.dbUsername=canal \
-e canal.instance.dbPassword=canal \
-e canal.instance.connectionCharset=UTF-8 \
-e canal.instance.tsdb.enable=true \
-e canal.instance.gtidon=false \
-e canal.instance.filter.regex=heima\\..* \
--network heima \
-d canal/canal-server:v1.1.5

说明:

  • -p 11111:11111:这是canal的默认监听端口
  • -e canal.instance.master.address=mysql:3306:数据库地址和端口,如果不知道mysql容器地址,可以通过docker inspect 容器id来查看
  • -e canal.instance.dbUsername=canal:数据库用户名
  • -e canal.instance.dbPassword=canal :数据库密码
  • -e canal.instance.filter.regex=:要监听的表名称

表名称监听支持的语法:

1
2
3
4
5
6
7
8
mysql 数据解析关注的表,Perl正则表达式.
多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\)
常见例子:
1. 所有表:.* or .*\\..*
2. canal schema下所有表: canal\\..*
3. canal下的以canal打头的表:canal\\.canal.*
4. canal schema下的一张表:canal.test1
5. 多个规则组合使用然后以逗号隔开:canal\\..*,mysql.test1,mysql.test2

监听Canal

Canal提供了各种语言的客户端,当Canal监听到binlog变化时,会通知Canal的客户端。

canal原理.png

我们可以利用Canal提供的Java客户端,监听Canal通知消息。当收到变化的消息时,完成对缓存的更新。

不过这里我们会使用GitHub上的第三方开源的canal-starter客户端。地址:https://github.com/NormanGyllenhaal/canal-client

与SpringBoot完美整合,自动装配,比官方客户端要简单好用很多。

引入依赖

1
2
3
4
5
<dependency>
<groupId>top.javatool</groupId>
<artifactId>canal-spring-boot-starter</artifactId>
<version>1.2.1-RELEASE</version>
</dependency>

编写配置

1
2
3
canal:
destination: heima # canal的集群名字,要与安装canal时设置的名称一致
server: 192.168.150.101:11111 # canal服务地址

修改Item实体类

通过@Id、@Column、等注解完成Item与数据库表字段的映射:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

@Data
@TableName("tb_item")
public class Item {
@TableId(type = IdType.AUTO)
@Id
private Long id;//商品id
@Column(name = "name")
private String name;//商品名称
private String title;//商品标题
private Long price;//价格(分)
private String image;//商品图片
private String category;//分类名称
private String brand;//品牌名称
private String spec;//规格
private Integer status;//商品状态 1-正常,2-下架
private Date createTime;//创建时间
private Date updateTime;//更新时间
@TableField(exist = false)
@Transient
private Integer stock;
@TableField(exist = false)
@Transient
private Integer sold;
}

编写监听器

通过实现EntryHandler<T>接口编写监听器,监听Canal消息。注意两点:

  • 实现类通过@CanalTable("tb_item")指定监听的表信息
  • EntryHandler的泛型是与表对应的实体类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

@CanalTable("tb_item")
@Component
public class ItemHandler implements EntryHandler<Item> {

@Autowired
private RedisHandler redisHandler;
@Autowired
private Cache<Long, Item> itemCache;

@Override
public void insert(Item item) {
// 写数据到JVM进程缓存
itemCache.put(item.getId(), item);
// 写数据到redis
redisHandler.saveItem(item);
}

@Override
public void update(Item before, Item after) {
// 写数据到JVM进程缓存
itemCache.put(after.getId(), after);
// 写数据到redis
redisHandler.saveItem(after);
}

@Override
public void delete(Item item) {
// 删除数据到JVM进程缓存
itemCache.invalidate(item.getId());
// 删除数据到redis
redisHandler.deleteItemById(item.getId());
}
}

在这里对Redis的操作都封装到了RedisHandler这个对象中,是我们之前做缓存预热时编写的一个类,内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51

@Component
public class RedisHandler implements InitializingBean {

@Autowired
private StringRedisTemplate redisTemplate;

@Autowired
private IItemService itemService;
@Autowired
private IItemStockService stockService;

private static final ObjectMapper MAPPER = new ObjectMapper();

@Override
public void afterPropertiesSet() throws Exception {
// 初始化缓存
// 1.查询商品信息
List<Item> itemList = itemService.list();
// 2.放入缓存
for (Item item : itemList) {
// 2.1.item序列化为JSON
String json = MAPPER.writeValueAsString(item);
// 2.2.存入redis
redisTemplate.opsForValue().set("item:id:" + item.getId(), json);
}

// 3.查询商品库存信息
List<ItemStock> stockList = stockService.list();
// 4.放入缓存
for (ItemStock stock : stockList) {
// 2.1.item序列化为JSON
String json = MAPPER.writeValueAsString(stock);
// 2.2.存入redis
redisTemplate.opsForValue().set("item:stock:id:" + stock.getId(), json);
}
}

public void saveItem(Item item) {
try {
String json = MAPPER.writeValueAsString(item);
redisTemplate.opsForValue().set("item:id:" + item.getId(), json);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}

public void deleteItemById(Long id) {
redisTemplate.delete("item:id:" + id);
}
}

服务异步通信

消息队列在使用过程中,面临着很多实际问题需要思考:

使用消息队列可能的问题.png

消息可靠性

消息从发送,到消费者接收,会经理多个过程:

消息队列发送基本过程.png

其中的每一步都可能导致消息丢失,常见的丢失原因包括:

  • 发送时丢失:
    • 生产者发送的消息未送达exchange
    • 消息到达exchange后未到达queue
  • MQ宕机,queue将消息丢失
  • consumer接收到消息后未消费就宕机

针对这些问题,RabbitMQ分别给出了解决方案:

  • 生产者确认机制
  • mq持久化
  • 消费者确认机制
  • 失败重试机制

生产者消息确认

RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。这种机制必须给每个消息指定一个唯一ID。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。

返回结果有两种方式:

  • publisher-confirm,发送者确认
    • 消息成功投递到交换机,返回ack
    • 消息未投递到交换机,返回nack
  • publisher-return,发送者回执
    • 消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因。

注意:

修改配置

首先,修改publisher服务中的application.yml文件,添加下面的内容:

1
2
3
4
5
6
7
spring:
rabbitmq:
publisher-confirm-type: correlated
publisher-returns: true
template:
mandatory: true

说明:

  • publish-confirm-type:开启publisher-confirm,这里支持两种类型:
    • simple:同步等待confirm结果,直到超时
    • correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
  • publish-returns:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback
  • template.mandatory:定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息

定义ReturnCallback

这是处理到达交换机但是没有到达队列的消息return回调

每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目加载时配置:

修改publisher服务,添加一个:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
//implements ApplicationContextAware
//ApplicationContext里面存放的是bean对象,Aware是通知类,在所有bean初始化完后会执行通知类 实现ApplicationContextAware可以在bean初始完之后通知调用此类
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
// 获取RabbitTemplate
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
// 设置ReturnCallback
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
// 投递失败,记录日志
log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}",replyCode, replyText, exchange, routingKey, message.toString());
// 如果有业务需要,可以重发消息
});
}
}

定义ConfirmCallback

这是处理是否到达交换机的消息confirm回调

ConfirmCallback可以在发送消息时指定,因为每个业务处理confirm成功或失败的逻辑不一定相同。

在publisher服务的cn.itcast.mq.spring.SpringAmqpTest类中,定义一个单元测试方法:

mq全局ID.png

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public void testSendMessage2SimpleQueue() throws InterruptedException {
// 1.消息体
String message = "hello, spring amqp!";
// 2.全局唯一的消息ID,需要封装到CorrelationData中
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 3.添加callback
correlationData.getFuture().addCallback(
result -> {
if(result.isAck()){
// 3.1.ack,消息成功
log.debug("消息发送成功, ID:{}", correlationData.getId());
}else{
// 3.2.nack,消息失败
log.error("消息发送失败, ID:{}, 原因{}",correlationData.getId(), result.getReason());
}
},
ex -> log.error("消息发送异常, ID:{}, 原因{}",correlationData.getId(),ex.getMessage())
);
// 4.发送消息
rabbitTemplate.convertAndSend("task.direct", "task", message, correlationData);

// 休眠一会儿,等待ack回执
Thread.sleep(2000);
}

消息持久化

生产者确认可以确保消息投递到RabbitMQ的队列中,但是消息发送到RabbitMQ以后,如果突然宕机,也可能导致消息丢失。

要想确保消息在RabbitMQ中安全保存,必须开启消息持久化机制。

  • 交换机持久化
  • 队列持久化
  • 消息持久化

交换机持久化

RabbitMQ中交换机默认是非持久化的,mq重启后就丢失。(在可视界面声明的交换机)

SpringAMQP中可以通过代码指定交换机持久化:

1
2
3
4
5
@Bean
public DirectExchange simpleExchange(){
// 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除
return new DirectExchange("simple.direct", true, false);
}

事实上,默认情况下,由SpringAMQP声明的交换机都是持久化的。

队列持久化

RabbitMQ中队列默认是非持久化的,mq重启后就丢失。(在可视界面声明的队列)

SpringAMQP中可以通过代码指定交换机持久化:

1
2
3
4
5
@Bean
public Queue simpleQueue(){
// 使用QueueBuilder构建队列,durable就是持久化的
return QueueBuilder.durable("simple.queue").build();
}

事实上,默认情况下,由SpringAMQP声明的队列都是持久化的。

消息持久化

利用SpringAMQP发送消息时,可以设置消息的属性(MessageProperties),指定delivery-mode:

  • 1:非持久化
  • 2:持久化
1
2
3
4
5
6
7
8
@Test
public void testDurableMessage() {
//准备消息
Message message = MessageBuilder.withBody("hello".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)//持久化
.build();
rabbitTemplate.convertAndSend("simple.queue",message);
}

默认情况下,SpringAMQP发出的任何消息都是持久化的,不用特意指定。

消费者消息确认

RabbitMQ是阅后即焚机制,RabbitMQ确认消息被消费者消费后会立刻删除。

而RabbitMQ是通过消费者回执来确认消费者是否成功处理消息的:消费者获取消息后,应该向RabbitMQ发送ACK回执,表明自己已经处理消息。如果收到的是unack会重新发消息

设想这样的场景:

  • 1)RabbitMQ投递消息给消费者
  • 2)消费者获取消息后,返回ACK给RabbitMQ
  • 3)RabbitMQ删除消息
  • 4)消费者宕机,消息尚未处理

这样,消息就丢失了。因此消费者返回ACK的时机非常重要。

而SpringAMQP则允许配置三种确认模式:

•manual:手动ack,需要在业务代码结束后,调用api发送ack。

•auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack(用的最多)

•none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除

由此可知:

  • none模式下,消息投递是不可靠的,可能丢失
  • auto模式类似事务机制,出现异常时返回nack,消息回滚到mq;没有异常,返回ack
  • manual:自己根据业务情况,判断什么时候该ack

一般,我们都是使用默认的auto即可。

演示auto模式

确认机制修改为auto:

1
2
3
4
5
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: auto # 关闭ack

认为设置一个异常位置,发送消息,可以发现此时消息状态为unack(未确定状态)

抛出异常后,因为Spring会自动返回unack,所以消息恢复至Ready状态,并且没有被RabbitMQ删除,会重新发送

消费失败重试机制

当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致mq的消息处理飙升,带来不必要的压力

本地重试

我们可以利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。

修改consumer服务的application.yml文件,添加内容:

1
2
3
4
5
6
7
8
9
10
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000 # 初识的失败等待时长为1秒
multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts: 3 # 最大重试次数
stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false 一般就默认无状态true

结论:

  • 开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试
  • 重试达到最大次数后,Spring会返回reject拒绝,mq队列中消息会被丢弃
失败策略

在之前的测试中,达到最大重试次数后,消息会被丢弃,这是由Spring内部机制决定的。

在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecovery接口来处理,它包含三种不同的实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式

  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队

  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。

1)在consumer服务中定义处理失败消息的交换机和队列

1
2
3
4
5
6
7
8
9
10
11
12
@Bean
public DirectExchange errorMessageExchange(){
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}

2)定义一个RepublishMessageRecoverer,关联队列和交换机

1
2
3
4
@Bean //覆盖MessageRecovery原有的默认bean实现
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}

完整代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22


@Configuration
public class ErrorMessageConfig {
@Bean
public DirectExchange errorMessageExchange(){
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}

@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
}

这样当消息彻底失败后会最终由消费者交给MessageRecovery处理转给一个必定的异常交换机和异常队列

总结

如何确保RabbitMQ消息的可靠性?

  • 开启生产者确认机制,确保生产者的消息能到达队列
  • 开启持久化功能,确保消息未消费前在队列中不会丢失
  • 开启消费者确认机制为auto,由spring确认消息处理成功后完成ack
  • 开启消费者失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到异常交换机,交由人工处理

死信交换机

什么是死信交换机

什么是死信?

当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):

  • 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false
  • 消息是一个过期消息,超时无人消费
  • 要投递的队列消息满了,无法投递

如果这个包含死信的队列配置了dead-letter-exchange属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机称为死信交换机(Dead Letter Exchange,检查DLX)。

如图,一个消息被消费者拒绝了,变成了死信:

因为simple.queue绑定了死信交换机 dl.direct,因此死信会投递给这个交换机:

如果这个死信交换机也绑定了一个队列,则消息最终会进入这个存放死信的队列:

死信交换机原理.png

另外,队列将死信投递给死信交换机时,必须知道两个信息:

  • 死信交换机名称
  • 死信交换机与死信队列绑定的RoutingKey (这个好像不是必须的)

这样才能确保投递的消息能到达死信交换机,并且正确的路由到死信队列。

利用死信交换机接收死信

在失败重试策略中,默认的RejectAndDontRequeueRecoverer会在本地重试次数耗尽后,发送reject给RabbitMQ,消息变成死信,被丢弃。

我们可以给simple.queue添加一个死信交换机,给死信交换机绑定一个队列。这样消息变成死信后也不会丢弃,而是最终投递到死信交换机,路由到与死信交换机绑定的队列。

我们在consumer服务中,定义一组死信交换机、死信队列:

需要注意!!
死信交换机本质上也只是普通的交换机 只是需要处理死信的队列需要deadLetterExchange指定一个交换机为他处理死信,所以才叫死信交换机,声明方式和一般交换机没有区别

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 声明普通的 simple.queue队列,并且为其指定死信交换机:dl.direct
@Bean
public Queue simpleQueue2(){
return QueueBuilder.durable("simple.queue") // 指定队列名称,并持久化
.deadLetterExchange("dl.direct") // 指定死信交换机
.deadLetterRoutingKey("dl") //这个似乎不是必须的
.build();
}
// 声明死信交换机 dl.direct
@Bean
public DirectExchange dlExchange(){
return new DirectExchange("dl.direct", true, false);
}
// 声明存储死信的队列 dl.queue
@Bean
public Queue dlQueue(){
return new Queue("dl.queue", true);
}
// 将死信队列 与 死信交换机绑定
@Bean
public Binding dlBinding(){
return BindingBuilder.bind(dlQueue()).to(dlExchange()).with("simple");
}

总结

什么样的消息会成为死信?

  • 消息被消费者reject或者返回nack
  • 消息超时未消费
  • 队列满了

死信交换机的使用场景是什么?

  • 如果队列绑定了死信交换机,死信会投递到死信交换机;
  • 可以利用死信交换机收集所有消费者处理失败的消息(死信),交由人工处理,进一步提高消息队列的可靠性。

TTL

死信交换机可以做定时器,但是比较麻烦 需要自定义死信交换机和队列

一个队列中的消息如果超时未消费,则会变为死信,超时分为两种情况:

  • 消息所在的队列设置了超时时间
  • 消息本身设置了超时时间

接收超时死信的死信交换机

在consumer服务的SpringRabbitListener中,定义一个新的消费者,并且声明 死信交换机、死信队列:

1
2
3
4
5
6
7
8
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "dl.ttl.queue", durable = "true"),
exchange = @Exchange(name = "dl.ttl.direct"),
key = "ttl"
))
public void listenDlQueue(String msg){
log.info("接收到 dl.ttl.queue的延迟消息:{}", msg);
}

声明一个队列,并且指定TTL

要给队列设置超时时间,需要在声明队列时配置x-message-ttl属性:

1
2
3
4
5
6
7
8
@Bean
public Queue ttlQueue(){
return QueueBuilder.durable("ttl.queue") // 指定队列名称,并持久化
.ttl(10000) // 设置队列的超时时间,10秒
.deadLetterExchange("dl.ttl.direct") // 指定死信交换机
.deadLetterRoutingKey("dl") //这个似乎不是必须的
.build();
}

注意,这个队列设定了死信交换机为dl.ttl.direct

声明交换机,将ttl与交换机绑定:

1
2
3
4
5
6
7
8
@Bean
public DirectExchange ttlExchange(){
return new DirectExchange("ttl.direct");
}
@Bean
public Binding ttlBinding(){
return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");
}

发送消息,但是不要指定TTL:

1
2
3
4
5
6
7
8
9
10
11
@Test
public void testTTLQueue() {
// 创建消息
String message = "hello, ttl queue";
// 消息ID,需要封装到CorrelationData中
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 发送消息
rabbitTemplate.convertAndSend("ttl.direct", "ttl", message, correlationData);
// 记录日志
log.debug("发送消息成功");
}

因为队列的TTL值是10000ms,也就是10秒。可以看到消息发送与接收之间的时差刚好是10秒。

发送消息时,设定TTL

在发送消息时,也可以指定TTL:

1
2
3
4
5
6
7
8
9
10
11
12
13
@Test
public void testTTLMsg() {
// 创建消息
Message message = MessageBuilder
.withBody("hello, ttl message".getBytes(StandardCharsets.UTF_8))
.setExpiration("5000")
.build();
// 消息ID,需要封装到CorrelationData中
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 发送消息
rabbitTemplate.convertAndSend("ttl.direct", "ttl", message, correlationData);
log.debug("发送消息成功");
}

这次,发送与接收的延迟只有5秒。说明当队列、消息都设置了TTL时,任意一个到期就会成为死信。

显然,这种方式可以做定时器,但是比较麻烦 需要自定义死信交换机和队列

总结

消息超时的两种方式是?

  • 给队列设置ttl属性,进入队列后超过ttl时间的消息变为死信
  • 给消息设置ttl属性,队列接收到消息超过ttl时间后变为死信

如何实现发送一个消息20秒后消费者才收到消息?

  • 给消息的目标队列指定死信交换机 不指定消费者
  • 将消费者监听的队列绑定到死信交换机
  • 发送消息时给消息设置超时时间为20秒

延迟队列

利用TTL结合死信交换机,我们实现了消息发出后,消费者延迟收到消息的效果。这种消息模式就称为延迟队列(Delay Queue)模式。

延迟队列的使用场景包括:

  • 延迟发送短信
  • 用户下单,如果用户在15 分钟内未支付,则自动取消
  • 预约工作会议,20分钟后自动通知所有参会人员

因为延迟队列的需求非常多,所以RabbitMQ的官方也推出了一个插件,原生支持延迟队列效果。

使用方式可以参考官网地址:https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq

安装DelayExchange插件

官方的安装指南地址为:https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq

上述文档是基于linux原生安装RabbitMQ,然后安装插件。

所以下面我们会讲解基于Docker来安装RabbitMQ插件。

下载插件

RabbitMQ有一个官方的插件社区,地址为:https://www.rabbitmq.com/community-plugins.html

对应的GitHub页面下载3.8.9版本的插件,地址为https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/tag/3.8.9这个对应RabbitMQ的3.8.5以上版本。

上传插件

需要先查看RabbitMQ的插件目录对应的数据卷。如果不是基于Docker的,重新创建Docker容器。

1
docker pull rabbitmq:3.8-management
1
2
3
4
5
6
7
8
9
10
docker run \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=123321 \
-v mq-plugins:/plugins \
--name mq \
--hostname mq1 \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3.8-management

我们之前设定的RabbitMQ的数据卷名称为mq-plugins,所以我们使用下面命令查看数据卷:

1
docker volume inspect mq-plugins
安装插件

最后就是安装了,需要进入MQ容器内部来执行安装。我的容器名为mq,所以执行下面命令:

1
docker exec -it mq bash

执行时,请将其中的 -it 后面的mq替换为你自己的容器名.

进入容器内部后,执行下面命令开启插件:

1
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

DelayExchange原理

DelayExchange需要将一个交换机声明为delayed类型。当我们发送消息到delayExchange时,流程如下:

  • 接收消息
  • 判断消息是否具备x-delay属性
  • 如果有x-delay属性,说明是延迟消息,持久化到硬盘,读取x-delay值,作为延迟时间
  • 返回routing not found结果给消息发送者
  • x-delay时间到期后,重新投递消息到指定队列

使用DelayExchange

插件的使用也非常简单:声明一个交换机,交换机的类型可以是任意类型,只需要设定delayed属性为true即可,然后声明队列与其绑定即可。

1)声明DelayExchange交换机

基于注解方式(推荐):

1
2
3
4
5
6
7
8
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "delay.queue",durable = "true"),
exchange = @Exchange(name = "delay.direct",delayed = "true"),
key = "delay"
))
public void listenDelayExchange(String msg){
log.info("getDelayMessage:{}",msg);
}

也可以基于@Bean的方式:

1
2
3
4
5
6
7
8
@Bean
public DirectExchange dlExchange(){
return ExchangeBuilder
.directExchange("delay.direct")
.durable(true)
.delayed() //指定延迟交换机
.build();
}

2)发送消息

发送消息时,一定要携带x-delay属性,指定延迟的时间:

1
2
3
4
5
6
7
8
9
10
11
12
@Test
public void testDelayMessage() {
//准备消息
Message message = MessageBuilder.withBody("hello".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.setHeader("x-delay",5000)
.build();
//设立消息全局ID
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("delay.direct","delay",message,correlationData);
log.info("消息成功发送");
}

这里需要注意的是:如果设立了全局returncallback 超时的消息会被捕获 但这是不合理的

需要做出处理

1
2
3
4
5
//判断是否是延迟消息
if(message.getMessageProperties().getReceivedDelay()>0){
//忽略
return;
}

总结

延迟队列插件的使用步骤包括哪些?

•声明一个交换机,添加delayed属性为true

•发送消息时,添加x-delay头,值为超时时间

惰性队列

消息堆积问题

当生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息达到上限。之后发送的消息就会成为死信,可能会被丢弃,这就是消息堆积问题。

解决消息堆积有两种思路:

  • 增加更多消费者,提高消费速度。也就是我们之前说的work queue模式
  • 扩大队列容积,提高堆积上限
  • 开启线程池

要提升队列容积,把消息保存在内存中显然是不行的。

惰性队列

从RabbitMQ的3.6.0版本开始,就增加了Lazy Queues的概念,也就是惰性队列。惰性队列的特征如下:

  • 接收到消息后直接存入磁盘而非内存
  • 消费者要消费消息时才会从磁盘中读取并加载到内存
  • 支持数百万条的消息存储

基于命令行设置lazy-queue

而要设置一个队列为惰性队列,只需要在声明队列时,指定x-queue-mode属性为lazy即可。可以通过命令行将一个运行中的队列修改为惰性队列:

1
rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues  

命令解读:

  • rabbitmqctl :RabbitMQ的命令行工具
  • set_policy :添加一个策略
  • Lazy :策略名称,可以自定义
  • "^lazy-queue$" :用正则表达式匹配队列的名字
  • '{"queue-mode":"lazy"}' :设置队列模式为lazy模式
  • --apply-to queues :策略的作用对象,是所有的队列

基于@Bean声明lazy-queue

1
2
3
4
5
6
@Bean
public Queue lazyQueue() {
return QueueBuilder.durable("lazy.queue")
.lazy()
.build();
}

基于@RabbitListener声明LazyQueue

1
2
3
4
5
@RabbitListenner(queuesToDeclare = @Queue(
name = "lazy.queue",
durable = "true",
arguments = @Argument(name = "x-queue-mode",value = "lazy")))
public void listenLazyQueue(String msg){}

总结

惰性队列的优点有哪些?

  • 基于磁盘存储,消息上限高
  • 没有间歇性的page-out,性能比较稳定

惰性队列的缺点有哪些?

  • 基于磁盘存储,消息时效性会降低
  • 性能受限于磁盘的IO

MQ集群

集群分类

RabbitMQ的是基于Erlang语言编写,而Erlang又是一个面向并发的语言,天然支持集群模式。RabbitMQ的集群有两种模式:

普通集群:是一种分布式集群,将队列分散到集群的各个节点,从而提高整个集群的并发能力。

镜像集群:是一种主从集群,普通集群的基础上,添加了主从备份功能,提高集群的数据可用性。

镜像集群虽然支持主从,但主从同步并不是强一致的,某些情况下可能有数据丢失的风险。因此在RabbitMQ的3.8版本以后,推出了新的功能:仲裁队列来代替镜像集群,底层采用Raft协议确保主从的数据一致性。

普通集群

集群结构和特征

普通集群,或者叫标准集群(classic cluster),具备下列特征:

  • 会在集群的各个节点间共享部分数据,包括:交换机、队列元信息。不包含队列中的消息。
  • 当访问集群某节点时,如果队列不在该节点,会从数据所在节点传递到当前节点并返回
  • 队列所在节点宕机,队列中的消息就会丢失

结构如图:

mq普通集群特征.png

部署

普通模式集群,我们的计划部署3节点的mq集群:

主机名 控制台端口 amqp通信端口
mq1 8081 —> 15672 8071 —> 5672
mq2 8082 —> 15672 8072 —> 5672
mq3 8083 —> 15672 8073 —> 5672

集群中的节点标示默认都是:rabbit@[hostname],因此以上三个节点的名称分别为:

  • rabbit@mq1
  • rabbit@mq2
  • rabbit@mq3
获取cookie

RabbitMQ底层依赖于Erlang,而Erlang虚拟机就是一个面向分布式的语言,默认就支持集群模式。集群模式中的每个RabbitMQ 节点使用 cookie 来确定它们是否被允许相互通信。

要使两个节点能够通信,它们必须具有相同的共享秘密,称为Erlang cookie。cookie 只是一串最多 255 个字符的字母数字字符。

每个集群节点必须具有相同的 cookie。实例之间也需要它来相互通信。

我们先在之前启动的mq容器中获取一个cookie值,作为集群的cookie。执行下面的命令:

1
docker exec -it mq cat /var/lib/rabbitmq/.erlang.cookie

可以看到cookie值如下:(要替换成自己的cookie)

1
FXZMCVGLBIXZCDEMMVZQ

接下来,停止并删除当前的mq容器,我们重新搭建集群。(同时可以清理一下数据卷 docker volume prune)

1
docker rm -f mq
准备集群配置

在/tmp目录新建一个配置文件 rabbitmq.conf:

1
2
3
cd /tmp
# 创建文件
touch rabbitmq.conf

文件内容如下:

1
2
3
4
5
6
loopback_users.guest = false
listeners.tcp.default = 5672
cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config
cluster_formation.classic_config.nodes.1 = rabbit@mq1
cluster_formation.classic_config.nodes.2 = rabbit@mq2
cluster_formation.classic_config.nodes.3 = rabbit@mq3

再创建一个文件,记录cookie

1
2
3
4
5
6
7
cd /tmp
# 创建cookie文件
touch .erlang.cookie
# 写入cookie
echo "XKASOVMXKWAGJFLMKNHG" > .erlang.cookie
# 修改cookie文件的权限(只读)
chmod 600 .erlang.cookie

准备三个目录,mq1、mq2、mq3:

1
2
3
cd /tmp
# 创建目录
mkdir mq1 mq2 mq3

然后拷贝rabbitmq.conf、cookie文件到mq1、mq2、mq3:

1
2
3
4
5
6
7
8
9
# 进入/tmp
cd /tmp
# 拷贝
cp rabbitmq.conf mq1
cp rabbitmq.conf mq2
cp rabbitmq.conf mq3
cp .erlang.cookie mq1
cp .erlang.cookie mq2
cp .erlang.cookie mq3
启动集群

创建一个网络:

1
docker network create mq-net

运行命令

1
2
3
4
5
6
7
8
9
10
docker run -d --net mq-net \
-v ${PWD}/mq1/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
-v ${PWD}/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=123321 \
--name mq1 \
--hostname mq1 \
-p 8071:5672 \
-p 8081:15672 \
rabbitmq:3.8-management
1
2
3
4
5
6
7
8
9
10
docker run -d --net mq-net \
-v ${PWD}/mq2/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
-v ${PWD}/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=123321 \
--name mq2 \
--hostname mq2 \
-p 8072:5672 \
-p 8082:15672 \
rabbitmq:3.8-management
1
2
3
4
5
6
7
8
9
10
docker run -d --net mq-net \
-v ${PWD}/mq3/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
-v ${PWD}/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=123321 \
--name mq3 \
--hostname mq3 \
-p 8073:5672 \
-p 8083:15672 \
rabbitmq:3.8-management

镜像集群

集群结构和特征

镜像集群:本质是主从模式,具备下面的特征:

  • 交换机、队列、队列中的消息会在各个mq的镜像节点之间同步备份。
  • 创建队列的节点被称为该队列的主节点,备份到的其它节点叫做该队列的镜像节点。
  • 一个队列的主节点可能是另一个队列的镜像节点
  • 所有操作都是主节点完成,然后同步给镜像节点
  • 主宕机后,镜像节点会替代成新的主
  • 不具备负载均衡功能,因为所有操作都会有主节点完成(但是不同队列,其主节点可以不同,可以利用这个提高吞吐量)
    结构如图:

mq镜像集群特征.png

部署

镜像模式的配置

镜像模式的配置有3种模式:

ha-mode ha-params 效果
准确模式exactly 队列的副本量count 集群中队列副本(主服务器和镜像服务器之和)的数量。count如果为1意味着单个副本:即队列主节点。count值为2表示2个副本:1个队列主和1个队列镜像。换句话说:count = 镜像数量 + 1。如果群集中的节点数少于count,则该队列将镜像到所有节点。如果有集群总数大于count+1,并且包含镜像的节点出现故障,则将在另一个节点上创建一个新的镜像。
all (none) 队列在群集中的所有节点之间进行镜像。队列将镜像到任何新加入的节点。镜像到所有节点将对所有群集节点施加额外的压力,包括网络I / O,磁盘I / O和磁盘空间使用情况。推荐使用exactly,设置副本数为(N / 2 +1)。
nodes node names 指定队列创建到哪些节点,如果指定的节点全部不存在,则会出现异常。如果指定的节点在集群中存在,但是暂时不可用,会创建节点到当前客户端连接到的节点。

这里我们以rabbitmqctl命令作为案例来讲解配置语法。

语法示例:

exactly模式
1
rabbitmqctl set_policy ha-two "^two\." '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
  • rabbitmqctl set_policy:固定写法
  • ha-two:策略名称,自定义
  • "^two\.":匹配队列的正则表达式,符合命名规则的队列才生效,这里是任何以two.开头的队列名称
  • '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}': 策略内容
    • "ha-mode":"exactly":策略模式,此处是exactly模式,指定副本数量
    • "ha-params":2:策略参数,这里是2,就是副本数量为2,1主1镜像
    • "ha-sync-mode":"automatic":同步策略,默认是manual,即新加入的镜像节点不会同步旧的消息。如果设置为automatic,则新加入的镜像节点会把主节点中所有消息都同步,会带来额外的网络开销
all模式
1
rabbitmqctl set_policy ha-all "^all\." '{"ha-mode":"all"}'
  • ha-all:策略名称,自定义
  • "^all\.":匹配所有以all.开头的队列名
  • '{"ha-mode":"all"}':策略内容
    • "ha-mode":"all":策略模式,此处是all模式,即所有节点都会称为镜像节点
nodes模式
1
rabbitmqctl set_policy ha-nodes "^nodes\." '{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}'
  • rabbitmqctl set_policy:固定写法
  • ha-nodes:策略名称,自定义
  • "^nodes\.":匹配队列的正则表达式,符合命名规则的队列才生效,这里是任何以nodes.开头的队列名称
  • '{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}': 策略内容
    • "ha-mode":"nodes":策略模式,此处是nodes模式
    • "ha-params":["rabbit@mq1", "rabbit@mq2"]:策略参数,这里指定副本所在节点名称

仲裁队列

集群特征

仲裁队列:仲裁队列是3.8版本以后才有的新功能,用来替代镜像队列,具备下列特征:

  • 与镜像队列一样,都是主从模式,支持主从数据同步
  • 使用非常简单,没有复杂的配置
  • 主从同步基于Raft协议,强一致

部署

添加仲裁队列

在任意控制台添加一个队列,一定要选择队列类型为Quorum类型。

仲裁队列的 + 2字样。代表这个队列有2个镜像节点。

因为仲裁队列默认的镜像数为5。如果你的集群有7个节点,那么镜像数肯定是5;而我们集群只有3个节点,因此镜像数量就是2.(不包括自身)

Java代码创建仲裁队列

1
2
3
4
5
6
7
@Bean
public Queue quorumQueue() {
return QueueBuilder
.durable("quorum.queue") // 持久化
.quorum() // 仲裁队列
.build();
}

SpringAMQP连接MQ集群

注意,这里用address来代替host、port方式

1
2
3
4
5
6
spring:
rabbitmq:
addresses: 192.168.150.105:8071, 192.168.150.105:8072, 192.168.150.105:8073
username: itcast
password: 123321
virtual-host: /

集群扩容

加入集群

1)启动一个新的MQ容器:

1
2
3
4
5
6
7
8
9
docker run -d --net mq-net \
-v ${PWD}/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=123321 \
--name mq4 \
--hostname mq5 \
-p 8074:15672 \
-p 8084:15672 \
rabbitmq:3.8-management

2)进入容器控制台:

1
docker exec -it mq4 bash

3)停止mq进程

1
rabbitmqctl stop_app

4)重置RabbitMQ中的数据:

1
rabbitmqctl reset

5)加入mq1:

1
rabbitmqctl join_cluster rabbit@mq1

6)再次启动mq进程

1
rabbitmqctl start_app

增加仲裁队列副本

我们先查看下quorum.queue这个队列目前的副本情况,进入mq1容器:

1
docker exec -it mq1 bash

执行命令:(可以查看集群状态)

1
rabbitmq-queues quorum_status "quorum.queue"

现在,我们让mq4也加入进来:

1
rabbitmq-queues add_member "quorum.queue" "rabbit@mq4"