Skip to content

Commit ad26e31

Browse files
committed
add mock redis, redis validation, sentinel support
1 parent cceba13 commit ad26e31

File tree

6 files changed

+325
-14
lines changed

6 files changed

+325
-14
lines changed

Makefile

+2
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ test-idempotency-redis:
4040
docker run --name test-idempotency-redis -d -p 63005:6379 redis
4141
poetry run pytest tests/integration/idempotency;docker stop test-idempotency-redis;docker rm test-idempotency-redis
4242

43+
44+
4345
e2e-test:
4446
python parallel_run_e2e.py
4547

aws_lambda_powertools/utilities/idempotency/exceptions.py

+6
Original file line numberDiff line numberDiff line change
@@ -71,3 +71,9 @@ class IdempotencyKeyError(BaseError):
7171
"""
7272
Payload does not contain an idempotent key
7373
"""
74+
75+
76+
class IdempotencyRedisClientConfigError(BaseError):
77+
"""
78+
Redis Client passed into persistant layer is not valid
79+
"""

aws_lambda_powertools/utilities/idempotency/persistence/redis.py

+22-7
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,20 @@
11
import datetime
22
import logging
3-
from typing import Any, Dict
3+
from typing import Any, Dict, Union
44

55
try:
66
import redis # type:ignore
77
except ImportError:
88
redis = None
99

10+
11+
import redis
12+
1013
from aws_lambda_powertools.utilities.idempotency import BasePersistenceLayer
1114
from aws_lambda_powertools.utilities.idempotency.exceptions import (
1215
IdempotencyItemAlreadyExistsError,
1316
IdempotencyItemNotFoundError,
17+
IdempotencyRedisClientConfigError,
1418
)
1519
from aws_lambda_powertools.utilities.idempotency.persistence.base import (
1620
STATUS_CONSTANTS,
@@ -23,7 +27,7 @@
2327
class RedisCachePersistenceLayer(BasePersistenceLayer):
2428
def __init__(
2529
self,
26-
connection,
30+
connection: Union[redis.Redis, redis.cluster.RedisCluster],
2731
in_progress_expiry_attr: str = "in_progress_expiration",
2832
status_attr: str = "status",
2933
data_attr: str = "data",
@@ -44,6 +48,12 @@ def __init__(
4448
"""
4549

4650
# Initialize connection with Redis
51+
52+
if not hasattr(connection, "get_connection_kwargs"):
53+
raise IdempotencyRedisClientConfigError
54+
if not connection.get_connection_kwargs().get("decode_responses", False):
55+
# Requires decode_responses to be true
56+
raise IdempotencyRedisClientConfigError
4757
self._connection = connection
4858

4959
self.in_progress_expiry_attr = in_progress_expiry_attr
@@ -106,29 +116,34 @@ def _put_record(self, data_record: DataRecord) -> None:
106116
# - first time that this invocation key is used
107117
# - previous invocation with the same key was deleted due to TTL
108118
idempotency_record = self._connection.hgetall(data_record.idempotency_key)
119+
print(idempotency_record)
109120
if len(idempotency_record) > 0:
110121
# record already exists.
111122

112123
# status is completed, so raise exception because it exists and still valid
113124
if idempotency_record[self.status_attr] == STATUS_CONSTANTS["COMPLETED"]:
114-
raise
125+
raise IdempotencyItemAlreadyExistsError
115126

116127
# checking if in_progress_expiry_attr exists
117128
# if in_progress_expiry_attr exist, must be lower than now
118129
if self.in_progress_expiry_attr in idempotency_record and int(
119130
idempotency_record[self.in_progress_expiry_attr],
120131
) > int(now.timestamp() * 1000):
121-
raise
132+
raise IdempotencyItemAlreadyExistsError
122133

123134
logger.debug(f"Putting record on Redis for idempotency key: {data_record.idempotency_key}")
124135
self._connection.hset(**item)
125136
# hset type must set expiration after adding the record
126137
# Need to review this to get ttl in seconds
127138
# Q: should we replace self.expires_after_seconds with _get_expiry_timestamp? more consistent
128139
self._connection.expire(name=data_record.idempotency_key, time=self.expires_after_seconds)
129-
except Exception:
130-
logger.debug(f"Failed to put record for already existing idempotency key: {data_record.idempotency_key}")
131-
raise IdempotencyItemAlreadyExistsError
140+
except redis.exceptions.RedisError:
141+
raise redis.exceptions.RedisError
142+
except redis.exceptions.RedisClusterException:
143+
raise redis.exceptions.RedisClusterException
144+
except Exception as e:
145+
logger.debug(f"encountered non-redis exception:{e}")
146+
raise e
132147

133148
def _update_record(self, data_record: DataRecord) -> None:
134149
item: Dict[str, Any] = {}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,226 @@
1+
import copy
2+
import time as t
3+
4+
import pytest
5+
6+
from aws_lambda_powertools.utilities.idempotency import RedisCachePersistenceLayer
7+
from aws_lambda_powertools.utilities.idempotency.exceptions import (
8+
IdempotencyAlreadyInProgressError,
9+
IdempotencyItemAlreadyExistsError,
10+
IdempotencyItemNotFoundError,
11+
IdempotencyRedisClientConfigError,
12+
)
13+
from aws_lambda_powertools.utilities.idempotency.idempotency import (
14+
idempotent,
15+
idempotent_function,
16+
)
17+
18+
19+
@pytest.fixture
20+
def lambda_context():
21+
class LambdaContext:
22+
def __init__(self):
23+
self.function_name = "test-func"
24+
self.memory_limit_in_mb = 128
25+
self.invoked_function_arn = "arn:aws:lambda:eu-west-1:809313241234:function:test-func"
26+
self.aws_request_id = "52fdfc07-2182-154f-163f-5f0f9a621d72"
27+
28+
def get_remaining_time_in_millis(self) -> int:
29+
return 1000
30+
31+
return LambdaContext()
32+
33+
34+
class MockRedis:
35+
def __init__(self, decode_responses, cache, **kwargs):
36+
self.cache = cache or {}
37+
self.expire_dict = {}
38+
self.decode_responses = decode_responses
39+
self.acl = {}
40+
self.username = ""
41+
42+
def hset(self, name, mapping):
43+
self.expire_dict.pop(name, {})
44+
self.cache[name] = mapping
45+
46+
# not covered by test yet.
47+
def expire(self, name, time):
48+
self.expire_dict[name] = t.time() + time
49+
50+
# return {} if no match
51+
def hgetall(self, name):
52+
if self.expire_dict.get(name, t.time() + 1) < t.time():
53+
self.cache.pop(name, {})
54+
return self.cache.get(name, {})
55+
56+
def get_connection_kwargs(self):
57+
return {"decode_responses": self.decode_responses}
58+
59+
def auth(self, username, **kwargs):
60+
self.username = username
61+
62+
def delete(self, name):
63+
self.cache.pop(name, {})
64+
65+
66+
@pytest.fixture
67+
def persistence_store_standalone_redis():
68+
# you will need to handle yourself the connection to pass again the password
69+
# and avoid AuthenticationError at redis queries
70+
redis_client = MockRedis(
71+
host="localhost",
72+
port="63005",
73+
decode_responses=True,
74+
)
75+
return RedisCachePersistenceLayer(connection=redis_client)
76+
77+
78+
# test basic
79+
def test_idempotent_function_and_lambda_handler_redis_basic(
80+
# idempotency_config: IdempotencyConfig,
81+
persistence_store_standalone_redis: RedisCachePersistenceLayer,
82+
lambda_context,
83+
):
84+
mock_event = {"data": "value"}
85+
persistence_layer = persistence_store_standalone_redis
86+
expected_result = {"message": "Foo"}
87+
88+
@idempotent_function(persistence_store=persistence_layer, data_keyword_argument="record")
89+
def record_handler(record):
90+
return expected_result
91+
92+
@idempotent(persistence_store=persistence_layer)
93+
def lambda_handler(event, context):
94+
return expected_result
95+
96+
# WHEN calling the function
97+
fn_result = record_handler(record=mock_event)
98+
# WHEN calling lambda handler
99+
handler_result = lambda_handler(mock_event, lambda_context)
100+
# THEN we expect the function and lambda handler to execute successfully
101+
assert fn_result == expected_result
102+
assert handler_result == expected_result
103+
104+
105+
def test_idempotent_lambda_redis_no_decode():
106+
redis_client = MockRedis(
107+
host="localhost",
108+
port="63005",
109+
decode_responses=False,
110+
)
111+
# decode_responses=False will not be accepted
112+
with pytest.raises(IdempotencyRedisClientConfigError):
113+
RedisCachePersistenceLayer(connection=redis_client)
114+
115+
116+
def test_idempotent_function_and_lambda_handler_redis_cache(
117+
persistence_store_standalone_redis: RedisCachePersistenceLayer,
118+
lambda_context,
119+
):
120+
mock_event = {"data": "value2"}
121+
persistence_layer = persistence_store_standalone_redis
122+
result = {"message": "Foo"}
123+
expected_result = copy.deepcopy(result)
124+
125+
@idempotent_function(persistence_store=persistence_layer, data_keyword_argument="record")
126+
def record_handler(record):
127+
return result
128+
129+
@idempotent(persistence_store=persistence_layer)
130+
def lambda_handler(event, context):
131+
return result
132+
133+
# WHEN calling the function
134+
fn_result = record_handler(record=mock_event)
135+
# WHEN calling lambda handler
136+
handler_result = lambda_handler(mock_event, lambda_context)
137+
# THEN we expect the function and lambda handler to execute successfully
138+
assert fn_result == expected_result
139+
assert handler_result == expected_result
140+
141+
# modify the return to check if idem cache works
142+
result = {"message": "Bar"}
143+
fn_result2 = record_handler(record=mock_event)
144+
# Second time calling lambda handler, test if same result
145+
handler_result2 = lambda_handler(mock_event, lambda_context)
146+
assert fn_result2 == expected_result
147+
assert handler_result2 == expected_result
148+
149+
# modify the mock event to check if we got updated result
150+
mock_event = {"data": "value3"}
151+
fn_result3 = record_handler(record=mock_event)
152+
# thrid time calling lambda handler, test if result updated
153+
handler_result3 = lambda_handler(mock_event, lambda_context)
154+
assert fn_result3 == result
155+
assert handler_result3 == result
156+
157+
158+
# test idem-inprogress
159+
def test_idempotent_lambda_redis_in_progress(
160+
persistence_store_standalone_redis: RedisCachePersistenceLayer,
161+
lambda_context,
162+
):
163+
"""
164+
Test idempotent decorator where lambda_handler is already processing an event with matching event key
165+
"""
166+
167+
mock_event = {"data": "value4"}
168+
persistence_store = persistence_store_standalone_redis
169+
lambda_response = {"foo": "bar"}
170+
171+
@idempotent(persistence_store=persistence_store)
172+
def lambda_handler(event, context):
173+
return lambda_response
174+
175+
# register the context first
176+
lambda_handler(mock_event, lambda_context)
177+
# save additional to in_progress
178+
mock_event = {"data": "value7"}
179+
try:
180+
persistence_store.save_inprogress(mock_event, 1000)
181+
except IdempotencyItemAlreadyExistsError:
182+
pass
183+
184+
with pytest.raises(IdempotencyAlreadyInProgressError):
185+
lambda_handler(mock_event, lambda_context)
186+
187+
188+
# test -remove
189+
def test_idempotent_lambda_redis_delete(
190+
persistence_store_standalone_redis: RedisCachePersistenceLayer,
191+
lambda_context,
192+
):
193+
mock_event = {"data": "test_delete"}
194+
persistence_layer = persistence_store_standalone_redis
195+
result = {"message": "Foo"}
196+
197+
@idempotent(persistence_store=persistence_layer)
198+
def lambda_handler(event, _):
199+
return result
200+
201+
handler_result = lambda_handler(mock_event, lambda_context)
202+
assert handler_result == result
203+
204+
# delete the idem and handler should output new result
205+
persistence_layer.delete_record(mock_event, IdempotencyItemNotFoundError)
206+
result = {"message": "Foo2"}
207+
handler_result2 = lambda_handler(mock_event, lambda_context)
208+
assert handler_result2 == result
209+
210+
211+
"""def test_idempotent_lambda_redis_credential(lambda_context):
212+
redis_client = MockRedis(
213+
host='localhost',
214+
port='63005',
215+
decode_responses=True,
216+
)
217+
pwd = "terriblePassword"
218+
usr = "test_acl_denial"
219+
redis_client.acl_setuser(username=usr, enabled=True, passwords="+"+pwd,keys='*',commands=['+hgetall','-set'])
220+
redis_client.auth(password=pwd,username=usr)
221+
222+
@idempotent(persistence_store=RedisCachePersistenceLayer(connection=redis_client))
223+
def lambda_handler(event, _):
224+
return True
225+
with pytest.raises(IdempotencyPersistenceLayerError):
226+
handler_result = lambda_handler("test_Acl", lambda_context)"""
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
docker run --name redis_master -p 6379:6379 -d redis
2+
docker run --name redis_slave_1 -p 6380:6380 --link redis_master:redis_master -d redis redis-server --slaveof redis_master 6379
3+
docker run --name redis_slave_2 -p 6381:6381 --link redis_master:redis_master -d redis redis-server --slaveof redis_master 6379
4+
docker run --name redis_slave_3 -p 6382:6382 --link redis_master:redis_master -d redis redis-server --slaveof redis_master 6379
5+
docker run --name redis_sentinel_1 -d -e REDIS_MASTER_HOST=redis_master -e REDIS_SENTINEL_PORT_NUMBER=26379 -e REDIS_SENTINEL_QUORUM=2 -p 26379:26379 --link redis_master:redis_master bitnami/redis-sentinel:latest
6+
docker run --name redis_sentinel_2 -d -e REDIS_MASTER_HOST=redis_master -e REDIS_SENTINEL_PORT_NUMBER=26380 -e REDIS_SENTINEL_QUORUM=2 -p 26380:26380 --link redis_master:redis_master bitnami/redis-sentinel:latest
7+
docker run --name redis_sentinel_3 -d -e REDIS_MASTER_HOST=redis_master -e REDIS_SENTINEL_PORT_NUMBER=26381 -e REDIS_SENTINEL_QUORUM=2 -p 26381:26381 --link redis_master:redis_master bitnami/redis-sentinel:latest

0 commit comments

Comments
 (0)