1 概述
CountDownLatch
以及CyclicBarrier
都是Java
外面的同步工具之一,本文介绍了两者的基本原理以及根本应用办法。
2 CountDownLatch
CountDownLatch
是一个同步工具类,常见的应用场景包含:
- 容许一个或多个线程期待一系列的其余线程完结
- 在串行化工作中须要进行并行化解决,并期待所有并行化工作完结,串行化工作能力持续进行
比方思考这样一个场景,在一个电商网站中,用户点击了首页,须要一部分的商品,同时显示它们的价格,那么,调用的流程应该是:
- 获取商品
- 计算售价
- 返回所有商品的最终售价
解决这样的问题能够应用串行化或并行化操作,串行化就是逐个计算商品的售价,并返回,并行化就是获取商品后,并行计算每一个商品的售价,最初返回,显然后一种计划要比前一种要好,那么这时候就能够用上CountDownLatch
了。
一份简略的模仿代码如下:
<code class="java">import java.util.List; import java.util.concurrent.*; import java.util.stream.Collectors; import java.util.stream.IntStream; import static java.util.concurrent.ThreadLocalRandom.current; public class CountDownLatchExample { public static void main(String[] args) throws InterruptedException{ List<Price> list = IntStream.rangeClosed(1,10).mapToObj(Price::new).collect(Collectors.toList()); //计数器大小为商品列表的长度 final CountDownLatch latch = new CountDownLatch(list.size()); //线程池 ThreadPoolExecutor executor = new ThreadPoolExecutor(5,10,2, TimeUnit.SECONDS,new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy()); list.forEach(p-> executor.execute(()->{ System.out.println("Product "+p.id+" start calculate price "); try{ //随机休眠模仿业务操作耗时 TimeUnit.SECONDS.sleep(current().nextInt(10)); p.setPrice(p.getPrice()*((p.getId() & 1) == 1 ? 0.9 : 0.7)); System.out.println("Product "+p.id+" calculate price completed"); }catch (InterruptedException e){ e.printStackTrace(); }finally { //每实现计算一个商品,将计数器减1,留神须要放在finally中 latch.countDown(); } })); //主线程阻塞直到所有的计数器为0,也就是期待所有的子工作计算价格结束 latch.await(); System.out.println("All of prices calculate finished"); //手动终止,不然不会完结运行 executor.shutdown(); } private static class Price{ private final int id; private double price; public Price(int id) { this.id = id; } public int getId() { return id; } public double getPrice() { return price; } public void setPrice(double price) { this.price = price; } } }
输入:
代码比较简单,要害中央用上了正文,能够看到代码执行程序如下:
- 创立多个工作计算商品的价格
- 主线程阻塞
- 计算实现后,将计数器减1
- 当计数器为0时,主线程退出阻塞状态
值得注意的是计数器减1的操作须要放在finally
中,因为有可能会出现异常,如果出现异常导致计数器不能缩小,那么主线程会始终阻塞。
另外,CountDownLatch
还有一个await(long timeout,TimeUnit unit)
办法,是带有超时参数的,也就是说,如果在超时工夫内,计数器的值还是大于0(还有工作没执行实现),会使得以后线程退出阻塞状态。
3 CyclicBarrier
CyclicBarrier
与CountDownLatch
有很多相似的中央,也是一个同步工具类,容许多个线程在执行完相应的操作之后彼此期待达到同一个barrier point
(屏障点)。CyclicBarrier
也适宜某个串行化的工作被拆分为多个并行化工作,这点与CountDownLatch
相似,然而CyclicBarrier
具备的一个更弱小的性能是,CyclicBarrier
能够被重复使用。
3.1 期待实现
先简略说一下CyclicBarrier
的实现原理:
- 初始化
CyclicBarrier
,传入一个int
参数,示意分片(parites
),通常意义上来说分片数就是工作的数量 - 同时串行化执行多个工作
- 工作执行实现后,调用
await()
,期待其余线程也达到barrier point
- 当所有线程达到后,持续以串行化形式运行工作
常见的应用办法是设置分片数为工作数+1,这样,能够在主线程中执行await()
,期待所有子工作实现。比方上面是应用CyclicBarrier
实现同样性能的模仿代码:
<code class="java">import java.util.List; import java.util.concurrent.*; import java.util.stream.Collectors; import java.util.stream.IntStream; import static java.util.concurrent.ThreadLocalRandom.current; public class CountDownLatchExample { public static void main(String[] args) throws InterruptedException,BrokenBarrierException{ List<Price> list = IntStream.rangeClosed(1,10).mapToObj(Price::new).collect(Collectors.toList()); final CyclicBarrier barrier = new CyclicBarrier(11); ThreadPoolExecutor executor = new ThreadPoolExecutor(10,10,2, TimeUnit.SECONDS,new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy()); list.forEach(p-> executor.execute(()->{ System.out.println("Product "+p.id+" start calculate price "); try{ TimeUnit.SECONDS.sleep(current().nextInt(10)); p.setPrice(p.getPrice()*((p.getId() & 1) == 1 ? 0.9 : 0.7)); System.out.println("Product "+p.id+" calculate price completed"); }catch (InterruptedException e){ e.printStackTrace(); }finally { try{ barrier.await(); }catch (InterruptedException | BrokenBarrierException e){ e.printStackTrace(); } } })); barrier.await(); System.out.println("All of prices calculate finished"); executor.shutdown(); } private static class Price{ private final int id; private double price; public Price(int id) { this.id = id; } public int getId() { return id; } public double getPrice() { return price; } public void setPrice(double price) { this.price = price; } } }
输入雷同,代码大部分类似,不同的中央有:
latch.countDown()
替换成了barrier.await()
latch.await()
替换成了barrier.await()
- 线程池的外围线程数替换成了
10
await()
办法会期待所有的线程达到barrier point
,下面代码执行流程简述如下:
- 初始化
CyclicBarrier
,分片数为11(子线程数+1) - 主线程调用
await()
,期待子线程执行实现 - 子线程各自进行商品价格的计算,计算实现后,调用
await()
,期待其余线程也达到barrier point
- 当所有子线程计算实现后,因为没有后续操作,所以子线程运行完结,同时因为主线程还有后续操作,会先输入提示信息再终止线程池
留神一个很大的不同就是这里的线程池外围线程数目改成了 10,那么,为什么须要10?
因为如果是设置一个小于10的外围线程个数,因为线程池是会先创立外围线程来执行工作,外围线程满了之后,放进工作队列中,而假如只有5个外围线程,那么:
- 5个线程进行计算价格
- 另外5个工作放在工作队列中
这样的话,会呈现死锁,因为计算中的线程须要队列中的工作达到barrier point
能力完结,而队列中的工作须要外围线程计算结束后,能力调度进去计算,这样死锁就呈现了。
3.2 重复使用
CyclicBarrier
与CountDownLatch
的一个最大不同是,CyclicBarrier
能够被重复使用,原理上来说,await()
会将外部计数器减1,当计数器减为0时,会主动进行计数器(分片数)重置。比方,在下面的代码中,因为遇上促销流动,须要对商品的价格再次进行计算:
<code class="java">import java.util.List; import java.util.concurrent.*; import java.util.stream.Collectors; import java.util.stream.IntStream; import static java.util.concurrent.ThreadLocalRandom.current; public class CountDownLatchExample { public static void main(String[] args) throws InterruptedException,BrokenBarrierException{ List<Price> list = IntStream.rangeClosed(1,10).mapToObj(Price::new).collect(Collectors.toList()); final CyclicBarrier barrier = new CyclicBarrier(11); ThreadPoolExecutor executor = new ThreadPoolEx<i style="color:transparent">来源gaodai$ma#com搞$$代**码网</i>ecutor(10,10,2, TimeUnit.SECONDS,new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy()); list.forEach(p-> executor.execute(()->{ System.out.println("Product "+p.id+" start calculate price."); try{ TimeUnit.SECONDS.sleep(current().nextInt(10)); p.setPrice(p.getPrice()*((p.getId() & 1) == 1 ? 0.9 : 0.7)); System.out.println("Product "+p.id+" calculate price completed."); }catch (InterruptedException e){ e.printStackTrace(); }finally { try{ barrier.await(); }catch (InterruptedException | BrokenBarrierException e){ e.printStackTrace(); } } })); barrier.await(); System.out.println("All of prices calculate finished."); //复制的一段雷同代码 list.forEach(p-> executor.execute(()->{ System.out.println("Product "+p.id+" start calculate price again."); try{ TimeUnit.SECONDS.sleep(current().nextInt(10)); p.setPrice(p.getPrice()*((p.getId() & 1) == 1 ? 0.9 : 0.7)); System.out.println("Product "+p.id+" calculate price completed."); }catch (InterruptedException e){ e.printStackTrace(); }finally { try{ barrier.await(); }catch (InterruptedException | BrokenBarrierException e){ e.printStackTrace(); } } })); barrier.await(); System.out.println("All of prices calculate finished again."); executor.shutdown(); } private static class Price{ private final int id; private double price; public Price(int id) { this.id = id; } public int getId() { return id; } public double getPrice() { return price; } public void setPrice(double price) { this.price = price; } } }
将计算价格的代码复制一遍,其中没有手动批改计数器,只是调用await()
,输入如下:
能够看到,并没有对CycliBarrier
进行相似reset
之类的操作,然而仍然能按失常逻辑运行,这是因为await()
外部会保护一个计数器,当计数器为0的时候,会主动进行重置,上面是await()
在OpenJDK 11
下的源码:
<code class="java">public int await() throws InterruptedException, BrokenBarrierException { try { return this.dowait(false, 0L); } catch (TimeoutException var2) { throw new Error(var2); } } private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { ReentrantLock lock = this.lock; lock.lock(); byte var9; try { //... int index = --this.count; if (index != 0) { //计数器不为0的状况 //.... } boolean ranAction = false; try { Runnable command = this.barrierCommand; if (command != null) { command.run(); } ranAction = true; this.nextGeneration(); var9 = 0; } finally { if (!ranAction) { this.breakBarrier(); } } } finally { lock.unlock(); } return var9; } private void nextGeneration() { this.trip.signalAll(); this.count = this.parties; this.generation = new CyclicBarrier.Generation(); }
当计数器为0时,会生成新的Generation
,并将var9
置为0,最初返回var9
(在这个办法中var9
只有一处赋值,就是代码中的var9=0
,能够了解成间接返回0)。
3.3 CyclicBarrier
其余的一些罕用办法
CyclicBarrier(int parties,Runnable barrierAction)
:结构的时候传入一个Runnable
,示意所有线程达到barrier point
时,会调用该Runnable
await(long timeout,TimeUnit unit)
:与无参的await()
相似,底层调用的是雷同的doWait()
,不过减少了超时性能isBroken()
:返回broken
状态,某个线程因为执行await
而进入阻塞,此时如果执行了中断操作(比方interrupt
),那么isBroken()
会返回true
。须要留神,处于broken
状态的CyclicBarrier
不能被间接应用,须要调用reset()
进行重置
4 总结
上面是CountDownLatch
与CyclicBarrier
的一些简略比拟,相同点如下:
- 都是
java.util.concurrent
包下的线程同步工具类 - 都能够用于“主线程阻塞始终期待,直到子工作实现,主线程才继续执行”的状况
不同点:
CountDownLatch
的await()
办法会期待计数器归0,而CyclicBarrier
的await()
会期待其余线程达到barrier point
CyclicBarrier
外部的计数器是能够被重置的,然而CountDownLatch
不能够CyclicBarrier
是由Lock
和Condition
实现的,而CountDownLatch
是由同步控制器AQS
实现的- 结构时
CyclicBarrier
不容许parties
为0,而CountDownLatch
容许count
为0