0%


写在前面的话

最近在做数据迁移工具。经过多方调研,最后选择阿里巴巴开源工具 DataX。为了兼容携程 Dal 组件,对 DataX 连接源库和目标库的部分做了改造,以便通过 TitanKey 实现数据的同步。因此,关于数据同步的内容,计划分为三个章节:DataX 工具研究介绍、携程 Dal 组件研究介绍、DataX 整合 Dal。本篇介绍 DataX 的作用机制与使用。

一、DataX 介绍

DataX 是阿里巴巴开源出来的数据同步工具,主要解决解决各种异构数据源之间的同步难题。目前已经拥有比较完善的插件体系,包括常用的 RDBMS 数据库、NOSQL、大数据计算系统。良好的架构设计,方便开发者引入新插件,一步步构建起数据同步的生态圈。

二、目前支持的插件
类型 数据源 Reader(读) Writer(写)
RDBMS 关系型数据库 MySQL
Oracle
SQLServer
PostgreSQL
DRDS(分布式关系型数据库)
通用 RDBMS (支持所有关系型数据库)
阿里云数仓数据存储 ODPS
ADS ×
OSS
OCS
NoSQL数据存储 OTS
Hbase0.94
Hbase1.1
Phoenix4.x
Phoenix5.x
MongoDB
Hive
Cassandra
无结构化数据存储 TxtFile
FTP
HDFS
Elasticsearch
时间序列数据库 OpenTSDB ×
TSDB
三、DataX 同步机制

DataX 将数据同步过程中的读取和写入分别抽象为 Reader/Writer插件,并且以框架作为媒介,实现读取数据源和同步数据源的灵活组合。

简单来说,DataX 的设计愿景,是建立一个万能数据池(图中的 FrameWork):有无限根管道通往池子,负责导入数据;同时又有无限根管道负责向外导出数据。向池子里导入数据,依赖 Reader 插件;从池子向外导出数据,依赖 Writer 插件。这种设计的精妙之处,在于这个数据池子是万能的,就是说可以从任何一根管道导入数据,也可以将数据导出到任何一根管道。因此,我们只需要关心导入数据和导出数据的管道建设,也就是开发新的 Reader/Writer插件,便可实现多种数据源之间的同步。

四、DataX数据同步过程

名词解释:

JobDataX 执行数据同步任务的最小业务单元;

TaskDataX 执行数据同步任务的最小执行单元,由 Job 拆分而来,为实现最大的同步效率;

TaskGroup:包含一组 Task 的集合;

DataX 调度过程:

提交一个数据同步 JobDataX 后,DataX 会开启一个 Job 进程,然后根据拆分策略,将 Job 拆分为多个 Task。接下来,Job 调用 Scheduler,依据配置的并发数量,重新对拆分好的 Task 进行组合,这样组合叫做 TaskGroup。最后,TaskGroup 以一定的并发量(配置项: channel)来执行组内的 Task。任务执行过程中,DataX 框架会收集任务执行结果,并以报表的形式打印在日志中。

五、 DataX 的使用

为了演示方便,我们直接做 MySQLMySQL 库的数据同步。

首先需要获取 DataX 工具包。有两种途径:一是下载Datax=X 源码,通过 Maven 打包;二是直接下载官方工具包

然后获取数据同步配置模板。查看源码可知,为了操作方便,DataX 已经内嵌了 Python 执行脚本,可以通过脚本语言获取配置模板,以及执行后续的同步操作。这里我们执行(在 DataX 工具包的 bin 目录下):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
python datax.py -r mysqlreader -w mysqlwriter

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.


Please refer to the mysqlreader document:
https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md

Please refer to the mysqlwriter document:
https://github.com/alibaba/DataX/blob/master/mysqlwriter/doc/mysqlwriter.md

Please save the following configuration as a json file and use
python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json
to run the job.

{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [],
"connection": [
{
"jdbcUrl": [],
"table": []
}
],
"password": "",
"username": "",
"where": ""
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"column": [],
"connection": [
{
"jdbcUrl": "",
"table": []
}
],
"password": "",
"preSql": [],
"session": [],
"username": "",
"writeMode": ""
}
}
}
],
"setting": {
"speed": {
"channel": ""
}
}
}
}

上面打印出来的 Json 串就是 MySQLMySQL数据同步的配置模板。将模板中的选项补充完成,mysql2mysql.json 样例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [
"id",
"name"
],
"connection": [
{
"jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/test?useSSL=false&zeroDateTimeBehavior=EXCEPTION&serverTimezone=UTC"],
"table": ["from_table"]
}
],
"password": "**********",
"username": "root",
"where": ""
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"column": [
"id",
"name"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://127.0.0.1:3306/test?useSSL=false&zeroDateTimeBehavior=EXCEPTION&serverTimezone=UTC",
"table": ["to_table"]
}
],
"password": "**********",
"preSql": ["delete from to_table"],
"session": [],
"username": "root",
"writeMode": "insert"
}
}
}
],
"setting": {
"speed": {
"channel": 5
}
}
}
}

把这个文件放在 dataX 目录下,执行 Python 脚本(在 DataXbin 目录下):

1
2
3
4
5
6
7
8
9
10
11
python datax.py ./mysql2mysql.json

...
2021-04-08 11:20:25.263 [job-0] INFO JobContainer -
任务启动时刻 : 2021-04-08 11:20:15
任务结束时刻 : 2021-04-08 11:20:25
任务总计耗时 : 10s
任务平均流量 : 205B/s
记录写入速度 : 5rec/s
读出记录总数 : 50
读写失败总数 : 0

这样我们便可以将 from_table 中数据同步至 to_table 中。DataX 框架收集到的各项指标打印在最后。关于配置项中的各项指标,可参见源码的文档介绍。


