From 13975c070286c7371aa3a8b3c230e90d7bf029fc Mon Sep 17 00:00:00 2001
From: dfawley
Date: Mon, 12 Mar 2018 13:27:54 -0700
Subject: [PATCH] stream: split per-attempt data from clientStream (#1900)

This is pre-work to implementing retry support. Each retry attempt will
have its own csAttempt. The fields left in clientStream are the same
across all attempts.
---
 call.go     |   4 +-
 rpc_util.go |   2 +-
 stream.go   | 263 ++++++++++++++++++++++++++++++----------------------
 3 files changed, 156 insertions(+), 113 deletions(-)

diff --git a/call.go b/call.go
index a66e3c2d..8299dc53 100644
--- a/call.go
+++ b/call.go
@@ -54,7 +54,7 @@ func invoke(ctx context.Context, method string, req, reply interface{}, cc *Clie
 		}
 		cs := csInt.(*clientStream)
 		if err := cs.SendMsg(req); err != nil {
-			if !cs.c.failFast && cs.s.Unprocessed() && firstAttempt {
+			if !cs.c.failFast && cs.attempt.s.Unprocessed() && firstAttempt {
 				// TODO: Add a field to header for grpc-transparent-retry-attempts
 				firstAttempt = false
 				continue
@@ -62,7 +62,7 @@ func invoke(ctx context.Context, method string, req, reply interface{}, cc *Clie
 			return err
 		}
 		if err := cs.RecvMsg(reply); err != nil {
-			if !cs.c.failFast && cs.s.Unprocessed() && firstAttempt {
+			if !cs.c.failFast && cs.attempt.s.Unprocessed() && firstAttempt {
 				// TODO: Add a field to header for grpc-transparent-retry-attempts
 				firstAttempt = false
 				continue
diff --git a/rpc_util.go b/rpc_util.go
index 889ab9cb..aa2101a4 100644
--- a/rpc_util.go
+++ b/rpc_util.go
@@ -127,7 +127,7 @@ func (d *gzipDecompressor) Type() string {
 type callInfo struct {
 	compressorType        string
 	failFast              bool
-	stream                *transport.Stream
+	stream                *clientStream
 	traceInfo             traceInfo // in trace.go
 	maxReceiveMessageSize *int
 	maxSendMessageSize    *int
diff --git a/stream.go b/stream.go
index 64e66e17..9967d6ab 100644
--- a/stream.go
+++ b/stream.go
@@ -266,29 +266,28 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
 		break
 	}
 
-	c.stream = s
 	cs := &clientStream{
 		opts:   opts,
 		c:      c,
 		desc:   desc,
 		codec:  c.codec,
 		cp:     cp,
-		dc:     cc.dopts.dc,
 		comp:   comp,
 		cancel: cancel,
-
-		done: done,
-		t:    t,
-		s:    s,
-		p:    &parser{r: s},
-
-		tracing: EnableTracing,
-		trInfo:  trInfo,
-
-		statsCtx:     ctx,
-		statsHandler: cc.dopts.copts.StatsHandler,
-		beginTime:    beginTime,
+		attempt: &csAttempt{
+			t:            t,
+			s:            s,
+			p:            &parser{r: s},
+			done:         done,
+			dc:           cc.dopts.dc,
+			ctx:          ctx,
+			trInfo:       trInfo,
+			statsHandler: sh,
+			beginTime:    beginTime,
+		},
 	}
+	cs.c.stream = cs
+	cs.attempt.cs = cs
 	if desc != unaryStreamDesc {
 		// Listen on cc and stream contexts to cleanup when the user closes the
 		// ClientConn or cancels the stream context. In all other cases, an error
@@ -300,7 +299,7 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
 			case <-cc.ctx.Done():
 				cs.finish(ErrClientConnClosing)
 			case <-ctx.Done():
-				cs.finish(toRPCErr(s.Context().Err()))
+				cs.finish(toRPCErr(ctx.Err()))
 			}
 		}()
 	}
@@ -311,47 +310,56 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
 type clientStream struct {
 	opts []CallOption
 	c    *callInfo
+	desc *StreamDesc
+
+	codec baseCodec
+	cp    Compressor
+	comp  encoding.Compressor
+
+	cancel context.CancelFunc // cancels all attempts
+
+	sentLast bool // sent an end stream
+
+	mu       sync.Mutex // guards finished
+	finished bool       // TODO: replace with atomic cmpxchg or sync.Once?
+
+	attempt *csAttempt // the active client stream attempt
+	// TODO(hedging): hedging will have multiple attempts simultaneously.
+}
+
+// csAttempt implements a single transport stream attempt within a
+// clientStream.
+type csAttempt struct {
+	cs   *clientStream
 	t    transport.ClientTransport
 	s    *transport.Stream
 	p    *parser
-	desc *StreamDesc
+	done func(balancer.DoneInfo)
 
-	codec     baseCodec
-	cp        Compressor
 	dc        Decompressor
-	comp      encoding.Compressor
 	decomp    encoding.Compressor
 	decompSet bool
 
-	// cancel is only called when RecvMsg() returns non-nil error, which means
-	// the stream finishes with error or with io.EOF.
-	cancel context.CancelFunc
+	ctx context.Context // the application's context, wrapped by stats/tracing
 
-	tracing bool // set to EnableTracing when the clientStream is created.
-
-	mu       sync.Mutex
-	done     func(balancer.DoneInfo)
-	sentLast bool // sent an end stream
-	finished bool
-	// trInfo.tr is set when the clientStream is created (if EnableTracing is true),
-	// and is set to nil when the clientStream's finish method is called.
+	mu sync.Mutex // guards trInfo.tr
+	// trInfo.tr is set when created (if EnableTracing is true),
+	// and cleared when the finish method is called.
 	trInfo traceInfo
 
-	// statsCtx keeps the user context for stats handling.
-	// All stats collection should use the statsCtx (instead of the stream context)
-	// so that all the generated stats for a particular RPC can be associated in the processing phase.
-	statsCtx     context.Context
 	statsHandler stats.Handler
 	beginTime    time.Time
 }
 
 func (cs *clientStream) Context() context.Context {
-	return cs.s.Context()
+	// TODO(retry): commit the current attempt (the context has peer-aware data).
+	return cs.attempt.context()
 }
 
 func (cs *clientStream) Header() (metadata.MD, error) {
-	m, err := cs.s.Header()
+	m, err := cs.attempt.header()
 	if err != nil {
+		// TODO(retry): maybe retry on error or commit attempt on success.
 		err = toRPCErr(err)
 		cs.finish(err)
 	}
@@ -359,20 +367,61 @@ func (cs *clientStream) Header() (metadata.MD, error) {
 }
 
 func (cs *clientStream) Trailer() metadata.MD {
-	return cs.s.Trailer()
+	// TODO(retry): on error, maybe retry (trailers-only).
+	return cs.attempt.trailer()
 }
 
 func (cs *clientStream) SendMsg(m interface{}) (err error) {
-	// TODO: Check cs.sentLast and error if we already ended the stream.
-	if cs.tracing {
-		cs.mu.Lock()
-		if cs.trInfo.tr != nil {
-			cs.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true)
-		}
-		cs.mu.Unlock()
+	// TODO(retry): buffer message for replaying if not committed.
+	return cs.attempt.sendMsg(m)
+}
+
+func (cs *clientStream) RecvMsg(m interface{}) (err error) {
+	// TODO(retry): maybe retry on error or commit attempt on success.
+	return cs.attempt.recvMsg(m)
+}
+
+func (cs *clientStream) CloseSend() error {
+	cs.attempt.closeSend()
+	return nil
+}
+
+func (cs *clientStream) finish(err error) {
+	if err == io.EOF {
+		// Ending a stream with EOF indicates a success.
+		err = nil
 	}
+	cs.mu.Lock()
+	if cs.finished {
+		cs.mu.Unlock()
+		return
+	}
+	cs.finished = true
+	cs.mu.Unlock()
+	// TODO(retry): commit current attempt if necessary.
+	cs.attempt.finish(err)
+	for _, o := range cs.opts {
+		o.after(cs.c)
+	}
+	cs.cancel()
+}
+
+func (a *csAttempt) context() context.Context {
+	return a.s.Context()
+}
+
+func (a *csAttempt) header() (metadata.MD, error) {
+	return a.s.Header()
+}
+
+func (a *csAttempt) trailer() metadata.MD {
+	return a.s.Trailer()
+}
+
+func (a *csAttempt) sendMsg(m interface{}) (err error) {
 	// TODO Investigate how to signal the stats handling party.
 	// generate error stats if err != nil && err != io.EOF?
+	cs := a.cs
 	defer func() {
 		// For non-client-streaming RPCs, we return nil instead of EOF on success
 		// because the generated code requires it. finish is not called; RecvMsg()
@@ -381,14 +430,23 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {
 			err = nil
 		}
 		if err != nil && err != io.EOF {
-			// Call finish for errors generated by this SendMsg call. (Transport
+			// Call finish on the client stream for errors generated by this SendMsg
+			// call, as these indicate problems created by this client. (Transport
 			// errors are converted to an io.EOF error below; the real error will be
-			// returned from RecvMsg eventually in that case.)
+			// returned from RecvMsg eventually in that case, or be retried.)
 			cs.finish(err)
 		}
 	}()
+	// TODO: Check cs.sentLast and error if we already ended the stream.
+	if EnableTracing {
+		a.mu.Lock()
+		if a.trInfo.tr != nil {
+			a.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true)
+		}
+		a.mu.Unlock()
+	}
 	var outPayload *stats.OutPayload
-	if cs.statsHandler != nil {
+	if a.statsHandler != nil {
 		outPayload = &stats.OutPayload{
 			Client: true,
 		}
@@ -403,18 +461,19 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {
 	if !cs.desc.ClientStreams {
 		cs.sentLast = true
 	}
-	err = cs.t.Write(cs.s, hdr, data, &transport.Options{Last: !cs.desc.ClientStreams})
+	err = a.t.Write(a.s, hdr, data, &transport.Options{Last: !cs.desc.ClientStreams})
 	if err == nil {
 		if outPayload != nil {
 			outPayload.SentTime = time.Now()
-			cs.statsHandler.HandleRPC(cs.statsCtx, outPayload)
+			a.statsHandler.HandleRPC(a.ctx, outPayload)
 		}
 		return nil
 	}
 	return io.EOF
 }
 
-func (cs *clientStream) RecvMsg(m interface{}) (err error) {
+func (a *csAttempt) recvMsg(m interface{}) (err error) {
+	cs := a.cs
 	defer func() {
 		if err != nil || !cs.desc.ServerStreams {
 			// err != nil or non-server-streaming indicates end of stream.
@@ -422,46 +481,46 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) {
 		}
 	}()
 	var inPayload *stats.InPayload
-	if cs.statsHandler != nil {
+	if a.statsHandler != nil {
 		inPayload = &stats.InPayload{
 			Client: true,
 		}
 	}
-	if !cs.decompSet {
+	if !a.decompSet {
 		// Block until we receive headers containing received message encoding.
-		if ct := cs.s.RecvCompress(); ct != "" && ct != encoding.Identity {
-			if cs.dc == nil || cs.dc.Type() != ct {
+		if ct := a.s.RecvCompress(); ct != "" && ct != encoding.Identity {
+			if a.dc == nil || a.dc.Type() != ct {
 				// No configured decompressor, or it does not match the incoming
 				// message encoding; attempt to find a registered compressor that does.
-				cs.dc = nil
-				cs.decomp = encoding.GetCompressor(ct)
+				a.dc = nil
+				a.decomp = encoding.GetCompressor(ct)
 			}
 		} else {
 			// No compression is used; disable our decompressor.
-			cs.dc = nil
+			a.dc = nil
 		}
 		// Only initialize this state once per stream.
-		cs.decompSet = true
+		a.decompSet = true
 	}
-	err = recv(cs.p, cs.codec, cs.s, cs.dc, m, *cs.c.maxReceiveMessageSize, inPayload, cs.decomp)
+	err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.c.maxReceiveMessageSize, inPayload, a.decomp)
 	if err != nil {
 		if err == io.EOF {
-			if statusErr := cs.s.Status().Err(); statusErr != nil {
+			if statusErr := a.s.Status().Err(); statusErr != nil {
 				return statusErr
 			}
 			return io.EOF // indicates successful end of stream.
 		}
 		return toRPCErr(err)
 	}
-	if cs.tracing {
-		cs.mu.Lock()
-		if cs.trInfo.tr != nil {
-			cs.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true)
+	if EnableTracing {
+		a.mu.Lock()
+		if a.trInfo.tr != nil {
+			a.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true)
 		}
-		cs.mu.Unlock()
+		a.mu.Unlock()
 	}
 	if inPayload != nil {
-		cs.statsHandler.HandleRPC(cs.statsCtx, inPayload)
+		a.statsHandler.HandleRPC(a.ctx, inPayload)
 	}
 	if cs.desc.ServerStreams {
 		// Subsequent messages should be received by subsequent RecvMsg calls.
@@ -470,75 +529,59 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) {
 
 	// Special handling for non-server-stream rpcs.
 	// This recv expects EOF or errors, so we don't collect inPayload.
-	err = recv(cs.p, cs.codec, cs.s, cs.dc, m, *cs.c.maxReceiveMessageSize, nil, cs.decomp)
+	err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.c.maxReceiveMessageSize, nil, a.decomp)
 	if err == nil {
 		return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
 	}
 	if err == io.EOF {
-		return cs.s.Status().Err() // non-server streaming Recv returns nil on success
+		return a.s.Status().Err() // non-server streaming Recv returns nil on success
 	}
 	return toRPCErr(err)
 }
 
-func (cs *clientStream) CloseSend() error {
+func (a *csAttempt) closeSend() {
+	cs := a.cs
 	if cs.sentLast {
-		return nil
-	}
-	cs.sentLast = true
-	cs.t.Write(cs.s, nil, nil, &transport.Options{Last: true})
-	// We ignore errors from Write and always return nil here. Any error it
-	// would return would also be returned by a subsequent RecvMsg call, and the
-	// user is supposed to always finish the stream by calling RecvMsg until it
-	// returns err != nil.
-	return nil
-}
-
-func (cs *clientStream) finish(err error) {
-	if err == io.EOF {
-		// Ending a stream with EOF indicates a success.
-		err = nil
-	}
-	cs.mu.Lock()
-	defer cs.mu.Unlock()
-	if cs.finished {
 		return
 	}
-	cs.finished = true
-	cs.t.CloseStream(cs.s, err)
-	for _, o := range cs.opts {
-		o.after(cs.c)
-	}
-	if cs.done != nil {
-		cs.done(balancer.DoneInfo{
+	cs.sentLast = true
+	cs.attempt.t.Write(cs.attempt.s, nil, nil, &transport.Options{Last: true})
+	// We ignore errors from Write. Any error it would return would also be
+	// returned by a subsequent RecvMsg call, and the user is supposed to always
+	// finish the stream by calling RecvMsg until it returns err != nil.
+}
+
+func (a *csAttempt) finish(err error) {
+	a.mu.Lock()
+	a.t.CloseStream(a.s, err)
+
+	if a.done != nil {
+		a.done(balancer.DoneInfo{
 			Err:           err,
 			BytesSent:     true,
-			BytesReceived: cs.s.BytesReceived(),
+			BytesReceived: a.s.BytesReceived(),
 		})
-		cs.done = nil
 	}
-	if cs.statsHandler != nil {
+	if a.statsHandler != nil {
 		end := &stats.End{
 			Client:    true,
-			BeginTime: cs.beginTime,
+			BeginTime: a.beginTime,
 			EndTime:   time.Now(),
 			Error:     err,
 		}
-		cs.statsHandler.HandleRPC(cs.statsCtx, end)
+		a.statsHandler.HandleRPC(a.ctx, end)
 	}
-	cs.cancel()
-	if !cs.tracing {
-		return
-	}
-	if cs.trInfo.tr != nil {
+	if a.trInfo.tr != nil {
 		if err == nil {
-			cs.trInfo.tr.LazyPrintf("RPC: [OK]")
+			a.trInfo.tr.LazyPrintf("RPC: [OK]")
 		} else {
-			cs.trInfo.tr.LazyPrintf("RPC: [%v]", err)
-			cs.trInfo.tr.SetError()
+			a.trInfo.tr.LazyPrintf("RPC: [%v]", err)
+			a.trInfo.tr.SetError()
 		}
-		cs.trInfo.tr.Finish()
-		cs.trInfo.tr = nil
+		a.trInfo.tr.Finish()
+		a.trInfo.tr = nil
 	}
+	a.mu.Unlock()
 }
 
 // ServerStream defines the interface a server stream has to satisfy.