JUC
1 什么是 JUC
1.什么是juc
java.util .concurrent 工具包的简称
2.线程和进程的区别
`进程`:指在系统中正在运行的一个应用程序;程序一旦运行就是进程;进程--资源分配的最小单位。
`线程`:系统分配处理器时间资源的基本单元,或者说进程之内独立执行的一个单元执行流。线程——程序执行的最小单位。
1.3线程的状态
1.3.1 线程状态枚举类
Thread.State
NEW,(新建)
RUNNABLE,(准备就绪)
BLOCKED,(阻塞)
WAITING,(不见不散)
TIMED_WAITING,(过时不候)
TERMINATED;(终结)
1.3.2 wait/sleep 的区别
(1)sleep 是 Thread 的静态方法,wait 是 Object 的方法,任何对象实例都能调用。
(2)sleep `不会释放锁,它也不需要占用锁`。wait `会释放锁,但调用它的前提是当前线程占有锁`(即代码要在synchronized 中)。
(3)它们都可以被 interrupted 方法中断。
1.4 并发与并行
1.4.1 串行模式
串行表示所有任务都一一按先后顺序进行。
#串行是一次只能取得一个任务,并执行这个任务。
1.4.2 并行模式
并行意味着`可以同时取得多个任务,并同时去执行所取得的这些任务`。
并行模式相当于将长长的一条队列,划分成了多条短队列,所以并行缩短了任务队列的长度。并行的效率从代码层次上强依赖于多进程/多线程代码,从硬件角度上则依赖于多核 CPU。
1.4.3 并发
`并发(concurrent)指的是多个程序可以同时运行的现象,更细化的是多进程可以同时运行或者多指令可以同时运行`。但这不是重点,在描述并发的时候也不会去扣这种字眼是否精确,并发的重点在于它是一种现象, 并发描述的是多进程同时运行的现象。但实际上,对于单核心 CPU 来说,同一时刻只能运行一个线程。所以,这里的"同时运行"表示的不是真的同一时刻有多个线程运行的现象,这是并行的概念,而是提供一种功能让用户看来多个程序同时运行起来了,但实际上这些程序中的进程不是一直霸占 CPU 的,而是执行一会停一会。
要解决大并发问题,通常是将大任务分解成多个小任务, 由于操作系统对进程的调度是随机的,所以切分成多个小任务后,可能会从任一小任务处执行。这可能会出现一些现象:
• 1.可能出现一个小任务执行了多次,还没开始下个任务的情况。这时一般会采用队列或类似的数据结构来存放各个小任务的成果
• 2.可能出现还没准备好第一步就执行第二步的可能。这时,一般采用多路复用或异步的方式,比如只有准备好产生了事件通知才 执行某个任务。
• 3.可以多进程/多线程的方式并行执行这些小任务。也可以单进程/单线程执行这些小任务,这时很可能要配合多路复用才能达 到较高的效率
1.4.4 小结(重点)
并发:同一时刻多个线程在访问同一个资源,多个线程对一个点
例子:春运抢票 电商秒杀...
并行:多项工作一起执行,之后再汇总
例子:泡方便面,电水壶烧水,一边撕调料倒入桶中
1.5 管程
管程(monitor)是`保证了同一时刻只有一个进程在管程内活动`,即管程内定义的操作在同一时刻只被一个进程调用(由编译器实现).但是这样并不能保证进程以设计的顺序执行
JVM 中同步是基于进入和退出管程(monitor)对象实现的,每个对象都会有一个管程(monitor)对象,管程(monitor)会随着 java 对象一同创建和销毁
执行线程首先要持有管程对象,然后才能执行方法,当方法完成之后会释放管程,方法在执行时候会持有管程,其他线程无法再获取同一个管程
1.6 用户线程和守护线程
`用户线程`:平时用到的普通线程,自定义线程
`守护线程`:运行在后台,是一种特殊的线程,比如垃圾回收
当主线程结束后,用户线程还在运行,JVM 存活 如果没有用户线程,都是守护线程,JVM 结束
2 Lock 接口
2.1 Synchronized
2.1.1 Synchronized 关键字
synchronized 是 Java 中的关键字,是一种`同步锁`。它修饰的对象有以下几种:
1. 修饰一个代码块,被修饰的代码块称为同步语句块,其作用的范围是大括号{}括起来的代码,作用的对象是调用这个代码块的对象;
2. 修饰一个方法,被修饰的方法称为同步方法,其作用的范围是整个方法,作用的对象是调用这个方法的对象;虽然可以使用 synchronized 来定义方法,但 synchronized 并不属于方法定义的一部分,因此,synchronized 关键字不能被继承。如果在父类中的某个方法使用了 synchronized 关键字,而在子类中覆盖了这个方法,在子类中的这个方法默认情况下并不是同步的,而必须显式地在子类的这个方法中加上synchronized 关键字才可以。当然,还可以在子类方法中调用父类中相应的方法,这样虽然子类中的方法不是同步的,但子类调用了父类的同步方法,因此,子类的方法也就相当于同步了。
3. 修改一个静态的方法,其作用的范围是整个静态方法,作用的对象是这个类的所有对象;
4. 修改一个类,其作用的范围是 synchronized 后面括号括起来的部分,作用主的对象是这个类的所有对象。
如果一个代码块被 synchronized 修饰了,当一个线程获取了对应的锁,并执行该代码块时,其他线程便只能一直等待,等待获取锁的线程释放锁,而这里获取锁的线程释放锁只会有两种情况:
1)获取锁的线程执行完了该代码块,然后线程释放对锁的占有;
2)线程执行发生异常,此时 JVM 会让线程自动释放锁。
那么如果这个获取锁的线程由于要等待 IO 或者其他原因(比如调用 sleep方法)被阻塞了,但是又没有释放锁,其他线程便只能干巴巴地等待,试想一下,这多么影响程序执行效率。因此就需要有一种机制可以不让等待的线程一直无期限地等待下去(比如只等待一定的时间或者能够响应中断),通过 Lock 就可以办到。
使用synchronized实现卖票
//第一步 创建资源类,定义属性和和操作方法
class Ticket {
//票数
private int number = 30;
//操作方法:卖票
public synchronized void sale() {
//判断:是否有票
if(number > 0) {
System.out.println(Thread.currentThread().getName()+" : 卖出:"+(number--)+" 剩下:"+number);
}
}
}
public class SaleTicket {
//第二步 创建多个线程,调用资源类的操作方法
public static void main(String[] args) {
//创建Ticket对象
Ticket ticket = new Ticket();
//创建三个线程
new Thread(new Runnable() {
@Override
public void run() {
//调用卖票方法
for (int i = 0; i < 40; i++) {
ticket.sale();
}
}
},"AA").start();
new Thread(new Runnable() {
@Override
public void run() {
//调用卖票方法
for (int i = 0; i < 40; i++) {
ticket.sale();
}
}
},"BB").start();
new Thread(new Runnable() {
@Override
public void run() {
//调用卖票方法
for (int i = 0; i < 40; i++) {
ticket.sale();
}
}
},"CC").start();
}
}
2.2 什么是 Lock
Lock 锁实现提供了比使用同步方法和语句可以获得的更广泛的锁操作。它们允许更灵活的结构,可能具有非常不同的属性,并且可能支持多个关联的条件对象。Lock 提供了比 synchronized 更多的功能。
Lock 与的 Synchronized 区别
• Lock 不是 Java 语言内置的,synchronized 是 Java 语言的关键字,因此是内置特性。Lock 是一个类,通过这个类可以实现同步访问;
• Lock 和 synchronized 有一点非常大的不同,采用 `synchronized 不需要用户去手动释放锁`,当 synchronized 方法或者 synchronized 代码块执行完之后,系统会自动让线程释放对锁的占用;而 `Lock 则必须要用户去手动释放锁`,如果没有主动释放锁,就有可能导致出现死锁现象。
1. Lock 是一个接口,而 synchronized 是 Java 中的关键字,synchronized 是内置的语言实现;
2. synchronized 在发生异常时,会自动释放线程占有的锁,因此不会导致死锁现象发生;而 Lock 在发生异常时,如果没有主动通过 unLock()去释放锁,则很可能造成死锁现象,因此使用 Lock 时需要在 finally 块中释放锁;
3. Lock 可以让等待锁的线程响应中断,而 synchronized 却不行,使用synchronized 时,等待的线程会一直等待下去,不能够响应中断;
4. 通过 Lock 可以知道有没有成功获取锁,而 synchronized 却无法办到。
5. Lock 可以提高多个线程进行读操作的效率。在性能上来说,如果竞争资源不激烈,两者的性能是差不多的,而当竞争资源非常激烈时(即有大量线程同时竞争),此时 Lock 的性能要远远优于synchronized
//使用lock实现卖票
import java.util.concurrent.locks.ReentrantLock;
//第一步 创建资源类,定义属性和和操作方法
class LTicket {
//票数量
private int number = 30;
//创建可重入锁
private final ReentrantLock lock = new ReentrantLock(true);
//卖票方法
public void sale() {
//上锁
lock.lock();
try {
//判断是否有票
if(number > 0) {
System.out.println(Thread.currentThread().getName()+" :卖出"+(number--)+" 剩余:"+number);
}
} finally {
//解锁
lock.unlock();
}
}
}
public class LSaleTicket {
//第二步 创建多个线程,调用资源类的操作方法
//创建三个线程
public static void main(String[] args) {
LTicket ticket = new LTicket();
new Thread(()-> {
for (int i = 0; i < 40; i++) {
ticket.sale();
}
},"AA").start();
new Thread(()-> {
for (int i = 0; i < 40; i++) {
ticket.sale();
}
},"BB").start();
new Thread(()-> {
for (int i = 0; i < 40; i++) {
ticket.sale();
}
},"CC").start();
}
}
2.2.1 Lock 接口**
public interface Lock {
void lock();
void lockInterruptibly() throws InterruptedException;
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
void unlock();
Condition newCondition();
}
1.lock()
lock()方法是平常使用得最多的一个方法,就是用来获取锁。如果锁已被其他线程获取,则进行等待。
采用 Lock,必须主动去释放锁,并且在发生异常时,不会自动释放锁。因此一般来说,使用 Lock 必须在try{}catch{}块中进行,并且将释放锁的操作放在finally 块中进行,以保证锁一定被被释放,防止死锁的发生。通常使用 Lock来进行同步的话,是以下面这种形式去使用的:
Lock lock = ...;
lock.lock();
try{
//处理任务
}catch(Exception ex){
}finally{
lock.unlock(); //释放锁
}
2.newCondition
关键字 synchronized 与 wait()/notify()这两个方法一起使用可以实现等待/通知模式, Lock 锁的 newContition()方法返回 Condition 对象,Condition 类也可以实现等待/通知模式。
用 notify()通知时,JVM 会随机唤醒某个等待的线程, 使用 Condition 类可以进行选择性通知, Condition 比较常用的两个方法:
• await()会使当前线程等待,同时会释放锁,当其他线程调用 signal()时,线程会重
新获得锁并继续执行。
• signal()用于唤醒一个等待的线程。
==注意:在调用 Condition 的 await()/signal()方法前,也需要线程持有相关的 Lock 锁,调用 await()后线程会释放这个锁,在 singal()调用后会从当前Condition 对象的等待队列中,唤醒 一个线程,唤醒的线程尝试获得锁, 一旦获得锁成功就继续执行。==
3. ReentrantLock
ReentrantLock,意思是“可重入锁”,关于可重入锁的概念将在后面讲述。ReentrantLock 是唯一实现了 Lock 接口的类,并且 ReentrantLock 提供了更多的方法。下面通过一些实例看具体看一下如何使用。
4. ReadWriteLock
//ReadWriteLock 也是一个接口,在它里面只定义了两个方法:
public interface ReadWriteLock {
/**
* Returns the lock used for reading.
*
* @return the lock used for reading.
*/
Lock readLock();
/**
* Returns the lock used for writing.
*
* @return the lock used for writing.
*/
Lock writeLock();
}
一个用来获取读锁,一个用来获取写锁。也就是说将文件的读写操作分开,分成 2 个锁来分配给线程,从而使得多个线程可以同时进行读操作。下面的ReentrantReadWriteLock 实现了 ReadWriteLock 接口。ReentrantReadWriteLock 里面提供了很多丰富的方法,不过最主要的有两个方法:readLock()和 writeLock()用来获取读锁和写锁。下面通过几个例子来看一下 ReentrantReadWriteLock 具体用法。假如有多个线程要同时进行读操作的话,先看一下 synchronized 达到的效果:
public class Test {
private ReentrantReadWriteLock rwl = new
ReentrantReadWriteLock();
public static void main(String[] args) {
final Test test = new Test();
new Thread(){
public void run() {
test.get(Thread.currentThread());
};
}.start();
new Thread(){
public void run() {
test.get(Thread.currentThread());
};
}.start();
}
public synchronized void get(Thread thread) {
long start = System.currentTimeMillis();
while(System.currentTimeMillis() - start <= 1) {
System.out.println(thread.getName()+"正在进行读操作");
}
System.out.println(thread.getName()+"读操作完毕");
}
}
//而改成用读写锁的话:
public class Test {
private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
public static void main(String[] args) {
final Test test = new Test();
new Thread(){
public void run() {
test.get(Thread.currentThread());
};
}.start();
new Thread(){
public void run() {
test.get(Thread.currentThread());
};
}.start();
}
public void get(Thread thread) {
rwl.readLock().lock();
try {
long start = System.currentTimeMillis();
while(System.currentTimeMillis() - start <= 1) {
System.out.println(thread.getName()+"正在进行读操作");
}
System.out.println(thread.getName()+"读操作完毕");
} finally {
rwl.readLock().unlock();
}
}
}
说明 thread1 和 thread2 在同时进行读操作。这样就大大提升了读操作的效率。
==注意:==
• 如果有一个线程已经占用了读锁,则此时其他线程如果要申请写锁,则申请写
锁的线程会一直等待释放读锁。
• 如果有一个线程已经占用了写锁,则此时其他线程如果申请写锁或者读锁,则
申请的线程会一直等待释放写锁。
3 线程间通信
3.1 synchronized 方案
线程间通信的模型有两种:
共享内存
和消息传递
,
//场景---两个线程,一个线程对当前数值加 1,另一个线程对当前数值减 1,要求用线程间通信
//第一步 创建资源类,定义属性和操作方法
class Share {
//初始值
private int number = 0;
//+1的方法
public synchronized void incr() throws InterruptedException {
//第二步 判断 干活 通知
while(number != 0) { //判断number值是否是0,如果不是0,等待
this.wait(); //在哪里睡,就在哪里醒
}
//如果number值是0,就+1操作
number++;
System.out.println(Thread.currentThread().getName()+" :: "+number);
//通知其他线程
this.notifyAll();
}
//-1的方法
public synchronized void decr() throws InterruptedException {
//判断
while(number != 1) {
this.wait();
}
//干活
number--;
System.out.println(Thread.currentThread().getName()+" :: "+number);
//通知其他线程
this.notifyAll();
}
}
public class ThreadDemo1 {
//第三步 创建多个线程,调用资源类的操作方法
public static void main(String[] args) {
Share share = new Share();
//创建线程
new Thread(()->{
for (int i = 1; i <=10; i++) {
try {
share.incr(); //+1
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"AA").start();
new Thread(()->{
for (int i = 1; i <=10; i++) {
try {
share.decr(); //-1
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"BB").start();
new Thread(()->{
for (int i = 1; i <=10; i++) {
try {
share.incr(); //+1
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"CC").start();
new Thread(()->{
for (int i = 1; i <=10; i++) {
try {
share.decr(); //-1
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"DD").start();
}
}
3.2Lock 方案
//wiat方法 判断条件必须 用while循环,否则会出现虚假唤醒问题
-----------------------------------------------------------------------------------------------
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
//第一步 创建资源类,定义属性和操作方法
class Share {
private int number = 0;
//创建Lock
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
//+1
public void incr() throws InterruptedException {
//上锁
lock.lock();
try {
//判断
while (number != 0) {
condition.await();
}
//干活
number++;
System.out.println(Thread.currentThread().getName()+" :: "+number);
//通知
condition.signalAll(); //相当于notifyall
}finally {
//解锁
lock.unlock();
}
}
//-1
public void decr() throws InterruptedException {
lock.lock();
try {
while(number != 1) {
condition.await();
}
number--;
System.out.println(Thread.currentThread().getName()+" :: "+number);
condition.signalAll();
}finally {
lock.unlock();
}
}
}
public class ThreadDemo2 {
public static void main(String[] args) {
Share share = new Share();
new Thread(()->{
for (int i = 1; i <=10; i++) {
try {
share.incr();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"AA").start();
new Thread(()->{
for (int i = 1; i <=10; i++) {
try {
share.decr();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"BB").start();
new Thread(()->{
for (int i = 1; i <=10; i++) {
try {
share.incr();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"CC").start();
new Thread(()->{
for (int i = 1; i <=10; i++) {
try {
share.decr();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"DD").start();
}
}
3.4 线程间定制化通信
线程交替打印 ppt21页 视频13
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
//第一步 创建资源类
class ShareResource {
//定义标志位
private int flag = 1; // 1 AA 2 BB 3 CC
//创建Lock锁
private Lock lock = new ReentrantLock();
//创建三个condition
private Condition c1 = lock.newCondition();
private Condition c2 = lock.newCondition();
private Condition c3 = lock.newCondition();
//打印5次,参数第几轮
public void print5(int loop) throws InterruptedException {
//上锁
lock.lock();
try {
//判断
while(flag != 1) {
//等待
c1.await();
}
//干活
for (int i = 1; i <=5; i++) {
System.out.println(Thread.currentThread().getName()+" :: "+i+" :轮数:"+loop);
}
//通知
flag = 2; //修改标志位 2
c2.signal(); //通知BB线程 Wakes up one waiting thread. 唤醒BB线程
}finally {
//释放锁
lock.unlock();
}
}
//打印10次,参数第几轮
public void print10(int loop) throws InterruptedException {
lock.lock();
try {
while(flag != 2) {
c2.await();
}
for (int i = 1; i <=10; i++) {
System.out.println(Thread.currentThread().getName()+" :: "+i+" :轮数:"+loop);
}
//修改标志位
flag = 3;
//通知CC线程
c3.signal();
}finally {
lock.unlock();
}
}
//打印15次,参数第几轮
public void print15(int loop) throws InterruptedException {
lock.lock();
try {
while(flag != 3) {
c3.await();
}
for (int i = 1; i <=15; i++) {
System.out.println(Thread.currentThread().getName()+" :: "+i+" :轮数:"+loop);
}
//修改标志位
flag = 1;
//通知AA线程
c1.signal();
}finally {
lock.unlock();
}
}
}
public class ThreadDemo3 {
public static void main(String[] args) {
ShareResource shareResource = new ShareResource();
new Thread(()->{
for (int i = 1; i <=10; i++) {
try {
shareResource.print5(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"AA").start();
new Thread(()->{
for (int i = 1; i <=10; i++) {
try {
shareResource.print10(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"BB").start();
new Thread(()->{
for (int i = 1; i <=10; i++) {
try {
shareResource.print15(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"CC").start();
}
}
4 集合的线程安全
4.1ArrayList()
异常内容
java.util.ConcurrentModificationException
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
/**
* 集合线程安全案例
*/
public class NotSafeDemo {
/**
* 多个线程同时对集合进行修改
* @param args
*/
public static void main(String[] args) {
List list = new ArrayList();
for (int i = 0; i < 100; i++) {
new Thread(() ->{
list.add(UUID.randomUUID().toString());
System.out.println(list);
}, "线程" + i).start();
}
}
}
4.2如何去解决 List 类型的线程安全问题?
4.2.1Vector
List list = new Vector();
4.2.2Collections
List list = Collections.synchronizedList(new ArrayList<>());
4.2.3 CopyOnWriteArrayList(重点)
List list = new CopyOnWriteArrayList();
4.3Set/Map
Set<String> set = new CopyOnWriteArraySet<>();
Map<String,String> map = new ConcurrentHashMap<>();
5 多线程锁
5.1 Synchonized 的八个问题(锁范围)
具体表现为以下 3 种形式。
对于普通同步方法,锁是当前实例对象。
对于静态同步方法,锁是当前类的 Class 对象。
对于同步方法块,锁是 Synchonized 括号里配置的对象
5.2公平锁和非公平锁
Lock lock = new ReentrantLock(true); //公平 线程饿死问题
Lock lock = new ReentrantLock(); //非公平 效率高
5.3可重入锁
5.3.1synchronized实现(隐式)
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
//可重入锁
public class SyncLockDemo {
public static void main(String[] args) {
// synchronized
Object o = new Object();
new Thread(()->{
synchronized(o) {
System.out.println(Thread.currentThread().getName()+" 外层");
synchronized (o) {
System.out.println(Thread.currentThread().getName()+" 中层");
synchronized (o) {
System.out.println(Thread.currentThread().getName()+" 内层");
}
}
}
},"t1").start();
}
}
5.3.2Lock(显式)
package com.atguigu.sync;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
//可重入锁
public class SyncLockDemo {
public static void main(String[] args) {
//Lock演示可重入锁
Lock lock = new ReentrantLock();
//创建线程
new Thread(()->{
try {
//上锁
lock.lock();
System.out.println(Thread.currentThread().getName()+" 外层");
try {
//上锁
lock.lock();
System.out.println(Thread.currentThread().getName()+" 内层");
}finally {
//释放锁
lock.unlock();
}
}finally {
//释放做
lock.unlock();
}
},"t1").start();
//创建新线程
new Thread(()->{
lock.lock();
System.out.println("aaaa");
lock.unlock();
},"aa").start();
}
}
5.4死锁
产生死锁的原因:
1.系统资源不足 2.进程运行推进顺序不合适 3.资源分配不当
验证是否是死锁:
1.jps -l 2.jstack jvm自带的堆栈跟踪工具
import java.util.concurrent.TimeUnit;
/**
* 演示死锁
*/
@SuppressWarnings("all")
public class DeadLock {
//创建两个对象
static Object a = new Object();
static Object b = new Object();
public static void main(String[] args) {
new Thread(()->{
synchronized (a) {
System.out.println(Thread.currentThread().getName()+" 持有锁a,试图获取锁b");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (b) {
System.out.println(Thread.currentThread().getName()+" 获取锁b");
}
}
},"A").start();
new Thread(()->{
synchronized (b) {
System.out.println(Thread.currentThread().getName()+" 持有锁b,试图获取锁a");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (a) {
System.out.println(Thread.currentThread().getName()+" 获取锁a");
}
}
},"B").start();
}
}
6 .Callable&Future 接口
6.1Callable和Runnable区别
• 为了实现 Runnable,需要实现不返回任何内容的 `run()`方法,
而对于Callable,需要实现在完成时返回结果的 `call()`方法。
• call()方法可以引发异常,而 run()则不能。
• 为实现 Callable 而必须重写 call 方法
• 不能直接替换 runnable,因为 Thread 类的构造方法根本没有 Callable
6.2 Future 接口
//FutureTask
FutureTask<Integer> futureTask1 = new FutureTask<>(new MyThread2());
//lam表达式
FutureTask<Integer> futureTask2 = new FutureTask<>(()->{
System.out.println(Thread.currentThread().getName()+" come in callable");
return 1024;
});
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
//比较两个接口
//实现Runnable接口
class MyThread1 implements Runnable {
@Override
public void run() {
}
}
//实现Callable接口
class MyThread2 implements Callable {
@Override
public Integer call() throws Exception {
System.out.println(Thread.currentThread().getName()+" come in callable");
return 200;
}
}
public class Demo1 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//Runnable接口创建线程
new Thread(new MyThread1(),"AA").start();
//Callable接口,报错
// new Thread(new MyThread2(),"BB").start();
//FutureTask
FutureTask<Integer> futureTask1 = new FutureTask<>(new MyThread2());
//lam表达式
FutureTask<Integer> futureTask2 = new FutureTask<>(()->{
System.out.println(Thread.currentThread().getName()+" come in callable");
return 1024;
});
//创建一个线程
new Thread(futureTask2,"lucy").start();
new Thread(futureTask1,"mary").start();
while(!futureTask2.isDone()) { //判断是否已经完成
System.out.println("wait.....");
}
//调用FutureTask的get方法
System.out.println(futureTask2.get()); //futuretask只会计算一次
System.out.println(futureTask1.get());
System.out.println(Thread.currentThread().getName()+" come over");
}
}
7 .JUC 三大辅助类
• CountDownLatch: 减少计数
• CyclicBarrier: 循环栅栏
• Semaphore: 信号灯
7.1 减少计数 CountDownLatch
CountDownLatch 类可以设置一个计数器,然后通过 `countDown 方法来进行减1` 的操作,使用 await 方法等待计数器不大于 0,然后继续执行 await 方法之后的语句。
• CountDownLatch 主要有两个方法,当一个或多个线程`调用 await 方法时,这些线程会阻塞`
• 其它线程调用 `countDown 方法会将计数器减 1`(调用 countDown 方法的线程不会阻塞)
• 当计数器的值变为 0 时,因 await 方法阻塞的线程会被唤醒,继续执行
import java.util.concurrent.CountDownLatch;
//演示 CountDownLatch
public class CountDownLatchDemo {
//6个同学陆续离开教室之后,班长锁门
public static void main(String[] args) throws InterruptedException {
//创建CountDownLatch对象,设置初始值
CountDownLatch countDownLatch = new CountDownLatch(6);
//6个同学陆续离开教室之后
for (int i = 1; i <=6; i++) {
new Thread(()->{
System.out.println(Thread.currentThread().getName()+" 号同学离开了教室");
//计数 -1
countDownLatch.countDown();
},String.valueOf(i)).start();
}
//等待
countDownLatch.await();
System.out.println(Thread.currentThread().getName()+" 班长锁门走人了");
}
}
7.2 循环栅栏 CyclicBarrier
//达到屏障点才会触发,否则程序一直阻塞
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
//集齐7颗龙珠就可以召唤神龙
public class CyclicBarrierDemo {
//创建固定值
private static final int NUMBER = 7;
public static void main(String[] args) {
//创建CyclicBarrier
CyclicBarrier cyclicBarrier =
new CyclicBarrier(NUMBER,()->{
System.out.println("*****集齐7颗龙珠就可以召唤神龙");
});
//集齐七颗龙珠过程
for (int i = 1; i <=7; i++) {
new Thread(()->{
try {
System.out.println(Thread.currentThread().getName()+" 星龙被收集到了");
//等待
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
},String.valueOf(i)).start();
}
}
}
7.3 信号灯 Semaphore
Semaphore 的构造方法中传入的第一个参数是最大信号量(可以看成最大线程池),
每个信号量初始化为一个最多只能分发一个许可证。
使用 acquire 方法获得许可证,release 方法释放许可
import java.util.Random;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
//6辆汽车,停3个车位
public class SemaphoreDemo {
public static void main(String[] args) {
//创建Semaphore,设置许可数量
Semaphore semaphore = new Semaphore(3);
//模拟6辆汽车
for (int i = 1; i <=6; i++) {
new Thread(()->{
try {
//抢占
semaphore.acquire();
System.out.println(Thread.currentThread().getName()+" 抢到了车位");
//设置随机停车时间
TimeUnit.SECONDS.sleep(new Random().nextInt(5));
System.out.println(Thread.currentThread().getName()+" ------离开了车位");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//释放
semaphore.release();
}
},String.valueOf(i)).start();
}
}
}
8.读写锁
JAVA 的并发包提供了读写锁 ReentrantReadWriteLock,它表示两个锁,一个是`读操作相关的锁`,称为`共享锁`;一个是`写相关的锁`,称为`排他锁`
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
//资源类
class MyCache {
//创建map集合
private volatile Map<String,Object> map = new HashMap<>();
//创建读写锁对象
private ReadWriteLock rwLock = new ReentrantReadWriteLock();
//放数据
public void put(String key,Object value) {
//添加写锁
rwLock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName()+" 正在写操作"+key);
//暂停一会
TimeUnit.MICROSECONDS.sleep(300);
//放数据
map.put(key,value);
System.out.println(Thread.currentThread().getName()+" 写完了"+key);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//释放写锁
rwLock.writeLock().unlock();
}
}
//取数据
public Object get(String key) {
//添加读锁
rwLock.readLock().lock();
Object result = null;
try {
System.out.println(Thread.currentThread().getName()+" 正在读取操作"+key);
//暂停一会
TimeUnit.MICROSECONDS.sleep(300);
result = map.get(key);
System.out.println(Thread.currentThread().getName()+" 取完了"+key);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//释放读锁
rwLock.readLock().unlock();
}
return result;
}
}
public class ReadWriteLockDemo {
public static void main(String[] args) throws InterruptedException {
MyCache myCache = new MyCache();
//创建线程放数据
for (int i = 1; i <=5; i++) {
final int num = i;
new Thread(()->{
myCache.put(num+"",num+"");
},String.valueOf(i)).start();
}
TimeUnit.MICROSECONDS.sleep(300);
//创建线程取数据
for (int i = 1; i <=5; i++) {
final int num = i;
new Thread(()->{
myCache.get(num+"");
},String.valueOf(i)).start();
}
}
}
在线程`持有读锁`的情况下,该线程`不能取得写锁`
在线程`持有写锁`的情况下,该线程`可以继续获取读锁`
8.1锁降级
//写锁降级为读锁 读锁不能升级为写锁
//写锁降级为读锁 //写锁降级为读锁 //写锁降级为读锁 //写锁降级为读锁 //写锁降级为读锁 //写锁降级为读锁
import java.util.concurrent.locks.ReentrantReadWriteLock;
//演示读写锁降级
public class Demo1 {
public static void main(String[] args) {
//可重入读写锁对象
ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
ReentrantReadWriteLock.ReadLock readLock = rwLock.readLock();//读锁
ReentrantReadWriteLock.WriteLock writeLock = rwLock.writeLock();//写锁
//锁降级
//2 获取读锁
readLock.lock();
System.out.println("---read");
//1 获取写锁
writeLock.lock();
System.out.println("att");
//3 释放写锁
//writeLock.unlock();
//4 释放读锁
//readLock.unlock();
}
}
9. 阻塞队列
9.1 BlockingQueue 简介
//实现类
ArrayBlockingQueue , DelayQueue , LinkedBlockingDeque , LinkedBlockingQueue , LinkedTransferQueue , PriorityBlockingQueue , SynchronousQueue
1.放入数据
• offer(anObject):表示如果可能的话,将 anObject 加到 BlockingQueue 里,即如果 BlockingQueue 可以容纳,则返回 true,否则返回 false.(本方法不阻塞当前执行方法的线程)
• offer(E o, long timeout, TimeUnit unit):可以设定等待的时间,如果在指定的时间内,还不能往队列中加入 BlockingQueue,则返回失败
• put(anObject):把 anObject 加到 BlockingQueue 里,如果 BlockQueue 没有空间,则调用此方法的线程被阻断直到 BlockingQueue 里面有空间再继续.
2.获取数据
• poll(time): 取走 BlockingQueue 里排在首位的对象,若不能立即取出,则可以等time 参数规定的时间,取不到时返回 null
• poll(long timeout, TimeUnit unit):从 BlockingQueue 取出一个队首的对象,如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据。否则知道时间超时还没有数据可取,返回失败。
• take(): 取走 BlockingQueue 里排在首位的对象,若 BlockingQueue 为空,阻断进入等待状态直到 BlockingQueue 有新的数据被加入;
• drainTo(): 一次性从 BlockingQueue 获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。
方法类型 | 抛出异常 | 特殊值 | 阻塞 | 超时 |
---|---|---|---|---|
插入 | add(e) | offer(e) | put(e) | offer(e.time,unit) |
移除 | remove() | poll() | take() | poll(time,poll) |
检查 | element() | peek() | 不可用 | 不可用 |
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
//阻塞队列
public class BlockingQueueDemo {
public static void main(String[] args) throws InterruptedException {
//创建阻塞队列
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
//第一组
// System.out.println(blockingQueue.add("a"));
//// System.out.println(blockingQueue.add("b"));
//// System.out.println(blockingQueue.add("c"));
//// //System.out.println(blockingQueue.element());
////
//// //System.out.println(blockingQueue.add("w")); //队列长度为3 ,再添加元素会抛异常
//// System.out.println(blockingQueue.remove());
//// System.out.println(blockingQueue.remove());
//// System.out.println(blockingQueue.remove());
//// System.out.println(blockingQueue.remove()); //一共三个元素,移除会抛异常
//第二组 offer添加元素 poll取元素
// System.out.println(blockingQueue.offer("a"));
// System.out.println(blockingQueue.offer("b"));
// System.out.println(blockingQueue.offer("c"));
// System.out.println(blockingQueue.offer("www")); //false
// System.out.println(blockingQueue.poll());
// System.out.println(blockingQueue.poll());
// System.out.println(blockingQueue.poll());
// System.out.println(blockingQueue.poll());
//第三组
// blockingQueue.put("a");
// blockingQueue.put("b");
// blockingQueue.put("c");
// //blockingQueue.put("w"); //队列长度满了,会一直阻塞
// System.out.println(blockingQueue.take());
// System.out.println(blockingQueue.take());
// System.out.println(blockingQueue.take());
// System.out.println(blockingQueue.take()); //队列为空,会一直阻塞
//第四组
System.out.println(blockingQueue.offer("a"));
System.out.println(blockingQueue.offer("b"));
System.out.println(blockingQueue.offer("c"));
System.out.println(blockingQueue.offer("w",3L, TimeUnit.SECONDS)); //超过超时时间自动退出
}
}
9.2常见的 BlockingQueue
#1 ArrayBlockingQueue(常用)
基于数组的阻塞队列实现,在 ArrayBlockingQueue 内部,维护了一个定长数组,以便缓存队列中的数据对象,这是一个常用的阻塞队列,除了一个定长数组外,ArrayBlockingQueue 内部还保存着两个整形变量,分别标识着队列的
头部和尾部在数组中的位置。
ArrayBlockingQueue 在生产者放入数据和消费者获取数据,都是共用同一个锁对象,由此也意味着两者无法真正并行运行,这点尤其不同于LinkedBlockingQueue;按照实现原理来分析,ArrayBlockingQueue 完全可以采用分离锁,从而实现生产者和消费者操作的完全并行运行。Doug Lea 之所以没这样去做,也许是因为 ArrayBlockingQueue 的数据写入和获取操作已
经足够轻巧,以至于引入独立的锁机制,除了给代码带来额外的复杂性外,其在性能上完全占不到任何便宜。 ArrayBlockingQueue 和LinkedBlockingQueue 间还有一个明显的不同之处在于,前者在插入或删除元素时不会产生或销毁任何额外的对象实例,而后者则会生成一个额外的Node 对象。这在长时间内需要高效并发地处理大批量数据的系统中,其对于GC 的影响还是存在一定的区别。而在创建 ArrayBlockingQueue 时,我们还可以控制对象的内部锁是否采用公平锁,默认采用非公平锁。
==一句话总结: `由数组结构组成的有界阻塞队列`。==
#2 LinkedBlockingQueue(常用)
基于链表的阻塞队列,同 ArrayListBlockingQueue 类似,其内部也维持着一个数据缓冲队列(该队列由一个链表构成),当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返回;只有当队列缓冲区达到最大值缓存容量时(LinkedBlockingQueue 可以通过构造函数指定该值),才会阻塞生产者队列,直到消费者从队列中消费掉一份数据,生产者线程会被唤醒,反之对于消费者这端的处理也基于同样的原理。而 LinkedBlockingQueue 之所以能够高效的处理并发数据,还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。ArrayBlockingQueue 和 LinkedBlockingQueue 是两个最普通也是最常用的阻塞队列,一般情况下,在处理多线程间的生产者消费者问题,使用这两个类足以。
==一句话总结: `由链表结构组成的有界(但大小默认值为integer.MAX_VALUE)阻塞队列`。==
#3 DelayQueue
DelayQueue 中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue 是一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。
==一句话总结: 使用优先级队列实现的延迟无界阻塞队列。==
//9.4.4 PriorityBlockingQueue
基于优先级的阻塞队列(优先级的判断通过构造函数传入的 Compator 对象来决定),但需要注意的PriorityBlockingQueue 并不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者。因此使用的时候要特别注意,生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间。在实现 PriorityBlockingQueue 时,内部控制线程同步的锁采用的是公平锁。
==一句话总结: `支持优先级排序的无界阻塞队列`。==
# 5 SynchronousQueue
一种无缓冲的等待队列,类似于无中介的直接交易,有点像原始社会中的生产者和消费者,生产者拿着产品去集市销售给产品的最终消费者,而消费者必须亲自去集市找到所要商品的直接生产者,如果一方没有找到合适的目标,那么对不起,大家都在集市等待。相对于有缓冲的 BlockingQueue 来说,少了一个中间经销商的环节(缓冲区),如果有经销商,生产者直接把产品批发给经销商,而无需在意经销商最终会将这些产品卖给那些消费者,由于经销商可以库存一部分商品,因此相对于直接交易模式,总体来说采用中间经销商的模式会吞吐量高一些(可以批量买卖);但另一方面,又因为经销商的引入,使得产品从生产者到消费者中间增加了额外的交易环节,单个产品的及时响应性能可能会降低。声明一个 SynchronousQueue 有两种不同的方式,它们之间有着不太一样的行为。
公平模式和非公平模式的区别:
• 公平模式:SynchronousQueue 会采用公平锁,并配合一个 FIFO 队列来阻塞多余的生产者和消费者,从而体系整体的公平策略;
• 非公平模式(SynchronousQueue 默认):SynchronousQueue 采用非公平锁,同时配合一个 LIFO 队列来管理多余的生产者和消费者,而后一种模式,如果生产者和消费者的处理速度有差距,则很容易出现饥渴的情况,即可能有某些生产者或者是消费者的数据永远都得不到处理。
==一句话总结: `不存储元素的阻塞队列,也即单个元素的队列`。==
#6 LinkedTransferQueue
LinkedTransferQueue 是一个由链表结构组成的无界阻塞 TransferQueue 队列。相对于其他阻塞队列LinkedTransferQueue 多了 tryTransfer 和transfer 方法。LinkedTransferQueue 采用一种预占模式。意思就是消费者线程取元素时,如果队列不为空,则直接取走数据,若队列为空,那就生成一个节点(节点元素为 null)入队,然后消费者线程被等待在这个节点上,后面生产者线程入队时发现有一个元素为 null 的节点,生产者线程就不入队了,直接就将元素填充到该节点,并唤醒该节点等待的线程,被唤醒的消费者线程取走元素,从调用的方法返回。
==一句话总结: `由链表组成的无界阻塞队列`。==
#7 LinkedBlockingDeque
LinkedBlockingDeque 是一个由链表结构组成的双向阻塞队列,即可以从队列的两端插入和移除元素。对于一些指定的操作,在插入或者获取队列元素时如果队列状态不允许该操作可能会阻塞住该线程直到队列状态变更为允许操作,这里的阻塞一般有两种情况 • 插入元素时: 如果当前队列已满将会进入阻塞状态,一直等到队列有空的位置时再讲该元素插入,该操作可以通过设置超时参数,超时后返回 false 表示操作失败,也可以不设置超时参数一直阻塞,中断后抛出 InterruptedException 异 常 • 读取元素时: 如果当前队列为空会阻塞住直到队列不为空然后返回元素,同样可以通过设置超时参数
==一句话总结: `由链表组成的双向阻塞队列`==
10 ThreadPool 线程池
它的主要特点为:
• 降低资源消耗: 通过重复利用已创建的线程降低线程创建和销毁造成的销耗。
• 提高响应速度: 当任务到达时,任务可以不需要等待线程创建就能立即执行。
• 提高线程的可管理性: 线程是稀缺资源,如果无限制的创建,不仅会销耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
• Java 中的线程池是通过 Executor 框架实现的,该框架中用到了 Executor,Executors,ExecutorService,ThreadPoolExecutor 这几个类
10.2 线程池参数说明
• corePoolSize 线程池的核心线程数
• maximumPoolSize 能容纳的最大线程数
• keepAliveTime 空闲线程存活时间
• unit 存活的时间单位
• workQueue 存放提交但未执行任务的队列
• threadFactory 创建线程的工厂类
• handler 等待队列满后的拒绝策略
线程池中,有三个重要的参数,决定影响了拒绝策略:corePoolSize - 核心线程数,也即最小的线程数。workQueue - 阻塞队列 。
maximumPoolSize -最大线程数当提交任务数大于 corePoolSize 的时候,会优先将任务放到 workQueue 阻塞队列中。当阻塞队列饱和后,会扩充线程池中线程数,直到达到maximumPoolSize 最大线程数配置。此时,再多余的任务,则会触发线程池的拒绝策略了。
总结起来,也就是一句话,当提交的任务数大于(workQueue.size() + maximumPoolSize ),就会触发线程池的拒绝策略。
10.2.2 拒绝策略(重点)
`CallerRunsPolicy`:(调用者模式) 当触发拒绝策略,只要线程池没有关闭的话,则使用调用 线程直接运行任务。一般并发比较小,性能要求不高,不允许失败。但是,由 于调用者自己运行任务,如果任务提交速度过快,可能导致程序阻塞,性能效率上必然的损失较大
`AbortPolicy`: (默认的)抛出拒绝执行 RejectedExecutionException 异常信息。`会打断当前的执行流程,影响后续的任务执行`。
`DiscardPolicy`: (会丢任务)默默丢弃无法处理的任务,不予任何处理也不抛出异常
`DiscardOldestPolicy`: 抛弃队列中等待最久的任务,然后把当前任务加入队列中,尝试再次提交当前任务
10.3 线程池的种类与创建
10.3.1 newCachedThreadPool(常用)
• 线程池中数量没有固定,可达到最大值(Interger. MAX_VALUE)
• 线程池中的线程可进行缓存重复利用和回收(回收默认时间为 1 分钟)
• 当线程池中,没有可用线程,会重新创建一个线程
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
//演示线程池三种常用分类
public class ThreadPoolDemo1 {
public static void main(String[] args) {
//一池五线程
ExecutorService threadPool1 = Executors.newFixedThreadPool(5); //5个窗口
//一池一线程
ExecutorService threadPool2 = Executors.newSingleThreadExecutor(); //一个窗口
//一池可扩容线程
ExecutorService threadPool3 = Executors.newCachedThreadPool();
//10个顾客请求
try {
for (int i = 1; i <=10; i++) {
//执行
threadPool3.execute(()->{
System.out.println(Thread.currentThread().getName()+" 办理业务");
});
}
}catch (Exception e) {
e.printStackTrace();
}finally {
//关闭线程池
threadPool3.shutdown();
}
}
}
10.4线程池参数
/**
* 可缓存线程池
* @return
*/
public static ExecutorService newCachedThreadPool(){
/**
* corePoolSize 线程池的核心线程数
* maximumPoolSize 能容纳的最大线程数
* keepAliveTime 空闲线程存活时间
* unit 存活的时间单位
* workQueue 存放提交但未执行任务的队列
* threadFactory 创建线程的工厂类:可以省略
* handler 等待队列满后的拒绝策略:可以省略
*/
return new ThreadPoolExecutor(0,
Integer.MAX_VALUE,
60L,
TimeUnit.SECONDS,
new SynchronousQueue<>(),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
}
10.5 线程池底层工作原理(重要)
1. 在创建了线程池后,线程池中的线程数为零
2. 当调用 execute()方法添加一个请求任务时,线程池会做出如下判断:
2.1 如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务;
2.2 如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列;
2.3 如果这个时候队列满了且正在运行的线程数量还小于maximumPoolSize,那么还是要创建非核心线程立刻运行这个任务; 2.4 如果队列满了且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池会启动饱和拒绝策略来执行。
3. 当一个线程完成任务时,它会从队列中取下一个任务来执行
4. 当一个线程无事可做超过一定的时间(keepAliveTime)时,线程会判断:
4.1 如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉。
4.2 所以线程池的所有任务完成后,它最终会收缩到 corePoolSize 的大小。
10.6自定义线程池
import java.util.concurrent.*;
//自定义线程池创建
public class ThreadPoolDemo2 {
public static void main(String[] args) {
ExecutorService threadPool = new ThreadPoolExecutor(
2,
5,
2L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
//10个顾客请求
try {
for (int i = 1; i <=10; i++) {
//执行
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName()+" 办理业务");
});
}
}catch (Exception e) {
e.printStackTrace();
}finally {
//关闭
threadPool.shutdown();
}
}
}
11 Fork/Join
分支合并框架
Fork/Join 它可以将一个大的任务拆分成多个子任务进行并行处理,最后将子任务结果合并成最后的计算结果,并进行输出。Fork/Join 框架要完成两件事情:
1.Fork:把一个复杂任务进行分拆,大事化小
2.Join:把分拆任务的结果进行合并
import java.util.concurrent.*;
class MyTask extends RecursiveTask<Integer> {
//拆分差值不能超过10,计算10以内运算
private static final Integer VALUE = 10;
private int begin ;//拆分开始值
private int end;//拆分结束值
private int result ; //返回结果
//创建有参数构造
public MyTask(int begin,int end) {
this.begin = begin;
this.end = end;
}
//拆分和合并过程
@Override
protected Integer compute() {
//判断相加两个数值是否大于10
if((end-begin)<=VALUE) {
//相加操作
for (int i = begin; i <=end; i++) {
result = result+i;
}
} else {//进一步拆分
//获取中间值
int middle = (begin+end)/2;
//拆分左边
MyTask task01 = new MyTask(begin,middle);
//拆分右边
MyTask task02 = new MyTask(middle+1,end);
//调用方法拆分
task01.fork();
task02.fork();
//合并结果
result = task01.join()+task02.join();
}
return result;
}
}
public class ForkJoinDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//创建MyTask对象
MyTask myTask = new MyTask(0,100);
//创建分支合并池对象
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<Integer> forkJoinTask = forkJoinPool.submit(myTask);
//获取最终合并之后结果
Integer result = forkJoinTask.get();
System.out.println(result);
//关闭池对象
forkJoinPool.shutdown();
}
}
12 CompletableFuture (异步回调)
12.1没有返回值/有返回值的异步调用
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
//异步调用和同步调用
public class CompletableFutureDemo {
public static void main(String[] args) throws Exception {
//同步调用
CompletableFuture<Void> completableFuture1 = CompletableFuture.runAsync(()->{
System.out.println(Thread.currentThread().getName()+" : CompletableFuture1");
});
completableFuture1.get();
//mq消息队列
//异步调用
CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread().getName()+" : CompletableFuture2");
//模拟异常
int i = 10/0;
return 1024;
});
//t: 方法返回值 u: 异常信息
completableFuture2.whenComplete((t,u)->{
System.out.println("------t="+t);
System.out.println("------u="+u);
}).get();
}
}