Skip to content

feat: Add support for AWS Glue DataBrew #151

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Aug 5, 2021
6 changes: 6 additions & 0 deletions doc/services.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ This module provides classes to build steps that integrate with Amazon DynamoDB,

- `Amazon EventBridge <#amazon-eventbridge>`__

- `AWS Glue DataBrew <#aws-glue-databrew>`__

- `Amazon SNS <#amazon-sns>`__

- `Amazon SQS <#amazon-sqs>`__
Expand Down Expand Up @@ -47,6 +49,10 @@ Amazon EventBridge
------------------
.. autoclass:: stepfunctions.steps.service.EventBridgePutEventsStep

AWS Glue DataBrew
--------------------
.. autoclass:: stepfunctions.steps.service.GlueDataBrewStartJobRunStep

Amazon SNS
-----------
.. autoclass:: stepfunctions.steps.service.SnsPublishStep
Expand Down
1 change: 1 addition & 0 deletions src/stepfunctions/steps/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,5 @@
from stepfunctions.steps.service import DynamoDBGetItemStep, DynamoDBPutItemStep, DynamoDBUpdateItemStep, DynamoDBDeleteItemStep
from stepfunctions.steps.service import EmrCreateClusterStep, EmrTerminateClusterStep, EmrAddStepStep, EmrCancelStepStep, EmrSetClusterTerminationProtectionStep, EmrModifyInstanceFleetByNameStep, EmrModifyInstanceGroupByNameStep
from stepfunctions.steps.service import EventBridgePutEventsStep
from stepfunctions.steps.service import GlueDataBrewStartJobRunStep
from stepfunctions.steps.service import SnsPublishStep, SqsSendMessageStep
61 changes: 53 additions & 8 deletions src/stepfunctions/steps/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
DYNAMODB_SERVICE_NAME = "dynamodb"
ELASTICMAPREDUCE_SERVICE_NAME = "elasticmapreduce"
EVENTBRIDGE_SERVICE_NAME = "events"
GLUE_DATABREW_SERVICE_NAME = "databrew"
SNS_SERVICE_NAME = "sns"
SQS_SERVICE_NAME = "sqs"

Expand All @@ -32,14 +33,6 @@ class DynamoDBApi(Enum):
UpdateItem = "updateItem"


class SnsApi(Enum):
    """Amazon SNS API action names."""

    Publish = "publish"


class SqsApi(Enum):
    """Amazon SQS API action names."""

    SendMessage = "sendMessage"


class ElasticMapReduceApi(Enum):
CreateCluster = "createCluster"
TerminateCluster = "terminateCluster"
Expand All @@ -54,6 +47,18 @@ class EventBridgeApi(Enum):
PutEvents = "putEvents"


class GlueDataBrewApi(Enum):
    """AWS Glue DataBrew API action names."""

    # Action segment of the resource ARN, e.g. arn:aws:states:::databrew:startJobRun
    StartJobRun = "startJobRun"


class SnsApi(Enum):
    """Amazon SNS API action names."""

    Publish = "publish"


class SqsApi(Enum):
    """Amazon SQS API action names."""

    SendMessage = "sendMessage"


