Java并发与多线程-小结

/ 并发

1. 什么是并发

并发:可简单理解为多线程操作相同的资源,在保证线程安全的前提下合理的使用资源。

下面本文将从线程与线程安全两个方面来展开并发的介绍。

讲到线程,我们应该听说过一个老生常谈的问题,什么是线程?与进程有什么区别?

通俗来讲我们计算机中运行的每一个服务或应用都可以称为一个进程。在每一个服务里面,具体到CPU执行的逻辑就是线程。

那么,为什么要分为线程与进程呢?

本质就是CPU的执行速度远快于其他硬件的执行。因此为了充分的利用CPU的资源,将程序的执行按CPU工作时间段分为了进程与线程。

下面就让我们简单来了解下CPU的硬件内存模型。

2. 硬件内存模型

CPU硬件模型

2.1 为什么需要缓存?

因为执行速度上,CPU远大于内存,加入缓存是为了解决CPU和内存之间速度不匹配的问题。

2.2 缓存的局部性原理

2.3 缓存一致性协议(MESI)的四种状态

2.4 CPU乱序执行优化

CPU为了进一步提高运算速度,会做出违背代码原有顺序的优化。

3. JMM(Java内存模型)

CPU内存模型如上所述,那么Java又是怎么工作的呢?

Java内存模型

这里的共享变量对应着就是JVM运行时数据区的堆和方法区。

而线程中独享的对应JVM运行时数据区中的就是栈、本地方法栈和PC寄存器。

另外需要说明JMM与JVM内存模型不是一个概念,这里需要注意下。JMM 的主要是控制程序中变量的访问规则,而JVM内存模型包涵了Java执行全流程的规则。

由上图可知,每个线程需要操作共享变量时,都会把共享变量拷贝一份到线程本地的工作内存中。操作完后,通过JVM协调控制下写回主存,从而实现了多线程的并发操作。

倘若没JVM的协调会发生什么情况呢?

A、B线程都将共享变量读入本地内存,A、B同时进行修改,由于A、B的工作内存是各自独享的,此时A、B将修改后的变量同时写回主存,那么问题来了,到底主存中的变量被修改成了A的还是B的呢?

我们将此类问题就称为线程安全问题。

4. 线程安全

什么是线程安全

当多个线程访问某个类时,不管运行时环境采用何种调度方式或者这些线程将如何交替执行,并且在主调代码中不需要任何额外的同步或协同,这个类都能表现出正确的行为,那么就称这个类是线程安全的。

线程安全的三大特性

4.1 原子性

Java一般通过CAS(compareAndSwap)synchronized保证原子性。

4.1.1 CAS的实现过程

CAS是compareAndSwap(比较与交换)的缩写。以AtomicInteger为例。

//1. AtomicInteger.java
//该方法调用unsafe类的getAndAddInt
public final int incrementAndGet() {
    return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}

//2. Unsafe.java
//可看到该方法调用的compareAndSwapInt是JNI方法
/**
 * 原子性的更新一个对象在偏移量为offset处的成员变量的值,或者原子性的更新一个数组在偏移量为var2处的元素的值。
 *
 * @param var1 更新成员变量的对象,或者更新元素的数组
 * @param var2 成员变量或者数组元素的偏移
 * @param var4 要增加到的量
 * @return 先前的值
 */
public final int getAndAddInt(Object var1, long var2, int var4) {
    int var5;
    do {
        //获取AtomicInteger对象在内存中偏移量为var2处的值
        var5 = this.getIntVolatile(var1, var2);
    } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

    return var5;
}

