From 4309d15edb29b0f524a0d58bc1335fb73c66116d Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Mon, 29 Dec 2014 07:40:11 -0800 Subject: [PATCH] updated msgio (varints) --- Godeps/Godeps.json | 2 +- .../github.com/jbenet/go-msgio/msgio_test.go | 28 ++- .../src/github.com/jbenet/go-msgio/varint.go | 182 ++++++++++++++++++ .../github.com/jbenet/go-msgio/varint_test.go | 66 +++++++ 4 files changed, 269 insertions(+), 9 deletions(-) create mode 100644 Godeps/_workspace/src/github.com/jbenet/go-msgio/varint.go create mode 100644 Godeps/_workspace/src/github.com/jbenet/go-msgio/varint_test.go diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index b473d6b72..61c05dd9a 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -118,7 +118,7 @@ }, { "ImportPath": "github.com/jbenet/go-msgio", - "Rev": "5e7289d3a0cd046a5bee30b187cc844c31f54dce" + "Rev": "dbae89193876910c736b2ce1291fa8bbcf299d77" }, { "ImportPath": "github.com/jbenet/go-multiaddr", diff --git a/Godeps/_workspace/src/github.com/jbenet/go-msgio/msgio_test.go b/Godeps/_workspace/src/github.com/jbenet/go-msgio/msgio_test.go index cef3e9cd0..05eca7b24 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-msgio/msgio_test.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-msgio/msgio_test.go @@ -15,6 +15,24 @@ func TestReadWrite(t *testing.T) { buf := bytes.NewBuffer(nil) writer := NewWriter(buf) reader := NewReader(buf) + SubtestReadWrite(t, writer, reader) +} + +func TestReadWriteMsg(t *testing.T) { + buf := bytes.NewBuffer(nil) + writer := NewWriter(buf) + reader := NewReader(buf) + SubtestReadWriteMsg(t, writer, reader) +} + +func TestReadWriteMsgSync(t *testing.T) { + buf := bytes.NewBuffer(nil) + writer := NewWriter(buf) + reader := NewReader(buf) + SubtestReadWriteMsgSync(t, writer, reader) +} + +func SubtestReadWrite(t *testing.T, writer WriteCloser, reader ReadCloser) { msgs := [1000][]byte{} r := rand.New(rand.NewSource(time.Now().UnixNano())) @@ -58,10 +76,7 @@ func TestReadWrite(t *testing.T) { } } -func TestReadWriteMsg(t *testing.T) { - buf := bytes.NewBuffer(nil) - writer := NewWriter(buf) - reader := NewReader(buf) +func SubtestReadWriteMsg(t *testing.T, writer WriteCloser, reader ReadCloser) { msgs := [1000][]byte{} r := rand.New(rand.NewSource(time.Now().UnixNano())) @@ -100,10 +115,7 @@ func TestReadWriteMsg(t *testing.T) { } } -func TestReadWriteMsgSync(t *testing.T) { - buf := bytes.NewBuffer(nil) - writer := NewWriter(buf) - reader := NewReader(buf) +func SubtestReadWriteMsgSync(t *testing.T, writer WriteCloser, reader ReadCloser) { msgs := [1000][]byte{} r := rand.New(rand.NewSource(time.Now().UnixNano())) diff --git a/Godeps/_workspace/src/github.com/jbenet/go-msgio/varint.go b/Godeps/_workspace/src/github.com/jbenet/go-msgio/varint.go new file mode 100644 index 000000000..613470503 --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/go-msgio/varint.go @@ -0,0 +1,182 @@ +package msgio + +import ( + "encoding/binary" + "io" + "sync" + + mpool "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-msgio/mpool" +) + +// varintWriter is the underlying type that implements the Writer interface. +type varintWriter struct { + W io.Writer + + lbuf []byte // for encoding varints + lock sync.Locker // for threadsafe writes +} + +// NewVarintWriter wraps an io.Writer with a varint msgio framed writer. +// The msgio.Writer will write the length prefix of every message written +// as a varint, using https://golang.org/pkg/encoding/binary/#PutUvarint +func NewVarintWriter(w io.Writer) WriteCloser { + return &varintWriter{ + W: w, + lbuf: make([]byte, binary.MaxVarintLen64), + lock: new(sync.Mutex), + } +} + +func (s *varintWriter) Write(msg []byte) (int, error) { + err := s.WriteMsg(msg) + if err != nil { + return 0, err + } + return len(msg), nil +} + +func (s *varintWriter) WriteMsg(msg []byte) error { + s.lock.Lock() + defer s.lock.Unlock() + + length := uint64(len(msg)) + n := binary.PutUvarint(s.lbuf, length) + if _, err := s.W.Write(s.lbuf[:n]); err != nil { + return err + } + _, err := s.W.Write(msg) + return err +} + +func (s *varintWriter) Close() error { + s.lock.Lock() + defer s.lock.Unlock() + + if c, ok := s.W.(io.Closer); ok { + return c.Close() + } + return nil +} + +// varintReader is the underlying type that implements the Reader interface. +type varintReader struct { + R io.Reader + br io.ByteReader // for reading varints. + + lbuf []byte + next int + pool *mpool.Pool + lock sync.Locker +} + +// NewVarintReader wraps an io.Reader with a varint msgio framed reader. +// The msgio.Reader will read whole messages at a time (using the length). +// Varints read according to https://golang.org/pkg/encoding/binary/#ReadUvarint +// Assumes an equivalent writer on the other side. +func NewVarintReader(r io.Reader) ReadCloser { + return NewVarintReaderWithPool(r, &mpool.ByteSlicePool) +} + +// NewVarintReaderWithPool wraps an io.Reader with a varint msgio framed reader. +// The msgio.Reader will read whole messages at a time (using the length). +// Varints read according to https://golang.org/pkg/encoding/binary/#ReadUvarint +// Assumes an equivalent writer on the other side. It uses a given mpool.Pool +func NewVarintReaderWithPool(r io.Reader, p *mpool.Pool) ReadCloser { + if p == nil { + panic("nil pool") + } + return &varintReader{ + R: r, + br: &simpleByteReader{R: r}, + lbuf: make([]byte, binary.MaxVarintLen64), + next: -1, + pool: p, + lock: new(sync.Mutex), + } +} + +// NextMsgLen reads the length of the next msg into s.lbuf, and returns it. +// WARNING: like Read, NextMsgLen is destructive. It reads from the internal +// reader. +func (s *varintReader) NextMsgLen() (int, error) { + s.lock.Lock() + defer s.lock.Unlock() + return s.nextMsgLen() +} + +func (s *varintReader) nextMsgLen() (int, error) { + if s.next == -1 { + length, err := binary.ReadUvarint(s.br) + if err != nil { + return 0, err + } + s.next = int(length) + } + return s.next, nil +} + +func (s *varintReader) Read(msg []byte) (int, error) { + s.lock.Lock() + defer s.lock.Unlock() + + length, err := s.nextMsgLen() + if err != nil { + return 0, err + } + + if length > len(msg) { + return 0, io.ErrShortBuffer + } + _, err = io.ReadFull(s.R, msg[:length]) + s.next = -1 // signal we've consumed this msg + return length, err +} + +func (s *varintReader) ReadMsg() ([]byte, error) { + s.lock.Lock() + defer s.lock.Unlock() + + length, err := s.nextMsgLen() + if err != nil { + return nil, err + } + + msgb := s.pool.Get(uint32(length)) + if msgb == nil { + return nil, io.ErrShortBuffer + } + msg := msgb.([]byte)[:length] + _, err = io.ReadFull(s.R, msg) + s.next = -1 // signal we've consumed this msg + return msg, err +} + +func (s *varintReader) ReleaseMsg(msg []byte) { + s.pool.Put(uint32(cap(msg)), msg) +} + +func (s *varintReader) Close() error { + s.lock.Lock() + defer s.lock.Unlock() + + if c, ok := s.R.(io.Closer); ok { + return c.Close() + } + return nil +} + +type simpleByteReader struct { + R io.Reader + buf []byte +} + +func (r *simpleByteReader) ReadByte() (c byte, err error) { + if r.buf == nil { + r.buf = make([]byte, 1) + } + + if _, err := io.ReadFull(r.R, r.buf); err != nil { + return 0, err + } + return r.buf[0], nil +} diff --git a/Godeps/_workspace/src/github.com/jbenet/go-msgio/varint_test.go b/Godeps/_workspace/src/github.com/jbenet/go-msgio/varint_test.go new file mode 100644 index 000000000..c63415089 --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/go-msgio/varint_test.go @@ -0,0 +1,66 @@ +package msgio + +import ( + "bytes" + "encoding/binary" + "testing" +) + +func TestVarintReadWrite(t *testing.T) { + buf := bytes.NewBuffer(nil) + writer := NewVarintWriter(buf) + reader := NewVarintReader(buf) + SubtestReadWrite(t, writer, reader) +} + +func TestVarintReadWriteMsg(t *testing.T) { + buf := bytes.NewBuffer(nil) + writer := NewVarintWriter(buf) + reader := NewVarintReader(buf) + SubtestReadWriteMsg(t, writer, reader) +} + +func TestVarintReadWriteMsgSync(t *testing.T) { + buf := bytes.NewBuffer(nil) + writer := NewVarintWriter(buf) + reader := NewVarintReader(buf) + SubtestReadWriteMsgSync(t, writer, reader) +} + +func TestVarintWrite(t *testing.T) { + SubtestVarintWrite(t, []byte("hello world")) + SubtestVarintWrite(t, []byte("hello world hello world hello world")) + SubtestVarintWrite(t, make([]byte, 1<<20)) + SubtestVarintWrite(t, []byte("")) +} + +func SubtestVarintWrite(t *testing.T, msg []byte) { + buf := bytes.NewBuffer(nil) + writer := NewVarintWriter(buf) + + if err := writer.WriteMsg(msg); err != nil { + t.Fatal(err) + } + + bb := buf.Bytes() + + sbr := simpleByteReader{R: buf} + length, err := binary.ReadUvarint(&sbr) + if err != nil { + t.Fatal(err) + } + + t.Logf("checking varint is %d", len(msg)) + if int(length) != len(msg) { + t.Fatalf("incorrect varint: %d != %d", length, len(msg)) + } + + lbuf := make([]byte, binary.MaxVarintLen64) + n := binary.PutUvarint(lbuf, length) + + bblen := int(length) + n + t.Logf("checking wrote (%d + %d) bytes", length, n) + if len(bb) != bblen { + t.Fatalf("wrote incorrect number of bytes: %d != %d", len(bb), bblen) + } +}