Skip to content

Commit 7a5aafa

Browse files
committed
Remove scheduleOperation
1 parent 3384ab2 commit 7a5aafa

File tree

2 files changed

+52
-181
lines changed

2 files changed

+52
-181
lines changed

pkg/sidecar-controller/snapshot_controller.go

Lines changed: 40 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,6 @@ import (
2929
v1 "k8s.io/api/core/v1"
3030
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3131
"k8s.io/klog"
32-
"k8s.io/kubernetes/pkg/util/goroutinemap"
33-
"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
3432
"k8s.io/kubernetes/pkg/util/slice"
3533
)
3634

@@ -73,101 +71,65 @@ func (ctrl *csiSnapshotSideCarController) syncContent(content *crdv1.VolumeSnaps
7371
// if there is one so that API server could delete the object if there is
7472
// no other finalizer.
7573
return ctrl.removeContentFinalizer(content)
76-
7774
}
7875
if content.Spec.Source.VolumeHandle != nil && content.Status == nil {
7976
klog.V(5).Infof("syncContent: Call CreateSnapshot for content %s", content.Name)
80-
ctrl.createSnapshot(content)
81-
} else {
82-
// Skip checkandUpdateContentStatus() if ReadyToUse is
83-
// already true. We don't want to keep calling CreateSnapshot
84-
// or ListSnapshots CSI methods over and over again for
85-
// performance reasons.
86-
if content.Status != nil && content.Status.ReadyToUse != nil && *content.Status.ReadyToUse == true {
87-
// Try to remove AnnVolumeSnapshotBeingCreated if it is not removed yet for some reason
88-
err := ctrl.removeAnnVolumeSnapshotBeingCreated(content)
89-
if err != nil {
90-
return fmt.Errorf("failed to remove VolumeSnapshotBeingCreated annotation from the content %s: %q", content.Name, err)
91-
}
92-
return nil
93-
}
94-
ctrl.checkandUpdateContentStatus(content)
77+
return ctrl.createSnapshot(content)
9578
}
96-
97-
return nil
79+
// Skip checkandUpdateContentStatus() if ReadyToUse is
80+
// already true. We don't want to keep calling CreateSnapshot
81+
// or ListSnapshots CSI methods over and over again for
82+
// performance reasons.
83+
if content.Status != nil && content.Status.ReadyToUse != nil && *content.Status.ReadyToUse == true {
84+
// Try to remove AnnVolumeSnapshotBeingCreated if it is not removed yet for some reason
85+
return ctrl.removeAnnVolumeSnapshotBeingCreated(content)
86+
}
87+
return ctrl.checkandUpdateContentStatus(content)
9888
}
9989

10090
// deleteCSISnapshot starts delete action.
10191
func (ctrl *csiSnapshotSideCarController) deleteCSISnapshot(content *crdv1.VolumeSnapshotContent) error {
102-
operationName := fmt.Sprintf("delete-%s", content.Name)
103-
klog.V(5).Infof("schedule to delete snapshot, operation named %s", operationName)
104-
ctrl.scheduleOperation(operationName, func() error {
105-
return ctrl.deleteCSISnapshotOperation(content)
106-
})
107-
return nil
108-
}
109-
110-
// scheduleOperation starts given asynchronous operation on given snapshot. It
111-
// makes sure there is no running operation with the same operationName
112-
func (ctrl *csiSnapshotSideCarController) scheduleOperation(operationName string, operation func() error) {
113-
klog.V(5).Infof("scheduleOperation[%s]", operationName)
114-
115-
err := ctrl.runningOperations.Run(operationName, operation)
116-
if err != nil {
117-
switch {
118-
case goroutinemap.IsAlreadyExists(err):
119-
klog.V(4).Infof("operation %q is already running, skipping", operationName)
120-
case exponentialbackoff.IsExponentialBackoff(err):
121-
klog.V(4).Infof("operation %q postponed due to exponential backoff", operationName)
122-
default:
123-
klog.Errorf("error scheduling operation %q: %v", operationName, err)
124-
}
125-
}
92+
klog.V(5).Infof("Deleting snapshot for content: %s", content.Name)
93+
return ctrl.deleteCSISnapshotOperation(content)
12694
}
12795

12896
func (ctrl *csiSnapshotSideCarController) storeContentUpdate(content interface{}) (bool, error) {
12997
return utils.StoreObjectUpdate(ctrl.contentStore, content, "content")
13098
}
13199

132100
// createSnapshot starts new asynchronous operation to create snapshot
133-
func (ctrl *csiSnapshotSideCarController) createSnapshot(content *crdv1.VolumeSnapshotContent) {
101+
func (ctrl *csiSnapshotSideCarController) createSnapshot(content *crdv1.VolumeSnapshotContent) error {
134102
klog.V(5).Infof("createSnapshot for content [%s]: started", content.Name)
135-
opName := fmt.Sprintf("create-%s", content.Name)
136-
ctrl.scheduleOperation(opName, func() error {
137-
contentObj, err := ctrl.createSnapshotWrapper(content)
138-
if err != nil {
139-
ctrl.updateContentErrorStatusWithEvent(content, v1.EventTypeWarning, "SnapshotCreationFailed", fmt.Sprintf("Failed to create snapshot: %v", err))
140-
klog.Errorf("createSnapshot [%s]: error occurred in createSnapshotWrapper: %v", opName, err)
141-
return err
142-
}
103+
contentObj, err := ctrl.createSnapshotWrapper(content)
104+
if err != nil {
105+
ctrl.updateContentErrorStatusWithEvent(content, v1.EventTypeWarning, "SnapshotCreationFailed", fmt.Sprintf("Failed to create snapshot: %v", err))
106+
klog.Errorf("createSnapshot for content [%s]: error occurred in createSnapshotWrapper: %v", content.Name, err)
107+
return err
108+
}
143109

144-
_, updateErr := ctrl.storeContentUpdate(contentObj)
145-
if updateErr != nil {
146-
// We will get an "snapshot update" event soon, this is not a big error
147-
klog.V(4).Infof("createSnapshot [%s]: cannot update internal content cache: %v", content.Name, updateErr)
148-
}
149-
return nil
150-
})
110+
_, updateErr := ctrl.storeContentUpdate(contentObj)
111+
if updateErr != nil {
112+
// We will get an "snapshot update" event soon, this is not a big error
113+
klog.V(4).Infof("createSnapshot for content [%s]: cannot update internal content cache: %v", content.Name, updateErr)
114+
}
115+
return nil
151116
}
152117

153-
func (ctrl *csiSnapshotSideCarController) checkandUpdateContentStatus(content *crdv1.VolumeSnapshotContent) {
118+
func (ctrl *csiSnapshotSideCarController) checkandUpdateContentStatus(content *crdv1.VolumeSnapshotContent) error {
154119
klog.V(5).Infof("checkandUpdateContentStatus[%s] started", content.Name)
155-
opName := fmt.Sprintf("check-%s", content.Name)
156-
ctrl.scheduleOperation(opName, func() error {
157-
contentObj, err := ctrl.checkandUpdateContentStatusOperation(content)
158-
if err != nil {
159-
ctrl.updateContentErrorStatusWithEvent(content, v1.EventTypeWarning, "SnapshotContentCheckandUpdateFailed", fmt.Sprintf("Failed to check and update snapshot content: %v", err))
160-
klog.Errorf("checkandUpdateContentStatus [%s]: error occurred %v", content.Name, err)
161-
return err
162-
}
163-
_, updateErr := ctrl.storeContentUpdate(contentObj)
164-
if updateErr != nil {
165-
// We will get an "snapshot update" event soon, this is not a big error
166-
klog.V(4).Infof("checkandUpdateContentStatus [%s]: cannot update internal cache: %v", content.Name, updateErr)
167-
}
120+
contentObj, err := ctrl.checkandUpdateContentStatusOperation(content)
121+
if err != nil {
122+
ctrl.updateContentErrorStatusWithEvent(content, v1.EventTypeWarning, "SnapshotContentCheckandUpdateFailed", fmt.Sprintf("Failed to check and update snapshot content: %v", err))
123+
klog.Errorf("checkandUpdateContentStatus [%s]: error occurred %v", content.Name, err)
124+
return err
125+
}
126+
_, updateErr := ctrl.storeContentUpdate(contentObj)
127+
if updateErr != nil {
128+
// We will get an "snapshot update" event soon, this is not a big error
129+
klog.V(4).Infof("checkandUpdateContentStatus [%s]: cannot update internal cache: %v", content.Name, updateErr)
130+
}
168131

169-
return nil
170-
})
132+
return nil
171133
}
172134

173135
// updateContentStatusWithEvent saves new content.Status to API server and emits
@@ -384,8 +346,8 @@ func (ctrl *csiSnapshotSideCarController) deleteCSISnapshotOperation(content *cr
384346
ctrl.eventRecorder.Event(content, v1.EventTypeWarning, "SnapshotDeleteError", "Failed to clear content status")
385347
return err
386348
}
387-
// update local cache
388-
ctrl.updateContentInCacheStore(newContent)
349+
// trigger syncContent
350+
ctrl.updateContentInInformerCache(newContent)
389351
return nil
390352
}
391353

pkg/sidecar-controller/snapshot_controller_base.go

Lines changed: 12 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ import (
3737
"k8s.io/client-go/tools/record"
3838
"k8s.io/client-go/util/workqueue"
3939
"k8s.io/klog"
40-
"k8s.io/kubernetes/pkg/util/goroutinemap"
4140
)
4241

4342
type csiSnapshotSideCarController struct {
@@ -55,8 +54,6 @@ type csiSnapshotSideCarController struct {
5554
contentStore cache.Store
5655

5756
handler Handler
58-
// Map of scheduled/running operations.
59-
runningOperations goroutinemap.GoRoutineMap
6057

6158
resyncPeriod time.Duration
6259
}
@@ -81,15 +78,14 @@ func NewCSISnapshotSideCarController(
8178
eventRecorder = broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: fmt.Sprintf("csi-snapshotter %s", driverName)})
8279

8380
ctrl := &csiSnapshotSideCarController{
84-
clientset: clientset,
85-
client: client,
86-
driverName: driverName,
87-
eventRecorder: eventRecorder,
88-
handler: NewCSIHandler(snapshotter, timeout, snapshotNamePrefix, snapshotNameUUIDLength),
89-
runningOperations: goroutinemap.NewGoRoutineMap(true),
90-
resyncPeriod: resyncPeriod,
91-
contentStore: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc),
92-
contentQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "csi-snapshotter-content"),
81+
clientset: clientset,
82+
client: client,
83+
driverName: driverName,
84+
eventRecorder: eventRecorder,
85+
handler: NewCSIHandler(snapshotter, timeout, snapshotNamePrefix, snapshotNameUUIDLength),
86+
resyncPeriod: resyncPeriod,
87+
contentStore: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc),
88+
contentQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "csi-snapshotter-content"),
9389
}
9490

