本文共 9668 字,大约阅读时间需要 32 分钟。
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
if 语句块中,先尝试执行获取操作 tryAcquire(int arg) (可以发现该方法仅仅是抛出异常,因为需要在子类中覆盖该方法,以实现我们自定义的需求/逻辑,暂时不管),如果获取操作失败则 addWaiter 方法将当前线程构造成一个独占模式的 Node 节点,并将该节点加入到同步队列的尾部,此时节点的状态 waitStatus 为 0(初始状态),重点在于 acquireQueued(Node node, int arg) 方法,该方法使得当前线程尝试以独占且不可中断的模式获取锁。
2. acquireQueued(Node node, int arg) 方法,该方法返回一个 boolean 值,如果返回值为 true,则当前线程执行自我中断,并从 acquire(int arg) 方法返回。acquireQueued 方法源代码如下:final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
首先看 for 循环,如果当前节点(即 当前线程所在的节点)的前任节点为头节点 head 的话,当前线程会再次尝试执行获取操作,如果成功获取,则将自己设置为头节点,并返回自己的中断状态。如果获取失败则进入 shouldParkAfterFailedAcquire 方法判断是否 park(通过调用 LockSupport.park() 方法使得调用线程让出 CPU,退出线程调度) 自己,进入 shouldParkAfterFailedAcquire 方法,我们发现,使得当前线程 park 自己的唯一条件(即 使得 shouldParkAfterFailedAcquire 返回 true)是当前节点的前任节点的状态 waitStatus 值为 Node.SIGNAL(该状态表示后继节点的线程需要被 unpark),注意,这里是一个节点(当前线程所在的节点)设置另一个节点(当前节点的前任节点)的状态 waitStatus 值。当前节点将它的前任节点的状态设置为 Node.SIGNAL 后会在紧接着的下一次 for 循环中 park 自己,设置前任节点的状态为 Node.SIGNAL 目的是告诉前任节点在它释放锁时通知自己,完成设置操作,当前线程才能安心的 park 自己,因为它知道自己会被通知(signal)的。
for 循环可能发生异常,导致执行失败(即 failed = true)(我认为应该是为了处理 tryAcquire 方法可能的异常,因为该方法需要子类覆盖以实现自定义的获取操作逻辑,其行为是未知的,考虑其异常处理是必要的),如果失败(即 failed = true),则执行取消获取 cancelAcquire,从同步队列中移除自己,注意,只有 try 语句执行异常的线程才会进而执行取消操作。public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
当成功释放锁(即 tryRelease(int arg) 方法返回 true,可以发现该方法仅仅是抛出异常,因为需要在子类中覆盖该方法,以实现我们自定义的需求/逻辑,暂时不管)时,会执行唤醒同步队列中头节点的后继节点,重点在于 unparkSuccessor 方法,注意,在该方法中并没有更新头节点的操作,因为当某个线程获取到该独占锁时,会将自己设置为头节点。
protected boolean tryAcquire(int arg) { // 独占式获取 throw new UnsupportedOperationException(); } protected boolean tryRelease(int arg) { // 独占式释放 throw new UnsupportedOperationException(); } protected int tryAcquireShared(int arg) { // 共享式获取 throw new UnsupportedOperationException(); } protected boolean tryReleaseShared(int arg) { // 共享式释放 throw new UnsupportedOperationException(); } protected boolean isHeldExclusively() { throw new UnsupportedOperationException(); }
该类也提供了修改同步器内部状态的方法,我们覆盖上述获取及释放锁的方法时,通过修改同步器内部状态值来定义锁的获取及释放语义。修改同步器内部状态值的方法如下:
/* 获取同步器内部状态 int 值 */ protected final int getState() { return state; } /* 设置同步器内部状态,该方法本身非线程安全, * 必须在确保线程安全的场景中才能使用该方法 */ protected final void setState(int newState) { state = newState; } /* CAS 操作设置同步器内部状态值,线程安全 */ protected final boolean compareAndSetState(int expect, int update) { return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
/* 自定义独占式锁/排它锁 */class MMLock{ private final Sync sync; /* 记录当前持有锁的线程对象,用来防止未持有锁的线程试图释放锁 */ private volatile Thread threadOwnedLock; /* 将获取锁与释放锁的操作委托给内部实现类 Sync */ public MMLock() { sync = new Sync(); } /* 获取锁 */ public void lock() { sync.acquire(1); } /* 释放锁 */ public void unlock() { if (threadOwnedLock == Thread.currentThread()) { sync.release(1); } } private class Sync extends AbstractQueuedSynchronizer{ private static final long serialVersionUID = 1L; @Override protected boolean tryAcquire(int arg) { boolean res = super.compareAndSetState(0, arg); if (res) { // 记录成功持有锁的线程对象,用来防止未持有锁的线程释放锁 threadOwnedLock = Thread.currentThread(); } return res; } @Override protected boolean tryRelease(int arg) { if (super.getState() == arg) { threadOwnedLock = null; setState(0); // 释放锁 } return true; } }}
可以看到这个自定义排它锁的实现非常简单(内部实现中,状态 0 表示锁 ’空闲‘,锁可以被获取,状态 1 表示 ‘已加锁’,而对于锁的使用者,只需执行获取及释放锁操作即可),仅仅选择性覆盖/实现了 tryAcquire 和 tryRelease 这两个方法,其中 tryAcquire 方法通过 CAS 操作(即 compareAndSwap )提供了原子操作保证,因为同一时间可能会有多个线程试图获取锁,而 tryRelease 方法却没有使用 CAS 操作,因为这是排它锁,且通过变量 threadOwnedLock 记录当前持有锁的线程对象,使得仅仅持有锁的线程才能执行释放锁操作,所以释放操作不会有多线程同时执行的情况,注意,变量 threadOwnedLock 使用 volatile 关键字修饰,确保内存可见性。
将第一篇文章中累加器程序使用的利用 ‘等待/通知’机制 实现的锁 MyLock 更换为上述的自定义锁组件 MMLock 类,可见,程序可以得到正确结果。/* 观察自定义锁组件 MMLock 对象的内部状态 */import java.util.concurrent.locks.AbstractQueuedSynchronizer;public class Temp_2 { public static void main(String[] args) throws InterruptedException { MMLock lock = new MMLock(); int thread_num = 3; // 线程数量为 3 Thread[] threads = new Thread[thread_num]; for(int i = 0;i < thread_num;i++) { threads[i] = new Thread(new Runnable() { @Override public void run() { try { Thread.sleep(1 * 1000); } catch (InterruptedException e) { e.printStackTrace(); } // 尝试获取锁,因为主线程(main-thread)先获取锁,且一直不释放锁,导致该线程进入同步队列等待 lock.lock(); } }, "t-" + i); // 线程名称分别为:t-0,t-1,t-2,共 3 个线程 } for(int i = 0;i < thread_num;i++) { threads[i].start(); } lock.lock(); // 主线程获取锁 System.out.println("main-thread acquired lock, then into sleep"); Thread.sleep(0); // 主线程持有锁,且一直不释放。只管睡觉。 }}/* 自定义独占式锁/排它锁 */class MMLock{ private final Sync sync; /* 记录当前持有锁的线程对象,用来防止未持有锁的线程试图释放锁 */ private volatile Thread threadOwnedLock; /* 将获取锁与释放锁的操作委托给内部实现类 Sync */ public MMLock() { sync = new Sync(); } public void lock() { sync.acquire(1); } public void unlock() { if (threadOwnedLock == Thread.currentThread()) { sync.release(1); } } private class Sync extends AbstractQueuedSynchronizer{ private static final long serialVersionUID = 1L; @Override protected boolean tryAcquire(int arg) { boolean res = super.compareAndSetState(0, arg); if (res) { // 记录成功持有锁的线程对象,用来防止未持有锁的线程释放锁 threadOwnedLock = Thread.currentThread(); } return res; } @Override protected boolean tryRelease(int arg) { if (super.getState() == arg) { threadOwnedLock = null; setState(0); // 释放锁 } return true; } }}
先运行程序,然后使用 JDK 自带的工具 Java VisualVM 查看运行中的程序,选中运行中的程序,在 ‘Monitor’ 面板,点击 ‘Heap Dump’ 按钮执行堆快照操作,然后在生成的快照面板,在 ‘Classes’ 界面使用类名称 ‘mmlock’ 筛选类,可以看到结果显示有一个该类的实例,显示如下:
双击上图中的第二行,进入查看该类型实例对象的界面,如下图: 可以发现,当前持有锁的线程为主线程(main),而其它 3 个线程(t-0,t-1,t-2)都在同步队列中等待,通过查看同步队列中节点的状态 waitStatus 值,可以发现除了最后一个节点即 尾节点的状态值为 0(初始状态值),其它节点包括头节点的状态值都为 -1 即 Node.SIGNAL 的值,这是因为每个节点的线程在 park 自己之前,会设置它的前任节点的状态值为 Node.SIGNAL,以表示当前节点需要被通知(即 signal),而尾节点并没有后继节点,所以它还是初始值。可见,同步队列中,从头节点开始,每一个节点对紧跟它的后继节点负责,负责 signal 它的后继节点。通过查看 ‘Threads’ 面板可以看到,这个 3 个线程都在 park 方法上 waiting,如下图: - 还没有说共享锁的自定义实现,在下一篇文章中继续讨论。转载地址:http://rclsi.baihongyu.com/