转载

java线程-调度

并发并不一定依赖多线程,但Java里谈论并发大多数都与线程脱不开关系。 线程是比进程更轻量级的调度执行单位 ,线程的引入可以把一个进程的资源分配和执行调度分开,各个线程既可以共享进程资源(内存地址,文件IO等),又可以独立调度(线程是CPU调度的基本单位)。 Thread类的所有关键方法都声明了native的 ,意味着这个方法没有使用或无法使用平台无关的手段来实现,也有可能是为了执行效率。

Java线程调度

线程调度是指系统为线程分配处理器使用权的过程,主要调度方式分两种,分别是协同式线程调度和抢占式线程调度。 协同式线程调度,线程执行时间由线程本身来控制,线程把自己的工作执行完之后,要主动通知系统切换到另外一个线程上。最大好处是实现简单,且切换操作对线程自己是可知的,没啥线程同步问题。坏处是线程执行时间不可控制,如果一个线程有问题,可能一直阻塞在那里。 抢占式调度,每个线程将由系统来分配执行时间,线程的切换不由线程本身来决定(Java中,Thread.yield()可以让出执行时间,但无法获取执行时间)。线程执行时间系统可控,也不会有一个线程导致整个进程阻塞。 **Java线程调度就是抢占式调度。 ** 希望系统能给某些线程多分配一些时间,给一些线程少分配一些时间,可以通过设置线程优先级来完成。Java语言一共10个级别的线程优先级(Thread.MIN_PRIORITY至Thread.MAX_PRIORITY),在两线程同时处于ready状态时,优先级越高的线程越容易被系统选择执行。但优先级并不是很靠谱,因为Java线程是通过映射到系统的原生线程上来实现的,所以线程调度最终还是取决于操作系统。

状态转换

java线程-调度

Java定义了5种线程状态,在任意一个点一个线程只能有且只有其中一种状态。无限等待和等待可以算在一起。所以共五种。 1、新建(New):创建后尚未启动的线程。 2、运行(Runnable):Runnable包括操作系统线程状态中的Running和Ready,也就是处于此状态的线程有可能正在执行,也有可能等待CPU为它分配执行时间。线程对象创建后,其他线程调用了该对象的start()方法。该状态的线程位于“可运行线程池”中,变得可运行,只等待获取CPU的使用权。即在就绪状态的进程除CPU之外,其它的运行所需资源都已全部获得。 3、无限期等待(Waiting):该状态下线程不会被分配CPU执行时间,要等待被其他线程显式唤醒。如没有设置timeout的object.wait()方法和Thread.join()方法,以及LockSupport.park()方法。 4、限期等待(Timed Waiting):不会被分配CPU执行时间,不过无须等待被其他线程显式唤醒,在一定时间之后会由系统自动唤醒。如Thread.sleep(),设置了timeout的object.wait()和thread.join(),LockSupport.parkNanos()以及LockSupport.parkUntil()方法。

5、阻塞(Blocked):线程被阻塞了。与等待状态的区别是:阻塞在等待着获取到一个排他锁,这个事件将在另外一个线程放弃这个锁的时候发生;而等待则在等待一段时间,或唤醒动作的发生。在等待进入同步区域时,线程将进入这种状态。

**阻塞的情况分三种: ** (1)、等待阻塞:运行的线程执行wait()方法,该线程会释放占用的所有资源,JVM会把该线程放入“等待池”中。进入这个状态后,是不能自动唤醒的,必须依靠其他线程调用notify()或notifyAll()方法才能被唤醒,即无限期等待。 (2)、同步阻塞:运行的线程在获取对象的同步锁时,若该同步锁被别的线程占用,则JVM会把该线程放入“锁池”中。 (3)、其他阻塞:运行的线程执行sleep()或join()方法,或者发出了I/O请求时,JVM会把该线程置为阻塞状态。当sleep()状态超时、join()等待线程终止或者超时、或者I/O处理完毕时,线程重新转入就绪状态。即限期等待。

注意:join() 代表当前线程需要等待子线程的执行完毕,当前线程可以是主线程也可以是一个其他的子线程。join()会交出当前的cpu时间分片。

线程收敛

本质上: 通过反射去调用相应的类的方法来进行Aop的编程。所有的线程框架最终都会调用thread构造函数,可以通过hook thread来打印相关的调用堆栈信息。来排查三方库函数对线程的使用。收敛就是要控制团队中线程的创建、使用。可以通过自建的线程池来管理线程。

多线程并发获取任务结果集

1.Futrue接口

public interface Future<V> {
    boolean cancel(boolean var1);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long var1, TimeUnit var3) throws InterruptedException,ExecutionException,TimeoutException;
}

复制代码
public class FutureDemo {

    //结果集
    static List<Integer> list = new ArrayList<>();
    static List<Future<Integer>> futureList = new ArrayList<>();


