
Kafka Producer gets stuck after partition leadership changes from one broker to another. #465


Closed
sudan opened this issue Jun 25, 2020 · 3 comments



sudan commented Jun 25, 2020

Describe the bug
Kafka Producer gets stuck after partition leadership changes from one broker to another. Once a write fails with `Not Leader For Partition: the client attempted to send messages to a replica that is not the leader for some partition, the client's metadata are likely out of date`, none of the subsequent messages get published.

Kafka Version
What version(s) of Kafka are you testing against?
2.3.1

To Reproduce

```go
w := kafka.NewWriter(kafka.WriterConfig{
	Brokers:       strings.Split(Config.serverUrls, ","),
	Balancer:      &kafka.Hash{},
	Topic:         topic,
	MaxAttempts:   Config.retries,
	QueueCapacity: Config.queueCapacity,
	BatchSize:     Config.batchSize,
	BatchBytes:    Config.batchBytes,
	RequiredAcks:  Config.acks,
	Async:         false,
	WriteTimeout:  time.Duration(Config.writeTimeout) * time.Second,
	BatchTimeout:  time.Duration(Config.batchTimeoutInMs) * time.Millisecond,
})

w.WriteMessages(ctx,
	kafka.Message{
		Key:   []byte(request.Key),
		Value: byteArr,
	},
)
```

If we call this in a loop while reassigning the topic's partitions from one broker to another with `kafka-reassign-partitions.sh`, `WriteMessages` returns the error above and all subsequent messages get stuck.

Expected behavior
The writer should refresh its metadata cache and continue processing.

Additional context
While debugging the source code, I made a couple of observations.

In `writer.go`:

```go
func (w *writer) run() {
	....
	case wm, ok := <-w.msgs:
	...
}
```

That `case` never receives anything from `w.msgs`, but with the following function:

```go
func (w *writer) messages() chan<- writerMessage {
	return w.msgs
}
```

I can see messages still being sent into the channel it returns.

This behaviour starts after we get the Not Leader For Partition error described above, which reports that the client's metadata is out of date.

sudan added the bug label Jun 25, 2020
@brunocalza

@sudan What version of the library are you using? We had the same problem in production today; after updating the library to 0.3.7, it works.


vtolstov commented Jul 5, 2020

I can confirm that this error is not present in 0.3.7.

@stevevls
Contributor

Thanks for the bug report and the independent confirmation that it's not present in v0.3.7. 😁 This is a duplicate of #451.


4 participants