@@ -17,24 +17,23 @@ limitations under the License.
17
17
package hostpath
18
18
19
19
import (
20
- "context"
21
20
"errors"
22
21
"fmt"
23
22
"io"
24
23
"io/ioutil"
25
24
"os"
25
+ "os/exec"
26
26
"path/filepath"
27
+ "regexp"
27
28
"sort"
28
29
"strings"
29
30
30
31
"github.com/golang/glog"
31
32
timestamp "github.com/golang/protobuf/ptypes/timestamp"
32
33
"google.golang.org/grpc/codes"
33
34
"google.golang.org/grpc/status"
34
- v1 "k8s.io/api/core/v1"
35
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
36
35
"k8s.io/client-go/kubernetes"
37
- "k8s.io/client-go/tools/clientcmd "
36
+ fs "k8s.io/kubernetes/pkg/volume/util/fs "
38
37
"k8s.io/kubernetes/pkg/volume/util/volumepathhandler"
39
38
utilexec "k8s.io/utils/exec"
40
39
)
@@ -129,26 +128,13 @@ func NewHostPathDriver(driverName, nodeID, endpoint string, ephemeral bool, maxV
129
128
glog .Infof ("Driver: %v " , driverName )
130
129
glog .Infof ("Version: %s" , vendorVersion )
131
130
132
- config , err := clientcmd .BuildConfigFromFlags ("" , kubeconfig )
133
- if err != nil {
134
- fmt .Printf ("Failed to build kubeconfig: %s" , err .Error ())
135
- os .Exit (1 )
136
- }
137
-
138
- clientset , err := kubernetes .NewForConfig (config )
139
- if err != nil {
140
- fmt .Printf ("Faild to create k8s client: %s" , err .Error ())
141
- os .Exit (1 )
142
- }
143
-
144
131
return & hostPath {
145
132
name : driverName ,
146
133
version : vendorVersion ,
147
134
nodeID : nodeID ,
148
135
endpoint : endpoint ,
149
136
ephemeral : ephemeral ,
150
137
maxVolumesPerNode : maxVolumesPerNode ,
151
- kclient : clientset ,
152
138
}, nil
153
139
}
154
140
@@ -181,10 +167,12 @@ func discoverExistingSnapshots() {
181
167
}
182
168
}
183
169
184
- func (hp * hostPath ) Run () {
185
- // Initialize existing volumes
186
- hp .initExistingVolumes ()
170
+ func (hp * hostPath ) Run () error {
187
171
// Create GRPC servers
172
+ if err := initialExistingVolumes (); err != nil {
173
+ return err
174
+ }
175
+
188
176
hp .ids = NewIdentityServer (hp .name , hp .version )
189
177
hp .ns = NewNodeServer (hp .nodeID , hp .ephemeral , hp .maxVolumesPerNode )
190
178
hp .cs = NewControllerServer (hp .ephemeral , hp .nodeID )
@@ -193,32 +181,8 @@ func (hp *hostPath) Run() {
193
181
s := NewNonBlockingGRPCServer ()
194
182
s .Start (hp .endpoint , hp .ids , hp .cs , hp .ns )
195
183
s .Wait ()
196
- }
197
-
198
- func (hp * hostPath ) initExistingVolumes () {
199
- pvList , err := hp .kclient .CoreV1 ().PersistentVolumes ().List (context .Background (), metav1.ListOptions {})
200
- if err != nil {
201
- glog .V (3 ).Infof ("Failed to get pv: %s" , err .Error ())
202
- os .Exit (1 )
203
- }
204
-
205
- for _ , pv := range pvList .Items {
206
- volAccessType := mountAccess
207
- if * pv .Spec .VolumeMode == v1 .PersistentVolumeBlock {
208
- volAccessType = blockAccess
209
- }
210
-
211
- hostPathVolumes [pv .Spec .CSI .VolumeHandle ] = hostPathVolume {
212
- VolName : pv .Name ,
213
- VolID : pv .Spec .CSI .VolumeHandle ,
214
- VolSize : pv .Spec .Capacity .Storage ().Value (),
215
- VolPath : getVolumePath (pv .Spec .CSI .VolumeHandle ),
216
- VolAccessType : volAccessType ,
217
- }
218
- }
219
184
220
- glog .V (3 ).Infof ("The existing volume list is: %+v\n " , hostPathVolumes )
221
- return
185
+ return nil
222
186
}
223
187
224
188
func getVolumeByID (volumeID string ) (hostPathVolume , error ) {
@@ -455,3 +419,85 @@ func getSortedVolumeIDs() []string {
455
419
sort .Strings (ids )
456
420
return ids
457
421
}
422
+
423
+ func filterVolumeName (targetPath string ) string {
424
+ pathItems := strings .Split (targetPath , "kubernetes.io~csi/" )
425
+ if len (pathItems ) < 2 {
426
+ return ""
427
+ }
428
+
429
+ return strings .TrimSuffix (pathItems [1 ], "/mount" )
430
+ }
431
+
432
+ func filterVolumeID (sourcePath string ) string {
433
+ volumeSourcePathRegex := regexp .MustCompile (`\[(.*)\]` )
434
+ volumeSP := string (volumeSourcePathRegex .Find ([]byte (sourcePath )))
435
+ if volumeSP == "" {
436
+ return ""
437
+ }
438
+
439
+ return strings .TrimSuffix (strings .TrimPrefix (volumeSP , "[/var/lib/csi-hostpath-data/" ), "]" )
440
+ }
441
+
442
+ func parseVolumeInfo (volume MountPointInfo ) (* hostPathVolume , error ) {
443
+ volumeName := filterVolumeName (volume .Target )
444
+ volumeID := filterVolumeID (volume .Source )
445
+ sourcePath := getSourcePath (volumeID )
446
+ _ , fscapacity , _ , _ , _ , _ , err := fs .FsInfo (sourcePath )
447
+ if err != nil {
448
+ return nil , fmt .Errorf ("failed to get capacity info: %+v" , err )
449
+ }
450
+
451
+ hp := hostPathVolume {
452
+ VolName : volumeName ,
453
+ VolID : volumeID ,
454
+ VolSize : fscapacity ,
455
+ VolPath : getVolumePath (volumeID ),
456
+ VolAccessType : mountAccess ,
457
+ }
458
+
459
+ return & hp , nil
460
+ }
461
+
462
+ func initialExistingVolumes () error {
463
+ cmdPath := locateCommandPath ("findmnt" )
464
+ out , err := exec .Command (cmdPath , "--json" ).CombinedOutput ()
465
+ if err != nil {
466
+ glog .V (3 ).Infof ("failed to execute command: %+v" , cmdPath )
467
+ return err
468
+ }
469
+
470
+ if len (out ) < 1 {
471
+ return fmt .Errorf ("mount point info is nil" )
472
+ }
473
+
474
+ mountInfos , err := parseMountInfo ([]byte (out ))
475
+ if err != nil {
476
+ return fmt .Errorf ("failed to parse the mount infos: %+v" , err )
477
+ }
478
+
479
+ mountInfosOfPod := MountPointInfo {}
480
+ for _ , mountInfo := range mountInfos {
481
+ if mountInfo .Target == podVolumeTargetPath {
482
+ mountInfosOfPod = mountInfo
483
+ break
484
+ }
485
+ }
486
+
487
+ // getting existing volumes based on the mount point infos.
488
+ // It's a temporary solution to recall volumes.
489
+ for _ , pv := range mountInfosOfPod .ContainerFileSystem {
490
+ if ! strings .Contains (pv .Target , csiSignOfVolumeTargetPath ) {
491
+ continue
492
+ }
493
+
494
+ hp , err := parseVolumeInfo (pv )
495
+ if err != nil {
496
+ return err
497
+ }
498
+
499
+ hostPathVolumes [hp .VolID ] = * hp
500
+ }
501
+
502
+ return nil
503
+ }
0 commit comments