WriteMessages "Unexpected EOF" , config with sasl-plain #795


Closed

tong3jie opened this issue Nov 20, 2021 · 55 comments

@tong3jie
Contributor

tong3jie commented Nov 20, 2021

Describe the bug
When writing messages to the Kafka server, it reports "Unexpected EOF".

Both errors are at the io or net level.

I can't solve it.


Kafka Version
kafka server : 0.10.22
kafka-go : 0.4.23


To Reproduce

package utils

import (
	"context"
	"crypto/tls"
	"crypto/x509"
	"io/ioutil"
	"os"
	"path"
	"strings"
	"sync"
	"time"

	"github.com/segmentio/kafka-go"
	"github.com/segmentio/kafka-go/sasl/plain"
)

type Client struct {
	r      *kafka.Reader
	w      *kafka.Writer
	ctx    context.Context
	cancel context.CancelFunc
}

type Kafka struct {
	Clients  map[string]*Client
	Topics   map[string]struct{}
	CallBack func(message chan []byte)
	Lock     sync.Mutex
}

func NewKafkaDialer() *kafka.Dialer {
	conf := Conf
	dialer := &kafka.Dialer{}
	dialer.ClientID = "clientId"
	pwd, _ := os.Getwd()
	certBytes, err := ioutil.ReadFile(path.Join(pwd, "/utils/ca.cert"))
	if err != nil {
		Logger.Error("kafka client read cert file failed : " + err.Error())
	}
	clientCertPool := x509.NewCertPool()
	ok := clientCertPool.AppendCertsFromPEM(certBytes)
	if !ok {
		Logger.Error("kafka client failed to parse root certificate")
	}
	dialer.TLS = &tls.Config{
		RootCAs:            clientCertPool,
		InsecureSkipVerify: true,
	}
	dialer.SASLMechanism = plain.Mechanism{
		Username: conf.Kafka.UserName,
		Password: conf.Kafka.Password,
	}
	dialer.Timeout = time.Second * 10
	dialer.DualStack = true
	return dialer
}

func NewKafka(topics []string, callback func(message chan []byte)) *Kafka {
	conf := Conf
	brokers := strings.Split(conf.Kafka.Broker, ",")
	kafkaClients := make(map[string]*Client, len(topics))
	dialer := NewKafkaDialer()

	conn, err := dialer.DialContext(context.Background(), "tcp", brokers[0])
	if err != nil {
		Logger.Error("kafka dial failed : " + err.Error())
	}
	partitions, err := conn.ReadPartitions()
	if err != nil {
		Logger.Error("kafka read partitions failed : " + err.Error())
	}
	Topics := make(map[string]struct{}, len(partitions))

	for _, partition := range partitions {
		Topics[partition.Topic] = struct{}{}
	}

	for _, topic := range topics {
		if _, ok := Topics[topic]; !ok {
			return nil
		}
		r := kafka.NewReader(kafka.ReaderConfig{
			Brokers:        brokers,
			Topic:          topic,
			GroupID:        "server",
			MinBytes:       10e2,
			MaxBytes:       10e5,
			Dialer:         dialer,
			CommitInterval: 1 * time.Second,
		})

		w := kafka.NewWriter(kafka.WriterConfig{
			Brokers:  brokers,
			Topic:    topic,
			Balancer: &kafka.Hash{},
			Dialer:   dialer,
		})
		ctx, cancel := context.WithCancel(context.Background())
		kafkaClients[topic] = &Client{
			r:      r,
			w:      w,
			ctx:    ctx,
			cancel: cancel,
		}
	}
	return &Kafka{
		Clients:  kafkaClients,
		Topics:   Topics,
		CallBack: callback,
		Lock:     sync.Mutex{},
	}
}

func (k *Kafka) Pub(topic string, msg []byte) {
	if _, ok := k.Topics[topic]; !ok {
		return
	}

	err := k.Clients[topic].w.WriteMessages(k.Clients[topic].ctx, kafka.Message{
		Value: msg,
	})
	if err != nil {
		Logger.Error("kafka publish failed : " + err.Error())
	}
}

func (k *Kafka) Sub(topics []string) {
	for _, t := range topics {
		if _, ok := k.Topics[t]; !ok {
			return
		}
	}

	ch := make(chan []byte, 10000)
	go func(ch chan []byte) {
		for _, t := range topics {
			for {
				m, err := k.Clients[t].r.ReadMessage(k.Clients[t].ctx)
				if err != nil {
					Logger.Error("kafka subscribe failed : " + err.Error())
					return
				}
				ch <- m.Value
			}
		}
	}(ch)
	go k.CallBack(ch)
}

func (k *Kafka) Pause(topic string) {
	k.Lock.Lock()
	defer k.Lock.Unlock()
	if _, ok := k.Topics[topic]; !ok {
		return
	}
	k.Clients[topic].cancel()
	delete(k.Clients, topic)
	time.Sleep(time.Second * 5)
	k.Resume(topic)
}

func (k *Kafka) Resume(topic string) {
	if _, ok := k.Topics[topic]; !ok {
		return
	}
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers:  strings.Split(Conf.Kafka.Broker, ","),
		Topic:    topic,
		GroupID:  "server",
		MinBytes: 10e2,
		MaxBytes: 10e5,

		Dialer: NewKafkaDialer(),
	})
	w := kafka.NewWriter(kafka.WriterConfig{
		Brokers:  strings.Split(Conf.Kafka.Broker, ","),
		Topic:    topic,
		Balancer: &kafka.Hash{},
		Dialer:   NewKafkaDialer(),
	})
	ctx, cancel := context.WithCancel(context.Background())
	k.Clients[topic] = &Client{
		r:      r,
		w:      w,
		ctx:    ctx,
		cancel: cancel,
	}
}

func (k *Kafka) Close() {
	for _, c := range k.Clients {
		c.cancel()
		if err := c.r.Close(); err != nil {
			Logger.Error("failed to close reader:" + err.Error())
		}
	}
}

Expected behavior


Additional context
Because Kafka is not a priority queue, I must pause some low-priority topics and resume them after a short time.

@tong3jie tong3jie added the bug label Nov 20, 2021
@tong3jie
Contributor Author

I can get all topics from the Kafka server.

@tong3jie tong3jie changed the title ReaderMessage " io: read/write on closed pipe" and WriteMessages "Unexpected EOF", config with sasl-plain ReaderMessage " io: read/write on closed pipe" , config with sasl-plain Nov 22, 2021
@tong3jie tong3jie changed the title ReaderMessage " io: read/write on closed pipe" , config with sasl-plain WriteMessages "Unexpected EOF" , config with sasl-plain Nov 22, 2021
@tong3jie
Contributor Author

Is anybody here?

@tong3jie
Contributor Author

I found the error originates here:

func (d *decoder) readFull(b []byte) bool {

because it asks for 4 bytes but doesn't receive enough
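For context, here is a standalone sketch of how a short read of a 4-byte size prefix surfaces as this error. This is not kafka-go's actual decoder, just an illustration of the io.ReadFull behavior underneath, with a hypothetical truncated stream standing in for the broker closing the connection early:

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

func main() {
	// A Kafka response begins with a 4-byte big-endian size prefix.
	// If the connection closes after delivering only part of it,
	// io.ReadFull reports io.ErrUnexpectedEOF ("unexpected EOF").
	partial := bytes.NewReader([]byte{0x00, 0x00}) // only 2 of 4 bytes arrive

	buf := make([]byte, 4)
	if _, err := io.ReadFull(partial, buf); err != nil {
		fmt.Println(err) // prints: unexpected EOF
		return
	}
	fmt.Println("size:", binary.BigEndian.Uint32(buf))
}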

@mrkagelui
Contributor

not sure if it's the same issue, but #799 (comment) might help you

@tong3jie
Contributor Author

tong3jie commented Dec 2, 2021

@mrkagelui Same error when using your approach.

@mrkagelui
Contributor

I hit EOF, not unexpected EOF, today; it turned out to be a network connectivity issue. Has this code worked elsewhere?

@mrkagelui
Contributor

Wait, you said some operations were successful while others aren't? That's odd; not an authentication issue, then.

@tong3jie
Contributor Author

tong3jie commented Dec 5, 2021

Yes, I can list all topics and read messages.

@rhansen2
Collaborator

Hi, thanks for your patience.

Are you still experiencing this issue?

@Jrmy2402

Jrmy2402 commented Feb 9, 2022

Hi, I have the same error with kafka-go v0.4.28 and Kafka server version 0.10.2.
The error also comes from readFull.
If I use the Writer, I get the error: unexpected EOF

w := &kafka.Writer{
	Addr:  kafka.TCP("kafka:9093"),
	Topic: "topic",
	Transport: &kafka.Transport{
		SASL: plain.Mechanism{
			Username: "user",
			Password: "XXXXX",
		},
		TLS: &tls.Config{},
	},
}

err = w.WriteMessages(context.Background(),
	kafka.Message{
		Value: []byte("message"),
	},
)
if err != nil {
	log.Fatal("failed to write messages:", err)
}

But if I use Conn, it works.

partition := 0
dialer := &kafka.Dialer{
	Timeout:   10 * time.Second,
	DualStack: true,
	TLS: &tls.Config{
		InsecureSkipVerify: true,
	},
	SASLMechanism: plain.Mechanism{
		Username: "user",
		Password: "XXXXX",
	},
}

conn, err := dialer.DialLeader(context.Background(), "tcp", "kafka:9093", "topic", partition)
if err != nil {
	log.Fatal("failed to dial leader:", err)
}

conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
_, err = conn.WriteMessages(
	kafka.Message{
		Value: []byte("message"),
	},
)
if err != nil {
	log.Error("failed to write messages : " + err.Error())
}

With version 0.4.0 and the older versions, it works. The error seems to come with this commit: 81bd03c

@rhansen2
Collaborator

Unfortunately I'm still not able to reproduce this issue. I haven't been able to successfully run version 0.10.2 with TLS locally so I've only been testing with more recent versions of Kafka. If you're able to, please provide a reproduction that includes a set-up for Kafka.

I'm wondering if there's potentially a difference between Go versions; what version of Go are you running?

If you're not able to provide a runnable reproduction, could you potentially provide a dump of the communication between kafka-go and your Kafka server?

Thanks!

@Jrmy2402

I use go version go1.17 linux/amd64.
I'm not able to provide a runnable reproduction, sorry.
How can I dump the communication between kafka-go and the server?

@rhansen2
Collaborator

A tool like Wireshark can be used to dump the network communication between kafka-go and the server. Setting a logger on the kafka.Writer type could also give more information as to what's going on.
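To illustrate that suggestion, a minimal sketch of attaching loggers to a kafka.Writer; the broker address and topic here are placeholders:

package main

import (
	"log"

	"github.com/segmentio/kafka-go"
)

func newLoggingWriter() *kafka.Writer {
	return &kafka.Writer{
		Addr:  kafka.TCP("kafka:9093"), // placeholder broker address
		Topic: "topic",                 // placeholder topic
		// Logger receives verbose progress messages; ErrorLogger only failures.
		Logger: kafka.LoggerFunc(func(msg string, args ...interface{}) {
			log.Printf("[kafka] "+msg, args...)
		}),
		ErrorLogger: kafka.LoggerFunc(func(msg string, args ...interface{}) {
			log.Printf("[kafka error] "+msg, args...)
		}),
	}
}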

@lxxwilliam

lxxwilliam commented Feb 16, 2022

I hit the bug too, and I found the error is here:
file: github.com\segmentio\[email protected]\writer.go
line: 734
But I found that Conn works.
Conn and NewReader are alike in that they both use the Dialer, and the old version of NewWriter used the Dialer too.
So could you please check it? I suspect there's something wrong here.


@lxxwilliam

I hit the bug too, and I found the error is here: file github.com\segmentio\[email protected]\writer.go, line 734. But I found that Conn works, and Conn and NewReader are alike in that they both use the Dialer; the old version of NewWriter used the Dialer too. Could you please check it? I suspect there's something wrong here.


#490 (comment)
I used version 0.3.7, and NewWriter works. So there may be some bug in the newer versions; please check it, thanks.

@rhansen2
Collaborator

I appreciate the additional reports, but without a reproduction or any information about the Kafka server configuration I'm not sure what else I can do to debug this.

I do encourage anyone experiencing the issue to submit a PR with a fix.

Thanks!

@lxxwilliam

I appreciate the additional reports, but without a reproduction or any information about the Kafka server configuration I'm not sure what else I can do to debug this.

I do encourage anyone experiencing the issue to submit a PR with a fix.

Thanks!

OS: Windows 10
Kafka Version
kafka server: 0.10.2
kafka-go: 0.4.28
Because version 0.4.x (maybe anything after 0.4.0) does not work, I suggest comparing the Writer (0.4.x) and NewWriter (0.3.7) code between versions 0.3.7 and 0.4.28. By the way, I found that NewWriter in version 0.4.28 does not work either; it hits the same bug.

@rhansen2
Collaborator

You mentioned your OS is Windows 10; is that for the client? I wonder if that's related. Unfortunately, I don't have a way to test from Windows currently.

Is anyone else experiencing this error on Windows?

@rhansen2
Collaborator

rhansen2 commented Mar 5, 2022

Hi there,
I've recently merged a PR that might help narrow down the source of this issue. Could anyone retry their code with @latest and let me know if you see the message "broker appears to be expecting TLS"?

Thanks!
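(For anyone following along: assuming a Go-modules setup, the newest tagged release can be pulled with go get github.com/segmentio/kafka-go@latest followed by a rebuild.)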

@ThomasObenaus

ThomasObenaus commented Mar 5, 2022

Hi @rhansen2 ,

I have the same issue

1. Writing using a connection works:
tlsCert := tls.Certificate{
	Certificate: [][]byte{cert.Raw},
	PrivateKey:  key.(crypto.PrivateKey),
	Leaf:        cert,
}

dialer := &kafka.Dialer{
	Timeout:   10 * time.Second,
	DualStack: true,
	TLS: &tls.Config{
		InsecureSkipVerify: true,
		MaxVersion:         tls.VersionTLS12,
		Certificates:       []tls.Certificate{tlsCert},
	},
}

conn, err := dialer.DialLeader(ctx, "tcp", "myserver:443", "my-topic", 0)
if err != nil {
	log.Fatal(err)
}
defer conn.Close()

conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
_, err = conn.WriteMessages(
	kafka.Message{Key: []byte("Key-A"), Value: []byte("Hello World!")},
)
if err != nil {
	fmt.Printf("failed to write messages : %v\n", err)
}

fmt.Printf("WRITTEN\n")

2. Writing using NewWriter does not work:

w := kafka.NewWriter(kafka.WriterConfig{
	Brokers:     []string{"myserver:443"},
	Topic:       "my-topic",
	Balancer:    &kafka.LeastBytes{},
	Dialer:      dialer,
	ErrorLogger: errLogger(),
	Logger:      logger(),
	BatchSize:   1,
})

ctx, cancel = context.WithTimeout(context.Background(), time.Second*90)
defer cancel()
err = w.WriteMessages(ctx,
	kafka.Message{
		Key:   []byte("Key-A"),
		Value: []byte("Hello World!"),
	},
	kafka.Message{
		Key:   []byte("Key-B"),
		Value: []byte("One!"),
	},
	kafka.Message{
		Key:   []byte("Key-C"),
		Value: []byte("Two!"),
	},
)

@ThomasObenaus

ThomasObenaus commented Mar 5, 2022

Sadly, I don't get the "broker appears to be expecting TLS" message.

In my setup, the NewWriter approach seems to get no response from the server.
The app always blocks forever at
id, res, err := ReadResponse(rw, req.ApiKey(), apiVersion) (in protocol/roundtrip.go)


@ThomasObenaus

@rhansen2 OK, it seems to end up in an endless loop when decoding the first message from the server.
In protocol/response.go at line 76,
the statement res.decode(d, valueOf(msg)) blocks forever.

Internally, in protocol/decode.go, it never leaves the for loop at line 109, since d.remain never decrements.


@ThomasObenaus
Copy link

My Kafka server is version 2.6.0.

@rhansen2
Collaborator

rhansen2 commented Mar 7, 2022

Hi @ThomasObenaus,

Are you able to provide a reproduction of your issue?

Thanks!

@ThomasObenaus

ThomasObenaus commented Mar 7, 2022

@rhansen2 Sorry, I have no full example for reproduction, since I don't know how I could share my company's Kafka server configuration.

But the code snippet I use is:

package main

import (
	"context"
	"crypto"
	"crypto/tls"
	"crypto/x509"
	"fmt"
	"io/ioutil"
	"log"
	"time"

	"github.com/pkg/errors"
	"github.com/segmentio/kafka-go"

	"software.sslmate.com/src/go-pkcs12"
)

func main() {
	host := "my-kafka-server:443"
	topic := "my-topic"
	trustStorePassword := "abcdefg"
	keyStorePassword := "xyzuvw"

	tlsConfig, err := tlsCfg("ca.p12", trustStorePassword, "user.p12", keyStorePassword)
	if err != nil {
		log.Fatal(err)
	}

	dialer := &kafka.Dialer{
		Timeout:   10 * time.Second,
		DualStack: true,
		TLS:       tlsConfig,
	}

	if err := writeUsingConnection(dialer, host, topic); err != nil {
		log.Fatal(err)
	}

	if err := writeUsingWriter(dialer, host, topic); err != nil {
		log.Fatal(err)
	}

	if err := read(dialer, host, topic, time.Second*10); err != nil {
		log.Fatal(err)
	}
}

func logger(silence bool) kafka.LoggerFunc {
	return func(msg string, args ...interface{}) {
		if silence {
			return
		}
		fmt.Printf(fmt.Sprintf("[DBG] %s\n", msg), args...)
	}
}

func errLogger() kafka.LoggerFunc {
	return func(msg string, args ...interface{}) {
		fmt.Printf(fmt.Sprintf("[Error] %s\n", msg), args...)
	}
}

func tlsCfg(trustStoreFile string, trustStorePassword string, keyStoreFile string, keyStorePassword string) (*tls.Config, error) {
	trustStore, err := ioutil.ReadFile(trustStoreFile)
	if err != nil {
		return nil, errors.Wrap(err, "loading trust store")
	}

	trustStoreCerts, err := pkcs12.DecodeTrustStore(trustStore, trustStorePassword)
	if err != nil {
		return nil, errors.Wrap(err, "decoding trust store")
	}

	certPool, err := x509.SystemCertPool()
	if err != nil {
		return nil, errors.Wrap(err, "opening system cert pool")
	}

	for _, cert := range trustStoreCerts {
		certPool.AddCert(cert)
	}

	keyStore, err := ioutil.ReadFile(keyStoreFile)
	if err != nil {
		return nil, errors.Wrap(err, "loading key store")
	}

	keyStoreKey, keyStoreCert, err := pkcs12.Decode(keyStore, keyStorePassword)
	if err != nil {
		return nil, errors.Wrap(err, "decoding key store")
	}

	clientCert := tls.Certificate{
		Certificate: [][]byte{keyStoreCert.Raw},
		PrivateKey:  keyStoreKey.(crypto.PrivateKey),
		Leaf:        keyStoreCert,
	}

	return &tls.Config{
		InsecureSkipVerify: false,
		MaxVersion:         tls.VersionTLS12,
		Certificates:       []tls.Certificate{clientCert},
		RootCAs:            certPool,
	}, nil
}

func writeUsingConnection(dialer *kafka.Dialer, host string, topic string) error {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
	defer cancel()

	conn, err := dialer.DialContext(ctx, "tcp", host)
	if err != nil {
		return errors.Wrap(err, "opening connection")
	}
	defer conn.Close()
	fmt.Println("[INFO] Connected")

	fmt.Println("[INFO] Start write (using connection)")
	connLeader, err := dialer.DialLeader(ctx, "tcp", host, topic, 0)
	if err != nil {
		return errors.Wrap(err, "opening connection to leader")
	}
	defer connLeader.Close()

	connLeader.SetWriteDeadline(time.Now().Add(2 * time.Second))
	bWritten, err := connLeader.WriteMessages(
		kafka.Message{Key: []byte("Key-Conn"), Value: []byte("Message Written Using Connection")})
	if err != nil {
		return errors.Wrap(err, "writing message")
	}

	fmt.Printf("[INFO] Done write (using connection) %d Bytes\n", bWritten)
	return nil
}

func writeUsingWriter(dialer *kafka.Dialer, host string, topic string) error {
	fmt.Println("[INFO] Start write (using NewWriter)")

	w := kafka.NewWriter(kafka.WriterConfig{
		Brokers:     []string{host},
		Topic:       topic,
		Balancer:    &kafka.LeastBytes{},
		Dialer:      dialer,
		ErrorLogger: errLogger(),
		Logger:      logger(true),
		BatchSize:   1,
	})
	defer func() {
		if err := w.Close(); err != nil {
			fmt.Printf("[Error] Failed to close writer: %v\n", err)
		}
	}()

	ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
	defer cancel()

	err := w.WriteMessages(ctx,
		kafka.Message{Key: []byte("Key-Writer"), Value: []byte("Message Written Using Writer")},
	)
	if err != nil {
		return errors.Wrap(err, "writing messages")
	}
	fmt.Println("[INFO] Done write (using NewWriter)")
	return nil
}

func read(dialer *kafka.Dialer, host string, topic string, readFor time.Duration) error {
	fmt.Println("[INFO] Start reading")

	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers:     []string{host},
		GroupID:     "consumer-group-1",
		Topic:       topic,
		Dialer:      dialer,
		ErrorLogger: errLogger(),
		Logger:      logger(true),
		MaxWait:     time.Second,
	})
	defer func() {
		if err := r.Close(); err != nil {
			fmt.Printf("[Error] Failed to close reader: %v\n", err)
		}
	}()

	ctx, cancel := context.WithTimeout(context.Background(), readFor)
	defer cancel()

	for i := 0; i < 100; i++ {
		subCtx, cancelSubCtx := context.WithTimeout(ctx, time.Second*2)
		defer cancelSubCtx()

		m, err := r.ReadMessage(subCtx)
		if err != nil {
			globalDeadline, _ := ctx.Deadline()
			if time.Now().After(globalDeadline) {
				break
			}
			subDeadline, _ := subCtx.Deadline()
			if time.Now().After(subDeadline) {
				fmt.Printf("\r(%0.2fs)...", globalDeadline.Sub(time.Now()).Seconds())
				continue
			}

			fmt.Printf("\n[Error] Reading %v\n", err)
			break
		}
		fmt.Printf("\n[INFO] Message at offset %d: %s = %s\n", m.Offset, string(m.Key), string(m.Value))
	}

	fmt.Println("\n[INFO] Done reading")
	return nil
}

@ThomasObenaus

@rhansen2
In the meantime I got it working on my side (also with version v0.4.29).

The root cause seems to have something to do with the TLS configuration when one (like me) skips the server certificate verification.

Does not work

With InsecureSkipVerify: true, I can't write using the Writer approach, but writing via a connection works.

Output

[INFO] Connected
[INFO] Start write (using connection)
[INFO] Done write (using connection) 40 Bytes
[INFO] Start write (using NewWriter)
2022/03/07 21:04:18 writing messages: context deadline exceeded
exit status 1
make: *** [Makefile:4: producer] Error 1

Works

With InsecureSkipVerify: false, and the needed CAs provided to actually do the verification, everything works.

Output

[INFO] Connected
[INFO] Start write (using connection)
[INFO] Done write (using connection) 40 Bytes
[INFO] Start write (using NewWriter)
[INFO] Done write (using NewWriter)
[INFO] Start reading
(6.00s)...
[INFO] Message at offset 96: Key-Conn = Message Written Using Connection

[INFO] Message at offset 97: Key-Writer = Message Written Using Writer
(1.17s)...
[INFO] Done reading
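
For anyone hitting the same thing, a minimal sketch of the working variant described above: verification enabled and the broker CA supplied explicitly. The ca.pem path, timeout, and helper name are placeholders, not code from this thread:

package main

import (
	"crypto/tls"
	"crypto/x509"
	"log"
	"os"
	"time"

	"github.com/segmentio/kafka-go"
)

func newVerifyingDialer() *kafka.Dialer {
	caPEM, err := os.ReadFile("ca.pem") // placeholder path to the broker CA
	if err != nil {
		log.Fatal(err)
	}
	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(caPEM) {
		log.Fatal("failed to parse CA certificate")
	}
	return &kafka.Dialer{
		Timeout:   10 * time.Second,
		DualStack: true,
		TLS: &tls.Config{
			InsecureSkipVerify: false, // verify the broker certificate
			RootCAs:            pool,
		},
	}
}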

@rhansen2
Collaborator

rhansen2 commented Mar 7, 2022

Glad we were able to get that fixed!
I've released https://github.com/segmentio/kafka-go/releases/tag/v0.4.30 which contains the fix.

@tong3jie @Jrmy2402 @lxxwilliam If you can please retry with the latest release and let me know if your issue persists.

Thanks!

@Jrmy2402

Jrmy2402 commented Mar 7, 2022

Hi, I still get the error unexpected EOF on my side.
The error comes from here:

numPartitions, err := w.partitions(ctx, topic)
and
if state.err != nil {

And it works if I use:

partition := 0
dialer := &kafka.Dialer{
	Timeout:   10 * time.Second,
	DualStack: true,
	TLS: &tls.Config{
		InsecureSkipVerify: true,
	},
	SASLMechanism: plain.Mechanism{
		Username: "user",
		Password: "XXXXX",
	},
}

conn, err := dialer.DialLeader(context.Background(), "tcp", "kafka:9093", "topic", partition)
if err != nil {
	log.Fatal("failed to dial leader:", err)
}

conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
_, err = conn.WriteMessages(
	kafka.Message{
		Value: []byte("message"),
	},
)
if err != nil {
	log.Error("failed to write messages : " + err.Error())
}

@rhansen2
Collaborator

rhansen2 commented Mar 7, 2022

@Jrmy2402 To confirm: you're still seeing the unexpected EOF with v0.4.30?

@Jrmy2402

Jrmy2402 commented Mar 7, 2022

Yes, I'm still seeing the unexpected EOF with v0.4.30.

@rhansen2
Collaborator

rhansen2 commented Mar 7, 2022

😞 Thanks!

Could you possibly share the TLS config your Kafka brokers are using?

Additionally, is the code from #795 (comment) still what you're using to test? In that example you don't set InsecureSkipVerify in the Writer's TLS config.

@lxxwilliam

Glad we were able to get that fixed! I've released https://github.com/segmentio/kafka-go/releases/tag/v0.4.30 which contains the fix.

@tong3jie @Jrmy2402 @lxxwilliam If you can please retry with the latest release and let me know if your issue persists.

Thanks!

Sad! I got unexpected EOF too!

@rhansen2
Collaborator

rhansen2 commented Mar 8, 2022

@lxxwilliam are you able to provide the configuration of your Kafka brokers?

@lxxwilliam

@lxxwilliam are you able to provide the configuration of your Kafka brokers?

kafkaDialer := &kafka.Dialer{
	Timeout:   10 * time.Second,
	DualStack: true,
	SASLMechanism: &plain.Mechanism{
		Username: "xxx",
		Password: "xxx",
	},
	TLS: &tls.Config{InsecureSkipVerify: true},
}

brokers := []string{"xxx"}
topic := "xxx"

w := kafka.NewWriter(kafka.WriterConfig{
	Brokers:      brokers,
	Topic:        topic,
	Dialer:       kafkaDialer,
	Balancer:     &kafka.LeastBytes{},
	BatchTimeout: 100 * time.Millisecond,
	BatchSize:    100,
})
defer w.Close()

err := w.WriteMessages(context.Background(),
	kafka.Message{
		Key:   []byte("Key-A"),
		Value: []byte("Hello World1!"),
	},
)

@rhansen2
Collaborator

@lxxwilliam I was hoping for Kafka broker configurations not the Writer config. Sorry for any confusion.

@curtisr7

@rhansen2 I'm mostly able to recreate this in my dev env (not using SASL, but seeing the EOF behavior when using kafka.Writer{...} vs dialer.DialLeader(...)).

Any chance you'd be down for a hangout/screenshare so I can gather whatever data you need?

@rhansen2
Collaborator

@curtisr7 Without more information I'm not sure what I'd be looking for beyond the information I've been requesting such as Kafka broker configurations and potentially packet captures between the Writer and the Kafka Brokers.

@curtisr7

Gotcha. Here is a snippet from my kafka pod coming up (not sure if this is helpful):

kafka 18:47:04.23
kafka 18:47:04.23 Welcome to the Bitnami kafka container
kafka 18:47:04.23 Subscribe to project updates by watching https://github.com/bitnami/bitnami-docker-kafka
kafka 18:47:04.23 Submit issues and feature requests at https://github.com/bitnami/bitnami-docker-kafka/issues
kafka 18:47:04.24
kafka 18:47:04.24 INFO  ==> ** Starting Kafka setup **
kafka 18:47:04.33 WARN  ==> You set the environment variable ALLOW_PLAINTEXT_LISTENER=yes. For safety reasons, do not use this flag in a production environment.
kafka 18:47:04.34 INFO  ==> Initializing Kafka...
kafka 18:47:04.35 INFO  ==> No injected configuration files found, creating default config files
kafka 18:47:04.65 INFO  ==> Configuring Kafka for inter-broker communications with SSL authentication.
kafka 18:47:04.70 INFO  ==> Configuring Kafka for client communications with SSL authentication.

kafka 18:47:04.73 INFO  ==> ** Kafka setup finished! **
kafka 18:47:04.75 INFO  ==> ** Starting Kafka **
[2022-03-14 18:47:06,057] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2022-03-14 18:47:06,603] INFO Setting -D jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated TLS renegotiation (org.apache.zookeeper.common.X509Util)
[2022-03-14 18:47:06,715] INFO Registered signal handlers for TERM, INT, HUP (org.apache.kafka.common.utils.LoggingSignalHandler)
[2022-03-14 18:47:06,719] INFO starting (kafka.server.KafkaServer)
[2022-03-14 18:47:06,720] INFO Connecting to zookeeper on kafka-zookeeper (kafka.server.KafkaServer)
[2022-03-14 18:47:06,742] INFO [ZooKeeperClient Kafka server] Initializing a new session to kafka-zookeeper. (kafka.zookeeper.ZooKeeperClient)
[2022-03-14 18:47:06,749] INFO Client environment:zookeeper.version=3.6.3--6401e4ad2087061bc6b9f80dec2d69f2e3c8660a, built on 04/08/2021 16:35 GMT (org.apache.zookeeper.ZooKeeper)
[2022-03-14 18:47:06,749] INFO Client environment:host.name=kafka-0.kafka-headless.kafka.svc.cluster.local (org.apache.zookeeper.ZooKeeper)
[2022-03-14 18:47:06,749] INFO Client environment:java.version=11.0.14 (org.apache.zookeeper.ZooKeeper)
[2022-03-14 18:47:06,749] INFO Client environment:java.vendor=BellSoft (org.apache.zookeeper.ZooKeeper)
[2022-03-14 18:47:06,749] INFO Client environment:java.home=/opt/bitnami/java (org.apache.zookeeper.ZooKeeper)
[2022-03-14 18:47:06,749] INFO Client environment:java.class.path=/opt/bitnami/kafka/bin/../libs/activation-1.1.1.jar:/opt/bitnami/kafka/bin/../libs/aopalliance-repackaged-2.6.1.jar:/opt/bitnami/kafka/bin/../libs/argparse4j-0.7.0.jar:/opt/bitnami/kafka/bin/../libs/audience-annotations-0.5.0.jar:/opt/bitnami/kafka/bin/../libs/commons-cli-1.4.jar:/opt/bitnami/kafka/bin/../libs/commons-lang3-3.8.1.jar:/opt/bitnami/kafka/bin/../libs/connect-api-3.1.0.jar:/opt/bitnami/kafka/bin/../libs/connect-basic-auth-extension-3.1.0.jar:/opt/bitnami/kafka/bin/../libs/connect-file-3.1.0.jar:/opt/bitnami/kafka/bin/../libs/connect-json-3.1.0.jar:/opt/bitnami/kafka/bin/../libs/connect-mirror-3.1.0.jar:/opt/bitnami/kafka/bin/../libs/connect-mirror-client-3.1.0.jar:/opt/bitnami/kafka/bin/../libs/connect-runtime-3.1.0.jar:/opt/bitnami/kafka/bin/../libs/connect-transforms-3.1.0.jar:/opt/bitnami/kafka/bin/../libs/hk2-api-2.6.1.jar:/opt/bitnami/kafka/bin/../libs/hk2-locator-2.6.1.jar:/opt/bitnami/kafka/bin/../libs/hk2-utils-2.6.1.jar:/opt/bitnami/kafka/bin/../libs/jackson-annotations-2.12.3.jar:/opt/bitnami/kafka/bin/../libs/jackson-core-2.12.3.jar:/opt/bitnami/kafka/bin/../libs/jackson-databind-2.12.3.jar:/opt/bitnami/kafka/bin/../libs/jackson-dataformat-csv-2.12.3.jar:/opt/bitnami/kafka/bin/../libs/jackson-datatype-jdk8-2.12.3.jar:/opt/bitnami/kafka/bin/../libs/jackson-jaxrs-base-2.12.3.jar:/opt/bitnami/kafka/bin/../libs/jackson-jaxrs-json-provider-2.12.3.jar:/opt/bitnami/kafka/bin/../libs/jackson-module-jaxb-annotations-2.12.3.jar:/opt/bitnami/kafka/bin/../libs/jackson-module-scala_2.12-2.12.3.jar:/opt/bitnami/kafka/bin/../libs/jakarta.activation-api-1.2.1.jar:/opt/bitnami/kafka/bin/../libs/jakarta.annotation-api-1.3.5.jar:/opt/bitnami/kafka/bin/../libs/jakarta.inject-2.6.1.jar:/opt/bitnami/kafka/bin/../libs/jakarta.validation-api-2.0.2.jar:/opt/bitnami/kafka/bin/../libs/jakarta.ws.rs-api-2.1.6.jar:/opt/bitnami/kafka/bin/../libs/jakarta.xml.bind-api-2.3.2.jar:/opt/bitnami/kafka/bin/../libs/javassist-3.27.0-GA.jar:/opt/bitnami/kafka/bin/../libs/javax.servlet-api-3.1.0.jar:/opt/bitnami/kafka/bin/../libs/javax.ws.rs-api-2.1.1.jar:/opt/bitnami/kafka/bin/../libs/jaxb-api-2.3.0.jar:/opt/bitnami/kafka/bin/../libs/jersey-client-2.34.jar:/opt/bitnami/kafka/bin/../libs/jersey-common-2.34.jar:/opt/bitnami/kafka/bin/../libs/jersey-container-servlet-2.34.jar:/opt/bitnami/kafka/bin/../libs/jersey-container-servlet-core-2.34.jar:/opt/bitnami/kafka/bin/../libs/jersey-hk2-2.34.jar:/opt/bitnami/kafka/bin/../libs/jersey-server-2.34.jar:/opt/bitnami/kafka/bin/../libs/jetty-client-9.4.43.v20210629.jar:/opt/bitnami/kafka/bin/../libs/jetty-continuation-9.4.43.v20210629.jar:/opt/bitnami/kafka/bin/../libs/jetty-http-9.4.43.v20210629.jar:/opt/bitnami/kafka/bin/../libs/jetty-io-9.4.43.v20210629.jar:/opt/bitnami/kafka/bin/../libs/jetty-security-9.4.43.v20210629.jar:/opt/bitnami/kafka/bin/../libs/jetty-server-9.4.43.v20210629.jar:/opt/bitnami/kafka/bin/../libs/jetty-servlet-9.4.43.v20210629.jar:/opt/bitnami/kafka/bin/../libs/jetty-servlets-9.4.43.v20210629.jar:/opt/bitnami/kafka/bin/../libs/jetty-util-9.4.43.v20210629.jar:/opt/bitnami/kafka/bin/../libs/jetty-util-ajax-9.4.43.v20210629.jar:/opt/bitnami/kafka/bin/../libs/jline-3.12.1.jar:/opt/bitnami/kafka/bin/../libs/jopt-simple-5.0.4.jar:/opt/bitnami/kafka/bin/../libs/jose4j-0.7.8.jar:/opt/bitnami/kafka/bin/../libs/kafka-clients-3.1.0.jar:/opt/bitnami/kafka/bin/../libs/kafka-log4j-appender-3.1.0.jar:/opt/bitnami/kafka/bin/../libs/kafka-metadata-3.1.0.jar:/opt/bitnami/kafka/bin/../libs/kafka-ra
ft-3.1.0.jar:/opt/bitnami/kafka/bin/../libs/kafka-server-common-3.1.0.jar:/opt/bitnami/kafka/bin/../libs/kafka-shell-3.1.0.jar:/opt/bitnami/kafka/bin/../libs/kafka-storage-3.1.0.jar:/opt/bitnami/kafka/bin/../libs/kafka-storage-api-3.1.0.jar:/opt/bitnami/kafka/bin/../libs/kafka-streams-3.1.0.jar:/opt/bitnami/kafka/bin/../libs/kafka-streams-examples-3.1.0.jar:/opt/bitnami/kafka/bin/../libs/kafka-streams-scala_2.12-3.1.0.jar:/opt/bitnami/kafka/bin/../libs/kafka-streams-test-utils-3.1.0.jar:/opt/bitnami/kafka/bin/../libs/kafka-tools-3.1.0.jar:/opt/bitnami/kafka/bin/../libs/kafka_2.12-3.1.0.jar:/opt/bitnami/kafka/bin/../libs/log4j-1.2.17.jar:/opt/bitnami/kafka/bin/../libs/lz4-java-1.8.0.jar:/opt/bitnami/kafka/bin/../libs/maven-artifact-3.8.1.jar:/opt/bitnami/kafka/bin/../libs/metrics-core-2.2.0.jar:/opt/bitnami/kafka/bin/../libs/metrics-core-4.1.12.1.jar:/opt/bitnami/kafka/bin/../libs/netty-buffer-4.1.68.Final.jar:/opt/bitnami/kafka/bin/../libs/netty-codec-4.1.68.Final.jar:/opt/bitnami/kafka/bin/../libs/netty-common-4.1.68.Final.jar:/opt/bitnami/kafka/bin/../libs/netty-handler-4.1.68.Final.jar:/opt/bitnami/kafka/bin/../libs/netty-resolver-4.1.68.Final.jar:/opt/bitnami/kafka/bin/../libs/netty-transport-4.1.68.Final.jar:/opt/bitnami/kafka/bin/../libs/netty-transport-native-epoll-4.1.68.Final.jar:/opt/bitnami/kafka/bin/../libs/netty-transport-native-unix-common-4.1.68.Final.jar:/opt/bitnami/kafka/bin/../libs/osgi-resource-locator-1.0.3.jar:/opt/bitnami/kafka/bin/../libs/paranamer-2.8.jar:/opt/bitnami/kafka/bin/../libs/plexus-utils-3.2.1.jar:/opt/bitnami/kafka/bin/../libs/reflections-0.9.12.jar:/opt/bitnami/kafka/bin/../libs/rocksdbjni-6.22.1.1.jar:/opt/bitnami/kafka/bin/../libs/scala-collection-compat_2.12-2.4.4.jar:/opt/bitnami/kafka/bin/../libs/scala-java8-compat_2.12-1.0.0.jar:/opt/bitnami/kafka/bin/../libs/scala-library-2.12.14.jar:/opt/bitnami/kafka/bin/../libs/scala-logging_2.12-3.9.3.jar:/opt/bitnami/kafka/bin/../libs/scala-reflect-2.12.14.jar:/opt/bitnami/kafka/bin/../libs/slf4j-api-1.7.30.jar:/opt/bitnami/kafka/bin/../libs/slf4j-log4j12-1.7.30.jar:/opt/bitnami/kafka/bin/../libs/snappy-java-1.1.8.4.jar:/opt/bitnami/kafka/bin/../libs/trogdor-3.1.0.jar:/opt/bitnami/kafka/bin/../libs/zookeeper-3.6.3.jar:/opt/bitnami/kafka/bin/../libs/zookeeper-jute-3.6.3.jar:/opt/bitnami/kafka/bin/../libs/zstd-jni-1.5.0-4.jar (org.apache.zookeeper.ZooKeeper)
[2022-03-14 18:47:06,750] INFO Client environment:java.library.path=/usr/java/packages/lib:/usr/lib64:/lib64:/lib:/usr/lib (org.apache.zookeeper.ZooKeeper)
[2022-03-14 18:47:06,750] INFO Client environment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper)
[2022-03-14 18:47:06,750] INFO Client environment:java.compiler=<NA> (org.apache.zookeeper.ZooKeeper)
[2022-03-14 18:47:06,750] INFO Client environment:os.name=Linux (org.apache.zookeeper.ZooKeeper)
[2022-03-14 18:47:06,750] INFO Client environment:os.arch=amd64 (org.apache.zookeeper.ZooKeeper)
[2022-03-14 18:47:06,751] INFO Client environment:os.version=5.10.76-linuxkit (org.apache.zookeeper.ZooKeeper)
[2022-03-14 18:47:06,751] INFO Client environment:user.name=? (org.apache.zookeeper.ZooKeeper)
[2022-03-14 18:47:06,751] INFO Client environment:user.home=? (org.apache.zookeeper.ZooKeeper)
[2022-03-14 18:47:06,751] INFO Client environment:user.dir=/ (org.apache.zookeeper.ZooKeeper)
[2022-03-14 18:47:06,751] INFO Client environment:os.memory.free=1009MB (org.apache.zookeeper.ZooKeeper)
[2022-03-14 18:47:06,751] INFO Client environment:os.memory.max=1024MB (org.apache.zookeeper.ZooKeeper)
[2022-03-14 18:47:06,751] INFO Client environment:os.memory.total=1024MB (org.apache.zookeeper.ZooKeeper)
[2022-03-14 18:47:06,754] INFO Initiating client connection, connectString=kafka-zookeeper sessionTimeout=18000 watcher=kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$@42463763 (org.apache.zookeeper.ZooKeeper)
[2022-03-14 18:47:06,759] INFO jute.maxbuffer value is 4194304 Bytes (org.apache.zookeeper.ClientCnxnSocket)
[2022-03-14 18:47:06,766] INFO zookeeper.request.timeout value is 0. feature enabled=false (org.apache.zookeeper.ClientCnxn)
[2022-03-14 18:47:06,770] INFO [ZooKeeperClient Kafka server] Waiting until connected. (kafka.zookeeper.ZooKeeperClient)
[2022-03-14 18:47:06,782] INFO Opening socket connection to server kafka-zookeeper/10.43.243.35:2181. (org.apache.zookeeper.ClientCnxn)
[2022-03-14 18:47:06,785] INFO SASL config status: Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2022-03-14 18:47:06,790] INFO Socket connection established, initiating session, client: /192.168.58.10:53272, server: kafka-zookeeper/10.43.243.35:2181 (org.apache.zookeeper.ClientCnxn)
[2022-03-14 18:47:06,811] INFO Session establishment complete on server kafka-zookeeper/10.43.243.35:2181, session id = 0x1001f5c8b050000, negotiated timeout = 18000 (org.apache.zookeeper.ClientCnxn)
[2022-03-14 18:47:06,816] INFO [ZooKeeperClient Kafka server] Connected. (kafka.zookeeper.ZooKeeperClient)
[2022-03-14 18:47:06,941] INFO [feature-zk-node-event-process-thread]: Starting (kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread)
[2022-03-14 18:47:06,956] INFO Feature ZK node at path: /feature does not exist (kafka.server.FinalizedFeatureChangeListener)
[2022-03-14 18:47:06,956] INFO Cleared cache (kafka.server.FinalizedFeatureCache)
[2022-03-14 18:47:07,142] INFO Cluster ID = 2qnYfDp5TQidUho8LioBSw (kafka.server.KafkaServer)
[2022-03-14 18:47:07,146] WARN No meta.properties file under dir /bitnami/kafka/data/meta.properties (kafka.server.BrokerMetadataCheckpoint)
[2022-03-14 18:47:07,241] INFO KafkaConfig values: 
	advertised.listeners = INTERNAL://kafka-0.kafka-headless.kafka.svc.cluster.local:9093,CLIENT://kafka-0.kafka-headless.kafka.svc.cluster.local:9092
	alter.config.policy.class.name = null
	alter.log.dirs.replication.quota.window.num = 11
	alter.log.dirs.replication.quota.window.size.seconds = 1
	authorizer.class.name = 
	auto.create.topics.enable = true
	auto.leader.rebalance.enable = true
	background.threads = 10
	broker.heartbeat.interval.ms = 2000
	broker.id = 0
	broker.id.generation.enable = true
	broker.rack = null
	broker.session.timeout.ms = 9000
	client.quota.callback.class = null
	compression.type = producer
	connection.failed.authentication.delay.ms = 100
	connections.max.idle.ms = 600000
	connections.max.reauth.ms = 0
	control.plane.listener.name = null
	controlled.shutdown.enable = true
	controlled.shutdown.max.retries = 3
	controlled.shutdown.retry.backoff.ms = 5000
	controller.listener.names = null
	controller.quorum.append.linger.ms = 25
	controller.quorum.election.backoff.max.ms = 1000
	controller.quorum.election.timeout.ms = 1000
	controller.quorum.fetch.timeout.ms = 2000
	controller.quorum.request.timeout.ms = 2000
	controller.quorum.retry.backoff.ms = 20
	controller.quorum.voters = []
	controller.quota.window.num = 11
	controller.quota.window.size.seconds = 1
	controller.socket.timeout.ms = 30000
	create.topic.policy.class.name = null
	default.replication.factor = 1
	delegation.token.expiry.check.interval.ms = 3600000
	delegation.token.expiry.time.ms = 86400000
	delegation.token.master.key = null
	delegation.token.max.lifetime.ms = 604800000
	delegation.token.secret.key = null
	delete.records.purgatory.purge.interval.requests = 1
	delete.topic.enable = true
	fetch.max.bytes = 57671680
	fetch.purgatory.purge.interval.requests = 1000
	group.initial.rebalance.delay.ms = 0
	group.max.session.timeout.ms = 1800000
	group.max.size = 2147483647
	group.min.session.timeout.ms = 6000
	initial.broker.registration.timeout.ms = 60000
	inter.broker.listener.name = INTERNAL
	inter.broker.protocol.version = 3.1-IV0
	kafka.metrics.polling.interval.secs = 10
	kafka.metrics.reporters = []
	leader.imbalance.check.interval.seconds = 300
	leader.imbalance.per.broker.percentage = 10
	listener.security.protocol.map = INTERNAL:SSL,CLIENT:SSL
	listeners = INTERNAL://:9093,CLIENT://:9092
	log.cleaner.backoff.ms = 15000
	log.cleaner.dedupe.buffer.size = 134217728
	log.cleaner.delete.retention.ms = 86400000
	log.cleaner.enable = true
	log.cleaner.io.buffer.load.factor = 0.9
	log.cleaner.io.buffer.size = 524288
	log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
	log.cleaner.max.compaction.lag.ms = 9223372036854775807
	log.cleaner.min.cleanable.ratio = 0.5
	log.cleaner.min.compaction.lag.ms = 0
	log.cleaner.threads = 1
	log.cleanup.policy = [delete]
	log.dir = /tmp/kafka-logs
	log.dirs = /bitnami/kafka/data
	log.flush.interval.messages = 10000
	log.flush.interval.ms = 1000
	log.flush.offset.checkpoint.interval.ms = 60000
	log.flush.scheduler.interval.ms = 9223372036854775807
	log.flush.start.offset.checkpoint.interval.ms = 60000
	log.index.interval.bytes = 4096
	log.index.size.max.bytes = 10485760
	log.message.downconversion.enable = true
	log.message.format.version = 3.0-IV1
	log.message.timestamp.difference.max.ms = 9223372036854775807
	log.message.timestamp.type = CreateTime
	log.preallocate = false
	log.retention.bytes = 1073741824
	log.retention.check.interval.ms = 300000
	log.retention.hours = 168
	log.retention.minutes = null
	log.retention.ms = null
	log.roll.hours = 168
	log.roll.jitter.hours = 0
	log.roll.jitter.ms = null
	log.roll.ms = null
	log.segment.bytes = 1073741824
	log.segment.delete.delay.ms = 60000
	max.connection.creation.rate = 2147483647
	max.connections = 2147483647
	max.connections.per.ip = 2147483647
	max.connections.per.ip.overrides = 
	max.incremental.fetch.session.cache.slots = 1000
	message.max.bytes = 1000012
	metadata.log.dir = null
	metadata.log.max.record.bytes.between.snapshots = 20971520
	metadata.log.segment.bytes = 1073741824
	metadata.log.segment.min.bytes = 8388608
	metadata.log.segment.ms = 604800000
	metadata.max.retention.bytes = -1
	metadata.max.retention.ms = 604800000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	min.insync.replicas = 1
	node.id = 0
	num.io.threads = 8
	num.network.threads = 3
	num.partitions = 1
	num.recovery.threads.per.data.dir = 1
	num.replica.alter.log.dirs.threads = null
	num.replica.fetchers = 1
	offset.metadata.max.bytes = 4096
	offsets.commit.required.acks = -1
	offsets.commit.timeout.ms = 5000
	offsets.load.buffer.size = 5242880
	offsets.retention.check.interval.ms = 600000
	offsets.retention.minutes = 10080
	offsets.topic.compression.codec = 0
	offsets.topic.num.partitions = 50
	offsets.topic.replication.factor = 1
	offsets.topic.segment.bytes = 104857600
	password.encoder.cipher.algorithm = AES/CBC/PKCS5Padding
	password.encoder.iterations = 4096
	password.encoder.key.length = 128
	password.encoder.keyfactory.algorithm = null
	password.encoder.old.secret = null
	password.encoder.secret = null
	principal.builder.class = class org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
	process.roles = []
	producer.purgatory.purge.interval.requests = 1000
	queued.max.request.bytes = -1
	queued.max.requests = 500
	quota.window.num = 11
	quota.window.size.seconds = 1
	remote.log.index.file.cache.total.size.bytes = 1073741824
	remote.log.manager.task.interval.ms = 30000
	remote.log.manager.task.retry.backoff.max.ms = 30000
	remote.log.manager.task.retry.backoff.ms = 500
	remote.log.manager.task.retry.jitter = 0.2
	remote.log.manager.thread.pool.size = 10
	remote.log.metadata.manager.class.name = null
	remote.log.metadata.manager.class.path = null
	remote.log.metadata.manager.impl.prefix = null
	remote.log.metadata.manager.listener.name = null
	remote.log.reader.max.pending.tasks = 100
	remote.log.reader.threads = 10
	remote.log.storage.manager.class.name = null
	remote.log.storage.manager.class.path = null
	remote.log.storage.manager.impl.prefix = null
	remote.log.storage.system.enable = false
	replica.fetch.backoff.ms = 1000
	replica.fetch.max.bytes = 1048576
	replica.fetch.min.bytes = 1
	replica.fetch.response.max.bytes = 10485760
	replica.fetch.wait.max.ms = 500
	replica.high.watermark.checkpoint.interval.ms = 5000
	replica.lag.time.max.ms = 30000
	replica.selector.class = null
	replica.socket.receive.buffer.bytes = 65536
	replica.socket.timeout.ms = 30000
	replication.quota.window.num = 11
	replication.quota.window.size.seconds = 1
	request.timeout.ms = 30000
	reserved.broker.max.id = 1000
	sasl.client.callback.handler.class = null
	sasl.enabled.mechanisms = [PLAIN, SCRAM-SHA-256, SCRAM-SHA-512]
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.principal.to.local.rules = [DEFAULT]
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.connect.timeout.ms = null
	sasl.login.read.timeout.ms = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.login.retry.backoff.max.ms = 10000
	sasl.login.retry.backoff.ms = 100
	sasl.mechanism.controller.protocol = GSSAPI
	sasl.mechanism.inter.broker.protocol = 
	sasl.oauthbearer.clock.skew.seconds = 30
	sasl.oauthbearer.expected.audience = null
	sasl.oauthbearer.expected.issuer = null
	sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
	sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
	sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
	sasl.oauthbearer.jwks.endpoint.url = null
	sasl.oauthbearer.scope.claim.name = scope
	sasl.oauthbearer.sub.claim.name = sub
	sasl.oauthbearer.token.endpoint.url = null
	sasl.server.callback.handler.class = null
	security.inter.broker.protocol = PLAINTEXT
	security.providers = null
	socket.connection.setup.timeout.max.ms = 30000
	socket.connection.setup.timeout.ms = 10000
	socket.receive.buffer.bytes = 102400
	socket.request.max.bytes = 104857600
	socket.send.buffer.bytes = 102400
	ssl.cipher.suites = []
	ssl.client.auth = none
	ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
	ssl.endpoint.identification.algorithm = https
	ssl.engine.factory.class = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.certificate.chain = [hidden]
	ssl.keystore.key = [hidden]
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = PEM
	ssl.principal.mapping.rules = DEFAULT
	ssl.protocol = TLSv1.3
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.certificates = [hidden]
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = PEM
	transaction.abort.timed.out.transaction.cleanup.interval.ms = 10000
	transaction.max.timeout.ms = 900000
	transaction.remove.expired.transaction.cleanup.interval.ms = 3600000
	transaction.state.log.load.buffer.size = 5242880
	transaction.state.log.min.isr = 1
	transaction.state.log.num.partitions = 50
	transaction.state.log.replication.factor = 1
	transaction.state.log.segment.bytes = 104857600
	transactional.id.expiration.ms = 604800000
	unclean.leader.election.enable = false
	zookeeper.clientCnxnSocket = null
	zookeeper.connect = kafka-zookeeper
	zookeeper.connection.timeout.ms = 6000
	zookeeper.max.in.flight.requests = 10
	zookeeper.session.timeout.ms = 18000
	zookeeper.set.acl = false
	zookeeper.ssl.cipher.suites = null
	zookeeper.ssl.client.enable = false
	zookeeper.ssl.crl.enable = false
	zookeeper.ssl.enabled.protocols = null
	zookeeper.ssl.endpoint.identification.algorithm = HTTPS
	zookeeper.ssl.keystore.location = null
	zookeeper.ssl.keystore.password = null
	zookeeper.ssl.keystore.type = null
	zookeeper.ssl.ocsp.enable = false
	zookeeper.ssl.protocol = TLSv1.2
	zookeeper.ssl.truststore.location = null
	zookeeper.ssl.truststore.password = null
	zookeeper.ssl.truststore.type = null
	zookeeper.sync.time.ms = 2000
 (kafka.server.KafkaConfig)

@rhansen2
Collaborator

Thanks @curtisr7. Unfortunately, I don't see anything interesting in the differences between your set-up and the testing set-up I'm using from https://github.com/vdesabou/kafka-docker-playground (environment/sasl-ssl).

Are you able to reproduce the issue using that docker environment in the kafka-docker-playground repo?

Are you able to determine if the Kafka broker is closing the connections?

Additionally, your full code snippet which reproduces the issue may be helpful.

Thanks!

@rhansen2
Collaborator

It may also be helpful to know whether Writes that are attempted multiple times via the Writer eventually succeed.

@curtisr7

Are you able to reproduce the issue using that docker environment in the kafka-docker-playground repo?

No, I didn't get the time this afternoon to look into that test framework.

Are you able to determine if the Kafka broker is closing the connections?

In my kafka pod log I'm seeing:

[2022-03-14 22:20:37,330] INFO [SocketServer listenerType=ZK_BROKER, nodeId=0] Failed authentication with /192.168.58.40 (SSL handshake failed) (org.apache.kafka.common.network.Selector)

It may also be helpful to know if Writes are attempted multiple times via the Writer if they eventually succeed.

No, that doesn't seem to matter. To test, I tried writing using the same writer 10x with a 1-second sleep, with the same result.

Also, as I mentioned before, I'm not using SASL, so my scenario is slightly different. I'm setting dialer.TLS.RootCAs (with a self-signed cert) and dialer.TLS.InsecureSkipVerify=true (not sure if that is important or not).

@rhansen2
Collaborator

rhansen2 commented Mar 14, 2022

Thanks! The SSL handshake failure is interesting. One of the differences between the dialer type and the writer is that the conn manually performs the handshake, while the Writer relies on the tls library performing the handshake on its first write. Looking at a packet capture of the two connection methods might help diagnose what's going on.
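To make that difference concrete, a standalone sketch using only the standard library; this is not kafka-go's internal code, and the broker address and server name are placeholders:

package main

import (
	"context"
	"crypto/tls"
	"log"
	"net"
	"time"
)

func main() {
	raw, err := net.DialTimeout("tcp", "broker:9093", 10*time.Second) // placeholder address
	if err != nil {
		log.Fatal(err)
	}
	defer raw.Close()

	tconn := tls.Client(raw, &tls.Config{ServerName: "broker"}) // placeholder server name

	// Eager handshake, as Conn/Dialer effectively does: a TLS problem
	// surfaces here, before any Kafka bytes are exchanged.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	if err := tconn.HandshakeContext(ctx); err != nil {
		log.Fatal("handshake failed: ", err)
	}
	log.Println("handshake ok")

	// Skipping HandshakeContext would instead run the handshake lazily
	// inside the first tconn.Write, so a TLS failure shows up later as a
	// read/write error on the Kafka request itself.
}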

@curtisr7

Looking at a packet capture of the two connection methods might help diagnose what's going on.

Here are two tcpdumps... though I'm not really sure how to interpret them. Hopefully they make some sense to you.

Let me know what else I can collect. Thanks

tcpdump-success.log
tcpdump.log

@rhansen2
Collaborator

Thanks @curtisr7.
The only difference I can find in the captures you sent is that the success seems to be using 192-168-58-10.kafka.kafka.svc.cluster.local.9092 vs kafka-0.kafka-headless.kafka.svc.cluster.local.9092 for the failure, but based on the DNS resolution in the failure log those should point to the same IP. Since there's no log of the DNS resolution in the success capture, I don't know if that's related. Without the TLS details in the capture it's hard to tell if there are any TLS issues going on.

Are you able to provide the code you're using to create your writer vs dialer?

@curtisr7

Are you able to provide the code you're using to create your writer vs dialer?

In responding to this question I think I found the problem.

In both cases I was creating a dialer in the same manner. When creating the writer I was using:

w = &kafka.Writer{
	Addr: kafka.TCP(d.brokers...),
	Transport: &kafka.Transport{
		Dial: dialer.DialFunc,
	},
	Topic: event.Topic,
}

but I found that if I use the following (note TLS: dialer.TLS), my writes are successful:

w = &kafka.Writer{
	Addr: kafka.TCP(d.brokers...),
	Transport: &kafka.Transport{
		Dial: dialer.DialFunc,
		TLS:  dialer.TLS,
	},
	Topic: event.Topic,
}

@curtisr7

On closer inspection, I need to create the transport something like this:

	transport := &kafka.Transport{
		Dial:    dialer.DialFunc,
		TLS:     dialer.TLS,
		SASL:    dialer.SASLMechanism,
		Context: ctx, // seems weird
	}

@rhansen2
Collaborator

I don't think you should need to set the Context or Dial on the Transport for things to work.
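
Putting those findings together, a minimal sketch of the Transport-based Writer that mirrors the Dialer's security settings; broker address, topic, and credentials are placeholders:

w := &kafka.Writer{
	Addr:  kafka.TCP("kafka:9093"), // placeholder broker address
	Topic: "topic",                 // placeholder topic
	Transport: &kafka.Transport{
		// Carry over the same security settings the Dialer used;
		// per the comment above, Dial and Context can stay unset.
		TLS: &tls.Config{InsecureSkipVerify: true}, // or RootCAs, as appropriate
		SASL: plain.Mechanism{
			Username: "user",
			Password: "XXXXX",
		},
	},
}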

@curtisr7

I don't think you should need to set the Context or Dial on the Transport for things to work.

+1

@rhansen2
Collaborator

@lxxwilliam @tong3jie
I believe I may have found something related to this issue. Would you be able to try out https://github.com/rhansen2/kafka-go/tree/transport-saslv0 and let me know if it helps?

Thanks!

@Jrmy2402

It's working for me with your version! Thanks a lot :)

@rhansen2
Collaborator

rhansen2 commented Apr 1, 2022

Fixed via #869

@rhansen2 rhansen2 closed this as completed Apr 1, 2022
@lxxwilliam

@lxxwilliam @tong3jie I believe I may have found something related to this issue. Would you be able to try out https://github.com/rhansen2/kafka-go/tree/transport-saslv0 and let me know if it helps?

Thanks!

I hit another problem when I used v0.4.31:
the writer always returns the error "read tcp xxx.xxx.xxx.xxx:xxxx->xxx.xxx.xxx.xxx:xxxx: i/o timeout",
but when I used an old version like v0.3.7, the writer worked.
Please check it, thanks a lot :)

@rhansen2
Collaborator

rhansen2 commented May 6, 2022

@lxxwilliam Please open a new issue and provide a runnable reproduction case.

Thanks!
