comments and gofmt

This commit is contained in:
iamqizhao
2016-01-25 11:18:41 -08:00
parent da3bb0c9f7
commit 61d7c39d4c
5 changed files with 32 additions and 18 deletions

View File

@ -91,12 +91,16 @@ func WithCodec(c Codec) DialOption {
} }
} }
// WithCompressor returns a DialOption which sets a CompressorGenerator for generating message
// compressor.
func WithCompressor(f CompressorGenerator) DialOption { func WithCompressor(f CompressorGenerator) DialOption {
return func(o *dialOptions) { return func(o *dialOptions) {
o.cg = f o.cg = f
} }
} }
// WithDecompressor returns a DialOption which sets a DecompressorGenerator for generating
// message decompressor.
func WithDecompressor(f DecompressorGenerator) DialOption { func WithDecompressor(f DecompressorGenerator) DialOption {
return func(o *dialOptions) { return func(o *dialOptions) {
o.dg = f o.dg = f

View File

@ -78,11 +78,15 @@ func (protoCodec) String() string {
return "proto" return "proto"
} }
// Compressor defines the interface gRPC uses to compress a message.
type Compressor interface { type Compressor interface {
// Do compresses p into w.
Do(w io.Writer, p []byte) error Do(w io.Writer, p []byte) error
// Type returns the compression algorithm the Compressor uses.
Type() string Type() string
} }
// NewGZIPCompressor creates a Compressor based on GZIP.
func NewGZIPCompressor() Compressor { func NewGZIPCompressor() Compressor {
return &gzipCompressor{} return &gzipCompressor{}
} }
@ -102,14 +106,18 @@ func (c *gzipCompressor) Type() string {
return "gzip" return "gzip"
} }
// Decompressor defines the interface gRPC uses to decompress a message.
type Decompressor interface { type Decompressor interface {
// Do reads the data from r and uncompress them.
Do(r io.Reader) ([]byte, error) Do(r io.Reader) ([]byte, error)
// Type returns the compression algorithm the Decompressor uses.
Type() string Type() string
} }
type gzipDecompressor struct { type gzipDecompressor struct {
} }
// NewGZIPDecompressor creates a Decompressor based on GZIP.
func NewGZIPDecompressor() Decompressor { func NewGZIPDecompressor() Decompressor {
return &gzipDecompressor{} return &gzipDecompressor{}
} }
@ -127,8 +135,10 @@ func (d *gzipDecompressor) Type() string {
return "gzip" return "gzip"
} }
// CompressorGenerator defines the function generating a Compressor.
type CompressorGenerator func() Compressor type CompressorGenerator func() Compressor
// DecompressorGenerator defines the function generating a Decompressor.
type DecompressorGenerator func() Decompressor type DecompressorGenerator func() Decompressor
// callInfo contains all related configuration and information about an RPC. // callInfo contains all related configuration and information about an RPC.

View File

@ -563,7 +563,7 @@ func testEmptyUnaryWithUserAgent(t *testing.T, e env) {
t.Fatalf("header[\"ua\"] = %q, %t, want %q, true", v, ok, testAppUA) t.Fatalf("header[\"ua\"] = %q, %t, want %q, true", v, ok, testAppUA)
} }
tearDown(s, cc) tearDown(s, cc)
ctx, _ = context.WithTimeout(context.Background(), 5 * time.Second) ctx, _ = context.WithTimeout(context.Background(), 5*time.Second)
if _, err := cc.WaitForStateChange(ctx, grpc.Ready); err != nil { if _, err := cc.WaitForStateChange(ctx, grpc.Ready); err != nil {
t.Fatalf("cc.WaitForStateChange(_, %s) = _, %v, want _, <nil>", grpc.Ready, err) t.Fatalf("cc.WaitForStateChange(_, %s) = _, %v, want _, <nil>", grpc.Ready, err)
} }
@ -823,12 +823,12 @@ func testCancelNoIO(t *testing.T, e env) {
go func() { go func() {
defer close(ch) defer close(ch)
// This should be blocked until the 1st is canceled. // This should be blocked until the 1st is canceled.
ctx, _ := context.WithTimeout(context.Background(), 2 * time.Second) ctx, _ := context.WithTimeout(context.Background(), 2*time.Second)
if _, err := tc.StreamingInputCall(ctx); err != nil { if _, err := tc.StreamingInputCall(ctx); err != nil {
t.Errorf("%v.StreamingInputCall(_) = _, %v, want _, <nil>", tc, err) t.Errorf("%v.StreamingInputCall(_) = _, %v, want _, <nil>", tc, err)
} }
}() }()
cancel(); cancel()
<-ch <-ch
} }

View File

@ -208,13 +208,13 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
} }
// TODO(zhaoq): Handle uint32 overflow of Stream.id. // TODO(zhaoq): Handle uint32 overflow of Stream.id.
s := &Stream{ s := &Stream{
id: t.nextID, id: t.nextID,
method: callHdr.Method, method: callHdr.Method,
sendCompress: callHdr.SendCompress, sendCompress: callHdr.SendCompress,
buf: newRecvBuffer(), buf: newRecvBuffer(),
fc: fc, fc: fc,
sendQuotaPool: newQuotaPool(int(t.streamSendQuota)), sendQuotaPool: newQuotaPool(int(t.streamSendQuota)),
headerChan: make(chan struct{}), headerChan: make(chan struct{}),
} }
t.nextID += 2 t.nextID += 2
s.windowHandler = func(n int) { s.windowHandler = func(n int) {

View File

@ -170,13 +170,13 @@ type Stream struct {
ctx context.Context ctx context.Context
cancel context.CancelFunc cancel context.CancelFunc
// method records the associated RPC method of the stream. // method records the associated RPC method of the stream.
method string method string
recvCompress string recvCompress string
sendCompress string sendCompress string
buf *recvBuffer buf *recvBuffer
dec io.Reader dec io.Reader
fc *inFlow fc *inFlow
recvQuota uint32 recvQuota uint32
// The accumulated inbound quota pending for window update. // The accumulated inbound quota pending for window update.
updateQuota uint32 updateQuota uint32
// The handler to control the window update procedure for both this // The handler to control the window update procedure for both this
@ -358,8 +358,8 @@ type Options struct {
// CallHdr carries the information of a particular RPC. // CallHdr carries the information of a particular RPC.
type CallHdr struct { type CallHdr struct {
Host string // peer host Host string // peer host
Method string // the operation to perform on the specified host Method string // the operation to perform on the specified host
RecvCompress string RecvCompress string
SendCompress string SendCompress string
} }