From c60d01176412c3897ab7ea906dd074ff0e7ed233 Mon Sep 17 00:00:00 2001 From: Brian Tiger Chow Date: Wed, 5 Nov 2014 03:00:04 -0800 Subject: [PATCH 1/8] fix(exchange) add context to DialPeer --- exchange/bitswap/bitswap.go | 2 +- exchange/bitswap/network/interface.go | 2 +- exchange/bitswap/network/net_message_adapter.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index 88ff418c7..af84caa05 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -91,7 +91,7 @@ func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error) go func(p peer.Peer) { log.Debugf("bitswap dialing peer: %s", p) - err := bs.sender.DialPeer(p) + err := bs.sender.DialPeer(ctx, p) if err != nil { log.Errorf("Error sender.DialPeer(%s)", p) return diff --git a/exchange/bitswap/network/interface.go b/exchange/bitswap/network/interface.go index 467b0f400..1d3fc63a5 100644 --- a/exchange/bitswap/network/interface.go +++ b/exchange/bitswap/network/interface.go @@ -12,7 +12,7 @@ import ( type Adapter interface { // DialPeer ensures there is a connection to peer. - DialPeer(peer.Peer) error + DialPeer(context.Context, peer.Peer) error // SendMessage sends a BitSwap message to a peer. SendMessage( diff --git a/exchange/bitswap/network/net_message_adapter.go b/exchange/bitswap/network/net_message_adapter.go index c7e1a852d..1bdf13ae9 100644 --- a/exchange/bitswap/network/net_message_adapter.go +++ b/exchange/bitswap/network/net_message_adapter.go @@ -67,7 +67,7 @@ func (adapter *impl) HandleMessage( return outgoing } -func (adapter *impl) DialPeer(p peer.Peer) error { +func (adapter *impl) DialPeer(ctx context.Context, p peer.Peer) error { return adapter.net.DialPeer(p) } From 23096de3c4e77c09bfc9c88fde8e43a9c80b2f2f Mon Sep 17 00:00:00 2001 From: Brian Tiger Chow Date: Wed, 5 Nov 2014 04:26:30 -0800 Subject: [PATCH 2/8] fix(net) pass contexts to dial peer --- exchange/bitswap/network/net_message_adapter.go | 2 +- exchange/bitswap/testnet/network.go | 2 +- net/interface.go | 5 +++-- net/net.go | 2 +- routing/dht/dht.go | 10 +++++----- routing/dht/ext_test.go | 3 +-- routing/dht/query.go | 2 +- routing/dht/routing.go | 6 +++--- 8 files changed, 16 insertions(+), 16 deletions(-) diff --git a/exchange/bitswap/network/net_message_adapter.go b/exchange/bitswap/network/net_message_adapter.go index 1bdf13ae9..f3fe1b257 100644 --- a/exchange/bitswap/network/net_message_adapter.go +++ b/exchange/bitswap/network/net_message_adapter.go @@ -68,7 +68,7 @@ func (adapter *impl) HandleMessage( } func (adapter *impl) DialPeer(ctx context.Context, p peer.Peer) error { - return adapter.net.DialPeer(p) + return adapter.net.DialPeer(ctx, p) } func (adapter *impl) SendMessage( diff --git a/exchange/bitswap/testnet/network.go b/exchange/bitswap/testnet/network.go index 418f75ce0..a7864c2a1 100644 --- a/exchange/bitswap/testnet/network.go +++ b/exchange/bitswap/testnet/network.go @@ -163,7 +163,7 @@ func (nc *networkClient) SendRequest( return nc.network.SendRequest(ctx, nc.local, to, message) } -func (nc *networkClient) DialPeer(p peer.Peer) error { +func (nc *networkClient) DialPeer(ctx context.Context, p peer.Peer) error { // no need to do anything because dialing isn't a thing in this test net. if !nc.network.HasPeer(p) { return fmt.Errorf("Peer not in network: %s", p) diff --git a/net/interface.go b/net/interface.go index 84afd7258..832588490 100644 --- a/net/interface.go +++ b/net/interface.go @@ -1,6 +1,7 @@ package net import ( + "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" msg "github.com/jbenet/go-ipfs/net/message" mux "github.com/jbenet/go-ipfs/net/mux" srv "github.com/jbenet/go-ipfs/net/service" @@ -19,7 +20,7 @@ type Network interface { // TODO: for now, only listen on addrs in local peer when initializing. // DialPeer attempts to establish a connection to a given peer - DialPeer(peer.Peer) error + DialPeer(context.Context, peer.Peer) error // ClosePeer connection to peer ClosePeer(peer.Peer) error @@ -64,5 +65,5 @@ type Service srv.Service type Dialer interface { // DialPeer attempts to establish a connection to a given peer - DialPeer(peer.Peer) error + DialPeer(context.Context, peer.Peer) error } diff --git a/net/net.go b/net/net.go index 0bff06f6a..ed648c976 100644 --- a/net/net.go +++ b/net/net.go @@ -57,7 +57,7 @@ func NewIpfsNetwork(ctx context.Context, listen []ma.Multiaddr, local peer.Peer, // func (n *IpfsNetwork) Listen(*ma.Muliaddr) error {} // DialPeer attempts to establish a connection to a given peer -func (n *IpfsNetwork) DialPeer(p peer.Peer) error { +func (n *IpfsNetwork) DialPeer(ctx context.Context, p peer.Peer) error { _, err := n.swarm.Dial(p) return err } diff --git a/routing/dht/dht.go b/routing/dht/dht.go index feff52706..11764faba 100644 --- a/routing/dht/dht.go +++ b/routing/dht/dht.go @@ -100,7 +100,7 @@ func (dht *IpfsDHT) Connect(ctx context.Context, npeer peer.Peer) (peer.Peer, er // // /ip4/10.20.30.40/tcp/1234/ipfs/Qxhxxchxzcncxnzcnxzcxzm // - err := dht.dialer.DialPeer(npeer) + err := dht.dialer.DialPeer(ctx, npeer) if err != nil { return nil, err } @@ -311,7 +311,7 @@ func (dht *IpfsDHT) getFromPeerList(ctx context.Context, key u.Key, peerlist []*pb.Message_Peer, level int) ([]byte, error) { for _, pinfo := range peerlist { - p, err := dht.ensureConnectedToPeer(pinfo) + p, err := dht.ensureConnectedToPeer(ctx, pinfo) if err != nil { log.Errorf("getFromPeers error: %s", err) continue @@ -496,14 +496,14 @@ func (dht *IpfsDHT) peerFromInfo(pbp *pb.Message_Peer) (peer.Peer, error) { return p, nil } -func (dht *IpfsDHT) ensureConnectedToPeer(pbp *pb.Message_Peer) (peer.Peer, error) { +func (dht *IpfsDHT) ensureConnectedToPeer(ctx context.Context, pbp *pb.Message_Peer) (peer.Peer, error) { p, err := dht.peerFromInfo(pbp) if err != nil { return nil, err } // dial connection - err = dht.dialer.DialPeer(p) + err = dht.dialer.DialPeer(ctx, p) return p, err } @@ -556,7 +556,7 @@ func (dht *IpfsDHT) Bootstrap(ctx context.Context) { if err != nil { log.Error("Bootstrap peer error: %s", err) } - err = dht.dialer.DialPeer(p) + err = dht.dialer.DialPeer(ctx, p) if err != nil { log.Errorf("Bootstrap peer error: %s", err) } diff --git a/routing/dht/ext_test.go b/routing/dht/ext_test.go index a6d1d933d..1dabb5b64 100644 --- a/routing/dht/ext_test.go +++ b/routing/dht/ext_test.go @@ -4,7 +4,6 @@ import ( "testing" crand "crypto/rand" - context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto" @@ -82,7 +81,7 @@ type fauxNet struct { } // DialPeer attempts to establish a connection to a given peer -func (f *fauxNet) DialPeer(peer.Peer) error { +func (f *fauxNet) DialPeer(context.Context, peer.Peer) error { return nil } diff --git a/routing/dht/query.go b/routing/dht/query.go index 557af095c..f0478ff29 100644 --- a/routing/dht/query.go +++ b/routing/dht/query.go @@ -230,7 +230,7 @@ func (r *dhtQueryRunner) queryPeer(p peer.Peer) { // make sure we're connected to the peer. // (Incidentally, this will add it to the peerstore too) - err := r.query.dialer.DialPeer(p) + err := r.query.dialer.DialPeer(r.ctx, p) if err != nil { log.Debugf("ERROR worker for: %v -- err connecting: %v", p, err) r.Lock() diff --git a/routing/dht/routing.go b/routing/dht/routing.go index da400fee2..c0d86e1d1 100644 --- a/routing/dht/routing.go +++ b/routing/dht/routing.go @@ -145,7 +145,7 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int log.Error(err) return } - dht.addPeerListAsync(key, pmes.GetProviderPeers(), ps, count, peerOut) + dht.addPeerListAsync(ctx, key, pmes.GetProviderPeers(), ps, count, peerOut) }(pp) } wg.Wait() @@ -154,13 +154,13 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int return peerOut } -func (dht *IpfsDHT) addPeerListAsync(k u.Key, peers []*pb.Message_Peer, ps *peerSet, count int, out chan peer.Peer) { +func (dht *IpfsDHT) addPeerListAsync(ctx context.Context, k u.Key, peers []*pb.Message_Peer, ps *peerSet, count int, out chan peer.Peer) { done := make(chan struct{}) for _, pbp := range peers { go func(mp *pb.Message_Peer) { defer func() { done <- struct{}{} }() // construct new peer - p, err := dht.ensureConnectedToPeer(mp) + p, err := dht.ensureConnectedToPeer(ctx, mp) if err != nil { log.Error("%s", err) return From ed247ec1543168da2c64564943fec7ee7f32f687 Mon Sep 17 00:00:00 2001 From: Brian Tiger Chow Date: Wed, 5 Nov 2014 06:45:50 -0800 Subject: [PATCH 3/8] add do --- util/do.go | 15 +++++++++++++++ util/do_test.go | 42 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 57 insertions(+) create mode 100644 util/do.go create mode 100644 util/do_test.go diff --git a/util/do.go b/util/do.go new file mode 100644 index 000000000..248f0eda8 --- /dev/null +++ b/util/do.go @@ -0,0 +1,15 @@ +package util + +import "code.google.com/p/go.net/context" + +func Do(ctx context.Context, f func() error) error { + ch := make(chan error) + go func() { ch <- f() }() + select { + case <-ctx.Done(): + return ctx.Err() + case val := <-ch: + return val + } + return nil +} diff --git a/util/do_test.go b/util/do_test.go new file mode 100644 index 000000000..14861265f --- /dev/null +++ b/util/do_test.go @@ -0,0 +1,42 @@ +package util + +import ( + "errors" + "testing" + + "code.google.com/p/go.net/context" +) + +func TestDoReturnsContextErr(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + ch := make(chan struct{}) + err := Do(ctx, func() error { + cancel() + ch <- struct{}{} // won't return + return nil + }) + if err != ctx.Err() { + t.Fail() + } +} + +func TestDoReturnsFuncError(t *testing.T) { + ctx := context.Background() + expected := errors.New("expected to be returned by Do") + err := Do(ctx, func() error { + return expected + }) + if err != expected { + t.Fail() + } +} + +func TestDoReturnsNil(t *testing.T) { + ctx := context.Background() + err := Do(ctx, func() error { + return nil + }) + if err != nil { + t.Fail() + } +} From 68618f3fb8e7be9b2866575222944f09e36a64ee Mon Sep 17 00:00:00 2001 From: Brian Tiger Chow Date: Wed, 5 Nov 2014 06:48:08 -0800 Subject: [PATCH 4/8] fix(net) use Do to respect the caller's context --- net/net.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/net/net.go b/net/net.go index ed648c976..be62f84a9 100644 --- a/net/net.go +++ b/net/net.go @@ -5,6 +5,7 @@ import ( mux "github.com/jbenet/go-ipfs/net/mux" swarm "github.com/jbenet/go-ipfs/net/swarm" peer "github.com/jbenet/go-ipfs/peer" + util "github.com/jbenet/go-ipfs/util" ctxc "github.com/jbenet/go-ipfs/util/ctxcloser" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" @@ -56,9 +57,13 @@ func NewIpfsNetwork(ctx context.Context, listen []ma.Multiaddr, local peer.Peer, // Listen handles incoming connections on given Multiaddr. // func (n *IpfsNetwork) Listen(*ma.Muliaddr) error {} -// DialPeer attempts to establish a connection to a given peer +// DialPeer attempts to establish a connection to a given peer. +// Respects the context. func (n *IpfsNetwork) DialPeer(ctx context.Context, p peer.Peer) error { - _, err := n.swarm.Dial(p) + err := util.Do(ctx, func() error { + _, err := n.swarm.Dial(p) + return err + }) return err } From 97e11c7d01d3c05eec6708be5de8c5654a5434ff Mon Sep 17 00:00:00 2001 From: Brian Tiger Chow Date: Wed, 5 Nov 2014 06:53:35 -0800 Subject: [PATCH 5/8] fix(do) child listens on ctx too --- util/do.go | 11 +++++++++-- util/do_test.go | 2 +- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/util/do.go b/util/do.go index 248f0eda8..7b86768c3 100644 --- a/util/do.go +++ b/util/do.go @@ -1,10 +1,17 @@ package util -import "code.google.com/p/go.net/context" +import "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" func Do(ctx context.Context, f func() error) error { + ch := make(chan error) - go func() { ch <- f() }() + + go func() { + select { + case <-ctx.Done(): + case ch <- f(): + } + }() select { case <-ctx.Done(): return ctx.Err() diff --git a/util/do_test.go b/util/do_test.go index 14861265f..45ca0a469 100644 --- a/util/do_test.go +++ b/util/do_test.go @@ -4,7 +4,7 @@ import ( "errors" "testing" - "code.google.com/p/go.net/context" + "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" ) func TestDoReturnsContextErr(t *testing.T) { From a529378ce075c262a7daca6f56ab7f6f52aa6e6a Mon Sep 17 00:00:00 2001 From: Brian Tiger Chow Date: Wed, 5 Nov 2014 07:05:12 -0800 Subject: [PATCH 6/8] fix(bitswap) don't 'go' local function calls --- exchange/bitswap/bitswap.go | 41 +++++++++++++++++++++++-------------- 1 file changed, 26 insertions(+), 15 deletions(-) diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index af84caa05..843bed4a9 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -21,15 +21,28 @@ import ( var log = u.Logger("bitswap") // NetMessageSession initializes a BitSwap session that communicates over the -// provided NetMessage service -func NetMessageSession(parent context.Context, p peer.Peer, +// provided NetMessage service. +// Runs until context is cancelled +func NetMessageSession(ctx context.Context, p peer.Peer, net inet.Network, srv inet.Service, directory bsnet.Routing, d ds.ThreadSafeDatastore, nice bool) exchange.Interface { networkAdapter := bsnet.NetMessageAdapter(srv, net, nil) + + notif := notifications.New() + + go func() { + for { + select { + case <-ctx.Done(): + notif.Shutdown() + } + } + }() + bs := &bitswap{ blockstore: blockstore.NewBlockstore(d), - notifications: notifications.New(), // TODO Shutdown() + notifications: notif, strategy: strategy.New(nice), routing: directory, sender: networkAdapter, @@ -119,15 +132,14 @@ func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error) case block := <-promise: cancelFunc() bs.wantlist.Remove(k) - // TODO remove from wantlist return &block, nil case <-parent.Done(): return nil, parent.Err() } } -// HasBlock announces the existance of a block to bitswap, potentially sending -// it to peers (Partners) whose WantLists include it. +// HasBlock announces the existance of a block to this bitswap service. The +// service will potentially notify its peers. func (bs *bitswap) HasBlock(ctx context.Context, blk blocks.Block) error { log.Debugf("Has Block %v", blk.Key()) bs.wantlist.Remove(blk.Key()) @@ -162,13 +174,11 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsm if err := bs.blockstore.Put(&block); err != nil { continue // FIXME(brian): err ignored } - go bs.notifications.Publish(block) - go func(block blocks.Block) { - err := bs.HasBlock(ctx, block) // FIXME err ignored - if err != nil { - log.Warningf("HasBlock errored: %s", err) - } - }(block) + bs.notifications.Publish(block) + err := bs.HasBlock(ctx, block) + if err != nil { + log.Warningf("HasBlock errored: %s", err) + } } message := bsmsg.New() @@ -202,11 +212,12 @@ func (bs *bitswap) ReceiveError(err error) { // sent func (bs *bitswap) send(ctx context.Context, p peer.Peer, m bsmsg.BitSwapMessage) { bs.sender.SendMessage(ctx, p, m) - go bs.strategy.MessageSent(p, m) + bs.strategy.MessageSent(p, m) } func (bs *bitswap) sendToPeersThatWant(ctx context.Context, block blocks.Block) { log.Debugf("Sending %v to peers that want it", block.Key()) + for _, p := range bs.strategy.Peers() { if bs.strategy.BlockIsWantedByPeer(block.Key(), p) { log.Debugf("%v wants %v", p, block.Key()) @@ -216,7 +227,7 @@ func (bs *bitswap) sendToPeersThatWant(ctx context.Context, block blocks.Block) for _, wanted := range bs.wantlist.Keys() { message.AddWanted(wanted) } - go bs.send(ctx, p, message) + bs.send(ctx, p, message) } } } From 40c5cca1173dd1efa346cc8d92c31a0a766ed537 Mon Sep 17 00:00:00 2001 From: Brian Tiger Chow Date: Wed, 5 Nov 2014 07:07:49 -0800 Subject: [PATCH 7/8] fix(bitswap) always cancel on return --- exchange/bitswap/bitswap.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index 843bed4a9..3ccab5d97 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -88,6 +88,8 @@ func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error) }() ctx, cancelFunc := context.WithCancel(parent) + defer cancelFunc() + bs.wantlist.Add(k) promise := bs.notifications.Subscribe(ctx, k) @@ -130,7 +132,6 @@ func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error) select { case block := <-promise: - cancelFunc() bs.wantlist.Remove(k) return &block, nil case <-parent.Done(): From 390f4d7419efa694e41d9779d01aeab3f1188672 Mon Sep 17 00:00:00 2001 From: Brian Tiger Chow Date: Wed, 5 Nov 2014 10:07:16 -0800 Subject: [PATCH 8/8] readability(util) Do -> ContextDo @jbenet --- net/net.go | 2 +- util/do.go | 2 +- util/do_test.go | 8 ++++---- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/net/net.go b/net/net.go index be62f84a9..c8cdb20ee 100644 --- a/net/net.go +++ b/net/net.go @@ -60,7 +60,7 @@ func NewIpfsNetwork(ctx context.Context, listen []ma.Multiaddr, local peer.Peer, // DialPeer attempts to establish a connection to a given peer. // Respects the context. func (n *IpfsNetwork) DialPeer(ctx context.Context, p peer.Peer) error { - err := util.Do(ctx, func() error { + err := util.ContextDo(ctx, func() error { _, err := n.swarm.Dial(p) return err }) diff --git a/util/do.go b/util/do.go index 7b86768c3..95ec6e56a 100644 --- a/util/do.go +++ b/util/do.go @@ -2,7 +2,7 @@ package util import "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" -func Do(ctx context.Context, f func() error) error { +func ContextDo(ctx context.Context, f func() error) error { ch := make(chan error) diff --git a/util/do_test.go b/util/do_test.go index 45ca0a469..8abc91589 100644 --- a/util/do_test.go +++ b/util/do_test.go @@ -10,7 +10,7 @@ import ( func TestDoReturnsContextErr(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) ch := make(chan struct{}) - err := Do(ctx, func() error { + err := ContextDo(ctx, func() error { cancel() ch <- struct{}{} // won't return return nil @@ -22,8 +22,8 @@ func TestDoReturnsContextErr(t *testing.T) { func TestDoReturnsFuncError(t *testing.T) { ctx := context.Background() - expected := errors.New("expected to be returned by Do") - err := Do(ctx, func() error { + expected := errors.New("expected to be returned by ContextDo") + err := ContextDo(ctx, func() error { return expected }) if err != expected { @@ -33,7 +33,7 @@ func TestDoReturnsFuncError(t *testing.T) { func TestDoReturnsNil(t *testing.T) { ctx := context.Background() - err := Do(ctx, func() error { + err := ContextDo(ctx, func() error { return nil }) if err != nil {