Skip to content

Commit 0b38267

Browse files
author
Achille
authored
add kafka.SeekDontCheck (#295)
* add kafka.SeekDontCheck * fix logic * touch less code * use SeekDontCheck in ReadBatch * add test
1 parent 8174c4c commit 0b38267

File tree

4 files changed

+58
-1
lines changed

4 files changed

+58
-1
lines changed

conn.go

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -595,6 +595,12 @@ const (
595595
SeekAbsolute = 1 // Seek to an absolute offset.
596596
SeekEnd = 2 // Seek relative to the last offset available in the partition.
597597
SeekCurrent = 3 // Seek relative to the current offset.
598+
599+
// This flag may be combined to any of the SeekAbsolute and SeekCurrent
600+
// constants to skip the bound check that the connection would do otherwise.
601+
// Programs can use this flag to avoid making a metadata request to the kafka
602+
// broker to read the current first and last offsets of the partition.
603+
SeekDontCheck = 1 << 31
598604
)
599605

600606
// Seek sets the offset for the next read or write operation according to whence, which
@@ -604,12 +610,32 @@ const (
604610
// as in lseek(2) or os.Seek.
605611
// The method returns the new absolute offset of the connection.
606612
func (c *Conn) Seek(offset int64, whence int) (int64, error) {
613+
seekDontCheck := (whence & SeekDontCheck) != 0
614+
whence &= ^SeekDontCheck
615+
607616
switch whence {
608617
case SeekStart, SeekAbsolute, SeekEnd, SeekCurrent:
609618
default:
610619
return 0, fmt.Errorf("whence must be one of 0, 1, 2, or 3. (whence = %d)", whence)
611620
}
612621

622+
if seekDontCheck {
623+
if whence == SeekAbsolute {
624+
c.mutex.Lock()
625+
c.offset = offset
626+
c.mutex.Unlock()
627+
return offset, nil
628+
}
629+
630+
if whence == SeekCurrent {
631+
c.mutex.Lock()
632+
c.offset += offset
633+
offset = c.offset
634+
c.mutex.Unlock()
635+
return offset, nil
636+
}
637+
}
638+
613639
if whence == SeekAbsolute {
614640
c.mutex.Lock()
615641
unchanged := offset == c.offset
@@ -618,6 +644,7 @@ func (c *Conn) Seek(offset int64, whence int) (int64, error) {
618644
return offset, nil
619645
}
620646
}
647+
621648
if whence == SeekCurrent {
622649
c.mutex.Lock()
623650
offset = c.offset + offset
@@ -726,7 +753,9 @@ func (c *Conn) ReadBatchWith(cfg ReadBatchConfig) *Batch {
726753
return &Batch{err: fmt.Errorf("kafka.(*Conn).ReadBatch: minBytes (%d) > maxBytes (%d)", cfg.MinBytes, cfg.MaxBytes)}
727754
}
728755

729-
offset, err := c.Seek(c.Offset())
756+
offset, whence := c.Offset()
757+
758+
offset, err := c.Seek(offset, whence|SeekDontCheck)
730759
if err != nil {
731760
return &Batch{err: dontExpectEOF(err)}
732761
}

conn_test.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,11 @@ func TestConn(t *testing.T) {
152152
function: testConnSeekRandomOffset,
153153
},
154154

155+
{
156+
scenario: "unchecked seeks allow the connection to be positionned outside the boundaries of the partition",
157+
function: testConnSeekDontCheck,
158+
},
159+
155160
{
156161
scenario: "writing and reading messages sequentially should preserve the order",
157162
function: testConnWriteReadSequentially,
@@ -439,6 +444,27 @@ func testConnSeekRandomOffset(t *testing.T, conn *Conn) {
439444
}
440445
}
441446

447+
func testConnSeekDontCheck(t *testing.T, conn *Conn) {
448+
for i := 0; i != 10; i++ {
449+
if _, err := conn.Write([]byte(strconv.Itoa(i))); err != nil {
450+
t.Fatal(err)
451+
}
452+
}
453+
454+
offset, err := conn.Seek(42, SeekAbsolute|SeekDontCheck)
455+
if err != nil {
456+
t.Error(err)
457+
}
458+
459+
if offset != 42 {
460+
t.Error("bad offset:", offset)
461+
}
462+
463+
if _, err := conn.ReadMessage(1024); err != OffsetOutOfRange {
464+
t.Error("unexpected error:", err)
465+
}
466+
}
467+
442468
func testConnWriteReadSequentially(t *testing.T, conn *Conn) {
443469
for i := 0; i != 10; i++ {
444470
if _, err := conn.Write([]byte(strconv.Itoa(i))); err != nil {

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,5 @@ require (
99
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
1010
github.com/xdg/stringprep v1.0.0 // indirect
1111
golang.org/x/crypto v0.0.0-20190506204251-e1dfcc566284 // indirect
12+
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3
1213
)

go.sum

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0
1111
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
1212
golang.org/x/crypto v0.0.0-20190506204251-e1dfcc566284 h1:rlLehGeYg6jfoyz/eDqDU1iRXLKfR42nnNh57ytKEWo=
1313
golang.org/x/crypto v0.0.0-20190506204251-e1dfcc566284/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
14+
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3 h1:0GoQqolDA55aaLxZyTzK/Y2ePZzZTUrRacwib7cNsYQ=
1415
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
1516
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
1617
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=

0 commit comments

Comments
 (0)