From 111a497321726c2dd7d2956f0aa6717d449270b5 Mon Sep 17 00:00:00 2001
From: Jordan Liggitt
Date: Thu, 14 Dec 2017 15:43:37 -0500
Subject: [PATCH] UPSTREAM: 57211: Process cluster-scoped owners correctly

---
 .../garbagecollector/garbagecollector.go      |   2 +-
 .../controller/garbagecollector/operations.go |  12 +-
 .../cluster_scoped_owner_test.go              | 156 ++++++++++++++++++
 .../garbage_collector_test.go                 |  10 +-
 4 files changed, 170 insertions(+), 10 deletions(-)
 create mode 100644 vendor/k8s.io/kubernetes/test/integration/garbagecollector/cluster_scoped_owner_test.go

diff --git a/vendor/k8s.io/kubernetes/pkg/controller/garbagecollector/garbagecollector.go b/vendor/k8s.io/kubernetes/pkg/controller/garbagecollector/garbagecollector.go
index 544b3b57784b..c60bc0bdadad 100644
--- a/vendor/k8s.io/kubernetes/pkg/controller/garbagecollector/garbagecollector.go
+++ b/vendor/k8s.io/kubernetes/pkg/controller/garbagecollector/garbagecollector.go
@@ -214,7 +214,7 @@ func (gc *GarbageCollector) isDangling(reference metav1.OwnerReference, item *no
 	if err != nil {
 		return false, nil, err
 	}
-	resource, err := gc.apiResource(reference.APIVersion, reference.Kind, len(item.identity.Namespace) != 0)
+	resource, err := gc.apiResource(reference.APIVersion, reference.Kind)
 	if err != nil {
 		return false, nil, err
 	}
diff --git a/vendor/k8s.io/kubernetes/pkg/controller/garbagecollector/operations.go b/vendor/k8s.io/kubernetes/pkg/controller/garbagecollector/operations.go
index fcfdcd1cee65..4618f590f1e5 100644
--- a/vendor/k8s.io/kubernetes/pkg/controller/garbagecollector/operations.go
+++ b/vendor/k8s.io/kubernetes/pkg/controller/garbagecollector/operations.go
@@ -32,7 +32,7 @@ import (
 
 // apiResource consults the REST mapper to translate an <apiVersion, kind,
 // namespace> tuple to a unversioned.APIResource struct.
-func (gc *GarbageCollector) apiResource(apiVersion, kind string, namespaced bool) (*metav1.APIResource, error) {
+func (gc *GarbageCollector) apiResource(apiVersion, kind string) (*metav1.APIResource, error) {
 	fqKind := schema.FromAPIVersionAndKind(apiVersion, kind)
 	mapping, err := gc.restMapper.RESTMapping(fqKind.GroupKind(), apiVersion)
 	if err != nil {
@@ -41,7 +41,7 @@ func (gc *GarbageCollector) apiResource(apiVersion, kind string, namespaced bool
 	glog.V(5).Infof("map kind %s, version %s to resource %s", kind, apiVersion, mapping.Resource)
 	resource := metav1.APIResource{
 		Name:       mapping.Resource,
-		Namespaced: namespaced,
+		Namespaced: mapping.Scope == meta.RESTScopeNamespace,
 		Kind:       kind,
 	}
 	return &resource, nil
@@ -51,7 +51,7 @@ func (gc *GarbageCollector) deleteObject(item objectReference, policy *metav1.De
 	fqKind := schema.FromAPIVersionAndKind(item.APIVersion, item.Kind)
 	client, err := gc.clientPool.ClientForGroupVersionKind(fqKind)
 	gc.registeredRateLimiter.registerIfNotPresent(fqKind.GroupVersion(), client, "garbage_collector_operation")
-	resource, err := gc.apiResource(item.APIVersion, item.Kind, len(item.Namespace) != 0)
+	resource, err := gc.apiResource(item.APIVersion, item.Kind)
 	if err != nil {
 		return err
 	}
@@ -65,7 +65,7 @@ func (gc *GarbageCollector) getObject(item objectReference) (*unstructured.Unstr
 	fqKind := schema.FromAPIVersionAndKind(item.APIVersion, item.Kind)
 	client, err := gc.clientPool.ClientForGroupVersionKind(fqKind)
 	gc.registeredRateLimiter.registerIfNotPresent(fqKind.GroupVersion(), client, "garbage_collector_operation")
-	resource, err := gc.apiResource(item.APIVersion, item.Kind, len(item.Namespace) != 0)
+	resource, err := gc.apiResource(item.APIVersion, item.Kind)
 	if err != nil {
 		return nil, err
 	}
@@ -76,7 +76,7 @@ func (gc *GarbageCollector) updateObject(item objectReference, obj *unstructured
 	fqKind := schema.FromAPIVersionAndKind(item.APIVersion, item.Kind)
 	client, err := gc.clientPool.ClientForGroupVersionKind(fqKind)
 	gc.registeredRateLimiter.registerIfNotPresent(fqKind.GroupVersion(), client, "garbage_collector_operation")
-	resource, err := gc.apiResource(item.APIVersion, item.Kind, len(item.Namespace) != 0)
+	resource, err := gc.apiResource(item.APIVersion, item.Kind)
 	if err != nil {
 		return nil, err
 	}
@@ -87,7 +87,7 @@ func (gc *GarbageCollector) patchObject(item objectReference, patch []byte) (*un
 	fqKind := schema.FromAPIVersionAndKind(item.APIVersion, item.Kind)
 	client, err := gc.clientPool.ClientForGroupVersionKind(fqKind)
 	gc.registeredRateLimiter.registerIfNotPresent(fqKind.GroupVersion(), client, "garbage_collector_operation")
-	resource, err := gc.apiResource(item.APIVersion, item.Kind, len(item.Namespace) != 0)
+	resource, err := gc.apiResource(item.APIVersion, item.Kind)
 	if err != nil {
 		return nil, err
 	}
diff --git a/vendor/k8s.io/kubernetes/test/integration/garbagecollector/cluster_scoped_owner_test.go b/vendor/k8s.io/kubernetes/test/integration/garbagecollector/cluster_scoped_owner_test.go
new file mode 100644
index 000000000000..fa032a69db4e
--- /dev/null
+++ b/vendor/k8s.io/kubernetes/test/integration/garbagecollector/cluster_scoped_owner_test.go
@@ -0,0 +1,156 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package garbagecollector
+
+import (
+	"io"
+	"net/http"
+	"strings"
+	"testing"
+	"time"
+
+	"k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/api/resource"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/wait"
+	restclient "k8s.io/client-go/rest"
+	"k8s.io/kubernetes/pkg/api/v1"
+	"k8s.io/kubernetes/test/integration/framework"
+)
+
+type roundTripFunc func(req *http.Request) (*http.Response, error)
+
+func (w roundTripFunc) RoundTrip(req *http.Request) (*http.Response, error) {
+	return w(req)
+}
+
+type readDelayer struct {
+	delay time.Duration
+	io.ReadCloser
+}
+
+func (b *readDelayer) Read(p []byte) (n int, err error) {
+	defer time.Sleep(b.delay)
+	return b.ReadCloser.Read(p)
+}
+
+func TestClusterScopedOwners(t *testing.T) {
+	stopCh := make(chan struct{})
+
+	// Start the test server and wrap the client to delay PV watch responses
+	masterConfig := framework.NewIntegrationTestMasterConfig()
+	masterConfig.EnableCoreControllers = false
+	_, s, closeFn := framework.RunAMaster(masterConfig)
+
+	config := &restclient.Config{Host: s.URL}
+	config.WrapTransport = func(rt http.RoundTripper) http.RoundTripper {
+		return roundTripFunc(func(req *http.Request) (*http.Response, error) {
+			if req.URL.Query().Get("watch") != "true" || !strings.Contains(req.URL.String(), "persistentvolumes") {
+				return rt.RoundTrip(req)
+			}
+
+			resp, err := rt.RoundTrip(req)
+			if err != nil {
+				return resp, err
+			}
+
+			resp.Body = &readDelayer{30 * time.Second, resp.Body}
+			return resp, err
+		})
+	}
+
+	gc, clientSet := setupWithServer(t, config, stopCh)
+	defer func() {
+		// We have to close the stop channel first, so the shared informers can terminate their watches;
+		// otherwise closeFn() will hang waiting for active client connections to finish.
+		close(stopCh)
+		closeFn()
+	}()
+	go gc.Run(5, stopCh)
+
+	ns := framework.CreateTestingNamespace("gc-cluster-scope-deletion", s, t)
+	defer framework.DeleteTestingNamespace(ns, s, t)
+
+	t.Log("Create a pair of objects")
+	pv, err := clientSet.CoreV1().PersistentVolumes().Create(&v1.PersistentVolume{
+		ObjectMeta: metav1.ObjectMeta{Name: "pv-valid"},
+		Spec: v1.PersistentVolumeSpec{
+			PersistentVolumeSource: v1.PersistentVolumeSource{HostPath: &v1.HostPathVolumeSource{Path: "/foo"}},
+			Capacity:               v1.ResourceList{v1.ResourceStorage: resource.MustParse("1Gi")},
+			AccessModes:            []v1.PersistentVolumeAccessMode{v1.ReadWriteMany},
+		},
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	if _, err := clientSet.CoreV1().ConfigMaps(ns.Name).Create(&v1.ConfigMap{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:            "cm-valid",
+			OwnerReferences: []metav1.OwnerReference{{Kind: "PersistentVolume", APIVersion: "v1", Name: pv.Name, UID: pv.UID}},
+		},
+	}); err != nil {
+		t.Fatal(err)
+	}
+
+	t.Log("Create a namespaced object with a missing parent")
+	if _, err := clientSet.CoreV1().ConfigMaps(ns.Name).Create(&v1.ConfigMap{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:            "cm-missing",
+			Labels:          map[string]string{"missing": "true"},
+			OwnerReferences: []metav1.OwnerReference{{Kind: "PersistentVolume", APIVersion: "v1", Name: "missing-name", UID: types.UID("missing-uid")}},
+		},
+	}); err != nil {
+		t.Fatal(err)
+	}
+
+	t.Log("Create a namespaced object with a missing type parent")
+	if _, err := clientSet.CoreV1().ConfigMaps(ns.Name).Create(&v1.ConfigMap{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:            "cm-invalid",
+			OwnerReferences: []metav1.OwnerReference{{Kind: "UnknownType", APIVersion: "unknown.group/v1", Name: "invalid-name", UID: types.UID("invalid-uid")}},
+		},
+	}); err != nil {
+		t.Fatal(err)
+	}
+
+	// wait for deletable children to go away
+	if err := wait.Poll(5*time.Second, 300*time.Second, func() (bool, error) {
+		_, err := clientSet.CoreV1().ConfigMaps(ns.Name).Get("cm-missing", metav1.GetOptions{})
+		switch {
+		case errors.IsNotFound(err):
+			return true, nil
+		case err != nil:
+			return false, err
+		default:
+			t.Logf("cm with missing parent still exists, retrying")
+			return false, nil
+		}
+	}); err != nil {
+		t.Fatal(err)
+	}
+	t.Logf("deletable children removed")
+
+	// Give time for blocked children to be incorrectly cleaned up
+	time.Sleep(5 * time.Second)
+
+	// ensure children with unverifiable parents don't get reaped
+	if _, err := clientSet.CoreV1().ConfigMaps(ns.Name).Get("cm-invalid", metav1.GetOptions{}); err != nil {
+		t.Fatalf("child with invalid ownerRef is unexpectedly missing: %v", err)
+	}
+
+	// ensure children with present parents don't get reaped
+	if _, err := clientSet.CoreV1().ConfigMaps(ns.Name).Get("cm-valid", metav1.GetOptions{}); err != nil {
+		t.Fatalf("child with valid ownerRef is unexpectedly missing: %v", err)
+	}
+}
diff --git a/vendor/k8s.io/kubernetes/test/integration/garbagecollector/garbage_collector_test.go b/vendor/k8s.io/kubernetes/test/integration/garbagecollector/garbage_collector_test.go
index 56e3ea421395..2b7f46b89904 100644
--- a/vendor/k8s.io/kubernetes/test/integration/garbagecollector/garbage_collector_test.go
+++ b/vendor/k8s.io/kubernetes/test/integration/garbagecollector/garbage_collector_test.go
@@ -128,8 +128,13 @@ func setup(t *testing.T, stop chan struct{}) (*httptest.Server, framework.CloseF
 	masterConfig := framework.NewIntegrationTestMasterConfig()
 	masterConfig.EnableCoreControllers = false
 	_, s, closeFn := framework.RunAMaster(masterConfig)
+	config := &restclient.Config{Host: s.URL}
+	gc, clientSet := setupWithServer(t, config, stop)
+	return s, closeFn, gc, clientSet
+}
 
-	clientSet, err := clientset.NewForConfig(&restclient.Config{Host: s.URL})
+func setupWithServer(t *testing.T, config *restclient.Config, stop chan struct{}) (*garbagecollector.GarbageCollector, clientset.Interface) {
+	clientSet, err := clientset.NewForConfig(config)
 	if err != nil {
 		t.Fatalf("Error in create clientset: %v", err)
 	}
@@ -142,7 +147,6 @@ func setup(t *testing.T, stop chan struct{}) (*httptest.Server, framework.CloseF
 	if err != nil {
 		t.Fatalf("Failed to parse supported resources from server: %v", err)
 	}
-	config := &restclient.Config{Host: s.URL}
 	config.ContentConfig.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: metaonly.NewMetadataCodecFactory()}
 	metaOnlyClientPool := dynamic.NewClientPool(config, api.Registry.RESTMapper(), dynamic.LegacyAPIPathResolverFunc)
 	config.ContentConfig.NegotiatedSerializer = nil
@@ -165,7 +169,7 @@ func setup(t *testing.T, stop chan struct{}) (*httptest.Server, framework.CloseF
 
 	go sharedInformers.Start(stop)
 
-	return s, closeFn, gc, clientSet
+	return gc, clientSet
 }
 
 // This test simulates the cascading deletion.
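
Note on the core change: before this patch the garbage collector decided whether an owner's resource was namespaced by looking at the dependent's namespace (len(item.Namespace) != 0), so a namespaced child referencing a cluster-scoped owner (for example a ConfigMap owned by a PersistentVolume, as in the new test) was looked up in the wrong scope and could be wrongly treated as dangling and deleted. With the patch, Namespaced is derived from the REST mapping's scope. The following is a minimal sketch of that logic, assuming the 1.9-era apimachinery vendored here (where RESTMapping.Resource is still a plain string); the standalone helper name apiResourceFor is illustrative and not part of the patch:

package garbagecollector

import (
	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

// apiResourceFor resolves an owner reference's <apiVersion, kind> to an APIResource,
// taking Namespaced from the REST mapping rather than from the dependent object.
func apiResourceFor(mapper meta.RESTMapper, apiVersion, kind string) (*metav1.APIResource, error) {
	fqKind := schema.FromAPIVersionAndKind(apiVersion, kind)
	mapping, err := mapper.RESTMapping(fqKind.GroupKind(), apiVersion)
	if err != nil {
		return nil, err
	}
	return &metav1.APIResource{
		Name:       mapping.Resource, // a plain string in this vendored version
		Namespaced: mapping.Scope == meta.RESTScopeNamespace,
		Kind:       kind,
	}, nil
}

The integration test added above exercises exactly this path: cm-valid (owned by an existing PersistentVolume) and cm-invalid (owned by an unresolvable kind) must survive, while cm-missing (owned by a known kind that does not exist) is collected.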