本笔记来自于 书籍:Java并发编程的艺术

  • 第一章 并发编程的挑战
  • 第二章 Java 并发机制的底层实现原理
  • 第三章 Java 内存模型
  • 第四章 Java 并发编程基础
  • 第五章 Java 中的锁
  • 第六章 Java 并发容器和框架
  • 第七章 Java 中的 13 个原子操作类
  • 第八章 Java 中的并发工具类
  • 第九章 Java 中的线程池
  • 第十章 Executor 框架
  • 第十一章 Java 并发编程实践

第一章 Java 并发编程的挑战

说明并发编程的世界中可能遇到的哪些问题,以及如何解决。

并发的目的是让程序运行得更快,但并不是更多的线程就能让程序最大限度地并发执行。在多线程情景下,面临诸多挑战,如上下文切换、死锁、受限于软硬件资源等问题。

1.1 上下文切换

Cpu 通过为每个线程分配 cpu 时间片来实现多线程执行代码,因为时间片非常的短,所以cpu 需要通过不停地切换线程执行,让我们感觉多个线程是在同时执行的。

但是,在切换之前cpu会保存上一个任务的状态,以便下次切换回任务时,会加载任务的状态。

所以任务从保存到再加载的过程就是一次上下文切换。

很明显,当程序中的多个线程存在大量的上下文切换,程序执行的速度未必会比串行来得快。

1.1.1 减少上下文切换

  • 无锁并发编程。多线程竞争锁时,会引起上下文切换,所以多线程处理数据时,可以通过将数据的ID 按照 Hash算法取模分段,不同线程处理不同段的数据的方式,避免使用锁。
  • CAS 算法。Java 的 Atomic 包使用CAS 算法来更新数据,不需要加锁
  • 使用最少线程。避免创建不需要的线程,比如任务很少,但是创建了很多线程来处理,这样会造成大量线程都处于等待状态
  • 协程。在单线程里实现多任务的调度,并在单线程里维持多个任务间的切换。

1.2 死锁

死锁,即多个线程因某些问题无法释放锁,导致多个线程之间互相等待的场景。

避免死锁的常见方法:

  • 避免一个线程同时获取多个锁
  • 避免一个线程在锁内同时占用多个资源,尽量保证每个锁只占用一个资源
  • 尝试使用定时锁,使用 lock.tryLock(timeout) 来替代使用内部锁机制
  • 对于数据库锁,加锁和解锁必须在一个数据库连接里,否则会出现解锁失败的情况。

1.3 资源限制的挑战

资源限制是指程序受限于软硬件的条件,而不能达到预期的处理效果。主要包含有:带宽的上传/下载速度、硬盘读写速度、CPU处理速度。数据库连接数、socket连接数等。

引发的问题:并发执行时,因为增加了上下文切换和资源调度时间的原因。程序运行时可能会更慢。

解决方法:使用ODPS、Hadoop或者搭建服务器集群,不同机器处理不同的数据。可以通过 “数据ID%机器数”,计算出一个机器编号,然后由对应编号的机器处理这笔数据。

在资源受限的情况下进行并发编程:需要根据不同的资源限制调整程序的并发度。


第二章 Java 并发机制的底层实现原理

Java 代码在编译之后会变成 Java 字节码,字节码被类加载器加载到 JVM 里,JVM 执行字节码,最终转化为汇编指令在 CPU 上执行。

Java 中所使用的并发机制依赖于 JVM 的实现和 CPU 的指令。

2.1 volatile 的应用

volatile 是轻量级的 synchronized,它在多处理器开发中保证了共享变量的“可见性”。即当一个线程修改了一个共享变量时,另一个线程能够读到这个修改的值。

volatile 的官方定义:Java 编程语言允许线程访问共享变量,为了确保共享变量能被准确和一致地更新,线程应该确保通过排他锁单独获得这个变量。

volatile 实现相关的 CPU 术语:

术语 英文 术语描述
内存屏障 memory barriers 是一组处理器指令,用于实现对内存操作的顺序限制
缓冲行 cache line CPU 高速缓存中可以分配的最小存储单位。处理器填写缓存行时会加载整个缓存行,现代CPU需要执行几百次CPU指令
原子操作 atomic operations 不可中断的一个或一系列操作
缓存命中 cache hit 如果进行高速缓存行中操作的内存位置仍然是下次处理器访问的地址时,处理器将从缓存中读取操作数,而不是从内存中读取
写命中 write hit 当处理器将操作数写回到一个内存缓存的区域时,它首先会检查这个缓存的内存地址是否在缓存行中,如果存在一个有效的缓存行,则处理器将这个操作数写回到缓存,而不是写回到内存
写缺失 write misses the cache 一个有效的缓存行被写入到不存在的内存区域

volatile 修改的变量,在进行修改时,会引发两件事:

  1. 通过 Lock 前缀指令,将当前处理缓存行的数据写回到系统内存
  2. 写回内存的操作会使其他 CPU 里缓存了该内存地址的数据无效,当处理器对这个数据进行修改操作时,会重新从系统内存中把数据读到处理器缓存中。

2.2 synchronized 的实现原理与应用

synchronized 实现同步的基础:Java 中的每一个对象都可以作为锁。

  • 对于普通同步方法,锁是当前实例对象
  • 对于静态同步方法,锁是当前类的 Class 对象
  • 对于同步方法块,锁是 Synchronized 括号里配置的对象

Synchronized 在 JVM 里的实现原理:JVM基于进入和退出 Monitor 对象来实现方法同步和代码块同步,但是两者的实现细节不一样。代码块同步是使用 monitorenter 和 monitorexit 指令实现的。

monitorenter 指令是在编译后插入到同步代码块的开始位置,而 monitorexit 是插入到方法结束处和异常处,JVM保证每个 monitorenter 必须对应 monitorexit。线程执行到 monitorenter 指令时,会尝试获取monitor的所有权,即尝试获得对象的锁。

2.2.1 锁的升级与对比

Java1.6 为了减少获得锁和释放锁带来的性能消耗,引入了“偏向锁”和“轻量级锁”。

在1.6中,锁共有4种状态,级别从低到高依次是:无锁状态、偏向锁状态、轻量级锁状态和重量级锁状态。

锁可以升级但是不能降级,这种策略的目的是为了提高获得锁和释放锁的效率。

1. 偏向锁

大多数情况下,锁不仅不存在多线程竞争,而且总是由同一线程多次获得,为了让获得锁的代价更低而引入了偏向锁。

当一个线程访问同步块并获得锁时,会在对象头和栈帧中的锁记录里存储锁偏向的线程ID,以后该线程进入和退出同步块时不需要进行CAS操作来加锁和解锁,只需要检查MarkWord 里是否存储着指向当前线程的偏向锁。

即线程获取资源之后,会在资源上记录当前线程地址,并不会释放,当出现竞争时才会释放锁。

-XX:-UseBiasedLocking=false 关闭偏向锁

-XX:BiasedLockingStartupDelay=0 关闭延迟(默认情况下,偏向锁在程序启动几秒后才会激活)

2.轻量级锁

加锁:线程在执行同步块之前,JVM会将对象头中的MarkWord 复制到当前线程的锁记录中。然后线程尝试使用 CAS 将对象头中的 MarkWord 替换为指向锁记录的指针。如果成功,线程获得锁;失败,表示其他线程竞争锁,当前线程尝试使用 自旋(即当前线程不停地尝试请求资源,直到成功) 来获取锁。

解锁:解锁时,使用 CAS 操作将 MarkWord 替换回对象头。成功,则表示没有竞争发生;失败,则表示锁存在竞争,此时锁会膨胀成重量级锁。