1、拥有个人图床的必要性

图床是存储图片的服务器,可以生成外链供在线加载,被广泛用于网站建设。目前市面上主流的图床有 ImgURLSM.MS新浪微博路过图床七牛云等,主要有两个特点:免费图床广告多、不稳定;收费图床不适合打工人。作为搞技术的人,我们拥有开源精神的同时,也要利用好开源技术。GitHubGitLabGitee 就是最干净的、最稳定的图床。由于GitHubGitLab 在国内的访问速度远不如 Gitee,所以我们选择搭建 Gitee 图床。

2、图床工具-PicGo

即使有了 Gitee 图床,我们仍然需要借助 Git 将图片 Push 到远端,然后再到仓库中 Copy 在线图片的加载地址。跟那些主流图床比起来,这样做只会显得更加傻蛋。为此,我们要借助一款优秀的图床工具 PicGo

PicGo: 一个用于快速上传图片并获取图片 URL 链接的工具

简单地说,PicGo 可以主动 Push 图片到远端,并主动获取远端图片的外链,不用我们操心。

3、搭建基于 Gitee 的图床
3.1 创建 Gitee 图床仓库

创建图床仓库,需要注意两点:仓库需要开源,否则别人没有权限查看你的图片;分支只需要初始化 master ,图床操作都在这个分支上。

3.2 安装配置 PicGo

下载安装包:version: 2.3.0

windows平台请选择:

运行软件,可以看到【图床设置】一栏是没有 Gitee 的,这需要额外安装插件。操作【插件设置】,输入 gitee 查询相关插件,选择安装 gitee 2.0.5

插件安装基于 npm,需要先安装 Nodejs

插件安装成功后重启软件,打开【图床设置】,选择【Gitee图床】,各项配置如下:

owner: Gitee用户名 (donehub);

repo: Gitee 图片仓库名 (img-bed);

path: 图片存放的目录,可以不填写;

token: Gitee 上生成的私有令牌,用于授权 PicGo 操作 Gitee 图床;

message: 用默认证即可;

具体配置方法:

ownerrepo 容易写错,建议直接到 GiteeCopy

Gitee 生成私有令牌:

步骤:个人主页-》个人设置-》安全设置-》私有令牌-》配置权限-》提交;

注意:私有令牌只会展示一次,建议复制下来长久保存;

3.3 上传图片

打开【上传区】,可以看到 PicGo 支持四种图片上传方式:拖拽上传、选择上传、剪切上传、URL 上传。返回的在线图片链接格式有 MarkdownHTML 等。

我们选择一张图片上传,上传成功后,打开【相册】。在相册里,不仅可以看到已上传的所有图片,还可以拷贝、修改在线图片链接。

4. Typora 内嵌 PicGo

虽然 PicGo + Gitee 已经非常好用了,但我们依然需要手动上传图片并拷贝外链。既然图床是服务于内容,那么是否可以内嵌 PicGo 到编辑器内部呢?

Typora 是一款主流的 Markdown 编辑器, 0.9.98 及以上版本可以内嵌 PicGo 工具。配置完成之后,只需要将图片拖入页面,即可自动上传图床并插入外链。

配置方法:文件-》偏好设置-》图像-》插入图片时选择上传图片-》上传服务设定

打开 Test.md,将图片拖入页面,可以看到短暂的 loading 提示,然后上传成功并替换图片外链。


一、ForkJoinPool 介绍

ForkJoinPool 是 jdk 1.7 引入的一个线程池,其底层设计基于分治算法(Divide-and-Conquer)的并行实现,是一款可以获得良好并行性能的简单且高效的设计技术。通过任务分治,可以更好地利用多处理器,并行处理任务,提升计算效能。

Fork/Join 框架主要包含三个模块:

  • 线程池:ForkJoinPool
  • 执行 Fork/Join 任务的线程:ForkJoinWorkerThread
  • 任务对象:ForkJoinTask,继承者有 RecursiveTask、RecursiveAction、CountedCompleter

ForkJoinPool 通过 ForkJoinWorkerThread 来处理提交的 ForkJoinTask。通常不会直接创建 ForkJoinTask,而是借助其继承类,根据实际需要创建对应的分治任务。RecursiveTask 是一个可以递归执行的 ForkJoinTask;RecursiveAction 是一个没有返回值的 RecursiveTask;CountedCompleter 在完成任务执行后,回自动触发执行一个自定义的钩子函数。

二、工作窃取(Work-Stealing)算法

工作窃取 (work-stealing) 算法,是 Fork/Join 的设计原理,指线程池内的所有工作线程都尝试找到并执行已经提交的任务,或者是被其他活动任务创建的子任务。因此,在运行多个可以产生子任务的任务,或提交的许多小任务,ForkJoinPool 的效率非常高。

在 ForkJoinPool 中,每个工作线程 (ForkJoinWorkerThread) 都对应一个任务队列 (WorkQueue),工作线程优先处理自身队列的任务,然后以 FIFO 的顺序随机窃取其他队列中的任务。处理自身队列的任务的方式有两种:先进先出 (FIFO)、先进后出 (LIFO)。这由 ForkJoinPool 的构造参数 asyncMode 决定,默认先进先出 (FIFO)。

  • 每个工作线程 (ForkJoinWorkerThread) 都有自己的一个 WorkQueue,该工作队列是一个双端队列;
  • WorkQueue 支持三种操作 push、pop、poll;
  • push/pop 只能被队列的所有者线程调用,而 poll 可以被其他线程调用;
  • 划分的子任务调用 fork 方法时,都会被 push 到自己的 WorkQueue 中;
  • 一般情况下,工作线程 (ForkJoinWorkerThread) 从自己的双端队列获出任务并执行;
  • 当自己的队列为空时,工作线程 (ForkJoinWorkerThread) 便随机从其他 WorkQueue 末尾调用 poll 方法窃取任务并执行;

