Skip to content

Commit 2ea295d

Browse files
committed
add implement for csi plugin
1 parent 7373866 commit 2ea295d

File tree

5 files changed

+379
-8
lines changed

5 files changed

+379
-8
lines changed

cmd/csi-resizer/main.go

+11-3
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@ var (
3333
resyncPeriod = flag.Duration("resync-period", time.Minute*10, "Resync period for cache")
3434
workers = flag.Int("workers", 10, "Concurrency to process multiple resize requests")
3535

36+
csiAddress = flag.String("csi-address", "/run/csi/socket", "Address of the CSI driver socket.")
37+
csiTimeout = flag.Duration("csiTimeout", 15*time.Second, "Timeout for waiting for CSI driver socket.")
38+
3639
enableLeaderElection = flag.Bool("leader-election", false, "Enable leader election.")
3740
leaderElectionIdentity = flag.String("leader-election-identity", "", "Unique identity of this resizer. Typically name of the pod where the resizer runs.")
3841
leaderElectionNamespace = flag.String("leader-election-namespace", "kube-system", "Namespace where this resizer runs.")
@@ -58,13 +61,18 @@ func main() {
5861
flag.Set("logtostderr", "true")
5962
flag.Parse()
6063

61-
resizerName := "csi/example-resizer"
62-
6364
kubeClient, err := util.NewK8sClient(*master, *kubeConfig)
6465
if err != nil {
6566
klog.Fatal(err.Error())
6667
}
6768

69+
csiResizer, err := resizer.NewCSIResizer(*csiAddress, *csiTimeout)
70+
if err != nil {
71+
klog.Fatal(err.Error())
72+
}
73+
74+
resizerName := csiResizer.Name()
75+
6876
var leaderElectionConfig *util.LeaderElectionConfig
6977
if *enableLeaderElection {
7078
if leaderElectionIdentity == nil || *leaderElectionIdentity == "" {
@@ -80,6 +88,6 @@ func main() {
8088
}
8189
}
8290

83-
rc := controller.NewResizeController(resizerName, resizer.New(), kubeClient, *resyncPeriod)
91+
rc := controller.NewResizeController(resizerName, csiResizer, kubeClient, *resyncPeriod)
8492
rc.Run(*workers, leaderElectionConfig)
8593
}

pkg/csi/client.go

+198
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
/*
2+
Copyright 2019 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package csi
18+
19+
import (
20+
"context"
21+
"errors"
22+
"fmt"
23+
"net"
24+
"strings"
25+
"time"
26+
27+
"github.com/container-storage-interface/spec/lib/go/csi"
28+
"github.com/kubernetes-csi/csi-lib-utils/protosanitizer"
29+
"google.golang.org/grpc"
30+
"google.golang.org/grpc/connectivity"
31+
"k8s.io/klog"
32+
)
33+
34+
// Client is a gRPC client connect to remote CSI driver and abstracts all CSI calls.
35+
type Client interface {
36+
// GetDriverName returns driver name as discovered by GetPluginInfo()
37+
// gRPC call.
38+
GetDriverName(ctx context.Context) (string, error)
39+
40+
// SupportsPluginControllerService return true if the CSI driver reports
41+
// CONTROLLER_SERVICE in GetPluginCapabilities() gRPC call.
42+
SupportsPluginControllerService(ctx context.Context) (bool, error)
43+
44+
// SupportsControllerResize returns whether the CSI driver reports EXPAND_VOLUME
45+
// in ControllerGetCapabilities() gRPC call.
46+
SupportsControllerResize(ctx context.Context) (bool, error)
47+
48+
// Expand expands the volume to a new size at least as big as requestBytes.
49+
// It returns the new size and whether the volume need expand operation on the node.
50+
Expand(ctx context.Context, volumeID string, requestBytes int64, secrets map[string]string) (int64, bool, error)
51+
52+
// Probe checks that the CSI driver is ready to process requests
53+
Probe(ctx context.Context) error
54+
}
55+
56+
// New creates a new CSI client.
57+
func New(address string, timeout time.Duration) (Client, error) {
58+
conn, err := newGRPCConnection(address, timeout)
59+
if err != nil {
60+
return nil, err
61+
}
62+
return &client{
63+
idClient: csi.NewIdentityClient(conn),
64+
ctrlClient: csi.NewControllerClient(conn),
65+
}, nil
66+
}
67+
68+
type client struct {
69+
idClient csi.IdentityClient
70+
ctrlClient csi.ControllerClient
71+
}
72+
73+
func (c *client) GetDriverName(ctx context.Context) (string, error) {
74+
req := csi.GetPluginInfoRequest{}
75+
76+
resp, err := c.idClient.GetPluginInfo(ctx, &req)
77+
if err != nil {
78+
return "", err
79+
}
80+
81+
name := resp.GetName()
82+
if name == "" {
83+
return "", errors.New("driver name is empty")
84+
}
85+
86+
return name, nil
87+
}
88+
89+
func (c *client) SupportsPluginControllerService(ctx context.Context) (bool, error) {
90+
rsp, err := c.idClient.GetPluginCapabilities(ctx, &csi.GetPluginCapabilitiesRequest{})
91+
if err != nil {
92+
return false, err
93+
}
94+
caps := rsp.GetCapabilities()
95+
for _, capability := range caps {
96+
if capability == nil {
97+
continue
98+
}
99+
service := capability.GetService()
100+
if service == nil {
101+
continue
102+
}
103+
if service.GetType() == csi.PluginCapability_Service_CONTROLLER_SERVICE {
104+
return true, nil
105+
}
106+
}
107+
return false, nil
108+
}
109+
110+
func (c *client) SupportsControllerResize(ctx context.Context) (bool, error) {
111+
rsp, err := c.ctrlClient.ControllerGetCapabilities(ctx, &csi.ControllerGetCapabilitiesRequest{})
112+
if err != nil {
113+
return false, err
114+
}
115+
caps := rsp.GetCapabilities()
116+
for _, capability := range caps {
117+
if capability == nil {
118+
continue
119+
}
120+
rpc := capability.GetRpc()
121+
if rpc == nil {
122+
continue
123+
}
124+
if rpc.GetType() == csi.ControllerServiceCapability_RPC_EXPAND_VOLUME {
125+
return true, nil
126+
}
127+
}
128+
return false, nil
129+
}
130+
131+
func (c *client) Expand(
132+
ctx context.Context,
133+
volumeID string,
134+
requestBytes int64,
135+
secrets map[string]string) (int64, bool, error) {
136+
req := &csi.ControllerExpandVolumeRequest{
137+
Secrets: secrets,
138+
VolumeId: volumeID,
139+
CapacityRange: &csi.CapacityRange{RequiredBytes: requestBytes},
140+
}
141+
resp, err := c.ctrlClient.ControllerExpandVolume(ctx, req)
142+
if err != nil {
143+
return 0, false, err
144+
}
145+
return resp.CapacityBytes, resp.NodeExpansionRequired, nil
146+
}
147+
148+
func (c *client) Probe(ctx context.Context) error {
149+
resp, err := c.idClient.Probe(ctx, &csi.ProbeRequest{})
150+
if err != nil {
151+
return err
152+
}
153+
if resp.Ready == nil || !resp.Ready.Value {
154+
return errors.New("driver is still initializing")
155+
}
156+
return nil
157+
}
158+
159+
func newGRPCConnection(address string, timeout time.Duration) (*grpc.ClientConn, error) {
160+
klog.V(2).Infof("Connecting to %s", address)
161+
dialOptions := []grpc.DialOption{
162+
grpc.WithInsecure(),
163+
grpc.WithBackoffMaxDelay(time.Second),
164+
grpc.WithUnaryInterceptor(logGRPC),
165+
}
166+
if strings.HasPrefix(address, "/") {
167+
dialOptions = append(dialOptions, grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
168+
return net.DialTimeout("unix", addr, timeout)
169+
}))
170+
}
171+
conn, err := grpc.Dial(address, dialOptions...)
172+
173+
if err != nil {
174+
return nil, err
175+
}
176+
ctx, cancel := context.WithTimeout(context.Background(), timeout)
177+
defer cancel()
178+
for {
179+
if !conn.WaitForStateChange(ctx, conn.GetState()) {
180+
klog.V(4).Infof("Connection timed out")
181+
return conn, fmt.Errorf("Connection timed out")
182+
}
183+
if conn.GetState() == connectivity.Ready {
184+
klog.V(3).Infof("Connected")
185+
return conn, nil
186+
}
187+
klog.V(4).Infof("Still trying, connection is %s", conn.GetState())
188+
}
189+
}
190+
191+
func logGRPC(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
192+
klog.V(5).Infof("GRPC call: %s", method)
193+
klog.V(5).Infof("GRPC request: %s", protosanitizer.StripSecrets(req))
194+
err := invoker(ctx, method, req, reply, cc, opts...)
195+
klog.V(5).Infof("GRPC response: %s", protosanitizer.StripSecrets(reply))
196+
klog.V(5).Infof("GRPC error: %v", err)
197+
return err
198+
}

0 commit comments

Comments
 (0)