Create latency package for realistically simulating network latency (#1286)
This commit is contained in:
298
benchmark/latency/latency.go
Normal file
298
benchmark/latency/latency.go
Normal file
@ -0,0 +1,298 @@
|
|||||||
|
/*
|
||||||
|
*
|
||||||
|
* Copyright 2017 gRPC authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
|
||||||
|
// Package latency provides wrappers for net.Conn, net.Listener, and
|
||||||
|
// net.Dialers, designed to interoperate to inject real-world latency into
|
||||||
|
// network connections.
|
||||||
|
package latency
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"encoding/binary"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"net"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"golang.org/x/net/context"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Dialer is a function matching the signature of net.Dial.
|
||||||
|
type Dialer func(network, address string) (net.Conn, error)
|
||||||
|
|
||||||
|
// TimeoutDialer is a function matching the signature of net.DialTimeout.
|
||||||
|
type TimeoutDialer func(network, address string, timeout time.Duration) (net.Conn, error)
|
||||||
|
|
||||||
|
// ContextDialer is a function matching the signature of
|
||||||
|
// net.Dialer.DialContext.
|
||||||
|
type ContextDialer func(ctx context.Context, network, address string) (net.Conn, error)
|
||||||
|
|
||||||
|
// Network represents a network with the given bandwidth, latency, and MTU
|
||||||
|
// (Maximum Transmission Unit) configuration, and can produce wrappers of
|
||||||
|
// net.Listeners, net.Conn, and various forms of dialing functions. The
|
||||||
|
// Listeners and Dialers/Conns on both sides of connections must come from this
|
||||||
|
// package, but need not be created from the same Network. Latency is computed
|
||||||
|
// when sending (in Write), and is injected when receiving (in Read). This
|
||||||
|
// allows senders' Write calls to be non-blocking, as in real-world
|
||||||
|
// applications.
|
||||||
|
//
|
||||||
|
// Note: Latency is injected by the sender specifying the absolute time data
|
||||||
|
// should be available, and the reader delaying until that time arrives to
|
||||||
|
// provide the data. This package attempts to counter-act the effects of clock
|
||||||
|
// drift and existing network latency by measuring the delay between the
|
||||||
|
// sender's transmission time and the receiver's reception time during startup.
|
||||||
|
// No attempt is made to measure the existing bandwidth of the connection.
|
||||||
|
type Network struct {
|
||||||
|
Kbps int // Kilobits per second; if non-positive, infinite
|
||||||
|
Latency time.Duration // One-way latency (sending); if non-positive, no delay
|
||||||
|
MTU int // Bytes per packet; if non-positive, infinite
|
||||||
|
}
|
||||||
|
|
||||||
|
// Conn returns a net.Conn that wraps c and injects n's latency into that
|
||||||
|
// connection. This function also imposes latency for connection creation.
|
||||||
|
// If n's Latency is lower than the measured latency in c, an error is
|
||||||
|
// returned.
|
||||||
|
func (n *Network) Conn(c net.Conn) (net.Conn, error) {
|
||||||
|
start := now()
|
||||||
|
nc := &conn{Conn: c, network: n, readBuf: new(bytes.Buffer)}
|
||||||
|
if err := nc.sync(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
sleep(start.Add(nc.delay).Sub(now()))
|
||||||
|
return nc, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type conn struct {
|
||||||
|
net.Conn
|
||||||
|
network *Network
|
||||||
|
|
||||||
|
readBuf *bytes.Buffer // one packet worth of data received
|
||||||
|
lastSendEnd time.Time // time the previous Write should be fully on the wire
|
||||||
|
delay time.Duration // desired latency - measured latency
|
||||||
|
}
|
||||||
|
|
||||||
|
// header is sent before all data transmitted by the application.
|
||||||
|
type header struct {
|
||||||
|
ReadTime int64 // Time the reader is allowed to read this packet (UnixNano)
|
||||||
|
Sz int32 // Size of the data in the packet
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *conn) Write(p []byte) (n int, err error) {
|
||||||
|
tNow := now()
|
||||||
|
if c.lastSendEnd.Before(tNow) {
|
||||||
|
c.lastSendEnd = tNow
|
||||||
|
}
|
||||||
|
for len(p) > 0 {
|
||||||
|
pkt := p
|
||||||
|
if c.network.MTU > 0 && len(pkt) > c.network.MTU {
|
||||||
|
pkt = pkt[:c.network.MTU]
|
||||||
|
p = p[c.network.MTU:]
|
||||||
|
} else {
|
||||||
|
p = nil
|
||||||
|
}
|
||||||
|
c.lastSendEnd = c.lastSendEnd.Add(c.network.pktTime(len(pkt)))
|
||||||
|
hdr := header{ReadTime: c.lastSendEnd.Add(c.delay).UnixNano(), Sz: int32(len(pkt))}
|
||||||
|
if err := binary.Write(c.Conn, binary.BigEndian, hdr); err != nil {
|
||||||
|
return n, err
|
||||||
|
}
|
||||||
|
x, err := c.Conn.Write(pkt)
|
||||||
|
n += x
|
||||||
|
if err != nil {
|
||||||
|
return n, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return n, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *conn) Read(p []byte) (n int, err error) {
|
||||||
|
if c.readBuf.Len() == 0 {
|
||||||
|
var hdr header
|
||||||
|
if err := binary.Read(c.Conn, binary.BigEndian, &hdr); err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
defer func() { sleep(time.Unix(0, hdr.ReadTime).Sub(now())) }()
|
||||||
|
|
||||||
|
if _, err := io.CopyN(c.readBuf, c.Conn, int64(hdr.Sz)); err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Read from readBuf.
|
||||||
|
return c.readBuf.Read(p)
|
||||||
|
}
|
||||||
|
|
||||||
|
// sync does a handshake and then measures the latency on the network in
|
||||||
|
// coordination with the other side.
|
||||||
|
func (c *conn) sync() error {
|
||||||
|
const (
|
||||||
|
pingMsg = "syncPing"
|
||||||
|
warmup = 10 // minimum number of iterations to measure latency
|
||||||
|
giveUp = 50 // maximum number of iterations to measure latency
|
||||||
|
accuracy = time.Millisecond // req'd accuracy to stop early
|
||||||
|
goodRun = 3 // stop early if latency within accuracy this many times
|
||||||
|
)
|
||||||
|
|
||||||
|
type syncMsg struct {
|
||||||
|
SendT int64 // Time sent. If zero, stop.
|
||||||
|
RecvT int64 // Time received. If zero, fill in and respond.
|
||||||
|
}
|
||||||
|
|
||||||
|
// A trivial handshake
|
||||||
|
if err := binary.Write(c.Conn, binary.BigEndian, []byte(pingMsg)); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
var ping [8]byte
|
||||||
|
if err := binary.Read(c.Conn, binary.BigEndian, &ping); err != nil {
|
||||||
|
return err
|
||||||
|
} else if string(ping[:]) != pingMsg {
|
||||||
|
return fmt.Errorf("malformed handshake message: %v (want %q)", ping, pingMsg)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Both sides are alive and syncing. Calculate network delay / clock skew.
|
||||||
|
att := 0
|
||||||
|
good := 0
|
||||||
|
var latency time.Duration
|
||||||
|
localDone, remoteDone := false, false
|
||||||
|
send := true
|
||||||
|
for !localDone || !remoteDone {
|
||||||
|
if send {
|
||||||
|
if err := binary.Write(c.Conn, binary.BigEndian, syncMsg{SendT: now().UnixNano()}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
att++
|
||||||
|
send = false
|
||||||
|
}
|
||||||
|
|
||||||
|
// Block until we get a syncMsg
|
||||||
|
m := syncMsg{}
|
||||||
|
if err := binary.Read(c.Conn, binary.BigEndian, &m); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if m.RecvT == 0 {
|
||||||
|
// Message initiated from other side.
|
||||||
|
if m.SendT == 0 {
|
||||||
|
remoteDone = true
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// Send response.
|
||||||
|
m.RecvT = now().UnixNano()
|
||||||
|
if err := binary.Write(c.Conn, binary.BigEndian, m); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
lag := time.Duration(m.RecvT - m.SendT)
|
||||||
|
latency += lag
|
||||||
|
avgLatency := latency / time.Duration(att)
|
||||||
|
if e := lag - avgLatency; e > -accuracy && e < accuracy {
|
||||||
|
good++
|
||||||
|
} else {
|
||||||
|
good = 0
|
||||||
|
}
|
||||||
|
if att < giveUp && (att < warmup || good < goodRun) {
|
||||||
|
send = true
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
localDone = true
|
||||||
|
latency = avgLatency
|
||||||
|
// Tell the other side we're done.
|
||||||
|
if err := binary.Write(c.Conn, binary.BigEndian, syncMsg{}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if c.network.Latency <= 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
c.delay = c.network.Latency - latency
|
||||||
|
if c.delay < 0 {
|
||||||
|
return fmt.Errorf("measured network latency (%v) higher than desired latency (%v)", latency, c.network.Latency)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Listener returns a net.Listener that wraps l and injects n's latency in its
|
||||||
|
// connections.
|
||||||
|
func (n *Network) Listener(l net.Listener) net.Listener {
|
||||||
|
return &listener{Listener: l, network: n}
|
||||||
|
}
|
||||||
|
|
||||||
|
type listener struct {
|
||||||
|
net.Listener
|
||||||
|
network *Network
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *listener) Accept() (net.Conn, error) {
|
||||||
|
c, err := l.Listener.Accept()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return l.network.Conn(c)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Dialer returns a Dialer that wraps d and injects n's latency in its
|
||||||
|
// connections. n's Latency is also injected to the connection's creation.
|
||||||
|
func (n *Network) Dialer(d Dialer) Dialer {
|
||||||
|
return func(network, address string) (net.Conn, error) {
|
||||||
|
conn, err := d(network, address)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return n.Conn(conn)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TimeoutDialer returns a TimeoutDialer that wraps d and injects n's latency
|
||||||
|
// in its connections. n's Latency is also injected to the connection's
|
||||||
|
// creation.
|
||||||
|
func (n *Network) TimeoutDialer(d TimeoutDialer) TimeoutDialer {
|
||||||
|
return func(network, address string, timeout time.Duration) (net.Conn, error) {
|
||||||
|
conn, err := d(network, address, timeout)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return n.Conn(conn)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ContextDialer returns a ContextDialer that wraps d and injects n's latency
|
||||||
|
// in its connections. n's Latency is also injected to the connection's
|
||||||
|
// creation.
|
||||||
|
func (n *Network) ContextDialer(d ContextDialer) ContextDialer {
|
||||||
|
return func(ctx context.Context, network, address string) (net.Conn, error) {
|
||||||
|
conn, err := d(ctx, network, address)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return n.Conn(conn)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// pktTime returns the time it takes to transmit one packet of data of size b
|
||||||
|
// in bytes.
|
||||||
|
func (n *Network) pktTime(b int) time.Duration {
|
||||||
|
if n.Kbps <= 0 {
|
||||||
|
return time.Duration(0)
|
||||||
|
}
|
||||||
|
return time.Duration(b) * time.Second / time.Duration(n.Kbps*(1024/8))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wrappers for testing
|
||||||
|
|
||||||
|
var now = time.Now
|
||||||
|
var sleep = time.Sleep
|
287
benchmark/latency/latency_test.go
Normal file
287
benchmark/latency/latency_test.go
Normal file
@ -0,0 +1,287 @@
|
|||||||
|
/*
|
||||||
|
*
|
||||||
|
* Copyright 2017 gRPC authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
package latency
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"net"
|
||||||
|
"reflect"
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// bufConn is a net.Conn implemented by a bytes.Buffer (which is a ReadWriter).
|
||||||
|
type bufConn struct {
|
||||||
|
*bytes.Buffer
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bufConn) Close() error { panic("unimplemented") }
|
||||||
|
func (bufConn) LocalAddr() net.Addr { panic("unimplemented") }
|
||||||
|
func (bufConn) RemoteAddr() net.Addr { panic("unimplemented") }
|
||||||
|
func (bufConn) SetDeadline(t time.Time) error { panic("unimplemneted") }
|
||||||
|
func (bufConn) SetReadDeadline(t time.Time) error { panic("unimplemneted") }
|
||||||
|
func (bufConn) SetWriteDeadline(t time.Time) error { panic("unimplemneted") }
|
||||||
|
|
||||||
|
func restoreHooks() func() {
|
||||||
|
s := sleep
|
||||||
|
n := now
|
||||||
|
return func() {
|
||||||
|
sleep = s
|
||||||
|
now = n
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestConn(t *testing.T) {
|
||||||
|
defer restoreHooks()()
|
||||||
|
|
||||||
|
// Constant time.
|
||||||
|
now = func() time.Time { return time.Unix(123, 456) }
|
||||||
|
|
||||||
|
// Capture sleep times for checking later.
|
||||||
|
var sleepTimes []time.Duration
|
||||||
|
sleep = func(t time.Duration) { sleepTimes = append(sleepTimes, t) }
|
||||||
|
|
||||||
|
wantSleeps := func(want ...time.Duration) {
|
||||||
|
if !reflect.DeepEqual(want, sleepTimes) {
|
||||||
|
t.Fatalf("sleepTimes = %v; want %v", sleepTimes, want)
|
||||||
|
}
|
||||||
|
sleepTimes = nil
|
||||||
|
}
|
||||||
|
|
||||||
|
latency := 10 * time.Millisecond
|
||||||
|
c, err := (&Network{Kbps: 1, Latency: latency, MTU: 5}).Conn(bufConn{&bytes.Buffer{}})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Unexpected error creating connection: %v", err)
|
||||||
|
}
|
||||||
|
wantSleeps(latency) // Connection creation delay.
|
||||||
|
|
||||||
|
// 1 kbps = 128 Bps. Divides evenly by 1 second using nanos.
|
||||||
|
byteLatency := time.Duration(time.Second / 128)
|
||||||
|
|
||||||
|
write := func(b []byte) {
|
||||||
|
n, err := c.Write(b)
|
||||||
|
if n != len(b) || err != nil {
|
||||||
|
t.Fatalf("c.Write(%v) = %v, %v; want %v, nil", b, n, err, len(b))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
write([]byte{1, 2, 3, 4, 5}) // One full packet
|
||||||
|
pkt1Time := latency + byteLatency*5
|
||||||
|
write([]byte{6}) // One partial packet
|
||||||
|
pkt2Time := pkt1Time + byteLatency
|
||||||
|
write([]byte{7, 8, 9, 10, 11, 12, 13}) // Two packets
|
||||||
|
pkt3Time := pkt2Time + byteLatency*5
|
||||||
|
pkt4Time := pkt3Time + byteLatency*2
|
||||||
|
|
||||||
|
// No reads, so no sleeps yet.
|
||||||
|
wantSleeps()
|
||||||
|
|
||||||
|
read := func(n int, want []byte) {
|
||||||
|
b := make([]byte, n)
|
||||||
|
if rd, err := c.Read(b); err != nil || rd != len(want) {
|
||||||
|
t.Fatalf("c.Read(<%v bytes>) = %v, %v; want %v, nil", n, rd, err, len(want))
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(b[:len(want)], want) {
|
||||||
|
t.Fatalf("read %v; want %v", b, want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
read(1, []byte{1})
|
||||||
|
wantSleeps(pkt1Time)
|
||||||
|
read(1, []byte{2})
|
||||||
|
wantSleeps()
|
||||||
|
read(3, []byte{3, 4, 5})
|
||||||
|
wantSleeps()
|
||||||
|
read(2, []byte{6})
|
||||||
|
wantSleeps(pkt2Time)
|
||||||
|
read(2, []byte{7, 8})
|
||||||
|
wantSleeps(pkt3Time)
|
||||||
|
read(10, []byte{9, 10, 11})
|
||||||
|
wantSleeps()
|
||||||
|
read(10, []byte{12, 13})
|
||||||
|
wantSleeps(pkt4Time)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSync(t *testing.T) {
|
||||||
|
defer restoreHooks()()
|
||||||
|
|
||||||
|
// Infinitely fast CPU: time doesn't pass unless sleep is called.
|
||||||
|
tn := time.Unix(123, 0)
|
||||||
|
now = func() time.Time { return tn }
|
||||||
|
sleep = func(d time.Duration) { tn = tn.Add(d) }
|
||||||
|
|
||||||
|
// Simulate a 20ms latency network, then run sync across that and expect to
|
||||||
|
// measure 20ms latency, or 10ms additional delay for a 30ms network.
|
||||||
|
slowConn, err := (&Network{Kbps: 0, Latency: 20 * time.Millisecond, MTU: 5}).Conn(bufConn{&bytes.Buffer{}})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Unexpected error creating connection: %v", err)
|
||||||
|
}
|
||||||
|
c, err := (&Network{Latency: 30 * time.Millisecond}).Conn(slowConn)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Unexpected error creating connection: %v", err)
|
||||||
|
}
|
||||||
|
if c.(*conn).delay != 10*time.Millisecond {
|
||||||
|
t.Fatalf("c.delay = %v; want 10ms", c.(*conn).delay)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSyncTooSlow(t *testing.T) {
|
||||||
|
defer restoreHooks()()
|
||||||
|
|
||||||
|
// Infinitely fast CPU: time doesn't pass unless sleep is called.
|
||||||
|
tn := time.Unix(123, 0)
|
||||||
|
now = func() time.Time { return tn }
|
||||||
|
sleep = func(d time.Duration) { tn = tn.Add(d) }
|
||||||
|
|
||||||
|
// Simulate a 10ms latency network, then attempt to simulate a 5ms latency
|
||||||
|
// network and expect an error.
|
||||||
|
slowConn, err := (&Network{Kbps: 0, Latency: 10 * time.Millisecond, MTU: 5}).Conn(bufConn{&bytes.Buffer{}})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Unexpected error creating connection: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
errWant := "measured network latency (10ms) higher than desired latency (5ms)"
|
||||||
|
if _, err := (&Network{Latency: 5 * time.Millisecond}).Conn(slowConn); err == nil || err.Error() != errWant {
|
||||||
|
t.Fatalf("Conn() = _, %q; want _, %q", err, errWant)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestListenerAndDialer(t *testing.T) {
|
||||||
|
defer restoreHooks()()
|
||||||
|
|
||||||
|
tn := time.Unix(123, 0)
|
||||||
|
startTime := tn
|
||||||
|
mu := &sync.Mutex{}
|
||||||
|
now = func() time.Time {
|
||||||
|
mu.Lock()
|
||||||
|
defer mu.Unlock()
|
||||||
|
return tn
|
||||||
|
}
|
||||||
|
|
||||||
|
n := &Network{Kbps: 2, Latency: 20 * time.Millisecond, MTU: 10}
|
||||||
|
// 2 kbps = .25 kBps = 256 Bps
|
||||||
|
byteLatency := func(n int) time.Duration {
|
||||||
|
return time.Duration(n) * time.Second / 256
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create a real listener and wrap it.
|
||||||
|
l, err := net.Listen("tcp", ":0")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Unexpected error creating listener: %v", err)
|
||||||
|
}
|
||||||
|
defer l.Close()
|
||||||
|
l = n.Listener(l)
|
||||||
|
|
||||||
|
var serverConn net.Conn
|
||||||
|
var scErr error
|
||||||
|
scDone := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
serverConn, scErr = l.Accept()
|
||||||
|
close(scDone)
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Create a dialer and use it.
|
||||||
|
clientConn, err := n.TimeoutDialer(net.DialTimeout)("tcp", l.Addr().String(), 2*time.Second)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Unexpected error dialing: %v", err)
|
||||||
|
}
|
||||||
|
defer clientConn.Close()
|
||||||
|
|
||||||
|
// Block until server's Conn is available.
|
||||||
|
<-scDone
|
||||||
|
if scErr != nil {
|
||||||
|
t.Fatalf("Unexpected error listening: %v", scErr)
|
||||||
|
}
|
||||||
|
defer serverConn.Close()
|
||||||
|
|
||||||
|
// sleep (only) advances tn. Done after connections established so sync detects zero delay.
|
||||||
|
sleep = func(d time.Duration) {
|
||||||
|
mu.Lock()
|
||||||
|
defer mu.Unlock()
|
||||||
|
if d > 0 {
|
||||||
|
tn = tn.Add(d)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
seq := func(a, b int) []byte {
|
||||||
|
buf := make([]byte, b-a)
|
||||||
|
for i := 0; i < b-a; i++ {
|
||||||
|
buf[i] = byte(i + a)
|
||||||
|
}
|
||||||
|
return buf
|
||||||
|
}
|
||||||
|
|
||||||
|
pkt1 := seq(0, 10)
|
||||||
|
pkt2 := seq(10, 30)
|
||||||
|
pkt3 := seq(30, 35)
|
||||||
|
|
||||||
|
write := func(c net.Conn, b []byte) {
|
||||||
|
n, err := c.Write(b)
|
||||||
|
if n != len(b) || err != nil {
|
||||||
|
t.Fatalf("c.Write(%v) = %v, %v; want %v, nil", b, n, err, len(b))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
write(serverConn, pkt1)
|
||||||
|
write(serverConn, pkt2)
|
||||||
|
write(serverConn, pkt3)
|
||||||
|
write(clientConn, pkt3)
|
||||||
|
write(clientConn, pkt1)
|
||||||
|
write(clientConn, pkt2)
|
||||||
|
|
||||||
|
if tn != startTime {
|
||||||
|
t.Fatalf("unexpected sleep in write; tn = %v; want %v", tn, startTime)
|
||||||
|
}
|
||||||
|
|
||||||
|
read := func(c net.Conn, n int, want []byte, timeWant time.Time) {
|
||||||
|
b := make([]byte, n)
|
||||||
|
if rd, err := c.Read(b); err != nil || rd != len(want) {
|
||||||
|
t.Fatalf("c.Read(<%v bytes>) = %v, %v; want %v, nil (read: %v)", n, rd, err, len(want), b[:rd])
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(b[:len(want)], want) {
|
||||||
|
t.Fatalf("read %v; want %v", b, want)
|
||||||
|
}
|
||||||
|
if !tn.Equal(timeWant) {
|
||||||
|
t.Errorf("tn after read(%v) = %v; want %v", want, tn, timeWant)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
read(clientConn, len(pkt1)+1, pkt1, startTime.Add(n.Latency+byteLatency(len(pkt1))))
|
||||||
|
read(serverConn, len(pkt3)+1, pkt3, tn) // tn was advanced by the above read; pkt3 is shorter than pkt1
|
||||||
|
|
||||||
|
read(clientConn, len(pkt2), pkt2[:10], startTime.Add(n.Latency+byteLatency(len(pkt1)+10)))
|
||||||
|
read(clientConn, len(pkt2), pkt2[10:], startTime.Add(n.Latency+byteLatency(len(pkt1)+len(pkt2))))
|
||||||
|
read(clientConn, len(pkt3), pkt3, startTime.Add(n.Latency+byteLatency(len(pkt1)+len(pkt2)+len(pkt3))))
|
||||||
|
|
||||||
|
read(serverConn, len(pkt1), pkt1, tn) // tn already past the arrival time due to prior reads
|
||||||
|
read(serverConn, len(pkt2), pkt2[:10], tn)
|
||||||
|
read(serverConn, len(pkt2), pkt2[10:], tn)
|
||||||
|
|
||||||
|
// Sleep awhile and make sure the read happens disregarding previous writes
|
||||||
|
// (lastSendEnd handling).
|
||||||
|
sleep(10 * time.Second)
|
||||||
|
write(clientConn, pkt1)
|
||||||
|
read(serverConn, len(pkt1), pkt1, tn.Add(n.Latency+byteLatency(len(pkt1))))
|
||||||
|
|
||||||
|
// Send, sleep longer than the network delay, then make sure the read happens
|
||||||
|
// instantly.
|
||||||
|
write(serverConn, pkt1)
|
||||||
|
sleep(10 * time.Second)
|
||||||
|
read(clientConn, len(pkt1), pkt1, tn)
|
||||||
|
}
|
Reference in New Issue
Block a user