diff --git a/cmd/gce-pd-csi-driver/main.go b/cmd/gce-pd-csi-driver/main.go index 2447572bc..9406639fb 100644 --- a/cmd/gce-pd-csi-driver/main.go +++ b/cmd/gce-pd-csi-driver/main.go @@ -27,6 +27,8 @@ import ( "strings" "time" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" "k8s.io/klog/v2" "k8s.io/utils/strings/slices" @@ -76,6 +78,7 @@ var ( fallbackRequisiteZonesFlag = flag.String("fallback-requisite-zones", "", "Comma separated list of requisite zones that will be used if there are not sufficient zones present in requisite topologies when provisioning a disk") enableStoragePoolsFlag = flag.Bool("enable-storage-pools", false, "If set to true, the CSI Driver will allow volumes to be provisioned in Storage Pools") enableHdHAFlag = flag.Bool("allow-hdha-provisioning", false, "If set to true, will allow the driver to provision Hyperdisk-balanced High Availability disks") + nodeName = flag.String("node-name", "", "The node this driver is running on") multiZoneVolumeHandleDiskTypesFlag = flag.String("multi-zone-volume-handle-disk-types", "", "Comma separated list of allowed disk types that can use the multi-zone volumeHandle. Used only if --multi-zone-volume-handle-enable") multiZoneVolumeHandleEnableFlag = flag.Bool("multi-zone-volume-handle-enable", false, "If set to true, the multi-zone volumeHandle feature will be enabled") @@ -244,11 +247,24 @@ func handle() { if err != nil { klog.Fatalf("Failed to set up metadata service: %v", err.Error()) } + + // Instantiate a kubeClient and pass it. 
+ klog.V(2).Infof("Setting up kubeClient") + cfg, err := rest.InClusterConfig() + if err != nil { + klog.Fatalf("Failed to create REST Config for k8s client: %v", err.Error()) + } + kubeClient, err := kubernetes.NewForConfig(cfg) + if err != nil { + klog.Fatalf("Failed to create k8s client: %v", err.Error()) + } + nsArgs := driver.NodeServerArgs{ EnableDeviceInUseCheck: *enableDeviceInUseCheck, DeviceInUseTimeout: *deviceInUseTimeout, } - nodeServer = driver.NewNodeServer(gceDriver, mounter, deviceUtils, meta, statter, nsArgs) + + nodeServer = driver.NewNodeServer(gceDriver, mounter, deviceUtils, meta, statter, kubeClient, nsArgs) if *maxConcurrentFormatAndMount > 0 { nodeServer = nodeServer.WithSerializedFormatAndMount(*formatAndMountTimeout, *maxConcurrentFormatAndMount) } diff --git a/deploy/kubernetes/base/controller/cluster_setup.yaml b/deploy/kubernetes/base/controller/cluster_setup.yaml index a6941a119..e1462493f 100644 --- a/deploy/kubernetes/base/controller/cluster_setup.yaml +++ b/deploy/kubernetes/base/controller/cluster_setup.yaml @@ -205,6 +205,9 @@ rules: verbs: ['use'] resourceNames: - csi-gce-pd-node-psp + - apiGroups: [""] # The core API group + resources: ["nodes"] + verbs: ["get", "list"] --- kind: ClusterRole @@ -217,6 +220,9 @@ rules: verbs: ['use'] resourceNames: - csi-gce-pd-node-psp-win + - apiGroups: [""] # The core API group + resources: ["nodes"] + verbs: ["get", "list"] --- apiVersion: rbac.authorization.k8s.io/v1 diff --git a/deploy/kubernetes/images/stable-master/image.yaml b/deploy/kubernetes/images/stable-master/image.yaml index 2d2df2367..aea2b6566 100644 --- a/deploy/kubernetes/images/stable-master/image.yaml +++ b/deploy/kubernetes/images/stable-master/image.yaml @@ -51,6 +51,6 @@ imageTag: name: gke.gcr.io/gcp-compute-persistent-disk-csi-driver # Don't change stable image without changing pdImagePlaceholder in # test/k8s-integration/main.go - newName: registry.k8s.io/cloud-provider-gcp/gcp-compute-persistent-disk-csi-driver - 
newTag: "v1.15.0"
+  newName: registry.k8s.io/cloud-provider-gcp/gcp-compute-persistent-disk-csi-driver
+  newTag: "v1.15.0"
 ---
diff --git a/pkg/common/constants.go b/pkg/common/constants.go
index 9851b11d1..7cbaa8b48 100644
--- a/pkg/common/constants.go
+++ b/pkg/common/constants.go
@@ -17,8 +17,9 @@ limitations under the License.
 package common
 
 const (
+	TopologyKeyPrefix = "topology.gke.io/"
 	// Keys for Topology. This key will be shared amongst drivers from GCP
-	TopologyKeyZone = "topology.gke.io/zone"
+	TopologyKeyZone = TopologyKeyPrefix + "zone"
 
 	// VolumeAttributes for Partition
 	VolumeAttributePartition = "partition"
diff --git a/pkg/common/utils.go b/pkg/common/utils.go
index a69d30453..e83796504 100644
--- a/pkg/common/utils.go
+++ b/pkg/common/utils.go
@@ -696,3 +696,9 @@ func NewLimiter(limit, burst int, emptyBucket bool) *rate.Limiter {
 
 	return limiter
 }
+
+// IsGKETopologyLabel reports whether key is a GKE topology label, that is,
+// whether it starts with TopologyKeyPrefix ("topology.gke.io/").
+func IsGKETopologyLabel(key string) bool {
+	return strings.HasPrefix(key, TopologyKeyPrefix)
+}
diff --git a/pkg/gce-pd-csi-driver/gce-pd-driver.go b/pkg/gce-pd-csi-driver/gce-pd-driver.go
index 9133ce035..156ed57a8 100644
--- a/pkg/gce-pd-csi-driver/gce-pd-driver.go
+++ b/pkg/gce-pd-csi-driver/gce-pd-driver.go
@@ -21,6 +21,7 @@ import (
 	csi "github.com/container-storage-interface/spec/lib/go/csi"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
+	"k8s.io/client-go/kubernetes"
 	"k8s.io/klog/v2"
 	"k8s.io/mount-utils"
 	common "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
@@ -145,7 +146,7 @@ func NewIdentityServer(gceDriver *GCEDriver) *GCEIdentityServer {
 	}
 }
 
-func NewNodeServer(gceDriver *GCEDriver, mounter
*mount.SafeFormatAndMount, deviceUtils deviceutils.DeviceUtils, meta metadataservice.MetadataService, statter mountmanager.Statter, args NodeServerArgs) *GCENodeServer { +func NewNodeServer(gceDriver *GCEDriver, mounter *mount.SafeFormatAndMount, deviceUtils deviceutils.DeviceUtils, meta metadataservice.MetadataService, statter mountmanager.Statter, kubeClient *kubernetes.Clientset, args NodeServerArgs) *GCENodeServer { return &GCENodeServer{ Driver: gceDriver, Mounter: mounter, @@ -155,6 +156,7 @@ func NewNodeServer(gceDriver *GCEDriver, mounter *mount.SafeFormatAndMount, devi VolumeStatter: statter, enableDeviceInUseCheck: args.EnableDeviceInUseCheck, deviceInUseErrors: newDeviceErrMap(args.DeviceInUseTimeout), + kubeClient: kubeClient, } } diff --git a/pkg/gce-pd-csi-driver/node.go b/pkg/gce-pd-csi-driver/node.go index 7bc934036..a87a8b892 100644 --- a/pkg/gce-pd-csi-driver/node.go +++ b/pkg/gce-pd-csi-driver/node.go @@ -30,6 +30,8 @@ import ( csi "github.com/container-storage-interface/spec/lib/go/csi" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" "k8s.io/mount-utils" @@ -47,6 +49,8 @@ type GCENodeServer struct { VolumeStatter mountmanager.Statter MetadataService metadataservice.MetadataService + kubeClient *kubernetes.Clientset + // A map storing all volumes with ongoing operations so that additional operations // for that same volume (as defined by VolumeID) return an Aborted error volumeLocks *common.VolumeLocks @@ -520,12 +524,25 @@ func (ns *GCENodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeG } func (ns *GCENodeServer) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) { + labels, err := ns.gkeTopologyLabels(ctx, ns.MetadataService.GetName()) + if err != nil { + // Perhaps we don't want to fail here. 
We are introducing a new
+		// dependency and we might be better off allowing this failure to
+		// happen and moving on to retrieve the zone from GCE MDS.
+		return nil, err
+	}
+
+	labels[common.TopologyKeyZone] = ns.MetadataService.GetZone()
+
+	// Each "Topology" struct will later be translated into an individual
+	// 'matchExpressions' block in the PV's NodeAffinity. Because we always
+	// need to match on both the zone AND the disk type, both the zone and the
+	// supported disks belong as segments on a single Topology.
 	top := &csi.Topology{
-		Segments: map[string]string{common.TopologyKeyZone: ns.MetadataService.GetZone()},
+		Segments: labels,
 	}
 
 	nodeID := common.CreateNodeID(ns.MetadataService.GetProject(), ns.MetadataService.GetZone(), ns.MetadataService.GetName())
-
 	volumeLimits, err := ns.GetVolumeLimits()
 
 	resp := &csi.NodeGetInfoResponse{
@@ -533,9 +550,45 @@ func (ns *GCENodeServer) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoRe
 		MaxVolumesPerNode:  volumeLimits,
 		AccessibleTopology: top,
 	}
+
+	klog.V(2).Infof("Returning NodeGetInfoResponse: %+v", resp)
+
 	return resp, err
 }
 
+// gkeTopologyLabels returns the labels of the Kubernetes node nodeName whose
+// keys satisfy common.IsGKETopologyLabel.
+//
+// If the node server was built without a kube client (the test helpers pass
+// nil to NewNodeServer) no lookup is attempted and an empty map is returned,
+// rather than dereferencing the nil client. A failed node lookup is reported
+// as a gRPC Internal status error.
+func (ns *GCENodeServer) gkeTopologyLabels(ctx context.Context, nodeName string) (map[string]string, error) {
+	topology := make(map[string]string)
+	if ns.kubeClient == nil {
+		klog.Warningf("No kube client configured; skipping topology labels for node %q", nodeName)
+		return topology, nil
+	}
+
+	klog.V(2).Infof("Retrieving node topology labels for node %q", nodeName)
+
+	node, err := ns.kubeClient.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
+	if err != nil {
+		// TODO: retry instead. Need to figure out how much wrong-ness can be
+		// tolerated and how often CSINode gets refreshed.
+		return nil, status.Errorf(codes.Internal, "failed to get node %q: %v", nodeName, err)
+	}
+
+	for k, v := range node.GetLabels() {
+		if common.IsGKETopologyLabel(k) {
+			klog.V(2).Infof("Including node topology label %q=%q", k, v)
+			topology[k] = v
+		}
+	}
+
+	return topology, nil
+}
+
 func (ns *GCENodeServer) NodeGetVolumeStats(ctx context.Context, req *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) {
 	if len(req.VolumeId) == 0 {
 		return nil, status.Error(codes.InvalidArgument, "NodeGetVolumeStats volume ID was empty")
diff --git a/pkg/gce-pd-csi-driver/node_test.go b/pkg/gce-pd-csi-driver/node_test.go
index d1d0920bf..1d856bdae 100644
--- a/pkg/gce-pd-csi-driver/node_test.go
+++ b/pkg/gce-pd-csi-driver/node_test.go
@@ -51,7 +51,7 @@ func getTestGCEDriverWithCustomMounter(t *testing.T, mounter *mount.SafeFormatAn
 
 func getCustomTestGCEDriver(t *testing.T, mounter *mount.SafeFormatAndMount, deviceUtils deviceutils.DeviceUtils, metaService metadataservice.MetadataService) *GCEDriver {
 	gceDriver := GetGCEDriver()
-	nodeServer := NewNodeServer(gceDriver, mounter, deviceUtils, metaService, mountmanager.NewFakeStatter(mounter), NodeServerArgs{true, 0})
+	nodeServer := NewNodeServer(gceDriver, mounter, deviceUtils, metaService, mountmanager.NewFakeStatter(mounter), nil, NodeServerArgs{true, 0})
 	err := gceDriver.SetupGCEDriver(driver, "test-vendor", nil, nil, nil, nil, nodeServer)
 	if err != nil {
 		t.Fatalf("Failed to setup GCE Driver: %v", err)
@@ -62,7 +62,7 @@ func getCustomTestGCEDriver(t *testing.T, mounter *mount.SafeFormatAndMount, dev
 func getTestBlockingMountGCEDriver(t *testing.T, readyToExecute chan chan struct{}) *GCEDriver {
 	gceDriver := GetGCEDriver()
 	mounter := mountmanager.NewFakeSafeBlockingMounter(readyToExecute)
-	nodeServer := NewNodeServer(gceDriver, mounter, deviceutils.NewFakeDeviceUtils(false), metadataservice.NewFakeService(), mountmanager.NewFakeStatter(mounter), NodeServerArgs{true, 0})
+	nodeServer := NewNodeServer(gceDriver, mounter,
deviceutils.NewFakeDeviceUtils(false), metadataservice.NewFakeService(), mountmanager.NewFakeStatter(mounter), nil, NodeServerArgs{true, 0}) err := gceDriver.SetupGCEDriver(driver, "test-vendor", nil, nil, nil, nil, nodeServer) if err != nil { t.Fatalf("Failed to setup GCE Driver: %v", err) @@ -73,7 +73,7 @@ func getTestBlockingMountGCEDriver(t *testing.T, readyToExecute chan chan struct func getTestBlockingFormatAndMountGCEDriver(t *testing.T, readyToExecute chan chan struct{}) *GCEDriver { gceDriver := GetGCEDriver() mounter := mountmanager.NewFakeSafeBlockingMounter(readyToExecute) - nodeServer := NewNodeServer(gceDriver, mounter, deviceutils.NewFakeDeviceUtils(false), metadataservice.NewFakeService(), mountmanager.NewFakeStatter(mounter), NodeServerArgs{true, 0}).WithSerializedFormatAndMount(5*time.Second, 1) + nodeServer := NewNodeServer(gceDriver, mounter, deviceutils.NewFakeDeviceUtils(false), metadataservice.NewFakeService(), mountmanager.NewFakeStatter(mounter), nil, NodeServerArgs{true, 0}).WithSerializedFormatAndMount(5*time.Second, 1) err := gceDriver.SetupGCEDriver(driver, "test-vendor", nil, nil, nil, nil, nodeServer) if err != nil { diff --git a/test/sanity/sanity_test.go b/test/sanity/sanity_test.go index 1ff33a41b..c7f1290c5 100644 --- a/test/sanity/sanity_test.go +++ b/test/sanity/sanity_test.go @@ -77,7 +77,7 @@ func TestSanity(t *testing.T) { identityServer := driver.NewIdentityServer(gceDriver) controllerServer := driver.NewControllerServer(gceDriver, cloudProvider, 0, 5*time.Minute, fallbackRequisiteZones, enableStoragePools, multiZoneVolumeHandleConfig, listVolumesConfig, provisionableDisksConfig, true) fakeStatter := mountmanager.NewFakeStatterWithOptions(mounter, mountmanager.FakeStatterOptions{IsBlock: false}) - nodeServer := driver.NewNodeServer(gceDriver, mounter, deviceUtils, metadataservice.NewFakeService(), fakeStatter, driver.NodeServerArgs{EnableDeviceInUseCheck: true, DeviceInUseTimeout: 0}) + nodeServer := 
driver.NewNodeServer(gceDriver, mounter, deviceUtils, metadataservice.NewFakeService(), fakeStatter, nil, driver.NodeServerArgs{EnableDeviceInUseCheck: true, DeviceInUseTimeout: 0}) err = gceDriver.SetupGCEDriver(driverName, vendorVersion, extraLabels, nil, identityServer, controllerServer, nodeServer) if err != nil { t.Fatalf("Failed to initialize GCE CSI Driver: %v", err.Error())