Skip to content

Commit ce0524f

Browse files
committed
We need to ensure that we are not reading stale PV objects
So as we don't end up resizing same object more than once
1 parent d017b1f commit ce0524f

File tree

2 files changed

+69
-41
lines changed

2 files changed

+69
-41
lines changed

pkg/controller/controller.go

+63-35
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ import (
2626
"google.golang.org/grpc/codes"
2727
"google.golang.org/grpc/status"
2828
v1 "k8s.io/api/core/v1"
29-
k8serrors "k8s.io/apimachinery/pkg/api/errors"
3029
"k8s.io/apimachinery/pkg/api/resource"
3130
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3231
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@@ -55,15 +54,18 @@ type resizeController struct {
5554
kubeClient kubernetes.Interface
5655
claimQueue workqueue.RateLimitingInterface
5756
eventRecorder record.EventRecorder
58-
pvLister corelisters.PersistentVolumeLister
5957
pvSynced cache.InformerSynced
60-
pvcLister corelisters.PersistentVolumeClaimLister
6158
pvcSynced cache.InformerSynced
6259

6360
usedPVCs *inUsePVCStore
6461

65-
podLister corelisters.PodLister
66-
podListerSynced cache.InformerSynced
62+
podLister corelisters.PodLister
63+
podListerSynced cache.InformerSynced
64+
65+
// a cache to store PersistentVolume objects
66+
volumes cache.Store
67+
// a cache to store PersistentVolumeClaim objects
68+
claims cache.Store
6769
handleVolumeInUseError bool
6870
}
6971

@@ -91,11 +93,11 @@ func NewResizeController(
9193
name: name,
9294
resizer: resizer,
9395
kubeClient: kubeClient,
94-
pvLister: pvInformer.Lister(),
9596
pvSynced: pvInformer.Informer().HasSynced,
96-
pvcLister: pvcInformer.Lister(),
9797
pvcSynced: pvcInformer.Informer().HasSynced,
9898
claimQueue: claimQueue,
99+
volumes: pvInformer.Informer().GetStore(),
100+
claims: pvcInformer.Informer().GetStore(),
99101
eventRecorder: eventRecorder,
100102
usedPVCs: newUsedPVCStore(),
101103
handleVolumeInUseError: handleVolumeInUseError,
@@ -266,6 +268,7 @@ func (ctrl *resizeController) syncPVCs() {
266268

267269
if err := ctrl.syncPVC(key.(string)); err != nil {
268270
// Put PVC back to the queue so that we can retry later.
271+
klog.Errorf("Error syncing PVC: %v", err)
269272
ctrl.claimQueue.AddRateLimited(key)
270273
} else {
271274
ctrl.claimQueue.Forget(key)
@@ -278,33 +281,42 @@ func (ctrl *resizeController) syncPVC(key string) error {
278281

279282
namespace, name, err := cache.SplitMetaNamespaceKey(key)
280283
if err != nil {
281-
klog.Errorf("Split meta namespace key of pvc %s failed: %v", key, err)
282-
return err
284+
return fmt.Errorf("getting namespace and name from key %s failed: %v", key, err)
283285
}
284286

285-
pvc, err := ctrl.pvcLister.PersistentVolumeClaims(namespace).Get(name)
287+
pvcObject, exists, err := ctrl.claims.GetByKey(key)
286288
if err != nil {
287-
if k8serrors.IsNotFound(err) {
288-
klog.V(3).Infof("PVC %s/%s is deleted, no need to process it", namespace, name)
289-
return nil
290-
}
291-
klog.Errorf("Get PVC %s/%s failed: %v", namespace, name, err)
292-
return err
289+
return fmt.Errorf("getting PVC %s/%s failed: %v", namespace, name, err)
290+
}
291+
292+
if !exists {
293+
klog.V(3).Infof("PVC %s/%s is deleted or does not exist", namespace, name)
294+
return nil
295+
}
296+
297+
pvc, ok := pvcObject.(*v1.PersistentVolumeClaim)
298+
if !ok {
299+
return fmt.Errorf("expected PVC got: %v", pvcObject)
293300
}
294301

295302
if !ctrl.pvcNeedResize(pvc) {
296303
klog.V(4).Infof("No need to resize PVC %q", util.PVCKey(pvc))
297304
return nil
298305
}
299306

300-
pv, err := ctrl.pvLister.Get(pvc.Spec.VolumeName)
307+
volumeObj, exists, err := ctrl.volumes.GetByKey(pvc.Spec.VolumeName)
301308
if err != nil {
302-
if k8serrors.IsNotFound(err) {
303-
klog.V(3).Infof("PV %s is deleted, no need to process it", pvc.Spec.VolumeName)
304-
return nil
305-
}
306-
klog.Errorf("Get PV %q of pvc %q failed: %v", pvc.Spec.VolumeName, util.PVCKey(pvc), err)
307-
return err
309+
return fmt.Errorf("Get PV %q of pvc %q failed: %v", pvc.Spec.VolumeName, util.PVCKey(pvc), err)
310+
}
311+
312+
if !exists {
313+
klog.Warningf("PV %q bound to PVC %s not found", pvc.Spec.VolumeName, util.PVCKey(pvc))
314+
return nil
315+
}
316+
317+
pv, ok := volumeObj.(*v1.PersistentVolume)
318+
if !ok {
319+
return fmt.Errorf("expected volume but got %+v", volumeObj)
308320
}
309321

310322
if !ctrl.pvNeedResize(pvc, pv) {
@@ -375,6 +387,12 @@ func (ctrl *resizeController) resizePVC(pvc *v1.PersistentVolumeClaim, pv *v1.Pe
375387
pvc = updatedPVC
376388
}
377389

390+
err := ctrl.claims.Update(pvc)
391+
if err != nil {
392+
klog.Errorf("error updating PVC %s in cache: %v", util.PVCKey(pvc), err)
393+
return err
394+
}
395+
378396
// if pvc previously failed to expand because it can't be expanded when in-use
379397
// we must not try expansion here
380398
if ctrl.usedPVCs.hasInUseErrors(pvc) && ctrl.usedPVCs.checkForUse(pvc) {
@@ -388,10 +406,10 @@ func (ctrl *resizeController) resizePVC(pvc *v1.PersistentVolumeClaim, pv *v1.Pe
388406
ctrl.eventRecorder.Event(pvc, v1.EventTypeNormal, util.VolumeResizing,
389407
fmt.Sprintf("External resizer is resizing volume %s", pv.Name))
390408

391-
err := func() error {
409+
updatedPVC, err := func() (*v1.PersistentVolumeClaim, error) {
392410
newSize, fsResizeRequired, err := ctrl.resizeVolume(pvc, pv)
393411
if err != nil {
394-
return err
412+
return pvc, err
395413
}
396414

397415
if fsResizeRequired {
@@ -407,7 +425,7 @@ func (ctrl *resizeController) resizePVC(pvc *v1.PersistentVolumeClaim, pv *v1.Pe
407425
ctrl.eventRecorder.Eventf(pvc, v1.EventTypeWarning, util.VolumeResizeFailed, err.Error())
408426
}
409427

410-
return err
428+
return ctrl.claims.Update(updatedPVC)
411429
}
412430

413431
// resizeVolume resize the volume to request size, and update PV's capacity if succeeded.
@@ -435,11 +453,17 @@ func (ctrl *resizeController) resizeVolume(
435453
return newSize, fsResizeRequired, fmt.Errorf("resize volume %s failed: %v", pv.Name, err)
436454
}
437455
klog.V(4).Infof("Resize volume succeeded for volume %q, start to update PV's capacity", pv.Name)
456+
updatedPV, err := util.UpdatePVCapacity(pv, newSize, ctrl.kubeClient)
438457

439-
if err := util.UpdatePVCapacity(pv, newSize, ctrl.kubeClient); err != nil {
458+
if err != nil {
440459
klog.Errorf("Update capacity of PV %q to %s failed: %v", pv.Name, newSize.String(), err)
441460
return newSize, fsResizeRequired, err
442461
}
462+
err = ctrl.volumes.Update(updatedPV)
463+
if err != nil {
464+
klog.Errorf("error updating PV %s: %v", updatedPV.Name, err)
465+
return newSize, fsResizeRequired, err
466+
}
443467
klog.V(4).Infof("Update capacity of PV %q to %s succeeded", pv.Name, newSize.String())
444468

445469
return newSize, fsResizeRequired, nil
@@ -460,22 +484,23 @@ func (ctrl *resizeController) markPVCResizeInProgress(pvc *v1.PersistentVolumeCl
460484

461485
func (ctrl *resizeController) markPVCResizeFinished(
462486
pvc *v1.PersistentVolumeClaim,
463-
newSize resource.Quantity) error {
487+
newSize resource.Quantity) (*v1.PersistentVolumeClaim, error) {
464488
newPVC := pvc.DeepCopy()
465489
newPVC.Status.Capacity[v1.ResourceStorage] = newSize
466490
newPVC.Status.Conditions = util.MergeResizeConditionsOfPVC(pvc.Status.Conditions, []v1.PersistentVolumeClaimCondition{})
467-
if _, err := util.PatchPVCStatus(pvc, newPVC, ctrl.kubeClient); err != nil {
491+
updatedPVC, err := util.PatchPVCStatus(pvc, newPVC, ctrl.kubeClient)
492+
if err != nil {
468493
klog.Errorf("Mark PVC %q as resize finished failed: %v", util.PVCKey(pvc), err)
469-
return err
494+
return nil, err
470495
}
471496

472497
klog.V(4).Infof("Resize PVC %q finished", util.PVCKey(pvc))
473498
ctrl.eventRecorder.Eventf(pvc, v1.EventTypeNormal, util.VolumeResizeSuccess, "Resize volume succeeded")
474499

475-
return nil
500+
return updatedPVC, nil
476501
}
477502

478-
func (ctrl *resizeController) markPVCAsFSResizeRequired(pvc *v1.PersistentVolumeClaim) error {
503+
func (ctrl *resizeController) markPVCAsFSResizeRequired(pvc *v1.PersistentVolumeClaim) (*v1.PersistentVolumeClaim, error) {
479504
pvcCondition := v1.PersistentVolumeClaimCondition{
480505
Type: v1.PersistentVolumeClaimFileSystemResizePending,
481506
Status: v1.ConditionTrue,
@@ -486,15 +511,18 @@ func (ctrl *resizeController) markPVCAsFSResizeRequired(pvc *v1.PersistentVolume
486511
newPVC.Status.Conditions = util.MergeResizeConditionsOfPVC(newPVC.Status.Conditions,
487512
[]v1.PersistentVolumeClaimCondition{pvcCondition})
488513

489-
if _, err := util.PatchPVCStatus(pvc, newPVC, ctrl.kubeClient); err != nil {
514+
updatedPVC, err := util.PatchPVCStatus(pvc, newPVC, ctrl.kubeClient)
515+
516+
if err != nil {
490517
klog.Errorf("Mark PVC %q as file system resize required failed: %v", util.PVCKey(pvc), err)
491-
return err
518+
return nil, err
492519
}
520+
493521
klog.V(4).Infof("Mark PVC %q as file system resize required", util.PVCKey(pvc))
494522
ctrl.eventRecorder.Eventf(pvc, v1.EventTypeNormal,
495523
util.FileSystemResizeRequired, "Require file system resize of volume on node")
496524

497-
return nil
525+
return updatedPVC, nil
498526
}
499527

500528
func parsePod(obj interface{}) *v1.Pod {

pkg/util/util.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import (
2222
"fmt"
2323
"regexp"
2424

25-
"k8s.io/api/core/v1"
25+
v1 "k8s.io/api/core/v1"
2626
"k8s.io/apimachinery/pkg/api/meta"
2727
"k8s.io/apimachinery/pkg/api/resource"
2828
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -124,18 +124,18 @@ func addResourceVersion(patchBytes []byte, resourceVersion string) ([]byte, erro
124124
}
125125

126126
// UpdatePVCapacity updates PVC capacity with requested size.
127-
func UpdatePVCapacity(pv *v1.PersistentVolume, newCapacity resource.Quantity, kubeClient kubernetes.Interface) error {
127+
func UpdatePVCapacity(pv *v1.PersistentVolume, newCapacity resource.Quantity, kubeClient kubernetes.Interface) (*v1.PersistentVolume, error) {
128128
newPV := pv.DeepCopy()
129129
newPV.Spec.Capacity[v1.ResourceStorage] = newCapacity
130130
patchBytes, err := getPatchData(pv, newPV)
131131
if err != nil {
132-
return fmt.Errorf("can't update capacity of PV %s as generate path data failed: %v", pv.Name, err)
132+
return nil, fmt.Errorf("can't update capacity of PV %s as generate path data failed: %v", pv.Name, err)
133133
}
134-
_, updateErr := kubeClient.CoreV1().PersistentVolumes().Patch(context.TODO(), pv.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
134+
updatedPV, updateErr := kubeClient.CoreV1().PersistentVolumes().Patch(context.TODO(), pv.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
135135
if updateErr != nil {
136-
return fmt.Errorf("update capacity of PV %s failed: %v", pv.Name, updateErr)
136+
return nil, fmt.Errorf("update capacity of PV %s failed: %v", pv.Name, updateErr)
137137
}
138-
return nil
138+
return updatedPV, nil
139139
}
140140

141141
func getPatchData(oldObj, newObj interface{}) ([]byte, error) {

0 commit comments

Comments
 (0)