diff --git a/weixin-java-common/src/main/java/me/chanjar/weixin/common/util/LogExceptionHandler.java b/weixin-java-common/src/main/java/me/chanjar/weixin/common/util/LogExceptionHandler.java index 7487a0fe2..35b0eea82 100644 --- a/weixin-java-common/src/main/java/me/chanjar/weixin/common/util/LogExceptionHandler.java +++ b/weixin-java-common/src/main/java/me/chanjar/weixin/common/util/LogExceptionHandler.java @@ -1,20 +1,17 @@ package me.chanjar.weixin.common.util; +import lombok.extern.slf4j.Slf4j; import me.chanjar.weixin.common.api.WxErrorExceptionHandler; import me.chanjar.weixin.common.error.WxErrorException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - +/** + * @author Daniel Qian + */ +@Slf4j public class LogExceptionHandler implements WxErrorExceptionHandler { - - private Logger log = LoggerFactory.getLogger(WxErrorExceptionHandler.class); - @Override public void handle(WxErrorException e) { - - this.log.error("Error happens", e); - + log.error("Error happens", e); } } diff --git a/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/api/impl/WxCpServiceOnTpImpl.java b/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/api/impl/WxCpServiceOnTpImpl.java index 35eab626a..aa30385d6 100644 --- a/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/api/impl/WxCpServiceOnTpImpl.java +++ b/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/api/impl/WxCpServiceOnTpImpl.java @@ -3,7 +3,7 @@ package me.chanjar.weixin.cp.api.impl; import lombok.RequiredArgsConstructor; import me.chanjar.weixin.common.bean.WxAccessToken; import me.chanjar.weixin.common.error.WxErrorException; -import me.chanjar.weixin.cp.api.WxCpTpService; +import me.chanjar.weixin.cp.tp.service.WxCpTpService; /** *
diff --git a/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/message/WxCpMessageRouter.java b/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/message/WxCpMessageRouter.java
index fca432279..92de0c238 100644
--- a/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/message/WxCpMessageRouter.java
+++ b/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/message/WxCpMessageRouter.java
@@ -1,5 +1,6 @@
 package me.chanjar.weixin.cp.message;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import lombok.extern.slf4j.Slf4j;
 import me.chanjar.weixin.common.api.WxErrorExceptionHandler;
 import me.chanjar.weixin.common.api.WxMessageDuplicateChecker;
@@ -17,10 +18,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.concurrent.*;
 
 /**
  * 
@@ -70,7 +68,9 @@ public class WxCpMessageRouter {
    */
   public WxCpMessageRouter(WxCpService wxCpService) {
     this.wxCpService = wxCpService;
-    this.executorService = Executors.newFixedThreadPool(DEFAULT_THREAD_POOL_SIZE);
+    ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("WxCpMessageRouter-pool-%d").build();
+    this.executorService = new ThreadPoolExecutor(DEFAULT_THREAD_POOL_SIZE, DEFAULT_THREAD_POOL_SIZE,
+      0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), namedThreadFactory);
     this.messageDuplicateChecker = new WxMessageInMemoryDuplicateChecker();
     this.sessionManager = wxCpService.getSessionManager();
     this.exceptionHandler = new LogExceptionHandler();
