From 0324b4b28356059151d6215ae8d58880f5354e20 Mon Sep 17 00:00:00 2001 From: Jeromy Johnson Date: Tue, 5 May 2015 12:28:50 -0700 Subject: [PATCH 1/2] mild refactor of bitswap --- exchange/bitswap/bitswap.go | 34 ++++-------- exchange/bitswap/network/interface.go | 11 +--- exchange/bitswap/testnet/network_test.go | 70 +++--------------------- exchange/bitswap/testnet/virtual.go | 63 +-------------------- 4 files changed, 23 insertions(+), 155 deletions(-) diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index 8b12a4727..61854c79a 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -4,6 +4,7 @@ package bitswap import ( "errors" + "fmt" "math" "sync" "time" @@ -324,47 +325,31 @@ func (bs *Bitswap) sendWantlistToProviders(ctx context.Context, entries []wantli } // TODO(brian): handle errors -func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) ( - peer.ID, bsmsg.BitSwapMessage) { +func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) error { defer log.EventBegin(ctx, "receiveMessage", p, incoming).Done() - if p == "" { - log.Debug("Received message from nil peer!") - // TODO propagate the error upward - return "", nil - } - if incoming == nil { - log.Debug("Got nil bitswap message!") - // TODO propagate the error upward - return "", nil - } - // This call records changes to wantlists, blocks received, // and number of bytes transfered. bs.engine.MessageReceived(p, incoming) // TODO: this is bad, and could be easily abused. // Should only track *useful* messages in ledger + var keys []u.Key for _, block := range incoming.Blocks() { bs.blocksRecvd++ if has, err := bs.blockstore.Has(block.Key()); err == nil && has { bs.dupBlocksRecvd++ } + log.Debugf("got block %s from %s", block, p) hasBlockCtx, cancel := context.WithTimeout(ctx, hasBlockTimeout) if err := bs.HasBlock(hasBlockCtx, block); err != nil { - log.Debug(err) + return fmt.Errorf("ReceiveMessage HasBlock error: %s", err) } cancel() - } - - var keys []u.Key - for _, block := range incoming.Blocks() { keys = append(keys, block.Key()) } - bs.cancelBlocks(ctx, keys) - // TODO: consider changing this function to not return anything - return "", nil + return bs.cancelBlocks(ctx, keys) } // Connected/Disconnected warns bitswap about peer connections @@ -384,21 +369,24 @@ func (bs *Bitswap) PeerDisconnected(p peer.ID) { bs.engine.PeerDisconnected(p) } -func (bs *Bitswap) cancelBlocks(ctx context.Context, bkeys []u.Key) { +func (bs *Bitswap) cancelBlocks(ctx context.Context, bkeys []u.Key) error { if len(bkeys) < 1 { - return + return nil } message := bsmsg.New() message.SetFull(false) for _, k := range bkeys { + log.Debug("cancel block: %s", k) message.Cancel(k) } for _, p := range bs.engine.Peers() { err := bs.send(ctx, p, message) if err != nil { log.Debugf("Error sending message: %s", err) + return err } } + return nil } func (bs *Bitswap) wantNewBlocks(ctx context.Context, bkeys []u.Key) { diff --git a/exchange/bitswap/network/interface.go b/exchange/bitswap/network/interface.go index 146c73341..a6ed070c0 100644 --- a/exchange/bitswap/network/interface.go +++ b/exchange/bitswap/network/interface.go @@ -19,12 +19,6 @@ type BitSwapNetwork interface { peer.ID, bsmsg.BitSwapMessage) error - // SendRequest sends a BitSwap message to a peer and waits for a response. - SendRequest( - context.Context, - peer.ID, - bsmsg.BitSwapMessage) (incoming bsmsg.BitSwapMessage, err error) - // SetDelegate registers the Reciver to handle messages received from the // network. SetDelegate(Receiver) @@ -35,8 +29,9 @@ type BitSwapNetwork interface { // Implement Receiver to receive messages from the BitSwapNetwork type Receiver interface { ReceiveMessage( - ctx context.Context, sender peer.ID, incoming bsmsg.BitSwapMessage) ( - destination peer.ID, outgoing bsmsg.BitSwapMessage) + ctx context.Context, + sender peer.ID, + incoming bsmsg.BitSwapMessage) error ReceiveError(error) diff --git a/exchange/bitswap/testnet/network_test.go b/exchange/bitswap/testnet/network_test.go index 8d457d81c..9091ff255 100644 --- a/exchange/bitswap/testnet/network_test.go +++ b/exchange/bitswap/testnet/network_test.go @@ -14,57 +14,6 @@ import ( testutil "github.com/ipfs/go-ipfs/util/testutil" ) -func TestSendRequestToCooperativePeer(t *testing.T) { - net := VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0)) - - recipientPeer := testutil.RandIdentityOrFatal(t) - - t.Log("Get two network adapters") - - initiator := net.Adapter(testutil.RandIdentityOrFatal(t)) - recipient := net.Adapter(recipientPeer) - - expectedStr := "response from recipient" - recipient.SetDelegate(lambda(func( - ctx context.Context, - from peer.ID, - incoming bsmsg.BitSwapMessage) ( - peer.ID, bsmsg.BitSwapMessage) { - - t.Log("Recipient received a message from the network") - - // TODO test contents of incoming message - - m := bsmsg.New() - m.AddBlock(blocks.NewBlock([]byte(expectedStr))) - - return from, m - })) - - t.Log("Build a message and send a synchronous request to recipient") - - message := bsmsg.New() - message.AddBlock(blocks.NewBlock([]byte("data"))) - response, err := initiator.SendRequest( - context.Background(), recipientPeer.ID(), message) - if err != nil { - t.Fatal(err) - } - - t.Log("Check the contents of the response from recipient") - - if response == nil { - t.Fatal("Should have received a response") - } - - for _, blockFromRecipient := range response.Blocks() { - if string(blockFromRecipient.Data) == expectedStr { - return - } - } - t.Fatal("Should have returned after finding expected block data") -} - func TestSendMessageAsyncButWaitForResponse(t *testing.T) { net := VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0)) responderPeer := testutil.RandIdentityOrFatal(t) @@ -80,20 +29,19 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) { responder.SetDelegate(lambda(func( ctx context.Context, fromWaiter peer.ID, - msgFromWaiter bsmsg.BitSwapMessage) ( - peer.ID, bsmsg.BitSwapMessage) { + msgFromWaiter bsmsg.BitSwapMessage) error { msgToWaiter := bsmsg.New() msgToWaiter.AddBlock(blocks.NewBlock([]byte(expectedStr))) + waiter.SendMessage(ctx, fromWaiter, msgToWaiter) - return fromWaiter, msgToWaiter + return nil })) waiter.SetDelegate(lambda(func( ctx context.Context, fromResponder peer.ID, - msgFromResponder bsmsg.BitSwapMessage) ( - peer.ID, bsmsg.BitSwapMessage) { + msgFromResponder bsmsg.BitSwapMessage) error { // TODO assert that this came from the correct peer and that the message contents are as expected ok := false @@ -108,7 +56,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) { t.Fatal("Message not received from the responder") } - return "", nil + return nil })) messageSentAsync := bsmsg.New() @@ -123,7 +71,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) { } type receiverFunc func(ctx context.Context, p peer.ID, - incoming bsmsg.BitSwapMessage) (peer.ID, bsmsg.BitSwapMessage) + incoming bsmsg.BitSwapMessage) error // lambda returns a Receiver instance given a receiver function func lambda(f receiverFunc) bsnet.Receiver { @@ -133,13 +81,11 @@ func lambda(f receiverFunc) bsnet.Receiver { } type lambdaImpl struct { - f func(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) ( - peer.ID, bsmsg.BitSwapMessage) + f func(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) error } func (lam *lambdaImpl) ReceiveMessage(ctx context.Context, - p peer.ID, incoming bsmsg.BitSwapMessage) ( - peer.ID, bsmsg.BitSwapMessage) { + p peer.ID, incoming bsmsg.BitSwapMessage) error { return lam.f(ctx, p, incoming) } diff --git a/exchange/bitswap/testnet/virtual.go b/exchange/bitswap/testnet/virtual.go index e0812ffbd..feb5fd722 100644 --- a/exchange/bitswap/testnet/virtual.go +++ b/exchange/bitswap/testnet/virtual.go @@ -72,61 +72,7 @@ func (n *network) deliver( n.delay.Wait() - nextPeer, nextMsg := r.ReceiveMessage(context.TODO(), from, message) - - if (nextPeer == "" && nextMsg != nil) || (nextMsg == nil && nextPeer != "") { - return errors.New("Malformed client request") - } - - if nextPeer == "" && nextMsg == nil { // no response to send - return nil - } - - nextReceiver, ok := n.clients[nextPeer] - if !ok { - return errors.New("Cannot locate peer on network") - } - go n.deliver(nextReceiver, nextPeer, nextMsg) - return nil -} - -// TODO -func (n *network) SendRequest( - ctx context.Context, - from peer.ID, - to peer.ID, - message bsmsg.BitSwapMessage) ( - incoming bsmsg.BitSwapMessage, err error) { - - r, ok := n.clients[to] - if !ok { - return nil, errors.New("Cannot locate peer on network") - } - nextPeer, nextMsg := r.ReceiveMessage(context.TODO(), from, message) - - // TODO dedupe code - if (nextPeer == "" && nextMsg != nil) || (nextMsg == nil && nextPeer != "") { - r.ReceiveError(errors.New("Malformed client request")) - return nil, nil - } - - // TODO dedupe code - if nextPeer == "" && nextMsg == nil { - return nil, nil - } - - // TODO test when receiver doesn't immediately respond to the initiator of the request - if nextPeer != from { - go func() { - nextReceiver, ok := n.clients[nextPeer] - if !ok { - // TODO log the error? - } - n.deliver(nextReceiver, nextPeer, nextMsg) - }() - return nil, nil - } - return nextMsg, nil + return r.ReceiveMessage(context.TODO(), from, message) } type networkClient struct { @@ -143,13 +89,6 @@ func (nc *networkClient) SendMessage( return nc.network.SendMessage(ctx, nc.local, to, message) } -func (nc *networkClient) SendRequest( - ctx context.Context, - to peer.ID, - message bsmsg.BitSwapMessage) (incoming bsmsg.BitSwapMessage, err error) { - return nc.network.SendRequest(ctx, nc.local, to, message) -} - // FindProvidersAsync returns a channel of providers for the given key func (nc *networkClient) FindProvidersAsync(ctx context.Context, k util.Key, max int) <-chan peer.ID { From 9049dae6742e3b75d06c16705adcde55381ff819 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Wed, 6 May 2015 00:50:44 -0700 Subject: [PATCH 2/2] address comments from CR --- exchange/bitswap/bitswap.go | 26 +++++++++++++++++--------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index 61854c79a..757c9067e 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -349,7 +349,8 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg keys = append(keys, block.Key()) } - return bs.cancelBlocks(ctx, keys) + bs.cancelBlocks(ctx, keys) + return nil } // Connected/Disconnected warns bitswap about peer connections @@ -369,9 +370,9 @@ func (bs *Bitswap) PeerDisconnected(p peer.ID) { bs.engine.PeerDisconnected(p) } -func (bs *Bitswap) cancelBlocks(ctx context.Context, bkeys []u.Key) error { +func (bs *Bitswap) cancelBlocks(ctx context.Context, bkeys []u.Key) { if len(bkeys) < 1 { - return nil + return } message := bsmsg.New() message.SetFull(false) @@ -379,14 +380,21 @@ func (bs *Bitswap) cancelBlocks(ctx context.Context, bkeys []u.Key) error { log.Debug("cancel block: %s", k) message.Cancel(k) } + + wg := sync.WaitGroup{} for _, p := range bs.engine.Peers() { - err := bs.send(ctx, p, message) - if err != nil { - log.Debugf("Error sending message: %s", err) - return err - } + wg.Add(1) + go func(p peer.ID) { + defer wg.Done() + err := bs.send(ctx, p, message) + if err != nil { + log.Warningf("Error sending message: %s", err) + return + } + }(p) } - return nil + wg.Wait() + return } func (bs *Bitswap) wantNewBlocks(ctx context.Context, bkeys []u.Key) {