// Factory bean that creates a curatorframework client in afterpropertiesset() and exposes it
// through getobject(); it implements factorybean<curatorframework>, initializingbean and disposablebean.
// NOTE(review): this listing looks extraction-damaged (identifiers are lower-cased, closing braces
// and possibly whole statements are absent); the comments describe only what is visible.
首先通过curatorframeworkfactory创建一个连接zookeeper的连接curatorframework clientpublic class curatorfactorybean implements factorybean<curatorframework>, initializingbean, disposablebean {
// NOTE(review): the logger is obtained for contractfileinfocontroller.class rather than this class - likely a copy-paste slip; confirm.
private static final logger logger = loggerfactory.getlogger(contractfileinfocontroller.class);
// Connection string handed to curatorframeworkfactory.newclient; must be non-empty by afterpropertiesset().
private string connectionstring;
// Session and connection timeouts, in milliseconds going by the field names.
private int sessiontimeoutms;
private int connectiontimeoutms;
// Optional; afterpropertiesset() installs an exponentialbackoffretry when this is still null.
private retrypolicy retrypolicy;
// Created in afterpropertiesset(), or injected through setclient().
private curatorframework client;
// Convenience constructor: uses 500 for both sessiontimeoutms and connectiontimeoutms.
public curatorfactorybean(string connectionstring) {
this(connectionstring, 500, 500);
// Stores the connection string and both timeouts; the client itself is not created here.
public curatorfactorybean(string connectionstring, int sessiontimeoutms, int connectiontimeoutms) {
this.connectionstring = connectionstring;
this.sessiontimeoutms = sessiontimeoutms;
this.connectiontimeoutms = connectiontimeoutms;
// Logs one message before and one after shutdown.
// NOTE(review): no statement that closes this.client is visible between the two log calls - confirm it was not lost.
public void destroy() throws exception {
logger.info("closing curator framework...");
logger.info("closed curator framework.");
// Returns the client field as-is (null until afterpropertiesset() or setclient() has run).
public curatorframework getobject() throws exception {
return this.client;
// Reports the runtime class of the client when one exists, otherwise curatorframework.class.
public class<?> getobjecttype() {
return this.client != null ? this.client.getclass() : curatorframework.class;
// Always reports true.
public boolean issingleton() {
return true;
// Validates connectionstring (illegalstateexception when empty), defaults the retry policy,
// builds the client through curatorframeworkfactory.newclient and then waits for the connection.
public void afterpropertiesset() throws exception {
if (stringutils.isempty(this.connectionstring)) {
throw new illegalstateexception("connectionstring can not be empty.");
} else {
if (this.retrypolicy == null) {
// Arguments are presumably base sleep ms, max retries and max sleep ms - confirm against the Curator API.
this.retrypolicy = new exponentialbackoffretry(1000, 2147483647, 180000);
this.client = curatorframeworkfactory.newclient(this.connectionstring, this.sessiontimeoutms, this.connectiontimeoutms, this.retrypolicy);
// NOTE(review): no start call on the client is visible before this wait, and the wait is only 30 milliseconds - confirm both.
this.client.blockuntilconnected(30, timeunit.milliseconds);
// Plain setters: each one only assigns the matching field.
public void setconnectionstring(string connectionstring) {
this.connectionstring = connectionstring;
public void setsessiontimeoutms(int sessiontimeoutms) {
this.sessiontimeoutms = sessiontimeoutms;
public void setconnectiontimeoutms(int connectiontimeoutms) {
this.connectiontimeoutms = connectiontimeoutms;
public void setretrypolicy(retrypolicy retrypolicy) {
this.retrypolicy = retrypolicy;
public void setclient(curatorframework client) {
this.client = client;
// Two-argument constructor of interprocessmutex: delegates to the three-argument form,
// supplying a new standardlockinternalsdriver() as the third argument.
根据curatorframework创建interprocessmutex(分布式可重入排它锁)对一行数据进行上锁public interprocessmutex(curatorframework client, string path) {
this(client, path, new standardlockinternalsdriver());
使用 acquire 方法:
1、acquire():入参为空,调用该方法后会一直阻塞,直到抢到锁资源,或者在 ZooKeeper 连接中断后向上抛出异常。
// No-argument acquire: calls internallock with -1l and a null unit; a false result is
// turned into an ioexception whose message names basepath.
2、acquire(long time, timeunit unit):入参传入超时时间、单位,抢夺时,如果出现堵塞,会在超过该时间后,返回false。public void acquire() throws exception {
if (!this.internallock(-1l, (timeunit)null)) {
throw new ioexception("lost connection while trying to acquire lock: " + this.basepath);
// Timed acquire: forwards time and unit to internallock and returns its boolean result unchanged.
public boolean acquire(long time, timeunit unit) throws exception {
return this.internallock(time, unit);
// Releases one hold of the lock for the calling thread.
// Throws illegalmonitorstateexception when the current thread has no entry in threaddata,
// or when the decremented lock count drops below zero.
释放锁 mutex.release();public void release() throws exception {
thread currentthread = thread.currentthread();
// Per-thread bookkeeping: threaddata is keyed by the thread object.
interprocessmutex.lockdata lockdata = (interprocessmutex.lockdata)this.threaddata.get(currentthread);
if (lockdata == null) {
throw new illegalmonitorstateexception("you do not own the lock: " + this.basepath);
} else {
// A count that is still positive after the decrement skips the branch below,
// and no further action is visible for that case.
int newlockcount = lockdata.lockcount.decrementandget();
if (newlockcount <= 0) {
if (newlockcount < 0) {
throw new illegalmonitorstateexception("lock count has gone negative for lock: " + this.basepath);
} else {
// NOTE(review): the try and finally bodies are empty in this listing; the statements that
// actually release the lock and clear the thread entry are presumably missing - confirm against the Curator source.
try {
} finally {
1、调用 interprocessmutex processmutex = dlock.mutex(path);
推荐使用下面两个方法:
它们都是函数式编程风格,
在业务代码执行完毕后,会释放锁并删除 path。
public t mutex(string path, zklockcallback zklockcallback, long time, timeunit timeunit)
// Helper built on a curatorframework client: creates interprocessmutex instances for paths
// under a configurable root and runs callbacks between acquire and releaselock.
// NOTE(review): this listing looks extraction-damaged (closing braces and some statements are absent);
// the comments describe only what is visible.
public void mutex(string path, zkvoidcallback zklockcallback, long time, timeunit timeunit)public class dlock {
private final logger logger;
// Default timeout and root path; the same literals (100l and /dlock) are repeated inline below
// instead of referencing these constants.
private static final long timeout_d = 100l;
private static final string root_path_d = "/dlock";
private string lockrootpath;
private curatorframework client;
// Uses /dlock as the lock root.
public dlock(curatorframework client) {
this("/dlock", client);
// Creates the logger and stores the root path and client.
public dlock(string lockrootpath, curatorframework client) {
this.logger = loggerfactory.getlogger(dlock.class);
this.lockrootpath = lockrootpath;
this.client = client;
// Returns a new, not yet acquired interprocessmutex for the given path; the caller is
// responsible for acquire and release.
// constant.keybuilder is not shown here; presumably it concatenates its arguments - confirm.
public interprocessmutex mutex(string path) {
if (!stringutils.startswith(path, "/")) {
path = constant.keybuilder(new object[]{"/", path});
return new interprocessmutex(this.client, constant.keybuilder(new object[]{this.lockrootpath, "", path}));
// Same as the four-argument overload with a timeout of 100 milliseconds.
public <t> t mutex(string path, zklockcallback<t> zklockcallback) throws zklockexception {
return this.mutex(path, zklockcallback, 100l, timeunit.milliseconds);
// Acquires the lock for getlockpath(path) within the given time, runs zklockcallback.doinlock(),
// always calls releaselock afterwards, and returns the callback result.
// Throws zklockexception when the lock cannot be acquired.
public <t> t mutex(string path, zklockcallback<t> zklockcallback, long time, timeunit timeunit) throws zklockexception {
string finalpath = this.getlockpath(path);
interprocessmutex mutex = new interprocessmutex(this.client, finalpath);
try {
if (!mutex.acquire(time, timeunit)) {
throw new zklockexception("acquire zk lock return false");
// NOTE(review): as the visible structure reads, the zklockexception thrown just above is also
// caught here and re-wrapped, so callers see the second message with the first as cause - confirm.
} catch (exception var13) {
throw new zklockexception("acquire zk lock failed.", var13);
t var8;
try {
var8 = zklockcallback.doinlock();
} finally {
this.releaselock(finalpath, mutex);
return var8;
// Best-effort cleanup after a callback: failures are logged, not rethrown.
// NOTE(review): only the log statement is visible inside the try; the calls that release the mutex
// and delete the node are presumably missing from this listing - confirm.
private void releaselock(string finalpath, interprocessmutex mutex) {
try {
this.logger.info("delete zk node path:{}", finalpath);
} catch (exception var4) {
// NOTE(review): dlock is passed as the first argument here; if this logger treats the first
// string as the message format, the intended message and path would not be rendered - confirm against the logger API.
this.logger.error("dlock", "release lock failed, path:{}", finalpath, var4);
// logutil.error(this.logger, "dlock", "release lock failed, path:{}", new object[]{finalpath, var4});
// Variant for callbacks without a result: same acquire and release handling as the generic overload.
public void mutex(string path, zkvoidcallback zklockcallback, long time, timeunit timeunit) throws zklockexception {
string finalpath = this.getlockpath(path);
interprocessmutex mutex = new interprocessmutex(this.client, finalpath);
try {
if (!mutex.acquire(time, timeunit)) {
throw new zklockexception("acquire zk lock return false");
} catch (exception var13) {
throw new zklockexception("acquire zk lock failed.", var13);
// NOTE(review): the try body is empty in this listing, so no invocation of zklockcallback is visible - confirm it was not lost.
try {
} finally {
this.releaselock(finalpath, mutex);
// Builds the full lock path: ensures custompath starts with a slash, then combines
// lockrootpath, an empty string and custompath through constant.keybuilder.
public string getlockpath(string custompath) {
if (!stringutils.startswith(custompath, "/")) {
custompath = constant.keybuilder(new object[]{"/", custompath});
string finalpath = constant.keybuilder(new object[]{this.lockrootpath, "", custompath});
return finalpath;
// NOTE(review): the try body is empty in this listing; judging by the name and the log message
// this presumably deleted the node at finalpath - confirm. Failures are only logged at info level.
private void deleteinternal(string finalpath) {
try {
} catch (exception var3) {
this.logger.info("delete zk node path:{} failed", finalpath);
// Resolves the lock path for custompath; any exception is logged at info level and swallowed.
// NOTE(review): nothing is visibly done with lockpath after it is computed - a delete call is presumably missing; confirm.
public void del(string custompath) {
string lockpath = "";
try {
lockpath = this.getlockpath(custompath);
} catch (exception var4) {
this.logger.info("delete zk node path:{} failed", lockpath);
// Callback whose doinlock() result is returned by the generic dlock.mutex overloads;
// those overloads invoke it after a successful acquire and before releaselock.
public interface zklockcallback<t> {
t doinlock();
// Callback without a result, accepted by the void dlock.mutex(path, zkvoidcallback, time, timeunit) overload.
public interface zkvoidcallback {
void response();
// Checked exception (extends exception) thrown by dlock.mutex when a lock cannot be acquired.
public class zklockexception extends exception {
// NOTE(review): the bodies of the no-argument and message-only constructors are not visible in
// this listing; presumably they forward to the matching super constructors - confirm.
public zklockexception() {
public zklockexception(string message) {
// Keeps the original failure as the cause.
public zklockexception(string message, throwable cause) {
super(message, cause);
// Wiring class that builds the curatorfactorybean and the dlock from four settings.
// NOTE(review): no annotations or setters are visible, so how these fields get populated is not shown - confirm.
public class curatorconfig {
private string connectionstring;
private int sessiontimeoutms;
private int connectiontimeoutms;
// Root path passed to the dlock constructor.
private string dlockroot;
// Creates the factory bean from the connection string and both timeouts.
public curatorfactorybean curatorfactorybean() {
return new curatorfactorybean(connectionstring, sessiontimeoutms, connectiontimeoutms);
// Creates a dlock rooted at dlockroot on top of the supplied client.
public dlock dlock(curatorframework client) {
return new dlock(dlockroot, client);
// Demo controller showing two ways of using dlock, plus a delete endpoint.
// NOTE(review): no annotations are visible, so request mappings and injection of dlock are not shown - confirm.
public class lockcontroller {
private dlock dlock;
// Callback style: runs the lambda through dlock.mutex with a 1000 millisecond timeout and
// returns the resulting timestamp under the key ret (0 when no value was assigned).
public map testdlock(string no){
final string path = constant.keybuilder("/test/no/", no);
long mutex=0l;
try {
mutex = dlock.mutex(path, () -> {
try {
system.out.println("拿到锁了" + system.currenttimemillis());
system.out.println("操作完成了" + system.currenttimemillis());
} finally {
// NOTE(review): if this return really sits inside the finally block it would discard any
// exception thrown by the try body - confirm against the original code.
return system.currenttimemillis();
}, 1000, timeunit.milliseconds);
// NOTE(review): no handling is visible in this catch, so a failed acquire is silently ignored.
} catch (zklockexception e) {
return collections.singletonmap("ret",mutex);
// Manual style: obtains the interprocessmutex directly from dlock.mutex(path).
// NOTE(review): no acquire or release call on processmutex is visible, and mutex is never
// reassigned, so the returned value stays 0 - statements are presumably missing; confirm.
public map testdlock1(string no){
final string path = constant.keybuilder("/test/no/", no);
long mutex=0l;
try {
interprocessmutex processmutex = dlock.mutex(path);
system.out.println("拿到锁了" + system.currenttimemillis());
system.out.println("操作完成了" + system.currenttimemillis());
// NOTE(review): both catch blocks are empty in this listing.
} catch (zklockexception e) {
}catch (exception e){
return collections.singletonmap("ret",mutex);
// Builds the same path as the test methods and returns a fixed result of 1 under the key ret.
// NOTE(review): path is not visibly used; a call such as dlock.del(path) is presumably missing - confirm.
public map deldlock(string no){
final string path = constant.keybuilder("/test/no/", no);
return collections.singletonmap("ret",1);
以上是小编为大家介绍的 Java(Spring Boot)基于 ZooKeeper 的分布式锁整合与实现详解,希望对大家有所帮助。如果大家有任何疑问,请给我留言,小编会及时回复。在此也非常感谢大家对 CodeAE 代码之家网站的支持!