转载

基于springboot和quartz的任务调度系统

一、简述

项目地址: https://github.com/littlechare/job-manager

项目详细的信息请参看项目的 README

下载地址: https://download.csdn.net/download/w172087242/10333176

之前想写一个关于定时任务的组建,便于项目中快速集成;

写完之后,又继续在此基础上演变成系统,当然该系统还缺少很多东西;

如业务方的bid生成与验证、任务回调响应报文的规则、任务调度的监控、如何做HA等等;

因为是个人项目,所以用了部分下班时间和本次清明节的空闲时间实现,

所以项目可定也有不足的地方,但是如果开发者想使用,也可以下载下来快速

集成,也可在此基础上加以修改。

二、项目功能简述

2.1 任务类型支持说明

任务类型 说明 表达式示例 备注
CRON Cron类型的任务,不支持表达式更改 0/20 * * * * *? 每20秒调用一次
TRIGGER Trigger类型的任务,支持表达式更改 0 0 4 * * *? 每天凌晨4点调用
FIXED Fixed类型任务,不支持表达式更改 fixed=5000 单位毫秒,表示每5秒调用一次
FIXED_DELAY Fixed_delay任务,不支持表达式更改 fixed=5000,delay=10000 单位毫秒,表示延迟10秒后执行

2.2 业务方说明

1.业务方需要自己维护业务方编号(bid)和任务编号;

2.支持任务的随时取消、随时更改、随时添加等,回调地址一http(s)打头

2.3 模块说明与类说明

模块说明
模块名称 功能 说明
job-dispatcher-base 任务基础模块 定时任务能力提供者(可抽离成组件)
job-dispatcher-biz 任务业务模块 业务管理与任务触发模块
job-dispatcher-manager 任务管理模块 管理任务变动并发出变动事件提醒(分布部署可基于消息进行改进)
job-dispatcher-test 测试模块 独立模块,含有一个接口,便于测试任务回调
base模块类说明
类名(简要名称,前缀省略com.littlehow.job) 功能 说明
base.config.ScheduleTaskConfig 定时任务配置器 任务的核心管理器
base.config. TaskConfig 定时任务缓存 缓存任务信息
base.config. TaskType 定时任务类型 枚举CRON,TRIGGER,FIXED,FIXED_DELAY
base. BaseTask 任务基础信息 任务实体bean,实现runnable和initializingbean
cron基础类 因为csdn编辑器问题,类名写于此BaseCronTask
base. BaseFixedTask fixed基础类 实现baseTask类,与cron类似
ScheduleBaseService service类 对外暴露服务类
manager模块类说明
类名 (要名简称,前缀省略com.littlehow.job) 功能 说明
manager.api.TaskManagerService 任务持久化管理api manager模块可以独立出去,所以该api为唯一访问接口
manager.event.TaskEvent 任务事件 任务发送的事件实体,继承ApplicationEvent
manager.event.TaskEventType 任务事件类型 枚举ADD,UPDATE,REMOVE
manager.support.MysqlTaskManagerSupport api实现 mysql持久化任务的实现
biz模块类说明
类名 (要名简称,前缀省略 com.littlehow.job ) 功能 说明
advice.ExceptionAdvice 异常统一处理
advice.SuccessResponseAdvice 成功统一处理
config.InterceptorConfig 拦截器配置
interceptor.BusinessInterceptor 拦截器实现 主要运用验证和获取header中的bid
interceptor.BusinessContext 上下文 bid上下文管理
listener.TaskEventListener 任务事件监听 监听任务事件,实现ApplicationListener接口
service.CallbackExecuteService 回调处理服务 定时任务的回调地址调用
service.TaskExecuteService 任务处理服务
controller.TaskManagerController 任务管理web接口 支持任务的增删改查
init.InitTask 初始化任务 任务系统重启后需要重新将已有任务加入任务队列
vo.ResponseEntity 相应参数
vo.ArgumentsException 参数异常 校验参数时,如果不合法会抛出此异常
JobStart 启动类 springboot项目启动类
test模块类说明
类名 (要名简称,前缀省略 com.littlehow.job ) 功能 说明
test.TestController 测试controller
test.TestStart 测试启动类 springboot启动类

