并发并不一定依赖多线程,但Java里谈论并发大多数都与线程脱不开关系。 线程是比进程更轻量级的调度执行单位 ,线程的引入可以把一个进程的资源分配和执行调度分开,各个线程既可以共享进程资源(内存地址,文件IO等),又可以独立调度(线程是CPU调度的基本单位)。 Thread类的所有关键方法都声明了native的 ,意味着这个方法没有使用或无法使用平台无关的手段来实现,也有可能是为了执行效率。
线程调度是指系统为线程分配处理器使用权的过程,主要调度方式分两种,分别是协同式线程调度和抢占式线程调度。 协同式线程调度,线程执行时间由线程本身来控制,线程把自己的工作执行完之后,要主动通知系统切换到另外一个线程上。最大好处是实现简单,且切换操作对线程自己是可知的,没啥线程同步问题。坏处是线程执行时间不可控制,如果一个线程有问题,可能一直阻塞在那里。 抢占式调度,每个线程将由系统来分配执行时间,线程的切换不由线程本身来决定(Java中,Thread.yield()可以让出执行时间,但无法获取执行时间)。线程执行时间系统可控,也不会有一个线程导致整个进程阻塞。 **Java线程调度就是抢占式调度。 ** 希望系统能给某些线程多分配一些时间,给一些线程少分配一些时间,可以通过设置线程优先级来完成。Java语言一共10个级别的线程优先级(Thread.MIN_PRIORITY至Thread.MAX_PRIORITY),在两线程同时处于ready状态时,优先级越高的线程越容易被系统选择执行。但优先级并不是很靠谱,因为Java线程是通过映射到系统的原生线程上来实现的,所以线程调度最终还是取决于操作系统。
Java定义了5种线程状态,在任意一个点一个线程只能有且只有其中一种状态。无限等待和等待可以算在一起。所以共五种。 1、新建(New):创建后尚未启动的线程。 2、运行(Runnable):Runnable包括操作系统线程状态中的Running和Ready,也就是处于此状态的线程有可能正在执行,也有可能等待CPU为它分配执行时间。线程对象创建后,其他线程调用了该对象的start()方法。该状态的线程位于“可运行线程池”中,变得可运行,只等待获取CPU的使用权。即在就绪状态的进程除CPU之外,其它的运行所需资源都已全部获得。 3、无限期等待(Waiting):该状态下线程不会被分配CPU执行时间,要等待被其他线程显式唤醒。如没有设置timeout的object.wait()方法和Thread.join()方法,以及LockSupport.park()方法。 4、限期等待(Timed Waiting):不会被分配CPU执行时间,不过无须等待被其他线程显式唤醒,在一定时间之后会由系统自动唤醒。如Thread.sleep(),设置了timeout的object.wait()和thread.join(),LockSupport.parkNanos()以及LockSupport.parkUntil()方法。
5、阻塞(Blocked):线程被阻塞了。与等待状态的区别是:阻塞在等待着获取到一个排他锁,这个事件将在另外一个线程放弃这个锁的时候发生;而等待则在等待一段时间,或唤醒动作的发生。在等待进入同步区域时,线程将进入这种状态。
**阻塞的情况分三种: ** (1)、等待阻塞:运行的线程执行wait()方法,该线程会释放占用的所有资源,JVM会把该线程放入“等待池”中。进入这个状态后,是不能自动唤醒的,必须依靠其他线程调用notify()或notifyAll()方法才能被唤醒,即无限期等待。 (2)、同步阻塞:运行的线程在获取对象的同步锁时,若该同步锁被别的线程占用,则JVM会把该线程放入“锁池”中。 (3)、其他阻塞:运行的线程执行sleep()或join()方法,或者发出了I/O请求时,JVM会把该线程置为阻塞状态。当sleep()状态超时、join()等待线程终止或者超时、或者I/O处理完毕时,线程重新转入就绪状态。即限期等待。
注意:join() 代表当前线程需要等待子线程的执行完毕,当前线程可以是主线程也可以是一个其他的子线程。join()会交出当前的cpu时间分片。
本质上: 通过反射去调用相应的类的方法来进行Aop的编程。所有的线程框架最终都会调用thread构造函数,可以通过hook thread来打印相关的调用堆栈信息。来排查三方库函数对线程的使用。收敛就是要控制团队中线程的创建、使用。可以通过自建的线程池来管理线程。
1.Futrue接口
public interface Future<V> { boolean cancel(boolean var1); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long var1, TimeUnit var3) throws InterruptedException,ExecutionException,TimeoutException; } 复制代码
public class FutureDemo { //结果集 static List<Integer> list = new ArrayList<>(); static List<Future<Integer>> futureList = new ArrayList<>(); public static void main(String[] args) throws ExecutionException, InterruptedException { long start = System.currentTimeMillis(); ExecutorService pool = Executors.newFixedThreadPool(10); //提交任务 for (int i =0;i<10;i++){ futureList.add(pool.submit(new CallableTask(i+1))); } long end = System.currentTimeMillis(); //怎么获取任务结果? for (Future<Integer> future:futureList) { while(true){ if(future.isDone()&&!future.isCancelled()){ Integer i = future.get();//获取结果 System.out.println("任务i="+i+"获取完成!"+new Date()); list.add(i); break;//当前future获取结果完毕,跳出while }else { Thread.sleep(10); //;轮训需要休息10s } } } System.out.println("list="+list); System.out.println("任务总耗时="+(System.currentTimeMillis()-start)+",取结果归集耗时="+(System.currentTimeMillis()-end)); } private static class CallableTask implements Callable<Integer> { int i = 0; public CallableTask(int i) { this.i = i; } @Override public Integer call() throws Exception { if(i==1){ Thread.sleep(3000);//任务1耗时3秒 }else if(i==5){ Thread.sleep(5000);//任务5耗时5秒 }else { Thread.sleep(1000);//其它任务耗时1秒 } System.out.println("task线程:"+Thread.currentThread().getName()+"任务i="+i+",完成!"); return i; } } } 复制代码
2.FutureTask
public class FutureTask<V> implements RunnableFuture<V> {} public interface RunnableFuture<V> extends Runnable, Future<V> { void run(); } 继承Runnable和Future,那么就可以开启一个单线程,在阻塞获取任务结果。不推荐使用 复制代码
3.CompletionService
内部通过阻塞队列+FutureTask,实现了 任务先完成可优先获取到,即结果按照完成先后顺序排序。
4.CompletableFuture jdk1.8
CompletableFuture满足并发执行,顺序完成先手顺序获取的目标。而且支持每个任务的异常返回,配合流式编程,用起来速度飞起。JDK源生支持,API丰富,推荐使用。
总结:
Futrue | FutureTask | CompletionService | CompletableFuture | |
---|---|---|---|---|
原理 | Futrue接口 | 接口RunnableFuture的唯一实现类,RunnableFuture接口继承自Future+Runnable: | 内部通过阻塞队列+FutureTask接口 | JDK8实现了Future, CompletionStage2个接口 |
多任务并发执行 | 支持 | 支持 | 支持 | 支持 |
获取任务结果的顺序 | 按照提交顺序获取结果 | 未知 | 支持任务完成的先后顺序 | 支持任务完成的先后顺序 |
异常捕捉 | 自己捕捉 | 自己捕捉 | 自己捕捉 | 源生API支持,返回每个任务的异常 |
建议 | CPU高速轮询,耗资源,可以使用,但不推荐 | 功能不对口,并发任务这一块多套一层,不推荐使用。 | 推荐使用,没有JDK8 CompletableFuture 之前最好的方案,没有质疑 | API极端丰富,配合流式编程,速度飞起,推荐使用! |
Executors四种线程池分析:
//newFixedThreadPool LinkedBlockingQueue 无界队列 public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } //newSingleThreadExecutor 新的job来了都会创建新的thread public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } //newCachedThreadPool 有界队列 当前一个线程执行完毕,后续的job就可以复用这个线程 public static newScheduledThreadPool newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } //newScheduledThreadPool 执行周期的job或者延迟任务 public static ScheduledExecutorService newScheduledThreadPool( int corePoolSize, ThreadFactory threadFactory) { return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory); } 复制代码
ThreadPoolExecutor分析:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } 复制代码
BlockingQueue《I》
LinkedBlockingQueue:LinkedBlockingQueue中也有两个Node分别用来存放首尾节点,并且里面有个初始值为0的原子变量count用来记录队列元素个数,另外里面有两个ReentrantLock的独占锁,分别用来控制元素入队和出队加锁,其中takeLock用来控制同时只有一个线程可以从队列获取元素,其他线程必须等待,putLock控制同时只能有一个线程可以获取锁去添加元素,其他线程必须等待。本质上就是生产消费模型。
ReentrantLock:可重入锁。
SynchronousQueue:队列长度为1,每一个put对应一个take方法,非常适合生产消费模式。
本质思考: