Java高并发编程

JUC是什么

java.util.concurrent 在并发编程中使用的工具类

进程与线程

进程

  • 程序由指令和数据组成,但这些指令要运行,数据要读写,就必须将指令加载至 CPU,数据加载至内存。在指令运行过程中还需要用到磁盘、网络等设备。进程就是用来加载指令、管理内存、管理 IO 的
  • 当一个程序被运行,从磁盘加载这个程序的代码至内存,这时就开启了一个进程进程就可以视为程序的一个实例。大部分程序可以同时运行多个实例进程(例如记事本、画图、浏览器等),也有的程序只能启动一个实例进程(例如网易云音乐、360 安全卫士等)。

线程

  • 通常在一个进程中可以包含若干个线程,当然一个进程中至少有一个线程,不然没有存在的意义。
  • 线程可以利用进程所拥有的资源,在引入线程的操作系统中,通常都是把进程作为分配资源的基本单位,而把线程作为独立运行和独立调度的基本单位,由于线程比进程更小,基本上不拥有系统资源,故对它的调度所付出的开销就会小得多,能更高效的提高系统多个程序间并发执行的程度

线程状态

NEW,RUNNABLE,BLOCKED,WAITING,TIMED_WAITING,TERMINATED;

image-20200830104045416

wait/sleep

功能都是当前线程暂停,有什么区别?wait放开手去睡,放开手里的锁sleep握紧手去睡,醒了手里还有锁。

并发和并行

并发:同一时刻多个线程在访问同一个资源,多个线程对一个点 例子:小米9今天上午10点,限量抢购 春运抢票 电商秒杀…

并行:多项工作一起执行,之后再汇总 例子:泡方便面,电水壶烧水,一边撕调料倒入桶中

Java 线程创建方式 🤔

继承 Thread

// 创建线程对象
Thread t = new Thread() {
// 重写run方法为业务逻辑
public void run() {
 		System.out.println("Hello MyThread!");
 }
};
// 启动线程
t.start();

实现 Runnable 接口配合 Thread

Runnable runnable = new Runnable() {
public void run(){
 		System.out.println("Hello MyRun!");
 }
};
// 创建线程对象
Thread t = new Thread( runnable );
// 启动线程
t.start();

// ⚠️Java 8 以后可以使用 lambda 精简代码(推荐)
new Thread(()->{
    System.out.println("Hello MyThread!");
},"t").start();

FutureTask 配合 Thread 🤔

// FutureTask是Runnable的实现类,并有Callable的构造函数
FutureTask<Integer> futureTask = new FutureTask<>(() -> {
 System.out.println("Hello");
 return 100;
});
// 参数1 是任务对象; 参数2 是线程名字,推荐
new Thread(futureTask, "t3").start();
// 主线程阻塞,同步等待 task 执行完毕的结果
System.out.println("结果是: " + futureTask.get());

什么是FutureTask?

未来的任务,用它就干一件事,异步调用。main方法就像一个冰糖葫芦,一个个方法由main串起来。但解决不了一个问题:正常调用挂起堵塞问题。例子:老师上着课,口渴了,去买水不合适,讲课线程继续,我可以单起个线程找班长帮忙买水,水买回来了放桌上,我需要的时候再去get。

一些细节 🤔

  • get()方法一般放在最后一行
  • 只计算一次

一般FutureTask多用于耗时的计算,主线程可以在完成自己的任务后,再去获取结果。仅在计算完成时才能检索结果;一旦计算完成,就不能再重新开始或取消计算get方法而获取结果只有在计算完成时获取,否则会一直阻塞直到任务转入完成状态,

Lock接口

锁实现提供了比使用同步方法和语句可以获得的更广泛的锁操作。它们允许更灵活的结构,可能具有非常不同的属性,并且可能支持多个关联的条件对象。

ReentrantLock实现类

lock的新特性:通过设置多个condition可以实现 精准通知 精准唤醒

ReentrantLock可重入锁,如何使用:

class X {
  private final ReentrantLock lock = new ReentrantLock();
  // ...
  public void m() {
   lock.lock(); // block until condition holds
   try {
    // ... method body
   } finally {
    lock.unlock()
   }
  }
 }

