0%


一、简介

Spring Batch 是一款轻量级批处理框架,主要用于构建高容量、高性能的批处理应用。作为 Spring 的子项目,Spring Batch 基于 Spring 框架,已进化出一套完备的企业级解决方案。借助良好的 Spring 生态,被广泛应用于批处理领域。

Spring Batch 拥有强大的组件库,包括任务重启、任务跳过、任务统计、日志追踪、事务管理、资源管理等。此外,对于大批量数据处理任务,通过分区和优化技术,实现高性能作业。总之,Spring Batch 有着良好的可扩展性,既可以处理简单的任务,也可以处理复杂的、高容量的任务。

二、基础架构

看图可知:

  • 一个 Job 可以有一个或多个 Step
  • 每个 Step 都有一个 ItemReader、一个 ItemProcessor 和一个 ItemWriter
  • Job 需要 JobLauncher 发起;
  • 批处理过程中的元数据存在 JobRepository 中;

Job

Job 封装了整个批处理所需要的数据,可以通过 xmlJava Bean 注解配置。Job 的继承链:

JobInstanceJob 的运行实例,就像同一个 Java 类可以实例化出不同的对象, Job 也可以有不同的 JobInstance。因此可以说,JobInstance = Job + JobParametersJobExecutionJobInstance 的一次执行,包括要做什么、怎么做、执行结果等。


Step

看图可知,一个 Step 包含输入、处理、输出这样一种模型,说明在批处理框架中, Step 是最小的执行单元。Step 的继承链:

StepExecutionStep 的一次执行,包含 StepJobExecution 以及事务相关数据的引用,比如提交和回滚次数、开始和结束时间等。

ItemReaderItemProcessorItemWriter,是顶级接口。基于此,Spring Batch 已实现常用组件,如文件数据存取器、数据库数据存取器等,功能完备,开箱即用。


JobLauncher

JobLauncher 负责在指定的 JobParameters 下,启动 Job


JobRepository

JobRepository 专门负责与数据库打交道,记录整个批处理中的增加、检索、更新、删除动作。在 Java 应用中,使用注解 @EnableBatchProcessing 即可完成配置。此外,Spring Batch 是依赖数据库进行管理的。相关表的功能简介如下:

  • BATCH_JOB_INSTANCE:储存 JobInstance 相关的所有信息;
  • BATCH_JOB_EXECUTION_PARAMS: 储存 JobParameters 相关的所有信息;
  • BATCH_JOB_EXECUTION:储存 JobExecution 相关的所有信息;
  • BATCH_STEP_EXECUTION:存储 StepExecution 相关的所有信息;
  • BATCH_JOB_EXECUTION_CONTEXT:存储 Job - ExecutionContext 相关的所有信息;
  • BATCH_STEP_EXECUTION_CONTEXT:存储 Step - ExecutionContext 相关的所有信息;

三、设计原则

只需简单配置,Spring Batch 即可内嵌 Spring 应用中,小巧而强大。其设计原则如下:

  • 继承 Spring 编程模型,开发者只需专注于业务逻辑,将基础实现交由框架负责;

  • 解耦基础架构、执行环境和批处理应用之间的关注点;

  • 抽取核心服务,设计顶层接口;

  • 实现热门组件,开箱即用;

  • 增强核心服务的可拓展性;

  • 通过 Maven 构建简单的部署模型,独立于应用程序;

四、使用原则

开发者在构建批处理方案时,应考虑以下原则

  • 搭建架构和环境,尽量使用通用的构建块,因为批处理架构与在线框架之间会相互影响;

  • 避免在单机应用中构建复杂的逻辑结构;

  • 尽可能多地在内存中处理任务,尽可能少地使用系统资源,尤其是物理 IO

    • 缓存常用数据,避免在同一事务或不同事务中重复读取数据;
    • 全表扫描或索引扫描;
  • 不做重复的任务(记录已处理的任务,对于相同的后续任务,直接跳过);

  • 合理分配初始内存,防止处理任务中多次分配内存,耗费时间;

  • 设定足够的校验和记录,保证数据的完整性;

  • 模拟生产环境和数据量,尽早压测;

  • 注重数据备份;

五、总结

Spring Batch 是一款优秀的批处理框架,其良好的可扩展性和性能天花板,让批处理工作不再头疼。

一方面,Spring Batch 可以完全解耦批处理任务。原本复杂且庞大的一条龙任务,现在可以拆解为若干个 Step,各司其职。同时,每个 Step 都有自己的输入、处理、输出模型,高度规范,高度内聚,超级简单。

另一方面,Spring Batch 可以让项目设计更加科学合理。首先,任务拆解更加细致,工作量预估更加准确。其次,各个环节清晰明了,降低沟通成本。最后,也是最重要的,开发者不用在一个任务中从头磕到尾,头晕眼花,bug 爆炸。


一、MySQL 批处理介绍

执行多条增、删、改语句,mysql-connector-java 支持两种模式:

  • 串行化语句,一条一条发送;
  • 打包语句,分批次发送;

批处理模式,即按照包容量算法,将语句分批打包,发送到数据库服务器,旨在提升大批量语句的执行性能。在数据库连接参数 jdbc-url 后追加 rewriteBatchedStatements=true 即可完成配置,如:

1
spring.datasource.batch-demo.jdbc-url=jdbc:mysql://127.0.0.1:3306/batch-demo?rewriteBatchedStatements=true

二、MySQL 批处理基础实现

以下源码参照 ClientPreparedStatement.javamysql-connector-java 8.0.28 版本。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
@Override
protected long[] executeBatchInternal() throws SQLException {
synchronized (checkClosed().getConnectionMutex()) {
// 不能是只读连接
if (this.connection.isReadOnly()) {
throw new SQLException(Messages.getString("PreparedStatement.25") + Messages.getString("PreparedStatement.26"), MysqlErrorNumbers.SQL_STATE_ILLEGAL_ARGUMENT);
}

// 批量语句大小必须大于 0
if (this.query.getBatchedArgs() == null || this.query.getBatchedArgs().size() == 0) {
return new long[0];
}

// we timeout the entire batch, not individual statements
int batchTimeout = getTimeoutInMillis();
setTimeoutInMillis(0);

resetCancelledState();

try {
statementBegins();

clearWarnings();

// 1. 没有包含原始 sql 并且支持批量重写
// batchHasPlainStatements 包含原始 sql
// rewriteBatchedStatements 支持批量重写
if (!this.batchHasPlainStatements && this.rewriteBatchedStatements.getValue()) {
// 1.1 批量插入语句,支持多值重写
if (getParseInfo().canRewriteAsMultiValueInsertAtSqlLevel()) {
// 执行批量 insert
return executeBatchedInserts(batchTimeout);
}

// 1.2 删、改操作, 没有包含原始 sql 并且批次语句数量大于 3
if (!this.batchHasPlainStatements && this.query.getBatchedArgs() != null
&& this.query.getBatchedArgs().size() > 3) {
// 执行批量 delete 或 update
return executePreparedBatchAsMultiStatement(batchTimeout);
}
}

// 2. 串行执行语句
return executeBatchSerially(batchTimeout);
} finally {
this.query.getStatementExecuting().set(false);

clearBatch();
}
}
}

从源码实现可知,MySQL 的批处理操作,要求语句不包含原生 sql,且数据连接支持批量重写。因为 insert 语句的批量重写规则(多值拼接)与 deleteupdate 语句(英文分号拼接)不同 ,因此独立出 insert 批处理过程。

原生 sql 的简单示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
PreparedStatement preparedStatement = connection.prepareStatement("");
for (int i = 0; i < 10; i++) {
StringBuilder sql = new StringBuilder();
sql.append("INSERT INTO extenal_studentcj(grade,clazz,zkzh,NAME,scoretext,times) VALUES(");
sql.append("'").append(i).append("',");
sql.append("'").append(i).append("',");
sql.append("'").append(i).append("',");
sql.append("'").append(i).append("',");
sql.append("'").append(i).append("',");
sql.append("'").append(i).append("'");
sql.append(");");
pst.addBatch(sql.toString());
}

preparedStatement.executeBatch();
2.1 Insert 批处理

基础实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
protected long[] executeBatchedInserts(int batchTimeout) throws SQLException {
synchronized (checkClosed().getConnectionMutex()) {
// 获取语句值模板; 如 (?, ?, ?)
String valuesClause = getParseInfo().getValuesClause();

JdbcConnection locallyScopedConn = this.connection;

if (valuesClause == null) {
return executeBatchSerially(batchTimeout);
}

// insert 语句总数
int numBatchedArgs = this.query.getBatchedArgs().size();

if (this.retrieveGeneratedKeys) {
this.batchedGeneratedKeys = new ArrayList<>(numBatchedArgs);
}

// 计算每个批次的语句数量
int numValuesPerBatch = ((PreparedQuery<?>) this.query).computeBatchSize(numBatchedArgs);

if (numBatchedArgs < numValuesPerBatch) {
numValuesPerBatch = numBatchedArgs;
}

JdbcPreparedStatement batchedStatement = null;

int batchedParamIndex = 1;
long updateCountRunningTotal = 0;
int numberToExecuteAsMultiValue = 0;
int batchCounter = 0;
CancelQueryTask timeoutTask = null;
SQLException sqlEx = null;

long[] updateCounts = new long[numBatchedArgs];

try {
try {
// 构建批量 insert 预编译语句
batchedStatement = prepareBatchedInsertSQL(locallyScopedConn, numValuesPerBatch);

timeoutTask = startQueryTimer(batchedStatement, batchTimeout);

// 计算需要批量 insert 的次数
numberToExecuteAsMultiValue = numBatchedArgs < numValuesPerBatch ? numBatchedArgs : numBatchedArgs / numValuesPerBatch;

// 计算 batchedStatement 需要 insert 的语句总量
int numberArgsToExecute = numberToExecuteAsMultiValue * numValuesPerBatch;

// 循环填补 insert 值
for (int i = 0; i < numberArgsToExecute; i++) {
// 填补完一个批次, 发起执行, 然后再填补下一个批次
if (i != 0 && i % numValuesPerBatch == 0) {
try {
updateCountRunningTotal += batchedStatement.executeLargeUpdate();
} catch (SQLException ex) {
sqlEx = handleExceptionForBatch(batchCounter - 1, numValuesPerBatch, updateCounts, ex);
}

getBatchedGeneratedKeys(batchedStatement);
batchedStatement.clearParameters();
batchedParamIndex = 1;
}

batchedParamIndex = setOneBatchedParameterSet(batchedStatement, batchedParamIndex, this.query.getBatchedArgs().get(batchCounter++));
}

try {
updateCountRunningTotal += batchedStatement.executeLargeUpdate();
} catch (SQLException ex) {
sqlEx = handleExceptionForBatch(batchCounter - 1, numValuesPerBatch, updateCounts, ex);
}

getBatchedGeneratedKeys(batchedStatement);

numValuesPerBatch = numBatchedArgs - batchCounter;
} finally {
if (batchedStatement != null) {
batchedStatement.close();
batchedStatement = null;
}
}

// 如果按照以上批次执行, 仍有未执行的语句, 则在这里统一执行
try {
if (numValuesPerBatch > 0) {
batchedStatement = prepareBatchedInsertSQL(locallyScopedConn, numValuesPerBatch);

if (timeoutTask != null) {
timeoutTask.setQueryToCancel(batchedStatement);
}

batchedParamIndex = 1;

while (batchCounter < numBatchedArgs) {
batchedParamIndex = setOneBatchedParameterSet(batchedStatement, batchedParamIndex, this.query.getBatchedArgs().get(batchCounter++));
}

try {
updateCountRunningTotal += batchedStatement.executeLargeUpdate();
} catch (SQLException ex) {
sqlEx = handleExceptionForBatch(batchCounter - 1, numValuesPerBatch, updateCounts, ex);
}

getBatchedGeneratedKeys(batchedStatement);
}

if (sqlEx != null) {
throw SQLError.createBatchUpdateException(sqlEx, updateCounts, this.exceptionInterceptor);
}

if (numBatchedArgs > 1) {
long updCount = updateCountRunningTotal > 0 ? java.sql.Statement.SUCCESS_NO_INFO : 0;
for (int j = 0; j < numBatchedArgs; j++) {
updateCounts[j] = updCount;
}
} else {
updateCounts[0] = updateCountRunningTotal;
}
return updateCounts;
} finally {
if (batchedStatement != null) {
batchedStatement.close();
}
}
} finally {
stopQueryTimer(timeoutTask, false, false);
resetCancelledState();
}
}
}

