并发编程与高并发(一):线程安全性

概览

什么是线程安全

当多个线程访问某个类时,不管运行时环境采用何种调度方式或者这些进程将如何交替执行,并且在代码中不需要任何额外的同步或协同,这个类都能表现出正确的行为,那么这个类就是线程安全的。

线程安全三个特性

原子性:是指一次或者多次操作中,要么所有的操作全部都得到了执行并且不会受到任何因素的干扰而中断,要么全部回滚。

可见性:一个线程对主内存的修改可以及时的被其他线程观察到。

有序性:一个线程观察其他线程中的指令执行顺序,由于指令重排序的存在,该观察结果一般杂乱无序。

一、原子性

是指一次或者多次操作中,要么所有的操作全部都得到了执行并且不会受到任何因素的干扰而中断,要么全部回滚。

Atomic

先来看一段代码


import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class AtomicIntegerTestNotSafe {

    private static Integer count = 0;

    public static void main(String[] args) throws Exception {

        final Semaphore semaphore = new Semaphore(200);
        final CountDownLatch countDownLatch = new CountDownLatch(5000);

        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 5000; i++) {
            executorService.execute(() -> {
                try {

                    semaphore.acquire();

                    count++;

                    semaphore.release();

                    countDownLatch.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            });
        }
        // 主线程等待
        countDownLatch.await();
        executorService.shutdown();
        System.out.println("count = " + count); // count = 4777
    }
}

理论上我们希望最后输出的 count 是5000,但是实际上输出的值不到5000。

这个类就是线程不安全的类,那么如何解决这个问题呢,使用Atomic包下面的类来避免出现这种问题。


import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class AtomicIntegerTestSafe {

    private static AtomicInteger count = new AtomicInteger(0);

    public static void main(String[] args) throws Exception{

        final Semaphore semaphore = new Semaphore(200);
        final CountDownLatch countDownLatch = new CountDownLatch(5000);

        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 5000; i++) {
            executorService.execute(() -> {
                try {

                    semaphore.acquire();

                    count.incrementAndGet();

                    semaphore.release();

                    countDownLatch.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            });
        }
        // 主线程等待
        countDownLatch.await();
        executorService.shutdown();
        System.out.println("count = " + count); // count = 5000
    }

}

通过源码来看看 AtomicInteger 是如何保证线程安全的

incrementAndGet()

    public final int incrementAndGet() {
        return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
    }

getAndAddInt(Object var1, long var2, int var4)

    public final int getAndAddInt(Object var1, long var2, int var4) {
        int var5;
        do {
            var5 = this.getIntVolatile(var1, var2);
        // 重点在于 compareAndSwapInt 这个方法
        } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

        return var5;
    }

这个方法采用CAS来保证线程安全,那么什么是CAS呢,要了解CAS之前首先要讲解一下乐观锁和悲观锁

悲观锁的更新认为在更新数据的时候大概率会有其他线程争夺共享资源,所以悲观锁的做法是第一个获取资源的线程会将资源锁定起来,其他没竞争到资源的线程只能进入阻塞队列,等第一个获取资源的线程释放锁之后,这些线程才能有机会重新争夺资源,悲观锁的典型实现就是 synchronized。synchronized 操作起来简单,但是会使没抢到资源的线程进入阻塞状态,线程在阻塞状态和就绪状态之间的切换效率较低,可能会出现线程状态间的切换比更新操作还要耗时的场景。

乐观锁的更新认为:在更新数据的时候其他线程争抢这个共享变量的概率比较小,所以更新数据的时候不会对共享数据加锁。但是在正式更新数据之前会检查数据是否被其他线程改变过,如果未被其他线程改变过就将共享变量更新成最新值,如果发现共享变量已经被其他线程更新过了,就重新尝试,直到成功为止。乐观锁的典型实现就是CAS。

CAS中有三个核心的参数:

主内存中存放的共享变量的值V(一般情况这个V是内存的地址,下图直接写成地址中的值)

工作内存中共享变量的副本值A,也叫做预期值

共享变量的新值B

