Skip to content

Commit 57e833c

Browse files
authored
Add Consumer reconciler for scheduling and scaling (#1601)
* Add Consumer reconciler for scheduling and scaling Signed-off-by: Pierangelo Di Pilato <[email protected]> * Refactor get* methods, rename + flatten structure Signed-off-by: Pierangelo Di Pilato <[email protected]> * Add comment on schedule Signed-off-by: Pierangelo Di Pilato <[email protected]> * Fix compilation error Signed-off-by: Pierangelo Di Pilato <[email protected]>
1 parent 7605d59 commit 57e833c

File tree

14 files changed

+1111
-9
lines changed

14 files changed

+1111
-9
lines changed
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
* Copyright 2021 The Knative Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package eventing
18+
19+
const (
20+
// ConfigMapVolumeName is the volume name of the data plane ConfigMap
21+
ConfigMapVolumeName = "kafka-resources"
22+
)

control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_group_validation.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,5 +42,9 @@ func (cgs *ConsumerGroupSpec) Validate(ctx context.Context) *apis.FieldError {
4242
}
4343

4444
func (cts *ConsumerTemplateSpec) Validate(ctx context.Context) *apis.FieldError {
45-
return cts.Spec.Validate(ctx).ViaField("spec")
45+
specCtx := ctx
46+
if apis.IsInUpdate(ctx) {
47+
specCtx = apis.WithinUpdate(ctx, apis.GetBaseline(ctx).(*ConsumerGroup).Spec.Template.Spec)
48+
}
49+
return cts.Spec.Validate(specCtx).ViaField("spec")
4650
}

control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_group_validation_test.go

Lines changed: 266 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,8 @@ func TestConsumerGroup_Validate(t *testing.T) {
9595
Topics: []string{"t1"},
9696
Configs: ConsumerConfigs{
9797
Configs: map[string]string{
98-
"group.id": "g1",
98+
"group.id": "g1",
99+
"bootstrap.servers": "kafka:9092",
99100
},
100101
},
101102
Delivery: &DeliverySpec{
@@ -107,12 +108,276 @@ func TestConsumerGroup_Validate(t *testing.T) {
107108
Host: "127.0.0.1",
108109
},
109110
},
111+
PodBind: &PodBind{
112+
PodName: "p-0",
113+
PodNamespace: "ns",
114+
},
115+
},
116+
},
117+
},
118+
},
119+
wantErr: false,
120+
},
121+
{
122+
name: "invalid no pod name",
123+
ctx: context.Background(),
124+
given: &ConsumerGroup{
125+
Spec: ConsumerGroupSpec{
126+
Replicas: pointer.Int32Ptr(1),
127+
Selector: map[string]string{"app": "app"},
128+
Template: ConsumerTemplateSpec{
129+
Spec: ConsumerSpec{
130+
Topics: []string{"t1"},
131+
Configs: ConsumerConfigs{
132+
Configs: map[string]string{
133+
"group.id": "g1",
134+
"bootstrap.servers": "kafka:9092",
135+
},
136+
},
137+
Delivery: &DeliverySpec{
138+
DeliverySpec: &eventingduck.DeliverySpec{},
139+
},
140+
Subscriber: duckv1.Destination{
141+
URI: &apis.URL{
142+
Scheme: "http",
143+
Host: "127.0.0.1",
144+
},
145+
},
146+
PodBind: &PodBind{
147+
PodNamespace: "ns",
148+
},
149+
},
150+
},
151+
},
152+
},
153+
wantErr: true,
154+
},
155+
{
156+
name: "invalid no pod namespace",
157+
ctx: context.Background(),
158+
given: &ConsumerGroup{
159+
Spec: ConsumerGroupSpec{
160+
Replicas: pointer.Int32Ptr(1),
161+
Selector: map[string]string{"app": "app"},
162+
Template: ConsumerTemplateSpec{
163+
Spec: ConsumerSpec{
164+
Topics: []string{"t1"},
165+
Configs: ConsumerConfigs{
166+
Configs: map[string]string{
167+
"group.id": "g1",
168+
"bootstrap.servers": "kafka:9092",
169+
},
170+
},
171+
Delivery: &DeliverySpec{
172+
DeliverySpec: &eventingduck.DeliverySpec{},
173+
},
174+
Subscriber: duckv1.Destination{
175+
URI: &apis.URL{
176+
Scheme: "http",
177+
Host: "127.0.0.1",
178+
},
179+
},
180+
PodBind: &PodBind{
181+
PodName: "p-0",
182+
},
183+
},
184+
},
185+
},
186+
},
187+
wantErr: true,
188+
},
189+
{
190+
name: "valid - no updates",
191+
ctx: apis.WithinUpdate(context.Background(), &ConsumerGroup{
192+
Spec: ConsumerGroupSpec{
193+
Replicas: pointer.Int32Ptr(1),
194+
Selector: map[string]string{"app": "app"},
195+
Template: ConsumerTemplateSpec{
196+
Spec: ConsumerSpec{
197+
Topics: []string{"t1"},
198+
Configs: ConsumerConfigs{
199+
Configs: map[string]string{
200+
"group.id": "g1",
201+
"bootstrap.servers": "kafka:9092",
202+
},
203+
},
204+
Delivery: &DeliverySpec{
205+
DeliverySpec: &eventingduck.DeliverySpec{},
206+
},
207+
Subscriber: duckv1.Destination{
208+
URI: &apis.URL{
209+
Scheme: "http",
210+
Host: "127.0.0.1",
211+
},
212+
},
213+
PodBind: &PodBind{
214+
PodName: "p-0",
215+
PodNamespace: "ns",
216+
},
217+
},
218+
},
219+
},
220+
}),
221+
given: &ConsumerGroup{
222+
Spec: ConsumerGroupSpec{
223+
Replicas: pointer.Int32Ptr(1),
224+
Selector: map[string]string{"app": "app"},
225+
Template: ConsumerTemplateSpec{
226+
Spec: ConsumerSpec{
227+
Topics: []string{"t1"},
228+
Configs: ConsumerConfigs{
229+
Configs: map[string]string{
230+
"group.id": "g1",
231+
"bootstrap.servers": "kafka:9092",
232+
},
233+
},
234+
Delivery: &DeliverySpec{
235+
DeliverySpec: &eventingduck.DeliverySpec{},
236+
},
237+
Subscriber: duckv1.Destination{
238+
URI: &apis.URL{
239+
Scheme: "http",
240+
Host: "127.0.0.1",
241+
},
242+
},
243+
PodBind: &PodBind{
244+
PodName: "p-0",
245+
PodNamespace: "ns",
246+
},
110247
},
111248
},
112249
},
113250
},
114251
wantErr: false,
115252
},
253+
{
254+
name: "invalid pod name update",
255+
ctx: apis.WithinUpdate(context.Background(), &ConsumerGroup{
256+
Spec: ConsumerGroupSpec{
257+
Replicas: pointer.Int32Ptr(1),
258+
Selector: map[string]string{"app": "app"},
259+
Template: ConsumerTemplateSpec{
260+
Spec: ConsumerSpec{
261+
Topics: []string{"t1"},
262+
Configs: ConsumerConfigs{
263+
Configs: map[string]string{
264+
"group.id": "g1",
265+
"bootstrap.servers": "kafka:9092",
266+
},
267+
},
268+
Delivery: &DeliverySpec{
269+
DeliverySpec: &eventingduck.DeliverySpec{},
270+
},
271+
Subscriber: duckv1.Destination{
272+
URI: &apis.URL{
273+
Scheme: "http",
274+
Host: "127.0.0.1",
275+
},
276+
},
277+
PodBind: &PodBind{
278+
PodName: "p-1",
279+
PodNamespace: "ns",
280+
},
281+
},
282+
},
283+
},
284+
}),
285+
given: &ConsumerGroup{
286+
Spec: ConsumerGroupSpec{
287+
Replicas: pointer.Int32Ptr(1),
288+
Selector: map[string]string{"app": "app"},
289+
Template: ConsumerTemplateSpec{
290+
Spec: ConsumerSpec{
291+
Topics: []string{"t1"},
292+
Configs: ConsumerConfigs{
293+
Configs: map[string]string{
294+
"group.id": "g1",
295+
"bootstrap.servers": "kafka:9092",
296+
},
297+
},
298+
Delivery: &DeliverySpec{
299+
DeliverySpec: &eventingduck.DeliverySpec{},
300+
},
301+
Subscriber: duckv1.Destination{
302+
URI: &apis.URL{
303+
Scheme: "http",
304+
Host: "127.0.0.1",
305+
},
306+
},
307+
PodBind: &PodBind{
308+
PodName: "p-0",
309+
PodNamespace: "ns",
310+
},
311+
},
312+
},
313+
},
314+
},
315+
wantErr: true,
316+
},
317+
{
318+
name: "invalid pod name update",
319+
ctx: apis.WithinUpdate(context.Background(), &ConsumerGroup{
320+
Spec: ConsumerGroupSpec{
321+
Replicas: pointer.Int32Ptr(1),
322+
Selector: map[string]string{"app": "app"},
323+
Template: ConsumerTemplateSpec{
324+
Spec: ConsumerSpec{
325+
Topics: []string{"t1"},
326+
Configs: ConsumerConfigs{
327+
Configs: map[string]string{
328+
"group.id": "g1",
329+
"bootstrap.servers": "kafka:9092",
330+
},
331+
},
332+
Delivery: &DeliverySpec{
333+
DeliverySpec: &eventingduck.DeliverySpec{},
334+
},
335+
Subscriber: duckv1.Destination{
336+
URI: &apis.URL{
337+
Scheme: "http",
338+
Host: "127.0.0.1",
339+
},
340+
},
341+
PodBind: &PodBind{
342+
PodName: "p-0",
343+
PodNamespace: "ns-1",
344+
},
345+
},
346+
},
347+
},
348+
}),
349+
given: &ConsumerGroup{
350+
Spec: ConsumerGroupSpec{
351+
Replicas: pointer.Int32Ptr(1),
352+
Selector: map[string]string{"app": "app"},
353+
Template: ConsumerTemplateSpec{
354+
Spec: ConsumerSpec{
355+
Topics: []string{"t1"},
356+
Configs: ConsumerConfigs{
357+
Configs: map[string]string{
358+
"group.id": "g1",
359+
"bootstrap.servers": "kafka:9092",
360+
},
361+
},
362+
Delivery: &DeliverySpec{
363+
DeliverySpec: &eventingduck.DeliverySpec{},
364+
},
365+
Subscriber: duckv1.Destination{
366+
URI: &apis.URL{
367+
Scheme: "http",
368+
Host: "127.0.0.1",
369+
},
370+
},
371+
PodBind: &PodBind{
372+
PodName: "p-0",
373+
PodNamespace: "ns-2",
374+
},
375+
},
376+
},
377+
},
378+
},
379+
wantErr: true,
380+
},
116381
}
117382
for _, tt := range tests {
118383
t.Run(tt.name, func(t *testing.T) {

control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_lifecycle.go

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,48 @@
1717
package v1alpha1
1818

1919
import (
20+
"fmt"
21+
2022
"knative.dev/pkg/apis"
23+
"knative.dev/pkg/reconciler"
24+
)
25+
26+
const (
27+
ConsumerConditionContract = "Contract"
28+
ConsumerConditionBind = "Bind"
29+
)
30+
31+
var (
32+
consumerConditionSet = apis.NewLivingConditionSet(
33+
ConsumerConditionContract,
34+
ConsumerConditionBind,
35+
)
2136
)
2237

2338
func (c *Consumer) GetConditionSet() apis.ConditionSet {
24-
return apis.NewLivingConditionSet()
39+
return consumerConditionSet
40+
}
41+
42+
func (c *Consumer) MarkReconcileContractFailed(err error) reconciler.Event {
43+
err = fmt.Errorf("failed to reconcile contract: %w", err)
44+
c.GetConditionSet().Manage(c.GetStatus()).MarkFalse(ConsumerConditionContract, "ReconcileContract", err.Error())
45+
return err
46+
}
47+
48+
func (c *Consumer) MarkReconcileContractSucceeded() {
49+
c.GetConditionSet().Manage(c.GetStatus()).MarkTrue(ConsumerConditionContract)
50+
}
51+
52+
func (c *Consumer) MarkBindFailed(err error) reconciler.Event {
53+
err = fmt.Errorf("failed to bind resource to pod: %w", err)
54+
c.GetConditionSet().Manage(c.GetStatus()).MarkFalse(ConsumerConditionBind, "ConsumerBinding", err.Error())
55+
return err
56+
}
57+
58+
func (c *Consumer) MarkBindInProgress() {
59+
c.GetConditionSet().Manage(c.GetStatus()).MarkFalse(ConsumerConditionBind, "BindInProgress", "")
60+
}
61+
62+
func (c *Consumer) MarkBindSucceeded() {
63+
c.GetConditionSet().Manage(c.GetStatus()).MarkTrue(ConsumerConditionBind)
2564
}

0 commit comments

Comments
 (0)