Spring Batch是一个轻量级,全面的批处理框架。
一个典型的批处理过程可能是:
Spring Batch 由很多概念组成,其大致组成如下所示:
下面对这些概念一一作出解释:
从字面上可以理解为“任务仓库”。如果把一个批处理比作一个任务的话,这个仓库存储了许多这种任务。 JobRepository
奇特的一个点就是,它会将任务,包括其状态等等数据持久化,存储到数据库中,比如生成如下所示的很多表。 Spring Batch
默认会提供这样一个 SimpleJobRepository
仓库,方便我们开启批处理。
“任务”。每个批处理都是一个任务,除了任务本身之外,任务也存在成功和失败等等状态,所以可以引出两个概念 JobInstance
与 JobExecution
。 job
是一个接口, JobInstance
是其实现,代表了“任务”本身,提供了 getJobName
、 getInstanceId
等方法供我们获取任务本身的一些属性。 JobExecution
代表任务的状态,如创建时间、结束时间、结束状态、抛出的异常等等。
“步骤”。批处理任务肯定有非常多的步骤,如一个最基本的数据库同步,从 A 数据库读取数据,存入到 B 数据库中,这里就分为了两个步骤。在 Spring Batch
中,一个任务可以有很多个步骤,每个步骤大致分为三步:读、处理、写,其对应的类分别就是 Item Reader
, Item Processor
, Item Writer
。
“任务装置”。如火箭发射装置就是用来操作火箭发射的,这里的任务装置就是用来执行任务的,如一个简单的任务启动语句:
JobParameters jobParameters = new JobParametersBuilder().toJobParameters(); jobLauncher.run(job, jobParameters); 复制代码
就启动了一个 Job。那么任务该怎么启动呢?如火箭发射装置里面肯定写了很多发射相关的代码,如什么角度发射等等。这里又引入一个概念 JobParameters
,里面包含了若干 JobParameter
,这个类必然包含了任务启动时的一个属性。在源码中,可以看到 JobParameters
作为组成部分,形成 JobInstance,也就是任务本身。
JobInstance jobInstance = jobInstanceDao.getJobInstance(jobName, jobParameters); 复制代码
在刚刚学习 Spring Batch 的时候,其实就是在学习 SpringBoot 的时候,总是会有个误区,太过纠结 SpringBoot 的配置了,其实这对入门来说很不好,配置是为了简化工作,而对于入门来说,更重要的是知道为什么这样配置。所以这里,会弱化配置项,专注概念本身。
上面提到了 JobRepository
会将 Job
及其相关信息持久化,所以需要给其配置一个数据库让其持久化,这里用的是 mysql。在 application.yml 里面这样配置,使用的是阿里的 Druid 连接池。
spring: datasource: type: com.alibaba.druid.pool.DruidDataSource driver-class-name: com.mysql.cj.jdbc.Driver username: root password: root url: jdbc:mysql://localhost:3306/web?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8 schema: classpath:/org/springframework/batch/core/schema-mysql.sql batch: job: # 默认自动执行定义的Job(true),改为false,需要jobLauncher.run执行 enabled: false # 在数据库里面创建默认的数据表,如果不是always则会提示相关表不存在 initialize-schema: always main: # 解决重复扫描报错的问题 allow-bean-definition-overriding: true 复制代码
pom.xml:
<!-- druid --> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid-spring-boot-starter</artifactId> <version>1.1.10</version> </dependency> 复制代码
创建 DataSourceConfig 将 DataSource 交由 Spring 容器管理,后面需要用到:
@Configuration public class DataSourceConfig { @ConfigurationProperties(prefix = "spring.datasource") @Bean public DruidDataSource druidDataSource(){ return new DruidDataSource(); } } 复制代码
创建 BatchConfig:
import com.alibaba.druid.pool.DruidDataSource; import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import org.springframework.batch.core.launch.support.SimpleJobLauncher; import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean; import org.springframework.batch.support.DatabaseType; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.annotation.Order; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.transaction.PlatformTransactionManager; import javax.sql.DataSource; @Configuration @EnableBatchProcessing @Order(3) public class BatchConfig { @Bean(name = "taskExecutor") public ThreadPoolTaskExecutor taskExecutor() { ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); taskExecutor.setCorePoolSize(5); taskExecutor.setMaxPoolSize(10); taskExecutor.setQueueCapacity(200); return taskExecutor; } @Bean public JobRepository jobRepository(DruidDataSource dataSource, PlatformTransactionManager transactionManager) throws Exception { JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean(); jobRepositoryFactoryBean.setDataSource(dataSource); jobRepositoryFactoryBean.setTransactionManager(transactionManager); jobRepositoryFactoryBean.setDatabaseType(String.valueOf(DatabaseType.MYSQL)); jobRepositoryFactoryBean.setMaxVarCharLength(5000); jobRepositoryFactoryBean.afterPropertiesSet(); return jobRepositoryFactoryBean.getObject(); } @Bean public SimpleJobLauncher jobLauncher(@Qualifier("taskExecutor") ThreadPoolTaskExecutor taskExecutor, JobRepository jobRepository) throws Exception { SimpleJobLauncher jobLauncher = new SimpleJobLauncher(); jobLauncher.setTaskExecutor(taskExecutor); jobLauncher.setJobRepository(jobRepository); return jobLauncher; } } 复制代码
创建一个 Controller:
import org.springframework.batch.core.*; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.launch.JobLauncher; import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.item.*; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import java.util.List; @RestController public class MainController { @Autowired JobRepository jobRepository; @Autowired JobLauncher jobLauncher; @Autowired JobBuilderFactory jobBuilderFactory; @Autowired StepBuilderFactory stepBuilderFactory; private int count = 0; @GetMapping("/test") public void hello() { try { // 首先构建 Step // 1. 创建 Item Reader,返回 null 读取结束 ItemReader<String> itemReader = new ItemReader<String>() { @Override public String read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException { count++; if (count <= 10) { return "第" + count + "次"; }else { return null; } } }; // 2. 创建 Item Processor ItemProcessor<String, String> itemProcessor = new ItemProcessor<String, String>() { @Override public String process(String item) throws Exception { return "转换过后变为第" + count + "次"; } }; // 3. 创建 Item Writer ItemWriter<String> itemWriter = new ItemWriter<String>() { @Override public void write(List<? extends String> items) throws Exception { for (String me : items) { System.out.println(me); } } }; // 4. 构建 Step Step step = stepBuilderFactory.get("2Step") .<String, String>chunk(1) .reader(itemReader) .processor(itemProcessor) .writer(itemWriter) .build(); // 5. 构建 Job Job job = jobBuilderFactory.get("2Job").start(step).next(step).build(); // 6. 创建参数 JobParameters jobParameters = new JobParametersBuilder().toJobParameters(); // 7. 启动 Job JobExecution jobExecution = jobLauncher.run(job, jobParameters); }catch (Exception e){ e.printStackTrace(); } } } 复制代码
JobBuilderFactory
、 StepBuilderFactory
都是 Spring 提供的, JobRepository
、 JobLauncher
是 BatchConfig
里面提供的。通过控制台的打印,我们可以很清晰的看到其运行流程,即: Job -> Step1 -> Reader -> Processor -> Writer -> Step2 -> ...-> 结束
:
Ok,后续的高级用法再慢慢探索吧!