Skip to content

Commit 9d4d690

Browse files
committed
test: Handle errors relate network in port-fowarding
1 parent 2da5eef commit 9d4d690

File tree

1 file changed

+79
-39
lines changed

1 file changed

+79
-39
lines changed

test/pkg/integration/integration.go

+79-39
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,12 @@ import (
2020
"runtime"
2121
"strconv"
2222
"strings"
23+
"syscall"
2324
"time"
2425

2526
"golang.org/x/xerrors"
27+
"google.golang.org/grpc/codes"
28+
"google.golang.org/grpc/status"
2629
corev1 "k8s.io/api/core/v1"
2730
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2831
"k8s.io/apimachinery/pkg/runtime/schema"
@@ -39,6 +42,11 @@ import (
3942
"github.com/gitpod-io/gitpod/test/pkg/integration/common"
4043
)
4144

45+
const (
46+
connectFailureMaxTries = 5
47+
errorDialingBackendEOF = "error dialing backend: EOF"
48+
)
49+
4250
type PodExec struct {
4351
RestConfig *rest.Config
4452
*kubernetes.Clientset
@@ -57,25 +65,33 @@ func NewPodExec(config rest.Config, clientset *kubernetes.Clientset) *PodExec {
5765
func (p *PodExec) PodCopyFile(src string, dst string, containername string) (*bytes.Buffer, *bytes.Buffer, *bytes.Buffer, error) {
5866
var in, out, errOut *bytes.Buffer
5967
var ioStreams genericclioptions.IOStreams
60-
for {
68+
for count := 0; ; count++ {
6169
ioStreams, in, out, errOut = genericclioptions.NewTestIOStreams()
6270
copyOptions := kubectlcp.NewCopyOptions(ioStreams)
6371
copyOptions.Clientset = p.Clientset
6472
copyOptions.ClientConfig = p.RestConfig
6573
copyOptions.Container = containername
6674
err := copyOptions.Run([]string{src, dst})
6775
if err != nil {
68-
if !errors.Is(err, io.EOF) {
69-
return nil, nil, nil, fmt.Errorf("Could not run copy operation: %v", err)
76+
if !shouldRetry(count, err) {
77+
return nil, nil, nil, fmt.Errorf("could not run copy operation: %v", err)
7078
}
7179
time.Sleep(10 * time.Second)
80+
count = count + 1
7281
continue
7382
}
7483
break
7584
}
7685
return in, out, errOut, nil
7786
}
7887

88+
func shouldRetry(count int, err error) bool {
89+
if count < connectFailureMaxTries {
90+
return err.Error() == errorDialingBackendEOF
91+
}
92+
return false
93+
}
94+
7995
func (p *PodExec) ExecCmd(command []string, podname string, namespace string, containername string) (*bytes.Buffer, *bytes.Buffer, *bytes.Buffer, error) {
8096
ioStreams, in, out, errOut := genericclioptions.NewTestIOStreams()
8197
execOptions := &kubectlexec.ExecOptions{
@@ -187,6 +203,43 @@ func Instrument(component ComponentType, agentName string, namespace string, kub
187203
return nil, closer, err
188204
}
189205

206+
var (
207+
res *rpc.Client
208+
cl []func() error
209+
)
210+
for i := 0; i < 10; i++ {
211+
res, cl, err = portfw(podExec, kubeconfig, podName, namespace, containerName, tgtFN, options)
212+
if err == nil {
213+
closer = append(closer, cl...)
214+
break
215+
}
216+
for _, c := range cl {
217+
_ = c()
218+
}
219+
time.Sleep(5 * time.Second)
220+
}
221+
if err != nil {
222+
return nil, closer, err
223+
}
224+
225+
closer = append(closer, func() error {
226+
err := res.Call(MethodTestAgentShutdown, new(TestAgentShutdownRequest), new(TestAgentShutdownResponse))
227+
if err != nil && strings.Contains(err.Error(), "connection is shut down") {
228+
return nil
229+
}
230+
231+
if err != nil {
232+
return xerrors.Errorf("cannot shutdown agent: %w", err)
233+
}
234+
return nil
235+
})
236+
237+
return res, closer, nil
238+
}
239+
240+
func portfw(podExec *PodExec, kubeconfig string, podName string, namespace string, containerName string, tgtFN string, options instrumentOptions) (*rpc.Client, []func() error, error) {
241+
var closer []func() error
242+
190243
localAgentPort, err := getFreePort()
191244
if err != nil {
192245
return nil, closer, err
@@ -198,22 +251,13 @@ func Instrument(component ComponentType, agentName string, namespace string, kub
198251
}
199252

200253
execErrs := make(chan error, 1)
201-
execF := func() {
254+
go func() {
202255
defer close(execErrs)
203256
_, _, _, execErr := podExec.ExecCmd(cmd, podName, namespace, containerName)
204257
if execErr != nil {
205258
execErrs <- execErr
206259
}
207-
}
208-
go execF()
209-
select {
210-
case err := <-execErrs:
211-
if err != nil {
212-
return nil, closer, err
213-
}
214-
return nil, closer, fmt.Errorf("agent stopped unexepectedly")
215-
case <-time.After(30 * time.Second):
216-
}
260+
}()
217261

218262
ctx, cancel := context.WithCancel(context.Background())
219263
defer func() {
@@ -226,24 +270,32 @@ func Instrument(component ComponentType, agentName string, namespace string, kub
226270
cancel()
227271
}
228272
}()
229-
230-
fwdReady, fwdErr := common.ForwardPortOfPod(ctx, kubeconfig, namespace, podName, strconv.Itoa(localAgentPort))
231-
select {
232-
case <-fwdReady:
233-
case err := <-execErrs:
234-
if err != nil {
235-
return nil, closer, err
236-
}
237-
case err := <-fwdErr:
238-
if err != nil {
239-
return nil, closer, err
273+
L:
274+
for {
275+
fwdReady, fwdErr := common.ForwardPortOfPod(ctx, kubeconfig, namespace, podName, strconv.Itoa(localAgentPort))
276+
select {
277+
case <-fwdReady:
278+
break L
279+
case err := <-execErrs:
280+
if err != nil {
281+
return nil, closer, err
282+
}
283+
case err := <-fwdErr:
284+
var eno syscall.Errno
285+
if errors.Is(err, io.EOF) || (errors.As(err, &eno) && eno == syscall.ECONNREFUSED) {
286+
time.Sleep(10 * time.Second)
287+
} else if st, ok := status.FromError(err); ok && st.Code() == codes.Unavailable {
288+
time.Sleep(10 * time.Second)
289+
} else if err != nil {
290+
return nil, closer, err
291+
}
240292
}
241293
}
242294

243295
var res *rpc.Client
244296
var lastError error
245-
waitErr := wait.PollImmediate(5*time.Second, 3*time.Minute, func() (bool, error) {
246-
res, lastError = rpc.DialHTTP("tcp", fmt.Sprintf("localhost:%d", localAgentPort))
297+
waitErr := wait.PollImmediate(5*time.Second, 5*time.Minute, func() (bool, error) {
298+
res, lastError = rpc.DialHTTP("tcp", net.JoinHostPort("localhost", strconv.Itoa(localAgentPort)))
247299
if lastError != nil {
248300
return false, nil
249301
}
@@ -257,18 +309,6 @@ func Instrument(component ComponentType, agentName string, namespace string, kub
257309
return nil, closer, err
258310
}
259311

260-
closer = append(closer, func() error {
261-
err := res.Call(MethodTestAgentShutdown, new(TestAgentShutdownRequest), new(TestAgentShutdownResponse))
262-
if err != nil && strings.Contains(err.Error(), "connection is shut down") {
263-
return nil
264-
}
265-
266-
if err != nil {
267-
return xerrors.Errorf("cannot shutdown agent: %w", err)
268-
}
269-
return nil
270-
})
271-
272312
return res, closer, nil
273313
}
274314

0 commit comments

Comments
 (0)