Skip to content

Removing Saga from Dapr Workflows #1216

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Feb 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static io.dapr.it.spring.data.DaprSpringDataConstants.BINDING_NAME;
import static io.dapr.it.spring.data.DaprSpringDataConstants.STATE_STORE_NAME;
import static io.dapr.it.testcontainers.DaprContainerConstants.IMAGE_TAG;
import static org.junit.jupiter.api.Assertions.*;

/**
Expand Down Expand Up @@ -65,7 +65,7 @@ public class DaprKeyValueRepositoryIT {

@Container
@ServiceConnection
private static final DaprContainer DAPR_CONTAINER = new DaprContainer("daprio/daprd:1.13.2")
private static final DaprContainer DAPR_CONTAINER = new DaprContainer(IMAGE_TAG)
.withAppName("postgresql-repository-dapr-app")
.withNetwork(DAPR_NETWORK)
.withComponent(new Component(STATE_STORE_NAME, "state.postgresql", "v1", STATE_STORE_PROPERTIES))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
import org.springframework.data.keyvalue.core.query.KeyValueQuery;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import org.testcontainers.containers.MySQLContainer;
import org.testcontainers.containers.Network;
Expand All @@ -39,14 +37,14 @@
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static io.dapr.it.spring.data.DaprSpringDataConstants.STATE_STORE_NAME;
import static io.dapr.it.spring.data.DaprSpringDataConstants.BINDING_NAME;
import static io.dapr.it.testcontainers.DaprContainerConstants.IMAGE_TAG;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;

Expand Down Expand Up @@ -82,7 +80,7 @@ public class MySQLDaprKeyValueTemplateIT {

@Container
@ServiceConnection
private static final DaprContainer DAPR_CONTAINER = new DaprContainer("daprio/daprd:1.13.2")
private static final DaprContainer DAPR_CONTAINER = new DaprContainer(IMAGE_TAG)
.withAppName("mysql-dapr-app")
.withNetwork(DAPR_NETWORK)
.withComponent(new Component(STATE_STORE_NAME, "state.mysql", "v1", STATE_STORE_PROPERTIES))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
import org.springframework.data.keyvalue.core.query.KeyValueQuery;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.PostgreSQLContainer;
Expand All @@ -38,6 +36,7 @@

import static io.dapr.it.spring.data.DaprSpringDataConstants.BINDING_NAME;
import static io.dapr.it.spring.data.DaprSpringDataConstants.STATE_STORE_NAME;
import static io.dapr.it.testcontainers.DaprContainerConstants.IMAGE_TAG;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;

Expand Down Expand Up @@ -68,7 +67,7 @@ public class PostgreSQLDaprKeyValueTemplateIT {

@Container
@ServiceConnection
private static final DaprContainer DAPR_CONTAINER = new DaprContainer("daprio/daprd:1.13.2")
private static final DaprContainer DAPR_CONTAINER = new DaprContainer(IMAGE_TAG)
.withAppName("postgresql-dapr-app")
.withNetwork(DAPR_NETWORK)
.withComponent(new Component(STATE_STORE_NAME, "state.postgresql", "v1", STATE_STORE_PROPERTIES))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.Collections;
import java.util.List;

import static io.dapr.it.testcontainers.DaprContainerConstants.IMAGE_TAG;
import static org.assertj.core.api.Assertions.assertThat;

@SpringBootTest(
Expand All @@ -60,7 +61,7 @@ public class DaprSpringMessagingIT {

@Container
@ServiceConnection
private static final DaprContainer DAPR_CONTAINER = new DaprContainer("daprio/daprd:1.13.2")
private static final DaprContainer DAPR_CONTAINER = new DaprContainer(IMAGE_TAG)
.withAppName("messaging-dapr-app")
.withNetwork(DAPR_NETWORK)
.withComponent(new Component("pubsub", "pubsub.in-memory", "v1", Collections.emptyMap()))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package io.dapr.it.testcontainers;

public interface DaprContainerConstants {
String IMAGE_TAG = "daprio/daprd:1.14.1";
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
import static com.github.tomakehurst.wiremock.client.WireMock.urlMatching;
import static com.github.tomakehurst.wiremock.client.WireMock.verify;
import static io.dapr.it.testcontainers.DaprContainerConstants.IMAGE_TAG;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand All @@ -61,7 +62,7 @@ public class DaprContainerIT {
private static final String PUBSUB_TOPIC_NAME = "topic";

@Container
private static final DaprContainer DAPR_CONTAINER = new DaprContainer("daprio/daprd")
private static final DaprContainer DAPR_CONTAINER = new DaprContainer(IMAGE_TAG)
.withAppName("dapr-app")
.withAppPort(8081)
.withAppChannelAddress("host.testcontainers.internal");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.Collections;
import java.util.Map;

import static io.dapr.it.testcontainers.DaprContainerConstants.IMAGE_TAG;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;

Expand All @@ -56,7 +57,7 @@ public class DaprWorkflowsIT {
private static final Network DAPR_NETWORK = Network.newNetwork();

@Container
private static final DaprContainer DAPR_CONTAINER = new DaprContainer("daprio/daprd:1.13.2")
private static final DaprContainer DAPR_CONTAINER = new DaprContainer(IMAGE_TAG)
.withAppName("workflow-dapr-app")
.withNetwork(DAPR_NETWORK)
.withComponent(new Component("kvstore", "state.in-memory", "v1",
Expand Down
44 changes: 1 addition & 43 deletions sdk-workflows/src/main/java/io/dapr/workflows/Workflow.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,6 @@

package io.dapr.workflows;

import com.microsoft.durabletask.interruption.ContinueAsNewInterruption;
import com.microsoft.durabletask.interruption.OrchestratorBlockedException;
import io.dapr.workflows.saga.SagaCompensationException;
import io.dapr.workflows.saga.SagaOptions;

/**
* Common interface for workflow implementations.
*/
Expand All @@ -39,43 +34,6 @@ public interface Workflow {
default void run(WorkflowContext ctx) {
WorkflowStub stub = this.create();

if (!this.isSagaEnabled()) {
// saga disabled
stub.run(ctx);
} else {
// saga enabled
try {
stub.run(ctx);
} catch (OrchestratorBlockedException | ContinueAsNewInterruption e) {
throw e;
} catch (SagaCompensationException e) {
// Saga compensation is triggered gracefully but failed in exception
// don't need to trigger compensation again
throw e;
} catch (Exception e) {
try {
ctx.getSagaContext().compensate();
} catch (Exception se) {
se.addSuppressed(e);
throw se;
}

throw e;
}
}
}

default boolean isSagaEnabled() {
return this.getSagaOption() != null;
}

/**
* get saga configuration.
*
* @return saga configuration
*/
default SagaOptions getSagaOption() {
// by default, saga is disabled
return null;
stub.run(ctx);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import com.microsoft.durabletask.Task;
import com.microsoft.durabletask.TaskCanceledException;
import com.microsoft.durabletask.TaskFailedException;
import io.dapr.workflows.saga.SagaContext;
import org.slf4j.Logger;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -530,12 +529,4 @@ default void continueAsNew(Object input) {
default UUID newUuid() {
throw new RuntimeException("No implementation found.");
}

/**
* get saga context.
*
* @return saga context
* @throws UnsupportedOperationException if saga is not enabled.
*/
SagaContext getSagaContext();
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@
import io.dapr.workflows.WorkflowContext;
import io.dapr.workflows.WorkflowTaskOptions;
import io.dapr.workflows.WorkflowTaskRetryPolicy;
import io.dapr.workflows.runtime.saga.DefaultSagaContext;
import io.dapr.workflows.saga.Saga;
import io.dapr.workflows.saga.SagaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.helpers.NOPLogger;
Expand All @@ -39,7 +36,6 @@
public class DefaultWorkflowContext implements WorkflowContext {
private final TaskOrchestrationContext innerContext;
private final Logger logger;
private final Saga saga;

/**
* Constructor for DaprWorkflowContextImpl.
Expand All @@ -58,23 +54,7 @@ public DefaultWorkflowContext(TaskOrchestrationContext context) throws IllegalAr
* @param logger Logger
* @throws IllegalArgumentException if context or logger is null
*/
public DefaultWorkflowContext(TaskOrchestrationContext context, Logger logger) throws IllegalArgumentException {
this(context, logger, null);
}

public DefaultWorkflowContext(TaskOrchestrationContext context, Saga saga) throws IllegalArgumentException {
this(context, LoggerFactory.getLogger(WorkflowContext.class), saga);
}

/**
* Constructor for DaprWorkflowContextImpl.
*
* @param context TaskOrchestrationContext
* @param logger Logger
* @param saga saga object, if null, saga is disabled
* @throws IllegalArgumentException if context or logger is null
*/
public DefaultWorkflowContext(TaskOrchestrationContext context, Logger logger, Saga saga)
public DefaultWorkflowContext(TaskOrchestrationContext context, Logger logger)
throws IllegalArgumentException {
if (context == null) {
throw new IllegalArgumentException("Context cannot be null");
Expand All @@ -85,7 +65,6 @@ public DefaultWorkflowContext(TaskOrchestrationContext context, Logger logger, S

this.innerContext = context;
this.logger = logger;
this.saga = saga;
}

/**
Expand Down Expand Up @@ -249,15 +228,6 @@ public UUID newUuid() {
return this.innerContext.newUUID();
}

@Override
public SagaContext getSagaContext() {
if (this.saga == null) {
throw new UnsupportedOperationException("Saga is not enabled");
}

return new DefaultSagaContext(this.saga, this);
}

private static TaskOptions toTaskOptions(WorkflowTaskOptions options) {
if (options == null) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import com.microsoft.durabletask.TaskOrchestration;
import com.microsoft.durabletask.TaskOrchestrationFactory;
import io.dapr.workflows.Workflow;
import io.dapr.workflows.saga.Saga;

import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
Expand All @@ -30,6 +29,7 @@ class WorkflowClassWrapper<T extends Workflow> implements TaskOrchestrationFacto

public WorkflowClassWrapper(Class<T> clazz) {
this.name = clazz.getCanonicalName();

try {
this.workflowConstructor = clazz.getDeclaredConstructor();
} catch (NoSuchMethodException e) {
Expand All @@ -48,6 +48,7 @@ public String getName() {
public TaskOrchestration create() {
return ctx -> {
T workflow;

try {
workflow = this.workflowConstructor.newInstance();
} catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
Expand All @@ -56,13 +57,7 @@ public TaskOrchestration create() {
);
}

if (workflow.getSagaOption() != null) {
Saga saga = new Saga(workflow.getSagaOption());
workflow.run(new DefaultWorkflowContext(ctx, saga));
} else {
workflow.run(new DefaultWorkflowContext(ctx));
}
workflow.run(new DefaultWorkflowContext(ctx));
};

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import com.microsoft.durabletask.TaskOrchestration;
import com.microsoft.durabletask.TaskOrchestrationFactory;
import io.dapr.workflows.Workflow;
import io.dapr.workflows.saga.Saga;

/**
* Wrapper for Durable Task Framework orchestration factory.
Expand All @@ -37,13 +36,6 @@ public String getName() {

@Override
public TaskOrchestration create() {
return ctx -> {
if (workflow.getSagaOption() != null) {
Saga saga = new Saga(workflow.getSagaOption());
workflow.run(new DefaultWorkflowContext(ctx, saga));
} else {
workflow.run(new DefaultWorkflowContext(ctx));
}
};
return ctx -> workflow.run(new DefaultWorkflowContext(ctx));
}
}
Loading