• 欢迎访问搞代码网站,推荐使用最新版火狐浏览器和Chrome浏览器访问本网站!
  • 如果您觉得本站非常有看点,那么赶紧使用Ctrl+D 收藏搞代码吧

浅谈Java(SpringBoot)基于zookeeper的分布式锁实现

springboot 搞代码 4年前 (2022-01-05) 36次浏览 已收录 0个评论

这篇文章主要介绍了Java(SpringBoot)基于zookeeper的分布式锁实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

通过zookeeper实现分布式锁

1、创建zookeeper的client

首先通过CuratorFrameworkFactory创建一个连接zookeeper的连接CuratorFramework client

 public class CuratorFactoryBean implements FactoryBean, InitializingBean, DisposableBean { private static final Logger LOGGER = LoggerFactory.getLogger(ContractFileInfoController.class); private String connectionString; private int sessionTimeoutMs; private int connectionTimeoutMs; private RetryPolicy retryPolicy; private CuratorFramework client; public CuratorFactoryBean(String connectionString) { this(connectionString, 500, 500); } public CuratorFactoryBean(String connectionString, int sessionTimeoutMs, int connectionTimeoutMs) { this.connectionString = connectionString; this.sessionTimeoutMs = sessionTimeoutMs; this.connectionTimeoutMs = connectionTimeoutMs; } @Override public void destroy() throws Exception { LOGGER.info("Closing curator framework..."); this.client.close(); LOGGER.info("Closed curator framework."); } @Override public CuratorFramework getObject() throws Exception { return this.client; } @Override public Class getObjectType() { return this.client != null ? this.client.getClass() : CuratorFramework.class; } @Override public boolean isSingleton() { return true; } @Override public void afterPropertiesSet() throws Exception { if (StringUtils.isEmpty(this.connectionString)) { throw new IllegalStateException("connectionString can not be empty."); } else { if (this.retryPolicy == null) { this.retryPolicy = new ExponentialBackoffRetry(1000, 2147483647, 180000); } this.client = CuratorFrameworkFactory.newClient(this.connectionString, this.sessionTimeoutMs, this.connectionTimeoutMs, this.retryPolicy); this.client.start(); this.client.blockUntilConnected(30, TimeUnit.MILLISECONDS); } } public void setConnectionString(String connectionString) { this.connectionString = connectionString; } public void setSessionTimeoutMs(int sessionTimeoutMs) { this.sessionTimeoutMs = sessionTimeoutMs; } public void setConnectionTimeoutMs(int connectionTimeoutMs) { this.connectionTimeoutMs = connectionTimeoutMs; } public void setRetryPolicy(RetryPolicy retryPolicy) { this.retryPolicy = retryPolicy; } public void setClient(CuratorFramework client) { this.client = client; } } 

2、封装分布式锁

根据CuratorFramework创建InterProcessMutex(分布式可重入排它锁)对一行数据进行上锁

 public InterProcessMutex(CuratorFramework client, String path) { this(client, path, new StandardLockInternalsDriver()); }

使用 acquire方法
1、acquire() :入参为空,调用该方法后,会一直堵塞,直到抢夺到锁资源,或者zookeeper连接中断后,上抛异常。
2、acquire(long time, TimeUnit unit):入参传入超时时间、单位,抢夺时,如果出现堵塞,会在超过该时间后,返回false。

 public void acquire() throws Exception { if (!this.internalLock(-1L, (TimeUnit)null)) { throw new IOException("Lost connection while trying to acquire lock: " + this.basePath); } } public boolean acquire(long time, TimeUnit unit) throws Exception { return this.internalLock(time, unit); } 

释放锁 mutex.release();

 public void release() throws Exception { Thread currentThread = Thread.currentThread(); InterProcessMutex.LockData lockData = (InterProcessMutex.LockData)this.threadData.get(currentThread); if (lockData == null) { throw new IllegalMonitorStateException("You do not own the lock: " + this.basePath); } else { int newLockCount = lockData.lockCount.decrementAndGet(); if (newLockCount <= 0) { if (newLockCount <0) { throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + this.basePath); } else { try { this.internals.releaseLock(lockData.lockPath); } finally { this.threadData.remove(currentThread); } } } } } 

封装后的DLock代码
1、调用InterProcessMutex processMutex = dLock.mutex(path);

2、手动释放锁processMutex.release();

3、需要手动删除路径dLock.del(path);

