多线程高级JUC


0、只要是并发编程,一定要加锁

一、概念

1、什么是JUC

在 Java 5.0 提供了 java.util.concurrent (简称JUC )包,在此包中增加了在并发编程中很常用的实用工具类,用于定义类似于线程的自定义子系统,包括线程池、异步 IO 和轻量级任务框架。提供可调的、灵活的线程池。还提供了设计用于多线程上下文中的 Collection 实现等。

2、什么是进程、线程

进程:一个程序,通常包含多个线程,至少包含一个!

java默认有2个线程:main,gc

线程:对java而言,Thread,Runnable,Callable

Java实际上无法开启线程,因为java是运行在虚拟机上的,无法直接操作硬件。是通过调用native修饰(C或C++)的本地方法。

3、并发、并行

并发:多线程运行在同一个CPU内,CPU不停做上下文切换执行任务,单位时间内只有一个线程在运行;

并行:多线程运行在多核心CPU内,同时执行任务;

4、Java的多线程是并发还是并行:

当多线程在跑时,所有的CPU核心都有负载,也就是说,线程确实是并行的。解释一下,现代操作系统是将线程最为最小的调度单位(由内核负责管理),进程作为资源分配的最小单位。由于进程是不活动的,只是纯粹作为存储线程的容器。也就是说,Java线程尽管被包裹在JVM进程内部,但是CPU调度的是进程中的线程。

5、线程状态

public enum State {
    /**
    * 新生
    */
    NEW,

    /**
    * 运行
    */
    RUNNABLE,

    /**
    * 阻塞
    */
    BLOCKED,

    /**
    * 等待,死死地等
    */
    WAITING,

    /**
    * 超时等待
    */
    TIMED_WAITING,

    /**
    * 终止
    */
    TERMINATED;
}

6、Lambda 表达式如何操作局部变量

  1. 使用实例变量或静态变量是没有限制的(可认为是通过 final 类型的局部变量 this 来引用前两者)
  2. 使用局部变量必须显式的声明为 final 或实际效果的的 final 类型,即该变量从未被改变过,可用一个 final 修饰的中间变量。

注:闭包,jdk1.8可以不写final,但是其实是默认有的,jvm会加上去

7、可能有用的方法

//查看系统的CPU核数
System.out.println(Runtime.getRuntime().availableProcessors());

//线程状态
String s = Thread.State.NEW.toString();

//线程休眠1秒
Thread.sleep(1000);
TimeUnit.SECONDS.sleep(1);  //企业常用

二、创建线程的三种方式

1、继承Thread类,重写run()方法:

①、定义类继承Thread;

②、复写T=hread类中的run方法;

  目的:将自定义代码存储在run方法,让线程运行

③、调用线程的start方法:

  该方法有两步:启动线程,调用run方法。

2、实现Runnable接口,实现接口run()方法:

接口应该由那些打算通过某一线程执行其实例的类来实现。类必须定义一个称为run 的无参方法。

①、定义类实现Runnable接口

②、覆盖Runnable接口中的run方法

  将线程要运行的代码放在该run方法中。

③、通过Thread类建立线程对象。

④、将Runnable接口的子类对象作为实际参数传递给Thread类的构造函数。

  自定义的run方法所属的对象是Runnable接口的子类对象。所以要让线程执行指定对象的run方法就要先明确run方法所属对象

⑤、调用Thread类的start方法开启线程并调用Runnable接口子类的run方法。

3、实现Callable接口,实现call()方法:

①、创建Callable接口的实现类,并实现call()方法,改方法将作为线程执行体,且具有返回值

②、创建Callable实现类的实例,使用FutrueTask类进行包装Callable对象,FutureTask对象封装了Callable对象的call()方法的返回值

③、使用FutureTask对象作为Thread对象启动新线程。

④、调用FutureTask对象的get()方法获取子线程执行结束后的返回值。

public class CallableTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // new Thread(new Runnable()).start();
        // new Thread(new FutureTask<V>()).start();
        // new Thread(new FutureTask<V>( Callable )).start();

        MyThread thread = new MyThread();
        FutureTask<Integer> futureTask = new FutureTask<>(thread); // 适配类

        new Thread(futureTask,"A").start();
        new Thread(futureTask,"B").start(); // 结果会被缓存,效率高

        //获取方法返回结果
        //这个get 方法可能会产生阻塞!把他放到最后
        // 或者使用异步通信来处理!
        Integer o = futureTask.get(); 
        System.out.println(o);

    }
}

//该泛型为返回值类型
class MyThread implements Callable<Integer> {

    @Override
    public Integer call() {
        System.out.println("call()"); // 会打印几个call
        // 耗时的操作
        return 1024;
    }

}

三种方式区别

主要是单继承局限性、有无返回值、可否抛出异常3点上

(1)继承Thread:线程代码存放在Thread子类run方法中。

优势:编写简单,可直接用this.getname()获取当前线程,不必使用Thread.currentThread()方法。

劣势:已经继承了Thread类,无法再继承其他类。

(2)实现Runnable:线程代码存放在接口的子类的run方法中。

优势:避免了单继承的局限性、多个线程可以共享一个target对象,非常适合多线程处理同一份资源的情形。

劣势:比较复杂、访问线程必须使用Thread.currentThread()方法、无返回值。(其实runnable 也可以有返回值,通过构造方法传进去,等待线程结束。)

(3)实现Callable

优势:有返回值(用来判断线程是否已经执行完毕或者取消线程执行)、避免了单继承的局限性、多个线程可以共享一个target对象,非常适合多线程处理同一份资源的情形。

劣势:比较复杂、访问线程必须使用Thread.currentThread()方法

(4)异常

Callable()的call()方法可以抛出异常(在调用类中用try/catch捕获),而Runnable()的run()方法不可以

  建议使用实现接口的方式创建多线程。

三、常用辅助类(必会)

1、CountDownLatch(减法计数器)

每次有线程调用 countDown() 数量-1,假设计数器变为0,countDownLatch.await() 就会被唤醒,继续 执行!

指定线程执行完毕,在执行操作

// 计数器
// 场景:等6个人全部出教室在关门。
public class CountDownLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        // 总数是6,必须要执行任务的时候,再使用!
        CountDownLatch countDownLatch = new CountDownLatch(6);

        for (int i = 1; i <=6 ; i++) {
            new Thread(()->{
                System.out.println(Thread.currentThread().getName()+" Go out");
                countDownLatch.countDown(); // 数量-1
            },String.valueOf(i)).start();
        }

        countDownLatch.await(); // 等待计数器归零,然后再向下执行

        System.out.println("Close Door");

    }
}
1 Go out
6 Go out
3 Go out
5 Go out
4 Go out
2 Go out
Close Door

CountDownLatch原理

CountDownLatch构造函数:CountDownLatch(int count);

  • 构造器中计数值(count)就是闭锁需要等待的线程数量,这个值只能被设置一次。

CountDownLatch类的方法:

  • void await():使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断。
  • boolean await(long timeout, TimeUnit unit):使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断或超出了指定的等待时间。
  • void countDown():递减锁存器的计数,如果计数到达零,则释放所有等待的线程。
  • long getCount():返回当前计数。
  • String toString():返回标识此锁存器及其状态的字符串。

与CountDownLatch第一次交互是主线程等待其它的线程,主线程必须在启动其它线程后立即调用await方法,这样主线程的操作就会在这个方法上阻塞,直到其他线程完成各自的任务。

