我们在应用开发的时候,应该都碰到过这种需求:每天固定时间点跑一个任务;创建一些临时的任务去初始化数据或者做数据迁移;固定一个时间周期去轮询是否有新的状态发生;在java中有两个类可以帮我们处理这种需求,一个是java.util.TimerTask,一个是 java.util.concurrent.ScheduledExecutorService , 但是随着业务的发展,任务调度的需求会越来越多,对调度器的要求也会更高,例如能够监控任务的执行进度,能够根据应用负载动态路由选择更健康的执行器去执行任务,所以我们需要一个系统将调度器和执行器分离开,在调度系统中增加更多功能来辅助我们运维业务中的各种任务。
我们可以思考一下,一个集中式任务调度系统需要哪些功能点,才能方便我们对业务中的应用进行管理。
对于一个调度中心,我们要解决的核心问题主要是两个,有了这两个功能那么一个调度中心就基本可用,而其他功能就是将这个系统打造的更易用,更完善 一个是调度器的实现,这个我们可以借助业界常用的quartz框架,quartz也支持集群部署。另外一个是调度器如何通过rpc通知执行器去执行某个任务单元,并将执行参数传输过去,如果执行器是一个web容器,部署在tomcat中,那么可以在执行器中增加一个servlet来接受调度器的请求,在这个servlet内部去派发到对应的任务单元执行和取消,当然我们也可以借助其他的rpc框架,例如dubboo,grpc等,由于调度器和执行器之间的调用比较简单,我们也可以通过netty实现一个简单rpc程序
1.执行器注册
当没有注册功能的时候,我们需要手工编辑线上的执行器,而每次应用变更IP或者上下线都需要手工维护,如果不维护执行器我们就无法做路由,而且每次调度都需要输入执行器地址。 注册功能我们可以借助zookeeper来实现,执行器也就是我们的应用在启动的时候可以将相关信息写入一个临时节点,当执行器退出的时候这个临时节点会自动删除,而这些信息都可以通知到调度器,类似的注册中心实现还有eureka,etcd等,加入这些注册中心可能让我们的应用很重,执行器和调度器都需要依赖注册中心的客户端应用,而此时我们只是需要将执行器相关信息在启动和退出的时候发送给调度中心,我们可以在调度中心监听一个http端口,执行器在启动和退出的时候可以直接通过httpp将信息发给调度中心,配合域名也很方便的可以做调度中心的负载均衡,当然如果我们调度器和执行器之间本身已经有RPC调用了,这些注册信息也可以通过rpc进行传输,执行器的启动可以在相关核心类的初始化方法中实现,执行器的关闭可以借助相关资源类的释放或者JVM的关闭钩子
2.任务单元的注册
如果任务单元不自动注册,那么我们每次上线需要手工在管理后台编辑新增任务,并且选择对应的执行器列表,如果任务单元也可以自己向调度中心注册,我们的使用将更加简单,我们可以规定我们的任务单元都实现一个指定的接口,或者加一个注解,更推荐用注解的形式,我们可以通过在注解上可以配置一些调度策略,否则的话这些调度策略只能在接口中体现了,而我们只需要在应用启动的时候将这些接口和注解扫描出来发给注册中心就好了,由于我们的执行器都是分布式部署的,所以同一个任务单元随着应用的重启会向注册中心注册多次,而注册中心需要根据任务单元的特性进行防重复设计,而任务单元在代码废除后,我们可以手工在调度器进行删除,或者先在注解上加一个参数,之后在下一个版本中删除
3.任务中断
一般我们的任务单元在被调度的时候都会在一个线程工厂创建的特定线程里面运行,而将正在执行的任务中断我们只能调用thread.interrupt()方法,而这个方法可能只能在这个线程wait的时候才能退出,我们建议针对任务单元设计一个volatile变量标示任务是否终止,而在任务单元内部可以循环遍历该变量是否已经取消,这种方式当我们的任务是处理一些批量数据,并且需要循环遍历的时候相对优雅很多
4.任务调度模式
路由策略,失败重试,超时检查,并发控制,任务依赖,这些我们一般通过编写特定的策略就可以实现,关于分片,类似于数据库的分库分表,这种一般需要任务单元内部支持,而调度中心只是在上层辅助进行调度,选择多少个执行器并发执行,将分片结果以参数形式分发给执行器
5.GLUE模式
这个我们需要借助groove的classloader将传人的代码加载成JVM里面的任务单元
下面我们分析下开源的XXL-job的实现
xxl-job 调度中心和执行器之间通过一个叫XXL-RPC的中间件进行双向通讯,调度中心通过RPC向执行器下发调度请求,执行器向调度中心注册执行器信息,XXL-RPC是作者开源的一个机于protobuff协议实现的RPC,而调度器采用开源的quartz实现 调度器和应用端集成,需要通过配置文件来配置调度中心的地址,如果应用是在spring来管理bean的生命周期,可以配置一个bean: XxlJobExecutor,并在属性中设置调度中心的IP地址以及通讯信息,并设置初始化方法init-method, 在init-method方法内部,会去初始化RPC相关类,然后将执行器注册到调度中心,然后会扫描任务单元并缓存对应的bean实例。
如果调度器要集群部署,有两种模式
这种模式需要集群中选一个master,其他节点都为slave节点,同一时刻只有一个调度器能够下发任务指令(所有节点在下发指令前判断是否是master节点),因为我们不允许同一个任务被多个调度器触发执行造成job重复执行,当master节点挂了之后,其他slave选举出一个新的节点作为master继续提供服务,选取master的算法可以利用paxos算法或者raft算法,在多个调度器节点之间进行选举,并且通过心跳检测来发现master节点挂了之后重新进行选举,这类算法实现起来较为复杂,我们可以利用zookeeper的客户端在失去连接后会自动删除这个客户端创建的所有临时节点,并可以通知监听程序来实现,所有的节点在启动的时候都向zookeeper创建同一个节点,第一个创建成功的就是master节点,其他节点发现已经存在该节点就自动作为slave节点,并监听这个节点的变化,当这个节点被删除后,所有的slave节点在收到通知后在创建这个节点来争master,这种方式实现起来比较简单,但是调度器的可用性不能仅仅通过和zookeeper是否能够连接成功表示,有时候调度器如果连接不是数据库也会 导致调度器不可用,这种情况下可以在调度器启动一些监控程序,查看和数据库的连接,cpu负载情况,当达到阈值后主动让出master节点
这种有点类似于redis集群,当用上面的主备模式,如果被调动的任务单元数量上升会对调度器造成很大的压力,那么一个任务单元如果能够在多个调度器之间做随机选择一个被调度,这样可以减少调度器的压力,当其中一个调度器挂了之后,这个调度器所管理的任务单元将有其他调度器接管, 这种模式下我们可以把任务调度和任务管理拆分出来部署