Skip to content

Commit 3c9380c

Browse files
authored
Memory manager support for Windows nodes (kubernetes#128560)
1 parent 8504758 commit 3c9380c

6 files changed

+412
-22
lines changed

pkg/kubelet/cm/container_manager_windows.go

+27-5
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ type containerManagerImpl struct {
6868
// Interface for Topology resource co-ordination
6969
topologyManager topologymanager.Manager
7070
cpuManager cpumanager.Manager
71+
memoryManager memorymanager.Manager
7172
nodeInfo *v1.Node
7273
sync.RWMutex
7374
}
@@ -95,12 +96,17 @@ func (cm *containerManagerImpl) Start(ctx context.Context, node *v1.Node,
9596

9697
containerMap, containerRunningSet := buildContainerMapAndRunningSetFromRuntime(ctx, runtimeService)
9798

98-
// Initialize CPU manager
9999
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.WindowsCPUAndMemoryAffinity) {
100100
err := cm.cpuManager.Start(cpumanager.ActivePodsFunc(activePods), sourcesReady, podStatusProvider, runtimeService, containerMap.Clone())
101101
if err != nil {
102102
return fmt.Errorf("start cpu manager error: %v", err)
103103
}
104+
105+
// Initialize memory manager
106+
err = cm.memoryManager.Start(memorymanager.ActivePodsFunc(activePods), sourcesReady, podStatusProvider, runtimeService, containerMap.Clone())
107+
if err != nil {
108+
return fmt.Errorf("start memory manager error: %v", err)
109+
}
104110
}
105111

106112
// Starts device manager.
@@ -128,6 +134,10 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
128134
cadvisorInterface: cadvisorInterface,
129135
}
130136

137+
cm.topologyManager = topologymanager.NewFakeManager()
138+
cm.cpuManager = cpumanager.NewFakeManager()
139+
cm.memoryManager = memorymanager.NewFakeManager()
140+
131141
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.WindowsCPUAndMemoryAffinity) {
132142
klog.InfoS("Creating topology manager")
133143
cm.topologyManager, err = topologymanager.NewManager(machineInfo.Topology,
@@ -155,9 +165,21 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
155165
return nil, err
156166
}
157167
cm.topologyManager.AddHintProvider(cm.cpuManager)
158-
} else {
159-
cm.topologyManager = topologymanager.NewFakeManager()
160-
cm.cpuManager = cpumanager.NewFakeManager()
168+
169+
klog.InfoS("Creating memory manager")
170+
cm.memoryManager, err = memorymanager.NewManager(
171+
nodeConfig.ExperimentalMemoryManagerPolicy,
172+
machineInfo,
173+
cm.GetNodeAllocatableReservation(),
174+
nodeConfig.ExperimentalMemoryManagerReservedMemory,
175+
nodeConfig.KubeletRootDir,
176+
cm.topologyManager,
177+
)
178+
if err != nil {
179+
klog.ErrorS(err, "Failed to initialize memory manager")
180+
return nil, err
181+
}
182+
cm.topologyManager.AddHintProvider(cm.memoryManager)
161183
}
162184

163185
klog.InfoS("Creating device plugin manager")
@@ -273,7 +295,7 @@ func (cm *containerManagerImpl) UpdatePluginResources(node *schedulerframework.N
273295
}
274296

275297
// InternalContainerLifecycle returns the lifecycle hooks backed by the
// container manager's CPU, memory and topology managers; these are invoked
// by the kubelet around container creation/start/stop.
func (cm *containerManagerImpl) InternalContainerLifecycle() InternalContainerLifecycle {
	return &internalContainerLifecycleImpl{cm.cpuManager, cm.memoryManager, cm.topologyManager}
}
278300

