欢迎访问欧博网址!

佳木斯租房网:Java读源码之ReentrantLock(2)

admin1个月前5

前言

本文是 ReentrantLock 源码的第二篇,第一篇主要先容了公正锁非公正锁正常的加锁解锁流程,虽然表达能力有限不知道有没有讲清楚,本着不太监的原则,本文填补下第一篇中挖的坑。

Java读源码之ReentrantLock

源码剖析

感知中止锁

若是我们希望检测到中止后能马上抛出异常就用 lockInterruptibly 方式去加锁,照样建议用 lock 方式,自界说中止处置,更天真一点。

  • ReentrantLock#lockInterruptibly

我们只需要把 ReentrantLock#lock 改成 ReentrantLock#lockInterruptibly 方式就可以获得内部检测中止的锁了

public void lockInterruptibly() throws InterruptedException {
  sync.acquireInterruptibly(1);
}
  • AbstractQueuedSynchronizer#acquireInterruptibly

主要流程和前文先容的类似

public final void acquireInterruptibly(int arg)
  throws InterruptedException {
  // 一上来就检查下中止,中止直接异常,就没必要抢锁排队了
  if (Thread.interrupted())
    throw new InterruptedException();
  if (!tryAcquire(arg))
    doAcquireInterruptibly(arg);
}
  • AbstractQueuedSynchronizer#doAcquireInterruptibly

和正常加锁唯一区别就是这个方式,然则定睛一看是不是似曾相识?最大区别就是把中止标识给去掉了,检测到中止直接抛异常

private void doAcquireInterruptibly(int arg)
    throws InterruptedException {
  	// 大神也偷懒了,由于这个方式,只有独占锁且检查中止这一个应用场景,把节点入队的步骤也揉了进来
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
              	// 当线程拿到锁苏醒过来,发现自己挂起历程被中止了,直接抛出异常
                throw new InterruptedException();
        }
    } finally {
      	// 只要发生了中止异常,就会进作废加锁方式
        if (failed)
            cancelAcquire(node);
    }
}
  • AbstractQueuedSynchronizer#cancelAcquire

此方式很有器械,只保证该节点失效,然后延迟移出守候行列

private void cancelAcquire(Node node) {
    if (node == null)
        return;
		// 把节点里挂号守候的线程去掉,完成这一步此节点已经没有作用了
    node.thread = null;

    // 下面的三步实在可以放到一个CAS中,直接设置 CANCELLED 状态 ,拿前一个节点,predNext 也一定是自己,然则吞吐量就下来了
    // 这里大神,没有这样做也是出于了性能思量,由于我们已经把守候线程设置成 null 了,以是此节点已经没有任何意义,没有必要去保证节点第一时间被释放,只要设置好 CANCELLED 状态
    // 就算后面 CAS 调整守候行列失败了,下次作废操作也会帮着接纳。响应地代码复杂度提高了。
  
    /* ----------------------------------------- */
    // 找到自己前面第一个没作废的节点,
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;
    // 主要是为了下面把链表接上
    Node predNext = pred.next;
    // 这里逻辑上把当前节点的状态设置成作废,便于检测释放
    node.waitStatus = Node.CANCELLED;
  	/* ----------------------------------------- */
  
    // 若是当前节点是尾节点,就把前一个没作废的节点设成新尾巴
    if (node == tail && compareAndSetTail(node, pred)) {
      	// 把新尾巴的 next 设置成空
        compareAndSetNext(pred, predNext, null);
    } else {
        // 进到这里说明当前节点一定不是尾节点了
        int ws;
      	// 条件1: 若是前一个非作废节点不是头,也就是还需要排队
      	// 条件2: 若是前一个节点为 SIGNAL,也就是说后面一定另有线程守候被叫醒
      	// 条件3: 若是前一个节点也作废了,说明前一个节点也作废了,还没来得及设置状态
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
              	// 当前节点后一个没作废的话,就接到前一个正常的节点后面
                compareAndSetNext(pred, predNext, next);
        } else {
          	// 前一篇文章解锁部门讲过,会把下一个节点中的线程恢复,然后把后继节点接上
            unparkSuccessor(node);
        }

      	// 有点花里胡哨,直接 = null不行么,
        node.next = node; // help GC
    }
}

来张图说明下,如果我们现在守候行列里有7个线程:

守候条件锁

