多线程高并发--3

分享 123456789987654321 ⋅ 于 2021-02-05 15:40:15 ⋅ 1616 阅读

CyclicBarrier

循环屏障,初始化时需要设定参与线程的数量,内部使用可重入锁ReentrantLock和Condition,每次调用await()方法,计数器(参与线程的数量)减1,阻塞当前线程,当计数器减到0时,唤醒所有阻塞线程。在这之后如果继续调用await()方法,新的一轮循环重新开始。

public class Demo01CyclicBarrier {
    // 定义参与线程数为5
//    static CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
    // 第二个参数是一个runnable,当所有线程准备完成,先执行这个runnable中的run()方法
    static CyclicBarrier cyclicBarrier = new CyclicBarrier(5,new Start());
    static Random random = new Random();
    public static void main(String[] args) {
        // 启动5个线程
        for (int i = 0; i < 5; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        // 模拟线程准备时间
                        Thread.sleep(random.nextInt(3000));
                        System.out.println(Thread.currentThread().getName()+" 准备就绪");
                        // 将线程阻塞在屏障前
                        cyclicBarrier.await();
                        // 线程执行
                        System.out.println(Thread.currentThread().getName()+" 开始执行");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
    }

    static class Start implements  Runnable{

        @Override
        public void run() {
            System.out.println("全部就位,开始");
        }
    }
}

如果线程数量达不到设定的参与线程数,所有线程会一直阻塞。

CountDownLatch

倒计数门闩,指await()方法在倒计数为0之前会阻塞当前线程

public class Demo02CountDownLatch {
    static CountDownLatch countDownLatch = new CountDownLatch(5);
    static Random random = new Random();
    public static void main(String[] args) {
        // 开启5个线程
        for (int i = 0; i < 5; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    // 默认程序执行耗时
                    try {
                        Thread.sleep(random.nextInt(3000));
                        countDownLatch.countDown();
                        System.out.println(Thread.currentThread().getName()+" 执行完成");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
        // 主线程等待所有子线程执行完成
        try {
            // 等待子线程执行方案 1 使用join   2  Thread.activeCount()+ yield
            countDownLatch.await();
            System.out.println("主线程继续执行");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Semaphore

信号量,就像银行的窗口,比如银行开了3个窗口,同一时间只能有3个窗口提供服务,同一时刻只能有3个线程执行,其他线程进入等待状态

public class Demo03Semaphore {
    // 允许3个线程同时执行
    static Semaphore semaphore = new Semaphore(3);
    static Random random = new Random();
    public static void main(String[] args) {
        // 模拟10个线程
        for (int i = 0; i < 10; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    // 使用Semaphore获取运行权利
                    try {
                        semaphore.acquire();
                        System.out.println(Thread.currentThread().getName()+" 开始执行");
                        // 模拟线程执行时间
                        Thread.sleep(random.nextInt(3000));
                        // 释放信号量
                        semaphore.release();
                        System.out.println(Thread.currentThread().getName()+" 执行完成,下一个");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
    }
}

JDK7 HashMap

线程不安全

public class Demo04Hashmap {
    static HashMap<String,String> hashMap = new HashMap<>();
    static CountDownLatch countDownLatch = new CountDownLatch(10);
    public static void main(String[] args) {
        // 启动10个线程
        for (int i = 0; i < 10; i++) {
            Run run = new Run(i);
            new Thread(run).start();
        }
        // 主线程等待子线程执行完成
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // 打印出hashMap的元素个数
        System.out.println(hashMap.size());
    }

    static class Run implements Runnable{
        int startIndex;

        public Run(int startIndex) {
            this.startIndex = startIndex;
        }

        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                hashMap.put("key"+(startIndex)*10+i,"1");
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            countDownLatch.countDown();
        }
    }
}

hashmap关键参数

// 初始容量
static final int DEFAULT_INITIAL_CAPACITY = 1 << 4; // aka 16
// 最大容量
static final int MAXIMUM_CAPACITY = 1 << 30;  // 1073741824
// 负载因子 当数组元素大于初始容量*负载因子时,数组进行扩容,扩容为原来的2倍
static final float DEFAULT_LOAD_FACTOR = 0.75f;

JDK7 ConcurrentHashMap

ConcurrentHashMap是线程安全的

public class Demo04Hashmap {
//    static HashMap<String,String> hashMap = new HashMap<>();
    static ConcurrentHashMap<String,String> hashMap = new ConcurrentHashMap<>();
    static CountDownLatch countDownLatch = new CountDownLatch(10);
    public static void main(String[] args) {
//        System.out.println(1<<30);
        // 启动10个线程
        for (int i = 0; i < 10; i++) {
            Run run = new Run(i);
            new Thread(run).start();
        }
        // 主线程等待子线程执行完成
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // 打印出hashMap的元素个数
        System.out.println(hashMap.size());
    }

    static class Run implements Runnable{
        int startIndex;

        public Run(int startIndex) {
            this.startIndex = startIndex;
        }

        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                hashMap.put("key"+(startIndex)*10+i,"1");
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            countDownLatch.countDown();
        }
    }
}

JDK8 HashMap

结构是数组+链表+红黑树,使用红黑树是为了提高查询效率,在链表节点个数超过8个时,将链表转换成红黑树

// 链表节点个数阈值,超过这个值,转变成红黑树
static final int TREEIFY_THRESHOLD = 8;

红黑树

红黑树性质:

  • 节点是红色或者黑色
  • 根节点是黑色
  • 每个红色节点的两个子节点都是黑色,从每个叶子结点到跟的所有路径上不能有两个连续的红色节点
  • 从任一节点到其每个叶子的所有路径包含相同数目的黑色节点

插入新节点后,如果不满足红黑树性质,需要对节点进行旋转,旋转是为了让树更倾向于平衡

左旋:如果对节点P进行左旋,则P的右子节点会上升为P的父节点,P则变成左子节点

右旋:如果对节点P进行右旋,则P的左子节点会上升为P的父节点,P则变成右子节点

JDK8 ConcurrentHashMap

结构:数组+链表+红黑树

在使用put的时候,最开始初始化数组,容量默认是16,计算Key的哈希值在数组中的位置,判断数组中该位置的节点是否为空,如果为空,通过CAS操作写入节点,如果数组该位置的节点不为空,首先锁定该节点(链表的首节点),遍历链表,先判断key是否存在链表中,存在就替换value,不存在将新的节点添加在链表中

支持边迭代边修改

public class Demo05ConcurrentHashMap {
//    static Hashtable<String,Integer> hashMap = new Hashtable<>();
//    static HashMap<String,Integer> hashMap = new HashMap<>();
    static ConcurrentHashMap<String,Integer> hashMap = new ConcurrentHashMap<>();
    public static void main(String[] args) {
        // 往hashMap中插入数据
        for (int i = 0; i < 10; i++) {
            hashMap.put("Key"+i,i);
        }
        // 一边遍历一边修改
        for (Map.Entry<String, Integer> entry : hashMap.entrySet()) {
            hashMap.remove(entry.getKey());
        }
        System.out.println(hashMap.size());
    }
}

JUC队列

一般使用队列都是在生产者、消费者场景。

非阻塞队列

ConcurrentLinkedQueue

public class Demo06ConcurrentLikedQueue {
    // 初始化队列
    static ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
    public static void main(String[] args) {
        // 开启生产者线程
        new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 10; i++) {
                    queue.add(i);
                    System.out.println("生产者生产:"+i);
                }
            }
        }).start();
        // 消费者线程
        new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 15; i++) {
                    Integer poll = queue.poll();
                    System.out.println("消费者消费:"+poll);
                }
            }
        }).start();
    }
}

