refactor: move executor to AbstractWorkflowExecutor, add submit method #1957

Merged (1 commit) on Jun 21, 2023

AbstractWorkflowExecutor.java
@@ -5,6 +5,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

@@ -30,12 +31,14 @@ public abstract class AbstractWorkflowExecutor<P extends HasMetadata> {
private final Map<DependentResourceNode, Future<?>> actualExecutions = new ConcurrentHashMap<>();
private final Map<DependentResourceNode, Exception> exceptionsDuringExecution =
new ConcurrentHashMap<>();
private final ExecutorService executorService;

public AbstractWorkflowExecutor(Workflow<P> workflow, P primary, Context<P> context) {
this.workflow = workflow;
this.primary = primary;
this.context = context;
this.primaryID = ResourceID.fromResource(primary);
executorService = context.getWorkflowExecutorService();
}

protected abstract Logger logger();
@@ -107,10 +110,16 @@ protected synchronized void handleNodeExecutionFinish(
}
}

@SuppressWarnings("unchecked")
protected <R> boolean isConditionMet(Optional<Condition<R, P>> condition,
DependentResource<R, P> dependentResource) {
return condition.map(c -> c.isMet(dependentResource, primary, context))
.orElse(true);
return condition.map(c -> c.isMet(dependentResource, primary, context)).orElse(true);
}

protected <R> void submit(DependentResourceNode<R, P> dependentResourceNode,
NodeExecutor<R, P> nodeExecutor, String operation) {
final Future<?> future = executorService.submit(nodeExecutor);
markAsExecuting(dependentResourceNode, future);
logger().debug("Submitted to {}: {} primaryID: {}", operation, dependentResourceNode,
primaryID);
}
}
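
The hunks above pull the ExecutorService handling out of the concrete executors and into AbstractWorkflowExecutor: the constructor now fetches context.getWorkflowExecutorService() once, and the new protected submit helper submits the node executor, records the returned Future via markAsExecuting, and logs the operation label. The following is a minimal, self-contained sketch of that submit-and-track pattern with hypothetical names (BaseExecutor, CleanupLikeExecutor, plain String nodes); it only illustrates the shape of the refactor, not the project's actual classes.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical base class: owns the ExecutorService and tracks the Future of every
// submitted task, mirroring what AbstractWorkflowExecutor now does for its subclasses.
abstract class BaseExecutor<N> {
  private final ExecutorService executorService;
  private final Map<N, Future<?>> actualExecutions = new ConcurrentHashMap<>();

  protected BaseExecutor(ExecutorService executorService) {
    this.executorService = executorService;
  }

  // Counterpart of the new submit(...) helper: submit, remember the Future, log the operation label.
  protected void submit(N node, Runnable task, String operation) {
    Future<?> future = executorService.submit(task);
    actualExecutions.put(node, future);
    System.out.printf("Submitted to %s: %s%n", operation, node);
  }

  protected void waitForAll() throws InterruptedException, ExecutionException {
    for (Future<?> future : actualExecutions.values()) {
      future.get();
    }
  }
}

// A concrete executor now only decides what to run; submission and bookkeeping are inherited.
class CleanupLikeExecutor extends BaseExecutor<String> {
  private static final String CLEANUP = "cleanup";

  CleanupLikeExecutor(ExecutorService executorService) {
    super(executorService);
  }

  void cleanup(String node) {
    submit(node, () -> System.out.println("cleaning " + node), CLEANUP);
  }
}

public class SubmitPatternDemo {
  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    CleanupLikeExecutor executor = new CleanupLikeExecutor(pool);
    executor.cleanup("node-a");
    executor.cleanup("node-b");
    executor.waitForAll();
    pool.shutdown();
  }
}

In the PR itself the pool comes from context.getWorkflowExecutorService() and the Future bookkeeping goes through markAsExecuting, but the division of responsibilities is the same: the subclasses choose the task and the label, the base class handles submission.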

WorkflowCleanupExecutor.java
@@ -3,8 +3,6 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

import org.slf4j.Logger;
@@ -19,15 +17,14 @@
public class WorkflowCleanupExecutor<P extends HasMetadata> extends AbstractWorkflowExecutor<P> {

private static final Logger log = LoggerFactory.getLogger(WorkflowCleanupExecutor.class);
private static final String CLEANUP = "cleanup";

private final Set<DependentResourceNode> postDeleteConditionNotMet =
ConcurrentHashMap.newKeySet();
private final Set<DependentResourceNode> deleteCalled = ConcurrentHashMap.newKeySet();
private final ExecutorService executorService;

public WorkflowCleanupExecutor(Workflow<P> workflow, P primary, Context<P> context) {
super(workflow, primary, context);
this.executorService = context.getWorkflowExecutorService();
}

public synchronized WorkflowCleanupResult cleanup() {
@@ -55,9 +52,7 @@ private synchronized void handleCleanup(DependentResourceNode dependentResourceN
return;
}

Future<?> nodeFuture = executorService.submit(new CleanupExecutor<>(dependentResourceNode));
markAsExecuting(dependentResourceNode, nodeFuture);
log.debug("Submitted for cleanup: {}", dependentResourceNode);
submit(dependentResourceNode, new CleanupExecutor<>(dependentResourceNode), CLEANUP);
}


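With the helper in place, WorkflowCleanupExecutor drops its own ExecutorService field and Future handling, and handleCleanup shrinks to a single submit(...) call tagged with the CLEANUP constant. That label is what keeps the shared debug message meaningful for all three operations, roughly as in this small SLF4J sketch (illustrative names, assumes slf4j-api plus a binding on the classpath):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OperationLogDemo {
  private static final Logger log = LoggerFactory.getLogger(OperationLogDemo.class);

  // One parameterized message replaces the per-class "Submitted for cleanup / to reconcile / to delete" logs.
  static void logSubmission(String operation, String node, String primaryID) {
    log.debug("Submitted to {}: {} primaryID: {}", operation, node, primaryID);
  }

  public static void main(String[] args) {
    logSubmission("cleanup", "configMapDependent", "default/example");
    logSubmission("reconcile", "deploymentDependent", "default/example");
    logSubmission("delete", "serviceDependent", "default/example");
  }
}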

WorkflowReconcileExecutor.java
@@ -4,8 +4,6 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

import org.slf4j.Logger;
@@ -21,6 +19,8 @@
public class WorkflowReconcileExecutor<P extends HasMetadata> extends AbstractWorkflowExecutor<P> {

private static final Logger log = LoggerFactory.getLogger(WorkflowReconcileExecutor.class);
private static final String RECONCILE = "reconcile";
private static final String DELETE = "delete";


private final Set<DependentResourceNode> notReady = ConcurrentHashMap.newKeySet();
@@ -32,11 +32,9 @@ public class WorkflowReconcileExecutor<P extends HasMetadata> extends AbstractWo
private final Set<DependentResourceNode> reconciled = ConcurrentHashMap.newKeySet();
private final Map<DependentResource, ReconcileResult> reconcileResults =
new ConcurrentHashMap<>();
private final ExecutorService executorService;

public WorkflowReconcileExecutor(Workflow<P> workflow, P primary, Context<P> context) {
super(workflow, primary, context);
this.executorService = context.getWorkflowExecutorService();
}

public synchronized WorkflowReconcileResult reconcile() {
@@ -69,9 +67,7 @@ private synchronized <R> void handleReconcile(DependentResourceNode<R, P> depend
if (!reconcileConditionMet) {
handleReconcileConditionNotMet(dependentResourceNode);
} else {
var nodeFuture = executorService.submit(new NodeReconcileExecutor(dependentResourceNode));
markAsExecuting(dependentResourceNode, nodeFuture);
log.debug("Submitted to reconcile: {} primaryID: {}", dependentResourceNode, primaryID);
submit(dependentResourceNode, new NodeReconcileExecutor<>(dependentResourceNode), RECONCILE);
}
}

@@ -87,10 +83,7 @@ private synchronized void handleDelete(DependentResourceNode dependentResourceNo
return;
}

Future<?> nodeFuture = executorService
.submit(new NodeDeleteExecutor(dependentResourceNode));
markAsExecuting(dependentResourceNode, nodeFuture);
log.debug("Submitted to delete: {}", dependentResourceNode);
submit(dependentResourceNode, new NodeDeleteExecutor<>(dependentResourceNode), DELETE);
}

private boolean allDependentsDeletedAlready(DependentResourceNode<?, P> dependentResourceNode) {
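
WorkflowReconcileExecutor gets the same treatment at both of its submission points, using the RECONCILE and DELETE labels. Note that ExecutorService.submit does not block: it hands the task to the pool and immediately returns a Future, which is why the executors record it via markAsExecuting and node completion is handled elsewhere (see handleNodeExecutionFinish in the base class above). A tiny JDK-only sketch of that submit-now, complete-later behavior, with illustrative names:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class AsyncSubmitDemo {
  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newSingleThreadExecutor();

    // submit(...) returns immediately; the work runs on the pool thread.
    Future<?> future = pool.submit(() -> {
      try {
        TimeUnit.MILLISECONDS.sleep(100); // stand-in for reconciling one dependent resource
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
      System.out.println("node reconciled");
    });

    System.out.println("submitted; done yet? " + future.isDone());
    future.get(); // block until the task has finished
    System.out.println("done: " + future.isDone());

    pool.shutdown();
  }
}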