Skip to content

Commit e1ec0ac

Browse files
committed
We need to ensure that we are not reading stale PV objects
So that we don't end up resizing the same object more than once
1 parent d017b1f commit e1ec0ac

File tree

4 files changed

+183
-43
lines changed

4 files changed

+183
-43
lines changed

pkg/controller/controller.go

Lines changed: 80 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ import (
2626
"google.golang.org/grpc/codes"
2727
"google.golang.org/grpc/status"
2828
v1 "k8s.io/api/core/v1"
29-
k8serrors "k8s.io/apimachinery/pkg/api/errors"
3029
"k8s.io/apimachinery/pkg/api/resource"
3130
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3231
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@@ -55,15 +54,18 @@ type resizeController struct {
5554
kubeClient kubernetes.Interface
5655
claimQueue workqueue.RateLimitingInterface
5756
eventRecorder record.EventRecorder
58-
pvLister corelisters.PersistentVolumeLister
5957
pvSynced cache.InformerSynced
60-
pvcLister corelisters.PersistentVolumeClaimLister
6158
pvcSynced cache.InformerSynced
6259

6360
usedPVCs *inUsePVCStore
6461

65-
podLister corelisters.PodLister
66-
podListerSynced cache.InformerSynced
62+
podLister corelisters.PodLister
63+
podListerSynced cache.InformerSynced
64+
65+
// a cache to store PersistentVolume objects
66+
volumes cache.Store
67+
// a cache to store PersistentVolumeClaim objects
68+
claims cache.Store
6769
handleVolumeInUseError bool
6870
}
6971

@@ -91,11 +93,11 @@ func NewResizeController(
9193
name: name,
9294
resizer: resizer,
9395
kubeClient: kubeClient,
94-
pvLister: pvInformer.Lister(),
9596
pvSynced: pvInformer.Informer().HasSynced,
96-
pvcLister: pvcInformer.Lister(),
9797
pvcSynced: pvcInformer.Informer().HasSynced,
9898
claimQueue: claimQueue,
99+
volumes: pvInformer.Informer().GetStore(),
100+
claims: pvcInformer.Informer().GetStore(),
99101
eventRecorder: eventRecorder,
100102
usedPVCs: newUsedPVCStore(),
101103
handleVolumeInUseError: handleVolumeInUseError,
@@ -266,6 +268,7 @@ func (ctrl *resizeController) syncPVCs() {
266268

267269
if err := ctrl.syncPVC(key.(string)); err != nil {
268270
// Put PVC back to the queue so that we can retry later.
271+
klog.Errorf("Error syncing PVC: %v", err)
269272
ctrl.claimQueue.AddRateLimited(key)
270273
} else {
271274
ctrl.claimQueue.Forget(key)
@@ -278,33 +281,42 @@ func (ctrl *resizeController) syncPVC(key string) error {
278281

279282
namespace, name, err := cache.SplitMetaNamespaceKey(key)
280283
if err != nil {
281-
klog.Errorf("Split meta namespace key of pvc %s failed: %v", key, err)
282-
return err
284+
return fmt.Errorf("getting namespace and name from key %s failed: %v", key, err)
283285
}
284286

285-
pvc, err := ctrl.pvcLister.PersistentVolumeClaims(namespace).Get(name)
287+
pvcObject, exists, err := ctrl.claims.GetByKey(key)
286288
if err != nil {
287-
if k8serrors.IsNotFound(err) {
288-
klog.V(3).Infof("PVC %s/%s is deleted, no need to process it", namespace, name)
289-
return nil
290-
}
291-
klog.Errorf("Get PVC %s/%s failed: %v", namespace, name, err)
292-
return err
289+
return fmt.Errorf("getting PVC %s/%s failed: %v", namespace, name, err)
290+
}
291+
292+
if !exists {
293+
klog.V(3).Infof("PVC %s/%s is deleted or does not exist", namespace, name)
294+
return nil
295+
}
296+
297+
pvc, ok := pvcObject.(*v1.PersistentVolumeClaim)
298+
if !ok {
299+
return fmt.Errorf("expected PVC got: %v", pvcObject)
293300
}
294301

295302
if !ctrl.pvcNeedResize(pvc) {
296303
klog.V(4).Infof("No need to resize PVC %q", util.PVCKey(pvc))
297304
return nil
298305
}
299306

300-
pv, err := ctrl.pvLister.Get(pvc.Spec.VolumeName)
307+
volumeObj, exists, err := ctrl.volumes.GetByKey(pvc.Spec.VolumeName)
301308
if err != nil {
302-
if k8serrors.IsNotFound(err) {
303-
klog.V(3).Infof("PV %s is deleted, no need to process it", pvc.Spec.VolumeName)
304-
return nil
305-
}
306-
klog.Errorf("Get PV %q of pvc %q failed: %v", pvc.Spec.VolumeName, util.PVCKey(pvc), err)
307-
return err
309+
return fmt.Errorf("Get PV %q of pvc %q failed: %v", pvc.Spec.VolumeName, util.PVCKey(pvc), err)
310+
}
311+
312+
if !exists {
313+
klog.Warningf("PV %q bound to PVC %s not found", pvc.Spec.VolumeName, util.PVCKey(pvc))
314+
return nil
315+
}
316+
317+
pv, ok := volumeObj.(*v1.PersistentVolume)
318+
if !ok {
319+
return fmt.Errorf("expected volume but got %+v", volumeObj)
308320
}
309321

310322
if !ctrl.pvNeedResize(pvc, pv) {
@@ -369,8 +381,7 @@ func (ctrl *resizeController) pvNeedResize(pvc *v1.PersistentVolumeClaim, pv *v1
369381
// 3. Mark pvc as resizing finished(no error, no need to resize fs), need resizing fs or resize failed.
370382
func (ctrl *resizeController) resizePVC(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) error {
371383
if updatedPVC, err := ctrl.markPVCResizeInProgress(pvc); err != nil {
372-
klog.Errorf("Mark pvc %q as resizing failed: %v", util.PVCKey(pvc), err)
373-
return err
384+
return fmt.Errorf("marking pvc %q as resizing failed: %v", util.PVCKey(pvc), err)
374385
} else if updatedPVC != nil {
375386
pvc = updatedPVC
376387
}
@@ -406,7 +417,6 @@ func (ctrl *resizeController) resizePVC(pvc *v1.PersistentVolumeClaim, pv *v1.Pe
406417
// Record an event to indicate that resize operation is failed.
407418
ctrl.eventRecorder.Eventf(pvc, v1.EventTypeWarning, util.VolumeResizeFailed, err.Error())
408419
}
409-
410420
return err
411421
}
412422

@@ -426,18 +436,17 @@ func (ctrl *resizeController) resizeVolume(
426436
newSize, fsResizeRequired, err := ctrl.resizer.Resize(pv, requestSize)
427437

428438
if err != nil {
429-
klog.Errorf("Resize volume %q by resizer %q failed: %v", pv.Name, ctrl.name, err)
430439
// if this error was an in-use error then it must be tracked so that we don't retry without
431440
// first verifying if volume is in-use
432441
if inUseError(err) {
433442
ctrl.usedPVCs.addPVCWithInUseError(pvc)
434443
}
435-
return newSize, fsResizeRequired, fmt.Errorf("resize volume %s failed: %v", pv.Name, err)
444+
return newSize, fsResizeRequired, fmt.Errorf("resize volume %q by resizer %q failed: %v", pv.Name, ctrl.name, err)
436445
}
437446
klog.V(4).Infof("Resize volume succeeded for volume %q, start to update PV's capacity", pv.Name)
438447

439-
if err := util.UpdatePVCapacity(pv, newSize, ctrl.kubeClient); err != nil {
440-
klog.Errorf("Update capacity of PV %q to %s failed: %v", pv.Name, newSize.String(), err)
448+
err = ctrl.updatePVCapacity(pv, newSize)
449+
if err != nil {
441450
return newSize, fsResizeRequired, err
442451
}
443452
klog.V(4).Infof("Update capacity of PV %q to %s succeeded", pv.Name, newSize.String())
@@ -455,7 +464,15 @@ func (ctrl *resizeController) markPVCResizeInProgress(pvc *v1.PersistentVolumeCl
455464
newPVC := pvc.DeepCopy()
456465
newPVC.Status.Conditions = util.MergeResizeConditionsOfPVC(newPVC.Status.Conditions,
457466
[]v1.PersistentVolumeClaimCondition{progressCondition})
458-
return util.PatchPVCStatus(pvc, newPVC, ctrl.kubeClient)
467+
updatedPVC, err := util.PatchPVCStatus(pvc, newPVC, ctrl.kubeClient)
468+
if err != nil {
469+
return nil, err
470+
}
471+
err = ctrl.claims.Update(updatedPVC)
472+
if err != nil {
473+
return nil, fmt.Errorf("error updating PVC %s in local cache: %v", util.PVCKey(newPVC), err)
474+
}
475+
return updatedPVC, nil
459476
}
460477

461478
func (ctrl *resizeController) markPVCResizeFinished(
@@ -464,9 +481,14 @@ func (ctrl *resizeController) markPVCResizeFinished(
464481
newPVC := pvc.DeepCopy()
465482
newPVC.Status.Capacity[v1.ResourceStorage] = newSize
466483
newPVC.Status.Conditions = util.MergeResizeConditionsOfPVC(pvc.Status.Conditions, []v1.PersistentVolumeClaimCondition{})
467-
if _, err := util.PatchPVCStatus(pvc, newPVC, ctrl.kubeClient); err != nil {
468-
klog.Errorf("Mark PVC %q as resize finished failed: %v", util.PVCKey(pvc), err)
469-
return err
484+
updatedPVC, err := util.PatchPVCStatus(pvc, newPVC, ctrl.kubeClient)
485+
if err != nil {
486+
return fmt.Errorf("Mark PVC %q as resize finished failed: %v", util.PVCKey(pvc), err)
487+
}
488+
489+
err = ctrl.claims.Update(updatedPVC)
490+
if err != nil {
491+
return fmt.Errorf("error updating PVC %s in local cache: %v", util.PVCKey(newPVC), err)
470492
}
471493

472494
klog.V(4).Infof("Resize PVC %q finished", util.PVCKey(pvc))
@@ -475,6 +497,20 @@ func (ctrl *resizeController) markPVCResizeFinished(
475497
return nil
476498
}
477499

500+
func (ctrl *resizeController) updatePVCapacity(pv *v1.PersistentVolume, newCapacity resource.Quantity) error {
501+
klog.V(4).Infof("Resize volume succeeded for volume %q, start to update PV's capacity", pv.Name)
502+
updatedPV, err := util.UpdatePVCapacity(pv, newCapacity, ctrl.kubeClient)
503+
504+
if err != nil {
505+
return fmt.Errorf("updating capacity of PV %q to %s failed: %v", pv.Name, newCapacity.String(), err)
506+
}
507+
err = ctrl.volumes.Update(updatedPV)
508+
if err != nil {
509+
return fmt.Errorf("error updating PV %s: %v", updatedPV.Name, err)
510+
}
511+
return nil
512+
}
513+
478514
func (ctrl *resizeController) markPVCAsFSResizeRequired(pvc *v1.PersistentVolumeClaim) error {
479515
pvcCondition := v1.PersistentVolumeClaimCondition{
480516
Type: v1.PersistentVolumeClaimFileSystemResizePending,
@@ -486,10 +522,17 @@ func (ctrl *resizeController) markPVCAsFSResizeRequired(pvc *v1.PersistentVolume
486522
newPVC.Status.Conditions = util.MergeResizeConditionsOfPVC(newPVC.Status.Conditions,
487523
[]v1.PersistentVolumeClaimCondition{pvcCondition})
488524

489-
if _, err := util.PatchPVCStatus(pvc, newPVC, ctrl.kubeClient); err != nil {
490-
klog.Errorf("Mark PVC %q as file system resize required failed: %v", util.PVCKey(pvc), err)
491-
return err
525+
updatedPVC, err := util.PatchPVCStatus(pvc, newPVC, ctrl.kubeClient)
526+
527+
if err != nil {
528+
return fmt.Errorf("Mark PVC %q as file system resize required failed: %v", util.PVCKey(pvc), err)
492529
}
530+
531+
err = ctrl.claims.Update(updatedPVC)
532+
if err != nil {
533+
return fmt.Errorf("error updating pvc %s in local cache: %v", util.PVCKey(updatedPVC), err)
534+
}
535+
493536
klog.V(4).Infof("Mark PVC %q as file system resize required", util.PVCKey(pvc))
494537
ctrl.eventRecorder.Eventf(pvc, v1.EventTypeNormal,
495538
util.FileSystemResizeRequired, "Require file system resize of volume on node")

pkg/controller/controller_test.go

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,93 @@ func TestController(t *testing.T) {
277277
}
278278
}
279279

280+
func TestResizePVC(t *testing.T) {
281+
fsVolumeMode := v1.PersistentVolumeFilesystem
282+
283+
for _, test := range []struct {
284+
Name string
285+
PVC *v1.PersistentVolumeClaim
286+
PV *v1.PersistentVolume
287+
288+
NodeResize bool
289+
expansionFailure bool
290+
expectFailure bool
291+
}{
292+
{
293+
Name: "Resize PVC with FS resize",
294+
PVC: createPVC(2, 1),
295+
PV: createPV(1, "testPVC", defaultNS, "foobar", &fsVolumeMode),
296+
NodeResize: true,
297+
},
298+
{
299+
Name: "Resize PVC with FS resize failure",
300+
PVC: createPVC(2, 1),
301+
PV: createPV(1, "testPVC", defaultNS, "foobar", &fsVolumeMode),
302+
NodeResize: true,
303+
expansionFailure: true,
304+
expectFailure: true,
305+
},
306+
} {
307+
client := csi.NewMockClient("mock", test.NodeResize, true, true)
308+
if test.expansionFailure {
309+
client.SetExpansionFailed()
310+
}
311+
driverName, _ := client.GetDriverName(context.TODO())
312+
313+
initialObjects := []runtime.Object{}
314+
if test.PVC != nil {
315+
initialObjects = append(initialObjects, test.PVC)
316+
}
317+
if test.PV != nil {
318+
test.PV.Spec.PersistentVolumeSource.CSI.Driver = driverName
319+
initialObjects = append(initialObjects, test.PV)
320+
}
321+
322+
kubeClient, informerFactory := fakeK8s(initialObjects)
323+
pvInformer := informerFactory.Core().V1().PersistentVolumes()
324+
pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims()
325+
podInformer := informerFactory.Core().V1().Pods()
326+
327+
metricsManager := metrics.NewCSIMetricsManager("" /* driverName */)
328+
metricsAddress := ""
329+
metricsPath := ""
330+
csiResizer, err := resizer.NewResizerFromClient(client, 15*time.Second, kubeClient, informerFactory, metricsManager, metricsAddress, metricsPath)
331+
if err != nil {
332+
t.Fatalf("Test %s: Unable to create resizer: %v", test.Name, err)
333+
}
334+
335+
controller := NewResizeController(driverName, csiResizer, kubeClient, time.Second, informerFactory, workqueue.DefaultControllerRateLimiter(), true /* disableVolumeInUseErrorHandler*/)
336+
337+
ctrlInstance, _ := controller.(*resizeController)
338+
339+
stopCh := make(chan struct{})
340+
informerFactory.Start(stopCh)
341+
342+
for _, obj := range initialObjects {
343+
switch obj.(type) {
344+
case *v1.PersistentVolume:
345+
pvInformer.Informer().GetStore().Add(obj)
346+
case *v1.PersistentVolumeClaim:
347+
pvcInformer.Informer().GetStore().Add(obj)
348+
case *v1.Pod:
349+
podInformer.Informer().GetStore().Add(obj)
350+
default:
351+
t.Fatalf("Test %s: Unknown initalObject type: %+v", test.Name, obj)
352+
}
353+
}
354+
355+
err = ctrlInstance.resizePVC(test.PVC, test.PV)
356+
if test.expectFailure && err == nil {
357+
t.Errorf("for %s expected error got nothing", test.Name)
358+
continue
359+
}
360+
if !test.expectFailure && err != nil {
361+
t.Errorf("for %s, unexpected error: %v", test.Name, err)
362+
}
363+
364+
}
365+
}
366+
280367
func invalidPVC() *v1.PersistentVolumeClaim {
281368
pvc := createPVC(1, 1)
282369
pvc.ObjectMeta.Name = ""

pkg/csi/mock_client.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package csi
22

33
import (
44
"context"
5+
"fmt"
56

67
"github.com/container-storage-interface/spec/lib/go/csi"
78
)
@@ -26,6 +27,7 @@ type MockClient struct {
2627
supportsControllerResize bool
2728
supportsPluginControllerService bool
2829
expandCalled int
30+
expansionFailed bool
2931
usedSecrets map[string]string
3032
usedCapability *csi.VolumeCapability
3133
}
@@ -46,13 +48,21 @@ func (c *MockClient) SupportsNodeResize(context.Context) (bool, error) {
4648
return c.supportsNodeResize, nil
4749
}
4850

51+
func (c *MockClient) SetExpansionFailed() {
52+
c.expansionFailed = true
53+
}
54+
4955
func (c *MockClient) Expand(
5056
ctx context.Context,
5157
volumeID string,
5258
requestBytes int64,
5359
secrets map[string]string,
5460
capability *csi.VolumeCapability) (int64, bool, error) {
5561
// TODO: Determine whether the operation succeeds or fails by parameters.
62+
if c.expansionFailed {
63+
c.expandCalled++
64+
return requestBytes, c.supportsNodeResize, fmt.Errorf("expansion failed")
65+
}
5666
c.expandCalled++
5767
c.usedSecrets = secrets
5868
c.usedCapability = capability

pkg/util/util.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import (
2222
"fmt"
2323
"regexp"
2424

25-
"k8s.io/api/core/v1"
25+
v1 "k8s.io/api/core/v1"
2626
"k8s.io/apimachinery/pkg/api/meta"
2727
"k8s.io/apimachinery/pkg/api/resource"
2828
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -124,18 +124,18 @@ func addResourceVersion(patchBytes []byte, resourceVersion string) ([]byte, erro
124124
}
125125

126126
// UpdatePVCapacity updates PV capacity with requested size.
127-
func UpdatePVCapacity(pv *v1.PersistentVolume, newCapacity resource.Quantity, kubeClient kubernetes.Interface) error {
127+
func UpdatePVCapacity(pv *v1.PersistentVolume, newCapacity resource.Quantity, kubeClient kubernetes.Interface) (*v1.PersistentVolume, error) {
128128
newPV := pv.DeepCopy()
129129
newPV.Spec.Capacity[v1.ResourceStorage] = newCapacity
130130
patchBytes, err := getPatchData(pv, newPV)
131131
if err != nil {
132-
return fmt.Errorf("can't update capacity of PV %s as generate path data failed: %v", pv.Name, err)
132+
return nil, fmt.Errorf("can't update capacity of PV %s as generate path data failed: %v", pv.Name, err)
133133
}
134-
_, updateErr := kubeClient.CoreV1().PersistentVolumes().Patch(context.TODO(), pv.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
134+
updatedPV, updateErr := kubeClient.CoreV1().PersistentVolumes().Patch(context.TODO(), pv.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
135135
if updateErr != nil {
136-
return fmt.Errorf("update capacity of PV %s failed: %v", pv.Name, updateErr)
136+
return nil, fmt.Errorf("update capacity of PV %s failed: %v", pv.Name, updateErr)
137137
}
138-
return nil
138+
return updatedPV, nil
139139
}
140140

141141
func getPatchData(oldObj, newObj interface{}) ([]byte, error) {

0 commit comments

Comments
 (0)