From d7eb57f48fb4f84d6474e4cfa1a3983120d3f7d8 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Wed, 18 Feb 2015 08:18:19 +0000 Subject: [PATCH] add worker to bitswap for reproviding new blocks --- blocks/blocks.go | 6 ++ exchange/bitswap/bitswap.go | 108 +++++------------------------ exchange/bitswap/workers.go | 133 ++++++++++++++++++++++++++++++++++++ 3 files changed, 156 insertions(+), 91 deletions(-) create mode 100644 exchange/bitswap/workers.go diff --git a/blocks/blocks.go b/blocks/blocks.go index 46fd2e126..e0d7624c1 100644 --- a/blocks/blocks.go +++ b/blocks/blocks.go @@ -42,3 +42,9 @@ func (b *Block) Key() u.Key { func (b *Block) String() string { return fmt.Sprintf("[Block %s]", b.Key()) } + +func (b *Block) Loggable() map[string]interface{} { + return map[string]interface{}{ + "block": b.Key().String(), + } +} diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index ff24e068b..3046c987c 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -8,7 +8,6 @@ import ( "time" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" - inflect "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/briantigerchow/inflect" process "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess" blocks "github.com/jbenet/go-ipfs/blocks" @@ -37,9 +36,13 @@ const ( maxProvidersPerRequest = 3 providerRequestTimeout = time.Second * 10 hasBlockTimeout = time.Second * 15 + provideTimeout = time.Second * 15 sizeBatchRequestChan = 32 // kMaxPriority is the max priority as defined by the bitswap protocol kMaxPriority = math.MaxInt32 + + hasBlockBufferSize = 256 + provideWorkers = 4 ) var ( @@ -86,18 +89,12 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork, wantlist: wantlist.NewThreadSafe(), batchRequests: make(chan *blockRequest, sizeBatchRequestChan), process: px, + newBlocks: make(chan *blocks.Block, hasBlockBufferSize), } network.SetDelegate(bs) - px.Go(func(px process.Process) { - bs.clientWorker(ctx) - }) - px.Go(func(px process.Process) { - bs.taskWorker(ctx) - }) - px.Go(func(px process.Process) { - bs.rebroadcastWorker(ctx) - }) + // Start up bitswaps async worker routines + bs.startWorkers(px, ctx) return bs } @@ -126,6 +123,8 @@ type bitswap struct { wantlist *wantlist.ThreadSafe process process.Process + + newBlocks chan *blocks.Block } type blockRequest struct { @@ -172,7 +171,6 @@ func (bs *bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, err case <-parent.Done(): return nil, parent.Err() } - } // GetBlocks returns a channel where the caller may receive blocks that @@ -205,6 +203,7 @@ func (bs *bitswap) GetBlocks(ctx context.Context, keys []u.Key) (<-chan *blocks. // HasBlock announces the existance of a block to this bitswap service. The // service will potentially notify its peers. func (bs *bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error { + log.Event(ctx, "hasBlock", blk) select { case <-bs.process.Closing(): return errors.New("bitswap is closed") @@ -215,7 +214,12 @@ func (bs *bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error { } bs.wantlist.Remove(blk.Key()) bs.notifications.Publish(blk) - return bs.network.Provide(ctx, blk.Key()) + select { + case bs.newBlocks <- blk: + case <-ctx.Done(): + return ctx.Err() + } + return nil } func (bs *bitswap) sendWantlistMsgToPeers(ctx context.Context, m bsmsg.BitSwapMessage, peers <-chan peer.ID) error { @@ -310,6 +314,7 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg log.Debug(err) } } + var keys []u.Key for _, block := range incoming.Blocks() { keys = append(keys, block.Key()) @@ -391,82 +396,3 @@ func (bs *bitswap) send(ctx context.Context, p peer.ID, m bsmsg.BitSwapMessage) func (bs *bitswap) Close() error { return bs.process.Close() } - -func (bs *bitswap) taskWorker(ctx context.Context) { - defer log.Info("bitswap task worker shutting down...") - for { - select { - case <-ctx.Done(): - return - case nextEnvelope := <-bs.engine.Outbox(): - select { - case <-ctx.Done(): - return - case envelope, ok := <-nextEnvelope: - if !ok { - continue - } - log.Event(ctx, "deliverBlocks", envelope.Message, envelope.Peer) - bs.send(ctx, envelope.Peer, envelope.Message) - } - } - } -} - -// TODO ensure only one active request per key -func (bs *bitswap) clientWorker(parent context.Context) { - defer log.Info("bitswap client worker shutting down...") - - for { - select { - case req := <-bs.batchRequests: - keys := req.keys - if len(keys) == 0 { - log.Warning("Received batch request for zero blocks") - continue - } - for i, k := range keys { - bs.wantlist.Add(k, kMaxPriority-i) - } - - bs.wantNewBlocks(req.ctx, keys) - - // NB: Optimization. Assumes that providers of key[0] are likely to - // be able to provide for all keys. This currently holds true in most - // every situation. Later, this assumption may not hold as true. - child, _ := context.WithTimeout(req.ctx, providerRequestTimeout) - providers := bs.network.FindProvidersAsync(child, keys[0], maxProvidersPerRequest) - err := bs.sendWantlistToPeers(req.ctx, providers) - if err != nil { - log.Debugf("error sending wantlist: %s", err) - } - case <-parent.Done(): - return - } - } -} - -func (bs *bitswap) rebroadcastWorker(parent context.Context) { - ctx, cancel := context.WithCancel(parent) - defer cancel() - - broadcastSignal := time.After(rebroadcastDelay.Get()) - - for { - select { - case <-time.Tick(10 * time.Second): - n := bs.wantlist.Len() - if n > 0 { - log.Debug(n, inflect.FromNumber("keys", n), "in bitswap wantlist") - } - case <-broadcastSignal: // resend unfulfilled wantlist keys - entries := bs.wantlist.Entries() - if len(entries) > 0 { - bs.sendWantlistToProviders(ctx, entries) - } - broadcastSignal = time.After(rebroadcastDelay.Get()) - case <-parent.Done(): - return - } - } -} diff --git a/exchange/bitswap/workers.go b/exchange/bitswap/workers.go new file mode 100644 index 000000000..f2f348305 --- /dev/null +++ b/exchange/bitswap/workers.go @@ -0,0 +1,133 @@ +package bitswap + +import ( + "time" + + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + inflect "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/briantigerchow/inflect" + process "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess" +) + +func (bs *bitswap) startWorkers(px process.Process, ctx context.Context) { + // Start up a worker to handle block requests this node is making + px.Go(func(px process.Process) { + bs.clientWorker(ctx) + }) + + // Start up a worker to handle requests from other nodes for the data on this node + px.Go(func(px process.Process) { + bs.taskWorker(ctx) + }) + + // Start up a worker to manage periodically resending our wantlist out to peers + px.Go(func(px process.Process) { + bs.rebroadcastWorker(ctx) + }) + + // Spawn up multiple workers to handle incoming blocks + // consider increasing number if providing blocks bottlenecks + // file transfers + for i := 0; i < provideWorkers; i++ { + px.Go(func(px process.Process) { + bs.blockReceiveWorker(ctx) + }) + } +} + +func (bs *bitswap) taskWorker(ctx context.Context) { + defer log.Info("bitswap task worker shutting down...") + for { + select { + case nextEnvelope := <-bs.engine.Outbox(): + select { + case envelope, ok := <-nextEnvelope: + if !ok { + continue + } + log.Event(ctx, "deliverBlocks", envelope.Message, envelope.Peer) + bs.send(ctx, envelope.Peer, envelope.Message) + case <-ctx.Done(): + return + } + case <-ctx.Done(): + return + } + } +} + +func (bs *bitswap) blockReceiveWorker(ctx context.Context) { + for { + select { + case blk, ok := <-bs.newBlocks: + if !ok { + log.Debug("newBlocks channel closed") + return + } + ctx, _ := context.WithTimeout(ctx, provideTimeout) + err := bs.network.Provide(ctx, blk.Key()) + if err != nil { + log.Error(err) + } + case <-ctx.Done(): + return + } + } +} + +// TODO ensure only one active request per key +func (bs *bitswap) clientWorker(parent context.Context) { + defer log.Info("bitswap client worker shutting down...") + + for { + select { + case req := <-bs.batchRequests: + keys := req.keys + if len(keys) == 0 { + log.Warning("Received batch request for zero blocks") + continue + } + for i, k := range keys { + bs.wantlist.Add(k, kMaxPriority-i) + } + + bs.wantNewBlocks(req.ctx, keys) + + // NB: Optimization. Assumes that providers of key[0] are likely to + // be able to provide for all keys. This currently holds true in most + // every situation. Later, this assumption may not hold as true. + child, _ := context.WithTimeout(req.ctx, providerRequestTimeout) + providers := bs.network.FindProvidersAsync(child, keys[0], maxProvidersPerRequest) + err := bs.sendWantlistToPeers(req.ctx, providers) + if err != nil { + log.Debugf("error sending wantlist: %s", err) + } + case <-parent.Done(): + return + } + } +} + +func (bs *bitswap) rebroadcastWorker(parent context.Context) { + ctx, cancel := context.WithCancel(parent) + defer cancel() + + broadcastSignal := time.After(rebroadcastDelay.Get()) + + for { + select { + case <-time.Tick(10 * time.Second): + n := bs.wantlist.Len() + if n > 0 { + log.Debug(n, inflect.FromNumber("keys", n), "in bitswap wantlist") + } + case <-broadcastSignal: // resend unfulfilled wantlist keys + entries := bs.wantlist.Entries() + if len(entries) > 0 { + bs.sendWantlistToProviders(ctx, entries) + } + broadcastSignal = time.After(rebroadcastDelay.Get()) + case <-parent.Done(): + return + } + } +}