这里写目录标题

一 JUC 介绍

1 进程线程介绍介绍

进程 : 后台运行的一个程序就是一个进程 比如 运行IDEA程序
线程 : 进捏的组成部分 如 IDEA进程内的语法检查功能 就是一个线程
2 并发并行的介绍
并发(concurrent)同一时刻多个线程在访问同一个资源,多个线程对一个点
例子:小米9今天上午10点,限量抢购
春运抢票
电商秒杀…
并行 多项工作一起执行,之后再汇总
例子:泡方便面,电水壶烧水,一边撕调料倒入桶中
3 wait 和 sleep的区别
wait放开手去睡,放开手里的锁
sleep握紧手去睡,醒了手里还有锁
4 线程的状态
NEW,(新建)
RUNNABLE,(准备就绪)
BLOCKED,(阻塞)
WAITING,(不见不散)没定时间
TIMED_WAITING,(过时不候) 定了时间
TERMINATED;(终结)
二 卖票算法的企业级模板实现
企业级简单实现(synchronized)
没有像以往的那样使用接口 使类0污染比较干净
package com.luyi.demo;
/**
* 卖票 企业级套路 + 模板
* 1. 在高内聚低耦合的前提下: 线程 操作(对外暴露的调用方法) 资源类
* 高内聚: 资源类对外暴露的功能只在自己身上实现 低耦合: 调用者和 资源类之间并无关系
* @author 卢意
* @create 2021-01-10 9:42
*/
public class SaleTicket {
public static void main(String[] args) {
// 资源类
Ticket ticket = new Ticket();
// 线程 Thread(Runnable target, String ThreadName)
new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i <= 30 ; i++) {
ticket.saleTicket();
}
}
}, "售票员A").start();
new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i <= 30 ; i++) {
ticket.saleTicket();
}
}
}, "售票员B").start();
new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i <= 30 ; i++) {
ticket.saleTicket();
}
}
}, "售票员C").start();
}
}
// 资源类
class Ticket {
private int number = 30;
// 操作
public synchronized void saleTicket(){
if(number > 0) {
System.out.println(Thread.currentThread().getName() + "\t卖出第" + (number--) + "\t 还剩下:" + number);
}
}
}
juc优化卖票
进一步优化锁的粒度
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class Ticket {
private int number = 30;
private Lock lock = new ReentrantLock();// 可重入锁
// 操作
public void saleTicket(){
lock.lock();
try {
if(number > 0) {
System.out.println(Thread.currentThread().getName() + "\t卖出第" + (number--) + "\t 还剩下:" + number);
}
}finally {
lock.unlock();
}
}
}
lambda优化
package com.luyi.demo;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* 卖票 企业级套路 + 模板
* 1. 在高内聚低耦合的前提下: 线程 操作(对外暴露的调用方法) 资源类
* 高内聚: 资源类对外暴露的功能只在自己身上实现 低耦合: 调用者和 资源类之间并无关系
* @author 卢意
* @create 2021-01-10 9:42
*/
public class SaleTicket {
public static void main(String[] args) {
// 资源类
Ticket ticket = new Ticket();
// 线程 Thread(Runnable target, String ThreadName)
new Thread(() -> { for (int i = 0; i <= 30 ; i++) ticket.saleTicket(); }, "售票员A").start();
new Thread(() -> { for (int i = 0; i <= 30 ; i++) ticket.saleTicket(); }, "售票员B").start();
new Thread(() -> { for (int i = 0; i <= 30 ; i++) ticket.saleTicket(); }, "售票员C").start();
}
}
// 资源类
class Ticket {
private int number = 30;
private Lock lock = new ReentrantLock();// 可重入锁
// 操作
public void saleTicket(){
lock.lock();
try {
if(number > 0) {
System.out.println(Thread.currentThread().getName() + "\t卖出第" + (number--) + "\t 还剩下:" + number);
}
}finally {
lock.unlock();
}
}
}
三 Lambda Expression 快速复习
接口内如果只有一个声明的方法 就是函数式接口@FunctionalInterface(只有一个方法 默认加上)
interface Foo {
public void sayHello();
}
在函数式接口的情况下
口诀
拷贝小括号(复制小括号的内容) 写死右箭头 落地大箭头 () -> {}Foo foo = () -> System.out.println(“Hello”);
有参数方法
interface Foo {
public int add(int x, int y);
}
Foo foo = (int x, int y) -> {
System.out.println(“add method”);
return x + y;
};
System.out.println(foo.add(1, 2));
// add method
// 3
甚至可以去除参数的类型
Foo foo = (x, y) -> {
System.out.println(“add method”);
return x + y;
};
四 浅谈java8接口的变化
java8以后允许接口内定义接口的实现
interface Foo {
public int add(int x, int y);
default int div(int x, int y) {
return x + y;
}
default int div2(int x, int y) {
return x + y;
}
}
上面的接口还是算一个函数式接口
我们可以知道 default影响函数式接口的定义
接口支持静态方法
interface Foo {
public int add(int x, int y);
default int div(int x, int y) {
return x + y;
}
default int div2(int x, int y) {
return x + y;
}
public static int div3(int x, int y) {
return x + y;
}
}
四 线程间的通信
线程间的横向交互
线程的生产者消费者(synchronized)
package com.luyi.demo;
/**
* 两个线程操作一个变量 一个线程让他加一 一个线程让他减一
* 实现交替操作 10轮 让这个变量的结果还是为0
* 1. 高内聚低耦合的前提下, 线程 操作 资源类
* 2. 判断 干活 通知
* @author 卢意
* @create 2021-01-10 15:16
*/
public class ThreadWaitNotifyDemo {
public static void main(String[] args) {
AirConditioner airConditioner = new AirConditioner();
for (int i = 0; i < 10; i++) {
}
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
airConditioner.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "加一线程").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
airConditioner.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "减一线程").start();
}
}
// 资源类
class AirConditioner{
private int number = 0;
public synchronized void increment() throws InterruptedException {
// 判断
if(number != 0) {
this.wait();
}
// 干活
number++;
System.out.println(Thread.currentThread().getName() + "\t" + number);
// 通知
this.notifyAll();
}
public synchronized void decrement() throws InterruptedException {
// 判断
if (number == 0) {
this.wait();
}
// 干活
number--;
System.out.println(Thread.currentThread().getName() + "\t" + number);
// 通知
this.notifyAll();
}
}
* 如果需求变化
* 四个线程操作一个变量 两个线程让他加一 两个线程让他减一
* 实现交替操作 10轮 让这个变量的结果还是为0
需要设置防止虚假唤醒
public synchronized void increment() throws InterruptedException {
// 判断
while (number != 0) {
this.wait();
}
// 干活
number++;
System.out.println(Thread.currentThread().getName() + "\t" + number);
// 通知
this.notifyAll();
}
public synchronized void decrement() throws InterruptedException {
// 判断
while (number == 0) {
this.wait();
}
// 干活
number--;
System.out.println(Thread.currentThread().getName() + "\t" + number);
// 通知
this.notifyAll();
}
那么为什么2个线程没问题 4个线程就会出现虚假唤醒呢
两个进程 一个进程wait()了 释放内存 那么只有另一个线程会得到资源
四个或者多个进程 一个进程wait()了 可能会有相同功能的线程抢到资源继续wait(), 当notifyAll()之后 两个相同功能的线程都被唤醒 导致对资源同时操作两次
JUC生产者消费者写法


