Skip to content

sdk: retry Watch() call on failure. #311

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 19 additions & 9 deletions pkg/sdk/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package sdk

import (
"context"
"sync"
"time"

"github.com/operator-framework/operator-sdk/pkg/k8sclient"

Expand All @@ -24,7 +26,8 @@ import (

var (
// informers is the set of all informers for the resources watched by the user
informers []Informer
informers []Informer
informersMux sync.Mutex
)

// Watch watches for changes on the given resource.
Expand All @@ -37,16 +40,23 @@ var (
// - 0 means no periodic events will be sent
// Consult the API reference for the Group, Version and Kind of a resource: https://kubernetes.io/docs/reference/
// namespace is the Namespace to watch for the resource
// If the resource client can not be obtained (CRD not yet deployed for example) it is retried until success.
// TODO: support opts for specifying label selector
func Watch(apiVersion, kind, namespace string, resyncPeriod int) {
resourceClient, resourcePluralName, err := k8sclient.GetResourceClient(apiVersion, kind, namespace)
// TODO: Better error handling, e.g retry
if err != nil {
logrus.Errorf("failed to get resource client for (apiVersion:%s, kind:%s, ns:%s): %v", apiVersion, kind, namespace, err)
panic(err)
}
informer := NewInformer(resourcePluralName, namespace, resourceClient, resyncPeriod)
informers = append(informers, informer)
go func() {
for {
resourceClient, resourcePluralName, err := k8sclient.GetResourceClient(apiVersion, kind, namespace)
if err == nil {
informersMux.Lock()
defer informersMux.Unlock()
informer := NewInformer(resourcePluralName, namespace, resourceClient, resyncPeriod)
informers = append(informers, informer)
break
}
logrus.Errorf("failed to get resource client for (apiVersion:%s, kind:%s, ns:%s): %v", apiVersion, kind, namespace, err)
time.Sleep(1 * time.Second)
}
}()
}

// Handle registers the handler for all events.
Expand Down