Apache Curator是一个比较完善的ZooKeeper客户端框架,通过封装的一套高级API 简化了ZooKeeper的操作。Curator次要解决了三类问题:
- 封装ZooKeeper client与ZooKeeper server之间的连贯解决
- 提供了一套Fluent格调的操作API
- 提供ZooKeeper各种利用场景(recipe, 比方:分布式锁服务、集群领导选举、共享计数器、缓存机制、分布式队列等)的形象封装
Curator次要从以下几个方面升高了zk应用的复杂性:
- 重试机制:提供可插拔的重试机制, 它将给捕捉所有可复原的异样配置一个重试策略,并且外部也提供了几种规范的重试策略(比方指数弥补)
- 连贯状态监控: Curator初始化之后会始终对zk连贯进行监听,一旦发现连贯状态发生变化将会作出相应的解决
- zk客户端实例治理:Curator会对zk客户端到server集群的连贯进行治理,并在须要的时候重建zk实例,保障与zk集群连贯的可靠性
- 各种应用场景反对:Curator实现了zk反对的大部分应用场景(甚至包含zk本身不反对的场景),这些实现都遵循了zk的最佳实际,并思考了各种极其状况
(一):客户端连贯的创立:
curator的操作客户端是:CuratorFramework。其连贯的建设形式如下:
@Bean public CuratorFramework curatorFramework() { RetryForever forever = new RetryForever(500); CuratorFramework framework= CuratorFrameworkFactory.builder().connectString(zkUrl) .connectionTimeoutMs(60000) .sessionTimeoutMs(120000) .retryPolicy(forever).build(); framework.start(); return framework; }
其参数除了连贯字符串之外,还有如下是三个参数:
1:连贯超时工夫:如果配置了curator-default-connection-timeout参数,则取该参数值。 默认值是15秒
2:会话超时工夫:如果配置了curator-default-session-timeout参数,则取该参数值。 默认值是60秒
3:重试策略。curator提供了如下重试策略:
3.1:RetryForever。始终重试,参数是重试间隔时间,单位毫秒
3.2:RetryOneTime。重试一次,参数是重试之间的间隔时间,单位毫秒。
3.3:RetryNTimes。重试N次, 参数是重试次数和重试之间的间隔时间,单位毫秒。
3.4:ExponentialBackoffRetry。参数为最大重试次数(默认值29),最大休眠工夫,根本休眠工夫。其休眠工夫不确定。
3.5:BoundedExponentialBackoffRetry。该类继承了ExponentialBackoffRetry。
(二):Watch机制
默认状况下,在操作zookeeper的命令中,应用 usingWatcher() 办法调用的监听器是一次性的。Curator提供了Cache机制,一次注册监听器即可。 Curator提供了如下三种watch。(carator版本不一样,可能会有所不同) 1:NodeCache。提供的Listener是NodeCacheListener,其监听了节点数据的变动。 2:PathChildrenCache。提供的Listener是PathChildrenCacheListener。其监听子节点的变动。 3:TreeCache。提供的Listener是TreeCacheListener。其监听了节点数据和子节点的变动。 应用例子如下:
NodeCache nodeCache = new NodeCache(cutator, "/file/cache"); nodeCache.getListenable().addListener(new NodeCacheListener() { @Override public void nodeChanged() throws Exception { System.out.println("path: /file/cache changed!!!!!!"); } }); nodeCache.start();
TreeCache treeCache = new TreeCache(cutator, "/file/cache"); treeCache.getListenable().addListener(new TreeCacheListener() { @Override public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception { System.out.println("TREE CACHE: " + event.toString()); } }); treeCache.start();
(三):分布式锁
curator提供了几种分布式锁。其有如下几种:
InterProcessMultiLock
InterProcessMutex
InterProcessReadWriteLock
InterProcessSemaphoreMutex
InterProcessSemaphoreV2。
curator提供的基于zookeeper的分布式锁和redis提供的分布式锁有如下不一样:
1:curator的分布式锁无过期工夫,redis的分布式锁个别会设置过期工夫。
2:curator的分布式锁在服务进行或者重启后,会开释,如果死锁能够应用这个方法解锁。而redis锁则不行。
3:curator分布式锁会防止羊群效应。
4:curator分布式锁是可重入锁。
curator锁的应用流程大略如下:
1:获取锁时,创立长期程序节点。
2:判断以后创立的节点是不是首节点,如果是首节点,则认为锁获取胜利。
3:如果不是首节点,则对上一个节点增加监听器(watcher),而后以后线程wait。
4:当上一个节点删除时,监听器触发,获取锁胜利
5:锁开释的时候,会删除呈现的长期程序节点
// 构造函数,参数是连贯客户端和锁门路 public InterProcessMutex(CuratorFramework client, String path) { this(client, path, new StandardLockInternalsDriver()); } // 获取锁 private boolean internalLock(long time, TimeUnit unit) throws Exception { /* Note on concurrency: a given lockData instance can be only acted on by a single thread so locking isn't necessary */ // 判断是否以后线程获取到锁,如果是以后线程,则返回获取胜利 Thread currentThread = Thread.currentThread(); LockData lockData = threadData.get(currentThread); if ( lockData != null ) { lockData.lockCount.incrementAndGet(); return true; } // 尝试获取锁,如果获取到,就缓存在内存中。 String lockPath = internals.attemptLock(time, unit, getLockNodeBytes()); if ( lockPath != null ) { LockData newLockData = new LockData(currentThread, lockPath); threadData.put(currentThread, newLockData); return true; } return false; } String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception { final long startMillis = System.currentTimeMillis(); final Long millisToWait = (unit != null) ? unit.toMillis(time) : null; final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes; int retryCount = 0; String ourPath = null<p style="color:transparent">来源gao!daima.com搞$代!码网</p>; boolean hasTheLock = false; boolean isDone = false; while ( !isDone ) { isDone = true; try { // 在该门路下创立长期程序节点。 ourPath = driver.createsTheLock(client, path, localLockNodeBytes); // 判断以后创立的节点是否为序号最小的节点,如果不是,则对上一个节点创立监听器watcher,而后期待 hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath); } catch ( KeeperException.NoNodeException e ) { // gets thrown by StandardLockInternalsDriver when it can't find the lock node // this can happen when the session expires, etc. So, if the retry allows, just try it all again // 如果抛出异样提醒节点不存在,则认为是会话超时,则进行重试。 if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) ){ isDone = false; } else { throw e; } } } if ( hasTheLock ){ return ourPath; } return null; }
应用例子如下:
public void run() { try { InterProcessMutex lock = new InterProcessMutex(client, path); lock.acquire(); //获取锁,能够设置超时工夫 System.out.println("thread-" + idx + " get the lock!!!"); Random random = new Random(); int time = random.nextInt(10); TimeUnit.MILLISECONDS.sleep(time * 1000); System.out.println("thread-" + idx + " release the lock!!!"); lock.release(); //开释锁 } catch (Exception e) { e.printStackTrace(); } }
(四):选举用法
zookeeper是提供了2个类用于抉择,别离如下:
LeaderLatch:这里提供的选举是同步的
LeaderSelector:这里提供的选举是异步的,提供一个LeaderSelectorListener用于接管选举后果。
用法例子如下:
@Override public void run() { LeaderSelector leader = new LeaderSelector(client, path, new LeaderSelectorListener() { @Override public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) { } @Override public void takeLeadership(CuratorFramework client) throws Exception { System.out.println("LeaderSelector Thread-" + idx + " is the leader!!!!!!"); } }); leader.autoRequeue(); //重新加入抢的序列 leader.start(); } public void run() { try { LeaderLatch leader = new LeaderLatch(client, path); leader.addListener(new LeaderLatchListener() { @Override public void isLeader() { System.out.println("(Listener)Thread-" + idx + " is the leader!!!!!!"); } @Override public void notLeader() { System.out.println("(Listener)Thread-" + idx + " is not the leader!!!!!!"); } }); leader.start(); leader.await(); //阻塞直到获取到Leader身份 if (leader.hasLeadership()) { System.out.println("Thread-" + idx + " is the leader!!!!!!"); } else { System.out.println("Thread-" + idx + " is not the leader!!!!!!"); } } catch (Exception e) { e.printStackTrace(); } }