mirror of
https://github.com/ipfs/kubo.git
synced 2025-07-02 03:28:25 +08:00
fix(bitswap) move mutex up to strategy from ledger
addresses concurrent access in bitswap session
This commit is contained in:
@ -1,7 +1,6 @@
|
||||
package strategy
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
peer "github.com/jbenet/go-ipfs/peer"
|
||||
@ -21,9 +20,8 @@ func newLedger(p peer.Peer, strategy strategyFunc) *ledger {
|
||||
}
|
||||
|
||||
// ledger stores the data exchange relationship between two peers.
|
||||
// NOT threadsafe
|
||||
type ledger struct {
|
||||
lock sync.RWMutex
|
||||
|
||||
// Partner is the remote Peer.
|
||||
Partner peer.Peer
|
||||
|
||||
@ -46,25 +44,16 @@ type ledger struct {
|
||||
}
|
||||
|
||||
func (l *ledger) ShouldSend() bool {
|
||||
l.lock.Lock()
|
||||
defer l.lock.Unlock()
|
||||
|
||||
return l.Strategy(l)
|
||||
}
|
||||
|
||||
func (l *ledger) SentBytes(n int) {
|
||||
l.lock.Lock()
|
||||
defer l.lock.Unlock()
|
||||
|
||||
l.exchangeCount++
|
||||
l.lastExchange = time.Now()
|
||||
l.Accounting.BytesSent += uint64(n)
|
||||
}
|
||||
|
||||
func (l *ledger) ReceivedBytes(n int) {
|
||||
l.lock.Lock()
|
||||
defer l.lock.Unlock()
|
||||
|
||||
l.exchangeCount++
|
||||
l.lastExchange = time.Now()
|
||||
l.Accounting.BytesRecv += uint64(n)
|
||||
@ -72,22 +61,14 @@ func (l *ledger) ReceivedBytes(n int) {
|
||||
|
||||
// TODO: this needs to be different. We need timeouts.
|
||||
func (l *ledger) Wants(k u.Key) {
|
||||
l.lock.Lock()
|
||||
defer l.lock.Unlock()
|
||||
|
||||
l.wantList[k] = struct{}{}
|
||||
}
|
||||
|
||||
func (l *ledger) WantListContains(k u.Key) bool {
|
||||
l.lock.RLock()
|
||||
defer l.lock.RUnlock()
|
||||
|
||||
_, ok := l.wantList[k]
|
||||
return ok
|
||||
}
|
||||
|
||||
func (l *ledger) ExchangeCount() uint64 {
|
||||
l.lock.RLock()
|
||||
defer l.lock.RUnlock()
|
||||
return l.exchangeCount
|
||||
}
|
||||
|
@ -1,23 +1 @@
|
||||
package strategy
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestRaceConditions(t *testing.T) {
|
||||
const numberOfExpectedExchanges = 10000
|
||||
l := new(ledger)
|
||||
var wg sync.WaitGroup
|
||||
for i := 0; i < numberOfExpectedExchanges; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
l.ReceivedBytes(1)
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
if l.ExchangeCount() != numberOfExpectedExchanges {
|
||||
t.Fail()
|
||||
}
|
||||
}
|
||||
|
@ -2,6 +2,7 @@ package strategy
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"sync"
|
||||
|
||||
bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
|
||||
"github.com/jbenet/go-ipfs/peer"
|
||||
@ -26,6 +27,7 @@ func New(nice bool) Strategy {
|
||||
}
|
||||
|
||||
type strategist struct {
|
||||
lock sync.RWMutex
|
||||
ledgerMap
|
||||
strategyFunc
|
||||
}
|
||||
@ -38,6 +40,9 @@ type peerKey u.Key
|
||||
|
||||
// Peers returns a list of peers
|
||||
func (s *strategist) Peers() []peer.Peer {
|
||||
s.lock.RLock()
|
||||
defer s.lock.RUnlock()
|
||||
|
||||
response := make([]peer.Peer, 0)
|
||||
for _, ledger := range s.ledgerMap {
|
||||
response = append(response, ledger.Partner)
|
||||
@ -46,20 +51,32 @@ func (s *strategist) Peers() []peer.Peer {
|
||||
}
|
||||
|
||||
func (s *strategist) BlockIsWantedByPeer(k u.Key, p peer.Peer) bool {
|
||||
s.lock.RLock()
|
||||
defer s.lock.RUnlock()
|
||||
|
||||
ledger := s.ledger(p)
|
||||
return ledger.WantListContains(k)
|
||||
}
|
||||
|
||||
func (s *strategist) ShouldSendBlockToPeer(k u.Key, p peer.Peer) bool {
|
||||
s.lock.RLock()
|
||||
defer s.lock.RUnlock()
|
||||
|
||||
ledger := s.ledger(p)
|
||||
return ledger.ShouldSend()
|
||||
}
|
||||
|
||||
func (s *strategist) Seed(int64) {
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
|
||||
// TODO
|
||||
}
|
||||
|
||||
func (s *strategist) MessageReceived(p peer.Peer, m bsmsg.BitSwapMessage) error {
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
|
||||
// TODO find a more elegant way to handle this check
|
||||
if p == nil {
|
||||
return errors.New("Strategy received nil peer")
|
||||
@ -85,6 +102,9 @@ func (s *strategist) MessageReceived(p peer.Peer, m bsmsg.BitSwapMessage) error
|
||||
// send happen atomically
|
||||
|
||||
func (s *strategist) MessageSent(p peer.Peer, m bsmsg.BitSwapMessage) error {
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
|
||||
l := s.ledger(p)
|
||||
for _, block := range m.Blocks() {
|
||||
l.SentBytes(len(block.Data))
|
||||
@ -96,10 +116,16 @@ func (s *strategist) MessageSent(p peer.Peer, m bsmsg.BitSwapMessage) error {
|
||||
}
|
||||
|
||||
func (s *strategist) NumBytesSentTo(p peer.Peer) uint64 {
|
||||
s.lock.RLock()
|
||||
defer s.lock.RUnlock()
|
||||
|
||||
return s.ledger(p).Accounting.BytesSent
|
||||
}
|
||||
|
||||
func (s *strategist) NumBytesReceivedFrom(p peer.Peer) uint64 {
|
||||
s.lock.RLock()
|
||||
defer s.lock.RUnlock()
|
||||
|
||||
return s.ledger(p).Accounting.BytesRecv
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user