Skip to content

Commit ac652ab

Browse files
authored
feat: Add step for StartJobRun in AWS Glue DataBrew (#151)
Added a StartJobRun step which allows to run a DataBrew job. Closes #112
1 parent ab08aad commit ac652ab

File tree

4 files changed

+93
-9
lines changed

4 files changed

+93
-9
lines changed

doc/services.rst

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ This module provides classes to build steps that integrate with Amazon DynamoDB,
1212

1313
- `Amazon EventBridge <#amazon-eventbridge>`__
1414

15+
- `AWS Glue DataBrew <#aws-glue-databrew>`__
16+
1517
- `Amazon SNS <#amazon-sns>`__
1618

1719
- `Amazon SQS <#amazon-sqs>`__
@@ -47,6 +49,10 @@ Amazon EventBridge
4749
-----------
4850
.. autoclass:: stepfunctions.steps.service.EventBridgePutEventsStep
4951

52+
AWS Glue DataBrew
53+
--------------------
54+
.. autoclass:: stepfunctions.steps.service.GlueDataBrewStartJobRunStep
55+
5056
Amazon SNS
5157
-----------
5258
.. autoclass:: stepfunctions.steps.service.SnsPublishStep

src/stepfunctions/steps/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,4 +21,5 @@
2121
from stepfunctions.steps.service import DynamoDBGetItemStep, DynamoDBPutItemStep, DynamoDBUpdateItemStep, DynamoDBDeleteItemStep
2222
from stepfunctions.steps.service import EmrCreateClusterStep, EmrTerminateClusterStep, EmrAddStepStep, EmrCancelStepStep, EmrSetClusterTerminationProtectionStep, EmrModifyInstanceFleetByNameStep, EmrModifyInstanceGroupByNameStep
2323
from stepfunctions.steps.service import EventBridgePutEventsStep
24+
from stepfunctions.steps.service import GlueDataBrewStartJobRunStep
2425
from stepfunctions.steps.service import SnsPublishStep, SqsSendMessageStep

src/stepfunctions/steps/service.py

Lines changed: 53 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
DYNAMODB_SERVICE_NAME = "dynamodb"
2121
ELASTICMAPREDUCE_SERVICE_NAME = "elasticmapreduce"
2222
EVENTBRIDGE_SERVICE_NAME = "events"
23+
GLUE_DATABREW_SERVICE_NAME = "databrew"
2324
SNS_SERVICE_NAME = "sns"
2425
SQS_SERVICE_NAME = "sqs"
2526

@@ -32,14 +33,6 @@ class DynamoDBApi(Enum):
3233
UpdateItem = "updateItem"
3334

3435

35-
class SnsApi(Enum):
36-
Publish = "publish"
37-
38-
39-
class SqsApi(Enum):
40-
SendMessage = "sendMessage"
41-
42-
4336
class ElasticMapReduceApi(Enum):
4437
CreateCluster = "createCluster"
4538
TerminateCluster = "terminateCluster"
@@ -54,6 +47,18 @@ class EventBridgeApi(Enum):
5447
PutEvents = "putEvents"
5548

5649

50+
class GlueDataBrewApi(Enum):
51+
StartJobRun = "startJobRun"
52+
53+
54+
class SnsApi(Enum):
55+
Publish = "publish"
56+
57+
58+
class SqsApi(Enum):
59+
SendMessage = "sendMessage"
60+
61+
5762
class DynamoDBGetItemStep(Task):
5863
"""
5964
Creates a Task state to get an item from DynamoDB. See `Call DynamoDB APIs with Step Functions <https://docs.aws.amazon.com/step-functions/latest/dg/connect-ddb.html>`_ for more details.
@@ -213,6 +218,46 @@ def __init__(self, state_id, **kwargs):
213218
super(DynamoDBUpdateItemStep, self).__init__(state_id, **kwargs)
214219

215220

221+
class GlueDataBrewStartJobRunStep(Task):
222+
223+
"""
224+
Creates a Task state that starts a DataBrew job. See `Manage AWS Glue DataBrew Jobs with Step Functions <https://docs.aws.amazon.com/step-functions/latest/dg/connect-databrew.html>`_ for more details.
225+
"""
226+
227+
def __init__(self, state_id, wait_for_completion=True, **kwargs):
228+
"""
229+
Args:
230+
state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine.
231+
comment (str, optional): Human-readable comment or description. (default: None)
232+
timeout_seconds (int, optional): Positive integer specifying timeout for the state in seconds. If the state runs longer than the specified timeout, then the interpreter fails the state with a `States.Timeout` Error Name. (default: 60)
233+
timeout_seconds_path (str, optional): Path specifying the state's timeout value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer.
234+
heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name.
235+
heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer.
236+
input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$')
237+
parameters (dict, optional): The value of this field becomes the effective input for the state.
238+
result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$')
239+
output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$')
240+
wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait to complete before proceeding to the next step in the workflow. (default: True)
241+
"""
242+
if wait_for_completion:
243+
"""
244+
Example resource arn: arn:aws:states:::databrew:startJobRun.sync
245+
"""
246+
247+
kwargs[Field.Resource.value] = get_service_integration_arn(GLUE_DATABREW_SERVICE_NAME,
248+
GlueDataBrewApi.StartJobRun,
249+
IntegrationPattern.WaitForCompletion)
250+
else:
251+
"""
252+
Example resource arn: arn:aws:states:::databrew:startJobRun
253+
"""
254+
255+
kwargs[Field.Resource.value] = get_service_integration_arn(GLUE_DATABREW_SERVICE_NAME,
256+
GlueDataBrewApi.StartJobRun)
257+
258+
super(GlueDataBrewStartJobRunStep, self).__init__(state_id, **kwargs)
259+
260+
216261
class SnsPublishStep(Task):
217262

218263
"""

tests/unit/test_service_steps.py

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,10 @@
1717

1818
from unittest.mock import patch
1919
from stepfunctions.steps.service import DynamoDBGetItemStep, DynamoDBPutItemStep, DynamoDBUpdateItemStep, DynamoDBDeleteItemStep
20-
from stepfunctions.steps.service import SnsPublishStep, SqsSendMessageStep
2120
from stepfunctions.steps.service import EmrCreateClusterStep, EmrTerminateClusterStep, EmrAddStepStep, EmrCancelStepStep, EmrSetClusterTerminationProtectionStep, EmrModifyInstanceFleetByNameStep, EmrModifyInstanceGroupByNameStep
2221
from stepfunctions.steps.service import EventBridgePutEventsStep
22+
from stepfunctions.steps.service import SnsPublishStep, SqsSendMessageStep
23+
from stepfunctions.steps.service import GlueDataBrewStartJobRunStep
2324

2425

2526
@patch.object(boto3.session.Session, 'region_name', 'us-east-1')
@@ -661,3 +662,34 @@ def test_emr_modify_instance_group_by_name_step_creation():
661662
'End': True
662663
}
663664

