Commit dd4ac15

rphillips authored and bertinatto committed
UPSTREAM: <carry>: add management support to kubelet
UPSTREAM: <carry>: management workloads enhancement 741

UPSTREAM: <carry>: lower verbosity of managed workloads logging

Support for managed workloads was introduced by PR#627. However, the CPU manager reconcile loop now seems to flood the kubelet log with "reconcileState: skipping pod; pod is managed" warnings. Lower the verbosity of these log messages.

UPSTREAM: <carry>: set static pod CPUs correctly when workload partitioning is disabled

UPSTREAM: <carry>: Remove reserved CPUs from default set

Remove reserved CPUs from the default set when workload partitioning is enabled.

Co-Authored-By: Brent Rowsell <[email protected]>
Signed-off-by: Artyom Lukianov <[email protected]>
Signed-off-by: Don Penney <[email protected]>
OpenShift-Rebase-Source: b762ced
OpenShift-Rebase-Source: 63cf793
OpenShift-Rebase-Source: 32af64c

UPSTREAM: <carry>: add management support to kubelet

UPSTREAM: <carry>: OCPBUGS-29520: fix cpu manager default cpuset check in workload partitioned env
(this can be squashed into 04070bb UPSTREAM: <carry>: add management support to kubelet)

Workload partitioning makes the separation between reserved and workload CPUs stricter. It is therefore expected that the reserved CPUs are NOT part of the default cpuset, and the existing check was overzealous. The first execution of kubelet after a reboot never gets here, because the cpuset is computed on line 209. However, a kubelet restart without a reboot skips this code, recovers from the state file, and runs the check on line 220. This was uncovered by decoupling the CPU manager state-file cleanup from kubelet restart, doing it only once at reboot, as part of OCPBUGS-24366.

UPSTREAM: <carry>: add management workload check for guaranteed qos

When static pods have workload partitioning enabled, we should not alter their resources if they are Guaranteed QoS. This change adds a check for Guaranteed QoS.

Signed-off-by: ehila <[email protected]>

test: add unit tests for error states

Signed-off-by: ehila <[email protected]>
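
The Guaranteed-QoS carry described above reduces to a guard of roughly the following shape (a hedged sketch for orientation only; the helper name is illustrative, and the real check lives in the carry's pkg/kubelet/managed code, which is not fully shown in this diff):

package example

import (
	v1 "k8s.io/api/core/v1"
	v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos"
)

// shouldPinStaticPod is a hypothetical name. Workload partitioning only
// rewrites a static pod's CPU requests when doing so cannot change its QoS
// class, so pods that are already Guaranteed are left untouched.
func shouldPinStaticPod(pod *v1.Pod) bool {
	return v1qos.GetPodQOS(pod) != v1.PodQOSGuaranteed
}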
1 parent 9998c74 · commit dd4ac15

10 files changed: +1017 −16 lines changed

pkg/kubelet/cm/cpumanager/cpu_manager.go (+6)

@@ -35,6 +35,7 @@ import (
 	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
 	"k8s.io/kubernetes/pkg/kubelet/config"
 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
+	"k8s.io/kubernetes/pkg/kubelet/managed"
 	"k8s.io/kubernetes/pkg/kubelet/status"
 	"k8s.io/utils/cpuset"
 )
@@ -409,13 +410,18 @@ func (m *manager) reconcileState() (success []reconciledContainer, failure []rec
 	failure = []reconciledContainer{}

 	m.removeStaleState()
+	workloadEnabled := managed.IsEnabled()
 	for _, pod := range m.activePods() {
 		pstatus, ok := m.podStatusProvider.GetPodStatus(pod.UID)
 		if !ok {
 			klog.V(5).InfoS("ReconcileState: skipping pod; status not found", "pod", klog.KObj(pod))
 			failure = append(failure, reconciledContainer{pod.Name, "", ""})
 			continue
 		}
+		if enabled, _, _ := managed.IsPodManaged(pod); workloadEnabled && enabled {
+			klog.V(4).InfoS("[cpumanager] reconcileState: skipping pod; pod is managed (pod: %s)", pod.Name)
+			continue
+		}

 		allContainers := pod.Spec.InitContainers
 		allContainers = append(allContainers, pod.Spec.Containers...)

pkg/kubelet/cm/cpumanager/policy_static.go (+18 −4)

@@ -29,6 +29,7 @@ import (
 	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
 	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
 	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/bitmask"
+	"k8s.io/kubernetes/pkg/kubelet/managed"
 	"k8s.io/kubernetes/pkg/kubelet/metrics"
 	"k8s.io/utils/cpuset"
 )
@@ -214,6 +215,10 @@ func (p *staticPolicy) validateState(s state.State) error {
 		// state is empty initialize
 		s.SetDefaultCPUSet(allCPUs)
 		klog.InfoS("Static policy initialized", "defaultCPUSet", allCPUs)
+		if managed.IsEnabled() {
+			defaultCpus := s.GetDefaultCPUSet().Difference(p.reservedCPUs)
+			s.SetDefaultCPUSet(defaultCpus)
+		}
 		return nil
 	}

@@ -227,7 +232,9 @@ func (p *staticPolicy) validateState(s state.State) error {
 			p.reservedCPUs.Intersection(tmpDefaultCPUset).String(), tmpDefaultCPUset.String())
 		}
 	} else {
-		if !p.reservedCPUs.Intersection(tmpDefaultCPUset).Equals(p.reservedCPUs) {
+		// 2. This only applies when managed mode is disabled. Active workload partitioning feature
+		// removes the reserved cpus from the default cpu mask on purpose.
+		if !managed.IsEnabled() && !p.reservedCPUs.Intersection(tmpDefaultCPUset).Equals(p.reservedCPUs) {
 			return fmt.Errorf("not all reserved cpus: \"%s\" are present in defaultCpuSet: \"%s\"",
 				p.reservedCPUs.String(), tmpDefaultCPUset.String())
 		}
@@ -259,10 +266,17 @@ func (p *staticPolicy) validateState(s state.State) error {
 		}
 	}
 	totalKnownCPUs = totalKnownCPUs.Union(tmpCPUSets...)
-	if !totalKnownCPUs.Equals(allCPUs) {
-		return fmt.Errorf("current set of available CPUs \"%s\" doesn't match with CPUs in state \"%s\"",
-			allCPUs.String(), totalKnownCPUs.String())
+	availableCPUs := p.topology.CPUDetails.CPUs()

+	// CPU (workload) partitioning removes reserved cpus
+	// from the default mask intentionally
+	if managed.IsEnabled() {
+		availableCPUs = availableCPUs.Difference(p.reservedCPUs)
+	}
+
+	if !totalKnownCPUs.Equals(availableCPUs) {
+		return fmt.Errorf("current set of available CPUs \"%s\" doesn't match with CPUs in state \"%s\"",
+			availableCPUs.String(), totalKnownCPUs.String())
 	}

 	return nil
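
A minimal standalone sketch of the set arithmetic the new validateState path relies on, using the same k8s.io/utils/cpuset package; the CPU IDs are assumed and mirror the dual-socket topology used in the tests that follow:

package main

import (
	"fmt"

	"k8s.io/utils/cpuset"
)

func main() {
	allCPUs := cpuset.New(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11) // from the topology
	reserved := cpuset.New(0, 6)                                 // kubelet --reserved-cpus

	// With workload partitioning enabled, the reserved CPUs are subtracted
	// from the default set up front ...
	available := allCPUs.Difference(reserved)
	fmt.Println(available.String()) // "1-5,7-11"

	// ... so a state file recovered after a kubelet restart, whose default set
	// already excludes the reserved CPUs, now passes the equality check.
	recovered := cpuset.New(1, 2, 3, 4, 5, 7, 8, 9, 10, 11)
	fmt.Println(recovered.Equals(available)) // true
}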

pkg/kubelet/cm/cpumanager/policy_static_test.go (+37 −12)

@@ -21,6 +21,8 @@ import (
 	"reflect"
 	"testing"

+	"k8s.io/kubernetes/pkg/kubelet/managed"
+
 	v1 "k8s.io/api/core/v1"
 	utilfeature "k8s.io/apiserver/pkg/util/feature"
 	featuregatetesting "k8s.io/component-base/featuregate/testing"
@@ -961,18 +963,19 @@ func TestTopologyAwareAllocateCPUs(t *testing.T) {
 // above test cases are without kubelet --reserved-cpus cmd option
 // the following tests are with --reserved-cpus configured
 type staticPolicyTestWithResvList struct {
-	description      string
-	topo             *topology.CPUTopology
-	numReservedCPUs  int
-	reserved         cpuset.CPUSet
-	cpuPolicyOptions map[string]string
-	stAssignments    state.ContainerCPUAssignments
-	stDefaultCPUSet  cpuset.CPUSet
-	pod              *v1.Pod
-	expErr           error
-	expNewErr        error
-	expCPUAlloc      bool
-	expCSet          cpuset.CPUSet
+	description         string
+	topo                *topology.CPUTopology
+	numReservedCPUs     int
+	reserved            cpuset.CPUSet
+	cpuPolicyOptions    map[string]string
+	stAssignments       state.ContainerCPUAssignments
+	stDefaultCPUSet     cpuset.CPUSet
+	pod                 *v1.Pod
+	expErr              error
+	expNewErr           error
+	expCPUAlloc         bool
+	expCSet             cpuset.CPUSet
+	managementPartition bool
 }

 func TestStaticPolicyStartWithResvList(t *testing.T) {
@@ -1024,9 +1027,31 @@ func TestStaticPolicyStartWithResvList(t *testing.T) {
 			stDefaultCPUSet: cpuset.New(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11),
 			expNewErr:       fmt.Errorf("[cpumanager] unable to reserve the required amount of CPUs (size of 0-1 did not equal 1)"),
 		},
+		{
+			description:         "reserved cores 0 & 6 are not present in available cpuset when management partitioning is enabled",
+			topo:                topoDualSocketHT,
+			numReservedCPUs:     2,
+			stAssignments:       state.ContainerCPUAssignments{},
+			managementPartition: true,
+			expCSet:             cpuset.New(1, 2, 3, 4, 5, 7, 8, 9, 10, 11),
+		},
+		{
+			description:         "reserved cores 0 & 6 are not present in available cpuset when management partitioning is enabled during recovery",
+			topo:                topoDualSocketHT,
+			numReservedCPUs:     2,
+			stAssignments:       state.ContainerCPUAssignments{},
+			stDefaultCPUSet:     cpuset.New(1, 2, 3, 4, 5, 7, 8, 9, 10, 11),
+			managementPartition: true,
+			expCSet:             cpuset.New(1, 2, 3, 4, 5, 7, 8, 9, 10, 11),
+		},
 	}
 	for _, testCase := range testCases {
 		t.Run(testCase.description, func(t *testing.T) {
+			wasManaged := managed.IsEnabled()
+			managed.TestOnlySetEnabled(testCase.managementPartition)
+			defer func() {
+				managed.TestOnlySetEnabled(wasManaged)
+			}()
 			featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, pkgfeatures.CPUManagerPolicyAlphaOptions, true)
 			p, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, testCase.reserved, topologymanager.NewFakeManager(), testCase.cpuPolicyOptions)
 			if !reflect.DeepEqual(err, testCase.expNewErr) {

pkg/kubelet/cm/qos_container_manager_linux.go (+4)

@@ -35,6 +35,7 @@ import (
 	"k8s.io/component-helpers/resource"
 	v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos"
 	kubefeatures "k8s.io/kubernetes/pkg/features"
+	"k8s.io/kubernetes/pkg/kubelet/managed"
 )

 const (
@@ -174,6 +175,9 @@ func (m *qosContainerManagerImpl) setCPUCgroupConfig(configs map[v1.PodQOSClass]
 	reuseReqs := make(v1.ResourceList, 4)
 	for i := range pods {
 		pod := pods[i]
+		if enabled, _, _ := managed.IsPodManaged(pod); enabled {
+			continue
+		}
 		qosClass := v1qos.GetPodQOS(pod)
 		if qosClass != v1.PodQOSBurstable {
 			// we only care about the burstable qos tier
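
The added continue means managed pods contribute nothing to the burstable cgroup's cpu.shares sum. A small hedged sketch of that filtering step, with a caller-supplied predicate standing in for managed.IsPodManaged (whose annotation contract is not part of this diff):

package example

import v1 "k8s.io/api/core/v1"

// podsForBurstableShares drops managed pods before their CPU requests are
// summed, mirroring the effect of the guard added to setCPUCgroupConfig.
func podsForBurstableShares(pods []*v1.Pod, isManaged func(*v1.Pod) bool) []*v1.Pod {
	kept := make([]*v1.Pod, 0, len(pods))
	for _, pod := range pods {
		if isManaged(pod) {
			continue // carved out of the burstable share calculation
		}
		kept = append(kept, pod)
	}
	return kept
}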

pkg/kubelet/config/file.go (+11)

@@ -30,6 +30,7 @@ import (
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/client-go/tools/cache"
 	api "k8s.io/kubernetes/pkg/apis/core"
+	"k8s.io/kubernetes/pkg/kubelet/managed"
 	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
 	utilio "k8s.io/utils/io"
 )
@@ -230,6 +231,16 @@ func (s *sourceFile) extractFromFile(filename string) (pod *v1.Pod, err error) {
 	if podErr != nil {
 		return pod, podErr
 	}
+	if managed.IsEnabled() {
+		if newPod, _, err := managed.ModifyStaticPodForPinnedManagement(pod); err != nil {
+			klog.V(2).Error(err, "Static Pod is managed but errored", "name", pod.ObjectMeta.Name, "namespace", pod.ObjectMeta.Namespace)
+		} else if newPod != nil {
+			klog.V(2).InfoS("Static Pod is managed. Using modified pod", "name", newPod.ObjectMeta.Name, "namespace", newPod.ObjectMeta.Namespace, "annotations", newPod.Annotations)
+			pod = newPod
+		} else {
+			klog.V(2).InfoS("Static Pod is not managed", "name", pod.ObjectMeta.Name, "namespace", pod.ObjectMeta.Namespace)
+		}
+	}
 	return pod, nil
 }
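
For readers outside the file-source plumbing, the hook above can be read as the following hedged sketch; it assumes it is compiled inside the kubelet tree where pkg/kubelet/managed is importable, and it keeps the decoded pod whenever the helper errors or returns nil, exactly as extractFromFile does:

package example

import (
	v1 "k8s.io/api/core/v1"

	"k8s.io/kubernetes/pkg/kubelet/managed"
)

// maybePin returns the modified static pod when pinning is enabled and the
// carry's helper produced one; otherwise the original pod is used unchanged.
// Only the first and third return values are consumed, as in the hunk above.
func maybePin(pod *v1.Pod) *v1.Pod {
	if !managed.IsEnabled() {
		return pod
	}
	if newPod, _, err := managed.ModifyStaticPodForPinnedManagement(pod); err == nil && newPod != nil {
		return newPod
	}
	return pod
}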

pkg/kubelet/kubelet.go (+5)

@@ -95,6 +95,7 @@ import (
 	"k8s.io/kubernetes/pkg/kubelet/kuberuntime"
 	"k8s.io/kubernetes/pkg/kubelet/lifecycle"
 	"k8s.io/kubernetes/pkg/kubelet/logs"
+	"k8s.io/kubernetes/pkg/kubelet/managed"
 	"k8s.io/kubernetes/pkg/kubelet/metrics"
 	"k8s.io/kubernetes/pkg/kubelet/metrics/collectors"
 	"k8s.io/kubernetes/pkg/kubelet/network/dns"
@@ -668,6 +669,10 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,

 	klet.runtimeService = kubeDeps.RemoteRuntimeService

+	if managed.IsEnabled() {
+		klog.InfoS("Pinned Workload Management Enabled")
+	}
+
 	if kubeDeps.KubeClient != nil {
 		klet.runtimeClassManager = runtimeclass.NewManager(kubeDeps.KubeClient)
 	}

pkg/kubelet/kubelet_node_status.go (+27)

@@ -42,7 +42,9 @@ import (
 	kubeletapis "k8s.io/kubelet/pkg/apis"
 	v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
 	"k8s.io/kubernetes/pkg/features"
+	"k8s.io/kubernetes/pkg/kubelet/cadvisor"
 	"k8s.io/kubernetes/pkg/kubelet/events"
+	"k8s.io/kubernetes/pkg/kubelet/managed"
 	"k8s.io/kubernetes/pkg/kubelet/nodestatus"
 	taintutil "k8s.io/kubernetes/pkg/util/taints"
 	volutil "k8s.io/kubernetes/pkg/volume/util"
@@ -131,6 +133,9 @@ func (kl *Kubelet) tryRegisterWithAPIServer(node *v1.Node) bool {
 		requiresUpdate = kl.updateDefaultLabels(node, existingNode) || requiresUpdate
 		requiresUpdate = kl.reconcileExtendedResource(node, existingNode) || requiresUpdate
 		requiresUpdate = kl.reconcileHugePageResource(node, existingNode) || requiresUpdate
+		if managed.IsEnabled() {
+			requiresUpdate = kl.addManagementNodeCapacity(node, existingNode) || requiresUpdate
+		}
 		if requiresUpdate {
 			if _, _, err := nodeutil.PatchNodeStatus(kl.kubeClient.CoreV1(), types.NodeName(kl.nodeName), originalNode, existingNode); err != nil {
 				klog.ErrorS(err, "Unable to reconcile node with API server,error updating node", "node", klog.KObj(node))
@@ -141,6 +146,25 @@ func (kl *Kubelet) tryRegisterWithAPIServer(node *v1.Node) bool {
 	return true
 }

+// addManagementNodeCapacity adds the managednode capacity to the node
+func (kl *Kubelet) addManagementNodeCapacity(initialNode, existingNode *v1.Node) bool {
+	updateDefaultResources(initialNode, existingNode)
+	machineInfo, err := kl.cadvisor.MachineInfo()
+	if err != nil {
+		klog.Errorf("Unable to calculate managed node capacity for %q: %v", kl.nodeName, err)
+		return false
+	}
+	cpuRequest := cadvisor.CapacityFromMachineInfo(machineInfo)[v1.ResourceCPU]
+	cpuRequestInMilli := cpuRequest.MilliValue()
+	newCPURequest := resource.NewMilliQuantity(cpuRequestInMilli*1000, cpuRequest.Format)
+	managedResourceName := managed.GenerateResourceName("management")
+	if existingCapacity, ok := existingNode.Status.Capacity[managedResourceName]; ok && existingCapacity.Equal(*newCPURequest) {
+		return false
+	}
+	existingNode.Status.Capacity[managedResourceName] = *newCPURequest
+	return true
+}
+
 // reconcileHugePageResource will update huge page capacity for each page size and remove huge page sizes no longer supported
 func (kl *Kubelet) reconcileHugePageResource(initialNode, existingNode *v1.Node) bool {
 	requiresUpdate := updateDefaultResources(initialNode, existingNode)
@@ -432,6 +456,9 @@ func (kl *Kubelet) initialNode(ctx context.Context) (*v1.Node, error) {
 			}
 		}
 	}
+	if managed.IsEnabled() {
+		kl.addManagementNodeCapacity(node, node)
+	}

 	kl.setNodeStatus(ctx, node)
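
To make the capacity arithmetic in addManagementNodeCapacity concrete, a short sketch assuming an 8-CPU machine, so cpuRequest.MilliValue() is 8000; resource.DecimalSI stands in for cpuRequest.Format, and the extended resource name comes from managed.GenerateResourceName, which is outside this diff:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	cpuRequestInMilli := int64(8000) // 8 CPUs worth of millicores
	newCPURequest := resource.NewMilliQuantity(cpuRequestInMilli*1000, resource.DecimalSI)

	// 8,000,000 milli-units == 8000 units: the management capacity is
	// advertised as one unit per millicore of machine CPU.
	fmt.Println(newCPURequest.Value()) // 8000
}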

pkg/kubelet/managed/cpu_shares.go (+30, new file)

@@ -0,0 +1,30 @@
+package managed
+
+const (
+	// These limits are defined in the kernel:
+	// https://github.com/torvalds/linux/blob/0bddd227f3dc55975e2b8dfa7fc6f959b062a2c7/kernel/sched/sched.h#L427-L428
+	MinShares = 2
+	MaxShares = 262144
+
+	SharesPerCPU  = 1024
+	MilliCPUToCPU = 1000
+)
+
+// MilliCPUToShares converts the milliCPU to CFS shares.
+func MilliCPUToShares(milliCPU int64) uint64 {
+	if milliCPU == 0 {
+		// Docker converts zero milliCPU to unset, which maps to kernel default
+		// for unset: 1024. Return 2 here to really match kernel default for
+		// zero milliCPU.
+		return MinShares
+	}
+	// Conceptually (milliCPU / milliCPUToCPU) * sharesPerCPU, but factored to improve rounding.
+	shares := (milliCPU * SharesPerCPU) / MilliCPUToCPU
+	if shares < MinShares {
+		return MinShares
+	}
+	if shares > MaxShares {
+		return MaxShares
+	}
+	return uint64(shares)
+}
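
For reference, a few conversions implied by MilliCPUToShares (straight integer arithmetic on the constants above, no assumptions):

// MilliCPUToShares(0)      == 2      // zero means "unset"; return the kernel minimum
// MilliCPUToShares(1)      == 2      // 1*1024/1000 rounds to 1, clamped up to MinShares
// MilliCPUToShares(500)    == 512    // half a CPU
// MilliCPUToShares(1000)   == 1024   // exactly SharesPerCPU
// MilliCPUToShares(400000) == 262144 // clamped down to MaxShares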
