Skip to content

Commit 9e7831d

Browse files
committed
feature: support external-health-monitor
1 parent e5c8072 commit 9e7831d

File tree

195 files changed

+64920
-32
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

195 files changed

+64920
-32
lines changed

Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,6 @@ LABEL description="HostPath Driver"
44
ARG binary=./bin/hostpathplugin
55

66
# Add util-linux to get a new version of losetup.
7-
RUN apk add util-linux
7+
RUN apk add util-linux && apk update && apk upgrade
88
COPY ${binary} /hostpathplugin
99
ENTRYPOINT ["/hostpathplugin"]

cmd/hostpathplugin/main.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,5 +63,10 @@ func handle() {
6363
fmt.Printf("Failed to initialize driver: %s", err.Error())
6464
os.Exit(1)
6565
}
66-
driver.Run()
66+
67+
if err := driver.Run(); err != nil {
68+
fmt.Printf("Failed to run driver: %s", err.Error())
69+
os.Exit(1)
70+
71+
}
6772
}

deploy/kubernetes-1.17/hostpath/csi-hostpath-plugin.yaml

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,37 @@ spec:
3434
labels:
3535
app: csi-hostpathplugin
3636
spec:
37+
serviceAccount: csi-external-health-monitor-controller
3738
containers:
39+
- name: csi-external-health-monitor-agent
40+
image: k8s.gcr.io/sig-storage/csi-external-health-monitor-agent:v0.2.0
41+
args:
42+
- "--v=5"
43+
- "--csi-address=$(ADDRESS)"
44+
env:
45+
- name: NODE_NAME
46+
valueFrom:
47+
fieldRef:
48+
fieldPath: spec.nodeName
49+
- name: ADDRESS
50+
value: /csi/csi.sock
51+
imagePullPolicy: "IfNotPresent"
52+
volumeMounts:
53+
- name: socket-dir
54+
mountPath: /csi
55+
- name: csi-external-health-monitor-controller
56+
image: k8s.gcr.io/sig-storage/csi-external-health-monitor-controller:v0.2.0
57+
args:
58+
- "--v=5"
59+
- "--csi-address=$(ADDRESS)"
60+
- "--leader-election"
61+
env:
62+
- name: ADDRESS
63+
value: /csi/csi.sock
64+
imagePullPolicy: "IfNotPresent"
65+
volumeMounts:
66+
- name: socket-dir
67+
mountPath: /csi
3868
- name: node-driver-registrar
3969
image: k8s.gcr.io/sig-storage/csi-node-driver-registrar:v2.0.1
4070
args:

