转载

『互联网架构』软件架构-分布式系列并发编程(29)

说说JMM,线程,线程池。一切都为了分布式而行动!

『互联网架构』软件架构-分布式系列并发编程(29)

JMM

  • 理解下面的图

java的文件,需要进行编译,通过java编译编程class文件,class文件变成字节码,装载到类装载器中,通过类装载器进行执行,执行的过程中的一个模型就是下面这个图。

『互联网架构』软件架构-分布式系列并发编程(29)

『互联网架构』软件架构-分布式系列并发编程(29)

  • 特性

    1.可见性

    >可见性是指当一个线程修改了共享变量后,其他线程能够立即得知这个修改。通过之前对synchronzed内存语义进行了分析,当线程获取锁时会从主内存中获取共享变量的最新值,释放锁的时候会将共享变量同步到主内存中。从而,synchronized具有可见性。同样的在volatile分析中,会通过在指令中添加lock指令,以实现内存可见性。因此, volatile具有可见性。

2.原子性

原子性是指一个操作是不可中断的,要么全部执行成功要么全部执行失败,有着“同生共死”的感觉。及时在多个线程一起执行的时候,一个操作一旦开始,就不会被其他线程所干扰。

3.顺序性

synchronized语义表示锁在同一时刻只能由一个线程进行获取,当锁被占用后,其他线程只能等待。

在java内存模型中说过,为了性能优化,编译器和处理器会进行指令重排序;也就是说java程序天然的有序性可以总结为:如果在本线程内观察,所有的操作都是有序的;如果在一个线程观察另一个线程,所有的操作都是无序的

由于JVM运行程序的实体是线程,而每个线程创建时JVM都会为其创建一个工作内存(有些地方称为栈空间),用于存储线程私有的数据,而Java内存模型中规定所有变量都存储在主内存,主内存是共享内存区域,所有线程都可以访问,但线程对变量的操作(读取赋值等)必须在工作内存中进行,首先要将变量从主内存拷贝的自己的工作内存空间,然后对变量进行操作,操作完成后再将变量写回主内存,不能直接操作主内存中的变量,工作内存中存储着主内存中的变量副本拷贝,前面说过,工作内存是每个线程的私有数据区域,因此不同的线程间无法访问对方的工作内存,线程间的通信(传值)必须通过主内存来完成。

『互联网架构』软件架构-分布式系列并发编程(29)

4.Happens-Before原则

『互联网架构』软件架构-分布式系列并发编程(29)

什么是线程

线程是一个操作系统概念。操作系统负责这个线程的创建、挂起、运行、阻塞和终结操作。而操作系统创建线程、切换线程状态、终结线程都要进行CPU调度——这是一个耗费时间和系统资源的事情。

  • 生命周期

    >Java当中,线程通常都有五种状态,创建、就绪、运行、阻塞和死亡。

  1. 创建状态。在生成线程对象,并没有调用该对象的start方法,这是线程处于创建状态。
  2. 就绪状态。当调用了线程对象的start方法之后,该线程就进入了就绪状态,但是此时线程调度程序还没有把该线程设置为当前线程,此时处于就绪状态。在线程运行之后,从等待或者睡眠中回来之后,也会处于就绪状态。
  3. 运行状态。线程调度程序将处于就绪状态的线程设置为当前线程,此时线程就进入了运行状态,开始运行run函数当中的代码。
  4. 阻塞状态。线程正在运行的时候,被暂停,通常是为了等待某个时间的发生(比如说某项资源就绪)之后再继续运行。sleep,suspend,wait等方法都可以导致线程阻塞。
  5. 死亡状态。如果一个线程的run方法执行结束或者调用stop方法后,该线程就会死亡。对于已经死亡的线程,无法再使用start方法令其进入就绪。

『互联网架构』软件架构-分布式系列并发编程(29)

