mirror of
https://github.com/ipfs/kubo.git
synced 2025-06-28 17:03:58 +08:00
Merge pull request #1471 from heems/master
add transport from netsim and bandwidth to mocknet
This commit is contained in:
@ -46,7 +46,7 @@ func (g *SessionGenerator) Next() Instance {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
panic("FIXME") // TODO change signature
|
panic("FIXME") // TODO change signature
|
||||||
}
|
}
|
||||||
return session(g.ctx, g.net, p)
|
return Session(g.ctx, g.net, p)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *SessionGenerator) Instances(n int) []Instance {
|
func (g *SessionGenerator) Instances(n int) []Instance {
|
||||||
@ -85,7 +85,7 @@ func (i *Instance) SetBlockstoreLatency(t time.Duration) time.Duration {
|
|||||||
// NB: It's easy make mistakes by providing the same peer ID to two different
|
// NB: It's easy make mistakes by providing the same peer ID to two different
|
||||||
// sessions. To safeguard, use the SessionGenerator to generate sessions. It's
|
// sessions. To safeguard, use the SessionGenerator to generate sessions. It's
|
||||||
// just a much better idea.
|
// just a much better idea.
|
||||||
func session(ctx context.Context, net tn.Network, p testutil.Identity) Instance {
|
func Session(ctx context.Context, net tn.Network, p testutil.Identity) Instance {
|
||||||
bsdelay := delay.Fixed(0)
|
bsdelay := delay.Fixed(0)
|
||||||
const writeCacheElems = 100
|
const writeCacheElems = 100
|
||||||
|
|
||||||
|
@ -7,13 +7,12 @@
|
|||||||
package mocknet
|
package mocknet
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"io"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
ic "github.com/ipfs/go-ipfs/p2p/crypto"
|
ic "github.com/ipfs/go-ipfs/p2p/crypto"
|
||||||
host "github.com/ipfs/go-ipfs/p2p/host"
|
host "github.com/ipfs/go-ipfs/p2p/host"
|
||||||
inet "github.com/ipfs/go-ipfs/p2p/net"
|
inet "github.com/ipfs/go-ipfs/p2p/net"
|
||||||
peer "github.com/ipfs/go-ipfs/p2p/peer"
|
peer "github.com/ipfs/go-ipfs/p2p/peer"
|
||||||
|
"io"
|
||||||
|
"time"
|
||||||
|
|
||||||
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
|
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
|
||||||
)
|
)
|
||||||
@ -59,13 +58,14 @@ type Mocknet interface {
|
|||||||
ConnectNets(inet.Network, inet.Network) (inet.Conn, error)
|
ConnectNets(inet.Network, inet.Network) (inet.Conn, error)
|
||||||
DisconnectPeers(peer.ID, peer.ID) error
|
DisconnectPeers(peer.ID, peer.ID) error
|
||||||
DisconnectNets(inet.Network, inet.Network) error
|
DisconnectNets(inet.Network, inet.Network) error
|
||||||
|
LinkAll() error
|
||||||
}
|
}
|
||||||
|
|
||||||
// LinkOptions are used to change aspects of the links.
|
// LinkOptions are used to change aspects of the links.
|
||||||
// Sorry but they dont work yet :(
|
// Sorry but they dont work yet :(
|
||||||
type LinkOptions struct {
|
type LinkOptions struct {
|
||||||
Latency time.Duration
|
Latency time.Duration
|
||||||
Bandwidth int // in bytes-per-second
|
Bandwidth float64 // in bytes-per-second
|
||||||
// we can make these values distributions down the road.
|
// we can make these values distributions down the road.
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,8 +1,10 @@
|
|||||||
package mocknet
|
package mocknet
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
// "fmt"
|
||||||
"io"
|
"io"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
inet "github.com/ipfs/go-ipfs/p2p/net"
|
inet "github.com/ipfs/go-ipfs/p2p/net"
|
||||||
peer "github.com/ipfs/go-ipfs/p2p/peer"
|
peer "github.com/ipfs/go-ipfs/p2p/peer"
|
||||||
@ -11,17 +13,20 @@ import (
|
|||||||
// link implements mocknet.Link
|
// link implements mocknet.Link
|
||||||
// and, for simplicity, inet.Conn
|
// and, for simplicity, inet.Conn
|
||||||
type link struct {
|
type link struct {
|
||||||
mock *mocknet
|
mock *mocknet
|
||||||
nets []*peernet
|
nets []*peernet
|
||||||
opts LinkOptions
|
opts LinkOptions
|
||||||
|
ratelimiter *ratelimiter
|
||||||
// this could have addresses on both sides.
|
// this could have addresses on both sides.
|
||||||
|
|
||||||
sync.RWMutex
|
sync.RWMutex
|
||||||
}
|
}
|
||||||
|
|
||||||
func newLink(mn *mocknet, opts LinkOptions) *link {
|
func newLink(mn *mocknet, opts LinkOptions) *link {
|
||||||
return &link{mock: mn, opts: opts}
|
l := &link{mock: mn,
|
||||||
|
opts: opts,
|
||||||
|
ratelimiter: NewRatelimiter(opts.Bandwidth)}
|
||||||
|
return l
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *link) newConnPair(dialer *peernet) (*conn, *conn) {
|
func (l *link) newConnPair(dialer *peernet) (*conn, *conn) {
|
||||||
@ -57,8 +62,8 @@ func (l *link) newStreamPair() (*stream, *stream) {
|
|||||||
r1, w1 := io.Pipe()
|
r1, w1 := io.Pipe()
|
||||||
r2, w2 := io.Pipe()
|
r2, w2 := io.Pipe()
|
||||||
|
|
||||||
s1 := &stream{Reader: r1, Writer: w2}
|
s1 := NewStream(w2, r1)
|
||||||
s2 := &stream{Reader: r2, Writer: w1}
|
s2 := NewStream(w1, r2)
|
||||||
return s1, s2
|
return s1, s2
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -86,8 +91,17 @@ func (l *link) Peers() []peer.ID {
|
|||||||
|
|
||||||
func (l *link) SetOptions(o LinkOptions) {
|
func (l *link) SetOptions(o LinkOptions) {
|
||||||
l.opts = o
|
l.opts = o
|
||||||
|
l.ratelimiter.UpdateBandwidth(l.opts.Bandwidth)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *link) Options() LinkOptions {
|
func (l *link) Options() LinkOptions {
|
||||||
return l.opts
|
return l.opts
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (l *link) GetLatency() time.Duration {
|
||||||
|
return l.opts.Latency
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *link) RateLimit(dataSize int) time.Duration {
|
||||||
|
return l.ratelimiter.Limit(dataSize)
|
||||||
|
}
|
||||||
|
@ -63,7 +63,7 @@ func TestNotifications(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if !found {
|
if !found {
|
||||||
t.Error("connection not found")
|
t.Error("connection not found", c1, len(expect), len(actual))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,7 +1,11 @@
|
|||||||
package mocknet
|
package mocknet
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"io"
|
"io"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
process "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
|
||||||
|
|
||||||
inet "github.com/ipfs/go-ipfs/p2p/net"
|
inet "github.com/ipfs/go-ipfs/p2p/net"
|
||||||
)
|
)
|
||||||
@ -10,10 +14,51 @@ import (
|
|||||||
type stream struct {
|
type stream struct {
|
||||||
io.Reader
|
io.Reader
|
||||||
io.Writer
|
io.Writer
|
||||||
conn *conn
|
conn *conn
|
||||||
|
toDeliver chan *transportObject
|
||||||
|
proc process.Process
|
||||||
|
}
|
||||||
|
|
||||||
|
type transportObject struct {
|
||||||
|
msg []byte
|
||||||
|
arrivalTime time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewStream(w io.Writer, r io.Reader) *stream {
|
||||||
|
s := &stream{
|
||||||
|
Reader: r,
|
||||||
|
Writer: w,
|
||||||
|
toDeliver: make(chan *transportObject),
|
||||||
|
}
|
||||||
|
|
||||||
|
s.proc = process.WithTeardown(s.teardown)
|
||||||
|
s.proc.Go(s.transport)
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
|
||||||
|
// How to handle errors with writes?
|
||||||
|
func (s *stream) Write(p []byte) (n int, err error) {
|
||||||
|
l := s.conn.link
|
||||||
|
delay := l.GetLatency() + l.RateLimit(len(p))
|
||||||
|
t := time.Now().Add(delay)
|
||||||
|
select {
|
||||||
|
case <-s.proc.Closing(): // bail out if we're closing.
|
||||||
|
return 0, io.ErrClosedPipe
|
||||||
|
case s.toDeliver <- &transportObject{msg: p, arrivalTime: t}:
|
||||||
|
}
|
||||||
|
return len(p), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *stream) Close() error {
|
func (s *stream) Close() error {
|
||||||
|
return s.proc.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
// teardown shuts down the stream. it is called by s.proc.Close()
|
||||||
|
// after all the children of this s.proc (i.e. transport's proc)
|
||||||
|
// are done.
|
||||||
|
func (s *stream) teardown() error {
|
||||||
|
// at this point, no streams are writing.
|
||||||
|
|
||||||
s.conn.removeStream(s)
|
s.conn.removeStream(s)
|
||||||
if r, ok := (s.Reader).(io.Closer); ok {
|
if r, ok := (s.Reader).(io.Closer); ok {
|
||||||
r.Close()
|
r.Close()
|
||||||
@ -30,3 +75,71 @@ func (s *stream) Close() error {
|
|||||||
func (s *stream) Conn() inet.Conn {
|
func (s *stream) Conn() inet.Conn {
|
||||||
return s.conn
|
return s.conn
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// transport will grab message arrival times, wait until that time, and
|
||||||
|
// then write the message out when it is scheduled to arrive
|
||||||
|
func (s *stream) transport(proc process.Process) {
|
||||||
|
bufsize := 256
|
||||||
|
buf := new(bytes.Buffer)
|
||||||
|
ticker := time.NewTicker(time.Millisecond * 4)
|
||||||
|
|
||||||
|
// writeBuf writes the contents of buf through to the s.Writer.
|
||||||
|
// done only when arrival time makes sense.
|
||||||
|
drainBuf := func() {
|
||||||
|
if buf.Len() > 0 {
|
||||||
|
_, err := s.Writer.Write(buf.Bytes())
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
buf.Reset()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// deliverOrWait is a helper func that processes
|
||||||
|
// an incoming packet. it waits until the arrival time,
|
||||||
|
// and then writes things out.
|
||||||
|
deliverOrWait := func(o *transportObject) {
|
||||||
|
buffered := len(o.msg) + buf.Len()
|
||||||
|
|
||||||
|
now := time.Now()
|
||||||
|
if now.Before(o.arrivalTime) {
|
||||||
|
if buffered < bufsize {
|
||||||
|
buf.Write(o.msg)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// we do not buffer + return here, instead hanging the
|
||||||
|
// call (i.e. not accepting any more transportObjects)
|
||||||
|
// so that we apply back-pressure to the sender.
|
||||||
|
// this sleep should wake up same time as ticker.
|
||||||
|
time.Sleep(o.arrivalTime.Sub(now))
|
||||||
|
}
|
||||||
|
|
||||||
|
// ok, we waited our due time. now rite the buf + msg.
|
||||||
|
|
||||||
|
// drainBuf first, before we write this message.
|
||||||
|
drainBuf()
|
||||||
|
|
||||||
|
// write this message.
|
||||||
|
_, err := s.Writer.Write(o.msg)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("mock_stream", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-proc.Closing():
|
||||||
|
return // bail out of here.
|
||||||
|
|
||||||
|
case o, ok := <-s.toDeliver:
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
deliverOrWait(o)
|
||||||
|
|
||||||
|
case <-ticker.C: // ok, due to write it out.
|
||||||
|
drainBuf()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -3,9 +3,11 @@ package mocknet
|
|||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"io"
|
"io"
|
||||||
|
"math"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
inet "github.com/ipfs/go-ipfs/p2p/net"
|
inet "github.com/ipfs/go-ipfs/p2p/net"
|
||||||
peer "github.com/ipfs/go-ipfs/p2p/peer"
|
peer "github.com/ipfs/go-ipfs/p2p/peer"
|
||||||
@ -478,3 +480,102 @@ func TestAdding(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestRateLimiting(t *testing.T) {
|
||||||
|
rl := NewRatelimiter(10)
|
||||||
|
|
||||||
|
if !within(rl.Limit(10), time.Duration(float32(time.Second)), time.Millisecond/10) {
|
||||||
|
t.Fail()
|
||||||
|
}
|
||||||
|
if !within(rl.Limit(10), time.Duration(float32(time.Second*2)), time.Millisecond) {
|
||||||
|
t.Fail()
|
||||||
|
}
|
||||||
|
if !within(rl.Limit(10), time.Duration(float32(time.Second*3)), time.Millisecond) {
|
||||||
|
t.Fail()
|
||||||
|
}
|
||||||
|
|
||||||
|
if within(rl.Limit(10), time.Duration(float32(time.Second*3)), time.Millisecond) {
|
||||||
|
t.Fail()
|
||||||
|
}
|
||||||
|
|
||||||
|
rl.UpdateBandwidth(50)
|
||||||
|
if !within(rl.Limit(75), time.Duration(float32(time.Second)*1.5), time.Millisecond/10) {
|
||||||
|
t.Fail()
|
||||||
|
}
|
||||||
|
|
||||||
|
if within(rl.Limit(75), time.Duration(float32(time.Second)*1.5), time.Millisecond/10) {
|
||||||
|
t.Fail()
|
||||||
|
}
|
||||||
|
|
||||||
|
rl.UpdateBandwidth(100)
|
||||||
|
if !within(rl.Limit(1), time.Duration(time.Millisecond*10), time.Millisecond/10) {
|
||||||
|
t.Fail()
|
||||||
|
}
|
||||||
|
|
||||||
|
if within(rl.Limit(1), time.Duration(time.Millisecond*10), time.Millisecond/10) {
|
||||||
|
t.Fail()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func within(t1 time.Duration, t2 time.Duration, tolerance time.Duration) bool {
|
||||||
|
return math.Abs(float64(t1)-float64(t2)) < float64(tolerance)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestLimitedStreams(t *testing.T) {
|
||||||
|
mn, err := FullMeshConnected(context.Background(), 2)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
messages := 4
|
||||||
|
messageSize := 500
|
||||||
|
handler := func(s inet.Stream) {
|
||||||
|
b := make([]byte, messageSize)
|
||||||
|
for i := 0; i < messages; i++ {
|
||||||
|
if _, err := io.ReadFull(s, b); err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
if !bytes.Equal(b[:4], []byte("ping")) {
|
||||||
|
log.Fatal("bytes mismatch")
|
||||||
|
}
|
||||||
|
wg.Done()
|
||||||
|
}
|
||||||
|
s.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
hosts := mn.Hosts()
|
||||||
|
for _, h := range mn.Hosts() {
|
||||||
|
h.SetStreamHandler(protocol.TestingID, handler)
|
||||||
|
}
|
||||||
|
|
||||||
|
peers := mn.Peers()
|
||||||
|
links := mn.LinksBetweenPeers(peers[0], peers[1])
|
||||||
|
// 1000 byte per second bandwidth
|
||||||
|
bps := float64(1000)
|
||||||
|
opts := links[0].Options()
|
||||||
|
opts.Bandwidth = bps
|
||||||
|
for _, link := range links {
|
||||||
|
link.SetOptions(opts)
|
||||||
|
}
|
||||||
|
|
||||||
|
s, err := hosts[0].NewStream(protocol.TestingID, hosts[1].ID())
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
filler := make([]byte, messageSize-4)
|
||||||
|
data := append([]byte("ping"), filler...)
|
||||||
|
before := time.Now()
|
||||||
|
for i := 0; i < messages; i++ {
|
||||||
|
wg.Add(1)
|
||||||
|
if _, err := s.Write(data); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
if !within(time.Since(before), time.Duration(time.Second*2), time.Second/3) {
|
||||||
|
t.Fatal("Expected 2ish seconds but got ", time.Since(before))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
69
p2p/net/mock/ratelimiter.go
Normal file
69
p2p/net/mock/ratelimiter.go
Normal file
@ -0,0 +1,69 @@
|
|||||||
|
package mocknet
|
||||||
|
|
||||||
|
import (
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// A ratelimiter is used by a link to determine how long to wait before sending
|
||||||
|
// data given a bandwidth cap.
|
||||||
|
type ratelimiter struct {
|
||||||
|
bandwidth float64 // bytes per nanosecond
|
||||||
|
allowance float64 // in bytes
|
||||||
|
maxAllowance float64 // in bytes
|
||||||
|
lastUpdate time.Time // when allowance was updated last
|
||||||
|
count int // number of times rate limiting was applied
|
||||||
|
duration time.Duration // total delay introduced due to rate limiting
|
||||||
|
}
|
||||||
|
|
||||||
|
// Creates a new ratelimiter with bandwidth (in bytes/sec)
|
||||||
|
func NewRatelimiter(bandwidth float64) *ratelimiter {
|
||||||
|
// convert bandwidth to bytes per nanosecond
|
||||||
|
b := bandwidth / float64(time.Second)
|
||||||
|
return &ratelimiter{
|
||||||
|
bandwidth: b,
|
||||||
|
allowance: 0,
|
||||||
|
maxAllowance: bandwidth,
|
||||||
|
lastUpdate: time.Now(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Changes bandwidth of a ratelimiter and resets its allowance
|
||||||
|
func (r *ratelimiter) UpdateBandwidth(bandwidth float64) {
|
||||||
|
// Convert bandwidth from bytes/second to bytes/nanosecond
|
||||||
|
b := bandwidth / float64(time.Second)
|
||||||
|
r.bandwidth = b
|
||||||
|
// Reset allowance
|
||||||
|
r.allowance = 0
|
||||||
|
r.maxAllowance = bandwidth
|
||||||
|
r.lastUpdate = time.Now()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Returns how long to wait before sending data with length 'dataSize' bytes
|
||||||
|
func (r *ratelimiter) Limit(dataSize int) time.Duration {
|
||||||
|
// update time
|
||||||
|
var duration time.Duration = time.Duration(0)
|
||||||
|
if r.bandwidth == 0 {
|
||||||
|
return duration
|
||||||
|
}
|
||||||
|
current := time.Now()
|
||||||
|
elapsedTime := current.Sub(r.lastUpdate)
|
||||||
|
r.lastUpdate = current
|
||||||
|
|
||||||
|
allowance := r.allowance + float64(elapsedTime)*r.bandwidth
|
||||||
|
// allowance can't exceed bandwidth
|
||||||
|
if allowance > r.maxAllowance {
|
||||||
|
allowance = r.maxAllowance
|
||||||
|
}
|
||||||
|
|
||||||
|
allowance -= float64(dataSize)
|
||||||
|
if allowance < 0 {
|
||||||
|
// sleep until allowance is back to 0
|
||||||
|
duration = time.Duration(-allowance / r.bandwidth)
|
||||||
|
// rate limiting was applied, record stats
|
||||||
|
r.count++
|
||||||
|
r.duration += duration
|
||||||
|
}
|
||||||
|
|
||||||
|
r.allowance = allowance
|
||||||
|
return duration
|
||||||
|
}
|
Reference in New Issue
Block a user