Skip to content

Commit edf45f6

Browse files
authored
Add RawProduce API (#1233)
1 parent f568774 commit edf45f6

File tree

10 files changed

+642
-0
lines changed

10 files changed

+642
-0
lines changed

protocol/protocol.go

+31
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,37 @@ func Register(req, res Message) {
213213
}
214214
}
215215

216+
// OverrideTypeMessage is an interface implemented by messages that want to override the standard
217+
// request/response types for a given API.
218+
type OverrideTypeMessage interface {
219+
TypeKey() OverrideTypeKey
220+
}
221+
222+
type OverrideTypeKey int16
223+
224+
const (
225+
RawProduceOverride OverrideTypeKey = 0
226+
)
227+
228+
var overrideApiTypes [numApis]map[OverrideTypeKey]apiType
229+
230+
func RegisterOverride(req, res Message, key OverrideTypeKey) {
231+
k1 := req.ApiKey()
232+
k2 := res.ApiKey()
233+
234+
if k1 != k2 {
235+
panic(fmt.Sprintf("[%T/%T]: request and response API keys mismatch: %d != %d", req, res, k1, k2))
236+
}
237+
238+
if overrideApiTypes[k1] == nil {
239+
overrideApiTypes[k1] = make(map[OverrideTypeKey]apiType)
240+
}
241+
overrideApiTypes[k1][key] = apiType{
242+
requests: typesOf(req),
243+
responses: typesOf(res),
244+
}
245+
}
246+
216247
func typesOf(v interface{}) []messageType {
217248
return makeTypes(reflect.TypeOf(v).Elem())
218249
}

protocol/prototest/reflect.go

+8
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package prototest
22

33
import (
4+
"bytes"
45
"errors"
56
"io"
67
"reflect"
@@ -49,6 +50,13 @@ func loadValue(v reflect.Value) (reset func()) {
4950
}
5051
resetFunc()
5152
resets = append(resets, resetFunc)
53+
case io.Reader:
54+
buf, _ := io.ReadAll(x)
55+
resetFunc := func() {
56+
f.Set(reflect.ValueOf(bytes.NewBuffer(buf)))
57+
}
58+
resetFunc()
59+
resets = append(resets, resetFunc)
5260
}
5361
})
5462

protocol/prototest/request.go

+33
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,39 @@ func TestRequest(t *testing.T, version int16, msg protocol.Message) {
4646
})
4747
}
4848