package com.luyi.demo;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
// 资源类
class AirConditioner{
private int number = 0;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public void increment() throws InterruptedException {
lock.lock();
try {
// 判断
while (number != 0) {
condition.await();
}
// 干活
number++;
System.out.println(Thread.currentThread().getName() + "\t" + number);
// 通知
condition.signalAll();
}finally {
lock.unlock();
}
}
public void decrement() throws InterruptedException {
lock.lock();
try {
// 判断
while (number == 0) {
condition.await();
}
// 干活
number--;
System.out.println(Thread.currentThread().getName() + "\t" + number);
// 通知
condition.signalAll();
}finally {
lock.unlock();
}
}
}
JUC 精确通知顺序访问
juc比synchronized强的地方 可以实现精准加锁和放锁
需求
多线程之前按顺序调用 实现 A -> b -> c
三个线程启动 要求如下
AA打印5次 BB打印10次 CC打印15次
接着
AA打印5次 BB打印10次 CC打印15次
…来10轮
package com.luyi.demo;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/** 多线程之前按顺序调用 实现 A -> b -> c
* 三个线程启动 要求如下
* AA打印5次 BB打印10次 CC打印15次
* 接着
* AA打印5次 BB打印10次 CC打印15次
* ....来10轮
* @author 卢意
* > 1. 高内聚低耦合的前提下, 线程 操作 资源类
* > 2. 判断 干活 通知
* > 3. 多线程交互中 需要防止多线程的虚假唤醒 即判断资源状态使用while 不能用if
* > 4. 标志位
* @create 2021-01-10 16:49
*/
public class ThreadOrderAccess {
public static void main(String[] args) {
ShareResource shareResource = new ShareResource();
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
shareResource.print5();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "ThreadNameA").start();
new Thread(() -> {
try {
shareResource.print10();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "ThreadNameB").start();
new Thread(() -> {
try {
shareResource.print15();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "ThreadNameC").start();
}
}
}
class ShareResource {
private int number = 1; // 1对应A 2对应B 3对应C
private Lock lock = new ReentrantLock();
private Condition conditionA = lock.newCondition();
private Condition conditionB = lock.newCondition();
private Condition conditionC = lock.newCondition();
public void print5() throws InterruptedException {
lock.lock();
try {
// 判断
while(number != 1) {
conditionA.await();
}
// 干活
for (int i = 0; i < 5; i++) {
System.out.println(Thread.currentThread().getName() + "\t" + (i + 1));
}
// 标志位
number = 2;
// 通知
conditionB.signal();
}finally {
lock.unlock();
}
}
public void print10() throws InterruptedException {
lock.lock();
try {
// 判断
while(number != 2) {
conditionB.await();
}
// 干活
for (int i = 0; i < 10; i++) {
System.out.println(Thread.currentThread().getName() + "\t" + (i + 1));
}
// 标志位
number = 3;
// 通知
conditionC.signal();
}finally {
lock.unlock();
}
}
public void print15() throws InterruptedException {
lock.lock();
try {
// 判断
while(number != 3) {
conditionC.await();
}
// 干活
for (int i = 0; i < 15; i++) {
System.out.println(Thread.currentThread().getName() + "\t" + (i + 1));
}
// 标志位
number = 1;
// 通知
conditionA.signal();
}finally {
lock.unlock();
}
}
}
完整线程口诀
- 高内聚低耦合的前提下, 线程 操作 资源类
- 判断 干活 通知
- 多线程交互中 需要防止多线程的虚假唤醒 即判断资源状态使用while 不能用if
- 标志位
五 多线程8锁的现象和解释
现象
package com.luyi.demo;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class Phone {
private static Lock lock = new ReentrantLock();
public static void sendEmail() {
lock.lock();
try {
try { TimeUnit.SECONDS.sleep(4); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println("-------send Email");
}finally {
lock.unlock();
}
}
public void sendSMS() {
lock.lock();
try {
System.out.println("-------send SMS");
}finally {
lock.unlock();
}
}
public void hello() {
System.out.println("-------say hello");
}
}
/**
* 多线程8锁
* 1. 标准访问 Phone的邮件和短信功能 打印顺序不一定(决定权是操作系统)
* 2. 访问 Phone的邮件和短信功能 发邮件方法需要4秒 发短信方法不需要 则执行顺序是先邮件 再短信
* 3. 新增一个普通方法hello不加lock 先执行hello 再执行Email
* 4. 两部手机 一部发邮件 一部发短信 先短信 后邮件
* 5. 两个静态同步方法 同一部手机 先邮件 再短信
* 6. 两个静态同步方法 两部手机 一部发邮件 一部发短信 先邮件 再短信
* 7. 一个静态同步方法 一个非静态同步方法 一部手机 无论哪个方法是静态的 顺序都是先邮件 再短信(我的juc lock方法结果是这样 但是阳哥的synchronized 结果是相反的)
* 8. 一个静态同步方法 一个非静态同步方法 两部手机 无论哪个方法是静态的 顺序都是先邮件 再短信(我的juc lock方法结果是这样 但是阳哥的synchronized 结果是相反的)
*
* @author 卢意
* @create 2021-01-10 17:15
*/
public class Lock8 {
public static void main(String[] args) {
Phone phone1 = new Phone();
Phone phone2 = new Phone();
new Thread(() -> phone1.sendEmail(), "A").start();
new Thread(phone2::sendSMS, "B").start();
}
}
解释
1 和 2 锁的解释
一个对象里面如果有多个synchronized方法,某一个时刻内,只要一个线程去调用其中的一个synchronized方法了,
其它的线程都只能等待,换句话说,某一个时刻内,只能有唯一一个线程去访问这些synchronized方法
锁的是当前对象this(资源类),被锁定后,其它的线程都不能进入到当前对象的其它的synchronized方法
3 锁的解释
加个普通方法后发现和同步锁无关 (手机和手机壳互不相关)
4 锁的解释
换成两个对象后,不是同一把锁了,情况立刻变化。(两个人两个手机)
5 6 锁解释
synchronized实现同步的基础:Java中的每一个对象都可以作为锁。
具体表现为以下3种形式。
对于普通同步方法,锁是当前实例对象。
对于静态同步方法,锁是当前类的Class对象。( 普通同步方法的锁和静态同步方法的锁 两个锁互不影响)
对于同步方法块,锁是Synchonized括号里配置的对象
当一个线程试图访问同步代码块时,它首先必须得到锁,退出或抛出异常时必须释放锁。
也就是说如果一个实例对象的非静态同步方法获取锁后,该实例对象的其他非静态同步方法必须等待获取锁的方法释放锁后才能获取锁,
可是别的实例对象的非静态同步方法因为跟该实例对象的非静态同步方法用的是不同的锁,
所以毋须等待该实例对象已获取锁的非静态同步方法释放锁就可以获取他们自己的锁。
7 8 锁解释 静态同步方法所得是类这个模板 非静态同步方法所得是这个类的实例 的锁不是同一个 无关联
所有的静态同步方法用的也是同一把锁——类对象本身,
这两把锁是两个不同的对象,所以静态同步方法与非静态同步方法之间是不会有竞态条件的。
但是一旦一个静态同步方法获取锁后,其他的静态同步方法都必须等待该方法释放锁后才能获取锁,
而不管是同一个实例对象的静态同步方法之间,
还是不同的实例对象的静态同步方法之间,只要它们同一个类的实例对象!
六 List不安全
举例说明
package com.luyi.demo;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
/**
* @author 卢意
* @create 2021-01-10 19:42
*/
public class NoSafeDemo {
public static void main(String[] args) {
List<String> list = new ArrayList<>();
for (int i = 0; i < 3; i++) {
new Thread(() -> {
list.add(UUID.randomUUID().toString().substring(0,8));
System.out.println(list);
}, i + "").start();
}
// 正确结果
// [3fb16395]
// [3fb16395, 4b2551d6]
// [3fb16395, 4b2551d6, a6f52743]
// 但是偶尔会这样
// [null, f69d4ac4]
// [null, f69d4ac4, 41e8ffc4]
// [null, f69d4ac4]
}
}
如过时30个线程 可能还会报错
分析
故障现象
java.util.ConcurrentModificationException
导致原因
ArrayList在迭代的时候如果同时对其进行修改就会
抛出java.util.ConcurrentModificationException异常
并发修改异常
看ArrayList的源码
public boolean add(E e) {
ensureCapacityInternal(size + 1); // Increments modCount!!
elementData[size++] = e;
return true;
}
没有synchronized线程不安全
解决方案
Vector
List<String> list = new Vector<>();
看Vector的源码
public synchronized boolean add(E e) {
modCount++;
ensureCapacityHelper(elementCount + 1);
elementData[elementCount++] = e;
return true;
}

synchronizedList
Collections提供了方法synchronizedList保证list是同步线程安全的
List<String> list = Collections.synchronizedList(new ArrayList<>());

还有一种JUC方法 写时复制
List list = new CopyOnWriteArrayList<>();

关于JUC的写时复制CopyOnWriteArrayList需要详细讲一下
不加锁会出错 加锁性能又有影响
CopyOnWriteArrayList定义
A thread-safe variant of ArrayList in which all mutative operations (add, set, and so on) are implemented by making a fresh copy of the underlying array.
CopyOnWriteArrayList是arraylist的一种线程安全变体,
其中所有可变操作(add、set等)都是通过生成底层数组的新副本来实现的。
源码
/**
* Appends the specified element to the end of this list.
*
* @param e element to be appended to this list
* @return {@code true} (as specified by {@link Collection#add})
*/
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
CopyOnWrite容器即写时复制的容器。往一个容器添加元素的时候,不直接往当前容器Object[]添加,
而是先将当前容器Object[]进行Copy,复制出一个新的容器Object[] newElements,然后向新的容器Object[] newElements里添加元素。
添加元素后,再将原容器的引用指向新的容器setArray(newElements)。
这样做的好处是可以对CopyOnWrite容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。
所以CopyOnWrite容器也是一种读写分离的思想,读和写不同的容器。
六 Set不安全
Set<String> set = new HashSet<>();//线程不安全
一样的在多线程条件下 会报ConcurrentModifyException异常
Set<String> set = Collections.synchronizedSet(new HashSet<>());//线程安全
Set<String> set = new CopyOnWriteArraySet<>();//线程安全
HashSet底层数据结构是什么
看起来是HashSet 但是不仅如此
但HashSet的add是放一个值,而HashMap是放K、V键值对
HashSet的add(Value)方法 就是底层HashMap的Key 而Value是固定的Object对象
因为HashMap的Key是唯一的
也可以得出结论HashSet可以add(Null)

七 Map线程不安全
依然ConcurrentModificationException
Collections.synchronizedMap(new HashMap<>());是加锁后的HashMap
Map<String, Object> map = Collections.synchronizedMap(new HashMap<>());
没有想上面一样有写时复制的HashMap了 但是有ConcurrentHashMap
Map<String, Object> map = new ConcurrentHashMap();
HashMap底层数据结构是什么
数组(Node类型) + 链表(Node类型) + 红黑树(1.8之后) (拉链 | 散列表)
链表长度大于八之后 单项链表将变为红黑树加快效率
所谓Node类型 像是树的数据结构
Hashmap初始大小为 16
每次扩容都是原来的一倍
Hashmap的负载因子是0,75
- 负载因子的大小决定了HashMap的数据密度
- 负载因子越大密度越大,发生碰撞的几率越高,数组中的链表越容易长,造成查询或插入时比较次数增多,性能会下降
- 负载因子越小,就越容易触发扩容,数据密度也越小,意味着发生碰撞的几率越小,数组中链表也就越短,查询和插入时比较的次数也越小,性能会更高。但是会浪费一定的内存空间。而且经常扩容也会影响性能,建议初始化预设大一点的空间
- 会考虑将负载因子设置为0.7~0.75,此时平均检索长度接近于常数

可以通过构造函数设置初始值和负载因子 不写就是 默认 16 和 0.75
八 Callable
多线程中 , 三种获得多线程的方式
- 继承Thread类
- 实现Runnable接口
- 实现 Callable接口

Callable和Runnable的比较
//创建新类MyThread实现runnable接口
class MyThread implements Runnable{
@Override
public void run() {
}
}
//新类MyThread2实现callable接口
class MyThread2 implements Callable<Integer>{
@Override
public Integer call() throws Exception {
return 200;
}
}
面试题:callable接口与runnable接口的区别?
- 是否有返回值
- 是否抛异常
- 落地方法不一样,一个是run,一个是call
Callable可以有返回值 可以检查异常 可以细粒度化的对多线程进行控制和管理
Callable实现方式
Thread不能直接调用Callable的实现类 因为 thread类的构造方法根本没有Callable
但是从中可以分析出 Thread类的构造方法需要实现Runable接口的实现类
通过 java多态的思想,一个类可以实现多个接口!!

MyThread myThread = new MyThread();
FutureTask futureTask= new FutureTask(myThread);
Thread t1 = new Thread(futureTask);
t1.start();
获取返回值
futureTask.get();
完整代码
package com.luyi.demo;
import java.util.concurrent.*;
/**
* 多线程中 , 三种获得多线程的方式
* 1. 继承Thread类
* 2. 实现Runnable接口
* 3.
* @author 卢意
* @create 2021-01-11 9:55
*/
class MyThread implements Callable<Integer> {
@Override
public Integer call() throws Exception {
System.out.println("-----Callable here");
return 200;
}
}
public class CallableDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
MyThread myThread = new MyThread();
FutureTask futureTask= new FutureTask(myThread);
Thread t1 = new Thread(futureTask);
t1.start();
System.out.println(futureTask.get());
// 结果
// -----Callable here
// 200
}
}
Callable的一些细节
FutureTask的简单理解
线程再处理一系列问题的时候 如果遇到比较困难的问题 另起一个线程(Callable)去处理这个困难问题 原线程继续解决其他问题
当两个线程的问题都解决完了 再进行汇总 (get方法获取Callable的返回值)
原理上理解FutureTask
- 在主线程中需要执行比较耗时的操作时,但又不想阻塞主线程时,可以把这些作业交给Future对象在后台完成,
当主线程将来需要时,就可以通过Future对象获得后台作业的计算结果或者执行状态。- 一般FutureTask多用于耗时的计算,主线程可以在完成自己的任务后,再去获取结果。
- 仅在计算完成时才能检索结果;如果计算尚未完成,则阻塞 get 方法。一旦计算完成,
就不能再重新开始或取消计算。get方法而获取结果只有在计算完成时获取,否则会一直阻塞直到任务转入完成状态,- 然后会返回结果或者抛出异常。
两个或多个线程进行调用同一个Callable接口的call()方法只计算一次(可以理解为 比如让Callable接口去计算 1 + 1 计算结果已经知道是2了 在调用一次 就不在进行计算了)
get方法一般放在最后一行
九 JUC强大的辅助类
CountDownLatch
CountDownLatch主要有两个方法,当一个或多个线程调用await方法时,这些线程会阻塞。
其它线程调用countDown方法会将计数器减1(调用countDown方法的线程不会阻塞),
当计数器的值变为0时,因await方法阻塞的线程会被唤醒,继续执行。
package com.luyi.demo;
import java.util.concurrent.CountDownLatch;
/**
* 需求 当同学走光了 班长再关门
* @author 卢意
* @create 2021-01-11 11:07
*/
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(6);
for (int i = 0; i < 6; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "\t 离开教室");
countDownLatch.countDown();
}, i + "").start();
}
countDownLatch.await(); // 阻塞住后面的线程 直到计数器为0
System.out.println(Thread.currentThread().getName() + "\t 班长关门");
//0 离开教室
//1 离开教室
//2 离开教室
//3 离开教室
//4 离开教室
//5 离开教室
//main 班长关门
}
private static void closeDoor() {
for (int i = 0; i <= 6; i++) {
new Thread(() -> System.out.println(Thread.currentThread().getName() + "\t 离开教室"), i + "").start();
}
System.out.println(Thread.currentThread().getName() + "\t 班长关门");
}
}
CyclicBarrier
CyclicBarrier
的字面意思是可循环(Cyclic)使用的屏障(Barrier)。它要做的事情是,
让一组线程到达一个屏障(也可以叫同步点)时被阻塞,
直到最后一个线程到达屏障时,屏障才会开门,所有
被屏障拦截的线程才会继续干活。
线程进入屏障通过CyclicBarrier的await()方法。
package com.luyi.demo;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
/**
* @author 卢意
* @create 2021-01-11 11:27
*/
public class CyclicBarrierDemo {
public static void main(String[] args) {
// CyclicBarrier(int parties, Runnable barrierAction)
CyclicBarrier cyclicBarrier = new CyclicBarrier(7,() -> {
System.out.println("人到齐了 开始开会");
});
for (int i = 0; i < 7; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "\t 进入会议室");
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}, i + "").start();
}
}
}
Semaphore
在信号量上我们定义两种操作:
acquire(获取) 当一个线程调用acquire操作时,它要么通过成功获取信号量(信号量减1),
要么一直等下去,直到有线程释放信号量,或超时。
release(释放)实际上会将信号量的值加1,然后唤醒等待的线程。
信号量主要用于两个目的,一个是用于多个共享资源的互斥使用,另一个用于并发线程数的控制。
package com.luyi.demo;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
/**
* @author 卢意
* @create 2021-01-11 13:38
*/
public class SemaphoreDemo {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3); // 模拟资源类有三个
for (int i = 0; i < 10 ; i++) {
new Thread(() -> {
try {
semaphore.acquire(); // 资源类减一
System.out.println(Thread.currentThread().getName() + "\t 抢到内存");
try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println(Thread.currentThread().getName() + "\t 释放内存");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
semaphore.release(); // 资源类加一
}
}, i + "").start();
}
}
}
十 ReentrantReadWriteLock 读写锁
加入读写锁 使得写操作只有一个线程进行操作 而读的操作可以并发操作 但是在写操作的时候 不能进行读操作
package com.luyi.demo;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* 多个线程同时读一个资源类 没有问题. 所以为了满足并发量 读取共享资源应该可以同时进行
* 但是
* 如果有一个线程想去写共享资源,就不应该在有其他线程可以对资源进行度或者写
*总结
* 读 - 读 可以共存
* 读 - 写 不能共存
* 写 - 写 不能共存
* @author 卢意
* @create 2021-01-11 13:53
*/
class MyCache {
private volatile Map<String, Object> map = new HashMap<>();
private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
public void put(String key, Object value) {
readWriteLock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "\t 开始写入");
try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
map.put(key, value);
System.out.println(Thread.currentThread().getName() + "\t ----写入成功");
} catch (Exception e){
e.printStackTrace();
}finally {
readWriteLock.writeLock().unlock();
}
}
public void get(String key) {
readWriteLock.readLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "\t 开始读取");
try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
Object o = map.get(key);
System.out.println(Thread.currentThread().getName() + "\t ----读取成功 " + o);
} catch (Exception e){
e.printStackTrace();
}finally {
readWriteLock.readLock().unlock();
}
}
}
public class ReentrantReadWriteLockDemo {
public static void main(String[] args) {
MyCache myCache = new MyCache();
for (int i = 0; i < 5; i++) {
final int tempInt = i;
new Thread(() -> {
myCache.put(tempInt + "", tempInt);
}, i + "").start();
}
for (int i = 0; i < 5; i++) {
final int tempInt = i;
new Thread(() -> {
myCache.get(tempInt + "");
}, i + "").start();
}
// 结果
// 1 开始写入
//1 ----写入成功
//0 开始写入
//0 ----写入成功
//2 开始写入
//2 ----写入成功
//3 开始写入
//3 ----写入成功
//4 开始写入
//4 ----写入成功
//0 开始读取
//1 开始读取
//2 开始读取
//3 开始读取
//4 开始读取
//0 ----读取成功 0
//1 ----读取成功 1
//2 ----读取成功 2
//3 ----读取成功 3
//4 ----读取成功 4
//
}
}
十一 BlockingQueueDemo 阻塞队列
阻塞队列的介绍

