
Commit 210b97b

Tal-or authored and bertinatto committed
UPSTREAM: <carry>: add new admission for handling shared cpus
Adding a new mutation plugin that handles the following:

1. In case of a `workload.openshift.io/enable-shared-cpus` request, it adds an annotation to hint the runtime about the request. The runtime is not aware of extended resources, hence we need the annotation.
2. It validates the pod's QoS class and returns an error if it is not a guaranteed QoS class.
3. It validates that no more than a single resource is being requested.
4. It validates that the pod is deployed in a namespace that has the mixedcpus-workloads-allowed annotation.

For more information see openshift/enhancements#1396

Signed-off-by: Talor Itzhak <[email protected]>

UPSTREAM: <carry>: Update management webhook pod admission logic

Updating the logic for pod admission to allow a pod creation with workload partitioning annotations to run in a namespace that has no workload allow annotations. The pod will be stripped of its workload annotations and treated as if it were a normal pod; a warning annotation will be placed on the pod to note the behavior.

Signed-off-by: ehila <[email protected]>

UPSTREAM: <carry>: add support for cpu limits into management workloads

Added support to allow workload partitioning to use the CPU limits for a container. To allow the runtime to make better decisions around workload CPU quotas, we pass down the CPU limit as part of the `cpulimit` value in the annotation. CRI-O will take that information and calculate the quota per node. This should support situations where workloads might have different CPU period overrides assigned.

Updated kubelet for static pods and the admission webhook for regular pods to support CPU limits.

Updated unit tests to reflect the changes.

Signed-off-by: ehila <[email protected]>
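For illustration only (this snippet is not part of the commit), a minimal Go sketch of the kind of container spec the new mixedcpus admission plugin is meant to act on: a Guaranteed-QoS container requesting exactly one `workload.openshift.io/enable-shared-cpus` unit. The image name is hypothetical, and the namespace-level allow annotation that the plugin checks is not shown in this diff, so it is omitted here.

// Sketch only: a Guaranteed QoS container requesting the shared-cpus extended
// resource that the new mixedcpus admission plugin reacts to. CPU and memory
// requests equal limits (Guaranteed), and exactly one shared-cpus unit is requested.
package main

import (
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

func sharedCPUsContainer() corev1.Container {
	return corev1.Container{
		Name:  "app",
		Image: "registry.example.com/app:latest", // hypothetical image
		Resources: corev1.ResourceRequirements{
			Requests: corev1.ResourceList{
				corev1.ResourceCPU:    resource.MustParse("2"),
				corev1.ResourceMemory: resource.MustParse("2Gi"),
				// extended resource named in the commit message; the runtime is
				// hinted via an annotation added by the admission plugin
				corev1.ResourceName("workload.openshift.io/enable-shared-cpus"): resource.MustParse("1"),
			},
			Limits: corev1.ResourceList{
				corev1.ResourceCPU:    resource.MustParse("2"),
				corev1.ResourceMemory: resource.MustParse("2Gi"),
				corev1.ResourceName("workload.openshift.io/enable-shared-cpus"): resource.MustParse("1"),
			},
		},
	}
}

func main() { _ = sharedCPUsContainer() }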
1 parent 97777c3 commit 210b97b

File tree

8 files changed: +660 −65 lines changed

openshift-kube-apiserver/admission/admissionenablement/register.go

+3

@@ -5,6 +5,7 @@ import (
 	"k8s.io/apiserver/pkg/admission"
 	"k8s.io/apiserver/pkg/admission/plugin/resourcequota"
 	mutatingwebhook "k8s.io/apiserver/pkg/admission/plugin/webhook/mutating"
+	"k8s.io/kubernetes/openshift-kube-apiserver/admission/autoscaling/mixedcpus"
 
 	"github.com/openshift/apiserver-library-go/pkg/admission/imagepolicy"
 	imagepolicyapiv1 "github.com/openshift/apiserver-library-go/pkg/admission/imagepolicy/apis/imagepolicy/v1"
@@ -32,6 +33,7 @@ func RegisterOpenshiftKubeAdmissionPlugins(plugins *admission.Plugins) {
 	ingressadmission.Register(plugins)
 	managementcpusoverride.Register(plugins)
 	managednode.Register(plugins)
+	mixedcpus.Register(plugins)
 	projectnodeenv.Register(plugins)
 	quotaclusterresourceoverride.Register(plugins)
 	quotaclusterresourcequota.Register(plugins)
@@ -74,6 +76,7 @@ var (
 		hostassignment.PluginName,          // "route.openshift.io/RouteHostAssignment"
 		csiinlinevolumesecurity.PluginName, // "storage.openshift.io/CSIInlineVolumeSecurity"
 		managednode.PluginName,             // "autoscaling.openshift.io/ManagedNode"
+		mixedcpus.PluginName,               // "autoscaling.openshift.io/MixedCPUs"
 	}
 
 	// openshiftAdmissionPluginsForKubeAfterResourceQuota are the plugins to add after ResourceQuota plugin
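The mixedcpus plugin body itself is not part of this diff; the registration calls above follow the standard k8s.io/apiserver admission pattern. A minimal sketch of what such a Register function generally looks like (the examplePlugin type here is hypothetical; the plugin name string is taken from the comment above):

// Sketch of the usual Register pattern for an admission plugin, assuming the
// upstream k8s.io/apiserver admission API. A real plugin would also implement
// the mutation/validation interfaces; only the wiring is shown here.
package mixedcpus

import (
	"io"

	"k8s.io/apiserver/pkg/admission"
)

const PluginName = "autoscaling.openshift.io/MixedCPUs"

type examplePlugin struct {
	*admission.Handler
}

// Register wires the plugin factory into the shared plugin registry, exactly
// like the mixedcpus.Register(plugins) call added in this file.
func Register(plugins *admission.Plugins) {
	plugins.Register(PluginName, func(config io.Reader) (admission.Interface, error) {
		return &examplePlugin{Handler: admission.NewHandler(admission.Create)}, nil
	})
}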

openshift-kube-apiserver/admission/autoscaling/managementcpusoverride/admission.go

+54 −34

@@ -71,6 +71,14 @@ func Register(plugins *admission.Plugins) {
 	})
 }
 
+type resourceAnnotation struct {
+	// CPUShares contains resource annotation value cpushares key
+	CPUShares uint64 `json:"cpushares,omitempty"`
+	// CPULimit contains the cpu limit in millicores to be used by the container runtime to calculate
+	// quota
+	CPULimit int64 `json:"cpulimit,omitempty"`
+}
+
 // managementCPUsOverride presents admission plugin that should replace pod container CPU requests with a new management resource.
 // It applies to all pods that:
 // 1. are in an allowed namespace
@@ -217,6 +225,13 @@ func (a *managementCPUsOverride) Admit(ctx context.Context, attr admission.Attri
 		return err
 	}
 
+	if _, found := ns.Annotations[namespaceAllowedAnnotation]; !found && len(workloadType) > 0 {
+		pod.Annotations[workloadAdmissionWarning] = fmt.Sprintf(
+			"skipping pod CPUs requests modifications because the %s namespace is not annotated with %s to allow workload partitioning",
+			ns.GetName(), namespaceAllowedAnnotation)
+		return nil
+	}
+
 	if !doesNamespaceAllowWorkloadType(ns.Annotations, workloadType) {
 		return admission.NewForbidden(attr, fmt.Errorf("%s the pod namespace %q does not allow the workload type %s", PluginName, ns.Name, workloadType))
 	}
@@ -245,13 +260,6 @@ func (a *managementCPUsOverride) Admit(ctx context.Context, attr admission.Attri
 		return nil
 	}
 
-	// we should skip mutation of the pod that has container with both CPU limit and request because once we will remove
-	// the request, the defaulter will set the request back with the CPU limit value
-	if podHasBothCPULimitAndRequest(allContainers) {
-		pod.Annotations[workloadAdmissionWarning] = "skip pod CPUs requests modifications because pod container has both CPU limit and request"
-		return nil
-	}
-
 	// before we update the pod available under admission attributes, we need to verify that deletion of the CPU request
 	// will not change the pod QoS class, otherwise skip pod mutation
 	// 1. Copy the pod
@@ -353,6 +361,14 @@ func updateContainersResources(containers []coreapi.Container, podAnnotations ma
 			continue
 		}
 
+		resourceAnno := resourceAnnotation{}
+
+		if c.Resources.Limits != nil {
+			if value, ok := c.Resources.Limits[coreapi.ResourceCPU]; ok {
+				resourceAnno.CPULimit = value.MilliValue()
+			}
+		}
+
 		if c.Resources.Requests != nil {
 			if _, ok := c.Resources.Requests[coreapi.ResourceCPU]; !ok {
 				continue
@@ -361,9 +377,20 @@ func updateContainersResources(containers []coreapi.Container, podAnnotations ma
 			cpuRequest := c.Resources.Requests[coreapi.ResourceCPU]
 			cpuRequestInMilli := cpuRequest.MilliValue()
 
-			cpuShares := cm.MilliCPUToShares(cpuRequestInMilli)
-			podAnnotations[cpusharesAnnotationKey] = fmt.Sprintf(`{"%s": %d}`, containerResourcesAnnotationValueKeyCPUShares, cpuShares)
+			// Casting to uint64, Linux build returns uint64, noop Darwin build returns int64
+			resourceAnno.CPUShares = uint64(cm.MilliCPUToShares(cpuRequestInMilli))
+
+			// This should not error but if something does go wrong we default to string creation of just CPU Shares
+			// and add a warning annotation
+			resourceAnnoString, err := json.Marshal(resourceAnno)
+			if err != nil {
+				podAnnotations[workloadAdmissionWarning] = fmt.Sprintf("failed to marshal cpu resources, using fallback: err: %s", err.Error())
+				podAnnotations[cpusharesAnnotationKey] = fmt.Sprintf(`{"%s": %d}`, containerResourcesAnnotationValueKeyCPUShares, resourceAnno.CPUShares)
+			} else {
+				podAnnotations[cpusharesAnnotationKey] = string(resourceAnnoString)
+			}
 			delete(c.Resources.Requests, coreapi.ResourceCPU)
+			delete(c.Resources.Limits, coreapi.ResourceCPU)
 
 			if c.Resources.Limits == nil {
 				c.Resources.Limits = coreapi.ResourceList{}
@@ -378,7 +405,7 @@ func updateContainersResources(containers []coreapi.Container, podAnnotations ma
 	}
 }
 
-func isGuaranteed(containers []coreapi.Container) bool {
+func IsGuaranteed(containers []coreapi.Container) bool {
 	for _, c := range containers {
 		// only memory and CPU resources are relevant to decide pod QoS class
 		for _, r := range []coreapi.ResourceName{coreapi.ResourceMemory, coreapi.ResourceCPU} {
@@ -425,7 +452,7 @@ func isBestEffort(containers []coreapi.Container) bool {
 }
 
 func getPodQoSClass(containers []coreapi.Container) coreapi.PodQOSClass {
-	if isGuaranteed(containers) {
+	if IsGuaranteed(containers) {
 		return coreapi.PodQOSGuaranteed
 	}
 
@@ -449,10 +476,13 @@ func podHasBothCPULimitAndRequest(containers []coreapi.Container) bool {
 	return false
 }
 
+// doesNamespaceAllowWorkloadType will return false when a workload type does not match any present ones.
 func doesNamespaceAllowWorkloadType(annotations map[string]string, workloadType string) bool {
 	v, found := annotations[namespaceAllowedAnnotation]
+	// When a namespace contains no annotation for workloads we infer that to mean all workload types are allowed.
+	// The mutation hook will strip all workload annotation from pods that contain them in that circumstance.
 	if !found {
-		return false
+		return true
 	}
 
 	for _, t := range strings.Split(v, ",") {
@@ -559,17 +589,20 @@ func (a *managementCPUsOverride) Validate(ctx context.Context, attr admission.At
 		allErrs = append(allErrs, getPodInvalidWorkloadAnnotationError(pod.Annotations, err.Error()))
 	}
 
-	workloadResourceAnnotations := map[string]map[string]int{}
+	workloadResourceAnnotations := resourceAnnotation{}
+	hasWorkloadAnnotation := false
 	for k, v := range pod.Annotations {
 		if !strings.HasPrefix(k, containerResourcesAnnotationPrefix) {
 			continue
 		}
+		hasWorkloadAnnotation = true
 
-		resourceAnnotationValue := map[string]int{}
-		if err := json.Unmarshal([]byte(v), &resourceAnnotationValue); err != nil {
+		// Custom decoder to print invalid fields for resources
+		decoder := json.NewDecoder(strings.NewReader(v))
+		decoder.DisallowUnknownFields()
+		if err := decoder.Decode(&workloadResourceAnnotations); err != nil {
 			allErrs = append(allErrs, getPodInvalidWorkloadAnnotationError(pod.Annotations, err.Error()))
 		}
-		workloadResourceAnnotations[k] = resourceAnnotationValue
 	}
 
 	containersWorkloadResources := map[string]*coreapi.Container{}
@@ -586,9 +619,9 @@ func (a *managementCPUsOverride) Validate(ctx context.Context, attr admission.At
 		}
 	}
 
-	// the pod does not have workload annotation
-	if len(workloadType) == 0 {
-		if len(workloadResourceAnnotations) > 0 {
+	switch {
+	case len(workloadType) == 0: // the pod does not have workload annotation
+		if hasWorkloadAnnotation {
 			allErrs = append(allErrs, getPodInvalidWorkloadAnnotationError(pod.Annotations, "the pod without workload annotation can not have resource annotation"))
 		}
 
@@ -599,21 +632,8 @@ func (a *managementCPUsOverride) Validate(ctx context.Context, attr admission.At
 
 			allErrs = append(allErrs, field.Invalid(field.NewPath("spec.containers.resources.requests"), c.Resources.Requests, fmt.Sprintf("the pod without workload annotations can not have containers with workload resources %q", resourceName)))
 		}
-	} else {
-		if !doesNamespaceAllowWorkloadType(ns.Annotations, workloadType) { // pod has workload annotation, but the pod does not have workload annotation
-			allErrs = append(allErrs, getPodInvalidWorkloadAnnotationError(pod.Annotations, fmt.Sprintf("the pod can not have workload annotation, when the namespace %q does not allow it", ns.Name)))
-		}
-
-		for _, v := range workloadResourceAnnotations {
-			if len(v) > 1 {
-				allErrs = append(allErrs, field.Invalid(field.NewPath("metadata.annotations"), pod.Annotations, "the pod resource annotation value can not have more than one key"))
-			}
-
-			// the pod should not have any resource annotations with the value that includes keys different from cpushares
-			if _, ok := v[containerResourcesAnnotationValueKeyCPUShares]; len(v) == 1 && !ok {
-				allErrs = append(allErrs, field.Invalid(field.NewPath("metadata.annotations"), pod.Annotations, "the pod resource annotation value should have only cpushares key"))
-			}
-		}
+	case !doesNamespaceAllowWorkloadType(ns.Annotations, workloadType): // pod has workload annotation, but the namespace does not allow specified workload
+		allErrs = append(allErrs, getPodInvalidWorkloadAnnotationError(pod.Annotations, fmt.Sprintf("the namespace %q does not allow the workload type %s", ns.Name, workloadType)))
 	}
 
 	if len(allErrs) == 0 {
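To make the new annotation shape concrete, here is a small standalone sketch (not part of the commit) of the marshaling step added to updateContainersResources above. The struct mirrors the resourceAnnotation type from this diff, and the shares math replicates the kubelet MilliCPUToShares conversion inline, so a container with a 250m CPU request and a 500m CPU limit now yields `{"cpushares":256,"cpulimit":500}`, matching the updated test expectations.

// Sketch: reproduce the annotation value written by updateContainersResources
// after this change. The struct mirrors the resourceAnnotation type added in
// admission.go; milliCPUToShares copies the kubelet conversion formula.
package main

import (
	"encoding/json"
	"fmt"
)

type resourceAnnotation struct {
	CPUShares uint64 `json:"cpushares,omitempty"`
	CPULimit  int64  `json:"cpulimit,omitempty"`
}

// milliCPUToShares mirrors the kubelet conversion: 1000 millicores map to 1024
// shares, with a floor of 2 shares.
func milliCPUToShares(milliCPU int64) uint64 {
	const (
		sharesPerCPU  = 1024
		milliCPUToCPU = 1000
		minShares     = 2
	)
	if milliCPU == 0 {
		return minShares
	}
	shares := (milliCPU * sharesPerCPU) / milliCPUToCPU
	if shares < minShares {
		return minShares
	}
	return uint64(shares)
}

func main() {
	// A container with requests.cpu=250m and limits.cpu=500m, as in the updated tests.
	anno := resourceAnnotation{
		CPUShares: milliCPUToShares(250), // 256
		CPULimit:  500,                   // millicores
	}
	b, err := json.Marshal(anno)
	if err != nil {
		// Fallback path from the diff: emit only cpushares (a warning annotation is also set).
		fmt.Printf(`{"cpushares": %d}`+"\n", anno.CPUShares)
		return
	}
	fmt.Println(string(b)) // {"cpushares":256,"cpulimit":500}
}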

openshift-kube-apiserver/admission/autoscaling/managementcpusoverride/admission_test.go

+55 −30

@@ -84,12 +84,12 @@ func TestAdmit(t *testing.T) {
 	}{
 		{
 			name:               "should return admission error when the pod namespace does not allow the workload type",
-			pod:                testManagedPod("500m", "250m", "500Mi", "250Mi"),
+			pod:                testManagedPodWithWorkloadAnnotation("500m", "250m", "500Mi", "250Mi", "non-existent"),
 			expectedCpuRequest: resource.MustParse("250m"),
-			namespace:          testNamespace(),
+			namespace:          testManagedNamespace(),
 			nodes:              []*corev1.Node{testNodeWithManagementResource()},
 			infra:              testClusterSNOInfra(),
-			expectedError:      fmt.Errorf("the pod namespace %q does not allow the workload type management", "namespace"),
+			expectedError:      fmt.Errorf("the pod namespace %q does not allow the workload type non-existent", "managed-namespace"),
 		},
 		{
 			name: "should ignore pods that do not have managed annotation",
@@ -167,14 +167,33 @@
 			expectedError: fmt.Errorf(`failed to get workload annotation effect: the workload annotation value map["test":"test"] does not have "effect" key`),
 			infra:         testClusterSNOInfra(),
 		},
+		{
+			name: "should return admission warning when the pod has workload annotation but the namespace does not",
+			pod: testManagedPodWithAnnotations(
+				"500m",
+				"250m",
+				"500Mi",
+				"250Mi",
+				map[string]string{
+					fmt.Sprintf("%s%s", podWorkloadTargetAnnotationPrefix, workloadTypeManagement): `{"test": "test"}`,
+				},
+			),
+			expectedCpuRequest: resource.MustParse("250m"),
+			expectedAnnotations: map[string]string{
+				workloadAdmissionWarning: "skipping pod CPUs requests modifications because the namespace namespace is not annotated with workload.openshift.io/allowed to allow workload partitioning",
+			},
+			namespace: testNamespace(),
+			nodes:     []*corev1.Node{testNodeWithManagementResource()},
+			infra:     testClusterSNOInfra(),
+		},
 		{
 			name:               "should delete CPU requests and update workload CPU annotations for the burstable pod with managed annotation",
 			pod:                testManagedPod("", "250m", "500Mi", "250Mi"),
 			expectedCpuRequest: resource.Quantity{},
 			namespace:          testManagedNamespace(),
 			expectedAnnotations: map[string]string{
-				fmt.Sprintf("%s%s", containerResourcesAnnotationPrefix, "test"):     fmt.Sprintf(`{"%s": 256}`, containerResourcesAnnotationValueKeyCPUShares),
-				fmt.Sprintf("%s%s", containerResourcesAnnotationPrefix, "initTest"): fmt.Sprintf(`{"%s": 256}`, containerResourcesAnnotationValueKeyCPUShares),
+				fmt.Sprintf("%s%s", containerResourcesAnnotationPrefix, "test"):     fmt.Sprintf(`{"%s":256}`, containerResourcesAnnotationValueKeyCPUShares),
+				fmt.Sprintf("%s%s", containerResourcesAnnotationPrefix, "initTest"): fmt.Sprintf(`{"%s":256}`, containerResourcesAnnotationValueKeyCPUShares),
 				fmt.Sprintf("%s%s", podWorkloadTargetAnnotationPrefix, workloadTypeManagement): fmt.Sprintf(`{"%s":"%s"}`, podWorkloadAnnotationEffect, workloadEffectPreferredDuringScheduling),
 			},
 			nodes: []*corev1.Node{testNodeWithManagementResource()},
@@ -217,12 +236,14 @@
 			infra: testClusterSNOInfra(),
 		},
 		{
-			name:               "should ignore pod when one of pod containers have both CPU limit and request",
+			name:               "should not ignore pod when one of pod containers have both CPU limit and request",
 			pod:                testManagedPod("500m", "250m", "500Mi", ""),
-			expectedCpuRequest: resource.MustParse("250m"),
+			expectedCpuRequest: resource.Quantity{},
 			namespace:          testManagedNamespace(),
 			expectedAnnotations: map[string]string{
-				workloadAdmissionWarning: fmt.Sprintf("skip pod CPUs requests modifications because pod container has both CPU limit and request"),
+				fmt.Sprintf("%s%s", containerResourcesAnnotationPrefix, "test"):     fmt.Sprintf(`{"%s":256,"cpulimit":500}`, containerResourcesAnnotationValueKeyCPUShares),
+				fmt.Sprintf("%s%s", containerResourcesAnnotationPrefix, "initTest"): fmt.Sprintf(`{"%s":256,"cpulimit":500}`, containerResourcesAnnotationValueKeyCPUShares),
+				fmt.Sprintf("%s%s", podWorkloadTargetAnnotationPrefix, workloadTypeManagement): fmt.Sprintf(`{"%s":"%s"}`, podWorkloadAnnotationEffect, workloadEffectPreferredDuringScheduling),
 			},
 			nodes: []*corev1.Node{testNodeWithManagementResource()},
 			infra: testClusterSNOInfra(),
@@ -239,12 +260,12 @@
 			infra: testClusterSNOInfra(),
 		},
 		{
-			name:               "should not mutate the pod when at least one node does not have management resources",
+			name:               "should not mutate the pod when cpu partitioning is not set to AllNodes",
 			pod:                testManagedPod("500m", "250m", "500Mi", "250Mi"),
 			expectedCpuRequest: resource.MustParse("250m"),
 			namespace:          testManagedNamespace(),
 			nodes:              []*corev1.Node{testNode()},
-			infra:              testClusterSNOInfra(),
+			infra:              testClusterInfraWithoutWorkloadPartitioning(),
 		},
 		{
 			name: "should return admission error when the cluster does not have any nodes",
@@ -407,7 +428,7 @@ func TestValidate(t *testing.T) {
 			),
 			namespace:     testManagedNamespace(),
 			nodes:         []*corev1.Node{testNodeWithManagementResource()},
-			expectedError: fmt.Errorf("he pod resource annotation value should have only cpushares key"),
+			expectedError: fmt.Errorf("json: unknown field \"cpuset\""),
 		},
 		{
 			name: "should return invalid error when the pod does not have workload annotation, but has resource annotation",
@@ -437,16 +458,28 @@
 			expectedError: fmt.Errorf("the pod without workload annotations can not have containers with workload resources %q", "management.workload.openshift.io/cores"),
 		},
 		{
-			name: "should return invalid error when the pod has workload annotation, but the pod namespace does not have allowed annotation",
-			pod: testManagedPod(
+			name: "should return invalid error when the pod has workload annotation, but the pod namespace does not have allowed workload type",
+			pod: testManagedPodWithWorkloadAnnotation(
 				"500m",
 				"250m",
 				"500Mi",
 				"250Mi",
+				"non-existent",
 			),
-			namespace:     testNamespace(),
+			namespace:     testManagedNamespace(),
 			nodes:         []*corev1.Node{testNodeWithManagementResource()},
-			expectedError: fmt.Errorf("the pod can not have workload annotation, when the namespace %q does not allow it", "namespace"),
+			expectedError: fmt.Errorf("the namespace %q does not allow the workload type %s", "managed-namespace", "non-existent"),
+		},
+		{
+			name: "should not return any errors when the pod has workload annotation, but the pod namespace has no annotations",
+			pod: testManagedPod(
+				"500m",
+				"250m",
+				"500Mi",
+				"250Mi",
+			),
+			namespace: testNamespace(),
+			nodes:     []*corev1.Node{testNodeWithManagementResource()},
 		},
 		{
 			name: "should not return any errors when the pod and namespace valid",
@@ -532,19 +565,12 @@ func testManagedStaticPod(cpuLimit, cpuRequest, memoryLimit, memoryRequest strin
 	}
 }
 
 func testManagedPod(cpuLimit, cpuRequest, memoryLimit, memoryRequest string) *kapi.Pod {
-	pod := testPod(cpuLimit, cpuRequest, memoryLimit, memoryRequest)
-
-	pod.Annotations = map[string]string{}
-	for _, c := range pod.Spec.InitContainers {
-		cpusetAnnotation := fmt.Sprintf("%s%s", containerResourcesAnnotationPrefix, c.Name)
-		pod.Annotations[cpusetAnnotation] = `{"cpuset": "0-1"}`
-	}
-	for _, c := range pod.Spec.Containers {
-		cpusetAnnotation := fmt.Sprintf("%s%s", containerResourcesAnnotationPrefix, c.Name)
-		pod.Annotations[cpusetAnnotation] = `{"cpuset": "0-1"}`
-	}
+	return testManagedPodWithWorkloadAnnotation(cpuLimit, cpuRequest, memoryLimit, memoryRequest, workloadTypeManagement)
+}
 
-	managementWorkloadAnnotation := fmt.Sprintf("%s%s", podWorkloadTargetAnnotationPrefix, workloadTypeManagement)
+func testManagedPodWithWorkloadAnnotation(cpuLimit, cpuRequest, memoryLimit, memoryRequest string, workloadType string) *kapi.Pod {
+	pod := testPod(cpuLimit, cpuRequest, memoryLimit, memoryRequest)
+	managementWorkloadAnnotation := fmt.Sprintf("%s%s", podWorkloadTargetAnnotationPrefix, workloadType)
 	pod.Annotations = map[string]string{
 		managementWorkloadAnnotation: fmt.Sprintf(`{"%s":"%s"}`, podWorkloadAnnotationEffect, workloadEffectPreferredDuringScheduling),
 	}
@@ -675,9 +701,8 @@ func testClusterSNOInfra() *configv1.Infrastructure {
 	}
 }
 
-func testClusterInfraWithoutTopologyFields() *configv1.Infrastructure {
+func testClusterInfraWithoutWorkloadPartitioning() *configv1.Infrastructure {
 	infra := testClusterSNOInfra()
-	infra.Status.ControlPlaneTopology = ""
-	infra.Status.InfrastructureTopology = ""
+	infra.Status.CPUPartitioning = configv1.CPUPartitioningNone
 	return infra
 }
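The updated TestValidate expectation above (`json: unknown field "cpuset"`) follows directly from the strict decoder introduced in Validate; a quick standalone sketch (not part of the commit) of that behavior:

// Sketch: the strict decoding used by Validate after this change. An old-style
// resource annotation value carrying a "cpuset" key is now rejected because the
// decoder only knows the cpushares/cpulimit fields.
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

type resourceAnnotation struct {
	CPUShares uint64 `json:"cpushares,omitempty"`
	CPULimit  int64  `json:"cpulimit,omitempty"`
}

func main() {
	legacyValue := `{"cpuset": "0-1"}` // the value the old test helper used to set

	decoder := json.NewDecoder(strings.NewReader(legacyValue))
	decoder.DisallowUnknownFields()

	var anno resourceAnnotation
	if err := decoder.Decode(&anno); err != nil {
		fmt.Println(err) // json: unknown field "cpuset"
	}
}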
