From cdda57d4e1088bc069b28298005e9a0bb970b8bd Mon Sep 17 00:00:00 2001 From: Binary Wang Date: Sun, 20 Sep 2020 14:18:28 +0800 Subject: [PATCH] =?UTF-8?q?:art:=20#1646=20=E4=BC=81=E4=B8=9A=E5=BE=AE?= =?UTF-8?q?=E4=BF=A1=E7=AC=AC=E4=B8=89=E6=96=B9=E5=BA=94=E7=94=A8=EF=BC=88?= =?UTF-8?q?=E6=9C=8D=E5=8A=A1=E5=95=86=EF=BC=89=E6=A8=A1=E5=9D=97=E9=87=8D?= =?UTF-8?q?=E6=9E=84=E5=AE=9E=E7=8E=B0=EF=BC=8C=E5=B9=B6=E6=8F=90=E4=BE=9B?= =?UTF-8?q?Router=E3=80=81Interceptor=E3=80=81Handler=E7=AD=89=E6=8E=A5?= =?UTF-8?q?=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/util/LogExceptionHandler.java | 15 +- .../cp/api/impl/WxCpServiceOnTpImpl.java | 2 +- .../weixin/cp/message/WxCpMessageRouter.java | 10 +- .../cp/tp/message/WxCpTpMessageHandler.java | 33 ++ .../tp/message/WxCpTpMessageInterceptor.java | 32 ++ .../cp/tp/message/WxCpTpMessageMatcher.java | 20 ++ .../cp/tp/message/WxCpTpMessageRouter.java | 235 +++++++++++++ .../tp/message/WxCpTpMessageRouterRule.java | 313 ++++++++++++++++++ .../cp/{api => tp/service}/WxCpTpService.java | 11 +- .../service}/impl/BaseWxCpTpServiceImpl.java | 13 +- .../WxCpTpServiceApacheHttpClientImpl.java | 2 +- .../service}/impl/WxCpTpServiceImpl.java | 2 +- .../impl/BaseWxCpTpServiceImplTest.java | 28 +- .../weixin/mp/api/WxMpMessageRouter.java | 10 +- 14 files changed, 686 insertions(+), 40 deletions(-) create mode 100644 weixin-java-cp/src/main/java/me/chanjar/weixin/cp/tp/message/WxCpTpMessageHandler.java create mode 100644 weixin-java-cp/src/main/java/me/chanjar/weixin/cp/tp/message/WxCpTpMessageInterceptor.java create mode 100644 weixin-java-cp/src/main/java/me/chanjar/weixin/cp/tp/message/WxCpTpMessageMatcher.java create mode 100644 weixin-java-cp/src/main/java/me/chanjar/weixin/cp/tp/message/WxCpTpMessageRouter.java create mode 100644 weixin-java-cp/src/main/java/me/chanjar/weixin/cp/tp/message/WxCpTpMessageRouterRule.java rename weixin-java-cp/src/main/java/me/chanjar/weixin/cp/{api => tp/service}/WxCpTpService.java (96%) rename weixin-java-cp/src/main/java/me/chanjar/weixin/cp/{api => tp/service}/impl/BaseWxCpTpServiceImpl.java (96%) rename weixin-java-cp/src/main/java/me/chanjar/weixin/cp/{api => tp/service}/impl/WxCpTpServiceApacheHttpClientImpl.java (98%) rename weixin-java-cp/src/main/java/me/chanjar/weixin/cp/{api => tp/service}/impl/WxCpTpServiceImpl.java (82%) rename weixin-java-cp/src/test/java/me/chanjar/weixin/cp/{api => tp/service}/impl/BaseWxCpTpServiceImplTest.java (91%) diff --git a/weixin-java-common/src/main/java/me/chanjar/weixin/common/util/LogExceptionHandler.java b/weixin-java-common/src/main/java/me/chanjar/weixin/common/util/LogExceptionHandler.java index 7487a0fe2..35b0eea82 100644 --- a/weixin-java-common/src/main/java/me/chanjar/weixin/common/util/LogExceptionHandler.java +++ b/weixin-java-common/src/main/java/me/chanjar/weixin/common/util/LogExceptionHandler.java @@ -1,20 +1,17 @@ package me.chanjar.weixin.common.util; +import lombok.extern.slf4j.Slf4j; import me.chanjar.weixin.common.api.WxErrorExceptionHandler; import me.chanjar.weixin.common.error.WxErrorException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - +/** + * @author Daniel Qian + */ +@Slf4j public class LogExceptionHandler implements WxErrorExceptionHandler { - - private Logger log = LoggerFactory.getLogger(WxErrorExceptionHandler.class); - @Override public void handle(WxErrorException e) { - - this.log.error("Error happens", e); - + log.error("Error happens", e); } } diff --git a/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/api/impl/WxCpServiceOnTpImpl.java b/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/api/impl/WxCpServiceOnTpImpl.java index 35eab626a..aa30385d6 100644 --- a/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/api/impl/WxCpServiceOnTpImpl.java +++ b/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/api/impl/WxCpServiceOnTpImpl.java @@ -3,7 +3,7 @@ package me.chanjar.weixin.cp.api.impl; import lombok.RequiredArgsConstructor; import me.chanjar.weixin.common.bean.WxAccessToken; import me.chanjar.weixin.common.error.WxErrorException; -import me.chanjar.weixin.cp.api.WxCpTpService; +import me.chanjar.weixin.cp.tp.service.WxCpTpService; /** *
diff --git a/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/message/WxCpMessageRouter.java b/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/message/WxCpMessageRouter.java
index fca432279..92de0c238 100644
--- a/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/message/WxCpMessageRouter.java
+++ b/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/message/WxCpMessageRouter.java
@@ -1,5 +1,6 @@
 package me.chanjar.weixin.cp.message;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import lombok.extern.slf4j.Slf4j;
 import me.chanjar.weixin.common.api.WxErrorExceptionHandler;
 import me.chanjar.weixin.common.api.WxMessageDuplicateChecker;