三、任务系统代码示例;

3.1 任务配置类(ScheduleTaskConfig)

package com.littlehow.job.base.config;

import com.littlehow.job.base.BaseTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Configurable;
import org.springframework.context.annotation.Lazy;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.config.*;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.stereotype.Component;

import java.util.Date;

@Lazy(false)
@Configurable
@Component
@EnableScheduling
public class ScheduleTaskConfig implements SchedulingConfigurer {
    private Logger log = LoggerFactory.getLogger(ScheduleTaskConfig.class);
    private final static String TASK_NOT_EXISTS = "not exists";
    private final static String TASK_EXISTS = "exists";
    private final static String FAILURE = "failure";
    private final static String SUCCESS = "success";
    private ScheduledTaskRegistrar scheduledTaskRegistrar;
    @Override
    public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) {
        this.scheduledTaskRegistrar = scheduledTaskRegistrar;
        initTask();
    }

    /**
     * 添加任务
     * @param task
     * @return
     */
    public String addTask(BaseTask task) {
        if (scheduledTaskRegistrar == null || task == null) {
            return FAILURE;
        }
        if (TaskConfig.containsTask(task.getId())) {
            return TASK_EXISTS;
        }
        try {
            addTask0(task);
            TaskConfig.addTask(task);
            return SUCCESS;
        } catch (Exception e) {
            log.error("新增定时任务失败:" + task, e);
            throw e;
        }
    }

    /**
     * 改变任务执行频率
     * @param taskId
     * @param expression
     * @return
     */
    public String changeTask(String taskId, String expression) {
        BaseTask baseTask = TaskConfig.getTask(taskId);
        if (baseTask == null || TaskType.TRIGGER != baseTask.taskType || expression == null) {
            return TASK_NOT_EXISTS;
        }
        log.info("change trigger expression:(id=" + taskId + ",expression=" + expression+")");
        baseTask.setExpression(expression);
        return SUCCESS;
    }

    /**
     * 取消定时任务
     * @param taskId
     * @return
     */
    public String cancelTask(String taskId) {
        if (!TaskConfig.containsTask(taskId)) {
            return TASK_NOT_EXISTS;
        }
        try {
            log.info("cancel task:" + taskId);
            TaskConfig.removeTask(taskId).getScheduledTask().cancel();
        } catch (Exception e) {
            log.error("取消任务失败:" + taskId, e);
            throw e;
        }
        return SUCCESS;
    }


    /**
     * 初始化已配置任务
     */
    private void initTask() {
        TaskConfig.getTasks().forEach(task -> addTask0(task));
    }

    private void addTask0(BaseTask task) {
        log.info("add task:" + task);
        switch (task.taskType) {
            case TRIGGER: task.setScheduledTask(addTriggerTask(task));
                break;
            case CRON: task.setScheduledTask(addCronTask(task, task.getExpression()));
                break;
            case FIXED_RATE: task.setScheduledTask(addFixedRateTask(task, task.interval()));
                break;
            case FIXED_DELAY: task.setScheduledTask(addFixedDelayTask(task, task.interval(), task.delay()));
                break;
            default:
        }
    }

    /**
     * 添加不可改变时间表的定时任务
     * @param task
     */
    private ScheduledTask addCronTask(Runnable task, String expression) {
        return scheduledTaskRegistrar.scheduleCronTask(new CronTask(task, expression));
    }

    /**
     * 添加可变时间task
     * @param task
     * @return
     */
    private ScheduledTask addTriggerTask(BaseTask task) {
        return scheduledTaskRegistrar.scheduleTriggerTask(new TriggerTask(task, triggerContext -> {
            CronTrigger trigger = new CronTrigger(task.getExpression());
            Date nextExec = trigger.nextExecutionTime(triggerContext);
            return nextExec;
        }));
    }

    /**
     * 设置固定频率的定时任务
     * @param task
     * @param interval
     */
    private ScheduledTask addFixedRateTask(Runnable task, long interval) {
        return scheduledTaskRegistrar.scheduleFixedRateTask(new IntervalTask(task, interval, 0L));
    }

    /**
     * 设置延迟以固定频率执行的定时任务
     * @param task
     * @param interval
     * @param delay
     */
    private ScheduledTask addFixedDelayTask(Runnable task, long interval, long delay) {
        return scheduledTaskRegistrar.scheduleFixedDelayTask(new IntervalTask(task, interval, delay));
    }
}

