转载

Half-sync/Half-async模式 下

什么网站都能找得到

评价与实现考量

1.既发挥了异步编程的优势——增加系统的并发性,减少不必要的等待,又保持了同步编程的简单性。

通过把低级任务或者耗时较短的任务安排在异步层,减少了客户端的等待,有利于提升系统的吞吐率。

高级任务或耗时较长的任务被安排在同步层,这使得可以在不影响客户端代码处理性能的情况下保持了同步编程的简单性。

2.独立的并发访问控制策略。

Half-sync/Half-async模式的分层架构使得各个层次的代码可以有各自的并发访问控制策略。

Half-sync/Half-async模式的异步层对接的是多线程环境,因此该层通常需要并发访问控制措施。而同步层是一个单线程环境,因此它无须任何并发访问控制即可保证线程安全。

例子

在Java Swing中,为了避免GUI组件(如按钮)死锁(Dead lock),Swing的GUI层的代码特意采用单线程模型(Swing的GUI层代码运行在Event Loop线程中)。这样就可以避免使用锁,也就自然地避免了死锁。但是,这同时也带来另外的问题:如果要在Swing的Event Loop线程中执行耗时较长的任务(如从服务器上下载大文件),那么GUI就会被“冻住”而无法响应用户操作。为了解决这个问题,JDK 1.6引入了SwingWorker类。该类应用了Half-sync/Half-async模式。SwingWorker就像一个转换器,其一端(异步层)连接着Swing的单线程GUI层,另一端(同步层)连接着多线程的应用代码层(如从服务器上下载文件的代码)。这样,耗时较长的应用代码仍然可以多线程执行,比如多个线程分段从服务器上下载大文件,而不影响Swing GUI层的单线程模式。

问题:队列积压

Half-sync/Half-async模式是Producer-Consumer模式的一个实例。AsyncTask和SyncTask分别相当于Producer-Consumer模式的Producer和Consumer参与者。

在使用Half-sync/Half-async模式的时候,需要根据实际情况在有界队列和无界队列之间做出选择。选择有界队列有个隐含的好处:当队列积压到满的情况下,Producer会因其需要等待队列非满而降低处理速率,从而在一定程度上减轻了Consumer的处理负担。

有界队列可以使用java.util.concurrent.ArrayBlockingQueue。ArrayBlockingQueue用于存储队列元素的存储空间是预先分配的,这意味着它在使用过程中内存开销较小(无须动态申请存储空间)。

无界队列可以使用java.util.concurrent.LinkedBlockingQueue。LinkedBlockingQueue用于存储队列元素的存储空间是在其使用过程中动态分配的,因此它可能会增加JVM垃圾回收的负担。

问题:避免同步层处理过慢

同步层中的高级任务取决于其依赖的外部资源(如数据库服务器)。而这些外部资源又往往不在控制范围之内。因此要避免同步层处理过慢,还要从同步层自身的设计入手,比如改用非阻塞I/O(如基于Java NIO的API),并采用专门的线程去处理I/O等待事件。

Half-sync/Half-async模式的可复用实现代码

AsyncTask类的dispatch方法会先调用onPreExecute方法执行耗时较短的任务,对客户端的输入做一些初步处理。然后,dispatch方法耗时较长的任务(doInBackround方法中)提交到同步层,由java.util.concurrent.Executor实现类负责执行。

AsyncTask类的executor实例变量是Half-sync/Half-async模式的SyncTask参与者,它负责执行doInBackground中实现的同步任务。

AsyncTask类的dispatch方法运行在异步层中,其调用的onPreExecute方法执行耗时较短的任务。AsyncTask类的doInBackground方法执行耗时较长的任务,运行在同步层的工作者线程中。

package io.github.viscent.mtpattern.ch14.hsha;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Half-sync/Half-async模式的可复用实现。
 * 
 * @author Viscent Huang
 *
 * @param <V>
 *          同步任务的处理结果类型
 */
public abstract class AsyncTask<V> {

   // 相当于Half-sync/Half-async模式的同步层:用于执行异步层提交的任务
   private volatile Executor executor;
   private final static ExecutorService DEFAULT_EXECUTOR;

