Skip to content

Commit f8512c0

Browse files
committed
feature: support external-health-monitor
1 parent 6775833 commit f8512c0

File tree

1,567 files changed

+551870
-87724
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

1,567 files changed

+551870
-87724
lines changed

Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,6 @@ LABEL description="HostPath Driver"
44
ARG binary=./bin/hostpathplugin
55

66
# Add util-linux to get a new version of losetup.
7-
RUN apk add util-linux
7+
RUN apk add util-linux && apk update && apk upgrade
88
COPY ${binary} /hostpathplugin
99
ENTRYPOINT ["/hostpathplugin"]

cmd/hostpathplugin/main.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,5 +63,10 @@ func handle() {
6363
fmt.Printf("Failed to initialize driver: %s", err.Error())
6464
os.Exit(1)
6565
}
66-
driver.Run()
66+
67+
if err := driver.Run(); err != nil {
68+
fmt.Printf("Failed to run driver: %s", err.Error())
69+
os.Exit(1)
70+
71+
}
6772
}

go.mod

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,18 @@ module github.com/kubernetes-csi/csi-driver-host-path
33
go 1.12
44

55
require (
6-
github.com/container-storage-interface/spec v1.2.0
6+
github.com/container-storage-interface/spec v1.3.0
77
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b
8-
github.com/golang/protobuf v1.3.2
9-
github.com/google/uuid v1.0.0 // indirect
8+
github.com/golang/protobuf v1.4.2
9+
github.com/google/gofuzz v1.2.0 // indirect
1010
github.com/kubernetes-csi/csi-lib-utils v0.3.0
1111
github.com/pborman/uuid v0.0.0-20180906182336-adf5a7427709
12-
github.com/spf13/afero v1.2.2 // indirect
13-
github.com/stretchr/testify v1.4.0 // indirect
14-
golang.org/x/net v0.0.0-20190311183353-d8887717615a
15-
google.golang.org/grpc v1.26.0
16-
k8s.io/apimachinery v0.0.0-20181110190943-2a7c93004028 // indirect
12+
github.com/stretchr/testify v1.4.0
13+
golang.org/x/net v0.0.0-20200707034311-ab3426394381
14+
golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e // indirect
15+
google.golang.org/grpc v1.27.0
16+
gopkg.in/yaml.v2 v2.3.0 // indirect
17+
k8s.io/client-go v0.19.1
1718
k8s.io/kubernetes v1.12.2
18-
k8s.io/utils v0.0.0-20181102055113-1bd4f387aa67
19+
k8s.io/utils v0.0.0-20201104234853-8146046b121e
1920
)

go.sum

Lines changed: 278 additions & 20 deletions
Large diffs are not rendered by default.

pkg/hostpath/controllerserver.go

Lines changed: 77 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -61,10 +61,13 @@ func NewControllerServer(ephemeral bool, nodeID string) *controllerServer {
6161
caps: getControllerServiceCapabilities(
6262
[]csi.ControllerServiceCapability_RPC_Type{
6363
csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME,
64+
csi.ControllerServiceCapability_RPC_GET_VOLUME,
6465
csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT,
6566
csi.ControllerServiceCapability_RPC_LIST_SNAPSHOTS,
67+
csi.ControllerServiceCapability_RPC_LIST_VOLUMES,
6668
csi.ControllerServiceCapability_RPC_CLONE_VOLUME,
6769
csi.ControllerServiceCapability_RPC_EXPAND_VOLUME,
70+
csi.ControllerServiceCapability_RPC_VOLUME_CONDITION,
6871
}),
6972
nodeID: nodeID,
7073
}
@@ -277,7 +280,75 @@ func (cs *controllerServer) GetCapacity(ctx context.Context, req *csi.GetCapacit
277280
}
278281

