В многопоточном программировании часто случаются ситуации, когда поток должен находится в режиме ожидания в определенной точке приложения, пока остальные потоки не достигнут этой точки. Как только нужное количество потоков достигает точки ожидания, то приостановка выполнения потоков снимается и потоки могут вновь продолжить свою работу. На пример, если ряд потоков, каждый из которых выполняет часть общего вычисления и результат их работы должен быть объединен в общий результат. Объединить результат вычисления можно только в том случае, когда все потоки завершат свои маленькие вычисления. Поэтому потоки, которые первые завершили свою работы должны ожидать завершения работы оставшихся потоков. Для реализации описанного механизма в пакете java.util.concurrent предусмотрен класс CyclicBarrier (с версии 1.5).
Класс CyclicBarrier
Для создания экземпляра класса CyclicBarrier предусмотрено два конструктора. Первый конструктор принимает в качестве параметра — количество потоков которое нужно достигнуть для снятия барьера:
1 2 3 |
public CyclicBarrier(int parties) { this(parties, null); } |
Второй конструктор принимает количество потоков и поток вида Runnable, который будет запущен по достижении барьера указанным в первом параметре количеством потоков:
1 2 3 4 5 6 |
public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } |
Когда поток достигает барьера нужно вызвать метод await класса CyclicBarrier. В результате выполнения метода await работа потока приостанавливается до того момента пока указанное нами количество потоков не вызовет метод await и барьер не будет снят.
В классе CyclicBarrier представлено две формы метода await. В первой форме ожидание снятия барьера длится без ограничения по времени:
1 2 3 4 5 6 7 |
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } } |
Во второй же форме указывается ограничение по времени, по истечении которого барьер будет снят:
1 2 3 4 5 6 |
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException { return dowait(true, unit.toNanos(timeout)); } |
Время ожидания указывается в формате TimeUnit.
После того как барьер будет снят и все потоки освобождены, барьер можно будет использовать повторно, в этом его принципиальное отличие от класса CountDownLatch, который мы рассматривали ранее.
Пример
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
package ru.javanerd; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; public class CyclicBarrierExample { public static void main(String[] args) { //Создание барьера с ожиданием 5 потоков и //потоком Action который будет выполнен при снятии барьера CyclicBarrier cb = new CyclicBarrier(5, new Action()); System.out.println("Старт потоков!"); //Запуск потоков new CustomThread(cb, "Поток a"); new CustomThread(cb, "Поток b"); new CustomThread(cb, "Поток c"); new CustomThread(cb, "Поток d"); new CustomThread(cb, "Поток e"); } } //Реализация потоков, которые будут ждать снятия барьера class CustomThread implements Runnable { final private CyclicBarrier cb; final private String message; CustomThread(CyclicBarrier cb, String message) { this.cb = cb; this.message = message; //Старт потока new Thread(this).start(); } public void run() { System.out.println(this.message); try { //Ожидание потоком снятия барьера this.cb.await(); } catch (BrokenBarrierException | InterruptedException ex) { System.out.println(ex); } } } //Поток который будет запущен при снятии барьера class Action implements Runnable { public void run() { System.out.println("Барьер снят!"); } } |
Возможный результат выполнения:
1 2 3 4 5 6 7 |
Старт потоков! Поток a Поток c Поток d Поток e Поток b Барьер снят! |
В теле метода main() создается барьер с ожиданием 5 потоков и указывается поток который должен быть выполнен по достижению барьера. Далее запускаются 5 потоков. Потоки, которые будут ждать снятия барьера представлены классом CustomThread, в конструкторе которого происходит непосредственно запуск потока, а в методе run() происходит вызов ожидания снятия барьера с обязательным перехватом исключений. Поток, который будет выполнен в результате снятия барьера представлен классом Action и состоит всего из одного метода run().
Исходный код доступен на GitHub