当队列是空的,从队列中获取元素的操作将会被阻塞
当队列是满的,从队列中添加元素的操作将会被阻塞
试图从空的队列中获取元素的线程将会被阻塞,直到其他线程往空的队列插入新的元素
试图向已满的队列中添加新元素的线程将会被阻塞,直到其他线程从队列中移除一个或多个元素或者完全清空,使队列变得空闲起来并后续新增
阻塞
在多线程领域:所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤起
为什么需要BlockingQueue
好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切BlockingQueue都给你一手包办了
在concurrent包发布以前,在多线程环境下,我们每个程序员都必须去自己控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。

实现类的介绍
ArrayBlockingQueue (重要)
由数据结构组成的有界阻塞队列
LinkedBlockingQueue (重要)
由链表结构组成的有界但大小默认值为Integer,Max.VALUE)阻塞队列
PriorityBlockingQueue
支持优先级排序的无界阻塞队列
DelayQueue
使用优先级队列实现的延迟无界阻塞队列
SynchronousQueue(重要)
不储存元素的队列, 也即单个元素的队列
LinkedTransferQueue
由链表组成的无界阻塞队列
linkedBlockingDeque
由链表组成的双向阻塞队列
核心方法

十二 线程池
线程池的介绍
- 线程池做的工作只要是控制运行的线程数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数量超过了最大数量,超出数量的线程排队等候,等其他线程执行完毕,再从队列中取出任务来执行。
- . 它的主要特点为:线程复用;控制最大并发数;管理线程。
线程池的优势
第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的销耗。
第二:提高响应速度。当任务到达时,任务可以不需要等待线程创建就能立即执行。
第三:提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会销耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控
线程池的三大方法
Java中的线程池是通过Executor框架实现的,该框架中用到了Executor,Executors,ExecutorService,ThreadPoolExecutor这几个类
ExecutorService接口用的比较多 实现类为ThreadPoolExecutor
Executors相当于Arrays
Executors.newFixedThreadPool(int n)
例子
package com.luyi.demo;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author 卢意
* @create 2021-01-11 15:36
*/
public class MyThreadPoolDemo {
public static void main(String[] args) {
ExecutorService threadPool = Executors.newFixedThreadPool(5); // 一池5个受理线程
try {
// 模拟10个顾客办理业务 但是只有5个员工办理业务
for (int i = 0; i < 10; i++) {
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + "\t 办理业务");
});
}
} catch (Exception e){
e.printStackTrace();
}finally {
threadPool.shutdown();
}
// pool-1-thread-1 办理业务
// pool-1-thread-3 办理业务
// pool-1-thread-2 办理业务
// pool-1-thread-3 办理业务
// pool-1-thread-2 办理业务
// pool-1-thread-3 办理业务
// pool-1-thread-3 办理业务
// pool-1-thread-2 办理业务
// pool-1-thread-4 办理业务
// pool-1-thread-5 办理业务
}
}
源 码
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
newFixedThreadPool创建的线程池corePoolSize和maximumPoolSize值是相等的,它使用的是LinkedBlockingQueue
执行长期任务性能好, 创建一个线程池 , 一个线程池里有N个固定线程, 即有固定线程数的线程池
newSingleThreadExecutor
例子
package com.luyi.demo;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author 卢意
* @create 2021-01-11 15:36
*/
public class MyThreadPoolDemo {
public static void main(String[] args) {
ExecutorService threadPool = Executors.newSingleThreadExecutor(); // 一池一个受理线程
try {
// 模拟10个顾客办理业务 但是只有5个员工办理业务
for (int i = 0; i < 10; i++) {
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + "\t 办理业务");
});
}
} catch (Exception e){
e.printStackTrace();
}finally {
threadPool.shutdown();
}
// pool-1-thread-1 办理业务
// pool-1-thread-1 办理业务
// pool-1-thread-1 办理业务
// pool-1-thread-1 办理业务
// pool-1-thread-1 办理业务
// pool-1-thread-1 办理业务
// pool-1-thread-1 办理业务
// pool-1-thread-1 办理业务
// pool-1-thread-1 办理业务
// pool-1-thread-1 办理业务
}
}
源码
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
newSingleThreadExecutor 创建的线程池corePoolSize和maximumPoolSize值都是1,它使用的是LinkedBlockingQueue
一个任务一个任务的执行 , 一个线程池一个线程
newCachedThreadPool
例子
package com.luyi.demo;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author 卢意
* @create 2021-01-11 15:36
*/
public class MyThreadPoolDemo {
public static void main(String[] args) {
ExecutorService threadPool = Executors.newCachedThreadPool(); // 一池N个受理线程
try {
// 模拟10个顾客办理业务 但是只有5个员工办理业务
for (int i = 0; i < 10; i++) {
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + "\t 办理业务");
});
}
} catch (Exception e){
e.printStackTrace();
}finally {
threadPool.shutdown();
}
// pool-1-thread-1 办理业务
// pool-1-thread-2 办理业务
// pool-1-thread-3 办理业务
// pool-1-thread-4 办理业务
// pool-1-thread-5 办理业务
// pool-1-thread-6 办理业务
// pool-1-thread-9 办理业务
// pool-1-thread-8 办理业务
// pool-1-thread-10 办理业务
// pool-1-thread-7 办理业务
}
}
源码
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
newCachedThreadPool创建的线程池将corePoolSize设置为0,将maximumPoolSize设置为Integer.MAX_VALUE,它使用的是SynchronousQueue,也就是说来了任务就创建线程运行,当线程空闲超过60秒,就销毁线程。
执行很多短期异步认为 线程池根据需要创建新线程, 但在先前构建的线程可用时将重用他们, 可扩容
线程池的七大参数
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize 线程池的常驻核心线程数
* @param maximumPoolSize 线程池中能狗容纳同时执行的最大线程数,此值必须大于等于1
* @param keepAliveTime 多于线程的存或时间 当线程池中的线程数量超过corePoolSize时,
* 当空闲时间到达keepAlivetime时, 多余线程会被销毁知道只剩下corePoolSize个线程位为止
* @param unit keepAliveTime 的单位
* @param workQueue 任务队列, 被提交但尚未被执行的任务
* @param threadFactory 表示生产线程池中工作线程的线程工厂 (一般默认即可)
* @param handler 拒绝策略, 表示队列满了,并且工作线程大于等于线程池的最大数maximumPoolSize时,
* 如何来拒绝请求执行的Runable的策略
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
线程池的底层工作原理