ArrayBlockingQueue

阻塞的关键在于使用了Lock和Condition

生产者

public class Demo07ArrayBlockingQueue {
    // 需要设置数组容量
    static ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
    public static void main(String[] args) {
        // 生产者线程
        new Thread(new Runnable() {
            @Override
            public void run() {
                // 循环插入数据
                for (int i = 0; i < 11; i++) {
//                    queue.add(i);   // 如果队列满了,会抛出java.lang.IllegalStateException: Queue full
//                    queue.offer(i); // 如果队列满了,新加入的元素直接抛弃
                    try {
                        queue.put(i);   // 如果队列满了,阻塞生产者线程
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("添加元素:"+i);
                }
            }
        }).start();
        // 等待子线程执行完成
        while (Thread.activeCount()>2){
            Thread.yield();
        }
        // 打印出队列的值
        System.out.println(queue);
    }
}

消费者

public class Demo07ArrayBlockingQueue {
    // 需要设置数组容量
    static ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
    public static void main(String[] args) {
        // 生产者线程
        new Thread(new Runnable() {
            @Override
            public void run() {
                // 循环插入数据
                for (int i = 0; i < 10; i++) {
//                    queue.add(i);   // 如果队列满了,会抛出java.lang.IllegalStateException: Queue full
//                    queue.offer(i); // 如果队列满了,新加入的元素直接抛弃
                    try {
                        queue.put(i);   // 如果队列满了,阻塞生产者线程
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
//                    System.out.println("添加元素:"+i);
                }
            }
        }).start();
        // 等待子线程执行完成
        while (Thread.activeCount()>2){
            Thread.yield();
        }
        // 打印出队列的值
        System.out.println(queue);

        // 消费者线程
        new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 11; i++) {
//                    Integer integer = queue.remove();    // 如果队列是空的,抛出java.util.NoSuchElementException
//                    Integer integer = queue.poll(); // 如果队列为空,获取到的是null
                    Integer integer = null; // 如果队列为空,阻塞消费者线程
                    try {
                        integer = queue.take();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("消费者消费:"+integer);
                }
            }
        }).start();
    }
}