三、ForkJoinPool 使用

我们以 RecursiveTask 学习使用 ForkJoinPool,可递归分治任务实现类图:

计算1+2+3+…+10000的值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class SumTest extends RecursiveTask<Integer> {
final int start;
final int end;

SumTest(int start, int end) {
this.start = start;
this.end = end;
}

@Override
protected Integer compute() {

if (end - start < 100) {
System.out.println(Thread.currentThread().getName() + " 开始执行: " + start + "-" + end);
int sum = 0;
for (int i = start; i <= end; i++) {
sum += i;
}
return sum;
}

SumTest sumTest1 = new SumTest(start, (end + start) / 2);
SumTest sumTest2 = new SumTest((start + end) / 2 + 1, end);

sumTest1.fork();
sumTest2.fork();

return sumTest1.join() + sumTest2.join();
}

public static void main(String[] args) throws ExecutionException, InterruptedException {
ForkJoinPool pool = new ForkJoinPool();
ForkJoinTask<Integer> task = new SumTest(1, 1000);
pool.submit(task);
System.out.println(task.get());
}
}

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
ForkJoinPool-1-worker-13 开始执行: 1-63
ForkJoinPool-1-worker-11 开始执行: 501-563
ForkJoinPool-1-worker-13 开始执行: 126-188
ForkJoinPool-1-worker-11 开始执行: 564-625
ForkJoinPool-1-worker-13 开始执行: 189-250
ForkJoinPool-1-worker-10 开始执行: 251-313
ForkJoinPool-1-worker-4 开始执行: 64-125
ForkJoinPool-1-worker-10 开始执行: 314-375
ForkJoinPool-1-worker-13 开始执行: 376-438
ForkJoinPool-1-worker-10 开始执行: 439-500
ForkJoinPool-1-worker-4 开始执行: 626-688
ForkJoinPool-1-worker-0 开始执行: 814-875
ForkJoinPool-1-worker-7 开始执行: 751-813
ForkJoinPool-1-worker-11 开始执行: 689-750
ForkJoinPool-1-worker-3 开始执行: 939-1000
ForkJoinPool-1-worker-15 开始执行: 876-938
500500

从执行结果来看,ForkJoinPool 通过分治算法,一级级拆分表达式,直到拆分的数据单元的差值小于100,接着分别计算各个小数据单元的和,提交汇总。

四、Fork/Join 任务提交方式

ForkJoinPool 支持三种任务提交方式:

  • submit: 异步执行,有返回值,通过 task.get() 获取执行结果;
  • invoke: 同步执行,等待任务执行完毕后,返回计算结果;
  • execute: 直接提交任务,同步执行,无返回结果。


一、为什么要使用域名

曾经很多次想给别人介绍我熬着夜写出的技术博客,但网址是什么,我经常拼不出来。虽然 donehub.github.io 的结构比较简单,但摆到台面上,总差那么点意思。因此,为了让自己更容易被记住,我需要一个域名。

二、GitHub Pages 支持域名配置

GitHub Pages 是一个很好的博客载体,部署成功之后会分配一个 username.github.io 的访问地址。同时,GitHub Pages 也预留了域名配置项,“我们可以定制化一个域名,不用再去访问默认的地址”。这使得绑定个人域名成为可能。

三、绑定个人域名

3.1 申请个人域名

域名注册平台有阿里云GoDaddy 等。我在阿里云平台注册的,叫 takeshell.com。每年只需要几十块钱,还能玩得起。

3.2 GitHub Pages 域名替换

GitHub Pages 默认是部署在 https://username.github.io/ 上的,我们有两种方法将域名替换为 takeshell.com

方法一:在项目目录上手动添加 CNAME 文件(没有后缀名),文本内容为 takeshell.com。提交之后,可以看到 Custome domain 已自动回填新的域名。

方法二:直接操作 Custom domain 配置项,填写个人域名,并保存。

3.3 DNS 配置

替换为个人域名后, GitHub 的域名健康检测是不通过的,这是因为我们没有为个人域名配置 DNS

主要有两种域名绑定方案:

  • A: 全称为AddressDNS 将个人域名解析到指定 IP
  • CNAME: 全称为 Canonical NameDNS 将域名指向为另外一个域名;

Address 记录类型:

首先,通过 ping username.github.io 获取 IP,如 185.199.111.153。然后在阿里云平台,选择域名解析->解析设置->添加记录。

这里的主机记录是 www,需要再添加一个 @ 的配置,这样, https://www.takeshell.comhttps://takeshell.com 都可以访问博客。最后启用 DNS ,就可以用 takeshell.com 访问博客了。这里的 TTL(Time To Live),是 DNS 的缓存时间,默认10 min 就好了。

CNAME 记录类型:

个人域名,除了可以直接映射到指定 IP 上,还可以映射到 username.github.io 上。相当于开启一个域名分身,虽然来路不同,但目的地是一致的。这样的映射机制也叫域名别名。

同样地,再申请一个主机记录为 @ 的配置。开启 DNS 后,也可以用 takeshell.com 正常访问博客。

四、总结

拥有个人域名是件令人兴奋的事情。首先,个人域名要足够简单,能让别人一眼记住。其次,个人域名要突出个人风格,geek 精神还是日常生活,通过域名必须能看出来。最后,也是最重要的,就是我们看中的域名,不能名花有主了。

个人博客是一个展示自己的舞台,但这个舞台不能深藏于巷,否则就是自导自演。如今,终于绑上了个人域名,希望这个广告牌可以吸引更多的目光。当然,前提是要有好的表演。

一、背景介绍

