Skip to content
This repository was archived by the owner on May 14, 2025. It is now read-only.

Forward port ctr status on Thin Controller. #5906

Merged
merged 2 commits into from
Aug 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.dataflow.core;

import java.time.LocalDateTime;
import java.util.List;

import org.springframework.cloud.task.repository.TaskExecution;

/**
* Overrides TaskExecution class to provide CTR status required.
* @author Corneil du Plessis
*/
public class ThinTaskExecution extends TaskExecution {
private String ctrTaskStatus;

public ThinTaskExecution() {
}
public ThinTaskExecution(TaskExecution taskExecution) {
super(taskExecution.getExecutionId(), taskExecution.getExitCode(), taskExecution.getTaskName(), taskExecution.getStartTime(), taskExecution.getEndTime(), taskExecution.getExitMessage(), taskExecution.getArguments(), taskExecution.getErrorMessage(), taskExecution.getExternalExecutionId(), taskExecution.getParentExecutionId());
}
public ThinTaskExecution(TaskExecution taskExecution, String ctrTaskStatus) {
super(taskExecution.getExecutionId(), taskExecution.getExitCode(), taskExecution.getTaskName(), taskExecution.getStartTime(), taskExecution.getEndTime(), taskExecution.getExitMessage(), taskExecution.getArguments(), taskExecution.getErrorMessage(), taskExecution.getExternalExecutionId(), taskExecution.getParentExecutionId());
this.ctrTaskStatus = ctrTaskStatus;
}
public ThinTaskExecution(long executionId, Integer exitCode, String taskName, LocalDateTime startTime, LocalDateTime endTime, String exitMessage, List<String> arguments, String errorMessage, String externalExecutionId, Long parentExecutionId) {
super(executionId, exitCode, taskName, startTime, endTime, exitMessage, arguments, errorMessage, externalExecutionId, parentExecutionId);
}

public ThinTaskExecution(long executionId, Integer exitCode, String taskName, LocalDateTime startTime, LocalDateTime endTime, String exitMessage, List<String> arguments, String errorMessage, String externalExecutionId) {
super(executionId, exitCode, taskName, startTime, endTime, exitMessage, arguments, errorMessage, externalExecutionId);
}

public String getCtrTaskStatus() {
return ctrTaskStatus;
}

public void setCtrTaskStatus(String ctrTaskStatus) {
this.ctrTaskStatus = ctrTaskStatus;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import java.time.LocalDateTime;

import org.springframework.cloud.task.repository.TaskExecution;
import org.springframework.cloud.dataflow.core.ThinTaskExecution;
import org.springframework.hateoas.PagedModel;
import org.springframework.hateoas.RepresentationModel;

Expand Down Expand Up @@ -66,22 +66,22 @@ public class TaskExecutionThinResource extends RepresentationModel<TaskExecution

private String errorMessage;

private String composedTaskJobExecutionStatus;

public TaskExecutionThinResource() {
}

public TaskExecutionThinResource(TaskExecution taskExecution) {
public TaskExecutionThinResource(ThinTaskExecution taskExecution) {
this.executionId = taskExecution.getExecutionId();

this.taskName = taskExecution.getTaskName();

this.externalExecutionId = taskExecution.getExternalExecutionId();
this.parentExecutionId =taskExecution.getParentExecutionId();
this.startTime = taskExecution.getStartTime();
this.endTime = taskExecution.getEndTime();
this.exitCode = taskExecution.getExitCode();
this.exitMessage = taskExecution.getExitMessage();
this.errorMessage = taskExecution.getErrorMessage();
this.composedTaskJobExecutionStatus = taskExecution.getCtrTaskStatus();
}

public long getExecutionId() {
Expand Down Expand Up @@ -156,6 +156,30 @@ public void setErrorMessage(String errorMessage) {
this.errorMessage = errorMessage;
}

public String getComposedTaskJobExecutionStatus() {
return composedTaskJobExecutionStatus;
}

public void setComposedTaskJobExecutionStatus(String composedTaskJobExecutionStatus) {
this.composedTaskJobExecutionStatus = composedTaskJobExecutionStatus;
}

public TaskExecutionStatus getTaskExecutionStatus() {
if (this.startTime == null) {
return TaskExecutionStatus.UNKNOWN;
}
if (this.endTime == null) {
return TaskExecutionStatus.RUNNING;
}
if (this.composedTaskJobExecutionStatus != null) {
return (this.composedTaskJobExecutionStatus.equals("ABANDONED") ||
this.composedTaskJobExecutionStatus.equals("FAILED") ||
this.composedTaskJobExecutionStatus.equals("STOPPED")) ?
TaskExecutionStatus.ERROR : TaskExecutionStatus.COMPLETE;
}
return (this.exitCode == null) ? TaskExecutionStatus.RUNNING :
((this.exitCode == 0) ? TaskExecutionStatus.COMPLETE : TaskExecutionStatus.ERROR);
}
public static class Page extends PagedModel<TaskExecutionThinResource> {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@
package org.springframework.cloud.dataflow.server.controller;


import org.springframework.cloud.dataflow.core.ThinTaskExecution;
import org.springframework.cloud.dataflow.rest.resource.TaskExecutionThinResource;
import org.springframework.cloud.dataflow.server.task.DataflowTaskExplorer;
import org.springframework.cloud.task.repository.TaskExecution;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageImpl;
import org.springframework.data.domain.Pageable;
import org.springframework.data.web.PagedResourcesAssembler;
import org.springframework.hateoas.PagedModel;
Expand Down Expand Up @@ -47,21 +50,24 @@ public class TaskExecutionThinController {

public TaskExecutionThinController(DataflowTaskExplorer explorer) {
this.explorer = explorer;
this.resourceAssembler = new TaskExecutionThinResourceAssembler();
this.resourceAssembler = new TaskExecutionThinResourceAssembler();
}

@GetMapping(produces = "application/json")
@ResponseStatus(HttpStatus.OK)
public PagedModel<TaskExecutionThinResource> listTasks(Pageable pageable, PagedResourcesAssembler<TaskExecution> pagedAssembler) {
return pagedAssembler.toModel(explorer.findAll(pageable), resourceAssembler);
public PagedModel<TaskExecutionThinResource> listTasks(Pageable pageable, PagedResourcesAssembler<ThinTaskExecution> pagedAssembler) {
Page<TaskExecution> page = explorer.findAll(pageable);
Page<ThinTaskExecution> thinTaskExecutions = new PageImpl<>(page.stream().map(ThinTaskExecution::new).toList(), pageable, page.getTotalElements());
explorer.populateCtrStatus(thinTaskExecutions.getContent());
return pagedAssembler.toModel(thinTaskExecutions, resourceAssembler);
}

static class TaskExecutionThinResourceAssembler extends RepresentationModelAssemblerSupport<TaskExecution, TaskExecutionThinResource> {
static class TaskExecutionThinResourceAssembler extends RepresentationModelAssemblerSupport<ThinTaskExecution, TaskExecutionThinResource> {
public TaskExecutionThinResourceAssembler() {
super(TaskExecutionThinController.class, TaskExecutionThinResource.class);
}
@Override
public TaskExecutionThinResource toModel(TaskExecution entity) {
public TaskExecutionThinResource toModel(ThinTaskExecution entity) {
TaskExecutionThinResource resource = new TaskExecutionThinResource(entity);
resource.add(linkTo(methodOn(TaskExecutionController.class).view(resource.getExecutionId())).withSelfRel());
resource.add(linkTo(methodOn(TaskDefinitionController.class).display(resource.getTaskName(), true)).withRel("tasks/definitions"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.Date;
import java.util.List;

import org.springframework.cloud.dataflow.core.ThinTaskExecution;
import org.springframework.cloud.task.repository.TaskExecution;
import org.springframework.cloud.task.repository.dao.TaskExecutionDao;
import org.springframework.data.domain.Page;
Expand Down Expand Up @@ -166,4 +167,5 @@ public interface DataflowTaskExecutionQueryDao {

TaskExecution geTaskExecutionByExecutionId(String executionId, String taskName);

void populateCtrStatus(Collection<ThinTaskExecution> thinTaskExecutions);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.List;
import java.util.Set;

import org.springframework.cloud.dataflow.core.ThinTaskExecution;
import org.springframework.cloud.task.repository.TaskExecution;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
Expand Down Expand Up @@ -175,4 +176,10 @@ public interface DataflowTaskExplorer {
* @see #getLatestTaskExecutionsByTaskNames(String...)
*/
TaskExecution getLatestTaskExecutionForTaskName(String taskName);

/**
* Populate CTR status for all tasks
* @param thinTaskExecutions
*/
void populateCtrStatus(Collection<ThinTaskExecution> thinTaskExecutions);
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,16 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

import javax.sql.DataSource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.batch.item.database.Order;
import org.springframework.cloud.dataflow.core.ThinTaskExecution;
import org.springframework.cloud.dataflow.server.task.DataflowTaskExecutionQueryDao;
import org.springframework.cloud.task.repository.TaskExecution;
import org.springframework.cloud.task.repository.database.PagingQueryProvider;
Expand Down Expand Up @@ -79,6 +82,14 @@ public class DefaultDataFlowTaskExecutionQueryDao implements DataflowTaskExecuti
private static final String FIND_TASK_ARGUMENTS = "SELECT TASK_EXECUTION_ID, "
+ "TASK_PARAM from TASK_EXECUTION_PARAMS where TASK_EXECUTION_ID = :taskExecutionId";

private static final String FIND_CTR_STATUS = "SELECT T.TASK_EXECUTION_ID as TASK_EXECUTION_ID, J.EXIT_CODE as CTR_STATUS" +
" from TASK_EXECUTION T" +
" JOIN TASK_TASK_BATCH TB ON TB.TASK_EXECUTION_ID = T.TASK_EXECUTION_ID" +
" JOIN BATCH_JOB_EXECUTION J ON J.JOB_EXECUTION_ID = TB.JOB_EXECUTION_ID" +
" WHERE T.TASK_EXECUTION_ID in (:taskExecutionIds) " +
" AND (select count(*) from TASK_EXECUTION CT" + // it is the parent of one or more tasks
" where CT.PARENT_EXECUTION_ID = T.TASK_EXECUTION_ID) > 0";

private static final String GET_EXECUTIONS = "SELECT " + SELECT_CLAUSE +
" from TASK_EXECUTION";

Expand Down Expand Up @@ -509,4 +520,23 @@ private List<String> getTaskArguments(long taskExecutionId) {
handler);
return params;
}

@Override
public void populateCtrStatus(Collection<ThinTaskExecution> thinTaskExecutions) {
Map<Long, ThinTaskExecution> taskExecutionMap = thinTaskExecutions.stream()
.collect(Collectors.toMap(ThinTaskExecution::getExecutionId, Function.identity()));
String ids = taskExecutionMap.keySet()
.stream()
.map(Object::toString)
.collect(Collectors.joining(","));
String sql = FIND_CTR_STATUS.replace(":taskExecutionIds", ids);
jdbcTemplate.query(sql, rs -> {
Long id = rs.getLong("TASK_EXECUTION_ID");
String ctrStatus = rs.getString("CTR_STATUS");
logger.debug("populateCtrStatus:{}={}", id, ctrStatus);
ThinTaskExecution execution = taskExecutionMap.get(id);
Assert.notNull(execution, "Expected TaskExecution for " + id + " from " + ids);
execution.setCtrTaskStatus(ctrStatus);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.cloud.dataflow.core.ThinTaskExecution;
import org.springframework.cloud.dataflow.server.task.DataflowTaskExplorer;
import org.springframework.cloud.dataflow.server.task.DataflowTaskExecutionQueryDao;
import org.springframework.cloud.dataflow.server.task.TaskDefinitionReader;
Expand Down Expand Up @@ -185,4 +186,8 @@ public TaskExecution getLatestTaskExecutionForTaskName(String taskName) {
return taskExplorer.getLatestTaskExecutionForTaskName(taskName);
}

@Override
public void populateCtrStatus(Collection<ThinTaskExecution> thinTaskExecutions) {
this.taskExecutionQueryDao.populateCtrStatus(thinTaskExecutions);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,7 @@ void getAllExecutions() throws Exception {
.andExpect(status().isOk()))
.andExpect(jsonPath("$._embedded.taskExecutionResourceList[*].executionId", containsInAnyOrder(4, 3, 2, 1)))
.andExpect(jsonPath("$._embedded.taskExecutionResourceList[*].parentExecutionId", containsInAnyOrder(null, null, null, 1)))
.andExpect(jsonPath("$._embedded.taskExecutionResourceList[*].taskExecutionStatus", containsInAnyOrder("RUNNING", "RUNNING","RUNNING","RUNNING")))
.andExpect(jsonPath("$._embedded.taskExecutionResourceList", hasSize(4)));
}

Expand All @@ -342,6 +343,7 @@ void getAllThinExecutions() throws Exception {
.andExpect(status().isOk())
.andExpect(jsonPath("$._embedded.taskExecutionThinResourceList[*].executionId", containsInAnyOrder(4, 3, 2, 1)))
.andExpect(jsonPath("$._embedded.taskExecutionThinResourceList[*].parentExecutionId", containsInAnyOrder(null, null, null, 1)))
.andExpect(jsonPath("$._embedded.taskExecutionThinResourceList[*].taskExecutionStatus", containsInAnyOrder("RUNNING", "RUNNING","RUNNING","RUNNING")))
.andExpect(jsonPath("$._embedded.taskExecutionThinResourceList", hasSize(4)));
}

Expand Down
Loading