
Commit b762ced

rphillips authored and soltysh committed
UPSTREAM: <carry>: add management support to kubelet
UPSTREAM: <carry>: management workloads enhancement 741

UPSTREAM: <carry>: lower verbosity of managed workloads logging

Support for managed workloads was introduced by PR#627. However, the CPU manager reconcile loop now floods the kubelet log with "reconcileState: skipping pod; pod is managed" warnings. Lower the verbosity of these log messages.

openshift-rebase(v1.24):source=9e63356d4a9
1 parent 3baaa16 commit b762ced

File tree

9 files changed: +505 -1 lines changed

pkg/kubelet/cm/cpumanager/cpu_manager.go

+5

@@ -35,6 +35,7 @@ import (
 	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
 	"k8s.io/kubernetes/pkg/kubelet/config"
 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
+	"k8s.io/kubernetes/pkg/kubelet/managed"
 	"k8s.io/kubernetes/pkg/kubelet/status"
 )

@@ -412,6 +413,10 @@ func (m *manager) reconcileState() (success []reconciledContainer, failure []rec
 			failure = append(failure, reconciledContainer{pod.Name, "", ""})
 			continue
 		}
+		if enabled, _, _ := managed.IsPodManaged(pod); enabled {
+			klog.V(4).InfoS("[cpumanager] reconcileState: skipping pod; pod is managed (pod: %s)", pod.Name)
+			continue
+		}

 		allContainers := pod.Spec.InitContainers
 		allContainers = append(allContainers, pod.Spec.Containers...)
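For context on what the new check matches: a pod is treated as managed when any of its annotations starts with the target.workload.openshift.io/ prefix (defined in pkg/kubelet/managed/managed.go later in this commit), and the klog.V(4) message above only appears when the kubelet runs with --v=4 or higher. A minimal sketch, with a hypothetical pod name and a placeholder annotation value (only the key prefix is inspected):

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/kubernetes/pkg/kubelet/managed"
)

func main() {
	// Hypothetical pod annotated for the "management" workload class.
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name: "etcd-member",
			Annotations: map[string]string{
				"target.workload.openshift.io/management": `{"effect": "PreferredDuringScheduling"}`,
			},
		},
	}
	// The reconcile loop above skips pods for which this returns true.
	enabled, workloadName, _ := managed.IsPodManaged(pod)
	fmt.Println(enabled, workloadName) // true management
}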

pkg/kubelet/cm/qos_container_manager_linux.go

+4

@@ -34,6 +34,7 @@ import (
 	"k8s.io/kubernetes/pkg/api/v1/resource"
 	v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos"
 	kubefeatures "k8s.io/kubernetes/pkg/features"
+	"k8s.io/kubernetes/pkg/kubelet/managed"
 )

 const (
@@ -171,6 +172,9 @@ func (m *qosContainerManagerImpl) setCPUCgroupConfig(configs map[v1.PodQOSClass]
 	burstablePodCPURequest := int64(0)
 	for i := range pods {
 		pod := pods[i]
+		if enabled, _, _ := managed.IsPodManaged(pod); enabled {
+			continue
+		}
 		qosClass := v1qos.GetPodQOS(pod)
 		if qosClass != v1.PodQOSBurstable {
 			// we only care about the burstable qos tier

pkg/kubelet/config/file.go

+11

@@ -30,6 +30,7 @@ import (
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/client-go/tools/cache"
 	api "k8s.io/kubernetes/pkg/apis/core"
+	"k8s.io/kubernetes/pkg/kubelet/managed"
 	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
 	utilio "k8s.io/utils/io"
 )
@@ -230,6 +231,16 @@ func (s *sourceFile) extractFromFile(filename string) (pod *v1.Pod, err error) {
 	if podErr != nil {
 		return pod, podErr
 	}
+	if managed.IsEnabled() {
+		if newPod, _, err := managed.ModifyStaticPodForPinnedManagement(pod); err != nil {
+			klog.V(2).Error(err, "Static Pod is managed but errored", "name", pod.ObjectMeta.Name, "namespace", pod.ObjectMeta.Namespace)
+		} else if newPod != nil {
+			klog.V(2).InfoS("Static Pod is managed. Using modified pod", "name", newPod.ObjectMeta.Name, "namespace", newPod.ObjectMeta.Namespace, "annotations", newPod.Annotations)
+			pod = newPod
+		} else {
+			klog.V(2).InfoS("Static Pod is not managed", "name", pod.ObjectMeta.Name, "namespace", pod.ObjectMeta.Namespace)
+		}
+	}
 	return pod, nil
 }

pkg/kubelet/config/file_linux.go

+1 -1

@@ -29,7 +29,7 @@ import (
 	"github.com/fsnotify/fsnotify"
 	"k8s.io/klog/v2"

-	"k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/util/flowcontrol"
 	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"

pkg/kubelet/kubelet.go

+5

@@ -83,6 +83,7 @@ import (
 	"k8s.io/kubernetes/pkg/kubelet/kuberuntime"
 	"k8s.io/kubernetes/pkg/kubelet/lifecycle"
 	"k8s.io/kubernetes/pkg/kubelet/logs"
+	"k8s.io/kubernetes/pkg/kubelet/managed"
 	"k8s.io/kubernetes/pkg/kubelet/metrics"
 	"k8s.io/kubernetes/pkg/kubelet/metrics/collectors"
 	"k8s.io/kubernetes/pkg/kubelet/network/dns"
@@ -586,6 +587,10 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,

 	klet.runtimeService = kubeDeps.RemoteRuntimeService

+	if managed.IsEnabled() {
+		klog.InfoS("Pinned Workload Management Enabled")
+	}
+
 	if kubeDeps.KubeClient != nil {
 		klet.runtimeClassManager = runtimeclass.NewManager(kubeDeps.KubeClient)
 	}

pkg/kubelet/kubelet_node_status.go

+27

@@ -38,7 +38,9 @@ import (
 	"k8s.io/klog/v2"
 	kubeletapis "k8s.io/kubelet/pkg/apis"
 	v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
+	"k8s.io/kubernetes/pkg/kubelet/cadvisor"
 	"k8s.io/kubernetes/pkg/kubelet/events"
+	"k8s.io/kubernetes/pkg/kubelet/managed"
 	"k8s.io/kubernetes/pkg/kubelet/nodestatus"
 	"k8s.io/kubernetes/pkg/kubelet/util"
 	taintutil "k8s.io/kubernetes/pkg/util/taints"
@@ -114,6 +116,9 @@ func (kl *Kubelet) tryRegisterWithAPIServer(node *v1.Node) bool {
 	requiresUpdate = kl.updateDefaultLabels(node, existingNode) || requiresUpdate
 	requiresUpdate = kl.reconcileExtendedResource(node, existingNode) || requiresUpdate
 	requiresUpdate = kl.reconcileHugePageResource(node, existingNode) || requiresUpdate
+	if managed.IsEnabled() {
+		requiresUpdate = kl.addManagementNodeCapacity(node, existingNode) || requiresUpdate
+	}
 	if requiresUpdate {
 		if _, _, err := nodeutil.PatchNodeStatus(kl.kubeClient.CoreV1(), types.NodeName(kl.nodeName), originalNode, existingNode); err != nil {
 			klog.ErrorS(err, "Unable to reconcile node with API server,error updating node", "node", klog.KObj(node))
@@ -124,6 +129,25 @@ func (kl *Kubelet) tryRegisterWithAPIServer(node *v1.Node) bool {
 	return true
 }

+// addManagementNodeCapacity adds the managednode capacity to the node
+func (kl *Kubelet) addManagementNodeCapacity(initialNode, existingNode *v1.Node) bool {
+	updateDefaultResources(initialNode, existingNode)
+	machineInfo, err := kl.cadvisor.MachineInfo()
+	if err != nil {
+		klog.Errorf("Unable to calculate managed node capacity for %q: %v", kl.nodeName, err)
+		return false
+	}
+	cpuRequest := cadvisor.CapacityFromMachineInfo(machineInfo)[v1.ResourceCPU]
+	cpuRequestInMilli := cpuRequest.MilliValue()
+	newCPURequest := resource.NewMilliQuantity(cpuRequestInMilli*1000, cpuRequest.Format)
+	managedResourceName := managed.GenerateResourceName("management")
+	if existingCapacity, ok := existingNode.Status.Capacity[managedResourceName]; ok && existingCapacity.Equal(*newCPURequest) {
+		return false
+	}
+	existingNode.Status.Capacity[managedResourceName] = *newCPURequest
+	return true
+}
+
 // reconcileHugePageResource will update huge page capacity for each page size and remove huge page sizes no longer supported
 func (kl *Kubelet) reconcileHugePageResource(initialNode, existingNode *v1.Node) bool {
 	requiresUpdate := updateDefaultResources(initialNode, existingNode)
@@ -423,6 +447,9 @@ func (kl *Kubelet) initialNode(ctx context.Context) (*v1.Node, error) {
 			}
 		}
 	}
+	if managed.IsEnabled() {
+		kl.addManagementNodeCapacity(node, node)
+	}

 	kl.setNodeStatus(node)
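A quick sanity check of the capacity arithmetic in addManagementNodeCapacity above: passing cpuRequestInMilli*1000 to resource.NewMilliQuantity yields a quantity whose whole-unit value equals the machine's CPU capacity in millicores, so a hypothetical 8-CPU node would advertise management.workload.openshift.io/cores: 8000. A minimal sketch of just that step:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	// Assume cadvisor reports 8 CPUs, i.e. a CPU capacity of 8 cores.
	cpuRequest := resource.MustParse("8")
	cpuRequestInMilli := cpuRequest.MilliValue() // 8000

	// Same expression as in addManagementNodeCapacity above.
	newCPURequest := resource.NewMilliQuantity(cpuRequestInMilli*1000, cpuRequest.Format)

	// Prints 8000: the managed capacity equals the CPU capacity in millicores.
	fmt.Println(newCPURequest.Value())
}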

pkg/kubelet/managed/cpu_shares.go

+30

@@ -0,0 +1,30 @@
+package managed
+
+const (
+	// These limits are defined in the kernel:
+	// https://github.com/torvalds/linux/blob/0bddd227f3dc55975e2b8dfa7fc6f959b062a2c7/kernel/sched/sched.h#L427-L428
+	MinShares = 2
+	MaxShares = 262144
+
+	SharesPerCPU  = 1024
+	MilliCPUToCPU = 1000
+)
+
+// MilliCPUToShares converts the milliCPU to CFS shares.
+func MilliCPUToShares(milliCPU int64) uint64 {
+	if milliCPU == 0 {
+		// Docker converts zero milliCPU to unset, which maps to kernel default
+		// for unset: 1024. Return 2 here to really match kernel default for
+		// zero milliCPU.
+		return MinShares
+	}
+	// Conceptually (milliCPU / milliCPUToCPU) * sharesPerCPU, but factored to improve rounding.
+	shares := (milliCPU * SharesPerCPU) / MilliCPUToCPU
+	if shares < MinShares {
+		return MinShares
+	}
+	if shares > MaxShares {
+		return MaxShares
+	}
+	return uint64(shares)
+}
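A few worked values for MilliCPUToShares, following the integer arithmetic above (an illustrative sketch, not part of the commit; it assumes the managed package is importable as shown):

package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/kubelet/managed"
)

func main() {
	fmt.Println(managed.MilliCPUToShares(0))      // 2      (zero maps to MinShares)
	fmt.Println(managed.MilliCPUToShares(1))      // 2      ((1*1024)/1000 = 1, clamped up to MinShares)
	fmt.Println(managed.MilliCPUToShares(500))    // 512    ((500*1024)/1000)
	fmt.Println(managed.MilliCPUToShares(1000))   // 1024   (one full CPU)
	fmt.Println(managed.MilliCPUToShares(300000)) // 262144 (clamped to MaxShares)
}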

pkg/kubelet/managed/managed.go

+141

@@ -0,0 +1,141 @@
+/*
+Copyright 2021 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package managed
+
+import (
+	"encoding/json"
+	"fmt"
+	"os"
+	"strings"
+
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
+)
+
+var (
+	pinnedManagementEnabled   bool
+	pinnedManagementFilename  = "/etc/kubernetes/openshift-workload-pinning"
+	WorkloadsAnnotationPrefix = "target.workload.openshift.io/"
+	WorkloadsCapacitySuffix   = "workload.openshift.io/cores"
+	ContainerAnnotationFormat = "resources.workload.openshift.io/%v"
+)
+
+type WorkloadContainerAnnotation struct {
+	CpuShares uint64 `json:"cpushares"`
+}
+
+func NewWorkloadContainerAnnotation(cpushares uint64) WorkloadContainerAnnotation {
+	return WorkloadContainerAnnotation{
+		CpuShares: cpushares,
+	}
+}
+
+func (w WorkloadContainerAnnotation) Serialize() ([]byte, error) {
+	return json.Marshal(w)
+}
+
+func init() {
+	readEnablementFile()
+}
+
+func readEnablementFile() {
+	if _, err := os.Stat(pinnedManagementFilename); err == nil {
+		pinnedManagementEnabled = true
+	}
+}
+
+func IsEnabled() bool {
+	return pinnedManagementEnabled
+}
+
+/// IsPodManaged returns true and the name of the workload if enabled.
+/// returns true, workload name, and the annotation payload.
+func IsPodManaged(pod *v1.Pod) (bool, string, string) {
+	if pod.ObjectMeta.Annotations == nil {
+		return false, "", ""
+	}
+	for annotation, value := range pod.ObjectMeta.Annotations {
+		if strings.HasPrefix(annotation, WorkloadsAnnotationPrefix) {
+			return true, strings.TrimPrefix(annotation, WorkloadsAnnotationPrefix), value
+		}
+	}
+	return false, "", ""
+}
+
+// ModifyStaticPodForPinnedManagement will modify a pod for pod management
+func ModifyStaticPodForPinnedManagement(pod *v1.Pod) (*v1.Pod, string, error) {
+	pod = pod.DeepCopy()
+	enabled, workloadName, value := IsPodManaged(pod)
+	if !enabled {
+		return nil, "", nil
+	}
+	if pod.Annotations == nil {
+		pod.Annotations = make(map[string]string)
+	}
+	pod.Annotations[fmt.Sprintf("%v%v", WorkloadsAnnotationPrefix, workloadName)] = value
+	if err := updateContainers(workloadName, pod); err != nil {
+		return nil, "", err
+	}
+	return pod, workloadName, nil
+}
+
+func GenerateResourceName(workloadName string) v1.ResourceName {
+	return v1.ResourceName(fmt.Sprintf("%v.%v", workloadName, WorkloadsCapacitySuffix))
+}
+
+func updateContainers(workloadName string, pod *v1.Pod) error {
+	updateContainer := func(container *v1.Container) error {
+		if container.Resources.Requests == nil {
+			return fmt.Errorf("managed container %v does not have Resource.Requests", container.Name)
+		}
+		if _, ok := container.Resources.Requests[v1.ResourceCPU]; !ok {
+			return fmt.Errorf("managed container %v does not have cpu requests", container.Name)
+		}
+		if _, ok := container.Resources.Requests[v1.ResourceMemory]; !ok {
+			return fmt.Errorf("managed container %v does not have memory requests", container.Name)
+		}
+		if container.Resources.Limits == nil {
+			container.Resources.Limits = v1.ResourceList{}
+		}
+		cpuRequest := container.Resources.Requests[v1.ResourceCPU]
+		cpuRequestInMilli := cpuRequest.MilliValue()
+
+		containerAnnotation := NewWorkloadContainerAnnotation(MilliCPUToShares(cpuRequestInMilli))
+		jsonAnnotation, _ := containerAnnotation.Serialize()
+		containerNameKey := fmt.Sprintf(ContainerAnnotationFormat, container.Name)
+
+		newCPURequest := resource.NewMilliQuantity(cpuRequestInMilli*1000, cpuRequest.Format)
+
+		pod.Annotations[containerNameKey] = string(jsonAnnotation)
+		container.Resources.Requests[GenerateResourceName(workloadName)] = *newCPURequest
+		container.Resources.Limits[GenerateResourceName(workloadName)] = *newCPURequest
+
+		delete(container.Resources.Requests, v1.ResourceCPU)
+		return nil
+	}
+	for idx := range pod.Spec.Containers {
+		if err := updateContainer(&pod.Spec.Containers[idx]); err != nil {
+			return err
+		}
+	}
+	for idx := range pod.Spec.InitContainers {
+		if err := updateContainer(&pod.Spec.InitContainers[idx]); err != nil {
+			return err
+		}
+	}
+	return nil
+}
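To illustrate the end-to-end effect of ModifyStaticPodForPinnedManagement, here is a sketch with hypothetical pod and container names; the annotation value is a placeholder, since the function copies it verbatim and only the target.workload.openshift.io/ key prefix matters:

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/kubernetes/pkg/kubelet/managed"
)

func main() {
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "static-web",
			Namespace: "kube-system",
			Annotations: map[string]string{
				// Marks the pod for the "management" workload class; the value is a placeholder.
				"target.workload.openshift.io/management": `{"effect": "PreferredDuringScheduling"}`,
			},
		},
		Spec: v1.PodSpec{
			Containers: []v1.Container{{
				Name: "web",
				Resources: v1.ResourceRequirements{
					Requests: v1.ResourceList{
						v1.ResourceCPU:    resource.MustParse("100m"),
						v1.ResourceMemory: resource.MustParse("50Mi"),
					},
				},
			}},
		},
	}

	newPod, workloadName, err := managed.ModifyStaticPodForPinnedManagement(pod)
	if err != nil {
		panic(err)
	}

	// workloadName == "management". The cpu request is removed and replaced by
	// management.workload.openshift.io/cores: 100 (in both requests and limits),
	// and the container gains the annotation
	// resources.workload.openshift.io/web: {"cpushares":102}.
	fmt.Println(workloadName)
	fmt.Println(newPod.Annotations["resources.workload.openshift.io/web"])
	fmt.Println(newPod.Spec.Containers[0].Resources.Requests)
}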
