Skip to content

Commit 12c4ccb

Browse files
committed
[WIP] GKE-MT support for PDCSI
[WIP] Refactor token source code into the token_source.go file. Cleaning up a few logs.
1 parent 624f654 commit 12c4ccb

File tree

448 files changed

+36903
-36
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

448 files changed

+36903
-36
lines changed

cmd/gce-pd-csi-driver/main.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ var (
7676
enableStoragePoolsFlag = flag.Bool("enable-storage-pools", false, "If set to true, the CSI Driver will allow volumes to be provisioned in Storage Pools")
7777
enableHdHAFlag = flag.Bool("allow-hdha-provisioning", false, "If set to true, will allow the driver to provision Hyperdisk-balanced High Availability disks")
7878
enableDataCacheFlag = flag.Bool("enable-data-cache", false, "If set to true, the CSI Driver will allow volumes to be provisioned with Data Cache configuration")
79+
enableMultitenancyFlag = flag.Bool("enable-multitenancy", false, "If set to true, the CSI Driver will support running on multitenant GKE clusters")
7980
nodeName = flag.String("node-name", "", "The node this driver is running on")
8081

8182
multiZoneVolumeHandleDiskTypesFlag = flag.String("multi-zone-volume-handle-disk-types", "", "Comma separated list of allowed disk types that can use the multi-zone volumeHandle. Used only if --multi-zone-volume-handle-enable")
@@ -221,10 +222,15 @@ func handle() {
221222
// Initialize requirements for the controller service
222223
var controllerServer *driver.GCEControllerServer
223224
if *runControllerService {
224-
cloudProvider, err := gce.CreateCloudProvider(ctx, version, *cloudConfigFilePath, computeEndpoint, computeEnvironment, waitForAttachConfig, listInstancesConfig)
225+
cloudProvider, err := gce.CreateCloudProvider(ctx, version, *cloudConfigFilePath, computeEndpoint, computeEnvironment, waitForAttachConfig, listInstancesConfig, *enableMultitenancyFlag)
225226
if err != nil {
226227
klog.Fatalf("Failed to get cloud provider: %v", err.Error())
227228
}
229+
if *enableMultitenancyFlag {
230+
ctx, cancel := context.WithCancel(ctx)
231+
defer cancel()
232+
go cloudProvider.TenantInformer.Run(ctx.Done())
233+
}
228234
initialBackoffDuration := time.Duration(*errorBackoffInitialDurationMs) * time.Millisecond
229235
maxBackoffDuration := time.Duration(*errorBackoffMaxDurationMs) * time.Millisecond
230236
controllerServer = driver.NewControllerServer(gceDriver, cloudProvider, initialBackoffDuration, maxBackoffDuration, fallbackRequisiteZones, *enableStoragePoolsFlag, *enableDataCacheFlag, multiZoneVolumeHandleConfig, listVolumesConfig, provisionableDisksConfig, *enableHdHAFlag)

deploy/kubernetes/base/controller/controller.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,7 @@ spec:
145145
- "--supports-dynamic-iops-provisioning=hyperdisk-balanced,hyperdisk-extreme"
146146
- "--supports-dynamic-throughput-provisioning=hyperdisk-balanced,hyperdisk-throughput,hyperdisk-ml"
147147
- --enable-data-cache
148+
- --enable-multitenancy
148149
command:
149150
- /gce-pd-csi-driver
150151
env:

deploy/kubernetes/base/node_linux/node.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ spec:
4747
- "--endpoint=unix:/csi/csi.sock"
4848
- "--run-controller-service=false"
4949
- "--enable-data-cache"
50+
- "--enable-multitenancy"
5051
- "--node-name=$(KUBE_NODE_NAME)"
5152
securityContext:
5253
privileged: true

deploy/kubernetes/images/stable-master/image.yaml

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,8 @@ metadata:
4949
name: imagetag-gcepd-driver
5050
imageTag:
5151
name: gke.gcr.io/gcp-compute-persistent-disk-csi-driver
52-
# pdImagePlaceholder in test/k8s-integration/main.go is updated automatically with the newTag
53-
newName: registry.k8s.io/cloud-provider-gcp/gcp-compute-persistent-disk-csi-driver
54-
newTag: "v1.17.2"
52+
# Don't change stable image without changing pdImagePlaceholder in
53+
# test/k8s-integration/main.go
54+
newName: us-central1-docker.pkg.dev/enginakdemir-gke-dev/csi-dev/gcp-compute-persistent-disk-csi-driver
55+
newTag: "latest"
5556
---

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ require (
4141
k8s.io/mount-utils v0.32.3
4242
k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738
4343
sigs.k8s.io/boskos v0.0.0-20220711194915-6cb8a6fb2dd1
44+
sigs.k8s.io/yaml v1.4.0
4445
)
4546

4647
require (
@@ -116,7 +117,6 @@ require (
116117
k8s.io/test-infra v0.0.0-20210730160938-8ad9b8c53bd8 // indirect
117118
sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3 // indirect
118119
sigs.k8s.io/structured-merge-diff/v4 v4.6.0 // indirect
119-
sigs.k8s.io/yaml v1.4.0 // indirect
120120
)
121121

122122
replace k8s.io/client-go => k8s.io/client-go v0.32.2

pkg/gce-cloud-provider/compute/fake-gce.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -378,7 +378,7 @@ func (cloud *FakeCloudProvider) InsertInstance(instance *computev1.Instance, ins
378378
return
379379
}
380380

381-
func (cloud *FakeCloudProvider) GetInstanceOrError(ctx context.Context, instanceZone, instanceName string) (*computev1.Instance, error) {
381+
func (cloud *FakeCloudProvider) GetInstanceOrError(ctx context.Context, project, instanceZone, instanceName string) (*computev1.Instance, error) {
382382
instance, ok := cloud.instances[instanceName]
383383
if !ok {
384384
return nil, notFoundError()

pkg/gce-cloud-provider/compute/gce-compute.go

Lines changed: 70 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ type GCECompute interface {
117117
// Regional Disk Methods
118118
GetReplicaZoneURI(project string, zone string) string
119119
// Instance Methods
120-
GetInstanceOrError(ctx context.Context, instanceZone, instanceName string) (*computev1.Instance, error)
120+
GetInstanceOrError(ctx context.Context, project, instanceZone, instanceName string) (*computev1.Instance, error)
121121
// Zone Methods
122122
ListZones(ctx context.Context, region string) ([]string, error)
123123
ListSnapshots(ctx context.Context, filter string) ([]*computev1.Snapshot, string, error)
@@ -160,40 +160,67 @@ func (cloud *CloudProvider) listDisksInternal(ctx context.Context, fields []goog
160160
if err != nil {
161161
return nil, "", err
162162
}
163-
items := []*computev1.Disk{}
163+
disks := []*computev1.Disk{}
164164

165-
// listing out regional disks in the region
166-
rlCall := cloud.service.RegionDisks.List(cloud.project, region)
165+
// listing out regional disks in the region for each project
166+
for p, s := range cloud.tenantServiceMap {
167+
klog.Infof("Getting regional disks for project: %s", p)
168+
rDisks, err := listRegionalDisksForProject(s, p, region, fields, filter)
169+
if err != nil {
170+
return nil, "", err
171+
}
172+
disks = append(disks, rDisks...)
173+
}
174+
175+
// listing out zonal disks in all zones of the region for each project
176+
for p, s := range cloud.tenantServiceMap {
177+
klog.Infof("Getting zonal disks for project: %s", p)
178+
zDisks, err := listZonalDisksForProject(s, p, zones, fields, filter)
179+
if err != nil {
180+
return nil, "", err
181+
}
182+
disks = append(disks, zDisks...)
183+
}
184+
185+
return disks, "", nil
186+
}
187+
188+
// listRegionalDisksForProject returns every regional disk that the given
// compute service reports for project in region.
//
// fields and filter are applied to the RegionDisks.List call. The call is
// re-issued until the response carries an empty NextPageToken, and the Items of
// every page are accumulated into a single slice. The first failed call aborts
// the listing: the disks collected so far are dropped and (nil, err) is
// returned. When no disks exist the result is an empty, non-nil slice.
func listRegionalDisksForProject(service *computev1.Service, project string, region string, fields []googleapi.Field, filter string) ([]*computev1.Disk, error) {
189+
items := []*computev1.Disk{}
190+
rlCall := service.RegionDisks.List(project, region)
167191
rlCall.Fields(fields...)
168192
rlCall.Filter(filter)
169193
// Non-empty placeholder so the loop body runs at least once. It is never
// sent as a page token: PageToken is only set after the first Do() returns.
nextPageToken := "pageToken"
170194
for nextPageToken != "" {
171195
rDiskList, err := rlCall.Do()
172196
if err != nil {
173-
return nil, "", err
197+
return nil, err
174198
}
175199
items = append(items, rDiskList.Items...)
176200
// Carry the server-provided token into the next request; an empty token
// ends the loop.
nextPageToken = rDiskList.NextPageToken
177201
rlCall.PageToken(nextPageToken)
178202
}
203+
return items, nil
204+
}
179205

180-
// listing out zonal disks in all zones of the region
206+
// listZonalDisksForProject returns every zonal disk that the given compute
// service reports for project across all of the supplied zones.
//
// For each zone a separate Disks.List call is built with fields and filter
// applied, and that call is re-issued until the response carries an empty
// NextPageToken. Results from all zones are appended to one slice in the order
// the zones are given. A failure in any zone aborts the whole listing: the
// disks collected so far are dropped and (nil, err) is returned. With no zones
// or no disks the result is an empty, non-nil slice.
func listZonalDisksForProject(service *computev1.Service, project string, zones []string, fields []googleapi.Field, filter string) ([]*computev1.Disk, error) {
207+
items := []*computev1.Disk{}
181208
for _, zone := range zones {
182-
lCall := cloud.service.Disks.List(cloud.project, zone)
209+
lCall := service.Disks.List(project, zone)
183210
lCall.Fields(fields...)
184211
lCall.Filter(filter)
185212
// Non-empty placeholder so the paging loop runs at least once per zone. It
// is never sent: PageToken is only set after the first Do() returns.
nextPageToken := "pageToken"
186213
for nextPageToken != "" {
187214
diskList, err := lCall.Do()
188215
if err != nil {
189-
return nil, "", err
216+
return nil, err
190217
}
191218
items = append(items, diskList.Items...)
192219
// An empty NextPageToken marks the last page for this zone.
nextPageToken = diskList.NextPageToken
193220
lCall.PageToken(nextPageToken)
194221
}
195222
}
196-
return items, "", nil
223+
return items, nil
197224
}
198225

199226
// ListInstances lists instances based on maxEntries and pageToken for the project and region
@@ -210,8 +237,22 @@ func (cloud *CloudProvider) ListInstances(ctx context.Context, fields []googleap
210237
}
211238
items := []*computev1.Instance{}
212239

240+
for p, s := range cloud.tenantServiceMap {
241+
instances, err := cloud.listInstancesForProject(s, p, zones, fields)
242+
if err != nil {
243+
return nil, "", err
244+
}
245+
items = append(items, instances...)
246+
}
247+
248+
return items, "", nil
249+
}
250+
251+
func (cloud *CloudProvider) listInstancesForProject(service *computev1.Service, project string, zones []string, fields []googleapi.Field) ([]*computev1.Instance, error) {
252+
items := []*computev1.Instance{}
253+
213254
for _, zone := range zones {
214-
lCall := cloud.service.Instances.List(cloud.project, zone)
255+
lCall := service.Instances.List(project, zone)
215256
for _, filter := range cloud.listInstancesConfig.Filters {
216257
lCall = lCall.Filter(filter)
217258
}
@@ -220,15 +261,14 @@ func (cloud *CloudProvider) ListInstances(ctx context.Context, fields []googleap
220261
for nextPageToken != "" {
221262
instancesList, err := lCall.Do()
222263
if err != nil {
223-
return nil, "", err
264+
return nil, err
224265
}
225266
items = append(items, instancesList.Items...)
226267
nextPageToken = instancesList.NextPageToken
227268
lCall.PageToken(nextPageToken)
228269
}
229270
}
230-
231-
return items, "", nil
271+
return items, nil
232272
}
233273

234274
// RepairUnderspecifiedVolumeKey will query the cloud provider and check each zone for the disk specified
@@ -857,7 +897,11 @@ func (cloud *CloudProvider) AttachDisk(ctx context.Context, project string, volK
857897
ForceAttach: forceAttach,
858898
}
859899

860-
op, err := cloud.service.Instances.AttachDisk(project, instanceZone, instanceName, attachedDiskV1).Context(ctx).ForceAttach(forceAttach).Do()
900+
service := cloud.service
901+
if _, ok := cloud.tenantServiceMap[project]; ok {
902+
service = cloud.tenantServiceMap[project]
903+
}
904+
op, err := service.Instances.AttachDisk(project, instanceZone, instanceName, attachedDiskV1).Context(ctx).ForceAttach(forceAttach).Do()
861905
if err != nil {
862906
return fmt.Errorf("failed cloud service attach disk call: %w", err)
863907
}
@@ -872,7 +916,11 @@ func (cloud *CloudProvider) AttachDisk(ctx context.Context, project string, volK
872916

873917
func (cloud *CloudProvider) DetachDisk(ctx context.Context, project, deviceName, instanceZone, instanceName string) error {
874918
klog.V(5).Infof("Detaching disk %v from %v", deviceName, instanceName)
875-
op, err := cloud.service.Instances.DetachDisk(project, instanceZone, instanceName, deviceName).Context(ctx).Do()
919+
service := cloud.service
920+
if _, ok := cloud.tenantServiceMap[project]; ok {
921+
service = cloud.tenantServiceMap[project]
922+
}
923+
op, err := service.Instances.DetachDisk(project, instanceZone, instanceName, deviceName).Context(ctx).Do()
876924
if err != nil {
877925
return err
878926
}
@@ -1041,7 +1089,7 @@ func (cloud *CloudProvider) waitForAttachOnInstance(ctx context.Context, project
10411089
start := time.Now()
10421090
return wait.ExponentialBackoff(AttachDiskBackoff, func() (bool, error) {
10431091
klog.V(6).Infof("Polling instances.get for attach of disk %v to instance %v to complete for %v", volKey.Name, instanceName, time.Since(start))
1044-
instance, err := cloud.GetInstanceOrError(ctx, instanceZone, instanceName)
1092+
instance, err := cloud.GetInstanceOrError(ctx, project, instanceZone, instanceName)
10451093
if err != nil {
10461094
return false, fmt.Errorf("GetInstance failed to get instance: %w", err)
10471095
}
@@ -1145,10 +1193,13 @@ func opIsDone(op *computev1.Operation) (bool, error) {
11451193
return true, nil
11461194
}
11471195

1148-
func (cloud *CloudProvider) GetInstanceOrError(ctx context.Context, instanceZone, instanceName string) (*computev1.Instance, error) {
1196+
func (cloud *CloudProvider) GetInstanceOrError(ctx context.Context, project, instanceZone, instanceName string) (*computev1.Instance, error) {
11491197
klog.V(5).Infof("Getting instance %v from zone %v", instanceName, instanceZone)
1150-
project := cloud.project
1151-
instance, err := cloud.service.Instances.Get(project, instanceZone, instanceName).Do()
1198+
service := cloud.service
1199+
if _, ok := cloud.tenantServiceMap[project]; ok {
1200+
service = cloud.tenantServiceMap[project]
1201+
}
1202+
instance, err := service.Instances.Get(project, instanceZone, instanceName).Do()
11521203
if err != nil {
11531204
return nil, err
11541205
}

pkg/gce-cloud-provider/compute/gce.go

Lines changed: 74 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,16 @@ import (
2222
"net/url"
2323
"os"
2424
"runtime"
25+
"sync"
2526
"time"
2627

2728
"golang.org/x/oauth2/google"
2829
"golang.org/x/time/rate"
2930
"google.golang.org/api/option"
3031
"gopkg.in/gcfg.v1"
32+
"k8s.io/client-go/tools/cache"
3133
"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
34+
"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/compute/tenancy"
3235

3336
"cloud.google.com/go/compute/metadata"
3437
rscmgr "cloud.google.com/go/resourcemanager/apiv3"
@@ -81,7 +84,8 @@ var (
8184
// snapshotsType is the resource type of compute snapshots.
8285
snapshotsType ResourceType = "snapshots"
8386
// imagesType is the resource type of compute images.
84-
imagesType ResourceType = "images"
87+
imagesType ResourceType = "images"
88+
tenantServiceMutex sync.Mutex
8589
)
8690

8791
// CloudProvider only supports GCE v1/beta Disk APIs. See
@@ -102,6 +106,10 @@ type CloudProvider struct {
102106

103107
listInstancesConfig ListInstancesConfig
104108

109+
TenantInformer tenancy.TenantsInformer
110+
// tenantServiceMap maintains Compute Services for the default project as well as any tenant projects for any tenant-aware GCE operations
111+
tenantServiceMap map[string]*compute.Service
112+
105113
enableHdHA bool
106114
}
107115

@@ -133,7 +141,7 @@ type ConfigGlobal struct {
133141
Zone string `gcfg:"zone"`
134142
}
135143

136-
func CreateCloudProvider(ctx context.Context, vendorVersion string, configPath string, computeEndpoint *url.URL, computeEnvironment Environment, waitForAttachConfig WaitForAttachConfig, listInstancesConfig ListInstancesConfig) (*CloudProvider, error) {
144+
func CreateCloudProvider(ctx context.Context, vendorVersion string, configPath string, computeEndpoint *url.URL, computeEnvironment Environment, waitForAttachConfig WaitForAttachConfig, listInstancesConfig ListInstancesConfig, multiTenancyEnabled bool) (*CloudProvider, error) {
137145
configFile, err := readConfig(configPath)
138146
if err != nil {
139147
return nil, err
@@ -162,10 +170,10 @@ func CreateCloudProvider(ctx context.Context, vendorVersion string, configPath s
162170

163171
project, zone, err := getProjectAndZone(configFile)
164172
if err != nil {
165-
return nil, fmt.Errorf("Failed getting Project and Zone: %w", err)
173+
return nil, fmt.Errorf("failed getting Project and Zone: %w", err)
166174
}
167175

168-
return &CloudProvider{
176+
cp := &CloudProvider{
169177
service: svc,
170178
betaService: betasvc,
171179
tokenSource: tokenSource,
@@ -177,8 +185,69 @@ func CreateCloudProvider(ctx context.Context, vendorVersion string, configPath s
177185
// GCP has a rate limit of 600 requests per minute, restricting
178186
// here to 8 requests per second.
179187
tagsRateLimiter: common.NewLimiter(gcpTagsRequestRateLimit, gcpTagsRequestTokenBucketSize, true),
180-
}, nil
188+
}
189+
190+
if multiTenancyEnabled {
191+
klog.Info("Setting up multitenancy")
192+
// Set up an informer for the tenant CR to automatically create tenant-specific clients with tenant identities
193+
ti, err := tenancy.NewTenantsInformer(multiTenancyEnabled)
194+
if err != nil {
195+
return nil, fmt.Errorf("failed initializing tenant informer: %w", err)
196+
}
197+
cp.TenantInformer = ti
198+
cp.tenantServiceMap = map[string]*compute.Service{}
199+
cp.TenantInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
200+
AddFunc: func(obj any) {
201+
// Handle tenant creation
202+
klog.Infof("Tenant %s created", obj)
203+
204+
tenantMeta, err := tenancy.GetMetadataFromTenantCR(obj)
205+
if err != nil {
206+
klog.Errorf("error while extracting tenant metadata: %v", err)
207+
}
208+
209+
tenantServiceMutex.Lock()
210+
defer tenantServiceMutex.Unlock()
211+
212+
if _, ok := cp.tenantServiceMap[tenantMeta.ProjectNumber]; ok {
213+
klog.Infof("tenant GCE client already exists, skipping GCE client instantiation for tenant(%s) with project number(%s)", tenantMeta.TenantName, tenantMeta.ProjectNumber)
214+
return
215+
}
216+
217+
region, err := common.GetRegionFromZones([]string{zone})
218+
if err != nil {
219+
klog.Errorf("error getting region from zone(%s): %v", zone, err)
220+
return
221+
}
222+
tokenSource, err := NewTenantTokenSource(tenantMeta, region, configFile.Global.TokenURL, configFile.Global.TokenBody)
223+
if err != nil {
224+
klog.Errorf("error during tenant token generation: %v", err.Error())
225+
}
226+
klog.Infof("Compute endpoint in add func: %s", computeEndpoint)
227+
klog.Infof("Compute environment in add func: %s", computeEnvironment)
228+
229+
svc, err := createCloudService(ctx, vendorVersion, tokenSource, computeEndpoint, computeEnvironment)
230+
if err != nil {
231+
klog.Errorf("error while creating compute service with tenant identity: %v", err)
232+
return
233+
}
234+
cp.tenantServiceMap[tenantMeta.ProjectNumber] = svc
235+
},
236+
UpdateFunc: func(oldObj, newObj any) {},
237+
DeleteFunc: func(obj any) {
238+
klog.Infof("Tenant %s deleted", obj)
239+
tenantMeta, err := tenancy.GetMetadataFromTenantCR(obj)
240+
if err != nil {
241+
klog.Errorf("error while extracting teantn metadata: %v", err)
242+
}
243+
tenantServiceMutex.Lock()
244+
defer tenantServiceMutex.Unlock()
245+
delete(cp.tenantServiceMap, tenantMeta.ProjectNumber)
246+
},
247+
})
248+
}
181249

250+
return cp, nil
182251
}
183252

184253
func generateTokenSource(ctx context.Context, configFile *ConfigFile) (oauth2.TokenSource, error) {

0 commit comments

Comments
 (0)