# 并发安全

loading

# 一、什么是线程安全性

当多个线程访问某个类时,不管运行时环境采用何种调度方式或者这些线程将如何交替执行,并且在调用代码中不需要任何额外的同步或者协同,这个类都能表现出正确的行为,那么就称这个类是线程安全的。

# 1、线程封闭

实现好的并发是一件困难的事情,所以很多时候我们都想躲避并发。避免并发最简单的方法就是线程封闭。什么是线程封闭呢?

就是把对象封装到一个线程里,只有这一个线程能看到此对象。那么这个对象就算不是线程安全的也不会出现任何安全问题。实现线程封闭有哪些方法呢?

# (1) ad-hoc 线程封闭

这是完全靠实现者控制的线程封闭,他的线程封闭完全靠实现者实现。Ad-hoc 线程封闭非常脆弱,应该尽量避免使用。

# (2) 栈封闭

栈封闭是我们编程当中遇到的最多的线程封闭。什么是栈封闭呢?简单的说就是局部变量。多个线程访问一个方法,此方法中的局部变量都会被拷贝一份到线程栈中。所以局部变量是不被多个线程所共享的,也就不会出现并发问题。所以能用局部变量就别用全局的变量,全局变量容易引起并发问题。

# 2、无状态的类

没有任何成员变量的类,就叫无状态的类,这种类一定是线程安全的。例如:

public class StatelessClass {

    public int service(int a, int b) {
        return a + b;
    }

    public void serviceUser(UserVo user) {
        // do sth user
    }

}
1
2
3
4
5
6
7
8
9
10
11
public class UserVo {

    private int age;

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12

在这个类中使用了对象 UserVo,那么它还是线程安全的吗?当然还是。因为多线程下,固然 user 这个对象的实例会不正常,但是对于 StatelessClass 这个类的对象实例来说,它并不持有 UserVo 的对象实例,它自己并不会有问题,有问题的是 UserVo 这个类,而非 StatelessClass 本身。

# 3、让类不可变

让状态不可变,有两种方式:

  1. final 关键字,对于一个类,所有的成员变量应该是私有的,同样的只要有可能,所有的成员变量应该加上 final 关键字,但是加上 final,要注意如果成员变量又是一个对象时,这个对象所对应的类也要是不可变,才能保证整个类是不可变的。
public class ImmutableClass {

    private final int a;

    public ImmutableClass(int a) {
        this.a = a;
    }

    public int getA() {
        return a;
    }

    public static class User {
        private int age;

        public int getAge() {
            return age;
        }

