Skip to content

Commit c83ee7c

Browse files
committed
migrate to public repo
1 parent c054f3f commit c83ee7c

25 files changed

+575
-1
lines changed

.gitignore

+16
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
*.swp
2+
package-lock.json
3+
__pycache__
4+
.pytest_cache
5+
.venv
6+
*.egg-info
7+
8+
# CDK asset staging directory
9+
.cdk.staging
10+
cdk.out
11+
12+
.idea/*
13+
14+
*.zip
15+
loader_package/*
16+

Makefile

+64
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
# Developer task runner for the LocalStack streaming data pipeline demo.
# Apart from the venv activate script, every target is a command rather than a
# file, so all of them are declared .PHONY at the bottom.

VENV_CMD ?= python3 -m venv
VENV_DIR ?= .venv
# Recursive (=) on purpose: picks up a VENV_DIR overridden on the command line.
VENV_RUN = . $(VENV_DIR)/bin/activate
REGION := us-east-2

# Prerequisite lists such as "clean install build-lambdas" are ordered steps,
# not independent nodes; running them concurrently under -j would race.
.NOTPARALLEL:

# Remove the packaged Lambda artefacts produced by the build-* targets.
clean:
	rm -rf loader_package
	rm -rf kinesis_tinybird_loader_pkg.zip
	rm -rf test_logger_pkg.zip

# Vendor the loader's dependencies next to its handler inside one zip.
build-tinybird-lambda:
	pip3 install -r lambdas/kinesis_tinybird_loader/requirements.txt --target ./loader_package
	cd loader_package && zip -r ../kinesis_tinybird_loader_pkg.zip .
	zip -gj kinesis_tinybird_loader_pkg.zip lambdas/kinesis_tinybird_loader/kinesis_tinybird_loader.py

build-test-logger-lambda:
	zip -gj test_logger_pkg.zip lambdas/test_logger/test_logger.py

build-lambdas: build-tinybird-lambda build-test-logger-lambda

# The activate script doubles as the stamp file for the virtualenv; it is
# touched last so the venv is only considered fresh after pip succeeded.
$(VENV_DIR)/bin/activate: setup.cfg setup.py
	$(VENV_CMD) $(VENV_DIR)
	$(VENV_RUN) && pip install --upgrade pip setuptools wheel
	touch $(VENV_DIR)/bin/activate

venv: $(VENV_DIR)/bin/activate

lint:
	$(VENV_RUN) && python -m pflake8 --show-source

# Lint only the tracked *.py files with uncommitted modifications. With no
# modified files this is a no-op instead of silently linting the whole tree.
lint-modified:
	$(VENV_RUN) && files=$$(git ls-files -m | grep '\.py$$' | xargs) && \
		if [ -n "$$files" ]; then python -m pflake8 --show-source $$files; fi

format:
	$(VENV_RUN) && python -m isort . && python -m black .

# Format only the tracked *.py files with uncommitted modifications.
format-modified:
	$(VENV_RUN) && files=$$(git ls-files -m | grep '\.py$$' | xargs) && \
		if [ -n "$$files" ]; then python -m isort $$files && python -m black $$files; fi

init-precommit: install
	$(VENV_RUN) && pre-commit install

install: venv
	$(VENV_RUN) && pip install -e ".[test,dev]"

# Bootstrap is allowed to fail (hence "|| true", kept from the original recipe);
# the braces keep that tolerance from also masking a failed activate or cd.
deploy-external-resources-local: clean install build-lambdas
	$(VENV_RUN) && cd deployments/cdk && { cdklocal bootstrap --app "python local_app.py" || true; }
	$(VENV_RUN) && cd deployments/cdk && cdklocal deploy --app "python local_app.py" ExternalTestResourcesStack --require-approval never

# Look up the deployed test logger Lambda through its CloudFormation export and
# invoke it once. The lookup is a shell substitution ($$(...)) rather than
# $(shell ...), so it runs inside the activated venv, at recipe time, and a
# failed lookup aborts the recipe.
invoke-test-logger-local:
	$(VENV_RUN) && \
		LAMBDA_NAME=$$(awslocal cloudformation list-exports --region=$(REGION) --query="Exports[?Name=='test-logger-lambda-name'].Value" --no-paginate --output text) && \
		if [ -z "$$LAMBDA_NAME" ]; then echo "export test-logger-lambda-name not found; run 'make deploy-external-resources-local' first" >&2; exit 1; fi && \
		awslocal lambda invoke --function-name "$$LAMBDA_NAME" /dev/stdout 2>/dev/null --region $(REGION) --payload '{"message": "hello world"}'

# Deploys the external test resources, invokes the test logger once, then
# deploys the pipeline stack.
deploy-local: deploy-external-resources-local invoke-test-logger-local
	$(VENV_RUN) && cd deployments/cdk && cdklocal deploy --app "python local_app.py" DataPipelineStack --require-approval never

test-integration: clean install
	$(VENV_RUN) && pytest tests/integration

start-request-recorder:
	$(VENV_RUN) && python tests/integration/mocks/tinybird_request_recorder.py

.PHONY: clean build-tinybird-lambda build-test-logger-lambda build-lambdas \
	venv lint lint-modified format format-modified init-precommit install \
	deploy-external-resources-local invoke-test-logger-local deploy-local \
	test-integration start-request-recorder

README.md

+25-1
Original file line numberDiff line numberDiff line change
@@ -1 +1,25 @@
1-
# serverless-streaming-data-pipeline
1+
# LocalStack Streaming Data Pipeline Demo
2+
3+
A serverless ETL for demonstrating streaming structured analytics events from Lambda to Tinybird using CloudWatch and Kinesis.
4+
5+
## Requirements
6+
* LocalStack Pro
7+
* AWS [CDK](https://aws.amazon.com/cdk/)
8+
* [cdklocal](https://github.com/localstack/aws-cdk-local)
9+
* Python 3
10+
11+
12+
## Running Locally
13+
14+
### Deploy Under LocalStack
15+
16+
1. Start LocalStack Pro: `LOCALSTACK_API_KEY="your_key" localstack start -d`
17+
2. Install Python dependencies: `make install`
18+
3. Deploy locally: `make deploy-local`
19+
20+
### Testing
21+
After following the local deployment steps above, you can run the sample integration test with `make test-integration`.
22+
Alternatively, you can test end-to-end manually:
23+
* Start the local Tinybird server mock with `make start-request-recorder`
24+
* Emit an event by invoking the logger Lambda: `make invoke-test-logger-local`
25+
* Observe the event payload arrive in the mock server output.

deployments/cdk/__init__.py

Whitespace-only changes.

deployments/cdk/cdk.json

+30
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
{
2+
"app": "python3 app.py",
3+
"watch": {
4+
"include": [
5+
"**"
6+
],
7+
"exclude": [
8+
"README.md",
9+
"cdk*.json",
10+
"requirements*.txt",
11+
"source.bat",
12+
"**/__init__.py",
13+
"python/__pycache__",
14+
"tests"
15+
]
16+
},
17+
"context": {
18+
"@aws-cdk/aws-apigateway:usagePlanKeyOrderInsensitiveId": true,
19+
"@aws-cdk/core:stackRelativeExports": true,
20+
"@aws-cdk/aws-rds:lowercaseDbIdentifier": true,
21+
"@aws-cdk/aws-lambda:recognizeVersionProps": true,
22+
"@aws-cdk/aws-cloudfront:defaultSecurityPolicyTLSv1.2_2021": true,
23+
"@aws-cdk-containers/ecs-service-extensions:enableDefaultLogDriver": true,
24+
"@aws-cdk/aws-ec2:uniqueImdsv2TemplateName": true,
25+
"@aws-cdk/core:target-partitions": [
26+
"aws",
27+
"aws-cn"
28+
]
29+
}
30+
}

