SpringCloud
图片来源:黑马以及网络及自制
资料来源:维基、百度百科、黑马
目录:
- 什么是微服务
- Eureka注册中心
- Ribbon负载均衡
- Nacos注册中心
- Feign远程调用
- Gateway服务网关
- Docker应用容器引擎
- RabbitMQ服务异步通讯
- Elasticsearch分布式搜索引擎
- Sentinel高可用流量防护组件
- Seate分布式事务
- Redis分布式缓存
- 多级缓存
- RabbitMQ高级特性
微服务
什么是微服务
维基上对其定义为:一种软件开发技术- 面向服务的体系结构(SOA)架构样式的一种变体,它提倡将单一应用程序划分成一组小的服务,服务之间互相协调、互相配合,为用户提供最终价值。每个服务运行在其独立的进程中,服务与服务间采用轻量级的通信机制互相沟通(通常是基于HTTP的RESTful API)。每个服务都围绕着具体业务进行构建,并且能够独立地部署到生产环境、类生产环境等。另外,应尽量避免统一的、集中式的服务管理机制,对具体的一个服务而言,应根据上下文,选择合适的语言、工具对其进行构建。
微服务框架示意图:
自动化管理工具:
微服务知识体系:
传统的单体架构
将业务的所有功能集中在一个项目中开发,打成一个包部署
- 优点:
- 架构简单
- 部署成本低
- 缺点:
- 耦合度高
分布式架构
根据业务功能对系统进行拆分,每个业务模块作为独立项目开发,称为一个服务
- 优点:
- 降低服务耦合
- 有利于服务升级拓展
- 缺点:架构复杂
soa分布式和微服务关系和区别:
- 分布式:将单体架构中单个部分进行拆分,然后部署到不同机子
- soa和微服务是分布式架构的
- soa是面向服务的架构,系统所有服务注册在总线上,调用服务时,从总线上找服务信息,然后进行调用
- 微服务是更彻底的面向服务的架构:将系统的各个功能抽成一个小小的应用程序,基本保持一个应用对应一个服务的架构
微服务
微服务是一种经过良好架构设计的分布式架构方案,微服务架构特征:
- 单一职责:微服务拆分粒度更小,每一个服务都对应唯一的业务能力,做到单一职责,避免重复业务开发
- 面向服务:微服务对外暴露业务接口
- 自治:团队独立、技术独立、数据独立、部署独立
- 隔离性强:服务调用做好隔离、容错、降级,避免出现级联问题
常见的企业微服务结合技术
SpringCloud
SpringCloud集成了各种微服务功能组件,并基于SpringBoot实现了这些组件的自动装配,从而提供了良好的开箱即用体验
常见的组件
这里学习的版本是Hoxton.SR10,因此对应的SpringBoot版本是2.3.x版本
各个版本的对应关系
微服务拆分案例
-> cloud-demo
cloud-demo:父工程,管理依赖
- order-service:订单微服务,负责订单相关业务
- user-service:用户微服务,负责用户相关业务
项目被分成两个独立的服务包括各自独立的数据库,这里的需求就是在order服务去调用获取user服务相关的信息
步骤
在order-service的OrderApplication中注册RestTemplate
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class OrderApplication {
public static void main(String[] args) {
SpringApplication.run(OrderApplication.class, args);
}
public RestTemplate restTemplate() {
return new RestTemplate();
}
}在需要调用远程服务的controller注入RestTemplate并发起调用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private RestTemplate restTemplate;
public Order queryOrderByUserId( { Long orderId)
// 根据id查询订单并返回
Order order = orderService.queryOrderById(orderId);
// TODO: 2022/9/19 查询用户并加入到查询结果中
//访问的另一个服务地址
String url = "http://localhost:8081/user/" + order.getUserId();
//调用restTemplate进行远程服务获取用户信息
User user = restTemplate.getForObject(url, User.class);
// 将查询结果封装在order
order.setUser(user);
// 返回最终结果
return order;
}idea操作多个微服务
参考链接:IDEA中启动多个微服务(开启Services管理)
通过下述方式即可操作多个微服务
点击之后即可在以下窗口操作多个微服务
提供者与消费者
在服务调用关系中,会有两个不同的角色:
服务提供者:一次业务中,被其它微服务调用的服务。(提供接口给其它微服务)
服务消费者:一次业务中,调用其它微服务的服务。(调用其它微服务提供的接口)
其中,一个服务既可以是服务提供者也可以是服务消费者
eureka注册中心
eureka作用与工作划分
多服务调用出现的问题
- 服务消费者该如何获取服务提供者的地址信息?注册服务信息、拉取服务
- 如果有多个服务提供者,消费者该如何选择?负载均衡
- 消费者如何得知服务提供者的健康状态?心跳续约
eureka工作示意图:
在Eureka架构中,微服务角色有两类:
- EurekaServer:服务端,注册中心
- 记录服务信息
- 心跳监控
- EurekaClient:客户端
- Provider:服务提供者,例如案例中的 user-service
注册自己的信息到EurekaServer
每隔30秒向EurekaServer发送心跳 - Consumer:服务消费者,例如案例中的 order-service
根据服务名称从EurekaServer拉取服务列表
基于服务列表做负载均衡,选中一个微服务后发起远程调用
- Provider:服务提供者,例如案例中的 user-service
eureka注册中心搭建
步骤如图
搭建EurekaServer注册中心
引入依赖
1
2
3
4<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
</dependency>开启注册中心功能
1
2
3
4
5
6
7
8
9
public class EurekaServerApplication {
public static void main(String[] args) {
SpringApplication.run(EurekaServerApplication.class, args);
}
}配置文件
1
2
3
4
5
6
7
8
9server:
port: 10086
spring:
application:
name: eureka-server
eureka:
client:
service-url:
defaultZone: http://127.0.0.1:10086/eureka其中:为何注册中心也需要配置自己的eureka,原因是注册中心本身也是一个微服务,以后可能会出现多个注册中心互相调用的情况
服务注册
引入依赖
1
2
3
4<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>配置文件:配置服务名称以及注册中心地址
1
2
3
4
5
6
7spring:
application:
name: userservice
eureka:
client:
service-url:
defaultZone: http://127.0.0.1:10086/eureka启动多个user-service实例
配置名称以及没有正在使用的端口号
做完以上步骤就可以在注册中心查看当前服务注册的端口
服务发现
在第二步骤做完后在OrderApplication应用主入口返回RestTemplate处添加一个负载均衡注解
@LoadBalanced
1
2
3
4
5
public RestTemplate restTemplate() {
return new RestTemplate();
}接着在需要访问的服务处将ip地址更改为服务名称
"http://userservice:8081/user/"
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public Order queryOrderByUserId( { Long orderId)
// 根据id查询订单并返回
Order order = orderService.queryOrderById(orderId);
// TODO: 2022/9/19 查询用户并加入到查询结果中
//访问的另一个服务地址
//String url = "http://localhost:8081/user/" + order.getUserId();
String url = "http://userservice:8081/user/" + order.getUserId();
//调用restTemplate进行远程服务获取用户信息
User user = restTemplate.getForObject(url, User.class);
// 将查询结果封装在order
order.setUser(user);
// 返回最终结果
return order;
}spring会自动帮助我们从eureka-server端,根据userservice这个服务名称,获取实例列表,而后完成负载均衡
Ribbon负载均衡
流程
源码(后面再补)
基本流程如下:
- 拦截我们的RestTemplate请求http://userservice/user/1
- RibbonLoadBalancerClient会从请求url中获取服务名称,也就是user-service
- DynamicServerListLoadBalancer根据user-service到eureka拉取服务列表
- eureka返回列表,localhost:8081、localhost:8082
- IRule利用内置负载均衡规则,从列表中选择一个,例如localhost:8081
- RibbonLoadBalancerClient修改请求地址,用localhost:8081替代userservice,得到http://localhost:8081/user/1,发起真实请求
负载均衡策略
不同规则的含义如下:
默认实现方式为ZoneAvoidanceRule
内置负载均衡规则类 | 规则描述 |
---|---|
RoundRobinRule | 简单轮询服务列表来选择服务器。它是Ribbon默认的负载均衡规则。 |
AvailabilityFilteringRule | 对以下两种服务器进行忽略: (1)在默认情况下,这台服务器如果3次连接失败,这台服务器就会被设置为“短路”状态。短路状态将持续30秒,如果再次连接失败,短路的持续时间就会几何级地增加。 (2)并发数过高的服务器。如果一个服务器的并发连接数过高,配置了AvailabilityFilteringRule规则的客户端也会将其忽略。并发连接数的上限,可以由客户端的 |
WeightedResponseTimeRule | 为每一个服务器赋予一个权重值。服务器响应时间越长,这个服务器的权重就越小。这个规则会随机选择服务器,这个权重值会影响服务器的选择。 |
ZoneAvoidanceRule | 以区域可用的服务器为基础进行服务器的选择。使用Zone对服务器进行分类,这个Zone可以理解为一个机房、一个机架等。而后再对Zone内的多个服务做轮询。 |
BestAvailableRule | 忽略那些短路的服务器,并选择并发数较低的服务器。 |
RandomRule | 随机选择一个可用的服务器。 |
RetryRule | 重试机制的选择逻辑 |
自定义负载均衡策略
两种方式修改负载均衡规则
bean修改方式
1
2
3
4
public IRule randomRule(){
return new RandomRule();
}配置文件配置
1
2
3userservice: # 给某个微服务配置负载均衡规则,这里是userservice服务
ribbon:
NFLoadBalancerRuleClassName: com.netflix.loadbalancer.RandomRule # 负载均衡规则
注意:一般用默认的负载均衡规则,不做修改。
饥饿加载
Ribbon默认是采用懒加载,只会在第一次访问时才会创建LoadBalanceClient
饥饿加载是在项目刚启动时便创建LoadBalanceClient
1 | ribbon: |
nacos注册中心
Nacos简介与安装
Nacos /nɑ:kəʊs/ 是 Dynamic Naming and Configuration Service的首字母简称,一个更易于构建云原生应用的动态服务发现、配置管理和服务管理平台。
Nacos 致力于帮助您发现、配置和管理微服务。Nacos 提供了一组简单易用的特性集,帮助您快速实现动态服务发现、服务配置、服务元数据及流量管理。
Nacos 帮助您更敏捷和容易地构建、交付和管理微服务平台。 Nacos 是构建以“服务”为中心的现代应用架构 (例如微服务范式、云原生范式) 的服务基础设施。
更多简介请看这里
解压压缩包后在bin目录下执行以下命令开启单机版nacos服务
1 | startup.cmd -m standalone |
默认端口为8848,想改可以通过以下示意图修改
访问http://192.168.8.225:8848/nacos/index.html进入nacos,账号密码皆为nacos
Nacos服务注册
在父工程maven管理下引入spring-cloud-alibaba,若有eureka则注释掉其依赖以及相关配置
1 | <!--spring-cloud-alibaba--> |
在需要注册服务的项目下引入alibaba-nacos-discovery依赖
1 | <dependency> |
添加nacos地址
1 | spring: |
查看
Nacos服务分级存储模型
为防止某个机房实例群即集群出现集体意外,所以一般会在多个地方建立多个服务集群,保证服务宕机的意外出现
Nacos服务分级存储模型
- 一级是服务,例如userservice
- 二级是集群,例如杭州或上海
- 三级是实例,例如杭州机房的某台部署了userservice的服务器
服务间的调用可以采用调用本地,也可以去调用远程的,但一般为了性能着想,预先考虑本地服务
模拟集群
在所有服务的application.yml中添加集群配置
1 | spring: |
临时配置可以通过以下方式(复制应用)
1 | -Dspring.cloud.nacos.discovery.cluster-name=SH |
查看
根据集群负载均衡
在服务消费者application.yml加入以下配置
1 | # NACOS配置 |
该策略详细内容如下
- 优先选择同集群服务实例列表
- 在本地集群找不到服务提供者的前提下,会去其它集群寻找,此时控制台会报警告
- 确定了可用实例列表后,再采用随机负载均衡挑选实例
根据权重负载均衡
权重为0-1,为0时永不访问该实例,权重相对于其他实例越低,访问几率越低
可以依照上述规则对性能优劣的服务器进行负载均衡,达到最大效率
配置方式
Nacos环境隔离
Nacos提供了namespace来实现环境隔离功能
- nacos中可以有多个namespace
- namespace下可以有group、service等
- 不同namespace之间相互隔离,例如不同namespace的服务互相不可见
配置方式
在nacos客户端配置以下信息
修改服务实例的application.yml文件
1 | spring: |
查看
此时服务提供者若没有在同一命名空间,则服务消费者会在控制台报错
nacos原理以及与eureka对比
nacos工作示意图
对比一下eureka工作示意图
可以得出:
在服务提供者方面,nacos拥有临时实例和非临时实例两种类型,临时实例采用心跳检测,某种程度讲,eureka就只有临时实例着一种类型,每隔30s向注册中心发一次心跳续约,若30s内注册中心没有收到💓,则主动关闭与之连接
非临时实例,某种程度像注册中心的亲儿子,注册中心在用户不主动删除该服务的前提下会一直保留该服务,不会从服务列表剔除,并每隔一段时间主动询问该服务是否正常
一般服务默认都是临时实例,配置永久实例方式如下
1
2
3
4
5spring:
cloud:
nacos:
discovery:
ephemeral: false # 设置为非临时实例服务消费者方面,nacos的注册中心会主动推送变更信息,若某个服务挂了或是怎样的,注册中心会主动推送消息通知服务消费者,这样,服务列表更新相对于eureka更及时
服务列表缓存:nacos的服务消费者会定时将服务提供者进行缓存,免去重复请求麻烦,并定时更新
Nacos集群默认采用AP方式,当集群中存在非临时实例时,采用CP模式;Eureka采用AP方式(后面补充相关知识)
Nacos配置管理
统一配置管理
Nacos除了可以做注册中心,同样可以做配置管理来使用
注意:项目的核心配置,需要热更新的配置才有放到nacos管理的必要。基本不会变更的一些配置还是保存在微服务本地比较好
nacos创建配置过程
在配置列表点击+号
添加相关配置属性
查看
微服务拉取nacos配置
项目的启动加载配置过程如下图
若有nacos配置文件的话,会在加载application.yml前加载nacos配置文件
引入nacos-config依赖
1
2
3
4
5<!--nacos配置管理依赖-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>添加bootstrap.yaml,并将服务名、服务环境、nacos等相关配置从application.yml移动到该文件下
1
2
3
4
5
6
7
8
9
10spring:
application:
name: userservice # 服务名称
profiles:
active: dev #开发环境,这里是dev
cloud:
nacos:
server-addr: localhost:8848 # Nacos地址
config:
file-extension: yaml # 文件后缀名测试读取的配置
1
2
3
4
5
6
7
8
9
10
private String dateformat;
public String nowTime() {
Date now = new Date();
SimpleDateFormat simpleDateFormat = new SimpleDateFormat(dateformat);
String nowString = simpleDateFormat.format(now);
return nowString;
}通过浏览器访问
.../nowTime
查看配置效果
热更新配置
在nacos修改相关配置后就会自动刷新最新的配置,而不需重启服务
方式一:在@Value注入的变量所在类上添加注解@RefreshScope
方式二 :写一个配置类自动读取并注入pattern前缀的属性,之后在需要使用的地方注入即可
1 |
|
使用
1 |
|
配置共享
微服务启动时会从nacos读取多个配置文件:
- [spring.application.name]-[spring.profiles.active].yaml,例如:userservice-dev.yaml
- [spring.application.name].yaml,例如:userservice.yaml
无论profile如何变化,[spring.application.name].yaml这个文件一定会加载,因此多环境共享配置可以写入这个文件
同之前热更新的方式二一样,读取该属性
1 |
|
接着,修改UserApplication2的profile值,这样,他就是test环境,而UserApplication1是dev环境,我们是没有配置test环境的nacos相关配置的
因为直接运行会报错,UserApplication2读取不到pattern.dateformat的值会抛出异常,所以在本地配置以下数据
1 | pattern: |
查看结果
无论是哪个环境都会拿到envSharedValue属性的值。
优先级
Nacos集群搭建
Nacos生产环境下一定要部署为集群状态
搭建Nacos集群的步骤
搭建MySQL集群并初始化数据库表
Nacos默认数据存储在内嵌数据库Derby中,不属于生产可用的数据库。
官方推荐的最佳实践是使用带有主从的高可用数据库集群,下面采用单点数据库为例
新建名为nacos的数据库,并导入下述sql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198CREATE TABLE `config_info` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'id',
`data_id` varchar(255) NOT NULL COMMENT 'data_id',
`group_id` varchar(255) DEFAULT NULL,
`content` longtext NOT NULL COMMENT 'content',
`md5` varchar(32) DEFAULT NULL COMMENT 'md5',
`gmt_create` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`gmt_modified` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '修改时间',
`src_user` text COMMENT 'source user',
`src_ip` varchar(50) DEFAULT NULL COMMENT 'source ip',
`app_name` varchar(128) DEFAULT NULL,
`tenant_id` varchar(128) DEFAULT '' COMMENT '租户字段',
`c_desc` varchar(256) DEFAULT NULL,
`c_use` varchar(64) DEFAULT NULL,
`effect` varchar(64) DEFAULT NULL,
`type` varchar(64) DEFAULT NULL,
`c_schema` text,
PRIMARY KEY (`id`),
UNIQUE KEY `uk_configinfo_datagrouptenant` (`data_id`,`group_id`,`tenant_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='config_info';
/******************************************/
/* 数据库全名 = nacos_config */
/* 表名称 = config_info_aggr */
/******************************************/
CREATE TABLE `config_info_aggr` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'id',
`data_id` varchar(255) NOT NULL COMMENT 'data_id',
`group_id` varchar(255) NOT NULL COMMENT 'group_id',
`datum_id` varchar(255) NOT NULL COMMENT 'datum_id',
`content` longtext NOT NULL COMMENT '内容',
`gmt_modified` datetime NOT NULL COMMENT '修改时间',
`app_name` varchar(128) DEFAULT NULL,
`tenant_id` varchar(128) DEFAULT '' COMMENT '租户字段',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_configinfoaggr_datagrouptenantdatum` (`data_id`,`group_id`,`tenant_id`,`datum_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='增加租户字段';
/******************************************/
/* 数据库全名 = nacos_config */
/* 表名称 = config_info_beta */
/******************************************/
CREATE TABLE `config_info_beta` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'id',
`data_id` varchar(255) NOT NULL COMMENT 'data_id',
`group_id` varchar(128) NOT NULL COMMENT 'group_id',
`app_name` varchar(128) DEFAULT NULL COMMENT 'app_name',
`content` longtext NOT NULL COMMENT 'content',
`beta_ips` varchar(1024) DEFAULT NULL COMMENT 'betaIps',
`md5` varchar(32) DEFAULT NULL COMMENT 'md5',
`gmt_create` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`gmt_modified` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '修改时间',
`src_user` text COMMENT 'source user',
`src_ip` varchar(50) DEFAULT NULL COMMENT 'source ip',
`tenant_id` varchar(128) DEFAULT '' COMMENT '租户字段',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_configinfobeta_datagrouptenant` (`data_id`,`group_id`,`tenant_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='config_info_beta';
/******************************************/
/* 数据库全名 = nacos_config */
/* 表名称 = config_info_tag */
/******************************************/
CREATE TABLE `config_info_tag` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'id',
`data_id` varchar(255) NOT NULL COMMENT 'data_id',
`group_id` varchar(128) NOT NULL COMMENT 'group_id',
`tenant_id` varchar(128) DEFAULT '' COMMENT 'tenant_id',
`tag_id` varchar(128) NOT NULL COMMENT 'tag_id',
`app_name` varchar(128) DEFAULT NULL COMMENT 'app_name',
`content` longtext NOT NULL COMMENT 'content',
`md5` varchar(32) DEFAULT NULL COMMENT 'md5',
`gmt_create` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`gmt_modified` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '修改时间',
`src_user` text COMMENT 'source user',
`src_ip` varchar(50) DEFAULT NULL COMMENT 'source ip',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_configinfotag_datagrouptenanttag` (`data_id`,`group_id`,`tenant_id`,`tag_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='config_info_tag';
/******************************************/
/* 数据库全名 = nacos_config */
/* 表名称 = config_tags_relation */
/******************************************/
CREATE TABLE `config_tags_relation` (
`id` bigint(20) NOT NULL COMMENT 'id',
`tag_name` varchar(128) NOT NULL COMMENT 'tag_name',
`tag_type` varchar(64) DEFAULT NULL COMMENT 'tag_type',
`data_id` varchar(255) NOT NULL COMMENT 'data_id',
`group_id` varchar(128) NOT NULL COMMENT 'group_id',
`tenant_id` varchar(128) DEFAULT '' COMMENT 'tenant_id',
`nid` bigint(20) NOT NULL AUTO_INCREMENT,
PRIMARY KEY (`nid`),
UNIQUE KEY `uk_configtagrelation_configidtag` (`id`,`tag_name`,`tag_type`),
KEY `idx_tenant_id` (`tenant_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='config_tag_relation';
/******************************************/
/* 数据库全名 = nacos_config */
/* 表名称 = group_capacity */
/******************************************/
CREATE TABLE `group_capacity` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键ID',
`group_id` varchar(128) NOT NULL DEFAULT '' COMMENT 'Group ID,空字符表示整个集群',
`quota` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '配额,0表示使用默认值',
`usage` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '使用量',
`max_size` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '单个配置大小上限,单位为字节,0表示使用默认值',
`max_aggr_count` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '聚合子配置最大个数,,0表示使用默认值',
`max_aggr_size` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '单个聚合数据的子配置大小上限,单位为字节,0表示使用默认值',
`max_history_count` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '最大变更历史数量',
`gmt_create` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`gmt_modified` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '修改时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_group_id` (`group_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='集群、各Group容量信息表';
/******************************************/
/* 数据库全名 = nacos_config */
/* 表名称 = his_config_info */
/******************************************/
CREATE TABLE `his_config_info` (
`id` bigint(64) unsigned NOT NULL,
`nid` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
`data_id` varchar(255) NOT NULL,
`group_id` varchar(128) NOT NULL,
`app_name` varchar(128) DEFAULT NULL COMMENT 'app_name',
`content` longtext NOT NULL,
`md5` varchar(32) DEFAULT NULL,
`gmt_create` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
`gmt_modified` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
`src_user` text,
`src_ip` varchar(50) DEFAULT NULL,
`op_type` char(10) DEFAULT NULL,
`tenant_id` varchar(128) DEFAULT '' COMMENT '租户字段',
PRIMARY KEY (`nid`),
KEY `idx_gmt_create` (`gmt_create`),
KEY `idx_gmt_modified` (`gmt_modified`),
KEY `idx_did` (`data_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='多租户改造';
/******************************************/
/* 数据库全名 = nacos_config */
/* 表名称 = tenant_capacity */
/******************************************/
CREATE TABLE `tenant_capacity` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键ID',
`tenant_id` varchar(128) NOT NULL DEFAULT '' COMMENT 'Tenant ID',
`quota` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '配额,0表示使用默认值',
`usage` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '使用量',
`max_size` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '单个配置大小上限,单位为字节,0表示使用默认值',
`max_aggr_count` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '聚合子配置最大个数',
`max_aggr_size` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '单个聚合数据的子配置大小上限,单位为字节,0表示使用默认值',
`max_history_count` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '最大变更历史数量',
`gmt_create` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`gmt_modified` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '修改时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_tenant_id` (`tenant_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='租户容量信息表';
CREATE TABLE `tenant_info` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'id',
`kp` varchar(128) NOT NULL COMMENT 'kp',
`tenant_id` varchar(128) default '' COMMENT 'tenant_id',
`tenant_name` varchar(128) default '' COMMENT 'tenant_name',
`tenant_desc` varchar(256) DEFAULT NULL COMMENT 'tenant_desc',
`create_source` varchar(32) DEFAULT NULL COMMENT 'create_source',
`gmt_create` bigint(20) NOT NULL COMMENT '创建时间',
`gmt_modified` bigint(20) NOT NULL COMMENT '修改时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_tenant_info_kptenantid` (`kp`,`tenant_id`),
KEY `idx_tenant_id` (`tenant_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='tenant_info';
CREATE TABLE `users` (
`username` varchar(50) NOT NULL PRIMARY KEY,
`password` varchar(500) NOT NULL,
`enabled` boolean NOT NULL
);
CREATE TABLE `roles` (
`username` varchar(50) NOT NULL,
`role` varchar(50) NOT NULL,
UNIQUE INDEX `idx_user_role` (`username` ASC, `role` ASC) USING BTREE
);
CREATE TABLE `permissions` (
`role` varchar(50) NOT NULL,
`resource` varchar(255) NOT NULL,
`action` varchar(8) NOT NULL,
UNIQUE INDEX `uk_role_permission` (`role`,`resource`,`action`) USING BTREE
);
INSERT INTO users (username, password, enabled) VALUES ('nacos', '$2a$10$EuWPZHzz32dJN7jexM34MOeYirDdFAZm2kuWj7VEOJhhZkDrxfvUu', TRUE);
INSERT INTO roles (username, role) VALUES ('nacos', 'ROLE_ADMIN');修改nacos集群配置(节点信息)、数据库配置
三个nacos节点的地址
节点 ip port nacos1 127.0.0.1 8845 nacos2 127.0.0.1 8846 nacos3 127.0.0.1 8847 在nacos的conf目录下将cluster.conf.example改为cluster.conf
接着打开该文件,加入三个ip
1
2
3127.0.0.1:8845
127.0.0.1.8846
127.0.0.1.8847修改application.properties文件,添加数据库配置
1
2
3
4
5
6
7
8
9# nacos集群数据库配置
spring.datasource.platform=mysql
db.num=1
db.url.0=jdbc:mysql://127.0.0.1:3306/nacos?characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useUnicode=true&useSSL=false&serverTimezone=UTC
db.user.0=root
db.password.0=333333
# nacos集群数据库配置复制三份,如下图,忽略nginx
分别修改三个nacos的application.properties,修改各自的ip(8845、8846、8847)
分别启动多个nacos节点
直接在各自的bin目录下打开控制台启动各自服务
1
startup.cmd
nginx反向代理
在nginx目录下修改相关配置文件conf/nginx.conf
1
2
3
4
5
6
7
8
9
10
11
12
13upstream nacos-cluster {
server 127.0.0.1:8845;
server 127.0.0.1:8846;
server 127.0.0.1:8847;
}
server {
listen 80;
server_name localhost;
location /nacos {
proxy_pass http://nacos-cluster;
}
}有些配置nginx已经有了,如下图所示,找好位置
在浏览器访问http://localhost/nacos即可
其他微服务向nacos注册服务访问的ip要有所改变
1 | spring: |
优化
实际部署时,需要给做反向代理的nginx服务器设置一个域名,这样后续如果有服务器迁移nacos的客户端也无需更改配置.
Nacos的各个节点应该部署到多个不同服务器,做好容灾和隔离
Feign远程调用
简介与使用
每次使用RestTemplate发起远程调用时都要去定义请求url,请求的参数少还好,一多起来,一大串字符串拼接就可以搞死一个人了,而且用RestTemplate编写的代码可读性较差,没有使用过的人会较难理解该代码含义
feign官网,feign可以帮助我们优雅的实现http请求的发送
使用步骤
在需要调用其他api的服务下引入依赖
1
2
3
4<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>启动类中开启feign注解
@EnableFeignClients
1
2
3
4
5
6
7
8
9
//todo 开启feign远程调用
public class OrderApplication {
public static void main(String[] args) {
SpringApplication.run(OrderApplication.class, args);
}
}编写feign请求接口
1
2
3
4
5// TODO: 2022/9/21 连接远程服务客户端
public interface UserClient {
User findById(long id);
}其中
@FeignClient
为请求的服务名称(内部会做服务地址解析),@GetMapping("/user/{id}")
为请求的路径以及请求参数,整个编码过程与SpringMVC类似,易上手使用
将上述接口注入需调用的controller,执行里面的查询语句即可
1
2
3
4
5
6
7
8
9
10
11
12
13
private UserClient userClient;
public Order queryOrderByUserIdFeign( { Long orderId)
// 根据id查询订单并返回
Order order = orderService.queryOrderById(orderId);
User user = userClient.findById(order.getUserId());// todo 利用feign发起服务请求
// 将查询结果封装在order
order.setUser(user);
// 返回最终结果
return order;
}自定义配置
feign支持的部分配置如下
类型 | 作用 | 说明 |
---|---|---|
feign.Logger.Level | 修改日志级别 | 包含四种不同的级别:NONE、BASIC、HEADERS、FULL |
feign.codec.Decoder | 响应结果的解析器 | http远程调用的结果做解析,例如解析json字符串为java对象 |
feign.codec.Encoder | 请求参数编码 | 将请求参数编码,便于通过http请求发送 |
feign. Contract | 支持的注解格式 | 默认是SpringMVC的注解 |
feign. Retryer | 失败重试机制 | 请求失败的重试机制,默认是没有,不过会使用Ribbon的重试 |
一般默认即可满足需求
有两种方式可以进行个性化配置
配置文件方式
日志各级别的含义
- NONE:不记录任何日志信息,这是默认值。
- BASIC:仅记录请求的方法,URL以及响应状态码和执行时间
- HEADERS:在BASIC的基础上,额外记录了请求和响应的头信息
- FULL:记录所有请求和响应的明细,包括头信息、请求体、元数据。
1 | #feign配置 |
代码方式
1 | //todo java形式实现feign配置 |
生效方式
全局生效
1
2
3
4
5
6
7
// TODO: 2022/9/21 feign配置全局生效
public class OrderApplication {
public static void main(String[] args) {
SpringApplication.run(OrderApplication.class, args);
}
}局部生效
1
2
3
4
5// TODO: 2022/9/21 feign配置局部生效
public interface UserClient {
User findById(long id);
}Feign使用优化
Feign底层发起http请求,依赖于其它的框架。其底层客户端实现包括:
- URLConnection:默认实现,不支持连接池
- Apache HttpClient :支持连接池
- OKHttp:支持连接池
连接池可以提高性能,毕竟,你每次请求是要建立连接,三握四握很耗费性能的(后续需补原理),所以这里为了提高性能,会采用Apache HttpClient
使用步骤
引入依赖
1
2
3
4
5<!--httpClient的依赖 -->
<dependency>
<groupId>io.github.openfeign</groupId>
<artifactId>feign-httpclient</artifactId>
</dependency>配置连接池
1
2
3
4
5feign:
httpclient:
enabled: true # 开启feign对HttpClient的支持
max-connections: 200 # 最大的连接数
max-connections-per-route: 50 # 每个路径的最大连接数
为了进一步提高feign的性能,可以将日志输出设置为none
或basic
Feign最佳实践
以后要是有多个服务都要请求userservice的相关api时,我们就得重复书写以下代码,如何简化这些操作呢,否则,上千个微服务要调同一个服务时,你就得重复写同样的代码上千次?
继承方式
这种方式的缺点,图中英文已经指出,多个微服务继承同一个接口,造成了耦合提升,这是不符合规范以及后期维护和服务分离的,以及一些其他的内容里面有讲(注解等麻烦)
抽取方式
将FeignClient抽取为独立模块,并且把接口有关的POJO、默认的Feign配置都放到这个模块中,提供给所有消费者使用
这思想有点类似于Vue组件,越抽越小,一个小组件可以复用于多个父组件
优点就是可复用,缺点就是可能有许多内部api其他服务不需要,会造成浪费
基于抽取的实践
创建新maven子项目
引入feign依赖
1
2
3
4<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>将服务消费方(order-service)中编写(可复用的)的UserClient、User、DefaultFeignConfiguration复制到feign-api项目中
在服务消费方(order-service)中使用feign-api(删除order-service中的UserClient、User、DefaultFeignConfiguration等类或接口),引入创建的feign-api依赖(若引入失败可尝试
package
一下feign-api,以及查看其父子依赖是否成功)1
2
3
4
5<dependency>
<groupId>cn.itcast.demo</groupId>
<artifactId>feign-api</artifactId>
<version>1.0</version>
</dependency>修改相关导包
扫描包(类)
这里因为UserClient现在在cn.itcast.feign.clients包下,而order-service的@EnableFeignClients注解是在cn.itcast.order包下,不在同一个包,无法扫描到UserClient,所以我们需要指定Feign扫描的包,两种方式,一种指定包,一种到具体的类
简介与功能
Springcloud Gateway是Spring Cloud的一个全新项目,基于Spring5.0+SpringBoot2.0和Project Reactor 等技术开发的网关,它旨在为微服务架构提供一种简单有效的统一的API路由管理方式,提供统一的路由方式且基于Filter链的方式提供了网关基本的功能,例如:安全,监控/指标,限流等
网关功能
- 身份认证和权限校验
- 服务路由、负载均衡
- 请求限流(防止请求流量过高时服务爆炸)
技术实现
- gateway
- zuul
zuul为阻塞式编程、gateway为响应式编程(后边补充编程类型)
gateway快速入门
创建网关项目(简单的maven项目,继承于spring-cloud)
引入依赖
1
2
3
4
5
6
7
8
9
10<!--网关-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>
<!--nacos服务发现依赖-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>编写启动类
1
2
3
4
5
6
7
8
9
10
11package cn.itcast.gateway;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
public class GatewayApplication {
public static void main(String[] args) {
SpringApplication.run(GatewayApplication.class, args);
}
}编写基础配置和路由规则
网关端口、服务名称、nacos都是一些之前的配置,重点在gateway,里面包括配置路由,routers为一个数组,id为路由id、uri为路由地址,predicates是设置判断请求是否符合路由规则的条件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19server:
port: 10010 # 网关端口
spring:
application:
name: gateway # 服务名称
cloud:
nacos:
server-addr: localhost:8848 # nacos地址
gateway:
routes: # 网关路由配置
- id: user-service # 路由id,自定义,只要唯一即可
# uri: http://127.0.0.1:8081 # 路由的目标地址 http就是固定地址
uri: lb://userservice # 路由的目标地址 lb就是负载均衡,后面跟服务名称
predicates: # 路由断言,也就是判断请求是否符合路由规则的条件
- Path=/user/** # 这个是按照路径匹配,只要以/user/开头就符合要求
- id: order-service
uri: lb://orderservice
predicates:
- Path=/order/**网关路由的流程图
断言工厂
我们在配置文件中写的断言规则(predicates
)只是字符串,这些字符串会被Predicate Factory读取并处理,转变为路由判断的条件
名称 | 说明 | 示例 |
---|---|---|
After | 是某个时间点后的请求 | - After=2037-01-20T17:42:47.789-07:00[America/Denver] |
Before | 是某个时间点之前的请求 | - Before=2031-04-13T15:14:47.433+08:00[Asia/Shanghai] |
Between | 是某两个时间点之前的请求 | - Between=2037-01-20T17:42:47.789-07:00[America/Denver], 2037-01-21T17:42:47.789-07:00[America/Denver] |
Cookie | 请求必须包含某些cookie | - Cookie=chocolate, ch.p |
Header | 请求必须包含某些header | - Header=X-Request-Id, \d+ |
Host | 请求必须是访问某个host(域名) | - Host=.somehost.org,.anotherhost.org |
Method | 请求方式必须是指定方式 | - Method=GET,POST |
Path | 请求路径必须符合指定规则 | - Path=/red/{segment},/blue/** |
Query | 请求参数必须包含指定参数 | - Query=name, Jack或者- Query=name |
RemoteAddr | 请求者的ip必须是指定范围 | - RemoteAddr=192.168.1.1/24 |
Weight | 权重处理 |
path就是对路径做判断的断言规则,我们可以尝试其他规则-Between
1 | spring: |
定义了该规则的路由在2037-01-20这个时间点前访问将会报404
过滤器工厂
GatewayFilter是网关中提供的一种过滤器,可以对进入网关的请求和微服务返回的响应做处理
Spring提供了33种不同的路由过滤器工厂,如下图
以AddRequestHeader演示
需求:给所有进入userservice的请求添加一个请求头:Truth=itcast is freaking awesome!
1 | spring: |
当前过滤器会在每次请求userservice路由时添加请求头
默认过滤器
默认过滤器对所有路由生效,通过该网关访问所有请求时会加上该请求头
1 | spring: |
全局过滤器
全局过滤器的作用也是处理一切进入网关的请求和微服务响应,与GatewayFilter的作用一样。区别在于GatewayFilter通过配置定义,处理逻辑是固定的。而GlobalFilter的逻辑需要自己写代码实现。定义方式是实现GlobalFilter接口
通过全局过滤器可以实现如以下需求
- 登录状态判断
- 权限校验
- 请求限流等
模拟请求验证案例
1 |
|
过滤器执行顺序
每一个过滤器都必须指定一个int类型的order值,order值越小,优先级越高,执行顺序越靠前。
GlobalFilter通过实现Ordered接口,或者添加@Order注解来指定order值,由我们自己指定
接口实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
//@Order(-1) //order为过滤器级别,数字越小级别越高
public class AuthorizeFilter implements GlobalFilter, Ordered {
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
//...
}
public int getOrder() {
return 1;
}
}路由过滤器和defaultFilter的order由Spring指定,默认是按照声明顺序从1递增
当过滤器的order值一样时,按照下图所示依次执行:默认过滤器>路由过滤器>全局过滤器
详细内容,可以查看源码(后面补充):
org.springframework.cloud.gateway.route.RouteDefinitionRouteLocator#getFilters()
方法是先加载defaultFilters,然后再加载某个route的filters,然后合并。
org.springframework.cloud.gateway.handler.FilteringWebHandler#handle()
方法会加载全局过滤器,与前面的过滤器合并后根据order排序,组织过滤器链
跨域问题
gateway服务解决跨域
模拟跨域,使用vscode的live-server插件在5501端口打开
1 |
|
后端gateway服务的application.yml配置
1 | spring: |
Docker
简介与作用
Docker 是一个开源的应用容器引擎,让开发者可以打包他们的应用以及依赖包到一个可移植的镜像中,然后发布到任何流行的 Linux或Windows操作系统的机器上,也可以实现虚拟化。容器是完全使用沙箱机制,相互之间不会有任何接口
大型项目组件较多,运行环境也较为复杂,部署时会碰到一些问题:
- 依赖关系复杂,容易出现兼容性问题
- 开发、测试、生产环境有差异
Docker为了解决依赖的兼容问题的,采用了两个手段:
将应用的Libs(函数库)、Deps(依赖)、配置与应用一起打包
将每个应用放到一个隔离容器去运行,避免互相干扰
Docker如何解决不同系统环境的问题?
- Docker将用户程序与所需要调用的系统(比如Ubuntu)函数库一起打包
- Docker运行到不同操作系统时,直接基于打包的函数库,借助于操作系统的Linux内核来运行
Docker是一个快速交付应用、运行应用的技术,具备下列优势:
- 可以将程序及其依赖、运行环境一起打包为一个镜像,可以迁移到任意Linux操作系统
- 运行时利用沙箱机制形成隔离容器,各个应用互不干扰
- 启动、移除都可以通过一行命令完成,方便快捷
Docker和虚拟机的区别
虚拟机(virtual machine)是在操作系统中模拟硬件设备,然后运行另一个操作系统,比如在 Windows 系统里面运行 Ubuntu 系统,这样就可以运行任意的Ubuntu应用了。
Docker仅仅是封装函数库,并没有模拟完整的操作系统
Docker和虚拟机的差异:
- docker是一个系统进程;虚拟机是在操作系统中的操作系统
- docker体积小、启动速度快、性能好;虚拟机体积大、启动速度慢、性能一般
Docker架构
镜像与容器
镜像(Image):Docker将应用程序及其所需的依赖、函数库、环境、配置等文件打包在一起,称为镜像。
容器(Container):镜像中的应用程序运行后形成的进程就是容器,只是Docker会给容器进程做隔离,对外不可见。
DockerHub
开源应用程序非常多,打包这些应用往往是重复的劳动。为了避免这些重复劳动,人们就会将自己打包的应用镜像,例如Redis、MySQL镜像放到网络上,共享使用,就像GitHub的代码共享一样。
DockerHub:DockerHub是一个官方的Docker镜像的托管平台。这样的平台称为Docker Registry。
Docker架构
Docker是一个CS架构的程序,由两部分组成:
服务端(server):Docker守护进程,负责处理Docker指令,管理镜像、容器等
客户端(client):通过命令或RestAPI向Docker服务端发送指令。可以在本地或远程向服务端发送指令。
Docker在centos的安装
移步安装教程->Centos7安装Docker.md
查看docker运行状态
1 | docker version |
Docker的基本操作
镜像
镜像名称一般分两部分组成:[repository]:[tag]
在没有指定tag时,默认是latest,代表最新版本的镜像
Docker操作镜像结构图
拉取镜像
从docker hub查找对应镜像拉取指令
xshell执行该命令
1 | docker pull nginx |
查看现有镜像
1 | docker images |
导出与加载本地镜像
- 利用docker xx –help命令查看docker save和docker load的语法
- 使用docker save导出镜像到磁盘
- 使用docker load加载镜像
1 | docker save -o nginx.tar |
1 | docker load -i nginx.tar |
镜像操作指令
docker images
查看镜像
docker rmi
移除镜像
docker pull
拉取镜像
docker build -t 镜像名:版本 . (.为当前目录)
构建镜像
docker push
推送镜像
docker save
打包镜像
docker load
加载镜像压缩包
docker rmi –force [-f] $(docker images | grep 共有关键字 | awk ‘{print $3}’)
批量删除含共有关键字的镜像
容器操作
容器操作结构图
以运行一个nginx为例,可以通过dockerHub查找相关容器运行指令
1 | docker run --name containerName -p 80:80 -d nginx |
命令解读:
docker run :创建并运行一个容器
–name : 给容器起一个名字,比如叫做mn
-p :将宿主机端口与容器端口映射,冒号左侧是宿主机端口,右侧是容器端口
-d:后台运行容器
nginx:镜像名称,例如nginx
进入容器并修改容器内容(以修改nginx的html为例)
进入容器
1
docker exec -it mn bash
命令解读:
- docker exec :进入容器内部,执行一个命令
- -it : 给当前进入的容器创建一个标准输入、输出终端,允许我们与容器交互
- mn :要进入的容器的名称
- bash:进入容器后执行的命令,bash是一个linux终端交互命令
进入html目录
1
cd /usr/share/nginx/html
修改html文件内容
1
2sed -i 's#Welcome to nginx#你是个什么东西😒#g' index.html
sed -i 's#<head>#<head><meta charset="utf-8">#g' index.html
注意:xec命令可以进入容器修改文件,但是在容器内修改文件是不推荐的
操作redis容器
运行redis
1 | docker run --name rd -p 6379:6379 -d redis redis-server --save 60 1 --loglevel warning |
进入redis 并运行redis-cli
1 | docker exec -it rd redis-cli |
执行redis操作
1 | set num 666 |
退出
1 | exit |
容器操作指令
docker logs
查看容器日志
添加 -f 参数可以持续查看日志
docker ps
查看容器状态码
docker exec -it [容器名] [要执行的命令]
进入容器
docker rm
删除容器
不能删除运行中的容器,除非添加 -f 参数
docker rm $(docker ps -aq)
删除所有未使用的容器
数据卷(容器数据管理)
容器与数据耦合的问题
数据卷(volume)是一个虚拟目录,指向宿主机文件系统中的某个目录
我们就可以直接操作宿主机文件系统下的文件,从而使得容器与数据分离,解耦合,方便操作容器内数据,保证数据安全
数据卷操作
数据卷操作的基本语法:docker volume [COMMAND]
docker volume
命令是数据卷操作,根据命令后跟随的command
来确定下一步的操作:
create 创建一个volume
inspect 显示一个或多个volume的信息
ls 列出所有的volume
prune 删除未使用的volume
rm 删除一个或多个指定的volume
挂载数据卷
在创建容器时,可以通过 -v 参数来挂载一个数据卷到某个容器内目录,没有该数据卷时docker会帮我们创建一个
1 | docker run --name mn -v html:/usr/share/nginx/html -p80:80 -d nginx |
上述内容是将html数据卷挂载到容器的/usr/share/nginx/html目录
修改容器的数据卷内容
1 | # 查看html数据卷的位置 |
目录挂载
可以越过volumes直接将宿主机目录挂载起来
目录挂载与数据卷挂载的语法是类似的:
- -v [宿主机目录]:[容器内目录]
- -v [宿主机文件]:[容器内文件]
- -v [volume名称]:[容器内目录]
两者优劣:
- 数据卷挂载耦合度低,由docker来管理目录,但是目录较深,不好找
- 目录挂载耦合度高,需要我们自己管理目录,不过目录容易寻找查看
创建并运行一个MySQL容器,将宿主机目录直接挂载到容器
通过拉取(pull)或者本地有镜像直接加载(load)
1
docker load -i mysql.tar
查看镜像是否加载
1
docker images
创建目录用于挂载
/tmp/mysql/data
/tmp/mysql/conf
将自定义的配置文件放入conf目录下:hmy.cnf
运行容器,通过docker hub可以查询相关操作
查看mysql容器conf所在目录
通过
-e MYSQL_ROOT_PASSWORD=你的密码
可以设置mysql登陆密码完整执行命令
1
2
3
4
5
6
7
8docker run \
--name mysql \
-e MYSQL_ROOT_PASSWORD=123 \
-p 3306:3306 \
-v /tmp/mysql/conf:/etc/mysql/conf.d \
-v /tmp/mysql/data:/var/lib/mysql \
-d \
mysql:5.7.25Docker自定义镜像
简介
我们自己也可以构建Docker镜像,但需要首先了解镜像的结构
镜像是将应用程序及其需要的系统函数库、环境、配置、依赖打包而成
构建镜像其实就是将上图所示的各层打包形成一个镜像的过程
DockerFile
Dockerfile reference | Docker Documentation
Dockerfile就是一个文本文件,其中包含一个个的指令(Instruction),用指令来说明要执行什么操作来构建镜像。每一个指令都会形成一层Layer,一下是常见的指令
指令 | 说明 | 示例 |
---|---|---|
FROM | 指定基础镜像 | FROM centos:6 |
ENV | 设置环境变量,可在后面指令使用 | ENV key value |
COPY | 拷贝本地文件到镜像的指定目录 | COPY ./mysql-5.7.rpm /tmp |
RUN | 执行Linux的shell命令,一般是安装过程的命令 | RUN yum install gcc |
EXPOSE | 指定容器运行时监听的端口,是给镜像使用者看的 | EXPOSE 8080 |
ENTRYPOINT | 镜像中应用的启动命令,容器运行时调用 | ENTRYPOINT java -jar xx.jar |
DockerFile示例
1 | # 指定基础镜像 |
构建Java项目
一、基于Ubuntu构建
新建一个空文件夹
1
mkdir docker-demo
拷贝jar文件到这个目录
拷贝jdk8.tar.gz文件到这个目录
创建或拷贝Dockerfile到这个目录
dockerfile文件内容如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22# 指定基础镜像
FROM ubuntu:16.04
# 配置环境变量,JDK的安装目录
ENV JAVA_DIR=/usr/local
# 拷贝jdk和java项目的包
COPY ./jdk8.tar.gz $JAVA_DIR/
COPY ./docker-demo.jar /tmp/app.jar
# 安装JDK
RUN cd $JAVA_DIR \
&& tar -xf ./jdk8.tar.gz \
&& mv ./jdk1.8.0_144 ./java8
# 配置环境变量
ENV JAVA_HOME=$JAVA_DIR/java8
ENV PATH=$PATH:$JAVA_HOME/bin
# 暴露端口
EXPOSE 8090
# 入口,java项目的启动命令
ENTRYPOINT java -jar /tmp/app.jar使用命令进入目录
1
cd /tmp/docker-demo
运行命令:docker build -t 镜像名:版本 . (.为当前目录)
使用docker run创建容器并运行
1
docker run --name javaweb -p 8090:8090 -d javaweb:1.0
二、基于java8构建Java项目
在DockerFile里,很多项目都是基于java8进行环境配置的,也就是说,下图所含内容都是重复性工作,每个项目都这么弄会重复工作,所以docker提供了镜像免去这些配置
新建一个空的目录,然后在目录中新建一个文件,命名为Dockerfile
1
2mkdir docker-demo
touch Dockerfile拷贝jar包到这个目录中
编写Dockerfile文件:
- 基于java:8-alpine作为基础镜像
- 将app.jar拷贝到镜像中
- 暴露端口
- 编写入口ENTRYPOINT
1
2
3
4FROM java:8-alpine
COPY ./docker-demo.jar /tmp/app.jar
EXPOSE 8090
ENTRYPOINT java -jar /tmp/app.jar使用docker build命令构建镜像
1
docker build -t javaweb:2.0 .
使用docker run创建容器并运行
1
docker run --name javaweb -p 8090:8090 -d javaweb:1.0
总结:
Dockerfile的本质是一个文件,通过指令描述镜像的构建过程
Dockerfile的第一行必须是FROM,从一个基础镜像来构建
基础镜像可以是基本操作系统,如Ubuntu。也可以是其他人制作好的镜像,例如:java:8-alpine
Docker-Compose
可爱的🐙章鱼镇楼
简介
当我们有多个微服务需要构建时,用自定义镜像的方式去一个个构建难免有点麻烦,Docker-Compose可以帮助我们批量处理,只需要定义好各个包的相关属性即可
Docker Compose可以基于Compose文件帮我们快速的部署分布式应用,而无需手动一个个创建和运行容器
Compose文件是一个文本文件(yaml格式),通过指令定义集群中的每个容器如何运行
DockerCompose的详细语法参考官网
下面的Compose文件就描述一个项目,其中包含两个容器:
- mysql:一个基于
mysql:5.7.25
镜像构建的容器,并且挂载了两个目录 - web:一个基于
docker build
临时构建的镜像容器,映射端口8090(注意:构建自己的项目默认会去找app.jar,所以打包后的java项目一定要命名为app)
1 | version: "3.8" |
DockerCompose文件可以看做是将多个docker run命令写到一个文件,只是语法稍有差异
安装
移步安装教程->Centos7安装Docker.md
相关操作
docker-compose stop 服务名
停止服务,服务名以docker-compose.yml文件里面为准
docker-compose down
上述命令将会停止并删除docker-compose.yml文件中定义的所有容器,同时删除可能存在的网络、卷和映射
docker-compose stop
停止当前目录下所有镜像
docker-compose up -d 服务名
启动某个镜像,若不指定则启动所有镜像
docker-compose build
构建镜像
构建微服务集群(重点)
PS:踩了一天的坑,差点🧚其中,😅
实现思路如下:
创建如下目录
1
2
3
4
5
6
7
8
9
10
11
12
13
14-项目名
-微服务名1
-微服务名2
-微服务名...
-mysql
-conf
-data
docker-compose.yml我的项目如下所示:
每个微服务目录下新建Dockerfile文件,并填入以下内容
1
2
3FROM java:8-alpine
COPY ./app.jar /tmp/app.jar
ENTRYPOINT java -jar /tmp/app.jar含义是根据java:8-alpine进行打包,并构建当前目录下的app.jar包,所以,接下来你知道了吧,将每个微服务打包成名为app.jar包
用idea在每个微服务的pom.xml文件夹下修改或添加以下内容,用于将当前微服务打包为app.jar,记住,根目录下的就不要加这个东西了,在微服务里加
1
2
3
4
5
6
7
8
9<build>
<finalName>app</finalName>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>既然来到了idea,我们需要再改个东西,每个微服务请求的其他服务理应都该是服务名称,否则一旦部署到其他环境,ip地址改变,我们就无从下手了(当然肯定有办法的只是比较麻烦),这里的nacos以及mysql等一些需要请求的服务ip都改为服务名称
1
2
3
4spring:
datasource:
#url: jdbc:mysql://localhost:3306/cloud_user?useSSL=false
url: jdbc:mysql://mysql:3306/cloud_user?useSSL=false #用于docker部署设置的微服务名1
2
3
4
5
6spring:
cloud:
nacos:
#server-addr: localhost:8848 # Nacos地址
#server-addr: localhost:80 # nginx反向代理Nacos
server-addr: nacos:8848 # 用于docker部署设置的微服务名然后就是进行打包操作了,gogogo,在项目根目录执行package操作
然后,将打包后的各个jar包拖至前面创建好的哥哥微服务目录下
接下来配置下mysql,mysql目录下的conf目录是用于存储相关的配置文件,文件命名为
hmy.cnf
,文件具体内容可以从网上查找,也可以直接复制下面的data目录用于存储一些mysql的数据,这里先不管了,后面构建完再创建也行
1
2
3
4
5[mysqld]
skip-name-resolve
character_set_server=utf8
datadir=/var/lib/mysql
server-id=1000然后,进到docker-compose.yml文件里,我们需要配置各个微服务的构建
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26version: "3.2"
services:
nacos:
image: nacos/nacos-server
environment:
MODE: standalone
ports:
- "8848:8848"
mysql:
image: mysql:5.7.25
environment:
MYSQL_ROOT_PASSWORD: 123456
volumes:
- "$PWD/mysql/data:/var/lib/mysql"
- "$PWD/mysql/conf:/etc/mysql/conf.d/"
ports:
- "3306:3306"
userservice:
build: ./user-service
orderservice:
build: ./order-service
gateway:
build: ./gateway
ports:
- "10010:10010"至此,整体文件创建配置完成,下面就要到服务器上的配置了,整体目录结构如下
将整个目录上传至虚拟机,在目录根路径下利用
docker-compose up -d
来部署,你可以输入docker-compose --help
去查看相关操作,这时候,项目就会自动化构建并运行了还没完,这时候我们通过
docker-compose logs 某个服务名
会发现服务会报错,因为在这里,nacos启动比其他微服务慢了,导致其他项目启动时没发现nacos,无法注册服务导致抛出异常,所以在这里我们需要重启下其他微服务,重启前最好重启下nacos服务,保证在浏览器能访问到http://服务器地址:8848/nacos/index.html
后再进行下一步1
2docker-compose restart nacos
docker-compose restart 微服务1 微服务2 微服务...在上一步完成后,服务相关的设置就已经ok了,但是这时的mysql还没有数据,我们需要在自己电脑上将本地的数据进行sql导出,然后连接上linux里面的数据库,将数据导入
大功告成,浏览器输入地址享用吧,在这时你也可以将mysql的data目录进行备份处理,这样在其他的系统就可以直接拷贝过去,无需执行第十步了
Docker镜像仓库
简介
registry - Official Image | Docker Hub
镜像仓库( Docker Registry )有公共的和私有的两种形式:
- 公共仓库:例如Docker官方的 Docker Hub,国内也有一些云服务商提供类似于 Docker Hub 的公开服务,比如 网易云镜像服务、DaoCloud 镜像服务、阿里云镜像服务等。
- 私有仓库:用户在本地搭建的私有 Docker Registry,企业自己的镜像最好是采用私有Docker Registry来实现
安装并搭建私有镜像仓库
移步安装教程->Centos7安装Docker.md
推送镜像和拉取镜像
记住:记得重新打包(tag)时,一定要加上镜像仓库地址(如下)
tag本地镜像
1 | docker tag nginx:latest 你的ip:8080/nginx:1.0 |
推送镜像
1 | docker push 你的ip:8080/nginx:1.0 |
拉取镜像
1 | docker pull 你的ip:8080/nginx:1.0 |
RabbitMQ-服务异步通讯
同步通讯和异步通讯
同步通讯和异步通讯的区别:
Feign调用就属于同步方式
同步调用的优点:时效性较强,可以立即得到结果
但是同步调用存在以下问题
异步调用常见实现就是事件驱动模式
异步通信的优点:
耦合度低
每个服务都可以灵活插拔,可替换
吞吐量提升
无需等待订阅者处理完成,响应更快速
故障隔离
服务没有直接调用,不存在级联失败问题
流量削峰
不管发布事件的流量波动多大,都由Broker接收,订阅者可以按照自己的速度去处理事件
调用间没有阻塞
不会造成无效的资源占用
异步通信的缺点:
- 依赖于Broker的可靠性、安全性、吞吐能力
- 架构复杂了,业务没有明显的流程线,不好追踪管理
RabbitMQ
MQ含义:
MQ,中文是消息队列(MessageQueue),字面来看就是存放消息的队列。也就是事件驱动架构中的Broker
RabbitMQ
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库
常见的MQ技术以及对比
RabbitMQ | ActiveMQ | RocketMQ | Kafka | |
---|---|---|---|---|
公司/社区 | Rabbit | Apache | 阿里 | Apache |
开发语言 | Erlang | Java | Java | Scala&Java |
协议支持 | AMQP,XMPP,SMTP,STOMP | OpenWire,STOMP,REST,XMPP,AMQP | 自定义协议 | 自定义协议 |
可用性 | 高 | 一般 | 高 | 高 |
单机吞吐量 | 一般 | 差 | 高 | 非常高 |
消息延迟 | 微秒级 | 毫秒级 | 毫秒级 | 毫秒以内 |
消息可靠性 | 高 | 一般 | 高 | 一般 |
RabbitMQ结构图
RabbitMQ中的一些角色:
- publisher:生产者,发送消息的程序
- consumer:消费者,订阅队列
- exchange:交换机,负责消息路由,一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型
- queue:队列,存储消息
- virtualHost:虚拟主机,隔离不同租户的exchange、queue、消息的隔离
RabbitMQ单机版安装
这里在Centos7使用Docker安装
下载镜像
1
docker pull rabbitmq:3-management
安装并运行Rabbit容器
1
2
3
4
5
6
7
8
9docker run \
-e RABBITMQ_DEFAULT_USER=dong \
-e RABBITMQ_DEFAULT_PASS=333333 \
--name mq \
--hostname mq1 \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3-management浏览器输入
你的ip:15672登录访问即可
,登录名和密码分别对应RABBITMQ_DEFAULT_USER
和RABBITMQ_DEFAULT_PASS
RabbitMQ消息模型
入门案例(基本消息队列)
官方的HelloWorld是基于最基础的消息队列模型来实现的,只包括三个角色:
- publisher:消息发布者,将消息发送到队列queue
- queue:消息队列,负责接受并缓存消息
- consumer:订阅队列,处理队列中的消息
publisher(发布者代码)
思路:
- 建立连接
- 创建Channel
- 声明队列
- 发送消息
- 关闭连接和channel
1 | package cn.itcast.mq.helloworld; |
consumer(消费者代码)
思路:
- 建立connection
- 创建channel
- 利用channel声明队列
- 定义consumer的消费行为handleDelivery()
- 利用channel将消费者与队列绑定
消费者和发布者都会建立连接以及创建通道和队列,原因是消费者不知道发布者是否创建了上述东西,所以为了保证存在所以需要创建
1 | public class ConsumerTest { |
主要在以下选项执行,可以通过断点查看各个状态
SpringAMQP
SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配
SpringAMQP提供了三个功能:
- 自动声明队列、交换机及其绑定关系
- 基于注解的监听器模式,异步接收消息
- 封装了RabbitTemplate工具,用于发送消息
Basic Queue 简单队列模型
引入依赖
1
2
3
4
5<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>配置文件
在消息发送方和消息接收方都需配置
1
2
3
4
5
6
7spring:
rabbitmq:
host: 192.168.150.101 # 主机名
port: 5672 # 端口
virtual-host: / # 虚拟主机
username: dong # 用户名
password: 333333 # 密码消息发送
注意:消息发送前需在rabbit创建队列名称
1
2
3
4
5
6
7
8
9
public void testSimpleQueue() {
//队列名称
String queueName = "simple.queue";
//消息
String message = "hello, spring amqp!";
//发送信息
rabbitTemplate.convertAndSend(queueName, message);
}消息接收
定义一个SpringRabbitListener类并注入为组件
使用
RabbitListener
注解去实现监听,注解参数添加监听的队列,通过参数接收监听到的值1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20package cn.itcast.mq.listener;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.util.Map;
public class SpringRabbitListener {
public void listenSimpleQueueMessage(String msg) {
System.out.println(msg);
}
}
注意:消息一旦消费就会从队列删除,RabbitMQ没有消息回溯功能
Work Queue 工作队列模型
Work queue工作队列:让多个消费者绑定到一个队列,共同消费队列中的消息,可以提高消息处理速度,避免队列消息堆积
消息接收者,在这里定义两个,分别为消费者一和消费者二,同时监听simple.queue队列,两者区别是消费者1每20毫秒就能处理一个信息,消费者2每100毫秒才能处理一个信息,通过这样模拟两者的性能差异,消费者一比消费者二性能优秀些
1 |
|
消息发布者:这里模拟一秒内每隔20毫秒发布一个消息,总共会发布50个消息
1 | // TODO: 2022/9/26 模拟工作队列 |
进行测试后发现:消息会被平均分,每个消费者预取25条信息(这里涉及到一个专业名词-消息预取,即消费者不管自己处理能力如何,都会一股脑将发布者的消息取过来,放在自己队列中,一个一个处理。)每个消费者处理一半消息,这样就造成了消费者1很快完成了自己的25条消息。消费者2却在缓慢的处理自己的25条消息
进行能者多劳式分配
1 | spring: |
发布订阅模型
发布订阅模式与之前案例的区别就是允许将同一消息发送给多个消费者。实现方式是加入了exchange(交换机)
常见exchange类型包括:
- Fanout:广播,将消息交给所有绑定到交换机的队列
- Direct:定向,把消息交给符合指定routing key 的队列
- Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
注意:exchange负责消息路由,而不是存储,路由失败则消息丢失
发布、订阅模型-Fanout
在广播模式下,消息发送流程是这样的:
- 1) 可以有多个队列
- 2) 每个队列都要绑定到Exchange(交换机)
- 3) 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定
- 4) 交换机把消息发送给绑定过的所有队列
- 5) 订阅队列的消费者都能拿到消息
实践:
编写配置类用于声明交换机和队列以及他们之间的绑定
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41package cn.itcast.mq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
public class FanoutConfig {
// TODO: 2022/9/26 声明交换机
public FanoutExchange fanoutExchange() {
return new FanoutExchange("dong.fanout");
}
// TODO: 2022/9/26 声明队列
public Queue queue() {
return new Queue("fanout.queue1");
}
// TODO: 2022/9/26 交换机绑定队列
public Binding bindingQueue1(FanoutExchange fanoutExchange, Queue queue) {
return BindingBuilder.bind(queue).to(fanoutExchange);
}
// TODO: 2022/9/26 声明队列
public Queue queue2() {
return new Queue("fanout.queue2");
}
// TODO: 2022/9/26 交换机绑定队列
public Binding bindingQueue2(FanoutExchange fanoutExchange, Queue queue2) {
return BindingBuilder.bind(queue2).to(fanoutExchange);
}
}消息发送
这里的消息发送和之前有些许不同,是通过交换机进行convertAndSend方法的第二个参数为routingKey,即根据路由规则进行消息传送,后面会涉及
1
2
3
4
5
6
7
8// TODO: 2022/9/26 Fanout消息发送
public void testFanoutExchange() {
// TODO: 2022/9/26 交换机
String exchangeName = "dong.fanout";
Object message = "hello, everyone!";
rabbitTemplate.convertAndSend(exchangeName, "", message);
}消息接收
1
2
3
4
5
6
7
8
9
public void listenFanoutQueue1(String msg) throws InterruptedException {
System.err.println("消费者1:" + msg + "_" + LocalDateTime.now());
}
public void listenFanoutQueue2(String msg) throws InterruptedException {
System.err.println("消费者2:" + msg + "_" + LocalDateTime.now());
}发布、订阅模型-Direct
Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此称为路由模式(routes)
- 每一个Queue都与Exchange设置一个BindingKey
- 发布者发送消息时,指定消息的RoutingKey
- Exchange将消息路由到BindingKey与消息RoutingKey一致的队列
实践:
消费者接收信息
这里抛弃上面的bean注入实现交换机和队列的绑定,换成通过注解去实现交换机和队列的绑定以及关键字的绑定
1 | // TODO: 2022/9/26 注解实现directQueue |
发布者发布信息
convertAndSend
方法第二个参数传入指定的路由规则,可以自定义尝试,查看结果
1 |
|
发布、订阅模型-Topic
TopicExchange与DirectExchange类似,区别在于routingKey必须是多个单词的列表,并且以 .
分割
Queue与Exchange指定BindingKey时可以使用通配符:
- #:代指0个或多个单词
- *:代指一个单词
实践:
消息接收者,与上面唯一的区别就是key通过字符串去定义
1 | // TODO: 2022/9/26 实现topicExchange |
消息发送者
1 |
|
消息转换器
Spring会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象
Spring的对消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter
来处理的。而默认实现是SimpleMessageConverter
,基于JDK的ObjectOutputStream
完成序列化,存在以下问题
- 数据体积过大
- 有安全漏洞
- 可读性差
配置JSON转换器
通过json格式实现将多种类型数据转化为json字符串,提高消息可读性
引入json转换依赖
1
2
3
4
5
6<!--jackson-->
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>在消息发送方和消息接收方的启动类中都注入一个json转换的Bean,记得MessageConverter引入的包名为
org.springframework.amqp.support.converter.MessageConverter;
1
2
3
4
public MessageConverter jsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}分布式搜索引擎
-Elasticsearch:官方分布式搜索和分析引擎 | Elastic
简介
Elasticsearch 是一个分布式、高扩展、高实时的搜索与数据分析引擎。它能很方便的使大量数据具有搜索、分析和探索的能力。充分利用Elasticsearch的水平伸缩性,能使数据在生产环境变得更有价值。Elasticsearch 的实现原理主要分为以下几个步骤,首先用户将数据提交到Elasticsearch 数据库中,再通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据,当用户搜索数据时候,再根据权重将结果排名,打分,再将返回结果呈现给用户
任何搜索都可以基于该技术实现:如谷歌搜索、GitHub搜索、电商甚至地图等
ELK技术栈
elasticsearch结合kibana、Logstash、Beats,也就是elastic stack(ELK)。被广泛应用在日志数据分析、实时监控等领域
elasticsearch是elastic stack的核心,负责存储、搜索、分析数据,构建于 Lucence 之上,Lucence是Apache的开源搜索引擎类库,提供了搜索引擎的核心API
倒排索引
正向索引
但如果是基于title做模糊查询,只能是逐行扫描数据,流程如下:
1)用户搜索数据,条件是title符合"%手机%"
2)逐行获取数据,比如id为1的数据
3)判断数据中的title是否符合用户搜索条件
4)如果符合则放入结果集,不符合则丢弃。回到步骤1
倒排索引
创建倒排索引是对正向索引的一种特殊处理,流程如下:
- 将每一个文档的数据利用算法分词,得到一个个词条
- 创建表,每行数据包括词条、词条所在文档id、位置等信息
- 因为词条唯一性,可以给词条创建索引,例如hash表结构索引
倒排索引的搜索流程如下(以搜索”华为手机”为例):
1)用户输入条件"华为手机"
进行搜索。
2)对用户输入内容分词,得到词条:华为
、手机
。
3)拿着词条在倒排索引中查找,可以得到包含词条的文档id:1、2、3。
4)拿着文档id到正向索引中查找具体文档。
两者优缺点
正向索引:
- 优点:
- 可以给多个字段创建索引
- 根据索引字段搜索、排序速度非常快
- 缺点:
- 根据非索引字段,或者索引字段中的部分词条查找时,只能全表扫描。
倒排索引:
- 优点:
- 根据词条搜索、模糊搜索时,速度非常快
- 缺点:
- 只能给词条创建索引,而不是字段
- 无法根据字段做排序
elasticsearch
文档和字段
elasticsearch是面向文档存储的,可以是数据库中的一条商品数据,一个订单信息。
文档数据会被序列化为json格式后存储在elasticsearch中,而Json文档中往往包含很多的字段(Field),类似于数据库中的列
索引和映射
索引(Index):就是相同类型的文档的集合
映射(mapping):索引中文档的字段约束信息,类似表的结构约束
mysql和elasticsearch的对比
MySQL | Elasticsearch | 说明 |
---|---|---|
Table | Index | 索引(index),就是文档的集合,类似数据库的表(table) |
Row | Document | 文档(Document),就是一条条的数据,类似数据库中的行(Row),文档都是JSON格式 |
Column | Field | 字段(Field),就是JSON文档中的字段,类似数据库中的列(Column) |
Schema | Mapping | Mapping(映射)是索引中文档的约束,例如字段类型约束。类似数据库的表结构(Schema) |
SQL | DSL | DSL是elasticsearch提供的JSON风格的请求语句,用来操作elasticsearch,实现CRUD |
- Mysql:擅长事务类型操作,可以确保数据的安全和一致性
- Elasticsearch:擅长海量数据的搜索、分析、计算
两者用途
- 对安全性要求较高的写操作,使用mysql实现
- 对查询性能要求较高的搜索需求,使用elasticsearch实现
- 两者再基于某种方式,实现数据的同步,保证一致性
安装
SpringCloud\hotel-demo\资料\安装elasticsearch
索引库操作
mapping是对索引库中文档的约束,常见的mapping属性包括:
- type:字段数据类型,常见的简单类型有:
- 字符串:text(可分词的文本)、keyword(精确值,例如:品牌、国家、ip地址)
- 数值:long、integer、short、byte、double、float、
- 布尔:boolean
- 日期:date
- 对象:object
- index:是否创建索引,默认为true
- analyzer:使用哪种分词器
- properties:该字段的子字段
1 | { |
创建索引库和映射
基本语法:
- 请求方式:PUT
- 请求路径:/索引库名,可以自定义
- 请求参数:mapping映射
1 | PUT /索引库名称 |
查询索引库
基本语法:
- 请求方式:GET
- 请求路径:/索引库名
- 请求参数:无
1 | GET /索引库名 |
修改索引库
倒排索引结构虽然不复杂,但是一旦数据结构改变(比如改变了分词器),就需要重新创建倒排索引,因此索引库一旦创建,无法修改mapping
但是允许添加新的字段到mapping中,因为不会对倒排索引产生影响
1 | PUT /索引库名/_mapping |
删除索引库
语法:
- 请求方式:DELETE
- 请求路径:/索引库名
- 请求参数:无
1 | DELETE /索引库名 |
文档操作
新增文档
1 | POST /索引库名/_doc/文档id |
查询文档
1 | GET /{索引库名称}/_doc/{id} |
删除文档
1 | DELETE /{索引库名}/_doc/id值 |
修改文档
修改有两种方式:
- 全量修改:直接覆盖原来的文档,如无此数据,则会当作新增处理
- 增量修改:修改文档中的部分字段
全量修改
1 | PUT /{索引库名}/_doc/文档id |
增量修改
1 | POST /{索引库名}/_update/文档id |
RestAPI操作索引库
Elasticsearch Clients | Elastic
ES官方提供了各种不同语言的客户端,用来操作ES。这些客户端的本质就是组装DSL语句,通过http请求发送给ES
学习Java HighLevel Rest Client客户端API
-> hotel-demo
导入案例前需加入以下数据
-> tb_hotel.sql
初始化项目
引入es的RestHighLevelClient依赖:
1
2
3
4<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
</dependency>需要覆盖默认的ES版本
1
2
3
4<properties>
<java.version>1.8</java.version>
<elasticsearch.version>7.12.1</elasticsearch.version>
</properties>在测试类中创建@BeforeEach和@AfterEach方法,再每次测试前后都会去创建elastic客户端连接以及关闭连接操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17public class HotelIndexTest {
private RestHighLevelClient client;
//测试前
void setUp() {
// TODO: 2022/9/29 创建elastic客户端连接
this.client = new RestHighLevelClient(RestClient.builder(HttpHost.create("http://192.168.254.130:9200")));
}
//测试后
void tearDown() throws IOException {
// TODO: 2022/9/29 关闭连接
this.client.close();
}
}索引库操作 ->test/HotelIndexTest
创建索引库
删除索引库
1 | // TODO: 2022/9/29 删除索引 |
判断索引库是否存在
1 |
|
总结
JavaRestClient操作elasticsearch的流程基本类似。核心是client.indices()方法来获取索引库的操作对象。
索引库操作的基本步骤:
- 初始化RestHighLevelClient
- 创建XxxIndexRequest。XXX是Create、Get、Delete
- 准备DSL( Create时需要,其它是无参)
- 发送请求。调用RestHighLevelClient#indices().xxx()方法,xxx是create、exists、delete
RestClient操作文档
文档操作->HotelDocumentTest
初始化操作同索引库操作一致,唯一区别加入IHotelService进行mysql数据库查询操作
1 |
|
新增文档
1 | // TODO: 2022/9/29 新增文档 |
查询文档
1 | // TODO: 2022/9/29 查询文档 |
删除文档
1 | // TODO: 2022/9/29 删除文档 |
修改文档
修改文档数据有两种方式:
- 方式一:全量更新,再次写入id一样的文档,就会删除旧文档,添加新文档
- 方式二:局部更新,只更新部分字段,下面演示方式二
1 | // TODO: 2022/9/29 修改文档 |
批量导入文档
批量处理BulkRequest,其本质就是将多个普通的CRUD请求组合在一起发送
其中提供了一个add方法,用来添加其他请求:
可以看到,能添加的请求包括:
- IndexRequest,也就是新增
- UpdateRequest,也就是修改
- DeleteRequest,也就是删除
1 | // TODO: 2022/9/29 批量插入数据 |
DSL查询文档
简介
Elasticsearch提供了基于JSON的DSL(Domain Specific Language)来定义查询。常见的查询类型包括:
查询所有:查询出所有数据,一般测试用。例如:match_all
全文检索(full text)查询:利用分词器对用户输入内容分词,然后去倒排索引库中匹配。例如:
- match_query
- multi_match_query
精确查询:根据精确词条值查找数据,一般是查找keyword、数值、日期、boolean等类型字段。例如:
- ids
- range
- term
地理(geo)查询:根据经纬度查询。例如:
- geo_distance
- geo_bounding_box
复合(compound)查询:复合查询可以将上述各种查询条件组合起来,合并查询条件。例如:
- bool
- function_score
简单全部查询
1 | GET hotel/_search |
全文检索查询
全文检索查询的基本流程如下:
- 对用户搜索的内容做分词,得到词条
- 根据词条去倒排索引库中匹配,得到文档id
- 根据文档id找到文档,返回给用户
match单字段查询
1 | GET /indexName/_search |
案例:
指定字段查询
1 | GET hotel/_search |
对所有字段查询
1 | GET hotel/_search |
multi_match多字段查询
1 | GET /indexName/_search |
案例
1 | GET hotel/_search |
注意:multi_match搜索字段越多,对查询性能影响越大
精确匹配查询
精确查询一般是查找keyword、数值、日期、boolean等类型字段。所以不会对搜索条件分词。常见的有:
- term:根据词条精确值查询
- range:根据值的范围查询
term查询
精确查询的字段是不分词的字段,因此查询的条件也必须是不分词的词条
1 | // term查询 |
案例
1 | GET hotel/_search |
range查询
范围查询,一般应用在对数值类型做范围过滤的时候
1 | // range查询 |
案例
1 | GET hotel/_search |
地理查询
所谓的地理坐标查询,其实就是根据经纬度查询
矩形范围查询
geo_bounding_box查询,查询坐标落在某个矩形范围的所有文档
1 | // geo_bounding_box查询 |
案例
1 | GET hotel/_search |
点距离(缓冲区)查询
查询到指定中心点小于某个距离值的所有文档
1 | // geo_distance 查询 |
案例
1 | GET hotel/_search |
复合查询
复合(compound)查询:复合查询可以将其它简单查询组合起来,实现更复杂的搜索逻辑。常见的有两种:
- fuction score:算分函数查询,可以控制文档相关性算分,控制文档排名
- bool query:布尔查询,利用逻辑关系组合多个其它的查询,实现复杂搜索
相关性算分
当我们利用match查询时,文档结果会根据与搜索词条的关联度打分(_score),返回结果时按照分值降序排列
在elasticsearch中,早期使用的打分算法是TF-IDF算法,公式如下
后来演变为
在5.1版本,算法改进为BM25
TF-IDF算法有一各缺陷,就是词条频率越高,文档得分也会越高,单个词条对文档影响较大。而BM25则会让单个词条的算分有一个上限,曲线更加平滑
相关性算分查询
function score 查询中包含四部分内容:
- 原始查询条件:query部分,基于这个条件搜索文档,并且基于BM25算法给文档打分,原始算分(query score)
- 过滤条件:filter部分,符合该条件的文档才会重新算分
- 算分函数:符合filter条件的文档要根据这个函数做运算,得到的函数算分(function score),有四种函数
- weight:函数结果是常量
- field_value_factor:以文档中的某个字段值作为函数结果
- random_score:以随机数作为函数结果
- script_score:自定义算分函数算法
- 运算模式:算分函数的结果、原始查询的相关性算分,两者之间的运算方式,包括:
- multiply:相乘
- replace:用function score替换query score
- 其它,例如:sum、avg、max、min
function score的运行流程如下:
- 1)根据原始条件查询搜索文档,并且计算相关性算分,称为原始算分(query score)
- 2)根据过滤条件,过滤文档
- 3)符合过滤条件的文档,基于算分函数运算,得到函数算分(function score)
- 4)将原始算分(query score)和函数算分(function score)基于运算模式做运算,得到最终结果,作为相关性算分
整个过程的关键点是:
- 过滤条件:决定哪些文档的算分被修改
- 算分函数:决定函数算分的算法
- 运算模式:决定最终算分结果
案例
1 | GET hotel/_search |
布尔查询
布尔查询是一个或多个查询子句的组合,每一个子句就是一个子查询。子查询的组合方式有:
- must:必须匹配每个子查询,类似“与”
- should:选择性匹配子查询,类似“或”
- must_not:必须不匹配,不参与算分,类似“非”
- filter:必须匹配,不参与算分
搜索时,参与打分的字段越多,查询的性能也越差,建议搜索框的关键字搜索,是全文检索查询,使用must查询,参与算分,其它过滤条件,采用filter查询。不参与算分
1 | GET hotel/_search |
案例:搜索名字包含“如家”,价格不高于400,在坐标31.21,121.5周围10km范围内的酒店
1 | GET /hotel/_search |
搜索结果处理
排序
elasticsearch默认是根据相关度算分(_score)来排序,但是也支持自定义方式对搜索结果排序。可以排序字段类型有:keyword类型、数值类型、地理坐标类型、日期类型等
语法
1 | GET /indexName/_search |
排序条件是一个数组,也就是可以写多个排序条件。按照声明的顺序,当第一个条件相等时,再按照第二个条件排序,以此类推
普通字段排序
1 | GET hotel/_search |
地理坐标排序
1 | GET hotel/_search |
分页
elasticsearch 默认情况下只返回top10的数据。而如果要查询更多数据就需要修改分页参数了。elasticsearch中通过修改from、size参数来控制要返回的分页结果:
- from:从第几个文档开始
- size:总共查询几个文档
类似于mysql中的limit ?, ?
基本分页
1 | GET /hotel/_search |
深度分页
- search after:分页时需要排序,原理是从上一次的排序值开始,查询下一页数据。官方推荐使用的方式。
- scroll:原理将排序后的文档id形成快照,保存在内存。官方已经不推荐使用
分页查询的常见实现方案以及优缺点:
from + size
:- 优点:支持随机翻页
- 缺点:深度分页问题,默认查询上限(from + size)是10000
- 场景:百度、京东、谷歌、淘宝这样的随机翻页搜索
after search
:- 优点:没有查询上限(单次查询的size不超过10000)
- 缺点:只能向后逐页查询,不支持随机翻页
- 场景:没有随机翻页需求的搜索,例如手机向下滚动翻页
scroll
:- 优点:没有查询上限(单次查询的size不超过10000)
- 缺点:会有额外内存消耗,并且搜索结果是非实时的
- 场景:海量数据的获取和迁移。从ES7.1开始不推荐,建议用 after search方案。
高亮
高亮显示的实现分为两步:
- 给文档中的所有关键字都添加一个标签,例如
<em>
标签 - 页面给
<em>
标签编写CSS样式
语法
1 | GET /hotel/_search |
注意:
- 高亮是对关键字高亮,因此搜索条件必须带有关键字,而不能是范围这样的查询。
- 默认情况下,高亮的字段,必须与搜索指定的字段一致,否则无法高亮
- 如果要对非搜索字段高亮,则需要添加一个属性:required_field_match=false
案例
1 | GET hotel/_search |
RestClient查询文档
基本步骤包括:
- 准备Request对象
- 准备请求参数
- 发起请求
- 解析响应基本步骤包括:
- 准备Request对象
- 准备请求参数
- 发起请求
- 解析响应
全部查询matchAll
查询过程
第一步,创建
SearchRequest
对象,指定索引库名第二步,利用
request.source()
构建DSL,DSL中可以包含查询、分页、排序、高亮等query()
:代表查询条件,利用QueryBuilders.matchAllQuery()
构建一个match_all查询的DSL
第三步,利用client.search()发送请求,得到响应
1 |
|
解析过程:
1 | private void handleResponse(SearchResponse response) { |
全文检索match
1 |
|
精确查询term
1 |
|
范围查询range
1 |
|
布尔查询bool
1 |
|
排序sort
1 |
|
分页page
1 |
|
高亮hight
首先是查询条件的相关设置
1 |
|
结果集的映射
- 第一步:从结果中获取
source.hit.getSourceAsString()
,这部分是非高亮结果,json字符串。还需要反序列为HotelDoc对象 - 第二步:获取高亮结果。
hit.getHighlightFields()
,返回值是一个Map,key是高亮字段名称,值是HighlightField对象,代表高亮值 - 第三步:从map中根据高亮字段名称,获取高亮字段值对象HighlightField
- 第四步:从HighlightField中获取Fragments,并且转为字符串。这部分就是真正的高亮字符串了
- 第五步:用高亮的结果替换HotelDoc中的非高亮结果
1 | // TODO: 2022/10/8 高亮格式化 |
算分functionScore
1 | // 2.算分控制 |
地理排序geoDistance
1 | request.source().sort(SortBuilders |
酒店旅游案例
->hotel-demo
数据聚合
简介及分类
聚合(aggregations)可以让我们极其方便的实现对数据的统计、分析、运算,聚合常见的有三类:
- 桶(Bucket)聚合:用来对文档做分组
- TermAggregation:按照文档字段值分组,例如按照品牌值分组、按照国家分组
- Date Histogram:按照日期阶梯分组,例如一周为一组,或者一月为一组
- 度量(Metric)聚合:用以计算一些值,比如:最大值、最小值、平均值等
- Avg:求平均值
- Max:求最大值
- Min:求最小值
- Stats:同时求max、min、avg、sum等
- 管道(pipeline)聚合:其它聚合的结果为基础做聚合
注意:参加聚合的字段必须是keyword、日期、数值、布尔类型
DSL实现聚合
Bucker聚合
1 | GET /hotel/_search |
聚合结果排序
默认情况下,Bucket聚合会统计Bucket内的文档数量,记为_count,并且按照_count降序排序
我们可以指定order属性,自定义聚合的排序方式:
1 | GET hotel/_search |
聚合范围限定
默认情况下,Bucket聚合是对索引库的所有文档做聚合,但真实场景下,用户会输入搜索条件,因此聚合必须是对搜索结果聚合。那么聚合必须添加限定条件
我们可以限定要聚合的文档范围,只要按之前查询的方式添加query条件即可
1 | GET /hotel/_search |
聚合统计(Metric)
对聚合结果的字段进行min、max、avg等求值
1 | GET /hotel/_search |
总结
aggs代表聚合,与query同级,此时query的作用是
- 限定聚合的的文档范围
聚合必须的三要素:
- 聚合名称
- 聚合类型
- 聚合字段
聚合可配置属性有:
- size:指定聚合结果数量
- order:指定聚合结果排序方式
- field:指定聚合字段
RestAPI实现聚合
聚合条件与query条件同级别,因此需要使用request.source()来指定聚合条件
聚合结果解析
酒店实现聚合
需求:搜索页面的品牌、城市等信息不应该是在页面写死,而是通过聚合索引库中的酒店数据得来的:
需求:
用户搜索“东方明珠”,那搜索的酒店肯定是在上海东方明珠附近,因此,城市只能是上海,此时城市列表中就不应该显示北京、深圳、杭州这些信息了
也就是说,搜索结果中包含哪些城市,页面就应该列出哪些城市;搜索结果中包含哪些品牌,页面就应该列出哪些品牌
代码
->hotel-demo->HotelController->getFilters
分词器/自动补全
拼音分词器
简介看下面
安装看这里
测试
1 | GET /_analyze |
自定义分词器
默认的拼音分词器会将每个汉字单独分为拼音,而我们希望的是每个词条形成一组拼音,需要对拼音分词器做个性化定制,形成自定义分词器
elasticsearch中分词器(analyzer)的组成包含三部分:
- character filters:在tokenizer之前对文本进行处理。例如删除字符、替换字符
- tokenizer:将文本按照一定的规则切割成词条(term)。例如keyword,就是不分词;还有ik_smart
- tokenizer filter:将tokenizer输出的词条做进一步处理。例如大小写转换、同义词处理、拼音处理等
1 | #创建基于拼音分词器的索引库 |
测试
1 | GET /test/_analyze |
注意:为了避免搜索到同音字,搜索时不要使用拼音分词器
搜索框自动补全
elasticsearch提供了Completion Suggester查询来实现自动补全功能。这个查询会匹配以用户输入内容开头的词条并返回。为了提高补全查询的效率,对于文档中字段的类型有一些约束:
- 参与补全查询的字段必须是completion类型。
- 字段的内容一般是用来补全的多个词条形成的数组。
比如,一个这样的索引库:
1 | // 创建索引库 |
然后插入下面的数据:
1 | // 示例数据 |
查询的DSL语句如下:
1 | // 自动补全查询 |
RestAPI实现自动补全
自动补全请求
自动补全结果解析
代码
1 |
|
酒店实现自动补全
我们的hotel索引库还没有设置拼音分词器,需要修改索引库中的配置。但是我们知道索引库是无法修改的,只能删除然后重新创建。
另外,我们需要添加一个字段,用来做自动补全,将brand、suggestion、city等都放进去,作为自动补全的提示。
需要完成的步骤如下
修改hotel索引库结构,设置自定义拼音分词器
映射结构->hotel-demo->资料->分词器-自动补全.json->酒店自动补全
修改索引库的name、all字段,使用自定义分词器
索引库添加一个新字段suggestion,类型为completion类型,使用自定义的分词器
给HotelDoc类添加suggestion字段,内容包含brand、business
重点在suggestion的组装
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class HotelDoc {
private Long id;
private String name;
private String address;
private Integer price;
private Integer score;
private String brand;
private String city;
private String starName;
private String business;
private String location;
private String pic;
private Object distance;
private Boolean isAD;
private List<String> suggestion;
public HotelDoc(Hotel hotel) {
this.id = hotel.getId();
this.name = hotel.getName();
this.address = hotel.getAddress();
this.price = hotel.getPrice();
this.score = hotel.getScore();
this.brand = hotel.getBrand();
this.city = hotel.getCity();
this.starName = hotel.getStarName();
this.business = hotel.getBusiness();
this.location = hotel.getLatitude() + ", " + hotel.getLongitude();
this.pic = hotel.getPic();
// 组装suggestion
if(this.business.contains("/")){
// business有多个值,需要切割
String[] arr = this.business.split("/");
// 添加元素
this.suggestion = new ArrayList<>();
this.suggestion.add(this.brand);
Collections.addAll(this.suggestion, arr);
}else {
this.suggestion = Arrays.asList(this.brand, this.business);
}
}
}重新导入数据到hotel库
hotel-demo->test->HotelDocumentTest->testBulkRequest
酒店实现搜索框自动补全代码
->hotel-demo->HotelController->getSuggestions
数据同步
简介与几种同步方式
elasticsearch中的酒店数据来自于mysql数据库,因此mysql数据发生改变时,elasticsearch也必须跟着改变,这个就是elasticsearch与mysql之间的数据同步
同步调用
- hotel-demo对外提供接口,用来修改elasticsearch中的数据
- 酒店管理服务在完成数据库操作后,直接调用hotel-demo提供的接口,
异步通知
- hotel-admin对mysql数据库数据完成增、删、改后,发送MQ消息
- hotel-demo监听MQ,接收到消息后完成elasticsearch数据修改
监听binlog
- 给mysql开启binlog功能
- mysql完成增、删、改操作都会记录在binlog中
- hotel-demo基于canal监听binlog变化,实时更新elasticsearch中的内容
三种方式的优缺点
方式一:同步调用
- 优点:实现简单,粗暴
- 缺点:业务耦合度高
方式二:异步通知
- 优点:低耦合,实现难度一般
- 缺点:依赖mq的可靠性
方式三:监听binlog
- 优点:完全解除服务间耦合
- 缺点:开启binlog增加数据库负担、实现复杂度高
酒店实现数据同步
案例的hotelAdmin是进行酒店管理的系统,这里的需求便是进行酒店增删改时,对应的es文档库对进行相应修改,方式采用MQ结合Nacos和Feign
在hotel-admin和hotel-demo中的cn.itcast.hotel.constatnts
包下新建一个类MqConstants
:
1 | package cn.itcast.hotel.constatnts; |
在两个项目中都引入引入nacos和feign依赖,之后记得启动nacos服务
1 | <!--nacos--> |
进行相关配置分别定义各自名称
1 | spring: |
对hotelAdmin进行操作,在HotelController执行增删改时发送mq消息,建议将该业务移动至service层,符合开发规范
1 |
|
接下来是对hotelDmoe进行操作,调用hotelAdmin服务并完成相关数据更新操作
hotelDemo定义队列交换机
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37package cn.itcast.hotel.config;
import cn.itcast.hotel.constatnts.MqConstants;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
public class MqConfig {
public TopicExchange topicExchange(){
return new TopicExchange(MqConstants.HOTEL_EXCHANGE, true, false);
}
public Queue insertQueue(){
return new Queue(MqConstants.HOTEL_INSERT_QUEUE, true);
}
public Queue deleteQueue(){
return new Queue(MqConstants.HOTEL_DELETE_QUEUE, true);
}
public Binding insertQueueBinding(){
return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(MqConstants.HOTEL_INSERT_KEY);
}
public Binding deleteQueueBinding(){
return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(MqConstants.HOTEL_DELETE_KEY);
}
}编写监听器
在hotel-demo中的
cn.itcast.hotel.mq
包新增一个类1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class HotelListener {
private IHotelService hotelService;
/**
* 监听酒店新增或修改的业务
* @param id 酒店id
*/
public void listenHotelInsertOrUpdate(Long id){
hotelService.insertById(id);
}
/**
* 监听酒店删除的业务
* @param id 酒店id
*/
public void listenHotelDelete(Long id){
hotelService.deleteById(id);
}
}在IHotelService加入deleteById和insertById方法
1
2void deleteById(Long id);
void insertById(Long id);在进行相关删除修改操作前先在HotelDemoApplication中开启Feign注解,用于后续调取hotelAdmin服务
1
2
3
4
public class HotelDemoApplication {}编写feign接口用于调取hotelAdmin服务
1
2
3
4
5
public interface HotelAdmin {
Hotel queryByIds(; Long id)
}在HotelService实现类中注入并实现相关业务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private HotelAdmin hotelAdmin;
public void deleteById(Long id) {
try {
// 1.准备Request
DeleteRequest request = new DeleteRequest("hotel", id.toString());
// 2.发送请求
client.delete(request, RequestOptions.DEFAULT);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public void insertById(Long id) {
try {
// 0.根据id查询酒店数据
// Hotel hotel = getById(id);
// TODO: 2022/10/12 使用feign向注册中心调取服务
final Hotel hotel = hotelAdmin.queryById(id);
// 转换为文档类型
HotelDoc hotelDoc = new HotelDoc(hotel);
// 1.准备Request对象
IndexRequest request = new IndexRequest("hotel").id(hotel.getId().toString());
// 2.准备Json文档
request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);
// 3.发送请求
client.index(request, RequestOptions.DEFAULT);
} catch (IOException e) {
throw new RuntimeException(e);
}
}集群
简介与集群搭建
单机的elasticsearch做数据存储,必然面临两个问题:海量数据存储问题、单点故障问题
- 海量数据存储问题:将索引库从逻辑上拆分为N个分片(shard),存储到多个节点
- 单点故障问题:将分片数据在不同节点备份(replica )
集群相关概念
集群(cluster):一组拥有共同的 cluster name 的 节点。
节点(node) :集群中的一个 Elasticearch 实例
分片(shard):索引可以被拆分为不同的部分进行存储,称为分片。在集群环境下,一个索引的不同分片可以拆分到不同的节点中
解决问题:数据量太大,单点存储量有限的问题。
此处,我们把数据分成3片:shard0、shard1、shard2
主分片(Primary shard):相对于副本分片的定义。
副本分片(Replica shard)每个主分片可以有一个或者多个副本,数据和主分片一样
数据备份可以保证高可用,但是每个分片备份一份,所需要的节点数量就会翻一倍,成本实在是太高了!
为了在高可用和成本间寻求平衡,我们可以这样做:
- 首先对数据分片,存储到不同节点
- 然后对每个分片进行备份,放到对方节点,完成互相备份
这样可以大大减少所需要的服务节点数量,如图,我们以3分片,每个分片备份一份为例:
现在,每个分片都有1个备份,存储在3个节点:
- node0:保存了分片0和1
- node1:保存了分片0和2
- node2:保存了分片1和2
搭建
SpringCloud\hotel-demo\资料\安装elasticsearch
集群职责划分
集群节点的职责划分
节点类型 | 配置参数 | 默认值 | 节点职责 |
---|---|---|---|
master eligible | node.master | true | 备选主节点:主节点可以管理和记录集群状态、决定分片在哪个节点、处理创建和删除索引库的请求 |
data | node.data | true | 数据节点:存储数据、搜索、聚合、CRUD |
ingest | node.ingest | true | 数据存储之前的预处理 |
coordinating | 上面3个参数都为false则为coordinating节点 | 无 | 路由请求到其它节点合并其它节点处理的结果,返回给用户 |
默认情况下,集群中的任何一个节点都同时具备上述四种角色
但是真实的集群一定要将集群职责分离:
- master节点:对CPU要求高,但是内存要求第
- data节点:对CPU和内存要求都高
- coordinating节点:对网络带宽、CPU要求高
职责分离可以让我们根据不同节点的需求分配不同的硬件去部署。而且避免业务之间的互相干扰
集群脑裂问题
脑裂是因为集群中的节点失联导致的。
例如一个集群中,主节点与其它节点失联:
此时,node2和node3认为node1宕机,就会重新选主:
当node3当选后,集群继续对外提供服务,node2和node3自成集群,node1自成集群,两个集群数据不同步,出现数据差异。
当网络恢复后,因为集群中有两个master节点,集群状态的不一致,出现脑裂的情况:
解决脑裂的方案是,要求选票超过 ( eligible节点数量 + 1 )/ 2 才能当选为主,因此eligible节点数量最好是奇数。对应配置项是discovery.zen.minimum_master_nodes,在es7.0以后,已经成为默认配置,因此一般不会发生脑裂问题
集群分布式存储
elasticsearch会通过hash算法来计算文档应该存储到哪个分片:
说明:
- _routing默认是文档的id
- 算法与分片数量有关,因此索引库一旦创建,分片数量不能修改!
可以通过explain属性来查看数据所属分片
1 | { |
新增文档流程图
- 新增一个id=1的文档
- 对id做hash运算,假如得到的是2,则应该存储到shard-2
- shard-2的主分片在node3节点,将数据路由到node3
- 保存文档
- 同步给shard-2的副本replica-2,在node2节点
- 返回结果给coordinating-node节点
集群分布式查询
集群故障转移
集群的master节点会监控集群中的节点状态,如果发现有节点宕机,会立即将宕机节点的分片数据迁移到其它节点,确保数据安全,这个叫做故障转移
可以通过docker stop es01
后查看cerebro状态,es集群会自动进行故障转移
Sentinel
初识
雪崩
微服务调用链路中的某个服务故障,引起整个链路中的所有微服务都不可用
解决雪崩的方式
超时处理
设定超时时间,请求超过一定时间没有响应就返回错误信息,不会无休止等待
舱壁模式
限定每个业务能使用的线程数,避免耗尽整个tomcat的资源,因此也叫线程隔离
熔断降级
由断路器统计业务执行的异常比例,如果超出阈值则会熔断该业务,拦截访问该业务的一切请求
流量控制
限制业务访问的QPS,避免服务因流量的突增而故障
Sentinel
特性
- 丰富的应用场景:Sentinel 承接了阿里巴巴近 10 年的双十一大促流量的核心场景,例如秒杀(即突发流量控制在系统容量可以承受的范围)、消息削峰填谷、集群流量控制、实时熔断下游不可用应用等。
- 完备的实时监控:Sentinel 同时提供实时的监控功能。您可以在控制台中看到接入应用的单台机器秒级数据,甚至 500 台以下规模的集群的汇总运行情况。
- 广泛的开源生态:Sentinel 提供开箱即用的与其它开源框架/库的整合模块,例如与 Spring Cloud、Dubbo、gRPC 的整合。您只需要引入相应的依赖并进行简单的配置即可快速地接入 Sentinel。
- 完善的 SPI 扩展点:Sentinel 提供简单易用、完善的 SPI 扩展接口。您可以通过实现扩展接口来快速地定制逻辑。例如定制规则管理、适配动态数据源等。
Sentinel | Hystrix | |
---|---|---|
隔离策略 | 信号量隔离 | 线程池隔离/信号量隔离 |
熔断降级策略 | 基于慢调用比例或异常比例 | 基于失败比率 |
实时指标实现 | 滑动窗口 | 滑动窗口(基于 RxJava) |
规则配置 | 支持多种数据源 | 支持多种数据源 |
扩展性 | 多个扩展点 | 插件的形式 |
基于注解的支持 | 支持 | 支持 |
限流 | 基于 QPS,支持基于调用关系的限流 | 有限的支持 |
流量整形 | 支持慢启动、匀速排队模式 | 不支持 |
系统自适应保护 | 支持 | 不支持 |
控制台 | 开箱即用,可配置规则、查看秒级监控、机器发现等 | 不完善 |
常见框架的适配 | Servlet、Spring Cloud、Dubbo、gRPC 等 | Servlet、Spring Cloud Netflix |
安装与使用
下载:GitHub
运行:
1 | java -jar sentinel-dashboard-1.8.1.jar |
若出现jdk版本错误提示,先使用下述命令
1 | set Path=你的jdk路径(可在环境变量查看) |
使用:浏览器访问8080端口,账密默认sentinel
修改配置:
可以在jar包目录下新建配置文件,进行下述配置
配置项 | 默认值 | 说明 |
---|---|---|
server.port | 8080 | 服务端口 |
sentinel.dashboard.auth.username | sentinel | 默认用户名 |
sentinel.dashboard.auth.password | sentinel | 默认密码 |
或者命令行进行对应配置
1 | java -jar sentinel-dashboard-1.8.1.jar -Dserver.port=8090 |
微服务整合:
引入sentinel依赖
1
2
3
4
5<!--sentinel-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
</dependency>配置sentinel地址
1
2
3
4
5spring:
cloud:
sentinel:
transport:
dashboard: localhost:8080访问微服务任意端点(接口),触发sentinel监控
流量控制
簇点链路
簇点链路:就是项目内的调用链路,链路中被监控的每个接口就是一个资源。默认情况下sentinel会监控SpringMVC的每一个端点(Endpoint),因此SpringMVC的每一个端点(Endpoint)就是调用链路中的一个资源。
流控、熔断等都是针对簇点链路中的资源来设置的,因此我们可以点击对应资源后面的按钮来设置规则
入门
点击流控按钮:添加规则
这里可以使用jemeter性能测试软件进行测试
流控模式
在添加限流规则时,点击高级选项,可以选择三种流控模式:
- 直接:统计当前资源的请求,触发阈值时对当前资源直接限流,也是默认的模式
- 关联:统计与当前资源相关的另一个资源,触发阈值时,对当前资源限流
- 链路:统计从指定链路访问到本资源的请求,触发阈值时,对指定链路限流
流控模式-关联模式
使用场景:比如用户支付时需要修改订单状态,同时用户要查询订单。查询和修改操作会争抢数据库锁,产生竞争。业务需求是有限支付和更新订单的业务,因此当修改订单业务触发阈值时,需要对查询订单业务限流
满足下面条件可以使用关联模式:
- 两个有竞争关系的资源
- 一个优先级较高,一个优先级较低
流控模式-链路模式
例如有两条请求链路:
/test1 -> /common
/test2 -> /common
如果只希望统计从/test2进入到/common的请求,则可以这样配置:
需求:有查询订单和创建订单业务,两者都需要查询商品。针对从查询订单进入到查询商品的请求统计,并设置限流
步骤
1)添加查询商品方法
在order-service服务中,给OrderService类添加一个queryGoods方法:
1 | public void queryGoods(){ |
2)查询订单时,查询商品
在order-service的OrderController中,修改/order/query端点的业务逻辑:
1 |
|
3)新增订单,查询商品
在order-service的OrderController中,修改/order/save端点,模拟新增订单:
1 |
|
4)给查询商品添加资源标记
默认情况下,OrderService中的方法是不被Sentinel监控的,需要我们自己通过注解来标记要监控的方法。
给OrderService的queryGoods方法添加@SentinelResource注解:
1 |
|
链路模式中,是对不同来源的两个链路做监控。但是sentinel默认会给进入SpringMVC的所有请求设置同一个root资源,会导致链路模式失效。
我们需要关闭这种对SpringMVC的资源聚合,修改order-service服务的application.yml文件:
1 | spring: |
重启服务,访问/order/query和/order/save,可以查看到sentinel的簇点链路规则中,出现了新的资源:
5)添加流控规则
点击goods资源后面的流控按钮,在弹出的表单中填写下面信息:
只统计从/order/query进入/goods的资源,QPS阈值为2,超出则被限流。
流控效果
流控效果是指请求达到流控阈值时应该采取的措施,包括三种:
- 快速失败:达到阈值后,新的请求会被立即拒绝并抛出FlowException异常。是默认的处理方式。
- warm up:预热模式,对超出阈值的请求同样是拒绝并抛出异常。但这种模式阈值会动态变化,从一个较小值逐渐增加到最大阈值。
- 排队等待:让所有的请求按照先后次序排队执行,两个请求的间隔不能小于指定时长
warm up
warm up也叫预热模式,是应对服务冷启动的一种方案。请求阈值初始值是 maxThreshold / coldFactor,持续指定时长后,逐渐提高到maxThreshold值。而coldFactor的默认值是3
例如,我设置QPS的maxThreshold为10,预热时间为5秒,那么初始阈值就是 10 / 3 ,也就是3,然后在5秒后逐渐增长到10
排队等待
排队等待则是让所有请求进入一个队列中,然后按照阈值允许的时间间隔依次执行。后来的请求必须等待前面执行完成,如果请求预期的等待时间超出最大时长,则会被拒绝
例如:QPS = 5,意味着每200ms处理一个队列中的请求;timeout = 2000,意味着预期等待时长超过2000ms的请求会被拒绝并抛出异常。
那什么叫做预期等待时长呢?
比如现在一下子来了12 个请求,因为每200ms执行一个请求,那么:
- 第6个请求的预期等待时长 = 200 * (6 - 1) = 1000ms
- 第12个请求的预期等待时长 = 200 * (12-1) = 2200ms
热点参数限流
之前的限流是统计访问某个资源的所有请求,判断是否超过QPS阈值。而热点参数限流是分别统计参数值相同的请求,判断是否超过QPS阈值
全局参数限流
例如,一个根据id查询商品的接口:
访问/goods/{id}的请求中,id参数值会有变化,热点参数限流会根据参数值分别统计QPS,统计结果:
当id=1的请求触发阈值被限流时,id值不为1的请求不受影响
配置示例:
代表的含义是:对hot这个资源的0号参数(第一个参数)做统计,每1秒相同参数值的请求数不能超过5
热点参数限流
刚才的配置中,对查询商品这个接口的所有商品一视同仁,QPS都限定为5.
而在实际开发中,可能部分商品是热点商品,例如秒杀商品,我们希望这部分商品的QPS限制与其它商品不一样,高一些。那就需要配置热点参数限流的高级选项了:
结合上一个配置,这里的含义是对0号的long类型参数限流,每1秒相同参数的QPS不能超过5,有两个例外:
•如果参数值是100,则每1秒允许的QPS为10
•如果参数值是101,则每1秒允许的QPS为15
注意事项:热点参数限流对默认的SpringMVC资源无效,需要利用@SentinelResource注解标记资源
示例:
给order-service中的OrderController中的/order/{orderId}资源添加注解:
点击左侧菜单中热点规则菜单:
点击新增,填写表单:
隔离和降级
限流是一种预防措施,虽然限流可以尽量避免因高并发而引起的服务故障,但服务还会因为其它原因而故障。
而要将这些故障控制在一定范围,避免雪崩,就要靠线程隔离(舱壁模式)和熔断降级手段了
线程隔离之前讲到过:调用者在调用服务提供者时,给每个调用的请求分配独立线程池,出现故障时,最多消耗这个线程池内资源,避免把调用者的所有资源耗尽。
熔断降级:是在调用方这边加入断路器,统计对服务提供者的调用,如果调用的失败比例过高,则熔断该业务,不允许访问该服务的提供者了。
可以看到,不管是线程隔离还是熔断降级,都是对客户端(调用方)的保护。需要在调用方 发起远程调用时做线程隔离、或者服务熔断。
而我们的微服务远程调用都是基于Feign来完成的,因此我们需要将Feign与Sentinel整合,在Feign里面实现线程隔离和服务熔断。
FeignClient整合Sentinel
修改配置,开启sentinel功能:
修改OrderService的application.yml文件,开启Feign的Sentinel功能:
1 | feign: |
编写失败降级逻辑:
业务失败后,不能直接报错,而应该返回用户一个友好提示或者默认结果,这个就是失败降级逻辑。
给FeignClient编写失败后的降级逻辑
①方式一:FallbackClass,无法对远程调用的异常做处理
②方式二:FallbackFactory,可以对远程调用的异常做处理,我们选择这种
这里我们演示方式二的失败降级处理。
步骤一:在feing-api项目中定义类,实现FallbackFactory:
代码:
1 | package cn.itcast.feign.clients.fallback; |
步骤二:在feing-api项目中的DefaultFeignConfiguration类中将UserClientFallbackFactory注册为一个Bean:
1 |
|
步骤三:在feing-api项目中的UserClient接口中使用UserClientFallbackFactory:
1 | import cn.itcast.feign.clients.fallback.UserClientFallbackFactory; |
重启后,访问一次订单查询业务,然后查看sentinel控制台,可以看到新的簇点链路:
线程隔离(舱壁模式)
线程隔离有两种方式实现:
线程池隔离
信号量隔离(Sentinel默认采用)
如图:
线程池隔离:给每个服务调用业务分配一个线程池,利用线程池本身实现隔离效果
信号量隔离:不创建线程池,而是计数器模式,记录业务使用的线程数量,达到信号量上限时,禁止新的请求。
两者的优缺点:
sentinel线程隔离实践
案例需求:给 order-service服务中的UserClient的查询用户接口设置流控规则,线程数不能超过 2。然后利用jemeter测试
1)配置隔离规则
选择feign接口后面的流控按钮:
填写表单:
2)Jmeter测试
选择《阈值类型-线程数<2》:
一次发生10个请求,有较大概率并发线程数超过2,而超出的请求会走之前定义的失败降级逻辑。
查看运行结果:
发现虽然结果都是通过了,不过部分请求得到的响应是降级返回的null信息。
熔断降级
熔断降级是解决雪崩问题的重要手段。其思路是由断路器统计服务调用的异常比例、慢请求比例,如果超出阈值则会熔断该服务。即拦截访问该服务的一切请求;而当服务恢复时,断路器会放行访问该服务的请求。
断路器控制熔断和放行是通过状态机来完成的:
状态机包括三个状态:
- closed:关闭状态,断路器放行所有请求,并开始统计异常比例、慢请求比例。超过阈值则切换到open状态
- open:打开状态,服务调用被熔断,访问被熔断服务的请求会被拒绝,快速失败,直接走降级逻辑。Open状态5秒后会进入half-open状态
- half-open:半开状态,放行一次请求,根据执行结果来判断接下来的操作。
- 请求成功:则切换到closed状态
- 请求失败:则切换到open状态
断路器熔断策略有三种:慢调用、异常比例、异常数
熔断降级-慢调用
慢调用:业务的响应时长(RT)大于指定时长的请求认定为慢调用请求。在指定时间内,如果请求数量超过设定的最小数量,慢调用比例大于设定的阈值,则触发熔断。
例如:
解读:RT超过500ms的调用是慢调用,统计最近10000ms内的请求,如果请求量超过10次,并且慢调用比例不低于0.5,则触发熔断,熔断时长为5秒。然后进入half-open状态,放行一次请求做测试。
熔断降级-异常比例/异常数
异常比例或异常数:统计指定时间内的调用,如果调用次数超过指定请求数,并且出现异常的比例达到设定的比例阈值(或超过指定异常数),则触发熔断。
例如,一个异常比例设置:
解读:统计最近1000ms内的请求,如果请求量超过10次,并且异常比例不低于0.4,则触发熔断。
一个异常数设置:
解读:统计最近1000ms内的请求,如果请求量超过10次,并且异常比例不低于2次,则触发熔断
授权规则
授权规则可以对请求方来源做判断和控制
授权规则可以对调用方的来源做控制,有白名单和黑名单两种方式。
白名单:来源(origin)在白名单内的调用者允许访问
黑名单:来源(origin)在黑名单内的调用者不允许访问
点击左侧菜单的授权,可以看到授权规则:
资源名:就是受保护的资源,例如/order/{orderId}
流控应用:是来源者的名单,
- 如果是勾选白名单,则名单中的来源被许可访问。
- 如果是勾选黑名单,则名单中的来源被禁止访问。
比如:
我们允许请求从gateway到order-service,不允许浏览器访问order-service,那么白名单中就要填写网关的来源名称(origin)。
Sentinel是通过RequestOriginParser这个接口的parseOrigin来获取请求的来源的。
1 | public interface RequestOriginParser { |
这个方法的作用就是从request对象中,获取请求者的origin值并返回。
默认情况下,sentinel不管请求者从哪里来,返回值永远是default,也就是说一切请求的来源都被认为是一样的值default。
因此,我们需要自定义这个接口的实现,让不同的请求,返回不同的origin。
例如order-service服务中,我们定义一个RequestOriginParser的实现类:
1 | package cn.itcast.order.sentinel; |
我们会尝试从request-header中获取origin值。
既然获取请求origin的方式是从reques-header中获取origin值,我们必须让所有从gateway路由到微服务的请求都带上origin头。
这个需要利用之前学习的一个GatewayFilter来实现,AddRequestHeaderGatewayFilter。
修改gateway服务中的application.yml,添加一个defaultFilter:
1 | spring: |
这样,从gateway路由的所有请求都会带上origin头,值为gateway。而从其它地方到达微服务的请求则没有这个头
接下来,我们添加一个授权规则,放行origin值为gateway的请求。
配置如下:
现在,我们直接跳过网关,访问order-service服务:
通过网关访问:
自定义异常结果
如果要自定义异常时的返回结果,需要实现BlockExceptionHandler接口:
1 | public interface BlockExceptionHandler { |
这个方法有三个参数:
- HttpServletRequest request:request对象
- HttpServletResponse response:response对象
- BlockException e:被sentinel拦截时抛出的异常
这里的BlockException包含多个不同的子类:
异常 | 说明 |
---|---|
FlowException | 限流异常 |
ParamFlowException | 热点参数限流的异常 |
DegradeException | 降级异常 |
AuthorityException | 授权规则异常 |
SystemBlockException | 系统规则异常 |
自定义异常处理类
1 | package cn.itcast.order.sentinel; |
规则持久化
sentinel的所有规则都是内存存储,重启后所有规则都会丢失。在生产环境下,我们必须确保这些规则的持久化,避免丢失
规则是否能持久化,取决于规则管理模式,sentinel支持三种规则管理模式:
- 原始模式:Sentinel的默认模式,将规则保存在内存,重启服务会丢失。
- pull模式
- push模式
pull模式
pull模式:控制台将配置的规则推送到Sentinel客户端,而客户端会将配置规则保存在本地文件或数据库中。以后会定时去本地文件或数据库中查询,更新本地规则。
push模式
push模式:控制台将配置规则推送到远程配置中心,例如Nacos。Sentinel客户端监听Nacos,获取配置变更的推送消息,完成本地配置更新。
实现push模式
->sentinel规则持久化文件
分布式事务
初识
分布式事务,就是指不是在单个服务或单个数据库架构下,产生的事务,例如:
- 跨数据源的分布式事务
- 跨服务的分布式事务
在数据库水平拆分、服务垂直拆分之后,一个业务操作通常要跨多个数据库、服务才能完成。例如电商行业中比较常见的下单付款案例,包括下面几个行为:
- 创建新订单
- 扣减商品库存
- 从用户账户余额扣除金额
完成上面的操作需要访问三个不同的微服务和三个不同的数据库。
订单的创建、库存的扣减、账户扣款在每一个服务和数据库内是一个本地事务,可以保证ACID原则。
但是当我们把三件事情看做一个”业务”,要满足保证“业务”的原子性,要么所有操作全部成功,要么全部失败,不允许出现部分成功部分失败的现象,这就是分布式系统下的事务了。
此时ACID难以满足,这是分布式事务要解决的问题
CAP定理
分布式系统有三个指标。
- Consistency(一致性)
- Availability(可用性)
- Partition tolerance (分区容错性)
- Consistency(一致性):用户访问分布式系统中的任意节点,得到的数据必须一致
- Availability (可用性):用户访问集群中的任意健康节点,必须能得到响应,而不是超时或拒绝
- Partition(分区):因为网络故障或其它原因导致分布式系统中的部分节点与其它节点失去连接,形成独立分区
- Tolerance(容错):在集群出现分区时,整个系统也要持续对外提供服务
在分布式系统中,系统间的网络不能100%保证健康,一定会有故障的时候,而服务有必须对外保证服务。因此Partition Tolerance不可避免
当节点接收到新的数据变更时,就会出现问题了:
如果此时要保证一致性,就必须等待网络恢复,完成数据同步后,整个集群才对外提供服务,服务处于阻塞状态,不可用
如果此时要保证可用性,就不能等待网络恢复,那node01、node02与node03之间就会出现数据不一致
也就是说,在P一定会出现的情况下,A和C之间只能实现一个
BASE理论
BASE理论是对CAP的一种解决思路,包含三个思想:
- Basically Available (基本可用):分布式系统在出现故障时,允许损失部分可用性,即保证核心可用
- Soft State(软状态):在一定时间内,允许出现中间状态,比如临时的不一致状态
- Eventually Consistent(最终一致性):虽然无法保证强一致性,但是在软状态结束后,最终达到数据一致
分布式事务解决思路
分布式事务最大的问题是各个子事务的一致性问题,因此可以借鉴CAP定理和BASE理论,有两种解决思路:
- AP模式:各子事务分别执行和提交,允许出现结果不一致,然后采用弥补措施恢复数据即可,实现最终一致
- CP模式:各个子事务执行后互相等待,同时提交,同时回滚,达成强一致。但事务等待过程中,处于弱可用状态
但不管是哪一种模式,都需要在子系统事务之间互相通讯,协调事务状态,也就是需要一个事务协调者(TC):
这里的子系统事务,称为分支事务;有关联的各个分支事务在一起称为全局事务
SEATA
官网地址:http://seata.io/
Seata事务管理中有三个重要的角色:
- TC (Transaction Coordinator) - 事务协调者:维护全局和分支事务的状态,协调全局事务提交或回滚。
- TM (Transaction Manager) - 事务管理器:定义全局事务的范围、开始全局事务、提交或回滚全局事务。
- RM (Resource Manager) - 资源管理器:管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚
整体的架构如图:
Seata基于上述架构提供了四种不同的分布式事务解决方案:
- XA模式:强一致性分阶段事务模式,牺牲了一定的可用性,无业务侵入
- AT模式:最终一致的分阶段事务模式,无业务侵入,也是Seata的默认模式
- TCC模式:最终一致的分阶段事务模式,有业务侵入
- SAGA模式:长事务模式,有业务侵入
Seata-TC服务部署
-> seata的部署和集成.md
微服务集成Seata
首先,在order-service中引入依赖:
1 | <!--seata--> |
在order-service中的application.yml中,配置TC服务信息,通过注册中心nacos,结合服务名称获取TC地址:
1 | seata: |
微服务如何根据这些配置寻找TC的地址呢?
我们知道注册到Nacos中的微服务,确定一个具体实例需要四个信息:
- namespace:命名空间
- group:分组
- application:服务名
- cluster:集群名
以上四个信息,在刚才的yaml文件中都能找到:
namespace为空,就是默认的public
结合起来,TC服务的信息就是:public@DEFAULT_GROUP@seata-tc-server@SH,这样就能确定TC服务集群了。然后就可以去Nacos拉取对应的实例信息了。
XA模式
简介
XA 规范 是 X/Open 组织定义的分布式事务处理(DTP,Distributed Transaction Processing)标准,XA 规范 描述了全局的TM与局部的RM之间的接口,几乎所有主流的数据库都对 XA 规范 提供了支持
XA是规范,目前主流数据库都实现了这种规范,实现的原理都是基于两阶段提交。
正常情况:
异常情况:
一阶段:
- 事务协调者通知每个事物参与者执行本地事务
- 本地事务执行完成后报告事务执行状态给事务协调者,此时事务不提交,继续持有数据库锁
二阶段:
- 事务协调者基于一阶段的报告来判断下一步操作
- 如果一阶段都成功,则通知所有事务参与者,提交事务
- 如果一阶段任意一个参与者失败,则通知所有事务参与者回滚事务
SEATA的XA模式如下
Seata对原始的XA模式做了简单的封装和改造,以适应自己的事务模型,基本架构如图:
RM一阶段的工作:
① 注册分支事务到TC
② 执行分支业务sql但不提交
③ 报告执行状态到TC
TC二阶段的工作:
TC检测各分支事务执行状态
a.如果都成功,通知所有RM提交事务
b.如果有失败,通知所有RM回滚事务
RM二阶段的工作:
- 接收TC指令,提交或回滚事务
优缺点
优点
- 事务的强一致性,满足ACID原则。
- 常用数据库都支持,实现简单,并且没有代码侵入
缺点
- 因为一阶段需要锁定数据库资源,等待二阶段结束才释放,性能较差
- 依赖关系型数据库实现事务
代码实现
1)修改application.yml文件(每个参与事务的微服务),开启XA模式:
1 | seata: |
2)给发起全局事务的入口方法添加@GlobalTransactional注解:
本例中是OrderServiceImpl中的create方法.
3)重启服务并测试
AT模式
简介
AT模式同样是分阶段提交的事务模型,不过缺弥补了XA模型中资源锁定周期过长的缺陷
Seata的AT模型
基本流程图:
阶段一RM的工作:
- 注册分支事务
- 记录undo-log(数据快照)
- 执行业务sql并提交
- 报告事务状态
阶段二提交时RM的工作:
- 删除undo-log即可
阶段二回滚时RM的工作:
- 根据undo-log恢复数据到更新前
流程梳理
我们用一个真实的业务来梳理下AT模式的原理。
比如,现在又一个数据库表,记录用户余额:
id | money |
---|---|
1 | 100 |
其中一个分支业务要执行的SQL为:
1 | update tb_account set money = money - 10 where id = 1 |
AT模式下,当前分支事务执行流程如下:
一阶段:
1)TM发起并注册全局事务到TC
2)TM调用分支事务
3)分支事务准备执行业务SQL
4)RM拦截业务SQL,根据where条件查询原始数据,形成快照。
1 | { |
5)RM执行业务SQL,提交本地事务,释放数据库锁。此时 money = 90
6)RM报告本地事务状态给TC
二阶段:
1)TM通知TC事务结束
2)TC检查分支事务状态
a)如果都成功,则立即删除快照
b)如果有分支事务失败,需要回滚。读取快照数据({"id": 1, "money": 100}
),将快照恢复到数据库。此时数据库再次恢复为100
流程图:
AT与AX的区别
- XA模式一阶段不提交事务,锁定资源;AT模式一阶段直接提交,不锁定资源。
- XA模式依赖数据库机制实现回滚;AT模式利用数据快照实现数据回滚。
- XA模式强一致;AT模式最终一致
AT模式的优缺点
AT模式的优点:
- 一阶段完成直接提交事务,释放数据库资源,性能比较好
- 利用全局锁实现读写隔离
- 没有代码侵入,框架自动完成回滚和提交
AT模式的缺点:
- 两阶段之间属于软状态,属于最终一致
- 框架的快照功能会影响性能,但比XA模式要好很多
脏写问题
在多线程并发访问AT模式的分布式事务时,有可能出现脏写问题,如图:
解决思路就是引入了全局锁的概念。在释放DB锁之前,先拿到全局锁。避免同一时刻有另外一个事务来操作当前数据。
但上面那种情况只适用于两者都用Seata进行管理,当有一个事务不是由Seata管理时,则没法获取全局锁,解决方式如下:核心就是在释放全局锁时的before-image以及after-image用于确定在该业务过程中是否有其他事务操作过该数据
代码实现
AT模式中的快照生成、回滚等动作都是由框架自动完成,没有任何代码侵入,因此实现非常简单。
只不过,AT模式需要一个表来记录全局锁、另一张表来记录数据快照undo_log。
1)导入数据库表,记录全局锁
导入课前资料提供的Sql文件:seata-at.sql,其中lock_table导入到TC服务关联的数据库,undo_log表导入到微服务关联的数据库:
2)修改application.yml文件,将事务模式修改为AT模式即可:
1 | seata: |
3)重启服务并测试
TCC模式
简介
TCC模式与AT模式非常相似,每阶段都是独立事务,不同的是TCC通过人工编码来实现数据恢复。需要实现三个方法:
Try:资源的检测和预留;
Confirm:完成资源操作业务;要求 Try 成功 Confirm 一定要能成功。
Cancel:预留资源释放,可以理解为try的反向操作。
流程分析
举例,一个扣减用户余额的业务。假设账户A原来余额是100,需要余额扣减30元。
- 阶段一( Try ):检查余额是否充足,如果充足则冻结金额增加30元,可用余额扣除30
初识余额:
余额充足,可以冻结:
此时,总金额 = 冻结金额 + 可用金额,数量依然是100不变。事务直接提交无需等待其它事务。
- 阶段二(Confirm):假如要提交(Confirm),则冻结金额扣减30
确认可以提交,不过之前可用金额已经扣减过了,这里只要清除冻结金额就好了:
此时,总金额 = 冻结金额 + 可用金额 = 0 + 70 = 70元
- 阶段二(Canncel):如果要回滚(Cancel),则冻结金额扣减30,可用余额增加30
需要回滚,那么就要释放冻结金额,恢复可用金额:
Seata的TCC模型
Seata中的TCC模型依然延续之前的事务架构,如图:
TCC模式的每个阶段是做什么的?
- Try:资源检查和预留
- Confirm:业务执行和提交
- Cancel:预留资源的释放
优缺点
优点
- 一阶段完成直接提交事务,释放数据库资源,性能好
- 相比AT模型,无需生成快照,无需使用全局锁,性能最强
- 不依赖数据库事务,而是依赖补偿操作,可以用于非事务型数据库
缺点
- 有代码侵入,需要人为编写try、Confirm和Cancel接口,太麻烦
- 软状态,事务是最终一致
- 需要考虑Confirm和Cancel的失败情况,做好幂等处理
事务悬挂和空回滚
空回滚
当某分支事务的try阶段阻塞时,可能导致全局事务超时而触发二阶段的cancel操作。在未执行try操作时先执行了cancel操作,这时cancel不能做回滚,就是空回滚。
如图:
执行cancel操作时,应当判断try是否已经执行,如果尚未执行,则应该空回滚。
业务悬挂
对于已经空回滚的业务,之前被阻塞的try操作恢复,继续执行try,就永远不可能confirm或cancel ,事务一直处于中间状态,这就是业务悬挂。
执行try操作时,应当判断cancel是否已经执行过了,如果已经执行,应当阻止空回滚后的try操作,避免悬挂
代码实现
1)思路分析
这里我们定义一张表:
1 | CREATE TABLE `account_freeze_tbl` ( |
其中:
- xid:是全局事务id
- freeze_money:用来记录用户冻结金额
- state:用来记录事务状态
那此时,我们的业务开怎么做呢?
- Try业务:
- 记录冻结金额和事务状态到account_freeze表
- 扣减account表可用金额
- Confirm业务
- 根据xid删除account_freeze表的冻结记录
- Cancel业务
- 修改account_freeze表,冻结金额为0,state为2
- 修改account表,恢复可用金额
- 如何判断是否空回滚?
- cancel业务中,根据xid查询account_freeze,如果为null则说明try还没做,需要空回滚
- 如何避免业务悬挂?
- try业务中,根据xid查询account_freeze ,如果已经存在则证明Cancel已经执行,拒绝执行try业务
接下来,我们改造account-service,利用TCC实现余额扣减功能。
2)声明TCC接口
TCC的Try、Confirm、Cancel方法都需要在接口中基于注解来声明,
我们在account-service项目中的cn.itcast.account.service
包中新建一个接口,声明TCC三个接口:
1 | package cn.itcast.account.service; |
3)编写实现类
在account-service服务中的cn.itcast.account.service.impl
包下新建一个类,实现TCC业务:
1 | package cn.itcast.account.service.impl; |
SAGA模式
Saga 模式是 Seata 即将开源的长事务解决方案,将由蚂蚁金服主要贡献。
其理论基础是Hector & Kenneth 在1987年发表的论文Sagas。
Seata官网对于Saga的指南:https://seata.io/zh-cn/docs/user/saga.html
原理
在 Saga 模式下,分布式事务内有多个参与者,每一个参与者都是一个冲正补偿服务,需要用户根据业务场景实现其正向操作和逆向回滚操作。
分布式事务执行过程中,依次执行各参与者的正向操作,如果所有正向操作均执行成功,那么分布式事务提交。如果任何一个正向操作执行失败,那么分布式事务会去退回去执行前面各参与者的逆向回滚操作,回滚已提交的参与者,使分布式事务回到初始状态。
Saga也分为两个阶段:
- 一阶段:直接提交本地事务
- 二阶段:成功则什么都不做;失败则通过编写补偿业务来回滚
优缺点
优点:
- 事务参与者可以基于事件驱动实现异步调用,吞吐高
- 一阶段直接提交事务,无锁,性能好
- 不用编写TCC中的三个阶段,实现简单
缺点:
- 软状态持续时间不确定,时效性差
- 没有锁,没有事务隔离,会有脏写
四种模式对比
我们从以下几个方面来对比四种实现:
- 一致性:能否保证事务的一致性?强一致还是最终一致?
- 隔离性:事务之间的隔离性如何?
- 代码侵入:是否需要对业务代码改造?
- 性能:有无性能损耗?
- 场景:常见的业务场景
如图:
高可用
微服务基于事务组(tx-service-group)与TC集群的映射关系,来查找当前应该使用哪个TC集群。当SH集群故障时,只需要将vgroup-mapping中的映射关系改成HZ。则所有微服务就会切换到HZ的TC集群了
实现
-> seata的部署和集成.md
分布式缓存
单机的Redis存在四大问题:
Redis持久化
Redis有两种持久化方案:
- RDB持久化
- AOF持久化
RDB持久化
概述及执行时机
RDB全称Redis Database Backup file(Redis数据备份文件),也被叫做Redis数据快照。简单来说就是把内存中的所有数据都记录到磁盘中。当Redis实例故障重启后,从磁盘读取快照文件,恢复数据。快照文件称为RDB文件,默认是保存在当前运行目录
RDB持久化在四种情况下会执行:
执行save命令
执行下面的命令,可以立即执行一次RDB:
save命令会导致主进程执行RDB,这个过程中其它所有命令都会被阻塞。只有在数据迁移时可能用到
执行bgsave命令
下面的命令可以异步执行RDB:
这个命令执行后会开启独立进程完成RDB,主进程可以持续处理用户请求,不受影响
Redis停机时
Redis停机时会执行一次save命令,实现RDB持久化
触发RDB条件时
Redis内部有触发RDB的机制,可以在redis.conf文件中找到,格式如下:
1
2
3
4# 900秒内,如果至少有1个key被修改,则执行bgsave , 如果是save "" 则表示禁用RDB
save 900 1
save 300 10
save 60 10000RDB的其它配置也可以在redis.conf文件中设置:
1
2
3
4
5
6
7
8# 是否压缩 ,建议不开启,压缩也会消耗cpu,磁盘的话不值钱
rdbcompression yes
# RDB文件名称
dbfilename dump.rdb
# 文件保存的路径目录
dir ./
原理
bgsave开始时会fork主进程得到子进程,子进程共享主进程的内存数据。完成fork后读取内存数据并写入 RDB 文件。
fork采用的是copy-on-write技术:
- 当主进程执行读操作时,访问共享内存;
- 当主进程执行写操作时,则会拷贝一份数据,执行写操作。
RDB的缺点
- RDB执行间隔时间长,两次RDB之间写入数据有丢失的风险
- fork子进程、压缩、写出RDB文件都比较耗时
AOF持久化
概述
AOF全称为Append Only File(追加文件)。Redis处理的每一个写命令都会记录在AOF文件,可以看做是命令日志文件
AOF配置
AOF默认是关闭的,需要修改redis.conf配置文件来开启AOF:
1 | # 是否开启AOF功能,默认是no |
AOF的命令记录的频率也可以通过redis.conf文件来配:
1 | # 表示每执行一次写命令,立即记录到AOF文件 |
三种策略对比:
AOF文件重写
因为是记录命令,AOF文件会比RDB文件大的多。而且AOF会记录对同一个key的多次写操作,但只有最后一次写操作才有意义。通过执行bgrewriteaof命令,可以让AOF文件执行重写功能,用最少的命令达到相同效果
Redis也会在触发阈值时自动去重写AOF文件。阈值也可以在redis.conf中配置:
1 | # AOF文件比上次文件 增长超过多少百分比则触发重写 |
RDB与AOF对比
RDB和AOF各有自己的优缺点,如果对数据安全性要求较高,在实际开发中往往会结合两者来使用
Redis主从
主从架构搭建
单节点Redis的并发能力是有上限的,要进一步提高Redis的并发能力,就需要搭建主从集群,实现读写分离。
具体搭建流程参考Redis集群.md
全量同步
主从第一次建立连接时,会执行全量同步,将master节点的所有数据都拷贝给slave节点,流程:
这里有一个问题,master如何得知salve是第一次来连接呢?
有几个概念,可以作为判断依据:
- Replication Id:简称replid,是数据集的标记,id一致则说明是同一数据集。每一个master都有唯一的replid,slave则会继承master节点的replid
- offset:偏移量,随着记录在repl_baklog中的数据增多而逐渐增大。slave完成同步时也会记录当前同步的offset。如果slave的offset小于master的offset,说明slave数据落后于master,需要更新。
因此slave做数据同步,必须向master声明自己的replication id 和offset,master才可以判断到底需要同步哪些数据
因为slave原本也是一个master,有自己的replid和offset,当第一次变成slave,与master建立连接时,发送的replid和offset是自己的replid和offset。
master判断发现slave发送来的replid与自己的不一致,说明这是一个全新的slave,就知道要做全量同步了
master会将自己的replid和offset都发送给这个slave,slave保存这些信息,以后slave的replid就与master一致了
master判断一个节点是否是第一次同步的依据,就是看replid是否一致。
如图:
完整流程描述:
- slave节点请求增量同步
- master节点判断replid,发现不一致,拒绝增量同步
- master将完整内存数据生成RDB,发送RDB到slave
- slave清空本地数据,加载master的RDB
- master将RDB期间的命令记录在repl_baklog,并持续将log中的命令发送给slave
- slave执行接收到的命令,保持与master之间的同步
增量同步
全量同步需要先做RDB,然后将RDB文件通过网络传输个slave,成本太高了。因此除了第一次做全量同步,其它大多数时候slave与master都是做增量同步。
什么是增量同步?就是只更新slave与master存在差异的部分数据。如图:
repl_backlog原理
master怎么知道slave与自己的数据差异在哪里呢
这就要说到全量同步时的repl_baklog文件了
这个文件是一个固定大小的数组,只不过数组是环形,也就是说角标到达数组末尾后,会再次从0开始读写,这样数组头部的数据就会被覆盖
repl_baklog中会记录Redis处理过的命令日志及offset,包括master当前的offset,和slave已经拷贝到的offset:
slave与master的offset之间的差异,就是salve需要增量拷贝的数据了
随着不断有数据写入,master的offset逐渐变大,slave也不断的拷贝,追赶master的offset:
直到数组被填满:
此时,如果有新的数据写入,就会覆盖数组中的旧数据。不过,旧的数据只要是绿色的,说明是已经被同步到slave的数据,即便被覆盖了也没什么影响。因为未同步的仅仅是红色部分
但是,如果slave出现网络阻塞,导致master的offset远远超过了slave的offset:
如果master继续写入新数据,其offset就会覆盖旧的数据,直到将slave现在的offset也覆盖:
棕色框中的红色部分,就是尚未同步,但是却已经被覆盖的数据。此时如果slave恢复,需要同步,却发现自己的offset都没有了,无法完成增量同步了。只能做全量同步。
主从同步优化
可以从以下几个方面来优化Redis主从就集群:
在master中配置repl-diskless-sync yes启用无磁盘复制,避免全量同步时的磁盘IO。
Redis单节点上的内存占用不要太大,减少RDB导致的过多磁盘IO
适当提高repl_baklog的大小,发现slave宕机时尽快实现故障恢复,尽可能避免全量同步
限制一个master上的slave节点数量,如果实在是太多slave,则可以采用主-从-从链式结构,减少master压力
架构图:
Redis哨兵
Redis提供了哨兵(Sentinel)机制来实现主从集群的自动故障恢复
集群结构和作用
哨兵的结构如图:
哨兵的作用如下:
- 监控:Sentinel 会不断检查您的master和slave是否按预期工作
- 自动故障恢复:如果master故障,Sentinel会将一个slave提升为master。当故障实例恢复后也以新的master为主
- 通知:Sentinel充当Redis客户端的服务发现来源,当集群发生故障转移时,会将最新信息推送给Redis的客户端
集群监控原理
Sentinel基于心跳机制监测服务状态,每隔1秒向集群的每个实例发送ping命令:
•主观下线:如果某sentinel节点发现某实例未在规定时间响应,则认为该实例主观下线。
•客观下线:若超过指定数量(quorum)的sentinel都认为该实例主观下线,则该实例客观下线。quorum值最好超过Sentinel实例数量的一半。
集群故障恢复原理
一旦发现master故障,sentinel需要在salve中选择一个作为新的master,选择依据是这样的:
- 首先会判断slave节点与master节点断开时间长短,如果超过指定值(down-after-milliseconds * 10)则会排除该slave节点
- 然后判断slave节点的slave-priority值,越小优先级越高,如果是0则永不参与选举
- 如果slave-prority一样,则判断slave节点的offset值,越大说明数据越新,优先级越高
- 最后是判断slave节点的运行id大小,越小优先级越高。
当选出一个新的master后,该如何实现切换呢?
流程如下:
- sentinel给备选的slave1节点发送slaveof no one命令,让该节点成为master
- sentinel给所有其它slave发送slaveof 192.168.150.101 7002 命令,让这些slave成为新master的从节点,开始从新的master上同步数据。
- 最后,sentinel将故障节点标记为slave,当故障节点恢复后会自动成为新的master的slave节点
搭建哨兵集群
具体搭建流程参考Redis集群.md
RedisTemplate
在Sentinel集群监管下的Redis主从集群,其节点会因为自动故障转移而发生变化,Redis的客户端必须感知这种变化,及时更新连接信息。Spring的RedisTemplate底层利用lettuce实现了节点的感知和自动切换
下面,我们通过一个测试来实现RedisTemplate集成哨兵机制
详情代码redis-demo工程
在项目的pom文件中引入依赖:
1
2
3
4<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>配置Redis地址
在配置文件application.yml中指定redis的sentinel相关信息
1
2
3
4
5
6
7
8spring:
redis:
sentinel:
master: mymaster
nodes:
- 192.168.150.101:27001
- 192.168.150.101:27002
- 192.168.150.101:27003配置读写分离
在项目的启动类中,添加一个新的bean:
1
2
3
4
public LettuceClientConfigurationBuilderCustomizer clientConfigurationBuilderCustomizer(){
return clientConfigurationBuilder -> clientConfigurationBuilder.readFrom(ReadFrom.REPLICA_PREFERRED);
}这个bean中配置的就是读写策略,包括四种:
- MASTER:从主节点读取
- MASTER_PREFERRED:优先从master节点读取,master不可用才读取replica
- REPLICA:从slave(replica)节点读取
- REPLICA _PREFERRED:优先从slave(replica)节点读取,所有的slave都不可用才读取master
Redis分片集群
初识与分片集群的搭建
主从和哨兵可以解决高可用、高并发读的问题。但是依然有两个问题没有解决:
海量数据存储问题
高并发写的问题
使用分片集群可以解决上述问题,如图:
分片集群特征:
集群中有多个master,每个master保存不同数据
每个master都可以有多个slave节点
master之间通过ping监测彼此健康状态
客户端请求可以访问集群任意节点,最终都会被转发到正确节点
具体搭建流程参考Redis集群.md
散列插槽
插槽原理
Redis会把每一个master节点映射到0~16383共16384个插槽(hash slot)上,查看集群信息时就能看到:
数据key不是与节点绑定,而是与插槽绑定。redis会根据key的有效部分计算插槽值,分两种情况:
- key中包含”{}”,且“{}”中至少包含1个字符,“{}”中的部分是有效部分
- key中不包含“{}”,整个key都是有效部分
例如:key是num,那么就根据num计算,如果是{itcast}num,则根据itcast计算。计算方式是利用CRC16算法得到一个hash值,然后对16384取余,得到的结果就是slot值。
如图,在7001这个节点执行set a 1时,对a做hash运算,对16384取余,得到的结果是15495,因此要存储到103节点。
到了7003后,执行get num
时,对num做hash运算,对16384取余,得到的结果是2765,因此需要切换到7001节点
如何将同一类数据固定的保存在同一个Redis实例?
- 这一类数据使用相同的有效部分,例如key都以{typeId}为前缀
- 方便分类管理,比如商品统一放在一个集群上,用户统一放在另一个集群上
集群伸缩
redis-cli –cluster提供了很多操作集群的命令,可以通过下面方式查看:
比如,添加节点的命令:
需求:向集群中添加一个新的master节点,并向其中存储 num = 10,具体步骤如下
- 启动一个新的redis实例,端口为7004
- 添加7004到之前的集群,并作为一个master节点
- 给7004节点分配插槽,使得num这个key可以存储到7004实例
这里需要两个新的功能:
- 添加一个节点到集群中
- 将部分插槽分配到新插槽
创建新的redis实例
创建一个文件夹:
1
mkdir 7004
拷贝配置文件:
1
cp redis.conf /7004
修改配置文件:
1
sed /s/6379/7004/g 7004/redis.conf
启动
1
redis-server 7004/redis.conf
添加新节点到redis
添加节点的语法如下:
执行命令:
1
redis-cli --cluster add-node 192.168.150.101:7004 192.168.150.101:7001
通过命令查看集群状态:
1
redis-cli -p 7001 cluster nodes
如图,7004加入了集群,并且默认是一个master节点:
但是,可以看到7004节点的插槽数量为0,因此没有任何数据可以存储到7004上
转移插槽
我们要将num存储到7004节点,因此需要先看看num的插槽是多少:
如上图所示,num的插槽为2765.
我们可以将0~3000的插槽从7001转移到7004,命令格式如下:
具体命令如下:
建立连接:
得到下面的反馈:
询问要移动多少个插槽,我们计划是3000个:
新的问题来了:
那个node来接收这些插槽??
显然是7004,那么7004节点的id是多少呢?
复制这个id,然后拷贝到刚才的控制台后:
这里询问,你的插槽是从哪里移动过来的?
- all:代表全部,也就是三个节点各转移一部分
- 具体的id:目标节点的id
- done:没有了
这里我们要从7001获取,因此填写7001的id:
填完后,点击done,这样插槽转移就准备好了:
确认要转移吗?输入yes:
然后,通过命令查看结果:
可以看到:
目的达成
故障转移
自动故障转移
当集群中有一个master宕机会发生什么呢?
直接停止一个redis实例,例如7002:
1 | redis-cli -p 7002 shutdown |
1)首先是该实例与其它实例失去连接
2)然后是疑似宕机:
3)最后是确定下线,自动提升一个slave为新的master:
4)当7002再次启动,就会变为一个slave节点了:
手动故障转移
利用cluster failover命令可以手动让集群中的某个master宕机,切换到执行cluster failover命令的这个slave节点,实现无感知的数据迁移。其流程如下:
这种failover命令可以指定三种模式:
- 缺省:默认的流程,如图1~6歩
- force:省略了对offset的一致性校验
- takeover:直接执行第5歩,忽略数据一致性、忽略master状态和其它master的意见
案例需求:在7002这个slave节点执行手动故障转移,重新夺回master地位
步骤如下:
1)利用redis-cli连接7002这个节点
2)执行cluster failover命令
如图:
效果:
restTemplate访问分片集群
RedisTemplate底层同样基于lettuce实现了分片集群的支持,而使用的步骤与哨兵模式基本一致:
1)引入redis的starter依赖
2)配置分片集群地址
3)配置读写分离
与哨兵模式相比,其中只有分片集群的配置方式略有差异,如下:
1 | spring: |
多级缓存
初识
传统的缓存策略一般是请求到达Tomcat后,先查询Redis,如果未命中则查询数据库,如图:
存在下面的问题:
•请求要经过Tomcat处理,Tomcat的性能成为整个系统的瓶颈
•Redis缓存失效时,会对数据库产生冲击
多级缓存就是充分利用请求处理的每个环节,分别添加缓存,减轻Tomcat压力,提升服务性能:
- 浏览器访问静态资源时,优先读取浏览器本地缓存
- 访问非静态资源(ajax查询数据)时,访问服务端
- 请求到达Nginx后,优先读取Nginx本地缓存
- 如果Nginx本地缓存未命中,则去直接查询Redis(不经过Tomcat)
- 如果Redis查询未命中,则查询Tomcat
- 请求进入Tomcat后,优先查询JVM进程缓存
- 如果JVM进程缓存未命中,则查询数据库
另外,我们的Tomcat服务将来也会部署为集群模式:
在多级缓存架构中,Nginx内部需要编写本地缓存查询、Redis查询、Tomcat查询的业务逻辑,因此这样的nginx服务不再是一个反向代理服务器,而是一个编写业务的Web服务器了
多级缓存的关键有两个:
一个是在nginx中编写业务,实现nginx本地缓存、Redis、Tomcat的查询
另一个就是在Tomcat中实现JVM进程缓存
其中Nginx编程则会用到OpenResty框架结合Lua这样的语言
案例导入
参考 -> 案例导入说明.md
JVM进程缓存-Caffeine
初识
缓存在日常开发中启动至关重要的作用,由于是存储在内存中,数据的读取速度是非常快的,能大量减少对数据库的访问,减少数据库的压力。我们把缓存分为两类:
- 分布式缓存,例如Redis:
- 优点:存储容量更大、可靠性更好、可以在集群间共享
- 缺点:访问缓存有网络开销
- 场景:缓存数据量较大、可靠性要求较高、需要在集群间共享
- 进程本地缓存,例如HashMap、GuavaCache:
- 优点:读取本地内存,没有网络开销,速度更快
- 缺点:存储容量有限、可靠性较低、无法共享
- 场景:性能要求较高,缓存数据量较小
Caffeine是一个基于Java8开发的,提供了近乎最佳命中率的高性能的本地缓存库。目前Spring内部的缓存使用的就是Caffeine。GitHub地址:https://github.com/ben-manes/caffeine
缓存使用的基本API:
1 |
|
Caffeine既然是缓存的一种,肯定需要有缓存的清除策略,不然的话内存总会有耗尽的时候。
Caffeine提供了三种缓存驱逐策略:
基于容量:设置缓存的数量上限
1
2
3
4// 创建缓存对象
Cache<String, String> cache = Caffeine.newBuilder()
.maximumSize(1) // 设置缓存大小上限为 1
.build();基于时间:设置缓存的有效时间
1
2
3
4
5
6// 创建缓存对象
Cache<String, String> cache = Caffeine.newBuilder()
// 设置缓存有效期为 10 秒,从最后一次写入开始计时
.expireAfterWrite(Duration.ofSeconds(10))
.build();基于引用:设置缓存为软引用或弱引用,利用GC来回收缓存数据。性能较差,不建议使用。
注意:在默认情况下,当一个缓存元素过期的时候,Caffeine不会自动立即将其清理和驱逐。而是在一次读或写操作后,或者在空闲时间完成对失效数据的驱逐
实践
需求:
- 给根据id查询商品的业务添加缓存,缓存未命中时查询数据库
- 给根据id查询商品库存的业务添加缓存,缓存未命中时查询数据库
- 缓存初始大小为100
- 缓存上限为10000
首先,定义两个Caffeine的缓存对象,分别保存商品、库存的缓存数据。
在item-service的com.heima.item.config
包下定义CaffeineConfig
类:
1 | package com.heima.item.config; |
然后,修改item-service中的com.heima.item.web
包下的ItemController类,添加缓存逻辑:
1 |
|
Lua语法入门
初识
Lua 是一种轻量小巧的脚本语言,用标准C语言编写并以源代码形式开放, 其设计目的是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功能。官网:https://www.lua.org/
Nginx本身也是C语言开发,因此也允许基于Lua做拓展
CentOS7默认已经安装了Lua语言环境,所以可以直接运行Lua代码。
1)在Linux虚拟机的任意目录下,新建一个hello.lua文件
2)添加下面的内容
1 | print("Hello World!") |
3)运行
变量
数据类型
Lua中支持的常见数据类型包括:
数据类型 | 描述 |
---|---|
nil | 这个最简单,只有值nil属于该类,表示一个无效值(在条件表达式中相当于false)。 |
boolean | 包含两个值:false和true |
number | 表示双精度类型的实浮点数 |
string | 字符串由一对双引号或单引号来表示 |
function | 由 C 或 Lua 编写的函数 |
table | Lua 中的表(table)其实是一个”关联数组”(associative arrays),数组的索引可以是数字、字符串或表类型。在 Lua 里,table 的创建是通过”构造表达式”来完成,最简单构造表达式是{},用来创建一个空表。 |
另外,Lua提供了type()函数来判断一个变量的数据类型:
声明变量
Lua声明变量的时候无需指定数据类型,而是用local来声明变量为局部变量:
1 | -- 声明字符串,可以用单引号或双引号, |
Lua中的table类型既可以作为数组,又可以作为Java中的map来使用。数组就是特殊的table,key是数组角标而已:
1 | -- 声明数组 ,key为角标的 table |
Lua中的数组角标是从1开始,访问的时候与Java中类似:
1 | -- 访问数组,lua数组的角标从1开始 |
Lua中的table可以用key来访问:
1 | -- 访问table |
循环
对于table,我们可以利用for循环来遍历,不过数组和普通table遍历略有差异
遍历数组:
1 | -- 声明数组 key为索引的 table |
遍历普通table
1 | -- 声明map,也就是table |
条件控制
Lua中的条件控制和函数声明与Java类似
1 | if(布尔表达式) |
布尔表达式中的逻辑运算是基于英文单词:
函数
定义函数的语法:
1 | function 函数名( argument1, argument2..., argumentn) |
例如,定义一个函数,用来打印数组:
1 | function printArr(arr) |
多级缓存
多级缓存的实现离不开Nginx编程,而Nginx编程又离不开OpenResty
OpenResty
OpenResty® 是一个基于 Nginx的高性能 Web 平台,用于方便地搭建能够处理超高并发、扩展性极高的动态 Web 应用、Web 服务和动态网关,官方网站: https://openresty.org/cn/
特点:
- 具备Nginx的完整功能
- 基于Lua语言进行扩展,集成了大量精良的 Lua 库、第三方模块
- 允许使用Lua自定义业务逻辑、自定义库
安装OpenResty
-> 安装OpenResty.md
入门
我们希望达到的多级缓存架构如图:
其中:
- windows上的nginx用来做反向代理服务,将前端的查询商品的ajax请求代理到OpenResty集群
- OpenResty集群用来编写多级缓存业务
反向代理流程
现在,商品详情页使用的是假的商品数据。不过在浏览器中,可以看到页面有发起ajax请求查询真实商品数据。
这个请求如下:
请求地址是localhost,端口是80,就被windows上安装的Nginx服务给接收到了。然后代理给了OpenResty集群:
我们需要在OpenResty中编写业务,查询商品数据并返回到浏览器
OpenResty监听请求
OpenResty的很多功能都依赖于其目录下的Lua库,需要在nginx.conf中指定依赖库的目录,并导入依赖:
1)添加对OpenResty的Lua模块的加载
修改/usr/local/openresty/nginx/conf/nginx.conf
文件,在其中的http下面,添加下面代码:
1 | #lua 模块 |
2)监听/api/item路径
修改/usr/local/openresty/nginx/conf/nginx.conf
文件,在nginx.conf的server下面,添加对/api/item这个路径的监听:
1 | location /api/item { |
这个监听,就类似于SpringMVC中的@GetMapping("/api/item")
做路径映射
而content_by_lua_file lua/item.lua
则相当于调用item.lua这个文件,执行其中的业务,把结果返回给用户。相当于java中调用service
编写item.lua
1)在/usr/loca/openresty/nginx
目录创建文件夹:lua
2)在/usr/loca/openresty/nginx/lua
文件夹下,新建文件:item.lua
3)编写item.lua,返回假数据
item.lua中,利用ngx.say()函数返回数据到Response中
1 | ngx.say('{"id":10001,"name":"SALSA AIR","title":"RIMOWA 21寸托运箱拉杆箱 SALSA AIR系列果绿色 820.70.36.4","price":17900,"image":"https://m.360buyimg.com/mobilecms/s720x720_jfs/t6934/364/1195375010/84676/e9f2c55f/597ece38N0ddcbc77.jpg!q70.jpg.webp","category":"拉杆箱","brand":"RIMOWA","spec":"","status":1,"createTime":"2019-04-30T16:00:00.000+00:00","updateTime":"2019-04-30T16:00:00.000+00:00","stock":2999,"sold":31290}') |
4)重新加载配置
1 | nginx -s reload |
请求参数处理
获取参数的API
OpenResty中提供了一些API用来获取不同类型的前端请求参数:
获取参数并返回
在前端发起的ajax请求如图:
可以看到商品id是以路径占位符方式传递的,因此可以利用正则表达式匹配的方式来获取ID
1)获取商品id
修改/usr/loca/openresty/nginx/nginx.conf
文件中监听/api/item的代码,利用正则表达式获取ID:
1 | location ~ /api/item/(\d+) { |
2)拼接ID并返回
修改/usr/loca/openresty/nginx/lua/item.lua
文件,获取id并拼接到结果中返回:
1 | -- 获取商品id |
3)重新加载并测试
运行命令以重新加载OpenResty配置:
1 | nginx -s reload |
刷新页面可以看到结果中已经带上了ID
查询Tomcat
拿到商品ID后,本应去缓存中查询商品信息,不过目前我们还未建立nginx、redis缓存。因此,这里我们先根据商品id去tomcat查询商品信息。我们实现如图部分:
需要注意的是,我们的OpenResty是在虚拟机,Tomcat是在Windows电脑上。两者IP一定不要搞错了。
发送http请求的api
nginx提供了内部API用以发送http请求:
1 | local resp = ngx.location.capture("/path",{ |
返回的响应内容包括:
- resp.status:响应状态码
- resp.header:响应头,是一个table
- resp.body:响应体,就是响应数据
注意:这里的path是路径,并不包含IP和端口。这个请求会被nginx内部的server监听并处理。
但是我们希望这个请求发送到Tomcat服务器,所以还需要编写一个server来对这个路径做反向代理:
1 | location /path { |
原理如图:
封装http工具
封装一个发送Http请求的工具,基于ngx.location.capture来实现查询tomcat
1)添加反向代理,到windows的Java服务
因为item-service中的接口都是/item开头,所以我们监听/item路径,代理到windows上的tomcat服务。
修改 /usr/local/openresty/nginx/conf/nginx.conf
文件,添加一个location:
1 | location /item { |
以后,只要我们调用ngx.location.capture("/item")
,就一定能发送请求到windows的tomcat服务。
2)封装工具类
之前我们说过,OpenResty启动时会加载以下两个目录中的工具文件:
所以,自定义的http工具也需要放到这个目录下。
在/usr/local/openresty/lualib
目录下,新建一个common.lua文件:
1 | vi /usr/local/openresty/lualib/common.lua |
内容如下:
1 | -- 封装函数,发送http请求,并解析响应 |
这个工具将read_http函数封装到_M这个table类型的变量中,并且返回,这类似于导出。
使用的时候,可以利用require('common')
来导入该函数库,这里的common是函数库的文件名。
3)实现商品查询
最后,我们修改/usr/local/openresty/lua/item.lua
文件,利用刚刚封装的函数库实现对tomcat的查询:
1 | -- 引入自定义common工具模块,返回值是common中返回的 _M |
这里查询到的结果是json字符串,并且包含商品、库存两个json字符串,页面最终需要的是把两个json拼接为一个json:
这就需要我们先把JSON变为lua的table,完成数据整合后,再转为JSON。
CJSON工具类
OpenResty提供了一个cjson的模块用来处理JSON的序列化和反序列化。
官方地址: https://github.com/openresty/lua-cjson/
1)引入cjson模块:
1 | local cjson = require "cjson" |
2)序列化:
1 | local obj = { |
3)反序列化:
1 | local json = '{"name": "jack", "age": 21}' |
实现tomcat查询
修改之前的item.lua中的业务,添加json处理功能:
1 | -- 导入common函数库 |
基于ID负载均衡
刚才的代码中,我们的tomcat是单机部署。而实际开发中,tomcat一定是集群模式:
因此,OpenResty需要对tomcat集群做负载均衡。
而默认的负载均衡规则是轮询模式,当我们查询/item/10001时:
- 第一次会访问8081端口的tomcat服务,在该服务内部就形成了JVM进程缓存
- 第二次会访问8082端口的tomcat服务,该服务内部没有JVM缓存(因为JVM缓存无法共享),会查询数据库
- …
你看,因为轮询的原因,第一次查询8081形成的JVM缓存并未生效,直到下一次再次访问到8081时才可以生效,缓存命中率太低了
如果能让同一个商品,每次查询时都访问同一个tomcat服务,那么JVM缓存就一定能生效了。
也就是说,我们需要根据商品id做负载均衡,而不是轮询
原理
nginx提供了基于请求路径做负载均衡的算法:
nginx根据请求路径做hash运算,把得到的数值对tomcat服务的数量取余,余数是几,就访问第几个服务,实现负载均衡
例如:
- 我们的请求路径是 /item/10001
- tomcat总数为2台(8081、8082)
- 对请求路径/item/1001做hash运算求余的结果为1
- 则访问第一个tomcat服务,也就是8081
只要id不变,每次hash运算结果也不会变,那就可以保证同一个商品,一直访问同一个tomcat服务,确保JVM缓存生效
实现
修改/usr/local/openresty/nginx/conf/nginx.conf
文件,实现基于ID做负载均衡。
首先,定义tomcat集群,并设置基于路径做负载均衡:
1 | upstream tomcat-cluster { |
然后,修改对tomcat服务的反向代理,目标指向tomcat集群:
1 | location /item { |
重新加载OpenResty
1 | nginx -s reload |
测试
启动两台tomcat服务:
同时启动:
清空日志后,再次访问页面,可以看到不同id的商品,访问到了不同的tomcat服务:
Redis缓存预热
Redis缓存会面临冷启动问题:
冷启动:服务刚刚启动时,Redis中并没有缓存,如果所有商品数据都在第一次查询时添加缓存,可能会给数据库带来较大压力
缓存预热:在实际开发中,我们可以利用大数据统计用户访问的热点数据,在项目启动时将这些热点数据提前查询并保存到Redis中
我们数据量较少,并且没有数据统计相关功能,目前可以在启动时将所有数据都放入缓存中
1)利用Docker安装Redis
1 | docker run --name redis -p 6379:6379 -d redis redis-server --appendonly yes |
2)在item-service服务中引入Redis依赖
1 | <dependency> |
3)配置Redis地址
1 | spring: |
4)编写初始化类
缓存预热需要在项目启动时完成,并且必须是拿到RedisTemplate之后。
这里我们利用InitializingBean接口来实现,因为InitializingBean可以在对象被Spring创建并且成员变量全部注入后执行。
1 | package com.heima.item.config; |
查询Redis缓存
现在,Redis缓存已经准备就绪,我们可以再OpenResty中实现查询Redis的逻辑了。如下图红框所示:
当请求进入OpenResty之后:
- 优先查询Redis缓存
- 如果Redis缓存未命中,再查询Tomcat
封装Redis工具
OpenResty提供了操作Redis的模块,我们只要引入该模块就能直接使用。但是为了方便,我们将Redis操作封装到之前的common.lua工具库中。
修改
/usr/local/openresty/lualib/common.lua
文件:1)引入Redis模块,并初始化Redis对象
1
2
3
4
5-- 导入redis
local redis = require('resty.redis')
-- 初始化redis
local red = redis:new()
red:set_timeouts(1000, 1000, 1000)2)封装函数,用来释放Redis连接,其实是放入连接池
1
2
3
4
5
6
7
8
9-- 关闭redis连接的工具方法,其实是放入连接池
local function close_redis(red)
local pool_max_idle_time = 10000 -- 连接的空闲时间,单位是毫秒
local pool_size = 100 --连接池大小
local ok, err = red:set_keepalive(pool_max_idle_time, pool_size)
if not ok then
ngx.log(ngx.ERR, "放入redis连接池失败: ", err)
end
end3)封装函数,根据key查询Redis数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22-- 查询redis的方法 ip和port是redis地址,key是查询的key
local function read_redis(ip, port, key)
-- 获取一个连接
local ok, err = red:connect(ip, port)
if not ok then
ngx.log(ngx.ERR, "连接redis失败 : ", err)
return nil
end
-- 查询redis
local resp, err = red:get(key)
-- 查询失败处理
if not resp then
ngx.log(ngx.ERR, "查询Redis失败: ", err, ", key = " , key)
end
--得到的数据为空处理
if resp == ngx.null then
resp = nil
ngx.log(ngx.ERR, "查询Redis数据为空, key = ", key)
end
close_redis(red)
return resp
end4)导出
1
2
3
4
5
6-- 将方法导出
local _M = {
read_http = read_http,
read_redis = read_redis
}
return _M完整的common.lua:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58-- 导入redis
local redis = require('resty.redis')
-- 初始化redis
local red = redis:new()
red:set_timeouts(1000, 1000, 1000)
-- 关闭redis连接的工具方法,其实是放入连接池
local function close_redis(red)
local pool_max_idle_time = 10000 -- 连接的空闲时间,单位是毫秒
local pool_size = 100 --连接池大小
local ok, err = red:set_keepalive(pool_max_idle_time, pool_size)
if not ok then
ngx.log(ngx.ERR, "放入redis连接池失败: ", err)
end
end
-- 查询redis的方法 ip和port是redis地址,key是查询的key
local function read_redis(ip, port, key)
-- 获取一个连接
local ok, err = red:connect(ip, port)
if not ok then
ngx.log(ngx.ERR, "连接redis失败 : ", err)
return nil
end
-- 查询redis
local resp, err = red:get(key)
-- 查询失败处理
if not resp then
ngx.log(ngx.ERR, "查询Redis失败: ", err, ", key = " , key)
end
--得到的数据为空处理
if resp == ngx.null then
resp = nil
ngx.log(ngx.ERR, "查询Redis数据为空, key = ", key)
end
close_redis(red)
return resp
end
-- 封装函数,发送http请求,并解析响应
local function read_http(path, params)
local resp = ngx.location.capture(path,{
method = ngx.HTTP_GET,
args = params,
})
if not resp then
-- 记录错误信息,返回404
ngx.log(ngx.ERR, "http查询失败, path: ", path , ", args: ", args)
ngx.exit(404)
end
return resp.body
end
-- 将方法导出
local _M = {
read_http = read_http,
read_redis = read_redis
}
return _M实现Redis查询
查询逻辑是:
- 根据id查询Redis
- 如果查询失败则继续查询Tomcat
- 将查询结果返回
1)修改
/usr/local/openresty/lua/item.lua
文件,添加一个查询函数:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17-- 导入common函数库
local common = require('common')
local read_http = common.read_http
local read_redis = common.read_redis
-- 封装查询函数
function read_data(key, path, params)
-- 查询本地缓存
local val = read_redis("127.0.0.1", 6379, key)
-- 判断查询结果
if not val then
ngx.log(ngx.ERR, "redis查询失败,尝试查询http, key: ", key)
-- redis查询失败,去查询http
val = read_http(path, params)
end
-- 返回数据
return val
end2)而后修改商品查询、库存查询的业务:
3)完整的item.lua代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38-- 导入common函数库
local common = require('common')
local read_http = common.read_http
local read_redis = common.read_redis
-- 导入cjson库
local cjson = require('cjson')
-- 封装查询函数
function read_data(key, path, params)
-- 查询本地缓存
local val = read_redis("127.0.0.1", 6379, key)
-- 判断查询结果
if not val then
ngx.log(ngx.ERR, "redis查询失败,尝试查询http, key: ", key)
-- redis查询失败,去查询http
val = read_http(path, params)
end
-- 返回数据
return val
end
-- 获取路径参数
local id = ngx.var[1]
-- 查询商品信息
local itemJSON = read_data("item:id:" .. id, "/item/" .. id, nil)
-- 查询库存信息
local stockJSON = read_data("item:stock:id:" .. id, "/item/stock/" .. id, nil)
-- JSON转化为lua的table
local item = cjson.decode(itemJSON)
local stock = cjson.decode(stockJSON)
-- 组合数据
item.stock = stock.stock
item.sold = stock.sold
-- 把item序列化为json 返回结果
ngx.say(cjson.encode(item))
Nginx本地缓存
本地缓存API
OpenResty为Nginx提供了shard dict的功能,可以在nginx的多个worker之间共享数据,实现缓存功能。
1)开启共享字典,在nginx.conf的http下添加配置:
1 | # 共享字典,也就是本地缓存,名称叫做:item_cache,大小150m |
2)操作共享字典:
1 | -- 获取本地缓存对象 |
实现本地缓存查询
1)修改/usr/local/openresty/lua/item.lua
文件,修改read_data查询函数,添加本地缓存逻辑:
1 | -- 导入共享词典,本地缓存 |
2)修改item.lua中查询商品和库存的业务,实现最新的read_data函数:
其实就是多了缓存时间参数,过期后nginx缓存会自动删除,下次访问即可更新缓存。
这里给商品基本信息设置超时时间为30分钟,库存为1分钟。
因为库存更新频率较高,如果缓存时间过长,可能与数据库差异较大。
3)完整的item.lua文件:
1 | -- 导入common函数库 |
缓存同步策略
大多数情况下,浏览器查询到的都是缓存数据,如果缓存数据与数据库数据存在较大差异,可能会产生比较严重的后果。
所以我们必须保证数据库数据、缓存数据的一致性,这就是缓存与数据库的同步。
数据同步策略
缓存数据同步的常见方式有三种:
设置有效期:给缓存设置有效期,到期后自动删除。再次查询时更新
- 优势:简单、方便
- 缺点:时效性差,缓存过期之前可能不一致
- 场景:更新频率较低,时效性要求低的业务
同步双写:在修改数据库的同时,直接修改缓存
- 优势:时效性强,缓存与数据库强一致
- 缺点:有代码侵入,耦合度高;
- 场景:对一致性、时效性要求较高的缓存数据
异步通知:修改数据库时发送事件通知,相关服务监听到通知后修改缓存数据
- 优势:低耦合,可以同时通知多个缓存服务
- 缺点:时效性一般,可能存在中间不一致状态
- 场景:时效性要求一般,有多个服务需要同步
而异步实现又可以基于MQ或者Canal来实现:
1)基于MQ的异步通知:
解读:
- 商品服务完成对数据的修改后,只需要发送一条消息到MQ中。
- 缓存服务监听MQ消息,然后完成对缓存的更新
依然有少量的代码侵入。
2)基于Canal的通知
解读:
- 商品服务完成商品修改后,业务直接结束,没有任何代码侵入
- Canal监听MySQL变化,当发现变化后,立即通知缓存服务
- 缓存服务接收到canal通知,更新缓存
代码零侵入
Canal初识
Canal [kə’næl],译意为水道/管道/沟渠,canal是阿里巴巴旗下的一款开源项目,基于Java开发。基于数据库增量日志解析,提供增量数据订阅&消费。GitHub的地址:https://github.com/alibaba/canal
Canal是基于mysql的主从同步来实现的,MySQL主从同步的原理如下:
- 1)MySQL master 将数据变更写入二进制日志( binary log),其中记录的数据叫做binary log events
- 2)MySQL slave 将 master 的 binary log events拷贝到它的中继日志(relay log)
- 3)MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
而Canal就是把自己伪装成MySQL的一个slave节点,从而监听master的binary log变化。再把得到的变化信息通知给Canal的客户端,进而完成对其它数据库的同步。
Canal安装
参考 -> 安装Canal
监听Canal
Canal提供了各种语言的客户端,当Canal监听到binlog变化时,会通知Canal的客户端。
我们可以利用Canal提供的Java客户端,监听Canal通知消息。当收到变化的消息时,完成对缓存的更新。
不过这里会使用GitHub上的第三方开源的canal-starter客户端。地址:https://github.com/NormanGyllenhaal/canal-client与SpringBoot完美整合,自动装配,比官方客户端要简单好用很多
引入依赖
1
2
3
4
5<dependency>
<groupId>top.javatool</groupId>
<artifactId>canal-spring-boot-starter</artifactId>
<version>1.2.1-RELEASE</version>
</dependency>编写配置
1
2
3canal:
destination: heima # canal的集群名字,要与安装canal时设置的名称一致
server: 192.168.150.101:11111 # canal服务地址修改item实体类
通过@Id、@Column、等注解完成Item与数据库表字段的映射:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37package com.heima.item.pojo;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import org.springframework.data.annotation.Id;
import org.springframework.data.annotation.Transient;
import javax.persistence.Column;
import java.util.Date;
public class Item {
private Long id;//商品id
private String name;//商品名称
private String title;//商品标题
private Long price;//价格(分)
private String image;//商品图片
private String category;//分类名称
private String brand;//品牌名称
private String spec;//规格
private Integer status;//商品状态 1-正常,2-下架
private Date createTime;//创建时间
private Date updateTime;//更新时间
private Integer stock;
private Integer sold;
}编写监听器
通过实现
EntryHandler<T>
接口编写监听器,监听Canal消息。注意两点:- 实现类通过
@CanalTable("tb_item")
指定监听的表信息 - EntryHandler的泛型是与表对应的实体类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43package com.heima.item.canal;
import com.github.benmanes.caffeine.cache.Cache;
import com.heima.item.config.RedisHandler;
import com.heima.item.pojo.Item;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import top.javatool.canal.client.annotation.CanalTable;
import top.javatool.canal.client.handler.EntryHandler;
public class ItemHandler implements EntryHandler<Item> {
private RedisHandler redisHandler;
private Cache<Long, Item> itemCache;
public void insert(Item item) {
// 写数据到JVM进程缓存
itemCache.put(item.getId(), item);
// 写数据到redis
redisHandler.saveItem(item);
}
public void update(Item before, Item after) {
// 写数据到JVM进程缓存
itemCache.put(after.getId(), after);
// 写数据到redis
redisHandler.saveItem(after);
}
public void delete(Item item) {
// 删除数据到JVM进程缓存
itemCache.invalidate(item.getId());
// 删除数据到redis
redisHandler.deleteItemById(item.getId());
}
}在这里对Redis的操作都封装到了RedisHandler这个对象中,是我们之前做缓存预热时编写的一个类,内容如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65package com.heima.item.config;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.heima.item.pojo.Item;
import com.heima.item.pojo.ItemStock;
import com.heima.item.service.IItemService;
import com.heima.item.service.IItemStockService;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.util.List;
public class RedisHandler implements InitializingBean {
private StringRedisTemplate redisTemplate;
private IItemService itemService;
private IItemStockService stockService;
private static final ObjectMapper MAPPER = new ObjectMapper();
public void afterPropertiesSet() throws Exception {
// 初始化缓存
// 1.查询商品信息
List<Item> itemList = itemService.list();
// 2.放入缓存
for (Item item : itemList) {
// 2.1.item序列化为JSON
String json = MAPPER.writeValueAsString(item);
// 2.2.存入redis
redisTemplate.opsForValue().set("item:id:" + item.getId(), json);
}
// 3.查询商品库存信息
List<ItemStock> stockList = stockService.list();
// 4.放入缓存
for (ItemStock stock : stockList) {
// 2.1.item序列化为JSON
String json = MAPPER.writeValueAsString(stock);
// 2.2.存入redis
redisTemplate.opsForValue().set("item:stock:id:" + stock.getId(), json);
}
}
public void saveItem(Item item) {
try {
String json = MAPPER.writeValueAsString(item);
redisTemplate.opsForValue().set("item:id:" + item.getId(), json);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
public void deleteItemById(Long id) {
redisTemplate.delete("item:id:" + id);
}
}- 实现类通过
RabbitMQ的高级特性
消息队列在使用过程中,面临着很多实际问题需要思考:
消息可靠性
消息从发送,到消费者接收,会经历多个过程:
其中的每一步都可能导致消息丢失,常见的丢失原因包括:
- 发送时丢失:
- 生产者发送的消息未送达exchange
- 消息到达exchange后未到达queue
- MQ宕机,queue将消息丢失
- consumer接收到消息后未消费就宕机
针对这些问题,RabbitMQ分别给出了解决方案:
- 生产者确认机制
- mq持久化
- 消费者确认机制
- 失败重试机制
生产者消息确认
RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。这种机制必须给每个消息指定一个唯一ID。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功
返回结果有两种方式:
- publisher-confirm,发送者确认
- 消息成功投递到交换机,返回ack
- 消息未投递到交换机,返回nack
- publisher-return,发送者回执
- 消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因
注意:确认机制发送消息时,需要给每个消息设置一个全局唯一id,以区分不同消息,避免ack冲突
首先,修改publisher服务中的application.yml文件,添加下面的内容:
1
2
3
4
5
6
7spring:
rabbitmq:
publisher-confirm-type: correlated
publisher-returns: true
template:
mandatory: true
说明:
publish-confirm-type
:开启publisher-confirm,这里支持两种类型:simple
:同步等待confirm结果,直到超时correlated
:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
publish-returns
:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallbacktemplate.mandatory
:定义消息路由失败时的策略true,则调用ReturnCallback;false:则直接丢弃消息
定义Return回调
每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目加载时配置:
修改publisher服务,添加一个:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25package cn.itcast.mq.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;
public class CommonConfig implements ApplicationContextAware {
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
// 获取RabbitTemplate
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
// 设置ReturnCallback
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
// 投递失败,记录日志
log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}",
replyCode, replyText, exchange, routingKey, message.toString());
// 如果有业务需要,可以重发消息
});
}
}定义ConfirmCallback
ConfirmCallback可以在发送消息时指定,因为每个业务处理confirm成功或失败的逻辑不一定相同
在publisher服务的cn.itcast.mq.spring.SpringAmqpTest类中,定义一个单元测试方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24public void testSendMessage2SimpleQueue() throws InterruptedException {
// 1.消息体
String message = "hello, spring amqp!";
// 2.全局唯一的消息ID,需要封装到CorrelationData中
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 3.添加callback
correlationData.getFuture().addCallback(
result -> {
if(result.isAck()){
// 3.1.ack,消息成功
log.debug("消息发送成功, ID:{}", correlationData.getId());
}else{
// 3.2.nack,消息失败
log.error("消息发送失败, ID:{}, 原因{}",correlationData.getId(), result.getReason());
}
},
ex -> log.error("消息发送异常, ID:{}, 原因{}",correlationData.getId(),ex.getMessage())
);
// 4.发送消息
rabbitTemplate.convertAndSend("task.direct", "task", message, correlationData);
// 休眠一会儿,等待ack回执
Thread.sleep(2000);
}
消息持久化
生产者确认可以确保消息投递到RabbitMQ的队列中,但是消息发送到RabbitMQ以后,如果突然宕机,也可能导致消息丢失
要想确保消息在RabbitMQ中安全保存,必须开启消息持久化机制
- 交换机持久化
- 队列持久化
- 消息持久化
交换机持久化
RabbitMQ中交换机默认是非持久化的,mq重启后就丢失
SpringAMQP中可以通过代码指定交换机持久化:
1 |
|
事实上,默认情况下,由SpringAMQP声明的交换机都是持久化的
可以在RabbitMQ控制台看到持久化的交换机都会带上D
的标示:
队列持久化
RabbitMQ中队列默认是非持久化的,mq重启后就丢失
SpringAMQP中可以通过代码指定交换机持久化:
1 |
|
事实上,默认情况下,由SpringAMQP声明的队列都是持久化的
可以在RabbitMQ控制台看到持久化的队列都会带上D
的标示:
消息持久化
利用SpringAMQP发送消息时,可以设置消息的属性(MessageProperties),指定delivery-mode:
- 1:非持久化
- 2:持久化
用java代码指定:
1 | Message msg = MessageBuilder |
默认情况下,SpringAMQP发出的任何消息都是持久化的,不用特意指定
消费者消息确认
RabbitMQ是阅后即焚机制,RabbitMQ确认消息被消费者消费后会立刻删除
而RabbitMQ是通过消费者回执来确认消费者是否成功处理消息的:消费者获取消息后,应该向RabbitMQ发送ACK回执,表明自己已经处理消息
SpringAMQP则允许配置三种确认模式:
- manual:手动ack,需要在业务代码结束后,调用api发送ack
- auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack
- none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除
none模式
修改consumer服务的application.yml文件,添加下面内容:
1 | spring: |
修改consumer服务的SpringRabbitListener类中的方法,模拟一个消息处理异常:
1 |
|
测试可以发现,当消息处理抛异常时,消息依然被RabbitMQ删除了
auto模式
把确认机制修改为auto:
1 | spring: |
在异常位置打断点,再次发送消息,程序卡在断点时,可以发现此时消息状态为unack(未确定状态):
抛出异常后,因为Spring会自动返回nack,所以消息恢复至Ready状态,并且没有被RabbitMQ删除:
消费失败重试机制
当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致mq的消息处理飙升,带来不必要的压力:
本地重试
可以利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。
修改consumer服务的application.yml文件,添加内容:
1 | spring: |
重启consumer服务,重复之前的测试。可以发现:
- 在重试3次后,SpringAMQP会抛出异常AmqpRejectAndDontRequeueException,说明本地重试触发了
- 查看RabbitMQ控制台,发现消息被删除了,说明最后SpringAMQP返回的是ack,mq删除消息了
结论:
- 开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试
- 重试达到最大次数后,Spring会返回ack,消息会被丢弃
失败策略
在之前的测试中,达到最大重试次数后,消息会被丢弃,这是由Spring内部机制决定的
在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecovery接口来处理,它包含三种不同的实现:
RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机
比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理
1)在consumer服务中定义处理失败消息的交换机和队列
1 |
|
2)定义一个RepublishMessageRecoverer,关联队列和交换机
1 |
|
完整代码:
1 | package cn.itcast.mq.config; |
总结
如何确保RabbitMQ消息的可靠性?
- 开启生产者确认机制,确保生产者的消息能到达队列
- 开启持久化功能,确保消息未消费前在队列中不会丢失
- 开启消费者确认机制为auto,由spring确认消息处理成功后完成ack
- 开启消费者失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到异常交换机,交由人工处理
死信交换机
初识死信交换机
当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):
- 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false
- 消息是一个过期消息,超时无人消费
- 要投递的队列消息满了,无法投递
如果这个包含死信的队列配置了dead-letter-exchange
属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机称为死信交换机(Dead Letter Exchange,检查DLX)
如图,一个消息被消费者拒绝了,变成了死信:
因为simple.queue绑定了死信交换机 dl.direct,因此死信会投递给这个交换机:
如果这个死信交换机也绑定了一个队列,则消息最终会进入这个存放死信的队列:
另外,队列将死信投递给死信交换机时,必须知道两个信息:
- 死信交换机名称
- 死信交换机与死信队列绑定的RoutingKey
这样才能确保投递的消息能到达死信交换机,并且正确的路由到死信队列
实践
在失败重试策略中,默认的RejectAndDontRequeueRecoverer会在本地重试次数耗尽后,发送reject给RabbitMQ,消息变成死信,被丢弃。
我们可以给simple.queue添加一个死信交换机,给死信交换机绑定一个队列。这样消息变成死信后也不会丢弃,而是最终投递到死信交换机,路由到与死信交换机绑定的队列
我们在consumer服务中,定义一组死信交换机、死信队列:
1 | // 声明普通的 simple.queue队列,并且为其指定死信交换机:dl.direct |
TTL
一个队列中的消息如果超时未消费,则会变为死信,超时分为两种情况:
- 消息所在的队列设置了超时时间
- 消息本身设置了超时时间
接收超时死信的死信交换机
在consumer服务的SpringRabbitListener中,定义一个新的消费者,并且声明 死信交换机、死信队列:
1
2
3
4
5
6
7
8
public void listenDlQueue(String msg){
log.info("接收到 dl.ttl.queue的延迟消息:{}", msg);
}声明一个队列,并且指定TTL
要给队列设置超时时间,需要在声明队列时配置x-message-ttl属性:
1
2
3
4
5
6
7
public Queue ttlQueue(){
return QueueBuilder.durable("ttl.queue") // 指定队列名称,并持久化
.ttl(10000) // 设置队列的超时时间,10秒
.deadLetterExchange("dl.ttl.direct") // 指定死信交换机
.build();
}注意,这个队列设定了死信交换机为
dl.ttl.direct
声明交换机,将ttl与交换机绑定:
1
2
3
4
5
6
7
8
public DirectExchange ttlExchange(){
return new DirectExchange("ttl.direct");
}
public Binding ttlBinding(){
return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");
}发送消息,但是不要指定TTL:
1
2
3
4
5
6
7
8
9
10
11
public void testTTLQueue() {
// 创建消息
String message = "hello, ttl queue";
// 消息ID,需要封装到CorrelationData中
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 发送消息
rabbitTemplate.convertAndSend("ttl.direct", "ttl", message, correlationData);
// 记录日志
log.debug("发送消息成功");
}发送消息的日志:
查看下接收消息的日志:
因为队列的TTL值是10000ms,也就是10秒。可以看到消息发送与接收之间的时差刚好是10秒。
发送消息时,设定TTL
在发送消息时,也可以指定TTL:
1
2
3
4
5
6
7
8
9
10
11
12
13
public void testTTLMsg() {
// 创建消息
Message message = MessageBuilder
.withBody("hello, ttl message".getBytes(StandardCharsets.UTF_8))
.setExpiration("5000")
.build();
// 消息ID,需要封装到CorrelationData中
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 发送消息
rabbitTemplate.convertAndSend("ttl.direct", "ttl", message, correlationData);
log.debug("发送消息成功");
}查看发送消息日志:
接收消息日志:
这次,发送与接收的延迟只有5秒。说明当队列、消息都设置了TTL时,任意一个到期就会成为死信
延迟队列
利用TTL结合死信交换机,我们实现了消息发出后,消费者延迟收到消息的效果。这种消息模式就称为延迟队列(Delay Queue)模式。
延迟队列的使用场景包括:
- 延迟发送短信
- 用户下单,如果用户在15 分钟内未支付,则自动取消
- 预约工作会议,20分钟后自动通知所有参会人员
因为延迟队列的需求非常多,所以RabbitMQ的官方也推出了一个插件,原生支持延迟队列效果。
这个插件就是DelayExchange插件。参考RabbitMQ的插件列表页面:https://www.rabbitmq.com/community-plugins.html
使用方式可以参考官网地址:https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq
安装DelayExchange插件
参考-> RabbitMQ部署指南.md
DelayExchange原理
elayExchange需要将一个交换机声明为delayed类型。当我们发送消息到delayExchange时,流程如下:
- 接收消息
- 判断消息是否具备x-delay属性
- 如果有x-delay属性,说明是延迟消息,持久化到硬盘,读取x-delay值,作为延迟时间
- 返回routing not found结果给消息发送者
- x-delay时间到期后,重新投递消息到指定队列
实践
插件的使用也非常简单:声明一个交换机,交换机的类型可以是任意类型,只需要设定delayed属性为true即可,然后声明队列与其绑定即可
声明DelayExchange交换机
基于注解方式(推荐):
也可以基于@Bean的方式:
发送消息
发送消息时,一定要携带x-delay属性,指定延迟的时间:
报错处理
在进行测试发现,控制台会输出错误,这个错误是正常的,不影响使用,主要用来提示一开始消息没被消费的警告,不展示该错误的方式如下:在配置returnCallback里面进行判断,略过该错误
惰性队列
消息堆积问题
生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息达到上限。之后发送的消息就会成为死信,可能会被丢弃,这就是消息堆积问题。
解决消息堆积有两种思路:
- 增加更多消费者,提高消费速度
- 在消费者内开启线程池加快消息处理速度
- 扩大队列容积,提高堆积上限
简介
从RabbitMQ的3.6.0版本开始,就增加了Lazy Queues的概念,也就是惰性队列。惰性队列的特征如下:
- 接收到消息后直接存入磁盘而非内存
- 消费者要消费消息时才会从磁盘中读取并加载到内存
- 支持数百万条的消息存储
基于命令行设置lazy-queue
要设置一个队列为惰性队列,只需要在声明队列时,指定x-queue-mode属性为lazy即可。可以通过命令行将一个运行中的队列修改为惰性队列:
1 | rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues |
命令解读:
rabbitmqctl
:RabbitMQ的命令行工具set_policy
:添加一个策略Lazy
:策略名称,可以自定义"^lazy-queue$"
:用正则表达式匹配队列的名字'{"queue-mode":"lazy"}'
:设置队列模式为lazy模式--apply-to queues
:策略的作用对象,是所有的队列
基于@Bean声明lazy-queue
基于@RabbitListener声明LazyQueue
优缺点
惰性队列的优点有哪些?
- 基于磁盘存储,消息上限高
- 没有间歇性的page-out,性能比较稳定
惰性队列的缺点有哪些?
- 基于磁盘存储,消息时效性会降低
- 性能受限于磁盘的IO
MQ集群
初识
RabbitMQ的是基于Erlang语言编写,而Erlang又是一个面向并发的语言,天然支持集群模式。RabbitMQ的集群有两种模式:
普通集群:是一种分布式集群,将队列分散到集群的各个节点,从而提高整个集群的并发能力
镜像集群:是一种主从集群,普通集群的基础上,添加了主从备份功能,提高集群的数据可用性
仲裁队列:镜像集群虽然支持主从,但主从同步并不是强一致的,某些情况下可能有数据丢失的风险。因此在RabbitMQ的3.8版本以后,推出了新的功能:仲裁队列来代替镜像集群,底层采用Raft协议确保主从的数据一致性
普通集群
结构特征
普通集群,或者叫标准集群(classic cluster),具备下列特征:
- 会在集群的各个节点间共享部分数据,包括:交换机、队列元信息。不包含队列中的消息。
- 当访问集群某节点时,如果队列不在该节点,会从数据所在节点传递到当前节点并返回
- 队列所在节点宕机,队列中的消息就会丢失
结构如图:
实践
参考-> RabbitMQ部署指南.md
镜像集群
结构特征
镜像集群:本质是主从模式,具备下面的特征:
- 交换机、队列、队列中的消息会在各个mq的镜像节点之间同步备份。
- 创建队列的节点被称为该队列的主节点,备份到的其它节点叫做该队列的镜像节点。
- 一个队列的主节点可能是另一个队列的镜像节点
- 所有操作都是主节点完成,然后同步给镜像节点
- 主宕机后,镜像节点会替代成新的主节点
结构如图:
实践
参考-> RabbitMQ部署指南.md
仲裁队列
结构特征
仲裁队列:仲裁队列是3.8版本以后才有的新功能,用来替代镜像队列,具备下列特征:
- 与镜像队列一样,都是主从模式,支持主从数据同步
- 使用非常简单,没有复杂的配置
- 主从同步基于Raft协议,强一致
实践
参考-> RabbitMQ部署指南.md
Java代码创建仲裁队列
1 |
|
SpringAMQP连接MQ集群
注意,这里用address来代替host、port方式
1 | spring: |