同步容器类包括Vector和Hashtable,这两个是早期JDK的一部分。此外还包括在JDK1.2中添加Collections.synchronizedXxx等工厂方法,这些类实现线程安全的方式是:将它们的状态封装起来,并对公有方法进行同步,使得每次只有一个线程能访问容器的状态。
同步容器类都是线程安全的,但是某些 复合操作
需要额外的客户端加锁来保护,常见的复合操作:
由于同步容器类要遵守同步策略,即支持客户端加锁,因此可能会创造一些新的操作,只要我们知道应该使用哪一锁,那么这些新操作就与其他操作一样都是原子操作。
for (int i = 0; i < vector.size();i++){ doSomething(vector.get(i)) }
一个线程访问,一个线程删除,在这两个线程交替执行的时候,代码就可能出现问题,将抛出ArrayIndexOutOfBoundsException。
解决方式是将怎么for循环加锁,锁为vector对象。
不论是直接迭代还是jdk5引入的for-each语法,对容器的标准访问方式就是使用 Iterator(迭代器)
,但是Iterator并没有考虑到并发修改的问题,迭代器使用的策略是 快速失败(fail-fast)
。当迭代器发现容器在迭代过程被修改了,就会抛出一个ConcurrentModificationException异常。
这种快速失败机制并不算是完备的处理机制,只是捕获了可能会出现并发错误,只能作为一个并发问题预警指示器。要想避免ConcurrentModificationException异常,就必须在迭代过程持有容器的锁。
在某些情况下,迭代器会隐藏起来。例如 javaSystem.out.println("iterm:"+set);
,编译器将字符串的连接操作转换为调用 StringBuilder.append(Object)
,而这个方法将会调用容器的 toString
方法,标准容器的toString方法将迭代容器。
正如封装对象的状态有助于维持不变性条件一样,封装对象的同步机制同样有助于确保实施同步策略。
同步容器通过对所有容器状态的访问都串行化,以实现它们的线程安全性。当然这种办法的代价是严重降低并发性,当多个线程竞争容器的锁时,吞吐量将严重降低。
通过并发容器来代替同步容器,可以极大的提高伸缩性并降低风险。
ConcurrentHashMap
,用来代替同步且基于散列的Map。 CopyOnWriteArrayList
,用于在遍历操作为主要操作的情况下代替同步的List。 ConCurrentMap
接口中增加了一些常用的复合条件,如“若没有就添加”,替换,以及有条件删除等。
JDK5中增加了两种新容器, Queue
和 BlockingQueue
,用来临时保存一组等待处理的元素。它提供了几种实现,包括:
ConcurrentLinkedQueue PriorityQueue
Queue本身就是由LinkedList实现的
),但是还是需要一个Queue类,因为它能去掉List的随机访问需求,实现更加高效的并发。
BlockingQueue
拓展了Queue,增加了可阻塞的插入和获取等操作。
JDK6引入了:
ConcurrentSkipListMap
:同步的 SortedMap
的并发替代品 ConcurrentSkipListSet
:同步的 SortedSet
的并发替代品
同步容器类在执行每个操作期间都持有一个锁。与HashMap一样,ConcurrentHashMap也是一个 基于散列的Map
,使用了一种不同的策略来提供更高的并发性和伸缩性:
分段锁(Locking Striping)
做更细粒度的加锁机制来实现更大程度的共享。这样的好处是,在多线程并发访问下将实现更高的吞吐量,而单线程环境只损失非常小的性能。 ConcurrentModificationException异常
,ConcurrentHashMap返回的迭代器具有 弱一致性(Weakly Consistent)
。 弱一致性的迭代器可以容忍并发的修改
,当创建迭代器的时候会遍历已有的元素,可以(并不保证)在迭代器在被构造后将修改操作反映给容器。 与Hashtable和synconizedMap相比,ConcurrentHashMap有着更多的优势和更少的劣势,在大多数并发情况下,用ConcurrentHashMap来代替同步Map能提高代码的可伸缩性,只有当需要加锁Map进行访问时,才应该放弃使用concurrentHashMap。
由于ConcurrentMap并不是通过持有锁来控制对象的独占访问,所以我们无法靠加锁新建原子操作。但是常见的如:“若没有则添加”,“若相等则移除”,“若相等则替换” 等复合操作都已经实现(具体可以看接口描述)。
CopyOnWriteArraryList 用于替代同步List,在某些情况下它能提供更好的并发性能,在迭代期间并不需要对容器进行复制或者加锁。
CopyOnWrite(写入时复制)
容器的线程安全性在于,只要 正确发布一个事实不变的对象
,那么在访问该对象的时候就不需要进一步的同步。 阻塞队列提供了:
put
和 take
方法:如果队列满了,那么put队列将阻塞至有空间可用;如果队列为空,take方法将会阻塞至有元素可用。 offer
和 poll
方法 队列可以是有界的也可以是无界的,无界队列永远也不会充满,所以put方法永远也不会阻塞。
阻塞队列支持生产者-消费者模式,把 找出需要完成的工作
和 执行工作
这两个过程分开,并将工作放入一个 待完成
列表,以便后续处理,而不是找出立即处理。
在构建高可靠的应用程序时,有界队列是一种强大的资源管理工具:它们能抑制并防止产生过多的工作项,使应用程序在负荷过载的情况下变得更加健壮。
LinkedBlockingQueue
和 ArrayBlockingQueue
是 First In,First Out(FIFO)队列
,分别和LinkedList和ArrayList相似。但比同步List有更好的并发性能。 PriorityBlockingQueue
是一个优先队列,可以通过元素的自然顺序比较,也可以使用Comparator方法。 SynchronousQueue
,实际上并不算是一个真正的队列,因为它不会为队列中的元素维护存储空间。与其他队列不同的是,它维护一组线程,这些线程在等着把元素加入或者移出队列。这种实现队列的方式看起来很奇怪,等于直接将工作交付给消费者线程,从而降低了将数据从生产者移动到消费者的延迟(避免串行入列和出列)。并且一旦工作被交付,生产者可以立即得到反馈。 jdk6增加了两种容器类型:
Deque BlockingDeque
Deque是一个 双端队列
,实现了在队列头和队列尾的高效插入和移除,具体实现有ArrayDeque和LinkedBlockingDeque。
正如阻塞队列适用于生产者-消费者模式,双端队列适用于工作密取模式。
在生产者-消费者模式中,所有消费者都有一个共享的工作队列。而工作密取中,每个消费者都有一个自己的双端队列,如果一个消费者完成了自己双端队列中的全部工作,那么它可以从其他消费者队列的尾部来秘密获取工作(G1中的处理DCQ就是使用工作密取模式)。
工作密取比生产者消费者模式具有更好的伸缩性,消费者基本不会在单个共享的任务队列上发生竞争。当一个消费者线程要访问另一个队列时,是从尾部而不是头部获取,进一步降低了队列的竞争程度。
线程可能会阻塞或暂停执行,原因有多种:
当线程阻塞时,它通常被挂起,并处于某种阻塞状态:
BLOCK WAITING TIMED_WAITING
被阻塞的线程必须等待某个不受它控制的事件发生后才能继续执行,例如等待的I/O操作已经完成,锁可用了等等。当某个外部事件发生时,线程被置回 RUNNABLE
状态,并且可以执行调度。
Thread提供了 interrupt
方法,用于中断线程或者查询线程是否已经被中断,每个线程都有个boolean来表示线程的中断状态。
中断是一种协作机制,一个线程不能强制其他线程停止正在执行的操作而去执行其他操作。
当线程A中断B,A仅仅是要求B在执行到某个可以暂停的地方停止正在执行的操作(前提是如果B愿意停下)。最常使用的中断的情况就是取消某个操作。
当在代码中调用了一个将抛出InterruptedException异常的方法时,你的方法就变成了阻塞方法,并且必须处理中断的响应:
传递InterruptedExcption
:避开这个异常通常是最好的选择,只需要将这个异常传递给调用者,并不捕获或者恢复该异常,然后在执行某种简单的清理工作后再次抛出这个异常。 恢复中断
:有时候不能抛出这个异常,例如代码是Runnable的一部分时,必须捕获该异常,并通过当前线程上的 interrupt
方法恢复中断。 闭锁是一种同步工具类,可以延迟线程的进度直到其到达终止状态。闭锁可以用来确保某些活动直到其他活动都完成后才继续执行。
CountDownLatch
是一种灵活的闭锁实现,它可以使一个或多个线程等待一组事件发生。闭锁状态有一个计数器,这个计数器被初始化为一个正数,表示等待的事件数量。countDown方法递减计数器,表示已经有一个事件发生了,而await方法会一直阻塞,直到计数器为0,或者等待的线程中断或者超时。
public class TestHarness { public long timeTask(int nThreads, final Runnable task) throws InterruptedException{ final CountDownLatch startGate = new CountDownLatch(1); final CountDownLatch endGate = new CountDownLatch(nThreads); for (int i = 0; i < nThreads; i++) { Thread t = new Thread(){ public void run(){ try { startGate.await(); try{ task.run(); }finally { endGate.countDown(); } }catch (InterruptedException ignored){} } }; t.start(); } long start = System.nanoTime(); startGate.countDown(); endGate.await(); long end = System.nanoTime(); return end - start; } }
FutureTask也可以用做为闭锁。FutureTask实现了Future语义,表示一种抽象的可生成结果的计算。FutureTask表示的计算是通过Callable来实现的。
当FutureTask进入完成状态后,它会永远停在这个状态上。
Future.get行为取决于任务的状态:
FutureTask将计算结果从执行计算的线程传递到获取这个结果的线程,并且FutureTask能够 保证这种传递过程能实现结果的安全发布
。
FutureTask
类实现了 RunnableFuture
接口, RunnableFuture
继承了 Runnable接口和Future接口
,所以它既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值。 事实上,FutureTask是Future接口的一个唯一实现类。
Callable表示的任务可以抛出受检查的或未受检查的异常,并且任何代码都可能抛出一个Error。无论任务代码抛出什么异常,都会被封装到一个ExecutionException中,并在Future.get中重新抛出。所以当ExcutionException时,可能是以下三种情况之一:
计数信号量(Counting Semaphore)
:用来控制访问某个特定资源的操作数量,或者同时执行某个制定操作的数量。计数信号量还可以用来实现某些资源池,或者对容器加边界。
Semaphore中管理一组许可,许可的初始数量可以通过构造函数来指定。在执行操作的时候,首先获得许可,在使用以后释放许可。如果没有许可,acquire将阻塞直到有许可(或者被中断或超时)。
public class BoundedHashSet<T> { private final Set<T> set; private final Semaphore semaphore; public BoundedHashSet(int bound) { this.set = Collections.synchronizedSet(new HashSet<T>()); semaphore = new Semaphore(bound); } public boolean add(T t) throws InterruptedException { semaphore.acquire(); boolean wasAdded = false; try { wasAdded = set.add(t); return wasAdded; } finally { if (!wasAdded) { semaphore.release(); } } } public boolean remove(T o) { boolean wasRemoved = set.remove(o); if (wasRemoved) { semaphore.release(); } return wasRemoved; } }
这里实现了一个有界阻塞Set容器,信号量的计数值初始化为容器的最大值。每次添加元素之前,要获得一个许可,如果add失败,并没有成功添加上元素,就释放许可。删除成功也释放许可,底层的set实现并不知道关于边界的信息,都是由BoundedHashSet来处理的。
闭锁用于等待事件,而栅栏用来等待其他线程。
CyclicBarrier
可以使一定数量的参与方反复地在栅栏位置汇集,它在并行迭代算法中非常有用:
BrokenBarrierException