49+
// TestRequestWithOverride validates requests that have an overridden type. For requests with type overrides, we
50+
// double-serialize the request to ensure the resulting encoding of the overridden and original type are identical.
51+
func TestRequestWithOverride(t *testing.T, version int16, msg protocol.Message) {
52+
reset := load(msg)
53+
54+
t.Run(fmt.Sprintf("v%d", version), func(t *testing.T) {
55+
b1 := &bytes.Buffer{}
56+
57+
if err := protocol.WriteRequest(b1, version, 1234, "me", msg); err != nil {
58+
t.Fatal(err)
59+
}
60+
61+
reset()
62+
t.Logf("\n%s\n", hex.Dump(b1.Bytes()))
63+
64+
_, _, _, req, err := protocol.ReadRequest(b1)
65+
if err != nil {
66+
t.Fatal(err)
67+
}
68+
69+
b2 := &bytes.Buffer{}
70+
if err := protocol.WriteRequest(b2, version, 1234, "me", req); err != nil {
71+
t.Fatal(err)
72+
}
73+
74+
if !deepEqual(b1, b2) {
75+
t.Errorf("request message mismatch:")
76+
t.Logf("expected: %+v", hex.Dump(b1.Bytes()))
77+
t.Logf("found: %+v", hex.Dump(b2.Bytes()))
78+
}
79+
})
80+
}
81+
4982
func BenchmarkRequest(b *testing.B, version int16, msg protocol.Message) {
5083
reset := load(msg)
5184

protocol/rawproduce/rawproduce.go

+91
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
package rawproduce
2+
3+
import (
4+
"fmt"
5+
6+
"github.com/segmentio/kafka-go/protocol"
7+
"github.com/segmentio/kafka-go/protocol/produce"
8+
)
9+
10+
func init() {
11+
// Register a type override so that raw produce requests will be encoded with the correct type.
12+
req := &Request{}
13+
protocol.RegisterOverride(req, &produce.Response{}, req.TypeKey())
14+
}
15+
16+
type Request struct {
17+
TransactionalID string `kafka:"min=v3,max=v8,nullable"`
18+
Acks int16 `kafka:"min=v0,max=v8"`
19+
Timeout int32 `kafka:"min=v0,max=v8"`
20+
Topics []RequestTopic `kafka:"min=v0,max=v8"`
21+
}
22+
23+
func (r *Request) ApiKey() protocol.ApiKey { return protocol.Produce }
24+
25+
func (r *Request) TypeKey() protocol.OverrideTypeKey { return protocol.RawProduceOverride }
26+
27+
func (r *Request) Broker(cluster protocol.Cluster) (protocol.Broker, error) {
28+
broker := protocol.Broker{ID: -1}
29+
30+
for i := range r.Topics {
31+
t := &r.Topics[i]
32+
33+
topic, ok := cluster.Topics[t.Topic]
34+
if !ok {
35+
return broker, NewError(protocol.NewErrNoTopic(t.Topic))
36+
}
37+
38+
for j := range t.Partitions {
39+
p := &t.Partitions[j]
40+
41+
partition, ok := topic.Partitions[p.Partition]
42+
if !ok {
43+
return broker, NewError(protocol.NewErrNoPartition(t.Topic, p.Partition))
44+
}
45+
46+
if b, ok := cluster.Brokers[partition.Leader]; !ok {
47+
return broker, NewError(protocol.NewErrNoLeader(t.Topic, p.Partition))
48+
} else if broker.ID < 0 {
49+
broker = b
50+
} else if b.ID != broker.ID {
51+
return broker, NewError(fmt.Errorf("mismatching leaders (%d!=%d)", b.ID, broker.ID))
52+
}
53+
}
54+
}
55+
56+
return broker, nil
57+
}
58+
59+
func (r *Request) HasResponse() bool {
60+
return r.Acks != 0
61+
}
62+
63+
type RequestTopic struct {
64+
Topic string `kafka:"min=v0,max=v8"`
65+
Partitions []RequestPartition `kafka:"min=v0,max=v8"`
66+
}
67+
68+
type RequestPartition struct {
69+
Partition int32 `kafka:"min=v0,max=v8"`
70+
RecordSet protocol.RawRecordSet `kafka:"min=v0,max=v8"`
71+
}
72+
73+
var (
74+
_ protocol.BrokerMessage = (*Request)(nil)
75+
)
76+
77+
type Error struct {
78+
Err error
79+
}
80+
81+
func NewError(err error) *Error {
82+
return &Error{Err: err}
83+
}
84+
85+
func (e *Error) Error() string {
86+
return fmt.Sprintf("fetch request error: %v", e.Err)
87+
}
88+
89+
func (e *Error) Unwrap() error {
90+
return e.Err
91+
}
+201
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
package rawproduce_test
2+
3+
import (
4+
"bytes"
5+
"testing"
6+
"time"
7+
8+
"github.com/segmentio/kafka-go/protocol"
9+
"github.com/segmentio/kafka-go/protocol/prototest"
10+
"github.com/segmentio/kafka-go/protocol/rawproduce"
11+
)
12+
13+
const (
14+
v0 = 0
15+
v3 = 3
16+
v5 = 5
17+
)
18+
19+
func TestRawProduceRequest(t *testing.T) {
20+
t0 := time.Now().Truncate(time.Millisecond)
21+
t1 := t0.Add(1 * time.Millisecond)
22+
t2 := t0.Add(2 * time.Millisecond)
23+
24+
prototest.TestRequestWithOverride(t, v0, &rawproduce.Request{
25+
Acks: 1,
26+
Timeout: 500,
27+
Topics: []rawproduce.RequestTopic{
28+
{
29+
Topic: "topic-1",
30+
Partitions: []rawproduce.RequestPartition{
31+
{
32+
Partition: 0,
33+
RecordSet: NewRawRecordSet(protocol.NewRecordReader(
34+
protocol.Record{Offset: 0, Time: t0, Key: nil, Value: nil},
35+
), 1, 0),
36+
},
37+
{
38+
Partition: 1,
39+
RecordSet: NewRawRecordSet(protocol.NewRecordReader(
40+
protocol.Record{Offset: 0, Time: t0, Key: nil, Value: prototest.String("msg-0")},
41+
protocol.Record{Offset: 1, Time: t1, Key: nil, Value: prototest.String("msg-1")},
42+
protocol.Record{Offset: 2, Time: t2, Key: prototest.Bytes([]byte{1}), Value: prototest.String("msg-2")},
43+
), 1, 0),
44+
},
45+
},
46+
},
47+
48+
{
49+
Topic: "topic-2",
50+
Partitions: []rawproduce.RequestPartition{
51+
{
52+
Partition: 0,
53+
RecordSet: NewRawRecordSet(protocol.NewRecordReader(
54+
protocol.Record{Offset: 0, Time: t0, Key: nil, Value: prototest.String("msg-0")},
55+
protocol.Record{Offset: 1, Time: t1, Key: nil, Value: prototest.String("msg-1")},
56+
protocol.Record{Offset: 2, Time: t2, Key: prototest.Bytes([]byte{1}), Value: prototest.String("msg-2")},
57+
), 1, protocol.Gzip),
58+
},
59+
},
60+
},
61+
},
62+
})
63+
64+
prototest.TestRequestWithOverride(t, v3, &rawproduce.Request{
65+
TransactionalID: "1234",
66+
Acks: 1,
67+
Timeout: 500,
68+
Topics: []rawproduce.RequestTopic{
69+
{
70+
Topic: "topic-1",
71+
Partitions: []rawproduce.RequestPartition{
72+
{
73+
Partition: 0,
74+
RecordSet: NewRawRecordSet(protocol.NewRecordReader(
75+
protocol.Record{Offset: 0, Time: t0, Key: nil, Value: nil},
76+
), 1, 0),
77+
},
78+
{
79+
Partition: 1,
80+
RecordSet: NewRawRecordSet(protocol.NewRecordReader(
81+
protocol.Record{Offset: 0, Time: t0, Key: nil, Value: prototest.String("msg-0")},
82+
protocol.Record{Offset: 1, Time: t1, Key: nil, Value: prototest.String("msg-1")},
83+
protocol.Record{Offset: 2, Time: t2, Key: prototest.Bytes([]byte{1}), Value: prototest.String("msg-2")},
84+
), 1, 0),
85+
},
86+
},
87+
},
88+
},
89+
})
90+
91+
headers := []protocol.Header{
92+
{Key: "key-1", Value: []byte("value-1")},
93+
{Key: "key-2", Value: []byte("value-2")},
94+
{Key: "key-3", Value: []byte("value-3")},
95+
}
96+
97+
prototest.TestRequestWithOverride(t, v5, &rawproduce.Request{
98+
TransactionalID: "1234",
99+
Acks: 1,
100+
Timeout: 500,
101+
Topics: []rawproduce.RequestTopic{
102+
{
103+
Topic: "topic-1",
104+
Partitions: []rawproduce.RequestPartition{
105+
{
106+
Partition: 1,
107+
RecordSet: NewRawRecordSet(protocol.NewRecordReader(
108+
protocol.Record{Offset: 0, Time: t0, Key: nil, Value: prototest.String("msg-0"), Headers: headers},
109+
protocol.Record{Offset: 1, Time: t1, Key: nil, Value: prototest.String("msg-1")},
110+
protocol.Record{Offset: 2, Time: t2, Key: prototest.Bytes([]byte{1}), Value: prototest.String("msg-2")},
111+
), 2, 0),
112+
},
113+
},
114+
},
115+
116+
{
117+
Topic: "topic-2",
118+
Partitions: []rawproduce.RequestPartition{
119+
{
120+
Partition: 1,
121+
RecordSet: NewRawRecordSet(protocol.NewRecordReader(
122+
protocol.Record{Offset: 0, Time: t0, Key: nil, Value: prototest.String("msg-0"), Headers: headers},
123+
protocol.Record{Offset: 1, Time: t1, Key: nil, Value: prototest.String("msg-1")},
124+
protocol.Record{Offset: 2, Time: t2, Key: prototest.Bytes([]byte{1}), Value: prototest.String("msg-2")},
125+
), 2, protocol.Snappy),
126+
},
127+
},
128+
},
129+
},
130+
})
131+
}
132+
133+
func NewRawRecordSet(reader protocol.RecordReader, version int8, attr protocol.Attributes) protocol.RawRecordSet {
134+
rs := protocol.RecordSet{Version: version, Attributes: attr, Records: reader}
135+
buf := &bytes.Buffer{}
136+
rs.WriteTo(buf)
137+
138+
return protocol.RawRecordSet{
139+
Reader: buf,
140+
}
141+
}
142+
143+
func BenchmarkProduceRequest(b *testing.B) {
144+
t0 := time.Now().Truncate(time.Millisecond)
145+
t1 := t0.Add(1 * time.Millisecond)
146+
t2 := t0.Add(2 * time.Millisecond)
147+
148+
prototest.BenchmarkRequest(b, v3, &rawproduce.Request{
149+
TransactionalID: "1234",
150+
Acks: 1,
151+
Timeout: 500,
152+
Topics: []rawproduce.RequestTopic{
153+
{
154+
Topic: "topic-1",
155+
Partitions: []rawproduce.RequestPartition{
156+
{
157+
Partition: 0,
158+
RecordSet: NewRawRecordSet(protocol.NewRecordReader(
159+
protocol.Record{Offset: 0, Time: t0, Key: nil, Value: nil},
160+
), 1, 0),
161+
},
162+
{
163+
Partition: 1,
164+
RecordSet: NewRawRecordSet(protocol.NewRecordReader(
165+
protocol.Record{Offset: 0, Time: t0, Key: nil, Value: prototest.String("msg-0")},
166+
protocol.Record{Offset: 1, Time: t1, Key: nil, Value: prototest.String("msg-1")},
167+
protocol.Record{Offset: 2, Time: t2, Key: prototest.Bytes([]byte{1}), Value: prototest.String("msg-2")},
168+
), 1, 0),
169+
},
170+
},
171+
},
172+
},
173+
})
174+
175+
headers := []protocol.Header{
176+
{Key: "key-1", Value: []byte("value-1")},
177+
{Key: "key-2", Value: []byte("value-2")},
178+
{Key: "key-3", Value: []byte("value-3")},
179+
}
180+
181+
prototest.BenchmarkRequest(b, v5, &rawproduce.Request{
182+
TransactionalID: "1234",
183+
Acks: 1,
184+
Timeout: 500,
185+
Topics: []rawproduce.RequestTopic{
186+
{
187+
Topic: "topic-1",
188+
Partitions: []rawproduce.RequestPartition{
189+
{
190+
Partition: 1,
191+
RecordSet: NewRawRecordSet(protocol.NewRecordReader(
192+
protocol.Record{Offset: 0, Time: t0, Key: nil, Value: prototest.String("msg-0"), Headers: headers},
193+
protocol.Record{Offset: 1, Time: t1, Key: nil, Value: prototest.String("msg-1")},
194+
protocol.Record{Offset: 2, Time: t2, Key: prototest.Bytes([]byte{1}), Value: prototest.String("msg-2")},
195+
), 2, 0),
196+
},
197+
},
198+
},
199+
},
200+
})
201+
}

0 commit comments

Comments
 (0)