转载

Spring实现异步任务处理

Spring提供了一个接口 org.springframework.core.task.TaskExecutor ,这个接口实现了 java.util.concurrent.Executor 接口,Executor这个接口熟悉多线程的同学应该都知道,这是java线程池的顶层接口,那也就是说TaskExecutor也是个线程池,通过线程池来执行我们的异步任务。

1.基于xml配置

通过xml文件来配置TaskExecutor线程池,具体配置如下。

<!--
        代码中引用即可 @Autowired
        id:线程池名称,池中线程的名称前缀 ThreadPoolTaskExecutor
        pool-size:单一的值表示核心线程数量,如果是5-25,表示core-max
        queue-capacity:队列容量,默认是无界队列,可能会导致OOM,而且在无界队列的情况下,最大线程就无效了,只有固定线程才适合无界队列
        rejection-policy:当都满了的时候的拒绝策略,AbortPolice,CallerRunsPolicy,DiscardPolicy,DiscardOldestPolicy
        keep-alive:超过核心线程数量的线程的保活时间,如果设置为0,表示任务执行完之后立即回收 单位:s
        -->
<task:executor id="myExecutor" 
               pool-size="5-25" 
               queue-capacity="200" 
               rejection-policy="ABORT" 
               keep-alive="120"/>
复制代码

从上面的配置我们可以看出来,其实就是线程池的常用配置。

使用方式:

public class MyAsync {

    /**
     * 注入线程池
     */
    @Autowired
    private TaskExecutor taskExecutor;

    /**
     * 无返回值、无参数
     */
    public void async() {
       // taskExecutor.execute(new MyTask());
    }

    /**
     * 有参数、无返回值
     *
     * @param param
     */
    public void param(String param) {
        // taskExecutor.execute(new MyTask());
    }

    /**
     * Future 异步返回
     *
     * @return
     */
    public Future<String> future() {
        // taskExecutor.execute(new MyTask());
        return new AsyncResult<String>("result");
    }

    /**
     * CompletableFuture 异步返回
     *
     * @return
     */
    public CompletableFuture<String> completedFuture() {
        // taskExecutor.execute(new MyTask());
        return CompletableFuture.completedFuture("result");
    }
}
复制代码

基于xml配置,也可以配置启用 @Async 注解,具体配置如下:

<!--
        该标签是用于开启注解模式的,识别@Scheduled
        executor:executor bean name
        scheduler:scheduler bean name
        exception-handler:ThreadPoolTaskExecutor exception handler bean name
        mode:代理方式 1.AspectJ 2.JDK Proxy  默认JDK Proxy
        proxy-target-class:是否代理目标类 默认false
        -->
    <task:annotation-driven executor="myExecutor"/>
复制代码

直接使用 @Async 注解即可,使用方式如下:

public class MyAsync {

    /**
     * 无返回值、无参数
     */
    @Async
    public void async() {
        
    }

    /**
     * 有参数、无返回值
     *
     * @param param
     */
    @Async
    public void param(String param) {

    }

    /**
     * Future 异步返回
     *
     * @return
     */
    @Async
    public Future<String> future() {

        return new AsyncResult<String>("result");
    }

    /**
     * CompletableFuture 异步返回
     *
     * @return
     */
    @Async
    public CompletableFuture<String> completedFuture() {

        return CompletableFuture.completedFuture("result");
    }
}
复制代码

2.基于java config

基于java config的配置需要实现 org.springframework.scheduling.annotation.AsyncConfigurer 接口,具体代码如下。

@Configuration
public class MyAsyncConfigure implements AsyncConfigurer {

    @Bean
    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.execute(myTask());
        Future<?> future = executor.submit(myTask());
        return executor;
    }

    /**
     * 异常处理
     *
     * @return
     */
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return null;
    }

    public MyTask myTask() {
        return new MyTask();
    }
}
复制代码

3.基于注解

基于注解的配置方式是使用 @EnableAsync@Asyc 两个注解

@Configuration
@EnableAsync //开启异步,相当于TaskExecutor
public class MyConfiguration {
    /**
     * 配置线程池
     * 如果不配置,@EnableAsync默认core-size=1
     *
     * @return
     */
    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor scheduler = new ThreadPoolTaskExecutor();
        scheduler.setCorePoolSize(10);
        return scheduler;
    }
}
复制代码

使用方式:

public class MyAsync {

    /**
     * 无返回值、无参数
     */
    @Async
    public void async() {
    }

    /**
     * 有参数、无返回值
     *
     * @param param
     */
    @Async
    public void param(String param) {

    }

    /**
     * Future 异步返回
     *
     * @return
     */
    @Async
    public Future<String> future() {
        return new AsyncResult<String>("result");
    }

    /**
     * CompletableFuture 异步返回
     *
     * @return
     */
    @Async
    public CompletableFuture<String> completedFuture() {
        return CompletableFuture.completedFuture("result");
    }
}
复制代码

4.异常处理

1.xml和注解

如果是使用xml配置或注解配置方式,异常处理需要实现 org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler 接口,它有一个简单的实现 org.springframework.aop.interceptor.SimpleAsyncUncaughtExceptionHandler ,这个实现记录了异常日志,我们也可以实现这个接口,自定义异常处理。

public class MyAsyncUncaughtExceptionHandler implements AsyncUncaughtExceptionHandler {
    @Override
    public void handleUncaughtException(Throwable ex, Method method, Object... params) {

    }
}
复制代码

xml使用:

<!--异常处理-->
<bean id="myAsyncUncaughtExceptionHandler"      class="com.ly.task.async.MyAsyncUncaughtExceptionHandler"/>

<task:annotation-driven executor="myExecutor"
                            exception-handler="myAsyncUncaughtExceptionHandler"/>
复制代码

注解方式只要配置了,发生异常会自动调用。

原文  https://juejin.im/post/5ea801a9e51d454d8814a082
正文到此结束
Loading...