Skip to content

Adjust APIs between AppWrapper and Kueue AppWrapper integration #321

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Feb 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ import (

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"

workloadv1beta2 "github.com/project-codeflare/appwrapper/api/v1beta2"
awv1beta2 "github.com/project-codeflare/appwrapper/api/v1beta2"
"github.com/project-codeflare/appwrapper/internal/metrics"
"github.com/project-codeflare/appwrapper/pkg/config"
"github.com/project-codeflare/appwrapper/pkg/controller"
Expand All @@ -67,7 +67,7 @@ var (
func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
utilruntime.Must(kueue.AddToScheme(scheme))
utilruntime.Must(workloadv1beta2.AddToScheme(scheme))
utilruntime.Must(awv1beta2.AddToScheme(scheme))
//+kubebuilder:scaffold:scheme
}

Expand Down
236 changes: 118 additions & 118 deletions internal/controller/appwrapper/appwrapper_controller.go

Large diffs are not rendered by default.

209 changes: 91 additions & 118 deletions internal/controller/appwrapper/appwrapper_controller_test.go

Large diffs are not rendered by default.

38 changes: 19 additions & 19 deletions internal/controller/appwrapper/fixtures_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/yaml"

workloadv1beta2 "github.com/project-codeflare/appwrapper/api/v1beta2"
awv1beta2 "github.com/project-codeflare/appwrapper/api/v1beta2"
)

const charset = "abcdefghijklmnopqrstuvwxyz0123456789"
Expand All @@ -47,16 +47,16 @@ func randName(baseName string) string {
return fmt.Sprintf("%s-%s", baseName, string(b))
}

