转载

AQS(AbstractQueuedSynchronizer)队列

AQS(AbstractQueuedSynchronizer)队列

Node节点

1、Node类是对要访问同步代码的线程的封装,包含了线程本身及其状态叫waitStatus。waitStatus五种状态如下:

  • SIGNAL 值为-1,当一个节点的状态为SIGNAL时就意味着在等待获取同步状态,前节点是头节点也就是获取同步状态的节点

  • CANCELLED 值为1、因为超时或者中断,结点会被设置为取消状态,被取消状态的结点不应该去竞争锁,只能保持取消状态不变,不能转换为其他状态。处于这种状态的结点会被踢出队列,被GC回收( 一旦节点状态值为1说明被取消,那么这个节点会从同步队列中删除

  • CONDITION 值为-2、节点在等待队列中、节点线程等待在Condition、当其它线程对Condition调用了singal()方法该节点会从等待队列中移到同步队列中

  • PROPAGATE 值为-3、表示下一次共享式同步状态获取将会被无条件的被传播下去(读写锁中存在的状态,代表后续还有资源,可以多个线程同时拥有同步状态)

  • initial 值为0、表示当前没有线程获取锁(初始状态)

2、Node类有两个常量,SHARED和EXCLUSIVE,分别代表共享模式和独占模式

Reentrantlock

ReentrantLock与Synchronized比较

ReentrantLock是Lock的实现类,是一个互斥的同步锁。

  • 从功能角度,ReentrantLock比Synchronized的同步操作更精细(因为可以像普通对象一样使用),甚至实现Synchronized没有的高级功能,如:
    • 等待可中断 :当持有锁的线程长期不释放锁的时候,正在等待的线程可以选择放弃等待,对处理执行时间非常长的同步块很有用。
    • 带超时的获取锁尝试 :在指定的时间范围内获取锁,如果时间到了仍然无法获取则返回。
    • 可以判断是否有线程在排队等待获取锁
    • 可以响应中断请求 :与Synchronized不同,当获取到锁的线程被中断时,能够响应中断,中断异常将会被抛出,同时锁会被释放。
    • 可以实现公平锁
  • 从锁释放角度,Synchronized在JVM层面上实现的,不但可以通过一些监控工具监控Synchronized的锁定,而且在代码执行出现异常时,JVM会自动释放锁定;但是使用Lock则不行,Lock是通过代码实现的,要保证锁定一定会被释放,就必须将unLock()放到finally{}中。
  • 从性能角度,Synchronized早期实现比较低效,对比ReentrantLock,大多数场景性能都相差较大。但是在Java6中对其进行了非常多的改进,在竞争不激烈时,Synchronized的性能要优于ReetrantLock;在高竞争情况下,Synchronized的性能会下降几十倍,但是ReetrantLock的性能能维持常态。

Reentrantlock可重入性

ReentrantLock内部自定义了同步器Sync(Sync既实现了AQS,又实现了AOS,而AOS提供了一种互斥锁持有的方式),其实就是加锁的时候通过CAS算法,将线程对象放到一个双向链表中,每次获取锁的时候,看下当前维护的那个线程ID和当前请求的线程ID是否一样,一样就可重入了。

CountDownLatch

CountDownLatch (倒计时器):CountDownLatch是一个同步工具类,基于AQS,用来协调多个线程之间的同步。这个工具通常用来控制线程等待,它可以让某一个线程等待直到倒计时结束,再开始执行.

简单代码示例

import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class CloseDoor {

    public static void main(String[] args) {

        CountDownLatch countDownLatch = new CountDownLatch(5);
        CloseDoor closeDoor = new CloseDoor();
        new Thread(() -> {
            closeDoor.sayBye("Lucy");
            countDownLatch.countDown();
        }).start();
        new Thread(() -> {

            closeDoor.sayBye("Bob");
            countDownLatch.countDown();
        }).start();
        new Thread(() -> {
            closeDoor.sayBye("Lily");
            countDownLatch.countDown();

        }).start();
        new Thread(() -> {
            closeDoor.sayBye("Sam");
            countDownLatch.countDown();

        }).start();
        new Thread(() -> {
            closeDoor.sayBye("Gates");
            countDownLatch.countDown();

        }).start();

        //班长锁门
        try {
            System.out.println("同学离开中");
            countDownLatch.await();
            System.out.println("班长锁门");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }

    public void sayBye(String name) {
        try {
            System.out.println(name + " pack books to bag");
            Random random = new Random();
            TimeUnit.SECONDS.sleep(new Random().nextInt(10));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(name + " is gone");
    }

}
复制代码

运行结果:

AQS(AbstractQueuedSynchronizer)队列

CyclicBarrier

简单代码示例

/**
 * 热身运动
 */
public class PlayBasketballDemo {

    private static final int THREAD_NUM = 6;

    private static final Random random = new Random();

    public static void main(String[] args) throws InterruptedException {

        String[] names = {"王五", "李四", "冯北", "刘青扬", "刘士凯", "薛贵", "赵钱孙"};

        // 使用构造方法:public CyclicBarrier(int parties, Runnable barrierAction)
        // 参数parties表示一共有多少线程参与这次“活动”,barrierAction是可选的,用来指定当所有线程都完成这些必须的“任务”之后要干的其他事情
        CyclicBarrier barrier = new CyclicBarrier(THREAD_NUM , new Runnable() {
            @Override
            public void run() {
                // 最后写完数据的线程,会执行这个任务
                System.out.println("/n人都到齐了,开始热身运动/n");
            }
        });

        // 启动5个线程,写数据
        for (int i = 0; i < THREAD_NUM; i++) {
            if (i < THREAD_NUM - 1) {
                Thread t = new Thread(new MyTask(barrier, names[i]), "线程名" + i);
                t.start();
            } else {
                // 最后一个线程延迟3秒执行
                Thread.sleep(3000);
                Thread t = new Thread(new MyTask(barrier, names[i]));
                t.start();
            }
        }
    }


    static class MyTask extends Thread {

        private CyclicBarrier barrier;

        private String name;

        public MyTask(CyclicBarrier barrier, String name) {
            this.barrier = barrier;
            this.name = name;
        }

        @Override
        public void run() {
            int time = random.nextInt(1000);
            System.out.println(name + " 从宿舍出发");
            try {
                Thread.sleep(time);
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
            System.out.println(name + " 到达篮球场");
            try {
                // 用来挂起当前线程,直至所有线程都到达barrier状态再同时执行后续任务;
                // 等待所有线程都调用过此函数才能继续后续动作
                // 只等待2s,那么最后一个线程3秒才执行,则必定会超时
//                barrier.await(2000, TimeUnit.MILLISECONDS);
                barrier.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println(name + " 开始热身");
        }

    }

}
复制代码

运行结果:

AQS(AbstractQueuedSynchronizer)队列

CyclicBarrier和CountDownLatch比较

public CyclicBarrier(intparties,RunnablebarrierAction)

Semaphore

Semaphore是基于AQS的共享模式实现的

简单代码示例

import java.util.concurrent.Semaphore;
/**
 * 吃饭问题
 * 现在 N 个人;M双筷子
 */
public class EatNoodles {

    private static int PEOPLE_NUM = 8;            //人数

    private static int CHOPSTICKS_NUM = 5;   //筷子个数

    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(CHOPSTICKS_NUM);
        String[] names = {"Lucy", "Lily", "Tom", "Sam", "Gates", "James", "Kates", "Trump", "Zoe", "Mia"};
        for (int i = 0; i < PEOPLE_NUM; i++)
            new Worker(names[i], semaphore).start();
    }

    static class Worker extends Thread {
        private String name;
        private Semaphore semaphore;

        public Worker(String name, Semaphore semaphore) {
            this.name = name;
            this.semaphore = semaphore;
        }

        @Override
        public void run() {
            try {
                semaphore.acquire();
                System.out.println(this.name + " 占用一双筷子在吃饭...");
                Thread.sleep(2000);
                System.out.println(this.name + " 吃完饭释放筷子...");
                semaphore.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
复制代码

运行结果:

AQS(AbstractQueuedSynchronizer)队列

参考文章

  • JavaGuide AQS
  • Java并发之AQS详解
  • AQS源码分析之ConditionObject
原文  https://juejin.im/post/5e95b439f265da47ab194c15
正文到此结束
Loading...