LinkedBlockingQueue

构造函数可以传入容量,如果不传入默认使用Integer.MAX_VALUE

生产者

public class Demo08LinkedBlockingQueue {
    // 设置容量为10
    static LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);
    public static void main(String[] args) {
        // 生产者线程
        new Thread(new Runnable() {
            @Override
            public void run() {
                // 循环插入数据
                for (int i = 0; i < 11; i++) {
//                    queue.add(i);   // 如果队列满了,会抛出java.lang.IllegalStateException: Queue full
//                    queue.offer(i); // 如果队列满了,新加入的元素直接抛弃
                    try {
                        queue.put(i);   // 如果队列满了,阻塞生产者线程
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("添加元素:"+i);
                }
            }
        }).start();
        // 等待子线程执行完成
        while (Thread.activeCount()>2){
            Thread.yield();
        }
        // 打印出队列的值
        System.out.println(queue);
    }
}

消费者

public class Demo08LinkedBlockingQueue {
    // 设置容量为10
    static LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);
    public static void main(String[] args) {
        // 生产者线程
        new Thread(new Runnable() {
            @Override
            public void run() {
                // 循环插入数据
                for (int i = 0; i < 10; i++) {
//                    queue.add(i);   // 如果队列满了,会抛出java.lang.IllegalStateException: Queue full
//                    queue.offer(i); // 如果队列满了,新加入的元素直接抛弃
                    try {
                        queue.put(i);   // 如果队列满了,阻塞生产者线程
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
//                    System.out.println("添加元素:"+i);
                }
            }
        }).start();
        // 等待子线程执行完成
        while (Thread.activeCount()>2){
            Thread.yield();
        }
        // 打印出队列的值
        System.out.println(queue);

        // 消费者线程
        new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 11; i++) {
//                    Integer integer = queue.remove();    // 如果队列是空的,抛出java.util.NoSuchElementException
//                    Integer integer = queue.poll(); // 如果队列为空,获取到的是null
                    Integer integer = null; // 如果队列为空,阻塞消费者线程
                    try {
                        integer = queue.take();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("消费者消费:"+integer);
                }
            }
        }).start();
    }
}

PriorityBlockingQueue

默认属性

// 默认容量
private static final int DEFAULT_INITIAL_CAPACITY = 11;