从源码实现可知,insert 语句批处理,首先会按照一定的批次大小处理语句,剩余不够一个批次执行的,最后会交给保底任务执行。以插入10万条学生信息数据为例,具体的处理流程如下:

2.2 Delete、Update 批处理

基础实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
protected long[] executePreparedBatchAsMultiStatement(int batchTimeout) throws SQLException {
synchronized (checkClosed().getConnectionMutex()) {
// This is kind of an abuse, but it gets the job done
if (this.batchedValuesClause == null) {
this.batchedValuesClause = ((PreparedQuery<?>) this.query).getOriginalSql() + ";";
}

JdbcConnection locallyScopedConn = this.connection;

boolean multiQueriesEnabled = locallyScopedConn.getPropertySet().getBooleanProperty(PropertyKey.allowMultiQueries).getValue();
CancelQueryTask timeoutTask = null;

try {
clearWarnings();

// 语句总数
int numBatchedArgs = this.query.getBatchedArgs().size();

if (this.retrieveGeneratedKeys) {
this.batchedGeneratedKeys = new ArrayList<>(numBatchedArgs);
}

// 计算每个批次的语句数量
int numValuesPerBatch = ((PreparedQuery<?>) this.query).computeBatchSize(numBatchedArgs);

if (numBatchedArgs < numValuesPerBatch) {
numValuesPerBatch = numBatchedArgs;
}

java.sql.PreparedStatement batchedStatement = null;

int batchedParamIndex = 1;
int numberToExecuteAsMultiValue = 0;
int batchCounter = 0;
int updateCountCounter = 0;
long[] updateCounts = new long[numBatchedArgs * getParseInfo().getNumberOfQueries()];
SQLException sqlEx = null;

try {
if (!multiQueriesEnabled) {
((NativeSession) locallyScopedConn.getSession()).enableMultiQueries();
}

// 构建批处理语句
batchedStatement = this.retrieveGeneratedKeys
? ((Wrapper) locallyScopedConn.prepareStatement(generateMultiStatementForBatch(numValuesPerBatch), RETURN_GENERATED_KEYS)).unwrap(java.sql.PreparedStatement.class)
: ((Wrapper) locallyScopedConn.prepareStatement(generateMultiStatementForBatch(numValuesPerBatch))).unwrap(java.sql.PreparedStatement.class);

timeoutTask = startQueryTimer((StatementImpl) batchedStatement, batchTimeout);

// 计算批处理次数
numberToExecuteAsMultiValue = numBatchedArgs < numValuesPerBatch ? numBatchedArgs : numBatchedArgs / numValuesPerBatch;

// 计算 batchedStatement 需要执行的语句数量
int numberArgsToExecute = numberToExecuteAsMultiValue * numValuesPerBatch;

// 循环填补语句值
for (int i = 0; i < numberArgsToExecute; i++) {
// 填补完一个批次, 发起执行, 然后再填补下一个批次
if (i != 0 && i % numValuesPerBatch == 0) {
try {
batchedStatement.execute();
} catch (SQLException ex) {
sqlEx = handleExceptionForBatch(batchCounter, numValuesPerBatch, updateCounts, ex);
}

updateCountCounter = processMultiCountsAndKeys((StatementImpl) batchedStatement, updateCountCounter, updateCounts);

batchedStatement.clearParameters();
batchedParamIndex = 1;
}

batchedParamIndex = setOneBatchedParameterSet(batchedStatement, batchedParamIndex, this.query.getBatchedArgs().get(batchCounter++));
}

try {
batchedStatement.execute();
} catch (SQLException ex) {
sqlEx = handleExceptionForBatch(batchCounter - 1, numValuesPerBatch, updateCounts, ex);
}

updateCountCounter = processMultiCountsAndKeys((StatementImpl) batchedStatement, updateCountCounter, updateCounts);

batchedStatement.clearParameters();

numValuesPerBatch = numBatchedArgs - batchCounter;

if (timeoutTask != null) {
// we need to check the cancel state now because we loose if after the following batchedStatement.close()
((JdbcPreparedStatement) batchedStatement).checkCancelTimeout();
}
} finally {
if (batchedStatement != null) {
batchedStatement.close();
batchedStatement = null;
}
}

// 如果按照以上批次执行, 仍有未执行的语句, 则在这里统一执行
try {
if (numValuesPerBatch > 0) {

batchedStatement = this.retrieveGeneratedKeys
? locallyScopedConn.prepareStatement(generateMultiStatementForBatch(numValuesPerBatch), RETURN_GENERATED_KEYS)
: locallyScopedConn.prepareStatement(generateMultiStatementForBatch(numValuesPerBatch));

if (timeoutTask != null) {
timeoutTask.setQueryToCancel((Query) batchedStatement);
}

batchedParamIndex = 1;

while (batchCounter < numBatchedArgs) {
batchedParamIndex = setOneBatchedParameterSet(batchedStatement, batchedParamIndex, this.query.getBatchedArgs().get(batchCounter++));
}

try {
batchedStatement.execute();
} catch (SQLException ex) {
sqlEx = handleExceptionForBatch(batchCounter - 1, numValuesPerBatch, updateCounts, ex);
}

updateCountCounter = processMultiCountsAndKeys((StatementImpl) batchedStatement, updateCountCounter, updateCounts);

batchedStatement.clearParameters();
}

if (timeoutTask != null) {
stopQueryTimer(timeoutTask, true, true);
timeoutTask = null;
}

if (sqlEx != null) {
throw SQLError.createBatchUpdateException(sqlEx, updateCounts, this.exceptionInterceptor);
}

return updateCounts;
} finally {
if (batchedStatement != null) {
batchedStatement.close();
}
}
} finally {
stopQueryTimer(timeoutTask, false, false);
resetCancelledState();

if (!multiQueriesEnabled) {
((NativeSession) locallyScopedConn.getSession()).disableMultiQueries();
}

clearBatch();
}
}
}

deleteupdate 语句的批处理方案,与 insert 类似,区别在于 deleteupdate 语句是用英文分号拼接起来的,而 insert 语句是将 values 拼接起来的。以更新10万条学生信息数据为例,具体的处理流程如下:

三、MySQL 批处理性能测试

3.1 批量插入 100000 条学生信息:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public void batchInsert() {

long start = System.currentTimeMillis();

Connection connection = null;

String sqlTemplate = "INSERT INTO student_target(student_id, name, address) VALUES (?, ?, ?)";

try {
// 查询10万条数据
List<Student> studentList = studentMapper.queryAll();

// 获取数据连接
connection = batchDemoDB.getConnection();

connection.setAutoCommit(false);

PreparedStatement preparedStatement = connection.prepareStatement(sqlTemplate);
for (Student student : studentList) {
preparedStatement.setLong(1, student.getStudentId());
preparedStatement.setString(2, student.getName());
preparedStatement.setString(3, student.getAddress());
preparedStatement.addBatch();
}

preparedStatement.executeBatch();

connection.commit();
} catch (Exception e) {
try {
connection.rollback();
} catch (Exception e2) {
LOGGER.error("transaction rollback failed: {}", e2.getMessage(), e2);
}
LOGGER.error("batch insert student info error: {}", e.getMessage(), e);
} finally {
try {
connection.close();
} catch (Exception e) {
LOGGER.error("connection close error: {}", e.getMessage(), e);
}
}

LOGGER.info("插入100000条数据, 耗时: {} ms", System.currentTimeMillis() - start);
}
  • rewriteBatchedStatements=false

    1
    插入100000条数据, 耗时: 18652 ms
  • rewriteBatchedStatements=true

    1
    插入100000条数据, 耗时: 2404 ms
3.2 批量更新 100000 条学生信息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public void batchUpdate() {

long start = System.currentTimeMillis();

Connection connection = null;

String sqlTemplate = "UPDATE student_target set address = ? WHERE student_id = ?";

try {
// 查询10万条数据
List<Student> studentList = studentMapper.queryAll();

// 获取数据连接
connection = batchDemoDB.getConnection();

connection.setAutoCommit(false);

PreparedStatement preparedStatement = connection.prepareStatement(sqlTemplate);
for (Student student : studentList) {
preparedStatement.setString(1, student.getName() + "_呵呵");
preparedStatement.setLong(2, student.getStudentId());
preparedStatement.addBatch();
}

preparedStatement.executeBatch();

connection.commit();
} catch (Exception e) {
try {
connection.rollback();
} catch (Exception e2) {
LOGGER.error("transaction rollback failed: {}", e2.getMessage(), e2);
}
LOGGER.error("batch update student info error: {}", e.getMessage(), e);
} finally {
try {
connection.close();
} catch (Exception e) {
LOGGER.error("connection close error: {}", e.getMessage(), e);
}
}

LOGGER.info("更新100000条数据, 耗时: {} ms", System.currentTimeMillis() - start);
}
  • rewriteBatchedStatements=false

    1
    更新100000条数据, 耗时: 25579 ms
  • rewriteBatchedStatements=true

    1
    更新100000条数据, 耗时: 10122 ms
3.3 批量删除 100000 条学生信息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public void batchDelete() {

long start = System.currentTimeMillis();

Connection connection = null;

String sqlTemplate = "DELETE FROM student_target WHERE student_id = ?";

try {
// 查询10万条数据
List<Student> studentList = studentMapper.queryAll();

// 获取数据连接
connection = batchDemoDB.getConnection();

connection.setAutoCommit(false);

PreparedStatement preparedStatement = connection.prepareStatement(sqlTemplate);
for (Student student : studentList) {
preparedStatement.setLong(1, student.getStudentId());
preparedStatement.addBatch();
}

preparedStatement.executeBatch();

connection.commit();
} catch (Exception e) {
try {
connection.rollback();
} catch (Exception e2) {
LOGGER.error("transaction rollback failed: {}", e2.getMessage(), e2);
}
LOGGER.error("batch delete student info error: {}", e.getMessage(), e);
} finally {
try {
connection.close();
} catch (Exception e) {
LOGGER.error("connection close error: {}", e.getMessage(), e);
}
}

LOGGER.info("删除100000条数据, 耗时: {} ms", System.currentTimeMillis() - start);
}
  • rewriteBatchedStatements=false

    1
    删除100000条数据, 耗时: 20817 ms
  • rewriteBatchedStatements=true

    1
    删除100000条数据, 耗时: 12053 ms


一、 Spring 线程池

Spring 自带的线程池 ThreadPoolTaskExecutor ,本质上是对 ThreadPoolExecutor 的包装。一方面,基于 SpringBoot 的项目可以通过 yamlproperties 文件快速配置线程池,并借助 @RefreshScope 实现线程池的热部署;另一方面,通过@EnableAsync@Async配置,可以方便地执行异步并发任务,无需编写异步调用代码。 凭借 Spring 生态圈,ThreadPoolTaskExecutor 因其良好的设计感和兼容性而被广泛使用。

二、 Spring 线程池死锁

jdk 线程池一样,Spring 线程池同样潜藏着一些坑,比如死锁。不得不说这是一个大坑。

首先,在业务上难以理解。线程池是面向 Task 的工具,无非就做三件事:创建并管理线程、执行任务调度、存放待办任务。对于 Task 来说,抢占到资源的就执行,抢占不到的,就按照拒绝策略处理。不同的 Task 怎么会互相等待呢?

其次,在技术上难以定位故障。线程池处于死锁状态,导致主线程走着走着就“失踪了”。重启系统后可以正常工作,一旦压测大概率又会死掉。没有错误日志,没有 cpu 飙升,没有 FullGC。从表象上看,跟并发有关系,而且属于偶发故障。我们把 dump 拉下来分析一下:

1
2
3
4
5
6
7
http-nio-8080-exec-32" daemon prio=5 tid=1676 WAITING
at sun.misc.Unsafe.park(Native Method)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:429)
Local Variable: java.util.concurrent.FutureTask$WaitNode#4
at java.util.concurrent.FutureTask.get(FutureTask.java:191)
Local Variable: java.util.concurrent.FutureTask#4

