Skip to content

Commit 0b1617c

Browse files
committed
Clean error handling in port-forward
This commit introduces: 1. Cleanups in port-forwarding error handling code, which ensures that we only compare lowercased text always. 2. E2E verifying that when a pod is removed a port-forward is stopped. Signed-off-by: Maciej Szulik <[email protected]>
1 parent dbe6b66 commit 0b1617c

File tree

2 files changed

+146
-40
lines changed

2 files changed

+146
-40
lines changed

staging/src/k8s.io/client-go/tools/portforward/portforward.go

+14-8
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,13 @@ import (
3737
// TODO move to API machinery and re-unify with kubelet/server/portfoward
3838
const PortForwardProtocolV1Name = "portforward.k8s.io"
3939

40-
var ErrLostConnectionToPod = errors.New("lost connection to pod")
40+
var (
41+
// error returned whenever we lost connection to a pod
42+
ErrLostConnectionToPod = errors.New("lost connection to pod")
43+
44+
// set of error we're expecting during port-forwarding
45+
networkClosedError = "use of closed network connection"
46+
)
4147

4248
// PortForwarder knows how to listen for local connections and forward them to
4349
// a remote pod via an upgraded HTTP request.
@@ -312,7 +318,7 @@ func (pf *PortForwarder) waitForConnection(listener net.Listener, port Forwarded
312318
conn, err := listener.Accept()
313319
if err != nil {
314320
// TODO consider using something like https://github.com/hydrogen18/stoppableListener?
315-
if !strings.Contains(strings.ToLower(err.Error()), "use of closed network connection") {
321+
if !strings.Contains(strings.ToLower(err.Error()), networkClosedError) {
316322
runtime.HandleError(fmt.Errorf("error accepting connection on port %d: %v", port.Local, err))
317323
}
318324
return
@@ -381,7 +387,7 @@ func (pf *PortForwarder) handleConnection(conn net.Conn, port ForwardedPort) {
381387

382388
go func() {
383389
// Copy from the remote side to the local port.
384-
if _, err := io.Copy(conn, dataStream); err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
390+
if _, err := io.Copy(conn, dataStream); err != nil && !strings.Contains(strings.ToLower(err.Error()), networkClosedError) {
385391
runtime.HandleError(fmt.Errorf("error copying from remote stream to local connection: %v", err))
386392
}
387393

@@ -394,7 +400,7 @@ func (pf *PortForwarder) handleConnection(conn net.Conn, port ForwardedPort) {
394400
defer dataStream.Close()
395401

396402
// Copy from the local port to the remote side.
397-
if _, err := io.Copy(dataStream, conn); err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
403+
if _, err := io.Copy(dataStream, conn); err != nil && !strings.Contains(strings.ToLower(err.Error()), networkClosedError) {
398404
runtime.HandleError(fmt.Errorf("error copying from local connection to remote stream: %v", err))
399405
// break out of the select below without waiting for the other copy to finish
400406
close(localError)
@@ -406,10 +412,10 @@ func (pf *PortForwarder) handleConnection(conn net.Conn, port ForwardedPort) {
406412
case <-remoteDone:
407413
case <-localError:
408414
}
409-
/*
410-
reset dataStream to discard any unsent data, preventing port forwarding from being blocked.
411-
we must reset dataStream before waiting on errorChan, otherwise, the blocking data will affect errorStream and cause <-errorChan to block indefinitely.
412-
*/
415+
416+
// reset dataStream to discard any unsent data, preventing port forwarding from being blocked.
417+
// we must reset dataStream before waiting on errorChan, otherwise,
418+
// the blocking data will affect errorStream and cause <-errorChan to block indefinitely.
413419
_ = dataStream.Reset()
414420

415421
// always expect something on errorChan (it may be nil)

test/e2e/kubectl/portforward.go

+132-32
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@ import (
3636
"golang.org/x/net/websocket"
3737
v1 "k8s.io/api/core/v1"
3838
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
39+
"k8s.io/apimachinery/pkg/util/intstr"
40+
utilnet "k8s.io/apimachinery/pkg/util/net"
41+
"k8s.io/apimachinery/pkg/util/rand"
3942
"k8s.io/apimachinery/pkg/util/wait"
4043
"k8s.io/kubernetes/test/e2e/framework"
4144
e2ekubectl "k8s.io/kubernetes/test/e2e/framework/kubectl"
@@ -124,6 +127,41 @@ func pfPod(expectedClientData, chunks, chunkSize, chunkIntervalMillis string, bi
124127
}
125128
}
126129

130+
func pfNeverReadRequestBodyPod() *v1.Pod {
131+
return &v1.Pod{
132+
ObjectMeta: metav1.ObjectMeta{
133+
Name: "issue-74551",
134+
},
135+
Spec: v1.PodSpec{
136+
RestartPolicy: v1.RestartPolicyNever,
137+
Containers: []v1.Container{
138+
{
139+
Name: "server",
140+
Image: imageutils.GetE2EImage(imageutils.Agnhost),
141+
Args: []string{
142+
"netexec",
143+
"--http-port=80",
144+
},
145+
ReadinessProbe: &v1.Probe{
146+
ProbeHandler: v1.ProbeHandler{
147+
HTTPGet: &v1.HTTPGetAction{
148+
Path: "/healthz",
149+
Port: intstr.IntOrString{
150+
IntVal: int32(80),
151+
},
152+
Scheme: v1.URISchemeHTTP,
153+
},
154+
},
155+
InitialDelaySeconds: 5,
156+
TimeoutSeconds: 60,
157+
PeriodSeconds: 1,
158+
},
159+
},
160+
},
161+
},
162+
}
163+
}
164+
127165
func testWebServerPod() *v1.Pod {
128166
return &v1.Pod{
129167
ObjectMeta: metav1.ObjectMeta{
@@ -525,66 +563,107 @@ var _ = SIGDescribe("Kubectl Port forwarding", func() {
525563
})
526564
})
527565

566+
ginkgo.Describe("with a pod being removed", func() {
567+
ginkgo.It("should stop port-forwarding", func(ctx context.Context) {
568+
ginkgo.By("Creating the target pod")
569+
pod := pfNeverReadRequestBodyPod()
570+
_, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).Create(ctx, pod, metav1.CreateOptions{})
571+
framework.ExpectNoError(err, "couldn't create pod")
572+
573+
err = e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, pod.Name, f.Namespace.Name, framework.PodStartTimeout)
574+
framework.ExpectNoError(err, "pod did not start running")
575+
576+
ginkgo.By("Running 'kubectl port-forward'")
577+
cmd := runPortForward(f.Namespace.Name, pod.Name, 80)
578+
defer cmd.Stop()
579+
580+
ginkgo.By("Running port-forward client")
581+
reqChan := make(chan bool)
582+
errorChan := make(chan error)
583+
go func() {
584+
defer ginkgo.GinkgoRecover()
585+
586+
// try to mock a big request, which should take some time
587+
for sentBodySize := 0; sentBodySize < 1024*1024*1024; {
588+
size := rand.Intn(4 * 1024 * 1024)
589+
url := fmt.Sprintf("http://localhost:%d/header", cmd.port)
590+
_, err := post(url, strings.NewReader(strings.Repeat("x", size)), nil)
591+
if err != nil {
592+
errorChan <- err
593+
}
594+
ginkgo.By(fmt.Sprintf("Sent %d chunk of data", sentBodySize))
595+
if sentBodySize == 0 {
596+
close(reqChan)
597+
}
598+
sentBodySize += size
599+
}
600+
}()
601+
602+
ginkgo.By("Remove the forwarded pod after the first client request")
603+
<-reqChan
604+
e2epod.DeletePodOrFail(ctx, f.ClientSet, f.Namespace.Name, pod.Name)
605+
606+
ginkgo.By("Wait for client being interrupted")
607+
select {
608+
case err = <-errorChan:
609+
case <-time.After(e2epod.DefaultPodDeletionTimeout):
610+
}
611+
612+
ginkgo.By("Check the client error")
613+
gomega.Expect(err).To(gomega.HaveOccurred())
614+
gomega.Expect(err.Error()).To(gomega.Or(gomega.ContainSubstring("connection reset by peer"), gomega.ContainSubstring("EOF")))
615+
616+
ginkgo.By("Check kubectl port-forward exit code")
617+
gomega.Expect(cmd.cmd.ProcessState.ExitCode()).To(gomega.BeNumerically("<", 0), "kubectl port-forward should finish with non-zero exit code")
618+
})
619+
})
620+
528621
ginkgo.Describe("Shutdown client connection while the remote stream is writing data to the port-forward connection", func() {
529622
ginkgo.It("port-forward should keep working after detect broken connection", func(ctx context.Context) {
530623
ginkgo.By("Creating the target pod")
531624
pod := testWebServerPod()
532-
if _, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).Create(ctx, pod, metav1.CreateOptions{}); err != nil {
533-
framework.Failf("Couldn't create pod: %v", err)
534-
}
535-
if err := e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, pod.Name, f.Namespace.Name, framework.PodStartTimeout); err != nil {
536-
framework.Failf("Pod did not start running: %v", err)
537-
}
625+
_, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).Create(ctx, pod, metav1.CreateOptions{})
626+
framework.ExpectNoError(err, "couldn't create pod")
627+
628+
err = e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, pod.Name, f.Namespace.Name, framework.PodStartTimeout)
629+
framework.ExpectNoError(err, "pod did not start running")
538630

539631
ginkgo.By("Running 'kubectl port-forward'")
540632
cmd := runPortForward(f.Namespace.Name, pod.Name, 80)
541633
defer cmd.Stop()
542634

543635
ginkgo.By("Send a http request to verify port-forward working")
544636
client := http.Client{
545-
Timeout: 5 * time.Second,
637+
Timeout: 10 * time.Second,
546638
}
547639
resp, err := client.Get(fmt.Sprintf("http://127.0.0.1:%d/", cmd.port))
548-
if err != nil {
549-
framework.Failf("Couldn't get http response from port-forward: %v", err)
550-
}
551-
if resp.StatusCode != http.StatusOK {
552-
framework.Failf("Expected status code %d, got %d", http.StatusOK, resp.StatusCode)
553-
}
640+
framework.ExpectNoError(err, "couldn't get http response from port-forward")
641+
gomega.Expect(resp.StatusCode).To(gomega.Equal(http.StatusOK), "unexpected status code")
554642

555643
ginkgo.By("Dialing the local port")
556644
conn, err := net.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", cmd.port))
557-
if err != nil {
558-
framework.Failf("Couldn't connect to port %d: %v", cmd.port, err)
559-
}
645+
framework.ExpectNoError(err, "couldn't connect to port %d", cmd.port)
560646

561647
// use raw tcp connection to emulate client close connection without reading response
562-
ginkgo.By("Request agohost binary file (40MB+)")
648+
ginkgo.By("Request agnhost binary file (40MB+)")
563649
requestLines := []string{"GET /agnhost HTTP/1.1", "Host: localhost", ""}
564650
for _, line := range requestLines {
565-
if _, err := conn.Write(append([]byte(line), []byte("\r\n")...)); err != nil {
566-
framework.Failf("Couldn't write http request to local connection: %v", err)
567-
}
651+
_, err := conn.Write(append([]byte(line), []byte("\r\n")...))
652+
framework.ExpectNoError(err, "couldn't write http request to local connection")
568653
}
569654

570655
ginkgo.By("Read only one byte from the connection")
571-
if _, err := conn.Read(make([]byte, 1)); err != nil {
572-
framework.Logf("Couldn't reading from the local connection: %v", err)
573-
}
656+
_, err = conn.Read(make([]byte, 1))
657+
framework.ExpectNoError(err, "couldn't read from the local connection")
574658

575659
ginkgo.By("Close client connection without reading remain data")
576-
if err := conn.Close(); err != nil {
577-
framework.Failf("Couldn't close local connection: %v", err)
578-
}
660+
err = conn.Close()
661+
framework.ExpectNoError(err, "couldn't close local connection")
579662

580663
ginkgo.By("Send another http request through port-forward again")
581664
resp, err = client.Get(fmt.Sprintf("http://127.0.0.1:%d/", cmd.port))
582-
if err != nil {
583-
framework.Failf("Couldn't get http response from port-forward: %v", err)
584-
}
585-
if resp.StatusCode != http.StatusOK {
586-
framework.Failf("Expected status code %d, got %d", http.StatusOK, resp.StatusCode)
587-
}
665+
framework.ExpectNoError(err, "couldn't get http response from port-forward")
666+
gomega.Expect(resp.StatusCode).To(gomega.Equal(http.StatusOK), "unexpected status code")
588667
})
589668
})
590669
})
@@ -615,3 +694,24 @@ func wsWrite(conn *websocket.Conn, channel byte, data []byte) error {
615694
err := websocket.Message.Send(conn, frame)
616695
return err
617696
}
697+
698+
func post(url string, reader io.Reader, transport *http.Transport) (string, error) {
699+
if transport == nil {
700+
transport = utilnet.SetTransportDefaults(&http.Transport{})
701+
}
702+
client := &http.Client{Transport: transport}
703+
req, err := http.NewRequest(http.MethodPost, url, reader)
704+
if err != nil {
705+
return "", err
706+
}
707+
resp, err := client.Do(req)
708+
if err != nil {
709+
return "", err
710+
}
711+
defer resp.Body.Close() //nolint: errcheck
712+
body, err := io.ReadAll(resp.Body)
713+
if err != nil {
714+
return "", err
715+
}
716+
return string(body), nil
717+
}

0 commit comments

Comments
 (0)