279282
func (cs *controllerServer) ListVolumes(ctx context.Context, req *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error) {
280-
return nil, status.Error(codes.Unimplemented, "")
283+
volumeRes := &csi.ListVolumesResponse{
284+
Entries: []*csi.ListVolumesResponse_Entry{},
285+
}
286+
287+
var (
288+
startIdx, volumesLength, maxLength int64
289+
hpVolume hostPathVolume
290+
)
291+
volumeIds := getSortedVolumeIDs()
292+
if req.StartingToken == "" {
293+
req.StartingToken = "1"
294+
}
295+
296+
startIdx, err := strconv.ParseInt(req.StartingToken, 10, 32)
297+
if err != nil {
298+
return nil, status.Error(codes.InvalidArgument, "The type of startingToken should be integer")
299+
}
300+
301+
volumesLength = int64(len(volumeIds))
302+
maxLength = int64(req.MaxEntries)
303+
304+
if maxLength > volumesLength || maxLength <= 0 {
305+
maxLength = volumesLength
306+
}
307+
308+
for index := startIdx - 1; index < volumesLength && index < maxLength; index++ {
309+
hpVolume = hostPathVolumes[volumeIds[index]]
310+
healthy, msg := doHealthCheck(volumeIds[index])
311+
glog.V(3).Infof("Healthy state: %s Volume: %t", hpVolume.VolName, healthy)
312+
volumeRes.Entries = append(volumeRes.Entries, &csi.ListVolumesResponse_Entry{
313+
Volume: &csi.Volume{
314+
VolumeId: hpVolume.VolID,
315+
CapacityBytes: hpVolume.VolSize,
316+
},
317+
Status: &csi.ListVolumesResponse_VolumeStatus{
318+
PublishedNodeIds: []string{hpVolume.NodeID},
319+
VolumeCondition: &csi.VolumeCondition{
320+
Abnormal: !healthy,
321+
Message: msg,
322+
},
323+
},
324+
})
325+
}
326+
327+
glog.V(5).Infof("Volumes are: %+v", *volumeRes)
328+
return volumeRes, nil
329+
}
330+
331+
func (cs *controllerServer) ControllerGetVolume(ctx context.Context, req *csi.ControllerGetVolumeRequest) (*csi.ControllerGetVolumeResponse, error) {
332+
volume, ok := hostPathVolumes[req.GetVolumeId()]
333+
if !ok {
334+
return nil, status.Error(codes.NotFound, "The volume not found")
335+
}
336+
337+
healthy, msg := doHealthCheck(req.GetVolumeId())
338+
glog.V(3).Infof("Healthy state: %s Volume: %t", volume.VolName, healthy)
339+
return &csi.ControllerGetVolumeResponse{
340+
Volume: &csi.Volume{
341+
VolumeId: volume.VolID,
342+
CapacityBytes: volume.VolSize,
343+
},
344+
Status: &csi.ControllerGetVolumeResponse_VolumeStatus{
345+
PublishedNodeIds: []string{volume.NodeID},
346+
VolumeCondition: &csi.VolumeCondition{
347+
Abnormal: !healthy,
348+
Message: msg,
349+
},
350+
},
351+
}, nil
281352
}
282353