争夺锁导致的锁膨胀

图中可以看出,自旋会消耗CPU,为了避免无用的自旋,一旦锁升级为重量级锁,就不会再降级为轻量级锁。重量级锁的所有线程在请求锁资源时,都会进入阻塞状态,只有当锁释放时,才会进行新一轮的夺锁之争。

锁的优缺点对比:

优点 缺点 适用场景
偏向锁 加锁解锁不需要额外消耗,和执行非同步方法相比,仅纳秒级差距 如果线程间存在锁竞争,会带来锁撤销的消耗 只有一个线程访问同步块的场景
轻量级锁 竞争的线程不会阻塞,提高了程序的响应速度 如果始终得不到锁竞争,会使用自旋消耗CPU 追求响应时间 同步块执行速度非常快
重量级锁 线程竞争不使用自旋,不会消耗CPU 线程阻塞,响应时间缓慢 追求吞吐量 同步块执行速度较长

2.3 原子操作的实现原理

原子(atomic):“不能被进一步分隔的最小粒子”。

原子操作(atomic operation):“不可被中断的一个或一系列操作”。

原子操作的相关术语:

术语 英文 解释
缓存行 cache line 缓存的最小操作单位
比较并交换 Compare and Swap CAS,比较旧值与新值。当旧值发生改变之后才进行替换
CPU流水线 CPU pipeline 在CPU内由56个不同功能的电路单元组成一条指令处理流水线,然后将一条x86指令分为56步后再由这些电路单元分别执行,这样就能在一个CPU时钟周期完成一条指令,从而提高CPU的运算速度
内存顺序冲突 Memory order violation 内存顺序冲突一般由假共享引起,即多个CPU同时修改一个缓存行的不同部分,引起其中一个CPU操作无效,当出现该冲突时,CPU必须清空流水线

处理器如何实现原子操作:

  1. 通过总线锁保证原子性:
    1. 即使用处理器提供 LOCK # 信号,当一个处理器在总线上输出此信号时,其他处理器的请求将被阻塞住,该处理器则独占内存。
    2. 缺点:总线锁是把CPU和内存之间的通信锁住了,这使得锁定期内,其他处理器不能操作其他内存地址的数据,因此开销很大。
  2. 通过缓存锁定来保证原子性:
    1. 指内存区域如果被缓存在处理器的缓存行中,并且在 Lock 操作期间被锁定,那么当它执行锁操作回写到内存时,处理器直接修改内部的内存地址,并允许它的缓存一致性机制来保证操作的原子性。
    2. 缓存一致性:该机制会阻止同时修改由两个以上处理器缓存的内存区域数据,当其他处理器回写已被锁定的缓存行的数据时,会使缓存行无效。
    3. 不能使用缓存锁定的场景:a. 操作的数据跨缓存行时,b. 处理器不支持缓存锁定。这两种情况都被调用总线锁定。

Java 如何实现原子操作:

在 Java 中可以通过 循环CAS 的方式来实现原子操作。

从 Java 1.5 开始,JDK提供了一些原子包装类来支持原子操作。

CAS 实现原子操作的三大问题

  1. ABA 问题
    1. 原因:因为CAS 在操作值时,先比较旧值是否发生了变化,如果发生变化,再去修改该值。问题则出现在,如果A = 1;A=2;A=1;此时CAS检查时,并未发现A的变化,但是实际上却变化了。
    2. 解决思路:加上版本号即A1=1,A2=2,A3=1
    3. JDK 中提供了一个类 AtomicStampedReference,通过检查值和预期标记的方法来判断是否需要更新
  2. 循环时间长开销大
    1. 原因:自旋CAS如果长时间不成功,会给CPU带来非常大的执行开销。
    2. 解决方法:如果JVM能支持处理器提供的pause指令,则效率会有一定提升。pause指令能延迟流水线执行指令,避免在退出循环时因内存顺序冲突而引起CPU流水线被清空,从而提升CPU执行效率。
  3. 只能保证一个共享变量的原子操作。
    1. 原因:当对一个共享变量执行操作时,可以使用循环 CAS,但是对多个共享变量时,就无法保证操作的原子性,此时可以用锁。
    2. 解决方法:将多个共享变量合为一个共享变量来操作,如 i=2,j=a => ij=2a
    3. JDK解决方法,提供了AtomicReference类来保证引用对象之间的原子性,即可以把多个变量放在一个对象中进行CAS操作。

第三章 Java 内存模型

主要包含4个部分:

  1. Java 内存模型的基础
  2. Java 内存模型中的顺序一致性,即重排序和顺序一致性内存模型
  3. 同步原语,3个同步原语的内存语义和重排序规则在处理器中的实现
  4. Java 内存模型的设计

3.1 Java 内存模型的基础

3.1.1 并发编程模型的两个关键问题

在并发编程中,需要处理两个关键问题:1. 线程之间如何通信;2.线程之间如何同步。

__通信__,是指线程间以何种机制交换信息,如 共享内存消息传递

共享内存:在此模型中,线程之间共享程序的公共状态,通过写 - 读内存中的公共状态进行隐式通信。

消息传递:在此模型中,线程间没有公共状态,只能通过发送消息来显式进行通信。

__同步__,指程序中用于控制不同线程间操作发生相对顺序的机制。

共享内存:在此模型中,同步是显式进行的,程序员必须显式指定某个方法或某段代码需要在线程之间互斥执行。

消息传递:在此模型中,由于消息发送必须在消息接收之前,因此同步是隐式的。

3.1.2 Java 内存模型的抽象结构

在 Java 中,所有实例域、静态域、数组元素都存储在堆内存中,堆内存在线程之间共享。局部变量、方法定义参数、异常处理参数不会在线程之间共享,它们不会有内存可见性问题,也不受内存模型的影响。

Java 线程之间的通信由 Java 内存模型(JMM) 控制,JMM决定一个线程对共享变量的写入何时对另一个线程可见。

JMM定义了线程和主内存之间的抽象关系:线程之间的共享变量存储在主内存中,每个线程都有一个私有的本地内存(抽象概念),本地内存中存储了该线程以读/写共享变量的副本。

本地内存涵盖了缓存、写缓冲区、寄存器以及其他硬件和编译器优化。

Java 内存模型的抽象结构示意图

线程A与B之间通信,需要经历如下两个步骤:

  1. 线程A把本地内存A中更新过的共享变量刷新到主内存中
  2. 线程B到主内存中读取线程A之前已更新过的共享变量

总体来看,这两步的实质是线程A向线程B通信,且这个通信过程必须要经过主内存。

3.1.3 指令重排序

在执行程序时,为了提高性能,编译器和处理器常常会对指令做重排序。

指令重排的顺序:源代码 –> 1.编译器优化重排序 –> 2. 指令集并行重排序 –> 3. 内存系统重排序 –> 最终执行的指令序列

为了保证内存可见性,Java 编译器在生成指令序列的适当位置会插入内存屏障指令来禁止特定类型的处理器重排序。

3.1.4 Happens-Before

happens-before 用来阐述操作之间的内存可见性,在JMM中,如果一个操作执行的结果需要对另一个操作可见,那么这两个操作之间必须要存在 happens-before 关系。这两个操作可以在不同的线程中。

3.2 重排序

重排序是指编译器和处理器为了优化程序性能而对指令序列进行重新排序的一种手段。

3.2.1 数据依赖性

如果两个操作访问同一个变量,且这两个操作中有一个为写操作,此时这两个操作之间就存在数据依赖性。

编译器和处理器在重排序时,会遵守数据依赖性,不会改变存在数据依赖性关系的两个操作的执行顺序。

3.2.2 as-if-serial

