说说JMM,线程,线程池。一切都为了分布式而行动!
java的文件,需要进行编译,通过java编译编程class文件,class文件变成字节码,装载到类装载器中,通过类装载器进行执行,执行的过程中的一个模型就是下面这个图。
特性
1.可见性
>可见性是指当一个线程修改了共享变量后,其他线程能够立即得知这个修改。通过之前对synchronzed内存语义进行了分析,当线程获取锁时会从主内存中获取共享变量的最新值,释放锁的时候会将共享变量同步到主内存中。从而,synchronized具有可见性。同样的在volatile分析中,会通过在指令中添加lock指令,以实现内存可见性。因此, volatile具有可见性。
2.原子性
原子性是指一个操作是不可中断的,要么全部执行成功要么全部执行失败,有着“同生共死”的感觉。及时在多个线程一起执行的时候,一个操作一旦开始,就不会被其他线程所干扰。
3.顺序性
synchronized语义表示锁在同一时刻只能由一个线程进行获取,当锁被占用后,其他线程只能等待。
在java内存模型中说过,为了性能优化,编译器和处理器会进行指令重排序;也就是说java程序天然的有序性可以总结为:如果在本线程内观察,所有的操作都是有序的;如果在一个线程观察另一个线程,所有的操作都是无序的
由于JVM运行程序的实体是线程,而每个线程创建时JVM都会为其创建一个工作内存(有些地方称为栈空间),用于存储线程私有的数据,而Java内存模型中规定所有变量都存储在主内存,主内存是共享内存区域,所有线程都可以访问,但线程对变量的操作(读取赋值等)必须在工作内存中进行,首先要将变量从主内存拷贝的自己的工作内存空间,然后对变量进行操作,操作完成后再将变量写回主内存,不能直接操作主内存中的变量,工作内存中存储着主内存中的变量副本拷贝,前面说过,工作内存是每个线程的私有数据区域,因此不同的线程间无法访问对方的工作内存,线程间的通信(传值)必须通过主内存来完成。
4.Happens-Before原则
线程是一个操作系统概念。操作系统负责这个线程的创建、挂起、运行、阻塞和终结操作。而操作系统创建线程、切换线程状态、终结线程都要进行CPU调度——这是一个耗费时间和系统资源的事情。
生命周期
>Java当中,线程通常都有五种状态,创建、就绪、运行、阻塞和死亡。
可以用过jstack 或者idea debug快照显示状态
线程的优先级是将该线程的重要性传给了调度器、cpu处理线程顺序有一定的不确定,但是调度器会倾向于优先权高的先执行
Thread、Runnable、Callable
Runnable和Thread区别:
1、效果上没区别,写法上的区别而已。
2、没有可比性,Thread实现了Runnable接口并进行了扩展,我们通常拿来进行比较只是写法上的比较,而Thread和Runnable的实质是实现的关系,不是同类东西。
3、Callable1.5引入,具有返回值,并且支持泛型,并且不是run方法了是call方法。
线程的切换需要耗时,线程池直接从管理池子里面的线程,效率更高。
Java.util.concurrent中的Executor(中文翻译是执行器)是管理咱们之前的Thread线程的。目的是是简化并发编程。
ScheduledThreadPoolExecutor和ThreadPoolExecutor。
构造器:核心数量,任务队列容器,存活时间,线程工厂,处理器
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,//存活时间 TimeUnit unit, BlockingQueue<Runnable> workQueue, //阻塞队列 ThreadFactory threadFactory,//线程工厂 RejectedExecutionHandler handler) {//hander处理
Execute(submit)方法 ->java.util.concurrent.ThreadPoolExecutor#execute
简要分析一下execute源码:执行一个Runnable对象时,首先通过workerCountOf(c)获取线程池中线程的数量,如果池中的数量小于corePoolSize就调用addWorker添加一个线程来执行这个任务。否则通过workQueue.offer(command)方法入列。如果入列成功还需要在一次判断池中的线程数,因为我们创建线程池时可能要求核心线程数量为0,所以我们必须使用addWorker(null, false)来创建一个临时线程去阻塞队列中获取任务来执行。
if (command == null) throw new NullPointerException(); int c = ctl.get(); //判断是否小于核心数量,是直接新增work成功后直接退出 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get();// 增加失败后继续获取标记 } //判断是运行状态并且扔到workQueue里成功后 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); //再次check判断运行状态如果是非运行状态就移除出去&reject掉 if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) //否则发现可能运行线程数是0那么增加一个null的worker。 addWorker(null, false); } else if (!addWorker(command, false)) //直接增加worker如果不成功直接reject reject(command);
addWorker方法
>总结:
//这个两个for循环主要是判断能否增加一个线程,
//外循环来判断线程池的状态
//内循环主要是个增加线程数的CAS操作
第一个参数firstTask不为null,则创建的线程就会先执行firstTask对象,然后去阻塞队列中取任务,否直接到阻塞队列中获取任务来执行。第二个参数,core参数为真,则用corePoolSize作为池中线程数量的最大值;为假,则以maximumPoolSize作为池中线程数量的最大值。
T.start方法 run方法执行>调用了runWorker方法
retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false;// 两种情况1.如果非关闭状态 2.不是这种情况(停止状态并且是null对象并且workQueue不等于null) for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false;// 判断是否饱和容量了 if (compareAndIncrementWorkerCount(c)) //增加一个work数量 然后跳出去 break retry; c = ctl.get(); //增加work失败后继续递归 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask);//增加一个worker final Thread t = w.thread; if (t != null) {//判断是否 为null final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. 锁定后并重新检查下 是否存在线程工厂的失败或者锁定前的关闭 int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); //增加work int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { //本次要是新增加work成功就调用start运行 t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted;
runWorker
>总结:获取任务、调用线程run方法
Thread wt = Thread.currentThread();//1.取到当前线程 Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { //获取任务 看看是否能拿到 点进入 Task中的队列poll take是真正拿到task //workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : 超时取出 w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt();// 确保线程是能中断的 try { beforeExecute(wt, task); //开始任务前的钩子 Throwable thrown = null; try { task.run();//执行任务 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); //任务后的钩子 } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); //看这儿 }
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); //移除work } finally { mainLock.unlock(); } tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) { //判断是否还有任务 if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false);
四种策略 默认是AbortPolicy
名称 | 意义 |
---|---|
AbortPolicy (默认) | 当任务添加到线程池中被拒绝时,它将抛出 RejectedExecutionException 异常。 |
CallerRunsPolicy | 当任务添加到线程池中被拒绝时,会在线程池当前正在运行的Thread线程池中处理被拒绝的任务。 |
DiscardOldestPolicy | 当任务添加到线程池中被拒绝时,线程池会放弃等待队列中最旧的未处理任务,然后将被拒绝的任务添加到等待队列中 |
DiscardPolicy | 当任务添加到线程池中被拒绝时,线程池将丢弃被拒绝的任务。 |
ScheduledThreadPoolExecutor的实现
>ScheduledThreadPoolExecutor继承了ThreadPoolExecutor并且实现了ScheduledExecutorService接口。ScheduledExecutorService定义了使用ScheduledThreadPoolExecutor提交任务的接口schedule。schedule是一组返回值和参数不同的overwrite接口。以供用户选择提交任务的模式,比如是一次性的定时任务还是周期性的任务,需不需要返回结果等等。不管是哪个schedule接口,他的实现功能主要是把用户提交的任务和模式封装成ScheduledFutureTask,然后调用delayedExecute(Runnable command)。所以delayedExecute方法是真正的处理任务的方法。
PS:并发始终还是围绕这线程的判断来进行的一步一步的操作,所以提高服务程序效率的一个手段就是尽可能减少创建和销毁对象的次数,特别是一些很耗资源的对象创建和销毁。这块只做了解吧。我已经写懵逼了。
百度未收录
>>原创文章,欢迎转载。转载请注明:转载自IT人故事会,谢谢!
>>原文链接地址:上一篇:已是最新文章