diff --git a/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/tp/message/WxCpTpMessageHandler.java b/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/tp/message/WxCpTpMessageHandler.java
new file mode 100644
index 000000000..9ab718180
--- /dev/null
+++ b/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/tp/message/WxCpTpMessageHandler.java
@@ -0,0 +1,33 @@
+package me.chanjar.weixin.cp.tp.message;
+
+import me.chanjar.weixin.common.error.WxErrorException;
+import me.chanjar.weixin.common.session.WxSessionManager;
+import me.chanjar.weixin.cp.bean.message.WxCpXmlMessage;
+import me.chanjar.weixin.cp.bean.message.WxCpXmlOutMessage;
+import me.chanjar.weixin.cp.tp.service.WxCpTpService;
+
+import java.util.Map;
+
+/**
+ * 处理微信推送消息的处理器接口
+ *
+ * @author Daniel Qian
+ */
+public interface WxCpTpMessageHandler {
+
+  /**
+   * Handle wx cp xml out message.
+   *
+   * @param wxMessage      the wx message
+   * @param context        上下文,如果handler或interceptor之间有信息要传递,可以用这个
+   * @param wxCpService    the wx cp service
+   * @param sessionManager the session manager
+   * @return xml格式的消息 ,如果在异步规则里处理的话,可以返回null
+   * @throws WxErrorException the wx error exception
+   */
+  WxCpXmlOutMessage handle(WxCpXmlMessage wxMessage,
+                           Map context,
+                           WxCpTpService wxCpService,
+                           WxSessionManager sessionManager) throws WxErrorException;
+
+}
diff --git a/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/tp/message/WxCpTpMessageInterceptor.java b/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/tp/message/WxCpTpMessageInterceptor.java
new file mode 100644
index 000000000..fe5ceefa0
--- /dev/null
+++ b/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/tp/message/WxCpTpMessageInterceptor.java
@@ -0,0 +1,32 @@
+package me.chanjar.weixin.cp.tp.message;
+
+import me.chanjar.weixin.common.error.WxErrorException;
+import me.chanjar.weixin.common.session.WxSessionManager;
+import me.chanjar.weixin.cp.bean.message.WxCpXmlMessage;
+import me.chanjar.weixin.cp.tp.service.WxCpTpService;
+
+import java.util.Map;
+
+/**
+ * 微信消息拦截器,可以用来做验证
+ *
+ * @author Daniel Qian
+ */
+public interface WxCpTpMessageInterceptor {
+
+  /**
+   * 拦截微信消息
+   *
+   * @param wxMessage      the wx message
+   * @param context        上下文,如果handler或interceptor之间有信息要传递,可以用这个
+   * @param wxCpService    the wx cp service
+   * @param sessionManager the session manager
+   * @return true代表OK ,false代表不OK
+   * @throws WxErrorException the wx error exception
+   */
+  boolean intercept(WxCpXmlMessage wxMessage,
+                    Map context,
+                    WxCpTpService wxCpService,
+                    WxSessionManager sessionManager) throws WxErrorException;
+
+}
diff --git a/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/tp/message/WxCpTpMessageMatcher.java b/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/tp/message/WxCpTpMessageMatcher.java
new file mode 100644
index 000000000..8f7decf4b
--- /dev/null
+++ b/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/tp/message/WxCpTpMessageMatcher.java
@@ -0,0 +1,20 @@
+package me.chanjar.weixin.cp.tp.message;
+
+import me.chanjar.weixin.cp.bean.message.WxCpXmlMessage;
+
+/**
+ * 消息匹配器,用在消息路由的时候
+ *
+ * @author Daniel Qian
+ */
+public interface WxCpTpMessageMatcher {
+
+  /**
+   * 消息是否匹配某种模式
+   *
+   * @param message the message
+   * @return the boolean
+   */
+  boolean match(WxCpXmlMessage message);
+
+}
diff --git a/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/tp/message/WxCpTpMessageRouter.java b/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/tp/message/WxCpTpMessageRouter.java
new file mode 100644
index 000000000..147f324db
--- /dev/null
+++ b/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/tp/message/WxCpTpMessageRouter.java
@@ -0,0 +1,235 @@
+package me.chanjar.weixin.cp.tp.message;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import lombok.extern.slf4j.Slf4j;
+import me.chanjar.weixin.common.api.WxErrorExceptionHandler;
+import me.chanjar.weixin.common.api.WxMessageDuplicateChecker;
+import me.chanjar.weixin.common.api.WxMessageInMemoryDuplicateChecker;
+import me.chanjar.weixin.common.session.InternalSession;
+import me.chanjar.weixin.common.session.InternalSessionManager;
+import me.chanjar.weixin.common.session.WxSessionManager;
+import me.chanjar.weixin.common.util.LogExceptionHandler;
+import me.chanjar.weixin.cp.bean.message.WxCpXmlMessage;
+import me.chanjar.weixin.cp.bean.message.WxCpXmlOutMessage;
+import me.chanjar.weixin.cp.message.WxCpMessageRouterRule;
+import me.chanjar.weixin.cp.tp.service.WxCpTpService;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.*;
+
+/**
+ * 
+ * 微信消息路由器,通过代码化的配置,把来自微信的消息交给handler处理
+ *
+ * 说明:
+ * 1. 配置路由规则时要按照从细到粗的原则,否则可能消息可能会被提前处理
+ * 2. 默认情况下消息只会被处理一次,除非使用 {@link WxCpMessageRouterRule#next()}
+ * 3. 规则的结束必须用{@link WxCpMessageRouterRule#end()}或者{@link WxCpMessageRouterRule#next()},否则不会生效
+ *
+ * 使用方法:
+ * WxCpMessageRouter router = new WxCpMessageRouter();
+ * router
+ *   .rule()
+ *       .msgType("MSG_TYPE").event("EVENT").eventKey("EVENT_KEY").content("CONTENT")
+ *       .interceptor(interceptor, ...).handler(handler, ...)
+ *   .end()
+ *   .rule()
+ *       // 另外一个匹配规则
+ *   .end()
+ * ;
+ *
+ * // 将WxXmlMessage交给消息路由器
+ * router.route(message);
+ *
+ * 
+ *
+ * @author Daniel Qian
+ */
+@Slf4j
+public class WxCpTpMessageRouter {
+  private static final int DEFAULT_THREAD_POOL_SIZE = 100;
+  private final List rules = new ArrayList<>();
+
+  private final WxCpTpService wxCpService;
+
+  private ExecutorService executorService;
+
+  private WxMessageDuplicateChecker messageDuplicateChecker;
+
+  private WxSessionManager sessionManager;
+
+  private WxErrorExceptionHandler exceptionHandler;
+
+  /**
+   * 构造方法.
+   */
+  public WxCpTpMessageRouter(WxCpTpService wxCpService) {
+    this.wxCpService = wxCpService;
+    ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("WxCpTpMessageRouter-pool-%d").build();
+    this.executorService = new ThreadPoolExecutor(DEFAULT_THREAD_POOL_SIZE, DEFAULT_THREAD_POOL_SIZE,
+      0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), namedThreadFactory);
+    this.messageDuplicateChecker = new WxMessageInMemoryDuplicateChecker();
+    this.sessionManager = wxCpService.getSessionManager();
+    this.exceptionHandler = new LogExceptionHandler();
+  }
+
+  /**
+   * 
+   * 设置自定义的 {@link ExecutorService}
+   * 如果不调用该方法,默认使用 Executors.newFixedThreadPool(100)
+   * 
+   */
+  public void setExecutorService(ExecutorService executorService) {
+    this.executorService = executorService;
+  }
+
+  /**
+   * 
+   * 设置自定义的 {@link WxMessageDuplicateChecker}
+   * 如果不调用该方法,默认使用 {@link WxMessageInMemoryDuplicateChecker}
+   * 
+   */
+  public void setMessageDuplicateChecker(WxMessageDuplicateChecker messageDuplicateChecker) {
+    this.messageDuplicateChecker = messageDuplicateChecker;
+  }
+
+  /**
+   * 
+   * 设置自定义的{@link WxSessionManager}
+   * 如果不调用该方法,默认使用 {@link me.chanjar.weixin.common.session.StandardSessionManager}
+   * 
+   */
+  public void setSessionManager(WxSessionManager sessionManager) {
+    this.sessionManager = sessionManager;
+  }
+
+  /**
+   * 
+   * 设置自定义的{@link WxErrorExceptionHandler}
+   * 如果不调用该方法,默认使用 {@link LogExceptionHandler}
+   * 
+   */
+  public void setExceptionHandler(WxErrorExceptionHandler exceptionHandler) {
+    this.exceptionHandler = exceptionHandler;
+  }
+
+  List getRules() {
+    return this.rules;
+  }
+
+  /**
+   * 开始一个新的Route规则.
+   */
+  public WxCpTpMessageRouterRule rule() {
+    return new WxCpTpMessageRouterRule(this);
+  }
+
+  /**
+   * 处理微信消息.
+   */
+  public WxCpXmlOutMessage route(final WxCpXmlMessage wxMessage, final Map context) {
+    if (isMsgDuplicated(wxMessage)) {
+      // 如果是重复消息,那么就不做处理
+      return null;
+    }
+
+    final List matchRules = new ArrayList<>();
+    // 收集匹配的规则
+    for (final WxCpTpMessageRouterRule rule : this.rules) {
+      if (rule.test(wxMessage)) {
+        matchRules.add(rule);
+        if (!rule.isReEnter()) {
+          break;
+        }
+      }
+    }
+
+    if (matchRules.size() == 0) {
+      return null;
+    }
+
+    WxCpXmlOutMessage res = null;
+    final List futures = new ArrayList<>();
+    for (final WxCpTpMessageRouterRule rule : matchRules) {
+      // 返回最后一个非异步的rule的执行结果
+      if (rule.isAsync()) {
+        futures.add(
+          this.executorService.submit(() -> {
+            rule.service(wxMessage, context, WxCpTpMessageRouter.this.wxCpService, WxCpTpMessageRouter.this.sessionManager, WxCpTpMessageRouter.this.exceptionHandler);
+          })
+        );
+      } else {
+        res = rule.service(wxMessage, context, this.wxCpService, this.sessionManager, this.exceptionHandler);
+        // 在同步操作结束,session访问结束
+        log.debug("End session access: async=false, sessionId={}", wxMessage.getFromUserName());
+        sessionEndAccess(wxMessage);
+      }
+    }
+
+    if (futures.size() > 0) {
+      this.executorService.submit(() -> {
+        for (Future future : futures) {
+          try {
+            future.get();
+            log.debug("End session access: async=true, sessionId={}", wxMessage.getFromUserName());
+            // 异步操作结束,session访问结束
+            sessionEndAccess(wxMessage);
+          } catch (InterruptedException e) {
+            log.error("Error happened when wait task finish", e);
+            Thread.currentThread().interrupt();
+          } catch (ExecutionException e) {
+            log.error("Error happened when wait task finish", e);
+          }
+        }
+      });
+    }
+    return res;
+  }
+
+  /**
+   * 处理微信消息.
+   */
+  public WxCpXmlOutMessage route(final WxCpXmlMessage wxMessage) {
+    return this.route(wxMessage, new HashMap<>(2));
+  }
+
+  private boolean isMsgDuplicated(WxCpXmlMessage wxMessage) {
+    StringBuilder messageId = new StringBuilder();
+    if (wxMessage.getMsgId() == null) {
+      messageId.append(wxMessage.getCreateTime())
+        .append("-").append(StringUtils.trimToEmpty(String.valueOf(wxMessage.getAgentId())))
+        .append("-").append(wxMessage.getFromUserName())
+        .append("-").append(StringUtils.trimToEmpty(wxMessage.getEventKey()))
+        .append("-").append(StringUtils.trimToEmpty(wxMessage.getEvent()));
+    } else {
+      messageId.append(wxMessage.getMsgId())
+        .append("-").append(wxMessage.getCreateTime())
+        .append("-").append(wxMessage.getFromUserName());
+    }
+
+    if (StringUtils.isNotEmpty(wxMessage.getUserId())) {
+      messageId.append("-").append(wxMessage.getUserId());
+    }
+
+    if (StringUtils.isNotEmpty(wxMessage.getChangeType())) {
+      messageId.append("-").append(wxMessage.getChangeType());
+    }
+
+    return this.messageDuplicateChecker.isDuplicate(messageId.toString());
+  }
+
+  /**
+   * 对session的访问结束.
+   */
+  private void sessionEndAccess(WxCpXmlMessage wxMessage) {
+    InternalSession session = ((InternalSessionManager) this.sessionManager).findSession(wxMessage.getFromUserName());
+    if (session != null) {
+      session.endAccess();
+    }
+
+  }
+}
diff --git a/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/tp/message/WxCpTpMessageRouterRule.java b/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/tp/message/WxCpTpMessageRouterRule.java
new file mode 100644
index 000000000..8494978e2
--- /dev/null
+++ b/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/tp/message/WxCpTpMessageRouterRule.java
@@ -0,0 +1,313 @@
+package me.chanjar.weixin.cp.tp.message;
+
+import lombok.Data;
+import me.chanjar.weixin.common.api.WxErrorExceptionHandler;
+import me.chanjar.weixin.common.error.WxErrorException;
+import me.chanjar.weixin.common.session.WxSessionManager;
+import me.chanjar.weixin.cp.bean.message.WxCpXmlMessage;
+import me.chanjar.weixin.cp.bean.message.WxCpXmlOutMessage;
+import me.chanjar.weixin.cp.message.WxCpMessageMatcher;
+import me.chanjar.weixin.cp.tp.service.WxCpTpService;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.*;
+import java.util.regex.Pattern;
+
+/**
+ * The type Wx cp message router rule.
+ *
+ * @author Daniel Qian
+ */
+@Data
+public class WxCpTpMessageRouterRule {
+  private final WxCpTpMessageRouter routerBuilder;
+
+  private boolean async = true;
+
+  private String fromUser;
+
+  private String msgType;
+
+  private String event;
+
+  private String eventKey;
+
+  private String eventKeyRegex;
+
+  private String content;
+
+  private String rContent;
+
+  private WxCpMessageMatcher matcher;
+
+  private boolean reEnter = false;
+
+  private Integer agentId;
+
+  private List handlers = new ArrayList<>();
+
+  private List interceptors = new ArrayList<>();
+
+  /**
+   * Instantiates a new Wx cp message router rule.
+   *
+   * @param routerBuilder the router builder
+   */
+  protected WxCpTpMessageRouterRule(WxCpTpMessageRouter routerBuilder) {
+    this.routerBuilder = routerBuilder;
+  }
+
+  /**
+   * 设置是否异步执行,默认是true
+   *
+   * @param async the async
+   * @return the wx cp message router rule
+   */
+  public WxCpTpMessageRouterRule async(boolean async) {
+    this.async = async;
+    return this;
+  }
+
+  /**
+   * 如果agentId匹配
+   *
+   * @param agentId the agent id
+   * @return the wx cp message router rule
+   */
+  public WxCpTpMessageRouterRule agentId(Integer agentId) {
+    this.agentId = agentId;
+    return this;
+  }
+
+  /**
+   * 如果msgType等于某值
+   *
+   * @param msgType the msg type
+   * @return the wx cp message router rule
+   */
+  public WxCpTpMessageRouterRule msgType(String msgType) {
+    this.msgType = msgType;
+    return this;
+  }
+
+  /**
+   * 如果event等于某值
+   *
+   * @param event the event
+   * @return the wx cp message router rule
+   */
+  public WxCpTpMessageRouterRule event(String event) {
+    this.event = event;
+    return this;
+  }
+
+  /**
+   * 如果eventKey等于某值
+   *
+   * @param eventKey the event key
+   * @return the wx cp message router rule
+   */
+  public WxCpTpMessageRouterRule eventKey(String eventKey) {
+    this.eventKey = eventKey;
+    return this;
+  }
+
+  /**
+   * 如果eventKey匹配该正则表达式
+   *
+   * @param regex the regex
+   * @return the wx cp message router rule
+   */
+  public WxCpTpMessageRouterRule eventKeyRegex(String regex) {
+    this.eventKeyRegex = regex;
+    return this;
+  }
+
+  /**
+   * 如果content等于某值
+   *
+   * @param content the content
+   * @return the wx cp message router rule
+   */
+  public WxCpTpMessageRouterRule content(String content) {
+    this.content = content;
+    return this;
+  }
+
+  /**
+   * 如果content匹配该正则表达式
+   *
+   * @param regex the regex
+   * @return the wx cp message router rule
+   */
+  public WxCpTpMessageRouterRule rContent(String regex) {
+    this.rContent = regex;
+    return this;
+  }
+
+  /**
+   * 如果fromUser等于某值
+   *
+   * @param fromUser the from user
+   * @return the wx cp message router rule
+   */
+  public WxCpTpMessageRouterRule fromUser(String fromUser) {
+    this.fromUser = fromUser;
+    return this;
+  }
+
+  /**
+   * 如果消息匹配某个matcher,用在用户需要自定义更复杂的匹配规则的时候
+   *
+   * @param matcher the matcher
+   * @return the wx cp message router rule
+   */
+  public WxCpTpMessageRouterRule matcher(WxCpMessageMatcher matcher) {
+    this.matcher = matcher;
+    return this;
+  }
+
+  /**
+   * 设置微信消息拦截器
+   *
+   * @param interceptor the interceptor
+   * @return the wx cp message router rule
+   */
+  public WxCpTpMessageRouterRule interceptor(WxCpTpMessageInterceptor interceptor) {
+    return interceptor(interceptor, (WxCpTpMessageInterceptor[]) null);
+  }
+
+  /**
+   * 设置微信消息拦截器
+   *
+   * @param interceptor       the interceptor
+   * @param otherInterceptors the other interceptors
+   * @return the wx cp message router rule
+   */
+  public WxCpTpMessageRouterRule interceptor(WxCpTpMessageInterceptor interceptor, WxCpTpMessageInterceptor... otherInterceptors) {
+    this.interceptors.add(interceptor);
+    if (otherInterceptors != null && otherInterceptors.length > 0) {
+      Collections.addAll(this.interceptors, otherInterceptors);
+    }
+    return this;
+  }
+
+  /**
+   * 设置微信消息处理器
+   *
+   * @param handler the handler
+   * @return the wx cp message router rule
+   */
+  public WxCpTpMessageRouterRule handler(WxCpTpMessageHandler handler) {
+    return handler(handler, (WxCpTpMessageHandler[]) null);
+  }
+
+  /**
+   * 设置微信消息处理器
+   *
+   * @param handler       the handler
+   * @param otherHandlers the other handlers
+   * @return the wx cp message router rule
+   */
+  public WxCpTpMessageRouterRule handler(WxCpTpMessageHandler handler, WxCpTpMessageHandler... otherHandlers) {
+    this.handlers.add(handler);
+    if (otherHandlers != null && otherHandlers.length > 0) {
+      Collections.addAll(this.handlers, otherHandlers);
+    }
+    return this;
+  }
+
+  /**
+   * 规则结束,代表如果一个消息匹配该规则,那么它将不再会进入其他规则
+   *
+   * @return the wx cp message router
+   */
+  public WxCpTpMessageRouter end() {
+    this.routerBuilder.getRules().add(this);
+    return this.routerBuilder;
+  }
+
+  /**
+   * 规则结束,但是消息还会进入其他规则
+   *
+   * @return the wx cp message router
+   */
+  public WxCpTpMessageRouter next() {
+    this.reEnter = true;
+    return end();
+  }
+
+  /**
+   * Test boolean.
+   *
+   * @param wxMessage the wx message
+   * @return the boolean
+   */
+  protected boolean test(WxCpXmlMessage wxMessage) {
+    return
+      (this.fromUser == null || this.fromUser.equals(wxMessage.getFromUserName()))
+        &&
+        (this.agentId == null || this.agentId.equals(wxMessage.getAgentId()))
+        &&
+        (this.msgType == null || this.msgType.equalsIgnoreCase(wxMessage.getMsgType()))
+        &&
+        (this.event == null || this.event.equalsIgnoreCase(wxMessage.getEvent()))
+        &&
+        (this.eventKey == null || this.eventKey.equalsIgnoreCase(wxMessage.getEventKey()))
+        &&
+        (this.eventKeyRegex == null || Pattern.matches(this.eventKeyRegex, StringUtils.trimToEmpty(wxMessage.getEventKey())))
+        &&
+        (this.content == null || this.content.equals(StringUtils.trimToNull(wxMessage.getContent())))
+        &&
+        (this.rContent == null || Pattern.matches(this.rContent, StringUtils.trimToEmpty(wxMessage.getContent())))
+        &&
+        (this.matcher == null || this.matcher.match(wxMessage))
+      ;
+  }
+
+  /**
+   * 处理微信推送过来的消息
+   *
+   * @param wxMessage        the wx message
+   * @param context          the context
+   * @param wxCpService      the wx cp service
+   * @param sessionManager   the session manager
+   * @param exceptionHandler the exception handler
+   * @return true 代表继续执行别的router,false 代表停止执行别的router
+   */
+  protected WxCpXmlOutMessage service(WxCpXmlMessage wxMessage,
+                                      Map context,
+                                      WxCpTpService wxCpService,
+                                      WxSessionManager sessionManager,
+                                      WxErrorExceptionHandler exceptionHandler) {
+
+    if (context == null) {
+      context = new HashMap<>(2);
+    }
+
+    try {
+      // 如果拦截器不通过
+      for (WxCpTpMessageInterceptor interceptor : this.interceptors) {
+        if (!interceptor.intercept(wxMessage, context, wxCpService, sessionManager)) {
+          return null;
+        }
+      }
+
+      // 交给handler处理
+      WxCpXmlOutMessage res = null;
+      for (WxCpTpMessageHandler handler : this.handlers) {
+        // 返回最后handler的结果
+        res = handler.handle(wxMessage, context, wxCpService, sessionManager);
+      }
+      return res;
+
+    } catch (WxErrorException e) {
+      exceptionHandler.handle(e);
+    }
+
+    return null;
+
+  }
+
+
+}
diff --git a/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/api/WxCpTpService.java b/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/tp/service/WxCpTpService.java
similarity index 96%
rename from weixin-java-cp/src/main/java/me/chanjar/weixin/cp/api/WxCpTpService.java
rename to weixin-java-cp/src/main/java/me/chanjar/weixin/cp/tp/service/WxCpTpService.java
index 1e3852c50..26da1ddd4 100644
--- a/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/api/WxCpTpService.java
+++ b/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/tp/service/WxCpTpService.java
@@ -1,7 +1,8 @@
-package me.chanjar.weixin.cp.api;
+package me.chanjar.weixin.cp.tp.service;
 
 import me.chanjar.weixin.common.bean.WxAccessToken;
 import me.chanjar.weixin.common.error.WxErrorException;