279301
func (cm *containerManagerImpl) GetPodCgroupRoot() string {

pkg/kubelet/cm/internal_container_lifecycle_windows.go

+106-14
Original file line numberDiff line numberDiff line change
@@ -20,30 +20,122 @@ limitations under the License.
2020
package cm
2121

2222
import (
23-
"k8s.io/api/core/v1"
23+
"fmt"
24+
25+
v1 "k8s.io/api/core/v1"
26+
"k8s.io/apimachinery/pkg/util/sets"
2427
utilfeature "k8s.io/apiserver/pkg/util/feature"
2528
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
2629
"k8s.io/klog/v2"
2730
kubefeatures "k8s.io/kubernetes/pkg/features"
2831
"k8s.io/kubernetes/pkg/kubelet/winstats"
32+
"k8s.io/utils/cpuset"
2933
)
3034

3135
func (i *internalContainerLifecycleImpl) PreCreateContainer(pod *v1.Pod, container *v1.Container, containerConfig *runtimeapi.ContainerConfig) error {
32-
if i.cpuManager != nil && utilfeature.DefaultFeatureGate.Enabled(kubefeatures.WindowsCPUAndMemoryAffinity) {
33-
allocatedCPUs := i.cpuManager.GetCPUAffinity(string(pod.UID), container.Name)
34-
if !allocatedCPUs.IsEmpty() {
35-
var cpuGroupAffinities []*runtimeapi.WindowsCpuGroupAffinity
36-
affinities := winstats.CpusToGroupAffinity(allocatedCPUs.List())
37-
for _, affinity := range affinities {
38-
klog.V(4).InfoS("Setting CPU affinity", "container", container.Name, "pod", pod.Name, "group", affinity.Group, "mask", affinity.MaskString(), "processorIds", affinity.Processors())
39-
cpuGroupAffinities = append(cpuGroupAffinities, &runtimeapi.WindowsCpuGroupAffinity{
40-
CpuGroup: uint32(affinity.Group),
41-
CpuMask: uint64(affinity.Mask),
42-
})
43-
}
36+
if !utilfeature.DefaultFeatureGate.Enabled(kubefeatures.WindowsCPUAndMemoryAffinity) {
37+
return nil
38+
}
39+
40+
klog.V(4).Info("PreCreateContainer for Windows")
41+
42+
// retrieve CPU and NUMA affinity from CPU Manager and Memory Manager (if enabled)
43+
var allocatedCPUs cpuset.CPUSet
44+
if i.cpuManager != nil {
45+
allocatedCPUs = i.cpuManager.GetCPUAffinity(string(pod.UID), container.Name)
46+
}
47+
48+
var numaNodes sets.Set[int]
49+
if i.memoryManager != nil {
50+
numaNodes = i.memoryManager.GetMemoryNUMANodes(pod, container)
51+
}
4452

45-
containerConfig.Windows.Resources.AffinityCpus = cpuGroupAffinities
53+
// Gather all CPUs associated with the selected NUMA nodes
54+
var allNumaNodeCPUs []winstats.GroupAffinity
55+
for _, numaNode := range sets.List(numaNodes) {
56+
affinity, err := winstats.GetCPUsforNUMANode(uint16(numaNode))
57+
if err != nil {
58+
return fmt.Errorf("failed to get CPUs for NUMA node %d: %v", numaNode, err)
4659
}
60+
allNumaNodeCPUs = append(allNumaNodeCPUs, *affinity)
4761
}
62+
63+
var finalCPUSet = computeFinalCpuSet(allocatedCPUs, allNumaNodeCPUs)
64+
65+
klog.V(4).InfoS("Setting CPU affinity", "affinity", finalCPUSet, "container", container.Name, "pod", pod.UID)
66+
67+
// Set CPU group affinities in the container config
68+
if finalCPUSet != nil {
69+
var cpusToGroupAffinities []*runtimeapi.WindowsCpuGroupAffinity
70+
for group, mask := range groupMasks(finalCPUSet) {
71+
72+
cpusToGroupAffinities = append(cpusToGroupAffinities, &runtimeapi.WindowsCpuGroupAffinity{
73+
CpuGroup: uint32(group),
74+
CpuMask: uint64(mask),
75+
})
76+
}
77+
containerConfig.Windows.Resources.AffinityCpus = cpusToGroupAffinities
78+
}
79+
80+
// return nil if no CPUs were selected
4881
return nil
4982
}
83+
84+
// computeFinalCpuSet determines the final set of CPUs to use based on the CPU and memory managers
85+
// and is extracted so that it can be tested
86+
func computeFinalCpuSet(allocatedCPUs cpuset.CPUSet, allNumaNodeCPUs []winstats.GroupAffinity) sets.Set[int] {
87+
if !allocatedCPUs.IsEmpty() && len(allNumaNodeCPUs) > 0 {
88+
// Both CPU and memory managers are enabled
89+
90+
numaNodeAffinityCPUSet := computeCPUSet(allNumaNodeCPUs)
91+
cpuManagerAffinityCPUSet := sets.New[int](allocatedCPUs.List()...)
92+
93+
// Determine which set of CPUs to use using the following logic outlined in the KEP:
94+
// Case 1: CPU manager selects more CPUs than those available in the NUMA nodes selected by the memory manager
95+
// Case 2: CPU manager selects fewer CPUs, and they all fall within the CPUs available in the NUMA nodes selected by the memory manager
96+
// Case 3: CPU manager selects fewer CPUs, but some are outside of the CPUs available in the NUMA nodes selected by the memory manager
97+
98+
if cpuManagerAffinityCPUSet.Len() > numaNodeAffinityCPUSet.Len() {
99+
// Case 1, use CPU manager selected CPUs
100+
return cpuManagerAffinityCPUSet
101+
} else if numaNodeAffinityCPUSet.IsSuperset(cpuManagerAffinityCPUSet) {
102+
// case 2, use CPU manager selected CPUstry
103+
return cpuManagerAffinityCPUSet
104+
} else {
105+
// Case 3, merge CPU manager and memory manager selected CPUs
106+
return cpuManagerAffinityCPUSet.Union(numaNodeAffinityCPUSet)
107+
}
108+
} else if !allocatedCPUs.IsEmpty() {
109+
// Only CPU manager is enabled, use CPU manager selected CPUs
110+
return sets.New[int](allocatedCPUs.List()...)
111+
} else if len(allNumaNodeCPUs) > 0 {
112+
// Only memory manager is enabled, use CPUs associated with selected NUMA nodes
113+
return computeCPUSet(allNumaNodeCPUs)
114+
}
115+
return nil
116+
}
117+
118+
// computeCPUSet converts a list of GroupAffinity to a set of CPU IDs
119+
func computeCPUSet(affinities []winstats.GroupAffinity) sets.Set[int] {
120+
cpuSet := sets.New[int]()
121+
for _, affinity := range affinities {
122+
for i := 0; i < 64; i++ {
123+
if (affinity.Mask>>i)&1 == 1 {
124+
cpuID := int(affinity.Group)*64 + i
125+
cpuSet.Insert(cpuID)
126+
}
127+
}
128+
}
129+
return cpuSet
130+
}
131+
132+
// groupMasks converts a set of CPU IDs into group and mask representations
133+
func groupMasks(cpuSet sets.Set[int]) map[int]uint64 {
134+
groupMasks := make(map[int]uint64)
135+
for cpu := range cpuSet {
136+
group := cpu / 64
137+
mask := uint64(1) << (cpu % 64)
138+
groupMasks[group] |= mask
139+
}
140+
return groupMasks
141+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
//go:build windows
2+
// +build windows
3+
4+
/*
5+
Copyright 2024 The Kubernetes Authors.
6+
7+
Licensed under the Apache License, Version 2.0 (the "License");
8+
you may not use this file except in compliance with the License.
9+
You may obtain a copy of the License at
10+
11+
http://www.apache.org/licenses/LICENSE-2.0
12+
13+
Unless required by applicable law or agreed to in writing, software
14+
distributed under the License is distributed on an "AS IS" BASIS,
15+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
See the License for the specific language governing permissions and
17+
limitations under the License.
18+
*/
19+
20+
package cm
21+
22+
import (
23+
"testing"
24+
25+
"k8s.io/apimachinery/pkg/util/sets"
26+
"k8s.io/kubernetes/pkg/kubelet/winstats"
27+
"k8s.io/utils/cpuset"
28+
)
29+
30+
func TestComputeCPUSet(t *testing.T) {
31+
affinities := []winstats.GroupAffinity{
32+
{Mask: 0b1010, Group: 0}, // CPUs 1 and 3 in Group 0
33+
{Mask: 0b1001, Group: 1}, // CPUs 0 and 3 in Group 1
34+
}
35+
36+
expected := map[int]struct{}{
37+
1: {}, // Group 0, CPU 1
38+
3: {}, // Group 0, CPU 3
39+
64: {}, // Group 1, CPU 0
40+
67: {}, // Group 1, CPU 3
41+
}
42+
43+
result := computeCPUSet(affinities)
44+
if len(result) != len(expected) {
45+
t.Errorf("expected length %v, but got length %v", len(expected), len(result))
46+
}
47+
for key := range expected {
48+
if _, exists := result[key]; !exists {
49+
t.Errorf("expected key %v to be in result", key)
50+
}
51+
}
52+
}
53+
54+
func TestGroupMasks(t *testing.T) {
55+
tests := []struct {
56+
cpuSet sets.Set[int]
57+
expected map[int]uint64
58+
}{
59+
{
60+
cpuSet: sets.New[int](0, 1, 2, 3, 64, 65, 66, 67),
61+
expected: map[int]uint64{
62+
0: 0b1111,
63+
1: 0b1111,
64+
},
65+
},
66+
{
67+
cpuSet: sets.New[int](0, 2, 64, 66),
68+
expected: map[int]uint64{
69+
0: 0b0101,
70+
1: 0b0101,
71+
},
72+
},
73+
{
74+
cpuSet: sets.New[int](1, 65),
75+
expected: map[int]uint64{
76+
0: 0b0010,
77+
1: 0b0010,
78+
},
79+
},
80+
{
81+
cpuSet: sets.New[int](),
82+
expected: map[int]uint64{},
83+
},
84+
}
85+
86+
for _, test := range tests {
87+
result := groupMasks(test.cpuSet)
88+
if len(result) != len(test.expected) {
89+
t.Errorf("expected length %v, but got length %v", len(test.expected), len(result))
90+
}
91+
for group, mask := range test.expected {
92+
if result[group] != mask {
93+
t.Errorf("expected group %v to have mask %v, but got mask %v", group, mask, result[group])
94+
}
95+
}
96+
}
97+
}
98+
99+
func TestComputeFinalCpuSet(t *testing.T) {
100+
tests := []struct {
101+
name string
102+
allocatedCPUs cpuset.CPUSet
103+
allNumaNodeCPUs []winstats.GroupAffinity
104+
expectedCPUSet sets.Set[int]
105+
}{
106+
{
107+
name: "Both managers enabled, CPU manager selects more CPUs",
108+
allocatedCPUs: cpuset.New(0, 1, 2, 3),
109+
allNumaNodeCPUs: []winstats.GroupAffinity{
110+
{Mask: 0b0011, Group: 0}, // CPUs 0 and 1 in Group 0
111+
},
112+
expectedCPUSet: sets.New[int](0, 1, 2, 3),
113+
},
114+
{
115+
name: "Both managers enabled, CPU manager selects fewer CPUs within NUMA nodes",
116+
allocatedCPUs: cpuset.New(0, 1),
117+
allNumaNodeCPUs: []winstats.GroupAffinity{
118+
{Mask: 0b1111, Group: 0}, // CPUs 0, 1, 2, 3 in Group 0
119+
},
120+
expectedCPUSet: sets.New[int](0, 1),
121+
},
122+
{
123+
name: "Both managers enabled, CPU manager selects fewer CPUs outside NUMA nodes",
124+
allocatedCPUs: cpuset.New(0, 1),
125+
allNumaNodeCPUs: []winstats.GroupAffinity{
126+
{Mask: 0b1100, Group: 0}, // CPUs 2 and 3 in Group 0
127+
},
128+
expectedCPUSet: sets.New[int](0, 1, 2, 3),
129+
},
130+
{
131+
name: "Only CPU manager enabled",
132+
allocatedCPUs: cpuset.New(0, 1),
133+
allNumaNodeCPUs: nil,
134+
expectedCPUSet: sets.New[int](0, 1),
135+
},
136+
{
137+
name: "Only memory manager enabled",
138+
allocatedCPUs: cpuset.New(),
139+
allNumaNodeCPUs: []winstats.GroupAffinity{
140+
{Mask: 0b1100, Group: 0}, // CPUs 2 and 3 in Group 0
141+
},
142+
expectedCPUSet: sets.New[int](2, 3),
143+
},
144+
{
145+
name: "Neither manager enabled",
146+
allocatedCPUs: cpuset.New(),
147+
allNumaNodeCPUs: nil,
148+
expectedCPUSet: nil,
149+
},
150+
}
151+
152+
for _, test := range tests {
153+
t.Run(test.name, func(t *testing.T) {
154+
result := computeFinalCpuSet(test.allocatedCPUs, test.allNumaNodeCPUs)
155+
if !result.Equal(test.expectedCPUSet) {
156+
t.Errorf("expected %v, but got %v", test.expectedCPUSet, result)
157+
}
158+
})
159+
}
160+
}

0 commit comments

Comments
 (0)