Skip to content

Commit d09c6c6

Browse files
authored
Adjust APIs between AppWrapper and Kueue AppWrapper integration (#321)
1 parent 92e7a2b commit d09c6c6

16 files changed

+547
-515
lines changed

Diff for: cmd/main.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ import (
4949

5050
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
5151

52-
workloadv1beta2 "github.com/project-codeflare/appwrapper/api/v1beta2"
52+
awv1beta2 "github.com/project-codeflare/appwrapper/api/v1beta2"
5353
"github.com/project-codeflare/appwrapper/internal/metrics"
5454
"github.com/project-codeflare/appwrapper/pkg/config"
5555
"github.com/project-codeflare/appwrapper/pkg/controller"
@@ -67,7 +67,7 @@ var (
6767
func init() {
6868
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
6969
utilruntime.Must(kueue.AddToScheme(scheme))
70-
utilruntime.Must(workloadv1beta2.AddToScheme(scheme))
70+
utilruntime.Must(awv1beta2.AddToScheme(scheme))
7171
//+kubebuilder:scaffold:scheme
7272
}
7373

Diff for: internal/controller/appwrapper/appwrapper_controller.go

+118-118
Large diffs are not rendered by default.

Diff for: internal/controller/appwrapper/appwrapper_controller_test.go

+91-118
Large diffs are not rendered by default.

Diff for: internal/controller/appwrapper/fixtures_test.go

+19-19
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ import (
3333
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
3434
"sigs.k8s.io/yaml"
3535

36-
workloadv1beta2 "github.com/project-codeflare/appwrapper/api/v1beta2"
36+
awv1beta2 "github.com/project-codeflare/appwrapper/api/v1beta2"
3737
)
3838

3939
const charset = "abcdefghijklmnopqrstuvwxyz0123456789"
@@ -47,16 +47,16 @@ func randName(baseName string) string {
4747
return fmt.Sprintf("%s-%s", baseName, string(b))
4848
}
4949

50-
func toAppWrapper(components ...workloadv1beta2.AppWrapperComponent) *workloadv1beta2.AppWrapper {
51-
return &workloadv1beta2.AppWrapper{
52-
TypeMeta: metav1.TypeMeta{APIVersion: workloadv1beta2.GroupVersion.String(), Kind: "AppWrapper"},
50+
func toAppWrapper(components ...awv1beta2.AppWrapperComponent) *awv1beta2.AppWrapper {
51+
return &awv1beta2.AppWrapper{
52+
TypeMeta: metav1.TypeMeta{APIVersion: awv1beta2.GroupVersion.String(), Kind: "AppWrapper"},
5353
ObjectMeta: metav1.ObjectMeta{Name: randName("aw"), Namespace: "default"},
54-
Spec: workloadv1beta2.AppWrapperSpec{Components: components},
54+
Spec: awv1beta2.AppWrapperSpec{Components: components},
5555
}
5656
}
5757

58-
func getAppWrapper(typeNamespacedName types.NamespacedName) *workloadv1beta2.AppWrapper {
59-
aw := &workloadv1beta2.AppWrapper{}
58+
func getAppWrapper(typeNamespacedName types.NamespacedName) *awv1beta2.AppWrapper {
59+
aw := &awv1beta2.AppWrapper{}
6060
err := k8sClient.Get(ctx, typeNamespacedName, aw)
6161
Expect(err).NotTo(HaveOccurred())
6262
return aw
@@ -69,21 +69,21 @@ func getNode(name string) *v1.Node {
6969
return node
7070
}
7171

72-
func getPods(aw *workloadv1beta2.AppWrapper) []v1.Pod {
72+
func getPods(aw *awv1beta2.AppWrapper) []v1.Pod {
7373
result := []v1.Pod{}
7474
podList := &v1.PodList{}
7575
err := k8sClient.List(ctx, podList, &client.ListOptions{Namespace: aw.Namespace})
7676
Expect(err).NotTo(HaveOccurred())
7777
for _, pod := range podList.Items {
78-
if awn, found := pod.Labels[workloadv1beta2.AppWrapperLabel]; found && awn == aw.Name {
78+
if awn, found := pod.Labels[awv1beta2.AppWrapperLabel]; found && awn == aw.Name {
7979
result = append(result, pod)
8080
}
8181
}
8282
return result
8383
}
8484

8585
// envTest doesn't have a Pod controller; so simulate it
86-
func setPodStatus(aw *workloadv1beta2.AppWrapper, phase v1.PodPhase, numToChange int32) error {
86+
func setPodStatus(aw *awv1beta2.AppWrapper, phase v1.PodPhase, numToChange int32) error {
8787
podList := &v1.PodList{}
8888
err := k8sClient.List(ctx, podList, &client.ListOptions{Namespace: aw.Namespace})
8989
if err != nil {
@@ -93,7 +93,7 @@ func setPodStatus(aw *workloadv1beta2.AppWrapper, phase v1.PodPhase, numToChange
9393
if numToChange <= 0 {
9494
return nil
9595
}
96-
if awn, found := pod.Labels[workloadv1beta2.AppWrapperLabel]; found && awn == aw.Name {
96+
if awn, found := pod.Labels[awv1beta2.AppWrapperLabel]; found && awn == aw.Name {
9797
pod.Status.Phase = phase
9898
err = k8sClient.Status().Update(ctx, &pod)
9999
if err != nil {
@@ -123,7 +123,7 @@ spec:
123123
limits:
124124
nvidia.com/gpu: %v`
125125

126-
func pod(milliCPU int64, numGPU int64, declarePodSets bool) workloadv1beta2.AppWrapperComponent {
126+
func pod(milliCPU int64, numGPU int64, declarePodSets bool) awv1beta2.AppWrapperComponent {
127127
yamlString := fmt.Sprintf(podYAML,
128128
randName("pod"),
129129
resource.NewMilliQuantity(milliCPU, resource.DecimalSI),
@@ -132,11 +132,11 @@ func pod(milliCPU int64, numGPU int64, declarePodSets bool) workloadv1beta2.AppW
132132

133133
jsonBytes, err := yaml.YAMLToJSON([]byte(yamlString))
134134
Expect(err).NotTo(HaveOccurred())
135-
awc := &workloadv1beta2.AppWrapperComponent{
135+
awc := &awv1beta2.AppWrapperComponent{
136136
Template: runtime.RawExtension{Raw: jsonBytes},
137137
}
138138
if declarePodSets {
139-
awc.DeclaredPodSets = []workloadv1beta2.AppWrapperPodSet{{Replicas: ptr.To(int32(1)), Path: "template"}}
139+
awc.DeclaredPodSets = []awv1beta2.AppWrapperPodSet{{Replicas: ptr.To(int32(1)), Path: "template"}}
140140
}
141141
return *awc
142142
}
@@ -181,11 +181,11 @@ spec:
181181
limits:
182182
nvidia.com/gpu: 1`
183183

184-
func complexPodYaml() workloadv1beta2.AppWrapperComponent {
184+
func complexPodYaml() awv1beta2.AppWrapperComponent {
185185
yamlString := fmt.Sprintf(complexPodYAML, randName("pod"))
186186
jsonBytes, err := yaml.YAMLToJSON([]byte(yamlString))
187187
Expect(err).NotTo(HaveOccurred())
188-
awc := &workloadv1beta2.AppWrapperComponent{
188+
awc := &awv1beta2.AppWrapperComponent{
189189
Template: runtime.RawExtension{Raw: jsonBytes},
190190
}
191191
return *awc
@@ -205,15 +205,15 @@ spec:
205205
requests:
206206
cpu: %v`
207207

208-
func malformedPod(milliCPU int64) workloadv1beta2.AppWrapperComponent {
208+
func malformedPod(milliCPU int64) awv1beta2.AppWrapperComponent {
209209
yamlString := fmt.Sprintf(malformedPodYAML,
210210
randName("pod"),
211211
resource.NewMilliQuantity(milliCPU, resource.DecimalSI))
212212

213213
jsonBytes, err := yaml.YAMLToJSON([]byte(yamlString))
214214
Expect(err).NotTo(HaveOccurred())
215-
return workloadv1beta2.AppWrapperComponent{
216-
DeclaredPodSets: []workloadv1beta2.AppWrapperPodSet{{Replicas: ptr.To(int32(1)), Path: "template"}},
215+
return awv1beta2.AppWrapperComponent{
216+
DeclaredPodSets: []awv1beta2.AppWrapperPodSet{{Replicas: ptr.To(int32(1)), Path: "template"}},
217217
Template: runtime.RawExtension{Raw: jsonBytes},
218218
}
219219
}

Diff for: internal/controller/appwrapper/resource_management.go

+23-28
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ import (
2222
"fmt"
2323
"time"
2424

25-
workloadv1beta2 "github.com/project-codeflare/appwrapper/api/v1beta2"
25+
awv1beta2 "github.com/project-codeflare/appwrapper/api/v1beta2"
26+
utilmaps "github.com/project-codeflare/appwrapper/internal/util"
2627
"github.com/project-codeflare/appwrapper/pkg/utils"
2728
v1 "k8s.io/api/core/v1"
2829
apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -34,8 +35,6 @@ import (
3435
"sigs.k8s.io/controller-runtime/pkg/client"
3536
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
3637
"sigs.k8s.io/controller-runtime/pkg/log"
37-
"sigs.k8s.io/kueue/pkg/podset"
38-
utilmaps "sigs.k8s.io/kueue/pkg/util/maps"
3938
)
4039

4140
func parseComponent(raw []byte, expectedNamespace string) (*unstructured.Unstructured, error) {
@@ -188,7 +187,7 @@ func addNodeSelectorsToAffinity(spec map[string]interface{}, exprsToAdd []v1.Nod
188187
}
189188

190189
//gocyclo:ignore
191-
func (r *AppWrapperReconciler) createComponent(ctx context.Context, aw *workloadv1beta2.AppWrapper, componentIdx int) (error, bool) {
190+
func (r *AppWrapperReconciler) createComponent(ctx context.Context, aw *awv1beta2.AppWrapper, componentIdx int) (error, bool) {
192191
component := aw.Spec.Components[componentIdx]
193192
componentStatus := aw.Status.ComponentStatus[componentIdx]
194193
toMap := func(x interface{}) map[string]string {
@@ -218,17 +217,13 @@ func (r *AppWrapperReconciler) createComponent(ctx context.Context, aw *workload
218217
if err != nil {
219218
return err, true
220219
}
221-
awLabels := map[string]string{workloadv1beta2.AppWrapperLabel: aw.Name}
220+
awLabels := map[string]string{awv1beta2.AppWrapperLabel: aw.Name}
222221
obj.SetLabels(utilmaps.MergeKeepFirst(obj.GetLabels(), awLabels))
223222

224223
for podSetsIdx, podSet := range componentStatus.PodSets {
225-
toInject := &workloadv1beta2.AppWrapperPodSetInfo{}
226-
if r.Config.EnableKueueIntegrations {
227-
if podSetsIdx < len(component.PodSetInfos) {
228-
toInject = &component.PodSetInfos[podSetsIdx]
229-
} else {
230-
return fmt.Errorf("missing podSetInfo %v for component %v", podSetsIdx, componentIdx), true
231-
}
224+
toInject := &awv1beta2.AppWrapperPodSetInfo{}
225+
if podSetsIdx < len(component.PodSetInfos) {
226+
toInject = &component.PodSetInfos[podSetsIdx]
232227
}
233228

234229
p, err := utils.GetRawTemplate(obj.UnstructuredContent(), podSet.Path)
@@ -245,7 +240,7 @@ func (r *AppWrapperReconciler) createComponent(ctx context.Context, aw *workload
245240
if len(toInject.Annotations) > 0 {
246241
existing := toMap(metadata["annotations"])
247242
if err := utilmaps.HaveConflict(existing, toInject.Annotations); err != nil {
248-
return podset.BadPodSetsUpdateError("annotations", err), true
243+
return fmt.Errorf("conflict updating annotations: %w", err), true
249244
}
250245
metadata["annotations"] = utilmaps.MergeKeepFirst(existing, toInject.Annotations)
251246
}
@@ -254,15 +249,15 @@ func (r *AppWrapperReconciler) createComponent(ctx context.Context, aw *workload
254249
mergedLabels := utilmaps.MergeKeepFirst(toInject.Labels, awLabels)
255250
existing := toMap(metadata["labels"])
256251
if err := utilmaps.HaveConflict(existing, mergedLabels); err != nil {
257-
return podset.BadPodSetsUpdateError("labels", err), true
252+
return fmt.Errorf("conflict updating labels: %w", err), true
258253
}
259254
metadata["labels"] = utilmaps.MergeKeepFirst(existing, mergedLabels)
260255

261256
// NodeSelectors
262257
if len(toInject.NodeSelector) > 0 {
263258
existing := toMap(spec["nodeSelector"])
264259
if err := utilmaps.HaveConflict(existing, toInject.NodeSelector); err != nil {
265-
return podset.BadPodSetsUpdateError("nodeSelector", err), true
260+
return fmt.Errorf("conflict updating nodeSelector: %w", err), true
266261
}
267262
spec["nodeSelector"] = utilmaps.MergeKeepFirst(existing, toInject.NodeSelector)
268263
}
@@ -354,12 +349,12 @@ func (r *AppWrapperReconciler) createComponent(ctx context.Context, aw *workload
354349
log.FromContext(ctx).Info("After injection", "obj", obj)
355350

356351
orig := copyForStatusPatch(aw)
357-
if meta.FindStatusCondition(aw.Status.ComponentStatus[componentIdx].Conditions, string(workloadv1beta2.ResourcesDeployed)) == nil {
352+
if meta.FindStatusCondition(aw.Status.ComponentStatus[componentIdx].Conditions, string(awv1beta2.ResourcesDeployed)) == nil {
358353
aw.Status.ComponentStatus[componentIdx].Name = obj.GetName()
359354
aw.Status.ComponentStatus[componentIdx].Kind = obj.GetKind()
360355
aw.Status.ComponentStatus[componentIdx].APIVersion = obj.GetAPIVersion()
361356
meta.SetStatusCondition(&aw.Status.ComponentStatus[componentIdx].Conditions, metav1.Condition{
362-
Type: string(workloadv1beta2.ResourcesDeployed),
357+
Type: string(awv1beta2.ResourcesDeployed),
363358
Status: metav1.ConditionUnknown,
364359
Reason: "ComponentCreationInitiated",
365360
})
@@ -383,7 +378,7 @@ func (r *AppWrapperReconciler) createComponent(ctx context.Context, aw *workload
383378
// resource not actually created; patch status to reflect that
384379
orig := copyForStatusPatch(aw)
385380
meta.SetStatusCondition(&aw.Status.ComponentStatus[componentIdx].Conditions, metav1.Condition{
386-
Type: string(workloadv1beta2.ResourcesDeployed),
381+
Type: string(awv1beta2.ResourcesDeployed),
387382
Status: metav1.ConditionFalse,
388383
Reason: "ComponentCreationErrored",
389384
})
@@ -399,7 +394,7 @@ func (r *AppWrapperReconciler) createComponent(ctx context.Context, aw *workload
399394
orig = copyForStatusPatch(aw)
400395
aw.Status.ComponentStatus[componentIdx].Name = obj.GetName() // Update name to support usage of GenerateName
401396
meta.SetStatusCondition(&aw.Status.ComponentStatus[componentIdx].Conditions, metav1.Condition{
402-
Type: string(workloadv1beta2.ResourcesDeployed),
397+
Type: string(awv1beta2.ResourcesDeployed),
403398
Status: metav1.ConditionTrue,
404399
Reason: "ComponentCreatedSuccessfully",
405400
})
@@ -411,9 +406,9 @@ func (r *AppWrapperReconciler) createComponent(ctx context.Context, aw *workload
411406
}
412407

413408
// createComponents incrementally patches aw.Status -- MUST NOT CARRY STATUS PATCHES ACROSS INVOCATIONS
414-
func (r *AppWrapperReconciler) createComponents(ctx context.Context, aw *workloadv1beta2.AppWrapper) (error, bool) {
409+
func (r *AppWrapperReconciler) createComponents(ctx context.Context, aw *awv1beta2.AppWrapper) (error, bool) {
415410
for componentIdx := range aw.Spec.Components {
416-
if !meta.IsStatusConditionTrue(aw.Status.ComponentStatus[componentIdx].Conditions, string(workloadv1beta2.ResourcesDeployed)) {
411+
if !meta.IsStatusConditionTrue(aw.Status.ComponentStatus[componentIdx].Conditions, string(awv1beta2.ResourcesDeployed)) {
417412
if err, fatal := r.createComponent(ctx, aw, componentIdx); err != nil {
418413
return err, fatal
419414
}
@@ -422,10 +417,10 @@ func (r *AppWrapperReconciler) createComponents(ctx context.Context, aw *workloa
422417
return nil, false
423418
}
424419

425-
func (r *AppWrapperReconciler) deleteComponents(ctx context.Context, aw *workloadv1beta2.AppWrapper) bool {
420+
func (r *AppWrapperReconciler) deleteComponents(ctx context.Context, aw *awv1beta2.AppWrapper) bool {
426421
deleteIfPresent := func(idx int, opts ...client.DeleteOption) bool {
427422
cs := &aw.Status.ComponentStatus[idx]
428-
rd := meta.FindStatusCondition(cs.Conditions, string(workloadv1beta2.ResourcesDeployed))
423+
rd := meta.FindStatusCondition(cs.Conditions, string(awv1beta2.ResourcesDeployed))
429424
if rd == nil || rd.Status == metav1.ConditionFalse || (rd.Status == metav1.ConditionUnknown && cs.Name == "") {
430425
return false // not present
431426
}
@@ -437,7 +432,7 @@ func (r *AppWrapperReconciler) deleteComponents(ctx context.Context, aw *workloa
437432
if apierrors.IsNotFound(err) {
438433
// Has already been undeployed; update componentStatus and return not present
439434
meta.SetStatusCondition(&cs.Conditions, metav1.Condition{
440-
Type: string(workloadv1beta2.ResourcesDeployed),
435+
Type: string(awv1beta2.ResourcesDeployed),
441436
Status: metav1.ConditionFalse,
442437
Reason: "CompononetDeleted",
443438
})
@@ -451,7 +446,7 @@ func (r *AppWrapperReconciler) deleteComponents(ctx context.Context, aw *workloa
451446
}
452447

453448
meta.SetStatusCondition(&aw.Status.Conditions, metav1.Condition{
454-
Type: string(workloadv1beta2.DeletingResources),
449+
Type: string(awv1beta2.DeletingResources),
455450
Status: metav1.ConditionTrue,
456451
Reason: "DeletionInitiated",
457452
})
@@ -462,7 +457,7 @@ func (r *AppWrapperReconciler) deleteComponents(ctx context.Context, aw *workloa
462457
}
463458

464459
deletionGracePeriod := r.forcefulDeletionGraceDuration(ctx, aw)
465-
whenInitiated := meta.FindStatusCondition(aw.Status.Conditions, string(workloadv1beta2.DeletingResources)).LastTransitionTime
460+
whenInitiated := meta.FindStatusCondition(aw.Status.Conditions, string(awv1beta2.DeletingResources)).LastTransitionTime
466461
gracePeriodExpired := time.Now().After(whenInitiated.Time.Add(deletionGracePeriod))
467462

468463
if componentsRemaining && !gracePeriodExpired {
@@ -474,13 +469,13 @@ func (r *AppWrapperReconciler) deleteComponents(ctx context.Context, aw *workloa
474469
if err := r.List(ctx, pods,
475470
client.UnsafeDisableDeepCopy,
476471
client.InNamespace(aw.Namespace),
477-
client.MatchingLabels{workloadv1beta2.AppWrapperLabel: aw.Name}); err != nil {
472+
client.MatchingLabels{awv1beta2.AppWrapperLabel: aw.Name}); err != nil {
478473
log.FromContext(ctx).Error(err, "Pod list error")
479474
}
480475

481476
if !componentsRemaining && len(pods.Items) == 0 {
482477
// no resources or pods left; deletion is complete
483-
clearCondition(aw, workloadv1beta2.DeletingResources, "DeletionComplete", "")
478+
clearCondition(aw, awv1beta2.DeletingResources, "DeletionComplete", "")
484479
return true
485480
}
486481

Diff for: internal/controller/appwrapper/suite_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ import (
3838
logf "sigs.k8s.io/controller-runtime/pkg/log"
3939
"sigs.k8s.io/controller-runtime/pkg/log/zap"
4040

41-
workloadv1beta2 "github.com/project-codeflare/appwrapper/api/v1beta2"
41+
awv1beta2 "github.com/project-codeflare/appwrapper/api/v1beta2"
4242
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
4343
)
4444

@@ -85,7 +85,7 @@ var _ = BeforeSuite(func() {
8585
Expect(cfg).NotTo(BeNil())
8686

8787
scheme := apimachineryruntime.NewScheme()
88-
err = workloadv1beta2.AddToScheme(scheme)
88+
err = awv1beta2.AddToScheme(scheme)
8989
Expect(err).NotTo(HaveOccurred())
9090

9191
err = admissionv1.AddToScheme(scheme)

0 commit comments

Comments
 (0)