        public void setAge(int age) {
            this.age = age;
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
  1. 根本就不提供任何可供修改成员变量的地方,同时成员变量也不作为方法的返回值。
public class ImmutableClassTwo {

    private List<Integer> list = new ArrayList<>(3);

    public ImmutableClassTwo() {
        list.add(1);
        list.add(2);
        list.add(3);
    }

    public boolean isContain(int i) {
        return list.contains(i);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

但是要注意,一旦类的成员变量中有对象,上述的 final 关键字保证不可变并不能保证类的安全性,为什么呢?因为在多线程下,虽然对象的引用不可变,但是对象在堆上的实例是有可能被多个线程同时修改的,没有正确处理的情况下,对象实例在堆中的数据是不可预知的。这就牵涉到了如何安全的发布对象这个问题。如下:

public class ImmutableClass {

    private final int a;

    /**
     * user 不安全
     */
    private final UserVo user = new UserVo();

    public ImmutableClass(int a) {
        this.a = a;
    }

    public int getA() {
        return a;
    }

    public UserVo getUser() {
        return user;
    }

    public static class User {
        private int age;

        public int getAge() {
            return age;
        }

        public void setAge(int age) {
            this.age = age;
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

# volatile

volatile 并不能保证类的线程安全性,只能保证类的可见性,最适合一个线程写,多个线程读的情景。

# 4、加锁和 CAS

我们最常使用的保证线程安全的手段,使用 synchronized 关键字,使用显式锁,使用各种原子变量,修改数据时使用 CAS 机制等等。

# 5、安全的发布

类中持有的成员变量,如果是基本类型,发布出去,并没有关系,因为发布出去的其实是这个变量的一个副本,例如:

public class SafePublish {

    private int i;

    public SafePublish() {
        i = 2;
    }

    public int getI() {
        return i;
    }

    public static void main(String[] args) {
        SafePublish safePublish = new SafePublish();
        int j = safePublish.getI();
        System.out.println("before set,  j = " + j);
        j = 3;
        System.out.println("after set, j = " + j);
        System.out.println("getI = " + safePublish.getI());
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

执行结果:

before set,  j = 2
after set, j = 3
getI = 2
1
2
3

但是如果类中持有的成员变量是对象的引用,如果这个成员对象不是线程安全的,通过 get() 等方法发布出去,会造成这个成员对象本身持有的数据在多线程下不正确的修改,从而造成整个类线程不安全的问题。例如:

public class UnSafePublish {

    private List<Integer> list = new ArrayList<>(3);

    public UnSafePublish() {
        list.add(1);
        list.add(2);
        list.add(3);
    }

    public List<Integer> getList() {
        return list;
    }

    public static void main(String[] args) {
        UnSafePublish unSafePublish = new UnSafePublish();
        List<Integer> list = unSafePublish.getList();
        System.out.println("before add, list = " + list);
        list.add(4);
        System.out.println("after add, list = " + list);
        System.out.println("getList = " + unSafePublish.getList());
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

执行结果:

before add, list = [1, 2, 3]
after add, list = [1, 2, 3, 4]
getList = [1, 2, 3, 4]
1
2
3

可以看出,这个 list 发布出去后,是可以被外部线程之间修改,那么在多个线程同时修改的情况下不安全问题是肯定存在的,怎么修正这个问题呢?我们在发布这对象出去的时候,就应该用线程安全的方式包装这个对象。如下:

public class SafePublishTwo {

    private List<Integer> list = Collections.synchronizedList(new ArrayList<>(3));

    public SafePublishTwo() {
        list.add(1);
        list.add(2);
        list.add(3);
    }

    public List<Integer> getList() {
        return list;
    }

    public static void main(String[] args) {
        SafePublishTwo safePublishTwo = new SafePublishTwo();
        List<Integer> list = safePublishTwo.getList();
        System.out.println("before add, list = " + list);
        list.add(4);
        System.out.println("after add, list = " + list);
        System.out.println("getList = " + safePublishTwo.getList());
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

执行结果:

before add, list = [1, 2, 3]
after add, list = [1, 2, 3, 4]
getList = [1, 2, 3, 4]
1
2
3

虽然执行结果没有什么变化,但是将 listCollections.synchronizedList 进行包装以后,无论多少线程使用这个 list,就都是线程安全的了。

对于我们自己使用或者声明的类,jdk 自然没有提供这种包装类的办法,但是我们可以仿造这种模式或者委托给线程安全的类,当然,对这种通过 get() 等方法发布出去的对象,最根本的解决办法还是应该在实现上就考虑到线程安全问题,例如:

public class SafePublicUser {

    private final UserVo user;

    public UserVo getUser() {
        return user;
    }

    public SafePublicUser(UserVo user) {
        this.user = new SyncUser(user);
    }

    private static class SyncUser extends UserVo {

        private final UserVo userVo;
        private final Object lock = new Object();

        public SyncUser(UserVo userVo) {
            this.userVo = userVo;
        }

        @Override
        public int getAge() {
            synchronized (lock) {
                return userVo.getAge();
            }
        }

        @Override
        public void setAge(int age) {
            synchronized (lock) {
                userVo.setAge(age);
            }
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36

SyncUser 继承 UserVo,在实际上发布的是 SyncUser 对象,在 SyncUser 内部使用 synchronized 加锁进行了安全控制。

如果 UserVofinal 的无法继承时怎么办呢,例如:

public final class FinalUserVo {

    private int age;

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12

这时就可以使用代理的方式:

public class SafePublicFinalUser {

    private final SyncFinalUser user;

    public SyncFinalUser getUser() {
        return user;
    }

    public SafePublicFinalUser(FinalUserVo user) {
        this.user = new SyncFinalUser(user);
    }

    public static class SyncFinalUser {

        private final FinalUserVo userVo;
        private final Object lock = new Object();

        public SyncFinalUser(FinalUserVo userVo) {
            this.userVo = userVo;
        }

        public int getAge() {
            synchronized (lock) {
                return userVo.getAge();
            }
        }

        public void setAge(int age) {
            synchronized (lock) {
                userVo.setAge(age);
            }
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

SyncFinalUser 内部持有一个 FinalUserVo 的引用,实际发布出去的还是安全的 SyncFinalUser 对象。

# 6、TheadLocal

ThreadLocal 是实现线程封闭的最好方法。ThreadLocal 内部维护了一个 MapMapkey 是每个线程的名称,而 Map 的值就是我们要封闭的对象。每个线程中的对象都对应着 Map 中一个值,也就是 ThreadLocal 利用 Map 实现了对象的线程封闭。

# 7、Servlet 辨析

Servlet 不是线程安全的类,为什么我们平时没感觉到呢,因为

  1. 在需求上,很少有共享的需求
  2. 接收到了请求,返回应答的时候,一般都是由一个线程来负责的。但是只要 Servlet 中有成员变量,一旦有多线程下的写,就很容易产生线程安全问题。

# 二、死锁

# 1、概念

是指两个或两个以上的进程在执行过程中,由于竞争资源或者由于彼此通信而造成的一种阻塞的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁。

死锁是必然发生在多操作者(M >= 2 个)情况下,争夺多个资源(N >= 2 个,且 N <= M)才会发生这种情况。很明显,单线程自然不会有死锁,同时,死锁还有一个重要的要求,争夺资源的顺序不对,如果争夺资源的顺序是一样的,也不会产生死锁。

# 学术化的定义

死锁的发生必须具备以下四个必要条件:

  1. 互斥条件

指进程对所分配到的资源进行排它性使用,即在一段时间内某资源只由一个进程占用。如果此时还有其它进程请求资源,则请求者只能等待,直至占有资源的进程用毕释放。

  1. 请求和保持条件

指进程已经保持至少一个资源,但又提出了新的资源请求,而该资源已被其它进程占有,此时请求进程阻塞,但又对自己已获得的其它资源保持不放。

3)不剥夺条件

指进程已获得的资源,在未使用完之前,不能被剥夺,只能在使用完时由自己释放。

4)环路等待条件

指在发生死锁时,必然存在一个进程——资源的环形链,即进程集合 {P0,P1,P2,···,Pn} 中的 P0 正在等待一个 P1 占用的资源;P1 正在等待 P2 占用的资源…… Pn 正在等待已被 P0 占用的资源。

理解了死锁的原因,尤其是产生死锁的四个必要条件,就可以最大可能地避免、预防和解除死锁。只要打破四个必要条件之一就能有效预防死锁的发生。打破互斥条件:改造独占性资源为虚拟资源,大部分资源已无法改造。打破不可抢占条件:当一进程占有一独占性资源后又申请一独占性资源而无法满足,则退出原占有的资源。打破占有且申请条件:采用资源预先分配策略,即进程运行前申请全部资源,满足则运行,不然就等待,这样就不会占有且申请。打破循环等待条件:实现资源有序分配策略,对所有设备实现分类编号,所有进程只能采用按序号递增的形式申请资源。

避免死锁常见的算法有有序资源分配法、银行家算法。

# 2、现象、危害和解决

数据库里多事务而且要同时操作多个表的情况下会产生死锁的情况。所以数据库设计的时候就考虑到了检测死锁和从死锁中恢复的机制。比如 oracle 提供了检测和处理死锁的语句,而 mysql 也提供了“循环依赖检测的机制。

# (1) 现象

# 简单顺序死锁

下面看一个死锁的例子:

public class NormalDeadLock {

    /**
     * 第一个锁
     */
    private static Object valueFirst = new Object();

    /**
     * 第二个锁
     */
    private static Object valueSecond = new Object();

    /**
     * 先拿第一个锁,再拿第二个锁
     *
     * @throws InterruptedException
     */
    private static void firstToSecond() throws InterruptedException {
        String threadName = Thread.currentThread().getName();
        synchronized (valueFirst) {
            System.out.println(threadName + " get 1st");
            Thread.sleep(100);
            synchronized (valueSecond) {
                System.out.println(threadName + " get 2nd");
            }
        }
    }

    /**
     * 先拿第二个锁,再拿第一个锁
     *
     * @throws InterruptedException
     */
    private static void SecondToFirst() throws InterruptedException {
        String threadName = Thread.currentThread().getName();
        synchronized (valueSecond) {
            System.out.println(threadName + " get 2nd");
            Thread.sleep(100);
            synchronized (valueFirst) {
                System.out.println(threadName + " get 1st");
            }
        }
    }

    private static class TestThread extends Thread {

        private String name;

        public TestThread(String name) {
            this.name = name;
        }

        public void run() {
            Thread.currentThread().setName(name);
            try {
                SecondToFirst();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        Thread.currentThread().setName("TestDeadLock");
        TestThread testThread = new TestThread("SubTestThread");
        testThread.start();
        try {
            firstToSecond();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73

运行结果:

TestDeadLock get 1st
SubTestThread get 2nd
1
2

两个进程互相等待。要想解决死锁的问题,只需要让两个线程拿锁的顺序相同即可:

public class NormalDeadLock {

    /**
     * 第一个锁
     */
    private static Object valueFirst = new Object();

    /**
     * 第二个锁
     */
    private static Object valueSecond = new Object();

    /**
     * 先拿第一个锁,再拿第二个锁
     *
     * @throws InterruptedException
     */
    private static void firstToSecond() throws InterruptedException {
        String threadName = Thread.currentThread().getName();
        synchronized (valueFirst) {
            System.out.println(threadName + " get 1st");
            Thread.sleep(100);
            synchronized (valueSecond) {
                System.out.println(threadName + " get 2nd");
            }
        }
    }

    /**
     * 先拿第二个锁,再拿第一个锁
     *
     * @throws InterruptedException
     */
    private static void SecondToFirst() throws InterruptedException {
        String threadName = Thread.currentThread().getName();
        synchronized (valueFirst) {
            System.out.println(threadName + " get 1st");
            Thread.sleep(100);
            synchronized (valueSecond) {
                System.out.println(threadName + " get 2nd");
            }
        }
    }

    private static class TestThread extends Thread {

        private String name;

        public TestThread(String name) {
            this.name = name;
        }

        public void run() {
            Thread.currentThread().setName(name);
            try {
                SecondToFirst();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        Thread.currentThread().setName("TestDeadLock");
        TestThread testThread = new TestThread("SubTestThread");
        testThread.start();
        try {
            firstToSecond();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73

运行结果:

TestDeadLock get 1st
TestDeadLock get 2nd
SubTestThread get 1st
SubTestThread get 2nd
1
2
3
4
# 动态顺序死锁

模拟一个转账的业务,用户账户类:

public class UserAccount {

    /**
     * 账户名称
     */
    private final String name;

    /**
     * 账户余额l
     */
    private int money;

    private final Lock lock = new ReentrantLock();

    public Lock getLock() {
        return lock;
    }

    public UserAccount(String name, int amount) {
        this.name = name;
        this.money = amount;
    }

    public String getName() {
        return name;
    }

    public int getAmount() {
        return money;
    }

    @Override
    public String toString() {
        return "UserAccount{" +
                "name='" + name + '\'' +
                ", money=" + money +
                '}';
    }

    /**
     * 转入资金
     *
     * @param amount
     */
    public void addMoney(int amount) {
        money = money + amount;
    }

    /**
     * 转出资金
     *
     * @param amount
     */
    public void flyMoney(int amount) {
        money = money - amount;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57

转账接口:

public interface ITransfer {

    void transfer(UserAccount from, UserAccount to, int amount) throws InterruptedException;
}
1
2
3
4

先来看第一种实现:

public class TransferAccount implements ITransfer {

    @Override
    public void transfer(UserAccount from, UserAccount to, int amount) throws InterruptedException {
        synchronized (from) {
            System.out.println(Thread.currentThread().getName() + " get " + from.getName());
            Thread.sleep(100);
            synchronized (to) {
                System.out.println(Thread.currentThread().getName() + " get " + to.getName());
                from.flyMoney(amount);
                to.addMoney(amount);
            }
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

业务逻辑:

public class PayCompany {

    /**
     * 执行转账动作的线程
     */
    private static class TransferThread extends Thread {

        private String name;
        private UserAccount from;
        private UserAccount to;
        private int amount;
        private ITransfer transfer;

        public TransferThread(String name, UserAccount from, UserAccount to, int amount, ITransfer transfer) {
            this.name = name;
            this.from = from;
            this.to = to;
            this.amount = amount;
            this.transfer = transfer;
        }

        public void run() {
            Thread.currentThread().setName(name);
            try {
                transfer.transfer(from, to, amount);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }

    public static void main(String[] args) {
        UserAccount zhangsan = new UserAccount("zhangsan", 20000);
        UserAccount lisi = new UserAccount("lisi", 20000);
        ITransfer transfer = new TransferAccount();

        TransferThread zhangsanToLisi = new TransferThread("zhangsanToLisi",
                zhangsan, lisi, 2000, transfer);
        TransferThread lisiToZhangsan = new TransferThread("lisiToZhangsan",
                lisi, zhangsan, 4000, transfer);

        zhangsanToLisi.start();
        lisiToZhangsan.start();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46

执行结果:

zhangsanToLisi get zhangsan
lisiToZhangsan get lisi
1
2

产生了死锁,虽然在实现中固定了获取锁的先后顺序,但是在调用方法的时候,由于两个参数的顺序不同造成了获取锁的顺序不同,进而造成了死锁。

下面看第一种解决方案:

public class SafeOperation implements ITransfer {

    /**
     * 第三把锁
     */
    private static Object tieLock = new Object();

    @Override
    public void transfer(UserAccount from, UserAccount to, int amount) throws InterruptedException {
        int fromHash = System.identityHashCode(from);
        int toHash = System.identityHashCode(to);

        if (fromHash < toHash) {
            synchronized (from) {
                System.out.println(Thread.currentThread().getName() + " get " + from.getName());
                Thread.sleep(100);
                synchronized (to) {
                    System.out.println(Thread.currentThread().getName() + " get " + to.getName());
                    from.flyMoney(amount);
                    to.addMoney(amount);
                    System.out.println(from);
                    System.out.println(to);
                }
            }
        } else if (toHash < fromHash) {
            synchronized (to) {
                System.out.println(Thread.currentThread().getName() + " get " + to.getName());
                Thread.sleep(100);
                synchronized (from) {
                    System.out.println(Thread.currentThread().getName() + " get " + from.getName());
                    from.flyMoney(amount);
                    to.addMoney(amount);
                    System.out.println(from);
                    System.out.println(to);
                }
            }
        } else {
            synchronized (tieLock) {
                synchronized (from) {
                    synchronized (to) {
                        from.flyMoney(amount);
                        to.addMoney(amount);
                    }
                }
            }
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48

PayCompany 使用 SafeOperation 这个实现后的执行结果:

zhangsanToLisi get zhangsan
zhangsanToLisi get lisi
UserAccount{name='zhangsan', money=18000}
UserAccount{name='lisi', money=22000}
lisiToZhangsan get zhangsan
lisiToZhangsan get lisi
UserAccount{name='lisi', money=18000}
UserAccount{name='zhangsan', money=22000}
1
2
3
4
5
6
7
8

通过对比两个锁的 hash 值后来决定先后顺序而解决了死锁问题。

第二种解决方案:

public class SafeOperateTwo implements ITransfer {

    @Override
    public void transfer(UserAccount from, UserAccount to, int amount) throws InterruptedException {
        Random r = new Random();
        while (true) {
            if (from.getLock().tryLock()) {
                System.out.println(Thread.currentThread().getName() + " get " + from.getName());
                try {
                    if (to.getLock().tryLock()) {
                        try {
                            System.out.println(Thread.currentThread().getName() + " get " + to.getName());
                            from.flyMoney(amount);
                            to.addMoney(amount);
                            System.out.println(from);
                            System.out.println(to);
                            break;
                        } finally {
                            to.getLock().unlock();
                        }
                    }
                } finally {
                    from.getLock().unlock();
                }

            }
            // 使用休眠可以错开两个线程获取锁的时间
            Thread.sleep(r.nextInt(2));
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

PayCompany 使用 SafeOperationTwo 这个实现后的执行结果:

lisiToZhangsan get lisi
zhangsanToLisi get zhangsan
lisiToZhangsan get lisi
zhangsanToLisi get zhangsan
zhangsanToLisi get zhangsan
lisiToZhangsan get lisi
lisiToZhangsan get zhangsan
UserAccount{name='lisi', money=16000}
UserAccount{name='zhangsan', money=24000}
zhangsanToLisi get zhangsan
zhangsanToLisi get lisi
UserAccount{name='zhangsan', money=22000}
UserAccount{name='lisi', money=18000}
1
2
3
4
5
6
7
8
9
10
11
12
13

这里使用了每个账户类中的 Lock 锁,并且通过 tryLock() 方法尝试性获取锁,当第一把锁获取成功后立刻 tryLock() 第二把锁,当两个锁都获取到后进行业务逻辑,当任意一个锁没有获取成功的时候就会进入到 finally 中释放锁。

通过一个短暂的随机休眠可以错开多个线程获取锁的时间。

# (2) 危害

  1. 线程不工作了,但是整个程序还是活着的
  2. 没有任何的异常信息可以供我们检查。
  3. 一旦程序发生了发生了死锁,是没有任何的办法恢复的,只能重启程序,对生产平台的程序来说,这是个很严重的问题。
# 实际工作中的死锁

时间不定,不是每次必现;一旦出现没有任何异常信息,只知道这个应用的所有业务越来越慢,最后停止服务,无法确定是哪个具体业务导致的问题;测试部门也无法复现,并发量不够。

# (3) 解决

  1. 定位

通过 java 安装好的 bin 目录中的工具 jps,使用 jps -v 就可以查询正在运行的 java 应用的 id

jps -v

再通过 jstack 10440 查看应用的锁持有情况:

jstack id

  1. 修正

关键是保证拿锁的顺序一致,有两种解决方式:

1、内部通过顺序比较,确定拿锁的顺序;

2、采用尝试拿锁的机制。

# 三、其它安全问题

# 1、活锁

两个线程在尝试拿锁的机制中,发生多个线程之间互相谦让,不断发生同一个线程总是拿到同一把锁,在尝试拿另一把锁时因为拿不到,而将本来已经持有的锁释放的过程。例如上面写的 SafeOperateTwo 类如果不错开线程拿锁的时间的话,有可能会产生活锁。

解决办法:每个线程休眠随机数,错开拿锁的时间。

# 2、线程饥饿

低优先级的线程,总是拿不到执行时间。

# 四、并发下的性能

使用并发的目标是为了提高性能,引入多线程后,其实会引入额外的开销,如线程之间的协调、增加的上下文切换,线程的创建和销毁,线程的调度等等。过度的使用和不恰当的使用,会导致多线程程序甚至比单线程还要低。

衡量应用的程序的性能:服务时间,延迟时间,吞吐量,可伸缩性等等,其中服务时间,延迟时间(多快),吞吐量(处理能力的指标,完成工作的多少)。多快和多少,完全独立,甚至是相互矛盾的。

对服务器应用来说:多少(可伸缩性,吞吐量)这个方面比多快更受重视。我们做应用的时候:

  1. 先保证程序正确,确实达不到要求的时候,再提高速度。(黄金原则)
  2. 一定要以测试为基准。

# 1、线程引入的开销

# (1) 上下文切换

如果主线程是唯一的线程,那么它基本上不会被调度出去。另一方面,如果可运行的线程数大于 CPU 的数量,那么操作系统最终会将某个正在运行的线程调度出来,从而使其他线程能够使用 CPU。这将导致一次上下文切换,在这个过程中将保存当前运行线程的执行上下文,并将新调度进来的线程的执行上下文设置为当前上下文。上下文切换有点像我们同时阅读几本书,在来回切换书本的同时我们需要记住每本书当前读到的页码。

切换上下文需要一定的开销,而在线程调度过程中需要访问由操作系统和JVM 共享的数据结构。应用程序、操作系统以及 JVM 都使用一组相同的 CPU。在 JVM 和操作系统的代码中消耗越多的 CPU 时钟周期,应用程序的可用 CPU 时钟周期就越少。但上下文切换的开销并不只是包含 JVM 和操作系统的开销。当一个新的线程被切换进来时,它所需要的数据可能不在当前处理器的本地缓存中,因此上下文切换将导致一些缓存缺失,因而线程在首次调度运行时会更加缓慢。

当线程由于等待某个发生竞争的锁而被阻塞时, JVM 通常会将这个线程挂起, 并允许它被交换出去。如果线程频繁地发生阻塞,那么它们将无法使用完整的调度时间片。在程序中发生越多的阻塞(包括阻塞 IO,等待获取发生竞争的锁,或者在条件变量上等待),与 CPU 密集型的程序就会发生越多的上下文切换,从而增加调度开销,并因此而降低吞吐量。

上下文切换是计算密集型操作。也就是说,它需要相当可观的处理器时间。所以,上下文切换对系统来说意味着消耗大量的 CPU 时间,事实上,可能是操作系统中时间消耗最大的操作。上下文切换的实际开销会随着平台的不同而变化, 然而按照经验来看:在大多数通用的处理器中,上下文切换的开销相当于50~10000 个时钟周期,也就是几微秒。

UNIX 系统的 vmstat 命令能报告上下文切换次数以及在内核中执行时间所占比例等信息。如果内核占用率较高(超过 10%),那么通常表示调度活动发生得很频繁,这很可能是由 IO 或竞争锁导致的阻塞引起的。

# (2) 内存同步

同步操作的性能开销包括多个方面。在 synchronizedvolatile 提供的可见性保证中可能会使用一些特殊指令,即内存栅栏( Memory Barrier)。

内存栅栏可以刷新缓存,使缓存无效刷新硬件的写缓冲,以及停止执行管道。

内存栅栏可能同样会对性能带来间接的影响,因为它们将抑制一些编译器优化操作。在内存栅栏中,大多数操作都是不能被重排序的。

# (3) 阻塞

引起阻塞的原因:包括阻塞 IO,等待获取发生竞争的锁,或者在条件变量上等待等等。

阻塞会导致线程挂起(挂起:挂起进程在操作系统中可以定义为暂时被淘汰出内存的进程,机器的资源是有限的,在资源不足的情况下,操作系统对在内存中的程序进行合理的安排,其中有的进程被暂时调离出内存,当条件允许的时候,会被操作系统再次调回内存,重新进入等待被执行的状态即就绪态,系统在超过一定的时间没有任何动作)。

很明显这个操作至少包括两次额外的上下文切换,还有相关的操作系统级的操作等等。

# 2、如何减少锁的竞争

# (1) 减少锁的粒度

使用锁的时候,锁所保护的对象是多个,当这些多个对象其实是独立变化的时候,不如用多个锁来一一保护这些对象。但是如果有同时要持有多个锁的业务方法,要注意避免发生死锁。例如:

public class FinenessLock {

    public final Set<String> users = new HashSet<>();
    public final Set<String> queries = new HashSet<>();

    public synchronized void addUser(String u) {
        users.add(u);
    }

    public synchronized void addQuery(String q) {
        queries.add(q);
    }

    public synchronized void removeUser(String u) {
        users.remove(u);
    }

    public synchronized void removeQuery(String q) {
        queries.remove(q);
    }

    public void addUserDiv(String u) {
        synchronized (users) {
            users.add(u);
        }
    }

    public void addQueryDiv(String q) {
        synchronized (queries) {
            queries.add(q);
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

原本才操作两个 Set 的时候对整个对象都进行加锁,优化后改为了操作各自的 Set 时,只对各自的 Set 对象进行加锁。

# (2) 缩小锁的范围

对锁的持有实现快进快出,尽量缩短持由锁的的时间。将一些与锁无关的代码移出锁的范围,特别是一些耗时,可能阻塞的操作。例如:

public class ReduceLock {

    private Map<String, String> matchMap = new HashMap<>();

    public synchronized boolean isMatch(String name, String regexp) {
        String key = "user." + name;
        String job = matchMap.get(key);
        if (job == null) {
            return false;
        } else {
            return Pattern.matches(regexp, job);
        }
    }

    private boolean isMatchReduce(String name, String regexp) {
        String key = "user." + name;
        String job;
        synchronized (this) {
            job = matchMap.get(key);
        }

        if (job == null) {
            return false;
        } else {
            return Pattern.matches(regexp, job);
        }
    }

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

isMatch() 对整个方法进行了加锁,其中就包括很耗时的正则匹配代码,优化后的 isMatchReduce() 只对可能产生线程安全问题部分的代码进行加锁,节省了性能。

# (3) 避免多余的锁

两次加锁之间的语句非常简单,导致加锁的时间比执行这些语句还长,这个时候应该进行锁粗化—扩大锁的范围。例如:

        synchronized (this) {
            xxx
        }
        i++;
        synchronized (this) {
            xxx
        }
1
2
3
4
5
6
7

在两个加锁的代码块中间只进行了非常简单的 i++ 的操作,但是获取锁、释放锁却做了2次,这更加耗费性能,此时可以将两个加锁的代码合为一个,并将中间的代码包裹进去。

# (4) 锁分段

ConcurrrentHashMap 就是典型的锁分段。

# (5) 替换独占锁

在业务允许的情况下:

  1. 使用读写锁,
  2. 用自旋 CAS
  3. 使用系统的并发容器

# 五、线程安全的单例模式

# 1、双重检查锁

双重判空并使用 synchronized 关键字来写的单例大家一定都会写,那么我们来看看正确的写法应该是什么:

public class SingleDcl {

    private volatile static SingleDcl singleDcl;

    private SingleDcl() {
    }

    public static SingleDcl getInstance() {
        // 第一次检查,不加锁
        if (singleDcl == null) {
            System.out.println(Thread.currentThread() + " is null");
            // 加锁
            synchronized (SingleDcl.class) {
                // 第二次检查,加锁情况下
                if (singleDcl == null) {
                    System.out.println(Thread.currentThread() + " is null");
                    // new SingleDcl 时需要做3件事情:
                    // 1、内存中分配空间
                    // 2、空间初始化
                    // 3、把这个空间的地址给我们的引用
                    singleDcl = new SingleDcl();
                }
            }
        }
        return singleDcl;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

由于在 new 实例的时候,由于 java 的重排序,可能让后进入 synchronized 的线程却错误的读取到了已经创建好对象的状态,但实际上先进入 synchronized 的线程还没有初始化好对象。从而导致空指针的问题。

使用 volatile 关键字让变量在线程间能够共享状态,从而读取到变量是否初始化好的状态。

# 2、延迟初始化占位类模式

使用静态内部类持有对象的引用可以实现安全的单例:

public class SingleInit {

    private SingleInit(){}

    private static class InstanceHolder{
        private static SingleInit instance = new SingleInit();
    }

    public static SingleInit getInstance(){
        return InstanceHolder.instance;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12

# 3、饿汉式

饿汉式实现安全的单例:

public class SingleEHan {

    private SingleEHan() {
    }

    private static SingleEHan singleDcl = new SingleEHan();

}
1
2
3
4
5
6
7
8

# 4、在域上运用延迟初始化占位类模式

public class InstanceLazy {

    private Integer value;

    /**
     * 假如这个成员变量很耗资源
     */
    private Integer heavy;

    public InstanceLazy(Integer value) {
        this.value = value;
    }

    private static class InstanceHolder {
        public static Integer heavy = 100;
    }

    public Integer getValue() {
        return value;
    }

    public Integer getHeavy() {
        return InstanceHolder.heavy;
    }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

还是使用静态内部类持有成员变量的一个引用,在 getHeavy() 方法中发布出去。


上次更新: 2020-08-21 09:02:51(10 小时前)