//3. Unsafe.java
/**
 * 如果Java变量的当前值是var4,则原子性的把该变量的值更新为var5。
 * @return 成功返回true
 * /
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

通过上面3步就实现了AtomicInteger的自增取值操作。

其中我们可以看到CAS的核心思路其实就是循环判断第3步JNI方法是否成功。因此我们总结下CAS的原理。

原理:先获主存内该变量的值。然后判断该变量在获取后的时间段内是否被其他线程更新,如果没有,就更新成指定的值,否则就再来一遍。

4.1.2 CAS存在的问题
4.1.3 synchronized的实现

synchronized是通过Java对象头中的monitor实现线程同步的,而monitor底层又是依赖操作系统的互斥锁实现的。

具体操作如下。

4.1.4 synchronized的锁优化

偏向锁 -> 轻量级锁 -> 重量级锁。

4.2 可见性

Java中可以通过synchronizedvolatile实现可见。

volatile的本质是告诉JVM当前工作内存中的值是不确定的,需要从主存中获取最新值。没有对变量加锁,因此volatile不保证原子性。

4.3 有序性

因为指令重排序的存在,JVM中执行的代码可能会乱序,因此Java内存模型具备一套先天的有序性原则,happens-before原则

5. 如何保证线程安全

5.1 单例模式

以上模式在单例模式中有详细解释,这里就不多说了。

5.2 不可变对象

满足条件

5.2.1 final关键字
5.2.2 不可变对象工具类

5.3 线程封闭

5.4 线程安全容器

5.4.1 同步容器

同步容器不严格保证安全,且加了synchronized修饰,性能较差,不建议生产使用

5.4.2 并发容器

并发容器性能较好,推荐使用

6. Java并发工具包(J.U.C)

6.1 AbstractQueuedSynchroizer(AQS)

AQS是J.U.C的核心。

提供了一个基于FIFO的队列,可以用于构建锁或者其他相关同步装置的基础框架。

6.1.1 AQS的数据结构

AQS队列

AQS中state标记值为1 时表示有线程占用,其他线程需要进入到同步队列等待。同步队列是一个双向链表。

当获得锁的线程需要等待某个条件时,会进入 condition 的等待队列。等待队列为单向列表且可以有多个。

当condition条件满足时,线程会从等待队列重新进入同步队列进行获取锁的竞争。

6.2 基于AQS的其他工具类

6.2.1 CountDownLatch

countdownlatch(图片来源于网络)

CountDownLatch 是通过一个计数器来实现的,计数器的初始化值为线程的数量。每当一个线程完成了自己的任务后,计数器的值就相应得减 1。当计数器到达 0 时,表示所有的线程都已完成任务,然后在闭锁上等待的线程就可以恢复执行任务。

6.2.2 Semaphore

Semaphore可以控制并发访问的线程个数。

Semaphore主要方法说明

6.2.3 CyclicBarrier

利用CyclicBarrier类可以实现一组线程相互等待,当所有线程都到达某个屏障点后再进行后续的操作

CyclicBarrier(图片来源于网络)

CyclicBarrier主要方法说明

6.2.4 ReentrantLock

ReentrantLock,可重入锁。

ReentrantLock与synchronized的区别

ReentrantLock独有功能

ReentrantLock使用

6.2.5 ReentrantReadWriteLock
6.2.6 StampedLock

StampedLock类,在JDK1.8时引入,是对读写锁ReentrantReadWriteLock的增强,该类提供了一些功能,优化了读锁、写锁的访问,同时使读写锁之间可以互相转换,更细粒度控制并发。

6.3 J.U.C拓展组件

6.3.1 FutureTask
6.3.2 Fork/Join

思想:将一个大的任务拆分成小的任务,然后多线程同时计算,提交计算效率。

demo

public class SumTask extends RecursiveTask<Integer> {

    public static final int threshold = 2;
    private int start;
    private int end;

    public SumTask(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        int sum = 0;

        //如果任务足够小就计算任务
        boolean canCompute = (end - start) <= threshold;
        if (canCompute) {
            for (int i = start; i <= end; i++) {
                sum += i;
            }
        } else {
            // 如果任务大于阈值,就分裂成两个子任务计算
            int middle = (start + end) / 2;
            SumTask leftTask = new SumTask(start, middle);
            SumTask rightTask = new SumTask(middle + 1, end);

            // 执行子任务
            leftTask.fork();
            rightTask.fork();

            // 等待任务执行结束合并其结果
            int leftResult = leftTask.join();
            int rightResult = rightTask.join();

            // 合并子任务
            sum = leftResult + rightResult;
        }
        return sum;
    }

    public static void main(String[] args) throws Exception {
        ForkJoinPool forkjoinPool = new ForkJoinPool();

        //生成一个计算任务,计算1+2+3+4
        SumTask task = new SumTask(1, 100);

        //执行一个任务
        Future<Integer> count = forkjoinPool.submit(task);
        System.out.println("count = "+count.get());
    }
}
6.3.3 BlockingQueue

BlockingQueue

阻塞队列的阻塞触发条件

特点

方法

操作类型抛出异常返回特殊值阻塞线程超时
插入add(e)offer(e)put(e)offer(e, time, unit)
删除remove()poll()take()poll(time, unit)
读取element()peek()//

实现类

7. 线程与线程池

7.1 线程几种状态

线程状态图

7.2 为什么使用线程池

7.2.1 直接new Thread的弊端
7.2.2 线程池的好处

7.3 线程池(ThreadPoolExecutor)

7.3.1 初始化参数
7.3.2 线程创建顺序

corePoolSize -> workQueue -> maximumPoolSize -> 拒绝

7.3.3 BlockingQueue(workQueue)

7.4 线程池的使用

7.4.1 线程池的状态
7.4.2 线程池的常用方法
7.4.3 线程池的监控方法

7.5 Executor框架接口

7.6 线程池合理配置