在大家的项目中,想必都有那种,启动时候要去其他服务拉一些数据的情况,如果我们启动时,其他服务没启动,按岂不是就起不来了吗,如果这段拉数据的代码,并不是核心业务,那你这就有点说不过去了:不能因为对方没启动,我们也不能启动吧?
经过一些思考后,我觉得可以这样,启动的时候:
我这边可以大概就大家演示下。
随便写了个spring boot服务端,监听本机8082端口。模拟第三方服务
@RestController @Slf4j public class BusinessController { @GetMapping("/") public String test() { return "success"; } } @SpringBootApplication @Slf4j public class WebDemoApplicationServer { public static void main(String[] args) { ConfigurableApplicationContext context = SpringApplication.run(WebDemoApplicationServer.class, args); } }
客户端程序,依赖第三方服务,启动时,要去上面的服务端拉数据。
代码和上面差不多,唯一是在启动时,会执行以下逻辑:
@Component public class InitRunner implements CommandLineRunner{ private static final Logger log = LoggerFactory.getLogger(InitRunner.class); @Autowired private RestTemplate restTemplate; @Override public void run(String... args) throws Exception { ResponseEntity<String> entity = restTemplate.getForEntity("http://localhost:8082", String.class); String s = entity.toString(); log.info("get data:{}",s); } }
在上面的服务没启动的时候,这个客户端是起不来的。
怎么解决呢,很简单。
public class InitRunnerV2 implements CommandLineRunner { @Autowired private RestTemplate restTemplate; // 1 ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("init-data-from-third-sys")); @Override public void run(String... args) { //2 TestTask task = new TestTask(restTemplate); //3 ScheduledFuture<?> scheduledFuture = scheduledThreadPoolExecutor.scheduleAtFixedRate(task, 0, 10, TimeUnit.SECONDS); // 4 task.setScheduledFuture(scheduledFuture); } }
1处,new了一个线程池,ScheduledThreadPoolExecutor类型,可周期执行某个任务
2处,new了一个任务,这个任务会执行我们的拉数据逻辑。
这个任务的代码如下:
@Slf4j public class TestTask implements Runnable{ private RestTemplate restTemplate; private volatile ScheduledFuture<?> scheduledFuture; public TestTask(RestTemplate restTemplate) { this.restTemplate = restTemplate; } ... public void setScheduledFuture(ScheduledFuture<?> scheduledFuture) { this.scheduledFuture = scheduledFuture; } }
其实很简单,就是定义了2个字段,一个是RestTemplate,请求数据时要用;另一个是ScheduledFuture<?>类型,这个字段在上面的 InitRunnerV2
代码的第三处被赋值。
3处,让这个任务循环执行,每10s一次。
4处,给task的 ScheduledFuture
赋值,注意的是,在task中,这个字段我们定义为volatile,保证线程可见。
下面是任务代码的剖析:
@Override public void run() { try { ResponseEntity<String> entity = restTemplate.getForEntity("http://localhost:8082", String.class); String s = entity.toString(); log.info("get data:{}",s); } catch (Exception e) { // log.error("e:{}",e); log.error("error"); return; } /** * 1 有可能任务执行太快,future还没被赋值 */ if (scheduledFuture != null) { scheduledFuture.cancel(true); } }
唯一有什么要说的,就是1处,如果成功了,我们就会调用 scheduledFuture.cancel(true);
,这样,这个scheduled 任务就不会继续执行了,也就达到了我们的目的,经济实惠。
到此,代码基本就这样了,详细代码见:
https://gitee.com/ckl111/all-simple-demo-in-work/tree/master/spring-boot-scheduler-future-demo-parent
因为上面的方案挺简单实用,但感觉没啥干货,于是我想着是否可以自己来实现一个定制的线程池,把这些事情给自动化了。
希望实现的最终效果如下,给future增加一个回调,需要在任务执行成功时,该回调自动被调用:
public class InitRunnerV3 implements CommandLineRunner { @Autowired private RestTemplate restTemplate; CustomScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new CustomScheduledThreadPoolExecutor(1, new NamedThreadFactory("init-data-from-third-sys")); @Override public void run(String... args) { // 1 TestTaskV3 task = new TestTaskV3(restTemplate); // 2 CustomScheduledFuture<?> scheduledFuture = scheduledThreadPoolExecutor.scheduleAtFixedRate(task, 0, 10, TimeUnit.SECONDS); // 3 scheduledFuture.setCustomFutureCallBack(new CustomFutureCallBack() { @Override public void onSuccess(CustomScheduledFuture customScheduledFuture) { log.info("onSuccess"); // 4 customScheduledFuture.cancel(true); } @Override public void onException(Throwable throwable) { log.error("e:{}",throwable); } }); }
1处,执行任务,任务内部如下,去除了设置future的逻辑,和取消的逻辑
@Slf4j public class TestTaskV3 implements Runnable{ private RestTemplate restTemplate;
public TestTaskV3(RestTemplate restTemplate) { this.restTemplate = restTemplate; } @Override public void run() { try { ResponseEntity<String> entity = restTemplate.getForEntity("http://localhost:8082", String.class); String s = entity.toString(); log.info("get data:{}",s); } catch (Exception e) { // log.error("e:{}",e); log.error("error"); throw e; } } }
2处,循环执行任务,这里的scheduled线程池,是我们自定义的,回头再说;获取其返回的future
3处,给future增加回调,在回调中,如果成功,则取消该任务。
@Override public void onSuccess(CustomScheduledFuture customScheduledFuture) { log.info("onSuccess"); // 4 customScheduledFuture.cancel(true); }
这里,afterExecute是个空实现,就是留给子线程池扩展用的:
protected void afterExecute(Runnable r, Throwable t) { }
那我们可以考虑下,要怎么才能实现我们的目标呢,我们要在这个方法内,通过传进来的 Runnable r
,获取到下面这个future才能实现目的:
CustomScheduledFuture<?> scheduledFuture = scheduledThreadPoolExecutor.scheduleAtFixedRate(task, 0, 10, TimeUnit.SECONDS);
获取到future,就能拿到在future上设置的callback对象,就能调用callback,所以,现在问题是,要在传进来的Runnable中,获取到 scheduledFuture
。
所以,我们就得包装一下,传进来的runnable,我们定义了如下的Runnable:
@Data public class CustomDecoratedRunnable implements Runnable { Runnable runnable; CustomScheduledFuture customScheduledFuture; public CustomDecoratedRunnable(Runnable runnable,CustomScheduledFuture customScheduledFuture) { this.runnable = runnable; this.customScheduledFuture = customScheduledFuture; } @Override public void run() { this.runnable.run(); } }
我们具体看看,我们定制的线程池对象,我们的线程池,直接继承了 ScheduledThreadPoolExecutor
:
public class CustomScheduledThreadPoolExecutor<V> extends ScheduledThreadPoolExecutor { public CustomScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) { super(corePoolSize, threadFactory); } ... }
其 scheduleAtFixedRate
方法,我们进行了重写:
@Override public CustomScheduledFuture<V> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { /** * 1 */ CustomScheduledFuture customScheduledFuture = new CustomScheduledFuture(); // 2 将future设置到task中 CustomDecoratedRunnable customDecoratedRunnable = new CustomDecoratedRunnable(command,customScheduledFuture); // 3 ScheduledFuture<?> scheduledFuture = super.scheduleAtFixedRate(customDecoratedRunnable, initialDelay, period, unit); /** * 4 将返回的future,设置到我们包装过的future */ customScheduledFuture.setScheduledFuture((RunnableScheduledFuture) scheduledFuture); return customScheduledFuture; }
1处,新建一个自定义的future
2处,将自定义的future,设置到上面说的task中
3处,把包装过的task,丢给线程池
4处,返回一个定制的future,这个future,包装了原有的future,同时,支持设置callback
public class CustomScheduledFuture<V> implements RunnableScheduledFuture<V> { /** * 其实是下面这种类型: * {@link java.util.concurrent.ScheduledThreadPoolExecutor.ScheduledFutureTask * */ RunnableScheduledFuture<V> scheduledFuture; // 设置callback时,赋值 CustomFutureCallBack customFutureCallBack; Runnable runnable; }
本来,我以为,丢给线程池什么Runnable对象,在afterExecute就能拿到什么样的Runnable对象,结果:
发现,传进来的,已经被包装过了,应该是为了支持周期执行。
所以,没办法,看起来路被堵死了,通过这个传进来的Runnable,也拿不到我们原始的Runnable。
后边找了半天,找到下面这个点:
#java.util.concurrent.ScheduledThreadPoolExecutor#scheduleAtFixedRate public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0) throw new IllegalArgumentException(); ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period)); // 1 RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; }
1处,会调用decorateTask来包装task,默认实现,就是如下:
protected <V> RunnableScheduledFuture<V> decorateTask( Runnable runnable, RunnableScheduledFuture<V> task) { return task; }
这里的task,就是前面那个代码里的 ScheduledFutureTask<Void> sft
:
ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period)); // 1 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
所以,我们得想办法重载这个方法:
@Override protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) { CustomScheduledFuture<V> future = new CustomScheduledFuture<>(); future.setRunnable(runnable); future.setScheduledFuture(task); return future; }
这里,利用CustomScheduledFuture,封装了task和runnable两个对象。
同时,我们自定义的这个 CustomScheduledFuture
,也是实现了这个方法的返回值,指定的接口:
@Data public class CustomScheduledFuture<V> implements RunnableScheduledFuture<V>
目前为止,经过包装后,在afterExecute处,拿到的Runnable如下:
@Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); CustomScheduledFuture future; CustomDecoratedRunnable runnable = null; if (r instanceof CustomScheduledFuture) { future = (CustomScheduledFuture) r; // 1 runnable = (CustomDecoratedRunnable) future.getRunnable(); } // 2 CustomScheduledFuture customScheduledFuture = runnable.getCustomScheduledFuture(); // 3 CustomFutureCallBack customFutureCallBack = customScheduledFuture.getCustomFutureCallBack(); if (customFutureCallBack != null) { if (t != null) { customFutureCallBack.onException(t); } else { // 4 customFutureCallBack.onSuccess(customScheduledFuture); } } }
2020-04-10 09:45:28.068 INFO 14456 --- [ main] No active profile set, falling back to default profiles: default 2020-04-10 09:45:28.822 INFO 14456 --- [ main] Started WebDemoApplication in 1.153 seconds (JVM running for 1.805) 2020-04-10 09:45:36.933 ERROR 14456 --- [init-data-from-third-sys-1-thread-1] error 2020-04-10 09:48:48.975 INFO 14456 --- [init-data-from-third-sys-1-thread-1] onSuccess
可以看到,任务执行失败了,但为啥会调用onSuccess呢;另外,大家可以看到,都是在线程池的线程中执行的。
为啥会error了,还执行success呢,我发现,即使我在task中抛出了异常,但是上层没捕获。
我猜测,是因为:
public interface Runnable { /** * When an object implementing interface <code>Runnable</code> is used * to create a thread, starting the thread causes the object's * <code>run</code> method to be called in that separately executing * thread. * <p> * The general contract of the method <code>run</code> is that it may * take any action whatsoever. * * @see java.lang.Thread#run() */ public abstract void run(); }
这里没有抛出异常,所以,即使实现的runnable中抛了,上层也不管。
具体还要验证。
另一个点是,执行失败了,等了10s,并没有再次执行,猜测是我的定制task,导致了周期执行的问题。这个待验证和解决。
但,一个简单的回调,我们已经实现了。
大家使用方案1 就可以了;后面的方案,是折腾着玩的。希望对大家有帮助。
全部代码都在:
https://gitee.com/ckl111/all-simple-demo-in-work/tree/master/spring-boot-scheduler-future-demo-parent