并发通常是用于提高运行在 单处理器
上的程序的性能。在单 CPU 机器上使用多任务的程序在任意时刻只在执行一项工作。
并发编程使得一个程序可以被划分为多个分离的、独立的任务。一个线程就是在进程中的一个单一的顺序控制流。 java的线程机制是抢占式。
线程的好处是提供了轻量级的执行上下文切换,只改变了程序的执行序列和局部变量。
多线程的主要缺陷:<!-- java编程思想 -->
当程序运行,JVM会为每一个线程分配一个独立的缓存用于提高执行效率,每一个线程都在自己独立的缓存中操作各自的数据。一个线程在缓冲中对数据进行修改,写入到主存后,其他线程无法得知数据已被更改,仍在操作缓存中已过时的数据,为了解决这个问题,提供了volatile关键字,实现内存可见,一旦主存数据被修改,便致使其他线程缓存数据行无效,强制前往主存获取新数据。
Example:内存不可见,导致主线程无法结束。
class ThreadDemo implements Runnable { //添加volatile关键字可实现内存可见性 public volatile boolean flag = false; public boolean flag = Boolean.false; @Override public void run() { try { Thread.sleep(200); } catch (InterruptedException e) { } flag = Boolean.true; System.out.println("ThreadDemo over"); } public boolean isFlag() { return flag; } } public class TestVolatile { public static void main(String[] args) { ThreadDemo demo = new ThreadDemo(); new Thread(demo).start(); while (true) { if (demo.flag || demo.isFlag()) { System.out.println("Main over"); break; } } } }/*output:打印ThreadDemo over,主线程持续循环*/
当多个线程操作共享数据时,保证内存中的数据可见性。采用底层的内存栅栏,及时的将缓存中修改的数据刷新到主存中,并导致其他线程所缓存的数据无效,使得这些线程必须去主存中获取修改的数据。
Example:使用volatile修饰,number自增问题。
class ThreadDemo implements Runnable { public volatile int number = 0; @Override public void run() { try { Thread.sleep(200); } catch (Exception e) { } System.out.print(getIncrementNumber() + " "); } public int getIncrementNumber() { return ++number; } } public class TestAtomic { public static void main(String[] args) { ThreadDemo demo = new ThreadDemo(); for (int i = 0; i < 10; i++) { new Thread(demo).start(); } } }/*output: 1 5 4 7 3 9 2 1 8 6 */
// ++number底层原理思想 int temp = number; // ① number = number + 1; // ② temp = number; // ③ return temp; // ④
由 ++number 可知,返回的是 temp 中存储的值,且自增是一个多步操作,当多个线程调用 incrementNumber方法时,方法去主存中获取 number 值放入 temp 中,根据 CPU 时间片切换,当 A 线程完成了 ③ 操作时,时间片到了被中断,A 线程开始执行 ① 时不幸被中断,接着 A 获取到了CPU执行权,继续执行完成 ④ 操作更新了主存中的值,紧接着 B 线程开始执行,但是 B 线程 temp中存储的值已经过时了。 注意:自增操作为四步,只有在第四步的时候才会刷新主存的值,而不是number = number + 1 操作就反映到主存中去。
如图所示:
volatile只能保证内存可见性,对多步操作的变量,无法保证其原子性,为了解决这个问题,提供了原子变量。
原子变量既含有volatile的内存可见性,又提供了对变量原子性操作的支持,采用底层硬件对并发操作共享数据的 CAS(Compare-And-Swap)算法,保证数据的原子性。
类 | 描述 |
---|---|
AtomicBoolean | 一个 boolean 值可以用原子更新。 |
AtomicInteger | 可能原子更新的 int 值。 |
AtomicIntegerArray | 一个 int 数组,其中元素可以原子更新。 |
AtomicIntegerFieldUpdater <T> | 基于反射的实用程序,可以对指定类的指定的 volatile int 字段进行原子更新。 |
AtomicLong | 一个 long 值可以用原子更新。 |
AtomicLongArray | 可以 long 地更新元素的 long 数组。 |
AtomicLongFieldUpdater <T> | 基于反射的实用程序,可以对指定类的指定的 volatile long 字段进行原子更新。 |
AtomicMarkableReference <V> | AtomicMarkableReference 维护一个对象引用以及可以原子更新的标记位。 |
AtomicReference <V> | 可以原子更新的对象引用。 |
AtomicReferenceArray <E> | 可以以原子方式更新元素的对象引用数组。 |
AtomicReferenceFieldUpdater <T,V> | 一种基于反射的实用程序,可以对指定类的指定的 volatile volatile引用原子更新。 |
AtomicStampedReference <V> | AtomicStampedReference 维护对象引用以及可以原子更新的整数“印记”。 |
DoubleAccumulator | 一个或多个变量一起维护使用提供的功能更新的运行的值 double 。 |
DoubleAdder | 一个或多个变量一起保持初始为零 double 和。 |
LongAccumulator | 一个或多个变量,它们一起保持运行 long 使用所提供的功能更新值。 |
LongAdder | 一个或多个变量一起保持初始为零 long 总和。 |
CAS(Compare-And-Swap)是底层硬件对于原子操作的一种算法,其包含了三个操作数:内存值(V),预估值(A),更新值(B)。当且仅当 V == A 时, 执行 V = B 操作;否则不执行任何结果。这里需要注意, A 和 B 两个操作数是原子性的,同一时刻只能有一个线程进行AB操作。
HashMap是线程不安全的,而HashTable是线程安全的,因为HashTable所维护的Hash表存在着独占锁,当多个线程并发访问时,只能有一个线程可进行操作,但是对于复合操作时,HashTable仍然存在线程安全问题,不使用HashTable的主要原因还是效率低下。
// 功能:不包含obj,则添加 if (!hashTable.contains(obj)) { // 复合操作,执行此处时线程中断,obj被其他线程添加至容器中,此处继续执行将导致重复添加 hashTable.put(obj); } 可知上述两个操作需要 “原子性”,为了达到效果,还不是得对代码块进行同步
采用锁分段机制,分为 16 个段(并发级别),每一个段下有一张表,该表采用链表结构链接着各个元素,每个段都使用独立的锁。当多个线程并发操作的时候,根据各自的级别不同,操作不同的段,多个线程并行操作,明显提高了效率,其次还提供了复合操作的诸多方法。 注:jdk1.8由原来的数组+单向链表结构转换成数据+单向链表+红黑树结构。
有序的哈希表,通过跳表实现,不允许null作为键或值。 ConcurrentSkipListMap详解
对collection进行写入操作时,将导致创建整个底层数组的副本,而源数组将保留在原地,使得复制的数组在被修改时,读取操作可以安全的执行。当修改完成时,一个原子性的操作将把心的数组换人,使得新的读取操作可以看到新的修改。<!--Java编程思想-->
好处之一是当多个迭代器同时遍历和修改列表时,不会抛出ConcurrentModificationException。
当一个修房子的 A 线程正在执行,需要砖头时,开启了一个线程 B 去拉砖头,此时 A 线程需要等待 B 线程的结果后才能继续执行时,但是线程之间都是并行操作的,为了解决这个问题,提供了CountDownLatch。
一个同步辅助类,为了保证执行某些操作时,“所有准备事项都已就绪”,仅当某些操作执行完毕后,才能执行后续的代码块,否则一直等待。
CountDownLatch中存在一个锁计数器,如果锁计数器不为 0 的话,它会阻塞任何一个调用 await() 方法的线程。也就是说,当一个线程调用 await() 方法时,如果锁计数器不等于 0,那么就会一直等待锁计数器为 0 的那一刻,这样就解决了需要等待其他线程执行完毕才执行的需求。
class ThreadDemo implements Runnable { private CountDownLatch latch = null; public ThreadDemo(CountDownLatch latch) { this.latch = latch; } @Override public void run() { try { System.out.println("execute over"); } finally { latch.countDown(); // 必须保证计数器减一 } } } public class TestCountDownLatch { public static void main(String[] args) { final int count = 10; final CountDownLatch latch = new CountDownLatch(count); ThreadDemo demo = new ThreadDemo(latch); for (int i = 0; i < count; ++i) { new Thread(demo).start(); } try { latch.await(); // 等待计数器为 0 System.out.println("其他线程结束,继续往下执行..."); } catch (InterruptedException e) { e.printStackTrace(); } } }/**output: execute over ... 其他线程结束,继续往下执行... */
当开启一个线程执行运算时,可能会需要该线程的计算结果,之前的 implements Runnable
和 extends Thread
的 run() 方法并没有提供可以返回的功能,因此提供了 Callable接口。 Callable 的运行结果, 需要使用 FutureTask 类来接受。
class ThreadDemo implements Callable<Integer> { private Integer cycleValue; public ThreadDemo(Integer cycleValue) { this.cycleValue = cycleValue; } @Override public Integer call() throws Exception { int result = 0; for (int i=0; i<cycleValue; ++i) { result += i; } return result; } } public class TestCallable { public static void main(String[] args) throws Exception { ThreadDemo demo = new ThreadDemo(Integer.MAX_VALUE); // 使用FutureTask接受结果 FutureTask<Integer> task = new FutureTask<>(demo); new Thread(task).start(); Integer result = task.get(); // 等待计算结果返回, 闭锁 System.out.println(result); } }/*output:1073741825 */
Lock: 在进行性能测试时,使用Lock通常会比使用synchronized要高效许多,并且synchronized的开销变化范围很大,而Lock相对稳定。只有在性能调优时才使用Lock对象。<!--Java编程思想-->
Condition:替代了 Object 监视器方法的使用,描述了可能会与锁有关的条件标量,相比 Object 的 notifyAll()
,Condition 的 signalAll()
更安全。Condition 实质上被绑定到一个锁上,使用newCondition() 方法为 Lock 实例获取 Condition。
Lock和Condition对象只有在困难的多线程问题中才是必须的。 <!--Java编程思想-->
synchonized | Lock |
---|---|
隐式锁 | 显示锁 |
JVM底层实现,由JVM维护 | 由程序员手动维护 |
灵活控制(也有风险) |
“虚假唤醒”:当一个线程A在等待时,被另一个线程唤醒,被唤醒的线程不一定满足了可继续向下执行的条件,如果被唤醒的线程未满足条件,而又向下执行了,那么称这个现象为 “虚假唤醒”。
// 安全的方式,保证退出等待循环前,一定能满足条件 while (条件) { wait(); }
Example:生产消费者<!--参考Java编程思想 P712-->
// 产品car class Car { private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); private boolean available = false; // false:无货;true有货 public void put(){ lock.lock(); try { while (available) { // 有货等待 condition.await(); } System.out.println(Thread.currentThread().getName() + "put(): 进货"); available = true; condition.signalAll(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } public void get() { lock.lock(); try { while (!available) { // 无货等待 condition.await(); } System.out.println(Thread.currentThread().getName() + "get():出货"); available = false; condition.signalAll(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } } // 消费者 class Consume implements Runnable { private Car car; public Consume(Car car) { this.car = car; } @Override public void run() { for (int i=0; i<TestProduceAndConsume.LOOP_SIZE; ++i) { car.get(); try { Thread.sleep(100); } catch (InterruptedException e) { } } } } // 生产者 class Produce implements Runnable { private Car car; public Produce(Car car) { this.car = car; } @Override public void run() { for (int i=0; i<TestProduceAndConsume.LOOP_SIZE; i++) { car.put(); } } } public class TestProduceAndConsume { public static final int LOOP_SIZE = 10; public static void main(String[] args) { Car car = new Car(); for (int i=0; i<5; ++i) { Consume consume = new Consume(car); Produce produce = new Produce(car); new Thread(consume, i + "--").start(); new Thread(produce, i + "--").start(); } } }
每一个 对 lock()
的调用都必须紧跟着一个 try-finally 子句,用以保证可以在任何情况下都能释放锁,任务在调用 await()
、 signal()
、 signalAll()
之前,必须拥有锁。
lock.lock(); try { ... // 业务代码 } finally { lock.unlock(); }
上述讲解的锁都是读写一把锁,不论是读或写,都是一把锁解决,当多线程访问数据时,若发生了一千次操作,其中的写操作只执行了一次,数据的更新率非常低,那么每次进行读操作时,都要加锁读取”不会更改的“数据,显然是不必要的开销,因此出现了 ReadWriteLock 读写锁,该对象提供读锁和写锁。
ReadWriteLock 维护了一对相关的锁,一个用于只读操作,另一个用于写入操作。只要没有 write写入操作,那么多个线程可以同时进行持有读锁。而写入锁是独占的,当执行写操作时,其他线程不可写,也不可读。
性能的提升取决于读写操作期间读取数据相对于修改数据的频率,如果读取操作远远大于写入操作时,便能增强并发性。
class Demo { private int value = 0; private ReadWriteLock lock = new ReentrantReadWriteLock(); public void read() { lock.readLock().lock(); try { System.out.println(Thread.currentThread().getName() + " : " + value); } finally { lock.readLock().unlock(); } } public void write(int value) { lock.writeLock().lock(); try { this.value = value; System.out.println("write(" + value + ")"); } finally { lock.writeLock().unlock(); } } } class ReadLock implements Runnable { private Demo demo = null; public ReadLock(Demo demo) { this.demo = demo; } @Override public void run() { for (int i=0; i<20; ++i) { demo.read(); try { Thread.sleep(320); } catch (InterruptedException e) { } } } } class WriteLock implements Runnable { private Demo demo = null; public WriteLock(Demo demo) { this.demo = demo; } @Override public void run() { for (int i=0; i<10; ++i) { demo.write(i); try { Thread.sleep(200); } catch (InterruptedException e) { } } } } public class TestReadWriteLock { public static void main(String[] args) { Demo demo = new Demo(); ReadLock readLock = new ReadLock(demo); WriteLock writeLock = new WriteLock(demo); for (int i=0; i<3; ++i) { new Thread(readLock, i + "--").start(); } new Thread(writeLock).start(); } }/**output: 0-- : 0 1-- : 0 2-- : 0 write(0) write(1) 1-- : 1 2-- : 1 0-- : 1 write(2) write(3) 1-- : 3 0-- : 3 ... */
在传统操作中(如连接数据库),当我们需要使用一个线程的时候,就 直接创建一个线程,线程完毕后被垃圾收集器回收。每一次需要线程的时候,不断的创建与销毁,大大增加了资源的开销。
线程池维护着一个线程队列,该队列中保存着所有等待着的线程,避免了重复的创建与销毁而带来的开销。
Execuotr:负责线程的使用与调度的根接口。 |- ExecutorService:线程池的主要接口。 |- ForkJoinPool:采用分而治之技术将任务分解。 |- ThreadPoolExecutor:线程池的实现类。 |- ScheduledExecutorService:负责线程调度的子接口。 |- ScheduledThreadPoolExecutor:负责线程池的调度。继承ThreadPoolExecutor并实现ScheduledExecutorService接口
方法 | 描述 |
---|---|
ExecutorService newFixedThreadPool(int nThreads) | 创建一个可重用固定数量的无界队列线程池。使用了有限的线程集来执行所提交的所有任务。创建的时候可以一次性预先进行代价高昂的线程分配。 |
ExecutorService newWorkStealingPool(int parallelism) | 创建一个维护足够的线程以支持给定的parallelism并行级别的线程池。 |
ExecutorService newSingleThreadExecutor() | 创建一个使用单个线程运行的无界队列的执行程序。 |
ExecutorService newCachedThreadPool() | 创建一个根据需要创建新线程的线程池,当有可用线程时将重新使用以前构造的线程。 |
ScheduledExecutorService newSingleThreadScheduledExecutor() | 创建一个单线程执行器,可以调度命令在给定的延迟之后运行,或定期执行。 |
ScheduledExecutorService newScheduledThreadPool(int corePoolSize) | 创建一个线程池,可以调度命令在给定的延迟之后运行,或定期执行。 |
ThreadFactory privilegedThreadFactory() | 返回一个用于创建与当前线程具有相同权限的新线程的线程工厂。 |
ExecutorService.shutdown():防止新任务被提交,并继续运行被调用之前所提交的所有任务,待任务都完成后退出。
CachedThreadPoo在程序执行过程中通常会创建与所需数量相同的线程,然后在它回收旧线程时停止创建新线程,是Executor的首选。仅当这个出现问题时,才需切换 FixedThreadPool。
SingleThreadExecutor: 类似于线程数量为 1 的FixedThreadPool,但它提供了不会存在两个及以上的线程被并发调用的并发。
public class TestThreadPool { public static void main(String[] args) throws Exception { ExecutorService pool = Executors.newFixedThreadPool(2); for (int i = 0; i < 10; ++i) { Future<String> future = pool.submit(new Callable<String>() { @Override public String call() throws Exception { return Thread.currentThread().getName(); } }); String threadName = future.get(); System.out.println(threadName); } pool.shutdown(); // 拒绝新任务并等待正在执行的线程完成当前任务后关闭。 } }/**output: pool-1-thread-1 pool-1-thread-2 pool-1-thread-1 pool-1-thread-2 ... */
public class TestThreadPool { public static void main(String[] args) throws Exception { ScheduledExecutorService pool = Executors.newScheduledThreadPool(2); for (int i = 0; i < 5; ++i) { ScheduledFuture<String> future = pool.schedule(new Callable<String>() { @Override public String call() throws Exception { return Thread.currentThread().getName() + " : " + Instant.now(); } }, 1, TimeUnit.SECONDS); // 延迟执行单位为 1秒的任务 String result = future.get(); System.out.println(result); } pool.shutdown(); } }/**output: pool-1-thread-1 : 2019-03-18T12:10:31.260Z pool-1-thread-1 : 2019-03-18T12:10:32.381Z pool-1-thread-2 : 2019-03-18T12:10:33.382Z pool-1-thread-1 : 2019-03-18T12:10:34.383Z pool-1-thread-2 : 2019-03-18T12:10:35.387Z */
<span style="color: red">注意:若没有执行 shutdown()
方法,则线程会一直等待而不停止。</span>
在一个线程队列中,假如队头的线程由于某种原因导致了阻塞,那么在该队列中的后继线程需要等待队头线程结束,只要队头一直阻塞,这个队列中的所有线程都将等待。此时,可能其他线程队列都已经完成了任务而空闲,这种情况下,就大大减少了吞吐量。
当执行一个新任务时,采用分而治之的思想,将其分解成更小的任务执行,并将分解的任务加入到线程队列中,当某一个线程队列没有任务时,会随机从其他线程队列中“偷取”一个任务,放入自己的队列中执行。
Example:
// 求次方: value为底,size为次方数 class CountPower extends RecursiveTask<Long> { private static final long serialVersionUID = 1L; public Long value = 0L; public int size = 0; public static final Long CRITICAL = 10L; // 阈值 public CountPower(Long value, int size) { this.value = value; this.size = size; } @Override protected Long compute() { // 当要开方的此时 小于 阈值,则计算 (视为最小的任务单元) if(size <= CRITICAL) { Long sum = 1L; for (int i=0; i<size; ++i) { sum *= value; } return sum; } else { int mid = size / 2; // 拆分任务,并压入线程队列 CountPower leftPower = new CountPower(value, mid); leftPower.fork(); CountPower rightPower = new CountPower(value, size - mid); rightPower.fork(); // 将当前两个任务返回的执行结果再相乘 return leftPower.join() * rightPower.join(); } } } public class TestForkJoinPool { public static void main(String[] args) throws Exception { ForkJoinPool pool = new ForkJoinPool(); CountPower task = new CountPower(2L, 11); Long result = pool.invoke(task); System.out.println(result); } }/**output: 2048*/
根据分而治之的思想进行分解,需要一个结束递归的条件,该条件内的代码就是被分解的最小单元。使用 fork()在当前任务正在运行的池中异步执行此任务,即将该任务压入线程队列。调用
join()`返回计算结果。RecursiveTask是有返回值的task,RecursiveAction则是没有返回值的。