diff --git a/pkg/blob/blob.go b/pkg/blob/blob.go index 72891f374..2b38462c4 100644 --- a/pkg/blob/blob.go +++ b/pkg/blob/blob.go @@ -277,7 +277,7 @@ func NewDriver(options *DriverOptions, kubeClient kubernetes.Interface, cloud *p sasTokenExpirationMinutes: options.SasTokenExpirationMinutes, waitForAzCopyTimeoutMinutes: options.WaitForAzCopyTimeoutMinutes, fsGroupChangePolicy: options.FSGroupChangePolicy, - azcopy: &util.Azcopy{}, + azcopy: &util.Azcopy{ExecCmd: &util.ExecCommand{}}, KubeClient: kubeClient, cloud: cloud, } diff --git a/pkg/blob/controllerserver.go b/pkg/blob/controllerserver.go index a93b3f5d1..2bf3491e1 100644 --- a/pkg/blob/controllerserver.go +++ b/pkg/blob/controllerserver.go @@ -790,7 +790,7 @@ func (d *Driver) copyBlobContainer(ctx context.Context, req *csi.CreateVolumeReq jobState, percent, err := d.azcopy.GetAzcopyJob(dstContainerName, authAzcopyEnv) klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err) switch jobState { - case util.AzcopyJobError, util.AzcopyJobCompleted: + case util.AzcopyJobError, util.AzcopyJobCompleted, util.AzcopyJobCompletedWithErrors, util.AzcopyJobCompletedWithSkipped, util.AzcopyJobCompletedWithErrorsAndSkipped: return err case util.AzcopyJobRunning: err = wait.PollImmediate(20*time.Second, time.Duration(d.waitForAzCopyTimeoutMinutes)*time.Minute, func() (bool, error) { @@ -822,6 +822,9 @@ func (d *Driver) copyBlobContainer(ctx context.Context, req *csi.CreateVolumeReq klog.Warningf("CopyBlobContainer(%s, %s, %s) failed with error: %v", accountOptions.ResourceGroup, dstAccountName, dstContainerName, err) } else { klog.V(2).Infof("copied blob container %s to %s successfully", srcContainerName, dstContainerName) + if out, err := d.azcopy.CleanJobs(); err != nil { + klog.Warningf("clean azcopy jobs failed with error: %v, output: %s", err, string(out)) + } } return err } diff --git a/pkg/util/util.go b/pkg/util/util.go index 116e61502..54c32717c 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -45,10 +45,13 @@ const ( type AzcopyJobState string const ( - AzcopyJobError AzcopyJobState = "Error" - AzcopyJobNotFound AzcopyJobState = "NotFound" - AzcopyJobRunning AzcopyJobState = "Running" - AzcopyJobCompleted AzcopyJobState = "Completed" + AzcopyJobError AzcopyJobState = "Error" + AzcopyJobNotFound AzcopyJobState = "NotFound" + AzcopyJobRunning AzcopyJobState = "Running" + AzcopyJobCompleted AzcopyJobState = "Completed" + AzcopyJobCompletedWithErrors AzcopyJobState = "CompletedWithErrors" + AzcopyJobCompletedWithSkipped AzcopyJobState = "CompletedWithSkipped" + AzcopyJobCompletedWithErrorsAndSkipped AzcopyJobState = "CompletedWithErrorsAndSkipped" ) // RoundUpBytes rounds up the volume size in bytes up to multiplications of GiB @@ -243,9 +246,6 @@ func (ac *Azcopy) GetAzcopyJob(dstBlobContainer string, authAzcopyEnv []string) // Start Time: Wednesday, 09-Aug-23 09:09:03 UTC // Status: Cancelled // Command: copy https://{accountName}.file.core.windows.net/{srcBlobContainer}{SAStoken} https://{accountName}.file.core.windows.net/{dstBlobContainer}{SAStoken} --recursive --check-length=false - if ac.ExecCmd == nil { - ac.ExecCmd = &ExecCommand{} - } out, err := ac.ExecCmd.RunCommand(cmdStr, authAzcopyEnv) // if grep command returns nothing, the exec will return exit status 1 error, so filter this error if err != nil && err.Error() != "exit status 1" { @@ -279,13 +279,8 @@ func (ac *Azcopy) GetAzcopyJob(dstBlobContainer string, authAzcopyEnv []string) return jobState, percent, nil } -// TestListJobs test azcopy jobs list command with authAzcopyEnv -func (ac *Azcopy) TestListJobs(accountName, storageEndpointSuffix string, authAzcopyEnv []string) (string, error) { - cmdStr := fmt.Sprintf("azcopy list %s", fmt.Sprintf("https://%s.blob.%s", accountName, storageEndpointSuffix)) - if ac.ExecCmd == nil { - ac.ExecCmd = &ExecCommand{} - } - return ac.ExecCmd.RunCommand(cmdStr, authAzcopyEnv) +func (ac *Azcopy) CleanJobs() (string, error) { + return ac.ExecCmd.RunCommand("azcopy jobs clean", nil) } // parseAzcopyJobList parse command azcopy jobs list, get jobid and state from joblist diff --git a/pkg/util/util_test.go b/pkg/util/util_test.go index 6a7c587d1..dab9a7e7b 100644 --- a/pkg/util/util_test.go +++ b/pkg/util/util_test.go @@ -346,6 +346,41 @@ func TestTrimDuplicatedSpace(t *testing.T) { } } +func TestCleanJobs(t *testing.T) { + tests := []struct { + desc string + execStr string + execErr error + expectedErr error + }{ + { + desc: "run exec get error", + execStr: "", + execErr: fmt.Errorf("error"), + expectedErr: fmt.Errorf("error"), + }, + { + desc: "run exec succeed", + execStr: "cleaned", + execErr: nil, + expectedErr: nil, + }, + } + for _, test := range tests { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + m := NewMockEXEC(ctrl) + m.EXPECT().RunCommand(gomock.Eq("azcopy jobs clean"), nil).Return(test.execStr, test.execErr) + + azcopyFunc := &Azcopy{ExecCmd: m} + _, err := azcopyFunc.CleanJobs() + if !reflect.DeepEqual(err, test.expectedErr) { + t.Errorf("test[%s]: unexpected err: %v, expected err: %v", test.desc, err, test.expectedErr) + } + } +} + func TestGetAzcopyJob(t *testing.T) { tests := []struct { desc string