diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 86e993317..b9beea618 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -162,7 +162,7 @@ }, { "ImportPath": "github.com/jbenet/go-msgio", - "Rev": "b4f3f1e1c7ec0cbf2fe35d8a45d1c253d224dc72" + "Rev": "9399b44f6bf265b30bedaf2af8c0604bbc8d5275" }, { "ImportPath": "github.com/jbenet/go-multiaddr", diff --git a/Godeps/_workspace/src/github.com/jbenet/go-msgio/limit.go b/Godeps/_workspace/src/github.com/jbenet/go-msgio/limit.go new file mode 100644 index 000000000..bce33cff1 --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/go-msgio/limit.go @@ -0,0 +1,45 @@ +package msgio + +import ( + "bytes" + "io" + "sync" +) + +// LimitedReader wraps an io.Reader with a msgio framed reader. The LimitedReader +// will return a reader which will io.EOF when the msg length is done. +func LimitedReader(r io.Reader) (io.Reader, error) { + l, err := ReadLen(r, nil) + return io.LimitReader(r, int64(l)), err +} + +// LimitedWriter wraps an io.Writer with a msgio framed writer. It is the inverse +// of LimitedReader: it will buffer all writes until "Flush" is called. When Flush +// is called, it will write the size of the buffer first, flush the buffer, reset +// the buffer, and begin accept more incoming writes. +func NewLimitedWriter(w io.Writer) *LimitedWriter { + return &LimitedWriter{W: w} +} + +type LimitedWriter struct { + W io.Writer + B bytes.Buffer + M sync.Mutex +} + +func (w *LimitedWriter) Write(buf []byte) (n int, err error) { + w.M.Lock() + n, err = w.B.Write(buf) + w.M.Unlock() + return n, err +} + +func (w *LimitedWriter) Flush() error { + w.M.Lock() + defer w.M.Unlock() + if err := WriteLen(w.W, w.B.Len()); err != nil { + return err + } + _, err := w.B.WriteTo(w.W) + return err +} diff --git a/Godeps/_workspace/src/github.com/jbenet/go-msgio/msgio.go b/Godeps/_workspace/src/github.com/jbenet/go-msgio/msgio.go index 6547e6890..4bb92debe 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-msgio/msgio.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-msgio/msgio.go @@ -1,7 +1,6 @@ package msgio import ( - "encoding/binary" "errors" "io" "sync" @@ -9,9 +8,6 @@ import ( mpool "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-msgio/mpool" ) -// NBO is NetworkByteOrder -var NBO = binary.BigEndian - // ErrMsgTooLarge is returned when the message length is exessive var ErrMsgTooLarge = errors.New("message too large") @@ -101,9 +97,7 @@ func (s *writer) Write(msg []byte) (int, error) { func (s *writer) WriteMsg(msg []byte) (err error) { s.lock.Lock() defer s.lock.Unlock() - - length := uint32(len(msg)) - if err := binary.Write(s.W, NBO, &length); err != nil { + if err := WriteLen(s.W, len(msg)); err != nil { return err } _, err = s.W.Write(msg) @@ -166,10 +160,12 @@ func (s *reader) NextMsgLen() (int, error) { func (s *reader) nextMsgLen() (int, error) { if s.next == -1 { - if _, err := io.ReadFull(s.R, s.lbuf); err != nil { + n, err := ReadLen(s.R, s.lbuf) + if err != nil { return 0, err } - s.next = int(NBO.Uint32(s.lbuf)) + + s.next = n } return s.next, nil } @@ -186,6 +182,7 @@ func (s *reader) Read(msg []byte) (int, error) { if length > len(msg) { return 0, io.ErrShortBuffer } + _, err = io.ReadFull(s.R, msg[:length]) s.next = -1 // signal we've consumed this msg return length, err @@ -200,7 +197,7 @@ func (s *reader) ReadMsg() ([]byte, error) { return nil, err } - if length > s.max { + if length > s.max || length < 0 { return nil, ErrMsgTooLarge } diff --git a/Godeps/_workspace/src/github.com/jbenet/go-msgio/msgio/.gitignore b/Godeps/_workspace/src/github.com/jbenet/go-msgio/msgio/.gitignore new file mode 100644 index 000000000..fc28073bf --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/go-msgio/msgio/.gitignore @@ -0,0 +1 @@ +msgio diff --git a/Godeps/_workspace/src/github.com/jbenet/go-msgio/msgio/README.md b/Godeps/_workspace/src/github.com/jbenet/go-msgio/msgio/README.md new file mode 100644 index 000000000..9a7318f9b --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/go-msgio/msgio/README.md @@ -0,0 +1,24 @@ +# msgio headers tool + +Conveniently output msgio headers. + +## Install + +``` +go get github.com/jbenet/go-msgio/msgio +``` + +## Usage + +``` +> msgio -h +msgio - tool to wrap messages with msgio header + +Usage + msgio header 1020 >header + cat file | msgio wrap >wrapped + +Commands + header output a msgio header of given size + wrap wrap incoming stream with msgio +``` diff --git a/Godeps/_workspace/src/github.com/jbenet/go-msgio/msgio/msgio.go b/Godeps/_workspace/src/github.com/jbenet/go-msgio/msgio/msgio.go new file mode 100644 index 000000000..a97793478 --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/go-msgio/msgio/msgio.go @@ -0,0 +1,108 @@ +package main + +import ( + "flag" + "fmt" + "io" + "io/ioutil" + "os" + "strconv" + "strings" + + msgio "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-msgio" +) + +var Args ArgType + +type ArgType struct { + Command string + Args []string +} + +func (a *ArgType) Arg(i int) string { + n := i + 1 + if len(a.Args) < n { + die(fmt.Sprintf("expected %d argument(s)", n)) + } + return a.Args[i] +} + +var usageStr = ` +msgio - tool to wrap messages with msgio header + +Usage + msgio header 1020 >header + cat file | msgio wrap >wrapped + +Commands + header output a msgio header of given size + wrap wrap incoming stream with msgio +` + +func usage() { + fmt.Println(strings.TrimSpace(usageStr)) + os.Exit(0) +} + +func die(err string) { + fmt.Fprintf(os.Stderr, "error: %s\n", err) + os.Exit(-1) +} + +func main() { + if err := run(); err != nil { + die(err.Error()) + } +} + +func argParse() { + flag.Usage = usage + flag.Parse() + + args := flag.Args() + if l := len(args); l < 1 || l > 2 { + usage() + } + + Args.Command = flag.Args()[0] + Args.Args = flag.Args()[1:] +} + +func run() error { + argParse() + + w := os.Stdout + r := os.Stdin + + switch Args.Command { + case "header": + size, err := strconv.Atoi(Args.Arg(0)) + if err != nil { + return err + } + return header(w, size) + case "wrap": + return wrap(w, r) + default: + usage() + return nil + } +} + +func header(w io.Writer, size int) error { + return msgio.WriteLen(w, size) +} + +func wrap(w io.Writer, r io.Reader) error { + buf, err := ioutil.ReadAll(r) + if err != nil { + return err + } + + if err := msgio.WriteLen(w, len(buf)); err != nil { + return err + } + + _, err = w.Write(buf) + return err +} diff --git a/Godeps/_workspace/src/github.com/jbenet/go-msgio/msgio_test.go b/Godeps/_workspace/src/github.com/jbenet/go-msgio/msgio_test.go index 05eca7b24..da1590d6e 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-msgio/msgio_test.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-msgio/msgio_test.go @@ -180,3 +180,18 @@ func SubtestReadWriteMsgSync(t *testing.T, writer WriteCloser, reader ReadCloser t.Error(e) } } + +func TestBadSizes(t *testing.T) { + data := make([]byte, 4) + + // on a 64 bit system, this will fail because its too large + // on a 32 bit system, this will fail because its too small + NBO.PutUint32(data, 4000000000) + buf := bytes.NewReader(data) + read := NewReader(buf) + msg, err := read.ReadMsg() + if err == nil { + t.Fatal(err) + } + _ = msg +} diff --git a/Godeps/_workspace/src/github.com/jbenet/go-msgio/num.go b/Godeps/_workspace/src/github.com/jbenet/go-msgio/num.go new file mode 100644 index 000000000..513c1994f --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/go-msgio/num.go @@ -0,0 +1,33 @@ +package msgio + +import ( + "encoding/binary" + "io" +) + +// NBO is NetworkByteOrder +var NBO = binary.BigEndian + +// WriteLen writes a length to the given writer. +func WriteLen(w io.Writer, l int) error { + ul := uint32(l) + return binary.Write(w, NBO, &ul) +} + +// ReadLen reads a length from the given reader. +// if buf is non-nil, it reuses the buffer. Ex: +// l, err := ReadLen(r, nil) +// _, err := ReadLen(r, buf) +func ReadLen(r io.Reader, buf []byte) (int, error) { + if len(buf) < 4 { + buf = make([]byte, 4) + } + buf = buf[:4] + + if _, err := io.ReadFull(r, buf); err != nil { + return 0, err + } + + n := int(NBO.Uint32(buf)) + return n, nil +}