推荐 使用:
都是 函数式编程
在业务代码执行完毕后 会释放锁和删除path
1、这个有返回结果
public T mutex(String path, ZkLockCallback zkLockCallback, long time, TimeUnit timeUnit)
2、这个无返回结果
public void mutex(String path, ZkVoidCallBack zkLockCallback, long time, TimeUnit timeUnit)

 public class DLock { private final Logger logger; private static final long TIMEOUT_D = 100L; private static final String ROOT_PATH_D = "/dLock"; private String lockRootPath; private CuratorFramework client; public DLock(CuratorFramework client) { this("/dLock", client); } public DLock(Str<em style="color:transparent">来源gao.dai.ma.com搞@代*码网</em>ing lockRootPath, CuratorFramework client) { this.logger = LoggerFactory.getLogger(DLock.class); this.lockRootPath = lockRootPath; this.client = client; } public InterProcessMutex mutex(String path) { if (!StringUtils.startsWith(path, "/")) { path = Constant.keyBuilder(new Object[]{"/", path}); } return new InterProcessMutex(this.client, Constant.keyBuilder(new Object[]{this.lockRootPath, "", path})); } public  T mutex(String path, ZkLockCallback zkLockCallback) throws ZkLockException { return this.mutex(path, zkLockCallback, 100L, TimeUnit.MILLISECONDS); } public  T mutex(String path, ZkLockCallback zkLockCallback, long time, TimeUnit timeUnit) throws ZkLockException { String finalPath = this.getLockPath(path); InterProcessMutex mutex = new InterProcessMutex(this.client, finalPath); try { if (!mutex.acquire(time, timeUnit)) { throw new ZkLockException("acquire zk lock return false"); } } catch (Exception var13) { throw new ZkLockException("acquire zk lock failed.", var13); } T var8; try { var8 = zkLockCallback.doInLock(); } finally { this.releaseLock(finalPath, mutex); } return var8; } private void releaseLock(String finalPath, InterProcessMutex mutex) { try { mutex.release(); this.logger.info("delete zk node path:{}", finalPath); this.deleteInternal(finalPath); } catch (Exception var4) { this.logger.error("dlock", "release lock failed, path:{}", finalPath, var4); //   LogUtil.error(this.logger, "dlock", "release lock failed, path:{}", new Object[]{finalPath, var4}); } } public void mutex(String path, ZkVoidCallBack zkLockCallback, long time, TimeUnit timeUnit) throws ZkLockException { String finalPath = this.getLockPath(path); InterProcessMutex mutex = new InterProcessMutex(this.client, finalPath); try { if (!mutex.acquire(time, timeUnit)) { throw new ZkLockException("acquire zk lock return false"); } } catch (Exception var13) { throw new ZkLockException("acquire zk lock failed.", var13); } try { zkLockCallback.response(); } finally { this.releaseLock(finalPath, mutex); } } public String getLockPath(String customPath) { if (!StringUtils.startsWith(customPath, "/")) { customPath = Constant.keyBuilder(new Object[]{"/", customPath}); } String finalPath = Constant.keyBuilder(new Object[]{this.lockRootPath, "", customPath}); return finalPath; } private void deleteInternal(String finalPath) { try { ((ErrorListenerPathable)this.client.delete().inBackground()).forPath(finalPath); } catch (Exception var3) { this.logger.info("delete zk node path:{} failed", finalPath); } } public void del(String customPath) { String lockPath = ""; try { lockPath = this.getLockPath(customPath); ((ErrorListenerPathable)this.client.delete().inBackground()).forPath(lockPath); } catch (Exception var4) { this.logger.info("delete zk node path:{} failed", lockPath); } } } 
 @FunctionalInterface public interface ZkLockCallback { T doInLock(); } @FunctionalInterface public interface ZkVoidCallBack { void response(); } public class ZkLockException extends Exception { public ZkLockException() { } public ZkLockException(String message) { super(message); } public ZkLockException(String message, Throwable cause) { super(message, cause); } } 

配置CuratorConfig

 @Configuration public class CuratorConfig { @Value("${zk.connectionString}") private String connectionString; @Value("${zk.sessionTimeoutMs:500}") private int sessionTimeoutMs; @Value("${zk.connectionTimeoutMs:500}") private int connectionTimeoutMs; @Value("${zk.dLockRoot:/dLock}") private String dLockRoot; @Bean public CuratorFactoryBean curatorFactoryBean() { return new CuratorFactoryBean(connectionString, sessionTimeoutMs, connectionTimeoutMs); } @Bean @Autowired public DLock dLock(CuratorFramework client) { return new DLock(dLockRoot, client); } } 

测试代码

 @RestController @RequestMapping("/dLock") public class LockController { @Autowired private DLock dLock; @RequestMapping("/lock") public Map testDLock(String no){ final String path = Constant.keyBuilder("/test/no/", no); Long mutex=0l; try { System.out.println("在拿锁:"+path+System.currentTimeMillis()); mutex = dLock.mutex(path, () -> { try { System.out.println("拿到锁了" + System.currentTimeMillis()); Thread.sleep(10000); System.out.println("操作完成了" + System.currentTimeMillis()); } finally { return System.currentTimeMillis(); } }, 1000, TimeUnit.MILLISECONDS); } catch (ZkLockException e) { System.out.println("拿不到锁呀"+System.currentTimeMillis()); } return Collections.singletonMap("ret",mutex); } @RequestMapping("/dlock") public Map testDLock1(String no){ final String path = Constant.keyBuilder("/test/no/", no); Long mutex=0l; try { System.out.println("在拿锁:"+path+System.currentTimeMillis()); InterProcessMutex processMutex = dLock.mutex(path); processMutex.acquire(); System.out.println("拿到锁了" + System.currentTimeMillis()); Thread.sleep(10000); processMutex.release(); System.out.println("操作完成了" + System.currentTimeMillis()); } catch (ZkLockException e) { System.out.println("拿不到锁呀"+System.currentTimeMillis()); e.printStackTrace(); }catch (Exception e){ e.printStackTrace(); } return Collections.singletonMap("ret",mutex); } @RequestMapping("/del") public Map delDLock(String no){ final String path = Constant.keyBuilder("/test/no/", no); dLock.del(path); return Collections.singletonMap("ret",1); } } 

以上就是浅谈Java(SpringBoot)基于zookeeper的分布式锁实现的详细内容,更多请关注gaodaima搞代码网其它相关文章!


搞代码网(gaodaima.com)提供的所有资源部分来自互联网,如果有侵犯您的版权或其他权益,请说明详细缘由并提供版权或权益证明然后发送到邮箱[email protected],我们会在看到邮件的第一时间内为您处理,或直接联系QQ:872152909。本网站采用BY-NC-SA协议进行授权
转载请注明原文链接:浅谈Java(SpringBoot)基于zookeeper的分布式锁实现

喜欢 (0)
[搞代码]
分享 (0)
发表我的评论
取消评论

表情 贴图 加粗 删除线 居中 斜体 签到

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址