Skip to content

Commit 66fb185

Browse files
committed
[usage] Add periodic job to detect invalid workspace instances in usage
1 parent f514e30 commit 66fb185

File tree

7 files changed

+170
-75
lines changed

7 files changed

+170
-75
lines changed

components/usage/pkg/db/workspace_instance.go

+19
Original file line numberDiff line numberDiff line change
@@ -239,3 +239,22 @@ func (i *WorkspaceInstanceForUsage) WorkspaceRuntimeSeconds(stopTimeIfInstanceIs
239239

240240
return int64(stop.Sub(start).Round(time.Second).Seconds())
241241
}
242+
243+
func ListWorkspaceInstanceIDsWithPhaseStoppedButNoStoppingTime(ctx context.Context, conn *gorm.DB) ([]uuid.UUID, error) {
244+
var ids []uuid.UUID
245+
var chunk []uuid.UUID
246+
247+
tx := conn.WithContext(ctx).
248+
Table(fmt.Sprintf("%s as wsi", (&WorkspaceInstance{}).TableName())).
249+
Joins(fmt.Sprintf("LEFT JOIN %s AS u ON wsi.id = u.id", (&Usage{}).TableName())).
250+
Where("wsi.phasePersisted = ?", "stopped").
251+
Where("wsi.stoppingTime = ''"). // empty
252+
FindInBatches(&chunk, 1000, func(_ *gorm.DB, _ int) error {
253+
ids = append(ids, chunk...)
254+
return nil
255+
})
256+
if tx.Error != nil {
257+
return nil, fmt.Errorf("failed to list workspace instances with phase stopped but no stopping time: %w", tx.Error)
258+
}
259+
return ids, nil
260+
}

components/usage/pkg/db/workspace_instance_test.go

+31
Original file line numberDiff line numberDiff line change
@@ -244,3 +244,34 @@ func TestWorkspaceInstanceForUsage_WorkspaceRuntimeSeconds(t *testing.T) {
244244
})
245245
}
246246
}
247+
248+
func TestListWorkspaceInstanceIDsWithPhaseStoppedButNoStoppingTime(t *testing.T) {
249+
dbconn := dbtest.ConnectForTests(t)
250+
instances := dbtest.CreateWorkspaceInstances(t, dbconn,
251+
// started but not stopped, should be ignored
252+
dbtest.NewWorkspaceInstance(t, db.WorkspaceInstance{
253+
ID: uuid.UUID{},
254+
StartedTime: db.NewVarcharTime(time.Now()),
255+
}),
256+
// stopped, but no stopping time, should be detected
257+
dbtest.NewWorkspaceInstance(t, db.WorkspaceInstance{
258+
ID: uuid.UUID{},
259+
StartedTime: db.NewVarcharTime(time.Now()),
260+
PhasePersisted: "stopped",
261+
StoppingTime: db.VarcharTime{},
262+
}),
263+
)
264+
265+
dbtest.CreateUsageRecords(t, dbconn,
266+
dbtest.NewUsage(t, db.Usage{
267+
ID: instances[0].ID,
268+
}),
269+
dbtest.NewUsage(t, db.Usage{
270+
ID: instances[1].ID,
271+
}),
272+
)
273+
274+
detectedIDs, err := db.ListWorkspaceInstanceIDsWithPhaseStoppedButNoStoppingTime(context.Background(), dbconn)
275+
require.NoError(t, err)
276+
require.Equal(t, []uuid.UUID{instances[1].ID}, detectedIDs)
277+
}

components/usage/pkg/scheduler/job.go

+76-24
Original file line numberDiff line numberDiff line change
@@ -9,61 +9,54 @@ import (
99
"fmt"
1010
"github.com/gitpod-io/gitpod/common-go/log"
1111
v1 "github.com/gitpod-io/gitpod/usage-api/v1"
12+
"github.com/gitpod-io/gitpod/usage/pkg/db"
1213
"github.com/robfig/cron"
1314
"google.golang.org/protobuf/types/known/timestamppb"
15+
"gorm.io/gorm"
1416
"time"
1517
)
1618