线程(1676) 目前处于 WAITING 状态,原因是 FutureTask 在等待 cpu 的宠幸。但在压测场景下,任务抢占线程资源实属正常,所以此时还没有朝着死锁上面想。

最后,冒出一个念头:如果 Task 嵌套 Task,并且共用同一个线程池,那么在并发争夺资源的场景下,就有可能出现外部 Task 等待内部 Task、内部 Task 等待外部 Task 的情况,从而导致整个线程池死锁。

三、场景模拟

为了更好地理解,我们用打工人进站乘地铁这件事来模拟 Spring 线程池的工作机制。

假设有 5 个常开闸机(核心线程数),5 个应急闸机(5+5=最大线程数),进站围栏可以排 100 人(阻塞队列大小)。

约定进站规则:

  • 必须有票;
  • 必须按照先来后到的顺序进站;
  • 一旦走到闸机口或者进入围栏,就不可再撤退;
  • 先使用 5 个常开闸机进站;
  • 5 个常开闸机若都有乘客进入,则安排后来者在进站围栏里排队等待;
  • 若进站围栏已经站满了乘客,则开启另外 5 个应急闸机(一个个开启);

为了模拟 Task 任务嵌套的场景,假设所有打工人都有一个同伴,而且都是走到闸机口才发现忘记买票,只能安排同伴去买票。这里:

  • 外层 Task:打工人进闸机;
  • 内层 Task:打工人的同伴出去买票;

假设张三、李四等人走到闸机口时才发现忘了买票,于是便可能发生如下对话:

  • 张三:“怎么出去买个票要这么久?”

  • 张三的同伴:“我在排队呢!闸机门口的人别挡着路!赶紧进去!”

  • 李四:“这边的人都忘了买票啦!伙伴们不把票带过来,我们也进不去啊!”

  • 熊二的同伴:“你们不进去,我们就没法向前走啊!”

  • 郑十:“你们都别吵了,我的同伴还在买票的路上呢!”

大家吵成一团,场面十分焦灼,但违反规定就会被拉去枪毙,于是这波打工人都堵在闸机口,一个能进站的都没有。

四、代码模拟

定义线程池:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

/**
* @author zourongsheng
*/
@Configuration
public class ExecutorConfig {

public static final String TASK_EXECUTOR = "taskExecutor";

/**
* @return 任务线程池
*/
@Bean(TASK_EXECUTOR)
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 核心线程数
executor.setCorePoolSize(5);
// 最大线程数
executor.setMaxPoolSize(10);
// 阻塞队列大小
executor.setQueueCapacity(100);
// 线程最大空闲时间
executor.setKeepAliveSeconds(60);
// 线程名称前缀
executor.setThreadNamePrefix("common-sync-executor-");
// 拒绝策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(30);
return executor;
}
}

测试方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
/**
* @author zourongsheng
* @date 2021/12/23 14:33
*/
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Resource;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;

import static ExecutorConfig.TASK_EXECUTOR;

@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = ServiceInitializer.class)
public class TaskTest {

private static Logger LOGGER = LoggerFactory.getLogger(TaskTest.class);

@Resource(name = TASK_EXECUTOR)
private ThreadPoolTaskExecutor taskExecutor;

@Test
public void test() {
// 初始化打工人的姓名
int workerCount = 500;
List<String> workerNameList = new ArrayList<>(workerCount);
for (int i = 1; i <= workerCount; i++) {
workerNameList.add(String.format("%s号打工人", i));
}

Random random = new Random();

// 打工人进站
final List<CompletableFuture<Void>> completableFutures = workerNameList
.stream()
.map(workerName -> CompletableFuture.runAsync(() -> {
// 打印线程池的运行信息
this.printThreadPoolTaskExecutorInfo(workerName);

// 外层Task: 打工人进站
int passGateMillSecond = random.nextInt(500);
try {
TimeUnit.MILLISECONDS.sleep(passGateMillSecond);
} catch (InterruptedException e) {
e.printStackTrace();
}

int buyTicketMillSecond = random.nextInt(500);
LOGGER.info("{}走到闸机口, 发现未买票, 安排同伴去买票, 需要{}毫秒", workerName, buyTicketMillSecond);

// 内层Task: 打工人的同伴出来买票
try {
taskExecutor.submit(() -> {
LOGGER.info("{}的同伴出来买票了", workerName);
try {
TimeUnit.MILLISECONDS.sleep(buyTicketMillSecond);
} catch (InterruptedException e) {
e.printStackTrace();
}
return true;
}).get();
} catch (InterruptedException | ExecutionException e) {
LOGGER.error("{}的同伴出来买票失败:{}", workerName, e.getMessage(), e);
}
LOGGER.info("{}与同伴成功进站", workerName);
}, taskExecutor).exceptionally(e -> {
LOGGER.error("{}进站失败: {}", workerName, e.getMessage(), e);
return null;
}))
.collect(Collectors.toList());
// 等待所有打工人进站
completableFutures.stream().map(CompletableFuture::join).collect(Collectors.toList());

LOGGER.info("打工人都进站了");
}

/**
* 【打印线程池运行信息】
*/
private void printThreadPoolTaskExecutorInfo(String workName) {
LOGGER.info("进站人:{}", workName);
LOGGER.info("核心线程数:{}", taskExecutor.getCorePoolSize());
LOGGER.info("线程池大小:{}", taskExecutor.getPoolSize());
LOGGER.info("活跃线程数:{}", taskExecutor.getActiveCount());
LOGGER.info("线程保持时间(秒):{}", taskExecutor.getKeepAliveSeconds());
LOGGER.info("线程池最大数量:{}", taskExecutor.getMaxPoolSize());
LOGGER.info("线程池等待的任务数量: {}", taskExecutor.getThreadPoolExecutor().getQueue().size());
LOGGER.info("线程池已完成任务数量: {}", taskExecutor.getThreadPoolExecutor().getCompletedTaskCount());
}
}

