Skip to content

Commit f4ae857

Browse files
luffyaonlsun
authored andcommitted
Add DescribeConfigs, AlterConfigs & CreatePartition implementation
1 parent 1d5a83b commit f4ae857

12 files changed

+728
-4
lines changed

alterconfigs.go

+106
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
package kafka
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"net"
7+
"time"
8+
9+
"github.com/segmentio/kafka-go/protocol/alterconfigs"
10+
)
11+
12+
// AlterConfigsRequest represents a request sent to a kafka broker to alter configs
13+
type AlterConfigsRequest struct {
14+
// Address of the kafka broker to send the request to.
15+
Addr net.Addr
16+
17+
// List of resources to update.
18+
Resources []AlterConfigRequestResource
19+
20+
// When set to true, topics are not created but the configuration is
21+
// validated as if they were.
22+
ValidateOnly bool
23+
}
24+
25+
type AlterConfigRequestResource struct {
26+
// Resource Type
27+
ResourceType ResourceType
28+
29+
// Resource Name
30+
ResourceName string
31+
32+
// Configs is a list of configuration updates.
33+
Configs []AlterConfigRequestConfig
34+
}
35+
36+
type AlterConfigRequestConfig struct {
37+
// Configuration key name
38+
Name string
39+
40+
// The value to set for the configuration key.
41+
Value string
42+
}
43+
44+
// AlterConfigsResponse represents a response from a kafka broker to an alter config request.
45+
type AlterConfigsResponse struct {
46+
// Duration for which the request was throttled due to a quota violation.
47+
Throttle time.Duration
48+
49+
// Mapping of topic names to errors that occurred while attempting to create
50+
// the topics.
51+
//
52+
// The errors contain the kafka error code. Programs may use the standard
53+
// errors.Is function to test the error against kafka error codes.
54+
Errors map[AlterConfigsResponseResource]error
55+
}
56+
57+
// AlterConfigsResponseResource
58+
type AlterConfigsResponseResource struct {
59+
Type int8
60+
Name string
61+
}
62+
63+
// AlterConfigs sends a config altering request to a kafka broker and returns the
64+
// response.
65+
func (c *Client) AlterConfigs(ctx context.Context, req *AlterConfigsRequest) (*AlterConfigsResponse, error) {
66+
resources := make([]alterconfigs.RequestResources, len(req.Resources))
67+
68+
for i, t := range req.Resources {
69+
configs := make([]alterconfigs.RequestConfig, len(t.Configs))
70+
for j, v := range t.Configs {
71+
configs[j] = alterconfigs.RequestConfig{
72+
Name: v.Name,
73+
Value: v.Value,
74+
}
75+
}
76+
resources[i] = alterconfigs.RequestResources{
77+
ResourceType: int8(t.ResourceType),
78+
ResourceName: t.ResourceName,
79+
Configs: configs,
80+
}
81+
}
82+
83+
m, err := c.roundTrip(ctx, req.Addr, &alterconfigs.Request{
84+
Resources: resources,
85+
ValidateOnly: req.ValidateOnly,
86+
})
87+
88+
if err != nil {
89+
return nil, fmt.Errorf("kafka.(*Client).AlterConfigs: %w", err)
90+
}
91+
92+
res := m.(*alterconfigs.Response)
93+
ret := &AlterConfigsResponse{
94+
Throttle: makeDuration(res.ThrottleTimeMs),
95+
Errors: make(map[AlterConfigsResponseResource]error, len(res.Responses)),
96+
}
97+
98+
for _, t := range res.Responses {
99+
ret.Errors[AlterConfigsResponseResource{
100+
Type: t.ResourceType,
101+
Name: t.ResourceName,
102+
}] = makeError(t.ErrorCode, t.ErrorMessage)
103+
}
104+
105+
return ret, nil
106+
}

alterconfigs_test.go

