Skip to content

Commit 364a1ca

Browse files
committed
refactor: decorate more errors
1 parent 9d1beeb commit 364a1ca

9 files changed

+46
-41
lines changed

addoffsetstotxn.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ func (c *Client) AddOffsetsToTxn(
5353
GroupID: req.GroupID,
5454
})
5555
if err != nil {
56-
return nil, fmt.Errorf("kafka.(*Client).AddOffsetsToTxn: %w", err)
56+
return nil, fmt.Errorf("failed to add offsets to transaction: %w", err)
5757
}
5858

5959
r := m.(*addoffsetstotxn.Response)

addpartitionstotxn.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ func (c *Client) AddPartitionsToTxn(
8383

8484
m, err := c.roundTrip(ctx, req.Addr, protoReq)
8585
if err != nil {
86-
return nil, fmt.Errorf("kafka.(*Client).AddPartitionsToTxn: %w", err)
86+
return nil, fmt.Errorf("failed to add partitions to transaction: %w", err)
8787
}
8888

8989
r := m.(*addpartitionstotxn.Response)

alterconfigs.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ func (c *Client) AlterConfigs(ctx context.Context, req *AlterConfigsRequest) (*A
8686
})
8787

8888
if err != nil {
89-
return nil, fmt.Errorf("kafka.(*Client).AlterConfigs: %w", err)
89+
return nil, fmt.Errorf("failed to alter configs: %w", err)
9090
}
9191

9292
res := m.(*alterconfigs.Response)

apiversions.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package kafka
22

