Skip to content

Commit d7e076b

Browse files
authored
Merge pull request #1521 from umagnus/refactor_volume_cloning
cleanup: refactor volume cloning
2 parents 4b8ebdc + d39b8b7 commit d7e076b

File tree

3 files changed

+252
-60
lines changed

3 files changed

+252
-60
lines changed

pkg/blob/blob.go

+4
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,10 @@ var (
163163
supportedProtocolList = []string{Fuse, Fuse2, NFS, AZNFS}
164164
retriableErrors = []string{accountNotProvisioned, tooManyRequests, statusCodeNotFound, containerBeingDeletedDataplaneAPIError, containerBeingDeletedManagementAPIError, clientThrottled}
165165
supportedFSGroupChangePolicyList = []string{FSGroupChangeNone, string(v1.FSGroupChangeAlways), string(v1.FSGroupChangeOnRootMismatch)}
166+
167+
// azcopyCloneVolumeOptions used in volume cloning between different storage account and --check-length to false because volume data may be in changing state, copy volume is not same as current source volume,
168+
// set --s2s-preserve-access-tier=false to avoid BlobAccessTierNotSupportedForAccountType error in azcopy
169+
azcopyCloneVolumeOptions = []string{"--recursive", "--check-length=false", "--s2s-preserve-access-tier=false"}
166170
)
167171

168172
// DriverOptions defines driver parameters specified in driver deployment

pkg/blob/controllerserver.go

+59-44
Original file line numberDiff line numberDiff line change
@@ -366,36 +366,12 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
366366
defer d.volumeLocks.Release(volName)
367367

