mirror of
https://github.com/ipfs/kubo.git
synced 2025-06-28 17:03:58 +08:00
bitswap virtual test net code should send messages in order
License: MIT Signed-off-by: Jeromy <jeromyj@gmail.com>
This commit is contained in:
@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message"
|
||||
bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network"
|
||||
@ -22,7 +23,7 @@ var log = logging.Logger("bstestnet")
|
||||
|
||||
func VirtualNetwork(rs mockrouting.Server, d delay.D) Network {
|
||||
return &network{
|
||||
clients: make(map[peer.ID]bsnet.Receiver),
|
||||
clients: make(map[peer.ID]*receiverQueue),
|
||||
delay: d,
|
||||
routingserver: rs,
|
||||
conns: make(map[string]struct{}),
|
||||
@ -31,12 +32,28 @@ func VirtualNetwork(rs mockrouting.Server, d delay.D) Network {
|
||||
|
||||
type network struct {
|
||||
mu sync.Mutex
|
||||
clients map[peer.ID]bsnet.Receiver
|
||||
clients map[peer.ID]*receiverQueue
|
||||
routingserver mockrouting.Server
|
||||
delay delay.D
|
||||
conns map[string]struct{}
|
||||
}
|
||||
|
||||
type message struct {
|
||||
from peer.ID
|
||||
msg bsmsg.BitSwapMessage
|
||||
shouldSend time.Time
|
||||
}
|
||||
|
||||
// receiverQueue queues up a set of messages to be sent, and sends them *in
|
||||
// order* with their delays respected as much as sending them in order allows
|
||||
// for
|
||||
type receiverQueue struct {
|
||||
receiver bsnet.Receiver
|
||||
queue []*message
|
||||
active bool
|
||||
lk sync.Mutex
|
||||
}
|
||||
|
||||
func (n *network) Adapter(p testutil.Identity) bsnet.BitSwapNetwork {
|
||||
n.mu.Lock()
|
||||
defer n.mu.Unlock()
|
||||
@ -46,7 +63,7 @@ func (n *network) Adapter(p testutil.Identity) bsnet.BitSwapNetwork {
|
||||
network: n,
|
||||
routing: n.routingserver.Client(p),
|
||||
}
|
||||
n.clients[p.ID()] = client
|
||||
n.clients[p.ID()] = &receiverQueue{receiver: client}
|
||||
return client
|
||||
}
|
||||
|
||||
@ -64,7 +81,7 @@ func (n *network) SendMessage(
|
||||
ctx context.Context,
|
||||
from peer.ID,
|
||||
to peer.ID,
|
||||
message bsmsg.BitSwapMessage) error {
|
||||
mes bsmsg.BitSwapMessage) error {
|
||||
|
||||
n.mu.Lock()
|
||||
defer n.mu.Unlock()
|
||||
@ -77,7 +94,12 @@ func (n *network) SendMessage(
|
||||
// nb: terminate the context since the context wouldn't actually be passed
|
||||
// over the network in a real scenario
|
||||
|
||||
go n.deliver(receiver, from, message)
|
||||
msg := &message{
|
||||
from: from,
|
||||
msg: mes,
|
||||
shouldSend: time.Now().Add(n.delay.Get()),
|
||||
}
|
||||
receiver.enqueue(msg)
|
||||
|
||||
return nil
|
||||
}
|
||||
@ -191,11 +213,38 @@ func (nc *networkClient) ConnectTo(_ context.Context, p peer.ID) error {
|
||||
|
||||
// TODO: add handling for disconnects
|
||||
|
||||
otherClient.PeerConnected(nc.local)
|
||||
otherClient.receiver.PeerConnected(nc.local)
|
||||
nc.Receiver.PeerConnected(p)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (rq *receiverQueue) enqueue(m *message) {
|
||||
rq.lk.Lock()
|
||||
defer rq.lk.Unlock()
|
||||
rq.queue = append(rq.queue, m)
|
||||
if !rq.active {
|
||||
rq.active = true
|
||||
go rq.process()
|
||||
}
|
||||
}
|
||||
|
||||
func (rq *receiverQueue) process() {
|
||||
for {
|
||||
rq.lk.Lock()
|
||||
if len(rq.queue) == 0 {
|
||||
rq.active = false
|
||||
rq.lk.Unlock()
|
||||
return
|
||||
}
|
||||
m := rq.queue[0]
|
||||
rq.queue = rq.queue[1:]
|
||||
rq.lk.Unlock()
|
||||
|
||||
time.Sleep(time.Until(m.shouldSend))
|
||||
rq.receiver.ReceiveMessage(context.TODO(), m.from, m.msg)
|
||||
}
|
||||
}
|
||||
|
||||
func tagForPeers(a, b peer.ID) string {
|
||||
if a < b {
|
||||
return string(a + b)
|
||||
|
Reference in New Issue
Block a user