百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术文章 > 正文

「每天一道面试题」CyclicBarrier的实现原理?

cac55 2024-09-19 17:03 24 浏览 0 评论

要点解说

CyclicBarrier是一个同步辅助类,它允许一组线程互相等待,直到所有线程都到达某个公共屏障点(也可以叫同步点),即相互等待的线程都完成调用await方法,所有被屏障拦截的线程才会继续运行await方法后面的程序。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时CyclicBarrier很有用。因为该屏障点在释放等待线程后可以重用,所以称它为循环的屏障点。CyclicBarrier支持一个可选的Runnable命令,在一组线程中的最后一个线程到达屏障点之后(但在释放所有线程之前),该命令只在所有线程到达屏障点之后运行一次,并且该命令由最后一个进入屏障点的线程执行。

实例演示

CyclicBarrier简单使用样例。

public class CyclicBarrierDemo {

@Test

public void test() {

final CyclicBarrier barrier = new CyclicBarrier(2, myThread);

new Thread(new Runnable() {

@Override

public void run() {

try {

System.out.println(Thread.currentThread().getName());

barrier.await();

System.out.println(Thread.currentThread().getName());

} catch (Exception e) {

e.printStackTrace();

}

}

}, "thread1").start();

new Thread(new Runnable() {

@Override

public void run() {

try {

System.out.println(Thread.currentThread().getName());

barrier.await();

System.out.println(Thread.currentThread().getName());

} catch (Exception e) {

e.printStackTrace();

}

}

}, "thread2").start();

}

Thread myThread = new Thread(new Runnable() {

@Override

public void run() {

System.out.println("myThread");

}

}, "thread3");

}

结果输出:

thread1

thread2

myThread

thread2

thread1

方法解析

1.CyclicBarrier(int parties, Runnable barrierAction) 创建一个CyclicBarrier实例,parties指定参与相互等待的线程数,barrierAction指定当所有线程到达屏障点之后,首先执行的操作,该操作由最后一个进入屏障点的线程执行。

2.CyclicBarrier(int parties) 创建一个CyclicBarrier实例,parties指定参与相互等待的线程数。

3.getParties() 返回参与相互等待的线程数。

4.await() 该方法被调用时表示当前线程已经到达屏障点,当前线程阻塞进入休眠状态,直到所有线程都到达屏障点,当前线程才会被唤醒。

5.await(long timeout, TimeUnit unit) 该方法被调用时表示当前线程已经到达屏障点,当前线程阻塞进入休眠状态,在timeout指定的超时时间内,等待其他参与线程到达屏障点;如果超出指定的等待时间,则抛出TimeoutException异常,如果该时间小于等于零,则此方法根本不会等待。

6.isBroken() 判断此屏障是否处于中断状态。如果因为构造或最后一次重置而导致中断或超时,从而使一个或多个参与者摆脱此屏障点,或者因为异常而导致某个屏障操作失败,则返回true;否则返回false。

7.reset() 将屏障重置为其初始状态。

8.getNumberWaiting() 返回当前在屏障处等待的参与者数目,此方法主要用于调试和断言。

源码解析

CyclicBarrier(int parties, Runnable barrierAction)和await()方法是CyclicBarrier的核心,本篇重点分析这两个方法的背后实现原理。 首先,看一下CyclicBarrier内声明的一些属性信息:

//用于保护屏障入口的锁

private final ReentrantLock lock = new ReentrantLock();

//线程等待条件

private final Condition trip = lock.newCondition();

//记录参与等待的线程数

private final int parties;

//当所有线程到达屏障点之后,首先执行的命令

private final Runnable barrierCommand;

private Generation generation = new Generation();

//实际中仍在等待的线程数,每当有一个线程到达屏障点,count值就会减一;当一次新的运算开始后,count的值被重置为parties

private int count;

其中,Generation是CyclicBarrier的一个静态内部类,它只有一个boolean类型的属性,具体代码如下:

private static class Generation {

boolean broken = false;

}

当使用构造方法创建CyclicBarrier实例的时候,就是给上面这些属性赋值,

//创建一个CyclicBarrier实例,parties指定参与相互等待的线程数,

//barrierAction指定当所有线程到达屏障点之后,首先执行的操作,该操作由最后一个进入屏障点的线程执行。

public CyclicBarrier(int parties, Runnable barrierAction) {

if (parties <= 0) throw new IllegalArgumentException();

this.parties = parties;

this.count = parties;

this.barrierCommand = barrierAction;

}

//创建一个CyclicBarrier实例,parties指定参与相互等待的线程数

public CyclicBarrier(int parties) {

this(parties, null);

}

当调用await()方法时,当前线程已经到达屏障点,当前线程阻塞进入休眠状态,

//该方法被调用时表示当前线程已经到达屏障点,当前线程阻塞进入休眠状态

//直到所有线程都到达屏障点,当前线程才会被唤醒

public int await() throws InterruptedException, BrokenBarrierException {

try {

return dowait(false, 0L);

} catch (TimeoutException toe) {

throw new Error(toe); // cannot happen;

}

}

//该方法被调用时表示当前线程已经到达屏障点,当前线程阻塞进入休眠状态

//在timeout指定的超时时间内,等待其他参与线程到达屏障点

//如果超出指定的等待时间,则抛出TimeoutException异常,如果该时间小于等于零,则此方法根本不会等待

public int await(long timeout, TimeUnit unit)

throws InterruptedException,

BrokenBarrierException,

TimeoutException {

return dowait(true, unit.toNanos(timeout));

}

private int dowait(boolean timed, long nanos)

throws InterruptedException, BrokenBarrierException,

TimeoutException {

//使用独占资源锁控制多线程并发进入这段代码

final ReentrantLock lock = this.lock;

//独占锁控制线程并发访问

lock.lock();

try {

final Generation g = generation;

if (g.broken)

throw new BrokenBarrierException();

//如果线程中断,则唤醒所有等待线程

if (Thread.interrupted()) {

breakBarrier();

throw new InterruptedException();

}

//每调用一次await()方法,计数器就减一

int index = --count;

//当计数器值等于0的时

if (index == 0) { // tripped

boolean ranAction = false;

try {

final Runnable command = barrierCommand;

//如果在创建CyclicBarrier实例时设置了barrierAction,则先执行barrierAction

if (command != null)

command.run();

ranAction = true;

//当所有参与的线程都到达屏障点,为唤醒所有处于休眠状态的线程做准备工作

//需要注意的是,唤醒所有阻塞线程不是在这里

nextGeneration();

return 0;

} finally {

if (!ranAction)

breakBarrier();

}

}

// loop until tripped, broken, interrupted, or timed out

for (;;) {

try {

if (!timed)

//让当前执行的线程阻塞,处于休眠状态

trip.await();

else if (nanos > 0L)

//让当前执行的线程阻塞,在超时时间内处于休眠状态

nanos = trip.awaitNanos(nanos);

} catch (InterruptedException ie) {

if (g == generation && ! g.broken) {

breakBarrier();

throw ie;

} else {

// We're about to finish waiting even if we had not

// been interrupted, so this interrupt is deemed to

// "belong" to subsequent execution.

Thread.currentThread().interrupt();

}

}

if (g.broken)

throw new BrokenBarrierException();

if (g != generation)

return index;

if (timed && nanos <= 0L) {

breakBarrier();

throw new TimeoutException();

}

}

} finally {

//释放独占锁

lock.unlock();

}

}

private void nextGeneration() {

//为唤醒所有处于休眠状态的线程做准备工作

trip.signalAll();

//重置count值为parties

count = parties;

//重置中断状态为false

generation = new Generation();

}

private void breakBarrier() {

//重置中断状态为true

generation.broken = true;

//重置count值为parties

count = parties;

//为唤醒所有处于休眠状态的线程做准备工作

trip.signalAll();

}

到这里CyclicBarrier的实现原理基本已经都清楚了,下面来深入源码分析一下线程阻塞代码trip.await()和线程唤醒trip.signalAll()的实现。

//await()是AQS内部类ConditionObject中的方法

public final void await() throws InterruptedException {

//如果线程中断抛异常

if (Thread.interrupted())

throw new InterruptedException();

//新建Node节点,并将新节点加入到Condition等待队列中

//Condition等待队列是AQS内部类ConditionObject实现的,ConditionObject有两个属性,分别是firstWaiter和lastWaiter,都是Node类型

//firstWaiter和lastWaiter分别用于代表Condition等待队列的头结点和尾节点

Node node = addConditionWaiter();

//释放独占锁,让其它线程可以获取到dowait()方法中的独占锁

int savedState = fullyRelease(node);

int interruptMode = 0;

//检测此节点是否在资源等待队列(AQS同步队列)中,

//如果不在,说明此线程还没有竞争资源锁的权利,此线程继续阻塞,直到检测到此节点在资源等待队列上(AQS同步队列)中

//这里出现了两个等待队列,分别是Condition等待队列和AQS资源锁等待队列(或者说是同步队列)

//Condition等待队列是等待被唤醒的线程队列,AQS资源锁等待队列是等待获取资源锁的队列

while (!isOnSyncQueue(node)) {

//阻塞当前线程,当前线程进入休眠状态,可以看到这里使用LockSupport.park阻塞当前线程

LockSupport.park(this);

if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)

break;

}

if (acquireQueued(node, savedState) && interruptMode != THROW_IE)

interruptMode = REINTERRUPT;

if (node.nextWaiter != null) // clean up if cancelled

unlinkCancelledWaiters();

if (interruptMode != 0)

reportInterruptAfterWait(interruptMode);

}

//addConditionWaiter()是AQS内部类ConditionObject中的方法

private Node addConditionWaiter() {

Node t = lastWaiter;

// 将condition等待队列中,节点状态不是CONDITION的节点,从condition等待队列中移除

if (t != null && t.waitStatus != Node.CONDITION) {

unlinkCancelledWaiters();

t = lastWaiter;

}

//以下操作是用此线程构造一个节点,并将之加入到condition等待队列尾部

Node node = new Node(Thread.currentThread(), Node.CONDITION);

if (t == null)

firstWaiter = node;

else

t.nextWaiter = node;

lastWaiter = node;

return node;

}

//signalAll是AQS内部类ConditionObject中的方法

public final void signalAll() {

if (!isHeldExclusively())

throw new IllegalMonitorStateException();

//Condition等待队列的头结点

Node first = firstWaiter;

if (first != null)

doSignalAll(first);

}

private void doSignalAll(Node first) {

lastWaiter = firstWaiter = null;

do {

Node next = first.nextWaiter;

first.nextWaiter = null;

//将Condition等待队列中的Node节点按之前顺序都转移到了AQS同步队列中

transferForSignal(first);

first = next;

} while (first != null);

}

final boolean transferForSignal(Node node) {

if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))

return false;

//这里将Condition等待队列中的Node节点插入到AQS同步队列的尾部

Node p = enq(node);

int ws = p.waitStatus;

if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))

LockSupport.unpark(node.thread);

return true;

}

//ReentrantLock#unlock()方法

public void unlock() {

//Sync是ReentrantLock的内部类,继承自AbstractQueuedSynchronizer,它是ReentrantLock中公平锁和非公平锁的基础实现

sync.release(1);

}

public final boolean release(int arg) {

//释放锁

if (tryRelease(arg)) {

//AQS同步队列头结点

Node h = head;

if (h != null && h.waitStatus != 0)

//唤醒节点中的线程

unparkSuccessor(h);

return true;

}

return false;

}