下图说明两个线程同时为 count = 10 进行 + 1 操作

CAS会造成三个问题

1.长时间自旋导致系统开销过大

2.只能保证一个共享变量的原子操作(JDK的 AtomicReference 来保证应用对象之间的原子性,可以把多个变量放在一个对象里来进行CAS操作,解决了这一问题)

3.著名的ABA问题

什么是ABA问题?

什么意思呢?

在你非常渴的情况下你发现一个盛满水的杯子,你一饮而尽。之后再给杯子里重新倒满水。然后你离开,当杯子的真正主人回来时看到杯子还是盛满水,他当然不知道是否被人喝完重新倒满。

下图示例线程3对 count = 10 进行 + 1 操作

但是进行到一半的时候 线程1 对10 + 1成功 count = 11

线程2对 count = 11 进行 -1 操作 成功 count = 10

对于 AtomicInteger 等类,没有什么可修改的属性,我们只在意结果值,所以就算发生了ABA问题也不会造成什么影响。

但是对于其他场景,在ABA中丢失的是时间,以及时间带来的业务含义。

LongAdder

LongAdderAtomicInteger 类似不同的是 LongAdder 中会维护一个或多个变量,当多线程同时add值时,为了减少竞争,可能会动态地增加这组变量的数量,返回这组变量的和。

在并发比较小的时候 AtomicIntegerLongAdder 效率基本相似

在并发比较大的时候 LongAdder 的效率会明显提升

AtomicBoolean

booleanBoolean 都是线程不安全的,在并发的情况下会产生问题

AtomicBoolean 底层也是采用了CAS来保证了线程安全

使用Boolean导致线程不安全的写法,实际上进行了很多条的sout打印


