1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-09-10 22:49:13 +08:00

Dialer for dht

dht doesn't need the whole network interface, only needs a Dialer.
(much reduced surface of possible errors)
This commit is contained in:
Juan Batiz-Benet
2014-10-21 03:13:10 -07:00
parent cc9f276f0f
commit cc5c181ae0
4 changed files with 22 additions and 17 deletions

View File

@ -48,3 +48,12 @@ type Handler srv.Handler
// Service interface for network resources. // Service interface for network resources.
type Service srv.Service type Service srv.Service
// Dialer service that can dial to peers
// (this is usually just a Network, but other services may not need the whole
// thing, and thus it becomes easier to mock)
type Dialer interface {
// DialPeer attempts to establish a connection to a given peer
DialPeer(peer.Peer) error
}

View File

@ -33,8 +33,8 @@ type IpfsDHT struct {
// NOTE: (currently, only a single table is used) // NOTE: (currently, only a single table is used)
routingTables []*kb.RoutingTable routingTables []*kb.RoutingTable
// the network interface. service // the network services we need
network inet.Network dialer inet.Dialer
sender inet.Sender sender inet.Sender
// Local peer (yourself) // Local peer (yourself)
@ -59,9 +59,9 @@ type IpfsDHT struct {
} }
// NewDHT creates a new DHT object with the given peer as the 'local' host // NewDHT creates a new DHT object with the given peer as the 'local' host
func NewDHT(ctx context.Context, p peer.Peer, ps peer.Peerstore, net inet.Network, sender inet.Sender, dstore ds.Datastore) *IpfsDHT { func NewDHT(ctx context.Context, p peer.Peer, ps peer.Peerstore, dialer inet.Dialer, sender inet.Sender, dstore ds.Datastore) *IpfsDHT {
dht := new(IpfsDHT) dht := new(IpfsDHT)
dht.network = net dht.dialer = dialer
dht.sender = sender dht.sender = sender
dht.datastore = dstore dht.datastore = dstore
dht.self = p dht.self = p
@ -95,7 +95,7 @@ func (dht *IpfsDHT) Connect(ctx context.Context, npeer peer.Peer) (peer.Peer, er
// //
// /ip4/10.20.30.40/tcp/1234/ipfs/Qxhxxchxzcncxnzcnxzcxzm // /ip4/10.20.30.40/tcp/1234/ipfs/Qxhxxchxzcncxnzcnxzcxzm
// //
err := dht.network.DialPeer(npeer) err := dht.dialer.DialPeer(npeer)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -499,7 +499,7 @@ func (dht *IpfsDHT) ensureConnectedToPeer(pbp *Message_Peer) (peer.Peer, error)
} }
// dial connection // dial connection
err = dht.network.DialPeer(p) err = dht.dialer.DialPeer(p)
return p, err return p, err
} }

View File

@ -3,6 +3,7 @@ package dht
import ( import (
"sync" "sync"
inet "github.com/jbenet/go-ipfs/net"
peer "github.com/jbenet/go-ipfs/peer" peer "github.com/jbenet/go-ipfs/peer"
queue "github.com/jbenet/go-ipfs/peer/queue" queue "github.com/jbenet/go-ipfs/peer/queue"
kb "github.com/jbenet/go-ipfs/routing/kbucket" kb "github.com/jbenet/go-ipfs/routing/kbucket"
@ -14,17 +15,12 @@ import (
var maxQueryConcurrency = AlphaValue var maxQueryConcurrency = AlphaValue
type dhtDialer interface {
// DialPeer attempts to establish a connection to a given peer
DialPeer(peer.Peer) error
}
type dhtQuery struct { type dhtQuery struct {
// the key we're querying for // the key we're querying for
key u.Key key u.Key
// dialer used to ensure we're connected to peers // dialer used to ensure we're connected to peers
dialer dhtDialer dialer inet.Dialer
// the function to execute per peer // the function to execute per peer
qfunc queryFunc qfunc queryFunc
@ -42,7 +38,7 @@ type dhtQueryResult struct {
} }
// constructs query // constructs query
func newQuery(k u.Key, d dhtDialer, f queryFunc) *dhtQuery { func newQuery(k u.Key, d inet.Dialer, f queryFunc) *dhtQuery {
return &dhtQuery{ return &dhtQuery{
key: k, key: k,
dialer: d, dialer: d,

View File

@ -29,7 +29,7 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key u.Key, value []byte) error
peers = append(peers, npeers...) peers = append(peers, npeers...)
} }
query := newQuery(key, dht.network, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) { query := newQuery(key, dht.dialer, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) {
log.Debug("%s PutValue qry part %v", dht.self, p) log.Debug("%s PutValue qry part %v", dht.self, p)
err := dht.putValueToNetwork(ctx, p, string(key), value) err := dht.putValueToNetwork(ctx, p, string(key), value)
if err != nil { if err != nil {
@ -65,7 +65,7 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key u.Key) ([]byte, error) {
} }
// setup the Query // setup the Query
query := newQuery(key, dht.network, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) { query := newQuery(key, dht.dialer, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) {
val, peers, err := dht.getValueOrPeers(ctx, p, key, routeLevel) val, peers, err := dht.getValueOrPeers(ctx, p, key, routeLevel)
if err != nil { if err != nil {
@ -230,7 +230,7 @@ func (dht *IpfsDHT) findPeerMultiple(ctx context.Context, id peer.ID) (peer.Peer
} }
// setup query function // setup query function
query := newQuery(u.Key(id), dht.network, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) { query := newQuery(u.Key(id), dht.dialer, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) {
pmes, err := dht.findPeerSingle(ctx, p, id, routeLevel) pmes, err := dht.findPeerSingle(ctx, p, id, routeLevel)
if err != nil { if err != nil {
log.Error("%s getPeer error: %v", dht.self, err) log.Error("%s getPeer error: %v", dht.self, err)