+63
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package kafka
2+
3+
import (
4+
"context"
5+
"testing"
6+
7+
ktesting "github.com/segmentio/kafka-go/testing"
8+
"github.com/stretchr/testify/assert"
9+
)
10+
11+
func TestClientAlterConfigs(t *testing.T) {
12+
if !ktesting.KafkaIsAtLeast("0.11.0") {
13+
return
14+
}
15+
16+
const (
17+
MaxMessageBytes = "max.message.bytes"
18+
MaxMessageBytesValue = "200000"
19+
)
20+
21+
client, shutdown := newLocalClient()
22+
defer shutdown()
23+
24+
topic := makeTopic()
25+
createTopic(t, topic, 1)
26+
defer deleteTopic(t, topic)
27+
28+
_, err := client.AlterConfigs(context.Background(), &AlterConfigsRequest{
29+
Resources: []AlterConfigRequestResource{{
30+
ResourceType: ResourceTypeTopic,
31+
ResourceName: topic,
32+
Configs: []AlterConfigRequestConfig{{
33+
Name: MaxMessageBytes,
34+
Value: MaxMessageBytesValue,
35+
},
36+
},
37+
}},
38+
})
39+
40+
if err != nil {
41+
t.Fatal(err)
42+
}
43+
44+
describeResp, err := client.DescribeConfigs(context.Background(), &DescribeConfigsRequest{
45+
Resources: []DescribeConfigRequestResource{{
46+
ResourceType: ResourceTypeTopic,
47+
ResourceName: topic,
48+
ConfigNames: []string{MaxMessageBytes},
49+
}},
50+
})
51+
52+
maxMessageBytesValue := "0"
53+
for _, resource := range describeResp.Resources {
54+
if resource.ResourceType == int8(ResourceTypeTopic) && resource.ResourceName == topic {
55+
for _, entry := range resource.ConfigEntries {
56+
if entry.ConfigName == MaxMessageBytes {
57+
maxMessageBytesValue = entry.ConfigValue
58+
}
59+
}
60+
}
61+
}
62+
assert.Equal(t, maxMessageBytesValue, MaxMessageBytesValue)
63+
}

client.go

+5-4
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,11 @@ import (
1010
)
1111

1212
const (
13-
defaultCreateTopicsTimeout = 2 * time.Second
14-
defaultDeleteTopicsTimeout = 2 * time.Second
15-
defaultProduceTimeout = 500 * time.Millisecond
16-
defaultMaxWait = 500 * time.Millisecond
13+
defaultCreateTopicsTimeout = 2 * time.Second
14+
defaultDeleteTopicsTimeout = 2 * time.Second
15+
defaultCreatePartitionsTimeout = 2 * time.Second
16+
defaultProduceTimeout = 500 * time.Millisecond
17+
defaultMaxWait = 500 * time.Millisecond
1718
)
1819

1920
// Client is a high-level API to interract with kafka brokers.

createpartitions.go