283354
// getSnapshotPath returns the full path to where the snapshot is stored
@@ -312,7 +383,7 @@ func (cs *controllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS
312383
Snapshot: &csi.Snapshot{
313384
SnapshotId: exSnap.Id,
314385
SourceVolumeId: exSnap.VolID,
315-
CreationTime: &exSnap.CreationTime,
386+
CreationTime: exSnap.CreationTime,
316387
SizeBytes: exSnap.SizeBytes,
317388
ReadyToUse: exSnap.ReadyToUse,
318389
},
@@ -352,7 +423,7 @@ func (cs *controllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS
352423
snapshot.Id = snapshotID
353424
snapshot.VolID = volumeID
354425
snapshot.Path = file
355-
snapshot.CreationTime = *creationTime
426+
snapshot.CreationTime = creationTime
356427
snapshot.SizeBytes = hostPathVolume.VolSize
357428
snapshot.ReadyToUse = true
358429

@@ -362,7 +433,7 @@ func (cs *controllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS
362433
Snapshot: &csi.Snapshot{
363434
SnapshotId: snapshot.Id,
364435
SourceVolumeId: snapshot.VolID,
365-
CreationTime: &snapshot.CreationTime,
436+
CreationTime: snapshot.CreationTime,
366437
SizeBytes: snapshot.SizeBytes,
367438
ReadyToUse: snapshot.ReadyToUse,
368439
},
@@ -423,7 +494,7 @@ func (cs *controllerServer) ListSnapshots(ctx context.Context, req *csi.ListSnap
423494
snapshot := csi.Snapshot{
424495
SnapshotId: snap.Id,
425496
SourceVolumeId: snap.VolID,
426-
CreationTime: &snap.CreationTime,
497+
CreationTime: snap.CreationTime,
427498
SizeBytes: snap.SizeBytes,
428499
ReadyToUse: snap.ReadyToUse,
429500
}
@@ -531,7 +602,7 @@ func convertSnapshot(snap hostPathSnapshot) *csi.ListSnapshotsResponse {
531602
Snapshot: &csi.Snapshot{
532603
SnapshotId: snap.Id,
533604
SourceVolumeId: snap.VolID,
534-
CreationTime: &snap.CreationTime,
605+
CreationTime: snap.CreationTime,
535606
SizeBytes: snap.SizeBytes,
536607
ReadyToUse: snap.ReadyToUse,
537608
},

pkg/hostpath/healthcheck.go

Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
package hostpath
2+
3+
import (
4+
"encoding/json"
5+
"fmt"
6+
"os"
7+
"os/exec"
8+
"path/filepath"
9+
"strings"
10+
11+
"github.com/golang/glog"
12+
fs "k8s.io/kubernetes/pkg/volume/util/fs"
13+
)
14+
15+
const (
16+
// sourcePathPrefix should be specified by the csi-data-dir volume.
17+
// You can change it by modifying the volume path of csi-data-dir volume.
18+
sourcePathPrefix = "/csi-data-dir/"
19+
podVolumeTargetPath = "/var/lib/kubelet/pods"
20+
csiSignOfVolumeTargetPath = "kubernetes.io~csi/pvc"
21+
)
22+
23+
type MountPointInfo struct {
24+
Target string `json:"target"`
25+
Source string `json:"source"`
26+
FsType string `json:"fstype"`
27+
Options string `json:"options"`
28+
ContainerFileSystem []MountPointInfo `json:"children,omitempty"`
29+
}
30+
31+
type ContainerFileSystem struct {
32+
Children []MountPointInfo `json:"children"`
33+
}
34+
35+
type FileSystems struct {
36+
Filsystem []ContainerFileSystem `json:"filesystems"`
37+
}
38+
39+
func locateCommandPath(commandName string) string {
40+
// default to root
41+
binary := filepath.Join("/", commandName)
42+
for _, path := range []string{"/bin", "/usr/sbin", "/usr/bin"} {
43+
binPath := filepath.Join(path, binary)
44+
if _, err := os.Stat(binPath); err != nil {
45+
continue
46+
}
47+
48+
return binPath
49+
}
50+
51+
return ""
52+
}
53+
54+
func getSourcePath(volumeHandle string) string {
55+
return fmt.Sprintf("%s%s", sourcePathPrefix, volumeHandle)
56+
}
57+
58+
func checkSourcePathExist(volumeHandle string) (bool, error) {
59+
sourcePath := getSourcePath(volumeHandle)
60+
glog.V(3).Infof("Volume: %s Source path is: %s", volumeHandle, sourcePath)
61+
_, err := os.Stat(sourcePath)
62+
if err != nil {
63+
if os.IsNotExist(err) {
64+
return false, nil
65+
}
66+
67+
return false, err
68+
}
69+
70+
return true, nil
71+
}
72+
73+
func parseMountInfo(originalMountInfo []byte) ([]MountPointInfo, error) {
74+
fs := FileSystems{
75+
Filsystem: make([]ContainerFileSystem, 0),
76+
}
77+
78+
if err := json.Unmarshal(originalMountInfo, &fs); err != nil {
79+
return nil, err
80+
}
81+
82+
if len(fs.Filsystem) <= 0 {
83+
return nil, fmt.Errorf("failed to get mount info")
84+
}
85+
86+
return fs.Filsystem[0].Children, nil
87+
}
88+
89+
func checkMountPointExist(sourcePath string) (bool, error) {
90+
cmdPath := locateCommandPath("findmnt")
91+
out, err := exec.Command(cmdPath, "--json").CombinedOutput()
92+
if err != nil {
93+
glog.V(3).Infof("failed to execute command: %+v", cmdPath)
94+
return false, err
95+
}
96+
97+
if len(out) < 1 {
98+
return false, fmt.Errorf("mount point info is nil")
99+
}
100+
101+
mountInfos, err := parseMountInfo([]byte(out))
102+
if err != nil {
103+
return false, fmt.Errorf("failed to parse the mount infos: %+v", err)
104+
}
105+
106+
mountInfosOfPod := MountPointInfo{}
107+
for _, mountInfo := range mountInfos {
108+
if mountInfo.Target == podVolumeTargetPath {
109+
mountInfosOfPod = mountInfo
110+
break
111+
}
112+
}
113+
114+
for _, mountInfo := range mountInfosOfPod.ContainerFileSystem {
115+
if !strings.Contains(mountInfo.Source, sourcePath) {
116+
continue
117+
}
118+
119+
_, err = os.Stat(mountInfo.Target)
120+
if err != nil {
121+
if os.IsNotExist(err) {
122+
return false, nil
123+
}
124+
125+
return false, err
126+
}
127+
128+
return true, nil
129+
}
130+
131+
return false, nil
132+
}
133+
134+
func checkPVCapacityValid(volumeHandle string) (bool, error) {
135+
sourcePath := getSourcePath(volumeHandle)
136+
_, fscapacity, _, _, _, _, err := fs.FsInfo(sourcePath)
137+
if err != nil {
138+
return false, fmt.Errorf("failed to get capacity info: %+v", err)
139+
}
140+
141+
volumeCapacity := hostPathVolumes[volumeHandle].VolSize
142+
glog.V(3).Infof("volume capacity: %+v fs capacity:%+v", volumeCapacity, fscapacity)
143+
return fscapacity >= volumeCapacity, nil
144+
}
145+
146+
func getPVCapacity(volumeHandle string) (int64, int64, int64, error) {
147+
sourcePath := getSourcePath(volumeHandle)
148+
fsavailable, fscapacity, fsused, _, _, _, err := fs.FsInfo(sourcePath)
149+
return fscapacity, fsused, fsavailable, err
150+
}
151+
152+
func checkPVUsage(volumeHandle string) (bool, error) {
153+
sourcePath := getSourcePath(volumeHandle)
154+
fsavailable, _, _, _, _, _, err := fs.FsInfo(sourcePath)
155+
if err != nil {
156+
return false, err
157+
}
158+
159+
glog.V(3).Infof("fs available: %+v", fsavailable)
160+
return fsavailable > 0, nil
161+
}
162+
163+
func doHealthCheck(volumeHandle string) (bool, string) {
164+
spExist, err := checkSourcePathExist(volumeHandle)
165+
if err != nil {
166+
return false, err.Error()
167+
}
168+
169+
if !spExist {
170+
return false, "The source path of the volume doesn't exist"
171+
}
172+
173+
mpExist, err := checkMountPointExist(volumeHandle)
174+
if err != nil {
175+
return false, err.Error()
176+
}
177+
178+
if !mpExist {
179+
return false, "The volume isn't mounted"
180+
}
181+
182+
capValid, err := checkPVCapacityValid(volumeHandle)
183+
if err != nil {
184+
return false, err.Error()
185+
}
186+
187+
if !capValid {
188+
return false, "The capacity of volume is greater than actual storage"
189+
}
190+
191+
available, err := checkPVUsage(volumeHandle)
192+
if err != nil {
193+
return false, err.Error()
194+
}
195+
196+
if !available {
197+
return false, "The free space of the volume is insufficient"
198+
}
199+
200+
return true, ""
201+
}

0 commit comments

Comments
 (0)