mirror of
https://github.com/ipfs/kubo.git
synced 2025-06-30 01:52:26 +08:00
bitswap debug logging
This commit is contained in:
@ -3,7 +3,6 @@
|
|||||||
package bitswap
|
package bitswap
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
|
||||||
"math"
|
"math"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
@ -172,14 +171,14 @@ func (bs *bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (bs *bitswap) sendWantlistMsgToPeer(ctx context.Context, m bsmsg.BitSwapMessage, p peer.ID) error {
|
func (bs *bitswap) sendWantlistMsgToPeer(ctx context.Context, m bsmsg.BitSwapMessage, p peer.ID) error {
|
||||||
logd := fmt.Sprintf("%s bitswap.sendWantlistMsgToPeer(%d, %s)", bs.self, len(m.Wantlist()), p)
|
log := log.Prefix("bitswap(%s).bitswap.sendWantlistMsgToPeer(%d, %s)", bs.self, len(m.Wantlist()), p)
|
||||||
|
|
||||||
log.Debugf("%s sending wantlist", logd)
|
log.Debug("sending wantlist")
|
||||||
if err := bs.send(ctx, p, m); err != nil {
|
if err := bs.send(ctx, p, m); err != nil {
|
||||||
log.Errorf("%s send wantlist error: %s", logd, err)
|
log.Errorf("send wantlist error: %s", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
log.Debugf("%s send wantlist success", logd)
|
log.Debugf("send wantlist success")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -188,20 +187,20 @@ func (bs *bitswap) sendWantlistMsgToPeers(ctx context.Context, m bsmsg.BitSwapMe
|
|||||||
panic("Cant send wantlist to nil peerchan")
|
panic("Cant send wantlist to nil peerchan")
|
||||||
}
|
}
|
||||||
|
|
||||||
logd := fmt.Sprintf("%s bitswap.sendWantlistMsgTo(%d)", bs.self, len(m.Wantlist()))
|
log := log.Prefix("bitswap(%s).sendWantlistMsgToPeers(%d)", bs.self, len(m.Wantlist()))
|
||||||
log.Debugf("%s begin", logd)
|
log.Debugf("begin")
|
||||||
defer log.Debugf("%s end", logd)
|
defer log.Debugf("end")
|
||||||
|
|
||||||
set := pset.New()
|
set := pset.New()
|
||||||
wg := sync.WaitGroup{}
|
wg := sync.WaitGroup{}
|
||||||
for peerToQuery := range peers {
|
for peerToQuery := range peers {
|
||||||
log.Event(ctx, "PeerToQuery", peerToQuery)
|
log.Event(ctx, "PeerToQuery", peerToQuery)
|
||||||
logd := fmt.Sprintf("%sto(%s)", logd, peerToQuery)
|
|
||||||
|
|
||||||
if !set.TryAdd(peerToQuery) { //Do once per peer
|
if !set.TryAdd(peerToQuery) { //Do once per peer
|
||||||
log.Debugf("%s skipped (already sent)", logd)
|
log.Debugf("%s skipped (already sent)", peerToQuery)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
log.Debugf("%s sending", peerToQuery)
|
||||||
|
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func(p peer.ID) {
|
go func(p peer.ID) {
|
||||||
@ -223,9 +222,9 @@ func (bs *bitswap) sendWantlistToPeers(ctx context.Context, peers <-chan peer.ID
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (bs *bitswap) sendWantlistToProviders(ctx context.Context) {
|
func (bs *bitswap) sendWantlistToProviders(ctx context.Context) {
|
||||||
logd := fmt.Sprintf("%s bitswap.sendWantlistToProviders", bs.self)
|
log := log.Prefix("bitswap(%s).sendWantlistToProviders ", bs.self)
|
||||||
log.Debugf("%s begin", logd)
|
log.Debugf("begin")
|
||||||
defer log.Debugf("%s end", logd)
|
defer log.Debugf("end")
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(ctx)
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
@ -240,13 +239,13 @@ func (bs *bitswap) sendWantlistToProviders(ctx context.Context) {
|
|||||||
go func(k u.Key) {
|
go func(k u.Key) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
|
|
||||||
logd := fmt.Sprintf("%s(entry: %s)", logd, k)
|
log := log.Prefix("(entry: %s) ", k)
|
||||||
log.Debugf("%s asking dht for providers", logd)
|
log.Debug("asking dht for providers")
|
||||||
|
|
||||||
child, _ := context.WithTimeout(ctx, providerRequestTimeout)
|
child, _ := context.WithTimeout(ctx, providerRequestTimeout)
|
||||||
providers := bs.network.FindProvidersAsync(child, k, maxProvidersPerRequest)
|
providers := bs.network.FindProvidersAsync(child, k, maxProvidersPerRequest)
|
||||||
for prov := range providers {
|
for prov := range providers {
|
||||||
log.Debugf("%s dht returned provider %s. send wantlist", logd, prov)
|
log.Debugf("dht returned provider %s. send wantlist", prov)
|
||||||
sendToPeers <- prov
|
sendToPeers <- prov
|
||||||
}
|
}
|
||||||
}(e.Key)
|
}(e.Key)
|
||||||
@ -259,7 +258,7 @@ func (bs *bitswap) sendWantlistToProviders(ctx context.Context) {
|
|||||||
|
|
||||||
err := bs.sendWantlistToPeers(ctx, sendToPeers)
|
err := bs.sendWantlistToPeers(ctx, sendToPeers)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("%s sendWantlistToPeers error: %s", logd, err)
|
log.Errorf("sendWantlistToPeers error: %s", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -8,7 +8,7 @@ import (
|
|||||||
bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
|
bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
|
||||||
wl "github.com/jbenet/go-ipfs/exchange/bitswap/wantlist"
|
wl "github.com/jbenet/go-ipfs/exchange/bitswap/wantlist"
|
||||||
peer "github.com/jbenet/go-ipfs/p2p/peer"
|
peer "github.com/jbenet/go-ipfs/p2p/peer"
|
||||||
u "github.com/jbenet/go-ipfs/util"
|
eventlog "github.com/jbenet/go-ipfs/util/eventlog"
|
||||||
)
|
)
|
||||||
|
|
||||||
// TODO consider taking responsibility for other types of requests. For
|
// TODO consider taking responsibility for other types of requests. For
|
||||||
@ -41,7 +41,7 @@ import (
|
|||||||
// whatever it sees fit to produce desired outcomes (get wanted keys
|
// whatever it sees fit to produce desired outcomes (get wanted keys
|
||||||
// quickly, maintain good relationships with peers, etc).
|
// quickly, maintain good relationships with peers, etc).
|
||||||
|
|
||||||
var log = u.Logger("engine")
|
var log = eventlog.Logger("engine")
|
||||||
|
|
||||||
const (
|
const (
|
||||||
sizeOutboxChan = 4
|
sizeOutboxChan = 4
|
||||||
@ -140,6 +140,10 @@ func (e *Engine) Peers() []peer.ID {
|
|||||||
// MessageReceived performs book-keeping. Returns error if passed invalid
|
// MessageReceived performs book-keeping. Returns error if passed invalid
|
||||||
// arguments.
|
// arguments.
|
||||||
func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
|
func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
|
||||||
|
log := log.Prefix("Engine.MessageReceived(%s)", p)
|
||||||
|
log.Debugf("enter")
|
||||||
|
defer log.Debugf("exit")
|
||||||
|
|
||||||
newWorkExists := false
|
newWorkExists := false
|
||||||
defer func() {
|
defer func() {
|
||||||
if newWorkExists {
|
if newWorkExists {
|
||||||
@ -156,9 +160,11 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
|
|||||||
}
|
}
|
||||||
for _, entry := range m.Wantlist() {
|
for _, entry := range m.Wantlist() {
|
||||||
if entry.Cancel {
|
if entry.Cancel {
|
||||||
|
log.Debug("cancel", entry.Key)
|
||||||
l.CancelWant(entry.Key)
|
l.CancelWant(entry.Key)
|
||||||
e.peerRequestQueue.Remove(entry.Key, p)
|
e.peerRequestQueue.Remove(entry.Key, p)
|
||||||
} else {
|
} else {
|
||||||
|
log.Debug("wants", entry.Key, entry.Priority)
|
||||||
l.Wants(entry.Key, entry.Priority)
|
l.Wants(entry.Key, entry.Priority)
|
||||||
if exists, err := e.bs.Has(entry.Key); err == nil && exists {
|
if exists, err := e.bs.Has(entry.Key); err == nil && exists {
|
||||||
newWorkExists = true
|
newWorkExists = true
|
||||||
@ -169,6 +175,7 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
|
|||||||
|
|
||||||
for _, block := range m.Blocks() {
|
for _, block := range m.Blocks() {
|
||||||
// FIXME extract blocks.NumBytes(block) or block.NumBytes() method
|
// FIXME extract blocks.NumBytes(block) or block.NumBytes() method
|
||||||
|
log.Debug("got block %s %d bytes", block.Key(), len(block.Data))
|
||||||
l.ReceivedBytes(len(block.Data))
|
l.ReceivedBytes(len(block.Data))
|
||||||
for _, l := range e.ledgerMap {
|
for _, l := range e.ledgerMap {
|
||||||
if l.WantListContains(block.Key()) {
|
if l.WantListContains(block.Key()) {
|
||||||
|
@ -55,6 +55,7 @@ func (bsnet *impl) SendRequest(
|
|||||||
p peer.ID,
|
p peer.ID,
|
||||||
outgoing bsmsg.BitSwapMessage) (bsmsg.BitSwapMessage, error) {
|
outgoing bsmsg.BitSwapMessage) (bsmsg.BitSwapMessage, error) {
|
||||||
|
|
||||||
|
log.Debugf("bsnet SendRequest to %s", p)
|
||||||
s, err := bsnet.host.NewStream(ProtocolBitswap, p)
|
s, err := bsnet.host.NewStream(ProtocolBitswap, p)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -87,15 +87,11 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message
|
|||||||
|
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
|
|
||||||
log.Debugf("%s writing", dht.self)
|
|
||||||
if err := w.WriteMsg(pmes); err != nil {
|
if err := w.WriteMsg(pmes); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
log.Event(ctx, "dhtSentMessage", dht.self, p, pmes)
|
log.Event(ctx, "dhtSentMessage", dht.self, p, pmes)
|
||||||
|
|
||||||
log.Debugf("%s reading", dht.self)
|
|
||||||
defer log.Debugf("%s done", dht.self)
|
|
||||||
|
|
||||||
rpmes := new(pb.Message)
|
rpmes := new(pb.Message)
|
||||||
if err := r.ReadMsg(rpmes); err != nil {
|
if err := r.ReadMsg(rpmes); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -125,12 +121,10 @@ func (dht *IpfsDHT) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message
|
|||||||
cw := ctxutil.NewWriter(ctx, s) // ok to use. we defer close stream in this func
|
cw := ctxutil.NewWriter(ctx, s) // ok to use. we defer close stream in this func
|
||||||
w := ggio.NewDelimitedWriter(cw)
|
w := ggio.NewDelimitedWriter(cw)
|
||||||
|
|
||||||
log.Debugf("%s writing", dht.self)
|
|
||||||
if err := w.WriteMsg(pmes); err != nil {
|
if err := w.WriteMsg(pmes); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
log.Event(ctx, "dhtSentMessage", dht.self, p, pmes)
|
log.Event(ctx, "dhtSentMessage", dht.self, p, pmes)
|
||||||
log.Debugf("%s done", dht.self)
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user