fix: WxMpServiceImpl中用ThreadLocal记录重试次数,这个是有问题的,因为在多线程(线程池)环境下ThreadLocal是不会被清0的

This commit is contained in:
Daniel Qian
2015-01-21 14:08:34 +08:00
parent 1635024146
commit e53c921d17
8 changed files with 280 additions and 72 deletions

View File

@ -461,4 +461,22 @@ public interface WxMpService {
* @param wxConfigProvider
*/
public void setWxMpConfigStorage(WxMpConfigStorage wxConfigProvider);
/**
* <pre>
* 设置当微信系统响应系统繁忙时,要等待多少 retrySleepMillis(ms) * 2^(重试次数 - 1) 再发起重试
* 默认1000ms
* </pre>
* @param retrySleepMillis
*/
void setRetrySleepMillis(int retrySleepMillis);
/**
* <pre>
* 设置当微信系统响应系统繁忙时,最大重试次数
* 默认5次
* </pre>
* @param maxRetryTimes
*/
void setMaxRetryTimes(int maxRetryTimes);
}

View File

@ -54,12 +54,14 @@ public class WxMpServiceImpl implements WxMpService {
protected WxMpConfigStorage wxMpConfigStorage;
protected final ThreadLocal<Integer> retryTimes = new ThreadLocal<Integer>();
protected CloseableHttpClient httpClient;
protected HttpHost httpProxy;
private int retrySleepMillis = 1000;
private int maxRetryTimes = 5;
public boolean checkSignature(String timestamp, String nonce, String signature) {
try {
return SHA1.gen(wxMpConfigStorage.getToken(), timestamp, nonce).equals(signature);
@ -439,7 +441,7 @@ public class WxMpServiceImpl implements WxMpService {
return execute(new SimplePostRequestExecutor(), url, postData);
}
/**
/**
* 向微信端发送请求在这里执行的策略是当发生access_token过期时才去刷新然后重新执行请求而不是全局定时请求
* @param executor
* @param uri
@ -448,6 +450,33 @@ public class WxMpServiceImpl implements WxMpService {
* @throws WxErrorException
*/
public <T, E> T execute(RequestExecutor<T, E> executor, String uri, E data) throws WxErrorException {
int retryTimes = 0;
do {
try {
return executeInternal(executor, uri, data);
} catch (WxErrorException e) {
WxError error = e.getError();
/**
* -1 系统繁忙, 1000ms后重试
*/
if (error.getErrorCode() == -1) {
int sleepMillis = retrySleepMillis * (1 << retryTimes);
try {
System.out.println("微信系统繁忙," + sleepMillis + "ms后重试(第" + (retryTimes + 1) + "次)");
Thread.sleep(sleepMillis);
} catch (InterruptedException e1) {
throw new RuntimeException(e1);
}
} else {
throw e;
}
}
} while(++retryTimes < maxRetryTimes);
throw new RuntimeException("微信服务端异常,超出重试次数");
}
protected <T, E> T executeInternal(RequestExecutor<T, E> executor, String uri, E data) throws WxErrorException {
String accessToken = getAccessToken(false);
String uriWithAccessToken = uri;
@ -467,27 +496,6 @@ public class WxMpServiceImpl implements WxMpService {
wxMpConfigStorage.expireAccessToken();
return execute(executor, uri, data);
}
/**
* -1 系统繁忙, 1000ms后重试
*/
if (error.getErrorCode() == -1) {
if(retryTimes.get() == null) {
retryTimes.set(0);
}
if (retryTimes.get() > 4) {
retryTimes.set(0);
throw new RuntimeException("微信服务端异常,超出重试次数");
}
int sleepMillis = 1000 * (1 << retryTimes.get());
try {
System.out.println("微信系统繁忙," + sleepMillis + "ms后重试");
Thread.sleep(sleepMillis);
retryTimes.set(retryTimes.get() + 1);
return execute(executor, uri, data);
} catch (InterruptedException e1) {
throw new RuntimeException(e1);
}
}
if (error.getErrorCode() != 0) {
throw new WxErrorException(error);
}
@ -533,4 +541,16 @@ public class WxMpServiceImpl implements WxMpService {
}
}
@Override
public void setRetrySleepMillis(int retrySleepMillis) {
this.retrySleepMillis = retrySleepMillis;
}
@Override
public void setMaxRetryTimes(int maxRetryTimes) {
this.maxRetryTimes = maxRetryTimes;
}
}

View File

@ -0,0 +1,66 @@
package me.chanjar.weixin.mp.api;
import me.chanjar.weixin.common.bean.result.WxError;
import me.chanjar.weixin.common.exception.WxErrorException;
import me.chanjar.weixin.common.util.http.RequestExecutor;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@Test
public class WxMpBusyRetryTest {
@DataProvider(name="getService")
public Object[][] getService() {
WxMpService service = new WxMpServiceImpl() {
@Override
protected <T, E> T executeInternal(RequestExecutor<T, E> executor, String uri, E data) throws WxErrorException {
WxError error = new WxError();
error.setErrorCode(-1);
throw new WxErrorException(error);
}
};
service.setMaxRetryTimes(3);
service.setRetrySleepMillis(500);
return new Object[][] {
new Object[] { service }
};
}
@Test(dataProvider = "getService", expectedExceptions = RuntimeException.class)
public void testRetry(WxMpService service) throws WxErrorException {
service.execute(null, null, null);
}
@Test(dataProvider = "getService")
public void testRetryInThreadPool(final WxMpService service) throws InterruptedException, ExecutionException {
// 当线程池中的线程复用的时候,还是能保证相同的重试次数
ExecutorService executorService = Executors.newFixedThreadPool(1);
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
System.out.println("=====================");
System.out.println(Thread.currentThread().getName() + ": testRetry");
service.execute(null, null, null);
} catch (WxErrorException e) {
throw new RuntimeException(e);
} catch (RuntimeException e) {
// OK
}
}
};
Future<?> submit1 = executorService.submit(runnable);
Future<?> submit2 = executorService.submit(runnable);
submit1.get();
submit2.get();
}
}

View File

@ -3,6 +3,7 @@
<suite name="Weixin-java-tool-suite" verbose="1">
<test name="API_Test">
<classes>
<class name="me.chanjar.weixin.mp.api.WxMpBusyRetryTest" />
<class name="me.chanjar.weixin.mp.api.WxMpBaseAPITest" />
<class name="me.chanjar.weixin.mp.api.WxMpCustomMessageAPITest" />
<class name="me.chanjar.weixin.mp.api.WxMpMenuAPITest" />