百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术文章 > 正文

循环栅栏 CycleBarrier 理解到深入

cac55 2024-09-19 17:04 29 浏览 0 评论

关于 CycleBarrier

Java JUC 包中提供类似的工具,可以设置一个简单的集合点,等所有成员到齐了之后,再执行下一步操作。CycleBarrier 它就相当于是一个栅栏,所有线程在到达栅栏后都需要等待其他线程,等所有线程都到达后,再一起通过。

CyclicBarrier 可以有不止一个栅栏,因为它的栅栏(Barrier)可以重复使用(Cyclic)。

CycleBarrier 特别适用于并行迭代计算,每个线程负责一部分计算,然后在栅栏处等待其他线程完成,所有线程到齐后,交换数据和计算结果,再进行下一次迭代。

初步理解

CyclicBarrier 有一个 parties 成员变量,表示参与的线程的个数,下面来看一个例子,我们通过这个例子来理解下 Cycle 的含义:

public class Demo1 {
 
    public static void main(String[] args) {
        // 初始化一个 CyclicBarrier,大小为 5,当 5 个线程满足条件之后,就执行一次:执行完毕!
        CyclicBarrier cyclicBarrier = new CyclicBarrier( 5, () -> {
            System.out.println("执行完毕!");
        });
 
        // 为了测试是否能循环,这里循环设置为 20,预期的结果应该是会执行 4 次:执行完毕!
        for (int i=0; i<20; i++) {
            int temp = i;
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + ", 执行到:" + temp );
                try{
                    cyclicBarrier.await();  // 等待其他线程到齐,到齐了之后就会执行一次
                } catch (BrokenBarrierException | InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

程序输出

由于多线程输出,我是多刷了几遍(也不知道刷了多少遍了)就会出现下面的执行结果,这里就可以看出来效果了,这里有 20个线程,5 个线程为一组(如果是按照源码上面来说,就是 5 个线程 parties 为一代 generation ),可以满足我们的预期。

每次参与的线程数为 5,然后当 5个线程都执行完毕之后,再进入下一代的计算,看到这里应该还是比较容易理解的。

源码探究

看到这里了,我们就去看下源码是什么情况吧。

成员变量

首先,我们看下 CycleBarrier 的成员变量:

//同步操作锁
private final ReentrantLock lock = new ReentrantLock();
//线程拦截器
private final Condition trip = lock.newCondition();
//每次拦截的线程数
private final int parties;
//换代前执行的任务
private final Runnable barrierCommand;
//表示栅栏的当前代
private Generation generation = new Generation();
//计数器
private int count;
 
//静态内部类Generation
private static class Generation {
    Generation() {}                 // 防止访问构造函数创建
    boolean broken;                 // 初始化为 false
}

这里说明下:

  • trip 是一个 condition,就是用来阻塞与唤醒线程使用的。
  • 两个 int 类型的变量 parties 与 count,在初始化 CycleBarrier 的时候,两个值的大小是一样的,之后随着线程的 await 方法的调用而 -1,直到减到 0 ,就将所有的线程全部唤醒。
  • 内部静态变量 Generation,表示当前栅栏的代,上面的例子中,5个线程就为一代,执行完了,就进行下一代。
  • barrierCommond,表示换代前执行的任务,当 count 减为 0 时表示本次结束。在转到下一局游戏之前,将所有的线程唤醒,在唤醒之前可以通过指定的 barrierCommond 来执行自己的任务。

构造器

接着,我们看下构造器相关的源码,这里就可以看到 count 与 parties 的初始化的时候就是一致的

//构造器,这里就是我们上面那个例子使用的构造器
public CyclicBarrier(int parties, Runnable barrierAction) {
  if (parties <= 0) throw new IllegalArgumentException();
  this.parties = parties;
  this.count = parties;
  this.barrierCommand = barrierAction;
}
 
// 构造器,只传入一个 parties 的大小,不进行自定义任务
public CyclicBarrier(int parties) {
  this(parties, null);
}

await() 方法

这里我们看下 await() 方法,在上面的例子中,是使用 await() 方法等待其他线程到齐,再去执行后面的操作

// 非定时等待
public int await() throws InterruptedException, BrokenBarrierException {
  try {
    return dowait(false, 0L);
  } catch (TimeoutException toe) {
    throw new Error(toe);
  }
}
 
// 定时等待
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException {
  return dowait(true, unit.toNanos(timeout));
}

看源码之后,发现 await() 方法最后都是调用 dowait() 方法,我们就直接看下 核心的 dowait 方法的执行逻辑,会发现这块并没有特别晦涩难懂,那么我们就一起继续向下读

private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
    final ReentrantLock lock = this.lock;
    lock.lock();    // 开始上锁
    try {
        final Generation g = generation;
 
        if (g.broken)    // 判断是否当前代被打破
            throw new BrokenBarrierException();
 
        // 检查当前线程是否被中断
        if (Thread.interrupted()) {
            // 如果中断了,就会执行下面三件事
            // 1. 将 generation 的 broken 标记设置为 true
            // 2. 重置 count 值
            // 3. 唤醒当前拦截的线程
            breakBarrier();
            throw new InterruptedException();
        }
 
        int index = --count;    // 每次都将计数器的值减 1
        if (index == 0) {          // 计数器的值减为0,就会唤醒所有线程并转换到下一代
            Runnable command = barrierCommand;
            if (command != null) {       // 如果存在自定义的任务,这里就会做判断,在唤醒所有线程之前执行
                try {
                    command.run();
                } catch (Throwable ex) {
                    breakBarrier();
                    throw ex;
                }
            }
            nextGeneration();    // 唤醒所有线程,并转到下一代
            return 0;
        }
 
        //官方注释: loop until tripped, broken, interrupted, or timed out
        //这里循环是:如果计数器不为0则执行此循环
        for (;;) {
            try {
                // 根据传入的参数来决定是定时等待还是非定时等待
                if (!timed)
                    trip.await();
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    // 如果当前线程在等待期间被中断,则打破栅栏唤醒其他线程
                    breakBarrier();
                    throw ie;
                } else {
                     //若在捕获中断异常前已经完成在栅栏上的等待, 则直接调用中断操作
                    Thread.currentThread().interrupt();
                }
            }
 
            if (g.broken)
                throw new BrokenBarrierException();
 
            if (g != generation) //如果线程因为换代操作而被唤醒则返回计数器的值
                return index;
 
            // 如果是因为时间到了而被唤醒则打破栅栏,抛出异常
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();    // 解锁
    }
}

nextGeneration() 与 breakBarrier()

后面,我们看下刚刚上面提到过的切换代 nextGeneration(), 打破栅栏方法 breakBarrier()

//切换栅栏到下一代
private void nextGeneration() {
  //唤醒条件队列所有线程
  trip.signalAll();
  //设置计数器的值为需要拦截的线程数
  count = parties;
  //重新设置栅栏代次
  generation = new Generation();
}
 
//打破当前栅栏
private void breakBarrier() {
  //将当前栅栏状态设置为打破
  generation.broken = true;
  //设置计数器的值为需要拦截的线程数
  count = parties;
  //唤醒所有线程
  trip.signalAll();
}

reset()

最后,我们看下怎么重置,这里其实就是调用了上面说的 切换代 nextGeneration() 与 breakBarrier() 方法

public void reset() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        breakBarrier();   // break the current generation
        nextGeneration(); // start a new generation
    } finally {
        lock.unlock();
    }
}

