mirror of https://github.com/ipfs/kubo.git synced 2025-06-30 18:13:54 +08:00

some bitswap cleanup

This commit is contained in:
Jeromy
2014-11-26 22:50:41 +00:00
parent 1e7b7efa76
commit f0a4fdad59
5 changed files with 54 additions and 52 deletions

View File

@@ -31,6 +31,7 @@ import (
u "github.com/jbenet/go-ipfs/util"
ctxc "github.com/jbenet/go-ipfs/util/ctxcloser"
debugerror "github.com/jbenet/go-ipfs/util/debugerror"
"github.com/jbenet/go-ipfs/util/eventlog"
)
const IpnsValidatorTag = "ipns"

View File

@@ -16,11 +16,14 @@ import (
strategy "github.com/jbenet/go-ipfs/exchange/bitswap/strategy"
peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"
"github.com/jbenet/go-ipfs/util/eventlog"
eventlog "github.com/jbenet/go-ipfs/util/eventlog"
)
var log = eventlog.Logger("bitswap")
+ // Number of providers to request for sending a wantlist to
+ const maxProvidersPerRequest = 6
// New initializes a BitSwap instance that communicates over the
// provided BitSwapNetwork. This function registers the returned instance as
// the network delegate.
@@ -97,7 +100,7 @@ func (bs *bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, err
ctx, cancelFunc := context.WithCancel(parent)
- ctx = eventlog.ContextWithMetadata(ctx, eventlog.Uuid("GetBlockRequest"))
+ ctx = eventlog.ContextWithLoggable(ctx, eventlog.Uuid("GetBlockRequest"))
log.Event(ctx, "GetBlockRequestBegin", &k)
defer func() {
@@ -176,14 +179,29 @@ func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.Peer) e
return nil
}
+ func (bs *bitswap) sendWantlistToProviders(ctx context.Context, ks []u.Key) {
+ done := make(chan struct{})
+ for _, k := range ks {
+ go func(k u.Key) {
+ providers := bs.routing.FindProvidersAsync(ctx, k, maxProvidersPerRequest)
+ err := bs.sendWantListTo(ctx, providers)
+ if err != nil {
+ log.Errorf("error sending wantlist: %s", err)
+ }
+ done <- struct{}{}
+ }(k)
+ }
+ for _ = range ks {
+ <-done
+ }
+ }
// TODO ensure only one active request per key
func (bs *bitswap) loop(parent context.Context) {
ctx, cancel := context.WithCancel(parent)
// Every so often, we should resend out our current want list
- rebroadcastTime := time.Second * 5
+ broadcastSignal := time.NewTicker(bs.strategy.GetRebroadcastDelay())
defer func() {
cancel() // signal to derived async functions
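A note on the new helper above: sendWantlistToProviders fans out one goroutine per key and then joins on a hand-rolled done channel. A sync.WaitGroup expresses the same fan-out/fan-in join; below is a minimal standalone sketch of that pattern (fanOut and its arguments are illustrative names, not part of the patch):

package main

import (
	"fmt"
	"sync"
)

// fanOut runs work(k) concurrently for every key and blocks until all
// goroutines finish -- the same join sendWantlistToProviders builds by
// hand with its done channel.
func fanOut(keys []string, work func(string) error) {
	var wg sync.WaitGroup
	for _, k := range keys {
		wg.Add(1)
		go func(k string) {
			defer wg.Done()
			if err := work(k); err != nil {
				fmt.Printf("error sending wantlist: %s\n", err)
			}
		}(k)
	}
	wg.Wait() // equivalent to draining the done channel once per key
}

func main() {
	fanOut([]string{"a", "b", "c"}, func(k string) error {
		fmt.Println("finding providers for", k)
		return nil
	})
}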
@@ -193,15 +211,12 @@ func (bs *bitswap) loop(parent context.Context) {
for {
select {
case <-broadcastSignal.C:
- for _, k := range bs.wantlist.Keys() {
- providers := bs.routing.FindProvidersAsync(ctx, k, maxProvidersPerRequest)
- err := bs.sendWantListTo(ctx, providers)
- if err != nil {
- log.Errorf("error sending wantlist: %s", err)
- }
- }
+ bs.sendWantlistToProviders(ctx, bs.wantlist.Keys())
case ks := <-bs.batchRequests:
+ // TODO: implement batching on len(ks) > X for some X
+ // i.e. if given 20 keys, fetch first five, then next
+ // five, and so on, so we are more likely to be able to
+ // effectively stream the data
if len(ks) == 0 {
log.Warning("Received batch request for zero blocks")
continue
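The batching TODO above describes fetching a large request in fixed-size windows ("first five, then next five"). A minimal sketch of that slicing, assuming a hypothetical window size of five; none of this is in the patch:

package main

import "fmt"

// chunk splits keys into windows of at most n entries -- the
// "first five, then next five" fetch pattern the TODO describes.
func chunk(keys []string, n int) [][]string {
	var out [][]string
	for len(keys) > n {
		out = append(out, keys[:n])
		keys = keys[n:]
	}
	if len(keys) > 0 {
		out = append(out, keys)
	}
	return out
}

func main() {
	fmt.Println(chunk([]string{"a", "b", "c", "d", "e", "f", "g"}, 5))
	// [[a b c d e] [f g]]
}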
@@ -232,6 +247,18 @@ func (bs *bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error {
return bs.routing.Provide(ctx, blk.Key())
}
+ func (bs *bitswap) receiveBlock(ctx context.Context, block *blocks.Block) {
+ // TODO verify blocks?
+ if err := bs.blockstore.Put(block); err != nil {
+ log.Criticalf("error putting block: %s", err)
+ return
+ }
+ err := bs.HasBlock(ctx, block)
+ if err != nil {
+ log.Warningf("HasBlock errored: %s", err)
+ }
+ }
// TODO(brian): handle errors
func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsmsg.BitSwapMessage) (
peer.Peer, bsmsg.BitSwapMessage) {
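The extracted receiveBlock helper gives each incoming block a persist-then-announce pipeline that ReceiveMessage (below) can now fire with a bare go statement. A standalone sketch of that shape, with put and announce standing in for blockstore.Put and HasBlock rather than the patch's actual API:

package main

import (
	"context"
	"fmt"
)

// storeThenAnnounce mirrors receiveBlock: persist first, and only
// announce to the routing system if the store accepted the block.
func storeThenAnnounce(ctx context.Context, key string,
	put func(string) error,
	announce func(context.Context, string) error) {
	if err := put(key); err != nil {
		fmt.Printf("error putting block: %s\n", err)
		return // nothing to announce if the write failed
	}
	if err := announce(ctx, key); err != nil {
		// non-fatal: the block is stored even if providing it failed
		fmt.Printf("HasBlock errored: %s\n", err)
	}
}

func main() {
	put := func(k string) error { return nil }
	announce := func(ctx context.Context, k string) error { return nil }
	storeThenAnnounce(context.Background(), "QmExampleKey", put, announce)
}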
@@ -255,15 +282,7 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsm
bs.strategy.MessageReceived(p, incoming) // FIRST
for _, block := range incoming.Blocks() {
- // TODO verify blocks?
- if err := bs.blockstore.Put(block); err != nil {
- log.Criticalf("error putting block: %s", err)
- continue // FIXME(brian): err ignored
- }
- err := bs.HasBlock(ctx, block)
- if err != nil {
- log.Warningf("HasBlock errored: %s", err)
- }
+ go bs.receiveBlock(ctx, block)
}
for _, key := range incoming.Wantlist() {
@@ -277,6 +296,8 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsm
blkmsg := bsmsg.New()
// TODO: only send this the first time
+ // no sense in sending our wantlist to the
+ // same peer multiple times
for _, k := range bs.wantlist.Keys() {
blkmsg.AddWanted(k)
}

View File

@@ -148,5 +148,5 @@ func (s *strategist) GetBatchSize() int {
}
func (s *strategist) GetRebroadcastDelay() time.Duration {
- return time.Second * 2
+ return time.Second * 5
}
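Taken together with the loop() change above, the rebroadcast interval now comes from the strategy's GetRebroadcastDelay() (raised here from two seconds to five) instead of a hard-coded constant. A minimal sketch of the resulting ticker-driven select loop, with simplified names that are not the patch's code:

package main

import (
	"fmt"
	"time"
)

// rebroadcast re-sends the wantlist on every tick until done closes,
// mirroring how loop() builds its ticker from GetRebroadcastDelay().
func rebroadcast(delay time.Duration, done <-chan struct{}) {
	ticker := time.NewTicker(delay)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			fmt.Println("rebroadcasting wantlist")
		case <-done:
			return
		}
	}
}

func main() {
	done := make(chan struct{})
	go rebroadcast(5*time.Second, done)
	time.Sleep(11 * time.Second) // long enough for two ticks
	close(done)
}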

View File

@@ -28,7 +28,7 @@ type DAGService interface {
AddRecursive(*Node) error
Get(u.Key) (*Node, error)
Remove(*Node) error
- BatchFetch(context.Context, *Node) <-chan *Node
+ GetKeysAsync(context.Context, *Node) <-chan *Node
}
func NewDAGService(bs *bserv.BlockService) DAGService {
@@ -298,41 +298,33 @@ func FindLink(n *Node, k u.Key, found []*Node) (int, error) {
return -1, u.ErrNotFound
}
- // BatchFetch will fill out all of the links of the given Node.
+ // GetKeysAsync will fill out all of the links of the given Node.
// It returns a channel of nodes, which the caller can receive
// all the child nodes of 'root' on, in proper order.
- func (ds *dagService) BatchFetch(ctx context.Context, root *Node) <-chan *Node {
+ func (ds *dagService) GetKeysAsync(ctx context.Context, root *Node) <-chan *Node {
sig := make(chan *Node)
go func() {
var keys []u.Key
nodes := make([]*Node, len(root.Links))
- //temp
- recvd := []int{}
- //
- //
- next := 0
- //
for _, lnk := range root.Links {
keys = append(keys, u.Key(lnk.Hash))
}
blkchan := ds.Blocks.GetBlocks(ctx, keys)
- count := 0
+ next := 0
for blk := range blkchan {
count++
i, err := FindLink(root, blk.Key(), nodes)
if err != nil {
// NB: can only occur as a result of programmer error
panic("Received block that wasnt in this nodes links!")
}
- recvd = append(recvd, i)
nd, err := Decoded(blk.Data)
if err != nil {
// NB: can occur in normal situations, with improperly formatted
// input data
log.Error("Got back bad block!")
break
}
@@ -347,23 +339,11 @@ func (ds *dagService) BatchFetch(ctx context.Context, root *Node) <-chan *Node {
}
}
if next < len(nodes) {
log.Errorf("count = %d, links = %d", count, len(nodes))
log.Error(recvd)
panic("didnt receive all requested blocks!")
// TODO: bubble errors back up.
log.Errorf("Did not receive correct number of nodes!")
}
close(sig)
}()
return sig
}
- func checkForDupes(ks []u.Key) bool {
- seen := make(map[u.Key]struct{})
- for _, k := range ks {
- if _, ok := seen[k]; ok {
- return true
- }
- seen[k] = struct{}{}
- }
- return false
- }
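GetKeysAsync receives blocks from GetBlocks in arrival order but must hand decoded nodes out in link order; the diff drops the temporary recvd bookkeeping and the panic in favor of a logged error. A detached sketch of the underlying reorder-buffer idea, with illustrative types standing in for the DAG machinery:

package main

import "fmt"

// item pairs a value with its position in the requested order.
type item struct {
	index int
	value string
}

// reorder consumes items in arbitrary arrival order and emits their
// values strictly by index, the way GetKeysAsync slots each decoded
// block into nodes[i] and walks next forward over the filled entries.
func reorder(in <-chan item, total int) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		buf := make([]string, total)
		filled := make([]bool, total)
		next := 0
		for it := range in {
			buf[it.index], filled[it.index] = it.value, true
			for next < total && filled[next] {
				out <- buf[next]
				next++
			}
		}
		if next < total {
			fmt.Println("did not receive all requested items")
		}
	}()
	return out
}

func main() {
	in := make(chan item)
	go func() {
		for _, it := range []item{{2, "c"}, {0, "a"}, {1, "b"}} {
			in <- it
		}
		close(in)
	}()
	for v := range reorder(in, 3) {
		fmt.Println(v) // prints a, b, c in order
	}
}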

View File

@@ -40,7 +40,7 @@ func NewDagReader(n *mdag.Node, serv mdag.DAGService) (io.Reader, error) {
case ftpb.Data_File:
var fetchChan <-chan *mdag.Node
if serv != nil {
- fetchChan = serv.BatchFetch(context.TODO(), n)
+ fetchChan = serv.GetKeysAsync(context.TODO(), n)
}
return &DagReader{
node: n,