其他的N个线程必须引用闭锁对象,因为他们需要通知 CountDownLatch 对象,他们已经完成了各自的任务,这种机制就是通过 countDown() 方法来完成的。每调用一次这个方法,在构造函数中初始化的count值就减1,所以当N个线程都调用了这个方法count的值等于0,然后主线程就能通过await方法,恢复自己的任务。

2、CyclicBarrier(加法计数器)

当计数器加到指定值时,开启指定线程。

每次线程执行到 cyclicBarrier.await(); 时,如果没累加到指定数值,该线程就会等待,直到累加到指定数值才会继续往下执行。

执行达到指定线程数在执行操作

public class CyclicBarrierDemo {
    public static void main(String[] args) {
        /**
         * 集齐7颗龙珠召唤神龙
         */
        // 召唤龙珠的线程,当计数器累加到7时执行
        CyclicBarrier cyclicBarrier = new CyclicBarrier(7,()->{
            System.out.println("召唤神龙成功!");
        });

        for (int i = 1; i <=7 ; i++) {
            final int temp = i;
            // lambda能操作到 i 吗
            new Thread(()->{
                System.out.println(Thread.currentThread().getName()+"收集"+temp+"个龙珠");
                try {
                    cyclicBarrier.await(); // 等待,计数器+1
                    System.out.println("召唤成功"+temp);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}
Thread-1收集2个龙珠
Thread-2收集3个龙珠
Thread-4收集5个龙珠
Thread-0收集1个龙珠
Thread-5收集6个龙珠
Thread-3收集4个龙珠
Thread-6收集7个龙珠
召唤神龙成功!
召唤成功7
召唤成功5
召唤成功2
召唤成功3
召唤成功4
召唤成功6
召唤成功1

3、Semaphore(信号量)

限流,只允许指定个数的线程同时运行

相当于令牌桶,指定桶里有几个令牌,使用完放回

原理
semaphore.acquire() 获得,假设如果已经满了,等待,等待被释放为止!

semaphore.release(); 释放,会将当前的信号量释放 + 1,然后唤醒等待的线程!
作用: 多个共享资源互斥的使用!并发限流,控制大的线程数!

// 案例:抢车位,6辆车抢3个车位
public class SemaphoreDemo {
    public static void main(String[] args) {
        // 线程数量:停车位! 限流!
        Semaphore semaphore = new Semaphore(3);

        for (int i = 1; i <=6 ; i++) {
            new Thread(()->{
                // acquire() 得到
                try {
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName()+"抢到车位");
                    TimeUnit.SECONDS.sleep(2);
                    System.out.println(Thread.currentThread().getName()+"离开车位");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    semaphore.release(); // release() 释放
                }

            },String.valueOf(i)).start();
        }

    }
}
1抢到车位
3抢到车位
4抢到车位
4离开车位
1离开车位
5抢到车位
3离开车位
2抢到车位
6抢到车位
6离开车位
5离开车位
2离开车位

三、锁

1、synchronized

  • 在修饰非静态方法时,锁的对象是方法的调用者。修饰静态方法时,锁的是类(class)。

  • 虽然锁的是对象,但是不影响普通方法(没加Synchronized)的执行。

  • 当同时锁静态和非静态方法时,其实一个锁的是对象,一个是类,所以2个方法可以同时执行。

  • 谁先获得谁先执行。

  • 可用于修饰方法,也可用作同步代码块synchronized{}

/**
 * 经典多线程卖票问题
 * 真正的多线程开发,公司中的开发,降低耦合性
 * 线程就是一个单独的资源类,没有任何附属的操作!
 * 1、 属性、方法
 */
public class SaleTicketDemo01 {
    public static void main(String[] args) {
        // 并发:多线程操作同一个资源类, 把资源类丢入线程
        Ticket ticket = new Ticket();

        // @FunctionalInterface 函数式接口,jdk1.8  lambda表达式 (参数)->{ 代码 }
        new Thread(()->{
            for (int i = 1; i < 40 ; i++) {
                ticket.sale();
            }
        },"A").start();

        new Thread(()->{
            for (int i = 1; i < 40 ; i++) {
                ticket.sale();
            }
        },"B").start();

        new Thread(()->{
            for (int i = 1; i < 40 ; i++) {
                ticket.sale();
            }
        },"C").start();


    }
}

// 资源类 OOP
class Ticket {
    // 属性、方法
    private int number = 30;

    // 卖票的方式
    // synchronized 本质: 队列,锁
    public synchronized void sale(){
        if (number>0){
            System.out.println(Thread.currentThread().getName()+"卖出了第"+(number--)+"票,剩余:"+number);
        }
    }

}

2、Lock接口

实现类:

(1)ReentrantLock()

(2)ReentrantReadWriteLock.ReadLock

(3)ReentrantReadWriteLock.WriteLock

ReentrantReadWriteLock:实现 ReadWriteLock 接口

ReadLockWriteLock :是内部类,实现 Lock 接口

ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
ReentrantReadWriteLock.ReadLock readLock = readWriteLock.readLock();
ReentrantReadWriteLock.WriteLock writeLock = readWriteLock.writeLock();

方法

//1、获取锁,
//如果锁不可用,则当前线程将被禁用以进行线程调度,并处于休眠状态,直到获取锁。
void lock();

//2、解锁
void unlock();

//3、只有在调用时才可以获得锁。
//如果可用,则获取锁,并立即返回值为true 。 如果锁不可用,则此方法将立即返回值为false 。
boolean tryLock();

//4、如果锁可用,此方法将立即返回值为true 。 
//如果锁不可用,则当前线程将被禁用以进行线程调度,并且处于休眠状态,直至发生三件事情之一: 
//(1)当前线程获取到锁
//(2)一些其他线程interrupts(中断)当前线程,并且中断锁获取被支持
//(3)指定的等待时间过去了 
boolean tryLock(long time,TimeUnit unit);

//此用法可确保锁定已被取消,如果未获取锁定,则不会尝试解锁。
Lock lock = ...;
if (lock.tryLock()) { 
    try { 
        // manipulate protected state 
    } finally { 
        lock.unlock(); 
    } 
} else { 
    // perform alternative actions 
} 

Lock 发展历程:

无锁 -> 独占锁 -> 读写锁 -> 邮戳锁

下面一一介绍

3、ReentrantLock():可重入锁

相对于synchronized而言,较为灵活,没有结构化代码

继承自Lock接口

使用步骤:

1、 new ReentrantLock();
2、 lock.lock(); // 加锁
3、 finally=> lock.unlock(); // 解锁

public class SaleTicketDemo02  {
    public static void main(String[] args) {

        // 并发:多线程操作同一个资源类, 把资源类丢入线程
        Ticket2 ticket = new Ticket2();

        // @FunctionalInterface 函数式接口,jdk1.8  lambda表达式 (参数)->{ 代码 }
        new Thread(()->{for (int i = 1; i < 40 ; i++) ticket.sale();},"A").start();
        new Thread(()->{for (int i = 1; i < 40 ; i++) ticket.sale();},"B").start();
        new Thread(()->{for (int i = 1; i < 40 ; i++) ticket.sale();},"C").start();
    }
}

// Lock三部曲
// 1、 new ReentrantLock();
// 2、 lock.lock(); // 加锁
// 3、 finally=>  lock.unlock(); // 解锁
class Ticket2 {
    // 属性、方法
    private int number = 30;
    Lock lock = new ReentrantLock();

    public void sale(){

        lock.lock(); // 加锁

        try {
           // 业务代码
            if (number>0){
                System.out.println(Thread.currentThread().getName()+"卖出了"+(number--)+"票,剩余:"+number);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock(); // 解锁
        }
    }
}

ReentrantLock其他方法参见文档

4、ReentrantReadWriteLock:读写锁

读写锁定义:一个资源能被多个读线程访问,或者一个写线程访问,但不能读写同时存在(读写互斥,读读共享)

只有在读多写少的情况,读写锁才有较高的性能体现

有时候,在多线程中,有一些公共数据修改的机会比较少,而读的机会却是非常多的,此公共数据的操作基本都是读,如果每次操作都给此段代码加锁,太浪费时间了而且也很浪费资源,降低程序的效率,因为读操作不会修改数据,只是做一些查询,所以在读的时候不用给此段代码加锁,可以共享的访问,只有涉及到写的时候,互斥的访问就好了

缺点:

  • 写锁饥饿问题:大量的读操作一直占用锁,写操作长期无法获取到锁
  • 锁降级
/**
 * 案例:模拟redis的key-value缓存
 * 独占锁(写锁) 一次只能被一个线程占有 ;一个线程写,其他线程不能写或读
 * 共享锁(读锁) 多个线程可以同时占有 ;可多个线程一起读,但读线程和写线程不共存
 * ReadWriteLock
 * 读-读  可以共存!
 * 读-写  不能共存!
 * 写-写  不能共存!
 */
public class ReadWriteLockDemo {
    public static void main(String[] args) {
        MyCache myCache = new MyCache();

        // 写入
        for (int i = 1; i <= 5 ; i++) {
            final int temp = i;
            new Thread(()->{
                myCache.put(temp+"",temp+"");
            },String.valueOf(i)).start();
        }

        // 读取
        for (int i = 1; i <= 5 ; i++) {
            final int temp = i;
            new Thread(()->{
                myCache.get(temp+"");
            },String.valueOf(i)).start();
        }

    }
}

// 加锁的
class MyCacheLock{

    private volatile Map<String,Object> map = new HashMap<>();
    // 读写锁: 更加细粒度的控制
    private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    private Lock lock = new ReentrantLock();

    // 存,写入的时候,只希望同时只有一个线程写
    public void put(String key,Object value){
        readWriteLock.writeLock().lock();
        try {
            System.out.println(Thread.currentThread().getName()+"写入"+key);
            map.put(key,value);
            System.out.println(Thread.currentThread().getName()+"写入OK");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            readWriteLock.writeLock().unlock();
        }
    }

    // 取,读,所有人都可以读!
    public void get(String key){
        readWriteLock.readLock().lock();
        try {
            System.out.println(Thread.currentThread().getName()+"读取"+key);
            Object o = map.get(key);
            System.out.println(Thread.currentThread().getName()+"读取OK");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            readWriteLock.readLock().unlock();
        }
    }

}

/**
 * 不加锁的
 */
class MyCache{

    private volatile Map<String,Object> map = new HashMap<>();

    // 存,写
    public void put(String key,Object value){
        System.out.println(Thread.currentThread().getName()+"写入"+key);
        map.put(key,value);
        System.out.println(Thread.currentThread().getName()+"写入OK");
    }

    // 取,读
    public void get(String key){
        System.out.println(Thread.currentThread().getName()+"读取"+key);
        Object o = map.get(key);
        System.out.println(Thread.currentThread().getName()+"读取OK");
    }

}

锁降级

ReentrantReadWriteLock的锁降级:写锁降级为读锁

ReentrantReadWriteLock重入:该锁是可重入锁,以读写线程为例,读线程在获取读锁之后能再次获取读锁;写线程在获取写锁后能再次获取写锁,也能获取读锁

锁降级:如果同一个线程获取了写锁,然后获取了读锁,再释放了写锁,此时写锁已经降级为读锁了(顺序不能改变)

写锁可以降级为读锁,但是读锁不能升级为写锁

降级是为了让当前线程感知到数据的变化,目的是保证数据可见性。

这个提前释放写锁目的是为了让别的线程可以立刻读吧

ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

lock.writeLock().lock();
System.out.println("写入操作");

lock.readLock().lock();
lock.writeLock().unlock();

System.out.println("读取操作");
lock.readLock().unlock();

不存在锁升级,以下代码发生死锁

ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

lock.readLock().lock();
lock.writeLock().lock();

lock.readLock().unlock();
lock.writeLock().unlock();

5、StampedLock邮戳锁

StampedLock (邮戳锁,也叫票据锁)是jdk8新增的一个读写锁,也是对 ReentrantReadWriteLock 的优化。

stamp (戳记,long类型),代表锁的状态,当 stamp 返回 0 时,表示线程获取锁失败。并且当释放锁或转换锁的时候,都要传入最初获取的 stamp 值。

邮戳锁是为解决读写锁的锁饥饿问题而引出的。

读写锁的锁饥饿问题:读操作较多时,很难获取写锁;如999个读操作,1个写操作,那么这个写线程几乎抢不到写锁。虽然使用公平锁可以一定程度上缓解锁饥饿问题,但是这是以降低吞吐量为代价。

邮戳锁相对于读写锁的优化:

ReentrantReadWriteLock 读锁被占用时,其他线程获取写锁时会被阻塞。

StampedLock 采取乐观获取读锁,其他线程在获取写锁时不会被阻塞,但在读锁结束前需要进行结果的校验,判断期间是否有写操作改变了目标值。

乐观读在短的只读代码中,可以减少争用,提高吞吐量。

使用

  • 所有获取锁的方法都返回一个邮戳(stamp),stamp 为 0,表示获取失败,其余表示成功
  • 所有释放锁的方法都返回一个邮戳(stamp),这个 stamp 必须和成功获取锁时得到的stamp一致;

缺点

  • StampedLock 是不可重入锁,如果持有写锁时,再去获取写锁,则会造成死锁。
  • 悲观读和写锁都不支持条件变量 (Condition)
  • 使用 StampedLock 一定不要调用中断操作,即 interrupt() 方法,影响性能,且会产生奇怪的bug

三种访问模式:

  • 悲观读:同读写锁
  • 写模式:同读写锁
  • 乐观读:采用无锁机制,支持读写并发,乐观的认为读的过程中没人修改,假如被修改了再实现升级为悲观读模式
public class StampedLockTest {

    static int num = 0;

    StampedLock lock  = new StampedLock();

    /**
     * 悲观读
     */
    public void read(){
        long stamp = lock.readLock();

        try {
            System.out.println(Thread.currentThread().getName() + "悲观读:" + num);
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlockRead(stamp);
        }
    }

    /**
     * 写操作
     */
    public void write(){
        long stamp = lock.writeLock();

        try {
            System.out.println(Thread.currentThread().getName() + "写操作");
            TimeUnit.SECONDS.sleep(1);
            num++;
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlockWrite(stamp);
        }
    }

    /**
     * 乐观读操作
     */
    public void optimsticRead(){
        long stamp = lock.tryOptimisticRead();

        try {
            System.out.println(Thread.currentThread().getName() + "乐观读:" + num);
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 判断是否有人修改过
            if (!lock.validate(stamp)){
                // 升级为悲观读,重新读取
                long stamp2 = lock.readLock();

                //重新读取
                try{
                    System.out.println("重读" + num);
                }finally {
                    lock.unlockRead(stamp2);
                }

            }else{
                lock.unlockRead(stamp);
            }

        }
    }

    /**
     * 验证多个悲观读操作可同时进行
     * 验证悲观读时,写操作无法进行
     */
    @Test
    public void test() throws InterruptedException {
        StampedLockTest test = new StampedLockTest();

        for (int i = 0; i < 10; i++) {
            new Thread(test::read, "read" + i).start();
        }

        TimeUnit.MILLISECONDS.sleep(500);

        for (int i = 0; i < 1; i++) {
            new Thread(test::write, "write" + i).start();
        }

        TimeUnit.SECONDS.sleep(5);
    }

    /**
     * 验证非重入锁:持有写锁,再去获取写锁时,发生死锁
     */
    @Test
    public void test2(){
        long stamp1 = lock.writeLock();

        long stamp2 = lock.writeLock();
    }
}

6、Synchronized 和 Lock 区别

(1)Synchronized 是 java 内置的关键字,Lock 是一个java类

(2)Synchronized 无法判断获取锁的状态,Lock 可以判断是否获取到锁

(3)Synchronized 会自动释放锁,lock 必须手动释放锁!否则会发生死锁

(4)Synchronized 可重入锁,非公平;Lock,可重入锁,可判断锁,默认非公平(可设置是否公平)

(5)Synchronized 适合锁少量的代码同步问题,Lock 适合锁大量的同步代码

7、闭锁

(1)什么是闭锁

闭锁(latch)是一种Synchronizer:是一个对象,它根据本身的状态调节线程的控制流。常见类型的Synchronizer包括信号量、关卡和闭锁)。

闭锁可以延迟线程的进度直到线程线程到达终止状态。一个闭锁工作起来就像是一道大门:直到闭锁达到终点状态之前,门一直是关闭的,没有线程能够通过,在终点状态到来的时候,所有线程都可以通过。

(2)使用场景

闭锁可以用来确保特定活动直到其他的活动都完成后才开始发生,比如:

  1. 确保一个计算不会执行,直到它所需要的资源被初始化
  2. 确保一个服务不会开始,直到它依赖的其他服务都已经开始
  3. 等待,直到活动的所有部分都为继续处理做好充分准备
  4. 死锁检测,可以使用n个线程访问共享资源,在每次测试阶段的线程数目是不同的,并尝试产生死锁

(3)闭锁的实现

CountDownLatch 是一个同步辅助类,存在于 java.util.concurrent 包下,灵活的实现了闭锁,它允许一个或多个线程等待一个事件集的发生。

CountDownLatch 是通过一个计数器来实现的,计数器的初始值为线程的数量。每当一个线程完成了自己的任务后,计数的值就会减1。当计数器值到达0时,它所表示所有的线程已经完成了任务,然后在闭锁上等待的线程就可以恢复执行任务。

四、生产者和消费者问题

0、问题描述

经典的生产者-消费者模式,操作流程是这样的:

有多个生产者,可以并发生产产品,把产品置入队列中,如果队列满了,生产者就会阻塞;

有多个消费者,并发从队列中获取产品,如果队列空了,消费者就会阻塞;

1、Synchronized

注意:判断 number 时,必须用 while 判断,不能用 if 否则在多个线程的情况下会产生虚假唤醒问题。

何为虚假唤醒?

虚假唤醒就是一些obj.wait()会在除了obj.notify()和obj.notifyAll()的其他情况被唤醒,而此时是不应该唤醒的。

虚假唤醒产生原因

Object.wait()当我们运行到这个方法的时候,当前线程会进入等待状态,并释放锁,而当其他线程去唤醒它的时候,它会之后wait()的地方继续开始执行(已跳过 if 判断)。

导致虚假唤醒的原因主要就是一个线程直接在if代码块中被唤醒了,这时它已经跳过了if判断。我们只需要将if判断改为while,这样线程就会被重复判断而不再会跳出判断代码块,从而不会产生虚假唤醒这种情况了。

/**
 * 线程之间的通信问题:生产者和消费者问题!  等待唤醒,通知唤醒
 * 线程交替执行  A   B 操作同一个变量   num = 0
 * A num+1
 * B num-1
 */
public class A {
    public static void main(String[] args) {
        Data data = new Data();

        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    data.increment();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"A").start();

        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    data.decrement();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"B").start();

        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    data.increment();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"C").start();


        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    data.decrement();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"D").start();
    }
}

