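1. Project Overview
The previous section covered the basic architecture and design principles of Spring Batch. This section walks through a simple batch job to show how Spring Batch is used in practice.
The task is to take the data in the comma-separated values file sample-data.csv, split each record into first name and last name, and import it into the table person. As the diagram shows, BatchProcessJob has a single Step made up of three parts: parse the csv file; convert each record into a Person object; write the Person objects into the table batch-demo.person.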
2. Project Setup and Configuration
The project can be created with Spring's application builder tool. My application is hosted at: spring-batch-demo
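3. Creating the Database Tables
Spring Batch's JobRepository component is dedicated to talking to the database: it handles the create, retrieve, update, and delete operations recorded throughout batch processing. In other words, Spring Batch relies on a database for its bookkeeping. The table creation scripts can be found in the repository; for MySQL, the script file is schema-mysql.sql. Once the framework's own tables are in place, remember to create the table batch-demo.person as well.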
    -- The database name contains a hyphen, so it has to be quoted with backticks.
    USE `batch-demo`;

    CREATE TABLE `person` (
        `person_id`  bigint(30) unsigned NOT NULL AUTO_INCREMENT,
        `first_name` varchar(10) COLLATE utf8mb4_general_ci DEFAULT NULL,
        `last_name`  varchar(20) COLLATE utf8mb4_general_ci DEFAULT NULL,
        PRIMARY KEY (`person_id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=142 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT='Person information table';
4. Batch Job Configuration
Following the processing flow of the batch framework, we configure the components below:
4.1 csv file reader:
    import com.example.springbatchdemo.entity.Person;
    import org.springframework.batch.item.file.FlatFileItemReader;
    import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
    import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.core.io.ClassPathResource;

    @Configuration
    public class CustomItemReader {

        // Reads sample-data.csv from the classpath, one record per line.
        @Bean("personItemReader")
        public FlatFileItemReader<Person> personItemReader() {
            return new FlatFileItemReaderBuilder<Person>()
                    .name("personItemReader")
                    .resource(new ClassPathResource("sample-data.csv"))
                    // Comma-delimited columns, mapped by position to these property names.
                    .delimited()
                    .names(new String[]{"firstName", "lastName"})
                    // Populate a Person bean through its setters.
                    .fieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{
                        setTargetType(Person.class);
                    }})
                    .build();
        }
    }
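To make the reader's job concrete, here is a minimal plain-Java sketch of what delimited-line mapping amounts to: split a line on commas and assign the tokens, by position, to firstName and lastName. It does not use Spring Batch; the class DelimitedLineSketch, its nested Person stand-in, and the sample names are assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java illustration of delimited-line mapping; not Spring Batch code. */
class DelimitedLineSketch {

    /** Hypothetical stand-in for the article's Person entity. */
    static final class Person {
        final String firstName;
        final String lastName;

        Person(String firstName, String lastName) {
            this.firstName = firstName;
            this.lastName = lastName;
        }
    }

    /** Splits one "firstName,lastName" line and maps the tokens by position. */
    static Person mapLine(String line) {
        String[] tokens = line.split(",", -1); // -1 keeps trailing empty fields
        if (tokens.length != 2) {
            throw new IllegalArgumentException(
                    "Expected 2 fields but got " + tokens.length + ": " + line);
        }
        return new Person(tokens[0].trim(), tokens[1].trim());
    }

    /** Maps every line in order. */
    static List<Person> mapLines(List<String> lines) {
        List<Person> people = new ArrayList<>();
        for (String line : lines) {
            people.add(mapLine(line));
        }
        return people;
    }

    public static void main(String[] args) {
        // Made-up sample rows, not taken from the article's data file.
        for (Person p : mapLines(Arrays.asList("Jill,Doe", "Joe,Doe"))) {
            System.out.println(p.firstName + " " + p.lastName);
        }
    }
}
```

A line with the wrong number of fields fails fast here; how a real job should react to malformed records (skip, fail, log) is a separate design decision.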
4.2 Data processor:
    import com.example.springbatchdemo.entity.Person;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.batch.item.ItemProcessor;
    import org.springframework.stereotype.Component;

    // Registered as a plain component; this class declares no @Bean methods,
    // so @Configuration is not needed.
    @Component
    public class PersonItemProcessor implements ItemProcessor<Person, Person> {

        private static final Logger log = LoggerFactory.getLogger(PersonItemProcessor.class);

        @Override
        public Person process(final Person person) throws Exception {
            final String firstName = person.getFirstName();
            final String lastName = person.getLastName();

            // Copy the fields into a new Person; the values themselves are not changed.
            final Person transformedPerson = new Person();
            transformedPerson.setFirstName(firstName);
            transformedPerson.setLastName(lastName);

            log.info("Converting ({}) into ({})", person, transformedPerson);

            return transformedPerson;
        }
    }
4.3 Person object writer:
    import com.example.springbatchdemo.entity.Person;
    import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
    import org.springframework.batch.item.database.JdbcBatchItemWriter;
    import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    import javax.sql.DataSource;

    @Configuration
    public class CustomItemWriter {

        @Autowired
        @Qualifier(value = "batchDemoDB")
        private DataSource batchDemoDB;

        @Bean("personItemWriter")
        public JdbcBatchItemWriter<Person> personItemWriter() {
            return new JdbcBatchItemWriterBuilder<Person>()
                    // Resolve :firstName and :lastName from the Person bean's properties.
                    .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
                    .sql("INSERT INTO person (first_name, last_name) VALUES (:firstName, :lastName)")
                    .dataSource(batchDemoDB)
                    .build();
        }
    }
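The writer's SQL uses named placeholders (:firstName, :lastName) that are filled from the Person bean's properties. The following plain-Java sketch illustrates that idea with the JDK's java.beans.Introspector; it is an approximation of the concept, not the way Spring implements it. The class NamedParameterSketch, its nested Person stand-in, and the sample values are hypothetical.

```java
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Plain-Java illustration of binding :name placeholders to bean properties. */
class NamedParameterSketch {

    /** Hypothetical stand-in for the article's Person entity. */
    public static final class Person {
        private final String firstName;
        private final String lastName;

        public Person(String firstName, String lastName) {
            this.firstName = firstName;
            this.lastName = lastName;
        }

        public String getFirstName() { return firstName; }
        public String getLastName() { return lastName; }
    }

    /** Returns placeholder name -> property value, in order of first appearance. */
    static Map<String, Object> resolve(String sql, Object bean) {
        try {
            // Index the bean's readable properties by name.
            Map<String, PropertyDescriptor> props = new LinkedHashMap<>();
            for (PropertyDescriptor pd
                    : Introspector.getBeanInfo(bean.getClass()).getPropertyDescriptors()) {
                props.put(pd.getName(), pd);
            }
            // Find each :placeholder and read the matching property through its getter.
            Map<String, Object> values = new LinkedHashMap<>();
            Matcher m = Pattern.compile(":(\\w+)").matcher(sql);
            while (m.find()) {
                String name = m.group(1);
                PropertyDescriptor pd = props.get(name);
                if (pd == null || pd.getReadMethod() == null) {
                    throw new IllegalArgumentException("No readable property: " + name);
                }
                values.put(name, pd.getReadMethod().invoke(bean));
            }
            return values;
        } catch (IllegalArgumentException e) {
            throw e;
        } catch (Exception e) {
            throw new IllegalStateException("Could not read bean properties", e);
        }
    }

    public static void main(String[] args) {
        String sql = "INSERT INTO person (first_name, last_name) VALUES (:firstName, :lastName)";
        // Made-up sample values for illustration.
        System.out.println(resolve(sql, new Person("Jill", "Doe")));
    }
}
```

The simple regular expression is enough for this statement but would misfire on SQL that contains colons in string literals; a production-grade parser has to handle quoting.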
4.4 MySQL data source configuration:
    import com.zaxxer.hikari.HikariDataSource;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.boot.jdbc.DataSourceBuilder;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.Primary;

    import javax.sql.DataSource;

    @Configuration
    public class DataSourceConfig {

        // Bound to the properties under spring.datasource.batch-demo.
        // The method was originally named druidDataSource although it builds a
        // HikariDataSource; the bean name "batchDemoDB" is set explicitly, so the
        // rename does not affect injection elsewhere.
        @Primary
        @Bean(name = "batchDemoDB")
        @ConfigurationProperties(prefix = "spring.datasource.batch-demo")
        public DataSource batchDemoDataSource() {
            return DataSourceBuilder.create().type(HikariDataSource.class).build();
        }
    }
4.5 Step configuration
    import com.example.springbatchdemo.component.processor.PersonItemProcessor;
    import com.example.springbatchdemo.entity.Person;
    import org.springframework.batch.core.Step;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.batch.item.database.JdbcBatchItemWriter;
    import org.springframework.batch.item.file.FlatFileItemReader;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class BatchProcessPersonStep {

        @Autowired
        public StepBuilderFactory stepBuilderFactory;

        @Autowired
        @Qualifier(value = "personItemReader")
        private FlatFileItemReader<Person> personItemReader;

        @Autowired
        @Qualifier(value = "personItemWriter")
        private JdbcBatchItemWriter<Person> personItemWriter;

        @Autowired
        private PersonItemProcessor personItemProcessor;

        @Bean("batchProcessPersonStep1")
        public Step step1() {
            return stepBuilderFactory.get("step1")
                    // Read and process items one at a time, write them in chunks of 10.
                    .<Person, Person>chunk(10)
                    .reader(personItemReader)
                    .processor(personItemProcessor)
                    .writer(personItemWriter)
                    .build();
        }
    }
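The chunk(10) setting in the step means items are read and processed individually but handed to the writer in groups of up to ten. The sketch below shows that loop in plain Java, with no Spring Batch types. It is a simplification under stated assumptions: the class ChunkLoopSketch and its run method are hypothetical, and transactions, filtering, and error handling are left out.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

/** Plain-Java illustration of a chunk-oriented read/process/write loop. */
class ChunkLoopSketch {

    /**
     * Reads up to chunkSize items, processes each, then writes the results
     * as one list. Repeats until the reader is exhausted.
     *
     * @return the number of chunks handed to the writer
     */
    static <I, O> int run(Iterator<I> reader,
                          Function<I, O> processor,
                          Consumer<List<O>> writer,
                          int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        int chunks = 0;
        while (reader.hasNext()) {
            // 1. Read: collect at most chunkSize input items.
            List<I> inputs = new ArrayList<>(chunkSize);
            while (inputs.size() < chunkSize && reader.hasNext()) {
                inputs.add(reader.next());
            }
            // 2. Process: transform the items one by one.
            List<O> outputs = new ArrayList<>(inputs.size());
            for (I item : inputs) {
                outputs.add(processor.apply(item));
            }
            // 3. Write: hand the whole chunk to the writer in a single call.
            writer.accept(outputs);
            chunks++;
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<Integer> input = new ArrayList<>();
        for (int i = 1; i <= 25; i++) {
            input.add(i);
        }
        int chunks = ChunkLoopSketch.<Integer, String>run(
                input.iterator(),
                n -> "item-" + n,
                chunk -> System.out.println("writing " + chunk.size() + " items"),
                10);
        System.out.println("chunks written: " + chunks);
    }
}
```

Writing in chunks lets a database writer batch its inserts instead of issuing one statement per record, which is the main reason to pick a chunk size larger than one.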
4.6 Job configuration
    import com.example.springbatchdemo.component.listener.BatchProcessPersonCompletionListener;
    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.Step;
    import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    import org.springframework.batch.core.launch.support.RunIdIncrementer;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    @EnableBatchProcessing
    public class BatchProcessPersonJob {

        @Autowired
        public JobBuilderFactory jobBuilderFactory;

        @Autowired
        @Qualifier(value = "batchProcessPersonStep1")
        private Step batchProcessPersonStep1;

        @Autowired
        private BatchProcessPersonCompletionListener batchProcessPersonCompletionListener;

        @Bean
        public Job importUserJob() {
            return jobBuilderFactory.get("importUserJob")
                    // A failed or stopped execution of this job cannot be restarted.
                    .preventRestart()
                    // Adds an incrementing run.id parameter for each new launch.
                    .incrementer(new RunIdIncrementer())
                    .listener(batchProcessPersonCompletionListener)
                    .flow(batchProcessPersonStep1)
                    .end()
                    .build();
        }
    }
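The job registers a RunIdIncrementer, whose purpose is to give every launch a different run.id parameter so that each run counts as a new job instance. As a rough plain-Java sketch of that idea (a Map stands in for Spring Batch's job parameters, and the class RunIdSketch is hypothetical):

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java illustration of an incrementing run.id parameter. */
class RunIdSketch {

    static final String RUN_ID_KEY = "run.id";

    /**
     * Copies the previous parameters and bumps run.id by one,
     * starting at 1 when no earlier run exists.
     */
    static Map<String, Long> next(Map<String, Long> previous) {
        Map<String, Long> next = new HashMap<>();
        long last = 0L;
        if (previous != null) {
            next.putAll(previous);
            last = previous.getOrDefault(RUN_ID_KEY, 0L);
        }
        next.put(RUN_ID_KEY, last + 1);
        return next;
    }

    public static void main(String[] args) {
        Map<String, Long> first = next(null);
        Map<String, Long> second = next(first);
        System.out.println(first.get(RUN_ID_KEY) + " -> " + second.get(RUN_ID_KEY));
    }
}
```

Because the parameter set differs on every launch, the same job can be started repeatedly without colliding with an earlier completed run that used identical parameters.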
4.7 Job status listener
    import com.example.springbatchdemo.entity.Person;
    import com.example.springbatchdemo.mapper.PersonMapper;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.batch.core.BatchStatus;
    import org.springframework.batch.core.JobExecution;
    import org.springframework.batch.core.listener.JobExecutionListenerSupport;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;

    import java.util.List;

    @Component
    public class BatchProcessPersonCompletionListener extends JobExecutionListenerSupport {

        private static final Logger log = LoggerFactory.getLogger(BatchProcessPersonCompletionListener.class);

        @Autowired
        private PersonMapper personMapper;

        // Called once the job has ended; on success, log every row now in the table.
        @Override
        public void afterJob(JobExecution jobExecution) {
            if (BatchStatus.COMPLETED.equals(jobExecution.getStatus())) {
                log.info("Job finished! Time to verify the results");

                List<Person> personList = personMapper.queryAll();
                personList.forEach(person -> log.info("Found <{}> in the database.", person));
            }
        }
    }
4.8 csv test data file
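5. Running the Batch Job
Run the Spring Boot project; JobLauncher automatically launches the job importUserJob. The execution result is as follows:
Querying the table batch-demo.person confirms that the test data from the csv file has been imported successfully.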