-
Notifications
You must be signed in to change notification settings - Fork 812
fix zstd decoder leak #543
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
df53faf
201f7e1
9b42977
c8476a8
e227f01
d8aacd5
1d66dfb
fab1751
1cc99b6
f2aaf8f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -88,19 +88,24 @@ func testEncodeDecode(t *testing.T, m kafka.Message, codec pkg.Codec) { | |
t.Run("encode with "+codec.Name(), func(t *testing.T) { | ||
r1, err = compress(codec, m.Value) | ||
if err != nil { | ||
t.Error(err) | ||
t.Fatal(err) | ||
} | ||
}) | ||
|
||
t.Run("decode with "+codec.Name(), func(t *testing.T) { | ||
if r1 == nil { | ||
if r1, err = compress(codec, m.Value); err != nil { | ||
t.Fatal(err) | ||
} | ||
} | ||
r2, err = decompress(codec, r1) | ||
if err != nil { | ||
t.Error(err) | ||
t.Fatal(err) | ||
} | ||
if string(r2) != "message" { | ||
t.Error("bad message") | ||
t.Log("got: ", string(r2)) | ||
t.Log("expected: ", string(m.Value)) | ||
t.Logf("expected: %q", string(m.Value)) | ||
t.Logf("got: %q", string(r2)) | ||
} | ||
}) | ||
} | ||
|
@@ -116,15 +121,16 @@ func TestCompressedMessages(t *testing.T) { | |
} | ||
|
||
func testCompressedMessages(t *testing.T, codec pkg.Codec) { | ||
t.Run("produce/consume with"+codec.Name(), func(t *testing.T) { | ||
topic := createTopic(t, 1) | ||
defer deleteTopic(t, topic) | ||
t.Run(codec.Name(), func(t *testing.T) { | ||
client, topic, shutdown := newLocalClientAndTopic() | ||
defer shutdown() | ||
|
||
w := &kafka.Writer{ | ||
Addr: kafka.TCP("127.0.0.1:9092"), | ||
Topic: topic, | ||
Compression: kafka.Compression(codec.Code()), | ||
BatchTimeout: 10 * time.Millisecond, | ||
Transport: client.Transport, | ||
} | ||
defer w.Close() | ||
|
||
|
@@ -185,19 +191,23 @@ func testCompressedMessages(t *testing.T, codec pkg.Codec) { | |
} | ||
|
||
func TestMixedCompressedMessages(t *testing.T) { | ||
topic := createTopic(t, 1) | ||
defer deleteTopic(t, topic) | ||
client, topic, shutdown := newLocalClientAndTopic() | ||
defer shutdown() | ||
|
||
offset := 0 | ||
var values []string | ||
produce := func(n int, codec pkg.Codec) { | ||
w := &kafka.Writer{ | ||
Addr: kafka.TCP("127.0.0.1:9092"), | ||
Topic: topic, | ||
Compression: kafka.Compression(codec.Code()), | ||
Addr: kafka.TCP("127.0.0.1:9092"), | ||
Topic: topic, | ||
Transport: client.Transport, | ||
} | ||
defer w.Close() | ||
|
||
if codec != nil { | ||
w.Compression = kafka.Compression(codec.Code()) | ||
} | ||
|
||
msgs := make([]kafka.Message, n) | ||
for i := range msgs { | ||
value := fmt.Sprintf("Hello World %d!", offset) | ||
|
@@ -407,58 +417,72 @@ func benchmarkCompression(b *testing.B, codec pkg.Codec, buf *bytes.Buffer, payl | |
return 1 - (float64(buf.Len()) / float64(len(payload))) | ||
} | ||
|
||
func init() { | ||
rand.Seed(time.Now().UnixNano()) | ||
} | ||
|
||
func makeTopic() string { | ||
return fmt.Sprintf("kafka-go-%016x", rand.Int63()) | ||
} | ||
|
||
func createTopic(t *testing.T, partitions int) string { | ||
func newLocalClientAndTopic() (*kafka.Client, string, func()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. seems to mostly be a copy of what's in client_test.go, would it help to move this to the There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Unfortunately it would result in a cycle between the top level |
||
topic := makeTopic() | ||
|
||
conn, err := kafka.Dial("tcp", "localhost:9092") | ||
client, shutdown := newLocalClient() | ||
|
||
_, err := client.CreateTopics(context.Background(), &kafka.CreateTopicsRequest{ | ||
Topics: []kafka.TopicConfig{{ | ||
Topic: topic, | ||
NumPartitions: 1, | ||
ReplicationFactor: 1, | ||
}}, | ||
}) | ||
if err != nil { | ||
t.Fatal(err) | ||
shutdown() | ||
panic(err) | ||
} | ||
defer conn.Close() | ||
|
||
err = conn.CreateTopics(kafka.TopicConfig{ | ||
Topic: topic, | ||
NumPartitions: partitions, | ||
ReplicationFactor: 1, | ||
}) | ||
// Topic creation seems to be asynchronous. Metadata for the topic partition | ||
// layout in the cluster is available in the controller before being synced | ||
// with the other brokers, which causes "Error:[3] Unknown Topic Or Partition" | ||
// when sending requests to the partition leaders. | ||
for i := 0; i < 20; i++ { | ||
r, err := client.Fetch(context.Background(), &kafka.FetchRequest{ | ||
Topic: topic, | ||
Partition: 0, | ||
Offset: 0, | ||
}) | ||
if err == nil && r.Error == nil { | ||
break | ||
} | ||
time.Sleep(100 * time.Millisecond) | ||
} | ||
|
||
switch err { | ||
case nil: | ||
// ok | ||
case kafka.TopicAlreadyExists: | ||
// ok | ||
default: | ||
t.Error("bad createTopics", err) | ||
t.FailNow() | ||
return client, topic, func() { | ||
client.DeleteTopics(context.Background(), &kafka.DeleteTopicsRequest{ | ||
Topics: []string{topic}, | ||
}) | ||
shutdown() | ||
} | ||
} | ||
|
||
return topic | ||
func newLocalClient() (*kafka.Client, func()) { | ||
return newClient(kafka.TCP("127.0.0.1:9092")) | ||
} | ||
|
||
func deleteTopic(t *testing.T, topic ...string) { | ||
conn, err := kafka.Dial("tcp", "localhost:9092") | ||
if err != nil { | ||
t.Fatal(err) | ||
func newClient(addr net.Addr) (*kafka.Client, func()) { | ||
conns := &ktesting.ConnWaitGroup{ | ||
DialFunc: (&net.Dialer{}).DialContext, | ||
} | ||
defer conn.Close() | ||
|
||
controller, err := conn.Controller() | ||
if err != nil { | ||
t.Fatal(err) | ||
transport := &kafka.Transport{ | ||
Dial: conns.Dial, | ||
} | ||
|
||
conn, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port))) | ||
if err != nil { | ||
t.Fatal(err) | ||
client := &kafka.Client{ | ||
Addr: addr, | ||
Timeout: 5 * time.Second, | ||
Transport: transport, | ||
} | ||
|
||
conn.SetDeadline(time.Now().Add(2 * time.Second)) | ||
|
||
if err := conn.DeleteTopics(topic...); err != nil { | ||
t.Fatal(err) | ||
} | ||
return client, func() { transport.CloseIdleConnections(); conns.Wait() } | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is r1 only nil if the first test fails? In that case, does the t.Fatal cover this already?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
r1 may be nil if the first test is not run, which is what this check is intended to cover (for example if -run is used in the go test invocation).