Semaphore控制并发的信号量

在面试的时候被问到关于java信号量,并没有很了解这个并发同步器,这里学习之后做一点简单的记录。Semaphore我们通常称之为信号量,这是用来控制同时访问某个资源的线程数量的一个同步器。比如数据库连接,如果数据库连接的数量只有10个,但是此时在执行的线程数量大于10个,那么线程会报错,获取不到数据库连接。为了防止这样的情况,我们可以通过信号量Semaphore来进行流量的控制。

使用Semaphore控制并发流量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package com.souche.study;

import java.util.concurrent.*;

/**
* Created by yqz on 10/25/18.
*/
public class SemaphoreTest {
//创建一个线程池
public static ExecutorService threadPoolExecutor= Executors.newFixedThreadPool(2);
public static //这里用信号量尝试控制流量
Semaphore semaphore =new Semaphore(10);

//public static volatile int count=0;

public static void main(String[] args) {


for (int i=0;i<30;i++){
threadPoolExecutor.execute(new Runnable() {
@Override
public void run() {
try {
semaphore.acquire();
//模拟获取数据库连接
System.out.println(Thread.currentThread().getName()+" get database link : ");
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}

}

Semaphore API介绍

Semaphore对每一个进入获取令牌的线程,都会尝试给予令牌,但是如果出现了供不应求的情况,每个线程需要的处理方式可能是不同的:

semaphore.acquire()

线程选择直接获取(非阻塞),进入等待队列,循环等待没获取到线程挂起

通过源码分析在底层的具体实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&//公平加入等待队列,如果没获取到直接触发中断,直接抛出中断异常
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

#####

semaphore.tryAcquire()

线程可以选择等待一段时间(非阻塞),如果超时获取不到返回false,检测到中断信号线程中断

贴上源码的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return true;
}
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;//如果超时了返回获取锁失败
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())//将中断信号标志位复位,同时抛出中断异常
throw new InterruptedException();//检测到当前线程已经被中断,执行自我中断,抛出异常。
}
} finally {
if (failed)
cancelAcquire(node);
}
}
semaphore.acquireUninterruptibly()

获取锁的过程中不允许中断,线程会一直处于获取锁的状态,只会在失败之后将标志位置为true。

贴上源码的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* Acquires in shared uninterruptible mode.
* @param arg the acquire argument
*/
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;//不抛出异常,方法不支持中断抛出异常
}
} finally {
if (failed)
cancelAcquire(node);
}
}

Semaphore的两种实现(公平or非公平)

Semaphore信号量提供了两种实现,公平和非公平的方式,这个可以类比retrantlock的公平非公平机制。一个会加入等待队列,一个不会加入等待队列。我们看下两个的源码:

1
2
3
4
5
6
7
8
9
final int nonfairTryAcquireShared(int acquires) {
for (;;) {//不会中断哦,一直循环占用cpu
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;//非公平的方式也会一直循环直到获取成功,少了的话也会返回一个负值
}
}
1
2
3
4
5
6
7
8
9
10
11
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())//公平的方式这里多维护了一个队列,如果已经加入了队列,直接返回,队列中的线程会被调度。
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}

我们看到公平和非公平的方式只是多了一个方法,那么看下这个方法到底实现了什么:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
* @return {@code true} if there is a queued thread preceding the
* current thread, and {@code false} if the current thread
* is at the head of the queue or the queue is empty
* @since 1.7
*/
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
//这里是维护的node队列,node是记录一个线程信息的对象。该方法主要判断当前线程是否已经维护进了公平等待的队列中,如果维护进去了,就返回false,没有维护进去就返回true。也就是公平锁会直接返回一个-1,线程继续执行,进入等待队列中调度执行。

总结

在面试的时候被问到信号量当时也是蛮懵逼的,觉得自己深入了解过AQS队列同步器,但是对java原生实现的一些同步器了解并不是特别多。信号量这个名字并不是特别好理解,可以更形象的称之为令牌管理器。在并发线程比较多的时候,一些有限的资源类似数据库连接数不能及时供应,那么我们需要进行一个流量控制。对于客户端的请求来说,可以在一段时间获取不到这个令牌之后选择不再等待,也可以在获取令牌的方法一直等待直到被中断,或者干脆一直等下去,死磕。网上搜索的时候发现一篇介绍同样内容的[博客]还不错,可以mark下https://blog.csdn.net/hanchao5272/article/details/79780045。