grpclb: sync messages.proto and update client load reporting (#2101)

This commit is contained in:
Menghan Li
2018-06-04 10:02:09 -07:00
committed by GitHub
parent 130c87fa0d
commit 70e9c28461
6 changed files with 332 additions and 424 deletions

View File

@ -24,6 +24,7 @@ import (
"sync" "sync"
"time" "time"
durationpb "github.com/golang/protobuf/ptypes/duration"
"golang.org/x/net/context" "golang.org/x/net/context"
"google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity" "google.golang.org/grpc/connectivity"
@ -38,7 +39,7 @@ const (
grpclbName = "grpclb" grpclbName = "grpclb"
) )
func convertDuration(d *lbpb.Duration) time.Duration { func convertDuration(d *durationpb.Duration) time.Duration {
if d == nil { if d == nil {
return 0 return 0
} }
@ -138,7 +139,7 @@ func (b *lbBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) bal
subConns: make(map[resolver.Address]balancer.SubConn), subConns: make(map[resolver.Address]balancer.SubConn),
scStates: make(map[balancer.SubConn]connectivity.State), scStates: make(map[balancer.SubConn]connectivity.State),
picker: &errPicker{err: balancer.ErrNoSubConnAvailable}, picker: &errPicker{err: balancer.ErrNoSubConnAvailable},
clientStats: &rpcStats{}, clientStats: newRPCStats(),
} }
return lb return lb

View File

@ -6,6 +6,8 @@ package messages // import "google.golang.org/grpc/grpclb/grpc_lb_v1/messages"
import proto "github.com/golang/protobuf/proto" import proto "github.com/golang/protobuf/proto"
import fmt "fmt" import fmt "fmt"
import math "math" import math "math"
import duration "github.com/golang/protobuf/ptypes/duration"
import timestamp "github.com/golang/protobuf/ptypes/timestamp"
// Reference imports to suppress errors if they are not otherwise used. // Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal var _ = proto.Marshal
@ -18,113 +20,6 @@ var _ = math.Inf
// proto package needs to be updated. // proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
type Duration struct {
// Signed seconds of the span of time. Must be from -315,576,000,000
// to +315,576,000,000 inclusive.
Seconds int64 `protobuf:"varint,1,opt,name=seconds" json:"seconds,omitempty"`
// Signed fractions of a second at nanosecond resolution of the span
// of time. Durations less than one second are represented with a 0
// `seconds` field and a positive or negative `nanos` field. For durations
// of one second or more, a non-zero value for the `nanos` field must be
// of the same sign as the `seconds` field. Must be from -999,999,999
// to +999,999,999 inclusive.
Nanos int32 `protobuf:"varint,2,opt,name=nanos" json:"nanos,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *Duration) Reset() { *m = Duration{} }
func (m *Duration) String() string { return proto.CompactTextString(m) }
func (*Duration) ProtoMessage() {}
func (*Duration) Descriptor() ([]byte, []int) {
return fileDescriptor_messages_b81c731f0e83edbd, []int{0}
}
func (m *Duration) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Duration.Unmarshal(m, b)
}
func (m *Duration) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_Duration.Marshal(b, m, deterministic)
}
func (dst *Duration) XXX_Merge(src proto.Message) {
xxx_messageInfo_Duration.Merge(dst, src)
}
func (m *Duration) XXX_Size() int {
return xxx_messageInfo_Duration.Size(m)
}
func (m *Duration) XXX_DiscardUnknown() {
xxx_messageInfo_Duration.DiscardUnknown(m)
}
var xxx_messageInfo_Duration proto.InternalMessageInfo
func (m *Duration) GetSeconds() int64 {
if m != nil {
return m.Seconds
}
return 0
}
func (m *Duration) GetNanos() int32 {
if m != nil {
return m.Nanos
}
return 0
}
type Timestamp struct {
// Represents seconds of UTC time since Unix epoch
// 1970-01-01T00:00:00Z. Must be from 0001-01-01T00:00:00Z to
// 9999-12-31T23:59:59Z inclusive.
Seconds int64 `protobuf:"varint,1,opt,name=seconds" json:"seconds,omitempty"`
// Non-negative fractions of a second at nanosecond resolution. Negative
// second values with fractions must still have non-negative nanos values
// that count forward in time. Must be from 0 to 999,999,999
// inclusive.
Nanos int32 `protobuf:"varint,2,opt,name=nanos" json:"nanos,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *Timestamp) Reset() { *m = Timestamp{} }
func (m *Timestamp) String() string { return proto.CompactTextString(m) }
func (*Timestamp) ProtoMessage() {}
func (*Timestamp) Descriptor() ([]byte, []int) {
return fileDescriptor_messages_b81c731f0e83edbd, []int{1}
}
func (m *Timestamp) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Timestamp.Unmarshal(m, b)
}
func (m *Timestamp) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_Timestamp.Marshal(b, m, deterministic)
}
func (dst *Timestamp) XXX_Merge(src proto.Message) {
xxx_messageInfo_Timestamp.Merge(dst, src)
}
func (m *Timestamp) XXX_Size() int {
return xxx_messageInfo_Timestamp.Size(m)
}
func (m *Timestamp) XXX_DiscardUnknown() {
xxx_messageInfo_Timestamp.DiscardUnknown(m)
}
var xxx_messageInfo_Timestamp proto.InternalMessageInfo
func (m *Timestamp) GetSeconds() int64 {
if m != nil {
return m.Seconds
}
return 0
}
func (m *Timestamp) GetNanos() int32 {
if m != nil {
return m.Nanos
}
return 0
}
type LoadBalanceRequest struct { type LoadBalanceRequest struct {
// Types that are valid to be assigned to LoadBalanceRequestType: // Types that are valid to be assigned to LoadBalanceRequestType:
// *LoadBalanceRequest_InitialRequest // *LoadBalanceRequest_InitialRequest
@ -139,7 +34,7 @@ func (m *LoadBalanceRequest) Reset() { *m = LoadBalanceRequest{} }
func (m *LoadBalanceRequest) String() string { return proto.CompactTextString(m) } func (m *LoadBalanceRequest) String() string { return proto.CompactTextString(m) }
func (*LoadBalanceRequest) ProtoMessage() {} func (*LoadBalanceRequest) ProtoMessage() {}
func (*LoadBalanceRequest) Descriptor() ([]byte, []int) { func (*LoadBalanceRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_messages_b81c731f0e83edbd, []int{2} return fileDescriptor_messages_b3d89fcb5aa158f8, []int{0}
} }
func (m *LoadBalanceRequest) XXX_Unmarshal(b []byte) error { func (m *LoadBalanceRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_LoadBalanceRequest.Unmarshal(m, b) return xxx_messageInfo_LoadBalanceRequest.Unmarshal(m, b)
@ -269,7 +164,7 @@ func _LoadBalanceRequest_OneofSizer(msg proto.Message) (n int) {
} }
type InitialLoadBalanceRequest struct { type InitialLoadBalanceRequest struct {
// Name of load balanced service (IE, balancer.service.com) // Name of load balanced service (IE, balancer.service.com). Its
// length should be less than 256 bytes. // length should be less than 256 bytes.
Name string `protobuf:"bytes,1,opt,name=name" json:"name,omitempty"` Name string `protobuf:"bytes,1,opt,name=name" json:"name,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_NoUnkeyedLiteral struct{} `json:"-"`
@ -281,7 +176,7 @@ func (m *InitialLoadBalanceRequest) Reset() { *m = InitialLoadBalanceReq
func (m *InitialLoadBalanceRequest) String() string { return proto.CompactTextString(m) } func (m *InitialLoadBalanceRequest) String() string { return proto.CompactTextString(m) }
func (*InitialLoadBalanceRequest) ProtoMessage() {} func (*InitialLoadBalanceRequest) ProtoMessage() {}
func (*InitialLoadBalanceRequest) Descriptor() ([]byte, []int) { func (*InitialLoadBalanceRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_messages_b81c731f0e83edbd, []int{3} return fileDescriptor_messages_b3d89fcb5aa158f8, []int{1}
} }
func (m *InitialLoadBalanceRequest) XXX_Unmarshal(b []byte) error { func (m *InitialLoadBalanceRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_InitialLoadBalanceRequest.Unmarshal(m, b) return xxx_messageInfo_InitialLoadBalanceRequest.Unmarshal(m, b)
@ -308,26 +203,71 @@ func (m *InitialLoadBalanceRequest) GetName() string {
return "" return ""
} }
// Contains the number of calls finished for a particular load balance token.
type ClientStatsPerToken struct {
// See Server.load_balance_token.
LoadBalanceToken string `protobuf:"bytes,1,opt,name=load_balance_token,json=loadBalanceToken" json:"load_balance_token,omitempty"`
// The total number of RPCs that finished associated with the token.
NumCalls int64 `protobuf:"varint,2,opt,name=num_calls,json=numCalls" json:"num_calls,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *ClientStatsPerToken) Reset() { *m = ClientStatsPerToken{} }
func (m *ClientStatsPerToken) String() string { return proto.CompactTextString(m) }
func (*ClientStatsPerToken) ProtoMessage() {}
func (*ClientStatsPerToken) Descriptor() ([]byte, []int) {
return fileDescriptor_messages_b3d89fcb5aa158f8, []int{2}
}
func (m *ClientStatsPerToken) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ClientStatsPerToken.Unmarshal(m, b)
}
func (m *ClientStatsPerToken) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_ClientStatsPerToken.Marshal(b, m, deterministic)
}
func (dst *ClientStatsPerToken) XXX_Merge(src proto.Message) {
xxx_messageInfo_ClientStatsPerToken.Merge(dst, src)
}
func (m *ClientStatsPerToken) XXX_Size() int {
return xxx_messageInfo_ClientStatsPerToken.Size(m)
}
func (m *ClientStatsPerToken) XXX_DiscardUnknown() {
xxx_messageInfo_ClientStatsPerToken.DiscardUnknown(m)
}
var xxx_messageInfo_ClientStatsPerToken proto.InternalMessageInfo
func (m *ClientStatsPerToken) GetLoadBalanceToken() string {
if m != nil {
return m.LoadBalanceToken
}
return ""
}
func (m *ClientStatsPerToken) GetNumCalls() int64 {
if m != nil {
return m.NumCalls
}
return 0
}
// Contains client level statistics that are useful to load balancing. Each // Contains client level statistics that are useful to load balancing. Each
// count except the timestamp should be reset to zero after reporting the stats. // count except the timestamp should be reset to zero after reporting the stats.
type ClientStats struct { type ClientStats struct {
// The timestamp of generating the report. // The timestamp of generating the report.
Timestamp *Timestamp `protobuf:"bytes,1,opt,name=timestamp" json:"timestamp,omitempty"` Timestamp *timestamp.Timestamp `protobuf:"bytes,1,opt,name=timestamp" json:"timestamp,omitempty"`
// The total number of RPCs that started. // The total number of RPCs that started.
NumCallsStarted int64 `protobuf:"varint,2,opt,name=num_calls_started,json=numCallsStarted" json:"num_calls_started,omitempty"` NumCallsStarted int64 `protobuf:"varint,2,opt,name=num_calls_started,json=numCallsStarted" json:"num_calls_started,omitempty"`
// The total number of RPCs that finished. // The total number of RPCs that finished.
NumCallsFinished int64 `protobuf:"varint,3,opt,name=num_calls_finished,json=numCallsFinished" json:"num_calls_finished,omitempty"` NumCallsFinished int64 `protobuf:"varint,3,opt,name=num_calls_finished,json=numCallsFinished" json:"num_calls_finished,omitempty"`
// The total number of RPCs that were dropped by the client because of rate
// limiting.
NumCallsFinishedWithDropForRateLimiting int64 `protobuf:"varint,4,opt,name=num_calls_finished_with_drop_for_rate_limiting,json=numCallsFinishedWithDropForRateLimiting" json:"num_calls_finished_with_drop_for_rate_limiting,omitempty"`
// The total number of RPCs that were dropped by the client because of load
// balancing.
NumCallsFinishedWithDropForLoadBalancing int64 `protobuf:"varint,5,opt,name=num_calls_finished_with_drop_for_load_balancing,json=numCallsFinishedWithDropForLoadBalancing" json:"num_calls_finished_with_drop_for_load_balancing,omitempty"`
// The total number of RPCs that failed to reach a server except dropped RPCs. // The total number of RPCs that failed to reach a server except dropped RPCs.
NumCallsFinishedWithClientFailedToSend int64 `protobuf:"varint,6,opt,name=num_calls_finished_with_client_failed_to_send,json=numCallsFinishedWithClientFailedToSend" json:"num_calls_finished_with_client_failed_to_send,omitempty"` NumCallsFinishedWithClientFailedToSend int64 `protobuf:"varint,6,opt,name=num_calls_finished_with_client_failed_to_send,json=numCallsFinishedWithClientFailedToSend" json:"num_calls_finished_with_client_failed_to_send,omitempty"`
// The total number of RPCs that finished and are known to have been received // The total number of RPCs that finished and are known to have been received
// by a server. // by a server.
NumCallsFinishedKnownReceived int64 `protobuf:"varint,7,opt,name=num_calls_finished_known_received,json=numCallsFinishedKnownReceived" json:"num_calls_finished_known_received,omitempty"` NumCallsFinishedKnownReceived int64 `protobuf:"varint,7,opt,name=num_calls_finished_known_received,json=numCallsFinishedKnownReceived" json:"num_calls_finished_known_received,omitempty"`
// The list of dropped calls.
CallsFinishedWithDrop []*ClientStatsPerToken `protobuf:"bytes,8,rep,name=calls_finished_with_drop,json=callsFinishedWithDrop" json:"calls_finished_with_drop,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"` XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"` XXX_sizecache int32 `json:"-"`
@ -337,7 +277,7 @@ func (m *ClientStats) Reset() { *m = ClientStats{} }
func (m *ClientStats) String() string { return proto.CompactTextString(m) } func (m *ClientStats) String() string { return proto.CompactTextString(m) }
func (*ClientStats) ProtoMessage() {} func (*ClientStats) ProtoMessage() {}
func (*ClientStats) Descriptor() ([]byte, []int) { func (*ClientStats) Descriptor() ([]byte, []int) {
return fileDescriptor_messages_b81c731f0e83edbd, []int{4} return fileDescriptor_messages_b3d89fcb5aa158f8, []int{3}
} }
func (m *ClientStats) XXX_Unmarshal(b []byte) error { func (m *ClientStats) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ClientStats.Unmarshal(m, b) return xxx_messageInfo_ClientStats.Unmarshal(m, b)
@ -357,7 +297,7 @@ func (m *ClientStats) XXX_DiscardUnknown() {
var xxx_messageInfo_ClientStats proto.InternalMessageInfo var xxx_messageInfo_ClientStats proto.InternalMessageInfo
func (m *ClientStats) GetTimestamp() *Timestamp { func (m *ClientStats) GetTimestamp() *timestamp.Timestamp {
if m != nil { if m != nil {
return m.Timestamp return m.Timestamp
} }
@ -378,20 +318,6 @@ func (m *ClientStats) GetNumCallsFinished() int64 {
return 0 return 0
} }
func (m *ClientStats) GetNumCallsFinishedWithDropForRateLimiting() int64 {
if m != nil {
return m.NumCallsFinishedWithDropForRateLimiting
}
return 0
}
func (m *ClientStats) GetNumCallsFinishedWithDropForLoadBalancing() int64 {
if m != nil {
return m.NumCallsFinishedWithDropForLoadBalancing
}
return 0
}
func (m *ClientStats) GetNumCallsFinishedWithClientFailedToSend() int64 { func (m *ClientStats) GetNumCallsFinishedWithClientFailedToSend() int64 {
if m != nil { if m != nil {
return m.NumCallsFinishedWithClientFailedToSend return m.NumCallsFinishedWithClientFailedToSend
@ -406,6 +332,13 @@ func (m *ClientStats) GetNumCallsFinishedKnownReceived() int64 {
return 0 return 0
} }
func (m *ClientStats) GetCallsFinishedWithDrop() []*ClientStatsPerToken {
if m != nil {
return m.CallsFinishedWithDrop
}
return nil
}
type LoadBalanceResponse struct { type LoadBalanceResponse struct {
// Types that are valid to be assigned to LoadBalanceResponseType: // Types that are valid to be assigned to LoadBalanceResponseType:
// *LoadBalanceResponse_InitialResponse // *LoadBalanceResponse_InitialResponse
@ -420,7 +353,7 @@ func (m *LoadBalanceResponse) Reset() { *m = LoadBalanceResponse{} }
func (m *LoadBalanceResponse) String() string { return proto.CompactTextString(m) } func (m *LoadBalanceResponse) String() string { return proto.CompactTextString(m) }
func (*LoadBalanceResponse) ProtoMessage() {} func (*LoadBalanceResponse) ProtoMessage() {}
func (*LoadBalanceResponse) Descriptor() ([]byte, []int) { func (*LoadBalanceResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_messages_b81c731f0e83edbd, []int{5} return fileDescriptor_messages_b3d89fcb5aa158f8, []int{4}
} }
func (m *LoadBalanceResponse) XXX_Unmarshal(b []byte) error { func (m *LoadBalanceResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_LoadBalanceResponse.Unmarshal(m, b) return xxx_messageInfo_LoadBalanceResponse.Unmarshal(m, b)
@ -559,7 +492,7 @@ type InitialLoadBalanceResponse struct {
// This interval defines how often the client should send the client stats // This interval defines how often the client should send the client stats
// to the load balancer. Stats should only be reported when the duration is // to the load balancer. Stats should only be reported when the duration is
// positive. // positive.
ClientStatsReportInterval *Duration `protobuf:"bytes,2,opt,name=client_stats_report_interval,json=clientStatsReportInterval" json:"client_stats_report_interval,omitempty"` ClientStatsReportInterval *duration.Duration `protobuf:"bytes,2,opt,name=client_stats_report_interval,json=clientStatsReportInterval" json:"client_stats_report_interval,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"` XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"` XXX_sizecache int32 `json:"-"`
@ -569,7 +502,7 @@ func (m *InitialLoadBalanceResponse) Reset() { *m = InitialLoadBalanceRe
func (m *InitialLoadBalanceResponse) String() string { return proto.CompactTextString(m) } func (m *InitialLoadBalanceResponse) String() string { return proto.CompactTextString(m) }
func (*InitialLoadBalanceResponse) ProtoMessage() {} func (*InitialLoadBalanceResponse) ProtoMessage() {}
func (*InitialLoadBalanceResponse) Descriptor() ([]byte, []int) { func (*InitialLoadBalanceResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_messages_b81c731f0e83edbd, []int{6} return fileDescriptor_messages_b3d89fcb5aa158f8, []int{5}
} }
func (m *InitialLoadBalanceResponse) XXX_Unmarshal(b []byte) error { func (m *InitialLoadBalanceResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_InitialLoadBalanceResponse.Unmarshal(m, b) return xxx_messageInfo_InitialLoadBalanceResponse.Unmarshal(m, b)
@ -596,7 +529,7 @@ func (m *InitialLoadBalanceResponse) GetLoadBalancerDelegate() string {
return "" return ""
} }
func (m *InitialLoadBalanceResponse) GetClientStatsReportInterval() *Duration { func (m *InitialLoadBalanceResponse) GetClientStatsReportInterval() *duration.Duration {
if m != nil { if m != nil {
return m.ClientStatsReportInterval return m.ClientStatsReportInterval
} }
@ -618,7 +551,7 @@ func (m *ServerList) Reset() { *m = ServerList{} }
func (m *ServerList) String() string { return proto.CompactTextString(m) } func (m *ServerList) String() string { return proto.CompactTextString(m) }
func (*ServerList) ProtoMessage() {} func (*ServerList) ProtoMessage() {}
func (*ServerList) Descriptor() ([]byte, []int) { func (*ServerList) Descriptor() ([]byte, []int) {
return fileDescriptor_messages_b81c731f0e83edbd, []int{7} return fileDescriptor_messages_b3d89fcb5aa158f8, []int{6}
} }
func (m *ServerList) XXX_Unmarshal(b []byte) error { func (m *ServerList) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ServerList.Unmarshal(m, b) return xxx_messageInfo_ServerList.Unmarshal(m, b)
@ -645,10 +578,8 @@ func (m *ServerList) GetServers() []*Server {
return nil return nil
} }
// Contains server information. When none of the [drop_for_*] fields are true, // Contains server information. When the drop field is not true, use the other
// use the other fields. When drop_for_rate_limiting is true, ignore all other // fields.
// fields. Use drop_for_load_balancing only when it is true and
// drop_for_rate_limiting is false.
type Server struct { type Server struct {
// A resolved address for the server, serialized in network-byte-order. It may // A resolved address for the server, serialized in network-byte-order. It may
// either be an IPv4 or IPv6 address. // either be an IPv4 or IPv6 address.
@ -658,16 +589,13 @@ type Server struct {
// An opaque but printable token given to the frontend for each pick. All // An opaque but printable token given to the frontend for each pick. All
// frontend requests for that pick must include the token in its initial // frontend requests for that pick must include the token in its initial
// metadata. The token is used by the backend to verify the request and to // metadata. The token is used by the backend to verify the request and to
// allow the backend to report load to the gRPC LB system. // allow the backend to report load to the gRPC LB system. The token is also
// // used in client stats for reporting dropped calls.
// Its length is variable but less than 50 bytes.
LoadBalanceToken string `protobuf:"bytes,3,opt,name=load_balance_token,json=loadBalanceToken" json:"load_balance_token,omitempty"` LoadBalanceToken string `protobuf:"bytes,3,opt,name=load_balance_token,json=loadBalanceToken" json:"load_balance_token,omitempty"`
// Indicates whether this particular request should be dropped by the client // Indicates whether this particular request should be dropped by the client.
// for rate limiting. // If the request is dropped, there will be a corresponding entry in
DropForRateLimiting bool `protobuf:"varint,4,opt,name=drop_for_rate_limiting,json=dropForRateLimiting" json:"drop_for_rate_limiting,omitempty"` // ClientStats.calls_finished_with_drop.
// Indicates whether this particular request should be dropped by the client Drop bool `protobuf:"varint,4,opt,name=drop" json:"drop,omitempty"`
// for load balancing.
DropForLoadBalancing bool `protobuf:"varint,5,opt,name=drop_for_load_balancing,json=dropForLoadBalancing" json:"drop_for_load_balancing,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"` XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"` XXX_sizecache int32 `json:"-"`
@ -677,7 +605,7 @@ func (m *Server) Reset() { *m = Server{} }
func (m *Server) String() string { return proto.CompactTextString(m) } func (m *Server) String() string { return proto.CompactTextString(m) }
func (*Server) ProtoMessage() {} func (*Server) ProtoMessage() {}
func (*Server) Descriptor() ([]byte, []int) { func (*Server) Descriptor() ([]byte, []int) {
return fileDescriptor_messages_b81c731f0e83edbd, []int{8} return fileDescriptor_messages_b3d89fcb5aa158f8, []int{7}
} }
func (m *Server) XXX_Unmarshal(b []byte) error { func (m *Server) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Server.Unmarshal(m, b) return xxx_messageInfo_Server.Unmarshal(m, b)
@ -718,25 +646,17 @@ func (m *Server) GetLoadBalanceToken() string {
return "" return ""
} }
func (m *Server) GetDropForRateLimiting() bool { func (m *Server) GetDrop() bool {
if m != nil { if m != nil {
return m.DropForRateLimiting return m.Drop
}
return false
}
func (m *Server) GetDropForLoadBalancing() bool {
if m != nil {
return m.DropForLoadBalancing
} }
return false return false
} }
func init() { func init() {
proto.RegisterType((*Duration)(nil), "grpc.lb.v1.Duration")
proto.RegisterType((*Timestamp)(nil), "grpc.lb.v1.Timestamp")
proto.RegisterType((*LoadBalanceRequest)(nil), "grpc.lb.v1.LoadBalanceRequest") proto.RegisterType((*LoadBalanceRequest)(nil), "grpc.lb.v1.LoadBalanceRequest")
proto.RegisterType((*InitialLoadBalanceRequest)(nil), "grpc.lb.v1.InitialLoadBalanceRequest") proto.RegisterType((*InitialLoadBalanceRequest)(nil), "grpc.lb.v1.InitialLoadBalanceRequest")
proto.RegisterType((*ClientStatsPerToken)(nil), "grpc.lb.v1.ClientStatsPerToken")
proto.RegisterType((*ClientStats)(nil), "grpc.lb.v1.ClientStats") proto.RegisterType((*ClientStats)(nil), "grpc.lb.v1.ClientStats")
proto.RegisterType((*LoadBalanceResponse)(nil), "grpc.lb.v1.LoadBalanceResponse") proto.RegisterType((*LoadBalanceResponse)(nil), "grpc.lb.v1.LoadBalanceResponse")
proto.RegisterType((*InitialLoadBalanceResponse)(nil), "grpc.lb.v1.InitialLoadBalanceResponse") proto.RegisterType((*InitialLoadBalanceResponse)(nil), "grpc.lb.v1.InitialLoadBalanceResponse")
@ -745,55 +665,54 @@ func init() {
} }
func init() { func init() {
proto.RegisterFile("grpc_lb_v1/messages/messages.proto", fileDescriptor_messages_b81c731f0e83edbd) proto.RegisterFile("grpc_lb_v1/messages/messages.proto", fileDescriptor_messages_b3d89fcb5aa158f8)
} }
var fileDescriptor_messages_b81c731f0e83edbd = []byte{ var fileDescriptor_messages_b3d89fcb5aa158f8 = []byte{
// 731 bytes of a gzipped FileDescriptorProto // 708 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x55, 0xdd, 0x4e, 0x1b, 0x39, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x54, 0x61, 0x4f, 0xf3, 0x36,
0x14, 0x26, 0x9b, 0x00, 0xc9, 0x09, 0x5a, 0xb2, 0x26, 0x0b, 0x81, 0x05, 0x89, 0x1d, 0x69, 0xd9, 0x10, 0x26, 0x6b, 0xe8, 0xdb, 0x5e, 0x5f, 0x8d, 0xce, 0x6c, 0x2c, 0x2d, 0x30, 0x58, 0xa4, 0x21,
0x68, 0xc5, 0x4e, 0x04, 0xd9, 0xbd, 0xe8, 0xcf, 0x45, 0x1b, 0x10, 0x0a, 0x2d, 0x17, 0x95, 0x43, 0x34, 0xb1, 0x54, 0xc0, 0x3e, 0x6c, 0xd2, 0x3e, 0x6c, 0x05, 0xa1, 0xc2, 0xf8, 0x80, 0x52, 0xa4,
0x55, 0xa9, 0x52, 0x65, 0x39, 0x19, 0x33, 0x58, 0x38, 0xf6, 0xd4, 0x76, 0x82, 0xfa, 0x08, 0x7d, 0x4d, 0x48, 0x93, 0xe7, 0x36, 0x26, 0x58, 0xb8, 0x76, 0x66, 0xbb, 0x45, 0xfb, 0xbc, 0xff, 0x33,
0x94, 0x3e, 0x46, 0xd5, 0x67, 0xe8, 0xfb, 0x54, 0xe3, 0x99, 0xc9, 0x0c, 0x10, 0x40, 0xbd, 0x89, 0xed, 0x2f, 0x4c, 0xfb, 0x63, 0x53, 0xec, 0xa4, 0x0d, 0x2d, 0xd5, 0xfb, 0x25, 0x72, 0xee, 0x9e,
0xec, 0xe3, 0xef, 0x7c, 0xdf, 0xf1, 0x89, 0xbf, 0x33, 0xe0, 0x85, 0x3a, 0x1a, 0x11, 0x31, 0x24, 0x7b, 0xee, 0xce, 0x77, 0x8f, 0x21, 0x4c, 0x55, 0x36, 0xc6, 0x7c, 0x84, 0x67, 0xa7, 0xbd, 0x09,
0xd3, 0x83, 0xce, 0x98, 0x19, 0x43, 0x43, 0x66, 0x66, 0x0b, 0x3f, 0xd2, 0xca, 0x2a, 0x04, 0x31, 0xd5, 0x9a, 0xa4, 0x54, 0xcf, 0x0f, 0x51, 0xa6, 0xa4, 0x91, 0x08, 0x72, 0x4c, 0xc4, 0x47, 0xd1,
0xc6, 0x17, 0x43, 0x7f, 0x7a, 0xe0, 0x3d, 0x85, 0xea, 0xf1, 0x44, 0x53, 0xcb, 0x95, 0x44, 0x2d, 0xec, 0xb4, 0xfb, 0x45, 0x2a, 0x65, 0xca, 0x69, 0xcf, 0x7a, 0x46, 0xd3, 0xc7, 0x5e, 0x32, 0x55,
0x58, 0x36, 0x6c, 0xa4, 0x64, 0x60, 0x5a, 0xa5, 0xdd, 0x52, 0xbb, 0x8c, 0xb3, 0x2d, 0x6a, 0xc2, 0xc4, 0x30, 0x29, 0x1c, 0xb6, 0x7b, 0xb0, 0xec, 0x37, 0x6c, 0x42, 0xb5, 0x21, 0x93, 0xcc, 0x01,
0xa2, 0xa4, 0x52, 0x99, 0xd6, 0x2f, 0xbb, 0xa5, 0xf6, 0x22, 0x4e, 0x36, 0xde, 0x33, 0xa8, 0x9d, 0xc2, 0x7f, 0x3d, 0x40, 0xb7, 0x92, 0x24, 0x7d, 0xc2, 0x89, 0x18, 0xd3, 0x98, 0xfe, 0x31, 0xa5,
0xf3, 0x31, 0x33, 0x96, 0x8e, 0xa3, 0x9f, 0x4e, 0xfe, 0x5a, 0x02, 0x74, 0xa6, 0x68, 0xd0, 0xa3, 0xda, 0xa0, 0x3b, 0xd8, 0x62, 0x82, 0x19, 0x46, 0x38, 0x56, 0xce, 0x14, 0x78, 0x87, 0xde, 0x71,
0x82, 0xca, 0x11, 0xc3, 0xec, 0xe3, 0x84, 0x19, 0x8b, 0xde, 0xc0, 0x2a, 0x97, 0xdc, 0x72, 0x2a, 0xeb, 0xec, 0xab, 0x68, 0x91, 0x3d, 0xba, 0x76, 0x90, 0xd5, 0xf8, 0xc1, 0x46, 0xfc, 0x71, 0x11,
0x88, 0x4e, 0x42, 0x8e, 0xae, 0x7e, 0xf8, 0x97, 0x9f, 0x57, 0xed, 0x9f, 0x26, 0x90, 0xbb, 0xf9, 0x5f, 0x32, 0xfe, 0x00, 0xef, 0xc7, 0x9c, 0x51, 0x61, 0xb0, 0x36, 0xc4, 0xe8, 0xe0, 0x23, 0x4b,
0xfd, 0x05, 0xfc, 0x6b, 0x9a, 0x9f, 0x31, 0x3e, 0x87, 0x95, 0x91, 0xe0, 0x4c, 0x5a, 0x62, 0x2c, 0xf7, 0x79, 0x95, 0xee, 0xc2, 0xfa, 0x87, 0xb9, 0x7b, 0xb0, 0x11, 0xb7, 0xc6, 0x8b, 0xdf, 0xfe,
0xb5, 0x49, 0x15, 0xf5, 0xc3, 0x8d, 0x22, 0xdd, 0x91, 0x3b, 0x1f, 0xc4, 0xc7, 0xfd, 0x05, 0x5c, 0x2e, 0x74, 0xb8, 0x24, 0x09, 0x1e, 0xb9, 0x34, 0x65, 0x51, 0xd8, 0xfc, 0x99, 0xd1, 0xb0, 0x07,
0x1f, 0xe5, 0xdb, 0xde, 0x1f, 0xb0, 0x29, 0x14, 0x0d, 0xc8, 0x30, 0x91, 0xc9, 0x8a, 0x22, 0xf6, 0x9d, 0xb5, 0x95, 0x20, 0x04, 0xbe, 0x20, 0x13, 0x6a, 0xcb, 0x6f, 0xc6, 0xf6, 0x1c, 0xfe, 0x0e,
0x53, 0xc4, 0xbc, 0x0e, 0x6c, 0xde, 0x5b, 0x09, 0x42, 0x50, 0x91, 0x74, 0xcc, 0x5c, 0xf9, 0x35, 0xdb, 0x95, 0x5c, 0x77, 0x54, 0xdd, 0xcb, 0x67, 0x2a, 0xd0, 0x09, 0xa0, 0x57, 0x49, 0x4c, 0x6e,
0xec, 0xd6, 0xde, 0xe7, 0x0a, 0xd4, 0x0b, 0x62, 0xa8, 0x0b, 0x35, 0x9b, 0x75, 0x30, 0xbd, 0xe7, 0x2d, 0x02, 0xdb, 0x7c, 0x41, 0xed, 0xd0, 0xbb, 0xd0, 0x14, 0xd3, 0x09, 0x1e, 0x13, 0xce, 0x5d,
0xef, 0xc5, 0xc2, 0x66, 0xed, 0xc5, 0x39, 0x0e, 0xfd, 0x03, 0xbf, 0xc9, 0xc9, 0x98, 0x8c, 0xa8, 0x37, 0xb5, 0xb8, 0x21, 0xa6, 0x93, 0x8b, 0xfc, 0x3f, 0xfc, 0xa7, 0x06, 0xad, 0x4a, 0x0a, 0xf4,
0x10, 0x26, 0xbe, 0x93, 0xb6, 0x2c, 0x70, 0xb7, 0x2a, 0xe3, 0x55, 0x39, 0x19, 0x1f, 0xc5, 0xf1, 0x1d, 0x34, 0xe7, 0x37, 0x5f, 0xdc, 0x64, 0x37, 0x72, 0xb3, 0x89, 0xca, 0xd9, 0x44, 0xf7, 0x25,
0x41, 0x12, 0x46, 0xfb, 0x80, 0x72, 0xec, 0x05, 0x97, 0xdc, 0x5c, 0xb2, 0xa0, 0x55, 0x76, 0xe0, 0x22, 0x5e, 0x80, 0xd1, 0xd7, 0xf0, 0xc9, 0x3c, 0x4d, 0x7e, 0x75, 0xca, 0xd0, 0xa4, 0x48, 0xb7,
0x46, 0x06, 0x3e, 0x49, 0xe3, 0x88, 0x80, 0x7f, 0x17, 0x4d, 0xae, 0xb9, 0xbd, 0x24, 0x81, 0x56, 0x55, 0xa6, 0x1b, 0x3a, 0x73, 0xde, 0xc0, 0x02, 0xfb, 0xc8, 0x04, 0xd3, 0x4f, 0x34, 0x09, 0x6a,
0x11, 0xb9, 0x50, 0x9a, 0x68, 0x6a, 0x19, 0x11, 0x7c, 0xcc, 0x2d, 0x97, 0x61, 0xab, 0xe2, 0x98, 0x16, 0xdc, 0x2e, 0xc1, 0x57, 0x85, 0x1d, 0xfd, 0x06, 0xdf, 0xac, 0xa2, 0xf1, 0x0b, 0x33, 0x4f,
0xfe, 0xbe, 0xcd, 0xf4, 0x8e, 0xdb, 0xcb, 0x63, 0xad, 0xa2, 0x13, 0xa5, 0x31, 0xb5, 0xec, 0x2c, 0xb8, 0x98, 0xd4, 0x23, 0x61, 0x9c, 0x26, 0xd8, 0x48, 0xac, 0xa9, 0x48, 0x82, 0xba, 0x25, 0x3a,
0x85, 0x23, 0x0a, 0x9d, 0x47, 0x05, 0x0a, 0xed, 0x8e, 0x15, 0x16, 0x9d, 0x42, 0xfb, 0x01, 0x85, 0x5a, 0x26, 0xfa, 0x85, 0x99, 0x27, 0xd7, 0xeb, 0x95, 0xc5, 0xdf, 0xcb, 0x21, 0x15, 0x09, 0x1a,
0xbc, 0xf7, 0xb1, 0xc4, 0x07, 0xf8, 0xf7, 0x3e, 0x89, 0xf4, 0x19, 0x5c, 0x50, 0x2e, 0x58, 0x40, 0xc0, 0x97, 0x6f, 0xd0, 0x3f, 0x0b, 0xf9, 0x22, 0xb0, 0xa2, 0x63, 0xca, 0x66, 0x34, 0x09, 0xde,
0xac, 0x22, 0x86, 0xc9, 0xa0, 0xb5, 0xe4, 0x04, 0xf6, 0xe6, 0x09, 0x24, 0x7f, 0xd5, 0x89, 0xc3, 0x59, 0xca, 0xfd, 0x65, 0xca, 0x9f, 0x73, 0x54, 0x5c, 0x80, 0xd0, 0xaf, 0x10, 0xbc, 0x55, 0x64,
0x9f, 0xab, 0x01, 0x93, 0x01, 0xea, 0xc3, 0x9f, 0x73, 0xe8, 0xaf, 0xa4, 0xba, 0x96, 0x44, 0xb3, 0xa2, 0x64, 0x16, 0x34, 0x0e, 0x6b, 0xc7, 0xad, 0xb3, 0x83, 0x35, 0x6b, 0x54, 0x8e, 0x36, 0xfe,
0x11, 0xe3, 0x53, 0x16, 0xb4, 0x96, 0x1d, 0xe5, 0xce, 0x6d, 0xca, 0xd7, 0x31, 0x0a, 0xa7, 0x20, 0x6c, 0xbc, 0x5c, 0xf1, 0xa5, 0x92, 0xd9, 0x8d, 0xdf, 0xf0, 0xdb, 0x9b, 0x37, 0x7e, 0x63, 0xb3,
0xef, 0x5b, 0x09, 0xd6, 0x6e, 0x3c, 0x1b, 0x13, 0x29, 0x69, 0x18, 0x1a, 0x40, 0x23, 0x77, 0x40, 0x5d, 0x0f, 0xff, 0xf3, 0x60, 0xfb, 0xd5, 0xfe, 0xe8, 0x4c, 0x0a, 0x4d, 0xd1, 0x10, 0xda, 0x0b,
0x12, 0x4b, 0x9f, 0xc6, 0xde, 0x63, 0x16, 0x48, 0xd0, 0xfd, 0x05, 0xbc, 0x3a, 0xf3, 0x40, 0x4a, 0x29, 0x38, 0x5b, 0x31, 0xc1, 0xa3, 0x0f, 0x69, 0xc1, 0xa1, 0x07, 0x1b, 0xf1, 0xd6, 0x5c, 0x0c,
0xfa, 0x04, 0xea, 0x86, 0xe9, 0x29, 0xd3, 0x44, 0x70, 0x63, 0x53, 0x0f, 0xac, 0x17, 0xf9, 0x06, 0x05, 0xe9, 0xf7, 0xd0, 0xd2, 0x54, 0xcd, 0xa8, 0xc2, 0x9c, 0x69, 0x53, 0x88, 0x61, 0xa7, 0xca,
0xee, 0xf8, 0x8c, 0x3b, 0x0f, 0x81, 0x99, 0xed, 0x7a, 0xdb, 0xb0, 0x75, 0xcb, 0x01, 0x09, 0x67, 0x37, 0xb4, 0xee, 0x5b, 0x66, 0xc5, 0x04, 0x7a, 0xfe, 0xd7, 0xdf, 0x83, 0xee, 0x92, 0x14, 0x1c,
0x62, 0x81, 0x2f, 0x25, 0xd8, 0xba, 0xbf, 0x14, 0xf4, 0x1f, 0xac, 0x17, 0x93, 0x35, 0x09, 0x98, 0xa7, 0xd3, 0xc2, 0xdf, 0x1e, 0x74, 0xd7, 0x97, 0x82, 0xbe, 0x85, 0x9d, 0x6a, 0xb0, 0xc2, 0x09,
0x60, 0x21, 0xb5, 0x99, 0x2d, 0x9a, 0x22, 0x4f, 0xd2, 0xc7, 0xe9, 0x19, 0x7a, 0x0b, 0xdb, 0x45, 0xe5, 0x34, 0x25, 0xa6, 0xd4, 0xc7, 0xa7, 0x95, 0x35, 0x57, 0x97, 0x85, 0x0f, 0x3d, 0xc0, 0x5e,
0xcb, 0x12, 0xcd, 0x22, 0xa5, 0x2d, 0xe1, 0xd2, 0x32, 0x3d, 0xa5, 0x22, 0x2d, 0xbf, 0x59, 0x2c, 0x55, 0xbb, 0x58, 0xd1, 0x4c, 0x2a, 0x83, 0x99, 0x30, 0x54, 0xcd, 0x08, 0x2f, 0xca, 0xef, 0xac,
0x3f, 0x1b, 0x62, 0x78, 0xb3, 0xe0, 0x5e, 0xec, 0xf2, 0x4e, 0xd3, 0x34, 0xef, 0x05, 0x40, 0x7e, 0x2c, 0xf4, 0x65, 0xf1, 0x18, 0xc5, 0x9d, 0x8a, 0x96, 0x63, 0x1b, 0x7c, 0x5d, 0xc4, 0x86, 0x3f,
0x4b, 0xb4, 0x1f, 0x0f, 0xac, 0x78, 0x17, 0x0f, 0xac, 0x72, 0xbb, 0x7e, 0x88, 0xee, 0xb6, 0x03, 0x02, 0x2c, 0x5a, 0x45, 0x27, 0xf0, 0xce, 0xb5, 0xaa, 0x03, 0xcf, 0x4e, 0x16, 0xad, 0xde, 0x49,
0x67, 0x90, 0x57, 0x95, 0x6a, 0xb9, 0x51, 0xf1, 0xbe, 0x97, 0x60, 0x29, 0x39, 0x41, 0x3b, 0x00, 0x5c, 0x42, 0x6e, 0xfc, 0x46, 0xad, 0xed, 0x87, 0x7f, 0x79, 0x50, 0x77, 0x1e, 0xb4, 0x0f, 0xc0,
0x3c, 0x22, 0x34, 0x08, 0x34, 0x33, 0xc9, 0xc8, 0x5b, 0xc1, 0x35, 0x1e, 0xbd, 0x4c, 0x02, 0xb1, 0x32, 0x4c, 0x92, 0x44, 0x51, 0xad, 0x6d, 0x4b, 0xef, 0xe3, 0x26, 0xcb, 0x7e, 0x72, 0x86, 0xfc,
0xfb, 0x63, 0xed, 0x74, 0xe6, 0xb9, 0x75, 0x6c, 0xc6, 0x1b, 0x9d, 0xb4, 0xea, 0x8a, 0x49, 0x67, 0x2d, 0xc8, 0x73, 0xdb, 0x7a, 0x37, 0x63, 0x7b, 0x5e, 0x23, 0xfa, 0xda, 0x1a, 0xd1, 0x23, 0xf0,
0xc6, 0x1a, 0x6e, 0x14, 0x1a, 0x71, 0x1e, 0xc7, 0x51, 0x17, 0xd6, 0x1f, 0x30, 0x5d, 0x15, 0xaf, 0xed, 0xda, 0xf9, 0x87, 0xde, 0x71, 0x23, 0xb6, 0x67, 0xb7, 0x3e, 0xfd, 0xf3, 0x87, 0xd3, 0xa2,
0x05, 0x73, 0x0c, 0xf6, 0x3f, 0x6c, 0x3c, 0x64, 0xa4, 0x2a, 0x6e, 0x06, 0x73, 0x4c, 0xd3, 0xeb, 0xfd, 0x54, 0x72, 0x22, 0xd2, 0x48, 0xaa, 0xb4, 0x97, 0xd7, 0x6e, 0x3f, 0x7c, 0xd4, 0x7b, 0xe3,
0xbe, 0x3f, 0x08, 0x95, 0x0a, 0x05, 0xf3, 0x43, 0x25, 0xa8, 0x0c, 0x7d, 0xa5, 0xc3, 0x4e, 0xdc, 0x65, 0x1f, 0xd5, 0xed, 0x55, 0x9d, 0xff, 0x1f, 0x00, 0x00, 0xff, 0xff, 0xc8, 0x88, 0xe6, 0xf4,
0x0d, 0xf7, 0x23, 0x86, 0x9d, 0x39, 0x5f, 0x95, 0xe1, 0x92, 0xfb, 0x9a, 0x74, 0x7f, 0x04, 0x00, 0xf7, 0x05, 0x00, 0x00,
0x00, 0xff, 0xff, 0x8e, 0xd0, 0x70, 0xb7, 0x73, 0x06, 0x00, 0x00,
} }

View File

@ -14,36 +14,12 @@
syntax = "proto3"; syntax = "proto3";
import "google/protobuf/duration.proto";
import "google/protobuf/timestamp.proto";
package grpc.lb.v1; package grpc.lb.v1;
option go_package = "google.golang.org/grpc/grpclb/grpc_lb_v1/messages"; option go_package = "google.golang.org/grpc/grpclb/grpc_lb_v1/messages";
message Duration {
// Signed seconds of the span of time. Must be from -315,576,000,000
// to +315,576,000,000 inclusive.
int64 seconds = 1;
// Signed fractions of a second at nanosecond resolution of the span
// of time. Durations less than one second are represented with a 0
// `seconds` field and a positive or negative `nanos` field. For durations
// of one second or more, a non-zero value for the `nanos` field must be
// of the same sign as the `seconds` field. Must be from -999,999,999
// to +999,999,999 inclusive.
int32 nanos = 2;
}
message Timestamp {
// Represents seconds of UTC time since Unix epoch
// 1970-01-01T00:00:00Z. Must be from 0001-01-01T00:00:00Z to
// 9999-12-31T23:59:59Z inclusive.
int64 seconds = 1;
// Non-negative fractions of a second at nanosecond resolution. Negative
// second values with fractions must still have non-negative nanos values
// that count forward in time. Must be from 0 to 999,999,999
// inclusive.
int32 nanos = 2;
}
message LoadBalanceRequest { message LoadBalanceRequest {
oneof load_balance_request_type { oneof load_balance_request_type {
// This message should be sent on the first request to the load balancer. // This message should be sent on the first request to the load balancer.
@ -56,16 +32,25 @@ message LoadBalanceRequest {
} }
message InitialLoadBalanceRequest { message InitialLoadBalanceRequest {
// Name of load balanced service (IE, balancer.service.com) // Name of load balanced service (IE, balancer.service.com). Its
// length should be less than 256 bytes. // length should be less than 256 bytes.
string name = 1; string name = 1;
} }
// Contains the number of calls finished for a particular load balance token.
message ClientStatsPerToken {
// See Server.load_balance_token.
string load_balance_token = 1;
// The total number of RPCs that finished associated with the token.
int64 num_calls = 2;
}
// Contains client level statistics that are useful to load balancing. Each // Contains client level statistics that are useful to load balancing. Each
// count except the timestamp should be reset to zero after reporting the stats. // count except the timestamp should be reset to zero after reporting the stats.
message ClientStats { message ClientStats {
// The timestamp of generating the report. // The timestamp of generating the report.
Timestamp timestamp = 1; google.protobuf.Timestamp timestamp = 1;
// The total number of RPCs that started. // The total number of RPCs that started.
int64 num_calls_started = 2; int64 num_calls_started = 2;
@ -73,20 +58,17 @@ message ClientStats {
// The total number of RPCs that finished. // The total number of RPCs that finished.
int64 num_calls_finished = 3; int64 num_calls_finished = 3;
// The total number of RPCs that were dropped by the client because of rate
// limiting.
int64 num_calls_finished_with_drop_for_rate_limiting = 4;
// The total number of RPCs that were dropped by the client because of load
// balancing.
int64 num_calls_finished_with_drop_for_load_balancing = 5;
// The total number of RPCs that failed to reach a server except dropped RPCs. // The total number of RPCs that failed to reach a server except dropped RPCs.
int64 num_calls_finished_with_client_failed_to_send = 6; int64 num_calls_finished_with_client_failed_to_send = 6;
// The total number of RPCs that finished and are known to have been received // The total number of RPCs that finished and are known to have been received
// by a server. // by a server.
int64 num_calls_finished_known_received = 7; int64 num_calls_finished_known_received = 7;
// The list of dropped calls.
repeated ClientStatsPerToken calls_finished_with_drop = 8;
reserved 4, 5;
} }
message LoadBalanceResponse { message LoadBalanceResponse {
@ -111,7 +93,7 @@ message InitialLoadBalanceResponse {
// This interval defines how often the client should send the client stats // This interval defines how often the client should send the client stats
// to the load balancer. Stats should only be reported when the duration is // to the load balancer. Stats should only be reported when the duration is
// positive. // positive.
Duration client_stats_report_interval = 2; google.protobuf.Duration client_stats_report_interval = 2;
} }
message ServerList { message ServerList {
@ -125,10 +107,8 @@ message ServerList {
reserved 3; reserved 3;
} }
// Contains server information. When none of the [drop_for_*] fields are true, // Contains server information. When the drop field is not true, use the other
// use the other fields. When drop_for_rate_limiting is true, ignore all other // fields.
// fields. Use drop_for_load_balancing only when it is true and
// drop_for_rate_limiting is false.
message Server { message Server {
// A resolved address for the server, serialized in network-byte-order. It may // A resolved address for the server, serialized in network-byte-order. It may
// either be an IPv4 or IPv6 address. // either be an IPv4 or IPv6 address.
@ -140,16 +120,14 @@ message Server {
// An opaque but printable token given to the frontend for each pick. All // An opaque but printable token given to the frontend for each pick. All
// frontend requests for that pick must include the token in its initial // frontend requests for that pick must include the token in its initial
// metadata. The token is used by the backend to verify the request and to // metadata. The token is used by the backend to verify the request and to
// allow the backend to report load to the gRPC LB system. // allow the backend to report load to the gRPC LB system. The token is also
// // used in client stats for reporting dropped calls.
// Its length is variable but less than 50 bytes.
string load_balance_token = 3; string load_balance_token = 3;
// Indicates whether this particular request should be dropped by the client // Indicates whether this particular request should be dropped by the client.
// for rate limiting. // If the request is dropped, there will be a corresponding entry in
bool drop_for_rate_limiting = 4; // ClientStats.calls_finished_with_drop.
bool drop = 4;
// Indicates whether this particular request should be dropped by the client reserved 5;
// for load balancing.
bool drop_for_load_balancing = 5;
} }

View File

@ -33,7 +33,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/golang/protobuf/proto" durationpb "github.com/golang/protobuf/ptypes/duration"
"golang.org/x/net/context" "golang.org/x/net/context"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer"
@ -125,18 +125,81 @@ func fakeNameDialer(addr string, timeout time.Duration) (net.Conn, error) {
return net.DialTimeout("tcp", addr, timeout) return net.DialTimeout("tcp", addr, timeout)
} }
// rpcStats is same as lbmpb.ClientStats, except that numCallsDropped is a map
// instead of a slice of pointers.
//
// TODO: this struct was already defined in grpclb_picker.go. Try to merge these
// two after moving grpclb to its own package (this package).
type rpcStats struct {
numCallsStarted int64
numCallsFinished int64
numCallsFinishedWithClientFailedToSend int64
numCallsFinishedKnownReceived int64
// map load_balance_token -> num_calls_dropped
numCallsDropped map[string]int64
}
func newRPCStats() *rpcStats {
return &rpcStats{
numCallsDropped: make(map[string]int64),
}
}
func (stats *rpcStats) merge(new *lbmpb.ClientStats) {
stats.numCallsStarted += new.NumCallsStarted
stats.numCallsFinished += new.NumCallsFinished
stats.numCallsFinishedWithClientFailedToSend += new.NumCallsFinishedWithClientFailedToSend
stats.numCallsFinishedKnownReceived += new.NumCallsFinishedKnownReceived
for _, perToken := range new.CallsFinishedWithDrop {
stats.numCallsDropped[perToken.LoadBalanceToken] += perToken.NumCalls
}
}
func mapsEqual(a, b map[string]int64) bool {
if len(a) != len(b) {
return false
}
for k, v1 := range a {
if v2, ok := b[k]; !ok || v1 != v2 {
return false
}
}
return true
}
func (stats *rpcStats) equal(new *rpcStats) bool {
if stats.numCallsStarted != new.numCallsStarted {
return false
}
if stats.numCallsFinished != new.numCallsFinished {
return false
}
if stats.numCallsFinishedWithClientFailedToSend != new.numCallsFinishedWithClientFailedToSend {
return false
}
if stats.numCallsFinishedKnownReceived != new.numCallsFinishedKnownReceived {
return false
}
if !mapsEqual(stats.numCallsDropped, new.numCallsDropped) {
return false
}
return true
}
type remoteBalancer struct { type remoteBalancer struct {
sls chan *lbmpb.ServerList sls chan *lbmpb.ServerList
statsDura time.Duration statsDura time.Duration
done chan struct{} done chan struct{}
mu sync.Mutex mu sync.Mutex
stats lbmpb.ClientStats stats *rpcStats
} }
func newRemoteBalancer(intervals []time.Duration) *remoteBalancer { func newRemoteBalancer(intervals []time.Duration) *remoteBalancer {
return &remoteBalancer{ return &remoteBalancer{
sls: make(chan *lbmpb.ServerList, 1), sls: make(chan *lbmpb.ServerList, 1),
done: make(chan struct{}), done: make(chan struct{}),
stats: newRPCStats(),
} }
} }
@ -157,7 +220,7 @@ func (b *remoteBalancer) BalanceLoad(stream lbspb.LoadBalancer_BalanceLoadServer
resp := &lbmpb.LoadBalanceResponse{ resp := &lbmpb.LoadBalanceResponse{
LoadBalanceResponseType: &lbmpb.LoadBalanceResponse_InitialResponse{ LoadBalanceResponseType: &lbmpb.LoadBalanceResponse_InitialResponse{
InitialResponse: &lbmpb.InitialLoadBalanceResponse{ InitialResponse: &lbmpb.InitialLoadBalanceResponse{
ClientStatsReportInterval: &lbmpb.Duration{ ClientStatsReportInterval: &durationpb.Duration{
Seconds: int64(b.statsDura.Seconds()), Seconds: int64(b.statsDura.Seconds()),
Nanos: int32(b.statsDura.Nanoseconds() - int64(b.statsDura.Seconds())*1e9), Nanos: int32(b.statsDura.Nanoseconds() - int64(b.statsDura.Seconds())*1e9),
}, },
@ -177,12 +240,7 @@ func (b *remoteBalancer) BalanceLoad(stream lbspb.LoadBalancer_BalanceLoadServer
return return
} }
b.mu.Lock() b.mu.Lock()
b.stats.NumCallsStarted += req.GetClientStats().NumCallsStarted b.stats.merge(req.GetClientStats())
b.stats.NumCallsFinished += req.GetClientStats().NumCallsFinished
b.stats.NumCallsFinishedWithDropForRateLimiting += req.GetClientStats().NumCallsFinishedWithDropForRateLimiting
b.stats.NumCallsFinishedWithDropForLoadBalancing += req.GetClientStats().NumCallsFinishedWithDropForLoadBalancing
b.stats.NumCallsFinishedWithClientFailedToSend += req.GetClientStats().NumCallsFinishedWithClientFailedToSend
b.stats.NumCallsFinishedKnownReceived += req.GetClientStats().NumCallsFinishedKnownReceived
b.mu.Unlock() b.mu.Unlock()
} }
}() }()
@ -447,9 +505,9 @@ func TestDropRequest(t *testing.T) {
IpAddress: tss.beIPs[0], IpAddress: tss.beIPs[0],
Port: int32(tss.bePorts[0]), Port: int32(tss.bePorts[0]),
LoadBalanceToken: lbToken, LoadBalanceToken: lbToken,
DropForLoadBalancing: false, Drop: false,
}, { }, {
DropForLoadBalancing: true, Drop: true,
}}, }},
} }
creds := serverNameCheckCreds{ creds := serverNameCheckCreds{
@ -697,14 +755,14 @@ func (failPreRPCCred) RequireTransportSecurity() bool {
return false return false
} }
func checkStats(stats *lbmpb.ClientStats, expected *lbmpb.ClientStats) error { func checkStats(stats, expected *rpcStats) error {
if !proto.Equal(stats, expected) { if !stats.equal(expected) {
return fmt.Errorf("stats not equal: got %+v, want %+v", stats, expected) return fmt.Errorf("stats not equal: got %+v, want %+v", stats, expected)
} }
return nil return nil
} }
func runAndGetStats(t *testing.T, dropForLoadBalancing, dropForRateLimiting bool, runRPCs func(*grpc.ClientConn)) lbmpb.ClientStats { func runAndGetStats(t *testing.T, drop bool, runRPCs func(*grpc.ClientConn)) *rpcStats {
defer leakcheck.Check(t) defer leakcheck.Check(t)
r, cleanup := manual.GenerateAndRegisterManualResolver() r, cleanup := manual.GenerateAndRegisterManualResolver()
@ -720,8 +778,7 @@ func runAndGetStats(t *testing.T, dropForLoadBalancing, dropForRateLimiting bool
IpAddress: tss.beIPs[0], IpAddress: tss.beIPs[0],
Port: int32(tss.bePorts[0]), Port: int32(tss.bePorts[0]),
LoadBalanceToken: lbToken, LoadBalanceToken: lbToken,
DropForLoadBalancing: dropForLoadBalancing, Drop: drop,
DropForRateLimiting: dropForRateLimiting,
}}, }},
} }
tss.ls.statsDura = 100 * time.Millisecond tss.ls.statsDura = 100 * time.Millisecond
@ -760,7 +817,7 @@ const (
func TestGRPCLBStatsUnarySuccess(t *testing.T) { func TestGRPCLBStatsUnarySuccess(t *testing.T) {
defer leakcheck.Check(t) defer leakcheck.Check(t)
stats := runAndGetStats(t, false, false, func(cc *grpc.ClientConn) { stats := runAndGetStats(t, false, func(cc *grpc.ClientConn) {
testC := testpb.NewTestServiceClient(cc) testC := testpb.NewTestServiceClient(cc)
// The first non-failfast RPC succeeds, all connections are up. // The first non-failfast RPC succeeds, all connections are up.
if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil { if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil {
@ -771,19 +828,19 @@ func TestGRPCLBStatsUnarySuccess(t *testing.T) {
} }
}) })
if err := checkStats(&stats, &lbmpb.ClientStats{ if err := checkStats(stats, &rpcStats{
NumCallsStarted: int64(countRPC), numCallsStarted: int64(countRPC),
NumCallsFinished: int64(countRPC), numCallsFinished: int64(countRPC),
NumCallsFinishedKnownReceived: int64(countRPC), numCallsFinishedKnownReceived: int64(countRPC),
}); err != nil { }); err != nil {
t.Fatal(err) t.Fatal(err)
} }
} }
func TestGRPCLBStatsUnaryDropLoadBalancing(t *testing.T) { func TestGRPCLBStatsUnaryDrop(t *testing.T) {
defer leakcheck.Check(t) defer leakcheck.Check(t)
c := 0 c := 0
stats := runAndGetStats(t, true, false, func(cc *grpc.ClientConn) { stats := runAndGetStats(t, true, func(cc *grpc.ClientConn) {
testC := testpb.NewTestServiceClient(cc) testC := testpb.NewTestServiceClient(cc)
for { for {
c++ c++
@ -798,39 +855,11 @@ func TestGRPCLBStatsUnaryDropLoadBalancing(t *testing.T) {
} }
}) })
if err := checkStats(&stats, &lbmpb.ClientStats{ if err := checkStats(stats, &rpcStats{
NumCallsStarted: int64(countRPC + c), numCallsStarted: int64(countRPC + c),
NumCallsFinished: int64(countRPC + c), numCallsFinished: int64(countRPC + c),
NumCallsFinishedWithDropForLoadBalancing: int64(countRPC + 1), numCallsFinishedWithClientFailedToSend: int64(c - 1),
NumCallsFinishedWithClientFailedToSend: int64(c - 1), numCallsDropped: map[string]int64{lbToken: int64(countRPC + 1)},
}); err != nil {
t.Fatal(err)
}
}
func TestGRPCLBStatsUnaryDropRateLimiting(t *testing.T) {
defer leakcheck.Check(t)
c := 0
stats := runAndGetStats(t, false, true, func(cc *grpc.ClientConn) {
testC := testpb.NewTestServiceClient(cc)
for {
c++
if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
if strings.Contains(err.Error(), dropErrDesc) {
break
}
}
}
for i := 0; i < countRPC; i++ {
testC.EmptyCall(context.Background(), &testpb.Empty{})
}
})
if err := checkStats(&stats, &lbmpb.ClientStats{
NumCallsStarted: int64(countRPC + c),
NumCallsFinished: int64(countRPC + c),
NumCallsFinishedWithDropForRateLimiting: int64(countRPC + 1),
NumCallsFinishedWithClientFailedToSend: int64(c - 1),
}); err != nil { }); err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -838,7 +867,7 @@ func TestGRPCLBStatsUnaryDropRateLimiting(t *testing.T) {
func TestGRPCLBStatsUnaryFailedToSend(t *testing.T) { func TestGRPCLBStatsUnaryFailedToSend(t *testing.T) {
defer leakcheck.Check(t) defer leakcheck.Check(t)
stats := runAndGetStats(t, false, false, func(cc *grpc.ClientConn) { stats := runAndGetStats(t, false, func(cc *grpc.ClientConn) {
testC := testpb.NewTestServiceClient(cc) testC := testpb.NewTestServiceClient(cc)
// The first non-failfast RPC succeeds, all connections are up. // The first non-failfast RPC succeeds, all connections are up.
if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil { if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil {
@ -849,11 +878,11 @@ func TestGRPCLBStatsUnaryFailedToSend(t *testing.T) {
} }
}) })
if err := checkStats(&stats, &lbmpb.ClientStats{ if err := checkStats(stats, &rpcStats{
NumCallsStarted: int64(countRPC), numCallsStarted: int64(countRPC),
NumCallsFinished: int64(countRPC), numCallsFinished: int64(countRPC),
NumCallsFinishedWithClientFailedToSend: int64(countRPC - 1), numCallsFinishedWithClientFailedToSend: int64(countRPC - 1),
NumCallsFinishedKnownReceived: 1, numCallsFinishedKnownReceived: 1,
}); err != nil { }); err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -861,7 +890,7 @@ func TestGRPCLBStatsUnaryFailedToSend(t *testing.T) {
func TestGRPCLBStatsStreamingSuccess(t *testing.T) { func TestGRPCLBStatsStreamingSuccess(t *testing.T) {
defer leakcheck.Check(t) defer leakcheck.Check(t)
stats := runAndGetStats(t, false, false, func(cc *grpc.ClientConn) { stats := runAndGetStats(t, false, func(cc *grpc.ClientConn) {
testC := testpb.NewTestServiceClient(cc) testC := testpb.NewTestServiceClient(cc)
// The first non-failfast RPC succeeds, all connections are up. // The first non-failfast RPC succeeds, all connections are up.
stream, err := testC.FullDuplexCall(context.Background(), grpc.FailFast(false)) stream, err := testC.FullDuplexCall(context.Background(), grpc.FailFast(false))
@ -886,19 +915,19 @@ func TestGRPCLBStatsStreamingSuccess(t *testing.T) {
} }
}) })
if err := checkStats(&stats, &lbmpb.ClientStats{ if err := checkStats(stats, &rpcStats{
NumCallsStarted: int64(countRPC), numCallsStarted: int64(countRPC),
NumCallsFinished: int64(countRPC), numCallsFinished: int64(countRPC),
NumCallsFinishedKnownReceived: int64(countRPC), numCallsFinishedKnownReceived: int64(countRPC),
}); err != nil { }); err != nil {
t.Fatal(err) t.Fatal(err)
} }
} }
func TestGRPCLBStatsStreamingDropLoadBalancing(t *testing.T) { func TestGRPCLBStatsStreamingDrop(t *testing.T) {
defer leakcheck.Check(t) defer leakcheck.Check(t)
c := 0 c := 0
stats := runAndGetStats(t, true, false, func(cc *grpc.ClientConn) { stats := runAndGetStats(t, true, func(cc *grpc.ClientConn) {
testC := testpb.NewTestServiceClient(cc) testC := testpb.NewTestServiceClient(cc)
for { for {
c++ c++
@ -913,39 +942,11 @@ func TestGRPCLBStatsStreamingDropLoadBalancing(t *testing.T) {
} }
}) })
if err := checkStats(&stats, &lbmpb.ClientStats{ if err := checkStats(stats, &rpcStats{
NumCallsStarted: int64(countRPC + c), numCallsStarted: int64(countRPC + c),
NumCallsFinished: int64(countRPC + c), numCallsFinished: int64(countRPC + c),
NumCallsFinishedWithDropForLoadBalancing: int64(countRPC + 1), numCallsFinishedWithClientFailedToSend: int64(c - 1),
NumCallsFinishedWithClientFailedToSend: int64(c - 1), numCallsDropped: map[string]int64{lbToken: int64(countRPC + 1)},
}); err != nil {
t.Fatal(err)
}
}
func TestGRPCLBStatsStreamingDropRateLimiting(t *testing.T) {
defer leakcheck.Check(t)
c := 0
stats := runAndGetStats(t, false, true, func(cc *grpc.ClientConn) {
testC := testpb.NewTestServiceClient(cc)
for {
c++
if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
if strings.Contains(err.Error(), dropErrDesc) {
break
}
}
}
for i := 0; i < countRPC; i++ {
testC.FullDuplexCall(context.Background())
}
})
if err := checkStats(&stats, &lbmpb.ClientStats{
NumCallsStarted: int64(countRPC + c),
NumCallsFinished: int64(countRPC + c),
NumCallsFinishedWithDropForRateLimiting: int64(countRPC + 1),
NumCallsFinishedWithClientFailedToSend: int64(c - 1),
}); err != nil { }); err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -953,7 +954,7 @@ func TestGRPCLBStatsStreamingDropRateLimiting(t *testing.T) {
func TestGRPCLBStatsStreamingFailedToSend(t *testing.T) { func TestGRPCLBStatsStreamingFailedToSend(t *testing.T) {
defer leakcheck.Check(t) defer leakcheck.Check(t)
stats := runAndGetStats(t, false, false, func(cc *grpc.ClientConn) { stats := runAndGetStats(t, false, func(cc *grpc.ClientConn) {
testC := testpb.NewTestServiceClient(cc) testC := testpb.NewTestServiceClient(cc)
// The first non-failfast RPC succeeds, all connections are up. // The first non-failfast RPC succeeds, all connections are up.
stream, err := testC.FullDuplexCall(context.Background(), grpc.FailFast(false)) stream, err := testC.FullDuplexCall(context.Background(), grpc.FailFast(false))
@ -970,11 +971,11 @@ func TestGRPCLBStatsStreamingFailedToSend(t *testing.T) {
} }
}) })
if err := checkStats(&stats, &lbmpb.ClientStats{ if err := checkStats(stats, &rpcStats{
NumCallsStarted: int64(countRPC), numCallsStarted: int64(countRPC),
NumCallsFinished: int64(countRPC), numCallsFinished: int64(countRPC),
NumCallsFinishedWithClientFailedToSend: int64(countRPC - 1), numCallsFinishedWithClientFailedToSend: int64(countRPC - 1),
NumCallsFinishedKnownReceived: 1, numCallsFinishedKnownReceived: 1,
}); err != nil { }); err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -30,49 +30,61 @@ import (
) )
type rpcStats struct { type rpcStats struct {
NumCallsStarted int64 numCallsStarted int64
NumCallsFinished int64 numCallsFinished int64
NumCallsFinishedWithDropForRateLimiting int64 numCallsFinishedWithClientFailedToSend int64
NumCallsFinishedWithDropForLoadBalancing int64 numCallsFinishedKnownReceived int64
NumCallsFinishedWithClientFailedToSend int64
NumCallsFinishedKnownReceived int64 mu sync.Mutex
// map load_balance_token -> num_calls_dropped
numCallsDropped map[string]int64
}
func newRPCStats() *rpcStats {
return &rpcStats{
numCallsDropped: make(map[string]int64),
}
} }
// toClientStats converts rpcStats to lbpb.ClientStats, and clears rpcStats. // toClientStats converts rpcStats to lbpb.ClientStats, and clears rpcStats.
func (s *rpcStats) toClientStats() *lbpb.ClientStats { func (s *rpcStats) toClientStats() *lbpb.ClientStats {
stats := &lbpb.ClientStats{ stats := &lbpb.ClientStats{
NumCallsStarted: atomic.SwapInt64(&s.NumCallsStarted, 0), NumCallsStarted: atomic.SwapInt64(&s.numCallsStarted, 0),
NumCallsFinished: atomic.SwapInt64(&s.NumCallsFinished, 0), NumCallsFinished: atomic.SwapInt64(&s.numCallsFinished, 0),
NumCallsFinishedWithDropForRateLimiting: atomic.SwapInt64(&s.NumCallsFinishedWithDropForRateLimiting, 0), NumCallsFinishedWithClientFailedToSend: atomic.SwapInt64(&s.numCallsFinishedWithClientFailedToSend, 0),
NumCallsFinishedWithDropForLoadBalancing: atomic.SwapInt64(&s.NumCallsFinishedWithDropForLoadBalancing, 0), NumCallsFinishedKnownReceived: atomic.SwapInt64(&s.numCallsFinishedKnownReceived, 0),
NumCallsFinishedWithClientFailedToSend: atomic.SwapInt64(&s.NumCallsFinishedWithClientFailedToSend, 0), }
NumCallsFinishedKnownReceived: atomic.SwapInt64(&s.NumCallsFinishedKnownReceived, 0), s.mu.Lock()
dropped := s.numCallsDropped
s.numCallsDropped = make(map[string]int64)
s.mu.Unlock()
for token, count := range dropped {
stats.CallsFinishedWithDrop = append(stats.CallsFinishedWithDrop, &lbpb.ClientStatsPerToken{
LoadBalanceToken: token,
NumCalls: count,
})
} }
return stats return stats
} }
func (s *rpcStats) dropForRateLimiting() { func (s *rpcStats) drop(token string) {
atomic.AddInt64(&s.NumCallsStarted, 1) atomic.AddInt64(&s.numCallsStarted, 1)
atomic.AddInt64(&s.NumCallsFinishedWithDropForRateLimiting, 1) s.mu.Lock()
atomic.AddInt64(&s.NumCallsFinished, 1) s.numCallsDropped[token]++
} s.mu.Unlock()
atomic.AddInt64(&s.numCallsFinished, 1)
func (s *rpcStats) dropForLoadBalancing() {
atomic.AddInt64(&s.NumCallsStarted, 1)
atomic.AddInt64(&s.NumCallsFinishedWithDropForLoadBalancing, 1)
atomic.AddInt64(&s.NumCallsFinished, 1)
} }
func (s *rpcStats) failedToSend() { func (s *rpcStats) failedToSend() {
atomic.AddInt64(&s.NumCallsStarted, 1) atomic.AddInt64(&s.numCallsStarted, 1)
atomic.AddInt64(&s.NumCallsFinishedWithClientFailedToSend, 1) atomic.AddInt64(&s.numCallsFinishedWithClientFailedToSend, 1)
atomic.AddInt64(&s.NumCallsFinished, 1) atomic.AddInt64(&s.numCallsFinished, 1)
} }
func (s *rpcStats) knownReceived() { func (s *rpcStats) knownReceived() {
atomic.AddInt64(&s.NumCallsStarted, 1) atomic.AddInt64(&s.numCallsStarted, 1)
atomic.AddInt64(&s.NumCallsFinishedKnownReceived, 1) atomic.AddInt64(&s.numCallsFinishedKnownReceived, 1)
atomic.AddInt64(&s.NumCallsFinished, 1) atomic.AddInt64(&s.numCallsFinished, 1)
} }
type errPicker struct { type errPicker struct {
@ -131,12 +143,8 @@ func (p *lbPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balance
p.serverListNext = (p.serverListNext + 1) % len(p.serverList) p.serverListNext = (p.serverListNext + 1) % len(p.serverList)
// If it's a drop, return an error and fail the RPC. // If it's a drop, return an error and fail the RPC.
if s.DropForRateLimiting { if s.Drop {
p.stats.dropForRateLimiting() p.stats.drop(s.LoadBalanceToken)
return nil, nil, status.Errorf(codes.Unavailable, "request dropped by grpclb")
}
if s.DropForLoadBalancing {
p.stats.dropForLoadBalancing()
return nil, nil, status.Errorf(codes.Unavailable, "request dropped by grpclb") return nil, nil, status.Errorf(codes.Unavailable, "request dropped by grpclb")
} }

View File

@ -28,6 +28,7 @@ import (
"google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer"
"google.golang.org/grpc/channelz" "google.golang.org/grpc/channelz"
timestamppb "github.com/golang/protobuf/ptypes/timestamp"
"google.golang.org/grpc/connectivity" "google.golang.org/grpc/connectivity"
lbpb "google.golang.org/grpc/grpclb/grpc_lb_v1/messages" lbpb "google.golang.org/grpc/grpclb/grpc_lb_v1/messages"
"google.golang.org/grpc/grpclog" "google.golang.org/grpc/grpclog"
@ -55,7 +56,7 @@ func (lb *lbBalancer) processServerList(l *lbpb.ServerList) {
var backendAddrs []resolver.Address var backendAddrs []resolver.Address
for _, s := range l.Servers { for _, s := range l.Servers {
if s.DropForLoadBalancing || s.DropForRateLimiting { if s.Drop {
continue continue
} }
@ -162,7 +163,7 @@ func (lb *lbBalancer) sendLoadReport(s *balanceLoadClientStream, interval time.D
} }
stats := lb.clientStats.toClientStats() stats := lb.clientStats.toClientStats()
t := time.Now() t := time.Now()
stats.Timestamp = &lbpb.Timestamp{ stats.Timestamp = &timestamppb.Timestamp{
Seconds: t.Unix(), Seconds: t.Unix(),
Nanos: int32(t.Nanosecond()), Nanos: int32(t.Nanosecond()),
} }