fix review comments
This commit is contained in:
25
call.go
25
call.go
@ -64,23 +64,24 @@ func recvResponse(dopts dialOptions, t transport.ClientTransport, c *callInfo, s
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
p := &parser{r: stream}
|
p := &parser{r: stream}
|
||||||
var inStats *stats.InPayload
|
var inPayload *stats.InPayload
|
||||||
if stats.On() {
|
if stats.On() {
|
||||||
inStats = &stats.InPayload{
|
inPayload = &stats.InPayload{
|
||||||
Client: true,
|
Client: true,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for {
|
for {
|
||||||
if err = recv(p, dopts.codec, stream, dopts.dc, reply, math.MaxInt32, inStats); err != nil {
|
if err = recv(p, dopts.codec, stream, dopts.dc, reply, math.MaxInt32, inPayload); err != nil {
|
||||||
if err == io.EOF {
|
if err == io.EOF {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if err == io.EOF && stream.StatusCode() == codes.OK && inStats != nil {
|
if inPayload != nil && err == io.EOF && stream.StatusCode() == codes.OK {
|
||||||
// TODO in the current implementation, inTrailer is handled before inStats. Fix the order if necessary.
|
// TODO in the current implementation, inTrailer may be handled before inStats in some cases.
|
||||||
stats.Handle(stream.Context(), inStats)
|
// Fix the order if necessary.
|
||||||
|
stats.Handle(stream.Context(), inPayload)
|
||||||
}
|
}
|
||||||
c.trailerMD = stream.Trailer()
|
c.trailerMD = stream.Trailer()
|
||||||
return nil
|
return nil
|
||||||
@ -102,24 +103,24 @@ func sendRequest(ctx context.Context, codec Codec, compressor Compressor, callHd
|
|||||||
}()
|
}()
|
||||||
var (
|
var (
|
||||||
cbuf *bytes.Buffer
|
cbuf *bytes.Buffer
|
||||||
outStats *stats.OutPayload
|
outPayload *stats.OutPayload
|
||||||
)
|
)
|
||||||
if compressor != nil {
|
if compressor != nil {
|
||||||
cbuf = new(bytes.Buffer)
|
cbuf = new(bytes.Buffer)
|
||||||
}
|
}
|
||||||
if stats.On() {
|
if stats.On() {
|
||||||
outStats = &stats.OutPayload{
|
outPayload = &stats.OutPayload{
|
||||||
Client: true,
|
Client: true,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
outBuf, err := encode(codec, args, compressor, cbuf, outStats)
|
outBuf, err := encode(codec, args, compressor, cbuf, outPayload)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, Errorf(codes.Internal, "grpc: %v", err)
|
return nil, Errorf(codes.Internal, "grpc: %v", err)
|
||||||
}
|
}
|
||||||
err = t.Write(stream, outBuf, opts)
|
err = t.Write(stream, outBuf, opts)
|
||||||
if outStats != nil {
|
if outPayload != nil {
|
||||||
outStats.SentTime = time.Now()
|
outPayload.SentTime = time.Now()
|
||||||
stats.Handle(stream.Context(), outStats)
|
stats.Handle(stream.Context(), outPayload)
|
||||||
}
|
}
|
||||||
// t.NewStream(...) could lead to an early rejection of the RPC (e.g., the service/method
|
// t.NewStream(...) could lead to an early rejection of the RPC (e.g., the service/method
|
||||||
// does not exist.) so that t.Write could get io.EOF from wait(...). Leave the following
|
// does not exist.) so that t.Write could get io.EOF from wait(...). Leave the following
|
||||||
|
|||||||
@ -187,10 +187,10 @@ var (
|
|||||||
|
|
||||||
// On indicates whether stats is started.
|
// On indicates whether stats is started.
|
||||||
func On() bool {
|
func On() bool {
|
||||||
return atomic.LoadInt32(on) == 1
|
return atomic.CompareAndSwapInt32(on, 1, 1)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Handle returns the call back function registered by user to process the stats.
|
// Handle processes the stats using the call back function registered by user.
|
||||||
func Handle(ctx context.Context, s RPCStats) {
|
func Handle(ctx context.Context, s RPCStats) {
|
||||||
handler(ctx, s)
|
handler(ctx, s)
|
||||||
}
|
}
|
||||||
@ -202,8 +202,7 @@ func RegisterHandler(f func(context.Context, RPCStats)) {
|
|||||||
handler = f
|
handler = f
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start starts the stats collection.
|
// Start starts the stats collection and reporting if there is a registered stats handle.
|
||||||
// Stats will only be started if handler is not nil.
|
|
||||||
func Start() {
|
func Start() {
|
||||||
if handler == nil {
|
if handler == nil {
|
||||||
grpclog.Println("handler is nil when starting stats. Stats is not started")
|
grpclog.Println("handler is nil when starting stats. Stats is not started")
|
||||||
@ -212,8 +211,8 @@ func Start() {
|
|||||||
atomic.StoreInt32(on, 1)
|
atomic.StoreInt32(on, 1)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stop stops the collection of any further stats.
|
// Stop stops the stats collection and processing.
|
||||||
// Stop won't unregister handler.
|
// Stop does not unregister handler.
|
||||||
func Stop() {
|
func Stop() {
|
||||||
atomic.StoreInt32(on, 0)
|
atomic.StoreInt32(on, 0)
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user