1. 什么是并发
并发:可简单理解为多线程操作相同的资源,在保证线程安全的前提下合理的使用资源。
下面本文将从线程与线程安全两个方面来展开并发的介绍。
讲到线程,我们应该听说过一个老生常谈的问题,什么是线程?与进程有什么区别?
- 进程:可简单理解为是资源分配的最小单位
- 线程:可简单理解为是CPU调度的最小单位
通俗来讲我们计算机中运行的每一个服务或应用都可以称为一个进程。在每一个服务里面,具体到CPU执行的逻辑就是线程。
那么,为什么要分为线程与进程呢?
本质就是CPU的执行速度远快于其他硬件的执行。因此为了充分的利用CPU的资源,将程序的执行按CPU工作时间段分为了进程与线程。
下面就让我们简单来了解下CPU的硬件内存模型。
2. 硬件内存模型
2.1 为什么需要缓存?
因为执行速度上,CPU远大于内存,加入缓存是为了解决CPU和内存之间速度不匹配的问题。
2.2 缓存的局部性原理
- 时间局部性:如果某个数据被访问,那么它在不久的将来有可能被再次访问
- 空间局部性:如果某个数据被访问,那么与它相邻的数据很快可能被访问
2.3 缓存一致性协议(MESI)的四种状态
- M: 被修改(Modified)
- 数据有效
- 内存中的数据不一致
- 存在于本地Cache中
- E: 独享的(Exclusive)
- 数据有效
- 数据和内存中的数据一致
- 只存在于本地Cache中
- S: 共享的(Shared)
- 数据有效
- 数据和内存中的数据一致
- 存在于很多Cache中
- I: 无效的(Invalid)
- 数据无效
2.4 CPU乱序执行优化
CPU为了进一步提高运算速度,会做出违背代码原有顺序的优化。
3. JMM(Java内存模型)
CPU内存模型如上所述,那么Java又是怎么工作的呢?
这里的共享变量对应着就是JVM运行时数据区的堆和方法区。
而线程中独享的对应JVM运行时数据区中的就是栈、本地方法栈和PC寄存器。
另外需要说明JMM与JVM内存模型不是一个概念,这里需要注意下。JMM 的主要是控制程序中变量的访问规则,而JVM内存模型包涵了Java执行全流程的规则。
由上图可知,每个线程需要操作共享变量时,都会把共享变量拷贝一份到线程本地的工作内存中。操作完后,通过JVM协调控制下写回主存,从而实现了多线程的并发操作。
倘若没JVM的协调会发生什么情况呢?
A、B线程都将共享变量读入本地内存,A、B同时进行修改,由于A、B的工作内存是各自独享的,此时A、B将修改后的变量同时写回主存,那么问题来了,到底主存中的变量被修改成了A的还是B的呢?
我们将此类问题就称为线程安全问题。
4. 线程安全
什么是线程安全
当多个线程访问某个类时,不管运行时环境采用何种调度方式或者这些线程将如何交替执行,并且在主调代码中不需要任何额外的同步或协同,这个类都能表现出正确的行为,那么就称这个类是线程安全的。
线程安全的三大特性
- 原子性
- 同一时刻只有一个线程对变量进行操作
- 可见性
- 一个线程对主存的修改可以及时的被其他线程观察到
- 有序性
- 如果两个线程不能从 happens-before原则 观察出来,那么就不能观察他们的有序性
4.1 原子性
Java一般通过CAS(compareAndSwap)和synchronized保证原子性。
4.1.1 CAS的实现过程
CAS是compareAndSwap(比较与交换)的缩写。以AtomicInteger为例。
//1. AtomicInteger.java
//该方法调用unsafe类的getAndAddInt
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
//2. Unsafe.java
//可看到该方法调用的compareAndSwapInt是JNI方法
/**
* 原子性的更新一个对象在偏移量为offset处的成员变量的值,或者原子性的更新一个数组在偏移量为var2处的元素的值。
*
* @param var1 更新成员变量的对象,或者更新元素的数组
* @param var2 成员变量或者数组元素的偏移
* @param var4 要增加到的量
* @return 先前的值
*/
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
//获取AtomicInteger对象在内存中偏移量为var2处的值
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
//3. Unsafe.java
/**
* 如果Java变量的当前值是var4,则原子性的把该变量的值更新为var5。
* @return 成功返回true
* /
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
通过上面3步就实现了AtomicInteger的自增取值操作。
其中我们可以看到CAS的核心思路其实就是循环判断第3步JNI方法是否成功。因此我们总结下CAS的原理。
原理:先获主存内该变量的值。然后判断该变量在获取后的时间段内是否被其他线程更新,如果没有,就更新成指定的值,否则就再来一遍。
4.1.2 CAS存在的问题
- ABA问题
- 描述:在比较检查时,如果内存中的值从A更新到B又被更新到A,此时CAS检查是没有发生改变的,但实际是变化了的。
- 解决方法:在变量前面添加版本号,每次变量更新的时候都把版本号加一,这样变化过程就从“A-B-A”变成了“1A-2B-3A”
- 对应类:AtomicStampedReference
- 只能保证一个共享变量的原子操作
- 描述:对一个共享变量执行操作时,CAS能够保证原子操作,但是对多个共享变量操作时,CAS是无法保证操作的原子性的。
- 解决方法:JDK提供了AtomicReference类来保证引用对象之间的原子性
- 对应类:AtomicReference
- 循环时间长开销大
- 描述:CAS操作如果长时间不成功,会导致其一直自旋,给CPU带来非常大的开销
- 解决方法:尽量减少并发数
4.1.3 synchronized的实现
synchronized是通过Java对象头中的monitor实现线程同步的,而monitor底层又是依赖操作系统的互斥锁实现的。
具体操作如下。
- 锁方法
- 在字节码中,给方法添加flag
ACC_SYNCHRONIZED
实现
- 在字节码中,给方法添加flag
- 锁同步块
- 在字节码中,通过
monitorenter
和monitorexit
JVM指令实现
- 在字节码中,通过
4.1.4 synchronized的锁优化
偏向锁 -> 轻量级锁 -> 重量级锁。
4.2 可见性
Java中可以通过synchronized
与volatile
实现可见。
-
synchronized
- 线程解锁前,必须把共享变量的最新值刷新到主存
- 线程加锁时,将清空工作内存中的共享变量,从主存中重新获取
-
volatile
- 通过内存屏障实现
- 变量写时,在写后加入Store屏障指令,将工作区的共享变量刷新到主存
- 变量读时,在读之前加入一条load屏障指令,从主存中读取共享变量
- 通过禁止重排序实现
- 通过内存屏障实现
volatile的本质是告诉JVM当前工作内存中的值是不确定的,需要从主存中获取最新值。没有对变量加锁,因此volatile不保证原子性。
4.3 有序性
因为指令重排序的存在,JVM中执行的代码可能会乱序,因此Java内存模型具备一套先天的有序性原则,happens-before原则
- happens-before原则
- 程序次序原则
- 锁定原则
- volatile变量规则
- 传递规则
- 线程启动原则
- 线程中断原则
- 线程终结规则
- 对象终结
5. 如何保证线程安全
5.1 单例模式
- 懒汉-doubleCheck式
- 饿汉式
- 静态内部类
- 枚举类
以上模式在单例模式中有详细解释,这里就不多说了。
5.2 不可变对象
满足条件
- 对象创建以后其状态就不能修改
- 对象所有域都是final类型
- 对象正确创建(无逃逸)
5.2.1 final关键字
- 修饰类
- 不能再被继承
- 修饰方法
- 锁定方法不被继承类修改
- 修饰变量
- 基本数据类型
- 不可修改
- 引用数据类型
- 不能指向另外一个对象
- 可以修改对象里的值
- 基本数据类型
5.2.2 不可变对象工具类
- Collections.unmodifiableXXX:Collection、List、Set、Map...
- Guava:ImmutableXXX:Collection、List、Set、Map...
- 注:这些类型的对象已经初始化,就不能修改引用,也不能修改里面的值
5.3 线程封闭
- 堆栈线程
- 局部变量,无并发问题
- ThreadLocal线程封闭
- 每个线程调用全局ThreadLocal对象的set方法,就相当于往其内部的map增加一条记录,key就是ThreadLocal对象,而value就是各自线程通过set方法传进去的值
5.4 线程安全容器
5.4.1 同步容器
- ArrayList(不安全) -> Vector,Stack(安全)
- Vector继承List
- 虽然每个方法都被synchronized修饰,但不同方法在不同线程中执行时,会有线程安全问题,如:a线程获取,b线程同时移除时,会产生指针越界
- stack继承Vector
- Vector继承List
- HashMap(不安全) -> HashTable(安全)
- hashtable key,value不能为null
- Collections.synchronizedxxx(List,Set,Map)
同步容器不严格保证安全,且加了synchronized修饰,性能较差,不建议生产使用
5.4.2 并发容器
- ArrayList(不安全)-> CopyOnWriteArrayList(安全)
- 特点
- 读写分离
- 最终一致
- 读操作不加锁,写需要加锁
- 缺点
- 拷贝数组时消耗内存
- 不能实时读取
- 特点
- HashSet(不安全)-> CopyOnWriteSet(安全)
- TreeSet(不安全) -> ConcurrentSkipListSet(安全)
- HashMap (不安全)-> ConcurrentHashMap(安全)
- TreeMap (不安全)-> ConcurrentSkipList(安全)
并发容器性能较好,推荐使用
6. Java并发工具包(J.U.C)
6.1 AbstractQueuedSynchroizer(AQS)
AQS是J.U.C的核心。
提供了一个基于FIFO的队列,可以用于构建锁或者其他相关同步装置的基础框架。
6.1.1 AQS的数据结构
AQS中state标记值为1 时表示有线程占用,其他线程需要进入到同步队列等待。同步队列是一个双向链表。
当获得锁的线程需要等待某个条件时,会进入 condition 的等待队列。等待队列为单向列表且可以有多个。
当condition条件满足时,线程会从等待队列重新进入同步队列进行获取锁的竞争。
6.2 基于AQS的其他工具类
6.2.1 CountDownLatch
CountDownLatch 是通过一个计数器来实现的,计数器的初始化值为线程的数量。每当一个线程完成了自己的任务后,计数器的值就相应得减 1。当计数器到达 0 时,表示所有的线程都已完成任务,然后在闭锁上等待的线程就可以恢复执行任务。
- 使用场景
- 并行计算
- code注意
- 在final中使用
countDownLatch.countDown()
countDownLatch.await(10, TimeUnit.MILLISECONDS)
可以指定时间,超时后会继续执行下面的请求,但提交的线程会仍被执行完毕
- 在final中使用
6.2.2 Semaphore
Semaphore可以控制并发访问的线程个数。
Semaphore主要方法说明
- void acquire():从此信号量获取一个许可,在提供一个许可前一直将线程阻塞,否则线程被中断
- void release():释放一个许可,将其返回给信号量
- tryAcquire():尝试获得一个许可
6.2.3 CyclicBarrier
利用CyclicBarrier类可以实现一组线程相互等待,当所有线程都到达某个屏障点后再进行后续的操作
CyclicBarrier主要方法说明
- barrier.await();阻塞线程,等线程都到达时,再执行后续操作
- barrier.await(long timeout, TimeUnit unit);阻塞线程,在超时时间内等线程都到达时,再执行后续操作,其余线程会抛出超时异常
6.2.4 ReentrantLock
ReentrantLock,可重入锁。
ReentrantLock与synchronized的区别
- 锁的实现
- ReentrantLock是JDK实现的
- synchronized是JVM实现的
- 性能
- synchronized优化后性能差不多
- 功能
- 便利性synchronized写法更简单
- ReentrantLock控制级别更细,灵活
ReentrantLock独有功能
- 可指定是公平锁还是非公平锁
- synchronized只能是非公平锁
- 公平锁:先等待的线程先获得锁
- 提供了Condition,可以分组唤醒需要的线程
- synchronized随机唤醒一个或者所有线程
- 提供能够中断等待锁的线程的机制, lock.lockInterruptibly
ReentrantLock使用
- lock(加锁)
- unlock(需要在finally中使用)
- trylock(尝试获取一把锁)
6.2.5 ReentrantReadWriteLock
- 在没有任何读写锁的时候,才可以取得写入锁
- 悲观读取
- 读取很多,写入很少时,会使写入线程遭遇饥饿
6.2.6 StampedLock
StampedLock类,在JDK1.8时引入,是对读写锁ReentrantReadWriteLock的增强,该类提供了一些功能,优化了读锁、写锁的访问,同时使读写锁之间可以互相转换,更细粒度控制并发。
6.3 J.U.C拓展组件
6.3.1 FutureTask
- 可以自定义返回值
- 异步执行,获取值时阻塞线程
6.3.2 Fork/Join
思想:将一个大的任务拆分成小的任务,然后多线程同时计算,提交计算效率。
demo
public class SumTask extends RecursiveTask<Integer> {
public static final int threshold = 2;
private int start;
private int end;
public SumTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int sum = 0;
//如果任务足够小就计算任务
boolean canCompute = (end - start) <= threshold;
if (canCompute) {
for (int i = start; i <= end; i++) {
sum += i;
}
} else {
// 如果任务大于阈值,就分裂成两个子任务计算
int middle = (start + end) / 2;
SumTask leftTask = new SumTask(start, middle);
SumTask rightTask = new SumTask(middle + 1, end);
// 执行子任务
leftTask.fork();
rightTask.fork();
// 等待任务执行结束合并其结果
int leftResult = leftTask.join();
int rightResult = rightTask.join();
// 合并子任务
sum = leftResult + rightResult;
}
return sum;
}
public static void main(String[] args) throws Exception {
ForkJoinPool forkjoinPool = new ForkJoinPool();
//生成一个计算任务,计算1+2+3+4
SumTask task = new SumTask(1, 100);
//执行一个任务
Future<Integer> count = forkjoinPool.submit(task);
System.out.println("count = "+count.get());
}
}
6.3.3 BlockingQueue
阻塞队列的阻塞触发条件
- 当队列满的时候,进行入队操作
- 当队列空的时候,进行出队操作
特点
- 线程安全
- 适用于生产者/消费者
方法
操作类型 | 抛出异常 | 返回特殊值 | 阻塞线程 | 超时 |
---|---|---|---|---|
插入 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
删除 | remove() | poll() | take() | poll(time, unit) |
读取 | element() | peek() | / | / |
实现类
- ArrayBlockingQueue
- FIFO
- 有界队列
- DelayQueue
- 阻塞内部元素
- 元素需排序
- 定时关闭连接
- 缓存对象
- LinkedBlockingQueue
- 链表
- FIFO
- 有界、无界可选
- PriorityBlockingQueue
- 无界队列
- 带有优先级的阻塞队列
- SynchronousQueue
- 同步移交队列
- 线程池无界或可拒绝任务时使用
7. 线程与线程池
7.1 线程几种状态
- NEW(新建)
- RUNNABLE(运行)
- BLOCKED(阻塞)
- WAITING(等待)
- TIME_WAITING(超时等待)
- TERMINATED(终结)
7.2 为什么使用线程池
7.2.1 直接new Thread的弊端
- 每次都新建线程,性能差
- 线程缺乏统一管理,无限创建新线程有OOM的风险
- 功能太少
7.2.2 线程池的好处
- 重用存在的线程
- 可有效控制并发
- 提供定时、定期执行
7.3 线程池(ThreadPoolExecutor)
7.3.1 初始化参数
- corePoolSize
- 核心线程数
- 小于核心线程数时,会新建线程处理
- maximumPoolSize
- 线程最大线程数
- 大于核心线程数,小于最大线程数,且workQueue已满,才会创建新的线程
- workQueue已满,大于最大线程数时,使用拒绝策略
- (BlockingQueue)workQueue
- 阻塞队列,储存待执行的任务
- keepAliveTime
- 线程没有任务执行时最多保存多长时间
- unit
- keepAliveTime单位
- threadFactory
- 线程工厂
- rejectHandler
- AbortPolicy(直接抛出异常)
- DiscardPolicy(丢弃当前的任务)
- DiscardOldestPolicy(丢弃workqueue靠前的任务)
- CallerRunsPolicy(在调用线程中执行)
- 自定义
7.3.2 线程创建顺序
corePoolSize -> workQueue -> maximumPoolSize -> 拒绝
7.3.3 BlockingQueue(workQueue)
- 有界队列
- ArrayBlockQueue
- 无界队列
- LinkedBlockQueue
- maximumPoolSize不会起作用
- 会到无界队列里
- LinkedBlockQueue
- 同步移交队列
- SynchronousQueue
7.4 线程池的使用
7.4.1 线程池的状态
- RUNNING
- 这是最正常的状态,接受新的任务,处理等待队列中的任务。线程池的初始化状态是RUNNING。线程池被一旦被创建,就处于RUNNING状态,并且线程池中的任务数为0。
- SHUTDOWN
- 不接受新的任务提交,但是会继续处理等待队列中的任务。调用线程池的shutdown()方法时,线程池由RUNNING -> SHUTDOWN。
- STOP
- 不接受新的任务提交,不再处理等待队列中的任务,中断正在执行任务的线程。调用线程池的shutdownNow()方法时,线程池由(RUNNING or SHUTDOWN ) -> STOP。
- TIDYING
- 所有的任务都销毁了,workCount 为 0,线程池的状态在转换为 TIDYING 状态时,会执行钩子方法 terminated()。因为terminated()在ThreadPoolExecutor类中是空的,所以用户想在线程池变为TIDYING时进行相应的处理;可以通过重载terminated()函数来实现。
7.4.2 线程池的常用方法
- execute
- 提交任务给线程池执行
- submit
- 提交任务给线程池执行并返回结果
- shutdown
- 关闭线程池,等待任务执行完
- shutdownNow
- 直接关闭线程池,不等待任务执行完毕
7.4.3 线程池的监控方法
- getTaskCount
- 线程已执行和未执行的任务总数
- getCompletedTaskCount
- 已完成的任务数量
- getPoolSize
- 当前线程数
- getActiveCount
- 当前线程池中正在执行任务的线程数量
7.5 Executor框架接口
- Executors.newCachedThreadPool
- 可缓存的线程池
- 空余时灵活回收空闲线程
- Executors.newFixedThreadPool
- 可控制最大并发数的线程池
- 超出线程,则在队列中等待
- Executors.newScheduledThreadPool
- 定时线程池
- 定时/周期的任务执行
- Executors.newSingleThreadPool
- 单线程化线程池
- 按指定顺序执行
7.6 线程池合理配置
- CPU密集型任务
- 尽量压榨CPU
- NCPU(CPU数量)+1
- IO密集型任务
- 2*NCPU(CPU数量)
本站文章除注明转载/出处外,均为本站原创或翻译,转载前请务必署名,转载请标明出处
最后编辑时间为: