Skip to content

Commit 95be03b

Browse files
🎨 #2663 优化重复消息检查器多实例导致多守护线程的问题,修改成单例+定时任务线程池处理
1 parent 41bb3b9 commit 95be03b

File tree

7 files changed

+150
-8
lines changed

7 files changed

+150
-8
lines changed

weixin-java-common/src/main/java/me/chanjar/weixin/common/api/WxMessageInMemoryDuplicateChecker.java

+2
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,12 @@
88
* <pre>
99
* 默认消息重复检查器.
1010
* 将每个消息id保存在内存里,每隔5秒清理已经过期的消息id,每个消息id的过期时间是15秒
11+
* 替换类WxMessageInMemoryDuplicateCheckerSingleton
1112
* </pre>
1213
*
1314
* @author Daniel Qian
1415
*/
16+
@Deprecated
1517
public class WxMessageInMemoryDuplicateChecker implements WxMessageDuplicateChecker {
1618

1719
/**
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
package me.chanjar.weixin.common.api;
2+
3+
import com.google.common.util.concurrent.ThreadFactoryBuilder;
4+
import lombok.extern.slf4j.Slf4j;
5+
6+
import java.util.concurrent.ConcurrentHashMap;
7+
import java.util.concurrent.ScheduledThreadPoolExecutor;
8+
import java.util.concurrent.ThreadPoolExecutor;
9+
import java.util.concurrent.TimeUnit;
10+
11+
/**
12+
* @author jiangby
13+
* @version 1.0
14+
* <p>
15+
* 消息去重,记录消息ID首次出现时的时间戳,
16+
* 15S后定时任务触发时废除该记录消息ID
17+
* </p>
18+
* @date 2022/5/26 1:32
19+
*/
20+
@Slf4j
21+
public class WxMessageInMemoryDuplicateCheckerSingleton implements WxMessageDuplicateChecker {
22+
23+
/**
24+
* 一个消息ID在内存的过期时间:15秒.
25+
*/
26+
private static final Long TIME_TO_LIVE = 15L;
27+
28+
/**
29+
* 每隔多少周期检查消息ID是否过期:5秒.
30+
*/
31+
private static final Long CLEAR_PERIOD = 5L;
32+
33+
/**
34+
* 线程池
35+
*/
36+
private static final ScheduledThreadPoolExecutor SCHEDULED_THREAD_POOL_EXECUTOR = new ScheduledThreadPoolExecutor(1,
37+
new ThreadFactoryBuilder().setNameFormat("wxMessage-memory-pool-%d").build(), new ThreadPoolExecutor.AbortPolicy());
38+
39+
/**
40+
* 消息id->消息时间戳的map.
41+
*/
42+
private static final ConcurrentHashMap<String, Long> MSG_ID_2_TIMESTAMP = new ConcurrentHashMap<>();
43+
44+
static {
45+
SCHEDULED_THREAD_POOL_EXECUTOR.scheduleAtFixedRate(() -> {
46+
try {
47+
Long now = System.currentTimeMillis();
48+
MSG_ID_2_TIMESTAMP.entrySet().removeIf(entry -> now - entry.getValue() > TIME_TO_LIVE * 1000);
49+
} catch (Exception ex) {
50+
log.error("重复消息去重任务出现异常", ex);
51+
}
52+
}, 1, CLEAR_PERIOD, TimeUnit.SECONDS);
53+
}
54+
55+
/**
56+
* 私有化构造方法,避免外部调用
57+
*/
58+
private WxMessageInMemoryDuplicateCheckerSingleton() {
59+
}
60+
61+
/**
62+
* 获取单例
63+
*
64+
* @return 单例对象
65+
*/
66+
public static WxMessageInMemoryDuplicateCheckerSingleton getInstance() {
67+
return WxMessageInnerClass.CHECKER_SINGLETON;
68+
}
69+
70+
/**
71+
* 内部类实现单例
72+
*/
73+
private static class WxMessageInnerClass {
74+
static final WxMessageInMemoryDuplicateCheckerSingleton CHECKER_SINGLETON = new WxMessageInMemoryDuplicateCheckerSingleton();
75+
}
76+
77+
/**
78+
* messageId是否重复
79+
*
80+
* @param messageId messageId
81+
* @return 是否
82+
*/
83+
@Override
84+
public boolean isDuplicate(String messageId) {
85+
if (messageId == null) {
86+
return false;
87+
}
88+
Long timestamp = MSG_ID_2_TIMESTAMP.putIfAbsent(messageId, System.currentTimeMillis());
89+
return timestamp != null;
90+
}
91+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package me.chanjar.weixin.common.api;
2+
3+
import org.testng.annotations.Test;
4+
5+
import java.util.concurrent.TimeUnit;
6+
7+
import static org.testng.Assert.assertFalse;
8+
import static org.testng.Assert.assertTrue;
9+
10+
/**
11+
* @author jiangby
12+
* @version 1.0
13+
* @description: 作用
14+
* @date 2022/5/26 1:46
15+
*/
16+
@Test
17+
public class WxMessageInMemoryDuplicateCheckerSingletonTest {
18+
19+
private static WxMessageInMemoryDuplicateCheckerSingleton checkerSingleton = WxMessageInMemoryDuplicateCheckerSingleton.getInstance();
20+
21+
public void test() throws InterruptedException {
22+
Long[] msgIds = new Long[]{1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L};
23+
24+
// 第一次检查
25+
for (Long msgId : msgIds) {
26+
boolean result = checkerSingleton.isDuplicate(String.valueOf(msgId));
27+
assertFalse(result);
28+
}
29+
30+
// 初始化后1S进行检查 每五秒检查一次,过期时间为15秒,过15秒再检查
31+
TimeUnit.SECONDS.sleep(15);
32+
for (Long msgId : msgIds) {
33+
boolean result = checkerSingleton.isDuplicate(String.valueOf(msgId));
34+
assertTrue(result);
35+
}
36+
37+
// 过6秒再检查
38+
TimeUnit.SECONDS.sleep(6);
39+
for (Long msgId : msgIds) {
40+
boolean result = checkerSingleton.isDuplicate(String.valueOf(msgId));
41+
assertFalse(result);
42+
}
43+
44+
}
45+
}

weixin-java-cp/src/main/java/me/chanjar/weixin/cp/message/WxCpMessageRouter.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import me.chanjar.weixin.common.api.WxErrorExceptionHandler;
66
import me.chanjar.weixin.common.api.WxMessageDuplicateChecker;
77
import me.chanjar.weixin.common.api.WxMessageInMemoryDuplicateChecker;
8+
import me.chanjar.weixin.common.api.WxMessageInMemoryDuplicateCheckerSingleton;
89
import me.chanjar.weixin.common.session.InternalSession;
910
import me.chanjar.weixin.common.session.InternalSessionManager;
1011
import me.chanjar.weixin.common.session.WxSessionManager;
@@ -71,7 +72,7 @@ public WxCpMessageRouter(WxCpService wxCpService) {
7172
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("WxCpMessageRouter-pool-%d").build();
7273
this.executorService = new ThreadPoolExecutor(DEFAULT_THREAD_POOL_SIZE, DEFAULT_THREAD_POOL_SIZE,
7374
0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), namedThreadFactory);
74-
this.messageDuplicateChecker = new WxMessageInMemoryDuplicateChecker();
75+
this.messageDuplicateChecker = WxMessageInMemoryDuplicateCheckerSingleton.getInstance();
7576
this.sessionManager = wxCpService.getSessionManager();
7677
this.exceptionHandler = new LogExceptionHandler();
7778
}
@@ -82,7 +83,7 @@ public WxCpMessageRouter(WxCpService wxCpService) {
8283
public WxCpMessageRouter(WxCpService wxMpService, ExecutorService executorService) {
8384
this.wxCpService = wxMpService;
8485
this.executorService = executorService;
85-
this.messageDuplicateChecker = new WxMessageInMemoryDuplicateChecker();
86+
this.messageDuplicateChecker = WxMessageInMemoryDuplicateCheckerSingleton.getInstance();
8687
this.sessionManager = wxCpService.getSessionManager();
8788
this.exceptionHandler = new LogExceptionHandler();
8889
}

weixin-java-cp/src/main/java/me/chanjar/weixin/cp/tp/message/WxCpTpMessageRouter.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import me.chanjar.weixin.common.api.WxErrorExceptionHandler;
66
import me.chanjar.weixin.common.api.WxMessageDuplicateChecker;
77
import me.chanjar.weixin.common.api.WxMessageInMemoryDuplicateChecker;
8+
import me.chanjar.weixin.common.api.WxMessageInMemoryDuplicateCheckerSingleton;
89
import me.chanjar.weixin.common.session.InternalSession;
910
import me.chanjar.weixin.common.session.InternalSessionManager;
1011
import me.chanjar.weixin.common.session.WxSessionManager;
@@ -73,7 +74,7 @@ public WxCpTpMessageRouter(WxCpTpService wxCpTpService) {
7374
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("WxCpTpMessageRouter-pool-%d").build();
7475
this.executorService = new ThreadPoolExecutor(DEFAULT_THREAD_POOL_SIZE, DEFAULT_THREAD_POOL_SIZE,
7576
0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), namedThreadFactory);
76-
this.messageDuplicateChecker = new WxMessageInMemoryDuplicateChecker();
77+
this.messageDuplicateChecker = WxMessageInMemoryDuplicateCheckerSingleton.getInstance();
7778
this.sessionManager = wxCpTpService.getSessionManager();
7879
this.exceptionHandler = new LogExceptionHandler();
7980
}
@@ -84,7 +85,7 @@ public WxCpTpMessageRouter(WxCpTpService wxCpTpService) {
8485
public WxCpTpMessageRouter(WxCpTpService wxCpTpService, ExecutorService executorService) {
8586
this.wxCpTpService = wxCpTpService;
8687
this.executorService = executorService;
87-
this.messageDuplicateChecker = new WxMessageInMemoryDuplicateChecker();
88+
this.messageDuplicateChecker = WxMessageInMemoryDuplicateCheckerSingleton.getInstance();
8889
this.sessionManager = wxCpTpService.getSessionManager();
8990
this.exceptionHandler = new LogExceptionHandler();
9091
}

weixin-java-miniapp/src/main/java/cn/binarywang/wx/miniapp/message/WxMaMessageRouter.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import me.chanjar.weixin.common.api.WxErrorExceptionHandler;
88
import me.chanjar.weixin.common.api.WxMessageDuplicateChecker;
99
import me.chanjar.weixin.common.api.WxMessageInMemoryDuplicateChecker;
10+
import me.chanjar.weixin.common.api.WxMessageInMemoryDuplicateCheckerSingleton;
1011
import me.chanjar.weixin.common.session.InternalSession;
1112
import me.chanjar.weixin.common.session.InternalSessionManager;
1213
import me.chanjar.weixin.common.session.StandardSessionManager;
@@ -48,7 +49,7 @@ public WxMaMessageRouter(WxMaService wxMaService) {
4849
0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), namedThreadFactory);
4950
this.sessionManager = new StandardSessionManager();
5051
this.exceptionHandler = new LogExceptionHandler();
51-
this.messageDuplicateChecker = new WxMessageInMemoryDuplicateChecker();
52+
this.messageDuplicateChecker = WxMessageInMemoryDuplicateCheckerSingleton.getInstance();
5253
}
5354