运行起来,日志打印:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
13:02:46.354 [common-sync-executor-1] INFO TaskTest - 进站人:1号打工人
13:02:46.355 [common-sync-executor-1] INFO TaskTest - 核心线程数:5
13:02:46.355 [common-sync-executor-1] INFO TaskTest - 线程池大小:3
13:02:46.355 [common-sync-executor-1] INFO TaskTest - 活跃线程数:3
13:02:46.355 [common-sync-executor-2] INFO TaskTest - 进站人:2号打工人
13:02:46.355 [common-sync-executor-1] INFO TaskTest - 线程保持时间(秒):60
13:02:46.355 [common-sync-executor-2] INFO TaskTest - 核心线程数:5
13:02:46.355 [common-sync-executor-1] INFO TaskTest - 线程池最大数量:10
13:02:46.355 [common-sync-executor-2] INFO TaskTest - 线程池大小:3
13:02:46.355 [common-sync-executor-1] INFO TaskTest - 线程池等待的任务数量: 0
13:02:46.355 [common-sync-executor-2] INFO TaskTest - 活跃线程数:3
13:02:46.355 [common-sync-executor-2] INFO TaskTest - 线程保持时间(秒):60
13:02:46.355 [common-sync-executor-1] INFO TaskTest - 线程池已完成任务数量: 0
13:02:46.355 [common-sync-executor-2] INFO TaskTest - 线程池最大数量:10
13:02:46.355 [common-sync-executor-2] INFO TaskTest - 线程池等待的任务数量: 0
13:02:46.355 [common-sync-executor-2] INFO TaskTest - 线程池已完成任务数量: 0
13:02:46.355 [common-sync-executor-1] INFO TaskTest - 1号打工人走到闸机口, 发现未买票, 安排同伴去买票, 需要275毫秒
13:02:46.355 [common-sync-executor-3] INFO TaskTest - 进站人:3号打工人
13:02:46.356 [common-sync-executor-3] INFO TaskTest - 核心线程数:5
13:02:46.356 [common-sync-executor-3] INFO TaskTest - 线程池大小:4
13:02:46.356 [common-sync-executor-3] INFO TaskTest - 活跃线程数:4
13:02:46.356 [common-sync-executor-3] INFO TaskTest - 线程保持时间(秒):60
13:02:46.356 [common-sync-executor-3] INFO TaskTest - 线程池最大数量:10
13:02:46.356 [common-sync-executor-3] INFO TaskTest - 线程池等待的任务数量: 0
13:02:46.356 [common-sync-executor-3] INFO TaskTest - 线程池已完成任务数量: 0
13:02:46.356 [common-sync-executor-4] INFO TaskTest - 进站人:4号打工人
13:02:46.356 [common-sync-executor-4] INFO TaskTest - 核心线程数:5
13:02:46.356 [common-sync-executor-4] INFO TaskTest - 线程池大小:5
13:02:46.356 [common-sync-executor-4] INFO TaskTest - 活跃线程数:5
13:02:46.356 [common-sync-executor-4] INFO TaskTest - 线程保持时间(秒):60
13:02:46.356 [common-sync-executor-4] INFO TaskTest - 线程池最大数量:10
13:02:46.356 [common-sync-executor-4] INFO TaskTest - 线程池等待的任务数量: 0
13:02:46.356 [common-sync-executor-4] INFO TaskTest - 线程池已完成任务数量: 0
13:02:46.356 [common-sync-executor-5] INFO TaskTest - 进站人:5号打工人
13:02:46.356 [common-sync-executor-5] INFO TaskTest - 核心线程数:5
13:02:46.357 [common-sync-executor-5] INFO TaskTest - 线程池大小:5
13:02:46.357 [common-sync-executor-5] INFO TaskTest - 活跃线程数:5
13:02:46.357 [common-sync-executor-5] INFO TaskTest - 线程保持时间(秒):60
13:02:46.357 [common-sync-executor-5] INFO TaskTest - 线程池最大数量:10
13:02:46.357 [common-sync-executor-5] INFO TaskTest - 线程池等待的任务数量: 100
13:02:46.357 [common-sync-executor-5] INFO TaskTest - 线程池已完成任务数量: 0
13:02:46.358 [common-sync-executor-6] INFO TaskTest - 进站人:106号打工人
13:02:46.358 [common-sync-executor-6] INFO TaskTest - 核心线程数:5
13:02:46.358 [common-sync-executor-6] INFO TaskTest - 线程池大小:8
13:02:46.358 [common-sync-executor-6] INFO TaskTest - 活跃线程数:8
13:02:46.358 [common-sync-executor-7] INFO TaskTest - 1号打工人的同伴出来买票了
13:02:46.358 [common-sync-executor-6] INFO TaskTest - 线程保持时间(秒):60
13:02:46.358 [common-sync-executor-6] INFO TaskTest - 线程池最大数量:10
13:02:46.358 [common-sync-executor-6] INFO TaskTest - 线程池等待的任务数量: 100
13:02:46.358 [common-sync-executor-6] INFO TaskTest - 线程池已完成任务数量: 0
13:02:46.358 [common-sync-executor-8] INFO TaskTest - 进站人:107号打工人
13:02:46.358 [common-sync-executor-8] INFO TaskTest - 核心线程数:5
13:02:46.358 [common-sync-executor-8] INFO TaskTest - 线程池大小:9
13:02:46.358 [common-sync-executor-8] INFO TaskTest - 活跃线程数:9
13:02:46.359 [common-sync-executor-8] INFO TaskTest - 线程保持时间(秒):60
13:02:46.359 [common-sync-executor-8] INFO TaskTest - 线程池最大数量:10
13:02:46.359 [common-sync-executor-8] INFO TaskTest - 线程池等待的任务数量: 100
13:02:46.359 [common-sync-executor-8] INFO TaskTest - 线程池已完成任务数量: 0
13:02:46.359 [common-sync-executor-9] INFO TaskTest - 进站人:108号打工人
13:02:46.359 [common-sync-executor-9] INFO TaskTest - 核心线程数:5
13:02:46.359 [common-sync-executor-9] INFO TaskTest - 线程池大小:10
13:02:46.359 [common-sync-executor-9] INFO TaskTest - 活跃线程数:10
13:02:46.359 [common-sync-executor-9] INFO TaskTest - 线程保持时间(秒):60
13:02:46.359 [common-sync-executor-9] INFO TaskTest - 线程池最大数量:10
13:02:46.359 [common-sync-executor-9] INFO TaskTest - 线程池等待的任务数量: 100
13:02:46.359 [common-sync-executor-9] INFO TaskTest - 线程池已完成任务数量: 0
13:02:46.359 [main] INFO TaskTest - 进站人:110号打工人
13:02:46.359 [common-sync-executor-10] INFO TaskTest - 进站人:109号打工人
13:02:46.359 [main] INFO TaskTest - 核心线程数:5
13:02:46.359 [common-sync-executor-10] INFO TaskTest - 核心线程数:5
13:02:46.359 [common-sync-executor-10] INFO TaskTest - 线程池大小:10
13:02:46.359 [main] INFO TaskTest - 线程池大小:10
13:02:46.359 [common-sync-executor-10] INFO TaskTest - 活跃线程数:10
13:02:46.359 [main] INFO TaskTest - 活跃线程数:10
13:02:46.359 [common-sync-executor-10] INFO TaskTest - 线程保持时间(秒):60
13:02:46.359 [main] INFO TaskTest - 线程保持时间(秒):60
13:02:46.359 [common-sync-executor-10] INFO TaskTest - 线程池最大数量:10
13:02:46.359 [main] INFO TaskTest - 线程池最大数量:10
13:02:46.359 [common-sync-executor-10] INFO TaskTest - 线程池等待的任务数量: 100
13:02:46.359 [main] INFO TaskTest - 线程池等待的任务数量: 100
13:02:46.359 [common-sync-executor-10] INFO TaskTest - 线程池已完成任务数量: 0
13:02:46.360 [main] INFO TaskTest - 线程池已完成任务数量: 0
13:02:46.368 [common-sync-executor-8] INFO TaskTest - 107号打工人走到闸机口, 发现未买票, 安排同伴去买票, 需要81毫秒
13:02:46.368 [common-sync-executor-8] INFO TaskTest - 107号打工人的同伴出来买票了
13:02:46.417 [common-sync-executor-2] INFO TaskTest - 2号打工人走到闸机口, 发现未买票, 安排同伴去买票, 需要391毫秒
13:02:46.417 [common-sync-executor-2] INFO TaskTest - 2号打工人的同伴出来买票了
13:02:46.449 [common-sync-executor-8] INFO TaskTest - 107号打工人与同伴成功进站
13:02:46.449 [common-sync-executor-8] INFO TaskTest - 进站人:6号打工人
13:02:46.449 [common-sync-executor-8] INFO TaskTest - 核心线程数:5
13:02:46.449 [common-sync-executor-8] INFO TaskTest - 线程池大小:10
13:02:46.449 [common-sync-executor-8] INFO TaskTest - 活跃线程数:10
13:02:46.449 [common-sync-executor-8] INFO TaskTest - 线程保持时间(秒):60
13:02:46.449 [common-sync-executor-8] INFO TaskTest - 线程池最大数量:10
13:02:46.449 [common-sync-executor-8] INFO TaskTest - 线程池等待的任务数量: 99
13:02:46.449 [common-sync-executor-8] INFO TaskTest - 线程池已完成任务数量: 1
13:02:46.515 [common-sync-executor-4] INFO TaskTest - 4号打工人走到闸机口, 发现未买票, 安排同伴去买票, 需要34毫秒
13:02:46.559 [common-sync-executor-6] INFO TaskTest - 106号打工人走到闸机口, 发现未买票, 安排同伴去买票, 需要116毫秒
13:02:46.559 [common-sync-executor-6] INFO TaskTest - 106号打工人的同伴出来买票了
13:02:46.570 [common-sync-executor-9] INFO TaskTest - 108号打工人走到闸机口, 发现未买票, 安排同伴去买票, 需要149毫秒
13:02:46.570 [common-sync-executor-9] INFO TaskTest - 108号打工人的同伴出来买票了
13:02:46.633 [common-sync-executor-7] INFO TaskTest - 进站人:7号打工人
13:02:46.633 [common-sync-executor-1] INFO TaskTest - 1号打工人与同伴成功进站
13:02:46.633 [common-sync-executor-7] INFO TaskTest - 核心线程数:5
13:02:46.633 [common-sync-executor-7] INFO TaskTest - 线程池大小:10
13:02:46.633 [common-sync-executor-1] INFO TaskTest - 进站人:8号打工人
13:02:46.633 [common-sync-executor-7] INFO TaskTest - 活跃线程数:10
13:02:46.633 [common-sync-executor-1] INFO TaskTest - 核心线程数:5
13:02:46.633 [common-sync-executor-1] INFO TaskTest - 线程池大小:10
13:02:46.633 [common-sync-executor-7] INFO TaskTest - 线程保持时间(秒):60
13:02:46.633 [common-sync-executor-7] INFO TaskTest - 线程池最大数量:10
13:02:46.633 [common-sync-executor-1] INFO TaskTest - 活跃线程数:10
13:02:46.633 [common-sync-executor-1] INFO TaskTest - 线程保持时间(秒):60
13:02:46.633 [common-sync-executor-7] INFO TaskTest - 线程池等待的任务数量: 98
13:02:46.633 [common-sync-executor-7] INFO TaskTest - 线程池已完成任务数量: 3
13:02:46.633 [common-sync-executor-1] INFO TaskTest - 线程池最大数量:10
13:02:46.633 [common-sync-executor-1] INFO TaskTest - 线程池等待的任务数量: 98
13:02:46.633 [common-sync-executor-1] INFO TaskTest - 线程池已完成任务数量: 3
13:02:46.675 [common-sync-executor-6] INFO TaskTest - 106号打工人与同伴成功进站
13:02:46.675 [common-sync-executor-6] INFO TaskTest - 进站人:9号打工人
13:02:46.675 [common-sync-executor-6] INFO TaskTest - 核心线程数:5
13:02:46.675 [common-sync-executor-6] INFO TaskTest - 线程池大小:10
13:02:46.675 [common-sync-executor-6] INFO TaskTest - 活跃线程数:10
13:02:46.675 [common-sync-executor-6] INFO TaskTest - 线程保持时间(秒):60
13:02:46.675 [common-sync-executor-6] INFO TaskTest - 线程池最大数量:10
13:02:46.675 [common-sync-executor-6] INFO TaskTest - 线程池等待的任务数量: 97
13:02:46.675 [common-sync-executor-6] INFO TaskTest - 线程池已完成任务数量: 4
13:02:46.719 [common-sync-executor-9] INFO TaskTest - 108号打工人与同伴成功进站
13:02:46.719 [common-sync-executor-5] INFO TaskTest - 5号打工人走到闸机口, 发现未买票, 安排同伴去买票, 需要380毫秒
13:02:46.719 [common-sync-executor-9] INFO TaskTest - 进站人:10号打工人
13:02:46.719 [common-sync-executor-9] INFO TaskTest - 核心线程数:5
13:02:46.719 [common-sync-executor-9] INFO TaskTest - 线程池大小:10
13:02:46.719 [common-sync-executor-9] INFO TaskTest - 活跃线程数:10
13:02:46.719 [common-sync-executor-9] INFO TaskTest - 线程保持时间(秒):60
13:02:46.719 [common-sync-executor-9] INFO TaskTest - 线程池最大数量:10
13:02:46.719 [common-sync-executor-9] INFO TaskTest - 线程池等待的任务数量: 97
13:02:46.719 [common-sync-executor-9] INFO TaskTest - 线程池已完成任务数量: 5
13:02:46.784 [main] INFO TaskTest - 110号打工人走到闸机口, 发现未买票, 安排同伴去买票, 需要291毫秒
13:02:46.796 [common-sync-executor-3] INFO TaskTest - 3号打工人走到闸机口, 发现未买票, 安排同伴去买票, 需要205毫秒
13:02:46.808 [common-sync-executor-2] INFO TaskTest - 2号打工人与同伴成功进站
13:02:46.808 [common-sync-executor-2] INFO TaskTest - 进站人:11号打工人
13:02:46.808 [common-sync-executor-2] INFO TaskTest - 核心线程数:5
13:02:46.808 [common-sync-executor-2] INFO TaskTest - 线程池大小:10
13:02:46.808 [common-sync-executor-2] INFO TaskTest - 活跃线程数:10
13:02:46.808 [common-sync-executor-2] INFO TaskTest - 线程保持时间(秒):60
13:02:46.808 [common-sync-executor-2] INFO TaskTest - 线程池最大数量:10
13:02:46.808 [common-sync-executor-2] INFO TaskTest - 线程池等待的任务数量: 98
13:02:46.808 [common-sync-executor-2] INFO TaskTest - 线程池已完成任务数量: 6
13:02:46.815 [common-sync-executor-2] INFO TaskTest - 11号打工人走到闸机口, 发现未买票, 安排同伴去买票, 需要192毫秒
13:02:46.821 [common-sync-executor-10] INFO TaskTest - 109号打工人走到闸机口, 发现未买票, 安排同伴去买票, 需要115毫秒
13:02:46.837 [common-sync-executor-1] INFO TaskTest - 8号打工人走到闸机口, 发现未买票, 安排同伴去买票, 需要67毫秒
13:02:46.837 [common-sync-executor-1] INFO TaskTest - 8号打工人的同伴出来买票了
13:02:46.904 [common-sync-executor-1] INFO TaskTest - 8号打工人与同伴成功进站
13:02:46.904 [common-sync-executor-1] INFO TaskTest - 进站人:12号打工人
13:02:46.904 [common-sync-executor-1] INFO TaskTest - 核心线程数:5
13:02:46.904 [common-sync-executor-1] INFO TaskTest - 线程池大小:10
13:02:46.904 [common-sync-executor-1] INFO TaskTest - 活跃线程数:10
13:02:46.904 [common-sync-executor-1] INFO TaskTest - 线程保持时间(秒):60
13:02:46.904 [common-sync-executor-1] INFO TaskTest - 线程池最大数量:10
13:02:46.904 [common-sync-executor-1] INFO TaskTest - 线程池等待的任务数量: 99
13:02:46.904 [common-sync-executor-1] INFO TaskTest - 线程池已完成任务数量: 7
13:02:46.934 [common-sync-executor-8] INFO TaskTest - 6号打工人走到闸机口, 发现未买票, 安排同伴去买票, 需要198毫秒
13:02:46.949 [common-sync-executor-7] INFO TaskTest - 7号打工人走到闸机口, 发现未买票, 安排同伴去买票, 需要470毫秒
13:02:46.949 [common-sync-executor-7] INFO TaskTest - 7号打工人的同伴出来买票了
13:02:46.969 [common-sync-executor-6] INFO TaskTest - 9号打工人走到闸机口, 发现未买票, 安排同伴去买票, 需要142毫秒
13:02:46.969 [common-sync-executor-6] INFO TaskTest - 9号打工人的同伴出来买票了
13:02:47.102 [common-sync-executor-1] INFO TaskTest - 12号打工人走到闸机口, 发现未买票, 安排同伴去买票, 需要470毫秒
13:02:47.102 [common-sync-executor-1] INFO TaskTest - 12号打工人的同伴出来买票了
13:02:47.111 [common-sync-executor-6] INFO TaskTest - 9号打工人与同伴成功进站
13:02:47.111 [common-sync-executor-9] INFO TaskTest - 10号打工人走到闸机口, 发现未买票, 安排同伴去买票, 需要109毫秒
13:02:47.111 [common-sync-executor-6] INFO TaskTest - 进站人:13号打工人
13:02:47.111 [common-sync-executor-6] INFO TaskTest - 核心线程数:5
13:02:47.111 [common-sync-executor-6] INFO TaskTest - 线程池大小:10
13:02:47.111 [common-sync-executor-6] INFO TaskTest - 活跃线程数:10
13:02:47.111 [common-sync-executor-6] INFO TaskTest - 线程保持时间(秒):60
13:02:47.111 [common-sync-executor-6] INFO TaskTest - 线程池最大数量:10
13:02:47.111 [common-sync-executor-6] INFO TaskTest - 线程池等待的任务数量: 100
13:02:47.111 [common-sync-executor-6] INFO TaskTest - 线程池已完成任务数量: 8
13:02:47.220 [common-sync-executor-6] INFO TaskTest - 13号打工人走到闸机口, 发现未买票, 安排同伴去买票, 需要245毫秒
13:02:47.220 [common-sync-executor-6] INFO TaskTest - 13号打工人的同伴出来买票了
13:02:47.419 [common-sync-executor-7] INFO TaskTest - 7号打工人与同伴成功进站
13:02:47.419 [common-sync-executor-7] INFO TaskTest - 进站人:14号打工人
13:02:47.419 [common-sync-executor-7] INFO TaskTest - 核心线程数:5
13:02:47.419 [common-sync-executor-7] INFO TaskTest - 线程池大小:10
13:02:47.419 [common-sync-executor-7] INFO TaskTest - 活跃线程数:10
13:02:47.419 [common-sync-executor-7] INFO TaskTest - 线程保持时间(秒):60
13:02:47.419 [common-sync-executor-7] INFO TaskTest - 线程池最大数量:10
13:02:47.419 [common-sync-executor-7] INFO TaskTest - 线程池等待的任务数量: 99
13:02:47.419 [common-sync-executor-7] INFO TaskTest - 线程池已完成任务数量: 9
13:02:47.465 [common-sync-executor-6] INFO TaskTest - 13号打工人与同伴成功进站
13:02:47.465 [common-sync-executor-6] INFO TaskTest - 进站人:15号打工人
13:02:47.465 [common-sync-executor-6] INFO TaskTest - 核心线程数:5
13:02:47.465 [common-sync-executor-6] INFO TaskTest - 线程池大小:10
13:02:47.465 [common-sync-executor-6] INFO TaskTest - 活跃线程数:10
13:02:47.465 [common-sync-executor-6] INFO TaskTest - 线程保持时间(秒):60
13:02:47.465 [common-sync-executor-6] INFO TaskTest - 线程池最大数量:10
13:02:47.465 [common-sync-executor-6] INFO TaskTest - 线程池等待的任务数量: 98
13:02:47.465 [common-sync-executor-6] INFO TaskTest - 线程池已完成任务数量: 10
13:02:47.468 [common-sync-executor-6] INFO TaskTest - 15号打工人走到闸机口, 发现未买票, 安排同伴去买票, 需要392毫秒
13:02:47.572 [common-sync-executor-1] INFO TaskTest - 12号打工人与同伴成功进站
13:02:47.572 [common-sync-executor-1] INFO TaskTest - 进站人:16号打工人
13:02:47.572 [common-sync-executor-1] INFO TaskTest - 核心线程数:5
13:02:47.572 [common-sync-executor-1] INFO TaskTest - 线程池大小:10
13:02:47.572 [common-sync-executor-1] INFO TaskTest - 活跃线程数:10
13:02:47.572 [common-sync-executor-1] INFO TaskTest - 线程保持时间(秒):60
13:02:47.572 [common-sync-executor-1] INFO TaskTest - 线程池最大数量:10
13:02:47.572 [common-sync-executor-1] INFO TaskTest - 线程池等待的任务数量: 98
13:02:47.572 [common-sync-executor-1] INFO TaskTest - 线程池已完成任务数量: 11
13:02:47.898 [common-sync-executor-7] INFO TaskTest - 14号打工人走到闸机口, 发现未买票, 安排同伴去买票, 需要249毫秒
13:02:48.064 [common-sync-executor-1] INFO TaskTest - 16号打工人走到闸机口, 发现未买票, 安排同伴去买票, 需要407毫秒
...线程池饥饿死锁

