Skip to content

Commit 3f459f2

Browse files
sphrasavathajcaldelas
authored and committed
Add takeByTopologyUnCoreCachePacked if policy option align-cpus-by-uncorecache is enabled.
Adding new function to evaluate uncore cache id. Reverse allocation logic. Implement preferAlignByUncorecache within TakeByTopologyNUMAPacked, along with new test cases.
1 parent 9bbb46d commit 3f459f2

9 files changed

+1263
-350
lines changed

pkg/kubelet/cm/cpumanager/cpu_assignment.go

+109-7
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,17 @@ func (n *numaFirst) takeFullSecondLevel() {
118118
n.acc.takeFullSockets()
119119
}
120120

121+
// Sort the UncoreCaches within the NUMA nodes.
122+
func (a *cpuAccumulator) sortAvailableUncoreCaches() []int {
123+
var result []int
124+
for _, numa := range a.sortAvailableNUMANodes() {
125+
uncore := a.details.UncoreInNUMANodes(numa).UnsortedList()
126+
a.sort(uncore, a.details.CPUsInUncoreCaches)
127+
result = append(result, uncore...)
128+
}
129+
return result
130+
}
131+
121132
// If NUMA nodes are higher in the memory hierarchy than sockets, then just
122133
// sort the NUMA nodes directly, and return them.
123134
func (n *numaFirst) sortAvailableNUMANodes() []int {
@@ -318,6 +329,12 @@ func (a *cpuAccumulator) isSocketFree(socketID int) bool {
318329
return a.details.CPUsInSockets(socketID).Size() == a.topo.CPUsPerSocket()
319330
}
320331

332+
// Returns true if the supplied UnCoreCache is fully available,
333+
// "fully available" means that all the CPUs in it are free.
334+
func (a *cpuAccumulator) isUncoreCacheFree(uncoreID int) bool {
335+
return a.details.CPUsInUncoreCaches(uncoreID).Size() == a.topo.CPUDetails.CPUsInUncoreCaches(uncoreID).Size()
336+
}
337+
321338
// Returns true if the supplied core is fully available in `a.details`.
322339
// "fully available" means that all the CPUs in it are free.
323340
func (a *cpuAccumulator) isCoreFree(coreID int) bool {
@@ -346,6 +363,17 @@ func (a *cpuAccumulator) freeSockets() []int {
346363
return free
347364
}
348365

366+
// Returns free UncoreCache IDs as a slice sorted by sortAvailableUnCoreCache().
367+
func (a *cpuAccumulator) freeUncoreCache() []int {
368+
free := []int{}
369+
for _, uncore := range a.sortAvailableUncoreCaches() {
370+
if a.isUncoreCacheFree(uncore) {
371+
free = append(free, uncore)
372+
}
373+
}
374+
return free
375+
}
376+
349377
// Returns free core IDs as a slice sorted by sortAvailableCores().
350378
func (a *cpuAccumulator) freeCores() []int {
351379
free := []int{}
@@ -519,6 +547,60 @@ func (a *cpuAccumulator) takeFullSockets() {
519547
}
520548
}
521549

550+
func (a *cpuAccumulator) takeFullUncore() {
551+
for _, uncore := range a.freeUncoreCache() {
552+
cpusInUncore := a.topo.CPUDetails.CPUsInUncoreCaches(uncore)
553+
if !a.needsAtLeast(cpusInUncore.Size()) {
554+
continue
555+
}
556+
klog.V(4).InfoS("takeFullUncore: claiming uncore", "uncore", uncore)
557+
a.take(cpusInUncore)
558+
}
559+
}
560+
561+
func (a *cpuAccumulator) takePartialUncore(uncoreID int) {
562+
numCoresNeeded := a.numCPUsNeeded / a.topo.CPUsPerCore()
563+
564+
// note: we need to keep the first N free cores (physical cpus) and only we we got these expand to their
565+
// cpus (virtual cpus). Taking directly the first M cpus (virtual cpus) leads to suboptimal allocation
566+
freeCores := a.details.CoresNeededInUncoreCache(numCoresNeeded, uncoreID)
567+
freeCPUs := a.details.CPUsInCores(freeCores.UnsortedList()...)
568+
569+
claimed := (a.numCPUsNeeded == freeCPUs.Size())
570+
klog.V(4).InfoS("takePartialUncore: trying to claim partial uncore",
571+
"uncore", uncoreID,
572+
"claimed", claimed,
573+
"needed", a.numCPUsNeeded,
574+
"cores", freeCores.String(),
575+
"cpus", freeCPUs.String())
576+
if !claimed {
577+
return
578+
579+
}
580+
a.take(freeCPUs)
581+
}
582+
583+
// First try to take full UncoreCache, if available and need is at least the size of the UncoreCache group.
584+
// Second try to take the partial UncoreCache if available and the request size can fit w/in the UncoreCache.
585+
func (a *cpuAccumulator) takeUncoreCache() {
586+
numCPUsInUncore := a.topo.CPUsPerUncore()
587+
for _, uncore := range a.sortAvailableUncoreCaches() {
588+
// take full UncoreCache if the CPUs needed is greater than free UncoreCache size
589+
if a.needsAtLeast(numCPUsInUncore) {
590+
a.takeFullUncore()
591+
}
592+
593+
if a.isSatisfied() {
594+
return
595+
}
596+
597+
a.takePartialUncore(uncore)
598+
if a.isSatisfied() {
599+
return
600+
}
601+
}
602+
}
603+
522604
func (a *cpuAccumulator) takeFullCores() {
523605
for _, core := range a.freeCores() {
524606
cpusInCore := a.topo.CPUDetails.CPUsInCores(core)
@@ -637,6 +719,14 @@ func (a *cpuAccumulator) iterateCombinations(n []int, k int, f func([]int) LoopC
637719
// or the remaining number of CPUs to take after having taken full sockets and NUMA nodes is less
638720
// than a whole NUMA node, the function tries to take whole physical cores (cores).
639721
//
722+
// If `PreferAlignByUncoreCache` is enabled, the function will try to optimally assign Uncorecaches.
723+
// If `numCPUs` is larger than or equal to the total number of CPUs in a Uncorecache, and there are
724+
// free (i.e. all CPUs within the Uncorecache are free) Uncorecaches, the function takes as many entire
725+
// cores from free Uncorecaches as possible. If/Once `numCPUs` is smaller than the total number of
726+
// CPUs in a free Uncorecache, the function scans each Uncorecache index in numerical order to assign
727+
// cores that will fit within the Uncorecache. If `numCPUs` cannot fit within any Uncorecache, the
728+
// function tries to take whole physical cores.
729+
//
640730
// If `numCPUs` is bigger than the total number of CPUs in a core, and there are
641731
// free (i.e. all CPUs in them are free) cores, the function takes as many entire free cores as possible.
642732
// The cores are taken from one socket at a time, and the sockets are considered by
@@ -658,7 +748,7 @@ func (a *cpuAccumulator) iterateCombinations(n []int, k int, f func([]int) LoopC
658748
// the least amount of free CPUs to the one with the highest amount of free CPUs (i.e. in ascending
659749
// order of free CPUs). For any NUMA node, the cores are selected from the ones in the socket with
660750
// the least amount of free CPUs to the one with the highest amount of free CPUs.
661-
func takeByTopologyNUMAPacked(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int, cpuSortingStrategy CPUSortingStrategy) (cpuset.CPUSet, error) {
751+
func takeByTopologyNUMAPacked(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int, cpuSortingStrategy CPUSortingStrategy, preferAlignByUncoreCache bool) (cpuset.CPUSet, error) {
662752
acc := newCPUAccumulator(topo, availableCPUs, numCPUs, cpuSortingStrategy)
663753
if acc.isSatisfied() {
664754
return acc.result, nil
@@ -681,7 +771,17 @@ func takeByTopologyNUMAPacked(topo *topology.CPUTopology, availableCPUs cpuset.C
681771
return acc.result, nil
682772
}
683773

684-
// 2. Acquire whole cores, if available and the container requires at least
774+
// 2. If PreferAlignByUncoreCache is enabled, acquire whole UncoreCaches
775+
// if available and the container requires at least a UncoreCache's-worth
776+
// of CPUs. Otherwise, acquire CPUs from the least amount of UncoreCaches.
777+
if preferAlignByUncoreCache {
778+
acc.takeUncoreCache()
779+
if acc.isSatisfied() {
780+
return acc.result, nil
781+
}
782+
}
783+
784+
// 3. Acquire whole cores, if available and the container requires at least
685785
// a core's-worth of CPUs.
686786
// If `CPUSortingStrategySpread` is specified, skip taking the whole core.
687787
if cpuSortingStrategy != CPUSortingStrategySpread {
@@ -691,7 +791,7 @@ func takeByTopologyNUMAPacked(topo *topology.CPUTopology, availableCPUs cpuset.C
691791
}
692792
}
693793

694-
// 3. Acquire single threads, preferring to fill partially-allocated cores
794+
// 4. Acquire single threads, preferring to fill partially-allocated cores
695795
// on the same sockets as the whole cores we have already taken in this
696796
// allocation.
697797
acc.takeRemainingCPUs()
@@ -769,8 +869,10 @@ func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpu
769869
// If the number of CPUs requested cannot be handed out in chunks of
770870
// 'cpuGroupSize', then we just call out the packing algorithm since we
771871
// can't distribute CPUs in this chunk size.
872+
// PreferAlignByUncoreCache feature not implemented here yet and set to false.
873+
// Support for PreferAlignByUncoreCache to be done at beta release.
772874
if (numCPUs % cpuGroupSize) != 0 {
773-
return takeByTopologyNUMAPacked(topo, availableCPUs, numCPUs, cpuSortingStrategy)
875+
return takeByTopologyNUMAPacked(topo, availableCPUs, numCPUs, cpuSortingStrategy, false)
774876
}
775877

776878
// Otherwise build an accumulator to start allocating CPUs from.
@@ -953,7 +1055,7 @@ func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpu
9531055
// size 'cpuGroupSize' from 'bestCombo'.
9541056
distribution := (numCPUs / len(bestCombo) / cpuGroupSize) * cpuGroupSize
9551057
for _, numa := range bestCombo {
956-
cpus, _ := takeByTopologyNUMAPacked(acc.topo, acc.details.CPUsInNUMANodes(numa), distribution, cpuSortingStrategy)
1058+
cpus, _ := takeByTopologyNUMAPacked(acc.topo, acc.details.CPUsInNUMANodes(numa), distribution, cpuSortingStrategy, false)
9571059
acc.take(cpus)
9581060
}
9591061

@@ -968,7 +1070,7 @@ func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpu
9681070
if acc.details.CPUsInNUMANodes(numa).Size() < cpuGroupSize {
9691071
continue
9701072
}
971-
cpus, _ := takeByTopologyNUMAPacked(acc.topo, acc.details.CPUsInNUMANodes(numa), cpuGroupSize, cpuSortingStrategy)
1073+
cpus, _ := takeByTopologyNUMAPacked(acc.topo, acc.details.CPUsInNUMANodes(numa), cpuGroupSize, cpuSortingStrategy, false)
9721074
acc.take(cpus)
9731075
remainder -= cpuGroupSize
9741076
}
@@ -992,5 +1094,5 @@ func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpu
9921094

9931095
// If we never found a combination of NUMA nodes that we could properly
9941096
// distribute CPUs across, fall back to the packing algorithm.
995-
return takeByTopologyNUMAPacked(topo, availableCPUs, numCPUs, cpuSortingStrategy)
1097+
return takeByTopologyNUMAPacked(topo, availableCPUs, numCPUs, cpuSortingStrategy, false)
9961098
}

pkg/kubelet/cm/cpumanager/cpu_assignment_test.go

+75-2
Original file line numberDiff line numberDiff line change
@@ -668,6 +668,79 @@ func TestTakeByTopologyNUMAPacked(t *testing.T) {
668668
"",
669669
mustParseCPUSet(t, "0-29,40-69,30,31,70,71"),
670670
},
671+
// Test cases for PreferAlignByUncoreCache
672+
{
673+
"take cpus from two full UncoreCaches and partial from a single UncoreCache",
674+
topoUncoreSingleSocketNoSMT,
675+
StaticPolicyOptions{PreferAlignByUncoreCacheOption: true},
676+
mustParseCPUSet(t, "1-15"),
677+
10,
678+
"",
679+
cpuset.New(1, 2, 4, 5, 6, 7, 8, 9, 10, 11),
680+
},
681+
{
682+
"take one cpu from dual socket with HT - core from Socket 0",
683+
topoDualSocketHT,
684+
StaticPolicyOptions{PreferAlignByUncoreCacheOption: true},
685+
cpuset.New(1, 2, 3, 4, 5, 7, 8, 9, 10, 11),
686+
1,
687+
"",
688+
cpuset.New(2),
689+
},
690+
{
691+
"take first available UncoreCache from first socket",
692+
topoUncoreDualSocketNoSMT,
693+
StaticPolicyOptions{PreferAlignByUncoreCacheOption: true},
694+
mustParseCPUSet(t, "0-15"),
695+
4,
696+
"",
697+
cpuset.New(0, 1, 2, 3),
698+
},
699+
{
700+
"take all available UncoreCache from first socket",
701+
topoUncoreDualSocketNoSMT,
702+
StaticPolicyOptions{PreferAlignByUncoreCacheOption: true},
703+
mustParseCPUSet(t, "2-15"),
704+
6,
705+
"",
706+
cpuset.New(2, 3, 4, 5, 6, 7),
707+
},
708+
{
709+
"take first available UncoreCache from second socket",
710+
topoUncoreDualSocketNoSMT,
711+
StaticPolicyOptions{PreferAlignByUncoreCacheOption: true},
712+
mustParseCPUSet(t, "8-15"),
713+
4,
714+
"",
715+
cpuset.New(8, 9, 10, 11),
716+
},
717+
{
718+
"take first available UncoreCache from available NUMA",
719+
topoUncoreSingleSocketMultiNuma,
720+
StaticPolicyOptions{PreferAlignByUncoreCacheOption: true},
721+
mustParseCPUSet(t, "3,4-8,12"),
722+
2,
723+
"",
724+
cpuset.New(4, 5),
725+
},
726+
{
727+
"take cpus from best available UncoreCache group of multi uncore cache single socket - SMT enabled",
728+
topoUncoreSingleSocketSMT,
729+
StaticPolicyOptions{PreferAlignByUncoreCacheOption: true},
730+
mustParseCPUSet(t, "2-3,10-11,4-7,12-15"),
731+
6,
732+
"",
733+
cpuset.New(4, 5, 6, 12, 13, 14),
734+
},
735+
{
736+
"take cpus from multiple UncoreCache of single socket - SMT enabled",
737+
topoUncoreSingleSocketSMT,
738+
StaticPolicyOptions{PreferAlignByUncoreCacheOption: true},
739+
mustParseCPUSet(t, "1-7,9-15"),
740+
10,
741+
"",
742+
mustParseCPUSet(t, "4-7,12-15,1,9"),
743+
},
671744
}...)
672745

673746
for _, tc := range testCases {
@@ -677,7 +750,7 @@ func TestTakeByTopologyNUMAPacked(t *testing.T) {
677750
strategy = CPUSortingStrategySpread
678751
}
679752

680-
result, err := takeByTopologyNUMAPacked(tc.topo, tc.availableCPUs, tc.numCPUs, strategy)
753+
result, err := takeByTopologyNUMAPacked(tc.topo, tc.availableCPUs, tc.numCPUs, strategy, tc.opts.PreferAlignByUncoreCacheOption)
681754
if tc.expErr != "" && err != nil && err.Error() != tc.expErr {
682755
t.Errorf("expected error to be [%v] but it was [%v]", tc.expErr, err)
683756
}
@@ -778,7 +851,7 @@ func TestTakeByTopologyWithSpreadPhysicalCPUsPreferredOption(t *testing.T) {
778851
if tc.opts.DistributeCPUsAcrossCores {
779852
strategy = CPUSortingStrategySpread
780853
}
781-
result, err := takeByTopologyNUMAPacked(tc.topo, tc.availableCPUs, tc.numCPUs, strategy)
854+
result, err := takeByTopologyNUMAPacked(tc.topo, tc.availableCPUs, tc.numCPUs, strategy, tc.opts.PreferAlignByUncoreCacheOption)
782855
if tc.expErr != "" && err.Error() != tc.expErr {
783856
t.Errorf("testCase %q failed, expected error to be [%v] but it was [%v]", tc.description, tc.expErr, err)
784857
}

pkg/kubelet/cm/cpumanager/cpu_manager_test.go

+12-8
Original file line numberDiff line numberDiff line change
@@ -668,20 +668,24 @@ func TestCPUManagerGenerate(t *testing.T) {
668668
{
669669
Cores: []cadvisorapi.Core{
670670
{
671-
Id: 0,
672-
Threads: []int{0},
671+
Id: 0,
672+
Threads: []int{0},
673+
UncoreCaches: []cadvisorapi.Cache{{Id: 1}},
673674
},
674675
{
675-
Id: 1,
676-
Threads: []int{1},
676+
Id: 1,
677+
Threads: []int{1},
678+
UncoreCaches: []cadvisorapi.Cache{{Id: 1}},
677679
},
678680
{
679-
Id: 2,
680-
Threads: []int{2},
681+
Id: 2,
682+
Threads: []int{2},
683+
UncoreCaches: []cadvisorapi.Cache{{Id: 1}},
681684
},
682685
{
683-
Id: 3,
684-
Threads: []int{3},
686+
Id: 3,
687+
Threads: []int{3},
688+
UncoreCaches: []cadvisorapi.Cache{{Id: 1}},
685689
},
686690
},
687691
},

pkg/kubelet/cm/cpumanager/policy_options.go

+19
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ const (
3434
AlignBySocketOption string = "align-by-socket"
3535
DistributeCPUsAcrossCoresOption string = "distribute-cpus-across-cores"
3636
StrictCPUReservationOption string = "strict-cpu-reservation"
37+
PreferAlignByUnCoreCacheOption string = "prefer-align-cpus-by-uncorecache"
3738
)
3839

3940
var (
@@ -42,6 +43,7 @@ var (
4243
AlignBySocketOption,
4344
DistributeCPUsAcrossCoresOption,
4445
StrictCPUReservationOption,
46+
PreferAlignByUnCoreCacheOption,
4547
)
4648
betaOptions = sets.New[string](
4749
FullPCPUsOnlyOption,
@@ -90,6 +92,9 @@ type StaticPolicyOptions struct {
9092
DistributeCPUsAcrossCores bool
9193
// Flag to remove reserved cores from the list of available cores
9294
StrictCPUReservation bool
95+
// Flag that makes best-effort to align CPUs to a uncorecache boundary
96+
// As long as there are CPUs available, pods will be admitted if the condition is not met.
97+
PreferAlignByUncoreCacheOption bool
9398
}
9499

95100
// NewStaticPolicyOptions creates a StaticPolicyOptions struct from the user configuration.
@@ -131,6 +136,12 @@ func NewStaticPolicyOptions(policyOptions map[string]string) (StaticPolicyOption
131136
return opts, fmt.Errorf("bad value for option %q: %w", name, err)
132137
}
133138
opts.StrictCPUReservation = optValue
139+
case PreferAlignByUnCoreCacheOption:
140+
optValue, err := strconv.ParseBool(value)
141+
if err != nil {
142+
return opts, fmt.Errorf("bad value for option %q: %w", name, err)
143+
}
144+
opts.PreferAlignByUncoreCacheOption = optValue
134145
default:
135146
// this should never be reached, we already detect unknown options,
136147
// but we keep it as further safety.
@@ -147,6 +158,14 @@ func NewStaticPolicyOptions(policyOptions map[string]string) (StaticPolicyOption
147158
return opts, fmt.Errorf("static policy options %s and %s can not be used at the same time", DistributeCPUsAcrossNUMAOption, DistributeCPUsAcrossCoresOption)
148159
}
149160

161+
if opts.PreferAlignByUncoreCacheOption && opts.DistributeCPUsAcrossCores {
162+
return opts, fmt.Errorf("static policy options %s and %s can not be used at the same time", PreferAlignByUnCoreCacheOption, DistributeCPUsAcrossCoresOption)
163+
}
164+
165+
if opts.PreferAlignByUncoreCacheOption && opts.DistributeCPUsAcrossNUMA {
166+
return opts, fmt.Errorf("static policy options %s and %s can not be used at the same time", PreferAlignByUnCoreCacheOption, DistributeCPUsAcrossNUMAOption)
167+
}
168+
150169
return opts, nil
151170
}
152171

pkg/kubelet/cm/cpumanager/policy_static.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -525,7 +525,8 @@ func (p *staticPolicy) takeByTopology(availableCPUs cpuset.CPUSet, numCPUs int)
525525
}
526526
return takeByTopologyNUMADistributed(p.topology, availableCPUs, numCPUs, cpuGroupSize, cpuSortingStrategy)
527527
}
528-
return takeByTopologyNUMAPacked(p.topology, availableCPUs, numCPUs, cpuSortingStrategy)
528+
529+
return takeByTopologyNUMAPacked(p.topology, availableCPUs, numCPUs, cpuSortingStrategy, p.options.PreferAlignByUncoreCacheOption)
529530
}
530531

531532
func (p *staticPolicy) GetTopologyHints(s state.State, pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {

0 commit comments

Comments
 (0)