Skip to content

Commit e7d5971

Browse files
authored
Fix for panic when RequiredAcks is set to RequireNone (#504)
* Fix panic in async wait() method when RequiredAcks is None When RequiredAcks is None, the producer does not wait for a response from the broker, therefore the response is nil. The async wait() method was not handling this case, leading to a panic. * Add regression test for RequiredAcks == RequireNone This new test is required because all the other Writer tests use NewWriter() to create Writers, which sets RequiredAcks to RequireAll when 0 (None) was specified.
1 parent 6bd4c78 commit e7d5971

File tree

2 files changed

+29
-0
lines changed

2 files changed

+29
-0
lines changed

transport.go

+2
Original file line numberDiff line numberDiff line change
@@ -778,6 +778,8 @@ func (p async) await(ctx context.Context) (Response, error) {
778778
select {
779779
case x := <-p:
780780
switch v := x.(type) {
781+
case nil:
782+
return nil, nil // A nil response is ok (e.g. when RequiredAcks is None)
781783
case Response:
782784
return v, nil
783785
case error:

writer_test.go

+27
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,10 @@ func TestWriter(t *testing.T) {
5151
scenario: "setting a non default balancer on the writer",
5252
function: testWriterSetsRightBalancer,
5353
},
54+
{
55+
scenario: "setting RequiredAcks to None in Writer doesn't cause a panic",
56+
function: testWriterRequiredAcksNone,
57+
},
5458
}
5559

5660
for _, test := range tests {
@@ -83,6 +87,29 @@ func testWriterClose(t *testing.T) {
8387
}
8488
}
8589

90+
func testWriterRequiredAcksNone(t *testing.T) {
91+
topic := makeTopic()
92+
createTopic(t, topic, 1)
93+
defer deleteTopic(t, topic)
94+
95+
w := &Writer{
96+
Addr: TCP("localhost:9092"),
97+
Topic: topic,
98+
Balancer: &RoundRobin{},
99+
RequiredAcks: RequireNone,
100+
}
101+
defer w.Close()
102+
103+
msg := Message{
104+
Key: []byte("ThisIsAKey"),
105+
Value: []byte("Test message for required acks test")}
106+
107+
err := w.WriteMessages(context.Background(), msg)
108+
if err != nil {
109+
t.Fatal(err)
110+
}
111+
}
112+
86113
func testWriterSetsRightBalancer(t *testing.T) {
87114
const topic = "test-writer-1"
88115
balancer := &CRC32Balancer{}

0 commit comments

Comments
 (0)