diff --git a/core/core.go b/core/core.go index b3f858ddd..148853bfc 100644 --- a/core/core.go +++ b/core/core.go @@ -31,6 +31,7 @@ import ( u "github.com/jbenet/go-ipfs/util" ctxc "github.com/jbenet/go-ipfs/util/ctxcloser" debugerror "github.com/jbenet/go-ipfs/util/debugerror" + "github.com/jbenet/go-ipfs/util/eventlog" ) const IpnsValidatorTag = "ipns" diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index 490ae0d47..9cfe5875d 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -16,11 +16,14 @@ import ( strategy "github.com/jbenet/go-ipfs/exchange/bitswap/strategy" peer "github.com/jbenet/go-ipfs/peer" u "github.com/jbenet/go-ipfs/util" - "github.com/jbenet/go-ipfs/util/eventlog" + eventlog "github.com/jbenet/go-ipfs/util/eventlog" ) var log = eventlog.Logger("bitswap") +// Number of providers to request for sending a wantlist to +const maxProvidersPerRequest = 6 + // New initializes a BitSwap instance that communicates over the // provided BitSwapNetwork. This function registers the returned instance as // the network delegate. @@ -97,7 +100,7 @@ func (bs *bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, err ctx, cancelFunc := context.WithCancel(parent) - ctx = eventlog.ContextWithMetadata(ctx, eventlog.Uuid("GetBlockRequest")) + ctx = eventlog.ContextWithLoggable(ctx, eventlog.Uuid("GetBlockRequest")) log.Event(ctx, "GetBlockRequestBegin", &k) defer func() { @@ -176,14 +179,29 @@ func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.Peer) e return nil } +func (bs *bitswap) sendWantlistToProviders(ctx context.Context, ks []u.Key) { + done := make(chan struct{}) + for _, k := range ks { + go func(k u.Key) { + providers := bs.routing.FindProvidersAsync(ctx, k, maxProvidersPerRequest) + + err := bs.sendWantListTo(ctx, providers) + if err != nil { + log.Errorf("error sending wantlist: %s", err) + } + done <- struct{}{} + }(k) + } + for _ = range ks { + <-done + } +} + // TODO ensure only one active request per key func (bs *bitswap) loop(parent context.Context) { ctx, cancel := context.WithCancel(parent) - // Every so often, we should resend out our current want list - rebroadcastTime := time.Second * 5 - broadcastSignal := time.NewTicker(bs.strategy.GetRebroadcastDelay()) defer func() { cancel() // signal to derived async functions @@ -193,15 +211,12 @@ func (bs *bitswap) loop(parent context.Context) { for { select { case <-broadcastSignal.C: - for _, k := range bs.wantlist.Keys() { - providers := bs.routing.FindProvidersAsync(ctx, k, maxProvidersPerRequest) - err := bs.sendWantListTo(ctx, providers) - if err != nil { - log.Errorf("error sending wantlist: %s", err) - } - } + bs.sendWantlistToProviders(ctx, bs.wantlist.Keys()) case ks := <-bs.batchRequests: // TODO: implement batching on len(ks) > X for some X + // i.e. if given 20 keys, fetch first five, then next + // five, and so on, so we are more likely to be able to + // effectively stream the data if len(ks) == 0 { log.Warning("Received batch request for zero blocks") continue @@ -232,6 +247,18 @@ func (bs *bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error { return bs.routing.Provide(ctx, blk.Key()) } +func (bs *bitswap) receiveBlock(ctx context.Context, block *blocks.Block) { + // TODO verify blocks? + if err := bs.blockstore.Put(block); err != nil { + log.Criticalf("error putting block: %s", err) + return + } + err := bs.HasBlock(ctx, block) + if err != nil { + log.Warningf("HasBlock errored: %s", err) + } +} + // TODO(brian): handle errors func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsmsg.BitSwapMessage) ( peer.Peer, bsmsg.BitSwapMessage) { @@ -255,15 +282,7 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsm bs.strategy.MessageReceived(p, incoming) // FIRST for _, block := range incoming.Blocks() { - // TODO verify blocks? - if err := bs.blockstore.Put(block); err != nil { - log.Criticalf("error putting block: %s", err) - continue // FIXME(brian): err ignored - } - err := bs.HasBlock(ctx, block) - if err != nil { - log.Warningf("HasBlock errored: %s", err) - } + go bs.receiveBlock(ctx, block) } for _, key := range incoming.Wantlist() { @@ -277,6 +296,8 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsm blkmsg := bsmsg.New() // TODO: only send this the first time + // no sense in sending our wantlist to the + // same peer multiple times for _, k := range bs.wantlist.Keys() { blkmsg.AddWanted(k) } diff --git a/exchange/bitswap/strategy/strategy.go b/exchange/bitswap/strategy/strategy.go index d86092da6..fb353d84a 100644 --- a/exchange/bitswap/strategy/strategy.go +++ b/exchange/bitswap/strategy/strategy.go @@ -148,5 +148,5 @@ func (s *strategist) GetBatchSize() int { } func (s *strategist) GetRebroadcastDelay() time.Duration { - return time.Second * 2 + return time.Second * 5 } diff --git a/merkledag/merkledag.go b/merkledag/merkledag.go index 06381bacf..7dadf722d 100644 --- a/merkledag/merkledag.go +++ b/merkledag/merkledag.go @@ -28,7 +28,7 @@ type DAGService interface { AddRecursive(*Node) error Get(u.Key) (*Node, error) Remove(*Node) error - BatchFetch(context.Context, *Node) <-chan *Node + GetKeysAsync(context.Context, *Node) <-chan *Node } func NewDAGService(bs *bserv.BlockService) DAGService { @@ -298,41 +298,33 @@ func FindLink(n *Node, k u.Key, found []*Node) (int, error) { return -1, u.ErrNotFound } -// BatchFetch will fill out all of the links of the given Node. +// GetKeysAsync will fill out all of the links of the given Node. // It returns a channel of nodes, which the caller can receive // all the child nodes of 'root' on, in proper order. -func (ds *dagService) BatchFetch(ctx context.Context, root *Node) <-chan *Node { +func (ds *dagService) GetKeysAsync(ctx context.Context, root *Node) <-chan *Node { sig := make(chan *Node) go func() { var keys []u.Key nodes := make([]*Node, len(root.Links)) - //temp - recvd := []int{} - // - - // - next := 0 - // - for _, lnk := range root.Links { keys = append(keys, u.Key(lnk.Hash)) } blkchan := ds.Blocks.GetBlocks(ctx, keys) - count := 0 + next := 0 for blk := range blkchan { - count++ i, err := FindLink(root, blk.Key(), nodes) if err != nil { + // NB: can only occur as a result of programmer error panic("Received block that wasnt in this nodes links!") } - recvd = append(recvd, i) - nd, err := Decoded(blk.Data) if err != nil { + // NB: can occur in normal situations, with improperly formatted + // input data log.Error("Got back bad block!") break } @@ -347,23 +339,11 @@ func (ds *dagService) BatchFetch(ctx context.Context, root *Node) <-chan *Node { } } if next < len(nodes) { - log.Errorf("count = %d, links = %d", count, len(nodes)) - log.Error(recvd) - panic("didnt receive all requested blocks!") + // TODO: bubble errors back up. + log.Errorf("Did not receive correct number of nodes!") } close(sig) }() return sig } - -func checkForDupes(ks []u.Key) bool { - seen := make(map[u.Key]struct{}) - for _, k := range ks { - if _, ok := seen[k]; ok { - return true - } - seen[k] = struct{}{} - } - return false -} diff --git a/unixfs/io/dagreader.go b/unixfs/io/dagreader.go index 55e677386..b41ac3daa 100644 --- a/unixfs/io/dagreader.go +++ b/unixfs/io/dagreader.go @@ -40,7 +40,7 @@ func NewDagReader(n *mdag.Node, serv mdag.DAGService) (io.Reader, error) { case ftpb.Data_File: var fetchChan <-chan *mdag.Node if serv != nil { - fetchChan = serv.BatchFetch(context.TODO(), n) + fetchChan = serv.GetKeysAsync(context.TODO(), n) } return &DagReader{ node: n,