import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class AtomicBooleanTestNotSafe {
    private static Boolean flag = Boolean.TRUE;

    public static void main(String[] args) throws Exception {
        final Semaphore semaphore = new Semaphore(200);
        final CountDownLatch countDownLatch = new CountDownLatch(5000);

        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 5000; i++) {
            executorService.execute(() -> {
                try {
                    semaphore.acquire();

                    if (flag) {
                        // 这里的代码只执行一次
                        System.out.println("这里的代码只执行一次");
                        flag = false;
                    }
                    semaphore.release();
                    countDownLatch.countDown();

                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
        countDownLatch.await();
        executorService.shutdown();
    }
}

下面这个例子说明用 AtomicBoolean 来确保代码在多线程的情况下只会执行一次

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;


public class AtomicBooleanTestSafe {
    private static AtomicBoolean flag = new AtomicBoolean(true);

    public static void main(String[] args) throws Exception {
        final Semaphore semaphore = new Semaphore(200);
        final CountDownLatch countDownLatch = new CountDownLatch(5000);

        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 5000; i++) {
            executorService.execute(() -> {
                try {
                    semaphore.acquire();

                    if (flag.compareAndSet(true, false)) {
                        // 这里的代码只执行一次
                        System.out.println("这里的代码只执行一次");
                    }
                    semaphore.release();
                    countDownLatch.countDown();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
        countDownLatch.await();
        executorService.shutdown();
    }
}

AtomicReference

AtomicReferenceAtomicInteger 非常类似,不同之处就在于 AtomicInteger 是对整数的封装,而 AtomicReference 则对应普通的对象引用。可以保证在修改对象引用时的安全性。

import java.util.concurrent.atomic.AtomicReference;

public class AtomicReferenceTestNotSafeAndSafe {

    public static void main(String[] args) {
        Person p1 = new Person("p1");
        Person p2 = new Person("p2");

        AtomicReference<Person> atomicReference = new AtomicReference<>(p1);
        atomicReference.compareAndSet(p1, p2);
        // p1对象的引用没有改变,所以可以更新成功
        System.out.println(atomicReference.get().getName());


        Person pp1 = new Person("pp1");
        Person pp2 = new Person("pp2");

        AtomicReference<Person> atomicReference2 = new AtomicReference<>(pp1);
        pp1 = new Person("new person");
        atomicReference2.compareAndSet(pp1, pp2);
        // pp1对象的引用已经改变,所以没有更新成功
        System.out.println(atomicReference2.get().getName());
    }
    static class Person {
        private String name;

        public Person(String name) {
            this.name = name;
        }
        public String getName() {
            return name;
        }
    }
}

AtomicIntegerFieldUpdater

AtomicInteger 在并发情况下只能保证静态变量的线程安全,并不能保证一个对象中number类型的线程安全

AtomicIntegerFieldUpdater用来在并发的情况下对一个对象中的number类型进行更新操作

number类型的字段必须用 volatile 修饰并且不能被 static 修饰

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

public class AtomicIntegerFieldUpdaterTestSafe {

    static final AtomicIntegerFieldUpdater<User> atomicIntegerFieldUpdater = AtomicIntegerFieldUpdater.newUpdater(User.class, "age");

    public static void main(String[] args) {
        User u1 = new User("Jack", 10);

        if (atomicIntegerFieldUpdater.compareAndSet(u1, 10, 20)) {
            System.out.println("执行1");
        }
        if (atomicIntegerFieldUpdater.compareAndSet(u1, 10, 30)) {
            System.out.println("执行2");
        }
        System.out.println("u1.age = " + u1.getAge());
    }
    static class User {
        public String name;
        public volatile int age;

        public User(String name, int age) {
            this.name = name;
            this.age = age;
        }

        public int getAge() {
            return age;
        }
    }

}

AtomicStampReference

是通过增加版本号来解决CAS的ABA问题,用法和其他Atomic类相似

下面是AtomicStampReference的核心方法,里面多了一个stamp的比较,这个stamp也就是版本号

    public boolean compareAndSet(V expectedReference, V newReference, int expectedStamp, int newStamp) {
        
        AtomicStampedReference.Pair<V> current = pair;
        
        return expectedReference == current.reference 
                && expectedStamp == current.stamp 
                && ((newReference == current.reference 
                && newStamp == current.stamp) || casPair(current, AtomicStampedReference.Pair.of(newReference, newStamp)));
    }

AtomicLongArray

AtomicLongArrayAtomicInteger 类似,api只是多了一个索引值用来更新数组中的元素

    public final boolean compareAndSet(int i, long expect, long update) {
        return compareAndSetRaw(checkedByteOffset(i), expect, update);
    }

Synchronized

Synchronized 是Java语言的关键字,由JVM来实现的

Synchronized 在线程发生异常时会自动释放锁,不会发生异常死锁

当调用一个父类有 synchorinzed 修饰的方法时,子类的这个方法并不会被 synchronized 修饰

synchronized作用范围与作用对象
作用范围作用对象
修饰代码块大括号括起来的代码,作用于调用的对象
修饰实例方法整个方法,作用于调用的对象
修饰静态方法整个静态方法,作用于所有对象
修饰类括号括起来的部分,作用于所有对象

修饰代码块和修饰实例方法

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 *  修饰代码块
 */
public class SynchronizedTest1 {
    public static void main(String[] args) throws InterruptedException {

        SynchronizedTest1 s1 = new SynchronizedTest1();
        ExecutorService threadPool1 = Executors.newCachedThreadPool();
        threadPool1.execute(s1::codeBlock);
        threadPool1.execute(s1::codeBlock);
        threadPool1.shutdown();

        Thread.sleep(1000);
        System.out.println("------------------------这是一个分割线------------------------");

        SynchronizedTest1 s2 = new SynchronizedTest1();
        ExecutorService threadPool2 = Executors.newCachedThreadPool();
        threadPool2.execute(s2 ::objectMethod);
        threadPool2.execute(s2 ::objectMethod);
        threadPool2.shutdown();
    }

    /**
     * 修饰一个代码块,作用于调用对象
     */
    public void codeBlock() {
        synchronized (this){
            for (int i = 0; i < 10; i++) {
                System.out.println(Thread.currentThread().getName() + "-codeBlock-" + i + "  ");
            }
            System.out.println();
        }
    }

    /**
     * 修饰实例方法,作用于调用对象
     */
    public synchronized void objectMethod() {
        for (int i = 0; i < 10; i++) {
            System.out.println(Thread.currentThread().getName() + "-objectMethod-" + i + "  ");
        }
        System.out.println();
    }

}

修饰静态方法和修饰类

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SynchronizedTest2 {
    public static void main(String[] args) throws InterruptedException {
        SynchronizedTest2 s1 = new SynchronizedTest2();
        SynchronizedTest2 s2 = new SynchronizedTest2();


        ExecutorService threadPool1 = Executors.newCachedThreadPool();
        threadPool1.execute(s1::codeBlock);
        threadPool1.execute(s2::codeBlock);
        threadPool1.shutdown();



        ExecutorService threadPool2 = Executors.newCachedThreadPool();
        threadPool2.execute(() -> s1.objectMethod());
        threadPool2.execute(() -> s2.objectMethod());
        threadPool2.shutdown();


    }

    /**
     * 修饰静态方法,作用于所有对象
     */
    public static   synchronized void objectMethod() {
        for (int i = 0; i < 10; i++) {
            System.out.println(Thread.currentThread().getName() + "-objectMethod-" + i + "  ");
        }
        System.out.println();
    }

    /**
     * 修饰一个类,作用于所有对象
     */
    public void codeBlock() {
        synchronized (SynchronizedTest2.class){
            for (int i = 0; i < 10; i++) {
                System.out.println(Thread.currentThread().getName() + "-codeBlock-" + i + "  ");
            }
            System.out.println();
        }
    }

}

Lock

Lock是一个接口,依赖特殊的CPU指令,是JDK实现的

Lock在线程发生异常时不会自动释放锁,需要在finally中实现释放锁

原子性对比

synchronized不可中断锁,适合竞争不激烈,可读性好
Lock可中断锁,多样化同步,竞争激烈时能维持常态
Atomic竞争激烈时能维持常态,比Lock性能好;只能同步一个值

二、可见性

一个线程对主内存的修改可以及时的被其他线程观察到。

共享变量在线程间不可见的原因

1.线程交叉执行

2.重排序结合线程交叉执行

3.共享变量更新后的值没有在工作内存与主存间及时更新

可见性之synchronized

JMM关于synchronized的两条规定

线程解锁前,必须把共享变量的最新值刷新到主内存

线程加锁时,将清空工作内存中共享变量的值,从而使用共享变量时需要从主内存中重新读取最新的值(注意,加锁与解锁是同一把锁)。

可见性之volatile

对volatile变量写操作时,会在写操作后加入一条store屏障指令,将本地内存中的共享变量值刷新到主内存

对volatile变量读操作时,会在读操作前加入一条load屏障指令,从主内存中读取共享变量

三、有序性

Java内存模型中,允许编译器和处理器对指令进行重排序,但是重排序过程不会影响到单线程程序的执行,却会影响到多线程并发执行的正确性 

happens-before原则

程序次序规则:一个线程内,按照代码顺序,书写在前面的操作先行于发生于书写在后面的操作

锁定规则:一个unLock现行发生于后面对同一个锁的lock操作

volatile变量规则:对一个变量的写操作先行发生于后面对这个变量的读操作

传递规则:如果操作A现行发生于操作B,而操作B又先行发生于操作C,则可以得出操作A先行发生于操作C

线程启动规则:Thread对象的start()方法先行发生于此线程的每一个动作

线程中断规则:对线程interrupt()方法的调用先行发生于被中断线程的代码检测到中断事件的发生

线程终结规则:线程中所有的操作都限行发生于线程的终止检测,我们可以通过Thread.join()方法结束、Thread.isAlive()的返回值手段检测到线程已经终止执行

对象终结规则:一个对象的初始化完成先行发生于他的finalize()方法的开始

如果两个操作的执行次序无法从happens-before原则中推导出来,那就无法保证有序性,虚拟机就可以随意的进行重排序。


版权声明:本文为qq_42163058原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。