Skip to content

We need to ensure that we are not reading stale PV objects #96

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 114 additions & 50 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
v1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
Expand All @@ -55,15 +55,18 @@ type resizeController struct {
kubeClient kubernetes.Interface
claimQueue workqueue.RateLimitingInterface
eventRecorder record.EventRecorder
pvLister corelisters.PersistentVolumeLister
pvSynced cache.InformerSynced
pvcLister corelisters.PersistentVolumeClaimLister
pvcSynced cache.InformerSynced

usedPVCs *inUsePVCStore

podLister corelisters.PodLister
podListerSynced cache.InformerSynced
podLister corelisters.PodLister
podListerSynced cache.InformerSynced

// a cache to store PersistentVolume objects
volumes cache.Store
// a cache to store PersistentVolumeClaim objects
claims cache.Store
handleVolumeInUseError bool
}

Expand Down Expand Up @@ -91,11 +94,11 @@ func NewResizeController(
name: name,
resizer: resizer,
kubeClient: kubeClient,
pvLister: pvInformer.Lister(),
pvSynced: pvInformer.Informer().HasSynced,
pvcLister: pvcInformer.Lister(),
pvcSynced: pvcInformer.Informer().HasSynced,
claimQueue: claimQueue,
volumes: pvInformer.Informer().GetStore(),
claims: pvcInformer.Informer().GetStore(),
eventRecorder: eventRecorder,
usedPVCs: newUsedPVCStore(),
handleVolumeInUseError: handleVolumeInUseError,
Expand Down Expand Up @@ -266,6 +269,7 @@ func (ctrl *resizeController) syncPVCs() {

if err := ctrl.syncPVC(key.(string)); err != nil {
// Put PVC back to the queue so that we can retry later.
klog.Errorf("Error syncing PVC: %v", err)
ctrl.claimQueue.AddRateLimited(key)
} else {
ctrl.claimQueue.Forget(key)
Expand All @@ -278,33 +282,42 @@ func (ctrl *resizeController) syncPVC(key string) error {

namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
klog.Errorf("Split meta namespace key of pvc %s failed: %v", key, err)
return err
return fmt.Errorf("getting namespace and name from key %s failed: %v", key, err)
}

pvc, err := ctrl.pvcLister.PersistentVolumeClaims(namespace).Get(name)
pvcObject, exists, err := ctrl.claims.GetByKey(key)
if err != nil {
if k8serrors.IsNotFound(err) {
klog.V(3).Infof("PVC %s/%s is deleted, no need to process it", namespace, name)
return nil
}
klog.Errorf("Get PVC %s/%s failed: %v", namespace, name, err)
return err
return fmt.Errorf("getting PVC %s/%s failed: %v", namespace, name, err)
}

if !exists {
klog.V(3).Infof("PVC %s/%s is deleted or does not exist", namespace, name)
return nil
}

pvc, ok := pvcObject.(*v1.PersistentVolumeClaim)
if !ok {
return fmt.Errorf("expected PVC got: %v", pvcObject)
}

if !ctrl.pvcNeedResize(pvc) {
klog.V(4).Infof("No need to resize PVC %q", util.PVCKey(pvc))
return nil
}

pv, err := ctrl.pvLister.Get(pvc.Spec.VolumeName)
volumeObj, exists, err := ctrl.volumes.GetByKey(pvc.Spec.VolumeName)
if err != nil {
if k8serrors.IsNotFound(err) {
klog.V(3).Infof("PV %s is deleted, no need to process it", pvc.Spec.VolumeName)
return nil
}
klog.Errorf("Get PV %q of pvc %q failed: %v", pvc.Spec.VolumeName, util.PVCKey(pvc), err)
return err
return fmt.Errorf("Get PV %q of pvc %q failed: %v", pvc.Spec.VolumeName, util.PVCKey(pvc), err)
}

if !exists {
klog.Warningf("PV %q bound to PVC %s not found", pvc.Spec.VolumeName, util.PVCKey(pvc))
return nil
}

pv, ok := volumeObj.(*v1.PersistentVolume)
if !ok {
return fmt.Errorf("expected volume but got %+v", volumeObj)
}

if !ctrl.pvNeedResize(pvc, pv) {
Expand Down Expand Up @@ -369,8 +382,7 @@ func (ctrl *resizeController) pvNeedResize(pvc *v1.PersistentVolumeClaim, pv *v1
// 3. Mark pvc as resizing finished(no error, no need to resize fs), need resizing fs or resize failed.
func (ctrl *resizeController) resizePVC(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) error {
if updatedPVC, err := ctrl.markPVCResizeInProgress(pvc); err != nil {
klog.Errorf("Mark pvc %q as resizing failed: %v", util.PVCKey(pvc), err)
return err
return fmt.Errorf("marking pvc %q as resizing failed: %v", util.PVCKey(pvc), err)
} else if updatedPVC != nil {
pvc = updatedPVC
}
Expand Down Expand Up @@ -406,7 +418,6 @@ func (ctrl *resizeController) resizePVC(pvc *v1.PersistentVolumeClaim, pv *v1.Pe
// Record an event to indicate that resize operation is failed.
ctrl.eventRecorder.Eventf(pvc, v1.EventTypeWarning, util.VolumeResizeFailed, err.Error())
}

return err
}

Expand All @@ -426,25 +437,48 @@ func (ctrl *resizeController) resizeVolume(
newSize, fsResizeRequired, err := ctrl.resizer.Resize(pv, requestSize)

if err != nil {
klog.Errorf("Resize volume %q by resizer %q failed: %v", pv.Name, ctrl.name, err)
// if this error was a in-use error then it must be tracked so as we don't retry without
// first verifying if volume is in-use
if inUseError(err) {
ctrl.usedPVCs.addPVCWithInUseError(pvc)
}
return newSize, fsResizeRequired, fmt.Errorf("resize volume %s failed: %v", pv.Name, err)
return newSize, fsResizeRequired, fmt.Errorf("resize volume %q by resizer %q failed: %v", pv.Name, ctrl.name, err)
}
klog.V(4).Infof("Resize volume succeeded for volume %q, start to update PV's capacity", pv.Name)

if err := util.UpdatePVCapacity(pv, newSize, ctrl.kubeClient); err != nil {
klog.Errorf("Update capacity of PV %q to %s failed: %v", pv.Name, newSize.String(), err)
err = ctrl.updatePVCapacity(pv, newSize)
if err != nil {
return newSize, fsResizeRequired, err
}
klog.V(4).Infof("Update capacity of PV %q to %s succeeded", pv.Name, newSize.String())

return newSize, fsResizeRequired, nil
}

func (ctrl *resizeController) markPVCAsFSResizeRequired(pvc *v1.PersistentVolumeClaim) error {
pvcCondition := v1.PersistentVolumeClaimCondition{
Type: v1.PersistentVolumeClaimFileSystemResizePending,
Status: v1.ConditionTrue,
LastTransitionTime: metav1.Now(),
Message: "Waiting for user to (re-)start a pod to finish file system resize of volume on node.",
}
newPVC := pvc.DeepCopy()
newPVC.Status.Conditions = util.MergeResizeConditionsOfPVC(newPVC.Status.Conditions,
[]v1.PersistentVolumeClaimCondition{pvcCondition})

_, err := ctrl.patchClaim(pvc, newPVC)

if err != nil {
return fmt.Errorf("Mark PVC %q as file system resize required failed: %v", util.PVCKey(pvc), err)
}

klog.V(4).Infof("Mark PVC %q as file system resize required", util.PVCKey(pvc))
ctrl.eventRecorder.Eventf(pvc, v1.EventTypeNormal,
util.FileSystemResizeRequired, "Require file system resize of volume on node")

return nil
}

func (ctrl *resizeController) markPVCResizeInProgress(pvc *v1.PersistentVolumeClaim) (*v1.PersistentVolumeClaim, error) {
// Mark PVC as Resize Started
progressCondition := v1.PersistentVolumeClaimCondition{
Expand All @@ -455,7 +489,12 @@ func (ctrl *resizeController) markPVCResizeInProgress(pvc *v1.PersistentVolumeCl
newPVC := pvc.DeepCopy()
newPVC.Status.Conditions = util.MergeResizeConditionsOfPVC(newPVC.Status.Conditions,
[]v1.PersistentVolumeClaimCondition{progressCondition})
return util.PatchPVCStatus(pvc, newPVC, ctrl.kubeClient)

updatedPVC, err := ctrl.patchClaim(pvc, newPVC)
if err != nil {
return nil, err
}
return updatedPVC, nil
}

func (ctrl *resizeController) markPVCResizeFinished(
Expand All @@ -464,9 +503,10 @@ func (ctrl *resizeController) markPVCResizeFinished(
newPVC := pvc.DeepCopy()
newPVC.Status.Capacity[v1.ResourceStorage] = newSize
newPVC.Status.Conditions = util.MergeResizeConditionsOfPVC(pvc.Status.Conditions, []v1.PersistentVolumeClaimCondition{})
if _, err := util.PatchPVCStatus(pvc, newPVC, ctrl.kubeClient); err != nil {
klog.Errorf("Mark PVC %q as resize finished failed: %v", util.PVCKey(pvc), err)
return err

_, err := ctrl.patchClaim(pvc, newPVC)
if err != nil {
return fmt.Errorf("Mark PVC %q as resize finished failed: %v", util.PVCKey(pvc), err)
}

klog.V(4).Infof("Resize PVC %q finished", util.PVCKey(pvc))
Expand All @@ -475,28 +515,52 @@ func (ctrl *resizeController) markPVCResizeFinished(
return nil
}

func (ctrl *resizeController) markPVCAsFSResizeRequired(pvc *v1.PersistentVolumeClaim) error {
pvcCondition := v1.PersistentVolumeClaimCondition{
Type: v1.PersistentVolumeClaimFileSystemResizePending,
Status: v1.ConditionTrue,
LastTransitionTime: metav1.Now(),
Message: "Waiting for user to (re-)start a pod to finish file system resize of volume on node.",
func (ctrl *resizeController) patchClaim(oldPVC, newPVC *v1.PersistentVolumeClaim) (*v1.PersistentVolumeClaim, error) {
patchBytes, err := util.GetPVCPatchData(oldPVC, newPVC)
if err != nil {
return nil, fmt.Errorf("can't patch status of PVC %s as generate path data failed: %v", util.PVCKey(oldPVC), err)
}
newPVC := pvc.DeepCopy()
newPVC.Status.Conditions = util.MergeResizeConditionsOfPVC(newPVC.Status.Conditions,
[]v1.PersistentVolumeClaimCondition{pvcCondition})

if _, err := util.PatchPVCStatus(pvc, newPVC, ctrl.kubeClient); err != nil {
klog.Errorf("Mark PVC %q as file system resize required failed: %v", util.PVCKey(pvc), err)
return err
updatedClaim, updateErr := ctrl.kubeClient.CoreV1().PersistentVolumeClaims(oldPVC.Namespace).
Patch(context.TODO(), oldPVC.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status")
if updateErr != nil {
return nil, fmt.Errorf("can't patch status of PVC %s with %v", util.PVCKey(oldPVC), updateErr)
}
klog.V(4).Infof("Mark PVC %q as file system resize required", util.PVCKey(pvc))
ctrl.eventRecorder.Eventf(pvc, v1.EventTypeNormal,
util.FileSystemResizeRequired, "Require file system resize of volume on node")
err = ctrl.claims.Update(updatedClaim)
if err != nil {
return nil, fmt.Errorf("error updating PVC %s in local cache: %v", util.PVCKey(newPVC), err)
}

return updatedClaim, nil
}

func (ctrl *resizeController) updatePVCapacity(pv *v1.PersistentVolume, newCapacity resource.Quantity) error {
klog.V(4).Infof("Resize volume succeeded for volume %q, start to update PV's capacity", pv.Name)
newPV := pv.DeepCopy()
newPV.Spec.Capacity[v1.ResourceStorage] = newCapacity

_, err := ctrl.patchPersistentVolume(pv, newPV)
if err != nil {
return fmt.Errorf("updating capacity of PV %q to %s failed: %v", pv.Name, newCapacity.String(), err)
}
return nil
}

func (ctrl *resizeController) patchPersistentVolume(oldPV, newPV *v1.PersistentVolume) (*v1.PersistentVolume, error) {
patchBytes, err := util.GetPatchData(oldPV, newPV)
if err != nil {
return nil, fmt.Errorf("can't update capacity of PV %s as generate path data failed: %v", newPV.Name, err)
}
updatedPV, updateErr := ctrl.kubeClient.CoreV1().PersistentVolumes().Patch(context.TODO(), newPV.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
if updateErr != nil {
return nil, fmt.Errorf("update capacity of PV %s failed: %v", newPV.Name, updateErr)
}
err = ctrl.volumes.Update(updatedPV)
if err != nil {
return nil, fmt.Errorf("error updating PV %s in local cache: %v", newPV.Name, err)
}
return updatedPV, nil
}

func parsePod(obj interface{}) *v1.Pod {
if obj == nil {
return nil
Expand Down
87 changes: 87 additions & 0 deletions pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,93 @@ func TestController(t *testing.T) {
}
}

func TestResizePVC(t *testing.T) {
fsVolumeMode := v1.PersistentVolumeFilesystem

for _, test := range []struct {
Name string
PVC *v1.PersistentVolumeClaim
PV *v1.PersistentVolume

NodeResize bool
expansionFailure bool
expectFailure bool
}{
{
Name: "Resize PVC with FS resize",
PVC: createPVC(2, 1),
PV: createPV(1, "testPVC", defaultNS, "foobar", &fsVolumeMode),
NodeResize: true,
},
{
Name: "Resize PVC with FS resize failure",
PVC: createPVC(2, 1),
PV: createPV(1, "testPVC", defaultNS, "foobar", &fsVolumeMode),
NodeResize: true,
expansionFailure: true,
expectFailure: true,
},
} {
client := csi.NewMockClient("mock", test.NodeResize, true, true)
if test.expansionFailure {
client.SetExpansionFailed()
}
driverName, _ := client.GetDriverName(context.TODO())

initialObjects := []runtime.Object{}
if test.PVC != nil {
initialObjects = append(initialObjects, test.PVC)
}
if test.PV != nil {
test.PV.Spec.PersistentVolumeSource.CSI.Driver = driverName
initialObjects = append(initialObjects, test.PV)
}

kubeClient, informerFactory := fakeK8s(initialObjects)
pvInformer := informerFactory.Core().V1().PersistentVolumes()
pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims()
podInformer := informerFactory.Core().V1().Pods()

metricsManager := metrics.NewCSIMetricsManager("" /* driverName */)
metricsAddress := ""
metricsPath := ""
csiResizer, err := resizer.NewResizerFromClient(client, 15*time.Second, kubeClient, informerFactory, metricsManager, metricsAddress, metricsPath)
if err != nil {
t.Fatalf("Test %s: Unable to create resizer: %v", test.Name, err)
}

controller := NewResizeController(driverName, csiResizer, kubeClient, time.Second, informerFactory, workqueue.DefaultControllerRateLimiter(), true /* disableVolumeInUseErrorHandler*/)

ctrlInstance, _ := controller.(*resizeController)

stopCh := make(chan struct{})
informerFactory.Start(stopCh)

for _, obj := range initialObjects {
switch obj.(type) {
case *v1.PersistentVolume:
pvInformer.Informer().GetStore().Add(obj)
case *v1.PersistentVolumeClaim:
pvcInformer.Informer().GetStore().Add(obj)
case *v1.Pod:
podInformer.Informer().GetStore().Add(obj)
default:
t.Fatalf("Test %s: Unknown initalObject type: %+v", test.Name, obj)
}
}

err = ctrlInstance.resizePVC(test.PVC, test.PV)
if test.expectFailure && err == nil {
t.Errorf("for %s expected error got nothing", test.Name)
continue
}
if !test.expectFailure && err != nil {
t.Errorf("for %s, unexpected error: %v", test.Name, err)
}

}
}

func invalidPVC() *v1.PersistentVolumeClaim {
pvc := createPVC(1, 1)
pvc.ObjectMeta.Name = ""
Expand Down
Loading