@@ -55,27 +55,36 @@ def extract_send_headers(args, kwargs):
55
55
@staticmethod
56
56
def extract_send_partition (instance , args , kwargs ):
57
57
"""extract partition `send` method arguments, using the `_partition` method in KafkaProducer class"""
58
- topic = KafkaPropertiesExtractor .extract_send_topic (args )
59
- key = KafkaPropertiesExtractor .extract_send_key (args , kwargs )
60
- value = KafkaPropertiesExtractor .extract_send_value (args , kwargs )
61
- partition = KafkaPropertiesExtractor ._extract_argument (
62
- "partition" , 4 , None , args , kwargs
63
- )
64
- key_bytes = instance ._serialize (
65
- instance .config ["key_serializer" ], topic , key
66
- )
67
- value_bytes = instance ._serialize (
68
- instance .config ["value_serializer" ], topic , value
69
- )
70
- valid_types = (bytes , bytearray , memoryview , type (None ))
71
- if (
72
- type (key_bytes ) not in valid_types
73
- or type (value_bytes ) not in valid_types
74
- ):
58
+ try :
59
+ topic = KafkaPropertiesExtractor .extract_send_topic (args )
60
+ key = KafkaPropertiesExtractor .extract_send_key (args , kwargs )
61
+ value = KafkaPropertiesExtractor .extract_send_value (args , kwargs )
62
+ partition = KafkaPropertiesExtractor ._extract_argument (
63
+ "partition" , 4 , None , args , kwargs
64
+ )
65
+ key_bytes = instance ._serialize (
66
+ instance .config ["key_serializer" ], topic , key
67
+ )
68
+ value_bytes = instance ._serialize (
69
+ instance .config ["value_serializer" ], topic , value
70
+ )
71
+ valid_types = (bytes , bytearray , memoryview , type (None ))
72
+ if (
73
+ type (key_bytes ) not in valid_types
74
+ or type (value_bytes ) not in valid_types
75
+ ):
76
+ return None
77
+
78
+ all_partitions = instance ._metadata .partitions_for_topic (topic )
79
+ if all_partitions is None or len (all_partitions ) == 0 :
80
+ return None
81
+
82
+ return instance ._partition (
83
+ topic , partition , key , value , key_bytes , value_bytes
84
+ )
85
+ except Exception as exception : # pylint: disable=W0703
86
+ _LOG .debug ("Unable to extract partition: %s" , exception )
75
87
return None
76
- return instance ._partition (
77
- topic , partition , key , value , key_bytes , value_bytes
78
- )
79
88
80
89
81
90
ProduceHookT = Optional [Callable [[Span , List , Dict ], None ]]
0 commit comments