mirror of https://github.com/ipfs/kubo.git
@@ -6,14 +6,16 @@ import (
 	"errors"
 
 	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
 
 	mh "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multihash"
 
 	blocks "github.com/jbenet/go-ipfs/blocks"
 	u "github.com/jbenet/go-ipfs/util"
 )
 
 var ValueTypeMismatch = errors.New("The retrieved value is not a Block")
 
+var ErrNotFound = errors.New("blockstore: block not found")
+
 // Blockstore wraps a ThreadSafeDatastore
 type Blockstore interface {
 	DeleteBlock(u.Key) error
@@ -34,6 +36,9 @@ type blockstore struct {
 
 func (bs *blockstore) Get(k u.Key) (*blocks.Block, error) {
 	maybeData, err := bs.datastore.Get(k.DsKey())
+	if err == ds.ErrNotFound {
+		return nil, ErrNotFound
+	}
 	if err != nil {
 		return nil, err
 	}
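As an aside, a standalone sketch of the caller-side check this sentinel enables; the names errNotFound and get below are stand-ins invented for the example and are not part of this change:

package main

import (
	"errors"
	"fmt"
)

// errNotFound plays the role of blockstore.ErrNotFound from the hunk above.
var errNotFound = errors.New("blockstore: block not found")

// get stands in for (*blockstore).Get: it returns the sentinel when the
// block is simply absent, and other errors for real failures.
func get(have bool) ([]byte, error) {
	if !have {
		return nil, errNotFound
	}
	return []byte("block data"), nil
}

func main() {
	_, err := get(false)
	switch {
	case err == errNotFound:
		fmt.Println("not stored locally; a caller can fall back to the exchange")
	case err != nil:
		fmt.Println("real datastore failure:", err)
	}
}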
@@ -8,8 +8,6 @@ import (
 	"fmt"
 
 	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
-	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
-
 	blocks "github.com/jbenet/go-ipfs/blocks"
 	"github.com/jbenet/go-ipfs/blocks/blockstore"
 	exchange "github.com/jbenet/go-ipfs/exchange"
@@ -67,7 +65,7 @@ func (s *BlockService) GetBlock(ctx context.Context, k u.Key) (*blocks.Block, er
 		return block, nil
 	// TODO be careful checking ErrNotFound. If the underlying
 	// implementation changes, this will break.
-	} else if err == ds.ErrNotFound && s.Exchange != nil {
+	} else if err == blockstore.ErrNotFound && s.Exchange != nil {
 		log.Debug("Blockservice: Searching bitswap.")
 		blk, err := s.Exchange.GetBlock(ctx, k)
 		if err != nil {
@@ -26,10 +26,6 @@ import (
 const kSeed = 1
 
 func Test100MBInstantaneous(t *testing.T) {
-	t.Log("a sanity check")
-
-	t.Parallel()
-
 	conf := Config{
 		NetworkLatency: 0,
 		RoutingLatency: 0,
@@ -41,10 +37,7 @@ func Test100MBInstantaneous(t *testing.T) {
 
 func TestDegenerateSlowBlockstore(t *testing.T) {
 	SkipUnlessEpic(t)
-	t.Parallel()
-
 	conf := Config{BlockstoreLatency: 50 * time.Millisecond}
-
 	if err := AddCatPowers(conf, 128); err != nil {
 		t.Fatal(err)
 	}
@@ -52,10 +45,7 @@ func TestDegenerateSlowBlockstore(t *testing.T) {
 
 func TestDegenerateSlowNetwork(t *testing.T) {
 	SkipUnlessEpic(t)
-	t.Parallel()
-
 	conf := Config{NetworkLatency: 400 * time.Millisecond}
-
 	if err := AddCatPowers(conf, 128); err != nil {
 		t.Fatal(err)
 	}
@@ -63,10 +53,7 @@ func TestDegenerateSlowNetwork(t *testing.T) {
 
 func TestDegenerateSlowRouting(t *testing.T) {
 	SkipUnlessEpic(t)
-	t.Parallel()
-
 	conf := Config{RoutingLatency: 400 * time.Millisecond}
-
 	if err := AddCatPowers(conf, 128); err != nil {
 		t.Fatal(err)
 	}
@@ -74,10 +61,7 @@ func TestDegenerateSlowRouting(t *testing.T) {
 
 func Test100MBMacbookCoastToCoast(t *testing.T) {
 	SkipUnlessEpic(t)
-	t.Parallel()
-
 	conf := Config{}.Network_NYtoSF().Blockstore_SlowSSD2014().Routing_Slow()
-
 	if err := AddCatBytes(RandomBytes(100*1024*1024), conf); err != nil {
 		t.Fatal(err)
 	}
@@ -3,36 +3,46 @@
 package bitswap
 
 import (
+	"math"
 	"sync"
 	"time"
 
 	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
 
 	blocks "github.com/jbenet/go-ipfs/blocks"
 	blockstore "github.com/jbenet/go-ipfs/blocks/blockstore"
 	exchange "github.com/jbenet/go-ipfs/exchange"
+	decision "github.com/jbenet/go-ipfs/exchange/bitswap/decision"
 	bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
 	bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network"
 	notifications "github.com/jbenet/go-ipfs/exchange/bitswap/notifications"
-	strategy "github.com/jbenet/go-ipfs/exchange/bitswap/strategy"
+	wantlist "github.com/jbenet/go-ipfs/exchange/bitswap/wantlist"
 	peer "github.com/jbenet/go-ipfs/peer"
 	u "github.com/jbenet/go-ipfs/util"
 	eventlog "github.com/jbenet/go-ipfs/util/eventlog"
+	pset "github.com/jbenet/go-ipfs/util/peerset"
 )
 
 var log = eventlog.Logger("bitswap")
 
-// Number of providers to request for sending a wantlist to
-// TODO: if a 'non-nice' strategy is implemented, consider increasing this value
-const maxProvidersPerRequest = 3
+const (
+	// Number of providers to request for sending a wantlist to
+	// TODO: if a 'non-nice' strategy is implemented, consider increasing this value
+	maxProvidersPerRequest = 3
+	providerRequestTimeout = time.Second * 10
+	hasBlockTimeout        = time.Second * 15
+	sizeBatchRequestChan   = 32
+	// kMaxPriority is the max priority as defined by the bitswap protocol
+	kMaxPriority = math.MaxInt32
+)
 
-const providerRequestTimeout = time.Second * 10
-const hasBlockTimeout = time.Second * 15
+var (
+	rebroadcastDelay = time.Second * 10
+)
 
-// New initializes a BitSwap instance that communicates over the
-// provided BitSwapNetwork. This function registers the returned instance as
-// the network delegate.
-// Runs until context is cancelled
+// New initializes a BitSwap instance that communicates over the provided
+// BitSwapNetwork. This function registers the returned instance as the network
+// delegate.
+// Runs until context is cancelled.
 func New(parent context.Context, p peer.Peer, network bsnet.BitSwapNetwork, routing bsnet.Routing,
 	bstore blockstore.Blockstore, nice bool) exchange.Interface {
@@ -41,6 +51,7 @@ func New(parent context.Context, p peer.Peer, network bsnet.BitSwapNetwork, rout
 	notif := notifications.New()
 	go func() {
 		<-ctx.Done()
+		cancelFunc()
 		notif.Shutdown()
 	}()
 
@@ -48,14 +59,15 @@ func New(parent context.Context, p peer.Peer, network bsnet.BitSwapNetwork, rout
 		blockstore:    bstore,
 		cancelFunc:    cancelFunc,
 		notifications: notif,
-		strategy:      strategy.New(nice),
+		engine:        decision.NewEngine(ctx, bstore),
 		routing:       routing,
 		sender:        network,
-		wantlist:      u.NewKeySet(),
-		batchRequests: make(chan []u.Key, 32),
+		wantlist:      wantlist.NewThreadSafe(),
+		batchRequests: make(chan []u.Key, sizeBatchRequestChan),
 	}
 	network.SetDelegate(bs)
-	go bs.loop(ctx)
+	go bs.clientWorker(ctx)
+	go bs.taskWorker(ctx)
 
 	return bs
 }
@@ -80,12 +92,9 @@ type bitswap struct {
 	// have more than a single block in the set
 	batchRequests chan []u.Key
 
-	// strategy listens to network traffic and makes decisions about how to
-	// interact with partners.
-	// TODO(brian): save the strategy's state to the datastore
-	strategy strategy.Strategy
+	engine *decision.Engine
 
-	wantlist u.KeySet
+	wantlist *wantlist.ThreadSafe
 
 	// cancelFunc signals cancellation to the bitswap event loop
 	cancelFunc func()
@@ -153,12 +162,7 @@ func (bs *bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error {
 	}
 	bs.wantlist.Remove(blk.Key())
 	bs.notifications.Publish(blk)
-	child, _ := context.WithTimeout(ctx, hasBlockTimeout)
-	if err := bs.sendToPeersThatWant(child, blk); err != nil {
-		return err
-	}
-	child, _ = context.WithTimeout(ctx, hasBlockTimeout)
-	return bs.routing.Provide(child, blk.Key())
+	return bs.routing.Provide(ctx, blk.Key())
 }
 
 func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.Peer) error {
@@ -166,13 +170,15 @@ func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.Peer) e
 		panic("Cant send wantlist to nil peerchan")
 	}
 	message := bsmsg.New()
-	for _, wanted := range bs.wantlist.Keys() {
-		message.AddWanted(wanted)
+	for _, wanted := range bs.wantlist.Entries() {
+		message.AddEntry(wanted.Key, wanted.Priority)
 	}
+	wg := sync.WaitGroup{}
 	for peerToQuery := range peers {
-		log.Debug("sending query to: %s", peerToQuery)
 		log.Event(ctx, "PeerToQuery", peerToQuery)
+		wg.Add(1)
 		go func(p peer.Peer) {
+			defer wg.Done()
+
 			log.Event(ctx, "DialPeer", p)
 			err := bs.sender.DialPeer(ctx, p)
@@ -189,57 +195,76 @@ func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.Peer) e
 			// FIXME ensure accounting is handled correctly when
 			// communication fails. May require slightly different API to
 			// get better guarantees. May need shared sequence numbers.
-			bs.strategy.MessageSent(p, message)
+			bs.engine.MessageSent(p, message)
 		}(peerToQuery)
 	}
+	wg.Wait()
 	return nil
 }
 
-func (bs *bitswap) sendWantlistToProviders(ctx context.Context, ks []u.Key) {
+func (bs *bitswap) sendWantlistToProviders(ctx context.Context, wantlist *wantlist.ThreadSafe) {
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	message := bsmsg.New()
+	message.SetFull(true)
+	for _, e := range bs.wantlist.Entries() {
+		message.AddEntry(e.Key, e.Priority)
+	}
+
+	ps := pset.New()
+
+	// Get providers for all entries in wantlist (could take a while)
 	wg := sync.WaitGroup{}
-	for _, k := range ks {
+	for _, e := range wantlist.Entries() {
 		wg.Add(1)
 		go func(k u.Key) {
+			defer wg.Done()
 			child, _ := context.WithTimeout(ctx, providerRequestTimeout)
 			providers := bs.routing.FindProvidersAsync(child, k, maxProvidersPerRequest)
-
-			err := bs.sendWantListTo(ctx, providers)
-			if err != nil {
-				log.Errorf("error sending wantlist: %s", err)
+			for prov := range providers {
+				if ps.TryAdd(prov) { //Do once per peer
+					bs.send(ctx, prov, message)
+				}
 			}
-			wg.Done()
-		}(k)
+		}(e.Key)
 	}
 	wg.Wait()
 }
 
+func (bs *bitswap) taskWorker(ctx context.Context) {
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case envelope := <-bs.engine.Outbox():
+			bs.send(ctx, envelope.Peer, envelope.Message)
+		}
+	}
+}
+
 // TODO ensure only one active request per key
-func (bs *bitswap) loop(parent context.Context) {
+func (bs *bitswap) clientWorker(parent context.Context) {
 
 	ctx, cancel := context.WithCancel(parent)
 
-	broadcastSignal := time.NewTicker(bs.strategy.GetRebroadcastDelay())
-	defer func() {
-		cancel() // signal to derived async functions
-		broadcastSignal.Stop()
-	}()
+	broadcastSignal := time.After(rebroadcastDelay)
+	defer cancel()
 
 	for {
 		select {
-		case <-broadcastSignal.C:
+		case <-broadcastSignal:
 			// Resend unfulfilled wantlist keys
-			bs.sendWantlistToProviders(ctx, bs.wantlist.Keys())
+			bs.sendWantlistToProviders(ctx, bs.wantlist)
+			broadcastSignal = time.After(rebroadcastDelay)
 		case ks := <-bs.batchRequests:
-			// TODO: implement batching on len(ks) > X for some X
-			// i.e. if given 20 keys, fetch first five, then next
-			// five, and so on, so we are more likely to be able to
-			// effectively stream the data
 			if len(ks) == 0 {
 				log.Warning("Received batch request for zero blocks")
 				continue
 			}
-			for _, k := range ks {
-				bs.wantlist.Add(k)
+			for i, k := range ks {
+				bs.wantlist.Add(k, kMaxPriority-i)
 			}
 			// NB: send want list to providers for the first peer in this list.
 			// the assumption is made that the providers of the first key in
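A standalone sketch of the rebroadcast timing pattern used by clientWorker in the hunk above: instead of a fixed Ticker, the signal channel is re-armed with time.After on every fire, so the package-level delay (which the tests below shorten) takes effect on the next cycle. The durations here are made up for the example:

package main

import (
	"fmt"
	"time"
)

func main() {
	rebroadcastDelay := 100 * time.Millisecond // a package-level variable in the real code
	broadcastSignal := time.After(rebroadcastDelay)
	stop := time.After(350 * time.Millisecond)
	for {
		select {
		case <-broadcastSignal:
			fmt.Println("resend unfulfilled wantlist to providers")
			broadcastSignal = time.After(rebroadcastDelay) // re-arm for the next cycle
		case <-stop:
			return
		}
	}
}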
@@ -277,45 +302,45 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsm
 		return nil, nil
 	}
 
-	// Record message bytes in ledger
-	// TODO: this is bad, and could be easily abused.
-	// Should only track *useful* messages in ledger
 	// This call records changes to wantlists, blocks received,
 	// and number of bytes transfered.
-	bs.strategy.MessageReceived(p, incoming)
+	bs.engine.MessageReceived(p, incoming)
+	// TODO: this is bad, and could be easily abused.
+	// Should only track *useful* messages in ledger
 
 	for _, block := range incoming.Blocks() {
-		if err := bs.HasBlock(ctx, block); err != nil {
+		hasBlockCtx, _ := context.WithTimeout(ctx, hasBlockTimeout)
+		if err := bs.HasBlock(hasBlockCtx, block); err != nil {
 			log.Error(err)
 		}
 	}
 
-	for _, key := range incoming.Wantlist() {
-		if bs.strategy.ShouldSendBlockToPeer(key, p) {
-			if block, errBlockNotFound := bs.blockstore.Get(key); errBlockNotFound != nil {
-				continue
-			} else {
-				// Create a separate message to send this block in
-				blkmsg := bsmsg.New()
-
-				// TODO: only send this the first time
-				//       no sense in sending our wantlist to the
-				//       same peer multiple times
-				for _, k := range bs.wantlist.Keys() {
-					blkmsg.AddWanted(k)
-				}
-
-				blkmsg.AddBlock(block)
-				bs.send(ctx, p, blkmsg)
-				bs.strategy.BlockSentToPeer(block.Key(), p)
-			}
-		}
+	var keys []u.Key
+	for _, block := range incoming.Blocks() {
+		keys = append(keys, block.Key())
 	}
+	bs.cancelBlocks(ctx, keys)
 
 	// TODO: consider changing this function to not return anything
 	return nil, nil
 }
 
+func (bs *bitswap) cancelBlocks(ctx context.Context, bkeys []u.Key) {
+	if len(bkeys) < 1 {
+		return
+	}
+	message := bsmsg.New()
+	message.SetFull(false)
+	for _, k := range bkeys {
+		message.Cancel(k)
+	}
+	for _, p := range bs.engine.Peers() {
+		err := bs.send(ctx, p, message)
+		if err != nil {
+			log.Errorf("Error sending message: %s", err)
+		}
+	}
+}
+
 func (bs *bitswap) ReceiveError(err error) {
 	log.Errorf("Bitswap ReceiveError: %s", err)
 	// TODO log the network error
@@ -328,25 +353,7 @@ func (bs *bitswap) send(ctx context.Context, p peer.Peer, m bsmsg.BitSwapMessage
 	if err := bs.sender.SendMessage(ctx, p, m); err != nil {
 		return err
 	}
-	return bs.strategy.MessageSent(p, m)
-}
-
-func (bs *bitswap) sendToPeersThatWant(ctx context.Context, block *blocks.Block) error {
-	for _, p := range bs.strategy.Peers() {
-		if bs.strategy.BlockIsWantedByPeer(block.Key(), p) {
-			if bs.strategy.ShouldSendBlockToPeer(block.Key(), p) {
-				message := bsmsg.New()
-				message.AddBlock(block)
-				for _, wanted := range bs.wantlist.Keys() {
-					message.AddWanted(wanted)
-				}
-				if err := bs.send(ctx, p, message); err != nil {
-					return err
-				}
-			}
-		}
-	}
-	return nil
-}
+	return bs.engine.MessageSent(p, m)
+}
 
 func (bs *bitswap) Close() error {
@@ -11,6 +11,7 @@ import (
 	blocksutil "github.com/jbenet/go-ipfs/blocks/blocksutil"
 	tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet"
 	mockrouting "github.com/jbenet/go-ipfs/routing/mock"
+	u "github.com/jbenet/go-ipfs/util"
 	delay "github.com/jbenet/go-ipfs/util/delay"
 	testutil "github.com/jbenet/go-ipfs/util/testutil"
 )
@@ -25,6 +26,7 @@ func TestClose(t *testing.T) {
 	vnet := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
 	rout := mockrouting.NewServer()
 	sesgen := NewSessionGenerator(vnet, rout)
+	defer sesgen.Close()
 	bgen := blocksutil.NewBlockGenerator()
 
 	block := bgen.Next()
@@ -39,6 +41,7 @@ func TestGetBlockTimeout(t *testing.T) {
 	net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
 	rs := mockrouting.NewServer()
 	g := NewSessionGenerator(net, rs)
+	defer g.Close()
 
 	self := g.Next()
 
@@ -56,11 +59,13 @@ func TestProviderForKeyButNetworkCannotFind(t *testing.T) {
 	net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
 	rs := mockrouting.NewServer()
 	g := NewSessionGenerator(net, rs)
+	defer g.Close()
 
 	block := blocks.NewBlock([]byte("block"))
 	rs.Client(testutil.NewPeerWithIDString("testing")).Provide(context.Background(), block.Key()) // but not on network
 
 	solo := g.Next()
+	defer solo.Exchange.Close()
 
 	ctx, _ := context.WithTimeout(context.Background(), time.Nanosecond)
 	_, err := solo.Exchange.GetBlock(ctx, block.Key())
@@ -78,8 +83,10 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {
 	rs := mockrouting.NewServer()
 	block := blocks.NewBlock([]byte("block"))
 	g := NewSessionGenerator(net, rs)
+	defer g.Close()
 
 	hasBlock := g.Next()
+	defer hasBlock.Exchange.Close()
 
 	if err := hasBlock.Blockstore().Put(block); err != nil {
 		t.Fatal(err)
@@ -89,6 +96,7 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {
 	}
 
 	wantsBlock := g.Next()
+	defer wantsBlock.Exchange.Close()
 
 	ctx, _ := context.WithTimeout(context.Background(), time.Second)
 	received, err := wantsBlock.Exchange.GetBlock(ctx, block.Key())
@@ -107,7 +115,7 @@ func TestLargeSwarm(t *testing.T) {
 		t.SkipNow()
 	}
 	t.Parallel()
-	numInstances := 5
+	numInstances := 500
 	numBlocks := 2
 	PerformDistributionTest(t, numInstances, numBlocks)
 }
@@ -129,6 +137,7 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
 	net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
 	rs := mockrouting.NewServer()
 	sg := NewSessionGenerator(net, rs)
+	defer sg.Close()
 	bg := blocksutil.NewBlockGenerator()
 
 	t.Log("Test a few nodes trying to get one file with a lot of blocks")
@@ -138,24 +147,29 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
 
 	t.Log("Give the blocks to the first instance")
 
+	var blkeys []u.Key
 	first := instances[0]
 	for _, b := range blocks {
 		first.Blockstore().Put(b)
+		blkeys = append(blkeys, b.Key())
 		first.Exchange.HasBlock(context.Background(), b)
 		rs.Client(first.Peer).Provide(context.Background(), b.Key())
 	}
 
 	t.Log("Distribute!")
 
-	var wg sync.WaitGroup
-
+	wg := sync.WaitGroup{}
 	for _, inst := range instances {
-		for _, b := range blocks {
-			wg.Add(1)
-			// NB: executing getOrFail concurrently puts tremendous pressure on
-			// the goroutine scheduler
-			getOrFail(inst, b, t, &wg)
-		}
+		wg.Add(1)
+		go func(inst Instance) {
+			defer wg.Done()
+			outch, err := inst.Exchange.GetBlocks(context.TODO(), blkeys)
+			if err != nil {
+				t.Fatal(err)
+			}
+			for _ = range outch {
+			}
+		}(inst)
 	}
 	wg.Wait()
 
@@ -189,60 +203,75 @@ func TestSendToWantingPeer(t *testing.T) {
 	net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
 	rs := mockrouting.NewServer()
 	sg := NewSessionGenerator(net, rs)
+	defer sg.Close()
 	bg := blocksutil.NewBlockGenerator()
 
-	me := sg.Next()
-	w := sg.Next()
-	o := sg.Next()
-
-	t.Logf("Session %v\n", me.Peer)
-	t.Logf("Session %v\n", w.Peer)
-	t.Logf("Session %v\n", o.Peer)
+	oldVal := rebroadcastDelay
+	rebroadcastDelay = time.Second / 2
+	defer func() { rebroadcastDelay = oldVal }()
+
+	peerA := sg.Next()
+	peerB := sg.Next()
+
+	t.Logf("Session %v\n", peerA.Peer)
+	t.Logf("Session %v\n", peerB.Peer)
+
+	timeout := time.Second
+	waitTime := time.Second * 5
 
 	alpha := bg.Next()
-
-	const timeout = 100 * time.Millisecond // FIXME don't depend on time
-
-	t.Logf("Peer %v attempts to get %v. NB: not available\n", w.Peer, alpha.Key())
-	ctx, _ := context.WithTimeout(context.Background(), timeout)
-	_, err := w.Exchange.GetBlock(ctx, alpha.Key())
-	if err == nil {
-		t.Fatalf("Expected %v to NOT be available", alpha.Key())
-	}
-
-	beta := bg.Next()
-	t.Logf("Peer %v announes availability of %v\n", w.Peer, beta.Key())
-	ctx, _ = context.WithTimeout(context.Background(), timeout)
-	if err := w.Blockstore().Put(beta); err != nil {
-		t.Fatal(err)
-	}
-	w.Exchange.HasBlock(ctx, beta)
-
-	t.Logf("%v gets %v from %v and discovers it wants %v\n", me.Peer, beta.Key(), w.Peer, alpha.Key())
-	ctx, _ = context.WithTimeout(context.Background(), timeout)
-	if _, err := me.Exchange.GetBlock(ctx, beta.Key()); err != nil {
-		t.Fatal(err)
-	}
-
-	t.Logf("%v announces availability of %v\n", o.Peer, alpha.Key())
-	ctx, _ = context.WithTimeout(context.Background(), timeout)
-	if err := o.Blockstore().Put(alpha); err != nil {
-		t.Fatal(err)
-	}
-	o.Exchange.HasBlock(ctx, alpha)
-
-	t.Logf("%v requests %v\n", me.Peer, alpha.Key())
-	ctx, _ = context.WithTimeout(context.Background(), timeout)
-	if _, err := me.Exchange.GetBlock(ctx, alpha.Key()); err != nil {
-		t.Fatal(err)
-	}
-
-	t.Logf("%v should now have %v\n", w.Peer, alpha.Key())
-	block, err := w.Blockstore().Get(alpha.Key())
+	// peerA requests and waits for block alpha
+	ctx, _ := context.WithTimeout(context.TODO(), waitTime)
+	alphaPromise, err := peerA.Exchange.GetBlocks(ctx, []u.Key{alpha.Key()})
 	if err != nil {
-		t.Fatalf("Should not have received an error: %s", err)
+		t.Fatal(err)
 	}
-	if block.Key() != alpha.Key() {
-		t.Fatal("Expected to receive alpha from me")
+
+	// peerB announces to the network that he has block alpha
+	ctx, _ = context.WithTimeout(context.TODO(), timeout)
+	err = peerB.Exchange.HasBlock(ctx, alpha)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// At some point, peerA should get alpha (or timeout)
+	blkrecvd, ok := <-alphaPromise
+	if !ok {
+		t.Fatal("context timed out and broke promise channel!")
+	}
+
+	if blkrecvd.Key() != alpha.Key() {
+		t.Fatal("Wrong block!")
 	}
+
+}
+
+func TestBasicBitswap(t *testing.T) {
+	net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
+	rs := mockrouting.NewServer()
+	sg := NewSessionGenerator(net, rs)
+	bg := blocksutil.NewBlockGenerator()
+
+	t.Log("Test a few nodes trying to get one file with a lot of blocks")
+
+	instances := sg.Instances(2)
+	blocks := bg.Blocks(1)
+	err := instances[0].Exchange.HasBlock(context.TODO(), blocks[0])
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	ctx, _ := context.WithTimeout(context.TODO(), time.Second*5)
+	blk, err := instances[1].Exchange.GetBlock(ctx, blocks[0].Key())
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	t.Log(blk)
+	for _, inst := range instances {
+		err := inst.Exchange.Close()
+		if err != nil {
+			t.Fatal(err)
+		}
+	}
 }
exchange/bitswap/decision/engine.go (new file, 224 lines)
@@ -0,0 +1,224 @@
package decision

import (
	"sync"

	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
	bstore "github.com/jbenet/go-ipfs/blocks/blockstore"
	bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
	wl "github.com/jbenet/go-ipfs/exchange/bitswap/wantlist"
	peer "github.com/jbenet/go-ipfs/peer"
	u "github.com/jbenet/go-ipfs/util"
)

// TODO consider taking responsibility for other types of requests. For
// example, there could be a |cancelQueue| for all of the cancellation
// messages that need to go out. There could also be a |wantlistQueue| for
// the local peer's wantlists. Alternatively, these could all be bundled
// into a single, intelligent global queue that efficiently
// batches/combines and takes all of these into consideration.
//
// Right now, messages go onto the network for four reasons:
// 1. an initial `sendwantlist` message to a provider of the first key in a request
// 2. a periodic full sweep of `sendwantlist` messages to all providers
// 3. upon receipt of blocks, a `cancel` message to all peers
// 4. draining the priority queue of `blockrequests` from peers
//
// Presently, only `blockrequests` are handled by the decision engine.
// However, there is an opportunity to give it more responsibility! If the
// decision engine is given responsibility for all of the others, it can
// intelligently decide how to combine requests efficiently.
//
// Some examples of what would be possible:
//
// * when sending out the wantlists, include `cancel` requests
// * when handling `blockrequests`, include `sendwantlist` and `cancel` as appropriate
// * when handling `cancel`, if we recently received a wanted block from a
//   peer, include a partial wantlist that contains a few other high priority
//   blocks
//
// In a sense, if we treat the decision engine as a black box, it could do
// whatever it sees fit to produce desired outcomes (get wanted keys
// quickly, maintain good relationships with peers, etc).

var log = u.Logger("engine")

const (
	sizeOutboxChan = 4
)

// Envelope contains a message for a Peer
type Envelope struct {
	// Peer is the intended recipient
	Peer peer.Peer
	// Message is the payload
	Message bsmsg.BitSwapMessage
}

type Engine struct {
	// peerRequestQueue is a priority queue of requests received from peers.
	// Requests are popped from the queue, packaged up, and placed in the
	// outbox.
	peerRequestQueue *taskQueue

	// FIXME it's a bit odd for the client and the worker to both share memory
	// (both modify the peerRequestQueue) and also to communicate over the
	// workSignal channel. consider sending requests over the channel and
	// allowing the worker to have exclusive access to the peerRequestQueue. In
	// that case, no lock would be required.
	workSignal chan struct{}

	// outbox contains outgoing messages to peers
	outbox chan Envelope

	bs bstore.Blockstore

	lock sync.RWMutex // protects the fields immediatly below
	// ledgerMap lists Ledgers by their Partner key.
	ledgerMap map[u.Key]*ledger
}

func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine {
	e := &Engine{
		ledgerMap:        make(map[u.Key]*ledger),
		bs:               bs,
		peerRequestQueue: newTaskQueue(),
		outbox:           make(chan Envelope, sizeOutboxChan),
		workSignal:       make(chan struct{}),
	}
	go e.taskWorker(ctx)
	return e
}

func (e *Engine) taskWorker(ctx context.Context) {
	for {
		nextTask := e.peerRequestQueue.Pop()
		if nextTask == nil {
			// No tasks in the list?
			// Wait until there are!
			select {
			case <-ctx.Done():
				return
			case <-e.workSignal:
			}
			continue
		}
		block, err := e.bs.Get(nextTask.Entry.Key)
		if err != nil {
			log.Warning("engine: task exists to send block, but block is not in blockstore")
			continue
		}
		// construct message here so we can make decisions about any additional
		// information we may want to include at this time.
		m := bsmsg.New()
		m.AddBlock(block)
		// TODO: maybe add keys from our wantlist?
		select {
		case <-ctx.Done():
			return
		case e.outbox <- Envelope{Peer: nextTask.Target, Message: m}:
		}
	}
}

func (e *Engine) Outbox() <-chan Envelope {
	return e.outbox
}

// Returns a slice of Peers with whom the local node has active sessions
func (e *Engine) Peers() []peer.Peer {
	e.lock.RLock()
	defer e.lock.RUnlock()

	response := make([]peer.Peer, 0)
	for _, ledger := range e.ledgerMap {
		response = append(response, ledger.Partner)
	}
	return response
}

// MessageReceived performs book-keeping. Returns error if passed invalid
// arguments.
func (e *Engine) MessageReceived(p peer.Peer, m bsmsg.BitSwapMessage) error {
	newWorkExists := false
	defer func() {
		if newWorkExists {
			// Signal task generation to restart (if stopped!)
			select {
			case e.workSignal <- struct{}{}:
			default:
			}
		}
	}()
	e.lock.Lock()
	defer e.lock.Unlock()

	l := e.findOrCreate(p)
	if m.Full() {
		l.wantList = wl.New()
	}
	for _, entry := range m.Wantlist() {
		if entry.Cancel {
			l.CancelWant(entry.Key)
			e.peerRequestQueue.Remove(entry.Key, p)
		} else {
			l.Wants(entry.Key, entry.Priority)
			if exists, err := e.bs.Has(entry.Key); err == nil && exists {
				newWorkExists = true
				e.peerRequestQueue.Push(entry.Entry, p)
			}
		}
	}

	for _, block := range m.Blocks() {
		// FIXME extract blocks.NumBytes(block) or block.NumBytes() method
		l.ReceivedBytes(len(block.Data))
		for _, l := range e.ledgerMap {
			if l.WantListContains(block.Key()) {
				newWorkExists = true
				e.peerRequestQueue.Push(wl.Entry{block.Key(), 1}, l.Partner)
			}
		}
	}
	return nil
}

// TODO add contents of m.WantList() to my local wantlist? NB: could introduce
// race conditions where I send a message, but MessageSent gets handled after
// MessageReceived. The information in the local wantlist could become
// inconsistent. Would need to ensure that Sends and acknowledgement of the
// send happen atomically

func (e *Engine) MessageSent(p peer.Peer, m bsmsg.BitSwapMessage) error {
	e.lock.Lock()
	defer e.lock.Unlock()

	l := e.findOrCreate(p)
	for _, block := range m.Blocks() {
		l.SentBytes(len(block.Data))
		l.wantList.Remove(block.Key())
		e.peerRequestQueue.Remove(block.Key(), p)
	}

	return nil
}

func (e *Engine) numBytesSentTo(p peer.Peer) uint64 {
	// NB not threadsafe
	return e.findOrCreate(p).Accounting.BytesSent
}

func (e *Engine) numBytesReceivedFrom(p peer.Peer) uint64 {
	// NB not threadsafe
	return e.findOrCreate(p).Accounting.BytesRecv
}

// ledger lazily instantiates a ledger
func (e *Engine) findOrCreate(p peer.Peer) *ledger {
	l, ok := e.ledgerMap[p.Key()]
	if !ok {
		l = newLedger(p)
		e.ledgerMap[p.Key()] = l
	}
	return l
}
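A hedged usage sketch of the new engine, written only against the API visible in this diff (NewEngine, Outbox, MessageReceived/MessageSent) and not compiled against the repository; the sender call in the final comment is hypothetical:

package main

import (
	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
	dssync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
	blockstore "github.com/jbenet/go-ipfs/blocks/blockstore"
	decision "github.com/jbenet/go-ipfs/exchange/bitswap/decision"
)

func main() {
	ctx := context.Background()
	bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
	e := decision.NewEngine(ctx, bs)

	// Feed the engine with traffic as it happens, e.g.
	// e.MessageReceived(remotePeer, incomingMessage) for messages we get and
	// e.MessageSent(remotePeer, outgoingMessage) for messages we put on the wire.

	// A single worker drains the outbox and ships envelopes to their targets,
	// mirroring bitswap's taskWorker above.
	for env := range e.Outbox() {
		_ = env.Peer    // intended recipient
		_ = env.Message // message carrying the block to send
		// network.SendMessage(ctx, env.Peer, env.Message) // hypothetical sender
	}
}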
exchange/bitswap/decision/engine_test.go (new file, 93 lines)
@@ -0,0 +1,93 @@
package decision

import (
	"strings"
	"testing"

	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
	sync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
	blocks "github.com/jbenet/go-ipfs/blocks"
	blockstore "github.com/jbenet/go-ipfs/blocks/blockstore"
	message "github.com/jbenet/go-ipfs/exchange/bitswap/message"
	peer "github.com/jbenet/go-ipfs/peer"
	testutil "github.com/jbenet/go-ipfs/util/testutil"
)

type peerAndEngine struct {
	peer.Peer
	Engine *Engine
}

func newPeerAndLedgermanager(idStr string) peerAndEngine {
	return peerAndEngine{
		Peer: testutil.NewPeerWithIDString(idStr),
		//Strategy: New(true),
		Engine: NewEngine(context.TODO(),
			blockstore.NewBlockstore(sync.MutexWrap(ds.NewMapDatastore()))),
	}
}

func TestConsistentAccounting(t *testing.T) {
	sender := newPeerAndLedgermanager("Ernie")
	receiver := newPeerAndLedgermanager("Bert")

	// Send messages from Ernie to Bert
	for i := 0; i < 1000; i++ {

		m := message.New()
		content := []string{"this", "is", "message", "i"}
		m.AddBlock(blocks.NewBlock([]byte(strings.Join(content, " "))))

		sender.Engine.MessageSent(receiver.Peer, m)
		receiver.Engine.MessageReceived(sender.Peer, m)
	}

	// Ensure sender records the change
	if sender.Engine.numBytesSentTo(receiver.Peer) == 0 {
		t.Fatal("Sent bytes were not recorded")
	}

	// Ensure sender and receiver have the same values
	if sender.Engine.numBytesSentTo(receiver.Peer) != receiver.Engine.numBytesReceivedFrom(sender.Peer) {
		t.Fatal("Inconsistent book-keeping. Strategies don't agree")
	}

	// Ensure sender didn't record receving anything. And that the receiver
	// didn't record sending anything
	if receiver.Engine.numBytesSentTo(sender.Peer) != 0 || sender.Engine.numBytesReceivedFrom(receiver.Peer) != 0 {
		t.Fatal("Bert didn't send bytes to Ernie")
	}
}

func TestPeerIsAddedToPeersWhenMessageReceivedOrSent(t *testing.T) {

	sanfrancisco := newPeerAndLedgermanager("sf")
	seattle := newPeerAndLedgermanager("sea")

	m := message.New()

	sanfrancisco.Engine.MessageSent(seattle.Peer, m)
	seattle.Engine.MessageReceived(sanfrancisco.Peer, m)

	if seattle.Peer.Key() == sanfrancisco.Peer.Key() {
		t.Fatal("Sanity Check: Peers have same Key!")
	}

	if !peerIsPartner(seattle.Peer, sanfrancisco.Engine) {
		t.Fatal("Peer wasn't added as a Partner")
	}

	if !peerIsPartner(sanfrancisco.Peer, seattle.Engine) {
		t.Fatal("Peer wasn't added as a Partner")
	}
}

func peerIsPartner(p peer.Peer, e *Engine) bool {
	for _, partner := range e.Peers() {
		if partner.Key() == p.Key() {
			return true
		}
	}
	return false
}
@@ -1,8 +1,9 @@
-package strategy
+package decision
 
 import (
 	"time"
 
+	wl "github.com/jbenet/go-ipfs/exchange/bitswap/wantlist"
 	peer "github.com/jbenet/go-ipfs/peer"
 	u "github.com/jbenet/go-ipfs/util"
 )
@@ -11,10 +12,9 @@ import (
 // access/lookups.
 type keySet map[u.Key]struct{}
 
-func newLedger(p peer.Peer, strategy strategyFunc) *ledger {
+func newLedger(p peer.Peer) *ledger {
 	return &ledger{
-		wantList:   keySet{},
-		Strategy:   strategy,
+		wantList:   wl.New(),
 		Partner:    p,
 		sentToPeer: make(map[u.Key]time.Time),
 	}
@@ -39,17 +39,20 @@ type ledger struct {
 	exchangeCount uint64
 
 	// wantList is a (bounded, small) set of keys that Partner desires.
-	wantList keySet
+	wantList *wl.Wantlist
 
 	// sentToPeer is a set of keys to ensure we dont send duplicate blocks
 	// to a given peer
 	sentToPeer map[u.Key]time.Time
-
-	Strategy strategyFunc
 }
 
-func (l *ledger) ShouldSend() bool {
-	return l.Strategy(l)
+type debtRatio struct {
+	BytesSent uint64
+	BytesRecv uint64
+}
+
+func (dr *debtRatio) Value() float64 {
+	return float64(dr.BytesSent) / float64(dr.BytesRecv+1)
 }
 
 func (l *ledger) SentBytes(n int) {
@@ -65,14 +68,17 @@ func (l *ledger) ReceivedBytes(n int) {
 }
 
 // TODO: this needs to be different. We need timeouts.
-func (l *ledger) Wants(k u.Key) {
+func (l *ledger) Wants(k u.Key, priority int) {
 	log.Debugf("peer %s wants %s", l.Partner, k)
-	l.wantList[k] = struct{}{}
+	l.wantList.Add(k, priority)
+}
+
+func (l *ledger) CancelWant(k u.Key) {
+	l.wantList.Remove(k)
 }
 
 func (l *ledger) WantListContains(k u.Key) bool {
-	_, ok := l.wantList[k]
-	return ok
+	return l.wantList.Contains(k)
 }
 
 func (l *ledger) ExchangeCount() uint64 {
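The debtRatio introduced above reduces a ledger to a single number; a standalone sketch of the formula, float64(BytesSent) / float64(BytesRecv+1), where the +1 avoids dividing by zero before anything has been received:

package main

import "fmt"

func debtValue(bytesSent, bytesRecv uint64) float64 {
	return float64(bytesSent) / float64(bytesRecv+1)
}

func main() {
	fmt.Println(debtValue(0, 0))     // 0: nothing exchanged yet
	fmt.Println(debtValue(1024, 0))  // 1024: we have only been sending
	fmt.Println(debtValue(512, 511)) // 1: a roughly balanced exchange
}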
exchange/bitswap/decision/taskqueue.go (new file, 84 lines)
@@ -0,0 +1,84 @@
package decision

import (
	"sync"

	wantlist "github.com/jbenet/go-ipfs/exchange/bitswap/wantlist"
	peer "github.com/jbenet/go-ipfs/peer"
	u "github.com/jbenet/go-ipfs/util"
)

// TODO: at some point, the strategy needs to plug in here
// to help decide how to sort tasks (on add) and how to select
// tasks (on getnext). For now, we are assuming a dumb/nice strategy.
type taskQueue struct {
	// TODO: make this into a priority queue
	lock    sync.Mutex
	tasks   []*task
	taskmap map[string]*task
}

func newTaskQueue() *taskQueue {
	return &taskQueue{
		taskmap: make(map[string]*task),
	}
}

type task struct {
	Entry  wantlist.Entry
	Target peer.Peer
	Trash  bool
}

// Push currently adds a new task to the end of the list
func (tl *taskQueue) Push(entry wantlist.Entry, to peer.Peer) {
	tl.lock.Lock()
	defer tl.lock.Unlock()
	if task, ok := tl.taskmap[taskKey(to, entry.Key)]; ok {
		// TODO: when priority queue is implemented,
		//       rearrange this task
		task.Entry.Priority = entry.Priority
		return
	}
	task := &task{
		Entry:  entry,
		Target: to,
	}
	tl.tasks = append(tl.tasks, task)
	tl.taskmap[taskKey(to, entry.Key)] = task
}

// Pop 'pops' the next task to be performed. Returns nil no task exists.
func (tl *taskQueue) Pop() *task {
	tl.lock.Lock()
	defer tl.lock.Unlock()
	var out *task
	for len(tl.tasks) > 0 {
		// TODO: instead of zero, use exponential distribution
		//       it will help reduce the chance of receiving
		//       the same block from multiple peers
		out = tl.tasks[0]
		tl.tasks = tl.tasks[1:]
		delete(tl.taskmap, taskKey(out.Target, out.Entry.Key))
		if out.Trash {
			continue // discarding tasks that have been removed
		}
		break // and return |out|
	}
	return out
}

// Remove lazily removes a task from the queue
func (tl *taskQueue) Remove(k u.Key, p peer.Peer) {
	tl.lock.Lock()
	t, ok := tl.taskmap[taskKey(p, k)]
	if ok {
		t.Trash = true
	}
	tl.lock.Unlock()
}

// taskKey returns a key that uniquely identifies a task.
func taskKey(p peer.Peer, k u.Key) string {
	return string(p.Key() + k)
}
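A hedged sketch (same package, not compiled here) of the queue's lazy-removal behaviour: Remove only flags a task as Trash, and Pop discards flagged tasks when it reaches them. The peers and keys below are hypothetical placeholders, and the wantlist.Entry field names are assumed from their use in this diff:

func exampleTaskQueueUsage(alice, bob peer.Peer, k1, k2 u.Key) {
	q := newTaskQueue()
	q.Push(wantlist.Entry{Key: k1, Priority: 1}, alice)
	q.Push(wantlist.Entry{Key: k2, Priority: 2}, bob)

	// Cancelling alice's request does not touch the slice; the task is only
	// marked Trash and costs nothing until Pop walks past it.
	q.Remove(k1, alice)

	// Pop skips the trashed task and hands back bob's request for k2.
	if t := q.Pop(); t != nil {
		_ = t.Target // bob
		_ = t.Entry  // the entry for k2 with priority 2
	}
}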
@@ -21,16 +21,16 @@ var _ = proto.Marshal
 var _ = math.Inf
 
 type Message struct {
-	Wantlist         []string `protobuf:"bytes,1,rep,name=wantlist" json:"wantlist,omitempty"`
+	Wantlist         *Message_Wantlist `protobuf:"bytes,1,opt,name=wantlist" json:"wantlist,omitempty"`
 	Blocks           [][]byte `protobuf:"bytes,2,rep,name=blocks" json:"blocks,omitempty"`
 	XXX_unrecognized []byte   `json:"-"`
 }
 
 func (m *Message) Reset()         { *m = Message{} }
 func (m *Message) String() string { return proto.CompactTextString(m) }
 func (*Message) ProtoMessage()    {}
 
-func (m *Message) GetWantlist() []string {
+func (m *Message) GetWantlist() *Message_Wantlist {
 	if m != nil {
 		return m.Wantlist
 	}
@@ -44,5 +44,61 @@ func (m *Message) GetBlocks() [][]byte {
 	return nil
 }
 
+type Message_Wantlist struct {
+	Entries          []*Message_Wantlist_Entry `protobuf:"bytes,1,rep,name=entries" json:"entries,omitempty"`
+	Full             *bool                     `protobuf:"varint,2,opt,name=full" json:"full,omitempty"`
+	XXX_unrecognized []byte                    `json:"-"`
+}
+
+func (m *Message_Wantlist) Reset()         { *m = Message_Wantlist{} }
+func (m *Message_Wantlist) String() string { return proto.CompactTextString(m) }
+func (*Message_Wantlist) ProtoMessage()    {}
+
+func (m *Message_Wantlist) GetEntries() []*Message_Wantlist_Entry {
+	if m != nil {
+		return m.Entries
+	}
+	return nil
+}
+
+func (m *Message_Wantlist) GetFull() bool {
+	if m != nil && m.Full != nil {
+		return *m.Full
+	}
+	return false
+}
+
+type Message_Wantlist_Entry struct {
+	Block            *string `protobuf:"bytes,1,opt,name=block" json:"block,omitempty"`
+	Priority         *int32  `protobuf:"varint,2,opt,name=priority" json:"priority,omitempty"`
+	Cancel           *bool   `protobuf:"varint,3,opt,name=cancel" json:"cancel,omitempty"`
+	XXX_unrecognized []byte  `json:"-"`
+}
+
+func (m *Message_Wantlist_Entry) Reset()         { *m = Message_Wantlist_Entry{} }
+func (m *Message_Wantlist_Entry) String() string { return proto.CompactTextString(m) }
+func (*Message_Wantlist_Entry) ProtoMessage()    {}
+
+func (m *Message_Wantlist_Entry) GetBlock() string {
+	if m != nil && m.Block != nil {
+		return *m.Block
+	}
+	return ""
+}
+
+func (m *Message_Wantlist_Entry) GetPriority() int32 {
+	if m != nil && m.Priority != nil {
+		return *m.Priority
+	}
+	return 0
+}
+
+func (m *Message_Wantlist_Entry) GetCancel() bool {
+	if m != nil && m.Cancel != nil {
+		return *m.Cancel
+	}
+	return false
+}
+
 func init() {
 }
@@ -1,6 +1,19 @@
 package bitswap.message.pb;
 
 message Message {
-	repeated string wantlist = 1;
-	repeated bytes blocks = 2;
+
+	message Wantlist {
+
+		message Entry {
+			optional string block = 1;    // the block key
+			optional int32 priority = 2;  // the priority (normalized). default to 1
+			optional bool cancel = 3;     // whether this revokes an entry
+		}
+
+		repeated Entry entries = 1;  // a list of wantlist entries
+		optional bool full = 2;      // whether this is the full wantlist. default to false
+	}
+
+	optional Wantlist wantlist = 1;
+	repeated bytes blocks = 2;
 }
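For orientation, a hedged sketch of how the Go message-builder API in the hunks that follow lines up with this schema; it is not compiled here, and someKey, otherKey and someBlock are placeholders:

// Builds a wantlist 'patch' message using the interface shown below.
func buildPatch(someKey, otherKey u.Key, someBlock *blocks.Block) bsmsg.BitSwapMessage {
	m := bsmsg.New()       // full defaults to true
	m.SetFull(false)       // mark this message as a patch, not the whole wantlist
	m.AddEntry(someKey, 5) // serialized as a Wantlist.Entry with priority 5
	m.Cancel(otherKey)     // serialized as a Wantlist.Entry with cancel = true
	m.AddBlock(someBlock)  // goes into the repeated `blocks` field
	return m
}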
@@ -5,10 +5,12 @@ import (
 
 	blocks "github.com/jbenet/go-ipfs/blocks"
 	pb "github.com/jbenet/go-ipfs/exchange/bitswap/message/internal/pb"
+	wantlist "github.com/jbenet/go-ipfs/exchange/bitswap/wantlist"
 	inet "github.com/jbenet/go-ipfs/net"
 	u "github.com/jbenet/go-ipfs/util"
 
 	ggio "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/gogoprotobuf/io"
+	proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
 )
 
 // TODO move message.go into the bitswap package
@@ -17,21 +19,23 @@ import (
 type BitSwapMessage interface {
 	// Wantlist returns a slice of unique keys that represent data wanted by
 	// the sender.
-	Wantlist() []u.Key
+	Wantlist() []Entry
 
 	// Blocks returns a slice of unique blocks
 	Blocks() []*blocks.Block
 
-	// AddWanted adds the key to the Wantlist.
-	//
-	// Insertion order determines priority. That is, earlier insertions are
-	// deemed higher priority than keys inserted later.
-	//
-	// t = 0, msg.AddWanted(A)
-	// t = 1, msg.AddWanted(B)
-	//
-	// implies Priority(A) > Priority(B)
-	AddWanted(u.Key)
+	// AddEntry adds an entry to the Wantlist.
+	AddEntry(key u.Key, priority int)
+
+	Cancel(key u.Key)
+
+	// Sets whether or not the contained wantlist represents the entire wantlist
+	// true = full wantlist
+	// false = wantlist 'patch'
+	// default: true
+	SetFull(isFull bool)
+
+	Full() bool
 
 	AddBlock(*blocks.Block)
 	Exportable
@ -43,23 +47,33 @@ type Exportable interface {
 }
 
 type impl struct {
-	existsInWantlist map[u.Key]struct{}      // map to detect duplicates
-	wantlist         []u.Key                 // slice to preserve ordering
+	full     bool
+	wantlist map[u.Key]Entry
 	blocks   map[u.Key]*blocks.Block // map to detect duplicates
 }
 
 func New() BitSwapMessage {
+	return newMsg()
+}
+
+func newMsg() *impl {
 	return &impl{
-		blocks:           make(map[u.Key]*blocks.Block),
-		existsInWantlist: make(map[u.Key]struct{}),
-		wantlist:         make([]u.Key, 0),
+		blocks:   make(map[u.Key]*blocks.Block),
+		wantlist: make(map[u.Key]Entry),
+		full:     true,
 	}
 }
 
+type Entry struct {
+	wantlist.Entry
+	Cancel bool
+}
+
 func newMessageFromProto(pbm pb.Message) BitSwapMessage {
-	m := New()
-	for _, s := range pbm.GetWantlist() {
-		m.AddWanted(u.Key(s))
+	m := newMsg()
+	m.SetFull(pbm.GetWantlist().GetFull())
+	for _, e := range pbm.GetWantlist().GetEntries() {
+		m.addEntry(u.Key(e.GetBlock()), int(e.GetPriority()), e.GetCancel())
 	}
 	for _, d := range pbm.GetBlocks() {
 		b := blocks.NewBlock(d)
@ -68,8 +82,20 @@ func newMessageFromProto(pbm pb.Message) BitSwapMessage {
 	return m
 }
 
-func (m *impl) Wantlist() []u.Key {
-	return m.wantlist
+func (m *impl) SetFull(full bool) {
+	m.full = full
+}
+
+func (m *impl) Full() bool {
+	return m.full
+}
+
+func (m *impl) Wantlist() []Entry {
+	var out []Entry
+	for _, e := range m.wantlist {
+		out = append(out, e)
+	}
+	return out
 }
 
 func (m *impl) Blocks() []*blocks.Block {
@ -80,13 +106,28 @@ func (m *impl) Blocks() []*blocks.Block {
 	return bs
 }
 
-func (m *impl) AddWanted(k u.Key) {
-	_, exists := m.existsInWantlist[k]
+func (m *impl) Cancel(k u.Key) {
+	m.addEntry(k, 0, true)
+}
+
+func (m *impl) AddEntry(k u.Key, priority int) {
+	m.addEntry(k, priority, false)
+}
+
+func (m *impl) addEntry(k u.Key, priority int, cancel bool) {
+	e, exists := m.wantlist[k]
 	if exists {
-		return
+		e.Priority = priority
+		e.Cancel = cancel
+	} else {
+		m.wantlist[k] = Entry{
+			Entry: wantlist.Entry{
+				Key:      k,
+				Priority: priority,
+			},
+			Cancel: cancel,
+		}
 	}
-	m.existsInWantlist[k] = struct{}{}
-	m.wantlist = append(m.wantlist, k)
 }
 
 func (m *impl) AddBlock(b *blocks.Block) {
@ -106,14 +147,19 @@ func FromNet(r io.Reader) (BitSwapMessage, error) {
 }
 
 func (m *impl) ToProto() *pb.Message {
-	pb := new(pb.Message)
-	for _, k := range m.Wantlist() {
-		pb.Wantlist = append(pb.Wantlist, string(k))
+	pbm := new(pb.Message)
+	pbm.Wantlist = new(pb.Message_Wantlist)
+	for _, e := range m.wantlist {
+		pbm.Wantlist.Entries = append(pbm.Wantlist.Entries, &pb.Message_Wantlist_Entry{
+			Block:    proto.String(string(e.Key)),
+			Priority: proto.Int32(int32(e.Priority)),
+			Cancel:   &e.Cancel,
+		})
 	}
 	for _, b := range m.Blocks() {
-		pb.Blocks = append(pb.Blocks, b.Data)
+		pbm.Blocks = append(pbm.Blocks, b.Data)
 	}
-	return pb
+	return pbm
 }
 
 func (m *impl) ToNet(w io.Writer) error {
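A short usage sketch of the revised BitSwapMessage API (illustrative only; keys and priorities are placeholders):

package example // illustrative only

import (
	bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
	u "github.com/jbenet/go-ipfs/util"
)

func buildMessage() bsmsg.BitSwapMessage {
	m := bsmsg.New()              // starts as a full wantlist by default
	m.SetFull(false)              // mark this message as a wantlist 'patch'
	m.AddEntry(u.Key("a_key"), 5) // want "a_key" with priority 5
	m.Cancel(u.Key("old_key"))    // revoke a previously advertised want
	for _, e := range m.Wantlist() {
		_ = e.Key      // fields from the embedded wantlist.Entry
		_ = e.Priority
		_ = e.Cancel
	}
	return m
}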
@ -4,6 +4,8 @@ import (
 	"bytes"
 	"testing"
 
+	proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
+
 	blocks "github.com/jbenet/go-ipfs/blocks"
 	pb "github.com/jbenet/go-ipfs/exchange/bitswap/message/internal/pb"
 	u "github.com/jbenet/go-ipfs/util"
@ -12,22 +14,26 @@ import (
 func TestAppendWanted(t *testing.T) {
 	const str = "foo"
 	m := New()
-	m.AddWanted(u.Key(str))
+	m.AddEntry(u.Key(str), 1)
 
-	if !contains(m.ToProto().GetWantlist(), str) {
+	if !wantlistContains(m.ToProto().GetWantlist(), str) {
 		t.Fail()
 	}
+	m.ToProto().GetWantlist().GetEntries()
 }
 
 func TestNewMessageFromProto(t *testing.T) {
 	const str = "a_key"
 	protoMessage := new(pb.Message)
-	protoMessage.Wantlist = []string{string(str)}
-	if !contains(protoMessage.Wantlist, str) {
+	protoMessage.Wantlist = new(pb.Message_Wantlist)
+	protoMessage.Wantlist.Entries = []*pb.Message_Wantlist_Entry{
+		&pb.Message_Wantlist_Entry{Block: proto.String(str)},
+	}
+	if !wantlistContains(protoMessage.Wantlist, str) {
 		t.Fail()
 	}
 	m := newMessageFromProto(*protoMessage)
-	if !contains(m.ToProto().GetWantlist(), str) {
+	if !wantlistContains(m.ToProto().GetWantlist(), str) {
 		t.Fail()
 	}
 }
@ -57,7 +63,7 @@ func TestWantlist(t *testing.T) {
 	keystrs := []string{"foo", "bar", "baz", "bat"}
 	m := New()
 	for _, s := range keystrs {
-		m.AddWanted(u.Key(s))
+		m.AddEntry(u.Key(s), 1)
 	}
 	exported := m.Wantlist()
 
@ -65,12 +71,12 @@ func TestWantlist(t *testing.T) {
 		present := false
 		for _, s := range keystrs {
 
-			if s == string(k) {
+			if s == string(k.Key) {
 				present = true
 			}
 		}
 		if !present {
-			t.Logf("%v isn't in original list", string(k))
+			t.Logf("%v isn't in original list", k.Key)
 			t.Fail()
 		}
 	}
@ -80,19 +86,19 @@ func TestCopyProtoByValue(t *testing.T) {
 	const str = "foo"
 	m := New()
 	protoBeforeAppend := m.ToProto()
-	m.AddWanted(u.Key(str))
-	if contains(protoBeforeAppend.GetWantlist(), str) {
+	m.AddEntry(u.Key(str), 1)
+	if wantlistContains(protoBeforeAppend.GetWantlist(), str) {
 		t.Fail()
 	}
 }
 
 func TestToNetFromNetPreservesWantList(t *testing.T) {
 	original := New()
-	original.AddWanted(u.Key("M"))
-	original.AddWanted(u.Key("B"))
-	original.AddWanted(u.Key("D"))
-	original.AddWanted(u.Key("T"))
-	original.AddWanted(u.Key("F"))
+	original.AddEntry(u.Key("M"), 1)
+	original.AddEntry(u.Key("B"), 1)
+	original.AddEntry(u.Key("D"), 1)
+	original.AddEntry(u.Key("T"), 1)
+	original.AddEntry(u.Key("F"), 1)
 
 	var buf bytes.Buffer
 	if err := original.ToNet(&buf); err != nil {
|
|||||||
|
|
||||||
keys := make(map[u.Key]bool)
|
keys := make(map[u.Key]bool)
|
||||||
for _, k := range copied.Wantlist() {
|
for _, k := range copied.Wantlist() {
|
||||||
keys[k] = true
|
keys[k.Key] = true
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, k := range original.Wantlist() {
|
for _, k := range original.Wantlist() {
|
||||||
if _, ok := keys[k]; !ok {
|
if _, ok := keys[k.Key]; !ok {
|
||||||
t.Fatalf("Key Missing: \"%v\"", k)
|
t.Fatalf("Key Missing: \"%v\"", k)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -146,9 +152,18 @@ func TestToAndFromNetMessage(t *testing.T) {
 	}
 }
 
-func contains(s []string, x string) bool {
-	for _, a := range s {
-		if a == x {
+func wantlistContains(wantlist *pb.Message_Wantlist, x string) bool {
+	for _, e := range wantlist.GetEntries() {
+		if e.GetBlock() == x {
+			return true
+		}
+	}
+	return false
+}
+
+func contains(strs []string, x string) bool {
+	for _, s := range strs {
+		if s == x {
 			return true
 		}
 	}
@ -159,8 +174,8 @@ func TestDuplicates(t *testing.T) {
 	b := blocks.NewBlock([]byte("foo"))
 	msg := New()
 
-	msg.AddWanted(b.Key())
-	msg.AddWanted(b.Key())
+	msg.AddEntry(b.Key(), 1)
+	msg.AddEntry(b.Key(), 1)
 	if len(msg.Wantlist()) != 1 {
 		t.Fatal("Duplicate in BitSwapMessage")
 	}
@ -1,40 +0,0 @@
package strategy

import (
	"time"

	bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
	peer "github.com/jbenet/go-ipfs/peer"
	u "github.com/jbenet/go-ipfs/util"
)

type Strategy interface {
	// Returns a slice of Peers with whom the local node has active sessions
	Peers() []peer.Peer

	// BlockIsWantedByPeer returns true if peer wants the block given by this
	// key
	BlockIsWantedByPeer(u.Key, peer.Peer) bool

	// ShouldSendTo(Peer) decides whether to send data to this Peer
	ShouldSendBlockToPeer(u.Key, peer.Peer) bool

	// Seed initializes the decider to a deterministic state
	Seed(int64)

	// MessageReceived records receipt of message for accounting purposes
	MessageReceived(peer.Peer, bsmsg.BitSwapMessage) error

	// MessageSent records sending of message for accounting purposes
	MessageSent(peer.Peer, bsmsg.BitSwapMessage) error

	NumBytesSentTo(peer.Peer) uint64

	NumBytesReceivedFrom(peer.Peer) uint64

	BlockSentToPeer(u.Key, peer.Peer)

	// Values determining bitswap behavioural patterns
	GetBatchSize() int
	GetRebroadcastDelay() time.Duration
}
@ -1 +0,0 @@
package strategy
@ -1,34 +0,0 @@
package strategy

import (
	"math"
	"math/rand"
)

type strategyFunc func(*ledger) bool

// TODO avoid using rand.Float64 method. it uses a singleton lock and may cause
// performance issues. Instead, instantiate a rand struct and use that to call
// Float64()
func standardStrategy(l *ledger) bool {
	return rand.Float64() <= probabilitySend(l.Accounting.Value())
}

func yesManStrategy(l *ledger) bool {
	return true
}

func probabilitySend(ratio float64) float64 {
	x := 1 + math.Exp(6-3*ratio)
	y := 1 / x
	return 1 - y
}

type debtRatio struct {
	BytesSent uint64
	BytesRecv uint64
}

func (dr *debtRatio) Value() float64 {
	return float64(dr.BytesSent) / float64(dr.BytesRecv+1)
}
@ -1,17 +0,0 @@
package strategy

import (
	"testing"
)

func TestProbabilitySendDecreasesAsRatioIncreases(t *testing.T) {
	grateful := debtRatio{BytesSent: 0, BytesRecv: 10000}
	pWhenGrateful := probabilitySend(grateful.Value())

	abused := debtRatio{BytesSent: 10000, BytesRecv: 0}
	pWhenAbused := probabilitySend(abused.Value())

	if pWhenGrateful < pWhenAbused {
		t.Fail()
	}
}
@ -1,169 +0,0 @@
package strategy

import (
	"errors"
	"sync"
	"time"

	bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
	peer "github.com/jbenet/go-ipfs/peer"
	u "github.com/jbenet/go-ipfs/util"
)

const resendTimeoutPeriod = time.Minute

var log = u.Logger("strategy")

// TODO niceness should be on a per-peer basis. Use-case: Certain peers are
// "trusted" and/or controlled by a single human user. The user may want for
// these peers to exchange data freely
func New(nice bool) Strategy {
	var stratFunc strategyFunc
	if nice {
		stratFunc = yesManStrategy
	} else {
		stratFunc = standardStrategy
	}
	return &strategist{
		ledgerMap:    ledgerMap{},
		strategyFunc: stratFunc,
	}
}

type strategist struct {
	lock sync.RWMutex
	ledgerMap
	strategyFunc
}

// LedgerMap lists Ledgers by their Partner key.
type ledgerMap map[peerKey]*ledger

// FIXME share this externally
type peerKey u.Key

// Peers returns a list of peers
func (s *strategist) Peers() []peer.Peer {
	s.lock.RLock()
	defer s.lock.RUnlock()

	response := make([]peer.Peer, 0)
	for _, ledger := range s.ledgerMap {
		response = append(response, ledger.Partner)
	}
	return response
}

func (s *strategist) BlockIsWantedByPeer(k u.Key, p peer.Peer) bool {
	s.lock.RLock()
	defer s.lock.RUnlock()

	ledger := s.ledger(p)
	return ledger.WantListContains(k)
}

func (s *strategist) ShouldSendBlockToPeer(k u.Key, p peer.Peer) bool {
	s.lock.RLock()
	defer s.lock.RUnlock()

	ledger := s.ledger(p)

	// Dont resend blocks within a certain time period
	t, ok := ledger.sentToPeer[k]
	if ok && t.Add(resendTimeoutPeriod).After(time.Now()) {
		return false
	}

	return ledger.ShouldSend()
}

func (s *strategist) BlockSentToPeer(k u.Key, p peer.Peer) {
	s.lock.Lock()
	defer s.lock.Unlock()

	ledger := s.ledger(p)
	ledger.sentToPeer[k] = time.Now()
}

func (s *strategist) Seed(int64) {
	s.lock.Lock()
	defer s.lock.Unlock()

	// TODO
}

// MessageReceived performs book-keeping. Returns error if passed invalid
// arguments.
func (s *strategist) MessageReceived(p peer.Peer, m bsmsg.BitSwapMessage) error {
	s.lock.Lock()
	defer s.lock.Unlock()

	// TODO find a more elegant way to handle this check
	if p == nil {
		return errors.New("Strategy received nil peer")
	}
	if m == nil {
		return errors.New("Strategy received nil message")
	}
	l := s.ledger(p)
	for _, key := range m.Wantlist() {
		l.Wants(key)
	}
	for _, block := range m.Blocks() {
		// FIXME extract blocks.NumBytes(block) or block.NumBytes() method
		l.ReceivedBytes(len(block.Data))
	}
	return nil
}

// TODO add contents of m.WantList() to my local wantlist? NB: could introduce
// race conditions where I send a message, but MessageSent gets handled after
// MessageReceived. The information in the local wantlist could become
// inconsistent. Would need to ensure that Sends and acknowledgement of the
// send happen atomically

func (s *strategist) MessageSent(p peer.Peer, m bsmsg.BitSwapMessage) error {
	s.lock.Lock()
	defer s.lock.Unlock()

	l := s.ledger(p)
	for _, block := range m.Blocks() {
		l.SentBytes(len(block.Data))
	}

	// TODO remove these blocks from peer's want list

	return nil
}

func (s *strategist) NumBytesSentTo(p peer.Peer) uint64 {
	s.lock.RLock()
	defer s.lock.RUnlock()

	return s.ledger(p).Accounting.BytesSent
}

func (s *strategist) NumBytesReceivedFrom(p peer.Peer) uint64 {
	s.lock.RLock()
	defer s.lock.RUnlock()

	return s.ledger(p).Accounting.BytesRecv
}

// ledger lazily instantiates a ledger
func (s *strategist) ledger(p peer.Peer) *ledger {
	l, ok := s.ledgerMap[peerKey(p.Key())]
	if !ok {
		l = newLedger(p, s.strategyFunc)
		s.ledgerMap[peerKey(p.Key())] = l
	}
	return l
}

func (s *strategist) GetBatchSize() int {
	return 10
}

func (s *strategist) GetRebroadcastDelay() time.Duration {
	return time.Second * 5
}
@ -1,104 +0,0 @@
package strategy

import (
	"strings"
	"testing"

	blocks "github.com/jbenet/go-ipfs/blocks"
	message "github.com/jbenet/go-ipfs/exchange/bitswap/message"
	peer "github.com/jbenet/go-ipfs/peer"
	testutil "github.com/jbenet/go-ipfs/util/testutil"
)

type peerAndStrategist struct {
	peer.Peer
	Strategy
}

func newPeerAndStrategist(idStr string) peerAndStrategist {
	return peerAndStrategist{
		Peer:     testutil.NewPeerWithIDString(idStr),
		Strategy: New(true),
	}
}

func TestConsistentAccounting(t *testing.T) {
	sender := newPeerAndStrategist("Ernie")
	receiver := newPeerAndStrategist("Bert")

	// Send messages from Ernie to Bert
	for i := 0; i < 1000; i++ {

		m := message.New()
		content := []string{"this", "is", "message", "i"}
		m.AddBlock(blocks.NewBlock([]byte(strings.Join(content, " "))))

		sender.MessageSent(receiver.Peer, m)
		receiver.MessageReceived(sender.Peer, m)
	}

	// Ensure sender records the change
	if sender.NumBytesSentTo(receiver.Peer) == 0 {
		t.Fatal("Sent bytes were not recorded")
	}

	// Ensure sender and receiver have the same values
	if sender.NumBytesSentTo(receiver.Peer) != receiver.NumBytesReceivedFrom(sender.Peer) {
		t.Fatal("Inconsistent book-keeping. Strategies don't agree")
	}

	// Ensure sender didn't record receving anything. And that the receiver
	// didn't record sending anything
	if receiver.NumBytesSentTo(sender.Peer) != 0 || sender.NumBytesReceivedFrom(receiver.Peer) != 0 {
		t.Fatal("Bert didn't send bytes to Ernie")
	}
}

func TestBlockRecordedAsWantedAfterMessageReceived(t *testing.T) {
	beggar := newPeerAndStrategist("can't be chooser")
	chooser := newPeerAndStrategist("chooses JIF")

	block := blocks.NewBlock([]byte("data wanted by beggar"))

	messageFromBeggarToChooser := message.New()
	messageFromBeggarToChooser.AddWanted(block.Key())

	chooser.MessageReceived(beggar.Peer, messageFromBeggarToChooser)
	// for this test, doesn't matter if you record that beggar sent

	if !chooser.BlockIsWantedByPeer(block.Key(), beggar.Peer) {
		t.Fatal("chooser failed to record that beggar wants block")
	}
}

func TestPeerIsAddedToPeersWhenMessageReceivedOrSent(t *testing.T) {

	sanfrancisco := newPeerAndStrategist("sf")
	seattle := newPeerAndStrategist("sea")

	m := message.New()

	sanfrancisco.MessageSent(seattle.Peer, m)
	seattle.MessageReceived(sanfrancisco.Peer, m)

	if seattle.Peer.Key() == sanfrancisco.Peer.Key() {
		t.Fatal("Sanity Check: Peers have same Key!")
	}

	if !peerIsPartner(seattle.Peer, sanfrancisco.Strategy) {
		t.Fatal("Peer wasn't added as a Partner")
	}

	if !peerIsPartner(sanfrancisco.Peer, seattle.Strategy) {
		t.Fatal("Peer wasn't added as a Partner")
	}
}

func peerIsPartner(p peer.Peer, s Strategy) bool {
	for _, partner := range s.Peers() {
		if partner.Key() == p.Key() {
			return true
		}
	}
	return false
}
exchange/bitswap/wantlist/wantlist.go (new file, 111 lines)
@ -0,0 +1,111 @@
package wantlist

import (
	u "github.com/jbenet/go-ipfs/util"
	"sort"
	"sync"
)

type ThreadSafe struct {
	lk sync.RWMutex
	Wantlist
}

// not threadsafe
type Wantlist struct {
	set map[u.Key]Entry
}

type Entry struct {
	// TODO consider making entries immutable so they can be shared safely and
	// slices can be copied efficiently.
	Key      u.Key
	Priority int
}

type entrySlice []Entry

func (es entrySlice) Len() int           { return len(es) }
func (es entrySlice) Swap(i, j int)      { es[i], es[j] = es[j], es[i] }
func (es entrySlice) Less(i, j int) bool { return es[i].Priority > es[j].Priority }

func NewThreadSafe() *ThreadSafe {
	return &ThreadSafe{
		Wantlist: *New(),
	}
}

func New() *Wantlist {
	return &Wantlist{
		set: make(map[u.Key]Entry),
	}
}

func (w *ThreadSafe) Add(k u.Key, priority int) {
	// TODO rm defer for perf
	w.lk.Lock()
	defer w.lk.Unlock()
	w.Wantlist.Add(k, priority)
}

func (w *ThreadSafe) Remove(k u.Key) {
	// TODO rm defer for perf
	w.lk.Lock()
	defer w.lk.Unlock()
	w.Wantlist.Remove(k)
}

func (w *ThreadSafe) Contains(k u.Key) bool {
	// TODO rm defer for perf
	w.lk.RLock()
	defer w.lk.RUnlock()
	return w.Wantlist.Contains(k)
}

func (w *ThreadSafe) Entries() []Entry {
	w.lk.RLock()
	defer w.lk.RUnlock()
	return w.Wantlist.Entries()
}

func (w *ThreadSafe) SortedEntries() []Entry {
	w.lk.RLock()
	defer w.lk.RUnlock()
	return w.Wantlist.SortedEntries()
}

func (w *Wantlist) Add(k u.Key, priority int) {
	if _, ok := w.set[k]; ok {
		return
	}
	w.set[k] = Entry{
		Key:      k,
		Priority: priority,
	}
}

func (w *Wantlist) Remove(k u.Key) {
	delete(w.set, k)
}

func (w *Wantlist) Contains(k u.Key) bool {
	_, ok := w.set[k]
	return ok
}

func (w *Wantlist) Entries() []Entry {
	var es entrySlice
	for _, e := range w.set {
		es = append(es, e)
	}
	return es
}

func (w *Wantlist) SortedEntries() []Entry {
	var es entrySlice
	for _, e := range w.set {
		es = append(es, e)
	}
	sort.Sort(es)
	return es
}
@ -2,6 +2,7 @@
 package merkledag
 
 import (
+	"bytes"
 	"fmt"
 	"sync"
 	"time"
@ -294,8 +295,9 @@ func FetchGraph(ctx context.Context, root *Node, serv DAGService) chan struct{}
 // returns the indexes of any links pointing to it
 func FindLinks(n *Node, k u.Key) []int {
 	var out []int
+	keybytes := []byte(k)
 	for i, lnk := range n.Links {
-		if u.Key(lnk.Hash) == k {
+		if bytes.Equal([]byte(lnk.Hash), keybytes) {
 			out = append(out, i)
 		}
 	}
net/backpressure/backpressure.go (new file, 1 line)
@ -0,0 +1 @@
package backpressure_tests
@ -120,15 +120,12 @@ func (dht *IpfsDHT) putProvider(ctx context.Context, p peer.Peer, key string) er
 	// add self as the provider
 	pmes.ProviderPeers = pb.PeersToPBPeers(dht.network, []peer.Peer{dht.self})
 
-	rpmes, err := dht.sendRequest(ctx, p, pmes)
+	err := dht.sendMessage(ctx, p, pmes)
 	if err != nil {
 		return err
 	}
 
 	log.Debugf("%s putProvider: %s for %s", dht.self, p, u.Key(key))
-	if rpmes.GetKey() != pmes.GetKey() {
-		return errors.New("provider not added correctly")
-	}
-
 	return nil
 }
@ -8,8 +8,8 @@ import (
 	peer "github.com/jbenet/go-ipfs/peer"
 	pb "github.com/jbenet/go-ipfs/routing/dht/pb"
 
-	ggio "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/gogoprotobuf/io"
 	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
+	ggio "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/gogoprotobuf/io"
 )
 
 // handleNewStream implements the inet.StreamHandler
@ -102,3 +102,24 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.Peer, pmes *pb.Messa
 	log.Event(ctx, "dhtReceivedMessage", dht.self, p, rpmes)
 	return rpmes, nil
 }
+
+// sendMessage sends out a message
+func (dht *IpfsDHT) sendMessage(ctx context.Context, p peer.Peer, pmes *pb.Message) error {
+
+	log.Debugf("%s dht starting stream", dht.self)
+	s, err := dht.network.NewStream(inet.ProtocolDHT, p)
+	if err != nil {
+		return err
+	}
+	defer s.Close()
+
+	w := ggio.NewDelimitedWriter(s)
+
+	log.Debugf("%s writing", dht.self)
+	if err := w.WriteMsg(pmes); err != nil {
+		return err
+	}
+	log.Event(ctx, "dhtSentMessage", dht.self, p, pmes)
+	log.Debugf("%s done", dht.self)
+	return nil
+}
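For contrast, a hedged sketch of the two call patterns this leaves on IpfsDHT (ctx, p and pmes are assumed to be in scope; both functions appear in the hunks above):

// request/response: waits for and returns the peer's reply
rpmes, err := dht.sendRequest(ctx, p, pmes)

// fire-and-forget: reports only transport errors, as putProvider now uses
err = dht.sendMessage(ctx, p, pmes)
_ = rpmes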
@ -11,6 +11,7 @@ import (
 	pb "github.com/jbenet/go-ipfs/routing/dht/pb"
 	kb "github.com/jbenet/go-ipfs/routing/kbucket"
 	u "github.com/jbenet/go-ipfs/util"
+	pset "github.com/jbenet/go-ipfs/util/peerset"
 )
 
 // asyncQueryBuffer is the size of buffered channels in async queries. This
@ -140,11 +141,11 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int
 func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, count int, peerOut chan peer.Peer) {
 	defer close(peerOut)
 
-	ps := newPeerSet()
+	ps := pset.NewLimited(count)
 	provs := dht.providers.GetProviders(ctx, key)
 	for _, p := range provs {
 		// NOTE: assuming that this list of peers is unique
-		if ps.AddIfSmallerThan(p, count) {
+		if ps.TryAdd(p) {
 			select {
 			case peerOut <- p:
 			case <-ctx.Done():
|
|||||||
|
|
||||||
// Add unique providers from request, up to 'count'
|
// Add unique providers from request, up to 'count'
|
||||||
for _, prov := range provs {
|
for _, prov := range provs {
|
||||||
if ps.AddIfSmallerThan(prov, count) {
|
if ps.TryAdd(prov) {
|
||||||
select {
|
select {
|
||||||
case peerOut <- prov:
|
case peerOut <- prov:
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
@ -207,7 +208,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, co
 	}
 }
 
-func (dht *IpfsDHT) addPeerListAsync(ctx context.Context, k u.Key, peers []*pb.Message_Peer, ps *peerSet, count int, out chan peer.Peer) {
+func (dht *IpfsDHT) addPeerListAsync(ctx context.Context, k u.Key, peers []*pb.Message_Peer, ps *pset.PeerSet, count int, out chan peer.Peer) {
 	var wg sync.WaitGroup
 	for _, pbp := range peers {
 		wg.Add(1)
@ -225,7 +226,7 @@ func (dht *IpfsDHT) addPeerListAsync(ctx context.Context, k u.Key, peers []*pb.M
 		}
 
 		dht.providers.AddProvider(k, p)
-		if ps.AddIfSmallerThan(p, count) {
+		if ps.TryAdd(p) {
 			select {
 			case out <- p:
 			case <-ctx.Done():
@ -2,8 +2,6 @@ package dht
 
 import (
 	"sync"
-
-	peer "github.com/jbenet/go-ipfs/peer"
 )
 
 // Pool size is the number of nodes used for group find/set RPC calls
@ -39,45 +37,3 @@ func (c *counter) Size() (s int) {
 	c.mut.Unlock()
 	return
 }
-
-// peerSet is a threadsafe set of peers
-type peerSet struct {
-	ps map[string]bool
-	lk sync.RWMutex
-}
-
-func newPeerSet() *peerSet {
-	ps := new(peerSet)
-	ps.ps = make(map[string]bool)
-	return ps
-}
-
-func (ps *peerSet) Add(p peer.Peer) {
-	ps.lk.Lock()
-	ps.ps[string(p.ID())] = true
-	ps.lk.Unlock()
-}
-
-func (ps *peerSet) Contains(p peer.Peer) bool {
-	ps.lk.RLock()
-	_, ok := ps.ps[string(p.ID())]
-	ps.lk.RUnlock()
-	return ok
-}
-
-func (ps *peerSet) Size() int {
-	ps.lk.RLock()
-	defer ps.lk.RUnlock()
-	return len(ps.ps)
-}
-
-func (ps *peerSet) AddIfSmallerThan(p peer.Peer, maxsize int) bool {
-	var success bool
-	ps.lk.Lock()
-	if _, ok := ps.ps[string(p.ID())]; !ok && len(ps.ps) < maxsize {
-		success = true
-		ps.ps[string(p.ID())] = true
-	}
-	ps.lk.Unlock()
-	return success
-}
@ -36,6 +36,9 @@ func TestClientFindProviders(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
+
+	// This is bad... but simulating networks is hard
+	time.Sleep(time.Millisecond * 300)
 	max := 100
 
 	providersFromHashTable, err := rs.Client(peer).FindProviders(context.Background(), k)
@ -160,6 +163,7 @@ func TestValidAfter(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
+	t.Log("providers", providers)
 	if len(providers) != 1 {
 		t.Fail()
 	}
util/peerset/peerset.go (new file, 61 lines)
@ -0,0 +1,61 @@
package peerset

import (
	peer "github.com/jbenet/go-ipfs/peer"
	"sync"
)

// PeerSet is a threadsafe set of peers
type PeerSet struct {
	ps   map[string]bool // FIXME can be map[string]struct{}
	lk   sync.RWMutex
	size int
}

func New() *PeerSet {
	ps := new(PeerSet)
	ps.ps = make(map[string]bool)
	ps.size = -1
	return ps
}

func NewLimited(size int) *PeerSet {
	ps := new(PeerSet)
	ps.ps = make(map[string]bool)
	ps.size = size
	return ps
}

func (ps *PeerSet) Add(p peer.Peer) {
	ps.lk.Lock()
	ps.ps[string(p.ID())] = true
	ps.lk.Unlock()
}

func (ps *PeerSet) Contains(p peer.Peer) bool {
	ps.lk.RLock()
	_, ok := ps.ps[string(p.ID())]
	ps.lk.RUnlock()
	return ok
}

func (ps *PeerSet) Size() int {
	ps.lk.RLock()
	defer ps.lk.RUnlock()
	return len(ps.ps)
}

// TryAdd Attempts to add the given peer into the set.
// This operation can fail for one of two reasons:
// 1) The given peer is already in the set
// 2) The number of peers in the set is equal to size
func (ps *PeerSet) TryAdd(p peer.Peer) bool {
	var success bool
	ps.lk.Lock()
	if _, ok := ps.ps[string(p.ID())]; !ok && (len(ps.ps) < ps.size || ps.size == -1) {
		success = true
		ps.ps[string(p.ID())] = true
	}
	ps.lk.Unlock()
	return success
}
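A usage sketch for the new peerset package, mirroring how findProvidersAsyncRoutine uses it above (the helper function and its arguments are hypothetical):

package example // illustrative only

import (
	peer "github.com/jbenet/go-ipfs/peer"
	pset "github.com/jbenet/go-ipfs/util/peerset"
)

// forwardProviders is a hypothetical helper: it forwards at most count unique
// peers from found to out, dropping duplicates and anything past the limit.
func forwardProviders(found []peer.Peer, out chan<- peer.Peer, count int) {
	ps := pset.NewLimited(count)
	for _, p := range found {
		if ps.TryAdd(p) { // false for duplicates or once the limit is reached
			out <- p
		}
	}
}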