
Commit 01fda65

jenting authored and roboquat committed

ws-manager: refactor volume snapshot watcher

Signed-off-by: JenTing Hsiao <[email protected]>

1 parent 6bdc9bf commit 01fda65

File tree: 6 files changed (+104, -168 lines)

components/ws-manager/cmd/run.go (+8, -18)

@@ -33,7 +33,9 @@ import (
 	imgbldr "github.com/gitpod-io/gitpod/image-builder/api"
 	"github.com/gitpod-io/gitpod/ws-manager/pkg/manager"
 	"github.com/gitpod-io/gitpod/ws-manager/pkg/proxy"
+
 	volumesnapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1"
+	volumesnapshotclientv1 "github.com/kubernetes-csi/external-snapshotter/client/v4/clientset/versioned"
 )

 // serveCmd represents the serve command
@@ -77,9 +79,6 @@ var runCmd = &cobra.Command{
 		}

 		kubeConfig, err := ctrl.GetConfig()
-		if err != nil {
-			log.WithError(err).Fatal("unable to create a Kubernetes API Client configuration")
-		}
 		if err != nil {
 			log.WithError(err).Fatal("unable to getting Kubernetes client config")
 		}
@@ -89,6 +88,11 @@ var runCmd = &cobra.Command{
 			log.WithError(err).Fatal("constructing Kubernetes client")
 		}

+		volumesnapshotclientset, err := volumesnapshotclientv1.NewForConfig(kubeConfig)
+		if err != nil {
+			log.WithError(err).Fatal("constructing volume snapshot client")
+		}
+
 		if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
 			log.WithError(err).Fatal("unable to set up health check")
 		}
@@ -106,7 +110,7 @@ var runCmd = &cobra.Command{
 			log.WithError(err).Fatal("cannot register Kubernetes volumesnapshotv1 schema - this should never happen")
 		}

-		mgmt, err := manager.New(cfg.Manager, mgr.GetClient(), clientset, cp)
+		mgmt, err := manager.New(cfg.Manager, mgr.GetClient(), clientset, volumesnapshotclientset, cp)
 		if err != nil {
 			log.WithError(err).Fatal("cannot create manager")
 		}
@@ -200,20 +204,6 @@ var runCmd = &cobra.Command{
 			log.WithError(err).Fatal("unable to create controller", "controller", "Pod")
 		}

-		// enable the volume snapshot controller when the VolumeSnapshot CRD exists
-		_, err = clientset.DiscoveryClient.ServerResourcesForGroupVersion(volumesnapshotv1.SchemeGroupVersion.String())
-		if err == nil {
-			err = (&manager.VolumeSnapshotReconciler{
-				Monitor: monitor,
-				Client:  mgr.GetClient(),
-				Log:     ctrl.Log.WithName("controllers").WithName("VolumeSnapshot"),
-				Scheme:  mgr.GetScheme(),
-			}).SetupWithManager(mgr)
-			if err != nil {
-				log.WithError(err).Fatal("unable to create controller", "controller", "VolumeSnapshot")
-			}
-		}
-
 		if cfg.PProf.Addr != "" {
 			go pprof.Serve(cfg.PProf.Addr)
 		}
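
The net effect in run.go: the VolumeSnapshotReconciler controller, which was only registered when API discovery found the VolumeSnapshot CRD, is removed, and a typed snapshot clientset is constructed once at startup and passed to manager.New instead. A minimal standalone sketch of that wiring, assuming the same client modules the diff imports (the package main scaffolding is illustrative only):

package main

import (
	"log"

	volumesnapshotclientv1 "github.com/kubernetes-csi/external-snapshotter/client/v4/clientset/versioned"
	"k8s.io/client-go/kubernetes"
	ctrl "sigs.k8s.io/controller-runtime"
)

func main() {
	// One rest.Config feeds both typed clients, as in run.go above.
	kubeConfig, err := ctrl.GetConfig()
	if err != nil {
		log.Fatalf("unable to get Kubernetes client config: %v", err)
	}

	// Standard clientset for built-in resources.
	clientset, err := kubernetes.NewForConfig(kubeConfig)
	if err != nil {
		log.Fatalf("constructing Kubernetes client: %v", err)
	}

	// Generated clientset for the snapshot.storage.k8s.io group; this is the
	// value the commit threads through manager.New.
	volumesnapshotclientset, err := volumesnapshotclientv1.NewForConfig(kubeConfig)
	if err != nil {
		log.Fatalf("constructing volume snapshot client: %v", err)
	}

	_, _ = clientset, volumesnapshotclientset
}

Because the snapshot watch is now created on demand during content finalization (see monitor.go below), there is no longer a startup-time dependency on the CRD being installed.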

components/ws-manager/pkg/manager/integration_test.go (+9, -1)

@@ -45,6 +45,8 @@ import (
 	config "github.com/gitpod-io/gitpod/ws-manager/api/config"
 	"github.com/gitpod-io/gitpod/ws-manager/pkg/manager/internal/grpcpool"
 	"github.com/gitpod-io/gitpod/ws-manager/pkg/test"
+
+	volumesnapshotclientv1 "github.com/kubernetes-csi/external-snapshotter/client/v4/clientset/versioned"
 )

 var integrationFlag = flag.String("integration-test", "disabled", "configures integration tests. Valid values are disabled, local or a path to a kubeconfig file")
@@ -119,13 +121,19 @@ func forIntegrationTestGetManager(t *testing.T) *Manager {
 		return nil
 	}

+	volumesnapshotclientset, err := volumesnapshotclientv1.NewForConfig(cfg)
+	if err != nil {
+		t.Errorf("cannt create test environment: %v", err)
+		return nil
+	}
+
 	ctrlClient, err := ctrler_client.New(cfg, ctrler_client.Options{Scheme: scheme})
 	if err != nil {
 		t.Errorf("cannot create test environment: %v", err)
 		return nil
 	}

-	m, err := New(config, ctrlClient, clientset, &layer.Provider{Storage: &storage.PresignedNoopStorage{}})
+	m, err := New(config, ctrlClient, clientset, volumesnapshotclientset, &layer.Provider{Storage: &storage.PresignedNoopStorage{}})
 	if err != nil {
 		t.Fatalf("cannot create manager: %s", err.Error())
 	}

components/ws-manager/pkg/manager/manager.go (+18, -14)

@@ -52,16 +52,19 @@ import (
 	config "github.com/gitpod-io/gitpod/ws-manager/api/config"
 	"github.com/gitpod-io/gitpod/ws-manager/pkg/clock"
 	"github.com/gitpod-io/gitpod/ws-manager/pkg/manager/internal/grpcpool"
+
 	volumesnapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1"
+	volumesnapshotclientv1 "github.com/kubernetes-csi/external-snapshotter/client/v4/clientset/versioned"
 )

 // Manager is a kubernetes backed implementation of a workspace manager
 type Manager struct {
-	Config    config.Configuration
-	Clientset client.Client
-	RawClient kubernetes.Interface
-	Content   *layer.Provider
-	OnChange  func(context.Context, *api.WorkspaceStatus)
+	Config               config.Configuration
+	Clientset            client.Client
+	RawClient            kubernetes.Interface
+	VolumeSnapshotClient volumesnapshotclientv1.Interface
+	Content              *layer.Provider
+	OnChange             func(context.Context, *api.WorkspaceStatus)

 	activity sync.Map
 	clock    *clock.HLC
@@ -127,7 +130,7 @@ const (
 )

 // New creates a new workspace manager
-func New(config config.Configuration, client client.Client, rawClient kubernetes.Interface, cp *layer.Provider) (*Manager, error) {
+func New(config config.Configuration, client client.Client, rawClient kubernetes.Interface, volumesnapshotClient volumesnapshotclientv1.Interface, cp *layer.Provider) (*Manager, error) {
 	wsdaemonConnfactory, err := newWssyncConnectionFactory(config)
 	if err != nil {
 		return nil, err
@@ -138,14 +141,15 @@ func New(config config.Configuration, client client.Client, rawClient kubernetes
 	eventRecorder := broadcaster.NewRecorder(runtime.NewScheme(), corev1.EventSource{Component: "ws-manager"})

 	m := &Manager{
-		Config:        config,
-		Clientset:     client,
-		RawClient:     rawClient,
-		Content:       cp,
-		clock:         clock.System(),
-		subscribers:   make(map[string]chan *api.SubscribeResponse),
-		wsdaemonPool:  grpcpool.New(wsdaemonConnfactory, checkWSDaemonEndpoint(config.Namespace, client)),
-		eventRecorder: eventRecorder,
+		Config:               config,
+		Clientset:            client,
+		RawClient:            rawClient,
+		VolumeSnapshotClient: volumesnapshotClient,
+		Content:              cp,
+		clock:                clock.System(),
+		subscribers:          make(map[string]chan *api.SubscribeResponse),
+		wsdaemonPool:         grpcpool.New(wsdaemonConnfactory, checkWSDaemonEndpoint(config.Namespace, client)),
+		eventRecorder:        eventRecorder,
 	}
 	m.metrics = newMetrics(m)
 	m.OnChange = m.onChange
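
Note that the new field is declared as volumesnapshotclientv1.Interface rather than the concrete *versioned.Clientset, so any implementation of the generated interface can be injected. A sketch of the payoff for tests, assuming the client-gen fake package exists at the conventional clientset/versioned/fake path (an assumption on my part, not shown in this diff):

package manager_test

import (
	"testing"

	volumesnapshotclientv1 "github.com/kubernetes-csi/external-snapshotter/client/v4/clientset/versioned"
	snapshotfake "github.com/kubernetes-csi/external-snapshotter/client/v4/clientset/versioned/fake"
)

// Because Manager.VolumeSnapshotClient is an interface, a fake clientset can
// stand in for a live API server in unit tests.
func TestFakeSnapshotClientSatisfiesInterface(t *testing.T) {
	var c volumesnapshotclientv1.Interface = snapshotfake.NewSimpleClientset()
	if c == nil {
		t.Fatal("expected a non-nil fake clientset")
	}
}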

components/ws-manager/pkg/manager/monitor.go (+60, -76)

@@ -27,10 +27,14 @@ import (
 	corev1 "k8s.io/api/core/v1"
 	k8serr "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/apimachinery/pkg/watch"
+	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/tools/record"
+	watchtools "k8s.io/client-go/tools/watch"
 	"sigs.k8s.io/controller-runtime/pkg/client"

 	"github.com/gitpod-io/gitpod/common-go/kubernetes"
@@ -84,9 +88,6 @@ type Monitor struct {

 	OnError func(error)

-	notifyPod        map[string]chan string
-	notifyPodMapLock sync.Mutex
-
 	eventRecorder record.EventRecorder
 }

@@ -107,8 +108,6 @@ func (m *Manager) CreateMonitor() (*Monitor, error) {
 			log.WithError(err).Error("workspace monitor error")
 		},

-		notifyPod: make(map[string]chan string),
-
 		eventRecorder: m.eventRecorder,
 	}
 	res.eventpool = workpool.NewEventWorkerPool(res.handleEvent)
@@ -152,62 +151,13 @@ func (m *Monitor) handleEvent(evt watch.Event) {
 	switch evt.Object.(type) {
 	case *corev1.Pod:
 		err = m.onPodEvent(evt)
-	case *volumesnapshotv1.VolumeSnapshot:
-		err = m.onVolumesnapshotEvent(evt)
 	}

 	if err != nil {
 		m.OnError(err)
 	}
 }

-func (m *Monitor) onVolumesnapshotEvent(evt watch.Event) error {
-	vs, ok := evt.Object.(*volumesnapshotv1.VolumeSnapshot)
-	if !ok {
-		return xerrors.Errorf("received non-volume-snapshot event")
-	}
-
-	log := log.WithField("volumesnapshot", vs.Name)
-
-	if vs.Spec.Source.PersistentVolumeClaimName == nil {
-		// there is no pvc name within the VolumeSnapshot object
-		log.Warn("the spec.source.persistentVolumeClaimName is empty")
-		return nil
-	}
-
-	// the pod name is 1:1 mapping to pvc name
-	podName := *vs.Spec.Source.PersistentVolumeClaimName
-	log = log.WithField("pod", podName)
-
-	// get the pod resource
-	var pod corev1.Pod
-	err := m.manager.Clientset.Get(context.Background(), types.NamespacedName{Namespace: vs.Namespace, Name: podName}, &pod)
-	if err != nil && !k8serr.IsNotFound(err) {
-		log.WithError(err).Warnf("cannot get pod")
-	}
-
-	if vs.Status == nil || vs.Status.ReadyToUse == nil || !*vs.Status.ReadyToUse || vs.Status.BoundVolumeSnapshotContentName == nil {
-		if !pod.CreationTimestamp.IsZero() {
-			m.eventRecorder.Eventf(&pod, corev1.EventTypeNormal, "VolumeSnapshot", "Volume snapshot %q is in progress", vs.Name)
-		}
-		return nil
-	}
-
-	vsc := *vs.Status.BoundVolumeSnapshotContentName
-	log.Debugf("the vsc %s is ready to use", vsc)
-	if !pod.CreationTimestamp.IsZero() {
-		m.eventRecorder.Eventf(&pod, corev1.EventTypeNormal, "VolumeSnapshot", "Volume snapshot %q is ready to use", vs.Name)
-	}
-
-	m.notifyPodMapLock.Lock()
-	if m.notifyPod[podName] != nil {
-		m.notifyPod[podName] <- vsc
-	}
-	m.notifyPodMapLock.Unlock()
-
-	return nil
-}
-
 // onPodEvent interpretes Kubernetes events, translates and broadcasts them, and acts based on them
 func (m *Monitor) onPodEvent(evt watch.Event) error {
 	// Beware: we patch running pods to add annotations. At the moment this is not a problem as do not attach
@@ -1150,31 +1100,65 @@ func (m *Monitor) finalizeWorkspaceContent(ctx context.Context, wso *workspaceOb
 			volumeSnapshotTime = time.Now()
 		}
 		if createdVolumeSnapshot {
-			m.notifyPodMapLock.Lock()
-			if m.notifyPod[wso.Pod.Name] == nil {
-				m.notifyPod[wso.Pod.Name] = make(chan string)
-			}
-			m.notifyPodMapLock.Unlock()
-
-			select {
-			case pvcVolumeSnapshotContentName = <-m.notifyPod[wso.Pod.Name]:
+			log = log.WithField("VolumeSnapshot.Name", pvcVolumeSnapshotName)
+
+			var volumeSnapshotWatcher *watchtools.RetryWatcher
+			volumeSnapshotWatcher, err = watchtools.NewRetryWatcher("1", &cache.ListWatch{
+				WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
+					return m.manager.VolumeSnapshotClient.SnapshotV1().VolumeSnapshots(m.manager.Config.Namespace).Watch(ctx, metav1.ListOptions{
+						FieldSelector: fields.OneTermEqualSelector("metadata.name", pvcVolumeSnapshotName).String(),
+					})
+				},
+			})
+			if err != nil {
+				log.WithError(err).Info("fall back to exponential backoff retry")
+				// we can not create a retry watcher, we fall back to exponential backoff retry
+				backoff := wait.Backoff{
+					Steps:    30,
+					Duration: 100 * time.Millisecond,
+					Factor:   1.5,
+					Jitter:   0.1,
+					Cap:      10 * time.Minute,
+				}
+				err = wait.ExponentialBackoff(backoff, func() (bool, error) {
+					var vs volumesnapshotv1.VolumeSnapshot
+					err := m.manager.Clientset.Get(ctx, types.NamespacedName{Namespace: m.manager.Config.Namespace, Name: pvcVolumeSnapshotName}, &vs)
+					if err != nil {
+						if k8serr.IsNotFound(err) {
+							// volumesnapshot doesn't exist yet, retry again
+							return false, nil
+						}
+						log.WithError(err).Error("was unable to get volume snapshot")
+						return false, err
+					}
+					if vs.Status != nil && vs.Status.ReadyToUse != nil && *vs.Status.ReadyToUse && vs.Status.BoundVolumeSnapshotContentName != nil {
+						pvcVolumeSnapshotContentName = *vs.Status.BoundVolumeSnapshotContentName
+						return true, nil
+					}
+					return false, nil
+				})
+				if err != nil {
+					log.WithError(err).Errorf("failed while waiting for volume snapshot to get ready")
+					return nil, err
+				}
 				readyVolumeSnapshot = true
-			case <-ctx.Done():
-				// There might be a chance that the VolumeSnapshot is ready but somehow
-				// we did not receive the notification.
-				// For example, the ws-manager restarts before the VolumeSnapshot becomes ready.
-				// Let's give it the last chance to check the VolumeSnapshot is ready.
-				var vs volumesnapshotv1.VolumeSnapshot
-				err := m.manager.Clientset.Get(ctx, types.NamespacedName{Namespace: m.manager.Config.Namespace, Name: pvcVolumeSnapshotName}, &vs)
-				if err == nil && vs.Status != nil && vs.Status.ReadyToUse != nil && *vs.Status.ReadyToUse && vs.Status.BoundVolumeSnapshotContentName != nil {
-					pvcVolumeSnapshotContentName = *vs.Status.BoundVolumeSnapshotContentName
-					readyVolumeSnapshot = true
-					break
+			} else {
+				for event := range volumeSnapshotWatcher.ResultChan() {
+					vs, ok := event.Object.(*volumesnapshotv1.VolumeSnapshot)
+					if !ok {
+						log.Errorf("unexpected type assertion %T", event.Object)
+						continue
+					}
+
+					if vs != nil && vs.Status != nil && vs.Status.ReadyToUse != nil && *vs.Status.ReadyToUse && vs.Status.BoundVolumeSnapshotContentName != nil {
+						pvcVolumeSnapshotContentName = *vs.Status.BoundVolumeSnapshotContentName
+						readyVolumeSnapshot = true
+						break
+					}
 				}

-				err = xerrors.Errorf("%s timed out while waiting for volume snapshot to get ready", m.manager.Config.Timeouts.ContentFinalization.String())
-				log.Error(err.Error())
-				return nil, err
+				// stop the volume snapshot retry watcher
+				volumeSnapshotWatcher.Stop()
 			}

 		hist, err := m.manager.metrics.volumeSnapshotTimeHistVec.GetMetricWithLabelValues(wsType, wso.Pod.Labels[workspaceClassLabel])
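
This hunk is the heart of the refactor: the notifyPod channel map, which depended on the cluster-wide VolumeSnapshot watch to feed it, is replaced by a wait that is local to the finalizing workspace. A RetryWatcher scoped by field selector to the single snapshot is the fast path; if the watcher cannot be created, the code polls with a capped exponential backoff. The same two-path logic, lifted out of finalizeWorkspaceContent into a standalone sketch (waitForSnapshotReady, pollSnapshotReady, and snapshotReady are hypothetical helper names, and the fallback here polls via the snapshot clientset where the original uses the controller-runtime client):

package manager

import (
	"context"
	"fmt"
	"time"

	volumesnapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1"
	volumesnapshotclientv1 "github.com/kubernetes-csi/external-snapshotter/client/v4/clientset/versioned"
	k8serr "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/tools/cache"
	watchtools "k8s.io/client-go/tools/watch"
)

// snapshotReady reports whether a VolumeSnapshot is ready to use and has a
// bound VolumeSnapshotContent name.
func snapshotReady(vs *volumesnapshotv1.VolumeSnapshot) bool {
	return vs.Status != nil && vs.Status.ReadyToUse != nil && *vs.Status.ReadyToUse &&
		vs.Status.BoundVolumeSnapshotContentName != nil
}

// waitForSnapshotReady waits until the named VolumeSnapshot becomes ready and
// returns the bound VolumeSnapshotContent name.
func waitForSnapshotReady(ctx context.Context, client volumesnapshotclientv1.Interface, namespace, name string) (string, error) {
	w, err := watchtools.NewRetryWatcher("1", &cache.ListWatch{
		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
			// The field selector narrows the watch to the one snapshot this
			// finalization cares about.
			return client.SnapshotV1().VolumeSnapshots(namespace).Watch(ctx, metav1.ListOptions{
				FieldSelector: fields.OneTermEqualSelector("metadata.name", name).String(),
			})
		},
	})
	if err != nil {
		// No watcher? Fall back to polling, as the diff does.
		return pollSnapshotReady(ctx, client, namespace, name)
	}
	defer w.Stop()

	for event := range w.ResultChan() {
		vs, ok := event.Object.(*volumesnapshotv1.VolumeSnapshot)
		if !ok {
			continue
		}
		if snapshotReady(vs) {
			return *vs.Status.BoundVolumeSnapshotContentName, nil
		}
	}
	return "", fmt.Errorf("watch closed before snapshot %s/%s became ready", namespace, name)
}

// pollSnapshotReady mirrors the diff's exponential backoff fallback.
func pollSnapshotReady(ctx context.Context, client volumesnapshotclientv1.Interface, namespace, name string) (string, error) {
	var content string
	backoff := wait.Backoff{Steps: 30, Duration: 100 * time.Millisecond, Factor: 1.5, Jitter: 0.1, Cap: 10 * time.Minute}
	err := wait.ExponentialBackoff(backoff, func() (bool, error) {
		vs, err := client.SnapshotV1().VolumeSnapshots(namespace).Get(ctx, name, metav1.GetOptions{})
		if err != nil {
			if k8serr.IsNotFound(err) {
				return false, nil // snapshot not created yet, retry
			}
			return false, err
		}
		if snapshotReady(vs) {
			content = *vs.Status.BoundVolumeSnapshotContentName
			return true, nil
		}
		return false, nil
	})
	return content, err
}

Scoping the watch with fields.OneTermEqualSelector means each finalization observes only events for its own snapshot, so there is no shared map to lock, and stopping the watcher releases the connection once the snapshot reports ReadyToUse.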

components/ws-manager/pkg/manager/testing_test.go (+9, -1)

@@ -22,6 +22,8 @@ import (
 	"github.com/gitpod-io/gitpod/content-service/pkg/storage"
 	"github.com/gitpod-io/gitpod/ws-manager/api"
 	config "github.com/gitpod-io/gitpod/ws-manager/api/config"
+
+	volumesnapshotclientv1 "github.com/kubernetes-csi/external-snapshotter/client/v4/clientset/versioned"
 )

 // This file contains test infrastructure for this package. No function in here is meant for consumption outside of tests.
@@ -96,6 +98,12 @@ func forTestingOnlyGetManager(t *testing.T, objects ...client.Object) *Manager {
 		return nil
 	}

+	volumesnapshotclientset, err := volumesnapshotclientv1.NewForConfig(cfg)
+	if err != nil {
+		t.Errorf("cannt create test environment: %v", err)
+		return nil
+	}
+
 	ctrlClient, err := client.New(cfg, client.Options{Scheme: scheme})
 	if err != nil {
 		t.Errorf("cannot create test environment: %v", err)
@@ -122,7 +130,7 @@ func forTestingOnlyGetManager(t *testing.T, objects ...client.Object) *Manager {
 		}
 	}

-	m, err := New(config, ctrlClient, clientset, &layer.Provider{Storage: &storage.PresignedNoopStorage{}})
+	m, err := New(config, ctrlClient, clientset, volumesnapshotclientset, &layer.Provider{Storage: &storage.PresignedNoopStorage{}})
 	if err != nil {
 		t.Fatalf("cannot create manager: %s", err.Error())
 	}