上篇文章看源码历程中,AQS中有个 CONDITION 状态没有研究

static final int CONDITION = -2;

ReentrantLock 中的 newCondition 等 Condition 相关方式正是基于 AQS 中的实现的,让我们先大致领会一波作用和用法

Condition简介

Condition 类似于 Object 中的 wait 和 notify ,主要用于线程间通讯,最大的优势是 Object 的 wait 是把线程放到当前工具的守候池中,也就是说一个工具只能有一个守候条件,而 Condition 可以支持多个守候条件,举个例子,商品要等至少三小我私家预定了才最先发售,第一个预定的减500,第二三两个减100。正式发售之后恢复原价。

public class ReentrantLockConditionDemo {

    private final ReentrantLock reentrantLock = new ReentrantLock();
    private final Condition wait1 = reentrantLock.newCondition();
    private final Condition wait2 = reentrantLock.newCondition();
    private int wait1Count = 0;
    private int wait2Count = 0;

    public void buy() {
        int price = 999;
        reentrantLock.lock();
        try {
            while (wait1Count++ < 1) {
                System.out.println(Thread.currentThread().getName() + "减500");
                wait1.await();
                price -= 500;
            }
            wait1.signal();
            while (wait2Count++ < 2) {
                System.out.println(Thread.currentThread().getName() + "减100");
                wait2.await();
                price -= 100;
            }
            wait2.signal();
            System.out.println(Thread.currentThread().getName() + "得手价" + price);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            reentrantLock.unlock();
        }
    }

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        ReentrantLockConditionDemo reentrantLockConditionDemo = new ReentrantLockConditionDemo();
        IntStream.rangeClosed(0, 4)
                .forEach(num -> executorService
                        .execute(reentrantLockConditionDemo::buy)
                );
    }
  
    /**
     * 输出:
     *
     * pool-1-thread-1减500
     * pool-1-thread-2减100
     * pool-1-thread-3减100
     * pool-1-thread-4得手价999
     * pool-1-thread-5得手价999
     * pool-1-thread-1得手价499
     * pool-1-thread-2得手价899
     * pool-1-thread-3得手价899
     */
}
  • ReentrantLock#newCondition

先来看条件的建立,需要基于锁工具使用 newCondition 去建立

public Condition newCondition() {
  return sync.newCondition();
}

final ConditionObject newCondition() {
  // ConditionObject 是 AQS 中对 Condition 的实现
  return new ConditionObject();
}

ConditionObject结构

上一篇文章中先容了 Node 结构,这里条件也使用了这个节点界说了一个单链表,统称为条件行列,上一篇先容统称同步行列。条件行列结构相当简朴就不单独绘图了。

// 条件行列头
private transient Node firstWaiter;
// 条件行列尾
private transient Node lastWaiter;

// 由于默认感知中止,需要思量若何处置
// 退出条件行列时重新设置中止位
private static final int REINTERRUPT =  1;
// 退出条件行列时直接抛异常
private static final int THROW_IE    = -1;

条件行列入队

  • AbstractQueuedSynchronizer.ConditionObject#await
public final void await() throws InterruptedException {
  if (Thread.interrupted())
    throw new InterruptedException();
  // 到条件行列中排队,下文详解
  Node node = addConditionWaiter();
  // 此方式比较简朴,就是挪用前一篇讲过的 release 方式释放锁(挪用 await 时必定是锁的持有者)
  // savedState 是进入条件行列前,持有锁的数目
  // 失败会直接抛出异常,而且最终把节点状态设置为 CANCELLED
  int savedState = fullyRelease(node);
  int interruptMode = 0;
  // 判断在不在同步行列(当挪用signal之后会从条件行列移到同步行列),此判断很简朴:节点状态是 CONDITION 一定 false,否则就到同步行列中去找
  while (!isOnSyncQueue(node)) {
    // 挂起
    LockSupport.park(this);
    // 检查是不是由于中止被叫醒的,下文详解
    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
      break;
  }
  // 上一篇先容过acquireQueued自旋抢锁,若是抢到锁了,而且中止模式不是 -1(默认0),就纪录中止模式为1,示意需要重新设置中止
  if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
    interruptMode = REINTERRUPT;
  // 消灭条件行列中作废的节点
  if (node.nextWaiter != null)
    // 下文详解,在addConditionWaiter方式中也有用到
    unlinkCancelledWaiters();
  // 处置中止
  if (interruptMode != 0)
    // 1:再次中止	-1:抛出异常
    reportInterruptAfterWait(interruptMode);
}
  • AbstractQueuedSynchronizer.ConditionObject#addConditionWaiter

