From e81f029b62114afaf9b2288d7cd30acabb0c32e5 Mon Sep 17 00:00:00 2001 From: Carlos Date: Tue, 29 Oct 2019 12:57:53 +0000 Subject: [PATCH] Format SHOW PROCESSLIST progress as a tree Signed-off-by: Carlos --- sql/analyzer/process.go | 10 +-- sql/analyzer/process_test.go | 14 ++-- sql/plan/processlist.go | 26 ++++++-- sql/plan/processlist_test.go | 27 +++++--- sql/processlist.go | 124 +++++++++++++++++++++++++++++++---- sql/processlist_test.go | 37 ++++++++--- 6 files changed, 191 insertions(+), 47 deletions(-) diff --git a/sql/analyzer/process.go b/sql/analyzer/process.go index 09fbae4e0..aa0d9d0c7 100644 --- a/sql/analyzer/process.go +++ b/sql/analyzer/process.go @@ -40,21 +40,21 @@ func trackProcess(ctx *sql.Context, a *Analyzer, n sql.Node) (sql.Node, error) { } total = count } - processList.AddProgressItem(ctx.Pid(), name, total) + processList.AddTableProgress(ctx.Pid(), name, total) seen[name] = struct{}{} onPartitionDone := func(partitionName string) { - processList.UpdateProgress(ctx.Pid(), name, 1) - processList.RemoveProgressItem(ctx.Pid(), partitionName) + processList.UpdateTableProgress(ctx.Pid(), name, 1) + processList.RemovePartitionProgress(ctx.Pid(), name, partitionName) } onPartitionStart := func(partitionName string) { - processList.AddProgressItem(ctx.Pid(), partitionName, -1) + processList.AddPartitionProgress(ctx.Pid(), name, partitionName, -1) } onRowNext := func(partitionName string) { - processList.UpdateProgress(ctx.Pid(), partitionName, 1) + processList.UpdatePartitionProgress(ctx.Pid(), name, partitionName, 1) } var t sql.Table diff --git a/sql/analyzer/process_test.go b/sql/analyzer/process_test.go index 32a719500..91271af4c 100644 --- a/sql/analyzer/process_test.go +++ b/sql/analyzer/process_test.go @@ -35,10 +35,16 @@ func TestTrackProcess(t *testing.T) { require.Len(processes, 1) require.Equal("SELECT foo", processes[0].Query) require.Equal(sql.QueryProcess, processes[0].Type) - require.Equal(map[string]sql.Progress{ - "foo": sql.Progress{Total: 2}, - "bar": sql.Progress{Total: 4}, - }, processes[0].Progress) + require.Equal( + map[string]sql.TableProgress{ + "foo": sql.TableProgress{ + Progress: sql.Progress{Name: "foo", Done: 0, Total: 2}, + PartitionsProgress: map[string]sql.PartitionProgress{}}, + "bar": sql.TableProgress{ + Progress: sql.Progress{Name: "bar", Done: 0, Total: 4}, + PartitionsProgress: map[string]sql.PartitionProgress{}}, + }, + processes[0].Progress) proc, ok := result.(*plan.QueryProcess) require.True(ok) diff --git a/sql/plan/processlist.go b/sql/plan/processlist.go index bc9f4d18c..a447fb79d 100644 --- a/sql/plan/processlist.go +++ b/sql/plan/processlist.go @@ -1,7 +1,6 @@ package plan import ( - "fmt" "sort" "strings" @@ -77,21 +76,36 @@ func (p *ShowProcessList) RowIter(ctx *sql.Context) (sql.RowIter, error) { for i, proc := range processes { var status []string - for name, progress := range proc.Progress { - status = append(status, fmt.Sprintf("%s(%s)", name, progress)) + var names []string + for name := range proc.Progress { + names = append(names, name) + } + sort.Strings(names) + + for _, name := range names { + progress := proc.Progress[name] + + printer := sql.NewTreePrinter() + _ = printer.WriteNode("\n" + progress.String()) + children := []string{} + for _, partitionProgress := range progress.PartitionsProgress { + children = append(children, partitionProgress.String()) + } + sort.Strings(children) + _ = printer.WriteChildren(children...) + + status = append(status, printer.String()) } if len(status) == 0 { status = []string{"running"} } - sort.Strings(status) - rows[i] = process{ id: int64(proc.Connection), user: proc.User, time: int64(proc.Seconds()), - state: strings.Join(status, ", "), + state: strings.Join(status, ""), command: proc.Type.String(), host: ctx.Session.Client().Address, info: proc.Query, diff --git a/sql/plan/processlist_test.go b/sql/plan/processlist_test.go index 401661ba2..12ee98b9a 100644 --- a/sql/plan/processlist_test.go +++ b/sql/plan/processlist_test.go @@ -21,19 +21,21 @@ func TestShowProcessList(t *testing.T) { ctx, err := p.AddProcess(ctx, sql.QueryProcess, "SELECT foo") require.NoError(err) - p.AddProgressItem(ctx.Pid(), "a", 5) - p.AddProgressItem(ctx.Pid(), "b", 6) + p.AddTableProgress(ctx.Pid(), "a", 5) + p.AddTableProgress(ctx.Pid(), "b", 6) ctx = sql.NewContext(context.Background(), sql.WithPid(2), sql.WithSession(sess)) ctx, err = p.AddProcess(ctx, sql.CreateIndexProcess, "SELECT bar") require.NoError(err) - p.AddProgressItem(ctx.Pid(), "foo", 2) + p.AddTableProgress(ctx.Pid(), "foo", 2) - p.UpdateProgress(1, "a", 3) - p.UpdateProgress(1, "a", 1) - p.UpdateProgress(1, "b", 2) - p.UpdateProgress(2, "foo", 1) + p.UpdateTableProgress(1, "a", 3) + p.UpdateTableProgress(1, "a", 1) + p.UpdatePartitionProgress(1, "a", "a-1", 7) + p.UpdatePartitionProgress(1, "a", "a-2", 9) + p.UpdateTableProgress(1, "b", 2) + p.UpdateTableProgress(2, "foo", 1) n.ProcessList = p n.Database = "foo" @@ -44,8 +46,15 @@ func TestShowProcessList(t *testing.T) { require.NoError(err) expected := []sql.Row{ - {int64(1), "foo", addr, "foo", "query", int64(0), "a(4/5), b(2/6)", "SELECT foo"}, - {int64(1), "foo", addr, "foo", "create_index", int64(0), "foo(1/2)", "SELECT bar"}, + {int64(1), "foo", addr, "foo", "query", int64(0), + ` +a (4/5 partitions) + ├─ a-1 (7/? rows) + └─ a-2 (9/? rows) + +b (2/6 partitions) +`, "SELECT foo"}, + {int64(1), "foo", addr, "foo", "create_index", int64(0), "\nfoo (1/2 partitions)\n", "SELECT bar"}, } require.ElementsMatch(expected, rows) diff --git a/sql/processlist.go b/sql/processlist.go index 580323238..5bc1fecf6 100644 --- a/sql/processlist.go +++ b/sql/processlist.go @@ -12,17 +12,46 @@ import ( // Progress between done items and total items. type Progress struct { + Name string Done int64 Total int64 } -func (p Progress) String() string { +func (p Progress) totalString() string { var total = "?" if p.Total > 0 { total = fmt.Sprint(p.Total) } + return total +} + +// TableProgress keeps track of a table progress, and for each of its partitions +type TableProgress struct { + Progress + PartitionsProgress map[string]PartitionProgress +} + +func NewTableProgress(name string, total int64) TableProgress { + return TableProgress{ + Progress: Progress{ + Name: name, + Total: total, + }, + PartitionsProgress: make(map[string]PartitionProgress), + } +} + +func (p TableProgress) String() string { + return fmt.Sprintf("%s (%d/%s partitions)", p.Name, p.Done, p.totalString()) +} - return fmt.Sprintf("%d/%s", p.Done, total) +// PartitionProgress keeps track of a partition progress +type PartitionProgress struct { + Progress +} + +func (p PartitionProgress) String() string { + return fmt.Sprintf("%s (%d/%s rows)", p.Name, p.Done, p.totalString()) } // ProcessType is the type of process. @@ -53,7 +82,7 @@ type Process struct { User string Type ProcessType Query string - Progress map[string]Progress + Progress map[string]TableProgress StartedAt time.Time Kill context.CancelFunc } @@ -108,7 +137,7 @@ func (pl *ProcessList) AddProcess( Connection: ctx.ID(), Type: typ, Query: query, - Progress: make(map[string]Progress), + Progress: make(map[string]TableProgress), User: ctx.Session.Client().User, StartedAt: time.Now(), Kill: cancel, @@ -117,9 +146,9 @@ func (pl *ProcessList) AddProcess( return ctx, nil } -// UpdateProgress updates the progress of the item with the given name for the +// UpdateTableProgress updates the progress of the table with the given name for the // process with the given pid. -func (pl *ProcessList) UpdateProgress(pid uint64, name string, delta int64) { +func (pl *ProcessList) UpdateTableProgress(pid uint64, name string, delta int64) { pl.mu.Lock() defer pl.mu.Unlock() @@ -130,16 +159,41 @@ func (pl *ProcessList) UpdateProgress(pid uint64, name string, delta int64) { progress, ok := p.Progress[name] if !ok { - progress = Progress{Total: -1} + progress = NewTableProgress(name, -1) } progress.Done += delta p.Progress[name] = progress } -// AddProgressItem adds a new item to track progress from to the process with +// UpdatePartitionProgress updates the progress of the table partition with the +// given name for the process with the given pid. +func (pl *ProcessList) UpdatePartitionProgress(pid uint64, tableName, partitionName string, delta int64) { + pl.mu.Lock() + defer pl.mu.Unlock() + + p, ok := pl.procs[pid] + if !ok { + return + } + + tablePg, ok := p.Progress[tableName] + if !ok { + return + } + + partitionPg, ok := tablePg.PartitionsProgress[partitionName] + if !ok { + partitionPg = PartitionProgress{Progress: Progress{Name: partitionName, Total: -1}} + } + + partitionPg.Done += delta + tablePg.PartitionsProgress[partitionName] = partitionPg +} + +// AddTableProgress adds a new item to track progress from to the process with // the given pid. If the pid does not exist, it will do nothing. -func (pl *ProcessList) AddProgressItem(pid uint64, name string, total int64) { +func (pl *ProcessList) AddTableProgress(pid uint64, name string, total int64) { pl.mu.Lock() defer pl.mu.Unlock() @@ -152,13 +206,38 @@ func (pl *ProcessList) AddProgressItem(pid uint64, name string, total int64) { pg.Total = total p.Progress[name] = pg } else { - p.Progress[name] = Progress{Total: total} + p.Progress[name] = NewTableProgress(name, total) } } -// RemoveProgressItem removes an existing item tracking progress from the +// AddPartitionProgress adds a new item to track progress from to the process with +// the given pid. If the pid or the table does not exist, it will do nothing. +func (pl *ProcessList) AddPartitionProgress(pid uint64, tableName, partitionName string, total int64) { + pl.mu.Lock() + defer pl.mu.Unlock() + + p, ok := pl.procs[pid] + if !ok { + return + } + + tablePg, ok := p.Progress[tableName] + if !ok { + return + } + + if pg, ok := tablePg.PartitionsProgress[partitionName]; ok { + pg.Total = total + tablePg.PartitionsProgress[partitionName] = pg + } else { + tablePg.PartitionsProgress[partitionName] = + PartitionProgress{Progress: Progress{Name: partitionName, Total: total}} + } +} + +// RemoveTableProgress removes an existing item tracking progress from the // process with the given pid, if it exists. -func (pl *ProcessList) RemoveProgressItem(pid uint64, name string) { +func (pl *ProcessList) RemoveTableProgress(pid uint64, name string) { pl.mu.Lock() defer pl.mu.Unlock() @@ -170,6 +249,25 @@ func (pl *ProcessList) RemoveProgressItem(pid uint64, name string) { delete(p.Progress, name) } +// RemovePartitionProgress removes an existing item tracking progress from the +// process with the given pid, if it exists. +func (pl *ProcessList) RemovePartitionProgress(pid uint64, tableName, partitionName string) { + pl.mu.Lock() + defer pl.mu.Unlock() + + p, ok := pl.procs[pid] + if !ok { + return + } + + tablePg, ok := p.Progress[tableName] + if !ok { + return + } + + delete(tablePg.PartitionsProgress, partitionName) +} + // Kill terminates all queries for a given connection id. func (pl *ProcessList) Kill(connID uint32) { pl.mu.Lock() @@ -220,7 +318,7 @@ func (pl *ProcessList) Processes() []Process { for _, proc := range pl.procs { p := *proc - var progress = make(map[string]Progress, len(p.Progress)) + var progress = make(map[string]TableProgress, len(p.Progress)) for n, p := range p.Progress { progress[n] = p } diff --git a/sql/processlist_test.go b/sql/processlist_test.go index a6fb3a44f..198f40b12 100644 --- a/sql/processlist_test.go +++ b/sql/processlist_test.go @@ -20,16 +20,16 @@ func TestProcessList(t *testing.T) { require.Equal(uint64(1), ctx.Pid()) require.Len(p.procs, 1) - p.AddProgressItem(ctx.Pid(), "a", 5) - p.AddProgressItem(ctx.Pid(), "b", 6) + p.AddTableProgress(ctx.Pid(), "a", 5) + p.AddTableProgress(ctx.Pid(), "b", 6) expectedProcess := &Process{ Pid: 1, Connection: 1, Type: QueryProcess, - Progress: map[string]Progress{ - "a": Progress{0, 5}, - "b": Progress{0, 6}, + Progress: map[string]TableProgress{ + "a": {Progress{Name: "a", Done: 0, Total: 5}, map[string]PartitionProgress{}}, + "b": {Progress{Name: "b", Done: 0, Total: 6}, map[string]PartitionProgress{}}, }, User: "foo", Query: "SELECT foo", @@ -39,19 +39,36 @@ func TestProcessList(t *testing.T) { p.procs[ctx.Pid()].Kill = nil require.Equal(expectedProcess, p.procs[ctx.Pid()]) + p.AddPartitionProgress(ctx.Pid(), "b", "b-1", -1) + p.AddPartitionProgress(ctx.Pid(), "b", "b-2", -1) + p.AddPartitionProgress(ctx.Pid(), "b", "b-3", -1) + + p.UpdatePartitionProgress(ctx.Pid(), "b", "b-2", 1) + + p.RemovePartitionProgress(ctx.Pid(), "b", "b-3") + + expectedProgress := map[string]TableProgress{ + "a": {Progress{Name: "a", Total: 5}, map[string]PartitionProgress{}}, + "b": {Progress{Name: "b", Total: 6}, map[string]PartitionProgress{ + "b-1": {Progress{Name: "b-1", Done: 0, Total: -1}}, + "b-2": {Progress{Name: "b-2", Done: 1, Total: -1}}, + }}, + } + require.Equal(expectedProgress, p.procs[ctx.Pid()].Progress) + ctx = NewContext(context.Background(), WithPid(2), WithSession(sess)) ctx, err = p.AddProcess(ctx, CreateIndexProcess, "SELECT bar") require.NoError(err) - p.AddProgressItem(ctx.Pid(), "foo", 2) + p.AddTableProgress(ctx.Pid(), "foo", 2) require.Equal(uint64(2), ctx.Pid()) require.Len(p.procs, 2) - p.UpdateProgress(1, "a", 3) - p.UpdateProgress(1, "a", 1) - p.UpdateProgress(1, "b", 2) - p.UpdateProgress(2, "foo", 1) + p.UpdateTableProgress(1, "a", 3) + p.UpdateTableProgress(1, "a", 1) + p.UpdateTableProgress(1, "b", 2) + p.UpdateTableProgress(2, "foo", 1) require.Equal(int64(4), p.procs[1].Progress["a"].Done) require.Equal(int64(2), p.procs[1].Progress["b"].Done)