Skip to content

Latest commit

 

History

History
167 lines (135 loc) · 8.02 KB

stream_processors.md

File metadata and controls

167 lines (135 loc) · 8.02 KB

Data Source: mongodbatlas_stream_processors

mongodbatlas_stream_processors returns all stream processors in a stream instance.

Example Usages

resource "mongodbatlas_stream_instance" "example" {
  project_id    = var.project_id
  instance_name = "InstanceName"
  data_process_region = {
    region         = "VIRGINIA_USA"
    cloud_provider = "AWS"
  }
}

resource "mongodbatlas_stream_connection" "example-sample" {
  project_id      = var.project_id
  instance_name   = mongodbatlas_stream_instance.example.instance_name
  connection_name = "sample_stream_solar"
  type            = "Sample"
}

resource "mongodbatlas_stream_connection" "example-cluster" {
  project_id      = var.project_id
  instance_name   = mongodbatlas_stream_instance.example.instance_name
  connection_name = "ClusterConnection"
  type            = "Cluster"
  cluster_name    = var.cluster_name
  db_role_to_execute = {
    role = "atlasAdmin"
    type = "BUILT_IN"
  }
}

resource "mongodbatlas_stream_connection" "example-kafka" {
  project_id      = var.project_id
  instance_name   = mongodbatlas_stream_instance.example.instance_name
  connection_name = "KafkaPlaintextConnection"
  type            = "Kafka"
  authentication = {
    mechanism = "PLAIN"
    username  = var.kafka_username
    password  = var.kafka_password
  }
  bootstrap_servers = "localhost:9092,localhost:9092"
  config = {
    "auto.offset.reset" : "earliest"
  }
  security = {
    protocol = "PLAINTEXT"
  }
}

resource "mongodbatlas_stream_processor" "stream-processor-sample-example" {
  project_id     = var.project_id
  instance_name  = mongodbatlas_stream_instance.example.instance_name
  processor_name = "sampleProcessorName"
  pipeline = jsonencode([
    { "$source" = { "connectionName" = resource.mongodbatlas_stream_connection.example-sample.connection_name } },
    { "$emit" = { "connectionName" : resource.mongodbatlas_stream_connection.example-cluster.connection_name, "db" : "sample", "coll" : "solar", "timeseries" : { "timeField" : "_ts" } } }
  ])
  state = "STARTED"
}

resource "mongodbatlas_stream_processor" "stream-processor-cluster-to-kafka-example" {
  project_id     = var.project_id
  instance_name  = mongodbatlas_stream_instance.example.instance_name
  processor_name = "clusterProcessorName"
  pipeline = jsonencode([
    { "$source" = { "connectionName" = resource.mongodbatlas_stream_connection.example-cluster.connection_name } },
    { "$emit" = { "connectionName" : resource.mongodbatlas_stream_connection.example-kafka.connection_name, "topic" : "topic_from_cluster" } }
  ])
  state = "CREATED"
}

resource "mongodbatlas_stream_processor" "stream-processor-kafka-to-cluster-example" {
  project_id     = var.project_id
  instance_name  = mongodbatlas_stream_instance.example.instance_name
  processor_name = "kafkaProcessorName"
  pipeline = jsonencode([
    { "$source" = { "connectionName" = resource.mongodbatlas_stream_connection.example-kafka.connection_name, "topic" : "topic_source" } },
    { "$emit" = { "connectionName" : resource.mongodbatlas_stream_connection.example-cluster.connection_name, "db" : "kafka", "coll" : "topic_source", "timeseries" : { "timeField" : "ts" } }
  }])
  state = "CREATED"
  options = {
    dlq = {
      coll            = "exampleColumn"
      connection_name = resource.mongodbatlas_stream_connection.example-cluster.connection_name
      db              = "exampleDb"
    }
  }
}

data "mongodbatlas_stream_processors" "example-stream-processors" {
  project_id    = var.project_id
  instance_name = mongodbatlas_stream_instance.example.instance_name
}

data "mongodbatlas_stream_processor" "example-stream-processor" {
  project_id     = var.project_id
  instance_name  = mongodbatlas_stream_instance.example.instance_name
  processor_name = mongodbatlas_stream_processor.stream-processor-sample-example.processor_name
}

# example making use of data sources
output "stream_processors_state" {
  value = data.mongodbatlas_stream_processor.example-stream-processor.state
}

output "stream_processors_results" {
  value = data.mongodbatlas_stream_processors.example-stream-processors.results
}

Schema

Required

  • instance_name (String) Human-readable label that identifies the stream instance.
  • project_id (String) Unique 24-hexadecimal digit string that identifies your project. Use the /groups endpoint to retrieve all projects to which the authenticated user has access.

NOTE: Groups and projects are synonymous terms. Your group id is the same as your project id. For existing groups, your group/project id remains the same. The resource and corresponding endpoints use the term groups.

Read-Only

  • results (Attributes List) Returns all Stream Processors within the specified stream instance.

To use this resource, the requesting API Key must have the Project Owner

role or Project Stream Processing Owner role. (see below for nested schema)

Nested Schema for results

Read-Only:

  • id (String) Unique 24-hexadecimal character string that identifies the stream processor.
  • instance_name (String) Human-readable label that identifies the stream instance.
  • options (Attributes) Optional configuration for the stream processor. (see below for nested schema)
  • pipeline (String) Stream aggregation pipeline you want to apply to your streaming data. MongoDB Atlas Docs contain more information. Using jsonencode is recommended when setting this attribute. For more details see the Aggregation Pipelines Documentation
  • processor_name (String) Human-readable label that identifies the stream processor.
  • project_id (String) Unique 24-hexadecimal digit string that identifies your project. Use the /groups endpoint to retrieve all projects to which the authenticated user has access.

NOTE: Groups and projects are synonymous terms. Your group id is the same as your project id. For existing groups, your group/project id remains the same. The resource and corresponding endpoints use the term groups.

  • state (String) The state of the stream processor. Commonly occurring states are 'CREATED', 'STARTED', 'STOPPED' and 'FAILED'. Used to start or stop the Stream Processor. Valid values are CREATED, STARTED or STOPPED. When a Stream Processor is created without specifying the state, it will default to CREATED state. When a Stream Processor is updated without specifying the state, it will default to the Previous state.

NOTE When a Stream Processor is updated without specifying the state, it is stopped and then restored to previous state upon update completion.

  • stats (String) The stats associated with the stream processor. Refer to the MongoDB Atlas Docs for more information.

Nested Schema for results.options

Read-Only:

Nested Schema for results.options.dlq

Read-Only:

  • coll (String) Name of the collection to use for the DLQ.
  • connection_name (String) Name of the connection to write DLQ messages to. Must be an Atlas connection.
  • db (String) Name of the database to use for the DLQ.

For more information see: MongoDB Atlas API - Stream Processor Documentation.