从日志可以看出,TASK_EXECUTOR 线程池一共经历了三个阶段:开启核心线程处理任务;将多余任务放入阻塞队列;开启非核心线程处理任务。一共有 500 个打工人需要进站,最终只有大概 11 人(ThreadPoolExecutor().getCompletedTaskCount()返回模糊数量)成功进站。线程池死锁,主线程全部处于 WAITING 状态。

五、 Spring 线程池饥饿死锁解决方案

线程池死锁,影响比较严重。在编码过程中容易忽略(需要追溯整条业务代码链);在测试过程中不易发现(需要压测,且是偶发现象)。我们可以采用以下方案:

  • 禁止嵌套任务共用线程池(推荐);
  • 硬编码 jdk 线程池,执行完任务后手动shutdown(不推荐);
  • 回归业务,整合嵌套任务(不推荐)。


一、 Spring 对象拷贝的具体实现

Spring 对象拷贝,基于反射和内省,将源对象字段值装填到目标对象字段上。主要分以下两步:

  • 通过内省,获取源对象和目标对象的属性描述器;
  • 通过反射,解析源属性值,赋值到目标属性中;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
/**
* Spring 对象拷贝基础方法
*
* @param source 源对象
* @param target 目标对象
* @param editable 限制目标 Class
* @param ignoreProperties 需要忽略的拷贝字段
*/
private static void copyProperties(Object source, Object target, Class<?> editable,
String... ignoreProperties) throws BeansException {

Assert.notNull(source, "Source must not be null");
Assert.notNull(target, "Target must not be null");

Class<?> actualEditable = target.getClass();
if (editable != null) {
// 如果 target 不是 editable 的实例, 则中断拷贝
if (!editable.isInstance(target)) {
throw new IllegalArgumentException("Target class [" + target.getClass().getName() +"] not assignable to Editable class [" + editable.getName() + "]");
}
actualEditable = editable;
}
// 内省目标对象, 获取其属性描述器列表
PropertyDescriptor[] targetPds = getPropertyDescriptors(actualEditable);
// 解析需要忽略拷贝的字段
List<String> ignoreList = (ignoreProperties != null ? Arrays.asList(ignoreProperties) : null);
// 遍历目标对象的属性描述器, 依次进行属性值的拷贝
for (PropertyDescriptor targetPd : targetPds) {
// 解析目标属性描述器的写入方法
Method writeMethod = targetPd.getWriteMethod();
// 如果目标属性可以写入且需要拷贝, 则内省源对象, 获取对应的属性描述器, 读取属性值并拷贝到目标属性中
if (writeMethod != null && (ignoreList == null || !ignoreList.contains(targetPd.getName()))) {
// 内省源对象, 缓存属性描述器, 并根据目标属性名称取出对应的源属性的描述器
PropertyDescriptor sourcePd = getPropertyDescriptor(source.getClass(), targetPd.getName());
if (sourcePd != null) {
// 解析源属性值的读取方法
Method readMethod = sourcePd.getReadMethod();
if (readMethod != null
&& ClassUtils.isAssignable(writeMethod.getParameterTypes()[0],
readMethod.getReturnType())) {

try {
if (!Modifier.isPublic(readMethod.getDeclaringClass().getModifiers())) {
readMethod.setAccessible(true);
}
// 读取源属性值
Object value = readMethod.invoke(source);
if (!Modifier.isPublic(writeMethod.getDeclaringClass().getModifiers())) {
writeMethod.setAccessible(true);
}
// 写入目标属性
writeMethod.invoke(target, value);
} catch (Throwable ex) {
throw new FatalBeanException("Could not copy property '" + targetPd.getName() + "' from source to target", ex);
}
}
}
}
}
}

二、 BeanUtils.copyProperties实现原理

根据以上分析,整合出 Spring 对象拷贝的实现原理:

通过内省机制,对 Bean 进行拆分,得到每个属性的描述器,缓存在 Map 中,Key为变量名,Value为属性描述器。属性描述器主要包括:属性名称、读取属性值的方法、设置属性值的方法。拷贝过程中,先获取目标属性的写入方法,再获取对应源属性的读取方法,最后通过反射拷贝属性值。

三、JavaBean内省机制

JavaBean 内省,是建立在反射基础上的,通过解析 Bean各个属性的描述器,以便通过属性描述器来操作 Bean 的一种机制。反射是将 Java 类中的各种成分映射成相应的 Java 类,可以获取所有属性以及调用任何方法。与反射不同的是,内省是通过属性描述器来暴露一个 Bean 的属性、方法和时间的,而且只有符合 JavaBean 规则的类的成员才可以调用内生 API 进行操作。

内省在 java.beans.Introspector中的具体实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
// 获取所有的 public 方法
Method methodList[] = getPublicDeclaredMethods(beanClass);
// 循环遍历处理每一个 public 方法, 为方便讲解, 此处我们以第一个方法为例...

Method method = methodList[0];
if (method == null) {
continue;
}
// 跳过 static 方法
int mods = method.getModifiers();
if (Modifier.isStatic(mods)) {
continue;
}
// 获取该方法的名称, 如setAge、getAge
String name = method.getName();
// 获取该方法的入参和返参
Class<?>[] argTypes = method.getParameterTypes();
Class<?> resultType = method.getReturnType();
// 获取该方法的入参个数
int argCount = argTypes.length;
PropertyDescriptor pd = null;

if (argCount == 0) {
// 1. 没有入参: 说明是获取属性值的方法
if (name.startsWith(GET_PREFIX)) {
// 1.1 该方法名称以 get 开头, 如 getAge
pd = new PropertyDescriptor(this.beanClass, name.substring(3), method, null);
} else if (resultType == boolean.class && name.startsWith(IS_PREFIX)) {
// 1.2 该方法名称以 is 开头, 如 isMale, 只处理基本类型的布尔值
pd = new PropertyDescriptor(this.beanClass, name.substring(2), method, null);
}
} else if (argCount == 1) {
// 2. 有一个入参
if (int.class.equals(argTypes[0]) && name.startsWith(GET_PREFIX)) {
// 2.1 获取属性值的方法, 如 getChild(Integer index), 则封装成索引属性器
pd = new IndexedPropertyDescriptor(this.beanClass, name.substring(3), null, null, method, null);
} else if (void.class.equals(resultType) && name.startsWith(SET_PREFIX)) {
// 2.2 设置属性值的方法
pd = new PropertyDescriptor(this.beanClass, name.substring(3), null, method);
if (throwsException(method, PropertyVetoException.class)) {
pd.setConstrained(true);
}
}
} else if (argCount == 2) {
// 3. 有两个入参
if (void.class.equals(resultType) && int.class.equals(argTypes[0]) && name.startsWith(SET_PREFIX)) {
// 3.1 只处理设置属性值的方法, 如 setChild(Integer index, Child child), 则封装成索引属性器
pd = new IndexedPropertyDescriptor(this.beanClass, name.substring(3), null, null, null, method);
if (throwsException(method, PropertyVetoException.class)) {
pd.setConstrained(true);
}
}
}

return PropertyDescriptor;

由此可以看出,一个类的方法名称、入参个数、反参类型是JavaBean 内省的主要要素,可以总结为:

  • 只能内省一个类暴露的 public 非静态方法;
  • 可以内省标准化的 set 方法,如 void setAge(Integer age)
  • 可以内省标准化的 get 方法,如 ResultType getAge()
  • 可以内省设置索引属性的方法,如 setChild(Integer index, Child child)
  • 可以内省获取索引属性的方法,如 getChild(Integer index)
  • 可以内省获取基本类型布尔值的且以 is 开头的方法,如 boolean isMale()

五、总结

Spring 对象拷贝,基于反射和内省机制,通过属性描述器,将源属性值写入目标属性。如今 Spring 架构已被广泛使用,旗下各种好用的工具也是顺手拈来,但无端的滥用也潜藏着一些问题。比如 Spring 对象拷贝,要求操作的对象必须符合 JavaBean 规范,否则将无法拷贝。如拷贝包装类型的布尔值,其读取方法为 Boolean isMale ,不符合 JavaBean 规范,对应的目标属性值一定是 null


一、 携程 Dal 开源框架

Dal 是携程开源的数据库访问框架,为大规模的 DB 管理和使用提供一套优质的解决方案。

首先在 DB 管理方面,Dal 统一集成了主流程的数据访问:支持 JavaC# 客户端;支持 MySQLSQLServer 数据库;支持 ORM 和非 ORM 方式的数据访问;使用了 Emit 映射技术,提供高性能 ORM;多数据源访问和主从分离(读写分离);日志、监控集成。

其次在 DB 使用方面, Dal 支持代码生成。通过 Dal 平台,可一键生成 EntityDaoUnit Test。这不仅可以让开发者脱离 DB 编程、提升开发效率,还可以统一面向 DB 的代码风格和代码质量。

