mirror of
https://github.com/ipfs/kubo.git
synced 2025-07-01 19:24:14 +08:00
Merge pull request #1199 from ipfs/feat/bsrefactor
mild refactor of bitswap
This commit is contained in:
@ -4,6 +4,7 @@ package bitswap
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
"math"
|
"math"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
@ -324,47 +325,32 @@ func (bs *Bitswap) sendWantlistToProviders(ctx context.Context, entries []wantli
|
|||||||
}
|
}
|
||||||
|
|
||||||
// TODO(brian): handle errors
|
// TODO(brian): handle errors
|
||||||
func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) (
|
func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) error {
|
||||||
peer.ID, bsmsg.BitSwapMessage) {
|
|
||||||
defer log.EventBegin(ctx, "receiveMessage", p, incoming).Done()
|
defer log.EventBegin(ctx, "receiveMessage", p, incoming).Done()
|
||||||
|
|
||||||
if p == "" {
|
|
||||||
log.Debug("Received message from nil peer!")
|
|
||||||
// TODO propagate the error upward
|
|
||||||
return "", nil
|
|
||||||
}
|
|
||||||
if incoming == nil {
|
|
||||||
log.Debug("Got nil bitswap message!")
|
|
||||||
// TODO propagate the error upward
|
|
||||||
return "", nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// This call records changes to wantlists, blocks received,
|
// This call records changes to wantlists, blocks received,
|
||||||
// and number of bytes transfered.
|
// and number of bytes transfered.
|
||||||
bs.engine.MessageReceived(p, incoming)
|
bs.engine.MessageReceived(p, incoming)
|
||||||
// TODO: this is bad, and could be easily abused.
|
// TODO: this is bad, and could be easily abused.
|
||||||
// Should only track *useful* messages in ledger
|
// Should only track *useful* messages in ledger
|
||||||
|
|
||||||
|
var keys []u.Key
|
||||||
for _, block := range incoming.Blocks() {
|
for _, block := range incoming.Blocks() {
|
||||||
bs.blocksRecvd++
|
bs.blocksRecvd++
|
||||||
if has, err := bs.blockstore.Has(block.Key()); err == nil && has {
|
if has, err := bs.blockstore.Has(block.Key()); err == nil && has {
|
||||||
bs.dupBlocksRecvd++
|
bs.dupBlocksRecvd++
|
||||||
}
|
}
|
||||||
|
log.Debugf("got block %s from %s", block, p)
|
||||||
hasBlockCtx, cancel := context.WithTimeout(ctx, hasBlockTimeout)
|
hasBlockCtx, cancel := context.WithTimeout(ctx, hasBlockTimeout)
|
||||||
if err := bs.HasBlock(hasBlockCtx, block); err != nil {
|
if err := bs.HasBlock(hasBlockCtx, block); err != nil {
|
||||||
log.Debug(err)
|
return fmt.Errorf("ReceiveMessage HasBlock error: %s", err)
|
||||||
}
|
}
|
||||||
cancel()
|
cancel()
|
||||||
}
|
|
||||||
|
|
||||||
var keys []u.Key
|
|
||||||
for _, block := range incoming.Blocks() {
|
|
||||||
keys = append(keys, block.Key())
|
keys = append(keys, block.Key())
|
||||||
}
|
}
|
||||||
bs.cancelBlocks(ctx, keys)
|
|
||||||
|
|
||||||
// TODO: consider changing this function to not return anything
|
bs.cancelBlocks(ctx, keys)
|
||||||
return "", nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Connected/Disconnected warns bitswap about peer connections
|
// Connected/Disconnected warns bitswap about peer connections
|
||||||
@ -391,14 +377,24 @@ func (bs *Bitswap) cancelBlocks(ctx context.Context, bkeys []u.Key) {
|
|||||||
message := bsmsg.New()
|
message := bsmsg.New()
|
||||||
message.SetFull(false)
|
message.SetFull(false)
|
||||||
for _, k := range bkeys {
|
for _, k := range bkeys {
|
||||||
|
log.Debug("cancel block: %s", k)
|
||||||
message.Cancel(k)
|
message.Cancel(k)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
wg := sync.WaitGroup{}
|
||||||
for _, p := range bs.engine.Peers() {
|
for _, p := range bs.engine.Peers() {
|
||||||
err := bs.send(ctx, p, message)
|
wg.Add(1)
|
||||||
if err != nil {
|
go func(p peer.ID) {
|
||||||
log.Debugf("Error sending message: %s", err)
|
defer wg.Done()
|
||||||
}
|
err := bs.send(ctx, p, message)
|
||||||
|
if err != nil {
|
||||||
|
log.Warningf("Error sending message: %s", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}(p)
|
||||||
}
|
}
|
||||||
|
wg.Wait()
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bs *Bitswap) wantNewBlocks(ctx context.Context, bkeys []u.Key) {
|
func (bs *Bitswap) wantNewBlocks(ctx context.Context, bkeys []u.Key) {
|
||||||
|
@ -19,12 +19,6 @@ type BitSwapNetwork interface {
|
|||||||
peer.ID,
|
peer.ID,
|
||||||
bsmsg.BitSwapMessage) error
|
bsmsg.BitSwapMessage) error
|
||||||
|
|
||||||
// SendRequest sends a BitSwap message to a peer and waits for a response.
|
|
||||||
SendRequest(
|
|
||||||
context.Context,
|
|
||||||
peer.ID,
|
|
||||||
bsmsg.BitSwapMessage) (incoming bsmsg.BitSwapMessage, err error)
|
|
||||||
|
|
||||||
// SetDelegate registers the Reciver to handle messages received from the
|
// SetDelegate registers the Reciver to handle messages received from the
|
||||||
// network.
|
// network.
|
||||||
SetDelegate(Receiver)
|
SetDelegate(Receiver)
|
||||||
@ -35,8 +29,9 @@ type BitSwapNetwork interface {
|
|||||||
// Implement Receiver to receive messages from the BitSwapNetwork
|
// Implement Receiver to receive messages from the BitSwapNetwork
|
||||||
type Receiver interface {
|
type Receiver interface {
|
||||||
ReceiveMessage(
|
ReceiveMessage(
|
||||||
ctx context.Context, sender peer.ID, incoming bsmsg.BitSwapMessage) (
|
ctx context.Context,
|
||||||
destination peer.ID, outgoing bsmsg.BitSwapMessage)
|
sender peer.ID,
|
||||||
|
incoming bsmsg.BitSwapMessage) error
|
||||||
|
|
||||||
ReceiveError(error)
|
ReceiveError(error)
|
||||||
|
|
||||||
|
@ -14,57 +14,6 @@ import (
|
|||||||
testutil "github.com/ipfs/go-ipfs/util/testutil"
|
testutil "github.com/ipfs/go-ipfs/util/testutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestSendRequestToCooperativePeer(t *testing.T) {
|
|
||||||
net := VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0))
|
|
||||||
|
|
||||||
recipientPeer := testutil.RandIdentityOrFatal(t)
|
|
||||||
|
|
||||||
t.Log("Get two network adapters")
|
|
||||||
|
|
||||||
initiator := net.Adapter(testutil.RandIdentityOrFatal(t))
|
|
||||||
recipient := net.Adapter(recipientPeer)
|
|
||||||
|
|
||||||
expectedStr := "response from recipient"
|
|
||||||
recipient.SetDelegate(lambda(func(
|
|
||||||
ctx context.Context,
|
|
||||||
from peer.ID,
|
|
||||||
incoming bsmsg.BitSwapMessage) (
|
|
||||||
peer.ID, bsmsg.BitSwapMessage) {
|
|
||||||
|
|
||||||
t.Log("Recipient received a message from the network")
|
|
||||||
|
|
||||||
// TODO test contents of incoming message
|
|
||||||
|
|
||||||
m := bsmsg.New()
|
|
||||||
m.AddBlock(blocks.NewBlock([]byte(expectedStr)))
|
|
||||||
|
|
||||||
return from, m
|
|
||||||
}))
|
|
||||||
|
|
||||||
t.Log("Build a message and send a synchronous request to recipient")
|
|
||||||
|
|
||||||
message := bsmsg.New()
|
|
||||||
message.AddBlock(blocks.NewBlock([]byte("data")))
|
|
||||||
response, err := initiator.SendRequest(
|
|
||||||
context.Background(), recipientPeer.ID(), message)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
t.Log("Check the contents of the response from recipient")
|
|
||||||
|
|
||||||
if response == nil {
|
|
||||||
t.Fatal("Should have received a response")
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, blockFromRecipient := range response.Blocks() {
|
|
||||||
if string(blockFromRecipient.Data) == expectedStr {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
t.Fatal("Should have returned after finding expected block data")
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
|
func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
|
||||||
net := VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0))
|
net := VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0))
|
||||||
responderPeer := testutil.RandIdentityOrFatal(t)
|
responderPeer := testutil.RandIdentityOrFatal(t)
|
||||||
@ -80,20 +29,19 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
|
|||||||
responder.SetDelegate(lambda(func(
|
responder.SetDelegate(lambda(func(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
fromWaiter peer.ID,
|
fromWaiter peer.ID,
|
||||||
msgFromWaiter bsmsg.BitSwapMessage) (
|
msgFromWaiter bsmsg.BitSwapMessage) error {
|
||||||
peer.ID, bsmsg.BitSwapMessage) {
|
|
||||||
|
|
||||||
msgToWaiter := bsmsg.New()
|
msgToWaiter := bsmsg.New()
|
||||||
msgToWaiter.AddBlock(blocks.NewBlock([]byte(expectedStr)))
|
msgToWaiter.AddBlock(blocks.NewBlock([]byte(expectedStr)))
|
||||||
|
waiter.SendMessage(ctx, fromWaiter, msgToWaiter)
|
||||||
|
|
||||||
return fromWaiter, msgToWaiter
|
return nil
|
||||||
}))
|
}))
|
||||||
|
|
||||||
waiter.SetDelegate(lambda(func(
|
waiter.SetDelegate(lambda(func(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
fromResponder peer.ID,
|
fromResponder peer.ID,
|
||||||
msgFromResponder bsmsg.BitSwapMessage) (
|
msgFromResponder bsmsg.BitSwapMessage) error {
|
||||||
peer.ID, bsmsg.BitSwapMessage) {
|
|
||||||
|
|
||||||
// TODO assert that this came from the correct peer and that the message contents are as expected
|
// TODO assert that this came from the correct peer and that the message contents are as expected
|
||||||
ok := false
|
ok := false
|
||||||
@ -108,7 +56,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
|
|||||||
t.Fatal("Message not received from the responder")
|
t.Fatal("Message not received from the responder")
|
||||||
|
|
||||||
}
|
}
|
||||||
return "", nil
|
return nil
|
||||||
}))
|
}))
|
||||||
|
|
||||||
messageSentAsync := bsmsg.New()
|
messageSentAsync := bsmsg.New()
|
||||||
@ -123,7 +71,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type receiverFunc func(ctx context.Context, p peer.ID,
|
type receiverFunc func(ctx context.Context, p peer.ID,
|
||||||
incoming bsmsg.BitSwapMessage) (peer.ID, bsmsg.BitSwapMessage)
|
incoming bsmsg.BitSwapMessage) error
|
||||||
|
|
||||||
// lambda returns a Receiver instance given a receiver function
|
// lambda returns a Receiver instance given a receiver function
|
||||||
func lambda(f receiverFunc) bsnet.Receiver {
|
func lambda(f receiverFunc) bsnet.Receiver {
|
||||||
@ -133,13 +81,11 @@ func lambda(f receiverFunc) bsnet.Receiver {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type lambdaImpl struct {
|
type lambdaImpl struct {
|
||||||
f func(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) (
|
f func(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) error
|
||||||
peer.ID, bsmsg.BitSwapMessage)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (lam *lambdaImpl) ReceiveMessage(ctx context.Context,
|
func (lam *lambdaImpl) ReceiveMessage(ctx context.Context,
|
||||||
p peer.ID, incoming bsmsg.BitSwapMessage) (
|
p peer.ID, incoming bsmsg.BitSwapMessage) error {
|
||||||
peer.ID, bsmsg.BitSwapMessage) {
|
|
||||||
return lam.f(ctx, p, incoming)
|
return lam.f(ctx, p, incoming)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -72,61 +72,7 @@ func (n *network) deliver(
|
|||||||
|
|
||||||
n.delay.Wait()
|
n.delay.Wait()
|
||||||
|
|
||||||
nextPeer, nextMsg := r.ReceiveMessage(context.TODO(), from, message)
|
return r.ReceiveMessage(context.TODO(), from, message)
|
||||||
|
|
||||||
if (nextPeer == "" && nextMsg != nil) || (nextMsg == nil && nextPeer != "") {
|
|
||||||
return errors.New("Malformed client request")
|
|
||||||
}
|
|
||||||
|
|
||||||
if nextPeer == "" && nextMsg == nil { // no response to send
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
nextReceiver, ok := n.clients[nextPeer]
|
|
||||||
if !ok {
|
|
||||||
return errors.New("Cannot locate peer on network")
|
|
||||||
}
|
|
||||||
go n.deliver(nextReceiver, nextPeer, nextMsg)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO
|
|
||||||
func (n *network) SendRequest(
|
|
||||||
ctx context.Context,
|
|
||||||
from peer.ID,
|
|
||||||
to peer.ID,
|
|
||||||
message bsmsg.BitSwapMessage) (
|
|
||||||
incoming bsmsg.BitSwapMessage, err error) {
|
|
||||||
|
|
||||||
r, ok := n.clients[to]
|
|
||||||
if !ok {
|
|
||||||
return nil, errors.New("Cannot locate peer on network")
|
|
||||||
}
|
|
||||||
nextPeer, nextMsg := r.ReceiveMessage(context.TODO(), from, message)
|
|
||||||
|
|
||||||
// TODO dedupe code
|
|
||||||
if (nextPeer == "" && nextMsg != nil) || (nextMsg == nil && nextPeer != "") {
|
|
||||||
r.ReceiveError(errors.New("Malformed client request"))
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO dedupe code
|
|
||||||
if nextPeer == "" && nextMsg == nil {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO test when receiver doesn't immediately respond to the initiator of the request
|
|
||||||
if nextPeer != from {
|
|
||||||
go func() {
|
|
||||||
nextReceiver, ok := n.clients[nextPeer]
|
|
||||||
if !ok {
|
|
||||||
// TODO log the error?
|
|
||||||
}
|
|
||||||
n.deliver(nextReceiver, nextPeer, nextMsg)
|
|
||||||
}()
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
return nextMsg, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type networkClient struct {
|
type networkClient struct {
|
||||||
@ -143,13 +89,6 @@ func (nc *networkClient) SendMessage(
|
|||||||
return nc.network.SendMessage(ctx, nc.local, to, message)
|
return nc.network.SendMessage(ctx, nc.local, to, message)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (nc *networkClient) SendRequest(
|
|
||||||
ctx context.Context,
|
|
||||||
to peer.ID,
|
|
||||||
message bsmsg.BitSwapMessage) (incoming bsmsg.BitSwapMessage, err error) {
|
|
||||||
return nc.network.SendRequest(ctx, nc.local, to, message)
|
|
||||||
}
|
|
||||||
|
|
||||||
// FindProvidersAsync returns a channel of providers for the given key
|
// FindProvidersAsync returns a channel of providers for the given key
|
||||||
func (nc *networkClient) FindProvidersAsync(ctx context.Context, k util.Key, max int) <-chan peer.ID {
|
func (nc *networkClient) FindProvidersAsync(ctx context.Context, k util.Key, max int) <-chan peer.ID {
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user