初次写作尝试:本文试以问答形式对quartz做一些介绍。
Ⅰ Quartz是什么?为什么要有这样一篇文章?
Quartz 是一个完全由 Java 编写的开源作业调度框架,为在 Java 应用程序中进行作业调度提供了简单却强大的机制。Quartz最早的issue记录在jira.terracotta.org,时间可以追溯到大约2010年的2月。Quartz年代久远难以满足技术人的新奇感,也没有hadoop、spark那么重量、时髦,但深入了解一下这位老前辈的设计思想,仍能使我们获得一些开发方面的参考,这也正是本文的初衷。
Ⅱ 作业调度是什么?
想象一下,所有程序都是被动触发的,比如对于web程序,一次外部请求就是一次触发。有一类程序需要由时间触发,比如抢火车票的场景,每隔一段时间查询一次余票,最简单的实现,就是构造一个死循环,不断查询余票,每次启动主程序时,同时启动这个线程,这样就有了一个随时间周期性自动运行的程序。周期性触发称为一种调度规则,“由时间触发程序运行”就称为“作业调度”。
Ⅲ 我们为什么需要作业调度?
时间的流动不知疲倦永不停歇,由时间触发程序能自动化大量业务过程。
Ⅳ Quartz怎样实现作业调度?
仔细考虑一下我们需要的作业调度应该有些什么:
1. 一个“调度者”,
2. 一个“调度规则”,
3. 一个被调度的“作业”
这三者可以构成一个有基本功能的作业调度,在quartz中,Scheduler对应“调度者”, Trigger对应“调度规则”,Job对应“作业”。
调度规则和作业配置在quartz的保存方式有很多实现,基于内存的RAMJobStore、基于JDBC的JobStoreSupport等等。在Scheduler和JobStore之间还有一层DriveDelegate,DriveDelegate非常像JobStore,只是扩展了基于不同database实现时的一些实现。Listener则提供了Scheduler、Trigger、Job运行时,一些时间点的通知。
自绘业务架构图:
Ⅴ 描述一下Quartz的一些基本活动?
首先是Quartz启动时的活动图:
看一下对应的源码片段:
启动的入口在QuartzScheduler的start:
1. 在QuartzScheduler.start()前,主调度线程SchedulerThread和其他配置通过StdSchedulerFactory初始化,源代码在StdSchedulerFactory.instantiate(),接近800行,此处不列举。
2. 检查Scheduler是否正在关闭或者已关闭。
3. 通知SchedulerListener正在启动。
4. 启动Scheduler,启动插件。
5. 如果Scheduler从暂停恢复运行,通知JobSupport恢复Scheduler。
6. 通知主调度线程开始运行。
7. 通知SchedulerListener启动完成。
public class QuartzScheduler implements RemotableQuartzScheduler { public void start() throws SchedulerException { // 检查Scheduler是否正在关闭或者已关闭 if (shuttingDown|| closed) { throw new SchedulerException( "The Scheduler cannot be restarted after shutdown() has been called."); } //通知SchedulerListener正在启动 notifySchedulerListenersStarting(); if (initialStart == null) { initialStart = new Date(); //启动Scheduler this.resources.getJobStore().schedulerStarted(); //启动各种插件 startPlugins(); } else { //如果Scheduler从暂停恢复运行,通知JobSupport恢复Scheduler resources.getJobStore().schedulerResumed(); } //通知主调度线程可以开始运行 schedThread.togglePause(false); //通知SchedulerListener启动完成 notifySchedulerListenersStarted(); } }
通知监听:创建SchedulerListener的列表并逐个通知。
public class QuartzScheduler implements RemotableQuartzScheduler { public void notifySchedulerListenersStarting() { // 创建SchedulerListener的列表. List<SchedulerListener> schedListeners = buildSchedulerListenerList(); // 逐个通知Listener for (SchedulerListener sl : schedListeners) { sl.schedulerStarting(); } } }
获取监听列表:从ListenerManager获取Listener,这里需要开发者主动将自己的Listener注册到ListenerManager。
public class QuartzScheduler implements RemotableQuartzScheduler { private List<SchedulerListener> buildSchedulerListenerList() { List<SchedulerListener> allListeners = new LinkedList<SchedulerListener>(); // 从ListenerManager获取Listener 这里需要开发者主动将自己的Listener注册到ListenerManager allListeners.addAll(getListenerManager().getSchedulerListeners()); allListeners.addAll(getInternalSchedulerListeners()); return allListeners; } }
正式启动Scheduler
1. 集群部署时初始化ClusterManager线程并启动。
2. 单机部署时恢复Job。
3. 初始化MisFire处理线程并启动。
public abstract class JobStoreSupportimplements JobStore,Constants { public void schedulerStarted() throws SchedulerException { // 集群部署时初始化ClusterManager线程并启动 if (isClustered()) { clusterManagementThread = new ClusterManager(); clusterManagementThread.initialize(); } else { //单机部署直接恢复Job recoverJobs(); } //初始化MisFire处理线程并启动 misfireHandler = new MisfireHandler(); misfireHandler.initialize(); } }
首先初始化ClusterManager,把ClusterManager放进线程池执行。
class ClusterManager extends Thread { public void initialize() { // 初始化ClusterManager this.manage(); ThreadExecutor executor = getThreadExecutor(); // 把ClusterManager放进线程池执行 executor.execute(ClusterManager.this); } }
ClusterManage 进一步doCheckin。
class ClusterManager extends Thread { private boolean manage() { boolean res = false; // 节点登入集群 res = doCheckin(); return res; } }
Checkin 细节:
1. 每次checkin都检查是否有意外中断的作业。
2. 从db获取锁后,再恢复作业。
public abstract class JobStoreSupportimplements JobStore,Constants { protected boolean doCheckin() throws JobPersistenceException { boolean recovered = false; Connection conn = getNonManagedTXConnection(); try { // 每次都要检查是否有意外中断的作业 List<SchedulerStateRecord> failedRecords = null; if (!firstCheckIn) { failedRecords = clusterCheckIn(conn); commitConnection(conn); } if (firstCheckIn || (failedRecords.size() > 0)) { // 从db获取锁 getLockHandler().obtainLock(conn, LOCK_STATE_ACCESS); transStateOwner = true; failedRecords = (firstCheckIn) ? clusterCheckIn(conn) : findFailedInstances(conn); if (failedRecords.size() > 0) { getLockHandler().obtainLock(conn, LOCK_TRIGGER_ACCESS); // 恢复中断的作业 clusterRecover(conn, failedRecords); recovered = true; } } commitConnection(conn); } catch (JobPersistenceException e) { rollbackConnection(conn); } finally { } firstCheckIn = false; return recovered; } }
恢复作业:首先找到意外中断的调度记录,保存更新节点checkin的时间。
public abstract class JobStoreSupportimplements JobStore,Constants { protected List<SchedulerStateRecord> clusterCheckIn(Connection conn) throws JobPersistenceException { //找到意外中断的调度记录 List<SchedulerStateRecord> failedInstances = findFailedInstances(conn); try { // 保存更新节点checkin的时间 lastCheckin = System.currentTimeMillis(); if (getDelegate().updateSchedulerState(conn, getInstanceId(), lastCheckin) == 0) { getDelegate().insertSchedulerState(conn, getInstanceId(), lastCheckin, getClusterCheckinInterval()); } } catch (Exception e) { } return failedInstances; } }
恢复细节:查询获得一个集群中所有失败节点的列表,如果当前节点首次checkin,列表里会有当前节点:
1. 查询所有节点的调度记录。
2. 找到当前节点的调度记录。
3. 找到所有调度意外中断的节点的记录。
如果是当前节点第一次checkIn,还要把有触发记录但丢失调度记录的补全,构造虚拟的调度记录。
public abstract class JobStoreSupportimplements JobStore,Constants { protected List<SchedulerStateRecord> findFailedInstances(Connection conn) throws JobPersistenceException { try { List<SchedulerStateRecord> failedInstances = new LinkedList<SchedulerStateRecord>(); long timeNow = System.currentTimeMillis(); // 查询所有节点的调度记录 List<SchedulerStateRecord> states = getDelegate().selectSchedulerStateRecords(conn, null); for (SchedulerStateRecord rec : states) { // 找到当前节点的记录 if (rec.getSchedulerInstanceId().equals(getInstanceId())) { if (firstCheckIn) { failedInstances.add(rec); } } else { // 找到所有调度意外中断的节点的记录 if (calcFailedIfAfter(rec) < timeNow) { failedInstances.add(rec); } } } // 如果是当前节点第一次checkIn,还要把有触发记录但丢失调度记录的补全,构造虚拟的调度记录. if (firstCheckIn) { failedInstances.addAll(findOrphanedFailedInstances(conn, states)); } return failedInstances; } catch (Exception e) { } } }
细节:针对有Trigger调度记录但没有Scheduler调度记录的,创建虚拟的Scheduler调度记录 。
public abstract class JobStoreSupportimplements JobStore,Constants { private List<SchedulerStateRecord> findOrphanedFailedInstances( Connection conn, List<SchedulerStateRecord> schedulerStateRecords) throws SQLException, NoSuchDelegateException { List<SchedulerStateRecord> orphanedInstances = new ArrayList<SchedulerStateRecord>(); Set<String> allFiredTriggerInstanceNames = getDelegate().selectFiredTriggerInstanceNames(conn); if (!allFiredTriggerInstanceNames.isEmpty()) { for (SchedulerStateRecord rec : schedulerStateRecords) { allFiredTriggerInstanceNames.remove(rec.getSchedulerInstanceId()); } for (String inst : allFiredTriggerInstanceNames) { SchedulerStateRecord orphanedInstance = new SchedulerStateRecord(); orphanedInstance.setSchedulerInstanceId(inst); orphanedInstances.add(orphanedInstance); } } return orphanedInstances; } }
完成作业恢复:然后用这些调度记录创建SimpleTriggerImpl,恢复对应的Trigger,通知主调度线程调度。
public abstract class JobStoreSupportimplements JobStore,Constants { protected void clusterRecover(Connection conn, List<SchedulerStateRecord> failedInstances) throws JobPersistenceException { if (failedInstances.size() > 0) { long recoverIds = System.currentTimeMillis(); try { for (SchedulerStateRecord rec : failedInstances) { List<FiredTriggerRecord> firedTriggerRecs = getDelegate().selectInstancesFiredTriggerRecords(conn, rec.getSchedulerInstanceId()); int acquiredCount = 0; int recoveredCount = 0; int otherCount = 0; Set<TriggerKey> triggerKeys = new HashSet<TriggerKey>(); for (FiredTriggerRecord ftRec : firedTriggerRecs) { TriggerKey tKey = ftRec.getTriggerKey(); JobKey jKey = ftRec.getJobKey(); triggerKeys.add(tKey); if (ftRec.getFireInstanceState().equals(STATE_BLOCKED)) { getDelegate().updateTriggerStatesForJobFromOtherState(conn, jKey, STATE_WAITING, STATE_BLOCKED); } else if (ftRec.getFireInstanceState().equals(STATE_PAUSED_BLOCKED)) { getDelegate().updateTriggerStatesForJobFromOtherState(conn, jKey, STATE_PAUSED, STATE_PAUSED_BLOCKED); } if (ftRec.getFireInstanceState().equals(STATE_ACQUIRED)) { getDelegate().updateTriggerStateFromOtherState(conn, tKey, STATE_WAITING, STATE_ACQUIRED); acquiredCount++; } else if (ftRec.isJobRequestsRecovery()) { if (jobExists(conn, jKey)) { SimpleTriggerImpl rcvryTrig = new SimpleTriggerImpl("recover_" + rec.getSchedulerInstanceId() + "_" + String.valueOf(recoverIds++), Scheduler.DEFAULT_RECOVERY_GROUP, new Date(ftRec.getScheduleTimestamp())); rcvryTrig.setJobName(jKey.getName()); rcvryTrig.setJobGroup(jKey.getGroup()); rcvryTrig.setMisfireInstruction(SimpleTrigger.MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY); rcvryTrig.setPriority(ftRec.getPriority()); JobDataMap jd = getDelegate().selectTriggerJobDataMap(conn, tKey.getName(), tKey.getGroup()); jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_NAME, tKey.getName()); jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_GROUP, tKey.getGroup()); jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_FIRETIME_IN_MILLISECONDS, String.valueOf(ftRec.getFireTimestamp())); jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_SCHEDULED_FIRETIME_IN_MILLISECONDS, String.valueOf(ftRec.getScheduleTimestamp())); rcvryTrig.setJobDataMap(jd); rcvryTrig.computeFirstFireTime(null); storeTrigger(conn, rcvryTrig, null, false, STATE_WAITING, false, true); recoveredCount++; } else { otherCount++; } } else { otherCount++; } if (ftRec.isJobDisallowsConcurrentExecution()) { getDelegate().updateTriggerStatesForJobFromOtherState(conn, jKey, STATE_WAITING, STATE_BLOCKED); getDelegate().updateTriggerStatesForJobFromOtherState(conn, jKey, STATE_PAUSED, STATE_PAUSED_BLOCKED); } } getDelegate().deleteFiredTriggers(conn, rec.getSchedulerInstanceId()); int completeCount = 0; for (TriggerKey triggerKey : triggerKeys) { if (getDelegate().selectTriggerState(conn, triggerKey).equals(STATE_COMPLETE)) { List<FiredTriggerRecord> firedTriggers = getDelegate().selectFiredTriggerRecords(conn, triggerKey.getName(), triggerKey.getGroup()); if (firedTriggers.isEmpty()) { if (removeTrigger(conn, triggerKey)) { completeCount++; } } } } if (!rec.getSchedulerInstanceId().equals(getInstanceId())) { getDelegate().deleteSchedulerState(conn, rec.getSchedulerInstanceId()); } } } catch (Throwable e) { } } } }
此外,ClusterManager运行时也会周期性地恢复其他异常节点调度的Trigger,并且立即通知当前节点的调度线程插入这些立即触发的Trigger。
class ClusterManager extends Thread { public void run() { while (!shutdown) { if (!shutdown) { long timeToSleep = getClusterCheckinInterval(); long transpiredTime = (System.currentTimeMillis() - lastCheckin); timeToSleep = timeToSleep - transpiredTime; if (timeToSleep <= 0) { timeToSleep = 100L; } if (numFails > 0) { timeToSleep = Math.max(getDbRetryInterval(), timeToSleep); } try { Thread.sleep(timeToSleep); } catch (Exception ignore) { } } if (!shutdown && this.manage()) { //通知当前节点的主调度线程插入一批新的Trigger触发 signalSchedulingChangeImmediately(0L); } } } }
错过了Trigger的触发时间会怎样?
有一个专门处理错过触发时间超过一定阈值(60s)的线程,活动图如下:
Misfire 处理线程启动:
1. 查询错过触发时间阈值的作业。
2. 通知主调度线程插入这些重新调度的作业。
class MisfireHandler extends Thread { @Override public void run() { while (!shutdown) { long sTime = System.currentTimeMillis(); // 查询错过触发阈值的作业 RecoverMisfiredJobsResult recoverMisfiredJobsResult = manage(); // 通知主调度线程插入这些重新调度的作业 if (recoverMisfiredJobsResult.getProcessedMisfiredTriggerCount() > 0) { signalSchedulingChangeImmediately(recoverMisfiredJobsResult.getEarliestNewTime()); } if (!shutdown) { long timeToSleep = 50l; //sleep直到下个循环 if (!recoverMisfiredJobsResult.hasMoreMisfiredTriggers()) { timeToSleep = getMisfireThreshold() - (System.currentTimeMillis() - sTime); if (timeToSleep <= 0) { timeToSleep = 50l; } if (numFails > 0) { timeToSleep = Math.max(getDbRetryInterval(), timeToSleep); } } try { Thread.sleep(timeToSleep); } catch (Exception ignore) { } }//while !shutdown } } }
manage 继续调用doRecoverMisfires。
class MisfireHandler extends Thread { private RecoverMisfiredJobsResult manage() { try { //查询错过触发的job RecoverMisfiredJobsResult res = doRecoverMisfires(); return res; } catch (Exception e) { } return RecoverMisfiredJobsResult.NO_OP; } }
Count 是否有错过触发需要重新调度的作业,再获取集群锁,然后再获取作业。
class MisfireHandler extends Thread { protected RecoverMisfiredJobsResult doRecoverMisfires() throws JobPersistenceException { boolean transOwner = false; Connection conn = getNonManagedTXConnection(); try { RecoverMisfiredJobsResult result = RecoverMisfiredJobsResult.NO_OP; // Count是否有错过触发需要重新调度的作业,再获取集群锁,然后再获取作业 int misfireCount = (getDoubleCheckLockMisfireHandler()) ? getDelegate().countMisfiredTriggersInState(conn, STATE_WAITING, getMisfireTime()) : Integer.MAX_VALUE; if (misfireCount == 0) { } else { transOwner = getLockHandler().obtainLock(conn, LOCK_TRIGGER_ACCESS); result = recoverMisfiredJobs(conn, false); } commitConnection(conn); return result; } finally { } } }
最后通知主调度线程正式启动。
public class QuartzSchedulerThread extends Thread { void togglePause(boolean pause) { synchronized (sigLock) { paused = pause; if (paused) { signalSchedulingChange(0); } else { sigLock.notifyAll(); } } } }
然后是调度作业的活动图:
再来看主调度线程的代码片段:
1. 检查是否关闭、暂停主调度线程,然后wait。
2. DB 有异常时稍微等待再继续。
3. 获取空闲线程
4. 获取一批即将调度的Trigger。
5. 距触发时间有一段时间时,检查是否有其他插入的Trigger,wait
6. 如果找到了其他插入的Trigger,释放当前的一批Trigger,重新循环。
7. 通知JobStore trigger已经被触发,获取触发结果。
8. Trigger 触发结果是失败时释放这个Trigger。
9. Trigger 触发成功时,创建JobRunShell对象,JobRunlShell初始化获取调度作业。
10. 线程池运行调度作业。
public class QuartzSchedulerThread extends Thread { public void run() { int acquiresFailed = 0; // 检查是否关闭、暂停主调度线程,然后wait while (!halted.get()) { try { synchronized (sigLock) { while (paused && !halted.get()) { sigLock.wait(1000L); acquiresFailed = 0; } if (halted.get()) { break; } } // DB有异常时稍微等待再继续 if (acquiresFailed > 1) { long delay = computeDelayForRepeatedErrors(qsRsrcs.getJobStore(), acquiresFailed); Thread.sleep(delay); } // 获取空闲线程,但这里其实恒为true int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads(); if (availThreadCount > 0) { List<OperableTrigger> triggers; long now = System.currentTimeMillis(); clearSignaledSchedulingChange(); try { // 获取一批即将调度的Trigger triggers = qsRsrcs.getJobStore().acquireNextTriggers(now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow()); acquiresFailed = 0; } catch (JobPersistenceException jpe) { if (acquiresFailed == 0) { qs.notifySchedulerListenersError("An error occurred while scanning for the next triggers to fire.", jpe); } acquiresFailed++; continue; } acquiresFailed++; continue; } if (triggers != null && !triggers.isEmpty()) { now = System.currentTimeMillis(); long triggerTime = triggers.get(0).getNextFireTime().getTime(); long timeUntilTrigger = triggerTime - now; // 距触发时间有一段时间时,检查是否有其他插入的Trigger,wait while (timeUntilTrigger > 2) { synchronized (sigLock) { if (halted.get()) { break; } if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) { sigLock.wait(timeUntilTrigger); } } //如果找到了其他插入的Trigger,释放当前的一批Trigger,重新循环 if (releaseIfScheduleChangedSignificantly(triggers, triggerTime)) { break; } now = System.currentTimeMillis(); timeUntilTrigger = triggerTime - now; } if (triggers.isEmpty()) continue; List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>(); boolean goAhead = true; synchronized (sigLock) { goAhead = !halted.get(); } if (goAhead) { try { //通知JobStore trigger已经被触发,获取触发结果 List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers); if (res != null) bndles = res; } catch (SchedulerException se) { qs.notifySchedulerListenersError("An error occurred while firing triggers '" + triggers + "'", se); for (int i = 0; i < triggers.size(); i++) { qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i)); } continue; } } for (int i = 0; i < bndles.size(); i++) { TriggerFiredResult result = bndles.get(i); TriggerFiredBundle bndle = result.getTriggerFiredBundle(); Exception exception = result.getException(); //Trigger触发结果是失败时释放这个Trigger if (exception instanceof RuntimeException) { qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i)); continue; } if (bndle == null) { qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i)); continue; } //Trigger触发成功时,创建JobRunShell对象,JobRunlShell初始化获取调度作业 JobRunShell shell = null; try { shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle); shell.initialize(qs); } catch (SchedulerException se) { qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR); continue; } //线程池运行调度作业, if (qsRsrcs.getThreadPool().runInThread(shell) == false) { qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR); } } continue; } else { continue; // while (!halted) } long now = System.currentTimeMillis(); long waitTime = now + getRandomizedIdleWaitTime(); long timeUntilContinue = waitTime - now; synchronized (sigLock) { if (!halted.get()) { if (!isScheduleChanged()) { sigLock.wait(timeUntilContinue); } } } } catch (RuntimeException re) { } } } }
默认线程池运行作业:
1. wait 空闲线程或继续执行。
2. 即使线程池被关闭,依然可以继续执行作业。
public class SimpleThreadPool implements ThreadPool { public boolean runInThread(Runnable runnable) { synchronized (nextRunnableLock) { handoffPending = true; // wait空闲线程或继续执行 // Wait until a worker thread is available while ((availWorkers.size() < 1) && !isShutdown) { nextRunnableLock.wait(500); } if (!isShutdown) { WorkerThread wt = (WorkerThread) availWorkers.removeFirst(); busyWorkers.add(wt); wt.run(runnable); } else { // 即使线程池被关闭,依然可以继续执行作业 WorkerThread wt = new WorkerThread(this, threadGroup, "WorkerThread-LastJob", prio, isMakeThreadsDaemons(), runnable); busyWorkers.add(wt); workers.add(wt); wt.start(); } nextRunnableLock.notifyAll(); handoffPending = false; } return true; } }
这里的run只是唤醒当前对象在另一个线程里的wait。
class WorkerThread extends Thread { public void run(Runnable newRunnable) { synchronized (lock) { runnable = newRunnable; lock.notifyAll(); } } }
JobRunShell 在这里执行。
class WorkerThread extends Thread { public void run() { boolean ran = false; while (run.get()) { try { synchronized (lock) { // 没有作业时wait空循环 while (runnable == null && run.get()) { lock.wait(500); } // 有作业时执行 if (runnable != null) { ran = true; runnable.run(); } } } finally { if (getPriority() != tp.getThreadPriority()) { setPriority(tp.getThreadPriority()); } if (runOnce) { run.set(false); clearFromBusyWorkersList(this); } else if (ran) { ran = false; makeAvailable(this); } } } } }
JobShell 内部实现:
1. 如果有事务,开启事务。
2. 通知TriggerListener和JobListener。
3. 传入Job的上下文运行Job。
4. 事务控制的Job在事务异常时才允许重复运行job。
5. 结束事务。
public class JobRunShell extends SchedulerListenerSupport implements Runnable { public void run() { qs.addInternalSchedulerListener(this); try { OperableTrigger trigger = (OperableTrigger) jec.getTrigger(); JobDetail jobDetail = jec.getJobDetail(); do { JobExecutionException jobExEx = null; Job job = jec.getJobInstance(); try { // 如果有事务,开启事务 begin(); } catch (SchedulerException se) { break; } // 通知TriggerListener和JobListener try { if (!notifyListenersBeginning(jec)) { break; } } catch (VetoedException ve) { try { CompletedExecutionInstruction instCode = trigger.executionComplete(jec, null); qs.notifyJobStoreJobVetoed(trigger, jobDetail, instCode); if (jec.getTrigger().getNextFireTime() == null) { qs.notifySchedulerListenersFinalized(jec.getTrigger()); } complete(true); } catch (SchedulerException se) { qs.notifySchedulerListenersError("Error during veto of Job (" + jec.getJobDetail().getKey() + ": couldn't finalize execution.", se); } break; } long startTime = System.currentTimeMillis(); long endTime = startTime; // 传入Job的上下文运行Job // execute the job try { job.execute(jec); endTime = System.currentTimeMillis(); } catch (JobExecutionException jee) { endTime = System.currentTimeMillis(); jobExEx = jee; } catch (Throwable e) { endTime = System.currentTimeMillis(); qs.notifySchedulerListenersError("Job (" + jec.getJobDetail().getKey() + " threw an exception.", se); jobExEx = new JobExecutionException(se, false); } jec.setJobRunTime(endTime - startTime); if (!notifyJobListenersComplete(jec, jobExEx)) { break; } CompletedExecutionInstruction instCode = CompletedExecutionInstruction.NOOP; try { instCode = trigger.executionComplete(jec, jobExEx); } catch (Exception e) { SchedulerException se = new SchedulerException("Trigger threw an unhandled exception.", e); qs.notifySchedulerListenersError("Please report this error to the Quartz developers.", se); } if (!notifyTriggerListenersComplete(jec, instCode)) { break; } // 事务控制的Job在事务异常时才允许重复运行job. if (instCode == CompletedExecutionInstruction.RE_EXECUTE_JOB) { jec.incrementRefireCount(); try { complete(false); } catch (SchedulerException se) { qs.notifySchedulerListenersError("Error executing Job (" + jec.getJobDetail().getKey() + ": couldn't finalize execution.", se); } continue; } try { // 结束事务. complete(true); } catch (SchedulerException se) { qs.notifySchedulerListenersError("Error executing Job (" + jec.getJobDetail().getKey() + ": couldn't finalize execution.", se); continue; } qs.notifyJobStoreJobComplete(trigger, jobDetail, instCode); break; } while (true); } finally { qs.removeInternalSchedulerListener(this); } } }
Ⅵ 对比一下Quartz和其他流行的调度作业框架?
58 的作业调度框架基于XXL-JOB,XXL-JOB早期基于Quartz实现调度。由于对XXL-JOB并不熟悉,因此直接参考了官方文档,XXL-JOB官方文档将自己和Quartz做了对比:
Quartz 作为开源作业调度中的佼佼者,是作业调度的首选。但是集群环境中Quartz采用API的方式对任务进行管理,从而可以避免上述问题,但是同样存在以下问题:
问题一:调用API的的方式操作任务,不人性化;
问题二:需要持久化业务QuartzJobBean到底层数据表中,系统侵入性相当严重。
问题三:调度逻辑和QuartzJobBean耦合在同一个项目中,这将导致一个问题,在调度任务数量逐 渐增多,同时调度任务逻辑逐渐加重的情况加,此时调度系统的性能将大大受限于业务;
问题四:quartz底层以“抢占式”获取DB锁并由抢占成功节点负责运行任务,会导致节点负载悬殊非 常大;而XXL-JOB通过执行器实现“协同分配式”运行任务,充分发挥集群优势,负载各节点均衡。
XXL-JOB 弥补了quartz的上述不足之处。
通过XXL-JOB官方文档我们主要了解到,XXL-JOB提供了一套界面且操作API管理调度作业,优化了各节点负载,但并非XXL-JOB能优化负载而Quartz不能,比如作业以Job为单元执行,将作业分散部署在多个集群,将作业量接近的Job部署在同一个集群内,节点内控制合适的线程池数量,负载问题可以缓解一大块。对于问题三,调度任务增多影响性能的问题,根源实际上在于将作业代码写在调度集群内,通过进程隔离解决这个问题并不难。对于问题二,持久化业务需要保存数据到数据表,事实上任何作业调度都无法避免,通过服务拆分和进程隔离,仍然可以一定程度缓解这个问题。总的来看,XXL-JOB对Quartz做出了一些优化,也不失为一个作业调度的选择。
最后总结一下本文:
本文从两方面初步介绍了Quartz的基本实现:
1. Quartz 启动的主流程:
1) 通过配置初始化Scheduler和SchedulerThread主调度线程。
2) 以集群节点或单机的身份恢复作业调度。
3) 启动Misfire处理,检查恢复错过调度一定时间阈值的作业。
4) 在各个节点通知Listener。
2. Quartz 调度作业的流程:
1) 获取Trigger和对应的Job。
2) 检查并立即调度插入的Trigger和Job。
3) 将Trigger和Job交给作业线程池执行。
4) 在各个节点通知Listener。
本文没有涉及的:
1. Quartz 初始化配置的过程。
2. Quartz 的JobStore的多种实现以及细节。
3. Job 数据的持久化。
4. Job 的事务控制。
5. Listener 和Plugin。
6. Cluster 的细节。
感兴趣的可以自己下载源码阅读。