9591
volumeSnapshotContentInformer.Informer().AddEventHandlerWithResyncPeriod(
@@ -176,8 +172,6 @@ func (ctrl *csiSnapshotSideCarController) processNextItem() bool {
176172

177173
func (ctrl *csiSnapshotSideCarController) syncContentByKey(key string) error {
178174
klog.V(5).Infof("syncContentByKey[%s]", key)
179-
//key := keyObj.(string)
180-
//klog.V(5).Infof("contentWorker[%s]", key)
181175

182176
_, name, err := cache.SplitMetaNamespaceKey(key)
183177
if err != nil {
@@ -189,18 +183,12 @@ func (ctrl *csiSnapshotSideCarController) syncContentByKey(key string) error {
189183
// been add/update/sync
190184
if err == nil {
191185
if ctrl.isDriverMatch(content) {
192-
err = ctrl.updateContentInCacheStore(content)
186+
err = ctrl.updateContentInInformerCache(content)
193187
}
194188
if err != nil {
195189
// If error occurs we add this item back to the queue
196-
//ctrl.contentQueue.AddRateLimited(key)
197190
return err
198-
} /* else {
199-
// If no error occurs we Forget this item so it does not
200-
// get queued again until another change happens
201-
klog.V(4).Infof("Forget snapshotContent %q so it does not get queued again until another change happens", key)
202-
ctrl.contentQueue.Forget(key)
203-
}*/
191+
}
204192
return nil
205193
}
206194
if !errors.IsNotFound(err) {
@@ -228,87 +216,8 @@ func (ctrl *csiSnapshotSideCarController) syncContentByKey(key string) error {
228216
}
229217
ctrl.deleteContentInCacheStore(content)
230218
return nil
231-
//}
232-
233-
/*for {
234-
if quit := workFunc(); quit {
235-
klog.Infof("content worker queue shutting down")
236-
return
237-
}
238-
}*/
239-
//return nil
240219
}
241220

242-
// contentWorker processes items from contentQueue. It must run only once,
243-
// syncContent is not assured to be reentrant.
244-
/*func (ctrl *csiSnapshotSideCarController) contentWorker() {
245-
workFunc := func() bool {
246-
keyObj, quit := ctrl.contentQueue.Get()
247-
if quit {
248-
return true
249-
}
250-
defer ctrl.contentQueue.Done(keyObj)
251-
key := keyObj.(string)
252-
klog.V(5).Infof("contentWorker[%s]", key)
253-
254-
_, name, err := cache.SplitMetaNamespaceKey(key)
255-
if err != nil {
256-
klog.V(4).Infof("error getting name of snapshotContent %q to get snapshotContent from informer: %v", key, err)
257-
return false
258-
}
259-
content, err := ctrl.contentLister.Get(name)
260-
// The content still exists in informer cache, the event must have
261-
// been add/update/sync
262-
if err == nil {
263-
if ctrl.isDriverMatch(content) {
264-
err = ctrl.updateContentInCacheStore(content)
265-
}
266-
if err != nil {
267-
// If error occurs we add this item back to the queue
268-
ctrl.contentQueue.AddRateLimited(keyObj)
269-
} else {
270-
// If no error occurs we Forget this item so it does not
271-
// get queued again until another change happens
272-
klog.V(4).Infof("Forget snapshotContent %q so it does not get queued again until another change happens", key)
273-
ctrl.contentQueue.Forget(keyObj)
274-
}
275-
return false
276-
}
277-
if !errors.IsNotFound(err) {
278-
klog.V(2).Infof("error getting content %q from informer: %v", key, err)
279-
return false
280-
}
281-
282-
// The content is not in informer cache, the event must have been
283-
// "delete"
284-
contentObj, found, err := ctrl.contentStore.GetByKey(key)
285-
if err != nil {
286-
klog.V(2).Infof("error getting content %q from cache: %v", key, err)
287-
return false
288-
}
289-
if !found {
290-
// The controller has already processed the delete event and
291-
// deleted the content from its cache
292-
klog.V(2).Infof("deletion of content %q was already processed", key)
293-
return false
294-
}
295-
content, ok := contentObj.(*crdv1.VolumeSnapshotContent)
296-
if !ok {
297-
klog.Errorf("expected content, got %+v", content)
298-
return false
299-
}
300-
ctrl.deleteContentInCacheStore(content)
301-
return false
302-
}
303-
304-
for {
305-
if quit := workFunc(); quit {
306-
klog.Infof("content worker queue shutting down")
307-
return
308-
}
309-
}
310-
}*/
311-
312221
// verify whether the driver specified in VolumeSnapshotContent matches the controller's driver name
313222
func (ctrl *csiSnapshotSideCarController) isDriverMatch(content *crdv1.VolumeSnapshotContent) bool {
314223
if content.Spec.Source.VolumeHandle == nil && content.Spec.Source.SnapshotHandle == nil {
@@ -330,9 +239,9 @@ func (ctrl *csiSnapshotSideCarController) isDriverMatch(content *crdv1.VolumeSna
330239
return true
331240
}
332241

333-
// updateContent runs in worker thread and handles "content added",
242+
// updateContentInInformerCache runs in worker thread and handles "content added",
334243
// "content updated" and "periodic sync" events.
335-
func (ctrl *csiSnapshotSideCarController) updateContentInCacheStore(content *crdv1.VolumeSnapshotContent) error {
244+
func (ctrl *csiSnapshotSideCarController) updateContentInInformerCache(content *crdv1.VolumeSnapshotContent) error {
336245
// Store the new content version in the cache and do not process it if this is
337246
// an old version.
338247
new, err := ctrl.storeContentUpdate(content)

0 commit comments

Comments
 (0)