synchronized与Lock的区别

  1. 首先synchronized是java内置关键字,在jvm层面,Lock是个java类;

  2. synchronized无法判断是否获取锁的状态,Lock可以判断是否获取到锁;

  3. synchronized会自动释放锁(a 线程执行完同步代码会释放锁 ;b 线程执行过程中发生异常会释放锁),Lock需在finally中手工释放锁(unlock()方法释放锁),否则容易造成线程死锁;

  4. 用synchronized关键字的两个线程1和线程2,如果当前线程1获得锁,线程2线程等待。如果线程1阻塞,线程2则会一直等待下去,而Lock锁就不一定会等待下去,如果尝试获取不到锁,线程可以不用一直等待就结束了;

  5. synchronized的锁可重入、不可中断、非公平,而Lock锁可重入、可判断、可公平(两者皆可)

  6. 在高争用 高耗时的环境下synchronized效率更高,在低争用 低耗时的环境下Lock (CAS)效率更高

    • synchronized到重量级之后是等待队列(不消耗CPU), 而CAS(等待期间消耗CPU)
    • 一切以实测为准

接口里是否能有实现方法

  • default方法

  • 静态方法实现:接口新增

线程间通信 🤔

面试题:四个线程打印

现在4个线程,可以操作初始值为0的一个变量。实现两个线程对该变量加1,两个线程对该变量减1,实现交替加减10轮。

1.高内聚低耦合前提下,线程操作资源类

2.判断/干活/通知

3.多线程交互中,必须要防止多线程的虚假唤醒,也即(判断只用while,不能用if)

synchronized版

image-20200830172609995

⚠️:if 情况下,同时唤醒了两个+1的线程,并且没有被拉回再次判断导致shareData+1+1。 code

