diff --git a/exchange/bitswap/testutils.go b/exchange/bitswap/testutils.go
index 91fdece7f..3dad2afed 100644
--- a/exchange/bitswap/testutils.go
+++ b/exchange/bitswap/testutils.go
@@ -46,7 +46,7 @@ func (g *SessionGenerator) Next() Instance {
 	if err != nil {
 		panic("FIXME") // TODO change signature
 	}
-	return session(g.ctx, g.net, p)
+	return Session(g.ctx, g.net, p)
 }
 
 func (g *SessionGenerator) Instances(n int) []Instance {
@@ -85,7 +85,7 @@ func (i *Instance) SetBlockstoreLatency(t time.Duration) time.Duration {
 // NB: It's easy make mistakes by providing the same peer ID to two different
 // sessions. To safeguard, use the SessionGenerator to generate sessions. It's
 // just a much better idea.
-func session(ctx context.Context, net tn.Network, p testutil.Identity) Instance {
+func Session(ctx context.Context, net tn.Network, p testutil.Identity) Instance {
 	bsdelay := delay.Fixed(0)
 	const writeCacheElems = 100
 
diff --git a/p2p/net/mock/interface.go b/p2p/net/mock/interface.go
index 9887269b6..28687d70b 100644
--- a/p2p/net/mock/interface.go
+++ b/p2p/net/mock/interface.go
@@ -7,13 +7,12 @@
 package mocknet
 
 import (
-	"io"
-	"time"
-
 	ic "github.com/ipfs/go-ipfs/p2p/crypto"
 	host "github.com/ipfs/go-ipfs/p2p/host"
 	inet "github.com/ipfs/go-ipfs/p2p/net"
 	peer "github.com/ipfs/go-ipfs/p2p/peer"
+	"io"
+	"time"
 
 	ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
 )
@@ -59,13 +58,14 @@ type Mocknet interface {
 	ConnectNets(inet.Network, inet.Network) (inet.Conn, error)
 	DisconnectPeers(peer.ID, peer.ID) error
 	DisconnectNets(inet.Network, inet.Network) error
+	LinkAll() error
 }
 
 // LinkOptions are used to change aspects of the links.
 // Sorry but they dont work yet :(
 type LinkOptions struct {
 	Latency   time.Duration
-	Bandwidth int // in bytes-per-second
+	Bandwidth float64 // in bytes-per-second
 	// we can make these values distributions down the road.
 }
 
diff --git a/p2p/net/mock/mock_link.go b/p2p/net/mock/mock_link.go
index 618b90706..28575e758 100644
--- a/p2p/net/mock/mock_link.go
+++ b/p2p/net/mock/mock_link.go
@@ -1,8 +1,10 @@
 package mocknet
 
 import (
+	//	"fmt"
 	"io"
 	"sync"
+	"time"
 
 	inet "github.com/ipfs/go-ipfs/p2p/net"
 	peer "github.com/ipfs/go-ipfs/p2p/peer"
@@ -11,17 +13,20 @@ import (
 // link implements mocknet.Link
 // and, for simplicity, inet.Conn
 type link struct {
-	mock *mocknet
-	nets []*peernet
-	opts LinkOptions
-
+	mock        *mocknet
+	nets        []*peernet
+	opts        LinkOptions
+	ratelimiter *ratelimiter
 	// this could have addresses on both sides.
 
 	sync.RWMutex
 }
 
 func newLink(mn *mocknet, opts LinkOptions) *link {
-	return &link{mock: mn, opts: opts}
+	l := &link{mock: mn,
+		opts:        opts,
+		ratelimiter: NewRatelimiter(opts.Bandwidth)}
+	return l
 }
 
 func (l *link) newConnPair(dialer *peernet) (*conn, *conn) {
@@ -57,8 +62,8 @@ func (l *link) newStreamPair() (*stream, *stream) {
 	r1, w1 := io.Pipe()
 	r2, w2 := io.Pipe()
 
-	s1 := &stream{Reader: r1, Writer: w2}
-	s2 := &stream{Reader: r2, Writer: w1}
+	s1 := NewStream(w2, r1)
+	s2 := NewStream(w1, r2)
 	return s1, s2
 }
 
@@ -86,8 +91,17 @@ func (l *link) Peers() []peer.ID {
 
 func (l *link) SetOptions(o LinkOptions) {
 	l.opts = o
+	l.ratelimiter.UpdateBandwidth(l.opts.Bandwidth)
 }
 
 func (l *link) Options() LinkOptions {
 	return l.opts
 }
+
+func (l *link) GetLatency() time.Duration {
+	return l.opts.Latency
+}
+
+func (l *link) RateLimit(dataSize int) time.Duration {
+	return l.ratelimiter.Limit(dataSize)
+}
diff --git a/p2p/net/mock/mock_notif_test.go b/p2p/net/mock/mock_notif_test.go
index d91403f8b..1ea9df424 100644
--- a/p2p/net/mock/mock_notif_test.go
+++ b/p2p/net/mock/mock_notif_test.go
@@ -63,7 +63,7 @@ func TestNotifications(t *testing.T) {
 					}
 				}
 				if !found {
-					t.Error("connection not found")
+					t.Error("connection not found", c1, len(expect), len(actual))
 				}
 			}
 
diff --git a/p2p/net/mock/mock_stream.go b/p2p/net/mock/mock_stream.go
index 1820553c1..56e3031ef 100644
--- a/p2p/net/mock/mock_stream.go
+++ b/p2p/net/mock/mock_stream.go
@@ -1,7 +1,11 @@
 package mocknet
 
 import (
+	"bytes"
 	"io"
+	"time"
+
+	process "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
 
 	inet "github.com/ipfs/go-ipfs/p2p/net"
 )
@@ -10,10 +14,51 @@ import (
 type stream struct {
 	io.Reader
 	io.Writer
-	conn *conn
+	conn      *conn
+	toDeliver chan *transportObject
+	proc      process.Process
+}
+
+type transportObject struct {
+	msg         []byte
+	arrivalTime time.Time
+}
+
+func NewStream(w io.Writer, r io.Reader) *stream {
+	s := &stream{
+		Reader:    r,
+		Writer:    w,
+		toDeliver: make(chan *transportObject),
+	}
+
+	s.proc = process.WithTeardown(s.teardown)
+	s.proc.Go(s.transport)
+	return s
+}
+
+//  How to handle errors with writes?
+func (s *stream) Write(p []byte) (n int, err error) {
+	l := s.conn.link
+	delay := l.GetLatency() + l.RateLimit(len(p))
+	t := time.Now().Add(delay)
+	select {
+	case <-s.proc.Closing(): // bail out if we're closing.
+		return 0, io.ErrClosedPipe
+	case s.toDeliver <- &transportObject{msg: p, arrivalTime: t}:
+	}
+	return len(p), nil
 }
 
 func (s *stream) Close() error {
+	return s.proc.Close()
+}
+
+// teardown shuts down the stream. it is called by s.proc.Close()
+// after all the children of this s.proc (i.e. transport's proc)
+// are done.
+func (s *stream) teardown() error {
+	// at this point, no streams are writing.
+
 	s.conn.removeStream(s)
 	if r, ok := (s.Reader).(io.Closer); ok {
 		r.Close()
@@ -30,3 +75,71 @@ func (s *stream) Close() error {
 func (s *stream) Conn() inet.Conn {
 	return s.conn
 }
+
+// transport will grab message arrival times, wait until that time, and
+// then write the message out when it is scheduled to arrive
+func (s *stream) transport(proc process.Process) {
+	bufsize := 256
+	buf := new(bytes.Buffer)
+	ticker := time.NewTicker(time.Millisecond * 4)
+
+	// writeBuf writes the contents of buf through to the s.Writer.
+	// done only when arrival time makes sense.
+	drainBuf := func() {
+		if buf.Len() > 0 {
+			_, err := s.Writer.Write(buf.Bytes())
+			if err != nil {
+				return
+			}
+			buf.Reset()
+		}
+	}
+
+	// deliverOrWait is a helper func that processes
+	// an incoming packet. it waits until the arrival time,
+	// and then writes things out.
+	deliverOrWait := func(o *transportObject) {
+		buffered := len(o.msg) + buf.Len()
+
+		now := time.Now()
+		if now.Before(o.arrivalTime) {
+			if buffered < bufsize {
+				buf.Write(o.msg)
+				return
+			}
+
+			// we do not buffer + return here, instead hanging the
+			// call (i.e. not accepting any more transportObjects)
+			// so that we apply back-pressure to the sender.
+			// this sleep should wake up same time as ticker.
+			time.Sleep(o.arrivalTime.Sub(now))
+		}
+
+		// ok, we waited our due time. now rite the buf + msg.
+
+		// drainBuf first, before we write this message.
+		drainBuf()
+
+		// write this message.
+		_, err := s.Writer.Write(o.msg)
+		if err != nil {
+			log.Error("mock_stream", err)
+		}
+	}
+
+	for {
+		select {
+		case <-proc.Closing():
+			return // bail out of here.
+
+		case o, ok := <-s.toDeliver:
+			if !ok {
+				return
+			}
+			deliverOrWait(o)
+
+		case <-ticker.C: // ok, due to write it out.
+			drainBuf()
+		}
+	}
+}
diff --git a/p2p/net/mock/mock_test.go b/p2p/net/mock/mock_test.go
index d402fe045..360886828 100644
--- a/p2p/net/mock/mock_test.go
+++ b/p2p/net/mock/mock_test.go
@@ -3,9 +3,11 @@ package mocknet
 import (
 	"bytes"
 	"io"
+	"math"
 	"math/rand"
 	"sync"
 	"testing"
+	"time"
 
 	inet "github.com/ipfs/go-ipfs/p2p/net"
 	peer "github.com/ipfs/go-ipfs/p2p/peer"
@@ -478,3 +480,102 @@ func TestAdding(t *testing.T) {
 	}
 
 }
+
+func TestRateLimiting(t *testing.T) {
+	rl := NewRatelimiter(10)
+
+	if !within(rl.Limit(10), time.Duration(float32(time.Second)), time.Millisecond/10) {
+		t.Fail()
+	}
+	if !within(rl.Limit(10), time.Duration(float32(time.Second*2)), time.Millisecond) {
+		t.Fail()
+	}
+	if !within(rl.Limit(10), time.Duration(float32(time.Second*3)), time.Millisecond) {
+		t.Fail()
+	}
+
+	if within(rl.Limit(10), time.Duration(float32(time.Second*3)), time.Millisecond) {
+		t.Fail()
+	}
+
+	rl.UpdateBandwidth(50)
+	if !within(rl.Limit(75), time.Duration(float32(time.Second)*1.5), time.Millisecond/10) {
+		t.Fail()
+	}
+
+	if within(rl.Limit(75), time.Duration(float32(time.Second)*1.5), time.Millisecond/10) {
+		t.Fail()
+	}
+
+	rl.UpdateBandwidth(100)
+	if !within(rl.Limit(1), time.Duration(time.Millisecond*10), time.Millisecond/10) {
+		t.Fail()
+	}
+
+	if within(rl.Limit(1), time.Duration(time.Millisecond*10), time.Millisecond/10) {
+		t.Fail()
+	}
+}
+
+func within(t1 time.Duration, t2 time.Duration, tolerance time.Duration) bool {
+	return math.Abs(float64(t1)-float64(t2)) < float64(tolerance)
+}
+
+func TestLimitedStreams(t *testing.T) {
+	mn, err := FullMeshConnected(context.Background(), 2)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	var wg sync.WaitGroup
+	messages := 4
+	messageSize := 500
+	handler := func(s inet.Stream) {
+		b := make([]byte, messageSize)
+		for i := 0; i < messages; i++ {
+			if _, err := io.ReadFull(s, b); err != nil {
+				log.Fatal(err)
+			}
+			if !bytes.Equal(b[:4], []byte("ping")) {
+				log.Fatal("bytes mismatch")
+			}
+			wg.Done()
+		}
+		s.Close()
+	}
+
+	hosts := mn.Hosts()
+	for _, h := range mn.Hosts() {
+		h.SetStreamHandler(protocol.TestingID, handler)
+	}
+
+	peers := mn.Peers()
+	links := mn.LinksBetweenPeers(peers[0], peers[1])
+	//  1000 byte per second bandwidth
+	bps := float64(1000)
+	opts := links[0].Options()
+	opts.Bandwidth = bps
+	for _, link := range links {
+		link.SetOptions(opts)
+	}
+
+	s, err := hosts[0].NewStream(protocol.TestingID, hosts[1].ID())
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	filler := make([]byte, messageSize-4)
+	data := append([]byte("ping"), filler...)
+	before := time.Now()
+	for i := 0; i < messages; i++ {
+		wg.Add(1)
+		if _, err := s.Write(data); err != nil {
+			panic(err)
+		}
+	}
+
+	wg.Wait()
+	if !within(time.Since(before), time.Duration(time.Second*2), time.Second/3) {
+		t.Fatal("Expected 2ish seconds but got ", time.Since(before))
+	}
+}
diff --git a/p2p/net/mock/ratelimiter.go b/p2p/net/mock/ratelimiter.go
new file mode 100644
index 000000000..65eb7b6a2
--- /dev/null
+++ b/p2p/net/mock/ratelimiter.go
@@ -0,0 +1,69 @@
+package mocknet
+
+import (
+	"time"
+)
+
+//  A ratelimiter is used by a link to determine how long to wait before sending
+//  data given a bandwidth cap.
+type ratelimiter struct {
+	bandwidth    float64       // bytes per nanosecond
+	allowance    float64       // in bytes
+	maxAllowance float64       // in bytes
+	lastUpdate   time.Time     // when allowance was updated last
+	count        int           // number of times rate limiting was applied
+	duration     time.Duration // total delay introduced due to rate limiting
+}
+
+//  Creates a new ratelimiter with bandwidth (in bytes/sec)
+func NewRatelimiter(bandwidth float64) *ratelimiter {
+	//  convert bandwidth to bytes per nanosecond
+	b := bandwidth / float64(time.Second)
+	return &ratelimiter{
+		bandwidth:    b,
+		allowance:    0,
+		maxAllowance: bandwidth,
+		lastUpdate:   time.Now(),
+	}
+}
+
+//  Changes bandwidth of a ratelimiter and resets its allowance
+func (r *ratelimiter) UpdateBandwidth(bandwidth float64) {
+	//  Convert bandwidth from bytes/second to bytes/nanosecond
+	b := bandwidth / float64(time.Second)
+	r.bandwidth = b
+	//  Reset allowance
+	r.allowance = 0
+	r.maxAllowance = bandwidth
+	r.lastUpdate = time.Now()
+}
+
+//  Returns how long to wait before sending data with length 'dataSize' bytes
+func (r *ratelimiter) Limit(dataSize int) time.Duration {
+	//  update time
+	var duration time.Duration = time.Duration(0)
+	if r.bandwidth == 0 {
+		return duration
+	}
+	current := time.Now()
+	elapsedTime := current.Sub(r.lastUpdate)
+	r.lastUpdate = current
+
+	allowance := r.allowance + float64(elapsedTime)*r.bandwidth
+	//  allowance can't exceed bandwidth
+	if allowance > r.maxAllowance {
+		allowance = r.maxAllowance
+	}
+
+	allowance -= float64(dataSize)
+	if allowance < 0 {
+		//  sleep until allowance is back to 0
+		duration = time.Duration(-allowance / r.bandwidth)
+		//  rate limiting was applied, record stats
+		r.count++
+		r.duration += duration
+	}
+
+	r.allowance = allowance
+	return duration
+}