diff --git a/aws_lambda_powertools/utilities/data_masking/base.py b/aws_lambda_powertools/utilities/data_masking/base.py index c5f49cfade8..c5fd6f274e7 100644 --- a/aws_lambda_powertools/utilities/data_masking/base.py +++ b/aws_lambda_powertools/utilities/data_masking/base.py @@ -9,7 +9,8 @@ import functools import logging import warnings -from typing import TYPE_CHECKING, Any, Callable, Mapping, Sequence, overload +from copy import deepcopy +from typing import TYPE_CHECKING, Any, Callable, Mapping, Sequence from jsonpath_ng.ext import parse @@ -18,6 +19,7 @@ DataMaskingUnsupportedTypeError, ) from aws_lambda_powertools.utilities.data_masking.provider import BaseProvider +from aws_lambda_powertools.warnings import PowertoolsUserWarning if TYPE_CHECKING: from numbers import Number @@ -67,11 +69,39 @@ def encrypt( provider_options: dict | None = None, **encryption_context: str, ) -> str: + """ + Encrypt data using the configured encryption provider. + + Parameters + ---------- + data : dict, Mapping, Sequence, or Number + The data to encrypt. + provider_options : dict, optional + Provider-specific options for encryption. + **encryption_context : str + Additional key-value pairs for encryption context. + + Returns + ------- + str + The encrypted data as a base64-encoded string. + + Example + -------- + + encryption_provider = AWSEncryptionSDKProvider(keys=[KMS_KEY_ARN]) + data_masker = DataMasking(provider=encryption_provider) + encrypted = data_masker.encrypt({"secret": "value"}) + """ return self._apply_action( data=data, fields=None, action=self.provider.encrypt, provider_options=provider_options or {}, + dynamic_mask=None, + custom_mask=None, + regex_pattern=None, + mask_format=None, **encryption_context, ) @@ -81,28 +111,91 @@ def decrypt( provider_options: dict | None = None, **encryption_context: str, ) -> Any: + """ + Decrypt data using the configured encryption provider. 
+ + Parameters + ---------- + data : dict, Mapping, Sequence, or Number + The data to decrypt. + provider_options : dict, optional + Provider-specific options for decryption. + **encryption_context : str + Additional key-value pairs for encryption context. + + Returns + ------- + Any + The decrypted data. + + Example + -------- + + encryption_provider = AWSEncryptionSDKProvider(keys=[KMS_KEY_ARN]) + data_masker = DataMasking(provider=encryption_provider) + decrypted = data_masker.decrypt(encrypted_data) + """ + return self._apply_action( data=data, fields=None, action=self.provider.decrypt, provider_options=provider_options or {}, + dynamic_mask=None, + custom_mask=None, + regex_pattern=None, + mask_format=None, **encryption_context, ) - @overload - def erase(self, data, fields: None) -> str: ... - - @overload - def erase(self, data: list, fields: list[str]) -> list[str]: ... - - @overload - def erase(self, data: tuple, fields: list[str]) -> tuple[str]: ... + def erase( + self, + data: Any, + fields: list[str] | None = None, + *, + dynamic_mask: bool | None = None, + custom_mask: str | None = None, + regex_pattern: str | None = None, + mask_format: str | None = None, + masking_rules: dict | None = None, + ) -> Any: + """ + Erase or mask sensitive data in the input. - @overload - def erase(self, data: dict, fields: list[str]) -> dict: ... + Parameters + ---------- + data : Any + The data to be erased or masked. + fields : list of str, optional + List of field names to be erased or masked. + dynamic_mask : bool, optional + Whether to use dynamic masking. + custom_mask : str, optional + Custom mask to apply instead of the default. + regex_pattern : str, optional + Regular expression pattern for identifying data to mask. + mask_format : str, optional + Format string for the mask. + masking_rules : dict, optional + Dictionary of custom masking rules. 
- def erase(self, data: Sequence | Mapping, fields: list[str] | None = None) -> str | list[str] | tuple[str] | dict: - return self._apply_action(data=data, fields=fields, action=self.provider.erase) + Returns + ------- + Any + The data with sensitive information erased or masked. + """ + if masking_rules: + return self._apply_masking_rules(data=data, masking_rules=masking_rules) + else: + return self._apply_action( + data=data, + fields=fields, + action=self.provider.erase, + dynamic_mask=dynamic_mask, + custom_mask=custom_mask, + regex_pattern=regex_pattern, + mask_format=mask_format, + ) def _apply_action( self, @@ -110,8 +203,12 @@ def _apply_action( fields: list[str] | None, action: Callable, provider_options: dict | None = None, - **encryption_context: str, - ): + dynamic_mask: bool | None = None, + custom_mask: str | None = None, + regex_pattern: str | None = None, + mask_format: str | None = None, + **kwargs: Any, + ) -> Any: """ Helper method to determine whether to apply a given action to the entire input data or to specific fields if the 'fields' argument is specified. @@ -127,8 +224,6 @@ def _apply_action( and returns the modified value. provider_options : dict Provider specific keyword arguments to propagate; used as an escape hatch. - encryption_context: str - Encryption context to use in encrypt and decrypt operations. 
Returns ------- @@ -143,11 +238,23 @@ def _apply_action( fields=fields, action=action, provider_options=provider_options, - **encryption_context, + dynamic_mask=dynamic_mask, + custom_mask=custom_mask, + regex_pattern=regex_pattern, + mask_format=mask_format, + **kwargs, ) else: logger.debug(f"Running action {action.__name__} with the entire data") - return action(data=data, provider_options=provider_options, **encryption_context) + return action( + data=data, + provider_options=provider_options, + dynamic_mask=dynamic_mask, + custom_mask=custom_mask, + regex_pattern=regex_pattern, + mask_format=mask_format, + **kwargs, + ) def _apply_action_to_fields( self, @@ -155,6 +262,10 @@ def _apply_action_to_fields( fields: list, action: Callable, provider_options: dict | None = None, + dynamic_mask: bool | None = None, + custom_mask: str | None = None, + regex_pattern: str | None = None, + mask_format: str | None = None, **encryption_context: str, ) -> dict | str: """ @@ -201,8 +312,10 @@ def _apply_action_to_fields( new_dict = {'a': {'b': {'c': '*****'}}, 'x': {'y': '*****'}} ``` """ + if not fields: + raise ValueError("Fields parameter cannot be empty") - data_parsed: dict = self._normalize_data_to_parse(fields, data) + data_parsed: dict = self._normalize_data_to_parse(data) # For in-place updates, json_parse accepts a callback function # this function must receive 3 args: field_value, fields, field_name @@ -211,6 +324,10 @@ def _apply_action_to_fields( self._call_action, action=action, provider_options=provider_options, + dynamic_mask=dynamic_mask, + custom_mask=custom_mask, + regex_pattern=regex_pattern, + mask_format=mask_format, **encryption_context, # type: ignore[arg-type] ) @@ -232,12 +349,6 @@ def _apply_action_to_fields( # For in-place updates, json_parse accepts a callback function # that receives 3 args: field_value, fields, field_name # We create a partial callback to pre-populate known provider options (action, provider opts, enc ctx) - update_callback = 
functools.partial( - self._call_action, - action=action, - provider_options=provider_options, - **encryption_context, # type: ignore[arg-type] - ) json_parse.update( data_parsed, @@ -246,6 +357,59 @@ def _apply_action_to_fields( return data_parsed + def _apply_masking_rules(self, data: dict, masking_rules: dict) -> dict: + """ + Apply masking rules to data, supporting both simple field names and complex path expressions. + + Args: + data: The dictionary containing data to mask + masking_rules: Dictionary mapping field names or path expressions to masking rules + + Returns: + dict: The masked data dictionary + """ + result = deepcopy(data) + + for path, rule in masking_rules.items(): + try: + jsonpath_expr = parse(f"$.{path}") + matches = jsonpath_expr.find(result) + + if not matches: + warnings.warn(f"No matches found for path: {path}", stacklevel=2) + continue + + for match in matches: + try: + value = match.value + if value is not None: + masked_value = self.provider.erase(str(value), **rule) + match.full_path.update(result, masked_value) + + except Exception as e: + warnings.warn( + f"Error masking value for path {path}: {str(e)}", + category=PowertoolsUserWarning, + stacklevel=2, + ) + continue + + except Exception as e: + warnings.warn(f"Error processing path {path}: {str(e)}", category=PowertoolsUserWarning, stacklevel=2) + continue + + return result + + def _mask_nested_field(self, data: dict, field_path: str, mask_function): + keys = field_path.split(".") + current = data + for key in keys[:-1]: + current = current.get(key, {}) + if not isinstance(current, dict): + return + if keys[-1] in current: + current[keys[-1]] = self.provider.erase(current[keys[-1]], **mask_function) + @staticmethod def _call_action( field_value: Any, @@ -253,6 +417,10 @@ def _call_action( field_name: str, action: Callable, provider_options: dict[str, Any] | None = None, + dynamic_mask: bool | None = None, + custom_mask: str | None = None, + regex_pattern: str | None = None, + 
mask_format: str | None = None, **encryption_context, ) -> None: """ @@ -270,13 +438,18 @@ def _call_action( Returns: - fields[field_name]: Returns the processed field value """ - fields[field_name] = action(field_value, provider_options=provider_options, **encryption_context) + fields[field_name] = action( + field_value, + provider_options=provider_options, + dynamic_mask=dynamic_mask, + custom_mask=custom_mask, + regex_pattern=regex_pattern, + mask_format=mask_format, + **encryption_context, + ) return fields[field_name] - def _normalize_data_to_parse(self, fields: list, data: str | dict) -> dict: - if not fields: - raise ValueError("No fields specified.") - + def _normalize_data_to_parse(self, data: str | dict) -> dict: if isinstance(data, str): # Parse JSON string as dictionary data_parsed = self.json_deserializer(data) diff --git a/aws_lambda_powertools/utilities/data_masking/provider/base.py b/aws_lambda_powertools/utilities/data_masking/provider/base.py index 8751becfb2c..16fa22d16b8 100644 --- a/aws_lambda_powertools/utilities/data_masking/provider/base.py +++ b/aws_lambda_powertools/utilities/data_masking/provider/base.py @@ -2,10 +2,14 @@ import functools import json -from typing import Any, Callable, Iterable +import re +from typing import Any, Callable from aws_lambda_powertools.utilities.data_masking.constants import DATA_MASKING_STRING +PRESERVE_CHARS = set("-_. ") +_regex_cache = {} + class BaseProvider: """ @@ -24,7 +28,7 @@ def encrypt(self, data) -> str: def decrypt(self, data) -> Any: # Implementation logic for data decryption - def erase(self, data) -> str | Iterable: + def erase(self, data) -> Any | Iterable: # Implementation logic for data masking pass @@ -63,19 +67,123 @@ def decrypt(self, data, provider_options: dict | None = None, **encryption_conte """ raise NotImplementedError("Subclasses must implement decrypt()") - def erase(self, data, **kwargs) -> Iterable[str]: - """ - This method irreversibly erases data. 
- - If the data to be erased is of type `str`, `dict`, or `bytes`, - this method will return an erased string, i.e. "*****". - - If the data to be erased is of an iterable type like `list`, `tuple`, - or `set`, this method will return a new object of the same type as the - input data but with each element replaced by the string "*****". - """ - if isinstance(data, (str, dict, bytes)): - return DATA_MASKING_STRING + def erase( + self, + data: Any, + dynamic_mask: bool | None = None, + custom_mask: str | None = None, + regex_pattern: str | None = None, + mask_format: str | None = None, + masking_rules: dict | None = None, + **kwargs, + ) -> Any: + + result: Any = DATA_MASKING_STRING + + if not any([dynamic_mask, custom_mask, regex_pattern, mask_format, masking_rules]): + if isinstance(data, (str, int, float, dict, bytes)): + return DATA_MASKING_STRING + elif isinstance(data, (list, tuple, set)): + return type(data)([DATA_MASKING_STRING] * len(data)) + else: + return DATA_MASKING_STRING + + if isinstance(data, (str, int, float)): + result = self._mask_primitive(str(data), dynamic_mask, custom_mask, regex_pattern, mask_format) + elif isinstance(data, dict): + result = self._mask_dict( + data, + dynamic_mask, + custom_mask, + regex_pattern, + mask_format, + masking_rules, + ) elif isinstance(data, (list, tuple, set)): - return type(data)([DATA_MASKING_STRING] * len(data)) - return DATA_MASKING_STRING + result = self._mask_iterable( + data, + dynamic_mask, + custom_mask, + regex_pattern, + mask_format, + masking_rules, + ) + + return result + + def _mask_primitive( + self, + data: str, + dynamic_mask: bool | None, + custom_mask: str | None, + regex_pattern: str | None, + mask_format: str | None, + ) -> str: + if regex_pattern and mask_format: + return self._regex_mask(data, regex_pattern, mask_format) + elif custom_mask: + return self._pattern_mask(data, custom_mask) + + return self._custom_erase(data) + + def _mask_dict( + self, + data: dict, + dynamic_mask: bool | 
None, + custom_mask: str | None, + regex_pattern: str | None, + mask_format: str | None, + masking_rules: dict | None, + ) -> dict: + return { + k: self.erase( + v, + dynamic_mask=dynamic_mask, + custom_mask=custom_mask, + regex_pattern=regex_pattern, + mask_format=mask_format, + masking_rules=masking_rules, + ) + for k, v in data.items() + } + + def _mask_iterable( + self, + data: list | tuple | set, + dynamic_mask: bool | None, + custom_mask: str | None, + regex_pattern: str | None, + mask_format: str | None, + masking_rules: dict | None, + ) -> list | tuple | set: + masked_data = [ + self.erase( + item, + dynamic_mask=dynamic_mask, + custom_mask=custom_mask, + regex_pattern=regex_pattern, + mask_format=mask_format, + masking_rules=masking_rules, + ) + for item in data + ] + return type(data)(masked_data) + + def _pattern_mask(self, data: str, pattern: str) -> str: + """Apply pattern masking to string data.""" + return pattern[: len(data)] if len(pattern) >= len(data) else pattern + + def _regex_mask(self, data: str, regex_pattern: str, mask_format: str) -> str: + """Apply regex masking to string data.""" + try: + if regex_pattern not in _regex_cache: + _regex_cache[regex_pattern] = re.compile(regex_pattern) + return _regex_cache[regex_pattern].sub(mask_format, data) + except re.error: + return data + + def _custom_erase(self, data: str) -> str: + if not data: + return "" + + return "".join("*" if char not in PRESERVE_CHARS else char for char in data) diff --git a/docs/utilities/data_masking.md b/docs/utilities/data_masking.md index 162292e79a0..94e470aa965 100644 --- a/docs/utilities/data_masking.md +++ b/docs/utilities/data_masking.md @@ -43,7 +43,7 @@ stateDiagram-v2 ## Terminology -**Erasing** replaces sensitive information **irreversibly** with a non-sensitive placeholder _(`*****`)_. This operation replaces data in-memory, making it a one-way action. 
+**Erasing** replaces sensitive information **irreversibly** with a non-sensitive placeholder _(`*****`)_, or with a customized mask. This operation replaces data in-memory, making it a one-way action. **Encrypting** transforms plaintext into ciphertext using an encryption algorithm and a cryptographic key. It allows you to encrypt any sensitive data, so only allowed personnel to decrypt it. Learn more about encryption [here](https://aws.amazon.com/blogs/security/importance-of-encryption-and-how-aws-can-help/){target="_blank"}. @@ -117,6 +117,52 @@ Erasing will remove the original data and replace it with a `*****`. This means --8<-- "examples/data_masking/src/getting_started_erase_data_output.json" ``` +#### Custom masking + +The `erase` method also supports additional flags for more advanced and flexible masking: + +=== "dynamic_mask" + + (bool) When set to `True`, enables dynamic masking, which preserves the original length and structure of the text while replacing its characters with `*`. + + > Expression: `data_masker.erase(data, fields=["address.street"], dynamic_mask=True)` + + > Field result: `'street': '*** **** **'` + +=== "custom_mask" + + (str) Specifies a simple pattern for masking data. This pattern is applied directly to the input string, replacing all the original characters. For example, with a `custom_mask` of "XX-XX" applied to "12345", the result would be "XX-XX". + + > Expression: `data_masker.erase(data, fields=["address.zip"], custom_mask="XX")` + + > Field result: `'zip': 'XX'` + +=== "regex_pattern & mask_format" + + (str) `regex_pattern` defines a regular expression pattern used to identify parts of the input string that should be masked. This allows for more complex and flexible masking rules. It's used in conjunction with `mask_format`. + `mask_format` specifies the format to use when replacing parts of the string matched by `regex_pattern`. 
It can include placeholders (like \1, \2) to refer to captured groups in the regex pattern, allowing some parts of the original string to be preserved. + + > Expression: `data_masker.erase(data, fields=["email"], regex_pattern=r"(.)(.*)(@.*)", mask_format=r"\1****\3")` + + > Field result: `'email': 'j****@example.com'` + +=== "masking_rules" + + (dict) Allows you to apply different masking rules (flags) for each data field. + ```python hl_lines="20" + --8<-- "examples/data_masking/src/custom_data_masking.py" + ``` +=== "Input example" + + ```json + --8<-- "examples/data_masking/src/payload_custom_masking.json" + ``` +=== "Masking rules output example" + + ```json hl_lines="4 5 10 21" + --8<-- "examples/data_masking/src/output_custom_masking.json" + ``` + ### Encrypting data ???+ note "About static typing and encryption" diff --git a/examples/data_masking/src/custom_data_masking.py b/examples/data_masking/src/custom_data_masking.py new file mode 100644 index 00000000000..7b96f6f379f --- /dev/null +++ b/examples/data_masking/src/custom_data_masking.py @@ -0,0 +1,22 @@ +from __future__ import annotations + +from aws_lambda_powertools.utilities.data_masking import DataMasking +from aws_lambda_powertools.utilities.typing import LambdaContext + +data_masker = DataMasking() + + +def lambda_handler(event: dict, context: LambdaContext) -> dict: + data: dict = event.get("body", {}) + + # Masking rules for each field + masking_rules = { + "email": {"regex_pattern": "(.)(.*)(@.*)", "mask_format": r"\1****\3"}, + "age": {"dynamic_mask": True}, + "address.zip": {"custom_mask": "xxx"}, + "$.other_address[?(@.postcode > 12000)]": {"custom_mask": "Masked"}, + } + + result = data_masker.erase(data, masking_rules=masking_rules) + + return result diff --git a/examples/data_masking/src/output_custom_masking.json b/examples/data_masking/src/output_custom_masking.json new file mode 100644 index 00000000000..0571da99808 --- /dev/null +++ 
b/examples/data_masking/src/output_custom_masking.json @@ -0,0 +1,29 @@ +{ + "id": 1, + "name": "Jane Doe", + "age": "**", + "email": "j****@example.com", + "address": { + "street": "123 Main St", + "city": "Anytown", + "state": "CA", + "zip": "xxx", + "postcode": 12345, + "product": { + "name": "Car" + } + }, + "other_address": [ + { + "postcode": 11345, + "street": "123 Any Drive" + }, + "Masked" + ], + "company_address": { + "street": "456 ACME Ave", + "city": "Anytown", + "state": "CA", + "zip": "12345" + } +} \ No newline at end of file diff --git a/examples/data_masking/src/payload_custom_masking.json b/examples/data_masking/src/payload_custom_masking.json new file mode 100644 index 00000000000..d50b715ffa4 --- /dev/null +++ b/examples/data_masking/src/payload_custom_masking.json @@ -0,0 +1,34 @@ +{ + "body": { + "id": 1, + "name": "Jane Doe", + "age": 30, + "email": "janedoe@example.com", + "address": { + "street": "123 Main St", + "city": "Anytown", + "state": "CA", + "zip": "12345", + "postcode": 12345, + "product": { + "name": "Car" + } + }, + "other_address": [ + { + "postcode": 11345, + "street": "123 Any Drive" + }, + { + "postcode": 67890, + "street": "100 Main Street," + } + ], + "company_address": { + "street": "456 ACME Ave", + "city": "Anytown", + "state": "CA", + "zip": "12345" + } + } +} \ No newline at end of file diff --git a/noxfile.py b/noxfile.py index 7b73fd0dc59..4710bcbca2c 100644 --- a/noxfile.py +++ b/noxfile.py @@ -140,6 +140,8 @@ def test_with_aws_encryption_sdk_as_required_package(session: nox.Session): folders=[ f"{PREFIX_TESTS_FUNCTIONAL}/data_masking/_aws_encryption_sdk/", f"{PREFIX_TESTS_UNIT}/data_masking/_aws_encryption_sdk/", + f"{PREFIX_TESTS_FUNCTIONAL}/data_masking/required_dependencies/", + f"{PREFIX_TESTS_UNIT}/data_masking/required_dependencies/", ], extras="datamasking", ) diff --git a/tests/functional/data_masking/required_dependencies/test_erase_data_masking.py 
b/tests/functional/data_masking/required_dependencies/test_erase_data_masking.py new file mode 100644 index 00000000000..12ffd054376 --- /dev/null +++ b/tests/functional/data_masking/required_dependencies/test_erase_data_masking.py @@ -0,0 +1,488 @@ +import json + +import pytest + +from aws_lambda_powertools.utilities.data_masking.base import DataMasking +from aws_lambda_powertools.utilities.data_masking.constants import DATA_MASKING_STRING +from aws_lambda_powertools.utilities.data_masking.exceptions import ( + DataMaskingFieldNotFoundError, + DataMaskingUnsupportedTypeError, +) +from aws_lambda_powertools.warnings import PowertoolsUserWarning + + +@pytest.fixture +def data_masker() -> DataMasking: + return DataMasking() + + +def test_erase_int(data_masker): + # GIVEN an int data type + + # WHEN erase is called with no fields argument + erased_string = data_masker.erase(42) + + # THEN the result is the data masked + assert erased_string == DATA_MASKING_STRING + + +def test_erase_int_custom_mask(data_masker): + # GIVEN an int data type + + # WHEN erase is called with no fields argument + erased_string = data_masker.erase(42, custom_mask="XX") + + # THEN the result is the data masked + assert erased_string == "XX" + + +def test_erase_float(data_masker): + # GIVEN a float data type + + # WHEN erase is called with no fields argument + erased_string = data_masker.erase(4.2) + + # THEN the result is the data masked + assert erased_string == DATA_MASKING_STRING + + +def test_erase_bool(data_masker): + # GIVEN a bool data type + + # WHEN erase is called with no fields argument + erased_string = data_masker.erase(True) + + # THEN the result is the data masked + assert erased_string == DATA_MASKING_STRING + + +def test_erase_none(data_masker): + # GIVEN a None data type + + # WHEN erase is called with no fields argument + erased_string = data_masker.erase(None) + + # THEN the result is the data masked + assert erased_string == DATA_MASKING_STRING + + +def 
test_erase_str(data_masker): + # GIVEN a str data type + + # WHEN erase is called with no fields argument + erased_string = data_masker.erase("this is a string") + + # THEN the result is the data masked + assert erased_string == DATA_MASKING_STRING + + +def test_erase_list(data_masker): + # GIVEN a list data type + + # WHEN erase is called with no fields argument + erased_string = data_masker.erase([1, 2, "string", 3]) + + # THEN the result is the data masked, while maintaining type list + assert erased_string == [DATA_MASKING_STRING, DATA_MASKING_STRING, DATA_MASKING_STRING, DATA_MASKING_STRING] + + +def test_erase_dict(data_masker): + # GIVEN a dict data type + data = { + "a": { + "1": {"None": "hello", "four": "world"}, + "b": {"3": {"4": "goodbye", "e": "world"}}, + }, + } + + # WHEN erase is called with no fields argument + erased_string = data_masker.erase(data) + + # THEN the result is the data masked + assert erased_string == DATA_MASKING_STRING + + +def test_erase_dict_with_fields(data_masker): + # GIVEN a dict data type + data = { + "a": { + "1": {"None": "hello", "four": "world"}, + "b": {"3": {"4": "goodbye", "e": "world"}}, + }, + } + + # WHEN erase is called with a list of fields specified + erased_string = data_masker.erase(data, fields=["a.'1'.None", "a..'4'"]) + + # THEN the result is only the specified fields are erased + assert erased_string == { + "a": { + "1": {"None": DATA_MASKING_STRING, "four": "world"}, + "b": {"3": {"4": DATA_MASKING_STRING, "e": "world"}}, + }, + } + + +def test_erase_json_dict_with_fields(data_masker): + # GIVEN the data type is a json representation of a dictionary + data = json.dumps( + { + "a": { + "1": {"None": "hello", "four": "world"}, + "b": {"3": {"4": "goodbye", "e": "world"}}, + }, + }, + ) + + # WHEN erase is called with a list of fields specified + masked_json_string = data_masker.erase(data, fields=["a.'1'.None", "a..'4'"]) + + # THEN the result is only the specified fields are erased + assert 
masked_json_string == { + "a": { + "1": {"None": DATA_MASKING_STRING, "four": "world"}, + "b": {"3": {"4": DATA_MASKING_STRING, "e": "world"}}, + }, + } + + +def test_encrypt_not_implemented(data_masker): + # GIVEN DataMasking is not initialized with a Provider + + # WHEN attempting to call the encrypt method on the data + with pytest.raises(NotImplementedError): + # THEN the result is a NotImplementedError + data_masker.encrypt("hello world") + + +def test_decrypt_not_implemented(data_masker): + # GIVEN DataMasking is not initialized with a Provider + + # WHEN attempting to call the decrypt method on the data + with pytest.raises(NotImplementedError): + # THEN the result is a NotImplementedError + data_masker.decrypt("hello world") + + +def test_parsing_unsupported_data_type(data_masker): + # GIVEN an initialization of the DataMasking class + + # WHEN attempting to pass in a list of fields with input data that is not a dict + with pytest.raises(DataMaskingUnsupportedTypeError): + # THEN the result is a TypeError + data_masker.erase(42, ["this.field"]) + + +def test_parsing_with_empty_field(data_masker): + # GIVEN an initialization of the DataMasking class + + # WHEN attempting to pass in a list of fields with input data that is not a dict + with pytest.raises(ValueError): + # THEN the result is a TypeError + data_masker.erase(42, []) + + +def test_parsing_nonexistent_fields_with_raise_on_missing_field(): + # GIVEN a dict data type + + data_masker = DataMasking(raise_on_missing_field=True) + data = { + "3": { + "1": {"None": "hello", "four": "world"}, + "4": {"33": {"5": "goodbye", "e": "world"}}, + }, + } + + # WHEN attempting to pass in fields that do not exist in the input data + with pytest.raises(DataMaskingFieldNotFoundError): + # THEN the result is a KeyError + data_masker.erase(data, ["'3'..True"]) + + +def test_parsing_nonexistent_fields_warning_on_missing_field(): + # GIVEN a dict data type + + data_masker = DataMasking(raise_on_missing_field=False) + 
data = { + "3": { + "1": {"None": "hello", "four": "world"}, + "4": {"33": {"5": "goodbye", "e": "world"}}, + }, + } + + # WHEN erase is called with a non-existing field + with pytest.warns(UserWarning, match="Field or expression*"): + masked_json_string = data_masker.erase(data, fields=["non-existing"]) + + # THEN the "erased" payload is the same of the original + assert masked_json_string == data + + +def test_regex_mask(data_masker): + # GIVEN a str data type + data = "Hello! My name is John Doe" + + # WHEN erase is called with regex pattern and mask format + regex_pattern = r"\b[A-Z][a-z]+ [A-Z][a-z]+\b" + mask_format = "XXXX XXXX" + + result = data_masker.erase(data, regex_pattern=regex_pattern, mask_format=mask_format) + + # THEN the result is the regex part masked by the masked format + assert result == "Hello! My name is XXXX XXXX" + + +def test_regex_mask_with_cache(data_masker): + # GIVEN a str data type + data = "Hello! My name is John Doe" + data1 = "Hello! My name is John Xix" + + # WHEN erase is called with regex pattern and mask format + regex_pattern = r"\b[A-Z][a-z]+ [A-Z][a-z]+\b" + mask_format = "XXXX XXXX" + + # WHEN erasing twice to check the regex compiled and stored in the cache + result = data_masker.erase(data, regex_pattern=regex_pattern, mask_format=mask_format) + result1 = data_masker.erase(data1, regex_pattern=regex_pattern, mask_format=mask_format) + + # THEN the result is the regex part masked by the masked format + assert result == "Hello! My name is XXXX XXXX" + assert result1 == "Hello! 
My name is XXXX XXXX" + + +def test_erase_json_dict_with_fields_and_masks(data_masker): + # GIVEN the data type is a json representation of a dictionary + data = json.dumps( + { + "a": { + "1": {"None": "hello", "four": "world"}, + "b": {"3": {"4": "goodbye", "e": "world"}}, + }, + }, + ) + + # WHEN erase is called with a list of fields specified + masked_json_string = data_masker.erase(data, fields=["a.'1'.None", "a..'4'"], dynamic_mask=True) + + # THEN the result is only the specified fields are erased + assert masked_json_string == { + "a": { + "1": {"None": "*****", "four": "world"}, + "b": {"3": {"4": "*******", "e": "world"}}, + }, + } + + +def test_erase_json_dict_with_complex_masking_rules(data_masker): + # GIVEN the data type is a json representation of a dictionary with nested and filtered paths + data = { + "email": "johndoe@example.com", + "age": 30, + "address": {"zip": 13000, "street": "123 Main St", "details": {"name": "Home", "type": "Primary"}}, + } + + # WHEN erase is called with complex masking rules + masking_rules = { + "email": {"regex_pattern": "(.)(.*)(@.*)", "mask_format": r"\1****\3"}, + "age": {"dynamic_mask": True}, + "address.zip": {"custom_mask": "xxx"}, + } + + masked_json_string = data_masker.erase(data=data, masking_rules=masking_rules) + + # THEN the result should have all specified fields masked according to their rules + assert masked_json_string == { + "email": "j****@example.com", + "age": "**", + "address": {"zip": "xxx", "street": "123 Main St", "details": {"name": "Home", "type": "Primary"}}, + } + + +def test_dynamic_mask_with_string(data_masker): + # GIVEN the data type is a json representation of a dictionary with nested and filtered paths + data = "XYZEKDEDE" + + masked_json_string = data_masker.erase(data=data, dynamic_mask=True) + + # THEN the result should have all specified fields masked according to their rules + assert masked_json_string == "*********" + + +def test_no_matches_for_masking_rule(data_masker): + # 
GIVEN a dictionary without the expected field + data = {"name": "Ana"} + masking_rules = {"$.missing_field": {"dynamic_mask": True}} + + # WHEN applying the masking rule + with pytest.warns(UserWarning, match=r"No matches found *"): + result = data_masker.erase(data=data, masking_rules=masking_rules) + + # THEN the original data remains unchanged + assert result == data + + +def test_warning_during_masking_value(data_masker): + # GIVEN data and a masking rule + data = {"value": "test"} + + # Mock provider that raises an error + class MockProvider: + def erase(self, value, **kwargs): + raise ValueError("Mock error") + + data_masker.provider = MockProvider() + + # WHEN erase is called + with pytest.warns(expected_warning=PowertoolsUserWarning, match="Error masking value for path value: Mock error"): + masked_data = data_masker.erase(data, masking_rules={"value": {"rule": "value"}}) + + # THEN the original data should remain unchanged + assert masked_data["value"] == "test" + + +def test_mask_nested_field_success(data_masker): + # GIVEN nested data with a field to mask + data = {"user": {"contact": {"details": {"address": {"street": "123 Main St", "zip": "12345"}}}}} + + # WHEN masking a nested field with a masking rule + data_masked = data_masker.erase(data=data, fields=["user.contact.details.address.zip"], custom_mask="xxx") + + # THEN the nested field should be masked while other data remains unchanged + assert data_masked == {"user": {"contact": {"details": {"address": {"street": "123 Main St", "zip": "xxx"}}}}} + + +def test_erase_dictionary_with_masking_rules(data_masker): + # GIVEN a dictionary with nested sensitive data + data = {"user": {"name": "John Doe", "ssn": "123-45-6789", "address": {"street": "123 Main St", "zip": "12345"}}} + + # AND masking rules for specific fields + masking_rules = {"user.ssn": {"custom_mask": "XXX-XX-XXXX"}, "user.address.zip": {"custom_mask": "00000"}} + + # WHEN erase is called with masking rules + result = 
data_masker.erase(data, masking_rules=masking_rules) + + # THEN only the specified fields should be masked + assert result == { + "user": { + "name": "John Doe", # unchanged + "ssn": "XXX-XX-XXXX", # masked + "address": {"street": "123 Main St", "zip": "00000"}, # unchanged # masked + }, + } + + +def test_erase_dictionary_with_masking_rules_with_list(data_masker): + # GIVEN a dictionary with nested sensitive data + data = {"user": {"name": ["leandro", "powertools"]}} + + # AND masking rules for specific fields + masking_rules = {"user.name": {"custom_mask": "NO-NAME"}} + + # WHEN erase is called with masking rules + result = data_masker.erase(data, masking_rules=masking_rules) + + # THEN only the specified fields should be masked + assert result == { + "user": { + "name": "NO-NAME", + }, + } + + +def test_erase_list_with_custom_mask(data_masker): + # GIVEN a dictionary with nested sensitive data + data = {"user": {"name": ["leandro", "powertools"]}} + + # WHEN erase is called with masking rules + result = data_masker.erase(data, fields=["user.name"], dynamic_mask=True) + + # THEN only the specified fields should be masked + assert result == { + "user": { + "name": ["*******", "**********"], + }, + } + + +def test_erase_dictionary_with_global_mask(data_masker): + # GIVEN a dictionary with sensitive data + data = {"user": {"name": "John Doe", "ssn": "123-45-6789"}} + + # WHEN erase is called with a custom mask for all fields + result = data_masker.erase(data, custom_mask="REDACTED") + + # THEN all fields should use the custom mask + assert result == {"user": {"name": "REDACTED", "ssn": "REDACTED"}} + + +def test_erase_empty_dictionary(data_masker): + # GIVEN an empty dictionary + data = {} + + # WHEN erase is called + result = data_masker.erase(data, custom_mask="MASKED") + + # THEN an empty dictionary should be returned + assert result == {} + + +def test_erase_different_iterables_with_masking(data_masker): + # GIVEN different types of iterables + list_data = 
["name", "phone", "email"] + tuple_data = ("name", "phone", "email") + set_data = {"name", "phone", "email"} + + # WHEN erase is called with a custom mask + masked_list = data_masker.erase(list_data, custom_mask="XXX") + masked_tuple = data_masker.erase(tuple_data, custom_mask="XXX") + masked_set = data_masker.erase(set_data, custom_mask="XXX") + + # THEN the masked data should maintain its original type + assert isinstance(masked_list, list) + assert isinstance(masked_tuple, tuple) + assert isinstance(masked_set, set) + + # AND all values should be masked + expected_values = {"XXX"} + assert set(masked_list) == expected_values + assert set(masked_tuple) == expected_values + assert masked_set == expected_values + + +def test_erase_handles_invalid_regex_pattern(data_masker): + # GIVEN a string and an invalid regex pattern + data = "test123" + + # WHEN masking with invalid regex + result = data_masker.erase( + data, + regex_pattern="[", + mask_format="X", # Invalid regex pattern that will raise re.error + ) + + # THEN original data should be returned + assert result == "test123" + + +def test_erase_handles_empty_string_with_dynamic_mask(data_masker): + # GIVEN an empty string + data = "" + + # WHEN erase is called with dynamic_mask + result = data_masker.erase(data, dynamic_mask=True) + + # THEN empty string should be returned + assert result == "" + + +def test_erase_dictionary_with_masking_rules_wrong_field(data_masker): + # GIVEN a dictionary with nested sensitive data + data = {"user": {"name": "John Doe", "ssn": "123-45-6789", "address": {"street": "123 Main St", "zip": "12345"}}} + + # AND masking rules for specific fields + masking_rules = {"user.ssn...": {"custom_mask": "XXX-XX-XXXX"}, "user.address.zip": {"custom_mask": "00000"}} + + # WHEN erase is called with wrong masking rules + # We must have a warning + with pytest.warns(expected_warning=PowertoolsUserWarning, match="Error processing path*"): + data_masker.erase(data, masking_rules=masking_rules) diff 
--git a/tests/unit/data_masking/_aws_encryption_sdk/test_unit_data_masking.py b/tests/unit/data_masking/_aws_encryption_sdk/test_unit_data_masking.py deleted file mode 100644 index 4fbbc188ceb..00000000000 --- a/tests/unit/data_masking/_aws_encryption_sdk/test_unit_data_masking.py +++ /dev/null @@ -1,207 +0,0 @@ -import json - -import pytest - -from aws_lambda_powertools.utilities.data_masking.base import DataMasking -from aws_lambda_powertools.utilities.data_masking.constants import DATA_MASKING_STRING -from aws_lambda_powertools.utilities.data_masking.exceptions import ( - DataMaskingFieldNotFoundError, - DataMaskingUnsupportedTypeError, -) - - -@pytest.fixture -def data_masker() -> DataMasking: - return DataMasking() - - -def test_erase_int(data_masker): - # GIVEN an int data type - - # WHEN erase is called with no fields argument - erased_string = data_masker.erase(42) - - # THEN the result is the data masked - assert erased_string == DATA_MASKING_STRING - - -def test_erase_float(data_masker): - # GIVEN a float data type - - # WHEN erase is called with no fields argument - erased_string = data_masker.erase(4.2) - - # THEN the result is the data masked - assert erased_string == DATA_MASKING_STRING - - -def test_erase_bool(data_masker): - # GIVEN a bool data type - - # WHEN erase is called with no fields argument - erased_string = data_masker.erase(True) - - # THEN the result is the data masked - assert erased_string == DATA_MASKING_STRING - - -def test_erase_none(data_masker): - # GIVEN a None data type - - # WHEN erase is called with no fields argument - erased_string = data_masker.erase(None) - - # THEN the result is the data masked - assert erased_string == DATA_MASKING_STRING - - -def test_erase_str(data_masker): - # GIVEN a str data type - - # WHEN erase is called with no fields argument - erased_string = data_masker.erase("this is a string") - - # THEN the result is the data masked - assert erased_string == DATA_MASKING_STRING - - -def 
test_erase_list(data_masker): - # GIVEN a list data type - - # WHEN erase is called with no fields argument - erased_string = data_masker.erase([1, 2, "string", 3]) - - # THEN the result is the data masked, while maintaining type list - assert erased_string == [DATA_MASKING_STRING, DATA_MASKING_STRING, DATA_MASKING_STRING, DATA_MASKING_STRING] - - -def test_erase_dict(data_masker): - # GIVEN a dict data type - data = { - "a": { - "1": {"None": "hello", "four": "world"}, - "b": {"3": {"4": "goodbye", "e": "world"}}, - }, - } - - # WHEN erase is called with no fields argument - erased_string = data_masker.erase(data) - - # THEN the result is the data masked - assert erased_string == DATA_MASKING_STRING - - -def test_erase_dict_with_fields(data_masker): - # GIVEN a dict data type - data = { - "a": { - "1": {"None": "hello", "four": "world"}, - "b": {"3": {"4": "goodbye", "e": "world"}}, - }, - } - - # WHEN erase is called with a list of fields specified - erased_string = data_masker.erase(data, fields=["a.'1'.None", "a..'4'"]) - - # THEN the result is only the specified fields are erased - assert erased_string == { - "a": { - "1": {"None": DATA_MASKING_STRING, "four": "world"}, - "b": {"3": {"4": DATA_MASKING_STRING, "e": "world"}}, - }, - } - - -def test_erase_json_dict_with_fields(data_masker): - # GIVEN the data type is a json representation of a dictionary - data = json.dumps( - { - "a": { - "1": {"None": "hello", "four": "world"}, - "b": {"3": {"4": "goodbye", "e": "world"}}, - }, - }, - ) - - # WHEN erase is called with a list of fields specified - masked_json_string = data_masker.erase(data, fields=["a.'1'.None", "a..'4'"]) - - # THEN the result is only the specified fields are erased - assert masked_json_string == { - "a": { - "1": {"None": DATA_MASKING_STRING, "four": "world"}, - "b": {"3": {"4": DATA_MASKING_STRING, "e": "world"}}, - }, - } - - -def test_encrypt_not_implemented(data_masker): - # GIVEN DataMasking is not initialized with a Provider - - # WHEN 
attempting to call the encrypt method on the data - with pytest.raises(NotImplementedError): - # THEN the result is a NotImplementedError - data_masker.encrypt("hello world") - - -def test_decrypt_not_implemented(data_masker): - # GIVEN DataMasking is not initialized with a Provider - - # WHEN attempting to call the decrypt method on the data - with pytest.raises(NotImplementedError): - # THEN the result is a NotImplementedError - data_masker.decrypt("hello world") - - -def test_parsing_unsupported_data_type(data_masker): - # GIVEN an initialization of the DataMasking class - - # WHEN attempting to pass in a list of fields with input data that is not a dict - with pytest.raises(DataMaskingUnsupportedTypeError): - # THEN the result is a TypeError - data_masker.erase(42, ["this.field"]) - - -def test_parsing_with_empty_field(data_masker): - # GIVEN an initialization of the DataMasking class - - # WHEN attempting to pass in a list of fields with input data that is not a dict - with pytest.raises(ValueError): - # THEN the result is a TypeError - data_masker.erase(42, []) - - -def test_parsing_nonexistent_fields_with_raise_on_missing_field(): - # GIVEN a dict data type - - data_masker = DataMasking(raise_on_missing_field=True) - data = { - "3": { - "1": {"None": "hello", "four": "world"}, - "4": {"33": {"5": "goodbye", "e": "world"}}, - }, - } - - # WHEN attempting to pass in fields that do not exist in the input data - with pytest.raises(DataMaskingFieldNotFoundError): - # THEN the result is a KeyError - data_masker.erase(data, ["'3'..True"]) - - -def test_parsing_nonexistent_fields_warning_on_missing_field(): - # GIVEN a dict data type - - data_masker = DataMasking(raise_on_missing_field=False) - data = { - "3": { - "1": {"None": "hello", "four": "world"}, - "4": {"33": {"5": "goodbye", "e": "world"}}, - }, - } - - # WHEN erase is called with a non-existing field - with pytest.warns(UserWarning, match="Field or expression*"): - masked_json_string = 
data_masker.erase(data, fields=["non-existing"]) - - # THEN the "erased" payload is the same of the original - assert masked_json_string == data diff --git a/tests/unit/data_masking/required_dependencies/test_base_functions.py b/tests/unit/data_masking/required_dependencies/test_base_functions.py new file mode 100644 index 00000000000..1af532967c7 --- /dev/null +++ b/tests/unit/data_masking/required_dependencies/test_base_functions.py @@ -0,0 +1,30 @@ +import pytest + +from aws_lambda_powertools.utilities.data_masking.base import DataMasking + + +@pytest.fixture +def data_masker() -> DataMasking: + return DataMasking() + + +def test_mask_nested_field_with_non_dict_value(data_masker): + # GIVEN nested data where a middle path component is not a dictionary + data = {"user": {"contact": "not_a_dict", "details": {"ssn": "123-45-6789"}}} # This will stop the traversal + + # WHEN attempting to mask a field through a path containing a non-dict value + data_masker._mask_nested_field(data, "user.contact.details.ssn", lambda x: "MASKED") + + # THEN the data should remain unchanged since traversal stopped at non-dict value + assert data == {"user": {"contact": "not_a_dict", "details": {"ssn": "123-45-6789"}}} + + +def test_mask_nested_field_success(data_masker): + # GIVEN nested data with a field to mask + data = {"user": {"contact": {"details": {"address": {"street": "123 Main St", "zip": "12345"}}}}} + + # WHEN masking a nested field with a masking rule + data_masker._mask_nested_field(data, "user.contact.details.address.zip", {"custom_mask": "xxx"}) + + # THEN the nested field should be masked while other data remains unchanged + assert data == {"user": {"contact": {"details": {"address": {"street": "123 Main St", "zip": "xxx"}}}}}