Skip to content

Allow custom PVC rebinding #200

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 46 additions & 24 deletions populator-machinery/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ const (
reasonWaitForDataPopulationFinished = "PopulatorWaitForDataPopulationFinished"
reasonStorageClassCreationError = "PopulatorStorageClassCreationError"
reasonDataSourceNotFound = "PopulatorDataSourceNotFound"
reasonRebindOperationFinished = "RebindOperationFinished"
reasonRebindOperationFailed = "RebindOperationFailed"
reasonWaitForRebindFinished = "WaitForRebindFinished"
)

type empty struct{}
Expand Down Expand Up @@ -174,6 +177,10 @@ type ProviderFunctionConfig struct {
PopulateFn func(context.Context, PopulatorParams) error
// PopulateCompleteFn is the provider specific data population completeness check function, return true when data transfer gets completed
PopulateCompleteFn func(context.Context, PopulatorParams) (bool, error)
// RebindCompleteFn is the provider specific volume rebind completeness check function, return true when the user's PVC is in Bound state
// If set, the volume populator library will defer rebinding to this function, rather than rebinding the PV referenced by PVC Prime
// to the user provided PVC. It is expected that the user's PVC will be bound, and PVC Prime will have status "Lost"
RebindCompleteFn func(context.Context, PopulatorParams) (bool, error)
// PopulateCleanupFn is the provider specific data population cleanup function, cleanup resouces after data population completed
PopulateCleanupFn func(context.Context, PopulatorParams) error
}
Expand Down Expand Up @@ -892,32 +899,47 @@ func (c *controller) syncPvc(ctx context.Context, key, pvcNamespace, pvcName str
// Examine the claimref for the PV and see if it's bound to the correct PVC
claimRef := pv.Spec.ClaimRef
if claimRef.Name != pvc.Name || claimRef.Namespace != pvc.Namespace || claimRef.UID != pvc.UID {
// Make new PV with strategic patch values to perform the PV rebind
patchPv := corev1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: pv.Name,
Annotations: map[string]string{},
},
Spec: corev1.PersistentVolumeSpec{
ClaimRef: &corev1.ObjectReference{
Namespace: pvc.Namespace,
Name: pvc.Name,
UID: pvc.UID,
ResourceVersion: pvc.ResourceVersion,
if c.providerFunctionConfig != nil && c.providerFunctionConfig.RebindCompleteFn != nil {
complete, err := c.providerFunctionConfig.RebindCompleteFn(ctx, *params)
if err != nil {
// Handle rebind error appropriately, e.g., log and requeue
c.recorder.Eventf(pvc, corev1.EventTypeWarning, reasonRebindOperationFailed, "Rebind operation failed: %s", err.Error())
return err
}
if !complete {
// TODO: Revisited if there is a better way to requeue pvc than return an error
// Return error to force reque pvc. We'll get called again later when the population operation complete
return fmt.Errorf(reasonWaitForRebindFinished)
}
} else {
// Make new PV with strategic patch values to perform the PV rebind
patchPv := corev1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: pv.Name,
Annotations: map[string]string{},
},
},
}
patchPv.Annotations[c.populatedFromAnno] = pvc.Namespace + "/" + dataSourceRef.Name
var patchData []byte
patchData, err = json.Marshal(patchPv)
if err != nil {
return err
}
_, err = c.kubeClient.CoreV1().PersistentVolumes().Patch(ctx, pv.Name, types.StrategicMergePatchType,
patchData, metav1.PatchOptions{})
if err != nil {
return err
Spec: corev1.PersistentVolumeSpec{
ClaimRef: &corev1.ObjectReference{
Namespace: pvc.Namespace,
Name: pvc.Name,
UID: pvc.UID,
ResourceVersion: pvc.ResourceVersion,
},
},
}
patchPv.Annotations[c.populatedFromAnno] = pvc.Namespace + "/" + dataSourceRef.Name
var patchData []byte
patchData, err = json.Marshal(patchPv)
if err != nil {
return err
}
_, err = c.kubeClient.CoreV1().PersistentVolumes().Patch(ctx, pv.Name, types.StrategicMergePatchType,
patchData, metav1.PatchOptions{})
if err != nil {
return err
}
}
c.recorder.Eventf(pvc, corev1.EventTypeNormal, reasonRebindOperationFinished, "Rebind operation finished")

