转载

Java 多线程设计模式之 Single Threades Execution

所谓 Single Threades Execution 模式,意即“以一个线程执行”。就像独木桥同一时间内只允许一个人通行一样,该模式用于设置限制,以确保同一时间内只能让一个线程执行处理。

Demo

不使用 Single Threades Execution 模式的程序

使用程序模拟三个人频繁地通过一个只允许一个人经过的门情形。当人们通过门的时候,统计人数便会递增。另外程序还会记录通行者的“姓名和出生地”

类一览表

名字 说明
Main 创建门,并让三个人不断地通过的类
Gate 表示门的类。它会在人们通过门时记录其姓名与出生地
UserThread 表示人的类。人们不断地通过门
// Main.java

public class Main {

    public static void main(String[] args) {
        Gate gate = new Gate();
        new UserThread(gate, "Bob", "Britain").start();
        new UserThread(gate, "Cao", "China").start();
        new UserThread(gate, "Uber", "USA").start();
    }
}
复制代码
// Gate.java

public class Gate {

    private int counter = 0;
    private String name = "Nobody";
    private String address = "NoWhere";

    public void pass(String name, String address) {
        this.counter++;
        this.name = name;
        this.address = address;
        check();
    }

    private void check() {
        if (this.name.charAt(0) != this.address.charAt(0)) {
            System.out.println("******** BROKEN ********** : " + toString());
        }
    }

    @Override
    public String toString() {
        return "No. " + this.counter + " : " + this.name + " , " + this.address;
    }
}
复制代码
// UserThread.java

public class UserThread extends Thread {

    private final Gate gate;
    private final String name;
    private final String address;

    public UserThread(Gate gate, String name, String address) {
        this.gate = gate;
        this.name = name;
        this.address = address;
    }

    @Override
    public void run() {
        System.out.println(this.name + " BEGIN");
        while (true) {
            gate.pass(this.name, this.address);
        }
    }
}
复制代码

当这个程序执行时,时间点不同,生成的结果也会不一样,以下是打印出来的 log

Bob BEGIN
Cao BEGIN
******** BROKEN ********** : No. 59622 : Bob , Britain
Uber BEGIN
******** BROKEN ********** : No. 77170 : Uber , USA
******** BROKEN ********** : No. 89771 : Uber , USA
******** BROKEN ********** : No. 93128 : Cao , China
******** BROKEN ********** : No. 95654 : Uber , USA
******** BROKEN ********** : No. 98440 : Cao , China
******** BROKEN ********** : No. 102283 : Cao , China
******** BROKEN ********** : No. 104491 : Cao , China
******** BROKEN ********** : No. 106791 : Uber , USA
******** BROKEN ********** : No. 110022 : Uber , USA
******** BROKEN ********** : No. 112073 : Uber , USA
******** BROKEN ********** : No. 113973 : Uber , USA
******** BROKEN ********** : No. 77170 : Uber , USA
******** BROKEN ********** : No. 116050 : Bob , China
******** BROKEN ********** : No. 117334 : Bob , Britain
******** BROKEN ********** : No. 119992 : Bob , USA
******** BROKEN ********** : No. 124427 : Uber , USA
******** BROKEN ********** : No. 117152 : Bob , Britain
******** BROKEN ********** : No. 129298 : Bob , China
******** BROKEN ********** : No. 130552 : Cao , Britain
******** BROKEN ********** : No. 147176 : Cao , China
******** BROKEN ********** : No. 148546 : Uber , USA
复制代码

通过 log 可以知道运行结果与预期不一致,所以说 Gate 类是不安全的,是非线程安全类。

如果仔细看一下 counter 的值,最开始显示 BROKEN 的时候,counter 的值已经变为了 59622。也就是说,在检察处第一个错误的时候 Gate 的 pass 方法已经运行了 5 万多次了。在这里,因为 UserThread 类的 run 方法执行的是无限循环,所以才检查除了错误。但是如果只测试几次,是根本找不出错误的。

这就是多线程程序设计的难点之一。如果检察出错误,那么说明程序并不安全。但是就算没有检察出错误,也不能说程序就一定是安全的。

调试信息也不可靠

仔细看 log 会发现还有一个奇怪的现象,比如:

******** BROKEN ********** : No. 59622 : Bob , Britain
复制代码

虽然此处输出了 BROKEN 信息,但是姓名和出生地首字母是一样的。尽管显示了 BROKEN,但是调试信息好像并没有错。

导致这种现象的原因是,在某个线程执行 check 方法时,其他线程不断执行 pass 方法,改谢了 name 字段和 address 字段的值。

这也是多线程程序设计的难点之一。如果显示调试信息的代码本身就是非线程安全的,那么显示的调试信息就很可能是错误的。

如果连操作测试和调试信息都无法确保安全性,那就进行代码评审吧。多个人一起仔细阅读代码,确认是否会发生问题,这是确保程序安全性的一个有效方法。

修改 Gate 类使其线程安全

// Gate.java

public class Gate {
    ...
    