as-if-serial:不管怎么重排序(提高并行度),程序的执行结果不能改变。

3.2.3 结论

在不改变程序结果的前提下,尽可能提高并行度。

3.3 顺序一致性

顺序一致性内存模型是一个理论参考模型,在设计时,处理器的内存模型和编程语言的内存模型都会以顺序一致性内存模型作为参照。

顺序一致性:即程序的执行结果与该程序在顺序一致性内存模型中的执行结果相同。

3.3.1 顺序一致性内存模型

顺序一致性内存模型具有两大特性:

  1. 一个线程中的所有操作必须按照程序的顺序来执行
  2. (不管程序是否同步)所有线程都只能看到一个单一的操作执行顺序。在顺序一致性内存模型中,每个操作都必须原子执行且立刻对所有线程可见。

3.4 volatile 的内存语义

当声明共享变量为 volatile 后,可以看成是使用同一个锁对这些单个读写操作做了同步。

volatile 具有以下特性:

  1. 可见性。对一个volatile 变量的读,总是能看到对这个volatile 变量最后的写入。
  2. 原子性。对任意单个 volatile 变量的读写具有原子性,但类似于 volatile++ 这种复合操作不具有原子性。

3.4.1 volatile 的读写

写:当写一个 volatile 变量时,JMM会把该线程对应的本地内存中的共享变量值刷新到主内存中。

读:当读一个 volatile 变量时,JMM 会把该线程对应的本地内存置为无效,线程从主内存中读取共享变量。

3.4.2 volatile 内存语义

基于保守策略的 JMM 内存屏障插入策略:

  1. 在每个 volatile 写操作的前面插入一个 StoreStore 屏障
    1. StoreStore 屏障,将保障上面所有的普通写,在volatile 写之前刷新到主内存
  2. 在每个 volatile 写操作的后面插入一个 StoreLoad 屏障
    1. StoreLoad 屏障,避免volatile写与后面可能有的 volatile 读写操作重排序
  3. 在每个 volatile 读操作的后面插入一个 LoadLoad 屏障
    1. LoadLoad 屏障,用来禁止处理器把上面的 volatile 读与下面的普通读重排序
  4. 在每个 volatile 读操作的后面插入一个 LoadStore 屏障
    1. LoadStore 屏障,用来禁止处理器把上面的 volatile 读与下面的普通写重排序

volatile 仅能保证对单个 volatile 变量的读写具有原子性,而锁的互斥执行能确保整个临界区代码的执行具有原子性。在功能上,锁比 volatile 更强大;在可伸缩性和执行性能上,volatile 更有优势。

3.5 锁的内存语义

锁是 Java 并发编程中最重要的同步机制。锁除了让临界区互斥执行外,还可以让释放锁的线程向获取同一个锁的线程发送消息。

3.5.1 锁的获取与释放

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class MonitorExample{
int a = 0;

// 根据程序次序规则,1 happens-before 2, 2 happens-before 3 ...
public synchronized void writer(){ // 1
a++; // 2
} // 3

// 根据监视器锁规则,3 happens-before 4
public synchronized void reader(){ // 4
int i = a; // 5 根据happens-before 的传递性,2 happens-before 5
} // 6

public static void main(String[] args){
MonitorExample me = new MonitorExample();

new Thread(() => {
me.writer();
}).run(); // 当线程释放锁时,JMM 会把线程对应的本地内存中的共享变量刷新到主内存中

new Thread(() => { // 当线程获取锁时,JMM 会把该线程对应的本地内存置为无效,从而使得被监视器保护的临界区代码必须从主内存中读取共享变量
me.reader();
}).run();
}
}

3.5.2 锁的内存语义

以 ReentrantLock 为例,分为公平锁和非公平锁。

公平锁

加锁方法 lock() 调用轨迹:

  1. ReentrantLock:lock()
  2. FairSync:lock()
  3. AbstractQueuedSynchronizer:acquire(int arg)
  4. ReentrantLock: tryAcquire(int acquires)

在获取锁时,会首先读 volatile 变量 state。

解锁方法 unlock()调用轨迹:

  1. ReentrantLock:unlock()
  2. AbstractQueuedSynchronizer:release(int arg)
  3. Sync: tryRelease(int releases)

在释放锁的最后,会写 volatile 变量 state

非公平锁

加锁:

  1. ReentrantLock: lock()
  2. NonfairSync: lock()
  3. AbstractQueuedSynchronizer: compareAndSetState(int expect,int update)

第三步时,会以原子操作的方式更新 state 变量。而 CAS 同时具有 volatile 读和 volatile 写的内存语义。即编译器不会对 CAS 与 CAS 前面和后面的任意内存操作重排序。

解锁同加锁,以CAS 操作 state。

公平锁与非公平锁的区别

公平锁:加锁前先检查是否有排队等待的线程,优先排队等待的线程,FIFO(先入先出)

非公平锁:加锁时不考虑排队等待问题,直接尝试获取锁,获取不到自动到队尾等待

3.5.4 concurrent 包的实现

一个通用化的实现方式:

  1. 首先,声明共享变量为 volatile
  2. 然后,使用 CAS 的原子条件更新来实现线程之间的同步
  3. 同时,配合以 volatile 的读写和 CAS 具有的 volatile 读写的内存语义来实现线程之间的通信。

AQS,非阻塞数据结构和原子变量类(java.util.concurrent.atomic 包中的类),这些类都是使用这种模式来实现的。

concurrent包的实现示意图

3.6 happens-before 的定义

  1. 如果一个操作 happens-before 另一个操作,那么第一个操作的执行结果将对第二个操作可见,且第一个操作的执行顺序在第二个操作之前
  2. 两个操作之间存在 happens-before 关系,并不意味着 Java 平台的具体实现必须按照 happens-before 关系指定的顺序来执行。但是JMM 会保证执行的结果一致性

3.6.1 happens-before 规则

  1. 程序顺序规则:一个线程中的每个操作,happens-before 于该线程中的任意后续操作
  2. 监视器锁规则:对一个锁的解锁,happens-before 于任意后续对这个 volatile 域的读
  3. volatile 变量规则,对一个 volatile域的写,happens-before 于任意后续对这个volatile 域的读
  4. 传递性:A happens-before B,B happens-before C => A happens-before C
  5. start() 规则:如果线程A执行 ThreadB.start() ,那么A线程的 ThreadB.start() 操作 happens-before于线程B中的任意操作
  6. join() 规则:如果线程A执行ThreadB.join() 并成功返回,那么线程B中的任意操作 happens-before 于线程A 从ThreadB.join() 操作成功返回

3.7 双重检查锁定

第一种的同步实例化方法,当多个线程同时调用该方法时,会导致加锁和解锁的操作频发,带来性能的问题。

1
2
3
4
5
6
7
8
9
10
public class SafeLazyInit{
private static Instance instance;

// 由于 getInstance() 方法做了同步处理,synchronized 将导致性能开销。
public synchronized static Instance getInstance(){
if(instance == null)
instance = new Instance();
return instance;
}
}

第二种使用双重检查机制,在第一次检查通过后,再进行加锁。但是由于 JMM 的重排序存在,执行结果可能并不如预期一致。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 使用双重锁机制,来降低同步的开销。
public class DoubleCheckLocking{
private static Instance instance;

public static Instance getInstance(){
// 如果第一次检查instance 不为 null,那么就不需要执行下面的加锁和解锁操作,可以大幅降低synchronized 带来的性能开销
if(instance == null){
synchronized (DoubleCheckLocking.class){
if(instance == null)
instance = new Instance(); // 但是,这里存在问题
}
}
return instance;
}
}

