Skip to content

Commit c91dc14

Browse files
committed
feature: implement necessary gRPC handlers of health-monitor
1 parent e3559b7 commit c91dc14

File tree

1,596 files changed

+546499
-98222
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

1,596 files changed

+546499
-98222
lines changed

Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,6 @@ LABEL description="HostPath Driver"
44
ARG binary=./bin/hostpathplugin
55

66
# Add util-linux to get a new version of losetup.
7-
RUN apk add util-linux
7+
RUN apk add util-linux && apk update && apk upgrade
88
COPY ${binary} /hostpathplugin
99
ENTRYPOINT ["/hostpathplugin"]

cmd/hostpathplugin/main.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ var (
3636
ephemeral = flag.Bool("ephemeral", false, "publish volumes in ephemeral mode even if kubelet did not ask for it (only needed for Kubernetes 1.15)")
3737
maxVolumesPerNode = flag.Int64("maxvolumespernode", 0, "limit of volumes per node")
3838
showVersion = flag.Bool("version", false, "Show version.")
39+
kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
3940
// Set by the build process
4041
version = ""
4142
)
@@ -58,10 +59,11 @@ func main() {
5859
}
5960

6061
func handle() {
61-
driver, err := hostpath.NewHostPathDriver(*driverName, *nodeID, *endpoint, *ephemeral, *maxVolumesPerNode, version)
62+
driver, err := hostpath.NewHostPathDriver(*driverName, *nodeID, *endpoint, *ephemeral, *maxVolumesPerNode, version, *kubeconfig)
6263
if err != nil {
6364
fmt.Printf("Failed to initialize driver: %s", err.Error())
6465
os.Exit(1)
6566
}
67+
6668
driver.Run()
6769
}

go.mod

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,19 @@ go 1.12
44

55
require (
66
github.com/container-storage-interface/spec v1.3.0
7-
github.com/gogo/protobuf v1.3.1 // indirect
87
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b
9-
github.com/golang/protobuf v1.3.2
8+
github.com/golang/protobuf v1.4.2
109
github.com/google/gofuzz v1.2.0 // indirect
11-
github.com/google/uuid v1.0.0 // indirect
10+
github.com/imdario/mergo v0.3.11 // indirect
1211
github.com/kubernetes-csi/csi-lib-utils v0.3.0
1312
github.com/pborman/uuid v0.0.0-20180906182336-adf5a7427709
14-
github.com/spf13/afero v1.2.2 // indirect
15-
github.com/stretchr/testify v1.4.0 // indirect
16-
golang.org/x/net v0.0.0-20190311183353-d8887717615a
17-
google.golang.org/grpc v1.26.0
18-
gopkg.in/inf.v0 v0.9.1 // indirect
19-
k8s.io/apimachinery v0.0.0-20181110190943-2a7c93004028 // indirect
13+
github.com/stretchr/testify v1.4.0
14+
golang.org/x/net v0.0.0-20200707034311-ab3426394381
15+
golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e // indirect
16+
google.golang.org/grpc v1.27.0
17+
k8s.io/api v0.19.0
18+
k8s.io/apimachinery v0.19.0
19+
k8s.io/client-go v0.19.0
2020
k8s.io/kubernetes v1.12.2
21-
k8s.io/utils v0.0.0-20181102055113-1bd4f387aa67
21+
k8s.io/utils v0.0.0-20201015054608-420da100c033
2222
)

go.sum

Lines changed: 278 additions & 6 deletions
Large diffs are not rendered by default.

pkg/hostpath/controllerserver.go

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ func NewControllerServer(ephemeral bool, nodeID string) *controllerServer {
6767
csi.ControllerServiceCapability_RPC_LIST_VOLUMES,
6868
csi.ControllerServiceCapability_RPC_CLONE_VOLUME,
6969
csi.ControllerServiceCapability_RPC_EXPAND_VOLUME,
70+
csi.ControllerServiceCapability_RPC_VOLUME_CONDITION,
7071
}),
7172
nodeID: nodeID,
7273
}
@@ -288,16 +289,26 @@ func (cs *controllerServer) ListVolumes(ctx context.Context, req *csi.ListVolume
288289
hpVolume hostPathVolume
289290
)
290291
volumeIds := getSortedVolumeIDs()
292+
if req.StartingToken == "" {
293+
req.StartingToken = "1"
294+
}
295+
291296
startIdx, err := strconv.ParseInt(req.StartingToken, 10, 32)
292297
if err != nil {
293298
return nil, status.Error(codes.InvalidArgument, "The type of startingToken should be integer")
294299
}
295300

