微信公众号:bugstack虫洞栈 | 沉淀、分享、成长,让自己和他人都能有所收获!
分布式任务DcsSchedule中间件,Github地址: github.com/fuzhengwei/…
分布式任务DcsSchedule控制台,Github地址: github.com/fuzhengwei/…
欢迎:star:Star和使用,你用剑 、我用刀:hocho:,好的代码都很骚:smirk:,望你不吝出招:dash:!
演示视频:《开发基于SpringBoot的分布式任务中间件DcsSchedule(为开源贡献力量)》
@SpringBootApplication @EnableScheduling public class Application{ public static void mian(String[] args){ SpringApplication.run(Application.class,args); } @Scheduled(cron = "0/3 * * * * *") public void demoTask() { //... } } 复制代码
咔咔,上面这段代码很熟悉吧,他就是SpringBoot的Schedule定时任务,简单易用。在我们开发中如果需要做一些定时或指定时刻循环执行逻辑时候,基本都会使用到Schedule。
但是,如果我们的任务是比较大型的,比如;定时跑批T+1结算、商品秒杀前状态变更、刷新数据预热到缓存等等,这些定时任务都相同的特点; 作业量大 、 实时性强 、 可用率高 。而这时候如果只是单纯使用Schedule就显得不足以控制。
那么,我们产品需求就出来了,分布式DcsSchedule任务;
嗯?有人憋半天了想说可以用
Quertz
,嗯可以的,但这不是本篇文章的重点。难道你不想看看一个自言开源中间件是怎么诞生的吗,怎么推到中心Maven仓的吗?比如下图; 真香不!
:grinning:好了,接下来开始介绍这个中间件如何使用和怎么开发的了!
版本 | 发布日期 | 备注 | |
---|---|---|---|
1 | 1.0.0-RELEASE | 2019-12-07 | 基本功能实现;任务接入、分布式启停 |
2 |
|
2019-12-07 | 上传测试版本 |
jdk1.8
StringBoot 2.x
配置中心zookeeper 3.4.14 {准备好zookeeper服务,如果windows调试可以从这里下载: www-eu.apache.org/dist/zookee… }
下载后解压,在bin同级路径创建文件夹data、logs
修改conf/zoo.cfg,修改配置如下;
dataDir=D://Program Files//apache-zookeeper-3.4.14//data dataLogDir=D://Program Files//apache-zookeeper-3.4.14//logs 复制代码
打包部署控制平台
下载地址: github.com/fuzhengwei/…
部署访问: http://localhost:7397
<dependency> <groupId>org.itstack.middleware</groupId> <artifactId>schedule-spring-boot-starter</artifactId> <version>1.0.0-RELEASE</version> </dependency> 复制代码
@SpringBootApplication @EnableDcsScheduling public class HelloWorldApplication { public static void main(String[] args) { SpringApplication.run(HelloWorldApplication.class, args); } } 复制代码
@Component("demoTaskThree") public class DemoTaskThree { @DcsScheduled(cron = "0 0 9,13 * * *", desc = "03定时任务执行测试:taskMethod01", autoStartup = false) public void taskMethod01() { System.out.println("03定时任务执行测试:taskMethod01"); } @DcsScheduled(cron = "0 0/30 8-10 * * *", desc = "03定时任务执行测试:taskMethod02", autoStartup = false) public void taskMethod02() { System.out.println("03定时任务执行测试:taskMethod02"); } } 复制代码
以SpringBoot为基础开发一款中间件我也是第一次,因为接触SpringBoot也刚刚1个月左右。虽然SpringBoot已经出来挺久的了,但由于我们项目开发并不使用SpringBoot的一套东西,所以一直依赖没有接触。直到上个月开始考虑领域驱动设计才接触,嗯!真的不错,那么就开始了夯实技能、学习思想用到项目里。
按照我的产品需求,开发这么一款分布式任务的中间件,我脑袋中的模型已经存在了。另外就是需要开发过程中去探索我需要的知识工具,简单包括;
schedule-spring-boot-starter └── src ├── main │ ├── java │ │ └── org.itstack.middleware.schedule │ │ ├── annotation │ │ │ ├── DcsScheduled.java │ │ │ └── EnableDcsScheduling.java │ │ ├── annotation │ │ │ └── InstructStatus.java │ │ ├── config │ │ │ ├── DcsSchedulingConfiguration.java │ │ │ ├── StarterAutoConfig.java │ │ │ └── StarterServiceProperties.java │ │ ├── domain │ │ │ ├── DataCollect.java │ │ │ ├── DcsScheduleInfo.java │ │ │ ├── DcsServerNode.java │ │ │ ├── ExecOrder.java │ │ │ └── Instruct.java │ │ ├── export │ │ │ └── DcsScheduleResource.java │ │ ├── service │ │ │ ├── HeartbeatService.java │ │ │ └── ZkCuratorServer.java │ │ ├── task │ │ │ ├── TaskScheduler.java │ │ │ ├── ScheduledTask.java │ │ │ ├── SchedulingConfig.java │ │ │ └── SchedulingRunnable.java │ │ ├── util │ │ │ └── StrUtil.java │ │ └── DoJoinPoint.java │ └── resources │ └── META_INF │ └── spring.factories └── test └── java └── org.itstack.demo.test └── ApiTest.java 复制代码
annotation/EnableDcsScheduling.java & 自定义注解
这个注解一堆的圈A,这些配置都是为了开始启动执行我们的中间件;
@Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Import({DcsSchedulingConfiguration.class}) @ImportAutoConfiguration({SchedulingConfig.class, CronTaskRegister.class, DoJoinPoint.class}) @ComponentScan("org.itstack.middleware.*") public @interface EnableDcsScheduling { } 复制代码
config/DcsSchedulingConfiguration.java & 初始化配置/服务、启动任务、挂在节点
@Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean); if (this.nonAnnotatedClasses.contains(targetClass)) return bean; Method[] methods = ReflectionUtils.getAllDeclaredMethods(bean.getClass()); if (methods == null) return bean; for (Method method : methods) { DcsScheduled dcsScheduled = AnnotationUtils.findAnnotation(method, DcsScheduled.class); if (null == dcsScheduled || 0 == method.getDeclaredAnnotations().length) continue; List<ExecOrder> execOrderList = Constants.execOrderMap.computeIfAbsent(beanName, k -> new ArrayList<>()); ExecOrder execOrder = new ExecOrder(); execOrder.setBean(bean); execOrder.setBeanName(beanName); execOrder.setMethodName(method.getName()); execOrder.setDesc(dcsScheduled.desc()); execOrder.setCron(dcsScheduled.cron()); execOrder.setAutoStartup(dcsScheduled.autoStartup()); execOrderList.add(execOrder); this.nonAnnotatedClasses.add(targetClass); } return bean; } 复制代码
private void init_server(ApplicationContext applicationContext) { try { //获取zk连接 CuratorFramework client = ZkCuratorServer.getClient(Constants.Global.zkAddress); //节点组装 path_root_server = StrUtil.joinStr(path_root, LINE, "server", LINE, schedulerServerId); path_root_server_ip = StrUtil.joinStr(path_root_server, LINE, "ip", LINE, Constants.Global.ip); //创建节点&递归删除本服务IP下的旧内容 ZkCuratorServer.deletingChildrenIfNeeded(client, path_root_server_ip); ZkCuratorServer.createNode(client, path_root_server_ip); ZkCuratorServer.setData(client, path_root_server, schedulerServerName); //添加节点&监听 ZkCuratorServer.createNodeSimple(client, Constants.Global.path_root_exec); ZkCuratorServer.addTreeCacheListener(applicationContext, client, Constants.Global.path_root_exec); } catch (Exception e) { logger.error("itstack middleware schedule init server error!", e); throw new RuntimeException(e); } } 复制代码
private void init_task(ApplicationContext applicationContext) { CronTaskRegister cronTaskRegistrar = applicationContext.getBean("itstack-middlware-schedule-cronTaskRegister", CronTaskRegister.class); Set<String> beanNames = Constants.execOrderMap.keySet(); for (String beanName : beanNames) { List<ExecOrder> execOrderList = Constants.execOrderMap.get(beanName); for (ExecOrder execOrder : execOrderList) { if (!execOrder.getAutoStartup()) continue; SchedulingRunnable task = new SchedulingRunnable(execOrder.getBean(), execOrder.getBeanName(), execOrder.getMethodName()); cronTaskRegistrar.addCronTask(task, execOrder.getCron()); } } } 复制代码
private void init_node() throws Exception { Set<String> beanNames = Constants.execOrderMap.keySet(); for (String beanName : beanNames) { List<ExecOrder> execOrderList = Constants.execOrderMap.get(beanName); for (ExecOrder execOrder : execOrderList) { String path_root_server_ip_clazz = StrUtil.joinStr(path_root_server_ip, LINE, "clazz", LINE, execOrder.getBeanName()); String path_root_server_ip_clazz_method = StrUtil.joinStr(path_root_server_ip_clazz, LINE, "method", LINE, execOrder.getMethodName()); String path_root_server_ip_clazz_method_status = StrUtil.joinStr(path_root_server_ip_clazz, LINE, "method", LINE, execOrder.getMethodName(), "/status"); //添加节点 ZkCuratorServer.createNodeSimple(client, path_root_server_ip_clazz); ZkCuratorServer.createNodeSimple(client, path_root_server_ip_clazz_method); ZkCuratorServer.createNodeSimple(client, path_root_server_ip_clazz_method_status); //添加节点数据[临时] ZkCuratorServer.appendPersistentData(client, path_root_server_ip_clazz_method + "/value", JSON.toJSONString(execOrder)); //添加节点数据[永久] ZkCuratorServer.setData(client, path_root_server_ip_clazz_method_status, execOrder.getAutoStartup() ? "1" : "0"); } } } 复制代码
service/ZkCuratorServer.java & zk服务
public static void addTreeCacheListener(final ApplicationContext applicationContext, final CuratorFramework client, String path) throws Exception { TreeCache treeCache = new TreeCache(client, path); treeCache.start(); treeCache.getListenable().addListener((curatorFramework, event) -> { //... switch (event.getType()) { case NODE_ADDED: case NODE_UPDATED: if (Constants.Global.ip.equals(instruct.getIp()) && Constants.Global.schedulerServerId.equals(instruct.getSchedulerServerId())) { //执行命令 Integer status = instruct.getStatus(); switch (status) { case 0: //停止任务 cronTaskRegistrar.removeCronTask(instruct.getBeanName() + "_" + instruct.getMethodName()); setData(client, path_root_server_ip_clazz_method_status, "0"); logger.info("itstack middleware schedule task stop {} {}", instruct.getBeanName(), instruct.getMethodName()); break; case 1: //启动任务 cronTaskRegistrar.addCronTask(new SchedulingRunnable(scheduleBean, instruct.getBeanName(), instruct.getMethodName()), instruct.getCron()); setData(client, path_root_server_ip_clazz_method_status, "1"); logger.info("itstack middleware schedule task start {} {}", instruct.getBeanName(), instruct.getMethodName()); break; case 2: //刷新任务 cronTaskRegistrar.removeCronTask(instruct.getBeanName() + "_" + instruct.getMethodName()); cronTaskRegistrar.addCronTask(new SchedulingRunnable(scheduleBean, instruct.getBeanName(), instruct.getMethodName()), instruct.getCron()); setData(client, path_root_server_ip_clazz_method_status, "1"); logger.info("itstack middleware schedule task refresh {} {}", instruct.getBeanName(), instruct.getMethodName()); break; } } break; case NODE_REMOVED: break; default: break; } }); } 复制代码
public void addCronTask(SchedulingRunnable task, String cronExpression) { if (null != Constants.scheduledTasks.get(task.taskId())) { removeCronTask(task.taskId()); } CronTask cronTask = new CronTask(task, cronExpression); Constants.scheduledTasks.put(task.taskId(), scheduleCronTask(cronTask)); } public void removeCronTask(String taskId) { ScheduledTask scheduledTask = Constants.scheduledTasks.remove(taskId); if (scheduledTask == null) return; scheduledTask.cancel(); } 复制代码
@Pointcut("@annotation(org.itstack.middleware.schedule.annotation.DcsScheduled)") public void aopPoint() { } @Around("aopPoint()") public Object doRouter(ProceedingJoinPoint jp) throws Throwable { long begin = System.currentTimeMillis(); Method method = getMethod(jp); try { return jp.proceed(); } finally { long end = System.currentTimeMillis(); logger.info("/nitstack middleware schedule method:{}.{} take time(m):{}", jp.getTarget().getClass().getSimpleName(), method.getName(), (end - begin)); } } 复制代码
开发完成后还是需要将Jar包发布到manven中心仓库的,这个过程较长单独写了博客; 发布Jar包到Maven中央仓库(为开发开源中间件做准备)