Skip to content

Commit d373e93

Browse files
Merge pull request #3615 from ipfs/feat/metrics/perf
Add multiple performance metrics
2 parents 57e4e21 + acfaf9a commit d373e93

File tree

6 files changed

+61
-30
lines changed

6 files changed

+61
-30
lines changed

cmd/ipfs/daemon.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,6 @@ import (
2828
"gx/ipfs/QmX3QZ5jHEPidwUrymXV1iSCSUhdGxj15sm2gP4jKMef7B/client_golang/prometheus"
2929
util "gx/ipfs/QmZuY8aV7zbNXVy6DyN9SmnuH3o9nG852F4aTiSBpts8d1/go-ipfs-util"
3030
iconn "gx/ipfs/QmcYnysCkyGezY6k6MQ1yHHdrRiZaU9x3M9Y1tE9qZ5hD2/go-libp2p-interface-conn"
31-
32-
_ "gx/ipfs/QmV3NSS3A1kX5s28r7yLczhDsXzkgo65cqRgKFXYunWZmD/metrics/runtime"
3331
)
3432

3533
const (

exchange/bitswap/bitswap.go

Lines changed: 30 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
flags "github.com/ipfs/go-ipfs/flags"
2020
"github.com/ipfs/go-ipfs/thirdparty/delay"
2121

22+
metrics "gx/ipfs/QmRg1gKTHzc3CZXSKzem8aR4E3TubFhbgXwfVuWnSK5CC5/go-metrics-interface"
2223
process "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess"
2324
procctx "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess/context"
2425
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
@@ -47,6 +48,9 @@ var (
4748
HasBlockBufferSize = 256
4849
provideKeysBufferSize = 2048
4950
provideWorkerMax = 512
51+
52+
// the 1<<18+15 is to observe old file chunks that are 1<<18 + 14 in size
53+
metricsBuckets = []float64{1 << 6, 1 << 10, 1 << 14, 1 << 18, 1<<18 + 15, 1 << 22}
5054
)
5155

5256
func init() {
@@ -74,6 +78,11 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
7478
// shouldn't accept a context anymore. Clients should probably use Close()
7579
// exclusively. We should probably find another way to share logging data
7680
ctx, cancelFunc := context.WithCancel(parent)
81+
ctx = metrics.CtxSubScope(ctx, "bitswap")
82+
dupHist := metrics.NewCtx(ctx, "recv_dup_blocks_bytes", "Summary of duplicate"+
83+
" data blocks recived").Histogram(metricsBuckets)
84+
allHist := metrics.NewCtx(ctx, "recv_all_blocks_bytes", "Summary of all"+
85+
" data blocks recived").Histogram(metricsBuckets)
7786

7887
notif := notifications.New()
7988
px := process.WithTeardown(func() error {
@@ -91,6 +100,9 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
91100
newBlocks: make(chan *cid.Cid, HasBlockBufferSize),
92101
provideKeys: make(chan *cid.Cid, provideKeysBufferSize),
93102
wm: NewWantManager(ctx, network),
103+
104+
dupMetric: dupHist,
105+
allMetric: allHist,
94106
}
95107
go bs.wm.Run()
96108
network.SetDelegate(bs)
@@ -145,6 +157,10 @@ type Bitswap struct {
145157
blocksRecvd int
146158
dupBlocksRecvd int
147159
dupDataRecvd uint64
160+
161+
// Metrics interface metrics
162+
dupMetric metrics.Histogram
163+
allMetric metrics.Histogram
148164
}
149165

150166
type blockRequest struct {
@@ -352,9 +368,7 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
352368
go func(b blocks.Block) {
353369
defer wg.Done()
354370

355-
if err := bs.updateReceiveCounters(b); err != nil {
356-
return // ignore error, is either logged previously, or ErrAlreadyHaveBlock
357-
}
371+
bs.updateReceiveCounters(b)
358372

359373
k := b.Cid()
360374
log.Event(ctx, "Bitswap.GetBlockRequest.End", k)
@@ -370,24 +384,27 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
370384

371385
var ErrAlreadyHaveBlock = errors.New("already have block")
372386

373-
func (bs *Bitswap) updateReceiveCounters(b blocks.Block) error {
374-
bs.counterLk.Lock()
375-
defer bs.counterLk.Unlock()
376-
bs.blocksRecvd++
387+
func (bs *Bitswap) updateReceiveCounters(b blocks.Block) {
388+
blkLen := len(b.RawData())
377389
has, err := bs.blockstore.Has(b.Cid())
378390
if err != nil {
379391
log.Infof("blockstore.Has error: %s", err)
380-
return err
392+
return
381393
}
382-
if err == nil && has {
383-
bs.dupBlocksRecvd++
384-
bs.dupDataRecvd += uint64(len(b.RawData()))
394+
395+
bs.allMetric.Observe(float64(blkLen))
396+
if has {
397+
bs.dupMetric.Observe(float64(blkLen))
385398
}
386399

400+
bs.counterLk.Lock()
401+
defer bs.counterLk.Unlock()
402+
403+
bs.blocksRecvd++
387404
if has {
388-
return ErrAlreadyHaveBlock
405+
bs.dupBlocksRecvd++
406+
bs.dupDataRecvd += uint64(blkLen)
389407
}
390-
return nil
391408
}
392409

393410
// Connected/Disconnected warns bitswap about peer connections

exchange/bitswap/wantmanager.go

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ import (
99
bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message"
1010
bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network"
1111
wantlist "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
12+
13+
metrics "gx/ipfs/QmRg1gKTHzc3CZXSKzem8aR4E3TubFhbgXwfVuWnSK5CC5/go-metrics-interface"
1214
cid "gx/ipfs/QmV5gPoRsjN1Gid3LMdNZTyfCtP2DsvqEbMAmz82RmmiGk/go-cid"
1315
peer "gx/ipfs/QmZcUPvPhD1Xvk6mwijYF8AfR3mG31S1YsEfHG4khrFPRr/go-libp2p-peer"
1416
)
@@ -27,20 +29,29 @@ type WantManager struct {
2729
network bsnet.BitSwapNetwork
2830
ctx context.Context
2931
cancel func()
32+
33+
wantlistGauge metrics.Gauge
34+
sentHistogram metrics.Histogram
3035
}
3136

3237
func NewWantManager(ctx context.Context, network bsnet.BitSwapNetwork) *WantManager {
3338
ctx, cancel := context.WithCancel(ctx)
39+
wantlistGauge := metrics.NewCtx(ctx, "wanlist_total",
40+
"Number of items in wantlist.").Gauge()
41+
sentHistogram := metrics.NewCtx(ctx, "sent_all_blocks_bytes", "Histogram of blocks sent by"+
42+
" this bitswap").Histogram(metricsBuckets)
3443
return &WantManager{
35-
incoming: make(chan []*bsmsg.Entry, 10),
36-
connect: make(chan peer.ID, 10),
37-
disconnect: make(chan peer.ID, 10),
38-
peerReqs: make(chan chan []peer.ID),
39-
peers: make(map[peer.ID]*msgQueue),
40-
wl: wantlist.NewThreadSafe(),
41-
network: network,
42-
ctx: ctx,
43-
cancel: cancel,
44+
incoming: make(chan []*bsmsg.Entry, 10),
45+
connect: make(chan peer.ID, 10),
46+
disconnect: make(chan peer.ID, 10),
47+
peerReqs: make(chan chan []peer.ID),
48+
peers: make(map[peer.ID]*msgQueue),
49+
wl: wantlist.NewThreadSafe(),
50+
network: network,
51+
ctx: ctx,
52+
cancel: cancel,
53+
wantlistGauge: wantlistGauge,
54+
sentHistogram: sentHistogram,
4455
}
4556
}
4657

@@ -109,6 +120,8 @@ func (pm *WantManager) SendBlock(ctx context.Context, env *engine.Envelope) {
109120
// throughout the network stack
110121
defer env.Sent()
111122

123+
pm.sentHistogram.Observe(float64(len(env.Block.RawData())))
124+
112125
msg := bsmsg.New(false)
113126
msg.AddBlock(env.Block)
114127
log.Infof("Sending block %s to %s", env.Block, env.Peer)
@@ -282,10 +295,12 @@ func (pm *WantManager) Run() {
282295
for _, e := range entries {
283296
if e.Cancel {
284297
if pm.wl.Remove(e.Cid) {
298+
pm.wantlistGauge.Dec()
285299
filtered = append(filtered, e)
286300
}
287301
} else {
288302
if pm.wl.AddEntry(e.Entry) {
303+
pm.wantlistGauge.Inc()
289304
filtered = append(filtered, e)
290305
}
291306
}

package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -208,9 +208,9 @@
208208
},
209209
{
210210
"author": "whyrusleeping",
211-
"hash": "QmbUSMTQtK9GRrUbD4ngqJwSzHsquUc8nyDubRWp4vPybH",
211+
"hash": "QmNPv1yzXBqxzqjfTzHCeBoicxxZgHzLezdY2hMCZ3r6EU",
212212
"name": "go-ds-measure",
213-
"version": "1.1.0"
213+
"version": "1.2.0"
214214
},
215215
{
216216
"author": "whyrusleeping",

repo/fsrepo/defaultds.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,12 @@ import (
88
config "github.com/ipfs/go-ipfs/repo/config"
99
"github.com/ipfs/go-ipfs/thirdparty/dir"
1010

11+
measure "gx/ipfs/QmNPv1yzXBqxzqjfTzHCeBoicxxZgHzLezdY2hMCZ3r6EU/go-ds-measure"
1112
ds "gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore"
1213
mount "gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore/syncmount"
13-
"gx/ipfs/QmXZEfbEv9sXG9JnLoMNhREDMDgkq5Jd7uWJ7d77VJ4pxn/go-ds-flatfs"
14+
flatfs "gx/ipfs/QmXZEfbEv9sXG9JnLoMNhREDMDgkq5Jd7uWJ7d77VJ4pxn/go-ds-flatfs"
1415
levelds "gx/ipfs/QmaHHmfEozrrotyhyN44omJouyuEtx6ahddqV6W5yRaUSQ/go-ds-leveldb"
1516
ldbopts "gx/ipfs/QmbBhyDKsY4mbY6xsKt3qu9Y7FPvMJ6qbD8AMjYYvPRw1g/goleveldb/leveldb/opt"
16-
measure "gx/ipfs/QmbUSMTQtK9GRrUbD4ngqJwSzHsquUc8nyDubRWp4vPybH/go-ds-measure"
1717
)
1818

1919
const (

repo/fsrepo/fsrepo.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111
"strings"
1212
"sync"
1313

14-
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/mitchellh/go-homedir"
1514
keystore "github.com/ipfs/go-ipfs/keystore"
1615
repo "github.com/ipfs/go-ipfs/repo"
1716
"github.com/ipfs/go-ipfs/repo/common"
@@ -21,10 +20,12 @@ import (
2120
serialize "github.com/ipfs/go-ipfs/repo/fsrepo/serialize"
2221
dir "github.com/ipfs/go-ipfs/thirdparty/dir"
2322

23+
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/mitchellh/go-homedir"
24+
25+
measure "gx/ipfs/QmNPv1yzXBqxzqjfTzHCeBoicxxZgHzLezdY2hMCZ3r6EU/go-ds-measure"
2426
ma "gx/ipfs/QmSWLfmj5frN9xVLMMN846dMDriy5wN5jeghUm7aTW3DAG/go-multiaddr"
2527
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
2628
util "gx/ipfs/QmZuY8aV7zbNXVy6DyN9SmnuH3o9nG852F4aTiSBpts8d1/go-ipfs-util"
27-
"gx/ipfs/QmbUSMTQtK9GRrUbD4ngqJwSzHsquUc8nyDubRWp4vPybH/go-ds-measure"
2829
)
2930

3031
var log = logging.Logger("fsrepo")

0 commit comments

Comments
 (0)