`

并发包源码分析 -- AQS

    博客分类:
  • java
 
阅读更多

Java 的concurrent 包给我们的并发编程提供了更灵活高效的方案,开发人员可以很方便的利用API利用线程池去处理多任务。最近对concurrent包进行了比较细致的分析,最核心的基础莫过于AQS(AbstractQueuedSynchronizer)。下面看看来自官方的摘要。

 

Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on first-in-first-out (FIFO) wait queues. This class is designed to be a useful basis for most kinds of synchronizers that rely on a single atomic int value to represent state. Subclasses must define the protected methods that change this state, and which define what that state means in terms of this object being acquired or released. Given these, the other methods in this class carry out all queuing and blocking mechanics. Subclasses can maintain other state fields, but only the atomically updated int value manipulated using methods getState(),setState(int) and compareAndSetState(int,int) is tracked with respect to synchronization.

Subclasses should be defined as non-public internal helper classes that are used to implement the synchronization properties of their enclosing class. Class AbstractQueuedSynchronizer does not implement any synchronization interface. Instead it defines methods such asacquireInterruptibly(int) that can be invoked as appropriate by concrete locks and related synchronizers to implement their public methods.

 

 

提供基于FIFO的等待队列锁阻塞的一个框架,对于需要用atomic int 原子状态来控制锁的同步非常有用。并发包里面的各种Lock都是基于AQS实现,AQS 子类应该实现改变状态的方法。一般子类都是作为一个内部的帮助类去实现锁,下面来看一个不可重入锁的实现,状态0表示unlock,1表示lock。

class Mutex implements Lock, java.io.Serializable {

	   // Our internal helper class
	   private static class Sync extends AbstractQueuedSynchronizer {
	     // Report whether in locked state
	     protected boolean isHeldExclusively() {
	    	 
	       return getState() == 1;
	     }

	     // Acquire the lock if state is zero
	     public boolean tryAcquire(int acquires) {
	       assert acquires == 1; // Otherwise unused
	       if (compareAndSetState(0, 1)) {
	         setExclusiveOwnerThread(Thread.currentThread());
	         return true;
	       }
	       return false;
	     }

	     // Release the lock by setting state to zero
	     protected boolean tryRelease(int releases) {
	       assert releases == 1; // Otherwise unused
	       if (getState() == 0) throw new IllegalMonitorStateException();
	       setExclusiveOwnerThread(null);
	       setState(0);
	       return true;
	     }

	     // Provide a Condition
	     Condition newCondition() { return new ConditionObject(); }

	     // Deserialize properly
	     private void readObject(ObjectInputStream s)
	         throws IOException, ClassNotFoundException {
	       s.defaultReadObject();
	       setState(0); // reset to unlocked state
	     }
	   }

	   // The sync object does all the hard work. We just forward to it.
	   private final Sync sync = new Sync();

	   public void lock()                { sync.acquire(1); }
	   public boolean tryLock()          { return sync.tryAcquire(1); }
	   public void unlock()              { sync.release(1); }
	   public Condition newCondition()   { return sync.newCondition(); }
	   public boolean isLocked()         { return sync.isHeldExclusively(); }
	   public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
	   public void lockInterruptibly() throws InterruptedException {
	     sync.acquireInterruptibly(1);
	   }
	   public boolean tryLock(long timeout, TimeUnit unit)
	       throws InterruptedException {
	     return sync.tryAcquireNanos(1, unit.toNanos(timeout));
	   }
}

 

 AQS用了一个内部的静态类实现了一个双向链表,此链表用来存储所有等待的线程以及其状态。 由于代码太多,我作了提炼,请看看Node的基本结构。

 

 static final class Node {
//Marker to indicate a node is waiting in shared mode
         static final Node SHARED = new Node();
//Marker to indicate a node is waiting in exclusive mode
         static final Node EXCLUSIVE = null;       
//waitStatus value to indicate thread has cancelled
         static final int CANCELLED =  1;      
//waitStatus value to indicate successor's thread needs unparking
         static final int SIGNAL    = -1;
//waitStatus value to indicate thread is waiting on condition
//        static final int CONDITION = -2;
 
         volatile int waitStatus;

        
//Link to predecessor node that current node/thread relies on for checking waitStatus. Assigned during enqueing, and nulled out (for sake of GC) only upon dequeuing. Also, upon cancellation of a predecessor, we short-circuit while finding a non-cancelled one, which will always exist because the head node is never cancelled: A node becomes head only as a result of successful acquire. A cancelled thread never succeeds in acquiring, and a thread only cancels itself, not any other node.

         volatile Node prev;

        
//Link to the successor node that the current node/thread unparks upon release. Assigned during enqueuing, adjusted when bypassing cancelled predecessors, and nulled out (for sake of GC) when dequeued. The enq operation does not assign next field of a predecessor until after attachment, so seeing a null next field does not necessarily mean that node is at end of queue. However, if a next field appears to be null, we can scan prev's from the tail to double-check. The next field of cancelled nodes is set to point to the node itself instead of null, to make life easier for isOnSyncQueue.
 
         volatile Node next;

        
//The thread that enqueued this node. Initialized on construction and nulled out after use.

        volatile Thread thread;

        
//Link to next node waiting on condition, or the special value SHARED. Because condition queues are accessed only when holding in exclusive mode, we just need a simple linked queue to hold nodes while they are waiting on conditions. They are then transferred to the queue to re-acquire. And because conditions can only be exclusive, we save a field by using special value to indicate shared mode.

        Node nextWaiter;
}

 

 链表的节点有三个状态,这些状态将在后面的逻辑中会参与判断该节点是否可以被park,是否可以被unpark.

waitingStauts默认是0,不属于任何下面的状态。任何对状态的修改都是CAS操作,当一个线程被加入锁的阻塞队列的时候状态会编程SIGNAL,表示我需要upark来唤醒。

waitStatus value to indicate thread has cancelled

        static final int CANCELLED =  1;

waitStatus value to indicate successor's thread needs unparking

       static final int SIGNAL    = -1;      

waitStatus value to indicate thread is waiting on condition

       static final int CONDITION = -2;

 

 

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))   
         selfInterrupt();
    }

 这里简单的解释为:如果获取锁没有成功则入等待队列,入队是一定保证成功的,然后accquireQueued循环检查当前加入的节点是不是第一个节点,如果是则继续tryAcquire尝试获取锁成功则返回(如果为true则标记当前线程中断),否则将阻塞直到自己是第一个节点

 

 

 

final boolean acquireQueued(final Node node, int arg) {
         boolean failed = true;
         try {
             boolean interrupted = false;
             for (;;) {
                 final Node p = node.predecessor();
                 if (p == head && tryAcquire(arg)) {            
                     setHead(node);
                     p.next = null; // help GC
                     failed = false;
                     return interrupted;
                 }
                 if (shouldParkAfterFailedAcquire(p, node) &&     
                     parkAndCheckInterrupt())
                     interrupted = true;
             }
         } finally {
             if (failed)
                 cancelAcquire(node);
         }
     }

 这里判断当前节是否是处于队列的第一个节点,如果是则用tryAcquire尝试下(因为锁最终会被另外的线程释放),如果成功,则将节点从链表中移除重置头节点,并最终返回interrupted(注意:因为park在没有得到许可的时候会阻塞,唤醒必须是unpark或者响应interupt(),但是当唤醒的时候我们并不知道是哪种情况导致的唤醒,所以下面park的时候有个中断检查,如果interrupted为true则说明是由于中断唤醒的,否则是unpark唤醒的,这里无论interrupted是true还是false都表示获取锁成功了,不会阻塞,但是我们要将这个唤醒的原因告诉调用上层去继续决定怎么处理。实际上我们aquire方法中可以看到,又调用了一次标记中断,说明这里会继续交由线程去处理。中断导致的情况是极少发生的,因为我们不会随便调用中断

对于后面的if, 先进行状态检查看看是否真的应该阻塞(shouldParkAfterFailedAcquire,判断的逻辑在这个方法中,就是根据上面提到的状态判断的,初始状态为0,通过CAS将状态标记为SINGAL,表示需要unpark),如果是的则会通过LockSupport.park()将当前线程阻塞,等到其他线程unpark的时候唤醒进入下一个循环,直到获取锁。有点类似于while wait。

 

 

     private Node addWaiter(Node mode) {
         Node node = new Node(Thread.currentThread(), mode);
         // Try the fast path of enq; backup to full enq on failure
         Node pred = tail;
         if (pred != null) {
             node.prev = pred;
             if (compareAndSetTail(pred, node)) {  
                 pred.next = node;
                 return node;
             }
         }
         enq(node);
         return node;
     }

这个方法会将当前节点加入等待队列。首先判断尾巴节点是否为空,不为空则设通过CAS置尾巴节点设置为当前节点。 尾巴节点不为空说明等待队列不为空,直接将当前节点加入,如果失败还有后面的enq 方法通过轮询一定保证入队成功。

 

 

     private Node enq(final Node node) {
         for (;;) {                              //无限循环,通过CAS操作让当前节点安全入队。
             Node t = tail;
             if (t == null) { // Must initialize
                 if (compareAndSetHead(new Node()))  
                     tail = head;
             } else { 
                 node.prev = t;                       
                 if (compareAndSetTail(t, node)) {
                     t.next = node;
                     return t;
                 }
             }
         }
     }

 

对于第一个if 初始化时,首尾都为空,这个时候利用CAS加入一个new出来的空的头节点。

初始化完毕后,下一个循环既会进入else分支,对于第一个入队的节点,很显然前面初始化的时候已经成功入队,所以这里CAS尝试更新尾巴节点肯定失败,所以会继续循环更新尾巴节点,最终更新prev跟next(next指向自己)。对于队列大于1的入队这个CAS则有可能成功,并添加到尾巴节点。如果CAS失败则继续循环保证入队一定成功。

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics