
Commit ee2efb7

Authored by Achille
0.4: fix short writes (#479)
1 parent a37955f commit ee2efb7

4 files changed, +151 −5 lines


client_test.go (+117)

@@ -1,11 +1,16 @@
 package kafka
 
 import (
+	"bytes"
 	"context"
+	"io"
+	"math/rand"
 	"net"
 	"sync"
 	"testing"
 	"time"
+
+	"github.com/segmentio/kafka-go/compress"
 )
 
 func newLocalClientAndTopic() (*Client, string, func()) {
@@ -183,3 +188,115 @@ func testConsumerGroupFetchOffsets(t *testing.T, ctx context.Context, c *Client)
 	}
 	}
 }
+
+func TestClientProduceAndConsume(t *testing.T) {
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
+	// Tests a typical kafka use case: data is produced to a partition,
+	// then consumed back sequentially. We use snappy compression because
+	// kafka streams are often compressed, and verify that each record
+	// produced is exposed to the consumer, and that order is preserved.
+	client, topic, shutdown := newLocalClientAndTopic()
+	defer shutdown()
+
+	epoch := time.Now()
+	seed := int64(0) // deterministic
+	prng := rand.New(rand.NewSource(seed))
+	offset := int64(0)
+
+	const numBatches = 100
+	const recordsPerBatch = 320
+	t.Logf("producing %d batches of %d records...", numBatches, recordsPerBatch)
+
+	for i := 0; i < numBatches; i++ { // produce 100 batches
+		records := make([]Record, recordsPerBatch)
+
+		for i := range records {
+			v := make([]byte, prng.Intn(999)+1)
+			io.ReadFull(prng, v)
+			records[i].Time = epoch
+			records[i].Value = NewBytes(v)
+		}
+
+		res, err := client.Produce(ctx, &ProduceRequest{
+			Topic:        topic,
+			Partition:    0,
+			RequiredAcks: -1,
+			Records:      NewRecordReader(records...),
+			Compression:  compress.Snappy,
+		})
+		if err != nil {
+			t.Fatal(err)
+		}
+		if res.Error != nil {
+			t.Fatal(res.Error)
+		}
+		if res.BaseOffset != offset {
+			t.Fatalf("records were produced at an unexpected offset, want %d but got %d", offset, res.BaseOffset)
+		}
+		offset += int64(len(records))
+	}
+
+	prng.Seed(seed)
+	offset = 0 // reset
+	numFetches := 0
+	numRecords := 0
+
+	for numRecords < (numBatches * recordsPerBatch) {
+		res, err := client.Fetch(ctx, &FetchRequest{
+			Topic:     topic,
+			Partition: 0,
+			Offset:    offset,
+			MinBytes:  1,
+			MaxBytes:  256 * 1024,
+			MaxWait:   100 * time.Millisecond, // should only hit on the last fetch
+		})
+		if err != nil {
+			t.Fatal(err)
+		}
+		if res.Error != nil {
+			t.Fatal(res.Error)
+		}
+
+		for {
+			r, err := res.Records.ReadRecord()
+			if err != nil {
+				if err != io.EOF {
+					t.Fatal(err)
+				}
+				break
+			}
+
+			if r.Key != nil {
+				r.Key.Close()
+				t.Error("unexpected non-null key on record at offset", r.Offset)
+			}
+
+			n := prng.Intn(999) + 1
+			a := make([]byte, n)
+			b := make([]byte, n)
+			io.ReadFull(prng, a)
+
+			_, err = io.ReadFull(r.Value, b)
+			r.Value.Close()
+			if err != nil {
+				t.Fatalf("reading record at offset %d: %v", r.Offset, err)
+			}
+
+			if !bytes.Equal(a, b) {
+				t.Fatalf("value of record at offset %d mismatches", r.Offset)
+			}
+
+			if r.Offset != offset {
+				t.Fatalf("record at offset %d was expected to have offset %d", r.Offset, offset)
+			}
+
+			offset = r.Offset + 1
+			numRecords++
+		}
+
+		numFetches++
+	}
+
+	t.Logf("%d records were read in %d fetches", numRecords, numFetches)
+}
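
Note: the test above never keeps the produced payloads around; it reseeds the same math/rand source before the fetch loop and regenerates the expected bytes in order. A minimal, self-contained sketch of that reseed-and-replay pattern (plain Go, no kafka-go types assumed):

// Sketch of the reseed-and-replay pattern used by the test: a deterministic
// PRNG regenerates the expected payloads on the verification pass, so nothing
// produced has to be stored in memory.
package main

import (
	"bytes"
	"fmt"
	"io"
	"math/rand"
)

func main() {
	const seed = 0
	prng := rand.New(rand.NewSource(seed))

	// "Produce": generate a few random payloads.
	produced := make([][]byte, 3)
	for i := range produced {
		v := make([]byte, prng.Intn(999)+1)
		io.ReadFull(prng, v) // *rand.Rand.Read never fails
		produced[i] = v
	}

	// "Consume": reseed and regenerate the expected bytes in the same order.
	prng.Seed(seed)
	for i, got := range produced {
		want := make([]byte, prng.Intn(999)+1)
		io.ReadFull(prng, want)
		if !bytes.Equal(got, want) {
			panic(fmt.Sprintf("payload %d mismatches", i))
		}
	}
	fmt.Println("all payloads match")
}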

docker-compose.010.yml (+29)

@@ -0,0 +1,29 @@
+version: "3"
+services:
+  kafka:
+    image: wurstmeister/kafka:0.10.1.1
+    links:
+      - zookeeper
+    ports:
+      - 9092:9092
+      - 9093:9093
+    environment:
+      KAFKA_BROKER_ID: '1'
+      KAFKA_CREATE_TOPICS: 'test-writer-0:3:1,test-writer-1:3:1'
+      KAFKA_DELETE_TOPIC_ENABLE: 'true'
+      KAFKA_ADVERTISED_HOST_NAME: 'localhost'
+      KAFKA_ADVERTISED_PORT: '9092'
+      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
+      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
+      KAFKA_MESSAGE_MAX_BYTES: '200000000'
+      KAFKA_LISTENERS: 'PLAINTEXT://:9092,SASL_PLAINTEXT://:9093'
+      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093'
+      KAFKA_SASL_ENABLED_MECHANISMS: 'PLAIN'
+      KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf"
+      CUSTOM_INIT_SCRIPT: |-
+        echo -e 'KafkaServer {\norg.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n };' > /opt/kafka/config/kafka_server_jaas.conf;
+
+  zookeeper:
+    image: wurstmeister/zookeeper
+    ports:
+      - 2181:2181
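
This compose file presumably lets the suite also run against a 0.10 broker; it advertises a plaintext listener on localhost:9092. A hedged smoke check, using kafka-go's Conn API, that the broker from this file answers before running the tests (the address is an assumption read off KAFKA_ADVERTISED_LISTENERS above):

// Quick reachability check against the 0.10 broker defined above.
package main

import (
	"fmt"
	"log"

	kafka "github.com/segmentio/kafka-go"
)

func main() {
	// Dial the plaintext listener declared in docker-compose.010.yml.
	conn, err := kafka.Dial("tcp", "localhost:9092")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// Listing partitions is a cheap way to confirm the broker serves
	// metadata requests before the real tests start.
	partitions, err := conn.ReadPartitions()
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("broker is up, %d partitions visible\n", len(partitions))
}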

protocol/buffer.go (+4 −4)

@@ -5,7 +5,6 @@ import (
 	"fmt"
 	"io"
 	"math"
-	"sort"
 	"sync"
 	"sync/atomic"
 )
@@ -461,9 +460,10 @@ func (pages contiguousPages) slice(begin, end int64) contiguousPages {
 }
 
 func (pages contiguousPages) indexOf(offset int64) int {
-	return sort.Search(len(pages), func(i int) bool {
-		return offset < (pages[i].offset + pages[i].Size())
-	})
+	if len(pages) == 0 {
+		return 0
+	}
+	return int((offset - pages[0].offset) / pageSize)
 }
 
 func (pages contiguousPages) scan(begin, end int64, f func([]byte) bool) {
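
The indexOf rewrite replaces a binary search with constant-time arithmetic; it appears to rely on the pages of a contiguousPages slice being fixed-size (pageSize) and laid out back to back from the first page's offset. A standalone sketch of that computation under those assumptions (the page type and the tiny 16-byte pageSize here are illustrative, not kafka-go's actual values):

// Sketch of the constant-time page lookup, assuming fixed-size, contiguous pages.
package main

import "fmt"

const pageSize = 16 // hypothetical; the real pageBuffer uses a much larger page size

type page struct{ offset int64 }

func indexOf(pages []page, offset int64) int {
	if len(pages) == 0 {
		return 0
	}
	// With fixed-size, contiguous pages the containing page falls out of a
	// simple division instead of a binary search over page boundaries.
	return int((offset - pages[0].offset) / pageSize)
}

func main() {
	pages := []page{{offset: 32}, {offset: 48}, {offset: 64}}
	fmt.Println(indexOf(pages, 50)) // 1: the second page covers [48, 64)
}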

protocol/record_v1.go (+1 −1)

@@ -174,7 +174,7 @@ func (rs *RecordSet) writeToVersion1(buffer *pageBuffer, bufferOffset int64) err
 	var err error
 	buffer.pages.scan(bufferOffset, buffer.Size(), func(b []byte) bool {
 		_, err = compressor.Write(b)
-		return err != nil
+		return err == nil
 	})
 	if err != nil {
 		return err
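
This one-character change is the short-write fix named in the commit title. Judging by the fix, scan's callback returns a "keep going" flag: returning err != nil stopped the iteration right after the first successful write, so only the first page ever reached the compressor, while err == nil keeps feeding pages until they run out or a write fails. A minimal sketch of that callback contract, with hypothetical names (scanPages, chunks) standing in for the pageBuffer internals:

// Sketch of the scan-callback contract assumed by the fix: the callback
// returns true to keep iterating over pages and false to stop early.
package main

import (
	"bytes"
	"fmt"
	"io"
)

// scanPages calls f for each chunk until f returns false.
func scanPages(chunks [][]byte, f func([]byte) bool) {
	for _, c := range chunks {
		if !f(c) {
			return
		}
	}
}

func main() {
	chunks := [][]byte{[]byte("hello "), []byte("world")}
	var out bytes.Buffer
	var w io.Writer = &out

	var err error
	scanPages(chunks, func(b []byte) bool {
		_, err = w.Write(b)
		return err == nil // continue only while writes succeed; the old `err != nil` stopped after one chunk
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(out.String()) // "hello world"
}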
