Skip to content

Commit 0325630

Browse files
committed
We need to ensure that we are not reading stale PV objects
So as we don't end up resizing same object more than once
1 parent d017b1f commit 0325630

File tree

2 files changed

+78
-45
lines changed

2 files changed

+78
-45
lines changed

pkg/controller/controller.go

Lines changed: 72 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ import (
2626
"google.golang.org/grpc/codes"
2727
"google.golang.org/grpc/status"
2828
v1 "k8s.io/api/core/v1"
29-
k8serrors "k8s.io/apimachinery/pkg/api/errors"
3029
"k8s.io/apimachinery/pkg/api/resource"
3130
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3231
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@@ -55,15 +54,18 @@ type resizeController struct {
5554
kubeClient kubernetes.Interface
5655
claimQueue workqueue.RateLimitingInterface
5756
eventRecorder record.EventRecorder
58-
pvLister corelisters.PersistentVolumeLister
5957
pvSynced cache.InformerSynced
60-
pvcLister corelisters.PersistentVolumeClaimLister
6158
pvcSynced cache.InformerSynced
6259

6360
usedPVCs *inUsePVCStore
6461

65-
podLister corelisters.PodLister
66-
podListerSynced cache.InformerSynced
62+
podLister corelisters.PodLister
63+
podListerSynced cache.InformerSynced
64+
65+
// a cache to store PersistentVolume objects
66+
volumes cache.Store
67+
// a cache to store PersistentVolumeClaim objects
68+
claims cache.Store
6769
handleVolumeInUseError bool
6870
}
6971

@@ -91,11 +93,11 @@ func NewResizeController(
9193
name: name,
9294
resizer: resizer,
9395
kubeClient: kubeClient,
94-
pvLister: pvInformer.Lister(),
9596
pvSynced: pvInformer.Informer().HasSynced,
96-
pvcLister: pvcInformer.Lister(),
9797
pvcSynced: pvcInformer.Informer().HasSynced,
9898
claimQueue: claimQueue,
99+
volumes: pvInformer.Informer().GetStore(),
100+
claims: pvcInformer.Informer().GetStore(),
99101
eventRecorder: eventRecorder,
100102
usedPVCs: newUsedPVCStore(),
101103
handleVolumeInUseError: handleVolumeInUseError,
@@ -266,6 +268,7 @@ func (ctrl *resizeController) syncPVCs() {
266268

267269
if err := ctrl.syncPVC(key.(string)); err != nil {
268270
// Put PVC back to the queue so that we can retry later.
271+
klog.Errorf("Error syncing PVC: %v", err)
269272
ctrl.claimQueue.AddRateLimited(key)
270273
} else {
271274
ctrl.claimQueue.Forget(key)
@@ -278,33 +281,42 @@ func (ctrl *resizeController) syncPVC(key string) error {
278281

279282
namespace, name, err := cache.SplitMetaNamespaceKey(key)
280283
if err != nil {
281-
klog.Errorf("Split meta namespace key of pvc %s failed: %v", key, err)
282-
return err
284+
return fmt.Errorf("getting namespace and name from key %s failed: %v", key, err)
283285
}
284286

285-
pvc, err := ctrl.pvcLister.PersistentVolumeClaims(namespace).Get(name)
287+
pvcObject, exists, err := ctrl.claims.GetByKey(key)
286288
if err != nil {
287-
if k8serrors.IsNotFound(err) {
288-
klog.V(3).Infof("PVC %s/%s is deleted, no need to process it", namespace, name)
289-
return nil
290-
}
291-
klog.Errorf("Get PVC %s/%s failed: %v", namespace, name, err)
292-
return err
289+
return fmt.Errorf("getting PVC %s/%s failed: %v", namespace, name, err)
290+
}
291+
292+
if !exists {
293+
klog.V(3).Infof("PVC %s/%s is deleted or does not exist", namespace, name)
294+
return nil
295+
}
296+
297+
pvc, ok := pvcObject.(*v1.PersistentVolumeClaim)
298+
if !ok {
299+
return fmt.Errorf("expected PVC got: %v", pvcObject)
293300
}
294301

295302
if !ctrl.pvcNeedResize(pvc) {
296303
klog.V(4).Infof("No need to resize PVC %q", util.PVCKey(pvc))
297304
return nil
298305
}
299306

300-
pv, err := ctrl.pvLister.Get(pvc.Spec.VolumeName)
307+
volumeObj, exists, err := ctrl.volumes.GetByKey(pvc.Spec.VolumeName)
301308
if err != nil {
302-
if k8serrors.IsNotFound(err) {
303-
klog.V(3).Infof("PV %s is deleted, no need to process it", pvc.Spec.VolumeName)
304-
return nil
305-
}
306-
klog.Errorf("Get PV %q of pvc %q failed: %v", pvc.Spec.VolumeName, util.PVCKey(pvc), err)
307-
return err
309+
return fmt.Errorf("Get PV %q of pvc %q failed: %v", pvc.Spec.VolumeName, util.PVCKey(pvc), err)
310+
}
311+
312+
if !exists {
313+
klog.Warningf("PV %q bound to PVC %s not found", pvc.Spec.VolumeName, util.PVCKey(pvc))
314+
return nil
315+
}
316+
317+
pv, ok := volumeObj.(*v1.PersistentVolume)
318+
if !ok {
319+
return fmt.Errorf("expected volume but got %+v", volumeObj)
308320
}
309321

310322
if !ctrl.pvNeedResize(pvc, pv) {
@@ -369,8 +381,7 @@ func (ctrl *resizeController) pvNeedResize(pvc *v1.PersistentVolumeClaim, pv *v1
369381
// 3. Mark pvc as resizing finished(no error, no need to resize fs), need resizing fs or resize failed.
370382
func (ctrl *resizeController) resizePVC(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) error {
371383
if updatedPVC, err := ctrl.markPVCResizeInProgress(pvc); err != nil {
372-
klog.Errorf("Mark pvc %q as resizing failed: %v", util.PVCKey(pvc), err)
373-
return err
384+
return fmt.Errorf("marking pvc %q as resizing failed: %v", util.PVCKey(pvc), err)
374385
} else if updatedPVC != nil {
375386
pvc = updatedPVC
376387
}
@@ -406,8 +417,7 @@ func (ctrl *resizeController) resizePVC(pvc *v1.PersistentVolumeClaim, pv *v1.Pe
406417
// Record an event to indicate that resize operation is failed.
407418
ctrl.eventRecorder.Eventf(pvc, v1.EventTypeWarning, util.VolumeResizeFailed, err.Error())
408419
}
409-
410-
return err
420+
return nil
411421
}
412422

413423
// resizeVolume resize the volume to request size, and update PV's capacity if succeeded.
@@ -426,19 +436,22 @@ func (ctrl *resizeController) resizeVolume(
426436
newSize, fsResizeRequired, err := ctrl.resizer.Resize(pv, requestSize)
427437

428438
if err != nil {
429-
klog.Errorf("Resize volume %q by resizer %q failed: %v", pv.Name, ctrl.name, err)
430439
// if this error was a in-use error then it must be tracked so as we don't retry without
431440
// first verifying if volume is in-use
432441
if inUseError(err) {
433442
ctrl.usedPVCs.addPVCWithInUseError(pvc)
434443
}
435-
return newSize, fsResizeRequired, fmt.Errorf("resize volume %s failed: %v", pv.Name, err)
444+
return newSize, fsResizeRequired, fmt.Errorf("Resize volume %q by resizer %q failed: %v", pv.Name, ctrl.name, err)
436445
}
437446
klog.V(4).Infof("Resize volume succeeded for volume %q, start to update PV's capacity", pv.Name)
447+
updatedPV, err := util.UpdatePVCapacity(pv, newSize, ctrl.kubeClient)
438448

439-
if err := util.UpdatePVCapacity(pv, newSize, ctrl.kubeClient); err != nil {
440-
klog.Errorf("Update capacity of PV %q to %s failed: %v", pv.Name, newSize.String(), err)
441-
return newSize, fsResizeRequired, err
449+
if err != nil {
450+
return newSize, fsResizeRequired, fmt.Errorf("Update capacity of PV %q to %s failed: %v", pv.Name, newSize.String(), err)
451+
}
452+
err = ctrl.volumes.Update(updatedPV)
453+
if err != nil {
454+
return newSize, fsResizeRequired, fmt.Errorf("error updating PV %s: %v", updatedPV.Name, err)
442455
}
443456
klog.V(4).Infof("Update capacity of PV %q to %s succeeded", pv.Name, newSize.String())
444457

@@ -455,7 +468,15 @@ func (ctrl *resizeController) markPVCResizeInProgress(pvc *v1.PersistentVolumeCl
455468
newPVC := pvc.DeepCopy()
456469
newPVC.Status.Conditions = util.MergeResizeConditionsOfPVC(newPVC.Status.Conditions,
457470
[]v1.PersistentVolumeClaimCondition{progressCondition})
458-
return util.PatchPVCStatus(pvc, newPVC, ctrl.kubeClient)
471+
updatedPVC, err := util.PatchPVCStatus(pvc, newPVC, ctrl.kubeClient)
472+
if err != nil {
473+
return nil, err
474+
}
475+
err = ctrl.claims.Update(updatedPVC)
476+
if err != nil {
477+
return nil, fmt.Errorf("error updating PVC %s in local cache: %v", util.PVCKey(newPVC), err)
478+
}
479+
return updatedPVC, nil
459480
}
460481

461482
func (ctrl *resizeController) markPVCResizeFinished(
@@ -464,9 +485,14 @@ func (ctrl *resizeController) markPVCResizeFinished(
464485
newPVC := pvc.DeepCopy()
465486
newPVC.Status.Capacity[v1.ResourceStorage] = newSize
466487
newPVC.Status.Conditions = util.MergeResizeConditionsOfPVC(pvc.Status.Conditions, []v1.PersistentVolumeClaimCondition{})
467-
if _, err := util.PatchPVCStatus(pvc, newPVC, ctrl.kubeClient); err != nil {
468-
klog.Errorf("Mark PVC %q as resize finished failed: %v", util.PVCKey(pvc), err)
469-
return err
488+
updatedPVC, err := util.PatchPVCStatus(pvc, newPVC, ctrl.kubeClient)
489+
if err != nil {
490+
return fmt.Errorf("Mark PVC %q as resize finished failed: %v", util.PVCKey(pvc), err)
491+
}
492+
493+
err = ctrl.claims.Update(updatedPVC)
494+
if err != nil {
495+
return fmt.Errorf("error updating PVC %s in local cache: %v", util.PVCKey(newPVC), err)
470496
}
471497

472498
klog.V(4).Infof("Resize PVC %q finished", util.PVCKey(pvc))
@@ -486,10 +512,17 @@ func (ctrl *resizeController) markPVCAsFSResizeRequired(pvc *v1.PersistentVolume
486512
newPVC.Status.Conditions = util.MergeResizeConditionsOfPVC(newPVC.Status.Conditions,
487513
[]v1.PersistentVolumeClaimCondition{pvcCondition})
488514

489-
if _, err := util.PatchPVCStatus(pvc, newPVC, ctrl.kubeClient); err != nil {
490-
klog.Errorf("Mark PVC %q as file system resize required failed: %v", util.PVCKey(pvc), err)
491-
return err
515+
updatedPVC, err := util.PatchPVCStatus(pvc, newPVC, ctrl.kubeClient)
516+
517+
if err != nil {
518+
return fmt.Errorf("Mark PVC %q as file system resize required failed: %v", util.PVCKey(pvc), err)
492519
}
520+
521+
err = ctrl.claims.Update(updatedPVC)
522+
if err != nil {
523+
return fmt.Errorf("error updating pvc %s in local cache: %v", util.PVCKey(updatedPVC), err)
524+
}
525+
493526
klog.V(4).Infof("Mark PVC %q as file system resize required", util.PVCKey(pvc))
494527
ctrl.eventRecorder.Eventf(pvc, v1.EventTypeNormal,
495528
util.FileSystemResizeRequired, "Require file system resize of volume on node")

pkg/util/util.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import (
2222
"fmt"
2323
"regexp"
2424

25-
"k8s.io/api/core/v1"
25+
v1 "k8s.io/api/core/v1"
2626
"k8s.io/apimachinery/pkg/api/meta"
2727
"k8s.io/apimachinery/pkg/api/resource"
2828
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -124,18 +124,18 @@ func addResourceVersion(patchBytes []byte, resourceVersion string) ([]byte, erro
124124
}
125125

126126
// UpdatePVCapacity updates PVC capacity with requested size.
127-
func UpdatePVCapacity(pv *v1.PersistentVolume, newCapacity resource.Quantity, kubeClient kubernetes.Interface) error {
127+
func UpdatePVCapacity(pv *v1.PersistentVolume, newCapacity resource.Quantity, kubeClient kubernetes.Interface) (*v1.PersistentVolume, error) {
128128
newPV := pv.DeepCopy()
129129
newPV.Spec.Capacity[v1.ResourceStorage] = newCapacity
130130
patchBytes, err := getPatchData(pv, newPV)
131131
if err != nil {
132-
return fmt.Errorf("can't update capacity of PV %s as generate path data failed: %v", pv.Name, err)
132+
return nil, fmt.Errorf("can't update capacity of PV %s as generate path data failed: %v", pv.Name, err)
133133
}
134-
_, updateErr := kubeClient.CoreV1().PersistentVolumes().Patch(context.TODO(), pv.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
134+
updatedPV, updateErr := kubeClient.CoreV1().PersistentVolumes().Patch(context.TODO(), pv.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
135135
if updateErr != nil {
136-
return fmt.Errorf("update capacity of PV %s failed: %v", pv.Name, updateErr)
136+
return nil, fmt.Errorf("update capacity of PV %s failed: %v", pv.Name, updateErr)
137137
}
138-
return nil
138+
return updatedPV, nil
139139
}
140140

141141
func getPatchData(oldObj, newObj interface{}) ([]byte, error) {

0 commit comments

Comments
 (0)