ThreadLocal是线程提供的本地变量,因其线程特殊属性,被经常用于存储与线程相关的信息,如保存登录用户信息、数据库连接配置等。但我们忽略了一个关键点:ThreadLocal 只能用在同步线程中,而在异步线程或线程池中则不起作用。因此,本博文主要探讨如何在异步线程和线程池中传递 ThreadLocal 上下文。

二、问题描述

发版之后,爆出线上问题:“用户信息获取失败”。为了更好地解释这个问题,先介绍下公司的系统架构:用户登录时,后台系统通过用户 key 获取个人信息并保存在 ThreadLocal 静态对象中,以供后续使用。简要实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
/**
* 拦截器
*/
@Component
public class HandlerAccessInterceptor implements HandlerInterceptor {

@Override
public boolean preHandle(HttpServletRequest httpServletRequest,
HttpServletResponse httpServletResponse, Object o) throws Exception {
// 增加允许跨域的返回信息
httpServletResponse.setHeader("Access-Control-Allow-Origin", "*");
httpServletResponse.setHeader("Access-Control-Allow-Headers", "Content-Type,Content-Length, Authorization, Accept,X-Requested-With");
httpServletResponse.setHeader("Access-Control-Allow-Methods", "PUT,POST,GET,DELETE,OPTIONS");

// 获取并保存用户信息
if (httpServletRequest.getCookies() != null) {
for (Cookie cookie : httpServletRequest.getCookies()) {
if ("vkey".equals(cookie.getName())) {
UserContextUtil.setUser(cookie.getValue());
}
}
}

return true;
}

@Override
public void postHandle(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, Object o, ModelAndView modelAndView) throws Exception {

}

// 清除用户信息
@Override
public void afterCompletion(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, Object o, Exception e) throws Exception {
UserContextUtil.remove();
}
}

/**
* 用户信息工具
*/
@Component
public class UserContextUtil {
private static ThreadLocal<SyUser> threadLocal = new ThreadLocal<>();
private static final String TOKEN_BEARER = "Bearer ";

/**
* 获取当前线程用户信息
*
* @return 前线程用户信息
*/
public synchronized static SyUser getUser() {
SyUser syUser = threadLocal.get();
return syUser;
}

/**
* 直接保存用户信息
*/
public synchronized static boolean setSyUser(SyUser syUser) {

if (syUser == null) {
return false;
}

threadLocal.set(syUser);

return true;
}

/**
* 转换登录用户信息为当期系统用户信息
*
* @param cookies
*/
public synchronized static boolean setUser(Cookie[] cookies) {
if (cookies == null) {
return false;
}
for (Cookie cookie : cookies) {
String name = cookie.getName().toLowerCase();
if ("vkey".equals(name)) {
String value = cookie.getValue();
if (StrUtil.isEmpty(value)) {
return false;
}
LoginUser loginUser = new LoginUser(value);
if (loginUser.getId() == null) {
return false;
}
SyUser syUser = new SyUser();
syUser.setId(loginUser.getId().intValue());
syUser.setUserName(loginUser.getMobile());
syUser.setTrueName(loginUser.getName());
return setSyUser(syUser);
}
}
return false;
}

/**
* 转换登录用户信息为当期系统用户信息
*
* @param headerValue 头部 vkey 值
*/
public synchronized static boolean setUser(String headerValue) {
if (StrUtil.isEmpty(headerValue)) {
return false;
}
if (headerValue.startsWith(TOKEN_BEARER)) {
headerValue = headerValue.substring(TOKEN_BEARER.length());
}
LoginUser loginUser = new LoginUser(headerValue);
if (loginUser.getId() == null) {
return false;
}
SyUser syUser = new SyUser();
syUser.setId(loginUser.getId().intValue());
syUser.setUserName(loginUser.getMobile());
syUser.setTrueName(loginUser.getName());
syUser.setPhone(loginUser.getMobile());
return setSyUser(syUser);
}

/**
* 获取登录用户姓名
*/
public synchronized static String getUserName() {
SyUser syUser = threadLocal.get();
if (syUser == null) {
return null;
}
return syUser.getTrueName();
}

/**
* 移除线程中用户信息
*/
public synchronized static void remove() {
threadLocal.remove();
}
}

问题代码定位:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
@RestController
@RequestMapping("/web/file")
@Slf4j
public class ManageFileController {

@Resource
private ManageFileService manageFileService;

/**
* 【加载文件】
* 下载文件的数据准备阶段可能非常长, 前台操作人员无法得知进度, 且无法做其他操作
* 故将数据准备与文件下载解耦, 解决交互上不舒服的问题
*
* @param loadFileReqDTO 加载文件请求
* @return 调用是否成功
*/
@PostMapping("/loadFile")
public RespDTO<String> loadFile(@Valid @RequestBody LoadFileReqDTO loadFileReqDTO) {

// 初始化加载文件信息
AsyncLoadFile asyncLoadFile = manageFileService.initLoadFileInfo(loadFileReqDTO);

// 执行文件加载
manageFileService manageFileService.loadFile(loadFileReqDTO, inheritedSyUser);

return RespDTO.success();
}
}