private void unparkSuccessor(Node node) {

int ws = node.waitStatus;

if (ws < 0)

compareAndSetWaitStatus(node, ws, 0);

Node s = node.next;

if (s == null || s.waitStatus > 0) {

s = null;

for (Node t = tail; t != null && t != node; t = t.prev)

if (t.waitStatus <= 0)

s = t;

}

if (s != null)

//唤醒阻塞线程

LockSupport.unpark(s.thread);

}

原理总结

用上面的示例总结一下CyclicBarrier的await方法实现,假设线程thread1和线程thread2都执行到CyclicBarrier的await(),都进入dowait(boolean timed, long nanos),thread1先获取到独占锁,执行到--count的时,index等于1,所以进入下面的for循环,接着执行trip.await(),进入await()方法,执行Node node = addConditionWaiter()将当前线程构造成Node节点并加入到Condition等待队列中,然后释放获取到的独占锁,当前线程进入阻塞状态;此时,线程thread2可以获取独占锁,继续执行--count,index等于0,所以先执行command.run(),输出myThread,然后执行nextGeneration(),nextGeneration()中trip.signalAll()只是将Condition等待队列中的Node节点按之前顺序都转移到了AQS同步队列中,这里也就是将thread1对应的Node节点转移到了AQS同步队列中,thread2执行完nextGeneration(),返回return 0之前,细看代码还需要执行lock.unlock(),这里会执行到ReentrantLock的unlock()方法,最终执行到AQS的unparkSuccessor(Node node)方法,从AQS同步队列中的头结点开始释放节点,唤醒节点对应的线程,即thread1恢复执行。

如果有三个线程thread1、thread2和thread3,假设线程执行顺序是thread1、thread2、thread3,那么thread1、thread2对应的Node节点会被加入到Condition等待队列中,当thread3执行的时候,会将thread1、thread2对应的Node节点按thread1、thread2顺序转移到AQS同步队列中,thread3执行lock.unlock()的时候,会先唤醒thread1,thread1恢复继续执行,thread1执行到lock.unlock()的时候会唤醒thread2恢复执行。

实战经验

一个excel有多个sheet,每个sheet记录用户的每日交易流水,如果要计算这个用户当月的日平均消费情况,可以使用多线程先分别计算每日的消费情况,然后再做汇总计算平均值。

面试考点

CyclicBarrier当所有线程都到达屏障点后,等待线程的执行顺序是什么样的?

CyclicBarrier的await方法是使用ReentrantLock和Condition控制实现的,使用的Condition实现类是ConditionObject,它里面有一个等待队列和await方法,这个await方法会向队列中加入元素。当调用CyclicBarrier的await方法会间接调用ConditionObject的await方法,当屏障关闭后首先执行指定的barrierAction,然后依次执行等待队列中的任务,有先后顺序。

相关推荐

花十几万配的顶级电脑:遭遇诡异Bug无法开机!机主绝望发帖求助

快科技7月1日消息,一位3D图形工作者在组装了一台价值约2万美元(约合14.3万元人民币)的顶级DIY电脑后,遭遇了令人头疼的问题。Reddit用户joel_motion介绍,他的这台电脑配备了AMD...

麒麟系统笔记本电脑问题及解决方法

最近配发了麒麟系统的笔记本电脑,WPS、微信、QQ等软件倒是都有,日常办公还行,但也发现了一些问题,如:1、(网络打印机问题)据到场的技术人员讲,直接USB口连接的常见打印机都有相应的驱动程序,可以正...

电脑驱动问题修复方法全总结(电脑驱动坏了怎么修复不成功呢)

在电脑使用过程中,驱动程序出现问题可能导致设备无法正常工作,影响用户体验。下面为您详细总结不同场景下修复电脑驱动问题的多种解决方案,涵盖从基础到进阶,再到系统级的操作方法,以及硬件排查和预防建议。一、...

电脑总是莫名其妙出故障,立即检查这个设置!

不久前贴吧看到过这么一个帖子:有个做设计的狠人,仗着自己32G内存条,非说虚拟内存是微软的智商税。结果呢?Photoshop渲染到99%直接闪退,3D建模文件当场“灰飞烟灭”。电脑操作系统可不像人懂得...