   static {
      DEFAULT_EXECUTOR = new ThreadPoolExecutor(1, 1, 8, TimeUnit.HOURS,
          new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {

             @Override
             public Thread newThread(Runnable r) {
                Thread thread = new Thread(r, "AsyncTaskDefaultWorker");

                // 使该线程在JVM关闭时自动停止
                thread.setDaemon(true);
                return thread;
             }

          }, new RejectedExecutionHandler() {

             /**
              * 该RejectedExecutionHandler支持重试。 
              * 当任务被ThreadPoolExecutor拒绝时,
              * 该RejectedExecutionHandler支持
              * 重新将任务放入ThreadPoolExecutor
              * 的工作队列(这意味着,此时客户端代码
              * 需要等待ThreadPoolExecutor的队列非满)。
              */
             @Override
             public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                if (!executor.isShutdown()) {
                   try {
                      executor.getQueue().put(r);
                   } catch (InterruptedException e) {
                      ;
                   }
                }

             }

          });
   }

   public AsyncTask(Executor executor) {
      this.executor = executor;
   }

   public AsyncTask() {
      this(DEFAULT_EXECUTOR);
   }

   public void setExecutor(Executor executor) {
      this.executor = executor;
   }

   /**
    * 留给子类实现耗时较短的任务,默认实现什么也不做。
    * 
    * @param params
    *          客户端代码调用dispatch方法时所传递的参数列表
    */
   protected void onPreExecute(Object... params) {
      // 什么也不做
   }

   /**
    * 留给子类实现。用于实现同步任务执行结束后所需执行的操作。 默认实现什么也不做。
    * 
    * @param result
    *          同步任务的处理结果
    */
   protected void onPostExecute(V result) {
      // 什么也不做
   }

   protected void onExecutionError(Exception e) {
      e.printStackTrace();
   }

   /**
    * 留给子类实现耗时较长的任务(同步任务),由后台线程负责调用。
    * 
    * @param params
    *          客户端代码调用dispatch方法时所传递的参数列表
    * @return 同步任务的处理结果
    */
   protected abstract V doInBackground(Object... params);

   /**
    * 对外(其子类)暴露的服务方法。 该类的子类需要定义一个比该方法命名更为具体的服务方法(如downloadLargeFile)。
    * 该命名具体的服务方法(如downloadLargeFile)可直接调用该方法。
    * 
    * @param params
    *          客户端代码传递的参数列表
    * @return 可借以获取任务处理结果的Promise(参见第6章,Promise模式)实例。
    */
   protected Future<V> dispatch(final Object... params) {
      FutureTask<V> ft = null;

      // 进行异步层初步处理
      onPreExecute(params);

      Callable<V> callable = new Callable<V>() {
         @Override
         public V call() throws Exception {
            V result;
            result = doInBackground(params);
            return result;
         }

      };

      ft = new FutureTask<V>(callable) {

         @Override
         protected void done() {
            try {
               onPostExecute(this.get());
            } catch (InterruptedException e) {
               onExecutionError(e);
            } catch (ExecutionException e) {
               onExecutionError(e);
            }
         }

      };

      // 提交任务到同步层处理
      executor.execute(ft);

      return ft;
   }

}

使用AsyncTask类实现Half-sync/Half-async模式,只需要创建AsyncTask的子类(或者匿名子类),并在子类中完成以下几个任务:

1. 【必需】定义一个含义具体的服务方法名,该方法可直接调用父类的dispatch方法。

2. 【必需】实现父类的抽象方法doInBackground方法。

3. 【可选】根据应用的实际需要覆盖父类的onPreExecute方法、onPostExecute方法和onExecutionException方法。

package io.github.viscent.mtpattern.ch14.hsha.example;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Future;

import io.github.viscent.mtpattern.ch14.hsha.AsyncTask;
import io.github.viscent.util.Debug;

public class SampleAsyncTask {

   public static void main(String[] args) {
      XAsyncTask task = new XAsyncTask();
      List<Future<String>> results = new LinkedList<Future<String>>();

      try {
         results.add(task.doSomething("Half-sync/Half-async", 1));

         results.add(task.doSomething("Pattern", 2));

         for (Future<String> result : results) {
            Debug.info(result.get());
         }

         Thread.sleep(200);
      } catch (Exception e) {

         e.printStackTrace();
      }

   }

   private static class XAsyncTask extends AsyncTask<String> {

      @Override
      protected String doInBackground(Object... params) {
         String message = (String) params[0];
         int sequence = (Integer) params[1];
         Debug.info("doInBackground:" + message);
         return "message " + sequence + ":" + message;
      }

      @Override
      protected void onPreExecute(Object... params) {
         String message = (String) params[0];
         int sequence = (Integer) params[1];
         Debug.info("onPreExecute:[" + sequence + "]" + message);
      }

      public Future<String> doSomething(String message, int sequence) {
         if (sequence < 0) {
            throw new IllegalArgumentException("Invalid sequence:" + sequence);
         }
         return this.dispatch(message, sequence);
      }
   }
}
原文  http://www.yongjava.com/2019/04/05/half-sync-half-async模式-下/
正文到此结束
Loading...