Skip to content

Commit 46dc787

Browse files
sphrasavath authored and ffromani committed
Add takeByTopologyUnCoreCachePacked if policy option align-cpus-by-uncorecache is enabled.
Adding new function to evaluate uncore cache id. Reverse allocation logic. Implement preferAlignByUncorecache within TakeByTopologyNUMAPacked, along with new test cases. (cherry picked from commit 3f459f2)
1 parent 5865c5b commit 46dc787

9 files changed

+1253
-350
lines changed

pkg/kubelet/cm/cpumanager/cpu_assignment.go

+103-8
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,17 @@ func (n *numaFirst) takeFullSecondLevel() {
125125
n.acc.takeFullSockets()
126126
}
127127

128+
// Sort the UncoreCaches within the NUMA nodes.
129+
func (a *cpuAccumulator) sortAvailableUncoreCaches() []int {
130+
var result []int
131+
for _, numa := range a.sortAvailableNUMANodes() {
132+
uncore := a.details.UncoreInNUMANodes(numa).UnsortedList()
133+
a.sort(uncore, a.details.CPUsInUncoreCaches)
134+
result = append(result, uncore...)
135+
}
136+
return result
137+
}
138+
128139
// If NUMA nodes are higher in the memory hierarchy than sockets, then just
129140
// sort the NUMA nodes directly, and return them.
130141
func (n *numaFirst) sortAvailableNUMANodes() []int {
@@ -238,7 +249,14 @@ func (a *cpuAccumulator) isSocketFree(socketID int) bool {
238249
return a.details.CPUsInSockets(socketID).Size() == a.topo.CPUsPerSocket()
239250
}
240251

241-
// Returns true if the supplied core is fully available in `topoDetails`.
252+
// Returns true if the supplied UncoreCache is fully available,
253+
// "fully available" means that all the CPUs in it are free.
254+
func (a *cpuAccumulator) isUncoreCacheFree(uncoreID int) bool {
255+
return a.details.CPUsInUncoreCaches(uncoreID).Size() == a.topo.CPUDetails.CPUsInUncoreCaches(uncoreID).Size()
256+
}
257+
258+
// Returns true if the supplied core is fully available in `a.details`.
259+
// "fully available" means that all the CPUs in it are free.
242260
func (a *cpuAccumulator) isCoreFree(coreID int) bool {
243261
return a.details.CPUsInCores(coreID).Size() == a.topo.CPUsPerCore()
244262
}
@@ -265,6 +283,17 @@ func (a *cpuAccumulator) freeSockets() []int {
265283
return free
266284
}
267285

286+
// Returns free UncoreCache IDs as a slice sorted by sortAvailableUncoreCaches().
287+
func (a *cpuAccumulator) freeUncoreCache() []int {
288+
free := []int{}
289+
for _, uncore := range a.sortAvailableUncoreCaches() {
290+
if a.isUncoreCacheFree(uncore) {
291+
free = append(free, uncore)
292+
}
293+
}
294+
return free
295+
}
296+
268297
// Returns free core IDs as a slice sorted by sortAvailableCores().
269298
func (a *cpuAccumulator) freeCores() []int {
270299
free := []int{}
@@ -358,6 +387,60 @@ func (a *cpuAccumulator) takeFullSockets() {
358387
}
359388
}
360389

390+
func (a *cpuAccumulator) takeFullUncore() {
391+
for _, uncore := range a.freeUncoreCache() {
392+
cpusInUncore := a.topo.CPUDetails.CPUsInUncoreCaches(uncore)
393+
if !a.needsAtLeast(cpusInUncore.Size()) {
394+
continue
395+
}
396+
klog.V(4).InfoS("takeFullUncore: claiming uncore", "uncore", uncore)
397+
a.take(cpusInUncore)
398+
}
399+
}
400+
401+
func (a *cpuAccumulator) takePartialUncore(uncoreID int) {
402+
numCoresNeeded := a.numCPUsNeeded / a.topo.CPUsPerCore()
403+
404+
// note: we need to keep the first N free cores (physical cpus) and only when we got these expand to their
405+
// cpus (virtual cpus). Taking directly the first M cpus (virtual cpus) leads to suboptimal allocation
406+
freeCores := a.details.CoresNeededInUncoreCache(numCoresNeeded, uncoreID)
407+
freeCPUs := a.details.CPUsInCores(freeCores.UnsortedList()...)
408+
409+
claimed := (a.numCPUsNeeded == freeCPUs.Size())
410+
klog.V(4).InfoS("takePartialUncore: trying to claim partial uncore",
411+
"uncore", uncoreID,
412+
"claimed", claimed,
413+
"needed", a.numCPUsNeeded,
414+
"cores", freeCores.String(),
415+
"cpus", freeCPUs.String())
416+
if !claimed {
417+
return
418+
419+
}
420+
a.take(freeCPUs)
421+
}
422+
423+
// First try to take full UncoreCache, if available and need is at least the size of the UncoreCache group.
424+
// Second try to take the partial UncoreCache if available and the request size can fit w/in the UncoreCache.
425+
func (a *cpuAccumulator) takeUncoreCache() {
426+
numCPUsInUncore := a.topo.CPUsPerUncore()
427+
for _, uncore := range a.sortAvailableUncoreCaches() {
428+
// take full UncoreCache if the CPUs needed is at least the UncoreCache size
429+
if a.needsAtLeast(numCPUsInUncore) {
430+
a.takeFullUncore()
431+
}
432+
433+
if a.isSatisfied() {
434+
return
435+
}
436+
437+
a.takePartialUncore(uncore)
438+
if a.isSatisfied() {
439+
return
440+
}
441+
}
442+
}
443+
361444
func (a *cpuAccumulator) takeFullCores() {
362445
for _, core := range a.freeCores() {
363446
cpusInCore := a.topo.CPUDetails.CPUsInCores(core)
@@ -447,7 +530,7 @@ func (a *cpuAccumulator) iterateCombinations(n []int, k int, f func([]int) LoopC
447530
helper(n, k, 0, []int{}, f)
448531
}
449532

450-
func takeByTopologyNUMAPacked(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int) (cpuset.CPUSet, error) {
533+
func takeByTopologyNUMAPacked(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int, preferAlignByUncoreCache bool) (cpuset.CPUSet, error) {
451534
acc := newCPUAccumulator(topo, availableCPUs, numCPUs)
452535
if acc.isSatisfied() {
453536
return acc.result, nil
@@ -470,14 +553,24 @@ func takeByTopologyNUMAPacked(topo *topology.CPUTopology, availableCPUs cpuset.C
470553
return acc.result, nil
471554
}
472555

473-
// 2. Acquire whole cores, if available and the container requires at least
556+
// 2. If PreferAlignByUncoreCache is enabled, acquire whole UncoreCaches
557+
// if available and the container requires at least a UncoreCache's-worth
558+
// of CPUs. Otherwise, acquire CPUs from the least amount of UncoreCaches.
559+
if preferAlignByUncoreCache {
560+
acc.takeUncoreCache()
561+
if acc.isSatisfied() {
562+
return acc.result, nil
563+
}
564+
}
565+
566+
// 3. Acquire whole cores, if available and the container requires at least
474567
// a core's-worth of CPUs.
475568
acc.takeFullCores()
476569
if acc.isSatisfied() {
477570
return acc.result, nil
478571
}
479572

480-
// 3. Acquire single threads, preferring to fill partially-allocated cores
573+
// 4. Acquire single threads, preferring to fill partially-allocated cores
481574
// on the same sockets as the whole cores we have already taken in this
482575
// allocation.
483576
acc.takeRemainingCPUs()
@@ -555,8 +648,10 @@ func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpu
555648
// If the number of CPUs requested cannot be handed out in chunks of
556649
// 'cpuGroupSize', then we just call out the packing algorithm since we
557650
// can't distribute CPUs in this chunk size.
651+
// PreferAlignByUncoreCache feature not implemented here yet and set to false.
652+
// Support for PreferAlignByUncoreCache to be done at beta release.
558653
if (numCPUs % cpuGroupSize) != 0 {
559-
return takeByTopologyNUMAPacked(topo, availableCPUs, numCPUs)
654+
return takeByTopologyNUMAPacked(topo, availableCPUs, numCPUs, false)
560655
}
561656

562657
// Otherwise build an accumulator to start allocating CPUs from.
@@ -739,7 +834,7 @@ func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpu
739834
// size 'cpuGroupSize' from 'bestCombo'.
740835
distribution := (numCPUs / len(bestCombo) / cpuGroupSize) * cpuGroupSize
741836
for _, numa := range bestCombo {
742-
cpus, _ := takeByTopologyNUMAPacked(acc.topo, acc.details.CPUsInNUMANodes(numa), distribution)
837+
cpus, _ := takeByTopologyNUMAPacked(acc.topo, acc.details.CPUsInNUMANodes(numa), distribution, false)
743838
acc.take(cpus)
744839
}
745840

@@ -754,7 +849,7 @@ func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpu
754849
if acc.details.CPUsInNUMANodes(numa).Size() < cpuGroupSize {
755850
continue
756851
}
757-
cpus, _ := takeByTopologyNUMAPacked(acc.topo, acc.details.CPUsInNUMANodes(numa), cpuGroupSize)
852+
cpus, _ := takeByTopologyNUMAPacked(acc.topo, acc.details.CPUsInNUMANodes(numa), cpuGroupSize, false)
758853
acc.take(cpus)
759854
remainder -= cpuGroupSize
760855
}
@@ -778,5 +873,5 @@ func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpu
778873

779874
// If we never found a combination of NUMA nodes that we could properly
780875
// distribute CPUs across, fall back to the packing algorithm.
781-
return takeByTopologyNUMAPacked(topo, availableCPUs, numCPUs)
876+
return takeByTopologyNUMAPacked(topo, availableCPUs, numCPUs, false)
782877
}

pkg/kubelet/cm/cpumanager/cpu_assignment_test.go

+74-1
Original file line numberDiff line numberDiff line change
@@ -651,11 +651,84 @@ func TestTakeByTopologyNUMAPacked(t *testing.T) {
651651
"",
652652
mustParseCPUSet(t, "0-29,40-69,30,31,70,71"),
653653
},
654+
// Test cases for PreferAlignByUncoreCache
655+
{
656+
"take cpus from two full UncoreCaches and partial from a single UncoreCache",
657+
topoUncoreSingleSocketNoSMT,
658+
StaticPolicyOptions{PreferAlignByUncoreCacheOption: true},
659+
mustParseCPUSet(t, "1-15"),
660+
10,
661+
"",
662+
cpuset.New(1, 2, 4, 5, 6, 7, 8, 9, 10, 11),
663+
},
664+
{
665+
"take one cpu from dual socket with HT - core from Socket 0",
666+
topoDualSocketHT,
667+
StaticPolicyOptions{PreferAlignByUncoreCacheOption: true},
668+
cpuset.New(1, 2, 3, 4, 5, 7, 8, 9, 10, 11),
669+
1,
670+
"",
671+
cpuset.New(2),
672+
},
673+
{
674+
"take first available UncoreCache from first socket",
675+
topoUncoreDualSocketNoSMT,
676+
StaticPolicyOptions{PreferAlignByUncoreCacheOption: true},
677+
mustParseCPUSet(t, "0-15"),
678+
4,
679+
"",
680+
cpuset.New(0, 1, 2, 3),
681+
},
682+
{
683+
"take all available UncoreCache from first socket",
684+
topoUncoreDualSocketNoSMT,
685+
StaticPolicyOptions{PreferAlignByUncoreCacheOption: true},
686+
mustParseCPUSet(t, "2-15"),
687+
6,
688+
"",
689+
cpuset.New(2, 3, 4, 5, 6, 7),
690+
},
691+
{
692+
"take first available UncoreCache from second socket",
693+
topoUncoreDualSocketNoSMT,
694+
StaticPolicyOptions{PreferAlignByUncoreCacheOption: true},
695+
mustParseCPUSet(t, "8-15"),
696+
4,
697+
"",
698+
cpuset.New(8, 9, 10, 11),
699+
},
700+
{
701+
"take first available UncoreCache from available NUMA",
702+
topoUncoreSingleSocketMultiNuma,
703+
StaticPolicyOptions{PreferAlignByUncoreCacheOption: true},
704+
mustParseCPUSet(t, "3,4-8,12"),
705+
2,
706+
"",
707+
cpuset.New(4, 5),
708+
},
709+
{
710+
"take cpus from best available UncoreCache group of multi uncore cache single socket - SMT enabled",
711+
topoUncoreSingleSocketSMT,
712+
StaticPolicyOptions{PreferAlignByUncoreCacheOption: true},
713+
mustParseCPUSet(t, "2-3,10-11,4-7,12-15"),
714+
6,
715+
"",
716+
cpuset.New(4, 5, 6, 12, 13, 14),
717+
},
718+
{
719+
"take cpus from multiple UncoreCache of single socket - SMT enabled",
720+
topoUncoreSingleSocketSMT,
721+
StaticPolicyOptions{PreferAlignByUncoreCacheOption: true},
722+
mustParseCPUSet(t, "1-7,9-15"),
723+
10,
724+
"",
725+
mustParseCPUSet(t, "4-7,12-15,1,9"),
726+
},
654727
}...)
655728

656729
for _, tc := range testCases {
657730
t.Run(tc.description, func(t *testing.T) {
658-
result, err := takeByTopologyNUMAPacked(tc.topo, tc.availableCPUs, tc.numCPUs)
731+
result, err := takeByTopologyNUMAPacked(tc.topo, tc.availableCPUs, tc.numCPUs, tc.opts.PreferAlignByUncoreCacheOption)
659732
if tc.expErr != "" && err != nil && err.Error() != tc.expErr {
660733
t.Errorf("expected error to be [%v] but it was [%v]", tc.expErr, err)
661734
}

pkg/kubelet/cm/cpumanager/cpu_manager_test.go

+12-8
Original file line numberDiff line numberDiff line change
@@ -651,20 +651,24 @@ func TestCPUManagerGenerate(t *testing.T) {
651651
{
652652
Cores: []cadvisorapi.Core{
653653
{
654-
Id: 0,
655-
Threads: []int{0},
654+
Id: 0,
655+
Threads: []int{0},
656+
UncoreCaches: []cadvisorapi.Cache{{Id: 1}},
656657
},
657658
{
658-
Id: 1,
659-
Threads: []int{1},
659+
Id: 1,
660+
Threads: []int{1},
661+
UncoreCaches: []cadvisorapi.Cache{{Id: 1}},
660662
},
661663
{
662-
Id: 2,
663-
Threads: []int{2},
664+
Id: 2,
665+
Threads: []int{2},
666+
UncoreCaches: []cadvisorapi.Cache{{Id: 1}},
664667
},
665668
{
666-
Id: 3,
667-
Threads: []int{3},
669+
Id: 3,
670+
Threads: []int{3},
671+
UncoreCaches: []cadvisorapi.Cache{{Id: 1}},
668672
},
669673
},
670674
},

pkg/kubelet/cm/cpumanager/policy_options.go

+16
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,14 @@ const (
3232
FullPCPUsOnlyOption string = "full-pcpus-only"
3333
DistributeCPUsAcrossNUMAOption string = "distribute-cpus-across-numa"
3434
AlignBySocketOption string = "align-by-socket"
35+
PreferAlignByUnCoreCacheOption string = "prefer-align-cpus-by-uncorecache"
3536
)
3637

3738
var (
3839
alphaOptions = sets.New[string](
3940
DistributeCPUsAcrossNUMAOption,
4041
AlignBySocketOption,
42+
PreferAlignByUnCoreCacheOption,
4143
)
4244
betaOptions = sets.New[string](
4345
FullPCPUsOnlyOption,
@@ -80,6 +82,9 @@ type StaticPolicyOptions struct {
8082
// Flag to ensure CPUs are considered aligned at socket boundary rather than
8183
// NUMA boundary
8284
AlignBySocket bool
85+
// Flag that makes best-effort to align CPUs to a uncorecache boundary
86+
// As long as there are CPUs available, pods will be admitted if the condition is not met.
87+
PreferAlignByUncoreCacheOption bool
8388
}
8489

8590
// NewStaticPolicyOptions creates a StaticPolicyOptions struct from the user configuration.
@@ -109,12 +114,23 @@ func NewStaticPolicyOptions(policyOptions map[string]string) (StaticPolicyOption
109114
return opts, fmt.Errorf("bad value for option %q: %w", name, err)
110115
}
111116
opts.AlignBySocket = optValue
117+
case PreferAlignByUnCoreCacheOption:
118+
optValue, err := strconv.ParseBool(value)
119+
if err != nil {
120+
return opts, fmt.Errorf("bad value for option %q: %w", name, err)
121+
}
122+
opts.PreferAlignByUncoreCacheOption = optValue
112123
default:
113124
// this should never be reached, we already detect unknown options,
114125
// but we keep it as further safety.
115126
return opts, fmt.Errorf("unsupported cpumanager option: %q (%s)", name, value)
116127
}
117128
}
129+
130+
if opts.PreferAlignByUncoreCacheOption && opts.DistributeCPUsAcrossNUMA {
131+
return opts, fmt.Errorf("static policy options %s and %s can not be used at the same time", PreferAlignByUnCoreCacheOption, DistributeCPUsAcrossNUMAOption)
132+
}
133+
118134
return opts, nil
119135
}
120136

pkg/kubelet/cm/cpumanager/policy_static.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -505,7 +505,8 @@ func (p *staticPolicy) takeByTopology(availableCPUs cpuset.CPUSet, numCPUs int)
505505
}
506506
return takeByTopologyNUMADistributed(p.topology, availableCPUs, numCPUs, cpuGroupSize)
507507
}
508-
return takeByTopologyNUMAPacked(p.topology, availableCPUs, numCPUs)
508+
509+
return takeByTopologyNUMAPacked(p.topology, availableCPUs, numCPUs, p.options.PreferAlignByUncoreCacheOption)
509510
}
510511

511512
func (p *staticPolicy) GetTopologyHints(s state.State, pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {

0 commit comments

Comments
 (0)