+import me.chanjar.weixin.common.session.WxSessionManager;
 import me.chanjar.weixin.common.util.http.MediaUploadRequestExecutor;
 import me.chanjar.weixin.common.util.http.RequestExecutor;
 import me.chanjar.weixin.common.util.http.RequestHttp;
@@ -12,7 +13,7 @@ import me.chanjar.weixin.cp.bean.WxCpTpPermanentCodeInfo;
 import me.chanjar.weixin.cp.config.WxCpTpConfigStorage;
 
 /**
- * 微信第三方应用API的Service.
+ * 企业微信第三方应用API的Service.
  *
  * @author zhenjun cai
  */
@@ -226,4 +227,10 @@ public interface WxCpTpService {
    */
   RequestHttp, ?> getRequestHttp();
 
+  /**
+   * 获取WxSessionManager 对象
+   *
+   * @return WxSessionManager session manager
+   */
+  WxSessionManager getSessionManager();
 }
diff --git a/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/api/impl/BaseWxCpTpServiceImpl.java b/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/tp/service/impl/BaseWxCpTpServiceImpl.java
similarity index 96%
rename from weixin-java-cp/src/main/java/me/chanjar/weixin/cp/api/impl/BaseWxCpTpServiceImpl.java
rename to weixin-java-cp/src/main/java/me/chanjar/weixin/cp/tp/service/impl/BaseWxCpTpServiceImpl.java
index 0305692b5..859fc1648 100644
--- a/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/api/impl/BaseWxCpTpServiceImpl.java
+++ b/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/tp/service/impl/BaseWxCpTpServiceImpl.java
@@ -1,4 +1,4 @@
-package me.chanjar.weixin.cp.api.impl;
+package me.chanjar.weixin.cp.tp.service.impl;
 
 import com.google.common.base.Joiner;
 import com.google.gson.JsonObject;
