|
package framework

import (
+	"bytes"
	"fmt"
+	"io"
+	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/client-go/kubernetes/scheme"
+	"k8s.io/client-go/tools/clientcmd"
+	"k8s.io/client-go/tools/remotecommand"
+	"os"
+	"path/filepath"
	"strings"
	"time"

@@ -873,3 +881,143 @@ func (j *PVCTestJig) CheckMultiplePodReadWrite(namespace string, pvcName string,
|
	By("Creating Pod that can read contents of existing file")
	j.NewPodForCSIFSSRead(string(uuid2), namespace, pvcName, fileName)
}
+
+func (j *PVCTestJig) CheckDataPersistenceWithDeployment(pvcName string, ns string) {
+	nodes, err := j.KubeClient.CoreV1().Nodes().List(metav1.ListOptions{})
+	if err != nil {
+		Failf("Error getting list of nodes: %v", err)
+	}
+
+	if len(nodes.Items) == 0 {
+		Failf("No worker nodes are present in the cluster")
+	}
+
+	// Pin the deployment to the first schedulable node so the replacement pod
+	// lands on the same node after the original pod is deleted.
+	nodeSelectorLabels := map[string]string{}
+	schedulableNodeFound := false
+
+	for _, node := range nodes.Items {
+		if !node.Spec.Unschedulable {
+			schedulableNodeFound = true
+			nodeSelectorLabels = node.Labels
+			break
+		}
+	}
+
+	if !schedulableNodeFound {
+		Failf("No schedulable nodes found")
+	}
+
+	podRunningCommand := " while true; do true; done;"
+
+	dataWritten := "Data written"
+
+	writeCommand := "echo \"" + dataWritten + "\" >> /data/out.txt;"
+	readCommand := "cat /data/out.txt"
+
+	By("Creating a deployment")
+	deploymentName := j.createDeploymentOnNodeAndWait(podRunningCommand, pvcName, ns, "data-persistence-deployment", 1, nodeSelectorLabels)
+
+	deployment, err := j.KubeClient.AppsV1().Deployments(ns).Get(deploymentName, metav1.GetOptions{})
+	if err != nil {
+		Failf("Error while fetching deployment %v: %v", deploymentName, err)
+	}
+
+	set := labels.Set(deployment.Spec.Selector.MatchLabels)
+	pods, err := j.KubeClient.CoreV1().Pods(ns).List(metav1.ListOptions{LabelSelector: set.AsSelector().String()})
+	if err != nil {
+		Failf("Error getting list of pods: %v", err)
+	}
+
+	podName := pods.Items[0].Name
+
+	By("Writing to the volume using the pod")
+	_, _, err = j.ExecCommandOnPod(podName, writeCommand, nil, ns)
+	if err != nil {
+		Failf("Error executing write command on pod: %v", err)
+	}
+
+	By("Deleting the pod used to write to the volume")
+	err = j.KubeClient.CoreV1().Pods(ns).Delete(podName, &metav1.DeleteOptions{})
+	if err != nil {
+		Failf("Error deleting pod: %v", err)
+	}
+
+	By("Waiting for pod to be restarted")
+	err = j.waitTimeoutForDeploymentAvailable(deploymentName, ns, deploymentAvailableTimeout, 1)
+	if err != nil {
+		Failf("Error waiting for deployment to become available again: %v", err)
+	}
+
+	pods, err = j.KubeClient.CoreV1().Pods(ns).List(metav1.ListOptions{LabelSelector: set.AsSelector().String()})
+	if err != nil {
+		Failf("Error getting list of pods: %v", err)
+	}
+
+	podName = pods.Items[0].Name
+
+	By("Reading from the volume using the pod and checking data integrity")
+	stdout, _, err := j.ExecCommandOnPod(podName, readCommand, nil, ns)
+	if err != nil {
+		Failf("Error executing read command on pod: %v", err)
+	}
+
+	if dataWritten != strings.TrimSpace(stdout) {
+		Failf("Written data not found on the volume, written: %v, found: %v", dataWritten, strings.TrimSpace(stdout))
+	}
+}
+
+func (j *PVCTestJig) ExecCommandOnPod(podName string, command string, stdin io.Reader, ns string) (string, string, error) {
+	cmd := []string{
+		"sh",
+		"-c",
+		command,
+	}
+
+	// POST to the pod's exec subresource with the command to run.
+	req := j.KubeClient.CoreV1().RESTClient().Post().Resource("pods").Name(podName).Namespace(ns).SubResource("exec")
+	option := &v1.PodExecOptions{
+		Command: cmd,
+		Stdin:   true,
+		Stdout:  true,
+		Stderr:  true,
+		TTY:     true,
+	}
+	if stdin == nil {
+		option.Stdin = false
+	}
+	req.VersionedParams(
+		option,
+		scheme.ParameterCodec,
+	)
+
+	// Build a REST config for the SPDY executor from the kubeconfig on disk.
+	kubeconfig := filepath.Join(os.Getenv("HOME"), ".kube", "config_amd")
+	config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
+	if err != nil {
+		return "", "", fmt.Errorf("error while retrieving kubeconfig: %v", err)
+	}
+
+	exec, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL())
+	if err != nil {
+		return "", "", fmt.Errorf("error while creating Executor: %v", err)
+	}
+
+	// Run the command and capture its output streams.
+	var stdout, stderr bytes.Buffer
+	err = exec.Stream(remotecommand.StreamOptions{
+		Stdin:  stdin,
+		Stdout: &stdout,
+		Stderr: &stderr,
+	})
+	if err != nil {
+		return "", "", fmt.Errorf("error in Stream: %v", err)
+	}
+
+	return stdout.String(), stderr.String(), nil
+}
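
A note on the exec plumbing: ExecCommandOnPod above rebuilds its REST config from a fixed kubeconfig path under $HOME. For comparison, here is a minimal, self-contained sketch of the same client-go SPDY exec pattern that instead reads the kubeconfig path from the KUBECONFIG environment variable; the package, the function name execInPod, and the pod name "my-test-pod" are illustrative assumptions, not part of this change.

package main

import (
	"bytes"
	"fmt"
	"os"

	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/tools/remotecommand"
)

// execInPod runs a shell command in the named pod and returns its stdout and stderr.
// It mirrors the SPDY-based exec pattern used by ExecCommandOnPod.
func execInPod(ns, pod, command string) (string, string, error) {
	// Load the REST config from $KUBECONFIG (illustrative; the PR uses a fixed path).
	config, err := clientcmd.BuildConfigFromFlags("", os.Getenv("KUBECONFIG"))
	if err != nil {
		return "", "", fmt.Errorf("loading kubeconfig: %v", err)
	}
	client, err := kubernetes.NewForConfig(config)
	if err != nil {
		return "", "", fmt.Errorf("building clientset: %v", err)
	}

	// POST to the pod's exec subresource with the command to run.
	req := client.CoreV1().RESTClient().Post().
		Resource("pods").Name(pod).Namespace(ns).SubResource("exec").
		VersionedParams(&v1.PodExecOptions{
			Command: []string{"sh", "-c", command},
			Stdout:  true,
			Stderr:  true,
		}, scheme.ParameterCodec)

	// Stream the command's output back over SPDY.
	executor, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL())
	if err != nil {
		return "", "", fmt.Errorf("creating executor: %v", err)
	}
	var stdout, stderr bytes.Buffer
	if err := executor.Stream(remotecommand.StreamOptions{Stdout: &stdout, Stderr: &stderr}); err != nil {
		return "", "", fmt.Errorf("streaming exec: %v", err)
	}
	return stdout.String(), stderr.String(), nil
}

func main() {
	out, errOut, err := execInPod("default", "my-test-pod", "cat /data/out.txt")
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	fmt.Print(out, errOut)
}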