class DynamoDBGetItemStep(Task):
"""
Creates a Task state to get an item from DynamoDB. See `Call DynamoDB APIs with Step Functions <https://docs.aws.amazon.com/step-functions/latest/dg/connect-ddb.html>`_ for more details.
Expand Down Expand Up @@ -213,6 +218,46 @@ def __init__(self, state_id, **kwargs):
super(DynamoDBUpdateItemStep, self).__init__(state_id, **kwargs)


class GlueDataBrewStartJobRunStep(Task):

    """
    Creates a Task state that starts a DataBrew job. See `Manage AWS Glue DataBrew Jobs with Step Functions <https://docs.aws.amazon.com/step-functions/latest/dg/connect-databrew.html>`_ for more details.
    """

    def __init__(self, state_id, wait_for_completion=True, **kwargs):
        """
        Args:
            state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine.
            comment (str, optional): Human-readable comment or description. (default: None)
            timeout_seconds (int, optional): Positive integer specifying timeout for the state in seconds. If the state runs longer than the specified timeout, then the interpreter fails the state with a `States.Timeout` Error Name. (default: 60)
            timeout_seconds_path (str, optional): Path specifying the state's timeout value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer.
            heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name.
            heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer.
            input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$')
            parameters (dict, optional): The value of this field becomes the effective input for the state.
            result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$')
            output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$')
            wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait to complete before proceeding to the next step in the workflow. (default: True)
        """
        # Both variants target the same service and API action; the waiting
        # variant additionally passes the WaitForCompletion integration pattern.
        arn_args = [GLUE_DATABREW_SERVICE_NAME, GlueDataBrewApi.StartJobRun]
        if wait_for_completion:
            # Example resource arn: arn:aws:states:::databrew:startJobRun.sync
            arn_args.append(IntegrationPattern.WaitForCompletion)
        # Without waiting, example resource arn: arn:aws:states:::databrew:startJobRun

        kwargs[Field.Resource.value] = get_service_integration_arn(*arn_args)

        super(GlueDataBrewStartJobRunStep, self).__init__(state_id, **kwargs)


class SnsPublishStep(Task):

"""
Expand Down
34 changes: 33 additions & 1 deletion tests/unit/test_service_steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@

from unittest.mock import patch
from stepfunctions.steps.service import DynamoDBGetItemStep, DynamoDBPutItemStep, DynamoDBUpdateItemStep, DynamoDBDeleteItemStep
from stepfunctions.steps.service import SnsPublishStep, SqsSendMessageStep
from stepfunctions.steps.service import EmrCreateClusterStep, EmrTerminateClusterStep, EmrAddStepStep, EmrCancelStepStep, EmrSetClusterTerminationProtectionStep, EmrModifyInstanceFleetByNameStep, EmrModifyInstanceGroupByNameStep
from stepfunctions.steps.service import EventBridgePutEventsStep
from stepfunctions.steps.service import SnsPublishStep, SqsSendMessageStep
from stepfunctions.steps.service import GlueDataBrewStartJobRunStep


@patch.object(boto3.session.Session, 'region_name', 'us-east-1')
Expand Down Expand Up @@ -661,3 +662,34 @@ def test_emr_modify_instance_group_by_name_step_creation():
'End': True
}


@patch.object(boto3.session.Session, 'region_name', 'us-east-1')
def test_databrew_start_job_run_step_creation_sync():
    """The default (wait-for-completion) step renders the `.sync` resource ARN."""
    job_parameters = {"Name": "MyWorkflowJobRun"}
    step = GlueDataBrewStartJobRunStep(
        'Start Glue DataBrew Job Run - Sync',
        parameters=job_parameters,
    )

    expected_definition = {
        'Type': 'Task',
        'Resource': 'arn:aws:states:::databrew:startJobRun.sync',
        'Parameters': {
            'Name': 'MyWorkflowJobRun',
        },
        'End': True,
    }
    assert step.to_dict() == expected_definition


@patch.object(boto3.session.Session, 'region_name', 'us-east-1')
def test_databrew_start_job_run_step_creation():
    """With `wait_for_completion=False` the step renders the plain resource ARN."""
    job_parameters = {"Name": "MyWorkflowJobRun"}
    step = GlueDataBrewStartJobRunStep(
        'Start Glue DataBrew Job Run',
        wait_for_completion=False,
        parameters=job_parameters,
    )

    expected_definition = {
        'Type': 'Task',
        'Resource': 'arn:aws:states:::databrew:startJobRun',
        'Parameters': {
            'Name': 'MyWorkflowJobRun',
        },
        'End': True,
    }
    assert step.to_dict() == expected_definition