Skip to content

Commit 97eda0d

Browse files
committed
We need to ensure that we are not reading stale PV objects
So as we don't end up resizing same object more than once
1 parent d017b1f commit 97eda0d

File tree

2 files changed

+69
-30
lines changed

2 files changed

+69
-30
lines changed

pkg/controller/controller.go

+63-24
Original file line numberDiff line numberDiff line change
@@ -55,15 +55,18 @@ type resizeController struct {
5555
kubeClient kubernetes.Interface
5656
claimQueue workqueue.RateLimitingInterface
5757
eventRecorder record.EventRecorder
58-
pvLister corelisters.PersistentVolumeLister
5958
pvSynced cache.InformerSynced
60-
pvcLister corelisters.PersistentVolumeClaimLister
6159
pvcSynced cache.InformerSynced
6260

6361
usedPVCs *inUsePVCStore
6462

65-
podLister corelisters.PodLister
66-
podListerSynced cache.InformerSynced
63+
podLister corelisters.PodLister
64+
podListerSynced cache.InformerSynced
65+
66+
// a Cache to store PersistentVolume objects
67+
volumes cache.Store
68+
// a cache to store PersistentVolumeClaim objects
69+
claims cache.Store
6770
handleVolumeInUseError bool
6871
}
6972

@@ -87,15 +90,17 @@ func NewResizeController(
8790
claimQueue := workqueue.NewNamedRateLimitingQueue(
8891
pvcRateLimiter, fmt.Sprintf("%s-pvc", name))
8992

93+
pvcInformer.Informer().GetStore().Get("foo")
94+
9095
ctrl := &resizeController{
9196
name: name,
9297
resizer: resizer,
9398
kubeClient: kubeClient,
94-
pvLister: pvInformer.Lister(),
9599
pvSynced: pvInformer.Informer().HasSynced,
96-
pvcLister: pvcInformer.Lister(),
97100
pvcSynced: pvcInformer.Informer().HasSynced,
98101
claimQueue: claimQueue,
102+
volumes: pvInformer.Informer().GetStore(),
103+
claims: pvcInformer.Informer().GetStore(),
99104
eventRecorder: eventRecorder,
100105
usedPVCs: newUsedPVCStore(),
101106
handleVolumeInUseError: handleVolumeInUseError,
@@ -282,22 +287,30 @@ func (ctrl *resizeController) syncPVC(key string) error {
282287
return err
283288
}
284289

285-
pvc, err := ctrl.pvcLister.PersistentVolumeClaims(namespace).Get(name)
290+
pvcObject, exists, err := ctrl.claims.GetByKey(key)
286291
if err != nil {
287-
if k8serrors.IsNotFound(err) {
288-
klog.V(3).Infof("PVC %s/%s is deleted, no need to process it", namespace, name)
289-
return nil
290-
}
292+
291293
klog.Errorf("Get PVC %s/%s failed: %v", namespace, name, err)
292294
return err
293295
}
294296

297+
if !exists {
298+
klog.V(3).Infof("PVC %s/%s is deleted or does not exist", namespace, name)
299+
return nil
300+
}
301+
302+
pvc, ok := pvcObject.(*v1.PersistentVolumeClaim)
303+
if !ok {
304+
klog.Errorf("expected PVC got: %v", pvcObject)
305+
return fmt.Errorf("expected PVC got: %v", pvcObject)
306+
}
307+
295308
if !ctrl.pvcNeedResize(pvc) {
296309
klog.V(4).Infof("No need to resize PVC %q", util.PVCKey(pvc))
297310
return nil
298311
}
299312

300-
pv, err := ctrl.pvLister.Get(pvc.Spec.VolumeName)
313+
volumeObj, exists, err := ctrl.volumes.GetByKey(pvc.Spec.VolumeName)
301314
if err != nil {
302315
if k8serrors.IsNotFound(err) {
303316
klog.V(3).Infof("PV %s is deleted, no need to process it", pvc.Spec.VolumeName)
@@ -307,6 +320,16 @@ func (ctrl *resizeController) syncPVC(key string) error {
307320
return err
308321
}
309322

323+
if !exists {
324+
return fmt.Errorf("PV %q bound to PVC %s not found", pvc.Spec.VolumeName, util.PVCKey(pvc))
325+
}
326+
327+
pv, ok := volumeObj.(*v1.PersistentVolume)
328+
if !ok {
329+
klog.Errorf("expected volume but got %+v", volumeObj)
330+
return fmt.Errorf("expected volume but got %+v", volumeObj)
331+
}
332+
310333
if !ctrl.pvNeedResize(pvc, pv) {
311334
klog.V(4).Infof("No need to resize PV %q", pv.Name)
312335
return nil
@@ -375,6 +398,12 @@ func (ctrl *resizeController) resizePVC(pvc *v1.PersistentVolumeClaim, pv *v1.Pe
375398
pvc = updatedPVC
376399
}
377400

401+
err := ctrl.claims.Update(pvc)
402+
if err != nil {
403+
klog.Errorf("error updating PVC %s in cache: %v", util.PVCKey(pvc), err)
404+
return err
405+
}
406+
378407
// if pvc previously failed to expand because it can't be expanded when in-use
379408
// we must not try expansion here
380409
if ctrl.usedPVCs.hasInUseErrors(pvc) && ctrl.usedPVCs.checkForUse(pvc) {
@@ -388,10 +417,10 @@ func (ctrl *resizeController) resizePVC(pvc *v1.PersistentVolumeClaim, pv *v1.Pe
388417
ctrl.eventRecorder.Event(pvc, v1.EventTypeNormal, util.VolumeResizing,
389418
fmt.Sprintf("External resizer is resizing volume %s", pv.Name))
390419

391-
err := func() error {
420+
updatedPVC, err := func() (*v1.PersistentVolumeClaim, error) {
392421
newSize, fsResizeRequired, err := ctrl.resizeVolume(pvc, pv)
393422
if err != nil {
394-
return err
423+
return pvc, err
395424
}
396425

397426
if fsResizeRequired {
@@ -407,7 +436,7 @@ func (ctrl *resizeController) resizePVC(pvc *v1.PersistentVolumeClaim, pv *v1.Pe
407436
ctrl.eventRecorder.Eventf(pvc, v1.EventTypeWarning, util.VolumeResizeFailed, err.Error())
408437
}
409438

410-
return err
439+
return ctrl.claims.Update(updatedPVC)
411440
}
412441

413442
// resizeVolume resize the volume to request size, and update PV's capacity if succeeded.
@@ -435,11 +464,17 @@ func (ctrl *resizeController) resizeVolume(
435464
return newSize, fsResizeRequired, fmt.Errorf("resize volume %s failed: %v", pv.Name, err)
436465
}
437466
klog.V(4).Infof("Resize volume succeeded for volume %q, start to update PV's capacity", pv.Name)
467+
updatedPV, err := util.UpdatePVCapacity(pv, newSize, ctrl.kubeClient)
438468

439-
if err := util.UpdatePVCapacity(pv, newSize, ctrl.kubeClient); err != nil {
469+
if err != nil {
440470
klog.Errorf("Update capacity of PV %q to %s failed: %v", pv.Name, newSize.String(), err)
441471
return newSize, fsResizeRequired, err
442472
}
473+
err = ctrl.volumes.Update(updatedPV)
474+
if err != nil {
475+
klog.Errorf("error updating PV %s: %v", updatedPV.Name, err)
476+
return newSize, fsResizeRequired, err
477+
}
443478
klog.V(4).Infof("Update capacity of PV %q to %s succeeded", pv.Name, newSize.String())
444479

445480
return newSize, fsResizeRequired, nil
@@ -460,22 +495,23 @@ func (ctrl *resizeController) markPVCResizeInProgress(pvc *v1.PersistentVolumeCl
460495

461496
func (ctrl *resizeController) markPVCResizeFinished(
462497
pvc *v1.PersistentVolumeClaim,
463-
newSize resource.Quantity) error {
498+
newSize resource.Quantity) (*v1.PersistentVolumeClaim, error) {
464499
newPVC := pvc.DeepCopy()
465500
newPVC.Status.Capacity[v1.ResourceStorage] = newSize
466501
newPVC.Status.Conditions = util.MergeResizeConditionsOfPVC(pvc.Status.Conditions, []v1.PersistentVolumeClaimCondition{})
467-
if _, err := util.PatchPVCStatus(pvc, newPVC, ctrl.kubeClient); err != nil {
502+
updatedPVC, err := util.PatchPVCStatus(pvc, newPVC, ctrl.kubeClient)
503+
if err != nil {
468504
klog.Errorf("Mark PVC %q as resize finished failed: %v", util.PVCKey(pvc), err)
469-
return err
505+
return nil, err
470506
}
471507

472508
klog.V(4).Infof("Resize PVC %q finished", util.PVCKey(pvc))
473509
ctrl.eventRecorder.Eventf(pvc, v1.EventTypeNormal, util.VolumeResizeSuccess, "Resize volume succeeded")
474510

475-
return nil
511+
return updatedPVC, nil
476512
}
477513

478-
func (ctrl *resizeController) markPVCAsFSResizeRequired(pvc *v1.PersistentVolumeClaim) error {
514+
func (ctrl *resizeController) markPVCAsFSResizeRequired(pvc *v1.PersistentVolumeClaim) (*v1.PersistentVolumeClaim, error) {
479515
pvcCondition := v1.PersistentVolumeClaimCondition{
480516
Type: v1.PersistentVolumeClaimFileSystemResizePending,
481517
Status: v1.ConditionTrue,
@@ -486,15 +522,18 @@ func (ctrl *resizeController) markPVCAsFSResizeRequired(pvc *v1.PersistentVolume
486522
newPVC.Status.Conditions = util.MergeResizeConditionsOfPVC(newPVC.Status.Conditions,
487523
[]v1.PersistentVolumeClaimCondition{pvcCondition})
488524

489-
if _, err := util.PatchPVCStatus(pvc, newPVC, ctrl.kubeClient); err != nil {
525+
updatedPVC, err := util.PatchPVCStatus(pvc, newPVC, ctrl.kubeClient)
526+
527+
if err != nil {
490528
klog.Errorf("Mark PVC %q as file system resize required failed: %v", util.PVCKey(pvc), err)
491-
return err
529+
return nil, err
492530
}
531+
493532
klog.V(4).Infof("Mark PVC %q as file system resize required", util.PVCKey(pvc))
494533
ctrl.eventRecorder.Eventf(pvc, v1.EventTypeNormal,
495534
util.FileSystemResizeRequired, "Require file system resize of volume on node")
496535

497-
return nil
536+
return updatedPVC, nil
498537
}
499538

500539
func parsePod(obj interface{}) *v1.Pod {

pkg/util/util.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import (
2222
"fmt"
2323
"regexp"
2424

25-
"k8s.io/api/core/v1"
25+
v1 "k8s.io/api/core/v1"
2626
"k8s.io/apimachinery/pkg/api/meta"
2727
"k8s.io/apimachinery/pkg/api/resource"
2828
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -124,18 +124,18 @@ func addResourceVersion(patchBytes []byte, resourceVersion string) ([]byte, erro
124124
}
125125

126126
// UpdatePVCapacity updates PVC capacity with requested size.
127-
func UpdatePVCapacity(pv *v1.PersistentVolume, newCapacity resource.Quantity, kubeClient kubernetes.Interface) error {
127+
func UpdatePVCapacity(pv *v1.PersistentVolume, newCapacity resource.Quantity, kubeClient kubernetes.Interface) (*v1.PersistentVolume, error) {
128128
newPV := pv.DeepCopy()
129129
newPV.Spec.Capacity[v1.ResourceStorage] = newCapacity
130130
patchBytes, err := getPatchData(pv, newPV)
131131
if err != nil {
132-
return fmt.Errorf("can't update capacity of PV %s as generate path data failed: %v", pv.Name, err)
132+
return nil, fmt.Errorf("can't update capacity of PV %s as generate path data failed: %v", pv.Name, err)
133133
}
134-
_, updateErr := kubeClient.CoreV1().PersistentVolumes().Patch(context.TODO(), pv.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
134+
updatedPV, updateErr := kubeClient.CoreV1().PersistentVolumes().Patch(context.TODO(), pv.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
135135
if updateErr != nil {
136-
return fmt.Errorf("update capacity of PV %s failed: %v", pv.Name, updateErr)
136+
return nil, fmt.Errorf("update capacity of PV %s failed: %v", pv.Name, updateErr)
137137
}
138-
return nil
138+
return updatedPV, nil
139139
}
140140

141141
func getPatchData(oldObj, newObj interface{}) ([]byte, error) {

0 commit comments

Comments
 (0)