Skip to content

feat: Advanced parser utility (pydantic) #118

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 21 commits into from
Oct 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
913e310
feat: RFC: Validate incoming and outgoing events utility #95
Aug 16, 2020
3f9865a
Merge branch 'develop' into pydantic
heitorlessa Aug 23, 2020
7c55154
Merge branch 'develop' into pydantic
heitorlessa Aug 26, 2020
d50e261
improv: refactor structure to fit with utilities
heitorlessa Aug 26, 2020
bce7aab
added SQS schema & tests and sns skeleton
Aug 26, 2020
dc64b8a
Add validate function, fix flake8 issues
Aug 26, 2020
637a696
refactor: change to advanced parser
Sep 21, 2020
f986512
Merge branch 'develop' of github.com:risenberg-cyberark/aws-lambda-po…
Sep 21, 2020
3418767
refactor: pydantic as optional dependency, remove lambdaContext
Sep 21, 2020
47cd711
feat: Advanced parser utility (pydantic)
Sep 22, 2020
b7cb539
Merge branch 'develop' of github.com:risenberg-cyberark/aws-lambda-po…
Sep 24, 2020
0edaf9a
Merge branch 'develop' of github.com:risenberg-cyberark/aws-lambda-po…
Sep 24, 2020
6ae1769
fix: add only pydantic (+1 squashed commit)
Sep 24, 2020
19a597f
Revert "fix: remove jmespath extras in Make"
heitorlessa Sep 25, 2020
57b6d23
poetry update (+2 squashed commits)
heitorlessa Sep 25, 2020
ad80cd3
chore: remove kitchen sink example
heitorlessa Sep 25, 2020
f1d39e1
chore: remove dev deps from example project
heitorlessa Sep 25, 2020
38e1582
fix: poetry update + pydantic, typing_extensions as optional
Sep 25, 2020
b1b7fb3
Merge branch 'develop' into pydantic
ran-isenberg Sep 25, 2020
a47056f
fix: reduce complexity of dynamo envelope
Sep 25, 2020
10d0079
fix: CR fixes
Oct 2, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ target:

dev:
pip install --upgrade pip poetry pre-commit
poetry install
poetry install --extras "pydantic"
pre-commit install

dev-docs:
Expand Down
6 changes: 6 additions & 0 deletions aws_lambda_powertools/utilities/advanced_parser/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
"""Advanced parser utility
"""
from .envelopes import Envelope, InvalidEnvelopeError, parse_envelope
from .parser import parser

__all__ = ["InvalidEnvelopeError", "Envelope", "parse_envelope", "parser"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Re-export the envelope public API so callers can import from the package root.
from .envelopes import Envelope, InvalidEnvelopeError, parse_envelope

__all__ = ["InvalidEnvelopeError", "Envelope", "parse_envelope"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import logging
from abc import ABC, abstractmethod
from typing import Any, Dict

from pydantic import BaseModel, ValidationError

logger = logging.getLogger(__name__)


class BaseEnvelope(ABC):
    """Abstract base class for event envelopes.

    A concrete envelope first validates the AWS-service wrapper schema, then
    hands the inner user payload to one of the ``_parse_user_*`` helpers below
    to build an instance of the user-supplied pydantic schema.
    """

    def _parse_user_dict_schema(self, user_event: Dict[str, Any], schema: BaseModel) -> Any:
        """Parse a dictionary payload into the user-supplied pydantic schema.

        Returns None when the payload is absent (e.g. a missing image in a
        DynamoDB stream record). Logs and re-raises validation failures so the
        caller sees the original pydantic error.
        """
        if user_event is None:
            return None
        logger.debug("parsing user dictionary schema")
        try:
            return schema(**user_event)
        except (ValidationError, TypeError):
            logger.exception("Validation exception while extracting user custom schema")
            raise

    def _parse_user_json_string_schema(self, user_event: str, schema: BaseModel) -> Any:
        """Parse a string payload (optionally JSON encoded) into the user schema.

        Used where the underlying payload is not a dict that can be parsed as a
        BaseModel but a plain string, i.e. SQS has a plain string body.
        """
        if user_event is None:
            return None
        if schema == str:
            logger.debug("input is string, returning")
            return user_event
        logger.debug("trying to parse as json encoded string")
        try:
            # parse_raw is pydantic's entry point for JSON-encoded strings: it
            # decodes the JSON and validates the result against `schema` in one
            # step (roughly equivalent to schema(**json.loads(user_event))).
            return schema.parse_raw(user_event)
        except (ValidationError, TypeError):
            logger.exception("Validation exception while extracting user custom schema")
            raise

    @abstractmethod
    def parse(self, event: Dict[str, Any], schema: BaseModel):
        """Parse the full Lambda event; implemented by each concrete envelope."""
        # Raising (rather than the original `return NotImplemented`) surfaces
        # accidental super().parse() calls; the NotImplemented singleton is
        # meant for binary-operator protocols, not abstract methods.
        raise NotImplementedError
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import logging
from typing import Any, Dict, List
from typing_extensions import Literal

from pydantic import BaseModel, ValidationError

from aws_lambda_powertools.utilities.advanced_parser.envelopes.base import BaseEnvelope
from aws_lambda_powertools.utilities.advanced_parser.schemas import DynamoDBSchema

logger = logging.getLogger(__name__)


class DynamoDBEnvelope(BaseEnvelope):
    """Envelope for DynamoDB stream events.

    ``parse`` returns one entry per stream record: a dict with the keys
    "NewImage" and "OldImage" mapped to the parsed user schema model (either
    value may be None when the corresponding image is absent).
    """

    def parse(self, event: Dict[str, Any], schema: BaseModel) -> List[Dict[Literal["NewImage", "OldImage"], BaseModel]]:
        try:
            validated_event = DynamoDBSchema(**event)
        except (ValidationError, TypeError):
            logger.exception("Validation exception received from input dynamodb stream event")
            raise
        return [
            {
                "NewImage": self._parse_user_dict_schema(record.dynamodb.NewImage, schema),
                "OldImage": self._parse_user_dict_schema(record.dynamodb.OldImage, schema),
            }
            for record in validated_event.Records
        ]
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import logging
from enum import Enum
from typing import Any, Dict

from pydantic import BaseModel

from aws_lambda_powertools.utilities.advanced_parser.envelopes.base import BaseEnvelope
from aws_lambda_powertools.utilities.advanced_parser.envelopes.dynamodb import DynamoDBEnvelope
from aws_lambda_powertools.utilities.advanced_parser.envelopes.event_bridge import EventBridgeEnvelope
from aws_lambda_powertools.utilities.advanced_parser.envelopes.sqs import SqsEnvelope

logger = logging.getLogger(__name__)


"""Built-in envelopes"""


class Envelope(str, Enum):
    """Identifiers for the built-in envelopes handled by ``parse_envelope``."""

    SQS = "sqs"
    EVENTBRIDGE = "eventbridge"
    DYNAMODB_STREAM = "dynamodb_stream"


class InvalidEnvelopeError(Exception):
    """Raised when the requested envelope is not one of the Envelope enum values."""


# enum to BaseEnvelope handler class
__ENVELOPE_MAPPING = {
    Envelope.SQS: SqsEnvelope,
    Envelope.DYNAMODB_STREAM: DynamoDBEnvelope,
    Envelope.EVENTBRIDGE: EventBridgeEnvelope,
}


def parse_envelope(event: Dict[str, Any], envelope: Envelope, schema: BaseModel):
    """Parse `event` with the built-in handler registered for `envelope`.

    Parameters
    ----------
    event: raw Lambda event dictionary
    envelope: Envelope enum member selecting the built-in handler
    schema: user pydantic model the inner payload is validated against

    Raises
    ------
    InvalidEnvelopeError
        when `envelope` does not map to a known handler
    """
    envelope_handler: BaseEnvelope = __ENVELOPE_MAPPING.get(envelope)
    if envelope_handler is None:
        # logger.error, not logger.exception: no exception is active here yet,
        # so exception() would append a bogus "NoneType: None" traceback.
        logger.error("envelope must be an instance of Envelope enum")
        raise InvalidEnvelopeError("envelope must be an instance of Envelope enum")
    logger.debug(f"Parsing and validating event schema, envelope={str(envelope.value)}")
    return envelope_handler().parse(event=event, schema=schema)
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import logging
from typing import Any, Dict

from pydantic import BaseModel, ValidationError

from aws_lambda_powertools.utilities.advanced_parser.envelopes.base import BaseEnvelope
from aws_lambda_powertools.utilities.advanced_parser.schemas import EventBridgeSchema

logger = logging.getLogger(__name__)


class EventBridgeEnvelope(BaseEnvelope):
    """Envelope for EventBridge events: validates the EventBridge wrapper,
    then parses the inner ``detail`` dict into the user-supplied schema,
    returning a parsed BaseModel object of that schema type."""

    def parse(self, event: Dict[str, Any], schema: BaseModel) -> BaseModel:
        try:
            validated_event = EventBridgeSchema(**event)
        except (ValidationError, TypeError):
            logger.exception("Validation exception received from input eventbridge event")
            raise
        return self._parse_user_dict_schema(validated_event.detail, schema)
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import logging
from typing import Any, Dict, List, Union

from pydantic import BaseModel, ValidationError

from aws_lambda_powertools.utilities.advanced_parser.envelopes.base import BaseEnvelope
from aws_lambda_powertools.utilities.advanced_parser.schemas import SqsSchema

logger = logging.getLogger(__name__)


class SqsEnvelope(BaseEnvelope):
    """Envelope for SQS events.

    ``parse`` returns one item per record: either the raw body string (when
    ``schema`` is ``str``) or the body JSON-decoded into the user-supplied
    pydantic model. All records are parsed the same way — if ``schema`` is
    ``str``, every item stays a plain string and none are treated as JSON
    (and vice versa).
    """

    def parse(self, event: Dict[str, Any], schema: Union[BaseModel, str]) -> List[Union[BaseModel, str]]:
        try:
            validated_event = SqsSchema(**event)
        except (ValidationError, TypeError):
            logger.exception("Validation exception received from input sqs event")
            raise
        return [self._parse_user_json_string_schema(record.body, schema) for record in validated_event.Records]
68 changes: 68 additions & 0 deletions aws_lambda_powertools/utilities/advanced_parser/parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import logging
from typing import Any, Callable, Dict, Optional

from pydantic import BaseModel, ValidationError

from aws_lambda_powertools.middleware_factory import lambda_handler_decorator
from aws_lambda_powertools.utilities.advanced_parser.envelopes import Envelope, parse_envelope

logger = logging.getLogger(__name__)


@lambda_handler_decorator
def parser(
    handler: Callable[[Dict, Any], Any],
    event: Dict[str, Any],
    context: Dict[str, Any],
    schema: BaseModel,
    envelope: Optional[Envelope] = None,
) -> Any:
    """Decorator to conduct advanced parsing & validation for lambda handlers events

    The wrapped handler no longer receives the raw event dict: the event is
    parsed into a pydantic model of type `schema` and that instance is passed
    to the handler instead.

    When `envelope` is None, the complete event is parsed directly against
    `schema`. When an envelope is given, the event is first validated against
    that envelope's AWS-service schema, the user payload is extracted from it,
    and only that payload is parsed against `schema`.

    Example
    -------
    **Lambda function using validation decorator**

        @parser(schema=MyBusiness, envelope=envelopes.EVENTBRIDGE)
        def handler(event: MyBusiness, context: LambdaContext):
            ...

    Parameters
    ----------
    handler: input for lambda_handler_decorator, wraps the handler lambda
    event: AWS event dictionary
    context: AWS lambda context
    schema: pydantic BaseModel class. This is the user data schema that will replace the event.
        event parameter will be parsed and a new schema object will be created from it.
    envelope: what envelope to extract the schema from, can be any AWS service that is currently
        supported in the envelopes module. Can be None.

    Raises
    ------
    TypeError
        event is None or cannot be unpacked into `schema`
    pydantic.ValidationError
        event (or the envelope-extracted payload) fails schema validation
    InvalidEnvelopeError
        envelope is not a member of the Envelope enum
    """
    if envelope is None:
        try:
            logger.debug("Parsing and validating event schema, no envelope is used")
            parsed_event = schema(**event)
        except (ValidationError, TypeError):
            logger.exception("Validation exception received from input event")
            raise
    else:
        # envelope handles both wrapper validation and user-payload parsing
        parsed_event = parse_envelope(event, envelope, schema)

    logger.debug(f"Calling handler {handler.__name__}")
    return handler(parsed_event, context)
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Re-export the built-in AWS event schemas (pydantic models) used by the
# envelopes: DynamoDB streams, EventBridge and SQS.
from .dynamodb import DynamoDBSchema, DynamoRecordSchema, DynamoScheme
from .event_bridge import EventBridgeSchema
from .sqs import SqsRecordSchema, SqsSchema

__all__ = [
    "DynamoDBSchema",
    "EventBridgeSchema",
    "DynamoScheme",
    "DynamoRecordSchema",
    "SqsSchema",
    "SqsRecordSchema",
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from datetime import date
from typing import Any, Dict, List, Optional

from pydantic import BaseModel, root_validator
from typing_extensions import Literal


class DynamoScheme(BaseModel):
    """Pydantic model for the ``dynamodb`` section of a stream record."""

    ApproximateCreationDateTime: Optional[date]
    Keys: Dict[str, Dict[str, Any]]
    NewImage: Optional[Dict[str, Any]]
    OldImage: Optional[Dict[str, Any]]
    SequenceNumber: str
    SizeBytes: int
    StreamViewType: Literal["NEW_AND_OLD_IMAGES", "KEYS_ONLY", "NEW_IMAGE", "OLD_IMAGE"]

    # Both images are optional on their own, but a legal NEW_AND_OLD_IMAGES
    # record must carry at least one of them.
    @root_validator
    def check_one_image_exists(cls, values):
        has_any_image = values.get("NewImage") or values.get("OldImage")
        if values.get("StreamViewType") == "NEW_AND_OLD_IMAGES" and not has_any_image:
            raise TypeError("DynamoDB streams schema failed validation, missing both new & old stream images")
        return values


class UserIdentity(BaseModel):
    """Identity block on a stream record; both fields are pinned by the
    Literal types to the only values this schema accepts (the DynamoDB
    service principal)."""

    type: Literal["Service"] # noqa: VNE003, A003
    principalId: Literal["dynamodb.amazonaws.com"]


class DynamoRecordSchema(BaseModel):
    """Pydantic model for a single DynamoDB stream record."""

    eventID: str
    eventName: Literal["INSERT", "MODIFY", "REMOVE"]
    # NOTE(review): float collapses version strings like "1.10" and "1.1" to
    # the same value — confirm str would not be a safer type here.
    eventVersion: float
    eventSource: Literal["aws:dynamodb"]
    awsRegion: str
    eventSourceARN: str
    dynamodb: DynamoScheme
    # only present on some records (see UserIdentity)
    userIdentity: Optional[UserIdentity]


class DynamoDBSchema(BaseModel):
    """Top-level DynamoDB stream event: a batch of stream records."""

    Records: List[DynamoRecordSchema]
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from datetime import datetime
from typing import Any, Dict, List, Optional

from pydantic import BaseModel, Field


class EventBridgeSchema(BaseModel):
    """Pydantic model for the EventBridge event wrapper.

    The user payload lives in ``detail`` and is re-parsed by the envelope
    against the user-supplied schema.
    """

    version: str
    id: str # noqa: A003,VNE003
    source: str
    account: str
    time: datetime
    region: str
    resources: List[str]
    # Optional[str]: the JSON key is "detail-type" (hence the alias) and the
    # field defaults to None, so the annotation must admit None as well —
    # the original bare `str` annotation contradicted the None default.
    detailtype: Optional[str] = Field(None, alias="detail-type")
    detail: Dict[str, Any]
65 changes: 65 additions & 0 deletions aws_lambda_powertools/utilities/advanced_parser/schemas/sqs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import re
from datetime import datetime
from typing import Dict, List, Optional

from pydantic import BaseModel, root_validator, validator
from typing_extensions import Literal


class SqsAttributesSchema(BaseModel):
    """SQS record ``attributes`` block; the Optional fields may be absent
    depending on the queue/message (presumably FIFO-only attributes — verify
    against the SQS event documentation)."""

    ApproximateReceiveCount: str
    ApproximateFirstReceiveTimestamp: datetime
    MessageDeduplicationId: Optional[str]
    MessageGroupId: Optional[str]
    SenderId: str
    SentTimestamp: datetime
    SequenceNumber: Optional[str]
    AWSTraceHeader: Optional[str]


class SqsMsgAttributeSchema(BaseModel):
    """Single SQS message attribute: its value(s) plus the declared data type."""

    stringValue: Optional[str]
    binaryValue: Optional[str]
    stringListValues: List[str] = []
    binaryListValues: List[str] = []
    dataType: str

    # Amazon SQS supports the logical data types String, Number, and Binary with optional custom data type
    # labels with the format .custom-data-type.
    # https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-message-metadata.html#sqs-message-attributes
    @validator("dataType")
    def valid_type(cls, v): # noqa: VNE001
        # re caches compiled patterns, so matching directly is equivalent to
        # compiling first
        if not re.match("Number.*|String.*|Binary.*", v):
            raise TypeError("data type is invalid")
        return v

    # validate that dataType and value are not None and match
    @root_validator
    def check_str_and_binary_values(cls, values):
        binary_val, str_val = values.get("binaryValue", ""), values.get("stringValue", "")
        dataType = values.get("dataType")
        if not str_val and not binary_val:
            raise TypeError("both binaryValue and stringValue are missing")
        # dataType is absent from `values` when its own field validator already
        # failed; guard so pydantic reports the original error instead of an
        # AttributeError on None.startswith.
        if not dataType:
            return values
        if dataType.startswith("Binary") and not binary_val:
            raise TypeError("binaryValue is missing")
        if (dataType.startswith("String") or dataType.startswith("Number")) and not str_val:
            raise TypeError("stringValue is missing")
        return values


class SqsRecordSchema(BaseModel):
    """Pydantic model for a single SQS record; ``body`` stays a plain string
    here and is parsed into the user schema by SqsEnvelope."""

    messageId: str
    receiptHandle: str
    body: str
    attributes: SqsAttributesSchema
    messageAttributes: Dict[str, SqsMsgAttributeSchema]
    md5OfBody: str
    md5OfMessageAttributes: Optional[str]
    eventSource: Literal["aws:sqs"]
    eventSourceARN: str
    awsRegion: str


class SqsSchema(BaseModel):
    """Top-level SQS event: a batch of records."""

    Records: List[SqsRecordSchema]
Loading