diff --git a/weixin-java-miniapp/src/main/java/cn/binarywang/wx/miniapp/config/impl/AbstractWxMaRedisConfig.java b/weixin-java-miniapp/src/main/java/cn/binarywang/wx/miniapp/config/impl/AbstractWxMaRedisConfig.java index 19d3a00f6..aabdd4893 100644 --- a/weixin-java-miniapp/src/main/java/cn/binarywang/wx/miniapp/config/impl/AbstractWxMaRedisConfig.java +++ b/weixin-java-miniapp/src/main/java/cn/binarywang/wx/miniapp/config/impl/AbstractWxMaRedisConfig.java @@ -1,10 +1,11 @@ package cn.binarywang.wx.miniapp.config.impl; -import com.github.jedis.lock.JedisLock; import me.chanjar.weixin.common.error.WxRuntimeException; import redis.clients.jedis.Jedis; +import redis.clients.jedis.params.SetParams; import java.io.File; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -223,16 +224,21 @@ public abstract class AbstractWxMaRedisConfig extends WxMaDefaultConfigImpl { */ private class DistributedLock implements Lock { - private JedisLock lock; + private final String LOCK_SUCCESS = "OK"; + + private final Long RELEASE_SUCCESS = 1L; + + private String lockKey; private DistributedLock(String key) { - this.lock = new JedisLock(getRedisKey(key)); + this.lockKey = key; } @Override public void lock() { try (Jedis jedis = getConfiguredJedis()) { - if (!lock.acquire(jedis)) { + + if (!acquire(jedis)) { throw new WxRuntimeException("acquire timeouted"); } } catch (InterruptedException e) { @@ -240,10 +246,11 @@ public abstract class AbstractWxMaRedisConfig extends WxMaDefaultConfigImpl { } } + @Override public void lockInterruptibly() throws InterruptedException { try (Jedis jedis = getConfiguredJedis()) { - if (!lock.acquire(jedis)) { + if (!acquire(jedis)) { throw new WxRuntimeException("acquire timeouted"); } } @@ -252,7 +259,7 @@ public abstract class AbstractWxMaRedisConfig extends WxMaDefaultConfigImpl { @Override public boolean tryLock() { try (Jedis jedis = getConfiguredJedis()) { - return lock.acquire(jedis); + return acquire(jedis); } catch (InterruptedException e) { throw new WxRuntimeException("lock failed", e); } @@ -261,14 +268,14 @@ public abstract class AbstractWxMaRedisConfig extends WxMaDefaultConfigImpl { @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { try (Jedis jedis = getConfiguredJedis()) { - return lock.acquire(jedis); + return acquire(jedis); } } @Override public void unlock() { try (Jedis jedis = getConfiguredJedis()) { - lock.release(jedis); + releaseDistributedLock(jedis); } } @@ -277,5 +284,54 @@ public abstract class AbstractWxMaRedisConfig extends WxMaDefaultConfigImpl { throw new WxRuntimeException("unsupported method"); } + + /** + * 尝试获取锁 有限次数的重试 + * + * @param jedis + * @return + * @throws InterruptedException + */ + private Boolean acquire(Jedis jedis) throws InterruptedException { + Integer i = 0; + do { + i++; + boolean locked = tryGetDistributedLock(jedis); + if (locked) { + return true; + } else { + Thread.sleep(100L); + } + } while (i < 20); + return false; + } + + /** + * 尝试获取锁 + * + * @param jedis + * @return + */ + private Boolean tryGetDistributedLock(Jedis jedis) { + Long millisecondsToExpire = 2L; + Long threadId = Thread.currentThread().getId(); + String result = jedis.set(this.lockKey, threadId.toString(), SetParams.setParams().nx().px(millisecondsToExpire)); + return LOCK_SUCCESS.equals(result); + } + + + /** + * 释放分布式锁 + * + * @param jedis + * @return 是否释放成功 + */ + private Boolean releaseDistributedLock(Jedis jedis) { + Long threadId = Thread.currentThread().getId(); + String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; + Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(threadId.toString())); + return RELEASE_SUCCESS.equals(result); + } + } }