private ExecutorService pool = Executors.newFixedThreadPool(500); public void handle() throws IOException { // 处理请求 try (ServerSocketChannel ssc = ServerSocketChannel.open().bind(new InetSocketAddress(8080))) { while (true) { // 接收请求 SocketChannel sc = ssc.accept(); // 将请求处理任务提交给线程池 pool.execute(() -> { try { // 读Socket ByteBuffer rb = ByteBuffer.allocateDirect(1024); sc.read(rb); TimeUnit.SECONDS.sleep(1); // 写Socket ByteBuffer wb = (ByteBuffer) rb.flip(); sc.write(wb); sc.close(); } catch (IOException | InterruptedException ignored) { } }); } } finally { pool.shutdown(); } }
private ExecutorService pool = new ThreadPoolExecutor(50, 500, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(2000), // 有界队列 runnable -> new Thread(runnable, "echo-" + runnable.hashCode()), // ThreadFactory new ThreadPoolExecutor.CallerRunsPolicy()); // 拒绝策略
// L1、L2阶段共用线程池 ExecutorService pool = Executors.newFixedThreadPool(2); CountDownLatch l1 = new CountDownLatch(2); for (int i = 0; i < 2; i++) { System.out.println("L1"); pool.execute(() -> { CountDownLatch l2 = new CountDownLatch(2); for (int j = 0; j < 2; j++) { pool.execute(() -> { System.out.println("L2"); l2.countDown(); }); } try { // 线程池中的2个线程都阻塞在l2.await(),没有多余线程去执行L2阶段的任务(在线程池的任务队列中等待) l2.await(); // line 28 } catch (InterruptedException ignored) { } l1.countDown(); }); } l1.await(); // 输出 // L1 // L1
// 阻塞在l2.await() "pool-1-thread-2" #11 prio=5 os_prio=31 tid=0x00007f934e8f5000 nid=0x4303 waiting on condition [0x000070000792d000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x000000079609bd58> (a java.util.concurrent.CountDownLatch$Sync) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997) at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) at time.geek.worker.thread.DeadLock.lambda$deadLockTest$1(DeadLock.java:28) at time.geek.worker.thread.DeadLock$$Lambda$1/1221555852.run(Unknown Source) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) // 阻塞在l2.await() "pool-1-thread-1" #10 prio=5 os_prio=31 tid=0x00007f9350142800 nid=0x3c03 waiting on condition [0x000070000782a000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x0000000795ff56a8> (a java.util.concurrent.CountDownLatch$Sync) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997) at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) at time.geek.worker.thread.DeadLock.lambda$deadLockTest$1(DeadLock.java:28) at time.geek.worker.thread.DeadLock$$Lambda$1/1221555852.run(Unknown Source) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)
转载请注明出处:http://zhongmingmao.me/2019/05/24/java-concurrent-worker-thread/
访问原文「 Java并发 -- Worker Thread模式 」获取最佳阅读体验并参与讨论