
Invalid order in SASL authentication call resulting wrong message which trigger connection reset from the server #542


Closed
yulrizka opened this issue Oct 16, 2020 · 1 comment
Assignees
Labels
bug hacktoberfest-accepted https://hacktoberfest.digitalocean.com/

Comments

@yulrizka
Contributor

Describe the bug
Using the Writer struct with SASL authentication results in the wrong message being sent, which makes the
server terminate the connection.

Kafka Version
Azure EventHub

To Reproduce
Create a writer with SASL and TLS by populating the Writer struct directly:

	mechanism := plain.Mechanism{
		Username: "xxx",
		Password: "xxx",
	}


	transport := &kafka.Transport{
		Dial: (&net.Dialer{
			Timeout:   10 * time.Second,
			DualStack: true,
		}).DialContext,
		SASL: mechanism,
		TLS: &tls.Config{
			InsecureSkipVerify: true,
		},
	}
	w := kafka.Writer{
		Addr:        kafka.TCP("someurl.windows.net:9093"),
		Topic:       ActivityLogTopic,
		Balancer:    &kafka.LeastBytes{},
		ErrorLogger: kafka.LoggerFunc(log.Errorf),
		Logger:      kafka.LoggerFunc(log.Infof),
		Async:       false,
		Transport:   transport,
	}

Expected behavior
The writer should authenticate and send messages without any error.

Additional context
I did some debugging and found that the SASL handshake was performed before the list of API versions supported by the broker had been fetched.

I made a PR https://github.com/segmentio/kafka-go/pull/541/files

More specifically, the order of the two steps in this code should be reversed, so that API versions are negotiated before SASL authentication (this also aligns with the protocol):

kafka-go/transport.go

Lines 1138 to 1158 in cb3dd79

	if g.pool.sasl != nil {
		if err := authenticateSASL(ctx, pc, g.pool.sasl); err != nil {
			return nil, err
		}
	}
	r, err := pc.RoundTrip(new(apiversions.Request))
	if err != nil {
		return nil, err
	}
	res := r.(*apiversions.Response)
	ver := make(map[protocol.ApiKey]int16, len(res.ApiKeys))
	if res.ErrorCode != 0 {
		return nil, fmt.Errorf("negotating API versions with kafka broker at %s: %w", g.addr, Error(res.ErrorCode))
	}
	for _, r := range res.ApiKeys {
		apiKey := protocol.ApiKey(r.ApiKey)
		ver[apiKey] = apiKey.SelectVersion(r.MinVersion, r.MaxVersion)
	}
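
To illustrate why the order of these two steps matters, here is a minimal toy model, not the kafka-go implementation. The `fakeConn` type, its methods, and `handshakeVersion` are hypothetical names introduced only for this sketch, and the assumption that the broker supports version 1 of the SASL handshake is taken from this report. The API key numbers 17 (SaslHandshake) and 18 (ApiVersions) are the ones defined by the Kafka protocol.

```go
package main

import "fmt"

// apiKey identifies a Kafka request type (illustrative stand-in type).
type apiKey int16

const (
	saslHandshake apiKey = 17
	apiVersions   apiKey = 18
)

// fakeConn is a hypothetical stand-in for a broker connection. It records
// which API version was used for each request it "sends".
type fakeConn struct {
	versions map[apiKey]int16 // nil until API versions are negotiated
	sent     map[apiKey]int16 // version actually used per request
}

// roundTrip picks the request version from the negotiated table.
// A nil map or a missing key yields the zero value, 0.
func (c *fakeConn) roundTrip(k apiKey) {
	if c.sent == nil {
		c.sent = make(map[apiKey]int16)
	}
	c.sent[k] = c.versions[k]
}

// negotiate sends an ApiVersions request and stores the result.
// Assumption: the broker supports version 1 of the SASL handshake.
func (c *fakeConn) negotiate() {
	c.roundTrip(apiVersions)
	c.versions = map[apiKey]int16{saslHandshake: 1, apiVersions: 0}
}

// authenticate sends the SASL handshake request.
func (c *fakeConn) authenticate() {
	c.roundTrip(saslHandshake)
}

// handshakeVersion returns the SASL handshake version that ends up on the
// wire for the given ordering of the two steps.
func handshakeVersion(negotiateFirst bool) int16 {
	c := &fakeConn{}
	if negotiateFirst {
		c.negotiate()
		c.authenticate()
	} else {
		c.authenticate()
		c.negotiate()
	}
	return c.sent[saslHandshake]
}

func main() {
	fmt.Println("authenticate first:", handshakeVersion(false))
	fmt.Println("negotiate first:", handshakeVersion(true))
}
```

In this model the handshake goes out as version 0 when authentication runs first, and as version 1 once negotiation runs first.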

Since the order was reversed, the handshake was performed with an empty c.versions, which resulted in API version 0 being used instead of 1:

	func (c *Conn) RoundTrip(msg Message) (Message, error) {
		correlationID := atomic.AddInt32(&c.idgen, +1)
		versions, _ := c.versions.Load().(map[ApiKey]int16)
		apiVersion := versions[msg.ApiKey()]
		if p, _ := msg.(PreparedMessage); p != nil {
			p.Prepare(apiVersion)
		}
		return RoundTrip(c, apiVersion, correlationID, c.clientID, msg)
	}

@achille-roussel
Contributor

Thanks for the detailed bug report @yulrizka!

I merged your change and tagged a release for v0.4.6 containing it 👍

@achille-roussel achille-roussel added the hacktoberfest-accepted https://hacktoberfest.digitalocean.com/ label Oct 16, 2020