Skip to content

[usage] Add periodic job to detect invalid workspace instances in usage #12930

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion components/usage/pkg/db/dbtest/workspace_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ func NewWorkspaceInstance(t *testing.T, instance db.WorkspaceInstance) db.Worksp
workspaceClass = instance.WorkspaceClass
}

phasePersisted := ""
if instance.PhasePersisted != "" {
phasePersisted = instance.PhasePersisted
}

return db.WorkspaceInstance{
ID: id,
WorkspaceID: workspaceID,
Expand All @@ -98,7 +103,7 @@ func NewWorkspaceInstance(t *testing.T, instance db.WorkspaceInstance) db.Worksp
StatusOld: sql.NullString{},
Status: status,
Phase: sql.NullString{},
PhasePersisted: "",
PhasePersisted: phasePersisted,
}
}

Expand Down
16 changes: 16 additions & 0 deletions components/usage/pkg/db/workspace_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,3 +239,19 @@ func (i *WorkspaceInstanceForUsage) WorkspaceRuntimeSeconds(stopTimeIfInstanceIs

return int64(stop.Sub(start).Round(time.Second).Seconds())
}

func ListWorkspaceInstanceIDsWithPhaseStoppedButNoStoppingTime(ctx context.Context, conn *gorm.DB) ([]uuid.UUID, error) {
var ids []uuid.UUID
//var chunk []uuid.UUID

tx := conn.WithContext(ctx).
Table(fmt.Sprintf("%s as wsi", (&WorkspaceInstance{}).TableName())).
Joins(fmt.Sprintf("LEFT JOIN %s AS u ON wsi.id = u.id", (&Usage{}).TableName())).
Where("wsi.phasePersisted = ?", "stopped").
Where("wsi.stoppingTime = ''"). // empty
Pluck("wsi.id", &ids)
if tx.Error != nil {
return nil, fmt.Errorf("failed to list workspace instances with phase stopped but no stopping time: %w", tx.Error)
}
return ids, nil
}
28 changes: 28 additions & 0 deletions components/usage/pkg/db/workspace_instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,3 +244,31 @@ func TestWorkspaceInstanceForUsage_WorkspaceRuntimeSeconds(t *testing.T) {
})
}
}

func TestListWorkspaceInstanceIDsWithPhaseStoppedButNoStoppingTime(t *testing.T) {
dbconn := dbtest.ConnectForTests(t)
instances := dbtest.CreateWorkspaceInstances(t, dbconn,
// started but not stopped, should be ignored
dbtest.NewWorkspaceInstance(t, db.WorkspaceInstance{
StartedTime: db.NewVarcharTime(time.Now()),
}),
// stopped, but no stopping time, should be detected
dbtest.NewWorkspaceInstance(t, db.WorkspaceInstance{
StartedTime: db.NewVarcharTime(time.Now()),
PhasePersisted: "stopped",
}),
)

dbtest.CreateUsageRecords(t, dbconn,
dbtest.NewUsage(t, db.Usage{
ID: instances[0].ID,
}),
dbtest.NewUsage(t, db.Usage{
ID: instances[1].ID,
}),
)

detectedIDs, err := db.ListWorkspaceInstanceIDsWithPhaseStoppedButNoStoppingTime(context.Background(), dbconn)
require.NoError(t, err)
require.Equal(t, []uuid.UUID{instances[1].ID}, detectedIDs)
}
100 changes: 76 additions & 24 deletions components/usage/pkg/scheduler/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,61 +9,54 @@ import (
"fmt"
"github.com/gitpod-io/gitpod/common-go/log"
v1 "github.com/gitpod-io/gitpod/usage-api/v1"
"github.com/gitpod-io/gitpod/usage/pkg/db"
"github.com/robfig/cron"
"google.golang.org/protobuf/types/known/timestamppb"
"gorm.io/gorm"
"time"
)

type Job interface {
Run() error
}