instance = new Instance() 应该分为具体的三步:1. 分配内存空间 2. 初始化对象 3. 将 instance 指向该内存空间 ,第二步和第三步之间可能出现指令重排。即存在instance尚未初始化,但是分配了内存空间的多线程问题。

第三种,基于 volatile 解决方案,将声明对象的引用设置为 volatile 后,代码中的指令重排在多线程环境中将被禁止。

1
2
3
4
5
6
7
8
9
10
11
12
13
public class DoubleCheckLocking{
private volatile static Instance instance;

public static Instance getInstance(){
if(instance == null){
synchronized (DoubleCheckLocking.class){
if(instance == null)
instance = new Instance();
}
}
return instance;
}
}

第四章 Java 并发编程基础

4.1 线程

4.1.1 什么是线程

操作系统在运行一个程序时,会创建一个进程,如一个Java程序就对应一个 Java 进程。

一个进程可以创建多个线程,每个线程都拥有各自的计数器、堆栈和局部变量等属性,并且能够访问共享的内存变量。

处理器在这些线程间高速切换,让使用者感觉是在同时运行。

4.1.2 多线程

使用多线程的优势如下:

  1. 更好的利用多处理器核心
  2. 程序运行更快,以获得更快的响应时间
  3. 更好的编程模型

4.1.3 线程优先级

线程优先级决定了线程能够使用处理器的资源多少,默认优先级为5,优先级范围为 1~10.

针对频繁阻塞的(休眠或者 IO操作)线程需要设置较高优先级,而偏重计算(需要较多CPU时间或者偏运算)的线程则设置较低的优先级,确保处理器不会被独占。

4.1.4 线程的状态

Java 线程的六种生命周期:

状态 说明
NEW 初始状态,线程被构建,但是还未调用 start() 方法
RUNNABLE 运行状态,即操作系统中的就绪和运行两种状态
BLOCKED 阻塞状态,表示线程阻塞于锁
WAITING 等待状态,表示当前线程需要等待其他线程做出一些特定动作(通知或中断)
TIME_WAITING 超时等待状态,表示可以在指定时间自动返回的等待状态
TERMINATED 终止状态,表示线程已执行完毕

Java线程状态变迁

4.1.5 Daemon 线程

当Java 虚拟机中不存在 Daemon 线程时,虚拟机将会退出。

通过 Thread.setDaemon(true) 将线程设置为 Daemon 线程。

4.2 启动、终止线程

通过调用线程的 start() 方法启动,随着 run() 方法的执行完毕,线程随之终止。

4.2.1 中断

中断表示一个运行中的线程是否被其他线程进行了中断操作。

中断操作是一种简便的线程间交互方式,而这种交互方式最适合用来取消或停止任务。

4.2.2 安全地终止线程

通过标识位或者中断操作的方式,能够使线程在终止时有机会去清理资源,而不是武断地将线程停止。

通过标识位和中断来停止线程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class TestRunner implements Runnable {
private long i;
private volatile boolean on = true;
@Override
public void run() {
while(on && !Thread.currentThread().isInterrupted()){
i++;
}
}

public void cancel(){
on = false;
}
}

4.3 线程间通信

Java 支持多个线程同时访问一个对象或者对象的成员变量,由于每个线程可以拥有这个变量的拷贝,所以程序在执行过程中,各线程看到的变量并不一定是最新的。

4.3.1 volatile 和 synchronized 关键字

volatile 关键字:用来修饰的变量,可以使访问该变量的线程均从共享内存中获取。且对共享变量的修改,必须同步刷新回共享内存中。但是过多地使用volatile 会降低程序执行的效率。

synchronized 关键字:用来修饰方法或者同步块,确保多个线程在同一时刻,只能有一个线程处于方法或者同步块中,保证了线程对变量访问的可见性和排他性。

4.3.2 等待/通知机制

等待/ 通知机制,是指一个线程A 调用了对象O 的wait() 方法而进入等待状态。线程B 调用了对象O 的notify()或notifyAll() 方法,线程A 收到通知后从对象O的wait() 方法返回,进而执行后续操作。

等待通知的经典范式:

等待方(消费方):

  1. 获取对象的锁
  2. 如果条件不满足,则调用对象的wait()方法,被通知后仍要检查条件
  3. 条件满足则执行对应的逻辑
1
2
3
4
5
6
synchronized(对象){
while(条件不满足){
对象.wait();
}
处理逻辑
}

通知方(生产者):

  1. 获得对象的锁
  2. 改变条件
  3. 通知所有等待在对象上的线程
1
2
3
4
synchronized(对象){
改变条件
对象.notifyAll();
}

4.3.3 Thread.join()

如果一个线程 A 执行了 thread.join() 语句,其含义是:当前线程 A 等待 thread 线程终止之后才从 thread.join() 返回。

4.4 线程应用实例

4.4.1 等待超时模式

调用一个方法后,如果在等待时间内得到结果则立即返回,否则返回默认结果。

1
2
3
4
5
6
7
8
9
10
11
12
// 伪代码:
public synchronized Object get(long mills) throws InterruptedException {
long future = System.currentTimeMills() + mills;
long remaining = mills;

while((result == null) && remaining > 0){
wait(remaining);
remaining = future - System.currentTimeMills();
}

return result;
}

4.4.2 简单的数据库连接池

4.4.3 基于数据库连接池的Web服务器


第五章 Java中的锁

锁的作用:用来控制多个线程访问共享资源的方式。一个锁能够防止多个线程同时访问共享资源(但有些锁可以允许多个线程并发的访问共享资源,比如读写锁)。

5.1 Lock 接口

Lock 接口提供了与 synchronized 关键字类似的同步功能,只是在使用时需要显示地获取和释放锁。虽然 Lock 接口缺少了 synchronized 方法隐式获取释放锁的便捷性,但是却拥有了锁获取与释放的可操作性、可中断的获取锁以及超时获取锁等多种 synchronized 关键字所不具备的同步特性。

Lock 接口的 API:

方法名 描述
void lock() 获取锁
void lockInterruptibly() throws InterruptedException 可中断的获取锁
boolean tryLock() 尝试非阻塞的获取锁
boolean tryLock(long time, TimeUnit unit) throws InterruptedException 超时获取锁
void unlock() 释放锁
Condition newCondition() 获取等待通知组件

5.2 队列同步器

队列同步器(AbstractQueuedSynchronizer),是用来构建锁或者其他同步组件的基础框架,通过内置 FIFO 队列来完成资源获取线程的排队工作。

同步器是实现锁的关键: – 锁面向使用者,定义了使用者与锁交互的接口,隐藏了实现细节; – 同步器面向锁的实现者,简化了锁的实现方式,屏蔽了同步状态管理、线程的排队、等待与唤醒等底层操作。

5.2.1 同步器接口

队列同步器提供了三个抽象方法供实现类实现:

  1. getState():获取当前同步状态
  2. setState(int newState):设置当前同步状态
  3. compareAndSetState(int expect,int update):使用CAS设置当前状态

同时,同步器提供了多个可重写的方法:

  1. tryAcquire(int arg):独占式获取同步状态
  2. tryRelease(int arg):独占式释放同步状态
  3. tryAcquireShared(int arg):共享式获取同步状态
  4. tryReleaseShared(int arg):共享式释放同步状态
  5. isHeldExclusively():当前同步器是否被线程独占
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
* 使用 Mutex 时并不会直接与内部同步器的实现打交道,而是调用 Mutex 提供的方法.
* 如 获取锁的lock() 方法,只需要在方法实现中调用同步器的 acquire(int arg) 即可。
* 当前线程后去同步状态失败后,会被加入到同步队列中等待
*/
public class Mutex implements Lock {

private static class Sync extends AbstractQueuedSynchronizer {
// 是否处于占用状态
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}

// 当状态为0时获取锁
@Override
protected boolean tryAcquire(int arg) {
// 通过CAS设置state
if(compareAndSetState(0,1)){
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

// 释放锁,将状态设置为0
@Override
protected boolean tryRelease(int arg) {
if(getState() == 0) throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);
return true;
}

// 返回一个 Condition ,每个Condition 都包含了一个 condition 队列
Condition newCondition() {
return new ConditionObject();
}
}

private final Sync sync = new Sync();

@Override
public void lock() {
sync.acquire(1);
}

@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}