二、 DataXDal 的兼容

Dal 的主要特点是统一收口和集中管理,在 DB 连接方面,客户端无需配置 DB 的用户名和密码,只需要配置 Dal 提供的 TitanKeyClusterName 即可。简单来说,TitanKeyClusterName 就是 Dal 生成的,供客户端的访问 DB 的钥匙。也正因此,DataXDal 架构的系统上就不起作用了。需要解决两个问题:DataX 如何配置 TitanKeyClusterNameDataX 如何通过 Dal 获取数据源。

三、 Datax 配置 TitanKeyClusterName

这是 mysqlwriter 的配置信息模板:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
{
"name": "mysqlwriter",
"parameter": {
"username": "",
"password": "",
"writeMode": "",
"column": [],
"session": [],
"preSql": [],
"connection": [
{
"jdbcUrl": "",
"table": []
}
]
}
}

Dal 不关心 usernamepassword,不妨将 TitanKeyClusterName放在 jdbcurl处。

四、DataX 获取 Dal 数据源

DataX 通过如下方式获取数据连接:

1
2
3
4
5
6
7
8
9
private static synchronized Connection connect(DataBaseType dataBaseType, String url, Properties prop) {
try {
Class.forName(dataBaseType.getDriverClassName());
DriverManager.setLoginTimeout(Constant.TIMEOUT_SECONDS);
return DriverManager.getConnection(url, prop);
} catch (Exception e) {
throw RdbmsException.asConnException(dataBaseType, e, prop.getProperty("user"), null);
}
}

Dal 通过如下方式获取数据连接:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import javax.annotation.Resource;
import javax.sql.DataSource;
import java.sql.Connection;
import com.ctrip.datasource.configure.DalDataSourceFactory;

public final class DBUtil {

private DBUtil() {
}

@Resource
private DalDataSourceFactory dsFactory;

/**
* 数据源工厂
*/
@Bean
public DalDataSourceFactory getCtripDalDataSource() {
return new DalDataSourceFactory();
}

/**
* 通过 titanKey 获取数据连接
*/
public static Connection getConnectionByTitanKey(final String titanKey) {
try {
DataSource dataSource = dsFactory.createDataSource(titanKey);
return dataSource.getConnection();
} catch (Exception e) {
throw DataXException
.asDataXException(DBUtilErrorCode.CONN_DB_ERROR,
String.format("数据库连接失败. 因为根据您配置的连接信息:%s获取数据库连接失败. 请检查您的配置并作出修改.", titanKey), e);
}
}

/**
* 通过 clusterName 获取数据连接
*/
public static Connection getConnectionByClusterName(final String clusterName) {
try {
DataSource ds = dsFactory.getOrCreateDataSource(clusterName);
return dataSource.getConnection();
} catch (Exception e) {
throw DataXException
.asDataXException(DBUtilErrorCode.CONN_DB_ERROR,
String.format("数据库连接失败. 因为根据您配置的连接信息:%s获取数据库连接失败. 请检查您的配置并作出修改.", clusterName), e);
}
}
}

综上:借助 DataSource 工厂,就可以用 Dal 数据连接替换掉 DataX 的数据连接。

五、DataX 兼容 Dal 优化

从以上实现可以看出,获取 Dal 数据连接主要有两步:生成数据源和建立数据连接,并且每次数据同步都要重复一遍。既然 Dal 已经提供了 DataSource 工厂,是否可以考虑将数据源缓存下来呢?

所以,有一套更好的兼容方案:在工具启动过程中,加载数据源并缓存下来, dbName 作为查找数据源的 key

DataSourceConfiguration.class

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
import com.alibaba.datax.plugin.rdbms.util.DBUtil;
import com.ctrip.datasource.configure.DalDataSourceFactory;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import org.apache.commons.lang.StringUtils;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.Assert;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import javax.sql.DataSource;
import java.util.Map;

/**
* @author zourongsheng
*/
@Configuration
public class DataSourceConfiguration {

@Resource
private DalDataSourceFactory dsFactory;

/**
* 数据源工厂
*/
@Bean
public DalDataSourceFactory getCtripDalDataSource() {
return new DalDataSourceFactory();
}

private static final String CLUSTER_CONN_TYPE_FLAG = "cluster";

/**
* titan key 连接信息
*/
public static final String TITAN_KEY_TEST_DB = "test_titan_db";

/**
* cluster name 连接信息
*/
public static final String CLUSTER_NAME_TEST_DB = "test_cluster_db";

/**
* 根据 titan key 加载数据源
*/
private void fillDataSourceFromTitanKey(String titanKey) {
try {

Assert.hasText(titanKey,
"connect to db failed; titan key cannot be null or empty");

DataSource dataSource = dsFactory.createDataSource(titanKey);
// 缓存数据源
DBUtil.setDataSourceIfAbsent(titanKey, dataSource);
} catch (Exception t) {
throw Throwables.propagate(t);
}
}

/**
* 根据 cluster name 加载单库数据源
*/
private void fillDataSourceFromClusterName(String clusterName) {
try {
Assert.hasText(clusterName,
"connect to db failed; dal cluster cannot be null or empty");

// 判断是否为 clusterName 格式
Assert.isTrue(clusterName.contains(CLUSTER_CONN_TYPE_FLAG),
String.format("%s is not in a cluster format", clusterName));

DataSource dataSource = dsFactory.getOrCreateDataSource(clusterName);
// 缓存数据源
DBUtil.setDataSourceIfAbsent(clusterName, dataSource);
} catch (Exception t) {
throw Throwables.propagate(t);
}
}

@PostConstruct
public void initDataSource() {

// 通过 titan key 加载数据源
fillDataSourceFromTitanKey(TITAN_KEY_TEST_DB);

// 通过 cluster name 加载数据源
fillDataSourceFromClusterName(CLUSTER_NAME_TEST_DB);
}
}

DBUtil.class

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import com.alibaba.datax.common.exception.DataXException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.sql.DataSource;
import java.io.File;
import java.sql.*;
import java.util.*;
import java.util.concurrent.*;

public final class DBUtil {

private DBUtil() {
}

private static final Logger LOG = LoggerFactory.getLogger(DBUtil.class);
private static final Map<String, DataSource> DS_MAP = new ConcurrentHashMap<>();

/**
* 从外部将数据源放到Engine里
*
* @param dsName 数据源名称(后续根据名称取出数据源)
* @param ds 数据源
*/
public static void setDataSourceIfAbsent(String dsName, DataSource ds) {
if (DS_MAP.containsKey(dsName)) {
return;
}
synchronized (DS_MAP) {
if (!dsMap.containsKey(dsName)) {
DS_MAP.put(dsName, ds);
LOG.info("setDataSourceIfAbsent将数据源{}放入Engine", dsName);
}
}
}

/**
* 获取数据源
* @param dsName 数据源名称
* @return 数据源
*/
private static DataSource getDataSource(String dsName) {
// 处理 jdbc 格式的数据源名称
if (dsName.contains("?")) {
dsName = dsName.substring(0, dsName.indexOf("?"));
}

return DS_MAP.get(dsName);
}

/**
* 通过数据源名称获取数据库连接
* @param dsName 数据源名称
* @return 数据库连接
*/
public static Connection getConnection(String dsName) {
try {
// 获取数据源
DataSource dataSource = getDataSource(dsName);

Assert.notNull(dataSource, String.format("获取数据源%s失败", dsName));

return dataSource.getConnection();
} catch (Exception e) {
throw DataXException
.asDataXException(DBUtilErrorCode.CONN_DB_ERROR,
String.format("数据库连接失败. 因为根据您配置的连接信息:%s获取数据库连接失败. 请检查您的配置并作出修改.", dsName), e);
}
}
}


一、Spring Retry 使用场景之调用第三方服务

在很多应用中,都需要对接第三方平台,如调用支付平台、调用外部企业平台等。出于对接口通信的安全性考虑,第三方服务往往都会下发一个 token 给调用方,调用方只有通过这个 token 才能完成业务处理。但这个 token 是存在有效期的,因此调用方也会把 token 缓存一定的时间,在有效期内直接读取缓存,缓存过期便再次请求第三方平台下发一个新的 token,缓存起来。这样既可以减少对第三方服务的调用次数,也可以缩短接口的处理时间。

这样就存在一个问题:如果这个 token 因为种种原因在第三方是已过期的状态,而调用方的缓存未过期,那么调用方的所有业务操作都会出错。从接口逻辑上来说,报错是正确的,也是有必要的,但在系统设计层面上来看,因为 token 过期导致的所有接口报错,是不合理的。好的设计应该是在接口层重新获取 token,继续完成业务逻辑,对顶层调用无感。

因此,spring retry 在对接第三方服务的场景中,可以有效提升接口的容错性。

二、Spring Retry 在实际应用中的关注点

spring retry 本质上是代理逻辑的重复处理,每次处理的代码逻辑都是一样的,那么重复处理的意义何在?

我们以调用支付服务为例,当服务端告诉我们:“token 已过期,查询支付结果失败”,我们首先要做的事,就是把token已过期这个异常抛出去,让 spring retry 监听到这个异常,进行重试。重试的结果,必然还是“token 已过期,查询支付结果失败”,因为重试过程中使用的 token 还是已过期的 token。因此,不仅需要重试,还需要在重试之前清除 token 缓存。也就是说,spring retry 至少需要关注两点:

  • 监听异常:对于指定的异常,按照配置的策略,开启重试;
  • 清除缓存:对于指定的异常,清除缓存;

三、代码实现

在底层方法上,使用 @Retryable 和 @TokenExpiredExceptionCatch 两个注解。前者的作用是发起业务重试,后者的作用是清除 token 缓存。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Retryable(value = {TokenExpiredException.class}, maxAttempts = 3, backoff = @Backoff(delay = 100L, multiplier = 1))
@TokenExpiredExceptionCatch
public Response invoke(Request request) {
// 从缓存中获取 token
String token = cache.getToken();
// 缓存不存在,则调用接口请求再次下发 token
if (StringUtils.isEmpty(token)) {
// 调用第三方平台,获取新的token,并缓存
token = newToken;
}
// 调用第三方服务
ResponseInfo responseInfo= Server.queryPayResult(token, request);
if (responseInfo.getCode == 1002) {
throw new TokenExpiredException(responseInfo.getMessage());
}
return responseInfo.getData();
}

自定义注解 @TokenExpiredExceptionCatch

1
2
3
4
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface TokenExpiredExceptionCatch {
}

切面监听注解 @TokenExpiredExceptionCatch

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Around("@annotation(com.aspects.TokenExpiredExceptionCatch)")
public Object handler(ProceedingJoinPoint joinPoint) throws Throwable {
try {
return joinPoint.proceed();
} catch (AbstractCustomException e) {
// 解析异常码值
ErrorCodeEnum ErrorCodeEnum = ErrorCodeEnum.getByCode(String.valueOf(e.getCode()));
// --------------处理 token 已过期异常--------------
if (ErrorCodeEnum.TOKEN_EXPIRED.equals(ErrorCodeEnum)) {
// 清除缓存 token
}
throw e;
}
}

四、Spring Retry 注意事项

spring retry 是一个很好用的重试工具,配置简单,对 Spring 项目兼容良好,但也不可无脑使用,让所有接口都在底层无感重试。如果支付/退款业务发生重试,极有可能引来客诉或发生资损。因此,在引入重试机制的接口上,一定要判断接口的幂等性。


一、Swicth 关键字

switchJava 中的选择语句,与 if/else 不同的是,switch 只支持常量表达式,包括 byte、short、int、char、枚举常量和字符串常量(From jdk1.7)。

通常说,引入 switch ,一是为了优化代码结构,让代码更简洁;二是为了优化性能,提升效率。为了进一步学习 switch 作用机制,本文将从字节码的角度来探索其底层实现。

二、Switch 的典型应用

2.1 对整型-int的支持
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public static void switchInt(int flag) {
switch (flag) {
case 1:
System.out.println("这是1");
break;
case 8:
System.out.println("这是8");
break;
case 3:
System.out.println("这是3");
break;
default:
System.out.println("这是未知数");
break;
}
}

