Merge pull request #961 from menghanl/add_stats_package
Add package stats, and export stats.
This commit is contained in:
59
call.go
59
call.go
@ -42,6 +42,7 @@ import (
|
||||
"golang.org/x/net/context"
|
||||
"golang.org/x/net/trace"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/stats"
|
||||
"google.golang.org/grpc/transport"
|
||||
)
|
||||
|
||||
@ -49,7 +50,8 @@ import (
|
||||
// On error, it returns the error and indicates whether the call should be retried.
|
||||
//
|
||||
// TODO(zhaoq): Check whether the received message sequence is valid.
|
||||
func recvResponse(dopts dialOptions, t transport.ClientTransport, c *callInfo, stream *transport.Stream, reply interface{}) (err error) {
|
||||
// TODO ctx is used for stats collection and processing. It is the context passed from the application.
|
||||
func recvResponse(ctx context.Context, dopts dialOptions, t transport.ClientTransport, c *callInfo, stream *transport.Stream, reply interface{}) (err error) {
|
||||
// Try to acquire header metadata from the server if there is any.
|
||||
defer func() {
|
||||
if err != nil {
|
||||
@ -63,14 +65,25 @@ func recvResponse(dopts dialOptions, t transport.ClientTransport, c *callInfo, s
|
||||
return
|
||||
}
|
||||
p := &parser{r: stream}
|
||||
var inPayload *stats.InPayload
|
||||
if stats.On() {
|
||||
inPayload = &stats.InPayload{
|
||||
Client: true,
|
||||
}
|
||||
}
|
||||
for {
|
||||
if err = recv(p, dopts.codec, stream, dopts.dc, reply, math.MaxInt32); err != nil {
|
||||
if err = recv(p, dopts.codec, stream, dopts.dc, reply, math.MaxInt32, inPayload); err != nil {
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
if inPayload != nil && err == io.EOF && stream.StatusCode() == codes.OK {
|
||||
// TODO in the current implementation, inTrailer may be handled before inPayload in some cases.
|
||||
// Fix the order if necessary.
|
||||
stats.Handle(ctx, inPayload)
|
||||
}
|
||||
c.trailerMD = stream.Trailer()
|
||||
return nil
|
||||
}
|
||||
@ -89,15 +102,27 @@ func sendRequest(ctx context.Context, codec Codec, compressor Compressor, callHd
|
||||
}
|
||||
}
|
||||
}()
|
||||
var cbuf *bytes.Buffer
|
||||
var (
|
||||
cbuf *bytes.Buffer
|
||||
outPayload *stats.OutPayload
|
||||
)
|
||||
if compressor != nil {
|
||||
cbuf = new(bytes.Buffer)
|
||||
}
|
||||
outBuf, err := encode(codec, args, compressor, cbuf)
|
||||
if stats.On() {
|
||||
outPayload = &stats.OutPayload{
|
||||
Client: true,
|
||||
}
|
||||
}
|
||||
outBuf, err := encode(codec, args, compressor, cbuf, outPayload)
|
||||
if err != nil {
|
||||
return nil, Errorf(codes.Internal, "grpc: %v", err)
|
||||
}
|
||||
err = t.Write(stream, outBuf, opts)
|
||||
if err == nil && outPayload != nil {
|
||||
outPayload.SentTime = time.Now()
|
||||
stats.Handle(ctx, outPayload)
|
||||
}
|
||||
// t.NewStream(...) could lead to an early rejection of the RPC (e.g., the service/method
|
||||
// does not exist.) so that t.Write could get io.EOF from wait(...). Leave the following
|
||||
// recvResponse to get the final status.
|
||||
@ -118,7 +143,7 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
|
||||
return invoke(ctx, method, args, reply, cc, opts...)
|
||||
}
|
||||
|
||||
func invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) (err error) {
|
||||
func invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) (e error) {
|
||||
c := defaultCallInfo
|
||||
for _, o := range opts {
|
||||
if err := o.before(&c); err != nil {
|
||||
@ -140,12 +165,30 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
|
||||
c.traceInfo.tr.LazyLog(&c.traceInfo.firstLine, false)
|
||||
// TODO(dsymonds): Arrange for c.traceInfo.firstLine.remoteAddr to be set.
|
||||
defer func() {
|
||||
if err != nil {
|
||||
c.traceInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
|
||||
if e != nil {
|
||||
c.traceInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{e}}, true)
|
||||
c.traceInfo.tr.SetError()
|
||||
}
|
||||
}()
|
||||
}
|
||||
if stats.On() {
|
||||
begin := &stats.Begin{
|
||||
Client: true,
|
||||
BeginTime: time.Now(),
|
||||
FailFast: c.failFast,
|
||||
}
|
||||
stats.Handle(ctx, begin)
|
||||
}
|
||||
defer func() {
|
||||
if stats.On() {
|
||||
end := &stats.End{
|
||||
Client: true,
|
||||
EndTime: time.Now(),
|
||||
Error: e,
|
||||
}
|
||||
stats.Handle(ctx, end)
|
||||
}
|
||||
}()
|
||||
topts := &transport.Options{
|
||||
Last: true,
|
||||
Delay: false,
|
||||
@ -205,7 +248,7 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
|
||||
}
|
||||
return toRPCErr(err)
|
||||
}
|
||||
err = recvResponse(cc.dopts, t, &c, stream, reply)
|
||||
err = recvResponse(ctx, cc.dopts, t, &c, stream, reply)
|
||||
if err != nil {
|
||||
if put != nil {
|
||||
put()
|
||||
|
@ -118,7 +118,7 @@ func (h *testStreamHandler) handleStream(t *testing.T, s *transport.Stream) {
|
||||
}
|
||||
}
|
||||
// send a response back to end the stream.
|
||||
reply, err := encode(testCodec{}, &expectedResponse, nil, nil)
|
||||
reply, err := encode(testCodec{}, &expectedResponse, nil, nil, nil)
|
||||
if err != nil {
|
||||
t.Errorf("Failed to encode the response: %v", err)
|
||||
return
|
||||
@ -185,6 +185,8 @@ func (s *server) start(t *testing.T, port int, maxStreams uint32) {
|
||||
}
|
||||
go st.HandleStreams(func(s *transport.Stream) {
|
||||
go h.handleStream(t, s)
|
||||
}, func(ctx context.Context, method string) context.Context {
|
||||
return ctx
|
||||
})
|
||||
}
|
||||
}
|
||||
|
32
rpc_util.go
32
rpc_util.go
@ -42,11 +42,13 @@ import (
|
||||
"io/ioutil"
|
||||
"math"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
"golang.org/x/net/context"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"google.golang.org/grpc/stats"
|
||||
"google.golang.org/grpc/transport"
|
||||
)
|
||||
|
||||
@ -255,9 +257,11 @@ func (p *parser) recvMsg(maxMsgSize int) (pf payloadFormat, msg []byte, err erro
|
||||
|
||||
// encode serializes msg and prepends the message header. If msg is nil, it
|
||||
// generates the message header of 0 message length.
|
||||
func encode(c Codec, msg interface{}, cp Compressor, cbuf *bytes.Buffer) ([]byte, error) {
|
||||
var b []byte
|
||||
var length uint
|
||||
func encode(c Codec, msg interface{}, cp Compressor, cbuf *bytes.Buffer, outPayload *stats.OutPayload) ([]byte, error) {
|
||||
var (
|
||||
b []byte
|
||||
length uint
|
||||
)
|
||||
if msg != nil {
|
||||
var err error
|
||||
// TODO(zhaoq): optimize to reduce memory alloc and copying.
|
||||
@ -265,6 +269,12 @@ func encode(c Codec, msg interface{}, cp Compressor, cbuf *bytes.Buffer) ([]byte
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if outPayload != nil {
|
||||
outPayload.Payload = msg
|
||||
// TODO truncate large payload.
|
||||
outPayload.Data = b
|
||||
outPayload.Length = len(b)
|
||||
}
|
||||
if cp != nil {
|
||||
if err := cp.Do(cbuf, b); err != nil {
|
||||
return nil, err
|
||||
@ -295,6 +305,10 @@ func encode(c Codec, msg interface{}, cp Compressor, cbuf *bytes.Buffer) ([]byte
|
||||
// Copy encoded msg to buf
|
||||
copy(buf[5:], b)
|
||||
|
||||
if outPayload != nil {
|
||||
outPayload.WireLength = len(buf)
|
||||
}
|
||||
|
||||
return buf, nil
|
||||
}
|
||||
|
||||
@ -311,11 +325,14 @@ func checkRecvPayload(pf payloadFormat, recvCompress string, dc Decompressor) er
|
||||
return nil
|
||||
}
|
||||
|
||||
func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{}, maxMsgSize int) error {
|
||||
func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{}, maxMsgSize int, inPayload *stats.InPayload) error {
|
||||
pf, d, err := p.recvMsg(maxMsgSize)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if inPayload != nil {
|
||||
inPayload.WireLength = len(d)
|
||||
}
|
||||
if err := checkRecvPayload(pf, s.RecvCompress(), dc); err != nil {
|
||||
return err
|
||||
}
|
||||
@ -333,6 +350,13 @@ func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{
|
||||
if err := c.Unmarshal(d, m); err != nil {
|
||||
return Errorf(codes.Internal, "grpc: failed to unmarshal the received message %v", err)
|
||||
}
|
||||
if inPayload != nil {
|
||||
inPayload.RecvTime = time.Now()
|
||||
inPayload.Payload = m
|
||||
// TODO truncate large payload.
|
||||
inPayload.Data = d
|
||||
inPayload.Length = len(d)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -114,7 +114,7 @@ func TestEncode(t *testing.T) {
|
||||
}{
|
||||
{nil, nil, []byte{0, 0, 0, 0, 0}, nil},
|
||||
} {
|
||||
b, err := encode(protoCodec{}, test.msg, nil, nil)
|
||||
b, err := encode(protoCodec{}, test.msg, nil, nil, nil)
|
||||
if err != test.err || !bytes.Equal(b, test.b) {
|
||||
t.Fatalf("encode(_, _, %v, _) = %v, %v\nwant %v, %v", test.cp, b, err, test.b, test.err)
|
||||
}
|
||||
@ -199,12 +199,12 @@ func TestErrorsWithSameParameters(t *testing.T) {
|
||||
// bytes.
|
||||
func bmEncode(b *testing.B, mSize int) {
|
||||
msg := &perfpb.Buffer{Body: make([]byte, mSize)}
|
||||
encoded, _ := encode(protoCodec{}, msg, nil, nil)
|
||||
encoded, _ := encode(protoCodec{}, msg, nil, nil, nil)
|
||||
encodedSz := int64(len(encoded))
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
encode(protoCodec{}, msg, nil, nil)
|
||||
encode(protoCodec{}, msg, nil, nil, nil)
|
||||
}
|
||||
b.SetBytes(encodedSz)
|
||||
}
|
||||
|
127
server.go
127
server.go
@ -54,6 +54,7 @@ import (
|
||||
"google.golang.org/grpc/grpclog"
|
||||
"google.golang.org/grpc/internal"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"google.golang.org/grpc/stats"
|
||||
"google.golang.org/grpc/tap"
|
||||
"google.golang.org/grpc/transport"
|
||||
)
|
||||
@ -466,6 +467,12 @@ func (s *Server) serveStreams(st transport.ServerTransport) {
|
||||
defer wg.Done()
|
||||
s.handleStream(st, stream, s.traceInfo(st, stream))
|
||||
}()
|
||||
}, func(ctx context.Context, method string) context.Context {
|
||||
if !EnableTracing {
|
||||
return ctx
|
||||
}
|
||||
tr := trace.New("grpc.Recv."+methodFamily(method), method)
|
||||
return trace.NewContext(ctx, tr)
|
||||
})
|
||||
wg.Wait()
|
||||
}
|
||||
@ -515,15 +522,17 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
// traceInfo returns a traceInfo and associates it with stream, if tracing is enabled.
|
||||
// If tracing is not enabled, it returns nil.
|
||||
func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) {
|
||||
if !EnableTracing {
|
||||
tr, ok := trace.FromContext(stream.Context())
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
trInfo = &traceInfo{
|
||||
tr: trace.New("grpc.Recv."+methodFamily(stream.Method()), stream.Method()),
|
||||
tr: tr,
|
||||
}
|
||||
trInfo.firstLine.client = false
|
||||
trInfo.firstLine.remoteAddr = st.RemoteAddr()
|
||||
stream.TraceContext(trInfo.tr)
|
||||
|
||||
if dl, ok := stream.Context().Deadline(); ok {
|
||||
trInfo.firstLine.deadline = dl.Sub(time.Now())
|
||||
}
|
||||
@ -550,11 +559,17 @@ func (s *Server) removeConn(c io.Closer) {
|
||||
}
|
||||
|
||||
func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options) error {
|
||||
var cbuf *bytes.Buffer
|
||||
var (
|
||||
cbuf *bytes.Buffer
|
||||
outPayload *stats.OutPayload
|
||||
)
|
||||
if cp != nil {
|
||||
cbuf = new(bytes.Buffer)
|
||||
}
|
||||
p, err := encode(s.opts.codec, msg, cp, cbuf)
|
||||
if stats.On() {
|
||||
outPayload = &stats.OutPayload{}
|
||||
}
|
||||
p, err := encode(s.opts.codec, msg, cp, cbuf, outPayload)
|
||||
if err != nil {
|
||||
// This typically indicates a fatal issue (e.g., memory
|
||||
// corruption or hardware faults) the application program
|
||||
@ -565,10 +580,32 @@ func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Str
|
||||
// the optimal option.
|
||||
grpclog.Fatalf("grpc: Server failed to encode response %v", err)
|
||||
}
|
||||
return t.Write(stream, p, opts)
|
||||
err = t.Write(stream, p, opts)
|
||||
if err == nil && outPayload != nil {
|
||||
outPayload.SentTime = time.Now()
|
||||
stats.Handle(stream.Context(), outPayload)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) {
|
||||
if stats.On() {
|
||||
begin := &stats.Begin{
|
||||
BeginTime: time.Now(),
|
||||
}
|
||||
stats.Handle(stream.Context(), begin)
|
||||
}
|
||||
defer func() {
|
||||
if stats.On() {
|
||||
end := &stats.End{
|
||||
EndTime: time.Now(),
|
||||
}
|
||||
if err != nil && err != io.EOF {
|
||||
end.Error = toRPCErr(err)
|
||||
}
|
||||
stats.Handle(stream.Context(), end)
|
||||
}
|
||||
}()
|
||||
if trInfo != nil {
|
||||
defer trInfo.tr.Finish()
|
||||
trInfo.firstLine.client = false
|
||||
@ -597,14 +634,14 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
|
||||
if err != nil {
|
||||
switch err := err.(type) {
|
||||
case *rpcError:
|
||||
if err := t.WriteStatus(stream, err.code, err.desc); err != nil {
|
||||
grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", err)
|
||||
if e := t.WriteStatus(stream, err.code, err.desc); e != nil {
|
||||
grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
|
||||
}
|
||||
case transport.ConnectionError:
|
||||
// Nothing to do here.
|
||||
case transport.StreamError:
|
||||
if err := t.WriteStatus(stream, err.Code, err.Desc); err != nil {
|
||||
grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", err)
|
||||
if e := t.WriteStatus(stream, err.Code, err.Desc); e != nil {
|
||||
grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
|
||||
}
|
||||
default:
|
||||
panic(fmt.Sprintf("grpc: Unexpected error (%T) from recvMsg: %v", err, err))
|
||||
@ -615,20 +652,29 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
|
||||
if err := checkRecvPayload(pf, stream.RecvCompress(), s.opts.dc); err != nil {
|
||||
switch err := err.(type) {
|
||||
case *rpcError:
|
||||
if err := t.WriteStatus(stream, err.code, err.desc); err != nil {
|
||||
grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", err)
|
||||
if e := t.WriteStatus(stream, err.code, err.desc); e != nil {
|
||||
grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
|
||||
}
|
||||
return err
|
||||
default:
|
||||
if err := t.WriteStatus(stream, codes.Internal, err.Error()); err != nil {
|
||||
grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", err)
|
||||
if e := t.WriteStatus(stream, codes.Internal, err.Error()); e != nil {
|
||||
grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
|
||||
}
|
||||
|
||||
// TODO checkRecvPayload always return RPC error. Add a return here if necessary.
|
||||
}
|
||||
}
|
||||
var inPayload *stats.InPayload
|
||||
if stats.On() {
|
||||
inPayload = &stats.InPayload{
|
||||
RecvTime: time.Now(),
|
||||
}
|
||||
return err
|
||||
}
|
||||
statusCode := codes.OK
|
||||
statusDesc := ""
|
||||
df := func(v interface{}) error {
|
||||
if inPayload != nil {
|
||||
inPayload.WireLength = len(req)
|
||||
}
|
||||
if pf == compressionMade {
|
||||
var err error
|
||||
req, err = s.opts.dc.Do(bytes.NewReader(req))
|
||||
@ -636,7 +682,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
|
||||
if err := t.WriteStatus(stream, codes.Internal, err.Error()); err != nil {
|
||||
grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", err)
|
||||
}
|
||||
return err
|
||||
return Errorf(codes.Internal, err.Error())
|
||||
}
|
||||
}
|
||||
if len(req) > s.opts.maxMsgSize {
|
||||
@ -648,6 +694,12 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
|
||||
if err := s.opts.codec.Unmarshal(req, v); err != nil {
|
||||
return err
|
||||
}
|
||||
if inPayload != nil {
|
||||
inPayload.Payload = v
|
||||
inPayload.Data = req
|
||||
inPayload.Length = len(req)
|
||||
stats.Handle(stream.Context(), inPayload)
|
||||
}
|
||||
if trInfo != nil {
|
||||
trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
|
||||
}
|
||||
@ -668,9 +720,8 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
|
||||
}
|
||||
if err := t.WriteStatus(stream, statusCode, statusDesc); err != nil {
|
||||
grpclog.Printf("grpc: Server.processUnaryRPC failed to write status: %v", err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
return Errorf(statusCode, statusDesc)
|
||||
}
|
||||
if trInfo != nil {
|
||||
trInfo.tr.LazyLog(stringer("OK"), false)
|
||||
@ -695,11 +746,32 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
|
||||
if trInfo != nil {
|
||||
trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
|
||||
}
|
||||
return t.WriteStatus(stream, statusCode, statusDesc)
|
||||
errWrite := t.WriteStatus(stream, statusCode, statusDesc)
|
||||
if statusCode != codes.OK {
|
||||
return Errorf(statusCode, statusDesc)
|
||||
}
|
||||
return errWrite
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) {
|
||||
if stats.On() {
|
||||
begin := &stats.Begin{
|
||||
BeginTime: time.Now(),
|
||||
}
|
||||
stats.Handle(stream.Context(), begin)
|
||||
}
|
||||
defer func() {
|
||||
if stats.On() {
|
||||
end := &stats.End{
|
||||
EndTime: time.Now(),
|
||||
}
|
||||
if err != nil && err != io.EOF {
|
||||
end.Error = toRPCErr(err)
|
||||
}
|
||||
stats.Handle(stream.Context(), end)
|
||||
}
|
||||
}()
|
||||
if s.opts.cp != nil {
|
||||
stream.SetSendCompress(s.opts.cp.Type())
|
||||
}
|
||||
@ -762,7 +834,11 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
|
||||
}
|
||||
ss.mu.Unlock()
|
||||
}
|
||||
return t.WriteStatus(ss.s, ss.statusCode, ss.statusDesc)
|
||||
errWrite := t.WriteStatus(ss.s, ss.statusCode, ss.statusDesc)
|
||||
if ss.statusCode != codes.OK {
|
||||
return Errorf(ss.statusCode, ss.statusDesc)
|
||||
}
|
||||
return errWrite
|
||||
|
||||
}
|
||||
|
||||
@ -777,7 +853,8 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str
|
||||
trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true)
|
||||
trInfo.tr.SetError()
|
||||
}
|
||||
if err := t.WriteStatus(stream, codes.InvalidArgument, fmt.Sprintf("malformed method name: %q", stream.Method())); err != nil {
|
||||
errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
|
||||
if err := t.WriteStatus(stream, codes.InvalidArgument, errDesc); err != nil {
|
||||
if trInfo != nil {
|
||||
trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
|
||||
trInfo.tr.SetError()
|
||||
@ -797,7 +874,8 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str
|
||||
trInfo.tr.LazyLog(&fmtStringer{"Unknown service %v", []interface{}{service}}, true)
|
||||
trInfo.tr.SetError()
|
||||
}
|
||||
if err := t.WriteStatus(stream, codes.Unimplemented, fmt.Sprintf("unknown service %v", service)); err != nil {
|
||||
errDesc := fmt.Sprintf("unknown service %v", service)
|
||||
if err := t.WriteStatus(stream, codes.Unimplemented, errDesc); err != nil {
|
||||
if trInfo != nil {
|
||||
trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
|
||||
trInfo.tr.SetError()
|
||||
@ -822,7 +900,8 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str
|
||||
trInfo.tr.LazyLog(&fmtStringer{"Unknown method %v", []interface{}{method}}, true)
|
||||
trInfo.tr.SetError()
|
||||
}
|
||||
if err := t.WriteStatus(stream, codes.Unimplemented, fmt.Sprintf("unknown method %v", method)); err != nil {
|
||||
errDesc := fmt.Sprintf("unknown method %v", method)
|
||||
if err := t.WriteStatus(stream, codes.Unimplemented, errDesc); err != nil {
|
||||
if trInfo != nil {
|
||||
trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
|
||||
trInfo.tr.SetError()
|
||||
|
225
stats/grpc_testing/test.pb.go
Normal file
225
stats/grpc_testing/test.pb.go
Normal file
@ -0,0 +1,225 @@
|
||||
// Code generated by protoc-gen-go.
|
||||
// source: test.proto
|
||||
// DO NOT EDIT!
|
||||
|
||||
/*
|
||||
Package grpc_testing is a generated protocol buffer package.
|
||||
|
||||
It is generated from these files:
|
||||
test.proto
|
||||
|
||||
It has these top-level messages:
|
||||
SimpleRequest
|
||||
SimpleResponse
|
||||
*/
|
||||
package grpc_testing
|
||||
|
||||
import proto "github.com/golang/protobuf/proto"
|
||||
import fmt "fmt"
|
||||
import math "math"
|
||||
|
||||
import (
|
||||
context "golang.org/x/net/context"
|
||||
grpc "google.golang.org/grpc"
|
||||
)
|
||||
|
||||
// Reference imports to suppress errors if they are not otherwise used.
|
||||
var _ = proto.Marshal
|
||||
var _ = fmt.Errorf
|
||||
var _ = math.Inf
|
||||
|
||||
// This is a compile-time assertion to ensure that this generated file
|
||||
// is compatible with the proto package it is being compiled against.
|
||||
// A compilation error at this line likely means your copy of the
|
||||
// proto package needs to be updated.
|
||||
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
|
||||
|
||||
// Unary request.
|
||||
type SimpleRequest struct {
|
||||
Id int32 `protobuf:"varint,2,opt,name=id" json:"id,omitempty"`
|
||||
}
|
||||
|
||||
func (m *SimpleRequest) Reset() { *m = SimpleRequest{} }
|
||||
func (m *SimpleRequest) String() string { return proto.CompactTextString(m) }
|
||||
func (*SimpleRequest) ProtoMessage() {}
|
||||
func (*SimpleRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
|
||||
|
||||
// Unary response, as configured by the request.
|
||||
type SimpleResponse struct {
|
||||
Id int32 `protobuf:"varint,3,opt,name=id" json:"id,omitempty"`
|
||||
}
|
||||
|
||||
func (m *SimpleResponse) Reset() { *m = SimpleResponse{} }
|
||||
func (m *SimpleResponse) String() string { return proto.CompactTextString(m) }
|
||||
func (*SimpleResponse) ProtoMessage() {}
|
||||
func (*SimpleResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} }
|
||||
|
||||
func init() {
|
||||
proto.RegisterType((*SimpleRequest)(nil), "grpc.testing.SimpleRequest")
|
||||
proto.RegisterType((*SimpleResponse)(nil), "grpc.testing.SimpleResponse")
|
||||
}
|
||||
|
||||
// Reference imports to suppress errors if they are not otherwise used.
|
||||
var _ context.Context
|
||||
var _ grpc.ClientConn
|
||||
|
||||
// This is a compile-time assertion to ensure that this generated file
|
||||
// is compatible with the grpc package it is being compiled against.
|
||||
const _ = grpc.SupportPackageIsVersion4
|
||||
|
||||
// Client API for TestService service
|
||||
|
||||
type TestServiceClient interface {
|
||||
// One request followed by one response.
|
||||
// The server returns the client id as-is.
|
||||
UnaryCall(ctx context.Context, in *SimpleRequest, opts ...grpc.CallOption) (*SimpleResponse, error)
|
||||
// A sequence of requests with each request served by the server immediately.
|
||||
// As one request could lead to multiple responses, this interface
|
||||
// demonstrates the idea of full duplexing.
|
||||
FullDuplexCall(ctx context.Context, opts ...grpc.CallOption) (TestService_FullDuplexCallClient, error)
|
||||
}
|
||||
|
||||
type testServiceClient struct {
|
||||
cc *grpc.ClientConn
|
||||
}
|
||||
|
||||
func NewTestServiceClient(cc *grpc.ClientConn) TestServiceClient {
|
||||
return &testServiceClient{cc}
|
||||
}
|
||||
|
||||
func (c *testServiceClient) UnaryCall(ctx context.Context, in *SimpleRequest, opts ...grpc.CallOption) (*SimpleResponse, error) {
|
||||
out := new(SimpleResponse)
|
||||
err := grpc.Invoke(ctx, "/grpc.testing.TestService/UnaryCall", in, out, c.cc, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (c *testServiceClient) FullDuplexCall(ctx context.Context, opts ...grpc.CallOption) (TestService_FullDuplexCallClient, error) {
|
||||
stream, err := grpc.NewClientStream(ctx, &_TestService_serviceDesc.Streams[0], c.cc, "/grpc.testing.TestService/FullDuplexCall", opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
x := &testServiceFullDuplexCallClient{stream}
|
||||
return x, nil
|
||||
}
|
||||
|
||||
type TestService_FullDuplexCallClient interface {
|
||||
Send(*SimpleRequest) error
|
||||
Recv() (*SimpleResponse, error)
|
||||
grpc.ClientStream
|
||||
}
|
||||
|
||||
type testServiceFullDuplexCallClient struct {
|
||||
grpc.ClientStream
|
||||
}
|
||||
|
||||
func (x *testServiceFullDuplexCallClient) Send(m *SimpleRequest) error {
|
||||
return x.ClientStream.SendMsg(m)
|
||||
}
|
||||
|
||||
func (x *testServiceFullDuplexCallClient) Recv() (*SimpleResponse, error) {
|
||||
m := new(SimpleResponse)
|
||||
if err := x.ClientStream.RecvMsg(m); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return m, nil
|
||||
}
|
||||
|
||||
// Server API for TestService service
|
||||
|
||||
type TestServiceServer interface {
|
||||
// One request followed by one response.
|
||||
// The server returns the client id as-is.
|
||||
UnaryCall(context.Context, *SimpleRequest) (*SimpleResponse, error)
|
||||
// A sequence of requests with each request served by the server immediately.
|
||||
// As one request could lead to multiple responses, this interface
|
||||
// demonstrates the idea of full duplexing.
|
||||
FullDuplexCall(TestService_FullDuplexCallServer) error
|
||||
}
|
||||
|
||||
func RegisterTestServiceServer(s *grpc.Server, srv TestServiceServer) {
|
||||
s.RegisterService(&_TestService_serviceDesc, srv)
|
||||
}
|
||||
|
||||
func _TestService_UnaryCall_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||
in := new(SimpleRequest)
|
||||
if err := dec(in); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if interceptor == nil {
|
||||
return srv.(TestServiceServer).UnaryCall(ctx, in)
|
||||
}
|
||||
info := &grpc.UnaryServerInfo{
|
||||
Server: srv,
|
||||
FullMethod: "/grpc.testing.TestService/UnaryCall",
|
||||
}
|
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return srv.(TestServiceServer).UnaryCall(ctx, req.(*SimpleRequest))
|
||||
}
|
||||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
func _TestService_FullDuplexCall_Handler(srv interface{}, stream grpc.ServerStream) error {
|
||||
return srv.(TestServiceServer).FullDuplexCall(&testServiceFullDuplexCallServer{stream})
|
||||
}
|
||||
|
||||
type TestService_FullDuplexCallServer interface {
|
||||
Send(*SimpleResponse) error
|
||||
Recv() (*SimpleRequest, error)
|
||||
grpc.ServerStream
|
||||
}
|
||||
|
||||
type testServiceFullDuplexCallServer struct {
|
||||
grpc.ServerStream
|
||||
}
|
||||
|
||||
func (x *testServiceFullDuplexCallServer) Send(m *SimpleResponse) error {
|
||||
return x.ServerStream.SendMsg(m)
|
||||
}
|
||||
|
||||
func (x *testServiceFullDuplexCallServer) Recv() (*SimpleRequest, error) {
|
||||
m := new(SimpleRequest)
|
||||
if err := x.ServerStream.RecvMsg(m); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return m, nil
|
||||
}
|
||||
|
||||
var _TestService_serviceDesc = grpc.ServiceDesc{
|
||||
ServiceName: "grpc.testing.TestService",
|
||||
HandlerType: (*TestServiceServer)(nil),
|
||||
Methods: []grpc.MethodDesc{
|
||||
{
|
||||
MethodName: "UnaryCall",
|
||||
Handler: _TestService_UnaryCall_Handler,
|
||||
},
|
||||
},
|
||||
Streams: []grpc.StreamDesc{
|
||||
{
|
||||
StreamName: "FullDuplexCall",
|
||||
Handler: _TestService_FullDuplexCall_Handler,
|
||||
ServerStreams: true,
|
||||
ClientStreams: true,
|
||||
},
|
||||
},
|
||||
Metadata: "test.proto",
|
||||
}
|
||||
|
||||
func init() { proto.RegisterFile("test.proto", fileDescriptor0) }
|
||||
|
||||
var fileDescriptor0 = []byte{
|
||||
// 167 bytes of a gzipped FileDescriptorProto
|
||||
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0xe2, 0x2a, 0x49, 0x2d, 0x2e,
|
||||
0xd1, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x49, 0x2f, 0x2a, 0x48, 0xd6, 0x03, 0x09, 0x64,
|
||||
0xe6, 0xa5, 0x2b, 0xc9, 0x73, 0xf1, 0x06, 0x67, 0xe6, 0x16, 0xe4, 0xa4, 0x06, 0xa5, 0x16, 0x96,
|
||||
0xa6, 0x16, 0x97, 0x08, 0xf1, 0x71, 0x31, 0x65, 0xa6, 0x48, 0x30, 0x29, 0x30, 0x6a, 0xb0, 0x06,
|
||||
0x31, 0x65, 0xa6, 0x28, 0x29, 0x70, 0xf1, 0xc1, 0x14, 0x14, 0x17, 0xe4, 0xe7, 0x15, 0xa7, 0x42,
|
||||
0x55, 0x30, 0xc3, 0x54, 0x18, 0x2d, 0x63, 0xe4, 0xe2, 0x0e, 0x49, 0x2d, 0x2e, 0x09, 0x4e, 0x2d,
|
||||
0x2a, 0xcb, 0x4c, 0x4e, 0x15, 0x72, 0xe3, 0xe2, 0x0c, 0xcd, 0x4b, 0x2c, 0xaa, 0x74, 0x4e, 0xcc,
|
||||
0xc9, 0x11, 0x92, 0xd6, 0x43, 0xb6, 0x4e, 0x0f, 0xc5, 0x2e, 0x29, 0x19, 0xec, 0x92, 0x50, 0x7b,
|
||||
0xfc, 0xb9, 0xf8, 0xdc, 0x4a, 0x73, 0x72, 0x5c, 0x4a, 0x0b, 0x72, 0x52, 0x2b, 0x28, 0x34, 0x4c,
|
||||
0x83, 0xd1, 0x80, 0x31, 0x89, 0x0d, 0x1c, 0x00, 0xc6, 0x80, 0x00, 0x00, 0x00, 0xff, 0xff, 0x8d,
|
||||
0x82, 0x5b, 0xdd, 0x0e, 0x01, 0x00, 0x00,
|
||||
}
|
23
stats/grpc_testing/test.proto
Normal file
23
stats/grpc_testing/test.proto
Normal file
@ -0,0 +1,23 @@
|
||||
syntax = "proto3";
|
||||
|
||||
package grpc.testing;
|
||||
|
||||
message SimpleRequest {
|
||||
int32 id = 2;
|
||||
}
|
||||
|
||||
message SimpleResponse {
|
||||
int32 id = 3;
|
||||
}
|
||||
|
||||
// A simple test service.
|
||||
service TestService {
|
||||
// One request followed by one response.
|
||||
// The server returns the client id as-is.
|
||||
rpc UnaryCall(SimpleRequest) returns (SimpleResponse);
|
||||
|
||||
// A sequence of requests with each request served by the server immediately.
|
||||
// As one request could lead to multiple responses, this interface
|
||||
// demonstrates the idea of full duplexing.
|
||||
rpc FullDuplexCall(stream SimpleRequest) returns (stream SimpleResponse);
|
||||
}
|
219
stats/stats.go
Normal file
219
stats/stats.go
Normal file
@ -0,0 +1,219 @@
|
||||
/*
|
||||
*
|
||||
* Copyright 2016, Google Inc.
|
||||
* All rights reserved.
|
||||
*
|
||||
* Redistribution and use in source and binary forms, with or without
|
||||
* modification, are permitted provided that the following conditions are
|
||||
* met:
|
||||
*
|
||||
* * Redistributions of source code must retain the above copyright
|
||||
* notice, this list of conditions and the following disclaimer.
|
||||
* * Redistributions in binary form must reproduce the above
|
||||
* copyright notice, this list of conditions and the following disclaimer
|
||||
* in the documentation and/or other materials provided with the
|
||||
* distribution.
|
||||
* * Neither the name of Google Inc. nor the names of its
|
||||
* contributors may be used to endorse or promote products derived from
|
||||
* this software without specific prior written permission.
|
||||
*
|
||||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
|
||||
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
|
||||
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
|
||||
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
|
||||
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
||||
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
||||
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
|
||||
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
|
||||
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
||||
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
||||
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
*
|
||||
*/
|
||||
|
||||
// Package stats is for collecting and reporting various network and RPC stats.
|
||||
// This package is for monitoring purpose only. All fields are read-only.
|
||||
// All APIs are experimental.
|
||||
package stats // import "google.golang.org/grpc/stats"
|
||||
|
||||
import (
|
||||
"net"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
"google.golang.org/grpc/grpclog"
|
||||
)
|
||||
|
||||
// RPCStats contains stats information about RPCs.
|
||||
// All stats types in this package implements this interface.
|
||||
type RPCStats interface {
|
||||
// IsClient returns true if this RPCStats is from client side.
|
||||
IsClient() bool
|
||||
}
|
||||
|
||||
// Begin contains stats when an RPC begins.
|
||||
// FailFast are only valid if Client is true.
|
||||
type Begin struct {
|
||||
// Client is true if this Begin is from client side.
|
||||
Client bool
|
||||
// BeginTime is the time when the RPC begins.
|
||||
BeginTime time.Time
|
||||
// FailFast indicates if this RPC is failfast.
|
||||
FailFast bool
|
||||
}
|
||||
|
||||
// IsClient indicates if this is from client side.
|
||||
func (s *Begin) IsClient() bool { return s.Client }
|
||||
|
||||
// InPayload contains the information for an incoming payload.
|
||||
type InPayload struct {
|
||||
// Client is true if this InPayload is from client side.
|
||||
Client bool
|
||||
// Payload is the payload with original type.
|
||||
Payload interface{}
|
||||
// Data is the serialized message payload.
|
||||
Data []byte
|
||||
// Length is the length of uncompressed data.
|
||||
Length int
|
||||
// WireLength is the length of data on wire (compressed, signed, encrypted).
|
||||
WireLength int
|
||||
// RecvTime is the time when the payload is received.
|
||||
RecvTime time.Time
|
||||
}
|
||||
|
||||
// IsClient indicates if this is from client side.
|
||||
func (s *InPayload) IsClient() bool { return s.Client }
|
||||
|
||||
// InHeader contains stats when a header is received.
|
||||
// FullMethod, addresses and Compression are only valid if Client is false.
|
||||
type InHeader struct {
|
||||
// Client is true if this InHeader is from client side.
|
||||
Client bool
|
||||
// WireLength is the wire length of header.
|
||||
WireLength int
|
||||
|
||||
// FullMethod is the full RPC method string, i.e., /package.service/method.
|
||||
FullMethod string
|
||||
// RemoteAddr is the remote address of the corresponding connection.
|
||||
RemoteAddr net.Addr
|
||||
// LocalAddr is the local address of the corresponding connection.
|
||||
LocalAddr net.Addr
|
||||
// Compression is the compression algorithm used for the RPC.
|
||||
Compression string
|
||||
}
|
||||
|
||||
// IsClient indicates if this is from client side.
|
||||
func (s *InHeader) IsClient() bool { return s.Client }
|
||||
|
||||
// InTrailer contains stats when a trailer is received.
|
||||
type InTrailer struct {
|
||||
// Client is true if this InTrailer is from client side.
|
||||
Client bool
|
||||
// WireLength is the wire length of trailer.
|
||||
WireLength int
|
||||
}
|
||||
|
||||
// IsClient indicates if this is from client side.
|
||||
func (s *InTrailer) IsClient() bool { return s.Client }
|
||||
|
||||
// OutPayload contains the information for an outgoing payload.
|
||||
type OutPayload struct {
|
||||
// Client is true if this OutPayload is from client side.
|
||||
Client bool
|
||||
// Payload is the payload with original type.
|
||||
Payload interface{}
|
||||
// Data is the serialized message payload.
|
||||
Data []byte
|
||||
// Length is the length of uncompressed data.
|
||||
Length int
|
||||
// WireLength is the length of data on wire (compressed, signed, encrypted).
|
||||
WireLength int
|
||||
// SentTime is the time when the payload is sent.
|
||||
SentTime time.Time
|
||||
}
|
||||
|
||||
// IsClient indicates if this is from client side.
|
||||
func (s *OutPayload) IsClient() bool { return s.Client }
|
||||
|
||||
// OutHeader contains stats when a header is sent.
|
||||
// FullMethod, addresses and Compression are only valid if Client is true.
|
||||
type OutHeader struct {
|
||||
// Client is true if this OutHeader is from client side.
|
||||
Client bool
|
||||
// WireLength is the wire length of header.
|
||||
WireLength int
|
||||
|
||||
// FullMethod is the full RPC method string, i.e., /package.service/method.
|
||||
FullMethod string
|
||||
// RemoteAddr is the remote address of the corresponding connection.
|
||||
RemoteAddr net.Addr
|
||||
// LocalAddr is the local address of the corresponding connection.
|
||||
LocalAddr net.Addr
|
||||
// Compression is the compression algorithm used for the RPC.
|
||||
Compression string
|
||||
}
|
||||
|
||||
// IsClient indicates if this is from client side.
|
||||
func (s *OutHeader) IsClient() bool { return s.Client }
|
||||
|
||||
// OutTrailer contains stats when a trailer is sent.
|
||||
type OutTrailer struct {
|
||||
// Client is true if this OutTrailer is from client side.
|
||||
Client bool
|
||||
// WireLength is the wire length of trailer.
|
||||
WireLength int
|
||||
}
|
||||
|
||||
// IsClient indicates if this is from client side.
|
||||
func (s *OutTrailer) IsClient() bool { return s.Client }
|
||||
|
||||
// End contains stats when an RPC ends.
|
||||
type End struct {
|
||||
// Client is true if this End is from client side.
|
||||
Client bool
|
||||
// EndTime is the time when the RPC ends.
|
||||
EndTime time.Time
|
||||
// Error is the error just happened. Its type is gRPC error.
|
||||
Error error
|
||||
}
|
||||
|
||||
// IsClient indicates if this is from client side.
|
||||
func (s *End) IsClient() bool { return s.Client }
|
||||
|
||||
var (
|
||||
on = new(int32)
|
||||
handler func(context.Context, RPCStats)
|
||||
)
|
||||
|
||||
// On indicates whether stats is started.
|
||||
func On() bool {
|
||||
return atomic.CompareAndSwapInt32(on, 1, 1)
|
||||
}
|
||||
|
||||
// Handle processes the stats using the call back function registered by user.
|
||||
func Handle(ctx context.Context, s RPCStats) {
|
||||
handler(ctx, s)
|
||||
}
|
||||
|
||||
// RegisterHandler registers the user handler function.
|
||||
// If another handler was registered before, this new handler will overwrite the old one.
|
||||
// This handler function will be called to process the stats.
|
||||
func RegisterHandler(f func(context.Context, RPCStats)) {
|
||||
handler = f
|
||||
}
|
||||
|
||||
// Start starts the stats collection and reporting if there is a registered stats handle.
|
||||
func Start() {
|
||||
if handler == nil {
|
||||
grpclog.Println("handler is nil when starting stats. Stats is not started")
|
||||
return
|
||||
}
|
||||
atomic.StoreInt32(on, 1)
|
||||
}
|
||||
|
||||
// Stop stops the stats collection and processing.
|
||||
// Stop does not unregister handler.
|
||||
func Stop() {
|
||||
atomic.StoreInt32(on, 0)
|
||||
}
|
1192
stats/stats_test.go
Normal file
1192
stats/stats_test.go
Normal file
File diff suppressed because it is too large
Load Diff
93
stream.go
93
stream.go
@ -45,6 +45,7 @@ import (
|
||||
"golang.org/x/net/trace"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"google.golang.org/grpc/stats"
|
||||
"google.golang.org/grpc/transport"
|
||||
)
|
||||
|
||||
@ -97,7 +98,7 @@ type ClientStream interface {
|
||||
|
||||
// NewClientStream creates a new Stream for the client side. This is called
|
||||
// by generated code.
|
||||
func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
|
||||
func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
|
||||
if cc.dopts.streamInt != nil {
|
||||
return cc.dopts.streamInt(ctx, desc, cc, method, newClientStream, opts...)
|
||||
}
|
||||
@ -143,6 +144,24 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
|
||||
}
|
||||
}()
|
||||
}
|
||||
if stats.On() {
|
||||
begin := &stats.Begin{
|
||||
Client: true,
|
||||
BeginTime: time.Now(),
|
||||
FailFast: c.failFast,
|
||||
}
|
||||
stats.Handle(ctx, begin)
|
||||
}
|
||||
defer func() {
|
||||
if err != nil && stats.On() {
|
||||
// Only handle end stats if err != nil.
|
||||
end := &stats.End{
|
||||
Client: true,
|
||||
Error: err,
|
||||
}
|
||||
stats.Handle(ctx, end)
|
||||
}
|
||||
}()
|
||||
gopts := BalancerGetOptions{
|
||||
BlockingWait: !c.failFast,
|
||||
}
|
||||
@ -194,6 +213,8 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
|
||||
|
||||
tracing: EnableTracing,
|
||||
trInfo: trInfo,
|
||||
|
||||
statsCtx: ctx,
|
||||
}
|
||||
if cc.dopts.cp != nil {
|
||||
cs.cbuf = new(bytes.Buffer)
|
||||
@ -246,6 +267,11 @@ type clientStream struct {
|
||||
// trInfo.tr is set when the clientStream is created (if EnableTracing is true),
|
||||
// and is set to nil when the clientStream's finish method is called.
|
||||
trInfo traceInfo
|
||||
|
||||
// statsCtx keeps the user context for stats handling.
|
||||
// All stats collection should use the statsCtx (instead of the stream context)
|
||||
// so that all the generated stats for a particular RPC can be associated in the processing phase.
|
||||
statsCtx context.Context
|
||||
}
|
||||
|
||||
func (cs *clientStream) Context() context.Context {
|
||||
@ -274,6 +300,8 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {
|
||||
}
|
||||
cs.mu.Unlock()
|
||||
}
|
||||
// TODO Investigate how to signal the stats handling party.
|
||||
// generate error stats if err != nil && err != io.EOF?
|
||||
defer func() {
|
||||
if err != nil {
|
||||
cs.finish(err)
|
||||
@ -296,7 +324,13 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {
|
||||
}
|
||||
err = toRPCErr(err)
|
||||
}()
|
||||
out, err := encode(cs.codec, m, cs.cp, cs.cbuf)
|
||||
var outPayload *stats.OutPayload
|
||||
if stats.On() {
|
||||
outPayload = &stats.OutPayload{
|
||||
Client: true,
|
||||
}
|
||||
}
|
||||
out, err := encode(cs.codec, m, cs.cp, cs.cbuf, outPayload)
|
||||
defer func() {
|
||||
if cs.cbuf != nil {
|
||||
cs.cbuf.Reset()
|
||||
@ -305,11 +339,37 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {
|
||||
if err != nil {
|
||||
return Errorf(codes.Internal, "grpc: %v", err)
|
||||
}
|
||||
return cs.t.Write(cs.s, out, &transport.Options{Last: false})
|
||||
err = cs.t.Write(cs.s, out, &transport.Options{Last: false})
|
||||
if err == nil && outPayload != nil {
|
||||
outPayload.SentTime = time.Now()
|
||||
stats.Handle(cs.statsCtx, outPayload)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (cs *clientStream) RecvMsg(m interface{}) (err error) {
|
||||
err = recv(cs.p, cs.codec, cs.s, cs.dc, m, math.MaxInt32)
|
||||
defer func() {
|
||||
if err != nil && stats.On() {
|
||||
// Only generate End if err != nil.
|
||||
// If err == nil, it's not the last RecvMsg.
|
||||
// The last RecvMsg gets either an RPC error or io.EOF.
|
||||
end := &stats.End{
|
||||
Client: true,
|
||||
EndTime: time.Now(),
|
||||
}
|
||||
if err != io.EOF {
|
||||
end.Error = toRPCErr(err)
|
||||
}
|
||||
stats.Handle(cs.statsCtx, end)
|
||||
}
|
||||
}()
|
||||
var inPayload *stats.InPayload
|
||||
if stats.On() {
|
||||
inPayload = &stats.InPayload{
|
||||
Client: true,
|
||||
}
|
||||
}
|
||||
err = recv(cs.p, cs.codec, cs.s, cs.dc, m, math.MaxInt32, inPayload)
|
||||
defer func() {
|
||||
// err != nil indicates the termination of the stream.
|
||||
if err != nil {
|
||||
@ -324,11 +384,15 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) {
|
||||
}
|
||||
cs.mu.Unlock()
|
||||
}
|
||||
if inPayload != nil {
|
||||
stats.Handle(cs.statsCtx, inPayload)
|
||||
}
|
||||
if !cs.desc.ClientStreams || cs.desc.ServerStreams {
|
||||
return
|
||||
}
|
||||
// Special handling for client streaming rpc.
|
||||
err = recv(cs.p, cs.codec, cs.s, cs.dc, m, math.MaxInt32)
|
||||
// This recv expects EOF or errors, so we don't collect inPayload.
|
||||
err = recv(cs.p, cs.codec, cs.s, cs.dc, m, math.MaxInt32, nil)
|
||||
cs.closeTransportStream(err)
|
||||
if err == nil {
|
||||
return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
|
||||
@ -482,7 +546,11 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) {
|
||||
ss.mu.Unlock()
|
||||
}
|
||||
}()
|
||||
out, err := encode(ss.codec, m, ss.cp, ss.cbuf)
|
||||
var outPayload *stats.OutPayload
|
||||
if stats.On() {
|
||||
outPayload = &stats.OutPayload{}
|
||||
}
|
||||
out, err := encode(ss.codec, m, ss.cp, ss.cbuf, outPayload)
|
||||
defer func() {
|
||||
if ss.cbuf != nil {
|
||||
ss.cbuf.Reset()
|
||||
@ -495,6 +563,10 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) {
|
||||
if err := ss.t.Write(ss.s, out, &transport.Options{Last: false}); err != nil {
|
||||
return toRPCErr(err)
|
||||
}
|
||||
if outPayload != nil {
|
||||
outPayload.SentTime = time.Now()
|
||||
stats.Handle(ss.s.Context(), outPayload)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -513,7 +585,11 @@ func (ss *serverStream) RecvMsg(m interface{}) (err error) {
|
||||
ss.mu.Unlock()
|
||||
}
|
||||
}()
|
||||
if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxMsgSize); err != nil {
|
||||
var inPayload *stats.InPayload
|
||||
if stats.On() {
|
||||
inPayload = &stats.InPayload{}
|
||||
}
|
||||
if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxMsgSize, inPayload); err != nil {
|
||||
if err == io.EOF {
|
||||
return err
|
||||
}
|
||||
@ -522,5 +598,8 @@ func (ss *serverStream) RecvMsg(m interface{}) (err error) {
|
||||
}
|
||||
return toRPCErr(err)
|
||||
}
|
||||
if inPayload != nil {
|
||||
stats.Handle(ss.s.Context(), inPayload)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -268,7 +268,7 @@ func (ht *serverHandlerTransport) WriteHeader(s *Stream, md metadata.MD) error {
|
||||
})
|
||||
}
|
||||
|
||||
func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream)) {
|
||||
func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), traceCtx func(context.Context, string) context.Context) {
|
||||
// With this transport type there will be exactly 1 stream: this HTTP request.
|
||||
|
||||
var ctx context.Context
|
||||
|
@ -300,7 +300,10 @@ func TestHandlerTransport_HandleStreams(t *testing.T) {
|
||||
st.bodyw.Close() // no body
|
||||
st.ht.WriteStatus(s, codes.OK, "")
|
||||
}
|
||||
st.ht.HandleStreams(func(s *Stream) { go handleStream(s) })
|
||||
st.ht.HandleStreams(
|
||||
func(s *Stream) { go handleStream(s) },
|
||||
func(ctx context.Context, method string) context.Context { return ctx },
|
||||
)
|
||||
wantHeader := http.Header{
|
||||
"Date": nil,
|
||||
"Content-Type": {"application/grpc"},
|
||||
@ -327,7 +330,10 @@ func handleStreamCloseBodyTest(t *testing.T, statusCode codes.Code, msg string)
|
||||
handleStream := func(s *Stream) {
|
||||
st.ht.WriteStatus(s, statusCode, msg)
|
||||
}
|
||||
st.ht.HandleStreams(func(s *Stream) { go handleStream(s) })
|
||||
st.ht.HandleStreams(
|
||||
func(s *Stream) { go handleStream(s) },
|
||||
func(ctx context.Context, method string) context.Context { return ctx },
|
||||
)
|
||||
wantHeader := http.Header{
|
||||
"Date": nil,
|
||||
"Content-Type": {"application/grpc"},
|
||||
@ -375,7 +381,10 @@ func TestHandlerTransport_HandleStreams_Timeout(t *testing.T) {
|
||||
}
|
||||
ht.WriteStatus(s, codes.DeadlineExceeded, "too slow")
|
||||
}
|
||||
ht.HandleStreams(func(s *Stream) { go runStream(s) })
|
||||
ht.HandleStreams(
|
||||
func(s *Stream) { go runStream(s) },
|
||||
func(ctx context.Context, method string) context.Context { return ctx },
|
||||
)
|
||||
wantHeader := http.Header{
|
||||
"Date": nil,
|
||||
"Content-Type": {"application/grpc"},
|
||||
|
@ -51,16 +51,19 @@ import (
|
||||
"google.golang.org/grpc/grpclog"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"google.golang.org/grpc/peer"
|
||||
"google.golang.org/grpc/stats"
|
||||
)
|
||||
|
||||
// http2Client implements the ClientTransport interface with HTTP2.
|
||||
type http2Client struct {
|
||||
target string // server name/addr
|
||||
userAgent string
|
||||
md interface{}
|
||||
conn net.Conn // underlying communication channel
|
||||
authInfo credentials.AuthInfo // auth info about the connection
|
||||
nextID uint32 // the next stream ID to be used
|
||||
target string // server name/addr
|
||||
userAgent string
|
||||
md interface{}
|
||||
conn net.Conn // underlying communication channel
|
||||
remoteAddr net.Addr
|
||||
localAddr net.Addr
|
||||
authInfo credentials.AuthInfo // auth info about the connection
|
||||
nextID uint32 // the next stream ID to be used
|
||||
|
||||
// writableChan synchronizes write access to the transport.
|
||||
// A writer acquires the write lock by sending a value on writableChan
|
||||
@ -175,11 +178,13 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
|
||||
}
|
||||
var buf bytes.Buffer
|
||||
t := &http2Client{
|
||||
target: addr.Addr,
|
||||
userAgent: ua,
|
||||
md: addr.Metadata,
|
||||
conn: conn,
|
||||
authInfo: authInfo,
|
||||
target: addr.Addr,
|
||||
userAgent: ua,
|
||||
md: addr.Metadata,
|
||||
conn: conn,
|
||||
remoteAddr: conn.RemoteAddr(),
|
||||
localAddr: conn.LocalAddr(),
|
||||
authInfo: authInfo,
|
||||
// The client initiated stream id is odd starting from 1.
|
||||
nextID: 1,
|
||||
writableChan: make(chan int, 1),
|
||||
@ -270,12 +275,13 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
|
||||
// streams.
|
||||
func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) {
|
||||
pr := &peer.Peer{
|
||||
Addr: t.conn.RemoteAddr(),
|
||||
Addr: t.remoteAddr,
|
||||
}
|
||||
// Attach Auth info if there is any.
|
||||
if t.authInfo != nil {
|
||||
pr.AuthInfo = t.authInfo
|
||||
}
|
||||
userCtx := ctx
|
||||
ctx = peer.NewContext(ctx, pr)
|
||||
authData := make(map[string]string)
|
||||
for _, c := range t.creds {
|
||||
@ -347,6 +353,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
|
||||
return nil, ErrConnClosing
|
||||
}
|
||||
s := t.newStream(ctx, callHdr)
|
||||
s.clientStatsCtx = userCtx
|
||||
t.activeStreams[s.id] = s
|
||||
|
||||
// This stream is not counted when applySetings(...) initialize t.streamsQuota.
|
||||
@ -413,6 +420,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
|
||||
}
|
||||
}
|
||||
first := true
|
||||
bufLen := t.hBuf.Len()
|
||||
// Sends the headers in a single batch even when they span multiple frames.
|
||||
for !endHeaders {
|
||||
size := t.hBuf.Len()
|
||||
@ -447,6 +455,17 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
|
||||
return nil, connectionErrorf(true, err, "transport: %v", err)
|
||||
}
|
||||
}
|
||||
if stats.On() {
|
||||
outHeader := &stats.OutHeader{
|
||||
Client: true,
|
||||
WireLength: bufLen,
|
||||
FullMethod: callHdr.Method,
|
||||
RemoteAddr: t.remoteAddr,
|
||||
LocalAddr: t.localAddr,
|
||||
Compression: callHdr.SendCompress,
|
||||
}
|
||||
stats.Handle(s.clientStatsCtx, outHeader)
|
||||
}
|
||||
t.writableChan <- 0
|
||||
return s, nil
|
||||
}
|
||||
@ -874,6 +893,24 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
|
||||
}
|
||||
|
||||
endStream := frame.StreamEnded()
|
||||
var isHeader bool
|
||||
defer func() {
|
||||
if stats.On() {
|
||||
if isHeader {
|
||||
inHeader := &stats.InHeader{
|
||||
Client: true,
|
||||
WireLength: int(frame.Header().Length),
|
||||
}
|
||||
stats.Handle(s.clientStatsCtx, inHeader)
|
||||
} else {
|
||||
inTrailer := &stats.InTrailer{
|
||||
Client: true,
|
||||
WireLength: int(frame.Header().Length),
|
||||
}
|
||||
stats.Handle(s.clientStatsCtx, inTrailer)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
s.mu.Lock()
|
||||
if !endStream {
|
||||
@ -885,6 +922,7 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
|
||||
}
|
||||
close(s.headerChan)
|
||||
s.headerDone = true
|
||||
isHeader = true
|
||||
}
|
||||
if !endStream || s.state == streamDone {
|
||||
s.mu.Unlock()
|
||||
|
@ -50,6 +50,7 @@ import (
|
||||
"google.golang.org/grpc/grpclog"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"google.golang.org/grpc/peer"
|
||||
"google.golang.org/grpc/stats"
|
||||
"google.golang.org/grpc/tap"
|
||||
)
|
||||
|
||||
@ -60,6 +61,8 @@ var ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHe
|
||||
// http2Server implements the ServerTransport interface with HTTP2.
|
||||
type http2Server struct {
|
||||
conn net.Conn
|
||||
remoteAddr net.Addr
|
||||
localAddr net.Addr
|
||||
maxStreamID uint32 // max stream ID ever seen
|
||||
authInfo credentials.AuthInfo // auth info about the connection
|
||||
inTapHandle tap.ServerInHandle
|
||||
@ -125,6 +128,8 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
|
||||
var buf bytes.Buffer
|
||||
t := &http2Server{
|
||||
conn: conn,
|
||||
remoteAddr: conn.RemoteAddr(),
|
||||
localAddr: conn.LocalAddr(),
|
||||
authInfo: config.AuthInfo,
|
||||
framer: framer,
|
||||
hBuf: &buf,
|
||||
@ -146,7 +151,7 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
|
||||
}
|
||||
|
||||
// operateHeader takes action on the decoded headers.
|
||||
func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream)) (close bool) {
|
||||
func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (close bool) {
|
||||
buf := newRecvBuffer()
|
||||
s := &Stream{
|
||||
id: frame.Header().StreamID,
|
||||
@ -177,7 +182,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
|
||||
s.ctx, s.cancel = context.WithCancel(context.TODO())
|
||||
}
|
||||
pr := &peer.Peer{
|
||||
Addr: t.conn.RemoteAddr(),
|
||||
Addr: t.remoteAddr,
|
||||
}
|
||||
// Attach Auth info if there is any.
|
||||
if t.authInfo != nil {
|
||||
@ -234,13 +239,25 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
|
||||
s.windowHandler = func(n int) {
|
||||
t.updateWindow(s, uint32(n))
|
||||
}
|
||||
s.ctx = traceCtx(s.ctx, s.method)
|
||||
if stats.On() {
|
||||
inHeader := &stats.InHeader{
|
||||
FullMethod: s.method,
|
||||
RemoteAddr: t.remoteAddr,
|
||||
LocalAddr: t.localAddr,
|
||||
Compression: s.recvCompress,
|
||||
WireLength: int(frame.Header().Length),
|
||||
}
|
||||
stats.Handle(s.ctx, inHeader)
|
||||
}
|
||||
handle(s)
|
||||
return
|
||||
}
|
||||
|
||||
// HandleStreams receives incoming streams using the given handler. This is
|
||||
// typically run in a separate goroutine.
|
||||
func (t *http2Server) HandleStreams(handle func(*Stream)) {
|
||||
// traceCtx attaches trace to ctx and returns the new context.
|
||||
func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
|
||||
// Check the validity of client preface.
|
||||
preface := make([]byte, len(clientPreface))
|
||||
if _, err := io.ReadFull(t.conn, preface); err != nil {
|
||||
@ -295,7 +312,7 @@ func (t *http2Server) HandleStreams(handle func(*Stream)) {
|
||||
}
|
||||
switch frame := frame.(type) {
|
||||
case *http2.MetaHeadersFrame:
|
||||
if t.operateHeaders(frame, handle) {
|
||||
if t.operateHeaders(frame, handle, traceCtx) {
|
||||
t.Close()
|
||||
break
|
||||
}
|
||||
@ -508,9 +525,16 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
|
||||
t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: entry})
|
||||
}
|
||||
}
|
||||
bufLen := t.hBuf.Len()
|
||||
if err := t.writeHeaders(s, t.hBuf, false); err != nil {
|
||||
return err
|
||||
}
|
||||
if stats.On() {
|
||||
outHeader := &stats.OutHeader{
|
||||
WireLength: bufLen,
|
||||
}
|
||||
stats.Handle(s.Context(), outHeader)
|
||||
}
|
||||
t.writableChan <- 0
|
||||
return nil
|
||||
}
|
||||
@ -563,10 +587,17 @@ func (t *http2Server) WriteStatus(s *Stream, statusCode codes.Code, statusDesc s
|
||||
t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: entry})
|
||||
}
|
||||
}
|
||||
bufLen := t.hBuf.Len()
|
||||
if err := t.writeHeaders(s, t.hBuf, true); err != nil {
|
||||
t.Close()
|
||||
return err
|
||||
}
|
||||
if stats.On() {
|
||||
outTrailer := &stats.OutTrailer{
|
||||
WireLength: bufLen,
|
||||
}
|
||||
stats.Handle(s.Context(), outTrailer)
|
||||
}
|
||||
t.closeStream(s)
|
||||
t.writableChan <- 0
|
||||
return nil
|
||||
@ -783,7 +814,7 @@ func (t *http2Server) closeStream(s *Stream) {
|
||||
}
|
||||
|
||||
func (t *http2Server) RemoteAddr() net.Addr {
|
||||
return t.conn.RemoteAddr()
|
||||
return t.remoteAddr
|
||||
}
|
||||
|
||||
func (t *http2Server) Drain() {
|
||||
|
@ -45,7 +45,6 @@ import (
|
||||
"sync"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
"golang.org/x/net/trace"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/credentials"
|
||||
"google.golang.org/grpc/metadata"
|
||||
@ -168,6 +167,11 @@ type Stream struct {
|
||||
id uint32
|
||||
// nil for client side Stream.
|
||||
st ServerTransport
|
||||
// clientStatsCtx keeps the user context for stats handling.
|
||||
// It's only valid on client side. Server side stats context is same as s.ctx.
|
||||
// All client side stats collection should use the clientStatsCtx (instead of the stream context)
|
||||
// so that all the generated stats for a particular RPC can be associated in the processing phase.
|
||||
clientStatsCtx context.Context
|
||||
// ctx is the associated context of the stream.
|
||||
ctx context.Context
|
||||
// cancel is always nil for client side Stream.
|
||||
@ -267,11 +271,6 @@ func (s *Stream) Context() context.Context {
|
||||
return s.ctx
|
||||
}
|
||||
|
||||
// TraceContext recreates the context of s with a trace.Trace.
|
||||
func (s *Stream) TraceContext(tr trace.Trace) {
|
||||
s.ctx = trace.NewContext(s.ctx, tr)
|
||||
}
|
||||
|
||||
// Method returns the method for the stream.
|
||||
func (s *Stream) Method() string {
|
||||
return s.method
|
||||
@ -474,7 +473,7 @@ type ClientTransport interface {
|
||||
// Write methods for a given Stream will be called serially.
|
||||
type ServerTransport interface {
|
||||
// HandleStreams receives incoming streams using the given handler.
|
||||
HandleStreams(func(*Stream))
|
||||
HandleStreams(func(*Stream), func(context.Context, string) context.Context)
|
||||
|
||||
// WriteHeader sends the header metadata for the given stream.
|
||||
// WriteHeader may not be called on all streams.
|
||||
|
@ -197,22 +197,33 @@ func (s *server) start(t *testing.T, port int, maxStreams uint32, ht hType) {
|
||||
h := &testStreamHandler{transport.(*http2Server)}
|
||||
switch ht {
|
||||
case suspended:
|
||||
go transport.HandleStreams(h.handleStreamSuspension)
|
||||
go transport.HandleStreams(h.handleStreamSuspension,
|
||||
func(ctx context.Context, method string) context.Context {
|
||||
return ctx
|
||||
})
|
||||
case misbehaved:
|
||||
go transport.HandleStreams(func(s *Stream) {
|
||||
go h.handleStreamMisbehave(t, s)
|
||||
}, func(ctx context.Context, method string) context.Context {
|
||||
return ctx
|
||||
})
|
||||
case encodingRequiredStatus:
|
||||
go transport.HandleStreams(func(s *Stream) {
|
||||
go h.handleStreamEncodingRequiredStatus(t, s)
|
||||
}, func(ctx context.Context, method string) context.Context {
|
||||
return ctx
|
||||
})
|
||||
case invalidHeaderField:
|
||||
go transport.HandleStreams(func(s *Stream) {
|
||||
go h.handleStreamInvalidHeaderField(t, s)
|
||||
}, func(ctx context.Context, method string) context.Context {
|
||||
return ctx
|
||||
})
|
||||
default:
|
||||
go transport.HandleStreams(func(s *Stream) {
|
||||
go h.handleStream(t, s)
|
||||
}, func(ctx context.Context, method string) context.Context {
|
||||
return ctx
|
||||
})
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user