deploy/util/deploy-hostpath.sh

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,9 @@ CSI_SNAPSHOTTER_RBAC_YAML="https://raw.githubusercontent.com/kubernetes-csi/exte
125125
CSI_RESIZER_RBAC_YAML="https://raw.githubusercontent.com/kubernetes-csi/external-resizer/$(rbac_version "${BASE_DIR}/hostpath/csi-hostpath-resizer.yaml" csi-resizer false)/deploy/kubernetes/rbac.yaml"
126126
: ${CSI_RESIZER_RBAC:=https://raw.githubusercontent.com/kubernetes-csi/external-resizer/$(rbac_version "${BASE_DIR}/hostpath/csi-hostpath-resizer.yaml" csi-resizer "${UPDATE_RBAC_RULES}")/deploy/kubernetes/rbac.yaml}
127127

128+
CSI_EXTERNALHEALTH_MONITOR_RBAC_YAML="https://raw.githubusercontent.com/kubernetes-csi/external-health-monitor/$(rbac_version "${BASE_DIR}/hostpath/csi-hostpath-plugin.yaml" csi-external-health-monitor-controller false)/deploy/kubernetes/external-health-monitor-controller/rbac.yaml"
129+
: ${CSI_EXTERNALHEALTH_MONITOR_RBAC:=https://raw.githubusercontent.com/kubernetes-csi/external-health-monitor/$(rbac_version "${BASE_DIR}/hostpath/csi-hostpath-plugin.yaml" csi-external-health-monitor-controller "${UPDATE_RBAC_RULES}")/deploy/kubernetes/external-health-monitor-controller/rbac.yaml}
130+
128131
INSTALL_CRD=${INSTALL_CRD:-"false"}
129132

130133
# Some images are not affected by *_REGISTRY/*_TAG and IMAGE_* variables.
@@ -140,7 +143,7 @@ run () {
140143

141144
# rbac rules
142145
echo "applying RBAC rules"
143-
for component in CSI_PROVISIONER CSI_ATTACHER CSI_SNAPSHOTTER CSI_RESIZER; do
146+
for component in CSI_PROVISIONER CSI_ATTACHER CSI_SNAPSHOTTER CSI_RESIZER CSI_EXTERNALHEALTH_MONITOR; do
144147
eval current="\${${component}_RBAC}"
145148
eval original="\${${component}_RBAC_YAML}"
146149
if [ "$current" != "$original" ]; then

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ require (
99
github.com/golang/protobuf v1.4.3
1010
github.com/kubernetes-csi/csi-lib-utils v0.9.0
1111
github.com/pborman/uuid v1.2.1
12+
github.com/stretchr/testify v1.6.1
1213
golang.org/x/net v0.0.0-20201209123823-ac852fbbde11
1314
golang.org/x/sys v0.0.0-20201207223542-d4d67f95c62d // indirect
1415
google.golang.org/genproto v0.0.0-20201209185603-f92720507ed4 // indirect

go.sum

Lines changed: 11 additions & 17 deletions
Large diffs are not rendered by default.

hack/get-sanity.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#!/bin/sh
22

3-
VERSION="v1.0.0-rc2"
3+
VERSION="v4.0.2"
44
SANITYTGZ="csi-sanity-${VERSION}.linux.amd64.tar.gz"
55

66
echo "Downloading csi-test from https://github.com/kubernetes-csi/csi-test/releases/download/${VERSION}/${SANITYTGZ}"

pkg/hostpath/controllerserver.go

Lines changed: 72 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -61,10 +61,13 @@ func NewControllerServer(ephemeral bool, nodeID string) *controllerServer {
6161
caps: getControllerServiceCapabilities(
6262
[]csi.ControllerServiceCapability_RPC_Type{
6363
csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME,
64+
csi.ControllerServiceCapability_RPC_GET_VOLUME,
6465
csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT,
6566
csi.ControllerServiceCapability_RPC_LIST_SNAPSHOTS,
67+
csi.ControllerServiceCapability_RPC_LIST_VOLUMES,
6668
csi.ControllerServiceCapability_RPC_CLONE_VOLUME,
6769
csi.ControllerServiceCapability_RPC_EXPAND_VOLUME,
70+
csi.ControllerServiceCapability_RPC_VOLUME_CONDITION,
6871
}),
6972
nodeID: nodeID,
7073
}
@@ -279,7 +282,75 @@ func (cs *controllerServer) GetCapacity(ctx context.Context, req *csi.GetCapacit
279282
}
280283

281284
func (cs *controllerServer) ListVolumes(ctx context.Context, req *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error) {
282-
return nil, status.Error(codes.Unimplemented, "")
285+
volumeRes := &csi.ListVolumesResponse{
286+
Entries: []*csi.ListVolumesResponse_Entry{},
287+
}
288+
289+
var (
290+
startIdx, volumesLength, maxLength int64
291+
hpVolume hostPathVolume
292+
)
293+
volumeIds := getSortedVolumeIDs()
294+
if req.StartingToken == "" {
295+
req.StartingToken = "1"
296+
}
297+
298+
startIdx, err := strconv.ParseInt(req.StartingToken, 10, 32)
299+
if err != nil {
300+
return nil, status.Error(codes.Aborted, "The type of startingToken should be integer")
301+
}
302+
303+
volumesLength = int64(len(volumeIds))
304+
maxLength = int64(req.MaxEntries)
305+
306+
if maxLength > volumesLength || maxLength <= 0 {
307+
maxLength = volumesLength
308+
}
309+
310+
for index := startIdx - 1; index < volumesLength && index < maxLength; index++ {
311+
hpVolume = hostPathVolumes[volumeIds[index]]
312+
healthy, msg := doHealthCheckInControllerSide(volumeIds[index])
313+
glog.V(3).Infof("Healthy state: %s Volume: %t", hpVolume.VolName, healthy)
314+
volumeRes.Entries = append(volumeRes.Entries, &csi.ListVolumesResponse_Entry{
315+
Volume: &csi.Volume{
316+
VolumeId: hpVolume.VolID,
317+
CapacityBytes: hpVolume.VolSize,
318+
},
319+
Status: &csi.ListVolumesResponse_VolumeStatus{
320+
PublishedNodeIds: []string{hpVolume.NodeID},
321+
VolumeCondition: &csi.VolumeCondition{
322+
Abnormal: !healthy,
323+
Message: msg,
324+
},
325+
},
326+
})
327+
}
328+
329+
glog.V(5).Infof("Volumes are: %+v", *volumeRes)
330+
return volumeRes, nil
331+
}
332+
333+
func (cs *controllerServer) ControllerGetVolume(ctx context.Context, req *csi.ControllerGetVolumeRequest) (*csi.ControllerGetVolumeResponse, error) {
334+
volume, ok := hostPathVolumes[req.GetVolumeId()]
335+
if !ok {
336+
return nil, status.Error(codes.NotFound, "The volume not found")
337+
}
338+
339+
healthy, msg := doHealthCheckInControllerSide(req.GetVolumeId())
340+
glog.V(3).Infof("Healthy state: %s Volume: %t", volume.VolName, healthy)
341+
return &csi.ControllerGetVolumeResponse{
342+
Volume: &csi.Volume{
343+
VolumeId: volume.VolID,
344+
CapacityBytes: volume.VolSize,
345+
},
346+
Status: &csi.ControllerGetVolumeResponse_VolumeStatus{
347+
PublishedNodeIds: []string{volume.NodeID},
348+
VolumeCondition: &csi.VolumeCondition{
349+
Abnormal: !healthy,
350+
Message: msg,
351+
},
352+
},
353+
}, nil
283354
}
284355

285356
// getSnapshotPath returns the full path to where the snapshot is stored
@@ -527,10 +598,6 @@ func (cs *controllerServer) ControllerExpandVolume(ctx context.Context, req *csi
527598
}, nil
528599
}
529600

530-
func (cs *controllerServer) ControllerGetVolume(context.Context, *csi.ControllerGetVolumeRequest) (*csi.ControllerGetVolumeResponse, error) {
531-
return nil, status.Error(codes.Unimplemented, "")
532-
}
533-
534601
func convertSnapshot(snap hostPathSnapshot) *csi.ListSnapshotsResponse {
535602
entries := []*csi.ListSnapshotsResponse_Entry{
536603
{

pkg/hostpath/healthcheck.go

Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
package hostpath
2+
3+
import (
4+
"encoding/json"
5+
"fmt"
6+
"os"
7+
"os/exec"
8+
"path/filepath"
9+
"strings"
10+
11+
"github.com/golang/glog"
12+
fs "k8s.io/kubernetes/pkg/volume/util/fs"
13+
)
14+
15+
const (
16+
podVolumeTargetPath = "/var/lib/kubelet/pods"
17+
csiSignOfVolumeTargetPath = "kubernetes.io~csi/pvc"
18+
)
19+
20+
type MountPointInfo struct {
21+
Target string `json:"target"`
22+
Source string `json:"source"`
23+
FsType string `json:"fstype"`
24+
Options string `json:"options"`
25+
ContainerFileSystem []MountPointInfo `json:"children,omitempty"`
26+
}
27+
28+
type ContainerFileSystem struct {
29+
Children []MountPointInfo `json:"children"`
30+
}
31+
32+
type FileSystems struct {
33+
Filsystem []ContainerFileSystem `json:"filesystems"`
34+
}
35+
36+
func locateCommandPath(commandName string) string {
37+
// default to root
38+
binary := filepath.Join("/", commandName)
39+
for _, path := range []string{"/bin", "/usr/sbin", "/usr/bin"} {
40+
binPath := filepath.Join(path, binary)
41+
if _, err := os.Stat(binPath); err != nil {
42+
continue
43+
}
44+
45+
return binPath
46+
}
47+
48+
return ""
49+
}
50+
51+
func getSourcePath(volumeHandle string) string {
52+
return fmt.Sprintf("%s/%s", dataRoot, volumeHandle)
53+
}
54+
55+
func checkSourcePathExist(volumeHandle string) (bool, error) {
56+
sourcePath := getSourcePath(volumeHandle)
57+
glog.V(3).Infof("Volume: %s Source path is: %s", volumeHandle, sourcePath)
58+
_, err := os.Stat(sourcePath)
59+
if err != nil {
60+
if os.IsNotExist(err) {
61+
return false, nil
62+
}
63+
64+
return false, err
65+
}
66+
67+
return true, nil
68+
}
69+
70+
func parseMountInfo(originalMountInfo []byte) ([]MountPointInfo, error) {
71+
fs := FileSystems{
72+
Filsystem: make([]ContainerFileSystem, 0),
73+
}
74+
75+
if err := json.Unmarshal(originalMountInfo, &fs); err != nil {
76+
return nil, err
77+
}
78+
79+
if len(fs.Filsystem) <= 0 {
80+
return nil, fmt.Errorf("failed to get mount info")
81+
}
82+
83+
return fs.Filsystem[0].Children, nil
84+
}
85+
86+
func checkMountPointExist(sourcePath string) (bool, error) {
87+
cmdPath := locateCommandPath("findmnt")
88+
out, err := exec.Command(cmdPath, "--json").CombinedOutput()
89+
if err != nil {
90+
glog.V(3).Infof("failed to execute command: %+v", cmdPath)
91+
return false, err
92+
}
93+
94+
if len(out) < 1 {
95+
return false, fmt.Errorf("mount point info is nil")
96+
}
97+
98+
mountInfos, err := parseMountInfo([]byte(out))
99+
if err != nil {
100+
return false, fmt.Errorf("failed to parse the mount infos: %+v", err)
101+
}
102+
103+
mountInfosOfPod := MountPointInfo{}
104+
for _, mountInfo := range mountInfos {
105+
if mountInfo.Target == podVolumeTargetPath {
106+
mountInfosOfPod = mountInfo
107+
break
108+
}
109+
}
110+
111+
for _, mountInfo := range mountInfosOfPod.ContainerFileSystem {
112+
if !strings.Contains(mountInfo.Source, sourcePath) {
113+
continue
114+
}
115+
116+
_, err = os.Stat(mountInfo.Target)
117+
if err != nil {
118+
if os.IsNotExist(err) {
119+
return false, nil
120+
}
121+
122+
return false, err
123+
}
124+
125+
return true, nil
126+
}
127+
128+
return false, nil
129+
}
130+
131+
func checkPVCapacityValid(volumeHandle string) (bool, error) {
132+
sourcePath := getSourcePath(volumeHandle)
133+
_, fscapacity, _, _, _, _, err := fs.FsInfo(sourcePath)
134+
if err != nil {
135+
return false, fmt.Errorf("failed to get capacity info: %+v", err)
136+
}
137+
138+
volumeCapacity := hostPathVolumes[volumeHandle].VolSize
139+
glog.V(3).Infof("volume capacity: %+v fs capacity:%+v", volumeCapacity, fscapacity)
140+
return fscapacity >= volumeCapacity, nil
141+
}
142+
143+
func getPVCapacity(volumeHandle string) (int64, int64, int64, error) {
144+
sourcePath := getSourcePath(volumeHandle)
145+
fsavailable, fscapacity, fsused, _, _, _, err := fs.FsInfo(sourcePath)
146+
return fscapacity, fsused, fsavailable, err
147+
}
148+
149+
func checkPVUsage(volumeHandle string) (bool, error) {
150+
sourcePath := getSourcePath(volumeHandle)
151+
fsavailable, _, _, _, _, _, err := fs.FsInfo(sourcePath)
152+
if err != nil {
153+
return false, err
154+
}
155+
156+
glog.V(3).Infof("fs available: %+v", fsavailable)
157+
return fsavailable > 0, nil
158+
}
159+
160+
func doHealthCheckInControllerSide(volumeHandle string) (bool, string) {
161+
spExist, err := checkSourcePathExist(volumeHandle)
162+
if err != nil {
163+
return false, err.Error()
164+
}
165+
166+
if !spExist {
167+
return false, "The source path of the volume doesn't exist"
168+
}
169+
170+
capValid, err := checkPVCapacityValid(volumeHandle)
171+
if err != nil {
172+
return false, err.Error()
173+
}
174+
175+
if !capValid {
176+
return false, "The capacity of volume is greater than actual storage"
177+
}
178+
179+
available, err := checkPVUsage(volumeHandle)
180+
if err != nil {
181+
return false, err.Error()
182+
}
183+
184+
if !available {
185+
return false, "The free space of the volume is insufficient"
186+
}
187+
188+
return true, ""
189+
}
190+
191+
func doHealthCheckInNodeSide(volumeHandle string) (bool, string) {
192+
mpExist, err := checkMountPointExist(volumeHandle)
193+
if err != nil {
194+
return false, err.Error()
195+
}
196+
197+
if !mpExist {
198+
return false, "The volume isn't mounted"
199+
}
200+
201+
return true, ""
202+
}

0 commit comments

Comments
 (0)