Skip to content

feat: Implement RandomQueue scheduler strategy #1914

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Oct 3, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 12 additions & 8 deletions scheduler/metrics.go → scheduler/metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package scheduler
package metrics

import (
"context"
Expand All @@ -12,6 +12,10 @@ import (
"go.opentelemetry.io/otel/metric"
)

const (
OtelName = "io.cloudquery"
)

// Metrics is deprecated as we move toward open telemetry for tracing and metrics
type Metrics struct {
TableClient map[string]map[string]*TableClientMetrics
Expand Down Expand Up @@ -82,39 +86,39 @@ func (s *Metrics) Equal(other *Metrics) bool {
}

func getOtelMeters(tableName string, clientID string) *OtelMeters {
resources, err := otel.Meter(otelName).Int64Counter("sync.table.resources",
resources, err := otel.Meter(OtelName).Int64Counter("sync.table.resources",
metric.WithDescription("Number of resources synced for a table"),
metric.WithUnit("/{tot}"),
)
if err != nil {
return nil
}

errors, err := otel.Meter(otelName).Int64Counter("sync.table.errors",
errors, err := otel.Meter(OtelName).Int64Counter("sync.table.errors",
metric.WithDescription("Number of errors encountered while syncing a table"),
metric.WithUnit("/{tot}"),
)
if err != nil {
return nil
}

panics, err := otel.Meter(otelName).Int64Counter("sync.table.panics",
panics, err := otel.Meter(OtelName).Int64Counter("sync.table.panics",
metric.WithDescription("Number of panics encountered while syncing a table"),
metric.WithUnit("/{tot}"),
)
if err != nil {
return nil
}

startTime, err := otel.Meter(otelName).Int64Counter("sync.table.start_time",
startTime, err := otel.Meter(OtelName).Int64Counter("sync.table.start_time",
metric.WithDescription("Start time of syncing a table"),
metric.WithUnit("ns"),
)
if err != nil {
return nil
}

endTime, err := otel.Meter(otelName).Int64Counter("sync.table.end_time",
endTime, err := otel.Meter(OtelName).Int64Counter("sync.table.end_time",
metric.WithDescription("End time of syncing a table"),
metric.WithUnit("ns"),
)
Expand All @@ -136,7 +140,7 @@ func getOtelMeters(tableName string, clientID string) *OtelMeters {
}
}

func (s *Metrics) initWithClients(table *schema.Table, clients []schema.ClientMeta) {
func (s *Metrics) InitWithClients(table *schema.Table, clients []schema.ClientMeta) {
s.TableClient[table.Name] = make(map[string]*TableClientMetrics, len(clients))
for _, client := range clients {
tableName := table.Name
Expand All @@ -146,7 +150,7 @@ func (s *Metrics) initWithClients(table *schema.Table, clients []schema.ClientMe
}
}
for _, relation := range table.Relations {
s.initWithClients(relation, clients)
s.InitWithClients(relation, clients)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package scheduler
package metrics

import "testing"

Expand Down
67 changes: 67 additions & 0 deletions scheduler/queue/active_work_signal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package queue

import (
"sync"
"sync/atomic"
)

// activeWorkSignal is a thread-safe coordinator for awaiting a worker pool
// that relies on a queue that might be temporarily empty.
//
// If queue is empty and workers idle, done!
//
// If the queue is empty but a worker is working on a task, we must wait and check
// if it's empty after that worker finishes. That's why we need this.
//
// Use it like this:
//
// - When a worker picks up a task, call `Add()` (like a WaitGroup)
// - When a worker finishes a task, call `Done()` (like a WaitGroup)
//
// - If the queue is empty, check `IsIdle()` to check if no workers are active.
// - If workers are still active, call `Wait()` to block until state changes.
type activeWorkSignal struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice 🚀 Much better than what I did before 🙈

countChangeSignal *sync.Cond
activeWorkUnitCount *atomic.Int32
isStarted *atomic.Bool
}

func newActiveWorkSignal() *activeWorkSignal {
return &activeWorkSignal{
countChangeSignal: sync.NewCond(&sync.Mutex{}),
activeWorkUnitCount: &atomic.Int32{},
isStarted: &atomic.Bool{},
}
}

// Add means a worker has started working on a task.
//
// Wake up the work queuing goroutine.
func (s *activeWorkSignal) Add() {
s.activeWorkUnitCount.Add(1)
s.isStarted.Store(true)
s.countChangeSignal.Signal()
}

// Done means a worker has finished working on a task.
//
// If the count became zero, wake up the work queuing goroutine (might have finished).
func (s *activeWorkSignal) Done() {
s.activeWorkUnitCount.Add(-1)
s.countChangeSignal.Signal()
}

// IsIdle returns true if no workers are active. If queue is empty and workers idle, done!
func (s *activeWorkSignal) IsIdle() bool {
return s.isStarted.Load() && s.activeWorkUnitCount.Load() <= 0
}

// Wait blocks until the count of active workers changes.
func (s *activeWorkSignal) Wait() {
if s.activeWorkUnitCount.Load() <= 0 {
return
}
s.countChangeSignal.L.Lock()
defer s.countChangeSignal.L.Unlock()
s.countChangeSignal.Wait()
}
41 changes: 41 additions & 0 deletions scheduler/queue/concurrent_random_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package queue

import (
"math/rand"
"sync"
)

// ConcurrentRandomQueue is a generic, thread-safe queue
// that pops random elements in O(1) time.
type ConcurrentRandomQueue[T any] struct {
mu sync.Mutex
queue []T
random *rand.Rand
}

func NewConcurrentRandomQueue[T any](seed int64, capacityHint int) *ConcurrentRandomQueue[T] {
return &ConcurrentRandomQueue[T]{queue: make([]T, 0, capacityHint), random: rand.New(rand.NewSource(seed))}
}

func (q *ConcurrentRandomQueue[T]) Push(item T) {
q.mu.Lock()
defer q.mu.Unlock()

q.queue = append(q.queue, item)
}

func (q *ConcurrentRandomQueue[T]) Pop() *T {
q.mu.Lock()
defer q.mu.Unlock()

if len(q.queue) == 0 {
return nil
}
idx := q.random.Intn(len(q.queue))
lastIdx := len(q.queue) - 1
q.queue[idx], q.queue[lastIdx] = q.queue[lastIdx], q.queue[idx]
item := q.queue[lastIdx]
q.queue = q.queue[:lastIdx]

return &item
}
138 changes: 138 additions & 0 deletions scheduler/queue/scheduler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package queue

import (
"context"

"github.com/cloudquery/plugin-sdk/v4/caser"
"github.com/cloudquery/plugin-sdk/v4/scheduler/metrics"
"github.com/cloudquery/plugin-sdk/v4/schema"
"github.com/google/uuid"
"github.com/rs/zerolog"
"golang.org/x/sync/errgroup"
)

const DefaultWorkerCount = 1000

// WorkUnit is an atomic unit of work that the scheduler syncs.
//
// It is one table resolver (same as all other scheduler strategies).
//
// But if it is a non-top-level table, it is bound to a single parent resource.
type WorkUnit struct {
Table *schema.Table
Client schema.ClientMeta
Parent *schema.Resource
}

type Scheduler struct {
workerCount int
logger zerolog.Logger
caser *caser.Caser
deterministicCQID bool
metrics *metrics.Metrics
invocationID string
seed int64
}

type Option func(*Scheduler)

func WithWorkerCount(workerCount int) Option {
return func(d *Scheduler) {
d.workerCount = workerCount
}
}

func WithCaser(c *caser.Caser) Option {
return func(d *Scheduler) {
d.caser = c
}
}

func WithDeterministicCQID(deterministicCQID bool) Option {
return func(d *Scheduler) {
d.deterministicCQID = deterministicCQID
}
}

func WithInvocationID(invocationID string) Option {
return func(d *Scheduler) {
d.invocationID = invocationID
}
}

func NewRandomQueueScheduler(logger zerolog.Logger, m *metrics.Metrics, seed int64, opts ...Option) *Scheduler {
scheduler := &Scheduler{
logger: logger,
metrics: m,
workerCount: DefaultWorkerCount,
caser: caser.New(),
invocationID: uuid.New().String(),
seed: seed,
}

for _, opt := range opts {
opt(scheduler)
}

return scheduler
}

func (d *Scheduler) Sync(ctx context.Context, tableClients []WorkUnit, resolvedResources chan<- *schema.Resource) {
if len(tableClients) == 0 {
return
}
queue := NewConcurrentRandomQueue[WorkUnit](d.seed, len(tableClients))
for _, tc := range tableClients {
queue.Push(tc)
}

jobs := make(chan *WorkUnit)
activeWorkSignal := newActiveWorkSignal()

// Worker pool
workerPool, _ := errgroup.WithContext(ctx)
for w := 0; w < d.workerCount; w++ {
workerPool.Go(func() error {
newWorker(
jobs,
queue,
resolvedResources,
d.logger,
d.caser,
d.invocationID,
d.deterministicCQID,
d.metrics,
).work(ctx, activeWorkSignal)
return nil
})
}

// Work distribution
go func() {
defer close(jobs)
for {
select {
case <-ctx.Done():
return
default:
item := queue.Pop()

// There is work to do
if item != nil {
jobs <- item
continue
}

// Queue is empty and no active work, done!
if activeWorkSignal.IsIdle() {
return
}

// Queue is empty and there is active work, wait for changes
activeWorkSignal.Wait()
}
}
}()

_ = workerPool.Wait()
}
Loading
Loading