@@ -149,6 +149,99 @@ func (ctrl *csiSnapshotSideCarController) enqueueContentWork(obj interface{}) {
149
149
// contentWorker processes items from contentQueue. It must run only once,
150
150
// syncContent is not assured to be reentrant.
151
151
func (ctrl * csiSnapshotSideCarController ) contentWorker () {
152
+ for ctrl .processNextItem () {
153
+ }
154
+ }
155
+
156
+ func (ctrl * csiSnapshotSideCarController ) processNextItem () bool {
157
+ keyObj , quit := ctrl .contentQueue .Get ()
158
+ if quit {
159
+ return false
160
+ }
161
+ defer ctrl .contentQueue .Done (keyObj )
162
+
163
+ if err := ctrl .syncContentByKey (keyObj .(string )); err != nil {
164
+ // Rather than wait for a full resync, re-add the key to the
165
+ // queue to be processed.
166
+ ctrl .contentQueue .AddRateLimited (keyObj )
167
+ klog .V (4 ).Infof ("Failed to sync content %q, will retry again: %v" , keyObj .(string ), err )
168
+ return true
169
+ }
170
+
171
+ // Finally, if no error occurs we Forget this item so it does not
172
+ // get queued again until another change happens.
173
+ ctrl .contentQueue .Forget (keyObj )
174
+ return true
175
+ }
176
+
177
+ func (ctrl * csiSnapshotSideCarController ) syncContentByKey (key string ) error {
178
+ klog .V (5 ).Infof ("syncContentByKey[%s]" , key )
179
+ //key := keyObj.(string)
180
+ //klog.V(5).Infof("contentWorker[%s]", key)
181
+
182
+ _ , name , err := cache .SplitMetaNamespaceKey (key )
183
+ if err != nil {
184
+ klog .V (4 ).Infof ("error getting name of snapshotContent %q to get snapshotContent from informer: %v" , key , err )
185
+ return nil
186
+ }
187
+ content , err := ctrl .contentLister .Get (name )
188
+ // The content still exists in informer cache, the event must have
189
+ // been add/update/sync
190
+ if err == nil {
191
+ if ctrl .isDriverMatch (content ) {
192
+ err = ctrl .updateContentInCacheStore (content )
193
+ }
194
+ if err != nil {
195
+ // If error occurs we add this item back to the queue
196
+ //ctrl.contentQueue.AddRateLimited(key)
197
+ return err
198
+ } /* else {
199
+ // If no error occurs we Forget this item so it does not
200
+ // get queued again until another change happens
201
+ klog.V(4).Infof("Forget snapshotContent %q so it does not get queued again until another change happens", key)
202
+ ctrl.contentQueue.Forget(key)
203
+ }*/
204
+ return nil
205
+ }
206
+ if ! errors .IsNotFound (err ) {
207
+ klog .V (2 ).Infof ("error getting content %q from informer: %v" , key , err )
208
+ return nil
209
+ }
210
+
211
+ // The content is not in informer cache, the event must have been
212
+ // "delete"
213
+ contentObj , found , err := ctrl .contentStore .GetByKey (key )
214
+ if err != nil {
215
+ klog .V (2 ).Infof ("error getting content %q from cache: %v" , key , err )
216
+ return nil
217
+ }
218
+ if ! found {
219
+ // The controller has already processed the delete event and
220
+ // deleted the content from its cache
221
+ klog .V (2 ).Infof ("deletion of content %q was already processed" , key )
222
+ return nil
223
+ }
224
+ content , ok := contentObj .(* crdv1.VolumeSnapshotContent )
225
+ if ! ok {
226
+ klog .Errorf ("expected content, got %+v" , content )
227
+ return nil
228
+ }
229
+ ctrl .deleteContentInCacheStore (content )
230
+ return nil
231
+ //}
232
+
233
+ /*for {
234
+ if quit := workFunc(); quit {
235
+ klog.Infof("content worker queue shutting down")
236
+ return
237
+ }
238
+ }*/
239
+ //return nil
240
+ }
241
+
242
+ // contentWorker processes items from contentQueue. It must run only once,
243
+ // syncContent is not assured to be reentrant.
244
+ /*func (ctrl *csiSnapshotSideCarController) contentWorker() {
152
245
workFunc := func() bool {
153
246
keyObj, quit := ctrl.contentQueue.Get()
154
247
if quit {
@@ -214,7 +307,7 @@ func (ctrl *csiSnapshotSideCarController) contentWorker() {
214
307
return
215
308
}
216
309
}
217
- }
310
+ }*/
218
311
219
312
// verify whether the driver specified in VolumeSnapshotContent matches the controller's driver name
220
313
func (ctrl * csiSnapshotSideCarController ) isDriverMatch (content * crdv1.VolumeSnapshotContent ) bool {
0 commit comments