
Producer stuck on TCP connection error #451

Closed
abuchanan-nr opened this issue May 21, 2020 · 2 comments

@abuchanan-nr
Contributor

When a producer encounters a TCP connection error, it becomes completely blocked and doesn't recover with a new connection.

I suspect I introduced this bug in #382, though I haven't verified that. If so, it affects v0.3.5 and v0.3.6.

Kafka Version
Tested against Kafka v0.10.2.1

To Reproduce

```go
package main

import (
	"context"
	"log"
	"os"
	"time"

	"github.com/segmentio/kafka-go"
)

func main() {
	// Send both informational and error logs from the writer to stderr.
	logger := log.New(os.Stderr, "kafka-go: ", 0)
	writer := kafka.NewWriter(kafka.WriterConfig{
		Brokers:     []string{"127.0.0.1:9093"},
		Topic:       "test_topic",
		Logger:      logger,
		ErrorLogger: logger,
	})
	ctx := context.Background()

	// Write one message per second. Stop the broker while this loop runs:
	// each write should fail and be retried, but the producer hangs instead.
	for range time.Tick(time.Second) {
		log.Println("tick")

		err := writer.WriteMessages(ctx, kafka.Message{
			Key:   []byte("Key"),
			Value: []byte("Val"),
		})
		if err != nil {
			log.Println(err)
		}
	}
}
```

Then run `docker stop kafka` (assuming you're running a test Kafka broker in a container named `kafka`).

The producer correctly retries once, but gets stuck on the second attempt. I suspect that's because nothing is reading from the result channel that this line writes to:
https://github.com/segmentio/kafka-go/blob/master/writer.go#L748

Issue #426 could be related.
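The suspected failure mode, an unconditional send on a result channel after the caller has stopped receiving from it, can be illustrated with a small self-contained sketch. It does not use kafka-go: `sendResultBlocking`, `sendResultSafe`, `runDemo`, and `errDial` are hypothetical names that only model the pattern, not the library's actual `writer.go` code. The first send succeeds because someone is still receiving; the second parks forever in `chan send`; a send that also selects on a context is released once the context expires.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errDial is a hypothetical stand-in for "connect: connection refused".
var errDial = errors.New("dial tcp: connect: connection refused")

// sendResultBlocking models the suspected bug: an unconditional send.
// With no receiver, the goroutine parks in state "chan send" forever.
func sendResultBlocking(res chan<- error, err error) {
	res <- err
}

// sendResultSafe gives up once ctx is done instead of blocking forever.
func sendResultSafe(ctx context.Context, res chan<- error, err error) bool {
	select {
	case res <- err:
		return true
	case <-ctx.Done():
		return false
	}
}

// runDemo returns one line per attempt describing what happened.
func runDemo() []string {
	var out []string
	res := make(chan error) // unbuffered: a send needs a concurrent receiver

	// Attempt 1: the caller is still receiving, so the send completes.
	go sendResultBlocking(res, errDial)
	out = append(out, fmt.Sprint("attempt 1: ", <-res))

	// Attempt 2: nobody receives from res any more, so the send never completes.
	done := make(chan struct{})
	go func() {
		sendResultBlocking(res, errDial)
		close(done)
	}()
	select {
	case <-done:
		out = append(out, "attempt 2: delivered")
	case <-time.After(100 * time.Millisecond):
		out = append(out, "attempt 2: still blocked on chan send")
	}

	// Attempt 3: a cancellable send returns once the context expires.
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()
	out = append(out, fmt.Sprint("attempt 3 delivered: ", sendResultSafe(ctx, res, errDial)))
	return out
}

func main() {
	for _, line := range runDemo() {
		fmt.Println(line)
	}
}
```

Selecting on a cancellation signal, as `sendResultSafe` does, or giving each attempt its own buffered channel are two common ways to keep such a sender from leaking; the sketch does not claim either is how the bug was eventually fixed.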

@lukemassa

I believe I am seeing the same issue. Recording some info here in case it's useful:

I create a writer with `NewWriter()` and try to write to it; however, my brokers are unreachable (for unrelated reasons). I then see these log lines:

```
time="2020-07-08T22:25:24Z" level=error msg="error dialing kafka brokers for topic testtopic (partition 0): dial tcp 10.41.10.178:29092: connect: connection refused"
time="2020-07-08T22:25:24Z" level=error msg="error dialing kafka brokers for topic testtopic (partition 0): dial tcp 10.41.10.178:29092: connect: connection refused"
time="2020-07-08T22:25:25Z" level=error msg="error dialing kafka brokers for topic testtopic (partition 0): dial tcp 10.41.10.178:29092: connect: connection refused"
```

After that, the code hangs. I killed the Go process and grabbed the stack trace of the goroutine it was stuck on:

```
goroutine 186 [chan send, 4 minutes]:
github.com/segmentio/kafka-go.(*writer).write(0xc000298000, 0x0, 0xc0001a4480, 0x1, 0x1, 0xc000136290, 0x1, 0x1, 0x0, 0x153b5c0, ...)
	/go/pkg/mod/github.com/segmentio/kafka-go@…/writer.go:734 +0x5ea
github.com/segmentio/kafka-go.(*writer).writeWithRetries(0xc000298000, 0x0, 0xc0001a4480, 0x1, 0x1, 0xc000136290, 0x1, 0x1, 0x203000, 0x203000, ...)
	/go/pkg/mod/github.com/segmentio/kafka-go@…/writer.go:715 +0x132
github.com/segmentio/kafka-go.(*writer).run(0xc000298000)
	/go/pkg/mod/github.com/segmentio/kafka-go@…/writer.go:675 +0x74c
created by github.com/segmentio/kafka-go.newWriter
	/go/pkg/mod/github.com/segmentio/kafka-go@…/writer.go:564 +0x1eb
```

@stevevls
Contributor

Thanks all. We've fixed this bug in the latest version (v0.3.7). If you're still having issues after upgrading, please let us know.
