Skip to content

Commit 234c01b

Browse files
authored
Merge pull request #7 from xing-yang/snapshot_controller
Add Snapshot Controller
2 parents cc19fd3 + 17c7e1b commit 234c01b

File tree

9 files changed

+2310
-10
lines changed

9 files changed

+2310
-10
lines changed

cmd/csi-snapshotter/create_crd.go

+122
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/*
2+
Copyright 2018 The Kubernetes Authors.
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package main
15+
16+
import (
17+
"reflect"
18+
19+
"github.com/golang/glog"
20+
crdv1 "github.com/kubernetes-csi/external-snapshotter/pkg/apis/volumesnapshot/v1alpha1"
21+
apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
22+
apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
23+
apierrors "k8s.io/apimachinery/pkg/api/errors"
24+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
25+
"k8s.io/apimachinery/pkg/runtime"
26+
"k8s.io/apimachinery/pkg/runtime/serializer"
27+
"k8s.io/client-go/rest"
28+
)
29+
30+
const (
31+
// SnapshotPVCAnnotation is "snapshot.alpha.kubernetes.io/snapshot"
32+
SnapshotPVCAnnotation = "volumesnapshot.csi.k8s.io/snapshot"
33+
)
34+
35+
// NewClient creates a new RESTClient
36+
func NewClient(cfg *rest.Config) (*rest.RESTClient, *runtime.Scheme, error) {
37+
scheme := runtime.NewScheme()
38+
if err := crdv1.AddToScheme(scheme); err != nil {
39+
return nil, nil, err
40+
}
41+
42+
config := *cfg
43+
config.GroupVersion = &crdv1.SchemeGroupVersion
44+
config.APIPath = "/apis"
45+
config.ContentType = runtime.ContentTypeJSON
46+
config.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: serializer.NewCodecFactory(scheme)}
47+
48+
client, err := rest.RESTClientFor(&config)
49+
if err != nil {
50+
return nil, nil, err
51+
}
52+
53+
return client, scheme, nil
54+
}
55+
56+
// CreateCRD creates CustomResourceDefinition
57+
func CreateCRD(clientset apiextensionsclient.Interface) error {
58+
crd := &apiextensionsv1beta1.CustomResourceDefinition{
59+
ObjectMeta: metav1.ObjectMeta{
60+
Name: crdv1.VolumeSnapshotClassResourcePlural + "." + crdv1.GroupName,
61+
},
62+
Spec: apiextensionsv1beta1.CustomResourceDefinitionSpec{
63+
Group: crdv1.GroupName,
64+
Version: crdv1.SchemeGroupVersion.Version,
65+
Scope: apiextensionsv1beta1.ClusterScoped,
66+
Names: apiextensionsv1beta1.CustomResourceDefinitionNames{
67+
Plural: crdv1.VolumeSnapshotClassResourcePlural,
68+
Kind: reflect.TypeOf(crdv1.VolumeSnapshotClass{}).Name(),
69+
},
70+
},
71+
}
72+
res, err := clientset.ApiextensionsV1beta1().CustomResourceDefinitions().Create(crd)
73+
74+
if err != nil && !apierrors.IsAlreadyExists(err) {
75+
glog.Fatalf("failed to create VolumeSnapshotResource: %#v, err: %#v",
76+
res, err)
77+
}
78+
79+
crd = &apiextensionsv1beta1.CustomResourceDefinition{
80+
ObjectMeta: metav1.ObjectMeta{
81+
Name: crdv1.VolumeSnapshotContentResourcePlural + "." + crdv1.GroupName,
82+
},
83+
Spec: apiextensionsv1beta1.CustomResourceDefinitionSpec{
84+
Group: crdv1.GroupName,
85+
Version: crdv1.SchemeGroupVersion.Version,
86+
Scope: apiextensionsv1beta1.ClusterScoped,
87+
Names: apiextensionsv1beta1.CustomResourceDefinitionNames{
88+
Plural: crdv1.VolumeSnapshotContentResourcePlural,
89+
Kind: reflect.TypeOf(crdv1.VolumeSnapshotContent{}).Name(),
90+
},
91+
},
92+
}
93+
res, err = clientset.ApiextensionsV1beta1().CustomResourceDefinitions().Create(crd)
94+
95+
if err != nil && !apierrors.IsAlreadyExists(err) {
96+
glog.Fatalf("failed to create VolumeSnapshotContentResource: %#v, err: %#v",
97+
res, err)
98+
}
99+
100+
crd = &apiextensionsv1beta1.CustomResourceDefinition{
101+
ObjectMeta: metav1.ObjectMeta{
102+
Name: crdv1.VolumeSnapshotResourcePlural + "." + crdv1.GroupName,
103+
},
104+
Spec: apiextensionsv1beta1.CustomResourceDefinitionSpec{
105+
Group: crdv1.GroupName,
106+
Version: crdv1.SchemeGroupVersion.Version,
107+
Scope: apiextensionsv1beta1.NamespaceScoped,
108+
Names: apiextensionsv1beta1.CustomResourceDefinitionNames{
109+
Plural: crdv1.VolumeSnapshotResourcePlural,
110+
Kind: reflect.TypeOf(crdv1.VolumeSnapshot{}).Name(),
111+
},
112+
},
113+
}
114+
res, err = clientset.ApiextensionsV1beta1().CustomResourceDefinitions().Create(crd)
115+
116+
if err != nil && !apierrors.IsAlreadyExists(err) {
117+
glog.Fatalf("failed to create VolumeSnapshotResource: %#v, err: %#v",
118+
res, err)
119+
}
120+
121+
return nil
122+
}

cmd/csi-snapshotter/main.go

+195-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,200 @@
1+
/*
2+
Copyright 2018 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
117
package main
218

3-
import "fmt"
19+
import (
20+
"context"
21+
"flag"
22+
"fmt"
23+
"os"
24+
"os/signal"
25+
"time"
26+
27+
"github.com/golang/glog"
28+
"k8s.io/client-go/kubernetes"
29+
"k8s.io/client-go/rest"
30+
"k8s.io/client-go/tools/clientcmd"
31+
32+
"github.com/kubernetes-csi/external-snapshotter/pkg/connection"
33+
"github.com/kubernetes-csi/external-snapshotter/pkg/controller"
34+
35+
clientset "github.com/kubernetes-csi/external-snapshotter/pkg/client/clientset/versioned"
36+
informers "github.com/kubernetes-csi/external-snapshotter/pkg/client/informers/externalversions"
37+
apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
38+
)
39+
40+
const (
41+
// Number of worker threads
42+
threads = 10
43+
44+
// Default timeout of short CSI calls like GetPluginInfo
45+
csiTimeout = time.Second
46+
)
47+
48+
// Command line flags
49+
var (
50+
snapshotter = flag.String("snapshotter", "", "Name of the snapshotter. The snapshotter will only create snapshot content for snapshot that requests a VolumeSnapshotClass with a snapshotter field set equal to this name.")
51+
kubeconfig = flag.String("kubeconfig", "", "Absolute path to the kubeconfig file. Required only when running out of cluster.")
52+
connectionTimeout = flag.Duration("connection-timeout", 1*time.Minute, "Timeout for waiting for CSI driver socket.")
53+
csiAddress = flag.String("csi-address", "/run/csi/socket", "Address of the CSI driver socket.")
54+
createSnapshotContentRetryCount = flag.Int("create-snapshotcontent-retrycount", 5, "Number of retries when we create a snapshot content object for a snapshot.")
55+
createSnapshotContentInterval = flag.Duration("create-snapshotcontent-interval", 10*time.Second, "Interval between retries when we create a snapshot content object for a snapshot.")
56+
resyncPeriod = flag.Duration("resync-period", 60*time.Second, "Resync interval of the controller.")
57+
snapshotNamePrefix = flag.String("snapshot-name-prefix", "snapshot", "Prefix to apply to the name of a created snapshot")
58+
snapshotNameUUIDLength = flag.Int("snapshot-name-uuid-length", -1, "Length in characters for the generated uuid of a created snapshot. Defaults behavior is to NOT truncate.")
59+
)
460

561
func main() {
6-
fmt.Println("vim-go")
62+
flag.Set("logtostderr", "true")
63+
flag.Parse()
64+
65+
// Create the client config. Use kubeconfig if given, otherwise assume in-cluster.
66+
config, err := buildConfig(*kubeconfig)
67+
if err != nil {
68+
glog.Error(err.Error())
69+
os.Exit(1)
70+
}
71+
72+
kubeClient, err := kubernetes.NewForConfig(config)
73+
if err != nil {
74+
glog.Error(err.Error())
75+
os.Exit(1)
76+
}
77+
78+
snapClient, err := clientset.NewForConfig(config)
79+
if err != nil {
80+
glog.Errorf("Error building snapshot clientset: %s", err.Error())
81+
os.Exit(1)
82+
}
83+
84+
factory := informers.NewSharedInformerFactory(snapClient, *resyncPeriod)
85+
86+
// Create CRD resource
87+
aeclientset, err := apiextensionsclient.NewForConfig(config)
88+
if err != nil {
89+
glog.Error(err.Error())
90+
os.Exit(1)
91+
}
92+
93+
// initialize CRD resource if it does not exist
94+
err = CreateCRD(aeclientset)
95+
if err != nil {
96+
glog.Error(err.Error())
97+
os.Exit(1)
98+
}
99+
100+
// Connect to CSI.
101+
csiConn, err := connection.New(*csiAddress, *connectionTimeout)
102+
if err != nil {
103+
glog.Error(err.Error())
104+
os.Exit(1)
105+
}
106+
107+
// Pass a context with a timeout
108+
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
109+
defer cancel()
110+
111+
// Find driver name
112+
if *snapshotter == "" {
113+
*snapshotter, err = csiConn.GetDriverName(ctx)
114+
if err != nil {
115+
glog.Error(err.Error())
116+
os.Exit(1)
117+
}
118+
}
119+
glog.V(2).Infof("CSI driver name: %q", *snapshotter)
120+
121+
// Check it's ready
122+
if err = waitForDriverReady(csiConn, *connectionTimeout); err != nil {
123+
glog.Error(err.Error())
124+
os.Exit(1)
125+
}
126+
127+
// Find out if the driver supports create/delete snapshot.
128+
supportsCreateSnapshot, err := csiConn.SupportsControllerCreateSnapshot(ctx)
129+
if err != nil {
130+
glog.Error(err.Error())
131+
os.Exit(1)
132+
}
133+
if !supportsCreateSnapshot {
134+
glog.Errorf("CSI driver %s does not support ControllerCreateSnapshot", *snapshotter)
135+
os.Exit(1)
136+
}
137+
138+
if len(*snapshotNamePrefix) == 0 {
139+
glog.Error("Snapshot name prefix cannot be of length 0")
140+
os.Exit(1)
141+
}
142+
143+
glog.V(2).Infof("Start NewCSISnapshotController with snapshotter [%s] kubeconfig [%s] connectionTimeout [%+v] csiAddress [%s] createSnapshotContentRetryCount [%d] createSnapshotContentInterval [%+v] resyncPeriod [%+v] snapshotNamePrefix [%s] snapshotNameUUIDLength [%d]", *snapshotter, *kubeconfig, *connectionTimeout, *csiAddress, createSnapshotContentRetryCount, *createSnapshotContentInterval, *resyncPeriod, *snapshotNamePrefix, snapshotNameUUIDLength)
144+
145+
ctrl := controller.NewCSISnapshotController(
146+
snapClient,
147+
kubeClient,
148+
*snapshotter,
149+
factory.Volumesnapshot().V1alpha1().VolumeSnapshots(),
150+
factory.Volumesnapshot().V1alpha1().VolumeSnapshotContents(),
151+
factory.Volumesnapshot().V1alpha1().VolumeSnapshotClasses(),
152+
*createSnapshotContentRetryCount,
153+
*createSnapshotContentInterval,
154+
csiConn,
155+
*connectionTimeout,
156+
*resyncPeriod,
157+
*snapshotNamePrefix,
158+
*snapshotNameUUIDLength,
159+
)
160+
161+
// run...
162+
stopCh := make(chan struct{})
163+
factory.Start(stopCh)
164+
go ctrl.Run(threads, stopCh)
165+
166+
// ...until SIGINT
167+
c := make(chan os.Signal, 1)
168+
signal.Notify(c, os.Interrupt)
169+
<-c
170+
close(stopCh)
171+
}
172+
173+
func buildConfig(kubeconfig string) (*rest.Config, error) {
174+
if kubeconfig != "" {
175+
return clientcmd.BuildConfigFromFlags("", kubeconfig)
176+
}
177+
return rest.InClusterConfig()
178+
}
179+
180+
func waitForDriverReady(csiConn connection.CSIConnection, timeout time.Duration) error {
181+
now := time.Now()
182+
finish := now.Add(timeout)
183+
var err error
184+
for {
185+
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
186+
defer cancel()
187+
err = csiConn.Probe(ctx)
188+
if err == nil {
189+
glog.V(2).Infof("Probe succeeded")
190+
return nil
191+
}
192+
glog.V(2).Infof("Probe failed with %s", err)
193+
194+
now := time.Now()
195+
if now.After(finish) {
196+
return fmt.Errorf("failed to probe the controller: %s", err)
197+
}
198+
time.Sleep(time.Second)
199+
}
7200
}

0 commit comments

Comments
 (0)