/**
* @author zourongsheng
* @version 1.0
* @date 2020/10/21 13:55
*/
@Service
@Slf4j
public class ManageFileService {

@Resource
private ManageFileHelper manageFileHelper;

@Resource
private AsyncLoadFileMapper asyncLoadFileMapper;

/**
* 【加载文件】
*
* @param loadFileReqDTO 加载文件请求
* @param asyncLoadFile 初始化加载文件信息
* @param syUser 用户信息
*/
@Async(TASK_EXECUTOR)
public void loadFile(LoadFileReqDTO loadFileReqDTO, AsyncLoadFile asyncLoadFile, SyUser syUser) {

try {

log.info("异步加载文件开始; userName: {}", UserContextUtil.getUserName());

// 路由加载服务
LoadFileService loadFileService = manageFileHelper.router(loadFileReqDTO.getLoadFileType());

// 开启文件加载过程
LoadFileReqBO loadFileReqBO = new LoadFileReqBO();
BeanUtils.copyProperties(loadFileReqDTO, loadFileReqBO);

LoadFileRespBO loadFileRespBO = loadFileService.executeLoadFile(loadFileReqBO);

// 备份文件至影像件系统
String fileKey = FileUtil.uploadFile(loadFileRespBO.getFilePathUrl(), FileTypeEnum.XLSX, STORE_FILE_KEY);

CuiShouAssert.notEmpty(fileKey, "调用影像件系统异常!");

asyncLoadFile.setFileName(loadFileRespBO.getFileName());
asyncLoadFile.setFileKey(fileKey);
asyncLoadFile.setLoadStatus(LoadStatusEnum.SUCCESS.name());
} catch (Exception e) {
log.info("加载文件异常; errMsg: {}", e.getMessage(), e);
asyncLoadFile.setLoadStatus(LoadStatusEnum.FAILURE.name());
asyncLoadFile.setRemark(e.getMessage());
}

asyncLoadFileMapper.update(asyncLoadFile);

log.info("异步加载文件结束; userName: {}", UserContextUtil.getUserName());
}
}
1
2
运行结果...
2020-12-14 17:51:15.235 INFO [-,d11235d6f6eda8d0,8c023b1e45855f97,false] 19444 --- [common-async-executor-1] c.v.c.o.service.file.ManageFileService : 异步加载文件开始; userName: null

问题很好定位,@Async 异步线程池开启子线程处理任务后,父线程提供的本地线程变量就销毁了。

三、解决方法

  • 将用户信息当作参数传入异步方法,再在异步方法中重新设置本地变量;
  • 采用 InheritedThreadLocal 实现子线程上下文的传递。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/**
* 【加载文件-方法一】
* 下载文件的数据准备阶段可能非常长, 前台操作人员无法得知进度, 且无法做其他操作
* 故将数据准备与文件下载解耦, 解决交互上不舒服的问题
*
* @param loadFileReqDTO 加载文件请求
* @return 调用是否成功
*/
@PostMapping("/loadFile")
public RespDTO<String> loadFile(@Valid @RequestBody LoadFileReqDTO loadFileReqDTO) {

// 初始化加载文件信息
AsyncLoadFile asyncLoadFile = manageFileService.initLoadFileInfo(loadFileReqDTO);

// #loadFile 为异步方法, 这里将操作人信息继承到子线程中
SyUser syUser = UserContextUtil.getUser();

SyUser inheritedSyUser = new SyUser();

BeanUtils.copyProperties(syUser, inheritedSyUser);

// 执行文件加载
manageFileService manageFileService.loadFile(loadFileReqDTO, inheritedSyUser);

return RespDTO.success();
}

/**
* 【加载文件】
* 下载文件的数据准备阶段可能非常长, 前台操作人员无法得知进度, 且无法做其他操作
* 故将数据准备与文件下载解耦, 解决交互上不舒服的问题
*
* @param loadFileReqDTO 加载文件请求
* @param asyncLoadFile 初始化加载文件信息
* @param syUser 用户信息
*/
@Async(TASK_EXECUTOR)
public void loadFile(LoadFileReqDTO loadFileReqDTO, AsyncLoadFile asyncLoadFile, SyUser syUser) {

UserContextUtil.setSyUser(syUser);
log.info("异步加载文件开始; UserName: {}", UserContextUtil.getRealName());
UserContextUtil.remove();
log.info("异步加载文件结束; UserName: {}", UserContextUtil.getRealName());
}
1
2
3
方法一运行结果...
2020-12-14 18:03:34.947 INFO [-,480f9c55e785ec6f,c8de532979871193,false] 19444 --- [common-async-executor-2] c.v.c.o.service.file.ManageFileService : 异步加载文件开始; UserName: 邹荣升
2020-12-14 18:03:35.235 INFO [-,480f9c55e785ec6f,c8de532979871193,false] 19444 --- [common-async-executor-2] c.v.c.o.service.file.ManageFileService : 异步加载文件结束; UserName: null

方法一可以说是傻瓜操作,治标不治本,后来的同事稍不注意还会掉进坑里。我们来实现方法二:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
private static ThreadLocal<SyUser> threadLocal = new InheritableThreadLocal<>();

/**
* 【加载文件-异步线程】
* 下载文件的数据准备阶段可能非常长, 前台操作人员无法得知进度, 且无法做其他操作
* 故将数据准备与文件下载解耦, 解决交互上不舒服的问题
*
* @param loadFileReqDTO 加载文件请求
* @param asyncLoadFile 初始化加载文件信息
* @param syUser 用户信息
*/
@Async
public void loadFile(LoadFileReqDTO loadFileReqDTO, AsyncLoadFile asyncLoadFile, SyUser syUser) {

UserContextUtil.setSyUser(syUser);
log.info("异步加载文件开始; UserName: {}", UserContextUtil.getRealName());
UserContextUtil.remove();
log.info("异步加载文件结束; UserName: {}", UserContextUtil.getRealName());
}

/**
* 【加载文件-异步线程池】
* 下载文件的数据准备阶段可能非常长, 前台操作人员无法得知进度, 且无法做其他操作
* 故将数据准备与文件下载解耦, 解决交互上不舒服的问题
*
* @param loadFileReqDTO 加载文件请求
* @param asyncLoadFile 初始化加载文件信息
* @param syUser 用户信息
*/
@Async(TASK_EXECUTOR)
public void loadFile(LoadFileReqDTO loadFileReqDTO, AsyncLoadFile asyncLoadFile, SyUser syUser) {

UserContextUtil.setSyUser(syUser);
log.info("异步加载文件开始; UserName: {}", UserContextUtil.getRealName());
UserContextUtil.remove();
log.info("异步加载文件结束; UserName: {}", UserContextUtil.getRealName());
}

