mirror of
https://github.com/ipfs/kubo.git
synced 2025-07-02 03:28:25 +08:00
Routing uses context now
@perfmode boom
This commit is contained in:

committed by
Brian Tiger Chow

parent
8c35988b8d
commit
52cefb16cd
@ -103,7 +103,7 @@ func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error)
|
|||||||
// it to peers (Partners) whose WantLists include it.
|
// it to peers (Partners) whose WantLists include it.
|
||||||
func (bs *bitswap) HasBlock(ctx context.Context, blk blocks.Block) error {
|
func (bs *bitswap) HasBlock(ctx context.Context, blk blocks.Block) error {
|
||||||
bs.sendToPeersThatWant(ctx, blk)
|
bs.sendToPeersThatWant(ctx, blk)
|
||||||
return bs.routing.Provide(blk.Key())
|
return bs.routing.Provide(ctx, blk.Key())
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO(brian): handle errors
|
// TODO(brian): handle errors
|
||||||
|
@ -49,5 +49,5 @@ type Routing interface {
|
|||||||
FindProvidersAsync(context.Context, u.Key, int) <-chan *peer.Peer
|
FindProvidersAsync(context.Context, u.Key, int) <-chan *peer.Peer
|
||||||
|
|
||||||
// Provide provides the key to the network
|
// Provide provides the key to the network
|
||||||
Provide(key u.Key) error
|
Provide(context.Context, u.Key) error
|
||||||
}
|
}
|
||||||
|
@ -73,7 +73,7 @@ func NewDHT(p *peer.Peer, ps peer.Peerstore, net inet.Network, sender inet.Sende
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Connect to a new peer at the given address, ping and add to the routing table
|
// Connect to a new peer at the given address, ping and add to the routing table
|
||||||
func (dht *IpfsDHT) Connect(npeer *peer.Peer) (*peer.Peer, error) {
|
func (dht *IpfsDHT) Connect(ctx context.Context, npeer *peer.Peer) (*peer.Peer, error) {
|
||||||
u.DOut("Connect to new peer: %s\n", npeer.ID.Pretty())
|
u.DOut("Connect to new peer: %s\n", npeer.ID.Pretty())
|
||||||
|
|
||||||
// TODO(jbenet,whyrusleeping)
|
// TODO(jbenet,whyrusleeping)
|
||||||
@ -92,7 +92,7 @@ func (dht *IpfsDHT) Connect(npeer *peer.Peer) (*peer.Peer, error) {
|
|||||||
|
|
||||||
// Ping new peer to register in their routing table
|
// Ping new peer to register in their routing table
|
||||||
// NOTE: this should be done better...
|
// NOTE: this should be done better...
|
||||||
err = dht.Ping(npeer, time.Second*2)
|
err = dht.Ping(ctx, npeer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to ping newly connected peer: %s\n", err)
|
return nil, fmt.Errorf("failed to ping newly connected peer: %s\n", err)
|
||||||
}
|
}
|
||||||
@ -497,8 +497,8 @@ func (dht *IpfsDHT) loadProvidableKeys() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Bootstrap builds up list of peers by requesting random peer IDs
|
// Bootstrap builds up list of peers by requesting random peer IDs
|
||||||
func (dht *IpfsDHT) Bootstrap() {
|
func (dht *IpfsDHT) Bootstrap(ctx context.Context) {
|
||||||
id := make([]byte, 16)
|
id := make([]byte, 16)
|
||||||
rand.Read(id)
|
rand.Read(id)
|
||||||
dht.FindPeer(peer.ID(id), time.Second*10)
|
dht.FindPeer(ctx, peer.ID(id))
|
||||||
}
|
}
|
||||||
|
@ -22,7 +22,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func setupDHT(t *testing.T, p *peer.Peer) *IpfsDHT {
|
func setupDHT(t *testing.T, p *peer.Peer) *IpfsDHT {
|
||||||
ctx, _ := context.WithCancel(context.TODO())
|
ctx := context.Background()
|
||||||
|
|
||||||
peerstore := peer.NewPeerstore()
|
peerstore := peer.NewPeerstore()
|
||||||
|
|
||||||
@ -150,9 +150,11 @@ func TestValueGetSet(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
dhtA.PutValue("hello", []byte("world"))
|
ctxT, _ := context.WithTimeout(context.Background(), time.Second)
|
||||||
|
dhtA.PutValue(ctxT, "hello", []byte("world"))
|
||||||
|
|
||||||
val, err := dhtA.GetValue("hello", time.Second*2)
|
ctxT, _ = context.WithTimeout(context.Background(), time.Second*2)
|
||||||
|
val, err := dhtA.GetValue(ctxT, "hello")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -208,7 +210,8 @@ func TestProvides(t *testing.T) {
|
|||||||
|
|
||||||
time.Sleep(time.Millisecond * 60)
|
time.Sleep(time.Millisecond * 60)
|
||||||
|
|
||||||
provs, err := dhts[0].FindProviders(u.Key("hello"), time.Second)
|
ctxT, _ := context.WithTimeout(context.Background(), time.Second)
|
||||||
|
provs, err := dhts[0].FindProviders(ctxT, u.Key("hello"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -218,6 +221,63 @@ func TestProvides(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestProvidesAsync(t *testing.T) {
|
||||||
|
// t.Skip("skipping test to debug another")
|
||||||
|
|
||||||
|
u.Debug = false
|
||||||
|
|
||||||
|
_, peers, dhts := setupDHTS(4, t)
|
||||||
|
defer func() {
|
||||||
|
for i := 0; i < 4; i++ {
|
||||||
|
dhts[i].Halt()
|
||||||
|
defer dhts[i].network.Close()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
_, err := dhts[0].Connect(peers[1])
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = dhts[1].Connect(peers[2])
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = dhts[1].Connect(peers[3])
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = dhts[3].putLocal(u.Key("hello"), []byte("world"))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
bits, err := dhts[3].getLocal(u.Key("hello"))
|
||||||
|
if err != nil && bytes.Equal(bits, []byte("world")) {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = dhts[3].Provide(u.Key("hello"))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
time.Sleep(time.Millisecond * 60)
|
||||||
|
|
||||||
|
ctx, _ := context.WithTimeout(context.TODO(), time.Millisecond*300)
|
||||||
|
provs := dhts[0].FindProvidersAsync(ctx, u.Key("hello"), 5)
|
||||||
|
select {
|
||||||
|
case p := <-provs:
|
||||||
|
if !p.ID.Equal(dhts[3].self.ID) {
|
||||||
|
t.Fatalf("got a provider, but not the right one. %v", p.ID.Pretty())
|
||||||
|
}
|
||||||
|
case <-ctx.Done():
|
||||||
|
t.Fatal("Didnt get back providers")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestLayeredGet(t *testing.T) {
|
func TestLayeredGet(t *testing.T) {
|
||||||
// t.Skip("skipping test to debug another")
|
// t.Skip("skipping test to debug another")
|
||||||
|
|
||||||
@ -257,7 +317,8 @@ func TestLayeredGet(t *testing.T) {
|
|||||||
|
|
||||||
time.Sleep(time.Millisecond * 60)
|
time.Sleep(time.Millisecond * 60)
|
||||||
|
|
||||||
val, err := dhts[0].GetValue(u.Key("hello"), time.Second)
|
ctxT, _ := context.WithTimeout(context.Background(), time.Second)
|
||||||
|
val, err := dhts[0].GetValue(ctxT, u.Key("hello"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -296,7 +357,8 @@ func TestFindPeer(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
p, err := dhts[0].FindPeer(peers[2].ID, time.Second)
|
ctxT, _ := context.WithTimeout(context.Background(), time.Second)
|
||||||
|
p, err := dhts[0].FindPeer(ctxT, peers[2].ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -108,7 +108,8 @@ func TestGetFailures(t *testing.T) {
|
|||||||
|
|
||||||
// This one should time out
|
// This one should time out
|
||||||
// u.POut("Timout Test\n")
|
// u.POut("Timout Test\n")
|
||||||
_, err := d.GetValue(u.Key("test"), time.Millisecond*10)
|
ctx1, _ := context.WithTimeout(context.Background(), time.Second)
|
||||||
|
_, err := d.GetValue(ctx1, u.Key("test"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err != context.DeadlineExceeded {
|
if err != context.DeadlineExceeded {
|
||||||
t.Fatal("Got different error than we expected", err)
|
t.Fatal("Got different error than we expected", err)
|
||||||
@ -134,7 +135,8 @@ func TestGetFailures(t *testing.T) {
|
|||||||
})
|
})
|
||||||
|
|
||||||
// This one should fail with NotFound
|
// This one should fail with NotFound
|
||||||
_, err = d.GetValue(u.Key("test"), time.Millisecond*1000)
|
ctx2, _ := context.WithTimeout(context.Background(), time.Second)
|
||||||
|
_, err = d.GetValue(ctx2, u.Key("test"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err != u.ErrNotFound {
|
if err != u.ErrNotFound {
|
||||||
t.Fatalf("Expected ErrNotFound, got: %s", err)
|
t.Fatalf("Expected ErrNotFound, got: %s", err)
|
||||||
@ -236,7 +238,8 @@ func TestNotFound(t *testing.T) {
|
|||||||
|
|
||||||
})
|
})
|
||||||
|
|
||||||
v, err := d.GetValue(u.Key("hello"), time.Second*5)
|
ctx, _ := context.WithTimeout(context.Background(), time.Second*5)
|
||||||
|
v, err := d.GetValue(ctx, u.Key("hello"))
|
||||||
u.DOut("get value got %v\n", v)
|
u.DOut("get value got %v\n", v)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
switch err {
|
switch err {
|
||||||
@ -299,7 +302,8 @@ func TestLessThanKResponses(t *testing.T) {
|
|||||||
|
|
||||||
})
|
})
|
||||||
|
|
||||||
_, err := d.GetValue(u.Key("hello"), time.Second*30)
|
ctx, _ := context.WithTimeout(context.Background(), time.Second*30)
|
||||||
|
_, err := d.GetValue(ctx, u.Key("hello"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
switch err {
|
switch err {
|
||||||
case u.ErrNotFound:
|
case u.ErrNotFound:
|
||||||
|
@ -3,7 +3,6 @@ package dht
|
|||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"time"
|
|
||||||
|
|
||||||
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
||||||
|
|
||||||
@ -18,9 +17,7 @@ import (
|
|||||||
|
|
||||||
// PutValue adds value corresponding to given Key.
|
// PutValue adds value corresponding to given Key.
|
||||||
// This is the top level "Store" operation of the DHT
|
// This is the top level "Store" operation of the DHT
|
||||||
func (dht *IpfsDHT) PutValue(key u.Key, value []byte) error {
|
func (dht *IpfsDHT) PutValue(ctx context.Context, key u.Key, value []byte) error {
|
||||||
ctx := context.TODO()
|
|
||||||
|
|
||||||
peers := []*peer.Peer{}
|
peers := []*peer.Peer{}
|
||||||
|
|
||||||
// get the peers we need to announce to
|
// get the peers we need to announce to
|
||||||
@ -46,12 +43,10 @@ func (dht *IpfsDHT) PutValue(key u.Key, value []byte) error {
|
|||||||
// GetValue searches for the value corresponding to given Key.
|
// GetValue searches for the value corresponding to given Key.
|
||||||
// If the search does not succeed, a multiaddr string of a closer peer is
|
// If the search does not succeed, a multiaddr string of a closer peer is
|
||||||
// returned along with util.ErrSearchIncomplete
|
// returned along with util.ErrSearchIncomplete
|
||||||
func (dht *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
|
func (dht *IpfsDHT) GetValue(ctx context.Context, key u.Key) ([]byte, error) {
|
||||||
ll := startNewRPC("GET")
|
ll := startNewRPC("GET")
|
||||||
defer ll.EndAndPrint()
|
defer ll.EndAndPrint()
|
||||||
|
|
||||||
ctx, _ := context.WithTimeout(context.TODO(), timeout)
|
|
||||||
|
|
||||||
// If we have it local, dont bother doing an RPC!
|
// If we have it local, dont bother doing an RPC!
|
||||||
// NOTE: this might not be what we want to do...
|
// NOTE: this might not be what we want to do...
|
||||||
val, err := dht.getLocal(key)
|
val, err := dht.getLocal(key)
|
||||||
@ -101,8 +96,7 @@ func (dht *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
|
|||||||
// This is what DSHTs (Coral and MainlineDHT) do to store large values in a DHT.
|
// This is what DSHTs (Coral and MainlineDHT) do to store large values in a DHT.
|
||||||
|
|
||||||
// Provide makes this node announce that it can provide a value for the given key
|
// Provide makes this node announce that it can provide a value for the given key
|
||||||
func (dht *IpfsDHT) Provide(key u.Key) error {
|
func (dht *IpfsDHT) Provide(ctx context.Context, key u.Key) error {
|
||||||
ctx := context.TODO()
|
|
||||||
|
|
||||||
dht.providers.AddProvider(key, dht.self)
|
dht.providers.AddProvider(key, dht.self)
|
||||||
peers := dht.routingTables[0].NearestPeers(kb.ConvertKey(key), PoolSize)
|
peers := dht.routingTables[0].NearestPeers(kb.ConvertKey(key), PoolSize)
|
||||||
@ -174,12 +168,10 @@ func (dht *IpfsDHT) addPeerListAsync(k u.Key, peers []*Message_Peer, ps *peerSet
|
|||||||
}
|
}
|
||||||
|
|
||||||
// FindProviders searches for peers who can provide the value for given key.
|
// FindProviders searches for peers who can provide the value for given key.
|
||||||
func (dht *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer, error) {
|
func (dht *IpfsDHT) FindProviders(ctx context.Context, key u.Key) ([]*peer.Peer, error) {
|
||||||
ll := startNewRPC("FindProviders")
|
ll := startNewRPC("FindProviders")
|
||||||
ll.EndAndPrint()
|
ll.EndAndPrint()
|
||||||
|
|
||||||
ctx, _ := context.WithTimeout(context.TODO(), timeout)
|
|
||||||
|
|
||||||
// get closest peer
|
// get closest peer
|
||||||
u.DOut("Find providers for: '%s'\n", key)
|
u.DOut("Find providers for: '%s'\n", key)
|
||||||
p := dht.routingTables[0].NearestPeer(kb.ConvertKey(key))
|
p := dht.routingTables[0].NearestPeer(kb.ConvertKey(key))
|
||||||
@ -223,8 +215,7 @@ func (dht *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Pee
|
|||||||
// Find specific Peer
|
// Find specific Peer
|
||||||
|
|
||||||
// FindPeer searches for a peer with given ID.
|
// FindPeer searches for a peer with given ID.
|
||||||
func (dht *IpfsDHT) FindPeer(id peer.ID, timeout time.Duration) (*peer.Peer, error) {
|
func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (*peer.Peer, error) {
|
||||||
ctx, _ := context.WithTimeout(context.TODO(), timeout)
|
|
||||||
|
|
||||||
// Check if were already connected to them
|
// Check if were already connected to them
|
||||||
p, _ := dht.Find(id)
|
p, _ := dht.Find(id)
|
||||||
@ -266,8 +257,7 @@ func (dht *IpfsDHT) FindPeer(id peer.ID, timeout time.Duration) (*peer.Peer, err
|
|||||||
return nil, u.ErrNotFound
|
return nil, u.ErrNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dht *IpfsDHT) findPeerMultiple(id peer.ID, timeout time.Duration) (*peer.Peer, error) {
|
func (dht *IpfsDHT) findPeerMultiple(ctx context.Context, id peer.ID) (*peer.Peer, error) {
|
||||||
ctx, _ := context.WithTimeout(context.TODO(), timeout)
|
|
||||||
|
|
||||||
// Check if were already connected to them
|
// Check if were already connected to them
|
||||||
p, _ := dht.Find(id)
|
p, _ := dht.Find(id)
|
||||||
@ -325,9 +315,7 @@ func (dht *IpfsDHT) findPeerMultiple(id peer.ID, timeout time.Duration) (*peer.P
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Ping a peer, log the time it took
|
// Ping a peer, log the time it took
|
||||||
func (dht *IpfsDHT) Ping(p *peer.Peer, timeout time.Duration) error {
|
func (dht *IpfsDHT) Ping(ctx context.Context, p *peer.Peer) error {
|
||||||
ctx, _ := context.WithTimeout(context.TODO(), timeout)
|
|
||||||
|
|
||||||
// Thoughts: maybe this should accept an ID and do a peer lookup?
|
// Thoughts: maybe this should accept an ID and do a peer lookup?
|
||||||
u.DOut("Enter Ping.\n")
|
u.DOut("Enter Ping.\n")
|
||||||
|
|
||||||
@ -336,8 +324,7 @@ func (dht *IpfsDHT) Ping(p *peer.Peer, timeout time.Duration) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dht *IpfsDHT) getDiagnostic(timeout time.Duration) ([]*diagInfo, error) {
|
func (dht *IpfsDHT) getDiagnostic(ctx context.Context) ([]*diagInfo, error) {
|
||||||
ctx, _ := context.WithTimeout(context.TODO(), timeout)
|
|
||||||
|
|
||||||
u.DOut("Begin Diagnostic")
|
u.DOut("Begin Diagnostic")
|
||||||
peers := dht.routingTables[0].NearestPeers(kb.ConvertPeerID(dht.self.ID), 10)
|
peers := dht.routingTables[0].NearestPeers(kb.ConvertPeerID(dht.self.ID), 10)
|
||||||
|
@ -1,8 +1,6 @@
|
|||||||
package routing
|
package routing
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"time"
|
|
||||||
|
|
||||||
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
||||||
|
|
||||||
peer "github.com/jbenet/go-ipfs/peer"
|
peer "github.com/jbenet/go-ipfs/peer"
|
||||||
@ -17,22 +15,22 @@ type IpfsRouting interface {
|
|||||||
// Basic Put/Get
|
// Basic Put/Get
|
||||||
|
|
||||||
// PutValue adds value corresponding to given Key.
|
// PutValue adds value corresponding to given Key.
|
||||||
PutValue(key u.Key, value []byte) error
|
PutValue(context.Context, u.Key, []byte) error
|
||||||
|
|
||||||
// GetValue searches for the value corresponding to given Key.
|
// GetValue searches for the value corresponding to given Key.
|
||||||
GetValue(key u.Key, timeout time.Duration) ([]byte, error)
|
GetValue(context.Context, u.Key) ([]byte, error)
|
||||||
|
|
||||||
// Value provider layer of indirection.
|
// Value provider layer of indirection.
|
||||||
// This is what DSHTs (Coral and MainlineDHT) do to store large values in a DHT.
|
// This is what DSHTs (Coral and MainlineDHT) do to store large values in a DHT.
|
||||||
|
|
||||||
// Announce that this node can provide value for given key
|
// Announce that this node can provide value for given key
|
||||||
Provide(key u.Key) error
|
Provide(context.Context, u.Key) error
|
||||||
|
|
||||||
// FindProviders searches for peers who can provide the value for given key.
|
// FindProviders searches for peers who can provide the value for given key.
|
||||||
FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer, error)
|
FindProviders(context.Context, u.Key) ([]*peer.Peer, error)
|
||||||
|
|
||||||
// Find specific Peer
|
// Find specific Peer
|
||||||
|
|
||||||
// FindPeer searches for a peer with given ID.
|
// FindPeer searches for a peer with given ID.
|
||||||
FindPeer(id peer.ID, timeout time.Duration) (*peer.Peer, error)
|
FindPeer(context.Context, peer.ID) (*peer.Peer, error)
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user