From 1f4d3300b0efc93e4d565dd9507d126b77c8492f Mon Sep 17 00:00:00 2001 From: Jeromy Date: Fri, 20 Nov 2015 15:24:14 -0800 Subject: [PATCH] wire contexts into bitswap requests more deeply License: MIT Signed-off-by: Jeromy --- blockservice/blockservice.go | 4 ++ exchange/bitswap/bitswap.go | 51 ++++++----------- exchange/bitswap/decision/engine.go | 2 +- exchange/bitswap/decision/ledger.go | 6 +- exchange/bitswap/wantlist/wantlist.go | 30 +++++++--- exchange/bitswap/wantmanager.go | 19 +++++-- exchange/bitswap/workers.go | 79 +++++++++++++++------------ 7 files changed, 106 insertions(+), 85 deletions(-) diff --git a/blockservice/blockservice.go b/blockservice/blockservice.go index 21af30dfb..0b0397b5b 100644 --- a/blockservice/blockservice.go +++ b/blockservice/blockservice.go @@ -122,6 +122,10 @@ func (s *BlockService) GetBlocks(ctx context.Context, ks []key.Key) <-chan *bloc } } + if len(misses) == 0 { + return + } + rblocks, err := s.Exchange.GetBlocks(ctx, misses) if err != nil { log.Debugf("Error with GetBlocks: %s", err) diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index 8e7f4df48..bf509fc55 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -86,7 +86,7 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork, notifications: notif, engine: decision.NewEngine(ctx, bstore), // TODO close the engine with Close() method network: network, - findKeys: make(chan *blockRequest, sizeBatchRequestChan), + findKeys: make(chan *wantlist.Entry, sizeBatchRequestChan), process: px, newBlocks: make(chan *blocks.Block, HasBlockBufferSize), provideKeys: make(chan key.Key, provideKeysBufferSize), @@ -129,7 +129,7 @@ type Bitswap struct { notifications notifications.PubSub // send keys to a worker to find and connect to providers for them - findKeys chan *blockRequest + findKeys chan *wantlist.Entry engine *decision.Engine @@ -146,8 +146,8 @@ type Bitswap struct { } type blockRequest struct { - keys []key.Key - ctx context.Context + key key.Key + ctx context.Context } // GetBlock attempts to retrieve a particular block from peers within the @@ -208,6 +208,12 @@ func (bs *Bitswap) WantlistForPeer(p peer.ID) []key.Key { // resources, provide a context with a reasonably short deadline (ie. not one // that lasts throughout the lifetime of the server) func (bs *Bitswap) GetBlocks(ctx context.Context, keys []key.Key) (<-chan *blocks.Block, error) { + if len(keys) == 0 { + out := make(chan *blocks.Block) + close(out) + return out, nil + } + select { case <-bs.process.Closing(): return nil, errors.New("bitswap is closed") @@ -219,11 +225,14 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []key.Key) (<-chan *block log.Event(ctx, "Bitswap.GetBlockRequest.Start", &k) } - bs.wm.WantBlocks(keys) + bs.wm.WantBlocks(ctx, keys) - req := &blockRequest{ - keys: keys, - ctx: ctx, + // NB: Optimization. Assumes that providers of key[0] are likely to + // be able to provide for all keys. This currently holds true in most + // every situation. Later, this assumption may not hold as true. 
+ req := &wantlist.Entry{ + Key: keys[0], + Ctx: ctx, } select { case bs.findKeys <- req: @@ -276,32 +285,6 @@ func (bs *Bitswap) tryPutBlock(blk *blocks.Block, attempts int) error { return err } -func (bs *Bitswap) connectToProviders(ctx context.Context, entries []wantlist.Entry) { - - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - // Get providers for all entries in wantlist (could take a while) - wg := sync.WaitGroup{} - for _, e := range entries { - wg.Add(1) - go func(k key.Key) { - defer wg.Done() - - child, cancel := context.WithTimeout(ctx, providerRequestTimeout) - defer cancel() - providers := bs.network.FindProvidersAsync(child, k, maxProvidersPerRequest) - for prov := range providers { - go func(p peer.ID) { - bs.network.ConnectTo(ctx, p) - }(prov) - } - }(e.Key) - } - - wg.Wait() // make sure all our children do finish. -} - func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) { // This call records changes to wantlists, blocks received, // and number of bytes transfered. diff --git a/exchange/bitswap/decision/engine.go b/exchange/bitswap/decision/engine.go index 6d2577b72..6a026858f 100644 --- a/exchange/bitswap/decision/engine.go +++ b/exchange/bitswap/decision/engine.go @@ -217,7 +217,7 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error { e.peerRequestQueue.Remove(entry.Key, p) } else { log.Debugf("wants %s - %d", entry.Key, entry.Priority) - l.Wants(entry.Key, entry.Priority) + l.Wants(entry.Ctx, entry.Key, entry.Priority) if exists, err := e.bs.Has(entry.Key); err == nil && exists { e.peerRequestQueue.Push(entry.Entry, p) newWorkExists = true diff --git a/exchange/bitswap/decision/ledger.go b/exchange/bitswap/decision/ledger.go index de133524e..7b8982e47 100644 --- a/exchange/bitswap/decision/ledger.go +++ b/exchange/bitswap/decision/ledger.go @@ -6,6 +6,8 @@ import ( key "github.com/ipfs/go-ipfs/blocks/key" wl "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist" peer "gx/ipfs/QmZwZjMVGss5rqYsJVGy18gNbkTJffFyq2x1uJ4e4p3ZAt/go-libp2p-peer" + + "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context" ) // keySet is just a convenient alias for maps of keys, where we only care @@ -68,9 +70,9 @@ func (l *ledger) ReceivedBytes(n int) { } // TODO: this needs to be different. We need timeouts. -func (l *ledger) Wants(k key.Key, priority int) { +func (l *ledger) Wants(ctx context.Context, k key.Key, priority int) { log.Debugf("peer %s wants %s", l.Partner, k) - l.wantList.Add(k, priority) + l.wantList.Add(ctx, k, priority) } func (l *ledger) CancelWant(k key.Key) { diff --git a/exchange/bitswap/wantlist/wantlist.go b/exchange/bitswap/wantlist/wantlist.go index a82b484a4..545b98f7c 100644 --- a/exchange/bitswap/wantlist/wantlist.go +++ b/exchange/bitswap/wantlist/wantlist.go @@ -3,9 +3,12 @@ package wantlist import ( - key "github.com/ipfs/go-ipfs/blocks/key" "sort" "sync" + + key "github.com/ipfs/go-ipfs/blocks/key" + + "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context" ) type ThreadSafe struct { @@ -16,7 +19,6 @@ type ThreadSafe struct { // not threadsafe type Wantlist struct { set map[key.Key]Entry - // TODO provide O(1) len accessor if cost becomes an issue } type Entry struct { @@ -24,6 +26,7 @@ type Entry struct { // slices can be copied efficiently. 
Key key.Key Priority int + Ctx context.Context } type entrySlice []Entry @@ -44,22 +47,25 @@ func New() *Wantlist { } } -func (w *ThreadSafe) Add(k key.Key, priority int) { - // TODO rm defer for perf +func (w *ThreadSafe) Add(ctx context.Context, k key.Key, priority int) { w.lk.Lock() defer w.lk.Unlock() - w.Wantlist.Add(k, priority) + w.Wantlist.Add(ctx, k, priority) +} + +func (w *ThreadSafe) AddEntry(e Entry) { + w.lk.Lock() + defer w.lk.Unlock() + w.Wantlist.AddEntry(e) } func (w *ThreadSafe) Remove(k key.Key) { - // TODO rm defer for perf w.lk.Lock() defer w.lk.Unlock() w.Wantlist.Remove(k) } func (w *ThreadSafe) Contains(k key.Key) (Entry, bool) { - // TODO rm defer for perf w.lk.RLock() defer w.lk.RUnlock() return w.Wantlist.Contains(k) @@ -87,16 +93,24 @@ func (w *Wantlist) Len() int { return len(w.set) } -func (w *Wantlist) Add(k key.Key, priority int) { +func (w *Wantlist) Add(ctx context.Context, k key.Key, priority int) { if _, ok := w.set[k]; ok { return } w.set[k] = Entry{ Key: k, Priority: priority, + Ctx: ctx, } } +func (w *Wantlist) AddEntry(e Entry) { + if _, ok := w.set[e.Key]; ok { + return + } + w.set[e.Key] = e +} + func (w *Wantlist) Remove(k key.Key) { delete(w.set, k) } diff --git a/exchange/bitswap/wantmanager.go b/exchange/bitswap/wantmanager.go index f80acbfae..be68b3faa 100644 --- a/exchange/bitswap/wantmanager.go +++ b/exchange/bitswap/wantmanager.go @@ -64,16 +64,16 @@ type msgQueue struct { done chan struct{} } -func (pm *WantManager) WantBlocks(ks []key.Key) { +func (pm *WantManager) WantBlocks(ctx context.Context, ks []key.Key) { log.Infof("want blocks: %s", ks) - pm.addEntries(ks, false) + pm.addEntries(ctx, ks, false) } func (pm *WantManager) CancelWants(ks []key.Key) { - pm.addEntries(ks, true) + pm.addEntries(context.TODO(), ks, true) } -func (pm *WantManager) addEntries(ks []key.Key, cancel bool) { +func (pm *WantManager) addEntries(ctx context.Context, ks []key.Key, cancel bool) { var entries []*bsmsg.Entry for i, k := range ks { entries = append(entries, &bsmsg.Entry{ @@ -81,6 +81,7 @@ func (pm *WantManager) addEntries(ks []key.Key, cancel bool) { Entry: wantlist.Entry{ Key: k, Priority: kMaxPriority - i, + Ctx: ctx, }, }) } @@ -224,7 +225,7 @@ func (pm *WantManager) Run() { if e.Cancel { pm.wl.Remove(e.Key) } else { - pm.wl.Add(e.Key, e.Priority) + pm.wl.AddEntry(e.Entry) } } @@ -237,6 +238,14 @@ func (pm *WantManager) Run() { // resend entire wantlist every so often (REALLY SHOULDNT BE NECESSARY) var es []*bsmsg.Entry for _, e := range pm.wl.Entries() { + select { + case <-e.Ctx.Done(): + // entry has been cancelled + // simply continue, the entry will be removed from the + // wantlist soon enough + continue + default: + } es = append(es, &bsmsg.Entry{Entry: e}) } for _, p := range pm.peers { diff --git a/exchange/bitswap/workers.go b/exchange/bitswap/workers.go index 46f5693f4..1bd9154f5 100644 --- a/exchange/bitswap/workers.go +++ b/exchange/bitswap/workers.go @@ -1,6 +1,7 @@ package bitswap import ( + "sync" "time" process "gx/ipfs/QmQopLATEYMNg7dVqZRNDfeE2S1yKy8zrRh5xnYiuqeZBn/goprocess" @@ -8,6 +9,8 @@ import ( context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context" key "github.com/ipfs/go-ipfs/blocks/key" + wantlist "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist" + peer "gx/ipfs/QmZwZjMVGss5rqYsJVGy18gNbkTJffFyq2x1uJ4e4p3ZAt/go-libp2p-peer" logging "gx/ipfs/Qmazh5oNUVsDZTs2g59rq8aYQqwpss8tcUWQzor5sCCEuH/go-log" ) @@ -16,7 +19,7 @@ var TaskWorkerCount = 8 func (bs *Bitswap) startWorkers(px process.Process, ctx 
context.Context) {
 	// Start up a worker to handle block requests this node is making
 	px.Go(func(px process.Process) {
-		bs.providerConnector(ctx)
+		bs.providerQueryManager(ctx)
 	})
 
 	// Start up workers to handle requests from other nodes for the data on this node
@@ -149,37 +152,6 @@ func (bs *Bitswap) provideCollector(ctx context.Context) {
 	}
 }
 
-// connects to providers for the given keys
-func (bs *Bitswap) providerConnector(parent context.Context) {
-	defer log.Info("bitswap client worker shutting down...")
-
-	for {
-		log.Event(parent, "Bitswap.ProviderConnector.Loop")
-		select {
-		case req := <-bs.findKeys:
-			keys := req.keys
-			if len(keys) == 0 {
-				log.Warning("Received batch request for zero blocks")
-				continue
-			}
-			log.Event(parent, "Bitswap.ProviderConnector.Work", logging.LoggableMap{"Keys": keys})
-
-			// NB: Optimization. Assumes that providers of key[0] are likely to
-			// be able to provide for all keys. This currently holds true in most
-			// every situation. Later, this assumption may not hold as true.
-			child, cancel := context.WithTimeout(req.ctx, providerRequestTimeout)
-			providers := bs.network.FindProvidersAsync(child, keys[0], maxProvidersPerRequest)
-			for p := range providers {
-				go bs.network.ConnectTo(req.ctx, p)
-			}
-			cancel()
-
-		case <-parent.Done():
-			return
-		}
-	}
-}
-
 func (bs *Bitswap) rebroadcastWorker(parent context.Context) {
 	ctx, cancel := context.WithCancel(parent)
 	defer cancel()
@@ -200,12 +172,49 @@ func (bs *Bitswap) rebroadcastWorker(parent context.Context) {
 			}
 		case <-broadcastSignal.C: // resend unfulfilled wantlist keys
 			log.Event(ctx, "Bitswap.Rebroadcast.active")
-			entries := bs.wm.wl.Entries()
-			if len(entries) > 0 {
-				bs.connectToProviders(ctx, entries)
+			for _, e := range bs.wm.wl.Entries() {
+				bs.findKeys <- &wantlist.Entry{Key: e.Key, Priority: e.Priority, Ctx: e.Ctx} // copy; &e would alias the loop variable
 			}
 		case <-parent.Done():
 			return
 		}
 	}
 }
+
+func (bs *Bitswap) providerQueryManager(ctx context.Context) {
+	var activeLk sync.Mutex
+	active := make(map[key.Key]*wantlist.Entry)
+
+	for {
+		select {
+		case e := <-bs.findKeys:
+			activeLk.Lock()
+			if _, ok := active[e.Key]; ok {
+				activeLk.Unlock()
+				continue
+			}
+			active[e.Key] = e
+			activeLk.Unlock()
+
+			go func(e *wantlist.Entry) {
+				child, cancel := context.WithTimeout(e.Ctx, providerRequestTimeout)
+				defer cancel()
+				providers := bs.network.FindProvidersAsync(child, e.Key, maxProvidersPerRequest)
+				for p := range providers {
+					go func(p peer.ID) {
+						err := bs.network.ConnectTo(child, p)
+						if err != nil {
+							log.Debugf("failed to connect to provider %s: %s", p, err)
+						}
+					}(p)
+				}
+				activeLk.Lock()
+				delete(active, e.Key)
+				activeLk.Unlock()
+			}(e)
+
+		case <-ctx.Done():
+			return
+		}
+	}
+}
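
A minimal caller-side sketch of the behaviour this patch enables (illustrative only, not part of the change): GetBlocks now stamps each wantlist entry with the request's context, so once that context is cancelled or times out, the rebroadcast worker skips the entry and the provider query started by providerQueryManager is cut short. The helper name fetchWithDeadline, the 30-second deadline, and the blocks import path are assumptions made for the example; the other identifiers come from the diff above, and the snippet is written as if it lived alongside the bitswap package.

package bitswap

import (
	"time"

	blocks "github.com/ipfs/go-ipfs/blocks"
	key "github.com/ipfs/go-ipfs/blocks/key"

	context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
)

// fetchWithDeadline (hypothetical helper) bounds an entire GetBlocks request
// with one deadline. When the deadline fires, the wantlist entries created for
// this request carry a done context, so they are no longer rebroadcast and any
// provider lookups made on their behalf are cancelled.
func fetchWithDeadline(parent context.Context, bs *Bitswap, ks []key.Key) ([]*blocks.Block, error) {
	ctx, cancel := context.WithTimeout(parent, 30*time.Second)
	defer cancel()

	out, err := bs.GetBlocks(ctx, ks)
	if err != nil {
		return nil, err
	}

	var got []*blocks.Block
	for {
		select {
		case blk, ok := <-out:
			if !ok {
				// channel closed: every block arrived or the request ended
				return got, nil
			}
			got = append(got, blk)
		case <-ctx.Done():
			// deadline hit; anything still missing will not keep being
			// rebroadcast on our behalf, since each entry carries this ctx
			return got, ctx.Err()
		}
	}
}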