mirror of
				https://gitee.com/binary/weixin-java-tools.git
				synced 2025-10-31 18:46:10 +08:00 
			
		
		
		
	🎨 #2663 优化重复消息检查器多实例导致多守护线程的问题,修改成单例+定时任务线程池处理
This commit is contained in:
		 helloworldByChinese
					helloworldByChinese
				
			
				
					committed by
					
						 GitHub
						GitHub
					
				
			
			
				
	
			
			
			 GitHub
						GitHub
					
				
			
						parent
						
							41bb3b9901
						
					
				
				
					commit
					95be03bf1c
				
			| @ -8,10 +8,12 @@ import java.util.concurrent.atomic.AtomicBoolean; | |||||||
|  * <pre> |  * <pre> | ||||||
|  * 默认消息重复检查器. |  * 默认消息重复检查器. | ||||||
|  * 将每个消息id保存在内存里,每隔5秒清理已经过期的消息id,每个消息id的过期时间是15秒 |  * 将每个消息id保存在内存里,每隔5秒清理已经过期的消息id,每个消息id的过期时间是15秒 | ||||||
|  |  * 替换类WxMessageInMemoryDuplicateCheckerSingleton | ||||||
|  * </pre> |  * </pre> | ||||||
|  * |  * | ||||||
|  * @author Daniel Qian |  * @author Daniel Qian | ||||||
|  */ |  */ | ||||||
|  | @Deprecated | ||||||
| public class WxMessageInMemoryDuplicateChecker implements WxMessageDuplicateChecker { | public class WxMessageInMemoryDuplicateChecker implements WxMessageDuplicateChecker { | ||||||
|  |  | ||||||
|   /** |   /** | ||||||
|  | |||||||
| @ -0,0 +1,91 @@ | |||||||
|  | package me.chanjar.weixin.common.api; | ||||||
|  |  | ||||||
|  | import com.google.common.util.concurrent.ThreadFactoryBuilder; | ||||||
|  | import lombok.extern.slf4j.Slf4j; | ||||||
|  |  | ||||||
|  | import java.util.concurrent.ConcurrentHashMap; | ||||||
|  | import java.util.concurrent.ScheduledThreadPoolExecutor; | ||||||
|  | import java.util.concurrent.ThreadPoolExecutor; | ||||||
|  | import java.util.concurrent.TimeUnit; | ||||||
|  |  | ||||||
|  | /** | ||||||
|  |  * @author jiangby | ||||||
|  |  * @version 1.0 | ||||||
|  |  * <p> | ||||||
|  |  *   消息去重,记录消息ID首次出现时的时间戳, | ||||||
|  |  *   15S后定时任务触发时废除该记录消息ID | ||||||
|  |  * </p> | ||||||
|  |  * @date 2022/5/26 1:32 | ||||||
|  |  */ | ||||||
|  | @Slf4j | ||||||
|  | public class WxMessageInMemoryDuplicateCheckerSingleton implements WxMessageDuplicateChecker { | ||||||
|  |  | ||||||
|  |   /** | ||||||
|  |    * 一个消息ID在内存的过期时间:15秒. | ||||||
|  |    */ | ||||||
|  |   private static final Long TIME_TO_LIVE = 15L; | ||||||
|  |  | ||||||
|  |   /** | ||||||
|  |    * 每隔多少周期检查消息ID是否过期:5秒. | ||||||
|  |    */ | ||||||
|  |   private static final Long CLEAR_PERIOD = 5L; | ||||||
|  |  | ||||||
|  |   /** | ||||||
|  |    * 线程池 | ||||||
|  |    */ | ||||||
|  |   private static final ScheduledThreadPoolExecutor SCHEDULED_THREAD_POOL_EXECUTOR = new ScheduledThreadPoolExecutor(1, | ||||||
|  |     new ThreadFactoryBuilder().setNameFormat("wxMessage-memory-pool-%d").build(), new ThreadPoolExecutor.AbortPolicy()); | ||||||
|  |  | ||||||
|  |   /** | ||||||
|  |    * 消息id->消息时间戳的map. | ||||||
|  |    */ | ||||||
|  |   private static final ConcurrentHashMap<String, Long> MSG_ID_2_TIMESTAMP = new ConcurrentHashMap<>(); | ||||||
|  |  | ||||||
|  |   static { | ||||||
|  |     SCHEDULED_THREAD_POOL_EXECUTOR.scheduleAtFixedRate(() -> { | ||||||
|  |       try { | ||||||
|  |         Long now = System.currentTimeMillis(); | ||||||
|  |         MSG_ID_2_TIMESTAMP.entrySet().removeIf(entry -> now - entry.getValue() > TIME_TO_LIVE * 1000); | ||||||
|  |       } catch (Exception ex) { | ||||||
|  |         log.error("重复消息去重任务出现异常", ex); | ||||||
|  |       } | ||||||
|  |     }, 1, CLEAR_PERIOD, TimeUnit.SECONDS); | ||||||
|  |   } | ||||||
|  |  | ||||||
|  |   /** | ||||||
|  |    * 私有化构造方法,避免外部调用 | ||||||
|  |    */ | ||||||
|  |   private WxMessageInMemoryDuplicateCheckerSingleton() { | ||||||
|  |   } | ||||||
|  |  | ||||||
|  |   /** | ||||||
|  |    * 获取单例 | ||||||
|  |    * | ||||||
|  |    * @return 单例对象 | ||||||
|  |    */ | ||||||
|  |   public static WxMessageInMemoryDuplicateCheckerSingleton getInstance() { | ||||||
|  |     return WxMessageInnerClass.CHECKER_SINGLETON; | ||||||
|  |   } | ||||||
|  |  | ||||||
|  |   /** | ||||||
|  |    * 内部类实现单例 | ||||||
|  |    */ | ||||||
|  |   private static class WxMessageInnerClass { | ||||||
|  |      static final WxMessageInMemoryDuplicateCheckerSingleton CHECKER_SINGLETON = new WxMessageInMemoryDuplicateCheckerSingleton(); | ||||||
|  |   } | ||||||
|  |  | ||||||
|  |   /** | ||||||
|  |    * messageId是否重复 | ||||||
|  |    * | ||||||
|  |    * @param messageId messageId | ||||||
|  |    * @return 是否 | ||||||
|  |    */ | ||||||
|  |   @Override | ||||||
|  |   public boolean isDuplicate(String messageId) { | ||||||
|  |     if (messageId == null) { | ||||||
|  |       return false; | ||||||
|  |     } | ||||||
|  |     Long timestamp = MSG_ID_2_TIMESTAMP.putIfAbsent(messageId, System.currentTimeMillis()); | ||||||
|  |     return timestamp != null; | ||||||
|  |   } | ||||||
|  | } | ||||||
| @ -0,0 +1,45 @@ | |||||||
|  | package me.chanjar.weixin.common.api; | ||||||
|  |  | ||||||
|  | import org.testng.annotations.Test; | ||||||
|  |  | ||||||
|  | import java.util.concurrent.TimeUnit; | ||||||
|  |  | ||||||
|  | import static org.testng.Assert.assertFalse; | ||||||
|  | import static org.testng.Assert.assertTrue; | ||||||
|  |  | ||||||
|  | /** | ||||||
|  |  * @author jiangby | ||||||
|  |  * @version 1.0 | ||||||
|  |  * @description: 作用 | ||||||
|  |  * @date 2022/5/26 1:46 | ||||||
|  |  */ | ||||||
|  | @Test | ||||||
|  | public class WxMessageInMemoryDuplicateCheckerSingletonTest { | ||||||
|  |  | ||||||
|  |   private static WxMessageInMemoryDuplicateCheckerSingleton checkerSingleton = WxMessageInMemoryDuplicateCheckerSingleton.getInstance(); | ||||||
|  |  | ||||||
|  |   public void test() throws InterruptedException { | ||||||
|  |     Long[] msgIds = new Long[]{1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L}; | ||||||
|  |  | ||||||
|  |     // 第一次检查 | ||||||
|  |     for (Long msgId : msgIds) { | ||||||
|  |       boolean result = checkerSingleton.isDuplicate(String.valueOf(msgId)); | ||||||
|  |       assertFalse(result); | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     // 初始化后1S进行检查 每五秒检查一次,过期时间为15秒,过15秒再检查 | ||||||
|  |     TimeUnit.SECONDS.sleep(15); | ||||||
|  |     for (Long msgId : msgIds) { | ||||||
|  |       boolean result = checkerSingleton.isDuplicate(String.valueOf(msgId)); | ||||||
|  |       assertTrue(result); | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     // 过6秒再检查 | ||||||
|  |     TimeUnit.SECONDS.sleep(6); | ||||||
|  |     for (Long msgId : msgIds) { | ||||||
|  |       boolean result = checkerSingleton.isDuplicate(String.valueOf(msgId)); | ||||||
|  |       assertFalse(result); | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |   } | ||||||
|  | } | ||||||
| @ -5,6 +5,7 @@ import lombok.extern.slf4j.Slf4j; | |||||||
| import me.chanjar.weixin.common.api.WxErrorExceptionHandler; | import me.chanjar.weixin.common.api.WxErrorExceptionHandler; | ||||||
| import me.chanjar.weixin.common.api.WxMessageDuplicateChecker; | import me.chanjar.weixin.common.api.WxMessageDuplicateChecker; | ||||||
| import me.chanjar.weixin.common.api.WxMessageInMemoryDuplicateChecker; | import me.chanjar.weixin.common.api.WxMessageInMemoryDuplicateChecker; | ||||||
|  | import me.chanjar.weixin.common.api.WxMessageInMemoryDuplicateCheckerSingleton; | ||||||
| import me.chanjar.weixin.common.session.InternalSession; | import me.chanjar.weixin.common.session.InternalSession; | ||||||
| import me.chanjar.weixin.common.session.InternalSessionManager; | import me.chanjar.weixin.common.session.InternalSessionManager; | ||||||
| import me.chanjar.weixin.common.session.WxSessionManager; | import me.chanjar.weixin.common.session.WxSessionManager; | ||||||
| @ -71,7 +72,7 @@ public class WxCpMessageRouter { | |||||||
|     ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("WxCpMessageRouter-pool-%d").build(); |     ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("WxCpMessageRouter-pool-%d").build(); | ||||||
|     this.executorService = new ThreadPoolExecutor(DEFAULT_THREAD_POOL_SIZE, DEFAULT_THREAD_POOL_SIZE, |     this.executorService = new ThreadPoolExecutor(DEFAULT_THREAD_POOL_SIZE, DEFAULT_THREAD_POOL_SIZE, | ||||||
|       0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), namedThreadFactory); |       0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), namedThreadFactory); | ||||||
|     this.messageDuplicateChecker = new WxMessageInMemoryDuplicateChecker(); |     this.messageDuplicateChecker = WxMessageInMemoryDuplicateCheckerSingleton.getInstance(); | ||||||
|     this.sessionManager = wxCpService.getSessionManager(); |     this.sessionManager = wxCpService.getSessionManager(); | ||||||
|     this.exceptionHandler = new LogExceptionHandler(); |     this.exceptionHandler = new LogExceptionHandler(); | ||||||
|   } |   } | ||||||
| @ -82,7 +83,7 @@ public class WxCpMessageRouter { | |||||||
|   public WxCpMessageRouter(WxCpService wxMpService, ExecutorService executorService) { |   public WxCpMessageRouter(WxCpService wxMpService, ExecutorService executorService) { | ||||||
|     this.wxCpService = wxMpService; |     this.wxCpService = wxMpService; | ||||||
|     this.executorService = executorService; |     this.executorService = executorService; | ||||||
|     this.messageDuplicateChecker = new WxMessageInMemoryDuplicateChecker(); |     this.messageDuplicateChecker = WxMessageInMemoryDuplicateCheckerSingleton.getInstance(); | ||||||
|     this.sessionManager = wxCpService.getSessionManager(); |     this.sessionManager = wxCpService.getSessionManager(); | ||||||
|     this.exceptionHandler = new LogExceptionHandler(); |     this.exceptionHandler = new LogExceptionHandler(); | ||||||
|   } |   } | ||||||
|  | |||||||
| @ -5,6 +5,7 @@ import lombok.extern.slf4j.Slf4j; | |||||||
| import me.chanjar.weixin.common.api.WxErrorExceptionHandler; | import me.chanjar.weixin.common.api.WxErrorExceptionHandler; | ||||||
| import me.chanjar.weixin.common.api.WxMessageDuplicateChecker; | import me.chanjar.weixin.common.api.WxMessageDuplicateChecker; | ||||||
| import me.chanjar.weixin.common.api.WxMessageInMemoryDuplicateChecker; | import me.chanjar.weixin.common.api.WxMessageInMemoryDuplicateChecker; | ||||||
|  | import me.chanjar.weixin.common.api.WxMessageInMemoryDuplicateCheckerSingleton; | ||||||
| import me.chanjar.weixin.common.session.InternalSession; | import me.chanjar.weixin.common.session.InternalSession; | ||||||
| import me.chanjar.weixin.common.session.InternalSessionManager; | import me.chanjar.weixin.common.session.InternalSessionManager; | ||||||
| import me.chanjar.weixin.common.session.WxSessionManager; | import me.chanjar.weixin.common.session.WxSessionManager; | ||||||
| @ -73,7 +74,7 @@ public class WxCpTpMessageRouter { | |||||||
|     ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("WxCpTpMessageRouter-pool-%d").build(); |     ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("WxCpTpMessageRouter-pool-%d").build(); | ||||||
|     this.executorService = new ThreadPoolExecutor(DEFAULT_THREAD_POOL_SIZE, DEFAULT_THREAD_POOL_SIZE, |     this.executorService = new ThreadPoolExecutor(DEFAULT_THREAD_POOL_SIZE, DEFAULT_THREAD_POOL_SIZE, | ||||||
|       0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), namedThreadFactory); |       0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), namedThreadFactory); | ||||||
|     this.messageDuplicateChecker = new WxMessageInMemoryDuplicateChecker(); |     this.messageDuplicateChecker = WxMessageInMemoryDuplicateCheckerSingleton.getInstance(); | ||||||
|     this.sessionManager = wxCpTpService.getSessionManager(); |     this.sessionManager = wxCpTpService.getSessionManager(); | ||||||
|     this.exceptionHandler = new LogExceptionHandler(); |     this.exceptionHandler = new LogExceptionHandler(); | ||||||
|   } |   } | ||||||
| @ -84,7 +85,7 @@ public class WxCpTpMessageRouter { | |||||||
|   public WxCpTpMessageRouter(WxCpTpService wxCpTpService, ExecutorService executorService) { |   public WxCpTpMessageRouter(WxCpTpService wxCpTpService, ExecutorService executorService) { | ||||||
|     this.wxCpTpService = wxCpTpService; |     this.wxCpTpService = wxCpTpService; | ||||||
|     this.executorService = executorService; |     this.executorService = executorService; | ||||||
|     this.messageDuplicateChecker = new WxMessageInMemoryDuplicateChecker(); |     this.messageDuplicateChecker = WxMessageInMemoryDuplicateCheckerSingleton.getInstance(); | ||||||
|     this.sessionManager = wxCpTpService.getSessionManager(); |     this.sessionManager = wxCpTpService.getSessionManager(); | ||||||
|     this.exceptionHandler = new LogExceptionHandler(); |     this.exceptionHandler = new LogExceptionHandler(); | ||||||
|   } |   } | ||||||
|  | |||||||
| @ -7,6 +7,7 @@ import lombok.Data; | |||||||
| import me.chanjar.weixin.common.api.WxErrorExceptionHandler; | import me.chanjar.weixin.common.api.WxErrorExceptionHandler; | ||||||
| import me.chanjar.weixin.common.api.WxMessageDuplicateChecker; | import me.chanjar.weixin.common.api.WxMessageDuplicateChecker; | ||||||
| import me.chanjar.weixin.common.api.WxMessageInMemoryDuplicateChecker; | import me.chanjar.weixin.common.api.WxMessageInMemoryDuplicateChecker; | ||||||
|  | import me.chanjar.weixin.common.api.WxMessageInMemoryDuplicateCheckerSingleton; | ||||||
| import me.chanjar.weixin.common.session.InternalSession; | import me.chanjar.weixin.common.session.InternalSession; | ||||||
| import me.chanjar.weixin.common.session.InternalSessionManager; | import me.chanjar.weixin.common.session.InternalSessionManager; | ||||||
| import me.chanjar.weixin.common.session.StandardSessionManager; | import me.chanjar.weixin.common.session.StandardSessionManager; | ||||||
| @ -48,7 +49,7 @@ public class WxMaMessageRouter { | |||||||
|       0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), namedThreadFactory); |       0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), namedThreadFactory); | ||||||
|     this.sessionManager = new StandardSessionManager(); |     this.sessionManager = new StandardSessionManager(); | ||||||
|     this.exceptionHandler = new LogExceptionHandler(); |     this.exceptionHandler = new LogExceptionHandler(); | ||||||
|     this.messageDuplicateChecker = new WxMessageInMemoryDuplicateChecker(); |     this.messageDuplicateChecker = WxMessageInMemoryDuplicateCheckerSingleton.getInstance(); | ||||||
|   } |   } | ||||||
|  |  | ||||||
|   /** |   /** | ||||||
| @ -59,7 +60,7 @@ public class WxMaMessageRouter { | |||||||
|     this.executorService = executorService; |     this.executorService = executorService; | ||||||
|     this.sessionManager = new StandardSessionManager(); |     this.sessionManager = new StandardSessionManager(); | ||||||
|     this.exceptionHandler = new LogExceptionHandler(); |     this.exceptionHandler = new LogExceptionHandler(); | ||||||
|     this.messageDuplicateChecker = new WxMessageInMemoryDuplicateChecker(); |     this.messageDuplicateChecker = WxMessageInMemoryDuplicateCheckerSingleton.getInstance(); | ||||||
|   } |   } | ||||||
|  |  | ||||||
|   /** |   /** | ||||||
|  | |||||||
| @ -6,6 +6,7 @@ import lombok.extern.slf4j.Slf4j; | |||||||
| import me.chanjar.weixin.common.api.WxErrorExceptionHandler; | import me.chanjar.weixin.common.api.WxErrorExceptionHandler; | ||||||
| import me.chanjar.weixin.common.api.WxMessageDuplicateChecker; | import me.chanjar.weixin.common.api.WxMessageDuplicateChecker; | ||||||
| import me.chanjar.weixin.common.api.WxMessageInMemoryDuplicateChecker; | import me.chanjar.weixin.common.api.WxMessageInMemoryDuplicateChecker; | ||||||
|  | import me.chanjar.weixin.common.api.WxMessageInMemoryDuplicateCheckerSingleton; | ||||||
| import me.chanjar.weixin.common.session.InternalSession; | import me.chanjar.weixin.common.session.InternalSession; | ||||||
| import me.chanjar.weixin.common.session.InternalSessionManager; | import me.chanjar.weixin.common.session.InternalSessionManager; | ||||||
| import me.chanjar.weixin.common.session.StandardSessionManager; | import me.chanjar.weixin.common.session.StandardSessionManager; | ||||||
| @ -72,7 +73,7 @@ public class WxMpMessageRouter { | |||||||
|     ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("WxMpMessageRouter-pool-%d").build(); |     ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("WxMpMessageRouter-pool-%d").build(); | ||||||
|     this.executorService = new ThreadPoolExecutor(DEFAULT_THREAD_POOL_SIZE, DEFAULT_THREAD_POOL_SIZE, |     this.executorService = new ThreadPoolExecutor(DEFAULT_THREAD_POOL_SIZE, DEFAULT_THREAD_POOL_SIZE, | ||||||
|       0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), namedThreadFactory); |       0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), namedThreadFactory); | ||||||
|     this.messageDuplicateChecker = new WxMessageInMemoryDuplicateChecker(); |     this.messageDuplicateChecker = WxMessageInMemoryDuplicateCheckerSingleton.getInstance(); | ||||||
|     this.sessionManager = new StandardSessionManager(); |     this.sessionManager = new StandardSessionManager(); | ||||||
|     this.exceptionHandler = new LogExceptionHandler(); |     this.exceptionHandler = new LogExceptionHandler(); | ||||||
|   } |   } | ||||||
| @ -83,7 +84,7 @@ public class WxMpMessageRouter { | |||||||
|   public WxMpMessageRouter(WxMpService wxMpService, ExecutorService executorService) { |   public WxMpMessageRouter(WxMpService wxMpService, ExecutorService executorService) { | ||||||
|     this.wxMpService = wxMpService; |     this.wxMpService = wxMpService; | ||||||
|     this.executorService = executorService; |     this.executorService = executorService; | ||||||
|     this.messageDuplicateChecker = new WxMessageInMemoryDuplicateChecker(); |     this.messageDuplicateChecker = WxMessageInMemoryDuplicateCheckerSingleton.getInstance(); | ||||||
|     this.sessionManager = new StandardSessionManager(); |     this.sessionManager = new StandardSessionManager(); | ||||||
|     this.exceptionHandler = new LogExceptionHandler(); |     this.exceptionHandler = new LogExceptionHandler(); | ||||||
|   } |   } | ||||||
|  | |||||||
		Reference in New Issue
	
	Block a user