mirror of
https://github.com/ipfs/kubo.git
synced 2025-07-01 19:24:14 +08:00
Merge pull request #198 from jbenet/fix/bitswap-races
fix(bitswap) data races
This commit is contained in:
@ -16,14 +16,14 @@ type Blockstore interface {
|
||||
Put(*blocks.Block) error
|
||||
}
|
||||
|
||||
func NewBlockstore(d ds.Datastore) Blockstore {
|
||||
func NewBlockstore(d ds.ThreadSafeDatastore) Blockstore {
|
||||
return &blockstore{
|
||||
datastore: d,
|
||||
}
|
||||
}
|
||||
|
||||
type blockstore struct {
|
||||
datastore ds.Datastore
|
||||
datastore ds.ThreadSafeDatastore
|
||||
}
|
||||
|
||||
func (bs *blockstore) Get(k u.Key) (*blocks.Block, error) {
|
||||
|
@ -5,6 +5,7 @@ import (
|
||||
"testing"
|
||||
|
||||
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
|
||||
ds_sync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
|
||||
blocks "github.com/jbenet/go-ipfs/blocks"
|
||||
u "github.com/jbenet/go-ipfs/util"
|
||||
)
|
||||
@ -12,7 +13,7 @@ import (
|
||||
// TODO(brian): TestGetReturnsNil
|
||||
|
||||
func TestGetWhenKeyNotPresent(t *testing.T) {
|
||||
bs := NewBlockstore(ds.NewMapDatastore())
|
||||
bs := NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore()))
|
||||
_, err := bs.Get(u.Key("not present"))
|
||||
|
||||
if err != nil {
|
||||
@ -23,7 +24,7 @@ func TestGetWhenKeyNotPresent(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestPutThenGetBlock(t *testing.T) {
|
||||
bs := NewBlockstore(ds.NewMapDatastore())
|
||||
bs := NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore()))
|
||||
block := blocks.NewBlock([]byte("some data"))
|
||||
|
||||
err := bs.Put(block)
|
||||
@ -46,7 +47,7 @@ func TestValueTypeMismatch(t *testing.T) {
|
||||
datastore := ds.NewMapDatastore()
|
||||
datastore.Put(block.Key().DsKey(), "data that isn't a block!")
|
||||
|
||||
blockstore := NewBlockstore(datastore)
|
||||
blockstore := NewBlockstore(ds_sync.MutexWrap(datastore))
|
||||
|
||||
_, err := blockstore.Get(block.Key())
|
||||
if err != ValueTypeMismatch {
|
||||
|
@ -22,7 +22,7 @@ var log = u.Logger("bitswap")
|
||||
// provided NetMessage service
|
||||
func NetMessageSession(parent context.Context, p peer.Peer,
|
||||
net inet.Network, srv inet.Service, directory bsnet.Routing,
|
||||
d ds.Datastore, nice bool) exchange.Interface {
|
||||
d ds.ThreadSafeDatastore, nice bool) exchange.Interface {
|
||||
|
||||
networkAdapter := bsnet.NetMessageAdapter(srv, net, nil)
|
||||
bs := &bitswap{
|
||||
|
@ -9,6 +9,7 @@ import (
|
||||
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
||||
|
||||
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
|
||||
ds_sync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
|
||||
blocks "github.com/jbenet/go-ipfs/blocks"
|
||||
bstore "github.com/jbenet/go-ipfs/blockstore"
|
||||
exchange "github.com/jbenet/go-ipfs/exchange"
|
||||
@ -279,7 +280,7 @@ func session(net tn.Network, rs mock.RoutingServer, id peer.ID) instance {
|
||||
adapter := net.Adapter(p)
|
||||
htc := rs.Client(p)
|
||||
|
||||
blockstore := bstore.NewBlockstore(ds.NewMapDatastore())
|
||||
blockstore := bstore.NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore()))
|
||||
const alwaysSendToPeer = true
|
||||
bs := &bitswap{
|
||||
blockstore: blockstore,
|
||||
|
@ -1,7 +1,6 @@
|
||||
package strategy
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
peer "github.com/jbenet/go-ipfs/peer"
|
||||
@ -21,9 +20,8 @@ func newLedger(p peer.Peer, strategy strategyFunc) *ledger {
|
||||
}
|
||||
|
||||
// ledger stores the data exchange relationship between two peers.
|
||||
// NOT threadsafe
|
||||
type ledger struct {
|
||||
lock sync.RWMutex
|
||||
|
||||
// Partner is the remote Peer.
|
||||
Partner peer.Peer
|
||||
|
||||
@ -46,25 +44,16 @@ type ledger struct {
|
||||
}
|
||||
|
||||
func (l *ledger) ShouldSend() bool {
|
||||
l.lock.Lock()
|
||||
defer l.lock.Unlock()
|
||||
|
||||
return l.Strategy(l)
|
||||
}
|
||||
|
||||
func (l *ledger) SentBytes(n int) {
|
||||
l.lock.Lock()
|
||||
defer l.lock.Unlock()
|
||||
|
||||
l.exchangeCount++
|
||||
l.lastExchange = time.Now()
|
||||
l.Accounting.BytesSent += uint64(n)
|
||||
}
|
||||
|
||||
func (l *ledger) ReceivedBytes(n int) {
|
||||
l.lock.Lock()
|
||||
defer l.lock.Unlock()
|
||||
|
||||
l.exchangeCount++
|
||||
l.lastExchange = time.Now()
|
||||
l.Accounting.BytesRecv += uint64(n)
|
||||
@ -72,22 +61,14 @@ func (l *ledger) ReceivedBytes(n int) {
|
||||
|
||||
// TODO: this needs to be different. We need timeouts.
|
||||
func (l *ledger) Wants(k u.Key) {
|
||||
l.lock.Lock()
|
||||
defer l.lock.Unlock()
|
||||
|
||||
l.wantList[k] = struct{}{}
|
||||
}
|
||||
|
||||
func (l *ledger) WantListContains(k u.Key) bool {
|
||||
l.lock.RLock()
|
||||
defer l.lock.RUnlock()
|
||||
|
||||
_, ok := l.wantList[k]
|
||||
return ok
|
||||
}
|
||||
|
||||
func (l *ledger) ExchangeCount() uint64 {
|
||||
l.lock.RLock()
|
||||
defer l.lock.RUnlock()
|
||||
return l.exchangeCount
|
||||
}
|
||||
|
@ -1,23 +1 @@
|
||||
package strategy
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestRaceConditions(t *testing.T) {
|
||||
const numberOfExpectedExchanges = 10000
|
||||
l := new(ledger)
|
||||
var wg sync.WaitGroup
|
||||
for i := 0; i < numberOfExpectedExchanges; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
l.ReceivedBytes(1)
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
if l.ExchangeCount() != numberOfExpectedExchanges {
|
||||
t.Fail()
|
||||
}
|
||||
}
|
||||
|
@ -2,6 +2,7 @@ package strategy
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"sync"
|
||||
|
||||
bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
|
||||
"github.com/jbenet/go-ipfs/peer"
|
||||
@ -26,6 +27,7 @@ func New(nice bool) Strategy {
|
||||
}
|
||||
|
||||
type strategist struct {
|
||||
lock sync.RWMutex
|
||||
ledgerMap
|
||||
strategyFunc
|
||||
}
|
||||
@ -38,6 +40,9 @@ type peerKey u.Key
|
||||
|
||||
// Peers returns a list of peers
|
||||
func (s *strategist) Peers() []peer.Peer {
|
||||
s.lock.RLock()
|
||||
defer s.lock.RUnlock()
|
||||
|
||||
response := make([]peer.Peer, 0)
|
||||
for _, ledger := range s.ledgerMap {
|
||||
response = append(response, ledger.Partner)
|
||||
@ -46,20 +51,32 @@ func (s *strategist) Peers() []peer.Peer {
|
||||
}
|
||||
|
||||
func (s *strategist) BlockIsWantedByPeer(k u.Key, p peer.Peer) bool {
|
||||
s.lock.RLock()
|
||||
defer s.lock.RUnlock()
|
||||
|
||||
ledger := s.ledger(p)
|
||||
return ledger.WantListContains(k)
|
||||
}
|
||||
|
||||
func (s *strategist) ShouldSendBlockToPeer(k u.Key, p peer.Peer) bool {
|
||||
s.lock.RLock()
|
||||
defer s.lock.RUnlock()
|
||||
|
||||
ledger := s.ledger(p)
|
||||
return ledger.ShouldSend()
|
||||
}
|
||||
|
||||
func (s *strategist) Seed(int64) {
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
|
||||
// TODO
|
||||
}
|
||||
|
||||
func (s *strategist) MessageReceived(p peer.Peer, m bsmsg.BitSwapMessage) error {
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
|
||||
// TODO find a more elegant way to handle this check
|
||||
if p == nil {
|
||||
return errors.New("Strategy received nil peer")
|
||||
@ -85,6 +102,9 @@ func (s *strategist) MessageReceived(p peer.Peer, m bsmsg.BitSwapMessage) error
|
||||
// send happen atomically
|
||||
|
||||
func (s *strategist) MessageSent(p peer.Peer, m bsmsg.BitSwapMessage) error {
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
|
||||
l := s.ledger(p)
|
||||
for _, block := range m.Blocks() {
|
||||
l.SentBytes(len(block.Data))
|
||||
@ -96,10 +116,16 @@ func (s *strategist) MessageSent(p peer.Peer, m bsmsg.BitSwapMessage) error {
|
||||
}
|
||||
|
||||
func (s *strategist) NumBytesSentTo(p peer.Peer) uint64 {
|
||||
s.lock.RLock()
|
||||
defer s.lock.RUnlock()
|
||||
|
||||
return s.ledger(p).Accounting.BytesSent
|
||||
}
|
||||
|
||||
func (s *strategist) NumBytesReceivedFrom(p peer.Peer) uint64 {
|
||||
s.lock.RLock()
|
||||
defer s.lock.RUnlock()
|
||||
|
||||
return s.ledger(p).Accounting.BytesRecv
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user