Skip to content

Commit b7a001a

Browse files
authored
Remove deprecated function (NewWriter) usages (#528)
1 parent 2fc66e4 commit b7a001a

File tree

12 files changed

+102
-61
lines changed

12 files changed

+102
-61
lines changed

client_test.go

+3-4
Original file line numberDiff line numberDiff line change
@@ -139,13 +139,12 @@ func testConsumerGroupFetchOffsets(t *testing.T, ctx context.Context, c *Client)
139139
groupId := makeGroupID()
140140
brokers := []string{"localhost:9092"}
141141

142-
writer := NewWriter(WriterConfig{
143-
Brokers: brokers,
142+
writer := &Writer{
143+
Addr: TCP(brokers...),
144144
Topic: topic,
145-
Dialer: DefaultDialer,
146145
Balancer: &RoundRobin{},
147146
BatchSize: 1,
148-
})
147+
}
149148
if err := writer.WriteMessages(ctx, makeTestSequence(totalMessages)...); err != nil {
150149
t.Fatalf("bad write messages: %v", err)
151150
}

compress/compress_test.go

+11-11
Original file line numberDiff line numberDiff line change
@@ -120,12 +120,12 @@ func testCompressedMessages(t *testing.T, codec pkg.Codec) {
120120
topic := createTopic(t, 1)
121121
defer deleteTopic(t, topic)
122122

123-
w := kafka.NewWriter(kafka.WriterConfig{
124-
Brokers: []string{"127.0.0.1:9092"},
125-
Topic: topic,
126-
CompressionCodec: codec,
127-
BatchTimeout: 10 * time.Millisecond,
128-
})
123+
w := &kafka.Writer{
124+
Addr: kafka.TCP("127.0.0.1:9092"),
125+
Topic: topic,
126+
Compression: kafka.Compression(codec.Code()),
127+
BatchTimeout: 10 * time.Millisecond,
128+
}
129129
defer w.Close()
130130

131131
offset := 0
@@ -191,11 +191,11 @@ func TestMixedCompressedMessages(t *testing.T) {
191191
offset := 0
192192
var values []string
193193
produce := func(n int, codec pkg.Codec) {
194-
w := kafka.NewWriter(kafka.WriterConfig{
195-
Brokers: []string{"127.0.0.1:9092"},
196-
Topic: topic,
197-
CompressionCodec: codec,
198-
})
194+
w := &kafka.Writer{
195+
Addr: kafka.TCP("127.0.0.1:9092"),
196+
Topic: topic,
197+
Compression: kafka.Compression(codec.Code()),
198+
}
199199
defer w.Close()
200200

201201
msgs := make([]kafka.Message, n)

dialer_test.go

+8-8
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,10 @@ func testDialerLookupPartitions(t *testing.T, ctx context.Context, d *Dialer) {
4040
defer deleteTopic(t, topic)
4141

4242
// Write a message to ensure the partition gets created.
43-
w := NewWriter(WriterConfig{
44-
Brokers: []string{"localhost:9092"},
45-
Topic: topic,
46-
})
43+
w := &Writer{
44+
Addr: TCP("localhost:9092"),
45+
Topic: topic,
46+
}
4747
w.WriteMessages(ctx, Message{})
4848
w.Close()
4949

@@ -173,10 +173,10 @@ func TestDialerTLS(t *testing.T) {
173173
defer deleteTopic(t, topic)
174174

175175
// Write a message to ensure the partition gets created.
176-
w := NewWriter(WriterConfig{
177-
Brokers: []string{"localhost:9092"},
178-
Topic: topic,
179-
})
176+
w := &Writer{
177+
Addr: TCP("localhost:9092"),
178+
Topic: topic,
179+
}
180180
w.WriteMessages(context.Background(), Message{})
181181
w.Close()
182182

example_writer_test.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,10 @@ import (
77
)
88

99
func ExampleWriter() {
10-
w := kafka.NewWriter(kafka.WriterConfig{
11-
Brokers: []string{"localhost:9092"},
12-
Topic: "Topic-1",
13-
})
10+
w := &kafka.Writer{
11+
Addr: kafka.TCP("localhost:9092"),
12+
Topic: "Topic-1",
13+
}
1414

1515
w.WriteMessages(context.Background(),
1616
kafka.Message{

examples/producer-api/go.mod

+3-1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
11
module github.com/tarekbadrshalaan/GoKafka/kafka-go
22

3-
require github.com/segmentio/kafka-go v0.2.2
3+
go 1.15
4+
5+
require github.com/segmentio/kafka-go v0.4.5

examples/producer-api/go.sum

+23-2
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,23 @@
1-
github.com/segmentio/kafka-go v0.2.2 h1:KIUln5unPisRL2yyAkZsDR/coiymN9Djunv6JKGQ6JI=
2-
github.com/segmentio/kafka-go v0.2.2/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfPOCvTvk+EJo=
1+
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw=
2+
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
3+
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
4+
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
5+
github.com/klauspost/compress v1.9.8 h1:VMAMUUOh+gaxKTMk+zqbjsSjsIcUcL/LF4o63i82QyA=
6+
github.com/klauspost/compress v1.9.8/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
7+
github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I=
8+
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
9+
github.com/segmentio/kafka-go v0.4.5 h1:vphUaNc3rt77MlGjGfV6AjGq/piP+04wzDLuIiAE9iE=
10+
github.com/segmentio/kafka-go v0.4.5/go.mod h1:Inh7PqOsxmfgasV8InZYKVXWsdjcCq2d9tFV75GLbuM=
11+
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk=
12+
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
13+
github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0=
14+
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
15+
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
16+
golang.org/x/crypto v0.0.0-20190506204251-e1dfcc566284 h1:rlLehGeYg6jfoyz/eDqDU1iRXLKfR42nnNh57ytKEWo=
17+
golang.org/x/crypto v0.0.0-20190506204251-e1dfcc566284/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
18+
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3 h1:0GoQqolDA55aaLxZyTzK/Y2ePZzZTUrRacwib7cNsYQ=
19+
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
20+
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
21+
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
22+
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
23+
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=

examples/producer-api/main.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,11 @@ func producerHandler(kafkaWriter *kafka.Writer) func(http.ResponseWriter, *http.
3030
}
3131

3232
func getKafkaWriter(kafkaURL, topic string) *kafka.Writer {
33-
return kafka.NewWriter(kafka.WriterConfig{
34-
Brokers: []string{kafkaURL},
33+
return &kafka.Writer{
34+
Addr: kafka.TCP(kafkaURL),
3535
Topic: topic,
3636
Balancer: &kafka.LeastBytes{},
37-
})
37+
}
3838
}
3939

4040
func main() {

examples/producer-random/go.mod

+3-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
module github.com/tarekbadrshalaan/GoKafka/kafka-go
22

3+
go 1.15
4+
35
require (
46
github.com/google/uuid v1.1.0
5-
github.com/segmentio/kafka-go v0.2.2
7+
github.com/segmentio/kafka-go v0.4.5
68
)

examples/producer-random/go.sum

+23-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,25 @@
1+
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw=
2+
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
3+
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
4+
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
15
github.com/google/uuid v1.1.0 h1:Jf4mxPC/ziBnoPIdpQdPJ9OeiomAUHLvxmPRSPH9m4s=
26
github.com/google/uuid v1.1.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
3-
github.com/segmentio/kafka-go v0.2.2 h1:KIUln5unPisRL2yyAkZsDR/coiymN9Djunv6JKGQ6JI=
4-
github.com/segmentio/kafka-go v0.2.2/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfPOCvTvk+EJo=
7+
github.com/klauspost/compress v1.9.8 h1:VMAMUUOh+gaxKTMk+zqbjsSjsIcUcL/LF4o63i82QyA=
8+
github.com/klauspost/compress v1.9.8/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
9+
github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I=
10+
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
11+
github.com/segmentio/kafka-go v0.4.5 h1:vphUaNc3rt77MlGjGfV6AjGq/piP+04wzDLuIiAE9iE=
12+
github.com/segmentio/kafka-go v0.4.5/go.mod h1:Inh7PqOsxmfgasV8InZYKVXWsdjcCq2d9tFV75GLbuM=
13+
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk=
14+
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
15+
github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0=
16+
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
17+
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
18+
golang.org/x/crypto v0.0.0-20190506204251-e1dfcc566284 h1:rlLehGeYg6jfoyz/eDqDU1iRXLKfR42nnNh57ytKEWo=
19+
golang.org/x/crypto v0.0.0-20190506204251-e1dfcc566284/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
20+
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3 h1:0GoQqolDA55aaLxZyTzK/Y2ePZzZTUrRacwib7cNsYQ=
21+
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
22+
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
23+
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
24+
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
25+
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=

examples/producer-random/main.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,11 @@ import (
1111
)
1212

1313
func newKafkaWriter(kafkaURL, topic string) *kafka.Writer {
14-
return kafka.NewWriter(kafka.WriterConfig{
15-
Brokers: []string{kafkaURL},
14+
return &kafka.Writer{
15+
Addr: kafka.TCP(kafkaURL),
1616
Topic: topic,
1717
Balancer: &kafka.LeastBytes{},
18-
})
18+
}
1919
}
2020

2121
func main() {

reader_test.go

+15-19
Original file line numberDiff line numberDiff line change
@@ -906,13 +906,12 @@ func testReaderConsumerGroupVerifyCommitsOnClose(t *testing.T, ctx context.Conte
906906
func testReaderConsumerGroupReadContentAcrossPartitions(t *testing.T, ctx context.Context, r *Reader) {
907907
const N = 12
908908

909-
writer := NewWriter(WriterConfig{
910-
Brokers: r.config.Brokers,
909+
writer := &Writer{
910+
Addr: TCP(r.config.Brokers...),
911911
Topic: r.config.Topic,
912-
Dialer: r.config.Dialer,
913912
Balancer: &RoundRobin{},
914913
BatchSize: 1,
915-
})
914+
}
916915
if err := writer.WriteMessages(ctx, makeTestSequence(N)...); err != nil {
917916
t.Fatalf("bad write messages: %v", err)
918917
}
@@ -944,13 +943,12 @@ func testReaderConsumerGroupRebalance(t *testing.T, ctx context.Context, r *Read
944943
)
945944

946945
// rebalance should result in 12 message in each of the partitions
947-
writer := NewWriter(WriterConfig{
948-
Brokers: r.config.Brokers,
946+
writer := &Writer{
947+
Addr: TCP(r.config.Brokers...),
949948
Topic: r.config.Topic,
950-
Dialer: r.config.Dialer,
951949
Balancer: &RoundRobin{},
952950
BatchSize: 1,
953-
})
951+
}
954952
if err := writer.WriteMessages(ctx, makeTestSequence(N*partitions)...); err != nil {
955953
t.Fatalf("bad write messages: %v", err)
956954
}
@@ -994,13 +992,12 @@ func testReaderConsumerGroupRebalanceAcrossTopics(t *testing.T, ctx context.Cont
994992
)
995993

996994
// write messages across both partitions
997-
writer := NewWriter(WriterConfig{
998-
Brokers: r.config.Brokers,
995+
writer := &Writer{
996+
Addr: TCP(r.config.Brokers...),
999997
Topic: r.config.Topic,
1000-
Dialer: r.config.Dialer,
1001998
Balancer: &RoundRobin{},
1002999
BatchSize: 1,
1003-
})
1000+
}
10041001
if err := writer.WriteMessages(ctx, makeTestSequence(N)...); err != nil {
10051002
t.Fatalf("bad write messages: %v", err)
10061003
}
@@ -1047,13 +1044,12 @@ func testReaderConsumerGroupRebalanceAcrossManyPartitionsAndConsumers(t *testing
10471044
}()
10481045

10491046
// write messages across both partitions
1050-
writer := NewWriter(WriterConfig{
1051-
Brokers: r.config.Brokers,
1047+
writer := &Writer{
1048+
Addr: TCP(r.config.Brokers...),
10521049
Topic: r.config.Topic,
1053-
Dialer: r.config.Dialer,
10541050
Balancer: &RoundRobin{},
10551051
BatchSize: 1,
1056-
})
1052+
}
10571053
if err := writer.WriteMessages(ctx, makeTestSequence(N*3)...); err != nil {
10581054
t.Fatalf("bad write messages: %v", err)
10591055
}
@@ -1299,12 +1295,12 @@ func TestConsumerGroupWithMissingTopic(t *testing.T) {
12991295
createTopic(t, conf.Topic, 1)
13001296
defer deleteTopic(t, conf.Topic)
13011297

1302-
w := NewWriter(WriterConfig{
1303-
Brokers: r.config.Brokers,
1298+
w := &Writer{
1299+
Addr: TCP(r.config.Brokers...),
13041300
Topic: r.config.Topic,
13051301
BatchTimeout: 10 * time.Millisecond,
13061302
BatchSize: 1,
1307-
})
1303+
}
13081304
defer w.Close()
13091305
if err := w.WriteMessages(ctx, Message{}); err != nil {
13101306
t.Fatalf("write error: %+v", err)

write_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -205,12 +205,12 @@ func TestWriteV2RecordBatch(t *testing.T) {
205205
value := fmt.Sprintf("Sample message content: %d!", i)
206206
msgs[i] = Message{Key: []byte("Key"), Value: []byte(value), Headers: []Header{Header{Key: "hk", Value: []byte("hv")}}}
207207
}
208-
w := NewWriter(WriterConfig{
209-
Brokers: []string{"localhost:9092"},
208+
w := &Writer{
209+
Addr: TCP("localhost:9092"),
210210
Topic: topic,
211211
BatchTimeout: 100 * time.Millisecond,
212212
BatchSize: 5,
213-
})
213+
}
214214

215215
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
216216
defer cancel()

0 commit comments

Comments
 (0)