diff --git a/cmd/csi-snapshotter/main.go b/cmd/csi-snapshotter/main.go index 0c1e37f18..114cc813f 100644 --- a/cmd/csi-snapshotter/main.go +++ b/cmd/csi-snapshotter/main.go @@ -93,6 +93,7 @@ var ( metricsAddress = flag.String("metrics-address", "", "(deprecated) The TCP network address where the prometheus metrics endpoint will listen (example: `:8080`). The default is empty string, which means metrics endpoint is disabled. Only one of `--metrics-address` and `--http-endpoint` can be set.") httpEndpoint = flag.String("http-endpoint", "", "The TCP network address where the HTTP server for diagnostics, including metrics and leader election health check, will listen (example: `:8080`). The default is empty string, which means the server is disabled. Only one of `--metrics-address` and `--http-endpoint` can be set.") metricsPath = flag.String("metrics-path", "/metrics", "The HTTP path where prometheus metrics will be exposed. Default is `/metrics`.") + pollSnapshotPeriod = flag.Duration("poll-snapshot-period", 10*time.Second, "Poll interval to check if a snapshot is ready. Default is 10 seconds.") retryIntervalStart = flag.Duration("retry-interval-start", time.Second, "Initial retry interval of failed volume snapshot creation or deletion. It doubles with each failure, up to retry-interval-max. Default is 1 second.") retryIntervalMax = flag.Duration("retry-interval-max", 5*time.Minute, "Maximum retry interval of failed volume snapshot creation or deletion. Default is 5 minutes.") enableNodeDeployment = flag.Bool("node-deployment", false, "Enables deploying the sidecar controller together with a CSI driver on nodes to manage snapshots for node-local volumes.") @@ -297,6 +298,7 @@ func main() { snapshotContentfactory.Groupsnapshot().V1beta1().VolumeGroupSnapshotContents(), snapshotContentfactory.Groupsnapshot().V1beta1().VolumeGroupSnapshotClasses(), workqueue.NewTypedItemExponentialFailureRateLimiter[string](*retryIntervalStart, *retryIntervalMax), + *pollSnapshotPeriod, ) run := func(context.Context) { diff --git a/pkg/sidecar-controller/framework_test.go b/pkg/sidecar-controller/framework_test.go index 9265a3686..92f1bd713 100644 --- a/pkg/sidecar-controller/framework_test.go +++ b/pkg/sidecar-controller/framework_test.go @@ -580,6 +580,7 @@ func newTestController(kubeClient kubernetes.Interface, clientset clientset.Inte informerFactory.Groupsnapshot().V1beta1().VolumeGroupSnapshotContents(), informerFactory.Groupsnapshot().V1beta1().VolumeGroupSnapshotClasses(), workqueue.NewTypedItemExponentialFailureRateLimiter[string](1*time.Millisecond, 1*time.Minute), + 10*time.Second, /* pollSnapshotPeriod */ ) ctrl.eventRecorder = record.NewFakeRecorder(1000) diff --git a/pkg/sidecar-controller/snapshot_controller_base.go b/pkg/sidecar-controller/snapshot_controller_base.go index 0e0c7295d..16f03fb4f 100644 --- a/pkg/sidecar-controller/snapshot_controller_base.go +++ b/pkg/sidecar-controller/snapshot_controller_base.go @@ -64,6 +64,8 @@ type csiSnapshotSideCarController struct { resyncPeriod time.Duration + pollSnapshotPeriod time.Duration + enableVolumeGroupSnapshots bool groupSnapshotContentQueue workqueue.TypedRateLimitingInterface[string] groupSnapshotContentLister groupsnapshotlisters.VolumeGroupSnapshotContentLister @@ -94,6 +96,7 @@ func NewCSISnapshotSideCarController( volumeGroupSnapshotContentInformer groupsnapshotinformers.VolumeGroupSnapshotContentInformer, volumeGroupSnapshotClassInformer groupsnapshotinformers.VolumeGroupSnapshotClassInformer, groupSnapshotContentRateLimiter workqueue.TypedRateLimiter[string], + pollSnapshotPeriod time.Duration, ) *csiSnapshotSideCarController { broadcaster := record.NewBroadcaster() broadcaster.StartLogging(klog.Infof) @@ -102,13 +105,14 @@ func NewCSISnapshotSideCarController( eventRecorder = broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: fmt.Sprintf("csi-snapshotter %s", driverName)}) ctrl := &csiSnapshotSideCarController{ - clientset: clientset, - client: client, - driverName: driverName, - eventRecorder: eventRecorder, - handler: NewCSIHandler(snapshotter, groupSnapshotter, timeout, snapshotNamePrefix, snapshotNameUUIDLength, groupSnapshotNamePrefix, groupSnapshotNameUUIDLength), - resyncPeriod: resyncPeriod, - contentStore: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc), + clientset: clientset, + client: client, + driverName: driverName, + eventRecorder: eventRecorder, + handler: NewCSIHandler(snapshotter, groupSnapshotter, timeout, snapshotNamePrefix, snapshotNameUUIDLength, groupSnapshotNamePrefix, groupSnapshotNameUUIDLength), + resyncPeriod: resyncPeriod, + pollSnapshotPeriod: pollSnapshotPeriod, + contentStore: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc), contentQueue: workqueue.NewTypedRateLimitingQueueWithConfig( contentRateLimiter, workqueue.TypedRateLimitingQueueConfig[string]{ Name: "csi-snapshotter-content"}), @@ -232,16 +236,17 @@ func (ctrl *csiSnapshotSideCarController) processNextItem() bool { klog.V(4).Infof("Failed to sync content %q, will retry again: %v", key, err) // Always requeue on error to be able to call functions like "return false, doSomething()" where doSomething // does not need to worry about re-queueing. - requeue = true - } - if requeue { ctrl.contentQueue.AddRateLimited(key) - return true + } else { + // if no error occurs we Forget this item so it does not + // get queued again until another change happens. + ctrl.contentQueue.Forget(key) + + if requeue { + ctrl.contentQueue.AddAfter(key, ctrl.pollSnapshotPeriod) + } } - // Finally, if no error occurs we Forget this item so it does not - // get queued again until another change happens. - ctrl.contentQueue.Forget(key) return true }