Skip to content

feat(idempotency): support dataclasses & pydantic models payloads #908

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion aws_lambda_powertools/utilities/idempotency/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,23 @@
logger = logging.getLogger(__name__)


def _prepare_data(data: Any) -> Any:
    """Prepare data for JSON serialization.

    Python dataclass instances, pydantic models and event source data classes
    are converted to a dict; anything else is returned as-is.

    Parameters
    ----------
    data : Any
        Payload to normalise before it is serialized.

    Returns
    -------
    Any
        ``dataclasses.asdict(data)`` for dataclass instances, ``data.dict()``
        for instances exposing a callable ``dict``, the ``raw_event`` attribute
        when one is present, otherwise ``data`` unchanged.
    """
    # A class object carries the same markers as its instances
    # (``__dataclass_fields__``, a callable ``dict``), but converting the class
    # itself raises TypeError. Only instances are converted.
    is_instance = not isinstance(data, type)

    if is_instance and hasattr(data, "__dataclass_fields__"):
        # Local import, as before: dataclasses is only needed on this path
        import dataclasses

        return dataclasses.asdict(data)

    if is_instance and callable(getattr(data, "dict", None)):
        # Duck-typed: pydantic models (or any object offering ``dict()``)
        return data.dict()

    # Objects exposing ``raw_event`` are unwrapped to it; the rest pass through
    return getattr(data, "raw_event", data)


class IdempotencyHandler:
"""
Base class to orchestrate calls to persistence layer.
Expand Down Expand Up @@ -52,7 +69,7 @@ def __init__(
Function keyword arguments
"""
self.function = function
self.data = function_payload
self.data = _prepare_data(function_payload)
self.fn_args = function_args
self.fn_kwargs = function_kwargs

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
"""
Persistence layers supporting idempotency
"""

import datetime
import hashlib
import json
Expand Down Expand Up @@ -226,7 +225,6 @@ def _generate_hash(self, data: Any) -> str:
Hashed representation of the provided data

"""
data = getattr(data, "raw_event", data) # could be a data class depending on decorator order
hashed_data = self.hash_function(json.dumps(data, cls=Encoder, sort_keys=True).encode())
return hashed_data.hexdigest()

Expand Down
112 changes: 95 additions & 17 deletions docs/utilities/idempotency.md
Original file line number Diff line number Diff line change
Expand Up @@ -124,45 +124,50 @@ You can quickly start by initializing the `DynamoDBPersistenceLayer` class and u

Similar to [idempotent decorator](#idempotent-decorator), you can use `idempotent_function` decorator for any synchronous Python function.

When using `idempotent_function`, you must tell us which keyword parameter in your function signature has the data we should use via **`data_keyword_argument`** - Such data must be JSON serializable.
When using `idempotent_function`, you must tell us which keyword parameter in your function signature has the data we should use via **`data_keyword_argument`**.

!!! info "We support JSON serializable data, [Python Dataclasses](https://docs.python.org/3.7/library/dataclasses.html){target="_blank"}, [Parser/Pydantic Models](parser.md){target="_blank"}, and our [Event Source Data Classes](./data_classes.md){target="_blank"}."

!!! warning "Make sure to call your decorated function using keyword arguments"

=== "app.py"
=== "batch_sample.py"

This example also demonstrates how you can integrate with [Batch utility](batch.md), so you can process each record in an idempotent manner.

```python hl_lines="4 13 18 25"
import uuid

from aws_lambda_powertools.utilities.batch import sqs_batch_processor
from aws_lambda_powertools.utilities.idempotency import idempotent_function, DynamoDBPersistenceLayer, IdempotencyConfig
```python hl_lines="4-5 16 21 29"
from aws_lambda_powertools.utilities.batch import (BatchProcessor, EventType,
batch_processor)
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
from aws_lambda_powertools.utilities.idempotency import (
DynamoDBPersistenceLayer, IdempotencyConfig, idempotent_function)


processor = BatchProcessor(event_type=EventType.SQS)
dynamodb = DynamoDBPersistenceLayer(table_name="idem")
config = IdempotencyConfig(
event_key_jmespath="messageId", # see "Choosing a payload subset for idempotency" section
event_key_jmespath="messageId", # see Choosing a payload subset section
use_local_cache=True,
)

@idempotent_function(data_keyword_argument="data", config=config, persistence_store=dynamodb)
def dummy(arg_one, arg_two, data: dict, **kwargs):
return {"data": data}


@idempotent_function(data_keyword_argument="record", config=config, persistence_store=dynamodb)
def record_handler(record):
def record_handler(record: SQSRecord):
return {"message": record["body"]}


@sqs_batch_processor(record_handler=record_handler)
@idempotent_function(data_keyword_argument="data", config=config, persistence_store=dynamodb)
def dummy(arg_one, arg_two, data: dict, **kwargs):
return {"data": data}


@batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event, context):
# `data` parameter must be called as a keyword argument to work
dummy("hello", "universe", data="test")
return {"statusCode": 200}
return processor.response()
```

=== "Example event"
=== "Batch event"

```json hl_lines="4"
{
Expand Down Expand Up @@ -193,6 +198,79 @@ When using `idempotent_function`, you must tell us which keyword parameter in yo
}
```

=== "dataclass_sample.py"

```python hl_lines="3-4 23 32"
from dataclasses import dataclass

from aws_lambda_powertools.utilities.idempotency import (
    DynamoDBPersistenceLayer, IdempotencyConfig, idempotent_function)

dynamodb = DynamoDBPersistenceLayer(table_name="idem")
config = IdempotencyConfig(
    event_key_jmespath="order_id",  # a field of Order; see Choosing a payload subset section
    use_local_cache=True,
)

@dataclass
class OrderItem:
    sku: str
    description: str

@dataclass
class Order:
    item: OrderItem
    order_id: int


@idempotent_function(data_keyword_argument="order", config=config, persistence_store=dynamodb)
def process_order(order: Order):
    return f"processed order {order.order_id}"


order_item = OrderItem(sku="fake", description="sample")
order = Order(item=order_item, order_id=1)  # matches the declared `order_id: int`

# `order` parameter must be called as a keyword argument to work
process_order(order=order)
```

=== "parser_pydantic_sample.py"

```python hl_lines="1-2 22 31"
from aws_lambda_powertools.utilities.idempotency import (
    DynamoDBPersistenceLayer, IdempotencyConfig, idempotent_function)
from aws_lambda_powertools.utilities.parser import BaseModel

dynamodb = DynamoDBPersistenceLayer(table_name="idem")
config = IdempotencyConfig(
    event_key_jmespath="order_id",  # a field of Order; see Choosing a payload subset section
    use_local_cache=True,
)


class OrderItem(BaseModel):
    sku: str
    description: str


class Order(BaseModel):
    item: OrderItem
    order_id: int


@idempotent_function(data_keyword_argument="order", config=config, persistence_store=dynamodb)
def process_order(order: Order):
    return f"processed order {order.order_id}"


order_item = OrderItem(sku="fake", description="sample")
order = Order(item=order_item, order_id=1)  # a non-numeric string would fail validation of the int field

# `order` parameter must be called as a keyword argument to work
process_order(order=order)
```

### Choosing a payload subset for idempotency

!!! tip "Dealing with always changing payloads"
Expand All @@ -209,7 +287,7 @@ Imagine the function executes successfully, but the client never receives the re
!!! warning "Idempotency for JSON payloads"
The payload extracted by the `event_key_jmespath` is treated as a string by default, so it will be sensitive to differences in whitespace even when the JSON payload itself is identical.

To alter this behaviour, we can use the [JMESPath built-in function](jmespath_functions.md#powertools_json-function) `powertools_json()` to treat the payload as a JSON object rather than a string.
To alter this behaviour, we can use the [JMESPath built-in function](jmespath_functions.md#powertools_json-function) `powertools_json()` to treat the payload as a JSON object (dict) rather than a string.

=== "payment.py"

Expand Down
18 changes: 6 additions & 12 deletions tests/functional/idempotency/conftest.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import datetime
import hashlib
import json
from collections import namedtuple
from decimal import Decimal
Expand All @@ -11,20 +10,15 @@
from botocore.config import Config
from jmespath import functions

from aws_lambda_powertools.shared.json_encoder import Encoder
from aws_lambda_powertools.utilities.idempotency import DynamoDBPersistenceLayer
from aws_lambda_powertools.utilities.idempotency.idempotency import IdempotencyConfig
from aws_lambda_powertools.utilities.jmespath_utils import extract_data_from_envelope
from aws_lambda_powertools.utilities.validation import envelopes
from tests.functional.utils import load_event
from tests.functional.utils import hash_idempotency_key, json_serialize, load_event

TABLE_NAME = "TEST_TABLE"


def serialize(data):
return json.dumps(data, sort_keys=True, cls=Encoder)


@pytest.fixture(scope="module")
def config() -> Config:
    """Module-scoped botocore ``Config`` pinned to the ``us-east-1`` region."""
    region_config = Config(region_name="us-east-1")
    return region_config
Expand Down Expand Up @@ -66,12 +60,12 @@ def lambda_response():

@pytest.fixture(scope="module")
def serialized_lambda_response(lambda_response):
return serialize(lambda_response)
return json_serialize(lambda_response)


@pytest.fixture(scope="module")
def deserialized_lambda_response(lambda_response):
return json.loads(serialize(lambda_response))
return json.loads(json_serialize(lambda_response))


@pytest.fixture
Expand Down Expand Up @@ -150,20 +144,20 @@ def expected_params_put_item_with_validation(hashed_idempotency_key, hashed_vali
def hashed_idempotency_key(lambda_apigw_event, default_jmespath, lambda_context):
compiled_jmespath = jmespath.compile(default_jmespath)
data = compiled_jmespath.search(lambda_apigw_event)
return "test-func.lambda_handler#" + hashlib.md5(serialize(data).encode()).hexdigest()
return "test-func.lambda_handler#" + hash_idempotency_key(data)


@pytest.fixture
def hashed_idempotency_key_with_envelope(lambda_apigw_event):
event = extract_data_from_envelope(
data=lambda_apigw_event, envelope=envelopes.API_GATEWAY_HTTP, jmespath_options={}
)
return "test-func.lambda_handler#" + hashlib.md5(serialize(event).encode()).hexdigest()
return "test-func.lambda_handler#" + hash_idempotency_key(event)


@pytest.fixture
def hashed_validation_key(lambda_apigw_event):
return hashlib.md5(serialize(lambda_apigw_event["requestContext"]).encode()).hexdigest()
return hash_idempotency_key(lambda_apigw_event["requestContext"])


@pytest.fixture
Expand Down
Loading