From abd6b10044494b3e6f97e41d703bc49bb23f0c7d Mon Sep 17 00:00:00 2001 From: Carolyn Nguyen Date: Fri, 18 Jun 2021 19:23:31 -0700 Subject: [PATCH 1/6] Feature: Add support to Amazon EKS service integration --- doc/services.rst | 16 ++ src/stepfunctions/steps/__init__.py | 8 + src/stepfunctions/steps/service.py | 243 +++++++++++++++++++++++++ tests/unit/test_service_steps.py | 266 +++++++++++++++++++++++++++- 4 files changed, 532 insertions(+), 1 deletion(-) diff --git a/doc/services.rst b/doc/services.rst index 1ac3738..95bc670 100644 --- a/doc/services.rst +++ b/doc/services.rst @@ -8,6 +8,8 @@ This module provides classes to build steps that integrate with Amazon DynamoDB, - `Amazon DynamoDB <#amazon-dynamodb>`__ +- `Amazon EKS <#amazon-eks>`__ + - `Amazon SNS <#amazon-sns>`__ - `Amazon SQS <#amazon-sqs>`__ @@ -25,6 +27,20 @@ Amazon DynamoDB .. autoclass:: stepfunctions.steps.service.DynamoDBUpdateItemStep +Amazon DynamoDB +---------------- +.. autoclass:: stepfunctions.steps.service.EksCreateClusterStep + +.. autoclass:: stepfunctions.steps.service.EksCreateFargateProfileStep + +.. autoclass:: stepfunctions.steps.service.EksCreateNodeGroupStep + +.. autoclass:: stepfunctions.steps.service.EksDeleteClusterStep + +.. autoclass:: stepfunctions.steps.service.EksDeleteFargateProfileStep + +.. autoclass:: stepfunctions.steps.service.EksDeleteNodeGroupStep + Amazon SNS ----------- .. autoclass:: stepfunctions.steps.service.SnsPublishStep diff --git a/src/stepfunctions/steps/__init__.py b/src/stepfunctions/steps/__init__.py index 92598f1..c88d251 100644 --- a/src/stepfunctions/steps/__init__.py +++ b/src/stepfunctions/steps/__init__.py @@ -19,6 +19,14 @@ from stepfunctions.steps.sagemaker import TrainingStep, TransformStep, ModelStep, EndpointConfigStep, EndpointStep, TuningStep, ProcessingStep from stepfunctions.steps.compute import LambdaStep, BatchSubmitJobStep, GlueStartJobRunStep, EcsRunTaskStep from stepfunctions.steps.service import DynamoDBGetItemStep, DynamoDBPutItemStep, DynamoDBUpdateItemStep, DynamoDBDeleteItemStep +from stepfunctions.steps.service import ( + EksCreateClusterStep, + EksCreateFargateProfileStep, + EksCreateNodeGroupStep, + EksDeleteClusterStep, + EksDeleteFargateProfileStep, + EksDeleteNodeGroupStep, +) from stepfunctions.steps.service import SnsPublishStep, SqsSendMessageStep from stepfunctions.steps.service import EmrCreateClusterStep, EmrTerminateClusterStep, EmrAddStepStep, EmrCancelStepStep, EmrSetClusterTerminationProtectionStep, EmrModifyInstanceFleetByNameStep, EmrModifyInstanceGroupByNameStep diff --git a/src/stepfunctions/steps/service.py b/src/stepfunctions/steps/service.py index 6bf155f..39c32a7 100644 --- a/src/stepfunctions/steps/service.py +++ b/src/stepfunctions/steps/service.py @@ -18,6 +18,7 @@ from stepfunctions.steps.integration_resources import IntegrationPattern, get_service_integration_arn DYNAMODB_SERVICE_NAME = "dynamodb" +EKS_SERVICES_NAME = "eks" SNS_SERVICE_NAME = "sns" SQS_SERVICE_NAME = "sqs" ELASTICMAPREDUCE_SERVICE_NAME = "elasticmapreduce" @@ -29,6 +30,14 @@ class DynamoDBApi(Enum): DeleteItem = "deleteItem" UpdateItem = "updateItem" +class EKSApi(Enum): + CreateCluster = "createCluster" + DeleteCluster = "deleteCluster" + CreateFargateProfile = "createFargateProfile" + DeleteFargateProfile = "deleteFargateProfile" + CreateNodegroup = "createNodegroup" + DeleteNodegroup = "deleteNodegroup" + class SnsApi(Enum): Publish = "publish" @@ -167,6 +176,240 @@ def __init__(self, state_id, **kwargs): super(DynamoDBUpdateItemStep, self).__init__(state_id, **kwargs) +class EksCreateClusterStep(Task): + """ + Creates a Task state that creates an Amazon EKS cluster. Call Amazon EKS with Step Functions `_ for more details. + """ + + def __init__(self, state_id, wait_for_completion=True, **kwargs): + """ + Args: + state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine. + comment (str, optional): Human-readable comment or description. (default: None) + timeout_seconds (int, optional): Positive integer specifying timeout for the state in seconds. If the state runs longer than the specified timeout, then the interpreter fails the state with a `States.Timeout` Error Name. (default: 60) + timeout_seconds_path (str, optional): Path specifying the state's timeout value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. + heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name. + heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. + input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + parameters (dict, optional): The value of this field becomes the effective input for the state. + result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') + output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait to complete before proceeding to the next step in the workflow. (default: True) + """ + if wait_for_completion: + """ + Example resource arn: arn:aws:states:::eks:createCluster.sync + """ + + kwargs[Field.Resource.value] = get_service_integration_arn(EKS_SERVICES_NAME, + EKSApi.CreateCluster, + IntegrationPattern.WaitForCompletion) + else: + """ + Example resource arn: arn:aws:states:::eks:createCluster + """ + + kwargs[Field.Resource.value] = get_service_integration_arn(EKS_SERVICES_NAME, + EKSApi.CreateCluster) + + super(EksCreateClusterStep, self).__init__(state_id, **kwargs) + + +class EksCreateFargateProfileStep(Task): + """ + Creates a Task state that creates a Fargate profile. See `Call Amazon EKS with Step Functions `_ for more details. + """ + + def __init__(self, state_id, wait_for_completion=True, **kwargs): + """ + Args: + state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine. + comment (str, optional): Human-readable comment or description. (default: None) + timeout_seconds (int, optional): Positive integer specifying timeout for the state in seconds. If the state runs longer than the specified timeout, then the interpreter fails the state with a `States.Timeout` Error Name. (default: 60) + timeout_seconds_path (str, optional): Path specifying the state's timeout value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. + heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name. + heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. + input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + parameters (dict, optional): The value of this field becomes the effective input for the state. + result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') + output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait to complete before proceeding to the next step in the workflow. (default: True) + """ + if wait_for_completion: + """ + Example resource arn: arn:aws:states:::eks:createFargateProfile.sync + """ + + kwargs[Field.Resource.value] = get_service_integration_arn(EKS_SERVICES_NAME, + EKSApi.CreateFargateProfile, + IntegrationPattern.WaitForCompletion) + else: + """ + Example resource arn: arn:aws:states:::eks:createFargateProfile + """ + + kwargs[Field.Resource.value] = get_service_integration_arn(EKS_SERVICES_NAME, + EKSApi.CreateFargateProfile) + + super(EksCreateFargateProfileStep, self).__init__(state_id, **kwargs) + + +class EksDeleteFargateProfileStep(Task): + """ + Creates a Task state that deletes a Fargate profile. See `Call Amazon EKS with Step Functions `_ for more details. + """ + + def __init__(self, state_id, wait_for_completion=True, **kwargs): + """ + Args: + state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine. + comment (str, optional): Human-readable comment or description. (default: None) + timeout_seconds (int, optional): Positive integer specifying timeout for the state in seconds. If the state runs longer than the specified timeout, then the interpreter fails the state with a `States.Timeout` Error Name. (default: 60) + timeout_seconds_path (str, optional): Path specifying the state's timeout value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. + heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name. + heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. + input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + parameters (dict, optional): The value of this field becomes the effective input for the state. + result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') + output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait to complete before proceeding to the next step in the workflow. (default: True) + """ + if wait_for_completion: + """ + Example resource arn: arn:aws:states:::eks:deleteFargateProfile.sync + """ + + kwargs[Field.Resource.value] = get_service_integration_arn(EKS_SERVICES_NAME, + EKSApi.DeleteFargateProfile, + IntegrationPattern.WaitForCompletion) + else: + """ + Example resource arn: arn:aws:states:::eks:deleteFargateProfile + """ + + kwargs[Field.Resource.value] = get_service_integration_arn(EKS_SERVICES_NAME, + EKSApi.DeleteFargateProfile) + + super(EksDeleteFargateProfileStep, self).__init__(state_id, **kwargs) + + +class EksCreateNodeGroupStep(Task): + """ + Creates a Task state that creates a node group. See `Call Amazon EKS with Step Functions `_ for more details. + """ + + def __init__(self, state_id, wait_for_completion=True, **kwargs): + """ + Args: + state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine. + comment (str, optional): Human-readable comment or description. (default: None) + timeout_seconds (int, optional): Positive integer specifying timeout for the state in seconds. If the state runs longer than the specified timeout, then the interpreter fails the state with a `States.Timeout` Error Name. (default: 60) + timeout_seconds_path (str, optional): Path specifying the state's timeout value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. + heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name. + heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. + input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + parameters (dict, optional): The value of this field becomes the effective input for the state. + result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') + output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait to complete before proceeding to the next step in the workflow. (default: True) + """ + if wait_for_completion: + """ + Example resource arn: arn:aws:states:::eks:createNodegroup.sync + """ + + kwargs[Field.Resource.value] = get_service_integration_arn(EKS_SERVICES_NAME, + EKSApi.CreateNodegroup, + IntegrationPattern.WaitForCompletion) + else: + """ + Example resource arn: arn:aws:states:::eks:createNodegroup + """ + + kwargs[Field.Resource.value] = get_service_integration_arn(EKS_SERVICES_NAME, + EKSApi.CreateNodegroup) + + super(EksCreateNodeGroupStep, self).__init__(state_id, **kwargs) + + +class EksDeleteNodeGroupStep(Task): + """ + Creates a Task state that deletes a node group. See `Call Amazon EKS with Step Functions `_ for more details. + """ + + def __init__(self, state_id, wait_for_completion=True, **kwargs): + """ + Args: + state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine. + comment (str, optional): Human-readable comment or description. (default: None) + timeout_seconds (int, optional): Positive integer specifying timeout for the state in seconds. If the state runs longer than the specified timeout, then the interpreter fails the state with a `States.Timeout` Error Name. (default: 60) + timeout_seconds_path (str, optional): Path specifying the state's timeout value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. + heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name. + heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. + input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + parameters (dict, optional): The value of this field becomes the effective input for the state. + result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') + output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait to complete before proceeding to the next step in the workflow. (default: True) + """ + if wait_for_completion: + """ + Example resource arn: arn:aws:states:::eks:deleteNodegroup.sync + """ + + kwargs[Field.Resource.value] = get_service_integration_arn(EKS_SERVICES_NAME, + EKSApi.DeleteNodegroup, + IntegrationPattern.WaitForCompletion) + else: + """ + Example resource arn: arn:aws:states:::eks:deleteNodegroup + """ + + kwargs[Field.Resource.value] = get_service_integration_arn(EKS_SERVICES_NAME, + EKSApi.DeleteNodegroup) + + super(EksDeleteNodeGroupStep, self).__init__(state_id, **kwargs) + + +class EksDeleteClusterStep(Task): + """ + Creates a Task state that deletes an Amazon EKS cluster. See `Call Amazon EKS with Step Functions `_ for more details. + """ + + def __init__(self, state_id, wait_for_completion=True, **kwargs): + """ + Args: + state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine. + comment (str, optional): Human-readable comment or description. (default: None) + timeout_seconds (int, optional): Positive integer specifying timeout for the state in seconds. If the state runs longer than the specified timeout, then the interpreter fails the state with a `States.Timeout` Error Name. (default: 60) + timeout_seconds_path (str, optional): Path specifying the state's timeout value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. + heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name. + heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. + input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + parameters (dict, optional): The value of this field becomes the effective input for the state. + result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') + output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait to complete before proceeding to the next step in the workflow. (default: True) + """ + if wait_for_completion: + """ + Example resource arn: arn:aws:states:::eks:deleteCluster.sync + """ + + kwargs[Field.Resource.value] = get_service_integration_arn(EKS_SERVICES_NAME, + EKSApi.DeleteCluster, + IntegrationPattern.WaitForCompletion) + else: + """ + Example resource arn: arn:aws:states:::eks:deleteCluster + """ + + kwargs[Field.Resource.value] = get_service_integration_arn(EKS_SERVICES_NAME, + EKSApi.DeleteCluster) + + super(EksDeleteClusterStep, self).__init__(state_id, **kwargs) + + class SnsPublishStep(Task): """ diff --git a/tests/unit/test_service_steps.py b/tests/unit/test_service_steps.py index 6576aaf..ca98c72 100644 --- a/tests/unit/test_service_steps.py +++ b/tests/unit/test_service_steps.py @@ -12,11 +12,18 @@ # permissions and limitations under the License. from __future__ import absolute_import -import pytest import boto3 from unittest.mock import patch from stepfunctions.steps.service import DynamoDBGetItemStep, DynamoDBPutItemStep, DynamoDBUpdateItemStep, DynamoDBDeleteItemStep +from stepfunctions.steps.service import ( + EksCreateClusterStep, + EksCreateFargateProfileStep, + EksCreateNodeGroupStep, + EksDeleteClusterStep, + EksDeleteFargateProfileStep, + EksDeleteNodeGroupStep, +) from stepfunctions.steps.service import SnsPublishStep, SqsSendMessageStep from stepfunctions.steps.service import EmrCreateClusterStep, EmrTerminateClusterStep, EmrAddStepStep, EmrCancelStepStep, EmrSetClusterTerminationProtectionStep, EmrModifyInstanceFleetByNameStep, EmrModifyInstanceGroupByNameStep @@ -596,3 +603,260 @@ def test_emr_modify_instance_group_by_name_step_creation(): 'End': True } + +@patch.object(boto3.session.Session, 'region_name', 'us-east-1') +def test_eks_create_cluster_step_creation(): + step = EksCreateClusterStep('Create Eks cluster', parameters={ + "Name": "MyCluster", + "ResourcesVpcConfig": { + "SubnetIds": [ + "subnet-00000000000000000", + "subnet-00000000000000001" + ] + }, + "RoleArn": "arn:aws:iam::123456789012:role/MyEKSClusterRole" + }) + + assert step.to_dict() == { + 'Type': 'Task', + 'Resource': 'arn:aws:states:::eks:createCluster.sync', + 'Parameters': { + "Name": "MyCluster", + "ResourcesVpcConfig": { + "SubnetIds": [ + "subnet-00000000000000000", + "subnet-00000000000000001" + ] + }, + "RoleArn": "arn:aws:iam::123456789012:role/MyEKSClusterRole" + }, + 'End': True + } + + step = EksCreateClusterStep('Create Eks cluster', wait_for_completion=False, parameters={ + "Name": "MyCluster", + "ResourcesVpcConfig": { + "SubnetIds": [ + "subnet-00000000000000000", + "subnet-00000000000000001" + ] + }, + "RoleArn": "arn:aws:iam::123456789012:role/MyEKSClusterRole" + }) + + assert step.to_dict() == { + 'Type': 'Task', + 'Resource': 'arn:aws:states:::eks:createCluster', + 'Parameters': { + "Name": "MyCluster", + "ResourcesVpcConfig": { + "SubnetIds": [ + "subnet-00000000000000000", + "subnet-00000000000000001" + ] + }, + "RoleArn": "arn:aws:iam::123456789012:role/MyEKSClusterRole" + }, + 'End': True + } + + +@patch.object(boto3.session.Session, 'region_name', 'us-east-1') +def test_eks_delete_cluster_step_creation(): + step = EksDeleteClusterStep('Delete Eks cluster', parameters={ + "Name": "MyCluster" + }) + + assert step.to_dict() == { + 'Type': 'Task', + 'Resource': 'arn:aws:states:::eks:deleteCluster.sync', + 'Parameters': { + "Name": "MyCluster" + }, + 'End': True + } + + step = EksDeleteClusterStep('Delete Eks cluster', wait_for_completion=False, parameters={ + "Name": "MyCluster" + }) + + assert step.to_dict() == { + 'Type': 'Task', + 'Resource': 'arn:aws:states:::eks:deleteCluster', + 'Parameters': { + "Name": "MyCluster" + }, + 'End': True + } + + +@patch.object(boto3.session.Session, 'region_name', 'us-east-1') +def test_eks_create_fargate_profile_step_creation(): + step = EksCreateFargateProfileStep('Create Fargate profile', parameters={ + "ClusterName": "MyCluster", + "FargateProfileName": "MyFargateProfile", + "PodExecutionRoleArn": "arn:aws:iam::123456789012:role/MyFargatePodExecutionRole", + "Selectors": [{ + "Namespace": "my-namespace", + "Labels": {"my-label": "my-value"} + }] + }) + + assert step.to_dict() == { + 'Type': 'Task', + 'Resource': 'arn:aws:states:::eks:createFargateProfile.sync', + 'Parameters': { + "ClusterName": "MyCluster", + "FargateProfileName": "MyFargateProfile", + "PodExecutionRoleArn": "arn:aws:iam::123456789012:role/MyFargatePodExecutionRole", + "Selectors": [{ + "Namespace": "my-namespace", + "Labels": {"my-label": "my-value"} + }] + }, + 'End': True + } + + step = EksCreateFargateProfileStep('Create Fargate profile', wait_for_completion=False, parameters={ + "ClusterName": "MyCluster", + "FargateProfileName": "MyFargateProfile", + "PodExecutionRoleArn": "arn:aws:iam::123456789012:role/MyFargatePodExecutionRole", + "Selectors": [{ + "Namespace": "my-namespace", + "Labels": {"my-label": "my-value"} + }] + }) + + assert step.to_dict() == { + 'Type': 'Task', + 'Resource': 'arn:aws:states:::eks:createFargateProfile', + 'Parameters': { + "ClusterName": "MyCluster", + "FargateProfileName": "MyFargateProfile", + "PodExecutionRoleArn": "arn:aws:iam::123456789012:role/MyFargatePodExecutionRole", + "Selectors": [{ + "Namespace": "my-namespace", + "Labels": {"my-label": "my-value"} + }] + }, + 'End': True + } + + +@patch.object(boto3.session.Session, 'region_name', 'us-east-1') +def test_eks_delete_fargate_profile_step_creation(): + step = EksDeleteFargateProfileStep('Delete Fargate profile', parameters={ + "ClusterName": "MyCluster", + "FargateProfileName": "MyFargateProfile" + }) + + assert step.to_dict() == { + 'Type': 'Task', + 'Resource': 'arn:aws:states:::eks:deleteFargateProfile.sync', + 'Parameters': { + "ClusterName": "MyCluster", + "FargateProfileName": "MyFargateProfile" + }, + 'End': True + } + + step = EksDeleteFargateProfileStep('Delete Fargate profile', wait_for_completion=False, parameters={ + "ClusterName": "MyCluster", + "FargateProfileName": "MyFargateProfile" + }) + + assert step.to_dict() == { + 'Type': 'Task', + 'Resource': 'arn:aws:states:::eks:deleteFargateProfile', + 'Parameters': { + "ClusterName": "MyCluster", + "FargateProfileName": "MyFargateProfile" + }, + 'End': True + } + + +@patch.object(boto3.session.Session, 'region_name', 'us-east-1') +def test_eks_create_node_group_step_creation(): + step = EksCreateNodeGroupStep('Create Node Group', parameters={ + "ClusterName": "MyCluster", + "NodegroupName": "MyNodegroup", + "NodeRole": "arn:aws:iam::123456789012:role/MyNodeInstanceRole", + "Subnets": [ + "subnet-00000000000000000", + "subnet-00000000000000001" + ] + }) + + assert step.to_dict() == { + 'Type': 'Task', + 'Resource': 'arn:aws:states:::eks:createNodegroup.sync', + 'Parameters': { + "ClusterName": "MyCluster", + "NodegroupName": "MyNodegroup", + "NodeRole": "arn:aws:iam::123456789012:role/MyNodeInstanceRole", + "Subnets": [ + "subnet-00000000000000000", + "subnet-00000000000000001" + ], + }, + 'End': True + } + + step = EksCreateNodeGroupStep('Create Node Group', wait_for_completion=False, parameters={ + "ClusterName": "MyCluster", + "NodegroupName": "MyNodegroup", + "NodeRole": "arn:aws:iam::123456789012:role/MyNodeInstanceRole", + "Subnets": [ + "subnet-00000000000000000", + "subnet-00000000000000001" + ] + }) + + assert step.to_dict() == { + 'Type': 'Task', + 'Resource': 'arn:aws:states:::eks:createNodegroup', + 'Parameters': { + "ClusterName": "MyCluster", + "NodegroupName": "MyNodegroup", + "NodeRole": "arn:aws:iam::123456789012:role/MyNodeInstanceRole", + "Subnets": [ + "subnet-00000000000000000", + "subnet-00000000000000001" + ], + }, + 'End': True + } + + +@patch.object(boto3.session.Session, 'region_name', 'us-east-1') +def test_eks_delete_node_group_step_creation(): + step = EksDeleteNodeGroupStep('Delete Node Group', parameters={ + "ClusterName": "MyCluster", + "NodegroupName": "MyNodegroup" + }) + + assert step.to_dict() == { + 'Type': 'Task', + 'Resource': 'arn:aws:states:::eks:deleteNodegroup.sync', + 'Parameters': { + "ClusterName": "MyCluster", + "NodegroupName": "MyNodegroup" + }, + 'End': True + } + + step = EksDeleteNodeGroupStep('Delete Node Group', wait_for_completion=False, parameters={ + "ClusterName": "MyCluster", + "NodegroupName": "MyNodegroup" + }) + + assert step.to_dict() == { + 'Type': 'Task', + 'Resource': 'arn:aws:states:::eks:deleteNodegroup', + 'Parameters': { + "ClusterName": "MyCluster", + "NodegroupName": "MyNodegroup" + }, + 'End': True + } From 52db3dece4e304ba24266495cc4771c5bdd66765 Mon Sep 17 00:00:00 2001 From: Carolyn Nguyen Date: Tue, 17 Aug 2021 09:21:14 -0700 Subject: [PATCH 2/6] Removed double import due to merge --- src/stepfunctions/steps/__init__.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/stepfunctions/steps/__init__.py b/src/stepfunctions/steps/__init__.py index 3a52d2a..a67a2d1 100644 --- a/src/stepfunctions/steps/__init__.py +++ b/src/stepfunctions/steps/__init__.py @@ -28,7 +28,6 @@ EksDeleteFargateProfileStep, EksDeleteNodeGroupStep, ) -from stepfunctions.steps.service import SnsPublishStep, SqsSendMessageStep from stepfunctions.steps.service import EmrCreateClusterStep, EmrTerminateClusterStep, EmrAddStepStep, EmrCancelStepStep, EmrSetClusterTerminationProtectionStep, EmrModifyInstanceFleetByNameStep, EmrModifyInstanceGroupByNameStep from stepfunctions.steps.service import EventBridgePutEventsStep from stepfunctions.steps.service import GlueDataBrewStartJobRunStep From c81ee56762916cb6aca353011025c24a1e92d3a2 Mon Sep 17 00:00:00 2001 From: Carolyn Nguyen Date: Tue, 17 Aug 2021 15:50:35 -0700 Subject: [PATCH 3/6] Adding eks:call and eks:runJob steps --- doc/services.rst | 7 +- src/stepfunctions/steps/__init__.py | 4 +- src/stepfunctions/steps/service.py | 114 +++++++++++--- tests/unit/test_service_steps.py | 227 +++++++++++++++++++++++++++- 4 files changed, 330 insertions(+), 22 deletions(-) diff --git a/doc/services.rst b/doc/services.rst index 148e1cb..eee4e9e 100644 --- a/doc/services.rst +++ b/doc/services.rst @@ -34,6 +34,8 @@ Amazon DynamoDB Amazon EKS ---------- +.. autoclass:: stepfunctions.steps.service.EksCallStep + .. autoclass:: stepfunctions.steps.service.EksCreateClusterStep .. autoclass:: stepfunctions.steps.service.EksCreateFargateProfileStep @@ -44,7 +46,10 @@ Amazon EKS .. autoclass:: stepfunctions.steps.service.EksDeleteFargateProfileStep -.. autoclass:: stepfunctions.steps.service.EksDeleteNodeGroupStep +.. autoclass:: stepfunctions.steps.service.EksDeleteNodegroupStep + +.. autoclass:: stepfunctions.steps.service.EksRunJobStep + Amazon EMR ----------- diff --git a/src/stepfunctions/steps/__init__.py b/src/stepfunctions/steps/__init__.py index a67a2d1..93bc0d9 100644 --- a/src/stepfunctions/steps/__init__.py +++ b/src/stepfunctions/steps/__init__.py @@ -21,12 +21,14 @@ from stepfunctions.steps.service import DynamoDBGetItemStep, DynamoDBPutItemStep, DynamoDBUpdateItemStep, DynamoDBDeleteItemStep from stepfunctions.steps.service import ( + EksCallStep, EksCreateClusterStep, EksCreateFargateProfileStep, EksCreateNodeGroupStep, EksDeleteClusterStep, EksDeleteFargateProfileStep, - EksDeleteNodeGroupStep, + EksDeleteNodegroupStep, + EksRunJobStep, ) from stepfunctions.steps.service import EmrCreateClusterStep, EmrTerminateClusterStep, EmrAddStepStep, EmrCancelStepStep, EmrSetClusterTerminationProtectionStep, EmrModifyInstanceFleetByNameStep, EmrModifyInstanceGroupByNameStep from stepfunctions.steps.service import EventBridgePutEventsStep diff --git a/src/stepfunctions/steps/service.py b/src/stepfunctions/steps/service.py index 0eb5cd7..c4b2c15 100644 --- a/src/stepfunctions/steps/service.py +++ b/src/stepfunctions/steps/service.py @@ -26,20 +26,22 @@ SQS_SERVICE_NAME = "sqs" - class DynamoDBApi(Enum): GetItem = "getItem" PutItem = "putItem" DeleteItem = "deleteItem" UpdateItem = "updateItem" -class EKSApi(Enum): + +class EksApi(Enum): CreateCluster = "createCluster" DeleteCluster = "deleteCluster" CreateFargateProfile = "createFargateProfile" DeleteFargateProfile = "deleteFargateProfile" CreateNodegroup = "createNodegroup" DeleteNodegroup = "deleteNodegroup" + RunJob = "runJob" + Call = "call" class ElasticMapReduceApi(Enum): @@ -229,7 +231,7 @@ def __init__(self, state_id, **kwargs): class EksCreateClusterStep(Task): """ - Creates a Task state that creates an Amazon EKS cluster. Call Amazon EKS with Step Functions `_ for more details. + Creates a Task state that creates an Amazon EKS cluster. See `Call Amazon EKS with Step Functions `_ for more details. """ def __init__(self, state_id, wait_for_completion=True, **kwargs): @@ -253,7 +255,7 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): """ kwargs[Field.Resource.value] = get_service_integration_arn(EKS_SERVICES_NAME, - EKSApi.CreateCluster, + EksApi.CreateCluster, IntegrationPattern.WaitForCompletion) else: """ @@ -261,7 +263,7 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): """ kwargs[Field.Resource.value] = get_service_integration_arn(EKS_SERVICES_NAME, - EKSApi.CreateCluster) + EksApi.CreateCluster) super(EksCreateClusterStep, self).__init__(state_id, **kwargs) @@ -292,7 +294,7 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): """ kwargs[Field.Resource.value] = get_service_integration_arn(EKS_SERVICES_NAME, - EKSApi.CreateFargateProfile, + EksApi.CreateFargateProfile, IntegrationPattern.WaitForCompletion) else: """ @@ -300,7 +302,7 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): """ kwargs[Field.Resource.value] = get_service_integration_arn(EKS_SERVICES_NAME, - EKSApi.CreateFargateProfile) + EksApi.CreateFargateProfile) super(EksCreateFargateProfileStep, self).__init__(state_id, **kwargs) @@ -331,7 +333,7 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): """ kwargs[Field.Resource.value] = get_service_integration_arn(EKS_SERVICES_NAME, - EKSApi.DeleteFargateProfile, + EksApi.DeleteFargateProfile, IntegrationPattern.WaitForCompletion) else: """ @@ -339,7 +341,7 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): """ kwargs[Field.Resource.value] = get_service_integration_arn(EKS_SERVICES_NAME, - EKSApi.DeleteFargateProfile) + EksApi.DeleteFargateProfile) super(EksDeleteFargateProfileStep, self).__init__(state_id, **kwargs) @@ -370,7 +372,7 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): """ kwargs[Field.Resource.value] = get_service_integration_arn(EKS_SERVICES_NAME, - EKSApi.CreateNodegroup, + EksApi.CreateNodegroup, IntegrationPattern.WaitForCompletion) else: """ @@ -378,12 +380,12 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): """ kwargs[Field.Resource.value] = get_service_integration_arn(EKS_SERVICES_NAME, - EKSApi.CreateNodegroup) + EksApi.CreateNodegroup) super(EksCreateNodeGroupStep, self).__init__(state_id, **kwargs) -class EksDeleteNodeGroupStep(Task): +class EksDeleteNodegroupStep(Task): """ Creates a Task state that deletes a node group. See `Call Amazon EKS with Step Functions `_ for more details. """ @@ -409,7 +411,7 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): """ kwargs[Field.Resource.value] = get_service_integration_arn(EKS_SERVICES_NAME, - EKSApi.DeleteNodegroup, + EksApi.DeleteNodegroup, IntegrationPattern.WaitForCompletion) else: """ @@ -417,9 +419,9 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): """ kwargs[Field.Resource.value] = get_service_integration_arn(EKS_SERVICES_NAME, - EKSApi.DeleteNodegroup) + EksApi.DeleteNodegroup) - super(EksDeleteNodeGroupStep, self).__init__(state_id, **kwargs) + super(EksDeleteNodegroupStep, self).__init__(state_id, **kwargs) class EksDeleteClusterStep(Task): @@ -448,7 +450,7 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): """ kwargs[Field.Resource.value] = get_service_integration_arn(EKS_SERVICES_NAME, - EKSApi.DeleteCluster, + EksApi.DeleteCluster, IntegrationPattern.WaitForCompletion) else: """ @@ -456,11 +458,89 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): """ kwargs[Field.Resource.value] = get_service_integration_arn(EKS_SERVICES_NAME, - EKSApi.DeleteCluster) + EksApi.DeleteCluster) super(EksDeleteClusterStep, self).__init__(state_id, **kwargs) +class EksRunJobStep(Task): + """ + Creates a Task state that allows you to run a job on your Amazon EKS cluster. See `Call Amazon EKS with Step Functions `_ for more details. + """ + + def __init__(self, state_id, wait_for_completion=True, **kwargs): + """ + Args: + state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine. + comment (str, optional): Human-readable comment or description. (default: None) + timeout_seconds (int, optional): Positive integer specifying timeout for the state in seconds. If the state runs longer than the specified timeout, then the interpreter fails the state with a `States.Timeout` Error Name. (default: 60) + timeout_seconds_path (str, optional): Path specifying the state's timeout value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. + heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name. + heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. + input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + parameters (dict, optional): The value of this field becomes the effective input for the state. + result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') + output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait to complete before proceeding to the next step in the workflow. (default: True) + """ + if wait_for_completion: + """ + Example resource arn: arn:aws:states:::eks:createCluster.sync + """ + + kwargs[Field.Resource.value] = get_service_integration_arn(EKS_SERVICES_NAME, + EksApi.RunJob, + IntegrationPattern.WaitForCompletion) + else: + """ + Example resource arn: arn:aws:states:::eks:createCluster + """ + + kwargs[Field.Resource.value] = get_service_integration_arn(EKS_SERVICES_NAME, + EksApi.RunJob) + + super(EksRunJobStep, self).__init__(state_id, **kwargs) + + +class EksCallStep(Task): + """ + Creates a Task state that allows you to use the Kubernetes API to read and write Kubernetes resource objects via a Kubernetes API endpoint. See `Call Amazon EKS with Step Functions `_ for more details. + """ + + def __init__(self, state_id, wait_for_completion=True, **kwargs): + """ + Args: + state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine. + comment (str, optional): Human-readable comment or description. (default: None) + timeout_seconds (int, optional): Positive integer specifying timeout for the state in seconds. If the state runs longer than the specified timeout, then the interpreter fails the state with a `States.Timeout` Error Name. (default: 60) + timeout_seconds_path (str, optional): Path specifying the state's timeout value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. + heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name. + heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. + input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + parameters (dict, optional): The value of this field becomes the effective input for the state. + result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') + output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait to complete before proceeding to the next step in the workflow. (default: True) + """ + if wait_for_completion: + """ + Example resource arn: arn:aws:states:::eks:createCluster.sync + """ + + kwargs[Field.Resource.value] = get_service_integration_arn(EKS_SERVICES_NAME, + EksApi.Call, + IntegrationPattern.WaitForCompletion) + else: + """ + Example resource arn: arn:aws:states:::eks:createCluster + """ + + kwargs[Field.Resource.value] = get_service_integration_arn(EKS_SERVICES_NAME, + EksApi.Call) + + super(EksCallStep, self).__init__(state_id, **kwargs) + + class GlueDataBrewStartJobRunStep(Task): """ diff --git a/tests/unit/test_service_steps.py b/tests/unit/test_service_steps.py index b183aae..1a401f3 100644 --- a/tests/unit/test_service_steps.py +++ b/tests/unit/test_service_steps.py @@ -17,12 +17,14 @@ from unittest.mock import patch from stepfunctions.steps.service import DynamoDBGetItemStep, DynamoDBPutItemStep, DynamoDBUpdateItemStep, DynamoDBDeleteItemStep from stepfunctions.steps.service import ( + EksCallStep, EksCreateClusterStep, EksCreateFargateProfileStep, EksCreateNodeGroupStep, EksDeleteClusterStep, EksDeleteFargateProfileStep, - EksDeleteNodeGroupStep, + EksDeleteNodegroupStep, + EksRunJobStep, ) from stepfunctions.steps.service import EmrCreateClusterStep, EmrTerminateClusterStep, EmrAddStepStep, EmrCancelStepStep, EmrSetClusterTerminationProtectionStep, EmrModifyInstanceFleetByNameStep, EmrModifyInstanceGroupByNameStep from stepfunctions.steps.service import EventBridgePutEventsStep @@ -942,7 +944,7 @@ def test_eks_create_node_group_step_creation_sync(): @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_eks_delete_node_group_step_creation(): - step = EksDeleteNodeGroupStep("Delete Node Group", wait_for_completion=False, parameters={ + step = EksDeleteNodegroupStep("Delete Node Group", wait_for_completion=False, parameters={ 'ClusterName': 'MyCluster', 'NodegroupName': 'MyNodegroup' }) @@ -960,7 +962,7 @@ def test_eks_delete_node_group_step_creation(): @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_eks_delete_node_group_step_creation_sync(): - step = EksDeleteNodeGroupStep("Delete Node Group sync", parameters={ + step = EksDeleteNodegroupStep("Delete Node Group sync", parameters={ 'ClusterName': 'MyCluster', 'NodegroupName': 'MyNodegroup' }) @@ -974,3 +976,222 @@ def test_eks_delete_node_group_step_creation_sync(): }, 'End': True } + + +@patch.object(boto3.session.Session, 'region_name', 'us-east-1') +def test_eks_run_job_step_creation(): + step = EksRunJobStep("Run Job", wait_for_completion=False, parameters={ + 'ClusterName': 'MyCluster', + 'CertificateAuthority': 'ANPAJ2UCCR6DPCEXAMPLE', + 'Endpoint': 'https://AKIAIOSFODNN7EXAMPLE.yl4.us-east-1.eks.amazonaws.com', + 'LogOptions': { + 'RetrieveLogs': True + }, + 'Job': { + 'apiVersion': 'batch/v1', + 'kind': 'Job', + 'metadata': { + 'name': 'example-job' + }, + 'spec': { + 'backoffLimit': 0, + 'template': { + 'metadata': { + 'name': 'example-job' + }, + 'spec': { + 'containers': [ + { + 'name': 'pi-2000', + 'image': 'perl', + 'command': ['perl'], + 'args': [ + '-Mbignum=bpi', + '-wle', + 'print bpi(2000)' + ] + } + ], + 'restartPolicy': 'Never' + } + } + } + } + }) + + assert step.to_dict() == { + 'Type': 'Task', + 'Resource': 'arn:aws:states:::eks:runJob', + 'Parameters': { + 'CertificateAuthority': 'ANPAJ2UCCR6DPCEXAMPLE', + 'ClusterName': 'MyCluster', + 'Endpoint': 'https://AKIAIOSFODNN7EXAMPLE.yl4.us-east-1.eks.amazonaws.com', + 'Job': { + 'apiVersion': 'batch/v1', + 'kind': 'Job', + 'metadata': {'name': 'example-job'}, + 'spec': { + 'backoffLimit': 0, + 'template': { + 'metadata': {'name': 'example-job'}, + 'spec': { + 'containers': [{ + 'args': ['-Mbignum=bpi', + '-wle', + 'print ' + 'bpi(2000)'], + 'command': ['perl'], + 'image': 'perl', + 'name': 'pi-2000'}], + 'restartPolicy': 'Never'} + } + } + }, + 'LogOptions': {'RetrieveLogs': True} + }, + 'End': True + } + + +@patch.object(boto3.session.Session, 'region_name', 'us-east-1') +def test_eks_run_job_step_creation_sync(): + step = EksRunJobStep("Run Job sync", parameters={ + 'ClusterName': 'MyCluster', + 'CertificateAuthority': 'ANPAJ2UCCR6DPCEXAMPLE', + 'Endpoint': 'https://AKIAIOSFODNN7EXAMPLE.yl4.us-east-1.eks.amazonaws.com', + 'LogOptions': { + 'RetrieveLogs': True + }, + 'Job': { + 'apiVersion': 'batch/v1', + 'kind': 'Job', + 'metadata': { + 'name': 'example-job' + }, + 'spec': { + 'backoffLimit': 0, + 'template': { + 'metadata': { + 'name': 'example-job' + }, + 'spec': { + 'containers': [ + { + 'name': 'pi-2000', + 'image': 'perl', + 'command': ['perl'], + 'args': [ + '-Mbignum=bpi', + '-wle', + 'print bpi(2000)' + ] + } + ], + 'restartPolicy': 'Never' + } + } + } + } + }) + + assert step.to_dict() == { + 'Type': 'Task', + 'Resource': 'arn:aws:states:::eks:runJob.sync', + 'Parameters': { + 'CertificateAuthority': 'ANPAJ2UCCR6DPCEXAMPLE', + 'ClusterName': 'MyCluster', + 'Endpoint': 'https://AKIAIOSFODNN7EXAMPLE.yl4.us-east-1.eks.amazonaws.com', + 'Job': { + 'apiVersion': 'batch/v1', + 'kind': 'Job', + 'metadata': {'name': 'example-job'}, + 'spec': { + 'backoffLimit': 0, + 'template': { + 'metadata': {'name': 'example-job'}, + 'spec': { + 'containers': [{ + 'args': ['-Mbignum=bpi', + '-wle', + 'print ' + 'bpi(2000)'], + 'command': ['perl'], + 'image': 'perl', + 'name': 'pi-2000'}], + 'restartPolicy': 'Never'} + } + } + }, + 'LogOptions': {'RetrieveLogs': True} + }, + 'End': True + } + + +@patch.object(boto3.session.Session, 'region_name', 'us-east-1') +def test_eks_call_step_creation(): + step = EksCallStep("Call", wait_for_completion=False, parameters={ + 'ClusterName': 'MyCluster', + 'CertificateAuthority': 'ANPAJ2UCCR6DPCEXAMPLE', + 'Endpoint': 'https://444455556666.yl4.us-east-1.eks.amazonaws.com', + 'Method': 'GET', + 'Path': '/api/v1/namespaces/default/pods', + 'QueryParameters': { + 'labelSelector': [ + 'job-name=example-job' + ] + } + }) + + assert step.to_dict() == { + 'Type': 'Task', + 'Resource': 'arn:aws:states:::eks:call', + 'Parameters': { + 'ClusterName': 'MyCluster', + 'CertificateAuthority': 'ANPAJ2UCCR6DPCEXAMPLE', + 'Endpoint': 'https://444455556666.yl4.us-east-1.eks.amazonaws.com', + 'Method': 'GET', + 'Path': '/api/v1/namespaces/default/pods', + 'QueryParameters': { + 'labelSelector': [ + 'job-name=example-job' + ] + } + }, + 'End': True + } + + +@patch.object(boto3.session.Session, 'region_name', 'us-east-1') +def test_eks_call_step_creation_sync(): + step = EksCallStep("Call sync", parameters={ + 'ClusterName': 'MyCluster', + 'CertificateAuthority': 'ANPAJ2UCCR6DPCEXAMPLE', + 'Endpoint': 'https://444455556666.yl4.us-east-1.eks.amazonaws.com', + 'Method': 'GET', + 'Path': '/api/v1/namespaces/default/pods', + 'QueryParameters': { + 'labelSelector': [ + 'job-name=example-job' + ] + } + }) + + assert step.to_dict() == { + 'Type': 'Task', + 'Resource': 'arn:aws:states:::eks:call.sync', + 'Parameters': { + 'ClusterName': 'MyCluster', + 'CertificateAuthority': 'ANPAJ2UCCR6DPCEXAMPLE', + 'Endpoint': 'https://444455556666.yl4.us-east-1.eks.amazonaws.com', + 'Method': 'GET', + 'Path': '/api/v1/namespaces/default/pods', + 'QueryParameters': { + 'labelSelector': [ + 'job-name=example-job' + ] + } + }, + 'End': True + } + From 1907b7d640ec6d1e037d87554ce99c822880596d Mon Sep 17 00:00:00 2001 From: Carolyn Nguyen <83104894+ca-nguyen@users.noreply.github.com> Date: Wed, 25 Aug 2021 14:40:38 -0700 Subject: [PATCH 4/6] Apply suggestions from code review Update documentation Co-authored-by: Adam Wong <55506708+wong-a@users.noreply.github.com> --- src/stepfunctions/steps/service.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/stepfunctions/steps/service.py b/src/stepfunctions/steps/service.py index c4b2c15..4212d7e 100644 --- a/src/stepfunctions/steps/service.py +++ b/src/stepfunctions/steps/service.py @@ -270,7 +270,7 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): class EksCreateFargateProfileStep(Task): """ - Creates a Task state that creates a Fargate profile. See `Call Amazon EKS with Step Functions `_ for more details. + Creates a Task state that creates an AWS Fargate profile for your Amazon EKS cluster. See `Call Amazon EKS with Step Functions `_ for more details. """ def __init__(self, state_id, wait_for_completion=True, **kwargs): @@ -309,7 +309,7 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): class EksDeleteFargateProfileStep(Task): """ - Creates a Task state that deletes a Fargate profile. See `Call Amazon EKS with Step Functions `_ for more details. + Creates a Task state that deletes an AWS Fargate profile. See `Call Amazon EKS with Step Functions `_ for more details. """ def __init__(self, state_id, wait_for_completion=True, **kwargs): @@ -348,7 +348,7 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): class EksCreateNodeGroupStep(Task): """ - Creates a Task state that creates a node group. See `Call Amazon EKS with Step Functions `_ for more details. + Creates a Task state that creates a node group for an Amazon EKS cluster. See `Call Amazon EKS with Step Functions `_ for more details. """ def __init__(self, state_id, wait_for_completion=True, **kwargs): @@ -387,7 +387,7 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): class EksDeleteNodegroupStep(Task): """ - Creates a Task state that deletes a node group. See `Call Amazon EKS with Step Functions `_ for more details. + Creates a Task state that deletes an Amazon EKS node group for a cluster. See `Call Amazon EKS with Step Functions `_ for more details. """ def __init__(self, state_id, wait_for_completion=True, **kwargs): From 9a670efe656b0d84baef0e7edeab1bb356019c06 Mon Sep 17 00:00:00 2001 From: Carolyn Nguyen Date: Wed, 1 Sep 2021 00:54:37 -0700 Subject: [PATCH 5/6] Removed eks:call.sync as it is not supported and removed logoptions parasms for eks:runJob --- src/stepfunctions/steps/service.py | 21 ++++--------- tests/unit/test_service_steps.py | 49 ++++-------------------------- 2 files changed, 12 insertions(+), 58 deletions(-) diff --git a/src/stepfunctions/steps/service.py b/src/stepfunctions/steps/service.py index 4212d7e..0e016fb 100644 --- a/src/stepfunctions/steps/service.py +++ b/src/stepfunctions/steps/service.py @@ -507,7 +507,7 @@ class EksCallStep(Task): Creates a Task state that allows you to use the Kubernetes API to read and write Kubernetes resource objects via a Kubernetes API endpoint. See `Call Amazon EKS with Step Functions `_ for more details. """ - def __init__(self, state_id, wait_for_completion=True, **kwargs): + def __init__(self, state_id, **kwargs): """ Args: state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine. @@ -520,23 +520,14 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): parameters (dict, optional): The value of this field becomes the effective input for the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') - wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait to complete before proceeding to the next step in the workflow. (default: True) """ - if wait_for_completion: - """ - Example resource arn: arn:aws:states:::eks:createCluster.sync - """ - kwargs[Field.Resource.value] = get_service_integration_arn(EKS_SERVICES_NAME, - EksApi.Call, - IntegrationPattern.WaitForCompletion) - else: - """ - Example resource arn: arn:aws:states:::eks:createCluster - """ + """ + Example resource arn: arn:aws:states:::eks:createCluster + """ - kwargs[Field.Resource.value] = get_service_integration_arn(EKS_SERVICES_NAME, - EksApi.Call) + kwargs[Field.Resource.value] = get_service_integration_arn(EKS_SERVICES_NAME, + EksApi.Call) super(EksCallStep, self).__init__(state_id, **kwargs) diff --git a/tests/unit/test_service_steps.py b/tests/unit/test_service_steps.py index 1a401f3..25861ec 100644 --- a/tests/unit/test_service_steps.py +++ b/tests/unit/test_service_steps.py @@ -984,9 +984,6 @@ def test_eks_run_job_step_creation(): 'ClusterName': 'MyCluster', 'CertificateAuthority': 'ANPAJ2UCCR6DPCEXAMPLE', 'Endpoint': 'https://AKIAIOSFODNN7EXAMPLE.yl4.us-east-1.eks.amazonaws.com', - 'LogOptions': { - 'RetrieveLogs': True - }, 'Job': { 'apiVersion': 'batch/v1', 'kind': 'Job', @@ -1046,8 +1043,7 @@ def test_eks_run_job_step_creation(): 'restartPolicy': 'Never'} } } - }, - 'LogOptions': {'RetrieveLogs': True} + } }, 'End': True } @@ -1101,6 +1097,9 @@ def test_eks_run_job_step_creation_sync(): 'CertificateAuthority': 'ANPAJ2UCCR6DPCEXAMPLE', 'ClusterName': 'MyCluster', 'Endpoint': 'https://AKIAIOSFODNN7EXAMPLE.yl4.us-east-1.eks.amazonaws.com', + 'LogOptions': { + 'RetrieveLogs': True + }, 'Job': { 'apiVersion': 'batch/v1', 'kind': 'Job', @@ -1121,8 +1120,7 @@ def test_eks_run_job_step_creation_sync(): 'restartPolicy': 'Never'} } } - }, - 'LogOptions': {'RetrieveLogs': True} + } }, 'End': True } @@ -1130,7 +1128,7 @@ def test_eks_run_job_step_creation_sync(): @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_eks_call_step_creation(): - step = EksCallStep("Call", wait_for_completion=False, parameters={ + step = EksCallStep("Call", parameters={ 'ClusterName': 'MyCluster', 'CertificateAuthority': 'ANPAJ2UCCR6DPCEXAMPLE', 'Endpoint': 'https://444455556666.yl4.us-east-1.eks.amazonaws.com', @@ -1160,38 +1158,3 @@ def test_eks_call_step_creation(): }, 'End': True } - - -@patch.object(boto3.session.Session, 'region_name', 'us-east-1') -def test_eks_call_step_creation_sync(): - step = EksCallStep("Call sync", parameters={ - 'ClusterName': 'MyCluster', - 'CertificateAuthority': 'ANPAJ2UCCR6DPCEXAMPLE', - 'Endpoint': 'https://444455556666.yl4.us-east-1.eks.amazonaws.com', - 'Method': 'GET', - 'Path': '/api/v1/namespaces/default/pods', - 'QueryParameters': { - 'labelSelector': [ - 'job-name=example-job' - ] - } - }) - - assert step.to_dict() == { - 'Type': 'Task', - 'Resource': 'arn:aws:states:::eks:call.sync', - 'Parameters': { - 'ClusterName': 'MyCluster', - 'CertificateAuthority': 'ANPAJ2UCCR6DPCEXAMPLE', - 'Endpoint': 'https://444455556666.yl4.us-east-1.eks.amazonaws.com', - 'Method': 'GET', - 'Path': '/api/v1/namespaces/default/pods', - 'QueryParameters': { - 'labelSelector': [ - 'job-name=example-job' - ] - } - }, - 'End': True - } - From 1cef855fb1a8fa7ea93123c012b6c039fe66e21c Mon Sep 17 00:00:00 2001 From: Carolyn Nguyen Date: Wed, 1 Sep 2021 09:29:31 -0700 Subject: [PATCH 6/6] Corrected eks::call and eks::runJob resource arn example --- src/stepfunctions/steps/service.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/stepfunctions/steps/service.py b/src/stepfunctions/steps/service.py index 0e016fb..a0c3950 100644 --- a/src/stepfunctions/steps/service.py +++ b/src/stepfunctions/steps/service.py @@ -485,7 +485,7 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): """ if wait_for_completion: """ - Example resource arn: arn:aws:states:::eks:createCluster.sync + Example resource arn: arn:aws:states:::eks:runJob.sync """ kwargs[Field.Resource.value] = get_service_integration_arn(EKS_SERVICES_NAME, @@ -493,7 +493,7 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): IntegrationPattern.WaitForCompletion) else: """ - Example resource arn: arn:aws:states:::eks:createCluster + Example resource arn: arn:aws:states:::eks:runJob """ kwargs[Field.Resource.value] = get_service_integration_arn(EKS_SERVICES_NAME, @@ -523,7 +523,7 @@ def __init__(self, state_id, **kwargs): """ """ - Example resource arn: arn:aws:states:::eks:createCluster + Example resource arn: arn:aws:states:::eks:call """ kwargs[Field.Resource.value] = get_service_integration_arn(EKS_SERVICES_NAME,