diff --git a/doc/services.rst b/doc/services.rst index 405ccf4..76b09a0 100644 --- a/doc/services.rst +++ b/doc/services.rst @@ -12,6 +12,8 @@ This module provides classes to build steps that integrate with Amazon DynamoDB, - `Amazon EventBridge <#amazon-eventbridge>`__ +- `AWS Glue DataBrew <#aws-glue-databrew>`__ + - `Amazon SNS <#amazon-sns>`__ - `Amazon SQS <#amazon-sqs>`__ @@ -47,6 +49,10 @@ Amazon EventBridge ----------- .. autoclass:: stepfunctions.steps.service.EventBridgePutEventsStep +AWS Glue DataBrew +-------------------- +.. autoclass:: stepfunctions.steps.service.GlueDataBrewStartJobRunStep + Amazon SNS ----------- .. autoclass:: stepfunctions.steps.service.SnsPublishStep diff --git a/src/stepfunctions/steps/__init__.py b/src/stepfunctions/steps/__init__.py index b27d411..4e85666 100644 --- a/src/stepfunctions/steps/__init__.py +++ b/src/stepfunctions/steps/__init__.py @@ -21,4 +21,5 @@ from stepfunctions.steps.service import DynamoDBGetItemStep, DynamoDBPutItemStep, DynamoDBUpdateItemStep, DynamoDBDeleteItemStep from stepfunctions.steps.service import EmrCreateClusterStep, EmrTerminateClusterStep, EmrAddStepStep, EmrCancelStepStep, EmrSetClusterTerminationProtectionStep, EmrModifyInstanceFleetByNameStep, EmrModifyInstanceGroupByNameStep from stepfunctions.steps.service import EventBridgePutEventsStep +from stepfunctions.steps.service import GlueDataBrewStartJobRunStep from stepfunctions.steps.service import SnsPublishStep, SqsSendMessageStep diff --git a/src/stepfunctions/steps/service.py b/src/stepfunctions/steps/service.py index 5c32a88..4f4a538 100644 --- a/src/stepfunctions/steps/service.py +++ b/src/stepfunctions/steps/service.py @@ -20,6 +20,7 @@ DYNAMODB_SERVICE_NAME = "dynamodb" ELASTICMAPREDUCE_SERVICE_NAME = "elasticmapreduce" EVENTBRIDGE_SERVICE_NAME = "events" +GLUE_DATABREW_SERVICE_NAME = "databrew" SNS_SERVICE_NAME = "sns" SQS_SERVICE_NAME = "sqs" @@ -32,14 +33,6 @@ class DynamoDBApi(Enum): UpdateItem = "updateItem" -class SnsApi(Enum): - Publish = "publish" - - -class SqsApi(Enum): - SendMessage = "sendMessage" - - class ElasticMapReduceApi(Enum): CreateCluster = "createCluster" TerminateCluster = "terminateCluster" @@ -54,6 +47,18 @@ class EventBridgeApi(Enum): PutEvents = "putEvents" +class GlueDataBrewApi(Enum): + StartJobRun = "startJobRun" + + +class SnsApi(Enum): + Publish = "publish" + + +class SqsApi(Enum): + SendMessage = "sendMessage" + + class DynamoDBGetItemStep(Task): """ Creates a Task state to get an item from DynamoDB. See `Call DynamoDB APIs with Step Functions `_ for more details. @@ -213,6 +218,46 @@ def __init__(self, state_id, **kwargs): super(DynamoDBUpdateItemStep, self).__init__(state_id, **kwargs) +class GlueDataBrewStartJobRunStep(Task): + + """ + Creates a Task state that starts a DataBrew job. See `Manage AWS Glue DataBrew Jobs with Step Functions `_ for more details. + """ + + def __init__(self, state_id, wait_for_completion=True, **kwargs): + """ + Args: + state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine. + comment (str, optional): Human-readable comment or description. (default: None) + timeout_seconds (int, optional): Positive integer specifying timeout for the state in seconds. If the state runs longer than the specified timeout, then the interpreter fails the state with a `States.Timeout` Error Name. (default: 60) + timeout_seconds_path (str, optional): Path specifying the state's timeout value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. + heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name. + heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. + input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + parameters (dict, optional): The value of this field becomes the effective input for the state. + result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') + output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait to complete before proceeding to the next step in the workflow. (default: True) + """ + if wait_for_completion: + """ + Example resource arn: arn:aws:states:::databrew:startJobRun.sync + """ + + kwargs[Field.Resource.value] = get_service_integration_arn(GLUE_DATABREW_SERVICE_NAME, + GlueDataBrewApi.StartJobRun, + IntegrationPattern.WaitForCompletion) + else: + """ + Example resource arn: arn:aws:states:::databrew:startJobRun + """ + + kwargs[Field.Resource.value] = get_service_integration_arn(GLUE_DATABREW_SERVICE_NAME, + GlueDataBrewApi.StartJobRun) + + super(GlueDataBrewStartJobRunStep, self).__init__(state_id, **kwargs) + + class SnsPublishStep(Task): """ diff --git a/tests/unit/test_service_steps.py b/tests/unit/test_service_steps.py index 53039c7..627ae55 100644 --- a/tests/unit/test_service_steps.py +++ b/tests/unit/test_service_steps.py @@ -17,9 +17,10 @@ from unittest.mock import patch from stepfunctions.steps.service import DynamoDBGetItemStep, DynamoDBPutItemStep, DynamoDBUpdateItemStep, DynamoDBDeleteItemStep -from stepfunctions.steps.service import SnsPublishStep, SqsSendMessageStep from stepfunctions.steps.service import EmrCreateClusterStep, EmrTerminateClusterStep, EmrAddStepStep, EmrCancelStepStep, EmrSetClusterTerminationProtectionStep, EmrModifyInstanceFleetByNameStep, EmrModifyInstanceGroupByNameStep from stepfunctions.steps.service import EventBridgePutEventsStep +from stepfunctions.steps.service import SnsPublishStep, SqsSendMessageStep +from stepfunctions.steps.service import GlueDataBrewStartJobRunStep @patch.object(boto3.session.Session, 'region_name', 'us-east-1') @@ -661,3 +662,34 @@ def test_emr_modify_instance_group_by_name_step_creation(): 'End': True } + +@patch.object(boto3.session.Session, 'region_name', 'us-east-1') +def test_databrew_start_job_run_step_creation_sync(): + step = GlueDataBrewStartJobRunStep('Start Glue DataBrew Job Run - Sync', parameters={ + "Name": "MyWorkflowJobRun" + }) + + assert step.to_dict() == { + 'Type': 'Task', + 'Resource': 'arn:aws:states:::databrew:startJobRun.sync', + 'Parameters': { + 'Name': 'MyWorkflowJobRun' + }, + 'End': True + } + + +@patch.object(boto3.session.Session, 'region_name', 'us-east-1') +def test_databrew_start_job_run_step_creation(): + step = GlueDataBrewStartJobRunStep('Start Glue DataBrew Job Run', wait_for_completion=False, parameters={ + "Name": "MyWorkflowJobRun" + }) + + assert step.to_dict() == { + 'Type': 'Task', + 'Resource': 'arn:aws:states:::databrew:startJobRun', + 'Parameters': { + 'Name': 'MyWorkflowJobRun' + }, + 'End': True + }