@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}

@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1,unit.toNanos(time));
}

@Override
public void unlock() {
sync.release(1);
}

@Override
public Condition newCondition() {
return sync.newCondition();
}
}

5.2.2 队列同步器的实现分析

1. 同步队列

同步器内部使用一个同步队列(FIFO)来完成同步状态的管理,当线程获取同步状态失败后,会将当前线程及其等待状态等信息构造成为一个节点(Node)并加入同步队列,同时阻塞当前线程。

当同步状态释放时,会将首节点唤醒,再次尝试获取同步状态。

节点(Node)的属性类型与名称描述

属性类型与名称 描述
int waitStatus 等待状态
Node prev 前驱节点
Node next 后继节点
Node nextWaiter 等待队列中的后继节点
Thread 获取同步状态的线程

类似于双向链表

2. 独占式同步状态获取与释放

调用同步器的 acquire(int arg) 方法,可以获取同步状态,JDK 中acquire的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

其主要逻辑为:1. tryAcquire 尝试获取同步状态,如果失败的话则构造同步节点,并通过addWirter方法将该节点加入到同步队列的尾部;2. 调用 acquireQueued 方法,使该节点以死循环的方式获取同步状态,如果获取不到则阻塞节点中的线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}

独占式同步状态获取流程

3. 共享式同步状态获取与释放

共享式与独占式最主要的区别在于,同一时刻能否有多个线程同时获取到同步状态。

主要方法有:acquireShared(int arg)、tryAcquireShared(int arg)、releaseShared(int arg)

4. 独占式超时获取同步状态

同步器的 doAcquireNanos(int arg, long nanosTimeout) 方法可以超时获取同步状态,即在指定的时间段内获取同步状态,成功返回true。

独占式超时获取同步状态流程

5.3 重入锁 ReentrantLock

重入锁,表示该锁能够支持一个线程对资源的重复加锁。即 ReentrantLock 在调用 lock() 方法时已经获取到锁的线程,再次调用 lock() 方法依旧能够获取锁而不被阻塞。

此外,重入锁还支持获取锁时的公平性与非公平性选择。

synchronized 关键字隐式的支持重入

1. 实现重进入

重进入,指任意线程在获取锁之后能够再次获取该锁而不被阻塞,实现该特性需要解决两个问题:

  1. 线程再次获取锁:即锁需要识别获取的锁是否为当前占据锁的线程。
  2. 锁的最终释放:通过计数自增的方式表示线程重复n次加锁,锁释放也是同理。

2. 公平与非公平获取锁的区别

如果一个锁是公平的,那么锁的获取顺序就应该符合FIFO原则。

一般情况下,非公平锁的效率是要高于公平锁的。但是非公平锁可能使线程“饥饿”,即先来的线程因优先级低一直处于等待状态。

5.4 读写锁

上面的锁基本都是排他锁,在同一时刻只允许一个线程进行访问。而读写锁能在同一时刻允许多个__读线程__访问,但在__写线程__访问时,所有__读、写线程均被阻塞__。

5.4.1 ReentrantReadWriteLock 的特性与API

特性 说明
公平性选择 支持公平与非公平的锁获取方式
重进入 支持重进入,即写锁能支持一个线程多次获取,读锁也是
锁降级 写锁能够降级为读锁
方法名称 描述
int getReadLockCount() 返回当前读锁被获取的次数
int getReadHoldCount() 返回当前线程获取读锁的次数
boolean isWriteLocked() 判断写锁是否被获取
int getWriteHoldCount() 返回当前写锁被获取的次数

5.4.2 读写锁的实现分析

1. 读写状态的设计

读写锁同样依赖于自定义同步器来实现同步功能,但是读写锁的同步器需要维护__多个读线程和一个写线程__。

2. 写锁的获取与释放

写锁是一个支持重入的排他锁,只能被一个线程获取及重入。

如果当前线程在获取写锁时,读锁已经被获取(state!=0)或者该线程不是获取写锁的线程,则进入等待状态。

写锁的释放与 ReentrantLock 类似,都是维护一个 __写状态属性__,为0时表示写锁被释放,同时写锁的修改对其后的读锁可见。

3. 读锁的获取与释放

读锁是一个支持重入的共享锁,能被多个线程同时获取。

如果当前没有其他写线程,则读锁总会被成功获取。

读锁的每次释放均减少读状态。

4. 锁降级

__写锁降级为读锁__,指当前拥有写锁的线程,再获取读锁,随后释放之前拥有的写锁的过程。

ReentrantReadWriteLock 不支持锁升级。

5.5 LockSupport 工具

当需要阻塞或唤醒一个线程的时候,会使用 LockSupport 工具类来完成相应的工作。

方法名称 描述
void park() 阻塞当前线程
void parkNanos(long nanos) 阻塞当前线程nanos纳秒
void parkUntil(long deadline) 阻塞当前线程知道deadline
void unpark(Thread thread) 唤醒处于阻塞状态的线程

5.6 Condition 接口

Object 对象拥有一组监视器方法,包含:wait()、wait(long timeout)、notify()及notifyAll() ,这些方法与 synchronized 关键字配合,实现 等待/通知模式。

Condition 也提供了类似的方法,与Lock 配合实现 等待/通知 模式。


第六章 Java 并发容器和框架

6.1 ConcurrentHashMap

ConcurrentHashMap 是线程安全且高效的 HashMap。

6.1.1 为什么使用 ConcurrentHashMap

1. HashMap 线程不安全

在多线程环境下,使用 HashMap 进行 put 操作会引起死循环,导致 CPU 利用率接近 100%,所以在并发情况下不能使用 HashMap。

HashMap 在并发执行 put 操作时会引起死循环,是因为多线程会导致 HashMap 的 Entry 链表形成环形数据结构,一旦形成环形数据结构,Entry 的 next 节点永不为空,就会产生死循环获取 Entry。

2. HashTable 效率低下

HashTable 容器使用 synchronized 来保证线程安全,但在线程竞争激烈的情况下 HashTable 效率低下。

因为当一个线程访问 HashTable 的同步方法,其他线程再访问时,会进入阻塞或轮询状态。

3. ConcurrentHashMap 的锁分段技术

HashTable 在线程竞争激烈时的效率低下原因是所有访问 HashTable 的线程都必须竞争同一把锁。

而ConcurrentHashMap 的锁分段技术,将容器中的数据分段存储起来,为每一段数据配一把锁,当一个线程占用锁访问其中一段数据时,其他段的数据也能被其他线程访问。

6.1.2 ConcurrentHashMap 的结构

ConcurrentHashMap 是由 Segment 数组结构和 HashEntry 数组结构组成。

Segment 是一种可重入锁(ReentrantLock),扮演锁的角色。Segment与HashMap 类似,是一种数组和链表结构。

HashEntry 则用于存储键值对数据。每个 Segment 守护着一个 HashEntry 数组里的元素,当对 HashEntry 数组的数据进行修改时,必须先获得对应的 Segment 锁。

