关于并发编程,粗略的将其划分为应用层和原理层两个部分;应用层又分为关键字、锁、工具,三个部分;原理层
分为Monitor、cas、aqs、内存屏障、CLH五个部分,接下来会详细的从应用层展开将原理层和应用层结合起来叙述

一、关键字

volatile

  1. 内存可见性

    我们知道volatile关键字变成汇编会产生“lock“前缀,该前缀对于早期的CPU采用的是锁总线的方式(LOCK#),保证操作的“原子性”;早期的CPU为了操作的原子性使得其他CPU都无法工作使得计算机运行效率大大的降低
    为了解决该问题,从P6 CPU开始就做了一个优化,改用Ringbus + MESI协议(Cache Locking),CPU会缓存主存中的数据,但是数据还是要写会主存因此可以使用其优化,尽量不再去锁总线。
    如果从我们java代码层面去理解就是当A和B两个线程去更新(无锁条件下的状态性赋值)|读取一个变量值,都能读取到最新的值

  2. 禁止指令重排序

    我们的java代码中两行代码如果前后没有关联的情况下CPU会优化代码的执行顺序,单线程模式下可能没什么问题,但是多线程模式下就容易出现问题;我们常见的单例模式饿汉式如果不加volatile关键字就会在对象实例
    化的时候出现问题,在下述代码中展示;
    关于禁止指令重排序,volatile关键字会在写指令后会加入写屏障,读指令前加上读屏障,至于什么是读写屏障,可以看看这篇文章

  3. 内存屏障如何添加

    个人觉得这篇文章将的及其透彻点击这里

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
//防止产生子类
public final class Singleton{

// volatile 禁止指令重排序
private static volatile Singleton INSTANCE = null;
private Singleton(){
// 防止反射破坏
if(INSTANCE != null){
throw new RuntimeException("system exception");
}
}

/**
*
* 通过以下字节码我们可知 INSTANCE = new Singleton();这个代码有四条指令,在第21和24条指令有可能会发生后面的先执行,所以需要加上volatile关键字禁止指令重排序
*
* 17 new #3 <structClass/Thread/Singleton>
* 20 dup
* 21 invokespecial #4 <structClass/Thread/Singleton.<init>>
* 24 putstatic #2 <structClass/Thread/Singleton.INSTANCE>
* @return
*/
public static Singleton getInstance(){
if(INSTANCE == null){
synchronized (Singleton.class){
if(INSTANCE == null){
INSTANCE = new Singleton();
}
}
}
return INSTANCE;
}

//防止反序列化破坏
private Object readResolve(){
return INSTANCE;
}
}

注意:

volatile无法保证类似于i++之类操作的原子性

final

  1. 作用

    final和static关键字组合作为类常量使用,在字节码加载阶段就已经初始化值了,后续会放在常量池中,使用类常量时可不必加载类;修饰类和方法时不能被子类重写或者继承;final类中的成员方法都会被隐式的指定为final方法

  2. 原理

    编译器会在 final 域的写之后,构造函数 return 之前,插入一个 StoreStore 屏障。这个屏障禁止处理器把 final 域的写重排序到构造函数之外;读 final 域的重排序规则要求编译器在读 final 域的操作前面插入一个 LoadLoad 屏障。

二、锁

synchronized(内部锁/监管所)

1.使用

  1. 在普通方法代码块或普通方法修饰符中锁的是this对象(this对象在本地方法变量表中处于第一位)
  2. 在静态方法中锁的对象是xxx.class(静态代码块在类初始化前就已经初始化了)
  3. 修饰代码块时,则对指定对象加锁

2.重量级锁(锁状态10)

  1. Monitor机制?

    通过上图可知Monitor有一种状态两个集合

Owner:当前正在运行的线程会将Monitor对象中Owner置为自己,说明自己正在执行
EntryList:后续进入的线程检测到Owner已被其它线程占有,便在EntrySet中等待Owner被释放
WaitSet:处于Owner的线程不满足执行条件时会去WaitSet中等待
Counter:可重入次数,该字段保证了用同一个锁对象的多个方法可以互相调用而不需要阻塞
所有的Object对象是一个天然的Monitor,以上所说的几种类型就是存储在ObjectMonitor中的,monitor的本质是依赖于底层操作系统的Mutex Lock实现,操作系统实现线程之间的切换需要从用户态到内核态的切换,切换成本非常高。

  1. 什么是重量级锁?
    A:重量级锁其实是一种状态,synchronized所处的一种状态;我们暂且将这种状态称之为“上锁”,如果将访问的公共资源比作房间,那么synchronized的这个状态就是对该房间上一把锁,每次使用时上锁使用完之后解锁
  2. 专业一点?
    B:当我们使用synchronized关键字时锁对象的Mark word会指向Monitor,如果Owner未被占用那么自己成为Owner并执行相关逻辑,如果Owner已被占用那么就在EntryList中等待Owner被释放
  3. 可否有证据?
    C:当synchronized修饰代码块时我们可以通过字节码指令对标到synchronized关键字,如下字节码第2,3行就是获取锁第4,5行释放锁;但如果修饰方法那么字节码中不会有显示指令集出现,但是会在方法描述标记flags处标记为ACC_SYNCHRONIZED;一个monitorenter对应两个monitorexit是保证在正常和异常情况都能释放锁
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    /**
    * 0 aload_0
    * 1 dup
    * 2 astore_1
    * 3 monitorenter
    * 4 aload_1
    * 5 monitorexit
    * 6 goto 14 (+8)
    * 9 astore_2
    * 10 aload_1
    * 11 monitorexit
    * 12 aload_2
    * 13 athrow
    * 14 return
    */
    public void test(){
    /**
    * 因为这是非静态方法里面默认参数有this
    */
    synchronized(this){

    }
    }
  4. Monitor机制有什么缺点吗?
    D:Monitor的介绍中我们知道其底层实现是一个高成本操作,所以当synchronized升级到重量级锁时,代价会比较大;但是有些时候多个线程不会同时去访问一个资源,可能会错峰或者仅有当前线程去访问资源,如果一直是重量级锁那该段代码执行效率会大打折扣,synchronized关键字会让人又爱又狠,所以jdk1.6之后引入了轻量级锁

3.轻量级锁(锁状态00)

  1. 什么是轻量级锁?
    A: 同重量级锁,我们也可以将其理解为一种状态,同样将访问的公共资源比作房间,因为这个房间很可能就自己一个人用,如果开门关门都要上锁,会浪费时间占用了办事的时间,所以我们可以在进房间前在门口摆双自己的鞋子(上锁),用完了穿上鞋子(解锁)
  2. 专业点?
    B:由于重量级锁会有频繁的用户态和内核态切换的低效操作,因此引入了轻量级锁;从普通业务开发角度看无法显示的体会到二者的区别,但是底层实现中对其做了详尽的区分
  3. 有多详尽?
    C:从上面的重量级锁,我们知道锁对象的Mark word指向了Monitor;而轻量级锁则是在当前栈帧中创建一个锁记录(Lock Record),并将锁对象的Mark word拷贝到锁记录中,然后通过CAS操作将LockRecord引用地址替换到锁对象的Mark word中
  4. 如果替换失败怎么办?
    D: 这个时候当前线程会占用处理器时间等待其他线程释放锁并尝试替换操作(自旋),如果一段时间还没有释放,这时会发生锁膨胀为重量级锁,且当前线程则会被挂起
  5. 它又是如何跟重量级锁产生交集的?
    F: 这个就涉及到了它的解锁(穿鞋)操作,解锁过程就是通过CAS操作将锁对象Mark word换回到对象头中,如果替换失败(期望值与实际值不符),则说明产生了竞争,此时轻量级锁已经膨胀为重量级锁,进入重量级锁释放锁流程

    4.偏向锁(锁状态01)

  6. 什么是偏向锁?
    A: 同上,这个房间很有可能是个很偏的房子,就你一个人用,所以你大可不必每次都拖鞋穿鞋,为了方便标识可以在门口用粉笔写上自己的名字,以后每次进入房间看门口是否有自己名字,如果有直接进入,如果没有则写上自己名字
  7. 专业点?
    B: 因为很多情况下,锁不仅不存在竞争,而且多次由一个线程获得,为了代价更低因此引入了偏向锁;即当一个线程访问同步块时,会在对象头和栈帧的锁记录存锁偏向的线程ID
  8. 如何解锁的?
    C: 从大白话描述我们可以知道,只有当其它线程尝试竞争偏向锁时,持有偏向锁的线程才会释放,但是偏向锁撤销时需要等待一个全局的安全点(该时间点没有任何字节码正在执行),然后暂停拥有偏向锁的线程,然后检测持有偏向锁的线程是否存活,如果非RUNNING状态则置为无锁;如果处于RUNNING状态,则遍历栈中的锁记录要么重新偏向要么置于无锁

ReentrantLock

  不同于synchronized的是,ReentrantLock是基于java实现的同步队列,我们可以从源码了解其实现原理

1.数据结构

我们知道ReentrantLock的有参构造函数可以实现公平锁,其底层会创建如下代码所示的基于公平机制实现的同步器

1
2
3
4
5
6
7
8
9
10
11
/**
* 非公平机制 同步器
*/
static final class NonfairSync extends Sync {
}

/**
* 公平机制同步器
*/
static final class FairSync extends Sync {
}

两个同步器都是基于同一个父类Sync,其又是基于同步队列实现即AQS;AQS设计使用了FIFO先入先出队列,且不支持优先级队列,底层参照了CLH队列实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
abstract static class Sync extends AbstractQueuedSynchronizer {

//底层是Node组成的双端队列,
static final class Node {
/** 指示节点在共享模式中等待的标记. */
static final Node SHARED = new Node();
/** 指示节点在独占模式中等待的标记. */
static final Node EXCLUSIVE = null;

/** 由于在同步队列中等待的线程等待超时或者被中断,需要从同步队列中取消等待,节点进入该状态将不会变化 */
static final int CANCELLED = 1;
/** 后继结点的线程处于等待状态,而当前节点的线程如果释放了同步状态或者被取消,
*将会通知后继节点,使后继节点的线程得以运行
*/
static final int SIGNAL = -1;
/** 节点在等待队列中,节点线程等待在Condition上,
*当其他线程对Condition调用了Signal()方法后,该节点将会从等待队列转移到同步队列中,加入到对同步状态的获取中 */
static final int CONDITION = -2;
/**
* 表示下一次共享式同步状态获取将会无条件地被传播下去
*/
static final int PROPAGATE = -3;

//线程等待状态 (为以上几个属性)
volatile int waitStatus;

//前驱节点
volatile Node prev;
//后继节点
volatile Node next;

volatile Thread thread;

//下个等待的节点
Node nextWaiter;

final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}

Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
}

2.加锁

非公平锁上锁

以下便是其上锁代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
//上锁
final void lock() {
//1获取锁成功 通过CAS机制设置state,期望该值为0,即没有被线程占用,无竞争;设置为1之后代表拿到独占锁并把把自己设置进去
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
//2 获取锁失败
else
acquire(1);
}


// 2.1 AbstractQueuedSynchronizer
public final void acquire(int arg) {
//a. 通过tryAcquire校验是否再次能获取锁或者是否是重入锁 - > 失败
//b. 将当前线程加入队列尾部
//c. 当上个节点是头结点且当前线程获取到锁就设置自己为头结点;且通过上个节点的状态判断自己是否应该进入阻塞状态
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

// 2.1.1 tryAcquire方法底层实现 AbstractQueuedSynchronizer.nonfairTryAcquire
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//如果当前没有获取锁
if (c == 0) {
//尝试获取锁并返回
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//如果检测到已获取的锁是自身,说明发生了锁重入
else if (current == getExclusiveOwnerThread()) {
//设置重入次数,通过state属性记录
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

//2.1.2 AbstractQueuedSynchronizer
private Node addWaiter(Node mode) {
// 将当前线程关联到一个 Node 对象上, 模式为独占模式
//此处mode为空,可以看出来头结点是空的,称为 Dummy(哑元)或哨兵,用来占位,并不关联线程
Node node = new Node(Thread.currentThread(), mode);
//如果 tail 不为 null, cas 尝试将 Node 对象加入 AQS 队列尾部
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
// 双向链表
pred.next = node;
return node;
}
}
// 尝试将 Node 加入 AQS
enq(node);
return node;
}

//2.1.2 AbstractQueuedSynchronizer
//其实到了这里就不断的尝试去获取锁,失败后进入 park 阻塞
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 上一个节点是 head, 表示轮到自己(当前线程对应的 node)了, 尝试获取
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
// 获取成功, 设置自己(当前线程对应的 node)为 head
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//通过判断上个节点是否阻塞,如果不是且为非取消的,然后将其waitstatus设置为-1
// 然后阻塞当前线程,后续线程进入之后会组成一个祖册链表
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

公平锁上锁

从源码可知公平锁上锁直接进入了AQS的acquire方法,通过追踪底层源码可知acquire方法底层实现的tryAcquire方法跟非公平锁有所差异其他的都一样,我们可以专门分析下tryAcquire方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
//上锁
final void lock() {
acquire(1);
}

protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//如果当前没有线程执行并不会直接去抢占
if (c == 0) {
//首先判断其是否有前驱节点,如果没有才去获取锁,(就是看其前面有没有排队,不要插队的意思)
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//线程重入设置重入次数
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

3.释放锁

公平锁和非公平锁的释放都是调用了AbstractQueuedSynchronizer的release方法,继续分析下该方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
//释放锁
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
//Sync.class
protected final boolean tryRelease(int releases) {
//拿到state和arg的差值
int c = getState() - releases;
//如果获取锁线程不是当前线程,抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//如果差值为0表示没有重入,那么就释放锁
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
//重新设置state
setState(c);
return free;
}
//AbstractQueuedSynchronizer
private void unparkSuccessor(Node node) {
// 如果状态为 Node.SIGNAL 尝试重置状态为 0
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 找到需要 unpark 的节点, 但本节点从 AQS 队列中脱离, 是由唤醒节点完成的
Node s = node.next;
// 不考虑已取消的节点, 从 AQS 队列从后至前找到队列最前面需要 unpark 的节点
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//如果找到需要唤醒的节点,那么唤醒它
if (s != null)
LockSupport.unpark(s.thread);
}

4.公平锁和非公平锁区别

  我们可以看到在非公平锁上锁时会先设置state,公平锁上锁时则会先去读state状态,最后才回去更新这个值;这里巧妙的利用了volatile关键字的内存可见性。
  公平锁上锁是先获取state,即在该操作后面会加上loadload屏障(禁止下面的普通读和volatile读重排序)和loadstore屏障(禁止下面的普通写和上面的volatile读重排序);最后才是设置state状态,即在该操作前面加上storestore屏障(禁止上面普通写和volatile写重排序)操作后面加上storeload屏障(禁止下面可能存在的volatile读写和当前的volatile写重排序)
  非公平上锁同上面一样分析,即二者都强保证了对state字段操作的绝对正确性
至此加锁和释放锁的代码基本分析完了

组合方法

0.start run

  1. 直接调用run方法是就只是在主调方法中调用了run,且只有一个线程;
  2. 当调用start方法是会启动一个新的线程,并执行run方法中的代码

1.wait notify/notifyAll

wait:
    1. 我们知道Monitor机制中有个Owner属性,其代表当前活跃的线程,当Owner不满足运行条件时调用wait方法,进入WaitSet中,状态为WAITING
    2. WAITING状态的线程需要通过notify或者notifyAll来唤醒,但是唤醒之后还需进入EntryList中同其它线程竞争CPU时间片
    3. 即wait方法会放弃Owner
notify/notifyAll:
    唤醒WaitSet中等待的线程

3.sleep yield

sleep:
1. 调用 sleep 会让当前线程从 Running 进入 Timed Waiting 状态(阻塞),此时的阻塞是在Owner即不会释放锁
2. 其它线程可以使用 interrupt 方法打断正在睡眠的线程,这时 sleep 方法会抛出 InterruptedException
3. 睡眠结束后的线程未必会立刻得到执行
4. 建议用 TimeUnit 的 sleep 代替 Thread 的 sleep 来获得更好的可读性

yield:
1. 调用 yield 会让当前线程从 Running 进入 Runnable 就绪状态,然后调度执行其它线程
2. 具体的实现依赖于操作系统的任务调度器
3. 临时暂停当前正在执行的线程,来让有同样优先级的正在等待的线程有机会执行;如果没有正在等待的线程,或者所有正在等待的线程的优先级都比较低,那么该线程会继续运行

两种锁区别

  从上述分析我们已经可以很明了的看出两种锁的区别了

三、工具

线程池

1.ThreadPoolExecutor

源码剖析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
public class ThreadPoolExecutor{

//线程池状态和线程池数量 Integer共32位 高三位代表线程池状态 低29位代表线程池数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 线程池状态
// 111 -> 接收新的任务且会处理队列中的任务
private static final int RUNNING = -1 << COUNT_BITS;
// 000 -> 不会接收新的任务,但会处理队列中的任务
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 001 -> 不接收新任务,不处理队列中的任务,停止正在执行的任务
private static final int STOP = 1 << COUNT_BITS;
// 010 -> 任务全部执行完毕,活动线程数为0,即将进入 TERMINATED状态
private static final int TIDYING = 2 << COUNT_BITS;
// 011 -> 所有任务都终止了
private static final int TERMINATED = 3 << COUNT_BITS;
//构造函数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
//核心线程数
this.corePoolSize = corePoolSize;
//最大线程数
this.maximumPoolSize = maximumPoolSize;
//阻塞队列
//如果队列选择了有界队列,那么任务超过了队列大小时,会创建 maximumPoolSize - corePoolSize 数目的线程来救急
this.workQueue = workQueue;
//存活时间 - 针对救急线程即 个数:maximumPoolSize-corePoolSize ,也就是超过核心线程个数之外创建的就是救急线程
this.keepAliveTime = unit.toNanos(keepAliveTime);
//线程工厂 主要是为了修饰线程
this.threadFactory = threadFactory;
//拒绝策略
//1.AbortPolicy 让调用者抛出 RejectedExecutionException 异常,这是默认策略
//2.CallerRunsPolicy 让调用者运行任务
//3.DiscardPolicy 放弃本次任务
//4.DiscardOldestPolicy 放弃队列中最早的任务,本任务取而代之
this.handler = handler;
}
//任务执行
//1) 检查core线程池数量<corePoolSize数量,是,可以提交任务或新建线程执行任务 。
//2)如果corePoolSize线程数量已使用,如果队列容量未满,则加入队列。
//3)队列已满,创建maximumPoolSize线程数量执行;如果失败则执行关闭线程池或者拒绝策略
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//计算线程数量,是否小于核心线程数
if (workerCountOf(c) < corePoolSize) {
//如果小于核心线程数,那么就创建一个核心线程
if (addWorker(command, true))
return;
c = ctl.get();
}
//判断线程池运行状态,工作队列是否有空间
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//如果线程池不处于RUNNING,那就移除该线程,并执行拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
//如果线程池正在运行且核心线程数为0,那么就加入一个线程入队列,此处仅针对corePoolSize为0的情况
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//如果线程池满了或者状态为非RUNNING状态,那么就将其加入救急线程
else if (!addWorker(command, false))
//添加失败,执行拒绝侧率
reject(command);
}

//将线程加入队列中
//@param core 是否是核心线程
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
//获取线程状态
int c = ctl.get();
int rs = runStateOf(c);

// 如果线程状态为已关闭或者正在关闭,提交任务为空,队列不为空直接返回失败
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

for (;;) {
int wc = workerCountOf(c);
//线程数据大于总容量,或者大于(core=true)核心线程及(core=false)最大线程数,则直接返回失败
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//原子++核心线程数,成功跳出循环,
if (compareAndIncrementWorkerCount(c))
break retry;
//失败检查状态,自旋
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

//成功添加核心线程
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//创建新的线程或者复用之前线程,底层依赖于AQS
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
//上锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//获取线程状态
int rs = runStateOf(ctl.get());
//如果线程池在运行 或者 线程池关闭了但线程为空
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
//如果添加的线程时活跃线程,抛出异常
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//将线程加入工作线程队列
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//加入成功启动线程
if (workerAdded) {
//即执行Worker中的run方法,其底层是执行添加的线程或者从队列中取出来的线程
t.start();
workerStarted = true;
}
}
} finally {
//失败处理:移除线程,线程数原子--,尝试关闭线程池
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
}


Semaphore,CountDownLatch

该关键字底层实现跟ReentrantLock差不多,都是基于AQS的公平锁和非公平锁,主要是用来原子的操作state状态


Q1:Ringbus + MESI是什么?

A1:多个CPU核心通过ringbus连到一起,每个核心都维护自己的Cache的状态。如果对于同一份内存数据在多个核里都有cache,则状态都为S(shared);一旦有一核心改了这个数据(状态变成了M),其他核心就能瞬间通过
ringbus感知到这个修改,从而把自己的cache状态变成I(Invalid),并且从标记为M的cache中读过来。同时,这个数据会被原子的写回到主存。最终,cache的状态又会变为S。