Skip to content

Commit a8d9d8d

Browse files
authored
Merge pull request #151 from ddebroy/ddebroy-spread1
Evenly spread volumes of a StatefuleSet across nodes based on topology
2 parents bdc133f + 14cd3e4 commit a8d9d8d

File tree

3 files changed

+352
-12
lines changed

3 files changed

+352
-12
lines changed

pkg/controller/controller.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -435,6 +435,7 @@ func (p *csiProvisioner) Provision(options controller.VolumeOptions) (*v1.Persis
435435
p.client,
436436
p.csiAPIClient,
437437
driverState.driverName,
438+
options.PVC.Name,
438439
options.AllowedTopologies,
439440
options.SelectedNode)
440441
if err != nil {

pkg/controller/topology.go

Lines changed: 75 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,15 @@ import (
2020
"fmt"
2121
"github.com/container-storage-interface/spec/lib/go/csi/v0"
2222
"github.com/golang/glog"
23+
"hash/fnv"
2324
"k8s.io/api/core/v1"
2425
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2526
"k8s.io/client-go/kubernetes"
2627
csiv1alpha1 "k8s.io/csi-api/pkg/apis/csi/v1alpha1"
2728
csiclientset "k8s.io/csi-api/pkg/client/clientset/versioned"
2829
"math/rand"
2930
"sort"
31+
"strconv"
3032
"strings"
3133
)
3234

@@ -68,6 +70,7 @@ func GenerateAccessibilityRequirements(
6870
kubeClient kubernetes.Interface,
6971
csiAPIClient csiclientset.Interface,
7072
driverName string,
73+
pvcName string,
7174
allowedTopologies []v1.TopologySelectorTerm,
7275
selectedNode *v1.Node) (*csi.TopologyRequirement, error) {
7376
requirement := &csi.TopologyRequirement{}
@@ -96,7 +99,15 @@ func GenerateAccessibilityRequirements(
9699
requirement.Requisite = toCSITopology(requisiteTerms)
97100

98101
/* Preferred */
99-
if selectedNode != nil {
102+
var preferredTerms []topologyTerm
103+
if selectedNode == nil {
104+
// no node selected therefore ensure even spreading of StatefulSet volumes by sorting
105+
// requisiteTerms and shifting the sorted terms based on hash of pvcName and replica index suffix
106+
hash, index := getPVCNameHashAndIndexOffset(pvcName)
107+
i := (hash + index) % uint32(len(requisiteTerms))
108+
preferredTerms = sortAndShift(requisiteTerms, nil, i)
109+
} else {
110+
// selectedNode is set so use topology from that node to populate preferredTerms
100111
// TODO (verult) reuse selected node info from aggregateTopologies
101112
// TODO (verult) retry
102113
nodeInfo, err := csiAPIClient.CsiV1alpha1().CSINodeInfos().Get(selectedNode.Name, metav1.GetOptions{})
@@ -110,7 +121,7 @@ func GenerateAccessibilityRequirements(
110121
return nil, fmt.Errorf("topology labels from selected node %v does not match topology keys from CSINodeInfo %v", selectedNode.Labels, topologyKeys)
111122
}
112123

113-
preferredTerms := sortAndShift(requisiteTerms, selectedTopology)
124+
preferredTerms = sortAndShift(requisiteTerms, selectedTopology, 0)
114125
if preferredTerms == nil {
115126
// Topology from selected node is not in requisite. This case should never be hit:
116127
// - If AllowedTopologies is specified, the scheduler should choose a node satisfying the
@@ -119,10 +130,8 @@ func GenerateAccessibilityRequirements(
119130
// selected node.
120131
return nil, fmt.Errorf("topology %v from selected node %q is not in requisite", selectedTopology, selectedNode.Name)
121132
}
122-
123-
requirement.Preferred = toCSITopology(preferredTerms)
124133
}
125-
134+
requirement.Preferred = toCSITopology(preferredTerms)
126135
return requirement, nil
127136
}
128137

@@ -267,17 +276,21 @@ func deduplicate(terms []topologyTerm) []topologyTerm {
267276
}
268277

269278
// Sort the given terms in place,
270-
// then return a new list of terms equivalent to the sorted terms, but shifted so that the primary
271-
// term is the first in the list.
272-
func sortAndShift(terms []topologyTerm, primary topologyTerm) []topologyTerm {
279+
// then return a new list of terms equivalent to the sorted terms, but shifted so that
280+
// either the primary term (if specified) or term at shiftIndex is the first in the list.
281+
func sortAndShift(terms []topologyTerm, primary topologyTerm, shiftIndex uint32) []topologyTerm {
273282
var preferredTerms []topologyTerm
274283
sort.Slice(terms, func(i, j int) bool {
275284
return terms[i].less(terms[j])
276285
})
277-
for i, t := range terms {
278-
if t.equal(primary) {
279-
preferredTerms = append(terms[i:], terms[:i]...)
280-
break
286+
if primary == nil {
287+
preferredTerms = append(terms[shiftIndex:], terms[:shiftIndex]...)
288+
} else {
289+
for i, t := range terms {
290+
if t.equal(primary) {
291+
preferredTerms = append(terms[i:], terms[:i]...)
292+
break
293+
}
281294
}
282295
}
283296
return preferredTerms
@@ -367,3 +380,53 @@ func toCSITopology(terms []topologyTerm) []*csi.Topology {
367380
}
368381
return out
369382
}
383+
384+
// identical to logic in getPVCNameHashAndIndexOffset in pkg/volume/util/util.go in-tree
385+
// [https://github.com/kubernetes/kubernetes/blob/master/pkg/volume/util/util.go]
386+
func getPVCNameHashAndIndexOffset(pvcName string) (hash uint32, index uint32) {
387+
if pvcName == "" {
388+
// We should always be called with a name; this shouldn't happen
389+
hash = rand.Uint32()
390+
} else {
391+
hashString := pvcName
392+
393+
// Heuristic to make sure that volumes in a StatefulSet are spread across zones
394+
// StatefulSet PVCs are (currently) named ClaimName-StatefulSetName-Id,
395+
// where Id is an integer index.
396+
// Note though that if a StatefulSet pod has multiple claims, we need them to be
397+
// in the same zone, because otherwise the pod will be unable to mount both volumes,
398+
// and will be unschedulable. So we hash _only_ the "StatefulSetName" portion when
399+
// it looks like `ClaimName-StatefulSetName-Id`.
400+
// We continue to round-robin volume names that look like `Name-Id` also; this is a useful
401+
// feature for users that are creating statefulset-like functionality without using statefulsets.
402+
lastDash := strings.LastIndexByte(pvcName, '-')
403+
if lastDash != -1 {
404+
statefulsetIDString := pvcName[lastDash+1:]
405+
statefulsetID, err := strconv.ParseUint(statefulsetIDString, 10, 32)
406+
if err == nil {
407+
// Offset by the statefulsetID, so we round-robin across zones
408+
index = uint32(statefulsetID)
409+
// We still hash the volume name, but only the prefix
410+
hashString = pvcName[:lastDash]
411+
412+
// In the special case where it looks like `ClaimName-StatefulSetName-Id`,
413+
// hash only the StatefulSetName, so that different claims on the same StatefulSet
414+
// member end up in the same zone.
415+
// Note that StatefulSetName (and ClaimName) might themselves both have dashes.
416+
// We actually just take the portion after the final - of ClaimName-StatefulSetName.
417+
// For our purposes it doesn't much matter (just suboptimal spreading).
418+
lastDash := strings.LastIndexByte(hashString, '-')
419+
if lastDash != -1 {
420+
hashString = hashString[lastDash+1:]
421+
}
422+
}
423+
}
424+
425+
// We hash the (base) volume name, so we don't bias towards the first N zones
426+
h := fnv.New32()
427+
h.Write([]byte(hashString))
428+
hash = h.Sum32()
429+
}
430+
431+
return hash, index
432+
}

0 commit comments

Comments
 (0)