线程池 ThreadPoolExecutor
在运行的过程中,业务并发量变动,需要不停服务调整线程池的线程数, ThreadPoolExecutor
支持动态调整 corePoolSize
与 maximumPoolSize
的值。
public class ThreadChangeTest { public static void main(String[] args) throws InterruptedException { ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 10, 10l, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10)); int count = 0; while (true) { Thread.sleep(1000l); for (int i = 0; i < 9; i++) { executor.execute(() -> { /*try { Thread.sleep(1l); } catch (InterruptedException e) { e.printStackTrace(); }*/ System.out.println("------------core:/t" + executor.getCorePoolSize() + "/tactive:/t" + executor.getActiveCount() + "/tmax:/t" + executor.getMaximumPoolSize()); }); } count++; if (count == 20) { executor.setCorePoolSize(2); executor.setMaximumPoolSize(9); System.out.println("----------------------------------------"); } if (count == 100) { executor.shutdown(); System.out.println("============================================="); break; } } Thread.currentThread().join(); } }
在程序运行中动态修改线程池 corePoolSize
与 maximumPoolSize
的值
public void setCorePoolSize(int corePoolSize) { if (corePoolSize < 0) throw new IllegalArgumentException(); int delta = corePoolSize - this.corePoolSize; this.corePoolSize = corePoolSize; //核心线程调小,中断空闲任务,否则线程池的当前任务结束,自动调小 if (workerCountOf(ctl.get()) > corePoolSize) interruptIdleWorkers(); //核心线程数调大后,从队列取任务 else if (delta > 0) { // We don't really know how many new threads are "needed". // As a heuristic, prestart enough new workers (up to new // core size) to handle the current number of tasks in // queue, but stop if queue becomes empty while doing so. //队列大小是否可以取任务 int k = Math.min(delta, workQueue.size()); //队列有任务就取,否则break while (k-- > 0 && addWorker(null, true)) { if (workQueue.isEmpty()) break; } } } public void setMaximumPoolSize(int maximumPoolSize) { if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize) throw new IllegalArgumentException(); this.maximumPoolSize = maximumPoolSize; //中断空闲任务,否则线程池的当前任务结束,自动调小 if (workerCountOf(ctl.get()) > maximumPoolSize) interruptIdleWorkers(); }
源码看出:线程池的调节时直接设置 corePoolSize
与 maximumPoolSize
的值
其中
workerCountOf(ctl.get())
代表工作任务线程数,参考我的博客JDK8线程池-ThreadPoolExecutor源码解析
调大 corePoolSize
与 maximumPoolSize
,线程池运行过程中自动生效,线程池处理逻辑增强。
调小 corePoolSize
与 maximumPoolSize
均会执行
interruptIdleWorkers();
private void interruptIdleWorkers() { interruptIdleWorkers(false); } private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //workers是所有已存在的线程,包括空闲线程 for (Worker w : workers) { Thread t = w.thread; //这里注意,非常关键,加锁w.tryLock() if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } //从上面的参数onlyOne is false if (onlyOne) break; } } finally { mainLock.unlock(); } }
这里的 workers
注意:是一个 HashSet
,存放规则:
核心线程优先占满,即使核心线程有空闲,新任务来了会优先开启新的线程而不是复用,核心线程仅在占满才会复用,然后使用队列,最后使用 max
线程, max
线程数对应的 workers
会动态变化,
参考我的博客 JDK8
线程池- ThreadPoolExecutor
源码解析
我们看 ThreadPoolExecutor
执行任务的源码,参考我的博客 JDK8
线程池- ThreadPoolExecutor
源码解析
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { //这里注意,加锁了,非常关键 w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { //任务执行 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
可以看出在任务拿出来后,立即加锁
包括任务执行的过程都是加锁的。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); } protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); }
使用了AQS,自定义了加锁方式CAS模式
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } }
可以看出使用 tryAcquire
和 tryRelease
,均重写方法
protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; }
compareAndSetState(0, 1)
使用上面的代码加锁,意味着线程执行过程中都是加锁的,不会被销毁,只会销毁空闲线程,或者当前线程执行结束销毁。
线程池调小 corePoolSize
与 maximumPoolSize
对当前正在执行的任务没有影响。
队列是不可以动态调整的。
private final int capacity;
线程池 corePoolSize
与 maximumPoolSize
调大注意 max
线程数不要调过大,计算机资源是有限的。
线程池的队列初始化大小注意,不能动态调节,队列占用的是堆内存,注意JVM的内存大小与GC能力,尽量减小大对象的存在。
线程池 corePoolSize
与 maximumPoolSize
和队列调小注意,线程池的处理能力减弱,可能会执行拒绝策略。
如果大家喜欢我的文章,可以关注个人订阅号。欢迎随时留言、交流。如果想加入微信群的话一起讨论的话,请加管理员简栈文化-小助手(lastpass4u),他会拉你们进群。