mirror of
https://github.com/ipfs/kubo.git
synced 2025-07-01 10:49:24 +08:00
Merge pull request #3615 from ipfs/feat/metrics/perf
Add multiple performance metrics
This commit is contained in:
@ -28,8 +28,6 @@ import (
|
||||
"gx/ipfs/QmX3QZ5jHEPidwUrymXV1iSCSUhdGxj15sm2gP4jKMef7B/client_golang/prometheus"
|
||||
util "gx/ipfs/QmZuY8aV7zbNXVy6DyN9SmnuH3o9nG852F4aTiSBpts8d1/go-ipfs-util"
|
||||
iconn "gx/ipfs/QmcYnysCkyGezY6k6MQ1yHHdrRiZaU9x3M9Y1tE9qZ5hD2/go-libp2p-interface-conn"
|
||||
|
||||
_ "gx/ipfs/QmV3NSS3A1kX5s28r7yLczhDsXzkgo65cqRgKFXYunWZmD/metrics/runtime"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -19,6 +19,7 @@ import (
|
||||
flags "github.com/ipfs/go-ipfs/flags"
|
||||
"github.com/ipfs/go-ipfs/thirdparty/delay"
|
||||
|
||||
metrics "gx/ipfs/QmRg1gKTHzc3CZXSKzem8aR4E3TubFhbgXwfVuWnSK5CC5/go-metrics-interface"
|
||||
process "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess"
|
||||
procctx "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess/context"
|
||||
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
|
||||
@ -47,6 +48,9 @@ var (
|
||||
HasBlockBufferSize = 256
|
||||
provideKeysBufferSize = 2048
|
||||
provideWorkerMax = 512
|
||||
|
||||
// the 1<<18+15 is to observe old file chunks that are 1<<18 + 14 in size
|
||||
metricsBuckets = []float64{1 << 6, 1 << 10, 1 << 14, 1 << 18, 1<<18 + 15, 1 << 22}
|
||||
)
|
||||
|
||||
func init() {
|
||||
@ -74,6 +78,11 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
|
||||
// shouldn't accept a context anymore. Clients should probably use Close()
|
||||
// exclusively. We should probably find another way to share logging data
|
||||
ctx, cancelFunc := context.WithCancel(parent)
|
||||
ctx = metrics.CtxSubScope(ctx, "bitswap")
|
||||
dupHist := metrics.NewCtx(ctx, "recv_dup_blocks_bytes", "Summary of duplicate"+
|
||||
" data blocks recived").Histogram(metricsBuckets)
|
||||
allHist := metrics.NewCtx(ctx, "recv_all_blocks_bytes", "Summary of all"+
|
||||
" data blocks recived").Histogram(metricsBuckets)
|
||||
|
||||
notif := notifications.New()
|
||||
px := process.WithTeardown(func() error {
|
||||
@ -91,6 +100,9 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
|
||||
newBlocks: make(chan *cid.Cid, HasBlockBufferSize),
|
||||
provideKeys: make(chan *cid.Cid, provideKeysBufferSize),
|
||||
wm: NewWantManager(ctx, network),
|
||||
|
||||
dupMetric: dupHist,
|
||||
allMetric: allHist,
|
||||
}
|
||||
go bs.wm.Run()
|
||||
network.SetDelegate(bs)
|
||||
@ -145,6 +157,10 @@ type Bitswap struct {
|
||||
blocksRecvd int
|
||||
dupBlocksRecvd int
|
||||
dupDataRecvd uint64
|
||||
|
||||
// Metrics interface metrics
|
||||
dupMetric metrics.Histogram
|
||||
allMetric metrics.Histogram
|
||||
}
|
||||
|
||||
type blockRequest struct {
|
||||
@ -352,9 +368,7 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
|
||||
go func(b blocks.Block) {
|
||||
defer wg.Done()
|
||||
|
||||
if err := bs.updateReceiveCounters(b); err != nil {
|
||||
return // ignore error, is either logged previously, or ErrAlreadyHaveBlock
|
||||
}
|
||||
bs.updateReceiveCounters(b)
|
||||
|
||||
k := b.Cid()
|
||||
log.Event(ctx, "Bitswap.GetBlockRequest.End", k)
|
||||
@ -370,24 +384,27 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
|
||||
|
||||
var ErrAlreadyHaveBlock = errors.New("already have block")
|
||||
|
||||
func (bs *Bitswap) updateReceiveCounters(b blocks.Block) error {
|
||||
bs.counterLk.Lock()
|
||||
defer bs.counterLk.Unlock()
|
||||
bs.blocksRecvd++
|
||||
func (bs *Bitswap) updateReceiveCounters(b blocks.Block) {
|
||||
blkLen := len(b.RawData())
|
||||
has, err := bs.blockstore.Has(b.Cid())
|
||||
if err != nil {
|
||||
log.Infof("blockstore.Has error: %s", err)
|
||||
return err
|
||||
}
|
||||
if err == nil && has {
|
||||
bs.dupBlocksRecvd++
|
||||
bs.dupDataRecvd += uint64(len(b.RawData()))
|
||||
return
|
||||
}
|
||||
|
||||
bs.allMetric.Observe(float64(blkLen))
|
||||
if has {
|
||||
return ErrAlreadyHaveBlock
|
||||
bs.dupMetric.Observe(float64(blkLen))
|
||||
}
|
||||
|
||||
bs.counterLk.Lock()
|
||||
defer bs.counterLk.Unlock()
|
||||
|
||||
bs.blocksRecvd++
|
||||
if has {
|
||||
bs.dupBlocksRecvd++
|
||||
bs.dupDataRecvd += uint64(blkLen)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Connected/Disconnected warns bitswap about peer connections
|
||||
|
@ -9,6 +9,8 @@ import (
|
||||
bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message"
|
||||
bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network"
|
||||
wantlist "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
|
||||
|
||||
metrics "gx/ipfs/QmRg1gKTHzc3CZXSKzem8aR4E3TubFhbgXwfVuWnSK5CC5/go-metrics-interface"
|
||||
cid "gx/ipfs/QmV5gPoRsjN1Gid3LMdNZTyfCtP2DsvqEbMAmz82RmmiGk/go-cid"
|
||||
peer "gx/ipfs/QmZcUPvPhD1Xvk6mwijYF8AfR3mG31S1YsEfHG4khrFPRr/go-libp2p-peer"
|
||||
)
|
||||
@ -27,20 +29,29 @@ type WantManager struct {
|
||||
network bsnet.BitSwapNetwork
|
||||
ctx context.Context
|
||||
cancel func()
|
||||
|
||||
wantlistGauge metrics.Gauge
|
||||
sentHistogram metrics.Histogram
|
||||
}
|
||||
|
||||
func NewWantManager(ctx context.Context, network bsnet.BitSwapNetwork) *WantManager {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
wantlistGauge := metrics.NewCtx(ctx, "wanlist_total",
|
||||
"Number of items in wantlist.").Gauge()
|
||||
sentHistogram := metrics.NewCtx(ctx, "sent_all_blocks_bytes", "Histogram of blocks sent by"+
|
||||
" this bitswap").Histogram(metricsBuckets)
|
||||
return &WantManager{
|
||||
incoming: make(chan []*bsmsg.Entry, 10),
|
||||
connect: make(chan peer.ID, 10),
|
||||
disconnect: make(chan peer.ID, 10),
|
||||
peerReqs: make(chan chan []peer.ID),
|
||||
peers: make(map[peer.ID]*msgQueue),
|
||||
wl: wantlist.NewThreadSafe(),
|
||||
network: network,
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
incoming: make(chan []*bsmsg.Entry, 10),
|
||||
connect: make(chan peer.ID, 10),
|
||||
disconnect: make(chan peer.ID, 10),
|
||||
peerReqs: make(chan chan []peer.ID),
|
||||
peers: make(map[peer.ID]*msgQueue),
|
||||
wl: wantlist.NewThreadSafe(),
|
||||
network: network,
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
wantlistGauge: wantlistGauge,
|
||||
sentHistogram: sentHistogram,
|
||||
}
|
||||
}
|
||||
|
||||
@ -109,6 +120,8 @@ func (pm *WantManager) SendBlock(ctx context.Context, env *engine.Envelope) {
|
||||
// throughout the network stack
|
||||
defer env.Sent()
|
||||
|
||||
pm.sentHistogram.Observe(float64(len(env.Block.RawData())))
|
||||
|
||||
msg := bsmsg.New(false)
|
||||
msg.AddBlock(env.Block)
|
||||
log.Infof("Sending block %s to %s", env.Block, env.Peer)
|
||||
@ -282,10 +295,12 @@ func (pm *WantManager) Run() {
|
||||
for _, e := range entries {
|
||||
if e.Cancel {
|
||||
if pm.wl.Remove(e.Cid) {
|
||||
pm.wantlistGauge.Dec()
|
||||
filtered = append(filtered, e)
|
||||
}
|
||||
} else {
|
||||
if pm.wl.AddEntry(e.Entry) {
|
||||
pm.wantlistGauge.Inc()
|
||||
filtered = append(filtered, e)
|
||||
}
|
||||
}
|
||||
|
@ -208,9 +208,9 @@
|
||||
},
|
||||
{
|
||||
"author": "whyrusleeping",
|
||||
"hash": "QmbUSMTQtK9GRrUbD4ngqJwSzHsquUc8nyDubRWp4vPybH",
|
||||
"hash": "QmNPv1yzXBqxzqjfTzHCeBoicxxZgHzLezdY2hMCZ3r6EU",
|
||||
"name": "go-ds-measure",
|
||||
"version": "1.1.0"
|
||||
"version": "1.2.0"
|
||||
},
|
||||
{
|
||||
"author": "whyrusleeping",
|
||||
|
@ -8,12 +8,12 @@ import (
|
||||
config "github.com/ipfs/go-ipfs/repo/config"
|
||||
"github.com/ipfs/go-ipfs/thirdparty/dir"
|
||||
|
||||
measure "gx/ipfs/QmNPv1yzXBqxzqjfTzHCeBoicxxZgHzLezdY2hMCZ3r6EU/go-ds-measure"
|
||||
ds "gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore"
|
||||
mount "gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore/syncmount"
|
||||
"gx/ipfs/QmXZEfbEv9sXG9JnLoMNhREDMDgkq5Jd7uWJ7d77VJ4pxn/go-ds-flatfs"
|
||||
flatfs "gx/ipfs/QmXZEfbEv9sXG9JnLoMNhREDMDgkq5Jd7uWJ7d77VJ4pxn/go-ds-flatfs"
|
||||
levelds "gx/ipfs/QmaHHmfEozrrotyhyN44omJouyuEtx6ahddqV6W5yRaUSQ/go-ds-leveldb"
|
||||
ldbopts "gx/ipfs/QmbBhyDKsY4mbY6xsKt3qu9Y7FPvMJ6qbD8AMjYYvPRw1g/goleveldb/leveldb/opt"
|
||||
measure "gx/ipfs/QmbUSMTQtK9GRrUbD4ngqJwSzHsquUc8nyDubRWp4vPybH/go-ds-measure"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -11,7 +11,6 @@ import (
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/mitchellh/go-homedir"
|
||||
keystore "github.com/ipfs/go-ipfs/keystore"
|
||||
repo "github.com/ipfs/go-ipfs/repo"
|
||||
"github.com/ipfs/go-ipfs/repo/common"
|
||||
@ -21,10 +20,12 @@ import (
|
||||
serialize "github.com/ipfs/go-ipfs/repo/fsrepo/serialize"
|
||||
dir "github.com/ipfs/go-ipfs/thirdparty/dir"
|
||||
|
||||
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/mitchellh/go-homedir"
|
||||
|
||||
measure "gx/ipfs/QmNPv1yzXBqxzqjfTzHCeBoicxxZgHzLezdY2hMCZ3r6EU/go-ds-measure"
|
||||
ma "gx/ipfs/QmSWLfmj5frN9xVLMMN846dMDriy5wN5jeghUm7aTW3DAG/go-multiaddr"
|
||||
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
|
||||
util "gx/ipfs/QmZuY8aV7zbNXVy6DyN9SmnuH3o9nG852F4aTiSBpts8d1/go-ipfs-util"
|
||||
"gx/ipfs/QmbUSMTQtK9GRrUbD4ngqJwSzHsquUc8nyDubRWp4vPybH/go-ds-measure"
|
||||
)
|
||||
|
||||
var log = logging.Logger("fsrepo")
|
||||
|
Reference in New Issue
Block a user