diff --git a/populator-machinery/controller.go b/populator-machinery/controller.go index 2fa7bc60..99a32c31 100644 --- a/populator-machinery/controller.go +++ b/populator-machinery/controller.go @@ -83,6 +83,9 @@ const ( reasonWaitForDataPopulationFinished = "PopulatorWaitForDataPopulationFinished" reasonStorageClassCreationError = "PopulatorStorageClassCreationError" reasonDataSourceNotFound = "PopulatorDataSourceNotFound" + reasonRebindOperationFinished = "RebindOperationFinished" + reasonRebindOperationFailed = "RebindOperationFailed" + reasonWaitForRebindFinished = "WaitForRebindFinished" ) type empty struct{} @@ -174,6 +177,10 @@ type ProviderFunctionConfig struct { PopulateFn func(context.Context, PopulatorParams) error // PopulateCompleteFn is the provider specific data population completeness check function, return true when data transfer gets completed PopulateCompleteFn func(context.Context, PopulatorParams) (bool, error) + // RebindCompleteFn is the provider specific volume rebind completeness check function, return true when the user's PVC is in Bound state + // If set, the volume populator library will defer rebinding to this function, rather than rebinding the PV referenced by PVC Prime + // to the user provided PVC. It is expected that the user's PVC will be bound, and PVC Prime will have status "Lost" + RebindCompleteFn func(context.Context, PopulatorParams) (bool, error) // PopulateCleanupFn is the provider specific data population cleanup function, cleanup resouces after data population completed PopulateCleanupFn func(context.Context, PopulatorParams) error } @@ -892,32 +899,47 @@ func (c *controller) syncPvc(ctx context.Context, key, pvcNamespace, pvcName str // Examine the claimref for the PV and see if it's bound to the correct PVC claimRef := pv.Spec.ClaimRef if claimRef.Name != pvc.Name || claimRef.Namespace != pvc.Namespace || claimRef.UID != pvc.UID { - // Make new PV with strategic patch values to perform the PV rebind - patchPv := corev1.PersistentVolume{ - ObjectMeta: metav1.ObjectMeta{ - Name: pv.Name, - Annotations: map[string]string{}, - }, - Spec: corev1.PersistentVolumeSpec{ - ClaimRef: &corev1.ObjectReference{ - Namespace: pvc.Namespace, - Name: pvc.Name, - UID: pvc.UID, - ResourceVersion: pvc.ResourceVersion, + if c.providerFunctionConfig != nil && c.providerFunctionConfig.RebindCompleteFn != nil { + complete, err := c.providerFunctionConfig.RebindCompleteFn(ctx, *params) + if err != nil { + // Handle rebind error appropriately, e.g., log and requeue + c.recorder.Eventf(pvc, corev1.EventTypeWarning, reasonRebindOperationFailed, "Rebind operation failed: %s", err.Error()) + return err + } + if !complete { + // TODO: Revisited if there is a better way to requeue pvc than return an error + // Return error to force reque pvc. We'll get called again later when the population operation complete + return fmt.Errorf(reasonWaitForRebindFinished) + } + } else { + // Make new PV with strategic patch values to perform the PV rebind + patchPv := corev1.PersistentVolume{ + ObjectMeta: metav1.ObjectMeta{ + Name: pv.Name, + Annotations: map[string]string{}, }, - }, - } - patchPv.Annotations[c.populatedFromAnno] = pvc.Namespace + "/" + dataSourceRef.Name - var patchData []byte - patchData, err = json.Marshal(patchPv) - if err != nil { - return err - } - _, err = c.kubeClient.CoreV1().PersistentVolumes().Patch(ctx, pv.Name, types.StrategicMergePatchType, - patchData, metav1.PatchOptions{}) - if err != nil { - return err + Spec: corev1.PersistentVolumeSpec{ + ClaimRef: &corev1.ObjectReference{ + Namespace: pvc.Namespace, + Name: pvc.Name, + UID: pvc.UID, + ResourceVersion: pvc.ResourceVersion, + }, + }, + } + patchPv.Annotations[c.populatedFromAnno] = pvc.Namespace + "/" + dataSourceRef.Name + var patchData []byte + patchData, err = json.Marshal(patchPv) + if err != nil { + return err + } + _, err = c.kubeClient.CoreV1().PersistentVolumes().Patch(ctx, pv.Name, types.StrategicMergePatchType, + patchData, metav1.PatchOptions{}) + if err != nil { + return err + } } + c.recorder.Eventf(pvc, corev1.EventTypeNormal, reasonRebindOperationFinished, "Rebind operation finished") // Don't start cleaning up yet -- we need to bind controller to acknowledge // the switch diff --git a/populator-machinery/controller_test.go b/populator-machinery/controller_test.go index d280efda..8c679df5 100644 --- a/populator-machinery/controller_test.go +++ b/populator-machinery/controller_test.go @@ -62,6 +62,8 @@ type testCase struct { populateFn func(context.Context, PopulatorParams) error // Provider specific data population completeness check function, return true when data transfer gets completed. populateCompleteFn func(context.Context, PopulatorParams) (bool, error) + // Provider specific data population completeness check function, return true when data transfer gets completed. + rebindCompleteFn func(context.Context, PopulatorParams) (bool, error) // The original PVC gets deleted or not pvcDeleted bool // PvcPrimeMutator is the mutator function for pvcPrime @@ -100,11 +102,13 @@ const ( testStorageClassName = "test-sc" testPvcPrimeName = populatorPvcPrefix + "-" + testPvcUid testPvName = "test-pv" + testPvName2 = "test-pv-2" testNodeName = "test-node-name" testPodName = populatorPodPrefix + "-" + testPvcUid testProvisioner = "test.provisioner" testPopulationOperationStartFailed = "Test populate operation start failed" testPopulateCompleteFailed = "Test populate operation complete failed" + testRebindCompleteFailed = "Test rebind operation failed" testMutatePVCPrimeFailed = "Test mutate pvcPrime failed" dataSourceKey = "unstructured/" + testPvcNamespace + "/" + testDataSourceName storageClassKey = "sc/" + testStorageClassName @@ -252,6 +256,26 @@ func populateCompleteSuccess(ctx context.Context, p PopulatorParams) (bool, erro return true, nil } +func rebindPvCompleteError(ctx context.Context, p PopulatorParams) (bool, error) { + return false, fmt.Errorf(testRebindCompleteFailed) +} + +func rebindNewPvSuccess(pvName string) func(ctx context.Context, p PopulatorParams) (bool, error) { + return func(ctx context.Context, p PopulatorParams) (bool, error) { + updatePvc, err := p.KubeClient.CoreV1().PersistentVolumeClaims(p.Pvc.Namespace).Get(ctx, p.Pvc.Name, metav1.GetOptions{}) + if err != nil { + return false, nil + } + updatePvc.Spec.VolumeName = pvName + _, err = p.KubeClient.CoreV1().PersistentVolumeClaims(p.Pvc.Namespace).Update(ctx, updatePvc, metav1.UpdateOptions{}) + if err != nil { + return false, err + } + + return true, nil + } +} + func pvcPrimeMutateAccessModeRWX(mp PvcPrimeMutatorParams) (*v1.PersistentVolumeClaim, error) { accessMode := v1.ReadWriteMany mp.PvcPrime.Spec.AccessModes[0] = accessMode @@ -317,6 +341,9 @@ func initTest(test testCase) ( PopulateCompleteFn: test.populateCompleteFn, } } + if test.populateCompleteFn != nil { + providerFunctionConfig.RebindCompleteFn = test.rebindCompleteFn + } var mutatorConfig *MutatorConfig if test.pvcPrimeMutator != nil { @@ -1227,6 +1254,56 @@ func TestSyncPvcWithProviderImplementation(t *testing.T) { pv: pv(testPvcName, testPvcNamespace, testPvcUid), }, }, + { + name: "Wait for the bind controller to rebind the PV with providerRebindFn", + key: "pvc/" + testPvcNamespace + "/" + testPvcName, + pvcNamespace: testPvcNamespace, + pvcName: testPvcName, + initialObjects: []runtime.Object{ + pvc(testPvcName, testPvcNamespace, "", testStorageClassName, "", testPvcUid, []string{pvFinalizer, vpFinalizer}, + dsf(testApiGroup, testDatasourceKind, testDataSourceName, testPvcNamespace), "", v1.ReadWriteOnce), + ust(), + sc(testStorageClassName, storagev1.VolumeBindingImmediate), + pvc(testPvcPrimeName, testVpWorkingNamespace, "", testStorageClassName, testPvName, testPvcUid, []string{pvFinalizer}, nil, "", v1.ReadWriteOnce), + pv(testPvcPrimeName, testVpWorkingNamespace, testPvcUid), + }, + populateFn: PopulateOperationStartSuccess, + populateCompleteFn: populateCompleteSuccess, + rebindCompleteFn: rebindNewPvSuccess(testPvName2), + expectedResult: nil, + expectedKeys: []string{pvcPrimeKey, pvKey}, + expectedObjects: &vpObjects{ + pvc: pvc(testPvcName, testPvcNamespace, "", testStorageClassName, testPvName2, testPvcUid, []string{pvFinalizer, vpFinalizer}, + dsf(testApiGroup, testDatasourceKind, testDataSourceName, testPvcNamespace), "", v1.ReadWriteOnce), + pvcPrime: pvc(testPvcPrimeName, testVpWorkingNamespace, "", testStorageClassName, testPvName, testPvcUid, []string{pvFinalizer}, nil, "", v1.ReadWriteOnce), + pv: pv(testPvcPrimeName, testVpWorkingNamespace, testPvcUid), + }, + }, + { + name: "Wait for the bind controller failed to rebind", + key: "pvc/" + testPvcNamespace + "/" + testPvcName, + pvcNamespace: testPvcNamespace, + pvcName: testPvcName, + initialObjects: []runtime.Object{ + pvc(testPvcName, testPvcNamespace, "", testStorageClassName, "", testPvcUid, []string{pvFinalizer, vpFinalizer}, + dsf(testApiGroup, testDatasourceKind, testDataSourceName, testPvcNamespace), "", v1.ReadWriteOnce), + ust(), + sc(testStorageClassName, storagev1.VolumeBindingImmediate), + pvc(testPvcPrimeName, testVpWorkingNamespace, "", testStorageClassName, testPvName, testPvcUid, []string{pvFinalizer}, nil, "", v1.ReadWriteOnce), + pv(testPvcPrimeName, testVpWorkingNamespace, testPvcUid), + }, + populateFn: PopulateOperationStartSuccess, + populateCompleteFn: populateCompleteSuccess, + rebindCompleteFn: rebindPvCompleteError, + expectedResult: fmt.Errorf(testRebindCompleteFailed), + expectedKeys: []string{pvcPrimeKey, pvKey}, + expectedObjects: &vpObjects{ + pvc: pvc(testPvcName, testPvcNamespace, "", testStorageClassName, "", testPvcUid, []string{pvFinalizer, vpFinalizer}, + dsf(testApiGroup, testDatasourceKind, testDataSourceName, testPvcNamespace), "", v1.ReadWriteOnce), + pvcPrime: pvc(testPvcPrimeName, testVpWorkingNamespace, "", testStorageClassName, testPvName, testPvcUid, []string{pvFinalizer}, nil, "", v1.ReadWriteOnce), + pv: pv(testPvcPrimeName, testVpWorkingNamespace, testPvcUid), + }, + }, { name: "Clean up pvcPrime", key: "pvc/" + testPvcNamespace + "/" + testPvcName,