diff --git a/docs/changelog/90482.yaml b/docs/changelog/90482.yaml new file mode 100644 index 0000000000000..61f3e42609ea8 --- /dev/null +++ b/docs/changelog/90482.yaml @@ -0,0 +1,6 @@ +pr: 90482 +summary: Transport threads and `_hot_threads` +area: Infra/Core +type: enhancement +issues: + - 90334 diff --git a/server/src/main/java/org/elasticsearch/monitor/jvm/HotThreads.java b/server/src/main/java/org/elasticsearch/monitor/jvm/HotThreads.java index 0a1dbacf7e2f6..e8cec009056f6 100644 --- a/server/src/main/java/org/elasticsearch/monitor/jvm/HotThreads.java +++ b/server/src/main/java/org/elasticsearch/monitor/jvm/HotThreads.java @@ -15,6 +15,7 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.Maps; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.transport.Transports; import java.lang.management.ManagementFactory; import java.lang.management.ThreadInfo; @@ -313,12 +314,15 @@ String innerDetect(ThreadMXBean threadBean, SunThreadInfo sunThreadInfo, long cu case CPU -> { double percentCpu = getTimeSharePercentage(topThread.getCpuTime()); double percentOther = getTimeSharePercentage(topThread.getOtherTime()); + double percentTotal = (Transports.isTransportThread(threadName)) ? percentCpu : percentOther + percentCpu; + String otherLabel = (Transports.isTransportThread(threadName)) ? "idle" : "other"; sb.append( String.format( Locale.ROOT, - "%n%4.1f%% [cpu=%1.1f%%, other=%1.1f%%] (%s out of %s) %s usage by thread '%s'%n", - percentOther + percentCpu, + "%n%4.1f%% [cpu=%1.1f%%, %s=%1.1f%%] (%s out of %s) %s usage by thread '%s'%n", + percentTotal, percentCpu, + otherLabel, percentOther, TimeValue.timeValueNanos(topThread.getCpuTime() + topThread.getOtherTime()), interval, @@ -412,8 +416,8 @@ static int similarity(ThreadInfo threadInfo, ThreadInfo threadInfo0) { static class ThreadTimeAccumulator { private final long threadId; + private final String threadName; private final TimeValue interval; - private long cpuTime; private long blockedTime; private long waitedTime; @@ -425,6 +429,7 @@ static class ThreadTimeAccumulator { this.cpuTime = cpuTime; this.allocatedBytes = allocatedBytes; this.threadId = info.getThreadId(); + this.threadName = info.getThreadName(); this.interval = interval; } @@ -457,6 +462,8 @@ public long getRunnableTime() { // not running, or it has been asleep forever. if (getCpuTime() == 0) { return 0; + } else if (Transports.isTransportThread(threadName)) { + return getCpuTime(); } return Math.max(interval.nanos() - getWaitedTime() - getBlockedTime(), 0); } @@ -468,7 +475,7 @@ public long getOtherTime() { return 0; } - return Math.max(getRunnableTime() - getCpuTime(), 0); + return Math.max(interval.nanos() - getWaitedTime() - getBlockedTime() - getCpuTime(), 0); } public long getBlockedTime() { diff --git a/server/src/main/java/org/elasticsearch/transport/Transports.java b/server/src/main/java/org/elasticsearch/transport/Transports.java index 992c543ae1bd3..897a88337c8bc 100644 --- a/server/src/main/java/org/elasticsearch/transport/Transports.java +++ b/server/src/main/java/org/elasticsearch/transport/Transports.java @@ -36,7 +36,16 @@ public enum Transports { * networking threads. */ public static boolean isTransportThread(Thread t) { - final String threadName = t.getName(); + return isTransportThread(t.getName()); + } + + /** + * Utility method to detect whether a thread is a network thread. Typically + * used in assertions to make sure that we do not call blocking code from + * networking threads. + * @param threadName the name of the thread + */ + public static boolean isTransportThread(String threadName) { for (String s : TRANSPORT_THREAD_NAMES) { if (threadName.contains(s)) { return true; diff --git a/server/src/test/java/org/elasticsearch/monitor/jvm/HotThreadsTests.java b/server/src/test/java/org/elasticsearch/monitor/jvm/HotThreadsTests.java index 733c08a87bcf9..b1fba5d18966f 100644 --- a/server/src/test/java/org/elasticsearch/monitor/jvm/HotThreadsTests.java +++ b/server/src/test/java/org/elasticsearch/monitor/jvm/HotThreadsTests.java @@ -21,6 +21,7 @@ import java.util.Locale; import java.util.Map; +import static org.elasticsearch.transport.Transports.TEST_MOCK_TRANSPORT_THREAD_PREFIX; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.stringContainsInOrder; @@ -210,15 +211,15 @@ class SimilarityTestCase { assertEquals(0, HotThreads.similarity(threadOne, null)); } - private ThreadInfo makeThreadInfoMocksHelper(ThreadMXBean mockedMXBean, long threadId) { - return makeThreadInfoMocksHelper(mockedMXBean, threadId, 1L); + private ThreadInfo makeThreadInfoMocksHelper(ThreadMXBean mockedMXBean, String threadPrefix, long threadId) { + return makeThreadInfoMocksHelper(mockedMXBean, threadPrefix, threadId, 1L); } - private ThreadInfo makeThreadInfoMocksHelper(ThreadMXBean mockedMXBean, long threadId, long cpuMultiplier) { + private ThreadInfo makeThreadInfoMocksHelper(ThreadMXBean mockedMXBean, String threadPrefix, long threadId, long cpuMultiplier) { when(mockedMXBean.getThreadCpuTime(threadId)).thenReturn(0L).thenReturn(threadId * cpuMultiplier); ThreadInfo mockedThreadInfo = mock(ThreadInfo.class); when(mockedMXBean.getThreadInfo(eq(threadId), anyInt())).thenReturn(mockedThreadInfo); - when(mockedThreadInfo.getThreadName()).thenReturn(String.format(Locale.ROOT, "Thread %d", threadId)); + when(mockedThreadInfo.getThreadName()).thenReturn(String.format(Locale.ROOT, "%s %d", threadPrefix, threadId)); // We create some variability for the blocked and waited times. Odd and even. when(mockedThreadInfo.getBlockedCount()).thenReturn(0L).thenReturn(threadId % 2); @@ -242,16 +243,21 @@ private ThreadInfo makeThreadInfoMocksHelper(ThreadMXBean mockedMXBean, long thr return mockedThreadInfo; } - private List makeThreadInfoMocksHelper(ThreadMXBean mockedMXBean, long[] threadIds) { - return makeThreadInfoMocksHelper(mockedMXBean, threadIds, 1L); + private List makeThreadInfoMocksHelper(ThreadMXBean mockedMXBean, String threadPrefix, long[] threadIds) { + return makeThreadInfoMocksHelper(mockedMXBean, threadPrefix, threadIds, 1L); } // We call this helper for each different mode to reset the before and after timings. - private List makeThreadInfoMocksHelper(ThreadMXBean mockedMXBean, long[] threadIds, long cpuMultiplier) { + private List makeThreadInfoMocksHelper( + ThreadMXBean mockedMXBean, + String threadPrefix, + long[] threadIds, + long cpuMultiplier + ) { List allInfos = new ArrayList<>(threadIds.length); for (long threadId : threadIds) { - allInfos.add(makeThreadInfoMocksHelper(mockedMXBean, threadId, cpuMultiplier)); + allInfos.add(makeThreadInfoMocksHelper(mockedMXBean, threadPrefix, threadId, cpuMultiplier)); } when(mockedMXBean.getThreadInfo(ArgumentMatchers.any(), anyInt())).thenReturn(allInfos.toArray(new ThreadInfo[0])); @@ -285,7 +291,7 @@ public void testInnerDetectCPUMode() throws Exception { long mockCurrentThreadId = 0L; when(mockedMXBean.getAllThreadIds()).thenReturn(threadIds); - List allInfos = makeThreadInfoMocksHelper(mockedMXBean, threadIds, 100_000); + List allInfos = makeThreadInfoMocksHelper(mockedMXBean, "Thread", threadIds, 100_000); List cpuOrderedInfos = List.of(allInfos.get(0), allInfos.get(1), allInfos.get(2), allInfos.get(3)); when(mockedMXBean.getThreadInfo(ArgumentMatchers.any(), anyInt())).thenReturn(cpuOrderedInfos.toArray(new ThreadInfo[0])); @@ -322,7 +328,7 @@ public void testInnerDetectCPUMode() throws Exception { assertThat(innerResult, containsString("0.0% [cpu=0.0%, other=0.0%] (0s out of 10ms) cpu usage by thread 'Thread 1'")); // Test with the legacy sort order - allInfos = makeThreadInfoMocksHelper(mockedMXBean, threadIds, 100_000); + allInfos = makeThreadInfoMocksHelper(mockedMXBean, "Thread", threadIds, 100_000); cpuOrderedInfos = List.of(allInfos.get(3), allInfos.get(2), allInfos.get(1), allInfos.get(0)); when(mockedMXBean.getThreadInfo(ArgumentMatchers.any(), anyInt())).thenReturn(cpuOrderedInfos.toArray(new ThreadInfo[0])); @@ -366,7 +372,7 @@ public void testInnerDetectWaitMode() throws Exception { .threadElementsSnapshotCount(11) .ignoreIdleThreads(false); - List allInfos = makeThreadInfoMocksHelper(mockedMXBean, threadIds); + List allInfos = makeThreadInfoMocksHelper(mockedMXBean, "Thread", threadIds); List waitOrderedInfos = List.of(allInfos.get(3), allInfos.get(1), allInfos.get(0), allInfos.get(2)); when(mockedMXBean.getThreadInfo(ArgumentMatchers.any(), anyInt())).thenReturn(waitOrderedInfos.toArray(new ThreadInfo[0])); @@ -390,7 +396,7 @@ public void testInnerDetectWaitMode() throws Exception { .threadElementsSnapshotCount(11) .ignoreIdleThreads(false); - allInfos = makeThreadInfoMocksHelper(mockedMXBean, threadIds); + allInfos = makeThreadInfoMocksHelper(mockedMXBean, "Thread", threadIds); waitOrderedInfos = List.of(allInfos.get(3), allInfos.get(1), allInfos.get(0), allInfos.get(2)); when(mockedMXBean.getThreadInfo(ArgumentMatchers.any(), anyInt())).thenReturn(waitOrderedInfos.toArray(new ThreadInfo[0])); @@ -420,7 +426,7 @@ public void testInnerDetectBlockedMode() throws Exception { .threadElementsSnapshotCount(11) .ignoreIdleThreads(false); - List allInfos = makeThreadInfoMocksHelper(mockedMXBean, threadIds); + List allInfos = makeThreadInfoMocksHelper(mockedMXBean, "Thread", threadIds); List blockOrderedInfos = List.of(allInfos.get(2), allInfos.get(0), allInfos.get(1), allInfos.get(3)); when(mockedMXBean.getThreadInfo(ArgumentMatchers.any(), anyInt())).thenReturn(blockOrderedInfos.toArray(new ThreadInfo[0])); @@ -444,7 +450,7 @@ public void testInnerDetectBlockedMode() throws Exception { .threadElementsSnapshotCount(11) .ignoreIdleThreads(false); - allInfos = makeThreadInfoMocksHelper(mockedMXBean, threadIds); + allInfos = makeThreadInfoMocksHelper(mockedMXBean, "Thread", threadIds); blockOrderedInfos = List.of(allInfos.get(2), allInfos.get(0), allInfos.get(1), allInfos.get(3)); when(mockedMXBean.getThreadInfo(ArgumentMatchers.any(), anyInt())).thenReturn(blockOrderedInfos.toArray(new ThreadInfo[0])); @@ -469,7 +475,7 @@ public void testInnerDetectMemoryMode() throws Exception { long mockCurrentThreadId = 0L; when(mockedMXBean.getAllThreadIds()).thenReturn(threadIds); - List allInfos = makeThreadInfoMocksHelper(mockedMXBean, threadIds); + List allInfos = makeThreadInfoMocksHelper(mockedMXBean, "Thread", threadIds); List cpuOrderedInfos = List.of(allInfos.get(3), allInfos.get(2), allInfos.get(1), allInfos.get(0)); when(mockedMXBean.getThreadInfo(ArgumentMatchers.any(), anyInt())).thenReturn(cpuOrderedInfos.toArray(new ThreadInfo[0])); @@ -497,7 +503,7 @@ public void testInnerDetectMemoryMode() throws Exception { // Sort order has no impact on memory mode - allInfos = makeThreadInfoMocksHelper(mockedMXBean, threadIds); + allInfos = makeThreadInfoMocksHelper(mockedMXBean, "Thread", threadIds); cpuOrderedInfos = List.of(allInfos.get(3), allInfos.get(2), allInfos.get(1), allInfos.get(0)); when(mockedMXBean.getThreadInfo(ArgumentMatchers.any(), anyInt())).thenReturn(cpuOrderedInfos.toArray(new ThreadInfo[0])); @@ -534,7 +540,7 @@ public void testInnerDetectSingleSnapshot() throws Exception { when(mockedMXBean.getAllThreadIds()).thenReturn(threadIds); // Test with only one stack to trigger the different print in innerDetect - List allInfos = makeThreadInfoMocksHelper(mockedMXBean, threadIds); + List allInfos = makeThreadInfoMocksHelper(mockedMXBean, "Thread", threadIds); List cpuOrderedInfos = List.of(allInfos.get(3), allInfos.get(2), allInfos.get(1), allInfos.get(0)); when(mockedMXBean.getThreadInfo(ArgumentMatchers.any(), anyInt())).thenReturn(cpuOrderedInfos.toArray(new ThreadInfo[0])); @@ -565,7 +571,7 @@ public void testEnsureInnerDetectSkipsCurrentThread() throws Exception { when(mockedMXBean.getAllThreadIds()).thenReturn(threadIds); - List allInfos = makeThreadInfoMocksHelper(mockedMXBean, threadIds); + List allInfos = makeThreadInfoMocksHelper(mockedMXBean, "Thread", threadIds); when(mockedMXBean.getThreadInfo(ArgumentMatchers.any(), anyInt())).thenReturn(allInfos.toArray(new ThreadInfo[0])); HotThreads hotThreads = new HotThreads().busiestThreads(4) @@ -620,7 +626,7 @@ public void testGetAllValidThreadInfos() { .ignoreIdleThreads(false); // Test the case when all threads exist before and after sleep - List allInfos = makeThreadInfoMocksHelper(mockedMXBean, threadIds); + List allInfos = makeThreadInfoMocksHelper(mockedMXBean, "Thread", threadIds); Map validInfos = hotThreads.getAllValidThreadInfos( mockedMXBean, @@ -656,7 +662,7 @@ public void testGetAllValidThreadInfos() { } // Test when a thread has terminated during sleep, we don't report that thread - allInfos = makeThreadInfoMocksHelper(mockedMXBean, threadIds); + allInfos = makeThreadInfoMocksHelper(mockedMXBean, "Thread", threadIds); validInfos = hotThreads.getAllValidThreadInfos(mockedMXBean, mockedSunThreadInfo, mockCurrentThreadId); assertEquals(allInfos.size(), validInfos.size()); @@ -730,7 +736,7 @@ public void testCaptureThreadStacks() throws InterruptedException { .ignoreIdleThreads(false); // Set up the mocks - List allInfos = makeThreadInfoMocksHelper(mockedMXBean, threadIds); + List allInfos = makeThreadInfoMocksHelper(mockedMXBean, "Thread", threadIds); long[] topThreadIds = new long[] { threadIds[threadIds.length - 1], threadIds[threadIds.length - 2] }; List topThreads = List.of(allInfos.get(threadIds.length - 1), allInfos.get(threadIds.length - 2)); @@ -751,8 +757,8 @@ public void testThreadInfoAccumulator() { ThreadMXBean mockedMXBean = mock(ThreadMXBean.class); when(mockedMXBean.isThreadCpuTimeSupported()).thenReturn(true); - ThreadInfo threadOne = makeThreadInfoMocksHelper(mockedMXBean, 1L); - ThreadInfo threadTwo = makeThreadInfoMocksHelper(mockedMXBean, 2L); + ThreadInfo threadOne = makeThreadInfoMocksHelper(mockedMXBean, "Thread", 1L); + ThreadInfo threadTwo = makeThreadInfoMocksHelper(mockedMXBean, "Thread", 2L); TimeValue maxTime = new TimeValue(1000L); @@ -792,7 +798,7 @@ public void testWaitBlockTimeMonitoringEnabled() throws Exception { long mockCurrentThreadId = 0L; when(mockedMXBean.getAllThreadIds()).thenReturn(threadIds); - List allInfos = makeThreadInfoMocksHelper(mockedMXBean, threadIds); + List allInfos = makeThreadInfoMocksHelper(mockedMXBean, "Thread", threadIds); List cpuOrderedInfos = List.of(allInfos.get(3), allInfos.get(2), allInfos.get(1), allInfos.get(0)); when(mockedMXBean.getThreadInfo(ArgumentMatchers.any(), anyInt())).thenReturn(cpuOrderedInfos.toArray(new ThreadInfo[0])); @@ -827,7 +833,7 @@ public void testGetThreadAllocatedBytesFailures() throws Exception { long mockCurrentThreadId = 0L; when(mockedMXBean.getAllThreadIds()).thenReturn(threadIds); - List allInfos = makeThreadInfoMocksHelper(mockedMXBean, threadIds); + List allInfos = makeThreadInfoMocksHelper(mockedMXBean, "Thread", threadIds); List cpuOrderedInfos = List.of(allInfos.get(3), allInfos.get(2), allInfos.get(1), allInfos.get(0)); when(mockedMXBean.getThreadInfo(ArgumentMatchers.any(), anyInt())).thenReturn(cpuOrderedInfos.toArray(new ThreadInfo[0])); @@ -843,4 +849,91 @@ public void testGetThreadAllocatedBytesFailures() throws Exception { ); assertThat(exception.getMessage(), equalTo("thread allocated memory is not supported on this JDK")); } + + public void testInnerDetectCPUModeTransportThreads() throws Exception { + ThreadMXBean mockedMXBean = makeMockMXBeanHelper(); + SunThreadInfo mockedSunThreadInfo = makeMockSunThreadInfoHelper(); + + long[] threadIds = new long[] { 1, 2, 3, 4 }; // Adds up to 10, the intervalNanos for calculating time percentages + long mockCurrentThreadId = 0L; + when(mockedMXBean.getAllThreadIds()).thenReturn(threadIds); + + List allInfos = makeThreadInfoMocksHelper(mockedMXBean, TEST_MOCK_TRANSPORT_THREAD_PREFIX, threadIds, 100_000); + List cpuOrderedInfos = List.of(allInfos.get(0), allInfos.get(1), allInfos.get(2), allInfos.get(3)); + when(mockedMXBean.getThreadInfo(ArgumentMatchers.any(), anyInt())).thenReturn(cpuOrderedInfos.toArray(new ThreadInfo[0])); + + HotThreads hotThreads = new HotThreads().busiestThreads(4) + .type(HotThreads.ReportType.CPU) + .interval(TimeValue.timeValueMillis(10)) + .threadElementsSnapshotCount(11) + .ignoreIdleThreads(false); + + String innerResult = hotThreads.innerDetect(mockedMXBean, mockedSunThreadInfo, mockCurrentThreadId, (interval) -> null); + + assertThat(innerResult, containsString("Hot threads at ")); + assertThat(innerResult, containsString("interval=10ms, busiestThreads=4, ignoreIdleThreads=false:")); + assertThat(innerResult, containsString("11/11 snapshots sharing following 2 elements")); + assertThat( + innerResult, + stringContainsInOrder( + "4.0% [cpu=4.0%, idle=56.0%] (6ms out of 10ms) cpu usage by thread '__mock_network_thread 1'", + "3.0% [cpu=3.0%, idle=67.0%] (7ms out of 10ms) cpu usage by thread '__mock_network_thread 2'", + "2.0% [cpu=2.0%, idle=78.0%] (8ms out of 10ms) cpu usage by thread '__mock_network_thread 3'", + "1.0% [cpu=1.0%, idle=89.0%] (9ms out of 10ms) cpu usage by thread '__mock_network_thread 4'" + ) + ); + assertThat(innerResult, containsString("org.elasticsearch.monitor.test.method_0(Some_File:1)")); + assertThat(innerResult, containsString("org.elasticsearch.monitor.test.method_1(Some_File:1)")); + assertThat(innerResult, containsString("org.elasticsearch.monitor.testOther.methodFinal(Some_File:1)")); + + // Let's ask again without progressing the CPU thread counters, e.g. resetting the mocks + innerResult = hotThreads.innerDetect(mockedMXBean, mockedSunThreadInfo, mockCurrentThreadId, (interval) -> null); + + assertThat( + innerResult, + containsString("0.0% [cpu=0.0%, idle=0.0%] (0s out of 10ms) cpu usage by thread '__mock_network_thread 1'") + ); + assertThat( + innerResult, + containsString("0.0% [cpu=0.0%, idle=0.0%] (0s out of 10ms) cpu usage by thread '__mock_network_thread 2'") + ); + assertThat( + innerResult, + containsString("0.0% [cpu=0.0%, idle=0.0%] (0s out of 10ms) cpu usage by thread '__mock_network_thread 3'") + ); + assertThat( + innerResult, + containsString("0.0% [cpu=0.0%, idle=0.0%] (0s out of 10ms) cpu usage by thread '__mock_network_thread 4'") + ); + + // Test with the legacy sort order + allInfos = makeThreadInfoMocksHelper(mockedMXBean, TEST_MOCK_TRANSPORT_THREAD_PREFIX, threadIds, 100_000); + cpuOrderedInfos = List.of(allInfos.get(3), allInfos.get(2), allInfos.get(1), allInfos.get(0)); + when(mockedMXBean.getThreadInfo(ArgumentMatchers.any(), anyInt())).thenReturn(cpuOrderedInfos.toArray(new ThreadInfo[0])); + + hotThreads = new HotThreads().busiestThreads(4) + .type(HotThreads.ReportType.CPU) + .interval(TimeValue.timeValueMillis(10)) + .sortOrder(HotThreads.SortOrder.CPU) + .threadElementsSnapshotCount(11) + .ignoreIdleThreads(false); + + innerResult = hotThreads.innerDetect(mockedMXBean, mockedSunThreadInfo, mockCurrentThreadId, (interval) -> null); + + assertThat(innerResult, containsString("Hot threads at ")); + assertThat(innerResult, containsString("interval=10ms, busiestThreads=4, ignoreIdleThreads=false:")); + assertThat(innerResult, containsString("11/11 snapshots sharing following 2 elements")); + assertThat( + innerResult, + stringContainsInOrder( + "4.0% [cpu=4.0%, idle=56.0%] (6ms out of 10ms) cpu usage by thread '__mock_network_thread 4'", + "3.0% [cpu=3.0%, idle=67.0%] (7ms out of 10ms) cpu usage by thread '__mock_network_thread 3'", + "2.0% [cpu=2.0%, idle=78.0%] (8ms out of 10ms) cpu usage by thread '__mock_network_thread 2'", + "1.0% [cpu=1.0%, idle=89.0%] (9ms out of 10ms) cpu usage by thread '__mock_network_thread 1'" + ) + ); + assertThat(innerResult, containsString("org.elasticsearch.monitor.test.method_0(Some_File:1)")); + assertThat(innerResult, containsString("org.elasticsearch.monitor.test.method_1(Some_File:1)")); + assertThat(innerResult, containsString("org.elasticsearch.monitor.testOther.methodFinal(Some_File:1)")); + } }