JUC:java.util.concurrent (Java并发编程工具类)
一般面试提问:面向对象和高级语法、Java集合类、Java多线程、JUC 和高并发、Java IO和 NIO
获取多线程的4种方法:
1.继承Thread类,重写run方法(其实Thread类本身也实现了Runnable接口)
2.实现Runnable接口,重写run方法
3.实现Callable接口,重写call方法(有返回值)
4.使用线程池(有返回值):通过Executors提供四种线程池
进程:
线程:
笔试:一般一个进程包含多个线程,线程可以利用进程所拥有的资源,在引入线程的操作系统中,把线程作为独立运行和独立调度的基本单位
面试:**线程是进程的组成部分,一般一个进程包含多个线程,它代表了一条顺序的执行流。**例如IDEA上的代码提示、自动补全、格式化等功能
并发:在同一实体上的两个或多个使事件在同一 时间段 内发生
并行:在不同实体上的两个或多个事件在同一 时刻 发生
高内聚:类与类、对象与对象、模块与模块之间高度地聚集和关联
低耦合:AB两个对象可以进行数据共享,但是AB两个对象又各自 独立
在高内聚低耦合的前提下,线程(thread.start())操作(对外暴露的调用方法)资源类(操作的对象):
Thread.currentThread().getName()
获取当前线程名
Thread(Runnable target, String name)
// target:Runnable接口的run() 方法的实现, run():线程处于就绪状态 name:线程名
线程(thread.start())操作只是让该线程处于就绪状态而不是启动,具体的执行与否决定于cpu和操作系统底层调度通知,
在并发编程中,经常遇到多个线程访问同一个 共享资源 ,为了维护数据一致性,synchronized关键字被常用于维护数据一致性。synchronized机制是给共享资源上锁,只有拿到锁的线程才可以访问共享资源,这样就可以强制使得对共享资源的访问都是顺序的,因为对于共享资源属性访问是必要也是必须的
乐观锁/悲观锁:
独享锁/共享锁:独享锁是指该锁一次只能被一个线程所持有。共享锁是指该锁可被多个线程所持有。
互斥锁/读写锁:互斥锁/读写锁就是独享锁/共享锁具体的实现,分别是ReentrantLock和ReadWriteLock
可重入锁:可重入锁又名递归锁,是指在同一个线程在外层方法获取锁的时候,在进入内层方法会自动获取锁
公平锁/非公平锁:
分段锁:类似HashMap的结构,即内部拥有一个Entry数组,数组中的每个元素又是一个链表,同时又是一个ReentrantLock,是一种锁的设计
偏向锁/轻量级锁/重量级锁:
偏向锁是指一段同步代码一直被一个线程所访问,那么该线程会自动获取锁。降低获取锁的代价。
轻量级锁是指当锁是偏向锁时,被另一个线程所访问,偏向锁就会升级为轻量级锁,其他线程会通过自旋的形式尝试获取锁,不会阻塞,提高性能。
重量级锁是指当锁为轻量级锁的时候,另一个线程虽然是自旋,但自旋不会一直持续下去,当自旋一定次数的时候,还没有获取到锁,就会进入阻塞,该锁膨胀为重量级锁。重量级锁会让他申请的线程进入阻塞,性能降低。
自旋锁:尝试获取锁的线程不会立即阻塞,而是采用循环的方式去尝试获取锁,这样的好处是减少线程上下文切换的消耗,缺点是循环会消耗CPU。
Interface Lock:比synchronized更牛的锁
已知的实现类:可重复锁ReentrantLock, ReetrantReadWriteLock.ReadLock, ReentrantReadWriteLock.WriteLock
ReentrantLock lock = new ReentrantLock(); // 创建锁对象 lock.lock(); // 上锁 lock.unlock(); // 释放锁
相比synchronized 的完全锁整个方法,ReentrantLock() 可以在 lock() 和 unlock() 之间的语句进行同步锁
多线程状态 Thread.State:
Lambda闭包:
拷贝小括号,写死右箭头,落地大括号 、 @FunctionalInterface注解表示为函数时接口,此接口里参数数量相同的方法只能有一个
default 开头的Lambda表达式表示在接口里声明+实现,不会影响参数数量相同的未实现方法。
Java8之前不可以在接口里实现,Java8之后通过default可以在接口里实现方法。一个函数时接口可以有多个default或static实现方法
多线程交互(如 wait(), notifyAll()
)中,必须要防止多线程的虚拟唤醒,即交互时判断只用while。wait和notify方法都在Object类里
新版生产者消费者问题:
在JUC中,Lock代替了 synchronized,Condition 代替了 Object monitor methods(wait, notify, notifyAll)
Condition.await() 代替了 this.wait() Condition.signalAll() 代替了 this.notifyAll()
Lock 和 Condition、ReadWriteLock 都是 java.util.concurrent.locks
下的子接口
相对于 synchronized
优势在于:精确通知,顺序访问、可以不局限在整个方法加同步锁,而是在一段语句内
public void print5(){ lock.lock(); try { while(number != 1){ condition1.await(); } // 干活 for(int i = 1; i <= 5; i++){ System.out.println(Thread.currentThread().getName()+"/t"+i); } number = 2; // 修改标志位 condition2.signal(); // 精准通知 condition2 } catch (Exception e) { e.printStackTrace(); } finally{ lock.unlock(); } } 复制代码
private Lock lock = new ReentrantLock(); Condition condition = lock.newCondition(); lock.lock(); // 加同步锁 condition.await(); // 等待 condition.signalAll(); // 通知 lock.unlock(); 复制代码
try{ TimeUnit.SECONDS.sleep( 1 ); } catch(InterruptedException e) {e.printStackTrace();}
// 拿着锁不会释放
public static synchronized void sendEmail() throws Exception { try{ TimeUnit.SECONDS.sleep( 4 ); } catch(InterruptedException e) {e.printStackTrace();} System.out.println("sendEmai.."); // 打印邮件 } public static synchronized void sendSMS() throws Exception { System.out.println("sendSMS.."); // 打印短信 } public void hello() { System.out.println("hello.."); } 复制代码
Phone phone = new Phone(); Phone phone2 = new Phone(); new Thread(() -> { try { phone.sendEmail(); } catch (Exception e) { e.printStackTrace(); } }, "A").start(); Thread.sleep(100); new Thread(() -> { try { // phone.sendSMS(); // phone.hello(); phone2.sendSMS(); } catch (Exception e) { e.printStackTrace(); } }, "B").start(); 复制代码
1.两个线程调用同一个对象的两个同步方法:标准访问(无TimeUnit.SECONDS.sleep( 4 )),先打印邮件还是短信?邮件
2.新增sleep()给某个方法:邮件方法暂停4秒,先打印邮件还是短信?邮件
3.新增一个线程调用新增的一个普通方法:新增普通方法hello(), 先打印邮件还是hello?hello
4.两个线程调用两个对象的同步方法,其中一个方法有Thread.sleep():两部手机,先打印邮件还是短信?短信
**5.将两个方法均设置为static方法,并且让两个线程用同一个对象调用两个方法:**两个静态同步方法,同一部手机,先打印邮件还是短信?邮件
6.两个静态同步方法,2 部手机,先打印邮件还是短信?邮件
7.一个普通同步方法,一个静态同步方法,1 部手机,先打印邮件还是短信?短信
8.一个普通同步方法,一个静态同步方法,2 部手机,先打印邮件还是短信?短信
2个常用的生成随机数工具类: UUID.randomUUID().toString().substring(x,x)
和 System.currentTimeMillis()
典型的RuntimeException(运行时异常) 包括 NullPointerException, ClassCastException(类型转换异常),IndexOutOfBoundsException(越界异常), IllegalArgumentException(非法参数异常),ArrayStoreException(数组存储异常),AruthmeticException(算术异常), BufferOverflowException(缓冲区溢出异常), 并发修改异常 java.util.ConcurrentModificationException、OutOfMemoryError内存溢出
· 当使用线程不安全的集合在高并发会出现异常,抛出并发修改异常 java.util.ConcurrentModificationException · 如何使线程安全? · 方法1(不建议):可改用Vector避免并发修改异常, Vector是ArrayList的前身,底层方法实现有synchronized修饰 · List<String> list = Collections.synchronizedList(new ArrayList<>()); 用工具类将ArrayList转换为线程安全的,适用小数据量。 · 还可以是Collections.synchronizedMap()、Collections.synchronizedSet() · 方法2:List<Object> list = new CopyOnWriteArrayList<>(); 底层用的是lock锁,采用写时复制(读写分离)思想,读和写不同容器,复制一份然后供集体读,CopyOnWriteArrayList的add()底层是:Arrays.copyOf(elements, len + 1); 即从原集合拷贝一份再写, add时扩容每次扩一个 · 类似的还可以有CopyOnWriteArraySet<>() · Map<Object, Object> map = new ConcurrentHashMap<>(); · 方法3:使用优于Runnable的 Callable 接口 List<String> list = new CopyOnWriteArrayList<>(); for (int i = 1; i <= 30; i++) { new Thread(() -> { list.add(UUID.randomUUID().toString().substring(0, 8)); System.out.println(list); }, String.valueOf(i)).start(); } 复制代码
除了vector,statck、hashtable、enumeration、StringBuffer是线程安全,其他的集合类都是线程不安全的
HashSet的底层是HashMap,但操作HashSet时只操作HashMap的key,源码: map.put(e, PRESENT)==null; PRESENT = new Object();
HashMap底层是:node类型的数组+node类型的链表+node类型的红黑树, 容量为16,负载因子0.75(即装载的内存超过容量的3/4会自动扩容,HashMap扩容为原来的一倍,即2^(4+1),ArrayList扩容为原来的一半)
new HashMap() 等价于 new HashMap(16, 0.75); 默认容量16和负载因子0.75,但可以修改
Runnable和Callable 区别:1.Callable有返回值 2.有抛异常 3.落地方法不同,Callable是call(), Runnable是run()。他们都是函数式接口
new Thread(无法传入Callable.class); 需要先找到 Runnable,再找到它的子接口 RunnableFuture,再找到它实现类FutureTask,该类也实现了Runnable接口,再 找到它的构造方法FuturexTask(Callable<V> callable)
, 便可通过这种多态是思想找到与Callable和Runnable相关联的方法。
class Mythread implements Callable<Integer>{ @Override public Integer call() throws Exception { try {TimeUnit.SECONDS.sleep(2);} catch (Exception e) {e.printStackTrace();} System.out.println("Callable"); return 1024; } } public class CallableDemo { public static void main(String[] args) throws Exception { FutureTask<Integer> futureTask = new FutureTask<>(new Mythread()); new Thread(futureTask, "A").start(); new Thread(futureTask, "B").start(); // 只会调用1次new FutureTask<>(new Mythread()); System.out.println(futureTask.get()); // 输出返回值 } } 复制代码
因此使用FutureTask来代替 new Thread 从而创建线程。这里用了多态的思想:接口与实现之间即使是在构造方法的参数也可以和接口相关联
细节:Callable的call() 内部有缓存机制,只会调用一次 new FutureTask<>(new Mythread());
CountDownLatch countDownLatch = new CountDownLatch(6); // 信号数为6 for(int i = 1; i <= 6; i++){ new Thread(() -> { System.out.println(Thread.currentThread().getName()+"/t离开教室"); countDownLatch.countDown(); // 倒计信号数,执行一次减一 }, String.valueOf(i)).start(); } countDownLatch.await(); // 堵住该main线程直到除main外的其他线程结束后才放行 System.out.println(Thread.currentThread().getName()+"/t班长关门走人"); // 信号数减到0时才执行main线程 复制代码
CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> {System.out.println("召唤神龙");}); //设置信号数 for (int i = 1; i <= 7; i++) { final int tempInt = i; new Thread(() -> { System.out.println(Thread.currentThread().getName()+"/t收集到第:"+tempInt+"颗龙珠"); try { cyclicBarrier.await(); } catch (Exception e) { // TODO: handle exception e.printStackTrace(); } }, String.valueOf(i)).start(); // 信号数加到7时才执行main线程 } 复制代码
public static void main(String[] args) { // 模拟资源类,有3个空车位, 当一个线程占用资源后减少一个空车位,应用场景:抢红包 Semaphore semaphore = new Semaphore(3); // 3是设置的信号量,用于多个共享资源的互斥使用,还用于并发线程数的控制,限流 for(int i = 1; i <= 6; i++){ new Thread(() -> { try { semaphore.acquire(); System.out.println(Thread.currentThread().getName()+"/t抢占到了车位"); try{TimeUnit.SECONDS.sleep(3);}catch (Exception e) {e.printStackTrace();} System.out.println(Thread.currentThread().getName()+"/t离开了车位"); } catch (InterruptedException e) { e.printStackTrace(); } finally { semaphore.release(); // 释放资源 } }, String.valueOf(i)).start(); } } 复制代码
public void put(String key, Object value){ readWriteLock.writeLock().lock(); try { System.out.println(Thread.currentThread().getName()+"/t 开始写入"); map.put(key, value); System.out.println(Thread.currentThread().getName()+"/t 写入完成"); } catch (Exception e) { // TODO: handle exception e.printStackTrace(); } finally{ readWriteLock.writeLock().unlock(); } } 复制代码
当队列是空,从队列中获取元素的操作将被阻塞;当队列满,从队列中添加元素的操作将会被阻塞
不需要关心何时需要阻塞线程,什么时候需要唤醒线程,一切BlockingQueue都一手包办
7大BlockingQueue队列,只需掌握3个红色部分
Java线程池是通过Excutor框架实现的, 最主要的类: ThreadPoolExecutor
主要特点:线程复用、控制最大并发数、管理线程
线程池ThreadPoolExecutor的三大方法:
Excutors.newFixedThreadPool(int) : 一池N个固定线程,类似银行受理窗口,执行长期任务性好,底层原理为:
Excutors.newSingleThreadPool(int):一池单个线程,底层原理为:
Excutors.newCachedThreadPool(int):一池有可自动收缩可自动扩充的线程,底层原理为:
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, // 三大方法底层调用的都是同个方法ThreadPoolExecutor 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } 复制代码
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, // 线程池ThreadPoolExecutor构造参数 Executors.defaultThreadFactory(), defaultHandler); } 复制代码
// 线程池7大参数 public ThreadPoolExecutor(int corePoolSize, // 1.线程池中常驻的核心线程数,简称核心数。类似银行当天值日窗口 int maximumPoolSize, // 2.线程池中能够容纳同时执行的最大线程数,指可扩容的线程数,必须大于等于1,包含核心数 long keepAliveTime,//3.多余空闲线程(除核心线程的线程)的存活时间,当等待时间超过keepAliveTime空闲线程会被销毁 TimeUnit unit, // 4.keepAliveTime的单位 BlockingQueue<Runnable> workQueue, // 5.任务阻塞队列,被提交但尚未被执行的任务,类似候客区 ThreadFactory threadFactory, // 6.表示生成线程池中工作线程的线程工厂,用于创建线程,一般默认即可 RejectedExecutionHandler handler) { // 7.拒绝策略,当队列满,并且工作线程大于等于线程池的 最大线程数maximumPoolSize时如何拒绝那些请求执行的runnable的策略 if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; } 复制代码
public static void main(String[] args) { System.out.println(Runtime.getRuntime().availableProcessors()); // 获取计算机内核数 ExecutorService threadPool = new ThreadPoolExecutor( 2, 5, // 如果得到的内核数是CPU密集型,就比核数多1~2 2L, TimeUnit.SECONDS, new LinkedBlockingDeque<>(3), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); try { for (int i = 0; i <= 10; i++) { threadPool.execute(() -> { // Runnable 可用Lambda表达式 System.out.println(Thread.currentThread().getName()+"/t办理业务"); }); } } catch (Exception e) { // TODO: handle exception e.printStackTrace(); } } 复制代码
一般使用以下方法创建线程池, 此时线程池最大容纳数是5+3, 最大线程数是5
ExecutorService threadPool = new ThreadPoolExecutor(2, 5, 2L, TimeUnit.SECONDS, new LinkedBlockingDeque<>(3), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); 复制代码
Consumer
Consumer<String> consumer = s -> {System.out.println(s);}; consumer.accept("a"); // 输出a 复制代码
Supplier
Supplier<String> supplier = () -> {return "sup";}; System.out.println(supplier.get()); // 固定输出sup 复制代码
Function<T,R>
public static void main(String[] args) { Function<String, Integer> function = (s -> {return s.length();}); System.out.println(function.apply("abc")); // 返回s.length() 复制代码
Predicate
Predicate<String> predicate = s -> {return s.isEmpty();}; // lambda表达式s System.out.println(predicate.test("")); // true 复制代码
@Accessors
Stream是数据渠道,用于操作数据源(集合、数组等)所生成的元素序列。集合讲的是数据,流讲的是计算!
public static void main(String[] args) { // stream式计算 User u1 = new User(11, "a", 23); User u2 = new User(12, "b", 24); User u3 = new User(13, "c", 22); User u4 = new User(14, "d", 28); User u5 = new User(16, "e", 26); List<User> list = Arrays.asList(u1,u2,u3,u4,u5); // 形参 u 代表 List<User>的泛型User list.stream().filter(u -> {return u.getId() % 2 == 0;}) // 选取偶数id .filter(u -> {return u.getAge() > 24;}) // 选取 age 大于 24 .map(m -> {return m.getUserName().toUpperCase();}) // list转为map,名字变为大写 .sorted((o1,o2) -> {return o2.compareTo(o1);}).limit(1) // 逆序,输出第一个对象 .forEach(System.out::println); // 遍历输出E } 复制代码
线程接口中,能干活的线程接口有Runnable (无返回值) 、Callable(有返回值)
ForkJoinPool :类比线程池
ForkJoinTask:类比FutureTask
RecursiveTask:递归任务,继承后可以实现递归调用的任务
抽象类不能直接通过new而实例化,需要创建一个指向自己的对象引用(其子类)来实例化
class MyTask extends RecursiveTask<Integer>{ private static final Integer ADJUST_VALUE = 10; private int begin; private int end; private int result; public MyTask(int begin, int end) { this.begin = begin; this.end = end; } @Override protected Integer compute() { // RecursiveTask 的抽象方法,执行递归任务 if((end - begin) <= ADJUST_VALUE){ for(int i = begin; i <= end; i++){ result = result + i; } }else{ int middle = (end + begin) / 2; MyTask task01 = new MyTask(begin, middle); MyTask task02 = new MyTask(middle+1, end); task01.fork(); // task01递归、分支直到 (end - begin) <= ADJUST_VALUE task02.fork(); // task02递归、分支直到 (end - begin) <= ADJUST_VALUE result = task01.join() + task02.join(); // 将所有子结果合并 } return result; } } public class ForkJoinDemo { public static void main(String[] args) throws InterruptedException, ExecutionException { MyTask myTask = new MyTask(0, 100); ForkJoinPool threadPool = new ForkJoinPool(); ForkJoinTask<Integer> forkJoinTask = threadPool.submit(myTask); System.out.println(forkJoinTask.get()); threadPool.shutdown(); } } 复制代码
public class CompletableFutureDemo { public static void main(String[] args) throws InterruptedException, ExecutionException { CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> { System.out.println(Thread.currentThread().getName() + "/t没有返回, update mysql ok"); }); completableFuture.get(); // 异步回调 CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + "/tcompletableFuture2"); // int age = 10/0; return 1024; }); System.out.println(completableFuture2.whenComplete((t, u) -> { // 正常时返回 completableFuture2 的返回值 System.out.println("*****t:" + t); // t 为 completableFuture2 异步回调的返回值 System.out.println("*****u:" + u); // u 为 completableFuture2 异步回调的异常信息 }).exceptionally(f -> { // 异常时返回 completableFuture2 的异常 System.out.println("*****exception:" + f.getMessage()); return 444; }).get()); // 打印返回值结果 } } 复制代码