异步线程和异步线程池两种情况下的运行结果:

1
2
3
4
运行结果...
2020-12-14 18:36:41.560 INFO [-,8658c694de3e9b7a,ccc7a782204c91b0,false] 19876 --- [common-async-executor-1] c.v.c.o.service.file.ManageFileService : 异步加载文件开始; UserName: 邹荣升

2020-12-14 18:41:17.735 INFO [-,67b4725bcaacfc64,36ee2acec232be32,false] 21600 --- [common-async-executor-1] c.v.c.o.service.file.ManageFileService : 异步加载文件开始; UserName: 邹荣升

三、总结

虽然 ThreadLocal 的线程相关属性提供了很多便利性,但在高并发系统中,直接使用原生静态对象并不合适。因此,如使用 RedisMQ 一样,深入了解常用工具的特性,是系统设计的必备要素。


最近在忙很多事,工作和生活都乱糟糟的,博客也停更了。写技术还是美食呢,当然是美食啦。

今天做萝卜丸子。边做边吃,忽然想起读初二的一天,同学们凑钱买了五斤萝卜丸子,然后在英语课上偷吃的故事。十五年过去了,但历历在目。

一、食材准备

青萝卜、胡萝卜、红薯淀粉、面粉、鸡蛋、辣椒、胡椒、葱姜蒜、生抽、老抽、蚝油、冰糖。

DOPsrd.jpg

二、调料子

首先将萝卜刨丝。

DOPjRU.jpg

然后要杀掉萝卜的水气和臭味。比较喜欢看老饭骨,大伯给的方子是在沸水中放入冰糖,再倒入萝卜丝。今天试了下,效果确实很棒,建议民间推广。

DOPgat.jpg

接下来调糊糊。用筷子顺着一个方向搅拌,直到可以用筷子提起来。这步也叫上劲,是关键一环。

DOiNLj.jpg

调糊糊前,可以先把油热上。

DOiXTI.jpg

待油七分热,将糊糊团成丸子,依次放入。油温太高会让丸子糊掉,太低又会导致丸子吸油太多。七分热怎么看呢?可以先放一小块糊糊进去,剧烈翻滚但短时间内又不会泛黄就可以了。

DOFTNq.jpg

文火慢炸20分钟,表面金黄时捞出。

DOFxb9.jpg

丸子放入次序不一样,可能炸出来的颜色也不一样。强迫症患者可以将丸子下入油中复炸一次,让颜色均匀。

萝卜丸子可以当零食吃,也可以做汤。


一、InnoDB 索引

索引是数据库应用中非常重要的组成部分,可以提升数据检索效率,减轻服务引擎的压力。InnoDB 服务引擎,支持哈希索引和B+Tree索引。其中,B+Tree 是默认选择,也是最常用的索引结构。普通索引和唯一索引是最常用的两种索引类型,本文详细介绍一下两者的区别。

二、普通索引与唯一索引的作用

普通索引,规律分布在叶子节点上,唯一的目的就是方便随机查找和范围查找,提升检索效率。

唯一索引,不仅要实现普通索引的功能,还要保证值的唯一性。

三、普通索引与唯一索引的区别

change buffer

探讨两者的区别,需要从它们的作用机制入手。对于普通索引,change buffer 的作用至关重要,这也是其与唯一索引区别最大的地方。至于 change buffer 是什么,要从 MySQL 的数据管理机制说起。

为了减少 IO 读的次数,MySQL 将数据分别存在磁盘和内存中,磁盘中是原始数据,内存(buffer pool)中是缓存的数据页和索引页。可以说,buffer pool 是缓解磁盘读的一种手段。当然,MySQL 不会单单研究出来一个缓解磁盘读的机制,毕竟磁盘读和写都是十分消耗的操作,而 change buffer 就是缓解磁盘写的一种手段。

当需要更新一个数据页时,如果数据页在 buffer pool 中,则直接更新;如果不在缓存中,InnoDB 就会把更新操作缓存在 change buffer 中,而不会从磁盘中读取数据页到 buffer pool。在合适的时机,如再次访问这个数据页时,InnoDB 便会将数据页读入内存,然后执行 change buffer 中的相关操作。总之,在不影响数据逻辑正确性的前提下,减少了磁盘写的次数。

对于 InnoDB 数据引擎,将 change buffer 的操作应用到数据页的过程,叫 merge。以下场景会触发 merge 操作:

  • 访问这个数据页;
  • 系统后台线程定期 merge;
  • change buffer 容量不够用时;
  • 数据库正常关闭时;
  • redo log 写满时(固定大小,循环写);

此外,虽然 change buffer 看起来像是一个临时缓存,但它却是可持久化的数据,不仅存于内存,也被写到磁盘上。这样的设计才是合理的,当出现突发状况时,可以通过磁盘上的 change buffer 恢复逻辑操作,保证数据的正确性和系统的容灾能力。


3.1 在查询方面的差异

id 是聚簇索引,k 是二级索引。可以看出,这是一个典型的索引覆盖场景。

1
select id from T where k = 10;

如果给 k 加上普通索引,那么随机查询将根据二分法找到第一个目标数据,然后再沿着叶子节点查找下一个数据,判断是否为 10,如果不是,则返回查询结果。

如果给 k 加上唯一索引,那么随机查询将根据二分法找到第一个目标数据,然后返回查询结果,因为这个目标数据一定是唯一的。

