Skip to content

Commit f27059e

Browse files
committed
adjust appwrapper/kueue api to push kueue types into workload controller
1 parent 3829595 commit f27059e

File tree

3 files changed

+36
-34
lines changed

3 files changed

+36
-34
lines changed

Diff for: internal/controller/appwrapper/appwrapper_controller_test.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ import (
2929
"k8s.io/client-go/tools/record"
3030
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
3131
"sigs.k8s.io/controller-runtime/pkg/reconcile"
32-
"sigs.k8s.io/kueue/pkg/podset"
3332

3433
awv1beta2 "github.com/project-codeflare/appwrapper/api/v1beta2"
3534
"github.com/project-codeflare/appwrapper/pkg/config"
@@ -39,7 +38,7 @@ import (
3938
var _ = Describe("AppWrapper Controller", func() {
4039
var awReconciler *AppWrapperReconciler
4140
var awName types.NamespacedName
42-
markerPodSet := podset.PodSetInfo{
41+
markerPodSet := awv1beta2.AppWrapperPodSetInfo{
4342
Labels: map[string]string{"testkey1": "value1"},
4443
Annotations: map[string]string{"test2": "test2"},
4544
NodeSelector: map[string]string{"nodeName": "myNode"},
@@ -79,7 +78,7 @@ var _ = Describe("AppWrapper Controller", func() {
7978
Expect(aw.Status.Phase).Should(Equal(awv1beta2.AppWrapperSuspended))
8079

8180
By("Updating aw.Spec by invoking utils.SetPodSetInfos and setting suspend to false")
82-
Expect(utils.SetPodSetInfos(aw, []podset.PodSetInfo{markerPodSet, markerPodSet})).To(Succeed())
81+
Expect(utils.SetPodSetInfos(aw, []awv1beta2.AppWrapperPodSetInfo{markerPodSet, markerPodSet})).To(Succeed())
8382
aw.Spec.Suspend = false
8483
Expect(k8sClient.Update(ctx, aw)).To(Succeed())
8584

Diff for: internal/controller/workload/workload_controller.go

+22-6
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ limitations under the License.
1717
package workload
1818

1919
import (
20+
"fmt"
21+
2022
"k8s.io/apimachinery/pkg/api/meta"
2123
"k8s.io/apimachinery/pkg/runtime/schema"
2224

@@ -72,21 +74,35 @@ func (aw *AppWrapper) GVK() schema.GroupVersionKind {
7274
}
7375

7476
func (aw *AppWrapper) PodSets() []kueue.PodSet {
75-
podSets, err := utils.GetPodSets((*awv1beta2.AppWrapper)(aw))
77+
podSpecTemplates, awPodSets, err := utils.GetComponentPodSpecs((*awv1beta2.AppWrapper)(aw))
7678
if err != nil {
7779
// Kueue will raise an error on zero length PodSet; the Kueue GenericJob API prevents propagating the actual error.
7880
return []kueue.PodSet{}
7981
}
80-
for psIndex := range podSets {
81-
podSets[psIndex].TopologyRequest = jobframework.PodSetTopologyRequest(&podSets[psIndex].Template.ObjectMeta, nil, nil, nil)
82+
podSets := []kueue.PodSet{}
83+
for psIndex := range podSpecTemplates {
84+
podSets = append(podSets, kueue.PodSet{
85+
Name: fmt.Sprintf("%s-%v", aw.Name, psIndex),
86+
Template: *podSpecTemplates[psIndex],
87+
Count: utils.Replicas(awPodSets[psIndex]),
88+
TopologyRequest: jobframework.PodSetTopologyRequest(&(podSpecTemplates[psIndex].ObjectMeta), nil, nil, nil),
89+
})
8290
}
83-
8491
return podSets
8592
}
8693

8794
func (aw *AppWrapper) RunWithPodSetsInfo(podSetsInfo []podset.PodSetInfo) error {
88-
if err := utils.SetPodSetInfos((*awv1beta2.AppWrapper)(aw), podSetsInfo); err != nil {
89-
return err
95+
awPodSetsInfo := make([]awv1beta2.AppWrapperPodSetInfo, len(podSetsInfo))
96+
for idx := range podSetsInfo {
97+
awPodSetsInfo[idx].Annotations = podSetsInfo[idx].Annotations
98+
awPodSetsInfo[idx].Labels = podSetsInfo[idx].Labels
99+
awPodSetsInfo[idx].NodeSelector = podSetsInfo[idx].NodeSelector
100+
awPodSetsInfo[idx].Tolerations = podSetsInfo[idx].Tolerations
101+
awPodSetsInfo[idx].SchedulingGates = podSetsInfo[idx].SchedulingGates
102+
}
103+
104+
if err := utils.SetPodSetInfos((*awv1beta2.AppWrapper)(aw), awPodSetsInfo); err != nil {
105+
return fmt.Errorf("%w: %v", podset.ErrInvalidPodsetInfo, err)
90106
}
91107
aw.Spec.Suspend = false
92108
return nil

Diff for: pkg/utils/utils.go

+12-25
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,6 @@ import (
3838
"k8s.io/utils/ptr"
3939
jobsetapi "sigs.k8s.io/jobset/api/jobset/v1alpha2"
4040

41-
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
42-
"sigs.k8s.io/kueue/pkg/podset"
43-
4441
awv1beta2 "github.com/project-codeflare/appwrapper/api/v1beta2"
4542
)
4643

@@ -349,36 +346,32 @@ func EnsureComponentStatusInitialized(aw *awv1beta2.AppWrapper) error {
349346
return nil
350347
}
351348

352-
// GetPodSets constructs the kueue.PodSets for an AppWrapper
353-
func GetPodSets(aw *awv1beta2.AppWrapper) ([]kueue.PodSet, error) {
354-
podSets := []kueue.PodSet{}
349+
func GetComponentPodSpecs(aw *awv1beta2.AppWrapper) ([]*v1.PodTemplateSpec, []awv1beta2.AppWrapperPodSet, error) {
350+
templates := []*v1.PodTemplateSpec{}
351+
podSets := []awv1beta2.AppWrapperPodSet{}
355352
if err := EnsureComponentStatusInitialized(aw); err != nil {
356-
return nil, err
353+
return nil, nil, err
357354
}
358355
for idx := range aw.Status.ComponentStatus {
359356
if len(aw.Status.ComponentStatus[idx].PodSets) > 0 {
360357
obj := &unstructured.Unstructured{}
361358
if _, _, err := unstructured.UnstructuredJSONScheme.Decode(aw.Spec.Components[idx].Template.Raw, nil, obj); err != nil {
362359
// Should be unreachable; Template.Raw validated by AppWrapper AdmissionController
363-
return nil, err
360+
return nil, nil, err
364361
}
365-
for psIdx, podSet := range aw.Status.ComponentStatus[idx].PodSets {
366-
replicas := Replicas(podSet)
362+
for _, podSet := range aw.Status.ComponentStatus[idx].PodSets {
367363
if template, err := GetPodTemplateSpec(obj, podSet.Path); err == nil {
368-
podSets = append(podSets, kueue.PodSet{
369-
Name: fmt.Sprintf("%s-%v-%v", aw.Name, idx, psIdx),
370-
Template: *template,
371-
Count: replicas,
372-
})
364+
templates = append(templates, template)
365+
podSets = append(podSets, podSet)
373366
}
374367
}
375368
}
376369
}
377-
return podSets, nil
370+
return templates, podSets, nil
378371
}
379372

380373
// SetPodSetInfos propagates podSetsInfo into the PodSetInfos of aw.Spec.Components
381-
func SetPodSetInfos(aw *awv1beta2.AppWrapper, podSetsInfo []podset.PodSetInfo) error {
374+
func SetPodSetInfos(aw *awv1beta2.AppWrapper, podSetsInfo []awv1beta2.AppWrapperPodSetInfo) error {
382375
if err := EnsureComponentStatusInitialized(aw); err != nil {
383376
return err
384377
}
@@ -392,18 +385,12 @@ func SetPodSetInfos(aw *awv1beta2.AppWrapper, podSetsInfo []podset.PodSetInfo) e
392385
if podSetsInfoIndex > len(podSetsInfo) {
393386
continue // we will return an error below...continuing to get an accurate count for the error message
394387
}
395-
aw.Spec.Components[idx].PodSetInfos[podSetIdx] = awv1beta2.AppWrapperPodSetInfo{
396-
Annotations: podSetsInfo[podSetsInfoIndex-1].Annotations,
397-
Labels: podSetsInfo[podSetsInfoIndex-1].Labels,
398-
NodeSelector: podSetsInfo[podSetsInfoIndex-1].NodeSelector,
399-
Tolerations: podSetsInfo[podSetsInfoIndex-1].Tolerations,
400-
SchedulingGates: podSetsInfo[podSetsInfoIndex-1].SchedulingGates,
401-
}
388+
aw.Spec.Components[idx].PodSetInfos[podSetIdx] = podSetsInfo[podSetIdx]
402389
}
403390
}
404391

405392
if podSetsInfoIndex != len(podSetsInfo) {
406-
return podset.BadPodSetsInfoLenError(podSetsInfoIndex, len(podSetsInfo))
393+
return fmt.Errorf("expecting %d podsets, got %d", podSetsInfoIndex, len(podSetsInfo))
407394
}
408395
return nil
409396
}

0 commit comments

Comments
 (0)