网友被很简单的一个电脑问题折腾了几个月还没弄好,挺感慨的

昨天晚上,一个网友询问一个型号的电脑主机买了不合适可不可以退,我挺奇怪的,就问他具体怎么回事,他说他的电吉它总是连不上电脑,无法调音,请人重装了几次系统,又请人折腾数次,几个月一直没搞定,怀疑是电脑本...

一次诡异的电脑重启故障(电脑出现异常重启)

在公司的日常运营中,设备的稳定运行至关重要。近日,公司里一台用于检测设备的电脑出现了异常状况,着实让我费了一番周折才解决问题。那天,我接到同事反馈,说这台检测设备电脑莫名地重启。我立刻放下手中的工作,...

德国所有机场突发电脑系统故障 大量航班受影响

当地时间1月3日,德国所有机场突发电脑系统故障,导致大范围航班运行受影响。据德国联邦警察发言人证实,机场边检及相关系统无法正常运行,旅客需面临长时间排队和等待的状况。△资料图多个机场出现严重技术故障据...

电脑维修入门基本知识大全(电脑维修入门基本知识大全图解)

以下是电脑维修入门基本知识的系统整理,结合硬件维护、故障排查及使用习惯三大核心方向,帮助新手快速掌握关键要点:一、硬件基础与识别1.核心组件认知主板:硬件连接中枢,故障易导致开机无反应。CPU/内存/...

电脑上网常用故障及其解决方法(电脑网络故障怎么修复)

干了这么多年的网络,今天我就来总结一下,经常出现的网络故障及其解决的方法,看看有没有哪些故障是你遇见过的?怎么解决的?下面我们从这几个点来分析:一、IP地址问题显示IP地址冲突,我们怎么办?Windo...

维修电脑常用的7个方法(维修电脑常用的7个方法是什么)

今天介绍维修电脑常用的7个解决方法,平时电脑开不了机,按开机键没反应,或者电源指示灯闪烁,屏幕不亮,显示无信号,或者开机主板有警报声,电脑缺少各种系统文件,或者开机蓝屏等使用以下这7个方法,可以修好8...

突然崩了!很多人以为电脑坏了!官方紧急回应

QQ崩了这事儿真挺逗。上午十一点多正干活呢,桌面QQ突然闪退报错,反复登录就是进不去。第一反应绝对是骂电脑不争气,有人甚至把系统重装了。结果热搜跳出来"QQ崩了",合着白忙活半天。腾讯...

电脑网络连不上网?10个步骤轻松排查故障,小白也能自己修!

大家有没有遇到过这种情况?正追剧到关键时刻,突然提示"网络连接失败";急着要交的文档传不上网盘,急得直冒汗。别慌!今天教大家10个小白都能操作的排查方法,不用拆电脑也不用求人,跟着做就...

电脑开机花屏肯定不是电脑的问题(电脑开机花屏是怎么回事)

文章最后,了解更多,领取红包。世界上这么多人,每天运用电脑的人就更是数不计数,所以每天世界上的每个角落都有很多人在面临着不一样的电脑问题。小编我今天就来说说,当电脑出现开机花屏的时候,应该怎么搞定。所...

电脑没有声音怎么办?不是电脑的问题,是你不知道怎么设置的问题

电脑没有声音怎么办?了解这几招快速解决!有没有碰到过这样的经历,电脑用着突然没有声音了;或者重启电脑之后,电脑没声音了。然后绞尽脑汁尝试各种办法,搞了好久也没有解决这个问题,今天小编给大家介绍几种方法...

《电脑故障不求人,这些维修技巧要掌握!》

电脑故障不求人,这些维修技巧要掌握!电脑罢工别慌!掌握基础排查逻辑,80%小问题可自救。以下分场景拆解实用技巧,让你秒变“修机达人”。一、开机无反应:先查“供电三件套”1.电源检查-按机箱电源键,...

取消回复欢迎 发表评论: