From edf8db8a1a843a9c4ffb75b7f307e2cb85463a0f Mon Sep 17 00:00:00 2001 From: Tao Yang Date: Fri, 25 Oct 2024 12:50:14 +0800 Subject: [PATCH 1/3] [YARN-10058] Handle uncaught exception for async-scheduling threads to prevent scheduler hangs. --- .../scheduler/capacity/CapacityScheduler.java | 6 ++- .../TestRMHAForAsyncScheduler.java | 47 +++++++++++++++++++ .../TestCapacitySchedulerAsyncScheduling.java | 33 +++++++++++++ .../scheduler/capacity/TestUtils.java | 23 +++++++++ 4 files changed, 108 insertions(+), 1 deletion(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 310f9ece34309..805a87e1bd048 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -36,6 +36,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.yarn.server.resourcemanager.RMCriticalThreadUncaughtExceptionHandler; import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext; import org.apache.hadoop.yarn.server.resourcemanager.placement.CSMappingPlacementRule; import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementFactory; @@ -3543,7 +3544,10 @@ static class AsyncSchedulingConfiguration { this.asyncSchedulerThreads = new ArrayList<>(); for (int i = 0; i < maxAsyncSchedulingThreads; i++) { - asyncSchedulerThreads.add(new AsyncScheduleThread(cs)); + AsyncScheduleThread ast = new AsyncScheduleThread(cs); + ast.setUncaughtExceptionHandler( + new RMCriticalThreadUncaughtExceptionHandler(cs.rmContext)); + asyncSchedulerThreads.add(ast); } this.resourceCommitterService = new ResourceCommitterService(cs); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHAForAsyncScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHAForAsyncScheduler.java index a609d8458e849..5a8f3243dfad3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHAForAsyncScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHAForAsyncScheduler.java @@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerAsyncScheduling; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils; import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.junit.Assert; @@ -135,6 +136,52 @@ public void testAsyncScheduleThreadStateAfterRMHATransit() throws Exception { rm2.stop(); } + @Test(timeout = 30000) + public void testAsyncScheduleThreadExit() throws Exception { + // start two RMs, and transit rm1 to active, rm2 to standby + startRMs(); + // register NM + rm1.registerNode("192.1.1.1:1234", 8192, 8); + rm1.drainEvents(); + + // test async-scheduling thread exit + try{ + // set resource calculator to be null to simulate + // NPE in async-scheduling thread + CapacityScheduler cs = + (CapacityScheduler) rm1.getRMContext().getScheduler(); + cs.setResourceCalculator(null); + checkAsyncSchedulerThreads(Thread.currentThread()); + + // wait for RM to be shutdown until timeout + boolean done = TestUtils.waitForUntilTimeout( + () -> rm1.getRMContext().getHAServiceState() + == HAServiceProtocol.HAServiceState.STANDBY, 100, 5000); + Assert.assertTrue( + "RM1 should be transitioned to standby, but got state: " + + rm1.getRMContext().getHAServiceState(), done); + + // failover RM2 to RM1 + HAServiceProtocol.StateChangeRequestInfo requestInfo = + new HAServiceProtocol.StateChangeRequestInfo( + HAServiceProtocol.RequestSource.REQUEST_BY_USER); + rm2.adminService.transitionToStandby(requestInfo); + rm1.adminService.transitionToActive(requestInfo); + done = TestUtils.waitForUntilTimeout( + () -> rm1.getRMContext().getHAServiceState() + == HAServiceProtocol.HAServiceState.ACTIVE, 100, 5000); + Assert.assertTrue( + "RM1 should be transitioned to active, but got state: " + + rm1.getRMContext().getHAServiceState(), done); + + // make sure async-scheduling thread is correct after failover + checkAsyncSchedulerThreads(Thread.currentThread()); + } finally { + rm1.stop(); + rm2.stop(); + } + } + private RMApp submitAppAndCheckLaunched(MockRM rm) throws Exception { MockRMAppSubmissionData data = MockRMAppSubmissionData.Builder.createWithMemory(200, rm) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java index b36e0edc73503..4dc76ce796ff4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java @@ -67,6 +67,7 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.junit.contrib.java.lang.system.internal.NoExitSecurityManager; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -1072,6 +1073,38 @@ public Boolean answer(InvocationOnMock invocation) throws Exception { rm.stop(); } + @Test(timeout = 30000) + public void testAsyncScheduleThreadExit() throws Exception { + // init RM & NM + final MockRM rm = new MockRM(conf); + rm.start(); + rm.registerNode("192.168.0.1:1234", 8 * GB); + rm.drainEvents(); + + // Set no exit security manager to catch System.exit + SecurityManager originalSecurityManager = System.getSecurityManager(); + NoExitSecurityManager noExitSecurityManager = + new NoExitSecurityManager(originalSecurityManager); + System.setSecurityManager(noExitSecurityManager); + + // test async-scheduling thread exit + try{ + // set resource calculator to be null to simulate + // NPE in async-scheduling thread + CapacityScheduler cs = + (CapacityScheduler) rm.getRMContext().getScheduler(); + cs.setResourceCalculator(null); + + // wait for RM to be shutdown until timeout + boolean done = TestUtils.waitForUntilTimeout( + noExitSecurityManager::isCheckExitCalled, 100, 5000); + Assert.assertTrue("RM should be shut down, but nothing happened", done); + } finally { + System.setSecurityManager(originalSecurityManager); + rm.stop(); + } + } + private ResourceCommitRequest createAllocateFromReservedProposal( int containerId, Resource allocateResource, FiCaSchedulerApp schedulerApp, SchedulerNode allocateNode, SchedulerNode reservedNode, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java index 06600e8710b36..94c03062b1e89 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java @@ -54,6 +54,7 @@ import java.io.IOException; import java.util.Map; import java.util.Set; +import java.util.function.Supplier; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doReturn; @@ -486,4 +487,26 @@ public FiCaSchedulerApp getApplicationAttempt( cs.submitResourceCommitRequest(clusterResource, csAssignment); } + + /** + * Wait until the condition is met or timeout. + * @param condition condition to check + * @param intervalMs interval to check the condition + * @param timeoutMs timeout + * @return true if the condition is met before timeout, false otherwise + * @throws InterruptedException + */ + public static boolean waitForUntilTimeout(Supplier condition, + long intervalMs, long timeoutMs) throws InterruptedException { + long startTime = System.currentTimeMillis(); + while (!condition.get()) { + long elapsedTime = System.currentTimeMillis() - startTime; + if (elapsedTime > timeoutMs) { + return false; + } + long remainingTime = timeoutMs - elapsedTime; + Thread.sleep(Math.min(intervalMs, remainingTime > 0 ? remainingTime : 0)); + } + return true; + } } From 8bcfd739519057a86e27b62ea2536fff90e1d6b4 Mon Sep 17 00:00:00 2001 From: Tao Yang Date: Tue, 5 Nov 2024 10:10:47 +0800 Subject: [PATCH 2/3] Imporve UT: use GenericTestUtils#waitFor and avoid intermittent failure. --- .../TestRMHAForAsyncScheduler.java | 38 +++++++++++-------- .../TestCapacitySchedulerAsyncScheduling.java | 6 +-- .../scheduler/capacity/TestUtils.java | 23 ----------- 3 files changed, 25 insertions(+), 42 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHAForAsyncScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHAForAsyncScheduler.java index 5a8f3243dfad3..7de060ae3fd19 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHAForAsyncScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHAForAsyncScheduler.java @@ -20,6 +20,7 @@ import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; @@ -144,6 +145,9 @@ public void testAsyncScheduleThreadExit() throws Exception { rm1.registerNode("192.1.1.1:1234", 8192, 8); rm1.drainEvents(); + // make sure async-scheduling thread is correct at beginning + checkAsyncSchedulerThreads(Thread.currentThread()); + // test async-scheduling thread exit try{ // set resource calculator to be null to simulate @@ -151,28 +155,30 @@ public void testAsyncScheduleThreadExit() throws Exception { CapacityScheduler cs = (CapacityScheduler) rm1.getRMContext().getScheduler(); cs.setResourceCalculator(null); - checkAsyncSchedulerThreads(Thread.currentThread()); - // wait for RM to be shutdown until timeout - boolean done = TestUtils.waitForUntilTimeout( - () -> rm1.getRMContext().getHAServiceState() - == HAServiceProtocol.HAServiceState.STANDBY, 100, 5000); - Assert.assertTrue( - "RM1 should be transitioned to standby, but got state: " - + rm1.getRMContext().getHAServiceState(), done); + // wait for rm1 to be transitioned to standby + GenericTestUtils.waitFor(() -> rm1.getRMContext().getHAServiceState() + == HAServiceProtocol.HAServiceState.STANDBY, 100, 5000); - // failover RM2 to RM1 + // failover rm2 to rm1 HAServiceProtocol.StateChangeRequestInfo requestInfo = new HAServiceProtocol.StateChangeRequestInfo( HAServiceProtocol.RequestSource.REQUEST_BY_USER); rm2.adminService.transitionToStandby(requestInfo); - rm1.adminService.transitionToActive(requestInfo); - done = TestUtils.waitForUntilTimeout( - () -> rm1.getRMContext().getHAServiceState() - == HAServiceProtocol.HAServiceState.ACTIVE, 100, 5000); - Assert.assertTrue( - "RM1 should be transitioned to active, but got state: " - + rm1.getRMContext().getHAServiceState(), done); + GenericTestUtils.waitFor(() -> { + try { + // this call may fail when rm1 is still initializing + // in StandByTransitionRunnable thread + rm1.adminService.transitionToActive(requestInfo); + return true; + } catch (Exception e) { + return false; + } + }, 100, 3000); + + // wait for rm1 to be transitioned to active again + GenericTestUtils.waitFor(() -> rm1.getRMContext().getHAServiceState() + == HAServiceProtocol.HAServiceState.ACTIVE, 100, 5000); // make sure async-scheduling thread is correct after failover checkAsyncSchedulerThreads(Thread.currentThread()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java index 4dc76ce796ff4..7baa6b2c7761f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.Container; @@ -1096,9 +1097,8 @@ public void testAsyncScheduleThreadExit() throws Exception { cs.setResourceCalculator(null); // wait for RM to be shutdown until timeout - boolean done = TestUtils.waitForUntilTimeout( - noExitSecurityManager::isCheckExitCalled, 100, 5000); - Assert.assertTrue("RM should be shut down, but nothing happened", done); + GenericTestUtils.waitFor(noExitSecurityManager::isCheckExitCalled, + 100, 5000); } finally { System.setSecurityManager(originalSecurityManager); rm.stop(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java index 94c03062b1e89..06600e8710b36 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java @@ -54,7 +54,6 @@ import java.io.IOException; import java.util.Map; import java.util.Set; -import java.util.function.Supplier; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doReturn; @@ -487,26 +486,4 @@ public FiCaSchedulerApp getApplicationAttempt( cs.submitResourceCommitRequest(clusterResource, csAssignment); } - - /** - * Wait until the condition is met or timeout. - * @param condition condition to check - * @param intervalMs interval to check the condition - * @param timeoutMs timeout - * @return true if the condition is met before timeout, false otherwise - * @throws InterruptedException - */ - public static boolean waitForUntilTimeout(Supplier condition, - long intervalMs, long timeoutMs) throws InterruptedException { - long startTime = System.currentTimeMillis(); - while (!condition.get()) { - long elapsedTime = System.currentTimeMillis() - startTime; - if (elapsedTime > timeoutMs) { - return false; - } - long remainingTime = timeoutMs - elapsedTime; - Thread.sleep(Math.min(intervalMs, remainingTime > 0 ? remainingTime : 0)); - } - return true; - } } From 806cddeb31efa9c2753982646a8619275e2addc2 Mon Sep 17 00:00:00 2001 From: Tao Yang Date: Mon, 9 Dec 2024 11:14:51 +0800 Subject: [PATCH 3/3] Fix checkstyle issue. --- .../yarn/server/resourcemanager/TestRMHAForAsyncScheduler.java | 1 - 1 file changed, 1 deletion(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHAForAsyncScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHAForAsyncScheduler.java index 7de060ae3fd19..1da9a61d1dcdf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHAForAsyncScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHAForAsyncScheduler.java @@ -30,7 +30,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerAsyncScheduling; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils; import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.junit.Assert;