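Doing batch jobs well involves a lot of details: a record of every scheduled execution, input/output statistics, the success or failure of each task, the data flow, and so on, all the way down to Retry / Skip handling.
I developed this with Spring Boot 1.5.9, which pairs with Spring Batch 3.0.8. Spring Boot 2 has since been released and pairs with Spring Batch 4.0.0. If you are starting a new project, I'd suggest going straight to Spring Boot 2. I'll write up the Spring Batch 4.0.0 style later.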
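Basically, the architecture looks like the figure below.
The smallest unit, a Step, is one processing pass of ItemReader (reads the source) -> ItemProcessor (processes) -> ItemWriter (writes the results).
Several Steps can be chained together into a larger Job, and that's basically it.
For every Step, the start and end, success or failure, and the number of items read, filtered and written are all recorded in the database (the `batch_step_execution` table), so you can get a good picture of what each scheduled run did.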
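To make the read -> process -> write flow and the per-step counts concrete, here is a plain-Java model of a chunk-oriented Step. It is only a sketch under my own assumptions: `runStep`, `Stats` and the functional-interface signatures are hypothetical stand-ins for `ItemReader` / `ItemProcessor` / `ItemWriter`, and no Spring Batch classes are used. The reader returns `null` when input runs out, the processor returns `null` to filter an item, and the writer receives one whole chunk per call.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.IntStream;

// Plain-Java model of a chunk-oriented Step; names mirror Spring Batch but this is not its implementation.
class ChunkStepSketch {

    // Per-step counters, similar in spirit to what Spring Batch records for each step execution.
    static final class Stats {
        int readCount, filterCount, writeCount, commitCount;
    }

    static <I, O> Stats runStep(Supplier<I> reader,         // returns null once the input is exhausted
                                Function<I, O> processor,   // returns null to filter an item out
                                Consumer<List<O>> writer,   // receives one whole chunk per call
                                int chunkSize) {
        Stats stats = new Stats();
        boolean exhausted = false;
        while (!exhausted) {
            List<O> chunk = new ArrayList<>();
            int readInChunk = 0;
            // Read and process one item at a time until the chunk is full or the input ends.
            while (readInChunk < chunkSize) {
                I item = reader.get();
                if (item == null) {
                    exhausted = true;
                    break;
                }
                readInChunk++;
                stats.readCount++;
                O out = processor.apply(item);
                if (out == null) {
                    stats.filterCount++;
                } else {
                    chunk.add(out);
                }
            }
            if (readInChunk == 0) {
                break; // nothing left to write
            }
            writer.accept(chunk); // in Spring Batch, write + commit happen in one transaction per chunk
            stats.writeCount += chunk.size();
            stats.commitCount++;
        }
        return stats;
    }

    /** 250 input items, every 10th filtered out by the processor, chunk size 100. */
    static Stats demo(List<Integer> chunkSizes) {
        Iterator<Integer> source = IntStream.rangeClosed(1, 250).iterator();
        Supplier<Integer> reader = () -> source.hasNext() ? source.next() : null;
        Function<Integer, String> processor = i -> i % 10 == 0 ? null : "item-" + i;
        Consumer<List<String>> writer = chunk -> chunkSizes.add(chunk.size());
        return runStep(reader, processor, writer, 100);
    }

    public static void main(String[] args) {
        List<Integer> chunkSizes = new ArrayList<>();
        Stats s = demo(chunkSizes);
        System.out.println("read=" + s.readCount + " filtered=" + s.filterCount
                + " written=" + s.writeCount + " commits=" + s.commitCount + " chunks=" + chunkSizes);
        // prints: read=250 filtered=25 written=225 commits=3 chunks=[90, 90, 45]
    }
}
```

With 250 items, every 10th one filtered and a chunk size of 100, the model reports 250 read, 25 filtered and 225 written in three chunks of 90, 90 and 45.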
Below is the main application class:

```java
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.data.jpa.repository.config.EnableJpaAuditing;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;

import java.util.TimeZone;

@EnableAsync
@EnableScheduling          // turns on @Scheduled methods
@EnableJpaAuditing
@EnableBatchProcessing     // turns on the Spring Batch infrastructure
@SpringBootApplication
public class ImportApplication {

    public static void main(String[] args) {
        // Set the JVM default time zone before the Spring context starts
        TimeZone.setDefault(TimeZone.getTimeZone("Asia/Shanghai"));
        ApplicationContext context = SpringApplication.run(ImportApplication.class, args);
    }
}
```
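`@EnableBatchProcessing`
turns on Spring Batch and registers its infrastructure beans (such as the `JobRepository`, `JobLauncher`, `JobBuilderFactory` and `StepBuilderFactory` used below). If the Batch metadata tables don't exist yet, Spring Boot's batch auto-configuration creates them in your default DataSource. (In Spring Boot 2 this is controlled by `spring.batch.initialize-schema`, which by default only initializes embedded databases.)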
Now on to the code. The class below is a very basic job configuration:

```java
package com.ps.batch.schedule.config;

import com.ps.batch.batch.serivce.ImportFinanceService;
import com.ps.batch.constant.ImportFinanceConstant;
import com.ps.batch.dto.batch.ImportFinanceDto;
import com.ps.batch.dto.batch.ImportUserDto;
import com.ps.batch.schedule.BatchJobCompletionListener;
import com.ps.batch.schedule.processor.ImportFinanceDtoProcessor;
import com.ps.batch.schedule.processor.ImportUserDtoProcessor;
import com.ps.batch.schedule.reader.ImportFinanceDtoReader;
import com.ps.batch.schedule.reader.ImportUserDtoReader;
import com.ps.batch.schedule.writer.ImportFinanceDtoWriter;
import com.ps.batch.schedule.writer.ImportUserDtoWriter;
import com.ps.batch.service.JobService;
import com.ps.batch.service.MemberTagService;
import com.ps.batch.service.NotifyService;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.annotation.Scheduled;

/**
 * Member tagging job
 */
@Data
@Slf4j
@Configuration
public class BatchJobMemberTag {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private BatchJobCompletionListener batchJobCompletionListener;

    @Autowired
    private ImportFinanceService importFinanceService;

    @Autowired
    private MemberTagService memberTagService;

    // "Gym member tagging job": non-ASCII text, which is why it is Base64-encoded below
    private String jobName = "健身房 会员 打標 作業";

    @Value("${membertag.api.notify.mailto}")
    private String membertagApiNotifyMailto;

    @Autowired
    private JobService jobService;

    @Autowired
    private ImportFinanceDtoReader importFinanceDtoReader;

    @Autowired
    private ImportFinanceDtoProcessor importFinanceDtoProcessor;

    @Autowired
    private ImportFinanceDtoWriter importFinanceDtoWriter;

    @Autowired
    private ImportUserDtoReader importUserDtoReader;

    @Autowired
    private ImportUserDtoProcessor importUserDtoProcessor;

    @Autowired
    private ImportUserDtoWriter importUserDtoWriter;

    @Autowired
    private NotifyService notifyService;

    @Autowired
    @Qualifier("threadPoolTaskExecutor")
    private TaskExecutor taskExecutor;

    // First run 1 s after startup, then 10 min after the previous run has finished
    @Scheduled(initialDelay = 1 * 1000, fixedDelay = 10 * 60 * 1000)
    public void jobRun() {
        try {
            JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
            // Unique per launch, so every run becomes a new JobInstance
            jobParametersBuilder.addLong("time", System.currentTimeMillis());
            jobParametersBuilder.addString("jobName", jobService.encodeToBase64Str(jobName));
            jobParametersBuilder.addString("mailTo", jobService.encodeToBase64Str(membertagApiNotifyMailto));
            JobParameters jobParameters = jobParametersBuilder.toJobParameters();
            jobService.runJob(exportMemberTag(), jobParameters, jobName);
        } catch (Exception e) {
            notifyService.notifyError("Abnormal termination", "Launch failed=" + e.getMessage(), jobName, null);
        }
    }

    public Job exportMemberTag() throws Exception {
        return jobBuilderFactory.get("exportTag")
                .incrementer(new RunIdIncrementer())
                .start(step_exportMemberTag())
                .next(step_exportImportFinanceDtoTag())
                .listener(batchJobCompletionListener)
                .build();
    }

    public Step step_exportImportFinanceDtoTag() throws Exception {
        return stepBuilderFactory.get("exportImportFinanceTag")
                .<ImportFinanceDto, ImportFinanceDto>chunk(100)
                .reader(importFinanceDtoReader.getUnTaggedImportFinance())
                .processor(importFinanceDtoProcessor.sendTag())
                .writer(importFinanceDtoWriter.updateTaggedWriter(ImportFinanceConstant.membertagged_tagged))
                .taskExecutor(taskExecutor)
                .throttleLimit(10)
                .build();
    }

    public Step step_exportMemberTag() throws Exception {
        return stepBuilderFactory.get("exportImportUserTag")
                .<ImportUserDto, ImportUserDto>chunk(100)
                .reader(importUserDtoReader.findUnTaggedImportUser())
                .processor(importUserDtoProcessor.sendTag())
                .writer(importUserDtoWriter.updateMembertaggedIsTrue())
                .taskExecutor(taskExecutor)
                .throttleLimit(10)
                .build();
    }
}
```
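Start with `@Scheduled`. It is a Spring Framework annotation, not a Spring Batch one, and it is turned on by `@EnableScheduling` on the main class. You can configure it with `cron`, or with `initialDelay` plus `fixedDelay` as here. One thing to be aware of: by default, scheduled methods run on a single scheduler thread, and with the default `JobLauncher` the job runs synchronously on that thread. So with `fixedDelay`, the next run only starts after the current one has finished (plus the delay).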
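To see what `fixedDelay` means in practice, here is a plain-JDK sketch using `ScheduledExecutorService.scheduleWithFixedDelay` on a single thread. My assumption is that this matches `@Scheduled(fixedDelay = ...)` running on Spring's default single-threaded scheduler. The `simulate` method and its timings are made up for illustration: the job takes 30 ms and the delay is 20 ms, so runs never overlap and each one starts at least 20 ms after the previous one ended.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Plain-JDK illustration of fixed-delay scheduling; not Spring's scheduler, just the same semantics.
class FixedDelaySketch {

    /** Returns {completed runs, max concurrent runs, smallest gap in ms between a run's end and the next start}. */
    static long[] simulate(long jobMillis, long delayMillis, long observeMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        List<long[]> runs = new CopyOnWriteArrayList<>(); // {startNanos, endNanos} per run
        AtomicInteger running = new AtomicInteger();
        AtomicInteger maxConcurrent = new AtomicInteger();

        scheduler.scheduleWithFixedDelay(() -> {
            long start = System.nanoTime();
            maxConcurrent.accumulateAndGet(running.incrementAndGet(), Math::max);
            try {
                Thread.sleep(jobMillis); // stands in for a long-running batch job
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                running.decrementAndGet();
                runs.add(new long[] {start, System.nanoTime()});
            }
        }, 0, delayMillis, TimeUnit.MILLISECONDS);

        try {
            Thread.sleep(observeMillis);
            scheduler.shutdownNow();
            scheduler.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        long minGapMillis = Long.MAX_VALUE;
        for (int i = 1; i < runs.size(); i++) {
            long gapNanos = runs.get(i)[0] - runs.get(i - 1)[1];
            minGapMillis = Math.min(minGapMillis, TimeUnit.NANOSECONDS.toMillis(gapNanos));
        }
        return new long[] {runs.size(), maxConcurrent.get(), minGapMillis};
    }

    public static void main(String[] args) {
        long[] r = simulate(30, 20, 300);
        System.out.println("runs=" + r[0] + " maxConcurrent=" + r[1] + " minGapMs=" + r[2]);
    }
}
```

The gap is measured from the end of one run to the start of the next, which is exactly why a slow job can never pile up on itself under `fixedDelay`.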
In the scheduled method I build a `JobParametersBuilder` to pass in values the job needs to read at runtime, such as the job name `jobName` and the notification recipients `mailTo`.
Note that Spring Batch stores these parameters in the `batch_job_execution_params` table. In my setup, putting Chinese text there made the insert fail with an Exception, so I Base64-encode the values before passing them in:

```java
JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
jobParametersBuilder.addLong("time", System.currentTimeMillis());
jobParametersBuilder.addString("jobName", jobService.encodeToBase64Str(jobName));
jobParametersBuilder.addString("mailTo", jobService.encodeToBase64Str(membertagApiNotifyMailto));
JobParameters jobParameters = jobParametersBuilder.toJobParameters();
```
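The rows in `batch_job_execution_params` look roughly like this. (A `DATE_VAL` of `1970-01-01 08:00:00` is just timestamp 0, the placeholder for non-date parameters, shown in the Asia/Shanghai time zone set at startup.)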
| JOB_EXECUTION_ID | TYPE_CD | KEY_NAME | STRING_VAL | DATE_VAL | LONG_VAL | DOUBLE_VAL | IDENTIFYING |
|---|---|---|---|---|---|---|---|
| 128992 | LONG | time | | 1970-01-01 08:00:00 | 1521009519606 | 0 | Y |
| 128992 | STRING | jobName | 5YGl6Lqr5oi/IOWinumHjyDkvJrlkZgg5ZCM5q2l | 1970-01-01 08:00:00 | 0 | 0 | Y |
| 128993 | LONG | time | | 1970-01-01 08:00:00 | 1521009530611 | 0 | Y |
| 128993 | STRING | jobName | 5YGl6Lqr5oi/IOWinumHjyDorqLljZUg5ZCM5q2lKDI05bCP5pmCKQ== | 1970-01-01 08:00:00 | 0 | 0 | Y |
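The author's `jobService.encodeToBase64Str` isn't shown. The `jobName` values in the table above decode cleanly as UTF-8, so here is a minimal sketch that assumes standard Base64 over UTF-8 bytes. `JobParamCodec` and `decodeFromBase64Str` are hypothetical names; the decoder is what you would use to read the value back inside the job.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helpers: assumes UTF-8 bytes + standard Base64, consistent with the table values.
class JobParamCodec {

    static String encodeToBase64Str(String value) {
        return Base64.getEncoder().encodeToString(value.getBytes(StandardCharsets.UTF_8));
    }

    static String decodeFromBase64Str(String encoded) {
        return new String(Base64.getDecoder().decode(encoded), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // STRING_VAL values copied from the batch_job_execution_params sample
        System.out.println(decodeFromBase64Str("5YGl6Lqr5oi/IOWinumHjyDkvJrlkZgg5ZCM5q2l"));
        System.out.println(decodeFromBase64Str("5YGl6Lqr5oi/IOWinumHjyDorqLljZUg5ZCM5q2lKDI05bCP5pmCKQ=="));
    }
}
```

The two sample values decode to `健身房 增量 会员 同步` (gym incremental member sync) and `健身房 增量 订单 同步(24小時)` (gym incremental order sync, 24h). In other words, the sample rows come from other jobs in the same project, not from the tagging job above.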
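So what does my `jobService` do?

```java
jobService.runJob(exportMemberTag(), jobParameters, jobName);
```

It's essentially a master on/off switch for launching jobs.
You'll have to deploy a new version sooner or later, so I added a switch. Before an upgrade, turn it off so no new jobs start, wait for the running jobs to finish, then shut down and replace the jar. That's the safer way; otherwise a job may be stuck in the middle of a transaction, and cleaning that up is a real pain XD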
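```java
public void runJob(Job job, JobParameters jobParameters, String jobName) {
    // Master switch: when off (e.g. before a redeploy), don't launch new jobs.
    // If batchEnable is flipped at runtime from another thread, declare it volatile so this thread sees the change.
    if (BatchConstant.batchEnable) {
        try {
            jobLauncher.run(job, jobParameters);
        } catch (Exception e) {
            log.error("", e);
            notifyService.notifyError("Job launch failed", e.getMessage(), jobName, null);
        }
    } else {
        log.warn("Job paused, not launching {}", jobName);
    }
}
```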
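The same switch can be sketched in plain Java, with one addition the text implies: a way to wait until running jobs have finished before shutting down. `LaunchGate`, `pause()` and `awaitIdle()` are hypothetical names, not part of the author's `JobService`. An `AtomicBoolean` replaces the static `Boolean` so a change made from another thread (say, an admin endpoint) is visible to the scheduler thread.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical launch gate: same idea as runJob's batchEnable switch, plus waiting for running jobs to drain.
class LaunchGate {
    private final AtomicBoolean enabled = new AtomicBoolean(true); // thread-safe replacement for a static Boolean
    private final AtomicInteger running = new AtomicInteger();

    /** Runs the job only while the gate is open; returns true if it actually ran. */
    boolean runJob(Runnable job, String jobName) {
        running.incrementAndGet(); // count first, then check, so awaitIdle() can't miss a job that slips in
        try {
            if (!enabled.get()) {
                System.out.println("Job paused, not launching " + jobName);
                return false;
            }
            job.run();
            return true;
        } finally {
            running.decrementAndGet();
        }
    }

    void pause() { enabled.set(false); }

    void resume() { enabled.set(true); }

    /** Waits until no job is running (e.g. before shutting down to swap the jar); false on timeout. */
    boolean awaitIdle(long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (running.get() > 0) {
            if (System.currentTimeMillis() > deadline) {
                return false;
            }
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        LaunchGate gate = new LaunchGate();
        AtomicInteger launches = new AtomicInteger();
        gate.runJob(launches::incrementAndGet, "member-tag"); // runs
        gate.pause();
        gate.runJob(launches::incrementAndGet, "member-tag"); // skipped
        System.out.println("launches=" + launches.get() + " idle=" + gate.awaitIdle(1000));
        // prints: Job paused, not launching member-tag
        //         launches=1 idle=true
    }
}
```

Counting the job before checking the flag is deliberate. Once `pause()` has been called and `awaitIdle()` has seen zero running jobs, any later `runJob` call is guaranteed to see the switch turned off.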
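Next, let's see how a job is put together:

```java
public Job exportMemberTag() throws Exception {
    return jobBuilderFactory.get("exportTag")
            .incrementer(new RunIdIncrementer())
            .start(step_exportMemberTag())
            .next(step_exportImportFinanceDtoTag())
            .listener(batchJobCompletionListener)
            .build();
}
```

The factory `jobBuilderFactory` assembles your Steps in order and attaches the `listener`. `start(...)` runs `step_exportMemberTag()` first, then `next(...)` runs `step_exportImportFinanceDtoTag()`; if a step fails, the job stops there. Also note that `RunIdIncrementer` isn't applied when the job is launched directly with `jobLauncher.run(...)` as here; it's the unique `time` parameter that makes each run a new JobInstance.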
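Here is a plain-Java model of what `start(...).next(...).listener(...)` does at run time, as I understand Spring Batch's default behavior. Steps run in order, a failing step fails the job and the remaining steps are skipped, and the job listener's after-job callback fires either way. `runJob`, `JobListener` and `Status` are hypothetical stand-ins, not Spring Batch types.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BooleanSupplier;

// Plain-Java model of start(step1).next(step2).listener(l); hypothetical types, not Spring Batch's API.
class SequentialJobSketch {

    enum Status { COMPLETED, FAILED }

    interface JobListener {
        void beforeJob(String jobName);
        void afterJob(String jobName, Status status);
    }

    /** Runs steps in insertion order; the first failing step fails the job and later steps are skipped. */
    static Status runJob(String jobName, Map<String, BooleanSupplier> steps, JobListener listener) {
        listener.beforeJob(jobName);
        Status status = Status.COMPLETED;
        for (BooleanSupplier step : steps.values()) {
            if (!step.getAsBoolean()) {
                status = Status.FAILED;
                break;
            }
        }
        listener.afterJob(jobName, status); // called on success and on failure
        return status;
    }

    static List<String> demo(boolean firstStepSucceeds) {
        List<String> events = new ArrayList<>();
        Map<String, BooleanSupplier> steps = new LinkedHashMap<>(); // keeps the start/next order
        steps.put("exportImportUserTag", () -> {
            events.add("step:exportImportUserTag");
            return firstStepSucceeds;
        });
        steps.put("exportImportFinanceTag", () -> {
            events.add("step:exportImportFinanceTag");
            return true;
        });
        JobListener listener = new JobListener() {
            @Override public void beforeJob(String jobName) { events.add("beforeJob:" + jobName); }
            @Override public void afterJob(String jobName, Status status) { events.add("afterJob:" + status); }
        };
        runJob("exportTag", steps, listener);
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo(true));
        // [beforeJob:exportTag, step:exportImportUserTag, step:exportImportFinanceTag, afterJob:COMPLETED]
        System.out.println(demo(false));
        // [beforeJob:exportTag, step:exportImportUserTag, afterJob:FAILED]
    }
}
```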
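Now let's see how one of the Steps is built, `step_exportImportFinanceDtoTag` (the job's second step):

```java
public Step step_exportImportFinanceDtoTag() throws Exception {
    return stepBuilderFactory.get("exportImportFinanceTag")
            .<ImportFinanceDto, ImportFinanceDto>chunk(100)
            .reader(importFinanceDtoReader.getUnTaggedImportFinance())
            .processor(importFinanceDtoProcessor.sendTag())
            .writer(importFinanceDtoWriter.updateTaggedWriter(ImportFinanceConstant.membertagged_tagged))
            .taskExecutor(taskExecutor)
            .throttleLimit(10)
            .build();
}
```

It is also assembled through a factory, `stepBuilderFactory`. `chunk(100)` is the commit interval: items are read and processed one at a time, and every 100 items read form a chunk that is handed to the writer in one call and committed in one transaction.
The generic types in front of `chunk`, `<ImportFinanceDto, ImportFinanceDto>`, are the input type (what the reader returns and the processor receives) and the output type (what the processor returns and the writer receives).
`taskExecutor` sets the thread pool that runs the step. You can skip it for now, since using it requires other configuration as well, starting with a thread-safe reader.
`throttleLimit` caps how many chunks run at the same time: with 10, up to 10 pool threads each read, process and write their own chunk concurrently. It is configured together with `taskExecutor` above.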
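The chunk/thread relationship can be sketched with plain JDK classes. Each task reads, processes and writes one whole chunk, a shared synchronized reader hands out items, and a `Semaphore` plays the role of `throttleLimit`. This is an illustration under my own assumptions (`SyncReader`, `run` and the `item * 10` "processing" are made up), not how Spring Batch implements it internally. In this sketch the effective concurrency is the smaller of the pool size and the throttle limit.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Plain-JDK sketch of a multi-threaded chunk step; the Semaphore stands in for throttleLimit.
class ThrottledChunkSketch {

    /** Thread-safe reader over 0..total-1; returns null when exhausted. */
    static final class SyncReader {
        private final int total;
        private int next = 0;
        SyncReader(int total) { this.total = total; }
        synchronized Integer read() { return next < total ? next++ : null; }
    }

    /** Returns {items written, distinct items written, max chunks in flight}. */
    static int[] run(int totalItems, int chunkSize, int poolSize, int throttleLimit) {
        SyncReader reader = new SyncReader(totalItems);
        List<Integer> written = Collections.synchronizedList(new ArrayList<>());
        AtomicBoolean exhausted = new AtomicBoolean(false);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger maxInFlight = new AtomicInteger();
        Semaphore throttle = new Semaphore(throttleLimit);
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            while (!exhausted.get()) {
                throttle.acquire(); // at most throttleLimit chunks are in flight at once
                pool.execute(() -> {
                    maxInFlight.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                    try {
                        // Each task reads, processes and writes one whole chunk
                        List<Integer> chunk = new ArrayList<>();
                        Integer item;
                        while (chunk.size() < chunkSize && (item = reader.read()) != null) {
                            chunk.add(item * 10); // "process"
                        }
                        if (chunk.size() < chunkSize) {
                            exhausted.set(true); // a short chunk means the reader ran dry
                        }
                        written.addAll(chunk); // "write" the chunk in one go
                    } finally {
                        inFlight.decrementAndGet();
                        throttle.release();
                    }
                });
            }
            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            pool.shutdownNow();
        }
        return new int[] {written.size(), new HashSet<>(written).size(), maxInFlight.get()};
    }

    public static void main(String[] args) {
        int[] r = run(1000, 100, 8, 4);
        System.out.println("written=" + r[0] + " distinct=" + r[1] + " maxInFlight=" + r[2]);
    }
}
```

For example, `run(1000, 100, 8, 4)` writes all 1000 items exactly once, with at most 4 chunks in flight even though the pool has 8 threads.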
Next, let's see how the reader is written:

```java
public ItemReader<ImportFinanceDto> getUnTaggedImportFinance() {
    // Streams the query result through a single JDBC cursor (this reader is not thread-safe)
    JdbcCursorItemReader<ImportFinanceDto> reader = new JdbcCursorItemReader<>();
    reader.setSql("select * from importfinance where membertagged = 0 order by orgcreatetime ");
    reader.setDataSource(batchDataSource);
    reader.setRowMapper(this.getImportFinanceDtoRowMapper());
    return reader;
}

private RowMapper<ImportFinanceDto> getImportFinanceDtoRowMapper() {
    // mapRow() is only called for a row the cursor is positioned on, so no isBeforeFirst()/isAfterLast()
    // guard is needed; returning null here would make read() report end-of-input and end the step early.
    return (resultSet, rowNum) -> {
        ImportFinanceDto importFinanceDto = new ImportFinanceDto();
        importFinanceDto.setSerid(resultSet.getLong("serid"));
        importFinanceDto.setPlatform(resultSet.getString("platform"));
        importFinanceDto.setPpk(resultSet.getString("ppk"));
        importFinanceDto.setRulecodes(resultSet.getString("rulecodes"));
        importFinanceDto.setSaleamount(resultSet.getDouble("saleamount"));
        importFinanceDto.setTradeamount(resultSet.getDouble("tradeamount"));
        importFinanceDto.setSource(resultSet.getLong("source"));
        importFinanceDto.setSourcetype(resultSet.getInt("sourcetype"));
        importFinanceDto.setOuterid(resultSet.getString("outerid"));
        importFinanceDto.setOutertype(resultSet.getInt("outertype"));
        importFinanceDto.setTerminusUserId(resultSet.getLong("terminususerid"));
        importFinanceDto.setExported(resultSet.getInt("exported"));
        importFinanceDto.setCardtype(resultSet.getString("cardtype"));
        importFinanceDto.setOrgcreatetime(resultSet.getLong("orgcreatetime"));
        importFinanceDto.setMembertagged(resultSet.getInt("membertagged"));
        // getDate() drops the time of day; use getTimestamp() if these are DATETIME columns and the time matters
        importFinanceDto.setCreateddate(resultSet.getDate("createddate"));
        importFinanceDto.setLastmodifieddate(resultSet.getDate("lastmodifieddate"));
        return importFinanceDto;
    };
}
```
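Why a `null`-returning row mapper is risky: a reader signals end of input by returning `null`, so a mapper that returns `null` for a real row silently cuts the step short. The sketch below models this in plain Java, with `Map` rows standing in for a `ResultSet`; `CursorReaderSketch`, `readAll` and `row` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java model of a cursor reader + row mapper. Rows are Maps standing in for ResultSet rows.
class CursorReaderSketch<T> {
    private final Iterator<Map<String, Object>> cursor;
    private final Function<Map<String, Object>, T> rowMapper;

    CursorReaderSketch(List<Map<String, Object>> rows, Function<Map<String, Object>, T> rowMapper) {
        this.cursor = rows.iterator();
        this.rowMapper = rowMapper;
    }

    /** Same contract as ItemReader.read(): the next item, or null once input is exhausted. */
    T read() {
        return cursor.hasNext() ? rowMapper.apply(cursor.next()) : null;
    }

    /** Drains a reader the way a step does: it stops at the first null. */
    static <E> List<E> readAll(CursorReaderSketch<E> reader) {
        List<E> items = new ArrayList<>();
        E item;
        while ((item = reader.read()) != null) {
            items.add(item);
        }
        return items;
    }

    static Map<String, Object> row(long serid) {
        Map<String, Object> row = new HashMap<>();
        row.put("serid", serid);
        return row;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> rows = new ArrayList<>();
        for (long id = 1; id <= 3; id++) rows.add(row(id));

        // A mapper that always returns an object for a real row: all 3 rows are read.
        Function<Map<String, Object>, Long> mapper = r -> (Long) r.get("serid");
        System.out.println(readAll(new CursorReaderSketch<>(rows, mapper)));    // [1, 2, 3]

        // A mapper that returns null for one row ends the input right there.
        Function<Map<String, Object>, Long> nullOnTwo = r -> ((Long) r.get("serid")) == 2L ? null : (Long) r.get("serid");
        System.out.println(readAll(new CursorReaderSketch<>(rows, nullOnTwo))); // [1]
    }
}
```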
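That's the most basic and most commonly used `ItemReader`. `JdbcCursorItemReader` is not thread-safe, though, so if a multi-threaded step needs to read from it, the simplest fix is to make `read()` `synchronized`, like this:

```java
public ItemReader<ImportFinanceDto> getUnExportImportFinance() {
    // Serialize read() so threads never move the shared JDBC cursor at the same time
    JdbcCursorItemReader<ImportFinanceDto> reader = new JdbcCursorItemReader<ImportFinanceDto>() {
        @Override
        public synchronized ImportFinanceDto read() throws Exception {
            return super.read();
        }
    };
    reader.setSql("select * from importfinance where exported = 0 order by orgcreatetime ");
    reader.setDataSource(batchDataSource);
    reader.setRowMapper(this.getImportFinanceDtoRowMapper());
    // Restart state can't be tracked reliably across threads, so don't save it
    reader.setSaveState(false);
    return reader;
}
```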
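Here is the idea behind the synchronized override in plain Java. A reader doing check-then-increment on a shared index is unsafe to share between threads; a decorator whose `read()` is `synchronized` lets only one thread in at a time, so each item is handed out exactly once. The types are hypothetical. If I remember correctly, Spring Batch also ships a ready-made `SynchronizedItemStreamReader` (3.0.4 and later) that wraps a reader the same way.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Illustration: a non-thread-safe "cursor" made safe to share by synchronizing read().
class SynchronizedReaderSketch {

    interface Reader<T> {
        T read();
    }

    /** Not thread-safe on its own: check-then-act on a shared index, like a cursor's internal position. */
    static final class ListCursorReader implements Reader<Integer> {
        private final List<Integer> data;
        private int index = 0;
        ListCursorReader(List<Integer> data) { this.data = data; }
        @Override public Integer read() { return index < data.size() ? data.get(index++) : null; }
    }

    /** Decorator that lets only one thread at a time into the delegate's read(). */
    static final class SynchronizedReader<T> implements Reader<T> {
        private final Reader<T> delegate;
        SynchronizedReader(Reader<T> delegate) { this.delegate = delegate; }
        @Override public synchronized T read() { return delegate.read(); }
    }

    /** Drains the reader from several threads and returns everything that was read. */
    static List<Integer> readConcurrently(Reader<Integer> reader, int threads) {
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                Integer item;
                while ((item = reader.read()) != null) {
                    seen.add(item);
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }

    static List<Integer> demo(int items, int threads) {
        List<Integer> data = new ArrayList<>();
        for (int i = 0; i < items; i++) data.add(i);
        return readConcurrently(new SynchronizedReader<>(new ListCursorReader(data)), threads);
    }

    public static void main(String[] args) {
        List<Integer> seen = demo(10_000, 8);
        // Every item is read exactly once
        System.out.println(seen.size() + " reads, " + new HashSet<>(seen).size() + " distinct");
    }
}
```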
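If you want something better suited to concurrent reading than a locked cursor, switch to `JdbcPagingItemReader`, which reads the table page by page with separate queries and is designed to be thread-safe (with `saveState` turned off). Here is another example:

```java
public ItemReader<ImportUserDto> findNewImportUserThreadSafe() {
    JdbcPagingItemReader<ImportUserDto> jdbcPagingItemReader = new JdbcPagingItemReader<>();
    jdbcPagingItemReader.setDataSource(batchDataSource);
    jdbcPagingItemReader.setFetchSize(10);
    jdbcPagingItemReader.setPageSize(100);
    // Restart state can't be tracked reliably across threads, so don't save it
    jdbcPagingItemReader.setSaveState(false);
    try {
        MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
        queryProvider.setSelectClause("select * ");
        queryProvider.setFromClause("from importusers");
        queryProvider.setWhereClause("isnew = :isnew");
        // The sort key must be unique: pages after the first are fetched relative to the last serid read
        Map<String, Order> sortKeys = new HashMap<>();
        sortKeys.put("serid", Order.ASCENDING);
        queryProvider.setSortKeys(sortKeys);
        jdbcPagingItemReader.setQueryProvider(queryProvider);

        Map<String, Object> parameterValues = new HashMap<>();
        parameterValues.put("isnew", "1");
        jdbcPagingItemReader.setParameterValues(parameterValues);
        jdbcPagingItemReader.setRowMapper(this.getImportUserDtoRowMapper());
        // Must be called by hand because this reader isn't a Spring bean;
        // otherwise there is no JdbcTemplate to work with
        jdbcPagingItemReader.afterPropertiesSet();
    } catch (Exception e) {
        // Fail fast instead of returning a half-configured reader
        throw new IllegalStateException("Failed to initialize JdbcPagingItemReader", e);
    }
    return jdbcPagingItemReader;
}
```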
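Why the unique sort key matters: as far as I know, Spring Batch's paging query providers fetch every page after the first with a condition like `serid > <last serid read>` rather than an `OFFSET`. That keeps paging correct even when the step's writer updates the column the `where` clause filters on (as the tagging step does with `membertagged`). The plain-Java sketch below compares the two approaches on an in-memory table; `Row`, `readAllKeyset`, `readAllOffset` and `table` are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Plain-Java comparison of keyset vs offset paging over an in-memory "table"; all names are hypothetical.
class KeysetPagingSketch {

    static final class Row {
        final long serid; // unique sort key
        boolean isNew;    // the column the where clause filters on
        Row(long serid, boolean isNew) { this.serid = serid; this.isNew = isNew; }
    }

    // Like: select * from t where isnew = 1 and serid > :lastKey order by serid limit :pageSize
    static List<Row> keysetPage(List<Row> table, long lastKey, int pageSize) {
        return table.stream()
                .filter(r -> r.isNew && r.serid > lastKey)
                .sorted(Comparator.comparingLong(r -> r.serid))
                .limit(pageSize)
                .collect(Collectors.toList());
    }

    // Like: select * from t where isnew = 1 order by serid limit :pageSize offset :offset
    static List<Row> offsetPage(List<Row> table, int offset, int pageSize) {
        return table.stream()
                .filter(r -> r.isNew)
                .sorted(Comparator.comparingLong(r -> r.serid))
                .skip(offset)
                .limit(pageSize)
                .collect(Collectors.toList());
    }

    // Both readers "write" as they go: every row read gets isNew = false, as a writer in the same step might do.
    static List<Long> readAllKeyset(List<Row> table, int pageSize) {
        List<Long> seen = new ArrayList<>();
        long lastKey = Long.MIN_VALUE;
        List<Row> page;
        while (!(page = keysetPage(table, lastKey, pageSize)).isEmpty()) {
            for (Row r : page) {
                seen.add(r.serid);
                r.isNew = false;
            }
            lastKey = page.get(page.size() - 1).serid;
        }
        return seen;
    }

    static List<Long> readAllOffset(List<Row> table, int pageSize) {
        List<Long> seen = new ArrayList<>();
        int offset = 0;
        List<Row> page;
        while (!(page = offsetPage(table, offset, pageSize)).isEmpty()) {
            for (Row r : page) {
                seen.add(r.serid);
                r.isNew = false;
            }
            offset += pageSize;
        }
        return seen;
    }

    static List<Row> table(int rows) {
        List<Row> t = new ArrayList<>();
        for (long id = 1; id <= rows; id++) t.add(new Row(id, true));
        return t;
    }

    public static void main(String[] args) {
        System.out.println("keyset read " + readAllKeyset(table(10), 3)); // keyset read [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
        System.out.println("offset read " + readAllOffset(table(10), 3)); // offset read [1, 2, 3, 7, 8, 9]
    }
}
```

With 10 rows and a page size of 3, keyset paging reads all 10 rows, while offset paging reads only 6: every processed row drops out of the filter and shifts the offsets of the rows behind it.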
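Next is the processor, the stage in the middle. Here you can filter items or convert them into another object for the writer. Returning `null` from `process()` filters the item out: it is not passed to the writer and is counted as filtered. The example below catches any exception from the tagging service, records the error and returns `null`, so that record is simply not written in this run:

```java
public ItemProcessor<ImportFinanceDto, ImportFinanceDto> sendTag() {
    return new ItemProcessor<ImportFinanceDto, ImportFinanceDto>() {
        @Override
        public ImportFinanceDto process(ImportFinanceDto importFinanceDto) throws Exception {
            ImportFinanceDto return_obj = null;
            MemberTagCreateResult memberTagCreateResult = null;
            try {
                // Create the tag through the member tagging service
                memberTagCreateResult = memberTagService.tagCreate(importFinanceDto);
                // Everything succeeded: pass the item on to the writer
                return_obj = importFinanceDto;
            } catch (Exception e) {
                log.error("Unknown error in the tagging system", e);
                bacthJobEvents.appendError("Unknown error in the tagging system: " + e.getMessage());
            }
            // null means "filtered": the writer never sees this item
            return return_obj;
        }
    };
}
```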
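One consequence of the catch-and-return-`null` pattern: the failed record never reaches the writer, so it keeps `membertagged = 0` and the next scheduled run reads it again. The plain-Java sketch below models two runs against an in-memory table; `FinanceRow`, `runOnce` and the tag-service lambdas are hypothetical stand-ins for the real reader, processor, writer and `memberTagService`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of the "catch, record, return null" processor; the table and tag service are stand-ins.
class FilterOnErrorSketch {

    static final class FinanceRow {
        final long serid;
        int membertagged; // 0 = not tagged yet, 1 = tagged
        FinanceRow(long serid) { this.serid = serid; }
    }

    /** One scheduled run: read untagged rows, call the tag service, mark only the successes. Returns the errors. */
    static List<String> runOnce(List<FinanceRow> table, Consumer<FinanceRow> tagService) {
        List<String> errors = new ArrayList<>();
        List<FinanceRow> toWrite = new ArrayList<>();
        for (FinanceRow r : table) {
            if (r.membertagged != 0) {
                continue; // reader: "where membertagged = 0"
            }
            FinanceRow out;
            try {
                tagService.accept(r);
                out = r; // processor: success, pass the item on
            } catch (RuntimeException e) {
                errors.add(e.getMessage()); // like bacthJobEvents.appendError(...)
                out = null; // processor: returning null filters the item
            }
            if (out != null) {
                toWrite.add(out);
            }
        }
        for (FinanceRow r : toWrite) {
            r.membertagged = 1; // writer: "update ... set membertagged = ?"
        }
        return errors;
    }

    static long taggedCount(List<FinanceRow> table) {
        return table.stream().filter(r -> r.membertagged == 1).count();
    }

    public static void main(String[] args) {
        List<FinanceRow> table = new ArrayList<>();
        for (long id = 1; id <= 5; id++) table.add(new FinanceRow(id));

        // First run: the tag service fails for even serids
        List<String> errors = runOnce(table, r -> {
            if (r.serid % 2 == 0) throw new IllegalStateException("tag failed for serid " + r.serid);
        });
        System.out.println("run 1: errors=" + errors.size() + " tagged=" + taggedCount(table)); // run 1: errors=2 tagged=3

        // Next run: the service is healthy again; only the 2 leftover rows are read and tagged
        errors = runOnce(table, r -> { });
        System.out.println("run 2: errors=" + errors.size() + " tagged=" + taggedCount(table)); // run 2: errors=0 tagged=5
    }
}
```

The flip side is that a record that always fails is retried on every run, so the collected error list is worth keeping an eye on.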
Finally, the writer receives the processed items and updates the database. `JdbcBatchItemWriter` gets a whole chunk at a time and sends the updates as one JDBC batch:

```java
public ItemWriter<ImportFinanceDto> updateTaggedWriter(Integer membertagged) {
    JdbcBatchItemWriter<ImportFinanceDto> databaseItemWriter = new JdbcBatchItemWriter<>();
    databaseItemWriter.setDataSource(batchDataSource);
    databaseItemWriter.setSql("update importfinance set membertagged = ?, lastmodifieddate = ? where serid = ? ");
    databaseItemWriter.setItemPreparedStatementSetter(updateTaggedWriterSetter(membertagged));
    return databaseItemWriter;
}

private ItemPreparedStatementSetter<ImportFinanceDto> updateTaggedWriterSetter(Integer membertagged) {
    // Binds the ? placeholders in order: membertagged, lastmodifieddate, serid
    return (importFinance, ps) -> {
        ps.setInt(1, membertagged);
        ps.setTimestamp(2, new Timestamp(System.currentTimeMillis()));
        ps.setLong(3, importFinance.getSerid());
    };
}
```
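The writer above uses `PreparedStatement`-style `?` placeholders, and with many columns it's easy to lose track of which index is which. Instead you can use named parameters with `:` (the same style as Hibernate queries) and let `BeanPropertyItemSqlParameterSourceProvider` fill them from the properties of `InLeftDto`:

```java
public ItemWriter<InLeftDto> saveInLeft() {
    JdbcBatchItemWriter<InLeftDto> databaseItemWriter = new JdbcBatchItemWriter<>();
    NamedParameterJdbcTemplate jdbcTemplate = new NamedParameterJdbcTemplate(erpDataSource);
    databaseItemWriter.setJdbcTemplate(jdbcTemplate);
    // REPLACE INTO is MySQL syntax (not standard SQL); :vcCode and :vcClub come from InLeftDto's bean properties
    databaseItemWriter.setSql("REPLACE into T_INLEFT ( VC_CODE, VC_CLUB) values ( :vcCode, :vcClub)");
    ItemSqlParameterSourceProvider<InLeftDto> paramProvider = new BeanPropertyItemSqlParameterSourceProvider<>();
    databaseItemWriter.setItemSqlParameterSourceProvider(paramProvider);
    // Required here: it parses the SQL and switches the writer into named-parameter mode
    databaseItemWriter.afterPropertiesSet();
    return databaseItemWriter;
}
```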
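Conceptually, named parameters are turned back into positional `?` placeholders, and each name is looked up as a JavaBean property on the item (`vcCode` -> `getVcCode()`). The sketch below shows only that idea. It is a simplified stand-in, not Spring's `NamedParameterUtils` or `BeanPropertyItemSqlParameterSourceProvider`, and its regex ignores quoted strings, comments and casts. `InLeftDto` here is a hypothetical two-field version of the author's class, with made-up sample values.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Simplified illustration of named-parameter expansion + bean-property binding (not Spring's implementation).
class NamedParamSketch {

    private static final Pattern NAMED = Pattern.compile(":([A-Za-z_][A-Za-z0-9_]*)");

    /** Hypothetical DTO with the two properties used in the REPLACE statement. */
    public static final class InLeftDto {
        private final String vcCode;
        private final String vcClub;
        public InLeftDto(String vcCode, String vcClub) { this.vcCode = vcCode; this.vcClub = vcClub; }
        public String getVcCode() { return vcCode; }
        public String getVcClub() { return vcClub; }
    }

    /** Replaces each :name with '?' and collects the names in order of appearance. */
    static String toPositional(String sql, List<String> namesOut) {
        Matcher m = NAMED.matcher(sql);
        StringBuffer sb = new StringBuffer();
        while (m.find()) {
            namesOut.add(m.group(1));
            m.appendReplacement(sb, "?");
        }
        m.appendTail(sb);
        return sb.toString();
    }

    /** Looks up each name as a JavaBean getter (vcCode -> getVcCode()) on the item. */
    static Object[] bindBeanProperties(Object bean, List<String> names) {
        Object[] values = new Object[names.size()];
        for (int i = 0; i < names.size(); i++) {
            String name = names.get(i);
            String getter = "get" + Character.toUpperCase(name.charAt(0)) + name.substring(1);
            try {
                values[i] = bean.getClass().getMethod(getter).invoke(bean);
            } catch (ReflectiveOperationException e) {
                throw new IllegalArgumentException("No readable property '" + name + "'", e);
            }
        }
        return values;
    }

    public static void main(String[] args) {
        List<String> names = new ArrayList<>();
        String sql = toPositional("REPLACE into T_INLEFT ( VC_CODE, VC_CLUB) values ( :vcCode, :vcClub)", names);
        Object[] values = bindBeanProperties(new InLeftDto("C001", "CLUB-A"), names); // sample values
        System.out.println(sql);                     // REPLACE into T_INLEFT ( VC_CODE, VC_CLUB) values ( ?, ?)
        System.out.println(names);                   // [vcCode, vcClub]
        System.out.println(Arrays.toString(values)); // [C001, CLUB-A]
    }
}
```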
And that's a basic Spring Batch job.