    public synchronized void pass(String name, String address) {
        this.counter++;
        this.name = name;
        this.address = address;
        check();
    }
    
    ...
}
复制代码

之后程序就可以正常的运行,也不在打印 BROKEN 的 log 信息了

Single Threaded Execution 模式归纳

SharedResource 共享资源

在刚才的示例中,Gate 类扮演 SharedResource 的角色

SharedResource 角色是可被多个线程访问的类,包含很多方法,但这些方法主要分为如下两类:

  • safeMethod: 多个线程同时调用也不会发生问题的方法
  • unsafeMethod:多个线程同时调用会发生问题,因此必须加以保护的方法

而 unsafeMethod 在被多个线程同时执行时,实例状态有可能发生分歧。这时就需要保护该方法,使其不被多个线程同时访问。 Java 则是通过将 unsafeMethod 声明为 synchronized 方法来进行保护

死锁

在该模式下,满足下列条件时,死锁就会发生

  • 存在多个 SharedResource 角色
  • 线程在持有着某个 SharedResource 角色锁的同时,还想获取其他 SharedResource 角色的锁
  • 获取 SharedResource 角色的锁的顺序并不固定

原子操作

不可分割的操作通常称为原子操作。

上述示例中 Gate类是线程安全的 我们将 pass 声明为了 synchronized 方法,这样 pass 方法也就成为了原子操作

Java 编程规范中定义了一些原子操作。例如 char、int 等基本类型的赋值和引用操作都是原子的。另外,对象等引用类型的赋值和引用操作也是原子的。由于本身就是原子的,所以就算不加上 synchronized,这些操作也不会被分割。 但是 long、double 的赋值和引用操作并不是原子的

总结如下:

  • 基本类型、引用类型的赋值和引用是原子操作
  • 但 long 和 double 的赋值和引用是非原子操作
  • long 或 double 在线程间共享时,需要将其放入 synchronized 中操作,或者声明为 volatile

计数信号量和 Semaphore 类

上面介绍 Single Threaded Execution 模式用于确保某个区域“只能由一个线程”执行。下面我们将这种模式进一步扩展,以确保某个区域“最多只能由 N 个线程”执行。这时就要用计数信号量来控制线程数量。

java.util.concurrent 包提供了表示计数信号量的 Semaphore 类

资源的许可个数将通过 Semaphore 的构造函数来指定

Semaphore 的 acquire 方法用于确保存在可用资源。当存在可用资源时,线程会立即从 acquire 方法返回,同时信号量内部的资源个数会减 1 。 如无可用资源,线程阻塞在 acquire 方法内,直至出现可用资源。

Semaphore 的 release 方法用于释放资源。释放资源后,信号量内部的资源个数会增加 1。另外如果 acquire 中存在等待的线程,那么其中一个线程会被唤醒,并从 acquire 方法返回。

示例

// BoundedResource.java

public class BoundedResource {

    private final int permits;
    private final Semaphore semaphore;
    private final Random random = new Random(314159);

    public BoundedResource(int permits) {
        this.semaphore = new Semaphore(permits);
        this.permits = permits;
    }

    public void use() throws InterruptedException {
        try {
            this.semaphore.acquire();
            doUse();
        } finally {
            this.semaphore.release();
        }

    }

    private void doUse() throws InterruptedException {
        System.out.println(Thread.currentThread().getName() + " : BEGIN used = " + (this.permits - this.semaphore.availablePermits()));
        Thread.sleep(this.random.nextInt(500));
        System.out.println(Thread.currentThread().getName() + " : END used = " + (this.permits - this.semaphore.availablePermits()));
    }
}
复制代码
// SemaphoreThread.java

public class SemaphoreThread extends Thread{

    private final Random random = new Random(26535);
    private final BoundedResource resource;

    public SemaphoreThread(BoundedResource resource) {
        this.resource = resource;
    }

    @Override
    public void run() {
        try {
            while (true) {
                this.resource.use();
                Thread.sleep(this.random.nextInt(2000));
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
复制代码
// Main.java

public class Main {

    public static void main(String[] args) {
        BoundedResource boundedResource = new BoundedResource(3);
        new SemaphoreThread(boundedResource).start();
        new SemaphoreThread(boundedResource).start();
        new SemaphoreThread(boundedResource).start();
    }
}
复制代码

打印结果:

Thread-0 : BEGIN used = 2
Thread-2 : BEGIN used = 3
Thread-1 : BEGIN used = 2
Thread-2 : END used = 3
Thread-1 : END used = 2
Thread-0 : END used = 1
Thread-2 : BEGIN used = 1
Thread-2 : END used = 1
Thread-1 : BEGIN used = 1
Thread-0 : BEGIN used = 2
Thread-1 : END used = 2
Thread-0 : END used = 1
Thread-2 : BEGIN used = 1
Thread-2 : END used = 1
Thread-1 : BEGIN used = 1
Thread-0 : BEGIN used = 2
Thread-2 : BEGIN used = 3
Thread-0 : END used = 3

复制代码
原文  https://juejin.im/post/5ba0e190e51d450e9059d457
正文到此结束
Loading...