From 27c484a48e3edd29265ce5af9b4c89c82b19e7a1 Mon Sep 17 00:00:00 2001 From: Corneil du Plessis Date: Fri, 23 Aug 2024 17:15:50 +0200 Subject: [PATCH 1/2] Added ThinTaskExecution to store ctrStatus. Update Controller and Service to populate the ctr status. Fixes #5907 --- .../dataflow/core/ThinTaskExecution.java | 54 +++++++++++++++++++ .../resource/TaskExecutionThinResource.java | 32 +++++++++-- .../TaskExecutionThinController.java | 16 ++++-- .../task/DataflowTaskExecutionQueryDao.java | 2 + .../server/task/DataflowTaskExplorer.java | 7 +++ .../DefaultDataFlowTaskExecutionQueryDao.java | 33 ++++++++++++ .../impl/DefaultDataflowTaskExplorer.java | 5 ++ .../TaskExecutionControllerTests.java | 2 + 8 files changed, 142 insertions(+), 9 deletions(-) create mode 100644 spring-cloud-dataflow-core/src/main/java/org/springframework/cloud/dataflow/core/ThinTaskExecution.java diff --git a/spring-cloud-dataflow-core/src/main/java/org/springframework/cloud/dataflow/core/ThinTaskExecution.java b/spring-cloud-dataflow-core/src/main/java/org/springframework/cloud/dataflow/core/ThinTaskExecution.java new file mode 100644 index 0000000000..6d4c5b797a --- /dev/null +++ b/spring-cloud-dataflow-core/src/main/java/org/springframework/cloud/dataflow/core/ThinTaskExecution.java @@ -0,0 +1,54 @@ +/* + * Copyright 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.cloud.dataflow.core; + +import java.time.LocalDateTime; +import java.util.List; + +import org.springframework.cloud.task.repository.TaskExecution; + +/** + * Provide for ctr status + * @author Corneil du Plessis + */ +public class ThinTaskExecution extends TaskExecution { + private String ctrTaskStatus; + + public ThinTaskExecution() { + } + public ThinTaskExecution(TaskExecution taskExecution) { + super(taskExecution.getExecutionId(), taskExecution.getExitCode(), taskExecution.getTaskName(), taskExecution.getStartTime(), taskExecution.getEndTime(), taskExecution.getExitMessage(), taskExecution.getArguments(), taskExecution.getErrorMessage(), taskExecution.getExternalExecutionId(), taskExecution.getParentExecutionId()); + } + public ThinTaskExecution(TaskExecution taskExecution, String ctrTaskStatus) { + super(taskExecution.getExecutionId(), taskExecution.getExitCode(), taskExecution.getTaskName(), taskExecution.getStartTime(), taskExecution.getEndTime(), taskExecution.getExitMessage(), taskExecution.getArguments(), taskExecution.getErrorMessage(), taskExecution.getExternalExecutionId(), taskExecution.getParentExecutionId()); + this.ctrTaskStatus = ctrTaskStatus; + } + public ThinTaskExecution(long executionId, Integer exitCode, String taskName, LocalDateTime startTime, LocalDateTime endTime, String exitMessage, List arguments, String errorMessage, String externalExecutionId, Long parentExecutionId) { + super(executionId, exitCode, taskName, startTime, endTime, exitMessage, arguments, errorMessage, externalExecutionId, parentExecutionId); + } + + public ThinTaskExecution(long executionId, Integer exitCode, String taskName, LocalDateTime startTime, LocalDateTime endTime, String exitMessage, List arguments, String errorMessage, String externalExecutionId) { + super(executionId, exitCode, taskName, startTime, endTime, exitMessage, arguments, errorMessage, externalExecutionId); + } + + public String getCtrTaskStatus() { + return ctrTaskStatus; + } + + public void setCtrTaskStatus(String ctrTaskStatus) { + this.ctrTaskStatus = ctrTaskStatus; + } +} diff --git a/spring-cloud-dataflow-rest-resource/src/main/java/org/springframework/cloud/dataflow/rest/resource/TaskExecutionThinResource.java b/spring-cloud-dataflow-rest-resource/src/main/java/org/springframework/cloud/dataflow/rest/resource/TaskExecutionThinResource.java index 52d34691cc..95c063a8a6 100644 --- a/spring-cloud-dataflow-rest-resource/src/main/java/org/springframework/cloud/dataflow/rest/resource/TaskExecutionThinResource.java +++ b/spring-cloud-dataflow-rest-resource/src/main/java/org/springframework/cloud/dataflow/rest/resource/TaskExecutionThinResource.java @@ -17,7 +17,7 @@ import java.time.LocalDateTime; -import org.springframework.cloud.task.repository.TaskExecution; +import org.springframework.cloud.dataflow.core.ThinTaskExecution; import org.springframework.hateoas.PagedModel; import org.springframework.hateoas.RepresentationModel; @@ -66,15 +66,14 @@ public class TaskExecutionThinResource extends RepresentationModel { } } diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/controller/TaskExecutionThinController.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/controller/TaskExecutionThinController.java index 4bfd137402..baec4f3480 100644 --- a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/controller/TaskExecutionThinController.java +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/controller/TaskExecutionThinController.java @@ -16,9 +16,12 @@ package org.springframework.cloud.dataflow.server.controller; +import org.springframework.cloud.dataflow.core.ThinTaskExecution; import org.springframework.cloud.dataflow.rest.resource.TaskExecutionThinResource; import org.springframework.cloud.dataflow.server.task.DataflowTaskExplorer; import org.springframework.cloud.task.repository.TaskExecution; +import org.springframework.data.domain.Page; +import org.springframework.data.domain.PageImpl; import org.springframework.data.domain.Pageable; import org.springframework.data.web.PagedResourcesAssembler; import org.springframework.hateoas.PagedModel; @@ -47,21 +50,24 @@ public class TaskExecutionThinController { public TaskExecutionThinController(DataflowTaskExplorer explorer) { this.explorer = explorer; - this.resourceAssembler = new TaskExecutionThinResourceAssembler(); + this.resourceAssembler = new TaskExecutionThinResourceAssembler(); } @GetMapping(produces = "application/json") @ResponseStatus(HttpStatus.OK) - public PagedModel listTasks(Pageable pageable, PagedResourcesAssembler pagedAssembler) { - return pagedAssembler.toModel(explorer.findAll(pageable), resourceAssembler); + public PagedModel listTasks(Pageable pageable, PagedResourcesAssembler pagedAssembler) { + Page page = explorer.findAll(pageable); + Page thinTaskExecutions = new PageImpl<>(page.stream().map(ThinTaskExecution::new).toList(), pageable, page.getTotalElements()); + explorer.populateCtrStatus(thinTaskExecutions.getContent()); + return pagedAssembler.toModel(thinTaskExecutions, resourceAssembler); } - static class TaskExecutionThinResourceAssembler extends RepresentationModelAssemblerSupport { + static class TaskExecutionThinResourceAssembler extends RepresentationModelAssemblerSupport { public TaskExecutionThinResourceAssembler() { super(TaskExecutionThinController.class, TaskExecutionThinResource.class); } @Override - public TaskExecutionThinResource toModel(TaskExecution entity) { + public TaskExecutionThinResource toModel(ThinTaskExecution entity) { TaskExecutionThinResource resource = new TaskExecutionThinResource(entity); resource.add(linkTo(methodOn(TaskExecutionController.class).view(resource.getExecutionId())).withSelfRel()); resource.add(linkTo(methodOn(TaskDefinitionController.class).display(resource.getTaskName(), true)).withRel("tasks/definitions")); diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/task/DataflowTaskExecutionQueryDao.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/task/DataflowTaskExecutionQueryDao.java index 4e36367e50..00ee8c3d94 100644 --- a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/task/DataflowTaskExecutionQueryDao.java +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/task/DataflowTaskExecutionQueryDao.java @@ -19,6 +19,7 @@ import java.util.Date; import java.util.List; +import org.springframework.cloud.dataflow.core.ThinTaskExecution; import org.springframework.cloud.task.repository.TaskExecution; import org.springframework.cloud.task.repository.dao.TaskExecutionDao; import org.springframework.data.domain.Page; @@ -166,4 +167,5 @@ public interface DataflowTaskExecutionQueryDao { TaskExecution geTaskExecutionByExecutionId(String executionId, String taskName); + void populateCtrStatus(Collection aggregateTaskExecutions); } diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/task/DataflowTaskExplorer.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/task/DataflowTaskExplorer.java index 22e68c7a60..21ddb27510 100644 --- a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/task/DataflowTaskExplorer.java +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/task/DataflowTaskExplorer.java @@ -20,6 +20,7 @@ import java.util.List; import java.util.Set; +import org.springframework.cloud.dataflow.core.ThinTaskExecution; import org.springframework.cloud.task.repository.TaskExecution; import org.springframework.data.domain.Page; import org.springframework.data.domain.Pageable; @@ -175,4 +176,10 @@ public interface DataflowTaskExplorer { * @see #getLatestTaskExecutionsByTaskNames(String...) */ TaskExecution getLatestTaskExecutionForTaskName(String taskName); + + /** + * Populate CTR status for all tasks + * @param aggregateTaskExecutions + */ + void populateCtrStatus(Collection aggregateTaskExecutions); } diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/task/impl/DefaultDataFlowTaskExecutionQueryDao.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/task/impl/DefaultDataFlowTaskExecutionQueryDao.java index bc4a381ec0..fedc1fcc95 100644 --- a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/task/impl/DefaultDataFlowTaskExecutionQueryDao.java +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/task/impl/DefaultDataFlowTaskExecutionQueryDao.java @@ -28,6 +28,9 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import java.util.stream.Collectors; import javax.sql.DataSource; @@ -35,6 +38,7 @@ import org.slf4j.LoggerFactory; import org.springframework.batch.item.database.Order; +import org.springframework.cloud.dataflow.core.ThinTaskExecution; import org.springframework.cloud.dataflow.server.task.DataflowTaskExecutionQueryDao; import org.springframework.cloud.task.repository.TaskExecution; import org.springframework.cloud.task.repository.database.PagingQueryProvider; @@ -79,6 +83,14 @@ public class DefaultDataFlowTaskExecutionQueryDao implements DataflowTaskExecuti private static final String FIND_TASK_ARGUMENTS = "SELECT TASK_EXECUTION_ID, " + "TASK_PARAM from TASK_EXECUTION_PARAMS where TASK_EXECUTION_ID = :taskExecutionId"; + private static final String FIND_CTR_STATUS = "SELECT T.TASK_EXECUTION_ID as TASK_EXECUTION_ID, J.EXIT_CODE as CTR_STATUS" + + " from TASK_EXECUTION T" + + " JOIN TASK_TASK_BATCH TB ON TB.TASK_EXECUTION_ID = T.TASK_EXECUTION_ID" + + " JOIN BATCH_JOB_EXECUTION J ON J.JOB_EXECUTION_ID = TB.JOB_EXECUTION_ID" + + " WHERE T.TASK_EXECUTION_ID in (:taskExecutionIds) " + + " AND (select count(*) from TASK_EXECUTION CT" + // it is the parent of one or more tasks + " where CT.PARENT_EXECUTION_ID = T.TASK_EXECUTION_ID) > 0"; + private static final String GET_EXECUTIONS = "SELECT " + SELECT_CLAUSE + " from TASK_EXECUTION"; @@ -509,4 +521,25 @@ private List getTaskArguments(long taskExecutionId) { handler); return params; } + + @Override + public void populateCtrStatus(Collection taskExecutions) { + final AtomicInteger updated = new AtomicInteger(0); + Map taskExecutionMap = taskExecutions.stream() + .collect(Collectors.toMap(ThinTaskExecution::getExecutionId, Function.identity())); + String ids = taskExecutionMap.keySet() + .stream() + .map(Object::toString) + .collect(Collectors.joining(",")); + String sql = FIND_CTR_STATUS.replace(":taskExecutionIds", ids); + jdbcTemplate.query(sql, rs -> { + Long id = rs.getLong("TASK_EXECUTION_ID"); + String ctrStatus = rs.getString("CTR_STATUS"); + logger.debug("populateCtrStatus:{}={}", id, ctrStatus); + ThinTaskExecution execution = taskExecutionMap.get(id); + Assert.notNull(execution, "Expected TaskExecution for " + id + " from " + ids); + updated.incrementAndGet(); + execution.setCtrTaskStatus(ctrStatus); + }); + } } diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/task/impl/DefaultDataflowTaskExplorer.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/task/impl/DefaultDataflowTaskExplorer.java index 6431dfa1f2..ea806540a0 100644 --- a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/task/impl/DefaultDataflowTaskExplorer.java +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/task/impl/DefaultDataflowTaskExplorer.java @@ -25,6 +25,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.cloud.dataflow.core.ThinTaskExecution; import org.springframework.cloud.dataflow.server.task.DataflowTaskExplorer; import org.springframework.cloud.dataflow.server.task.DataflowTaskExecutionQueryDao; import org.springframework.cloud.dataflow.server.task.TaskDefinitionReader; @@ -185,4 +186,8 @@ public TaskExecution getLatestTaskExecutionForTaskName(String taskName) { return taskExplorer.getLatestTaskExecutionForTaskName(taskName); } + @Override + public void populateCtrStatus(Collection taskExecutions) { + this.taskExecutionQueryDao.populateCtrStatus(taskExecutions); + } } diff --git a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/controller/TaskExecutionControllerTests.java b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/controller/TaskExecutionControllerTests.java index 71b1014b92..581757a9f7 100644 --- a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/controller/TaskExecutionControllerTests.java +++ b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/controller/TaskExecutionControllerTests.java @@ -332,6 +332,7 @@ void getAllExecutions() throws Exception { .andExpect(status().isOk())) .andExpect(jsonPath("$._embedded.taskExecutionResourceList[*].executionId", containsInAnyOrder(4, 3, 2, 1))) .andExpect(jsonPath("$._embedded.taskExecutionResourceList[*].parentExecutionId", containsInAnyOrder(null, null, null, 1))) + .andExpect(jsonPath("$._embedded.taskExecutionResourceList[*].taskExecutionStatus", containsInAnyOrder("RUNNING", "RUNNING","RUNNING","RUNNING"))) .andExpect(jsonPath("$._embedded.taskExecutionResourceList", hasSize(4))); } @@ -342,6 +343,7 @@ void getAllThinExecutions() throws Exception { .andExpect(status().isOk()) .andExpect(jsonPath("$._embedded.taskExecutionThinResourceList[*].executionId", containsInAnyOrder(4, 3, 2, 1))) .andExpect(jsonPath("$._embedded.taskExecutionThinResourceList[*].parentExecutionId", containsInAnyOrder(null, null, null, 1))) + .andExpect(jsonPath("$._embedded.taskExecutionThinResourceList[*].taskExecutionStatus", containsInAnyOrder("RUNNING", "RUNNING","RUNNING","RUNNING"))) .andExpect(jsonPath("$._embedded.taskExecutionThinResourceList", hasSize(4))); } From ab46cc03d28562ae0047e54df5c8774b1a204d79 Mon Sep 17 00:00:00 2001 From: Corneil du Plessis Date: Mon, 26 Aug 2024 13:11:48 +0200 Subject: [PATCH 2/2] Updated for comments. --- .../cloud/dataflow/core/ThinTaskExecution.java | 2 +- .../server/task/DataflowTaskExecutionQueryDao.java | 2 +- .../cloud/dataflow/server/task/DataflowTaskExplorer.java | 4 ++-- .../task/impl/DefaultDataFlowTaskExecutionQueryDao.java | 7 ++----- .../server/task/impl/DefaultDataflowTaskExplorer.java | 4 ++-- 5 files changed, 8 insertions(+), 11 deletions(-) diff --git a/spring-cloud-dataflow-core/src/main/java/org/springframework/cloud/dataflow/core/ThinTaskExecution.java b/spring-cloud-dataflow-core/src/main/java/org/springframework/cloud/dataflow/core/ThinTaskExecution.java index 6d4c5b797a..2bfbf11cd4 100644 --- a/spring-cloud-dataflow-core/src/main/java/org/springframework/cloud/dataflow/core/ThinTaskExecution.java +++ b/spring-cloud-dataflow-core/src/main/java/org/springframework/cloud/dataflow/core/ThinTaskExecution.java @@ -21,7 +21,7 @@ import org.springframework.cloud.task.repository.TaskExecution; /** - * Provide for ctr status + * Overrides TaskExecution class to provide CTR status required. * @author Corneil du Plessis */ public class ThinTaskExecution extends TaskExecution { diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/task/DataflowTaskExecutionQueryDao.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/task/DataflowTaskExecutionQueryDao.java index 00ee8c3d94..c5f0445a62 100644 --- a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/task/DataflowTaskExecutionQueryDao.java +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/task/DataflowTaskExecutionQueryDao.java @@ -167,5 +167,5 @@ public interface DataflowTaskExecutionQueryDao { TaskExecution geTaskExecutionByExecutionId(String executionId, String taskName); - void populateCtrStatus(Collection aggregateTaskExecutions); + void populateCtrStatus(Collection thinTaskExecutions); } diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/task/DataflowTaskExplorer.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/task/DataflowTaskExplorer.java index 21ddb27510..12e89f3ffb 100644 --- a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/task/DataflowTaskExplorer.java +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/task/DataflowTaskExplorer.java @@ -179,7 +179,7 @@ public interface DataflowTaskExplorer { /** * Populate CTR status for all tasks - * @param aggregateTaskExecutions + * @param thinTaskExecutions */ - void populateCtrStatus(Collection aggregateTaskExecutions); + void populateCtrStatus(Collection thinTaskExecutions); } diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/task/impl/DefaultDataFlowTaskExecutionQueryDao.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/task/impl/DefaultDataFlowTaskExecutionQueryDao.java index fedc1fcc95..a36dae9e35 100644 --- a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/task/impl/DefaultDataFlowTaskExecutionQueryDao.java +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/task/impl/DefaultDataFlowTaskExecutionQueryDao.java @@ -28,7 +28,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.stream.Collectors; @@ -523,9 +522,8 @@ private List getTaskArguments(long taskExecutionId) { } @Override - public void populateCtrStatus(Collection taskExecutions) { - final AtomicInteger updated = new AtomicInteger(0); - Map taskExecutionMap = taskExecutions.stream() + public void populateCtrStatus(Collection thinTaskExecutions) { + Map taskExecutionMap = thinTaskExecutions.stream() .collect(Collectors.toMap(ThinTaskExecution::getExecutionId, Function.identity())); String ids = taskExecutionMap.keySet() .stream() @@ -538,7 +536,6 @@ public void populateCtrStatus(Collection taskExecutions) { logger.debug("populateCtrStatus:{}={}", id, ctrStatus); ThinTaskExecution execution = taskExecutionMap.get(id); Assert.notNull(execution, "Expected TaskExecution for " + id + " from " + ids); - updated.incrementAndGet(); execution.setCtrTaskStatus(ctrStatus); }); } diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/task/impl/DefaultDataflowTaskExplorer.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/task/impl/DefaultDataflowTaskExplorer.java index ea806540a0..7fc4866062 100644 --- a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/task/impl/DefaultDataflowTaskExplorer.java +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/task/impl/DefaultDataflowTaskExplorer.java @@ -187,7 +187,7 @@ public TaskExecution getLatestTaskExecutionForTaskName(String taskName) { } @Override - public void populateCtrStatus(Collection taskExecutions) { - this.taskExecutionQueryDao.populateCtrStatus(taskExecutions); + public void populateCtrStatus(Collection thinTaskExecutions) { + this.taskExecutionQueryDao.populateCtrStatus(thinTaskExecutions); } }