diff --git a/exchange/bitswap/network/net_message_adapter.go b/exchange/bitswap/network/net_message_adapter.go index 1bdf13ae9..f3fe1b257 100644 --- a/exchange/bitswap/network/net_message_adapter.go +++ b/exchange/bitswap/network/net_message_adapter.go @@ -68,7 +68,7 @@ func (adapter *impl) HandleMessage( } func (adapter *impl) DialPeer(ctx context.Context, p peer.Peer) error { - return adapter.net.DialPeer(p) + return adapter.net.DialPeer(ctx, p) } func (adapter *impl) SendMessage( diff --git a/exchange/bitswap/testnet/network.go b/exchange/bitswap/testnet/network.go index 418f75ce0..a7864c2a1 100644 --- a/exchange/bitswap/testnet/network.go +++ b/exchange/bitswap/testnet/network.go @@ -163,7 +163,7 @@ func (nc *networkClient) SendRequest( return nc.network.SendRequest(ctx, nc.local, to, message) } -func (nc *networkClient) DialPeer(p peer.Peer) error { +func (nc *networkClient) DialPeer(ctx context.Context, p peer.Peer) error { // no need to do anything because dialing isn't a thing in this test net. if !nc.network.HasPeer(p) { return fmt.Errorf("Peer not in network: %s", p) diff --git a/net/interface.go b/net/interface.go index 84afd7258..832588490 100644 --- a/net/interface.go +++ b/net/interface.go @@ -1,6 +1,7 @@ package net import ( + "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" msg "github.com/jbenet/go-ipfs/net/message" mux "github.com/jbenet/go-ipfs/net/mux" srv "github.com/jbenet/go-ipfs/net/service" @@ -19,7 +20,7 @@ type Network interface { // TODO: for now, only listen on addrs in local peer when initializing. // DialPeer attempts to establish a connection to a given peer - DialPeer(peer.Peer) error + DialPeer(context.Context, peer.Peer) error // ClosePeer connection to peer ClosePeer(peer.Peer) error @@ -64,5 +65,5 @@ type Service srv.Service type Dialer interface { // DialPeer attempts to establish a connection to a given peer - DialPeer(peer.Peer) error + DialPeer(context.Context, peer.Peer) error } diff --git a/net/net.go b/net/net.go index 0bff06f6a..ed648c976 100644 --- a/net/net.go +++ b/net/net.go @@ -57,7 +57,7 @@ func NewIpfsNetwork(ctx context.Context, listen []ma.Multiaddr, local peer.Peer, // func (n *IpfsNetwork) Listen(*ma.Muliaddr) error {} // DialPeer attempts to establish a connection to a given peer -func (n *IpfsNetwork) DialPeer(p peer.Peer) error { +func (n *IpfsNetwork) DialPeer(ctx context.Context, p peer.Peer) error { _, err := n.swarm.Dial(p) return err } diff --git a/routing/dht/dht.go b/routing/dht/dht.go index feff52706..11764faba 100644 --- a/routing/dht/dht.go +++ b/routing/dht/dht.go @@ -100,7 +100,7 @@ func (dht *IpfsDHT) Connect(ctx context.Context, npeer peer.Peer) (peer.Peer, er // // /ip4/10.20.30.40/tcp/1234/ipfs/Qxhxxchxzcncxnzcnxzcxzm // - err := dht.dialer.DialPeer(npeer) + err := dht.dialer.DialPeer(ctx, npeer) if err != nil { return nil, err } @@ -311,7 +311,7 @@ func (dht *IpfsDHT) getFromPeerList(ctx context.Context, key u.Key, peerlist []*pb.Message_Peer, level int) ([]byte, error) { for _, pinfo := range peerlist { - p, err := dht.ensureConnectedToPeer(pinfo) + p, err := dht.ensureConnectedToPeer(ctx, pinfo) if err != nil { log.Errorf("getFromPeers error: %s", err) continue @@ -496,14 +496,14 @@ func (dht *IpfsDHT) peerFromInfo(pbp *pb.Message_Peer) (peer.Peer, error) { return p, nil } -func (dht *IpfsDHT) ensureConnectedToPeer(pbp *pb.Message_Peer) (peer.Peer, error) { +func (dht *IpfsDHT) ensureConnectedToPeer(ctx context.Context, pbp *pb.Message_Peer) (peer.Peer, error) { p, err := dht.peerFromInfo(pbp) if err != nil { return nil, err } // dial connection - err = dht.dialer.DialPeer(p) + err = dht.dialer.DialPeer(ctx, p) return p, err } @@ -556,7 +556,7 @@ func (dht *IpfsDHT) Bootstrap(ctx context.Context) { if err != nil { log.Error("Bootstrap peer error: %s", err) } - err = dht.dialer.DialPeer(p) + err = dht.dialer.DialPeer(ctx, p) if err != nil { log.Errorf("Bootstrap peer error: %s", err) } diff --git a/routing/dht/ext_test.go b/routing/dht/ext_test.go index a6d1d933d..1dabb5b64 100644 --- a/routing/dht/ext_test.go +++ b/routing/dht/ext_test.go @@ -4,7 +4,6 @@ import ( "testing" crand "crypto/rand" - context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto" @@ -82,7 +81,7 @@ type fauxNet struct { } // DialPeer attempts to establish a connection to a given peer -func (f *fauxNet) DialPeer(peer.Peer) error { +func (f *fauxNet) DialPeer(context.Context, peer.Peer) error { return nil } diff --git a/routing/dht/query.go b/routing/dht/query.go index 557af095c..f0478ff29 100644 --- a/routing/dht/query.go +++ b/routing/dht/query.go @@ -230,7 +230,7 @@ func (r *dhtQueryRunner) queryPeer(p peer.Peer) { // make sure we're connected to the peer. // (Incidentally, this will add it to the peerstore too) - err := r.query.dialer.DialPeer(p) + err := r.query.dialer.DialPeer(r.ctx, p) if err != nil { log.Debugf("ERROR worker for: %v -- err connecting: %v", p, err) r.Lock() diff --git a/routing/dht/routing.go b/routing/dht/routing.go index da400fee2..c0d86e1d1 100644 --- a/routing/dht/routing.go +++ b/routing/dht/routing.go @@ -145,7 +145,7 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int log.Error(err) return } - dht.addPeerListAsync(key, pmes.GetProviderPeers(), ps, count, peerOut) + dht.addPeerListAsync(ctx, key, pmes.GetProviderPeers(), ps, count, peerOut) }(pp) } wg.Wait() @@ -154,13 +154,13 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int return peerOut } -func (dht *IpfsDHT) addPeerListAsync(k u.Key, peers []*pb.Message_Peer, ps *peerSet, count int, out chan peer.Peer) { +func (dht *IpfsDHT) addPeerListAsync(ctx context.Context, k u.Key, peers []*pb.Message_Peer, ps *peerSet, count int, out chan peer.Peer) { done := make(chan struct{}) for _, pbp := range peers { go func(mp *pb.Message_Peer) { defer func() { done <- struct{}{} }() // construct new peer - p, err := dht.ensureConnectedToPeer(mp) + p, err := dht.ensureConnectedToPeer(ctx, mp) if err != nil { log.Error("%s", err) return