Migrate to Python3.12 #9

Open · wants to merge 1 commit into base: master
144 changes: 137 additions & 7 deletions README.md
@@ -123,13 +123,13 @@ What if a job in the ETL workflow fails? In such a case, there are error-handlin
# Preparing your development environment
Here’s a high-level checklist of what you need to do to set up your development environment.

1. Sign up for an AWS account if you haven't already and create an Administrator User. The steps are published [here](http://docs.aws.amazon.com/lambda/latest/dg/setting-up.html).
1. Sign up for an AWS account if you haven't already and create an Administrator User.

2. Ensure that you have Python 2.7 and Pip installed on your machine. Instructions for that varies based on your operating system and OS version.
2. Ensure that you have Python 3 and Pip installed on your machine. Instructions for that vary based on your operating system and OS version.

3. Create a Python [virtual environment](https://virtualenv.pypa.io/en/stable/) for the project with Virtualenv. This helps keep the project’s Python dependencies neatly isolated from your operating system’s default Python installation. **Once you’ve created a virtual python environment, activate it before moving on with the following steps**.

4. Use Pip to [install AWS CLI](http://docs.aws.amazon.com/cli/latest/userguide/installing.html). [Configure](http://docs.aws.amazon.com/cli/latest/userguide/cli-chap-getting-started.html) the AWS CLI. It is recommended that the access keys you configure are associated with an IAM User who has full access to the following:
4. [Install the AWS CLI V2](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html). [Configure](http://docs.aws.amazon.com/cli/latest/userguide/cli-chap-getting-started.html) the AWS CLI. It is recommended that the access keys you configure are associated with an IAM User who has full access to the following:
- Amazon S3
- Amazon DynamoDB
- AWS Lambda
@@ -138,9 +138,9 @@ Here’s a high-level checklist of what you need to do to setup your development
- AWS Step Functions
- Creating IAM Roles

The IAM User can be the Administrator User you created in Step 1.
**The IAM User can be the Administrator User you created in Step 1.**

5. Make sure you choose a region where all of the above services are available, such as us-east-1 (N. Virginia). Visit [this page](https://aws.amazon.com/about-aws/global-infrastructure/regional-product-services/) to learn more about service availability in AWS regions.
5. Make sure you choose a region where all of the above services are available, such as ap-south-1 (Mumbai). Visit [this page](https://aws.amazon.com/about-aws/global-infrastructure/regional-product-services/) to learn more about service availability in AWS regions.

6. Use Pip to install [Pynt](https://github.com/rags/pynt). Pynt is used for the project's Python-based build scripts. (Steps 2-6 are summarized in the sketch below.)
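
Taken together, steps 2-6 boil down to a few shell commands. The sketch below assumes Linux/macOS, uses the built-in `venv` module in place of Virtualenv, and the environment name `etl-env` is illustrative; install the AWS CLI itself per the guide linked in step 4:

```bash
python3 -m venv etl-env        # step 3: create an isolated Python environment
source etl-env/bin/activate    # activate it before the remaining steps
pip install pynt               # step 6: the build-script runner
aws configure                  # step 4: supply the IAM user's access keys and a region
```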

@@ -233,6 +233,10 @@ Sample content:

```json
{
"athenarunner": {
"ArtifactBucketName": "<NO-DEFAULT>",
"LambdaSourceS3Key": "src/athenarunner.zip"
},
"gluerunner": {
"ArtifactBucketName": "<NO-DEFAULT>",
"LambdaSourceS3Key":"src/gluerunner.zip"
@@ -249,7 +253,7 @@ Sample content:

* `LambdaSourceS3Key` - The Amazon S3 key (e.g. `src/gluerunner.zip`) for your AWS Lambda function's .zip package.

>**NOTE: The values set here must match values set in `cloudformation/gluerunner-lambda-params.json`.**
>**NOTE: The values set here must match the values set in `cloudformation/gluerunner-lambda-params.json`, `cloudformation/athenarunner-lambda-params.json`, and `cloudformation/step-functions-resources-params.json`.**


<a name="cloudformationglue-resources-paramsjson"></a>
@@ -317,6 +321,33 @@ Specifies the parameters used by Glue Runner AWS Lambda function at run-time.

* `glue_job_capacity` - The capacity, in [Data Processing Units](https://aws.amazon.com/glue/pricing/), which is allocated to every AWS Glue job started by Glue Runner.

<a name="athenarunner-configjson"></a>
## lambda/athenarunner/athenarunner-config.json

Specifies the parameters used by Athena Runner AWS Lambda function at run-time.

```json
{
"sfn_activity_arn": "arn:aws:states:<AWS-REGION>:<AWS-ACCOUNT-ID>:activity:AthenaRunnerActivity",
"sfn_worker_name": "athenarunner",
"ddb_table": "AthenaRunnerActiveJobs",
"ddb_query_limit": 50,
"sfn_max_executions": 100
}
```
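
After filling in the region and account Id placeholders, a quick sanity check that the file is still valid JSON can be run with `jq` (assumed installed):

```bash
# Fails loudly on invalid JSON and prints the ARN so you can eyeball the region and account Id.
jq -r '.sfn_activity_arn' lambda/athenarunner/athenarunner-config.json
```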

#### Parameters:

* `sfn_activity_arn` - AWS Step Functions activity task ARN. This ARN is used to query AWS Step Functions for new tasks (i.e. new Athena queries to run). The ARN is a combination of the AWS region, your AWS account Id, and the name property of the [AWS::StepFunctions::Activity](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-stepfunctions-activity.html) resource in the `step-functions-resources.yaml` CloudFormation template. An ARN looks as follows: `arn:aws:states:<AWS-REGION>:<AWS-ACCOUNT-ID>:activity:<STEPFUNCTIONS-ACTIVITY-NAME>`. By default, the activity name is `AthenaRunnerActivity`.

* `sfn_worker_name` - A property that is passed to AWS Step Functions when getting activity tasks.

* `ddb_table` - The DynamoDB table name in which state for active Athena queries is persisted between Athena Runner invocations. **This parameter's value must match the value of the `DDBTableName` parameter in the `cloudformation/athenarunner-lambda-params.json` config file.**

* `ddb_query_limit` - Maximum number of items to be retrieved when the DynamoDB state table is queried. Default is `50` items.

* `sfn_max_executions` - The maximum number of Step Functions executions that Athena Runner will process per invocation. Default is `100`.

<a name="cloudformationstep-functions-resources-paramsjson"></a>
## cloudformation/step-functions-resources-params.json

@@ -354,6 +385,55 @@ Both parameters are also used by AWS CloudFormation during stack creation.

* `DataBucketName` - The Amazon S3 bucket name (without the `s3://...` prefix). All OnS3ObjectCreated CloudWatch Events for the bucket will be handled by the `ons3objectcreated` AWS Lambda function. **This bucket will be created by CloudFormation. CloudFormation stack creation will fail if the bucket already exists.**


<a name="cleaning up"></a>
# Clean up project on AWS
## Skip this section and move to Build Commands if you have already cleaned up files and related resources of this project on AWS or it is your first time deploying

### Delete data and buckets on S3

**Be very careful when deleting buckets, so that you do not accidentally delete important data or buckets.**

It is recommended to use the same prefix for all of the project's buckets, so they are easier to search for and manage (a CLI sketch that relies on such a prefix follows this list).

- Go to the Amazon S3 console and, in the Buckets section, open the General purpose buckets tab.
- Search for the bucket names you specified in the configuration files.
- Delete the 5 buckets one by one, either from the:
  - Console: select the bucket, empty it, then delete it (the long way)
  - CLI: run `aws s3 rb s3://<bucket-name> --force`
- Ensure the buckets are deleted.
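
If the buckets do share a common prefix, the CLI route can be scripted. A minimal sketch, assuming the `marketing-project-` prefix used in this PR's params files:

```bash
# List every bucket whose name starts with the shared prefix, then delete each.
# --force empties the bucket before removing it -- double-check the list first!
for b in $(aws s3api list-buckets \
      --query "Buckets[?starts_with(Name, 'marketing-project-')].Name" --output text); do
  aws s3 rb "s3://$b" --force
done
```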

### Stop executions of state machines
- Go to Step Functions > State machines.
- Open the `MarketingAndSalesETLOrchestrator` state machine. On its page, check the Executions tab for running executions; if any are running, select them and click Stop execution.
- Do the same for the `AthenaRunnerTestETLOrchestrator` state machine (or script it; see the sketch below).
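
A sketch of the CLI equivalent, where the state machine ARN is a placeholder you must replace with the one from your account:

```bash
# Stop every RUNNING execution of one state machine; repeat for the other state machine.
SM_ARN="arn:aws:states:ap-south-1:<AWS-ACCOUNT-ID>:stateMachine:MarketingAndSalesETLOrchestrator"
for e in $(aws stepfunctions list-executions --state-machine-arn "$SM_ARN" \
      --status-filter RUNNING --query 'executions[].executionArn' --output text); do
  aws stepfunctions stop-execution --execution-arn "$e"
done
```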

### Other resources cleanup
The remaining resources can be cleaned up easily by deleting the stacks.
- Go to CloudFormation.
- Click on Stacks; you’ll see these 4 stacks:
  - athenarunner-lambda
  - gluerunner-lambda
  - glue-resources
  - step-functions-resources
- Now delete them using either the:
  - Console:
    - Click on a stack.
    - Click the Delete button and watch the progress of the stack deletion on the Resources tab.
    - Ensure the stack status is “DELETE_COMPLETE”; otherwise, check which resources are blocking the deletion and delete those resources first.
    - Do the same for the others.
  - Pynt tasks:
    - Delete the 4 stacks using the commands below.
```bash
pynt deletestack["step-functions-resources"]
pynt deletestack["glue-resources"]
pynt deletestack["gluerunner-lambda"]
pynt deletestack["athenarunner-lambda"]
```
- Ensure all the deletions are successful. You can verify in the console by opening CloudFormation > Stacks; each stack should show “DELETE_COMPLETE”, or no stacks should be listed at all. (An AWS CLI alternative follows this list.)
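
If you prefer the AWS CLI over the console or pynt, an equivalent sketch using the same four stack names:

```bash
# Delete each stack and block until the deletion finishes.
for s in step-functions-resources glue-resources gluerunner-lambda athenarunner-lambda; do
  aws cloudformation delete-stack --stack-name "$s"
  aws cloudformation wait stack-delete-complete --stack-name "$s"
done
```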



<a name="build-commands"></a>
# Build commands
Common interactions with the project have been simplified for you. Using `pynt`, the following tasks are automated with simple commands:
@@ -476,8 +556,12 @@ pynt deploygluescripts

# Package and deploy Glue Runner AWS Lambda function
pynt packagelambda


pynt deploylambda

# Create the artifact bucket, named after the ArtifactBucketName parameter value in cloudformation/glue-resources-params.json
aws s3 mb s3://<ArtifactBucketName>
# Deploy AWS Glue scripts
pynt deploygluescripts

@@ -498,9 +582,52 @@ pynt createstack["athenarunner-lambda"]

Note that the `step-functions-resources` stack **must** be created first, before the `glue-resources` stack.
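
Putting the ordering note together with the stack names used elsewhere in this README, the full creation sequence would look like the sketch below (the two Lambda stacks can follow in either order):

```bash
pynt createstack["step-functions-resources"]   # must be created first
pynt createstack["glue-resources"]
pynt createstack["gluerunner-lambda"]
pynt createstack["athenarunner-lambda"]
```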


## Setting runtime parameters of Glue jobs
**// TODO: set the Glue jobs' runtime parameters through the build script**

There is a bug in the project: these runtime parameters are not set automatically, so they must be set manually.

- Using the AWS Console:
  - Go to AWS Glue.
  - Click on ETL Jobs in the left-side navigation bar.
  - There are 3 jobs whose runtime settings need to be modified; the same process applies to all 3.
  - The 3 jobs are:
    - JoinMarketingAndSalesData
    - ProcessSalesData
    - ProcessMarketingData
  - Process:
    - Click on a job; the Script tab will open.
    - Now click on the Job details tab.
    - Set the Language to Python 3.
    - Choose a worker type and number of workers that suit your needs (a lower configuration is recommended to save cost).
    - Click Save.
  - **Do the same for the other Glue jobs** (or script it; see the sketch below).
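
The same change can in principle be scripted. The sketch below is an untested outline: it assumes `jq` is installed, and that stripping the read-only fields shown is enough for Glue to accept the fetched definition back as a job update:

```bash
# Switch one job's runtime to Python 3; repeat for the other two jobs.
JOB_NAME=ProcessSalesData
aws glue get-job --job-name "$JOB_NAME" --query 'Job' > job.json
# Drop fields that update-job does not accept, then set the Python version.
jq 'del(.Name, .CreatedOn, .LastModifiedOn, .AllocatedCapacity, .MaxCapacity)
    | .Command.PythonVersion = "3"' job.json > job-update.json
aws glue update-job --job-name "$JOB_NAME" --job-update file://job-update.json
```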



## Configuring Permissions of AthenaRunner
**// TODO: configure the permissions policy automatically**

The Athena Runner Lambda's permissions are not configured properly by the template, so they must be granted explicitly.

- Go to the IAM console.
- Click Roles in the left panel.
- Search for the role named `athenarunner-lambda-AthenaRunnerLambdaExecutionRole-*`.
- Open the role by clicking on it.
- Go to the Permissions tab.
- In the Permissions policies sub-tab, click `Add permissions`.
- Click Attach policies.
- Search for `AmazonS3FullAccess` and tick its checkbox.
- Then search for `AWSGlueServiceRole` and tick its checkbox.
- Click Add permissions at the bottom right of the page.
- Check that the 2 policies are attached (a CLI sketch of these steps follows).
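
The same two attachments can be made from the CLI; the role-name lookup below assumes the CloudFormation-generated name keeps the prefix shown above:

```bash
# Find the generated execution role, then attach the two managed policies.
ROLE_NAME=$(aws iam list-roles \
  --query "Roles[?starts_with(RoleName, 'athenarunner-lambda-AthenaRunnerLambdaExecutionRole')].RoleName" \
  --output text)
aws iam attach-role-policy --role-name "$ROLE_NAME" \
  --policy-arn arn:aws:iam::aws:policy/AmazonS3FullAccess
aws iam attach-role-policy --role-name "$ROLE_NAME" \
  --policy-arn arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole
```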

## Start State Machines

Now head to the AWS Step Functions console. Start and observe an execution of the 'MarketingAndSalesETLOrchestrator' state machine. Execution should halt at the 'Wait for XYZ Data' states. At this point, you should upload the sample .CSV files under the `samples` directory to the S3 bucket you specified as the `DataBucketName` parameter value in `step-functions-resources-config.json` configuration file. **Upload the marketing sample file under prefix 'marketing' and the sales sample file under prefix 'sales'. To do that, you may issue the following AWS CLI commands while at the project's root directory:**

```bash
aws s3 cp samples/MarketingData_QuickSightSample.csv s3://{DataBucketName}/marketing/

aws s3 cp samples/SalesPipeline_QuickSightSample.csv s3://{DataBucketName}/sales/
@@ -514,6 +641,9 @@ If you have set up and run the sample correctly, you should see this output in th

This indicates that all jobs have been run and orchestrated successfully.

![AthenaRunnerLambda](resources/AthenaRunnerStateMachine.png)


<a name="license"></a>
# License
This project is licensed under the MIT-No Attribution (MIT-0) license.
2 changes: 1 addition & 1 deletion build.py
@@ -208,7 +208,7 @@ def createstack(* stacks, **kwargs):

except Exception as e:
print("Stack creation FAILED.")
print(e.message)
print(e)


@task()
2 changes: 1 addition & 1 deletion cloudformation/athenarunner-lambda-params.json
@@ -1,7 +1,7 @@
[
{
"ParameterKey": "ArtifactBucketName",
"ParameterValue": "<NO-DEFAULT>"
"ParameterValue": "marketing-project-athena-runner-bucket"
},
{
"ParameterKey": "LambdaSourceS3Key",
2 changes: 1 addition & 1 deletion cloudformation/athenarunner-lambda.yaml
@@ -121,7 +121,7 @@ Resources:
S3Key: !Ref LambdaSourceS3Key
Timeout: 180 #seconds
MemorySize: 128 #MB
Runtime: python2.7
Runtime: python3.12
DependsOn:
- AthenaRunnerLambdaExecutionRole

4 changes: 2 additions & 2 deletions cloudformation/glue-resources-params.json
@@ -1,15 +1,15 @@
[
{
"ParameterKey": "ArtifactBucketName",
"ParameterValue": "<NO-DEFAULT>"
"ParameterValue": "marketing-project-glue-resources-bucket"
},
{
"ParameterKey": "ETLScriptsPrefix",
"ParameterValue": "scripts"
},
{
"ParameterKey": "DataBucketName",
"ParameterValue": "<NO-DEFAULT>"
"ParameterValue": "marketing-project-data-bucket"
},
{
"ParameterKey": "ETLOutputPrefix",
2 changes: 1 addition & 1 deletion cloudformation/gluerunner-lambda-params.json
@@ -1,7 +1,7 @@
[
{
"ParameterKey": "ArtifactBucketName",
"ParameterValue": "<NO-DEFAULT>"
"ParameterValue": "marketing-project-gluerunner-bucket"
},
{
"ParameterKey": "LambdaSourceS3Key",
2 changes: 1 addition & 1 deletion cloudformation/gluerunner-lambda.yaml
@@ -120,7 +120,7 @@ Resources:
S3Key: !Ref LambdaSourceS3Key
Timeout: 180 #seconds
MemorySize: 128 #MB
Runtime: python2.7
Runtime: python3.12
DependsOn:
- GlueRunnerLambdaExecutionRole

4 changes: 2 additions & 2 deletions cloudformation/step-functions-resources-params.json
@@ -1,7 +1,7 @@
[
{
"ParameterKey": "ArtifactBucketName",
"ParameterValue": "<NO-DEFAULT>"
"ParameterValue": "marketing-project-step-function-bucket"
},
{
"ParameterKey": "LambdaSourceS3Key",
@@ -13,6 +13,6 @@
},
{
"ParameterKey": "DataBucketName",
"ParameterValue": "etl-orchestrator-745-data"
"ParameterValue": "marketing-project-data-bucket"
}
]
4 changes: 2 additions & 2 deletions cloudformation/step-functions-resources.yaml
@@ -75,7 +75,7 @@ Resources:
- Effect: Allow
Principal:
Service:
- states.us-east-1.amazonaws.com
- states.ap-south-1.amazonaws.com
Action:
- sts:AssumeRole
Policies:
@@ -315,7 +315,7 @@ Resources:
S3Key: !Ref LambdaSourceS3Key
Timeout: 180 #seconds
MemorySize: 128 #MB
Runtime: python2.7
Runtime: python3.12
DependsOn:
- OnS3ObjectCreatedLambdaExecutionRole

2 changes: 1 addition & 1 deletion lambda/athenarunner/athenarunner-config.json
@@ -1,5 +1,5 @@
{
"sfn_activity_arn": "arn:aws:states:us-east-1:074599006431:activity:AthenaRunnerActivity",
"sfn_activity_arn": "arn:aws:states:<AWS-REGION>:<AWS-ACCOUNTID>:activity:AthenaRunnerActivity",
"sfn_worker_name": "athenarunner",
"ddb_table": "AthenaRunnerActiveJobs",
"ddb_query_limit": 50,
8 changes: 4 additions & 4 deletions lambda/athenarunner/athenarunner.py
@@ -59,7 +59,7 @@ def start_athena_queries(config):
)

except Exception as e:
logger.critical(e.message)
logger.critical(e)
logger.critical(
'Unrecoverable error invoking get_activity_task for {}.'.format(sfn_activity_arn))
raise
@@ -146,7 +146,7 @@ def start_athena_queries(config):

except Exception as e:
logger.error('Failed to query Athena database "{}" with query string "{}"..'.format(athena_database, athena_query_string))
logger.error('Reason: {}'.format(e.message))
logger.error('Reason: {}'.format(e))
logger.info('Sending "Task Failed" signal to Step Functions.')

response = sfn.send_task_failure(
@@ -283,7 +283,7 @@ def check_athena_queries(config):
except Exception as e:
logger.error('There was a problem checking status of Athena query..')
logger.error('Glue job Run Id "{}"'.format(athena_query_execution_id))
logger.error('Reason: {}'.format(e.message))
logger.error('Reason: {}'.format(e))
logger.info('Checking next Athena query.')

# Task failed, next item
@@ -317,6 +317,6 @@ def handler(event, context):

except Exception as e:
logger.critical('*** ERROR: Athena runner lambda function failed ***')
logger.critical(e.message)
logger.critical(e)
raise

8 changes: 4 additions & 4 deletions lambda/gluerunner/gluerunner.py
@@ -60,7 +60,7 @@ def start_glue_jobs(config):
)

except Exception as e:
logger.critical(e.message)
logger.critical(e)
logger.critical(
'Unrecoverable error invoking get_activity_task for {}.'.format(sfn_activity_arn))
raise
@@ -119,7 +119,7 @@ def start_glue_jobs(config):

except Exception as e:
logger.error('Failed to start Glue job named "{}"..'.format(glue_job_name))
logger.error('Reason: {}'.format(e.message))
logger.error('Reason: {}'.format(e))
logger.info('Sending "Task Failed" signal to Step Functions.')

response = sfn.send_task_failure(
@@ -244,7 +244,7 @@ def check_glue_jobs(config):
except Exception as e:
logger.error('There was a problem checking status of Glue job "{}"..'.format(glue_job_name))
logger.error('Glue job Run Id "{}"'.format(glue_job_run_id))
logger.error('Reason: {}'.format(e.message))
logger.error('Reason: {}'.format(e))
logger.info('Checking next Glue job.')


@@ -276,6 +276,6 @@ def handler(event, context):

except Exception as e:
logger.critical('*** ERROR: Glue runner lambda function failed ***')
logger.critical(e.message)
logger.critical(e)
raise
