Automatically record heap profiles in testplans #147

Merged (4 commits, Feb 12, 2021)
Changes from 3 commits
3 changes: 3 additions & 0 deletions .circleci/config.yml
@@ -47,6 +47,9 @@ jobs:
- run:
name: "trigger graphsync testplan on taas"
command: ~/testground-cli run composition -f $HOME/testground/plans/graphsync/stress-k8s.toml --metadata-commit=$CIRCLE_SHA1 --metadata-repo=ipfs/go-graphsync --metadata-branch=$CIRCLE_BRANCH
- run:
name: "trigger graphsync memory stress on taas"
command: ~/testground-cli run composition -f $HOME/testground/plans/graphsync/memory-stress-k8s.toml --metadata-commit=$CIRCLE_SHA1 --metadata-repo=ipfs/go-graphsync --metadata-branch=$CIRCLE_BRANCH

workflows:
version: 2
18 changes: 10 additions & 8 deletions testplans/graphsync/go.mod
@@ -3,31 +3,33 @@ module github.com/ipfs/go-graphsync/testplans/graphsync
go 1.14

require (
github.com/davidlazar/go-crypto v0.0.0-20190912175916-7055855a373f // indirect
github.com/dustin/go-humanize v1.0.0
github.com/hannahhoward/all-selector v0.1.0
github.com/hannahhoward/all-selector v0.2.0
github.com/ipfs/go-blockservice v0.1.3
github.com/ipfs/go-cid v0.0.6
github.com/ipfs/go-cid v0.0.7
github.com/ipfs/go-datastore v0.4.4
github.com/ipfs/go-graphsync v0.1.2
github.com/ipfs/go-ds-badger v0.2.3
github.com/ipfs/go-graphsync v0.6.0
github.com/ipfs/go-ipfs-blockstore v0.1.4
github.com/ipfs/go-ipfs-chunker v0.0.5
github.com/ipfs/go-ipfs-exchange-offline v0.0.1
github.com/ipfs/go-ipfs-files v0.0.8
github.com/ipfs/go-ipld-format v0.2.0
github.com/ipfs/go-merkledag v0.3.1
github.com/ipfs/go-unixfs v0.2.4
github.com/ipld/go-ipld-prime v0.4.0
github.com/ipld/go-ipld-prime v0.5.1-0.20201021195245-109253e8a018
github.com/kr/text v0.2.0 // indirect
github.com/libp2p/go-libp2p v0.10.0
github.com/libp2p/go-libp2p-core v0.6.0
github.com/libp2p/go-libp2p v0.12.0
github.com/libp2p/go-libp2p-core v0.7.0
github.com/libp2p/go-libp2p-noise v0.1.1
github.com/libp2p/go-libp2p-secio v0.2.2
github.com/libp2p/go-libp2p-tls v0.1.3
github.com/libp2p/go-sockaddr v0.1.0 // indirect
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
github.com/testground/sdk-go v0.2.7-0.20201112151952-8ee00c80c3ec
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208
golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae // indirect
google.golang.org/protobuf v1.25.0 // indirect
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect
gopkg.in/yaml.v2 v2.2.8 // indirect
)
)
290 changes: 136 additions & 154 deletions testplans/graphsync/go.sum

Large diffs are not rendered by default.

145 changes: 129 additions & 16 deletions testplans/graphsync/main.go
@@ -8,6 +8,7 @@ import (
"os"
"path/filepath"
goruntime "runtime"
"runtime/pprof"
"strings"
"time"

@@ -65,10 +66,10 @@ func (p networkParams) String() string {

func runStress(runenv *runtime.RunEnv, initCtx *run.InitContext) error {
var (
size = runenv.SizeParam("size")
concurrency = runenv.IntParam("concurrency")

networkParams = parseNetworkConfig(runenv)
size = runenv.SizeParam("size")
concurrency = runenv.IntParam("concurrency")
networkParams = parseNetworkConfig(runenv)
memorySnapshots = parseMemorySnapshotsParam(runenv)
)
runenv.RecordMessage("started test instance")
runenv.RecordMessage("network params: %v", networkParams)
@@ -90,6 +91,7 @@ func runStress(runenv *runtime.RunEnv, initCtx *run.InitContext) error {
storeutil.LoaderForBlockstore(bs),
storeutil.StorerForBlockstore(bs),
)
recorder = &runRecorder{memorySnapshots: memorySnapshots, runenv: runenv}
)

defer initCtx.SyncClient.MustSignalAndWait(ctx, "done", runenv.TestInstanceCount)
Expand All @@ -106,19 +108,24 @@ func runStress(runenv *runtime.RunEnv, initCtx *run.InitContext) error {
gsync.RegisterIncomingRequestHook(func(p peer.ID, request gs.RequestData, hookActions gs.IncomingRequestHookActions) {
hookActions.ValidateRequest()
})

return runProvider(ctx, runenv, initCtx, dagsrv, size, networkParams, concurrency)
gsync.RegisterBlockSentListener(func(p peer.ID, request gs.RequestData, block gs.BlockData) {
recorder.recordBlock()
})
return runProvider(ctx, runenv, initCtx, dagsrv, size, networkParams, concurrency, memorySnapshots, recorder)

case "requestors":
runenv.RecordMessage("we are the requestor")
defer runenv.RecordMessage("done requestor")
gsync.RegisterIncomingBlockHook(func(p peer.ID, request gs.ResponseData, block gs.BlockData, ha gs.IncomingBlockHookActions) {
recorder.recordBlock()
})

p := *peers[0]
if err := host.Connect(ctx, p); err != nil {
return err
}
runenv.RecordMessage("done dialling provider")
return runRequestor(ctx, runenv, initCtx, gsync, p, dagsrv, networkParams, concurrency, size)
return runRequestor(ctx, runenv, initCtx, gsync, p, dagsrv, networkParams, concurrency, size, memorySnapshots, recorder)

default:
panic(fmt.Sprintf("unsupported group ID: %s\n", runenv.TestGroupID))
@@ -158,7 +165,33 @@ func parseNetworkConfig(runenv *runtime.RunEnv) []networkParams {
return ret
}

func runRequestor(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitContext, gsync gs.GraphExchange, p peer.AddrInfo, dagsrv format.DAGService, networkParams []networkParams, concurrency int, size uint64) error {
type snapshotMode uint

const (
snapshotNone snapshotMode = iota
snapshotSimple
snapshotDetailed
)

const (
detailedSnapshotFrequency = 10
)

func parseMemorySnapshotsParam(runenv *runtime.RunEnv) snapshotMode {
memorySnapshotsString := runenv.StringParam("memory_snapshots")
switch memorySnapshotsString {
case "none":
return snapshotNone
case "simple":
return snapshotSimple
case "detailed":
return snapshotDetailed
default:
panic("invalid memory_snapshot parameter")
}
}

func runRequestor(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitContext, gsync gs.GraphExchange, p peer.AddrInfo, dagsrv format.DAGService, networkParams []networkParams, concurrency int, size uint64, memorySnapshots snapshotMode, recorder *runRecorder) error {
var (
cids []cid.Cid
// create a selector for the whole UnixFS dag
@@ -167,11 +200,14 @@ func runRequestor(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.Init

for round, np := range networkParams {
var (
topicCid = sync.NewTopic(fmt.Sprintf("cid-%d", round), []cid.Cid{})
stateNext = sync.State(fmt.Sprintf("next-%d", round))
stateNet = sync.State(fmt.Sprintf("network-configured-%d", round))
topicCid = sync.NewTopic(fmt.Sprintf("cid-%d", round), []cid.Cid{})
stateNext = sync.State(fmt.Sprintf("next-%d", round))
stateNet = sync.State(fmt.Sprintf("network-configured-%d", round))
stateFinish = sync.State(fmt.Sprintf("finish-%d", round))
)

recorder.beginRun(np, size, concurrency)

// wait for all instances to be ready for the next state.
initCtx.SyncClient.MustSignalAndWait(ctx, stateNext, runenv.TestInstanceCount)

@@ -206,7 +242,9 @@ func runRequestor(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.Init
runenv.RecordMessage("\t>>> requesting CID %s", c)

start := time.Now()
_, errCh := gsync.Request(grpctx, p.ID, clink, sel)
respCh, errCh := gsync.Request(grpctx, p.ID, clink, sel)
for range respCh {
}
for err := range errCh {
return err
}
@@ -233,23 +271,32 @@ func runRequestor(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.Init
if err := errgrp.Wait(); err != nil {
return err
}

// wait for all instances to finish running
initCtx.SyncClient.MustSignalAndWait(ctx, stateFinish, runenv.TestInstanceCount)

if memorySnapshots == snapshotSimple || memorySnapshots == snapshotDetailed {
recordSnapshots(runenv, size, np, concurrency, "total")
}
}

return nil
}

func runProvider(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitContext, dagsrv format.DAGService, size uint64, networkParams []networkParams, concurrency int) error {
func runProvider(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitContext, dagsrv format.DAGService, size uint64, networkParams []networkParams, concurrency int, memorySnapshots snapshotMode, recorder *runRecorder) error {
var (
cids []cid.Cid
bufferedDS = format.NewBufferedDAG(ctx, dagsrv)
)

for round, np := range networkParams {
var (
topicCid = sync.NewTopic(fmt.Sprintf("cid-%d", round), []cid.Cid{})
stateNext = sync.State(fmt.Sprintf("next-%d", round))
stateNet = sync.State(fmt.Sprintf("network-configured-%d", round))
topicCid = sync.NewTopic(fmt.Sprintf("cid-%d", round), []cid.Cid{})
stateNext = sync.State(fmt.Sprintf("next-%d", round))
stateFinish = sync.State(fmt.Sprintf("finish-%d", round))
stateNet = sync.State(fmt.Sprintf("network-configured-%d", round))
)
recorder.beginRun(np, size, concurrency)

// wait for all instances to be ready for the next state.
initCtx.SyncClient.MustSignalAndWait(ctx, stateNext, runenv.TestInstanceCount)
@@ -314,6 +361,14 @@ func runProvider(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitC
CallbackTarget: 1,
})
runenv.RecordMessage("\tnetwork configured for round %d", round)

// wait for all instances to finish running
initCtx.SyncClient.MustSignalAndWait(ctx, stateFinish, runenv.TestInstanceCount)

if memorySnapshots == snapshotSimple || memorySnapshots == snapshotDetailed {
recordSnapshots(runenv, size, np, concurrency, "total")
}

}

return nil
@@ -414,3 +469,61 @@ func createDatastore(diskStore bool) ds.Datastore {

return datastore
}

func recordSnapshots(runenv *runtime.RunEnv, size uint64, np networkParams, concurrency int, postfix string) error {
runenv.RecordMessage("Recording heap profile...")
err := writeHeap(runenv, size, np, concurrency, fmt.Sprintf("%s-pre-gc", postfix))
if err != nil {
return err
}
goruntime.GC()
goruntime.GC()
err = writeHeap(runenv, size, np, concurrency, fmt.Sprintf("%s-post-gc", postfix))
if err != nil {
return err
}
return nil
}

func writeHeap(runenv *runtime.RunEnv, size uint64, np networkParams, concurrency int, postfix string) error {
snapshotName := fmt.Sprintf("heap_lat-%s_bw-%s_concurrency-%d_size-%s_%s", np.latency, humanize.IBytes(np.bandwidth), concurrency, humanize.Bytes(size), postfix)
snapshotName = strings.Replace(snapshotName, " ", "", -1)
snapshotFile, err := runenv.CreateRawAsset(snapshotName)
if err != nil {
return err
}
err = pprof.WriteHeapProfile(snapshotFile)
if err != nil {
return err
}
err = snapshotFile.Close()
if err != nil {
return err
}
return nil
}

type runRecorder struct {
memorySnapshots snapshotMode
index int
np networkParams
size uint64
concurrency int
runenv *runtime.RunEnv
}

func (rr *runRecorder) recordBlock() {
if rr.memorySnapshots == snapshotDetailed {
if rr.index%detailedSnapshotFrequency == 0 {
recordSnapshots(rr.runenv, rr.size, rr.np, rr.concurrency, fmt.Sprintf("incremental-%d", rr.index))
}
}
rr.index++
}

func (rr *runRecorder) beginRun(np networkParams, size uint64, concurrency int) {
rr.concurrency = concurrency
rr.np = np
rr.size = size
rr.index = 0
}
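
As a side note on the snapshot pattern added above: the sketch below is a minimal, standalone illustration (not part of this diff) of the same pre-GC/post-GC heap capture that recordSnapshots performs, writing to plain files instead of testground raw assets; the file names and error handling are illustrative only.

```go
package main

import (
	"fmt"
	"os"
	"runtime"
	"runtime/pprof"
)

// writeHeapProfile dumps the current heap profile to the named file.
// Note: the profile reflects allocations as of the most recently completed GC.
func writeHeapProfile(name string) error {
	f, err := os.Create(name)
	if err != nil {
		return err
	}
	defer f.Close()
	return pprof.WriteHeapProfile(f)
}

func main() {
	// Snapshot before forcing a collection: still includes unreclaimed garbage.
	if err := writeHeapProfile("heap-pre-gc.pprof"); err != nil {
		panic(err)
	}
	// Two explicit GC cycles, as in recordSnapshots above, so that finalizers
	// run and the objects they release are also collected.
	runtime.GC()
	runtime.GC()
	// Snapshot after collection: approximates the live heap only.
	if err := writeHeapProfile("heap-post-gc.pprof"); err != nil {
		panic(err)
	}
	fmt.Println("wrote heap-pre-gc.pprof and heap-post-gc.pprof")
}
```

The two files can then be diffed with go tool pprof, e.g. `go tool pprof -base heap-pre-gc.pprof heap-post-gc.pprof`, which is one way the `-pre-gc`/`-post-gc` assets recorded by this plan could be compared.
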
1 change: 1 addition & 0 deletions testplans/graphsync/manifest.toml
@@ -26,3 +26,4 @@ chunk_size = { type = "int", desc = "unixfs chunk size (power of 2)", default =
links_per_level = { type = "int", desc = "unixfs links per level", default = "1024" }
raw_leaves = { type = "bool", desc = "should unixfs leaves be left unwrapped", default = "true"}
disk_store = { type = "bool", desc = "should data be stored on disk (true) or memory (false)", default = "false"}
memory_snapshots = { type = "string", desc = "what kind of memory snapshots to take (none, simple, detailed)", default = "none" }
40 changes: 40 additions & 0 deletions testplans/graphsync/memory-stress-k8s.toml
@@ -0,0 +1,40 @@
[metadata]
name = "memory-stress"

[global]
plan = "graphsync"
case = "stress"
Review comment (Member):

Not sure how different this test plan is from the one at https://github.com/filecoin-project/lotus/tree/master/testplans/graphsync, but if they are different (and I believe they are), we should change the name of the case to something else.

The automated dashboards, such as https://ci.testground.ipfs.team/dashboard?task_id=c09omhl5p7a858f1470g, rely on the uniqueness of the name: if we have two testplans that do different things but share the same plan:case, the dashboards become meaningless.

Nothing urgent, just explaining in case we decide we want to run this on TaaS as well, periodically, which I think would be nice.
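
For illustration only (not part of this PR), a renamed composition header along the lines suggested above might look like the sketch below; the new case value is hypothetical and would also need a matching test case registered in the plan's manifest.toml and main.go.

```toml
[metadata]
name = "memory-stress"

[global]
plan = "graphsync"
case = "memory-stress"   # hypothetical distinct case name so dashboards stay unique
```
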

total_instances = 2
builder = "docker:go"
runner = "cluster:k8s"

[global.build_config]
push_registry=true
go_proxy_mode="remote"
go_proxy_url="http://localhost:8081"
registry_type="aws"

[global.run.test_params]
size = "512MB"
latencies = '["50ms"]'
bandwidths = '["32MiB"]'
concurrency = "1"
chunk_size = "18"
links_per_level = "1024"
raw_leaves = "false"
disk_store = "true"
memory_snapshots = "detailed"

[[groups]]
id = "providers"
instances = { count = 1 }
[groups.resources]
memory = "4096Mi"
Review comment (Member):
Running this test with the following params results in an OOMKilled error for the provider:

[global.run.test_params]
size      = "8GB"
latencies = '["20ms"]'
bandwidths = '["128MiB"]'


[[groups]]
id = "providers"
instances = { count = 1 }
[groups.resources]
memory = "2048Mi"
cpu = "1000m"

[[groups]]
id = "requestors"
instances = { count = 1 }
[groups.resources]
memory = "2048Mi"
cpu = "1000m"

Reply (Collaborator, PR author):
should be fixed now.

Review comment (Member):
I am still getting OOMKilled when I try to run the memory-stress-k8s.toml with size > memory for providers and requestors.

cpu = "1000m"

[[groups]]
id = "requestors"
instances = { count = 1 }
[groups.resources]
memory = "4096Mi"
cpu = "1000m"
28 changes: 28 additions & 0 deletions testplans/graphsync/memory-stress.toml
@@ -0,0 +1,28 @@
[metadata]
name = "memory-stress"

[global]
plan = "graphsync"
case = "stress"
total_instances = 2
builder = "docker:go"
runner = "local:docker"

[global.run.test_params]
size = "512MB"
latencies = '["50ms"]'
bandwidths = '["32MiB"]'
concurrency = "1"
chunk_size = "18"
links_per_level = "1024"
raw_leaves = "false"
disk_store = "true"
memory_snapshots = "detailed"

[[groups]]
id = "providers"
instances = { count = 1 }

[[groups]]
id = "requestors"
instances = { count = 1 }
5 changes: 5 additions & 0 deletions testplans/graphsync/stress-k8s.toml
@@ -19,6 +19,11 @@ size = "10MB"
latencies = '["50ms", "100ms", "200ms"]'
bandwidths = '["32MiB", "16MiB", "8MiB", "4MiB", "1MiB"]'
concurrency = "10"
chunk_size = "20"
links_per_level = "1024"
raw_leaves = "true"
disk_store = "true"
memory_snapshots = "simple"

[[groups]]
id = "providers"
1 change: 1 addition & 0 deletions testplans/graphsync/stress.toml
@@ -17,6 +17,7 @@ chunk_size = "20"
links_per_level = "1024"
raw_leaves = "true"
disk_store = "true"
memory_snapshots = "simple"

[[groups]]
id = "providers"
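
As an aside on consuming the heap profiles this plan records: the sketch below is not part of this diff and assumes the github.com/google/pprof/profile package is available. It totals the inuse_space samples of one recorded snapshot, which is a quick way to compare the pre-GC and post-GC assets without opening them interactively.

```go
package main

import (
	"fmt"
	"log"
	"os"

	"github.com/google/pprof/profile"
)

func main() {
	if len(os.Args) < 2 {
		log.Fatal("usage: heapsum <heap-profile-file>")
	}
	f, err := os.Open(os.Args[1]) // e.g. one of the heap_lat-..._pre-gc assets
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	p, err := profile.Parse(f)
	if err != nil {
		log.Fatal(err)
	}

	// Heap profiles carry alloc_objects/alloc_space/inuse_objects/inuse_space
	// sample types; find inuse_space and total it across all samples.
	idx := -1
	for i, st := range p.SampleType {
		if st.Type == "inuse_space" {
			idx = i
		}
	}
	if idx < 0 {
		log.Fatal("no inuse_space sample type in profile")
	}
	var total int64
	for _, s := range p.Sample {
		total += s.Value[idx]
	}
	fmt.Printf("inuse_space total: %d bytes\n", total)
}
```
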