Skip to content

Commit 82ef6e6

Browse files
authored
Merge pull request #230 from xing-yang/requeue_sidecar
Fix the requeue logic
2 parents 790068f + cbd5b8b commit 82ef6e6

File tree

4 files changed

+126
-166
lines changed

4 files changed

+126
-166
lines changed

pkg/sidecar-controller/framework_test.go

Lines changed: 1 addition & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -409,24 +409,6 @@ func (r *snapshotReactor) getChangeCount() int {
409409
return r.changedSinceLastSync
410410
}
411411

412-
// waitForIdle waits until all tests, controllers and other goroutines do their
413-
// job and no new actions are registered for 10 milliseconds.
414-
func (r *snapshotReactor) waitForIdle() {
415-
r.ctrl.runningOperations.WaitForCompletion()
416-
// Check every 10ms if the controller does something and stop if it's
417-
// idle.
418-
oldChanges := -1
419-
for {
420-
time.Sleep(10 * time.Millisecond)
421-
changes := r.getChangeCount()
422-
if changes == oldChanges {
423-
// No changes for last 10ms -> controller must be idle.
424-
break
425-
}
426-
oldChanges = changes
427-
}
428-
}
429-
430412
// waitTest waits until all tests, controllers and other goroutines do their
431413
// job and list of current contents/snapshots is equal to list of expected
432414
// contents/snapshots (with ~10 second timeout).
@@ -439,9 +421,6 @@ func (r *snapshotReactor) waitTest(test controllerTest) error {
439421
Steps: 10,
440422
}
441423
err := wait.ExponentialBackoff(backoff, func() (done bool, err error) {
442-
// Finish all operations that are in progress
443-
r.ctrl.runningOperations.WaitForCompletion()
444-
445424
// Return 'true' if the reactor reached the expected state
446425
err1 := r.checkContents(test.expectedContents)
447426
if err1 == nil {
@@ -768,7 +747,7 @@ func runSyncContentTests(t *testing.T, tests []controllerTest, snapshotClasses [
768747

769748
// Run the tested functions
770749
err = test.test(ctrl, reactor, test)
771-
if err != nil {
750+
if test.expectSuccess && err != nil {
772751
t.Errorf("Test %q failed: %v", test.name, err)
773752
}
774753

pkg/sidecar-controller/snapshot_controller.go

Lines changed: 40 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,6 @@ import (
2929
v1 "k8s.io/api/core/v1"
3030
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3131
"k8s.io/klog"
32-
"k8s.io/kubernetes/pkg/util/goroutinemap"
33-
"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
3432
"k8s.io/kubernetes/pkg/util/slice"
3533
)
3634

@@ -73,101 +71,65 @@ func (ctrl *csiSnapshotSideCarController) syncContent(content *crdv1.VolumeSnaps
7371
// if there is one so that API server could delete the object if there is
7472
// no other finalizer.
7573
return ctrl.removeContentFinalizer(content)
76-
7774
}
7875
if content.Spec.Source.VolumeHandle != nil && content.Status == nil {
7976
klog.V(5).Infof("syncContent: Call CreateSnapshot for content %s", content.Name)
80-
ctrl.createSnapshot(content)
81-
} else {
82-
// Skip checkandUpdateContentStatus() if ReadyToUse is
83-
// already true. We don't want to keep calling CreateSnapshot
84-
// or ListSnapshots CSI methods over and over again for
85-
// performance reasons.
86-
if content.Status != nil && content.Status.ReadyToUse != nil && *content.Status.ReadyToUse == true {
87-
// Try to remove AnnVolumeSnapshotBeingCreated if it is not removed yet for some reason
88-
err := ctrl.removeAnnVolumeSnapshotBeingCreated(content)
89-
if err != nil {
90-
return fmt.Errorf("failed to remove VolumeSnapshotBeingCreated annotation from the content %s: %q", content.Name, err)
91-
}
92-
return nil
93-
}
94-
ctrl.checkandUpdateContentStatus(content)
77+
return ctrl.createSnapshot(content)
9578
}
96-
97-
return nil
79+
// Skip checkandUpdateContentStatus() if ReadyToUse is
80+
// already true. We don't want to keep calling CreateSnapshot
81+
// or ListSnapshots CSI methods over and over again for
82+
// performance reasons.
83+
if content.Status != nil && content.Status.ReadyToUse != nil && *content.Status.ReadyToUse == true {
84+
// Try to remove AnnVolumeSnapshotBeingCreated if it is not removed yet for some reason
85+
return ctrl.removeAnnVolumeSnapshotBeingCreated(content)
86+
}
87+
return ctrl.checkandUpdateContentStatus(content)
9888
}
9989

10090
// deleteCSISnapshot starts delete action.
10191
func (ctrl *csiSnapshotSideCarController) deleteCSISnapshot(content *crdv1.VolumeSnapshotContent) error {
102-
operationName := fmt.Sprintf("delete-%s", content.Name)
103-
klog.V(5).Infof("schedule to delete snapshot, operation named %s", operationName)
104-
ctrl.scheduleOperation(operationName, func() error {
105-
return ctrl.deleteCSISnapshotOperation(content)
106-
})
107-
return nil
108-
}
109-
110-
// scheduleOperation starts given asynchronous operation on given snapshot. It
111-
// makes sure there is no running operation with the same operationName
112-
func (ctrl *csiSnapshotSideCarController) scheduleOperation(operationName string, operation func() error) {
113-
klog.V(5).Infof("scheduleOperation[%s]", operationName)
114-
115-
err := ctrl.runningOperations.Run(operationName, operation)
116-
if err != nil {
117-
switch {
118-
case goroutinemap.IsAlreadyExists(err):
119-
klog.V(4).Infof("operation %q is already running, skipping", operationName)
120-
case exponentialbackoff.IsExponentialBackoff(err):
121-
klog.V(4).Infof("operation %q postponed due to exponential backoff", operationName)
122-
default:
123-
klog.Errorf("error scheduling operation %q: %v", operationName, err)
124-
}
125-
}
92+
klog.V(5).Infof("Deleting snapshot for content: %s", content.Name)
93+
return ctrl.deleteCSISnapshotOperation(content)
12694
}
12795

12896
func (ctrl *csiSnapshotSideCarController) storeContentUpdate(content interface{}) (bool, error) {
12997
return utils.StoreObjectUpdate(ctrl.contentStore, content, "content")
13098
}
13199

132100
// createSnapshot starts new asynchronous operation to create snapshot
133-
func (ctrl *csiSnapshotSideCarController) createSnapshot(content *crdv1.VolumeSnapshotContent) {
101+
func (ctrl *csiSnapshotSideCarController) createSnapshot(content *crdv1.VolumeSnapshotContent) error {
134102
klog.V(5).Infof("createSnapshot for content [%s]: started", content.Name)
135-
opName := fmt.Sprintf("create-%s", content.Name)
136-
ctrl.scheduleOperation(opName, func() error {
137-
contentObj, err := ctrl.createSnapshotWrapper(content)
138-
if err != nil {
139-
ctrl.updateContentErrorStatusWithEvent(content, v1.EventTypeWarning, "SnapshotCreationFailed", fmt.Sprintf("Failed to create snapshot: %v", err))
140-
klog.Errorf("createSnapshot [%s]: error occurred in createSnapshotWrapper: %v", opName, err)
141-
return err
142-
}
103+
contentObj, err := ctrl.createSnapshotWrapper(content)
104+
if err != nil {
105+
ctrl.updateContentErrorStatusWithEvent(content, v1.EventTypeWarning, "SnapshotCreationFailed", fmt.Sprintf("Failed to create snapshot: %v", err))
106+
klog.Errorf("createSnapshot for content [%s]: error occurred in createSnapshotWrapper: %v", content.Name, err)
107+
return err
108+
}
143109

144-
_, updateErr := ctrl.storeContentUpdate(contentObj)
145-
if updateErr != nil {
146-
// We will get an "snapshot update" event soon, this is not a big error
147-
klog.V(4).Infof("createSnapshot [%s]: cannot update internal content cache: %v", content.Name, updateErr)
148-
}
149-
return nil
150-
})
110+
_, updateErr := ctrl.storeContentUpdate(contentObj)
111+
if updateErr != nil {
112+
// We will get an "snapshot update" event soon, this is not a big error
113+
klog.V(4).Infof("createSnapshot for content [%s]: cannot update internal content cache: %v", content.Name, updateErr)
114+
}
115+
return nil
151116
}
152117

153-
func (ctrl *csiSnapshotSideCarController) checkandUpdateContentStatus(content *crdv1.VolumeSnapshotContent) {
118+
func (ctrl *csiSnapshotSideCarController) checkandUpdateContentStatus(content *crdv1.VolumeSnapshotContent) error {
154119
klog.V(5).Infof("checkandUpdateContentStatus[%s] started", content.Name)
155-
opName := fmt.Sprintf("check-%s", content.Name)
156-
ctrl.scheduleOperation(opName, func() error {
157-
contentObj, err := ctrl.checkandUpdateContentStatusOperation(content)
158-
if err != nil {
159-
ctrl.updateContentErrorStatusWithEvent(content, v1.EventTypeWarning, "SnapshotContentCheckandUpdateFailed", fmt.Sprintf("Failed to check and update snapshot content: %v", err))
160-
klog.Errorf("checkandUpdateContentStatus [%s]: error occurred %v", content.Name, err)
161-
return err
162-
}
163-
_, updateErr := ctrl.storeContentUpdate(contentObj)
164-
if updateErr != nil {
165-
// We will get an "snapshot update" event soon, this is not a big error
166-
klog.V(4).Infof("checkandUpdateContentStatus [%s]: cannot update internal cache: %v", content.Name, updateErr)
167-
}
120+
contentObj, err := ctrl.checkandUpdateContentStatusOperation(content)
121+
if err != nil {
122+
ctrl.updateContentErrorStatusWithEvent(content, v1.EventTypeWarning, "SnapshotContentCheckandUpdateFailed", fmt.Sprintf("Failed to check and update snapshot content: %v", err))
123+
klog.Errorf("checkandUpdateContentStatus [%s]: error occurred %v", content.Name, err)
124+
return err
125+
}
126+
_, updateErr := ctrl.storeContentUpdate(contentObj)
127+
if updateErr != nil {
128+
// We will get an "snapshot update" event soon, this is not a big error
129+
klog.V(4).Infof("checkandUpdateContentStatus [%s]: cannot update internal cache: %v", content.Name, updateErr)
130+
}
168131

169-
return nil
170-
})
132+
return nil
171133
}
172134

173135
// updateContentStatusWithEvent saves new content.Status to API server and emits
@@ -384,8 +346,8 @@ func (ctrl *csiSnapshotSideCarController) deleteCSISnapshotOperation(content *cr
384346
ctrl.eventRecorder.Event(content, v1.EventTypeWarning, "SnapshotDeleteError", "Failed to clear content status")
385347
return err
386348
}
387-
// update local cache
388-
ctrl.updateContentInCacheStore(newContent)
349+
// trigger syncContent
350+
ctrl.updateContentInInformerCache(newContent)
389351
return nil
390352
}
391353

0 commit comments

Comments
 (0)