- 在创建了线程池后,开始等待请求。
- 当调用execute()方法添加一个请求任务时,线程池会做出如下判断:
2.1 如果正在运行的线程数量小于corePoolSize,那么马上创建线程运行这个任务;
2.2 如果正在运行的线程数量大于或等于corePoolSize,那么将这个任务放入队列;
2.3 如果这个时候队列满了且正在运行的线程数量还小于maximumPoolSize,那么还是要创建非核心线程立刻运行这个任务;
2.4 如果队列满了且正在运行的线程数量大于或等于maximumPoolSize,那么线程池会启动饱和拒绝策略来执行。
- 当一个线程完成任务时,它会从队列中取下一个任务来执行。
- 当一个线程无事可做超过一定的时间(keepAliveTime)时,线程会判断:
4.1 如果当前运行的线程数大于corePoolSize,那么这个线程就被停掉。
4.2 所以线程池的所有任务完成后,它最终会收缩到corePoolSize的大小。
线程池的用哪个

四大拒绝策略
什么是拒绝策略
等待队列已经排满了,再也塞不下新任务了
同时,
线程池中的max线程也达到了,无法继续为新任务服务。
这个是时候我们就需要拒绝策略机制合理的处理这个问题
ThreadPoolExecutor.AbortPolicy() 默认情况
直接抛出RejectedExecutionException 异常阻止系统正常运行
ThreadPoolExecutor. CallerRunsPolicy()
“调用者运行” 一种调节机制, 该策略既不会抛弃任务, 也不会抛出异常, 而是将某些任务回退到调用者, 从而降低新任务的流量.
main 办理业务
main 办理业务
pool-1-thread-1 办理业务
pool-1-thread-1 办理业务
pool-1-thread-1 办理业务
pool-1-thread-1 办理业务
pool-1-thread-2 办理业务
pool-1-thread-3 办理业务
pool-1-thread-4 办理业务
pool-1-thread-5 办理业务
ThreadPoolExecutor.DiscardOldestPolicy()
抛弃队列中等待最久的任务, 然后把当前任务加到队列中,尝试再次提交当前任务
ThreadPoolExecutor.DiscardPolicy()
该策略默默的丢弃无法处理的任务, 不予任何处理也不抛出异常, 如果允许任务丢失,这是最好的一种策略
以上拒绝策略都实现了
RejectExcutionHandle接口
参数设置
CPU密集型
最大线程数 是运行环境的cpu内核数 + 1
System.out.println(Runtime.getRuntime().availableProcessors());
IO密集型
一种考虑到IO密集型,大部分线程都阻塞,因此需要多配置线程数 :
参考公式 : cpu核数 /(1 - 阻塞系数) 阻塞系数:0.8 ~ 0.9
比如8核cpu :8 / (1 - 0.9)=80个线程数
十三 四大函数式接口
四大函数式接口可以配合Lambda表达式 简化开发
向下面介绍的流式计算的filter就是用到了Predicate断言接口
十四 Java8Stream流式计算
流(Stream是什么)
是数据渠道,用于操作数据源(集合、数组等)所生成的元素序列。
“集合讲的是数据,流讲的是计算!”
流(Stream的特点)
- Stream自己不会存储元素
- Stream不会改变源对象. 相反, 他们会返回一个持有结果的新Stream
- Stream操作时延迟执行的. 这意味着他们会等到乣结果的时候才执行

