@@ -120,30 +120,7 @@ def get_consumer_offsets(self):
120
120
# {(consumer_group, topic, partition): offset}
121
121
consumer_offsets = {}
122
122
123
- if self .config ._monitor_unlisted_consumer_groups :
124
- # Get all consumer groups
125
- consumer_groups = []
126
- consumer_groups_future = self .kafka_client .list_consumer_groups ()
127
- self .log .debug ('MONITOR UNLISTED CG FUTURES: %s' , consumer_groups_future )
128
- try :
129
- list_consumer_groups_result = consumer_groups_future .result ()
130
- self .log .debug ('MONITOR UNLISTED FUTURES RESULT: %s' , list_consumer_groups_result )
131
-
132
- consumer_groups .extend (
133
- valid_consumer_group .group_id for valid_consumer_group in list_consumer_groups_result .valid
134
- )
135
- except Exception as e :
136
- self .log .error ("Failed to collect consumer offsets %s" , e )
137
-
138
- elif self .config ._consumer_groups :
139
- self ._validate_consumer_groups ()
140
- consumer_groups = self .config ._consumer_groups
141
-
142
- else :
143
- raise ConfigurationError (
144
- "Cannot fetch consumer offsets because no consumer_groups are specified and "
145
- "monitor_unlisted_consumer_groups is %s." % self .config ._monitor_unlisted_consumer_groups
146
- )
123
+ consumer_groups = self ._get_consumer_groups ()
147
124
148
125
for future in self ._get_consumer_offset_futures (consumer_groups ):
149
126
try :
@@ -178,6 +155,31 @@ def get_consumer_offsets(self):
178
155
179
156
return consumer_offsets
180
157
158
+ def _get_consumer_groups (self ):
159
+ if self .config ._monitor_unlisted_consumer_groups or self .config ._consumer_groups_regex :
160
+ # Get all consumer groups
161
+ consumer_groups = []
162
+ consumer_groups_future = self .kafka_client .list_consumer_groups ()
163
+ self .log .debug ('MONITOR UNLISTED CG FUTURES: %s' , consumer_groups_future )
164
+ try :
165
+ list_consumer_groups_result = consumer_groups_future .result ()
166
+ self .log .debug ('MONITOR UNLISTED FUTURES RESULT: %s' , list_consumer_groups_result )
167
+
168
+ consumer_groups .extend (
169
+ valid_consumer_group .group_id for valid_consumer_group in list_consumer_groups_result .valid
170
+ )
171
+ except Exception as e :
172
+ self .log .error ("Failed to collect consumer groups: %s" , e )
173
+ return consumer_groups
174
+ elif self .config ._consumer_groups :
175
+ self ._validate_consumer_groups ()
176
+ return self .config ._consumer_groups
177
+ else :
178
+ raise ConfigurationError (
179
+ "Cannot fetch consumer offsets because no consumer_groups are specified and "
180
+ "monitor_unlisted_consumer_groups is %s." % self .config ._monitor_unlisted_consumer_groups
181
+ )
182
+
181
183
def _get_consumer_offset_futures (self , consumer_groups ):
182
184
topics = self .kafka_client .list_topics (timeout = self .config ._request_timeout )
183
185
# {(consumer_group, topic, partition): offset}
@@ -221,24 +223,77 @@ def _get_topic_partitions(self, topics, consumer_group):
221
223
222
224
partitions = list (topics .topics [topic ].partitions .keys ())
223
225
224
- for partition in partitions :
225
- # Get all topic-partition combinations allowed based on config
226
- # if topics is None => collect all topics and partitions for the consumer group
227
- # if partitions is None => collect all partitions from the consumer group's topic
228
- if not self .config ._monitor_unlisted_consumer_groups and self .config ._consumer_groups .get (
229
- consumer_group
226
+ if self .config ._monitor_unlisted_consumer_groups :
227
+ for partition in partitions :
228
+ topic_partition = TopicPartition (topic , partition )
229
+ self .log .debug ("TOPIC PARTITION: %s" , topic_partition )
230
+ yield topic_partition
231
+
232
+ elif self .config ._consumer_groups_regex :
233
+ for filtered_topic_partition in self ._get_regex_filtered_topic_partitions (
234
+ consumer_group , topic , partitions
230
235
):
231
- if (
232
- self .config ._consumer_groups [consumer_group ]
233
- and topic not in self .config ._consumer_groups [consumer_group ]
234
- ):
236
+ topic_partition = TopicPartition (filtered_topic_partition [0 ], filtered_topic_partition [1 ])
237
+ self .log .debug ("TOPIC PARTITION: %s" , topic_partition )
238
+ yield topic_partition
239
+
240
+ if self .config ._consumer_groups :
241
+ for partition in partitions :
242
+ # Get all topic-partition combinations allowed based on config
243
+ # if topics is None => collect all topics and partitions for the consumer group
244
+ # if partitions is None => collect all partitions from the consumer group's topic
245
+ if self .config ._consumer_groups .get (consumer_group ):
246
+ if (
247
+ self .config ._consumer_groups [consumer_group ]
248
+ and topic not in self .config ._consumer_groups [consumer_group ]
249
+ ):
250
+ self .log .debug (
251
+ "Partition %s skipped because the topic %s is not in the consumer_group." ,
252
+ partition ,
253
+ topic ,
254
+ )
255
+ continue
256
+ if (
257
+ self .config ._consumer_groups [consumer_group ].get (topic )
258
+ and partition not in self .config ._consumer_groups [consumer_group ][topic ]
259
+ ):
260
+ self .log .debug (
261
+ "Partition %s skipped because it is not defined in the consumer group for the topic %s" ,
262
+ partition ,
263
+ topic ,
264
+ )
265
+ continue
266
+
267
+ topic_partition = TopicPartition (topic , partition )
268
+ self .log .debug ("TOPIC PARTITION: %s" , topic_partition )
269
+ yield topic_partition
270
+
271
+ def _get_regex_filtered_topic_partitions (self , consumer_group , topic , partitions ):
272
+ for partition in partitions :
273
+ # Do a regex filtering here for consumer groups
274
+ for consumer_group_compiled_regex in self .config ._consumer_groups_compiled_regex :
275
+ if not consumer_group_compiled_regex .match (consumer_group ):
276
+ return
277
+
278
+ consumer_group_topics_regex = self .config ._consumer_groups_compiled_regex .get (
279
+ consumer_group_compiled_regex
280
+ )
281
+
282
+ # If topics is empty, return all combinations of topic and partition
283
+ if not consumer_group_topics_regex :
284
+ yield (topic , partition )
285
+
286
+ # Do a regex filtering here for topics
287
+ for topic_regex in consumer_group_topics_regex :
288
+ if not topic_regex .match (topic ):
235
289
self .log .debug (
236
290
"Partition %s skipped because the topic %s is not in the consumer_group." , partition , topic
237
291
)
238
292
continue
293
+
239
294
if (
240
- self . config . _consumer_groups [ consumer_group ]. get (topic )
241
- and partition not in self . config . _consumer_groups [ consumer_group ][ topic ]
295
+ consumer_group_topics_regex . get (topic_regex )
296
+ and partition not in consumer_group_topics_regex [ topic_regex ]
242
297
):
243
298
self .log .debug (
244
299
"Partition %s skipped because it is not defined in the consumer group for the topic %s" ,
@@ -247,6 +302,4 @@ def _get_topic_partitions(self, topics, consumer_group):
247
302
)
248
303
continue
249
304
250
- self .log .debug ("TOPIC PARTITION: %s" , TopicPartition (topic , partition ))
251
-
252
- yield TopicPartition (topic , partition )
305
+ yield (topic , partition )
0 commit comments