6.1.3 ConcurrentHashMap 初始化

通过 initalCapacity、loadFactor 和 concurrencyLevel 等几个参数来初始化 segment 数组、段偏移量 segmentShift、段掩码 segmentMask 和每个 segment 里的 HashEntry 数组,以此来初始化 ConcurrentHashMap。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* Creates a new, empty map with an initial table size based on
* the given number of elements ({@code initialCapacity}), table
* density ({@code loadFactor}), and number of concurrently
* updating threads ({@code concurrencyLevel}).
*
* @param initialCapacity the initial capacity. The implementation
* performs internal sizing to accommodate this many elements,
* given the specified load factor.
* @param loadFactor the load factor (table density) for
* establishing the initial table size
* @param concurrencyLevel the estimated number of concurrently
* updating threads. The implementation may use this value as
* a sizing hint.
* @throws IllegalArgumentException if the initial capacity is
* negative or the load factor or concurrencyLevel are
* nonpositive
*/
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}

1. 初始化 Segments 数组

1
2
3
4
5
6
7
8
9
10
11
if(concurrencyLevel > MAX_SEGMENTs)
concurrencyLevel = MAX_SEGMENTS;
int sshift = 0;
int ssize = 1;
while(ssize < concurrencyLevel){ // 必须保证 segments 数组长度是 2的N次方,如 concurrencyLevel = 14,ssize = 16
++sshift;
ssize <<= 1;
}
segmentShift = 32 - sshift;
segmentMask = ssize - 1;
this.segments = Segment.newArray(ssize);

Segments 数组的程度是由 ssize 决定的,而ssize 是通过 concurrencyLevel 计算得出的 ,原因是必须保证 ssize 是 2的N次方。

2. 初始化 SegmentShift 和 SegmentMask

段偏移量 SegmentShift:segmentShift = 32 - sshift;

段掩码:segmentMask = ssize - 1; 是散列运算的掩码,等于 ssize - 1,因为 ssize 为2的N次方,因此 segmentMask 掩码的二进制各个位都是 1.

3. 初始化每个 segment

initialCapacity 是 ConcurrentHashMap 的初始化容量,loadFactor 是每个 segment 的负载因子。

4. 定位 Segment

ConcurrentHashMap 在插入和获取元素时,会通过散列算法定位 Segment,并会对元素的 hashCode 进行一次再散列。

在散列的目的是减少散列冲突,使元素能均匀地分布在不同的 Segment 上,从而提高容器的存取效率。

如果不进行再散列的话,无论散列值的高位是多少,只要低位相同,都会被存储到一个Segment 上。

6.1.5 ConcurrentHashMap 操作

1. get操作

先进行再散列,然后使用散列值进行散列运算,定位 Segment,再通过散列算法定位到元素。

1
2
3
4
public V get(Object key){
int hash = hash(key.hash);
return segmentFor(hash).get(key,hash);
}

整个 get 操作不需要加锁,除非读到的值是空才会加锁重读。原因是 ConcurrentHashMap 将需要共享的变量都定义为 volatile 类型。

2. put操作

put操作会对共享变量进行写操作,所以必须加锁。

put 方法首先定位到 Segment,然后在 Segment 里进行插入操作。在插入之前,先判断是否需要对 Segment 里的 HashEntry 进行扩容,然后定位添加元素的位置,将其放入 HashEntry 数组中。

3. size 操作

统计 ConcurrentHashMap 里元素的大小,就必须统计所有 Segment 里元素的大小后求和。 Segment 里的count 是一个 volatile 变量,但是多线程场景下,某个 segment 的 count 发生改变后,也可能会使结果不准确。

因此 size 操作的具体过程为:先尝试 2 次通过不锁住 Segment 的方式来统计各个 Segment 大小,如果统计过程中,容器的 count 发生了变化(容器 count 会在 put、remove、clean 时会使 modCount + 1),再采用加锁的方式(将所有 Segment 的 put、remove、clean 方法全部锁住)来统计 Segment 的大小。

6.2 ConcurrentLinkedQueue

ConcurrentLinkedQueue 是一个基于链接节点的无界线程安全队列,采用 FIFO 的规则对节点进行排序。

6.2.1 ConcurrentLinkedQueue 结构

ConcurrentLinkedQueue 由 head 节点和 tail 节点组成,每个节点(Node)由节点元素(item)和指向下一个节点(next)的引用组成,节点与节点之间就是通过这个 next 关联,组成一张链表结果的队列。

6.2.2 入队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* 插入元素到当前队列的尾部,且因为队列是无界的,结果总是true
*/
public boolean offer(E e) {
checkNotNull(e);
// 1. 将插入元素构建为 node
final Node<E> newNode = new Node<E>(e);

for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
if (q == null) {
if (p.casNext(null, newNode)) {
if (p != t)
casTail(t, newNode);
return true;
}
}
else if (p == q)
p = (t != (t = tail)) ? t : head;
else
p = (p != t && t != (t = tail)) ? t : q;
}
}

1. 入队列

入队列就是将入队节点添加到队列的尾部。在入队时,先将入队节点设置成当前队列尾节点的下一个节点,然后更新 tail 节点,如果 tail 节点的 next 节点不为空,则将入队节点设置成 tail 节点,如果 tail 节点的 next 节点为空,则将入队节点设置为 tail 的 next 节点。

在设置入队节点为 tail 的 next 节点时,会使用 CAS 来保证多线程下的安全性。

2. 定位尾节点

1
2
3
4
5
6
7
8
9
/**
* Returns the successor of p, or the head node if p.next has been
* linked to self, which will only be true if traversing with a
* stale pointer that is now off the list.
*/
final Node<E> succ(Node<E> p) {
Node<E> next = p.next;
return (p == next) ? head : next;
}

3. 设置入队节点为尾节点

p.casNext(null,n) 将入队节点设置为当前队列尾节点的 next 节点,如果 p 为null,表示 p 是当前队列的尾节点,不为空则表示其他线程更新了尾节点,需要重新获取当前队列的尾节点。

