[JAVA]并发编程-AQS

AQS全称AbstractQueuedSynchronizer,是JUC包下的一个核心的抽象类,是用于构建锁和其他同步器的基础框架,子类可以通过继承AQS实现同步的需求。比如ThreadPoolExecutor,CountDownLatch,ReentrantLock,Semaphore,都是基于AQS去实现的。

在AQS中有三大核心内容:

1.核心属性state

/**
 * The synchronization state.
 */
private volatile int state;

就拿ReentrantLock来说,state等于0,就代表当前没有线程持有这个lock锁,如果state大于0就代表当前的lock锁被某个线程持有。

2.同步队列

其实同步队列它是一个双向链表的数据结构,还继续拿ReentrantLock举例来说,如果A线程想获取锁资源,但是发现锁资源被其他线程占用着,A线程就会被封装成一个Node对象,进入到同步队列尾部去排队,等待锁资源。

3.Condition单向链表

public class ConditionObject implements Condition, java.io.Serializable {
        private static final long serialVersionUID = 1173984872572414699L;
        /** First node of condition queue. */
        private transient Node firstWaiter;
        /** Last node of condition queue. */
        private transient Node lastWaiter;

        /**
         * Creates a new {@code ConditionObject} instance.
         */
        public ConditionObject() { }
        .......
}

本质就是AQS内部类ConditionObject类中提供的一个基于Node对象组成的单向链表,是通过Node中的nextWaiter进行连接的。主要作用是:但持有lock锁的线程调用了await方法,会将当前线程封装成Node对象插入到单向链表中,等待被其他线程调用signal唤醒,然后扔到同步队列中去竞争锁资源。

公平锁和非公平锁

继续拿ReentrantLock来举例,在ReentrantLock中并没有直接的继承AQS,而是通过抽象静态内部类Sync去继承AQS。锁的具体实现分为了公平锁(FairSync)和非公平锁(NoFairSync)。从代码上看两种锁的具体实现逻辑。

// 公平锁
final void lock() {
    acquire(1);
}

//非公平锁
final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

下面具体看一下AQS关键方法的代码实现

// 公平锁的逻辑
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        // 如果当前锁资源没有被占用,需要通过一定的判断尝试去获取锁
        //1.如果同步队列中没有排队的Node,直接执行cas去抢锁
        //2.如果有排队的Node,并且排在最前面的是当前线程,直接执行cas去抢锁
        //3.如果有排队的Node,并且排在最前面的不是当前线程,不能抢锁。
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
// 非公平锁的逻辑
final boolean nonfairTryAcquire(int acquires) {
    // 获取当前线程
    final Thread current = Thread.currentThread();
    // 获取state属性的值
    int c = getState();
    if (c == 0) {
        // 当前锁资源没有被线程持有,通过cas获取锁资源,尝试将state从0改为1
        if (compareAndSetState(0, acquires)) {
            // 修改成功,将exclusiveOwnerThread设置为当前线程
            setExclusiveOwnerThread(current);
            // 返回true,代表拿锁成功
            return true;
        }
    }
    // 当前锁资源被占用,判断占用锁资源的是不是当前线程
    else if (current == getExclusiveOwnerThread()) {
        // state加1
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        // 赋值给state
        setState(nextc);
        // 锁重入成功
        return true;
    }
    return false;
}
// 执行到这个方法时,就代表当前线程已经在同步队列中排队了
final boolean acquireQueued(final Node node, int arg) {
    try {
        boolean interrupted = false;
        // 进入到死循环
        for (;;) {
            // 获取当前Node的prev节点
            final Node p = node.predecessor();
            // 如果p==head,说明当前节点是head.next,直接走tryAcquire抢锁
            if (p == head && tryAcquire(arg)) {
                // 说明抢锁成功
                setHead(node);
                p.next = null; // help GC
                return interrupted;
            }
            // 执行挂起逻辑,通过UnSafe的park方法进行挂起
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    }
}
private Node addWaiter(Node mode) {
    // 将当前线程封装成Node对象,然后通过cas操作插入到队列的尾部
    Node node = new Node(mode);

    for (;;) {
        Node oldTail = tail;
        if (oldTail != null) {
            U.putObject(node, Node.PREV, oldTail);
            if (compareAndSetTail(oldTail, node)) {
                oldTail.next = node;
                return node;
            }
        } else {
            initializeSyncQueue();
        }
    }
}

标签

发表评论