From a437240989c678ef133e3d25ca9680713d26c727 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Mon, 30 Mar 2015 07:53:14 -0700 Subject: [PATCH] reduce dht bandwidth consumption Signed-off-by: Jeromy --- routing/dht/handlers.go | 2 +- routing/dht/records.go | 9 +++++++-- routing/dht/routing.go | 8 ++++---- 3 files changed, 12 insertions(+), 7 deletions(-) diff --git a/routing/dht/handlers.go b/routing/dht/handlers.go index 62b22c5ca..550825245 100644 --- a/routing/dht/handlers.go +++ b/routing/dht/handlers.go @@ -226,5 +226,5 @@ func (dht *IpfsDHT) handleAddProvider(ctx context.Context, p peer.ID, pmes *pb.M dht.providers.AddProvider(ctx, key, p) } - return pmes, nil // send back same msg as confirmation. + return nil, nil } diff --git a/routing/dht/records.go b/routing/dht/records.go index e327ed171..9c90b9b7d 100644 --- a/routing/dht/records.go +++ b/routing/dht/records.go @@ -31,6 +31,10 @@ func (dht *IpfsDHT) getPublicKeyOnline(ctx context.Context, p peer.ID) (ci.PubKe ctxT, cancelFunc := ctxutil.WithDeadlineFraction(ctx, 0.3) defer cancelFunc() if pk, err := dht.getPublicKeyFromNode(ctx, p); err == nil { + err := dht.peerstore.AddPubKey(p, pk) + if err != nil { + return pk, err + } return pk, nil } @@ -38,7 +42,7 @@ func (dht *IpfsDHT) getPublicKeyOnline(ctx context.Context, p peer.ID) (ci.PubKe log.Debugf("pk for %s not in peerstore, and peer failed. trying dht.", p) pkkey := KeyForPublicKey(p) - // ok, try the node itself. if they're overwhelmed or slow we can move on. + // ok, now try the dht. Anyone who has previously fetched the key should have it val, err := dht.GetValue(ctxT, pkkey) if err != nil { log.Warning("Failed to find requested public key.") @@ -50,7 +54,8 @@ func (dht *IpfsDHT) getPublicKeyOnline(ctx context.Context, p peer.ID) (ci.PubKe log.Debugf("Failed to unmarshal public key: %s", err) return nil, err } - return pk, nil + + return pk, dht.peerstore.AddPubKey(p, pk) } func (dht *IpfsDHT) getPublicKeyFromNode(ctx context.Context, p peer.ID) (ci.PubKey, error) { diff --git a/routing/dht/routing.go b/routing/dht/routing.go index 28ee8f03a..5ff279104 100644 --- a/routing/dht/routing.go +++ b/routing/dht/routing.go @@ -91,7 +91,7 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key u.Key) ([]byte, error) { } // get closest peers in the routing table - rtp := dht.routingTable.ListPeers() + rtp := dht.routingTable.NearestPeers(kb.ConvertKey(key), AlphaValue) log.Debugf("peers in rt: %s", len(rtp), rtp) if len(rtp) == 0 { log.Warning("No peers from routing table!") @@ -256,7 +256,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, co return &dhtQueryResult{closerPeers: clpeers}, nil }) - peers := dht.routingTable.ListPeers() + peers := dht.routingTable.NearestPeers(kb.ConvertKey(key), AlphaValue) _, err := query.Run(ctx, peers) if err != nil { log.Debugf("Query error: %s", err) @@ -276,7 +276,7 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (peer.PeerInfo, er return pi, nil } - peers := dht.routingTable.ListPeers() + peers := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), AlphaValue) if len(peers) == 0 { return peer.PeerInfo{}, errors.Wrap(kb.ErrLookupFailure) } @@ -342,7 +342,7 @@ func (dht *IpfsDHT) FindPeersConnectedToPeer(ctx context.Context, id peer.ID) (< peerchan := make(chan peer.PeerInfo, asyncQueryBuffer) peersSeen := peer.Set{} - peers := dht.routingTable.ListPeers() + peers := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), AlphaValue) if len(peers) == 0 { return nil, errors.Wrap(kb.ErrLookupFailure) }