一排人在等车的时候,调度员指挥等待坐车的队伍一次进来5个人上车,等这5个人坐车触发,再放进去下一批。
Semaphore就是个计数器,其基本逻辑基于acquire/release。
import java.util.concurrent.Semaphore; public class AbnormalSemaphoreSample { public static void main(String[] args) throws InterruptedException { Semaphore semaphore = new Semaphore(0); for (int i = 0; i < 10; i++) { Thread t = new Thread(new MyWorker(semaphore)); t.start(); } System.out.println("Action...GO!"); semaphore.release(5); System.out.println("Wait for permits off"); while (semaphore.availablePermits()!=0) { Thread.sleep(100L); } System.out.println("Action...GO again!"); semaphore.release(5); } } class MyWorker implements Runnable { private Semaphore semaphore; public MyWorker(Semaphore semaphore) { this.semaphore = semaphore; } @Override public void run() { try { semaphore.acquire(); System.out.println("Executed!"); } catch (InterruptedException e) { e.printStackTrace(); } } }
CountDownLatch和CyclicBarrier的区别:
CountDownLatch实现
import java.util.concurrent.CountDownLatch; public class LatchSample { public static void main(String[] args) throws InterruptedException { CountDownLatch latch = new CountDownLatch(6); for (int i = 0; i < 5; i++) { Thread t = new Thread(new FirstBatchWorker(latch)); t.start(); } for (int i = 0; i < 5; i++) { Thread t = new Thread(new SecondBatchWorker(latch)); t.start(); } // 注意这里也是演示目的的逻辑,并不是推荐的协调方式 while ( latch.getCount() != 1 ){ Thread.sleep(100L); } System.out.println("Wait for first batch finish"); latch.countDown(); } } class FirstBatchWorker implements Runnable { private CountDownLatch latch; public FirstBatchWorker(CountDownLatch latch) { this.latch = latch; } @Override public void run() { System.out.println("First batch executed!"); latch.countDown(); } } class SecondBatchWorker implements Runnable { private CountDownLatch latch; public SecondBatchWorker(CountDownLatch latch) { this.latch = latch; } @Override public void run() { try { latch.await(); System.out.println("Second batch executed!"); } catch (InterruptedException e) { e.printStackTrace(); } } }
CyclicBarrier实现,使用到了barrierAction,当屏障被触发时,Java自动调度该动作。注意,CyclicBarrier会 自动进行重置 。
import java.util.concurrent.CountDownLatch; public class LatchSample { public static void main(String[] args) throws InterruptedException { CountDownLatch latch = new CountDownLatch(6); for (int i = 0; i < 5; i++) { Thread t = new Thread(new FirstBatchWorker(latch)); t.start(); } for (int i = 0; i < 5; i++) { Thread t = new Thread(new SecondBatchWorker(latch)); t.start(); } // 注意这里也是演示目的的逻辑,并不是推荐的协调方式 while ( latch.getCount() != 1 ){ Thread.sleep(100L); } System.out.println("Wait for first batch finish"); latch.countDown(); } } class FirstBatchWorker implements Runnable { private CountDownLatch latch; public FirstBatchWorker(CountDownLatch latch) { this.latch = latch; } @Override public void run() { System.out.println("First batch executed!"); latch.countDown(); } } class SecondBatchWorker implements Runnable { private CountDownLatch latch; public SecondBatchWorker(CountDownLatch latch) { this.latch = latch; } @Override public void run() { try { latch.await(); System.out.println("Second batch executed!"); } catch (InterruptedException e) { e.printStackTrace(); } } }
CopyOnWriteArraySet通过包装CopyOnWriteArrayList来实现。
CopyOnWrite对于任何修改操作,如add、set、remove,都会拷贝原数组,修改后替换原来的数组。
public boolean add(E e) { synchronized (lock) { Object[] elements = getArray(); int len = elements.length; // 拷贝 Object[] newElements = Arrays.copyOf(elements, len + 1); newElements[len] = e; // 替换 setArray(newElements); return true; } } final void setArray(Object[] a) { array = a; }