转载

Spring Batch 入门

Spring Batch是一个轻量级,全面的批处理框架。

一个典型的批处理过程可能是:

  • 从数据库,文件或队列中读取大量记录。
  • 以某种方式处理数据。
  • 以修改之后的形式写回数据

Spring Batch 由很多概念组成,其大致组成如下所示:

Spring Batch 入门

下面对这些概念一一作出解释:

1.1 JobRepository

从字面上可以理解为“任务仓库”。如果把一个批处理比作一个任务的话,这个仓库存储了许多这种任务。 JobRepository 奇特的一个点就是,它会将任务,包括其状态等等数据持久化,存储到数据库中,比如生成如下所示的很多表。 Spring Batch 默认会提供这样一个 SimpleJobRepository 仓库,方便我们开启批处理。

Spring Batch 入门

1.2 Job

“任务”。每个批处理都是一个任务,除了任务本身之外,任务也存在成功和失败等等状态,所以可以引出两个概念 JobInstanceJobExecutionjob 是一个接口, JobInstance 是其实现,代表了“任务”本身,提供了 getJobNamegetInstanceId 等方法供我们获取任务本身的一些属性。 JobExecution 代表任务的状态,如创建时间、结束时间、结束状态、抛出的异常等等。

1.3 Step

“步骤”。批处理任务肯定有非常多的步骤,如一个最基本的数据库同步,从 A 数据库读取数据,存入到 B 数据库中,这里就分为了两个步骤。在 Spring Batch 中,一个任务可以有很多个步骤,每个步骤大致分为三步:读、处理、写,其对应的类分别就是 Item ReaderItem ProcessorItem Writer

1.4 JobLauncher

“任务装置”。如火箭发射装置就是用来操作火箭发射的,这里的任务装置就是用来执行任务的,如一个简单的任务启动语句:

JobParameters jobParameters = new JobParametersBuilder().toJobParameters();
jobLauncher.run(job, jobParameters);
复制代码

就启动了一个 Job。那么任务该怎么启动呢?如火箭发射装置里面肯定写了很多发射相关的代码,如什么角度发射等等。这里又引入一个概念 JobParameters ,里面包含了若干 JobParameter ,这个类必然包含了任务启动时的一个属性。在源码中,可以看到 JobParameters 作为组成部分,形成 JobInstance,也就是任务本身。

JobInstance jobInstance = jobInstanceDao.getJobInstance(jobName, jobParameters);
复制代码

二、实 践

在刚刚学习 Spring Batch 的时候,其实就是在学习 SpringBoot 的时候,总是会有个误区,太过纠结 SpringBoot 的配置了,其实这对入门来说很不好,配置是为了简化工作,而对于入门来说,更重要的是知道为什么这样配置。所以这里,会弱化配置项,专注概念本身。

2.1 配置数据源

上面提到了 JobRepository 会将 Job 及其相关信息持久化,所以需要给其配置一个数据库让其持久化,这里用的是 mysql。在 application.yml 里面这样配置,使用的是阿里的 Druid 连接池。

spring:
  datasource:
    type: com.alibaba.druid.pool.DruidDataSource
    driver-class-name: com.mysql.cj.jdbc.Driver
    username: root
    password: root
    url: jdbc:mysql://localhost:3306/web?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8
    schema: classpath:/org/springframework/batch/core/schema-mysql.sql
  batch:
    job:
      # 默认自动执行定义的Job(true),改为false,需要jobLauncher.run执行
      enabled: false
    # 在数据库里面创建默认的数据表,如果不是always则会提示相关表不存在
    initialize-schema: always
  main:
        # 解决重复扫描报错的问题
    allow-bean-definition-overriding: true
复制代码

pom.xml:

<!-- druid -->
<dependency>
  <groupId>com.alibaba</groupId>
  <artifactId>druid-spring-boot-starter</artifactId>
  <version>1.1.10</version>
</dependency>
复制代码

创建 DataSourceConfig 将 DataSource 交由 Spring 容器管理,后面需要用到:

@Configuration
public class DataSourceConfig {
    @ConfigurationProperties(prefix = "spring.datasource")
    @Bean
    public DruidDataSource druidDataSource(){
        return new DruidDataSource();
    }
}
复制代码

2.2 配置 JobRepository、JobLauncher

创建 BatchConfig:

import com.alibaba.druid.pool.DruidDataSource;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
import org.springframework.batch.support.DatabaseType;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.annotation.Order;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.transaction.PlatformTransactionManager;
import javax.sql.DataSource;
@Configuration
@EnableBatchProcessing
@Order(3)
public class BatchConfig {
    @Bean(name = "taskExecutor")
    public ThreadPoolTaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(5);
        taskExecutor.setMaxPoolSize(10);
        taskExecutor.setQueueCapacity(200);
        return taskExecutor;
    }
    
    @Bean
    public JobRepository jobRepository(DruidDataSource dataSource, PlatformTransactionManager transactionManager) throws Exception {
        JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
        jobRepositoryFactoryBean.setDataSource(dataSource);
        jobRepositoryFactoryBean.setTransactionManager(transactionManager);
        jobRepositoryFactoryBean.setDatabaseType(String.valueOf(DatabaseType.MYSQL));
        jobRepositoryFactoryBean.setMaxVarCharLength(5000);
        jobRepositoryFactoryBean.afterPropertiesSet();
        return jobRepositoryFactoryBean.getObject();
    }
    
    @Bean
    public SimpleJobLauncher jobLauncher(@Qualifier("taskExecutor") ThreadPoolTaskExecutor taskExecutor,
                                         JobRepository jobRepository) throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setTaskExecutor(taskExecutor);
        jobLauncher.setJobRepository(jobRepository);
        return jobLauncher;
    }
}
复制代码

2.3 启动

创建一个 Controller:

import org.springframework.batch.core.*;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.item.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
@RestController
public class MainController {
    @Autowired
    JobRepository jobRepository;
    @Autowired
    JobLauncher jobLauncher;
    @Autowired
    JobBuilderFactory jobBuilderFactory;
    @Autowired
    StepBuilderFactory stepBuilderFactory;
    private int count = 0;
    @GetMapping("/test")
    public void hello() {
        try {
            // 首先构建 Step
            // 1. 创建 Item Reader,返回 null 读取结束
            ItemReader<String> itemReader = new ItemReader<String>() {
                @Override
                public String read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
                    count++;
                    if (count <= 10) {
                        return "第" + count + "次";
                    }else {
                        return null;
                    }
                }
            };
            // 2. 创建 Item Processor
            ItemProcessor<String, String> itemProcessor = new ItemProcessor<String, String>() {
                @Override
                public String process(String item) throws Exception {
                    return "转换过后变为第" + count + "次";
                }
            };
            // 3. 创建 Item Writer
            ItemWriter<String> itemWriter = new ItemWriter<String>() {
                @Override
                public void write(List<? extends String> items) throws Exception {
                    for (String me : items) {
                        System.out.println(me);
                    }
                }
            };
            // 4. 构建 Step
            Step step = stepBuilderFactory.get("2Step")
                    .<String, String>chunk(1)
                    .reader(itemReader)
                    .processor(itemProcessor)
                    .writer(itemWriter)
                    .build();
            // 5. 构建 Job
            Job job = jobBuilderFactory.get("2Job").start(step).next(step).build();
            // 6. 创建参数
            JobParameters jobParameters = new JobParametersBuilder().toJobParameters();
            // 7. 启动 Job
            JobExecution jobExecution = jobLauncher.run(job, jobParameters);
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}
复制代码

JobBuilderFactoryStepBuilderFactory 都是 Spring 提供的, JobRepositoryJobLauncherBatchConfig 里面提供的。通过控制台的打印,我们可以很清晰的看到其运行流程,即: Job -> Step1 -> Reader -> Processor -> Writer -> Step2 -> ...-> 结束

Spring Batch 入门

Ok,后续的高级用法再慢慢探索吧!

原文  https://juejin.im/post/5ef80de65188252e851c468b
正文到此结束
Loading...