deployments/cdk/constants.py

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from aws_cdk import Environment
2+
3+
# CDK environment (account + region) that local_app.py passes to both stacks.
# NOTE(review): the region presumably has to stay in sync with REGION in the Makefile — confirm.
ENV_LOCAL = Environment(account="000000000000", region="us-east-2")
+71
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
from os import path
2+
3+
from aws_cdk import (
4+
Duration,
5+
Stack,
6+
aws_kinesis,
7+
aws_lambda,
8+
aws_lambda_event_sources,
9+
aws_logs,
10+
aws_logs_destinations,
11+
)
12+
from constructs import Construct
13+
14+
15+
class DataPipelineStack(Stack):
    """
    CDK stack wiring a monitored CloudWatch log group to the Tinybird loader Lambda through a Kinesis stream.

    :param scope: Parent construct of this stack.
    :param construct_id: Identifier of this stack within its scope.
    :param monitored_log_group_arn: ARN of the log group whose structured events are forwarded.
    :param tinybird_auth_token: Value exposed to the loader Lambda as TINYBIRD_AUTH_TOKEN.
    :param tinybird_url: Value exposed to the loader Lambda as TINYBIRD_URL.
    :param kwargs: Forwarded unchanged to the Stack constructor.
    """

    def __init__(
        self,
        scope: Construct,
        construct_id: str,
        monitored_log_group_arn: str,
        tinybird_auth_token: str,
        tinybird_url: str,
        **kwargs
    ) -> None:
        super().__init__(scope, construct_id, **kwargs)

        stream = aws_kinesis.Stream(
            self, "data_pipeline_kinesis_stream", shard_count=1
        )

        # Only log lines carrying both $.event_class and $.event_type are forwarded.
        structured_events_only = aws_logs.FilterPattern.all(
            aws_logs.FilterPattern.exists("$.event_class"),
            aws_logs.FilterPattern.exists("$.event_type"),
        )
        source_log_group = aws_logs.LogGroup.from_log_group_arn(
            self, "monitored_log_group", monitored_log_group_arn
        )
        source_log_group.add_subscription_filter(
            "monitored_log_group_cloudwatch_subscription",
            destination=aws_logs_destinations.KinesisDestination(stream),
            filter_pattern=structured_events_only,
        )

        # The zip is resolved two directories above this file.
        loader_package = path.join(
            path.realpath(path.dirname(__file__)),
            "../..",
            "kinesis_tinybird_loader_pkg.zip",
        )
        loader_environment = {
            "TINYBIRD_AUTH_TOKEN": tinybird_auth_token,
            "TINYBIRD_URL": tinybird_url,
        }
        loader_fn = aws_lambda.Function(
            self,
            "kinesis_tinybird_loader",
            runtime=aws_lambda.Runtime.PYTHON_3_9,
            handler="kinesis_tinybird_loader.handler",
            code=aws_lambda.Code.from_asset(loader_package),
            timeout=Duration.seconds(7),
            environment=loader_environment,
        )

        stream_source = aws_lambda_event_sources.KinesisEventSource(
            stream,
            starting_position=aws_lambda.StartingPosition.LATEST,
            retry_attempts=20,
            batch_size=200,
            max_batching_window=Duration.minutes(2),
        )
        loader_fn.add_event_source(stream_source)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
