Skip to content

Commit 13a46ab

Browse files
committed
Provide method for stopping Batch 5 Jobs upon user request
In the previous release of SCDF we used the JsrJobOperator to stop job executions. The 2 stages of stopping jobs is as follows: 1) Sets the Batch Status of the job execution to STOPPING. This signals to Spring Batch to stop execution at the next step. 2) If the Job is a StepLocator it will go through each of the StoppableTasklets and stop them. So when using Batch 4.x it was just a quick check of the JobRegistry to retrieve the job, which was always empty since SCDF never deals with Jobs directly. But with Batch 5.x they loaded the Job Registry and attempted to retrieve the Job in a different way using the SimpleJobOperator. In the updated solution, SCDF doesn't use the SimpleJobOperator since SCDF doesn't have access to the StepLocator for the Job, nor does it use the JobRegistry and even if it did, it would always be empty. w * Modify stopAll so that it calls stop() instead of using its own logic * Add Test for stopAll Updated based on code review Extract job stop code from stop(long) and place into stopJobExecution method. The stopJobExecution method will be used by stop(long) and stopAll. Updated Based on Code Review Moved Job Stop Code from assert methods to the tests. Updated the JobExecution Create methods so that they verify that the job is not Stopping
1 parent df26691 commit 13a46ab

File tree

2 files changed

+36
-15
lines changed

2 files changed

+36
-15
lines changed

spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/batch/SimpleJobService.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -225,9 +225,7 @@ public int stopAll() {
225225
Collection<JobExecution> result = jobExecutionDao.getRunningJobExecutions();
226226
for (JobExecution jobExecution : result) {
227227
try {
228-
jobExecution.getStepExecutions().forEach(StepExecution::setTerminateOnly);
229-
jobExecution.setStatus( BatchStatus.STOPPING);
230-
jobRepository.update(jobExecution);
228+
stopJobExecution(jobExecution);
231229
} catch (Exception e) {
232230
throw new IllegalArgumentException("The following JobExecutionId was not found: " + jobExecution.getId(), e);
233231
}
@@ -238,14 +236,19 @@ public int stopAll() {
238236

239237
@Override
240238
public JobExecution stop(Long jobExecutionId) throws NoSuchJobExecutionException, JobExecutionNotRunningException {
241-
JobExecution jobExecution = getJobExecution(jobExecutionId);
239+
return stopJobExecution(getJobExecution(jobExecutionId));
240+
}
241+
242+
private JobExecution stopJobExecution(JobExecution jobExecution) throws JobExecutionNotRunningException{
242243
if (!jobExecution.isRunning()) {
243244
throw new JobExecutionNotRunningException("JobExecution is not running and therefore cannot be stopped");
244245
}
245-
246-
logger.info("Stopping job execution: " + jobExecution);
247-
248-
jobExecution.setStatus(BatchStatus.STOPPED);
246+
// Indicate the execution should be stopped by setting it's status to
247+
// 'STOPPING'. It is assumed that
248+
// the step implementation will check this status at chunk boundaries.
249+
logger.info("Stopping job execution: {}", jobExecution);
250+
jobExecution.getStepExecutions().forEach(StepExecution::setTerminateOnly);
251+
jobExecution.setStatus(BatchStatus.STOPPING);
249252
jobRepository.update(jobExecution);
250253
return jobExecution;
251254

spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/batch/AbstractSimpleJobServiceTests.java

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
import javax.sql.DataSource;
2929

3030
import org.junit.jupiter.api.Test;
31+
import org.springframework.batch.core.launch.JobExecutionNotRunningException;
32+
import org.springframework.batch.core.launch.NoSuchJobExecutionException;
3133
import org.testcontainers.containers.JdbcDatabaseContainer;
3234

3335
import org.springframework.batch.core.BatchStatus;
@@ -191,13 +193,25 @@ void exceptionsShouldBeThrownIfRequestForNonExistingJobInstance() {
191193

192194
@Test
193195
void stoppingJobExecutionShouldLeaveJobExecutionWithStatusOfStopping() throws Exception {
194-
JobExecution jobExecution = createJobExecution(BASE_JOB_INST_NAME,true);
195-
jobExecution = jobService.getJobExecution(jobExecution.getId());
196-
assertThat(jobExecution.isRunning()).isTrue();
197-
assertThat(jobExecution.getStatus()).isNotEqualTo(BatchStatus.STOPPING);
196+
JobExecution jobExecution = createRunningJobExecution(BASE_JOB_INST_NAME);
198197
jobService.stop(jobExecution.getId());
198+
assertJobHasStopped(jobExecution);
199+
}
200+
201+
@Test
202+
void stoppingAllJobExecutionsShouldLeaveJobExecutionsWithStatusOfStopping() throws Exception {
203+
JobExecution jobExecutionOne = createRunningJobExecution(BASE_JOB_INST_NAME);
204+
JobExecution jobExecutionTwo = createRunningJobExecution(BASE_JOB_INST_NAME+"_TWO");
205+
jobService.stop(jobExecutionOne.getId());
206+
assertJobHasStopped(jobExecutionOne);
207+
jobService.stop(jobExecutionTwo.getId());
208+
assertJobHasStopped(jobExecutionTwo);
209+
}
210+
211+
private void assertJobHasStopped(JobExecution jobExecution) throws NoSuchJobExecutionException, JobExecutionNotRunningException {
199212
jobExecution = jobService.getJobExecution(jobExecution.getId());
200-
assertThat(jobExecution.getStatus()).isEqualTo(BatchStatus.STOPPED);
213+
assertThat(jobExecution.getStatus()).isEqualTo(BatchStatus.STOPPING);
214+
assertThat(jobExecution.isRunning()).isTrue();
201215
}
202216

203217
private void verifyJobInstance(long id, String name) throws Exception {
@@ -221,9 +235,13 @@ private JobExecution createJobExecution(String name) throws Exception {
221235
return createJobExecution(name, BatchStatus.STARTING, false);
222236
}
223237

224-
private JobExecution createJobExecution(String name, boolean isRunning)
238+
private JobExecution createRunningJobExecution(String name)
225239
throws Exception {
226-
return createJobExecution(name, BatchStatus.STARTING, isRunning);
240+
JobExecution jobExecution = createJobExecution(name, BatchStatus.STARTING, true);
241+
jobExecution = jobService.getJobExecution(jobExecution.getId());
242+
assertThat(jobExecution.isRunning()).isTrue();
243+
assertThat(jobExecution.getStatus()).isNotEqualTo(BatchStatus.STOPPING);
244+
return jobExecution;
227245
}
228246

229247
private JobExecution createJobExecution(String name, BatchStatus batchStatus, boolean isRunning) throws Exception {

0 commit comments

Comments
 (0)