368368
requestName := "controller_create_volume"
369-
370-
var srcAzcopyAuthEnv []string
371-
var srcSubscriptionID, srcResourceGroupName, srcAccountName, srcContainerName, srcPath, srcAccountSASToken string
372369
if volContentSource != nil {
373370
switch volContentSource.Type.(type) {
374371
case *csi.VolumeContentSource_Snapshot:
375372
return nil, status.Errorf(codes.InvalidArgument, "VolumeContentSource Snapshot is not yet implemented")
376373
case *csi.VolumeContentSource_Volume:
377374
requestName = "controller_create_volume_from_volume"
378-
var srcVolumeID string
379-
if volContentSource.GetVolume() != nil {
380-
srcVolumeID = volContentSource.GetVolume().GetVolumeId()
381-
}
382-
srcResourceGroupName, srcAccountName, srcContainerName, _, srcSubscriptionID, err = GetContainerInfo(srcVolumeID)
383-
if err != nil {
384-
return nil, status.Error(codes.NotFound, err.Error())
385-
}
386-
srcAccountOptions := &azure.AccountOptions{
387-
Name: srcAccountName,
388-
SubscriptionID: srcSubscriptionID,
389-
ResourceGroup: srcResourceGroupName,
390-
GetLatestAccountKey: getLatestAccountKey,
391-
}
392-
srcAccountSASToken, srcAzcopyAuthEnv, err = d.getAzcopyAuth(ctx, srcAccountName, "", storageEndpointSuffix, srcAccountOptions, secrets, secretName, secretNamespace)
393-
if err != nil {
394-
return nil, status.Errorf(codes.Internal, "failed to getAzcopyAuth on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
395-
}
396-
srcPath = fmt.Sprintf("https://%s.blob.%s/%s", srcAccountName, storageEndpointSuffix, srcContainerName)
397-
default:
398-
return nil, status.Errorf(codes.InvalidArgument, "VolumeContentSource is not recognized: %v", volContentSource)
399375
}
400376
}
401377

@@ -466,16 +442,11 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
466442
return nil, status.Errorf(codes.Internal, "failed to create container(%s) on account(%s) type(%s) rg(%s) location(%s) size(%d), error: %v", validContainerName, accountName, storageAccountType, resourceGroup, location, requestGiB, err)
467443
}
468444
if volContentSource != nil {
469-
dstAzcopyAuthEnv := srcAzcopyAuthEnv
470-
dstAccountSASToken := srcAccountSASToken
471-
if srcAccountName != accountName {
472-
if dstAccountSASToken, dstAzcopyAuthEnv, err = d.getAzcopyAuth(ctx, accountName, accountKey, storageEndpointSuffix, accountOptions, secrets, secretName, secretNamespace); err != nil {
473-
return nil, status.Errorf(codes.Internal, "failed to getAzcopyAuth on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
474-
}
445+
accountSASToken, authAzcopyEnv, err := d.getAzcopyAuth(ctx, accountName, accountKey, storageEndpointSuffix, accountOptions, secrets, secretName, secretNamespace)
446+
if err != nil {
447+
return nil, status.Errorf(codes.Internal, "failed to getAzcopyAuth on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
475448
}
476-
477-
dstPath := fmt.Sprintf("https://%s.blob.%s/%s", accountName, storageEndpointSuffix, validContainerName)
478-
if err := d.copyBlobContainer(dstAzcopyAuthEnv, srcPath, srcAccountSASToken, dstPath, dstAccountSASToken, validContainerName); err != nil {
449+
if err := d.copyVolume(ctx, req, accountName, accountSASToken, authAzcopyEnv, validContainerName, secretNamespace, accountOptions, storageEndpointSuffix); err != nil {
479450
return nil, err
480451
}
481452
}
@@ -786,11 +757,36 @@ func (d *Driver) DeleteBlobContainer(ctx context.Context, subsID, resourceGroupN
786757
}
787758

788759
// copyBlobContainer copies source volume content into a destination volume
789-
func (d *Driver) copyBlobContainer(authAzcopyEnv []string, srcPath string, srcAccountSASToken string, dstPath string, dstAccountSASToken string, dstContainerName string) error {
760+
func (d *Driver) copyBlobContainer(ctx context.Context, req *csi.CreateVolumeRequest, dstAccountName string, dstAccountSasToken string, authAzcopyEnv []string, dstContainerName string, secretNamespace string, accountOptions *azure.AccountOptions, storageEndpointSuffix string) error {
761+
var sourceVolumeID string
762+
if req.GetVolumeContentSource() != nil && req.GetVolumeContentSource().GetVolume() != nil {
763+
sourceVolumeID = req.GetVolumeContentSource().GetVolume().GetVolumeId()
790764

791-
if srcPath == "" || dstPath == "" || dstContainerName == "" {
792-
return fmt.Errorf("srcPath(%s) or dstPath(%s) or dstContainerName(%s) is empty", srcPath, dstPath, dstContainerName)
793765
}
766+
srcResourceGroupName, srcAccountName, srcContainerName, _, srcSubscriptionID, err := GetContainerInfo(sourceVolumeID) //nolint:dogsled
767+
if err != nil {
768+
return status.Error(codes.NotFound, err.Error())
769+
}
770+
if dstAccountName == "" {
771+
dstAccountName = srcAccountName
772+
}
773+
if srcAccountName == "" || srcContainerName == "" || dstContainerName == "" {
774+
return fmt.Errorf("srcAccountName(%s) or srcContainerName(%s) or dstContainerName(%s) is empty", srcAccountName, srcContainerName, dstContainerName)
775+
}
776+
srcAccountSasToken := dstAccountSasToken
777+
if srcAccountName != dstAccountName && dstAccountSasToken != "" {
778+
srcAccountOptions := &azure.AccountOptions{
779+
Name: srcAccountName,
780+
ResourceGroup: srcResourceGroupName,
781+
SubscriptionID: srcSubscriptionID,
782+
GetLatestAccountKey: accountOptions.GetLatestAccountKey,
783+
}
784+
if srcAccountSasToken, _, err = d.getAzcopyAuth(ctx, srcAccountName, "", storageEndpointSuffix, srcAccountOptions, nil, "", secretNamespace); err != nil {
785+
return err
786+
}
787+
}
788+
srcPath := fmt.Sprintf("https://%s.blob.%s/%s%s", srcAccountName, storageEndpointSuffix, srcContainerName, srcAccountSasToken)
789+
dstPath := fmt.Sprintf("https://%s.blob.%s/%s%s", dstAccountName, storageEndpointSuffix, dstContainerName, dstAccountSasToken)
794790

795791
jobState, percent, err := d.azcopy.GetAzcopyJob(dstContainerName, authAzcopyEnv)
796792
klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
@@ -800,13 +796,9 @@ func (d *Driver) copyBlobContainer(authAzcopyEnv []string, srcPath string, srcAc
800796
case util.AzcopyJobRunning:
801797
return fmt.Errorf("wait for the existing AzCopy job to complete, current copy percentage is %s%%", percent)
802798
case util.AzcopyJobNotFound:
803-
klog.V(2).Infof("copy blob container %s to %s", srcPath, dstContainerName)
799+
klog.V(2).Infof("copy blob container %s:%s to %s:%s", srcAccountName, srcContainerName, dstAccountName, dstContainerName)
804800
execFunc := func() error {
805-
cmd := exec.Command("azcopy", "copy", srcPath+srcAccountSASToken, dstPath+dstAccountSASToken, "--recursive", "--check-length=false", "--s2s-preserve-access-tier=false")
806-
if len(authAzcopyEnv) > 0 {
807-
cmd.Env = append(os.Environ(), authAzcopyEnv...)
808-
}
809-
if out, err := cmd.CombinedOutput(); err != nil {
801+
if out, err := d.execAzcopyCopy(srcPath, dstPath, azcopyCloneVolumeOptions, authAzcopyEnv); err != nil {
810802
return fmt.Errorf("exec error: %v, output: %v", err, string(out))
811803
}
812804
return nil
@@ -817,15 +809,38 @@ func (d *Driver) copyBlobContainer(authAzcopyEnv []string, srcPath string, srcAc
817809
}
818810
copyErr := util.WaitUntilTimeout(time.Duration(d.waitForAzCopyTimeoutMinutes)*time.Minute, execFunc, timeoutFunc)
819811
if copyErr != nil {
820-
klog.Warningf("CopyBlobContainer(%s, %s, %s) failed with error: %v", srcPath, dstPath, dstContainerName, copyErr)
812+
klog.Warningf("CopyBlobContainer(%s, %s, %s) failed with error: %v", accountOptions.ResourceGroup, dstAccountName, dstContainerName, copyErr)
821813
} else {
822-
klog.V(2).Infof("copied blob container %s to %s successfully", srcPath, dstContainerName)
814+
klog.V(2).Infof("copied blob container %s to %s successfully", srcContainerName, dstContainerName)
823815
}
824816
return copyErr
825817
}
826818
return err
827819
}
828820

821+
// copyVolume copies a volume form volume or snapshot, snapshot is not supported now
822+
func (d *Driver) copyVolume(ctx context.Context, req *csi.CreateVolumeRequest, accountName string, accountSASToken string, authAzcopyEnv []string, dstContainerName, secretNamespace string, accountOptions *azure.AccountOptions, storageEndpointSuffix string) error {
823+
vs := req.VolumeContentSource
824+
switch vs.Type.(type) {
825+
case *csi.VolumeContentSource_Snapshot:
826+
return status.Errorf(codes.InvalidArgument, "VolumeContentSource Snapshot is not yet implemented")
827+
case *csi.VolumeContentSource_Volume:
828+
return d.copyBlobContainer(ctx, req, accountName, accountSASToken, authAzcopyEnv, dstContainerName, secretNamespace, accountOptions, storageEndpointSuffix)
829+
default:
830+
return status.Errorf(codes.InvalidArgument, "%v is not a proper volume source", vs)
831+
}
832+
}
833+
834+
// execAzcopyCopy exec azcopy copy command
835+
func (d *Driver) execAzcopyCopy(srcPath, dstPath string, azcopyCopyOptions, authAzcopyEnv []string) ([]byte, error) {
836+
cmd := exec.Command("azcopy", "copy", srcPath, dstPath)
837+
cmd.Args = append(cmd.Args, azcopyCopyOptions...)
838+
if len(authAzcopyEnv) > 0 {
839+
cmd.Env = append(os.Environ(), authAzcopyEnv...)
840+
}
841+
return cmd.CombinedOutput()
842+
}
843+
829844
// authorizeAzcopyWithIdentity returns auth env for azcopy using cluster identity
830845
func (d *Driver) authorizeAzcopyWithIdentity() ([]string, error) {
831846
azureAuthConfig := d.cloud.Config.AzureAuthConfig

0 commit comments

Comments
 (0)