一、Spring Batch 性能优化指标

Spring Batch 是一款伸缩性非常好的批处理工具,既可以处理简单的任务,也可以处理复杂的、高容量的任务。在性能调优方面,Spring Batch 提供了丰富的接口支持,各项优化指标可归纳如下:

  • 多线程 Step:由独立线程执行提交块(a chunk of items)的输入、处理和输出过程;
  • 并行化 Step:对于可并行处理的 Step,交由不同的线程去处理;
  • 分片化 Step:通过分片 SPI(Service Provider Interface),对 Step 分片执行;
  • 远程组块:对于输入无性能瓶颈,但处理和输出有性能瓶颈的任务,交由远程组块执行;

详见 Spring 文档。

二、分片化 Step

如果一个 Step 的任务量比较大,可以尝试将其拆分成多个子任务。子任务之间可并行处理且互不干扰,这将大大提升批处理效率。例如:Master 这个 Step 迁移 100000 条数据需要 100 s,如果将其拆分为 100 个 Slave 任务并行执行,理想情况下时间可缩短至 1 s。

Step 分片的原理,是一个 Master 处理器对应多个 Slave 处理器。Slave 处理器可以是远程服务,也可以是本地执行线程。主从服务间的消息不需要持久化,也不需要严格保证传递,因为 JobRepository 的元数据管理会将每个 Slave 的执行记录独立保存在 batch_step_execution 表中,这样便可以保证每个 Slave 任务只执行一次。

Step 分片化,需要了解两个组件:分片器(Partitioner)和分片处理(PartitionHandler)。

  • 分片器(Partitioner):为每个 Slave 服务配置上下文(StepExecutionContext);

  • 分片处理(PartitionHandler):定义 Slave 服务的数量以及 Slave 任务内容;

比如在一个数据迁移 Step 中,分片处理就是将 1 个主任务拆分成 100 个从任务,并定义从任务的执行内容;分片器就是依次为这 100 个从任务划定数据迁移的范围(select * from table where id between ? and ?)。
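分片器划定范围的这段算术逻辑,可以用一小段普通 Java 代码示意(PartitionRangeSketch 与 partitionRanges 均为本文虚构的演示名称,并非 demo 原文,仅演示区间划分的计算方式):

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionRangeSketch {

    // 每个元素为 {fromId, toId}, 对应读取条件 student_id > fromId and student_id <= toId
    // 假设 totalCount 恰好能被 gridSize 整除
    static List<long[]> partitionRanges(long totalCount, int gridSize) {
        long range = totalCount / gridSize;
        List<long[]> ranges = new ArrayList<>();
        long fromId = 0;
        for (int i = 0; i < gridSize; i++) {
            ranges.add(new long[]{fromId, fromId + range});
            fromId += range;
        }
        return ranges;
    }
}
```

以 100000 条数据、100 个分片为例,第 1 个分片的区间为 (0, 1000],第 100 个分片的区间为 (99000, 100000]。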

三、批处理配置

3.1 Job 配置
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
public class PartitionTransferStudentJob {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    @Qualifier(value = "masterTransferStudentStep1")
    private Step masterTransferStudentStep;

    @Bean
    public Job transferStudentJob() {
        return jobBuilderFactory.get("partitionTransferStudentJob")
                .incrementer(new RunIdIncrementer())
                .flow(masterTransferStudentStep)
                .end()
                .build();
    }
}
3.2 Step 配置

MasterTransferStudentStep

import com.example.springbatchdemo.component.partitioner.TransferStudentPartitioner;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.partition.PartitionHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MasterTransferStudentStep {

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Autowired
    @Qualifier(value = "transferStudentPartitionHandler1")
    private PartitionHandler transferStudentPartitionHandler;

    @Autowired
    private TransferStudentPartitioner transferStudentPartitioner;

    @Bean("masterTransferStudentStep1")
    public Step masterTransferStudentStep1() {
        return stepBuilderFactory.get("masterTransferStudentStep1.manager")
                .partitioner("masterTransferStudentStep1", transferStudentPartitioner)
                .partitionHandler(transferStudentPartitionHandler)
                .build();
    }
}

SlaveTransferStudentStep

import com.example.springbatchdemo.component.processor.SlaveStudentItemProcessor;
import com.example.springbatchdemo.entity.Student;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;

@Configuration
public class SlaveTransferStudentStep {

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Autowired
    @Qualifier(value = "slaveTransferStudentItemReader")
    private JdbcPagingItemReader<Student> slaveTransferStudentItemReader;

    @Autowired
    @Qualifier(value = "slaveTransferStudentItemWriter")
    private JdbcBatchItemWriter<Student> slaveTransferStudentItemWriter;

    @Autowired
    private SlaveStudentItemProcessor slaveStudentItemProcessor;

    @Bean("slaveTransferStudentStep1")
    public Step slaveTransferStudentStep1(PlatformTransactionManager transactionManager) {
        return stepBuilderFactory.get("slaveTransferStudentStep1")
                .transactionManager(transactionManager)
                .<Student, Student>chunk(1000)
                .reader(slaveTransferStudentItemReader)
                .processor(slaveStudentItemProcessor)
                .writer(slaveTransferStudentItemWriter)
                .build();
    }
}
3.3 Partitioner 配置
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class TransferStudentPartitioner implements Partitioner {

    private static final Logger LOGGER = LoggerFactory.getLogger(TransferStudentPartitioner.class);

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {

        Map<String, ExecutionContext> result = new HashMap<>(gridSize);

        int range = 1000;
        int fromId = 0;
        int toId = range;

        for (int i = 1; i <= gridSize; i++) {

            ExecutionContext value = new ExecutionContext();
            value.putInt("fromId", fromId);
            value.putInt("toId", toId);

            result.put("partition" + i, value);

            // 先记录当前分片的区间, 再推进游标(原代码先推进再打印, 日志会整体偏移一个区间)
            LOGGER.info("partition{}; fromId: {}; toId: {}", i, fromId, toId);

            fromId = toId;
            toId += range;
        }

        return result;
    }
}
3.4 Partition-Handler 配置
import org.springframework.batch.core.Step;
import org.springframework.batch.core.partition.PartitionHandler;
import org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import static com.example.springbatchdemo.config.ExecutorConfig.TASK_EXECUTOR;

@Configuration
public class TransferStudentPartitionHandler {

    @Autowired
    @Qualifier(value = TASK_EXECUTOR)
    private ThreadPoolTaskExecutor taskExecutor;

    @Autowired
    @Qualifier(value = "slaveTransferStudentStep1")
    private Step slaveTransferStudentStep;

    @Bean("transferStudentPartitionHandler1")
    public PartitionHandler transferStudentPartitionHandler1() {
        TaskExecutorPartitionHandler retVal = new TaskExecutorPartitionHandler();
        retVal.setTaskExecutor(taskExecutor);
        retVal.setStep(slaveTransferStudentStep);
        retVal.setGridSize(100);
        return retVal;
    }
}
3.5 数据输入器
import com.example.springbatchdemo.component.reader.rowmapper.StudentRowMapper;
import com.example.springbatchdemo.entity.Student;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.builder.JdbcPagingItemReaderBuilder;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class CustomItemReader {

    @Autowired
    @Qualifier(value = "batchDemoDB")
    private DataSource batchDemoDB;

    @Bean("slaveTransferStudentItemReader")
    @StepScope
    public JdbcPagingItemReader<Student> slaveTransferStudentItemReader(
            @Value("#{stepExecutionContext[fromId]}") final Long fromId,
            @Value("#{stepExecutionContext[toId]}") final Long toId) {

        MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
        queryProvider.setSelectClause("student_id, name, address");
        queryProvider.setFromClause("from student_source");
        queryProvider.setWhereClause(String.format("where student_id > %s and student_id <= %s", fromId, toId));

        Map<String, Order> sortKeys = new HashMap<>(1);
        sortKeys.put("student_id", Order.ASCENDING);
        queryProvider.setSortKeys(sortKeys);

        return new JdbcPagingItemReaderBuilder<Student>()
                .name("studentItemReader")
                .dataSource(batchDemoDB)
                .fetchSize(1000)
                .rowMapper(new StudentRowMapper())
                .queryProvider(queryProvider)
                .build();
    }
}
3.6 数据处理器
import com.example.springbatchdemo.entity.Student;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.context.annotation.Configuration;

@Configuration
@StepScope
public class SlaveStudentItemProcessor implements ItemProcessor<Student, Student> {

    // 原代码误用了 StudentItemProcessor.class, 这里改为本类
    private static final Logger log = LoggerFactory.getLogger(SlaveStudentItemProcessor.class);

    @Override
    public Student process(final Student studentSource) throws Exception {

        final Long studentId = studentSource.getStudentId();
        final String name = studentSource.getName();
        final String address = studentSource.getAddress();

        final Student studentTarget = new Student();
        studentTarget.setStudentId(studentId);
        studentTarget.setName(name);
        studentTarget.setAddress(address);

        log.info("Converting ({}) into ({})", studentSource, studentTarget);

        return studentTarget;
    }
}
3.7 数据输出器
import com.example.springbatchdemo.entity.Student;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.sql.DataSource;

@Configuration
public class CustomItemWriter {

    @Autowired
    @Qualifier(value = "batchDemoDB")
    private DataSource batchDemoDB;

    @Bean("slaveTransferStudentItemWriter")
    @StepScope
    public JdbcBatchItemWriter<Student> slaveTransferStudentItemWriter() {

        return new JdbcBatchItemWriterBuilder<Student>()
                .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
                .sql("INSERT INTO student_target (student_id, name, address) VALUES (:studentId, :name, :address)")
                .dataSource(batchDemoDB)
                .build();
    }
}

四、性能测试

测试数据量:100000

测试环境:Windows 10,i7-8核,MySQL-8.0.28

4.1 常规 Step

省略测试代码,具体请查看 demo。测试结果:

耗时:13s

4.2 分片化 Step

测试结果:

从 batch_step_execution 表可以看出,共有 100 个子任务并行处理,每个子任务迁移 1000 条数据。

耗时:7s

示例代码:spring-batch-demo



二、并行化 Step

一个 Job 可配置多个 Step。Step 之间可能存在关联,需要有先有后;也可能没有关联,先执行哪一个都可以。将这些互不关联的 Step 并行化处理,将会有效提升批处理性能。

比如,现有一个批处理任务,包含 4 个 Step:

  • step1:在学生姓名后面追加字符串 “1”;
  • step2:在学生姓名后面追加字符串 “2”;
  • step3:在学生住址后面追加字符串 “8”;
  • step4:迁移所有学生信息;

我们发现,修改学生姓名的任务与修改学生住址的任务互不干扰,不需要有先后之分。因此,可以让 step1、step2 所在的流程与 step3 所在的流程并行执行(step1 与 step2 之间仍保持先后顺序)。串行 Step 与并行 Step 的流程示意图从略。

三、批处理配置

3.1 Job 配置
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
public class BatchManageStudentJob {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    @Qualifier(value = "batchProcessStudentSplitFlow1")
    private Flow batchProcessStudentSplitFlow;

    @Autowired
    @Qualifier(value = "batchTransferStudentStep1")
    private Step batchTransferStudentStep;

    @Bean
    public Job manageStudentJob() {
        return jobBuilderFactory.get("manageStudentJob1")
                .incrementer(new RunIdIncrementer())
                // 姓名追加 1、姓名追加 2, 与地址追加 8 并行
                .start(batchProcessStudentSplitFlow)
                // 迁移学生信息: student_source -> student_target
                .next(batchTransferStudentStep)
                .end()
                .build();
    }
}
3.2 Flow 配置

batchProcessStudentSplitFlow

import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.job.flow.support.SimpleFlow;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import static com.example.springbatchdemo.config.ExecutorConfig.TASK_EXECUTOR;

@Configuration
public class BatchProcessStudentSplitFlow {

    @Autowired
    @Qualifier(value = TASK_EXECUTOR)
    private ThreadPoolTaskExecutor taskExecutor;

    @Autowired
    @Qualifier(value = "batchUpdateStudentNameOneAndTwoFlow")
    private Flow batchUpdateStudentNameOneAndTwoFlow;

    @Autowired
    @Qualifier(value = "batchUpdateStudentAddressFlow1")
    private Flow batchUpdateStudentAddressFlow;

    @Bean("batchProcessStudentSplitFlow1")
    public Flow batchProcessStudentSplitFlow1() {
        return new FlowBuilder<SimpleFlow>("batchProcessStudentSplitFlow1")
                .split(taskExecutor)
                .add(batchUpdateStudentNameOneAndTwoFlow, batchUpdateStudentAddressFlow)
                .build();
    }
}

batchUpdateStudentNameOneAndTwoFlow

import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.job.flow.support.SimpleFlow;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class BatchUpdateStudentNameFlow {

    @Autowired
    @Qualifier(value = "batchUpdateStudentNameStep1")
    private Step batchUpdateStudentNameStep1;

    @Autowired
    @Qualifier(value = "batchUpdateStudentNameStep2")
    private Step batchUpdateStudentNameStep2;

    @Bean("batchUpdateStudentNameOneAndTwoFlow")
    public Flow updateStudentNameOneAndTwoFlow() {
        return new FlowBuilder<SimpleFlow>("batchUpdateStudentNameOneAndTwoFlow")
                .start(batchUpdateStudentNameStep1)
                .next(batchUpdateStudentNameStep2)
                .build();
    }
}

batchUpdateStudentAddressFlow1

import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.job.flow.support.SimpleFlow;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class BatchUpdateStudentAddressFlow {

    @Autowired
    @Qualifier(value = "batchUpdateStudentAddressStep1")
    private Step batchUpdateStudentAddressStep;

    @Bean("batchUpdateStudentAddressFlow1")
    public Flow batchUpdateStudentAddressFlow1() {
        return new FlowBuilder<SimpleFlow>("batchUpdateStudentAddressFlow1")
                .start(batchUpdateStudentAddressStep)
                .build();
    }
}
3.3 Step 配置

BatchUpdateStudentNameStep

import com.example.springbatchdemo.component.processor.AppendStudentNameOneProcessor;
import com.example.springbatchdemo.component.processor.AppendStudentNameTwoProcessor;
import com.example.springbatchdemo.entity.Student;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class BatchUpdateStudentNameStep {

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Autowired
    @Qualifier(value = "studentItemReader")
    private JdbcPagingItemReader<Student> studentItemReader;

    @Autowired
    @Qualifier(value = "studentItemUpdateName")
    private JdbcBatchItemWriter<Student> studentItemUpdateName;

    @Autowired
    private AppendStudentNameOneProcessor appendStudentNameOneProcessor;

    @Autowired
    private AppendStudentNameTwoProcessor appendStudentNameTwoProcessor;

    @Bean("batchUpdateStudentNameStep1")
    public Step batchUpdateStudentNameStep1() {
        return stepBuilderFactory.get("batchUpdateStudentNameStep1")
                .<Student, Student>chunk(1000)
                .reader(studentItemReader)
                // 姓名追加 1
                .processor(appendStudentNameOneProcessor)
                .writer(studentItemUpdateName)
                .build();
    }

    @Bean("batchUpdateStudentNameStep2")
    public Step batchUpdateStudentNameStep2() {
        return stepBuilderFactory.get("batchUpdateStudentNameStep2")
                .<Student, Student>chunk(1000)
                .reader(studentItemReader)
                // 姓名追加 2
                .processor(appendStudentNameTwoProcessor)
                .writer(studentItemUpdateName)
                .build();
    }
}

BatchUpdateStudentAddressStep

import com.example.springbatchdemo.component.processor.AppendStudentAddressProcessor;
import com.example.springbatchdemo.entity.Student;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class BatchUpdateStudentAddressStep {

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Autowired
    @Qualifier(value = "studentItemReader")
    private JdbcPagingItemReader<Student> studentItemReader;

    @Autowired
    @Qualifier(value = "studentItemUpdateAddress")
    private JdbcBatchItemWriter<Student> studentItemUpdateAddress;

    @Autowired
    private AppendStudentAddressProcessor appendStudentAddressProcessor;

    @Bean("batchUpdateStudentAddressStep1")
    public Step batchUpdateStudentAddressStep1() {
        return stepBuilderFactory.get("batchUpdateStudentAddressStep1")
                .<Student, Student>chunk(1000)
                .reader(studentItemReader)
                // 住址追加 8
                .processor(appendStudentAddressProcessor)
                .writer(studentItemUpdateAddress)
                .build();
    }
}

BatchProcessStudentStep

import com.example.springbatchdemo.component.processor.StudentItemProcessor;
import com.example.springbatchdemo.entity.Student;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class BatchProcessStudentStep {

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Autowired
    @Qualifier(value = "studentItemReader")
    private JdbcPagingItemReader<Student> studentItemReader;

    @Autowired
    @Qualifier(value = "studentItemWriter")
    private JdbcBatchItemWriter<Student> studentItemWriter;

    @Autowired
    private StudentItemProcessor studentItemProcessor;

    @Bean("batchTransferStudentStep1")
    public Step batchTransferStudentStep1() {
        return stepBuilderFactory.get("batchTransferStudentStep1")
                .<Student, Student>chunk(1000)
                .reader(studentItemReader)
                // 迁移数据: student_source -> student_target
                .processor(studentItemProcessor)
                .writer(studentItemWriter)
                .build();
    }
}
3.4 数据输入器
import com.example.springbatchdemo.component.reader.rowmapper.StudentRowMapper;
import com.example.springbatchdemo.entity.Student;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.builder.JdbcPagingItemReaderBuilder;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class CustomItemReader {

    @Autowired
    @Qualifier(value = "batchDemoDB")
    private DataSource batchDemoDB;

    @Bean("studentItemReader")
    @StepScope
    public JdbcPagingItemReader<Student> studentItemReader() {

        MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
        queryProvider.setSelectClause("student_id, name, address");
        queryProvider.setFromClause("from student_source");

        Map<String, Order> sortKeys = new HashMap<>(1);
        sortKeys.put("student_id", Order.ASCENDING);
        queryProvider.setSortKeys(sortKeys);

        return new JdbcPagingItemReaderBuilder<Student>()
                .name("studentItemReader")
                .dataSource(batchDemoDB)
                .fetchSize(1000)
                .rowMapper(new StudentRowMapper())
                .queryProvider(queryProvider)
                .build();
    }
}
3.5 数据处理器

AppendStudentNameOneProcessor

import com.example.springbatchdemo.entity.Student;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AppendStudentNameOneProcessor implements ItemProcessor<Student, Student> {

    private static final Logger log = LoggerFactory.getLogger(AppendStudentNameOneProcessor.class);

    @Override
    public Student process(final Student studentSource) throws Exception {

        final Long studentId = studentSource.getStudentId();
        final String name = studentSource.getName();
        final String address = studentSource.getAddress();

        final Student studentTarget = new Student();
        studentTarget.setStudentId(studentId);
        studentTarget.setName(name.concat("_1"));
        studentTarget.setAddress(address);

        log.info("Converting ({}) into ({})", studentSource, studentTarget);

        return studentTarget;
    }
}

AppendStudentNameTwoProcessor

import com.example.springbatchdemo.entity.Student;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AppendStudentNameTwoProcessor implements ItemProcessor<Student, Student> {

    private static final Logger log = LoggerFactory.getLogger(AppendStudentNameTwoProcessor.class);

    @Override
    public Student process(final Student studentSource) throws Exception {

        final Long studentId = studentSource.getStudentId();
        final String name = studentSource.getName();
        final String address = studentSource.getAddress();

        final Student studentTarget = new Student();
        studentTarget.setStudentId(studentId);
        studentTarget.setName(name.concat("_2"));
        studentTarget.setAddress(address);

        log.info("Converting ({}) into ({})", studentSource, studentTarget);

        return studentTarget;
    }
}

AppendStudentAddressProcessor

import com.example.springbatchdemo.entity.Student;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AppendStudentAddressProcessor implements ItemProcessor<Student, Student> {

    private static final Logger log = LoggerFactory.getLogger(AppendStudentAddressProcessor.class);

    @Override
    public Student process(final Student studentSource) throws Exception {

        final Long studentId = studentSource.getStudentId();
        final String name = studentSource.getName();
        final String address = studentSource.getAddress();

        final Student studentTarget = new Student();
        studentTarget.setStudentId(studentId);
        studentTarget.setName(name);
        studentTarget.setAddress(address.concat("_8"));

        log.info("Converting ({}) into ({})", studentSource, studentTarget);

        return studentTarget;
    }
}

StudentItemProcessor

import com.example.springbatchdemo.entity.Student;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.context.annotation.Configuration;

@Configuration
public class StudentItemProcessor implements ItemProcessor<Student, Student> {

    private static final Logger log = LoggerFactory.getLogger(StudentItemProcessor.class);

    @Override
    public Student process(final Student studentSource) throws Exception {

        final Long studentId = studentSource.getStudentId();
        final String name = studentSource.getName();
        final String address = studentSource.getAddress();

        final Student studentTarget = new Student();
        studentTarget.setStudentId(studentId);
        studentTarget.setName(name);
        studentTarget.setAddress(address);

        log.info("Converting ({}) into ({})", studentSource, studentTarget);

        return studentTarget;
    }
}
3.6 数据输出器
import com.example.springbatchdemo.entity.Student;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.sql.DataSource;

@Configuration
public class CustomItemWriter {

    @Autowired
    @Qualifier(value = "batchDemoDB")
    private DataSource batchDemoDB;

    @Bean("studentItemWriter")
    public JdbcBatchItemWriter<Student> studentItemWriter() {

        return new JdbcBatchItemWriterBuilder<Student>()
                .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
                .sql("INSERT INTO student_target (student_id, name, address) VALUES (:studentId, :name, :address)")
                .dataSource(batchDemoDB)
                .build();
    }

    @Bean("studentItemUpdateName")
    @StepScope
    public JdbcBatchItemWriter<Student> studentItemUpdateName() {

        return new JdbcBatchItemWriterBuilder<Student>()
                .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
                .sql("UPDATE student_source SET name = :name WHERE student_id = :studentId")
                .dataSource(batchDemoDB)
                .build();
    }

    @Bean("studentItemUpdateAddress")
    public JdbcBatchItemWriter<Student> studentItemUpdateAddress() {

        return new JdbcBatchItemWriterBuilder<Student>()
                .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
                .sql("UPDATE student_source SET address = :address WHERE student_id = :studentId")
                .dataSource(batchDemoDB)
                .build();
    }
}

@StepScope:

从上面的 Step 配置可知,studentItemReader 被多个 Step 引用。默认情况下它是单例 Bean,生命周期与应用上下文一致,多个 Step 先后打开同一个实例时,就会抛出类似下面这种异常:

Caused by: java.lang.IllegalStateException: Cannot open an already opened ItemReader, call close first

使用注解 @StepScope,让 studentItemReader 的生命周期与 Step 保持同步,保证每个 Step 拿到的 ItemReader 都是新的实例。同样,被多个 Step 引用的 ItemWriter、ItemProcessor,也都要使用该注解。
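这个异常的成因可以用一小段普通 Java 代码模拟(StatefulReader 为本文虚构的演示类,并非 Spring Batch API):带状态的输入器被重复 open 时直接报错,而 @StepScope 的效果相当于让每个 Step 拿到各自新建的实例。

```java
// 模拟一个有状态的输入器: 与 Spring Batch 的 ItemStream 约定类似,
// 打开一个已经打开的实例会抛出 IllegalStateException。
public class StatefulReader {

    private boolean opened = false;

    public void open() {
        if (opened) {
            throw new IllegalStateException(
                    "Cannot open an already opened ItemReader, call close first");
        }
        opened = true;
    }

    public void close() {
        opened = false;
    }
}
```

多个 Step 共享同一个实例时,第二次 open() 即抛出上述异常;每个 Step 各用一个新实例(@StepScope 的效果)则不会。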

四、性能测试

测试数据量:100000

测试环境:Windows 10,i7-8核,MySQL-8.0.28

4.1 串行 Step

串行 Step 批处理,只需要按照顺序配置 Step(省略代码示例)。测试结果:

耗时:91s

4.2 并行 Step

测试结果:

耗时:68s

示例代码:spring-batch-demo



二、多线程 Step 配置

Spring Batch 执行一个 Step,会按照 chunk 配置的数量分批次提交。对于多线程 Step,由线程池去处理任务批次。因此,每个 chunk 都不用串行等待,这大大地提高了批处理性能。

配置多线程 Step 非常简单,可以通过 XML 或 Java 接口来配置。以 Java 配置为例:

@Bean
public Step sampleStep(TaskExecutor taskExecutor) {
    return this.stepBuilderFactory.get("sampleStep")
            .<String, String>chunk(10)
            .reader(itemReader())
            .writer(itemWriter())
            .taskExecutor(taskExecutor)
            // 节流配置, 不要超过线程池的最大线程数量
            .throttleLimit(20)
            .build();
}

此外,在配置多线程 Step 时,我们需要考虑得更多:

  • 线程池:推荐使用 Spring 线程池 ThreadPoolTaskExecutor,兼容性好;
  • 线程安全:输入器和输出器必须是线程安全的,否则可能会导致重复任务、脏数据等问题;
  • 框架节流:Spring Batch 自带节流器,throttleLimit 默认为 4,即最多并发处理 4 个 chunk,通常需要按线程池容量重新配置;
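针对线程安全问题,Spring Batch 提供了装饰器 SynchronizedItemStreamReader,可将非线程安全的读取器包装为同步读取。示意配置如下(示意代码,假设沿用上文的 Student 实体与 studentItemReader):

```java
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.support.SynchronizedItemStreamReader;
import org.springframework.context.annotation.Bean;

@Bean
public SynchronizedItemStreamReader<Student> synchronizedStudentItemReader(
        JdbcPagingItemReader<Student> studentItemReader) {
    // 对委托读取器的 read() 调用加锁, 保证多线程 Step 下的读取安全
    SynchronizedItemStreamReader<Student> reader = new SynchronizedItemStreamReader<>();
    reader.setDelegate(studentItemReader);
    return reader;
}
```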

三、批处理配置

通过 Spring Batch 应用,迁移 100 万条数据。相关配置如下:

3.1 数据读取器
import com.example.springbatchdemo.component.reader.rowmapper.StudentRowMapper;
import com.example.springbatchdemo.entity.Student;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.builder.JdbcPagingItemReaderBuilder;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class CustomItemReader {

@Autowired
@Qualifier(value = "batchDemoDB")
private DataSource batchDemoDB;

@Bean("studentItemReader")
public JdbcPagingItemReader<Student> studentItemReader() {

MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
queryProvider.setSelectClause("student_id, name, address");
queryProvider.setFromClause("from student_source");

Map<String, Order> sortKeys = new HashMap<>(1);
sortKeys.put("student_id", Order.ASCENDING);
queryProvider.setSortKeys(sortKeys);

return new JdbcPagingItemReaderBuilder<Student>()
.name("studentItemReader")
.dataSource(batchDemoDB)
.fetchSize(1000)
.rowMapper(new StudentRowMapper())
.queryProvider(queryProvider)
.build();
}
}
3.2 数据映射器
import com.example.springbatchdemo.entity.Student;
import org.springframework.jdbc.core.RowMapper;
import java.sql.ResultSet;
import java.sql.SQLException;

public class StudentRowMapper implements RowMapper<Student> {

@Override
public Student mapRow(ResultSet rs, int rowNum) throws SQLException {
Student student = new Student();
student.setStudentId(rs.getLong("student_id"));
student.setName(rs.getString("name"));
student.setAddress(rs.getString("address"));
return student;
}
}
3.3 数据处理器
import com.example.springbatchdemo.entity.Student;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.context.annotation.Configuration;

@Configuration
public class StudentItemProcessor implements ItemProcessor<Student, Student> {

private static final Logger log = LoggerFactory.getLogger(StudentItemProcessor.class);

@Override
public Student process(final Student studentSource) throws Exception {

final Long studentId = studentSource.getStudentId();
final String name = studentSource.getName();
final String address = studentSource.getAddress();

final Student studentTarget = new Student();
studentTarget.setStudentId(studentId);
studentTarget.setName(name);
studentTarget.setAddress(address);

log.info("Converting ({}) into ({})", studentSource, studentTarget);

return studentTarget;
}
}
3.4 数据写入器
import com.example.springbatchdemo.entity.Student;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.sql.DataSource;

@Configuration
public class CustomItemWriter {

@Autowired
@Qualifier(value = "batchDemoDB")
private DataSource batchDemoDB;

@Bean("studentItemWriter")
public JdbcBatchItemWriter<Student> studentItemWriter() {

return new JdbcBatchItemWriterBuilder<Student>()
.itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
.sql("INSERT INTO student_target (student_id, name, address) VALUES (:studentId, :name, :address)")
.dataSource(batchDemoDB)
.build();
}
}
3.5 Step 配置-单线程
import com.example.springbatchdemo.component.processor.StudentItemProcessor;
import com.example.springbatchdemo.entity.Student;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class BatchProcessStudentStep {

@Autowired
public StepBuilderFactory stepBuilderFactory;

@Autowired
@Qualifier(value = "studentItemReader")
private JdbcPagingItemReader<Student> studentItemReader;

@Autowired
@Qualifier(value = "studentItemWriter")
private JdbcBatchItemWriter<Student> studentItemWriter;

@Autowired
private StudentItemProcessor studentItemProcessor;

@Bean("batchProcessStudentStep1")
public Step step1() {
return stepBuilderFactory.get("step1")
.<Student, Student>chunk(2000)
.reader(studentItemReader)
.processor(studentItemProcessor)
.writer(studentItemWriter)
.build();
}
}
3.6 Step 配置-多线程
import com.example.springbatchdemo.component.processor.StudentItemProcessor;
import com.example.springbatchdemo.entity.Student;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import static com.example.springbatchdemo.config.ExecutorConfig.TASK_EXECUTOR;

@Configuration
public class BatchProcessStudentStep {

@Autowired
public StepBuilderFactory stepBuilderFactory;

@Autowired
@Qualifier(value = "studentItemReader")
private JdbcPagingItemReader<Student> studentItemReader;

@Autowired
@Qualifier(value = "studentItemWriter")
private JdbcBatchItemWriter<Student> studentItemWriter;

@Autowired
private StudentItemProcessor studentItemProcessor;

@Autowired
@Qualifier(value = TASK_EXECUTOR)
private ThreadPoolTaskExecutor taskExecutor;

@Bean("batchProcessStudentStep1")
public Step step1() {
return stepBuilderFactory.get("step1")
.<Student, Student>chunk(2000)
.reader(studentItemReader)
.processor(studentItemProcessor)
.writer(studentItemWriter)
.taskExecutor(taskExecutor)
.throttleLimit(30)
.build();
}
}
3.7 Job 配置
import com.example.springbatchdemo.component.listener.BatchProcessStudentCompletionListener;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
public class BatchProcessStudentJob {

@Autowired
public JobBuilderFactory jobBuilderFactory;

@Autowired
@Qualifier(value = "batchProcessStudentStep1")
private Step batchProcessStudentStep1;

@Autowired
private BatchProcessStudentCompletionListener batchProcessStudentCompletionListener;

@Bean
public Job transferStudentJob() {
return jobBuilderFactory.get("transferStudentJob")
.incrementer(new RunIdIncrementer())
.listener(batchProcessStudentCompletionListener)
.flow(batchProcessStudentStep1)
.end()
.build();
}
}
3.8 MySQL 数据源配置
import com.zaxxer.hikari.HikariDataSource;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import javax.sql.DataSource;

@Configuration
public class DataSourceConfig {

@Primary
@Bean(name = "batchDemoDB")
// 数据源配置参数识别前缀, 根据具体配置来设定
@ConfigurationProperties(prefix = "spring.datasource.batch-demo")
public DataSource druidDataSource() {
// 使用 SpringBoot 默认的数据源 HikariDataSource
return DataSourceBuilder.create().type(HikariDataSource.class).build();
}
}
3.9 线程池配置
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;

@Configuration
public class ExecutorConfig {

public static final String TASK_EXECUTOR = "taskExecutor";

@Bean(TASK_EXECUTOR)
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(20);
executor.setMaxPoolSize(30);
executor.setQueueCapacity(10);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("common-async-executor-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(60);
return executor;
}
}

四、批处理性能测试

4.1 单线程 Step

启动批处理任务,同步 100 万条数据。执行结果如下:

总耗时:313 秒

4.2 多线程 Step

启动批处理任务,同步 100 万条数据。执行结果如下:

总耗时:81 秒


耗时从 313 秒降至 81 秒,性能约为单线程的 3.9 倍(313 / 81 ≈ 3.9)。

五、总结

多线程 Step 将 chunk 任务交给线程池异步执行,可以显著地提升批处理的性能。但在多线程场景下,我们要了解 Spring Batch 的基础架构,避免并发导致的重复任务、脏数据等问题。

示例代码:spring-batch-demo


一、Spring Batch 数据输出器

Spring Batch 的数据输出器,是通过接口 ItemWriter 来实现的。针对常用的数据输出场景,Spring Batch 提供了丰富的组件支持(查看所有组件),本文介绍最常用的五个组件:

  • FlatFileItemWriter:输出文本数据;
  • JdbcBatchItemWriter:输出数据到数据库;
  • StaxEventItemWriter:输出 XML 文件数据;
  • JsonFileItemWriter:输出 JSON 文件数据;
  • ClassifierCompositeItemWriter:输出多文本数据;

二、简单使用

实体类 Ticket.class

import lombok.Data;
import java.math.BigDecimal;

@Data
public class Ticket {

/**
* 始发站
*/
private String departureStation;

/**
* 到达站
*/
private String arrivalStation;

/**
* 票价
*/
private BigDecimal price;

@Override
public String toString() {
return String.format("始发站: %s; 到达站: %s; 票价: %s", departureStation, arrivalStation, price);
}
}

文件 ticket.csv

合肥,蚌埠,60.00
南京,蚌埠,70.00
上海,蚌埠,220.00
上海,杭州,75.20
上海,昆山,19.00
2.1 FlatFileItemWriter-文本数据输出

ticket.csv 中的信息,转换为 JSON 字符串,输出到文件 ticket_output.txt 中:

/**
* Job
*/
@Bean
public Job testFlatFileItemWriterJob() {
return jobBuilderFactory.get("testFlatFileItemWriterJob")
.incrementer(new RunIdIncrementer())
.flow(testFlatFileItemWriterStep)
.end()
.build();
}

/**
* Step
*/
@Bean("testFlatFileItemWriterStep")
public Step testFlatFileItemWriterStep(PlatformTransactionManager transactionManager) {
return stepBuilderFactory.get("testFlatFileItemWriterStep")
.transactionManager(transactionManager)
.<Ticket, Ticket>chunk(10)
.reader(ticketFileItemReader)
.writer(ticketFileItemWriter)
.build();
}

/**
* Reader
*/
@Bean("ticketFileItemReader")
public FlatFileItemReader<Ticket> ticketFileItemReader() {
return new FlatFileItemReaderBuilder<Ticket>()
.name("ticketFileItemReader")
.resource(new ClassPathResource("ticket.csv"))
.delimited()
.names(new String[]{"departureStation", "arrivalStation", "price"})
.fieldSetMapper(new BeanWrapperFieldSetMapper<Ticket>() {{
setTargetType(Ticket.class);
}})
.build();
}

/**
* Writer
*/
@Bean("ticketFileItemWriter")
public FlatFileItemWriter<Ticket> ticketFileItemWriter() {

// 聚合器; JSON 序列化
LineAggregator<Ticket> aggregator = item -> {
try {
ObjectMapper mapper = new ObjectMapper();
return mapper.writeValueAsString(item);
} catch (JsonProcessingException e) {
LOGGER.error("parse object to json error: {}", e.getMessage(), e);
}
return "";
};

return new FlatFileItemWriterBuilder<Ticket>()
.name("ticketFileItemWriter")
.resource(new FileSystemResource("ticket_output.txt"))
.lineAggregator(aggregator)
.build();
}

输出文本数据,要求目标数据的格式为字符串,因此需要将 POJO 按照一定规则聚合成字符串。Spring Batch 已提供多种 LineAggregator 实现:PassThroughLineAggregator(直接打印 POJO)、RecursiveCollectionLineAggregator(打印 POJO 列表)、DelimitedLineAggregator(以分隔符拼接 POJO 字段值)、FormatterLineAggregator(格式化 POJO 字段值)。当然,我们也可以手动实现聚合器,例如示例代码中,将 POJO 转换为 JSON 格式。
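以 DelimitedLineAggregator 的拼接行为为例,可用纯 Java 简示如下(示意代码,非 Spring Batch 源码):

```java
public class DelimitedAggregatorSketch {

    // 近似 DelimitedLineAggregator 的行为: 以分隔符拼接各字段值为一行文本
    public static String aggregate(String delimiter, String... fieldValues) {
        return String.join(delimiter, fieldValues);
    }

    public static void main(String[] args) {
        // 以 ticket.csv 的一行为例
        System.out.println(aggregate(",", "合肥", "蚌埠", "60.00"));
    }
}
```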

启动应用,生成文件 ticket_output.txt

{"departureStation":"合肥","arrivalStation":"蚌埠","price":60.00}
{"departureStation":"南京","arrivalStation":"蚌埠","price":70.00}
{"departureStation":"上海","arrivalStation":"蚌埠","price":220.00}
{"departureStation":"上海","arrivalStation":"杭州","price":75.20}
{"departureStation":"上海","arrivalStation":"昆山","price":19.00}
2.2 JdbcBatchItemWriter-数据库数据输出

将文件 student.csv 中的信息(内容如下),导入到 MySQL 数据表 student_target 中:

1,张三,合肥
2,李四,蚌埠
3,王二,南京
/**
* Job
*/
@Bean
public Job testDatabaseItemWriterJob() {
return jobBuilderFactory.get("testDatabaseItemWriterJob")
.incrementer(new RunIdIncrementer())
.flow(testDatabaseItemWriterStep)
.end()
.build();
}

/**
* Step
*/
@Bean("testDatabaseItemWriterStep")
public Step testDatabaseItemWriterStep(PlatformTransactionManager transactionManager) {
return stepBuilderFactory.get("testDatabaseItemWriterStep")
.transactionManager(transactionManager)
.<Student, Student>chunk(10)
.reader(studentFileItemReader)
.writer(studentItemWriter)
.build();
}

/**
* Reader
*/
@Bean("studentFileItemReader")
public FlatFileItemReader<Student> studentFileItemReader() {
return new FlatFileItemReaderBuilder<Student>()
.name("studentFileItemReader")
.resource(new ClassPathResource("student.csv"))
.delimited()
.names(new String[]{"studentId", "name", "address"})
.fieldSetMapper(new BeanWrapperFieldSetMapper<Student>() {{
setTargetType(Student.class);
}})
.build();
}

/**
* Writer
*/
@Bean("studentItemWriter")
public JdbcBatchItemWriter<Student> studentItemWriter() {
return new JdbcBatchItemWriterBuilder<Student>()
.itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
.sql("INSERT INTO student_target (student_id, name, address) VALUES (:studentId, :name, :address)")
.dataSource(batchDemoDB)
.build();
}

/**
* MySQL 数据源配置
*/
@Primary
@Bean(name = "batchDemoDB")
@ConfigurationProperties(prefix = "spring.datasource.batch-demo")
public DataSource druidDataSource() {
return DataSourceBuilder.create().type(HikariDataSource.class).build();
}

启动应用,文件中的数据已导入表 student_target。

2.3 StaxEventItemWriter-XML 文件数据输出

ticket.csv 中的信息,输出到文件 ticket_output.xml 中:

/**
* Job
*/
@Bean
public Job testXmlItemWriterJob() {
return jobBuilderFactory.get("testXmlItemWriterJob")
.incrementer(new RunIdIncrementer())
.flow(testXmlItemWriterStep)
.end()
.build();
}

/**
* Step
*/
@Bean("testXmlItemWriterStep")
public Step testXmlItemWriterStep(PlatformTransactionManager transactionManager) {
return stepBuilderFactory.get("testXmlItemWriterStep")
.transactionManager(transactionManager)
.<Ticket, Ticket>chunk(10)
.reader(ticketFileItemReader)
.writer(ticketXmlItemWriter)
.build();
}


/**
* Reader
*/
@Bean("ticketFileItemReader")
public FlatFileItemReader<Ticket> ticketFileItemReader() {
return new FlatFileItemReaderBuilder<Ticket>()
.name("ticketFileItemReader")
.resource(new ClassPathResource("ticket.csv"))
.delimited()
.names(new String[]{"departureStation", "arrivalStation", "price"})
.fieldSetMapper(new BeanWrapperFieldSetMapper<Ticket>() {{
setTargetType(Ticket.class);
}})
.build();
}

/**
* Writer
*/
@Bean("ticketXmlItemWriter")
public StaxEventItemWriter<Ticket> ticketXmlItemWriter() {
return new StaxEventItemWriterBuilder<Ticket>()
.name("ticketXmlItemWriter")
.marshaller(ticketMarshaller)
.resource(new FileSystemResource("ticket_output.xml"))
.rootTagName("tickets")
.overwriteOutput(true)
.build();
}

/**
* 映射器
*/
@Bean("ticketMarshaller")
public XStreamMarshaller ticketMarshaller() {

Map<String, Class<Ticket>> aliases = new HashMap<>(1);
aliases.put("ticket", Ticket.class);

XStreamMarshaller marshaller = new XStreamMarshaller();
marshaller.setAliases(aliases);

return marshaller;
}

启动应用,生成文件 ticket_output.xml

<?xml version="1.0" encoding="UTF-8"?><tickets><ticket><departureStation>合肥</departureStation><arrivalStation>蚌埠</arrivalStation><price>60.00</price></ticket><ticket><departureStation>南京</departureStation><arrivalStation>蚌埠</arrivalStation><price>70.00</price></ticket><ticket><departureStation>上海</departureStation><arrivalStation>蚌埠</arrivalStation><price>220.00</price></ticket><ticket><departureStation>上海</departureStation><arrivalStation>杭州</arrivalStation><price>75.20</price></ticket><ticket><departureStation>上海</departureStation><arrivalStation>昆山</arrivalStation><price>19.00</price></ticket></tickets>
2.4 JsonFileItemWriter-JSON文件数据输出

ticket.csv 中的信息,输出到文件 ticket_output.json 中:

/**
* Job
*/
@Bean
public Job testJsonItemWriterJob() {
return jobBuilderFactory.get("testJsonItemWriterJob")
.incrementer(new RunIdIncrementer())
.flow(testJsonItemWriterStep)
.end()
.build();
}

/**
* Step
*/
@Bean("testJsonItemWriterStep")
public Step testJsonItemWriterStep(PlatformTransactionManager transactionManager) {
return stepBuilderFactory.get("testJsonItemWriterStep")
.transactionManager(transactionManager)
.<Ticket, Ticket>chunk(10)
.reader(ticketFileItemReader)
.writer(ticketJsonItemWriter)
.build();
}

/**
* Reader
*/
@Bean("ticketFileItemReader")
public FlatFileItemReader<Ticket> ticketFileItemReader() {
return new FlatFileItemReaderBuilder<Ticket>()
.name("ticketFileItemReader")
.resource(new ClassPathResource("ticket.csv"))
.delimited()
.names(new String[]{"departureStation", "arrivalStation", "price"})
.fieldSetMapper(new BeanWrapperFieldSetMapper<Ticket>() {{
setTargetType(Ticket.class);
}})
.build();
}

/**
* Writer
*/
@Bean("ticketJsonItemWriter")
public JsonFileItemWriter<Ticket> ticketJsonItemWriter() {
return new JsonFileItemWriterBuilder<Ticket>()
.jsonObjectMarshaller(new JacksonJsonObjectMarshaller<>())
.resource(new FileSystemResource("ticket_output.json"))
.name("ticketJsonItemWriter")
.build();
}

启动应用,生成文件 ticket_output.json

[
{"departureStation":"合肥","arrivalStation":"蚌埠","price":60.00},
{"departureStation":"南京","arrivalStation":"蚌埠","price":70.00},
{"departureStation":"上海","arrivalStation":"蚌埠","price":220.00},
{"departureStation":"上海","arrivalStation":"杭州","price":75.20},
{"departureStation":"上海","arrivalStation":"昆山","price":19.00}
]
2.5 ClassifierCompositeItemWriter-输出多文本数据

将文件 ticket.csv 中始发站为上海的车票信息输出到文本中,其余的输出到 XML 文件中:

/**
* Job
*/
@Bean
public Job testMultiFileItemWriterJob() {
return jobBuilderFactory.get("testMultiFileItemWriterJob")
.incrementer(new RunIdIncrementer())
.flow(testMultiFileItemWriterStep)
.end()
.build();
}

/**
* Step
*/
@Bean("testMultiFileItemWriterStep")
public Step testMultiFileItemWriterStep(PlatformTransactionManager transactionManager) {
return stepBuilderFactory.get("testMultiFileItemWriterStep")
.transactionManager(transactionManager)
.<Ticket, Ticket>chunk(10)
.reader(ticketFileItemReader)
.writer(ticketClassifierMultiFileItemWriter)
.stream(ticketFileItemWriter)
.stream(ticketXmlItemWriter)
.build();
}

/**
* Reader
*/
@Bean("ticketFileItemReader")
public FlatFileItemReader<Ticket> ticketFileItemReader() {
return new FlatFileItemReaderBuilder<Ticket>()
.name("ticketFileItemReader")
.resource(new ClassPathResource("ticket.csv"))
.delimited()
.names(new String[]{"departureStation", "arrivalStation", "price"})
.fieldSetMapper(new BeanWrapperFieldSetMapper<Ticket>() {{
setTargetType(Ticket.class);
}})
.build();
}

/**
* Classifier Writer
*/
@Bean("ticketClassifierMultiFileItemWriter")
public ClassifierCompositeItemWriter<Ticket> ticketClassifierMultiFileItemWriter() {
ClassifierCompositeItemWriter<Ticket> writer = new ClassifierCompositeItemWriter<>();
writer.setClassifier((Classifier<Ticket, ItemWriter<? super Ticket>>) ticket -> {
// 始发站是上海的, 输出到文本中, 否则输出到 XML 文件中
return "上海".equals(ticket.getDepartureStation()) ? ticketFileItemWriter() : ticketXmlItemWriter();
});
return writer;
}

/**
* 文本-Writer
*/
@Bean("ticketFileItemWriter")
public FlatFileItemWriter<Ticket> ticketFileItemWriter() {

// 聚合器; JSON 序列化
LineAggregator<Ticket> aggregator = item -> {
try {
ObjectMapper mapper = new ObjectMapper();
return mapper.writeValueAsString(item);
} catch (JsonProcessingException e) {
LOGGER.error("parse object to json error: {}", e.getMessage(), e);
}
return "";
};

return new FlatFileItemWriterBuilder<Ticket>()
.name("ticketFileItemWriter")
.resource(new FileSystemResource("ticket_output.txt"))
.lineAggregator(aggregator)
.build();
}

/**
* XML-Writer
*/
@Bean("ticketXmlItemWriter")
public StaxEventItemWriter<Ticket> ticketXmlItemWriter() {
return new StaxEventItemWriterBuilder<Ticket>()
.name("ticketXmlItemWriter")
.marshaller(ticketMarshaller)
.resource(new FileSystemResource("ticket_output.xml"))
.rootTagName("tickets")
.overwriteOutput(true)
.build();
}

/**
* 映射器
*/
@Bean("ticketMarshaller")
public XStreamMarshaller ticketMarshaller() {

Map<String, Class<Ticket>> aliases = new HashMap<>(1);
aliases.put("ticket", Ticket.class);

XStreamMarshaller marshaller = new XStreamMarshaller();
marshaller.setAliases(aliases);

return marshaller;
}

启动应用,生成文件如下:

ticket_output.txt

{"departureStation":"上海","arrivalStation":"蚌埠","price":220.00}
{"departureStation":"上海","arrivalStation":"杭州","price":75.20}
{"departureStation":"上海","arrivalStation":"昆山","price":19.00}

ticket_output.xml

<?xml version="1.0" encoding="UTF-8"?><tickets><ticket><departureStation>合肥</departureStation><arrivalStation>蚌埠</arrivalStation><price>60.00</price></ticket><ticket><departureStation>南京</departureStation><arrivalStation>蚌埠</arrivalStation><price>70.00</price></ticket></tickets>

示例代码:spring-batch-demo


一、Spring Batch 数据读取器

Spring Batch 的数据读取器,是通过接口 ItemReader 来实现的。针对常用的数据读取场景,Spring Batch 提供了丰富的组件支持(查看所有组件),本文介绍最常用的五个组件:

  • FlatFileItemReader:读取文本数据;
  • JdbcPagingItemReader:分页读取数据库的数据;
  • StaxEventItemReader:读取 XML 文件数据;
  • JsonItemReader:读取 JSON 文件数据;
  • MultiResourceItemReader:读取多文本数据;

二、简单使用

实体类 Ticket.class

import lombok.Data;
import java.math.BigDecimal;

@Data
public class Ticket {

/**
* 始发站
*/
private String departureStation;

/**
* 到达站
*/
private String arrivalStation;

/**
* 票价
*/
private BigDecimal price;

@Override
public String toString() {
return String.format("始发站: %s; 到达站: %s; 票价: %s", departureStation, arrivalStation, price);
}
}
2.1 FlatFileItemReader-文本数据读取

文件 ticket.csv

合肥,蚌埠,60.00
南京,蚌埠,70.00
上海,蚌埠,220.00
上海,杭州,75.20
上海,昆山,19.00

可以看到,文本数据的每一行代表一个 Ticket 实体,对象属性之间以英文逗号分隔。通过 FlatFileItemReader,可以按照行将文本数据转换为 POJO 存储。
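行文本到对象的转换过程,可用纯 Java 简示如下(示意代码,近似 DelimitedLineTokenizer 与 FieldSetMapper 的组合,非 Spring Batch 实际实现):

```java
import java.math.BigDecimal;

public class TicketLineMapperSketch {

    // 近似 FlatFileItemReader 的行切分: 以英文逗号分隔出各字段值
    public static String[] tokenize(String line) {
        return line.split(",");
    }

    public static void main(String[] args) {
        String[] fields = tokenize("合肥,蚌埠,60.00");
        // fields[0]=departureStation, fields[1]=arrivalStation, fields[2]=price
        System.out.println(String.format("始发站: %s; 到达站: %s; 票价: %s",
                fields[0], fields[1], new BigDecimal(fields[2])));
    }
}
```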

/**
* Job
*/
@Bean
public Job testFlatItemFileReaderJob() {
return jobBuilderFactory.get("testFlatItemFileReaderJob")
.incrementer(new RunIdIncrementer())
.flow(testFlatFileItemReaderStep)
.end()
.build();
}

/**
* Step
*/
@Bean("testFlatFileItemReaderStep")
public Step testFlatFileItemReaderStep(PlatformTransactionManager transactionManager) {
return stepBuilderFactory.get("testFlatFileItemReaderStep")
.transactionManager(transactionManager)
.<Ticket, Ticket>chunk(10)
.reader(ticketFileItemReader)
.writer(list -> list.forEach(System.out::println))
.build();
}

/**
* Reader
*/
@Bean("ticketFileItemReader")
public FlatFileItemReader<Ticket> ticketFileItemReader() {
return new FlatFileItemReaderBuilder<Ticket>()
.name("ticketFileItemReader")
.resource(new ClassPathResource("ticket.csv"))
.delimited()
.names(new String[]{"departureStation", "arrivalStation", "price"})
.fieldSetMapper(new BeanWrapperFieldSetMapper<Ticket>() {{
setTargetType(Ticket.class);
}})
.build();
}

启动应用,控制台打印日志:

2022-06-02 13:50:23.538  INFO 77808 --- [restartedMain] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=testFlatItemFileReaderJob]] launched with the following parameters: [{run.id=2}]
2022-06-02 13:50:23.599 INFO 77808 --- [restartedMain] o.s.batch.core.job.SimpleStepHandler : Executing step: [testFlatFileItemReaderStep]
始发站: 合肥; 到达站: 蚌埠; 票价: 60.00
始发站: 南京; 到达站: 蚌埠; 票价: 70.00
始发站: 上海; 到达站: 蚌埠; 票价: 220.00
始发站: 上海; 到达站: 杭州; 票价: 75.20
始发站: 上海; 到达站: 昆山; 票价: 19.00
2022-06-02 13:50:23.680 INFO 77808 --- [restartedMain] o.s.batch.core.step.AbstractStep : Step: [testFlatFileItemReaderStep] executed in 79ms
2.2 JdbcPagingItemReader-数据库数据读取

MySQL 数据库,分页读取表 student_source 的数据,并打印数据内容。

/**
* Job
*/
@Bean
public Job testDatabaseItemReaderJob() {
return jobBuilderFactory.get("testDatabaseItemReaderJob")
.incrementer(new RunIdIncrementer())
.flow(testDatabaseItemReaderStep)
.end()
.build();
}

/**
* Step
*/
@Bean("testDatabaseItemReaderStep")
public Step testDatabaseItemReaderStep(PlatformTransactionManager transactionManager) {
return stepBuilderFactory.get("testDatabaseItemReaderStep")
.transactionManager(transactionManager)
.<Student, Student>chunk(10)
.reader(studentItemReader)
.writer(list -> list.forEach(System.out::println))
.build();
}

/**
* Reader
*/
@Bean("studentItemReader")
public JdbcPagingItemReader<Student> studentItemReader() {

MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
queryProvider.setSelectClause("student_id, name, address");
queryProvider.setFromClause("from student_source");

Map<String, Order> sortKeys = new HashMap<>(1);
sortKeys.put("student_id", Order.ASCENDING);
queryProvider.setSortKeys(sortKeys);

return new JdbcPagingItemReaderBuilder<Student>()
.name("studentItemReader")
.dataSource(batchDemoDB)
.fetchSize(1000)
.rowMapper(new StudentRowMapper())
.queryProvider(queryProvider)
.build();
}

public class StudentRowMapper implements RowMapper<Student> {

/**
* Student 字段映射
*/
@Override
public Student mapRow(ResultSet rs, int rowNum) throws SQLException {

Student student = new Student();
student.setStudentId(rs.getLong("student_id"));
student.setName(rs.getString("name"));
student.setAddress(rs.getString("address"));
return student;
}
}

/**
* MySQL 数据源配置
*/
@Primary
@Bean(name = "batchDemoDB")
@ConfigurationProperties(prefix = "spring.datasource.batch-demo")
public DataSource druidDataSource() {
return DataSourceBuilder.create().type(HikariDataSource.class).build();
}

启动应用,控制台打印日志:

2022-06-02 14:00:19.010  INFO 67748 --- [restartedMain] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=testDatabaseItemReaderJob]] launched with the following parameters: [{run.id=2}]
2022-06-02 14:00:19.107 INFO 67748 --- [restartedMain] o.s.batch.core.job.SimpleStepHandler : Executing step: [testDatabaseItemReaderStep]
name: 张三1, address: 上海市1
name: 张三2, address: 上海市2
name: 张三3, address: 上海市3
name: 张三4, address: 上海市4
name: 张三5, address: 上海市5
name: 张三6, address: 上海市6
name: 张三7, address: 上海市7
name: 张三8, address: 上海市8
name: 张三9, address: 上海市9
name: 张三10, address: 上海市10
2022-06-02 14:00:19.284 INFO 67748 --- [restartedMain] o.s.batch.core.step.AbstractStep : Step: [testDatabaseItemReaderStep] executed in 176ms
2.3 StaxEventItemReader-XML 数据读取

文件 ticket.xml

<?xml version="1.0" encoding="UTF-8"?>
<tickets>
<ticket>
<departureStation>合肥</departureStation>
<arrivalStation>蚌埠</arrivalStation>
<price>60.00</price>
</ticket>
<ticket>
<departureStation>南京</departureStation>
<arrivalStation>蚌埠</arrivalStation>
<price>70.00</price>
</ticket>
<ticket>
<departureStation>上海</departureStation>
<arrivalStation>蚌埠</arrivalStation>
<price>220.00</price>
</ticket>
<ticket>
<departureStation>上海</departureStation>
<arrivalStation>杭州</arrivalStation>
<price>75.20</price>
</ticket>
<ticket>
<departureStation>上海</departureStation>
<arrivalStation>昆山</arrivalStation>
<price>19.00</price>
</ticket>
</tickets>

可以看到,文件内容是多组 ticket 标签组成的,每一个标签代表一个 Ticket 实体;每个 ticket 标签,内含 3 个子标签,代表 Ticket 实体的 3 个属性值。

涉及到 XML 与 Object 的映射,因此需要引入 OXM(Object/XML Mapping)技术。推荐使用 Spring OXM,pom 依赖:

<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-oxm</artifactId>
</dependency>
<dependency>
<groupId>com.thoughtworks.xstream</groupId>
<artifactId>xstream</artifactId>
<version>1.4.11.1</version>
</dependency>
/**
* Job
*/
@Bean
public Job testXmlItemReaderJob() {
return jobBuilderFactory.get("testXmlItemReaderJob")
.incrementer(new RunIdIncrementer())
.flow(testXmlItemReaderStep)
.end()
.build();
}

/**
* Step
*/
@Bean("testXmlItemReaderStep")
public Step testXmlItemReaderStep(PlatformTransactionManager transactionManager) {
return stepBuilderFactory.get("testXmlItemReaderStep")
.transactionManager(transactionManager)
.<Ticket, Ticket>chunk(10)
.reader(ticketXmlItemReader)
.writer(list -> list.forEach(System.out::println))
.build();
}

/**
* Reader
*/
@Bean("ticketXmlItemReader")
public StaxEventItemReader<Ticket> ticketXmlItemReader() {
return new StaxEventItemReaderBuilder<Ticket>()
.name("ticketXmlItemReader")
.resource(new ClassPathResource("ticket.xml"))
.addFragmentRootElements("ticket")
.unmarshaller(ticketMarshaller)
.build();
}

/**
* 映射器
*/
@Bean("ticketMarshaller")
public XStreamMarshaller ticketMarshaller() {

Map<String, Class<Ticket>> aliases = new HashMap<>(1);
aliases.put("ticket", Ticket.class);

XStreamMarshaller marshaller = new XStreamMarshaller();
marshaller.setAliases(aliases);

return marshaller;
}

Start the application; the console prints:

2022-06-02 14:15:48.444  INFO 87024 --- [restartedMain] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=testXmlItemReaderJob]] launched with the following parameters: [{run.id=3}]
2022-06-02 14:15:48.503 INFO 87024 --- [restartedMain] o.s.batch.core.job.SimpleStepHandler : Executing step: [testXmlItemReaderStep]
始发站: 合肥; 到达站: 蚌埠; 票价: 60.00
始发站: 南京; 到达站: 蚌埠; 票价: 70.00
始发站: 上海; 到达站: 蚌埠; 票价: 220.00
始发站: 上海; 到达站: 杭州; 票价: 75.20
始发站: 上海; 到达站: 昆山; 票价: 19.00
2022-06-02 14:15:48.710 INFO 87024 --- [restartedMain] o.s.batch.core.step.AbstractStep : Step: [testXmlItemReaderStep] executed in 205ms
2.4 JsonItemReader: Reading JSON Data

The file ticket.json:

[
{
"departureStation": "合肥",
"arrivalStation": "蚌埠",
"price": "60.00"
},
{
"departureStation": "南京",
"arrivalStation": "蚌埠",
"price": "70.00"
},
{
"departureStation": "上海",
"arrivalStation": "蚌埠",
"price": "220.00"
},
{
"departureStation": "上海",
"arrivalStation": "杭州",
"price": "75.20"
},
{
"departureStation": "上海",
"arrivalStation": "昆山",
"price": "19.00"
}
]
/**
* Job
*/
@Bean
public Job testJsonItemReaderJob() {
return jobBuilderFactory.get("testJsonItemReaderJob")
.incrementer(new RunIdIncrementer())
.flow(testJsonItemReaderStep)
.end()
.build();
}

/**
* Step
*/
@Bean("testJsonItemReaderStep")
public Step testJsonItemReaderStep(PlatformTransactionManager transactionManager) {
return stepBuilderFactory.get("testJsonItemReaderStep")
.transactionManager(transactionManager)
.<Ticket, Ticket>chunk(10)
.reader(ticketJsonItemReader)
.writer(list -> list.forEach(System.out::println))
.build();
}

/**
* Reader
*/
@Bean("ticketJsonItemReader")
public JsonItemReader<Ticket> ticketJsonItemReader() {
return new JsonItemReaderBuilder<Ticket>()
.name("ticketJsonItemReader")
.jsonObjectReader(new JacksonJsonObjectReader<>(Ticket.class))
.resource(new ClassPathResource("ticket.json"))
.build();
}

Start the application; the console prints:

2022-06-02 14:25:38.142  INFO 76544 --- [restartedMain] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=testJsonItemReaderJob]] launched with the following parameters: [{run.id=2}]
2022-06-02 14:25:38.211 INFO 76544 --- [restartedMain] o.s.batch.core.job.SimpleStepHandler : Executing step: [testJsonItemReaderStep]
始发站: 合肥; 到达站: 蚌埠; 票价: 60.00
始发站: 南京; 到达站: 蚌埠; 票价: 70.00
始发站: 上海; 到达站: 蚌埠; 票价: 220.00
始发站: 上海; 到达站: 杭州; 票价: 75.20
始发站: 上海; 到达站: 昆山; 票价: 19.00
2022-06-02 14:25:38.328 INFO 76544 --- [restartedMain] o.s.batch.core.step.AbstractStep : Step: [testJsonItemReaderStep] executed in 116ms
2.5 MultiResourceItemReader: Reading Multiple Files

Reading from multiple files works on the same principle as reading a single flat file; it simply adds a delegating layer on top. All files must share the same record structure. For example, to read data from the two files ticket-1.csv and ticket-2.csv:

合肥,蚌埠,60.00
南京,蚌埠,70.00
上海,蚌埠,220.00
上海,杭州,75.20
上海,昆山,19.00
/**
* Job
*/
@Bean
public Job testMultiFileItemReaderJob() {
return jobBuilderFactory.get("testMultiFileItemReaderJob")
.incrementer(new RunIdIncrementer())
.flow(testMultiFileItemReaderStep)
.end()
.build();
}

/**
* Step
*/
@Bean("testMultiFileItemReaderStep")
public Step testMultiFileItemReaderStep1(PlatformTransactionManager transactionManager) {
return stepBuilderFactory.get("testMultiFileItemReaderStep")
.transactionManager(transactionManager)
.<Ticket, Ticket>chunk(10)
.reader(ticketMultiFileItemReader)
.writer(list -> list.forEach(System.out::println))
.build();
}

/**
* Proxy Reader
*/
@Bean("ticketMultiFileItemReader")
public MultiResourceItemReader<Ticket> ticketMultiFileItemReader() {

// Resource files
Resource[] resources = new Resource[]{
new ClassPathResource("ticket-1.csv"),
new ClassPathResource("ticket-2.csv")};

return new MultiResourceItemReaderBuilder<Ticket>()
.name("ticketMultiFileItemReader")
.delegate(commonTicketFileItemReader())
.resources(resources)
.build();
}

/**
* Reader
*/
@Bean("commonTicketFileItemReader")
public FlatFileItemReader<Ticket> commonTicketFileItemReader() {
return new FlatFileItemReaderBuilder<Ticket>()
.name("commonTicketFileItemReader")
.delimited()
.names(new String[]{"departureStation", "arrivalStation", "price"})
.fieldSetMapper(new BeanWrapperFieldSetMapper<Ticket>() {{
setTargetType(Ticket.class);
}})
.build();
}

Start the application; the console prints:

2022-06-02 14:37:49.693  INFO 86124 --- [restartedMain] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=testMultiFileItemReaderJob]] launched with the following parameters: [{run.id=2}]
2022-06-02 14:37:49.785 INFO 86124 --- [restartedMain] o.s.batch.core.job.SimpleStepHandler : Executing step: [testMultiFileItemReaderStep]
始发站: 合肥; 到达站: 蚌埠; 票价: 60.00
始发站: 南京; 到达站: 蚌埠; 票价: 70.00
始发站: 上海; 到达站: 蚌埠; 票价: 220.00
始发站: 上海; 到达站: 杭州; 票价: 75.20
始发站: 上海; 到达站: 昆山; 票价: 19.00
2022-06-02 14:37:49.944 INFO 86124 --- [restartedMain] o.s.batch.core.step.AbstractStep : Step: [testMultiFileItemReaderStep] executed in 157ms

Sample code: spring-batch-demo


1. Spring Batch Listeners

During a batch run, key points in the lifecycle, such as start, completion, and exceptions, often need extra handling. Spring Batch supports this through listeners, provided at two levels:

Job level:

  • JobExecutionListener: triggered before (beforeJob) and after (afterJob) Job execution;

Step level:

  • ChunkListener: triggered before (beforeChunk), after (afterChunk), and on error (afterChunkError) of a chunk;
  • StepExecutionListener: triggered before (beforeStep) and after (afterStep) Step execution;
  • ItemReadListener: triggered before (beforeRead), after (afterRead), and on error (onReadError) of a read;
  • ItemProcessListener: triggered before (beforeProcess), after (afterProcess), and on error (onProcessError) of processing;
  • ItemWriteListener: triggered before (beforeWrite), after (afterWrite), and on error (onWriteError) of a write;

2. Basic Usage

Print the contents of the file ticket.csv:

合肥,蚌埠,60.00
南京,蚌埠,70.00
上海,蚌埠,220.00
上海,杭州,75.20
上海,昆山,19.00

The entity class:

import lombok.Data;
import java.math.BigDecimal;

@Data
public class Ticket {

/**
* Departure station
*/
private String departureStation;

/**
* Arrival station
*/
private String arrivalStation;

/**
* Ticket price
*/
private BigDecimal price;

@Override
public String toString() {
return String.format("始发站: %s; 到达站: %s; 票价: %s", departureStation, arrivalStation, price);
}
}
/**
* Job
*/
@Bean
public Job testListenerJob() {
return jobBuilderFactory.get("testListenerJob")
.incrementer(new RunIdIncrementer())
// Job listener
.listener(testJobListener)
.flow(testListenerStep)
.end()
.build();
}

/**
* Step
*/
@Bean("testListenerStep")
public Step testListenerStep(PlatformTransactionManager transactionManager) {
return stepBuilderFactory.get("testListenerStep")
// Step listener
.listener(testStepListener)
.transactionManager(transactionManager)
.<Ticket, Ticket>chunk(2)
.faultTolerant()
// Chunk listener
.listener(testChunkListener)
.reader(ticketFileItemReader)
// Read listener
.listener(testReadListener)
.processor(ticketItemProcessor)
// Process listener
.listener(testProcessListener)
.writer(list -> list.forEach(System.out::println))
// Write listener
.listener(testWriteListener)
.build();
}

/**
* Reader
*/
@Bean("ticketFileItemReader")
public FlatFileItemReader<Ticket> ticketFileItemReader() {
return new FlatFileItemReaderBuilder<Ticket>()
.name("ticketFileItemReader")
.resource(new ClassPathResource("ticket.csv"))
.delimited()
.names(new String[]{"departureStation", "arrivalStation", "price"})
.fieldSetMapper(new BeanWrapperFieldSetMapper<Ticket>() {{
setTargetType(Ticket.class);
}})
.build();
}

/**
* Processor
*/
@Component
public class TicketItemProcessor implements ItemProcessor<Ticket, Ticket> {

private static final Logger log = LoggerFactory.getLogger(TicketItemProcessor.class);

@Override
public Ticket process(final Ticket ticketSource) throws Exception {

final String departureStation = ticketSource.getDepartureStation();
final String arrivalStation = ticketSource.getArrivalStation();
final BigDecimal price = ticketSource.getPrice();

final Ticket ticketTarget = new Ticket();
ticketTarget.setDepartureStation(departureStation);
ticketTarget.setArrivalStation(arrivalStation);
ticketTarget.setPrice(price);

return ticketTarget;
}
}
/**
* Job Listener
*/
@Component
public class TestJobListener extends JobExecutionListenerSupport {

private static final Logger log = LoggerFactory.getLogger(TestJobListener.class);

@Override
public void beforeJob(JobExecution jobExecution) {
log.info("before job: {}", jobExecution.getJobInstance().getJobName());
}

@Override
public void afterJob(JobExecution jobExecution) {
log.info("after job: {}", jobExecution.getJobInstance().getJobName());
}
}

/**
* Chunk Listener
*/
@Component
public class TestChunkListener extends ChunkListenerSupport {

private static final Logger log = LoggerFactory.getLogger(TestChunkListener.class);

@Override
public void beforeChunk(ChunkContext context) {
log.info("before chunk: {}", context.getStepContext().getStepName());
}

@Override
public void afterChunk(ChunkContext context) {
log.info("after chunk: {}", context.getStepContext().getStepName());
}

@Override
public void afterChunkError(ChunkContext context) {
log.info("after chunk error: {}", context.getStepContext().getStepName());
}
}

/**
* Read Listener
*/
@Component
public class TestReadListener implements ItemReadListener<Ticket> {

private static final Logger log = LoggerFactory.getLogger(TestReadListener.class);

@Override
public void beforeRead() {
log.info("before read");
}

@Override
public void afterRead(Ticket item) {
log.info("after read: {}", item);
}

@Override
public void onReadError(Exception ex) {
log.info("read item error: {}", ex.getMessage(), ex);
}
}

/**
* Process Listener
*/
@Component
public class TestProcessListener implements ItemProcessListener<Ticket, Ticket> {

private static final Logger log = LoggerFactory.getLogger(TestProcessListener.class);

@Override
public void beforeProcess(Ticket item) {
log.info("before process: {}", item);

}

@Override
public void afterProcess(Ticket item, Ticket result) {
log.info("after process: {}", item);
}

@Override
public void onProcessError(Ticket item, Exception e) {
log.info("process: {} error: {}", item, e.getMessage(), e);
}
}

/**
* Write Listener
*/
@Component
public class TestWriteListener implements ItemWriteListener<Ticket> {

private static final Logger log = LoggerFactory.getLogger(TestWriteListener.class);

@Override
public void beforeWrite(List<? extends Ticket> items) {
log.info("before write: {}", items);
}

@Override
public void afterWrite(List<? extends Ticket> items) {
log.info("after write: {}", items);
}

@Override
public void onWriteError(Exception exception, List<? extends Ticket> items) {
log.info("write item error: {}", exception.getMessage(), exception);
}
}
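The logs below also reference a step-level listener (TestStepListener) whose source is not shown above. A minimal sketch of it, assuming the same package layout and logging style as the other listeners:

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.stereotype.Component;

/**
 * Step Listener (hypothetical sketch; the original code is not shown)
 */
@Component
public class TestStepListener implements StepExecutionListener {

    private static final Logger log = LoggerFactory.getLogger(TestStepListener.class);

    @Override
    public void beforeStep(StepExecution stepExecution) {
        log.info("before step: {}", stepExecution.getStepName());
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        log.info("after step: {}", stepExecution.getStepName());
        // Returning null keeps the step's existing exit status
        return null;
    }
}
```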

Start the application; the logs:

2022-06-12 19:31:13.774  INFO 33680 --- [restartedMain] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=testListenerJob]] launched with the following parameters: [{run.id=4}]
2022-06-12 19:31:13.820 INFO 33680 --- [restartedMain] c.e.s.c.listener.job.TestJobListener : before job: testListenerJob
2022-06-12 19:31:13.858 INFO 33680 --- [restartedMain] o.s.batch.core.job.SimpleStepHandler : Executing step: [testListenerStep]
2022-06-12 19:31:13.867 INFO 33680 --- [restartedMain] c.e.s.c.listener.step.TestStepListener : before step: testListenerStep
2022-06-12 19:31:13.889 INFO 33680 --- [restartedMain] c.e.s.c.l.chunk.TestChunkListener : before chunk: testListenerStep
2022-06-12 19:31:13.891 INFO 33680 --- [restartedMain] c.e.s.c.l.reader.TestReadListener : before read
2022-06-12 19:31:13.905 INFO 33680 --- [restartedMain] c.e.s.c.l.reader.TestReadListener : after read: 始发站: 合肥; 到达站: 蚌埠; 票价: 60.00
2022-06-12 19:31:13.906 INFO 33680 --- [restartedMain] c.e.s.c.l.reader.TestReadListener : before read
2022-06-12 19:31:13.907 INFO 33680 --- [restartedMain] c.e.s.c.l.reader.TestReadListener : after read: 始发站: 南京; 到达站: 蚌埠; 票价: 70.00
2022-06-12 19:31:13.911 INFO 33680 --- [restartedMain] c.e.s.c.l.processor.TestProcessListener : before process: 始发站: 合肥; 到达站: 蚌埠; 票价: 60.00
2022-06-12 19:31:13.912 INFO 33680 --- [restartedMain] c.e.s.c.l.processor.TestProcessListener : after process: 始发站: 合肥; 到达站: 蚌埠; 票价: 60.00
2022-06-12 19:31:13.912 INFO 33680 --- [restartedMain] c.e.s.c.l.processor.TestProcessListener : before process: 始发站: 南京; 到达站: 蚌埠; 票价: 70.00
2022-06-12 19:31:13.912 INFO 33680 --- [restartedMain] c.e.s.c.l.processor.TestProcessListener : after process: 始发站: 南京; 到达站: 蚌埠; 票价: 70.00
2022-06-12 19:31:13.913 INFO 33680 --- [restartedMain] c.e.s.c.l.writer.TestWriteListener : before write: [始发站: 合肥; 到达站: 蚌埠; 票价: 60.00, 始发站: 南京; 到达站: 蚌埠; 票价: 70.00]
始发站: 合肥; 到达站: 蚌埠; 票价: 60.00
始发站: 南京; 到达站: 蚌埠; 票价: 70.00
2022-06-12 19:31:13.914 INFO 33680 --- [restartedMain] c.e.s.c.l.writer.TestWriteListener : after write: [始发站: 合肥; 到达站: 蚌埠; 票价: 60.00, 始发站: 南京; 到达站: 蚌埠; 票价: 70.00]
2022-06-12 19:31:13.928 INFO 33680 --- [restartedMain] c.e.s.c.l.chunk.TestChunkListener : after chunk: testListenerStep
2022-06-12 19:31:13.929 INFO 33680 --- [restartedMain] c.e.s.c.l.chunk.TestChunkListener : before chunk: testListenerStep
2022-06-12 19:31:13.929 INFO 33680 --- [restartedMain] c.e.s.c.l.reader.TestReadListener : before read
2022-06-12 19:31:13.930 INFO 33680 --- [restartedMain] c.e.s.c.l.reader.TestReadListener : after read: 始发站: 上海; 到达站: 蚌埠; 票价: 220.00
2022-06-12 19:31:13.930 INFO 33680 --- [restartedMain] c.e.s.c.l.reader.TestReadListener : before read
2022-06-12 19:31:13.931 INFO 33680 --- [restartedMain] c.e.s.c.l.reader.TestReadListener : after read: 始发站: 上海; 到达站: 杭州; 票价: 75.20
2022-06-12 19:31:13.931 INFO 33680 --- [restartedMain] c.e.s.c.l.processor.TestProcessListener : before process: 始发站: 上海; 到达站: 蚌埠; 票价: 220.00
2022-06-12 19:31:13.931 INFO 33680 --- [restartedMain] c.e.s.c.l.processor.TestProcessListener : after process: 始发站: 上海; 到达站: 蚌埠; 票价: 220.00
2022-06-12 19:31:13.931 INFO 33680 --- [restartedMain] c.e.s.c.l.processor.TestProcessListener : before process: 始发站: 上海; 到达站: 杭州; 票价: 75.20
2022-06-12 19:31:13.932 INFO 33680 --- [restartedMain] c.e.s.c.l.processor.TestProcessListener : after process: 始发站: 上海; 到达站: 杭州; 票价: 75.20
2022-06-12 19:31:13.932 INFO 33680 --- [restartedMain] c.e.s.c.l.writer.TestWriteListener : before write: [始发站: 上海; 到达站: 蚌埠; 票价: 220.00, 始发站: 上海; 到达站: 杭州; 票价: 75.20]
始发站: 上海; 到达站: 蚌埠; 票价: 220.00
始发站: 上海; 到达站: 杭州; 票价: 75.20
2022-06-12 19:31:13.932 INFO 33680 --- [restartedMain] c.e.s.c.l.writer.TestWriteListener : after write: [始发站: 上海; 到达站: 蚌埠; 票价: 220.00, 始发站: 上海; 到达站: 杭州; 票价: 75.20]
2022-06-12 19:31:13.943 INFO 33680 --- [restartedMain] c.e.s.c.l.chunk.TestChunkListener : after chunk: testListenerStep
2022-06-12 19:31:13.944 INFO 33680 --- [restartedMain] c.e.s.c.l.chunk.TestChunkListener : before chunk: testListenerStep
2022-06-12 19:31:13.944 INFO 33680 --- [restartedMain] c.e.s.c.l.reader.TestReadListener : before read
2022-06-12 19:31:13.945 INFO 33680 --- [restartedMain] c.e.s.c.l.reader.TestReadListener : after read: 始发站: 上海; 到达站: 昆山; 票价: 19.00
2022-06-12 19:31:13.945 INFO 33680 --- [restartedMain] c.e.s.c.l.reader.TestReadListener : before read
2022-06-12 19:31:13.945 INFO 33680 --- [restartedMain] c.e.s.c.l.processor.TestProcessListener : before process: 始发站: 上海; 到达站: 昆山; 票价: 19.00
2022-06-12 19:31:13.945 INFO 33680 --- [restartedMain] c.e.s.c.l.processor.TestProcessListener : after process: 始发站: 上海; 到达站: 昆山; 票价: 19.00
2022-06-12 19:31:13.946 INFO 33680 --- [restartedMain] c.e.s.c.l.writer.TestWriteListener : before write: [始发站: 上海; 到达站: 昆山; 票价: 19.00]
始发站: 上海; 到达站: 昆山; 票价: 19.00
2022-06-12 19:31:13.946 INFO 33680 --- [restartedMain] c.e.s.c.l.writer.TestWriteListener : after write: [始发站: 上海; 到达站: 昆山; 票价: 19.00]
2022-06-12 19:31:13.959 INFO 33680 --- [restartedMain] c.e.s.c.l.chunk.TestChunkListener : after chunk: testListenerStep
2022-06-12 19:31:13.959 INFO 33680 --- [restartedMain] c.e.s.c.listener.step.TestStepListener : after step: testListenerStep
2022-06-12 19:31:13.962 INFO 33680 --- [restartedMain] o.s.batch.core.step.AbstractStep : Step: [testListenerStep] executed in 104ms
2022-06-12 19:31:13.978 INFO 33680 --- [restartedMain] c.e.s.c.listener.job.TestJobListener : after job: testListenerJob
2022-06-12 19:31:13.997 INFO 33680 --- [restartedMain] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=testListenerJob]] completed with the following parameters: [{run.id=4}] and the following status: [COMPLETED] in 178ms

The logs show that:

  • The Job and Step listeners span the entire run;

  • Each chunk performs up to 2 reads, 2 processes, and 1 write (the final chunk holds the single remaining item);

  • Every read triggers beforeRead() and afterRead();
  • Every process triggers beforeProcess() and afterProcess();
  • Every write triggers beforeWrite() and afterWrite();

Sample code: spring-batch-demo


1. Project Overview

The previous article introduced Spring Batch's basic architecture and design principles; this one walks through a simple batch job to learn how to use Spring Batch.

The goal is to import the data in the comma-separated file sample-data.csv, split into first and last name, into the person table. As the diagram shows, BatchProcessJob has a single Step with three parts: parse the CSV file; convert each record into a Person object; write the Person objects into the table batch-demo.person.

2. Project Setup and Configuration

The project can be created with the Spring Initializr. My sample application is at: spring-batch-demo

3. Creating the Database Tables

Spring Batch's JobRepository component is responsible for all database interaction, recording every insert, retrieval, update, and delete in the batch run. In other words, Spring Batch relies on a database for its management. The table-creation scripts can be found in the repository; for MySQL the script file is schema-mysql.sql. After the framework tables are created, don't forget to create the table batch-demo.person:

USE `batch-demo`;
CREATE TABLE `person` (
`person_id` bigint(30) unsigned NOT NULL AUTO_INCREMENT,
`first_name` varchar(10) COLLATE utf8mb4_general_ci DEFAULT NULL,
`last_name` varchar(20) COLLATE utf8mb4_general_ci DEFAULT NULL,
PRIMARY KEY (`person_id`)
) ENGINE=InnoDB AUTO_INCREMENT=142 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT='Person information table';

4. Batch Job Configuration

Following the framework's processing flow, we configure the following components:

4.1 CSV file content reader:

import com.example.springbatchdemo.entity.Person;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;

@Configuration
public class CustomItemReader {

@Bean("personItemReader")
public FlatFileItemReader<Person> personItemReader() {
return new FlatFileItemReaderBuilder<Person>()
.name("personItemReader")
.resource(new ClassPathResource("sample-data.csv"))
.delimited()
.names(new String[]{"firstName", "lastName"})
.fieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{
setTargetType(Person.class);
}})
.build();
}
}

4.2 Data-parsing processor:

import com.example.springbatchdemo.entity.Person;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Component;

@Component
public class PersonItemProcessor implements ItemProcessor<Person, Person> {

private static final Logger log = LoggerFactory.getLogger(PersonItemProcessor.class);

@Override
public Person process(final Person person) throws Exception {

final String firstName = person.getFirstName();
final String lastName = person.getLastName();

final Person transformedPerson = new Person();
transformedPerson.setFirstName(firstName);
transformedPerson.setLastName(lastName);

log.info("Converting ({}) into ({})", person, transformedPerson);

return transformedPerson;
}
}

4.3 Person object writer:

import com.example.springbatchdemo.entity.Person;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.sql.DataSource;

@Configuration
public class CustomItemWriter {

@Autowired
@Qualifier(value = "batchDemoDB")
private DataSource batchDemoDB;

@Bean("personItemWriter")
public JdbcBatchItemWriter<Person> personItemWriter() {

return new JdbcBatchItemWriterBuilder<Person>()
.itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
.sql("INSERT INTO person (first_name, last_name) VALUES (:firstName, :lastName)")
.dataSource(batchDemoDB)
.build();
}
}

4.4 MySQL data source configuration:

import com.zaxxer.hikari.HikariDataSource;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import javax.sql.DataSource;

@Configuration
public class DataSourceConfig {

@Primary
@Bean(name = "batchDemoDB")
// Property prefix for the data source; adjust to match your configuration
@ConfigurationProperties(prefix = "spring.datasource.batch-demo")
public DataSource batchDemoDataSource() {
// Use Spring Boot's default data source, HikariDataSource
return DataSourceBuilder.create().type(HikariDataSource.class).build();
}
}
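The prefix above maps to entries in application.properties. A possible configuration (the URL, username, and password are placeholders; adjust them to your environment):

```properties
spring.datasource.batch-demo.jdbc-url=jdbc:mysql://127.0.0.1:3306/batch-demo
spring.datasource.batch-demo.username=root
spring.datasource.batch-demo.password=<your-password>
spring.datasource.batch-demo.driver-class-name=com.mysql.cj.jdbc.Driver
```

Note that HikariCP expects `jdbc-url` (bound to `jdbcUrl`) rather than `url` when configured this way.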

4.5 Step configuration

import com.example.springbatchdemo.component.processor.PersonItemProcessor;
import com.example.springbatchdemo.entity.Person;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class BatchProcessPersonStep {

@Autowired
public StepBuilderFactory stepBuilderFactory;

@Autowired
@Qualifier(value = "personItemReader")
private FlatFileItemReader<Person> personItemReader;

@Autowired
@Qualifier(value = "personItemWriter")
private JdbcBatchItemWriter<Person> personItemWriter;

@Autowired
private PersonItemProcessor personItemProcessor;

@Bean("batchProcessPersonStep1")
public Step step1() {
return stepBuilderFactory.get("step1")
.<Person, Person>chunk(10)
.reader(personItemReader)
.processor(personItemProcessor)
.writer(personItemWriter)
.build();
}
}

4.6 Job configuration

import com.example.springbatchdemo.component.listener.BatchProcessPersonCompletionListener;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
public class BatchProcessPersonJob {

@Autowired
public JobBuilderFactory jobBuilderFactory;

@Autowired
@Qualifier(value = "batchProcessPersonStep1")
private Step batchProcessPersonStep1;

@Autowired
private BatchProcessPersonCompletionListener batchProcessPersonCompletionListener;

@Bean
public Job importUserJob() {
return jobBuilderFactory.get("importUserJob")
.preventRestart()
.incrementer(new RunIdIncrementer())
.listener(batchProcessPersonCompletionListener)
.flow(batchProcessPersonStep1)
.end()
.build();
}
}

4.7 Job status listener

import com.example.springbatchdemo.entity.Person;
import com.example.springbatchdemo.mapper.PersonMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.listener.JobExecutionListenerSupport;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;

@Component
public class BatchProcessPersonCompletionListener extends JobExecutionListenerSupport {

private static final Logger log = LoggerFactory.getLogger(BatchProcessPersonCompletionListener.class);

@Autowired
private PersonMapper personMapper;

@Override
public void afterJob(JobExecution jobExecution) {
if (BatchStatus.COMPLETED.equals(jobExecution.getStatus())) {
log.info("Job finished! Time to verify the results");

// Query all person records via spring-mybatis
List<Person> personList = personMapper.queryAll();
personList.forEach(person -> log.info("Found <{}> in the database.", person));
}
}
}

4.8 CSV test data file

5. Running the Batch Job

Run the Spring Boot project; JobLauncher automatically launches the importUserJob job. The execution result:

Check the table batch-demo.person: the test data from the CSV file has been imported successfully!


1. Introduction

Spring Batch is a lightweight batch framework for building high-volume, high-performance batch applications. As a Spring sub-project built on the Spring framework, it has evolved into a complete enterprise-grade solution and, thanks to the healthy Spring ecosystem, is widely used in batch processing.

Spring Batch ships a powerful component library: job restart, skips, job statistics, log tracing, transaction management, resource management, and more. For very large workloads, partitioning and other optimization techniques enable high-performance jobs. In short, Spring Batch scales well and can handle both simple tasks and complex, high-volume ones.

2. Architecture

From the diagram:

  • A Job can have one or more Steps;
  • Each Step has one ItemReader, one ItemProcessor, and one ItemWriter;
  • A Job is launched by a JobLauncher;
  • Metadata about the batch run is stored in the JobRepository;

Job

A Job encapsulates all the data the batch run needs, and can be configured via XML or Java annotations. The Job inheritance chain:

JobInstanceJob 的运行实例,就像同一个 Java 类可以实例化出不同的对象, Job 也可以有不同的 JobInstance。因此可以说,JobInstance = Job + JobParametersJobExecutionJobInstance 的一次执行,包括要做什么、怎么做、执行结果等。


Step

As the diagram shows, a Step follows an input-process-output model; within the framework, the Step is the smallest unit of execution. The Step inheritance chain:

StepExecutionStep 的一次执行,包含 StepJobExecution 以及事务相关数据的引用,比如提交和回滚次数、开始和结束时间等。

ItemReaderItemProcessorItemWriter,是顶级接口。基于此,Spring Batch 已实现常用组件,如文件数据存取器、数据库数据存取器等,功能完备,开箱即用。


JobLauncher

The JobLauncher is responsible for launching a Job with a given set of JobParameters.
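A minimal sketch of launching a Job by hand (the beans here would be injected by Spring in a real application): because JobInstance = Job + JobParameters, launching the same Job with different parameter values creates a new JobInstance.

```java
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;

public class LaunchExample {

    // jobLauncher and job are assumed to be Spring-managed beans
    public JobExecution launch(JobLauncher jobLauncher, Job job) throws Exception {
        // A fresh parameter value => a fresh JobInstance
        JobParameters params = new JobParametersBuilder()
                .addLong("run.ts", System.currentTimeMillis())
                .toJobParameters();
        return jobLauncher.run(job, params);
    }
}
```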


JobRepository

The JobRepository handles all database interaction, recording every insert, retrieval, update, and delete during batch processing. In a Java application, the annotation @EnableBatchProcessing is enough to configure it. Spring Batch relies on the database for this management; the relevant tables are:

  • BATCH_JOB_INSTANCE: stores all JobInstance information;
  • BATCH_JOB_EXECUTION_PARAMS: stores all JobParameters information;
  • BATCH_JOB_EXECUTION: stores all JobExecution information;
  • BATCH_STEP_EXECUTION: stores all StepExecution information;
  • BATCH_JOB_EXECUTION_CONTEXT: stores the Job-level ExecutionContext;
  • BATCH_STEP_EXECUTION_CONTEXT: stores the Step-level ExecutionContext;

3. Design Principles

With only simple configuration, Spring Batch embeds into a Spring application: small yet powerful. Its design principles:

  • Inherit the Spring programming model: developers focus on business logic while the framework handles the infrastructure;

  • Separate the concerns of infrastructure, execution environment, and the batch application itself;

  • Extract core services and design top-level interfaces for them;

  • Provide popular components ready to use out of the box;

  • Keep core services easy to extend;
  • Build a simple deployment model with Maven, independent of the application;

4. Usage Guidelines

When building a batch solution, developers should consider the following principles:

  • Build the architecture and environment from common building blocks wherever possible, since the batch architecture and the online framework influence each other;

  • Avoid building complex logical structures in a single-machine application;

  • Do as much processing in memory as possible and use as few system resources, especially physical IO, as possible:
    • cache frequently used data to avoid re-reading it within the same transaction or across transactions;
    • avoid unnecessary full-table or index scans;
  • Don't repeat work (record what has been processed and skip identical follow-up tasks);
  • Allocate enough memory up front to avoid repeated, time-consuming allocations during processing;
  • Put enough validation and record-keeping in place to guarantee data integrity;
  • Simulate production environments and data volumes, and load-test early;
  • Take data backups seriously;

5. Summary

Spring Batch is an excellent batch framework; its extensibility and high performance ceiling take the pain out of batch work.

On one hand, Spring Batch fully decouples batch tasks. A once complex, monolithic end-to-end job can now be split into Steps, each with a single responsibility. Every Step follows its own input-process-output model: highly standardized, highly cohesive, and very simple.

On the other hand, Spring Batch makes project design more disciplined. Tasks are broken down more finely, so workload estimates are more accurate; each stage is clearly delineated, lowering communication costs; and, most importantly, developers no longer have to grind through one monolithic task from start to finish, bleary-eyed and buried in bugs.


1. MySQL Batch Processing Overview

For executing multiple insert, delete, or update statements, mysql-connector-java supports two modes:

  • serial: send statements one by one;
  • batched: pack statements and send them in batches;

In batch mode, statements are packed into batches according to a packet-capacity algorithm and sent to the database server, which greatly improves throughput for large statement volumes. Enable it by appending rewriteBatchedStatements=true to the JDBC connection URL, e.g.:

spring.datasource.batch-demo.jdbc-url=jdbc:mysql://127.0.0.1:3306/batch-demo?rewriteBatchedStatements=true
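To see what the driver's rewrite buys you, the transformation for INSERTs can be sketched in plain Java. This is a simplified illustration of the multi-value rewrite, not the driver's actual API:

```java
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class BatchRewriteDemo {

    // Simplified illustration: with rewriteBatchedStatements=true, N batched
    // single-row INSERTs are rewritten into one multi-value INSERT.
    static String rewriteInsert(String insertPrefix, String valuesClause, int rows) {
        return insertPrefix + " VALUES " + IntStream.range(0, rows)
                .mapToObj(i -> valuesClause)
                .collect(Collectors.joining(", "));
    }

    public static void main(String[] args) {
        // One round trip to the server instead of three
        System.out.println(rewriteInsert("INSERT INTO person (first_name, last_name)", "(?, ?)", 3));
    }
}
```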

2. How MySQL Batching Is Implemented

The source below is from ClientPreparedStatement.java in mysql-connector-java version 8.0.28.

@Override
protected long[] executeBatchInternal() throws SQLException {
synchronized (checkClosed().getConnectionMutex()) {
// The connection must not be read-only
if (this.connection.isReadOnly()) {
throw new SQLException(Messages.getString("PreparedStatement.25") + Messages.getString("PreparedStatement.26"), MysqlErrorNumbers.SQL_STATE_ILLEGAL_ARGUMENT);
}

// There must be at least one batched statement
if (this.query.getBatchedArgs() == null || this.query.getBatchedArgs().size() == 0) {
return new long[0];
}

// we timeout the entire batch, not individual statements
int batchTimeout = getTimeoutInMillis();
setTimeoutInMillis(0);

resetCancelledState();

try {
statementBegins();

clearWarnings();

// 1. The batch contains no plain (raw) SQL and batched rewriting is enabled
// batchHasPlainStatements: the batch contains raw SQL strings
// rewriteBatchedStatements: batched rewriting is enabled
if (!this.batchHasPlainStatements && this.rewriteBatchedStatements.getValue()) {
// 1.1 Batched INSERTs that can be rewritten as one multi-value INSERT
if (getParseInfo().canRewriteAsMultiValueInsertAtSqlLevel()) {
// Execute the batched INSERT
return executeBatchedInserts(batchTimeout);
}

// 1.2 DELETE/UPDATE: no raw SQL and more than 3 batched statements
if (!this.batchHasPlainStatements && this.query.getBatchedArgs() != null
&& this.query.getBatchedArgs().size() > 3) {
// Execute the batched DELETE or UPDATE
return executePreparedBatchAsMultiStatement(batchTimeout);
}
}

// 2. Fall back to executing the statements serially
return executeBatchSerially(batchTimeout);
} finally {
this.query.getStatementExecuting().set(false);

clearBatch();
}
}
}

The source shows that MySQL's batch handling requires that the batch contains no raw SQL and that the connection enables batch rewriting. Because the rewrite rule for INSERT (concatenating value lists) differs from that for DELETE and UPDATE (joining statements with semicolons), the INSERT path is handled separately.

A simple example of raw SQL in a batch:

PreparedStatement preparedStatement = connection.prepareStatement("");
for (int i = 0; i < 10; i++) {
StringBuilder sql = new StringBuilder();
sql.append("INSERT INTO extenal_studentcj(grade,clazz,zkzh,NAME,scoretext,times) VALUES(");
sql.append("'").append(i).append("',");
sql.append("'").append(i).append("',");
sql.append("'").append(i).append("',");
sql.append("'").append(i).append("',");
sql.append("'").append(i).append("',");
sql.append("'").append(i).append("'");
sql.append(");");
preparedStatement.addBatch(sql.toString());
}

preparedStatement.executeBatch();
2.1 Batched INSERT

Core implementation:

protected long[] executeBatchedInserts(int batchTimeout) throws SQLException {
synchronized (checkClosed().getConnectionMutex()) {
// Get the VALUES clause template, e.g. (?, ?, ?)
String valuesClause = getParseInfo().getValuesClause();

JdbcConnection locallyScopedConn = this.connection;

if (valuesClause == null) {
return executeBatchSerially(batchTimeout);
}

// Total number of batched INSERT statements
int numBatchedArgs = this.query.getBatchedArgs().size();

if (this.retrieveGeneratedKeys) {
this.batchedGeneratedKeys = new ArrayList<>(numBatchedArgs);
}

// Compute how many value sets go into each batch
int numValuesPerBatch = ((PreparedQuery<?>) this.query).computeBatchSize(numBatchedArgs);

if (numBatchedArgs < numValuesPerBatch) {
numValuesPerBatch = numBatchedArgs;
}

JdbcPreparedStatement batchedStatement = null;

int batchedParamIndex = 1;
long updateCountRunningTotal = 0;
int numberToExecuteAsMultiValue = 0;
int batchCounter = 0;
CancelQueryTask timeoutTask = null;
SQLException sqlEx = null;

long[] updateCounts = new long[numBatchedArgs];

try {
try {
// Build the multi-value insert prepared statement
batchedStatement = prepareBatchedInsertSQL(locallyScopedConn, numValuesPerBatch);

timeoutTask = startQueryTimer(batchedStatement, batchTimeout);

// Compute how many multi-value batches to execute
numberToExecuteAsMultiValue = numBatchedArgs < numValuesPerBatch ? numBatchedArgs : numBatchedArgs / numValuesPerBatch;

// Total number of value sets the batchedStatement will insert
int numberArgsToExecute = numberToExecuteAsMultiValue * numValuesPerBatch;

// Loop to bind the insert values
for (int i = 0; i < numberArgsToExecute; i++) {
// Once a batch is fully bound, execute it, then start binding the next one
if (i != 0 && i % numValuesPerBatch == 0) {
try {
updateCountRunningTotal += batchedStatement.executeLargeUpdate();
} catch (SQLException ex) {
sqlEx = handleExceptionForBatch(batchCounter - 1, numValuesPerBatch, updateCounts, ex);
}

getBatchedGeneratedKeys(batchedStatement);
batchedStatement.clearParameters();
batchedParamIndex = 1;
}

batchedParamIndex = setOneBatchedParameterSet(batchedStatement, batchedParamIndex, this.query.getBatchedArgs().get(batchCounter++));
}

try {
updateCountRunningTotal += batchedStatement.executeLargeUpdate();
} catch (SQLException ex) {
sqlEx = handleExceptionForBatch(batchCounter - 1, numValuesPerBatch, updateCounts, ex);
}

getBatchedGeneratedKeys(batchedStatement);

numValuesPerBatch = numBatchedArgs - batchCounter;
} finally {
if (batchedStatement != null) {
batchedStatement.close();
batchedStatement = null;
}
}

// Any statements left over after the full batches are executed here in one go
try {
if (numValuesPerBatch > 0) {
batchedStatement = prepareBatchedInsertSQL(locallyScopedConn, numValuesPerBatch);

if (timeoutTask != null) {
timeoutTask.setQueryToCancel(batchedStatement);
}

batchedParamIndex = 1;

while (batchCounter < numBatchedArgs) {
batchedParamIndex = setOneBatchedParameterSet(batchedStatement, batchedParamIndex, this.query.getBatchedArgs().get(batchCounter++));
}

try {
updateCountRunningTotal += batchedStatement.executeLargeUpdate();
} catch (SQLException ex) {
sqlEx = handleExceptionForBatch(batchCounter - 1, numValuesPerBatch, updateCounts, ex);
}

getBatchedGeneratedKeys(batchedStatement);
}

if (sqlEx != null) {
throw SQLError.createBatchUpdateException(sqlEx, updateCounts, this.exceptionInterceptor);
}

if (numBatchedArgs > 1) {
long updCount = updateCountRunningTotal > 0 ? java.sql.Statement.SUCCESS_NO_INFO : 0;
for (int j = 0; j < numBatchedArgs; j++) {
updateCounts[j] = updCount;
}
} else {
updateCounts[0] = updateCountRunningTotal;
}
return updateCounts;
} finally {
if (batchedStatement != null) {
batchedStatement.close();
}
}
} finally {
stopQueryTimer(timeoutTask, false, false);
resetCancelledState();
}
}
}

As the source shows, insert batching first executes statements in full batches of a computed size; the leftover statements that do not fill a complete batch are flushed by a final fallback pass. Taking the insertion of 100,000 student records as an example, the driver runs as many full multi-value inserts as possible, then issues one smaller multi-value insert for the remainder.
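
The full-batches-plus-remainder arithmetic can be sketched as follows. `BatchSplit.split` is a hypothetical helper that only mirrors the driver's loop structure; the per-batch size of 3,000 is illustrative, while the real value comes from `computeBatchSize`:

```java
public class BatchSplit {

    // How many full batches of `perBatch` value sets fit into `total`
    // statements, and how many rows are left for the final fallback pass.
    static int[] split(int total, int perBatch) {
        int fullBatches = total / perBatch;
        int remainder = total - fullBatches * perBatch;
        return new int[]{fullBatches, remainder};
    }

    public static void main(String[] args) {
        int[] r = split(100_000, 3_000);
        // 33 full batches of 3000 rows each, then one final insert of 1000 rows
        System.out.println(r[0] + " full batches, " + r[1] + " rows in the remainder pass");
    }
}
```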

2.2 Delete and Update batching

Base implementation:

protected long[] executePreparedBatchAsMultiStatement(int batchTimeout) throws SQLException {
synchronized (checkClosed().getConnectionMutex()) {
// This is kind of an abuse, but it gets the job done
if (this.batchedValuesClause == null) {
this.batchedValuesClause = ((PreparedQuery<?>) this.query).getOriginalSql() + ";";
}

JdbcConnection locallyScopedConn = this.connection;

boolean multiQueriesEnabled = locallyScopedConn.getPropertySet().getBooleanProperty(PropertyKey.allowMultiQueries).getValue();
CancelQueryTask timeoutTask = null;

try {
clearWarnings();

// Total number of statements
int numBatchedArgs = this.query.getBatchedArgs().size();

if (this.retrieveGeneratedKeys) {
this.batchedGeneratedKeys = new ArrayList<>(numBatchedArgs);
}

// Compute how many statements go into each batch
int numValuesPerBatch = ((PreparedQuery<?>) this.query).computeBatchSize(numBatchedArgs);

if (numBatchedArgs < numValuesPerBatch) {
numValuesPerBatch = numBatchedArgs;
}

java.sql.PreparedStatement batchedStatement = null;

int batchedParamIndex = 1;
int numberToExecuteAsMultiValue = 0;
int batchCounter = 0;
int updateCountCounter = 0;
long[] updateCounts = new long[numBatchedArgs * getParseInfo().getNumberOfQueries()];
SQLException sqlEx = null;

try {
if (!multiQueriesEnabled) {
((NativeSession) locallyScopedConn.getSession()).enableMultiQueries();
}

// Build the multi-statement batch
batchedStatement = this.retrieveGeneratedKeys
? ((Wrapper) locallyScopedConn.prepareStatement(generateMultiStatementForBatch(numValuesPerBatch), RETURN_GENERATED_KEYS)).unwrap(java.sql.PreparedStatement.class)
: ((Wrapper) locallyScopedConn.prepareStatement(generateMultiStatementForBatch(numValuesPerBatch))).unwrap(java.sql.PreparedStatement.class);

timeoutTask = startQueryTimer((StatementImpl) batchedStatement, batchTimeout);

// Compute how many batches to execute
numberToExecuteAsMultiValue = numBatchedArgs < numValuesPerBatch ? numBatchedArgs : numBatchedArgs / numValuesPerBatch;

// Total number of statements the batchedStatement will execute
int numberArgsToExecute = numberToExecuteAsMultiValue * numValuesPerBatch;

// Loop to bind the statement values
for (int i = 0; i < numberArgsToExecute; i++) {
// Once a batch is fully bound, execute it, then start binding the next one
if (i != 0 && i % numValuesPerBatch == 0) {
try {
batchedStatement.execute();
} catch (SQLException ex) {
sqlEx = handleExceptionForBatch(batchCounter, numValuesPerBatch, updateCounts, ex);
}

updateCountCounter = processMultiCountsAndKeys((StatementImpl) batchedStatement, updateCountCounter, updateCounts);

batchedStatement.clearParameters();
batchedParamIndex = 1;
}

batchedParamIndex = setOneBatchedParameterSet(batchedStatement, batchedParamIndex, this.query.getBatchedArgs().get(batchCounter++));
}

try {
batchedStatement.execute();
} catch (SQLException ex) {
sqlEx = handleExceptionForBatch(batchCounter - 1, numValuesPerBatch, updateCounts, ex);
}

updateCountCounter = processMultiCountsAndKeys((StatementImpl) batchedStatement, updateCountCounter, updateCounts);

batchedStatement.clearParameters();

numValuesPerBatch = numBatchedArgs - batchCounter;

if (timeoutTask != null) {
// we need to check the cancel state now because we loose if after the following batchedStatement.close()
((JdbcPreparedStatement) batchedStatement).checkCancelTimeout();
}
} finally {
if (batchedStatement != null) {
batchedStatement.close();
batchedStatement = null;
}
}

// Any statements left over after the full batches are executed here in one go
try {
if (numValuesPerBatch > 0) {

batchedStatement = this.retrieveGeneratedKeys
? locallyScopedConn.prepareStatement(generateMultiStatementForBatch(numValuesPerBatch), RETURN_GENERATED_KEYS)
: locallyScopedConn.prepareStatement(generateMultiStatementForBatch(numValuesPerBatch));

if (timeoutTask != null) {
timeoutTask.setQueryToCancel((Query) batchedStatement);
}

batchedParamIndex = 1;

while (batchCounter < numBatchedArgs) {
batchedParamIndex = setOneBatchedParameterSet(batchedStatement, batchedParamIndex, this.query.getBatchedArgs().get(batchCounter++));
}

try {
batchedStatement.execute();
} catch (SQLException ex) {
sqlEx = handleExceptionForBatch(batchCounter - 1, numValuesPerBatch, updateCounts, ex);
}

updateCountCounter = processMultiCountsAndKeys((StatementImpl) batchedStatement, updateCountCounter, updateCounts);

batchedStatement.clearParameters();
}

if (timeoutTask != null) {
stopQueryTimer(timeoutTask, true, true);
timeoutTask = null;
}

if (sqlEx != null) {
throw SQLError.createBatchUpdateException(sqlEx, updateCounts, this.exceptionInterceptor);
}

return updateCounts;
} finally {
if (batchedStatement != null) {
batchedStatement.close();
}
}
} finally {
stopQueryTimer(timeoutTask, false, false);
resetCancelledState();

if (!multiQueriesEnabled) {
((NativeSession) locallyScopedConn.getSession()).disableMultiQueries();
}

clearBatch();
}
}
}

The batching scheme for delete and update statements is similar to insert batching; the difference is that delete and update statements are joined together with semicolons, while insert statements have their VALUES lists concatenated. The same batches-plus-remainder flow applies, for example when updating 100,000 student records.
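
The two rewrite styles can be illustrated with a toy rewriter. The class, table, and column names below are invented for illustration; the real rewriting happens inside Connector/J:

```java
import java.util.List;
import java.util.stream.Collectors;

public class RewriteDemo {

    // Multi-value rewrite used for INSERT: one statement whose VALUES lists are concatenated.
    static String rewriteInsert(List<Integer> ids) {
        return "INSERT INTO t(id) VALUES "
                + ids.stream().map(i -> "(" + i + ")").collect(Collectors.joining(","));
    }

    // Semicolon rewrite used for DELETE/UPDATE: whole statements joined with ';'.
    static String rewriteDelete(List<Integer> ids) {
        return ids.stream().map(i -> "DELETE FROM t WHERE id=" + i)
                .collect(Collectors.joining(";"));
    }

    public static void main(String[] args) {
        System.out.println(rewriteInsert(List.of(1, 2, 3)));
        System.out.println(rewriteDelete(List.of(1, 2)));
    }
}
```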

3. MySQL Batch Performance Tests

3.1 Batch-inserting 100,000 student records:
public void batchInsert() {

long start = System.currentTimeMillis();

Connection connection = null;

String sqlTemplate = "INSERT INTO student_target(student_id, name, address) VALUES (?, ?, ?)";

try {
// Query 100,000 source rows
List<Student> studentList = studentMapper.queryAll();

// Obtain a database connection
connection = batchDemoDB.getConnection();

connection.setAutoCommit(false);

PreparedStatement preparedStatement = connection.prepareStatement(sqlTemplate);
for (Student student : studentList) {
preparedStatement.setLong(1, student.getStudentId());
preparedStatement.setString(2, student.getName());
preparedStatement.setString(3, student.getAddress());
preparedStatement.addBatch();
}

preparedStatement.executeBatch();

connection.commit();
} catch (Exception e) {
try {
connection.rollback();
} catch (Exception e2) {
LOGGER.error("transaction rollback failed: {}", e2.getMessage(), e2);
}
LOGGER.error("batch insert student info error: {}", e.getMessage(), e);
} finally {
try {
connection.close();
} catch (Exception e) {
LOGGER.error("connection close error: {}", e.getMessage(), e);
}
}

LOGGER.info("Inserted 100000 rows, elapsed: {} ms", System.currentTimeMillis() - start);
}
  • rewriteBatchedStatements=false

    Inserted 100000 rows, elapsed: 18652 ms
  • rewriteBatchedStatements=true

    Inserted 100000 rows, elapsed: 2404 ms
3.2 Batch-updating 100,000 student records
public void batchUpdate() {

long start = System.currentTimeMillis();

Connection connection = null;

String sqlTemplate = "UPDATE student_target set address = ? WHERE student_id = ?";

try {
// Query 100,000 source rows
List<Student> studentList = studentMapper.queryAll();

// Obtain a database connection
connection = batchDemoDB.getConnection();

connection.setAutoCommit(false);

PreparedStatement preparedStatement = connection.prepareStatement(sqlTemplate);
for (Student student : studentList) {
preparedStatement.setString(1, student.getName() + "_呵呵");
preparedStatement.setLong(2, student.getStudentId());
preparedStatement.addBatch();
}

preparedStatement.executeBatch();

connection.commit();
} catch (Exception e) {
try {
connection.rollback();
} catch (Exception e2) {
LOGGER.error("transaction rollback failed: {}", e2.getMessage(), e2);
}
LOGGER.error("batch update student info error: {}", e.getMessage(), e);
} finally {
try {
connection.close();
} catch (Exception e) {
LOGGER.error("connection close error: {}", e.getMessage(), e);
}
}

LOGGER.info("Updated 100000 rows, elapsed: {} ms", System.currentTimeMillis() - start);
}
  • rewriteBatchedStatements=false

    Updated 100000 rows, elapsed: 25579 ms
  • rewriteBatchedStatements=true

    Updated 100000 rows, elapsed: 10122 ms
3.3 Batch-deleting 100,000 student records
public void batchDelete() {

long start = System.currentTimeMillis();

Connection connection = null;

String sqlTemplate = "DELETE FROM student_target WHERE student_id = ?";

try {
// Query 100,000 source rows
List<Student> studentList = studentMapper.queryAll();

// Obtain a database connection
connection = batchDemoDB.getConnection();

connection.setAutoCommit(false);

PreparedStatement preparedStatement = connection.prepareStatement(sqlTemplate);
for (Student student : studentList) {
preparedStatement.setLong(1, student.getStudentId());
preparedStatement.addBatch();
}

preparedStatement.executeBatch();

connection.commit();
} catch (Exception e) {
try {
connection.rollback();
} catch (Exception e2) {
LOGGER.error("transaction rollback failed: {}", e2.getMessage(), e2);
}
LOGGER.error("batch delete student info error: {}", e.getMessage(), e);
} finally {
try {
connection.close();
} catch (Exception e) {
LOGGER.error("connection close error: {}", e.getMessage(), e);
}
}

LOGGER.info("Deleted 100000 rows, elapsed: {} ms", System.currentTimeMillis() - start);
}
  • rewriteBatchedStatements=false

    Deleted 100000 rows, elapsed: 20817 ms
  • rewriteBatchedStatements=true

    Deleted 100000 rows, elapsed: 12053 ms
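
All three comparisons toggle a single connection property. As a sketch of where that flag lives, here is a hypothetical helper that assembles the JDBC URL; the host and database name are placeholders:

```java
public class JdbcUrlDemo {

    // Builds a MySQL JDBC URL with batch rewriting enabled.
    static String url(String hostPort, String database) {
        return "jdbc:mysql://" + hostPort + "/" + database
                + "?rewriteBatchedStatements=true";
    }

    public static void main(String[] args) {
        System.out.println(url("localhost:3306", "batch_demo"));
    }
}
```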


1. The Spring Thread Pool

Spring's own thread pool, ThreadPoolTaskExecutor, is essentially a wrapper around ThreadPoolExecutor. On one hand, Spring Boot projects can configure it quickly through yaml or properties files and hot-reload it with @RefreshScope; on the other, @EnableAsync plus @Async make it easy to run asynchronous tasks without writing any asynchronous plumbing. Riding the Spring ecosystem, ThreadPoolTaskExecutor is widely used for its clean design and good compatibility.

2. Spring Thread Pool Deadlock

jdk 线程池一样,Spring 线程池同样潜藏着一些坑,比如死锁。不得不说这是一个大坑。

First, it is hard to make sense of in business terms. A thread pool is a task-oriented tool that does just three things: create and manage threads, schedule task execution, and queue pending tasks. A task that grabs a thread runs; one that cannot is handled by the rejection policy. How could different tasks ever end up waiting on each other?

Second, the fault is hard to pin down technically. With the pool deadlocked, the main threads simply "go missing" mid-request. After a restart the system works again, yet under load testing it almost always dies once more. No error logs, no CPU spikes, no full GCs. On the surface it is concurrency-related and intermittent. Pulling a thread dump gives:

"http-nio-8080-exec-32" daemon prio=5 tid=1676 WAITING
at sun.misc.Unsafe.park(Native Method)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:429)
Local Variable: java.util.concurrent.FutureTask$WaitNode#4
at java.util.concurrent.FutureTask.get(FutureTask.java:191)
Local Variable: java.util.concurrent.FutureTask#4

Thread 1676 is in the WAITING state: FutureTask.get() has parked it until the task it is waiting on completes. Under load, tasks contending for pool threads is perfectly normal, so at this point deadlock was not yet on the radar.

Finally, a thought surfaced: if a task spawns a nested task and both share the same thread pool, then under contention the outer tasks can end up waiting on the inner tasks while the inner tasks wait for threads held by the outer ones, deadlocking the entire pool.
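
That suspicion can be verified with plain JDK pools, no Spring required. The sketch below shrinks the pool to a single thread so starvation is immediate, and a timeout on the inner `get` stands in for the real hang so the demo actually terminates:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class NestedTaskStarvation {

    // Returns the name of the exception the outer task dies with.
    static String demo() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            Future<String> outer = pool.submit(() -> {
                // The pool's only thread is busy right here, so the inner task
                // stays queued forever; without the timeout this is a deadlock.
                Future<String> inner = pool.submit(() -> "ticket");
                return inner.get(500, TimeUnit.MILLISECONDS);
            });
            outer.get();
            return "no deadlock";
        } catch (ExecutionException e) {
            return e.getCause().getClass().getSimpleName();
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("outer task failed with: " + demo()); // → TimeoutException
    }
}
```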

3. Simulating the Scenario

To make this concrete, let's model how the Spring thread pool works with workers entering a subway station on their commute.

Suppose the station has 5 always-open gates (the core pool size), 5 emergency gates (5 + 5 = the maximum pool size), and a holding pen that fits 100 people (the queue capacity).

The station rules:

  • You must have a ticket;
  • Entry is strictly first come, first served;
  • Once you reach a gate or step into the pen, you cannot back out;
  • The 5 always-open gates are used first;
  • If all 5 always-open gates are occupied, newcomers queue up in the holding pen;
  • If the pen is full, the 5 emergency gates are opened, one at a time.

To simulate nested tasks, assume every worker travels with a companion, and that everyone discovers only at the gate that they forgot to buy a ticket, so they send the companion off to buy one. Here:

  • Outer task: the worker passes through the gate;
  • Inner task: the worker's companion goes back out to buy the ticket.

Suppose Zhang San, Li Si and the rest all reach the gates ticketless. The following exchange might break out:

  • Zhang San: "What is taking so long to buy one ticket?"

  • Zhang San's companion: "I'm stuck in the queue! You lot at the gates, stop blocking the way and get in!"

  • Li Si: "Everyone up here forgot their tickets! Until our companions bring them over, we can't get in either!"

  • Xiong Er's companion: "And until you get in, we can't move forward!"

  • Zheng Shi: "Quiet, all of you. My companion is still on the way to the ticket office!"

The crowd erupts into a shouting match and the standoff hardens, but anyone who breaks the rules gets dragged off and shot, so the whole lot of them stay wedged at the gates and not a single person makes it into the station.

4. Simulating It in Code

Define the thread pool:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;

/**
* @author zourongsheng
*/
@Configuration
public class ExecutorConfig {

public static final String TASK_EXECUTOR = "taskExecutor";

/**
* @return the task executor thread pool
*/
@Bean(TASK_EXECUTOR)
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// Core pool size
executor.setCorePoolSize(5);
// Maximum pool size
executor.setMaxPoolSize(10);
// Blocking queue capacity
executor.setQueueCapacity(100);
// Maximum idle time for threads (seconds)
executor.setKeepAliveSeconds(60);
// Thread name prefix
executor.setThreadNamePrefix("common-sync-executor-");
// Rejection policy
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(30);
return executor;
}
}

The test:

/**
* @author zourongsheng
* @date 2021/12/23 14:33
*/
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Resource;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import static ExecutorConfig.TASK_EXECUTOR;

@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = ServiceInitializer.class)
public class TaskTest {

private static Logger LOGGER = LoggerFactory.getLogger(TaskTest.class);

@Resource(name = TASK_EXECUTOR)
private ThreadPoolTaskExecutor taskExecutor;

@Test
public void test() {
// Initialize the workers' names
int workerCount = 500;
List<String> workerNameList = new ArrayList<>(workerCount);
for (int i = 1; i <= workerCount; i++) {
workerNameList.add(String.format("%s号打工人", i));
}

Random random = new Random();

// Workers enter the station
final List<CompletableFuture<Void>> completableFutures = workerNameList
.stream()
.map(workerName -> CompletableFuture.runAsync(() -> {
// Print the pool's runtime stats
this.printThreadPoolTaskExecutorInfo(workerName);

// Outer task: the worker enters the station
int passGateMillSecond = random.nextInt(500);
try {
TimeUnit.MILLISECONDS.sleep(passGateMillSecond);
} catch (InterruptedException e) {
e.printStackTrace();
}

int buyTicketMillSecond = random.nextInt(500);
LOGGER.info("{}走到闸机口, 发现未买票, 安排同伴去买票, 需要{}毫秒", workerName, buyTicketMillSecond);

// Inner task: the worker's companion goes out to buy a ticket
try {
taskExecutor.submit(() -> {
LOGGER.info("{}的同伴出来买票了", workerName);
try {
TimeUnit.MILLISECONDS.sleep(buyTicketMillSecond);
} catch (InterruptedException e) {
e.printStackTrace();
}
return true;
}).get();
} catch (InterruptedException | ExecutionException e) {
LOGGER.error("{}的同伴出来买票失败:{}", workerName, e.getMessage(), e);
}
LOGGER.info("{}与同伴成功进站", workerName);
}, taskExecutor).exceptionally(e -> {
LOGGER.error("{}进站失败: {}", workerName, e.getMessage(), e);
return null;
}))
.collect(Collectors.toList());
// Wait for all workers to enter the station
completableFutures.stream().map(CompletableFuture::join).collect(Collectors.toList());

LOGGER.info("打工人都进站了");
}

/**
* Print the thread pool's runtime stats.
*/
private void printThreadPoolTaskExecutorInfo(String workName) {
LOGGER.info("进站人:{}", workName);
LOGGER.info("核心线程数:{}", taskExecutor.getCorePoolSize());
LOGGER.info("线程池大小:{}", taskExecutor.getPoolSize());
LOGGER.info("活跃线程数:{}", taskExecutor.getActiveCount());
LOGGER.info("线程保持时间(秒):{}", taskExecutor.getKeepAliveSeconds());
LOGGER.info("线程池最大数量:{}", taskExecutor.getMaxPoolSize());
LOGGER.info("线程池等待的任务数量: {}", taskExecutor.getThreadPoolExecutor().getQueue().size());
LOGGER.info("线程池已完成任务数量: {}", taskExecutor.getThreadPoolExecutor().getCompletedTaskCount());
}
}

Run it, and the log prints:

13:02:46.354 [common-sync-executor-1] INFO TaskTest - 进站人:1号打工人
13:02:46.355 [common-sync-executor-1] INFO TaskTest - 核心线程数:5
13:02:46.355 [common-sync-executor-1] INFO TaskTest - 线程池大小:3
13:02:46.355 [common-sync-executor-1] INFO TaskTest - 活跃线程数:3
13:02:46.355 [common-sync-executor-2] INFO TaskTest - 进站人:2号打工人
13:02:46.355 [common-sync-executor-1] INFO TaskTest - 线程保持时间(秒):60
13:02:46.355 [common-sync-executor-2] INFO TaskTest - 核心线程数:5
13:02:46.355 [common-sync-executor-1] INFO TaskTest - 线程池最大数量:10
13:02:46.355 [common-sync-executor-2] INFO TaskTest - 线程池大小:3
13:02:46.355 [common-sync-executor-1] INFO TaskTest - 线程池等待的任务数量: 0
13:02:46.355 [common-sync-executor-2] INFO TaskTest - 活跃线程数:3
13:02:46.355 [common-sync-executor-2] INFO TaskTest - 线程保持时间(秒):60
13:02:46.355 [common-sync-executor-1] INFO TaskTest - 线程池已完成任务数量: 0
13:02:46.355 [common-sync-executor-2] INFO TaskTest - 线程池最大数量:10
13:02:46.355 [common-sync-executor-2] INFO TaskTest - 线程池等待的任务数量: 0
13:02:46.355 [common-sync-executor-2] INFO TaskTest - 线程池已完成任务数量: 0
13:02:46.355 [common-sync-executor-1] INFO TaskTest - 1号打工人走到闸机口, 发现未买票, 安排同伴去买票, 需要275毫秒
13:02:46.355 [common-sync-executor-3] INFO TaskTest - 进站人:3号打工人
13:02:46.356 [common-sync-executor-3] INFO TaskTest - 核心线程数:5
13:02:46.356 [common-sync-executor-3] INFO TaskTest - 线程池大小:4
13:02:46.356 [common-sync-executor-3] INFO TaskTest - 活跃线程数:4
13:02:46.356 [common-sync-executor-3] INFO TaskTest - 线程保持时间(秒):60
13:02:46.356 [common-sync-executor-3] INFO TaskTest - 线程池最大数量:10
13:02:46.356 [common-sync-executor-3] INFO TaskTest - 线程池等待的任务数量: 0
13:02:46.356 [common-sync-executor-3] INFO TaskTest - 线程池已完成任务数量: 0
13:02:46.356 [common-sync-executor-4] INFO TaskTest - 进站人:4号打工人
13:02:46.356 [common-sync-executor-4] INFO TaskTest - 核心线程数:5
13:02:46.356 [common-sync-executor-4] INFO TaskTest - 线程池大小:5
13:02:46.356 [common-sync-executor-4] INFO TaskTest - 活跃线程数:5
13:02:46.356 [common-sync-executor-4] INFO TaskTest - 线程保持时间(秒):60
13:02:46.356 [common-sync-executor-4] INFO TaskTest - 线程池最大数量:10
13:02:46.356 [common-sync-executor-4] INFO TaskTest - 线程池等待的任务数量: 0
13:02:46.356 [common-sync-executor-4] INFO TaskTest - 线程池已完成任务数量: 0
13:02:46.356 [common-sync-executor-5] INFO TaskTest - 进站人:5号打工人
13:02:46.356 [common-sync-executor-5] INFO TaskTest - 核心线程数:5
13:02:46.357 [common-sync-executor-5] INFO TaskTest - 线程池大小:5
13:02:46.357 [common-sync-executor-5] INFO TaskTest - 活跃线程数:5
13:02:46.357 [common-sync-executor-5] INFO TaskTest - 线程保持时间(秒):60
13:02:46.357 [common-sync-executor-5] INFO TaskTest - 线程池最大数量:10
13:02:46.357 [common-sync-executor-5] INFO TaskTest - 线程池等待的任务数量: 100
13:02:46.357 [common-sync-executor-5] INFO TaskTest - 线程池已完成任务数量: 0
13:02:46.358 [common-sync-executor-6] INFO TaskTest - 进站人:106号打工人
13:02:46.358 [common-sync-executor-6] INFO TaskTest - 核心线程数:5
13:02:46.358 [common-sync-executor-6] INFO TaskTest - 线程池大小:8
13:02:46.358 [common-sync-executor-6] INFO TaskTest - 活跃线程数:8
13:02:46.358 [common-sync-executor-7] INFO TaskTest - 1号打工人的同伴出来买票了
13:02:46.358 [common-sync-executor-6] INFO TaskTest - 线程保持时间(秒):60
13:02:46.358 [common-sync-executor-6] INFO TaskTest - 线程池最大数量:10
13:02:46.358 [common-sync-executor-6] INFO TaskTest - 线程池等待的任务数量: 100
13:02:46.358 [common-sync-executor-6] INFO TaskTest - 线程池已完成任务数量: 0
13:02:46.358 [common-sync-executor-8] INFO TaskTest - 进站人:107号打工人
13:02:46.358 [common-sync-executor-8] INFO TaskTest - 核心线程数:5
13:02:46.358 [common-sync-executor-8] INFO TaskTest - 线程池大小:9
13:02:46.358 [common-sync-executor-8] INFO TaskTest - 活跃线程数:9
13:02:46.359 [common-sync-executor-8] INFO TaskTest - 线程保持时间(秒):60
13:02:46.359 [common-sync-executor-8] INFO TaskTest - 线程池最大数量:10
13:02:46.359 [common-sync-executor-8] INFO TaskTest - 线程池等待的任务数量: 100
13:02:46.359 [common-sync-executor-8] INFO TaskTest - 线程池已完成任务数量: 0
13:02:46.359 [common-sync-executor-9] INFO TaskTest - 进站人:108号打工人
13:02:46.359 [common-sync-executor-9] INFO TaskTest - 核心线程数:5
13:02:46.359 [common-sync-executor-9] INFO TaskTest - 线程池大小:10
13:02:46.359 [common-sync-executor-9] INFO TaskTest - 活跃线程数:10
13:02:46.359 [common-sync-executor-9] INFO TaskTest - 线程保持时间(秒):60
13:02:46.359 [common-sync-executor-9] INFO TaskTest - 线程池最大数量:10
13:02:46.359 [common-sync-executor-9] INFO TaskTest - 线程池等待的任务数量: 100
13:02:46.359 [common-sync-executor-9] INFO TaskTest - 线程池已完成任务数量: 0
13:02:46.359 [main] INFO TaskTest - 进站人:110号打工人
13:02:46.359 [common-sync-executor-10] INFO TaskTest - 进站人:109号打工人
13:02:46.359 [main] INFO TaskTest - 核心线程数:5
13:02:46.359 [common-sync-executor-10] INFO TaskTest - 核心线程数:5
13:02:46.359 [common-sync-executor-10] INFO TaskTest - 线程池大小:10
13:02:46.359 [main] INFO TaskTest - 线程池大小:10
13:02:46.359 [common-sync-executor-10] INFO TaskTest - 活跃线程数:10
13:02:46.359 [main] INFO TaskTest - 活跃线程数:10
13:02:46.359 [common-sync-executor-10] INFO TaskTest - 线程保持时间(秒):60
13:02:46.359 [main] INFO TaskTest - 线程保持时间(秒):60
13:02:46.359 [common-sync-executor-10] INFO TaskTest - 线程池最大数量:10
13:02:46.359 [main] INFO TaskTest - 线程池最大数量:10
13:02:46.359 [common-sync-executor-10] INFO TaskTest - 线程池等待的任务数量: 100
13:02:46.359 [main] INFO TaskTest - 线程池等待的任务数量: 100
13:02:46.359 [common-sync-executor-10] INFO TaskTest - 线程池已完成任务数量: 0
13:02:46.360 [main] INFO TaskTest - 线程池已完成任务数量: 0
13:02:46.368 [common-sync-executor-8] INFO TaskTest - 107号打工人走到闸机口, 发现未买票, 安排同伴去买票, 需要81毫秒
13:02:46.368 [common-sync-executor-8] INFO TaskTest - 107号打工人的同伴出来买票了
13:02:46.417 [common-sync-executor-2] INFO TaskTest - 2号打工人走到闸机口, 发现未买票, 安排同伴去买票, 需要391毫秒
13:02:46.417 [common-sync-executor-2] INFO TaskTest - 2号打工人的同伴出来买票了
13:02:46.449 [common-sync-executor-8] INFO TaskTest - 107号打工人与同伴成功进站
13:02:46.449 [common-sync-executor-8] INFO TaskTest - 进站人:6号打工人
13:02:46.449 [common-sync-executor-8] INFO TaskTest - 核心线程数:5
13:02:46.449 [common-sync-executor-8] INFO TaskTest - 线程池大小:10
13:02:46.449 [common-sync-executor-8] INFO TaskTest - 活跃线程数:10
13:02:46.449 [common-sync-executor-8] INFO TaskTest - 线程保持时间(秒):60
13:02:46.449 [common-sync-executor-8] INFO TaskTest - 线程池最大数量:10
13:02:46.449 [common-sync-executor-8] INFO TaskTest - 线程池等待的任务数量: 99
13:02:46.449 [common-sync-executor-8] INFO TaskTest - 线程池已完成任务数量: 1
13:02:46.515 [common-sync-executor-4] INFO TaskTest - 4号打工人走到闸机口, 发现未买票, 安排同伴去买票, 需要34毫秒
13:02:46.559 [common-sync-executor-6] INFO TaskTest - 106号打工人走到闸机口, 发现未买票, 安排同伴去买票, 需要116毫秒
13:02:46.559 [common-sync-executor-6] INFO TaskTest - 106号打工人的同伴出来买票了
13:02:46.570 [common-sync-executor-9] INFO TaskTest - 108号打工人走到闸机口, 发现未买票, 安排同伴去买票, 需要149毫秒
13:02:46.570 [common-sync-executor-9] INFO TaskTest - 108号打工人的同伴出来买票了
13:02:46.633 [common-sync-executor-7] INFO TaskTest - 进站人:7号打工人
13:02:46.633 [common-sync-executor-1] INFO TaskTest - 1号打工人与同伴成功进站
13:02:46.633 [common-sync-executor-7] INFO TaskTest - 核心线程数:5
13:02:46.633 [common-sync-executor-7] INFO TaskTest - 线程池大小:10
13:02:46.633 [common-sync-executor-1] INFO TaskTest - 进站人:8号打工人
13:02:46.633 [common-sync-executor-7] INFO TaskTest - 活跃线程数:10
13:02:46.633 [common-sync-executor-1] INFO TaskTest - 核心线程数:5
13:02:46.633 [common-sync-executor-1] INFO TaskTest - 线程池大小:10
13:02:46.633 [common-sync-executor-7] INFO TaskTest - 线程保持时间(秒):60
13:02:46.633 [common-sync-executor-7] INFO TaskTest - 线程池最大数量:10
13:02:46.633 [common-sync-executor-1] INFO TaskTest - 活跃线程数:10
13:02:46.633 [common-sync-executor-1] INFO TaskTest - 线程保持时间(秒):60
13:02:46.633 [common-sync-executor-7] INFO TaskTest - 线程池等待的任务数量: 98
13:02:46.633 [common-sync-executor-7] INFO TaskTest - 线程池已完成任务数量: 3
13:02:46.633 [common-sync-executor-1] INFO TaskTest - 线程池最大数量:10
13:02:46.633 [common-sync-executor-1] INFO TaskTest - 线程池等待的任务数量: 98
13:02:46.633 [common-sync-executor-1] INFO TaskTest - 线程池已完成任务数量: 3
13:02:46.675 [common-sync-executor-6] INFO TaskTest - 106号打工人与同伴成功进站
13:02:46.675 [common-sync-executor-6] INFO TaskTest - 进站人:9号打工人
13:02:46.675 [common-sync-executor-6] INFO TaskTest - 核心线程数:5
13:02:46.675 [common-sync-executor-6] INFO TaskTest - 线程池大小:10
13:02:46.675 [common-sync-executor-6] INFO TaskTest - 活跃线程数:10
13:02:46.675 [common-sync-executor-6] INFO TaskTest - 线程保持时间(秒):60
13:02:46.675 [common-sync-executor-6] INFO TaskTest - 线程池最大数量:10
13:02:46.675 [common-sync-executor-6] INFO TaskTest - 线程池等待的任务数量: 97
13:02:46.675 [common-sync-executor-6] INFO TaskTest - 线程池已完成任务数量: 4
13:02:46.719 [common-sync-executor-9] INFO TaskTest - 108号打工人与同伴成功进站
13:02:46.719 [common-sync-executor-5] INFO TaskTest - 5号打工人走到闸机口, 发现未买票, 安排同伴去买票, 需要380毫秒
13:02:46.719 [common-sync-executor-9] INFO TaskTest - 进站人:10号打工人
13:02:46.719 [common-sync-executor-9] INFO TaskTest - 核心线程数:5
13:02:46.719 [common-sync-executor-9] INFO TaskTest - 线程池大小:10
13:02:46.719 [common-sync-executor-9] INFO TaskTest - 活跃线程数:10
13:02:46.719 [common-sync-executor-9] INFO TaskTest - 线程保持时间(秒):60
13:02:46.719 [common-sync-executor-9] INFO TaskTest - 线程池最大数量:10
13:02:46.719 [common-sync-executor-9] INFO TaskTest - 线程池等待的任务数量: 97
13:02:46.719 [common-sync-executor-9] INFO TaskTest - 线程池已完成任务数量: 5
13:02:46.784 [main] INFO TaskTest - 110号打工人走到闸机口, 发现未买票, 安排同伴去买票, 需要291毫秒
13:02:46.796 [common-sync-executor-3] INFO TaskTest - 3号打工人走到闸机口, 发现未买票, 安排同伴去买票, 需要205毫秒
13:02:46.808 [common-sync-executor-2] INFO TaskTest - 2号打工人与同伴成功进站
13:02:46.808 [common-sync-executor-2] INFO TaskTest - 进站人:11号打工人
13:02:46.808 [common-sync-executor-2] INFO TaskTest - 核心线程数:5
13:02:46.808 [common-sync-executor-2] INFO TaskTest - 线程池大小:10
13:02:46.808 [common-sync-executor-2] INFO TaskTest - 活跃线程数:10
13:02:46.808 [common-sync-executor-2] INFO TaskTest - 线程保持时间(秒):60
13:02:46.808 [common-sync-executor-2] INFO TaskTest - 线程池最大数量:10
13:02:46.808 [common-sync-executor-2] INFO TaskTest - 线程池等待的任务数量: 98
13:02:46.808 [common-sync-executor-2] INFO TaskTest - 线程池已完成任务数量: 6
13:02:46.815 [common-sync-executor-2] INFO TaskTest - 11号打工人走到闸机口, 发现未买票, 安排同伴去买票, 需要192毫秒
13:02:46.821 [common-sync-executor-10] INFO TaskTest - 109号打工人走到闸机口, 发现未买票, 安排同伴去买票, 需要115毫秒
13:02:46.837 [common-sync-executor-1] INFO TaskTest - 8号打工人走到闸机口, 发现未买票, 安排同伴去买票, 需要67毫秒
13:02:46.837 [common-sync-executor-1] INFO TaskTest - 8号打工人的同伴出来买票了
13:02:46.904 [common-sync-executor-1] INFO TaskTest - 8号打工人与同伴成功进站
13:02:46.904 [common-sync-executor-1] INFO TaskTest - 进站人:12号打工人
13:02:46.904 [common-sync-executor-1] INFO TaskTest - 核心线程数:5
13:02:46.904 [common-sync-executor-1] INFO TaskTest - 线程池大小:10
13:02:46.904 [common-sync-executor-1] INFO TaskTest - 活跃线程数:10
13:02:46.904 [common-sync-executor-1] INFO TaskTest - 线程保持时间(秒):60
13:02:46.904 [common-sync-executor-1] INFO TaskTest - 线程池最大数量:10
13:02:46.904 [common-sync-executor-1] INFO TaskTest - 线程池等待的任务数量: 99
13:02:46.904 [common-sync-executor-1] INFO TaskTest - 线程池已完成任务数量: 7
13:02:46.934 [common-sync-executor-8] INFO TaskTest - 6号打工人走到闸机口, 发现未买票, 安排同伴去买票, 需要198毫秒
13:02:46.949 [common-sync-executor-7] INFO TaskTest - 7号打工人走到闸机口, 发现未买票, 安排同伴去买票, 需要470毫秒
13:02:46.949 [common-sync-executor-7] INFO TaskTest - 7号打工人的同伴出来买票了
13:02:46.969 [common-sync-executor-6] INFO TaskTest - 9号打工人走到闸机口, 发现未买票, 安排同伴去买票, 需要142毫秒
13:02:46.969 [common-sync-executor-6] INFO TaskTest - 9号打工人的同伴出来买票了
13:02:47.102 [common-sync-executor-1] INFO TaskTest - 12号打工人走到闸机口, 发现未买票, 安排同伴去买票, 需要470毫秒
13:02:47.102 [common-sync-executor-1] INFO TaskTest - 12号打工人的同伴出来买票了
13:02:47.111 [common-sync-executor-6] INFO TaskTest - 9号打工人与同伴成功进站
13:02:47.111 [common-sync-executor-9] INFO TaskTest - 10号打工人走到闸机口, 发现未买票, 安排同伴去买票, 需要109毫秒
13:02:47.111 [common-sync-executor-6] INFO TaskTest - 进站人:13号打工人
13:02:47.111 [common-sync-executor-6] INFO TaskTest - 核心线程数:5
13:02:47.111 [common-sync-executor-6] INFO TaskTest - 线程池大小:10
13:02:47.111 [common-sync-executor-6] INFO TaskTest - 活跃线程数:10
13:02:47.111 [common-sync-executor-6] INFO TaskTest - 线程保持时间(秒):60
13:02:47.111 [common-sync-executor-6] INFO TaskTest - 线程池最大数量:10
13:02:47.111 [common-sync-executor-6] INFO TaskTest - 线程池等待的任务数量: 100
13:02:47.111 [common-sync-executor-6] INFO TaskTest - 线程池已完成任务数量: 8
13:02:47.220 [common-sync-executor-6] INFO TaskTest - 13号打工人走到闸机口, 发现未买票, 安排同伴去买票, 需要245毫秒
13:02:47.220 [common-sync-executor-6] INFO TaskTest - 13号打工人的同伴出来买票了
13:02:47.419 [common-sync-executor-7] INFO TaskTest - 7号打工人与同伴成功进站
13:02:47.419 [common-sync-executor-7] INFO TaskTest - 进站人:14号打工人
13:02:47.419 [common-sync-executor-7] INFO TaskTest - 核心线程数:5
13:02:47.419 [common-sync-executor-7] INFO TaskTest - 线程池大小:10
13:02:47.419 [common-sync-executor-7] INFO TaskTest - 活跃线程数:10
13:02:47.419 [common-sync-executor-7] INFO TaskTest - 线程保持时间(秒):60
13:02:47.419 [common-sync-executor-7] INFO TaskTest - 线程池最大数量:10
13:02:47.419 [common-sync-executor-7] INFO TaskTest - 线程池等待的任务数量: 99
13:02:47.419 [common-sync-executor-7] INFO TaskTest - 线程池已完成任务数量: 9
13:02:47.465 [common-sync-executor-6] INFO TaskTest - 13号打工人与同伴成功进站
13:02:47.465 [common-sync-executor-6] INFO TaskTest - 进站人:15号打工人
13:02:47.465 [common-sync-executor-6] INFO TaskTest - 核心线程数:5
13:02:47.465 [common-sync-executor-6] INFO TaskTest - 线程池大小:10
13:02:47.465 [common-sync-executor-6] INFO TaskTest - 活跃线程数:10
13:02:47.465 [common-sync-executor-6] INFO TaskTest - 线程保持时间(秒):60
13:02:47.465 [common-sync-executor-6] INFO TaskTest - 线程池最大数量:10
13:02:47.465 [common-sync-executor-6] INFO TaskTest - 线程池等待的任务数量: 98
13:02:47.465 [common-sync-executor-6] INFO TaskTest - 线程池已完成任务数量: 10
13:02:47.468 [common-sync-executor-6] INFO TaskTest - 15号打工人走到闸机口, 发现未买票, 安排同伴去买票, 需要392毫秒
13:02:47.572 [common-sync-executor-1] INFO TaskTest - 12号打工人与同伴成功进站
13:02:47.572 [common-sync-executor-1] INFO TaskTest - 进站人:16号打工人
13:02:47.572 [common-sync-executor-1] INFO TaskTest - 核心线程数:5
13:02:47.572 [common-sync-executor-1] INFO TaskTest - 线程池大小:10
13:02:47.572 [common-sync-executor-1] INFO TaskTest - 活跃线程数:10
13:02:47.572 [common-sync-executor-1] INFO TaskTest - 线程保持时间(秒):60
13:02:47.572 [common-sync-executor-1] INFO TaskTest - 线程池最大数量:10
13:02:47.572 [common-sync-executor-1] INFO TaskTest - 线程池等待的任务数量: 98
13:02:47.572 [common-sync-executor-1] INFO TaskTest - 线程池已完成任务数量: 11
13:02:47.898 [common-sync-executor-7] INFO TaskTest - 14号打工人走到闸机口, 发现未买票, 安排同伴去买票, 需要249毫秒
13:02:48.064 [common-sync-executor-1] INFO TaskTest - 16号打工人走到闸机口, 发现未买票, 安排同伴去买票, 需要407毫秒
... pool starvation deadlock

The log shows the TASK_EXECUTOR pool moving through three phases: core threads start and handle tasks; surplus tasks go into the blocking queue; non-core threads are spun up. Of the 500 workers who needed to enter the station, only about 11 made it (ThreadPoolExecutor.getCompletedTaskCount() returns an approximate count). The pool is deadlocked, with its threads all parked in WAITING.
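
The three phases follow directly from ThreadPoolExecutor's submission rules. A standalone demo with the same pool shape (core 5, max 10, queue 100) makes the growth observable; the latch-blocked tasks stand in for the workers stuck at the gates:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolGrowthDemo {

    // Submits 110 blocking tasks: 5 occupy core threads, 100 fill the
    // queue, and the last 5 force non-core threads up to the max of 10.
    static int[] demo() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(5, 10, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(100), new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < 110; i++) {
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException ignored) {
                }
            });
        }
        int[] snapshot = {pool.getPoolSize(), pool.getQueue().size()};
        release.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return snapshot;
    }

    public static void main(String[] args) throws InterruptedException {
        int[] s = demo();
        System.out.println("pool size = " + s[0] + ", queued = " + s[1]); // → pool size = 10, queued = 100
    }
}
```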

5. Fixing the Spring Thread Pool Starvation Deadlock

A pool deadlock has serious impact. It is easy to overlook while coding (you would need to trace the whole business call chain) and hard to catch in testing (it requires load and strikes intermittently). Possible remedies:

  • Never let nested tasks share a thread pool (recommended);
  • Hard-code a JDK thread pool and shut it down manually once the task completes (not recommended);
  • Rework the business logic to merge the nested tasks into one (not recommended).
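
The recommended fix, in miniature: give nested tasks a pool of their own. The pool names below are invented to match the subway analogy:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SeparatePools {

    // Outer and inner tasks run on different pools, so an outer task
    // blocking on an inner result can never starve the inner task.
    static String demo() throws Exception {
        ExecutorService gatePool = Executors.newFixedThreadPool(2);   // outer: pass the gate
        ExecutorService ticketPool = Executors.newFixedThreadPool(2); // inner: buy the ticket
        try {
            Future<String> outer = gatePool.submit(() -> {
                Future<String> ticket = ticketPool.submit(() -> "ticket");
                return "entered with " + ticket.get();
            });
            return outer.get();
        } finally {
            gatePool.shutdown();
            ticketPool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo()); // → entered with ticket
    }
}
```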