Mirror of https://github.com/ipfs/kubo.git (synced 2025-07-01 19:24:14 +08:00)
better query processing (runner)
Committed by Brian Tiger Chow
Parent: 98c3afeecf
Commit: a114e9cd12
@@ -177,6 +177,9 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p *peer.Peer, pmes *Message
 	if err != nil {
 		return nil, err
 	}
+	if rmes == nil {
+		return nil, errors.New("no response to request")
+	}
 
 	rtt := time.Since(start)
 	rmes.Peer().SetLatency(rtt)
@@ -218,19 +221,22 @@ func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p *peer.Peer,
 		return nil, nil, err
 	}
 
+	u.POut("pmes.GetValue() %v\n", pmes.GetValue())
 	if value := pmes.GetValue(); value != nil {
 		// Success! We were given the value
+		u.POut("getValueOrPeers: got value\n")
 		return value, nil, nil
 	}
 
 	// TODO decide on providers. This probably shouldn't be happening.
-	// if prv := pmes.GetProviderPeers(); prv != nil && len(prv) > 0 {
-	// val, err := dht.getFromPeerList(key, timeout,, level)
-	// if err != nil {
-	// return nil, nil, err
-	// }
-	// return val, nil, nil
-	// }
+	if prv := pmes.GetProviderPeers(); prv != nil && len(prv) > 0 {
+		val, err := dht.getFromPeerList(ctx, key, prv, level)
+		if err != nil {
+			return nil, nil, err
+		}
+		u.POut("getValueOrPeers: get from providers\n")
+		return val, nil, nil
+	}
 
 	// Perhaps we were given closer peers
 	var peers []*peer.Peer
@@ -256,10 +262,12 @@ func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p *peer.Peer,
 	}
 
 	if len(peers) > 0 {
+		u.POut("getValueOrPeers: peers\n")
 		return nil, peers, nil
 	}
 
-	return nil, nil, errors.New("NotFound. did not get value or closer peers.")
+	u.POut("getValueOrPeers: u.ErrNotFound\n")
+	return nil, nil, u.ErrNotFound
 }
 
 // getValueSingle simply performs the get value RPC with the given parameters
@@ -18,14 +18,23 @@ import (
 	"time"
 )
 
+// mesHandleFunc is a function that takes in outgoing messages
+// and can respond to them, simulating other peers on the network.
+// returning nil will chose not to respond and pass the message onto the
+// next registered handler
+type mesHandleFunc func(msg.NetMessage) msg.NetMessage
+
 // fauxNet is a standin for a swarm.Network in order to more easily recreate
 // different testing scenarios
 type fauxSender struct {
 	handlers []mesHandleFunc
 }
 
-func (f *fauxSender) SendRequest(ctx context.Context, m msg.NetMessage) (msg.NetMessage, error) {
+func (f *fauxSender) AddHandler(fn func(msg.NetMessage) msg.NetMessage) {
+	f.handlers = append(f.handlers, fn)
+}
+
+func (f *fauxSender) SendRequest(ctx context.Context, m msg.NetMessage) (msg.NetMessage, error) {
 	for _, h := range f.handlers {
 		reply := h(m)
 		if reply != nil {
@@ -33,7 +42,12 @@ func (f *fauxSender) SendRequest(ctx context.Context, m msg.NetMessage) (msg.Net
 		}
 	}
 
-	return nil, nil
+	// no reply? ok force a timeout
+	select {
+	case <-ctx.Done():
+	}
+
+	return nil, ctx.Err()
 }
 
 func (f *fauxSender) SendMessage(ctx context.Context, m msg.NetMessage) error {
@@ -49,17 +63,6 @@ func (f *fauxSender) SendMessage(ctx context.Context, m msg.NetMessage) error {
 // fauxNet is a standin for a swarm.Network in order to more easily recreate
 // different testing scenarios
 type fauxNet struct {
-	handlers []mesHandleFunc
-}
-
-// mesHandleFunc is a function that takes in outgoing messages
-// and can respond to them, simulating other peers on the network.
-// returning nil will chose not to respond and pass the message onto the
-// next registered handler
-type mesHandleFunc func(msg.NetMessage) msg.NetMessage
-
-func (f *fauxNet) AddHandler(fn func(msg.NetMessage) msg.NetMessage) {
-	f.handlers = append(f.handlers, fn)
 }
 
 // DialPeer attempts to establish a connection to a given peer
@@ -98,25 +101,23 @@ func TestGetFailures(t *testing.T) {
 	local.ID = peer.ID("test_peer")
 
 	d := NewDHT(local, peerstore, fn, fs, ds.NewMapDatastore())
 
 	other := &peer.Peer{ID: peer.ID("other_peer")}
-
-	d.Start()
-
 	d.Update(other)
-
 	// This one should time out
+	// u.POut("Timout Test\n")
 	_, err := d.GetValue(u.Key("test"), time.Millisecond*10)
 	if err != nil {
-		if err != u.ErrTimeout {
-			t.Fatal("Got different error than we expected.")
+		if err != context.DeadlineExceeded {
+			t.Fatal("Got different error than we expected", err)
 		}
 	} else {
 		t.Fatal("Did not get expected error!")
 	}
 
+	// u.POut("NotFound Test\n")
 	// Reply with failures to every message
-	fn.AddHandler(func(mes msg.NetMessage) msg.NetMessage {
+	fs.AddHandler(func(mes msg.NetMessage) msg.NetMessage {
 		pmes := new(Message)
 		err := proto.Unmarshal(mes.Data(), pmes)
 		if err != nil {
@@ -140,18 +141,7 @@ func TestGetFailures(t *testing.T) {
 		t.Fatal("expected error, got none.")
 	}
 
-	success := make(chan struct{})
-	fn.handlers = nil
-	fn.AddHandler(func(mes msg.NetMessage) msg.NetMessage {
-		resp := new(Message)
-		err := proto.Unmarshal(mes.Data(), resp)
-		if err != nil {
-			t.Fatal(err)
-		}
-		success <- struct{}{}
-		return nil
-	})
-
+	fs.handlers = nil
 	// Now we test this DHT's handleGetValue failure
 	typ := Message_GET_VALUE
 	str := "hello"
@@ -161,17 +151,32 @@ func TestGetFailures(t *testing.T) {
 		Value: []byte{0},
 	}
 
+	// u.POut("handleGetValue Test\n")
 	mes, err := msg.FromObject(other, &req)
 	if err != nil {
 		t.Error(err)
 	}
 
-	mes, err = fs.SendRequest(ctx, mes)
+	mes, err = d.HandleMessage(ctx, mes)
 	if err != nil {
 		t.Error(err)
 	}
 
-	<-success
+	pmes := new(Message)
+	err = proto.Unmarshal(mes.Data(), pmes)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if pmes.GetValue() != nil {
+		t.Fatal("shouldnt have value")
+	}
+	if pmes.GetCloserPeers() != nil {
+		t.Fatal("shouldnt have closer peers")
+	}
+	if pmes.GetProviderPeers() != nil {
+		t.Fatal("shouldnt have provider peers")
+	}
+
 }
 
 // TODO: Maybe put these in some sort of "ipfs_testutil" package
@@ -192,7 +197,6 @@ func TestNotFound(t *testing.T) {
 	peerstore := peer.NewPeerstore()
 
 	d := NewDHT(local, peerstore, fn, fs, ds.NewMapDatastore())
-	d.Start()
 
 	var ps []*peer.Peer
 	for i := 0; i < 5; i++ {
@@ -201,7 +205,7 @@ func TestNotFound(t *testing.T) {
 	}
 
 	// Reply with random peers to every message
-	fn.AddHandler(func(mes msg.NetMessage) msg.NetMessage {
+	fs.AddHandler(func(mes msg.NetMessage) msg.NetMessage {
 		pmes := new(Message)
 		err := proto.Unmarshal(mes.Data(), pmes)
 		if err != nil {
@@ -228,7 +232,8 @@ func TestNotFound(t *testing.T) {
 
 	})
 
-	_, err := d.GetValue(u.Key("hello"), time.Second*30)
+	v, err := d.GetValue(u.Key("hello"), time.Second*5)
+	u.POut("get value got %v\n", v)
 	if err != nil {
 		switch err {
 		case u.ErrNotFound:
@@ -254,7 +259,6 @@ func TestLessThanKResponses(t *testing.T) {
 	local.ID = peer.ID("test_peer")
 
 	d := NewDHT(local, peerstore, fn, fs, ds.NewMapDatastore())
-	d.Start()
 
 	var ps []*peer.Peer
 	for i := 0; i < 5; i++ {
@@ -264,7 +268,7 @@ func TestLessThanKResponses(t *testing.T) {
 	other := _randPeer()
 
 	// Reply with random peers to every message
-	fn.AddHandler(func(mes msg.NetMessage) msg.NetMessage {
+	fs.AddHandler(func(mes msg.NetMessage) msg.NetMessage {
 		pmes := new(Message)
 		err := proto.Unmarshal(mes.Data(), pmes)
 		if err != nil {
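The tests above exercise the DHT against the fauxSender/fauxNet stand-ins rather than a live network: a handler registered with AddHandler sees every outgoing request and can fabricate the reply. A rough sketch of such a handler (not part of this commit; it assumes the dht test harness shown above, with Message, msg.FromObject, proto, and a *testing.T named t in scope):

fs.AddHandler(func(req msg.NetMessage) msg.NetMessage {
	// decode the request the DHT under test just sent out
	pmes := new(Message)
	if err := proto.Unmarshal(req.Data(), pmes); err != nil {
		t.Fatal(err)
	}

	// answer with an empty Message from the same peer, i.e. simulate a
	// peer that has neither the value nor any closer peers to offer
	rmes, err := msg.FromObject(req.Peer(), new(Message))
	if err != nil {
		t.Fatal(err)
	}
	return rmes
})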
@@ -58,7 +58,10 @@ func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *Message) (*Message, error
 		return nil, err
 	}
 
-	// if we have the value, respond with it!
+	// Note: changed the behavior here to return _as much_ info as possible
+	// (potentially all of {value, closer peers, provider})
+
+	// if we have the value, send it back
 	if err == nil {
 		u.DOut("handleGetValue success!\n")
 
@@ -68,7 +71,6 @@ func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *Message) (*Message, error
 		}
 
 		resp.Value = byts
-		return resp, nil
 	}
 
 	// if we know any providers for the requested value, return those.
@@ -76,20 +78,16 @@ func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *Message) (*Message, error
 	if len(provs) > 0 {
 		u.DOut("handleGetValue returning %d provider[s]\n", len(provs))
 		resp.ProviderPeers = peersToPBPeers(provs)
-		return resp, nil
 	}
 
 	// Find closest peer on given cluster to desired key and reply with that info
 	closer := dht.betterPeerToQuery(pmes)
-	if closer == nil {
-		u.DOut("handleGetValue could not find a closer node than myself.\n")
-		resp.CloserPeers = nil
+	if closer != nil {
+		u.DOut("handleGetValue returning a closer peer: '%s'\n", closer.ID.Pretty())
+		resp.CloserPeers = peersToPBPeers([]*peer.Peer{closer})
 		return resp, nil
 	}
 
-	// we got a closer peer, it seems. return it.
-	u.DOut("handleGetValue returning a closer peer: '%s'\n", closer.ID.Pretty())
-	resp.CloserPeers = peersToPBPeers([]*peer.Peer{closer})
 	return resp, nil
 }
 
@@ -1,19 +1,45 @@
 package dht
 
 import (
+	"sync"
+
 	peer "github.com/jbenet/go-ipfs/peer"
 	queue "github.com/jbenet/go-ipfs/peer/queue"
+	kb "github.com/jbenet/go-ipfs/routing/kbucket"
 	u "github.com/jbenet/go-ipfs/util"
+	todoctr "github.com/jbenet/go-ipfs/util/todocounter"
 
 	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
 )
 
+const maxQueryConcurrency = 5
+
 type dhtQuery struct {
-	// a PeerQueue
-	peers queue.PeerQueue
+	// the key we're querying for
+	key u.Key
 
 	// the function to execute per peer
 	qfunc queryFunc
+
+	// the concurrency parameter
+	concurrency int
+}
+
+type dhtQueryResult struct {
+	value         []byte       // GetValue
+	peer          *peer.Peer   // FindPeer
+	providerPeers []*peer.Peer // GetProviders
+	closerPeers   []*peer.Peer // *
+	success       bool
+}
+
+// constructs query
+func newQuery(k u.Key, f queryFunc) *dhtQuery {
+	return &dhtQuery{
+		key:         k,
+		qfunc:       f,
+		concurrency: maxQueryConcurrency,
+	}
 }
 
 // QueryFunc is a function that runs a particular query with a given peer.
@@ -21,65 +47,170 @@ type dhtQuery struct {
 // - the value
 // - a list of peers potentially better able to serve the query
 // - an error
-type queryFunc func(context.Context, *peer.Peer) (interface{}, []*peer.Peer, error)
+type queryFunc func(context.Context, *peer.Peer) (*dhtQueryResult, error)
 
-func (q *dhtQuery) Run(ctx context.Context, concurrency int) (interface{}, error) {
-	// get own cancel function to signal when we've found the value
+// Run runs the query at hand. pass in a list of peers to use first.
+func (q *dhtQuery) Run(ctx context.Context, peers []*peer.Peer) (*dhtQueryResult, error) {
+	runner := newQueryRunner(ctx, q)
+	return runner.Run(peers)
+}
+
+type dhtQueryRunner struct {
+
+	// the query to run
+	query *dhtQuery
+
+	// peersToQuery is a list of peers remaining to query
+	peersToQuery *queue.ChanQueue
+
+	// peersSeen are all the peers queried. used to prevent querying same peer 2x
+	peersSeen peer.Map
+
+	// rateLimit is a channel used to rate limit our processing (semaphore)
+	rateLimit chan struct{}
+
+	// peersRemaining is a counter of peers remaining (toQuery + processing)
+	peersRemaining todoctr.Counter
+
+	// context
+	ctx    context.Context
+	cancel context.CancelFunc
+
+	// result
+	result *dhtQueryResult
+
+	// result errors
+	errs []error
+
+	// lock for concurrent access to fields
+	sync.RWMutex
+}
+
+func newQueryRunner(ctx context.Context, q *dhtQuery) *dhtQueryRunner {
 	ctx, cancel := context.WithCancel(ctx)
 
-	// the variable waiting to be populated upon success
-	var result interface{}
-
-	// chanQueue is how workers receive their work
-	chanQueue := queue.NewChanQueue(ctx, q.peers)
-
-	// worker
-	worker := func() {
-		for {
-			select {
-			case p := <-chanQueue.DeqChan:
-
-				val, closer, err := q.qfunc(ctx, p)
-				if err != nil {
-					u.PErr("error running query: %v\n", err)
-					continue
-				}
-
-				if val != nil {
-					result = val
-					cancel() // signal we're done.
-					return
-				}
-
-				if closer != nil {
-					for _, p := range closer {
-						select {
-						case chanQueue.EnqChan <- p:
-						case <-ctx.Done():
-							return
-						}
+	return &dhtQueryRunner{
+		ctx:            ctx,
+		cancel:         cancel,
+		query:          q,
+		peersToQuery:   queue.NewChanQueue(ctx, queue.NewXORDistancePQ(q.key)),
+		peersRemaining: todoctr.NewSyncCounter(),
+		peersSeen:      peer.Map{},
+		rateLimit:      make(chan struct{}, q.concurrency),
 	}
 }
 
-			case <-ctx.Done():
-				return
-			}
-		}
+func (r *dhtQueryRunner) Run(peers []*peer.Peer) (*dhtQueryResult, error) {
+	// setup concurrency rate limiting
+	for i := 0; i < r.query.concurrency; i++ {
+		r.rateLimit <- struct{}{}
 	}
 
-	// launch all workers
-	for i := 0; i < concurrency; i++ {
-		go worker()
+	// add all the peers we got first.
+	for _, p := range peers {
+		r.addPeerToQuery(p, nil) // don't have access to self here...
 	}
 
 	// wait until we're done. yep.
 	select {
-	case <-ctx.Done():
+	case <-r.peersRemaining.Done():
+		r.cancel() // ran all and nothing. cancel all outstanding workers.
+
+		r.RLock()
+		defer r.RUnlock()
+
+		if len(r.errs) > 0 {
+			return nil, r.errs[0]
+		}
+		return nil, u.ErrNotFound
+
+	case <-r.ctx.Done():
+		r.RLock()
+		defer r.RUnlock()
+
+		if r.result != nil && r.result.success {
+			return r.result, nil
+		}
+		return nil, r.ctx.Err()
 	}
-
-	if result != nil {
-		return result, nil
 	}
 
-	return nil, ctx.Err()
+func (r *dhtQueryRunner) addPeerToQuery(next *peer.Peer, benchmark *peer.Peer) {
+	if next == nil {
+		// wtf why are peers nil?!?
+		u.PErr("Query getting nil peers!!!\n")
+		return
+	}
+
+	// if new peer further away than whom we got it from, bother (loops)
+	if benchmark != nil && kb.Closer(benchmark.ID, next.ID, r.query.key) {
+		return
+	}
+
+	// if already seen, no need.
+	r.Lock()
+	_, found := r.peersSeen[next.Key()]
+	if found {
+		r.Unlock()
+		return
+	}
+	r.peersSeen[next.Key()] = next
+	r.Unlock()
+
+	// do this after unlocking to prevent possible deadlocks.
+	r.peersRemaining.Increment(1)
+	select {
+	case r.peersToQuery.EnqChan <- next:
+	case <-r.ctx.Done():
+	}
+}
+
+func (r *dhtQueryRunner) spawnWorkers(p *peer.Peer) {
+	for {
+		select {
+		case <-r.peersRemaining.Done():
+			return
+
+		case <-r.ctx.Done():
+			return
+
+		case p := <-r.peersToQuery.DeqChan:
+			go r.queryPeer(p)
+		}
+	}
+}
+
+func (r *dhtQueryRunner) queryPeer(p *peer.Peer) {
+	// make sure we rate limit concurrency.
+	select {
+	case <-r.rateLimit:
+	case <-r.ctx.Done():
+		r.peersRemaining.Decrement(1)
+		return
+	}
+
+	// finally, run the query against this peer
+	res, err := r.query.qfunc(r.ctx, p)
+
+	if err != nil {
+		r.Lock()
+		r.errs = append(r.errs, err)
+		r.Unlock()
+
+	} else if res.success {
+		r.Lock()
+		r.result = res
+		r.Unlock()
+		r.cancel() // signal to everyone that we're done.
+
+	} else if res.closerPeers != nil {
+		for _, next := range res.closerPeers {
+			r.addPeerToQuery(next, p)
+		}
+	}
+
+	// signal we're done proccessing peer p
+	r.peersRemaining.Decrement(1)
+	r.rateLimit <- struct{}{}
 }
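The runner introduced in this hunk caps in-flight peer queries with a channel used as a semaphore and only returns once the counter of queued-plus-running peers drains or the context is cancelled. A minimal, self-contained sketch of that concurrency shape (plain channels and a WaitGroup instead of the go-ipfs queue and todocounter types; names here are illustrative, not from the commit):

package main

import (
	"context"
	"fmt"
	"sync"
)

// run mirrors the runner's shape: a buffered channel acts as a token
// semaphore that limits concurrency, and a WaitGroup stands in for the
// "peers remaining" counter.
func run(ctx context.Context, items []int, concurrency int, work func(int)) {
	rateLimit := make(chan struct{}, concurrency)
	for i := 0; i < concurrency; i++ {
		rateLimit <- struct{}{} // fill the semaphore with tokens
	}

	var wg sync.WaitGroup
	for _, it := range items {
		wg.Add(1)
		go func(it int) {
			defer wg.Done()
			select {
			case <-rateLimit: // acquire a token
			case <-ctx.Done():
				return
			}
			work(it)
			rateLimit <- struct{}{} // release the token
		}(it)
	}
	wg.Wait()
}

func main() {
	run(context.Background(), []int{1, 2, 3, 4, 5}, 2, func(i int) {
		fmt.Println("querying item", i)
	})
}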
@@ -3,14 +3,11 @@ package dht
 import (
 	"bytes"
 	"encoding/json"
-	"errors"
-	"fmt"
 	"time"
 
 	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
 
 	peer "github.com/jbenet/go-ipfs/peer"
-	queue "github.com/jbenet/go-ipfs/peer/queue"
 	kb "github.com/jbenet/go-ipfs/routing/kbucket"
 	u "github.com/jbenet/go-ipfs/util"
 )
@@ -24,28 +21,23 @@ import (
 func (dht *IpfsDHT) PutValue(key u.Key, value []byte) error {
 	ctx := context.TODO()
 
-	query := &dhtQuery{}
-	query.peers = queue.NewXORDistancePQ(key)
+	peers := []*peer.Peer{}
 
 	// get the peers we need to announce to
 	for _, route := range dht.routingTables {
-		peers := route.NearestPeers(kb.ConvertKey(key), KValue)
-		for _, p := range peers {
-			if p == nil {
-				// this shouldn't be happening.
-				panic("p should not be nil")
-			}
-
-			query.peers.Enqueue(p)
-		}
+		npeers := route.NearestPeers(kb.ConvertKey(key), KValue)
+		peers = append(peers, npeers...)
 	}
 
-	query.qfunc = func(ctx context.Context, p *peer.Peer) (interface{}, []*peer.Peer, error) {
-		dht.putValueToNetwork(ctx, p, string(key), value)
-		return nil, nil, nil
-	}
-
-	_, err := query.Run(ctx, query.peers.Len())
+	query := newQuery(key, func(ctx context.Context, p *peer.Peer) (*dhtQueryResult, error) {
+		err := dht.putValueToNetwork(ctx, p, string(key), value)
+		if err != nil {
+			return nil, err
+		}
+		return &dhtQueryResult{success: true}, nil
+	})
+
+	_, err := query.Run(ctx, peers)
 	return err
 }
 
@@ -63,7 +55,6 @@ func (dht *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
 	val, err := dht.getLocal(key)
 	if err == nil {
 		ll.Success = true
-		u.DOut("Found local, returning.\n")
 		return val, nil
 	}
 
@@ -74,30 +65,33 @@ func (dht *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
 		return nil, kb.ErrLookupFailure
 	}
 
-	query := &dhtQuery{}
-	query.peers = queue.NewXORDistancePQ(key)
-
-	// get the peers we need to announce to
-	for _, p := range closest {
-		query.peers.Enqueue(p)
-	}
-
-	// setup the Query Function
-	query.qfunc = func(ctx context.Context, p *peer.Peer) (interface{}, []*peer.Peer, error) {
-		return dht.getValueOrPeers(ctx, p, key, routeLevel)
-	}
-
-	// run it!
-	result, err := query.Run(ctx, query.peers.Len())
+	// setup the Query
+	query := newQuery(key, func(ctx context.Context, p *peer.Peer) (*dhtQueryResult, error) {
+		val, peers, err := dht.getValueOrPeers(ctx, p, key, routeLevel)
 		if err != nil {
 			return nil, err
 		}
 
-	byt, ok := result.([]byte)
-	if !ok {
-		return nil, fmt.Errorf("received non-byte slice value")
-	}
-	return byt, nil
+		res := &dhtQueryResult{value: val, closerPeers: peers}
+		if val != nil {
+			res.success = true
+		}
+
+		return res, nil
+	})
+
+	// run it!
+	result, err := query.Run(ctx, closest)
+	if err != nil {
+		return nil, err
+	}
+
+	if result.value == nil {
+		return nil, u.ErrNotFound
+	}
+
+	return result.value, nil
 }
 
 // Value provider layer of indirection.
@@ -278,25 +272,19 @@ func (dht *IpfsDHT) findPeerMultiple(id peer.ID, timeout time.Duration) (*peer.P
 		return p, nil
 	}
 
-	query := &dhtQuery{}
-	query.peers = queue.NewXORDistancePQ(u.Key(id))
-
 	// get the peers we need to announce to
 	routeLevel := 0
 	peers := dht.routingTables[routeLevel].NearestPeers(kb.ConvertPeerID(id), AlphaValue)
 	if len(peers) == 0 {
 		return nil, kb.ErrLookupFailure
 	}
-	for _, p := range peers {
-		query.peers.Enqueue(p)
-	}
 
 	// setup query function
-	query.qfunc = func(ctx context.Context, p *peer.Peer) (interface{}, []*peer.Peer, error) {
+	query := newQuery(u.Key(id), func(ctx context.Context, p *peer.Peer) (*dhtQueryResult, error) {
 		pmes, err := dht.findPeerSingle(ctx, p, id, routeLevel)
 		if err != nil {
 			u.DErr("getPeer error: %v\n", err)
-			return nil, nil, err
+			return nil, err
 		}
 
 		plist := pmes.GetCloserPeers()
@@ -313,25 +301,24 @@ func (dht *IpfsDHT) findPeerMultiple(id peer.ID, timeout time.Duration) (*peer.P
 			}
 
 			if nxtp.ID.Equal(id) {
-				return nxtp, nil, nil
+				return &dhtQueryResult{peer: nxtp, success: true}, nil
 			}
 
 			nxtprs[i] = nxtp
 		}
 
-		return nil, nxtprs, nil
-	}
+		return &dhtQueryResult{closerPeers: nxtprs}, nil
+	})
 
-	p5, err := query.Run(ctx, query.peers.Len())
+	result, err := query.Run(ctx, peers)
 	if err != nil {
 		return nil, err
 	}
 
-	p6, ok := p5.(*peer.Peer)
-	if !ok {
-		return nil, errors.New("received non peer object")
+	if result.peer == nil {
+		return nil, u.ErrNotFound
 	}
-	return p6, nil
+	return result.peer, nil
 }
 
 // Ping a peer, log the time it took
@@ -350,21 +337,14 @@ func (dht *IpfsDHT) getDiagnostic(timeout time.Duration) ([]*diagInfo, error) {
 	ctx, _ := context.WithTimeout(context.TODO(), timeout)
 
 	u.DOut("Begin Diagnostic")
-	query := &dhtQuery{}
-	query.peers = queue.NewXORDistancePQ(u.Key(dht.self.ID))
-
-	targets := dht.routingTables[0].NearestPeers(kb.ConvertPeerID(dht.self.ID), 10)
-	for _, p := range targets {
-		query.peers.Enqueue(p)
-	}
-
+	peers := dht.routingTables[0].NearestPeers(kb.ConvertPeerID(dht.self.ID), 10)
 	var out []*diagInfo
 
-	query.qfunc = func(ctx context.Context, p *peer.Peer) (interface{}, []*peer.Peer, error) {
+	query := newQuery(dht.self.Key(), func(ctx context.Context, p *peer.Peer) (*dhtQueryResult, error) {
 		pmes := newMessage(Message_DIAGNOSTIC, "", 0)
 		rpmes, err := dht.sendRequest(ctx, p, pmes)
 		if err != nil {
-			return nil, nil, err
+			return nil, err
 		}
 
 		dec := json.NewDecoder(bytes.NewBuffer(rpmes.GetValue()))
@@ -377,9 +357,9 @@ func (dht *IpfsDHT) getDiagnostic(timeout time.Duration) ([]*diagInfo, error) {
 
 			out = append(out, di)
 		}
-		return nil, nil, nil
-	}
+		return &dhtQueryResult{success: true}, nil
+	})
 
-	_, err := query.Run(ctx, query.peers.Len())
+	_, err := query.Run(ctx, peers)
 	return out, err
 }
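Taken together, call sites now describe a lookup as a key plus a per-peer closure and let the runner handle fan-out, rate limiting, and termination. A hedged sketch of the resulting calling pattern, reusing only identifiers introduced in this diff (it would sit inside a dht-package method returning ([]byte, error); key, ctx, and seedPeers are assumed to be in scope, and routeLevel 0 is an assumption for the example):

query := newQuery(key, func(ctx context.Context, p *peer.Peer) (*dhtQueryResult, error) {
	val, closer, err := dht.getValueOrPeers(ctx, p, key, 0)
	if err != nil {
		return nil, err
	}
	if val != nil {
		// success: the runner cancels outstanding workers and Run returns this result
		return &dhtQueryResult{value: val, success: true}, nil
	}
	// no value at this peer: hand back closer peers for the runner to enqueue
	return &dhtQueryResult{closerPeers: closer}, nil
})

result, err := query.Run(ctx, seedPeers)
if err != nil {
	return nil, err
}
return result.value, nil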