加入条件行列

private Node addConditionWaiter() {
  Node t = lastWaiter;
  // 若是条件行列最后一个节点作废了,就清算
  if (t != null && t.waitStatus != Node.CONDITION) {
    unlinkCancelledWaiters();
    t = lastWaiter;
  }
  // 新建一个 waitStatus = -2 的节点
  Node node = new Node(Thread.currentThread(), Node.CONDITION);
  // 下面是简朴的单链表操作,之前同步行列入队用的 CAS 操作,由于会有许多线程去抢锁,而线程进入条件行列一定是拿到锁了,不满足条件了,以是不存在并发问题
  if (t == null)
    firstWaiter = node;
  else
    t.nextWaiter = node;
  lastWaiter = node;
  return node;
}
  • AbstractQueuedSynchronizer.ConditionObject#unlinkCancelledWaiters
private void unlinkCancelledWaiters() {
    Node t = firstWaiter;
    // 辅助变量,用于接尾巴,trail始终即是循环中当前节点t的上一个不是作废状态的节点
    Node trail = null;
    while (t != null) {
        Node next = t.nextWaiter;
        // 判断当前节点有没有作废
        if (t.waitStatus != Node.CONDITION) {
            // 断当前节点链
            t.nextWaiter = null;
            // trail == null 说明现在条件行列内里全作废了
            if (trail == null)
                // 头节点指向第一个没作废的节点
                firstWaiter = next;
            else
                // trail 是 t 的前一个节点,也就是踢出了 t
                trail.nextWaiter = next;
            // 若是最后一个节点作废了,那需要改一下尾指针
            if (next == null)
                lastWaiter = trail;
        }
        else
            trail = t;
        t = next;
    }
}
  • AbstractQueuedSynchronizer.ConditionObject#checkInterruptWhileWaiting

上文 await 方式中,线程一旦叫醒会先检查中止

private int checkInterruptWhileWaiting(Node node) {
    // 没中止,返回0,中止了需要放回同步行列
    return Thread.interrupted() ?
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
    0;
}
  • AbstractQueuedSynchronizer#transferAfterCancelledWait
// 若是
final boolean transferAfterCancelledWait(Node node) {
    // 把由于中止醒来的节点,设置状态为全新的节点,从条件行列放入同步行列
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        enq(node);
        return true;
    }
    // 上面改状态为什么要 CAS ? 若是中止叫醒的同时被 signal 叫醒了,在 signal 入队乐成之前让出cpu,然则不释放锁
    while (!isOnSyncQueue(node))
        Thread.yield();
    return false;
}

条件行列出队

单个叫醒和叫醒以是掉的方式类似,看一个单个叫醒流程就可

  • AbstractQueuedSynchronizer.ConditionObject#signal
public final void signal() {
    // 若是持有锁的线程不是当前线程就抛异常,也就是只有获得锁的线程可以执行叫醒操作
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    // 通知条件行列中的第一个节点,也就是等的最久的节点
    if (first != null)
        doSignal(first);
}
  • AbstractQueuedSynchronizer.ConditionObject#doSignal
private void doSignal(Node first) {
    do {
        // 把 first 断链
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
        // 若是转移到同步行列失败了,而且另有条件行列不为空就叫醒下一个
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}
  • AbstractQueuedSynchronizer#transferForSignal
final boolean transferForSignal(Node node) {
    // 若是节点作废了,转移失败
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    // 这里的 p 是 node 在同步行列里的前驱节点
    Node p = enq(node);
    int ws = p.waitStatus;
    // 看过上一篇文章应该有映像,只要是进同步行列,都需要把前一个节点状态设为 -1
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        // 若是作废了,或者状态设置失败,叫醒后继续挂起
        LockSupport.unpark(node.thread);
    return true;
}

最后根据老例连系上面的案例,画张图总结下:

,

sunbet

www.0379st.com信誉来自于每一位客户的口碑,Sunbet贴心的服务,让你尊享贵宾通道,秒速提现,秒速到账,同行业中体验最佳。

上一篇 下一篇

猜你喜欢

最新文章
热评文章
热门文章
随机文章
热门标签