项目地址: https://github.com/littlechare/job-manager
项目详细的信息请参看项目的 README
下载地址: https://download.csdn.net/download/w172087242/10333176
之前想写一个关于定时任务的组建,便于项目中快速集成;
写完之后,又继续在此基础上演变成系统,当然该系统还缺少很多东西;
如业务方的bid生成与验证、任务回调响应报文的规则、任务调度的监控、如何做HA等等;
因为是个人项目,所以用了部分下班时间和本次清明节的空闲时间实现,
所以项目可定也有不足的地方,但是如果开发者想使用,也可以下载下来快速
集成,也可在此基础上加以修改。
任务类型 | 说明 | 表达式示例 | 备注 |
---|---|---|---|
CRON | Cron类型的任务,不支持表达式更改 | 0/20 * * * * *? | 每20秒调用一次 |
TRIGGER | Trigger类型的任务,支持表达式更改 | 0 0 4 * * *? | 每天凌晨4点调用 |
FIXED | Fixed类型任务,不支持表达式更改 | fixed=5000 | 单位毫秒,表示每5秒调用一次 |
FIXED_DELAY | Fixed_delay任务,不支持表达式更改 | fixed=5000,delay=10000 | 单位毫秒,表示延迟10秒后执行 |
1.业务方需要自己维护业务方编号(bid)和任务编号;
2.支持任务的随时取消、随时更改、随时添加等,回调地址一http(s)打头
模块名称 | 功能 | 说明 |
---|---|---|
job-dispatcher-base | 任务基础模块 | 定时任务能力提供者(可抽离成组件) |
job-dispatcher-biz | 任务业务模块 | 业务管理与任务触发模块 |
job-dispatcher-manager | 任务管理模块 | 管理任务变动并发出变动事件提醒(分布部署可基于消息进行改进) |
job-dispatcher-test | 测试模块 | 独立模块,含有一个接口,便于测试任务回调 |
类名(简要名称,前缀省略com.littlehow.job) | 功能 | 说明 |
---|---|---|
base.config.ScheduleTaskConfig | 定时任务配置器 | 任务的核心管理器 |
base.config. TaskConfig | 定时任务缓存 | 缓存任务信息 |
base.config. TaskType | 定时任务类型 | 枚举CRON,TRIGGER,FIXED,FIXED_DELAY |
base. BaseTask | 任务基础信息 | 任务实体bean,实现runnable和initializingbean |
cron基础类 | 因为csdn编辑器问题,类名写于此BaseCronTask | |
base. BaseFixedTask | fixed基础类 | 实现baseTask类,与cron类似 |
ScheduleBaseService | service类 | 对外暴露服务类 |
类名 (要名简称,前缀省略com.littlehow.job) | 功能 | 说明 |
---|---|---|
manager.api.TaskManagerService | 任务持久化管理api | manager模块可以独立出去,所以该api为唯一访问接口 |
manager.event.TaskEvent | 任务事件 | 任务发送的事件实体,继承ApplicationEvent |
manager.event.TaskEventType | 任务事件类型 | 枚举ADD,UPDATE,REMOVE |
manager.support.MysqlTaskManagerSupport | api实现 | mysql持久化任务的实现 |
类名 (要名简称,前缀省略 com.littlehow.job ) | 功能 | 说明 |
---|---|---|
advice.ExceptionAdvice | 异常统一处理 | |
advice.SuccessResponseAdvice | 成功统一处理 | |
config.InterceptorConfig | 拦截器配置 | |
interceptor.BusinessInterceptor | 拦截器实现 | 主要运用验证和获取header中的bid |
interceptor.BusinessContext | 上下文 | bid上下文管理 |
listener.TaskEventListener | 任务事件监听 | 监听任务事件,实现ApplicationListener接口 |
service.CallbackExecuteService | 回调处理服务 | 定时任务的回调地址调用 |
service.TaskExecuteService | 任务处理服务 | |
controller.TaskManagerController | 任务管理web接口 | 支持任务的增删改查 |
init.InitTask | 初始化任务 | 任务系统重启后需要重新将已有任务加入任务队列 |
vo.ResponseEntity | 相应参数 | |
vo.ArgumentsException | 参数异常 | 校验参数时,如果不合法会抛出此异常 |
JobStart | 启动类 | springboot项目启动类 |
类名 (要名简称,前缀省略 com.littlehow.job ) | 功能 | 说明 |
---|---|---|
test.TestController | 测试controller | |
test.TestStart | 测试启动类 | springboot启动类 |
package com.littlehow.job.base.config; import com.littlehow.job.base.BaseTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Configurable; import org.springframework.context.annotation.Lazy; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.SchedulingConfigurer; import org.springframework.scheduling.config.*; import org.springframework.scheduling.support.CronTrigger; import org.springframework.stereotype.Component; import java.util.Date; @Lazy(false) @Configurable @Component @EnableScheduling public class ScheduleTaskConfig implements SchedulingConfigurer { private Logger log = LoggerFactory.getLogger(ScheduleTaskConfig.class); private final static String TASK_NOT_EXISTS = "not exists"; private final static String TASK_EXISTS = "exists"; private final static String FAILURE = "failure"; private final static String SUCCESS = "success"; private ScheduledTaskRegistrar scheduledTaskRegistrar; @Override public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) { this.scheduledTaskRegistrar = scheduledTaskRegistrar; initTask(); } /** * 添加任务 * @param task * @return */ public String addTask(BaseTask task) { if (scheduledTaskRegistrar == null || task == null) { return FAILURE; } if (TaskConfig.containsTask(task.getId())) { return TASK_EXISTS; } try { addTask0(task); TaskConfig.addTask(task); return SUCCESS; } catch (Exception e) { log.error("新增定时任务失败:" + task, e); throw e; } } /** * 改变任务执行频率 * @param taskId * @param expression * @return */ public String changeTask(String taskId, String expression) { BaseTask baseTask = TaskConfig.getTask(taskId); if (baseTask == null || TaskType.TRIGGER != baseTask.taskType || expression == null) { return TASK_NOT_EXISTS; } log.info("change trigger expression:(id=" + taskId + ",expression=" + expression+")"); baseTask.setExpression(expression); return SUCCESS; } /** * 取消定时任务 * @param taskId * @return */ public String cancelTask(String taskId) { if (!TaskConfig.containsTask(taskId)) { return TASK_NOT_EXISTS; } try { log.info("cancel task:" + taskId); TaskConfig.removeTask(taskId).getScheduledTask().cancel(); } catch (Exception e) { log.error("取消任务失败:" + taskId, e); throw e; } return SUCCESS; } /** * 初始化已配置任务 */ private void initTask() { TaskConfig.getTasks().forEach(task -> addTask0(task)); } private void addTask0(BaseTask task) { log.info("add task:" + task); switch (task.taskType) { case TRIGGER: task.setScheduledTask(addTriggerTask(task)); break; case CRON: task.setScheduledTask(addCronTask(task, task.getExpression())); break; case FIXED_RATE: task.setScheduledTask(addFixedRateTask(task, task.interval())); break; case FIXED_DELAY: task.setScheduledTask(addFixedDelayTask(task, task.interval(), task.delay())); break; default: } } /** * 添加不可改变时间表的定时任务 * @param task */ private ScheduledTask addCronTask(Runnable task, String expression) { return scheduledTaskRegistrar.scheduleCronTask(new CronTask(task, expression)); } /** * 添加可变时间task * @param task * @return */ private ScheduledTask addTriggerTask(BaseTask task) { return scheduledTaskRegistrar.scheduleTriggerTask(new TriggerTask(task, triggerContext -> { CronTrigger trigger = new CronTrigger(task.getExpression()); Date nextExec = trigger.nextExecutionTime(triggerContext); return nextExec; })); } /** * 设置固定频率的定时任务 * @param task * @param interval */ private ScheduledTask addFixedRateTask(Runnable task, long interval) { return scheduledTaskRegistrar.scheduleFixedRateTask(new IntervalTask(task, interval, 0L)); } /** * 设置延迟以固定频率执行的定时任务 * @param task * @param interval * @param delay */ private ScheduledTask addFixedDelayTask(Runnable task, long interval, long delay) { return scheduledTaskRegistrar.scheduleFixedDelayTask(new IntervalTask(task, interval, delay)); } }
package com.littlehow.job.base; import com.littlehow.job.base.config.TaskConfig; import com.littlehow.job.base.config.TaskType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.scheduling.config.ScheduledTask; /** * 基本的任务配置类 */ public abstract class BaseTask implements Runnable,InitializingBean { protected final Logger log = LoggerFactory.getLogger(this.getClass()); public final TaskType taskType; private ScheduledTask scheduledTask; private final String id; public BaseTask(TaskType taskType, String id) { this.taskType = taskType; this.id = id; } /** * 获取任务表达式如:0 0 0/1 * * *? (每个整点执行) * @return */ public abstract String getExpression(); /** * 固定频率执行的时间间隔 * @return */ public abstract long interval(); /** * 固定频率执行的延迟时间 * @return */ public abstract long delay(); /** * 设置任务表达式 * @param expression */ public abstract void setExpression(String expression); /** * 获取任务唯一标识 * @return */ public String getId() { return id; } public final ScheduledTask getScheduledTask() { return scheduledTask; } public final void setScheduledTask(ScheduledTask scheduledTask) { this.scheduledTask = scheduledTask; } @Override public void afterPropertiesSet() { TaskConfig.addTask(this); } public String toString() { return this.getClass().getSimpleName() + "(id:" + id + ",expression:" + getExpression() + ",type:" + taskType + ",interval:" + interval() + ", delay:" + delay() + ")"; } }
package com.littlehow.job.service; import com.littlehow.job.base.BaseCronTask; import com.littlehow.job.base.BaseFixedTask; import com.littlehow.job.base.ScheduleBaseService; import com.littlehow.job.manager.pojo.TaskDto; import com.littlehow.job.manager.pojo.TaskType; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.util.StringUtils; import java.util.regex.Matcher; import java.util.regex.Pattern; @Service public class TaskExecuteService { @Autowired private ScheduleBaseService scheduleBaseService; @Autowired private CallbackExecuteService executeService; private static final Pattern fixed = Pattern.compile("fixed=(//d+)//s*(,//s*delay=(//d+))?"); /** * 新增任务 * 因表达式解析错误可能会抛出异常 * @param taskDto */ public void addTask(TaskDto taskDto) { if (taskDto == null) { return; } String taskId = getTaskId(taskDto); CallbackExecuteService.addOrUpdateTask(taskId, taskDto.getCallbackUrl()); TaskType taskType = taskDto.getTaskType(); switch (taskType) { case FIXED: case FIXED_DELAY: long[] params = getFixedParameters(taskDto.getExpression()); BaseFixedTask baseFixedTask = new BaseFixedTask(taskId, params[0], params[1]) { @Override public void run() { executeService.execute(this.getId()); } }; scheduleBaseService.addTask(baseFixedTask); break; case CRON: case TRIGGER: BaseCronTask baseCronTask = new BaseCronTask(taskType == TaskType.CRON ? com.littlehow.job.base.config.TaskType.CRON : com.littlehow.job.base.config.TaskType.TRIGGER, taskId) { @Override public void run() { executeService.execute(this.getId()); } }; baseCronTask.setExpression(taskDto.getExpression()); scheduleBaseService.addTask(baseCronTask); } } /** * 修改任务 * @param taskDto */ public void updateTask(TaskDto taskDto) { if (!StringUtils.isEmpty(taskDto.getExpression())) { //修改表达式 scheduleBaseService.changeTask(getTaskId(taskDto), taskDto.getExpression()); } if (!StringUtils.isEmpty(taskDto.getCallbackUrl())) { //修改回调地址 CallbackExecuteService.addOrUpdateTask(getTaskId(taskDto), taskDto.getCallbackUrl()); } } /** * 删除任务 * @param taskDto */ public void removeTask(TaskDto taskDto) { String taskId = getTaskId(taskDto); scheduleBaseService.removeTask(taskId); CallbackExecuteService.removeTask(taskId); } /** * 获取fixed类型的实际参数信息 * @param expression * @return */ private static long[] getFixedParameters(String expression) { Matcher matcher = fixed.matcher(expression); if (matcher.find()) { long[] fixedParam = {0L, 0L}; fixedParam[0] = Long.parseLong(matcher.group(1)); String delay = matcher.group(3); if (delay != null) { fixedParam[1] = Long.parseLong(delay); } return fixedParam; } throw new IllegalArgumentException("invalid expression:" +expression); } /** * 获取任务编号 * @param task * @return */ private String getTaskId(TaskDto task) { return task.getBusinessId() + "-" + task.getTaskId(); } }
package com.littlehow.job.service; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Service; import org.springframework.web.client.RestTemplate; import java.util.HashMap; import java.util.Map; /** * 执行调用定时任务的回调地址 * 可以约定定时任务的回执报文,便于进一步解析 * littlehow 2018/4/7 */ @Service public class CallbackExecuteService { private final static Logger log = LoggerFactory.getLogger(CallbackExecuteService.class); /** * 缓存任务与回调之间的关系 */ private static final Map<String, String> callbackInfo = new HashMap<>(); /** * http任务处理接口 */ private RestTemplate restTemplate = new RestTemplate(); /** * 执行远程任务 * @param taskId */ public void execute(String taskId) { String url = callbackInfo.get(taskId); if (url == null) { log.error("任务[" + taskId + "]的回调地址为空"); //FIXME 可以在此处加上监控或通知,也可以已异常的形式抛出,用aop统一处理 return; } try { //FIXME 如果制定了返回值规则,则可以详细解析,否则就简单解析状态码 ResponseEntity<String> response = restTemplate.getForEntity(url, String.class); if (response.getStatusCodeValue() >= 200 && response.getStatusCodeValue() <300) { //成功 log.info("定时任务[" + taskId + "]调用完成"); } else { //失败 log.error("任务[" + taskId + ":" + url + "]执行失败," + response.toString()); } } catch (Throwable t) { log.error("任务[" + taskId + ":" + url + "]执行异常", t); //FIXME 可以在此处加上监控或通知,也可以抛出此部分异常,用aop统一处理 } } /** * 新增或修改任务回调 * @param taskId * @param callbackUrl */ static String addOrUpdateTask(String taskId, String callbackUrl) { return callbackInfo.put(taskId, callbackUrl); } /** * 删除任务回调 * @param taskId * @return */ static String removeTask(String taskId) { return callbackInfo.remove(taskId); } }