diff --git a/components/usage/pkg/db/dbtest/workspace_instance.go b/components/usage/pkg/db/dbtest/workspace_instance.go index c7a46d03921033..8a1be3b808c07a 100644 --- a/components/usage/pkg/db/dbtest/workspace_instance.go +++ b/components/usage/pkg/db/dbtest/workspace_instance.go @@ -77,6 +77,11 @@ func NewWorkspaceInstance(t *testing.T, instance db.WorkspaceInstance) db.Worksp workspaceClass = instance.WorkspaceClass } + phasePersisted := "" + if instance.PhasePersisted != "" { + phasePersisted = instance.PhasePersisted + } + return db.WorkspaceInstance{ ID: id, WorkspaceID: workspaceID, @@ -98,7 +103,7 @@ func NewWorkspaceInstance(t *testing.T, instance db.WorkspaceInstance) db.Worksp StatusOld: sql.NullString{}, Status: status, Phase: sql.NullString{}, - PhasePersisted: "", + PhasePersisted: phasePersisted, } } diff --git a/components/usage/pkg/db/workspace_instance.go b/components/usage/pkg/db/workspace_instance.go index 1a68ec995e6bd4..e47f0c24b1b428 100644 --- a/components/usage/pkg/db/workspace_instance.go +++ b/components/usage/pkg/db/workspace_instance.go @@ -239,3 +239,19 @@ func (i *WorkspaceInstanceForUsage) WorkspaceRuntimeSeconds(stopTimeIfInstanceIs return int64(stop.Sub(start).Round(time.Second).Seconds()) } + +func ListWorkspaceInstanceIDsWithPhaseStoppedButNoStoppingTime(ctx context.Context, conn *gorm.DB) ([]uuid.UUID, error) { + var ids []uuid.UUID + //var chunk []uuid.UUID + + tx := conn.WithContext(ctx). + Table(fmt.Sprintf("%s as wsi", (&WorkspaceInstance{}).TableName())). + Joins(fmt.Sprintf("LEFT JOIN %s AS u ON wsi.id = u.id", (&Usage{}).TableName())). + Where("wsi.phasePersisted = ?", "stopped"). + Where("wsi.stoppingTime = ''"). // empty + Pluck("wsi.id", &ids) + if tx.Error != nil { + return nil, fmt.Errorf("failed to list workspace instances with phase stopped but no stopping time: %w", tx.Error) + } + return ids, nil +} diff --git a/components/usage/pkg/db/workspace_instance_test.go b/components/usage/pkg/db/workspace_instance_test.go index 47033fcc8237a4..df1a924e253cb8 100644 --- a/components/usage/pkg/db/workspace_instance_test.go +++ b/components/usage/pkg/db/workspace_instance_test.go @@ -244,3 +244,31 @@ func TestWorkspaceInstanceForUsage_WorkspaceRuntimeSeconds(t *testing.T) { }) } } + +func TestListWorkspaceInstanceIDsWithPhaseStoppedButNoStoppingTime(t *testing.T) { + dbconn := dbtest.ConnectForTests(t) + instances := dbtest.CreateWorkspaceInstances(t, dbconn, + // started but not stopped, should be ignored + dbtest.NewWorkspaceInstance(t, db.WorkspaceInstance{ + StartedTime: db.NewVarcharTime(time.Now()), + }), + // stopped, but no stopping time, should be detected + dbtest.NewWorkspaceInstance(t, db.WorkspaceInstance{ + StartedTime: db.NewVarcharTime(time.Now()), + PhasePersisted: "stopped", + }), + ) + + dbtest.CreateUsageRecords(t, dbconn, + dbtest.NewUsage(t, db.Usage{ + ID: instances[0].ID, + }), + dbtest.NewUsage(t, db.Usage{ + ID: instances[1].ID, + }), + ) + + detectedIDs, err := db.ListWorkspaceInstanceIDsWithPhaseStoppedButNoStoppingTime(context.Background(), dbconn) + require.NoError(t, err) + require.Equal(t, []uuid.UUID{instances[1].ID}, detectedIDs) +} diff --git a/components/usage/pkg/scheduler/job.go b/components/usage/pkg/scheduler/job.go index 90b74b12aa38ef..f6e93f876d1233 100644 --- a/components/usage/pkg/scheduler/job.go +++ b/components/usage/pkg/scheduler/job.go @@ -9,8 +9,10 @@ import ( "fmt" "github.com/gitpod-io/gitpod/common-go/log" v1 "github.com/gitpod-io/gitpod/usage-api/v1" + "github.com/gitpod-io/gitpod/usage/pkg/db" "github.com/robfig/cron" "google.golang.org/protobuf/types/known/timestamppb" + "gorm.io/gorm" "time" ) @@ -18,52 +20,43 @@ type Job interface { Run() error } -func NewLedgerTriggerJobSpec(schedule time.Duration, job Job) (JobSpec, error) { - parsed, err := cron.Parse(fmt.Sprintf("@every %s", schedule.String())) +type JobFunc func() error + +func (f JobFunc) Run() error { + return f() +} + +func NewPeriodicJobSpec(period time.Duration, id string, job Job) (JobSpec, error) { + parsed, err := cron.Parse(fmt.Sprintf("@every %s", period.String())) if err != nil { - return JobSpec{}, fmt.Errorf("failed to parse ledger job schedule: %w", err) + return JobSpec{}, fmt.Errorf("failed to parse period into schedule: %w", err) } return JobSpec{ - Job: job, - ID: "ledger", + Job: WithoutConcurrentRun(job), + ID: id, Schedule: parsed, }, nil } +func NewLedgerTriggerJobSpec(schedule time.Duration, job Job) (JobSpec, error) { + return NewPeriodicJobSpec(schedule, "ledger", WithoutConcurrentRun(job)) +} + func NewLedgerTrigger(usageClient v1.UsageServiceClient, billingClient v1.BillingServiceClient) *LedgerJob { return &LedgerJob{ usageClient: usageClient, billingClient: billingClient, - - running: make(chan struct{}, 1), } } type LedgerJob struct { usageClient v1.UsageServiceClient billingClient v1.BillingServiceClient - - running chan struct{} } func (r *LedgerJob) Run() (err error) { ctx := context.Background() - - select { - // attempt a write to signal we want to run - case r.running <- struct{}{}: - // we managed to write, there's no other job executing. Cases are not fall through so we continue executing our main logic. - defer func() { - // signal job completed - <-r.running - }() - default: - // we could not write, so another instance is already running. Skip current run. - log.Infof("Skipping ledger run, another run is already in progress.") - return nil - } - now := time.Now().UTC() hourAgo := now.Add(-1 * time.Hour) @@ -90,3 +83,62 @@ func (r *LedgerJob) Run() (err error) { return nil } + +// WithoutConcurrentRun wraps a Job and ensures the job does not concurrently +func WithoutConcurrentRun(j Job) Job { + return &preventConcurrentInvocation{ + job: j, + running: make(chan struct{}, 1), + } +} + +type preventConcurrentInvocation struct { + job Job + running chan struct{} +} + +func (r *preventConcurrentInvocation) Run() error { + select { + // attempt a write to signal we want to run + case r.running <- struct{}{}: + // we managed to write, there's no other job executing. Cases are not fall through so we continue executing our main logic. + defer func() { + // signal job completed + <-r.running + }() + + err := r.job.Run() + return err + default: + // we could not write, so another instance is already running. Skip current run. + log.Infof("Job already running, skipping invocation.") + return nil + } +} + +func NewStoppedWithoutStoppingTimeDetectorSpec(dbconn *gorm.DB) *StoppedWithoutStoppingTimeDetector { + return &StoppedWithoutStoppingTimeDetector{ + dbconn: dbconn, + } +} + +type StoppedWithoutStoppingTimeDetector struct { + dbconn *gorm.DB +} + +func (r *StoppedWithoutStoppingTimeDetector) Run() error { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) + defer cancel() + + log.Info("Checking for instances which are stopped but are missing a stoppingTime.") + instances, err := db.ListWorkspaceInstanceIDsWithPhaseStoppedButNoStoppingTime(ctx, r.dbconn) + if err != nil { + log.WithError(err).Errorf("Failed to list stop instances without stopping time.") + return fmt.Errorf("failed to list instances from db: %w", err) + } + + log.Infof("Identified %d instances in stopped state without a stopping time.", len(instances)) + stoppedWithoutStoppingTime.Set(float64(len(instances))) + + return nil +} diff --git a/components/usage/pkg/scheduler/job_test.go b/components/usage/pkg/scheduler/job_test.go index e2b6cf3d5341d8..ded7fa6f9d163a 100644 --- a/components/usage/pkg/scheduler/job_test.go +++ b/components/usage/pkg/scheduler/job_test.go @@ -5,21 +5,20 @@ package scheduler import ( - "context" - v1 "github.com/gitpod-io/gitpod/usage-api/v1" "github.com/stretchr/testify/require" - "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" "sync" "sync/atomic" "testing" "time" ) -func TestLedgerJob_PreventsConcurrentInvocations(t *testing.T) { - client := &fakeUsageClient{} - job := NewLedgerTrigger(client, nil) +func TestPreventConcurrentInvocation(t *testing.T) { + callCount := int32(0) + job := WithoutConcurrentRun(JobFunc(func() error { + atomic.AddInt32(&callCount, 1) + time.Sleep(50 * time.Millisecond) + return nil + })) invocations := 3 wg := sync.WaitGroup{} @@ -32,42 +31,18 @@ func TestLedgerJob_PreventsConcurrentInvocations(t *testing.T) { } wg.Wait() - require.Equal(t, 1, int(client.ReconcileUsageWithLedgerCallCount)) + require.Equal(t, int32(1), callCount) } -func TestLedgerJob_CanRunRepeatedly(t *testing.T) { - client := &fakeUsageClient{} - job := NewLedgerTrigger(client, nil) +func TestPreventConcurrentInvocation_CanRunRepeatedly(t *testing.T) { + callCount := int32(0) + job := WithoutConcurrentRun(JobFunc(func() error { + atomic.AddInt32(&callCount, 1) + return nil + })) - _ = job.Run() - _ = job.Run() + require.NoError(t, job.Run()) + require.NoError(t, job.Run()) - require.Equal(t, 2, int(client.ReconcileUsageWithLedgerCallCount)) -} - -type fakeUsageClient struct { - ReconcileUsageWithLedgerCallCount int32 -} - -// GetCostCenter retrieves the active cost center for the given attributionID -func (c *fakeUsageClient) GetCostCenter(ctx context.Context, in *v1.GetCostCenterRequest, opts ...grpc.CallOption) (*v1.GetCostCenterResponse, error) { - return nil, status.Error(codes.Unauthenticated, "not implemented") -} - -// SetCostCenter stores the given cost center -func (c *fakeUsageClient) SetCostCenter(ctx context.Context, in *v1.SetCostCenterRequest, opts ...grpc.CallOption) (*v1.SetCostCenterResponse, error) { - return nil, status.Error(codes.Unauthenticated, "not implemented") -} - -// Triggers reconciliation of usage with ledger implementation. -func (c *fakeUsageClient) ReconcileUsage(ctx context.Context, in *v1.ReconcileUsageRequest, opts ...grpc.CallOption) (*v1.ReconcileUsageResponse, error) { - atomic.AddInt32(&c.ReconcileUsageWithLedgerCallCount, 1) - time.Sleep(50 * time.Millisecond) - - return nil, status.Error(codes.Unauthenticated, "not implemented") -} - -// ListUsage retrieves all usage for the specified attributionId and theb given time range -func (c *fakeUsageClient) ListUsage(ctx context.Context, in *v1.ListUsageRequest, opts ...grpc.CallOption) (*v1.ListUsageResponse, error) { - return nil, status.Error(codes.Unauthenticated, "not implemented") + require.Equal(t, int32(2), callCount) } diff --git a/components/usage/pkg/scheduler/reporter.go b/components/usage/pkg/scheduler/reporter.go index 4096c140520974..7020885b6fd1ef 100644 --- a/components/usage/pkg/scheduler/reporter.go +++ b/components/usage/pkg/scheduler/reporter.go @@ -30,12 +30,20 @@ var ( Help: "Histogram of job duration", Buckets: prometheus.LinearBuckets(30, 30, 10), // every 30 secs, starting at 30secs }, []string{"job", "outcome"}) + + stoppedWithoutStoppingTime = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "job_stopped_instances_without_stopping_time_count", + Help: "Gauge of usage records where workpsace instance is stopped but doesn't have a stopping time", + }) ) func RegisterMetrics(reg *prometheus.Registry) error { metrics := []prometheus.Collector{ jobStartedSeconds, jobCompletedSeconds, + stoppedWithoutStoppingTime, } for _, metric := range metrics { err := reg.Register(metric) diff --git a/components/usage/pkg/scheduler/scheduler_test.go b/components/usage/pkg/scheduler/scheduler_test.go index 27a1564ce30d0e..1b14e53c5b3d2a 100644 --- a/components/usage/pkg/scheduler/scheduler_test.go +++ b/components/usage/pkg/scheduler/scheduler_test.go @@ -39,9 +39,3 @@ func TestScheduler(t *testing.T) { require.True(t, firstRan) require.True(t, secondRan) } - -type JobFunc func() error - -func (f JobFunc) Run() error { - return f() -} diff --git a/components/usage/pkg/server/server.go b/components/usage/pkg/server/server.go index 616be29ae94fe8..3e6740e5edf511 100644 --- a/components/usage/pkg/server/server.go +++ b/components/usage/pkg/server/server.go @@ -98,6 +98,19 @@ func Start(cfg Config) error { stripeClient = c } + stoppedWithoutStoppingTimeSpec, err := scheduler.NewPeriodicJobSpec( + 15*time.Minute, + "stopped_without_stopping_time", + scheduler.WithoutConcurrentRun(scheduler.NewStoppedWithoutStoppingTimeDetectorSpec(conn)), + ) + if err != nil { + return fmt.Errorf("failed to setup stopped without a stopping time detector: %w", err) + } + + schedulerJobSpecs := []scheduler.JobSpec{ + stoppedWithoutStoppingTimeSpec, + } + if cfg.ControllerSchedule != "" { // we do not run the controller if there is no schedule defined. schedule, err := time.ParseDuration(cfg.ControllerSchedule) @@ -112,13 +125,16 @@ func Start(cfg Config) error { return fmt.Errorf("failed to setup ledger trigger job: %w", err) } - sched := scheduler.New(jobSpec) - sched.Start() - defer sched.Stop() + schedulerJobSpecs = append(schedulerJobSpecs, jobSpec) + } else { log.Info("No controller schedule specified, controller will be disabled.") } + sched := scheduler.New(schedulerJobSpecs...) + sched.Start() + defer sched.Stop() + err = registerGRPCServices(srv, conn, stripeClient, pricer) if err != nil { return fmt.Errorf("failed to register gRPC services: %w", err)