[JAVA]并发编程-AQS
AQS全称AbstractQueuedSynchronizer,是JUC包下的一个核心的抽象类,是用于构建锁和其他同步器的基础框架,子类可以通过继承AQS实现同步的需求。比如ThreadPoolExecutor,CountDownLatch,ReentrantLock,Semaphore,都是基于AQS去实现的。
在AQS中有三大核心内容:
1.核心属性state
/**
* The synchronization state.
*/
private volatile int state;
就拿ReentrantLock来说,state等于0,就代表当前没有线程持有这个lock锁,如果state大于0就代表当前的lock锁被某个线程持有。
2.同步队列
其实同步队列它是一个双向链表的数据结构,还继续拿ReentrantLock举例来说,如果A线程想获取锁资源,但是发现锁资源被其他线程占用着,A线程就会被封装成一个Node对象,进入到同步队列尾部去排队,等待锁资源。
3.Condition单向链表
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
/**
* Creates a new {@code ConditionObject} instance.
*/
public ConditionObject() { }
.......
}
本质就是AQS内部类ConditionObject类中提供的一个基于Node对象组成的单向链表,是通过Node中的nextWaiter进行连接的。主要作用是:但持有lock锁的线程调用了await方法,会将当前线程封装成Node对象插入到单向链表中,等待被其他线程调用signal唤醒,然后扔到同步队列中去竞争锁资源。
公平锁和非公平锁
继续拿ReentrantLock来举例,在ReentrantLock中并没有直接的继承AQS,而是通过抽象静态内部类Sync去继承AQS。锁的具体实现分为了公平锁(FairSync)和非公平锁(NoFairSync)。从代码上看两种锁的具体实现逻辑。
// 公平锁
final void lock() {
acquire(1);
}
//非公平锁
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
下面具体看一下AQS关键方法的代码实现
// 公平锁的逻辑
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 如果当前锁资源没有被占用,需要通过一定的判断尝试去获取锁
//1.如果同步队列中没有排队的Node,直接执行cas去抢锁
//2.如果有排队的Node,并且排在最前面的是当前线程,直接执行cas去抢锁
//3.如果有排队的Node,并且排在最前面的不是当前线程,不能抢锁。
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
// 非公平锁的逻辑
final boolean nonfairTryAcquire(int acquires) {
// 获取当前线程
final Thread current = Thread.currentThread();
// 获取state属性的值
int c = getState();
if (c == 0) {
// 当前锁资源没有被线程持有,通过cas获取锁资源,尝试将state从0改为1
if (compareAndSetState(0, acquires)) {
// 修改成功,将exclusiveOwnerThread设置为当前线程
setExclusiveOwnerThread(current);
// 返回true,代表拿锁成功
return true;
}
}
// 当前锁资源被占用,判断占用锁资源的是不是当前线程
else if (current == getExclusiveOwnerThread()) {
// state加1
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 赋值给state
setState(nextc);
// 锁重入成功
return true;
}
return false;
}
// 执行到这个方法时,就代表当前线程已经在同步队列中排队了
final boolean acquireQueued(final Node node, int arg) {
try {
boolean interrupted = false;
// 进入到死循环
for (;;) {
// 获取当前Node的prev节点
final Node p = node.predecessor();
// 如果p==head,说明当前节点是head.next,直接走tryAcquire抢锁
if (p == head && tryAcquire(arg)) {
// 说明抢锁成功
setHead(node);
p.next = null; // help GC
return interrupted;
}
// 执行挂起逻辑,通过UnSafe的park方法进行挂起
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
private Node addWaiter(Node mode) {
// 将当前线程封装成Node对象,然后通过cas操作插入到队列的尾部
Node node = new Node(mode);
for (;;) {
Node oldTail = tail;
if (oldTail != null) {
U.putObject(node, Node.PREV, oldTail);
if (compareAndSetTail(oldTail, node)) {
oldTail.next = node;
return node;
}
} else {
initializeSyncQueue();
}
}
}
发表评论