6.2.3 出队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public E poll() {
restartFromHead:
for (;;) {
// 1. 获取头节点
for (Node<E> h = head, p = h, q;;) {
E item = p.item;

// 2. 头节点不为空,则用 CAS 将头节点的引用置null;为空则表示头节点被另一线程的出队操作取出
if (item != null && p.casItem(item, null)) {
// 3. CAS 操作成功,直接返回头节点
if (p != h)
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
// 4. 如果取出失败,则继续进行循环取值
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}

6.3 阻塞队列

6.3.1 什么是阻塞队列

阻塞队列是一个支持阻塞插入和阻塞移除的方法。

插入和移除操作的4中处理方式:

方法/处理方式 抛出异常 返回特殊值 一直阻塞 超时退出
插入方法 add(e) offer(e) put(e) offer(e,time,unit)
移除方法 remove(e) poll() take() poll(time,unit)
检查方法 element() peek() / /

6.3.2 Java 阻塞队列

JDK 中提供了 7个阻塞队列:

  • ArrayBlockingQueue:由数组结构组成的有界阻塞队列
    • 队列按照FOFO原则对元素进行排序
    • 默认不保证线程访问队列时的公平性
  • LinkedBlockingQueue:由链表结构组成的有界阻塞队列
    • FIFO原则对元素进行排序
    • 队列默认和最大长度为 Integer.MAX_VALUE
  • PriorityBlockingQueue:支持优先级排序的无界阻塞队列
    • 支持优先级、无界阻塞
    • 元素采用自然顺序升序排列,也可以自定义 compareTo 来排序
    • 不能保证同优先级元素的顺序
  • DelayQueue:使用优先级队列实现的无界阻塞队列
    • 支持延迟获取元素
    • 队列元素必须实现 Delayed 接口
    • 队列使用 PriorityQueue 实现
    • 创建元素时可指定多久后才能获取当前元素
    • DelayQueue常用于:
      • 缓存系统的设计:用 DelayQueue 保存缓存元素的有效期
      • 定时任务调度:使用 DelayQueue 保存执行的任务和执行时间
  • SynchronousQueue:不存储元素的阻塞队列
    • 不存储元素,每个put操作必须等待一个 take 操作
    • 支持公平访问队列,默认是非公平的
    • 队列本身不存储任何元素,适合传递性场景
    • 吞吐量高于 LinkedBlockingQueue 和 ArrayBlockingQueue
  • LinkedTransferQueue:由链表结构组成的无界阻塞队列
    • 提供 transfer 方法,立即将元素 transfer 给消费者,如果无消费者等待则将元素放在队列的 tail 节点,等待消费者消费
    • 提供 tryTransfer 方法,试探生产者传入的元素能够直接传给消费者,如果无消费者等待接收,则返回false。
  • LinkedBlockingQueue:由链表结构组成的双向阻塞队列
    • 双向队列,使多线程同时入队时,减少了一般竞争,因为多了一个入队口
    • 增加了 addFirst、addLast、offerFirst、offerLast、peekFirst、peekLast等方法
    • add 等同于 addLast,remove 等同于 removeFirst

第七章 Java 中的 13 个原子操作类

7.1 原子更新基本类型

Atomic 包提供了3中基本类:

  • AtomicBoolean
  • AtomicInteger
  • AtomicLong

这三个类的方法几乎一样,这里以 AtomicInteger 为例。AtomicInteger 的常用方法如下:

  • int addAndGet(int delta)
    • 以原子方式相加原值与传入值,返回结果
  • boolean compareAndSet(int expect, int update)
    • 如果输入值等于预期值,则以原子方式设置该值
  • int getAndIncrement()
    • 原子方式自增,返回旧值
  • void lazySet(int newValue)
    • 延迟设置值,可能导致其他线程在一小段时间内还是读到旧值
  • int getAndSet(int newValue)
    • 以原子方式设置为 newValue,返回旧值

AtomicBoolean 是把 Boolean转换为 整型,再使用 compareAndSwapInt 进行CAS。

7.2 原子更新数组

Atomic 包提供了以下 4 个类:

  • AtomicIntegerArray
    • 原子更新整型数组里的元素
  • AtomicLongArray
    • 原子更新长整型数组里的元素
  • AtomicReferenceArray
    • 原子更新引用类型数组里的元素

以 AtomicIntegerArray 为例,常用方法有:

  • int addAndGet(int i,int delta)
  • boolean compareAndSet(int i,int expect,int update)

7.3 原子更新应用类型

Atomic 包提供了以下 3 个类:

  • AtomicReference
    • 原子更新引用类型
  • AtomicReferenceFieldUpdater
    • 原子更新引用类型里的字段
  • AtomicMarkableReference
    • 原子更新带有标记位的应用类型

7.4 原子更新字段类

Atomic 包提供了以下 3 个类:

  • AtomicIntegerFieldUpdater
    • 原子更新整型的字段-更新器
  • AtomicLongFieldUpdater
    • 原子更新长整型字段的更新器
  • AtomicStampedReference
    • 原子更新带版本号的应用类型

第八章 Java 中的并发工具类

8.1 CountDownLatch 等待多线程完成

CountDownLatch 允许一个或多个线程等待其他线程完成操作。

CountDownLatch 在构造时接收 int 参数作为计数器,当线程执行到 countDown 是,计数器数值 N - 1 。可以是 1 个线程执行 N 个步骤,也可以是多个线程共同执行 N 个步骤。

await 方法会使线程进入等待状态,如果线程执行过久,可以使用 await(long time,TimeUnit unit),等待特定时间。

8.2 CyclicBarrier 同步屏障

CyclicBarrier 意思为可循环使用的屏障,当一组达到一个屏障时被阻塞,知道最后一个线程到达屏障时,屏障才会打开,所有被屏障拦截的线程才会继续运行。

CountDownLatch 的计数器只能使用一次

8.2.1 CyclicBarrier

CyclicBarrier(int parties) 构造函数的参数表示需要屏障拦截的线程数量,线程调用 await 方法通知 CyclicBarrier 已到达屏障,然后线程被阻塞。

  1. 如果 parties 参数为 3,但是只有两个线程执行 await 方法,那么这两个线程会一直阻塞
  2. 构造函数:CyclicBarrier(int parties,Runnable barrierAction) ,会在线程到达屏障时,优先执行 barrierAction,方便处理更复杂的业务场景。
  3. getNumberWaiting :获取 CyclicBarrier 阻塞的线程数量
  4. isBroken() :用来了解阻塞的线程是否被中断
  5. reset() :重置 CyclicBarrier 的计数器

8.3 Semaphore 控制并发线程数

Semaphore(信号量)用来控制同时访问特定资源的线程数量,通过协调各个线程,以保证合理使用公共资源。

1. 应用场景

Semaphore 可以用于流量控制,特别是公共资源有限的应用场景,如数据库连接等。比如几十条线程并发读取数据,但是数据库只支持10个连接数,此时就需要使用 Semaphore 做流量控制,否则会报错,无法获取数据库连接。

Semaphore 的常用方法:

  • acquire() :获取一个许可证
  • release() :释放一个许可证
  • tryAcquire:尝试获取许可证
  • int availablePermits():返回此信号量中当前可用的许可证数
  • int getQueueLength():返回正在等待获取许可证的线程数
  • boolean hasQueuedThreads():是否有线程正在等待获取许可证
  • void reducePermits(int reduction):减少reduction 个许可证(protected方法)
  • Collection getQueuedThreads():返回所有等待获取许可证的线程集合(protected方法)

8.4 Exchanger 线程间交换数据

Exchange(交换者)是一个用于线程间协作的工具类。

Exchanger 用于进行线程间的数据交换。它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。

当第一个线程先执行 exchange() 方法,它会一直等待第二个线程也执行 exchange 方法,当两个线程都到达同步点后,会将彼此线程生产出来的数据传递给对方。

应用场景:

  1. Exchanger 可以用于遗传算法
    1. 如AB进行交配,当交换数据后使用交叉规则得出两个交配结果
  2. Exchanger 可以用于校对工作
    1. 如A、B线程同时执行,结果是否相同

如果有一个线程一直未执行 exchange() ,则会一直等待。可以使用 exchange(V x,long timeout,TimeUnit unit) 设置最大等待时间。


第九章 Java 中的线程池

Java 中的线程池是运用场景最多的并发框架,几乎所有异步或并发执行的程序都可以使用线程池。合理使用线程池的好处有:

  1. 降低资源消耗
    1. 通过重复利用已创建的线程降低线程创建和销毁时的消耗
  2. 提高响应速度
    1. 当任务抵达时,无需创建线程,就能立即执行
  3. 提高线程的可管理型
    1. 线程池能够统一分配、调优和监控线程
    2. 线程属于稀缺资源,不能无限制的创建

9.1 线程池的实现原理

线程池的主要处理流程

ThreadPoolExecutor 执行示意图

ThreadPoolExecutor.execute(Runnable command) 方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@code RejectedExecutionHandler}.
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}

9.2 线程池的使用

9.2.1 创建线程池

1
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)

