CyclicBarrier
循环屏障,初始化时需要设定参与线程的数量,内部使用可重入锁ReentrantLock和Condition,每次调用await()方法,计数器(参与线程的数量)减1,阻塞当前线程,当计数器减到0时,唤醒所有阻塞线程。在这之后如果继续调用await()方法,新的一轮循环重新开始。
public class Demo01CyclicBarrier {
// 定义参与线程数为5
// static CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
// 第二个参数是一个runnable,当所有线程准备完成,先执行这个runnable中的run()方法
static CyclicBarrier cyclicBarrier = new CyclicBarrier(5,new Start());
static Random random = new Random();
public static void main(String[] args) {
// 启动5个线程
for (int i = 0; i < 5; i++) {
new Thread(new Runnable() {
@Override
public void run() {
try {
// 模拟线程准备时间
Thread.sleep(random.nextInt(3000));
System.out.println(Thread.currentThread().getName()+" 准备就绪");
// 将线程阻塞在屏障前
cyclicBarrier.await();
// 线程执行
System.out.println(Thread.currentThread().getName()+" 开始执行");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}).start();
}
}
static class Start implements Runnable{
@Override
public void run() {
System.out.println("全部就位,开始");
}
}
}
如果线程数量达不到设定的参与线程数,所有线程会一直阻塞。
CountDownLatch
倒计数门闩,指await()方法在倒计数为0之前会阻塞当前线程
public class Demo02CountDownLatch {
static CountDownLatch countDownLatch = new CountDownLatch(5);
static Random random = new Random();
public static void main(String[] args) {
// 开启5个线程
for (int i = 0; i < 5; i++) {
new Thread(new Runnable() {
@Override
public void run() {
// 默认程序执行耗时
try {
Thread.sleep(random.nextInt(3000));
countDownLatch.countDown();
System.out.println(Thread.currentThread().getName()+" 执行完成");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
// 主线程等待所有子线程执行完成
try {
// 等待子线程执行方案 1 使用join 2 Thread.activeCount()+ yield
countDownLatch.await();
System.out.println("主线程继续执行");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Semaphore
信号量,就像银行的窗口,比如银行开了3个窗口,同一时间只能有3个窗口提供服务,同一时刻只能有3个线程执行,其他线程进入等待状态
public class Demo03Semaphore {
// 允许3个线程同时执行
static Semaphore semaphore = new Semaphore(3);
static Random random = new Random();
public static void main(String[] args) {
// 模拟10个线程
for (int i = 0; i < 10; i++) {
new Thread(new Runnable() {
@Override
public void run() {
// 使用Semaphore获取运行权利
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName()+" 开始执行");
// 模拟线程执行时间
Thread.sleep(random.nextInt(3000));
// 释放信号量
semaphore.release();
System.out.println(Thread.currentThread().getName()+" 执行完成,下一个");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
}
JDK7 HashMap
线程不安全
public class Demo04Hashmap {
static HashMap<String,String> hashMap = new HashMap<>();
static CountDownLatch countDownLatch = new CountDownLatch(10);
public static void main(String[] args) {
// 启动10个线程
for (int i = 0; i < 10; i++) {
Run run = new Run(i);
new Thread(run).start();
}
// 主线程等待子线程执行完成
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
// 打印出hashMap的元素个数
System.out.println(hashMap.size());
}
static class Run implements Runnable{
int startIndex;
public Run(int startIndex) {
this.startIndex = startIndex;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
hashMap.put("key"+(startIndex)*10+i,"1");
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
countDownLatch.countDown();
}
}
}
hashmap关键参数
// 初始容量
static final int DEFAULT_INITIAL_CAPACITY = 1 << 4; // aka 16
// 最大容量
static final int MAXIMUM_CAPACITY = 1 << 30; // 1073741824
// 负载因子 当数组元素大于初始容量*负载因子时,数组进行扩容,扩容为原来的2倍
static final float DEFAULT_LOAD_FACTOR = 0.75f;
JDK7 ConcurrentHashMap
ConcurrentHashMap是线程安全的
public class Demo04Hashmap {
// static HashMap<String,String> hashMap = new HashMap<>();
static ConcurrentHashMap<String,String> hashMap = new ConcurrentHashMap<>();
static CountDownLatch countDownLatch = new CountDownLatch(10);
public static void main(String[] args) {
// System.out.println(1<<30);
// 启动10个线程
for (int i = 0; i < 10; i++) {
Run run = new Run(i);
new Thread(run).start();
}
// 主线程等待子线程执行完成
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
// 打印出hashMap的元素个数
System.out.println(hashMap.size());
}
static class Run implements Runnable{
int startIndex;
public Run(int startIndex) {
this.startIndex = startIndex;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
hashMap.put("key"+(startIndex)*10+i,"1");
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
countDownLatch.countDown();
}
}
}
JDK8 HashMap
结构是数组+链表+红黑树,使用红黑树是为了提高查询效率,在链表节点个数超过8个时,将链表转换成红黑树
// 链表节点个数阈值,超过这个值,转变成红黑树
static final int TREEIFY_THRESHOLD = 8;
红黑树
红黑树性质:
- 节点是红色或者黑色
- 根节点是黑色
- 每个红色节点的两个子节点都是黑色,从每个叶子结点到跟的所有路径上不能有两个连续的红色节点
- 从任一节点到其每个叶子的所有路径包含相同数目的黑色节点
插入新节点后,如果不满足红黑树性质,需要对节点进行旋转,旋转是为了让树更倾向于平衡
左旋:如果对节点P进行左旋,则P的右子节点会上升为P的父节点,P则变成左子节点
右旋:如果对节点P进行右旋,则P的左子节点会上升为P的父节点,P则变成右子节点
JDK8 ConcurrentHashMap
结构:数组+链表+红黑树
在使用put的时候,最开始初始化数组,容量默认是16,计算Key的哈希值在数组中的位置,判断数组中该位置的节点是否为空,如果为空,通过CAS操作写入节点,如果数组该位置的节点不为空,首先锁定该节点(链表的首节点),遍历链表,先判断key是否存在链表中,存在就替换value,不存在将新的节点添加在链表中
支持边迭代边修改
public class Demo05ConcurrentHashMap {
// static Hashtable<String,Integer> hashMap = new Hashtable<>();
// static HashMap<String,Integer> hashMap = new HashMap<>();
static ConcurrentHashMap<String,Integer> hashMap = new ConcurrentHashMap<>();
public static void main(String[] args) {
// 往hashMap中插入数据
for (int i = 0; i < 10; i++) {
hashMap.put("Key"+i,i);
}
// 一边遍历一边修改
for (Map.Entry<String, Integer> entry : hashMap.entrySet()) {
hashMap.remove(entry.getKey());
}
System.out.println(hashMap.size());
}
}
JUC队列
一般使用队列都是在生产者、消费者场景。
非阻塞队列
ConcurrentLinkedQueue
public class Demo06ConcurrentLikedQueue {
// 初始化队列
static ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
public static void main(String[] args) {
// 开启生产者线程
new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
queue.add(i);
System.out.println("生产者生产:"+i);
}
}
}).start();
// 消费者线程
new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 15; i++) {
Integer poll = queue.poll();
System.out.println("消费者消费:"+poll);
}
}
}).start();
}
}
ArrayBlockingQueue
阻塞的关键在于使用了Lock和Condition
生产者
public class Demo07ArrayBlockingQueue {
// 需要设置数组容量
static ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
public static void main(String[] args) {
// 生产者线程
new Thread(new Runnable() {
@Override
public void run() {
// 循环插入数据
for (int i = 0; i < 11; i++) {
// queue.add(i); // 如果队列满了,会抛出java.lang.IllegalStateException: Queue full
// queue.offer(i); // 如果队列满了,新加入的元素直接抛弃
try {
queue.put(i); // 如果队列满了,阻塞生产者线程
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("添加元素:"+i);
}
}
}).start();
// 等待子线程执行完成
while (Thread.activeCount()>2){
Thread.yield();
}
// 打印出队列的值
System.out.println(queue);
}
}
消费者
public class Demo07ArrayBlockingQueue {
// 需要设置数组容量
static ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
public static void main(String[] args) {
// 生产者线程
new Thread(new Runnable() {
@Override
public void run() {
// 循环插入数据
for (int i = 0; i < 10; i++) {
// queue.add(i); // 如果队列满了,会抛出java.lang.IllegalStateException: Queue full
// queue.offer(i); // 如果队列满了,新加入的元素直接抛弃
try {
queue.put(i); // 如果队列满了,阻塞生产者线程
} catch (InterruptedException e) {
e.printStackTrace();
}
// System.out.println("添加元素:"+i);
}
}
}).start();
// 等待子线程执行完成
while (Thread.activeCount()>2){
Thread.yield();
}
// 打印出队列的值
System.out.println(queue);
// 消费者线程
new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 11; i++) {
// Integer integer = queue.remove(); // 如果队列是空的,抛出java.util.NoSuchElementException
// Integer integer = queue.poll(); // 如果队列为空,获取到的是null
Integer integer = null; // 如果队列为空,阻塞消费者线程
try {
integer = queue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者消费:"+integer);
}
}
}).start();
}
}
LinkedBlockingQueue
构造函数可以传入容量,如果不传入默认使用Integer.MAX_VALUE
生产者
public class Demo08LinkedBlockingQueue {
// 设置容量为10
static LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);
public static void main(String[] args) {
// 生产者线程
new Thread(new Runnable() {
@Override
public void run() {
// 循环插入数据
for (int i = 0; i < 11; i++) {
// queue.add(i); // 如果队列满了,会抛出java.lang.IllegalStateException: Queue full
// queue.offer(i); // 如果队列满了,新加入的元素直接抛弃
try {
queue.put(i); // 如果队列满了,阻塞生产者线程
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("添加元素:"+i);
}
}
}).start();
// 等待子线程执行完成
while (Thread.activeCount()>2){
Thread.yield();
}
// 打印出队列的值
System.out.println(queue);
}
}
消费者
public class Demo08LinkedBlockingQueue {
// 设置容量为10
static LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);
public static void main(String[] args) {
// 生产者线程
new Thread(new Runnable() {
@Override
public void run() {
// 循环插入数据
for (int i = 0; i < 10; i++) {
// queue.add(i); // 如果队列满了,会抛出java.lang.IllegalStateException: Queue full
// queue.offer(i); // 如果队列满了,新加入的元素直接抛弃
try {
queue.put(i); // 如果队列满了,阻塞生产者线程
} catch (InterruptedException e) {
e.printStackTrace();
}
// System.out.println("添加元素:"+i);
}
}
}).start();
// 等待子线程执行完成
while (Thread.activeCount()>2){
Thread.yield();
}
// 打印出队列的值
System.out.println(queue);
// 消费者线程
new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 11; i++) {
// Integer integer = queue.remove(); // 如果队列是空的,抛出java.util.NoSuchElementException
// Integer integer = queue.poll(); // 如果队列为空,获取到的是null
Integer integer = null; // 如果队列为空,阻塞消费者线程
try {
integer = queue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者消费:"+integer);
}
}
}).start();
}
}
PriorityBlockingQueue
默认属性
// 默认容量
private static final int DEFAULT_INITIAL_CAPACITY = 11;
创建时可以指定容量大小,如果不指定,容量默认为11,当向队列增加元素时,队列大小会自动增加
队列内部实现了排序,排序比较小的元素有限出队
public class Demo09PriorityBlockingQueue {
// 如果不设置初始容量,默认是11
static PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
static Random random = new Random();
public static void main(String[] args) {
// 生产者线程
Thread producer = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 15; i++) {
queue.put(random.nextInt(100));
}
}
});
producer.start();
// 主线程进行等待
try {
producer.join();
System.out.println(queue);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 消费者线程
Thread consumer = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 16; i++) {
Integer integer = null;
// integer = queue.remove();
// integer = queue.poll();
try {
integer = queue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者消费: "+integer);
}
}
});
consumer.start();
// 主线程进行等待
try {
consumer.join();
System.out.println(queue);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
SynchronousQueue
队列本身不存储数据
public class Demo10SynchronousQueue {
static SynchronousQueue<Integer> queue = new SynchronousQueue<>();
public static void main(String[] args) {
//生产者
new Thread(new Runnable() {
@Override
public void run() {
// 往队列添加数据
// queue.add(1); // 队列本身不保存数据
queue.offer(1); // 做抛弃处理
}
}).start();
while (Thread.activeCount()>2){
Thread.yield();
}
System.out.println(queue);
}
}
生产者与消费者结合
public class Demo10SynchronousQueue {
static SynchronousQueue<Integer> queue = new SynchronousQueue<>();
public static void main(String[] args) {
//生产者
new Thread(new Runnable() {
@Override
public void run() {
// 往队列添加数据
// queue.add(1); // 队列本身不保存数据
// queue.offer(1); // 做抛弃处理
for (int i = 0; i < 10; i++) {
try {
queue.put(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("生产者生产: "+i);
}
}
}).start();
// while (Thread.activeCount()>2){
// Thread.yield();
// }
// System.out.println(queue);
// 消费者线程
new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
System.out.println("消费者消费:"+queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
}
}
线程池
ThreadPoolExecutor
构造函数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
corePoolSize:线程池中核心线程的数量,即时空闲,也会维持该数量的线程
maximumPoolSize:线程池最大支持的线程数,当线程池的阻塞队列满了之后,如果还有任务提交,且当前线程数小于这个最大值,会启动新的线程来执行任务。注意,如果使用的是无界队列,该参数无效
keepAliveTime:当线程数比核心线程数多时,空闲线程的最大存活时间
unit:keepAliveTime的单位
workQueue:用来保存等待执行的任务的阻塞队列,等待的任务必须实现Runnable接口
threadFactory:创建线程的工厂
handler:线程池的拒绝策略,默认是AbortPolicy,直接抛出异常
创建线程和销毁线程都有开销,类似于电脑的开机和关机,线程池就相当于网吧,电脑已经开好了,上来用就行。
核心线程就是网吧早上刚开门,来一个客户就开一台电脑,如果核心线程数是10,开10台电脑,即时客户下线离开,这10台电脑也不会关机。
最大线程数相当于网吧所有的电脑数,如果最大线程数为100,网吧有100台电脑。
keepAliveTime,网吧除了那10台电脑外,其他的电脑最大的空闲时间,超过这个时间,就关机。
workQueue,工作队列,也就是在网吧进行排队,当核心数量的电脑有人下线后,从队列中取出任务放到核心电脑上,如果队列满了,开启新的电脑来执行
handler,核心线程满了,工作队列也满了,线程达到了最大maximumPoolSize,此时再来顾客,直接拒绝
public class Demo11ThreadPool {
public static void main(String[] args) {
Random random = new Random();
// 创建一个核心线程数为5 最大线程数为10 工作队列容量为10 的线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(5,10,0, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(10));
// 开启线程
for (int i = 0; i < 15; i++) {
// 往线程池中提交任务
executor.submit(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+" 线程执行");
try {
// 模拟任务随机运行时间
Thread.sleep(random.nextInt(3000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
// 线程执行完成后关机线程池
executor.shutdown();
System.out.println("主线程继续执行");
}
}
可以看到一共有5个核心线程提供服务
将线程数改为16,除了5个核心线程执行任务外,将创建1个非核心线程来执行任务
继续增大线程数为20,除了5个核心线程执行任务外,会创建5个非核心线程执行任务
继续增大线程数为21,可以看到有一个线程任务被拒绝了
将队列从ArrayBlockingQueue修改为ArrayBlockingQueue,相当于用了一个无界队列,所有的任务都会等待核心线程来执行
如果将核心线程设置为0,工作队列容量设置为10,任务数也设置为10,可以看到任务是单线程来执行
继续将任务数增大到11,可以看到有两个线程来执行任务
FixedThreadPool
public class Demo12FixedThreadPool {
public static void main(String[] args) {
// 创建一个核心线程数为5 最大线程数为5 工作队列容量相当于无界
ExecutorService executorService = Executors.newFixedThreadPool(5);
Random random = new Random();
// 往线程池中添加任务
for (int i = 0; i < 20; i++) {
executorService.submit(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+" 线程执行");
try {
// 模拟任务随机运行时间
Thread.sleep(random.nextInt(5000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
executorService.shutdown();
}
}
SingleThreadExecutor
public class Demo13SingleThreadPool {
public static void main(String[] args) {
// 创建核心线程数为1 最大线程数为1 工作队列相当于无界队列
ExecutorService executorService = Executors.newSingleThreadExecutor();
Random random = new Random();
for (int i = 0; i < 10; i++) {
executorService.submit(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+" 线程执行");
try {
Thread.sleep(random.nextInt(4000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
executorService.shutdown();
}
}
CachedThreadPool
public class Demo14CachedThreadPool {
public static void main(String[] args) {
// 创建核心线程数为0 最大线程数为Integer.MAX_VALUE 的线程池
ExecutorService executorService = Executors.newCachedThreadPool();
Random random = new Random();
for (int i = 0; i < 20; i++) {
executorService.submit(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+" 线程执行");
try {
Thread.sleep(random.nextInt(2000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
executorService.shutdown();
}
}
ScheduledExecutorService
public class Demo15ScheduledThreadPool {
public static void main(String[] args) {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
System.out.println(System.currentTimeMillis());
// 第一中用法
// 第一个参数是要运行的任务 第二个参数是延迟执行的时间 第三个参数是延迟时间的单位
// executorService.schedule(new Runnable() {
// @Override
// public void run() {
// System.out.println(Thread.currentThread().getName()+" 开始执行" + System.currentTimeMillis());
// }
// },3, TimeUnit.SECONDS);
// executorService.shutdown();
// 第二种用法
// 第一个参数是要运行的任务 第二个参数是延迟执行的时间 第三个参数是执行的间隔 第四个参数是延迟时间的单位
executorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+" 开始执行" + System.currentTimeMillis());
}
},2,2,TimeUnit.SECONDS);
}
}