Skip to content

test: Enhanced running in unstable environments #12966

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Sep 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions test/pkg/integration/apis.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@ import (
"github.com/google/uuid"
"golang.org/x/xerrors"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
Expand Down Expand Up @@ -1136,6 +1138,8 @@ func (c *ComponentAPI) portFwdWithRetry(ctx context.Context, portFwdF portFwdFun
case err := <-errc:
if err == io.EOF {
time.Sleep(10 * time.Second)
} else if st, ok := status.FromError(err); ok && st.Code() == codes.Unavailable {
time.Sleep(10 * time.Second)
} else {
return err
}
Expand Down
145 changes: 87 additions & 58 deletions test/pkg/integration/workspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,41 +301,52 @@ func stopWsF(t *testing.T, instanceID string, api *ComponentAPI) stopWorkspaceFu
break
}

if !waitForStop {
wm, err := api.WorkspaceManager()
if err != nil {
return nil, err
}
wm, err := api.WorkspaceManager()
if err != nil {
return nil, err
}

dr, err := wm.DescribeWorkspace(sctx, &wsmanapi.DescribeWorkspaceRequest{
Id: instanceID,
})
if err != nil {
dr, err := wm.DescribeWorkspace(sctx, &wsmanapi.DescribeWorkspaceRequest{
Id: instanceID,
})
if err != nil {
if s, ok := status.FromError(err); ok && s.Code() == codes.NotFound {
t.Log("the workspace is already gone")
return &wsmanapi.WorkspaceStatus{
Id: instanceID,
Phase: wsmanapi.WorkspacePhase_STOPPED,
}, nil
} else if err != nil {
return nil, err
}

s, ok := status.FromError(err)
if ok && s.Code() == codes.NotFound {
return nil, err
}
return dr.Status, err
}

if !waitForStop {
return dr.Status, nil
}

var lastStatus *wsmanapi.WorkspaceStatus
for {
t.Logf("waiting for stopping the workspace: %s", instanceID)
lastStatus, err = WaitForWorkspaceStop(sctx, api, instanceID)
if st, ok := status.FromError(err); ok && st.Code() == codes.Unavailable {
api.ClearWorkspaceManagerClientCache()
t.Logf("got %v during waiting for stopping the workspace", st)
time.Sleep(5 * time.Second)
continue
if st, ok := status.FromError(err); ok {
switch st.Code() {
case codes.Unavailable:
api.ClearWorkspaceManagerClientCache()
t.Logf("got %v during waiting for stopping the workspace", st)
time.Sleep(5 * time.Second)
continue
case codes.NotFound:
t.Log("the workspace is already gone")
return lastStatus, nil
}
} else if err != nil {
return lastStatus, err
}
break
}
return lastStatus, err
return lastStatus, nil
}
}

Expand Down Expand Up @@ -372,7 +383,7 @@ func WaitForWorkspaceStart(ctx context.Context, instanceID string, api *Componen
continue
}
if err != nil {
return nil, xerrors.Errorf("cannot listen for workspace updates: %q", err)
return nil, xerrors.Errorf("cannot listen for workspace updates: %w", err)
}
defer func() {
_ = sub.CloseSend()
Expand All @@ -393,6 +404,7 @@ func WaitForWorkspaceStart(ctx context.Context, instanceID string, api *Componen
resp, err := sub.Recv()
if err != nil {
if st, ok := status.FromError(err); ok && st.Code() == codes.Unavailable {
time.Sleep(10 * time.Second)
sub.CloseSend()
api.ClearWorkspaceManagerClientCache()
wsman, err := api.WorkspaceManager()
Expand All @@ -405,17 +417,16 @@ func WaitForWorkspaceStart(ctx context.Context, instanceID string, api *Componen
errStatus <- xerrors.Errorf("cannot listen for workspace updates: %w", err)
return
}
time.Sleep(10 * time.Second)
continue
}
errStatus <- xerrors.Errorf("workspace update error: %q", err)
errStatus <- xerrors.Errorf("workspace update error: %w", err)
return
}

s = resp.GetStatus()
if s == nil {
continue
}
if s.Id != instanceID {
if s == nil || s.Id != instanceID {
time.Sleep(10 * time.Second)
continue
}

Expand All @@ -430,18 +441,14 @@ func WaitForWorkspaceStart(ctx context.Context, instanceID string, api *Componen
if s.Conditions.Failed != "" {
errStatus <- xerrors.Errorf("workspace instance %s failed: %s", instanceID, s.Conditions.Failed)
return
}
if s.Phase == wsmanapi.WorkspacePhase_STOPPING {
errStatus <- xerrors.Errorf("workspace instance %s is stopping", instanceID)
return
}
if s.Phase == wsmanapi.WorkspacePhase_STOPPED {
errStatus <- xerrors.Errorf("workspace instance %s has stopped", instanceID)
} else if s.Phase == wsmanapi.WorkspacePhase_STOPPING || s.Phase == wsmanapi.WorkspacePhase_STOPPED {
errStatus <- xerrors.Errorf("workspace instance %s is %s", instanceID, s.Phase)
return
}
}
if s.Phase != wsmanapi.WorkspacePhase_RUNNING {
// we're still starting
time.Sleep(10 * time.Second)
continue
}

Expand Down Expand Up @@ -469,7 +476,7 @@ func WaitForWorkspaceStart(ctx context.Context, instanceID string, api *Componen

select {
case <-ctx.Done():
return nil, xerrors.Errorf("cannot wait for workspace: %q", ctx.Err())
return nil, xerrors.Errorf("cannot wait for workspace: %w", ctx.Err())
case s := <-done:
return s, nil
case err := <-errStatus:
Expand All @@ -485,44 +492,67 @@ func WaitForWorkspaceStop(ctx context.Context, api *ComponentAPI, instanceID str
return nil, xerrors.Errorf("cannot listen for workspace updates: %q", err)
}

sub, err := wsman.Subscribe(context.Background(), &wsmanapi.SubscribeRequest{})
_, err = wsman.DescribeWorkspace(ctx, &wsmanapi.DescribeWorkspaceRequest{
Id: instanceID,
})
if err != nil {
return nil, err
}

sub, err := wsman.Subscribe(ctx, &wsmanapi.SubscribeRequest{})
if err != nil {
return nil, xerrors.Errorf("cannot listen for workspace updates: %q", err)
}
defer func() {
_ = sub.CloseSend()
}()

done := make(chan struct{})
done := make(chan *wsmanapi.WorkspaceStatus)
errCh := make(chan error)
resetSubscriber := func(subscriber wsmanapi.WorkspaceManager_SubscribeClient, sapi *ComponentAPI) (wsmanapi.WorkspaceManager_SubscribeClient, error) {
subscriber.CloseSend()
sapi.ClearWorkspaceManagerClientCache()
wsman, err := sapi.WorkspaceManager()
if err != nil {
return nil, xerrors.Errorf("cannot listen for workspace updates: %w", err)
}
new_sub, err := wsman.Subscribe(ctx, &wsmanapi.SubscribeRequest{})
if err != nil {
return nil, xerrors.Errorf("cannot listen for workspace updates: %w", err)
}
return new_sub, nil
}

go func() {
defer close(done)
var wss *wsmanapi.WorkspaceStatus
defer func() {
done <- wss
close(done)
}()
for {
resp, err := sub.Recv()
if err != nil {
if s, ok := status.FromError(err); ok && s.Code() == codes.Unavailable {
time.Sleep(10 * time.Second)
sub, err = resetSubscriber(sub, api)
if err == nil {
continue
}
}
errCh <- xerrors.Errorf("workspace update error: %q", err)
return
}
s := resp.GetStatus()
if s == nil {
continue
}
if s.Id != instanceID {
continue
}

if s.Conditions.Failed != "" {
// TODO(toru): we have to fix https://github.com/gitpod-io/gitpod/issues/12021
if s.Conditions.Failed == "The container could not be located when the pod was deleted. The container used to be Running" || s.Conditions.Failed == "The container could not be located when the pod was terminated" {
lastStatus = s
if wss = resp.GetStatus(); wss != nil && wss.Id == instanceID {
if wss.Conditions.Failed != "" {
// TODO(toru): we have to fix https://github.com/gitpod-io/gitpod/issues/12021
if wss.Conditions.Failed != "The container could not be located when the pod was deleted. The container used to be Running" && wss.Conditions.Failed != "The container could not be located when the pod was terminated" {
errCh <- xerrors.Errorf("workspace instance %s failed: %s", instanceID, wss.Conditions.Failed)
}
return
}
if wss.Phase == wsmanapi.WorkspacePhase_STOPPED {
return
}
errCh <- xerrors.Errorf("workspace instance %s failed: %s", instanceID, s.Conditions.Failed)
return
}
if s.Phase == wsmanapi.WorkspacePhase_STOPPED {
lastStatus = s
return
}

time.Sleep(10 * time.Second)
Expand All @@ -544,10 +574,9 @@ func WaitForWorkspaceStop(ctx context.Context, api *ComponentAPI, instanceID str
return nil, err
case <-ctx.Done():
return nil, xerrors.Errorf("cannot wait for workspace: %q", ctx.Err())
case <-done:
case s := <-done:
return s, nil
}

return
}

// WaitForWorkspace waits until the condition function returns true. Fails the test if the condition does
Expand Down
Loading