// Don't start cleaning up yet -- we need to bind controller to acknowledge
// the switch
Expand Down
77 changes: 77 additions & 0 deletions populator-machinery/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ type testCase struct {
populateFn func(context.Context, PopulatorParams) error
// Provider specific data population completeness check function, return true when data transfer gets completed.
populateCompleteFn func(context.Context, PopulatorParams) (bool, error)
// Provider specific data population completeness check function, return true when data transfer gets completed.
rebindCompleteFn func(context.Context, PopulatorParams) (bool, error)
// The original PVC gets deleted or not
pvcDeleted bool
// PvcPrimeMutator is the mutator function for pvcPrime
Expand Down Expand Up @@ -100,11 +102,13 @@ const (
testStorageClassName = "test-sc"
testPvcPrimeName = populatorPvcPrefix + "-" + testPvcUid
testPvName = "test-pv"
testPvName2 = "test-pv-2"
testNodeName = "test-node-name"
testPodName = populatorPodPrefix + "-" + testPvcUid
testProvisioner = "test.provisioner"
testPopulationOperationStartFailed = "Test populate operation start failed"
testPopulateCompleteFailed = "Test populate operation complete failed"
testRebindCompleteFailed = "Test rebind operation failed"
testMutatePVCPrimeFailed = "Test mutate pvcPrime failed"
dataSourceKey = "unstructured/" + testPvcNamespace + "/" + testDataSourceName
storageClassKey = "sc/" + testStorageClassName
Expand Down Expand Up @@ -252,6 +256,26 @@ func populateCompleteSuccess(ctx context.Context, p PopulatorParams) (bool, erro
return true, nil
}

func rebindPvCompleteError(ctx context.Context, p PopulatorParams) (bool, error) {
return false, fmt.Errorf(testRebindCompleteFailed)
}

func rebindNewPvSuccess(pvName string) func(ctx context.Context, p PopulatorParams) (bool, error) {
return func(ctx context.Context, p PopulatorParams) (bool, error) {
updatePvc, err := p.KubeClient.CoreV1().PersistentVolumeClaims(p.Pvc.Namespace).Get(ctx, p.Pvc.Name, metav1.GetOptions{})
if err != nil {
return false, nil
}
updatePvc.Spec.VolumeName = pvName
_, err = p.KubeClient.CoreV1().PersistentVolumeClaims(p.Pvc.Namespace).Update(ctx, updatePvc, metav1.UpdateOptions{})
if err != nil {
return false, err
}

return true, nil
}
}

func pvcPrimeMutateAccessModeRWX(mp PvcPrimeMutatorParams) (*v1.PersistentVolumeClaim, error) {
accessMode := v1.ReadWriteMany
mp.PvcPrime.Spec.AccessModes[0] = accessMode
Expand Down Expand Up @@ -317,6 +341,9 @@ func initTest(test testCase) (
PopulateCompleteFn: test.populateCompleteFn,
}
}
if test.populateCompleteFn != nil {
providerFunctionConfig.RebindCompleteFn = test.rebindCompleteFn
}

var mutatorConfig *MutatorConfig
if test.pvcPrimeMutator != nil {
Expand Down Expand Up @@ -1227,6 +1254,56 @@ func TestSyncPvcWithProviderImplementation(t *testing.T) {
pv: pv(testPvcName, testPvcNamespace, testPvcUid),
},
},
{
name: "Wait for the bind controller to rebind the PV with providerRebindFn",
key: "pvc/" + testPvcNamespace + "/" + testPvcName,
pvcNamespace: testPvcNamespace,
pvcName: testPvcName,
initialObjects: []runtime.Object{
pvc(testPvcName, testPvcNamespace, "", testStorageClassName, "", testPvcUid, []string{pvFinalizer, vpFinalizer},
dsf(testApiGroup, testDatasourceKind, testDataSourceName, testPvcNamespace), "", v1.ReadWriteOnce),
ust(),
sc(testStorageClassName, storagev1.VolumeBindingImmediate),
pvc(testPvcPrimeName, testVpWorkingNamespace, "", testStorageClassName, testPvName, testPvcUid, []string{pvFinalizer}, nil, "", v1.ReadWriteOnce),
pv(testPvcPrimeName, testVpWorkingNamespace, testPvcUid),
},
populateFn: PopulateOperationStartSuccess,
populateCompleteFn: populateCompleteSuccess,
rebindCompleteFn: rebindNewPvSuccess(testPvName2),
expectedResult: nil,
expectedKeys: []string{pvcPrimeKey, pvKey},
expectedObjects: &vpObjects{
pvc: pvc(testPvcName, testPvcNamespace, "", testStorageClassName, testPvName2, testPvcUid, []string{pvFinalizer, vpFinalizer},
dsf(testApiGroup, testDatasourceKind, testDataSourceName, testPvcNamespace), "", v1.ReadWriteOnce),
pvcPrime: pvc(testPvcPrimeName, testVpWorkingNamespace, "", testStorageClassName, testPvName, testPvcUid, []string{pvFinalizer}, nil, "", v1.ReadWriteOnce),
pv: pv(testPvcPrimeName, testVpWorkingNamespace, testPvcUid),
},
},
{
name: "Wait for the bind controller failed to rebind",
key: "pvc/" + testPvcNamespace + "/" + testPvcName,
pvcNamespace: testPvcNamespace,
pvcName: testPvcName,
initialObjects: []runtime.Object{
pvc(testPvcName, testPvcNamespace, "", testStorageClassName, "", testPvcUid, []string{pvFinalizer, vpFinalizer},
dsf(testApiGroup, testDatasourceKind, testDataSourceName, testPvcNamespace), "", v1.ReadWriteOnce),
ust(),
sc(testStorageClassName, storagev1.VolumeBindingImmediate),
pvc(testPvcPrimeName, testVpWorkingNamespace, "", testStorageClassName, testPvName, testPvcUid, []string{pvFinalizer}, nil, "", v1.ReadWriteOnce),
pv(testPvcPrimeName, testVpWorkingNamespace, testPvcUid),
},
populateFn: PopulateOperationStartSuccess,
populateCompleteFn: populateCompleteSuccess,
rebindCompleteFn: rebindPvCompleteError,
expectedResult: fmt.Errorf(testRebindCompleteFailed),
expectedKeys: []string{pvcPrimeKey, pvKey},
expectedObjects: &vpObjects{
pvc: pvc(testPvcName, testPvcNamespace, "", testStorageClassName, "", testPvcUid, []string{pvFinalizer, vpFinalizer},
dsf(testApiGroup, testDatasourceKind, testDataSourceName, testPvcNamespace), "", v1.ReadWriteOnce),
pvcPrime: pvc(testPvcPrimeName, testVpWorkingNamespace, "", testStorageClassName, testPvName, testPvcUid, []string{pvFinalizer}, nil, "", v1.ReadWriteOnce),
pv: pv(testPvcPrimeName, testVpWorkingNamespace, testPvcUid),
},
},
{
name: "Clean up pvcPrime",
key: "pvc/" + testPvcNamespace + "/" + testPvcName,
Expand Down