Skip to content

Commit 5e7e1e7

Browse files
committed
KEP-4742: Node Topology Labels via Downward API
1 parent e3baee3 commit 5e7e1e7

File tree

10 files changed

+580
-6
lines changed

10 files changed

+580
-6
lines changed

pkg/controlplane/apiserver/samples/generic/server/admission.go

+3
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
certsigning "k8s.io/kubernetes/plugin/pkg/admission/certificates/signing"
3131
certsubjectrestriction "k8s.io/kubernetes/plugin/pkg/admission/certificates/subjectrestriction"
3232
"k8s.io/kubernetes/plugin/pkg/admission/defaulttolerationseconds"
33+
"k8s.io/kubernetes/plugin/pkg/admission/podtopologylabels"
3334
"k8s.io/kubernetes/plugin/pkg/admission/serviceaccount"
3435
)
3536

@@ -48,6 +49,8 @@ func DefaultOffAdmissionPlugins() sets.Set[string] {
4849
certsubjectrestriction.PluginName, // CertificateSubjectRestriction
4950
validatingadmissionpolicy.PluginName, // ValidatingAdmissionPolicy
5051
mutatingadmissionpolicy.PluginName, // MutatingAdmissionPolicy
52+
podtopologylabels.PluginName, // PodTopologyLabels, only active when feature gate PodTopologyLabelsAdmission is enabled.
53+
validatingadmissionpolicy.PluginName, // ValidatingAdmissionPolicy, only active when feature gate ValidatingAdmissionPolicy is enabled
5154
)
5255

5356
return sets.New(options.AllOrderedPlugins...).Difference(defaultOnPlugins)

pkg/features/kube_features.go

+10
Original file line numberDiff line numberDiff line change
@@ -964,6 +964,16 @@ const (
964964
// restore the old behavior. Please file issues if you hit issues and have to use this Feature Gate.
965965
// The Feature Gate will be locked to true and then removed in +2 releases (1.35) if there are no bug reported
966966
DisableCPUQuotaWithExclusiveCPUs featuregate.Feature = "DisableCPUQuotaWithExclusiveCPUs"
967+
968+
// owner: @munnerz
969+
// kep: https://kep.k8s.io/4742
970+
// alpha: v1.33
971+
//
972+
// Enables the PodTopologyLabels admission plugin to automatically set node topology labels
973+
// (i.e. those with the 'topology.k8s.io/' prefix on Node objects) onto Pod objects when they are
974+
// bound/scheduled to a node.
975+
// This allows workloads running in pods to understand the topology information of their assigned node.
976+
PodTopologyLabelsAdmission featuregate.Feature = "PodTopologyLabelsAdmission"
967977
)
968978

969979
// defaultVersionedKubernetesFeatureGates consists of all known Kubernetes-specific feature keys with VersionedSpecs.

pkg/kubeapiserver/options/plugins.go

+4
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ import (
4646
"k8s.io/kubernetes/plugin/pkg/admission/nodetaint"
4747
"k8s.io/kubernetes/plugin/pkg/admission/podnodeselector"
4848
"k8s.io/kubernetes/plugin/pkg/admission/podtolerationrestriction"
49+
"k8s.io/kubernetes/plugin/pkg/admission/podtopologylabels"
4950
podpriority "k8s.io/kubernetes/plugin/pkg/admission/priority"
5051
"k8s.io/kubernetes/plugin/pkg/admission/runtimeclass"
5152
"k8s.io/kubernetes/plugin/pkg/admission/security/podsecurity"
@@ -93,6 +94,7 @@ var AllOrderedPlugins = []string{
9394
certsubjectrestriction.PluginName, // CertificateSubjectRestriction
9495
defaultingressclass.PluginName, // DefaultIngressClass
9596
denyserviceexternalips.PluginName, // DenyServiceExternalIPs
97+
podtopologylabels.PluginName, // PodTopologyLabels
9698

9799
// new admission plugins should generally be inserted above here
98100
// webhook, resourcequota, and deny plugins must go at the end
@@ -138,6 +140,7 @@ func RegisterAllAdmissionPlugins(plugins *admission.Plugins) {
138140
certsigning.Register(plugins)
139141
ctbattest.Register(plugins)
140142
certsubjectrestriction.Register(plugins)
143+
podtopologylabels.Register(plugins)
141144
}
142145

143146
// DefaultOffAdmissionPlugins get admission plugins off by default for kube-apiserver.
@@ -162,6 +165,7 @@ func DefaultOffAdmissionPlugins() sets.Set[string] {
162165
certsubjectrestriction.PluginName, // CertificateSubjectRestriction
163166
defaultingressclass.PluginName, // DefaultIngressClass
164167
podsecurity.PluginName, // PodSecurity
168+
podtopologylabels.PluginName, // PodTopologyLabels, only active when feature gate PodTopologyLabelsAdmission is enabled.
165169
mutatingadmissionpolicy.PluginName, // Mutatingadmissionpolicy, only active when feature gate MutatingAdmissionpolicy is enabled
166170
validatingadmissionpolicy.PluginName, // ValidatingAdmissionPolicy, only active when feature gate ValidatingAdmissionPolicy is enabled
167171
)

pkg/registry/core/pod/storage/storage.go

+21-4
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,12 @@ import (
3333
"k8s.io/apiserver/pkg/storage"
3434
storeerr "k8s.io/apiserver/pkg/storage/errors"
3535
"k8s.io/apiserver/pkg/util/dryrun"
36+
utilfeature "k8s.io/apiserver/pkg/util/feature"
3637
policyclient "k8s.io/client-go/kubernetes/typed/policy/v1"
3738
podutil "k8s.io/kubernetes/pkg/api/pod"
3839
api "k8s.io/kubernetes/pkg/apis/core"
3940
"k8s.io/kubernetes/pkg/apis/core/validation"
41+
kubefeatures "k8s.io/kubernetes/pkg/features"
4042
"k8s.io/kubernetes/pkg/kubelet/client"
4143
"k8s.io/kubernetes/pkg/printers"
4244
printersinternal "k8s.io/kubernetes/pkg/printers/internalversion"
@@ -191,7 +193,7 @@ func (r *BindingREST) Create(ctx context.Context, name string, obj runtime.Objec
191193
}
192194
}
193195

194-
err = r.assignPod(ctx, binding.UID, binding.ResourceVersion, binding.Name, binding.Target.Name, binding.Annotations, dryrun.IsDryRun(options.DryRun))
196+
err = r.assignPod(ctx, binding.UID, binding.ResourceVersion, binding.Name, binding.Target.Name, binding.Annotations, binding.Labels, dryrun.IsDryRun(options.DryRun))
195197
out = &metav1.Status{Status: metav1.StatusSuccess}
196198
return
197199
}
@@ -206,7 +208,7 @@ func (r *BindingREST) PreserveRequestObjectMetaSystemFieldsOnSubresourceCreate()
206208
// setPodHostAndAnnotations sets the given pod's host to 'machine' if and only if
207209
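// the pod is unassigned and merges the provided annotations with those of the pod. When the PodTopologyLabelsAdmission feature gate is enabled, the provided labels are also copied to the pod, without overwriting labels the pod already has.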
208210
// Returns the current state of the pod, or an error.
209-
func (r *BindingREST) setPodHostAndAnnotations(ctx context.Context, podUID types.UID, podResourceVersion, podID, machine string, annotations map[string]string, dryRun bool) (finalPod *api.Pod, err error) {
211+
func (r *BindingREST) setPodHostAndAnnotations(ctx context.Context, podUID types.UID, podResourceVersion, podID, machine string, annotations, labels map[string]string, dryRun bool) (finalPod *api.Pod, err error) {
210212
podKey, err := r.store.KeyFunc(ctx, podID)
211213
if err != nil {
212214
return nil, err
@@ -245,6 +247,21 @@ func (r *BindingREST) setPodHostAndAnnotations(ctx context.Context, podUID types
245247
for k, v := range annotations {
246248
pod.Annotations[k] = v
247249
}
250+
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.PodTopologyLabelsAdmission) {
251+
// If any labels are present on the Binding, copy them across to the Pod.
252+
if len(labels) > 0 {
253+
if pod.Labels == nil {
254+
pod.Labels = make(map[string]string)
255+
}
256+
for k, v := range labels {
257+
if _, ok := pod.Labels[k]; ok {
258+
// don't overwrite labels that are already present on the Pod
259+
continue
260+
}
261+
pod.Labels[k] = v
262+
}
263+
}
264+
}
248265
podutil.UpdatePodCondition(&pod.Status, &api.PodCondition{
249266
Type: api.PodScheduled,
250267
Status: api.ConditionTrue,
@@ -256,8 +273,8 @@ func (r *BindingREST) setPodHostAndAnnotations(ctx context.Context, podUID types
256273
}
257274

258275
// assignPod assigns the given pod to the given machine.
259-
func (r *BindingREST) assignPod(ctx context.Context, podUID types.UID, podResourceVersion, podID string, machine string, annotations map[string]string, dryRun bool) (err error) {
260-
if _, err = r.setPodHostAndAnnotations(ctx, podUID, podResourceVersion, podID, machine, annotations, dryRun); err != nil {
276+
func (r *BindingREST) assignPod(ctx context.Context, podUID types.UID, podResourceVersion, podID string, machine string, annotations, labels map[string]string, dryRun bool) (err error) {
277+
if _, err = r.setPodHostAndAnnotations(ctx, podUID, podResourceVersion, podID, machine, annotations, labels, dryRun); err != nil {
261278
err = storeerr.InterpretGetError(err, api.Resource("pods"), podID)
262279
err = storeerr.InterpretUpdateError(err, api.Resource("pods"), podID)
263280
if _, ok := err.(*errors.StatusError); !ok {

pkg/registry/core/pod/storage/storage_test.go

+12-2
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
"k8s.io/apimachinery/pkg/labels"
3535
"k8s.io/apimachinery/pkg/runtime"
3636
"k8s.io/apimachinery/pkg/types"
37+
"k8s.io/apimachinery/pkg/util/version"
3738
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
3839
"k8s.io/apiserver/pkg/registry/generic"
3940
genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"
@@ -42,8 +43,11 @@ import (
4243
apiserverstorage "k8s.io/apiserver/pkg/storage"
4344
storeerr "k8s.io/apiserver/pkg/storage/errors"
4445
etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
46+
utilfeature "k8s.io/apiserver/pkg/util/feature"
47+
featuregatetesting "k8s.io/component-base/featuregate/testing"
4548
podtest "k8s.io/kubernetes/pkg/api/pod/testing"
4649
api "k8s.io/kubernetes/pkg/apis/core"
50+
kubefeatures "k8s.io/kubernetes/pkg/features"
4751
"k8s.io/kubernetes/pkg/registry/registrytest"
4852
"k8s.io/kubernetes/pkg/securitycontext"
4953
)
@@ -676,12 +680,15 @@ func TestEtcdCreateWithContainersNotFound(t *testing.T) {
676680
t.Fatalf("unexpected error: %v", err)
677681
}
678682

683+
featuregatetesting.SetFeatureGateEmulationVersionDuringTest(t, utilfeature.DefaultFeatureGate, version.MustParse("1.32"))
684+
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, kubefeatures.SetPodTopologyLabels, true)
679685
// Suddenly, a wild scheduler appears:
680686
_, err = bindingStorage.Create(ctx, "foo", &api.Binding{
681687
ObjectMeta: metav1.ObjectMeta{
682688
Namespace: metav1.NamespaceDefault,
683689
Name: "foo",
684-
Annotations: map[string]string{"label1": "value1"},
690+
Annotations: map[string]string{"annotation1": "value1"},
691+
Labels: map[string]string{"label1": "label-value1"},
685692
},
686693
Target: api.ObjectReference{Name: "machine"},
687694
}, rest.ValidateAllObjectFunc, &metav1.CreateOptions{})
@@ -695,9 +702,12 @@ func TestEtcdCreateWithContainersNotFound(t *testing.T) {
695702
}
696703
pod := obj.(*api.Pod)
697704

698-
if !(pod.Annotations != nil && pod.Annotations["label1"] == "value1") {
705+
if !(pod.Annotations != nil && pod.Annotations["annotation1"] == "value1") {
699706
t.Fatalf("Pod annotations don't match the expected: %v", pod.Annotations)
700707
}
708+
if !(pod.Labels != nil && pod.Labels["label1"] == "label-value1") {
709+
t.Fatalf("Pod labels don't match the expected: %v", pod.Labels)
710+
}
701711
}
702712

703713
func TestEtcdCreateWithConflict(t *testing.T) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
/*
2+
Copyright 2024 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package podtopologylabels
18+
19+
import (
20+
"context"
21+
"fmt"
22+
"io"
23+
"strings"
24+
25+
"k8s.io/klog/v2"
26+
27+
apierrors "k8s.io/apimachinery/pkg/api/errors"
28+
"k8s.io/apimachinery/pkg/util/sets"
29+
"k8s.io/apiserver/pkg/admission"
30+
genericadmissioninitializer "k8s.io/apiserver/pkg/admission/initializer"
31+
"k8s.io/client-go/informers"
32+
corev1listers "k8s.io/client-go/listers/core/v1"
33+
"k8s.io/component-base/featuregate"
34+
api "k8s.io/kubernetes/pkg/apis/core"
35+
"k8s.io/kubernetes/pkg/features"
36+
)
37+
38+
// PluginName is a string with the name of the plugin
39+
const PluginName = "PodTopologyLabels"
40+
41+
// Register registers a plugin
42+
func Register(plugins *admission.Plugins) {
43+
plugins.Register(PluginName, func(_ io.Reader) (admission.Interface, error) {
44+
plugin := NewPodTopologyPlugin()
45+
return plugin, nil
46+
})
47+
}
48+
49+
// NewPodTopologyPlugin initializes a Plugin
50+
func NewPodTopologyPlugin() *Plugin {
51+
return &Plugin{
52+
Handler: admission.NewHandler(admission.Create),
53+
// Always copy zone and region labels.
54+
labels: sets.New("topology.k8s.io/zone", "topology.k8s.io/region"),
55+
// Also support copying arbitrary custom topology labels.
56+
domains: sets.New("topology.k8s.io"),
57+
// Copy any sub-domains of topology.k8s.io as well.
58+
suffixes: sets.New(".topology.k8s.io"),
59+
}
60+
}
61+
62+
type Plugin struct {
63+
*admission.Handler
64+
65+
nodeLister corev1listers.NodeLister
66+
67+
// explicit labels, list of domains or a list of domain
68+
// suffixes to be copies to Pod objects being bound.
69+
labels, domains, suffixes sets.Set[string]
70+
71+
enabled, inspectedFeatureGates bool
72+
}
73+
74+
var _ admission.MutationInterface = &Plugin{}
75+
var _ genericadmissioninitializer.WantsExternalKubeInformerFactory = &Plugin{}
76+
var _ genericadmissioninitializer.WantsFeatures = &Plugin{}
77+
78+
// InspectFeatureGates implements WantsFeatures.
79+
func (p *Plugin) InspectFeatureGates(featureGates featuregate.FeatureGate) {
80+
p.enabled = featureGates.Enabled(features.PodTopologyLabelsAdmission)
81+
p.inspectedFeatureGates = true
82+
}
83+
84+
func (p *Plugin) SetExternalKubeInformerFactory(factory informers.SharedInformerFactory) {
85+
nodeInformer := factory.Core().V1().Nodes()
86+
p.nodeLister = nodeInformer.Lister()
87+
p.SetReadyFunc(nodeInformer.Informer().HasSynced)
88+
}
89+
90+
func (p *Plugin) ValidateInitialization() error {
91+
if p.nodeLister == nil {
92+
return fmt.Errorf("nodeLister not set")
93+
}
94+
if !p.inspectedFeatureGates {
95+
return fmt.Errorf("feature gates not inspected")
96+
}
97+
return nil
98+
}
99+
100+
func (p *Plugin) Admit(ctx context.Context, a admission.Attributes, o admission.ObjectInterfaces) (err error) {
101+
if !p.enabled {
102+
return nil
103+
}
104+
if shouldIgnore(a) {
105+
return nil
106+
}
107+
// we need to wait for our caches to warm
108+
if !p.WaitForReady() {
109+
return admission.NewForbidden(a, fmt.Errorf("not yet ready to handle request"))
110+
}
111+
112+
binding := a.GetObject().(*api.Binding)
113+
// other fields are not set by the default scheduler for the binding target, so only check the Kind.
114+
if binding.Target.Kind != "Node" {
115+
klog.V(6).Info("Skipping Pod being bound to non-Node object type", "target", binding.Target.GroupVersionKind())
116+
return nil
117+
}
118+
119+
node, err := p.nodeLister.Get(binding.Target.Name)
120+
if err != nil {
121+
// Ignore NotFound errors to avoid risking breaking compatibility/behaviour.
122+
if apierrors.IsNotFound(err) {
123+
return nil
124+
}
125+
return err
126+
}
127+
128+
// fast-path/short circuit if the node has no labels
129+
if node.Labels == nil {
130+
return nil
131+
}
132+
133+
labelsToCopy := make(map[string]string)
134+
for k, v := range node.Labels {
135+
if !p.isTopologyLabel(k) {
136+
continue
137+
}
138+
labelsToCopy[k] = v
139+
}
140+
141+
if len(labelsToCopy) == 0 {
142+
// fast-path/short circuit if the node has no topology labels
143+
return nil
144+
}
145+
146+
// copy the topology labels into the Binding's labels, as these are copied from the Binding
147+
// to the Pod object being bound within the podBinding registry/store.
148+
if binding.Labels == nil {
149+
binding.Labels = make(map[string]string)
150+
}
151+
for k, v := range labelsToCopy {
152+
if _, exists := binding.Labels[k]; exists {
153+
// Don't overwrite labels on Binding resources as this could lead to unexpected
154+
// behaviour if any schedulers rely on being able to explicitly set values themselves.
155+
continue
156+
}
157+
binding.Labels[k] = v
158+
}
159+
160+
return nil
161+
}
162+
163+
func (p *Plugin) isTopologyLabel(key string) bool {
164+
// First check explicit label keys.
165+
if p.labels.Has(key) {
166+
return true
167+
}
168+
// Check the domain portion of the label key, if present
169+
domain, _, hasDomain := strings.Cut(key, "/")
170+
if !hasDomain {
171+
// fast-path if there is no / separator
172+
return false
173+
}
174+
if p.domains.Has(domain) {
175+
// check for explicit domains to copy
176+
return true
177+
}
178+
for _, suffix := range p.suffixes.UnsortedList() {
179+
// check if the domain has one of the suffixes that are to be copied
180+
if strings.HasSuffix(domain, suffix) {
181+
return true
182+
}
183+
}
184+
return false
185+
}
186+
187+
func shouldIgnore(a admission.Attributes) bool {
188+
resource := a.GetResource().GroupResource()
189+
if resource != api.Resource("pods") {
190+
return true
191+
}
192+
if a.GetSubresource() != "binding" {
193+
// only run the checks below on the binding subresource
194+
return true
195+
}
196+
197+
obj := a.GetObject()
198+
_, ok := obj.(*api.Binding)
199+
if !ok {
200+
klog.Errorf("expected Binding but got %s", a.GetKind().Kind)
201+
return true
202+
}
203+
204+
return false
205+
}

0 commit comments

Comments
 (0)