@@ -17,10 +18,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.concurrent.*;
 
 /**
  * 
@@ -70,7 +68,9 @@ public class WxCpMessageRouter {
    */
   public WxCpMessageRouter(WxCpService wxCpService) {
     this.wxCpService = wxCpService;
-    this.executorService = Executors.newFixedThreadPool(DEFAULT_THREAD_POOL_SIZE);
+    ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("WxCpMessageRouter-pool-%d").build();
+    this.executorService = new ThreadPoolExecutor(DEFAULT_THREAD_POOL_SIZE, DEFAULT_THREAD_POOL_SIZE,
+      0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), namedThreadFactory);
     this.messageDuplicateChecker = new WxMessageInMemoryDuplicateChecker();
     this.sessionManager = wxCpService.getSessionManager();
     this.exceptionHandler = new LogExceptionHandler();
diff --git a/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/tp/message/WxCpTpMessageHandler.java b/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/tp/message/WxCpTpMessageHandler.java
new file mode 100644
index 000000000..9ab718180
--- /dev/null
+++ b/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/tp/message/WxCpTpMessageHandler.java
@@ -0,0 +1,33 @@
+package me.chanjar.weixin.cp.tp.message;
+
+import me.chanjar.weixin.common.error.WxErrorException;
+import me.chanjar.weixin.common.session.WxSessionManager;
+import me.chanjar.weixin.cp.bean.message.WxCpXmlMessage;
+import me.chanjar.weixin.cp.bean.message.WxCpXmlOutMessage;
+import me.chanjar.weixin.cp.tp.service.WxCpTpService;
+
+import java.util.Map;
+
+/**
+ * 处理微信推送消息的处理器接口
+ *
+ * @author Daniel Qian
+ */
+public interface WxCpTpMessageHandler {
+
+  /**
+   * Handle wx cp xml out message.
+   *
+   * @param wxMessage      the wx message
+   * @param context        上下文,如果handler或interceptor之间有信息要传递,可以用这个
+   * @param wxCpService    the wx cp service
+   * @param sessionManager the session manager
+   * @return xml格式的消息 ,如果在异步规则里处理的话,可以返回null
+   * @throws WxErrorException the wx error exception
+   */
+  WxCpXmlOutMessage handle(WxCpXmlMessage wxMessage,
+                           Map context,
+                           WxCpTpService wxCpService,
+                           WxSessionManager sessionManager) throws WxErrorException;
+
+}
diff --git a/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/tp/message/WxCpTpMessageInterceptor.java b/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/tp/message/WxCpTpMessageInterceptor.java
new file mode 100644
index 000000000..fe5ceefa0
--- /dev/null
+++ b/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/tp/message/WxCpTpMessageInterceptor.java
@@ -0,0 +1,32 @@
+package me.chanjar.weixin.cp.tp.message;
+
+import me.chanjar.weixin.common.error.WxErrorException;
+import me.chanjar.weixin.common.session.WxSessionManager;
+import me.chanjar.weixin.cp.bean.message.WxCpXmlMessage;
+import me.chanjar.weixin.cp.tp.service.WxCpTpService;
+
+import java.util.Map;
+
+/**
+ * 微信消息拦截器,可以用来做验证
+ *
+ * @author Daniel Qian
+ */
+public interface WxCpTpMessageInterceptor {
+
+  /**
+   * 拦截微信消息
+   *
+   * @param wxMessage      the wx message
+   * @param context        上下文,如果handler或interceptor之间有信息要传递,可以用这个
+   * @param wxCpService    the wx cp service
+   * @param sessionManager the session manager
+   * @return true代表OK ,false代表不OK
+   * @throws WxErrorException the wx error exception
+   */
+  boolean intercept(WxCpXmlMessage wxMessage,
+                    Map context,
+                    WxCpTpService wxCpService,
+                    WxSessionManager sessionManager) throws WxErrorException;
+
+}
diff --git a/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/tp/message/WxCpTpMessageMatcher.java b/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/tp/message/WxCpTpMessageMatcher.java
new file mode 100644
index 000000000..8f7decf4b
--- /dev/null
+++ b/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/tp/message/WxCpTpMessageMatcher.java
@@ -0,0 +1,20 @@
+package me.chanjar.weixin.cp.tp.message;
+
+import me.chanjar.weixin.cp.bean.message.WxCpXmlMessage;
+
+/**
+ * 消息匹配器,用在消息路由的时候
+ *
+ * @author Daniel Qian
+ */
+public interface WxCpTpMessageMatcher {
+
+  /**
+   * 消息是否匹配某种模式
+   *
+   * @param message the message
+   * @return the boolean
+   */
+  boolean match(WxCpXmlMessage message);
+
+}
diff --git a/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/tp/message/WxCpTpMessageRouter.java b/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/tp/message/WxCpTpMessageRouter.java
new file mode 100644
index 000000000..147f324db
--- /dev/null
+++ b/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/tp/message/WxCpTpMessageRouter.java
@@ -0,0 +1,235 @@
+package me.chanjar.weixin.cp.tp.message;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import lombok.extern.slf4j.Slf4j;
+import me.chanjar.weixin.common.api.WxErrorExceptionHandler;
+import me.chanjar.weixin.common.api.WxMessageDuplicateChecker;
+import me.chanjar.weixin.common.api.WxMessageInMemoryDuplicateChecker;
+import me.chanjar.weixin.common.session.InternalSession;
+import me.chanjar.weixin.common.session.InternalSessionManager;
+import me.chanjar.weixin.common.session.WxSessionManager;
+import me.chanjar.weixin.common.util.LogExceptionHandler;
+import me.chanjar.weixin.cp.bean.message.WxCpXmlMessage;
+import me.chanjar.weixin.cp.bean.message.WxCpXmlOutMessage;
+import me.chanjar.weixin.cp.message.WxCpMessageRouterRule;
+import me.chanjar.weixin.cp.tp.service.WxCpTpService;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.*;
+
+/**
+ * 
+ * 微信消息路由器,通过代码化的配置,把来自微信的消息交给handler处理
+ *
+ * 说明:
+ * 1. 配置路由规则时要按照从细到粗的原则,否则可能消息可能会被提前处理
+ * 2. 默认情况下消息只会被处理一次,除非使用 {@link WxCpMessageRouterRule#next()}
+ * 3. 规则的结束必须用{@link WxCpMessageRouterRule#end()}或者{@link WxCpMessageRouterRule#next()},否则不会生效
+ *
+ * 使用方法:
+ * WxCpMessageRouter router = new WxCpMessageRouter();
+ * router
+ *   .rule()
+ *       .msgType("MSG_TYPE").event("EVENT").eventKey("EVENT_KEY").content("CONTENT")
+ *       .interceptor(interceptor, ...).handler(handler, ...)
+ *   .end()
+ *   .rule()
+ *       // 另外一个匹配规则
+ *   .end()
+ * ;
+ *
+ * // 将WxXmlMessage交给消息路由器
+ * router.route(message);
+ *
+ * 
+ * + * @author Daniel Qian + */ +@Slf4j +public class WxCpTpMessageRouter { + private static final int DEFAULT_THREAD_POOL_SIZE = 100; + private final List rules = new ArrayList<>(); + + private final WxCpTpService wxCpService; + + private ExecutorService executorService; + + private WxMessageDuplicateChecker messageDuplicateChecker; + + private WxSessionManager sessionManager; + + private WxErrorExceptionHandler exceptionHandler; + + /** + * 构造方法. + */ + public WxCpTpMessageRouter(WxCpTpService wxCpService) { + this.wxCpService = wxCpService; + ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("WxCpTpMessageRouter-pool-%d").build(); + this.executorService = new ThreadPoolExecutor(DEFAULT_THREAD_POOL_SIZE, DEFAULT_THREAD_POOL_SIZE, + 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), namedThreadFactory); + this.messageDuplicateChecker = new WxMessageInMemoryDuplicateChecker(); + this.sessionManager = wxCpService.getSessionManager(); + this.exceptionHandler = new LogExceptionHandler(); + } + + /** + *
+   * 设置自定义的 {@link ExecutorService}
+   * 如果不调用该方法,默认使用 Executors.newFixedThreadPool(100)
+   * 
+ */ + public void setExecutorService(ExecutorService executorService) { + this.executorService = executorService; + } + + /** + *
+   * 设置自定义的 {@link WxMessageDuplicateChecker}
+   * 如果不调用该方法,默认使用 {@link WxMessageInMemoryDuplicateChecker}
+   * 
+ */ + public void setMessageDuplicateChecker(WxMessageDuplicateChecker messageDuplicateChecker) { + this.messageDuplicateChecker = messageDuplicateChecker; + } + + /** + *
+   * 设置自定义的{@link WxSessionManager}
+   * 如果不调用该方法,默认使用 {@link me.chanjar.weixin.common.session.StandardSessionManager}
+   * 
+ */ + public void setSessionManager(WxSessionManager sessionManager) { + this.sessionManager = sessionManager; + } + + /** + *
+   * 设置自定义的{@link WxErrorExceptionHandler}
+   * 如果不调用该方法,默认使用 {@link LogExceptionHandler}
+   * 
+ */ + public void setExceptionHandler(WxErrorExceptionHandler exceptionHandler) { + this.exceptionHandler = exceptionHandler; + } + + List getRules() { + return this.rules; + } + + /** + * 开始一个新的Route规则. + */ + public WxCpTpMessageRouterRule rule() { + return new WxCpTpMessageRouterRule(this); + } + + /** + * 处理微信消息. + */ + public WxCpXmlOutMessage route(final WxCpXmlMessage wxMessage, final Map context) { + if (isMsgDuplicated(wxMessage)) { + // 如果是重复消息,那么就不做处理 + return null; + } + + final List matchRules = new ArrayList<>(); + // 收集匹配的规则 + for (final WxCpTpMessageRouterRule rule : this.rules) { + if (rule.test(wxMessage)) { + matchRules.add(rule); + if (!rule.isReEnter()) { + break; + } + } + } + + if (matchRules.size() == 0) { + return null; + } + + WxCpXmlOutMessage res = null; + final List futures = new ArrayList<>(); + for (final WxCpTpMessageRouterRule rule : matchRules) { + // 返回最后一个非异步的rule的执行结果 + if (rule.isAsync()) { + futures.add( + this.executorService.submit(() -> { + rule.service(wxMessage, context, WxCpTpMessageRouter.this.wxCpService, WxCpTpMessageRouter.this.sessionManager, WxCpTpMessageRouter.this.exceptionHandler); + }) + ); + } else { + res = rule.service(wxMessage, context, this.wxCpService, this.sessionManager, this.exceptionHandler); + // 在同步操作结束,session访问结束 + log.debug("End session access: async=false, sessionId={}", wxMessage.getFromUserName()); + sessionEndAccess(wxMessage); + } + } + + if (futures.size() > 0) { + this.executorService.submit(() -> { + for (Future future : futures) { + try { + future.get(); + log.debug("End session access: async=true, sessionId={}", wxMessage.getFromUserName()); + // 异步操作结束,session访问结束 + sessionEndAccess(wxMessage); + } catch (InterruptedException e) { + log.error("Error happened when wait task finish", e); + Thread.currentThread().interrupt(); + } catch (ExecutionException e) { + log.error("Error happened when wait task finish", e); + } + } + }); + } + return res; + } + + /** + * 处理微信消息. + */ + public WxCpXmlOutMessage route(final WxCpXmlMessage wxMessage) { + return this.route(wxMessage, new HashMap<>(2)); + } + + private boolean isMsgDuplicated(WxCpXmlMessage wxMessage) { + StringBuilder messageId = new StringBuilder(); + if (wxMessage.getMsgId() == null) { + messageId.append(wxMessage.getCreateTime()) + .append("-").append(StringUtils.trimToEmpty(String.valueOf(wxMessage.getAgentId()))) + .append("-").append(wxMessage.getFromUserName()) + .append("-").append(StringUtils.trimToEmpty(wxMessage.getEventKey())) + .append("-").append(StringUtils.trimToEmpty(wxMessage.getEvent())); + } else { + messageId.append(wxMessage.getMsgId()) + .append("-").append(wxMessage.getCreateTime()) + .append("-").append(wxMessage.getFromUserName()); + } + + if (StringUtils.isNotEmpty(wxMessage.getUserId())) { + messageId.append("-").append(wxMessage.getUserId()); + } + + if (StringUtils.isNotEmpty(wxMessage.getChangeType())) { + messageId.append("-").append(wxMessage.getChangeType()); + } + + return this.messageDuplicateChecker.isDuplicate(messageId.toString()); + } + + /** + * 对session的访问结束. + */ + private void sessionEndAccess(WxCpXmlMessage wxMessage) { + InternalSession session = ((InternalSessionManager) this.sessionManager).findSession(wxMessage.getFromUserName()); + if (session != null) { + session.endAccess(); + } + + } +} diff --git a/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/tp/message/WxCpTpMessageRouterRule.java b/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/tp/message/WxCpTpMessageRouterRule.java new file mode 100644 index 000000000..8494978e2 --- /dev/null +++ b/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/tp/message/WxCpTpMessageRouterRule.java @@ -0,0 +1,313 @@ +package me.chanjar.weixin.cp.tp.message; + +import lombok.Data; +import me.chanjar.weixin.common.api.WxErrorExceptionHandler; +import me.chanjar.weixin.common.error.WxErrorException; +import me.chanjar.weixin.common.session.WxSessionManager; +import me.chanjar.weixin.cp.bean.message.WxCpXmlMessage; +import me.chanjar.weixin.cp.bean.message.WxCpXmlOutMessage; +import me.chanjar.weixin.cp.message.WxCpMessageMatcher; +import me.chanjar.weixin.cp.tp.service.WxCpTpService; +import org.apache.commons.lang3.StringUtils; + +import java.util.*; +import java.util.regex.Pattern; + +/** + * The type Wx cp message router rule. + * + * @author Daniel Qian + */ +@Data +public class WxCpTpMessageRouterRule { + private final WxCpTpMessageRouter routerBuilder; + + private boolean async = true; + + private String fromUser; + + private String msgType; + + private String event; + + private String eventKey; + + private String eventKeyRegex; + + private String content; + + private String rContent; + + private WxCpMessageMatcher matcher; + + private boolean reEnter = false; + + private Integer agentId; + + private List handlers = new ArrayList<>(); + + private List interceptors = new ArrayList<>(); + + /** + * Instantiates a new Wx cp message router rule. + * + * @param routerBuilder the router builder + */ + protected WxCpTpMessageRouterRule(WxCpTpMessageRouter routerBuilder) { + this.routerBuilder = routerBuilder; + } + + /** + * 设置是否异步执行,默认是true + * + * @param async the async + * @return the wx cp message router rule + */ + public WxCpTpMessageRouterRule async(boolean async) { + this.async = async; + return this; + } + + /** + * 如果agentId匹配 + * + * @param agentId the agent id + * @return the wx cp message router rule + */ + public WxCpTpMessageRouterRule agentId(Integer agentId) { + this.agentId = agentId; + return this; + } + + /** + * 如果msgType等于某值 + * + * @param msgType the msg type + * @return the wx cp message router rule + */ + public WxCpTpMessageRouterRule msgType(String msgType) { + this.msgType = msgType; + return this; + } + + /** + * 如果event等于某值 + * + * @param event the event + * @return the wx cp message router rule + */ + public WxCpTpMessageRouterRule event(String event) { + this.event = event; + return this; + } + + /** + * 如果eventKey等于某值 + * + * @param eventKey the event key + * @return the wx cp message router rule + */ + public WxCpTpMessageRouterRule eventKey(String eventKey) { + this.eventKey = eventKey; + return this; + } + + /** + * 如果eventKey匹配该正则表达式 + * + * @param regex the regex + * @return the wx cp message router rule + */ + public WxCpTpMessageRouterRule eventKeyRegex(String regex) { + this.eventKeyRegex = regex; + return this; + } + + /** + * 如果content等于某值 + * + * @param content the content + * @return the wx cp message router rule + */ + public WxCpTpMessageRouterRule content(String content) { + this.content = content; + return this; + } + + /** + * 如果content匹配该正则表达式 + * + * @param regex the regex + * @return the wx cp message router rule + */ + public WxCpTpMessageRouterRule rContent(String regex) { + this.rContent = regex; + return this; + } + + /** + * 如果fromUser等于某值 + * + * @param fromUser the from user + * @return the wx cp message router rule + */ + public WxCpTpMessageRouterRule fromUser(String fromUser) { + this.fromUser = fromUser; + return this; + } + + /** + * 如果消息匹配某个matcher,用在用户需要自定义更复杂的匹配规则的时候 + * + * @param matcher the matcher + * @return the wx cp message router rule + */ + public WxCpTpMessageRouterRule matcher(WxCpMessageMatcher matcher) { + this.matcher = matcher; + return this; + } + + /** + * 设置微信消息拦截器 + * + * @param interceptor the interceptor + * @return the wx cp message router rule + */ + public WxCpTpMessageRouterRule interceptor(WxCpTpMessageInterceptor interceptor) { + return interceptor(interceptor, (WxCpTpMessageInterceptor[]) null); + } + + /** + * 设置微信消息拦截器 + * + * @param interceptor the interceptor + * @param otherInterceptors the other interceptors + * @return the wx cp message router rule + */ + public WxCpTpMessageRouterRule interceptor(WxCpTpMessageInterceptor interceptor, WxCpTpMessageInterceptor... otherInterceptors) { + this.interceptors.add(interceptor); + if (otherInterceptors != null && otherInterceptors.length > 0) { + Collections.addAll(this.interceptors, otherInterceptors); + } + return this; + } + + /** + * 设置微信消息处理器 + * + * @param handler the handler + * @return the wx cp message router rule + */ + public WxCpTpMessageRouterRule handler(WxCpTpMessageHandler handler) { + return handler(handler, (WxCpTpMessageHandler[]) null); + } + + /** + * 设置微信消息处理器 + * + * @param handler the handler + * @param otherHandlers the other handlers + * @return the wx cp message router rule + */ + public WxCpTpMessageRouterRule handler(WxCpTpMessageHandler handler, WxCpTpMessageHandler... otherHandlers) { + this.handlers.add(handler); + if (otherHandlers != null && otherHandlers.length > 0) { + Collections.addAll(this.handlers, otherHandlers); + } + return this; + } + + /** + * 规则结束,代表如果一个消息匹配该规则,那么它将不再会进入其他规则 + * + * @return the wx cp message router + */ + public WxCpTpMessageRouter end() { + this.routerBuilder.getRules().add(this); + return this.routerBuilder; + } + + /** + * 规则结束,但是消息还会进入其他规则 + * + * @return the wx cp message router + */ + public WxCpTpMessageRouter next() { + this.reEnter = true; + return end(); + } + + /** + * Test boolean. + * + * @param wxMessage the wx message + * @return the boolean + */ + protected boolean test(WxCpXmlMessage wxMessage) { + return + (this.fromUser == null || this.fromUser.equals(wxMessage.getFromUserName())) + && + (this.agentId == null || this.agentId.equals(wxMessage.getAgentId())) + && + (this.msgType == null || this.msgType.equalsIgnoreCase(wxMessage.getMsgType())) + && + (this.event == null || this.event.equalsIgnoreCase(wxMessage.getEvent())) + && + (this.eventKey == null || this.eventKey.equalsIgnoreCase(wxMessage.getEventKey())) + && + (this.eventKeyRegex == null || Pattern.matches(this.eventKeyRegex, StringUtils.trimToEmpty(wxMessage.getEventKey()))) + && + (this.content == null || this.content.equals(StringUtils.trimToNull(wxMessage.getContent()))) + && + (this.rContent == null || Pattern.matches(this.rContent, StringUtils.trimToEmpty(wxMessage.getContent()))) + && + (this.matcher == null || this.matcher.match(wxMessage)) + ; + } + + /** + * 处理微信推送过来的消息 + * + * @param wxMessage the wx message + * @param context the context + * @param wxCpService the wx cp service + * @param sessionManager the session manager + * @param exceptionHandler the exception handler + * @return true 代表继续执行别的router,false 代表停止执行别的router + */ + protected WxCpXmlOutMessage service(WxCpXmlMessage wxMessage, + Map context, + WxCpTpService wxCpService, + WxSessionManager sessionManager, + WxErrorExceptionHandler exceptionHandler) { + + if (context == null) { + context = new HashMap<>(2); + } + + try { + // 如果拦截器不通过 + for (WxCpTpMessageInterceptor interceptor : this.interceptors) { + if (!interceptor.intercept(wxMessage, context, wxCpService, sessionManager)) { + return null; + } + } + + // 交给handler处理 + WxCpXmlOutMessage res = null; + for (WxCpTpMessageHandler handler : this.handlers) { + // 返回最后handler的结果 + res = handler.handle(wxMessage, context, wxCpService, sessionManager); + } + return res; + + } catch (WxErrorException e) { + exceptionHandler.handle(e); + } + + return null; + + } + + +} diff --git a/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/api/WxCpTpService.java b/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/tp/service/WxCpTpService.java similarity index 96% rename from weixin-java-cp/src/main/java/me/chanjar/weixin/cp/api/WxCpTpService.java rename to weixin-java-cp/src/main/java/me/chanjar/weixin/cp/tp/service/WxCpTpService.java index 1e3852c50..26da1ddd4 100644 --- a/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/api/WxCpTpService.java +++ b/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/tp/service/WxCpTpService.java @@ -1,7 +1,8 @@ -package me.chanjar.weixin.cp.api; +package me.chanjar.weixin.cp.tp.service; import me.chanjar.weixin.common.bean.WxAccessToken; import me.chanjar.weixin.common.error.WxErrorException; +import me.chanjar.weixin.common.session.WxSessionManager; import me.chanjar.weixin.common.util.http.MediaUploadRequestExecutor; import me.chanjar.weixin.common.util.http.RequestExecutor; import me.chanjar.weixin.common.util.http.RequestHttp; @@ -12,7 +13,7 @@ import me.chanjar.weixin.cp.bean.WxCpTpPermanentCodeInfo; import me.chanjar.weixin.cp.config.WxCpTpConfigStorage; /** - * 微信第三方应用API的Service. + * 企业微信第三方应用API的Service. * * @author zhenjun cai */ @@ -226,4 +227,10 @@ public interface WxCpTpService { */ RequestHttp getRequestHttp(); + /** + * 获取WxSessionManager 对象 + * + * @return WxSessionManager session manager + */ + WxSessionManager getSessionManager(); } diff --git a/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/api/impl/BaseWxCpTpServiceImpl.java b/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/tp/service/impl/BaseWxCpTpServiceImpl.java similarity index 96% rename from weixin-java-cp/src/main/java/me/chanjar/weixin/cp/api/impl/BaseWxCpTpServiceImpl.java rename to weixin-java-cp/src/main/java/me/chanjar/weixin/cp/tp/service/impl/BaseWxCpTpServiceImpl.java index 0305692b5..859fc1648 100644 --- a/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/api/impl/BaseWxCpTpServiceImpl.java +++ b/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/tp/service/impl/BaseWxCpTpServiceImpl.java @@ -1,4 +1,4 @@ -package me.chanjar.weixin.cp.api.impl; +package me.chanjar.weixin.cp.tp.service.impl; import com.google.common.base.Joiner; import com.google.gson.JsonObject; @@ -9,6 +9,8 @@ import me.chanjar.weixin.common.enums.WxType; import me.chanjar.weixin.common.error.WxCpErrorMsgEnum; import me.chanjar.weixin.common.error.WxError; import me.chanjar.weixin.common.error.WxErrorException; +import me.chanjar.weixin.common.session.StandardSessionManager; +import me.chanjar.weixin.common.session.WxSessionManager; import me.chanjar.weixin.common.util.DataUtils; import me.chanjar.weixin.common.util.crypto.SHA1; import me.chanjar.weixin.common.util.http.RequestExecutor; @@ -16,9 +18,9 @@ import me.chanjar.weixin.common.util.http.RequestHttp; import me.chanjar.weixin.common.util.http.SimpleGetRequestExecutor; import me.chanjar.weixin.common.util.http.SimplePostRequestExecutor; import me.chanjar.weixin.common.util.json.GsonParser; -import me.chanjar.weixin.cp.api.WxCpTpService; import me.chanjar.weixin.cp.bean.*; import me.chanjar.weixin.cp.config.WxCpTpConfigStorage; +import me.chanjar.weixin.cp.tp.service.WxCpTpService; import org.apache.commons.lang3.StringUtils; import java.io.File; @@ -49,6 +51,8 @@ public abstract class BaseWxCpTpServiceImpl implements WxCpTpService, Requ protected WxCpTpConfigStorage configStorage; + private WxSessionManager sessionManager = new StandardSessionManager(); + /** * 临时文件目录. */ @@ -274,4 +278,9 @@ public abstract class BaseWxCpTpServiceImpl implements WxCpTpService, Requ return this; } + @Override + public WxSessionManager getSessionManager() { + return this.sessionManager; + } + } diff --git a/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/api/impl/WxCpTpServiceApacheHttpClientImpl.java b/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/tp/service/impl/WxCpTpServiceApacheHttpClientImpl.java similarity index 98% rename from weixin-java-cp/src/main/java/me/chanjar/weixin/cp/api/impl/WxCpTpServiceApacheHttpClientImpl.java rename to weixin-java-cp/src/main/java/me/chanjar/weixin/cp/tp/service/impl/WxCpTpServiceApacheHttpClientImpl.java index cdc6b2cfc..ec53789e6 100644 --- a/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/api/impl/WxCpTpServiceApacheHttpClientImpl.java +++ b/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/tp/service/impl/WxCpTpServiceApacheHttpClientImpl.java @@ -1,4 +1,4 @@ -package me.chanjar.weixin.cp.api.impl; +package me.chanjar.weixin.cp.tp.service.impl; import com.google.gson.JsonObject; diff --git a/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/api/impl/WxCpTpServiceImpl.java b/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/tp/service/impl/WxCpTpServiceImpl.java similarity index 82% rename from weixin-java-cp/src/main/java/me/chanjar/weixin/cp/api/impl/WxCpTpServiceImpl.java rename to weixin-java-cp/src/main/java/me/chanjar/weixin/cp/tp/service/impl/WxCpTpServiceImpl.java index f5582021e..58fb09cf9 100644 --- a/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/api/impl/WxCpTpServiceImpl.java +++ b/weixin-java-cp/src/main/java/me/chanjar/weixin/cp/tp/service/impl/WxCpTpServiceImpl.java @@ -1,4 +1,4 @@ -package me.chanjar.weixin.cp.api.impl; +package me.chanjar.weixin.cp.tp.service.impl; /** *
diff --git a/weixin-java-cp/src/test/java/me/chanjar/weixin/cp/api/impl/BaseWxCpTpServiceImplTest.java b/weixin-java-cp/src/test/java/me/chanjar/weixin/cp/tp/service/impl/BaseWxCpTpServiceImplTest.java
similarity index 91%
rename from weixin-java-cp/src/test/java/me/chanjar/weixin/cp/api/impl/BaseWxCpTpServiceImplTest.java
rename to weixin-java-cp/src/test/java/me/chanjar/weixin/cp/tp/service/impl/BaseWxCpTpServiceImplTest.java
index 9f7973561..83ace79f3 100644
--- a/weixin-java-cp/src/test/java/me/chanjar/weixin/cp/api/impl/BaseWxCpTpServiceImplTest.java
+++ b/weixin-java-cp/src/test/java/me/chanjar/weixin/cp/tp/service/impl/BaseWxCpTpServiceImplTest.java
@@ -1,20 +1,20 @@
-package me.chanjar.weixin.cp.api.impl;
+package me.chanjar.weixin.cp.tp.service.impl;
 
 import com.google.gson.JsonObject;
 import me.chanjar.weixin.common.error.WxErrorException;
-import me.chanjar.weixin.cp.api.WxCpTpService;
 import me.chanjar.weixin.cp.bean.WxCpTpAuthInfo;
 import me.chanjar.weixin.cp.bean.WxCpTpCorp;
 import me.chanjar.weixin.cp.bean.WxCpTpPermanentCodeInfo;
 import me.chanjar.weixin.cp.config.WxCpTpConfigStorage;
 import me.chanjar.weixin.cp.config.impl.WxCpTpDefaultConfigImpl;
+import me.chanjar.weixin.cp.tp.service.WxCpTpService;
+import org.mockito.Mockito;
+import org.testng.Assert;
 import org.testng.annotations.Test;
 
 import static me.chanjar.weixin.cp.constant.WxCpApiPathConsts.Tp.GET_AUTH_INFO;
 import static me.chanjar.weixin.cp.constant.WxCpApiPathConsts.Tp.GET_PERMANENT_CODE;
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.Mockito.*;
-import static org.testng.Assert.*;
 
 /**
  * 测试代码.
@@ -23,7 +23,7 @@ import static org.testng.Assert.*;
  * @date 2019-08-18
  */
 public class BaseWxCpTpServiceImplTest {
-  private WxCpTpService tpService = spy(new WxCpTpServiceImpl());
+  private final WxCpTpService tpService = Mockito.spy(new WxCpTpServiceImpl());
 
   @Test
   public void testCheckSignature() {
@@ -123,7 +123,7 @@ public class BaseWxCpTpServiceImplTest {
     JsonObject jsonObject = new JsonObject();
     String authCode = "";
     jsonObject.addProperty("auth_code", authCode);
-    doReturn(returnJson).when(tpService).post(configStorage.getApiUrl(GET_PERMANENT_CODE), jsonObject.toString());
+    Mockito.doReturn(returnJson).when(tpService).post(configStorage.getApiUrl(GET_PERMANENT_CODE), jsonObject.toString());
 
     final WxCpTpCorp tpCorp = tpService.getPermanentCode(authCode);
     assertThat(tpCorp.getPermanentCode()).isEqualTo("xxxx");
@@ -134,7 +134,7 @@ public class BaseWxCpTpServiceImplTest {
   }
 
   @Test
-  public void testGetPermanentCodeInfo() throws WxErrorException{
+  public void testGetPermanentCodeInfo() throws WxErrorException {
     String returnJson = "{\n" +
       "  \"access_token\": \"u6SoEWyrEmworJ1uNzddbiXh42mCLNU_mdd6b01Afo2LKmyi-WdaaYqhEGFZjB1RGZ-rhjLcAJ86ger7b7Q0gowSw9iIDR8dm49aVH_MztzmQttP3XFG7np1Dxs_VQkVwhhRmfRpEonAmK1_JWIFqayJXXiPUS3LsFd3tWpE7rxmsRa7Ev2ml2htbRp_qGUjtFTErKoDsnNGSka6_RqFPA\", \n" +
       "  \"expires_in\": 7200, \n" +
@@ -187,15 +187,15 @@ public class BaseWxCpTpServiceImplTest {
     JsonObject jsonObject = new JsonObject();
     String authCode = "";
     jsonObject.addProperty("auth_code", authCode);
-    doReturn(returnJson).when(tpService).post(configStorage.getApiUrl(GET_PERMANENT_CODE), jsonObject.toString());
+    Mockito.doReturn(returnJson).when(tpService).post(configStorage.getApiUrl(GET_PERMANENT_CODE), jsonObject.toString());
     final WxCpTpPermanentCodeInfo tpPermanentCodeInfo = tpService.getPermanentCodeInfo(authCode);
     assertThat(tpPermanentCodeInfo.getAuthInfo().getAgents().get(0).getAgentId()).isEqualTo(1000012);
-    assertNotNull(tpPermanentCodeInfo.getAuthInfo().getAgents().get(0).getSquareLogoUrl());
-    assertNotNull(tpPermanentCodeInfo.getAuthCorpInfo().getCorpSquareLogoUrl());
+    Assert.assertNotNull(tpPermanentCodeInfo.getAuthInfo().getAgents().get(0).getSquareLogoUrl());
+    Assert.assertNotNull(tpPermanentCodeInfo.getAuthCorpInfo().getCorpSquareLogoUrl());
   }
 
   @Test
-  public void testGetAuthInfo() throws WxErrorException{
+  public void testGetAuthInfo() throws WxErrorException {
     String returnJson = "{\n" +
       "    \"errcode\":0 ,\n" +
       "    \"errmsg\":\"ok\" ,\n" +
@@ -260,9 +260,9 @@ public class BaseWxCpTpServiceImplTest {
     String permanentCode = "xxxxx";
     jsonObject.addProperty("auth_corpid", authCorpId);
     jsonObject.addProperty("permanent_code", permanentCode);
-    doReturn(returnJson).when(tpService).post(configStorage.getApiUrl(GET_AUTH_INFO), jsonObject.toString());
-    WxCpTpAuthInfo authInfo = tpService.getAuthInfo(authCorpId,permanentCode);
-    assertNotNull(authInfo.getAuthCorpInfo().getCorpId());
+    Mockito.doReturn(returnJson).when(tpService).post(configStorage.getApiUrl(GET_AUTH_INFO), jsonObject.toString());
+    WxCpTpAuthInfo authInfo = tpService.getAuthInfo(authCorpId, permanentCode);
+    Assert.assertNotNull(authInfo.getAuthCorpInfo().getCorpId());
   }
 
   @Test
diff --git a/weixin-java-mp/src/main/java/me/chanjar/weixin/mp/api/WxMpMessageRouter.java b/weixin-java-mp/src/main/java/me/chanjar/weixin/mp/api/WxMpMessageRouter.java
index 9b2ca03fe..36556d984 100644
--- a/weixin-java-mp/src/main/java/me/chanjar/weixin/mp/api/WxMpMessageRouter.java
+++ b/weixin-java-mp/src/main/java/me/chanjar/weixin/mp/api/WxMpMessageRouter.java
@@ -1,5 +1,6 @@
 package me.chanjar.weixin.mp.api;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import me.chanjar.weixin.common.api.WxErrorExceptionHandler;
 import me.chanjar.weixin.common.api.WxMessageDuplicateChecker;
 import me.chanjar.weixin.common.api.WxMessageInMemoryDuplicateChecker;
@@ -18,10 +19,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.concurrent.*;
 
 /**
  * 
@@ -68,7 +66,9 @@ public class WxMpMessageRouter {
 
   public WxMpMessageRouter(WxMpService wxMpService) {
     this.wxMpService = wxMpService;
-    this.executorService = Executors.newFixedThreadPool(DEFAULT_THREAD_POOL_SIZE);
+    ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("WxMpMessageRouter-pool-%d").build();
+    this.executorService = new ThreadPoolExecutor(DEFAULT_THREAD_POOL_SIZE, DEFAULT_THREAD_POOL_SIZE,
+      0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), namedThreadFactory);
     this.messageDuplicateChecker = new WxMessageInMemoryDuplicateChecker();
     this.sessionManager = new StandardSessionManager();
     this.exceptionHandler = new LogExceptionHandler();