3.2 任务基本实体类(BaseTask)

package com.littlehow.job.base;


import com.littlehow.job.base.config.TaskConfig;
import com.littlehow.job.base.config.TaskType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.scheduling.config.ScheduledTask;

/**
 * 基本的任务配置类
 */
public abstract class BaseTask implements Runnable,InitializingBean {
    protected final Logger log = LoggerFactory.getLogger(this.getClass());
    public final TaskType taskType;
    private ScheduledTask scheduledTask;
    private final String id;
    public BaseTask(TaskType taskType, String id) {
        this.taskType = taskType;
        this.id = id;
    }

    /**
     *  获取任务表达式如:0 0 0/1 * * *? (每个整点执行)
     * @return
     */
    public abstract String getExpression();

    /**
     * 固定频率执行的时间间隔
     * @return
     */
    public abstract long interval();

    /**
     * 固定频率执行的延迟时间
     * @return
     */
    public abstract long delay();

    /**
     * 设置任务表达式
     * @param expression
     */
    public abstract void setExpression(String expression);

    /**
     * 获取任务唯一标识
     * @return
     */
    public String getId() {
        return id;
    }

    public final ScheduledTask getScheduledTask() {
        return scheduledTask;
    }

    public final void setScheduledTask(ScheduledTask scheduledTask) {
        this.scheduledTask = scheduledTask;
    }

    @Override
    public void afterPropertiesSet() {
        TaskConfig.addTask(this);
    }

    public String toString() {
        return this.getClass().getSimpleName() + "(id:" + id + ",expression:" + getExpression()
                + ",type:" + taskType + ",interval:" + interval()
                + ", delay:" + delay() + ")";
    }
}

3.3 任务执行类(TaskExecuteService)

package com.littlehow.job.service;

import com.littlehow.job.base.BaseCronTask;
import com.littlehow.job.base.BaseFixedTask;
import com.littlehow.job.base.ScheduleBaseService;
import com.littlehow.job.manager.pojo.TaskDto;
import com.littlehow.job.manager.pojo.TaskType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;

import java.util.regex.Matcher;
import java.util.regex.Pattern;

@Service
public class TaskExecuteService {
    @Autowired
    private ScheduleBaseService scheduleBaseService;
    @Autowired
    private CallbackExecuteService executeService;
    private static final Pattern fixed = Pattern.compile("fixed=(//d+)//s*(,//s*delay=(//d+))?");

    /**
     * 新增任务
     * 因表达式解析错误可能会抛出异常
     * @param taskDto
     */
    public void addTask(TaskDto taskDto) {
        if (taskDto == null) {
            return;
        }
        String taskId = getTaskId(taskDto);
        CallbackExecuteService.addOrUpdateTask(taskId, taskDto.getCallbackUrl());
        TaskType taskType = taskDto.getTaskType();
        switch (taskType) {
            case FIXED:
            case FIXED_DELAY:
                long[] params = getFixedParameters(taskDto.getExpression());
                BaseFixedTask baseFixedTask = new BaseFixedTask(taskId,
                        params[0], params[1]) {
                    @Override
                    public void run() {
                        executeService.execute(this.getId());
                    }
                };
                scheduleBaseService.addTask(baseFixedTask);
                break;
            case CRON:
            case TRIGGER:
                BaseCronTask baseCronTask = new BaseCronTask(taskType == TaskType.CRON ?
                com.littlehow.job.base.config.TaskType.CRON :
                com.littlehow.job.base.config.TaskType.TRIGGER, taskId) {
                    @Override
                    public void run() {
                        executeService.execute(this.getId());
                    }
                };
                baseCronTask.setExpression(taskDto.getExpression());
                scheduleBaseService.addTask(baseCronTask);
        }
    }

