# 并发安全
# 一、什么是线程安全性
当多个线程访问某个类时,不管运行时环境采用何种调度方式或者这些线程将如何交替执行,并且在调用代码中不需要任何额外的同步或者协同,这个类都能表现出正确的行为,那么就称这个类是线程安全的。
# 1、线程封闭
实现好的并发是一件困难的事情,所以很多时候我们都想躲避并发。避免并发最简单的方法就是线程封闭。什么是线程封闭呢?
就是把对象封装到一个线程里,只有这一个线程能看到此对象。那么这个对象就算不是线程安全的也不会出现任何安全问题。实现线程封闭有哪些方法呢?
# (1) ad-hoc 线程封闭
这是完全靠实现者控制的线程封闭,他的线程封闭完全靠实现者实现。Ad-hoc
线程封闭非常脆弱,应该尽量避免使用。
# (2) 栈封闭
栈封闭是我们编程当中遇到的最多的线程封闭。什么是栈封闭呢?简单的说就是局部变量。多个线程访问一个方法,此方法中的局部变量都会被拷贝一份到线程栈中。所以局部变量是不被多个线程所共享的,也就不会出现并发问题。所以能用局部变量就别用全局的变量,全局变量容易引起并发问题。
# 2、无状态的类
没有任何成员变量的类,就叫无状态的类,这种类一定是线程安全的。例如:
public class StatelessClass {
public int service(int a, int b) {
return a + b;
}
public void serviceUser(UserVo user) {
// do sth user
}
}
2
3
4
5
6
7
8
9
10
11
public class UserVo {
private int age;
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
}
2
3
4
5
6
7
8
9
10
11
12
在这个类中使用了对象 UserVo
,那么它还是线程安全的吗?当然还是。因为多线程下,固然 user
这个对象的实例会不正常,但是对于 StatelessClass
这个类的对象实例来说,它并不持有 UserVo
的对象实例,它自己并不会有问题,有问题的是 UserVo
这个类,而非 StatelessClass
本身。
# 3、让类不可变
让状态不可变,有两种方式:
- 加
final
关键字,对于一个类,所有的成员变量应该是私有的,同样的只要有可能,所有的成员变量应该加上final
关键字,但是加上final
,要注意如果成员变量又是一个对象时,这个对象所对应的类也要是不可变,才能保证整个类是不可变的。
public class ImmutableClass {
private final int a;
public ImmutableClass(int a) {
this.a = a;
}
public int getA() {
return a;
}
public static class User {
private int age;
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
- 根本就不提供任何可供修改成员变量的地方,同时成员变量也不作为方法的返回值。
public class ImmutableClassTwo {
private List<Integer> list = new ArrayList<>(3);
public ImmutableClassTwo() {
list.add(1);
list.add(2);
list.add(3);
}
public boolean isContain(int i) {
return list.contains(i);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
但是要注意,一旦类的成员变量中有对象,上述的 final
关键字保证不可变并不能保证类的安全性,为什么呢?因为在多线程下,虽然对象的引用不可变,但是对象在堆上的实例是有可能被多个线程同时修改的,没有正确处理的情况下,对象实例在堆中的数据是不可预知的。这就牵涉到了如何安全的发布对象这个问题。如下:
public class ImmutableClass {
private final int a;
/**
* user 不安全
*/
private final UserVo user = new UserVo();
public ImmutableClass(int a) {
this.a = a;
}
public int getA() {
return a;
}
public UserVo getUser() {
return user;
}
public static class User {
private int age;
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# volatile
volatile
并不能保证类的线程安全性,只能保证类的可见性,最适合一个线程写,多个线程读的情景。
# 4、加锁和 CAS
我们最常使用的保证线程安全的手段,使用 synchronized
关键字,使用显式锁,使用各种原子变量,修改数据时使用 CAS
机制等等。
# 5、安全的发布
类中持有的成员变量,如果是基本类型,发布出去,并没有关系,因为发布出去的其实是这个变量的一个副本,例如:
public class SafePublish {
private int i;
public SafePublish() {
i = 2;
}
public int getI() {
return i;
}
public static void main(String[] args) {
SafePublish safePublish = new SafePublish();
int j = safePublish.getI();
System.out.println("before set, j = " + j);
j = 3;
System.out.println("after set, j = " + j);
System.out.println("getI = " + safePublish.getI());
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
执行结果:
before set, j = 2
after set, j = 3
getI = 2
2
3
但是如果类中持有的成员变量是对象的引用,如果这个成员对象不是线程安全的,通过 get()
等方法发布出去,会造成这个成员对象本身持有的数据在多线程下不正确的修改,从而造成整个类线程不安全的问题。例如:
public class UnSafePublish {
private List<Integer> list = new ArrayList<>(3);
public UnSafePublish() {
list.add(1);
list.add(2);
list.add(3);
}
public List<Integer> getList() {
return list;
}
public static void main(String[] args) {
UnSafePublish unSafePublish = new UnSafePublish();
List<Integer> list = unSafePublish.getList();
System.out.println("before add, list = " + list);
list.add(4);
System.out.println("after add, list = " + list);
System.out.println("getList = " + unSafePublish.getList());
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
执行结果:
before add, list = [1, 2, 3]
after add, list = [1, 2, 3, 4]
getList = [1, 2, 3, 4]
2
3
可以看出,这个 list
发布出去后,是可以被外部线程之间修改,那么在多个线程同时修改的情况下不安全问题是肯定存在的,怎么修正这个问题呢?我们在发布这对象出去的时候,就应该用线程安全的方式包装这个对象。如下:
public class SafePublishTwo {
private List<Integer> list = Collections.synchronizedList(new ArrayList<>(3));
public SafePublishTwo() {
list.add(1);
list.add(2);
list.add(3);
}
public List<Integer> getList() {
return list;
}
public static void main(String[] args) {
SafePublishTwo safePublishTwo = new SafePublishTwo();
List<Integer> list = safePublishTwo.getList();
System.out.println("before add, list = " + list);
list.add(4);
System.out.println("after add, list = " + list);
System.out.println("getList = " + safePublishTwo.getList());
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
执行结果:
before add, list = [1, 2, 3]
after add, list = [1, 2, 3, 4]
getList = [1, 2, 3, 4]
2
3
虽然执行结果没有什么变化,但是将 list
用 Collections.synchronizedList
进行包装以后,无论多少线程使用这个 list
,就都是线程安全的了。
对于我们自己使用或者声明的类,jdk
自然没有提供这种包装类的办法,但是我们可以仿造这种模式或者委托给线程安全的类,当然,对这种通过 get()
等方法发布出去的对象,最根本的解决办法还是应该在实现上就考虑到线程安全问题,例如:
public class SafePublicUser {
private final UserVo user;
public UserVo getUser() {
return user;
}
public SafePublicUser(UserVo user) {
this.user = new SyncUser(user);
}
private static class SyncUser extends UserVo {
private final UserVo userVo;
private final Object lock = new Object();
public SyncUser(UserVo userVo) {
this.userVo = userVo;
}
@Override
public int getAge() {
synchronized (lock) {
return userVo.getAge();
}
}
@Override
public void setAge(int age) {
synchronized (lock) {
userVo.setAge(age);
}
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
将 SyncUser
继承 UserVo
,在实际上发布的是 SyncUser
对象,在 SyncUser
内部使用 synchronized
加锁进行了安全控制。
如果 UserVo
是 final
的无法继承时怎么办呢,例如:
public final class FinalUserVo {
private int age;
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
}
2
3
4
5
6
7
8
9
10
11
12
这时就可以使用代理的方式:
public class SafePublicFinalUser {
private final SyncFinalUser user;
public SyncFinalUser getUser() {
return user;
}
public SafePublicFinalUser(FinalUserVo user) {
this.user = new SyncFinalUser(user);
}
public static class SyncFinalUser {
private final FinalUserVo userVo;
private final Object lock = new Object();
public SyncFinalUser(FinalUserVo userVo) {
this.userVo = userVo;
}
public int getAge() {
synchronized (lock) {
return userVo.getAge();
}
}
public void setAge(int age) {
synchronized (lock) {
userVo.setAge(age);
}
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
在 SyncFinalUser
内部持有一个 FinalUserVo
的引用,实际发布出去的还是安全的 SyncFinalUser
对象。
# 6、TheadLocal
ThreadLocal
是实现线程封闭的最好方法。ThreadLocal
内部维护了一个 Map
,Map
的 key
是每个线程的名称,而 Map
的值就是我们要封闭的对象。每个线程中的对象都对应着 Map
中一个值,也就是 ThreadLocal
利用 Map
实现了对象的线程封闭。
# 7、Servlet 辨析
Servlet
不是线程安全的类,为什么我们平时没感觉到呢,因为
- 在需求上,很少有共享的需求
- 接收到了请求,返回应答的时候,一般都是由一个线程来负责的。但是只要
Servlet
中有成员变量,一旦有多线程下的写,就很容易产生线程安全问题。
# 二、死锁
# 1、概念
是指两个或两个以上的进程在执行过程中,由于竞争资源或者由于彼此通信而造成的一种阻塞的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁。
死锁是必然发生在多操作者(M >= 2
个)情况下,争夺多个资源(N >= 2
个,且 N <= M
)才会发生这种情况。很明显,单线程自然不会有死锁,同时,死锁还有一个重要的要求,争夺资源的顺序不对,如果争夺资源的顺序是一样的,也不会产生死锁。
# 学术化的定义
死锁的发生必须具备以下四个必要条件:
- 互斥条件
指进程对所分配到的资源进行排它性使用,即在一段时间内某资源只由一个进程占用。如果此时还有其它进程请求资源,则请求者只能等待,直至占有资源的进程用毕释放。
- 请求和保持条件
指进程已经保持至少一个资源,但又提出了新的资源请求,而该资源已被其它进程占有,此时请求进程阻塞,但又对自己已获得的其它资源保持不放。
3)不剥夺条件
指进程已获得的资源,在未使用完之前,不能被剥夺,只能在使用完时由自己释放。
4)环路等待条件
指在发生死锁时,必然存在一个进程——资源的环形链,即进程集合 {P0,P1,P2,···,Pn}
中的 P0
正在等待一个 P1
占用的资源;P1
正在等待 P2
占用的资源…… Pn
正在等待已被 P0
占用的资源。
理解了死锁的原因,尤其是产生死锁的四个必要条件,就可以最大可能地避免、预防和解除死锁。只要打破四个必要条件之一就能有效预防死锁的发生。打破互斥条件:改造独占性资源为虚拟资源,大部分资源已无法改造。打破不可抢占条件:当一进程占有一独占性资源后又申请一独占性资源而无法满足,则退出原占有的资源。打破占有且申请条件:采用资源预先分配策略,即进程运行前申请全部资源,满足则运行,不然就等待,这样就不会占有且申请。打破循环等待条件:实现资源有序分配策略,对所有设备实现分类编号,所有进程只能采用按序号递增的形式申请资源。
避免死锁常见的算法有有序资源分配法、银行家算法。
# 2、现象、危害和解决
数据库里多事务而且要同时操作多个表的情况下会产生死锁的情况。所以数据库设计的时候就考虑到了检测死锁和从死锁中恢复的机制。比如 oracle
提供了检测和处理死锁的语句,而 mysql
也提供了“循环依赖检测的机制。
# (1) 现象
# 简单顺序死锁
下面看一个死锁的例子:
public class NormalDeadLock {
/**
* 第一个锁
*/
private static Object valueFirst = new Object();
/**
* 第二个锁
*/
private static Object valueSecond = new Object();
/**
* 先拿第一个锁,再拿第二个锁
*
* @throws InterruptedException
*/
private static void firstToSecond() throws InterruptedException {
String threadName = Thread.currentThread().getName();
synchronized (valueFirst) {
System.out.println(threadName + " get 1st");
Thread.sleep(100);
synchronized (valueSecond) {
System.out.println(threadName + " get 2nd");
}
}
}
/**
* 先拿第二个锁,再拿第一个锁
*
* @throws InterruptedException
*/
private static void SecondToFirst() throws InterruptedException {
String threadName = Thread.currentThread().getName();
synchronized (valueSecond) {
System.out.println(threadName + " get 2nd");
Thread.sleep(100);
synchronized (valueFirst) {
System.out.println(threadName + " get 1st");
}
}
}
private static class TestThread extends Thread {
private String name;
public TestThread(String name) {
this.name = name;
}
public void run() {
Thread.currentThread().setName(name);
try {
SecondToFirst();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
Thread.currentThread().setName("TestDeadLock");
TestThread testThread = new TestThread("SubTestThread");
testThread.start();
try {
firstToSecond();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
运行结果:
TestDeadLock get 1st
SubTestThread get 2nd
2
两个进程互相等待。要想解决死锁的问题,只需要让两个线程拿锁的顺序相同即可:
public class NormalDeadLock {
/**
* 第一个锁
*/
private static Object valueFirst = new Object();
/**
* 第二个锁
*/
private static Object valueSecond = new Object();
/**
* 先拿第一个锁,再拿第二个锁
*
* @throws InterruptedException
*/
private static void firstToSecond() throws InterruptedException {
String threadName = Thread.currentThread().getName();
synchronized (valueFirst) {
System.out.println(threadName + " get 1st");
Thread.sleep(100);
synchronized (valueSecond) {
System.out.println(threadName + " get 2nd");
}
}
}
/**
* 先拿第二个锁,再拿第一个锁
*
* @throws InterruptedException
*/
private static void SecondToFirst() throws InterruptedException {
String threadName = Thread.currentThread().getName();
synchronized (valueFirst) {
System.out.println(threadName + " get 1st");
Thread.sleep(100);
synchronized (valueSecond) {
System.out.println(threadName + " get 2nd");
}
}
}
private static class TestThread extends Thread {
private String name;
public TestThread(String name) {
this.name = name;
}
public void run() {
Thread.currentThread().setName(name);
try {
SecondToFirst();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
Thread.currentThread().setName("TestDeadLock");
TestThread testThread = new TestThread("SubTestThread");
testThread.start();
try {
firstToSecond();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
运行结果:
TestDeadLock get 1st
TestDeadLock get 2nd
SubTestThread get 1st
SubTestThread get 2nd
2
3
4
# 动态顺序死锁
模拟一个转账的业务,用户账户类:
public class UserAccount {
/**
* 账户名称
*/
private final String name;
/**
* 账户余额l
*/
private int money;
private final Lock lock = new ReentrantLock();
public Lock getLock() {
return lock;
}
public UserAccount(String name, int amount) {
this.name = name;
this.money = amount;
}
public String getName() {
return name;
}
public int getAmount() {
return money;
}
@Override
public String toString() {
return "UserAccount{" +
"name='" + name + '\'' +
", money=" + money +
'}';
}
/**
* 转入资金
*
* @param amount
*/
public void addMoney(int amount) {
money = money + amount;
}
/**
* 转出资金
*
* @param amount
*/
public void flyMoney(int amount) {
money = money - amount;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
转账接口:
public interface ITransfer {
void transfer(UserAccount from, UserAccount to, int amount) throws InterruptedException;
}
2
3
4
先来看第一种实现:
public class TransferAccount implements ITransfer {
@Override
public void transfer(UserAccount from, UserAccount to, int amount) throws InterruptedException {
synchronized (from) {
System.out.println(Thread.currentThread().getName() + " get " + from.getName());
Thread.sleep(100);
synchronized (to) {
System.out.println(Thread.currentThread().getName() + " get " + to.getName());
from.flyMoney(amount);
to.addMoney(amount);
}
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
业务逻辑:
public class PayCompany {
/**
* 执行转账动作的线程
*/
private static class TransferThread extends Thread {
private String name;
private UserAccount from;
private UserAccount to;
private int amount;
private ITransfer transfer;
public TransferThread(String name, UserAccount from, UserAccount to, int amount, ITransfer transfer) {
this.name = name;
this.from = from;
this.to = to;
this.amount = amount;
this.transfer = transfer;
}
public void run() {
Thread.currentThread().setName(name);
try {
transfer.transfer(from, to, amount);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
UserAccount zhangsan = new UserAccount("zhangsan", 20000);
UserAccount lisi = new UserAccount("lisi", 20000);
ITransfer transfer = new TransferAccount();
TransferThread zhangsanToLisi = new TransferThread("zhangsanToLisi",
zhangsan, lisi, 2000, transfer);
TransferThread lisiToZhangsan = new TransferThread("lisiToZhangsan",
lisi, zhangsan, 4000, transfer);
zhangsanToLisi.start();
lisiToZhangsan.start();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
执行结果:
zhangsanToLisi get zhangsan
lisiToZhangsan get lisi
2
产生了死锁,虽然在实现中固定了获取锁的先后顺序,但是在调用方法的时候,由于两个参数的顺序不同造成了获取锁的顺序不同,进而造成了死锁。
下面看第一种解决方案:
public class SafeOperation implements ITransfer {
/**
* 第三把锁
*/
private static Object tieLock = new Object();
@Override
public void transfer(UserAccount from, UserAccount to, int amount) throws InterruptedException {
int fromHash = System.identityHashCode(from);
int toHash = System.identityHashCode(to);
if (fromHash < toHash) {
synchronized (from) {
System.out.println(Thread.currentThread().getName() + " get " + from.getName());
Thread.sleep(100);
synchronized (to) {
System.out.println(Thread.currentThread().getName() + " get " + to.getName());
from.flyMoney(amount);
to.addMoney(amount);
System.out.println(from);
System.out.println(to);
}
}
} else if (toHash < fromHash) {
synchronized (to) {
System.out.println(Thread.currentThread().getName() + " get " + to.getName());
Thread.sleep(100);
synchronized (from) {
System.out.println(Thread.currentThread().getName() + " get " + from.getName());
from.flyMoney(amount);
to.addMoney(amount);
System.out.println(from);
System.out.println(to);
}
}
} else {
synchronized (tieLock) {
synchronized (from) {
synchronized (to) {
from.flyMoney(amount);
to.addMoney(amount);
}
}
}
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
PayCompany
使用 SafeOperation
这个实现后的执行结果:
zhangsanToLisi get zhangsan
zhangsanToLisi get lisi
UserAccount{name='zhangsan', money=18000}
UserAccount{name='lisi', money=22000}
lisiToZhangsan get zhangsan
lisiToZhangsan get lisi
UserAccount{name='lisi', money=18000}
UserAccount{name='zhangsan', money=22000}
2
3
4
5
6
7
8
通过对比两个锁的 hash
值后来决定先后顺序而解决了死锁问题。
第二种解决方案:
public class SafeOperateTwo implements ITransfer {
@Override
public void transfer(UserAccount from, UserAccount to, int amount) throws InterruptedException {
Random r = new Random();
while (true) {
if (from.getLock().tryLock()) {
System.out.println(Thread.currentThread().getName() + " get " + from.getName());
try {
if (to.getLock().tryLock()) {
try {
System.out.println(Thread.currentThread().getName() + " get " + to.getName());
from.flyMoney(amount);
to.addMoney(amount);
System.out.println(from);
System.out.println(to);
break;
} finally {
to.getLock().unlock();
}
}
} finally {
from.getLock().unlock();
}
}
// 使用休眠可以错开两个线程获取锁的时间
Thread.sleep(r.nextInt(2));
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
PayCompany
使用 SafeOperationTwo
这个实现后的执行结果:
lisiToZhangsan get lisi
zhangsanToLisi get zhangsan
lisiToZhangsan get lisi
zhangsanToLisi get zhangsan
zhangsanToLisi get zhangsan
lisiToZhangsan get lisi
lisiToZhangsan get zhangsan
UserAccount{name='lisi', money=16000}
UserAccount{name='zhangsan', money=24000}
zhangsanToLisi get zhangsan
zhangsanToLisi get lisi
UserAccount{name='zhangsan', money=22000}
UserAccount{name='lisi', money=18000}
2
3
4
5
6
7
8
9
10
11
12
13
这里使用了每个账户类中的 Lock
锁,并且通过 tryLock()
方法尝试性获取锁,当第一把锁获取成功后立刻 tryLock()
第二把锁,当两个锁都获取到后进行业务逻辑,当任意一个锁没有获取成功的时候就会进入到 finally
中释放锁。
通过一个短暂的随机休眠可以错开多个线程获取锁的时间。
# (2) 危害
- 线程不工作了,但是整个程序还是活着的
- 没有任何的异常信息可以供我们检查。
- 一旦程序发生了发生了死锁,是没有任何的办法恢复的,只能重启程序,对生产平台的程序来说,这是个很严重的问题。
# 实际工作中的死锁
时间不定,不是每次必现;一旦出现没有任何异常信息,只知道这个应用的所有业务越来越慢,最后停止服务,无法确定是哪个具体业务导致的问题;测试部门也无法复现,并发量不够。
# (3) 解决
- 定位
通过 java
安装好的 bin
目录中的工具 jps
,使用 jps -v
就可以查询正在运行的 java
应用的 id
:
再通过 jstack 10440
查看应用的锁持有情况:
- 修正
关键是保证拿锁的顺序一致,有两种解决方式:
1、内部通过顺序比较,确定拿锁的顺序;
2、采用尝试拿锁的机制。
# 三、其它安全问题
# 1、活锁
两个线程在尝试拿锁的机制中,发生多个线程之间互相谦让,不断发生同一个线程总是拿到同一把锁,在尝试拿另一把锁时因为拿不到,而将本来已经持有的锁释放的过程。例如上面写的 SafeOperateTwo
类如果不错开线程拿锁的时间的话,有可能会产生活锁。
解决办法:每个线程休眠随机数,错开拿锁的时间。
# 2、线程饥饿
低优先级的线程,总是拿不到执行时间。
# 四、并发下的性能
使用并发的目标是为了提高性能,引入多线程后,其实会引入额外的开销,如线程之间的协调、增加的上下文切换,线程的创建和销毁,线程的调度等等。过度的使用和不恰当的使用,会导致多线程程序甚至比单线程还要低。
衡量应用的程序的性能:服务时间,延迟时间,吞吐量,可伸缩性等等,其中服务时间,延迟时间(多快),吞吐量(处理能力的指标,完成工作的多少)。多快和多少,完全独立,甚至是相互矛盾的。
对服务器应用来说:多少(可伸缩性,吞吐量)这个方面比多快更受重视。我们做应用的时候:
- 先保证程序正确,确实达不到要求的时候,再提高速度。(黄金原则)
- 一定要以测试为基准。
# 1、线程引入的开销
# (1) 上下文切换
如果主线程是唯一的线程,那么它基本上不会被调度出去。另一方面,如果可运行的线程数大于 CPU
的数量,那么操作系统最终会将某个正在运行的线程调度出来,从而使其他线程能够使用 CPU
。这将导致一次上下文切换,在这个过程中将保存当前运行线程的执行上下文,并将新调度进来的线程的执行上下文设置为当前上下文。上下文切换有点像我们同时阅读几本书,在来回切换书本的同时我们需要记住每本书当前读到的页码。
切换上下文需要一定的开销,而在线程调度过程中需要访问由操作系统和JVM 共享的数据结构。应用程序、操作系统以及 JVM
都使用一组相同的 CPU
。在 JVM
和操作系统的代码中消耗越多的 CPU
时钟周期,应用程序的可用 CPU
时钟周期就越少。但上下文切换的开销并不只是包含 JVM
和操作系统的开销。当一个新的线程被切换进来时,它所需要的数据可能不在当前处理器的本地缓存中,因此上下文切换将导致一些缓存缺失,因而线程在首次调度运行时会更加缓慢。
当线程由于等待某个发生竞争的锁而被阻塞时, JVM
通常会将这个线程挂起, 并允许它被交换出去。如果线程频繁地发生阻塞,那么它们将无法使用完整的调度时间片。在程序中发生越多的阻塞(包括阻塞 IO
,等待获取发生竞争的锁,或者在条件变量上等待),与 CPU
密集型的程序就会发生越多的上下文切换,从而增加调度开销,并因此而降低吞吐量。
上下文切换是计算密集型操作。也就是说,它需要相当可观的处理器时间。所以,上下文切换对系统来说意味着消耗大量的 CPU
时间,事实上,可能是操作系统中时间消耗最大的操作。上下文切换的实际开销会随着平台的不同而变化, 然而按照经验来看:在大多数通用的处理器中,上下文切换的开销相当于50~10000 个时钟周期,也就是几微秒。
UNIX
系统的 vmstat
命令能报告上下文切换次数以及在内核中执行时间所占比例等信息。如果内核占用率较高(超过 10%),那么通常表示调度活动发生得很频繁,这很可能是由 IO
或竞争锁导致的阻塞引起的。
# (2) 内存同步
同步操作的性能开销包括多个方面。在 synchronized
和 volatile
提供的可见性保证中可能会使用一些特殊指令,即内存栅栏( Memory Barrier
)。
内存栅栏可以刷新缓存,使缓存无效刷新硬件的写缓冲,以及停止执行管道。
内存栅栏可能同样会对性能带来间接的影响,因为它们将抑制一些编译器优化操作。在内存栅栏中,大多数操作都是不能被重排序的。
# (3) 阻塞
引起阻塞的原因:包括阻塞 IO,等待获取发生竞争的锁,或者在条件变量上等待等等。
阻塞会导致线程挂起(挂起:挂起进程在操作系统中可以定义为暂时被淘汰出内存的进程,机器的资源是有限的,在资源不足的情况下,操作系统对在内存中的程序进行合理的安排,其中有的进程被暂时调离出内存,当条件允许的时候,会被操作系统再次调回内存,重新进入等待被执行的状态即就绪态,系统在超过一定的时间没有任何动作)。
很明显这个操作至少包括两次额外的上下文切换,还有相关的操作系统级的操作等等。
# 2、如何减少锁的竞争
# (1) 减少锁的粒度
使用锁的时候,锁所保护的对象是多个,当这些多个对象其实是独立变化的时候,不如用多个锁来一一保护这些对象。但是如果有同时要持有多个锁的业务方法,要注意避免发生死锁。例如:
public class FinenessLock {
public final Set<String> users = new HashSet<>();
public final Set<String> queries = new HashSet<>();
public synchronized void addUser(String u) {
users.add(u);
}
public synchronized void addQuery(String q) {
queries.add(q);
}
public synchronized void removeUser(String u) {
users.remove(u);
}
public synchronized void removeQuery(String q) {
queries.remove(q);
}
public void addUserDiv(String u) {
synchronized (users) {
users.add(u);
}
}
public void addQueryDiv(String q) {
synchronized (queries) {
queries.add(q);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
原本才操作两个 Set
的时候对整个对象都进行加锁,优化后改为了操作各自的 Set
时,只对各自的 Set
对象进行加锁。
# (2) 缩小锁的范围
对锁的持有实现快进快出,尽量缩短持由锁的的时间。将一些与锁无关的代码移出锁的范围,特别是一些耗时,可能阻塞的操作。例如:
public class ReduceLock {
private Map<String, String> matchMap = new HashMap<>();
public synchronized boolean isMatch(String name, String regexp) {
String key = "user." + name;
String job = matchMap.get(key);
if (job == null) {
return false;
} else {
return Pattern.matches(regexp, job);
}
}
private boolean isMatchReduce(String name, String regexp) {
String key = "user." + name;
String job;
synchronized (this) {
job = matchMap.get(key);
}
if (job == null) {
return false;
} else {
return Pattern.matches(regexp, job);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
isMatch()
对整个方法进行了加锁,其中就包括很耗时的正则匹配代码,优化后的 isMatchReduce()
只对可能产生线程安全问题部分的代码进行加锁,节省了性能。
# (3) 避免多余的锁
两次加锁之间的语句非常简单,导致加锁的时间比执行这些语句还长,这个时候应该进行锁粗化—扩大锁的范围。例如:
synchronized (this) {
xxx
}
i++;
synchronized (this) {
xxx
}
2
3
4
5
6
7
在两个加锁的代码块中间只进行了非常简单的 i++
的操作,但是获取锁、释放锁却做了2次,这更加耗费性能,此时可以将两个加锁的代码合为一个,并将中间的代码包裹进去。
# (4) 锁分段
ConcurrrentHashMap
就是典型的锁分段。
# (5) 替换独占锁
在业务允许的情况下:
- 使用读写锁,
- 用自旋
CAS
- 使用系统的并发容器
# 五、线程安全的单例模式
# 1、双重检查锁
双重判空并使用 synchronized
关键字来写的单例大家一定都会写,那么我们来看看正确的写法应该是什么:
public class SingleDcl {
private volatile static SingleDcl singleDcl;
private SingleDcl() {
}
public static SingleDcl getInstance() {
// 第一次检查,不加锁
if (singleDcl == null) {
System.out.println(Thread.currentThread() + " is null");
// 加锁
synchronized (SingleDcl.class) {
// 第二次检查,加锁情况下
if (singleDcl == null) {
System.out.println(Thread.currentThread() + " is null");
// new SingleDcl 时需要做3件事情:
// 1、内存中分配空间
// 2、空间初始化
// 3、把这个空间的地址给我们的引用
singleDcl = new SingleDcl();
}
}
}
return singleDcl;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
由于在 new
实例的时候,由于 java
的重排序,可能让后进入 synchronized
的线程却错误的读取到了已经创建好对象的状态,但实际上先进入 synchronized
的线程还没有初始化好对象。从而导致空指针的问题。
使用 volatile
关键字让变量在线程间能够共享状态,从而读取到变量是否初始化好的状态。
# 2、延迟初始化占位类模式
使用静态内部类持有对象的引用可以实现安全的单例:
public class SingleInit {
private SingleInit(){}
private static class InstanceHolder{
private static SingleInit instance = new SingleInit();
}
public static SingleInit getInstance(){
return InstanceHolder.instance;
}
}
2
3
4
5
6
7
8
9
10
11
12
# 3、饿汉式
饿汉式实现安全的单例:
public class SingleEHan {
private SingleEHan() {
}
private static SingleEHan singleDcl = new SingleEHan();
}
2
3
4
5
6
7
8
# 4、在域上运用延迟初始化占位类模式
public class InstanceLazy {
private Integer value;
/**
* 假如这个成员变量很耗资源
*/
private Integer heavy;
public InstanceLazy(Integer value) {
this.value = value;
}
private static class InstanceHolder {
public static Integer heavy = 100;
}
public Integer getValue() {
return value;
}
public Integer getHeavy() {
return InstanceHolder.heavy;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
还是使用静态内部类持有成员变量的一个引用,在 getHeavy()
方法中发布出去。
← 线程池 JMM和底层实现原理 →