From 01d1b69da2b6972eba2e7d6cf59c1bd7ccd05fc9 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Wed, 27 Apr 2016 20:45:06 -0700 Subject: [PATCH] fix doubleGet issue caused by hasblock not announcing License: MIT Signed-off-by: Jeromy --- exchange/bitswap/bitswap.go | 2 ++ exchange/bitswap/bitswap_test.go | 2 ++ exchange/bitswap/decision/engine.go | 36 +++++++++++++++++++-------- exchange/bitswap/decision/ledger.go | 7 ++---- exchange/bitswap/wantlist/wantlist.go | 33 ++++++++++++++++++------ 5 files changed, 57 insertions(+), 23 deletions(-) diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index bf509fc55..c34dbc89b 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -264,6 +264,8 @@ func (bs *Bitswap) HasBlock(blk *blocks.Block) error { bs.notifications.Publish(blk) + bs.engine.AddBlock(blk) + select { case bs.newBlocks <- blk: // send block off to be reprovided diff --git a/exchange/bitswap/bitswap_test.go b/exchange/bitswap/bitswap_test.go index 0df1f9b2c..aa367edb1 100644 --- a/exchange/bitswap/bitswap_test.go +++ b/exchange/bitswap/bitswap_test.go @@ -335,6 +335,8 @@ func TestDoubleGet(t *testing.T) { t.Fatal(err) } + // ensure both requests make it into the wantlist at the same time + time.Sleep(time.Millisecond * 100) cancel1() _, ok := <-blkch1 diff --git a/exchange/bitswap/decision/engine.go b/exchange/bitswap/decision/engine.go index 6a026858f..8d738e306 100644 --- a/exchange/bitswap/decision/engine.go +++ b/exchange/bitswap/decision/engine.go @@ -83,7 +83,7 @@ type Engine struct { bs bstore.Blockstore - lock sync.RWMutex // protects the fields immediatly below + lock sync.Mutex // protects the fields immediatly below // ledgerMap lists Ledgers by their Partner key. ledgerMap map[peer.ID]*ledger } @@ -178,8 +178,8 @@ func (e *Engine) Outbox() <-chan (<-chan *Envelope) { // Returns a slice of Peers with whom the local node has active sessions func (e *Engine) Peers() []peer.ID { - e.lock.RLock() - defer e.lock.RUnlock() + e.lock.Lock() + defer e.lock.Unlock() response := make([]peer.ID, 0) for _, ledger := range e.ledgerMap { @@ -217,7 +217,7 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error { e.peerRequestQueue.Remove(entry.Key, p) } else { log.Debugf("wants %s - %d", entry.Key, entry.Priority) - l.Wants(entry.Ctx, entry.Key, entry.Priority) + l.Wants(entry.Key, entry.Priority) if exists, err := e.bs.Has(entry.Key); err == nil && exists { e.peerRequestQueue.Push(entry.Entry, p) newWorkExists = true @@ -228,16 +228,32 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error { for _, block := range m.Blocks() { log.Debugf("got block %s %d bytes", block.Key(), len(block.Data)) l.ReceivedBytes(len(block.Data)) - for _, l := range e.ledgerMap { - if entry, ok := l.WantListContains(block.Key()); ok { - e.peerRequestQueue.Push(entry, l.Partner) - newWorkExists = true - } - } } return nil } +func (e *Engine) addBlock(block *blocks.Block) { + work := false + + for _, l := range e.ledgerMap { + if entry, ok := l.WantListContains(block.Key()); ok { + e.peerRequestQueue.Push(entry, l.Partner) + work = true + } + } + + if work { + e.signalNewWork() + } +} + +func (e *Engine) AddBlock(block *blocks.Block) { + e.lock.Lock() + defer e.lock.Unlock() + + e.addBlock(block) +} + // TODO add contents of m.WantList() to my local wantlist? NB: could introduce // race conditions where I send a message, but MessageSent gets handled after // MessageReceived. The information in the local wantlist could become diff --git a/exchange/bitswap/decision/ledger.go b/exchange/bitswap/decision/ledger.go index 7b8982e47..95239de4e 100644 --- a/exchange/bitswap/decision/ledger.go +++ b/exchange/bitswap/decision/ledger.go @@ -6,8 +6,6 @@ import ( key "github.com/ipfs/go-ipfs/blocks/key" wl "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist" peer "gx/ipfs/QmZwZjMVGss5rqYsJVGy18gNbkTJffFyq2x1uJ4e4p3ZAt/go-libp2p-peer" - - "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context" ) // keySet is just a convenient alias for maps of keys, where we only care @@ -69,10 +67,9 @@ func (l *ledger) ReceivedBytes(n int) { l.Accounting.BytesRecv += uint64(n) } -// TODO: this needs to be different. We need timeouts. -func (l *ledger) Wants(ctx context.Context, k key.Key, priority int) { +func (l *ledger) Wants(k key.Key, priority int) { log.Debugf("peer %s wants %s", l.Partner, k) - l.wantList.Add(ctx, k, priority) + l.wantList.Add(k, priority) } func (l *ledger) CancelWant(k key.Key) { diff --git a/exchange/bitswap/wantlist/wantlist.go b/exchange/bitswap/wantlist/wantlist.go index 545b98f7c..77b959a65 100644 --- a/exchange/bitswap/wantlist/wantlist.go +++ b/exchange/bitswap/wantlist/wantlist.go @@ -22,11 +22,12 @@ type Wantlist struct { } type Entry struct { - // TODO consider making entries immutable so they can be shared safely and - // slices can be copied efficiently. Key key.Key Priority int - Ctx context.Context + + Ctx context.Context + cancel func() + RefCnt int } type entrySlice []Entry @@ -47,10 +48,10 @@ func New() *Wantlist { } } -func (w *ThreadSafe) Add(ctx context.Context, k key.Key, priority int) { +func (w *ThreadSafe) Add(k key.Key, priority int) { w.lk.Lock() defer w.lk.Unlock() - w.Wantlist.Add(ctx, k, priority) + w.Wantlist.Add(k, priority) } func (w *ThreadSafe) AddEntry(e Entry) { @@ -93,14 +94,19 @@ func (w *Wantlist) Len() int { return len(w.set) } -func (w *Wantlist) Add(ctx context.Context, k key.Key, priority int) { - if _, ok := w.set[k]; ok { +func (w *Wantlist) Add(k key.Key, priority int) { + if e, ok := w.set[k]; ok { + e.RefCnt++ return } + + ctx, cancel := context.WithCancel(context.Background()) w.set[k] = Entry{ Key: k, Priority: priority, Ctx: ctx, + cancel: cancel, + RefCnt: 1, } } @@ -112,7 +118,18 @@ func (w *Wantlist) AddEntry(e Entry) { } func (w *Wantlist) Remove(k key.Key) { - delete(w.set, k) + e, ok := w.set[k] + if !ok { + return + } + + e.RefCnt-- + if e.RefCnt <= 0 { + delete(w.set, k) + if e.cancel != nil { + e.cancel() + } + } } func (w *Wantlist) Contains(k key.Key) (Entry, bool) {