所以,在查询数据方面,普通索引比唯一索引多一个向下一个节点查找的步骤。如果下一个节点在同一个数据页,通过指针直接就可以拿到;如果下一个节点不在当前数据页,那么将会发生一次 IO 读。就是说,普通索引的查询性能弱于唯一索引。

3.2 在更新方面的差异

从 change buffer 的介绍可知,当数据页不在内存中,使用普通索引会将更新操作缓存在 change buffer 中,然后在适当的时机 merge 到原数据页中,从而减少 IO 次数;而使用唯一索引,为了保证数据的唯一性,需要将数据页读到内存中来,进行更新操作,再写回磁盘。就是说,普通索引的更新性能强于唯一索引。


一、@Retention 简介

@Retention 是用来定义注解的注解,这类注解也叫 元注解。@Retention 定义了注解的生命周期,生命周期的长短取决于 RetentionPolicy 属性值。

1
2
3
4
5
6
7
8
9
10
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.ANNOTATION_TYPE)
public @interface Retention {
/**
* Returns the retention policy.
* @return the retention policy
*/
RetentionPolicy value();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public enum RetentionPolicy {
/**
* Annotations are to be discarded by the compiler.
*/
SOURCE,

/**
* Annotations are to be recorded in the class file by the compiler
* but need not be retained by the VM at run time. This is the default
* behavior.
*/
CLASS,

/**
* Annotations are to be recorded in the class file by the compiler and
* retained by the VM at run time, so they may be read reflectively.
*
* @see java.lang.reflect.AnnotatedElement
*/
RUNTIME
}

归纳如下:

属性值 描述 使用场景
RetentionPolicy.SOURCE 注解只保留在源文件,当被编译成class文件,注解就会消失 代码预检,如 @Override、@SuppressWarnings
RetentionPolicy.CLASS 注解被保留到class文件,但当jvm加载class文件时被遗弃 编译时进行一些预处理操作,如生成辅助信息(META-INF/services)
RetentionPolicy.RUNTIME 注解不仅被保存到class文件中,jvm加载class文件之后,仍然存在 运行时动态获取注解信息,如 @Autowired、@Required

二、通过字节码认识 @Retention

定义三种生命周期的注解:SourcePolicy、ClassPolicy、RuntimePolicy
1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Retention(RetentionPolicy.SOURCE)
@Target(ElementType.METHOD)
public @interface SourcePolicy {
}

@Retention(RetentionPolicy.CLASS)
@Target(ElementType.METHOD)
public @interface ClassPolicy {
}

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface RuntimePolicy {
}

使用以上注解:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class RetentionPolicyTest {
@SourcePolicy
public void sourcePolicy() {
}

@ClassPolicy
public void classPolicy() {
}

@RuntimePolicy
public void runtimePolicy() {
}
}

javap -v RetentionPolicyTest 查看字节码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
{
public RetentionPolicyTest();
flags: ACC_PUBLIC
Code:
stack=1, locals=1, args_size=1
0: aload_0
1: invokespecial #1 // Method java/lang/Object."<init>":()V
4: return
LineNumberTable:
line 3: 0

public void sourcePolicy();
flags: ACC_PUBLIC
Code:
stack=0, locals=1, args_size=1
0: return
LineNumberTable:
line 7: 0

public void classPolicy();
flags: ACC_PUBLIC
Code:
stack=0, locals=1, args_size=1
0: return
LineNumberTable:
line 11: 0
RuntimeInvisibleAnnotations:
0: #11()

public void runtimePolicy();
flags: ACC_PUBLIC
Code:
stack=0, locals=1, args_size=1
0: return
LineNumberTable:
line 15: 0
RuntimeVisibleAnnotations:
0: #14()
}

可以看到:

  • 编译之后,@SourcePolicy 信息不存在;
  • 编译之后,@ClassPolicy 具有属性:RuntimeInvisibleAnnotations;
  • 编译之后,@RuntimePolicy 具有属性:RuntimeVisibleAnnotations;


自从国内猪肉紧俏以来,市面上就经常买到进口猪肉。外国人养猪比较人性化,不给小猪做阉割,所以猪肉又腥又骚。今天抱着踩雷的心态,买了一份肉,并在叮咚上特意备注道:“只要中国猪肉!”。

今天来做个狮子头。

一、食材准备

五花肉(肥瘦比: 6:4),山药(马蹄/萝卜)、老豆腐、红薯淀粉、辣椒、葱姜蒜、生抽、老抽、蚝油、料酒、八角、鸡蛋。

wyDNmd.jpg

辅料我用的是山药,大家可以根据自己的口味调整。辅料添加要适量,否则可能团不成肉丸子。

二、制作流程
1
2
3
4
5
graph LR
A[拌料] --> B(成型)
B --> C(煎炸)
C --> D(红烧)
D --> E(勾芡)
三、操作步骤
  1. 将五花肉,山药、老豆腐、葱姜蒜、生抽、老抽、蚝油、料酒、鸡蛋依次放入碗中,用手沿着一个方向搅拌上劲,待拌料可以黏在手上,加入少量淀粉后继续搅拌摔打;

    wyD8SO.jpg

  2. 将拌料团成你喜欢的大小;

    wyDJ6e.jpg

  3. 热锅冷油,煎炸二十分钟;油热6分可入锅,全程小火;这一步的主要目的是定型,口味清淡的朋友也可以用沸水来操作;

    wyDGlD.jpg

  4. 将辣椒、葱姜蒜、八角炒香,再加入生抽、老抽、蚝油、料酒,添2/3锅水,加入肉丸子小火慢煮30分钟;

    wyDYOH.jpg

  5. 勾芡 ;待锅内汤汁不多时,盛出肉丸子,然后加入水淀粉熬制挂锅,然后将红烧汁浇在狮子头上;