+103
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
package kafka
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"net"
7+
"time"
8+
9+
"github.com/segmentio/kafka-go/protocol/createpartitions"
10+
)
11+
12+
// CreatePartitionsRequest represents a request sent to a kafka broker to create
13+
// and update topic parititions.
14+
type CreatePartitionsRequest struct {
15+
// Address of the kafka broker to send the request to.
16+
Addr net.Addr
17+
18+
// List of topics to create and their configuration.
19+
Topics []TopicPartitionsConfig
20+
21+
// When set to true, topics are not created but the configuration is
22+
// validated as if they were.
23+
ValidateOnly bool
24+
}
25+
26+
// CreatePartitionsResponse represents a response from a kafka broker to a partition
27+
// creation request.
28+
type CreatePartitionsResponse struct {
29+
// The amount of time that the broker throttled the request.
30+
Throttle time.Duration
31+
32+
// Mapping of topic names to errors that occurred while attempting to create
33+
// the topics.
34+
//
35+
// The errors contain the kafka error code. Programs may use the standard
36+
// errors.Is function to test the error against kafka error codes.
37+
Errors map[string]error
38+
}
39+
40+
// CreatePartitions sends a partitions creation request to a kafka broker and returns the
41+
// response.
42+
func (c *Client) CreatePartitions(ctx context.Context, req *CreatePartitionsRequest) (*CreatePartitionsResponse, error) {
43+
topics := make([]createpartitions.RequestTopic, len(req.Topics))
44+
45+
for i, t := range req.Topics {
46+
topics[i] = createpartitions.RequestTopic{
47+
Name: t.Name,
48+
Count: t.Count,
49+
Assignments: t.assignments(),
50+
}
51+
}
52+
53+
m, err := c.roundTrip(ctx, req.Addr, &createpartitions.Request{
54+
Topics: topics,
55+
TimeoutMs: c.timeoutMs(ctx, defaultCreatePartitionsTimeout),
56+
ValidateOnly: req.ValidateOnly,
57+
})
58+
59+
if err != nil {
60+
return nil, fmt.Errorf("kafka.(*Client).CreatePartitions: %w", err)
61+
}
62+
63+
res := m.(*createpartitions.Response)
64+
ret := &CreatePartitionsResponse{
65+
Throttle: makeDuration(res.ThrottleTimeMs),
66+
Errors: make(map[string]error, len(res.Results)),
67+
}
68+
69+
for _, t := range res.Results {
70+
ret.Errors[t.Name] = makeError(t.ErrorCode, t.ErrorMessage)
71+
}
72+
73+
return ret, nil
74+
}
75+
76+
type TopicPartitionsConfig struct {
77+
// Topic name
78+
Name string
79+
80+
// Topic partition's count.
81+
Count int32
82+
83+
// TopicPartitionAssignments among kafka brokers for this topic partitions.
84+
TopicPartitionAssignments []TopicPartitionAssignment
85+
}
86+
87+
func (t *TopicPartitionsConfig) assignments() []createpartitions.RequestAssignment {
88+
if len(t.TopicPartitionAssignments) == 0 {
89+
return nil
90+
}
91+
assignments := make([]createpartitions.RequestAssignment, len(t.TopicPartitionAssignments))
92+
for i, a := range t.TopicPartitionAssignments {
93+
assignments[i] = createpartitions.RequestAssignment{
94+
BrokerIDs: a.BrokerIDs,
95+
}
96+
}
97+
return assignments
98+
}
99+
100+
type TopicPartitionAssignment struct {
101+
// Broker IDs
102+
BrokerIDs []int32
103+
}

createpartitions_test.go

+44
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package kafka
2+
3+
import (
4+
"context"
5+
"testing"
6+
7+
ktesting "github.com/segmentio/kafka-go/testing"
8+
)
9+
10+
func TestClientCreatePartitions(t *testing.T) {
11+
if !ktesting.KafkaIsAtLeast("1.0.1") {
12+
return
13+
}
14+
15+
client, shutdown := newLocalClient()
16+
defer shutdown()
17+
18+
topic := makeTopic()
19+
createTopic(t, topic, 1)
20+
defer deleteTopic(t, topic)
21+
22+
res, err := client.CreatePartitions(context.Background(), &CreatePartitionsRequest{
23+
Topics: []TopicPartitionsConfig{
24+
TopicPartitionsConfig{
25+
Name: topic,
26+
Count: 2,
27+
TopicPartitionAssignments: []TopicPartitionAssignment{
28+
TopicPartitionAssignment{
29+
BrokerIDs: []int32{1},
30+
},
31+
},
32+
},
33+
},
34+
ValidateOnly: false,
35+
})
36+
37+
if err != nil {
38+
t.Fatal(err)
39+
}
40+
41+
if err := res.Errors[topic]; err != nil {
42+
t.Error(err)
43+
}
44+
}

0 commit comments

Comments
 (0)