使用 Jclasslib 查看字节码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
 0 iload_0
1 lookupswitch 3
1: 36 (+35)
3: 58 (+57)
8: 47 (+46)
default: 69 (+68)
36 getstatic #3 <java/lang/System.out : Ljava/io/PrintStream;>
39 ldc #4 <这是1>
41 invokevirtual #5 <java/io/PrintStream.println : (Ljava/lang/String;)V>
44 goto 77 (+33)
47 getstatic #3 <java/lang/System.out : Ljava/io/PrintStream;>
50 ldc #6 <这是8>
52 invokevirtual #5 <java/io/PrintStream.println : (Ljava/lang/String;)V>
55 goto 77 (+22)
58 getstatic #3 <java/lang/System.out : Ljava/io/PrintStream;>
61 ldc #7 <这是3>
63 invokevirtual #5 <java/io/PrintStream.println : (Ljava/lang/String;)V>
66 goto 77 (+11)
69 getstatic #3 <java/lang/System.out : Ljava/io/PrintStream;>
72 ldc #8 <这是未知数>
74 invokevirtual #5 <java/io/PrintStream.println : (Ljava/lang/String;)V>
77 return

可以看出,switch 对整形-int类型的选择,是直接比较 int 值的。

2.2 对整型-byte的支持
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public static void switchByte(byte flag) {
switch (flag) {
case 1:
System.out.println("这是1");
break;
case 8:
System.out.println("这是8");
break;
case 3:
System.out.println("这是3");
break;
default:
System.out.println("这是未知数");
break;
}
}

使用 Jclasslib 查看字节码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
 0 iload_0
1 lookupswitch 3
1: 36 (+35)
3: 58 (+57)
8: 47 (+46)
default: 69 (+68)
36 getstatic #3 <java/lang/System.out : Ljava/io/PrintStream;>
39 ldc #4 <这是1>
41 invokevirtual #5 <java/io/PrintStream.println : (Ljava/lang/String;)V>
44 goto 77 (+33)
47 getstatic #3 <java/lang/System.out : Ljava/io/PrintStream;>
50 ldc #6 <这是8>
52 invokevirtual #5 <java/io/PrintStream.println : (Ljava/lang/String;)V>
55 goto 77 (+22)
58 getstatic #3 <java/lang/System.out : Ljava/io/PrintStream;>
61 ldc #7 <这是3>
63 invokevirtual #5 <java/io/PrintStream.println : (Ljava/lang/String;)V>
66 goto 77 (+11)
69 getstatic #3 <java/lang/System.out : Ljava/io/PrintStream;>
72 ldc #8 <这是未知数>
74 invokevirtual #5 <java/io/PrintStream.println : (Ljava/lang/String;)V>
77 return

可以看出,switch 对整形-byte类型的选择,是先将其转化为 int 类型,再比较 int 值的。

2.3 对整型-char的支持
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public static void switchChar(char flag) {
switch (flag) {
case 'a':
System.out.println("这是a");
break;
case 'c':
System.out.println("这是c");
break;
case 'b':
System.out.println("这是b");
break;
default:
System.out.println("这是未知数");
break;
}
}

使用 Jclasslib 查看字节码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
 0 iload_0
1 tableswitch 97 to 99
97: 28 (+27)
98: 50 (+49)
99: 39 (+38)
default: 61 (+60)
28 getstatic #3 <java/lang/System.out : Ljava/io/PrintStream;>
31 ldc #4 <这是a>
33 invokevirtual #5 <java/io/PrintStream.println : (Ljava/lang/String;)V>
36 goto 69 (+33)
39 getstatic #3 <java/lang/System.out : Ljava/io/PrintStream;>
42 ldc #6 <这是c>
44 invokevirtual #5 <java/io/PrintStream.println : (Ljava/lang/String;)V>
47 goto 69 (+22)
50 getstatic #3 <java/lang/System.out : Ljava/io/PrintStream;>
53 ldc #7 <这是b>
55 invokevirtual #5 <java/io/PrintStream.println : (Ljava/lang/String;)V>
58 goto 69 (+11)
61 getstatic #3 <java/lang/System.out : Ljava/io/PrintStream;>
64 ldc #8 <这是未知数>
66 invokevirtual #5 <java/io/PrintStream.println : (Ljava/lang/String;)V>
69 return

可以看出,switch 对整形-char类型的选择,是先将其转化为 int 类型,再比较 int 值的。此外,short 类型的选择也是先转化为 int 类型再做比较的,就不一一展开了。

2.4 对枚举类型的支持
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public static void switchEnum(PayStatusEnum flag) {
switch (flag) {
case INIT:
System.out.println("INIT");
break;
case PAYING:
System.out.println("PAYING");
break;
case PAID:
System.out.println("PAID");
break;
default:
System.out.println("非法状态");
break;
}
}

使用 Jclasslib 查看字节码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
0 getstatic #4 <com/example/springbatchdemo/job/Test$1.$SwitchMap$com$example$springbatchdemo$job$PayStatusEnum : [I>
3 aload_0
4 invokevirtual #5 <com/example/springbatchdemo/job/PayStatusEnum.ordinal : ()I>
7 iaload
8 tableswitch 1 to 3
1: 36 (+28)
2: 47 (+39)
3: 58 (+50)
default: 69 (+61)
36 getstatic #6 <java/lang/System.out : Ljava/io/PrintStream;>
39 ldc #7 <INIT>
41 invokevirtual #8 <java/io/PrintStream.println : (Ljava/lang/String;)V>
44 goto 77 (+33)
47 getstatic #6 <java/lang/System.out : Ljava/io/PrintStream;>
50 ldc #9 <PAYING>
52 invokevirtual #8 <java/io/PrintStream.println : (Ljava/lang/String;)V>
55 goto 77 (+22)
58 getstatic #6 <java/lang/System.out : Ljava/io/PrintStream;>
61 ldc #10 <PAID>
63 invokevirtual #8 <java/io/PrintStream.println : (Ljava/lang/String;)V>
66 goto 77 (+11)
69 getstatic #6 <java/lang/System.out : Ljava/io/PrintStream;>
72 ldc #11 <非法状态>
74 invokevirtual #8 <java/io/PrintStream.println : (Ljava/lang/String;)V>
77 return

可以看出,switch 对枚举类型的选择,是先将枚举项的ordinal()的返回值+1,再做比较的。

2.5 对字符串的支持
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public static void switchString(String flag) {
switch (flag) {
case "ONE":
System.out.println("这是ONE");
break;
case "TWO":
System.out.println("这是TWO");
break;
case "THREE":
System.out.println("这是THREE");
break;
default:
System.out.println("未知数");
break;
}
}

使用 Jclasslib 查看字节码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
  0 aload_0
1 astore_1
2 iconst_m1
3 istore_2
4 aload_1
5 invokevirtual #4 <java/lang/String.hashCode : ()I>
8 lookupswitch 3
78406: 44 (+36)
83500: 58 (+50)
79801726: 72 (+64)
default: 83 (+75)
44 aload_1
45 ldc #2 <ONE>
47 invokevirtual #5 <java/lang/String.equals : (Ljava/lang/Object;)Z>
50 ifeq 83 (+33)
53 iconst_0
54 istore_2
55 goto 83 (+28)
58 aload_1
59 ldc #6 <TWO>
61 invokevirtual #5 <java/lang/String.equals : (Ljava/lang/Object;)Z>
64 ifeq 83 (+19)
67 iconst_1
68 istore_2
69 goto 83 (+14)
72 aload_1
73 ldc #7 <THREE>
75 invokevirtual #5 <java/lang/String.equals : (Ljava/lang/Object;)Z>
78 ifeq 83 (+5)
81 iconst_2
82 istore_2
83 iload_2
84 tableswitch 0 to 2
0: 112 (+28)
1: 123 (+39)
2: 134 (+50)
default: 145 (+61)
112 getstatic #8 <java/lang/System.out : Ljava/io/PrintStream;>
115 ldc #9 <这是ONE>
117 invokevirtual #10 <java/io/PrintStream.println : (Ljava/lang/String;)V>
120 goto 153 (+33)
123 getstatic #8 <java/lang/System.out : Ljava/io/PrintStream;>
126 ldc #11 <这是TWO>
128 invokevirtual #10 <java/io/PrintStream.println : (Ljava/lang/String;)V>
131 goto 153 (+22)
134 getstatic #8 <java/lang/System.out : Ljava/io/PrintStream;>
137 ldc #12 <这是THREE>
139 invokevirtual #10 <java/io/PrintStream.println : (Ljava/lang/String;)V>
142 goto 153 (+11)
145 getstatic #8 <java/lang/System.out : Ljava/io/PrintStream;>
148 ldc #13 <未知数>
150 invokevirtual #10 <java/io/PrintStream.println : (Ljava/lang/String;)V>
153 return

可以看出,switch 对字符串类型的选择,是先比较其 hashCode,若出现哈希碰撞,再通过 String.equals做校验的。

三、Switch 的特点

3.1 本质上是 int 值的筛选

从以上使用场景和对应的字节码可知,不管 switch 筛选何种类型的数据,最终都都是将其转化为 int 类型,再做值的比较的。

3.2 不支持 long 类型

既然 switch 本质上是 int 值的筛选,那么就不支持 long 类型。在设计之初,switch 为什么不把筛选范围定到 long 呢?

首先,switch 的诞生也为了满足实际需求,而绝大多数的选择语句都是简单语句,int 值域已经可以满足绝大多数的场景了。另一方面,筛选值域越大,要求就越高,意味着底层设计也越复杂。因此,基于实际需求与设计复杂度两方面的平衡,switch 把筛选值域定为 int,是最佳实践。

3.3 对筛选值排序

以上案例出现一个很有意思的现象,如 switch 筛选1,8,3这个三个值,但字节码中的筛选顺序却是1,3,8,如果筛选值更多更乱,就更能验证这个机制——对筛选值排序。

说到排序查找,很容易想到二分查找。实际上,switch 确实引入了二分查找算法(时间复杂度:O(log2n))。在分支较多的情况下,使用二分查找,可以大大降低查找时间,提升筛选效率。

3.4 lookupswitch 和 tableswitch

从字节码可以看出,switch 的作用机制,是先比较int值,再映射到执行地址的。这种类似 map 的映射结构,专业名叫跳转表。那么 switch 的跳转表为什么会有两种呢?

从以上案例可以发现:

  • 筛选值是1,3,8时,使用的是 lookupswitch
  • 筛选值是字符串的哈希值时,使用的是 lookupswitch
  • 筛选值是a,b,c时,对应的 int 值分别是97,98,99,使用的是 tableswitch
  • 筛选值是枚举项的下标+1时,对应的 int 值分别是1,2,3,使用的是 tableswitch

阅相关文档可知:lookpswitch 应用于筛选值离散度比较高的场景,tableswitch 应用于筛选值离散度比较低的场景。这是由编译器在编辑阶段,根据分支的离散度决定的,本质上都是为了提升查找速度。

参考:stackoverflow: 为什么 switch 不支持 long 类型?


一、unsafe.Pointer 定义及使用背景

1
2
3
4
5
// ArbitraryType is here for the purposes of documentation only and is not actually
// part of the unsafe package. It represents the type of an arbitrary Go expression.
type ArbitraryType int

type Pointer *ArbitraryType

本质上,unsafe.Pointerint 类型的指针,用于各种类型指针转换的桥接。Go 语言有着严格的类型系统,弱化了指针的操作,所允许的操作仅仅操作其指向的对象,不能进行类似 C 语言的指针转换和运算。但在日常开发中,可能就需要打破这种强制限制,对内存执行任意的读写。因此,作为通用的指针类型,unsafe.Pointer 开启了一扇指针操作的“后门”。

二、unsafe.Pointer 特性

  • 任意类型的指针都可以转换为 unsafe.Pointer

  • unsafe.Pointer 可以转换为任意类型的指针;

  • uintptr 可以转换为 unsafe.Pointer

  • unsafe.Pointer 可以转换为 uintptr

    前面说到,unsafe.Pointer 是通用的指针类型,只能转换不同类型的指针,无法实现类似 C 语言的指针运算。因此 Go 引入内置类型 uintptr,以弥补类型系统带来的短板。uintptr 的官方定义:

    1
    2
    3
    // uintptr is an integer type that is large enough to hold the bit pattern of
    // any pointer.
    type uintptr uintptr

    本质上,uintptr 是一个足够大的无符号整型,可以表示任意指针的地址。相当于一个中介,可以完成指针运算或者数值类型到指针类型的转换。

