diff --git a/blockservice/blockservice.go b/blockservice/blockservice.go index 21af30dfb..0b0397b5b 100644 --- a/blockservice/blockservice.go +++ b/blockservice/blockservice.go @@ -122,6 +122,10 @@ func (s *BlockService) GetBlocks(ctx context.Context, ks []key.Key) <-chan *bloc } } + if len(misses) == 0 { + return + } + rblocks, err := s.Exchange.GetBlocks(ctx, misses) if err != nil { log.Debugf("Error with GetBlocks: %s", err) diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index 8e7f4df48..c34dbc89b 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -86,7 +86,7 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork, notifications: notif, engine: decision.NewEngine(ctx, bstore), // TODO close the engine with Close() method network: network, - findKeys: make(chan *blockRequest, sizeBatchRequestChan), + findKeys: make(chan *wantlist.Entry, sizeBatchRequestChan), process: px, newBlocks: make(chan *blocks.Block, HasBlockBufferSize), provideKeys: make(chan key.Key, provideKeysBufferSize), @@ -129,7 +129,7 @@ type Bitswap struct { notifications notifications.PubSub // send keys to a worker to find and connect to providers for them - findKeys chan *blockRequest + findKeys chan *wantlist.Entry engine *decision.Engine @@ -146,8 +146,8 @@ type Bitswap struct { } type blockRequest struct { - keys []key.Key - ctx context.Context + key key.Key + ctx context.Context } // GetBlock attempts to retrieve a particular block from peers within the @@ -208,6 +208,12 @@ func (bs *Bitswap) WantlistForPeer(p peer.ID) []key.Key { // resources, provide a context with a reasonably short deadline (ie. not one // that lasts throughout the lifetime of the server) func (bs *Bitswap) GetBlocks(ctx context.Context, keys []key.Key) (<-chan *blocks.Block, error) { + if len(keys) == 0 { + out := make(chan *blocks.Block) + close(out) + return out, nil + } + select { case <-bs.process.Closing(): return nil, errors.New("bitswap is closed") @@ -219,11 +225,14 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []key.Key) (<-chan *block log.Event(ctx, "Bitswap.GetBlockRequest.Start", &k) } - bs.wm.WantBlocks(keys) + bs.wm.WantBlocks(ctx, keys) - req := &blockRequest{ - keys: keys, - ctx: ctx, + // NB: Optimization. Assumes that providers of key[0] are likely to + // be able to provide for all keys. This currently holds true in most + // every situation. Later, this assumption may not hold as true. + req := &wantlist.Entry{ + Key: keys[0], + Ctx: ctx, } select { case bs.findKeys <- req: @@ -255,6 +264,8 @@ func (bs *Bitswap) HasBlock(blk *blocks.Block) error { bs.notifications.Publish(blk) + bs.engine.AddBlock(blk) + select { case bs.newBlocks <- blk: // send block off to be reprovided @@ -276,32 +287,6 @@ func (bs *Bitswap) tryPutBlock(blk *blocks.Block, attempts int) error { return err } -func (bs *Bitswap) connectToProviders(ctx context.Context, entries []wantlist.Entry) { - - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - // Get providers for all entries in wantlist (could take a while) - wg := sync.WaitGroup{} - for _, e := range entries { - wg.Add(1) - go func(k key.Key) { - defer wg.Done() - - child, cancel := context.WithTimeout(ctx, providerRequestTimeout) - defer cancel() - providers := bs.network.FindProvidersAsync(child, k, maxProvidersPerRequest) - for prov := range providers { - go func(p peer.ID) { - bs.network.ConnectTo(ctx, p) - }(prov) - } - }(e.Key) - } - - wg.Wait() // make sure all our children do finish. -} - func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) { // This call records changes to wantlists, blocks received, // and number of bytes transfered. diff --git a/exchange/bitswap/bitswap_test.go b/exchange/bitswap/bitswap_test.go index a994019ff..d7fde792b 100644 --- a/exchange/bitswap/bitswap_test.go +++ b/exchange/bitswap/bitswap_test.go @@ -308,3 +308,57 @@ func TestBasicBitswap(t *testing.T) { } } } + +func TestDoubleGet(t *testing.T) { + net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) + sg := NewTestSessionGenerator(net) + defer sg.Close() + bg := blocksutil.NewBlockGenerator() + + t.Log("Test a one node trying to get one block from another") + + instances := sg.Instances(2) + blocks := bg.Blocks(1) + + ctx1, cancel1 := context.WithCancel(context.Background()) + + blkch1, err := instances[1].Exchange.GetBlocks(ctx1, []key.Key{blocks[0].Key()}) + if err != nil { + t.Fatal(err) + } + + ctx2, cancel2 := context.WithCancel(context.Background()) + defer cancel2() + + blkch2, err := instances[1].Exchange.GetBlocks(ctx2, []key.Key{blocks[0].Key()}) + if err != nil { + t.Fatal(err) + } + + // ensure both requests make it into the wantlist at the same time + time.Sleep(time.Millisecond * 100) + cancel1() + + _, ok := <-blkch1 + if ok { + t.Fatal("expected channel to be closed") + } + + err = instances[0].Exchange.HasBlock(blocks[0]) + if err != nil { + t.Fatal(err) + } + + blk, ok := <-blkch2 + if !ok { + t.Fatal("expected to get the block here") + } + t.Log(blk) + + for _, inst := range instances { + err := inst.Exchange.Close() + if err != nil { + t.Fatal(err) + } + } +} diff --git a/exchange/bitswap/decision/engine.go b/exchange/bitswap/decision/engine.go index 6d2577b72..8d738e306 100644 --- a/exchange/bitswap/decision/engine.go +++ b/exchange/bitswap/decision/engine.go @@ -83,7 +83,7 @@ type Engine struct { bs bstore.Blockstore - lock sync.RWMutex // protects the fields immediatly below + lock sync.Mutex // protects the fields immediatly below // ledgerMap lists Ledgers by their Partner key. ledgerMap map[peer.ID]*ledger } @@ -178,8 +178,8 @@ func (e *Engine) Outbox() <-chan (<-chan *Envelope) { // Returns a slice of Peers with whom the local node has active sessions func (e *Engine) Peers() []peer.ID { - e.lock.RLock() - defer e.lock.RUnlock() + e.lock.Lock() + defer e.lock.Unlock() response := make([]peer.ID, 0) for _, ledger := range e.ledgerMap { @@ -228,16 +228,32 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error { for _, block := range m.Blocks() { log.Debugf("got block %s %d bytes", block.Key(), len(block.Data)) l.ReceivedBytes(len(block.Data)) - for _, l := range e.ledgerMap { - if entry, ok := l.WantListContains(block.Key()); ok { - e.peerRequestQueue.Push(entry, l.Partner) - newWorkExists = true - } - } } return nil } +func (e *Engine) addBlock(block *blocks.Block) { + work := false + + for _, l := range e.ledgerMap { + if entry, ok := l.WantListContains(block.Key()); ok { + e.peerRequestQueue.Push(entry, l.Partner) + work = true + } + } + + if work { + e.signalNewWork() + } +} + +func (e *Engine) AddBlock(block *blocks.Block) { + e.lock.Lock() + defer e.lock.Unlock() + + e.addBlock(block) +} + // TODO add contents of m.WantList() to my local wantlist? NB: could introduce // race conditions where I send a message, but MessageSent gets handled after // MessageReceived. The information in the local wantlist could become diff --git a/exchange/bitswap/decision/ledger.go b/exchange/bitswap/decision/ledger.go index de133524e..95239de4e 100644 --- a/exchange/bitswap/decision/ledger.go +++ b/exchange/bitswap/decision/ledger.go @@ -67,7 +67,6 @@ func (l *ledger) ReceivedBytes(n int) { l.Accounting.BytesRecv += uint64(n) } -// TODO: this needs to be different. We need timeouts. func (l *ledger) Wants(k key.Key, priority int) { log.Debugf("peer %s wants %s", l.Partner, k) l.wantList.Add(k, priority) diff --git a/exchange/bitswap/wantlist/wantlist.go b/exchange/bitswap/wantlist/wantlist.go index a82b484a4..77b959a65 100644 --- a/exchange/bitswap/wantlist/wantlist.go +++ b/exchange/bitswap/wantlist/wantlist.go @@ -3,9 +3,12 @@ package wantlist import ( - key "github.com/ipfs/go-ipfs/blocks/key" "sort" "sync" + + key "github.com/ipfs/go-ipfs/blocks/key" + + "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context" ) type ThreadSafe struct { @@ -16,14 +19,15 @@ type ThreadSafe struct { // not threadsafe type Wantlist struct { set map[key.Key]Entry - // TODO provide O(1) len accessor if cost becomes an issue } type Entry struct { - // TODO consider making entries immutable so they can be shared safely and - // slices can be copied efficiently. Key key.Key Priority int + + Ctx context.Context + cancel func() + RefCnt int } type entrySlice []Entry @@ -45,21 +49,24 @@ func New() *Wantlist { } func (w *ThreadSafe) Add(k key.Key, priority int) { - // TODO rm defer for perf w.lk.Lock() defer w.lk.Unlock() w.Wantlist.Add(k, priority) } +func (w *ThreadSafe) AddEntry(e Entry) { + w.lk.Lock() + defer w.lk.Unlock() + w.Wantlist.AddEntry(e) +} + func (w *ThreadSafe) Remove(k key.Key) { - // TODO rm defer for perf w.lk.Lock() defer w.lk.Unlock() w.Wantlist.Remove(k) } func (w *ThreadSafe) Contains(k key.Key) (Entry, bool) { - // TODO rm defer for perf w.lk.RLock() defer w.lk.RUnlock() return w.Wantlist.Contains(k) @@ -88,17 +95,41 @@ func (w *Wantlist) Len() int { } func (w *Wantlist) Add(k key.Key, priority int) { - if _, ok := w.set[k]; ok { + if e, ok := w.set[k]; ok { + e.RefCnt++ return } + + ctx, cancel := context.WithCancel(context.Background()) w.set[k] = Entry{ Key: k, Priority: priority, + Ctx: ctx, + cancel: cancel, + RefCnt: 1, } } +func (w *Wantlist) AddEntry(e Entry) { + if _, ok := w.set[e.Key]; ok { + return + } + w.set[e.Key] = e +} + func (w *Wantlist) Remove(k key.Key) { - delete(w.set, k) + e, ok := w.set[k] + if !ok { + return + } + + e.RefCnt-- + if e.RefCnt <= 0 { + delete(w.set, k) + if e.cancel != nil { + e.cancel() + } + } } func (w *Wantlist) Contains(k key.Key) (Entry, bool) { diff --git a/exchange/bitswap/wantmanager.go b/exchange/bitswap/wantmanager.go index f80acbfae..be68b3faa 100644 --- a/exchange/bitswap/wantmanager.go +++ b/exchange/bitswap/wantmanager.go @@ -64,16 +64,16 @@ type msgQueue struct { done chan struct{} } -func (pm *WantManager) WantBlocks(ks []key.Key) { +func (pm *WantManager) WantBlocks(ctx context.Context, ks []key.Key) { log.Infof("want blocks: %s", ks) - pm.addEntries(ks, false) + pm.addEntries(ctx, ks, false) } func (pm *WantManager) CancelWants(ks []key.Key) { - pm.addEntries(ks, true) + pm.addEntries(context.TODO(), ks, true) } -func (pm *WantManager) addEntries(ks []key.Key, cancel bool) { +func (pm *WantManager) addEntries(ctx context.Context, ks []key.Key, cancel bool) { var entries []*bsmsg.Entry for i, k := range ks { entries = append(entries, &bsmsg.Entry{ @@ -81,6 +81,7 @@ func (pm *WantManager) addEntries(ks []key.Key, cancel bool) { Entry: wantlist.Entry{ Key: k, Priority: kMaxPriority - i, + Ctx: ctx, }, }) } @@ -224,7 +225,7 @@ func (pm *WantManager) Run() { if e.Cancel { pm.wl.Remove(e.Key) } else { - pm.wl.Add(e.Key, e.Priority) + pm.wl.AddEntry(e.Entry) } } @@ -237,6 +238,14 @@ func (pm *WantManager) Run() { // resend entire wantlist every so often (REALLY SHOULDNT BE NECESSARY) var es []*bsmsg.Entry for _, e := range pm.wl.Entries() { + select { + case <-e.Ctx.Done(): + // entry has been cancelled + // simply continue, the entry will be removed from the + // wantlist soon enough + continue + default: + } es = append(es, &bsmsg.Entry{Entry: e}) } for _, p := range pm.peers { diff --git a/exchange/bitswap/workers.go b/exchange/bitswap/workers.go index 46f5693f4..1bd9154f5 100644 --- a/exchange/bitswap/workers.go +++ b/exchange/bitswap/workers.go @@ -1,6 +1,7 @@ package bitswap import ( + "sync" "time" process "gx/ipfs/QmQopLATEYMNg7dVqZRNDfeE2S1yKy8zrRh5xnYiuqeZBn/goprocess" @@ -8,6 +9,8 @@ import ( context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context" key "github.com/ipfs/go-ipfs/blocks/key" + wantlist "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist" + peer "gx/ipfs/QmZwZjMVGss5rqYsJVGy18gNbkTJffFyq2x1uJ4e4p3ZAt/go-libp2p-peer" logging "gx/ipfs/Qmazh5oNUVsDZTs2g59rq8aYQqwpss8tcUWQzor5sCCEuH/go-log" ) @@ -16,7 +19,7 @@ var TaskWorkerCount = 8 func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) { // Start up a worker to handle block requests this node is making px.Go(func(px process.Process) { - bs.providerConnector(ctx) + bs.providerQueryManager(ctx) }) // Start up workers to handle requests from other nodes for the data on this node @@ -149,37 +152,6 @@ func (bs *Bitswap) provideCollector(ctx context.Context) { } } -// connects to providers for the given keys -func (bs *Bitswap) providerConnector(parent context.Context) { - defer log.Info("bitswap client worker shutting down...") - - for { - log.Event(parent, "Bitswap.ProviderConnector.Loop") - select { - case req := <-bs.findKeys: - keys := req.keys - if len(keys) == 0 { - log.Warning("Received batch request for zero blocks") - continue - } - log.Event(parent, "Bitswap.ProviderConnector.Work", logging.LoggableMap{"Keys": keys}) - - // NB: Optimization. Assumes that providers of key[0] are likely to - // be able to provide for all keys. This currently holds true in most - // every situation. Later, this assumption may not hold as true. - child, cancel := context.WithTimeout(req.ctx, providerRequestTimeout) - providers := bs.network.FindProvidersAsync(child, keys[0], maxProvidersPerRequest) - for p := range providers { - go bs.network.ConnectTo(req.ctx, p) - } - cancel() - - case <-parent.Done(): - return - } - } -} - func (bs *Bitswap) rebroadcastWorker(parent context.Context) { ctx, cancel := context.WithCancel(parent) defer cancel() @@ -200,12 +172,49 @@ func (bs *Bitswap) rebroadcastWorker(parent context.Context) { } case <-broadcastSignal.C: // resend unfulfilled wantlist keys log.Event(ctx, "Bitswap.Rebroadcast.active") - entries := bs.wm.wl.Entries() - if len(entries) > 0 { - bs.connectToProviders(ctx, entries) + for _, e := range bs.wm.wl.Entries() { + bs.findKeys <- &e } case <-parent.Done(): return } } } + +func (bs *Bitswap) providerQueryManager(ctx context.Context) { + var activeLk sync.Mutex + active := make(map[key.Key]*wantlist.Entry) + + for { + select { + case e := <-bs.findKeys: + activeLk.Lock() + if _, ok := active[e.Key]; ok { + activeLk.Unlock() + continue + } + active[e.Key] = e + activeLk.Unlock() + + go func(e *wantlist.Entry) { + child, cancel := context.WithTimeout(e.Ctx, providerRequestTimeout) + defer cancel() + providers := bs.network.FindProvidersAsync(child, e.Key, maxProvidersPerRequest) + for p := range providers { + go func(p peer.ID) { + err := bs.network.ConnectTo(child, p) + if err != nil { + log.Debug("failed to connect to provider %s: %s", p, err) + } + }(p) + } + activeLk.Lock() + delete(active, e.Key) + activeLk.Unlock() + }(e) + + case <-ctx.Done(): + return + } + } +}