From 52cefb16cd7184ddffe15cd58e85f607e19c37dc Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Sun, 21 Sep 2014 18:04:43 -0700 Subject: [PATCH] Routing uses context now @perfmode boom --- exchange/bitswap/bitswap.go | 2 +- exchange/bitswap/network/interface.go | 2 +- routing/dht/dht.go | 8 +-- routing/dht/dht_test.go | 74 ++++++++++++++++++++++++--- routing/dht/ext_test.go | 12 +++-- routing/dht/routing.go | 29 +++-------- routing/routing.go | 12 ++--- 7 files changed, 95 insertions(+), 44 deletions(-) diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index 0eaab521c..4f63e6c8c 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -103,7 +103,7 @@ func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error) // it to peers (Partners) whose WantLists include it. func (bs *bitswap) HasBlock(ctx context.Context, blk blocks.Block) error { bs.sendToPeersThatWant(ctx, blk) - return bs.routing.Provide(blk.Key()) + return bs.routing.Provide(ctx, blk.Key()) } // TODO(brian): handle errors diff --git a/exchange/bitswap/network/interface.go b/exchange/bitswap/network/interface.go index a84775c15..15fa9c89e 100644 --- a/exchange/bitswap/network/interface.go +++ b/exchange/bitswap/network/interface.go @@ -49,5 +49,5 @@ type Routing interface { FindProvidersAsync(context.Context, u.Key, int) <-chan *peer.Peer // Provide provides the key to the network - Provide(key u.Key) error + Provide(context.Context, u.Key) error } diff --git a/routing/dht/dht.go b/routing/dht/dht.go index 148168d01..507c19c3f 100644 --- a/routing/dht/dht.go +++ b/routing/dht/dht.go @@ -73,7 +73,7 @@ func NewDHT(p *peer.Peer, ps peer.Peerstore, net inet.Network, sender inet.Sende } // Connect to a new peer at the given address, ping and add to the routing table -func (dht *IpfsDHT) Connect(npeer *peer.Peer) (*peer.Peer, error) { +func (dht *IpfsDHT) Connect(ctx context.Context, npeer *peer.Peer) (*peer.Peer, error) { u.DOut("Connect to new peer: %s\n", npeer.ID.Pretty()) // TODO(jbenet,whyrusleeping) @@ -92,7 +92,7 @@ func (dht *IpfsDHT) Connect(npeer *peer.Peer) (*peer.Peer, error) { // Ping new peer to register in their routing table // NOTE: this should be done better... - err = dht.Ping(npeer, time.Second*2) + err = dht.Ping(ctx, npeer) if err != nil { return nil, fmt.Errorf("failed to ping newly connected peer: %s\n", err) } @@ -497,8 +497,8 @@ func (dht *IpfsDHT) loadProvidableKeys() error { } // Bootstrap builds up list of peers by requesting random peer IDs -func (dht *IpfsDHT) Bootstrap() { +func (dht *IpfsDHT) Bootstrap(ctx context.Context) { id := make([]byte, 16) rand.Read(id) - dht.FindPeer(peer.ID(id), time.Second*10) + dht.FindPeer(ctx, peer.ID(id)) } diff --git a/routing/dht/dht_test.go b/routing/dht/dht_test.go index e3f056ce2..675d80dde 100644 --- a/routing/dht/dht_test.go +++ b/routing/dht/dht_test.go @@ -22,7 +22,7 @@ import ( ) func setupDHT(t *testing.T, p *peer.Peer) *IpfsDHT { - ctx, _ := context.WithCancel(context.TODO()) + ctx := context.Background() peerstore := peer.NewPeerstore() @@ -150,9 +150,11 @@ func TestValueGetSet(t *testing.T) { t.Fatal(err) } - dhtA.PutValue("hello", []byte("world")) + ctxT, _ := context.WithTimeout(context.Background(), time.Second) + dhtA.PutValue(ctxT, "hello", []byte("world")) - val, err := dhtA.GetValue("hello", time.Second*2) + ctxT, _ = context.WithTimeout(context.Background(), time.Second*2) + val, err := dhtA.GetValue(ctxT, "hello") if err != nil { t.Fatal(err) } @@ -208,7 +210,8 @@ func TestProvides(t *testing.T) { time.Sleep(time.Millisecond * 60) - provs, err := dhts[0].FindProviders(u.Key("hello"), time.Second) + ctxT, _ := context.WithTimeout(context.Background(), time.Second) + provs, err := dhts[0].FindProviders(ctxT, u.Key("hello")) if err != nil { t.Fatal(err) } @@ -218,6 +221,63 @@ func TestProvides(t *testing.T) { } } +func TestProvidesAsync(t *testing.T) { + // t.Skip("skipping test to debug another") + + u.Debug = false + + _, peers, dhts := setupDHTS(4, t) + defer func() { + for i := 0; i < 4; i++ { + dhts[i].Halt() + defer dhts[i].network.Close() + } + }() + + _, err := dhts[0].Connect(peers[1]) + if err != nil { + t.Fatal(err) + } + + _, err = dhts[1].Connect(peers[2]) + if err != nil { + t.Fatal(err) + } + + _, err = dhts[1].Connect(peers[3]) + if err != nil { + t.Fatal(err) + } + + err = dhts[3].putLocal(u.Key("hello"), []byte("world")) + if err != nil { + t.Fatal(err) + } + + bits, err := dhts[3].getLocal(u.Key("hello")) + if err != nil && bytes.Equal(bits, []byte("world")) { + t.Fatal(err) + } + + err = dhts[3].Provide(u.Key("hello")) + if err != nil { + t.Fatal(err) + } + + time.Sleep(time.Millisecond * 60) + + ctx, _ := context.WithTimeout(context.TODO(), time.Millisecond*300) + provs := dhts[0].FindProvidersAsync(ctx, u.Key("hello"), 5) + select { + case p := <-provs: + if !p.ID.Equal(dhts[3].self.ID) { + t.Fatalf("got a provider, but not the right one. %v", p.ID.Pretty()) + } + case <-ctx.Done(): + t.Fatal("Didnt get back providers") + } +} + func TestLayeredGet(t *testing.T) { // t.Skip("skipping test to debug another") @@ -257,7 +317,8 @@ func TestLayeredGet(t *testing.T) { time.Sleep(time.Millisecond * 60) - val, err := dhts[0].GetValue(u.Key("hello"), time.Second) + ctxT, _ := context.WithTimeout(context.Background(), time.Second) + val, err := dhts[0].GetValue(ctxT, u.Key("hello")) if err != nil { t.Fatal(err) } @@ -296,7 +357,8 @@ func TestFindPeer(t *testing.T) { t.Fatal(err) } - p, err := dhts[0].FindPeer(peers[2].ID, time.Second) + ctxT, _ := context.WithTimeout(context.Background(), time.Second) + p, err := dhts[0].FindPeer(ctxT, peers[2].ID) if err != nil { t.Fatal(err) } diff --git a/routing/dht/ext_test.go b/routing/dht/ext_test.go index 26fbfea35..07999e651 100644 --- a/routing/dht/ext_test.go +++ b/routing/dht/ext_test.go @@ -108,7 +108,8 @@ func TestGetFailures(t *testing.T) { // This one should time out // u.POut("Timout Test\n") - _, err := d.GetValue(u.Key("test"), time.Millisecond*10) + ctx1, _ := context.WithTimeout(context.Background(), time.Second) + _, err := d.GetValue(ctx1, u.Key("test")) if err != nil { if err != context.DeadlineExceeded { t.Fatal("Got different error than we expected", err) @@ -134,7 +135,8 @@ func TestGetFailures(t *testing.T) { }) // This one should fail with NotFound - _, err = d.GetValue(u.Key("test"), time.Millisecond*1000) + ctx2, _ := context.WithTimeout(context.Background(), time.Second) + _, err = d.GetValue(ctx2, u.Key("test")) if err != nil { if err != u.ErrNotFound { t.Fatalf("Expected ErrNotFound, got: %s", err) @@ -236,7 +238,8 @@ func TestNotFound(t *testing.T) { }) - v, err := d.GetValue(u.Key("hello"), time.Second*5) + ctx, _ := context.WithTimeout(context.Background(), time.Second*5) + v, err := d.GetValue(ctx, u.Key("hello")) u.DOut("get value got %v\n", v) if err != nil { switch err { @@ -299,7 +302,8 @@ func TestLessThanKResponses(t *testing.T) { }) - _, err := d.GetValue(u.Key("hello"), time.Second*30) + ctx, _ := context.WithTimeout(context.Background(), time.Second*30) + _, err := d.GetValue(ctx, u.Key("hello")) if err != nil { switch err { case u.ErrNotFound: diff --git a/routing/dht/routing.go b/routing/dht/routing.go index acb4cab45..9f4a916e7 100644 --- a/routing/dht/routing.go +++ b/routing/dht/routing.go @@ -3,7 +3,6 @@ package dht import ( "bytes" "encoding/json" - "time" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" @@ -18,9 +17,7 @@ import ( // PutValue adds value corresponding to given Key. // This is the top level "Store" operation of the DHT -func (dht *IpfsDHT) PutValue(key u.Key, value []byte) error { - ctx := context.TODO() - +func (dht *IpfsDHT) PutValue(ctx context.Context, key u.Key, value []byte) error { peers := []*peer.Peer{} // get the peers we need to announce to @@ -46,12 +43,10 @@ func (dht *IpfsDHT) PutValue(key u.Key, value []byte) error { // GetValue searches for the value corresponding to given Key. // If the search does not succeed, a multiaddr string of a closer peer is // returned along with util.ErrSearchIncomplete -func (dht *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) { +func (dht *IpfsDHT) GetValue(ctx context.Context, key u.Key) ([]byte, error) { ll := startNewRPC("GET") defer ll.EndAndPrint() - ctx, _ := context.WithTimeout(context.TODO(), timeout) - // If we have it local, dont bother doing an RPC! // NOTE: this might not be what we want to do... val, err := dht.getLocal(key) @@ -101,8 +96,7 @@ func (dht *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) { // This is what DSHTs (Coral and MainlineDHT) do to store large values in a DHT. // Provide makes this node announce that it can provide a value for the given key -func (dht *IpfsDHT) Provide(key u.Key) error { - ctx := context.TODO() +func (dht *IpfsDHT) Provide(ctx context.Context, key u.Key) error { dht.providers.AddProvider(key, dht.self) peers := dht.routingTables[0].NearestPeers(kb.ConvertKey(key), PoolSize) @@ -174,12 +168,10 @@ func (dht *IpfsDHT) addPeerListAsync(k u.Key, peers []*Message_Peer, ps *peerSet } // FindProviders searches for peers who can provide the value for given key. -func (dht *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer, error) { +func (dht *IpfsDHT) FindProviders(ctx context.Context, key u.Key) ([]*peer.Peer, error) { ll := startNewRPC("FindProviders") ll.EndAndPrint() - ctx, _ := context.WithTimeout(context.TODO(), timeout) - // get closest peer u.DOut("Find providers for: '%s'\n", key) p := dht.routingTables[0].NearestPeer(kb.ConvertKey(key)) @@ -223,8 +215,7 @@ func (dht *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Pee // Find specific Peer // FindPeer searches for a peer with given ID. -func (dht *IpfsDHT) FindPeer(id peer.ID, timeout time.Duration) (*peer.Peer, error) { - ctx, _ := context.WithTimeout(context.TODO(), timeout) +func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (*peer.Peer, error) { // Check if were already connected to them p, _ := dht.Find(id) @@ -266,8 +257,7 @@ func (dht *IpfsDHT) FindPeer(id peer.ID, timeout time.Duration) (*peer.Peer, err return nil, u.ErrNotFound } -func (dht *IpfsDHT) findPeerMultiple(id peer.ID, timeout time.Duration) (*peer.Peer, error) { - ctx, _ := context.WithTimeout(context.TODO(), timeout) +func (dht *IpfsDHT) findPeerMultiple(ctx context.Context, id peer.ID) (*peer.Peer, error) { // Check if were already connected to them p, _ := dht.Find(id) @@ -325,9 +315,7 @@ func (dht *IpfsDHT) findPeerMultiple(id peer.ID, timeout time.Duration) (*peer.P } // Ping a peer, log the time it took -func (dht *IpfsDHT) Ping(p *peer.Peer, timeout time.Duration) error { - ctx, _ := context.WithTimeout(context.TODO(), timeout) - +func (dht *IpfsDHT) Ping(ctx context.Context, p *peer.Peer) error { // Thoughts: maybe this should accept an ID and do a peer lookup? u.DOut("Enter Ping.\n") @@ -336,8 +324,7 @@ func (dht *IpfsDHT) Ping(p *peer.Peer, timeout time.Duration) error { return err } -func (dht *IpfsDHT) getDiagnostic(timeout time.Duration) ([]*diagInfo, error) { - ctx, _ := context.WithTimeout(context.TODO(), timeout) +func (dht *IpfsDHT) getDiagnostic(ctx context.Context) ([]*diagInfo, error) { u.DOut("Begin Diagnostic") peers := dht.routingTables[0].NearestPeers(kb.ConvertPeerID(dht.self.ID), 10) diff --git a/routing/routing.go b/routing/routing.go index 872bad6f8..4669fb48c 100644 --- a/routing/routing.go +++ b/routing/routing.go @@ -1,8 +1,6 @@ package routing import ( - "time" - context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" peer "github.com/jbenet/go-ipfs/peer" @@ -17,22 +15,22 @@ type IpfsRouting interface { // Basic Put/Get // PutValue adds value corresponding to given Key. - PutValue(key u.Key, value []byte) error + PutValue(context.Context, u.Key, []byte) error // GetValue searches for the value corresponding to given Key. - GetValue(key u.Key, timeout time.Duration) ([]byte, error) + GetValue(context.Context, u.Key) ([]byte, error) // Value provider layer of indirection. // This is what DSHTs (Coral and MainlineDHT) do to store large values in a DHT. // Announce that this node can provide value for given key - Provide(key u.Key) error + Provide(context.Context, u.Key) error // FindProviders searches for peers who can provide the value for given key. - FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer, error) + FindProviders(context.Context, u.Key) ([]*peer.Peer, error) // Find specific Peer // FindPeer searches for a peer with given ID. - FindPeer(id peer.ID, timeout time.Duration) (*peer.Peer, error) + FindPeer(context.Context, peer.ID) (*peer.Peer, error) }