三、unsafe.Pointer 应用

3.1 指针与指针之间的转换

作为通用的指针类型,unsafe.Pointer 最基本的功能就是转换不同类型的指针。从 *X 转到 *Y 要求 Y 不大于 X 且两者具有相同的内存布局。

例如 bytestring 互转。由于 Go 的类型系统限制,byte 指针是不可以直接转为 string 指针的,在编译阶段就会报错。我们需要借助 unsafe.Pointer 作为中间桥接类型来完成这个转换。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package main

import (
"fmt"
"unsafe"
)

func main() {
b := []byte{'a', 'b', 'c'}
fmt.Println(b)

// []byte -> string
s := *(*string)(unsafe.Pointer(&b))
fmt.Println(s)

// string -> []byte
bb := *(*[]byte)(unsafe.Pointer(&s))
fmt.Println(bb)
}

输出:
[97 98 99]
abc
[97 98 99]
3.2 数值与指针之间的转换

C 语言中,经常使用普通数值来表示指针,这也就意味着要完成数值与指针之间的互转。 unsafe.Pointer 是通用指针,已无能为力。因此中间人 uintptr 就派上用场了。我们借助 uintptr 先将数值转换为 unsafe.Pointer,然后再转换为任意类型的指针;或者将任意类型的指针,先转换为 unsafe.Pointer,再转换为 uintptr。实际上,数值与指针的互转也是 CGO 编程的要点之一。

例如 int64*C.char 互转:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package main

import "C"
import (
"fmt"
"unsafe"
)

func main() {
var num = int64(12)

// int64 -> C.char
p := (*C.char)(unsafe.Pointer(uintptr(num)))

// C.char -> int64
num2 := int64(uintptr(unsafe.Pointer(p)))

fmt.Println(num2)
}

输出:
12
3.3 指针运算

Go 指针不仅不支持不同类型的转换,也不支持指针的运算。借助 uintptr 可以实现指针的移动和运算。

例如依次打印一个字节组信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package main

import (
"fmt"
"unsafe"
)

func main() {
data := []byte("1234")
for i := 0; i < len(data); i++ {
ptr := unsafe.Pointer(uintptr(unsafe.Pointer(&data[0])) + uintptr(i)*unsafe.Sizeof(data[0]))
fmt.Printf("%c\n", *(*byte)(ptr))
}
}

输出:
1
2
3
4

四、总结

unsafe.Pointer 的意义在于绕过 Go 的类型系统,直接读写内存,高效操作。正如字面理解那样,这是一种不安全的行为,如 uintptr 并没有指针的语义,所指向的对象存在被 GC 回收的风险。Go 是十分不鼓励这样操作的。


一、学习背景

Freemarker 是一款强大的模板引擎,可以用来生成网页、邮件、文档等。对于简单的 Word 文档导出,只需要手动编写 ftl 文件即可。但如果要导出复杂的文档,比如带有复杂样式、页眉页脚、内嵌图片、批注等,手动编写模板就行不通了。现在提出一个从目标文档出发的解决方案:先将目标 Word 模板文档转换为 xml 文档,然后将 xml 文档转换为 ftl 文档,手动替换模板中的变量之后即可导出复杂 Word

二、根据目标文档获取 ftl 文档

我们以导出房屋租赁合同文档为例,模板中有房东、租客信息、房屋信息等。

1. 将目标模板转换为 xml 文档

操作 Word 文档,点击【文件】,另存为 xml 文档。

NotePad++Sublime 打开 xml 文档,内容缺乏层次感,这里需要格式化一下。

2. 将 xml 文档转换为 ftl 文档

格式化之后的 xml 文档,选择【文件】,另存为 ftl 文档。接下来需要手动替换模板参数。

文本参数:根据模板中的默认值,找到其所在位置,直接替换。

图片参数:图片参数是对图片进行 Base64 加密之后的值,加密操作可以由 Java 来完成。

三、使用 Java 根据 ftl 模板导出 Word 文档

Resource 目录下新建文件夹 freemarker_template,将 ftl 文档粘贴进去。

图片 Base64 位编码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import com.company.exception.ServiceException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import sun.misc.BASE64Encoder;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;

/**
* @author zourongsheng
* @version 1.0
* @date 2021/07/11 22:48
*/
@Service
public class ImageServiceImpl implements ImageService {

private static final Logger LOGGER = LoggerFactory.getLogger(ImageServiceImpl.class);

/**
* 【对图片进行 Base64 编码】
*
* @param fileSrc 图片的存储地址: filePath + fileName
* @return 图片 Base64 编码
*/
@Override
public String getImgBase64Data(String fileSrc) {

File img = new File(fileSrc);

if (!img.exists()) {
return null;
}

try (InputStream in = new FileInputStream(img)) {
byte[] data = new byte[in.available()];
in.read(data);
BASE64Encoder encoder = new BASE64Encoder();
return encoder.encode(data);
} catch (IOException e) {
LOGGER.error("invoke ImageService.getImgBase64Data error: {}", e.getMessage(), e);
throw new ServiceException(e.getMessage(), e);
}
}
}

解析模板内容实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
import com.company.exception.ServiceException;
import freemarker.template.Configuration;
import freemarker.template.Template;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import org.springframework.util.Assert;

import java.io.File;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.util.Map;

/**
* @author zourongsheng
* @version 1.0
* @date 2021/07/11 16:14
*/
@Service
public class TemplateServiceImpl implements TemplateService {

private static final Logger LOGGER = LoggerFactory.getLogger(TemplateServiceImpl.class);

/**
* 【组装数据模板信息】
*
* @param templatePath 模板存放的根目录
* @param templateName 模板名称
* @param params 模板内容参数
* @return 数据模板信息
*/
@Override
public String getTemplateContent(String templatePath, String templateName, Map<String, Object> params) {
try {

LOGGER.info("start building template content. path: 【{}】; name: 【{}】; params: 【{}】", templatePath, templateName, params);

Assert.hasText(templatePath, "template path cannot be null or empty");

Assert.hasText(templateName, "template name cannot be null or empty");

// 获取资源目录
String resourcePath = TemplateServiceImpl.class.getResource(File.separator).getPath();

// 模板配置信息
Configuration configuration = new Configuration(Configuration.VERSION_2_3_28);
configuration.setDefaultEncoding(StandardCharsets.UTF_8.name());
String standardTemplatePath = templatePath.endsWith(File.separator) ? templatePath.concat(File.separator) : templatePath;
configuration.setDirectoryForTemplateLoading(new File(resourcePath.concat(standardTemplatePath)));

// 生成模板
Template template = configuration.getTemplate(templateName);

// 填充模板内容参数
StringWriter writer = new StringWriter();
template.process(params, writer);

String content = writer.toString();

LOGGER.info("finish building template content.");

return content;
} catch (Exception e) {
LOGGER.error("invoke TemplateService.getStringFromVm error: {}", e.getMessage(), e);
throw new ServiceException(e.getMessage(), e);
}
}
}

单元测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import ImageService;
import TemplateService;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.io.IOUtils;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Resource;
import java.io.File;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

/**
* @author zourongsheng
* @version 1.0
* @date 2021/07/11 16:44
*/
@RunWith(SpringRunner.class)
@SpringBootTest
public class TemplateTest {

private static final Logger LOGGER = LoggerFactory.getLogger(TemplateTest.class);

@Resource
private TemplateService templateService;

@Resource
private ImageService imageService;

@Test
public void generateWordFromTemplate() {

String templatePath = "freemarker_template/";
String templateName = "contract.ftl";

ContractInfo contractInfo = new ContractInfo();
contractInfo.setLandlordName("地头蛇");
contractInfo.setLandlordIdNo("100011232132112");
contractInfo.setLandlordAddress("上海市青浦区");
contractInfo.setLandlordPhoneNo("13032389090");
contractInfo.setTenantName("打工人");
contractInfo.setTenantIdNo("340323199901013217");
contractInfo.setTenantAddress("安徽省蚌埠市");
contractInfo.setTenantPhoneNo("15656997878");
contractInfo.setYear("2020");
contractInfo.setMonth("01");
contractInfo.setDay("01");
// 图片 Base64 编码
String imgBase64Data = imageService.getImgBase64Data("C:\\house.jpg");
contractInfo.setImgBase64Data(imgBase64Data);

ObjectMapper objectMapper = new ObjectMapper();
Map<String, Object> params = objectMapper.convertValue(contractInfo, Map.class);

String content = templateService.getTemplateContent(templatePath, templateName, params);

File file = new File("租房合同-打工人.doc");

try (InputStream in = IOUtils.toInputStream(content, StandardCharsets.UTF_8);
OutputStream out = new FileOutputStream(file)) {

byte[] data = new byte[1024];

int len;
while (-1 != (len = in.read(data, 0, data.length))) {
out.write(data, 0, len);
}
out.flush();
} catch (Exception e) {
LOGGER.error("下载租房合同失败; errMsg: {}", e.getMessage(), e);
}
}
}

注意: 通过这种方式导出的 Word 文档,本质上还是 xml 文档,因此必须使用 .doc 后缀,具体请查看MsOffice Word docx 研究

运行起来,导出租房合同-打工人.doc

四、总结

通过将目标模板转换为 ftl 文档,再解析得到目标文档的办法,理论上可以应对任何复杂程度的文档导出需求。但这种好办法也有弊端:ftl 文档包含太多的内联样式、复杂标签等,可读性太差。当模板发生变化时,手动替换太多的模板参数将会是一种灾难。

参考:


一、docdocx 简介

doc 全程为 document,是常见的文件扩展名,也是 Word2003 及之前版本的文本文档格式,其基于二进制形式存储;docxWord2007 及之后版本的文本文档格式,其基于 Office Open XML 标准的压缩文件格式。

二、docxdoc 的区别

既然 docx 基于 ooxml 的格式,那么本质上就是一个 zip 文件。以下是内容相同的文档,分别以 docdocx 格式保存之后所占空间大小,可以看出 docx 文件明显比 doc 要小很多。


为了进一步了解 ooxml 结构,我们以一个含有页眉页脚、文本、图片的 docx 文件为例。

手动修改文件后缀为 .zip 后保存,然后解压得到文件结构:

  • rels
    • .rels: 指定主要信息、扩展信息、文档内容的引用 ID
  • docProps
    • app.xml: 扩展信息,包括字数、行数、段落数、页数等
    • core.xml:主要信息,包括创建人、修改人、创建时间、修改时间等
  • word:文档信息
    • _rels:文档引用信息
      • document.xml.rels:指定文档中的页眉页脚、主题样式、图片音视频等的引用 ID
    • media:存放文档中使用的图片、音频、视频等媒体文件
      • image1.jpg:文档中引用的图片
    • theme:文档主题信息
      • theme1.xml
    • document.xml:文档内容
    • endnotes.xml
    • fontTable.xml
    • footer1.xml:页脚信息
    • footnotes.xml
    • header1.xml:页眉信息
    • settings.xml:文档配置信息
    • styles.xml:文档样式信息
    • webSettings.xml:网页样式配置信息
  • [Content_Types].xml: 指定文件配置,包括图片类型、页眉页脚、主题样式、文档内容等

docxooxml 存储模式,将文档按照功能区划分为:配置信息、主题样式信息、页眉页脚信息、引用定义信息、媒体文件信息、文档内容信息等模块。这样便可将对应的信息抽离出来,存放在 xml 文件中。一方面,清晰的文档底层结构,方便查看内容细节,也方便进行二次开发;另一方面,文档信息分模块保存,好比把鸡蛋放在多个篮子里,可以增加容错性,使得文档修复更加方便。


综上,与 doc 相比,docx 主要有以下特点:

  • 压缩率高,存储相同内容所占空间更小;
  • 将文档信息拆分保存,方便查看或二次开发;
  • 多个 xml 文件打包,易于跨平台使用;
  • 增加文档容错性,方便修复损坏文档。

参考: