Skip to content

Commit 6c58534

Browse files
authored
DGS-19492 Handle records nested in arrays/maps when searching for tags (#1890)
* Handle records nested in arrays/maps when searching for tags
* Fix formatting
1 parent e91dc57 commit 6c58534

File tree

2 files changed

+80
-1
lines changed

2 files changed

+80
-1
lines changed

src/confluent_kafka/schema_registry/avro.py

+5-1
Original file line numberDiff line numberDiff line change
@@ -752,7 +752,11 @@ def _get_inline_tags_recursively(
752752
return
753753
else:
754754
schema_type = schema.get("type")
755-
if schema_type == 'record':
755+
if schema_type == 'array':
756+
_get_inline_tags_recursively(ns, name, schema.get("items"), tags)
757+
elif schema_type == 'map':
758+
_get_inline_tags_recursively(ns, name, schema.get("values"), tags)
759+
elif schema_type == 'record':
756760
record_ns = schema.get("namespace")
757761
record_name = schema.get("name")
758762
if record_ns is None:

tests/schema_registry/test_avro_serdes.py

+75
Original file line numberDiff line numberDiff line change
@@ -757,6 +757,81 @@ def test_avro_cel_field_transform_complex_with_none():
757757
assert obj2 == newobj
758758

759759

760+
def test_avro_cel_field_transform_complex_nested():
    """Check that a CEL_FIELD transform reaches a record nested in an array.

    The schema's only field, ``emails``, is a nullable array of ``Email``
    records, each holding a nullable string ``email`` tagged ``PII``. The
    registered WRITE-mode rule appends ``'-suffix'`` to every value whose
    ``typeName`` is ``'STRING'``, so the object read back after a
    serialize/deserialize round trip must carry the suffix on the nested
    string.
    """
    conf = {'url': _BASE_URL}
    client = SchemaRegistryClient.new_client(conf)
    ser_conf = {'auto.register.schemas': False, 'use.latest.version': True}
    schema = {
        'type': 'record',
        'name': 'UnionTest',
        'namespace': 'test',
        'fields': [
            {
                'name': 'emails',
                'type': [
                    'null',
                    {
                        'type': 'array',
                        'items': {
                            'type': 'record',
                            'name': 'Email',
                            'fields': [
                                {
                                    'name': 'email',
                                    'type': [
                                        'null',
                                        'string'
                                    ],
                                    'doc': 'Email address',
                                    'confluent:tags': [
                                        'PII'
                                    ]
                                }
                            ]
                        }
                    }
                ],
                'doc': 'Communication Email',
            }
        ]
    }

    rule = Rule(
        "test-cel",
        "",
        RuleKind.TRANSFORM,
        RuleMode.WRITE,
        "CEL_FIELD",
        None,
        None,
        "typeName == 'STRING' ; value + '-suffix'",
        None,
        None,
        False
    )
    client.register_schema(_SUBJECT, Schema(
        json.dumps(schema),
        "AVRO",
        [],
        None,
        RuleSet(None, [rule])
    ))

    obj = {
        'emails': [{'email': 'john@acme.com'}]
    }
    ser = AvroSerializer(client, schema_str=None, conf=ser_conf)
    ser_ctx = SerializationContext(_TOPIC, MessageField.VALUE)
    obj_bytes = ser(obj, ser_ctx)

    # The rule runs on write, so the stored (and therefore deserialized)
    # nested string is the input with '-suffix' appended.
    obj2 = {
        'emails': [{'email': 'john@acme.com-suffix'}]
    }
    deser = AvroDeserializer(client)
    newobj = deser(obj_bytes, ser_ctx)
    assert obj2 == newobj
833+
834+
760835
def test_avro_cel_field_condition():
761836
conf = {'url': _BASE_URL}
762837
client = SchemaRegistryClient.new_client(conf)

0 commit comments

Comments
 (0)