mirror of
https://github.com/ipfs/kubo.git
synced 2025-09-09 23:42:20 +08:00
move util.Key into its own package under blocks
This commit is contained in:
@ -12,6 +12,7 @@ import (
|
||||
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
|
||||
blocks "github.com/ipfs/go-ipfs/blocks"
|
||||
blockstore "github.com/ipfs/go-ipfs/blocks/blockstore"
|
||||
key "github.com/ipfs/go-ipfs/blocks/key"
|
||||
exchange "github.com/ipfs/go-ipfs/exchange"
|
||||
decision "github.com/ipfs/go-ipfs/exchange/bitswap/decision"
|
||||
bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message"
|
||||
@ -21,7 +22,6 @@ import (
|
||||
peer "github.com/ipfs/go-ipfs/p2p/peer"
|
||||
"github.com/ipfs/go-ipfs/thirdparty/delay"
|
||||
eventlog "github.com/ipfs/go-ipfs/thirdparty/eventlog"
|
||||
u "github.com/ipfs/go-ipfs/util"
|
||||
)
|
||||
|
||||
var log = eventlog.Logger("bitswap")
|
||||
@ -85,7 +85,7 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
|
||||
findKeys: make(chan *blockRequest, sizeBatchRequestChan),
|
||||
process: px,
|
||||
newBlocks: make(chan *blocks.Block, HasBlockBufferSize),
|
||||
provideKeys: make(chan u.Key),
|
||||
provideKeys: make(chan key.Key),
|
||||
wm: NewWantManager(ctx, network),
|
||||
}
|
||||
go bs.wm.Run()
|
||||
@ -124,7 +124,7 @@ type Bitswap struct {
|
||||
|
||||
newBlocks chan *blocks.Block
|
||||
|
||||
provideKeys chan u.Key
|
||||
provideKeys chan key.Key
|
||||
|
||||
counterLk sync.Mutex
|
||||
blocksRecvd int
|
||||
@ -132,13 +132,13 @@ type Bitswap struct {
|
||||
}
|
||||
|
||||
type blockRequest struct {
|
||||
keys []u.Key
|
||||
keys []key.Key
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
// GetBlock attempts to retrieve a particular block from peers within the
|
||||
// deadline enforced by the context.
|
||||
func (bs *Bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, error) {
|
||||
func (bs *Bitswap) GetBlock(parent context.Context, k key.Key) (*blocks.Block, error) {
|
||||
|
||||
// Any async work initiated by this function must end when this function
|
||||
// returns. To ensure this, derive a new context. Note that it is okay to
|
||||
@ -156,7 +156,7 @@ func (bs *Bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, err
|
||||
cancelFunc()
|
||||
}()
|
||||
|
||||
promise, err := bs.GetBlocks(ctx, []u.Key{k})
|
||||
promise, err := bs.GetBlocks(ctx, []key.Key{k})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -177,8 +177,8 @@ func (bs *Bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, err
|
||||
}
|
||||
}
|
||||
|
||||
func (bs *Bitswap) WantlistForPeer(p peer.ID) []u.Key {
|
||||
var out []u.Key
|
||||
func (bs *Bitswap) WantlistForPeer(p peer.ID) []key.Key {
|
||||
var out []key.Key
|
||||
for _, e := range bs.engine.WantlistForPeer(p) {
|
||||
out = append(out, e.Key)
|
||||
}
|
||||
@ -192,7 +192,7 @@ func (bs *Bitswap) WantlistForPeer(p peer.ID) []u.Key {
|
||||
// NB: Your request remains open until the context expires. To conserve
|
||||
// resources, provide a context with a reasonably short deadline (ie. not one
|
||||
// that lasts throughout the lifetime of the server)
|
||||
func (bs *Bitswap) GetBlocks(ctx context.Context, keys []u.Key) (<-chan *blocks.Block, error) {
|
||||
func (bs *Bitswap) GetBlocks(ctx context.Context, keys []key.Key) (<-chan *blocks.Block, error) {
|
||||
select {
|
||||
case <-bs.process.Closing():
|
||||
return nil, errors.New("bitswap is closed")
|
||||
@ -246,7 +246,7 @@ func (bs *Bitswap) connectToProviders(ctx context.Context, entries []wantlist.En
|
||||
wg := sync.WaitGroup{}
|
||||
for _, e := range entries {
|
||||
wg.Add(1)
|
||||
go func(k u.Key) {
|
||||
go func(k key.Key) {
|
||||
defer wg.Done()
|
||||
|
||||
child, cancel := context.WithTimeout(ctx, providerRequestTimeout)
|
||||
@ -277,7 +277,7 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
|
||||
}
|
||||
|
||||
// quickly send out cancels, reduces chances of duplicate block receives
|
||||
var keys []u.Key
|
||||
var keys []key.Key
|
||||
for _, block := range iblocks {
|
||||
if _, found := bs.wm.wl.Contains(block.Key()); !found {
|
||||
log.Notice("received un-asked-for block: %s", block)
|
||||
@ -342,8 +342,8 @@ func (bs *Bitswap) Close() error {
|
||||
return bs.process.Close()
|
||||
}
|
||||
|
||||
func (bs *Bitswap) GetWantlist() []u.Key {
|
||||
var out []u.Key
|
||||
func (bs *Bitswap) GetWantlist() []key.Key {
|
||||
var out []key.Key
|
||||
for _, e := range bs.wm.wl.Entries() {
|
||||
out = append(out, e.Key)
|
||||
}
|
||||
|
@ -12,11 +12,11 @@ import (
|
||||
|
||||
blocks "github.com/ipfs/go-ipfs/blocks"
|
||||
blocksutil "github.com/ipfs/go-ipfs/blocks/blocksutil"
|
||||
key "github.com/ipfs/go-ipfs/blocks/key"
|
||||
tn "github.com/ipfs/go-ipfs/exchange/bitswap/testnet"
|
||||
p2ptestutil "github.com/ipfs/go-ipfs/p2p/test/util"
|
||||
mockrouting "github.com/ipfs/go-ipfs/routing/mock"
|
||||
delay "github.com/ipfs/go-ipfs/thirdparty/delay"
|
||||
u "github.com/ipfs/go-ipfs/util"
|
||||
)
|
||||
|
||||
// FIXME the tests are really sensitive to the network delay. fix them to work
|
||||
@ -155,7 +155,7 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
|
||||
|
||||
t.Log("Give the blocks to the first instance")
|
||||
|
||||
var blkeys []u.Key
|
||||
var blkeys []key.Key
|
||||
first := instances[0]
|
||||
for _, b := range blocks {
|
||||
blkeys = append(blkeys, b.Key())
|
||||
@ -227,7 +227,7 @@ func TestSendToWantingPeer(t *testing.T) {
|
||||
alpha := bg.Next()
|
||||
// peerA requests and waits for block alpha
|
||||
ctx, _ := context.WithTimeout(context.TODO(), waitTime)
|
||||
alphaPromise, err := peerA.Exchange.GetBlocks(ctx, []u.Key{alpha.Key()})
|
||||
alphaPromise, err := peerA.Exchange.GetBlocks(ctx, []key.Key{alpha.Key()})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -4,9 +4,9 @@ import (
|
||||
"math"
|
||||
"testing"
|
||||
|
||||
key "github.com/ipfs/go-ipfs/blocks/key"
|
||||
"github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
|
||||
"github.com/ipfs/go-ipfs/p2p/peer"
|
||||
"github.com/ipfs/go-ipfs/util"
|
||||
"github.com/ipfs/go-ipfs/util/testutil"
|
||||
)
|
||||
|
||||
@ -21,6 +21,6 @@ func BenchmarkTaskQueuePush(b *testing.B) {
|
||||
}
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
q.Push(wantlist.Entry{Key: util.Key(i), Priority: math.MaxInt32}, peers[i%len(peers)])
|
||||
q.Push(wantlist.Entry{Key: key.Key(i), Priority: math.MaxInt32}, peers[i%len(peers)])
|
||||
}
|
||||
}
|
||||
|
@ -3,20 +3,20 @@ package decision
|
||||
import (
|
||||
"time"
|
||||
|
||||
key "github.com/ipfs/go-ipfs/blocks/key"
|
||||
wl "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
|
||||
peer "github.com/ipfs/go-ipfs/p2p/peer"
|
||||
u "github.com/ipfs/go-ipfs/util"
|
||||
)
|
||||
|
||||
// keySet is just a convenient alias for maps of keys, where we only care
|
||||
// access/lookups.
|
||||
type keySet map[u.Key]struct{}
|
||||
type keySet map[key.Key]struct{}
|
||||
|
||||
func newLedger(p peer.ID) *ledger {
|
||||
return &ledger{
|
||||
wantList: wl.New(),
|
||||
Partner: p,
|
||||
sentToPeer: make(map[u.Key]time.Time),
|
||||
sentToPeer: make(map[key.Key]time.Time),
|
||||
}
|
||||
}
|
||||
|
||||
@ -43,7 +43,7 @@ type ledger struct {
|
||||
|
||||
// sentToPeer is a set of keys to ensure we dont send duplicate blocks
|
||||
// to a given peer
|
||||
sentToPeer map[u.Key]time.Time
|
||||
sentToPeer map[key.Key]time.Time
|
||||
}
|
||||
|
||||
type debtRatio struct {
|
||||
@ -68,16 +68,16 @@ func (l *ledger) ReceivedBytes(n int) {
|
||||
}
|
||||
|
||||
// TODO: this needs to be different. We need timeouts.
|
||||
func (l *ledger) Wants(k u.Key, priority int) {
|
||||
func (l *ledger) Wants(k key.Key, priority int) {
|
||||
log.Debugf("peer %s wants %s", l.Partner, k)
|
||||
l.wantList.Add(k, priority)
|
||||
}
|
||||
|
||||
func (l *ledger) CancelWant(k u.Key) {
|
||||
func (l *ledger) CancelWant(k key.Key) {
|
||||
l.wantList.Remove(k)
|
||||
}
|
||||
|
||||
func (l *ledger) WantListContains(k u.Key) (wl.Entry, bool) {
|
||||
func (l *ledger) WantListContains(k key.Key) (wl.Entry, bool) {
|
||||
return l.wantList.Contains(k)
|
||||
}
|
||||
|
||||
|
@ -4,17 +4,17 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
key "github.com/ipfs/go-ipfs/blocks/key"
|
||||
wantlist "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
|
||||
peer "github.com/ipfs/go-ipfs/p2p/peer"
|
||||
pq "github.com/ipfs/go-ipfs/thirdparty/pq"
|
||||
u "github.com/ipfs/go-ipfs/util"
|
||||
)
|
||||
|
||||
type peerRequestQueue interface {
|
||||
// Pop returns the next peerRequestTask. Returns nil if the peerRequestQueue is empty.
|
||||
Pop() *peerRequestTask
|
||||
Push(entry wantlist.Entry, to peer.ID)
|
||||
Remove(k u.Key, p peer.ID)
|
||||
Remove(k key.Key, p peer.ID)
|
||||
// NB: cannot expose simply expose taskQueue.Len because trashed elements
|
||||
// may exist. These trashed elements should not contribute to the count.
|
||||
}
|
||||
@ -110,7 +110,7 @@ func (tl *prq) Pop() *peerRequestTask {
|
||||
}
|
||||
|
||||
// Remove removes a task from the queue
|
||||
func (tl *prq) Remove(k u.Key, p peer.ID) {
|
||||
func (tl *prq) Remove(k key.Key, p peer.ID) {
|
||||
tl.lock.Lock()
|
||||
t, ok := tl.taskMap[taskKey(p, k)]
|
||||
if ok {
|
||||
@ -155,7 +155,7 @@ func (t *peerRequestTask) SetIndex(i int) {
|
||||
}
|
||||
|
||||
// taskKey returns a key that uniquely identifies a task.
|
||||
func taskKey(p peer.ID, k u.Key) string {
|
||||
func taskKey(p peer.ID, k key.Key) string {
|
||||
return string(p) + string(k)
|
||||
}
|
||||
|
||||
@ -186,7 +186,7 @@ type activePartner struct {
|
||||
activelk sync.Mutex
|
||||
active int
|
||||
|
||||
activeBlocks map[u.Key]struct{}
|
||||
activeBlocks map[key.Key]struct{}
|
||||
|
||||
// requests is the number of blocks this peer is currently requesting
|
||||
// request need not be locked around as it will only be modified under
|
||||
@ -203,7 +203,7 @@ type activePartner struct {
|
||||
func newActivePartner() *activePartner {
|
||||
return &activePartner{
|
||||
taskQueue: pq.New(wrapCmp(V1)),
|
||||
activeBlocks: make(map[u.Key]struct{}),
|
||||
activeBlocks: make(map[key.Key]struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
@ -230,7 +230,7 @@ func partnerCompare(a, b pq.Elem) bool {
|
||||
}
|
||||
|
||||
// StartTask signals that a task was started for this partner
|
||||
func (p *activePartner) StartTask(k u.Key) {
|
||||
func (p *activePartner) StartTask(k key.Key) {
|
||||
p.activelk.Lock()
|
||||
p.activeBlocks[k] = struct{}{}
|
||||
p.active++
|
||||
@ -238,7 +238,7 @@ func (p *activePartner) StartTask(k u.Key) {
|
||||
}
|
||||
|
||||
// TaskDone signals that a task was completed for this partner
|
||||
func (p *activePartner) TaskDone(k u.Key) {
|
||||
func (p *activePartner) TaskDone(k key.Key) {
|
||||
p.activelk.Lock()
|
||||
delete(p.activeBlocks, k)
|
||||
p.active--
|
||||
|
@ -7,8 +7,8 @@ import (
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
key "github.com/ipfs/go-ipfs/blocks/key"
|
||||
"github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
|
||||
"github.com/ipfs/go-ipfs/util"
|
||||
"github.com/ipfs/go-ipfs/util/testutil"
|
||||
)
|
||||
|
||||
@ -41,10 +41,10 @@ func TestPushPop(t *testing.T) {
|
||||
for _, index := range rand.Perm(len(alphabet)) { // add blocks for all letters
|
||||
letter := alphabet[index]
|
||||
t.Log(partner.String())
|
||||
prq.Push(wantlist.Entry{Key: util.Key(letter), Priority: math.MaxInt32 - index}, partner)
|
||||
prq.Push(wantlist.Entry{Key: key.Key(letter), Priority: math.MaxInt32 - index}, partner)
|
||||
}
|
||||
for _, consonant := range consonants {
|
||||
prq.Remove(util.Key(consonant), partner)
|
||||
prq.Remove(key.Key(consonant), partner)
|
||||
}
|
||||
|
||||
var out []string
|
||||
@ -76,10 +76,10 @@ func TestPeerRepeats(t *testing.T) {
|
||||
// Have each push some blocks
|
||||
|
||||
for i := 0; i < 5; i++ {
|
||||
prq.Push(wantlist.Entry{Key: util.Key(i)}, a)
|
||||
prq.Push(wantlist.Entry{Key: util.Key(i)}, b)
|
||||
prq.Push(wantlist.Entry{Key: util.Key(i)}, c)
|
||||
prq.Push(wantlist.Entry{Key: util.Key(i)}, d)
|
||||
prq.Push(wantlist.Entry{Key: key.Key(i)}, a)
|
||||
prq.Push(wantlist.Entry{Key: key.Key(i)}, b)
|
||||
prq.Push(wantlist.Entry{Key: key.Key(i)}, c)
|
||||
prq.Push(wantlist.Entry{Key: key.Key(i)}, d)
|
||||
}
|
||||
|
||||
// now, pop off four entries, there should be one from each
|
||||
|
@ -4,10 +4,10 @@ import (
|
||||
"io"
|
||||
|
||||
blocks "github.com/ipfs/go-ipfs/blocks"
|
||||
key "github.com/ipfs/go-ipfs/blocks/key"
|
||||
pb "github.com/ipfs/go-ipfs/exchange/bitswap/message/internal/pb"
|
||||
wantlist "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
|
||||
inet "github.com/ipfs/go-ipfs/p2p/net"
|
||||
u "github.com/ipfs/go-ipfs/util"
|
||||
|
||||
ggio "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/gogo/protobuf/io"
|
||||
proto "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/gogo/protobuf/proto"
|
||||
@ -25,9 +25,9 @@ type BitSwapMessage interface {
|
||||
Blocks() []*blocks.Block
|
||||
|
||||
// AddEntry adds an entry to the Wantlist.
|
||||
AddEntry(key u.Key, priority int)
|
||||
AddEntry(key key.Key, priority int)
|
||||
|
||||
Cancel(key u.Key)
|
||||
Cancel(key key.Key)
|
||||
|
||||
Empty() bool
|
||||
|
||||
@ -47,8 +47,8 @@ type Exportable interface {
|
||||
|
||||
type impl struct {
|
||||
full bool
|
||||
wantlist map[u.Key]Entry
|
||||
blocks map[u.Key]*blocks.Block
|
||||
wantlist map[key.Key]Entry
|
||||
blocks map[key.Key]*blocks.Block
|
||||
}
|
||||
|
||||
func New(full bool) BitSwapMessage {
|
||||
@ -57,8 +57,8 @@ func New(full bool) BitSwapMessage {
|
||||
|
||||
func newMsg(full bool) *impl {
|
||||
return &impl{
|
||||
blocks: make(map[u.Key]*blocks.Block),
|
||||
wantlist: make(map[u.Key]Entry),
|
||||
blocks: make(map[key.Key]*blocks.Block),
|
||||
wantlist: make(map[key.Key]Entry),
|
||||
full: full,
|
||||
}
|
||||
}
|
||||
@ -71,7 +71,7 @@ type Entry struct {
|
||||
func newMessageFromProto(pbm pb.Message) BitSwapMessage {
|
||||
m := newMsg(pbm.GetWantlist().GetFull())
|
||||
for _, e := range pbm.GetWantlist().GetEntries() {
|
||||
m.addEntry(u.Key(e.GetBlock()), int(e.GetPriority()), e.GetCancel())
|
||||
m.addEntry(key.Key(e.GetBlock()), int(e.GetPriority()), e.GetCancel())
|
||||
}
|
||||
for _, d := range pbm.GetBlocks() {
|
||||
b := blocks.NewBlock(d)
|
||||
@ -104,16 +104,16 @@ func (m *impl) Blocks() []*blocks.Block {
|
||||
return bs
|
||||
}
|
||||
|
||||
func (m *impl) Cancel(k u.Key) {
|
||||
func (m *impl) Cancel(k key.Key) {
|
||||
delete(m.wantlist, k)
|
||||
m.addEntry(k, 0, true)
|
||||
}
|
||||
|
||||
func (m *impl) AddEntry(k u.Key, priority int) {
|
||||
func (m *impl) AddEntry(k key.Key, priority int) {
|
||||
m.addEntry(k, priority, false)
|
||||
}
|
||||
|
||||
func (m *impl) addEntry(k u.Key, priority int, cancel bool) {
|
||||
func (m *impl) addEntry(k key.Key, priority int, cancel bool) {
|
||||
e, exists := m.wantlist[k]
|
||||
if exists {
|
||||
e.Priority = priority
|
||||
|
@ -7,14 +7,14 @@ import (
|
||||
proto "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/gogo/protobuf/proto"
|
||||
|
||||
blocks "github.com/ipfs/go-ipfs/blocks"
|
||||
key "github.com/ipfs/go-ipfs/blocks/key"
|
||||
pb "github.com/ipfs/go-ipfs/exchange/bitswap/message/internal/pb"
|
||||
u "github.com/ipfs/go-ipfs/util"
|
||||
)
|
||||
|
||||
func TestAppendWanted(t *testing.T) {
|
||||
const str = "foo"
|
||||
m := New(true)
|
||||
m.AddEntry(u.Key(str), 1)
|
||||
m.AddEntry(key.Key(str), 1)
|
||||
|
||||
if !wantlistContains(m.ToProto().GetWantlist(), str) {
|
||||
t.Fail()
|
||||
@ -63,7 +63,7 @@ func TestWantlist(t *testing.T) {
|
||||
keystrs := []string{"foo", "bar", "baz", "bat"}
|
||||
m := New(true)
|
||||
for _, s := range keystrs {
|
||||
m.AddEntry(u.Key(s), 1)
|
||||
m.AddEntry(key.Key(s), 1)
|
||||
}
|
||||
exported := m.Wantlist()
|
||||
|
||||
@ -86,7 +86,7 @@ func TestCopyProtoByValue(t *testing.T) {
|
||||
const str = "foo"
|
||||
m := New(true)
|
||||
protoBeforeAppend := m.ToProto()
|
||||
m.AddEntry(u.Key(str), 1)
|
||||
m.AddEntry(key.Key(str), 1)
|
||||
if wantlistContains(protoBeforeAppend.GetWantlist(), str) {
|
||||
t.Fail()
|
||||
}
|
||||
@ -94,11 +94,11 @@ func TestCopyProtoByValue(t *testing.T) {
|
||||
|
||||
func TestToNetFromNetPreservesWantList(t *testing.T) {
|
||||
original := New(true)
|
||||
original.AddEntry(u.Key("M"), 1)
|
||||
original.AddEntry(u.Key("B"), 1)
|
||||
original.AddEntry(u.Key("D"), 1)
|
||||
original.AddEntry(u.Key("T"), 1)
|
||||
original.AddEntry(u.Key("F"), 1)
|
||||
original.AddEntry(key.Key("M"), 1)
|
||||
original.AddEntry(key.Key("B"), 1)
|
||||
original.AddEntry(key.Key("D"), 1)
|
||||
original.AddEntry(key.Key("T"), 1)
|
||||
original.AddEntry(key.Key("F"), 1)
|
||||
|
||||
buf := new(bytes.Buffer)
|
||||
if err := original.ToNet(buf); err != nil {
|
||||
@ -110,7 +110,7 @@ func TestToNetFromNetPreservesWantList(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
keys := make(map[u.Key]bool)
|
||||
keys := make(map[key.Key]bool)
|
||||
for _, k := range copied.Wantlist() {
|
||||
keys[k.Key] = true
|
||||
}
|
||||
@ -140,7 +140,7 @@ func TestToAndFromNetMessage(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
keys := make(map[u.Key]bool)
|
||||
keys := make(map[key.Key]bool)
|
||||
for _, b := range m2.Blocks() {
|
||||
keys[b.Key()] = true
|
||||
}
|
||||
|
@ -2,10 +2,10 @@ package network
|
||||
|
||||
import (
|
||||
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
|
||||
key "github.com/ipfs/go-ipfs/blocks/key"
|
||||
bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message"
|
||||
peer "github.com/ipfs/go-ipfs/p2p/peer"
|
||||
protocol "github.com/ipfs/go-ipfs/p2p/protocol"
|
||||
u "github.com/ipfs/go-ipfs/util"
|
||||
)
|
||||
|
||||
var ProtocolBitswap protocol.ID = "/ipfs/bitswap"
|
||||
@ -44,8 +44,8 @@ type Receiver interface {
|
||||
|
||||
type Routing interface {
|
||||
// FindProvidersAsync returns a channel of providers for the given key
|
||||
FindProvidersAsync(context.Context, u.Key, int) <-chan peer.ID
|
||||
FindProvidersAsync(context.Context, key.Key, int) <-chan peer.ID
|
||||
|
||||
// Provide provides the key to the network
|
||||
Provide(context.Context, u.Key) error
|
||||
Provide(context.Context, key.Key) error
|
||||
}
|
||||
|
@ -3,13 +3,13 @@ package network
|
||||
import (
|
||||
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
|
||||
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
|
||||
key "github.com/ipfs/go-ipfs/blocks/key"
|
||||
bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message"
|
||||
host "github.com/ipfs/go-ipfs/p2p/host"
|
||||
inet "github.com/ipfs/go-ipfs/p2p/net"
|
||||
peer "github.com/ipfs/go-ipfs/p2p/peer"
|
||||
routing "github.com/ipfs/go-ipfs/routing"
|
||||
eventlog "github.com/ipfs/go-ipfs/thirdparty/eventlog"
|
||||
util "github.com/ipfs/go-ipfs/util"
|
||||
)
|
||||
|
||||
var log = eventlog.Logger("bitswap_network")
|
||||
@ -102,7 +102,7 @@ func (bsnet *impl) ConnectTo(ctx context.Context, p peer.ID) error {
|
||||
}
|
||||
|
||||
// FindProvidersAsync returns a channel of providers for the given key
|
||||
func (bsnet *impl) FindProvidersAsync(ctx context.Context, k util.Key, max int) <-chan peer.ID {
|
||||
func (bsnet *impl) FindProvidersAsync(ctx context.Context, k key.Key, max int) <-chan peer.ID {
|
||||
|
||||
// Since routing queries are expensive, give bitswap the peers to which we
|
||||
// have open connections. Note that this may cause issues if bitswap starts
|
||||
@ -138,7 +138,7 @@ func (bsnet *impl) FindProvidersAsync(ctx context.Context, k util.Key, max int)
|
||||
}
|
||||
|
||||
// Provide provides the key to the network
|
||||
func (bsnet *impl) Provide(ctx context.Context, k util.Key) error {
|
||||
func (bsnet *impl) Provide(ctx context.Context, k key.Key) error {
|
||||
return bsnet.routing.Provide(ctx, k)
|
||||
}
|
||||
|
||||
|
@ -4,14 +4,14 @@ import (
|
||||
pubsub "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/briantigerchow/pubsub"
|
||||
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
|
||||
blocks "github.com/ipfs/go-ipfs/blocks"
|
||||
u "github.com/ipfs/go-ipfs/util"
|
||||
key "github.com/ipfs/go-ipfs/blocks/key"
|
||||
)
|
||||
|
||||
const bufferSize = 16
|
||||
|
||||
type PubSub interface {
|
||||
Publish(block *blocks.Block)
|
||||
Subscribe(ctx context.Context, keys ...u.Key) <-chan *blocks.Block
|
||||
Subscribe(ctx context.Context, keys ...key.Key) <-chan *blocks.Block
|
||||
Shutdown()
|
||||
}
|
||||
|
||||
@ -35,7 +35,7 @@ func (ps *impl) Shutdown() {
|
||||
// Subscribe returns a channel of blocks for the given |keys|. |blockChannel|
|
||||
// is closed if the |ctx| times out or is cancelled, or after sending len(keys)
|
||||
// blocks.
|
||||
func (ps *impl) Subscribe(ctx context.Context, keys ...u.Key) <-chan *blocks.Block {
|
||||
func (ps *impl) Subscribe(ctx context.Context, keys ...key.Key) <-chan *blocks.Block {
|
||||
|
||||
blocksCh := make(chan *blocks.Block, len(keys))
|
||||
valuesCh := make(chan interface{}, len(keys)) // provide our own channel to control buffer, prevent blocking
|
||||
@ -71,7 +71,7 @@ func (ps *impl) Subscribe(ctx context.Context, keys ...u.Key) <-chan *blocks.Blo
|
||||
return blocksCh
|
||||
}
|
||||
|
||||
func toStrings(keys []u.Key) []string {
|
||||
func toStrings(keys []key.Key) []string {
|
||||
strs := make([]string, 0)
|
||||
for _, key := range keys {
|
||||
strs = append(strs, string(key))
|
||||
|
@ -8,7 +8,7 @@ import (
|
||||
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
|
||||
blocks "github.com/ipfs/go-ipfs/blocks"
|
||||
blocksutil "github.com/ipfs/go-ipfs/blocks/blocksutil"
|
||||
"github.com/ipfs/go-ipfs/util"
|
||||
key "github.com/ipfs/go-ipfs/blocks/key"
|
||||
)
|
||||
|
||||
func TestDuplicates(t *testing.T) {
|
||||
@ -131,8 +131,8 @@ func TestDoesNotDeadLockIfContextCancelledBeforePublish(t *testing.T) {
|
||||
|
||||
t.Log("generate a large number of blocks. exceed default buffer")
|
||||
bs := g.Blocks(1000)
|
||||
ks := func() []util.Key {
|
||||
var keys []util.Key
|
||||
ks := func() []key.Key {
|
||||
var keys []key.Key
|
||||
for _, b := range bs {
|
||||
keys = append(keys, b.Key())
|
||||
}
|
||||
|
@ -1,13 +1,13 @@
|
||||
package bitswap
|
||||
|
||||
import (
|
||||
u "github.com/ipfs/go-ipfs/util"
|
||||
key "github.com/ipfs/go-ipfs/blocks/key"
|
||||
"sort"
|
||||
)
|
||||
|
||||
type Stat struct {
|
||||
ProvideBufLen int
|
||||
Wantlist []u.Key
|
||||
Wantlist []key.Key
|
||||
Peers []string
|
||||
BlocksReceived int
|
||||
DupBlksReceived int
|
||||
|
@ -4,13 +4,13 @@ import (
|
||||
"errors"
|
||||
|
||||
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
|
||||
key "github.com/ipfs/go-ipfs/blocks/key"
|
||||
bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message"
|
||||
bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network"
|
||||
peer "github.com/ipfs/go-ipfs/p2p/peer"
|
||||
routing "github.com/ipfs/go-ipfs/routing"
|
||||
mockrouting "github.com/ipfs/go-ipfs/routing/mock"
|
||||
delay "github.com/ipfs/go-ipfs/thirdparty/delay"
|
||||
util "github.com/ipfs/go-ipfs/util"
|
||||
testutil "github.com/ipfs/go-ipfs/util/testutil"
|
||||
)
|
||||
|
||||
@ -91,7 +91,7 @@ func (nc *networkClient) SendMessage(
|
||||
}
|
||||
|
||||
// FindProvidersAsync returns a channel of providers for the given key
|
||||
func (nc *networkClient) FindProvidersAsync(ctx context.Context, k util.Key, max int) <-chan peer.ID {
|
||||
func (nc *networkClient) FindProvidersAsync(ctx context.Context, k key.Key, max int) <-chan peer.ID {
|
||||
|
||||
// NB: this function duplicates the PeerInfo -> ID transformation in the
|
||||
// bitswap network adapter. Not to worry. This network client will be
|
||||
@ -113,7 +113,7 @@ func (nc *networkClient) FindProvidersAsync(ctx context.Context, k util.Key, max
|
||||
}
|
||||
|
||||
// Provide provides the key to the network
|
||||
func (nc *networkClient) Provide(ctx context.Context, k util.Key) error {
|
||||
func (nc *networkClient) Provide(ctx context.Context, k key.Key) error {
|
||||
return nc.routing.Provide(ctx, k)
|
||||
}
|
||||
|
||||
|
@ -3,7 +3,7 @@
|
||||
package wantlist
|
||||
|
||||
import (
|
||||
u "github.com/ipfs/go-ipfs/util"
|
||||
key "github.com/ipfs/go-ipfs/blocks/key"
|
||||
"sort"
|
||||
"sync"
|
||||
)
|
||||
@ -15,14 +15,14 @@ type ThreadSafe struct {
|
||||
|
||||
// not threadsafe
|
||||
type Wantlist struct {
|
||||
set map[u.Key]Entry
|
||||
set map[key.Key]Entry
|
||||
// TODO provide O(1) len accessor if cost becomes an issue
|
||||
}
|
||||
|
||||
type Entry struct {
|
||||
// TODO consider making entries immutable so they can be shared safely and
|
||||
// slices can be copied efficiently.
|
||||
Key u.Key
|
||||
Key key.Key
|
||||
Priority int
|
||||
}
|
||||
|
||||
@ -40,25 +40,25 @@ func NewThreadSafe() *ThreadSafe {
|
||||
|
||||
func New() *Wantlist {
|
||||
return &Wantlist{
|
||||
set: make(map[u.Key]Entry),
|
||||
set: make(map[key.Key]Entry),
|
||||
}
|
||||
}
|
||||
|
||||
func (w *ThreadSafe) Add(k u.Key, priority int) {
|
||||
func (w *ThreadSafe) Add(k key.Key, priority int) {
|
||||
// TODO rm defer for perf
|
||||
w.lk.Lock()
|
||||
defer w.lk.Unlock()
|
||||
w.Wantlist.Add(k, priority)
|
||||
}
|
||||
|
||||
func (w *ThreadSafe) Remove(k u.Key) {
|
||||
func (w *ThreadSafe) Remove(k key.Key) {
|
||||
// TODO rm defer for perf
|
||||
w.lk.Lock()
|
||||
defer w.lk.Unlock()
|
||||
w.Wantlist.Remove(k)
|
||||
}
|
||||
|
||||
func (w *ThreadSafe) Contains(k u.Key) (Entry, bool) {
|
||||
func (w *ThreadSafe) Contains(k key.Key) (Entry, bool) {
|
||||
// TODO rm defer for perf
|
||||
w.lk.RLock()
|
||||
defer w.lk.RUnlock()
|
||||
@ -87,7 +87,7 @@ func (w *Wantlist) Len() int {
|
||||
return len(w.set)
|
||||
}
|
||||
|
||||
func (w *Wantlist) Add(k u.Key, priority int) {
|
||||
func (w *Wantlist) Add(k key.Key, priority int) {
|
||||
if _, ok := w.set[k]; ok {
|
||||
return
|
||||
}
|
||||
@ -97,11 +97,11 @@ func (w *Wantlist) Add(k u.Key, priority int) {
|
||||
}
|
||||
}
|
||||
|
||||
func (w *Wantlist) Remove(k u.Key) {
|
||||
func (w *Wantlist) Remove(k key.Key) {
|
||||
delete(w.set, k)
|
||||
}
|
||||
|
||||
func (w *Wantlist) Contains(k u.Key) (Entry, bool) {
|
||||
func (w *Wantlist) Contains(k key.Key) (Entry, bool) {
|
||||
e, ok := w.set[k]
|
||||
return e, ok
|
||||
}
|
||||
|
@ -5,12 +5,12 @@ import (
|
||||
"time"
|
||||
|
||||
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
|
||||
key "github.com/ipfs/go-ipfs/blocks/key"
|
||||
engine "github.com/ipfs/go-ipfs/exchange/bitswap/decision"
|
||||
bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message"
|
||||
bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network"
|
||||
wantlist "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
|
||||
peer "github.com/ipfs/go-ipfs/p2p/peer"
|
||||
u "github.com/ipfs/go-ipfs/util"
|
||||
)
|
||||
|
||||
type WantManager struct {
|
||||
@ -46,7 +46,7 @@ type msgPair struct {
|
||||
|
||||
type cancellation struct {
|
||||
who peer.ID
|
||||
blk u.Key
|
||||
blk key.Key
|
||||
}
|
||||
|
||||
type msgQueue struct {
|
||||
@ -60,16 +60,16 @@ type msgQueue struct {
|
||||
done chan struct{}
|
||||
}
|
||||
|
||||
func (pm *WantManager) WantBlocks(ks []u.Key) {
|
||||
func (pm *WantManager) WantBlocks(ks []key.Key) {
|
||||
log.Infof("want blocks: %s", ks)
|
||||
pm.addEntries(ks, false)
|
||||
}
|
||||
|
||||
func (pm *WantManager) CancelWants(ks []u.Key) {
|
||||
func (pm *WantManager) CancelWants(ks []key.Key) {
|
||||
pm.addEntries(ks, true)
|
||||
}
|
||||
|
||||
func (pm *WantManager) addEntries(ks []u.Key, cancel bool) {
|
||||
func (pm *WantManager) addEntries(ks []key.Key, cancel bool) {
|
||||
var entries []*bsmsg.Entry
|
||||
for i, k := range ks {
|
||||
entries = append(entries, &bsmsg.Entry{
|
||||
|
@ -7,7 +7,7 @@ import (
|
||||
|
||||
process "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
|
||||
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
|
||||
u "github.com/ipfs/go-ipfs/util"
|
||||
key "github.com/ipfs/go-ipfs/blocks/key"
|
||||
)
|
||||
|
||||
var TaskWorkerCount = 8
|
||||
@ -104,9 +104,9 @@ func (bs *Bitswap) provideWorker(ctx context.Context) {
|
||||
|
||||
func (bs *Bitswap) provideCollector(ctx context.Context) {
|
||||
defer close(bs.provideKeys)
|
||||
var toProvide []u.Key
|
||||
var nextKey u.Key
|
||||
var keysOut chan u.Key
|
||||
var toProvide []key.Key
|
||||
var nextKey key.Key
|
||||
var keysOut chan key.Key
|
||||
|
||||
for {
|
||||
select {
|
||||
|
Reference in New Issue
Block a user