diff --git a/call.go b/call.go index ebfffdd6..4efc5bdd 100644 --- a/call.go +++ b/call.go @@ -64,23 +64,24 @@ func recvResponse(dopts dialOptions, t transport.ClientTransport, c *callInfo, s return } p := &parser{r: stream} - var inStats *stats.InPayload + var inPayload *stats.InPayload if stats.On() { - inStats = &stats.InPayload{ + inPayload = &stats.InPayload{ Client: true, } } for { - if err = recv(p, dopts.codec, stream, dopts.dc, reply, math.MaxInt32, inStats); err != nil { + if err = recv(p, dopts.codec, stream, dopts.dc, reply, math.MaxInt32, inPayload); err != nil { if err == io.EOF { break } return } } - if err == io.EOF && stream.StatusCode() == codes.OK && inStats != nil { - // TODO in the current implementation, inTrailer is handled before inStats. Fix the order if necessary. - stats.Handle(stream.Context(), inStats) + if inPayload != nil && err == io.EOF && stream.StatusCode() == codes.OK { + // TODO in the current implementation, inTrailer may be handled before inStats in some cases. + // Fix the order if necessary. + stats.Handle(stream.Context(), inPayload) } c.trailerMD = stream.Trailer() return nil @@ -101,25 +102,25 @@ func sendRequest(ctx context.Context, codec Codec, compressor Compressor, callHd } }() var ( - cbuf *bytes.Buffer - outStats *stats.OutPayload + cbuf *bytes.Buffer + outPayload *stats.OutPayload ) if compressor != nil { cbuf = new(bytes.Buffer) } if stats.On() { - outStats = &stats.OutPayload{ + outPayload = &stats.OutPayload{ Client: true, } } - outBuf, err := encode(codec, args, compressor, cbuf, outStats) + outBuf, err := encode(codec, args, compressor, cbuf, outPayload) if err != nil { return nil, Errorf(codes.Internal, "grpc: %v", err) } err = t.Write(stream, outBuf, opts) - if outStats != nil { - outStats.SentTime = time.Now() - stats.Handle(stream.Context(), outStats) + if outPayload != nil { + outPayload.SentTime = time.Now() + stats.Handle(stream.Context(), outPayload) } // t.NewStream(...) could lead to an early rejection of the RPC (e.g., the service/method // does not exist.) so that t.Write could get io.EOF from wait(...). Leave the following diff --git a/stats/stats.go b/stats/stats.go index e7094c1c..7b8ee123 100644 --- a/stats/stats.go +++ b/stats/stats.go @@ -187,10 +187,10 @@ var ( // On indicates whether stats is started. func On() bool { - return atomic.LoadInt32(on) == 1 + return atomic.CompareAndSwapInt32(on, 1, 1) } -// Handle returns the call back function registered by user to process the stats. +// Handle processes the stats using the call back function registered by user. func Handle(ctx context.Context, s RPCStats) { handler(ctx, s) } @@ -202,8 +202,7 @@ func RegisterHandler(f func(context.Context, RPCStats)) { handler = f } -// Start starts the stats collection. -// Stats will only be started if handler is not nil. +// Start starts the stats collection and reporting if there is a registered stats handle. func Start() { if handler == nil { grpclog.Println("handler is nil when starting stats. Stats is not started") @@ -212,8 +211,8 @@ func Start() { atomic.StoreInt32(on, 1) } -// Stop stops the collection of any further stats. -// Stop won't unregister handler. +// Stop stops the stats collection and processing. +// Stop does not unregister handler. func Stop() { atomic.StoreInt32(on, 0) }