33
import (
44
"context"
5+
"fmt"
56
"net"
67

78
"github.com/segmentio/kafka-go/protocol"
@@ -49,7 +50,7 @@ func (c *Client) ApiVersions(
4950
apiReq,
5051
)
5152
if err != nil {
52-
return nil, err
53+
return nil, fmt.Errorf("failed to determine supported API versions from broker: %w", err)
5354
}
5455
apiResp := protoResp.(*apiversions.Response)
5556

balancer.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package kafka
22

33
import (
4+
"fmt"
45
"hash"
56
"hash/crc32"
67
"hash/fnv"
@@ -153,7 +154,7 @@ func (h *Hash) Balance(msg Message, partitions ...int) int {
153154

154155
hasher.Reset()
155156
if _, err := hasher.Write(msg.Key); err != nil {
156-
panic(err)
157+
panic(fmt.Errorf("HashBalancer failed to write to hasher: %w", err))
157158
}
158159

159160
// uses same algorithm that Sarama's hashPartitioner uses

batch.go

+7-5
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package kafka
33
import (
44
"bufio"
55
"errors"
6+
"fmt"
67
"io"
78
"sync"
89
"time"
@@ -70,7 +71,7 @@ func (batch *Batch) Close() error {
7071
batch.mutex.Lock()
7172
err := batch.close()
7273
batch.mutex.Unlock()
73-
return err
74+
return fmt.Errorf("failed to close batch: %w", err)
7475
}
7576

7677
func (batch *Batch) close() (err error) {
@@ -94,7 +95,8 @@ func (batch *Batch) close() (err error) {
9495
conn.mutex.Unlock()
9596

9697
if err != nil {
97-
if _, ok := err.(Error); !ok && err != io.ErrShortBuffer {
98+
var kafkaError Error
99+
if !errors.As(err, &kafkaError) && !errors.Is(err, io.ErrShortBuffer) {
98100
conn.Close()
99101
}
100102
}
@@ -239,11 +241,11 @@ func (batch *Batch) readMessage(
239241

240242
var lastOffset int64
241243
offset, lastOffset, timestamp, headers, err = batch.msgs.readMessage(batch.offset, key, val)
242-
switch err {
243-
case nil:
244+
switch {
245+
case err == nil:
244246
batch.offset = offset + 1
245247
batch.lastOffset = lastOffset
246-
case errShortRead:
248+
case errors.Is(err, errShortRead):
247249
// As an "optimization" kafka truncates the returned response after
248250
// producing MaxBytes, which could then cause the code to return
249251
// errShortRead.

client.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package kafka
33
import (
44
"context"
55
"errors"
6+
"fmt"
67
"net"
78
"time"
89

@@ -67,7 +68,7 @@ func (c *Client) ConsumerOffsets(ctx context.Context, tg TopicAndGroup) (map[int
6768
})
6869

6970
if err != nil {
70-
return nil, err
71+
return nil, fmt.Errorf("failed to get topic metadata :%w", err)
7172
}
7273

7374
topic := metadata.Topics[0]
@@ -85,7 +86,7 @@ func (c *Client) ConsumerOffsets(ctx context.Context, tg TopicAndGroup) (map[int
8586
})
8687

8788
if err != nil {
88-
return nil, err
89+
return nil, fmt.Errorf("failed to get offsets: %w", err)
8990
}
9091

9192
topicOffsets := offsets.Topics[topic.Name]

conn.go

+28-28
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ func NewConnWith(conn net.Conn, config ConnConfig) *Conn {
206206
func (c *Conn) negotiateVersion(key apiKey, sortedSupportedVersions ...apiVersion) (apiVersion, error) {
207207
v, err := c.loadVersions()
208208
if err != nil {
209-
return -1, err
209+
return -1, fmt.Errorf("failed to determine available versions: %w", err)
210210
}
211211
a := v.negotiate(key, sortedSupportedVersions...)
212212
if a < 0 {
@@ -331,10 +331,10 @@ func (c *Conn) findCoordinator(request findCoordinatorRequestV0) (findCoordinato
331331
},
332332
)
333333
if err != nil {
334-
return findCoordinatorResponseV0{}, err
334+
return findCoordinatorResponseV0{}, fmt.Errorf("failed to find coordinator: %w", err)
335335
}
336336
if response.ErrorCode != 0 {
337-
return findCoordinatorResponseV0{}, Error(response.ErrorCode)
337+
return findCoordinatorResponseV0{}, makeError(response.ErrorCode, "failed to find coordinator")
338338
}
339339

340340
return response, nil
@@ -357,10 +357,10 @@ func (c *Conn) heartbeat(request heartbeatRequestV0) (heartbeatResponseV0, error
357357
},
358358
)
359359
if err != nil {
360-
return heartbeatResponseV0{}, err
360+
return heartbeatResponseV0{}, fmt.Errorf("failed to send heartbeat: %w", err)
361361
}
362362
if response.ErrorCode != 0 {
363-
return heartbeatResponseV0{}, Error(response.ErrorCode)
363+
return heartbeatResponseV0{}, makeError(response.ErrorCode, "failed to send heartbeat")
364364
}
365365

366366
return response, nil
@@ -383,10 +383,10 @@ func (c *Conn) joinGroup(request joinGroupRequestV1) (joinGroupResponseV1, error
383383
},
384384
)
385385
if err != nil {
386-
return joinGroupResponseV1{}, err
386+
return joinGroupResponseV1{}, fmt.Errorf("failed to join consumer group: %w", err)
387387
}
388388
if response.ErrorCode != 0 {
389-
return joinGroupResponseV1{}, Error(response.ErrorCode)
389+
return joinGroupResponseV1{}, makeError(response.ErrorCode, "failed to join consumer group")
390390
}
391391

392392
return response, nil
@@ -409,10 +409,10 @@ func (c *Conn) leaveGroup(request leaveGroupRequestV0) (leaveGroupResponseV0, er
409409
},
410410
)
411411
if err != nil {
412-
return leaveGroupResponseV0{}, err
412+
return leaveGroupResponseV0{}, fmt.Errorf("failed to leave consumer group: %w", err)
413413
}
414414
if response.ErrorCode != 0 {
415-
return leaveGroupResponseV0{}, Error(response.ErrorCode)
415+
return leaveGroupResponseV0{}, makeError(response.ErrorCode, "failed to leave consumer group")
416416
}
417417

418418
return response, nil
@@ -435,10 +435,10 @@ func (c *Conn) listGroups(request listGroupsRequestV1) (listGroupsResponseV1, er
435435
},
436436
)
437437
if err != nil {
438-
return listGroupsResponseV1{}, err
438+
return listGroupsResponseV1{}, fmt.Errorf("failed to list consumer groups: %w", err)
439439
}
440440
if response.ErrorCode != 0 {
441-
return listGroupsResponseV1{}, Error(response.ErrorCode)
441+
return listGroupsResponseV1{}, makeError(response.ErrorCode, "failed to list consumer groups")
442442
}
443443

444444
return response, nil
@@ -461,12 +461,12 @@ func (c *Conn) offsetCommit(request offsetCommitRequestV2) (offsetCommitResponse
461461
},
462462
)
463463
if err != nil {
464-
return offsetCommitResponseV2{}, err
464+
return offsetCommitResponseV2{}, fmt.Errorf("failed to commit offsets: %w", err)
465465
}
466466
for _, r := range response.Responses {
467467
for _, pr := range r.PartitionResponses {
468468
if pr.ErrorCode != 0 {
469-
return offsetCommitResponseV2{}, Error(pr.ErrorCode)
469+
return offsetCommitResponseV2{}, makeError(pr.ErrorCode, "failed to commit offsets")
470470
}
471471
}
472472
}
@@ -492,12 +492,12 @@ func (c *Conn) offsetFetch(request offsetFetchRequestV1) (offsetFetchResponseV1,
492492
},
493493
)
494494
if err != nil {
495-
return offsetFetchResponseV1{}, err
495+
return offsetFetchResponseV1{}, fmt.Errorf("failed to fetch offsets: %w", err)
496496
}
497497
for _, r := range response.Responses {
498498
for _, pr := range r.PartitionResponses {
499499
if pr.ErrorCode != 0 {
500-
return offsetFetchResponseV1{}, Error(pr.ErrorCode)
500+
return offsetFetchResponseV1{}, makeError(pr.ErrorCode, "failed to fetch offsets")
501501
}
502502
}
503503
}
@@ -522,10 +522,10 @@ func (c *Conn) syncGroup(request syncGroupRequestV0) (syncGroupResponseV0, error
522522
},
523523
)
524524
if err != nil {
525-
return syncGroupResponseV0{}, err
525+
return syncGroupResponseV0{}, fmt.Errorf("failed to finish joining consumer group: %w", err)
526526
}
527527
if response.ErrorCode != 0 {
528-
return syncGroupResponseV0{}, Error(response.ErrorCode)
528+
return syncGroupResponseV0{}, makeError(response.ErrorCode, "failed to finish joining consumer group")
529529
}
530530

531531
return response, nil
@@ -666,7 +666,7 @@ func (c *Conn) Seek(offset int64, whence int) (int64, error) {
666666

667667
first, last, err := c.ReadOffsets()
668668
if err != nil {
669-
return 0, err
669+
return 0, fmt.Errorf("failed to read offsets: %w", err)
670670
}
671671

672672
switch whence {
@@ -757,20 +757,20 @@ func (c *Conn) ReadBatchWith(cfg ReadBatchConfig) *Batch {
757757
var maxFetch = int(c.fetchMaxBytes)
758758

759759
if cfg.MinBytes < 0 || cfg.MinBytes > maxFetch {
760-
return &Batch{err: fmt.Errorf("kafka.(*Conn).ReadBatch: minBytes of %d out of [1,%d] bounds", cfg.MinBytes, maxFetch)}
760+
return &Batch{err: fmt.Errorf("minBytes of %d out of [1,%d] bounds", cfg.MinBytes, maxFetch)}
761761
}
762762
if cfg.MaxBytes < 0 || cfg.MaxBytes > maxFetch {
763-
return &Batch{err: fmt.Errorf("kafka.(*Conn).ReadBatch: maxBytes of %d out of [1,%d] bounds", cfg.MaxBytes, maxFetch)}
763+
return &Batch{err: fmt.Errorf("maxBytes of %d out of [1,%d] bounds", cfg.MaxBytes, maxFetch)}
764764
}
765765
if cfg.MinBytes > cfg.MaxBytes {
766-
return &Batch{err: fmt.Errorf("kafka.(*Conn).ReadBatch: minBytes (%d) > maxBytes (%d)", cfg.MinBytes, cfg.MaxBytes)}
766+
return &Batch{err: fmt.Errorf("minBytes (%d) > maxBytes (%d)", cfg.MinBytes, cfg.MaxBytes)}
767767
}
768768

769769
offset, whence := c.Offset()
770770

771771
offset, err := c.Seek(offset, whence|SeekDontCheck)
772772
if err != nil {
773-
return &Batch{err: dontExpectEOF(err)}
773+
return &Batch{err: fmt.Errorf("failed to seek to offset %d (whence %d): %w", offset, whence, dontExpectEOF(err))}
774774
}
775775

776776
fetchVersion, err := c.negotiateVersion(fetch, v2, v5, v10)
@@ -833,12 +833,12 @@ func (c *Conn) ReadBatchWith(cfg ReadBatchConfig) *Batch {
833833
}
834834
})
835835
if err != nil {
836-
return &Batch{err: dontExpectEOF(err)}
836+
return &Batch{err: fmt.Errorf("failed to write fetch request (version %d): %w", fetchVersion, dontExpectEOF(err))}
837837
}
838838

839839
_, size, lock, err := c.waitResponse(&c.rdeadline, id)
840840
if err != nil {
841-
return &Batch{err: dontExpectEOF(err)}
841+
return &Batch{err: fmt.Errorf("request %d failed to give response: %w", id, dontExpectEOF(err))}
842842
}
843843

844844
var throttle int32
@@ -854,7 +854,7 @@ func (c *Conn) ReadBatchWith(cfg ReadBatchConfig) *Batch {
854854
throttle, highWaterMark, remain, err = readFetchResponseHeaderV2(&c.rbuf, size)
855855
}
856856
if errors.Is(err, errShortRead) {
857-
err = checkTimeoutErr(adjustedDeadline)
857+
err = fmt.Errorf("failed to fetch response header (version %d): %w", fetchVersion, checkTimeoutErr(adjustedDeadline))
858858
}
859859

860860
var msgs *messageSetReader
@@ -863,11 +863,11 @@ func (c *Conn) ReadBatchWith(cfg ReadBatchConfig) *Batch {
863863
msgs = &messageSetReader{empty: true}
864864
} else {
865865
msgs, err = newMessageSetReader(&c.rbuf, remain)
866+
if errors.Is(err, errShortRead) {
867+
err = fmt.Errorf("failed to initialize a reader for set of %d messages: %w", remain, checkTimeoutErr(adjustedDeadline))
868+
}
866869
}
867870
}
868-
if errors.Is(err, errShortRead) {
869-
err = checkTimeoutErr(adjustedDeadline)
870-
}
871871
return &Batch{
872872
conn: c,
873873
msgs: msgs,

error.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -524,7 +524,7 @@ func silentEOF(err error) error {
524524

525525
func dontExpectEOF(err error) error {
526526
if errors.Is(err, io.EOF) {
527-
err = io.ErrUnexpectedEOF
527+
return io.ErrUnexpectedEOF
528528
}
529529
return err
530530
}

0 commit comments

Comments
 (0)