@@ -3,6 +3,7 @@ package kafka
3
3
import (
4
4
"context"
5
5
"crypto/tls"
6
+ "errors"
6
7
"fmt"
7
8
"io"
8
9
"net"
@@ -276,23 +277,23 @@ func (d *Dialer) connect(ctx context.Context, network, address string, connCfg C
276
277
277
278
c , err := d .dialContext (ctx , network , address )
278
279
if err != nil {
279
- return nil , err
280
+ return nil , fmt . Errorf ( "failed to dial: %w" , err )
280
281
}
281
282
282
283
conn := NewConnWith (c , connCfg )
283
284
284
285
if d .SASLMechanism != nil {
285
286
host , port , err := splitHostPortNumber (address )
286
287
if err != nil {
287
- return nil , err
288
+ return nil , fmt . Errorf ( "could not determine host/port for SASL authentication: %w" , err )
288
289
}
289
290
metadata := & sasl.Metadata {
290
291
Host : host ,
291
292
Port : port ,
292
293
}
293
294
if err := d .authenticateSASL (sasl .WithMetadata (ctx , metadata ), conn ); err != nil {
294
295
_ = conn .Close ()
295
- return nil , err
296
+ return nil , fmt . Errorf ( "could not successfully authenticate to %s:%d with SASL: %w" , host , port , err )
296
297
}
297
298
}
298
299
@@ -307,19 +308,19 @@ func (d *Dialer) connect(ctx context.Context, network, address string, connCfg C
307
308
// responsibility of the caller.
308
309
func (d * Dialer ) authenticateSASL (ctx context.Context , conn * Conn ) error {
309
310
if err := conn .saslHandshake (d .SASLMechanism .Name ()); err != nil {
310
- return err
311
+ return fmt . Errorf ( "SASL handshake failed: %w" , err )
311
312
}
312
313
313
314
sess , state , err := d .SASLMechanism .Start (ctx )
314
315
if err != nil {
315
- return err
316
+ return fmt . Errorf ( "SASL authentication process could not be started: %w" , err )
316
317
}
317
318
318
319
for completed := false ; ! completed ; {
319
320
challenge , err := conn .saslAuthenticate (state )
320
- switch err {
321
- case nil :
322
- case io .EOF :
321
+ switch {
322
+ case err == nil :
323
+ case errors . Is ( err , io .EOF ) :
323
324
// the broker may communicate a failed exchange by closing the
324
325
// connection (esp. in the case where we're passing opaque sasl
325
326
// data over the wire since there's no protocol info).
@@ -330,7 +331,7 @@ func (d *Dialer) authenticateSASL(ctx context.Context, conn *Conn) error {
330
331
331
332
completed , state , err = sess .Next (ctx , challenge )
332
333
if err != nil {
333
- return err
334
+ return fmt . Errorf ( "SASL authentication process has failed: %w" , err )
334
335
}
335
336
}
336
337
@@ -340,7 +341,7 @@ func (d *Dialer) authenticateSASL(ctx context.Context, conn *Conn) error {
340
341
func (d * Dialer ) dialContext (ctx context.Context , network string , addr string ) (net.Conn , error ) {
341
342
address , err := lookupHost (ctx , addr , d .Resolver )
342
343
if err != nil {
343
- return nil , err
344
+ return nil , fmt . Errorf ( "failed to resolve host: %w" , err )
344
345
}
345
346
346
347
dial := d .DialFunc
@@ -355,7 +356,7 @@ func (d *Dialer) dialContext(ctx context.Context, network string, addr string) (
355
356
356
357
conn , err := dial (ctx , network , address )
357
358
if err != nil {
358
- return nil , err
359
+ return nil , fmt . Errorf ( "failed to open connection to %s: %w" , address , err )
359
360
}
360
361
361
362
if d .TLS != nil {
@@ -469,7 +470,7 @@ func lookupHost(ctx context.Context, address string, resolver Resolver) (string,
469
470
if resolver != nil {
470
471
resolved , err := resolver .LookupHost (ctx , host )
471
472
if err != nil {
472
- return "" , err
473
+ return "" , fmt . Errorf ( "failed to resolve host %s: %w" , host , err )
473
474
}
474
475
475
476
// if the resolver doesn't return anything, we'll fall back on the provided
0 commit comments