    /**
     * 修改任务
     * @param taskDto
     */
    public void updateTask(TaskDto taskDto) {
        if (!StringUtils.isEmpty(taskDto.getExpression())) {
            //修改表达式
            scheduleBaseService.changeTask(getTaskId(taskDto), taskDto.getExpression());
        }
        if (!StringUtils.isEmpty(taskDto.getCallbackUrl())) {
            //修改回调地址
            CallbackExecuteService.addOrUpdateTask(getTaskId(taskDto), taskDto.getCallbackUrl());
        }
    }

    /**
     * 删除任务
     * @param taskDto
     */
    public void removeTask(TaskDto taskDto) {
        String taskId = getTaskId(taskDto);
        scheduleBaseService.removeTask(taskId);
        CallbackExecuteService.removeTask(taskId);
    }

    /**
     * 获取fixed类型的实际参数信息
     * @param expression
     * @return
     */
    private static long[] getFixedParameters(String expression) {
        Matcher matcher = fixed.matcher(expression);
        if (matcher.find()) {
            long[] fixedParam = {0L, 0L};
            fixedParam[0] = Long.parseLong(matcher.group(1));
            String delay = matcher.group(3);
            if (delay != null) {
                fixedParam[1] = Long.parseLong(delay);
            }
            return fixedParam;
        }
        throw new IllegalArgumentException("invalid expression:" +expression);
    }

    /**
     * 获取任务编号
     * @param task
     * @return
     */
    private String getTaskId(TaskDto task) {
        return task.getBusinessId() + "-" + task.getTaskId();
    }
}

3.4 任务回调地址处理类(CallbackExecuteService)

package com.littlehow.job.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;

import java.util.HashMap;
import java.util.Map;

/**
 * 执行调用定时任务的回调地址
 * 可以约定定时任务的回执报文,便于进一步解析
 * littlehow 2018/4/7
 */
@Service
public class CallbackExecuteService {
    private final static Logger log = LoggerFactory.getLogger(CallbackExecuteService.class);
    /**
     * 缓存任务与回调之间的关系
     */
    private static final Map<String, String> callbackInfo = new HashMap<>();

    /**
     * http任务处理接口
     */
    private RestTemplate restTemplate = new RestTemplate();

    /**
     * 执行远程任务
     * @param taskId
     */
    public void execute(String taskId) {
        String url = callbackInfo.get(taskId);
        if (url == null) {
            log.error("任务[" + taskId + "]的回调地址为空");
            //FIXME 可以在此处加上监控或通知,也可以已异常的形式抛出,用aop统一处理
            return;
        }
        try {
            //FIXME 如果制定了返回值规则,则可以详细解析,否则就简单解析状态码
            ResponseEntity<String> response = restTemplate.getForEntity(url, String.class);
            if (response.getStatusCodeValue() >= 200 && response.getStatusCodeValue() <300) {
                //成功
                log.info("定时任务[" + taskId + "]调用完成");
            } else {
                //失败
                log.error("任务[" + taskId + ":" + url + "]执行失败," + response.toString());
            }
        } catch (Throwable t) {
            log.error("任务[" + taskId + ":" + url + "]执行异常", t);
            //FIXME 可以在此处加上监控或通知,也可以抛出此部分异常,用aop统一处理
        }
    }

    /**
     * 新增或修改任务回调
     * @param taskId
     * @param callbackUrl
     */
    static String addOrUpdateTask(String taskId, String callbackUrl) {
        return callbackInfo.put(taskId, callbackUrl);
    }

    /**
     * 删除任务回调
     * @param taskId
     * @return
     */
    static String removeTask(String taskId) {
        return callbackInfo.remove(taskId);
    }
}
原文  https://blog.csdn.net/w172087242/article/details/79847998
正文到此结束
Loading...