
Commit 04070bb

rphillips authored and soltysh committed
UPSTREAM: <carry>: add management support to kubelet
UPSTREAM: <carry>: management workloads enhancement 741

UPSTREAM: <carry>: lower verbosity of managed workloads logging

Support for managed workloads was introduced by PR#627. However, the
CPU manager reconcile loop now seems to flood the kubelet log with
"reconcileState: skipping pod; pod is managed" warnings. Lower the
verbosity of these log messages.

UPSTREAM: <carry>: set correctly static pods CPUs when workload partitioning is disabled

UPSTREAM: <carry>: Remove reserved CPUs from default set

Remove reserved CPUs from default set when workload partitioning is enabled.

Co-Authored-By: Brent Rowsell <[email protected]>
Signed-off-by: Artyom Lukianov <[email protected]>
Signed-off-by: Don Penney <[email protected]>
OpenShift-Rebase-Source: b762ced
OpenShift-Rebase-Source: 63cf793
OpenShift-Rebase-Source: 32af64c

UPSTREAM: <carry>: add management support to kubelet
1 parent 30d7d41 commit 04070bb

10 files changed (+511 −1)

Diff for: pkg/kubelet/cm/cpumanager/cpu_manager.go

+6

@@ -35,6 +35,7 @@ import (
     "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
     "k8s.io/kubernetes/pkg/kubelet/config"
     kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
+    "k8s.io/kubernetes/pkg/kubelet/managed"
     "k8s.io/kubernetes/pkg/kubelet/status"
     "k8s.io/utils/cpuset"
 )
@@ -407,13 +408,18 @@ func (m *manager) reconcileState() (success []reconciledContainer, failure []rec
     failure = []reconciledContainer{}

     m.removeStaleState()
+    workloadEnabled := managed.IsEnabled()
     for _, pod := range m.activePods() {
         pstatus, ok := m.podStatusProvider.GetPodStatus(pod.UID)
         if !ok {
             klog.V(4).InfoS("ReconcileState: skipping pod; status not found", "pod", klog.KObj(pod))
             failure = append(failure, reconciledContainer{pod.Name, "", ""})
             continue
         }
+        if enabled, _, _ := managed.IsPodManaged(pod); workloadEnabled && enabled {
+            klog.V(4).InfoS("[cpumanager] reconcileState: skipping pod; pod is managed (pod: %s)", pod.Name)
+            continue
+        }

         allContainers := pod.Spec.InitContainers
         allContainers = append(allContainers, pod.Spec.Containers...)

Diff for: pkg/kubelet/cm/cpumanager/policy_static.go

+5

@@ -29,6 +29,7 @@ import (
     "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
     "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
     "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/bitmask"
+    "k8s.io/kubernetes/pkg/kubelet/managed"
     "k8s.io/kubernetes/pkg/kubelet/metrics"
     "k8s.io/kubernetes/pkg/kubelet/types"
     "k8s.io/utils/cpuset"
@@ -203,6 +204,10 @@ func (p *staticPolicy) validateState(s state.State) error {
         // state is empty initialize
         allCPUs := p.topology.CPUDetails.CPUs()
         s.SetDefaultCPUSet(allCPUs)
+        if managed.IsEnabled() {
+            defaultCpus := s.GetDefaultCPUSet().Difference(p.reservedCPUs)
+            s.SetDefaultCPUSet(defaultCpus)
+        }
         return nil
     }

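For illustration, the effect of the reserved-CPU removal above can be reproduced with the k8s.io/utils/cpuset package that the policy already imports. This is a minimal sketch, not part of the commit; the topology and reserved CPU IDs are made up:

package main

import (
    "fmt"

    "k8s.io/utils/cpuset"
)

func main() {
    // Hypothetical node with 8 CPUs; CPUs 0 and 1 are reserved for
    // management workloads via workload partitioning.
    allCPUs := cpuset.New(0, 1, 2, 3, 4, 5, 6, 7)
    reserved := cpuset.New(0, 1)

    // Mirrors validateState above: when pinning is enabled, the default
    // (shared) pool excludes the reserved CPUs.
    defaultSet := allCPUs.Difference(reserved)
    fmt.Println(defaultSet.String()) // 2-7
}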

Diff for: pkg/kubelet/cm/qos_container_manager_linux.go

+4

@@ -35,6 +35,7 @@ import (
     "k8s.io/kubernetes/pkg/api/v1/resource"
     v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos"
     kubefeatures "k8s.io/kubernetes/pkg/features"
+    "k8s.io/kubernetes/pkg/kubelet/managed"
 )

 const (
@@ -173,6 +174,9 @@ func (m *qosContainerManagerImpl) setCPUCgroupConfig(configs map[v1.PodQOSClass]
     reuseReqs := make(v1.ResourceList, 4)
     for i := range pods {
         pod := pods[i]
+        if enabled, _, _ := managed.IsPodManaged(pod); enabled {
+            continue
+        }
         qosClass := v1qos.GetPodQOS(pod)
         if qosClass != v1.PodQOSBurstable {
             // we only care about the burstable qos tier

Diff for: pkg/kubelet/config/file.go

+11

@@ -30,6 +30,7 @@ import (
     "k8s.io/apimachinery/pkg/types"
     "k8s.io/client-go/tools/cache"
     api "k8s.io/kubernetes/pkg/apis/core"
+    "k8s.io/kubernetes/pkg/kubelet/managed"
     kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
     utilio "k8s.io/utils/io"
 )
@@ -230,6 +231,16 @@ func (s *sourceFile) extractFromFile(filename string) (pod *v1.Pod, err error) {
     if podErr != nil {
         return pod, podErr
     }
+    if managed.IsEnabled() {
+        if newPod, _, err := managed.ModifyStaticPodForPinnedManagement(pod); err != nil {
+            klog.V(2).Error(err, "Static Pod is managed but errored", "name", pod.ObjectMeta.Name, "namespace", pod.ObjectMeta.Namespace)
+        } else if newPod != nil {
+            klog.V(2).InfoS("Static Pod is managed. Using modified pod", "name", newPod.ObjectMeta.Name, "namespace", newPod.ObjectMeta.Namespace, "annotations", newPod.Annotations)
+            pod = newPod
+        } else {
+            klog.V(2).InfoS("Static Pod is not managed", "name", pod.ObjectMeta.Name, "namespace", pod.ObjectMeta.Namespace)
+        }
+    }
     return pod, nil
 }


Diff for: pkg/kubelet/config/file_linux.go

+1 −1

@@ -29,7 +29,7 @@ import (
     "github.com/fsnotify/fsnotify"
     "k8s.io/klog/v2"

-    "k8s.io/api/core/v1"
+    v1 "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/util/wait"
     "k8s.io/client-go/util/flowcontrol"
     kubetypes "k8s.io/kubernetes/pkg/kubelet/types"

Diff for: pkg/kubelet/kubelet.go

+5

@@ -88,6 +88,7 @@ import (
     "k8s.io/kubernetes/pkg/kubelet/kuberuntime"
     "k8s.io/kubernetes/pkg/kubelet/lifecycle"
     "k8s.io/kubernetes/pkg/kubelet/logs"
+    "k8s.io/kubernetes/pkg/kubelet/managed"
     "k8s.io/kubernetes/pkg/kubelet/metrics"
     "k8s.io/kubernetes/pkg/kubelet/metrics/collectors"
     "k8s.io/kubernetes/pkg/kubelet/network/dns"
@@ -616,6 +617,10 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,

     klet.runtimeService = kubeDeps.RemoteRuntimeService

+    if managed.IsEnabled() {
+        klog.InfoS("Pinned Workload Management Enabled")
+    }
+
     if kubeDeps.KubeClient != nil {
         klet.runtimeClassManager = runtimeclass.NewManager(kubeDeps.KubeClient)
     }

Diff for: pkg/kubelet/kubelet_node_status.go

+27

@@ -38,7 +38,9 @@ import (
     "k8s.io/klog/v2"
     kubeletapis "k8s.io/kubelet/pkg/apis"
     v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
+    "k8s.io/kubernetes/pkg/kubelet/cadvisor"
     "k8s.io/kubernetes/pkg/kubelet/events"
+    "k8s.io/kubernetes/pkg/kubelet/managed"
     "k8s.io/kubernetes/pkg/kubelet/nodestatus"
     "k8s.io/kubernetes/pkg/kubelet/util"
     taintutil "k8s.io/kubernetes/pkg/util/taints"
@@ -118,6 +120,9 @@ func (kl *Kubelet) tryRegisterWithAPIServer(node *v1.Node) bool {
     requiresUpdate = kl.updateDefaultLabels(node, existingNode) || requiresUpdate
     requiresUpdate = kl.reconcileExtendedResource(node, existingNode) || requiresUpdate
     requiresUpdate = kl.reconcileHugePageResource(node, existingNode) || requiresUpdate
+    if managed.IsEnabled() {
+        requiresUpdate = kl.addManagementNodeCapacity(node, existingNode) || requiresUpdate
+    }
     if requiresUpdate {
         if _, _, err := nodeutil.PatchNodeStatus(kl.kubeClient.CoreV1(), types.NodeName(kl.nodeName), originalNode, existingNode); err != nil {
             klog.ErrorS(err, "Unable to reconcile node with API server,error updating node", "node", klog.KObj(node))
@@ -128,6 +133,25 @@ func (kl *Kubelet) tryRegisterWithAPIServer(node *v1.Node) bool {
     return true
 }

+// addManagementNodeCapacity adds the managed node capacity to the node
+func (kl *Kubelet) addManagementNodeCapacity(initialNode, existingNode *v1.Node) bool {
+    updateDefaultResources(initialNode, existingNode)
+    machineInfo, err := kl.cadvisor.MachineInfo()
+    if err != nil {
+        klog.Errorf("Unable to calculate managed node capacity for %q: %v", kl.nodeName, err)
+        return false
+    }
+    cpuRequest := cadvisor.CapacityFromMachineInfo(machineInfo)[v1.ResourceCPU]
+    cpuRequestInMilli := cpuRequest.MilliValue()
+    newCPURequest := resource.NewMilliQuantity(cpuRequestInMilli*1000, cpuRequest.Format)
+    managedResourceName := managed.GenerateResourceName("management")
+    if existingCapacity, ok := existingNode.Status.Capacity[managedResourceName]; ok && existingCapacity.Equal(*newCPURequest) {
+        return false
+    }
+    existingNode.Status.Capacity[managedResourceName] = *newCPURequest
+    return true
+}
+
 // reconcileHugePageResource will update huge page capacity for each page size and remove huge page sizes no longer supported
 func (kl *Kubelet) reconcileHugePageResource(initialNode, existingNode *v1.Node) bool {
     requiresUpdate := updateDefaultResources(initialNode, existingNode)
@@ -427,6 +451,9 @@ func (kl *Kubelet) initialNode(ctx context.Context) (*v1.Node, error) {
             }
         }
     }
+    if managed.IsEnabled() {
+        kl.addManagementNodeCapacity(node, node)
+    }

     kl.setNodeStatus(ctx, node)
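A rough illustration of the capacity math in addManagementNodeCapacity above: NewMilliQuantity is passed the millicore count multiplied by 1000, so the resulting quantity's value equals the node's CPU in millicores. This sketch is not part of the commit; the node size is hypothetical, and the resource name format comes from the managed package added later in this commit:

package main

import (
    "fmt"

    "k8s.io/apimachinery/pkg/api/resource"
)

func main() {
    // Hypothetical node with 8 CPUs, i.e. 8000 millicores reported by cAdvisor.
    cpuRequestInMilli := int64(8000)

    // Mirrors addManagementNodeCapacity: the extended resource
    // (e.g. management.workload.openshift.io/cores) is advertised in
    // millicore granularity rather than whole cores.
    capacity := resource.NewMilliQuantity(cpuRequestInMilli*1000, resource.DecimalSI)
    fmt.Println(capacity.Value()) // 8000
}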

Diff for: pkg/kubelet/managed/cpu_shares.go

+30

@@ -0,0 +1,30 @@
+package managed
+
+const (
+    // These limits are defined in the kernel:
+    // https://github.com/torvalds/linux/blob/0bddd227f3dc55975e2b8dfa7fc6f959b062a2c7/kernel/sched/sched.h#L427-L428
+    MinShares = 2
+    MaxShares = 262144
+
+    SharesPerCPU  = 1024
+    MilliCPUToCPU = 1000
+)
+
+// MilliCPUToShares converts the milliCPU to CFS shares.
+func MilliCPUToShares(milliCPU int64) uint64 {
+    if milliCPU == 0 {
+        // Docker converts zero milliCPU to unset, which maps to kernel default
+        // for unset: 1024. Return 2 here to really match kernel default for
+        // zero milliCPU.
+        return MinShares
+    }
+    // Conceptually (milliCPU / milliCPUToCPU) * sharesPerCPU, but factored to improve rounding.
+    shares := (milliCPU * SharesPerCPU) / MilliCPUToCPU
+    if shares < MinShares {
+        return MinShares
+    }
+    if shares > MaxShares {
+        return MaxShares
+    }
+    return uint64(shares)
+}
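As a quick sanity check of the conversion above, a hypothetical table test (not part of the commit) shows the clamping behaviour at both ends of the range:

package managed

import "testing"

func TestMilliCPUToShares(t *testing.T) {
    cases := []struct {
        milliCPU int64
        want     uint64
    }{
        {0, 2},            // zero request clamps to the kernel minimum
        {1, 2},            // (1*1024)/1000 = 1, still below MinShares
        {500, 512},        // half a CPU -> 512 shares
        {2000, 2048},      // two CPUs -> 2048 shares
        {300_000, 262144}, // 300 CPUs would exceed MaxShares, so it clamps
    }
    for _, c := range cases {
        if got := MilliCPUToShares(c.milliCPU); got != c.want {
            t.Errorf("MilliCPUToShares(%d) = %d, want %d", c.milliCPU, got, c.want)
        }
    }
}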

Diff for: pkg/kubelet/managed/managed.go

+141

@@ -0,0 +1,141 @@
+/*
+Copyright 2021 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package managed
+
+import (
+    "encoding/json"
+    "fmt"
+    "os"
+    "strings"
+
+    v1 "k8s.io/api/core/v1"
+    "k8s.io/apimachinery/pkg/api/resource"
+)
+
+var (
+    pinnedManagementEnabled   bool
+    pinnedManagementFilename  = "/etc/kubernetes/openshift-workload-pinning"
+    WorkloadsAnnotationPrefix = "target.workload.openshift.io/"
+    WorkloadsCapacitySuffix   = "workload.openshift.io/cores"
+    ContainerAnnotationFormat = "resources.workload.openshift.io/%v"
+)
+
+type WorkloadContainerAnnotation struct {
+    CpuShares uint64 `json:"cpushares"`
+}
+
+func NewWorkloadContainerAnnotation(cpushares uint64) WorkloadContainerAnnotation {
+    return WorkloadContainerAnnotation{
+        CpuShares: cpushares,
+    }
+}
+
+func (w WorkloadContainerAnnotation) Serialize() ([]byte, error) {
+    return json.Marshal(w)
+}
+
+func init() {
+    readEnablementFile()
+}
+
+func readEnablementFile() {
+    if _, err := os.Stat(pinnedManagementFilename); err == nil {
+        pinnedManagementEnabled = true
+    }
+}
+
+func IsEnabled() bool {
+    return pinnedManagementEnabled
+}
+
+// IsPodManaged returns true and the name of the workload if enabled.
+// returns true, workload name, and the annotation payload.
+func IsPodManaged(pod *v1.Pod) (bool, string, string) {
+    if pod.ObjectMeta.Annotations == nil {
+        return false, "", ""
+    }
+    for annotation, value := range pod.ObjectMeta.Annotations {
+        if strings.HasPrefix(annotation, WorkloadsAnnotationPrefix) {
+            return true, strings.TrimPrefix(annotation, WorkloadsAnnotationPrefix), value
+        }
+    }
+    return false, "", ""
+}
+
+// ModifyStaticPodForPinnedManagement will modify a pod for pod management
+func ModifyStaticPodForPinnedManagement(pod *v1.Pod) (*v1.Pod, string, error) {
+    pod = pod.DeepCopy()
+    enabled, workloadName, value := IsPodManaged(pod)
+    if !enabled {
+        return nil, "", nil
+    }
+    if pod.Annotations == nil {
+        pod.Annotations = make(map[string]string)
+    }
+    pod.Annotations[fmt.Sprintf("%v%v", WorkloadsAnnotationPrefix, workloadName)] = value
+    if err := updateContainers(workloadName, pod); err != nil {
+        return nil, "", err
+    }
+    return pod, workloadName, nil
+}
+
+func GenerateResourceName(workloadName string) v1.ResourceName {
+    return v1.ResourceName(fmt.Sprintf("%v.%v", workloadName, WorkloadsCapacitySuffix))
+}
+
+func updateContainers(workloadName string, pod *v1.Pod) error {
+    updateContainer := func(container *v1.Container) error {
+        if container.Resources.Requests == nil {
+            return fmt.Errorf("managed container %v does not have Resource.Requests", container.Name)
+        }
+        if _, ok := container.Resources.Requests[v1.ResourceCPU]; !ok {
+            return fmt.Errorf("managed container %v does not have cpu requests", container.Name)
+        }
+        if _, ok := container.Resources.Requests[v1.ResourceMemory]; !ok {
+            return fmt.Errorf("managed container %v does not have memory requests", container.Name)
+        }
+        if container.Resources.Limits == nil {
+            container.Resources.Limits = v1.ResourceList{}
+        }
+        cpuRequest := container.Resources.Requests[v1.ResourceCPU]
+        cpuRequestInMilli := cpuRequest.MilliValue()
+
+        containerAnnotation := NewWorkloadContainerAnnotation(MilliCPUToShares(cpuRequestInMilli))
+        jsonAnnotation, _ := containerAnnotation.Serialize()
+        containerNameKey := fmt.Sprintf(ContainerAnnotationFormat, container.Name)
+
+        newCPURequest := resource.NewMilliQuantity(cpuRequestInMilli*1000, cpuRequest.Format)
+
+        pod.Annotations[containerNameKey] = string(jsonAnnotation)
+        container.Resources.Requests[GenerateResourceName(workloadName)] = *newCPURequest
+        container.Resources.Limits[GenerateResourceName(workloadName)] = *newCPURequest
+
+        delete(container.Resources.Requests, v1.ResourceCPU)
+        return nil
+    }
+    for idx := range pod.Spec.Containers {
+        if err := updateContainer(&pod.Spec.Containers[idx]); err != nil {
+            return err
+        }
+    }
+    for idx := range pod.Spec.InitContainers {
+        if err := updateContainer(&pod.Spec.InitContainers[idx]); err != nil {
+            return err
+        }
+    }
+    return nil
+}
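To make the end-to-end effect of ModifyStaticPodForPinnedManagement concrete, here is a hypothetical sketch (not part of the commit; pod name, annotation payload, and resource values are made up, and it assumes it is built inside the kubernetes tree so the internal managed package is importable):

package main

import (
    "fmt"

    v1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/resource"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

    "k8s.io/kubernetes/pkg/kubelet/managed"
)

func main() {
    pod := &v1.Pod{
        ObjectMeta: metav1.ObjectMeta{
            Name: "etcd-member",
            // Opting the static pod into the "management" workload.
            Annotations: map[string]string{
                "target.workload.openshift.io/management": `{"effect": "PreferredDuringScheduling"}`,
            },
        },
        Spec: v1.PodSpec{
            Containers: []v1.Container{{
                Name: "etcd",
                Resources: v1.ResourceRequirements{
                    Requests: v1.ResourceList{
                        v1.ResourceCPU:    resource.MustParse("100m"),
                        v1.ResourceMemory: resource.MustParse("200Mi"),
                    },
                },
            }},
        },
    }

    newPod, workload, err := managed.ModifyStaticPodForPinnedManagement(pod)
    if err != nil {
        panic(err)
    }
    fmt.Println(workload) // management

    // The container annotation carries the CFS shares for the original
    // 100m request: {"cpushares":102}
    fmt.Println(newPod.Annotations["resources.workload.openshift.io/etcd"])

    // The cpu request is replaced by the workload extended resource,
    // expressed in millicores (100m -> 100 units).
    c := newPod.Spec.Containers[0]
    _, hasCPU := c.Resources.Requests[v1.ResourceCPU]
    fmt.Println(hasCPU) // false
    q := c.Resources.Requests["management.workload.openshift.io/cores"]
    fmt.Println(q.String()) // 100
}

The kubelet then skips these pods in the CPU manager reconcile loop and the QoS cgroup manager (per the diffs above), leaving the CRI runtime to honor the cpushares annotation on the partitioned management CPUs.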
