Skip to content

Commit 6bd4c78

Browse files
author
Achille Roussel
committed
Merge remote-tracking branch 'origin/master' into 0.4
2 parents 8766085 + ff55ecc commit 6bd4c78

File tree

6 files changed

+100
-13
lines changed

6 files changed

+100
-13
lines changed

conn.go

+25-4
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,14 @@ type ReadBatchConfig struct {
104104
// ReadUncommitted makes all records visible. With ReadCommitted only
105105
// non-transactional and committed records are visible.
106106
IsolationLevel IsolationLevel
107+
108+
// MaxWait is the amount of time for the broker while waiting to hit the
109+
// min/max byte targets. This setting is independent of any network-level
110+
// timeouts or deadlines.
111+
//
112+
// For backward compatibility, when this field is left zero, kafka-go will
113+
// infer the max wait from the connection's read deadline.
114+
MaxWait time.Duration
107115
}
108116

109117
type IsolationLevel int8
@@ -773,7 +781,20 @@ func (c *Conn) ReadBatchWith(cfg ReadBatchConfig) *Batch {
773781

774782
id, err := c.doRequest(&c.rdeadline, func(deadline time.Time, id int32) error {
775783
now := time.Now()
776-
deadline = adjustDeadlineForRTT(deadline, now, defaultRTT)
784+
var timeout time.Duration
785+
if cfg.MaxWait > 0 {
786+
// explicitly-configured case: no changes are made to the deadline,
787+
// and the timeout is sent exactly as specified.
788+
timeout = cfg.MaxWait
789+
} else {
790+
// default case: use the original logic to adjust the conn's
791+
// deadline.T
792+
deadline = adjustDeadlineForRTT(deadline, now, defaultRTT)
793+
timeout = deadlineToTimeout(deadline, now)
794+
}
795+
// save this variable outside of the closure for later use in detecting
796+
// truncated messages.
797+
adjustedDeadline = deadline
777798
switch fetchVersion {
778799
case v10:
779800
return c.wb.writeFetchRequestV10(
@@ -784,7 +805,7 @@ func (c *Conn) ReadBatchWith(cfg ReadBatchConfig) *Batch {
784805
offset,
785806
cfg.MinBytes,
786807
cfg.MaxBytes+int(c.fetchMinSize),
787-
deadlineToTimeout(deadline, now),
808+
timeout,
788809
int8(cfg.IsolationLevel),
789810
)
790811
case v5:
@@ -796,7 +817,7 @@ func (c *Conn) ReadBatchWith(cfg ReadBatchConfig) *Batch {
796817
offset,
797818
cfg.MinBytes,
798819
cfg.MaxBytes+int(c.fetchMinSize),
799-
deadlineToTimeout(deadline, now),
820+
timeout,
800821
int8(cfg.IsolationLevel),
801822
)
802823
default:
@@ -808,7 +829,7 @@ func (c *Conn) ReadBatchWith(cfg ReadBatchConfig) *Batch {
808829
offset,
809830
cfg.MinBytes,
810831
cfg.MaxBytes+int(c.fetchMinSize),
811-
deadlineToTimeout(deadline, now),
832+
timeout,
812833
)
813834
}
814835
})

conn_test.go

+51
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,11 @@ func TestConn(t *testing.T) {
185185
function: testConnReadWatermarkFromBatch,
186186
},
187187

188+
{
189+
scenario: "read a batch using explicit max wait time",
190+
function: testConnReadBatchWithMaxWait,
191+
},
192+
188193
{
189194
scenario: "describe groups retrieves all groups when no groupID specified",
190195
function: testConnDescribeGroupRetrievesAllGroups,
@@ -540,6 +545,52 @@ func testConnReadWatermarkFromBatch(t *testing.T, conn *Conn) {
540545
batch.Close()
541546
}
542547

548+
func testConnReadBatchWithMaxWait(t *testing.T, conn *Conn) {
549+
if _, err := conn.WriteMessages(makeTestSequence(10)...); err != nil {
550+
t.Fatal(err)
551+
}
552+
553+
const maxBytes = 10e6 // 10 MB
554+
555+
value := make([]byte, 10e3) // 10 KB
556+
557+
cfg := ReadBatchConfig{
558+
MinBytes: maxBytes, // use max for both so that we hit max wait time
559+
MaxBytes: maxBytes,
560+
MaxWait: 500 * time.Millisecond,
561+
}
562+
563+
// set aa read deadline so the batch will succeed.
564+
conn.SetDeadline(time.Now().Add(time.Second))
565+
batch := conn.ReadBatchWith(cfg)
566+
567+
for i := 0; i < 10; i++ {
568+
_, err := batch.Read(value)
569+
if err != nil {
570+
if err = batch.Close(); err != nil {
571+
t.Fatalf("error trying to read batch message: %s", err)
572+
}
573+
}
574+
575+
if batch.HighWaterMark() != 10 {
576+
t.Fatal("expected highest offset (watermark) to be 10")
577+
}
578+
}
579+
580+
batch.Close()
581+
582+
// reset the offset and ensure that the conn deadline takes precedence over
583+
// the max wait
584+
conn.Seek(0, SeekAbsolute)
585+
conn.SetDeadline(time.Now().Add(50 * time.Millisecond))
586+
batch = conn.ReadBatchWith(cfg)
587+
if err := batch.Err(); err == nil {
588+
t.Fatal("should have timed out, but got no error")
589+
} else if netErr, ok := err.(net.Error); !ok || !netErr.Timeout() {
590+
t.Fatalf("should have timed out, but got: %v", err)
591+
}
592+
}
593+
543594
func waitForCoordinator(t *testing.T, conn *Conn, groupID string) {
544595
// ensure that kafka has allocated a group coordinator. oddly, issue doesn't
545596
// appear to happen if the kafka been running for a while.

dialer.go

+18-6
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,13 @@ type Dialer struct {
1818
// Unique identifier for client connections established by this Dialer.
1919
ClientID string
2020

21+
// Optionally specifies the function that the dialer uses to establish
22+
// network connections. If nil, net.(*Dialer).DialContext is used instead.
23+
//
24+
// When DialFunc is set, LocalAddr, DualStack, FallbackDelay, and KeepAlive
25+
// are ignored.
26+
DialFunc func(ctx context.Context, network string, address string) (net.Conn, error)
27+
2128
// Timeout is the maximum amount of time a dial will wait for a connect to
2229
// complete. If Deadline is also set, it may fail earlier.
2330
//
@@ -329,12 +336,17 @@ func (d *Dialer) dialContext(ctx context.Context, network string, address string
329336
}
330337
}
331338

332-
conn, err := (&net.Dialer{
333-
LocalAddr: d.LocalAddr,
334-
DualStack: d.DualStack,
335-
FallbackDelay: d.FallbackDelay,
336-
KeepAlive: d.KeepAlive,
337-
}).DialContext(ctx, network, address)
339+
dial := d.DialFunc
340+
if dial == nil {
341+
dial = (&net.Dialer{
342+
LocalAddr: d.LocalAddr,
343+
DualStack: d.DualStack,
344+
FallbackDelay: d.FallbackDelay,
345+
KeepAlive: d.KeepAlive,
346+
}).DialContext
347+
}
348+
349+
conn, err := dial(ctx, network, address)
338350
if err != nil {
339351
return nil, err
340352
}

example_consumergroup_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ func ExampleConsumerGroupParallelReaders() {
4646
case kafka.ErrGenerationEnded:
4747
// generation has ended. commit offsets. in a real app,
4848
// offsets would be committed periodically.
49-
gen.CommitOffsets(map[string]map[int]int64{"my-topic": {partition: offset}})
49+
gen.CommitOffsets(map[string]map[int]int64{"my-topic": {partition: offset + 1}})
5050
return
5151
case nil:
5252
fmt.Printf("received message %s/%d/%d : %s\n", msg.Topic, msg.Partition, msg.Offset, string(msg.Value))

reader.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -685,7 +685,11 @@ func (r *Reader) Close() error {
685685
// The method returns io.EOF to indicate that the reader has been closed.
686686
//
687687
// If consumer groups are used, ReadMessage will automatically commit the
688-
// offset when called.
688+
// offset when called. Note that this could result in an offset being committed
689+
// before the message is fully processed.
690+
//
691+
// If more fine grained control of when offsets are committed is required, it
692+
// is recommended to use FetchMessage with CommitMessages instead.
689693
func (r *Reader) ReadMessage(ctx context.Context) (Message, error) {
690694
m, err := r.FetchMessage(ctx)
691695
if err != nil {

writer.go

-1
Original file line numberDiff line numberDiff line change
@@ -636,7 +636,6 @@ func (w *Writer) WriteMessages(ctx context.Context, msgs ...Message) error {
636636
werr[i] = batch.err
637637
}
638638
}
639-
640639
return werr
641640
}
642641

0 commit comments

Comments
 (0)