Stream map(Function<? super T, ? extends R> mapper); 是将集合映射成一个新数组再将里面的值进行你想要的操作 如下面将User的userName转为大写
.map((user -> user.getUserName().toUpperCase()))
package com.luyi.demo;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.Arrays;
import java.util.List;
/**
* @author 卢意
* @create 2021-01-11 20:31
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
class User {
private int id;
private String userName;
private int age;
}
/**
* 找出满足
* 偶数Id 且年龄大于24 且用户名转为大写且用户名字母倒排序
* 只输出一个名字 E
*/
public class StreamDemo {
public static void main(String[] args) {
User user1 = new User(11, "a", 23);
User user2 = new User(12, "b", 24);
User user3 = new User(13, "c", 22);
User user4 = new User(14, "d", 28);
User user5 = new User(16, "e", 26);
List<User> list = Arrays.asList(user1, user2, user3, user4, user5);
list.stream().filter((User) -> User.getId() % 2 == 0 && User.getAge() > 24)
.map((user -> user.getUserName().toUpperCase()))
.sorted((u1, u2) -> { return u2.compareTo(u1); }).limit(1).forEach(System.out::println);
}
}
感觉java再把事务的方面完善一下都可以吧数据库给干了 …
十五 分支合并框架ForkJoin
ForkJoin模式介绍
Fork:把一个复杂任务进行分拆,大事化小
Join:把分拆任务的结果进行合并
黑色fork
蓝色join

