1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-07-01 19:24:14 +08:00

Merge pull request #272 from jbenet/context-dial-peer

add context to DialPeer interface
This commit is contained in:
Brian Tiger Chow
2014-11-05 10:08:40 -08:00
12 changed files with 118 additions and 37 deletions

View File

@ -21,15 +21,28 @@ import (
var log = u.Logger("bitswap") var log = u.Logger("bitswap")
// NetMessageSession initializes a BitSwap session that communicates over the // NetMessageSession initializes a BitSwap session that communicates over the
// provided NetMessage service // provided NetMessage service.
func NetMessageSession(parent context.Context, p peer.Peer, // Runs until context is cancelled
func NetMessageSession(ctx context.Context, p peer.Peer,
net inet.Network, srv inet.Service, directory bsnet.Routing, net inet.Network, srv inet.Service, directory bsnet.Routing,
d ds.ThreadSafeDatastore, nice bool) exchange.Interface { d ds.ThreadSafeDatastore, nice bool) exchange.Interface {
networkAdapter := bsnet.NetMessageAdapter(srv, net, nil) networkAdapter := bsnet.NetMessageAdapter(srv, net, nil)
notif := notifications.New()
go func() {
for {
select {
case <-ctx.Done():
notif.Shutdown()
}
}
}()
bs := &bitswap{ bs := &bitswap{
blockstore: blockstore.NewBlockstore(d), blockstore: blockstore.NewBlockstore(d),
notifications: notifications.New(), // TODO Shutdown() notifications: notif,
strategy: strategy.New(nice), strategy: strategy.New(nice),
routing: directory, routing: directory,
sender: networkAdapter, sender: networkAdapter,
@ -75,6 +88,8 @@ func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error)
}() }()
ctx, cancelFunc := context.WithCancel(parent) ctx, cancelFunc := context.WithCancel(parent)
defer cancelFunc()
bs.wantlist.Add(k) bs.wantlist.Add(k)
promise := bs.notifications.Subscribe(ctx, k) promise := bs.notifications.Subscribe(ctx, k)
@ -91,7 +106,7 @@ func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error)
go func(p peer.Peer) { go func(p peer.Peer) {
log.Debugf("bitswap dialing peer: %s", p) log.Debugf("bitswap dialing peer: %s", p)
err := bs.sender.DialPeer(p) err := bs.sender.DialPeer(ctx, p)
if err != nil { if err != nil {
log.Errorf("Error sender.DialPeer(%s)", p) log.Errorf("Error sender.DialPeer(%s)", p)
return return
@ -117,17 +132,15 @@ func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error)
select { select {
case block := <-promise: case block := <-promise:
cancelFunc()
bs.wantlist.Remove(k) bs.wantlist.Remove(k)
// TODO remove from wantlist
return &block, nil return &block, nil
case <-parent.Done(): case <-parent.Done():
return nil, parent.Err() return nil, parent.Err()
} }
} }
// HasBlock announces the existance of a block to bitswap, potentially sending // HasBlock announces the existance of a block to this bitswap service. The
// it to peers (Partners) whose WantLists include it. // service will potentially notify its peers.
func (bs *bitswap) HasBlock(ctx context.Context, blk blocks.Block) error { func (bs *bitswap) HasBlock(ctx context.Context, blk blocks.Block) error {
log.Debugf("Has Block %v", blk.Key()) log.Debugf("Has Block %v", blk.Key())
bs.wantlist.Remove(blk.Key()) bs.wantlist.Remove(blk.Key())
@ -162,13 +175,11 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsm
if err := bs.blockstore.Put(&block); err != nil { if err := bs.blockstore.Put(&block); err != nil {
continue // FIXME(brian): err ignored continue // FIXME(brian): err ignored
} }
go bs.notifications.Publish(block) bs.notifications.Publish(block)
go func(block blocks.Block) { err := bs.HasBlock(ctx, block)
err := bs.HasBlock(ctx, block) // FIXME err ignored if err != nil {
if err != nil { log.Warningf("HasBlock errored: %s", err)
log.Warningf("HasBlock errored: %s", err) }
}
}(block)
} }
message := bsmsg.New() message := bsmsg.New()
@ -202,11 +213,12 @@ func (bs *bitswap) ReceiveError(err error) {
// sent // sent
func (bs *bitswap) send(ctx context.Context, p peer.Peer, m bsmsg.BitSwapMessage) { func (bs *bitswap) send(ctx context.Context, p peer.Peer, m bsmsg.BitSwapMessage) {
bs.sender.SendMessage(ctx, p, m) bs.sender.SendMessage(ctx, p, m)
go bs.strategy.MessageSent(p, m) bs.strategy.MessageSent(p, m)
} }
func (bs *bitswap) sendToPeersThatWant(ctx context.Context, block blocks.Block) { func (bs *bitswap) sendToPeersThatWant(ctx context.Context, block blocks.Block) {
log.Debugf("Sending %v to peers that want it", block.Key()) log.Debugf("Sending %v to peers that want it", block.Key())
for _, p := range bs.strategy.Peers() { for _, p := range bs.strategy.Peers() {
if bs.strategy.BlockIsWantedByPeer(block.Key(), p) { if bs.strategy.BlockIsWantedByPeer(block.Key(), p) {
log.Debugf("%v wants %v", p, block.Key()) log.Debugf("%v wants %v", p, block.Key())
@ -216,7 +228,7 @@ func (bs *bitswap) sendToPeersThatWant(ctx context.Context, block blocks.Block)
for _, wanted := range bs.wantlist.Keys() { for _, wanted := range bs.wantlist.Keys() {
message.AddWanted(wanted) message.AddWanted(wanted)
} }
go bs.send(ctx, p, message) bs.send(ctx, p, message)
} }
} }
} }

View File

@ -12,7 +12,7 @@ import (
type Adapter interface { type Adapter interface {
// DialPeer ensures there is a connection to peer. // DialPeer ensures there is a connection to peer.
DialPeer(peer.Peer) error DialPeer(context.Context, peer.Peer) error
// SendMessage sends a BitSwap message to a peer. // SendMessage sends a BitSwap message to a peer.
SendMessage( SendMessage(

View File

@ -67,8 +67,8 @@ func (adapter *impl) HandleMessage(
return outgoing return outgoing
} }
func (adapter *impl) DialPeer(p peer.Peer) error { func (adapter *impl) DialPeer(ctx context.Context, p peer.Peer) error {
return adapter.net.DialPeer(p) return adapter.net.DialPeer(ctx, p)
} }
func (adapter *impl) SendMessage( func (adapter *impl) SendMessage(

View File

@ -163,7 +163,7 @@ func (nc *networkClient) SendRequest(
return nc.network.SendRequest(ctx, nc.local, to, message) return nc.network.SendRequest(ctx, nc.local, to, message)
} }
func (nc *networkClient) DialPeer(p peer.Peer) error { func (nc *networkClient) DialPeer(ctx context.Context, p peer.Peer) error {
// no need to do anything because dialing isn't a thing in this test net. // no need to do anything because dialing isn't a thing in this test net.
if !nc.network.HasPeer(p) { if !nc.network.HasPeer(p) {
return fmt.Errorf("Peer not in network: %s", p) return fmt.Errorf("Peer not in network: %s", p)

View File

@ -1,6 +1,7 @@
package net package net
import ( import (
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
msg "github.com/jbenet/go-ipfs/net/message" msg "github.com/jbenet/go-ipfs/net/message"
mux "github.com/jbenet/go-ipfs/net/mux" mux "github.com/jbenet/go-ipfs/net/mux"
srv "github.com/jbenet/go-ipfs/net/service" srv "github.com/jbenet/go-ipfs/net/service"
@ -19,7 +20,7 @@ type Network interface {
// TODO: for now, only listen on addrs in local peer when initializing. // TODO: for now, only listen on addrs in local peer when initializing.
// DialPeer attempts to establish a connection to a given peer // DialPeer attempts to establish a connection to a given peer
DialPeer(peer.Peer) error DialPeer(context.Context, peer.Peer) error
// ClosePeer connection to peer // ClosePeer connection to peer
ClosePeer(peer.Peer) error ClosePeer(peer.Peer) error
@ -64,5 +65,5 @@ type Service srv.Service
type Dialer interface { type Dialer interface {
// DialPeer attempts to establish a connection to a given peer // DialPeer attempts to establish a connection to a given peer
DialPeer(peer.Peer) error DialPeer(context.Context, peer.Peer) error
} }

View File

@ -5,6 +5,7 @@ import (
mux "github.com/jbenet/go-ipfs/net/mux" mux "github.com/jbenet/go-ipfs/net/mux"
swarm "github.com/jbenet/go-ipfs/net/swarm" swarm "github.com/jbenet/go-ipfs/net/swarm"
peer "github.com/jbenet/go-ipfs/peer" peer "github.com/jbenet/go-ipfs/peer"
util "github.com/jbenet/go-ipfs/util"
ctxc "github.com/jbenet/go-ipfs/util/ctxcloser" ctxc "github.com/jbenet/go-ipfs/util/ctxcloser"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
@ -56,9 +57,13 @@ func NewIpfsNetwork(ctx context.Context, listen []ma.Multiaddr, local peer.Peer,
// Listen handles incoming connections on given Multiaddr. // Listen handles incoming connections on given Multiaddr.
// func (n *IpfsNetwork) Listen(*ma.Muliaddr) error {} // func (n *IpfsNetwork) Listen(*ma.Muliaddr) error {}
// DialPeer attempts to establish a connection to a given peer // DialPeer attempts to establish a connection to a given peer.
func (n *IpfsNetwork) DialPeer(p peer.Peer) error { // Respects the context.
_, err := n.swarm.Dial(p) func (n *IpfsNetwork) DialPeer(ctx context.Context, p peer.Peer) error {
err := util.ContextDo(ctx, func() error {
_, err := n.swarm.Dial(p)
return err
})
return err return err
} }

View File

@ -100,7 +100,7 @@ func (dht *IpfsDHT) Connect(ctx context.Context, npeer peer.Peer) (peer.Peer, er
// //
// /ip4/10.20.30.40/tcp/1234/ipfs/Qxhxxchxzcncxnzcnxzcxzm // /ip4/10.20.30.40/tcp/1234/ipfs/Qxhxxchxzcncxnzcnxzcxzm
// //
err := dht.dialer.DialPeer(npeer) err := dht.dialer.DialPeer(ctx, npeer)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -311,7 +311,7 @@ func (dht *IpfsDHT) getFromPeerList(ctx context.Context, key u.Key,
peerlist []*pb.Message_Peer, level int) ([]byte, error) { peerlist []*pb.Message_Peer, level int) ([]byte, error) {
for _, pinfo := range peerlist { for _, pinfo := range peerlist {
p, err := dht.ensureConnectedToPeer(pinfo) p, err := dht.ensureConnectedToPeer(ctx, pinfo)
if err != nil { if err != nil {
log.Errorf("getFromPeers error: %s", err) log.Errorf("getFromPeers error: %s", err)
continue continue
@ -496,14 +496,14 @@ func (dht *IpfsDHT) peerFromInfo(pbp *pb.Message_Peer) (peer.Peer, error) {
return p, nil return p, nil
} }
func (dht *IpfsDHT) ensureConnectedToPeer(pbp *pb.Message_Peer) (peer.Peer, error) { func (dht *IpfsDHT) ensureConnectedToPeer(ctx context.Context, pbp *pb.Message_Peer) (peer.Peer, error) {
p, err := dht.peerFromInfo(pbp) p, err := dht.peerFromInfo(pbp)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// dial connection // dial connection
err = dht.dialer.DialPeer(p) err = dht.dialer.DialPeer(ctx, p)
return p, err return p, err
} }
@ -556,7 +556,7 @@ func (dht *IpfsDHT) Bootstrap(ctx context.Context) {
if err != nil { if err != nil {
log.Error("Bootstrap peer error: %s", err) log.Error("Bootstrap peer error: %s", err)
} }
err = dht.dialer.DialPeer(p) err = dht.dialer.DialPeer(ctx, p)
if err != nil { if err != nil {
log.Errorf("Bootstrap peer error: %s", err) log.Errorf("Bootstrap peer error: %s", err)
} }

View File

@ -4,7 +4,6 @@ import (
"testing" "testing"
crand "crypto/rand" crand "crypto/rand"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto" "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
@ -82,7 +81,7 @@ type fauxNet struct {
} }
// DialPeer attempts to establish a connection to a given peer // DialPeer attempts to establish a connection to a given peer
func (f *fauxNet) DialPeer(peer.Peer) error { func (f *fauxNet) DialPeer(context.Context, peer.Peer) error {
return nil return nil
} }

View File

@ -230,7 +230,7 @@ func (r *dhtQueryRunner) queryPeer(p peer.Peer) {
// make sure we're connected to the peer. // make sure we're connected to the peer.
// (Incidentally, this will add it to the peerstore too) // (Incidentally, this will add it to the peerstore too)
err := r.query.dialer.DialPeer(p) err := r.query.dialer.DialPeer(r.ctx, p)
if err != nil { if err != nil {
log.Debugf("ERROR worker for: %v -- err connecting: %v", p, err) log.Debugf("ERROR worker for: %v -- err connecting: %v", p, err)
r.Lock() r.Lock()

View File

@ -145,7 +145,7 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int
log.Error(err) log.Error(err)
return return
} }
dht.addPeerListAsync(key, pmes.GetProviderPeers(), ps, count, peerOut) dht.addPeerListAsync(ctx, key, pmes.GetProviderPeers(), ps, count, peerOut)
}(pp) }(pp)
} }
wg.Wait() wg.Wait()
@ -154,13 +154,13 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int
return peerOut return peerOut
} }
func (dht *IpfsDHT) addPeerListAsync(k u.Key, peers []*pb.Message_Peer, ps *peerSet, count int, out chan peer.Peer) { func (dht *IpfsDHT) addPeerListAsync(ctx context.Context, k u.Key, peers []*pb.Message_Peer, ps *peerSet, count int, out chan peer.Peer) {
done := make(chan struct{}) done := make(chan struct{})
for _, pbp := range peers { for _, pbp := range peers {
go func(mp *pb.Message_Peer) { go func(mp *pb.Message_Peer) {
defer func() { done <- struct{}{} }() defer func() { done <- struct{}{} }()
// construct new peer // construct new peer
p, err := dht.ensureConnectedToPeer(mp) p, err := dht.ensureConnectedToPeer(ctx, mp)
if err != nil { if err != nil {
log.Error("%s", err) log.Error("%s", err)
return return

22
util/do.go Normal file
View File

@ -0,0 +1,22 @@
package util
import "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
func ContextDo(ctx context.Context, f func() error) error {
ch := make(chan error)
go func() {
select {
case <-ctx.Done():
case ch <- f():
}
}()
select {
case <-ctx.Done():
return ctx.Err()
case val := <-ch:
return val
}
return nil
}

42
util/do_test.go Normal file
View File

@ -0,0 +1,42 @@
package util
import (
"errors"
"testing"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
)
func TestDoReturnsContextErr(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
ch := make(chan struct{})
err := ContextDo(ctx, func() error {
cancel()
ch <- struct{}{} // won't return
return nil
})
if err != ctx.Err() {
t.Fail()
}
}
func TestDoReturnsFuncError(t *testing.T) {
ctx := context.Background()
expected := errors.New("expected to be returned by ContextDo")
err := ContextDo(ctx, func() error {
return expected
})
if err != expected {
t.Fail()
}
}
func TestDoReturnsNil(t *testing.T) {
ctx := context.Background()
err := ContextDo(ctx, func() error {
return nil
})
if err != nil {
t.Fail()
}
}