1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-06-28 17:03:58 +08:00

updated msgio (varints)

This commit is contained in:
Juan Batiz-Benet
2014-12-29 07:40:11 -08:00
parent e2698a8733
commit 4309d15edb
4 changed files with 269 additions and 9 deletions

2
Godeps/Godeps.json generated
View File

@ -118,7 +118,7 @@
},
{
"ImportPath": "github.com/jbenet/go-msgio",
"Rev": "5e7289d3a0cd046a5bee30b187cc844c31f54dce"
"Rev": "dbae89193876910c736b2ce1291fa8bbcf299d77"
},
{
"ImportPath": "github.com/jbenet/go-multiaddr",

View File

@ -15,6 +15,24 @@ func TestReadWrite(t *testing.T) {
buf := bytes.NewBuffer(nil)
writer := NewWriter(buf)
reader := NewReader(buf)
SubtestReadWrite(t, writer, reader)
}
func TestReadWriteMsg(t *testing.T) {
buf := bytes.NewBuffer(nil)
writer := NewWriter(buf)
reader := NewReader(buf)
SubtestReadWriteMsg(t, writer, reader)
}
func TestReadWriteMsgSync(t *testing.T) {
buf := bytes.NewBuffer(nil)
writer := NewWriter(buf)
reader := NewReader(buf)
SubtestReadWriteMsgSync(t, writer, reader)
}
func SubtestReadWrite(t *testing.T, writer WriteCloser, reader ReadCloser) {
msgs := [1000][]byte{}
r := rand.New(rand.NewSource(time.Now().UnixNano()))
@ -58,10 +76,7 @@ func TestReadWrite(t *testing.T) {
}
}
func TestReadWriteMsg(t *testing.T) {
buf := bytes.NewBuffer(nil)
writer := NewWriter(buf)
reader := NewReader(buf)
func SubtestReadWriteMsg(t *testing.T, writer WriteCloser, reader ReadCloser) {
msgs := [1000][]byte{}
r := rand.New(rand.NewSource(time.Now().UnixNano()))
@ -100,10 +115,7 @@ func TestReadWriteMsg(t *testing.T) {
}
}
func TestReadWriteMsgSync(t *testing.T) {
buf := bytes.NewBuffer(nil)
writer := NewWriter(buf)
reader := NewReader(buf)
func SubtestReadWriteMsgSync(t *testing.T, writer WriteCloser, reader ReadCloser) {
msgs := [1000][]byte{}
r := rand.New(rand.NewSource(time.Now().UnixNano()))

View File

@ -0,0 +1,182 @@
package msgio
import (
"encoding/binary"
"io"
"sync"
mpool "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-msgio/mpool"
)
// varintWriter is the underlying type that implements the Writer interface.
type varintWriter struct {
W io.Writer
lbuf []byte // for encoding varints
lock sync.Locker // for threadsafe writes
}
// NewVarintWriter wraps an io.Writer with a varint msgio framed writer.
// The msgio.Writer will write the length prefix of every message written
// as a varint, using https://golang.org/pkg/encoding/binary/#PutUvarint
func NewVarintWriter(w io.Writer) WriteCloser {
return &varintWriter{
W: w,
lbuf: make([]byte, binary.MaxVarintLen64),
lock: new(sync.Mutex),
}
}
func (s *varintWriter) Write(msg []byte) (int, error) {
err := s.WriteMsg(msg)
if err != nil {
return 0, err
}
return len(msg), nil
}
func (s *varintWriter) WriteMsg(msg []byte) error {
s.lock.Lock()
defer s.lock.Unlock()
length := uint64(len(msg))
n := binary.PutUvarint(s.lbuf, length)
if _, err := s.W.Write(s.lbuf[:n]); err != nil {
return err
}
_, err := s.W.Write(msg)
return err
}
func (s *varintWriter) Close() error {
s.lock.Lock()
defer s.lock.Unlock()
if c, ok := s.W.(io.Closer); ok {
return c.Close()
}
return nil
}
// varintReader is the underlying type that implements the Reader interface.
type varintReader struct {
R io.Reader
br io.ByteReader // for reading varints.
lbuf []byte
next int
pool *mpool.Pool
lock sync.Locker
}
// NewVarintReader wraps an io.Reader with a varint msgio framed reader.
// The msgio.Reader will read whole messages at a time (using the length).
// Varints read according to https://golang.org/pkg/encoding/binary/#ReadUvarint
// Assumes an equivalent writer on the other side.
func NewVarintReader(r io.Reader) ReadCloser {
return NewVarintReaderWithPool(r, &mpool.ByteSlicePool)
}
// NewVarintReaderWithPool wraps an io.Reader with a varint msgio framed reader.
// The msgio.Reader will read whole messages at a time (using the length).
// Varints read according to https://golang.org/pkg/encoding/binary/#ReadUvarint
// Assumes an equivalent writer on the other side. It uses a given mpool.Pool
func NewVarintReaderWithPool(r io.Reader, p *mpool.Pool) ReadCloser {
if p == nil {
panic("nil pool")
}
return &varintReader{
R: r,
br: &simpleByteReader{R: r},
lbuf: make([]byte, binary.MaxVarintLen64),
next: -1,
pool: p,
lock: new(sync.Mutex),
}
}
// NextMsgLen reads the length of the next msg into s.lbuf, and returns it.
// WARNING: like Read, NextMsgLen is destructive. It reads from the internal
// reader.
func (s *varintReader) NextMsgLen() (int, error) {
s.lock.Lock()
defer s.lock.Unlock()
return s.nextMsgLen()
}
func (s *varintReader) nextMsgLen() (int, error) {
if s.next == -1 {
length, err := binary.ReadUvarint(s.br)
if err != nil {
return 0, err
}
s.next = int(length)
}
return s.next, nil
}
func (s *varintReader) Read(msg []byte) (int, error) {
s.lock.Lock()
defer s.lock.Unlock()
length, err := s.nextMsgLen()
if err != nil {
return 0, err
}
if length > len(msg) {
return 0, io.ErrShortBuffer
}
_, err = io.ReadFull(s.R, msg[:length])
s.next = -1 // signal we've consumed this msg
return length, err
}
func (s *varintReader) ReadMsg() ([]byte, error) {
s.lock.Lock()
defer s.lock.Unlock()
length, err := s.nextMsgLen()
if err != nil {
return nil, err
}
msgb := s.pool.Get(uint32(length))
if msgb == nil {
return nil, io.ErrShortBuffer
}
msg := msgb.([]byte)[:length]
_, err = io.ReadFull(s.R, msg)
s.next = -1 // signal we've consumed this msg
return msg, err
}
func (s *varintReader) ReleaseMsg(msg []byte) {
s.pool.Put(uint32(cap(msg)), msg)
}
func (s *varintReader) Close() error {
s.lock.Lock()
defer s.lock.Unlock()
if c, ok := s.R.(io.Closer); ok {
return c.Close()
}
return nil
}
type simpleByteReader struct {
R io.Reader
buf []byte
}
func (r *simpleByteReader) ReadByte() (c byte, err error) {
if r.buf == nil {
r.buf = make([]byte, 1)
}
if _, err := io.ReadFull(r.R, r.buf); err != nil {
return 0, err
}
return r.buf[0], nil
}

View File

@ -0,0 +1,66 @@
package msgio
import (
"bytes"
"encoding/binary"
"testing"
)
func TestVarintReadWrite(t *testing.T) {
buf := bytes.NewBuffer(nil)
writer := NewVarintWriter(buf)
reader := NewVarintReader(buf)
SubtestReadWrite(t, writer, reader)
}
func TestVarintReadWriteMsg(t *testing.T) {
buf := bytes.NewBuffer(nil)
writer := NewVarintWriter(buf)
reader := NewVarintReader(buf)
SubtestReadWriteMsg(t, writer, reader)
}
func TestVarintReadWriteMsgSync(t *testing.T) {
buf := bytes.NewBuffer(nil)
writer := NewVarintWriter(buf)
reader := NewVarintReader(buf)
SubtestReadWriteMsgSync(t, writer, reader)
}
func TestVarintWrite(t *testing.T) {
SubtestVarintWrite(t, []byte("hello world"))
SubtestVarintWrite(t, []byte("hello world hello world hello world"))
SubtestVarintWrite(t, make([]byte, 1<<20))
SubtestVarintWrite(t, []byte(""))
}
func SubtestVarintWrite(t *testing.T, msg []byte) {
buf := bytes.NewBuffer(nil)
writer := NewVarintWriter(buf)
if err := writer.WriteMsg(msg); err != nil {
t.Fatal(err)
}
bb := buf.Bytes()
sbr := simpleByteReader{R: buf}
length, err := binary.ReadUvarint(&sbr)
if err != nil {
t.Fatal(err)
}
t.Logf("checking varint is %d", len(msg))
if int(length) != len(msg) {
t.Fatalf("incorrect varint: %d != %d", length, len(msg))
}
lbuf := make([]byte, binary.MaxVarintLen64)
n := binary.PutUvarint(lbuf, length)
bblen := int(length) + n
t.Logf("checking wrote (%d + %d) bytes", length, n)
if len(bb) != bblen {
t.Fatalf("wrote incorrect number of bytes: %d != %d", len(bb), bblen)
}
}