// 判断等待,业务,通知
class Data{ // 数字 资源类

    private int number = 0;

    //+1
    public synchronized void increment() throws InterruptedException {
        //注意:此处如果使用 if 会产生虚假唤醒问题
        while (number!=0){  //0
            // 等待, 会在此处醒来, 如果使用 if 则不会在进行判断
            this.wait();
        }
        number++;
        System.out.println(Thread.currentThread().getName()+"=>"+number);
        // 通知其他线程,我+1完毕了
        this.notifyAll();
    }

    //-1
    public synchronized void decrement() throws InterruptedException {
        //注意:此处如果使用 if 会产生虚假唤醒问题
        while (number==0){ // 1
            // 等待
            this.wait();
        }
        number--;
        System.out.println(Thread.currentThread().getName()+"=>"+number);
        // 通知其他线程,我-1完毕了
        this.notifyAll();
    }

}

2、ReentrantLock

public class B  {
    public static void main(String[] args) {
        Data2 data = new Data2();

        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    data.increment();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"A").start();

        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    data.decrement();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"B").start();

        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    data.increment();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"C").start();

        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    data.decrement();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"D").start();

    }
}

// 判断等待,业务,通知
class Data2{ // 数字 资源类

    private int number = 0;

    Lock lock = new ReentrantLock();
    Condition condition = lock.newCondition();

