Skip to content

feat: Adds Stream Processor update support #3180

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Mar 25, 2025
3 changes: 3 additions & 0 deletions .changelog/3180.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:enhancement
resource/mongodbatlas_stream_processor: Adds update support
```
4 changes: 2 additions & 2 deletions docs/data-sources/stream_processor.md
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,9 @@ output "stream_processors_results" {
- `id` (String) Unique 24-hexadecimal character string that identifies the stream processor.
- `options` (Attributes) Optional configuration for the stream processor. (see [below for nested schema](#nestedatt--options))
- `pipeline` (String) Stream aggregation pipeline you want to apply to your streaming data. [MongoDB Atlas Docs](https://www.mongodb.com/docs/atlas/atlas-stream-processing/stream-aggregation/#std-label-stream-aggregation) contain more information. Using [jsonencode](https://developer.hashicorp.com/terraform/language/functions/jsonencode) is recommended when setting this attribute. For more details see the [Aggregation Pipelines Documentation](https://www.mongodb.com/docs/atlas/atlas-stream-processing/stream-aggregation/)
- `state` (String) The state of the stream processor. Commonly occurring states are 'CREATED', 'STARTED', 'STOPPED' and 'FAILED'. Used to start or stop the Stream Processor. Valid values are `CREATED`, `STARTED` or `STOPPED`. When a Stream Processor is created without specifying the state, it will default to `CREATED` state.
- `state` (String) The state of the stream processor. Commonly occurring states are 'CREATED', 'STARTED', 'STOPPED' and 'FAILED'. Used to start or stop the Stream Processor. Valid values are `CREATED`, `STARTED` or `STOPPED`. When a Stream Processor is created without specifying the state, it will default to `CREATED` state. When a Stream Processor is updated without specifying the state, it will default to the Previous state.

**NOTE** When creating a stream processor, setting the state to STARTED can automatically start the stream processor.
**NOTE** When a Stream Processor is updated without specifying the state, it is stopped and then restored to previous state upon update completion.
- `stats` (String) The stats associated with the stream processor. Refer to the [MongoDB Atlas Docs](https://www.mongodb.com/docs/atlas/atlas-stream-processing/manage-stream-processor/#view-statistics-of-a-stream-processor) for more information.

<a id="nestedatt--options"></a>
Expand Down
4 changes: 2 additions & 2 deletions docs/data-sources/stream_processors.md
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,9 @@ Read-Only:
- `project_id` (String) Unique 24-hexadecimal digit string that identifies your project. Use the [/groups](#tag/Projects/operation/listProjects) endpoint to retrieve all projects to which the authenticated user has access.

**NOTE**: Groups and projects are synonymous terms. Your group id is the same as your project id. For existing groups, your group/project id remains the same. The resource and corresponding endpoints use the term groups.
- `state` (String) The state of the stream processor. Commonly occurring states are 'CREATED', 'STARTED', 'STOPPED' and 'FAILED'. Used to start or stop the Stream Processor. Valid values are `CREATED`, `STARTED` or `STOPPED`. When a Stream Processor is created without specifying the state, it will default to `CREATED` state.
- `state` (String) The state of the stream processor. Commonly occurring states are 'CREATED', 'STARTED', 'STOPPED' and 'FAILED'. Used to start or stop the Stream Processor. Valid values are `CREATED`, `STARTED` or `STOPPED`. When a Stream Processor is created without specifying the state, it will default to `CREATED` state. When a Stream Processor is updated without specifying the state, it will default to the Previous state.

**NOTE** When creating a stream processor, setting the state to STARTED can automatically start the stream processor.
**NOTE** When a Stream Processor is updated without specifying the state, it is stopped and then restored to previous state upon update completion.
- `stats` (String) The stats associated with the stream processor. Refer to the [MongoDB Atlas Docs](https://www.mongodb.com/docs/atlas/atlas-stream-processing/manage-stream-processor/#view-statistics-of-a-stream-processor) for more information.

<a id="nestedatt--results--options"></a>
Expand Down
20 changes: 6 additions & 14 deletions docs/resources/stream_processor.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,10 @@

`mongodbatlas_stream_processor` provides a Stream Processor resource. The resource lets you create, delete, import, start and stop a stream processor in a stream instance.

**NOTE**: Updating an Atlas Stream Processor is currently not supported. As a result, the following steps are needed to be able to change an Atlas Stream Processor with an Atlas Change Stream Source:
1. Retrieve the value of Change Stream Source Token `changeStreamState` from the computed `stats` attribute in `mongodbatlas_stream_processor` resource or datasource or from the Terraform state file. This takes the form of a [resume token](https://www.mongodb.com/docs/manual/changeStreams/#resume-tokens-from-change-events). The Stream Processor has to be running in the state `STARTED` for the `stats` attribute to be available. However, before you retrieve the value, you should set the `state` to `STOPPED` to get the latest `changeStreamState`.
- Example:
```
{\"changeStreamState\":{\"_data\":\"8266C71670000000012B0429296E1404\"}
```
2. Update the `pipeline` argument setting `config.StartAfter` with the value retrieved in the previous step. More details in the [MongoDB Collection Change Stream](https://www.mongodb.com/docs/atlas/atlas-stream-processing/sp-agg-source/#mongodb-collection-change-stream) documentation.
- Example:
```
pipeline = jsonencode([{ "$source" = { "connectionName" = resource.mongodbatlas_stream_connection.example-cluster.connection_name, "config" = { "startAfter" = { "_data" : "8266C71562000000012B0429296E1404" } } } }, { "$emit" = { "connectionName" : "KafkaConnectionDest", "topic": "kafka-topic" } }])
```
3. Delete the existing Atlas Stream Processor and then create a new Atlas Stream Processor with updated pipeline parameter and the updated values.
**NOTE**: When updating an Atlas Stream Processor, the following behavior applies:
1. If the processor is in a `STARTED` state, it will automatically be stopped before the update is applied
2. The update will be performed while the processor is in `STOPPED` state
3. If the processor was originally in `STARTED` state, it will be restarted after the update

## Example Usages

Expand Down Expand Up @@ -141,9 +133,9 @@ output "stream_processors_results" {
### Optional

- `options` (Attributes) Optional configuration for the stream processor. (see [below for nested schema](#nestedatt--options))
- `state` (String) The state of the stream processor. Commonly occurring states are 'CREATED', 'STARTED', 'STOPPED' and 'FAILED'. Used to start or stop the Stream Processor. Valid values are `CREATED`, `STARTED` or `STOPPED`. When a Stream Processor is created without specifying the state, it will default to `CREATED` state.
- `state` (String) The state of the stream processor. Commonly occurring states are 'CREATED', 'STARTED', 'STOPPED' and 'FAILED'. Used to start or stop the Stream Processor. Valid values are `CREATED`, `STARTED` or `STOPPED`. When a Stream Processor is created without specifying the state, it will default to `CREATED` state. When a Stream Processor is updated without specifying the state, it will default to the Previous state.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this detail might not be necessary. The expectation in Terraform is that a non modified attribute will not be changed. Maybe we should mention in a more general level the fact that the stream will be stopped. Maybe something similar to:

**NOTE**: When updating an Atlas Stream Processor, the following behavior applies:
1. If the processor is in a `STARTED` state, it will automatically be stopped before the update is applied
2. The update will be performed while the processor is in `STOPPED` state
3. If the processor was originally in `STARTED` state, it will be restarted after the update

docs team can confirm when they review

Copy link
Collaborator Author

@jwongmongodb jwongmongodb Mar 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about something like this?

***NOTE***: The following behavior applies to an Atlas Stream Processor that is in a STARTED state:
1. The processor will automatically be stopped before the update is applied.
2. The update will be performed while the processor is in a `STOPPED` state.
3. The processor will be restarted after the update if the config state field is empty or set to `STARTED`.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this behavior is specific to TF right? Is this something that can be captured in the API spec instead so we can simply add a link here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The TF behavior may calling multiple endpoints at once (STOP, GET and MODIFY endpoint). Due to the nature of the stream processor, it can only be modified in the CREATED or STOPPED state. I don't think we can have one specific link to a particular endpoint like MODIFY. Perhaps we can add a requirement that the stream processor must be STOPPED before we can modify it.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, the client needs to stop, modify, then start again a processor that is running. The update/modify API does not do it. So in TF we are removing this complexity to have a better user experience

Copy link
Collaborator

@maastha maastha Mar 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

got it, in that case, I like @oarbusi 's suggestion. Although I'd say we should add it as a separate note to call out what happens during an update instead of adding this note to state.
For the state description I think the current one makes sense, maybe rephrase a bit like "When a Stream Processor is updated without specifying the state, it is stopped & then restored to previous state upon update completion."

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is now changed with this commit. Can you also take another look @davidhou17 to see if these documentation changes look good?


**NOTE** When creating a stream processor, setting the state to STARTED can automatically start the stream processor.
**NOTE** When a Stream Processor is updated without specifying the state, it is stopped and then restored to previous state upon update completion.

### Read-Only

Expand Down
37 changes: 37 additions & 0 deletions internal/service/streamprocessor/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,43 @@ func NewStreamProcessorReq(ctx context.Context, plan *TFStreamProcessorRSModel)
return streamProcessor, nil
}

func NewStreamProcessorUpdateReq(ctx context.Context, plan *TFStreamProcessorRSModel) (*admin.ModifyStreamProcessorApiParams, diag.Diagnostics) {
pipeline, diags := convertPipelineToSdk(plan.Pipeline.ValueString())
if diags != nil {
return nil, diags
}

streamProcessorAPIParams := &admin.ModifyStreamProcessorApiParams{
GroupId: plan.ProjectID.ValueString(),
TenantName: plan.InstanceName.ValueString(),
ProcessorName: plan.ProcessorName.ValueString(),
StreamsModifyStreamProcessor: &admin.StreamsModifyStreamProcessor{
Name: plan.ProcessorName.ValueStringPointer(),
Pipeline: &pipeline,
},
}

if !plan.Options.IsNull() && !plan.Options.IsUnknown() {
optionsModel := &TFOptionsModel{}
if diags := plan.Options.As(ctx, optionsModel, basetypes.ObjectAsOptions{}); diags.HasError() {
return nil, diags
}
dlqModel := &TFDlqModel{}
if diags := optionsModel.Dlq.As(ctx, dlqModel, basetypes.ObjectAsOptions{}); diags.HasError() {
return nil, diags
}
streamProcessorAPIParams.StreamsModifyStreamProcessor.Options = &admin.StreamsModifyStreamProcessorOptions{
Dlq: &admin.StreamsDLQ{
Coll: dlqModel.Coll.ValueStringPointer(),
ConnectionName: dlqModel.ConnectionName.ValueStringPointer(),
Db: dlqModel.DB.ValueStringPointer(),
},
}
}

return streamProcessorAPIParams, nil
}

func NewStreamProcessorWithStats(ctx context.Context, projectID, instanceName string, apiResp *admin.StreamsProcessorWithStats) (*TFStreamProcessorRSModel, diag.Diagnostics) {
if apiResp == nil {
return nil, diag.Diagnostics{diag.NewErrorDiagnostic("streamProcessor API response is nil", "")}
Expand Down
103 changes: 57 additions & 46 deletions internal/service/streamprocessor/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package streamprocessor
import (
"context"
"errors"
"fmt"
"regexp"

"go.mongodb.org/atlas-sdk/v20250219001/admin"
Expand Down Expand Up @@ -159,63 +158,84 @@ func (r *streamProcessorRS) Update(ctx context.Context, req resource.UpdateReque
return
}

connV2 := r.Client.AtlasV2
pendingStates := []string{CreatedState}
desiredState := []string{}
plannedState := plan.State.ValueString()
if plannedState == "" {
plannedState = state.State.ValueString()
}

projectID := plan.ProjectID.ValueString()
instanceName := plan.InstanceName.ValueString()
processorName := plan.ProcessorName.ValueString()
currentState := state.State.ValueString()
if !updatedStateOnly(&plan, &state) {
resp.Diagnostics.AddError("updating a Stream Processor is not supported", "")
connV2 := r.Client.AtlasV2
var streamProcessorResp *admin.StreamsProcessorWithStats

// requestParams are needed for the state transition via the GET API
requestParams := &admin.GetStreamProcessorApiParams{
GroupId: projectID,
TenantName: instanceName,
ProcessorName: processorName,
}

if errMsg, isValidStateTransition := ValidateUpdateStateTransition(currentState, plannedState); !isValidStateTransition {
resp.Diagnostics.AddError(errMsg, "")
return
}
switch plan.State.ValueString() {
case StartedState:
desiredState = append(desiredState, StartedState)
pendingStates = append(pendingStates, StoppedState)
_, _, err := connV2.StreamsApi.StartStreamProcessorWithParams(ctx,
&admin.StartStreamProcessorApiParams{
GroupId: projectID,
TenantName: instanceName,
ProcessorName: processorName,

// we must stop the current stream processor if the current state is started
if currentState == StartedState {
_, _, err := connV2.StreamsApi.StopStreamProcessorWithParams(ctx,
&admin.StopStreamProcessorApiParams{
GroupId: plan.ProjectID.ValueString(),
TenantName: plan.InstanceName.ValueString(),
ProcessorName: plan.ProcessorName.ValueString(),
},
).Execute()
if err != nil {
resp.Diagnostics.AddError("Error starting stream processor", err.Error())
resp.Diagnostics.AddError("Error stopping stream processor", err.Error())
return
}
case StoppedState:
if currentState != StartedState {
resp.Diagnostics.AddError(fmt.Sprintf("Stream Processor must be in %s state to transition to %s state", StartedState, StoppedState), "")

// wait for transition from started to stopped
_, err = WaitStateTransition(ctx, requestParams, r.Client.AtlasV2.StreamsApi, []string{StartedState}, []string{StoppedState})
if err != nil {
resp.Diagnostics.AddError("Error changing state of stream processor", err.Error())
return
}
desiredState = append(desiredState, StoppedState)
pendingStates = append(pendingStates, StartedState)
_, _, err := connV2.StreamsApi.StopStreamProcessorWithParams(ctx,
&admin.StopStreamProcessorApiParams{
}

// modify the stream processor
modifyAPIRequestParams, diags := NewStreamProcessorUpdateReq(ctx, &plan)
if diags.HasError() {
resp.Diagnostics.Append(diags...)
return
}
streamProcessorResp, _, err := r.Client.AtlasV2.StreamsApi.ModifyStreamProcessorWithParams(ctx, modifyAPIRequestParams).Execute()
if err != nil {
resp.Diagnostics.AddError("Error modifying stream processor", err.Error())
return
}

// start the stream processor if the desired state is started
if plannedState == StartedState {
_, _, err := r.Client.AtlasV2.StreamsApi.StartStreamProcessorWithParams(ctx,
&admin.StartStreamProcessorApiParams{
GroupId: projectID,
TenantName: instanceName,
ProcessorName: processorName,
},
).Execute()
if err != nil {
resp.Diagnostics.AddError("Error stopping stream processor", err.Error())
resp.Diagnostics.AddError("Error starting stream processor", err.Error())
return
}
default:
resp.Diagnostics.AddError("transitions to states other than STARTED or STOPPED are not supported", "")
return
}

requestParams := &admin.GetStreamProcessorApiParams{
GroupId: projectID,
TenantName: instanceName,
ProcessorName: processorName,
}

streamProcessorResp, err := WaitStateTransition(ctx, requestParams, connV2.StreamsApi, pendingStates, desiredState)
if err != nil {
resp.Diagnostics.AddError("Error changing state of stream processor", err.Error())
// wait for transition from stopped to started
streamProcessorResp, err = WaitStateTransition(ctx, requestParams, r.Client.AtlasV2.StreamsApi, []string{StoppedState}, []string{StartedState})
if err != nil {
resp.Diagnostics.AddError("Error changing state of stream processor", err.Error())
return
}
}

newStreamProcessorModel, diags := NewStreamProcessorWithStats(ctx, projectID, instanceName, streamProcessorResp)
Expand Down Expand Up @@ -267,12 +287,3 @@ func splitImportID(id string) (projectID, instanceName, processorName *string, e

return
}

func updatedStateOnly(plan, state *TFStreamProcessorRSModel) bool {
return plan.ProjectID.Equal(state.ProjectID) &&
plan.InstanceName.Equal(state.InstanceName) &&
plan.ProcessorName.Equal(state.ProcessorName) &&
plan.Pipeline.Equal(state.Pipeline) &&
(plan.Options.Equal(state.Options) || plan.Options.IsUnknown()) &&
!plan.State.Equal(state.State)
}
2 changes: 1 addition & 1 deletion internal/service/streamprocessor/resource_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func ResourceSchema(ctx context.Context) schema.Schema {
Optional: true,
Computed: true,
MarkdownDescription: "The state of the stream processor. Commonly occurring states are 'CREATED', 'STARTED', 'STOPPED' and 'FAILED'. Used to start or stop the Stream Processor. Valid values are `CREATED`, `STARTED` or `STOPPED`." +
" When a Stream Processor is created without specifying the state, it will default to `CREATED` state.\n\n**NOTE** When creating a stream processor, setting the state to STARTED can automatically start the stream processor.",
" When a Stream Processor is created without specifying the state, it will default to `CREATED` state. When a Stream Processor is updated without specifying the state, it will default to the Previous state. \n\n**NOTE** When a Stream Processor is updated without specifying the state, it is stopped and then restored to previous state upon update completion.",
},
"options": schema.SingleNestedAttribute{
Optional: true,
Expand Down
Loading
Loading