转载

SpringBatch 3 基礎使用教學

批次作業如果要處理的比較好 其實有非常多細節還是要去處理 比如 排程執行的紀錄 資料輸入輸出的統計 每一個 任務的成功失敗 資料流...等等...到 Retry / Skip 的處理.

這邊開發時是使用 Spring Boot 1.5.9 所以搭配的是 Spring Batch 3.0.8 , 現在 SpringBoot 2 發佈出來了, 搭配的是 Spring Batch 4.0.0, 如果開新專案建議直接開 SpringBoot 2 來開發吧, 後面再整理一下 Spring Batch 4.0.0 的寫法.

基本上呢, 就是下圖這樣的架構

SpringBatch 3 基礎使用教學

最小的 Step 就是由 ItemReader(讀來源) -> ItemProcessor(處理) -> ItemWriter(寫結果) 作為一次的處理動作

複數的 Step 可以組合在一起變一個 大的 Job 就這樣而已

而每一個 Step 的啟動 結束 成功失敗 讀多少筆資料 處理筆數 寫入筆數 都會完整記錄在 資料庫 中, 也可以大概知道排程處理資料的結果.

下面這是啟動的主程式

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;  
import org.springframework.boot.SpringApplication;  
import org.springframework.boot.autoconfigure.SpringBootApplication;  
import org.springframework.context.ApplicationContext;  
import org.springframework.data.jpa.repository.config.EnableJpaAuditing;  
import org.springframework.scheduling.annotation.EnableAsync;  
import org.springframework.scheduling.annotation.EnableScheduling;  
  
import java.util.TimeZone;  
  
@EnableAsync  
@EnableScheduling  
@EnableJpaAuditing  
@EnableBatchProcessing  
@SpringBootApplication  
public class ImportApplication {  
    public static void main(String/[/] args) {  
        TimeZone.setDefault(TimeZone.getTimeZone("Asia/Shanghai"));  
  ApplicationContext context = SpringApplication.run(ImportApplication.class, args);  
  }  
}

@EnableBatchProcessing

就是啟用 SpringBatch 如果你還沒有 Batch 用的排程資料表, 他則會用你預設的 DataSource 來建立會用到的表格.

接下來介紹程式部分, 下面這支是一個很基本的作業流程配置

package com.ps.batch.schedule.config;

import com.ps.batch.batch.serivce.ImportFinanceService;
import com.ps.batch.constant.ImportFinanceConstant;
import com.ps.batch.dto.batch.ImportFinanceDto;
import com.ps.batch.dto.batch.ImportUserDto;
import com.ps.batch.schedule.BatchJobCompletionListener;
import com.ps.batch.schedule.processor.ImportFinanceDtoProcessor;
import com.ps.batch.schedule.processor.ImportUserDtoProcessor;
import com.ps.batch.schedule.reader.ImportFinanceDtoReader;
import com.ps.batch.schedule.reader.ImportUserDtoReader;
import com.ps.batch.schedule.writer.ImportFinanceDtoWriter;
import com.ps.batch.schedule.writer.ImportUserDtoWriter;
import com.ps.batch.service.JobService;
import com.ps.batch.service.MemberTagService;
import com.ps.batch.service.NotifyService;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.annotation.Scheduled;


/**
 * 會員打標任務
 */

@Data
@Slf4j
@Configuration
public class BatchJobMemberTag {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Autowired
    private BatchJobCompletionListener batchJobCompletionListener;
    @Autowired
    private ImportFinanceService importFinanceService;
    @Autowired
    private MemberTagService memberTagService;

    private String jobName = "健身房 会员 打標 作業";

    @Value("${membertag.api.notify.mailto}")
    private String membertagApiNotifyMailto;

    @Autowired
    private JobService jobService;
    @Autowired
    private ImportFinanceDtoReader importFinanceDtoReader;
    @Autowired
    private ImportFinanceDtoProcessor importFinanceDtoProcessor;
    @Autowired
    private ImportFinanceDtoWriter importFinanceDtoWriter;
    @Autowired
    private ImportUserDtoReader importUserDtoReader;
    @Autowired
    private ImportUserDtoProcessor importUserDtoProcessor;
    @Autowired
    private ImportUserDtoWriter importUserDtoWriter;

    @Autowired
    private NotifyService notifyService;

    @Autowired
    @Qualifier("threadPoolTaskExecutor")
    private TaskExecutor taskExecutor;

    @Scheduled(initialDelay = 1 * 1000, fixedDelay = 10 * 60 * 1000)
    public void jobRun() {
        try {
            JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
            jobParametersBuilder.addLong("time", System.currentTimeMillis());
            jobParametersBuilder.addString("jobName", jobService.encodeToBase64Str(jobName));
            jobParametersBuilder.addString("mailTo", jobService.encodeToBase64Str(membertagApiNotifyMailto));
            JobParameters jobParameters = jobParametersBuilder.toJobParameters();
            jobService.runJob(exportMemberTag(), jobParameters, jobName);
        } catch (Exception e) {
            notifyService.notifyError("異常終止通知", "啟動失敗=" + e.getMessage(), jobName, null);
        }
    }

    public Job exportMemberTag() throws Exception {
        return jobBuilderFactory.get("exportTag")
                .incrementer(new RunIdIncrementer())
                .start(step_exportMemberTag())
                .next(step_exportImportFinanceDtoTag())
                .listener(batchJobCompletionListener)
                .build();
    }

    public Step step_exportImportFinanceDtoTag() throws Exception {
        return stepBuilderFactory.get("exportImportFinanceTag")
                .<ImportFinanceDto, ImportFinanceDto>chunk(100)
                .reader(importFinanceDtoReader.getUnTaggedImportFinance())
                .processor(importFinanceDtoProcessor.sendTag())
                .writer(importFinanceDtoWriter.updateTaggedWriter(ImportFinanceConstant.membertagged_tagged))
                .taskExecutor(taskExecutor)
                .throttleLimit(10)
                .build();
    }

    public Step step_exportMemberTag() throws Exception {
        return stepBuilderFactory.get("exportImportUserTag")
                .<ImportUserDto, ImportUserDto>chunk(100)
                .reader(importUserDtoReader.findUnTaggedImportUser())
                .processor(importUserDtoProcessor.sendTag())
                .writer(importUserDtoWriter.updateMembertaggedIsTrue())
                .taskExecutor(taskExecutor)
                .throttleLimit(10)
                .build();
    }
}

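首先從 Scheduled 開始看, 這是 Spring Framework (spring-context) 提供的註解, 並不是 Spring Batch 的, 需搭配主程式上的 @EnableScheduling 才會生效, 可用來配置定時任務, 可以用 cron 或是 initialDelay & fixedDelay 來設定. 另外要注意一點: 這邊啟動後任務會在排程的執行緒上同步一路執行下去, 而 fixedDelay 是從上一次執行結束後才開始計時, 也就是執行完一個下一次的才會啟動喔.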

在啟動排程中, 我建立了 JobParametersBuilder 來傳遞必要的參數在排程中可以存取 例如任務名稱 jobName 跟 發信通知的對象 mailTo,

這邊注意 這些參數 SpringBatch 都會儲存在 batch_job_execution_params 的表格內, 但是如果你放中文的話會儲存失敗造成 Exception, 所以我這邊就稍微編碼成 Base64 再傳送進去

JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
jobParametersBuilder.addLong("time", System.currentTimeMillis());
jobParametersBuilder.addString("jobName", jobService.encodeToBase64Str(jobName));
jobParametersBuilder.addString("mailTo", jobService.encodeToBase64Str(membertagApiNotifyMailto));
JobParameters jobParameters = jobParametersBuilder.toJobParameters();

batch_job_execution_params 表格內的資料大概是像這樣

JOB_EXECUTION_ID TYPE_CD KEY_NAME STRING_VAL DATE_VAL LONG_VAL DOUBLE_VAL IDENTIFYING
128992 LONG time 1970-01-01 08:00:00 1521009519606 0 Y
128992 STRING jobName 5YGl6Lqr5oi/IOWinumHjyDkvJrlkZgg5ZCM5q2l 1970-01-01 08:00:00 0 0 Y
128993 LONG time 1970-01-01 08:00:00 1521009530611 0 Y
128993 STRING jobName 5YGl6Lqr5oi/IOWinumHjyDorqLljZUg5ZCM5q2lKDI05bCP5pmCKQ== 1970-01-01 08:00:00 0 0 Y

然後 我的 jobService 做了什麼事?

// Hand the assembled job, its parameters and the display name to the central launcher.
jobService.runJob(exportMemberTag(), jobParameters, jobName);

其實就是做一個任務的啟動總開關

因為難免有要更換版本的時候, 所以做了一個開關控制: 更新版本時先暫停, 不要再啟動新的任務, 等現有的任務都結束後再進行 shutdown 並更換 jar 檔, 這樣會比較保險, 以免有的任務還卡在交易中等等...善後很麻煩啊XD

/**
 * Launches {@code job} unless batch execution is globally disabled.
 *
 * <p>When {@code BatchConstant.batchEnable} is false the launch is skipped and
 * only a warning is logged. A launch failure is logged and reported through
 * {@code notifyService}; it is not rethrown.
 *
 * @param job           the job to launch
 * @param jobParameters parameters for this execution
 * @param jobName       display name used in logs and notifications
 */
public void runJob(Job job, JobParameters jobParameters, String jobName) {
    // Global switch: bail out early while launches are paused.
    if (!BatchConstant.batchEnable.booleanValue()) {
        log.warn("任务暫停執行通知 {}", jobName);
        return;
    }

    try {
        jobLauncher.run(job, jobParameters);
    } catch (Exception launchFailure) {
        log.error("", launchFailure);
        notifyService.notifyError("任务启动失败通知", launchFailure.getMessage(), jobName, null);
    }
}

接下來看怎麼去編排一個任務

/**
 * Assembles the "exportTag" job.
 *
 * <p>The job starts with {@code step_exportMemberTag()}, continues with
 * {@code step_exportImportFinanceDtoTag()}, uses a {@code RunIdIncrementer}
 * and registers {@code batchJobCompletionListener} as its listener.
 *
 * @return the assembled job
 * @throws Exception if building one of the steps fails
 */
public Job exportMemberTag() throws Exception {
        return jobBuilderFactory.get("exportTag")
                .incrementer(new RunIdIncrementer())
                .start(step_exportMemberTag())
                .next(step_exportImportFinanceDtoTag())
                .listener(batchJobCompletionListener)
                .build();
    }

透過工廠物件 jobBuilderFactory 去 組合你的 Step 步驟順序 跟監聽器 listener

那再來看其中一個 Step (這裡以第二個步驟 step_exportImportFinanceDtoTag 為例) 是怎麼編排出來的

/**
 * Builds the "exportImportFinanceTag" step.
 *
 * <p>Items of type {@code ImportFinanceDto} flow from the reader through the
 * processor to the writer in chunks of 100. The step runs on
 * {@code taskExecutor} with a throttle limit of 10.
 *
 * @return the configured step
 * @throws Exception if a reader, processor or writer factory method fails
 */
public Step step_exportImportFinanceDtoTag() throws Exception {
    return stepBuilderFactory.get("exportImportFinanceTag")
            .<ImportFinanceDto, ImportFinanceDto>chunk(100)
            .reader(importFinanceDtoReader.getUnTaggedImportFinance())
            .processor(importFinanceDtoProcessor.sendTag())
            // The writer is parameterised with the "tagged" marker value.
            .writer(importFinanceDtoWriter.updateTaggedWriter(ImportFinanceConstant.membertagged_tagged))
            .taskExecutor(taskExecutor)
            .throttleLimit(10)
            .build();
}

也是透過 stepBuilderFactory 去組合出來, 這邊需要定義的 chunk 是從 reader 讀出來一次要處理多少筆,

chunk 前面泛型 ImportFinanceDto 定義 reader & processor & writer 中間處理的資料型態

taskExecutor 則是定義用哪個 ThreadPool 這邊你可以先略過, 因為要用的話還有其他配置要搭配處理

throttleLimit 則是讓資料同時發散出去的大小, 比如設定 10 就同時會讓 10 筆資料丟到 ThreadPool 讓他去執行, 跟上面 taskExecutor 要搭配一起配置

再來看一下 reader 怎麼寫

/**
 * Creates a cursor-based reader over the {@code importfinance} rows whose
 * {@code membertagged} column is 0, ordered by {@code orgcreatetime}.
 *
 * @return a new, not yet opened reader
 */
public ItemReader<ImportFinanceDto> getUnTaggedImportFinance() {
    JdbcCursorItemReader<ImportFinanceDto> cursorReader = new JdbcCursorItemReader<>();
    cursorReader.setDataSource(batchDataSource);
    cursorReader.setRowMapper(this.getImportFinanceDtoRowMapper());
    cursorReader.setSql("select * from importfinance where membertagged = 0 order by orgcreatetime ");
    return cursorReader;
}

/**
 * Builds the row mapper that copies one {@code importfinance} row into an
 * {@code ImportFinanceDto}.
 *
 * <p>The mapper returns {@code null} (and logs at info level) when the cursor
 * is positioned after the last row or before the first one.
 *
 * @return the row mapper
 */
private RowMapper<ImportFinanceDto> getImportFinanceDtoRowMapper() {
    return (resultSet, rowNum) -> {
        // Guard: the cursor is not on a real row.
        if (resultSet.isAfterLast() || resultSet.isBeforeFirst()) {
            log.info("Returning null from rowMapper");
            return null;
        }

        ImportFinanceDto dto = new ImportFinanceDto();
        dto.setSerid(resultSet.getLong("serid"));
        dto.setPlatform(resultSet.getString("platform"));
        dto.setPpk(resultSet.getString("ppk"));
        dto.setRulecodes(resultSet.getString("rulecodes"));
        dto.setSaleamount(resultSet.getDouble("saleamount"));
        dto.setTradeamount(resultSet.getDouble("tradeamount"));
        dto.setSource(resultSet.getLong("source"));
        dto.setSourcetype(resultSet.getInt("sourcetype"));
        dto.setOuterid(resultSet.getString("outerid"));
        dto.setOutertype(resultSet.getInt("outertype"));
        dto.setTerminusUserId(resultSet.getLong("terminususerid"));
        dto.setExported(resultSet.getInt("exported"));
        dto.setCardtype(resultSet.getString("cardtype"));
        dto.setOrgcreatetime(resultSet.getLong("orgcreatetime"));
        dto.setMembertagged(resultSet.getInt("membertagged"));
        dto.setCreateddate(resultSet.getDate("createddate"));
        dto.setLastmodifieddate(resultSet.getDate("lastmodifieddate"));
        return dto;
    };
}

這樣就是一個最基本常用的 ItemReader , 如果你有需要使用 多執行緒下去取資料 簡單一點可以加個 synchronized 像下面這樣

/**
 * Creates a cursor-based reader over the {@code importfinance} rows whose
 * {@code exported} column is 0, ordered by {@code orgcreatetime}.
 *
 * <p>{@code read()} is overridden as {@code synchronized} so that only one
 * thread at a time pulls from the cursor.
 *
 * @return a new, not yet opened reader
 */
public ItemReader<ImportFinanceDto> getUnExportImportFinance() {
    JdbcCursorItemReader<ImportFinanceDto> lockedReader = new JdbcCursorItemReader<ImportFinanceDto>() {
        // Serialise concurrent callers around the delegate read.
        @Override
        public synchronized ImportFinanceDto read() throws Exception {
            return super.read();
        }
    };
    lockedReader.setDataSource(batchDataSource);
    lockedReader.setRowMapper(this.getImportFinanceDtoRowMapper());
    lockedReader.setSql("select * from importfinance where exported = 0 order by orgcreatetime ");
    return lockedReader;
}

如果你想真的比較有效率的取資料 則可以改用 JdbcPagingItemReader 分頁來讀取資料庫 , 下面是另一個範例

/**
 * Creates a paging reader over the {@code importusers} rows where
 * {@code isnew} equals "1", sorted ascending by {@code serid}.
 *
 * <p>Page size is 100 and JDBC fetch size is 10. The reader is initialised
 * here through {@code afterPropertiesSet()} because it is created manually.
 *
 * @return the initialised paging reader
 * @throws IllegalStateException if the reader cannot be initialised; the
 *         original exception is attached as the cause
 */
public ItemReader<ImportUserDto> findNewImportUserThreadSafe() {
    JdbcPagingItemReader<ImportUserDto> jdbcPagingItemReader = new JdbcPagingItemReader<>();
    jdbcPagingItemReader.setDataSource(batchDataSource);
    jdbcPagingItemReader.setFetchSize(10);
    jdbcPagingItemReader.setPageSize(100);

    // Plain objects instead of double-brace initialisation, which would create
    // an extra anonymous subclass for every literal.
    HashMap<String, Order> sortKeys = new HashMap<>();
    sortKeys.put("serid", Order.ASCENDING);

    MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
    queryProvider.setSelectClause("select * ");
    queryProvider.setFromClause("from importusers");
    queryProvider.setWhereClause("isnew = :isnew");
    queryProvider.setSortKeys(sortKeys);
    jdbcPagingItemReader.setQueryProvider(queryProvider);

    HashMap<String, Object> parameterValues = new HashMap<>();
    parameterValues.put("isnew", "1");
    jdbcPagingItemReader.setParameterValues(parameterValues);

    jdbcPagingItemReader.setRowMapper(this.getImportUserDtoRowMapper());

    try {
        // Required: without this call the reader has no JdbcTemplate to work with.
        jdbcPagingItemReader.afterPropertiesSet();
    } catch (Exception e) {
        // Fail fast instead of handing back a half-initialised reader.
        throw new IllegalStateException("Failed to initialise paging reader for importusers", e);
    }
    return jdbcPagingItemReader;
}

再講 processor 的中間處理, 在這階段你可以做一些像過濾 (回傳 null 就會把該筆資料濾掉, 不會送到 writer), 或是轉換物件給下一階段的 ItemWriter 處理

/**
 * Creates the processor that submits each item to the tagging service.
 *
 * <p>An item is passed on unchanged when {@code memberTagService.tagCreate}
 * completes. If it throws, the error is logged and appended to
 * {@code bacthJobEvents}, and the processor returns {@code null} for that item.
 *
 * @return the tagging processor
 */
public ItemProcessor<ImportFinanceDto, ImportFinanceDto> sendTag() {
    return item -> {
        try {
            // Create the tag record; reaching the next line means it succeeded.
            memberTagService.tagCreate(item);
            return item;
        } catch (Exception e) {
            log.error("打標系統發生未知錯誤 錯誤訊息 UnknowException", e);
            bacthJobEvents.appendError("打標系統發生未知錯誤 錯誤訊息 " + e.getMessage());
            return null;
        }
    };
}

最後 接收結果來更新資料庫

/**
 * Creates a JDBC batch writer that updates {@code membertagged} and
 * {@code lastmodifieddate} of the {@code importfinance} row identified by
 * {@code serid}.
 *
 * @param membertagged value written to the {@code membertagged} column
 * @return the configured writer
 */
public ItemWriter<ImportFinanceDto> updateTaggedWriter(Integer membertagged) {
    JdbcBatchItemWriter<ImportFinanceDto> taggedWriter = new JdbcBatchItemWriter<>();
    taggedWriter.setDataSource(batchDataSource);
    taggedWriter.setItemPreparedStatementSetter(updateTaggedWriterSetter(membertagged));
    taggedWriter.setSql("update importfinance set membertagged = ?, lastmodifieddate = ? where serid = ? ");
    return taggedWriter;
}

/**
 * Builds the statement setter for the tagged-update SQL.
 *
 * <p>Binds, in order: the given {@code membertagged} value, the current
 * time as a timestamp, and the item's {@code serid}.
 *
 * @param membertagged value bound to the first placeholder
 * @return the statement setter
 */
private ItemPreparedStatementSetter<ImportFinanceDto> updateTaggedWriterSetter(Integer membertagged) {
    return (item, statement) -> {
        // 1: new tagged flag
        statement.setInt(1, membertagged);
        // 2: modification time, taken at bind time
        statement.setTimestamp(2, new Timestamp(System.currentTimeMillis()));
        // 3: primary key of the row to update
        statement.setLong(3, item.getSerid());
    };
}

上面是使用像 PreparedStatement 的語句來進行配置, 有時欄位一多就對到眼花, 你可以換像用 Hibernate 的 SQL 寫法 使用 : 來指定參數 參考如下 直接使用 InLeftDto 來提供參數

/**
 * Creates a JDBC batch writer that issues a {@code REPLACE into T_INLEFT}
 * statement with named parameters.
 *
 * <p>The named parameters {@code :vcCode} and {@code :vcClub} are supplied by a
 * {@code BeanPropertyItemSqlParameterSourceProvider} built for {@code InLeftDto}.
 *
 * @return the initialised writer
 */
public ItemWriter<InLeftDto> saveInLeft() {
    JdbcBatchItemWriter<InLeftDto> inLeftWriter = new JdbcBatchItemWriter<>();
    inLeftWriter.setJdbcTemplate(new NamedParameterJdbcTemplate(erpDataSource));
    inLeftWriter.setSql("REPLACE into T_INLEFT ( VC_CODE, VC_CLUB) values ( :vcCode, :vcClub)");
    ItemSqlParameterSourceProvider<InLeftDto> beanParams =
            new BeanPropertyItemSqlParameterSourceProvider<>();
    inLeftWriter.setItemSqlParameterSourceProvider(beanParams);
    // Called explicitly because the writer is created manually here.
    inLeftWriter.afterPropertiesSet();
    return inLeftWriter;
}

這樣就是一個基本 Batch 操作

← Configuring Visual Studio Code Run SpringBoot

原文  http://samchu.logdown.com/posts/6867377-introducing-springbatch-3
正文到此结束
Loading...