diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go
index bd715be06..ee78973b2 100644
--- a/pkg/controller/controller.go
+++ b/pkg/controller/controller.go
@@ -26,9 +26,9 @@ import (
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 	v1 "k8s.io/api/core/v1"
-	k8serrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/informers"
@@ -55,15 +55,18 @@ type resizeController struct {
 	kubeClient    kubernetes.Interface
 	claimQueue    workqueue.RateLimitingInterface
 	eventRecorder record.EventRecorder
-	pvLister      corelisters.PersistentVolumeLister
 	pvSynced      cache.InformerSynced
-	pvcLister     corelisters.PersistentVolumeClaimLister
 	pvcSynced     cache.InformerSynced
 	usedPVCs      *inUsePVCStore

-	podLister       corelisters.PodLister
-	podListerSynced cache.InformerSynced
+	podLister       corelisters.PodLister
+	podListerSynced cache.InformerSynced
+
+	// a cache to store PersistentVolume objects
+	volumes cache.Store
+	// a cache to store PersistentVolumeClaim objects
+	claims cache.Store

 	handleVolumeInUseError bool
 }
@@ -91,11 +94,11 @@
 		name:          name,
 		resizer:       resizer,
 		kubeClient:    kubeClient,
-		pvLister:      pvInformer.Lister(),
 		pvSynced:      pvInformer.Informer().HasSynced,
-		pvcLister:     pvcInformer.Lister(),
 		pvcSynced:     pvcInformer.Informer().HasSynced,
 		claimQueue:    claimQueue,
+		volumes:       pvInformer.Informer().GetStore(),
+		claims:        pvcInformer.Informer().GetStore(),
 		eventRecorder: eventRecorder,
 		usedPVCs:      newUsedPVCStore(),
 		handleVolumeInUseError: handleVolumeInUseError,
@@ -266,6 +269,7 @@
 		if err := ctrl.syncPVC(key.(string)); err != nil {
 			// Put PVC back to the queue so that we can retry later.
+			klog.Errorf("Error syncing PVC: %v", err)
 			ctrl.claimQueue.AddRateLimited(key)
 		} else {
 			ctrl.claimQueue.Forget(key)
 		}
@@ -278,18 +282,22 @@
 func (ctrl *resizeController) syncPVC(key string) error {
 	namespace, name, err := cache.SplitMetaNamespaceKey(key)
 	if err != nil {
-		klog.Errorf("Split meta namespace key of pvc %s failed: %v", key, err)
-		return err
+		return fmt.Errorf("getting namespace and name from key %s failed: %v", key, err)
 	}

-	pvc, err := ctrl.pvcLister.PersistentVolumeClaims(namespace).Get(name)
+	pvcObject, exists, err := ctrl.claims.GetByKey(key)
 	if err != nil {
-		if k8serrors.IsNotFound(err) {
-			klog.V(3).Infof("PVC %s/%s is deleted, no need to process it", namespace, name)
-			return nil
-		}
-		klog.Errorf("Get PVC %s/%s failed: %v", namespace, name, err)
-		return err
+		return fmt.Errorf("getting PVC %s/%s failed: %v", namespace, name, err)
+	}
+
+	if !exists {
+		klog.V(3).Infof("PVC %s/%s is deleted or does not exist", namespace, name)
+		return nil
+	}
+
+	pvc, ok := pvcObject.(*v1.PersistentVolumeClaim)
+	if !ok {
+		return fmt.Errorf("expected PVC got: %v", pvcObject)
 	}

 	if !ctrl.pvcNeedResize(pvc) {
@@ -297,14 +305,19 @@
 		return nil
 	}

-	pv, err := ctrl.pvLister.Get(pvc.Spec.VolumeName)
+	volumeObj, exists, err := ctrl.volumes.GetByKey(pvc.Spec.VolumeName)
 	if err != nil {
-		if k8serrors.IsNotFound(err) {
-			klog.V(3).Infof("PV %s is deleted, no need to process it", pvc.Spec.VolumeName)
-			return nil
-		}
-		klog.Errorf("Get PV %q of pvc %q failed: %v", pvc.Spec.VolumeName, util.PVCKey(pvc), err)
-		return err
+		return fmt.Errorf("Get PV %q of pvc %q failed: %v", pvc.Spec.VolumeName, util.PVCKey(pvc), err)
+	}
+
+	if !exists {
+		klog.Warningf("PV %q bound to PVC %s not found", pvc.Spec.VolumeName, util.PVCKey(pvc))
+		return nil
+	}
+
+	pv, ok := volumeObj.(*v1.PersistentVolume)
+	if !ok {
+		return fmt.Errorf("expected volume but got %+v", volumeObj)
 	}

 	if !ctrl.pvNeedResize(pvc, pv) {
@@ -369,8 +382,7 @@ func (ctrl *resizeController) pvNeedResize(pvc *v1.PersistentVolumeClaim, pv *v1
 // 3. Mark pvc as resizing finished(no error, no need to resize fs), need resizing fs or resize failed.
 func (ctrl *resizeController) resizePVC(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) error {
 	if updatedPVC, err := ctrl.markPVCResizeInProgress(pvc); err != nil {
-		klog.Errorf("Mark pvc %q as resizing failed: %v", util.PVCKey(pvc), err)
-		return err
+		return fmt.Errorf("marking pvc %q as resizing failed: %v", util.PVCKey(pvc), err)
 	} else if updatedPVC != nil {
 		pvc = updatedPVC
 	}
@@ -406,7 +418,6 @@ func (ctrl *resizeController) resizePVC(pvc *v1.PersistentVolumeClaim, pv *v1.Pe
 		// Record an event to indicate that resize operation is failed.
 		ctrl.eventRecorder.Eventf(pvc, v1.EventTypeWarning, util.VolumeResizeFailed, err.Error())
 	}
-
 	return err
 }
@@ -426,18 +437,17 @@ func (ctrl *resizeController) resizeVolume(
 	newSize, fsResizeRequired, err := ctrl.resizer.Resize(pv, requestSize)
 	if err != nil {
-		klog.Errorf("Resize volume %q by resizer %q failed: %v", pv.Name, ctrl.name, err)
 		// if this error was a in-use error then it must be tracked so as we don't retry without
 		// first verifying if volume is in-use
 		if inUseError(err) {
 			ctrl.usedPVCs.addPVCWithInUseError(pvc)
 		}
-		return newSize, fsResizeRequired, fmt.Errorf("resize volume %s failed: %v", pv.Name, err)
+		return newSize, fsResizeRequired, fmt.Errorf("resize volume %q by resizer %q failed: %v", pv.Name, ctrl.name, err)
 	}

 	klog.V(4).Infof("Resize volume succeeded for volume %q, start to update PV's capacity", pv.Name)

-	if err := util.UpdatePVCapacity(pv, newSize, ctrl.kubeClient); err != nil {
-		klog.Errorf("Update capacity of PV %q to %s failed: %v", pv.Name, newSize.String(), err)
+	err = ctrl.updatePVCapacity(pv, newSize)
+	if err != nil {
 		return newSize, fsResizeRequired, err
 	}
 	klog.V(4).Infof("Update capacity of PV %q to %s succeeded", pv.Name, newSize.String())
@@ -445,6 +455,30 @@
 	return newSize, fsResizeRequired, nil
 }

+func (ctrl *resizeController) markPVCAsFSResizeRequired(pvc *v1.PersistentVolumeClaim) error {
+	pvcCondition := v1.PersistentVolumeClaimCondition{
+		Type:               v1.PersistentVolumeClaimFileSystemResizePending,
+		Status:             v1.ConditionTrue,
+		LastTransitionTime: metav1.Now(),
+		Message:            "Waiting for user to (re-)start a pod to finish file system resize of volume on node.",
+	}
+	newPVC := pvc.DeepCopy()
+	newPVC.Status.Conditions = util.MergeResizeConditionsOfPVC(newPVC.Status.Conditions,
+		[]v1.PersistentVolumeClaimCondition{pvcCondition})
+
+	_, err := ctrl.patchClaim(pvc, newPVC)
+
+	if err != nil {
+		return fmt.Errorf("Mark PVC %q as file system resize required failed: %v", util.PVCKey(pvc), err)
+	}
+
+	klog.V(4).Infof("Mark PVC %q as file system resize required", util.PVCKey(pvc))
+	ctrl.eventRecorder.Eventf(pvc, v1.EventTypeNormal,
+		util.FileSystemResizeRequired, "Require file system resize of volume on node")
+
+	return nil
+}
+
 func (ctrl *resizeController) markPVCResizeInProgress(pvc *v1.PersistentVolumeClaim) (*v1.PersistentVolumeClaim, error) {
 	// Mark PVC as Resize Started
 	progressCondition := v1.PersistentVolumeClaimCondition{
@@ -455,7 +489,12 @@ func (ctrl *resizeController) markPVCResizeInProgress(pvc *v1.PersistentVolumeCl
 	newPVC := pvc.DeepCopy()
 	newPVC.Status.Conditions = util.MergeResizeConditionsOfPVC(newPVC.Status.Conditions,
 		[]v1.PersistentVolumeClaimCondition{progressCondition})
-	return util.PatchPVCStatus(pvc, newPVC, ctrl.kubeClient)
+
+	updatedPVC, err := ctrl.patchClaim(pvc, newPVC)
+	if err != nil {
+		return nil, err
+	}
+	return updatedPVC, nil
 }

 func (ctrl *resizeController) markPVCResizeFinished(
@@ -464,9 +503,10 @@
 	newPVC := pvc.DeepCopy()
 	newPVC.Status.Capacity[v1.ResourceStorage] = newSize
 	newPVC.Status.Conditions = util.MergeResizeConditionsOfPVC(pvc.Status.Conditions, []v1.PersistentVolumeClaimCondition{})
-	if _, err := util.PatchPVCStatus(pvc, newPVC, ctrl.kubeClient); err != nil {
-		klog.Errorf("Mark PVC %q as resize finished failed: %v", util.PVCKey(pvc), err)
-		return err
+
+	_, err := ctrl.patchClaim(pvc, newPVC)
+	if err != nil {
+		return fmt.Errorf("Mark PVC %q as resize finished failed: %v", util.PVCKey(pvc), err)
 	}

 	klog.V(4).Infof("Resize PVC %q finished", util.PVCKey(pvc))
@@ -475,28 +515,52 @@
 	return nil
 }

-func (ctrl *resizeController) markPVCAsFSResizeRequired(pvc *v1.PersistentVolumeClaim) error {
-	pvcCondition := v1.PersistentVolumeClaimCondition{
-		Type:               v1.PersistentVolumeClaimFileSystemResizePending,
-		Status:             v1.ConditionTrue,
-		LastTransitionTime: metav1.Now(),
-		Message:            "Waiting for user to (re-)start a pod to finish file system resize of volume on node.",
+func (ctrl *resizeController) patchClaim(oldPVC, newPVC *v1.PersistentVolumeClaim) (*v1.PersistentVolumeClaim, error) {
+	patchBytes, err := util.GetPVCPatchData(oldPVC, newPVC)
+	if err != nil {
+		return nil, fmt.Errorf("can't patch status of PVC %s as generate path data failed: %v", util.PVCKey(oldPVC), err)
 	}
-	newPVC := pvc.DeepCopy()
-	newPVC.Status.Conditions = util.MergeResizeConditionsOfPVC(newPVC.Status.Conditions,
-		[]v1.PersistentVolumeClaimCondition{pvcCondition})
-
-	if _, err := util.PatchPVCStatus(pvc, newPVC, ctrl.kubeClient); err != nil {
-		klog.Errorf("Mark PVC %q as file system resize required failed: %v", util.PVCKey(pvc), err)
-		return err
+	updatedClaim, updateErr := ctrl.kubeClient.CoreV1().PersistentVolumeClaims(oldPVC.Namespace).
+		Patch(context.TODO(), oldPVC.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status")
+	if updateErr != nil {
+		return nil, fmt.Errorf("can't patch status of PVC %s with %v", util.PVCKey(oldPVC), updateErr)
 	}

-	klog.V(4).Infof("Mark PVC %q as file system resize required", util.PVCKey(pvc))
-	ctrl.eventRecorder.Eventf(pvc, v1.EventTypeNormal,
-		util.FileSystemResizeRequired, "Require file system resize of volume on node")
+	err = ctrl.claims.Update(updatedClaim)
+	if err != nil {
+		return nil, fmt.Errorf("error updating PVC %s in local cache: %v", util.PVCKey(newPVC), err)
+	}
+
+	return updatedClaim, nil
+}
+
+func (ctrl *resizeController) updatePVCapacity(pv *v1.PersistentVolume, newCapacity resource.Quantity) error {
+	klog.V(4).Infof("Resize volume succeeded for volume %q, start to update PV's capacity", pv.Name)
+	newPV := pv.DeepCopy()
+	newPV.Spec.Capacity[v1.ResourceStorage] = newCapacity
+	_, err := ctrl.patchPersistentVolume(pv, newPV)
+	if err != nil {
+		return fmt.Errorf("updating capacity of PV %q to %s failed: %v", pv.Name, newCapacity.String(), err)
+	}

 	return nil
 }

+func (ctrl *resizeController) patchPersistentVolume(oldPV, newPV *v1.PersistentVolume) (*v1.PersistentVolume, error) {
+	patchBytes, err := util.GetPatchData(oldPV, newPV)
+	if err != nil {
+		return nil, fmt.Errorf("can't update capacity of PV %s as generate path data failed: %v", newPV.Name, err)
+	}
+	updatedPV, updateErr := ctrl.kubeClient.CoreV1().PersistentVolumes().Patch(context.TODO(), newPV.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
+	if updateErr != nil {
+		return nil, fmt.Errorf("update capacity of PV %s failed: %v", newPV.Name, updateErr)
+	}
+	err = ctrl.volumes.Update(updatedPV)
+	if err != nil {
+		return nil, fmt.Errorf("error updating PV %s in local cache: %v", newPV.Name, err)
+	}
+	return updatedPV, nil
+}
+
 func parsePod(obj interface{}) *v1.Pod {
 	if obj == nil {
 		return nil
diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go
index fc67846ca..c67e71379 100644
--- a/pkg/controller/controller_test.go
+++ b/pkg/controller/controller_test.go
@@ -277,6 +277,93 @@ func TestController(t *testing.T) {
 	}
 }

+func TestResizePVC(t *testing.T) {
+	fsVolumeMode := v1.PersistentVolumeFilesystem
+
+	for _, test := range []struct {
+		Name string
+		PVC  *v1.PersistentVolumeClaim
+		PV   *v1.PersistentVolume
+
+		NodeResize       bool
+		expansionFailure bool
+		expectFailure    bool
+	}{
+		{
+			Name:       "Resize PVC with FS resize",
+			PVC:        createPVC(2, 1),
+			PV:         createPV(1, "testPVC", defaultNS, "foobar", &fsVolumeMode),
+			NodeResize: true,
+		},
+		{
+			Name:             "Resize PVC with FS resize failure",
+			PVC:              createPVC(2, 1),
+			PV:               createPV(1, "testPVC", defaultNS, "foobar", &fsVolumeMode),
+			NodeResize:       true,
+			expansionFailure: true,
+			expectFailure:    true,
+		},
+	} {
+		client := csi.NewMockClient("mock", test.NodeResize, true, true)
+		if test.expansionFailure {
+			client.SetExpansionFailed()
+		}
+		driverName, _ := client.GetDriverName(context.TODO())
+
+		initialObjects := []runtime.Object{}
+		if test.PVC != nil {
+			initialObjects = append(initialObjects, test.PVC)
+		}
+		if test.PV != nil {
+			test.PV.Spec.PersistentVolumeSource.CSI.Driver = driverName
+			initialObjects = append(initialObjects, test.PV)
+		}
+
+		kubeClient, informerFactory := fakeK8s(initialObjects)
+		pvInformer := informerFactory.Core().V1().PersistentVolumes()
+		pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims()
+		podInformer := informerFactory.Core().V1().Pods()
+
+		metricsManager := metrics.NewCSIMetricsManager("" /* driverName */)
+		metricsAddress := ""
+		metricsPath := ""
+		csiResizer, err := resizer.NewResizerFromClient(client, 15*time.Second, kubeClient, informerFactory, metricsManager, metricsAddress, metricsPath)
+		if err != nil {
+			t.Fatalf("Test %s: Unable to create resizer: %v", test.Name, err)
+		}
+
+		controller := NewResizeController(driverName, csiResizer, kubeClient, time.Second, informerFactory, workqueue.DefaultControllerRateLimiter(), true /* disableVolumeInUseErrorHandler*/)
+
+		ctrlInstance, _ := controller.(*resizeController)
+
+		stopCh := make(chan struct{})
+		informerFactory.Start(stopCh)
+
+		for _, obj := range initialObjects {
+			switch obj.(type) {
+			case *v1.PersistentVolume:
+				pvInformer.Informer().GetStore().Add(obj)
+			case *v1.PersistentVolumeClaim:
+				pvcInformer.Informer().GetStore().Add(obj)
+			case *v1.Pod:
+				podInformer.Informer().GetStore().Add(obj)
+			default:
+				t.Fatalf("Test %s: Unknown initalObject type: %+v", test.Name, obj)
+			}
+		}
+
+		err = ctrlInstance.resizePVC(test.PVC, test.PV)
+		if test.expectFailure && err == nil {
+			t.Errorf("for %s expected error got nothing", test.Name)
+			continue
+		}
+		if !test.expectFailure && err != nil {
+			t.Errorf("for %s, unexpected error: %v", test.Name, err)
+		}
+
+	}
+}
+
 func invalidPVC() *v1.PersistentVolumeClaim {
 	pvc := createPVC(1, 1)
 	pvc.ObjectMeta.Name = ""
diff --git a/pkg/csi/mock_client.go b/pkg/csi/mock_client.go
index 42912478c..1852f498e 100644
--- a/pkg/csi/mock_client.go
+++ b/pkg/csi/mock_client.go
@@ -2,6 +2,7 @@ package csi

 import (
 	"context"
+	"fmt"

 	"github.com/container-storage-interface/spec/lib/go/csi"
 )
@@ -26,6 +27,7 @@ type MockClient struct {
 	supportsControllerResize        bool
 	supportsPluginControllerService bool
 	expandCalled                    int
+	expansionFailed                 bool
 	usedSecrets                     map[string]string
 	usedCapability                  *csi.VolumeCapability
 }
@@ -46,6 +48,10 @@ func (c *MockClient) SupportsNodeResize(context.Context) (bool, error) {
 	return c.supportsNodeResize, nil
 }

+func (c *MockClient) SetExpansionFailed() {
+	c.expansionFailed = true
+}
+
 func (c *MockClient) Expand(
 	ctx context.Context,
 	volumeID string,
@@ -53,6 +59,10 @@
 	secrets map[string]string,
 	capability *csi.VolumeCapability) (int64, bool, error) {
 	// TODO: Determine whether the operation succeeds or fails by parameters.
+	if c.expansionFailed {
+		c.expandCalled++
+		return requestBytes, c.supportsNodeResize, fmt.Errorf("expansion failed")
+	}
 	c.expandCalled++
 	c.usedSecrets = secrets
 	c.usedCapability = capability
diff --git a/pkg/util/util.go b/pkg/util/util.go
index e3eed0ec9..79b349225 100644
--- a/pkg/util/util.go
+++ b/pkg/util/util.go
@@ -17,19 +17,14 @@ limitations under the License.
 package util

 import (
-	"context"
 	"encoding/json"
 	"fmt"
 	"regexp"

-	"k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/meta"
-	"k8s.io/apimachinery/pkg/api/resource"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
-	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/strategicpatch"
-	"k8s.io/client-go/kubernetes"
 )

 var knownResizeConditions = map[v1.PersistentVolumeClaimConditionType]bool{
@@ -74,25 +69,8 @@ func MergeResizeConditionsOfPVC(oldConditions, newConditions []v1.PersistentVolu
 	return resultConditions
 }

-// PatchPVCStatus updates PVC status using PATCH verb
-func PatchPVCStatus(
-	oldPVC *v1.PersistentVolumeClaim,
-	newPVC *v1.PersistentVolumeClaim,
-	kubeClient kubernetes.Interface) (*v1.PersistentVolumeClaim, error) {
-	patchBytes, err := getPVCPatchData(oldPVC, newPVC)
-	if err != nil {
-		return nil, fmt.Errorf("can't patch status of PVC %s as generate path data failed: %v", PVCKey(oldPVC), err)
-	}
-	updatedClaim, updateErr := kubeClient.CoreV1().PersistentVolumeClaims(oldPVC.Namespace).
-		Patch(context.TODO(), oldPVC.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status")
-	if updateErr != nil {
-		return nil, fmt.Errorf("can't patch status of PVC %s with %v", PVCKey(oldPVC), updateErr)
-	}
-	return updatedClaim, nil
-}
-
-func getPVCPatchData(oldPVC, newPVC *v1.PersistentVolumeClaim) ([]byte, error) {
-	patchBytes, err := getPatchData(oldPVC, newPVC)
+func GetPVCPatchData(oldPVC, newPVC *v1.PersistentVolumeClaim) ([]byte, error) {
+	patchBytes, err := GetPatchData(oldPVC, newPVC)
 	if err != nil {
 		return patchBytes, err
 	}
@@ -123,22 +101,7 @@ func addResourceVersion(patchBytes []byte, resourceVersion string) ([]byte, erro
 	return versionBytes, nil
 }

-// UpdatePVCapacity updates PVC capacity with requested size.
-func UpdatePVCapacity(pv *v1.PersistentVolume, newCapacity resource.Quantity, kubeClient kubernetes.Interface) error {
-	newPV := pv.DeepCopy()
-	newPV.Spec.Capacity[v1.ResourceStorage] = newCapacity
-	patchBytes, err := getPatchData(pv, newPV)
-	if err != nil {
-		return fmt.Errorf("can't update capacity of PV %s as generate path data failed: %v", pv.Name, err)
-	}
-	_, updateErr := kubeClient.CoreV1().PersistentVolumes().Patch(context.TODO(), pv.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
-	if updateErr != nil {
-		return fmt.Errorf("update capacity of PV %s failed: %v", pv.Name, updateErr)
-	}
-	return nil
-}
-
-func getPatchData(oldObj, newObj interface{}) ([]byte, error) {
+func GetPatchData(oldObj, newObj interface{}) ([]byte, error) {
 	oldData, err := json.Marshal(oldObj)
 	if err != nil {
 		return nil, fmt.Errorf("marshal old object failed: %v", err)
diff --git a/pkg/util/util_test.go b/pkg/util/util_test.go
index 6bfd599b3..b0322203d 100644
--- a/pkg/util/util_test.go
+++ b/pkg/util/util_test.go
@@ -18,7 +18,7 @@ func TestGetPVCPatchData(t *testing.T) {
 		newPVC := c.OldPVC.DeepCopy()
 		newPVC.Status.Conditions = append(newPVC.Status.Conditions,
 			v1.PersistentVolumeClaimCondition{Type: VolumeResizing, Status: v1.ConditionTrue})
-		patchBytes, err := getPVCPatchData(c.OldPVC, newPVC)
+		patchBytes, err := GetPVCPatchData(c.OldPVC, newPVC)
 		if err != nil {
 			t.Errorf("Case %d: Get patch data failed: %v", i, err)
 		}