一个进程可以有多个线程,一个线程可以有多个协程。
先说说线程跟进程:协程平时听的比较少,所以这里引用维基百科的解释:
start()方法是运行多线程的主方法,在start()方法内部,通过调用run()方法启动这个线程。run()方法用以启动一个线程,然后主线程立刻返回。该启动的线程不会马上执行,而是在等待队列中等待CPU的调度。
我们知道在操作系统中,进程有五种状态: 新建、运行、就绪、阻塞、终止。 注意这只是比较广泛的说法,不同的操作系统对进程状态定义也有不同。
但是在Java中,Thread类定义了六种线程状态,千万不要混淆了:
补充: 进入Thread.getState()方法可以发现在JVM中也定义了六种线程的状态:
public State getState() { // get current thread state return sun.misc.VM.toThreadState(threadStatus); } 复制代码
sun.misc包中的VM类中的线程状态:
private static final int JVMTI_THREAD_STATE_ALIVE = 1; private static final int JVMTI_THREAD_STATE_TERMINATED = 2; private static final int JVMTI_THREAD_STATE_RUNNABLE = 4; private static final int JVMTI_THREAD_STATE_BLOCKED_ON_MONITOR_ENTER = 1024; private static final int JVMTI_THREAD_STATE_WAITING_INDEFINITELY = 16; private static final int JVMTI_THREAD_STATE_WAITING_WITH_TIMEOUT = 32; 复制代码
synchronized({Object})可以用以锁住一个对象,同样synchronized还可以用在方法签名上。那么问题来了,对于非静态方法,synchronized很明显是锁住调用这个方法的实体,那么对于静态方法,锁住的是什么?答案是锁住的是类在JVM中所存储的Class对象。
private static List<Integer> list = Collections.synchronizedList(new ArrayList<>()); 复制代码
首先,他们都是Object类中的方法,接下来看下面这个例子:
public static void main(String[] args) { new Thread(new Thread1()).start(); new Thread(new Thread2()).start(); new Thread(new Thread3()).start(); } static class Thread1 extends Thread { @Override public void run() { System.out.println("Thread1"); } } static class Thread2 extends Thread { @Override public void run() { System.out.println("Thread2"); } } static class Thread3 extends Thread { @Override public void run() { System.out.println("Thread3"); } } 复制代码
这里我开辟了三个线程,分别去实现自己的方法。结果很明显,就是按顺序打印结果:
Thread1 Thread2 Thread3 复制代码
但是当我对Thread1使用wait()方法时,
static class Thread1 extends Thread { @Override public void run() { synchronized (lock){ try { lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("Thread1"); } } 复制代码
Thread会交出自己所持有的锁,让其他进程去争夺这个锁,而自己就一直等待着其他线程的唤醒。
他就一直在这等啊,直到我们在其他线程中使用notify()方法将其唤醒:
static class Thread3 extends Thread { @Override public void run() { synchronized (lock){ lock.notify(); } System.out.println("Thread3"); } } 复制代码
这时候,执行的顺序也改变了:
Thread2 Thread3 Thread1 复制代码
那么notifyAll()就是唤醒其他正在等待的线程,然后让他们之间重新去争夺锁:
static class Thread1 extends Thread { @Override public void run() { synchronized (lock){ try { lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("Thread1"); } } static class Thread2 extends Thread { @Override public void run() { synchronized (lock){ try { lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("Thread2"); } } static class Thread3 extends Thread { @Override public void run() { synchronized (lock){ lock.notifyAll(); } System.out.println("Thread3"); } } 复制代码
结果:
Thread1 Thread2 Thread3 复制代码
注意:使用Object的wait(),notify(),notifyAll()方法都需要持有这个对象的监视器,就是代码中的synchronized()。java doc中的原话:
* This method should only be called by a thread that is the owner * of this object's monitor. 复制代码
否则会报出 IllegalMonitorStateException
异常。
由于Java的线程调度完全依赖于操作系统,所以每个线程都会占用一定的资源,进而我们不可能随心所欲的去开辟线程。所以,这时候就需要线程池了,线程池就是预先在内存中开辟一块资源,专门用于线程的调度。当需要线程执行任务时,就通过线程池管理器来实现线程的分配。
java.util.concurrent包中的Executors类中,封装了四种开辟线程池的方法。
ExecutorService executorService = Executors.newFixedThreadPool(10); executorService.submit(new Callable<Integer>() { @Override public Integer call() throws Exception { return 1; } }); executorService.submit(new Runnable() { @Override public void run() { System.out.println(" "); } }); System.out.println(submit.get()); //线程池使用完毕后,必须关闭 executorService.shutdown(); 复制代码
程序允许完毕,jvm会等待非守护线程完成后关闭,但是jvm不会等待守护线程关闭,守护线程最典型的例子就是GC进程。
像下面这种情况锁与锁之间相互竞争的情况:
public class MultiplyThreadTest { private static final Object lock = new Object(); private static final Object lock1 = new Object(); public static void main(String[] args) { new ThreadClass1().start(); new ThreadClass2().start(); } static class ThreadClass1 extends Thread { @Override public void run() { synchronized (lock) { try { Thread.sleep(0); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (lock1) { System.out.println("Thread1"); } } } } static class ThreadClass2 extends Thread { @Override public void run() { synchronized (lock1) { synchronized (lock) { System.out.println("Thread2"); } } } } } 复制代码
其实我刚开始写出的来的时候,我自己都有疑问:
其实这地方,涉及到一个冷门的知识点,可以参考stackoverflow的 回答 ,那就是绝大部分的操作系统的时间精度都在10ms,所以看上去是sleep(0),但由于精度问题,实际上不是睡0ms。另外一个,不加 Thread.sleep(0)
就不会产生死锁的原因在于,线程执行这两个简单的方法的过程其实是非常短暂的,所以锁之间根本来不及相互竞争。
jsp
命令查看哪些进程正在执行:
jstack 17424 > ~/Desktop/1.txt 复制代码
可以看到日志中,有详细的死锁信息。
public class ProducerConsumer1 { private static final Object lock = new Object(); private static Optional<Integer> optional = Optional.empty(); public static void main(String[] args) throws InterruptedException { Container container = new Container(optional); Producer producer = new Producer(container); Consumer consumer = new Consumer(container); producer.start(); consumer.start(); producer.join(); producer.join(); } public static class Producer extends Thread { Container container; public Producer(Container container) { this.container = container; } @Override public void run() { for (int i = 0; i < 10; i++) { synchronized (lock) { while (container.getOptional().isPresent()) { try { lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } Integer integer = new Random().nextInt(); System.out.println("Product:" + integer); container.setOptional(Optional.of(integer)); lock.notify(); } } } } public static class Consumer extends Thread { Container container; public Consumer(Container container) { this.container = container; } @Override public void run() { for (int i = 0; i < 10; i++) { synchronized (lock) { while (!container.getOptional().isPresent()) { try { lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("Consume:" + container.getOptional().get()); container.setOptional(Optional.empty()); lock.notify(); } } } } } 复制代码
虽然看起来代码挺多的,但其实就是一个思想,就是生产者在生产之前,先检查容器中是否为空,如果不为空就等待,为空才生产。同样,消费者也需要在容器中有值的情况下才会消费,否则就等待。
借鉴Condition接口中的提示:
* public void put(Object x) throws InterruptedException { * <b>lock.lock(); * try {</b> * while (count == items.length) * <b>notFull.await();</b> * items[putptr] = x; * if (++putptr == items.length) putptr = 0; * ++count; * <b>notEmpty.signal();</b> * <b>} finally { * lock.unlock(); * }</b> * } 复制代码
代码如下:
public class ProducerConsumer2 { private static final Lock lock = new ReentrantLock(); private static final Condition notConsumer = lock.newCondition(); private static final Condition notProduct = lock.newCondition(); private static Optional<Integer> optional = Optional.empty(); public static void main(String[] args) throws InterruptedException { Container container = new Container(optional); Producer producer = new Producer(container); Consumer consumer = new Consumer(container); producer.start(); consumer.start(); producer.join(); producer.join(); } public static class Producer extends Thread { private Container container; public Producer(Container container) { this.container = container; } @Override public void run() { lock.lock(); try { while (container.getOptional().isPresent()) { try { //注意:是await()而不是wait() notConsumer.await(); } catch (InterruptedException e) { e.printStackTrace(); } } int random = new Random().nextInt(); System.out.println("Product" + random); container.setOptional(Optional.of(random)); notProduct.signal(); } finally { lock.unlock(); } } } public static class Consumer extends Thread { private Container container; public Consumer(Container container) { this.container = container; } @Override public void run() { try { lock.lock(); while (!container.getOptional().isPresent()) { try { notProduct.await(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(container.getOptional().get()); container.setOptional(Optional.empty()); notConsumer.notify(); } finally { lock.unlock(); } } } } 复制代码
与第一种方法类似,这里增加了Condition条件判断,并用到了ReentrantLock。
public class ProducerConsumer3 { private static final BlockingQueue<Integer> queue = new LinkedBlockingDeque<>(1); private static final BlockingQueue<Integer> signal = new LinkedBlockingDeque<>(1); public static void main(String[] args) throws InterruptedException { Producer producer = new Producer(queue, signal); Consumer consumer = new Consumer(queue, signal); producer.start(); consumer.start(); producer.join(); producer.join(); } public static class Producer extends Thread { BlockingQueue<Integer> queue; BlockingQueue<Integer> signal; public Producer(BlockingQueue<Integer> queue, BlockingQueue<Integer> signal) { this.queue = queue; this.signal = signal; } @Override public void run() { for (int i = 0; i < 10; i++) { int random = new Random().nextInt(); System.out.println("Product:" + random); try { queue.put(random); signal.take(); } catch (InterruptedException e) { e.printStackTrace(); } } } } public static class Consumer extends Thread { BlockingQueue<Integer> queue; BlockingQueue<Integer> signal; public Consumer(BlockingQueue<Integer> queue, BlockingQueue<Integer> signal) { this.queue = queue; this.signal = signal; } @Override public void run() { for (int i = 0; i < 10; i++) { try { System.out.println("Consumer:" + queue.take()); signal.put(0); } catch (InterruptedException e) { e.printStackTrace(); } } } } } 复制代码
这种方法就比较简单了,不用判断容器是否为空,因为BlockingQueue在内部为我们实现了判断,直接拿来用就可以了。这里需要注意一点,由于take()与put()方法几乎是同时执行的,所有需要加一个signal标志信息,避免乱序。
Container容器:
public class Container { Optional<Integer> optional; public Container(Optional<Integer> optional) { this.optional = optional; } public Optional<Integer> getOptional() { return optional; } public void setOptional(Optional<Integer> optional) { this.optional = optional; } } 复制代码