    //condition.await(); // 等待
    //condition.signalAll(); // 唤醒全部
    //+1
    public void increment() throws InterruptedException {
        lock.lock();
        try {
            // 业务代码
            while (number!=0){  //0
                // 等待
                condition.await();
            }
            number++;
            System.out.println(Thread.currentThread().getName()+"=>"+number);
            // 通知其他线程,我+1完毕了
            condition.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }

    }

    //-1
    public synchronized void decrement() throws InterruptedException {
        lock.lock();
        try {
            while (number==0){ // 1
                // 等待
                condition.await();
            }
            number--;
            System.out.println(Thread.currentThread().getName()+"=>"+number);
            // 通知其他线程,我-1完毕了
            condition.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

}

3、线程顺序调用

A 执行完调用B,B执行完调用C,C执行完调用A
public class C {

    public static void main(String[] args) {
        Data3 data = new Data3();

        new Thread(()->{
            for (int i = 0; i <10 ; i++) {
                data.printA();
            }
        },"A").start();

        new Thread(()->{
            for (int i = 0; i <10 ; i++) {
                data.printB();
            }
        },"B").start();

        new Thread(()->{
            for (int i = 0; i <10 ; i++) {
                data.printC();
            }
        },"C").start();
    }

}

class Data3{ // 资源类 Lock

    private Lock lock = new ReentrantLock();
    private Condition condition1 = lock.newCondition();
    private Condition condition2 = lock.newCondition();
    private Condition condition3 = lock.newCondition();
    private int number = 1; // 1A  2B  3C

    public void printA(){
        lock.lock();
        try {
            // 业务,判断-> 执行-> 通知
            while (number!=1){
                // 等待
                condition1.await();
            }
            System.out.println(Thread.currentThread().getName()+"=>AAAAAAA");
            // 唤醒,唤醒指定的人,B
            number = 2;
            condition2.signal();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

五、阻塞队列

FIFO

FIFO:first in first out 先进先出

不得不阻塞的情况

(1)写入,队列满了,必须阻塞等待

(2)取出,如果队列时空的,必须阻塞等待生产

1、BlockingQueue继承关系

阻塞队列 BlockingQueue

image-20210428135912262

image-20210428140012209

  • ArrayBlockingQueue: 这是一个由数组实现的容量固定的有界阻塞队列.

  • SynchronousQueue: 没有容量,不能缓存数据;每个put必须等待一个take; offer()的时候如果没有另一个线程在poll()或者take()的话返回false。

  • LinkedBlockingQueue: 这是一个由单链表实现的默认无界的阻塞队列。LinkedBlockingQueue提供了一个可选有界的构造函数,而在未指明容量时,容量默认为Integer.MAX_VALUE。

名称 描述
ArrayBlockingQueue 一个用数组实现的有界阻塞队列,此队列按照先进先出(FIFO)的原则对元素进行排序,并发采用可重入锁来控制
LinkedBlockingQueue 一个用链表结构组成的有界队列,此队列按照先进先出(FIFO)的原则对元素进行排序
PriorityBlockingQueue 一个支持线程优先级排序的无界队列
DelayQueue 一个实现PriorityBlockingQueue实现延迟获取的无界队列,在创建元素时,可以指定多久才能从队列中获取当前元素
SynchronousQueue 一个不存储元素的阻塞队列,每一个put操作必须等待take操作。支持公平锁和非公平锁。Executors.newCachedThreadPool()就使用了SynchronousQueue
LinkedTransferQueue 一个由链表结构组成的无界阻塞队列,相当于其它队列,LinkedTransferQueue队列多了transfer和tryTransfer方法
LinkedBlockingDeque 一个由链表结构组成的双向阻塞队列

2、BolckingQueue常用方法

方式 抛出异常 有返回值,无异常 阻塞 等待 超时等待
添加 add(E e) offer(E e) put(E e) offer(..)
移除 remove(Object o) poll(..) take() poll(..)
队首元素 E element() peek()

add

将指定的元素插入到此队列中,如果可以立即执行此操作而不违反容量限制, true在成功后返回 IllegalStateException如果当前没有可用空间,则抛出IllegalStateException。

boolean add(E e)

offer

将指定的元素插入此队列中,如果它是立即可行且不会违反容量限制,返回true在成功,如果当前没有空间可用,返回 false。 当使用容量限制队列时,此方法通常优于add(E),这可能无法仅通过抛出异常来插入元素。 不等待。

boolean offer(E e)

将指定的元素插入到此队列中,等待指定的等待时间(如有必要)才能使空间变得可用。

boolean offer(E e,long timeout,TimeUnit unit) throws InterruptedException

put

将指定的元素插入到此队列中。

void put(E e) throws InterruptedException

remove

删除元素,如果指定的元素存在,则返回true;如果指定的袁术不存在,返回false

boolean remove(Object o)

poll

检索并删除此队列的头,等待指定的等待时间(如有必要)使元素变为可用。

E poll(long timeout,TimeUnit unit) throws InterruptedException

take

检索并删除此队列的头,如有必要,等待元素可用。

E take() throws InterruptedException

element

检索,但不删除,这个队列的头。 此方法与peek不同之处在于,如果此队列为空,它将抛出异常。

E element()

peek

检索但不删除此队列的头部,如果此队列为空,则返回 null

E peek()

3、BolckingQueue使用

// 队列的大小
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);

System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("b"));
System.out.println(blockingQueue.add("c"));
// IllegalStateException: Queue full 抛出异常!
//System.out.println(blockingQueue.add("d"));

new Thread(()->{
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    blockingQueue.remove("c");
}).start();
System.out.println(blockingQueue.offer("d",3,TimeUnit.SECONDS));    //正常
//System.out.println(blockingQueue.offer("d"));        //异常

System.out.println("=-===========");

System.out.println(blockingQueue.element()); // 查看队首元素是谁
System.out.println(blockingQueue.remove());


System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());

// java.util.NoSuchElementException 抛出异常!
// System.out.println(blockingQueue.remove());

4、SynchronousQueue 同步队列

没有容量,不允许空元素,底层采用 LockSupport 实现阻塞唤醒线程

  • put(E e):添加元素,并且阻塞当前线程,直到其他线程将该元素取出
  • E take():从队列中获取元素,并唤醒该元素对应的阻塞线程

此队列源码中充斥着大量的CAS语句

/**
*    准备put3个元素,每次put完必须等该元素取出后再能再次put
*/
BlockingQueue<String> blockingQueue = new SynchronousQueue<>(); // 同步队列

new Thread(()->{
    try {
        System.out.println(Thread.currentThread().getName()+" put 1");
        blockingQueue.put("1");        //put后阻塞,等待取出后继续运行
        System.out.println(Thread.currentThread().getName()+" put 2");
        blockingQueue.put("2");        //put后阻塞,等待取出后继续运行
        System.out.println(Thread.currentThread().getName()+" put 3");
        blockingQueue.put("3");        //put后阻塞,等待取出后继续运行
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
},"T1").start();


new Thread(()->{
    try {
        TimeUnit.SECONDS.sleep(3);
        System.out.println(Thread.currentThread().getName()+"=>"+blockingQueue.take());
        TimeUnit.SECONDS.sleep(3);
        System.out.println(Thread.currentThread().getName()+"=>"+blockingQueue.take());
        TimeUnit.SECONDS.sleep(3);
        System.out.println(Thread.currentThread().getName()+"=>"+blockingQueue.take());
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
},"T2").start();
T1 put 1
T2=>1
T1 put 2
T2=>2
T1 put 3
T2=>3

可通过多线程往其中插入多个元素,获取值时根据先进后出原则

SynchronousQueue<Integer> synchronousQueue = new SynchronousQueue<>();

new Thread(()->{
    try {
        Thread.sleep(500);
        System.out.println("put 1");
        synchronousQueue.put(1);
        System.out.println("put 1 end");

    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}).start();

new Thread(()->{
    try {
        Thread.sleep(1000);
        System.out.println("put 2");
        synchronousQueue.put(2);
        System.out.println("put 2 end");

    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}).start();

new Thread(()->{
    try {
        Thread.sleep(1500);
        System.out.println("put 3");
        synchronousQueue.put(3);
        System.out.println("put 3 end");

    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}).start();

new Thread(()->{
    try {
        Thread.sleep(2000);
        System.out.println("take " + synchronousQueue.take());
        Thread.sleep(100);
        System.out.println("take " + synchronousQueue.take());
        Thread.sleep(100);
        System.out.println("take " + synchronousQueue.take());

    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}).start();

Thread.sleep(3000);
put 1
put 2
put 3
take 1
put 1 end
take 2
put 2 end
put 3 end
take 3

(1)构造方法

public SynchronousQueue() {
    this(false);
}

public SynchronousQueue(boolean fair) {
    transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}

底层为链表,节点中存储了当前节点数据对应的线程(即该元素是由哪个线程 put 的)

static final class QNode {
    volatile QNode next;          // next node in queue
    volatile Object item;         // CAS'ed to or from null
    volatile Thread waiter;       // to control park/unpark
    final boolean isData;
    ....
}

六、ForkJoin

并行执行任务!提高效率。大数据量!

采用“工作窃取”模式(work-stealing) :

当执行新的任务时它可以将其拆分分成更小的任务执行,并将小任务加到线程队列中,然后再从一个随机线程的队列中偷一个并把它放在自己的队列中。

相对于一般的线程池实现, fork/join框架的优势体现在对其中包含的任务的处理方式上.在一般的线程池中,如果一个线程正在执行的任务由于某些原因无法继续运行,那么该线程会处于等待状态.而在fork/join框架实现中,如果某个子问题由于等待另外一个子问题的完成而无法继续运行.那么处理该子问题的线程会主动寻找其他尚未运行的子问题来执行.这种方式减少了线程的等待时间,提高性能

image-20210429182101211

工作窃取(重点)

这个里面维护的都是双端队列。

2 个线程并行工作,此时 B 先执行完任务,会去窃取 A 的任务,从而提高效率。

image-20210429182313644

代码

案例:求和 1 到 10 亿

public static void main(String[] args) throws ExecutionException, InterruptedException {
    // test1(); // 12224ms
    // test2(); // 10038ms
    // test3(); // 153ms
}

// 普通方法
public static void test1(){
    Long sum = 0L;
    long start = System.currentTimeMillis();
    for (Long i = 1L; i <= 10_0000_0000; i++) {
        sum += i;
    }
    long end = System.currentTimeMillis();
    System.out.println("sum="+sum+" 时间:"+(end-start));
}

// 使用ForkJoin
public static void test2() throws ExecutionException, InterruptedException {
    long start = System.currentTimeMillis();

    // 1、ForkJoin线程池
    ForkJoinPool forkJoinPool = new ForkJoinPool();
    ForkJoinTask<Long> task = new ForkJoinDemo(0L, 10_0000_0000L);

    // 2、执行任务,默认不返回结果
    ForkJoinTask<Long> submit = forkJoinPool.submit(task);
    // get()等待执行任务执行完后获取其结果
    Long sum = submit.get();

    // 2、执行任务,返回结果
    //Long invoke = forkJoinPool.invoke(task);

    long end = System.currentTimeMillis();

    System.out.println("sum="+sum+" 时间:"+(end-start));
}

//使用Stream流计算
public static void test3(){
    long start = System.currentTimeMillis();
    // Stream并行流 () 
    //串行流(单线程):切换为并行流 parallel()
    //并行流:切换为串行流 sequential()
    long sum = LongStream.rangeClosed(0L, 10_0000_0000L)
                            .parallel()
                            .reduce(0, Long::sum);
    long end = System.currentTimeMillis();
    System.out.println("sum="+"时间:"+(end-start));
}
/**
 * 求和计算的任务!
 * // 如何使用 forkjoin
 * // 1、forkjoinPool 通过它来执行
 * // 2、计算任务 forkjoinPool.execute(ForkJoinTask task)
 * // 3. 计算类要继承 ForkJoinTask
 */
public class ForkJoinDemo extends RecursiveTask<Long> {

    private Long start;  // 1
    private Long end;    // 1990900000

    // 临界值,最小单位任务
    private Long temp = 10000L;

    public ForkJoinDemo(Long start, Long end) {
        this.start = start;
        this.end = end;
    }

    // 计算方法
    @Override
    protected Long compute() {
        if ((end-start)<temp){
            Long sum = 0L;
            for (Long i = start; i <= end; i++) {
                sum += i;
            }
            return sum;
        }else { // forkjoin 递归
            long middle = (start + end) / 2; // 中间值
            ForkJoinDemo task1 = new ForkJoinDemo(start, middle);
            task1.fork(); // 拆分任务,把任务压入线程队列
            ForkJoinDemo task2 = new ForkJoinDemo(middle+1, end);
            task2.fork(); // 拆分任务,把任务压入线程队列

            return task1.join() + task2.join();
        }
    }
}

七、CompletableFuture

0、Future接口

Future 接口(FutureTask是其实现类)提供一种异步并行计算的功能,定义了操作异步任务的一些方法,如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务是否完毕等。

Future 接口可以为主线程开一个分支线程,用于处理耗时工作。

Future 对结果的获取并不是很友好,只能通过阻塞和轮询的方法得到结果

CompletableFuture 是对 Future接口的扩展,主要增加了一下功能:

  • 回调通知 CompletableFutureexceptionally
  • 线程池结合异步任务的创建方式
  • 多个任务前后依赖,可以组合处理
  • 对计算速度选取最快的进行处理

异步编排 CompletableFuture 类似 promise

与同步处理相对,异步处理不用阻塞当前线程来等待处理完成

比如需要在异步查询完一级分类后,获取到结果,在查询二级分类的情况下

(1)接口源码

public interface Future<V> {

    /**
     * 尝试取消执行此任务。如果任务已经完成、已经取消或由于其他原因无法取消,则此尝试将失败。
     * 如果成功,并且在调用cancel时此任务尚未启动,则此任务不应运行。
     * 如果任务已经启动,那么mayinterruptirunning参数决定是否应该中断执行该任务的线程
     * 以尝试停止该任务。
     * 在此方法返回后,对isDone的后续调用将始终返回true。
     * 如果此方法返回true,后续对isCancelled的调用将始终返回true。
     */
    boolean cancel(boolean mayInterruptIfRunning);

    /**
     * 判断当前方法是否取消
     */
    boolean isCancelled();

    /**
     * 判断当前方法是否完成
     */
    boolean isDone();

    /**
     * 方法可以当任务结束后返回一个结果,如果调用时,工作还没有结束,则会阻塞线程,直到任务执行完毕
     */
    V get() throws InterruptedException, ExecutionException;

    /**
     * 最多等待timeout的时间就会返回结果
     */
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

(2)实现类

img

RunnableFuture

这个接口同时继承Future接口和Runnable接口,在成功执行 run() 方法后,可以通过Future访问执行结果。这个接口都实现类是FutureTask,一个可取消的异步计算,这个类提供了Future的基本实现,后面我们的demo也是用这个类实现,它实现了启动和取消一个计算,查询这个计算是否已完成,恢复计算结果。计算的结果只能在计算已经完成的情况下恢复。如果计算没有完成,get方法会阻塞,一旦计算完成,这个计算将不能被重启和取消,除非调用runAndReset方法。

FutureTask

能用来包装一个Callable或Runnable对象,因为它实现了Runnable接口,而且它能被传递到Executor进行执行。为了提供单例类,这个类在创建自定义的工作类时提供了protected构造函数。

SchedualFuture

这个接口表示一个延时的行为可以被取消。通常一个安排好的future是定时任务SchedualedExecutorService的结果

CompleteFuture

一个Future类是显示的完成,而且能被用作一个完成等级,通过它的完成触发支持的依赖函数和行为。当两个或多个线程要执行完成或取消操作时,只有一个能够成功。

ForkJoinTask

基于任务的抽象类,可以通过ForkJoinPool来执行。一个ForkJoinTask是类似于线程实体,但是相对于线程实体是轻量级的。大量的任务和子任务会被ForkJoinPool池中的真实线程挂起来,以某些使用限制为代价。

1、创建异步对象

// 没有返回值的 runAsync 异步回调
public static CompletableFuture<Void> runAsync(Runnable runnable);
// 没有返回值的 runAsync 异步回调,可以指定使用的线程池    
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor);

// 有返回值的 supplyAsync 异步回调
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
// 有返回值的 supplyAsync 异步回调,可以指定使用的线程池    
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor);

有返回值示例

// 没有返回值的 runAsync 异步回调
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(()->{
    try {
        TimeUnit.SECONDS.sleep(2);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println(Thread.currentThread().getName()+"runAsync=>Void");
    return "1";
});

String result = completableFuture.get(); // 获取阻塞执行结果,一般放在方法最后
System.out.println(result);

获取方法返回值

get()方法用于阻塞获取返回结果,建议设置最大阻塞时长,避免因为意外问题无法获取返回结果导致一直阻塞。

// 阻塞获取返回结果
public T get();

// 阻塞获取返回结果,若超过指定时长则放开阻塞,并抛出TimeoutException异常
public T get(long timeout, TimeUnit unit);

// 基本和get()方法一样,区别是不会抛出检查型异常
public T join()

2、失败与成功回调

类似前端的 promise

创建回调的方式

// 使用当前执行成功的异步线程去执行异步回调
public CompletableFuture<T> whenComplete(
        BiConsumer<? super T, ? super Throwable> action);

// 使用线程池中的其他线程去执行异步回调
public CompletableFuture<T> whenCompleteAsync(
        BiConsumer<? super T, ? super Throwable> action);

// 使用指定线程池中的其他线程去执行异步回调
public CompletableFuture<T> whenCompleteAsync(
        BiConsumer<? super T, ? super Throwable> action, Executor executor)

whenComplete :用于处理正常和异常的处理结果;虽然能得到异常信息,但是无法修改返回结果,相当于一个监听器

exceptionally 用于处理异常情况,可以得到异常信息,同时设置异常时的返回结果

有返回值的 supplyAsync 的异步回调

// 有返回值的 supplyAsync 异步回调
// ajax,成功和失败的回调
// 返回的是错误信息;
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(()->{
    System.out.println(Thread.currentThread().getName()+"supplyAsync=>Integer");
    int i = 10/0;
    return 1024;
});

Integer result = completableFuture.whenComplete((res, excption) -> {
    System.out.println("t=>" + res); // 正常的返回结果
    System.out.println("u=>" + excption); // 错误信息
}).exceptionally((e) -> {
    System.out.println(e.getMessage());
    return 233; // 可以获取到错误的返回结果
}).get();

System.out.println(result);

3、handle()方法

handle() 方法与 whenComplete() 方法的主要区别在于,handle() 方法能够修改返回的结果。

public <U> CompletableFuture<U> handle(
    BiFunction<? super T, Throwable, ? extends U> fn);

public <U> CompletableFuture<U> handleAsync(
        BiFunction<? super T, Throwable, ? extends U> fn);

public <U> CompletableFuture<U> handleAsync(
        BiFunction<? super T, Throwable, ? extends U> fn, Executor executor);

示例

Integer result = completableFuture.handle((res, excption) -> {

    if (res != null) {
        // 正常的返回结果
        System.out.println("t=>" + res);
        return res;
    }

    if (excption != null){
        // 错误信息
        System.out.println("u=>" + excption);
        return 0;
    }
    return 1;
}).get();

System.out.println(result);

4、串行化

// 异步线程执行完方法后,继续执行 thenRun 对应的方法,无法感知上一步的结果,无返回结果
public CompletableFuture<Void> thenRun(Runnable action);
// 开启新线程执行 action 任务
public CompletableFuture<Void> thenRunAsync(Runnable action);
// 用指定的线程池中的线程执行 action 任务
public CompletableFuture<Void> thenRunAsync(Runnable action,Executor executor);

/* -------------------------------------------------------------- */
// 异步线程执行完方法后,继续执行 thenAccept 对应的方法,可获得上一步的结果,无返回结果
public CompletableFuture<Void> thenAccept(Consumer<? super T> action);
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,
                                               Executor executor);

/* -------------------------------------------------------------- */
// 异步线程执行完方法后,继续执行 thenRun 对应的方法,可获得上一步的结果,可返回结果
public <U> CompletableFuture<U> thenApply(
    Function<? super T,? extends U> fn);

public <U> CompletableFuture<U> thenApplyAsync(
    Function<? super T,? extends U> fn);

public <U> CompletableFuture<U> thenApplyAsync(
    Function<? super T,? extends U> fn, Executor executor);

示例

CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(()->{
    System.out.println(Thread.currentThread().getName()+"supplyAsync=>Integer");
    //int i = 10/0;
    return 10;
});

CompletableFuture<Integer> future = completableFuture.thenApplyAsync((res) -> {
    return res * 2;
}).thenApplyAsync((res)->{
    return res * 4;
});

Integer result = future.get();
System.out.println(result);

5、两任务组合-都完成

当两个任务都完成时,触发该任务

// CompletionStage<?> other:该参数为另一个任务

// 不需要获取前一个任务结果,无法返回结果
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,
                                            Runnable action);

public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
                                                 Runnable action);

public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
                                                 Runnable action,
                                                 Executor executor);

/* ------------------------------------------------ */

// 可获取前一个任务结果,无返回结果
public <U> CompletableFuture<Void> thenAcceptBoth(
    CompletionStage<? extends U> other,
    BiConsumer<? super T, ? super U> action);

public <U> CompletableFuture<Void> thenAcceptBothAsync(
    CompletionStage<? extends U> other,
    BiConsumer<? super T, ? super U> action);

public <U> CompletableFuture<Void> thenAcceptBothAsync(
    CompletionStage<? extends U> other,
    BiConsumer<? super T, ? super U> action, Executor executor);

/* -------------------------------------------------- */

// 可获取前一个任务结果,有返回结果
public <U,V> CompletableFuture<V> thenCombine(
    CompletionStage<? extends U> other,
    BiFunction<? super T,? super U,? extends V> fn);

public <U,V> CompletableFuture<V> thenCombineAsync(
    CompletionStage<? extends U> other,
    BiFunction<? super T,? super U,? extends V> fn);

public <U,V> CompletableFuture<V> thenCombineAsync(
    CompletionStage<? extends U> other,
    BiFunction<? super T,? super U,? extends V> fn, Executor executor);

示例

CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(()->{
    return 10;
});

CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(()->{
    return 20;
});

CompletableFuture<Integer> future = completableFuture.thenCombine(completableFuture2, (res1, res2) -> {
    System.out.println(res1);
    System.out.println(res2);
    return res1 + res2;
});

Integer result = future.get();
System.out.println(result);        // 30

6、两任务组合-一个完成

指定的两个任务中,其中一个完成时,执行任务

注意:当其中一个任务出现异常时,直接中断,并不会向下执行,这个只是用于,2个任务执行快慢不一的情况。

// runAfterEither: 无法获取上一个任务结果,无返回值
public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,
                                              Runnable action);

public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
                                                   Runnable action);

public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
                                                   Runnable action,
                                                   Executor executor);

/* -------------------------------------------------------- */

// acceptEither: 可获取上一个任务结果,无返回值
public CompletableFuture<Void> acceptEither(
    CompletionStage<? extends T> other, Consumer<? super T> action);

public CompletableFuture<Void> acceptEitherAsync(
    CompletionStage<? extends T> other, Consumer<? super T> action);

public CompletableFuture<Void> acceptEitherAsync(
    CompletionStage<? extends T> other, Consumer<? super T> action,
    Executor executor);

/* --------------------------------------------------------- */

// applyToEither: 可获取上一个任务结果,有返回值
public <U> CompletableFuture<U> applyToEither(
    CompletionStage<? extends T> other, Function<? super T, U> fn);

public <U> CompletableFuture<U> applyToEitherAsync(
    CompletionStage<? extends T> other, Function<? super T, U> fn);

public <U> CompletableFuture<U> applyToEitherAsync(
    CompletionStage<? extends T> other, Function<? super T, U> fn,
    Executor executor);

示例

CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(()->{
    try {
        Thread.sleep(1000);
        System.out.println(10);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return 10;
});

CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(()->{
    System.out.println(20);
    //int i = 10/0;
    return 20;
});


CompletableFuture<Integer> future = completableFuture.applyToEither(completableFuture2, (res) -> {
    return res + 1;
});

Integer result = future.get();
System.out.println(result);        // 20 21 10

7、多任务组合(重要)

allOf :等待所有任务执行完成后,执行;返回的 CompletableFuture 无返回值

anyOd:其中一个任务执行完成后,执行;返回的 CompletableFuture 返回值是最先完成的任务的返回值

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs);

public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

allOf示例

CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(()->{
    System.out.println("查询商品分类信息");
    return 10;
});

CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(()->{
    System.out.println("查询商品销量信息");
    return 20;
});

CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(()->{
    System.out.println("查询商品属性信息");
    return 30;
});

CompletableFuture<Void> allFuture = CompletableFuture.allOf(future1, future2, future3);

allFuture.get();    // 阻塞等待所有线程执行完成

Integer result1 = future1.get();
Integer result2 = future2.get();
Integer result3 = future3.get();

System.out.println(result1);
System.out.println(result2);
System.out.println(result3);

anyOf示例

CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(()->{
    return 10;
});

CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(()->{
    return 20;
});

CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(()->{
    return 30;
});

CompletableFuture<Object> allFuture = CompletableFuture.anyOf(future1, future2, future3);

// 获取最早完成的任务的结果
Integer result = (Integer) allFuture.get();

System.out.println(result);

八、JMM

什么是JMM

Java内存模型,不存在的概念性的东西!

关于JMM的一些同步的约定:

1、线程解锁前,必须把共享变量立刻刷回主存。

2、线程加锁前,必须读取主存中的新值到工作内存中!

3、加锁和解锁是同一把锁

JMM内存模型

线程 工作内存 、主内存

image-20210502211347086

内存交互操作有8种,虚拟机实现必须保证每一个操作都是原子的,不可在分的(对于double和long类 型的变量来说,load、store、read和write操作在某些平台上允许例外)