func NewLedgerTriggerJobSpec(schedule time.Duration, job Job) (JobSpec, error) {
parsed, err := cron.Parse(fmt.Sprintf("@every %s", schedule.String()))
type JobFunc func() error

func (f JobFunc) Run() error {
return f()
}

func NewPeriodicJobSpec(period time.Duration, id string, job Job) (JobSpec, error) {
parsed, err := cron.Parse(fmt.Sprintf("@every %s", period.String()))
if err != nil {
return JobSpec{}, fmt.Errorf("failed to parse ledger job schedule: %w", err)
return JobSpec{}, fmt.Errorf("failed to parse period into schedule: %w", err)
}

return JobSpec{
Job: job,
ID: "ledger",
Job: WithoutConcurrentRun(job),
ID: id,
Schedule: parsed,
}, nil
}

func NewLedgerTriggerJobSpec(schedule time.Duration, job Job) (JobSpec, error) {
return NewPeriodicJobSpec(schedule, "ledger", WithoutConcurrentRun(job))
}

func NewLedgerTrigger(usageClient v1.UsageServiceClient, billingClient v1.BillingServiceClient) *LedgerJob {
return &LedgerJob{
usageClient: usageClient,
billingClient: billingClient,

running: make(chan struct{}, 1),
}
}

type LedgerJob struct {
usageClient v1.UsageServiceClient
billingClient v1.BillingServiceClient

running chan struct{}
}

func (r *LedgerJob) Run() (err error) {
ctx := context.Background()

select {
// attempt a write to signal we want to run
case r.running <- struct{}{}:
// we managed to write, there's no other job executing. Cases are not fall through so we continue executing our main logic.
defer func() {
// signal job completed
<-r.running
}()
default:
// we could not write, so another instance is already running. Skip current run.
log.Infof("Skipping ledger run, another run is already in progress.")
return nil
}

now := time.Now().UTC()
hourAgo := now.Add(-1 * time.Hour)

Expand All @@ -90,3 +83,62 @@ func (r *LedgerJob) Run() (err error) {

return nil
}

// WithoutConcurrentRun wraps a Job and ensures the job does not concurrently
func WithoutConcurrentRun(j Job) Job {
return &preventConcurrentInvocation{
job: j,
running: make(chan struct{}, 1),
}
}

type preventConcurrentInvocation struct {
job Job
running chan struct{}
}

func (r *preventConcurrentInvocation) Run() error {
select {
// attempt a write to signal we want to run
case r.running <- struct{}{}:
// we managed to write, there's no other job executing. Cases are not fall through so we continue executing our main logic.
defer func() {
// signal job completed
<-r.running
}()

err := r.job.Run()
return err
default:
// we could not write, so another instance is already running. Skip current run.
log.Infof("Job already running, skipping invocation.")
return nil
}
}

func NewStoppedWithoutStoppingTimeDetectorSpec(dbconn *gorm.DB) *StoppedWithoutStoppingTimeDetector {
return &StoppedWithoutStoppingTimeDetector{
dbconn: dbconn,
}
}

type StoppedWithoutStoppingTimeDetector struct {
dbconn *gorm.DB
}

func (r *StoppedWithoutStoppingTimeDetector) Run() error {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
defer cancel()

log.Info("Checking for instances which are stopped but are missing a stoppingTime.")
instances, err := db.ListWorkspaceInstanceIDsWithPhaseStoppedButNoStoppingTime(ctx, r.dbconn)
if err != nil {
log.WithError(err).Errorf("Failed to list stop instances without stopping time.")
return fmt.Errorf("failed to list instances from db: %w", err)
}

log.Infof("Identified %d instances in stopped state without a stopping time.", len(instances))
stoppedWithoutStoppingTime.Set(float64(len(instances)))

return nil
}
59 changes: 17 additions & 42 deletions components/usage/pkg/scheduler/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,20 @@
package scheduler

import (
"context"
v1 "github.com/gitpod-io/gitpod/usage-api/v1"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"sync"
"sync/atomic"
"testing"
"time"
)

func TestLedgerJob_PreventsConcurrentInvocations(t *testing.T) {
client := &fakeUsageClient{}
job := NewLedgerTrigger(client, nil)
func TestPreventConcurrentInvocation(t *testing.T) {
callCount := int32(0)
job := WithoutConcurrentRun(JobFunc(func() error {
atomic.AddInt32(&callCount, 1)
time.Sleep(50 * time.Millisecond)
return nil
}))

invocations := 3
wg := sync.WaitGroup{}
Expand All @@ -32,42 +31,18 @@ func TestLedgerJob_PreventsConcurrentInvocations(t *testing.T) {
}
wg.Wait()

require.Equal(t, 1, int(client.ReconcileUsageWithLedgerCallCount))
require.Equal(t, int32(1), callCount)
}

func TestLedgerJob_CanRunRepeatedly(t *testing.T) {
client := &fakeUsageClient{}
job := NewLedgerTrigger(client, nil)
func TestPreventConcurrentInvocation_CanRunRepeatedly(t *testing.T) {
callCount := int32(0)
job := WithoutConcurrentRun(JobFunc(func() error {
atomic.AddInt32(&callCount, 1)
return nil
}))

_ = job.Run()
_ = job.Run()
require.NoError(t, job.Run())
require.NoError(t, job.Run())

require.Equal(t, 2, int(client.ReconcileUsageWithLedgerCallCount))
}

type fakeUsageClient struct {
ReconcileUsageWithLedgerCallCount int32
}

// GetCostCenter retrieves the active cost center for the given attributionID
func (c *fakeUsageClient) GetCostCenter(ctx context.Context, in *v1.GetCostCenterRequest, opts ...grpc.CallOption) (*v1.GetCostCenterResponse, error) {
return nil, status.Error(codes.Unauthenticated, "not implemented")
}

// SetCostCenter stores the given cost center
func (c *fakeUsageClient) SetCostCenter(ctx context.Context, in *v1.SetCostCenterRequest, opts ...grpc.CallOption) (*v1.SetCostCenterResponse, error) {
return nil, status.Error(codes.Unauthenticated, "not implemented")
}

// Triggers reconciliation of usage with ledger implementation.
func (c *fakeUsageClient) ReconcileUsage(ctx context.Context, in *v1.ReconcileUsageRequest, opts ...grpc.CallOption) (*v1.ReconcileUsageResponse, error) {
atomic.AddInt32(&c.ReconcileUsageWithLedgerCallCount, 1)
time.Sleep(50 * time.Millisecond)

return nil, status.Error(codes.Unauthenticated, "not implemented")
}

// ListUsage retrieves all usage for the specified attributionId and theb given time range
func (c *fakeUsageClient) ListUsage(ctx context.Context, in *v1.ListUsageRequest, opts ...grpc.CallOption) (*v1.ListUsageResponse, error) {
return nil, status.Error(codes.Unauthenticated, "not implemented")
require.Equal(t, int32(2), callCount)
}
8 changes: 8 additions & 0 deletions components/usage/pkg/scheduler/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,20 @@ var (
Help: "Histogram of job duration",
Buckets: prometheus.LinearBuckets(30, 30, 10), // every 30 secs, starting at 30secs
}, []string{"job", "outcome"})

stoppedWithoutStoppingTime = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "job_stopped_instances_without_stopping_time_count",
Help: "Gauge of usage records where workpsace instance is stopped but doesn't have a stopping time",
})
)