@@ -9,6 +9,8 @@ import me.chanjar.weixin.common.enums.WxType;
 import me.chanjar.weixin.common.error.WxCpErrorMsgEnum;
 import me.chanjar.weixin.common.error.WxError;
 import me.chanjar.weixin.common.error.WxErrorException;
+import me.chanjar.weixin.common.session.StandardSessionManager;
+import me.chanjar.weixin.common.session.WxSessionManager;
 import me.chanjar.weixin.common.util.DataUtils;
 import me.chanjar.weixin.common.util.crypto.SHA1;
 import me.chanjar.weixin.common.util.http.RequestExecutor;
@@ -16,9 +18,9 @@ import me.chanjar.weixin.common.util.http.RequestHttp;
 import me.chanjar.weixin.common.util.http.SimpleGetRequestExecutor;
 import me.chanjar.weixin.common.util.http.SimplePostRequestExecutor;
 import me.chanjar.weixin.common.util.json.GsonParser;
-import me.chanjar.weixin.cp.api.WxCpTpService;
 import me.chanjar.weixin.cp.bean.*;
 import me.chanjar.weixin.cp.config.WxCpTpConfigStorage;
+import me.chanjar.weixin.cp.tp.service.WxCpTpService;
 import org.apache.commons.lang3.StringUtils;
 
 import java.io.File;