  • read (读取):作用于主内存变量,它把一个变量的值从主内存传输到线程的工作内存中,以便 随后的load动作使用
  • load (载入):作用于工作内存的变量,它把read操作从主存中变量放入工作内存中
  • use (使用):作用于工作内存中的变量,它把工作内存中的变量传输给执行引擎,每当虚拟机 遇到一个需要使用到变量的值,就会使用到这个指令
  • assign (赋值):作用于工作内存中的变量,它把一个从执行引擎中接受到的值放入工作内存的变 量副本中
  • store (存储):作用于主内存中的变量,它把一个从工作内存中一个变量的值传送到主内存中, 以便后续的write使用
  • write (写入):作用于主内存中的变量,它把store操作从工作内存中得到的变量的值放入主内 存的变量中

以上6条保证了单条指令的原子性,针对多条指令的组合性原子保证,没有大面积加锁,所以JVM又提供了一下2条指令

  • lock (锁定):作用于主内存的变量,把一个变量标识为线程独占状态 ,只是在写的时候加锁,只锁写变量的过程

  • unlock (解锁):作用于主内存的变量,它把一个处于锁定状态的变量释放出来,释放后的变量 才可以被其他线程占用

JMM对这八种指令的使用,制定了如下规则:

  1. 不允许read和load、store和write操作之一单独出现。即使用了read必须load,使用了store必须 write
  2. 不允许线程丢弃他近的assign操作,即工作变量的数据改变了之后,必须告知主存
  3. 不允许一个线程将没有assign的数据从工作内存同步回主内存
  4. 一个新的变量必须在主内存中诞生,不允许工作内存直接使用一个未被初始化的变量。就是怼变量 实施use、store操作之前,必须经过assign和load操作
  5. 一个变量同一时间只有一个线程能对其进行lock。多次lock后,必须执行相同次数的unlock才能解 锁
  6. 如果对一个变量进行lock操作,会清空所有工作内存中此变量的值,在执行引擎使用这个变量前, 必须重新load或assign操作初始化变量的值
  7. 如果一个变量没有被lock,就不能对其进行unlock操作。也不能unlock一个被其他线程锁住的变量
  8. 对一个变量进行unlock操作之前,必须把此变量同步回主内存

九、Volatile

Volatile 保证可见性,但不保证原子性。并且禁止进行指令重排序。(实现有序性)

详细参阅:https://blog.csdn.net/qq_24047659/article/details/88031712

可见性

即一个线程修改了某个变量的值,这新值对其他线程来说是立即可见的。

原子性

volatile 只能保证对单次读/写的原子性。i++ 这种操作不能保证原子性。

保证可见性 演示

//加了volatile关键字的区别就是会强制保证可见性   不用考虑CPU有没有时间
//system.out.println 是加锁的,锁的是同一个值 num,里面有个synchronize代码块
public class JMMDemo {
    // 不加 volatile 程序就会死循环!
    // 加 volatile 可以保证可见性
    private volatile static int num = 0;

