diff --git a/call.go b/call.go index 7f1345b2..d7ceeb2c 100644 --- a/call.go +++ b/call.go @@ -80,7 +80,7 @@ func recvResponse(ctx context.Context, dopts dialOptions, t transport.ClientTran } } if inPayload != nil && err == io.EOF && stream.StatusCode() == codes.OK { - // TODO in the current implementation, inTrailer may be handled before inStats in some cases. + // TODO in the current implementation, inTrailer may be handled before inPayload in some cases. // Fix the order if necessary. stats.Handle(ctx, inPayload) } diff --git a/rpc_util.go b/rpc_util.go index 909aba2b..06b8ee00 100644 --- a/rpc_util.go +++ b/rpc_util.go @@ -257,7 +257,7 @@ func (p *parser) recvMsg(maxMsgSize int) (pf payloadFormat, msg []byte, err erro // encode serializes msg and prepends the message header. If msg is nil, it // generates the message header of 0 message length. -func encode(c Codec, msg interface{}, cp Compressor, cbuf *bytes.Buffer, outStats *stats.OutPayload) ([]byte, error) { +func encode(c Codec, msg interface{}, cp Compressor, cbuf *bytes.Buffer, outPayload *stats.OutPayload) ([]byte, error) { var ( b []byte length uint @@ -269,10 +269,10 @@ func encode(c Codec, msg interface{}, cp Compressor, cbuf *bytes.Buffer, outStat if err != nil { return nil, err } - if outStats != nil { - outStats.Payload = msg - outStats.Data = b - outStats.Length = len(b) + if outPayload != nil { + outPayload.Payload = msg + outPayload.Data = b + outPayload.Length = len(b) } if cp != nil { if err := cp.Do(cbuf, b); err != nil { @@ -304,8 +304,8 @@ func encode(c Codec, msg interface{}, cp Compressor, cbuf *bytes.Buffer, outStat // Copy encoded msg to buf copy(buf[5:], b) - if outStats != nil { - outStats.WireLength = len(buf) + if outPayload != nil { + outPayload.WireLength = len(buf) } return buf, nil @@ -324,14 +324,14 @@ func checkRecvPayload(pf payloadFormat, recvCompress string, dc Decompressor) er return nil } -func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{}, maxMsgSize int, inStats *stats.InPayload) error { +func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{}, maxMsgSize int, inPayload *stats.InPayload) error { pf, d, err := p.recvMsg(maxMsgSize) if err != nil { return err } - if inStats != nil { - inStats.RecvTime = time.Now() - inStats.WireLength = len(d) + if inPayload != nil { + inPayload.RecvTime = time.Now() + inPayload.WireLength = len(d) } if err := checkRecvPayload(pf, s.RecvCompress(), dc); err != nil { return err @@ -350,10 +350,10 @@ func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{ if err := c.Unmarshal(d, m); err != nil { return Errorf(codes.Internal, "grpc: failed to unmarshal the received message %v", err) } - if inStats != nil { - inStats.Payload = m - inStats.Data = d - inStats.Length = len(d) + if inPayload != nil { + inPayload.Payload = m + inPayload.Data = d + inPayload.Length = len(d) } return nil } diff --git a/server.go b/server.go index 3c4eb067..67f898f2 100644 --- a/server.go +++ b/server.go @@ -620,9 +620,9 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. p := &parser{r: stream} for { pf, req, err := p.recvMsg(s.opts.maxMsgSize) - var inStats *stats.InPayload + var inPayload *stats.InPayload if stats.On() { - inStats = &stats.InPayload{ + inPayload = &stats.InPayload{ RecvTime: time.Now(), } @@ -670,8 +670,8 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. statusCode := codes.OK statusDesc := "" df := func(v interface{}) error { - if inStats != nil { - inStats.WireLength = len(req) + if inPayload != nil { + inPayload.WireLength = len(req) } if pf == compressionMade { var err error @@ -692,11 +692,11 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. if err := s.opts.codec.Unmarshal(req, v); err != nil { return err } - if inStats != nil { - inStats.Payload = v - inStats.Data = req - inStats.Length = len(req) - stats.Handle(stream.Context(), inStats) + if inPayload != nil { + inPayload.Payload = v + inPayload.Data = req + inPayload.Length = len(req) + stats.Handle(stream.Context(), inPayload) } if trInfo != nil { trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true) diff --git a/stream.go b/stream.go index c3838e0b..9a4bd232 100644 --- a/stream.go +++ b/stream.go @@ -382,13 +382,13 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) { stats.Handle(cs.userCtx, end) } }() - var inStats *stats.InPayload + var inPayload *stats.InPayload if stats.On() { - inStats = &stats.InPayload{ + inPayload = &stats.InPayload{ Client: true, } } - err = recv(cs.p, cs.codec, cs.s, cs.dc, m, math.MaxInt32, inStats) + err = recv(cs.p, cs.codec, cs.s, cs.dc, m, math.MaxInt32, inPayload) defer func() { // err != nil indicates the termination of the stream. if err != nil { @@ -403,14 +403,14 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) { } cs.mu.Unlock() } - if inStats != nil { - stats.Handle(cs.userCtx, inStats) + if inPayload != nil { + stats.Handle(cs.userCtx, inPayload) } if !cs.desc.ClientStreams || cs.desc.ServerStreams { return } // Special handling for client streaming rpc. - // This recv expects EOF or errors, so we don't collect inStats. + // This recv expects EOF or errors, so we don't collect inPayload. err = recv(cs.p, cs.codec, cs.s, cs.dc, m, math.MaxInt32, nil) cs.closeTransportStream(err) if err == nil { @@ -606,11 +606,11 @@ func (ss *serverStream) RecvMsg(m interface{}) (err error) { ss.mu.Unlock() } }() - var inStats *stats.InPayload + var inPayload *stats.InPayload if stats.On() { - inStats = &stats.InPayload{} + inPayload = &stats.InPayload{} } - if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxMsgSize, inStats); err != nil { + if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxMsgSize, inPayload); err != nil { if err == io.EOF { return err } @@ -619,8 +619,8 @@ func (ss *serverStream) RecvMsg(m interface{}) (err error) { } return toRPCErr(err) } - if inStats != nil { - stats.Handle(ss.s.Context(), inStats) + if inPayload != nil { + stats.Handle(ss.s.Context(), inPayload) } return nil }