相关类
分支合并池 ForkJoinPool 类比=> 线程池

ForkJoinTask 类比=> FutureTask
递归任务RecursiveTask:继承后可以实现递归(自己调自己)调用的任务
class Fibonacci extends RecursiveTask<Integer> {
final int n;
Fibonacci(int n) { this.n = n; }
Integer compute() {
if (n <= 1)
例子
package com.luyi.demo;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
/**
* @author 卢意
* @create 2021-01-12 10:08
*/
class MyTask extends RecursiveTask<Integer> {
// 10以内不需要分开
private static final Integer ADJUSTER_VALUE = 10;
private int begin;
private int end;
private int result;
public MyTask(int begin, int end) {
this.begin = begin;
this.end = end;
}
@Override
protected Integer compute() {
if (end - begin <= ADJUSTER_VALUE) { // 不用分支
for (int i = begin; i <= end; i++) {
result = result + i;
}
} else {
int mid = (end + begin) / 2;
MyTask task1 = new MyTask(begin, mid);
MyTask task2 = new MyTask(mid + 1, end);
task1.fork();
task2.fork();
result = task1.join() + task2.join();
}
return result;
}
}
public class ForkJoinDemo {
public static void main(String[] args) {
MyTask myTask = new MyTask(0,100);
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<Integer> forkJoinTask = forkJoinPool.submit(myTask);
try {
System.out.println(forkJoinTask.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
} finally {
forkJoinPool.shutdown();
}
}
}
十五 异步回调CompletableFuture
例子
package com.luyi.demo;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
/**
* @author 卢意
* @create 2021-01-12 10:34
*/
public class CompletableFutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// CompletableFuture.runAsync(Runnable) 没有返回值
CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
System.out.println(Thread.currentThread().getName() + "\t 没有返回值");
});
completableFuture.get(); // ForkJoinPool.commonPool-worker-1没有返回值
CompletableFuture<Integer> integerCompletableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "\t 有返回值");
int a = 10/0; // 模拟异常
return 1024;
});
integerCompletableFuture.whenComplete((t, u) -> {
System.out.println("-----t : " + t);
System.out.println("-----u : " + u);
}).exceptionally(f -> {
System.out.println("-----exception: " + f.getMessage());
return 500;
});
// 正常情况的结果
// ForkJoinPool.commonPool-worker-1 没有返回值
// ForkJoinPool.commonPool-worker-1 有返回值
// -----t : 1024
// -----u : null
// 异常情况的结果
// ForkJoinPool.commonPool-worker-1 没有返回值
// ForkJoinPool.commonPool-worker-1 有返回值
// -----t : null
// -----u : java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
// -----exception: java.lang.ArithmeticException: / by zero
}
}