    public static void main(String[] args) throws ExecutionException, InterruptedException {
        long start = System.currentTimeMillis();
        ExecutorService pool = Executors.newFixedThreadPool(10);
        //提交任务
        for (int i =0;i<10;i++){
            futureList.add(pool.submit(new CallableTask(i+1)));
        }
        long end = System.currentTimeMillis();

        //怎么获取任务结果?
        for (Future<Integer> future:futureList) {
            while(true){
                if(future.isDone()&&!future.isCancelled()){
                    Integer i = future.get();//获取结果
                    System.out.println("任务i="+i+"获取完成!"+new Date());
                    list.add(i);
                    break;//当前future获取结果完毕,跳出while
                }else {
                    Thread.sleep(10); //;轮训需要休息10s
                }
            }
        }
        System.out.println("list="+list);
        System.out.println("任务总耗时="+(System.currentTimeMillis()-start)+",取结果归集耗时="+(System.currentTimeMillis()-end));
    }

    private static class CallableTask implements Callable<Integer> {
        int i = 0;
        public CallableTask(int i) {
            this.i = i;
        }

        @Override
        public Integer call() throws Exception {
            if(i==1){
                Thread.sleep(3000);//任务1耗时3秒
            }else if(i==5){
                Thread.sleep(5000);//任务5耗时5秒
            }else {
                Thread.sleep(1000);//其它任务耗时1秒
            }
            System.out.println("task线程:"+Thread.currentThread().getName()+"任务i="+i+",完成!");
            return i;
        }
    }
}
复制代码

2.FutureTask

public class FutureTask<V> implements RunnableFuture<V> {}

public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}
继承Runnable和Future,那么就可以开启一个单线程,在阻塞获取任务结果。不推荐使用
复制代码

3.CompletionService

内部通过阻塞队列+FutureTask,实现了 任务先完成可优先获取到,即结果按照完成先后顺序排序。

4.CompletableFuture jdk1.8

CompletableFuture满足并发执行,顺序完成先手顺序获取的目标。而且支持每个任务的异常返回,配合流式编程,用起来速度飞起。JDK源生支持,API丰富,推荐使用。

总结:

Futrue FutureTask CompletionService CompletableFuture
原理 Futrue接口 接口RunnableFuture的唯一实现类,RunnableFuture接口继承自Future+Runnable: 内部通过阻塞队列+FutureTask接口 JDK8实现了Future, CompletionStage2个接口
多任务并发执行 支持 支持 支持 支持
获取任务结果的顺序 按照提交顺序获取结果 未知 支持任务完成的先后顺序 支持任务完成的先后顺序
异常捕捉 自己捕捉 自己捕捉 自己捕捉 源生API支持,返回每个任务的异常
建议 CPU高速轮询,耗资源,可以使用,但不推荐 功能不对口,并发任务这一块多套一层,不推荐使用。 推荐使用,没有JDK8 CompletableFuture 之前最好的方案,没有质疑 API极端丰富,配合流式编程,速度飞起,推荐使用!

线程池

Executors四种线程池分析:

//newFixedThreadPool LinkedBlockingQueue 无界队列
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
//newSingleThreadExecutor 新的job来了都会创建新的thread
public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
//newCachedThreadPool 有界队列 当前一个线程执行完毕,后续的job就可以复用这个线程
 public static newScheduledThreadPool newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
//newScheduledThreadPool 执行周期的job或者延迟任务
 public static ScheduledExecutorService newScheduledThreadPool(
            int corePoolSize, ThreadFactory threadFactory) {
        return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
    }

复制代码

ThreadPoolExecutor分析:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }
复制代码
  1. 当线程池小于corePoolSize时,新提交任务将创建一个新线程执行任务,即使此时线程池中存在空闲线程。
  2. 当线程池达到corePoolSize时,新提交任务将被放入workQueue中,等待线程池中任务调度执行。
  3. 当workQueue已满,且maximumPoolSize>corePoolSize时,新提交任务会创建新线程执行任务。
  4. 当提交任务数超过maximumPoolSize时,新提交任务由RejectedExecutionHandler处理。
  5. 当线程池中超过corePoolSize线程,空闲时间达到keepAliveTime时,关闭空闲线程。
  6. 当设置allowCoreThreadTimeOut(true)时,线程池中corePoolSize线程空闲时间达到keepAliveTime也将关闭。

线程池中的堵塞队列:

BlockingQueue《I》

LinkedBlockingQueue:LinkedBlockingQueue中也有两个Node分别用来存放首尾节点,并且里面有个初始值为0的原子变量count用来记录队列元素个数,另外里面有两个ReentrantLock的独占锁,分别用来控制元素入队和出队加锁,其中takeLock用来控制同时只有一个线程可以从队列获取元素,其他线程必须等待,putLock控制同时只能有一个线程可以获取锁去添加元素,其他线程必须等待。本质上就是生产消费模型。

ReentrantLock:可重入锁。

SynchronousQueue:队列长度为1,每一个put对应一个take方法,非常适合生产消费模式。

本质思考:

  • 队列的有界无界是针对队列的元素大小来说的,LinkedBlockingQueue给了队列的size就是有界,如果调用了无参数的构造就是无界的。无界的任务队列不存在任务入队失败的情况(除非资源耗尽)。
java线程-调度
原文  https://juejin.im/post/5d505ab06fb9a06b1d213523
正文到此结束
Loading...