Commit 60592f6

liutao365 and dnwe authored
fix(consumer): add recovery from no leader partitions (#3101)
When some topic partitions have no leader due to Kafka broker failures, the Sarama consumer group should be able to continue consuming the partitions that do have leaders, and resume consuming the partitions that previously had no leader once they return to normal.

Signed-off-by: liutao366 <[email protected]>
Signed-off-by: Dominic Evans <[email protected]>
Co-authored-by: Dominic Evans <[email protected]>
1 parent 9ae475a commit 60592f6

2 files changed: 34 additions and 6 deletions

client.go

Lines changed: 14 additions & 0 deletions
@@ -113,6 +113,9 @@ type Client interface {
 	// LeastLoadedBroker retrieves broker that has the least responses pending
 	LeastLoadedBroker() *Broker
 
+	// PartitionNotReadable returns true if the given topic/partition is not readable (it currently has no leader)
+	PartitionNotReadable(topic string, partition int32) bool
+
 	// Close shuts down all broker connections managed by this client. It is required
 	// to call this function before a client object passes out of scope, as it will
 	// otherwise leak memory. You must close any Producers or Consumers using a client
@@ -1283,3 +1286,14 @@ type nopCloserClient struct {
 func (ncc *nopCloserClient) Close() error {
 	return nil
 }
+
+func (client *client) PartitionNotReadable(topic string, partition int32) bool {
+	client.lock.RLock()
+	defer client.lock.RUnlock()
+
+	pm := client.metadata[topic][partition]
+	if pm == nil {
+		return true
+	}
+	return pm.Leader == -1
+}
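
The new method is exported on the Client interface, so any holder of a sarama.Client can poll it to wait for a partition's leader to return before doing work against that partition. Below is a minimal, hedged sketch of that pattern; the waitReadable helper, the polling interval parameter, and the github.com/IBM/sarama import path are illustrative assumptions, not part of this commit.

package example

import (
	"context"
	"time"

	"github.com/IBM/sarama" // assumed import path
)

// waitReadable is a hypothetical helper (not part of this commit) that blocks
// until the given topic/partition has a leader again, polling the new
// Client.PartitionNotReadable method at a fixed interval.
func waitReadable(ctx context.Context, c sarama.Client, topic string, partition int32, interval time.Duration) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for c.PartitionNotReadable(topic, partition) {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			// re-check on the next tick; the client's cached metadata is
			// refreshed periodically in the background
		}
	}
	return nil
}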

consumer_group.go

Lines changed: 20 additions & 6 deletions
@@ -861,18 +861,32 @@ func newConsumerGroupSession(ctx context.Context, parent *consumerGroup, claims
 		return nil, err
 	}
 
-	// start consuming
+	// start consuming each topic partition in its own goroutine
 	for topic, partitions := range claims {
 		for _, partition := range partitions {
-			sess.waitGroup.Add(1)
-
+			sess.waitGroup.Add(1) // increment wait group before spawning goroutine
 			go func(topic string, partition int32) {
 				defer sess.waitGroup.Done()
-
-				// cancel the as session as soon as the first
-				// goroutine exits
+				// cancel the group session as soon as any of the consume calls return
				defer sess.cancel()
 
+				// if the partition is not currently readable, wait for it to become readable again
+				if sess.parent.client.PartitionNotReadable(topic, partition) {
+					timer := time.NewTimer(5 * time.Second)
+					defer timer.Stop()
+
+					for sess.parent.client.PartitionNotReadable(topic, partition) {
+						select {
+						case <-ctx.Done():
+							return
+						case <-parent.closed:
+							return
+						case <-timer.C:
+							timer.Reset(5 * time.Second)
+						}
+					}
+				}
+
 				// consume a single topic/partition, blocking
 				sess.consume(topic, partition)
 			}(topic, partition)
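
From the application side, the behaviour described in the commit message can be pictured with an ordinary consumer group loop. The sketch below is illustrative only: the broker address, group ID, topic name, Kafka version, and exampleHandler are placeholders rather than anything taken from this change. With the fix, a session that is assigned a leaderless partition keeps consuming the partitions that do have leaders, and the leaderless one is re-checked every 5 seconds until it becomes readable again.

package main

import (
	"context"
	"log"

	"github.com/IBM/sarama" // assumed import path
)

// exampleHandler is a placeholder ConsumerGroupHandler used only to illustrate
// the recovery behaviour described in this commit.
type exampleHandler struct{}

func (exampleHandler) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (exampleHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil }

func (exampleHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		// process the message, then mark it as consumed
		sess.MarkMessage(msg, "")
	}
	return nil
}

func main() {
	cfg := sarama.NewConfig()
	cfg.Version = sarama.V2_1_0_0 // placeholder; use the version your brokers run

	group, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "example-group", cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer group.Close()

	ctx := context.Background()
	for {
		// Consume returns when a rebalance happens, so it is called in a loop.
		// With this fix, being assigned a partition that currently has no leader
		// no longer causes the whole session to be cancelled: readable partitions
		// keep consuming and the leaderless one is retried every 5 seconds.
		if err := group.Consume(ctx, []string{"example-topic"}, exampleHandler{}); err != nil {
			log.Println("consume error:", err)
		}
	}
}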