    wygNnS.jpg

    wygwkj.jpg

四、多说一句

做狮子头比较耗时耗力,只能在周末享用。相比来说,我更喜欢老家的肉丸子。材料比较简单,猪肉、馒头渣、黑胡椒。团成丸子,再兑上半锅水,炖煮一个小时。味浓又不油腻!

一、应用背景

在支付系统中,线程池在批量支付中有着十分重要的作用。随着业务量的增多,支付接口的处理能力越来越弱,甚至出现一个支付批次的处理时间超过5分钟的情况。这对于调用方来说,是无法容忍的。最简单粗暴的解决办法,便是增加线程数和阻塞队列容量。但至于怎么调整,才能做到既保证接口对支付请求的处理能力,又不浪费系统资源,好像只能凭感觉来了。Prometheus + Grafana 是目前比较流行的监控体系,在管理线程池方面的表现也十分出色。

二、线程池配置

采用 Spring 提供的 ThreadPoolTaskExecutor 来完成线程池的配置。根据业务需要,创建两个线程池:同步线程池和异步线程池。具体配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
@Configuration
public class ExecutorPoolConfig {

/**
* 线程池参数配置
*/
@Resource
private ExecutorProperties executorProperties;

private static final String ASYNC_EXECUTOR = "asyncExecutor";
private static final String SYNC_EXECUTOR = "syncExecutor";

/**
* @return 同步线程池
*/
@RefreshScope
@Bean(SYNC_EXECUTOR)
public ThreadPoolTaskExecutor serviceExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(executorProperties.getSync().getCorePoolSize());
executor.setMaxPoolSize(executorProperties.getSync().getMaxPoolSize());
executor.setQueueCapacity(executorProperties.getSync().getQueueCapacity());
executor.setKeepAliveSeconds((int)executorProperties.getSync()
.getKeepAlive().getSeconds());
executor.setThreadNamePrefix("executor-sync-service-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(executorProperties.getSync()
.getAwaitTerminationSeconds());
return executor;
}

/**
* @Async 注解默认走该线程池
* @return 异步处理线程池
*/
@RefreshScope
@Bean(ASYNC_EXECUTOR)
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(executorProperties.getAsync().getCorePoolSize());
executor.setMaxPoolSize(executorProperties.getAsync().getMaxPoolSize());
executor.setQueueCapacity(executorProperties.getAsync().getQueueCapacity());
executor.setKeepAliveSeconds((int)executorProperties.getAsync()
.getKeepAlive().getSeconds());
executor.setThreadNamePrefix("common-async-executor-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(executorProperties.getAsync()
.getAwaitTerminationSeconds());
return executor;
}
}

三、监控指标

综合线程池的核心要素和生产业务的关键要素,提出以下几种监控指标:

  • 核心线程数;

  • 最大线程数;

  • 活跃线程数;

  • 当前线程数;

  • 队列中任务数;

  • 已完成任务数;

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    import io.micrometer.core.instrument.Gauge;
    import io.micrometer.core.instrument.MeterRegistry;
    import io.micrometer.core.instrument.Metrics;

    @Component
    public class ExecutorMetricsSupport implements InitializingBean {

    /**
    * Prometheus 数据采集中心
    */
    @Resource
    private MeterRegistry meterRegistry;

    @Autowired
    @Qualifier(SYNC_EXECUTOR)
    private ThreadPoolTaskExecutor syncExecutor;

    @Autowired
    @Qualifier(ASYNC_EXECUTOR)
    private ThreadPoolTaskExecutor asyncExecutor;

    @Override
    public void afterPropertiesSet() throws Exception {
    initServiceExecutorMetrics(syncExecutor, "executor.sync");
    initServiceExecutorMetrics(asyncExecutor, "executor.async");
    }

    /**
    * 线程池metrics指标监控
    * @param serviceExecutor 线程池
    * @param namePrefix 指标名称前缀
    */
    private void initServiceExecutorMetrics(ThreadPoolTaskExecutor serviceExecutor, String namePrefix) {
    Gauge
    .builder(namePrefix.concat(".active"),
    serviceExecutor, ThreadPoolTaskExecutor::getActiveCount)
    .register(meterRegistry);

    Gauge
    .builder(namePrefix.concat(".core"),
    serviceExecutor, ThreadPoolTaskExecutor::getCorePoolSize)
    .register(meterRegistry);

    Gauge
    .builder(namePrefix.concat(".max"),
    serviceExecutor, ThreadPoolTaskExecutor::getMaxPoolSize)
    .register(meterRegistry);

    Gauge
    .builder(namePrefix.concat(".pool"),
    serviceExecutor, ThreadPoolTaskExecutor::getPoolSize)
    .register(meterRegistry);

    Gauge
    .builder(namePrefix.concat(".queue"), serviceExecutor,
    executor -> executor.getThreadPoolExecutor().getQueue().size())
    .register(meterRegistry);

    Gauge
    .builder(namePrefix.concat(".completetask"), serviceExecutor,
    executor -> executor.getThreadPoolExecutor().getCompletedTaskCount())
    .register(meterRegistry);
    }
    }

四、指标分析

通过 PSQL 将以上各项指标展示在 Grafana 中。 Prometheus 默认 15s 拉取一次数据,对于线程池这种波定性较大的指标,建议将拉取时间调整至 10s,以便灵活且准确地反应线程池的运行情况。

  • 异步线程池

dDNE9g.png

  • 同步线程池

dDN6vd.png

由监控结果可以作出以下分析:

  • 异步线程池的负担非常小,目前还没出现过队列积压的情况,可以适当减少核心线程和最大线程数;
  • 同步线程池配置较为妥当,峰值期间未启用非核心线程,队列积压任务量在合理区间,请求处理效率非常高。