将任务代码移到实现了 Runnable 接口的类的 run 方法中。这个接口只有一个方法
public interface Runnable { public abstract void run(); } 复制代码
由于 Runnable 是一个函数式接口,可以用 lambda 表达式建立一个实例:
Runnable r = () -> { task code }; 复制代码
由 Runnable 创建一个Thread 对象:
Thread t = new Thread(r); 复制代码
启动线程:
t.start(); 复制代码
也可以通过构建一个Thread类的子类定义一个线程
class MyThread extends Thread { public void run() { task code } } 复制代码
注意: 不要调用 Thread 类或 Runnable 对象的run方法。直接调用run方法,只会执行同一个线程中的任务,而不会启动新线程。应该调用 Thread.start 方法。。该方法将创建一个执行run方法的新线程。
当线程的run方法执行方法体中最后一条语句后,并经由执行return语句返回时,或者出现了在方法中没有捕获的异常时,线程将终止。 没有可以强制线程终止的方法。但是, interrupt 方法可以用来请求终止线程 当线程调用 interrupt 方法时,线程的中断状态将被置位。这是每一个线程都具有的 boolean 标志 每个线程都应该不时地检查这个标志,以判断线程是否被中断。
可调用静态的 Thread.currentThread 方法获得当前线程,然后调用isInterrupted 方法
while(!Thread.currentThread().isInterrupted() && more work to do){ do more work } 复制代码
不过,如果线程被阻塞,就无法检测中断状态,这时就会抛出 InterruptedException 异常 当在一个被阻塞的线程(调用 sleep 或 wait )上调用 interrupt 方法时,阻塞调用将会被 InterruptedException 异常中断
线程可以有如下6种状态:
当用 new 操作符创建一个新线程时,如 new Thread(r),该线程还没有开始运行。这意味着它的状态是new
一旦调用 start 方法,线程处于runnable 状态。一个可运行的线程可能正在运行也可能没有运行,这取决于操作系统给线程提供运行的时间。运行中的线程被中断,目的是为了让其他线程获得运行机会。线程调度的细节依赖于操作系统提供的服务。 抢占式调度 给每一个可运行线程一个时间片来执行任务。当时间片用完,操作系统剥夺该线程的运行权,并给另一个线程运行机会。 在任何给定时刻,一个可运行的线程可能正在运行也可能没有运行(这就是为什么将这个状态称为可运行而不是运行)
当线程处于被阻塞或等待状态时,它暂时不活动。它不运行任何代码且小号最少的资源。知道线程调度器重新激活它。细节取决于它是怎样达到非活动状态的
当一个线程试图获取一个内部的对象锁,而该锁被其他线程持有,则该线程进入阻塞状态。当所有其他线程释放该锁,并且线程调度器允许本线程持有它的时候,该线程将变成非阻塞状态
当线程等待另一个线程通知调度器一个条件时,它自己进入等待状态。在调用 Object.wait 方法或 Thread.join 方法,或者是等待 java.util.concurrent 库中的 Lock 或 Condition 时,就会出现这种情况
有几个方法有一个超时参数。调用它们导致线程进入计时等待状态。这一状态将一直保持到超时期满或者接收到适当的通知。带有超时的方法有 Thread.sleep 和 Object.wait 、 Object.join 、 Lock.tryLock 以及 Condition.await 的计时版
在大多数实际的多线程应用中,两个或两个以上的线程需要共享对同一数据的存取。如果两个线程存取相同的对象,并且每一个线程都调用了一个修改该对象状态的方法,将会发生什么呢?根据各线程访问数据的的次序,可能会产生讹误的对象。这样的情况通常被称为 竞争条件
我们来模拟一个有若干账户的银行。随机地生成在这些账户之间转移在钱款的交易。每一个账户有一个线程。每一笔交易中,会从线程所服务的账户中随机转移一定数目的钱款到另一个随机账户。
public void transfer(int from, int to, double amount) { if (accounts[from] < amount) return; System.out.println(Thread.currentThread()); accounts[from] -= amount; System.out.printf("%10.2f from %d to %d", amount, from, to); accounts[to] += amount; System.out.printf(" Total Balance: %10.2f%n", getTotalBalance()); } 复制代码
Runnable r = () -> { try { while (true){ //bank.size 返回的是Bank类accounts数组的长度,这里设置为100,数组中的值均为1000 int toAccount = (int)(bank.size() * Math.random()); //MAX_AMOUNT 为 1000 double amount = MAX_AMOUNT * Math.random(); bank.transfer(fromAccount, toAccount, amount); ///DELAY 为 10 Thread.sleep((int)(DELAY * Math.random())); } }catch (InterruptedException e){ } }; 复制代码
当这个模拟程序运行时,我们不清楚某一时刻某一个账户余额剩多少钱,不过我们唯一能确定的是,所有账户的总金额应该保持不变。应为 NACCOUNTS * INITIAL_BALANCE 所以我们在每次交易的结尾,transfer 方法重新计算总值打印出来。 ps:这个程序是个死循环,只能按 Ctrl+C 终止程序
程序运行结果如下:
正如结果所示,出现了错误。银行的余额应该保持在10W,才是正确的结果。但是过了一段时间之间,这个结果变了。
//测试类 public class UnsynchBankTest { public static final int NACCOUNTS = 100; public static final double INITIAL_BALANCE = 1000; public static final double MAX_AMOUNT = 1000; public static final int DELAY = 10; public static void main(String[] args) { BankUnSynch bank = new BankUnSynch(NACCOUNTS, INITIAL_BALANCE); for (int i=0;i<NACCOUNTS;i++) { int fromAccount = i; Runnable r = () -> { try { while (true){ int toAccount = (int)(bank.size() * Math.random()); double amount = MAX_AMOUNT * Math.random(); bank.transfer(fromAccount, toAccount, amount); Thread.sleep((int)(DELAY * Math.random())); } }catch (InterruptedException e){ } }; Thread t = new Thread(r); t.start(); } } } //银行类 import java.util.*; public class BankUnSynch { private final double[] accounts; public BankUnSynch(int n, double initialBalance) { accounts = new double[n]; Arrays.fill(accounts, initialBalance); } /** * 转账操作 * @param from * @param to * @param amount */ public void transfer(int from, int to, double amount) { if (accounts[from] < amount) return; //获取当前线程 System.out.println(Thread.currentThread()); accounts[from] -= amount; System.out.printf("%10.2f from %d to %d", amount, from, to); accounts[to] += amount; System.out.printf(" Total Balance: %10.2f%n", getTotalBalance()); } /** * 获取 accounts 数组的总金额 * @return */ public double getTotalBalance() { double sum = 0; for (double a: accounts) sum += a; return sum; } public int size() { return accounts.length; } } 复制代码
上面的代码运行时,其实有几个线程更新银行账户余额。一段时间之后,就出现了错误。总额要么增加了,要是变少了。当线程试图同时更新同一个账户的时候,这个问题就出现了。假设两个线程同时执行指令
accounts[to] += amount; 复制代码
这个时候会发生什么呢?由于这不是原子操作。该指令可能会被处理为:
现在我们假定第一个线程执行步骤1和2,然后,它被剥夺了运行权。第二个线程被唤醒并修改了 accounts 数组中的 同一项 ,即第二个线程执行完了这三个步骤。然后,第一个线程被唤醒并完成其第三步 这样,这一动作就擦去了第二个线程所作的更新,于是,总金额不在正确。
如何防止上面情况的发生呢? 有两种机制防止代码块受并发访问的干扰。
使用 ReentrantLock 保护代码块。代码如下
myLock.lock();// 一个ReentrantLock 对象 try{ critical section } finally{ myLock.unlock(); } 复制代码
这样我们就可以确保任何时刻只有一个线程进入临界区。一旦一个线程封锁了锁对象,其他任何线程都无法通过lock语句。当其他线程调用lock时,它们被阻塞,直到第一个线程释放锁对象
现在我们来回想一下业务,当账户中没有足够的余额时,我们是不是应该等待直到另一个线程向账户中注入资金?但是因为这一线程刚刚获得了对bankLock的排他性访问,因此导致了其他线程都被阻塞,这就是成了死锁。这时,我们就需要用到条件对象了。
一个锁对象可以有一个或多个相关的条件对象。可以用 newCondition 方法获得一个条件对象。习惯上给每一个条件对象命名为可以反映它所表达的条件的名字。
例如:
class Bank { private Lock bankLock; private Condition sufficientFunds; public Bank() { .... bankLock = new ReentrantLock(); sufficientFunds = bankLock.newCondition(); } } 复制代码
当 transfer 方法发现余额不足,调用
sufficientFunds.await(); 复制代码
表示当前线程现在被阻塞了,并放弃了锁。我们希望这样可以使得另一个线程可以进行增加账户余额的操作。
等待获得锁的线程和调用 await 方法的线程存在本质上的不同。一旦一个线程调用 await 方法,它进入该条件的 等待集 。当锁可用时,该线程不能马上解除阻塞。相反,它处于阻塞状态,直到另一线程调用同一条件上的 signalAll 方法时为止。
signalAll 方法激活因为这一条件而等待的所有线程。当这些线程从等待集当中移出时,它们再次成为可运行的,调度器将再次激活它们。此时,线程应该再次测试该条件。由于无法确保该条件被满足—— signalAll 方法仅仅是通知正在等待的线程:此时有可能已经满足条件,值得再次去检测该条件。 因此,对await 的调用应该采用以下的方式
while(!(ok to proceed)) condition.await(); 复制代码
何时调用 signalAll 方法呢?建议在对象的状态有利于等待线程的方向改变时调用 signalAll。如:当完成转账时,就调用 signalAll 方法。
public void transfer(int from,int to, double amount) throws InterruptedException { bankLock.lock(); try { while (accounts[from] < amount) sufficientFunds.await(); System.out.println(Thread.currentThread()); accounts[from] -= amount; System.out.printf("%10.2f from %d to %d", amount, from, to); accounts[to] += amount; System.out.printf(" Total Balance: %10.2f%n", getTotalBalance()); sufficientFunds.signalAll(); }finally { bankLock.unlock(); } } 复制代码
另一个方法是 signal ,此方法是随机解除选择等待集中 某个 线程的阻塞状态。这个方法存在危险,即随机选择的线程发现自己仍然不能运行,那它就会再次被阻塞,若无其他线程再次调用signal,那系统就死锁了。
import java.util.Arrays; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class Bank { private final double[] accounts; private Lock bankLock; private Condition sufficientFunds; /** * 初始化 * @param n 数组长度 * @param initialBalance */ public Bank(int n ,double initialBalance) { accounts = new double[n]; Arrays.fill(accounts, initialBalance); bankLock = new ReentrantLock(); sufficientFunds = bankLock.newCondition(); } /** * 转账操作 * @param from * @param to * @param amount * @throws InterruptedException */ public void transfer(int from,int to, double amount) throws InterruptedException { bankLock.lock(); try { while (accounts[from] < amount) sufficientFunds.await(); System.out.println(Thread.currentThread()); accounts[from] -= amount; System.out.printf("%10.2f from %d to %d", amount, from, to); accounts[to] += amount; System.out.printf(" Total Balance: %10.2f%n", getTotalBalance()); sufficientFunds.signalAll(); }finally { bankLock.unlock(); } } /** * 获取 accounts 数组的总金额 * @return */ public double getTotalBalance() { bankLock.lock(); try{ double sum = 0; for (double a: accounts) sum += a; return sum; }finally { bankLock.unlock(); } } public int size() { return accounts.length; } } 复制代码
Lock 和 Condition 接口为程序设计人员提供了高度的锁定控制。然而大多数情况下,并不需要那样的控制,并且可以使用一种嵌入到Java语言内部的机制。从 1.0 版本开始,Java 中的每一个对象都有一个内部锁。如果一个方法用 synchronized 关键字声明,那么对象的锁将保护整个方法。即,要调用该方法,线程必须获得内部的对象锁。 也就是说:
public synchronized void method() { do something } //等价于 public void method() { this.intrinsicLock.lock(); try{ do something } finally{ this.intrinsicLock.unlock(); } } 复制代码
内部对象锁只有一个相关条件。wait 方法添加一个线程到等待集中,notifyAll/notify 方法接触等待线程的阻塞状态。即,调用wait或notifyAll等价于
intrinsicCondition.await(); intrinsicCondition.signalAll(); 复制代码
import java.util.Arrays; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class BankSynch { private final double[] accounts; /** * 初始化 * @param n 数组长度 * @param initialBalance */ public BankSynch(int n ,double initialBalance) { accounts = new double[n]; Arrays.fill(accounts, initialBalance); } /** * 转账操作 * @param from * @param to * @param amount * @throws InterruptedException */ public synchronized void transfer(int from,int to, double amount) throws InterruptedException { System.out.println(Thread.currentThread()); accounts[from] -= amount; System.out.printf("%10.2f from %d to %d", amount, from, to); accounts[to] += amount; System.out.printf(" Total Balance: %10.2f%n", getTotalBalance()); } /** * 获取 accounts 数组的总金额 * @return */ public synchronized double getTotalBalance() { double sum = 0; for (double a : accounts) sum += a; return sum; } public int size() { return accounts.length; } } 复制代码
还有一种是客户端锁定,不过客户端锁定是非常脆弱的,通常不推荐使用,这里就不叙述了。