From c848202c7d7c77a3e7ee5c396d3b748d311e8d00 Mon Sep 17 00:00:00 2001 From: Brian Tiger Chow Date: Fri, 24 Oct 2014 16:15:48 -0700 Subject: [PATCH] fix(bitswap) move mutex up to strategy from ledger addresses concurrent access in bitswap session --- exchange/bitswap/strategy/ledger.go | 21 +------------------ exchange/bitswap/strategy/ledger_test.go | 22 -------------------- exchange/bitswap/strategy/strategy.go | 26 ++++++++++++++++++++++++ 3 files changed, 27 insertions(+), 42 deletions(-) diff --git a/exchange/bitswap/strategy/ledger.go b/exchange/bitswap/strategy/ledger.go index 3700c1f43..9f33b1aba 100644 --- a/exchange/bitswap/strategy/ledger.go +++ b/exchange/bitswap/strategy/ledger.go @@ -1,7 +1,6 @@ package strategy import ( - "sync" "time" peer "github.com/jbenet/go-ipfs/peer" @@ -21,9 +20,8 @@ func newLedger(p peer.Peer, strategy strategyFunc) *ledger { } // ledger stores the data exchange relationship between two peers. +// NOT threadsafe type ledger struct { - lock sync.RWMutex - // Partner is the remote Peer. Partner peer.Peer @@ -46,25 +44,16 @@ type ledger struct { } func (l *ledger) ShouldSend() bool { - l.lock.Lock() - defer l.lock.Unlock() - return l.Strategy(l) } func (l *ledger) SentBytes(n int) { - l.lock.Lock() - defer l.lock.Unlock() - l.exchangeCount++ l.lastExchange = time.Now() l.Accounting.BytesSent += uint64(n) } func (l *ledger) ReceivedBytes(n int) { - l.lock.Lock() - defer l.lock.Unlock() - l.exchangeCount++ l.lastExchange = time.Now() l.Accounting.BytesRecv += uint64(n) @@ -72,22 +61,14 @@ func (l *ledger) ReceivedBytes(n int) { // TODO: this needs to be different. We need timeouts. func (l *ledger) Wants(k u.Key) { - l.lock.Lock() - defer l.lock.Unlock() - l.wantList[k] = struct{}{} } func (l *ledger) WantListContains(k u.Key) bool { - l.lock.RLock() - defer l.lock.RUnlock() - _, ok := l.wantList[k] return ok } func (l *ledger) ExchangeCount() uint64 { - l.lock.RLock() - defer l.lock.RUnlock() return l.exchangeCount } diff --git a/exchange/bitswap/strategy/ledger_test.go b/exchange/bitswap/strategy/ledger_test.go index 0fdfae0cc..4271d525c 100644 --- a/exchange/bitswap/strategy/ledger_test.go +++ b/exchange/bitswap/strategy/ledger_test.go @@ -1,23 +1 @@ package strategy - -import ( - "sync" - "testing" -) - -func TestRaceConditions(t *testing.T) { - const numberOfExpectedExchanges = 10000 - l := new(ledger) - var wg sync.WaitGroup - for i := 0; i < numberOfExpectedExchanges; i++ { - wg.Add(1) - go func() { - defer wg.Done() - l.ReceivedBytes(1) - }() - } - wg.Wait() - if l.ExchangeCount() != numberOfExpectedExchanges { - t.Fail() - } -} diff --git a/exchange/bitswap/strategy/strategy.go b/exchange/bitswap/strategy/strategy.go index 399d7777b..42cbe7773 100644 --- a/exchange/bitswap/strategy/strategy.go +++ b/exchange/bitswap/strategy/strategy.go @@ -2,6 +2,7 @@ package strategy import ( "errors" + "sync" bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message" "github.com/jbenet/go-ipfs/peer" @@ -26,6 +27,7 @@ func New(nice bool) Strategy { } type strategist struct { + lock sync.RWMutex ledgerMap strategyFunc } @@ -38,6 +40,9 @@ type peerKey u.Key // Peers returns a list of peers func (s *strategist) Peers() []peer.Peer { + s.lock.RLock() + defer s.lock.RUnlock() + response := make([]peer.Peer, 0) for _, ledger := range s.ledgerMap { response = append(response, ledger.Partner) @@ -46,20 +51,32 @@ func (s *strategist) Peers() []peer.Peer { } func (s *strategist) BlockIsWantedByPeer(k u.Key, p peer.Peer) bool { + s.lock.RLock() + defer s.lock.RUnlock() + ledger := s.ledger(p) return ledger.WantListContains(k) } func (s *strategist) ShouldSendBlockToPeer(k u.Key, p peer.Peer) bool { + s.lock.RLock() + defer s.lock.RUnlock() + ledger := s.ledger(p) return ledger.ShouldSend() } func (s *strategist) Seed(int64) { + s.lock.Lock() + defer s.lock.Unlock() + // TODO } func (s *strategist) MessageReceived(p peer.Peer, m bsmsg.BitSwapMessage) error { + s.lock.Lock() + defer s.lock.Unlock() + // TODO find a more elegant way to handle this check if p == nil { return errors.New("Strategy received nil peer") @@ -85,6 +102,9 @@ func (s *strategist) MessageReceived(p peer.Peer, m bsmsg.BitSwapMessage) error // send happen atomically func (s *strategist) MessageSent(p peer.Peer, m bsmsg.BitSwapMessage) error { + s.lock.Lock() + defer s.lock.Unlock() + l := s.ledger(p) for _, block := range m.Blocks() { l.SentBytes(len(block.Data)) @@ -96,10 +116,16 @@ func (s *strategist) MessageSent(p peer.Peer, m bsmsg.BitSwapMessage) error { } func (s *strategist) NumBytesSentTo(p peer.Peer) uint64 { + s.lock.RLock() + defer s.lock.RUnlock() + return s.ledger(p).Accounting.BytesSent } func (s *strategist) NumBytesReceivedFrom(p peer.Peer) uint64 { + s.lock.RLock() + defer s.lock.RUnlock() + return s.ledger(p).Accounting.BytesRecv }