issue #69 添加Session的支持

This commit is contained in:
Daniel Qian
2015-01-21 16:01:33 +08:00
parent 7ea6e3ec03
commit 4accbe6ec2
10 changed files with 205 additions and 35 deletions

View File

@ -39,11 +39,9 @@ public interface InternalSession {
void access(); void access();
/** /**
* Set the <code>isNew</code> flag for this session. * End the access.
*
* @param isNew The new value for the <code>isNew</code> flag
*/ */
void setNew(boolean isNew); void endAccess();
/** /**
* Set the creation time for this session. This method is called by the * Set the creation time for this session. This method is called by the

View File

@ -4,6 +4,7 @@ import me.chanjar.weixin.common.util.res.StringManager;
import java.util.*; import java.util.*;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
public class SessionImpl implements WxSession, InternalSession { public class SessionImpl implements WxSession, InternalSession {
@ -90,11 +91,6 @@ public class SessionImpl implements WxSession, InternalSession {
*/ */
protected volatile boolean isValid = false; protected volatile boolean isValid = false;
/**
* Flag indicating whether this session is new or not.
*/
protected boolean isNew = false;
/** /**
* We are currently processing a session expiration, so bypass * We are currently processing a session expiration, so bypass
* certain IllegalStateException tests. NOTE: This value is not * certain IllegalStateException tests. NOTE: This value is not
@ -123,11 +119,6 @@ public class SessionImpl implements WxSession, InternalSession {
*/ */
protected volatile long thisAccessedTime = creationTime; protected volatile long thisAccessedTime = creationTime;
/**
* The last accessed time for this Session.
*/
protected volatile long lastAccessedTime = creationTime;
/** /**
* The default maximum inactive interval for Sessions created by * The default maximum inactive interval for Sessions created by
* this Manager. * this Manager.
@ -140,9 +131,15 @@ public class SessionImpl implements WxSession, InternalSession {
*/ */
protected transient InternalSessionFacade facade = null; protected transient InternalSessionFacade facade = null;
/**
* The access count for this session.
*/
protected transient AtomicInteger accessCount = null;
public SessionImpl(InternalSessionManager manager) { public SessionImpl(InternalSessionManager manager) {
this.manager = manager; this.manager = manager;
this.accessCount = new AtomicInteger();
} }
@ -176,7 +173,28 @@ public class SessionImpl implements WxSession, InternalSession {
@Override @Override
public boolean isValid() { public boolean isValid() {
return isValid; if (!this.isValid) {
return false;
}
if (this.expiring) {
return true;
}
if (accessCount.get() > 0) {
return true;
}
if (maxInactiveInterval > 0) {
long timeNow = System.currentTimeMillis();
int timeIdle;
timeIdle = (int) ((timeNow - thisAccessedTime) / 1000L);
if (timeIdle >= maxInactiveInterval) {
expire();
}
}
return this.isValid;
} }
@Override @Override
@ -215,6 +233,8 @@ public class SessionImpl implements WxSession, InternalSession {
// Mark this session as "being expired" // Mark this session as "being expired"
expiring = true; expiring = true;
accessCount.set(0);
// Remove this session from our manager's active sessions // Remove this session from our manager's active sessions
manager.remove(this, true); manager.remove(this, true);
@ -238,23 +258,23 @@ public class SessionImpl implements WxSession, InternalSession {
public void access() { public void access() {
this.thisAccessedTime = System.currentTimeMillis(); this.thisAccessedTime = System.currentTimeMillis();
accessCount.incrementAndGet();
} }
@Override @Override
public void setNew(boolean isNew) { public void endAccess() {
this.isNew = isNew; this.thisAccessedTime = System.currentTimeMillis();
accessCount.decrementAndGet();
} }
@Override @Override
public void setCreationTime(long time) { public void setCreationTime(long time) {
this.creationTime = time; this.creationTime = time;
this.lastAccessedTime = time;
this.thisAccessedTime = time; this.thisAccessedTime = time;
} }

View File

@ -6,6 +6,7 @@ import org.slf4j.LoggerFactory;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
public class SessionManagerImpl implements WxSessionManager, InternalSessionManager { public class SessionManagerImpl implements WxSessionManager, InternalSessionManager {
@ -104,6 +105,11 @@ public class SessionManagerImpl implements WxSessionManager, InternalSessionMana
*/ */
protected int processExpiresFrequency = 6; protected int processExpiresFrequency = 6;
/**
* 后台清理线程是否已经开启
*/
private final AtomicBoolean backgroundProcessStarted = new AtomicBoolean(false);
@Override @Override
public void remove(InternalSession session) { public void remove(InternalSession session) {
remove(session, false); remove(session, false);
@ -155,7 +161,6 @@ public class SessionManagerImpl implements WxSessionManager, InternalSessionMana
InternalSession session = createEmptySession(); InternalSession session = createEmptySession();
// Initialize the properties of the new session and return it // Initialize the properties of the new session and return it
session.setNew(true);
session.setValid(true); session.setValid(true);
session.setCreationTime(System.currentTimeMillis()); session.setCreationTime(System.currentTimeMillis());
session.setMaxInactiveInterval(this.maxInactiveInterval); session.setMaxInactiveInterval(this.maxInactiveInterval);
@ -191,6 +196,26 @@ public class SessionManagerImpl implements WxSessionManager, InternalSessionMana
@Override @Override
public void add(InternalSession session) { public void add(InternalSession session) {
// 当第一次有session创建的时候开启session清理线程
if (!backgroundProcessStarted.getAndSet(true)) {
Thread t = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
try {
// 每秒清理一次
Thread.sleep(1000l);
backgroundProcess();
} catch (InterruptedException e) {
log.error("SessionManagerImpl.backgroundProcess error", e);
}
}
}
});
t.setDaemon(true);
t.start();
}
sessions.put(session.getIdInternal(), session); sessions.put(session.getIdInternal(), session);
int size = getActiveSessions(); int size = getActiveSessions();
if( size > maxActive ) { if( size > maxActive ) {

View File

@ -2,9 +2,19 @@ package me.chanjar.weixin.common.session;
public interface WxSessionManager { public interface WxSessionManager {
/**
* 获取某个sessionId对应的session,如果sessionId没有对应的session则新建一个并返回。
* @param sessionId
* @return
*/
public WxSession getSession(String sessionId); public WxSession getSession(String sessionId);
/**
* 获取某个sessionId对应的session,如果sessionId没有对应的session若create为true则新建一个否则返回null。
* @param sessionId
* @param create
* @return
*/
public WxSession getSession(String sessionId, boolean create); public WxSession getSession(String sessionId, boolean create);

View File

@ -2,6 +2,7 @@ package me.chanjar.weixin.common.util;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
/** /**
* <pre> * <pre>
@ -21,8 +22,16 @@ public class WxMsgIdMemoryDuplicateChecker implements WxMsgIdDuplicateChecker {
*/ */
private final Long clearPeriod; private final Long clearPeriod;
/**
* 消息id->消息时间戳的map
*/
private final ConcurrentHashMap<Long, Long> msgId2Timestamp = new ConcurrentHashMap<Long, Long>(); private final ConcurrentHashMap<Long, Long> msgId2Timestamp = new ConcurrentHashMap<Long, Long>();
/**
* 后台清理线程是否已经开启
*/
private final AtomicBoolean backgroundProcessStarted = new AtomicBoolean(false);
/** /**
* WxMsgIdInMemoryDuplicateChecker构造函数 * WxMsgIdInMemoryDuplicateChecker构造函数
* <pre> * <pre>
@ -33,7 +42,6 @@ public class WxMsgIdMemoryDuplicateChecker implements WxMsgIdDuplicateChecker {
public WxMsgIdMemoryDuplicateChecker() { public WxMsgIdMemoryDuplicateChecker() {
this.timeToLive = 15 * 1000l; this.timeToLive = 15 * 1000l;
this.clearPeriod = 5 * 1000l; this.clearPeriod = 5 * 1000l;
this.start();
} }
/** /**
@ -44,10 +52,12 @@ public class WxMsgIdMemoryDuplicateChecker implements WxMsgIdDuplicateChecker {
public WxMsgIdMemoryDuplicateChecker(Long timeToLive, Long clearPeriod) { public WxMsgIdMemoryDuplicateChecker(Long timeToLive, Long clearPeriod) {
this.timeToLive = timeToLive; this.timeToLive = timeToLive;
this.clearPeriod = clearPeriod; this.clearPeriod = clearPeriod;
this.start();
} }
private void start() { protected void checkBackgroundProcessStarted() {
if (backgroundProcessStarted.getAndSet(true)) {
return;
}
Thread t = new Thread(new Runnable() { Thread t = new Thread(new Runnable() {
@Override @Override
public void run() { public void run() {
@ -72,6 +82,7 @@ public class WxMsgIdMemoryDuplicateChecker implements WxMsgIdDuplicateChecker {
@Override @Override
public boolean isDuplicate(Long wxMsgId) { public boolean isDuplicate(Long wxMsgId) {
checkBackgroundProcessStarted();
Long timestamp = msgId2Timestamp.putIfAbsent(wxMsgId, System.currentTimeMillis()); Long timestamp = msgId2Timestamp.putIfAbsent(wxMsgId, System.currentTimeMillis());
if (timestamp == null) { if (timestamp == null) {
// 第一次接收到这个消息 // 第一次接收到这个消息

View File

@ -3,6 +3,8 @@ package me.chanjar.weixin.cp.api;
import me.chanjar.weixin.common.bean.WxMenu; import me.chanjar.weixin.common.bean.WxMenu;
import me.chanjar.weixin.common.bean.result.WxMediaUploadResult; import me.chanjar.weixin.common.bean.result.WxMediaUploadResult;
import me.chanjar.weixin.common.exception.WxErrorException; import me.chanjar.weixin.common.exception.WxErrorException;
import me.chanjar.weixin.common.session.WxSession;
import me.chanjar.weixin.common.session.WxSessionManager;
import me.chanjar.weixin.common.util.http.RequestExecutor; import me.chanjar.weixin.common.util.http.RequestExecutor;
import me.chanjar.weixin.cp.bean.WxCpDepart; import me.chanjar.weixin.cp.bean.WxCpDepart;
import me.chanjar.weixin.cp.bean.WxCpMessage; import me.chanjar.weixin.cp.bean.WxCpMessage;
@ -371,4 +373,28 @@ public interface WxCpService {
* @param maxRetryTimes * @param maxRetryTimes
*/ */
void setMaxRetryTimes(int maxRetryTimes); void setMaxRetryTimes(int maxRetryTimes);
/**
* 获取某个sessionId对应的session,如果sessionId没有对应的session则新建一个并返回。
* @param id id可以为任意字符串建议使用FromUserName作为id
* @return
*/
WxSession getSession(String id);
/**
* 获取某个sessionId对应的session,如果sessionId没有对应的session若create为true则新建一个否则返回null。
* @param id id可以为任意字符串建议使用FromUserName作为id
* @param create
* @return
*/
WxSession getSession(String id, boolean create);
/**
* <pre>
* 设置WxSessionManager只有当需要使用个性化的WxSessionManager的时候才需要调用此方法
* WxCpService默认使用的是{@link me.chanjar.weixin.common.session.SessionManagerImpl}
* </pre>
* @param sessionManager
*/
void setSessionManager(WxSessionManager sessionManager);
} }

View File

@ -12,6 +12,9 @@ import me.chanjar.weixin.common.bean.WxMenu;
import me.chanjar.weixin.common.bean.result.WxError; import me.chanjar.weixin.common.bean.result.WxError;
import me.chanjar.weixin.common.bean.result.WxMediaUploadResult; import me.chanjar.weixin.common.bean.result.WxMediaUploadResult;
import me.chanjar.weixin.common.exception.WxErrorException; import me.chanjar.weixin.common.exception.WxErrorException;
import me.chanjar.weixin.common.session.SessionManagerImpl;
import me.chanjar.weixin.common.session.WxSession;
import me.chanjar.weixin.common.session.WxSessionManager;
import me.chanjar.weixin.common.util.StringUtils; import me.chanjar.weixin.common.util.StringUtils;
import me.chanjar.weixin.common.util.crypto.SHA1; import me.chanjar.weixin.common.util.crypto.SHA1;
import me.chanjar.weixin.common.util.fs.FileUtils; import me.chanjar.weixin.common.util.fs.FileUtils;
@ -64,6 +67,8 @@ public class WxCpServiceImpl implements WxCpService {
private int maxRetryTimes = 5; private int maxRetryTimes = 5;
protected WxSessionManager sessionManager = new SessionManagerImpl();
public boolean checkSignature(String msgSignature, String timestamp, String nonce, String data) { public boolean checkSignature(String msgSignature, String timestamp, String nonce, String data) {
try { try {
return SHA1.gen(wxCpConfigStorage.getToken(), timestamp, nonce, data).equals(msgSignature); return SHA1.gen(wxCpConfigStorage.getToken(), timestamp, nonce, data).equals(msgSignature);
@ -473,4 +478,26 @@ public class WxCpServiceImpl implements WxCpService {
this.maxRetryTimes = maxRetryTimes; this.maxRetryTimes = maxRetryTimes;
} }
@Override
public WxSession getSession(String id) {
if (sessionManager == null) {
return null;
}
return sessionManager.getSession(id);
}
@Override
public WxSession getSession(String id, boolean create) {
if (sessionManager == null) {
return null;
}
return sessionManager.getSession(id, create);
}
@Override
public void setSessionManager(WxSessionManager sessionManager) {
this.sessionManager = sessionManager;
}
} }

View File

@ -1,16 +1,25 @@
package me.chanjar.weixin.mp.api; package me.chanjar.weixin.mp.api;
import me.chanjar.weixin.common.session.InternalSession;
import me.chanjar.weixin.common.session.SessionManagerImpl;
import me.chanjar.weixin.common.session.WxSession;
import me.chanjar.weixin.common.session.WxSessionManager;
import me.chanjar.weixin.common.util.WxMsgIdDuplicateChecker; import me.chanjar.weixin.common.util.WxMsgIdDuplicateChecker;
import me.chanjar.weixin.common.util.WxMsgIdMemoryDuplicateChecker; import me.chanjar.weixin.common.util.WxMsgIdMemoryDuplicateChecker;
import me.chanjar.weixin.mp.bean.WxMpXmlMessage; import me.chanjar.weixin.mp.bean.WxMpXmlMessage;
import me.chanjar.weixin.mp.bean.WxMpXmlOutMessage; import me.chanjar.weixin.mp.bean.WxMpXmlOutMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.swing.text.StyledEditorKit;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.regex.Pattern; import java.util.regex.Pattern;
/** /**
@ -41,6 +50,8 @@ import java.util.regex.Pattern;
*/ */
public class WxMpMessageRouter { public class WxMpMessageRouter {
protected final Logger log = LoggerFactory.getLogger(WxMpMessageRouter.class);
private static final int DEFAULT_THREAD_POOL_SIZE = 20; private static final int DEFAULT_THREAD_POOL_SIZE = 20;
private final List<Rule> rules = new ArrayList<Rule>(); private final List<Rule> rules = new ArrayList<Rule>();
@ -51,6 +62,8 @@ public class WxMpMessageRouter {
private WxMsgIdDuplicateChecker wxMsgIdDuplicateChecker; private WxMsgIdDuplicateChecker wxMsgIdDuplicateChecker;
protected WxSessionManager sessionManager = new SessionManagerImpl();
public WxMpMessageRouter(WxMpService wxMpService) { public WxMpMessageRouter(WxMpService wxMpService) {
this.wxMpService = wxMpService; this.wxMpService = wxMpService;
this.executorService = Executors.newFixedThreadPool(DEFAULT_THREAD_POOL_SIZE); this.executorService = Executors.newFixedThreadPool(DEFAULT_THREAD_POOL_SIZE);
@ -113,21 +126,54 @@ public class WxMpMessageRouter {
} }
WxMpXmlOutMessage res = null; WxMpXmlOutMessage res = null;
final List<Future> futures = new ArrayList<Future>();
for (final Rule rule : matchRules) { for (final Rule rule : matchRules) {
// 返回最后一个非异步的rule的执行结果 // 返回最后一个非异步的rule的执行结果
if(rule.async) { if(rule.async) {
executorService.submit(new Runnable() { futures.add(
public void run() { executorService.submit(new Runnable() {
rule.service(wxMessage); public void run() {
} rule.service(wxMessage);
}); }
})
);
} else { } else {
res = rule.service(wxMessage); res = rule.service(wxMessage);
} }
} }
// 告诉session它已经用不着了
if (futures.size() > 0) {
executorService.submit(new Runnable() {
@Override
public void run() {
for (Future future : futures) {
try {
future.get();
} catch (InterruptedException e) {
log.error("Error happened when wait task finish", e);
} catch (ExecutionException e) {
log.error("Error happened when wait task finish", e);
}
}
// 在这里session再也不会被使用了
sessionEndAccess(wxMessage);
}
});
} else {
// 在这里session再也不会被使用了
sessionEndAccess(wxMessage);
}
return res; return res;
} }
protected void sessionEndAccess(WxMpXmlMessage wxMessage) {
WxSession session = sessionManager.getSession(wxMessage.getFromUserName(), false);
if (session != null) {
((InternalSession) session).endAccess();
}
}
public static class Rule { public static class Rule {
private final WxMpMessageRouter routerBuilder; private final WxMpMessageRouter routerBuilder;

View File

@ -3,6 +3,8 @@ package me.chanjar.weixin.mp.api;
import me.chanjar.weixin.common.bean.WxMenu; import me.chanjar.weixin.common.bean.WxMenu;
import me.chanjar.weixin.common.bean.result.WxMediaUploadResult; import me.chanjar.weixin.common.bean.result.WxMediaUploadResult;
import me.chanjar.weixin.common.exception.WxErrorException; import me.chanjar.weixin.common.exception.WxErrorException;
import me.chanjar.weixin.common.session.WxSession;
import me.chanjar.weixin.common.session.WxSessionManager;
import me.chanjar.weixin.common.util.http.RequestExecutor; import me.chanjar.weixin.common.util.http.RequestExecutor;
import me.chanjar.weixin.mp.bean.*; import me.chanjar.weixin.mp.bean.*;
import me.chanjar.weixin.mp.bean.result.*; import me.chanjar.weixin.mp.bean.result.*;
@ -456,10 +458,10 @@ public interface WxMpService {
*/ */
public <T, E> T execute(RequestExecutor<T, E> executor, String uri, E data) throws WxErrorException; public <T, E> T execute(RequestExecutor<T, E> executor, String uri, E data) throws WxErrorException;
/** /**
* 注入 {@link WxMpConfigStorage} 的实现 * 注入 {@link WxMpConfigStorage} 的实现
* @param wxConfigProvider * @param wxConfigProvider
*/ */
public void setWxMpConfigStorage(WxMpConfigStorage wxConfigProvider); public void setWxMpConfigStorage(WxMpConfigStorage wxConfigProvider);
/** /**
@ -479,4 +481,5 @@ public interface WxMpService {
* @param maxRetryTimes * @param maxRetryTimes
*/ */
void setMaxRetryTimes(int maxRetryTimes); void setMaxRetryTimes(int maxRetryTimes);
} }

View File

@ -10,6 +10,9 @@ import me.chanjar.weixin.common.bean.WxMenu;
import me.chanjar.weixin.common.bean.result.WxError; import me.chanjar.weixin.common.bean.result.WxError;
import me.chanjar.weixin.common.bean.result.WxMediaUploadResult; import me.chanjar.weixin.common.bean.result.WxMediaUploadResult;
import me.chanjar.weixin.common.exception.WxErrorException; import me.chanjar.weixin.common.exception.WxErrorException;
import me.chanjar.weixin.common.session.SessionManagerImpl;
import me.chanjar.weixin.common.session.WxSession;
import me.chanjar.weixin.common.session.WxSessionManager;
import me.chanjar.weixin.common.util.StringUtils; import me.chanjar.weixin.common.util.StringUtils;
import me.chanjar.weixin.common.util.crypto.SHA1; import me.chanjar.weixin.common.util.crypto.SHA1;
import me.chanjar.weixin.common.util.fs.FileUtils; import me.chanjar.weixin.common.util.fs.FileUtils;
@ -66,6 +69,8 @@ public class WxMpServiceImpl implements WxMpService {
private int maxRetryTimes = 5; private int maxRetryTimes = 5;
protected WxSessionManager sessionManager = new SessionManagerImpl();
public boolean checkSignature(String timestamp, String nonce, String signature) { public boolean checkSignature(String timestamp, String nonce, String signature) {
try { try {
return SHA1.gen(wxMpConfigStorage.getToken(), timestamp, nonce).equals(signature); return SHA1.gen(wxMpConfigStorage.getToken(), timestamp, nonce).equals(signature);
@ -545,7 +550,6 @@ public class WxMpServiceImpl implements WxMpService {
} }
} }
@Override @Override
public void setRetrySleepMillis(int retrySleepMillis) { public void setRetrySleepMillis(int retrySleepMillis) {
this.retrySleepMillis = retrySleepMillis; this.retrySleepMillis = retrySleepMillis;