from os import path
2+
3+
from aws_cdk import ArnFormat, CfnOutput, Duration, Stack, aws_lambda
4+
from constructs import Construct
5+
6+
7+
class ExternalTestResourcesStack(Stack):
    """
    CDK stack holding the test logger Lambda.

    Exports the Lambda's name ("test-logger-lambda-name") and the ARN of its
    log group ("test-logger-log-group-arn").
    """

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # The zip is resolved two directories above this file.
        logger_package = path.join(
            path.realpath(path.dirname(__file__)),
            "../..",
            "test_logger_pkg.zip",
        )
        logger_fn = aws_lambda.Function(
            self,
            "test_logger",
            runtime=aws_lambda.Runtime.PYTHON_3_9,
            handler="test_logger.handler",
            code=aws_lambda.Code.from_asset(logger_package),
            timeout=Duration.seconds(2),
        )

        # ARN of the log group named /aws/lambda/<function name>, with a ":*" suffix.
        log_group_name = f"/aws/lambda/{logger_fn.function_name}:*"
        log_group_arn = self.format_arn(
            resource="log-group",
            service="logs",
            account=self.account,
            partition=self.partition,
            region=self.region,
            arn_format=ArnFormat.COLON_RESOURCE_NAME,
            resource_name=log_group_name,
        )

        exports = (
            ("test_logger_lambda_name", logger_fn.function_name, "test-logger-lambda-name"),
            ("test_logger_log_group_arn", log_group_arn, "test-logger-log-group-arn"),
        )
        for output_id, output_value, export_name in exports:
            CfnOutput(self, output_id, value=output_value, export_name=export_name)

deployments/cdk/local_app.py

+36
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import os
2+
3+
import constants
4+
from aws_cdk import App, Fn
5+
from data_pipeline_stack import DataPipelineStack
6+
from external_test_resources_stack import ExternalTestResourcesStack
7+
8+
9+
def main():
    """
    Synthesize the CDK app used for local deployments.

    The Tinybird URL handed to the pipeline points at port 5111, path
    "tinybird", on the host named by TINYBIRD_MOCK_HOST (falling back to
    "host.docker.internal" when that variable is unset or empty).
    """
    mock_host = os.getenv("TINYBIRD_MOCK_HOST") or "host.docker.internal"
    mock_port = 5111
    mock_path = "tinybird"
    tinybird_mock_url = f"http://{mock_host}:{mock_port}/{mock_path}"

    app = App()

    test_resources_stack = ExternalTestResourcesStack(
        app, "ExternalTestResourcesStack", env=constants.ENV_LOCAL
    )

    pipeline_stack = DataPipelineStack(
        app,
        "DataPipelineStack",
        env=constants.ENV_LOCAL,
        monitored_log_group_arn=Fn.import_value("test-logger-log-group-arn"),
        tinybird_auth_token="dummy value",
        tinybird_url=tinybird_mock_url,
    )
    # The pipeline imports a value exported by the test resources stack,
    # so the dependency between the two stacks is declared explicitly.
    pipeline_stack.add_dependency(test_resources_stack)

    app.synth()


if __name__ == "__main__":
    main()

lambdas/__init__.py

Whitespace-only changes.

lambdas/kinesis_tinybird_loader/__init__.py