296301
volumesLength = int64(len(volumeIds))
297302
maxLength = int64(req.MaxEntries)
298-
for index := startIdx - 1; index < volumesLength || index < maxLength; index++ {
303+
304+
if maxLength > volumesLength || maxLength <= 0 {
305+
maxLength = volumesLength
306+
}
307+
308+
for index := startIdx - 1; index < volumesLength && index < maxLength; index++ {
299309
hpVolume = hostPathVolumes[volumeIds[index]]
300310
healthy, msg := doHealthCheck(volumeIds[index])
311+
glog.V(3).Infof("Healthy state: %+v Volume: %+v", hpVolume.VolName, healthy)
301312
volumeRes.Entries = append(volumeRes.Entries, &csi.ListVolumesResponse_Entry{
302313
Volume: &csi.Volume{
303314
VolumeId: hpVolume.VolID,
@@ -312,11 +323,32 @@ func (cs *controllerServer) ListVolumes(ctx context.Context, req *csi.ListVolume
312323
},
313324
})
314325
}
326+
327+
glog.V(5).Infof("Volumes are: %+v", *volumeRes)
315328
return volumeRes, nil
316329
}
317330

318331
func (cs *controllerServer) ControllerGetVolume(ctx context.Context, req *csi.ControllerGetVolumeRequest) (*csi.ControllerGetVolumeResponse, error) {
319-
return nil, status.Error(codes.Unimplemented, "")
332+
volume, ok := hostPathVolumes[req.GetVolumeId()]
333+
if !ok {
334+
return nil, status.Error(codes.NotFound, "The volume not found")
335+
}
336+
337+
healthy, msg := doHealthCheck(req.GetVolumeId())
338+
glog.V(3).Infof("Healthy state: %+v Volume: %+v", volume.VolName, healthy)
339+
return &csi.ControllerGetVolumeResponse{
340+
Volume: &csi.Volume{
341+
VolumeId: volume.VolID,
342+
CapacityBytes: volume.VolSize,
343+
},
344+
Status: &csi.ControllerGetVolumeResponse_VolumeStatus{
345+
PublishedNodeIds: []string{volume.NodeID},
346+
VolumeCondition: &csi.VolumeCondition{
347+
Abnormal: !healthy,
348+
Message: msg,
349+
},
350+
},
351+
}, nil
320352
}
321353

322354
// getSnapshotPath returns the full path to where the snapshot is stored

pkg/hostpath/healthcheck.go

Lines changed: 74 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,40 @@
11
package hostpath
22

33
import (
4+
"encoding/json"
45
"fmt"
56
"os"
7+
"os/exec"
68
"path/filepath"
79
"strings"
810

11+
"github.com/golang/glog"
912
fs "k8s.io/kubernetes/pkg/volume/util/fs"
10-
utilexec "k8s.io/utils/exec"
1113
)
1214

1315
const (
1416
// sourcePathPrefix should be specified by the csi-data-dir volume.
1517
// You can change it by modifying the volume path of csi-data-dir volume.
16-
sourcePathPrefix = "/var/lib/csi-hostpath-data/"
18+
sourcePathPrefix = "/csi-data-dir/"
19+
podVolumeTargetPath = "/var/lib/kubelet/pods"
1720
)
1821

22+
type MountPointInfo struct {
23+
Target string `json:"target"`
24+
Source string `json:"source"`
25+
FsType string `json:"fstype"`
26+
Options string `json:"options"`
27+
ContainerFileSystem []MountPointInfo `json:"children,omitempty"`
28+
}
29+
30+
type ContainerFileSystem struct {
31+
Children []MountPointInfo `json:"children"`
32+
}
33+
34+
type FileSystems struct {
35+
Filsystem []ContainerFileSystem `json:"filesystems"`
36+
}
37+
1938
func locateCommandPath(commandName string) string {
2039
// default to root
2140
binary := filepath.Join("/", commandName)
@@ -36,7 +55,8 @@ func getSourcePath(volumeHandle string) string {
3655
}
3756

3857
func checkSourcePathExist(volumeHandle string) (bool, error) {
39-
sourcePath := getSnapshotPath(volumeHandle)
58+
sourcePath := getSourcePath(volumeHandle)
59+
glog.V(3).Infof("Volume: %+v Source path is: %+v", volumeHandle, sourcePath)
4060
_, err := os.Stat(sourcePath)
4161
if err != nil {
4262
if os.IsNotExist(err) {
@@ -49,60 +69,93 @@ func checkSourcePathExist(volumeHandle string) (bool, error) {
4969
return true, nil
5070
}
5171

72+
func parseMountInfo(originalMountInfo []byte) ([]MountPointInfo, error) {
73+
fs := FileSystems{
74+
Filsystem: make([]ContainerFileSystem, 0),
75+
}
76+
77+
if err := json.Unmarshal(originalMountInfo, &fs); err != nil {
78+
return nil, err
79+
}
80+
81+
if len(fs.Filsystem) <= 0 {
82+
return nil, fmt.Errorf("failed to get mount info")
83+
}
84+
85+
return fs.Filsystem[0].Children, nil
86+
}
87+
5288
func checkMountPointExist(sourcePath string) (bool, error) {
53-
exec := utilexec.New()
54-
args := []string{"-l", "-o TARGET,SOURCE"}
55-
out, err := exec.Command(locateCommandPath("findmnt"), args...).CombinedOutput()
89+
cmdPath := locateCommandPath("findmnt")
90+
out, err := exec.Command(cmdPath, "--json").CombinedOutput()
5691
if err != nil {
92+
glog.V(3).Infof("failed to execute command: %+v", cmdPath)
5793
return false, err
5894
}
5995

6096
if len(out) < 1 {
6197
return false, fmt.Errorf("mount point info is nil")
6298
}
6399

64-
mountInfo := strings.Split(string(out), " ")
65-
if len(mountInfo) != 2 {
66-
return false, nil
100+
mountInfos, err := parseMountInfo([]byte(out))
101+
if err != nil {
102+
return false, fmt.Errorf("failed to parse the mount infos: %+v", err)
67103
}
68104

69-
mountPoint := mountInfo[0]
70-
sp := mountInfo[1]
71-
72-
if !strings.Contains(sp, sourcePath) {
73-
return false, nil
105+
mountInfosOfPod := MountPointInfo{}
106+
for _, mountInfo := range mountInfos {
107+
if mountInfo.Target == podVolumeTargetPath {
108+
mountInfosOfPod = mountInfo
109+
break
110+
}
74111
}
75112

76-
_, err = os.Stat(mountPoint)
77-
if err != nil {
78-
if os.IsNotExist(err) {
79-
return false, nil
113+
for _, mountInfo := range mountInfosOfPod.ContainerFileSystem {
114+
if !strings.Contains(mountInfo.Source, sourcePath) {
115+
continue
80116
}
81117

82-
return false, err
118+
_, err = os.Stat(mountInfo.Target)
119+
if err != nil {
120+
if os.IsNotExist(err) {
121+
return false, nil
122+
}
123+
124+
return false, err
125+
}
126+
127+
return true, nil
83128
}
84129

85-
return true, nil
130+
return false, nil
86131
}
87132

88133
func checkPVCapacityValid(volumeHandle string) (bool, error) {
89134
sourcePath := getSourcePath(volumeHandle)
90135
_, fscapacity, _, _, _, _, err := fs.FsInfo(sourcePath)
91136
if err != nil {
92-
return false, err
137+
return false, fmt.Errorf("failed to get capacity info: %+v", err)
93138
}
94139

95140
volumeCapacity := hostPathVolumes[volumeHandle].VolSize
141+
glog.V(3).Infof("volume capacity: %+v fs capacity:%+v", volumeCapacity, fscapacity)
96142
return fscapacity >= volumeCapacity, nil
97143
}
98144

145+
func getPVCapacity(volumeHandle string) (int64, int64, int64, error) {
146+
sourcePath := getSourcePath(volumeHandle)
147+
fsavailable, fscapacity, fsused, _, _, _, err := fs.FsInfo(sourcePath)
148+
return fscapacity, fsused, fsavailable, err
149+
}
150+
99151
func checkPVUsage(volumeHandle string) (bool, error) {
100152
sourcePath := getSourcePath(volumeHandle)
101153
fsavailable, _, _, _, _, _, err := fs.FsInfo(sourcePath)
102154
if err != nil {
103155
return false, err
104156
}
105157

158+
glog.V(3).Infof("fs available: %+v", fsavailable)
106159
return fsavailable > 0, nil
107160
}
108161

0 commit comments

Comments
 (0)