可以用过jstack 或者idea debug快照显示状态

  1. “Low Memory Detector” 负责对可使用内存进行检测,如果发现可用内存低,分配新的内存空间。
  2. “CompilerThread0” 用来调用JITing,实时编译装卸class。
  3. “Signal Dispatcher” 负责分发内部事件。
  4. “Finalizer” 负责调用Finalizer方法。
  5. “Reference Handler” 负责处理引用。
  6. “main” 是主线程。
  7. “VM Thread”, “VM Periodic Task Thread”从名字上看是虚机内部线程。
  • 状态描述
  1. NEW 状态是指线程刚创建, 尚未启动
  2. RUNNABLE 状态是线程正在正常运行中, 当然可能会有某种耗时计算/IO等待的操作/CPU时间片切换等, 这个状态下发生的等待一般是其他系统资源, 而不是锁, Sleep等
  3. BLOCKED 这个状态下, 是在多个线程有同步操作的场景, 比如正在等待另一个线程的synchronized 块的执行释放, 或者可重入的 synchronized块里别人调用wait() 方法, 也就是这里是线程在等待进入临界区
  4. WAITING 这个状态下是指线程拥有了某个锁之后, 调用了他的wait方法, 等待其他线程/锁拥有者调用 notify / notifyAll 一遍该线程可以继续下一步操作, 这里要区分 BLOCKED 和 WATING 的区别, 一个是在临界点外面等待进入, 一个是在理解点里面wait等待别人notify, 线程调用了join方法 join了另外的线程的时候, 也会进入WAITING状态, 等待被他join的线程执行结
  5. TIMED_WAITING 这个状态就是有限的(时间限制)的WAITING, 一般出现在调用wait(long), join(long)等情况下, 另外一个线程sleep后, 也会进入 TIMED_WAITING状态TERMINATED 这个状态下表示 该线程的run方法已经执行完毕了, 基本上就等于死亡了(当时如果线程被持久持有, 可能不会被回收)
  • 优先级Priority

    线程的优先级是将该线程的重要性传给了调度器、cpu处理线程顺序有一定的不确定,但是调度器会倾向于优先权高的先执行

  • 实现方式

    Thread、Runnable、Callable

    Runnable和Thread区别:

    1、效果上没区别,写法上的区别而已。

    2、没有可比性,Thread实现了Runnable接口并进行了扩展,我们通常拿来进行比较只是写法上的比较,而Thread和Runnable的实质是实现的关系,不是同类东西。

    3、Callable1.5引入,具有返回值,并且支持泛型,并且不是run方法了是call方法。

  • 为什么有线程还要用线程池

    线程的切换需要耗时,线程池直接从管理池子里面的线程,效率更高。

Executor框架体系

Java.util.concurrent中的Executor(中文翻译是执行器)是管理咱们之前的Thread线程的。目的是是简化并发编程。

ScheduledThreadPoolExecutor和ThreadPoolExecutor。

『互联网架构』软件架构-分布式系列并发编程(29)

『互联网架构』软件架构-分布式系列并发编程(29)

构造器:核心数量,任务队列容器,存活时间,线程工厂,处理器

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,//存活时间
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue, //阻塞队列
                          ThreadFactory threadFactory,//线程工厂
                          RejectedExecutionHandler handler) {//hander处理
  • LinkedBlockingQueue
  • Execute(submit)方法 ->java.util.concurrent.ThreadPoolExecutor#execute

简要分析一下execute源码:执行一个Runnable对象时,首先通过workerCountOf(c)获取线程池中线程的数量,如果池中的数量小于corePoolSize就调用addWorker添加一个线程来执行这个任务。否则通过workQueue.offer(command)方法入列。如果入列成功还需要在一次判断池中的线程数,因为我们创建线程池时可能要求核心线程数量为0,所以我们必须使用addWorker(null, false)来创建一个临时线程去阻塞队列中获取任务来执行。

if (command == null)
    throw new NullPointerException();
int c = ctl.get();
//判断是否小于核心数量,是直接新增work成功后直接退出 
if (workerCountOf(c) < corePoolSize) {
    if (addWorker(command, true))
        return;
    c = ctl.get();// 增加失败后继续获取标记
}
//判断是运行状态并且扔到workQueue里成功后
if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
//再次check判断运行状态如果是非运行状态就移除出去&reject掉
    if (! isRunning(recheck) && remove(command))
        reject(command);
    else if (workerCountOf(recheck) == 0) //否则发现可能运行线程数是0那么增加一个null的worker。
        addWorker(null, false);
}
else if (!addWorker(command, false)) //直接增加worker如果不成功直接reject
    reject(command);
  • addWorker方法

    >总结:

    //这个两个for循环主要是判断能否增加一个线程,

    //外循环来判断线程池的状态

    //内循环主要是个增加线程数的CAS操作

    第一个参数firstTask不为null,则创建的线程就会先执行firstTask对象,然后去阻塞队列中取任务,否直接到阻塞队列中获取任务来执行。第二个参数,core参数为真,则用corePoolSize作为池中线程数量的最大值;为假,则以maximumPoolSize作为池中线程数量的最大值。

    T.start方法 run方法执行>调用了runWorker方法

