From caf8501bb5354d23b968b61d841966ed57b65027 Mon Sep 17 00:00:00 2001 From: Artur Kronenberg Date: Wed, 19 Aug 2020 10:24:59 +0100 Subject: [PATCH] fix(writer): set correct balancer Sets the correct balancer as passed through in the config on the writer --- writer.go | 5 +++++ writer_test.go | 18 ++++++++++++++++++ 2 files changed, 23 insertions(+) diff --git a/writer.go b/writer.go index b58c075f8..e6d7f6fa3 100644 --- a/writer.go +++ b/writer.go @@ -396,6 +396,10 @@ func NewWriter(config WriterConfig) *Writer { config.Dialer = DefaultDialer } + if config.Balancer == nil { + config.Balancer = &RoundRobin{} + } + // Converts the pre-0.4 Dialer API into a Transport. kafkaDialer := DefaultDialer if config.Dialer != nil { @@ -472,6 +476,7 @@ func NewWriter(config WriterConfig) *Writer { Topic: config.Topic, MaxAttempts: config.MaxAttempts, BatchSize: config.BatchSize, + Balancer: config.Balancer, BatchBytes: int64(config.BatchBytes), BatchTimeout: config.BatchTimeout, ReadTimeout: config.ReadTimeout, diff --git a/writer_test.go b/writer_test.go index b930b2a87..7469d6d66 100644 --- a/writer_test.go +++ b/writer_test.go @@ -47,6 +47,10 @@ func TestWriter(t *testing.T) { scenario: "writing messsages with a small batch byte size", function: testWriterSmallBatchBytes, }, + { + scenario: "setting a non default balancer on the writer", + function: testWriterSetsRightBalancer, + }, } for _, test := range tests { @@ -79,6 +83,20 @@ func testWriterClose(t *testing.T) { } } +func testWriterSetsRightBalancer(t *testing.T) { + const topic = "test-writer-1" + balancer := &CRC32Balancer{} + w := newTestWriter(WriterConfig{ + Topic: topic, + Balancer: balancer, + }) + defer w.Close() + + if w.Balancer != balancer { + t.Errorf("Balancer not set correctly") + } +} + func testWriterRoundRobin1(t *testing.T) { const topic = "test-writer-1" createTopic(t, topic, 1)