stream: split per-attempt data from clientStream (#1900)
This is pre-work to implementing retry support. Each retry attempt will have its own csAttempt. The fields left in clientStream are the same across all attempts.
This commit is contained in:
		
							
								
								
									
										263
									
								
								stream.go
									
									
									
									
									
								
							
							
						
						
									
										263
									
								
								stream.go
									
									
									
									
									
								
							| @ -266,29 +266,28 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth | ||||
| 		break | ||||
| 	} | ||||
|  | ||||
| 	c.stream = s | ||||
| 	cs := &clientStream{ | ||||
| 		opts:   opts, | ||||
| 		c:      c, | ||||
| 		desc:   desc, | ||||
| 		codec:  c.codec, | ||||
| 		cp:     cp, | ||||
| 		dc:     cc.dopts.dc, | ||||
| 		comp:   comp, | ||||
| 		cancel: cancel, | ||||
|  | ||||
| 		done: done, | ||||
| 		t:    t, | ||||
| 		s:    s, | ||||
| 		p:    &parser{r: s}, | ||||
|  | ||||
| 		tracing: EnableTracing, | ||||
| 		trInfo:  trInfo, | ||||
|  | ||||
| 		statsCtx:     ctx, | ||||
| 		statsHandler: cc.dopts.copts.StatsHandler, | ||||
| 		beginTime:    beginTime, | ||||
| 		attempt: &csAttempt{ | ||||
| 			t:            t, | ||||
| 			s:            s, | ||||
| 			p:            &parser{r: s}, | ||||
| 			done:         done, | ||||
| 			dc:           cc.dopts.dc, | ||||
| 			ctx:          ctx, | ||||
| 			trInfo:       trInfo, | ||||
| 			statsHandler: sh, | ||||
| 			beginTime:    beginTime, | ||||
| 		}, | ||||
| 	} | ||||
| 	cs.c.stream = cs | ||||
| 	cs.attempt.cs = cs | ||||
| 	if desc != unaryStreamDesc { | ||||
| 		// Listen on cc and stream contexts to cleanup when the user closes the | ||||
| 		// ClientConn or cancels the stream context.  In all other cases, an error | ||||
| @ -300,7 +299,7 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth | ||||
| 			case <-cc.ctx.Done(): | ||||
| 				cs.finish(ErrClientConnClosing) | ||||
| 			case <-ctx.Done(): | ||||
| 				cs.finish(toRPCErr(s.Context().Err())) | ||||
| 				cs.finish(toRPCErr(ctx.Err())) | ||||
| 			} | ||||
| 		}() | ||||
| 	} | ||||
| @ -311,47 +310,56 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth | ||||
| type clientStream struct { | ||||
| 	opts []CallOption | ||||
| 	c    *callInfo | ||||
| 	desc *StreamDesc | ||||
|  | ||||
| 	codec baseCodec | ||||
| 	cp    Compressor | ||||
| 	comp  encoding.Compressor | ||||
|  | ||||
| 	cancel context.CancelFunc // cancels all attempts | ||||
|  | ||||
| 	sentLast bool // sent an end stream | ||||
|  | ||||
| 	mu       sync.Mutex // guards finished | ||||
| 	finished bool       // TODO: replace with atomic cmpxchg or sync.Once? | ||||
|  | ||||
| 	attempt *csAttempt // the active client stream attempt | ||||
| 	// TODO(hedging): hedging will have multiple attempts simultaneously. | ||||
| } | ||||
|  | ||||
| // csAttempt implements a single transport stream attempt within a | ||||
| // clientStream. | ||||
| type csAttempt struct { | ||||
| 	cs   *clientStream | ||||
| 	t    transport.ClientTransport | ||||
| 	s    *transport.Stream | ||||
| 	p    *parser | ||||
| 	desc *StreamDesc | ||||
| 	done func(balancer.DoneInfo) | ||||
|  | ||||
| 	codec     baseCodec | ||||
| 	cp        Compressor | ||||
| 	dc        Decompressor | ||||
| 	comp      encoding.Compressor | ||||
| 	decomp    encoding.Compressor | ||||
| 	decompSet bool | ||||
|  | ||||
| 	// cancel is only called when RecvMsg() returns non-nil error, which means | ||||
| 	// the stream finishes with error or with io.EOF. | ||||
| 	cancel context.CancelFunc | ||||
| 	ctx context.Context // the application's context, wrapped by stats/tracing | ||||
|  | ||||
| 	tracing bool // set to EnableTracing when the clientStream is created. | ||||
|  | ||||
| 	mu       sync.Mutex | ||||
| 	done     func(balancer.DoneInfo) | ||||
| 	sentLast bool // sent an end stream | ||||
| 	finished bool | ||||
| 	// trInfo.tr is set when the clientStream is created (if EnableTracing is true), | ||||
| 	// and is set to nil when the clientStream's finish method is called. | ||||
| 	mu sync.Mutex // guards trInfo.tr | ||||
| 	// trInfo.tr is set when created (if EnableTracing is true), | ||||
| 	// and cleared when the finish method is called. | ||||
| 	trInfo traceInfo | ||||
|  | ||||
| 	// statsCtx keeps the user context for stats handling. | ||||
| 	// All stats collection should use the statsCtx (instead of the stream context) | ||||
| 	// so that all the generated stats for a particular RPC can be associated in the processing phase. | ||||
| 	statsCtx     context.Context | ||||
| 	statsHandler stats.Handler | ||||
| 	beginTime    time.Time | ||||
| } | ||||
|  | ||||
| func (cs *clientStream) Context() context.Context { | ||||
| 	return cs.s.Context() | ||||
| 	// TODO(retry): commit the current attempt (the context has peer-aware data). | ||||
| 	return cs.attempt.context() | ||||
| } | ||||
|  | ||||
| func (cs *clientStream) Header() (metadata.MD, error) { | ||||
| 	m, err := cs.s.Header() | ||||
| 	m, err := cs.attempt.header() | ||||
| 	if err != nil { | ||||
| 		// TODO(retry): maybe retry on error or commit attempt on success. | ||||
| 		err = toRPCErr(err) | ||||
| 		cs.finish(err) | ||||
| 	} | ||||
| @ -359,20 +367,61 @@ func (cs *clientStream) Header() (metadata.MD, error) { | ||||
| } | ||||
|  | ||||
| func (cs *clientStream) Trailer() metadata.MD { | ||||
| 	return cs.s.Trailer() | ||||
| 	// TODO(retry): on error, maybe retry (trailers-only). | ||||
| 	return cs.attempt.trailer() | ||||
| } | ||||
|  | ||||
| func (cs *clientStream) SendMsg(m interface{}) (err error) { | ||||
| 	// TODO: Check cs.sentLast and error if we already ended the stream. | ||||
| 	if cs.tracing { | ||||
| 		cs.mu.Lock() | ||||
| 		if cs.trInfo.tr != nil { | ||||
| 			cs.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true) | ||||
| 		} | ||||
| 		cs.mu.Unlock() | ||||
| 	// TODO(retry): buffer message for replaying if not committed. | ||||
| 	return cs.attempt.sendMsg(m) | ||||
| } | ||||
|  | ||||
| func (cs *clientStream) RecvMsg(m interface{}) (err error) { | ||||
| 	// TODO(retry): maybe retry on error or commit attempt on success. | ||||
| 	return cs.attempt.recvMsg(m) | ||||
| } | ||||
|  | ||||
| func (cs *clientStream) CloseSend() error { | ||||
| 	cs.attempt.closeSend() | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (cs *clientStream) finish(err error) { | ||||
| 	if err == io.EOF { | ||||
| 		// Ending a stream with EOF indicates a success. | ||||
| 		err = nil | ||||
| 	} | ||||
| 	cs.mu.Lock() | ||||
| 	if cs.finished { | ||||
| 		cs.mu.Unlock() | ||||
| 		return | ||||
| 	} | ||||
| 	cs.finished = true | ||||
| 	cs.mu.Unlock() | ||||
| 	// TODO(retry): commit current attempt if necessary. | ||||
| 	cs.attempt.finish(err) | ||||
| 	for _, o := range cs.opts { | ||||
| 		o.after(cs.c) | ||||
| 	} | ||||
| 	cs.cancel() | ||||
| } | ||||
|  | ||||
| func (a *csAttempt) context() context.Context { | ||||
| 	return a.s.Context() | ||||
| } | ||||
|  | ||||
| func (a *csAttempt) header() (metadata.MD, error) { | ||||
| 	return a.s.Header() | ||||
| } | ||||
|  | ||||
| func (a *csAttempt) trailer() metadata.MD { | ||||
| 	return a.s.Trailer() | ||||
| } | ||||
|  | ||||
| func (a *csAttempt) sendMsg(m interface{}) (err error) { | ||||
| 	// TODO Investigate how to signal the stats handling party. | ||||
| 	// generate error stats if err != nil && err != io.EOF? | ||||
| 	cs := a.cs | ||||
| 	defer func() { | ||||
| 		// For non-client-streaming RPCs, we return nil instead of EOF on success | ||||
| 		// because the generated code requires it.  finish is not called; RecvMsg() | ||||
| @ -381,14 +430,23 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) { | ||||
| 			err = nil | ||||
| 		} | ||||
| 		if err != nil && err != io.EOF { | ||||
| 			// Call finish for errors generated by this SendMsg call.  (Transport | ||||
| 			// Call finish on the client stream for errors generated by this SendMsg | ||||
| 			// call, as these indicate problems created by this client.  (Transport | ||||
| 			// errors are converted to an io.EOF error below; the real error will be | ||||
| 			// returned from RecvMsg eventually in that case.) | ||||
| 			// returned from RecvMsg eventually in that case, or be retried.) | ||||
| 			cs.finish(err) | ||||
| 		} | ||||
| 	}() | ||||
| 	// TODO: Check cs.sentLast and error if we already ended the stream. | ||||
| 	if EnableTracing { | ||||
| 		a.mu.Lock() | ||||
| 		if a.trInfo.tr != nil { | ||||
| 			a.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true) | ||||
| 		} | ||||
| 		a.mu.Unlock() | ||||
| 	} | ||||
| 	var outPayload *stats.OutPayload | ||||
| 	if cs.statsHandler != nil { | ||||
| 	if a.statsHandler != nil { | ||||
| 		outPayload = &stats.OutPayload{ | ||||
| 			Client: true, | ||||
| 		} | ||||
| @ -403,18 +461,19 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) { | ||||
| 	if !cs.desc.ClientStreams { | ||||
| 		cs.sentLast = true | ||||
| 	} | ||||
| 	err = cs.t.Write(cs.s, hdr, data, &transport.Options{Last: !cs.desc.ClientStreams}) | ||||
| 	err = a.t.Write(a.s, hdr, data, &transport.Options{Last: !cs.desc.ClientStreams}) | ||||
| 	if err == nil { | ||||
| 		if outPayload != nil { | ||||
| 			outPayload.SentTime = time.Now() | ||||
| 			cs.statsHandler.HandleRPC(cs.statsCtx, outPayload) | ||||
| 			a.statsHandler.HandleRPC(a.ctx, outPayload) | ||||
| 		} | ||||
| 		return nil | ||||
| 	} | ||||
| 	return io.EOF | ||||
| } | ||||
|  | ||||
| func (cs *clientStream) RecvMsg(m interface{}) (err error) { | ||||
| func (a *csAttempt) recvMsg(m interface{}) (err error) { | ||||
| 	cs := a.cs | ||||
| 	defer func() { | ||||
| 		if err != nil || !cs.desc.ServerStreams { | ||||
| 			// err != nil or non-server-streaming indicates end of stream. | ||||
| @ -422,46 +481,46 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) { | ||||
| 		} | ||||
| 	}() | ||||
| 	var inPayload *stats.InPayload | ||||
| 	if cs.statsHandler != nil { | ||||
| 	if a.statsHandler != nil { | ||||
| 		inPayload = &stats.InPayload{ | ||||
| 			Client: true, | ||||
| 		} | ||||
| 	} | ||||
| 	if !cs.decompSet { | ||||
| 	if !a.decompSet { | ||||
| 		// Block until we receive headers containing received message encoding. | ||||
| 		if ct := cs.s.RecvCompress(); ct != "" && ct != encoding.Identity { | ||||
| 			if cs.dc == nil || cs.dc.Type() != ct { | ||||
| 		if ct := a.s.RecvCompress(); ct != "" && ct != encoding.Identity { | ||||
| 			if a.dc == nil || a.dc.Type() != ct { | ||||
| 				// No configured decompressor, or it does not match the incoming | ||||
| 				// message encoding; attempt to find a registered compressor that does. | ||||
| 				cs.dc = nil | ||||
| 				cs.decomp = encoding.GetCompressor(ct) | ||||
| 				a.dc = nil | ||||
| 				a.decomp = encoding.GetCompressor(ct) | ||||
| 			} | ||||
| 		} else { | ||||
| 			// No compression is used; disable our decompressor. | ||||
| 			cs.dc = nil | ||||
| 			a.dc = nil | ||||
| 		} | ||||
| 		// Only initialize this state once per stream. | ||||
| 		cs.decompSet = true | ||||
| 		a.decompSet = true | ||||
| 	} | ||||
| 	err = recv(cs.p, cs.codec, cs.s, cs.dc, m, *cs.c.maxReceiveMessageSize, inPayload, cs.decomp) | ||||
| 	err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.c.maxReceiveMessageSize, inPayload, a.decomp) | ||||
| 	if err != nil { | ||||
| 		if err == io.EOF { | ||||
| 			if statusErr := cs.s.Status().Err(); statusErr != nil { | ||||
| 			if statusErr := a.s.Status().Err(); statusErr != nil { | ||||
| 				return statusErr | ||||
| 			} | ||||
| 			return io.EOF // indicates successful end of stream. | ||||
| 		} | ||||
| 		return toRPCErr(err) | ||||
| 	} | ||||
| 	if cs.tracing { | ||||
| 		cs.mu.Lock() | ||||
| 		if cs.trInfo.tr != nil { | ||||
| 			cs.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true) | ||||
| 	if EnableTracing { | ||||
| 		a.mu.Lock() | ||||
| 		if a.trInfo.tr != nil { | ||||
| 			a.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true) | ||||
| 		} | ||||
| 		cs.mu.Unlock() | ||||
| 		a.mu.Unlock() | ||||
| 	} | ||||
| 	if inPayload != nil { | ||||
| 		cs.statsHandler.HandleRPC(cs.statsCtx, inPayload) | ||||
| 		a.statsHandler.HandleRPC(a.ctx, inPayload) | ||||
| 	} | ||||
| 	if cs.desc.ServerStreams { | ||||
| 		// Subsequent messages should be received by subsequent RecvMsg calls. | ||||
| @ -470,75 +529,59 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) { | ||||
|  | ||||
| 	// Special handling for non-server-stream rpcs. | ||||
| 	// This recv expects EOF or errors, so we don't collect inPayload. | ||||
| 	err = recv(cs.p, cs.codec, cs.s, cs.dc, m, *cs.c.maxReceiveMessageSize, nil, cs.decomp) | ||||
| 	err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.c.maxReceiveMessageSize, nil, a.decomp) | ||||
| 	if err == nil { | ||||
| 		return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>")) | ||||
| 	} | ||||
| 	if err == io.EOF { | ||||
| 		return cs.s.Status().Err() // non-server streaming Recv returns nil on success | ||||
| 		return a.s.Status().Err() // non-server streaming Recv returns nil on success | ||||
| 	} | ||||
| 	return toRPCErr(err) | ||||
| } | ||||
|  | ||||
| func (cs *clientStream) CloseSend() error { | ||||
| func (a *csAttempt) closeSend() { | ||||
| 	cs := a.cs | ||||
| 	if cs.sentLast { | ||||
| 		return nil | ||||
| 	} | ||||
| 	cs.sentLast = true | ||||
| 	cs.t.Write(cs.s, nil, nil, &transport.Options{Last: true}) | ||||
| 	// We ignore errors from Write and always return nil here.  Any error it | ||||
| 	// would return would also be returned by a subsequent RecvMsg call, and the | ||||
| 	// user is supposed to always finish the stream by calling RecvMsg until it | ||||
| 	// returns err != nil. | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (cs *clientStream) finish(err error) { | ||||
| 	if err == io.EOF { | ||||
| 		// Ending a stream with EOF indicates a success. | ||||
| 		err = nil | ||||
| 	} | ||||
| 	cs.mu.Lock() | ||||
| 	defer cs.mu.Unlock() | ||||
| 	if cs.finished { | ||||
| 		return | ||||
| 	} | ||||
| 	cs.finished = true | ||||
| 	cs.t.CloseStream(cs.s, err) | ||||
| 	for _, o := range cs.opts { | ||||
| 		o.after(cs.c) | ||||
| 	} | ||||
| 	if cs.done != nil { | ||||
| 		cs.done(balancer.DoneInfo{ | ||||
| 	cs.sentLast = true | ||||
| 	cs.attempt.t.Write(cs.attempt.s, nil, nil, &transport.Options{Last: true}) | ||||
| 	// We ignore errors from Write.  Any error it would return would also be | ||||
| 	// returned by a subsequent RecvMsg call, and the user is supposed to always | ||||
| 	// finish the stream by calling RecvMsg until it returns err != nil. | ||||
| } | ||||
|  | ||||
| func (a *csAttempt) finish(err error) { | ||||
| 	a.mu.Lock() | ||||
| 	a.t.CloseStream(a.s, err) | ||||
|  | ||||
| 	if a.done != nil { | ||||
| 		a.done(balancer.DoneInfo{ | ||||
| 			Err:           err, | ||||
| 			BytesSent:     true, | ||||
| 			BytesReceived: cs.s.BytesReceived(), | ||||
| 			BytesReceived: a.s.BytesReceived(), | ||||
| 		}) | ||||
| 		cs.done = nil | ||||
| 	} | ||||
| 	if cs.statsHandler != nil { | ||||
| 	if a.statsHandler != nil { | ||||
| 		end := &stats.End{ | ||||
| 			Client:    true, | ||||
| 			BeginTime: cs.beginTime, | ||||
| 			BeginTime: a.beginTime, | ||||
| 			EndTime:   time.Now(), | ||||
| 			Error:     err, | ||||
| 		} | ||||
| 		cs.statsHandler.HandleRPC(cs.statsCtx, end) | ||||
| 		a.statsHandler.HandleRPC(a.ctx, end) | ||||
| 	} | ||||
| 	cs.cancel() | ||||
| 	if !cs.tracing { | ||||
| 		return | ||||
| 	} | ||||
| 	if cs.trInfo.tr != nil { | ||||
| 	if a.trInfo.tr != nil { | ||||
| 		if err == nil { | ||||
| 			cs.trInfo.tr.LazyPrintf("RPC: [OK]") | ||||
| 			a.trInfo.tr.LazyPrintf("RPC: [OK]") | ||||
| 		} else { | ||||
| 			cs.trInfo.tr.LazyPrintf("RPC: [%v]", err) | ||||
| 			cs.trInfo.tr.SetError() | ||||
| 			a.trInfo.tr.LazyPrintf("RPC: [%v]", err) | ||||
| 			a.trInfo.tr.SetError() | ||||
| 		} | ||||
| 		cs.trInfo.tr.Finish() | ||||
| 		cs.trInfo.tr = nil | ||||
| 		a.trInfo.tr.Finish() | ||||
| 		a.trInfo.tr = nil | ||||
| 	} | ||||
| 	a.mu.Unlock() | ||||
| } | ||||
|  | ||||
| // ServerStream defines the interface a server stream has to satisfy. | ||||
|  | ||||
		Reference in New Issue
	
	Block a user
	 dfawley
					dfawley