一、Spring Batch 性能优化指标
Spring Batch 是一款伸缩性非常好的批处理工具,既可以处理简单的任务,也可以处理复杂的、高容量的任务。在性能调优方面,Spring Batch 提供了丰富的接口支持,各项优化指标可归纳如下:
- 多线程
Step
:由独立线程执行提交块(a chunk of items)的输入、处理和输出过程;
- 并行化
Step
:对于可并行处理的 Step
,交由不同的线程去处理;
- 分片化
Step
:通过 SPI(Serial Peripheral Interface)
,对 Step
分片执行;
- 远程组块:对于输入无性能瓶颈,但处理和输出有性能瓶颈的任务,交由远程组块执行;
详见Spring文档。
二、多线程 Step
配置
Spring Batch 执行一个 Step
,会按照 chunk
配置的数量分批次提交。对于多线程 Step
,由线程池去处理任务批次。因此,每个 chunk
都不用串行等待,这大大地提高了批处理性能。
配置多线程 Step
非常简单,可以通过 xml
或接口来配置。以接口配置为例:
1 2 3 4 5 6 7 8 9 10 11
| @Bean public Step sampleStep(TaskExecutor taskExecutor) { return this.stepBuilderFactory.get("sampleStep") .<String, String>chunk(10) .reader(itemReader()) .writer(itemWriter()) .taskExecutor(taskExecutor) .throttleLimit(20) .build(); }
|
此外,在配置多线程 Step
时,我们需要考虑得更多:
- 线程池:推荐使用
Spring
线程池 ThreadPoolTaskExecutor
,兼容性好;
- 线程安全:输入器和输出器必须是线程安全的,否则可能会导致重复任务、脏数据等问题;
- 框架节流:Spring Batch 自带节流器,默认最多可处理 4 个小任务,因此需要重新配置;
三、批处理配置
通过 Spring Batch 应用,迁移 100 万条数据。相关配置如下:
3.1 数据读取器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| import com.example.springbatchdemo.component.reader.rowmapper.StudentRowMapper; import com.example.springbatchdemo.entity.Student; import org.springframework.batch.item.database.JdbcPagingItemReader; import org.springframework.batch.item.database.Order; import org.springframework.batch.item.database.builder.JdbcPagingItemReaderBuilder; import org.springframework.batch.item.database.support.MySqlPagingQueryProvider; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import javax.sql.DataSource; import java.util.HashMap; import java.util.Map;
@Configuration public class CustomItemReader {
@Autowired @Qualifier(value = "batchDemoDB") private DataSource batchDemoDB;
@Bean("studentItemReader") public JdbcPagingItemReader<Student> studentItemReader() {
MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider(); queryProvider.setSelectClause("student_id, name, address"); queryProvider.setFromClause("from student_source");
Map<String, Order> sortKeys = new HashMap<>(1); sortKeys.put("student_id", Order.ASCENDING); queryProvider.setSortKeys(sortKeys);
return new JdbcPagingItemReaderBuilder<Student>() .name("studentItemReader") .dataSource(batchDemoDB) .fetchSize(1000) .rowMapper(new StudentRowMapper()) .queryProvider(queryProvider) .build(); } }
|
3.2 数据映射器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| import com.example.springbatchdemo.entity.Student; import org.springframework.jdbc.core.RowMapper; import java.sql.ResultSet; import java.sql.SQLException;
public class StudentRowMapper implements RowMapper<Student> {
@Override public Student mapRow(ResultSet rs, int rowNum) throws SQLException { Student student = new Student(); student.setStudentId(rs.getLong("student_id")); student.setName(rs.getString("name")); student.setAddress(rs.getString("address")); return student; } }
|
3.3 数据处理器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| import com.example.springbatchdemo.entity.Student; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.batch.item.ItemProcessor; import org.springframework.context.annotation.Configuration;
@Configuration public class StudentItemProcessor implements ItemProcessor<Student, Student> {
private static final Logger log = LoggerFactory.getLogger(StudentItemProcessor.class);
@Override public Student process(final Student studentSource) throws Exception {
final Long studentId = studentSource.getStudentId(); final String name = studentSource.getName(); final String address = studentSource.getAddress();
final Student studentTarget = new Student(); studentTarget.setStudentId(studentId); studentTarget.setName(name); studentTarget.setAddress(address);
log.info("Converting ({}) into ({})", studentSource, studentTarget);
return studentTarget; } }
|
3.4 数据写入器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| import com.example.springbatchdemo.entity.Student; import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider; import org.springframework.batch.item.database.JdbcBatchItemWriter; import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import javax.sql.DataSource;
@Configuration public class CustomItemWriter {
@Autowired @Qualifier(value = "batchDemoDB") private DataSource batchDemoDB;
@Bean("studentItemWriter") public JdbcBatchItemWriter<Student> studentItemWriter() {
return new JdbcBatchItemWriterBuilder<Student>() .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>()) .sql("INSERT INTO student_target (student_id, name, address) VALUES (:studentId, :name, :address)") .dataSource(batchDemoDB) .build(); } }
|
3.5 Step 配置-单线程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| import com.example.springbatchdemo.component.processor.StudentItemProcessor; import com.example.springbatchdemo.entity.Student; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.item.database.JdbcBatchItemWriter; import org.springframework.batch.item.database.JdbcPagingItemReader; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
@Configuration public class BatchProcessStudentStep {
@Autowired public StepBuilderFactory stepBuilderFactory;
@Autowired @Qualifier(value = "studentItemReader") private JdbcPagingItemReader<Student> studentItemReader;
@Autowired @Qualifier(value = "studentItemWriter") private JdbcBatchItemWriter<Student> studentItemWriter;
@Autowired private StudentItemProcessor studentItemProcessor;
@Bean("batchProcessStudentStep1") public Step step1() { return stepBuilderFactory.get("step1") .<Student, Student>chunk(2000) .reader(studentItemReader) .processor(studentItemProcessor) .writer(studentItemWriter) .build(); } }
|
3.6 Step 配置-多线程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| import com.example.springbatchdemo.component.processor.StudentItemProcessor; import com.example.springbatchdemo.entity.Student; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.item.database.JdbcBatchItemWriter; import org.springframework.batch.item.database.JdbcPagingItemReader; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import static com.example.springbatchdemo.config.ExecutorConfig.TASK_EXECUTOR;
@Configuration public class BatchProcessStudentStep {
@Autowired public StepBuilderFactory stepBuilderFactory;
@Autowired @Qualifier(value = "studentItemReader") private JdbcPagingItemReader<Student> studentItemReader;
@Autowired @Qualifier(value = "studentItemWriter") private JdbcBatchItemWriter<Student> studentItemWriter;
@Autowired private StudentItemProcessor studentItemProcessor;
@Autowired @Qualifier(value = TASK_EXECUTOR) private ThreadPoolTaskExecutor taskExecutor;
@Bean("batchProcessStudentStep1") public Step step1() { return stepBuilderFactory.get("step1") .<Student, Student>chunk(2000) .reader(studentItemReader) .processor(studentItemProcessor) .writer(studentItemWriter) .taskExecutor(taskExecutor) .throttleLimit(30) .build(); } }
|
3.7 Job 配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| import com.example.springbatchdemo.component.listener.BatchProcessStudentCompletionListener; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.launch.support.RunIdIncrementer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
@Configuration @EnableBatchProcessing public class BatchProcessStudentJob {
@Autowired public JobBuilderFactory jobBuilderFactory;
@Autowired @Qualifier(value = "batchProcessStudentStep1") private Step batchProcessStudentStep1;
@Autowired private BatchProcessStudentCompletionListener batchProcessStudentCompletionListener;
@Bean public Job transferStudentJob() { return jobBuilderFactory.get("transferStudentJob") .incrementer(new RunIdIncrementer()) .listener(batchProcessStudentCompletionListener) .flow(batchProcessStudentStep1) .end() .build(); } }
|
3.8 MySQL
数据源配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| import com.zaxxer.hikari.HikariDataSource; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.jdbc.DataSourceBuilder; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; import javax.sql.DataSource;
@Configuration public class DataSourceConfig {
@Primary @Bean(name = "batchDemoDB") @ConfigurationProperties(prefix = "spring.datasource.batch-demo") public DataSource druidDataSource() { return DataSourceBuilder.create().type(HikariDataSource.class).build(); } }
|
3.9 线程池配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.ThreadPoolExecutor;
@Configuration public class ExecutorConfig {
public static final String TASK_EXECUTOR = "taskExecutor";
@Bean(TASK_EXECUTOR) public ThreadPoolTaskExecutor taskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(20); executor.setMaxPoolSize(30); executor.setQueueCapacity(10); executor.setKeepAliveSeconds(60); executor.setThreadNamePrefix("common-async-executor-"); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.setWaitForTasksToCompleteOnShutdown(true); executor.setAwaitTerminationSeconds(60); return executor; } }
|
四、批处理性能测试
4.1 单线程 Step
启动批处理任务,同步 100 万条数据。执行结果如下:
总耗时:313 秒
4.2 多线程 Step
启动批处理任务,同步 100 万条数据。执行结果如下:
总耗时:81 秒
性能提升超300%
五、总结
多线程 Step
将 chunk
任务交给线程池异步执行,可以显著地提升批处理的性能。但在多线程场景下,我们要了解 Spring Batch 的基础架构,避免并发导致的重复任务、脏数据等问题。
示例代码:spring-batch-demo