通过上面的构造方法来创建一个线程池,其具体参数如下:

  1. corePoolSize 线程池的基本大小

    1. 线程池的基本大小,当任务提交时,会创建线程进行执行,且不会销毁。
    2. 调用 prestartAllCoreThreads() 提前创建并启动所有基本线程
  2. maximumPoolSize

    1. 线程池允许创建的最大线程数
  3. keepAliveTime

    1. 线程活动保持时间
  4. workQueue

    1. 任务队列,用于保存等待执行的任务的阻塞队列
      1. ArrayBlockingQueue
      2. DelayedWorkQueue
      3. ForwardingBlockingQueue
      4. SynchronousQueue
      5. DelayQueue
      6. LinkedBlockingQueue
      7. PriorityBlockingQueue
  5. threadFactory

    1. 用于设置创建线程的工厂

    2. 如使用 guava 提供的 ThreadFactoryBuilder 可以快速给线程池中的线程设置有意义的名字,如:

    3. new ThreadFactoryBuilder().setNameFormat(“XX-task-%d”).build();

  6. handler

    1. 饱和策略,当队列和线程池都满了,则需要一种策略来处理新提交的任务
    2. 默认策略为 AbortPolicy
    3. JDK 提供的几种策略:
      1. DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务
      2. AbortPolicy:直接抛出异常
      3. CallerRunsPolicy:只用调用者所在线程来运行任务
      4. DiscardPolicy:不处理,丢弃掉

9.2.2 提交任务至线程池

execute() 方法,用于提交不需要返回值的任务,所以无法判断任务是否执行成功。

submit() 方法,用于提交需要返回值的任务。线程池会返回一个 future 类型对象。

9.2.3 关闭线程池

通过 shutdown 或 shutdownNow 方法来关闭线程池。

原理:遍历线程池中的工作线程,逐个调用线程的 interrupt 方法来中断线程,所以无法相应的线程可能永远无法终止。

调用这两个方法后,isShutdown 会返回true;当所有任务都关闭后,isTerminated 返回true。

通常会使用 shutdown 来关闭线程池,但是如果任务不一定要执行完毕,可以调用 shutdownNow 方法。

9.2.4 合理地配置线程池

想要合理配置线程池,需要先分析任务的特性:

  • 任务的性质:CPU 密集型任务、IO 密集型任务、混合型任务
    • CPU 密集型任务应该配置尽可能小的线程,如 N(cpu) + 1 个线程的线程池
    • IO 密集型任务并非一直执行任务,应该配置尽可能多的线程,如 2*N(cpu) 个线程的线程池
    • 混合型任务:如果可以拆分,将其拆分为一个 CPU 密集型任务和一个 IO 密集型任务
  • 任务的优先级:高、中、低
    • 优先级不同的任务,可以使用 PriorityBlockingQueue 优先级队列来处理
    • 如果一直提交高优先级任务,则低优先级任务可能永远不会执行
  • 任务的执行时间:长、中、短
    • 可以使用优先级队列,让执行时间短的任务先执行
    • 或者不同时间的任务交给不同规模的线程池来处理
  • 任务的依赖性:是否依赖其他系统资源,如数据库连接
    • 如依赖数据库连接的线程,当提交SQL 给数据库后需要等待数据库返回结果,等待时间越长,CPU空闲越久。那么线程数可以设置的越大,更好的利用CPU

Runtime.getRuntime().availableProcessors() 获取当前设备的 CPU 个数。

建议使用有界队列 。有界队列能增加系统的稳定性和预警能力。

9.2.5 线程池监控

监控线程池时可以使用以下属性:

  • taskCount:线程池需要执行的任务数量
  • completedTaskCount:线程池在运行过程中已完成的任务数量,<= taskCount
  • largestPoolSize:线程池里曾创建过的最大线程数量。
    • 可以通过该数据知道线程池是否满过
  • getPoolSize:线程池的线程数量
    • 如果线程池不销毁的话,线程池里的线程不会自动销毁
  • getActiveCount:获取活动的线程数

或者可以通过继承线程池来自定义线程池,重写线程池的 beforeExecute、afterExecute和terminated 方法。


第十章 Executor 框架

从 JDK 1.5 开始,Java 将工作单元与执行机制分离开来。

工作单元包括 Runnable 和 Callable,而执行机制由 Executor 框架提供。

10.1 Executor 框架介绍

10.1.1 Executor 框架的两级调度模型

上层:Java 多线程程序通常把应用分解为若干个任务,然后使用用户级的调度器(Executor 框架)将这些任务映射为固定数量的线程。

底层:操作系统内核将这些线程映射到硬件处理器上。

10.1.2 Executor 框架结构与成员

1. Executor 框架的结构

Executor 框架主要由 3 部分组成:

  • 任务。包括被执行任务需要实现的接口:Runnable、Callable接口
    • Runnable 接口和 Callable 接口 都可以被 ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 执行
  • 任务的执行。包括任务执行机制的核心接口 Executor,以及继承自 Executor 的 ExecutorService 接口。
    • Executor ,是 Executor 框架的基础,将任务的提交与任务的执行分离开来
    • ThreadPoolExecutor,是线程池的核心实现类,用来执行被提交的任务
    • ScheduledThreadPoolExecutor,可以在给定的延迟后运行命令,或者定期执行命令,比 Timer 更灵活、强大。
  • 异步计算的结果。包括接口 Future 和实现 Future 的 FutureTask 类
    • Future 接口和实现 Future 的 FutureTask 类,代表异步计算的结果。

主要流程为:

  1. 主线程创建实现 Runnable、Callable 接口的任务对象
  2. Executors 将 Runnable 对象封装为 Callable 对象
  3. 将 Runnable 对象交给 ExecutorService.execute(Runnable command) 执行,或者交给 ExecutorService.submit(Callable task) 执行
    1. 如果执行 submit,则 ExecutorService 返回一个实现 Future 接口的对象。
    2. 由于 FutureTask 实现了 Runnable,因此FutureTask 也可以直接交给 ExecutorService 执行
  4. 主线程执行 FutureTask.get() 方法等待任务执行完成;或者执行 FutureTask.cancel(boolean mayInterruptIfRunning) 来取消任务执行

2. Executor 框架的成员

  1. ThreadPoolExecutor:通常由工厂类 Executors 创建,可创建以下 3 种类型的ThreadPoolExecutor
    1. FixedThreadPool:创建使用固定线程数的ThreadPool,适用于需要限制当前线程数量的应用场景,适用于负载较重的服务器。
    2. SingleThreadExecutor:创建使用单个线程的 ThreadPool,适用于需要顺序执行各个任务;且在任意时间点,不会有多个线程是活动的应用场景
    3. CachedThreadPool:创建一个会根据需要创建新线程的 ThreadPool,是大小无界的线程池,适用于执行很多的短期异步任务的小程序,或者是负载较轻的服务器。
  2. ScheduledThreadPoolExecutor:通常由工厂类 Executors 创建,可创建以下 2 种类型的 ScheduledThreadPoolExecutor
    1. ScheduledThreadPoolExecutor,包含若干线程,适用于需要多个后台线程周期执行任务,同时需要限制后台线程数量的场景
    2. SingleThreadScheduledExecutor,只包含一个线程,适用于需要单个后台线程执行周期任务,同时需要保证顺序执行各个任务的应用场景
  3. Future/FutureTask:表示异步计算的结果
  4. Runnable/Callable:被 ThreadPoolExecutor 执行的任务。
    1. Runnable 不返回结果
    2. Callable 返回结果

10.2 ThreadPoolExecutor 详解

  • corePool:核心线程池的大小

  • maximumPool:最大线程池大小

  • BlockingQueue:阻塞队列,用来暂存任务的工作队列

  • RejectedExecutionHandler:拒绝策略,当 ThreadPoolExecutor 关闭或饱和时,将要调用的 Handler