回顾

最后看一个例子回顾下

public class Demo {
 
    static class Tourist extends Thread {
        CyclicBarrier cyclicBarrier;
 
        public Tourist(CyclicBarrier cyclicBarrier){
            this.cyclicBarrier = cyclicBarrier;
        }
 
        @Override
        public void run() {
            try {
                // 模拟点先各自独立运行
                Thread.sleep( (int) (Math.random() * 1000) );
                // 集合点 A
                cyclicBarrier.await();
                System.out.println(this.getName() + ",到达集合点 A " + System.currentTimeMillis() );
 
                // 集合后模拟再各自独立运行
                Thread.sleep( (int) (Math.random() * 1000));
                // 集合点 B
                cyclicBarrier.await();
                System.out.println(this.getName() + ",到达集合点 B " + System.currentTimeMillis() );
 
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }
 
    public static void main(String[] args) {
        // 定义出多个 线程
        int num = 3;
        Tourist[] tourist = new Tourist[num];
 
        CyclicBarrier barrier = new CyclicBarrier(3, () -> {
            System.out.println("集合完毕: " + System.currentTimeMillis() + " 执行的线程为: " + Thread.currentThread().getName());
        });
 
        for (int i = 0; i< num; i++){
            tourist[i] = new Tourist(barrier);
            tourist[i].start();
        }
    }
}

相关推荐

Mac电脑强制删除任何软件方法-含自启动应用

对于打工者来说,进入企业上班使用的电脑大概率是会被监控起来,比如各种流行的数据防泄漏DLP,奇安信天擎,甚至360安全卫士,这些安全软件你想卸载是非常困难的,甚至卸载后它自己又安装回来了,并且还在你不...

Linux基础知识 | 文件与目录大全讲解

1.linux文件权限与目录配置1.文件属性Linux一般将文件可存取的身份分为三个类别,分别是owner/group/others,且三种身份各read/write/execute等权限文...

文件保护不妥协:2025 年 10 款顶级加密工具推荐

数据安全无小事,2025年这10款加密工具凭借独特功能脱颖而出,从个人到企业场景全覆盖,第一款为Ping32,其余为国外英文软件。1.Ping32企业级加密核心工具,支持200+文件格...

省心省力 一个软件搞定系统维护_省心安装在哪里能找到

◆系统类似于我们居住的房间,需要经常打理才能保持清洁、高效。虽然它本身也自带一些清理和优化的工具,但借助于好用的第三方工具来执行这方面的任务,会更让人省心省力。下面笔者就为大家介绍一款集多项功能于一身...

JAVA程序员常用的几个工具类_java程序员一般用什么软件写程序

好的工具做起事来常常事半功倍,下面介绍几个开发中常用到的工具类,收藏一下,也许后面真的会用到。字符串处理:org.apache.commons.lang.StringUtilsisBlank(Char...

手工解决Windows10的若干难题_windows10系统卡顿怎么解决

【电脑报在线】很多朋友已经开始使用Win10,估计还只是测试版本的原因,使用过程中难免会出现一些问题,这里介绍解决一些解决难题的技巧。技巧1:让ProjectSpartan“重归正途”从10074...

System32文件夹千万不能删除,看完这篇你就知道为什么了

C:\Windows\System32目录是Windows操作系统的关键部分,重要的系统文件存储在该目录中。网上的一些恶作剧者可能会告诉你删除它,但你不应该尝试去操作,如果你尝试的话,我们会告诉你会发...

Windows.old 文件夹:系统备份的解析与安全删除指南

Windows.old是Windows系统升级(如Win10升Win11)或重装时,系统自动在C盘创建的备份文件夹,其核心作用是保留旧系统的文件、程序与配置,为“回退旧系统”提供保...

遇到疑难杂症?Windows 10回收站问题巧解决

回收站是Windows10的一个重要组件。然而,我们在使用过程中,可能会遇到一些问题。例如,不论回收站里有没有文件,都显示同一个图标,让人无法判别回收站的空和满的真实情况;没有了像Windows7...

卸载软件怎么彻底删掉?简单几个步骤彻底卸载,电脑小白看过来

日常工作学习生活中,我们需要在安装一些软件程序,但随着软件的更新迭代速度,很多时候我们需要重新下载安装新的程序,这时就需要将旧的一些软件程序进行卸载。但是卸载软件虽然很简单,但是很多小伙伴们表示卸载不...

用不上就删!如何完全卸载OneDrive?

作为Windows10自带的云盘,OneDrive为资料的自动备份和同步提供了方便。然而,从隐私或其他方面考虑,有些人不愿意使用OneDrive。但Windows10本身不提供直接卸载OneDri...

【Linux知识】Linux下快速删除大量文件/文件夹方法

在Linux下,如果需要快速删除大量文件或文件夹,可以使用如下方法:使用rm命令删除文件:可以使用rm命令删除文件,例如:rm-rf/path/to/directory/*这个命令会递...

清理系统不用第三方工具_清理系统垃圾用什么软件

清理优化系统一定要借助于优化工具吗?其实,手动优化系统也没有那么神秘,掌握了方法和技巧,系统清理也是一件简单和随心的事。一方面要为每一个可能产生累赘的文件找到清理的方法,另一方面要寻找能够提高工作效率...

系统小技巧:软件卸载不了?这里办法多

在正常情况下,我们都是通过软件程序组中的卸载图标,或利用控制面板中的“程序和功能”模块来卸载软件的。但有时,我们也会发现利用卸载图标无法卸载软件或者卸载图标干脆丢失找不到了,甚至控制面板中卸载软件的功...

麒麟系统无法删除文件夹_麒麟系统删除文件权限不够

删除文件夹方法例:sudorm-rf文件夹名称。删除文件方法例:sudorm-r文件名包括扩展名。如果没有权限,给文件夹加一下权限再删。加最高权限chmod775文件名加可执行权限...

取消回复欢迎 发表评论: