From de374226be0229a72e337b486588e792233ec80c Mon Sep 17 00:00:00 2001 From: Brian Tiger Chow Date: Tue, 2 Dec 2014 00:40:50 -0800 Subject: [PATCH] fix(dht/routing) make GetProviders respect context This commit makes GetProviders (sync) respect the request context. It also amends all of GetProviders' callsites to pass a context in. This meant changing the signature of the dht's handlerfunc. I think I'll start referring to the request context as Vito Corleone. cc @whyrusleeping @jbenet License: MIT Signed-off-by: Brian Tiger Chow --- routing/dht/dht.go | 2 +- routing/dht/handlers.go | 25 ++++++++++++------------- routing/dht/providers.go | 10 +++++++--- routing/dht/providers_test.go | 2 +- routing/dht/routing.go | 2 +- 5 files changed, 22 insertions(+), 19 deletions(-) diff --git a/routing/dht/dht.go b/routing/dht/dht.go index 127cfacc5..0277f644b 100644 --- a/routing/dht/dht.go +++ b/routing/dht/dht.go @@ -161,7 +161,7 @@ func (dht *IpfsDHT) HandleMessage(ctx context.Context, mes msg.NetMessage) msg.N } // dispatch handler. - rpmes, err := handler(mPeer, pmes) + rpmes, err := handler(ctx, mPeer, pmes) if err != nil { log.Errorf("handle message error: %s", err) return nil diff --git a/routing/dht/handlers.go b/routing/dht/handlers.go index bd4b813ee..07f21f18a 100644 --- a/routing/dht/handlers.go +++ b/routing/dht/handlers.go @@ -5,20 +5,19 @@ import ( "fmt" "time" - "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto" - + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto" + ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" peer "github.com/jbenet/go-ipfs/peer" pb "github.com/jbenet/go-ipfs/routing/dht/pb" u "github.com/jbenet/go-ipfs/util" - - ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" ) // The number of closer peers to send on requests. var CloserPeerCount = 4 // dhthandler specifies the signature of functions that handle DHT messages. -type dhtHandler func(peer.Peer, *pb.Message) (*pb.Message, error) +type dhtHandler func(context.Context, peer.Peer, *pb.Message) (*pb.Message, error) func (dht *IpfsDHT) handlerForMsgType(t pb.Message_MessageType) dhtHandler { switch t { @@ -39,7 +38,7 @@ func (dht *IpfsDHT) handlerForMsgType(t pb.Message_MessageType) dhtHandler { } } -func (dht *IpfsDHT) handleGetValue(p peer.Peer, pmes *pb.Message) (*pb.Message, error) { +func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.Peer, pmes *pb.Message) (*pb.Message, error) { log.Debugf("%s handleGetValue for key: %s\n", dht.self, pmes.GetKey()) // setup response @@ -85,7 +84,7 @@ func (dht *IpfsDHT) handleGetValue(p peer.Peer, pmes *pb.Message) (*pb.Message, } // if we know any providers for the requested value, return those. - provs := dht.providers.GetProviders(u.Key(pmes.GetKey())) + provs := dht.providers.GetProviders(ctx, u.Key(pmes.GetKey())) if len(provs) > 0 { log.Debugf("handleGetValue returning %d provider[s]", len(provs)) resp.ProviderPeers = pb.PeersToPBPeers(provs) @@ -107,7 +106,7 @@ func (dht *IpfsDHT) handleGetValue(p peer.Peer, pmes *pb.Message) (*pb.Message, } // Store a value in this peer local storage -func (dht *IpfsDHT) handlePutValue(p peer.Peer, pmes *pb.Message) (*pb.Message, error) { +func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.Peer, pmes *pb.Message) (*pb.Message, error) { dht.dslock.Lock() defer dht.dslock.Unlock() dskey := u.Key(pmes.GetKey()).DsKey() @@ -129,12 +128,12 @@ func (dht *IpfsDHT) handlePutValue(p peer.Peer, pmes *pb.Message) (*pb.Message, return pmes, err } -func (dht *IpfsDHT) handlePing(p peer.Peer, pmes *pb.Message) (*pb.Message, error) { +func (dht *IpfsDHT) handlePing(_ context.Context, p peer.Peer, pmes *pb.Message) (*pb.Message, error) { log.Debugf("%s Responding to ping from %s!\n", dht.self, p) return pmes, nil } -func (dht *IpfsDHT) handleFindPeer(p peer.Peer, pmes *pb.Message) (*pb.Message, error) { +func (dht *IpfsDHT) handleFindPeer(ctx context.Context, p peer.Peer, pmes *pb.Message) (*pb.Message, error) { resp := pb.NewMessage(pmes.GetType(), "", pmes.GetClusterLevel()) var closest []peer.Peer @@ -164,7 +163,7 @@ func (dht *IpfsDHT) handleFindPeer(p peer.Peer, pmes *pb.Message) (*pb.Message, return resp, nil } -func (dht *IpfsDHT) handleGetProviders(p peer.Peer, pmes *pb.Message) (*pb.Message, error) { +func (dht *IpfsDHT) handleGetProviders(ctx context.Context, p peer.Peer, pmes *pb.Message) (*pb.Message, error) { resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel()) // check if we have this value, to add ourselves as provider. @@ -177,7 +176,7 @@ func (dht *IpfsDHT) handleGetProviders(p peer.Peer, pmes *pb.Message) (*pb.Messa } // setup providers - providers := dht.providers.GetProviders(u.Key(pmes.GetKey())) + providers := dht.providers.GetProviders(ctx, u.Key(pmes.GetKey())) if has { providers = append(providers, dht.self) } @@ -201,7 +200,7 @@ type providerInfo struct { Value peer.Peer } -func (dht *IpfsDHT) handleAddProvider(p peer.Peer, pmes *pb.Message) (*pb.Message, error) { +func (dht *IpfsDHT) handleAddProvider(ctx context.Context, p peer.Peer, pmes *pb.Message) (*pb.Message, error) { key := u.Key(pmes.GetKey()) log.Debugf("%s adding %s as a provider for '%s'\n", dht.self, p, peer.ID(key)) diff --git a/routing/dht/providers.go b/routing/dht/providers.go index f7d491d6a..2adc20860 100644 --- a/routing/dht/providers.go +++ b/routing/dht/providers.go @@ -101,12 +101,16 @@ func (pm *ProviderManager) AddProvider(k u.Key, val peer.Peer) { } } -func (pm *ProviderManager) GetProviders(k u.Key) []peer.Peer { +func (pm *ProviderManager) GetProviders(ctx context.Context, k u.Key) []peer.Peer { gp := new(getProv) gp.k = k gp.resp = make(chan []peer.Peer) - pm.getprovs <- gp - return <-gp.resp + select { + case pm.getprovs <- gp: + return <-gp.resp + case <-ctx.Done(): + return nil + } } func (pm *ProviderManager) GetLocal() []u.Key { diff --git a/routing/dht/providers_test.go b/routing/dht/providers_test.go index c4ae53910..1ae85fbc4 100644 --- a/routing/dht/providers_test.go +++ b/routing/dht/providers_test.go @@ -15,7 +15,7 @@ func TestProviderManager(t *testing.T) { p := NewProviderManager(ctx, mid) a := u.Key("test") p.AddProvider(a, peer.WithIDString("testingprovider")) - resp := p.GetProviders(a) + resp := p.GetProviders(ctx, a) if len(resp) != 1 { t.Fatal("Could not retrieve provider.") } diff --git a/routing/dht/routing.go b/routing/dht/routing.go index b154b270e..5db218ff6 100644 --- a/routing/dht/routing.go +++ b/routing/dht/routing.go @@ -133,7 +133,7 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int ps := newPeerSet() // TODO may want to make this function async to hide latency - provs := dht.providers.GetProviders(key) + provs := dht.providers.GetProviders(ctx, key) for _, p := range provs { count-- // NOTE: assuming that this list of peers is unique