1719
// Job is a unit of work that can be attached to a JobSpec and run on a schedule.
type Job interface {
	// Run performs the work once and returns a non-nil error on failure.
	Run() error
}
2022

21-
func NewLedgerTriggerJobSpec(schedule time.Duration, job Job) (JobSpec, error) {
22-
parsed, err := cron.Parse(fmt.Sprintf("@every %s", schedule.String()))
23+
// JobFunc adapts a plain function with the signature func() error so that it
// provides a Run method.
type JobFunc func() error

// Run invokes the wrapped function and returns its result unchanged.
func (fn JobFunc) Run() error {
	err := fn()
	return err
}
28+
29+
func NewPeriodicJobSpec(period time.Duration, id string, job Job) (JobSpec, error) {
30+
parsed, err := cron.Parse(fmt.Sprintf("@every %s", period.String()))
2331
if err != nil {
24-
return JobSpec{}, fmt.Errorf("failed to parse ledger job schedule: %w", err)
32+
return JobSpec{}, fmt.Errorf("failed to parse period into schedule: %w", err)
2533
}
2634

2735
return JobSpec{
28-
Job: job,
29-
ID: "ledger",
36+
Job: WithoutConcurrentRun(job),
37+
ID: id,
3038
Schedule: parsed,
3139
}, nil
3240
}
3341

42+
func NewLedgerTriggerJobSpec(schedule time.Duration, job Job) (JobSpec, error) {
43+
return NewPeriodicJobSpec(schedule, "ledger", WithoutConcurrentRun(job))
44+
}
45+
3446
func NewLedgerTrigger(usageClient v1.UsageServiceClient, billingClient v1.BillingServiceClient) *LedgerJob {
3547
return &LedgerJob{
3648
usageClient: usageClient,
3749
billingClient: billingClient,
38-
39-
running: make(chan struct{}, 1),
4050
}
4151
}
4252

4353
type LedgerJob struct {
4454
usageClient v1.UsageServiceClient
4555
billingClient v1.BillingServiceClient
46-
47-
running chan struct{}
4856
}
4957

5058
func (r *LedgerJob) Run() (err error) {
5159
ctx := context.Background()
52-
53-
select {
54-
// attempt a write to signal we want to run
55-
case r.running <- struct{}{}:
56-
// we managed to write, there's no other job executing. Cases are not fall through so we continue executing our main logic.
57-
defer func() {
58-
// signal job completed
59-
<-r.running
60-
}()
61-
default:
62-
// we could not write, so another instance is already running. Skip current run.
63-
log.Infof("Skipping ledger run, another run is already in progress.")
64-
return nil
65-
}
66-
6760
now := time.Now().UTC()
6861
hourAgo := now.Add(-1 * time.Hour)
6962

@@ -90,3 +83,62 @@ func (r *LedgerJob) Run() (err error) {
9083

9184
return nil
9285
}
86+
87+
// WithoutConcurrentRun wraps a Job and ensures the job does not concurrently
88+
func WithoutConcurrentRun(j Job) Job {
89+
return &preventConcurrentInvocation{
90+
job: j,
91+
running: make(chan struct{}, 1),
92+
}
93+
}
94+
95+
type preventConcurrentInvocation struct {
96+
job Job
97+
running chan struct{}
98+
}
99+
100+
func (r *preventConcurrentInvocation) Run() error {
101+
select {
102+
// attempt a write to signal we want to run
103+
case r.running <- struct{}{}:
104+
// we managed to write, there's no other job executing. Cases are not fall through so we continue executing our main logic.
105+
defer func() {
106+
// signal job completed
107+
<-r.running
108+
}()
109+
110+
err := r.job.Run()
111+
return err
112+
default:
113+
// we could not write, so another instance is already running. Skip current run.
114+
log.Infof("Job already running, skipping invocation.")
115+
return nil
116+
}
117+
}
118+
119+
func NewStoppedWithoutStoppingTimeDetectorSpec(dbconn *gorm.DB) *StoppedWithoutStoppingTimeDetector {
120+
return &StoppedWithoutStoppingTimeDetector{
121+
dbconn: dbconn,
122+
}
123+
}
124+
125+
type StoppedWithoutStoppingTimeDetector struct {
126+
dbconn *gorm.DB
127+
}
128+
129+
func (r *StoppedWithoutStoppingTimeDetector) Run() error {
130+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
131+
defer cancel()
132+
133+
log.Info("Checking for instances which are stopped but are missing a stoppingTime.")
134+
instances, err := db.ListWorkspaceInstanceIDsWithPhaseStoppedButNoStoppingTime(ctx, r.dbconn)
135+
if err != nil {
136+
log.WithError(err).Errorf("Failed to list stop instances without stopping time.")
137+
return fmt.Errorf("failed to list instances from db: %w", err)
138+
}
139+
140+
log.Infof("Identified %d instances in stopped state without a stopping time.", len(instances))
141+
stoppedWithoutStoppingTime.Set(float64(len(instances)))
142+
143+
return nil
144+
}

components/usage/pkg/scheduler/job_test.go

+17-42
Original file line numberDiff line numberDiff line change
@@ -5,21 +5,20 @@
55
package scheduler
66

77
import (
8-
"context"
9-
v1 "github.com/gitpod-io/gitpod/usage-api/v1"
108
"github.com/stretchr/testify/require"
11-
"google.golang.org/grpc"
12-
"google.golang.org/grpc/codes"
13-
"google.golang.org/grpc/status"
149
"sync"
1510
"sync/atomic"
1611
"testing"
1712
"time"
1813
)
1914

20-
func TestLedgerJob_PreventsConcurrentInvocations(t *testing.T) {
21-
client := &fakeUsageClient{}
22-
job := NewLedgerTrigger(client, nil)
15+
func TestPreventConcurrentInvocation(t *testing.T) {
16+
callCount := int32(0)
17+
job := WithoutConcurrentRun(JobFunc(func() error {
18+
atomic.AddInt32(&callCount, 1)
19+
time.Sleep(50 * time.Millisecond)
20+
return nil
21+
}))
2322

2423
invocations := 3
2524
wg := sync.WaitGroup{}
@@ -32,42 +31,18 @@ func TestLedgerJob_PreventsConcurrentInvocations(t *testing.T) {
3231
}
3332
wg.Wait()
3433

35-
require.Equal(t, 1, int(client.ReconcileUsageWithLedgerCallCount))
34+
require.Equal(t, int32(1), callCount)
3635
}
3736

38-
func TestLedgerJob_CanRunRepeatedly(t *testing.T) {
39-
client := &fakeUsageClient{}
40-
job := NewLedgerTrigger(client, nil)
37+
func TestPreventConcurrentInvocation_CanRunRepeatedly(t *testing.T) {
38+
callCount := int32(0)
39+
job := WithoutConcurrentRun(JobFunc(func() error {
40+
atomic.AddInt32(&callCount, 1)
41+
return nil
42+
}))
4143

42-
_ = job.Run()
43-
_ = job.Run()
44+
require.NoError(t, job.Run())
45+
require.NoError(t, job.Run())
4446

45-
require.Equal(t, 2, int(client.ReconcileUsageWithLedgerCallCount))
46-
}
47-
48-
type fakeUsageClient struct {
49-
ReconcileUsageWithLedgerCallCount int32
50-
}
51-
52-
// GetCostCenter retrieves the active cost center for the given attributionID
53-
func (c *fakeUsageClient) GetCostCenter(ctx context.Context, in *v1.GetCostCenterRequest, opts ...grpc.CallOption) (*v1.GetCostCenterResponse, error) {
54-
return nil, status.Error(codes.Unauthenticated, "not implemented")
55-
}
56-
57-
// SetCostCenter stores the given cost center
58-
func (c *fakeUsageClient) SetCostCenter(ctx context.Context, in *v1.SetCostCenterRequest, opts ...grpc.CallOption) (*v1.SetCostCenterResponse, error) {
59-
return nil, status.Error(codes.Unauthenticated, "not implemented")
60-
}
61-
62-
// Triggers reconciliation of usage with ledger implementation.
63-
func (c *fakeUsageClient) ReconcileUsage(ctx context.Context, in *v1.ReconcileUsageRequest, opts ...grpc.CallOption) (*v1.ReconcileUsageResponse, error) {
64-
atomic.AddInt32(&c.ReconcileUsageWithLedgerCallCount, 1)
65-
time.Sleep(50 * time.Millisecond)
66-
67-
return nil, status.Error(codes.Unauthenticated, "not implemented")
68-
}
69-
70-
// ListUsage retrieves all usage for the specified attributionId and theb given time range
71-
func (c *fakeUsageClient) ListUsage(ctx context.Context, in *v1.ListUsageRequest, opts ...grpc.CallOption) (*v1.ListUsageResponse, error) {
72-
return nil, status.Error(codes.Unauthenticated, "not implemented")
47+
require.Equal(t, int32(2), callCount)
7348
}

components/usage/pkg/scheduler/reporter.go

+8
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,20 @@ var (
3030
Help: "Histogram of job duration",
3131
Buckets: prometheus.LinearBuckets(30, 30, 10), // every 30 secs, starting at 30secs
3232
}, []string{"job", "outcome"})
33+
34+
stoppedWithoutStoppingTime = prometheus.NewGauge(prometheus.GaugeOpts{
35+
Namespace: namespace,
36+
Subsystem: subsystem,
37+
Name: "job_stopped_instances_without_stopping_time_count",
38+
Help: "Gauge of usage records where workpsace instance is stopped but doesn't have a stopping time",
39+
})
3340
)
3441

3542
func RegisterMetrics(reg *prometheus.Registry) error {
3643
metrics := []prometheus.Collector{
3744
jobStartedSeconds,
3845
jobCompletedSeconds,
46+
stoppedWithoutStoppingTime,
3947
}
4048
for _, metric := range metrics {
4149
err := reg.Register(metric)

components/usage/pkg/scheduler/scheduler_test.go

-6
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,3 @@ func TestScheduler(t *testing.T) {
3939
require.True(t, firstRan)
4040
require.True(t, secondRan)
4141
}
42-
43-
type JobFunc func() error
44-
45-
func (f JobFunc) Run() error {
46-
return f()
47-
}

components/usage/pkg/server/server.go

+19-3
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,19 @@ func Start(cfg Config) error {
9898
stripeClient = c
9999
}
100100

101+
stoppedWithoutStoppingTimeSpec, err := scheduler.NewPeriodicJobSpec(
102+
15*time.Minute,
103+
"stopped_without_stopping_time",
104+
scheduler.WithoutConcurrentRun(scheduler.NewStoppedWithoutStoppingTimeDetectorSpec(conn)),
105+
)
106+
if err != nil {
107+
return fmt.Errorf("failed to setup stopped without a stopping time detector: %w", err)
108+
}
109+
110+
schedulerJobSpecs := []scheduler.JobSpec{
111+
stoppedWithoutStoppingTimeSpec,
112+
}
113+
101114
if cfg.ControllerSchedule != "" {
102115
// we do not run the controller if there is no schedule defined.
103116
schedule, err := time.ParseDuration(cfg.ControllerSchedule)
@@ -112,13 +125,16 @@ func Start(cfg Config) error {
112125
return fmt.Errorf("failed to setup ledger trigger job: %w", err)
113126
}
114127

115-
sched := scheduler.New(jobSpec)
116-
sched.Start()
117-
defer sched.Stop()
128+
schedulerJobSpecs = append(schedulerJobSpecs, jobSpec)
129+
118130
} else {
119131
log.Info("No controller schedule specified, controller will be disabled.")
120132
}
121133

134+
sched := scheduler.New(schedulerJobSpecs...)
135+
sched.Start()
136+
defer sched.Stop()
137+
122138
err = registerGRPCServices(srv, conn, stripeClient, pricer)
123139
if err != nil {
124140
return fmt.Errorf("failed to register gRPC services: %w", err)

0 commit comments

Comments
 (0)