
Commit 601749f

rphillips authored and bertinatto committed
UPSTREAM: <carry>: add management support to kubelet
UPSTREAM: <carry>: management workloads enhancement 741

UPSTREAM: <carry>: lower verbosity of managed workloads logging

Support for managed workloads was introduced by PR#627. However, the CPU manager reconcile loop now floods the kubelet log with "reconcileState: skipping pod; pod is managed" warnings. Lower the verbosity of these log messages.

UPSTREAM: <carry>: correctly set static pod CPUs when workload partitioning is disabled

UPSTREAM: <carry>: Remove reserved CPUs from default set

Remove reserved CPUs from the default set when workload partitioning is enabled.

Co-Authored-By: Brent Rowsell <[email protected]>
Signed-off-by: Artyom Lukianov <[email protected]>
Signed-off-by: Don Penney <[email protected]>
OpenShift-Rebase-Source: b762ced
OpenShift-Rebase-Source: 63cf793
OpenShift-Rebase-Source: 32af64c

UPSTREAM: <carry>: add management support to kubelet

UPSTREAM: <carry>: OCPBUGS-29520: fix cpu manager default cpuset check in workload partitioned env

(This can be squashed into 04070bb, UPSTREAM: <carry>: add management support to kubelet.)

Workload partitioning makes the separation between reserved and workload CPUs more strict. It is therefore expected that the reserved CPUs are NOT part of the default cpuset, and the existing check was overzealous.

The first execution of kubelet after a reboot never gets here, as the cpuset is computed on line 209. However, a kubelet restart without a reboot skips this code, recovers from the state file, and runs the check on line 220. This was uncovered by decoupling the cpu manager state file cleanup from kubelet restart, doing it only once at reboot, as part of OCPBUGS-24366.

UPSTREAM: <carry>: add management workload check for guaranteed qos

When static pods have workload partitioning enabled, we should not alter their resources if they are Guaranteed QoS; this change adds a check for Guaranteed QoS.

Signed-off-by: ehila <[email protected]>

test: add unit tests for error states

Signed-off-by: ehila <[email protected]>
1 parent d2c005a commit 601749f

10 files changed: +1019 −17 lines

pkg/kubelet/cm/cpumanager/cpu_manager.go (+6)

@@ -35,6 +35,7 @@ import (
 	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
 	"k8s.io/kubernetes/pkg/kubelet/config"
 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
+	"k8s.io/kubernetes/pkg/kubelet/managed"
 	"k8s.io/kubernetes/pkg/kubelet/status"
 	"k8s.io/utils/cpuset"
 )
@@ -411,13 +412,18 @@ func (m *manager) reconcileState() (success []reconciledContainer, failure []rec
 	failure = []reconciledContainer{}

 	m.removeStaleState()
+	workloadEnabled := managed.IsEnabled()
 	for _, pod := range m.activePods() {
 		pstatus, ok := m.podStatusProvider.GetPodStatus(pod.UID)
 		if !ok {
 			klog.V(5).InfoS("ReconcileState: skipping pod; status not found", "pod", klog.KObj(pod))
 			failure = append(failure, reconciledContainer{pod.Name, "", ""})
 			continue
 		}
+		if enabled, _, _ := managed.IsPodManaged(pod); workloadEnabled && enabled {
+			klog.V(4).InfoS("[cpumanager] reconcileState: skipping pod; pod is managed (pod: %s)", pod.Name)
+			continue
+		}

 		allContainers := pod.Spec.InitContainers
 		allContainers = append(allContainers, pod.Spec.Containers...)
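For illustration only, a minimal standalone sketch of the gate added above. It uses the managed package's TestOnlySetEnabled helper that appears in the test diff below; the three-value IsPodManaged signature is assumed from the usage shown in this hunk, and the snippet only builds inside this fork's source tree where pkg/kubelet/managed exists.

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/kubernetes/pkg/kubelet/managed"
)

func main() {
	// Flip the package-level switch the same way the unit tests in this commit do.
	managed.TestOnlySetEnabled(true)
	defer managed.TestOnlySetEnabled(false)

	// A pod with no workload annotations should not be reported as managed,
	// so the reconcile loop would handle it normally.
	pod := &v1.Pod{}
	workloadEnabled := managed.IsEnabled()
	if enabled, _, _ := managed.IsPodManaged(pod); workloadEnabled && enabled {
		fmt.Println("skipping pod; pod is managed")
	} else {
		fmt.Println("reconciling pod as usual")
	}
}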

pkg/kubelet/cm/cpumanager/policy_static.go (+18 −4)

@@ -30,6 +30,7 @@ import (
 	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
 	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
 	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/bitmask"
+	"k8s.io/kubernetes/pkg/kubelet/managed"
 	"k8s.io/kubernetes/pkg/kubelet/metrics"
 	"k8s.io/utils/cpuset"
 )
@@ -215,6 +216,10 @@ func (p *staticPolicy) validateState(s state.State) error {
 		// state is empty initialize
 		s.SetDefaultCPUSet(allCPUs)
 		klog.InfoS("Static policy initialized", "defaultCPUSet", allCPUs)
+		if managed.IsEnabled() {
+			defaultCpus := s.GetDefaultCPUSet().Difference(p.reservedCPUs)
+			s.SetDefaultCPUSet(defaultCpus)
+		}
 		return nil
 	}

@@ -228,7 +233,9 @@ func (p *staticPolicy) validateState(s state.State) error {
 				p.reservedCPUs.Intersection(tmpDefaultCPUset).String(), tmpDefaultCPUset.String())
 		}
 	} else {
-		if !p.reservedCPUs.Intersection(tmpDefaultCPUset).Equals(p.reservedCPUs) {
+		// 2. This only applies when managed mode is disabled. Active workload partitioning feature
+		// removes the reserved cpus from the default cpu mask on purpose.
+		if !managed.IsEnabled() && !p.reservedCPUs.Intersection(tmpDefaultCPUset).Equals(p.reservedCPUs) {
 			return fmt.Errorf("not all reserved cpus: \"%s\" are present in defaultCpuSet: \"%s\"",
 				p.reservedCPUs.String(), tmpDefaultCPUset.String())
 		}
@@ -260,10 +267,17 @@ func (p *staticPolicy) validateState(s state.State) error {
 		}
 	}
 	totalKnownCPUs = totalKnownCPUs.Union(tmpCPUSets...)
-	if !totalKnownCPUs.Equals(allCPUs) {
-		return fmt.Errorf("current set of available CPUs \"%s\" doesn't match with CPUs in state \"%s\"",
-			allCPUs.String(), totalKnownCPUs.String())
+	availableCPUs := p.topology.CPUDetails.CPUs()

+	// CPU (workload) partitioning removes reserved cpus
+	// from the default mask intentionally
+	if managed.IsEnabled() {
+		availableCPUs = availableCPUs.Difference(p.reservedCPUs)
+	}
+
+	if !totalKnownCPUs.Equals(availableCPUs) {
+		return fmt.Errorf("current set of available CPUs \"%s\" doesn't match with CPUs in state \"%s\"",
+			availableCPUs.String(), totalKnownCPUs.String())
 	}

 	return nil
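A small sketch of the cpuset arithmetic behind these hunks, using the k8s.io/utils/cpuset package already imported here: with workload partitioning enabled, the reserved CPUs are subtracted from the default set, which is what the new test cases below expect for reserved cores 0 and 6. The 12-CPU topology is just an example matching topoDualSocketHT.

package main

import (
	"fmt"

	"k8s.io/utils/cpuset"
)

func main() {
	// Dual-socket HT topology with 12 logical CPUs, reserving core 0 and its sibling 6.
	allCPUs := cpuset.New(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11)
	reserved := cpuset.New(0, 6)

	// What validateState now stores as the default set when managed.IsEnabled() is true.
	defaultSet := allCPUs.Difference(reserved)
	fmt.Println(defaultSet.String()) // 1-5,7-11

	// The relaxed sanity check: reserved CPUs are intentionally absent from the default
	// set, so the old "all reserved cpus present" assertion no longer applies.
	fmt.Println(reserved.Intersection(defaultSet).Equals(reserved)) // false
}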

pkg/kubelet/cm/cpumanager/policy_static_test.go (+39 −13)

@@ -21,6 +21,8 @@ import (
 	"reflect"
 	"testing"

+	"k8s.io/kubernetes/pkg/kubelet/managed"
+
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/types"
 	utilfeature "k8s.io/apiserver/pkg/util/feature"
@@ -982,19 +984,20 @@ func TestTopologyAwareAllocateCPUs(t *testing.T) {
 // above test cases are without kubelet --reserved-cpus cmd option
 // the following tests are with --reserved-cpus configured
 type staticPolicyTestWithResvList struct {
-	description      string
-	topo             *topology.CPUTopology
-	numReservedCPUs  int
-	reserved         cpuset.CPUSet
-	cpuPolicyOptions map[string]string
-	stAssignments    state.ContainerCPUAssignments
-	stDefaultCPUSet  cpuset.CPUSet
-	pod              *v1.Pod
-	expErr           error
-	expNewErr        error
-	expCPUAlloc      bool
-	expCSet          cpuset.CPUSet
-	expUncoreCache   cpuset.CPUSet // represents the expected UncoreCacheIDs
+	description         string
+	topo                *topology.CPUTopology
+	numReservedCPUs     int
+	reserved            cpuset.CPUSet
+	cpuPolicyOptions    map[string]string
+	stAssignments       state.ContainerCPUAssignments
+	stDefaultCPUSet     cpuset.CPUSet
+	pod                 *v1.Pod
+	expErr              error
+	expNewErr           error
+	expCPUAlloc         bool
+	expCSet             cpuset.CPUSet
+	expUncoreCache      cpuset.CPUSet // represents the expected UncoreCacheIDs
+	managementPartition bool
 }

 func TestStaticPolicyStartWithResvList(t *testing.T) {
@@ -1046,9 +1049,32 @@ func TestStaticPolicyStartWithResvList(t *testing.T) {
 			stDefaultCPUSet: cpuset.New(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11),
 			expNewErr:       fmt.Errorf("[cpumanager] unable to reserve the required amount of CPUs (size of 0-1 did not equal 1)"),
 		},
+		{
+			description:         "reserved cores 0 & 6 are not present in available cpuset when management partitioning is enabled",
+			topo:                topoDualSocketHT,
+			numReservedCPUs:     2,
+			stAssignments:       state.ContainerCPUAssignments{},
+			managementPartition: true,
+			expCSet:             cpuset.New(1, 2, 3, 4, 5, 7, 8, 9, 10, 11),
+		},
+		{
+			description:         "reserved cores 0 & 6 are not present in available cpuset when management partitioning is enabled during recovery",
+			topo:                topoDualSocketHT,
+			numReservedCPUs:     2,
+			stAssignments:       state.ContainerCPUAssignments{},
+			stDefaultCPUSet:     cpuset.New(1, 2, 3, 4, 5, 7, 8, 9, 10, 11),
+			managementPartition: true,
+			expCSet:             cpuset.New(1, 2, 3, 4, 5, 7, 8, 9, 10, 11),
+		},
 	}
 	for _, testCase := range testCases {
 		t.Run(testCase.description, func(t *testing.T) {
+			wasManaged := managed.IsEnabled()
+			managed.TestOnlySetEnabled(testCase.managementPartition)
+			defer func() {
+				managed.TestOnlySetEnabled(wasManaged)
+			}()
+			featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, pkgfeatures.CPUManagerPolicyAlphaOptions, true)
 			p, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, testCase.reserved, topologymanager.NewFakeManager(), testCase.cpuPolicyOptions)
 			if !reflect.DeepEqual(err, testCase.expNewErr) {
 				t.Errorf("StaticPolicy Start() error (%v). expected error: %v but got: %v",

pkg/kubelet/cm/qos_container_manager_linux.go (+4)

@@ -35,6 +35,7 @@ import (
 	"k8s.io/component-helpers/resource"
 	v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos"
 	kubefeatures "k8s.io/kubernetes/pkg/features"
+	"k8s.io/kubernetes/pkg/kubelet/managed"
 )

 const (
@@ -174,6 +175,9 @@ func (m *qosContainerManagerImpl) setCPUCgroupConfig(configs map[v1.PodQOSClass]
 	reuseReqs := make(v1.ResourceList, 4)
 	for i := range pods {
 		pod := pods[i]
+		if enabled, _, _ := managed.IsPodManaged(pod); enabled {
+			continue
+		}
 		qosClass := v1qos.GetPodQOS(pod)
 		if qosClass != v1.PodQOSBurstable {
 			// we only care about the burstable qos tier

pkg/kubelet/config/file.go (+11)

@@ -30,6 +30,7 @@ import (
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/client-go/tools/cache"
 	api "k8s.io/kubernetes/pkg/apis/core"
+	"k8s.io/kubernetes/pkg/kubelet/managed"
 	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
 	utilio "k8s.io/utils/io"
 )
@@ -230,6 +231,16 @@ func (s *sourceFile) extractFromFile(filename string) (pod *v1.Pod, err error) {
 	if podErr != nil {
 		return pod, podErr
 	}
+	if managed.IsEnabled() {
+		if newPod, _, err := managed.ModifyStaticPodForPinnedManagement(pod); err != nil {
+			klog.V(2).Error(err, "Static Pod is managed but errored", "name", pod.ObjectMeta.Name, "namespace", pod.ObjectMeta.Namespace)
+		} else if newPod != nil {
+			klog.V(2).InfoS("Static Pod is managed. Using modified pod", "name", newPod.ObjectMeta.Name, "namespace", newPod.ObjectMeta.Namespace, "annotations", newPod.Annotations)
+			pod = newPod
+		} else {
+			klog.V(2).InfoS("Static Pod is not managed", "name", pod.ObjectMeta.Name, "namespace", pod.ObjectMeta.Namespace)
+		}
+	}
 	return pod, nil
 }

pkg/kubelet/kubelet.go (+5)

@@ -98,6 +98,7 @@ import (
 	"k8s.io/kubernetes/pkg/kubelet/kuberuntime"
 	"k8s.io/kubernetes/pkg/kubelet/lifecycle"
 	"k8s.io/kubernetes/pkg/kubelet/logs"
+	"k8s.io/kubernetes/pkg/kubelet/managed"
 	"k8s.io/kubernetes/pkg/kubelet/metrics"
 	"k8s.io/kubernetes/pkg/kubelet/metrics/collectors"
 	"k8s.io/kubernetes/pkg/kubelet/network/dns"
@@ -704,6 +705,10 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,

 	klet.runtimeService = kubeDeps.RemoteRuntimeService

+	if managed.IsEnabled() {
+		klog.InfoS("Pinned Workload Management Enabled")
+	}
+
 	if kubeDeps.KubeClient != nil {
 		klet.runtimeClassManager = runtimeclass.NewManager(kubeDeps.KubeClient)
 	}

pkg/kubelet/kubelet_node_status.go (+27)

@@ -41,7 +41,9 @@ import (
 	kubeletapis "k8s.io/kubelet/pkg/apis"
 	v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
 	"k8s.io/kubernetes/pkg/features"
+	"k8s.io/kubernetes/pkg/kubelet/cadvisor"
 	"k8s.io/kubernetes/pkg/kubelet/events"
+	"k8s.io/kubernetes/pkg/kubelet/managed"
 	"k8s.io/kubernetes/pkg/kubelet/nodestatus"
 	taintutil "k8s.io/kubernetes/pkg/util/taints"
 	volutil "k8s.io/kubernetes/pkg/volume/util"
@@ -130,6 +132,9 @@ func (kl *Kubelet) tryRegisterWithAPIServer(node *v1.Node) bool {
 	requiresUpdate = kl.updateDefaultLabels(node, existingNode) || requiresUpdate
 	requiresUpdate = kl.reconcileExtendedResource(node, existingNode) || requiresUpdate
 	requiresUpdate = kl.reconcileHugePageResource(node, existingNode) || requiresUpdate
+	if managed.IsEnabled() {
+		requiresUpdate = kl.addManagementNodeCapacity(node, existingNode) || requiresUpdate
+	}
 	if requiresUpdate {
 		if _, _, err := nodeutil.PatchNodeStatus(kl.kubeClient.CoreV1(), types.NodeName(kl.nodeName), originalNode, existingNode); err != nil {
 			klog.ErrorS(err, "Unable to reconcile node with API server,error updating node", "node", klog.KObj(node))
@@ -140,6 +145,25 @@ func (kl *Kubelet) tryRegisterWithAPIServer(node *v1.Node) bool {
 	return true
 }

+// addManagementNodeCapacity adds the managed node capacity to the node
+func (kl *Kubelet) addManagementNodeCapacity(initialNode, existingNode *v1.Node) bool {
+	updateDefaultResources(initialNode, existingNode)
+	machineInfo, err := kl.cadvisor.MachineInfo()
+	if err != nil {
+		klog.Errorf("Unable to calculate managed node capacity for %q: %v", kl.nodeName, err)
+		return false
+	}
+	cpuRequest := cadvisor.CapacityFromMachineInfo(machineInfo)[v1.ResourceCPU]
+	cpuRequestInMilli := cpuRequest.MilliValue()
+	newCPURequest := resource.NewMilliQuantity(cpuRequestInMilli*1000, cpuRequest.Format)
+	managedResourceName := managed.GenerateResourceName("management")
+	if existingCapacity, ok := existingNode.Status.Capacity[managedResourceName]; ok && existingCapacity.Equal(*newCPURequest) {
+		return false
+	}
+	existingNode.Status.Capacity[managedResourceName] = *newCPURequest
+	return true
+}
+
 // reconcileHugePageResource will update huge page capacity for each page size and remove huge page sizes no longer supported
 func (kl *Kubelet) reconcileHugePageResource(initialNode, existingNode *v1.Node) bool {
 	requiresUpdate := updateDefaultResources(initialNode, existingNode)
@@ -431,6 +455,9 @@ func (kl *Kubelet) initialNode(ctx context.Context) (*v1.Node, error) {
 			}
 		}
 	}
+	if managed.IsEnabled() {
+		kl.addManagementNodeCapacity(node, node)
+	}

 	kl.setNodeStatus(ctx, node)
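A hedged sketch of the capacity arithmetic in addManagementNodeCapacity, using only the apimachinery resource API; the 8-CPU machine is a made-up example, not something taken from the commit. It shows that the managed capacity quantity ends up with one unit per millicore of machine CPU.

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	// Pretend cadvisor reported 8 CPUs, i.e. a CPU capacity of 8000 millicores.
	cpuCapacity := resource.NewMilliQuantity(8000, resource.DecimalSI)
	cpuRequestInMilli := cpuCapacity.MilliValue() // 8000

	// Same construction as the diff above: multiply by 1000 before re-wrapping
	// as a milli-quantity, so the resulting value equals the millicore count.
	newCPURequest := resource.NewMilliQuantity(cpuRequestInMilli*1000, cpuCapacity.Format)
	fmt.Println(newCPURequest.Value()) // 8000
}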

pkg/kubelet/managed/cpu_shares.go (+30)

@@ -0,0 +1,30 @@
+package managed
+
+const (
+	// These limits are defined in the kernel:
+	// https://github.com/torvalds/linux/blob/0bddd227f3dc55975e2b8dfa7fc6f959b062a2c7/kernel/sched/sched.h#L427-L428
+	MinShares = 2
+	MaxShares = 262144
+
+	SharesPerCPU  = 1024
+	MilliCPUToCPU = 1000
+)
+
+// MilliCPUToShares converts the milliCPU to CFS shares.
+func MilliCPUToShares(milliCPU int64) uint64 {
+	if milliCPU == 0 {
+		// Docker converts zero milliCPU to unset, which maps to kernel default
+		// for unset: 1024. Return 2 here to really match kernel default for
+		// zero milliCPU.
+		return MinShares
+	}
+	// Conceptually (milliCPU / milliCPUToCPU) * sharesPerCPU, but factored to improve rounding.
+	shares := (milliCPU * SharesPerCPU) / MilliCPUToCPU
+	if shares < MinShares {
+		return MinShares
+	}
+	if shares > MaxShares {
+		return MaxShares
+	}
+	return uint64(shares)
+}
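As a quick usage note for the new helper: the conversion is clamped to the kernel's CFS shares bounds, and the numbers below follow directly from the formula above (a standalone example importing this fork's managed package).

package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/kubelet/managed"
)

func main() {
	fmt.Println(managed.MilliCPUToShares(0))    // 2    (MinShares, matches kernel default for "unset")
	fmt.Println(managed.MilliCPUToShares(250))  // 256  (250 * 1024 / 1000)
	fmt.Println(managed.MilliCPUToShares(500))  // 512
	fmt.Println(managed.MilliCPUToShares(1500)) // 1536
}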