5455
/**
@@ -59,7 +60,7 @@ public WxMaMessageRouter(WxMaService wxMaService, ExecutorService executorServic
5960
this.executorService = executorService;
6061
this.sessionManager = new StandardSessionManager();
6162
this.exceptionHandler = new LogExceptionHandler();
62-
this.messageDuplicateChecker = new WxMessageInMemoryDuplicateChecker();
63+
this.messageDuplicateChecker = WxMessageInMemoryDuplicateCheckerSingleton.getInstance();
6364
}
6465

6566
/**

weixin-java-mp/src/main/java/me/chanjar/weixin/mp/api/WxMpMessageRouter.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import me.chanjar.weixin.common.api.WxErrorExceptionHandler;
77
import me.chanjar.weixin.common.api.WxMessageDuplicateChecker;
88
import me.chanjar.weixin.common.api.WxMessageInMemoryDuplicateChecker;
9+
import me.chanjar.weixin.common.api.WxMessageInMemoryDuplicateCheckerSingleton;
910
import me.chanjar.weixin.common.session.InternalSession;
1011
import me.chanjar.weixin.common.session.InternalSessionManager;
1112
import me.chanjar.weixin.common.session.StandardSessionManager;
@@ -72,7 +73,7 @@ public WxMpMessageRouter(WxMpService wxMpService) {
7273
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("WxMpMessageRouter-pool-%d").build();
7374
this.executorService = new ThreadPoolExecutor(DEFAULT_THREAD_POOL_SIZE, DEFAULT_THREAD_POOL_SIZE,
7475
0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), namedThreadFactory);
75-
this.messageDuplicateChecker = new WxMessageInMemoryDuplicateChecker();
76+
this.messageDuplicateChecker = WxMessageInMemoryDuplicateCheckerSingleton.getInstance();
7677
this.sessionManager = new StandardSessionManager();
7778
this.exceptionHandler = new LogExceptionHandler();
7879
}
@@ -83,7 +84,7 @@ public WxMpMessageRouter(WxMpService wxMpService) {
8384
public WxMpMessageRouter(WxMpService wxMpService, ExecutorService executorService) {
8485
this.wxMpService = wxMpService;
8586
this.executorService = executorService;
86-
this.messageDuplicateChecker = new WxMessageInMemoryDuplicateChecker();
87+
this.messageDuplicateChecker = WxMessageInMemoryDuplicateCheckerSingleton.getInstance();
8788
this.sessionManager = new StandardSessionManager();
8889
this.exceptionHandler = new LogExceptionHandler();
8990
}

0 commit comments

Comments
 (0)