Skip to content

YARN-10058. Handle uncaught exception for async-scheduling threads to prevent scheduler hangs #7129

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.yarn.server.resourcemanager.RMCriticalThreadUncaughtExceptionHandler;
import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
import org.apache.hadoop.yarn.server.resourcemanager.placement.CSMappingPlacementRule;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementFactory;
Expand Down Expand Up @@ -3543,7 +3544,10 @@ static class AsyncSchedulingConfiguration {

this.asyncSchedulerThreads = new ArrayList<>();
for (int i = 0; i < maxAsyncSchedulingThreads; i++) {
asyncSchedulerThreads.add(new AsyncScheduleThread(cs));
AsyncScheduleThread ast = new AsyncScheduleThread(cs);
ast.setUncaughtExceptionHandler(
new RMCriticalThreadUncaughtExceptionHandler(cs.rmContext));
asyncSchedulerThreads.add(ast);
}
this.resourceCommitterService = new ResourceCommitterService(cs);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerAsyncScheduling;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.junit.Assert;
Expand Down Expand Up @@ -135,6 +136,52 @@ public void testAsyncScheduleThreadStateAfterRMHATransit() throws Exception {
rm2.stop();
}

@Test(timeout = 30000)
public void testAsyncScheduleThreadExit() throws Exception {
// start two RMs, and transit rm1 to active, rm2 to standby
startRMs();
// register NM
rm1.registerNode("192.1.1.1:1234", 8192, 8);
rm1.drainEvents();

// test async-scheduling thread exit
try{
// set resource calculator to be null to simulate
// NPE in async-scheduling thread
CapacityScheduler cs =
(CapacityScheduler) rm1.getRMContext().getScheduler();
cs.setResourceCalculator(null);
checkAsyncSchedulerThreads(Thread.currentThread());

// wait for RM to be shutdown until timeout
boolean done = TestUtils.waitForUntilTimeout(
() -> rm1.getRMContext().getHAServiceState()
== HAServiceProtocol.HAServiceState.STANDBY, 100, 5000);
Assert.assertTrue(
"RM1 should be transitioned to standby, but got state: "
+ rm1.getRMContext().getHAServiceState(), done);

// failover RM2 to RM1
HAServiceProtocol.StateChangeRequestInfo requestInfo =
new HAServiceProtocol.StateChangeRequestInfo(
HAServiceProtocol.RequestSource.REQUEST_BY_USER);
rm2.adminService.transitionToStandby(requestInfo);
rm1.adminService.transitionToActive(requestInfo);
done = TestUtils.waitForUntilTimeout(
() -> rm1.getRMContext().getHAServiceState()
== HAServiceProtocol.HAServiceState.ACTIVE, 100, 5000);
Assert.assertTrue(
"RM1 should be transitioned to active, but got state: "
+ rm1.getRMContext().getHAServiceState(), done);

// make sure async-scheduling thread is correct after failover
checkAsyncSchedulerThreads(Thread.currentThread());
} finally {
rm1.stop();
rm2.stop();
}
}

private RMApp submitAppAndCheckLaunched(MockRM rm) throws Exception {
MockRMAppSubmissionData data =
MockRMAppSubmissionData.Builder.createWithMemory(200, rm)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.contrib.java.lang.system.internal.NoExitSecurityManager;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
Expand Down Expand Up @@ -1072,6 +1073,38 @@ public Boolean answer(InvocationOnMock invocation) throws Exception {
rm.stop();
}

@Test(timeout = 30000)
public void testAsyncScheduleThreadExit() throws Exception {
// init RM & NM
final MockRM rm = new MockRM(conf);
rm.start();
rm.registerNode("192.168.0.1:1234", 8 * GB);
rm.drainEvents();

// Set no exit security manager to catch System.exit
SecurityManager originalSecurityManager = System.getSecurityManager();
NoExitSecurityManager noExitSecurityManager =
new NoExitSecurityManager(originalSecurityManager);
System.setSecurityManager(noExitSecurityManager);

// test async-scheduling thread exit
try{
// set resource calculator to be null to simulate
// NPE in async-scheduling thread
CapacityScheduler cs =
(CapacityScheduler) rm.getRMContext().getScheduler();
cs.setResourceCalculator(null);

// wait for RM to be shutdown until timeout
boolean done = TestUtils.waitForUntilTimeout(
noExitSecurityManager::isCheckExitCalled, 100, 5000);
Assert.assertTrue("RM should be shut down, but nothing happened", done);
} finally {
System.setSecurityManager(originalSecurityManager);
rm.stop();
}
}

private ResourceCommitRequest createAllocateFromReservedProposal(
int containerId, Resource allocateResource, FiCaSchedulerApp schedulerApp,
SchedulerNode allocateNode, SchedulerNode reservedNode,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
Expand Down Expand Up @@ -486,4 +487,26 @@ public FiCaSchedulerApp getApplicationAttempt(
cs.submitResourceCommitRequest(clusterResource,
csAssignment);
}

/**
* Wait until the condition is met or timeout.
* @param condition condition to check
* @param intervalMs interval to check the condition
* @param timeoutMs timeout
* @return true if the condition is met before timeout, false otherwise
* @throws InterruptedException
*/
public static boolean waitForUntilTimeout(Supplier<Boolean> condition,
long intervalMs, long timeoutMs) throws InterruptedException {
long startTime = System.currentTimeMillis();
while (!condition.get()) {
long elapsedTime = System.currentTimeMillis() - startTime;
if (elapsedTime > timeoutMs) {
return false;
}
long remainingTime = timeoutMs - elapsedTime;
Thread.sleep(Math.min(intervalMs, remainingTime > 0 ? remainingTime : 0));
}
return true;
}
}
Loading