创建时可以指定容量大小,如果不指定,容量默认为11,当向队列增加元素时,队列大小会自动增加

队列内部实现了排序,排序比较小的元素有限出队

public class Demo09PriorityBlockingQueue {
    // 如果不设置初始容量,默认是11
    static PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
    static Random random = new Random();
    public static void main(String[] args) {
        // 生产者线程
        Thread producer = new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 15; i++) {
                    queue.put(random.nextInt(100));
                }
            }
        });
        producer.start();
        // 主线程进行等待
        try {
            producer.join();
            System.out.println(queue);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // 消费者线程
        Thread consumer = new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 16; i++) {
                    Integer integer = null;
//                    integer = queue.remove();
//                    integer = queue.poll();
                    try {
                        integer = queue.take();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("消费者消费: "+integer);
                }
            }
        });
        consumer.start();
        // 主线程进行等待
        try {
            consumer.join();
            System.out.println(queue);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

SynchronousQueue

队列本身不存储数据

public class Demo10SynchronousQueue {
    static SynchronousQueue<Integer> queue = new SynchronousQueue<>();

    public static void main(String[] args) {
        //生产者
        new Thread(new Runnable() {
            @Override
            public void run() {
                // 往队列添加数据
//                queue.add(1);   // 队列本身不保存数据
                queue.offer(1); // 做抛弃处理
            }
        }).start();

        while (Thread.activeCount()>2){
            Thread.yield();
        }
        System.out.println(queue);
    }
}

生产者与消费者结合

public class Demo10SynchronousQueue {
    static SynchronousQueue<Integer> queue = new SynchronousQueue<>();

    public static void main(String[] args) {
        //生产者
        new Thread(new Runnable() {
            @Override
            public void run() {
                // 往队列添加数据
//                queue.add(1);   // 队列本身不保存数据
//                queue.offer(1); // 做抛弃处理
                for (int i = 0; i < 10; i++) {
                    try {
                        queue.put(i);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("生产者生产: "+i);
                }
            }
        }).start();

//        while (Thread.activeCount()>2){
//            Thread.yield();
//        }
//        System.out.println(queue);

        // 消费者线程
        new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 10; i++) {
                    try {
                        System.out.println("消费者消费:"+queue.take());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();
    }
}

线程池

ThreadPoolExecutor

构造函数

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) 
corePoolSize:线程池中核心线程的数量,即时空闲,也会维持该数量的线程
maximumPoolSize:线程池最大支持的线程数,当线程池的阻塞队列满了之后,如果还有任务提交,且当前线程数小于这个最大值,会启动新的线程来执行任务。注意,如果使用的是无界队列,该参数无效
keepAliveTime:当线程数比核心线程数多时,空闲线程的最大存活时间
unit:keepAliveTime的单位
workQueue:用来保存等待执行的任务的阻塞队列,等待的任务必须实现Runnable接口
threadFactory:创建线程的工厂
handler:线程池的拒绝策略,默认是AbortPolicy,直接抛出异常

创建线程和销毁线程都有开销,类似于电脑的开机和关机,线程池就相当于网吧,电脑已经开好了,上来用就行。

核心线程就是网吧早上刚开门,来一个客户就开一台电脑,如果核心线程数是10,开10台电脑,即时客户下线离开,这10台电脑也不会关机。

最大线程数相当于网吧所有的电脑数,如果最大线程数为100,网吧有100台电脑。

keepAliveTime,网吧除了那10台电脑外,其他的电脑最大的空闲时间,超过这个时间,就关机。

workQueue,工作队列,也就是在网吧进行排队,当核心数量的电脑有人下线后,从队列中取出任务放到核心电脑上,如果队列满了,开启新的电脑来执行

handler,核心线程满了,工作队列也满了,线程达到了最大maximumPoolSize,此时再来顾客,直接拒绝

public class Demo11ThreadPool {
    public static void main(String[] args) {
        Random random = new Random();
        // 创建一个核心线程数为5 最大线程数为10 工作队列容量为10 的线程池
        ThreadPoolExecutor executor = new ThreadPoolExecutor(5,10,0, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(10));
        // 开启线程
        for (int i = 0; i < 15; i++) {
            // 往线程池中提交任务
            executor.submit(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName()+" 线程执行");
                    try {
                        // 模拟任务随机运行时间
                        Thread.sleep(random.nextInt(3000));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        // 线程执行完成后关机线程池
        executor.shutdown();
        System.out.println("主线程继续执行");
    }
}

可以看到一共有5个核心线程提供服务

将线程数改为16,除了5个核心线程执行任务外,将创建1个非核心线程来执行任务

继续增大线程数为20,除了5个核心线程执行任务外,会创建5个非核心线程执行任务

继续增大线程数为21,可以看到有一个线程任务被拒绝了

将队列从ArrayBlockingQueue修改为ArrayBlockingQueue,相当于用了一个无界队列,所有的任务都会等待核心线程来执行

如果将核心线程设置为0,工作队列容量设置为10,任务数也设置为10,可以看到任务是单线程来执行

继续将任务数增大到11,可以看到有两个线程来执行任务

FixedThreadPool

public class Demo12FixedThreadPool {
    public static void main(String[] args) {
        // 创建一个核心线程数为5 最大线程数为5 工作队列容量相当于无界
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        Random random = new Random();
        // 往线程池中添加任务
        for (int i = 0; i < 20; i++) {
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName()+" 线程执行");
                    try {
                        // 模拟任务随机运行时间
                        Thread.sleep(random.nextInt(5000));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        executorService.shutdown();
    }
}

SingleThreadExecutor

public class Demo13SingleThreadPool {
    public static void main(String[] args) {
        // 创建核心线程数为1 最大线程数为1 工作队列相当于无界队列
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        Random random = new Random();
        for (int i = 0; i < 10; i++) {
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName()+" 线程执行");
                    try {
                        Thread.sleep(random.nextInt(4000));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        executorService.shutdown();
    }
}

CachedThreadPool

public class Demo14CachedThreadPool {
    public static void main(String[] args) {
        // 创建核心线程数为0 最大线程数为Integer.MAX_VALUE 的线程池
        ExecutorService executorService = Executors.newCachedThreadPool();
        Random random = new Random();
        for (int i = 0; i < 20; i++) {
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName()+" 线程执行");
                    try {
                        Thread.sleep(random.nextInt(2000));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        executorService.shutdown();
    }
}

ScheduledExecutorService

public class Demo15ScheduledThreadPool {
    public static void main(String[] args) {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
        System.out.println(System.currentTimeMillis());
        // 第一中用法
        // 第一个参数是要运行的任务  第二个参数是延迟执行的时间 第三个参数是延迟时间的单位
//        executorService.schedule(new Runnable() {
//            @Override
//            public void run() {
//                System.out.println(Thread.currentThread().getName()+" 开始执行" + System.currentTimeMillis());
//            }
//        },3, TimeUnit.SECONDS);
//        executorService.shutdown();
        // 第二种用法
        // 第一个参数是要运行的任务  第二个参数是延迟执行的时间 第三个参数是执行的间隔 第四个参数是延迟时间的单位
        executorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName()+" 开始执行" + System.currentTimeMillis());
            }
        },2,2,TimeUnit.SECONDS);

    }
}
版权声明:原创作品,允许转载,转载时务必以超链接的形式表明出处和作者信息。否则将追究法律责任。来自海汼部落-123456789987654321,http://hainiubl.com/topics/75390
点赞
成为第一个点赞的人吧 :bowtie:
回复数量: 0
    暂无评论~~
    • 请注意单词拼写,以及中英文排版,参考此页
    • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`, 更多语法请见这里 Markdown 语法
    • 支持表情,可用Emoji的自动补全, 在输入的时候只需要 ":" 就可以自动提示了 :metal: :point_right: 表情列表 :star: :sparkles:
    • 上传图片, 支持拖拽和剪切板黏贴上传, 格式限制 - jpg, png, gif,教程
    • 发布框支持本地存储功能,会在内容变更时保存,「提交」按钮点击时清空
    Ctrl+Enter