I previously wrote up 通过 Jdbc + Job 解决跨库大表数据迁移 (solving cross-database large-table data migration with Jdbc + Job). That was only a first version. I have since reworked it so that the job runs several worker threads, each claiming a disjoint ID range through a Redis counter and committing its inserts in batches, and migration throughput improved substantially.
The main code is below; for the supporting pieces, see 通过 Jdbc + Job 解决跨库大表数据迁移:
```java
/**
 * Table data migration job.
 *
 * @author qimok
 * @since 2020-1-17
 */
@Component
@Slf4j(system = SERVICE_NAME, module = DATA_MIGRATE_JOB)
@SuppressWarnings({"checkstyle:linelength", "checkstyle:magicnumber", "checkstyle:methodlength", "PMD"})
public class OldTableMigrateJob {

    @Value("${old.table.migration.enabled}")
    private Boolean enabled;

    @Value("${migration.qps}")
    private Integer qps;

    @Value("${migration.every.commit.count}")
    private Integer everyCommitCount;

    @Value("${migration.thread.num}")
    private Integer threadNum;

    @Autowired
    private RedisService redisService;

    @Autowired
    private JdbcConnection conn;

    /**
     * Migrates the privdoc tables in ascending ID order, with no row migrated twice.
     * <p>
     * Runs every 10 seconds.
     */
    @Scheduled(fixedRate = 1000 * 10)
    public void process() {
        // The job runs when either the config flag or the Redis switch enables it
        if (enabled == null || (!enabled && !"1".equals(redisService.getFromString(MIGRATION_SWITCH)))) {
            return;
        }
        long beginTime = System.currentTimeMillis();
        String commLog = "Migration>>start time (" + OffsetDateTime.now().toLocalDateTime() + ")";
        ExecutorService executorService = Executors.newFixedThreadPool(threadNum);
        CountDownLatch latch = new CountDownLatch(threadNum);
        try {
            for (int i = 0; i < threadNum; i++) {
                executorService.submit(() -> {
                    try {
                        // Rows per slice: per-thread QPS sustained over 60 seconds
                        Long solveCount = qps / threadNum * 60L;
                        // incr() returns the pre-increment counter value, so the first offset is 0
                        Long offset = redisService.incr(MIGRATION_INCR);
                        Long currId = offset * solveCount;      // lower bound of this slice (exclusive)
                        Long idLe = (offset + 1) * solveCount;  // upper bound of this slice (inclusive)
                        executeMigrate(commLog, offset, currId, idLe);
                    } finally {
                        // Count down even on failure so latch.await() cannot hang
                        latch.countDown();
                    }
                });
            }
            latch.await();
        } catch (InterruptedException e) {
            log.error(commLog + "overall failure", e);
        } finally {
            long endTime = System.currentTimeMillis();
            log.info(String.format(commLog + "threads: %s, total elapsed: %s s",
                    threadNum, String.format("%.4f", (endTime - beginTime) / 1000d)));
            executorService.shutdown();
        }
    }

    private void executeMigrate(String commLog, Long offset, Long currId, Long idLe) {
        commLog = String.format("%s[offset: %s, ID range: (%s, %s]]->", commLog, offset, currId, idLe);
        log.info(commLog + "start...");
        long startTime = System.currentTimeMillis();
        Connection connOld = null;
        Connection connNew = null;
        Statement stmt = null;
        ResultSet resFromOld = null;
        PreparedStatement pstmt = null;
        Long currMaxId = 0L; // largest ID seen in this slice
        Long count = 0L;     // rows migrated in this slice
        try {
            connOld = conn.getOldConn();
            connNew = conn.getNewConn();
            connNew.setAutoCommit(false);
            // Rebuild the binary guid column into canonical lower-case UUID text form
            String oldSql = "SELECT a.id, LOWER(CONCAT(\n"
                    + " SUBSTR(HEX(a.guid), 1, 8), '-',\n"
                    + " SUBSTR(HEX(a.guid), 9, 4), '-',\n"
                    + " SUBSTR(HEX(a.guid), 13, 4), '-',\n"
                    + " SUBSTR(HEX(a.guid), 17, 4), '-',\n"
                    + " SUBSTR(HEX(a.guid), 21)\n"
                    + " )) as guid\n"
                    + "..." // remaining selected columns elided
                    + "a.created, a.created\n"
                    + " FROM old.old_table a left join old.extra b\n"
                    + " on a.id = b.deliverId "
                    + "where a.id > " + currId + " and a.id <= " + idLe;
            // INSERT IGNORE keeps the job idempotent if a slice is ever replayed
            String newSql = "INSERT IGNORE INTO new.new_table (id, guid, ..., created, updated) "
                    + "VALUES (?, ?, ..., ?, ?)";
            stmt = connOld.createStatement();
            long readStartTime = System.currentTimeMillis();
            stmt.execute(oldSql); // run the read query
            long readEndTime = System.currentTimeMillis();
            log.info(String.format(commLog + "read elapsed: %s ms", readEndTime - readStartTime));
            resFromOld = stmt.getResultSet(); // fetch the result set
            pstmt = connNew.prepareStatement(newSql); // precompile the insert
            int num = 0;
            long writeStartTime = System.currentTimeMillis();
            while (resFromOld.next()) {
                num++;
                count++;
                pstmt.setLong(1, resFromOld.getLong("id"));
                pstmt.setString(2, resFromOld.getString("guid"));
                // ... remaining column bindings elided ...
                pstmt.setTimestamp(16, resFromOld.getTimestamp("created"));
                // updated is deliberately initialised from created
                pstmt.setTimestamp(17, resFromOld.getTimestamp("created"));
                pstmt.addBatch();
                // Commit a transaction every everyCommitCount rows
                if (num >= everyCommitCount) {
                    long executeStartTime = System.currentTimeMillis();
                    pstmt.executeBatch();
                    connNew.commit();
                    pstmt.clearBatch();
                    long executeEndTime = System.currentTimeMillis();
                    log.info(String.format(commLog + "batch elapsed (%s rows per batch): %s ms",
                            everyCommitCount, executeEndTime - executeStartTime));
                    num = 0;
                }
                currMaxId = Math.max(resFromOld.getLong("id"), currMaxId);
            }
            pstmt.executeBatch(); // flush the final partial batch
            connNew.commit();
            long writeEndTime = System.currentTimeMillis();
            log.info(String.format(commLog + "write elapsed: %s s",
                    String.format("%.4f", (writeEndTime - writeStartTime) / 1000d)));
        } catch (Exception e) {
            log.error(commLog + "error", e);
        } finally {
            conn.closeConn(connOld, null, connNew, stmt, resFromOld, pstmt);
            long endTime = System.currentTimeMillis(); // end time of this slice
            log.info(String.format(commLog + "scanned %s rows in this pass, slice lower bound from the redis offset: %s, "
                    + "largest ID migrated in this pass: %s, elapsed: %s s",
                    count, currId, currMaxId, String.format("%.4f", (endTime - startTime) / 1000d)));
        }
    }
}
```
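The job autowires a `JdbcConnection` helper that only appears in the earlier article. For readers without that article at hand, here is a minimal sketch of the contract the job depends on: `getOldConn()`, `getNewConn()`, and a null-safe `closeConn(...)`. The JDBC URLs, the credentials, and the varargs `closeConn` signature are my assumptions, not the original implementation:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

import org.springframework.stereotype.Component;

/**
 * Minimal sketch of the JdbcConnection helper used by OldTableMigrateJob.
 * The URLs and credentials below are placeholders, not the real configuration.
 */
@Component
public class JdbcConnection {

    /** Opens a connection to the source (old) database. */
    public Connection getOldConn() throws SQLException {
        return DriverManager.getConnection("jdbc:mysql://old-host:3306/old", "user", "secret");
    }

    /** Opens a connection to the target (new) database. */
    public Connection getNewConn() throws SQLException {
        return DriverManager.getConnection("jdbc:mysql://new-host:3306/new", "user", "secret");
    }

    /** Closes every JDBC resource the job opened, tolerating nulls. */
    public void closeConn(AutoCloseable... resources) {
        for (AutoCloseable resource : resources) {
            if (resource == null) {
                continue;
            }
            try {
                resource.close();
            } catch (Exception e) {
                // Ignore close failures: committed work is unaffected
            }
        }
    }
}
```

In production you would more likely hand this to a connection pool such as HikariCP rather than open a raw connection per slice, but the sketch keeps the surface identical to what the job calls.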
The `incr` implementation in `RedisService`:

```java
@Override
public Long incr(String key) {
    // RedisAtomicLong is backed by a Redis key, so the counter survives restarts
    // and is shared by every instance of the service
    RedisAtomicLong entityIdCounter = new RedisAtomicLong(key, redisTemplate.getConnectionFactory());
    // getAndIncrement() returns the value before the increment: 0, 1, 2, ...
    return entityIdCounter.getAndIncrement();
}
```
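One subtlety worth calling out: `RedisAtomicLong.getAndIncrement()` returns the counter value before the increment, so the first caller observes 0, and every caller, across all worker threads and all JVM instances, claims a unique offset. The sketch below shows how those offsets translate into the disjoint ID ranges the job scans; the `qps` and `threadNum` values are illustrative, not the production configuration:

```java
/**
 * Demonstrates how successive Redis counter values map to disjoint ID ranges.
 * The configuration values are hypothetical.
 */
public class OffsetRangeDemo {
    public static void main(String[] args) {
        long qps = 6000L;       // stand-in for migration.qps
        long threadNum = 10L;   // stand-in for migration.thread.num
        long solveCount = qps / threadNum * 60L; // rows per slice, as computed in the job
        for (long offset = 0; offset < 4; offset++) {
            long currId = offset * solveCount;      // exclusive lower bound
            long idLe = (offset + 1) * solveCount;  // inclusive upper bound
            System.out.printf("offset %d -> (%d, %d]%n", offset, currId, idLe);
        }
        // Prints:
        // offset 0 -> (0, 36000]
        // offset 1 -> (36000, 72000]
        // offset 2 -> (72000, 108000]
        // offset 3 -> (108000, 144000]
    }
}
```

Because the counter lives in Redis rather than in process memory, restarting the job or adding more instances never re-issues an already-claimed range, and `INSERT IGNORE` covers the case where a slice is partially committed and then replayed.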