public class ThreadWaitNotifyDemo {
    public static void main(String[] args) {
        ShareData shareData = new ShareData();
        new Thread(() -> {
            // method
            for (int i = 0; i < 10; i++) {
                try {
                    shareData.increment();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "A").start();
        new Thread(() -> {
            // method
            for (int i = 0; i < 10; i++) {
                try {
                    shareData.decrement();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "B").start();
        new Thread(() -> {
            // method
            for (int i = 0; i < 10; i++) {
                try {
                    shareData.increment();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "C").start();
        new Thread(() -> {
            // method
            for (int i = 0; i < 10; i++) {
                try {
                    shareData.decrement();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "D").start();
    }

}

class ShareData {
    private int number = 0;

    public synchronized void increment() throws InterruptedException {
        //1 判断
        while (number != 0) {
            this.wait();
        }
        //2 干活
        number++;
        System.out.println(Thread.currentThread().getName() + ": " + number);
        //3 通知
        this.notifyAll();
    }

    public synchronized void decrement() throws InterruptedException {
        //1 判断
        while (number == 0) {
            this.wait();
        }
        //2 干活
        number--;
        System.out.println(Thread.currentThread().getName() + ": " + number);
        //3 通知
        this.notifyAll();
    }
}

lock版 🤔

image-20200830173333135

code

class ShareData {
    private int number = 0;
    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    public void increment() throws InterruptedException {
        lock.lock();
        try {
            //1 判断
            while (number != 0) {
                condition.await();
            }
            //2 干活
            number++;
            System.out.println(Thread.currentThread().getName() + ": " + number);
            //3 通知
            condition.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
    public void decrement() throws InterruptedException {
        lock.lock();
        try {
            //1 判断
            while (number == 0) {
                condition.await();
            }
            //2 干活
            number--;
            System.out.println(Thread.currentThread().getName() + ": " + number);
            //3 通知
            condition.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}

面试题:线程间定制化调用通信

lock的新特性:通过设置多个condition可以实现 精准通知 精准唤醒 code

并发编程口诀:

1.高内聚低耦合前提下,线程操作资源类

2.判断/干活/通知

3.多线程交互中,必须要防止多线程的虚假唤醒,也即(判断只用while,不能用if)

4.注意标志位的修改和定位

题目:多线程之间按顺序调用,实现A->B->C。三个线程启动,要求如下:

  1. AA打印5次,BB打印10次,CC打印15次
  2. 按此顺序打印10轮

面试题:多线程锁

code

1. 标准访问,先打印短信还是邮件
   - 邮件
2. 停4秒在短信方法内,先打印短信还是邮件
   - 邮件
3.  普通的hello方法,是先打短信还是hello
   - Hello
4.  现在有两部手机,先打印短信还是邮件
   - 短信
5.  两个静态同步方法,1部手机,先打印短信还是邮件
   - 邮件
6.  两个静态同步方法,2部手机,先打印短信还是邮件
   - 邮件
7. 1个静态同步方法,1个普通同步方法,1部手机,先打印短信还是邮件
   - 短信(也不是同一个锁,一个是类一个是实例)
8. 1个静态同步方法,1个普通同步方法,2部手机,先打印短信还是邮件
   - 短信

synchronized实现同步的基础

Java中的每一个对象都可以作为锁。具体表现为以下3种形式:

  1. 对于普通同步方法,锁是当前实例对象。
  2. 对于静态同步方法,锁是当前类的Class对象。
  3. 对于同步方法块,锁是Synchonized括号里配置的对象。

详细分析

  1. 锁的是当前对象this,被锁定后,其它的线程都不能进入到当前对象的其它的synchronized方法。
    • 一个对象里面如果有多个synchronized方法,某一个时刻内,只要一个线程去调用其中的一个synchronized方法了,其它的线程都只能等待,换句话说,某一个时刻内,只能有唯一一个线程去访问这些synchronized方法。
  2. 加个普通方法Hello()后发现和同步锁无关。

  3. 换成两个对象后,不是同一把锁了,情况立刻变化。这两把锁是两个不同的对象,所以静态同步方法与非静态同步方法之间是不会有竞态条件的。
    • 也就是说如果一个实例对象的非静态同步方法获取锁后,该实例对象的其他非静态同步方法必须等待获取锁的方法释放锁后才能获取锁,可是别的实例对象的非静态同步方法因为跟该实例对象的非静态同步方法用的是不同的锁,所以无须等待该实例对象已获取锁的非静态同步方法释放锁就可以获取他们自己的锁。
  4. 所有的静态同步方法用的也是同一把锁——类对象本身
    • 一旦一个静态同步方法获取锁后,其他的静态同步方法都必须等待该方法释放锁后才能获取锁,而不管是同一个实例对象的静态同步方法之间,还是不同的实例对象的静态同步方法之间,只要它们同一个类的实例对象!
  5. 当一个线程试图访问同步代码块时,它首先必须得到锁,退出或抛出异常时必须释放锁。

集合类不安全

list不安全

请举例说明list集合类是不安全的 code

public class NotSafeDemo {
    public static void main(String[] args) {
        List<String> list = new ArrayList<>(); // ArrayList线程不安全

        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                // method
                list.add(UUID.randomUUID().toString().substring(0,8));
                System.out.println(list);
            }, String.valueOf(i)).start();
}}}
运行结果:⚠️ java.util.ConcurrentModificationException
[139dc871, 59b54246, c2feb94e]
[139dc871, 59b54246, c2feb94e, 022e001a, 7038fae8, d2f4a59e, d1458001]
[139dc871, 59b54246, c2feb94e, 022e001a, 7038fae8, d2f4a59e, d1458001, 7ea4371f, 7690e39c, ac2b872f]
[139dc871, 59b54246, c2feb94e, 022e001a, 7038fae8, d2f4a59e]
Exception in thread "6" [139dc871, 59b54246, c2feb94e]
Exception in thread "9" [139dc871, 59b54246, c2feb94e, 022e001a, 7038fae8]
[139dc871, 59b54246, c2feb94e, 022e001a]java.util.ConcurrentModificationException
[139dc871, 59b54246, c2feb94e]
	at java.base/java.util.ArrayList$Itr.checkForComodification(ArrayList.java:1042)
	at java.base/java.util.ArrayList$Itr.next(ArrayList.java:996)
	at java.base/java.util.AbstractCollection.toString(AbstractCollection.java:472)
	at java.base/java.lang.String.valueOf(String.java:2951)
	at java.base/java.io.PrintStream.println(PrintStream.java:897)
	at main.java.com.silince.juc.NotSafeDemo.lambda$main$0(NotSafeDemo.java:22)
	at java.base/java.lang.Thread.run(Thread.java:834)

分析

1)故障现象

java.util.ConcurrentModificationException

2)导致原因

ArrayList在迭代的时候如果同时对其进行修改就会抛出java.util.ConcurrentModificationException(并发修改异常)。

3)解决方案

  1. 把ArrayList改为Vector

  2. 小数据量时候,使用Collections工具类

    // 解决方案
    List<Object> list = Collections.synchronizedList(new ArrayList<>());
    Map<String, String> map = Collections.synchronizedMap(new HashMap<String, String>());
    Set<Object> set = Collections.synchronizedSet(new HashSet<>());
    
  3. 使用java.util.concurrent.CopyOnWriteArrayList ⚠️推荐,底层用的是lock锁

    // CopyOnWriteArrayList 底层用的是lock锁
    List<String> list = new CopyOnWriteArrayList<>();  
    

    原理:写时复制,读写分离。 🤔

    CopyOnWrite容器即写时复制的容器。往一个容器添加元素的时候,不直接往当前容器Object[]添加,而是先将当前容器Object[]进行Copy,复制出一个新的容器Object[] newElements,然后向新的容器Object[] newElements里添加元素。添加元素后,再将原容器的引用指向新的容器setArray(newElements)。这样做的好处是可以对CopyOnWrite容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。所以CopyOnWrite容器也是一种读写分离的思想,读和写不同的容器。

    // CopyOnWriteArrayList
    public class CopyOnWriteArrayList<E>
        implements List<E>, RandomAccess, Cloneable, java.io.Serializable {
        private static final long serialVersionUID = 8673264195747942595L;
       
        /** The lock protecting all mutators */
        final transient ReentrantLock lock = new ReentrantLock();
       
        /** The array, accessed only via getArray/setArray. */
        private transient volatile Object[] array; // ⚠️ 底层是一个Object类的数组
      	...
        // add方法底层源码
        public boolean add(E e) {
          final ReentrantLock lock = this.lock;
          lock.lock(); // 锁定资源类
          try {
            Object[] elements = getArray();// 先获取一份原来的数据,之后再开始添加
            int len = elements.length;
            Object[] newElements = Arrays.copyOf(elements, len + 1); // ⚠️扩容方式 Arrays.copyOf(),每次扩 1 个
            newElements[len] = e; 
            setArray(newElements); // 添加完成后替换原来的Object[]
            return true;
          } finally {
            lock.unlock();
          }
        }
    }
    

set不安全

Set set = new HashSet();//线程不安全 
Set set = new CopyOnWriteArraySet();//线程安全 

HashSet底层数据结构是什么?

ANSWER:HashMap。 但HashSet的add是放一个值,而HashMap是放K、V键值对。

// 源码解析
// HashSet底层就是HashMap
public HashSet() {    
  map = new HashMap();
} 
// ⚠️ HashSet的add()方法调的就是HashMap的put()方法,只是value默认为上面定义的常量Object
private static final Object PRESENT = new Object();
public boolean add(E e) {    return map.put(e, PRESENT)==null;}

map不安全 🤔

Map map = new HashMap();//线程不安全 
Map map = new ConcurrentHashMap();//线程安全 ⚠️ 换了个命名方式	

java8中HashMap底层数据结构是什么?

  • node类型的数组 + node类型的单向链表(尾插法) + 红黑树(链表长度>8时变为红黑树)

  • HashMap的初始长度为16,负载因子为0.75;也可以自己设定。

    • 长度超过16*0.75=12的时候扩容
    • ArrayList扩容原长度的一半,HashMap扩容至原来的两倍
    • 优化:把初始值设的大一点

    image-20200831205042601

JUC强大的辅助类讲解

CountDownLatch减少计数

例子(做减法):6个同学各上各的自习,中间没有交互。都走完了班长才能关门 code

  • CountDownLatch主要有两个方法,当一个或多个线程调用await方法时,这些线程会阻塞。
  • 其它线程调用countDown方法会将计数器减1(调用countDown方法的线程不会阻塞),
  • 当计数器的值变为0时,因await方法阻塞的线程会被唤醒,继续执行。

CyclicBarrier循环栅栏

例子(做加法):集齐七颗龙珠就能召唤神龙 code

  • CyclicBarrie的字面意思是可循环(Cyclic)使用的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞, 直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。
  • 线程进入屏障通过CyclicBarrier的await()方法。

Semaphore信号量

例子:抢车位,7个车线程抢4个车位,然后持有3秒,走一个进一个直到每个车都抢到一次。 code

在信号量上我们定义两种操作:

  • acquire(获取) 当一个线程调用acquire操作时,它要么通过成功获取信号量(信号量减1),要么一直等下去,直到有线程释放信号量,或超时。

  • release(释放)实际上会将信号量的值加1,然后唤醒等待的线程。
  • 信号量主要用于两个目的,一个是用于多个共享资源的互斥使用,另一个用于并发线程数的控制。

LockSupport

用两个线程,一个输出字母,一个输出数字,交替输出1A2B3C…26Z code

LockSupport 和 CAS 是Java并发包中很多并发工具控制机制的基础,它们底层其实都是依赖Unsafe实现。

LockSupport类的核心方法其实就两个:

  • park() 方法用来阻塞当前调用线程。
  • unpark() 方法用于唤醒指定线程。 ⚠️ 可以先唤醒,那么下次的park()就不阻塞;notify()则不行。

这其实和Object类的wait()和notify()方法有些类似,但是LockSupport的这两种方法从语意上讲比Object类的方法更清晰,而且可以针对指定线程进行阻塞和唤醒。

读写锁ReentrantReadWriteLock

源码分析:https://blog.csdn.net/fxkcsdn/article/details/82217760

多个线程同时读一个资源类没有任何问题,所以为了满足并发量,读取共享资源应该可以同时进行。但是如果有一个线程想去写共享资源,就不应该再有其它线程可以对该资源进行读或写。 code

总结:

  • 上锁和释放锁都是通过AQS来实现的

  • 线程进入读锁的前提条件:
    • 没有其他线程的写锁
    • 没有写请求或者有写请求,但调用线程和持有锁的线程是同一个
  • 线程进入写锁的前提条件:
    • 没有其他线程的读锁
    • 没有其他线程的写锁
  • 而读写锁有以下三个重要的特性:
    • 公平选择性:支持非公平(默认)和公平的锁获取方式,吞吐量还是非公平优于公平。
      • 如果新来的线程必须先入队才能获取锁就是公平的,否则就是非公平的。
    • 读锁和写锁都支持线程重进入。可重入就是说某个线程已经获得某个锁,可以再次获取锁而不会出现死锁
    • 锁降级:遵循获取写锁、获取读锁再释放写锁的次序,写锁能够降级成为读锁。

阻塞队列BlockingQueue

阻塞队列是一个队列,在数据结构中起的作用如下图:

image-20200901130348078

  • 线程1往阻塞队列里添加元素,线程2从阻塞队列里移除元素
  • 试图从空的队列中获取元素的线程将会被阻塞,直到其他线程往空的队列插入新的元素
  • 试图向已满的队列中添加新元素的线程将会被阻塞,直到其他线程从队列中移除一个或多个元素或者完全清空,使队列变得空闲起来并后续新增

阻塞队列的用处

在多线程领域:所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤起。

好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切BlockingQueue都给你一手包办了。在concurrent包发布以前,在多线程环境下,我们每个程序员都必须去自己控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。

架构梳理、种类分析

  • ArrayBlockingQueue:由数组结构组成的有界阻塞队列。
  • LinkedBlockingQueue:由链表结构组成的有界(但大小默认值为integer.MAX_VALUE)阻塞队列。
  • PriorityBlockingQueue:支持优先级排序的无界阻塞队列。
  • DelayQueue:使用优先级队列实现的延迟无界阻塞队列。
  • SynchronousQueue:不存储元素的阻塞队列,也即单个元素的队列。
  • LinkedTransferQueue:由链表组成的无界阻塞队列。
  • LinkedBlockingDeque:由链表组成的双向阻塞队列。

BlockingQueue核心方法

image-20200901131722246

抛出异常

  • 当阻塞队列满时,再往队列里add插入元素会抛IllegalStateException:Queue full
  • 当阻塞队列空时,再往队列里remove移除元素会抛NoSuchElementException

特殊值

  • 插入方法,成功ture失败false
  • 移除方法,成功返回出队列的元素,队列里没有就返回null

一直阻塞

  • 当阻塞队列满时,生产者线程继续往队列里put元素,队列会一直阻塞生产者线程直到put数据或响应中断退出
  • 当阻塞队列空时,消费者线程试图从队列里take元素,队列会一直阻塞消费者线程直到队列可用

超时退出

  • 当阻塞队列满时,队列会阻塞生产者线程一定时间,超过限时后生产者线程会退出

ThreadPool线程池 🤔

线程池 = 一堆线程+ 一个等待队列

为什么用线程池?

例子:10年前单核CPU电脑,假的多线程,像马戏团小丑玩多个球,CPU需要来回切换。现在是多核电脑,多个线程各自跑在独立的CPU上,不用切换效率高。 code

线程池的优势:

线程池做的工作只要是控制运行的线程数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数量超过了最大数量,超出数量的线程排队等候,等其他线程执行完毕,再从队列中取出任务来执行。

它的主要特点为:线程复用;控制最大并发数;管理线程。

第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的销耗。

第二:提高响应速度。当任务到达时,任务可以不需要等待线程创建就能立即执行。

第三:提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会销耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

线程池如何使用

Java中的线程池是通过Executor框架实现的,该框架中用到了Executor,Executors(工具类),ExecutorService,ThreadPoolExecutor这几个类

image-20200901144207635

  • Executors.newFixedThreadPool(int)
    • 执行长期任务性能好,创建一个线程池,一池有N个固定的线程,有固定线程数的线程
  • Executors.newSingleThreadExecutor()
    • 一个任务一个任务的执行,一池一线程
  • Executors.newCachedThreadPool()
    • 执行很多短期异步任务,线程池根据需要创建新线程,但在先前构建的线程可用时将重用它们。可扩容,遇强则强。
  • Executors.newScheduledThread(int corePoolSize)
    • 返回的是ScheduledExecutorService 对象,可以指定线程数量。和FixedXxx 不同的是,该对象可以根据时间需要对线程进行调度
  • Executors.SingleThreadScheduledExecutor()
    • 和上述类似,只不过单池单线程,依旧可以进行时间调度;(在某个固定的延时过后进行执行,或者周期性执行)

线程池底层工作原理

线程池的底层实现

newFixedThreadPool(int)、newSingleThreadExecutor()、newCachedThreadPool()底层实现都是newFixedThreadPool。

image-20200901152207257

线程池几个重要参数

  1. corePoolSize:线程池中的常驻核心线程数
  2. maximumPoolSize:线程池中能够容纳同时执行的最大线程数,此值必须大于等于1
    • ⚠️ CPU密集型:设为 CPU内核数 + 1;Runtime.getRuntime().availableProcessors()+1
    • IO密集型:设为CPU核数/阻塞系数
  3. keepAliveTime:多余的空闲线程的存活时间当前池中线程数量超过corePoolSize时,当空闲时间达到keepAliveTime时,多余线程会被销毁直到只剩下corePoolSize个线程为止
  4. unit:keepAliveTime的单位
  5. workQueue:任务队列,被提交但尚未被执行的任务
  6. threadFactory:表示生成线程池中工作线程的线程工厂,用于创建线程,一般默认的即可
  7. handler:拒绝策略,表示当队列满了,并且工作线程大于等于线程池的最大线程数(maximumPoolSize)时如何来拒绝请求执行的runnable的策略
// ThreadPoolExecutor 构造方法    
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

底层原理 🤔

  1. 在创建了线程池后,开始等待请求。
  2. 当调用execute()方法添加一个请求任务时,线程池会做出如下判断:
    • 如果正在运行的线程数量小于corePoolSize,那么马上创建线程运行这个任务;
    • 如果正在运行的线程数量大于或等于corePoolSize,那么将这个任务放入队列;
    • 如果这个时候队列满了且正在运行的线程数量还小于maximumPoolSize,那么还是要创建非核心线程立刻运行这个任务;
    • 如果队列满了且正在运行的线程数量大于或等于maximumPoolSize,那么线程池会启动饱和拒绝策略来执行。
  3. 当一个线程完成任务时,它会从队列中取下一个任务来执行。
  4. 当一个线程无事可做超过一定的时间(keepAliveTime)时,线程会判断:
    • 如果当前运行的线程数大于corePoolSize,那么这个线程就被停掉。
    • 所以线程池的所有任务完成后,它最终会收缩到corePoolSize的大小

image-20200901164445912

image-20200901164926612

线程池用哪个?生产中如设置合理参数 🤔

线程池的拒绝策略

  • AbortPolicy(默认):直接抛出RejectedExecutionException异常阻止系统正常运行
  • CallerRunsPolicy:“调用者运行”一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者(比如main线程),从而降低新任务的流量。
  • DiscardOldestPolicy抛弃队列中等待最久的任务,然后把当前任务加人队列中尝试再次提交当前任务。
  • DiscardPolicy:该策略偷偷地丢弃无法处理的任务,不予任何处理也不抛出异常。如果允许任务丢失,这是最好的一种策略。

在工作中单一的/固定数的/可变的三种创建线程池的方法哪个用的多?超级大坑 🤔

ASNSWER:答案是一个都不用,我们工作中只能使用自定义的.

image-20200901172248771

newFixedThreadPool()的详细说明: 可重用固定个数的线程池,当当前线程数大于总线程数时会进行等待,等待线程池内的线程执行完,相对来说占用内存少,因为如果等待线程数量过多,也是相对耗废资源的。

// newFixedThreadPool 构造方法
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
// 其中new的阻塞队列, ⚠️ 默认允许的请求队列长度为 Integer.MAX_VALUE!
public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }   

在工作中如何使用线程池,是否自定义过线程池

public class MyThreadPoolDemo2 {
    public static void main(String[] args) {
        ExecutorService threadPool = new ThreadPoolExecutor(2,
                Runtime.getRuntime().availableProcessors()+1, // ⚠️ CPU密集型:CPU内核数 + 1;IO密集型:CPU核数/阻塞系数
                2L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(3), //阻塞队列长度为3,默认的是Integer.MAX_VALUE
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());

        try {
            // ⚠️ 可执行最大线程数= maximumPoolSize + capacity
            for (int i = 0; i < 9; i++) {
                threadPool.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + "\t办理业务");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            threadPool.shutdown();
        }
    }
}

Error:java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)

分支合并框架

原理:

  • Fork:把一个复杂任务进行分拆,大事化小

  • Join:把分拆任务的结果进行合并

相关类

  • ForkJoinPool:分支合并池 类比=> 线程池
  • ForkJoinTask:ForkJoinTask 类比=> FutureTask
  • RecursiveTask:递归任务:继承后可以实现递归(自己调自己)调用的任务

例子

// 实现1+2+...+100 的分治计算
public class ForkJoinDemo {
    public static void main(String[] args) throws Exception {
        MyTask myTask = new MyTask(0, 100);
        ForkJoinPool threadPool = new ForkJoinPool();

        // 向池子递交任务
        ForkJoinTask<Integer> forkJoinTask = threadPool.submit(myTask);
        System.out.println(forkJoinTask.get());

        threadPool.shutdown();
    }
}

class MyTask extends RecursiveTask<Integer> {
    private static final Integer ADJUST_VALUE = 10;
    private int begin;
    private int end;
    private int result;

    public MyTask(int begin, int end) {
        this.begin = begin;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        // 计算小时不触发分治
        if ((end - begin) <= ADJUST_VALUE) {
            for (int i = begin; i <= end; i++) {
                result = result + i;
            }
        } else {
            int middle = (end + begin) / 2;
            MyTask task01 = new MyTask(begin, middle);
            MyTask task02 = new MyTask(middle + 1, end);
            task01.fork(); // 调完fork还是回来调用compute()
            task02.fork();
            result = task01.join() + task02.join();
        }
        return result;
    }
}

异步回调

原理

同步、异步、异步回调

public class CompletableFutureDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 异步回调 无返回值
        CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "没有返回, update mysql ok");
        });
        System.out.println(completableFuture.get());

        // 异步回调 有返回值
        CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "有返回, insert mysql ok");
            int age=10/0;
            return 1024;
        });
        completableFuture1.whenComplete((t,u)->{
            System.out.println("***t: "+t);
            System.out.println("***u: "+u);
        }).exceptionally(f->{  //异常分支
            System.out.println("***exception: "+f.getMessage());
            return 4444;
        }).get();
    }
}

正常回调结果:

ForkJoinPool.commonPool-worker-1没有返回, update mysql ok null ForkJoinPool.commonPool-worker-1有返回, insert mysql ok **t: 1024 **u: null

异常回调结果:

ForkJoinPool.commonPool-worker-1没有返回, update mysql ok null ForkJoinPool.commonPool-worker-1有返回, insert mysql ok **t: null **u: java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero ***exception: java.lang.ArithmeticException: / by zero