@@ -17,24 +17,23 @@ limitations under the License.
17
17
package hostpath
18
18
19
19
import (
20
- "context"
21
20
"errors"
22
21
"fmt"
23
22
"io"
24
23
"io/ioutil"
25
24
"os"
25
+ "os/exec"
26
26
"path/filepath"
27
+ "regexp"
27
28
"sort"
28
29
"strings"
29
30
30
31
"github.com/golang/glog"
31
32
timestamp "github.com/golang/protobuf/ptypes/timestamp"
32
33
"google.golang.org/grpc/codes"
33
34
"google.golang.org/grpc/status"
34
- v1 "k8s.io/api/core/v1"
35
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
36
35
"k8s.io/client-go/kubernetes"
37
- "k8s.io/client-go/tools/clientcmd "
36
+ fs "k8s.io/kubernetes/pkg/volume/util/fs "
38
37
"k8s.io/kubernetes/pkg/volume/util/volumepathhandler"
39
38
utilexec "k8s.io/utils/exec"
40
39
)
@@ -129,26 +128,13 @@ func NewHostPathDriver(driverName, nodeID, endpoint string, ephemeral bool, maxV
129
128
glog .Infof ("Driver: %v " , driverName )
130
129
glog .Infof ("Version: %s" , vendorVersion )
131
130
132
- config , err := clientcmd .BuildConfigFromFlags ("" , kubeconfig )
133
- if err != nil {
134
- fmt .Printf ("Failed to build kubeconfig: %s" , err .Error ())
135
- os .Exit (1 )
136
- }
137
-
138
- clientset , err := kubernetes .NewForConfig (config )
139
- if err != nil {
140
- fmt .Printf ("Faild to create k8s client: %s" , err .Error ())
141
- os .Exit (1 )
142
- }
143
-
144
131
return & hostPath {
145
132
name : driverName ,
146
133
version : vendorVersion ,
147
134
nodeID : nodeID ,
148
135
endpoint : endpoint ,
149
136
ephemeral : ephemeral ,
150
137
maxVolumesPerNode : maxVolumesPerNode ,
151
- kclient : clientset ,
152
138
}, nil
153
139
}
154
140
@@ -181,10 +167,12 @@ func discoverExistingSnapshots() {
181
167
}
182
168
}
183
169
184
- func (hp * hostPath ) Run () {
185
- // Initialize existing volumes
186
- hp .initExistingVolumes ()
170
+ func (hp * hostPath ) Run () error {
187
171
// Create GRPC servers
172
+ if err := initialExistingVolumes (); err != nil {
173
+ return err
174
+ }
175
+
188
176
hp .ids = NewIdentityServer (hp .name , hp .version )
189
177
hp .ns = NewNodeServer (hp .nodeID , hp .ephemeral , hp .maxVolumesPerNode )
190
178
hp .cs = NewControllerServer (hp .ephemeral , hp .nodeID )
@@ -193,32 +181,8 @@ func (hp *hostPath) Run() {
193
181
s := NewNonBlockingGRPCServer ()
194
182
s .Start (hp .endpoint , hp .ids , hp .cs , hp .ns )
195
183
s .Wait ()
196
- }
197
-
198
- func (hp * hostPath ) initExistingVolumes () {
199
- pvList , err := hp .kclient .CoreV1 ().PersistentVolumes ().List (context .Background (), metav1.ListOptions {})
200
- if err != nil {
201
- glog .V (3 ).Infof ("Failed to get pv: %s" , err .Error ())
202
- os .Exit (1 )
203
- }
204
-
205
- for _ , pv := range pvList .Items {
206
- volAccessType := mountAccess
207
- if * pv .Spec .VolumeMode == v1 .PersistentVolumeBlock {
208
- volAccessType = blockAccess
209
- }
210
-
211
- hostPathVolumes [pv .Spec .CSI .VolumeHandle ] = hostPathVolume {
212
- VolName : pv .Name ,
213
- VolID : pv .Spec .CSI .VolumeHandle ,
214
- VolSize : pv .Spec .Capacity .Storage ().Value (),
215
- VolPath : getVolumePath (pv .Spec .CSI .VolumeHandle ),
216
- VolAccessType : volAccessType ,
217
- }
218
- }
219
184
220
- glog .V (3 ).Infof ("The existing volume list is: %+v\n " , hostPathVolumes )
221
- return
185
+ return nil
222
186
}
223
187
224
188
func getVolumeByID (volumeID string ) (hostPathVolume , error ) {
@@ -463,3 +427,85 @@ func getSortedVolumeIDs() []string {
463
427
sort .Strings (ids )
464
428
return ids
465
429
}
430
+
431
+ func filterVolumeName (targetPath string ) string {
432
+ pathItems := strings .Split (targetPath , "kubernetes.io~csi/" )
433
+ if len (pathItems ) < 2 {
434
+ return ""
435
+ }
436
+
437
+ return strings .TrimSuffix (pathItems [1 ], "/mount" )
438
+ }
439
+
440
+ func filterVolumeID (sourcePath string ) string {
441
+ volumeSourcePathRegex := regexp .MustCompile (`\[(.*)\]` )
442
+ volumeSP := string (volumeSourcePathRegex .Find ([]byte (sourcePath )))
443
+ if volumeSP == "" {
444
+ return ""
445
+ }
446
+
447
+ return strings .TrimSuffix (strings .TrimPrefix (volumeSP , "[/var/lib/csi-hostpath-data/" ), "]" )
448
+ }
449
+
450
+ func parseVolumeInfo (volume MountPointInfo ) (* hostPathVolume , error ) {
451
+ volumeName := filterVolumeName (volume .Target )
452
+ volumeID := filterVolumeID (volume .Source )
453
+ sourcePath := getSourcePath (volumeID )
454
+ _ , fscapacity , _ , _ , _ , _ , err := fs .FsInfo (sourcePath )
455
+ if err != nil {
456
+ return nil , fmt .Errorf ("failed to get capacity info: %+v" , err )
457
+ }
458
+
459
+ hp := hostPathVolume {
460
+ VolName : volumeName ,
461
+ VolID : volumeID ,
462
+ VolSize : fscapacity ,
463
+ VolPath : getVolumePath (volumeID ),
464
+ VolAccessType : mountAccess ,
465
+ }
466
+
467
+ return & hp , nil
468
+ }
469
+
470
+ func initialExistingVolumes () error {
471
+ cmdPath := locateCommandPath ("findmnt" )
472
+ out , err := exec .Command (cmdPath , "--json" ).CombinedOutput ()
473
+ if err != nil {
474
+ glog .V (3 ).Infof ("failed to execute command: %+v" , cmdPath )
475
+ return err
476
+ }
477
+
478
+ if len (out ) < 1 {
479
+ return fmt .Errorf ("mount point info is nil" )
480
+ }
481
+
482
+ mountInfos , err := parseMountInfo ([]byte (out ))
483
+ if err != nil {
484
+ return fmt .Errorf ("failed to parse the mount infos: %+v" , err )
485
+ }
486
+
487
+ mountInfosOfPod := MountPointInfo {}
488
+ for _ , mountInfo := range mountInfos {
489
+ if mountInfo .Target == podVolumeTargetPath {
490
+ mountInfosOfPod = mountInfo
491
+ break
492
+ }
493
+ }
494
+
495
+ // getting existing volumes based on the mount point infos.
496
+ // It's a temporary solution to recall volumes.
497
+ for _ , pv := range mountInfosOfPod .ContainerFileSystem {
498
+ if ! strings .Contains (pv .Target , csiSignOfVolumeTargetPath ) {
499
+ continue
500
+ }
501
+
502
+ hp , err := parseVolumeInfo (pv )
503
+ if err != nil {
504
+ return err
505
+ }
506
+
507
+ hostPathVolumes [hp .VolID ] = * hp
508
+ }
509
+
510
+ return nil
511
+ }
0 commit comments