    public static void main(String[] args) { // main

        new Thread(()->{ // 线程 1 对主内存的变化不知道的
            while (num==0){

            }
        }).start();

        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        num = 1;
        System.out.println(num);

    }
}

不保证原子性 演示

线程A在执行任务的时候,不能被打扰的,也不能被分割。要么同时成功,要么同时失败

// volatile 不保证原子性
public class VDemo02 {

    // volatile 不保证原子性
    // 原子类的 Integer
    private volatile static AtomicInteger num = new AtomicInteger();

    public static void add(){
        // num++; // 不是一个原子性操作
        num.getAndIncrement(); // AtomicInteger + 1 方法, CAS
    }

    public static void main(String[] args) {

        //理论上num结果应该为 2 万
        for (int i = 1; i <= 20; i++) {
            new Thread(()->{
                for (int j = 0; j < 1000 ; j++) {
                    add();
                }
            }).start();
        }

        while (Thread.activeCount()>2){ // main  gc
            //Thread.yield()是在主线程中执行的,意思只要还有除了GC和main线程之外的线程在跑,主线程就让出cpu不往下执行
            Thread.yield();
        }

        System.out.println(Thread.currentThread().getName() + " " + num);
    }
}

并发问题

1、CAS

CAS(Compare and swap)比较和替换是设计并发算法时用到的一种技术。

有点类似乐观锁

场景:在没有锁的情况下,在多线程访问的时候,保证线程一致性的去改动某一个值。

CAS : 比较当前工作内存中的值和主内存中的值,如果这个值是期望的,那么则执行操作!如果不是就 一直循环!
缺点:
1、 循环会耗时

2、一次性只能保证一个共享变量的原子性

3、ABA问题

ABA问题

当读取的初值:X=A,线程1在对X进行操作时,线程2读取了X,并将其修改为B后,在修改为A,此时,线程1对值X计算完毕,比较计算前获取的值X=A与当前的X=A相等,此时无法得知其他线程已经修改过X。

解决方法

加版本号(或者加boolean标签),当线程改动值的同时也要改动版本号。

public class CASDemo {

    //AtomicStampedReference 注意,如果泛型是一个包装类,注意对象的引用问题

    // 正常在业务操作,这里面比较的都是一个个对象
    static AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<>(1,1);

    // CAS  compareAndSet : 比较并交换!
    public static void main(String[] args) {

        new Thread(()->{
            int stamp = atomicStampedReference.getStamp(); // 获得版本号
            System.out.println("a1=>"+stamp);

            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            Lock lock = new ReentrantLock(true);

            atomicStampedReference.compareAndSet(1, 2,
                    atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1);

            System.out.println("a2=>"+atomicStampedReference.getStamp());


            System.out.println(atomicStampedReference.compareAndSet(2, 1,
                    atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1));

            System.out.println("a3=>"+atomicStampedReference.getStamp());

        },"a").start();


        // 乐观锁的原理相同!
        new Thread(()->{
            int stamp = atomicStampedReference.getStamp(); // 获得版本号
            System.out.println("b1=>"+stamp);

            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.println(atomicStampedReference.compareAndSet(1, 6,
                    stamp, stamp + 1));

            System.out.println("b2=>"+atomicStampedReference.getStamp());

        },"b").start();

    }
}

2、AQS


  目录