Skip to content

Added query for composed-task-runner status. #5792

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,7 @@ public AggregateTaskExecution mapRow(ResultSet rs, int rowNum) throws SQLExcepti
rs.getString("EXTERNAL_EXECUTION_ID"),
parentExecutionId,
null,
null,
schemaTarget
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ public AggregateTaskExecution from(TaskExecution execution, String schemaTarget,
execution.getExternalExecutionId(),
execution.getParentExecutionId(),
platformName,
null,
schemaTarget);
}
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ public class TaskExecutionThinResource extends RepresentationModel<TaskExecution

private String taskExecutionStatus;

private String composedTaskJobExecutionStatus;

/**
* @since 2.11.0
*/
Expand All @@ -94,6 +96,7 @@ public TaskExecutionThinResource(AggregateTaskExecution aggregateTaskExecution)
this.exitCode = aggregateTaskExecution.getExitCode();
this.exitMessage = aggregateTaskExecution.getExitMessage();
this.errorMessage = aggregateTaskExecution.getErrorMessage();
this.composedTaskJobExecutionStatus = aggregateTaskExecution.getCtrTaskStatus();
}

public long getExecutionId() {
Expand Down Expand Up @@ -187,6 +190,14 @@ public void setTaskExecutionStatus(String taskExecutionStatus) {
this.taskExecutionStatus = taskExecutionStatus;
}

public String getComposedTaskJobExecutionStatus() {
return composedTaskJobExecutionStatus;
}

public void setComposedTaskJobExecutionStatus(String composedTaskJobExecutionStatus) {
this.composedTaskJobExecutionStatus = composedTaskJobExecutionStatus;
}

/**
* Returns the calculated status of this {@link TaskExecution}.
*
Expand All @@ -211,7 +222,12 @@ public TaskExecutionStatus getTaskExecutionStatus() {
if (this.endTime == null) {
return TaskExecutionStatus.RUNNING;
}

if (this.composedTaskJobExecutionStatus != null) {
return (this.composedTaskJobExecutionStatus.equals("ABANDONED") ||
this.composedTaskJobExecutionStatus.equals("FAILED") ||
this.composedTaskJobExecutionStatus.equals("STOPPED")) ?
TaskExecutionStatus.ERROR : TaskExecutionStatus.COMPLETE;
}
return (this.exitCode == null) ? TaskExecutionStatus.RUNNING :
((this.exitCode == 0) ? TaskExecutionStatus.COMPLETE : TaskExecutionStatus.ERROR);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ public class AggregateTaskExecution {
private String schemaTarget;

private String platformName;

private String ctrTaskStatus;
/**
* The arguments that were used for this task execution.
*/
Expand All @@ -92,7 +94,8 @@ public AggregateTaskExecution() {

public AggregateTaskExecution(long executionId, Integer exitCode, String taskName,
Date startTime, Date endTime, String exitMessage, List<String> arguments,
String errorMessage, String externalExecutionId, Long parentExecutionId, String platformName, String schemaTarget) {
String errorMessage, String externalExecutionId, Long parentExecutionId, String platformName,
String ctrTaskStatus, String schemaTarget) {

Assert.notNull(arguments, "arguments must not be null");
this.executionId = executionId;
Expand All @@ -107,14 +110,15 @@ public AggregateTaskExecution(long executionId, Integer exitCode, String taskNam
this.parentExecutionId = parentExecutionId;
this.schemaTarget = schemaTarget;
this.platformName = platformName;
this.ctrTaskStatus = ctrTaskStatus;
}

public AggregateTaskExecution(long executionId, Integer exitCode, String taskName,
Date startTime, Date endTime, String exitMessage, List<String> arguments,
String errorMessage, String externalExecutionId, String platformName, String schemaTarget) {
String errorMessage, String externalExecutionId, String platformName, String ctrTaskStatus, String schemaTarget) {

this(executionId, exitCode, taskName, startTime, endTime, exitMessage, arguments,
errorMessage, externalExecutionId, null, platformName, schemaTarget);
errorMessage, externalExecutionId, null, platformName, ctrTaskStatus, schemaTarget);
}

public long getExecutionId() {
Expand Down Expand Up @@ -209,22 +213,31 @@ public void setPlatformName(String platformName) {
this.platformName = platformName;
}

public String getCtrTaskStatus() {
return ctrTaskStatus;
}

public void setCtrTaskStatus(String ctrTaskStatus) {
this.ctrTaskStatus = ctrTaskStatus;
}

@Override
public String toString() {
return "AggregateTaskExecution{" +
"executionId=" + executionId +
", parentExecutionId=" + parentExecutionId +
", exitCode=" + exitCode +
", taskName='" + taskName + '\'' +
", startTime=" + startTime +
", endTime=" + endTime +
", exitMessage='" + exitMessage + '\'' +
", externalExecutionId='" + externalExecutionId + '\'' +
", errorMessage='" + errorMessage + '\'' +
", schemaTarget='" + schemaTarget + '\'' +
", platformName='" + platformName + '\'' +
", arguments=" + arguments +
'}';
"executionId=" + executionId +
", parentExecutionId=" + parentExecutionId +
", exitCode=" + exitCode +
", taskName='" + taskName + '\'' +
", startTime=" + startTime +
", endTime=" + endTime +
", exitMessage='" + exitMessage + '\'' +
", externalExecutionId='" + externalExecutionId + '\'' +
", errorMessage='" + errorMessage + '\'' +
", schemaTarget='" + schemaTarget + '\'' +
", platformName='" + platformName + '\'' +
", ctrTaskStatus='" + ctrTaskStatus + '\'' +
", arguments=" + arguments +
'}';
}

public TaskExecution toTaskExecution() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,8 +300,8 @@ public TaskExecutionController taskExecutionController(
}

@Bean
public TaskExecutionThinController taskExecutionThinController(AggregateTaskExplorer aggregateTaskExplorer) {
return new TaskExecutionThinController(aggregateTaskExplorer);
public TaskExecutionThinController taskExecutionThinController(AggregateTaskExplorer aggregateTaskExplorer, TaskJobService taskJobService) {
return new TaskExecutionThinController(aggregateTaskExplorer, taskJobService);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import org.springframework.cloud.dataflow.aggregate.task.AggregateTaskExplorer;
import org.springframework.cloud.dataflow.rest.resource.TaskExecutionThinResource;
import org.springframework.cloud.dataflow.schema.AggregateTaskExecution;
import org.springframework.cloud.dataflow.server.service.TaskJobService;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.web.PagedResourcesAssembler;
import org.springframework.hateoas.PagedModel;
Expand All @@ -26,6 +28,8 @@
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;

Expand All @@ -44,15 +48,29 @@ public class TaskExecutionThinController {
private final AggregateTaskExplorer explorer;
private final TaskExecutionThinResourceAssembler resourceAssembler;

public TaskExecutionThinController(AggregateTaskExplorer explorer) {
private final TaskJobService taskJobService;

public TaskExecutionThinController(AggregateTaskExplorer explorer, TaskJobService taskJobService) {
this.explorer = explorer;
this.taskJobService = taskJobService;
this.resourceAssembler = new TaskExecutionThinResourceAssembler();
}

@GetMapping(produces = "application/json")
@ResponseStatus(HttpStatus.OK)
public PagedModel<TaskExecutionThinResource> listTasks(Pageable pageable, PagedResourcesAssembler<AggregateTaskExecution> pagedAssembler) {
return pagedAssembler.toModel(explorer.findAll(pageable, true), resourceAssembler);
Page<AggregateTaskExecution> page = explorer.findAll(pageable, true);
taskJobService.populateComposeTaskRunnerStatus(page.getContent());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have we determined the cost of running the retrieval of the CTR status for the results set?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I compared before and after timings from org.springframework.cloud.dataflow.integration.test.tasks.TaskExecutionQueryIT
I added index in parent_execution_id and now the difference is hardly noticeable.

return pagedAssembler.toModel(page, resourceAssembler);
}

@RequestMapping(value = "", method = RequestMethod.GET, params = "name")
@ResponseStatus(HttpStatus.OK)
public PagedModel<TaskExecutionThinResource> retrieveTasksByName(@RequestParam("name") String taskName,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have a test to verify the population of the status?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

adding

Pageable pageable, PagedResourcesAssembler<AggregateTaskExecution> pagedAssembler) {
Page<AggregateTaskExecution> page = this.explorer.findTaskExecutionsByName(taskName, pageable);
taskJobService.populateComposeTaskRunnerStatus(page.getContent());
return pagedAssembler.toModel(page, resourceAssembler);
}

static class TaskExecutionThinResourceAssembler extends RepresentationModelAssemblerSupport<AggregateTaskExecution, TaskExecutionThinResource> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.dataflow.server.db.migration;

import java.util.Arrays;
import java.util.List;

import org.springframework.cloud.dataflow.common.flyway.AbstractMigration;
import org.springframework.cloud.dataflow.common.flyway.SqlCommand;

/**
* Provide indexes to improve performance of finding child tasks.
* @author Corneil du Plessis
*/
public abstract class AbstractCreateTaskParentIndexMigration extends AbstractMigration {
protected static final String CREATE_TASK_PARENT_INDEX =
"create index TASK_EXECUTION_PARENT_IX on TASK_EXECUTION(PARENT_EXECUTION_ID)";
protected static final String CREATE_BOOT3_TASK_PARENT_INDEX =
"create index BOOT3_TASK_EXECUTION_PARENT_IX on BOOT3_TASK_EXECUTION(PARENT_EXECUTION_ID)";

public AbstractCreateTaskParentIndexMigration() {
super(null);
}

@Override
public List<SqlCommand> getCommands() {
return Arrays.asList(
SqlCommand.from(CREATE_TASK_PARENT_INDEX),
SqlCommand.from(CREATE_BOOT3_TASK_PARENT_INDEX)
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.springframework.cloud.dataflow.server.db.migration.db2;

import org.springframework.cloud.dataflow.server.db.migration.AbstractCreateTaskParentIndexMigration;

public class V11__CreateTaskParentIndex extends AbstractCreateTaskParentIndexMigration {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.springframework.cloud.dataflow.server.db.migration.mariadb;

import org.springframework.cloud.dataflow.server.db.migration.AbstractCreateTaskParentIndexMigration;

public class V12__CreateTaskParentIndex extends AbstractCreateTaskParentIndexMigration {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.springframework.cloud.dataflow.server.db.migration.mysql;

import org.springframework.cloud.dataflow.server.db.migration.AbstractCreateTaskParentIndexMigration;

public class V12__CreateTaskParentIndex extends AbstractCreateTaskParentIndexMigration {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.springframework.cloud.dataflow.server.db.migration.oracle;

import org.springframework.cloud.dataflow.server.db.migration.AbstractCreateTaskParentIndexMigration;

public class V12__CreateTaskParentIndex extends AbstractCreateTaskParentIndexMigration {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.springframework.cloud.dataflow.server.db.migration.postgresql;

import org.springframework.cloud.dataflow.server.db.migration.AbstractCreateTaskParentIndexMigration;

public class V13__CreateTaskParentIndex extends AbstractCreateTaskParentIndexMigration {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.springframework.cloud.dataflow.server.db.migration.sqlserver;

import org.springframework.cloud.dataflow.server.db.migration.AbstractCreateTaskParentIndexMigration;

public class V11__CreateTaskParentIndex extends AbstractCreateTaskParentIndexMigration {

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
package org.springframework.cloud.dataflow.server.repository;


import java.util.Collection;
import java.util.Date;
import java.util.List;

import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobInstance;
Expand All @@ -26,6 +26,7 @@
import org.springframework.batch.core.launch.NoSuchJobInstanceException;
import org.springframework.cloud.dataflow.rest.job.JobInstanceExecutions;
import org.springframework.cloud.dataflow.rest.job.TaskJobExecution;
import org.springframework.cloud.dataflow.schema.AggregateTaskExecution;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;

Expand Down Expand Up @@ -60,4 +61,6 @@ public interface AggregateJobQueryDao {

JobInstance getJobInstance(long id, String schemaTarget) throws NoSuchJobInstanceException;

void populateCtrStatus(Collection<AggregateTaskExecution> aggregateTaskExecutions);

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;

import javax.sql.DataSource;
Expand Down Expand Up @@ -58,6 +60,7 @@
import org.springframework.cloud.dataflow.core.database.support.DatabaseType;
import org.springframework.cloud.dataflow.rest.job.JobInstanceExecutions;
import org.springframework.cloud.dataflow.rest.job.TaskJobExecution;
import org.springframework.cloud.dataflow.schema.AggregateTaskExecution;
import org.springframework.cloud.dataflow.schema.AppBootSchemaVersion;
import org.springframework.cloud.dataflow.schema.SchemaVersionTarget;
import org.springframework.cloud.dataflow.schema.service.SchemaService;
Expand Down Expand Up @@ -171,6 +174,12 @@ public class JdbcAggregateJobQueryDao implements AggregateJobQueryDao {
" LEFT OUTER JOIN AGGREGATE_TASK_BATCH TT ON E.JOB_EXECUTION_ID = TT.JOB_EXECUTION_ID AND E.SCHEMA_TARGET = TT.SCHEMA_TARGET" +
" LEFT OUTER JOIN AGGREGATE_TASK_EXECUTION T ON TT.TASK_EXECUTION_ID = T.TASK_EXECUTION_ID AND TT.SCHEMA_TARGET = T.SCHEMA_TARGET";

private static final String FIND_CTR_STATUS = "SELECT T.TASK_EXECUTION_ID as TASK_EXECUTION_ID, J.EXIT_MESSAGE as CTR_STATUS" +
" from AGGREGATE_TASK_EXECUTION T" +
" JOIN AGGREGATE_TASK_BATCH TB ON TB.TASK_EXECUTION_ID=T.TASK_EXECUTION_ID AND TB.SCHEMA_TARGET=T.SCHEMA_TARGET" +
" JOIN AGGREGATE_JOB_EXECUTION J ON J.JOB_EXECUTION_ID=TB.JOB_EXECUTION_ID AND J.SCHEMA_TARGET=TB.SCHEMA_TARGET" +
" WHERE T.TASK_EXECUTION_ID in (:taskExecutionIds) AND T.SCHEMA_TARGET = ? AND (select count(*) from AGGREGATE_TASK_EXECUTION WHERE PARENT_EXECUTION_ID = T.TASK_EXECUTION_ID) > 0";

private static final String FIND_JOB_BY_NAME_INSTANCE_ID = FIND_JOB_BY +
" where I.JOB_NAME = ? AND I.JOB_INSTANCE_ID = ?";

Expand Down Expand Up @@ -269,6 +278,31 @@ public Page<JobInstanceExecutions> listJobInstances(String jobName, Pageable pag

}

@Override
public void populateCtrStatus(Collection<AggregateTaskExecution> aggregateTaskExecutions) {
Map<String, List<AggregateTaskExecution>> targets = aggregateTaskExecutions.stream().collect(Collectors.groupingBy(aggregateTaskExecution -> aggregateTaskExecution.getSchemaTarget()));
final AtomicInteger updated = new AtomicInteger(0);
for(Map.Entry<String, List<AggregateTaskExecution>> entry : targets.entrySet()) {
String target = entry.getKey();
Map<Long, AggregateTaskExecution> aggregateTaskExecutionMap = entry.getValue().stream()
.collect(Collectors.toMap(AggregateTaskExecution::getExecutionId, Function.identity()));
String ids = aggregateTaskExecutionMap.keySet()
.stream()
.map(Object::toString)
.collect(Collectors.joining(","));
jdbcTemplate.query(FIND_CTR_STATUS.replace(":taskExecutionIds", ids), ps -> ps.setString(1, target), rs -> {
Long id = rs.getLong("TASK_EXECUTION_ID");
String ctrStatus = rs.getString("CTR_STATUS");
LOG.debug("populateCtrStatus:{}={}", id, ctrStatus);
AggregateTaskExecution execution = aggregateTaskExecutionMap.get(id);
Assert.notNull(execution, "Expected AggregateTaskExecution for " + id + " from " + ids);
updated.incrementAndGet();
execution.setCtrTaskStatus(ctrStatus);
});
}
LOG.debug("updated {} ctr statuses", updated.get());
}

@Override
public JobInstanceExecutions getJobInstanceExecution(String jobName, long instanceId) {
LOG.debug("getJobInstanceExecution:{}:{}:{}", jobName, instanceId, FIND_JOB_BY_NAME_INSTANCE_ID);
Expand Down
Loading
Loading