From f89cfc11086f788beb9ce85bff2f5a8885d2f5c4 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Mon, 27 Oct 2014 21:19:22 +0000 Subject: [PATCH] moved pipes package --- .../src/github.com/jbenet/go-msgio/chan.go | 16 +++++++++++++++- crypto/spipe/pipe.go | 2 +- crypto/spipe/signedpipe.go | 2 +- net/conn/conn.go | 4 ++-- net/conn/secure_conn.go | 2 +- {pipes => util/pipes}/duplex.go | 0 6 files changed, 20 insertions(+), 6 deletions(-) rename {pipes => util/pipes}/duplex.go (100%) diff --git a/Godeps/_workspace/src/github.com/jbenet/go-msgio/chan.go b/Godeps/_workspace/src/github.com/jbenet/go-msgio/chan.go index 84ccbe355..1c16a9f0f 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-msgio/chan.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-msgio/chan.go @@ -13,7 +13,15 @@ type Chan struct { BufPool *sync.Pool } -func NewChan(chanSize int, pool *sync.Pool) *Chan { +func NewChan(chanSize int) *Chan { + return &Chan{ + MsgChan: make(chan []byte, chanSize), + ErrChan: make(chan error, 1), + CloseChan: make(chan bool, 2), + } +} + +func NewChanWithPool(chanSize int, pool *sync.Pool) *Chan { return &Chan{ MsgChan: make(chan []byte, chanSize), ErrChan: make(chan error, 1), @@ -26,6 +34,12 @@ func (s *Chan) ReadFrom(r io.Reader, maxMsgLen int) { // new buffer per message // if bottleneck, cycle around a set of buffers mr := NewReader(r) + if s.BufPool == nil { + s.BufPool = new(sync.Pool) + s.BufPool.New = func() interface{} { + return make([]byte, maxMsgLen) + } + } Loop: for { bufi := s.BufPool.Get() diff --git a/crypto/spipe/pipe.go b/crypto/spipe/pipe.go index b1c9ecccc..3f8f57b62 100644 --- a/crypto/spipe/pipe.go +++ b/crypto/spipe/pipe.go @@ -6,7 +6,7 @@ import ( context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" peer "github.com/jbenet/go-ipfs/peer" - pipes "github.com/jbenet/go-ipfs/pipes" + pipes "github.com/jbenet/go-ipfs/util/pipes" ) // SecurePipe objects represent a bi-directional message channel. diff --git a/crypto/spipe/signedpipe.go b/crypto/spipe/signedpipe.go index 0a78388f5..445981305 100644 --- a/crypto/spipe/signedpipe.go +++ b/crypto/spipe/signedpipe.go @@ -11,7 +11,7 @@ import ( ci "github.com/jbenet/go-ipfs/crypto" pb "github.com/jbenet/go-ipfs/crypto/spipe/internal/pb" "github.com/jbenet/go-ipfs/peer" - "github.com/jbenet/go-ipfs/pipes" + "github.com/jbenet/go-ipfs/util/pipes" ) type SignedPipe struct { diff --git a/net/conn/conn.go b/net/conn/conn.go index 537f30870..e0272c335 100644 --- a/net/conn/conn.go +++ b/net/conn/conn.go @@ -51,8 +51,8 @@ type msgioPipe struct { func newMsgioPipe(size int, pool *sync.Pool) *msgioPipe { return &msgioPipe{ - outgoing: msgio.NewChan(size, nil), - incoming: msgio.NewChan(size, pool), + outgoing: msgio.NewChan(size), + incoming: msgio.NewChanWithPool(size, pool), } } diff --git a/net/conn/secure_conn.go b/net/conn/secure_conn.go index 644f39b39..5c272de28 100644 --- a/net/conn/secure_conn.go +++ b/net/conn/secure_conn.go @@ -8,8 +8,8 @@ import ( spipe "github.com/jbenet/go-ipfs/crypto/spipe" peer "github.com/jbenet/go-ipfs/peer" - "github.com/jbenet/go-ipfs/pipes" ctxc "github.com/jbenet/go-ipfs/util/ctxcloser" + "github.com/jbenet/go-ipfs/util/pipes" ) // secureConn wraps another Conn object with an encrypted channel. diff --git a/pipes/duplex.go b/util/pipes/duplex.go similarity index 100% rename from pipes/duplex.go rename to util/pipes/duplex.go