665+
666+
@patch.object(boto3.session.Session, 'region_name', 'us-east-1')
667+
def test_databrew_start_job_run_step_creation_sync():
668+
step = GlueDataBrewStartJobRunStep('Start Glue DataBrew Job Run - Sync', parameters={
669+
"Name": "MyWorkflowJobRun"
670+
})
671+
672+
assert step.to_dict() == {
673+
'Type': 'Task',
674+
'Resource': 'arn:aws:states:::databrew:startJobRun.sync',
675+
'Parameters': {
676+
'Name': 'MyWorkflowJobRun'
677+
},
678+
'End': True
679+
}
680+
681+
682+
@patch.object(boto3.session.Session, 'region_name', 'us-east-1')
683+
def test_databrew_start_job_run_step_creation():
684+
step = GlueDataBrewStartJobRunStep('Start Glue DataBrew Job Run', wait_for_completion=False, parameters={
685+
"Name": "MyWorkflowJobRun"
686+
})
687+
688+
assert step.to_dict() == {
689+
'Type': 'Task',
690+
'Resource': 'arn:aws:states:::databrew:startJobRun',
691+
'Parameters': {
692+
'Name': 'MyWorkflowJobRun'
693+
},
694+
'End': True
695+
}

0 commit comments

Comments
 (0)