retry:
for (;;) {
    int c = ctl.get();
    int rs = runStateOf(c);

    // Check if queue empty only if necessary. 
    if (rs >= SHUTDOWN &&
        ! (rs == SHUTDOWN &&
           firstTask == null &&
           ! workQueue.isEmpty()))
        return false;// 两种情况1.如果非关闭状态  2.不是这种情况(停止状态并且是null对象并且workQueue不等于null)

    for (;;) {
        int wc = workerCountOf(c);
        if (wc >= CAPACITY ||
            wc >= (core ? corePoolSize : maximumPoolSize))
            return false;// 判断是否饱和容量了
        if (compareAndIncrementWorkerCount(c)) //增加一个work数量 然后跳出去
            break retry;
        c = ctl.get();  //增加work失败后继续递归
        if (runStateOf(c) != rs)
            continue retry;
        // else CAS failed due to workerCount change; retry inner loop
    }
}

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
    w = new Worker(firstTask);//增加一个worker
    final Thread t = w.thread;
    if (t != null) {//判断是否 为null
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // Recheck while holding lock.
            // Back out on ThreadFactory failure or if
            // shut down before lock acquired.  锁定后并重新检查下 是否存在线程工厂的失败或者锁定前的关闭
            int rs = runStateOf(ctl.get());

            if (rs < SHUTDOWN ||
                (rs == SHUTDOWN && firstTask == null)) {
                if (t.isAlive()) // precheck that t is startable
                    throw new IllegalThreadStateException();  
                workers.add(w);   //增加work
                int s = workers.size();
                if (s > largestPoolSize)
                    largestPoolSize = s;
                workerAdded = true;
            }
        } finally {
            mainLock.unlock();
        }
        if (workerAdded) { //本次要是新增加work成功就调用start运行
            t.start();
            workerStarted = true;
        }
    }
} finally {
    if (! workerStarted)
        addWorkerFailed(w);
}
return workerStarted;
  • runWorker

    >总结:获取任务、调用线程run方法

Thread wt = Thread.currentThread();//1.取到当前线程
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
    while (task != null || (task = getTask()) != null) { //获取任务 看看是否能拿到 点进入
Task中的队列poll take是真正拿到task 
//workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
超时取出
        w.lock();
        // If pool is stopping, ensure thread is interrupted;
        // if not, ensure thread is not interrupted.  This
        // requires a recheck in second case to deal with
        // shutdownNow race while clearing interrupt
        if ((runStateAtLeast(ctl.get(), STOP) ||
             (Thread.interrupted() &&
              runStateAtLeast(ctl.get(), STOP))) &&
            !wt.isInterrupted())
            wt.interrupt();// 确保线程是能中断的
        try {
            beforeExecute(wt, task); //开始任务前的钩子
            Throwable thrown = null;
            try {
                task.run();//执行任务
            } catch (RuntimeException x) {
                thrown = x; throw x;
            } catch (Error x) {
                thrown = x; throw x;
            } catch (Throwable x) {
                thrown = x; throw new Error(x);
            } finally {
                afterExecute(task, thrown); //任务后的钩子
            }
        } finally {
            task = null;
            w.completedTasks++;
            w.unlock();
        }
    }
    completedAbruptly = false;
} finally {
    processWorkerExit(w, completedAbruptly); //看这儿
}
  • processWorkerExit
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
    decrementWorkerCount();

final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
    completedTaskCount += w.completedTasks;
    workers.remove(w);  //移除work
} finally {
    mainLock.unlock();
}

tryTerminate();

int c = ctl.get();
if (runStateLessThan(c, STOP)) { //判断是否还有任务
    if (!completedAbruptly) {
        int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
        if (min == 0 && ! workQueue.isEmpty())
            min = 1;
        if (workerCountOf(c) >= min)
            return; // replacement not needed
    }
    addWorker(null, false);

线程池的拒绝策略

四种策略 默认是AbortPolicy

『互联网架构』软件架构-分布式系列并发编程(29)

名称 意义
AbortPolicy (默认) 当任务添加到线程池中被拒绝时,它将抛出 RejectedExecutionException 异常。
CallerRunsPolicy 当任务添加到线程池中被拒绝时,会在线程池当前正在运行的Thread线程池中处理被拒绝的任务。
DiscardOldestPolicy 当任务添加到线程池中被拒绝时,线程池会放弃等待队列中最旧的未处理任务,然后将被拒绝的任务添加到等待队列中
DiscardPolicy 当任务添加到线程池中被拒绝时,线程池将丢弃被拒绝的任务。
  • ScheduledThreadPoolExecutor的实现

    >ScheduledThreadPoolExecutor继承了ThreadPoolExecutor并且实现了ScheduledExecutorService接口。ScheduledExecutorService定义了使用ScheduledThreadPoolExecutor提交任务的接口schedule。schedule是一组返回值和参数不同的overwrite接口。以供用户选择提交任务的模式,比如是一次性的定时任务还是周期性的任务,需不需要返回结果等等。不管是哪个schedule接口,他的实现功能主要是把用户提交的任务和模式封装成ScheduledFutureTask,然后调用delayedExecute(Runnable command)。所以delayedExecute方法是真正的处理任务的方法。

PS:并发始终还是围绕这线程的判断来进行的一步一步的操作,所以提高服务程序效率的一个手段就是尽可能减少创建和销毁对象的次数,特别是一些很耗资源的对象创建和销毁。这块只做了解吧。我已经写懵逼了。

百度未收录

>>原创文章,欢迎转载。转载请注明:转载自IT人故事会,谢谢!

>>原文链接地址:上一篇:

已是最新文章

原文  https://idig8.com/2019/03/09/hulianwangjiagouruanjianjiagou-fenbushixiliebingfabiancheng29/
正文到此结束
Loading...