This commit is contained in:
Menghan Li
2016-11-04 16:02:19 -07:00
parent aa5b5c7e2a
commit a4edc462a2
4 changed files with 36 additions and 36 deletions

View File

@ -80,7 +80,7 @@ func recvResponse(ctx context.Context, dopts dialOptions, t transport.ClientTran
}
}
if inPayload != nil && err == io.EOF && stream.StatusCode() == codes.OK {
// TODO in the current implementation, inTrailer may be handled before inStats in some cases.
// TODO in the current implementation, inTrailer may be handled before inPayload in some cases.
// Fix the order if necessary.
stats.Handle(ctx, inPayload)
}

View File

@ -257,7 +257,7 @@ func (p *parser) recvMsg(maxMsgSize int) (pf payloadFormat, msg []byte, err erro
// encode serializes msg and prepends the message header. If msg is nil, it
// generates the message header of 0 message length.
func encode(c Codec, msg interface{}, cp Compressor, cbuf *bytes.Buffer, outStats *stats.OutPayload) ([]byte, error) {
func encode(c Codec, msg interface{}, cp Compressor, cbuf *bytes.Buffer, outPayload *stats.OutPayload) ([]byte, error) {
var (
b []byte
length uint
@ -269,10 +269,10 @@ func encode(c Codec, msg interface{}, cp Compressor, cbuf *bytes.Buffer, outStat
if err != nil {
return nil, err
}
if outStats != nil {
outStats.Payload = msg
outStats.Data = b
outStats.Length = len(b)
if outPayload != nil {
outPayload.Payload = msg
outPayload.Data = b
outPayload.Length = len(b)
}
if cp != nil {
if err := cp.Do(cbuf, b); err != nil {
@ -304,8 +304,8 @@ func encode(c Codec, msg interface{}, cp Compressor, cbuf *bytes.Buffer, outStat
// Copy encoded msg to buf
copy(buf[5:], b)
if outStats != nil {
outStats.WireLength = len(buf)
if outPayload != nil {
outPayload.WireLength = len(buf)
}
return buf, nil
@ -324,14 +324,14 @@ func checkRecvPayload(pf payloadFormat, recvCompress string, dc Decompressor) er
return nil
}
func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{}, maxMsgSize int, inStats *stats.InPayload) error {
func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{}, maxMsgSize int, inPayload *stats.InPayload) error {
pf, d, err := p.recvMsg(maxMsgSize)
if err != nil {
return err
}
if inStats != nil {
inStats.RecvTime = time.Now()
inStats.WireLength = len(d)
if inPayload != nil {
inPayload.RecvTime = time.Now()
inPayload.WireLength = len(d)
}
if err := checkRecvPayload(pf, s.RecvCompress(), dc); err != nil {
return err
@ -350,10 +350,10 @@ func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{
if err := c.Unmarshal(d, m); err != nil {
return Errorf(codes.Internal, "grpc: failed to unmarshal the received message %v", err)
}
if inStats != nil {
inStats.Payload = m
inStats.Data = d
inStats.Length = len(d)
if inPayload != nil {
inPayload.Payload = m
inPayload.Data = d
inPayload.Length = len(d)
}
return nil
}

View File

@ -620,9 +620,9 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
p := &parser{r: stream}
for {
pf, req, err := p.recvMsg(s.opts.maxMsgSize)
var inStats *stats.InPayload
var inPayload *stats.InPayload
if stats.On() {
inStats = &stats.InPayload{
inPayload = &stats.InPayload{
RecvTime: time.Now(),
}
@ -670,8 +670,8 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
statusCode := codes.OK
statusDesc := ""
df := func(v interface{}) error {
if inStats != nil {
inStats.WireLength = len(req)
if inPayload != nil {
inPayload.WireLength = len(req)
}
if pf == compressionMade {
var err error
@ -692,11 +692,11 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
if err := s.opts.codec.Unmarshal(req, v); err != nil {
return err
}
if inStats != nil {
inStats.Payload = v
inStats.Data = req
inStats.Length = len(req)
stats.Handle(stream.Context(), inStats)
if inPayload != nil {
inPayload.Payload = v
inPayload.Data = req
inPayload.Length = len(req)
stats.Handle(stream.Context(), inPayload)
}
if trInfo != nil {
trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)

View File

@ -382,13 +382,13 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) {
stats.Handle(cs.userCtx, end)
}
}()
var inStats *stats.InPayload
var inPayload *stats.InPayload
if stats.On() {
inStats = &stats.InPayload{
inPayload = &stats.InPayload{
Client: true,
}
}
err = recv(cs.p, cs.codec, cs.s, cs.dc, m, math.MaxInt32, inStats)
err = recv(cs.p, cs.codec, cs.s, cs.dc, m, math.MaxInt32, inPayload)
defer func() {
// err != nil indicates the termination of the stream.
if err != nil {
@ -403,14 +403,14 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) {
}
cs.mu.Unlock()
}
if inStats != nil {
stats.Handle(cs.userCtx, inStats)
if inPayload != nil {
stats.Handle(cs.userCtx, inPayload)
}
if !cs.desc.ClientStreams || cs.desc.ServerStreams {
return
}
// Special handling for client streaming rpc.
// This recv expects EOF or errors, so we don't collect inStats.
// This recv expects EOF or errors, so we don't collect inPayload.
err = recv(cs.p, cs.codec, cs.s, cs.dc, m, math.MaxInt32, nil)
cs.closeTransportStream(err)
if err == nil {
@ -606,11 +606,11 @@ func (ss *serverStream) RecvMsg(m interface{}) (err error) {
ss.mu.Unlock()
}
}()
var inStats *stats.InPayload
var inPayload *stats.InPayload
if stats.On() {
inStats = &stats.InPayload{}
inPayload = &stats.InPayload{}
}
if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxMsgSize, inStats); err != nil {
if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxMsgSize, inPayload); err != nil {
if err == io.EOF {
return err
}
@ -619,8 +619,8 @@ func (ss *serverStream) RecvMsg(m interface{}) (err error) {
}
return toRPCErr(err)
}
if inStats != nil {
stats.Handle(ss.s.Context(), inStats)
if inPayload != nil {
stats.Handle(ss.s.Context(), inPayload)
}
return nil
}