Skip to content

Commit 61289cf

Browse files
committed
test: initial approach to pods_exec
1 parent b08fe66 commit 61289cf

File tree

4 files changed

+228
-7
lines changed

4 files changed

+228
-7
lines changed

pkg/kubernetes/kubernetes.go

+13-5
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@ type Kubernetes struct {
3535
kubeConfigFiles []string
3636
CloseWatchKubeConfig CloseWatchKubeConfig
3737
scheme *runtime.Scheme
38-
parameterCodec *runtime.ParameterCodec
38+
parameterCodec runtime.ParameterCodec
39+
restClient rest.Interface
3940
clientSet kubernetes.Interface
4041
discoveryClient *discovery.DiscoveryClient
4142
deferredDiscoveryRESTMapper *restmapper.DeferredDiscoveryRESTMapper
@@ -47,7 +48,11 @@ func NewKubernetes() (*Kubernetes, error) {
4748
if err != nil {
4849
return nil, err
4950
}
50-
clientSet, err := kubernetes.NewForConfig(cfg)
51+
restClient, err := rest.HTTPClientFor(cfg)
52+
if err != nil {
53+
return nil, err
54+
}
55+
clientSet, err := kubernetes.NewForConfigAndClient(cfg, restClient)
5156
if err != nil {
5257
return nil, err
5358
}
@@ -63,12 +68,11 @@ func NewKubernetes() (*Kubernetes, error) {
6368
if err = v1.AddToScheme(scheme); err != nil {
6469
return nil, err
6570
}
66-
parameterCodec := runtime.NewParameterCodec(scheme)
6771
return &Kubernetes{
6872
cfg: cfg,
6973
kubeConfigFiles: resolveConfig().ConfigAccess().GetLoadingPrecedence(),
7074
scheme: scheme,
71-
parameterCodec: &parameterCodec,
75+
parameterCodec: runtime.NewParameterCodec(scheme),
7276
clientSet: clientSet,
7377
discoveryClient: discoveryClient,
7478
deferredDiscoveryRESTMapper: restmapper.NewDeferredDiscoveryRESTMapper(memory.NewMemCacheClient(discoveryClient)),
@@ -148,7 +152,11 @@ func resolveClientConfig() (*rest.Config, error) {
148152
if err == nil && inClusterConfig != nil {
149153
return inClusterConfig, nil
150154
}
151-
return resolveConfig().ClientConfig()
155+
cfg, err := resolveConfig().ClientConfig()
156+
if cfg != nil && cfg.UserAgent == "" {
157+
cfg.UserAgent = rest.DefaultKubernetesUserAgent()
158+
}
159+
return cfg, err
152160
}
153161

154162
func configuredNamespace() string {

pkg/kubernetes/mock_server_test.go

+165
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
package kubernetes
2+
3+
import (
4+
"encoding/json"
5+
"errors"
6+
"io"
7+
v1 "k8s.io/api/core/v1"
8+
apierrors "k8s.io/apimachinery/pkg/api/errors"
9+
"k8s.io/apimachinery/pkg/runtime"
10+
"k8s.io/apimachinery/pkg/runtime/serializer"
11+
"k8s.io/apimachinery/pkg/util/httpstream"
12+
"k8s.io/apimachinery/pkg/util/httpstream/spdy"
13+
"k8s.io/client-go/kubernetes/fake"
14+
"k8s.io/client-go/rest"
15+
"net/http"
16+
"net/http/httptest"
17+
)
18+
19+
type MockServer struct {
20+
server *httptest.Server
21+
config *rest.Config
22+
restClient *rest.RESTClient
23+
restHandlers []http.HandlerFunc
24+
clientSet *fake.Clientset
25+
parameterCodec runtime.ParameterCodec
26+
}
27+
28+
func NewMockServer() *MockServer {
29+
ms := &MockServer{
30+
clientSet: fake.NewClientset(),
31+
}
32+
scheme := runtime.NewScheme()
33+
codecs := serializer.NewCodecFactory(scheme)
34+
ms.parameterCodec = runtime.NewParameterCodec(scheme)
35+
ms.server = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
36+
for _, handler := range ms.restHandlers {
37+
handler(w, req)
38+
}
39+
}))
40+
ms.config = &rest.Config{
41+
Host: ms.server.URL,
42+
APIPath: "/api",
43+
ContentConfig: rest.ContentConfig{
44+
NegotiatedSerializer: codecs,
45+
ContentType: runtime.ContentTypeJSON,
46+
GroupVersion: &v1.SchemeGroupVersion,
47+
},
48+
}
49+
ms.restClient, _ = rest.RESTClientFor(ms.config)
50+
ms.restHandlers = make([]http.HandlerFunc, 0)
51+
return ms
52+
}
53+
54+
func (m *MockServer) Close() {
55+
m.server.Close()
56+
}
57+
58+
func (m *MockServer) ClientSet() *fake.Clientset {
59+
return m.clientSet
60+
}
61+
62+
func (m *MockServer) Handle(handler http.Handler) {
63+
m.restHandlers = append(m.restHandlers, handler.ServeHTTP)
64+
}
65+
66+
func (m *MockServer) NewKubernetes() *Kubernetes {
67+
return &Kubernetes{
68+
cfg: m.config,
69+
restClient: m.restClient,
70+
clientSet: m.clientSet,
71+
parameterCodec: m.parameterCodec,
72+
}
73+
}
74+
75+
type streamAndReply struct {
76+
httpstream.Stream
77+
replySent <-chan struct{}
78+
}
79+
80+
type streamContext struct {
81+
conn io.Closer
82+
stdinStream io.ReadCloser
83+
stdoutStream io.WriteCloser
84+
stderrStream io.WriteCloser
85+
writeStatus func(status *apierrors.StatusError) error
86+
}
87+
88+
type StreamOptions struct {
89+
Stdin io.Reader
90+
Stdout io.Writer
91+
Stderr io.Writer
92+
}
93+
94+
func v4WriteStatusFunc(stream io.Writer) func(status *apierrors.StatusError) error {
95+
return func(status *apierrors.StatusError) error {
96+
bs, err := json.Marshal(status.Status())
97+
if err != nil {
98+
return err
99+
}
100+
_, err = stream.Write(bs)
101+
return err
102+
}
103+
}
104+
func createHTTPStreams(w http.ResponseWriter, req *http.Request, opts *StreamOptions) (*streamContext, error) {
105+
_, err := httpstream.Handshake(req, w, []string{"v4.channel.k8s.io"})
106+
if err != nil {
107+
return nil, err
108+
}
109+
110+
upgrader := spdy.NewResponseUpgrader()
111+
streamCh := make(chan streamAndReply)
112+
conn := upgrader.UpgradeResponse(w, req, func(stream httpstream.Stream, replySent <-chan struct{}) error {
113+
streamCh <- streamAndReply{Stream: stream, replySent: replySent}
114+
return nil
115+
})
116+
ctx := &streamContext{
117+
conn: conn,
118+
}
119+
120+
// wait for stream
121+
replyChan := make(chan struct{}, 4)
122+
defer close(replyChan)
123+
receivedStreams := 0
124+
expectedStreams := 1
125+
if opts.Stdout != nil {
126+
expectedStreams++
127+
}
128+
if opts.Stdin != nil {
129+
expectedStreams++
130+
}
131+
if opts.Stderr != nil {
132+
expectedStreams++
133+
}
134+
WaitForStreams:
135+
for {
136+
select {
137+
case stream := <-streamCh:
138+
streamType := stream.Headers().Get(v1.StreamType)
139+
switch streamType {
140+
case v1.StreamTypeError:
141+
replyChan <- struct{}{}
142+
ctx.writeStatus = v4WriteStatusFunc(stream)
143+
case v1.StreamTypeStdout:
144+
replyChan <- struct{}{}
145+
ctx.stdoutStream = stream
146+
case v1.StreamTypeStdin:
147+
replyChan <- struct{}{}
148+
ctx.stdinStream = stream
149+
case v1.StreamTypeStderr:
150+
replyChan <- struct{}{}
151+
ctx.stderrStream = stream
152+
default:
153+
// add other stream ...
154+
return nil, errors.New("unimplemented stream type")
155+
}
156+
case <-replyChan:
157+
receivedStreams++
158+
if receivedStreams == expectedStreams {
159+
break WaitForStreams
160+
}
161+
}
162+
}
163+
164+
return ctx, nil
165+
}

pkg/kubernetes/pods.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -214,12 +214,13 @@ func (k *Kubernetes) PodsExec(ctx context.Context, namespace, name, container st
214214
func (k *Kubernetes) createExecutor(namespace, name string, podExecOptions *v1.PodExecOptions) (remotecommand.Executor, error) {
215215
// Compute URL
216216
// https://github.com/kubernetes/kubectl/blob/5366de04e168bcbc11f5e340d131a9ca8b7d0df4/pkg/cmd/exec/exec.go#L382-L397
217-
req := k.clientSet.CoreV1().RESTClient().Post().
217+
req := k.restClient.
218+
Post().
218219
Resource("pods").
219220
Namespace(namespace).
220221
Name(name).
221222
SubResource("exec")
222-
req.VersionedParams(podExecOptions, *k.parameterCodec)
223+
req.VersionedParams(podExecOptions, k.parameterCodec)
223224
spdyExec, err := remotecommand.NewSPDYExecutor(k.cfg, "POST", req.URL())
224225
if err != nil {
225226
return nil, err

pkg/kubernetes/pods_test.go

+47
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package kubernetes
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"io"
7+
v1 "k8s.io/api/core/v1"
8+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
9+
"net/http"
10+
"testing"
11+
)
12+
13+
func TestPodsExec(t *testing.T) {
14+
mockServer := NewMockServer()
15+
defer mockServer.Close()
16+
_ = mockServer.ClientSet().Tracker().Add(&v1.Pod{
17+
ObjectMeta: metav1.ObjectMeta{
18+
Namespace: "default",
19+
Name: "pod-to-exec",
20+
},
21+
Spec: v1.PodSpec{Containers: []v1.Container{{Name: "container-to-exec"}}},
22+
})
23+
mockServer.Handle(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
24+
var stdin, stdout bytes.Buffer
25+
ctx, err := createHTTPStreams(w, req, &StreamOptions{
26+
Stdin: &stdin,
27+
Stdout: &stdout,
28+
})
29+
if err != nil {
30+
w.WriteHeader(http.StatusInternalServerError)
31+
_, _ = w.Write([]byte(err.Error()))
32+
return
33+
}
34+
defer ctx.conn.Close()
35+
if req.URL.Path == "/api/v1/namespaces/default/pods/pod-to-exec/exec" {
36+
_, _ = io.WriteString(ctx.stdoutStream, "total 0\n")
37+
}
38+
}))
39+
k8s := mockServer.NewKubernetes()
40+
out, err := k8s.PodsExec(context.Background(), "default", "pod-to-exec", "", []string{"ls", "-l"})
41+
if err != nil {
42+
t.Fatalf("unexpected error: %v", err)
43+
}
44+
if out != "total 0\n" {
45+
t.Fatalf("unexpected output: %s", out)
46+
}
47+
}

0 commit comments

Comments
 (0)