func toAppWrapper(components ...workloadv1beta2.AppWrapperComponent) *workloadv1beta2.AppWrapper {
return &workloadv1beta2.AppWrapper{
TypeMeta: metav1.TypeMeta{APIVersion: workloadv1beta2.GroupVersion.String(), Kind: "AppWrapper"},
func toAppWrapper(components ...awv1beta2.AppWrapperComponent) *awv1beta2.AppWrapper {
return &awv1beta2.AppWrapper{
TypeMeta: metav1.TypeMeta{APIVersion: awv1beta2.GroupVersion.String(), Kind: "AppWrapper"},
ObjectMeta: metav1.ObjectMeta{Name: randName("aw"), Namespace: "default"},
Spec: workloadv1beta2.AppWrapperSpec{Components: components},
Spec: awv1beta2.AppWrapperSpec{Components: components},
}
}

func getAppWrapper(typeNamespacedName types.NamespacedName) *workloadv1beta2.AppWrapper {
aw := &workloadv1beta2.AppWrapper{}
func getAppWrapper(typeNamespacedName types.NamespacedName) *awv1beta2.AppWrapper {
aw := &awv1beta2.AppWrapper{}
err := k8sClient.Get(ctx, typeNamespacedName, aw)
Expect(err).NotTo(HaveOccurred())
return aw
Expand All @@ -69,21 +69,21 @@ func getNode(name string) *v1.Node {
return node
}

func getPods(aw *workloadv1beta2.AppWrapper) []v1.Pod {
func getPods(aw *awv1beta2.AppWrapper) []v1.Pod {
result := []v1.Pod{}
podList := &v1.PodList{}
err := k8sClient.List(ctx, podList, &client.ListOptions{Namespace: aw.Namespace})
Expect(err).NotTo(HaveOccurred())
for _, pod := range podList.Items {
if awn, found := pod.Labels[workloadv1beta2.AppWrapperLabel]; found && awn == aw.Name {
if awn, found := pod.Labels[awv1beta2.AppWrapperLabel]; found && awn == aw.Name {
result = append(result, pod)
}
}
return result
}

// envTest doesn't have a Pod controller; so simulate it
func setPodStatus(aw *workloadv1beta2.AppWrapper, phase v1.PodPhase, numToChange int32) error {
func setPodStatus(aw *awv1beta2.AppWrapper, phase v1.PodPhase, numToChange int32) error {
podList := &v1.PodList{}
err := k8sClient.List(ctx, podList, &client.ListOptions{Namespace: aw.Namespace})
if err != nil {
Expand All @@ -93,7 +93,7 @@ func setPodStatus(aw *workloadv1beta2.AppWrapper, phase v1.PodPhase, numToChange
if numToChange <= 0 {
return nil
}
if awn, found := pod.Labels[workloadv1beta2.AppWrapperLabel]; found && awn == aw.Name {
if awn, found := pod.Labels[awv1beta2.AppWrapperLabel]; found && awn == aw.Name {
pod.Status.Phase = phase
err = k8sClient.Status().Update(ctx, &pod)
if err != nil {
Expand Down Expand Up @@ -123,7 +123,7 @@ spec:
limits:
nvidia.com/gpu: %v`

func pod(milliCPU int64, numGPU int64, declarePodSets bool) workloadv1beta2.AppWrapperComponent {
func pod(milliCPU int64, numGPU int64, declarePodSets bool) awv1beta2.AppWrapperComponent {
yamlString := fmt.Sprintf(podYAML,
randName("pod"),
resource.NewMilliQuantity(milliCPU, resource.DecimalSI),
Expand All @@ -132,11 +132,11 @@ func pod(milliCPU int64, numGPU int64, declarePodSets bool) workloadv1beta2.AppW

jsonBytes, err := yaml.YAMLToJSON([]byte(yamlString))
Expect(err).NotTo(HaveOccurred())
awc := &workloadv1beta2.AppWrapperComponent{
awc := &awv1beta2.AppWrapperComponent{
Template: runtime.RawExtension{Raw: jsonBytes},
}
if declarePodSets {
awc.DeclaredPodSets = []workloadv1beta2.AppWrapperPodSet{{Replicas: ptr.To(int32(1)), Path: "template"}}
awc.DeclaredPodSets = []awv1beta2.AppWrapperPodSet{{Replicas: ptr.To(int32(1)), Path: "template"}}
}
return *awc
}
Expand Down Expand Up @@ -181,11 +181,11 @@ spec:
limits:
nvidia.com/gpu: 1`

func complexPodYaml() workloadv1beta2.AppWrapperComponent {
func complexPodYaml() awv1beta2.AppWrapperComponent {
yamlString := fmt.Sprintf(complexPodYAML, randName("pod"))
jsonBytes, err := yaml.YAMLToJSON([]byte(yamlString))
Expect(err).NotTo(HaveOccurred())
awc := &workloadv1beta2.AppWrapperComponent{
awc := &awv1beta2.AppWrapperComponent{
Template: runtime.RawExtension{Raw: jsonBytes},
}
return *awc
Expand All @@ -205,15 +205,15 @@ spec:
requests:
cpu: %v`

func malformedPod(milliCPU int64) workloadv1beta2.AppWrapperComponent {
func malformedPod(milliCPU int64) awv1beta2.AppWrapperComponent {
yamlString := fmt.Sprintf(malformedPodYAML,
randName("pod"),
resource.NewMilliQuantity(milliCPU, resource.DecimalSI))

jsonBytes, err := yaml.YAMLToJSON([]byte(yamlString))
Expect(err).NotTo(HaveOccurred())
return workloadv1beta2.AppWrapperComponent{
DeclaredPodSets: []workloadv1beta2.AppWrapperPodSet{{Replicas: ptr.To(int32(1)), Path: "template"}},
return awv1beta2.AppWrapperComponent{
DeclaredPodSets: []awv1beta2.AppWrapperPodSet{{Replicas: ptr.To(int32(1)), Path: "template"}},
Template: runtime.RawExtension{Raw: jsonBytes},
}
}
Expand Down
51 changes: 23 additions & 28 deletions internal/controller/appwrapper/resource_management.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import (
"fmt"
"time"

workloadv1beta2 "github.com/project-codeflare/appwrapper/api/v1beta2"
awv1beta2 "github.com/project-codeflare/appwrapper/api/v1beta2"
utilmaps "github.com/project-codeflare/appwrapper/internal/util"
"github.com/project-codeflare/appwrapper/pkg/utils"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -34,8 +35,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/kueue/pkg/podset"
utilmaps "sigs.k8s.io/kueue/pkg/util/maps"
)

func parseComponent(raw []byte, expectedNamespace string) (*unstructured.Unstructured, error) {
Expand Down Expand Up @@ -188,7 +187,7 @@ func addNodeSelectorsToAffinity(spec map[string]interface{}, exprsToAdd []v1.Nod
}

//gocyclo:ignore
func (r *AppWrapperReconciler) createComponent(ctx context.Context, aw *workloadv1beta2.AppWrapper, componentIdx int) (error, bool) {
func (r *AppWrapperReconciler) createComponent(ctx context.Context, aw *awv1beta2.AppWrapper, componentIdx int) (error, bool) {
component := aw.Spec.Components[componentIdx]
componentStatus := aw.Status.ComponentStatus[componentIdx]
toMap := func(x interface{}) map[string]string {
Expand Down Expand Up @@ -218,17 +217,13 @@ func (r *AppWrapperReconciler) createComponent(ctx context.Context, aw *workload
if err != nil {
return err, true
}
awLabels := map[string]string{workloadv1beta2.AppWrapperLabel: aw.Name}
awLabels := map[string]string{awv1beta2.AppWrapperLabel: aw.Name}
obj.SetLabels(utilmaps.MergeKeepFirst(obj.GetLabels(), awLabels))

for podSetsIdx, podSet := range componentStatus.PodSets {
toInject := &workloadv1beta2.AppWrapperPodSetInfo{}
if r.Config.EnableKueueIntegrations {
if podSetsIdx < len(component.PodSetInfos) {
toInject = &component.PodSetInfos[podSetsIdx]
} else {
return fmt.Errorf("missing podSetInfo %v for component %v", podSetsIdx, componentIdx), true
}
toInject := &awv1beta2.AppWrapperPodSetInfo{}
if podSetsIdx < len(component.PodSetInfos) {
toInject = &component.PodSetInfos[podSetsIdx]
}

p, err := utils.GetRawTemplate(obj.UnstructuredContent(), podSet.Path)
Expand All @@ -245,7 +240,7 @@ func (r *AppWrapperReconciler) createComponent(ctx context.Context, aw *workload
if len(toInject.Annotations) > 0 {
existing := toMap(metadata["annotations"])
if err := utilmaps.HaveConflict(existing, toInject.Annotations); err != nil {
return podset.BadPodSetsUpdateError("annotations", err), true
return fmt.Errorf("conflict updating annotations: %w", err), true
}
metadata["annotations"] = utilmaps.MergeKeepFirst(existing, toInject.Annotations)
}
Expand All @@ -254,15 +249,15 @@ func (r *AppWrapperReconciler) createComponent(ctx context.Context, aw *workload
mergedLabels := utilmaps.MergeKeepFirst(toInject.Labels, awLabels)
existing := toMap(metadata["labels"])
if err := utilmaps.HaveConflict(existing, mergedLabels); err != nil {
return podset.BadPodSetsUpdateError("labels", err), true
return fmt.Errorf("conflict updating labels: %w", err), true
}
metadata["labels"] = utilmaps.MergeKeepFirst(existing, mergedLabels)

// NodeSelectors
if len(toInject.NodeSelector) > 0 {
existing := toMap(spec["nodeSelector"])
if err := utilmaps.HaveConflict(existing, toInject.NodeSelector); err != nil {
return podset.BadPodSetsUpdateError("nodeSelector", err), true
return fmt.Errorf("conflict updating nodeSelector: %w", err), true
}
spec["nodeSelector"] = utilmaps.MergeKeepFirst(existing, toInject.NodeSelector)
}
Expand Down Expand Up @@ -354,12 +349,12 @@ func (r *AppWrapperReconciler) createComponent(ctx context.Context, aw *workload
log.FromContext(ctx).Info("After injection", "obj", obj)

orig := copyForStatusPatch(aw)
if meta.FindStatusCondition(aw.Status.ComponentStatus[componentIdx].Conditions, string(workloadv1beta2.ResourcesDeployed)) == nil {
if meta.FindStatusCondition(aw.Status.ComponentStatus[componentIdx].Conditions, string(awv1beta2.ResourcesDeployed)) == nil {
aw.Status.ComponentStatus[componentIdx].Name = obj.GetName()
aw.Status.ComponentStatus[componentIdx].Kind = obj.GetKind()
aw.Status.ComponentStatus[componentIdx].APIVersion = obj.GetAPIVersion()
meta.SetStatusCondition(&aw.Status.ComponentStatus[componentIdx].Conditions, metav1.Condition{
Type: string(workloadv1beta2.ResourcesDeployed),
Type: string(awv1beta2.ResourcesDeployed),
Status: metav1.ConditionUnknown,
Reason: "ComponentCreationInitiated",
})
Expand All @@ -383,7 +378,7 @@ func (r *AppWrapperReconciler) createComponent(ctx context.Context, aw *workload
// resource not actually created; patch status to reflect that
orig := copyForStatusPatch(aw)
meta.SetStatusCondition(&aw.Status.ComponentStatus[componentIdx].Conditions, metav1.Condition{
Type: string(workloadv1beta2.ResourcesDeployed),
Type: string(awv1beta2.ResourcesDeployed),
Status: metav1.ConditionFalse,
Reason: "ComponentCreationErrored",
})
Expand All @@ -399,7 +394,7 @@ func (r *AppWrapperReconciler) createComponent(ctx context.Context, aw *workload
orig = copyForStatusPatch(aw)
aw.Status.ComponentStatus[componentIdx].Name = obj.GetName() // Update name to support usage of GenerateName
meta.SetStatusCondition(&aw.Status.ComponentStatus[componentIdx].Conditions, metav1.Condition{
Type: string(workloadv1beta2.ResourcesDeployed),
Type: string(awv1beta2.ResourcesDeployed),
Status: metav1.ConditionTrue,
Reason: "ComponentCreatedSuccessfully",
})
Expand All @@ -411,9 +406,9 @@ func (r *AppWrapperReconciler) createComponent(ctx context.Context, aw *workload
}

// createComponents incrementally patches aw.Status -- MUST NOT CARRY STATUS PATCHES ACROSS INVOCATIONS
func (r *AppWrapperReconciler) createComponents(ctx context.Context, aw *workloadv1beta2.AppWrapper) (error, bool) {
func (r *AppWrapperReconciler) createComponents(ctx context.Context, aw *awv1beta2.AppWrapper) (error, bool) {
for componentIdx := range aw.Spec.Components {
if !meta.IsStatusConditionTrue(aw.Status.ComponentStatus[componentIdx].Conditions, string(workloadv1beta2.ResourcesDeployed)) {
if !meta.IsStatusConditionTrue(aw.Status.ComponentStatus[componentIdx].Conditions, string(awv1beta2.ResourcesDeployed)) {
if err, fatal := r.createComponent(ctx, aw, componentIdx); err != nil {
return err, fatal
}
Expand All @@ -422,10 +417,10 @@ func (r *AppWrapperReconciler) createComponents(ctx context.Context, aw *workloa
return nil, false
}

func (r *AppWrapperReconciler) deleteComponents(ctx context.Context, aw *workloadv1beta2.AppWrapper) bool {
func (r *AppWrapperReconciler) deleteComponents(ctx context.Context, aw *awv1beta2.AppWrapper) bool {
deleteIfPresent := func(idx int, opts ...client.DeleteOption) bool {
cs := &aw.Status.ComponentStatus[idx]
rd := meta.FindStatusCondition(cs.Conditions, string(workloadv1beta2.ResourcesDeployed))
rd := meta.FindStatusCondition(cs.Conditions, string(awv1beta2.ResourcesDeployed))
if rd == nil || rd.Status == metav1.ConditionFalse || (rd.Status == metav1.ConditionUnknown && cs.Name == "") {
return false // not present
}
Expand All @@ -437,7 +432,7 @@ func (r *AppWrapperReconciler) deleteComponents(ctx context.Context, aw *workloa
if apierrors.IsNotFound(err) {
// Has already been undeployed; update componentStatus and return not present
meta.SetStatusCondition(&cs.Conditions, metav1.Condition{
Type: string(workloadv1beta2.ResourcesDeployed),
Type: string(awv1beta2.ResourcesDeployed),
Status: metav1.ConditionFalse,
Reason: "CompononetDeleted",
})
Expand All @@ -451,7 +446,7 @@ func (r *AppWrapperReconciler) deleteComponents(ctx context.Context, aw *workloa
}

meta.SetStatusCondition(&aw.Status.Conditions, metav1.Condition{
Type: string(workloadv1beta2.DeletingResources),
Type: string(awv1beta2.DeletingResources),
Status: metav1.ConditionTrue,
Reason: "DeletionInitiated",
})
Expand All @@ -462,7 +457,7 @@ func (r *AppWrapperReconciler) deleteComponents(ctx context.Context, aw *workloa
}

deletionGracePeriod := r.forcefulDeletionGraceDuration(ctx, aw)
whenInitiated := meta.FindStatusCondition(aw.Status.Conditions, string(workloadv1beta2.DeletingResources)).LastTransitionTime
whenInitiated := meta.FindStatusCondition(aw.Status.Conditions, string(awv1beta2.DeletingResources)).LastTransitionTime
gracePeriodExpired := time.Now().After(whenInitiated.Time.Add(deletionGracePeriod))

if componentsRemaining && !gracePeriodExpired {
Expand All @@ -474,13 +469,13 @@ func (r *AppWrapperReconciler) deleteComponents(ctx context.Context, aw *workloa
if err := r.List(ctx, pods,
client.UnsafeDisableDeepCopy,
client.InNamespace(aw.Namespace),
client.MatchingLabels{workloadv1beta2.AppWrapperLabel: aw.Name}); err != nil {
client.MatchingLabels{awv1beta2.AppWrapperLabel: aw.Name}); err != nil {
log.FromContext(ctx).Error(err, "Pod list error")
}

if !componentsRemaining && len(pods.Items) == 0 {
// no resources or pods left; deletion is complete
clearCondition(aw, workloadv1beta2.DeletingResources, "DeletionComplete", "")
clearCondition(aw, awv1beta2.DeletingResources, "DeletionComplete", "")
return true
}

Expand Down
4 changes: 2 additions & 2 deletions internal/controller/appwrapper/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import (
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/log/zap"

workloadv1beta2 "github.com/project-codeflare/appwrapper/api/v1beta2"
awv1beta2 "github.com/project-codeflare/appwrapper/api/v1beta2"
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
)

Expand Down Expand Up @@ -85,7 +85,7 @@ var _ = BeforeSuite(func() {
Expect(cfg).NotTo(BeNil())

scheme := apimachineryruntime.NewScheme()
err = workloadv1beta2.AddToScheme(scheme)
err = awv1beta2.AddToScheme(scheme)
Expect(err).NotTo(HaveOccurred())

err = admissionv1.AddToScheme(scheme)
Expand Down
Loading