Skip to content

Remove deprecated function (NewWriter) usages #528

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,13 +139,12 @@ func testConsumerGroupFetchOffsets(t *testing.T, ctx context.Context, c *Client)
groupId := makeGroupID()
brokers := []string{"localhost:9092"}

writer := NewWriter(WriterConfig{
Brokers: brokers,
writer := &Writer{
Addr: TCP(brokers...),
Topic: topic,
Dialer: DefaultDialer,
Balancer: &RoundRobin{},
BatchSize: 1,
})
}
if err := writer.WriteMessages(ctx, makeTestSequence(totalMessages)...); err != nil {
t.Fatalf("bad write messages: %v", err)
}
Expand Down
22 changes: 11 additions & 11 deletions compress/compress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,12 @@ func testCompressedMessages(t *testing.T, codec pkg.Codec) {
topic := createTopic(t, 1)
defer deleteTopic(t, topic)

w := kafka.NewWriter(kafka.WriterConfig{
Brokers: []string{"127.0.0.1:9092"},
Topic: topic,
CompressionCodec: codec,
BatchTimeout: 10 * time.Millisecond,
})
w := &kafka.Writer{
Addr: kafka.TCP("127.0.0.1:9092"),
Topic: topic,
Compression: kafka.Compression(codec.Code()),
BatchTimeout: 10 * time.Millisecond,
}
defer w.Close()

offset := 0
Expand Down Expand Up @@ -191,11 +191,11 @@ func TestMixedCompressedMessages(t *testing.T) {
offset := 0
var values []string
produce := func(n int, codec pkg.Codec) {
w := kafka.NewWriter(kafka.WriterConfig{
Brokers: []string{"127.0.0.1:9092"},
Topic: topic,
CompressionCodec: codec,
})
w := &kafka.Writer{
Addr: kafka.TCP("127.0.0.1:9092"),
Topic: topic,
Compression: kafka.Compression(codec.Code()),
}
defer w.Close()

msgs := make([]kafka.Message, n)
Expand Down
16 changes: 8 additions & 8 deletions dialer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ func testDialerLookupPartitions(t *testing.T, ctx context.Context, d *Dialer) {
defer deleteTopic(t, topic)

// Write a message to ensure the partition gets created.
w := NewWriter(WriterConfig{
Brokers: []string{"localhost:9092"},
Topic: topic,
})
w := &Writer{
Addr: TCP("localhost:9092"),
Topic: topic,
}
w.WriteMessages(ctx, Message{})
w.Close()

Expand Down Expand Up @@ -173,10 +173,10 @@ func TestDialerTLS(t *testing.T) {
defer deleteTopic(t, topic)

// Write a message to ensure the partition gets created.
w := NewWriter(WriterConfig{
Brokers: []string{"localhost:9092"},
Topic: topic,
})
w := &Writer{
Addr: TCP("localhost:9092"),
Topic: topic,
}
w.WriteMessages(context.Background(), Message{})
w.Close()

Expand Down
8 changes: 4 additions & 4 deletions example_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ import (
)

func ExampleWriter() {
w := kafka.NewWriter(kafka.WriterConfig{
Brokers: []string{"localhost:9092"},
Topic: "Topic-1",
})
w := &kafka.Writer{
Addr: kafka.TCP("localhost:9092"),
Topic: "Topic-1",
}

w.WriteMessages(context.Background(),
kafka.Message{
Expand Down
4 changes: 3 additions & 1 deletion examples/producer-api/go.mod
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
module github.com/tarekbadrshalaan/GoKafka/kafka-go

require github.com/segmentio/kafka-go v0.2.2
go 1.15

require github.com/segmentio/kafka-go v0.4.5
25 changes: 23 additions & 2 deletions examples/producer-api/go.sum
Original file line number Diff line number Diff line change
@@ -1,2 +1,23 @@
github.com/segmentio/kafka-go v0.2.2 h1:KIUln5unPisRL2yyAkZsDR/coiymN9Djunv6JKGQ6JI=
github.com/segmentio/kafka-go v0.2.2/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfPOCvTvk+EJo=
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw=
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/klauspost/compress v1.9.8 h1:VMAMUUOh+gaxKTMk+zqbjsSjsIcUcL/LF4o63i82QyA=
github.com/klauspost/compress v1.9.8/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I=
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/segmentio/kafka-go v0.4.5 h1:vphUaNc3rt77MlGjGfV6AjGq/piP+04wzDLuIiAE9iE=
github.com/segmentio/kafka-go v0.4.5/go.mod h1:Inh7PqOsxmfgasV8InZYKVXWsdjcCq2d9tFV75GLbuM=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0=
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190506204251-e1dfcc566284 h1:rlLehGeYg6jfoyz/eDqDU1iRXLKfR42nnNh57ytKEWo=
golang.org/x/crypto v0.0.0-20190506204251-e1dfcc566284/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3 h1:0GoQqolDA55aaLxZyTzK/Y2ePZzZTUrRacwib7cNsYQ=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
6 changes: 3 additions & 3 deletions examples/producer-api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ func producerHandler(kafkaWriter *kafka.Writer) func(http.ResponseWriter, *http.
}

func getKafkaWriter(kafkaURL, topic string) *kafka.Writer {
return kafka.NewWriter(kafka.WriterConfig{
Brokers: []string{kafkaURL},
return &kafka.Writer{
Addr: kafka.TCP(kafkaURL),
Topic: topic,
Balancer: &kafka.LeastBytes{},
})
}
}

func main() {
Expand Down
4 changes: 3 additions & 1 deletion examples/producer-random/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
module github.com/tarekbadrshalaan/GoKafka/kafka-go

go 1.15

require (
github.com/google/uuid v1.1.0
github.com/segmentio/kafka-go v0.2.2
github.com/segmentio/kafka-go v0.4.5
)
25 changes: 23 additions & 2 deletions examples/producer-random/go.sum
Original file line number Diff line number Diff line change
@@ -1,4 +1,25 @@
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw=
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/uuid v1.1.0 h1:Jf4mxPC/ziBnoPIdpQdPJ9OeiomAUHLvxmPRSPH9m4s=
github.com/google/uuid v1.1.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/segmentio/kafka-go v0.2.2 h1:KIUln5unPisRL2yyAkZsDR/coiymN9Djunv6JKGQ6JI=
github.com/segmentio/kafka-go v0.2.2/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfPOCvTvk+EJo=
github.com/klauspost/compress v1.9.8 h1:VMAMUUOh+gaxKTMk+zqbjsSjsIcUcL/LF4o63i82QyA=
github.com/klauspost/compress v1.9.8/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I=
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/segmentio/kafka-go v0.4.5 h1:vphUaNc3rt77MlGjGfV6AjGq/piP+04wzDLuIiAE9iE=
github.com/segmentio/kafka-go v0.4.5/go.mod h1:Inh7PqOsxmfgasV8InZYKVXWsdjcCq2d9tFV75GLbuM=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0=
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190506204251-e1dfcc566284 h1:rlLehGeYg6jfoyz/eDqDU1iRXLKfR42nnNh57ytKEWo=
golang.org/x/crypto v0.0.0-20190506204251-e1dfcc566284/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3 h1:0GoQqolDA55aaLxZyTzK/Y2ePZzZTUrRacwib7cNsYQ=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
6 changes: 3 additions & 3 deletions examples/producer-random/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ import (
)

func newKafkaWriter(kafkaURL, topic string) *kafka.Writer {
return kafka.NewWriter(kafka.WriterConfig{
Brokers: []string{kafkaURL},
return &kafka.Writer{
Addr: kafka.TCP(kafkaURL),
Topic: topic,
Balancer: &kafka.LeastBytes{},
})
}
}

func main() {
Expand Down
34 changes: 15 additions & 19 deletions reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -906,13 +906,12 @@ func testReaderConsumerGroupVerifyCommitsOnClose(t *testing.T, ctx context.Conte
func testReaderConsumerGroupReadContentAcrossPartitions(t *testing.T, ctx context.Context, r *Reader) {
const N = 12

writer := NewWriter(WriterConfig{
Brokers: r.config.Brokers,
writer := &Writer{
Addr: TCP(r.config.Brokers...),
Topic: r.config.Topic,
Dialer: r.config.Dialer,
Balancer: &RoundRobin{},
BatchSize: 1,
})
}
if err := writer.WriteMessages(ctx, makeTestSequence(N)...); err != nil {
t.Fatalf("bad write messages: %v", err)
}
Expand Down Expand Up @@ -944,13 +943,12 @@ func testReaderConsumerGroupRebalance(t *testing.T, ctx context.Context, r *Read
)

// rebalance should result in 12 message in each of the partitions
writer := NewWriter(WriterConfig{
Brokers: r.config.Brokers,
writer := &Writer{
Addr: TCP(r.config.Brokers...),
Topic: r.config.Topic,
Dialer: r.config.Dialer,
Balancer: &RoundRobin{},
BatchSize: 1,
})
}
if err := writer.WriteMessages(ctx, makeTestSequence(N*partitions)...); err != nil {
t.Fatalf("bad write messages: %v", err)
}
Expand Down Expand Up @@ -994,13 +992,12 @@ func testReaderConsumerGroupRebalanceAcrossTopics(t *testing.T, ctx context.Cont
)

// write messages across both partitions
writer := NewWriter(WriterConfig{
Brokers: r.config.Brokers,
writer := &Writer{
Addr: TCP(r.config.Brokers...),
Topic: r.config.Topic,
Dialer: r.config.Dialer,
Balancer: &RoundRobin{},
BatchSize: 1,
})
}
if err := writer.WriteMessages(ctx, makeTestSequence(N)...); err != nil {
t.Fatalf("bad write messages: %v", err)
}
Expand Down Expand Up @@ -1047,13 +1044,12 @@ func testReaderConsumerGroupRebalanceAcrossManyPartitionsAndConsumers(t *testing
}()

// write messages across both partitions
writer := NewWriter(WriterConfig{
Brokers: r.config.Brokers,
writer := &Writer{
Addr: TCP(r.config.Brokers...),
Topic: r.config.Topic,
Dialer: r.config.Dialer,
Balancer: &RoundRobin{},
BatchSize: 1,
})
}
if err := writer.WriteMessages(ctx, makeTestSequence(N*3)...); err != nil {
t.Fatalf("bad write messages: %v", err)
}
Expand Down Expand Up @@ -1299,12 +1295,12 @@ func TestConsumerGroupWithMissingTopic(t *testing.T) {
createTopic(t, conf.Topic, 1)
defer deleteTopic(t, conf.Topic)

w := NewWriter(WriterConfig{
Brokers: r.config.Brokers,
w := &Writer{
Addr: TCP(r.config.Brokers...),
Topic: r.config.Topic,
BatchTimeout: 10 * time.Millisecond,
BatchSize: 1,
})
}
defer w.Close()
if err := w.WriteMessages(ctx, Message{}); err != nil {
t.Fatalf("write error: %+v", err)
Expand Down
6 changes: 3 additions & 3 deletions write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,12 +205,12 @@ func TestWriteV2RecordBatch(t *testing.T) {
value := fmt.Sprintf("Sample message content: %d!", i)
msgs[i] = Message{Key: []byte("Key"), Value: []byte(value), Headers: []Header{Header{Key: "hk", Value: []byte("hv")}}}
}
w := NewWriter(WriterConfig{
Brokers: []string{"localhost:9092"},
w := &Writer{
Addr: TCP("localhost:9092"),
Topic: topic,
BatchTimeout: 100 * time.Millisecond,
BatchSize: 5,
})
}

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
Expand Down