Whitespace-only changes.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
import base64
2+
import json
3+
import logging
4+
import os
5+
import zlib
6+
from typing import List
7+
8+
import requests
9+
10+
# AWS Lambda function for reading structured analytics events off of a Kinesis stream and sending them to Tinybird
11+
12+
log = logging.getLogger()
13+
14+
15+
def extract_analytics_events(lambda_event) -> List[str]:
    """
    Extracts analytics events from a lambda event invocation payload.

    :param lambda_event: The event payload sent to the Lambda function at its execution. This payload is expected to
                         contain Kinesis records, which in turn contain CloudWatch log messages (base64 encoded and
                         gzipped).
    :return: A list of serialized analytics events, each with a newline at the end. Records whose payload is a
             CloudWatch Logs control message contribute no events.
    :raises KeyError: If a record or a data payload lacks the expected keys.
    """
    analytics_events = []
    for record in lambda_event["Records"]:
        record_data = record["kinesis"]["data"]
        # 16 + MAX_WBITS tells zlib to expect a gzip header and trailer.
        log_payload = json.loads(
            zlib.decompress(base64.b64decode(record_data), 16 + zlib.MAX_WBITS)
        )
        # CloudWatch Logs occasionally writes "CONTROL_MESSAGE" records to the
        # stream to check that the destination is reachable. Their logEvents
        # are not analytics events and must not be forwarded. Payloads without
        # a messageType are still processed, as before.
        if log_payload.get("messageType") == "CONTROL_MESSAGE":
            continue
        for log_event in log_payload["logEvents"]:
            log_message = log_event["message"]
            analytics_events.append(
                log_message if log_message.endswith("\n") else log_message + "\n"
            )
    return analytics_events
35+
36+
37+
def load_events_to_tinybird(
    analytics_events: List[str],
    tinybird_url: str,
    tinybird_auth_token: str,
    request_timeout: float = 5.0,
):
    """
    Loads analytics events into a Tinybird datasource.

    :param analytics_events: A list of serialized, JSON-encoded analytics events. Each event is expected to end with a
                             newline delimiter. An empty list results in no request being sent.
    :param tinybird_url: Complete Tinybird URL to POST events to (via the 'events' API)
    :param tinybird_auth_token: HTTP bearer token for Tinybird
    :param request_timeout: Seconds to wait for Tinybird before giving up on the request.
    :raises RuntimeError: If Tinybird returns a non 2XX HTTP response code
    :raises requests.exceptions.RequestException: If the request cannot be completed (including timeouts)
    """
    if not analytics_events:
        # Nothing to send, e.g. the batch held only records without events.
        log.info("no analytics events to load, skipping request to Tinybird")
        return

    headers = {"Authorization": f"Bearer {tinybird_auth_token}"}
    # Encode explicitly: a str body is encoded by the HTTP stack, which on
    # older urllib3 uses latin-1 and fails on characters outside it.
    body = "".join(analytics_events).encode("utf-8")
    # Without a timeout a stalled connection would block until the Lambda itself is killed.
    response = requests.post(
        tinybird_url, data=body, headers=headers, timeout=request_timeout
    )
    # Anything outside 200-299 is a failure, including 1xx responses.
    if not 200 <= response.status_code < 300:
        log.error(
            f"failed to load events to Tinybird with status code {response.status_code}: {response.text}"
        )
        raise RuntimeError("failed to load events to Tinybird")
    log.info(response.text)
57+
58+
59+
def handler(event, context):
    """
    Lambda entry point: forwards the analytics events contained in ``event`` to Tinybird.

    :param event: Lambda invocation payload, passed to extract_analytics_events.
    :param context: Lambda context object (unused).
    :raises EnvironmentError: If TINYBIRD_URL or TINYBIRD_AUTH_TOKEN is not set.
    """
    url = os.environ.get("TINYBIRD_URL")
    if url is None:
        raise EnvironmentError("env var TINYBIRD_URL not set")

    token = os.environ.get("TINYBIRD_AUTH_TOKEN")
    if token is None:
        raise EnvironmentError("env var TINYBIRD_AUTH_TOKEN not set")

    load_events_to_tinybird(extract_analytics_events(event), url, token)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
requests

lambdas/test_logger/test_logger.py

+9
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
import json
2+
3+
# This Lambda prints a JSON payload with some arbitrary message field attached.
4+
# It is intended for flushing structured log lines into the streaming data pipeline for testing purposes.
5+
6+
7+
def handler(event, context):
    """
    Print one JSON log line tagged with event_class/event_type "test".

    :param event: Invocation payload; its optional "message" entry is echoed (null when absent).
    :param context: Lambda context object (unused).
    """
    log_line = {
        "event_class": "test",
        "event_type": "test",
        "message": event.get("message"),
    }
    print(json.dumps(log_line))

0 commit comments

Comments
 (0)