diff --git a/core/core.go b/core/core.go index bdde93ce2..186e90e6f 100644 --- a/core/core.go +++ b/core/core.go @@ -249,9 +249,6 @@ func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption Routin // startOnlineServicesWithHost is the set of services which need to be // initialized with the host and _before_ we start listening. func (n *IpfsNode) startOnlineServicesWithHost(ctx context.Context, host p2phost.Host, routingOption RoutingOption) error { - // Wrap standard peer host with routing system to allow unknown peer lookups - n.PeerHost = rhost.Wrap(host, n.Routing) - // setup diagnostics service n.Diagnostics = diag.NewDiagnostics(n.Identity, host) @@ -262,6 +259,9 @@ func (n *IpfsNode) startOnlineServicesWithHost(ctx context.Context, host p2phost } n.Routing = r + // Wrap standard peer host with routing system to allow unknown peer lookups + n.PeerHost = rhost.Wrap(host, n.Routing) + // setup exchange service const alwaysSendToPeer = true // use YesManStrategy bitswapNetwork := bsnet.NewFromIpfsHost(n.PeerHost, n.Routing) diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index 1fcce72d9..ff24e068b 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -84,7 +84,7 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork, engine: decision.NewEngine(ctx, bstore), // TODO close the engine with Close() method network: network, wantlist: wantlist.NewThreadSafe(), - batchRequests: make(chan []u.Key, sizeBatchRequestChan), + batchRequests: make(chan *blockRequest, sizeBatchRequestChan), process: px, } network.SetDelegate(bs) @@ -94,6 +94,9 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork, px.Go(func(px process.Process) { bs.taskWorker(ctx) }) + px.Go(func(px process.Process) { + bs.rebroadcastWorker(ctx) + }) return bs } @@ -116,7 +119,7 @@ type bitswap struct { // Requests for a set of related blocks // the assumption is made that the same peer is likely to // have more than a single block in the set - batchRequests chan []u.Key + batchRequests chan *blockRequest engine *decision.Engine @@ -125,6 +128,11 @@ type bitswap struct { process process.Process } +type blockRequest struct { + keys []u.Key + ctx context.Context +} + // GetBlock attempts to retrieve a particular block from peers within the // deadline enforced by the context. func (bs *bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, error) { @@ -175,15 +183,19 @@ func (bs *bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, err // resources, provide a context with a reasonably short deadline (ie. not one // that lasts throughout the lifetime of the server) func (bs *bitswap) GetBlocks(ctx context.Context, keys []u.Key) (<-chan *blocks.Block, error) { - select { case <-bs.process.Closing(): return nil, errors.New("bitswap is closed") default: } promise := bs.notifications.Subscribe(ctx, keys...) + + req := &blockRequest{ + keys: keys, + ctx: ctx, + } select { - case bs.batchRequests <- keys: + case bs.batchRequests <- req: return promise, nil case <-ctx.Done(): return nil, ctx.Err() @@ -321,8 +333,8 @@ func (bs *bitswap) PeerConnected(p peer.ID) { } // Connected/Disconnected warns bitswap about peer connections -func (bs *bitswap) PeerDisconnected(peer.ID) { - // TODO: release resources. +func (bs *bitswap) PeerDisconnected(p peer.ID) { + bs.engine.PeerDisconnected(p) } func (bs *bitswap) cancelBlocks(ctx context.Context, bkeys []u.Key) { @@ -342,6 +354,24 @@ func (bs *bitswap) cancelBlocks(ctx context.Context, bkeys []u.Key) { } } +func (bs *bitswap) wantNewBlocks(ctx context.Context, bkeys []u.Key) { + if len(bkeys) < 1 { + return + } + + message := bsmsg.New() + message.SetFull(false) + for i, k := range bkeys { + message.AddEntry(k, kMaxPriority-i) + } + for _, p := range bs.engine.Peers() { + err := bs.send(ctx, p, message) + if err != nil { + log.Debugf("Error sending message: %s", err) + } + } +} + func (bs *bitswap) ReceiveError(err error) { log.Debugf("Bitswap ReceiveError: %s", err) // TODO log the network error @@ -385,13 +415,42 @@ func (bs *bitswap) taskWorker(ctx context.Context) { // TODO ensure only one active request per key func (bs *bitswap) clientWorker(parent context.Context) { - defer log.Info("bitswap client worker shutting down...") + for { + select { + case req := <-bs.batchRequests: + keys := req.keys + if len(keys) == 0 { + log.Warning("Received batch request for zero blocks") + continue + } + for i, k := range keys { + bs.wantlist.Add(k, kMaxPriority-i) + } + + bs.wantNewBlocks(req.ctx, keys) + + // NB: Optimization. Assumes that providers of key[0] are likely to + // be able to provide for all keys. This currently holds true in most + // every situation. Later, this assumption may not hold as true. + child, _ := context.WithTimeout(req.ctx, providerRequestTimeout) + providers := bs.network.FindProvidersAsync(child, keys[0], maxProvidersPerRequest) + err := bs.sendWantlistToPeers(req.ctx, providers) + if err != nil { + log.Debugf("error sending wantlist: %s", err) + } + case <-parent.Done(): + return + } + } +} + +func (bs *bitswap) rebroadcastWorker(parent context.Context) { ctx, cancel := context.WithCancel(parent) + defer cancel() broadcastSignal := time.After(rebroadcastDelay.Get()) - defer cancel() for { select { @@ -406,23 +465,6 @@ func (bs *bitswap) clientWorker(parent context.Context) { bs.sendWantlistToProviders(ctx, entries) } broadcastSignal = time.After(rebroadcastDelay.Get()) - case keys := <-bs.batchRequests: - if len(keys) == 0 { - log.Warning("Received batch request for zero blocks") - continue - } - for i, k := range keys { - bs.wantlist.Add(k, kMaxPriority-i) - } - // NB: Optimization. Assumes that providers of key[0] are likely to - // be able to provide for all keys. This currently holds true in most - // every situation. Later, this assumption may not hold as true. - child, _ := context.WithTimeout(ctx, providerRequestTimeout) - providers := bs.network.FindProvidersAsync(child, keys[0], maxProvidersPerRequest) - err := bs.sendWantlistToPeers(ctx, providers) - if err != nil { - log.Debugf("error sending wantlist: %s", err) - } case <-parent.Done(): return } diff --git a/exchange/bitswap/decision/engine.go b/exchange/bitswap/decision/engine.go index e4e16e3da..11edf5f6d 100644 --- a/exchange/bitswap/decision/engine.go +++ b/exchange/bitswap/decision/engine.go @@ -228,6 +228,10 @@ func (e *Engine) MessageSent(p peer.ID, m bsmsg.BitSwapMessage) error { return nil } +func (e *Engine) PeerDisconnected(p peer.ID) { + // TODO: release ledger +} + func (e *Engine) numBytesSentTo(p peer.ID) uint64 { // NB not threadsafe return e.findOrCreate(p).Accounting.BytesSent diff --git a/merkledag/merkledag.go b/merkledag/merkledag.go index db48d3b34..cb35f0919 100644 --- a/merkledag/merkledag.go +++ b/merkledag/merkledag.go @@ -187,9 +187,12 @@ func (ds *dagService) GetNodes(ctx context.Context, keys []u.Key) []NodeGetter { } go func() { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + blkchan := ds.Blocks.GetBlocks(ctx, keys) - for { + for count := 0; count < len(keys); { select { case blk, ok := <-blkchan: if !ok { @@ -205,6 +208,7 @@ func (ds *dagService) GetNodes(ctx context.Context, keys []u.Key) []NodeGetter { is := FindLinks(keys, blk.Key(), 0) for _, i := range is { sendChans[i] <- nd + count++ } case <-ctx.Done(): return