@@ -49,6 +51,8 @@ public abstract class BaseWxCpTpServiceImpl implements WxCpTpService, Requ
 
   protected WxCpTpConfigStorage configStorage;
 
+  private WxSessionManager sessionManager = new StandardSessionManager();
+
   /**
    * 临时文件目录.
    */
@@ -274,4 +278,9 @@ public abstract class BaseWxCpTpServiceImpl implements WxCpTpService, Requ
     return this;
   }
 
+  @Override
+  public WxSessionManager getSessionManager() {
+    return this.sessionManager;
+  }
+
 }
diff --git a/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/api/impl/WxCpTpServiceApacheHttpClientImpl.java b/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/tp/service/impl/WxCpTpServiceApacheHttpClientImpl.java
similarity index 98%
rename from weixin-java-cp/src/main/java/me/chanjar/weixin/cp/api/impl/WxCpTpServiceApacheHttpClientImpl.java
rename to weixin-java-cp/src/main/java/me/chanjar/weixin/cp/tp/service/impl/WxCpTpServiceApacheHttpClientImpl.java
index cdc6b2cfc..ec53789e6 100644
--- a/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/api/impl/WxCpTpServiceApacheHttpClientImpl.java
+++ b/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/tp/service/impl/WxCpTpServiceApacheHttpClientImpl.java
@@ -1,4 +1,4 @@
-package me.chanjar.weixin.cp.api.impl;
+package me.chanjar.weixin.cp.tp.service.impl;
 
 
 import com.google.gson.JsonObject;
