diff --git a/spring-cloud-dataflow-core/src/main/java/org/springframework/cloud/dataflow/core/ThinTaskExecution.java b/spring-cloud-dataflow-core/src/main/java/org/springframework/cloud/dataflow/core/ThinTaskExecution.java new file mode 100644 index 0000000000..2bfbf11cd4 --- /dev/null +++ b/spring-cloud-dataflow-core/src/main/java/org/springframework/cloud/dataflow/core/ThinTaskExecution.java @@ -0,0 +1,54 @@ +/* + * Copyright 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.cloud.dataflow.core; + +import java.time.LocalDateTime; +import java.util.List; + +import org.springframework.cloud.task.repository.TaskExecution; + +/** + * Overrides TaskExecution class to provide CTR status required. + * @author Corneil du Plessis + */ +public class ThinTaskExecution extends TaskExecution { + private String ctrTaskStatus; + + public ThinTaskExecution() { + } + public ThinTaskExecution(TaskExecution taskExecution) { + super(taskExecution.getExecutionId(), taskExecution.getExitCode(), taskExecution.getTaskName(), taskExecution.getStartTime(), taskExecution.getEndTime(), taskExecution.getExitMessage(), taskExecution.getArguments(), taskExecution.getErrorMessage(), taskExecution.getExternalExecutionId(), taskExecution.getParentExecutionId()); + } + public ThinTaskExecution(TaskExecution taskExecution, String ctrTaskStatus) { + super(taskExecution.getExecutionId(), taskExecution.getExitCode(), taskExecution.getTaskName(), taskExecution.getStartTime(), taskExecution.getEndTime(), taskExecution.getExitMessage(), taskExecution.getArguments(), taskExecution.getErrorMessage(), taskExecution.getExternalExecutionId(), taskExecution.getParentExecutionId()); + this.ctrTaskStatus = ctrTaskStatus; + } + public ThinTaskExecution(long executionId, Integer exitCode, String taskName, LocalDateTime startTime, LocalDateTime endTime, String exitMessage, List arguments, String errorMessage, String externalExecutionId, Long parentExecutionId) { + super(executionId, exitCode, taskName, startTime, endTime, exitMessage, arguments, errorMessage, externalExecutionId, parentExecutionId); + } + + public ThinTaskExecution(long executionId, Integer exitCode, String taskName, LocalDateTime startTime, LocalDateTime endTime, String exitMessage, List arguments, String errorMessage, String externalExecutionId) { + super(executionId, exitCode, taskName, startTime, endTime, exitMessage, arguments, errorMessage, externalExecutionId); + } + + public String getCtrTaskStatus() { + return ctrTaskStatus; + } + + public void setCtrTaskStatus(String ctrTaskStatus) { + this.ctrTaskStatus = ctrTaskStatus; + } +} diff --git a/spring-cloud-dataflow-rest-resource/src/main/java/org/springframework/cloud/dataflow/rest/resource/TaskExecutionThinResource.java b/spring-cloud-dataflow-rest-resource/src/main/java/org/springframework/cloud/dataflow/rest/resource/TaskExecutionThinResource.java index 52d34691cc..95c063a8a6 100644 --- a/spring-cloud-dataflow-rest-resource/src/main/java/org/springframework/cloud/dataflow/rest/resource/TaskExecutionThinResource.java +++ b/spring-cloud-dataflow-rest-resource/src/main/java/org/springframework/cloud/dataflow/rest/resource/TaskExecutionThinResource.java @@ -17,7 +17,7 @@ import java.time.LocalDateTime; -import org.springframework.cloud.task.repository.TaskExecution; +import org.springframework.cloud.dataflow.core.ThinTaskExecution; import org.springframework.hateoas.PagedModel; import org.springframework.hateoas.RepresentationModel; @@ -66,15 +66,14 @@ public class TaskExecutionThinResource extends RepresentationModel { } } diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/controller/TaskExecutionThinController.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/controller/TaskExecutionThinController.java index 4bfd137402..baec4f3480 100644 --- a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/controller/TaskExecutionThinController.java +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/controller/TaskExecutionThinController.java @@ -16,9 +16,12 @@ package org.springframework.cloud.dataflow.server.controller; +import org.springframework.cloud.dataflow.core.ThinTaskExecution; import org.springframework.cloud.dataflow.rest.resource.TaskExecutionThinResource; import org.springframework.cloud.dataflow.server.task.DataflowTaskExplorer; import org.springframework.cloud.task.repository.TaskExecution; +import org.springframework.data.domain.Page; +import org.springframework.data.domain.PageImpl; import org.springframework.data.domain.Pageable; import org.springframework.data.web.PagedResourcesAssembler; import org.springframework.hateoas.PagedModel; @@ -47,21 +50,24 @@ public class TaskExecutionThinController { public TaskExecutionThinController(DataflowTaskExplorer explorer) { this.explorer = explorer; - this.resourceAssembler = new TaskExecutionThinResourceAssembler(); + this.resourceAssembler = new TaskExecutionThinResourceAssembler(); } @GetMapping(produces = "application/json") @ResponseStatus(HttpStatus.OK) - public PagedModel listTasks(Pageable pageable, PagedResourcesAssembler pagedAssembler) { - return pagedAssembler.toModel(explorer.findAll(pageable), resourceAssembler); + public PagedModel listTasks(Pageable pageable, PagedResourcesAssembler pagedAssembler) { + Page page = explorer.findAll(pageable); + Page thinTaskExecutions = new PageImpl<>(page.stream().map(ThinTaskExecution::new).toList(), pageable, page.getTotalElements()); + explorer.populateCtrStatus(thinTaskExecutions.getContent()); + return pagedAssembler.toModel(thinTaskExecutions, resourceAssembler); } - static class TaskExecutionThinResourceAssembler extends RepresentationModelAssemblerSupport { + static class TaskExecutionThinResourceAssembler extends RepresentationModelAssemblerSupport { public TaskExecutionThinResourceAssembler() { super(TaskExecutionThinController.class, TaskExecutionThinResource.class); } @Override - public TaskExecutionThinResource toModel(TaskExecution entity) { + public TaskExecutionThinResource toModel(ThinTaskExecution entity) { TaskExecutionThinResource resource = new TaskExecutionThinResource(entity); resource.add(linkTo(methodOn(TaskExecutionController.class).view(resource.getExecutionId())).withSelfRel()); resource.add(linkTo(methodOn(TaskDefinitionController.class).display(resource.getTaskName(), true)).withRel("tasks/definitions")); diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/task/DataflowTaskExecutionQueryDao.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/task/DataflowTaskExecutionQueryDao.java index 4e36367e50..c5f0445a62 100644 --- a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/task/DataflowTaskExecutionQueryDao.java +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/task/DataflowTaskExecutionQueryDao.java @@ -19,6 +19,7 @@ import java.util.Date; import java.util.List; +import org.springframework.cloud.dataflow.core.ThinTaskExecution; import org.springframework.cloud.task.repository.TaskExecution; import org.springframework.cloud.task.repository.dao.TaskExecutionDao; import org.springframework.data.domain.Page; @@ -166,4 +167,5 @@ public interface DataflowTaskExecutionQueryDao { TaskExecution geTaskExecutionByExecutionId(String executionId, String taskName); + void populateCtrStatus(Collection thinTaskExecutions); } diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/task/DataflowTaskExplorer.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/task/DataflowTaskExplorer.java index 22e68c7a60..12e89f3ffb 100644 --- a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/task/DataflowTaskExplorer.java +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/task/DataflowTaskExplorer.java @@ -20,6 +20,7 @@ import java.util.List; import java.util.Set; +import org.springframework.cloud.dataflow.core.ThinTaskExecution; import org.springframework.cloud.task.repository.TaskExecution; import org.springframework.data.domain.Page; import org.springframework.data.domain.Pageable; @@ -175,4 +176,10 @@ public interface DataflowTaskExplorer { * @see #getLatestTaskExecutionsByTaskNames(String...) */ TaskExecution getLatestTaskExecutionForTaskName(String taskName); + + /** + * Populate CTR status for all tasks + * @param thinTaskExecutions + */ + void populateCtrStatus(Collection thinTaskExecutions); } diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/task/impl/DefaultDataFlowTaskExecutionQueryDao.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/task/impl/DefaultDataFlowTaskExecutionQueryDao.java index bc4a381ec0..a36dae9e35 100644 --- a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/task/impl/DefaultDataFlowTaskExecutionQueryDao.java +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/task/impl/DefaultDataFlowTaskExecutionQueryDao.java @@ -28,6 +28,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; import javax.sql.DataSource; @@ -35,6 +37,7 @@ import org.slf4j.LoggerFactory; import org.springframework.batch.item.database.Order; +import org.springframework.cloud.dataflow.core.ThinTaskExecution; import org.springframework.cloud.dataflow.server.task.DataflowTaskExecutionQueryDao; import org.springframework.cloud.task.repository.TaskExecution; import org.springframework.cloud.task.repository.database.PagingQueryProvider; @@ -79,6 +82,14 @@ public class DefaultDataFlowTaskExecutionQueryDao implements DataflowTaskExecuti private static final String FIND_TASK_ARGUMENTS = "SELECT TASK_EXECUTION_ID, " + "TASK_PARAM from TASK_EXECUTION_PARAMS where TASK_EXECUTION_ID = :taskExecutionId"; + private static final String FIND_CTR_STATUS = "SELECT T.TASK_EXECUTION_ID as TASK_EXECUTION_ID, J.EXIT_CODE as CTR_STATUS" + + " from TASK_EXECUTION T" + + " JOIN TASK_TASK_BATCH TB ON TB.TASK_EXECUTION_ID = T.TASK_EXECUTION_ID" + + " JOIN BATCH_JOB_EXECUTION J ON J.JOB_EXECUTION_ID = TB.JOB_EXECUTION_ID" + + " WHERE T.TASK_EXECUTION_ID in (:taskExecutionIds) " + + " AND (select count(*) from TASK_EXECUTION CT" + // it is the parent of one or more tasks + " where CT.PARENT_EXECUTION_ID = T.TASK_EXECUTION_ID) > 0"; + private static final String GET_EXECUTIONS = "SELECT " + SELECT_CLAUSE + " from TASK_EXECUTION"; @@ -509,4 +520,23 @@ private List getTaskArguments(long taskExecutionId) { handler); return params; } + + @Override + public void populateCtrStatus(Collection thinTaskExecutions) { + Map taskExecutionMap = thinTaskExecutions.stream() + .collect(Collectors.toMap(ThinTaskExecution::getExecutionId, Function.identity())); + String ids = taskExecutionMap.keySet() + .stream() + .map(Object::toString) + .collect(Collectors.joining(",")); + String sql = FIND_CTR_STATUS.replace(":taskExecutionIds", ids); + jdbcTemplate.query(sql, rs -> { + Long id = rs.getLong("TASK_EXECUTION_ID"); + String ctrStatus = rs.getString("CTR_STATUS"); + logger.debug("populateCtrStatus:{}={}", id, ctrStatus); + ThinTaskExecution execution = taskExecutionMap.get(id); + Assert.notNull(execution, "Expected TaskExecution for " + id + " from " + ids); + execution.setCtrTaskStatus(ctrStatus); + }); + } } diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/task/impl/DefaultDataflowTaskExplorer.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/task/impl/DefaultDataflowTaskExplorer.java index 6431dfa1f2..7fc4866062 100644 --- a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/task/impl/DefaultDataflowTaskExplorer.java +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/task/impl/DefaultDataflowTaskExplorer.java @@ -25,6 +25,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.cloud.dataflow.core.ThinTaskExecution; import org.springframework.cloud.dataflow.server.task.DataflowTaskExplorer; import org.springframework.cloud.dataflow.server.task.DataflowTaskExecutionQueryDao; import org.springframework.cloud.dataflow.server.task.TaskDefinitionReader; @@ -185,4 +186,8 @@ public TaskExecution getLatestTaskExecutionForTaskName(String taskName) { return taskExplorer.getLatestTaskExecutionForTaskName(taskName); } + @Override + public void populateCtrStatus(Collection thinTaskExecutions) { + this.taskExecutionQueryDao.populateCtrStatus(thinTaskExecutions); + } } diff --git a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/controller/TaskExecutionControllerTests.java b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/controller/TaskExecutionControllerTests.java index 71b1014b92..581757a9f7 100644 --- a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/controller/TaskExecutionControllerTests.java +++ b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/controller/TaskExecutionControllerTests.java @@ -332,6 +332,7 @@ void getAllExecutions() throws Exception { .andExpect(status().isOk())) .andExpect(jsonPath("$._embedded.taskExecutionResourceList[*].executionId", containsInAnyOrder(4, 3, 2, 1))) .andExpect(jsonPath("$._embedded.taskExecutionResourceList[*].parentExecutionId", containsInAnyOrder(null, null, null, 1))) + .andExpect(jsonPath("$._embedded.taskExecutionResourceList[*].taskExecutionStatus", containsInAnyOrder("RUNNING", "RUNNING","RUNNING","RUNNING"))) .andExpect(jsonPath("$._embedded.taskExecutionResourceList", hasSize(4))); } @@ -342,6 +343,7 @@ void getAllThinExecutions() throws Exception { .andExpect(status().isOk()) .andExpect(jsonPath("$._embedded.taskExecutionThinResourceList[*].executionId", containsInAnyOrder(4, 3, 2, 1))) .andExpect(jsonPath("$._embedded.taskExecutionThinResourceList[*].parentExecutionId", containsInAnyOrder(null, null, null, 1))) + .andExpect(jsonPath("$._embedded.taskExecutionThinResourceList[*].taskExecutionStatus", containsInAnyOrder("RUNNING", "RUNNING","RUNNING","RUNNING"))) .andExpect(jsonPath("$._embedded.taskExecutionThinResourceList", hasSize(4))); }