mirror of
https://github.com/ipfs/kubo.git
synced 2025-09-09 19:32:24 +08:00
mild refactor of bitswap
This commit is contained in:
@ -72,61 +72,7 @@ func (n *network) deliver(
|
||||
|
||||
n.delay.Wait()
|
||||
|
||||
nextPeer, nextMsg := r.ReceiveMessage(context.TODO(), from, message)
|
||||
|
||||
if (nextPeer == "" && nextMsg != nil) || (nextMsg == nil && nextPeer != "") {
|
||||
return errors.New("Malformed client request")
|
||||
}
|
||||
|
||||
if nextPeer == "" && nextMsg == nil { // no response to send
|
||||
return nil
|
||||
}
|
||||
|
||||
nextReceiver, ok := n.clients[nextPeer]
|
||||
if !ok {
|
||||
return errors.New("Cannot locate peer on network")
|
||||
}
|
||||
go n.deliver(nextReceiver, nextPeer, nextMsg)
|
||||
return nil
|
||||
}
|
||||
|
||||
// TODO
|
||||
func (n *network) SendRequest(
|
||||
ctx context.Context,
|
||||
from peer.ID,
|
||||
to peer.ID,
|
||||
message bsmsg.BitSwapMessage) (
|
||||
incoming bsmsg.BitSwapMessage, err error) {
|
||||
|
||||
r, ok := n.clients[to]
|
||||
if !ok {
|
||||
return nil, errors.New("Cannot locate peer on network")
|
||||
}
|
||||
nextPeer, nextMsg := r.ReceiveMessage(context.TODO(), from, message)
|
||||
|
||||
// TODO dedupe code
|
||||
if (nextPeer == "" && nextMsg != nil) || (nextMsg == nil && nextPeer != "") {
|
||||
r.ReceiveError(errors.New("Malformed client request"))
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// TODO dedupe code
|
||||
if nextPeer == "" && nextMsg == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// TODO test when receiver doesn't immediately respond to the initiator of the request
|
||||
if nextPeer != from {
|
||||
go func() {
|
||||
nextReceiver, ok := n.clients[nextPeer]
|
||||
if !ok {
|
||||
// TODO log the error?
|
||||
}
|
||||
n.deliver(nextReceiver, nextPeer, nextMsg)
|
||||
}()
|
||||
return nil, nil
|
||||
}
|
||||
return nextMsg, nil
|
||||
return r.ReceiveMessage(context.TODO(), from, message)
|
||||
}
|
||||
|
||||
type networkClient struct {
|
||||
@ -143,13 +89,6 @@ func (nc *networkClient) SendMessage(
|
||||
return nc.network.SendMessage(ctx, nc.local, to, message)
|
||||
}
|
||||
|
||||
func (nc *networkClient) SendRequest(
|
||||
ctx context.Context,
|
||||
to peer.ID,
|
||||
message bsmsg.BitSwapMessage) (incoming bsmsg.BitSwapMessage, err error) {
|
||||
return nc.network.SendRequest(ctx, nc.local, to, message)
|
||||
}
|
||||
|
||||
// FindProvidersAsync returns a channel of providers for the given key
|
||||
func (nc *networkClient) FindProvidersAsync(ctx context.Context, k util.Key, max int) <-chan peer.ID {
|
||||
|
||||
|
Reference in New Issue
Block a user