diff --git a/pkg/blob/blob.go b/pkg/blob/blob.go index 0ae551961..7d148b138 100644 --- a/pkg/blob/blob.go +++ b/pkg/blob/blob.go @@ -162,6 +162,10 @@ var ( supportedProtocolList = []string{Fuse, Fuse2, NFS, AZNFS} retriableErrors = []string{accountNotProvisioned, tooManyRequests, statusCodeNotFound, containerBeingDeletedDataplaneAPIError, containerBeingDeletedManagementAPIError, clientThrottled} supportedFSGroupChangePolicyList = []string{FSGroupChangeNone, string(v1.FSGroupChangeAlways), string(v1.FSGroupChangeOnRootMismatch)} + + // azcopyCloneVolumeOptions used in volume cloning between different storage account and --check-length to false because volume data may be in changing state, copy volume is not same as current source volume, + // set --s2s-preserve-access-tier=false to avoid BlobAccessTierNotSupportedForAccountType error in azcopy + azcopyCloneVolumeOptions = []string{"--recursive", "--check-length=false", "--s2s-preserve-access-tier=false"} ) // DriverOptions defines driver parameters specified in driver deployment diff --git a/pkg/blob/controllerserver.go b/pkg/blob/controllerserver.go index 1ee2df5a1..0a34b1adf 100644 --- a/pkg/blob/controllerserver.go +++ b/pkg/blob/controllerserver.go @@ -366,36 +366,12 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) defer d.volumeLocks.Release(volName) requestName := "controller_create_volume" - - var srcAzcopyAuthEnv []string - var srcSubscriptionID, srcResourceGroupName, srcAccountName, srcContainerName, srcPath, srcAccountSASToken string if volContentSource != nil { switch volContentSource.Type.(type) { case *csi.VolumeContentSource_Snapshot: return nil, status.Errorf(codes.InvalidArgument, "VolumeContentSource Snapshot is not yet implemented") case *csi.VolumeContentSource_Volume: requestName = "controller_create_volume_from_volume" - var srcVolumeID string - if volContentSource.GetVolume() != nil { - srcVolumeID = volContentSource.GetVolume().GetVolumeId() - } - srcResourceGroupName, srcAccountName, srcContainerName, _, srcSubscriptionID, err = GetContainerInfo(srcVolumeID) - if err != nil { - return nil, status.Error(codes.NotFound, err.Error()) - } - srcAccountOptions := &azure.AccountOptions{ - Name: srcAccountName, - SubscriptionID: srcSubscriptionID, - ResourceGroup: srcResourceGroupName, - GetLatestAccountKey: getLatestAccountKey, - } - srcAccountSASToken, srcAzcopyAuthEnv, err = d.getAzcopyAuth(ctx, srcAccountName, "", storageEndpointSuffix, srcAccountOptions, secrets, secretName, secretNamespace) - if err != nil { - return nil, status.Errorf(codes.Internal, "failed to getAzcopyAuth on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err) - } - srcPath = fmt.Sprintf("https://%s.blob.%s/%s", srcAccountName, storageEndpointSuffix, srcContainerName) - default: - return nil, status.Errorf(codes.InvalidArgument, "VolumeContentSource is not recognized: %v", volContentSource) } } @@ -466,16 +442,11 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) return nil, status.Errorf(codes.Internal, "failed to create container(%s) on account(%s) type(%s) rg(%s) location(%s) size(%d), error: %v", validContainerName, accountName, storageAccountType, resourceGroup, location, requestGiB, err) } if volContentSource != nil { - dstAzcopyAuthEnv := srcAzcopyAuthEnv - dstAccountSASToken := srcAccountSASToken - if srcAccountName != accountName { - if dstAccountSASToken, dstAzcopyAuthEnv, err = d.getAzcopyAuth(ctx, accountName, accountKey, storageEndpointSuffix, accountOptions, secrets, secretName, secretNamespace); err != nil { - return nil, status.Errorf(codes.Internal, "failed to getAzcopyAuth on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err) - } + accountSASToken, authAzcopyEnv, err := d.getAzcopyAuth(ctx, accountName, accountKey, storageEndpointSuffix, accountOptions, secrets, secretName, secretNamespace) + if err != nil { + return nil, status.Errorf(codes.Internal, "failed to getAzcopyAuth on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err) } - - dstPath := fmt.Sprintf("https://%s.blob.%s/%s", accountName, storageEndpointSuffix, validContainerName) - if err := d.copyBlobContainer(dstAzcopyAuthEnv, srcPath, srcAccountSASToken, dstPath, dstAccountSASToken, validContainerName); err != nil { + if err := d.copyVolume(ctx, req, accountName, accountSASToken, authAzcopyEnv, validContainerName, secretNamespace, accountOptions, storageEndpointSuffix); err != nil { return nil, err } } @@ -786,11 +757,36 @@ func (d *Driver) DeleteBlobContainer(ctx context.Context, subsID, resourceGroupN } // copyBlobContainer copies source volume content into a destination volume -func (d *Driver) copyBlobContainer(authAzcopyEnv []string, srcPath string, srcAccountSASToken string, dstPath string, dstAccountSASToken string, dstContainerName string) error { +func (d *Driver) copyBlobContainer(ctx context.Context, req *csi.CreateVolumeRequest, dstAccountName string, dstAccountSasToken string, authAzcopyEnv []string, dstContainerName string, secretNamespace string, accountOptions *azure.AccountOptions, storageEndpointSuffix string) error { + var sourceVolumeID string + if req.GetVolumeContentSource() != nil && req.GetVolumeContentSource().GetVolume() != nil { + sourceVolumeID = req.GetVolumeContentSource().GetVolume().GetVolumeId() - if srcPath == "" || dstPath == "" || dstContainerName == "" { - return fmt.Errorf("srcPath(%s) or dstPath(%s) or dstContainerName(%s) is empty", srcPath, dstPath, dstContainerName) } + srcResourceGroupName, srcAccountName, srcContainerName, _, srcSubscriptionID, err := GetContainerInfo(sourceVolumeID) //nolint:dogsled + if err != nil { + return status.Error(codes.NotFound, err.Error()) + } + if dstAccountName == "" { + dstAccountName = srcAccountName + } + if srcAccountName == "" || srcContainerName == "" || dstContainerName == "" { + return fmt.Errorf("srcAccountName(%s) or srcContainerName(%s) or dstContainerName(%s) is empty", srcAccountName, srcContainerName, dstContainerName) + } + srcAccountSasToken := dstAccountSasToken + if srcAccountName != dstAccountName && dstAccountSasToken != "" { + srcAccountOptions := &azure.AccountOptions{ + Name: srcAccountName, + ResourceGroup: srcResourceGroupName, + SubscriptionID: srcSubscriptionID, + GetLatestAccountKey: accountOptions.GetLatestAccountKey, + } + if srcAccountSasToken, _, err = d.getAzcopyAuth(ctx, srcAccountName, "", storageEndpointSuffix, srcAccountOptions, nil, "", secretNamespace); err != nil { + return err + } + } + srcPath := fmt.Sprintf("https://%s.blob.%s/%s%s", srcAccountName, storageEndpointSuffix, srcContainerName, srcAccountSasToken) + dstPath := fmt.Sprintf("https://%s.blob.%s/%s%s", dstAccountName, storageEndpointSuffix, dstContainerName, dstAccountSasToken) jobState, percent, err := d.azcopy.GetAzcopyJob(dstContainerName, authAzcopyEnv) klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err) @@ -800,13 +796,9 @@ func (d *Driver) copyBlobContainer(authAzcopyEnv []string, srcPath string, srcAc case util.AzcopyJobRunning: return fmt.Errorf("wait for the existing AzCopy job to complete, current copy percentage is %s%%", percent) case util.AzcopyJobNotFound: - klog.V(2).Infof("copy blob container %s to %s", srcPath, dstContainerName) + klog.V(2).Infof("copy blob container %s:%s to %s:%s", srcAccountName, srcContainerName, dstAccountName, dstContainerName) execFunc := func() error { - cmd := exec.Command("azcopy", "copy", srcPath+srcAccountSASToken, dstPath+dstAccountSASToken, "--recursive", "--check-length=false", "--s2s-preserve-access-tier=false") - if len(authAzcopyEnv) > 0 { - cmd.Env = append(os.Environ(), authAzcopyEnv...) - } - if out, err := cmd.CombinedOutput(); err != nil { + if out, err := d.execAzcopyCopy(srcPath, dstPath, azcopyCloneVolumeOptions, authAzcopyEnv); err != nil { return fmt.Errorf("exec error: %v, output: %v", err, string(out)) } return nil @@ -817,15 +809,38 @@ func (d *Driver) copyBlobContainer(authAzcopyEnv []string, srcPath string, srcAc } copyErr := util.WaitUntilTimeout(time.Duration(d.waitForAzCopyTimeoutMinutes)*time.Minute, execFunc, timeoutFunc) if copyErr != nil { - klog.Warningf("CopyBlobContainer(%s, %s, %s) failed with error: %v", srcPath, dstPath, dstContainerName, copyErr) + klog.Warningf("CopyBlobContainer(%s, %s, %s) failed with error: %v", accountOptions.ResourceGroup, dstAccountName, dstContainerName, copyErr) } else { - klog.V(2).Infof("copied blob container %s to %s successfully", srcPath, dstContainerName) + klog.V(2).Infof("copied blob container %s to %s successfully", srcContainerName, dstContainerName) } return copyErr } return err } +// copyVolume copies a volume form volume or snapshot, snapshot is not supported now +func (d *Driver) copyVolume(ctx context.Context, req *csi.CreateVolumeRequest, accountName string, accountSASToken string, authAzcopyEnv []string, dstContainerName, secretNamespace string, accountOptions *azure.AccountOptions, storageEndpointSuffix string) error { + vs := req.VolumeContentSource + switch vs.Type.(type) { + case *csi.VolumeContentSource_Snapshot: + return status.Errorf(codes.InvalidArgument, "VolumeContentSource Snapshot is not yet implemented") + case *csi.VolumeContentSource_Volume: + return d.copyBlobContainer(ctx, req, accountName, accountSASToken, authAzcopyEnv, dstContainerName, secretNamespace, accountOptions, storageEndpointSuffix) + default: + return status.Errorf(codes.InvalidArgument, "%v is not a proper volume source", vs) + } +} + +// execAzcopyCopy exec azcopy copy command +func (d *Driver) execAzcopyCopy(srcPath, dstPath string, azcopyCopyOptions, authAzcopyEnv []string) ([]byte, error) { + cmd := exec.Command("azcopy", "copy", srcPath, dstPath) + cmd.Args = append(cmd.Args, azcopyCopyOptions...) + if len(authAzcopyEnv) > 0 { + cmd.Env = append(os.Environ(), authAzcopyEnv...) + } + return cmd.CombinedOutput() +} + // authorizeAzcopyWithIdentity returns auth env for azcopy using cluster identity func (d *Driver) authorizeAzcopyWithIdentity() ([]string, error) { azureAuthConfig := d.cloud.Config.AzureAuthConfig diff --git a/pkg/blob/controllerserver_test.go b/pkg/blob/controllerserver_test.go index 3e52282a1..057b2a9f7 100644 --- a/pkg/blob/controllerserver_test.go +++ b/pkg/blob/controllerserver_test.go @@ -820,8 +820,6 @@ func TestCreateVolume(t *testing.T) { d.cloud.StorageAccountClient = NewMockSAClient(context.Background(), gomock.NewController(t), "subID", "unit-test", "unit-test", &keyList) d.cloud.Config.AzureAuthConfig.UseManagedIdentityExtension = true - d.clientFactory = mock_azclient.NewMockClientFactory(gomock.NewController(t)) - mp := make(map[string]string) mp[protocolField] = "fuse" mp[skuNameField] = "unit-test" @@ -855,6 +853,15 @@ func TestCreateVolume(t *testing.T) { defer ctrl.Finish() m := util.NewMockEXEC(ctrl) + + clientFactoryMock := mock_azclient.NewMockClientFactory(ctrl) + blobClientMock := mock_blobcontainerclient.NewMockInterface(ctrl) + blobClientMock.EXPECT().CreateContainer(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil) + clientFactoryMock.EXPECT().GetBlobContainerClientForSub(gomock.Any()).Return(blobClientMock, nil) + d.clientFactory = clientFactoryMock + + listStr := "no error" + m.EXPECT().RunCommand(gomock.Any(), gomock.Any()).Return(listStr, nil) d.azcopy.ExecCmd = m expectedErr := status.Errorf(codes.NotFound, "error parsing volume id: \"unit-test\", should at least contain two #") @@ -1525,38 +1532,167 @@ func TestDeleteBlobContainer(t *testing.T) { } func TestCopyVolume(t *testing.T) { + stdVolumeCapability := &csi.VolumeCapability{ + AccessType: &csi.VolumeCapability_Mount{ + Mount: &csi.VolumeCapability_MountVolume{}, + }, + } + stdVolumeCapabilities := []*csi.VolumeCapability{ + stdVolumeCapability, + } testCases := []struct { name string testFunc func(t *testing.T) }{ { - name: "src path is empty", + name: "copy volume from volumeSnapshot is not supported", + testFunc: func(t *testing.T) { + ctx := context.Background() + d := NewFakeDriver() + mp := map[string]string{} + + volumeSnapshotSource := &csi.VolumeContentSource_SnapshotSource{ + SnapshotId: "unit-test", + } + volumeContentSourceSnapshotSource := &csi.VolumeContentSource_Snapshot{ + Snapshot: volumeSnapshotSource, + } + volumecontensource := csi.VolumeContentSource{ + Type: volumeContentSourceSnapshotSource, + } + req := &csi.CreateVolumeRequest{ + Name: "unit-test", + VolumeCapabilities: stdVolumeCapabilities, + Parameters: mp, + VolumeContentSource: &volumecontensource, + } + + expectedErr := status.Errorf(codes.InvalidArgument, "VolumeContentSource Snapshot is not yet implemented") + err := d.copyVolume(ctx, req, "", "", nil, "", "", nil, "core.windows.net") + if !reflect.DeepEqual(err, expectedErr) { + t.Errorf("Unexpected error: %v", err) + } + }, + }, + { + name: "copy volume from volume not found", + testFunc: func(t *testing.T) { + ctx := context.Background() + d := NewFakeDriver() + mp := map[string]string{} + + volumeSource := &csi.VolumeContentSource_VolumeSource{ + VolumeId: "unit-test", + } + volumeContentSourceVolumeSource := &csi.VolumeContentSource_Volume{ + Volume: volumeSource, + } + volumecontensource := csi.VolumeContentSource{ + Type: volumeContentSourceVolumeSource, + } + + req := &csi.CreateVolumeRequest{ + Name: "unit-test", + VolumeCapabilities: stdVolumeCapabilities, + Parameters: mp, + VolumeContentSource: &volumecontensource, + } + + expectedErr := status.Errorf(codes.NotFound, "error parsing volume id: \"unit-test\", should at least contain two #") + err := d.copyVolume(ctx, req, "", "", nil, "dstContainer", "", nil, "core.windows.net") + if !reflect.DeepEqual(err, expectedErr) { + t.Errorf("Unexpected error: %v", err) + } + }, + }, + { + name: "src blob container is empty", testFunc: func(t *testing.T) { + ctx := context.Background() d := NewFakeDriver() - expectedErr := fmt.Errorf("srcPath() or dstPath(dstPath) or dstContainerName(dstContainer) is empty") - err := d.copyBlobContainer([]string{}, "", "", "dstPath", "", "dstContainer") + mp := map[string]string{} + + volumeSource := &csi.VolumeContentSource_VolumeSource{ + VolumeId: "rg#unit-test##", + } + volumeContentSourceVolumeSource := &csi.VolumeContentSource_Volume{ + Volume: volumeSource, + } + volumecontensource := csi.VolumeContentSource{ + Type: volumeContentSourceVolumeSource, + } + + req := &csi.CreateVolumeRequest{ + Name: "unit-test", + VolumeCapabilities: stdVolumeCapabilities, + Parameters: mp, + VolumeContentSource: &volumecontensource, + } + + expectedErr := fmt.Errorf("srcAccountName(unit-test) or srcContainerName() or dstContainerName(dstContainer) is empty") + err := d.copyVolume(ctx, req, "", "", nil, "dstContainer", "", nil, "core.windows.net") if !reflect.DeepEqual(err, expectedErr) { t.Errorf("Unexpected error: %v", err) } }, }, { - name: "dst path is empty", + name: "dst blob container is empty", testFunc: func(t *testing.T) { + ctx := context.Background() d := NewFakeDriver() - expectedErr := fmt.Errorf("srcPath(srcPath) or dstPath() or dstContainerName(dstContainer) is empty") - err := d.copyBlobContainer([]string{}, "srcPath", "", "", "", "dstContainer") + mp := map[string]string{} + + volumeSource := &csi.VolumeContentSource_VolumeSource{ + VolumeId: "vol_1#f5713de20cde511e8ba4900#fileshare#", + } + volumeContentSourceVolumeSource := &csi.VolumeContentSource_Volume{ + Volume: volumeSource, + } + volumecontensource := csi.VolumeContentSource{ + Type: volumeContentSourceVolumeSource, + } + + req := &csi.CreateVolumeRequest{ + Name: "unit-test", + VolumeCapabilities: stdVolumeCapabilities, + Parameters: mp, + VolumeContentSource: &volumecontensource, + } + + expectedErr := fmt.Errorf("srcAccountName(f5713de20cde511e8ba4900) or srcContainerName(fileshare) or dstContainerName() is empty") + err := d.copyVolume(ctx, req, "", "", nil, "", "", nil, "core.windows.net") if !reflect.DeepEqual(err, expectedErr) { t.Errorf("Unexpected error: %v", err) } }, }, { - name: "dst container is empty", + name: "dstAccount Name is different and storageAccountClient is nil", testFunc: func(t *testing.T) { + ctx := context.Background() d := NewFakeDriver() - expectedErr := fmt.Errorf("srcPath(srcPath) or dstPath(dstPath) or dstContainerName() is empty") - err := d.copyBlobContainer([]string{}, "srcPath", "", "dstPath", "", "") + mp := map[string]string{} + + volumeSource := &csi.VolumeContentSource_VolumeSource{ + VolumeId: "vol_1#f5713de20cde511e8ba4900#fileshare#", + } + volumeContentSourceVolumeSource := &csi.VolumeContentSource_Volume{ + Volume: volumeSource, + } + volumecontensource := csi.VolumeContentSource{ + Type: volumeContentSourceVolumeSource, + } + + req := &csi.CreateVolumeRequest{ + Name: "unit-test", + VolumeCapabilities: stdVolumeCapabilities, + Parameters: mp, + VolumeContentSource: &volumecontensource, + } + + expectedErr := fmt.Errorf("StorageAccountClient is nil") + err := d.copyVolume(ctx, req, "dstAccount", "sastoken", nil, "dstContainer", "", &azure.AccountOptions{Name: "dstAccount", ResourceGroup: "rg", SubscriptionID: "subsID", GetLatestAccountKey: true}, "core.windows.net") if !reflect.DeepEqual(err, expectedErr) { t.Errorf("Unexpected error: %v", err) } @@ -1565,21 +1701,38 @@ func TestCopyVolume(t *testing.T) { { name: "azcopy job is already completed", testFunc: func(t *testing.T) { + ctx := context.Background() d := NewFakeDriver() + mp := map[string]string{} + + volumeSource := &csi.VolumeContentSource_VolumeSource{ + VolumeId: "vol_1#f5713de20cde511e8ba4900#fileshare#", + } + volumeContentSourceVolumeSource := &csi.VolumeContentSource_Volume{ + Volume: volumeSource, + } + volumecontensource := csi.VolumeContentSource{ + Type: volumeContentSourceVolumeSource, + } + + req := &csi.CreateVolumeRequest{ + Name: "unit-test", + VolumeCapabilities: stdVolumeCapabilities, + Parameters: mp, + VolumeContentSource: &volumecontensource, + } + ctrl := gomock.NewController(t) defer ctrl.Finish() m := util.NewMockEXEC(ctrl) listStr := "JobId: ed1c3833-eaff-fe42-71d7-513fb065a9d9\nStart Time: Monday, 07-Aug-23 03:29:54 UTC\nStatus: Completed\nCommand: copy https://{accountName}.file.core.windows.net/{srcFileshare}{SAStoken} https://{accountName}.file.core.windows.net/{dstFileshare}{SAStoken} --recursive --check-length=false" m.EXPECT().RunCommand(gomock.Eq("azcopy jobs list | grep dstContainer -B 3"), gomock.Any()).Return(listStr, nil) - // if test.enableShow { - // m.EXPECT().RunCommand(gomock.Not("azcopy jobs list | grep dstContainer -B 3")).Return(test.showStr, test.showErr) - // } d.azcopy.ExecCmd = m var expectedErr error - err := d.copyBlobContainer([]string{}, "srcPath", "", "dstPath", "", "dstContainer") + err := d.copyVolume(ctx, req, "", "sastoken", nil, "dstContainer", "", nil, "core.windows.net") if !reflect.DeepEqual(err, expectedErr) { t.Errorf("Unexpected error: %v", err) } @@ -1588,7 +1741,27 @@ func TestCopyVolume(t *testing.T) { { name: "azcopy job is in progress", testFunc: func(t *testing.T) { + ctx := context.Background() d := NewFakeDriver() + mp := map[string]string{} + + volumeSource := &csi.VolumeContentSource_VolumeSource{ + VolumeId: "vol_1#f5713de20cde511e8ba4900#fileshare#", + } + volumeContentSourceVolumeSource := &csi.VolumeContentSource_Volume{ + Volume: volumeSource, + } + volumecontensource := csi.VolumeContentSource{ + Type: volumeContentSourceVolumeSource, + } + + req := &csi.CreateVolumeRequest{ + Name: "unit-test", + VolumeCapabilities: stdVolumeCapabilities, + Parameters: mp, + VolumeContentSource: &volumecontensource, + } + ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -1600,7 +1773,7 @@ func TestCopyVolume(t *testing.T) { d.azcopy.ExecCmd = m expectedErr := fmt.Errorf("wait for the existing AzCopy job to complete, current copy percentage is 50.0%%") - err := d.copyBlobContainer([]string{}, "srcPath", "", "dstPath", "", "dstContainer") + err := d.copyVolume(ctx, req, "", "sastoken", nil, "dstContainer", "", nil, "core.windows.net") if !reflect.DeepEqual(err, expectedErr) { t.Errorf("Unexpected error: %v", err) }