diff --git a/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/api/impl/WxCpTpServiceImpl.java b/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/tp/service/impl/WxCpTpServiceImpl.java
similarity index 82%
rename from weixin-java-cp/src/main/java/me/chanjar/weixin/cp/api/impl/WxCpTpServiceImpl.java
rename to weixin-java-cp/src/main/java/me/chanjar/weixin/cp/tp/service/impl/WxCpTpServiceImpl.java
index f5582021e..58fb09cf9 100644
--- a/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/api/impl/WxCpTpServiceImpl.java
+++ b/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/tp/service/impl/WxCpTpServiceImpl.java
@@ -1,4 +1,4 @@
-package me.chanjar.weixin.cp.api.impl;
+package me.chanjar.weixin.cp.tp.service.impl;
 
 /**
  * 
diff --git a/weixin-java-cp/src/test/java/me/chanjar/weixin/cp/api/impl/BaseWxCpTpServiceImplTest.java b/weixin-java-cp/src/test/java/me/chanjar/weixin/cp/tp/service/impl/BaseWxCpTpServiceImplTest.java
similarity index 91%
rename from weixin-java-cp/src/test/java/me/chanjar/weixin/cp/api/impl/BaseWxCpTpServiceImplTest.java
rename to weixin-java-cp/src/test/java/me/chanjar/weixin/cp/tp/service/impl/BaseWxCpTpServiceImplTest.java
index 9f7973561..83ace79f3 100644
--- a/weixin-java-cp/src/test/java/me/chanjar/weixin/cp/api/impl/BaseWxCpTpServiceImplTest.java
+++ b/weixin-java-cp/src/test/java/me/chanjar/weixin/cp/tp/service/impl/BaseWxCpTpServiceImplTest.java
@@ -1,20 +1,20 @@
-package me.chanjar.weixin.cp.api.impl;
+package me.chanjar.weixin.cp.tp.service.impl;
 
 import com.google.gson.JsonObject;
 import me.chanjar.weixin.common.error.WxErrorException;
-import me.chanjar.weixin.cp.api.WxCpTpService;
 import me.chanjar.weixin.cp.bean.WxCpTpAuthInfo;
 import me.chanjar.weixin.cp.bean.WxCpTpCorp;
 import me.chanjar.weixin.cp.bean.WxCpTpPermanentCodeInfo;
 import me.chanjar.weixin.cp.config.WxCpTpConfigStorage;
 import me.chanjar.weixin.cp.config.impl.WxCpTpDefaultConfigImpl;
+import me.chanjar.weixin.cp.tp.service.WxCpTpService;
+import org.mockito.Mockito;
+import org.testng.Assert;
 import org.testng.annotations.Test;
 
 import static me.chanjar.weixin.cp.constant.WxCpApiPathConsts.Tp.GET_AUTH_INFO;
 import static me.chanjar.weixin.cp.constant.WxCpApiPathConsts.Tp.GET_PERMANENT_CODE;
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.Mockito.*;
-import static org.testng.Assert.*;
 
 /**
  * 测试代码.
@@ -23,7 +23,7 @@ import static org.testng.Assert.*;
  * @date 2019-08-18
  */
 public class BaseWxCpTpServiceImplTest {
-  private WxCpTpService tpService = spy(new WxCpTpServiceImpl());
+  private final WxCpTpService tpService = Mockito.spy(new WxCpTpServiceImpl());
 
   @Test
   public void testCheckSignature() {
@@ -123,7 +123,7 @@ public class BaseWxCpTpServiceImplTest {
     JsonObject jsonObject = new JsonObject();
     String authCode = "";
     jsonObject.addProperty("auth_code", authCode);
-    doReturn(returnJson).when(tpService).post(configStorage.getApiUrl(GET_PERMANENT_CODE), jsonObject.toString());
+    Mockito.doReturn(returnJson).when(tpService).post(configStorage.getApiUrl(GET_PERMANENT_CODE), jsonObject.toString());
 
     final WxCpTpCorp tpCorp = tpService.getPermanentCode(authCode);
     assertThat(tpCorp.getPermanentCode()).isEqualTo("xxxx");
@@ -134,7 +134,7 @@ public class BaseWxCpTpServiceImplTest {
   }
 
   @Test
-  public void testGetPermanentCodeInfo() throws WxErrorException{
+  public void testGetPermanentCodeInfo() throws WxErrorException {
     String returnJson = "{\n" +
       "  \"access_token\": \"u6SoEWyrEmworJ1uNzddbiXh42mCLNU_mdd6b01Afo2LKmyi-WdaaYqhEGFZjB1RGZ-rhjLcAJ86ger7b7Q0gowSw9iIDR8dm49aVH_MztzmQttP3XFG7np1Dxs_VQkVwhhRmfRpEonAmK1_JWIFqayJXXiPUS3LsFd3tWpE7rxmsRa7Ev2ml2htbRp_qGUjtFTErKoDsnNGSka6_RqFPA\", \n" +
       "  \"expires_in\": 7200, \n" +
@@ -187,15 +187,15 @@ public class BaseWxCpTpServiceImplTest {
     JsonObject jsonObject = new JsonObject();
     String authCode = "";
     jsonObject.addProperty("auth_code", authCode);
-    doReturn(returnJson).when(tpService).post(configStorage.getApiUrl(GET_PERMANENT_CODE), jsonObject.toString());
+    Mockito.doReturn(returnJson).when(tpService).post(configStorage.getApiUrl(GET_PERMANENT_CODE), jsonObject.toString());
     final WxCpTpPermanentCodeInfo tpPermanentCodeInfo = tpService.getPermanentCodeInfo(authCode);
     assertThat(tpPermanentCodeInfo.getAuthInfo().getAgents().get(0).getAgentId()).isEqualTo(1000012);
-    assertNotNull(tpPermanentCodeInfo.getAuthInfo().getAgents().get(0).getSquareLogoUrl());
-    assertNotNull(tpPermanentCodeInfo.getAuthCorpInfo().getCorpSquareLogoUrl());
+    Assert.assertNotNull(tpPermanentCodeInfo.getAuthInfo().getAgents().get(0).getSquareLogoUrl());
+    Assert.assertNotNull(tpPermanentCodeInfo.getAuthCorpInfo().getCorpSquareLogoUrl());
   }
 
   @Test
-  public void testGetAuthInfo() throws WxErrorException{
+  public void testGetAuthInfo() throws WxErrorException {
     String returnJson = "{\n" +
       "    \"errcode\":0 ,\n" +
       "    \"errmsg\":\"ok\" ,\n" +
@@ -260,9 +260,9 @@ public class BaseWxCpTpServiceImplTest {
     String permanentCode = "xxxxx";
     jsonObject.addProperty("auth_corpid", authCorpId);
     jsonObject.addProperty("permanent_code", permanentCode);
-    doReturn(returnJson).when(tpService).post(configStorage.getApiUrl(GET_AUTH_INFO), jsonObject.toString());
-    WxCpTpAuthInfo authInfo = tpService.getAuthInfo(authCorpId,permanentCode);
-    assertNotNull(authInfo.getAuthCorpInfo().getCorpId());
+    Mockito.doReturn(returnJson).when(tpService).post(configStorage.getApiUrl(GET_AUTH_INFO), jsonObject.toString());
+    WxCpTpAuthInfo authInfo = tpService.getAuthInfo(authCorpId, permanentCode);
+    Assert.assertNotNull(authInfo.getAuthCorpInfo().getCorpId());
   }
 
   @Test
diff --git a/weixin-java-mp/src/main/java/me/chanjar/weixin/mp/api/WxMpMessageRouter.java b/weixin-java-mp/src/main/java/me/chanjar/weixin/mp/api/WxMpMessageRouter.java
index 9b2ca03fe..36556d984 100644
--- a/weixin-java-mp/src/main/java/me/chanjar/weixin/mp/api/WxMpMessageRouter.java
+++ b/weixin-java-mp/src/main/java/me/chanjar/weixin/mp/api/WxMpMessageRouter.java
@@ -1,5 +1,6 @@
 package me.chanjar.weixin.mp.api;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import me.chanjar.weixin.common.api.WxErrorExceptionHandler;
 import me.chanjar.weixin.common.api.WxMessageDuplicateChecker;
 import me.chanjar.weixin.common.api.WxMessageInMemoryDuplicateChecker;
@@ -18,10 +19,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.concurrent.*;
 
 /**
  * 
@@ -68,7 +66,9 @@ public class WxMpMessageRouter {
 
   public WxMpMessageRouter(WxMpService wxMpService) {
     this.wxMpService = wxMpService;
-    this.executorService = Executors.newFixedThreadPool(DEFAULT_THREAD_POOL_SIZE);
+    ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("WxMpMessageRouter-pool-%d").build();
+    this.executorService = new ThreadPoolExecutor(DEFAULT_THREAD_POOL_SIZE, DEFAULT_THREAD_POOL_SIZE,
+      0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), namedThreadFactory);
     this.messageDuplicateChecker = new WxMessageInMemoryDuplicateChecker();
     this.sessionManager = new StandardSessionManager();
     this.exceptionHandler = new LogExceptionHandler();