Skip to content
This repository was archived by the owner on Jan 28, 2021. It is now read-only.

Format SHOW PROCESSLIST progress as a tree #861

Merged
merged 1 commit into from
Oct 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions sql/analyzer/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,21 +40,21 @@ func trackProcess(ctx *sql.Context, a *Analyzer, n sql.Node) (sql.Node, error) {
}
total = count
}
processList.AddProgressItem(ctx.Pid(), name, total)
processList.AddTableProgress(ctx.Pid(), name, total)

seen[name] = struct{}{}

onPartitionDone := func(partitionName string) {
processList.UpdateProgress(ctx.Pid(), name, 1)
processList.RemoveProgressItem(ctx.Pid(), partitionName)
processList.UpdateTableProgress(ctx.Pid(), name, 1)
processList.RemovePartitionProgress(ctx.Pid(), name, partitionName)
}

onPartitionStart := func(partitionName string) {
processList.AddProgressItem(ctx.Pid(), partitionName, -1)
processList.AddPartitionProgress(ctx.Pid(), name, partitionName, -1)
}

onRowNext := func(partitionName string) {
processList.UpdateProgress(ctx.Pid(), partitionName, 1)
processList.UpdatePartitionProgress(ctx.Pid(), name, partitionName, 1)
}

var t sql.Table
Expand Down
14 changes: 10 additions & 4 deletions sql/analyzer/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,16 @@ func TestTrackProcess(t *testing.T) {
require.Len(processes, 1)
require.Equal("SELECT foo", processes[0].Query)
require.Equal(sql.QueryProcess, processes[0].Type)
require.Equal(map[string]sql.Progress{
"foo": sql.Progress{Total: 2},
"bar": sql.Progress{Total: 4},
}, processes[0].Progress)
require.Equal(
map[string]sql.TableProgress{
"foo": sql.TableProgress{
Progress: sql.Progress{Name: "foo", Done: 0, Total: 2},
PartitionsProgress: map[string]sql.PartitionProgress{}},
"bar": sql.TableProgress{
Progress: sql.Progress{Name: "bar", Done: 0, Total: 4},
PartitionsProgress: map[string]sql.PartitionProgress{}},
},
processes[0].Progress)

proc, ok := result.(*plan.QueryProcess)
require.True(ok)
Expand Down
26 changes: 20 additions & 6 deletions sql/plan/processlist.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package plan

import (
"fmt"
"sort"
"strings"

Expand Down Expand Up @@ -77,21 +76,36 @@ func (p *ShowProcessList) RowIter(ctx *sql.Context) (sql.RowIter, error) {

for i, proc := range processes {
var status []string
for name, progress := range proc.Progress {
status = append(status, fmt.Sprintf("%s(%s)", name, progress))
var names []string
for name := range proc.Progress {
names = append(names, name)
}
sort.Strings(names)

for _, name := range names {
progress := proc.Progress[name]

printer := sql.NewTreePrinter()
_ = printer.WriteNode("\n" + progress.String())
children := []string{}
for _, partitionProgress := range progress.PartitionsProgress {
children = append(children, partitionProgress.String())
}
sort.Strings(children)
_ = printer.WriteChildren(children...)

status = append(status, printer.String())
}

if len(status) == 0 {
status = []string{"running"}
}

sort.Strings(status)

rows[i] = process{
id: int64(proc.Connection),
user: proc.User,
time: int64(proc.Seconds()),
state: strings.Join(status, ", "),
state: strings.Join(status, ""),
command: proc.Type.String(),
host: ctx.Session.Client().Address,
info: proc.Query,
Expand Down
27 changes: 18 additions & 9 deletions sql/plan/processlist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,21 @@ func TestShowProcessList(t *testing.T) {
ctx, err := p.AddProcess(ctx, sql.QueryProcess, "SELECT foo")
require.NoError(err)

p.AddProgressItem(ctx.Pid(), "a", 5)
p.AddProgressItem(ctx.Pid(), "b", 6)
p.AddTableProgress(ctx.Pid(), "a", 5)
p.AddTableProgress(ctx.Pid(), "b", 6)

ctx = sql.NewContext(context.Background(), sql.WithPid(2), sql.WithSession(sess))
ctx, err = p.AddProcess(ctx, sql.CreateIndexProcess, "SELECT bar")
require.NoError(err)

p.AddProgressItem(ctx.Pid(), "foo", 2)
p.AddTableProgress(ctx.Pid(), "foo", 2)

p.UpdateProgress(1, "a", 3)
p.UpdateProgress(1, "a", 1)
p.UpdateProgress(1, "b", 2)
p.UpdateProgress(2, "foo", 1)
p.UpdateTableProgress(1, "a", 3)
p.UpdateTableProgress(1, "a", 1)
p.UpdatePartitionProgress(1, "a", "a-1", 7)
p.UpdatePartitionProgress(1, "a", "a-2", 9)
p.UpdateTableProgress(1, "b", 2)
p.UpdateTableProgress(2, "foo", 1)

n.ProcessList = p
n.Database = "foo"
Expand All @@ -44,8 +46,15 @@ func TestShowProcessList(t *testing.T) {
require.NoError(err)

expected := []sql.Row{
{int64(1), "foo", addr, "foo", "query", int64(0), "a(4/5), b(2/6)", "SELECT foo"},
{int64(1), "foo", addr, "foo", "create_index", int64(0), "foo(1/2)", "SELECT bar"},
{int64(1), "foo", addr, "foo", "query", int64(0),
`
a (4/5 partitions)
├─ a-1 (7/? rows)
└─ a-2 (9/? rows)

b (2/6 partitions)
`, "SELECT foo"},
{int64(1), "foo", addr, "foo", "create_index", int64(0), "\nfoo (1/2 partitions)\n", "SELECT bar"},
}

require.ElementsMatch(expected, rows)
Expand Down
124 changes: 111 additions & 13 deletions sql/processlist.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,46 @@ import (

// Progress between done items and total items.
type Progress struct {
Name string
Done int64
Total int64
}

func (p Progress) String() string {
func (p Progress) totalString() string {
var total = "?"
if p.Total > 0 {
total = fmt.Sprint(p.Total)
}
return total
}

// TableProgress keeps track of a table progress, and for each of its partitions
type TableProgress struct {
Progress
PartitionsProgress map[string]PartitionProgress
}

func NewTableProgress(name string, total int64) TableProgress {
return TableProgress{
Progress: Progress{
Name: name,
Total: total,
},
PartitionsProgress: make(map[string]PartitionProgress),
}
}

func (p TableProgress) String() string {
return fmt.Sprintf("%s (%d/%s partitions)", p.Name, p.Done, p.totalString())
}

return fmt.Sprintf("%d/%s", p.Done, total)
// PartitionProgress keeps track of a partition progress
type PartitionProgress struct {
Progress
}

func (p PartitionProgress) String() string {
return fmt.Sprintf("%s (%d/%s rows)", p.Name, p.Done, p.totalString())
}

// ProcessType is the type of process.
Expand Down Expand Up @@ -53,7 +82,7 @@ type Process struct {
User string
Type ProcessType
Query string
Progress map[string]Progress
Progress map[string]TableProgress
StartedAt time.Time
Kill context.CancelFunc
}
Expand Down Expand Up @@ -108,7 +137,7 @@ func (pl *ProcessList) AddProcess(
Connection: ctx.ID(),
Type: typ,
Query: query,
Progress: make(map[string]Progress),
Progress: make(map[string]TableProgress),
User: ctx.Session.Client().User,
StartedAt: time.Now(),
Kill: cancel,
Expand All @@ -117,9 +146,9 @@ func (pl *ProcessList) AddProcess(
return ctx, nil
}

// UpdateProgress updates the progress of the item with the given name for the
// UpdateTableProgress updates the progress of the table with the given name for the
// process with the given pid.
func (pl *ProcessList) UpdateProgress(pid uint64, name string, delta int64) {
func (pl *ProcessList) UpdateTableProgress(pid uint64, name string, delta int64) {
pl.mu.Lock()
defer pl.mu.Unlock()

Expand All @@ -130,16 +159,41 @@ func (pl *ProcessList) UpdateProgress(pid uint64, name string, delta int64) {

progress, ok := p.Progress[name]
if !ok {
progress = Progress{Total: -1}
progress = NewTableProgress(name, -1)
}

progress.Done += delta
p.Progress[name] = progress
}

// AddProgressItem adds a new item to track progress from to the process with
// UpdatePartitionProgress updates the progress of the table partition with the
// given name for the process with the given pid.
func (pl *ProcessList) UpdatePartitionProgress(pid uint64, tableName, partitionName string, delta int64) {
pl.mu.Lock()
defer pl.mu.Unlock()

p, ok := pl.procs[pid]
if !ok {
return
}

tablePg, ok := p.Progress[tableName]
if !ok {
return
}

partitionPg, ok := tablePg.PartitionsProgress[partitionName]
if !ok {
partitionPg = PartitionProgress{Progress: Progress{Name: partitionName, Total: -1}}
}

partitionPg.Done += delta
tablePg.PartitionsProgress[partitionName] = partitionPg
}

// AddTableProgress adds a new item to track progress from to the process with
// the given pid. If the pid does not exist, it will do nothing.
func (pl *ProcessList) AddProgressItem(pid uint64, name string, total int64) {
func (pl *ProcessList) AddTableProgress(pid uint64, name string, total int64) {
pl.mu.Lock()
defer pl.mu.Unlock()

Expand All @@ -152,13 +206,38 @@ func (pl *ProcessList) AddProgressItem(pid uint64, name string, total int64) {
pg.Total = total
p.Progress[name] = pg
} else {
p.Progress[name] = Progress{Total: total}
p.Progress[name] = NewTableProgress(name, total)
}
}

// RemoveProgressItem removes an existing item tracking progress from the
// AddPartitionProgress adds a new item to track progress from to the process with
// the given pid. If the pid or the table does not exist, it will do nothing.
func (pl *ProcessList) AddPartitionProgress(pid uint64, tableName, partitionName string, total int64) {
pl.mu.Lock()
defer pl.mu.Unlock()

p, ok := pl.procs[pid]
if !ok {
return
}

tablePg, ok := p.Progress[tableName]
if !ok {
return
}

if pg, ok := tablePg.PartitionsProgress[partitionName]; ok {
pg.Total = total
tablePg.PartitionsProgress[partitionName] = pg
} else {
tablePg.PartitionsProgress[partitionName] =
PartitionProgress{Progress: Progress{Name: partitionName, Total: total}}
}
}

// RemoveTableProgress removes an existing item tracking progress from the
// process with the given pid, if it exists.
func (pl *ProcessList) RemoveProgressItem(pid uint64, name string) {
func (pl *ProcessList) RemoveTableProgress(pid uint64, name string) {
pl.mu.Lock()
defer pl.mu.Unlock()

Expand All @@ -170,6 +249,25 @@ func (pl *ProcessList) RemoveProgressItem(pid uint64, name string) {
delete(p.Progress, name)
}

// RemovePartitionProgress removes an existing item tracking progress from the
// process with the given pid, if it exists.
func (pl *ProcessList) RemovePartitionProgress(pid uint64, tableName, partitionName string) {
pl.mu.Lock()
defer pl.mu.Unlock()

p, ok := pl.procs[pid]
if !ok {
return
}

tablePg, ok := p.Progress[tableName]
if !ok {
return
}

delete(tablePg.PartitionsProgress, partitionName)
}

// Kill terminates all queries for a given connection id.
func (pl *ProcessList) Kill(connID uint32) {
pl.mu.Lock()
Expand Down Expand Up @@ -220,7 +318,7 @@ func (pl *ProcessList) Processes() []Process {

for _, proc := range pl.procs {
p := *proc
var progress = make(map[string]Progress, len(p.Progress))
var progress = make(map[string]TableProgress, len(p.Progress))
for n, p := range p.Progress {
progress[n] = p
}
Expand Down
Loading