0、只要是并发编程,一定要加锁
一、概念
1、什么是JUC
在 Java 5.0 提供了 java.util.concurrent (简称JUC )包,在此包中增加了在并发编程中很常用的实用工具类,用于定义类似于线程的自定义子系统,包括线程池、异步 IO 和轻量级任务框架。提供可调的、灵活的线程池。还提供了设计用于多线程上下文中的 Collection 实现等。
2、什么是进程、线程
进程:一个程序,通常包含多个线程,至少包含一个!
java默认有2个线程:main,gc
线程:对java而言,Thread,Runnable,Callable
Java实际上无法开启线程,因为java是运行在虚拟机上的,无法直接操作硬件。是通过调用native
修饰(C或C++)的本地方法。
3、并发、并行
并发:多线程运行在同一个CPU内,CPU不停做上下文切换执行任务,单位时间内只有一个线程在运行;
并行:多线程运行在多核心CPU内,同时执行任务;
4、Java的多线程是并发还是并行:
当多线程在跑时,所有的CPU核心都有负载,也就是说,线程确实是并行的。解释一下,现代操作系统是将线程最为最小的调度单位(由内核负责管理),进程作为资源分配的最小单位。由于进程是不活动的,只是纯粹作为存储线程的容器。也就是说,Java线程尽管被包裹在JVM进程内部,但是CPU调度的是进程中的线程。
5、线程状态
public enum State {
/**
* 新生
*/
NEW,
/**
* 运行
*/
RUNNABLE,
/**
* 阻塞
*/
BLOCKED,
/**
* 等待,死死地等
*/
WAITING,
/**
* 超时等待
*/
TIMED_WAITING,
/**
* 终止
*/
TERMINATED;
}
6、Lambda 表达式如何操作局部变量
- 使用实例变量或静态变量是没有限制的(可认为是通过 final 类型的局部变量 this 来引用前两者)
- 使用局部变量必须显式的声明为 final 或实际效果的的 final 类型,即该变量从未被改变过,可用一个
final
修饰的中间变量。
注:闭包,jdk1.8可以不写final,但是其实是默认有的,jvm会加上去
7、可能有用的方法
//查看系统的CPU核数
System.out.println(Runtime.getRuntime().availableProcessors());
//线程状态
String s = Thread.State.NEW.toString();
//线程休眠1秒
Thread.sleep(1000);
TimeUnit.SECONDS.sleep(1); //企业常用
二、创建线程的三种方式
1、继承Thread类,重写run()方法:
①、定义类继承Thread;
②、复写T=hread类中的run方法;
目的:将自定义代码存储在run方法,让线程运行
③、调用线程的start方法:
该方法有两步:启动线程,调用run方法。
2、实现Runnable接口,实现接口run()方法:
接口应该由那些打算通过某一线程执行其实例的类来实现。类必须定义一个称为run 的无参方法。
①、定义类实现Runnable接口
②、覆盖Runnable接口中的run方法
将线程要运行的代码放在该run方法中。
③、通过Thread类建立线程对象。
④、将Runnable接口的子类对象作为实际参数传递给Thread类的构造函数。
自定义的run方法所属的对象是Runnable接口的子类对象。所以要让线程执行指定对象的run方法就要先明确run方法所属对象
⑤、调用Thread类的start方法开启线程并调用Runnable接口子类的run方法。
3、实现Callable接口,实现call()方法:
①、创建Callable接口的实现类,并实现call()方法,改方法将作为线程执行体,且具有返回值。
②、创建Callable实现类的实例,使用FutrueTask类进行包装Callable对象,FutureTask对象封装了Callable对象的call()方法的返回值
③、使用FutureTask对象作为Thread对象启动新线程。
④、调用FutureTask对象的get()方法获取子线程执行结束后的返回值。
public class CallableTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// new Thread(new Runnable()).start();
// new Thread(new FutureTask<V>()).start();
// new Thread(new FutureTask<V>( Callable )).start();
MyThread thread = new MyThread();
FutureTask<Integer> futureTask = new FutureTask<>(thread); // 适配类
new Thread(futureTask,"A").start();
new Thread(futureTask,"B").start(); // 结果会被缓存,效率高
//获取方法返回结果
//这个get 方法可能会产生阻塞!把他放到最后
// 或者使用异步通信来处理!
Integer o = futureTask.get();
System.out.println(o);
}
}
//该泛型为返回值类型
class MyThread implements Callable<Integer> {
@Override
public Integer call() {
System.out.println("call()"); // 会打印几个call
// 耗时的操作
return 1024;
}
}
三种方式区别
主要是单继承局限性、有无返回值、可否抛出异常3点上
(1)继承Thread:线程代码存放在Thread子类run方法中。
优势:编写简单,可直接用this.getname()获取当前线程,不必使用Thread.currentThread()方法。
劣势:已经继承了Thread类,无法再继承其他类。
(2)实现Runnable:线程代码存放在接口的子类的run方法中。
优势:避免了单继承的局限性、多个线程可以共享一个target对象,非常适合多线程处理同一份资源的情形。
劣势:比较复杂、访问线程必须使用Thread.currentThread()方法、无返回值。(其实runnable
也可以有返回值,通过构造方法传进去,等待线程结束。)
(3)实现Callable:
优势:有返回值(用来判断线程是否已经执行完毕或者取消线程执行)、避免了单继承的局限性、多个线程可以共享一个target对象,非常适合多线程处理同一份资源的情形。
劣势:比较复杂、访问线程必须使用Thread.currentThread()方法
(4)异常
Callable()的call()方法可以抛出异常(在调用类中用try/catch捕获),而Runnable()的run()方法不可以
建议使用实现接口的方式创建多线程。
三、常用辅助类(必会)
1、CountDownLatch(减法计数器)
每次有线程调用 countDown() 数量-1,假设计数器变为0,countDownLatch.await() 就会被唤醒,继续 执行!
指定线程执行完毕,在执行操作
// 计数器
// 场景:等6个人全部出教室在关门。
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
// 总数是6,必须要执行任务的时候,再使用!
CountDownLatch countDownLatch = new CountDownLatch(6);
for (int i = 1; i <=6 ; i++) {
new Thread(()->{
System.out.println(Thread.currentThread().getName()+" Go out");
countDownLatch.countDown(); // 数量-1
},String.valueOf(i)).start();
}
countDownLatch.await(); // 等待计数器归零,然后再向下执行
System.out.println("Close Door");
}
}
1 Go out
6 Go out
3 Go out
5 Go out
4 Go out
2 Go out
Close Door
CountDownLatch原理
CountDownLatch构造函数:CountDownLatch(int count);
- 构造器中计数值(count)就是闭锁需要等待的线程数量,这个值只能被设置一次。
CountDownLatch类的方法:
void await()
:使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断。boolean await(long timeout, TimeUnit unit)
:使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断或超出了指定的等待时间。void countDown()
:递减锁存器的计数,如果计数到达零,则释放所有等待的线程。long getCount()
:返回当前计数。String toString()
:返回标识此锁存器及其状态的字符串。
与CountDownLatch第一次交互是主线程等待其它的线程,主线程必须在启动其它线程后立即调用await方法,这样主线程的操作就会在这个方法上阻塞,直到其他线程完成各自的任务。
其他的N个线程必须引用闭锁对象,因为他们需要通知 CountDownLatch
对象,他们已经完成了各自的任务,这种机制就是通过 countDown()
方法来完成的。每调用一次这个方法,在构造函数中初始化的count值就减1,所以当N个线程都调用了这个方法count的值等于0,然后主线程就能通过await方法,恢复自己的任务。
2、CyclicBarrier(加法计数器)
当计数器加到指定值时,开启指定线程。
每次线程执行到 cyclicBarrier.await();
时,如果没累加到指定数值,该线程就会等待,直到累加到指定数值才会继续往下执行。
执行达到指定线程数在执行操作
public class CyclicBarrierDemo {
public static void main(String[] args) {
/**
* 集齐7颗龙珠召唤神龙
*/
// 召唤龙珠的线程,当计数器累加到7时执行
CyclicBarrier cyclicBarrier = new CyclicBarrier(7,()->{
System.out.println("召唤神龙成功!");
});
for (int i = 1; i <=7 ; i++) {
final int temp = i;
// lambda能操作到 i 吗
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"收集"+temp+"个龙珠");
try {
cyclicBarrier.await(); // 等待,计数器+1
System.out.println("召唤成功"+temp);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
}
}
Thread-1收集2个龙珠
Thread-2收集3个龙珠
Thread-4收集5个龙珠
Thread-0收集1个龙珠
Thread-5收集6个龙珠
Thread-3收集4个龙珠
Thread-6收集7个龙珠
召唤神龙成功!
召唤成功7
召唤成功5
召唤成功2
召唤成功3
召唤成功4
召唤成功6
召唤成功1
3、Semaphore(信号量)
限流,只允许指定个数的线程同时运行
相当于令牌桶,指定桶里有几个令牌,使用完放回
原理:
semaphore.acquire() 获得,假设如果已经满了,等待,等待被释放为止!
semaphore.release(); 释放,会将当前的信号量释放 + 1,然后唤醒等待的线程!
作用: 多个共享资源互斥的使用!并发限流,控制大的线程数!
// 案例:抢车位,6辆车抢3个车位
public class SemaphoreDemo {
public static void main(String[] args) {
// 线程数量:停车位! 限流!
Semaphore semaphore = new Semaphore(3);
for (int i = 1; i <=6 ; i++) {
new Thread(()->{
// acquire() 得到
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName()+"抢到车位");
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName()+"离开车位");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release(); // release() 释放
}
},String.valueOf(i)).start();
}
}
}
1抢到车位
3抢到车位
4抢到车位
4离开车位
1离开车位
5抢到车位
3离开车位
2抢到车位
6抢到车位
6离开车位
5离开车位
2离开车位
三、锁
1、synchronized
在修饰非静态方法时,锁的对象是方法的调用者。修饰静态方法时,锁的是类(class)。
虽然锁的是对象,但是不影响普通方法(没加Synchronized)的执行。
当同时锁静态和非静态方法时,其实一个锁的是对象,一个是类,所以2个方法可以同时执行。
谁先获得谁先执行。
可用于修饰方法,也可用作同步代码块
synchronized{}
/**
* 经典多线程卖票问题
* 真正的多线程开发,公司中的开发,降低耦合性
* 线程就是一个单独的资源类,没有任何附属的操作!
* 1、 属性、方法
*/
public class SaleTicketDemo01 {
public static void main(String[] args) {
// 并发:多线程操作同一个资源类, 把资源类丢入线程
Ticket ticket = new Ticket();
// @FunctionalInterface 函数式接口,jdk1.8 lambda表达式 (参数)->{ 代码 }
new Thread(()->{
for (int i = 1; i < 40 ; i++) {
ticket.sale();
}
},"A").start();
new Thread(()->{
for (int i = 1; i < 40 ; i++) {
ticket.sale();
}
},"B").start();
new Thread(()->{
for (int i = 1; i < 40 ; i++) {
ticket.sale();
}
},"C").start();
}
}
// 资源类 OOP
class Ticket {
// 属性、方法
private int number = 30;
// 卖票的方式
// synchronized 本质: 队列,锁
public synchronized void sale(){
if (number>0){
System.out.println(Thread.currentThread().getName()+"卖出了第"+(number--)+"票,剩余:"+number);
}
}
}
2、Lock接口
实现类:
(1)ReentrantLock()
(2)ReentrantReadWriteLock.ReadLock
(3)ReentrantReadWriteLock.WriteLock
ReentrantReadWriteLock
:实现 ReadWriteLock
接口
ReadLock
和 WriteLock
:是内部类,实现 Lock
接口
ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
ReentrantReadWriteLock.ReadLock readLock = readWriteLock.readLock();
ReentrantReadWriteLock.WriteLock writeLock = readWriteLock.writeLock();
方法
//1、获取锁,
//如果锁不可用,则当前线程将被禁用以进行线程调度,并处于休眠状态,直到获取锁。
void lock();
//2、解锁
void unlock();
//3、只有在调用时才可以获得锁。
//如果可用,则获取锁,并立即返回值为true 。 如果锁不可用,则此方法将立即返回值为false 。
boolean tryLock();
//4、如果锁可用,此方法将立即返回值为true 。
//如果锁不可用,则当前线程将被禁用以进行线程调度,并且处于休眠状态,直至发生三件事情之一:
//(1)当前线程获取到锁
//(2)一些其他线程interrupts(中断)当前线程,并且中断锁获取被支持
//(3)指定的等待时间过去了
boolean tryLock(long time,TimeUnit unit);
//此用法可确保锁定已被取消,如果未获取锁定,则不会尝试解锁。
Lock lock = ...;
if (lock.tryLock()) {
try {
// manipulate protected state
} finally {
lock.unlock();
}
} else {
// perform alternative actions
}
Lock 发展历程:
无锁 -> 独占锁 -> 读写锁 -> 邮戳锁
下面一一介绍
3、ReentrantLock():可重入锁
相对于synchronized
而言,较为灵活,没有结构化代码
继承自Lock接口
使用步骤:
1、 new ReentrantLock();
2、 lock.lock(); // 加锁
3、 finally=> lock.unlock(); // 解锁
public class SaleTicketDemo02 {
public static void main(String[] args) {
// 并发:多线程操作同一个资源类, 把资源类丢入线程
Ticket2 ticket = new Ticket2();
// @FunctionalInterface 函数式接口,jdk1.8 lambda表达式 (参数)->{ 代码 }
new Thread(()->{for (int i = 1; i < 40 ; i++) ticket.sale();},"A").start();
new Thread(()->{for (int i = 1; i < 40 ; i++) ticket.sale();},"B").start();
new Thread(()->{for (int i = 1; i < 40 ; i++) ticket.sale();},"C").start();
}
}
// Lock三部曲
// 1、 new ReentrantLock();
// 2、 lock.lock(); // 加锁
// 3、 finally=> lock.unlock(); // 解锁
class Ticket2 {
// 属性、方法
private int number = 30;
Lock lock = new ReentrantLock();
public void sale(){
lock.lock(); // 加锁
try {
// 业务代码
if (number>0){
System.out.println(Thread.currentThread().getName()+"卖出了"+(number--)+"票,剩余:"+number);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock(); // 解锁
}
}
}
ReentrantLock
其他方法参见文档
4、ReentrantReadWriteLock:读写锁
读写锁定义:一个资源能被多个读线程访问,或者一个写线程访问,但不能读写同时存在(读写互斥,读读共享)
只有在读多写少的情况,读写锁才有较高的性能体现
有时候,在多线程中,有一些公共数据修改的机会比较少,而读的机会却是非常多的,此公共数据的操作基本都是读,如果每次操作都给此段代码加锁,太浪费时间了而且也很浪费资源,降低程序的效率,因为读操作不会修改数据,只是做一些查询,所以在读的时候不用给此段代码加锁,可以共享的访问,只有涉及到写的时候,互斥的访问就好了
缺点:
- 写锁饥饿问题:大量的读操作一直占用锁,写操作长期无法获取到锁
- 锁降级
/**
* 案例:模拟redis的key-value缓存
* 独占锁(写锁) 一次只能被一个线程占有 ;一个线程写,其他线程不能写或读
* 共享锁(读锁) 多个线程可以同时占有 ;可多个线程一起读,但读线程和写线程不共存
* ReadWriteLock
* 读-读 可以共存!
* 读-写 不能共存!
* 写-写 不能共存!
*/
public class ReadWriteLockDemo {
public static void main(String[] args) {
MyCache myCache = new MyCache();
// 写入
for (int i = 1; i <= 5 ; i++) {
final int temp = i;
new Thread(()->{
myCache.put(temp+"",temp+"");
},String.valueOf(i)).start();
}
// 读取
for (int i = 1; i <= 5 ; i++) {
final int temp = i;
new Thread(()->{
myCache.get(temp+"");
},String.valueOf(i)).start();
}
}
}
// 加锁的
class MyCacheLock{
private volatile Map<String,Object> map = new HashMap<>();
// 读写锁: 更加细粒度的控制
private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private Lock lock = new ReentrantLock();
// 存,写入的时候,只希望同时只有一个线程写
public void put(String key,Object value){
readWriteLock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName()+"写入"+key);
map.put(key,value);
System.out.println(Thread.currentThread().getName()+"写入OK");
} catch (Exception e) {
e.printStackTrace();
} finally {
readWriteLock.writeLock().unlock();
}
}
// 取,读,所有人都可以读!
public void get(String key){
readWriteLock.readLock().lock();
try {
System.out.println(Thread.currentThread().getName()+"读取"+key);
Object o = map.get(key);
System.out.println(Thread.currentThread().getName()+"读取OK");
} catch (Exception e) {
e.printStackTrace();
} finally {
readWriteLock.readLock().unlock();
}
}
}
/**
* 不加锁的
*/
class MyCache{
private volatile Map<String,Object> map = new HashMap<>();
// 存,写
public void put(String key,Object value){
System.out.println(Thread.currentThread().getName()+"写入"+key);
map.put(key,value);
System.out.println(Thread.currentThread().getName()+"写入OK");
}
// 取,读
public void get(String key){
System.out.println(Thread.currentThread().getName()+"读取"+key);
Object o = map.get(key);
System.out.println(Thread.currentThread().getName()+"读取OK");
}
}
锁降级
ReentrantReadWriteLock
的锁降级:写锁降级为读锁
ReentrantReadWriteLock
重入:该锁是可重入锁,以读写线程为例,读线程在获取读锁之后能再次获取读锁;写线程在获取写锁后能再次获取写锁,也能获取读锁
锁降级:如果同一个线程获取了写锁,然后获取了读锁,再释放了写锁,此时写锁已经降级为读锁了(顺序不能改变)
写锁可以降级为读锁,但是读锁不能升级为写锁
降级是为了让当前线程感知到数据的变化,目的是保证数据可见性。
这个提前释放写锁目的是为了让别的线程可以立刻读吧
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
lock.writeLock().lock();
System.out.println("写入操作");
lock.readLock().lock();
lock.writeLock().unlock();
System.out.println("读取操作");
lock.readLock().unlock();
不存在锁升级,以下代码发生死锁
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
lock.readLock().lock();
lock.writeLock().lock();
lock.readLock().unlock();
lock.writeLock().unlock();
5、StampedLock邮戳锁
StampedLock
(邮戳锁,也叫票据锁)是jdk8新增的一个读写锁,也是对 ReentrantReadWriteLock
的优化。
stamp
(戳记,long类型),代表锁的状态,当 stamp 返回 0 时,表示线程获取锁失败。并且当释放锁或转换锁的时候,都要传入最初获取的 stamp 值。
邮戳锁是为解决读写锁的锁饥饿问题而引出的。
读写锁的锁饥饿问题:读操作较多时,很难获取写锁;如999个读操作,1个写操作,那么这个写线程几乎抢不到写锁。虽然使用公平锁可以一定程度上缓解锁饥饿问题,但是这是以降低吞吐量为代价。
邮戳锁相对于读写锁的优化:
ReentrantReadWriteLock
读锁被占用时,其他线程获取写锁时会被阻塞。
StampedLock
采取乐观获取读锁,其他线程在获取写锁时不会被阻塞,但在读锁结束前需要进行结果的校验,判断期间是否有写操作改变了目标值。
乐观读在短的只读代码中,可以减少争用,提高吞吐量。
使用
- 所有获取锁的方法都返回一个邮戳(stamp),stamp 为 0,表示获取失败,其余表示成功
- 所有释放锁的方法都返回一个邮戳(stamp),这个 stamp 必须和成功获取锁时得到的stamp一致;
缺点
StampedLock
是不可重入锁,如果持有写锁时,再去获取写锁,则会造成死锁。- 悲观读和写锁都不支持条件变量 (Condition)
- 使用
StampedLock
一定不要调用中断操作,即interrupt()
方法,影响性能,且会产生奇怪的bug
三种访问模式:
- 悲观读:同读写锁
- 写模式:同读写锁
- 乐观读:采用无锁机制,支持读写并发,乐观的认为读的过程中没人修改,假如被修改了再实现升级为悲观读模式
public class StampedLockTest {
static int num = 0;
StampedLock lock = new StampedLock();
/**
* 悲观读
*/
public void read(){
long stamp = lock.readLock();
try {
System.out.println(Thread.currentThread().getName() + "悲观读:" + num);
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlockRead(stamp);
}
}
/**
* 写操作
*/
public void write(){
long stamp = lock.writeLock();
try {
System.out.println(Thread.currentThread().getName() + "写操作");
TimeUnit.SECONDS.sleep(1);
num++;
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlockWrite(stamp);
}
}
/**
* 乐观读操作
*/
public void optimsticRead(){
long stamp = lock.tryOptimisticRead();
try {
System.out.println(Thread.currentThread().getName() + "乐观读:" + num);
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 判断是否有人修改过
if (!lock.validate(stamp)){
// 升级为悲观读,重新读取
long stamp2 = lock.readLock();
//重新读取
try{
System.out.println("重读" + num);
}finally {
lock.unlockRead(stamp2);
}
}else{
lock.unlockRead(stamp);
}
}
}
/**
* 验证多个悲观读操作可同时进行
* 验证悲观读时,写操作无法进行
*/
@Test
public void test() throws InterruptedException {
StampedLockTest test = new StampedLockTest();
for (int i = 0; i < 10; i++) {
new Thread(test::read, "read" + i).start();
}
TimeUnit.MILLISECONDS.sleep(500);
for (int i = 0; i < 1; i++) {
new Thread(test::write, "write" + i).start();
}
TimeUnit.SECONDS.sleep(5);
}
/**
* 验证非重入锁:持有写锁,再去获取写锁时,发生死锁
*/
@Test
public void test2(){
long stamp1 = lock.writeLock();
long stamp2 = lock.writeLock();
}
}
6、Synchronized 和 Lock 区别
(1)Synchronized 是 java 内置的关键字,Lock 是一个java类
(2)Synchronized 无法判断获取锁的状态,Lock 可以判断是否获取到锁
(3)Synchronized 会自动释放锁,lock 必须手动释放锁!否则会发生死锁
(4)Synchronized 可重入锁,非公平;Lock,可重入锁,可判断锁,默认非公平(可设置是否公平)
(5)Synchronized 适合锁少量的代码同步问题,Lock 适合锁大量的同步代码
7、闭锁
(1)什么是闭锁
闭锁(latch)是一种Synchronizer:是一个对象,它根据本身的状态调节线程的控制流。常见类型的Synchronizer包括信号量、关卡和闭锁)。
闭锁可以延迟线程的进度直到线程线程到达终止状态。一个闭锁工作起来就像是一道大门:直到闭锁达到终点状态之前,门一直是关闭的,没有线程能够通过,在终点状态到来的时候,所有线程都可以通过。
(2)使用场景
闭锁可以用来确保特定活动直到其他的活动都完成后才开始发生,比如:
- 确保一个计算不会执行,直到它所需要的资源被初始化
- 确保一个服务不会开始,直到它依赖的其他服务都已经开始
- 等待,直到活动的所有部分都为继续处理做好充分准备
- 死锁检测,可以使用n个线程访问共享资源,在每次测试阶段的线程数目是不同的,并尝试产生死锁
(3)闭锁的实现
CountDownLatch
是一个同步辅助类,存在于 java.util.concurrent
包下,灵活的实现了闭锁,它允许一个或多个线程等待一个事件集的发生。
CountDownLatch
是通过一个计数器来实现的,计数器的初始值为线程的数量。每当一个线程完成了自己的任务后,计数的值就会减1。当计数器值到达0时,它所表示所有的线程已经完成了任务,然后在闭锁上等待的线程就可以恢复执行任务。
四、生产者和消费者问题
0、问题描述
经典的生产者-消费者模式,操作流程是这样的:
有多个生产者,可以并发生产产品,把产品置入队列中,如果队列满了,生产者就会阻塞;
有多个消费者,并发从队列中获取产品,如果队列空了,消费者就会阻塞;
1、Synchronized
注意:判断 number 时,必须用 while 判断,不能用 if 否则在多个线程的情况下会产生虚假唤醒问题。
何为虚假唤醒?
虚假唤醒就是一些obj.wait()会在除了obj.notify()和obj.notifyAll()的其他情况被唤醒,而此时是不应该唤醒的。
虚假唤醒产生原因
Object.wait()当我们运行到这个方法的时候,当前线程会进入等待状态,并释放锁,而当其他线程去唤醒它的时候,它会之后wait()的地方继续开始执行(已跳过 if 判断)。
导致虚假唤醒的原因主要就是一个线程直接在if代码块中被唤醒了,这时它已经跳过了if判断。我们只需要将if判断改为while,这样线程就会被重复判断而不再会跳出判断代码块,从而不会产生虚假唤醒这种情况了。
/**
* 线程之间的通信问题:生产者和消费者问题! 等待唤醒,通知唤醒
* 线程交替执行 A B 操作同一个变量 num = 0
* A num+1
* B num-1
*/
public class A {
public static void main(String[] args) {
Data data = new Data();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"A").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"B").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"C").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"D").start();
}
}
// 判断等待,业务,通知
class Data{ // 数字 资源类
private int number = 0;
//+1
public synchronized void increment() throws InterruptedException {
//注意:此处如果使用 if 会产生虚假唤醒问题
while (number!=0){ //0
// 等待, 会在此处醒来, 如果使用 if 则不会在进行判断
this.wait();
}
number++;
System.out.println(Thread.currentThread().getName()+"=>"+number);
// 通知其他线程,我+1完毕了
this.notifyAll();
}
//-1
public synchronized void decrement() throws InterruptedException {
//注意:此处如果使用 if 会产生虚假唤醒问题
while (number==0){ // 1
// 等待
this.wait();
}
number--;
System.out.println(Thread.currentThread().getName()+"=>"+number);
// 通知其他线程,我-1完毕了
this.notifyAll();
}
}
2、ReentrantLock
public class B {
public static void main(String[] args) {
Data2 data = new Data2();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"A").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"B").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"C").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"D").start();
}
}
// 判断等待,业务,通知
class Data2{ // 数字 资源类
private int number = 0;
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
//condition.await(); // 等待
//condition.signalAll(); // 唤醒全部
//+1
public void increment() throws InterruptedException {
lock.lock();
try {
// 业务代码
while (number!=0){ //0
// 等待
condition.await();
}
number++;
System.out.println(Thread.currentThread().getName()+"=>"+number);
// 通知其他线程,我+1完毕了
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
//-1
public synchronized void decrement() throws InterruptedException {
lock.lock();
try {
while (number==0){ // 1
// 等待
condition.await();
}
number--;
System.out.println(Thread.currentThread().getName()+"=>"+number);
// 通知其他线程,我-1完毕了
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
3、线程顺序调用
A 执行完调用B,B执行完调用C,C执行完调用A
public class C {
public static void main(String[] args) {
Data3 data = new Data3();
new Thread(()->{
for (int i = 0; i <10 ; i++) {
data.printA();
}
},"A").start();
new Thread(()->{
for (int i = 0; i <10 ; i++) {
data.printB();
}
},"B").start();
new Thread(()->{
for (int i = 0; i <10 ; i++) {
data.printC();
}
},"C").start();
}
}
class Data3{ // 资源类 Lock
private Lock lock = new ReentrantLock();
private Condition condition1 = lock.newCondition();
private Condition condition2 = lock.newCondition();
private Condition condition3 = lock.newCondition();
private int number = 1; // 1A 2B 3C
public void printA(){
lock.lock();
try {
// 业务,判断-> 执行-> 通知
while (number!=1){
// 等待
condition1.await();
}
System.out.println(Thread.currentThread().getName()+"=>AAAAAAA");
// 唤醒,唤醒指定的人,B
number = 2;
condition2.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
五、阻塞队列
FIFO
FIFO:first in first out 先进先出
不得不阻塞的情况
(1)写入,队列满了,必须阻塞等待
(2)取出,如果队列时空的,必须阻塞等待生产
1、BlockingQueue继承关系
阻塞队列 BlockingQueue
ArrayBlockingQueue: 这是一个由数组实现的容量固定的有界阻塞队列.
SynchronousQueue: 没有容量,不能缓存数据;每个put必须等待一个take; offer()的时候如果没有另一个线程在poll()或者take()的话返回false。
LinkedBlockingQueue: 这是一个由单链表实现的默认无界的阻塞队列。LinkedBlockingQueue提供了一个可选有界的构造函数,而在未指明容量时,容量默认为Integer.MAX_VALUE。
名称 | 描述 |
---|---|
ArrayBlockingQueue | 一个用数组实现的有界阻塞队列,此队列按照先进先出(FIFO)的原则对元素进行排序,并发采用可重入锁来控制 |
LinkedBlockingQueue | 一个用链表结构组成的有界队列,此队列按照先进先出(FIFO)的原则对元素进行排序 |
PriorityBlockingQueue | 一个支持线程优先级排序的无界队列 |
DelayQueue | 一个实现PriorityBlockingQueue实现延迟获取的无界队列,在创建元素时,可以指定多久才能从队列中获取当前元素 |
SynchronousQueue | 一个不存储元素的阻塞队列,每一个put操作必须等待take操作。支持公平锁和非公平锁。Executors.newCachedThreadPool()就使用了SynchronousQueue |
LinkedTransferQueue | 一个由链表结构组成的无界阻塞队列,相当于其它队列,LinkedTransferQueue队列多了transfer和tryTransfer方法 |
LinkedBlockingDeque | 一个由链表结构组成的双向阻塞队列 |
2、BolckingQueue常用方法
方式 | 抛出异常 | 有返回值,无异常 | 阻塞 等待 | 超时等待 |
---|---|---|---|---|
添加 | add(E e) |
offer(E e) |
put(E e) |
offer(..) |
移除 | remove(Object o) |
poll(..) |
take() |
poll(..) |
队首元素 | E element() |
peek() |
add
将指定的元素插入到此队列中,如果可以立即执行此操作而不违反容量限制, true
在成功后返回 IllegalStateException
如果当前没有可用空间,则抛出IllegalStateException。
boolean add(E e)
offer
将指定的元素插入此队列中,如果它是立即可行且不会违反容量限制,返回true
在成功,如果当前没有空间可用,返回 false
。 当使用容量限制队列时,此方法通常优于add(E)
,这可能无法仅通过抛出异常来插入元素。 不等待。
boolean offer(E e)
将指定的元素插入到此队列中,等待指定的等待时间(如有必要)才能使空间变得可用。
boolean offer(E e,long timeout,TimeUnit unit) throws InterruptedException
put
将指定的元素插入到此队列中。
void put(E e) throws InterruptedException
remove
删除元素,如果指定的元素存在,则返回true;如果指定的袁术不存在,返回false
boolean remove(Object o)
poll
检索并删除此队列的头,等待指定的等待时间(如有必要)使元素变为可用。
E poll(long timeout,TimeUnit unit) throws InterruptedException
take
检索并删除此队列的头,如有必要,等待元素可用。
E take() throws InterruptedException
element
检索,但不删除,这个队列的头。 此方法与peek
的不同之处在于,如果此队列为空,它将抛出异常。
E element()
peek
检索但不删除此队列的头部,如果此队列为空,则返回 null
。
E peek()
3、BolckingQueue使用
// 队列的大小
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("b"));
System.out.println(blockingQueue.add("c"));
// IllegalStateException: Queue full 抛出异常!
//System.out.println(blockingQueue.add("d"));
new Thread(()->{
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
blockingQueue.remove("c");
}).start();
System.out.println(blockingQueue.offer("d",3,TimeUnit.SECONDS)); //正常
//System.out.println(blockingQueue.offer("d")); //异常
System.out.println("=-===========");
System.out.println(blockingQueue.element()); // 查看队首元素是谁
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
// java.util.NoSuchElementException 抛出异常!
// System.out.println(blockingQueue.remove());
4、SynchronousQueue 同步队列
没有容量,不允许空元素,底层采用 LockSupport
实现阻塞唤醒线程
put(E e)
:添加元素,并且阻塞当前线程,直到其他线程将该元素取出E take()
:从队列中获取元素,并唤醒该元素对应的阻塞线程
此队列源码中充斥着大量的CAS语句
/**
* 准备put3个元素,每次put完必须等该元素取出后再能再次put
*/
BlockingQueue<String> blockingQueue = new SynchronousQueue<>(); // 同步队列
new Thread(()->{
try {
System.out.println(Thread.currentThread().getName()+" put 1");
blockingQueue.put("1"); //put后阻塞,等待取出后继续运行
System.out.println(Thread.currentThread().getName()+" put 2");
blockingQueue.put("2"); //put后阻塞,等待取出后继续运行
System.out.println(Thread.currentThread().getName()+" put 3");
blockingQueue.put("3"); //put后阻塞,等待取出后继续运行
} catch (InterruptedException e) {
e.printStackTrace();
}
},"T1").start();
new Thread(()->{
try {
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+"=>"+blockingQueue.take());
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+"=>"+blockingQueue.take());
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+"=>"+blockingQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
},"T2").start();
T1 put 1
T2=>1
T1 put 2
T2=>2
T1 put 3
T2=>3
可通过多线程往其中插入多个元素,获取值时根据先进后出原则
SynchronousQueue<Integer> synchronousQueue = new SynchronousQueue<>();
new Thread(()->{
try {
Thread.sleep(500);
System.out.println("put 1");
synchronousQueue.put(1);
System.out.println("put 1 end");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
new Thread(()->{
try {
Thread.sleep(1000);
System.out.println("put 2");
synchronousQueue.put(2);
System.out.println("put 2 end");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
new Thread(()->{
try {
Thread.sleep(1500);
System.out.println("put 3");
synchronousQueue.put(3);
System.out.println("put 3 end");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
new Thread(()->{
try {
Thread.sleep(2000);
System.out.println("take " + synchronousQueue.take());
Thread.sleep(100);
System.out.println("take " + synchronousQueue.take());
Thread.sleep(100);
System.out.println("take " + synchronousQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
Thread.sleep(3000);
put 1
put 2
put 3
take 1
put 1 end
take 2
put 2 end
put 3 end
take 3
(1)构造方法
public SynchronousQueue() {
this(false);
}
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
底层为链表,节点中存储了当前节点数据对应的线程(即该元素是由哪个线程 put 的)
static final class QNode {
volatile QNode next; // next node in queue
volatile Object item; // CAS'ed to or from null
volatile Thread waiter; // to control park/unpark
final boolean isData;
....
}
六、ForkJoin
并行执行任务!提高效率。大数据量!
采用“工作窃取”模式(work-stealing) :
当执行新的任务时它可以将其拆分分成更小的任务执行,并将小任务加到线程队列中,然后再从一个随机线程的队列中偷一个并把它放在自己的队列中。
相对于一般的线程池实现, fork/join框架的优势体现在对其中包含的任务的处理方式上.在一般的线程池中,如果一个线程正在执行的任务由于某些原因无法继续运行,那么该线程会处于等待状态.而在fork/join框架实现中,如果某个子问题由于等待另外一个子问题的完成而无法继续运行.那么处理该子问题的线程会主动寻找其他尚未运行的子问题来执行.这种方式减少了线程的等待时间,提高性能
工作窃取(重点)
这个里面维护的都是双端队列。
2 个线程并行工作,此时 B 先执行完任务,会去窃取 A 的任务,从而提高效率。
代码
案例:求和 1 到 10 亿
public static void main(String[] args) throws ExecutionException, InterruptedException {
// test1(); // 12224ms
// test2(); // 10038ms
// test3(); // 153ms
}
// 普通方法
public static void test1(){
Long sum = 0L;
long start = System.currentTimeMillis();
for (Long i = 1L; i <= 10_0000_0000; i++) {
sum += i;
}
long end = System.currentTimeMillis();
System.out.println("sum="+sum+" 时间:"+(end-start));
}
// 使用ForkJoin
public static void test2() throws ExecutionException, InterruptedException {
long start = System.currentTimeMillis();
// 1、ForkJoin线程池
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<Long> task = new ForkJoinDemo(0L, 10_0000_0000L);
// 2、执行任务,默认不返回结果
ForkJoinTask<Long> submit = forkJoinPool.submit(task);
// get()等待执行任务执行完后获取其结果
Long sum = submit.get();
// 2、执行任务,返回结果
//Long invoke = forkJoinPool.invoke(task);
long end = System.currentTimeMillis();
System.out.println("sum="+sum+" 时间:"+(end-start));
}
//使用Stream流计算
public static void test3(){
long start = System.currentTimeMillis();
// Stream并行流 ()
//串行流(单线程):切换为并行流 parallel()
//并行流:切换为串行流 sequential()
long sum = LongStream.rangeClosed(0L, 10_0000_0000L)
.parallel()
.reduce(0, Long::sum);
long end = System.currentTimeMillis();
System.out.println("sum="+"时间:"+(end-start));
}
/**
* 求和计算的任务!
* // 如何使用 forkjoin
* // 1、forkjoinPool 通过它来执行
* // 2、计算任务 forkjoinPool.execute(ForkJoinTask task)
* // 3. 计算类要继承 ForkJoinTask
*/
public class ForkJoinDemo extends RecursiveTask<Long> {
private Long start; // 1
private Long end; // 1990900000
// 临界值,最小单位任务
private Long temp = 10000L;
public ForkJoinDemo(Long start, Long end) {
this.start = start;
this.end = end;
}
// 计算方法
@Override
protected Long compute() {
if ((end-start)<temp){
Long sum = 0L;
for (Long i = start; i <= end; i++) {
sum += i;
}
return sum;
}else { // forkjoin 递归
long middle = (start + end) / 2; // 中间值
ForkJoinDemo task1 = new ForkJoinDemo(start, middle);
task1.fork(); // 拆分任务,把任务压入线程队列
ForkJoinDemo task2 = new ForkJoinDemo(middle+1, end);
task2.fork(); // 拆分任务,把任务压入线程队列
return task1.join() + task2.join();
}
}
}
七、CompletableFuture
0、Future接口
Future
接口(FutureTask
是其实现类)提供一种异步并行计算的功能,定义了操作异步任务的一些方法,如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务是否完毕等。
Future
接口可以为主线程开一个分支线程,用于处理耗时工作。
而 Future
对结果的获取并不是很友好,只能通过阻塞和轮询的方法得到结果
CompletableFuture
是对 Future
接口的扩展,主要增加了一下功能:
- 回调通知
CompletableFuture
和exceptionally
- 线程池结合异步任务的创建方式
- 多个任务前后依赖,可以组合处理
- 对计算速度选取最快的进行处理
异步编排 CompletableFuture 类似 promise
与同步处理相对,异步处理不用阻塞当前线程来等待处理完成
比如需要在异步查询完一级分类后,获取到结果,在查询二级分类的情况下
(1)接口源码
public interface Future<V> {
/**
* 尝试取消执行此任务。如果任务已经完成、已经取消或由于其他原因无法取消,则此尝试将失败。
* 如果成功,并且在调用cancel时此任务尚未启动,则此任务不应运行。
* 如果任务已经启动,那么mayinterruptirunning参数决定是否应该中断执行该任务的线程
* 以尝试停止该任务。
* 在此方法返回后,对isDone的后续调用将始终返回true。
* 如果此方法返回true,后续对isCancelled的调用将始终返回true。
*/
boolean cancel(boolean mayInterruptIfRunning);
/**
* 判断当前方法是否取消
*/
boolean isCancelled();
/**
* 判断当前方法是否完成
*/
boolean isDone();
/**
* 方法可以当任务结束后返回一个结果,如果调用时,工作还没有结束,则会阻塞线程,直到任务执行完毕
*/
V get() throws InterruptedException, ExecutionException;
/**
* 最多等待timeout的时间就会返回结果
*/
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
(2)实现类
RunnableFuture
这个接口同时继承Future接口和Runnable接口,在成功执行 run()
方法后,可以通过Future访问执行结果。这个接口都实现类是FutureTask,一个可取消的异步计算,这个类提供了Future的基本实现,后面我们的demo也是用这个类实现,它实现了启动和取消一个计算,查询这个计算是否已完成,恢复计算结果。计算的结果只能在计算已经完成的情况下恢复。如果计算没有完成,get方法会阻塞,一旦计算完成,这个计算将不能被重启和取消,除非调用runAndReset方法。
FutureTask
能用来包装一个Callable或Runnable对象,因为它实现了Runnable接口,而且它能被传递到Executor进行执行。为了提供单例类,这个类在创建自定义的工作类时提供了protected构造函数。
SchedualFuture
这个接口表示一个延时的行为可以被取消。通常一个安排好的future是定时任务SchedualedExecutorService的结果
CompleteFuture
一个Future类是显示的完成,而且能被用作一个完成等级,通过它的完成触发支持的依赖函数和行为。当两个或多个线程要执行完成或取消操作时,只有一个能够成功。
ForkJoinTask
基于任务的抽象类,可以通过ForkJoinPool来执行。一个ForkJoinTask是类似于线程实体,但是相对于线程实体是轻量级的。大量的任务和子任务会被ForkJoinPool池中的真实线程挂起来,以某些使用限制为代价。
1、创建异步对象
// 没有返回值的 runAsync 异步回调
public static CompletableFuture<Void> runAsync(Runnable runnable);
// 没有返回值的 runAsync 异步回调,可以指定使用的线程池
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor);
// 有返回值的 supplyAsync 异步回调
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
// 有返回值的 supplyAsync 异步回调,可以指定使用的线程池
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor);
有返回值示例
// 没有返回值的 runAsync 异步回调
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(()->{
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"runAsync=>Void");
return "1";
});
String result = completableFuture.get(); // 获取阻塞执行结果,一般放在方法最后
System.out.println(result);
获取方法返回值
get()
方法用于阻塞获取返回结果,建议设置最大阻塞时长,避免因为意外问题无法获取返回结果导致一直阻塞。
// 阻塞获取返回结果
public T get();
// 阻塞获取返回结果,若超过指定时长则放开阻塞,并抛出TimeoutException异常
public T get(long timeout, TimeUnit unit);
// 基本和get()方法一样,区别是不会抛出检查型异常
public T join()
2、失败与成功回调
类似前端的 promise
创建回调的方式
// 使用当前执行成功的异步线程去执行异步回调
public CompletableFuture<T> whenComplete(
BiConsumer<? super T, ? super Throwable> action);
// 使用线程池中的其他线程去执行异步回调
public CompletableFuture<T> whenCompleteAsync(
BiConsumer<? super T, ? super Throwable> action);
// 使用指定线程池中的其他线程去执行异步回调
public CompletableFuture<T> whenCompleteAsync(
BiConsumer<? super T, ? super Throwable> action, Executor executor)
whenComplete
:用于处理正常和异常的处理结果;虽然能得到异常信息,但是无法修改返回结果,相当于一个监听器
exceptionally
用于处理异常情况,可以得到异常信息,同时设置异常时的返回结果
有返回值的 supplyAsync 的异步回调
// 有返回值的 supplyAsync 异步回调
// ajax,成功和失败的回调
// 返回的是错误信息;
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread().getName()+"supplyAsync=>Integer");
int i = 10/0;
return 1024;
});
Integer result = completableFuture.whenComplete((res, excption) -> {
System.out.println("t=>" + res); // 正常的返回结果
System.out.println("u=>" + excption); // 错误信息
}).exceptionally((e) -> {
System.out.println(e.getMessage());
return 233; // 可以获取到错误的返回结果
}).get();
System.out.println(result);
3、handle()方法
handle()
方法与 whenComplete()
方法的主要区别在于,handle()
方法能够修改返回的结果。
public <U> CompletableFuture<U> handle(
BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletableFuture<U> handleAsync(
BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletableFuture<U> handleAsync(
BiFunction<? super T, Throwable, ? extends U> fn, Executor executor);
示例
Integer result = completableFuture.handle((res, excption) -> {
if (res != null) {
// 正常的返回结果
System.out.println("t=>" + res);
return res;
}
if (excption != null){
// 错误信息
System.out.println("u=>" + excption);
return 0;
}
return 1;
}).get();
System.out.println(result);
4、串行化
// 异步线程执行完方法后,继续执行 thenRun 对应的方法,无法感知上一步的结果,无返回结果
public CompletableFuture<Void> thenRun(Runnable action);
// 开启新线程执行 action 任务
public CompletableFuture<Void> thenRunAsync(Runnable action);
// 用指定的线程池中的线程执行 action 任务
public CompletableFuture<Void> thenRunAsync(Runnable action,Executor executor);
/* -------------------------------------------------------------- */
// 异步线程执行完方法后,继续执行 thenAccept 对应的方法,可获得上一步的结果,无返回结果
public CompletableFuture<Void> thenAccept(Consumer<? super T> action);
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,
Executor executor);
/* -------------------------------------------------------------- */
// 异步线程执行完方法后,继续执行 thenRun 对应的方法,可获得上一步的结果,可返回结果
public <U> CompletableFuture<U> thenApply(
Function<? super T,? extends U> fn);
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn);
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn, Executor executor);
示例
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread().getName()+"supplyAsync=>Integer");
//int i = 10/0;
return 10;
});
CompletableFuture<Integer> future = completableFuture.thenApplyAsync((res) -> {
return res * 2;
}).thenApplyAsync((res)->{
return res * 4;
});
Integer result = future.get();
System.out.println(result);
5、两任务组合-都完成
当两个任务都完成时,触发该任务
// CompletionStage<?> other:该参数为另一个任务
// 不需要获取前一个任务结果,无法返回结果
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,
Runnable action);
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
Runnable action);
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
Runnable action,
Executor executor);
/* ------------------------------------------------ */
// 可获取前一个任务结果,无返回结果
public <U> CompletableFuture<Void> thenAcceptBoth(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action);
public <U> CompletableFuture<Void> thenAcceptBothAsync(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action);
public <U> CompletableFuture<Void> thenAcceptBothAsync(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action, Executor executor);
/* -------------------------------------------------- */
// 可获取前一个任务结果,有返回结果
public <U,V> CompletableFuture<V> thenCombine(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletableFuture<V> thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletableFuture<V> thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn, Executor executor);
示例
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(()->{
return 10;
});
CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(()->{
return 20;
});
CompletableFuture<Integer> future = completableFuture.thenCombine(completableFuture2, (res1, res2) -> {
System.out.println(res1);
System.out.println(res2);
return res1 + res2;
});
Integer result = future.get();
System.out.println(result); // 30
6、两任务组合-一个完成
指定的两个任务中,其中一个完成时,执行任务
注意:当其中一个任务出现异常时,直接中断,并不会向下执行,这个只是用于,2个任务执行快慢不一的情况。
// runAfterEither: 无法获取上一个任务结果,无返回值
public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,
Runnable action);
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
Runnable action);
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
Runnable action,
Executor executor);
/* -------------------------------------------------------- */
// acceptEither: 可获取上一个任务结果,无返回值
public CompletableFuture<Void> acceptEither(
CompletionStage<? extends T> other, Consumer<? super T> action);
public CompletableFuture<Void> acceptEitherAsync(
CompletionStage<? extends T> other, Consumer<? super T> action);
public CompletableFuture<Void> acceptEitherAsync(
CompletionStage<? extends T> other, Consumer<? super T> action,
Executor executor);
/* --------------------------------------------------------- */
// applyToEither: 可获取上一个任务结果,有返回值
public <U> CompletableFuture<U> applyToEither(
CompletionStage<? extends T> other, Function<? super T, U> fn);
public <U> CompletableFuture<U> applyToEitherAsync(
CompletionStage<? extends T> other, Function<? super T, U> fn);
public <U> CompletableFuture<U> applyToEitherAsync(
CompletionStage<? extends T> other, Function<? super T, U> fn,
Executor executor);
示例
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(()->{
try {
Thread.sleep(1000);
System.out.println(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 10;
});
CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(()->{
System.out.println(20);
//int i = 10/0;
return 20;
});
CompletableFuture<Integer> future = completableFuture.applyToEither(completableFuture2, (res) -> {
return res + 1;
});
Integer result = future.get();
System.out.println(result); // 20 21 10
7、多任务组合(重要)
allOf
:等待所有任务执行完成后,执行;返回的 CompletableFuture
无返回值
anyOd
:其中一个任务执行完成后,执行;返回的 CompletableFuture
返回值是最先完成的任务的返回值
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs);
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
allOf示例
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(()->{
System.out.println("查询商品分类信息");
return 10;
});
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(()->{
System.out.println("查询商品销量信息");
return 20;
});
CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(()->{
System.out.println("查询商品属性信息");
return 30;
});
CompletableFuture<Void> allFuture = CompletableFuture.allOf(future1, future2, future3);
allFuture.get(); // 阻塞等待所有线程执行完成
Integer result1 = future1.get();
Integer result2 = future2.get();
Integer result3 = future3.get();
System.out.println(result1);
System.out.println(result2);
System.out.println(result3);
anyOf示例
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(()->{
return 10;
});
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(()->{
return 20;
});
CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(()->{
return 30;
});
CompletableFuture<Object> allFuture = CompletableFuture.anyOf(future1, future2, future3);
// 获取最早完成的任务的结果
Integer result = (Integer) allFuture.get();
System.out.println(result);
八、JMM
什么是JMM
Java内存模型,不存在的概念性的东西!
关于JMM的一些同步的约定:
1、线程解锁前,必须把共享变量立刻刷回主存。
2、线程加锁前,必须读取主存中的新值到工作内存中!
3、加锁和解锁是同一把锁
JMM内存模型
线程 工作内存 、主内存
内存交互操作有8种,虚拟机实现必须保证每一个操作都是原子的,不可在分的(对于double和long类 型的变量来说,load、store、read和write操作在某些平台上允许例外)
- read (读取):作用于主内存变量,它把一个变量的值从主内存传输到线程的工作内存中,以便 随后的load动作使用
- load (载入):作用于工作内存的变量,它把read操作从主存中变量放入工作内存中
- use (使用):作用于工作内存中的变量,它把工作内存中的变量传输给执行引擎,每当虚拟机 遇到一个需要使用到变量的值,就会使用到这个指令
- assign (赋值):作用于工作内存中的变量,它把一个从执行引擎中接受到的值放入工作内存的变 量副本中
- store (存储):作用于主内存中的变量,它把一个从工作内存中一个变量的值传送到主内存中, 以便后续的write使用
- write (写入):作用于主内存中的变量,它把store操作从工作内存中得到的变量的值放入主内 存的变量中
以上6条保证了单条指令的原子性,针对多条指令的组合性原子保证,没有大面积加锁,所以JVM又提供了一下2条指令
lock (锁定):作用于主内存的变量,把一个变量标识为线程独占状态 ,只是在写的时候加锁,只锁写变量的过程
unlock (解锁):作用于主内存的变量,它把一个处于锁定状态的变量释放出来,释放后的变量 才可以被其他线程占用
JMM对这八种指令的使用,制定了如下规则:
- 不允许read和load、store和write操作之一单独出现。即使用了read必须load,使用了store必须 write
- 不允许线程丢弃他近的assign操作,即工作变量的数据改变了之后,必须告知主存
- 不允许一个线程将没有assign的数据从工作内存同步回主内存
- 一个新的变量必须在主内存中诞生,不允许工作内存直接使用一个未被初始化的变量。就是怼变量 实施use、store操作之前,必须经过assign和load操作
- 一个变量同一时间只有一个线程能对其进行lock。多次lock后,必须执行相同次数的unlock才能解 锁
- 如果对一个变量进行lock操作,会清空所有工作内存中此变量的值,在执行引擎使用这个变量前, 必须重新load或assign操作初始化变量的值
- 如果一个变量没有被lock,就不能对其进行unlock操作。也不能unlock一个被其他线程锁住的变量
- 对一个变量进行unlock操作之前,必须把此变量同步回主内存
九、Volatile
Volatile 保证可见性,但不保证原子性。并且禁止进行指令重排序。(实现有序性)
详细参阅:https://blog.csdn.net/qq_24047659/article/details/88031712
可见性
即一个线程修改了某个变量的值,这新值对其他线程来说是立即可见的。
原子性
volatile 只能保证对单次读/写的原子性。i++ 这种操作不能保证原子性。
保证可见性 演示
//加了volatile关键字的区别就是会强制保证可见性 不用考虑CPU有没有时间
//system.out.println 是加锁的,锁的是同一个值 num,里面有个synchronize代码块
public class JMMDemo {
// 不加 volatile 程序就会死循环!
// 加 volatile 可以保证可见性
private volatile static int num = 0;
public static void main(String[] args) { // main
new Thread(()->{ // 线程 1 对主内存的变化不知道的
while (num==0){
}
}).start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
num = 1;
System.out.println(num);
}
}
不保证原子性 演示
线程A在执行任务的时候,不能被打扰的,也不能被分割。要么同时成功,要么同时失败
// volatile 不保证原子性
public class VDemo02 {
// volatile 不保证原子性
// 原子类的 Integer
private volatile static AtomicInteger num = new AtomicInteger();
public static void add(){
// num++; // 不是一个原子性操作
num.getAndIncrement(); // AtomicInteger + 1 方法, CAS
}
public static void main(String[] args) {
//理论上num结果应该为 2 万
for (int i = 1; i <= 20; i++) {
new Thread(()->{
for (int j = 0; j < 1000 ; j++) {
add();
}
}).start();
}
while (Thread.activeCount()>2){ // main gc
//Thread.yield()是在主线程中执行的,意思只要还有除了GC和main线程之外的线程在跑,主线程就让出cpu不往下执行
Thread.yield();
}
System.out.println(Thread.currentThread().getName() + " " + num);
}
}
并发问题
1、CAS
CAS
(Compare and swap)比较和替换是设计并发算法时用到的一种技术。
有点类似乐观锁
场景:在没有锁的情况下,在多线程访问的时候,保证线程一致性的去改动某一个值。
CAS : 比较当前工作内存中的值和主内存中的值,如果这个值是期望的,那么则执行操作!如果不是就 一直循环!
缺点:
1、 循环会耗时
2、一次性只能保证一个共享变量的原子性
3、ABA问题
ABA问题
当读取的初值:X=A,线程1
在对X进行操作时,线程2
读取了X,并将其修改为B后,在修改为A,此时,线程1
对值X计算完毕,比较计算前获取的值X=A与当前的X=A相等,此时无法得知其他线程已经修改过X。
解决方法
加版本号(或者加boolean标签),当线程改动值的同时也要改动版本号。
public class CASDemo {
//AtomicStampedReference 注意,如果泛型是一个包装类,注意对象的引用问题
// 正常在业务操作,这里面比较的都是一个个对象
static AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<>(1,1);
// CAS compareAndSet : 比较并交换!
public static void main(String[] args) {
new Thread(()->{
int stamp = atomicStampedReference.getStamp(); // 获得版本号
System.out.println("a1=>"+stamp);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
Lock lock = new ReentrantLock(true);
atomicStampedReference.compareAndSet(1, 2,
atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1);
System.out.println("a2=>"+atomicStampedReference.getStamp());
System.out.println(atomicStampedReference.compareAndSet(2, 1,
atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1));
System.out.println("a3=>"+atomicStampedReference.getStamp());
},"a").start();
// 乐观锁的原理相同!
new Thread(()->{
int stamp = atomicStampedReference.getStamp(); // 获得版本号
System.out.println("b1=>"+stamp);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(atomicStampedReference.compareAndSet(1, 6,
stamp, stamp + 1));
System.out.println("b2=>"+atomicStampedReference.getStamp());
},"b").start();
}
}