func RegisterMetrics(reg *prometheus.Registry) error {
metrics := []prometheus.Collector{
jobStartedSeconds,
jobCompletedSeconds,
stoppedWithoutStoppingTime,
}
for _, metric := range metrics {
err := reg.Register(metric)
Expand Down
6 changes: 0 additions & 6 deletions components/usage/pkg/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,3 @@ func TestScheduler(t *testing.T) {
require.True(t, firstRan)
require.True(t, secondRan)
}

type JobFunc func() error

func (f JobFunc) Run() error {
return f()
}
22 changes: 19 additions & 3 deletions components/usage/pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,19 @@ func Start(cfg Config) error {
stripeClient = c
}

stoppedWithoutStoppingTimeSpec, err := scheduler.NewPeriodicJobSpec(
15*time.Minute,
"stopped_without_stopping_time",
scheduler.WithoutConcurrentRun(scheduler.NewStoppedWithoutStoppingTimeDetectorSpec(conn)),
)
if err != nil {
return fmt.Errorf("failed to setup stopped without a stopping time detector: %w", err)
}

schedulerJobSpecs := []scheduler.JobSpec{
stoppedWithoutStoppingTimeSpec,
}

if cfg.ControllerSchedule != "" {
// we do not run the controller if there is no schedule defined.
schedule, err := time.ParseDuration(cfg.ControllerSchedule)
Expand All @@ -112,13 +125,16 @@ func Start(cfg Config) error {
return fmt.Errorf("failed to setup ledger trigger job: %w", err)
}

sched := scheduler.New(jobSpec)
sched.Start()
defer sched.Stop()
schedulerJobSpecs = append(schedulerJobSpecs, jobSpec)

} else {
log.Info("No controller schedule specified, controller will be disabled.")
}

sched := scheduler.New(schedulerJobSpecs...)
sched.Start()
defer sched.Stop()

err = registerGRPCServices(srv, conn, stripeClient, pricer)
if err != nil {
return fmt.Errorf("failed to register gRPC services: %w", err)
Expand Down