Skip to content

Commit 5ad85b0

Browse files
utam0kutam0k
authored and
utam0k
committed
test: Handle errors relate network in port-fowarding
1 parent 2da5eef commit 5ad85b0

File tree

1 file changed

+78
-39
lines changed

1 file changed

+78
-39
lines changed

test/pkg/integration/integration.go

+78-39
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,12 @@ import (
2020
"runtime"
2121
"strconv"
2222
"strings"
23+
"syscall"
2324
"time"
2425

2526
"golang.org/x/xerrors"
27+
"google.golang.org/grpc/codes"
28+
"google.golang.org/grpc/status"
2629
corev1 "k8s.io/api/core/v1"
2730
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2831
"k8s.io/apimachinery/pkg/runtime/schema"
@@ -39,6 +42,11 @@ import (
3942
"github.com/gitpod-io/gitpod/test/pkg/integration/common"
4043
)
4144

45+
const (
46+
connectFailureMaxTries = 5
47+
errorDialingBackendEOF = "error dialing backend: EOF"
48+
)
49+
4250
type PodExec struct {
4351
RestConfig *rest.Config
4452
*kubernetes.Clientset
@@ -57,16 +65,16 @@ func NewPodExec(config rest.Config, clientset *kubernetes.Clientset) *PodExec {
5765
func (p *PodExec) PodCopyFile(src string, dst string, containername string) (*bytes.Buffer, *bytes.Buffer, *bytes.Buffer, error) {
5866
var in, out, errOut *bytes.Buffer
5967
var ioStreams genericclioptions.IOStreams
60-
for {
68+
for count := 0; ; count++ {
6169
ioStreams, in, out, errOut = genericclioptions.NewTestIOStreams()
6270
copyOptions := kubectlcp.NewCopyOptions(ioStreams)
6371
copyOptions.Clientset = p.Clientset
6472
copyOptions.ClientConfig = p.RestConfig
6573
copyOptions.Container = containername
6674
err := copyOptions.Run([]string{src, dst})
6775
if err != nil {
68-
if !errors.Is(err, io.EOF) {
69-
return nil, nil, nil, fmt.Errorf("Could not run copy operation: %v", err)
76+
if !shouldRetry(count, err) {
77+
return nil, nil, nil, fmt.Errorf("could not run copy operation: %v", err)
7078
}
7179
time.Sleep(10 * time.Second)
7280
continue
@@ -76,6 +84,13 @@ func (p *PodExec) PodCopyFile(src string, dst string, containername string) (*by
7684
return in, out, errOut, nil
7785
}
7886

87+
func shouldRetry(count int, err error) bool {
88+
if count < connectFailureMaxTries {
89+
return err.Error() == errorDialingBackendEOF
90+
}
91+
return false
92+
}
93+
7994
func (p *PodExec) ExecCmd(command []string, podname string, namespace string, containername string) (*bytes.Buffer, *bytes.Buffer, *bytes.Buffer, error) {
8095
ioStreams, in, out, errOut := genericclioptions.NewTestIOStreams()
8196
execOptions := &kubectlexec.ExecOptions{
@@ -187,6 +202,43 @@ func Instrument(component ComponentType, agentName string, namespace string, kub
187202
return nil, closer, err
188203
}
189204

205+
var (
206+
res *rpc.Client
207+
cl []func() error
208+
)
209+
for i := 0; i < connectFailureMaxTries; i++ {
210+
res, cl, err = portfw(podExec, kubeconfig, podName, namespace, containerName, tgtFN, options)
211+
if err == nil {
212+
closer = append(closer, cl...)
213+
break
214+
}
215+
for _, c := range cl {
216+
_ = c()
217+
}
218+
time.Sleep(5 * time.Second)
219+
}
220+
if err != nil {
221+
return nil, closer, err
222+
}
223+
224+
closer = append(closer, func() error {
225+
err := res.Call(MethodTestAgentShutdown, new(TestAgentShutdownRequest), new(TestAgentShutdownResponse))
226+
if err != nil && strings.Contains(err.Error(), "connection is shut down") {
227+
return nil
228+
}
229+
230+
if err != nil {
231+
return xerrors.Errorf("cannot shutdown agent: %w", err)
232+
}
233+
return nil
234+
})
235+
236+
return res, closer, nil
237+
}
238+
239+
func portfw(podExec *PodExec, kubeconfig string, podName string, namespace string, containerName string, tgtFN string, options instrumentOptions) (*rpc.Client, []func() error, error) {
240+
var closer []func() error
241+
190242
localAgentPort, err := getFreePort()
191243
if err != nil {
192244
return nil, closer, err
@@ -198,22 +250,13 @@ func Instrument(component ComponentType, agentName string, namespace string, kub
198250
}
199251

200252
execErrs := make(chan error, 1)
201-
execF := func() {
253+
go func() {
202254
defer close(execErrs)
203255
_, _, _, execErr := podExec.ExecCmd(cmd, podName, namespace, containerName)
204256
if execErr != nil {
205257
execErrs <- execErr
206258
}
207-
}
208-
go execF()
209-
select {
210-
case err := <-execErrs:
211-
if err != nil {
212-
return nil, closer, err
213-
}
214-
return nil, closer, fmt.Errorf("agent stopped unexepectedly")
215-
case <-time.After(30 * time.Second):
216-
}
259+
}()
217260

218261
ctx, cancel := context.WithCancel(context.Background())
219262
defer func() {
@@ -226,24 +269,32 @@ func Instrument(component ComponentType, agentName string, namespace string, kub
226269
cancel()
227270
}
228271
}()
229-
230-
fwdReady, fwdErr := common.ForwardPortOfPod(ctx, kubeconfig, namespace, podName, strconv.Itoa(localAgentPort))
231-
select {
232-
case <-fwdReady:
233-
case err := <-execErrs:
234-
if err != nil {
235-
return nil, closer, err
236-
}
237-
case err := <-fwdErr:
238-
if err != nil {
239-
return nil, closer, err
272+
L:
273+
for {
274+
fwdReady, fwdErr := common.ForwardPortOfPod(ctx, kubeconfig, namespace, podName, strconv.Itoa(localAgentPort))
275+
select {
276+
case <-fwdReady:
277+
break L
278+
case err := <-execErrs:
279+
if err != nil {
280+
return nil, closer, err
281+
}
282+
case err := <-fwdErr:
283+
var eno syscall.Errno
284+
if errors.Is(err, io.EOF) || (errors.As(err, &eno) && eno == syscall.ECONNREFUSED) {
285+
time.Sleep(10 * time.Second)
286+
} else if st, ok := status.FromError(err); ok && st.Code() == codes.Unavailable {
287+
time.Sleep(10 * time.Second)
288+
} else if err != nil {
289+
return nil, closer, err
290+
}
240291
}
241292
}
242293

243294
var res *rpc.Client
244295
var lastError error
245-
waitErr := wait.PollImmediate(5*time.Second, 3*time.Minute, func() (bool, error) {
246-
res, lastError = rpc.DialHTTP("tcp", fmt.Sprintf("localhost:%d", localAgentPort))
296+
waitErr := wait.PollImmediate(5*time.Second, 5*time.Minute, func() (bool, error) {
297+
res, lastError = rpc.DialHTTP("tcp", net.JoinHostPort("localhost", strconv.Itoa(localAgentPort)))
247298
if lastError != nil {
248299
return false, nil
249300
}
@@ -257,18 +308,6 @@ func Instrument(component ComponentType, agentName string, namespace string, kub
257308
return nil, closer, err
258309
}
259310

260-
closer = append(closer, func() error {
261-
err := res.Call(MethodTestAgentShutdown, new(TestAgentShutdownRequest), new(TestAgentShutdownResponse))
262-
if err != nil && strings.Contains(err.Error(), "connection is shut down") {
263-
return nil
264-
}
265-
266-
if err != nil {
267-
return xerrors.Errorf("cannot shutdown agent: %w", err)
268-
}
269-
return nil
270-
})
271-
272311
return res, closer, nil
273312
}
274313

0 commit comments

Comments
 (0)