Oracle SQL*Loader 是 Oracle 数据库管理系统中一个强大的工具,用于高效地将大量数据从外部文件加载到 Oracle 数据库中。它提供了一种快速、灵活的方式来导入数据,适用于各种数据格式和文件类型。本文将介绍 SQL*Loader 的基本概念、工作原理以及实际应用场景。

一、什么是 SQL*Loader?

SQL*Loader 是一个用于导入数据的实用程序,允许用户将普通文件、CSV 文件等外部数据源中的数据加载到 Oracle 数据库表中。它是 Oracle 数据库中的一个标准工具,可以轻松地处理大规模的数据加载任务。

二、SQL*Loader 的工作原理

SQL*Loader 的工作原理比较简单:

  • 控制文件定义:编写一个控制文件,其中指定了要加载的目标表、字段映射关系、数据格式等信息。控制文件是 SQL*Loader 的核心配置文件之一;

  • 准备外部数据文件:用户需要准备一个包含待加载数据的外部文件,可以是纯文本文件、CSV 文件等格式;

  • 运行 SQL*Loader:通过命令行或者其他界面工具运行 SQL*Loader,并指定控制文件和数据文件的位置。SQL*Loader 将根据控制文件的定义,把数据文件中的数据加载到目标表中;

  • 数据加载:SQL*Loader 按照控制文件中指定的规则,逐行解析外部数据文件,并将数据插入到目标表中;

三、SQL*Loader 控制文件

这是一个控制文件模板样例:

LOAD DATA
CHARACTERSET UTF8 -- 指定外部数据文件的字符集编码为 UTF-8
INFILE 'data.csv' -- 指定外部数据文件的路径
INTO TABLE employees -- 指定目标表名
WHEN (hire_date >= '2022-01-01') -- 加载条件(WHEN 子句需置于字段定义之前)
FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '"' -- 指定字段分隔符和可选的字段包裹符
(
employee_id, -- 目标表的列名
first_name,
last_name,
email,
hire_date DATE 'YYYY-MM-DD' -- 数据格式化,确保日期格式正确
)
  • LOAD DATA 表示开始加载数据的声明;
  • CHARACTERSET UTF8 指定外部数据文件的字符集编码为 UTF-8;
  • INFILE 'data.csv' 指定了外部数据文件的路径。你需要将 data.csv 替换为实际的外部数据文件名,并确保文件路径正确;
  • INTO TABLE employees 指定了目标表名为 employees,即将数据加载到 employees 表中。你需要将 employees 替换为实际的目标表名;
  • WHEN (hire_date >= '2022-01-01') 定义了加载数据的条件,只有满足条件的记录才会被导入 Oracle 数据库。注意:WHEN 子句要写在字段定义之前,且 SQL*Loader 的 WHEN 实际只支持 = 和 != 比较,范围过滤通常需要在加载后用 SQL 处理;
  • FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '"' 定义了字段的分隔符和可选的字段包裹符。在这个示例中,字段被逗号分隔,并且可能被双引号包裹。根据实际情况,你可能需要调整这些选项;
  • (employee_id, first_name, last_name, email, hire_date DATE 'YYYY-MM-DD') 定义了要加载的字段和它们的数据类型。确保与目标表的列名和数据类型匹配。在这个示例中,hire_date 被格式化为日期,并指定了日期的格式;
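
控制文件就绪后,即可在命令行调用 SQL*Loader。下面是一个常见的调用示意(用户名、密码与连接串均为占位;log 记录加载日志,bad 收集被拒绝的数据行,errors 设置允许的最大错误数):

sqlldr userid=scott/tiger@orcl control=data.ctl log=data.log bad=data.bad errors=50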

四、SQL*Loader 的应用场景

SQL*Loader 在实际应用中有广泛的用途,例如:

  • 数据迁移和导入:当需要将外部数据源中的数据导入到 Oracle 数据库中时,SQL*Loader 是一个好的选择。它可以通过灵活的配置,处理大量的数据;
  • 数据集成和同步:在数据集成和同步的场景中,SQL*Loader 可以用于将不同系统或者数据源中的数据整合到同一数据库中,以便进行分析、报告等操作;
  • 日常数据加载:从外部系统中获取数据,并将其加载到 Oracle 数据库中进行进一步处理。SQL*Loader 可以自动化这一过程,提高数据处理的效率;

五、实际应用

以下是一个简单的示例,演示如何使用 SQL*Loader 将 CSV 文件加载到 Oracle 表中:

  • 创建一个控制文件 data.ctl,定义目标表和字段映射关系:
LOAD DATA
INFILE 'data.csv'
INTO TABLE employees
WHEN (hire_date >= '2022-01-01')
FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '"'
( employee_id,
first_name,
last_name,
email,
hire_date DATE 'YYYY-MM-DD'
)
  • 准备外部数据文件 data.csv,包含待加载的数据。
employee_id | first_name | last_name | email              | hire_date
001         |            |           | zhangsan@gmail.com | 2023-01-01
002         |            |           | lisi@gmail.com     | 2023-01-02
  • 在命令行中运行 SQL*Loader
sqlldr username/password@database control=data.ctl

这样,SQL*Loader 就会将 data.csv 中的数据加载到名为 employees 的表中。

六、结论

Oracle SQL*Loader 是一个强大的数据加载工具,可用于高效地将外部数据加载到 Oracle 数据库中。通过简单的配置和命令,用户可以轻松地处理大量的数据加载任务,提高数据处理的效率和可靠性。


一、跨库分页查询

对于达到一定数量级的表,如订单表,通常采用分库分表的方式保存数据:根据指定字段(如用户 ID)将数据散列到不同的库中。那么,如果需要按照下单时间分页查询订单信息,就涉及到跨库查询。

假设有 45 笔订单,存于三个库中,散列算法是 OrderID % 3。

如果以每页五个订单,查询第二页的所有订单,则单库查询 sql 为:

select * from order_info order by id limit 5 offset 5;

但跨库查询就行不通了。下面,主要有三种方案可以用于跨库分页查询:

  • 全局查询法
  • 禁止跳页查询法
  • 二次查询法

二、全局查询法

全局查询法,需要在每个分库中执行查询语句,然后在程序中汇总排序,再定位、截取指定的数据段。

如果需要查询第二页订单,需要查询每个库的前两页数据:

select * from order_info_1 order by id limit 10;
select * from order_info_2 order by id limit 10;
select * from order_info_3 order by id limit 10;

将以上三个库的查询结果排序:

1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30

那么第二页的订单列表为:

6,7,8,9,10

小结:对于低页码查询,全局查询法是可以应付的,但是,当页码越来越大,查出来的数据也就越来越多,需要排序的数据也越来越多,查询效率也就会越来越慢。

三、禁止跳页查询法

全局查询法的一个显著缺陷,就是随着页码越来越大,查询的数据量也越来越大。那么,如果禁止跳页查询,且每次查询都以上次查询的最大ID为基点,就可以保证每次查询的数据量都是相同的。

查询第一页数据:
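
各分库的查询语句形如(每页五条):

select * from order_info_1 order by id limit 5;
select * from order_info_2 order by id limit 5;
select * from order_info_3 order by id limit 5;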

将以上三个库的查询结果排序:

1,2,3,4,5,6,7,8,9,10,11,12,13,14,15

那么第一页的订单列表为:

1,2,3,4,5

查询第二页数据:

第一页的订单ID最大值为5,因此第二页的订单ID起始值应大于5,查询得到:
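
各分库的查询语句形如:

select * from order_info_1 where id > 5 order by id limit 5;
select * from order_info_2 where id > 5 order by id limit 5;
select * from order_info_3 where id > 5 order by id limit 5;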

将以上三个库的查询结果排序:

6,7,8,9,10,11,12,13,14,15,16,17,18,19,20

那么第二页的订单列表为:

6,7,8,9,10

小结:禁止跳页查询法,保证每次查询的数据量是相同的,避免了分页查询带来的性能衰减问题;但禁止跳页也是功能缺陷,没法一步定位到指定数据段。

四、二次查询法

二次查询法,既满足跳页查询,也能避免分页查询性能衰减。为了解释这一思想,我们以查询第三页订单数据为例。单库查询语句:

select * from order_info order by id limit 5 offset 10;

之所以叫二次查询法,当然需要查询两次。这两次查询有什么不同,希望通过以下四个步骤说清楚:

第一步:语句改写

select * from order_info order by id limit 5 offset 10 改写成 select * from order_info order by id limit 5 offset 3。偏移量 10 变成 3,是基于 10/3(向下取整)计算得出的。将语句在三个库分别执行,得到数据:
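
三个分库上执行的语句形如:

select * from order_info_1 order by id limit 5 offset 3;
select * from order_info_2 order by id limit 5 offset 3;
select * from order_info_3 order by id limit 5 offset 3;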

第二步:找最小值

  • 第一个库:最小数据为8
  • 第二个库:最小数据为11
  • 第三个库:最小数据为12

因此,从三个库中拿到的最小数据为8。

第三步:第二次语句改写

这次需要把 select * from order_info order by id limit 5 offset 3 改写成一个 between 语句:起点是第一次查询得到的最小 OrderID,终点是原来每个分库各自返回数据的最大值:

  • 第一个分库改写为: select * from order_info where id between id_min and 22 order by id
  • 第二个分库改写为: select * from order_info where id between id_min and 23 order by id
  • 第三个分库改写为: select * from order_info where id between id_min and 24 order by id

各分库的查询结果,即下文第五步所列的三组数据。

第四步:找到id_min的全局偏移量

第一次查询的偏移量为3,那么每一个库的第一个目标数据的偏移量应该都是4。因此可知每个库的id_min的偏移量:

  • 第一个库:8就是id_min,偏移量为4;
  • 第二个库:11的偏移量为4,那么id_min的偏移量就是1;
  • 第三个库:12的偏移量为4,那么id_min的偏移量就是3;

因此id_min的全局偏移量为:4 + 1 + 3 = 8。

第五步:定位目标数据

  • 第一个库:8,13,14,19,22
  • 第二个库:9,10,11,16,17,18,23
  • 第三个库:12,15,20,21,24

经过排序,得到:

8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24

因为id_min的全局偏移量为8,最终结果需要 limit 5 offset 10,因此需要向后推移10 - 8 = 2 位,然后再取5位,得到:

11,12,13,14,15

小结:二次查询法,既避免了数据越处理越多,也支持跳页查询。但其也存在短板:需要查询两次,才能拿到目标数据。


一、零拷贝技术

零拷贝技术,是相对于传统 IO 的一种技术思想。传统 IO 读写数据时,需要在用户空间和内核空间之间来回复制,而内核空间的数据是通过操作系统层面的 IO 接口从磁盘读取或写入的,中间需要多次拷贝,因此效率较低。零拷贝技术的目的,是尽可能地减少上下文切换和拷贝次数,提升操作性能。

二、传统 IO 实现原理

当应用服务接收客户端的请求时,传统 IO 通常需要两种系统调用:

// 读取
read(file, tmp_buf, len);
// 写入
write(socket, tmp_buf, len);

虽然只是简单的读写操作,但内部的执行流程还是比较复杂的。

一次读写将发生 4 次上下文切换:

  • 读取数据:从用户态切换到内核态;
  • 读取完毕:内核完成数据准备,从内核态切换到用户态;
  • 写入数据:从用户态切换到内核态;
  • 写入完毕:内核完成数据写入,从内核态切换到用户态;

一次读写将发生 4 次数据拷贝 (2 次 DMA 拷贝 + 2 次 CPU 拷贝):

  • 第一次拷贝 (DMA):把磁盘文件数据拷贝到内核缓冲区;
  • 第二次拷贝 (CPU):把内核缓冲区的数据拷贝到用户缓冲区,供应用程序使用;
  • 第三次拷贝 (CPU):把用户缓冲区的数据拷贝到内核 socket 缓冲区;
  • 第四次拷贝 (DMA):把内核 socket 缓冲区的数据拷贝到网卡缓冲区;

虽然一次上下文切换需耗时只有几微秒,但在高并发场景中,这种延迟容易被积累和放大,从而影响整体性能。此外,磁盘和网卡操作速度远远小于内存,而内存操作速度又远远小于 CPU,4 次拷贝将严重拖慢系统性能。因此,提高 IO 性能,需要从减少上下文切换次数和数据拷贝次数两方面入手。

三、零拷贝实现

基于以上的讨论,可知零拷贝技术的设计思路:尽可能地减少上下文切换次数和数据拷贝次数。

零拷贝的具体实现方式有:

  • mmap:将内核空间和用户空间的虚拟地址映射到同一物理地址;
  • sendfile:直接把内核缓冲区的数据拷贝到网卡缓冲区;
  • direct IO:在应用层与磁盘、网卡之间建立直接通道;
3.1 mmap 实现零拷贝

在介绍 mmap() 的作用机制之前,先介绍一个新概念:虚拟内存。虚拟内存是现代操作系统中普遍使用的内存结构,使用虚拟地址代替物理内存,有两点好处:一是多个虚拟地址可以指向同一个物理地址;二是虚拟内存空间远远大于物理内存空间。

在传统 IO 中,read() 调用会把内核缓冲区的数据拷贝到用户缓冲区,耗时又耗力。如果把内核空间和用户空间的虚拟地址映射到同一个物理地址,那么就不需要 CPU 来回复制了。

mmap() 正是利用了虚拟内存的这一特性,取代传统 IO 的 read() 操作,并将内核缓冲区和用户缓冲区地址映射到同一物理内存地址,省去一次 CPU 拷贝的过程,提升 IO 性能。具体过程如下:

  • 应用进程调用 mmap() 后,DMA 会把磁盘文件数据拷贝到内核缓冲区;

  • 应用进程跟操作系统内核共享这个缓冲区;

  • 应用进程再调用 write(),直接将内核缓冲区的数据拷贝到内核 socket 缓冲区;

  • DMA 把内核 socket 缓冲区的数据拷贝到网卡缓冲区;

从调用过程可知,与传统 IO 相比,mmap() + write 只减少了 1 次 CPU 拷贝,仍然要发生 4 次上下文切换和 3 次数据拷贝。
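
在 Java 中可以用 FileChannel.map() 体验 mmap 的效果。下面是一个最小示意(文件名 data.bin 与地址 localhost:9000 均为假设,且假设文件小于 2GB):

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class MmapSendDemo {
    public static void main(String[] args) throws IOException {
        try (FileChannel file = FileChannel.open(Path.of("data.bin"), StandardOpenOption.READ);
             SocketChannel socket = SocketChannel.open(new InetSocketAddress("localhost", 9000))) {
            // map() 底层对应 mmap():文件页直接映射到进程地址空间,省去 read() 的那次 CPU 拷贝
            MappedByteBuffer buffer = file.map(FileChannel.MapMode.READ_ONLY, 0, file.size());
            // write() 仍需一次 CPU 拷贝:映射内存 -> 内核 socket 缓冲区
            while (buffer.hasRemaining()) {
                socket.write(buffer);
            }
        }
    }
}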

3.2 sendfile() 实现零拷贝

sendfile() 是 Linux 提供的、专门用于发送文件的系统调用函数。sendfile() 可以替代传统 IO 的 read()、write() 函数,这意味着将省去 2 次上下文切换。此外,数据拷贝路径也有所优化,具体的优化方案与 Linux 内核版本有关:在 2.4 版本之后,Linux 提供了 SG-DMA 技术,它在 DMA 的基础上提供了更进一步的优化。

在 2.4 版本之前,CPU 可以直接把内核缓冲区的数据拷贝到内核 socket 缓冲区,省去拷贝到用户缓冲区这一步,因此还存在 2 次上下文切换和 3 次数据拷贝。

具体执行步骤:

  • DMA 把磁盘文件数据拷贝到内核缓冲区;
  • CPU 把内核缓冲区的数据拷贝到内核 socket 缓冲区;
  • DMA 把内核 socket 缓冲区的数据拷贝到网卡缓冲区;

在 2.4 版本之后,引入了 SG-DMA 技术。如果相应的网卡支持该技术,那么就可以把内核缓冲区的数据直接拷贝到网卡缓冲区,也就是说只剩下 2 次上下文切换和 2 次数据拷贝。

具体执行步骤:

  • DMA 把磁盘文件数据拷贝到内核缓冲区;
  • 把内核缓冲区描述符和数据长度传到内核 socket 缓冲区;
  • SG-DMA 直接把内核缓冲区的数据拷贝到网卡缓冲区;
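
Java 的 FileChannel.transferTo() 在 Linux 上通常就是基于 sendfile() 实现的。下面是一个最小示意(文件名与地址均为假设):

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class SendfileDemo {
    public static void main(String[] args) throws IOException {
        try (FileChannel file = FileChannel.open(Path.of("data.bin"), StandardOpenOption.READ);
             SocketChannel socket = SocketChannel.open(new InetSocketAddress("localhost", 9000))) {
            long position = 0;
            long count = file.size();
            // transferTo() 把文件数据直接送入 socket,数据全程不经过用户空间
            while (position < count) {
                position += file.transferTo(position, count - position, socket);
            }
        }
    }
}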

3.3 direct IO

直接 IO 是在用户缓冲区和磁盘、网卡之间建立直接通道的技术设计。直接 IO 在读写数据时,可以绕开内核,减少上下文切换和数据拷贝的次数,从而提高效率。

具体执行步骤:

  • DMA 把磁盘文件数据直接拷贝到用户缓冲区;
  • DMA 把用户缓冲区的数据直接拷贝到网卡缓冲区;

直接 IO 使用直接通道操作数据,由应用层完全管理数据,其优缺点也是很明显的。

优点:

  • 应用层与磁盘、网卡建立直接通道,减少了上下文切换和数据拷贝的次数,速度更快;
  • 数据直接缓存在应用层,应用可以更加灵活地操作数据;

缺点:

  • 在应用层引入直接 IO,需要应用层自主管理,给系统增添了额外的复杂度;
  • 若数据不在应用层缓冲区,那么将直接操作磁盘文件读写,将大大拖慢性能;


一、Spring Batch 性能优化指标

Spring Batch 是一款伸缩性非常好的批处理工具,既可以处理简单的任务,也可以处理复杂的、高容量的任务。在性能调优方面,Spring Batch 提供了丰富的接口支持,各项优化指标可归纳如下:

  • 多线程 Step:由独立线程执行提交块(a chunk of items)的输入、处理和输出过程;
  • 并行化 Step:对于可并行处理的 Step,交由不同的线程去处理;
  • 分片化 Step:通过分区 SPI(Service Provider Interface),对 Step 分片执行;
  • 远程组块:对于输入无性能瓶颈,但处理和输出有性能瓶颈的任务,交由远程组块执行;

详见 Spring 文档。

二、分片化 Step

如果一个 Step 的任务量比较大,可以尝试将其拆分成多个子任务。子任务之间可并行处理且互不干扰,这将大大提升批处理效率。例如:Master 这个 Step 迁移 100000 条数据需要 100 s,如果将其拆分为 100 个 Slave 任务并行执行,理想情况下时间可缩短至 1 s。

Step 分片原理,是一个 Master 处理器对应多个 Slave 处理器。Slave 处理器可以是远程服务,也可以是本地执行线程。主从服务间的消息不需要持久化,也不需要严格保证传递,因为 JobRepository 的元数据管理,是将每个 Slave 独立保存在 batch_step_execution 中的,这样便可以保证每个 Slave 任务只执行一次。
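
可以直接查询元数据表验证这一点,例如(batch_step_execution 为 Spring Batch 内置元数据表,job_execution_id 取值仅为示意):

select step_name, status, read_count, write_count
from batch_step_execution
where job_execution_id = 1;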

Step 分片化,需要了解两个组件:分片器(Partitioner)和分片处理(PartitionHandler)。

  • 分片器(Partitioner):为每个 Slave 服务配置上下文(StepExecutionContext);

  • 分片处理(PartitionHandler):定义 Slave 服务的数量以及 Slave 任务内容;

比如在一个数据迁移 Step 中,分片处理就是将 1 个主任务拆分成 100 个从任务,并定义从任务的执行内容;分片器就是依次为这 100 个从任务划定数据迁移的范围(select * from table where id between ? and ?)。

三、批处理配置

3.1 Job 配置
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
public class PartitionTransferStudentJob {

@Autowired
public JobBuilderFactory jobBuilderFactory;

@Autowired
@Qualifier(value = "masterTransferStudentStep1")
private Step masterTransferStudentStep;

@Bean
public Job transferStudentJob() {
return jobBuilderFactory.get("partitionTransferStudentJob")
.incrementer(new RunIdIncrementer())
.flow(masterTransferStudentStep)
.end()
.build();
}
}
3.2 Step 配置

MasterTransferStudentStep

import com.example.springbatchdemo.component.partitioner.TransferStudentPartitioner;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.partition.PartitionHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;

@Configuration
public class MasterTransferStudentStep {

@Autowired
public StepBuilderFactory stepBuilderFactory;

@Autowired
@Qualifier(value = "transferStudentPartitionHandler1")
private PartitionHandler transferStudentPartitionHandler;

@Autowired
private TransferStudentPartitioner transferStudentPartitioner;

@Bean("masterTransferStudentStep1")
public Step masterTransferStudentStep1(PlatformTransactionManager transactionManager) {
return stepBuilderFactory.get("masterTransferStudentStep1.manager")
.partitioner("masterTransferStudentStep1", transferStudentPartitioner)
.partitionHandler(transferStudentPartitionHandler)
.build();
}
}

SlaveTransferStudentStep

import com.example.springbatchdemo.component.processor.SlaveStudentItemProcessor;
import com.example.springbatchdemo.entity.Student;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;

@Configuration
public class SlaveTransferStudentStep {

@Autowired
public StepBuilderFactory stepBuilderFactory;

@Autowired
@Qualifier(value = "slaveTransferStudentItemReader")
private JdbcPagingItemReader<Student> slaveTransferStudentItemReader;

@Autowired
@Qualifier(value = "slaveTransferStudentItemWriter")
private JdbcBatchItemWriter<Student> slaveTransferStudentItemWriter;

@Autowired
private SlaveStudentItemProcessor slaveStudentItemProcessor;


@Bean("slaveTransferStudentStep1")
public Step slaveTransferStudentStep1(PlatformTransactionManager transactionManager) {
return stepBuilderFactory.get("slaveTransferStudentStep1")
.transactionManager(transactionManager)
.<Student, Student>chunk(1000)
.reader(slaveTransferStudentItemReader)
.processor(slaveStudentItemProcessor)
.writer(slaveTransferStudentItemWriter)
.build();
}
}
3.3 Partitioner 配置
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class TransferStudentPartitioner implements Partitioner {

private static final Logger LOGGER = LoggerFactory.getLogger(TransferStudentPartitioner.class);

@Override
public Map<String, ExecutionContext> partition(int gridSize) {

Map<String, ExecutionContext> result = new HashMap<>(gridSize);

int range = 1000;
int fromId = 0;
int toId = range;

for (int i = 1; i <= gridSize; i++) {

ExecutionContext value = new ExecutionContext();

value.putInt("fromId", fromId);
value.putInt("toId", toId);

result.put("partition" + i, value);

// 先打印当前分片的范围,再推进游标,否则日志会错位到下一个分片
LOGGER.info("partition{}; fromId: {}; toId: {}", i, fromId, toId);

fromId = toId;
toId += range;
}

return result;
}
}
3.4 Partition-Handler 配置
import org.springframework.batch.core.Step;
import org.springframework.batch.core.partition.PartitionHandler;
import org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import static com.example.springbatchdemo.config.ExecutorConfig.TASK_EXECUTOR;

@Configuration
public class TransferStudentPartitionHandler {

@Autowired
@Qualifier(value = TASK_EXECUTOR)
private ThreadPoolTaskExecutor taskExecutor;

@Autowired
@Qualifier(value = "slaveTransferStudentStep1")
private Step slaveTransferStudentStep;

@Bean("transferStudentPartitionHandler1")
public PartitionHandler transferStudentPartitionHandler1() {
TaskExecutorPartitionHandler retVal = new TaskExecutorPartitionHandler();
retVal.setTaskExecutor(taskExecutor);
retVal.setStep(slaveTransferStudentStep);
retVal.setGridSize(100);
return retVal;
}
}
3.5 数据输入器
import com.example.springbatchdemo.component.reader.rowmapper.StudentRowMapper;
import com.example.springbatchdemo.entity.Person;
import com.example.springbatchdemo.entity.Student;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.builder.JdbcPagingItemReaderBuilder;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class CustomItemReader {

@Autowired
@Qualifier(value = "batchDemoDB")
private DataSource batchDemoDB;

@Bean("slaveTransferStudentItemReader")
@StepScope
public JdbcPagingItemReader<Student> slaveTransferStudentItemReader(@Value("#{stepExecutionContext[fromId]}") final Long fromId,
@Value("#{stepExecutionContext[toId]}") final Long toId) {

MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
queryProvider.setSelectClause("student_id, name, address");
queryProvider.setFromClause("from student_source");
queryProvider.setWhereClause(String.format("where student_id > %s and student_id <= %s", fromId, toId));

Map<String, Order> sortKeys = new HashMap<>(1);
sortKeys.put("student_id", Order.ASCENDING);
queryProvider.setSortKeys(sortKeys);

return new JdbcPagingItemReaderBuilder<Student>()
.name("studentItemReader")
.dataSource(batchDemoDB)
.fetchSize(1000)
.rowMapper(new StudentRowMapper())
.queryProvider(queryProvider)
.build();
}
}
3.6 数据处理器
import com.example.springbatchdemo.entity.Student;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.context.annotation.Configuration;

@Configuration
@StepScope
public class SlaveStudentItemProcessor implements ItemProcessor<Student, Student> {

private static final Logger log = LoggerFactory.getLogger(SlaveStudentItemProcessor.class);

@Override
public Student process(final Student studentSource) throws Exception {

final Long studentId = studentSource.getStudentId();
final String name = studentSource.getName();
final String address = studentSource.getAddress();

final Student studentTarget = new Student();
studentTarget.setStudentId(studentId);
studentTarget.setName(name);
studentTarget.setAddress(address);

log.info("Converting ({}) into ({})", studentSource, studentTarget);

return studentTarget;
}
}
3.7 数据输出器
import com.example.springbatchdemo.entity.Person;
import com.example.springbatchdemo.entity.Student;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.sql.DataSource;

@Configuration
public class CustomItemWriter {

@Autowired
@Qualifier(value = "batchDemoDB")
private DataSource batchDemoDB;

@Bean("slaveTransferStudentItemWriter")
@StepScope
public JdbcBatchItemWriter<Student> slaveTransferStudentItemWriter() {

return new JdbcBatchItemWriterBuilder<Student>()
.itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
.sql("INSERT INTO student_target (student_id, name, address) VALUES (:studentId, :name, :address)")
.dataSource(batchDemoDB)
.build();
}
}

四、性能测试

测试数据量:100000

测试环境:Windows 10,i7-8核,MySQL-8.0.28

4.1 常规 Step

省略测试代码,具体请查看 demo。测试结果:

耗时:13s

4.2 分片化 Step

测试结果:

从 batch_step_execution 表可以看出,共有 100 个子任务并行处理,每个子任务迁移 1000 条数据。

耗时:7s

示例代码:spring-batch-demo


一、Spring Batch 性能优化指标

Spring Batch 是一款伸缩性非常好的批处理工具,既可以处理简单的任务,也可以处理复杂的、高容量的任务。在性能调优方面,Spring Batch 提供了丰富的接口支持,各项优化指标可归纳如下:

  • 多线程 Step:由独立线程执行提交块(a chunk of items)的输入、处理和输出过程;
  • 并行化 Step:对于可并行处理的 Step,交由不同的线程去处理;
  • 分片化 Step:通过分区 SPI(Service Provider Interface),对 Step 分片执行;
  • 远程组块:对于输入无性能瓶颈,但处理和输出有性能瓶颈的任务,交由远程组块执行;

详见 Spring 文档。

二、并行化 Step

一个 Job 可配置多个 Step。Step 之间可能存在关联,需要有先有后;也可能没有关联,先执行哪一个都可以。那么,若将这些互不关联的 Step 并行化处理,将会有效提升批处理性能。

比如,现有一个批处理任务,包含 4 个 Step

  • step1:在学生姓名后面追加字符串 “1”;
  • step2:在学生姓名后面追加字符串 “2”;
  • step3:在学生住址后面追加字符串 “8”;
  • step4:迁移所有学生信息;

我们发现,修改学生姓名的任务与修改学生住址的任务互不干扰,并不需要有先后之分。因此,可以让 step1、step2 组成的串行流程与 step3 并行执行。串行 Step 与并行 Step 的区别,就在于后者把互不关联的 Flow 交给线程池同时执行。

三、批处理配置

3.1 Job 配置
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
public class BatchManageStudentJob {

@Autowired
public JobBuilderFactory jobBuilderFactory;

@Autowired
@Qualifier(value = "batchProcessStudentSplitFlow1")
private Flow batchProcessStudentSplitFlow;

@Autowired
@Qualifier(value = "batchTransferStudentStep1")
private Step batchTransferStudentStep;

@Bean
public Job manageStudentJob() {
return jobBuilderFactory.get("manageStudentJob1")
.incrementer(new RunIdIncrementer())
// 姓名追加1、姓名追加2、地址追加8
.start(batchProcessStudentSplitFlow)
// 迁移学生信息; student_source -> student_target
.next(batchTransferStudentStep)
.end()
.build();
}
}
3.2 Flow 配置

batchProcessStudentSplitFlow

import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.job.flow.support.SimpleFlow;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import static com.example.springbatchdemo.config.ExecutorConfig.TASK_EXECUTOR;

@Configuration
public class BatchProcessStudentSplitFlow {

@Autowired
@Qualifier(value = TASK_EXECUTOR)
private ThreadPoolTaskExecutor taskExecutor;

@Autowired
@Qualifier(value = "batchUpdateStudentNameOneAndTwoFlow")
private Flow batchUpdateStudentNameOneAndTwoFlow;

@Autowired
@Qualifier(value = "batchUpdateStudentAddressFlow1")
private Flow batchUpdateStudentAddressFlow;

@Bean("batchProcessStudentSplitFlow1")
public Flow batchProcessStudentSplitFlow1() {
return new FlowBuilder<SimpleFlow>("batchProcessStudentSplitFlow1")
.split(taskExecutor)
.add(batchUpdateStudentNameOneAndTwoFlow, batchUpdateStudentAddressFlow)
.build();
}
}

batchUpdateStudentNameOneAndTwoFlow

import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.job.flow.support.SimpleFlow;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class BatchUpdateStudentNameFlow {

@Autowired
@Qualifier(value = "batchUpdateStudentNameStep1")
private Step batchUpdateStudentNameStep1;

@Autowired
@Qualifier(value = "batchUpdateStudentNameStep2")
private Step batchUpdateStudentNameStep2;

@Bean("batchUpdateStudentNameOneAndTwoFlow")
public Flow updateStudentNameOneAndTwoFlow() {
return new FlowBuilder<SimpleFlow>("batchUpdateStudentNameOneAndTwoFlow")
.start(batchUpdateStudentNameStep1)
.next(batchUpdateStudentNameStep2)
.build();
}
}

batchUpdateStudentAddressFlow

import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.job.flow.support.SimpleFlow;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class BatchUpdateStudentAddressFlow {

@Autowired
@Qualifier(value = "batchUpdateStudentAddressStep1")
private Step batchUpdateStudentAddressStep;

@Bean("batchUpdateStudentAddressFlow1")
public Flow batchUpdateStudentAddressFlow1() {
return new FlowBuilder<SimpleFlow>("batchUpdateStudentAddressFlow1")
.start(batchUpdateStudentAddressStep)
.build();
}
}
3.3 Step 配置

BatchUpdateStudentNameStep

import com.example.springbatchdemo.component.processor.AppendStudentNameOneProcessor;
import com.example.springbatchdemo.component.processor.AppendStudentNameTwoProcessor;
import com.example.springbatchdemo.entity.Student;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class BatchUpdateStudentNameStep {

@Autowired
public StepBuilderFactory stepBuilderFactory;

@Autowired
@Qualifier(value = "studentItemReader")
private JdbcPagingItemReader<Student> studentItemReader;

@Autowired
@Qualifier(value = "studentItemUpdateName")
private JdbcBatchItemWriter<Student> studentItemUpdateName;

@Autowired
private AppendStudentNameOneProcessor appendStudentNameOneProcessor;

@Autowired
private AppendStudentNameTwoProcessor appendStudentNameTwoProcessor;

@Bean("batchUpdateStudentNameStep1")
public Step batchUpdateStudentNameStep1() {
return stepBuilderFactory.get("batchUpdateStudentNameStep1")
.<Student, Student>chunk(1000)
.reader(studentItemReader)
// 姓名追加 1
.processor(appendStudentNameOneProcessor)
.writer(studentItemUpdateName)
.build();
}

@Bean("batchUpdateStudentNameStep2")
public Step batchUpdateStudentNameStep2() {
return stepBuilderFactory.get("batchUpdateStudentNameStep2")
.<Student, Student>chunk(1000)
.reader(studentItemReader)
// 姓名追加 2
.processor(appendStudentNameTwoProcessor)
.writer(studentItemUpdateName)
.build();
}
}

BatchUpdateStudentAddressStep

import com.example.springbatchdemo.component.processor.AppendStudentAddressProcessor;
import com.example.springbatchdemo.entity.Student;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class BatchUpdateStudentAddressStep {

@Autowired
public StepBuilderFactory stepBuilderFactory;

@Autowired
@Qualifier(value = "studentItemReader")
private JdbcPagingItemReader<Student> studentItemReader;

@Autowired
@Qualifier(value = "studentItemUpdateAddress")
private JdbcBatchItemWriter<Student> studentItemUpdateAddress;

@Autowired
private AppendStudentAddressProcessor appendStudentAddressProcessor;

@Bean("batchUpdateStudentAddressStep1")
public Step batchUpdateStudentAddressStep1() {
return stepBuilderFactory.get("batchUpdateStudentAddressStep1")
.<Student, Student>chunk(1000)
.reader(studentItemReader)
// 住址追加 8
.processor(appendStudentAddressProcessor)
.writer(studentItemUpdateAddress)
.build();
}
}

BatchProcessStudentStep

import com.example.springbatchdemo.component.processor.StudentItemProcessor;
import com.example.springbatchdemo.entity.Student;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class BatchProcessStudentStep {

@Autowired
public StepBuilderFactory stepBuilderFactory;

@Autowired
@Qualifier(value = "studentItemReader")
private JdbcPagingItemReader<Student> studentItemReader;

@Autowired
@Qualifier(value = "studentItemWriter")
private JdbcBatchItemWriter<Student> studentItemWriter;

@Autowired
private StudentItemProcessor studentItemProcessor;

@Bean("batchTransferStudentStep1")
public Step batchTransferStudentStep1() {
return stepBuilderFactory.get("batchTransferStudentStep1")
.<Student, Student>chunk(1000)
.reader(studentItemReader)
// 迁移数据; student_source -> student_target
.processor(studentItemProcessor)
.writer(studentItemWriter)
.build();
}
}
3.4 数据输入器
import com.example.springbatchdemo.component.reader.rowmapper.StudentRowMapper;
import com.example.springbatchdemo.entity.Person;
import com.example.springbatchdemo.entity.Student;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.builder.JdbcPagingItemReaderBuilder;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class CustomItemReader {

@Autowired
@Qualifier(value = "batchDemoDB")
private DataSource batchDemoDB;

@Bean("studentItemReader")
@StepScope
public JdbcPagingItemReader<Student> studentItemReader() {

MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
queryProvider.setSelectClause("student_id, name, address");
queryProvider.setFromClause("from student_source");

Map<String, Order> sortKeys = new HashMap<>(1);
sortKeys.put("student_id", Order.ASCENDING);
queryProvider.setSortKeys(sortKeys);

return new JdbcPagingItemReaderBuilder<Student>()
.name("studentItemReader")
.dataSource(batchDemoDB)
.fetchSize(1000)
.rowMapper(new StudentRowMapper())
.queryProvider(queryProvider)
.build();
}
}
3.5 数据处理器

AppendStudentNameOneProcessor

import com.example.springbatchdemo.entity.Student;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AppendStudentNameOneProcessor implements ItemProcessor<Student, Student> {

private static final Logger log = LoggerFactory.getLogger(AppendStudentNameOneProcessor.class);

@Override
public Student process(final Student studentSource) throws Exception {

final Long studentId = studentSource.getStudentId();
final String name = studentSource.getName();
final String address = studentSource.getAddress();

final Student studentTarget = new Student();
studentTarget.setStudentId(studentId);
studentTarget.setName(name.concat("_1"));
studentTarget.setAddress(address);

log.info("Converting ({}) into ({})", studentSource, studentTarget);

return studentTarget;
}
}

AppendStudentNameTwoProcessor

import com.example.springbatchdemo.entity.Student;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AppendStudentNameTwoProcessor implements ItemProcessor<Student, Student> {

private static final Logger log = LoggerFactory.getLogger(AppendStudentNameTwoProcessor.class);

@Override
public Student process(final Student studentSource) throws Exception {

final Long studentId = studentSource.getStudentId();
final String name = studentSource.getName();
final String address = studentSource.getAddress();

final Student studentTarget = new Student();
studentTarget.setStudentId(studentId);
studentTarget.setName(name.concat("_2"));
studentTarget.setAddress(address);

log.info("Converting ({}) into ({})", studentSource, studentTarget);

return studentTarget;
}
}

AppendStudentAddressProcessor

import com.example.springbatchdemo.entity.Student;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AppendStudentAddressProcessor implements ItemProcessor<Student, Student> {

private static final Logger log = LoggerFactory.getLogger(AppendStudentAddressProcessor.class);

@Override
public Student process(final Student studentSource) throws Exception {

final Long studentId = studentSource.getStudentId();
final String name = studentSource.getName();
final String address = studentSource.getAddress();

final Student studentTarget = new Student();
studentTarget.setStudentId(studentId);
studentTarget.setName(name);
studentTarget.setAddress(address.concat("_8"));

log.info("Converting ({}) into ({})", studentSource, studentTarget);

return studentTarget;
}
}

StudentItemProcessor

import com.example.springbatchdemo.entity.Student;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.context.annotation.Configuration;

@Configuration
public class StudentItemProcessor implements ItemProcessor<Student, Student> {

private static final Logger log = LoggerFactory.getLogger(StudentItemProcessor.class);

@Override
public Student process(final Student studentSource) throws Exception {

final Long studentId = studentSource.getStudentId();
final String name = studentSource.getName();
final String address = studentSource.getAddress();

final Student studentTarget = new Student();
studentTarget.setStudentId(studentId);
studentTarget.setName(name);
studentTarget.setAddress(address);

log.info("Converting ({}) into ({})", studentSource, studentTarget);

return studentTarget;
}
}
3.6 数据输出器
import com.example.springbatchdemo.entity.Person;
import com.example.springbatchdemo.entity.Student;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.sql.DataSource;

@Configuration
public class CustomItemWriter {

@Autowired
@Qualifier(value = "batchDemoDB")
private DataSource batchDemoDB;

@Bean("studentItemWriter")
public JdbcBatchItemWriter<Student> studentItemWriter() {

return new JdbcBatchItemWriterBuilder<Student>()
.itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
.sql("INSERT INTO student_target (student_id, name, address) VALUES (:studentId, :name, :address)")
.dataSource(batchDemoDB)
.build();
}

@Bean("studentItemUpdateName")
@StepScope
public JdbcBatchItemWriter<Student> studentItemUpdateName() {

return new JdbcBatchItemWriterBuilder<Student>()
.itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
.sql("UPDATE student_source SET name = :name WHERE student_id = :studentId")
.dataSource(batchDemoDB)
.build();
}

@Bean("studentItemUpdateAddress")
public JdbcBatchItemWriter<Student> studentItemUpdateAddress() {

return new JdbcBatchItemWriterBuilder<Student>()
.itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
.sql("UPDATE student_source SET address = :address WHERE student_id = :studentId")
.dataSource(batchDemoDB)
.build();
}
}

@StepScope:

从上面的 Step 配置可知,studentItemReader 被多个 Step 引用。默认情况下它是单例 Bean,生命周期贯穿整个 Job,那么在多 Step 引用的情况下,就会抛出类似下面这种异常:

Caused by: java.lang.IllegalStateException: Cannot open an already opened ItemReader, call close first

使用注解 @StepScope,让 studentItemReader 的生命周期与 Step 保持同步,保证每个 Step 拿到的 ItemReader 都是新的实例。同样,被多个 Step 引用的 ItemWriter、ItemProcessor,也要使用该注解。

四、性能测试

测试数据量:100000

测试环境:Windows 10,i7-8核,MySQL-8.0.28

4.1 串行 Step

串行 Step 批处理,只需要按照顺序配置 Step(省略代码示例)。测试结果:

耗时:91s

4.2 并行 Step

测试结果:

耗时:68s

示例代码:spring-batch-demo


一、Spring Batch 性能优化指标

Spring Batch 是一款伸缩性非常好的批处理工具,既可以处理简单的任务,也可以处理复杂的、高容量的任务。在性能调优方面,Spring Batch 提供了丰富的接口支持,各项优化指标可归纳如下:

  • 多线程 Step:由独立线程执行提交块(a chunk of items)的输入、处理和输出过程;
  • 并行化 Step:对于可并行处理的 Step,交由不同的线程去处理;
  • 分片化 Step:通过分区 SPI(Service Provider Interface),对 Step 分片执行;
  • 远程组块:对于输入无性能瓶颈,但处理和输出有性能瓶颈的任务,交由远程组块执行;

详见 Spring 文档。

二、多线程 Step 配置

Spring Batch 执行一个 Step,会按照 chunk 配置的数量分批次提交。对于多线程 Step,由线程池去处理任务批次。因此,每个 chunk 都不用串行等待,这大大地提高了批处理性能。

配置多线程 Step 非常简单,可以通过 XML 或 Java 配置类来配置。以 Java 配置为例:

@Bean
public Step sampleStep(TaskExecutor taskExecutor) {
return this.stepBuilderFactory.get("sampleStep")
.<String, String>chunk(10)
.reader(itemReader())
.writer(itemWriter())
.taskExecutor(taskExecutor)
// 节流配置, 不要超过线程池的最大线程数量
.throttleLimit(20)
.build();
}

此外,在配置多线程 Step 时,我们需要考虑得更多:

  • 线程池:推荐使用 Spring 线程池 ThreadPoolTaskExecutor,兼容性好;
  • 线程安全:输入器和输出器必须是线程安全的,否则可能会导致重复任务、脏数据等问题(见下方示意);
  • 框架节流:Spring Batch 自带节流器,throttle-limit 默认为 4,即最多并发处理 4 个 chunk,因此通常需要重新配置;
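
关于线程安全:JdbcPagingItemReader 本身是线程安全的,而 FlatFileItemReader 这类有状态的输入器则不是。对于后者,可以用 Spring Batch 提供的 SynchronizedItemStreamReader 包装委托读取。下面是一个最小示意(Student 实体与 delegate 读取器沿用本文示例,属假设):

import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.support.SynchronizedItemStreamReader;
import org.springframework.context.annotation.Bean;

@Bean
public SynchronizedItemStreamReader<Student> synchronizedStudentReader(FlatFileItemReader<Student> delegate) {
    // 包装后 read() 串行执行,多线程 Step 下不会读到重复数据
    SynchronizedItemStreamReader<Student> reader = new SynchronizedItemStreamReader<>();
    reader.setDelegate(delegate);
    return reader;
}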

三、批处理配置

通过 Spring Batch 应用,迁移 100 万条数据。相关配置如下:

3.1 数据读取器
import com.example.springbatchdemo.component.reader.rowmapper.StudentRowMapper;
import com.example.springbatchdemo.entity.Student;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.builder.JdbcPagingItemReaderBuilder;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class CustomItemReader {

@Autowired
@Qualifier(value = "batchDemoDB")
private DataSource batchDemoDB;

@Bean("studentItemReader")
public JdbcPagingItemReader<Student> studentItemReader() {

MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
queryProvider.setSelectClause("student_id, name, address");
queryProvider.setFromClause("from student_source");

Map<String, Order> sortKeys = new HashMap<>(1);
sortKeys.put("student_id", Order.ASCENDING);
queryProvider.setSortKeys(sortKeys);

return new JdbcPagingItemReaderBuilder<Student>()
.name("studentItemReader")
.dataSource(batchDemoDB)
.fetchSize(1000)
.rowMapper(new StudentRowMapper())
.queryProvider(queryProvider)
.build();
}
}
3.2 数据映射器
import com.example.springbatchdemo.entity.Student;
import org.springframework.jdbc.core.RowMapper;
import java.sql.ResultSet;
import java.sql.SQLException;

public class StudentRowMapper implements RowMapper<Student> {

@Override
public Student mapRow(ResultSet rs, int rowNum) throws SQLException {
Student student = new Student();
student.setStudentId(rs.getLong("student_id"));
student.setName(rs.getString("name"));
student.setAddress(rs.getString("address"));
return student;
}
}
3.3 数据处理器
import com.example.springbatchdemo.entity.Student;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.context.annotation.Configuration;

@Configuration
public class StudentItemProcessor implements ItemProcessor<Student, Student> {

private static final Logger log = LoggerFactory.getLogger(StudentItemProcessor.class);

@Override
public Student process(final Student studentSource) throws Exception {

final Long studentId = studentSource.getStudentId();
final String name = studentSource.getName();
final String address = studentSource.getAddress();

final Student studentTarget = new Student();
studentTarget.setStudentId(studentId);
studentTarget.setName(name);
studentTarget.setAddress(address);

log.info("Converting ({}) into ({})", studentSource, studentTarget);

return studentTarget;
}
}
3.4 数据写入器
import com.example.springbatchdemo.entity.Student;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.sql.DataSource;

@Configuration
public class CustomItemWriter {

@Autowired
@Qualifier(value = "batchDemoDB")
private DataSource batchDemoDB;

@Bean("studentItemWriter")
public JdbcBatchItemWriter<Student> studentItemWriter() {

return new JdbcBatchItemWriterBuilder<Student>()
.itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
.sql("INSERT INTO student_target (student_id, name, address) VALUES (:studentId, :name, :address)")
.dataSource(batchDemoDB)
.build();
}
}
3.5 Step 配置-单线程
import com.example.springbatchdemo.component.processor.StudentItemProcessor;
import com.example.springbatchdemo.entity.Student;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class BatchProcessStudentStep {

@Autowired
public StepBuilderFactory stepBuilderFactory;

@Autowired
@Qualifier(value = "studentItemReader")
private JdbcPagingItemReader<Student> studentItemReader;

@Autowired
@Qualifier(value = "studentItemWriter")
private JdbcBatchItemWriter<Student> studentItemWriter;

@Autowired
private StudentItemProcessor studentItemProcessor;

@Bean("batchProcessStudentStep1")
public Step step1() {
return stepBuilderFactory.get("step1")
.<Student, Student>chunk(2000)
.reader(studentItemReader)
.processor(studentItemProcessor)
.writer(studentItemWriter)
.build();
}
}
3.6 Step 配置-多线程
import com.example.springbatchdemo.component.processor.StudentItemProcessor;
import com.example.springbatchdemo.entity.Student;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import static com.example.springbatchdemo.config.ExecutorConfig.TASK_EXECUTOR;

@Configuration
public class BatchProcessStudentStep {

@Autowired
public StepBuilderFactory stepBuilderFactory;

@Autowired
@Qualifier(value = "studentItemReader")
private JdbcPagingItemReader<Student> studentItemReader;

@Autowired
@Qualifier(value = "studentItemWriter")
private JdbcBatchItemWriter<Student> studentItemWriter;

@Autowired
private StudentItemProcessor studentItemProcessor;

@Autowired
@Qualifier(value = TASK_EXECUTOR)
private ThreadPoolTaskExecutor taskExecutor;

@Bean("batchProcessStudentStep1")
public Step step1() {
return stepBuilderFactory.get("step1")
.<Student, Student>chunk(2000)
.reader(studentItemReader)
.processor(studentItemProcessor)
.writer(studentItemWriter)
.taskExecutor(taskExecutor)
.throttleLimit(30)
.build();
}
}
3.7 Job 配置
import com.example.springbatchdemo.component.listener.BatchProcessStudentCompletionListener;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
public class BatchProcessStudentJob {

@Autowired
public JobBuilderFactory jobBuilderFactory;

@Autowired
@Qualifier(value = "batchProcessStudentStep1")
private Step batchProcessStudentStep1;

@Autowired
private BatchProcessStudentCompletionListener batchProcessStudentCompletionListener;

@Bean
public Job transferStudentJob() {
return jobBuilderFactory.get("transferStudentJob")
.incrementer(new RunIdIncrementer())
.listener(batchProcessStudentCompletionListener)
.flow(batchProcessStudentStep1)
.end()
.build();
}
}
3.8 MySQL 数据源配置
import com.zaxxer.hikari.HikariDataSource;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import javax.sql.DataSource;

@Configuration
public class DataSourceConfig {

@Primary
@Bean(name = "batchDemoDB")
// 数据源配置参数识别前缀, 根据具体配置来设定
@ConfigurationProperties(prefix = "spring.datasource.batch-demo")
public DataSource druidDataSource() {
// 使用 SpringBoot 默认的数据源 HikariDataSource
return DataSourceBuilder.create().type(HikariDataSource.class).build();
}
}
3.9 线程池配置
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;

@Configuration
public class ExecutorConfig {

public static final String TASK_EXECUTOR = "taskExecutor";

@Bean(TASK_EXECUTOR)
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(20);
executor.setMaxPoolSize(30);
executor.setQueueCapacity(10);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("common-async-executor-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(60);
return executor;
}
}

四、批处理性能测试

4.1 单线程 Step

启动批处理任务,同步 100 万条数据。执行结果如下:

总耗时:313 秒

4.2 多线程 Step

启动批处理任务,同步 100 万条数据。执行结果如下:

总耗时:81 秒


耗时从 313 秒降到 81 秒,性能提升接近 300%。

五、总结

多线程 Step 将 chunk 任务交给线程池异步执行,可以显著地提升批处理的性能。但在多线程场景下,我们要了解 Spring Batch 的基础架构,避免并发导致的重复任务、脏数据等问题。

示例代码:spring-batch-demo


一、Spring Batch 数据输出器

Spring Batch 的数据输出器,是通过接口 ItemWriter 来实现的。针对常用的数据输出场景,Spring Batch 提供了丰富的组件支持(查看所有组件),本文介绍最常用的五个组件:

  • FlatFileItemWriter:输出文本数据;
  • JdbcBatchItemWriter:输出数据到数据库;
  • StaxEventItemWriter:输出 XML 文件数据;
  • JsonFileItemWriter:输出 JSON 文件数据;
  • ClassifierCompositeItemWriter:输出多文本数据;

二、简单使用

实体类 Ticket.class

import lombok.Data;
import java.math.BigDecimal;

@Data
public class Ticket {

/**
* 始发站
*/
private String departureStation;

/**
* 到达站
*/
private String arrivalStation;

/**
* 票价
*/
private BigDecimal price;

@Override
public String toString() {
return String.format("始发站: %s; 到达站: %s; 票价: %s", departureStation, arrivalStation, price);
}
}

文件 ticket.csv

合肥,蚌埠,60.00
南京,蚌埠,70.00
上海,蚌埠,220.00
上海,杭州,75.20
上海,昆山,19.00
2.1 FlatFileItemWriter-文本数据输出

ticket.csv 中的信息,转换为 JSON 字符串,输出到文件 ticket_output.txt 中:

/**
* Job
*/
@Bean
public Job testFlatFileItemWriterJob() {
return jobBuilderFactory.get("testFlatFileItemWriterJob")
.incrementer(new RunIdIncrementer())
.flow(testFlatFileItemWriterStep)
.end()
.build();
}

/**
* Step
*/
@Bean("testFlatFileItemWriterStep")
public Step testFlatFileItemWriterStep(PlatformTransactionManager transactionManager) {
return stepBuilderFactory.get("testFlatFileItemWriterStep")
.transactionManager(transactionManager)
.<Ticket, Ticket>chunk(10)
.reader(ticketFileItemReader)
.writer(ticketFileItemWriter)
.build();
}

/**
* Reader
*/
@Bean("ticketFileItemReader")
public FlatFileItemReader<Ticket> ticketFileItemReader() {
return new FlatFileItemReaderBuilder<Ticket>()
.name("ticketFileItemReader")
.resource(new ClassPathResource("ticket.csv"))
.delimited()
.names(new String[]{"departureStation", "arrivalStation", "price"})
.fieldSetMapper(new BeanWrapperFieldSetMapper<Ticket>() {{
setTargetType(Ticket.class);
}})
.build();
}

/**
* Writer
*/
@Bean("ticketFileItemWriter")
public FlatFileItemWriter<Ticket> ticketFileItemWriter() {

// 聚合器; JSON 序列化
LineAggregator<Ticket> aggregator = item -> {
try {
ObjectMapper mapper = new ObjectMapper();
return mapper.writeValueAsString(item);
} catch (JsonProcessingException e) {
LOGGER.error("parse object to json error: {}", e.getMessage(), e);
}
return "";
};

return new FlatFileItemWriterBuilder<Ticket>()
.name("ticketFileItemWriter")
.resource(new FileSystemResource("ticket_output.txt"))
.lineAggregator(aggregator)
.build();
}

输出文本数据,要求目标数据的格式为字符串,因此需要将 POJO 按照一定规则聚合成字符串。Spring Batch 已提供多种 LineAggregator 实现:PassThroughLineAggregator(直接打印 POJO)、RecursiveCollectionLineAggregator(打印 POJO 列表)、DelimitedLineAggregator(以分隔符拼接 POJO 字段值)、FormatterLineAggregator(格式化 POJO 字段值)。当然,我们也可以手动实现聚合器,例如示例代码中,将 POJO 转换为 JSON 格式。
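
作为对照,下面给出一个使用内置 DelimitedLineAggregator 的写法示意,以逗号拼接 Ticket 的字段值(字段名沿用上文的 Ticket 定义):

import org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor;
import org.springframework.batch.item.file.transform.DelimitedLineAggregator;

// 输出形如: 合肥,蚌埠,60.00
BeanWrapperFieldExtractor<Ticket> extractor = new BeanWrapperFieldExtractor<>();
extractor.setNames(new String[]{"departureStation", "arrivalStation", "price"});

DelimitedLineAggregator<Ticket> aggregator = new DelimitedLineAggregator<>();
aggregator.setDelimiter(",");
aggregator.setFieldExtractor(extractor);

本文示例仍使用上面手写的 JSON 聚合器。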

启动应用,生成文件 ticket_output.txt

{"departureStation":"合肥","arrivalStation":"蚌埠","price":60.00}
{"departureStation":"南京","arrivalStation":"蚌埠","price":70.00}
{"departureStation":"上海","arrivalStation":"蚌埠","price":220.00}
{"departureStation":"上海","arrivalStation":"杭州","price":75.20}
{"departureStation":"上海","arrivalStation":"昆山","price":19.00}
2.2 JdbcBatchItemWriter-数据库数据输出

将文件 student.csv 中的信息(内容如下),导入到 MySQL 数据表 student_target 中:

1,张三,合肥
2,李四,蚌埠
3,王二,南京
/**
* Job
*/
@Bean
public Job testDatabaseItemWriterJob() {
return jobBuilderFactory.get("testDatabaseItemWriterJob")
.incrementer(new RunIdIncrementer())
.flow(testDatabaseItemWriterStep)
.end()
.build();
}

/**
* Step
*/
@Bean("testDatabaseItemWriterStep")
public Step testDatabaseItemWriterStep(PlatformTransactionManager transactionManager) {
return stepBuilderFactory.get("testDatabaseItemWriterStep")
.transactionManager(transactionManager)
.<Student, Student>chunk(10)
.reader(studentFileItemReader)
.writer(studentItemWriter)
.build();
}

/**
* Reader
*/
@Bean("studentFileItemReader")
public FlatFileItemReader<Student> studentFileItemReader() {
return new FlatFileItemReaderBuilder<Student>()
.name("ticketFileItemReader")
.resource(new ClassPathResource("student.csv"))
.delimited()
.names(new String[]{"studentId", "name", "address"})
.fieldSetMapper(new BeanWrapperFieldSetMapper<Student>() {{
setTargetType(Student.class);
}})
.build();
}

/**
* Writer
*/
@Bean("studentItemWriter")
public JdbcBatchItemWriter<Student> studentItemWriter() {
return new JdbcBatchItemWriterBuilder<Student>()
.itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
.sql("INSERT INTO student_target (student_id, name, address) VALUES (:studentId, :name, :address)")
.dataSource(batchDemoDB)
.build();
}

/**
* MySQL 数据源配置
*/
@Primary
@Bean(name = "batchDemoDB")
@ConfigurationProperties(prefix = "spring.datasource.batch-demo")
public DataSource druidDataSource() {
return DataSourceBuilder.create().type(HikariDataSource.class).build();
}

启动应用,文件中的数据已导入表 student_target。

2.3 StaxEventItemWriter-XML 文件数据输出

ticket.csv 中的信息,输出到文件 ticket_output.xml 中:

/**
* Job
*/
@Bean
public Job testXmlItemWriterJob() {
return jobBuilderFactory.get("testXmlItemWriterJob")
.incrementer(new RunIdIncrementer())
.flow(testXmlItemWriterStep)
.end()
.build();
}

/**
* Step
*/
@Bean("testXmlItemWriterStep")
public Step testXmlItemWriterStep(PlatformTransactionManager transactionManager) {
return stepBuilderFactory.get("testXmlItemWriterStep")
.transactionManager(transactionManager)
.<Ticket, Ticket>chunk(10)
.reader(ticketFileItemReader)
.writer(ticketXmlItemWriter)
.build();
}


/**
* Reader
*/
@Bean("ticketFileItemReader")
public FlatFileItemReader<Ticket> ticketFileItemReader() {
return new FlatFileItemReaderBuilder<Ticket>()
.name("ticketFileItemReader")
.resource(new ClassPathResource("ticket.csv"))
.delimited()
.names(new String[]{"departureStation", "arrivalStation", "price"})
.fieldSetMapper(new BeanWrapperFieldSetMapper<Ticket>() {{
setTargetType(Ticket.class);
}})
.build();
}

/**
* Writer
*/
@Bean("ticketXmlItemWriter")
public StaxEventItemWriter<Ticket> ticketXmlItemWriter() {
return new StaxEventItemWriterBuilder<Ticket>()
.name("ticketXmlItemWriter")
.marshaller(ticketMarshaller)
.resource(new FileSystemResource("ticket_output.xml"))
.rootTagName("tickets")
.overwriteOutput(true)
.build();
}

/**
* 映射器
*/
@Bean("ticketMarshaller")
public XStreamMarshaller ticketMarshaller() {

Map<String, Class<Ticket>> aliases = new HashMap<>(1);
aliases.put("ticket", Ticket.class);

XStreamMarshaller marshaller = new XStreamMarshaller();
marshaller.setAliases(aliases);

return marshaller;
}

启动应用,生成文件 ticket_output.xml

<?xml version="1.0" encoding="UTF-8"?><tickets><ticket><departureStation>合肥</departureStation><arrivalStation>蚌埠</arrivalStation><price>60.00</price></ticket><ticket><departureStation>南京</departureStation><arrivalStation>蚌埠</arrivalStation><price>70.00</price></ticket><ticket><departureStation>上海</departureStation><arrivalStation>蚌埠</arrivalStation><price>220.00</price></ticket><ticket><departureStation>上海</departureStation><arrivalStation>杭州</arrivalStation><price>75.20</price></ticket><ticket><departureStation>上海</departureStation><arrivalStation>昆山</arrivalStation><price>19.00</price></ticket></tickets>
2.4 JsonFileItemWriter-JSON 文件数据输出

ticket.csv 中的信息,输出到文件 ticket_output.json 中:

/**
 * Job
 */
@Bean
public Job testJsonItemWriterJob() {
    return jobBuilderFactory.get("testJsonItemWriterJob")
            .incrementer(new RunIdIncrementer())
            .flow(testJsonItemWriterStep)
            .end()
            .build();
}

/**
 * Step
 */
@Bean("testJsonItemWriterStep")
public Step testJsonItemWriterStep(PlatformTransactionManager transactionManager) {
    return stepBuilderFactory.get("testJsonItemWriterStep")
            .transactionManager(transactionManager)
            .<Ticket, Ticket>chunk(10)
            .reader(ticketFileItemReader)
            .writer(ticketJsonItemWriter)
            .build();
}

/**
 * Reader
 */
@Bean("ticketFileItemReader")
public FlatFileItemReader<Ticket> ticketFileItemReader() {
    return new FlatFileItemReaderBuilder<Ticket>()
            .name("ticketFileItemReader")
            .resource(new ClassPathResource("ticket.csv"))
            .delimited()
            .names(new String[]{"departureStation", "arrivalStation", "price"})
            .fieldSetMapper(new BeanWrapperFieldSetMapper<Ticket>() {{
                setTargetType(Ticket.class);
            }})
            .build();
}

/**
 * Writer
 */
@Bean("ticketJsonItemWriter")
public JsonFileItemWriter<Ticket> ticketJsonItemWriter() {
    return new JsonFileItemWriterBuilder<Ticket>()
            .jsonObjectMarshaller(new JacksonJsonObjectMarshaller<>())
            .resource(new FileSystemResource("ticket_output.json"))
            .name("ticketJsonItemWriter")
            .build();
}

启动应用,生成文件 ticket_output.json

[
{"departureStation":"合肥","arrivalStation":"蚌埠","price":60.00},
{"departureStation":"南京","arrivalStation":"蚌埠","price":70.00},
{"departureStation":"上海","arrivalStation":"蚌埠","price":220.00},
{"departureStation":"上海","arrivalStation":"杭州","price":75.20},
{"departureStation":"上海","arrivalStation":"昆山","price":19.00}
]
2.5 ClassifierCompositeItemWriter-输出多文本数据

将文件 ticket.csv 中始发站为上海的车票信息输出到文本中,其余的输出到 XML 文件中:

/**
 * Job
 */
@Bean
public Job testMultiFileItemWriterJob() {
    return jobBuilderFactory.get("testMultiFileItemWriterJob")
            .incrementer(new RunIdIncrementer())
            .flow(testMultiFileItemWriterStep)
            .end()
            .build();
}

/**
 * Step
 */
@Bean("testMultiFileItemWriterStep")
public Step testMultiFileItemWriterStep(PlatformTransactionManager transactionManager) {
    return stepBuilderFactory.get("testMultiFileItemWriterStep")
            .transactionManager(transactionManager)
            .<Ticket, Ticket>chunk(10)
            .reader(ticketFileItemReader)
            .writer(ticketClassifierMultiFileItemWriter)
            .stream(ticketFileItemWriter)
            .stream(ticketXmlItemWriter)
            .build();
}

/**
 * Reader
 */
@Bean("ticketFileItemReader")
public FlatFileItemReader<Ticket> ticketFileItemReader() {
    return new FlatFileItemReaderBuilder<Ticket>()
            .name("ticketFileItemReader")
            .resource(new ClassPathResource("ticket.csv"))
            .delimited()
            .names(new String[]{"departureStation", "arrivalStation", "price"})
            .fieldSetMapper(new BeanWrapperFieldSetMapper<Ticket>() {{
                setTargetType(Ticket.class);
            }})
            .build();
}

/**
 * Classifier Writer
 */
@Bean("ticketClassifierMultiFileItemWriter")
public ClassifierCompositeItemWriter<Ticket> ticketClassifierMultiFileItemWriter() {
    ClassifierCompositeItemWriter<Ticket> writer = new ClassifierCompositeItemWriter<>();
    writer.setClassifier((Classifier<Ticket, ItemWriter<? super Ticket>>) ticket -> {
        // 始发站是上海的, 输出到文本中, 否则输出到 XML 文件中
        return "上海".equals(ticket.getDepartureStation()) ? ticketFileItemWriter() : ticketXmlItemWriter();
    });
    return writer;
}

/**
 * 文本-Writer
 */
@Bean("ticketFileItemWriter")
public FlatFileItemWriter<Ticket> ticketFileItemWriter() {

    // 聚合器; JSON 序列化
    LineAggregator<Ticket> aggregator = item -> {
        try {
            ObjectMapper mapper = new ObjectMapper();
            return mapper.writeValueAsString(item);
        } catch (JsonProcessingException e) {
            LOGGER.error("parse object to json error: {}", e.getMessage(), e);
        }
        return "";
    };

    return new FlatFileItemWriterBuilder<Ticket>()
            .name("ticketFileItemWriter")
            .resource(new FileSystemResource("ticket_output.txt"))
            .lineAggregator(aggregator)
            .build();
}

/**
 * XML-Writer
 */
@Bean("ticketXmlItemWriter")
public StaxEventItemWriter<Ticket> ticketXmlItemWriter() {
    return new StaxEventItemWriterBuilder<Ticket>()
            .name("ticketXmlItemWriter")
            .marshaller(ticketMarshaller)
            .resource(new FileSystemResource("ticket_output.xml"))
            .rootTagName("tickets")
            .overwriteOutput(true)
            .build();
}

/**
 * 映射器
 */
@Bean("ticketMarshaller")
public XStreamMarshaller ticketMarshaller() {

    Map<String, Class<Ticket>> aliases = new HashMap<>(1);
    aliases.put("ticket", Ticket.class);

    XStreamMarshaller marshaller = new XStreamMarshaller();
    marshaller.setAliases(aliases);

    return marshaller;
}

由于 ClassifierCompositeItemWriter 本身并未实现 ItemStream,Step 中需要通过 stream() 显式注册两个委托 Writer,框架才会调用它们的 open/close 生命周期方法。启动应用,生成文件如下:

ticket_output.txt

{"departureStation":"上海","arrivalStation":"蚌埠","price":220.00}
{"departureStation":"上海","arrivalStation":"杭州","price":75.20}
{"departureStation":"上海","arrivalStation":"昆山","price":19.00}

ticket_output.xml

<?xml version="1.0" encoding="UTF-8"?><tickets><ticket><departureStation>合肥</departureStation><arrivalStation>蚌埠</arrivalStation><price>60.00</price></ticket><ticket><departureStation>南京</departureStation><arrivalStation>蚌埠</arrivalStation><price>70.00</price></ticket></tickets>

示例代码:spring-batch-demo


一、Spring Batch 数据读取器

Spring Batch 的数据读取器,是通过接口 ItemReader 来实现的。针对常用的数据读取场景,Spring Batch 提供了丰富的组件支持(查看所有组件),本文介绍最常用的五个组件:

  • FlatFileItemReader:读取文本数据;
  • JdbcPagingItemReader:分页读取数据库的数据;
  • StaxEventItemReader:读取 XML 文件数据;
  • JsonItemReader:读取 JSON 文件数据;
  • MultiResourceItemReader:读取多文本数据;

二、简单使用

实体类 Ticket.class

import lombok.Data;
import java.math.BigDecimal;

@Data
public class Ticket {

    /**
     * 始发站
     */
    private String departureStation;

    /**
     * 到达站
     */
    private String arrivalStation;

    /**
     * 票价
     */
    private BigDecimal price;

    @Override
    public String toString() {
        return String.format("始发站: %s; 到达站: %s; 票价: %s", departureStation, arrivalStation, price);
    }
}
2.1 FlatFileItemReader-文本数据读取

文件 ticket.csv

合肥,蚌埠,60.00
南京,蚌埠,70.00
上海,蚌埠,220.00
上海,杭州,75.20
上海,昆山,19.00

可以看到,文本数据的每一行代表一个 Ticket 实体,对象属性之间以英文逗号分隔。通过 FlatFileItemReader,可以按照行将文本数据转换为 POJO 存储。

/**
 * Job
 */
@Bean
public Job testFlatItemFileReaderJob() {
    return jobBuilderFactory.get("testFlatItemFileReaderJob")
            .incrementer(new RunIdIncrementer())
            .flow(testFlatFileItemReaderStep)
            .end()
            .build();
}

/**
 * Step
 */
@Bean("testFlatFileItemReaderStep")
public Step testFlatFileItemReaderStep(PlatformTransactionManager transactionManager) {
    return stepBuilderFactory.get("testFlatFileItemReaderStep")
            .transactionManager(transactionManager)
            .<Ticket, Ticket>chunk(10)
            .reader(ticketFileItemReader)
            .writer(list -> list.forEach(System.out::println))
            .build();
}

/**
 * Reader
 */
@Bean("ticketFileItemReader")
public FlatFileItemReader<Ticket> ticketFileItemReader() {
    return new FlatFileItemReaderBuilder<Ticket>()
            .name("ticketFileItemReader")
            .resource(new ClassPathResource("ticket.csv"))
            .delimited()
            .names(new String[]{"departureStation", "arrivalStation", "price"})
            .fieldSetMapper(new BeanWrapperFieldSetMapper<Ticket>() {{
                setTargetType(Ticket.class);
            }})
            .build();
}

启动应用,控制台打印日志:

2022-06-02 13:50:23.538  INFO 77808 --- [restartedMain] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=testFlatItemFileReaderJob]] launched with the following parameters: [{run.id=2}]
2022-06-02 13:50:23.599 INFO 77808 --- [restartedMain] o.s.batch.core.job.SimpleStepHandler : Executing step: [testFlatFileItemReaderStep]
始发站: 合肥; 到达站: 蚌埠; 票价: 60.00
始发站: 南京; 到达站: 蚌埠; 票价: 70.00
始发站: 上海; 到达站: 蚌埠; 票价: 220.00
始发站: 上海; 到达站: 杭州; 票价: 75.20
始发站: 上海; 到达站: 昆山; 票价: 19.00
2022-06-02 13:50:23.680 INFO 77808 --- [restartedMain] o.s.batch.core.step.AbstractStep : Step: [testFlatFileItemReaderStep] executed in 79ms
2.2 JdbcPagingItemReader-数据库数据读取

MySQL 数据库,分页读取表 student 的数据,并打印数据内容。

/**
 * Job
 */
@Bean
public Job testDatabaseItemReaderJob() {
    return jobBuilderFactory.get("testDatabaseItemReaderJob")
            .incrementer(new RunIdIncrementer())
            .flow(testDatabaseItemReaderStep)
            .end()
            .build();
}

/**
 * Step
 */
@Bean("testDatabaseItemReaderStep")
public Step testDatabaseItemReaderStep(PlatformTransactionManager transactionManager) {
    return stepBuilderFactory.get("testDatabaseItemReaderStep")
            .transactionManager(transactionManager)
            .<Student, Student>chunk(10)
            .reader(studentItemReader)
            .writer(list -> list.forEach(System.out::println))
            .build();
}

/**
 * Reader
 */
@Bean("studentItemReader")
public JdbcPagingItemReader<Student> studentItemReader() {

    MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
    queryProvider.setSelectClause("student_id, name, address");
    queryProvider.setFromClause("from student_source");

    Map<String, Order> sortKeys = new HashMap<>(1);
    sortKeys.put("student_id", Order.ASCENDING);
    queryProvider.setSortKeys(sortKeys);

    return new JdbcPagingItemReaderBuilder<Student>()
            .name("studentItemReader")
            .dataSource(batchDemoDB)
            .fetchSize(1000)
            .rowMapper(new StudentRowMapper())
            .queryProvider(queryProvider)
            .build();
}

public class StudentRowMapper implements RowMapper<Student> {

    /**
     * Student 字段映射
     */
    @Override
    public Student mapRow(ResultSet rs, int rowNum) throws SQLException {

        Student student = new Student();
        student.setStudentId(rs.getLong("student_id"));
        student.setName(rs.getString("name"));
        student.setAddress(rs.getString("address"));
        return student;
    }
}

/**
 * MySQL 数据源配置
 */
@Primary
@Bean(name = "batchDemoDB")
@ConfigurationProperties(prefix = "spring.datasource.batch-demo")
public DataSource druidDataSource() {
    return DataSourceBuilder.create().type(HikariDataSource.class).build();
}
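
MySqlPagingQueryProvider 会基于排序键 student_id 生成分页 SQL:每页行数由 pageSize 决定(此处未显式设置,默认为 10;fetchSize 只是 JDBC 的抓取大小提示),翻页时以上一页排序键的最大值作为过滤条件,避免大偏移量扫描。生成的语句大致如下(草图):

-- 第一页
SELECT student_id, name, address FROM student_source ORDER BY student_id ASC LIMIT 10;
-- 后续页 (? 为上一页最后一条记录的 student_id)
SELECT student_id, name, address FROM student_source WHERE student_id > ? ORDER BY student_id ASC LIMIT 10;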

启动应用,控制台打印日志:

2022-06-02 14:00:19.010  INFO 67748 --- [restartedMain] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=testDatabaseItemReaderJob]] launched with the following parameters: [{run.id=2}]
2022-06-02 14:00:19.107 INFO 67748 --- [restartedMain] o.s.batch.core.job.SimpleStepHandler : Executing step: [testDatabaseItemReaderStep]
name: 张三1, address: 上海市1
name: 张三2, address: 上海市2
name: 张三3, address: 上海市3
name: 张三4, address: 上海市4
name: 张三5, address: 上海市5
name: 张三6, address: 上海市6
name: 张三7, address: 上海市7
name: 张三8, address: 上海市8
name: 张三9, address: 上海市9
name: 张三10, address: 上海市10
2022-06-02 14:00:19.284 INFO 67748 --- [restartedMain] o.s.batch.core.step.AbstractStep : Step: [testDatabaseItemReaderStep] executed in 176ms
2.3 StaxEventItemReader-XML 数据读取

文件 ticket.xml

<?xml version="1.0" encoding="UTF-8"?>
<tickets>
    <ticket>
        <departureStation>合肥</departureStation>
        <arrivalStation>蚌埠</arrivalStation>
        <price>60.00</price>
    </ticket>
    <ticket>
        <departureStation>南京</departureStation>
        <arrivalStation>蚌埠</arrivalStation>
        <price>70.00</price>
    </ticket>
    <ticket>
        <departureStation>上海</departureStation>
        <arrivalStation>蚌埠</arrivalStation>
        <price>220.00</price>
    </ticket>
    <ticket>
        <departureStation>上海</departureStation>
        <arrivalStation>杭州</arrivalStation>
        <price>75.20</price>
    </ticket>
    <ticket>
        <departureStation>上海</departureStation>
        <arrivalStation>昆山</arrivalStation>
        <price>19.00</price>
    </ticket>
</tickets>

可以看到,文件内容是多组 ticket 标签组成的,每一个标签代表一个 Ticket 实体;每个 ticket 标签,内含 3 个子标签,代表 Ticket 实体的 3 个属性值。

涉及到 XML 与 Object 的映射,因此需要引入 OXM 技术。推荐使用 Spring OXM,pom 依赖如下:

<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-oxm</artifactId>
</dependency>
<dependency>
    <groupId>com.thoughtworks.xstream</groupId>
    <artifactId>xstream</artifactId>
    <version>1.4.11.1</version>
</dependency>
/**
 * Job
 */
@Bean
public Job testXmlItemReaderJob() {
    return jobBuilderFactory.get("testXmlItemReaderJob")
            .incrementer(new RunIdIncrementer())
            .flow(testXmlItemReaderStep)
            .end()
            .build();
}

/**
 * Step
 */
@Bean("testXmlItemReaderStep")
public Step testXmlItemReaderStep(PlatformTransactionManager transactionManager) {
    return stepBuilderFactory.get("testXmlItemReaderStep")
            .transactionManager(transactionManager)
            .<Ticket, Ticket>chunk(10)
            .reader(ticketXmlItemReader)
            .writer(list -> list.forEach(System.out::println))
            .build();
}

/**
 * Reader
 */
@Bean("ticketXmlItemReader")
public StaxEventItemReader<Ticket> ticketXmlItemReader() {
    return new StaxEventItemReaderBuilder<Ticket>()
            .name("ticketXmlItemReader")
            .resource(new ClassPathResource("ticket.xml"))
            .addFragmentRootElements("ticket")
            .unmarshaller(ticketMarshaller)
            .build();
}

/**
 * 映射器
 */
@Bean("ticketMarshaller")
public XStreamMarshaller ticketMarshaller() {

    Map<String, Class<Ticket>> aliases = new HashMap<>(1);
    aliases.put("ticket", Ticket.class);

    XStreamMarshaller marshaller = new XStreamMarshaller();
    marshaller.setAliases(aliases);

    return marshaller;
}

启动应用,控制台打印日志:

2022-06-02 14:15:48.444  INFO 87024 --- [restartedMain] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=testXmlItemReaderJob]] launched with the following parameters: [{run.id=3}]
2022-06-02 14:15:48.503 INFO 87024 --- [restartedMain] o.s.batch.core.job.SimpleStepHandler : Executing step: [testXmlItemReaderStep]
始发站: 合肥; 到达站: 蚌埠; 票价: 60.00
始发站: 南京; 到达站: 蚌埠; 票价: 70.00
始发站: 上海; 到达站: 蚌埠; 票价: 220.00
始发站: 上海; 到达站: 杭州; 票价: 75.20
始发站: 上海; 到达站: 昆山; 票价: 19.00
2022-06-02 14:15:48.710 INFO 87024 --- [restartedMain] o.s.batch.core.step.AbstractStep : Step: [testXmlItemReaderStep] executed in 205ms
2.4 JsonItemReader-JSON 数据读取

文件 ticket.json

[
    {
        "departureStation": "合肥",
        "arrivalStation": "蚌埠",
        "price": "60.00"
    },
    {
        "departureStation": "南京",
        "arrivalStation": "蚌埠",
        "price": "70.00"
    },
    {
        "departureStation": "上海",
        "arrivalStation": "蚌埠",
        "price": "220.00"
    },
    {
        "departureStation": "上海",
        "arrivalStation": "杭州",
        "price": "75.20"
    },
    {
        "departureStation": "上海",
        "arrivalStation": "昆山",
        "price": "19.00"
    }
]
/**
 * Job
 */
@Bean
public Job testJsonItemReaderJob() {
    return jobBuilderFactory.get("testJsonItemReaderJob")
            .incrementer(new RunIdIncrementer())
            .flow(testJsonItemReaderStep)
            .end()
            .build();
}

/**
 * Step
 */
@Bean("testJsonItemReaderStep")
public Step testJsonItemReaderStep(PlatformTransactionManager transactionManager) {
    return stepBuilderFactory.get("testJsonItemReaderStep")
            .transactionManager(transactionManager)
            .<Ticket, Ticket>chunk(10)
            .reader(ticketJsonItemReader)
            .writer(list -> list.forEach(System.out::println))
            .build();
}

/**
 * Reader
 */
@Bean("ticketJsonItemReader")
public JsonItemReader<Ticket> ticketJsonItemReader() {
    return new JsonItemReaderBuilder<Ticket>()
            .name("ticketJsonItemReader")
            .jsonObjectReader(new JacksonJsonObjectReader<>(Ticket.class))
            .resource(new ClassPathResource("ticket.json"))
            .build();
}

启动应用,控制台打印日志:

2022-06-02 14:25:38.142  INFO 76544 --- [restartedMain] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=testJsonItemReaderJob]] launched with the following parameters: [{run.id=2}]
2022-06-02 14:25:38.211 INFO 76544 --- [restartedMain] o.s.batch.core.job.SimpleStepHandler : Executing step: [testJsonItemReaderStep]
始发站: 合肥; 到达站: 蚌埠; 票价: 60.00
始发站: 南京; 到达站: 蚌埠; 票价: 70.00
始发站: 上海; 到达站: 蚌埠; 票价: 220.00
始发站: 上海; 到达站: 杭州; 票价: 75.20
始发站: 上海; 到达站: 昆山; 票价: 19.00
2022-06-02 14:25:38.328 INFO 76544 --- [restartedMain] o.s.batch.core.step.AbstractStep : Step: [testJsonItemReaderStep] executed in 116ms
2.5 MultiResourceItemReader-多文本数据读取

多文本数据读取,与文本数据读取的原理一致,只是在其基础上做了一层代理。多文本数据读取要求每个文本的数据结构相同,如从 ticket-1.csv 和 ticket-2.csv 两个文件中读取数据:

合肥,蚌埠,60.00
南京,蚌埠,70.00
上海,蚌埠,220.00
上海,杭州,75.20
上海,昆山,19.00
/**
 * Job
 */
@Bean
public Job testMultiFileItemReaderJob() {
    return jobBuilderFactory.get("testMultiFileItemReaderJob")
            .incrementer(new RunIdIncrementer())
            .flow(testMultiFileItemReaderStep)
            .end()
            .build();
}

/**
 * Step
 */
@Bean("testMultiFileItemReaderStep")
public Step testMultiFileItemReaderStep(PlatformTransactionManager transactionManager) {
    return stepBuilderFactory.get("testMultiFileItemReaderStep")
            .transactionManager(transactionManager)
            .<Ticket, Ticket>chunk(10)
            .reader(ticketMultiFileItemReader)
            .writer(list -> list.forEach(System.out::println))
            .build();
}

/**
 * Proxy Reader
 */
@Bean("ticketMultiFileItemReader")
public MultiResourceItemReader<Ticket> ticketMultiFileItemReader() {

    // 资源文件
    Resource[] resources = new Resource[]{
            new ClassPathResource("ticket-1.csv"),
            new ClassPathResource("ticket-2.csv")};

    return new MultiResourceItemReaderBuilder<Ticket>()
            .name("ticketMultiFileItemReader")
            .delegate(commonTicketFileItemReader())
            .resources(resources)
            .build();
}

/**
 * Reader
 */
@Bean("commonTicketFileItemReader")
public FlatFileItemReader<Ticket> commonTicketFileItemReader() {
    return new FlatFileItemReaderBuilder<Ticket>()
            .name("commonTicketFileItemReader")
            .delimited()
            .names(new String[]{"departureStation", "arrivalStation", "price"})
            .fieldSetMapper(new BeanWrapperFieldSetMapper<Ticket>() {{
                setTargetType(Ticket.class);
            }})
            .build();
}

启动程序,控制台打印日志:

2022-06-02 14:37:49.693  INFO 86124 --- [restartedMain] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=testMultiFileItemReaderJob]] launched with the following parameters: [{run.id=2}]
2022-06-02 14:37:49.785 INFO 86124 --- [restartedMain] o.s.batch.core.job.SimpleStepHandler : Executing step: [testMultiFileItemReaderStep]
始发站: 合肥; 到达站: 蚌埠; 票价: 60.00
始发站: 南京; 到达站: 蚌埠; 票价: 70.00
始发站: 上海; 到达站: 蚌埠; 票价: 220.00
始发站: 上海; 到达站: 杭州; 票价: 75.20
始发站: 上海; 到达站: 昆山; 票价: 19.00
2022-06-02 14:37:49.944 INFO 86124 --- [restartedMain] o.s.batch.core.step.AbstractStep : Step: [testMultiFileItemReaderStep] executed in 157ms

示例代码:spring-batch-demo


一、Spring Batch 监听器

在批处理过程中,需要对一些关键节点(如启动、结束、抛异常等)添加额外的处理,这就要借助 Spring Batch 监听器。Spring Batch 提供了两个维度的监听器:

Job 层面:

  • JobExecutionListener: 在 Job 执行之前(beforeJob)、之后(afterJob)触发;

Step 层面:

  • ChunkListener: 在 Chunk 执行之前(beforeChunk)、之后(afterChunk)和异常后(afterChunkError)触发;
  • StepExecutionListener: 在 Step 执行之前(beforeStep)、之后(afterStep)触发;
  • ItemReadListener: 在 Read 执行之前(beforeRead)、之后(afterRead)和异常时(onReadError)触发;
  • ItemProcessListener: 在 Process 执行之前(beforeProcess)、之后(afterProcess)和异常时(onProcessError)触发;
  • ItemWriteListener: 在 Write 执行之前(beforeWrite)、之后(afterWrite)和异常时(onWriteError)触发;

二、简单使用

将文件 ticket.csv 中的内容,打印出来:

合肥,蚌埠,60.00
南京,蚌埠,70.00
上海,蚌埠,220.00
上海,杭州,75.20
上海,昆山,19.00

实体类:

import lombok.Data;
import java.math.BigDecimal;

@Data
public class Ticket {

    /**
     * 始发站
     */
    private String departureStation;

    /**
     * 到达站
     */
    private String arrivalStation;

    /**
     * 票价
     */
    private BigDecimal price;

    @Override
    public String toString() {
        return String.format("始发站: %s; 到达站: %s; 票价: %s", departureStation, arrivalStation, price);
    }
}
/**
 * Job
 */
@Bean
public Job testListenerJob() {
    return jobBuilderFactory.get("testListenerJob")
            .incrementer(new RunIdIncrementer())
            // job 监听器
            .listener(testJobListener)
            .flow(testListenerStep)
            .end()
            .build();
}

/**
 * Step
 */
@Bean("testListenerStep")
public Step testListenerStep(PlatformTransactionManager transactionManager) {
    return stepBuilderFactory.get("testListenerStep")
            // step 监听器
            .listener(testStepListener)
            .transactionManager(transactionManager)
            .<Ticket, Ticket>chunk(2)
            .faultTolerant()
            // chunk 监听器
            .listener(testChunkListener)
            .reader(ticketFileItemReader)
            // read 监听器
            .listener(testReadListener)
            .processor(ticketItemProcessor)
            // process 监听器
            .listener(testProcessListener)
            .writer(list -> list.forEach(System.out::println))
            // write 监听器
            .listener(testWriteListener)
            .build();
}

/**
 * Reader
 */
@Bean("ticketFileItemReader")
public FlatFileItemReader<Ticket> ticketFileItemReader() {
    return new FlatFileItemReaderBuilder<Ticket>()
            .name("ticketFileItemReader")
            .resource(new ClassPathResource("ticket.csv"))
            .delimited()
            .names(new String[]{"departureStation", "arrivalStation", "price"})
            .fieldSetMapper(new BeanWrapperFieldSetMapper<Ticket>() {{
                setTargetType(Ticket.class);
            }})
            .build();
}

/**
 * Processor
 */
@Component
public class TicketItemProcessor implements ItemProcessor<Ticket, Ticket> {

    private static final Logger log = LoggerFactory.getLogger(TicketItemProcessor.class);

    @Override
    public Ticket process(final Ticket ticketSource) throws Exception {

        final String departureStation = ticketSource.getDepartureStation();
        final String arrivalStation = ticketSource.getArrivalStation();
        final BigDecimal price = ticketSource.getPrice();

        final Ticket ticketTarget = new Ticket();
        ticketTarget.setDepartureStation(departureStation);
        ticketTarget.setArrivalStation(arrivalStation);
        ticketTarget.setPrice(price);

        return ticketTarget;
    }
}
/**
 * Job Listener
 */
@Component
public class TestJobListener extends JobExecutionListenerSupport {

    private static final Logger log = LoggerFactory.getLogger(TestJobListener.class);

    @Override
    public void beforeJob(JobExecution jobExecution) {
        log.info("before job: {}", jobExecution.getJobInstance().getJobName());
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        log.info("after job: {}", jobExecution.getJobInstance().getJobName());
    }
}

/**
 * Chunk Listener
 */
@Component
public class TestChunkListener extends ChunkListenerSupport {

    private static final Logger log = LoggerFactory.getLogger(TestChunkListener.class);

    @Override
    public void beforeChunk(ChunkContext context) {
        log.info("before chunk: {}", context.getStepContext().getStepName());
    }

    @Override
    public void afterChunk(ChunkContext context) {
        log.info("after chunk: {}", context.getStepContext().getStepName());
    }

    @Override
    public void afterChunkError(ChunkContext context) {
        log.info("after chunk error: {}", context.getStepContext().getStepName());
    }
}

/**
 * Read Listener
 */
@Component
public class TestReadListener implements ItemReadListener<Ticket> {

    private static final Logger log = LoggerFactory.getLogger(TestReadListener.class);

    @Override
    public void beforeRead() {
        log.info("before read");
    }

    @Override
    public void afterRead(Ticket item) {
        log.info("after read: {}", item);
    }

    @Override
    public void onReadError(Exception ex) {
        log.info("read item error: {}", ex.getMessage(), ex);
    }
}

/**
 * Process Listener
 */
@Component
public class TestProcessListener implements ItemProcessListener<Ticket, Ticket> {

    private static final Logger log = LoggerFactory.getLogger(TestProcessListener.class);

    @Override
    public void beforeProcess(Ticket item) {
        log.info("before process: {}", item);
    }

    @Override
    public void afterProcess(Ticket item, Ticket result) {
        log.info("after process: {}", item);
    }

    @Override
    public void onProcessError(Ticket item, Exception e) {
        log.info("process: {} error: {}", item, e.getMessage(), e);
    }
}

/**
 * Write Listener
 */
@Component
public class TestWriteListener implements ItemWriteListener<Ticket> {

    private static final Logger log = LoggerFactory.getLogger(TestWriteListener.class);

    @Override
    public void beforeWrite(List<? extends Ticket> items) {
        log.info("before write: {}", items);
    }

    @Override
    public void afterWrite(List<? extends Ticket> items) {
        log.info("after write: {}", items);
    }

    @Override
    public void onWriteError(Exception exception, List<? extends Ticket> items) {
        log.info("write item error: {}", exception.getMessage(), exception);
    }
}
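
Step 配置中引用的 testStepListener(对应日志中的 before step/after step)在上面的清单中没有贴出;下面按日志输出补充一份草图,具体实现以示例仓库为准:

/**
 * Step Listener
 */
@Component
public class TestStepListener implements StepExecutionListener {

    private static final Logger log = LoggerFactory.getLogger(TestStepListener.class);

    @Override
    public void beforeStep(StepExecution stepExecution) {
        log.info("before step: {}", stepExecution.getStepName());
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        log.info("after step: {}", stepExecution.getStepName());
        // 不改变 Step 的退出状态
        return stepExecution.getExitStatus();
    }
}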

启动应用,打印日志:

2022-06-12 19:31:13.774  INFO 33680 --- [restartedMain] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=testListenerJob]] launched with the following parameters: [{run.id=4}]
2022-06-12 19:31:13.820 INFO 33680 --- [restartedMain] c.e.s.c.listener.job.TestJobListener : before job: testListenerJob
2022-06-12 19:31:13.858 INFO 33680 --- [restartedMain] o.s.batch.core.job.SimpleStepHandler : Executing step: [testListenerStep]
2022-06-12 19:31:13.867 INFO 33680 --- [restartedMain] c.e.s.c.listener.step.TestStepListener : before step: testListenerStep
2022-06-12 19:31:13.889 INFO 33680 --- [restartedMain] c.e.s.c.l.chunk.TestChunkListener : before chunk: testListenerStep
2022-06-12 19:31:13.891 INFO 33680 --- [restartedMain] c.e.s.c.l.reader.TestReadListener : before read
2022-06-12 19:31:13.905 INFO 33680 --- [restartedMain] c.e.s.c.l.reader.TestReadListener : after read: 始发站: 合肥; 到达站: 蚌埠; 票价: 60.00
2022-06-12 19:31:13.906 INFO 33680 --- [restartedMain] c.e.s.c.l.reader.TestReadListener : before read
2022-06-12 19:31:13.907 INFO 33680 --- [restartedMain] c.e.s.c.l.reader.TestReadListener : after read: 始发站: 南京; 到达站: 蚌埠; 票价: 70.00
2022-06-12 19:31:13.911 INFO 33680 --- [restartedMain] c.e.s.c.l.processor.TestProcessListener : before process: 始发站: 合肥; 到达站: 蚌埠; 票价: 60.00
2022-06-12 19:31:13.912 INFO 33680 --- [restartedMain] c.e.s.c.l.processor.TestProcessListener : after process: 始发站: 合肥; 到达站: 蚌埠; 票价: 60.00
2022-06-12 19:31:13.912 INFO 33680 --- [restartedMain] c.e.s.c.l.processor.TestProcessListener : before process: 始发站: 南京; 到达站: 蚌埠; 票价: 70.00
2022-06-12 19:31:13.912 INFO 33680 --- [restartedMain] c.e.s.c.l.processor.TestProcessListener : after process: 始发站: 南京; 到达站: 蚌埠; 票价: 70.00
2022-06-12 19:31:13.913 INFO 33680 --- [restartedMain] c.e.s.c.l.writer.TestWriteListener : before write: [始发站: 合肥; 到达站: 蚌埠; 票价: 60.00, 始发站: 南京; 到达站: 蚌埠; 票价: 70.00]
始发站: 合肥; 到达站: 蚌埠; 票价: 60.00
始发站: 南京; 到达站: 蚌埠; 票价: 70.00
2022-06-12 19:31:13.914 INFO 33680 --- [restartedMain] c.e.s.c.l.writer.TestWriteListener : after write: [始发站: 合肥; 到达站: 蚌埠; 票价: 60.00, 始发站: 南京; 到达站: 蚌埠; 票价: 70.00]
2022-06-12 19:31:13.928 INFO 33680 --- [restartedMain] c.e.s.c.l.chunk.TestChunkListener : after chunk: testListenerStep
2022-06-12 19:31:13.929 INFO 33680 --- [restartedMain] c.e.s.c.l.chunk.TestChunkListener : before chunk: testListenerStep
2022-06-12 19:31:13.929 INFO 33680 --- [restartedMain] c.e.s.c.l.reader.TestReadListener : before read
2022-06-12 19:31:13.930 INFO 33680 --- [restartedMain] c.e.s.c.l.reader.TestReadListener : after read: 始发站: 上海; 到达站: 蚌埠; 票价: 220.00
2022-06-12 19:31:13.930 INFO 33680 --- [restartedMain] c.e.s.c.l.reader.TestReadListener : before read
2022-06-12 19:31:13.931 INFO 33680 --- [restartedMain] c.e.s.c.l.reader.TestReadListener : after read: 始发站: 上海; 到达站: 杭州; 票价: 75.20
2022-06-12 19:31:13.931 INFO 33680 --- [restartedMain] c.e.s.c.l.processor.TestProcessListener : before process: 始发站: 上海; 到达站: 蚌埠; 票价: 220.00
2022-06-12 19:31:13.931 INFO 33680 --- [restartedMain] c.e.s.c.l.processor.TestProcessListener : after process: 始发站: 上海; 到达站: 蚌埠; 票价: 220.00
2022-06-12 19:31:13.931 INFO 33680 --- [restartedMain] c.e.s.c.l.processor.TestProcessListener : before process: 始发站: 上海; 到达站: 杭州; 票价: 75.20
2022-06-12 19:31:13.932 INFO 33680 --- [restartedMain] c.e.s.c.l.processor.TestProcessListener : after process: 始发站: 上海; 到达站: 杭州; 票价: 75.20
2022-06-12 19:31:13.932 INFO 33680 --- [restartedMain] c.e.s.c.l.writer.TestWriteListener : before write: [始发站: 上海; 到达站: 蚌埠; 票价: 220.00, 始发站: 上海; 到达站: 杭州; 票价: 75.20]
始发站: 上海; 到达站: 蚌埠; 票价: 220.00
始发站: 上海; 到达站: 杭州; 票价: 75.20
2022-06-12 19:31:13.932 INFO 33680 --- [restartedMain] c.e.s.c.l.writer.TestWriteListener : after write: [始发站: 上海; 到达站: 蚌埠; 票价: 220.00, 始发站: 上海; 到达站: 杭州; 票价: 75.20]
2022-06-12 19:31:13.943 INFO 33680 --- [restartedMain] c.e.s.c.l.chunk.TestChunkListener : after chunk: testListenerStep
2022-06-12 19:31:13.944 INFO 33680 --- [restartedMain] c.e.s.c.l.chunk.TestChunkListener : before chunk: testListenerStep
2022-06-12 19:31:13.944 INFO 33680 --- [restartedMain] c.e.s.c.l.reader.TestReadListener : before read
2022-06-12 19:31:13.945 INFO 33680 --- [restartedMain] c.e.s.c.l.reader.TestReadListener : after read: 始发站: 上海; 到达站: 昆山; 票价: 19.00
2022-06-12 19:31:13.945 INFO 33680 --- [restartedMain] c.e.s.c.l.reader.TestReadListener : before read
2022-06-12 19:31:13.945 INFO 33680 --- [restartedMain] c.e.s.c.l.processor.TestProcessListener : before process: 始发站: 上海; 到达站: 昆山; 票价: 19.00
2022-06-12 19:31:13.945 INFO 33680 --- [restartedMain] c.e.s.c.l.processor.TestProcessListener : after process: 始发站: 上海; 到达站: 昆山; 票价: 19.00
2022-06-12 19:31:13.946 INFO 33680 --- [restartedMain] c.e.s.c.l.writer.TestWriteListener : before write: [始发站: 上海; 到达站: 昆山; 票价: 19.00]
始发站: 上海; 到达站: 昆山; 票价: 19.00
2022-06-12 19:31:13.946 INFO 33680 --- [restartedMain] c.e.s.c.l.writer.TestWriteListener : after write: [始发站: 上海; 到达站: 昆山; 票价: 19.00]
2022-06-12 19:31:13.959 INFO 33680 --- [restartedMain] c.e.s.c.l.chunk.TestChunkListener : after chunk: testListenerStep
2022-06-12 19:31:13.959 INFO 33680 --- [restartedMain] c.e.s.c.listener.step.TestStepListener : after step: testListenerStep
2022-06-12 19:31:13.962 INFO 33680 --- [restartedMain] o.s.batch.core.step.AbstractStep : Step: [testListenerStep] executed in 104ms
2022-06-12 19:31:13.978 INFO 33680 --- [restartedMain] c.e.s.c.listener.job.TestJobListener : after job: testListenerJob
2022-06-12 19:31:13.997 INFO 33680 --- [restartedMain] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=testListenerJob]] completed with the following parameters: [{run.id=4}] and the following status: [COMPLETED] in 178ms

从日志可以看出:

  • Job 、Step 监听器贯穿任务的始终;

  • 每一个 chunk 中,执行 2 次读、2 次处理、1 次写;

  • 每一次 read 过程,触发 beforeRead()、afterRead();

  • 每一次 process 过程,触发 beforeProcess()、afterProcess();

  • 每一次 write 过程,触发 beforeWrite()、afterWrite();

示例代码:spring-batch-demo


一、项目简介

上一节介绍了 Spring Batch 的基础架构和设计原理,本节将通过一个简单的批处理任务来学习如何使用 Spring Batch。

现在需要将逗号分隔值文件 sample-data.csv 中的数据,按照姓氏、名称拆分,导入数据表 person 中。整个 BatchProcessJob 只有一个 Step,分为三个部分:解析 csv 文件;将文件数据转化为 Person 对象;将 Person 对象信息导入数据表 batch-demo.person。

二、项目搭建与配置

使用 Spring 应用构建工具即可创建项目。我的示例项目地址:spring-batch-demo

三、创建数据表

Spring Batch 的组件 JobRepository 专门负责与数据库打交道,记录整个批处理中的增加、检索、更新、删除动作。也就是说,Spring Batch 是依赖数据库进行管理的。数据表创建脚本可在仓库中找到,MySQL 数据库的脚本文件为 schema-mysql.sql。框架依赖表建好后,不要忘了创建表 batch-demo.person:

USE `batch-demo`;
CREATE TABLE `person` (
    `person_id` bigint(30) unsigned NOT NULL AUTO_INCREMENT,
    `first_name` varchar(10) COLLATE utf8mb4_general_ci DEFAULT NULL,
    `last_name` varchar(20) COLLATE utf8mb4_general_ci DEFAULT NULL,
    PRIMARY KEY (`person_id`)
) ENGINE=InnoDB AUTO_INCREMENT=142 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT='人员信息表';
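
后文代码中反复用到的实体类 Person 原文未给出;按表结构与字段映射推断,大致如下(草图,是否使用 Lombok、是否含 personId 等细节以示例仓库为准):

import lombok.Data;

@Data
public class Person {

    /**
     * 主键, 自增
     */
    private Long personId;

    /**
     * 名
     */
    private String firstName;

    /**
     * 姓
     */
    private String lastName;
}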

四、批处理任务配置

根据批处理框架的运作流程,我们做出如下配置:

4.1 csv 文件内容读取器:

import com.example.springbatchdemo.entity.Person;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;

@Configuration
public class CustomItemReader {

    @Bean("personItemReader")
    public FlatFileItemReader<Person> personItemReader() {
        return new FlatFileItemReaderBuilder<Person>()
                .name("personItemReader")
                .resource(new ClassPathResource("sample-data.csv"))
                .delimited()
                .names(new String[]{"firstName", "lastName"})
                .fieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{
                    setTargetType(Person.class);
                }})
                .build();
    }
}

4.2 解析数据处理器:

import com.example.springbatchdemo.entity.Person;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.context.annotation.Configuration;

@Configuration
public class PersonItemProcessor implements ItemProcessor<Person, Person> {

    private static final Logger log = LoggerFactory.getLogger(PersonItemProcessor.class);

    @Override
    public Person process(final Person person) throws Exception {

        final String firstName = person.getFirstName();
        final String lastName = person.getLastName();

        final Person transformedPerson = new Person();
        transformedPerson.setFirstName(firstName);
        transformedPerson.setLastName(lastName);

        log.info("Converting ({}) into ({})", person, transformedPerson);

        return transformedPerson;
    }
}

4.3 Person 对象写入器:

import com.example.springbatchdemo.entity.Person;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.sql.DataSource;

@Configuration
public class CustomItemWriter {

    @Autowired
    @Qualifier(value = "batchDemoDB")
    private DataSource batchDemoDB;

    @Bean("personItemWriter")
    public JdbcBatchItemWriter<Person> personItemWriter() {

        return new JdbcBatchItemWriterBuilder<Person>()
                .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
                .sql("INSERT INTO person (first_name, last_name) VALUES (:firstName, :lastName)")
                .dataSource(batchDemoDB)
                .build();
    }
}

4.4 MySQL 数据源配置:

import com.zaxxer.hikari.HikariDataSource;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import javax.sql.DataSource;

@Configuration
public class DataSourceConfig {

    @Primary
    @Bean(name = "batchDemoDB")
    // 数据源配置参数识别前缀, 根据具体配置来设定
    @ConfigurationProperties(prefix = "spring.datasource.batch-demo")
    public DataSource druidDataSource() {
        // 使用 SpringBoot 默认的数据源 HikariDataSource
        return DataSourceBuilder.create().type(HikariDataSource.class).build();
    }
}
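
与前缀 spring.datasource.batch-demo 对应的 application.properties 配置大致如下(连接参数均为占位值,按实际环境修改)。注意:@ConfigurationProperties 直接绑定到 HikariDataSource 时,URL 属性名是 jdbc-url 而不是 url:

# MySQL 数据源 (batch-demo)
spring.datasource.batch-demo.jdbc-url=jdbc:mysql://localhost:3306/batch-demo?useUnicode=true&characterEncoding=utf8
spring.datasource.batch-demo.username=root
spring.datasource.batch-demo.password=123456
spring.datasource.batch-demo.driver-class-name=com.mysql.cj.jdbc.Driver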

4.5 Step 配置

import com.example.springbatchdemo.component.processor.PersonItemProcessor;
import com.example.springbatchdemo.entity.Person;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class BatchProcessPersonStep {

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Autowired
    @Qualifier(value = "personItemReader")
    private FlatFileItemReader<Person> personItemReader;

    @Autowired
    @Qualifier(value = "personItemWriter")
    private JdbcBatchItemWriter<Person> personItemWriter;

    @Autowired
    private PersonItemProcessor personItemProcessor;

    @Bean("batchProcessPersonStep1")
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .<Person, Person>chunk(10)
                .reader(personItemReader)
                .processor(personItemProcessor)
                .writer(personItemWriter)
                .build();
    }
}

4.6 Job 配置

import com.example.springbatchdemo.component.listener.BatchProcessPersonCompletionListener;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
public class BatchProcessPersonJob {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    @Qualifier(value = "batchProcessPersonStep1")
    private Step batchProcessPersonStep1;

    @Autowired
    private BatchProcessPersonCompletionListener batchProcessPersonCompletionListener;

    @Bean
    public Job importUserJob() {
        return jobBuilderFactory.get("importUserJob")
                .preventRestart()
                .incrementer(new RunIdIncrementer())
                .listener(batchProcessPersonCompletionListener)
                .flow(batchProcessPersonStep1)
                .end()
                .build();
    }
}

4.7 任务状态监听器

import com.example.springbatchdemo.entity.Person;
import com.example.springbatchdemo.mapper.PersonMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.listener.JobExecutionListenerSupport;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;

@Component
public class BatchProcessPersonCompletionListener extends JobExecutionListenerSupport {

    private static final Logger log = LoggerFactory.getLogger(BatchProcessPersonCompletionListener.class);

    @Autowired
    private PersonMapper personMapper;

    @Override
    public void afterJob(JobExecution jobExecution) {
        if (BatchStatus.COMPLETED.equals(jobExecution.getStatus())) {
            log.info("Job finished! Time to verify the results");

            // spring-mybatis 查询所有的人员信息
            List<Person> personList = personMapper.queryAll();
            personList.forEach(person -> log.info("Found <{}> in the database.", person));
        }
    }
}

4.8 csv 测试数据文件配置
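
原文此处缺少文件内容。sample-data.csv 按 Reader 的字段顺序,每行为 firstName,lastName 两列,内容大致如下(数据为示意,以示例仓库为准):

Jill,Doe
Joe,Doe
Justin,Doe
Jane,Doe
John,Doe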

五、执行批处理任务

运行 SpringBoot 项目,JobLauncher 会自动发起任务 importUserJob。

查看表 batch-demo.person,csv 文件内的测试数据已成功导入数据表!