diff --git a/grpclb/grpclb.go b/grpclb.go similarity index 80% rename from grpclb/grpclb.go rename to grpclb.go index ea065fab..cdf117ca 100644 --- a/grpclb/grpclb.go +++ b/grpclb.go @@ -31,10 +31,7 @@ * */ -// Package grpclb implements the load balancing protocol defined at -// https://github.com/grpc/grpc/blob/master/doc/load-balancing.md. -// The implementation is currently EXPERIMENTAL. -package grpclb +package grpc import ( "errors" @@ -45,7 +42,6 @@ import ( "time" "golang.org/x/net/context" - "google.golang.org/grpc" "google.golang.org/grpc/codes" lbpb "google.golang.org/grpc/grpclb/grpc_lb_v1" "google.golang.org/grpc/grpclog" @@ -53,6 +49,43 @@ import ( "google.golang.org/grpc/naming" ) +// Client API for LoadBalancer service. +// Mostly copied from generated pb.go file. +// To avoid circular dependency. +type loadBalancerClient struct { + cc *ClientConn +} + +func (c *loadBalancerClient) BalanceLoad(ctx context.Context, opts ...CallOption) (*balanceLoadClientStream, error) { + desc := &StreamDesc{ + StreamName: "BalanceLoad", + ServerStreams: true, + ClientStreams: true, + } + stream, err := NewClientStream(ctx, desc, c.cc, "/grpc.lb.v1.LoadBalancer/BalanceLoad", opts...) + if err != nil { + return nil, err + } + x := &balanceLoadClientStream{stream} + return x, nil +} + +type balanceLoadClientStream struct { + ClientStream +} + +func (x *balanceLoadClientStream) Send(m *lbpb.LoadBalanceRequest) error { + return x.ClientStream.SendMsg(m) +} + +func (x *balanceLoadClientStream) Recv() (*lbpb.LoadBalanceResponse, error) { + m := new(lbpb.LoadBalanceResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + // AddressType indicates the address type returned by name resolution. type AddressType uint8 @@ -63,18 +96,18 @@ const ( GRPCLB ) -// Metadata contains the information the name resolution for grpclb should provide. The +// AddrMetadataGRPCLB contains the information the name resolution for grpclb should provide. The // name resolver used by grpclb balancer is required to provide this type of metadata in // its address updates. -type Metadata struct { +type AddrMetadataGRPCLB struct { // AddrType is the type of server (grpc load balancer or backend). AddrType AddressType // ServerName is the name of the grpc load balancer. Used for authentication. ServerName string } -// Balancer creates a grpclb load balancer. -func Balancer(r naming.Resolver) grpc.Balancer { +// NewGRPCLBBalancer creates a grpclb load balancer. +func NewGRPCLBBalancer(r naming.Resolver) Balancer { return &balancer{ r: r, } @@ -86,13 +119,16 @@ type remoteBalancerInfo struct { name string } -// addrInfo consists of the information of a backend server. -type addrInfo struct { - addr grpc.Address +// grpclbAddrInfo consists of the information of a backend server. +type grpclbAddrInfo struct { + addr Address connected bool - // dropRequest indicates whether a particular RPC which chooses this address - // should be dropped. - dropRequest bool + // dropForRateLimiting indicates whether this particular request should be + // dropped by the client for rate limiting. + dropForRateLimiting bool + // dropForLoadBalancing indicates whether this particular request should be + // dropped by the client for load balancing. + dropForLoadBalancing bool } type balancer struct { @@ -101,9 +137,9 @@ type balancer struct { mu sync.Mutex seq int // a sequence number to make sure addrCh does not get stale addresses. w naming.Watcher - addrCh chan []grpc.Address + addrCh chan []Address rbs []remoteBalancerInfo - addrs []*addrInfo + addrs []*grpclbAddrInfo next int waitCh chan struct{} done bool @@ -119,7 +155,7 @@ func (b *balancer) watchAddrUpdates(w naming.Watcher, ch chan []remoteBalancerIn b.mu.Lock() defer b.mu.Unlock() if b.done { - return grpc.ErrClientConnClosing + return ErrClientConnClosing } for _, update := range updates { switch update.Op { @@ -135,7 +171,7 @@ func (b *balancer) watchAddrUpdates(w naming.Watcher, ch chan []remoteBalancerIn if exist { continue } - md, ok := update.Metadata.(*Metadata) + md, ok := update.Metadata.(*AddrMetadataGRPCLB) if !ok { // TODO: Revisit the handling here and may introduce some fallback mechanism. grpclog.Printf("The name resolution contains unexpected metadata %v", update.Metadata) @@ -206,18 +242,19 @@ func (b *balancer) processServerList(l *lbpb.ServerList, seq int) { servers := l.GetServers() expiration := convertDuration(l.GetExpirationInterval()) var ( - sl []*addrInfo - addrs []grpc.Address + sl []*grpclbAddrInfo + addrs []Address ) for _, s := range servers { md := metadata.Pairs("lb-token", s.LoadBalanceToken) - addr := grpc.Address{ + addr := Address{ Addr: fmt.Sprintf("%s:%d", net.IP(s.IpAddress), s.Port), Metadata: &md, } - sl = append(sl, &addrInfo{ - addr: addr, - dropRequest: s.DropRequest, + sl = append(sl, &grpclbAddrInfo{ + addr: addr, + dropForRateLimiting: s.DropForRateLimiting, + dropForLoadBalancing: s.DropForLoadBalancing, }) addrs = append(addrs, addr) } @@ -244,7 +281,7 @@ func (b *balancer) processServerList(l *lbpb.ServerList, seq int) { return } -func (b *balancer) callRemoteBalancer(lbc lbpb.LoadBalancerClient, seq int) (retry bool) { +func (b *balancer) callRemoteBalancer(lbc *loadBalancerClient, seq int) (retry bool) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() stream, err := lbc.BalanceLoad(ctx) @@ -306,7 +343,7 @@ func (b *balancer) callRemoteBalancer(lbc lbpb.LoadBalancerClient, seq int) (ret return true } -func (b *balancer) Start(target string, config grpc.BalancerConfig) error { +func (b *balancer) Start(target string, config BalancerConfig) error { b.rand = rand.New(rand.NewSource(time.Now().Unix())) // TODO: Fall back to the basic direct connection if there is no name resolver. if b.r == nil { @@ -316,9 +353,9 @@ func (b *balancer) Start(target string, config grpc.BalancerConfig) error { b.mu.Lock() if b.done { b.mu.Unlock() - return grpc.ErrClientConnClosing + return ErrClientConnClosing } - b.addrCh = make(chan []grpc.Address) + b.addrCh = make(chan []Address) w, err := b.r.Resolve(target) if err != nil { b.mu.Unlock() @@ -340,7 +377,7 @@ func (b *balancer) Start(target string, config grpc.BalancerConfig) error { // Spawn a goroutine to talk to the remote load balancer. go func() { var ( - cc *grpc.ClientConn + cc *ClientConn // ccError is closed when there is an error in the current cc. // A new rb should be picked from rbs and connected. ccError chan struct{} @@ -419,7 +456,7 @@ func (b *balancer) Start(target string, config grpc.BalancerConfig) error { creds := config.DialCreds ccError = make(chan struct{}) if creds == nil { - cc, err = grpc.Dial(rb.addr, grpc.WithInsecure()) + cc, err = Dial(rb.addr, WithInsecure()) } else { if rb.name != "" { if err := creds.OverrideServerName(rb.name); err != nil { @@ -427,7 +464,7 @@ func (b *balancer) Start(target string, config grpc.BalancerConfig) error { continue } } - cc, err = grpc.Dial(rb.addr, grpc.WithTransportCredentials(creds)) + cc, err = Dial(rb.addr, WithTransportCredentials(creds)) } if err != nil { grpclog.Printf("Failed to setup a connection to the remote balancer %v: %v", rb.addr, err) @@ -439,8 +476,8 @@ func (b *balancer) Start(target string, config grpc.BalancerConfig) error { seq := b.seq b.next = 0 b.mu.Unlock() - go func(cc *grpc.ClientConn, ccError chan struct{}) { - lbc := lbpb.NewLoadBalancerClient(cc) + go func(cc *ClientConn, ccError chan struct{}) { + lbc := &loadBalancerClient{cc} b.callRemoteBalancer(lbc, seq) cc.Close() select { @@ -454,7 +491,7 @@ func (b *balancer) Start(target string, config grpc.BalancerConfig) error { return nil } -func (b *balancer) down(addr grpc.Address, err error) { +func (b *balancer) down(addr Address, err error) { b.mu.Lock() defer b.mu.Unlock() for _, a := range b.addrs { @@ -465,7 +502,7 @@ func (b *balancer) down(addr grpc.Address, err error) { } } -func (b *balancer) Up(addr grpc.Address) func(error) { +func (b *balancer) Up(addr Address) func(error) { b.mu.Lock() defer b.mu.Unlock() if b.done { @@ -479,7 +516,7 @@ func (b *balancer) Up(addr grpc.Address) func(error) { } a.connected = true } - if a.connected && !a.dropRequest { + if a.connected && !a.dropForRateLimiting && !a.dropForLoadBalancing { cnt++ } } @@ -493,12 +530,12 @@ func (b *balancer) Up(addr grpc.Address) func(error) { } } -func (b *balancer) Get(ctx context.Context, opts grpc.BalancerGetOptions) (addr grpc.Address, put func(), err error) { +func (b *balancer) Get(ctx context.Context, opts BalancerGetOptions) (addr Address, put func(), err error) { var ch chan struct{} b.mu.Lock() if b.done { b.mu.Unlock() - err = grpc.ErrClientConnClosing + err = ErrClientConnClosing return } @@ -511,7 +548,7 @@ func (b *balancer) Get(ctx context.Context, opts grpc.BalancerGetOptions) (addr a := b.addrs[next] next = (next + 1) % len(b.addrs) if a.connected { - if !a.dropRequest { + if !a.dropForRateLimiting && !a.dropForLoadBalancing { addr = a.addr b.next = next b.mu.Unlock() @@ -520,7 +557,7 @@ func (b *balancer) Get(ctx context.Context, opts grpc.BalancerGetOptions) (addr if !opts.BlockingWait { b.next = next b.mu.Unlock() - err = grpc.Errorf(codes.Unavailable, "%s drops requests", a.addr.Addr) + err = Errorf(codes.Unavailable, "%s drops requests", a.addr.Addr) return } } @@ -533,7 +570,7 @@ func (b *balancer) Get(ctx context.Context, opts grpc.BalancerGetOptions) (addr if !opts.BlockingWait { if len(b.addrs) == 0 { b.mu.Unlock() - err = grpc.Errorf(codes.Unavailable, "there is no address available") + err = Errorf(codes.Unavailable, "there is no address available") return } // Returns the next addr on b.addrs for a failfast RPC. @@ -559,7 +596,7 @@ func (b *balancer) Get(ctx context.Context, opts grpc.BalancerGetOptions) (addr b.mu.Lock() if b.done { b.mu.Unlock() - err = grpc.ErrClientConnClosing + err = ErrClientConnClosing return } @@ -572,7 +609,7 @@ func (b *balancer) Get(ctx context.Context, opts grpc.BalancerGetOptions) (addr a := b.addrs[next] next = (next + 1) % len(b.addrs) if a.connected { - if !a.dropRequest { + if !a.dropForRateLimiting && !a.dropForLoadBalancing { addr = a.addr b.next = next b.mu.Unlock() @@ -581,7 +618,7 @@ func (b *balancer) Get(ctx context.Context, opts grpc.BalancerGetOptions) (addr if !opts.BlockingWait { b.next = next b.mu.Unlock() - err = grpc.Errorf(codes.Unavailable, "drop requests for the addreess %s", a.addr.Addr) + err = Errorf(codes.Unavailable, "drop requests for the addreess %s", a.addr.Addr) return } } @@ -603,7 +640,7 @@ func (b *balancer) Get(ctx context.Context, opts grpc.BalancerGetOptions) (addr } } -func (b *balancer) Notify() <-chan []grpc.Address { +func (b *balancer) Notify() <-chan []Address { return b.addrCh } diff --git a/grpclb/grpc_lb_v1/grpclb.pb.go b/grpclb/grpc_lb_v1/grpclb.pb.go index 7be89477..f63941bd 100644 --- a/grpclb/grpc_lb_v1/grpclb.pb.go +++ b/grpclb/grpc_lb_v1/grpclb.pb.go @@ -10,6 +10,7 @@ It is generated from these files: It has these top-level messages: Duration + Timestamp LoadBalanceRequest InitialLoadBalanceRequest ClientStats @@ -24,11 +25,6 @@ import proto "github.com/golang/protobuf/proto" import fmt "fmt" import math "math" -import ( - context "golang.org/x/net/context" - grpc "google.golang.org/grpc" -) - // Reference imports to suppress errors if they are not otherwise used. var _ = proto.Marshal var _ = fmt.Errorf @@ -58,6 +54,51 @@ func (m *Duration) String() string { return proto.CompactTextString(m func (*Duration) ProtoMessage() {} func (*Duration) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } +func (m *Duration) GetSeconds() int64 { + if m != nil { + return m.Seconds + } + return 0 +} + +func (m *Duration) GetNanos() int32 { + if m != nil { + return m.Nanos + } + return 0 +} + +type Timestamp struct { + // Represents seconds of UTC time since Unix epoch + // 1970-01-01T00:00:00Z. Must be from 0001-01-01T00:00:00Z to + // 9999-12-31T23:59:59Z inclusive. + Seconds int64 `protobuf:"varint,1,opt,name=seconds" json:"seconds,omitempty"` + // Non-negative fractions of a second at nanosecond resolution. Negative + // second values with fractions must still have non-negative nanos values + // that count forward in time. Must be from 0 to 999,999,999 + // inclusive. + Nanos int32 `protobuf:"varint,2,opt,name=nanos" json:"nanos,omitempty"` +} + +func (m *Timestamp) Reset() { *m = Timestamp{} } +func (m *Timestamp) String() string { return proto.CompactTextString(m) } +func (*Timestamp) ProtoMessage() {} +func (*Timestamp) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } + +func (m *Timestamp) GetSeconds() int64 { + if m != nil { + return m.Seconds + } + return 0 +} + +func (m *Timestamp) GetNanos() int32 { + if m != nil { + return m.Nanos + } + return 0 +} + type LoadBalanceRequest struct { // Types that are valid to be assigned to LoadBalanceRequestType: // *LoadBalanceRequest_InitialRequest @@ -68,17 +109,17 @@ type LoadBalanceRequest struct { func (m *LoadBalanceRequest) Reset() { *m = LoadBalanceRequest{} } func (m *LoadBalanceRequest) String() string { return proto.CompactTextString(m) } func (*LoadBalanceRequest) ProtoMessage() {} -func (*LoadBalanceRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } +func (*LoadBalanceRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} } type isLoadBalanceRequest_LoadBalanceRequestType interface { isLoadBalanceRequest_LoadBalanceRequestType() } type LoadBalanceRequest_InitialRequest struct { - InitialRequest *InitialLoadBalanceRequest `protobuf:"bytes,1,opt,name=initial_request,oneof"` + InitialRequest *InitialLoadBalanceRequest `protobuf:"bytes,1,opt,name=initial_request,json=initialRequest,oneof"` } type LoadBalanceRequest_ClientStats struct { - ClientStats *ClientStats `protobuf:"bytes,2,opt,name=client_stats,oneof"` + ClientStats *ClientStats `protobuf:"bytes,2,opt,name=client_stats,json=clientStats,oneof"` } func (*LoadBalanceRequest_InitialRequest) isLoadBalanceRequest_LoadBalanceRequestType() {} @@ -180,30 +221,98 @@ func _LoadBalanceRequest_OneofSizer(msg proto.Message) (n int) { } type InitialLoadBalanceRequest struct { - // Name of load balanced service (IE, service.grpc.gslb.google.com) + // Name of load balanced service (IE, balancer.service.com) + // length should be less than 256 bytes. Name string `protobuf:"bytes,1,opt,name=name" json:"name,omitempty"` } func (m *InitialLoadBalanceRequest) Reset() { *m = InitialLoadBalanceRequest{} } func (m *InitialLoadBalanceRequest) String() string { return proto.CompactTextString(m) } func (*InitialLoadBalanceRequest) ProtoMessage() {} -func (*InitialLoadBalanceRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} } +func (*InitialLoadBalanceRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} } + +func (m *InitialLoadBalanceRequest) GetName() string { + if m != nil { + return m.Name + } + return "" +} // Contains client level statistics that are useful to load balancing. Each -// count should be reset to zero after reporting the stats. +// count except the timestamp should be reset to zero after reporting the stats. type ClientStats struct { - // The total number of requests sent by the client since the last report. - TotalRequests int64 `protobuf:"varint,1,opt,name=total_requests" json:"total_requests,omitempty"` - // The number of client rpc errors since the last report. - ClientRpcErrors int64 `protobuf:"varint,2,opt,name=client_rpc_errors" json:"client_rpc_errors,omitempty"` - // The number of dropped requests since the last report. - DroppedRequests int64 `protobuf:"varint,3,opt,name=dropped_requests" json:"dropped_requests,omitempty"` + // The timestamp of generating the report. + Timestamp *Timestamp `protobuf:"bytes,1,opt,name=timestamp" json:"timestamp,omitempty"` + // The total number of RPCs that started. + NumCallsStarted int64 `protobuf:"varint,2,opt,name=num_calls_started,json=numCallsStarted" json:"num_calls_started,omitempty"` + // The total number of RPCs that finished. + NumCallsFinished int64 `protobuf:"varint,3,opt,name=num_calls_finished,json=numCallsFinished" json:"num_calls_finished,omitempty"` + // The total number of RPCs that were dropped by the client because of rate + // limiting. + NumCallsFinishedWithDropForRateLimiting int64 `protobuf:"varint,4,opt,name=num_calls_finished_with_drop_for_rate_limiting,json=numCallsFinishedWithDropForRateLimiting" json:"num_calls_finished_with_drop_for_rate_limiting,omitempty"` + // The total number of RPCs that were dropped by the client because of load + // balancing. + NumCallsFinishedWithDropForLoadBalancing int64 `protobuf:"varint,5,opt,name=num_calls_finished_with_drop_for_load_balancing,json=numCallsFinishedWithDropForLoadBalancing" json:"num_calls_finished_with_drop_for_load_balancing,omitempty"` + // The total number of RPCs that failed to reach a server except dropped RPCs. + NumCallsFinishedWithClientFailedToSend int64 `protobuf:"varint,6,opt,name=num_calls_finished_with_client_failed_to_send,json=numCallsFinishedWithClientFailedToSend" json:"num_calls_finished_with_client_failed_to_send,omitempty"` + // The total number of RPCs that finished and are known to have been received + // by a server. + NumCallsFinishedKnownReceived int64 `protobuf:"varint,7,opt,name=num_calls_finished_known_received,json=numCallsFinishedKnownReceived" json:"num_calls_finished_known_received,omitempty"` } func (m *ClientStats) Reset() { *m = ClientStats{} } func (m *ClientStats) String() string { return proto.CompactTextString(m) } func (*ClientStats) ProtoMessage() {} -func (*ClientStats) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} } +func (*ClientStats) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{4} } + +func (m *ClientStats) GetTimestamp() *Timestamp { + if m != nil { + return m.Timestamp + } + return nil +} + +func (m *ClientStats) GetNumCallsStarted() int64 { + if m != nil { + return m.NumCallsStarted + } + return 0 +} + +func (m *ClientStats) GetNumCallsFinished() int64 { + if m != nil { + return m.NumCallsFinished + } + return 0 +} + +func (m *ClientStats) GetNumCallsFinishedWithDropForRateLimiting() int64 { + if m != nil { + return m.NumCallsFinishedWithDropForRateLimiting + } + return 0 +} + +func (m *ClientStats) GetNumCallsFinishedWithDropForLoadBalancing() int64 { + if m != nil { + return m.NumCallsFinishedWithDropForLoadBalancing + } + return 0 +} + +func (m *ClientStats) GetNumCallsFinishedWithClientFailedToSend() int64 { + if m != nil { + return m.NumCallsFinishedWithClientFailedToSend + } + return 0 +} + +func (m *ClientStats) GetNumCallsFinishedKnownReceived() int64 { + if m != nil { + return m.NumCallsFinishedKnownReceived + } + return 0 +} type LoadBalanceResponse struct { // Types that are valid to be assigned to LoadBalanceResponseType: @@ -215,17 +324,17 @@ type LoadBalanceResponse struct { func (m *LoadBalanceResponse) Reset() { *m = LoadBalanceResponse{} } func (m *LoadBalanceResponse) String() string { return proto.CompactTextString(m) } func (*LoadBalanceResponse) ProtoMessage() {} -func (*LoadBalanceResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{4} } +func (*LoadBalanceResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{5} } type isLoadBalanceResponse_LoadBalanceResponseType interface { isLoadBalanceResponse_LoadBalanceResponseType() } type LoadBalanceResponse_InitialResponse struct { - InitialResponse *InitialLoadBalanceResponse `protobuf:"bytes,1,opt,name=initial_response,oneof"` + InitialResponse *InitialLoadBalanceResponse `protobuf:"bytes,1,opt,name=initial_response,json=initialResponse,oneof"` } type LoadBalanceResponse_ServerList struct { - ServerList *ServerList `protobuf:"bytes,2,opt,name=server_list,oneof"` + ServerList *ServerList `protobuf:"bytes,2,opt,name=server_list,json=serverList,oneof"` } func (*LoadBalanceResponse_InitialResponse) isLoadBalanceResponse_LoadBalanceResponseType() {} @@ -330,18 +439,26 @@ type InitialLoadBalanceResponse struct { // This is an application layer redirect that indicates the client should use // the specified server for load balancing. When this field is non-empty in // the response, the client should open a separate connection to the - // load_balancer_delegate and call the BalanceLoad method. - LoadBalancerDelegate string `protobuf:"bytes,1,opt,name=load_balancer_delegate" json:"load_balancer_delegate,omitempty"` + // load_balancer_delegate and call the BalanceLoad method. Its length should + // be less than 64 bytes. + LoadBalancerDelegate string `protobuf:"bytes,1,opt,name=load_balancer_delegate,json=loadBalancerDelegate" json:"load_balancer_delegate,omitempty"` // This interval defines how often the client should send the client stats // to the load balancer. Stats should only be reported when the duration is // positive. - ClientStatsReportInterval *Duration `protobuf:"bytes,3,opt,name=client_stats_report_interval" json:"client_stats_report_interval,omitempty"` + ClientStatsReportInterval *Duration `protobuf:"bytes,2,opt,name=client_stats_report_interval,json=clientStatsReportInterval" json:"client_stats_report_interval,omitempty"` } func (m *InitialLoadBalanceResponse) Reset() { *m = InitialLoadBalanceResponse{} } func (m *InitialLoadBalanceResponse) String() string { return proto.CompactTextString(m) } func (*InitialLoadBalanceResponse) ProtoMessage() {} -func (*InitialLoadBalanceResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{5} } +func (*InitialLoadBalanceResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{6} } + +func (m *InitialLoadBalanceResponse) GetLoadBalancerDelegate() string { + if m != nil { + return m.LoadBalancerDelegate + } + return "" +} func (m *InitialLoadBalanceResponse) GetClientStatsReportInterval() *Duration { if m != nil { @@ -360,13 +477,13 @@ type ServerList struct { // list as valid. It may be considered stale after waiting this interval of // time after receiving the list. If the interval is not positive, the // client can assume the list is valid until the next list is received. - ExpirationInterval *Duration `protobuf:"bytes,3,opt,name=expiration_interval" json:"expiration_interval,omitempty"` + ExpirationInterval *Duration `protobuf:"bytes,3,opt,name=expiration_interval,json=expirationInterval" json:"expiration_interval,omitempty"` } func (m *ServerList) Reset() { *m = ServerList{} } func (m *ServerList) String() string { return proto.CompactTextString(m) } func (*ServerList) ProtoMessage() {} -func (*ServerList) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{6} } +func (*ServerList) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{7} } func (m *ServerList) GetServers() []*Server { if m != nil { @@ -382,29 +499,74 @@ func (m *ServerList) GetExpirationInterval() *Duration { return nil } +// Contains server information. When none of the [drop_for_*] fields are true, +// use the other fields. When drop_for_rate_limiting is true, ignore all other +// fields. Use drop_for_load_balancing only when it is true and +// drop_for_rate_limiting is false. type Server struct { // A resolved address for the server, serialized in network-byte-order. It may // either be an IPv4 or IPv6 address. - IpAddress []byte `protobuf:"bytes,1,opt,name=ip_address,proto3" json:"ip_address,omitempty"` + IpAddress []byte `protobuf:"bytes,1,opt,name=ip_address,json=ipAddress,proto3" json:"ip_address,omitempty"` // A resolved port number for the server. Port int32 `protobuf:"varint,2,opt,name=port" json:"port,omitempty"` // An opaque but printable token given to the frontend for each pick. All // frontend requests for that pick must include the token in its initial // metadata. The token is used by the backend to verify the request and to // allow the backend to report load to the gRPC LB system. - LoadBalanceToken string `protobuf:"bytes,3,opt,name=load_balance_token" json:"load_balance_token,omitempty"` + // + // Its length is variable but less than 50 bytes. + LoadBalanceToken string `protobuf:"bytes,3,opt,name=load_balance_token,json=loadBalanceToken" json:"load_balance_token,omitempty"` // Indicates whether this particular request should be dropped by the client - // when this server is chosen from the list. - DropRequest bool `protobuf:"varint,4,opt,name=drop_request" json:"drop_request,omitempty"` + // for rate limiting. + DropForRateLimiting bool `protobuf:"varint,4,opt,name=drop_for_rate_limiting,json=dropForRateLimiting" json:"drop_for_rate_limiting,omitempty"` + // Indicates whether this particular request should be dropped by the client + // for load balancing. + DropForLoadBalancing bool `protobuf:"varint,5,opt,name=drop_for_load_balancing,json=dropForLoadBalancing" json:"drop_for_load_balancing,omitempty"` } func (m *Server) Reset() { *m = Server{} } func (m *Server) String() string { return proto.CompactTextString(m) } func (*Server) ProtoMessage() {} -func (*Server) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{7} } +func (*Server) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{8} } + +func (m *Server) GetIpAddress() []byte { + if m != nil { + return m.IpAddress + } + return nil +} + +func (m *Server) GetPort() int32 { + if m != nil { + return m.Port + } + return 0 +} + +func (m *Server) GetLoadBalanceToken() string { + if m != nil { + return m.LoadBalanceToken + } + return "" +} + +func (m *Server) GetDropForRateLimiting() bool { + if m != nil { + return m.DropForRateLimiting + } + return false +} + +func (m *Server) GetDropForLoadBalancing() bool { + if m != nil { + return m.DropForLoadBalancing + } + return false +} func init() { proto.RegisterType((*Duration)(nil), "grpc.lb.v1.Duration") + proto.RegisterType((*Timestamp)(nil), "grpc.lb.v1.Timestamp") proto.RegisterType((*LoadBalanceRequest)(nil), "grpc.lb.v1.LoadBalanceRequest") proto.RegisterType((*InitialLoadBalanceRequest)(nil), "grpc.lb.v1.InitialLoadBalanceRequest") proto.RegisterType((*ClientStats)(nil), "grpc.lb.v1.ClientStats") @@ -414,144 +576,54 @@ func init() { proto.RegisterType((*Server)(nil), "grpc.lb.v1.Server") } -// Reference imports to suppress errors if they are not otherwise used. -var _ context.Context -var _ grpc.ClientConn - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the grpc package it is being compiled against. -const _ = grpc.SupportPackageIsVersion4 - -// Client API for LoadBalancer service - -type LoadBalancerClient interface { - // Bidirectional rpc to get a list of servers. - BalanceLoad(ctx context.Context, opts ...grpc.CallOption) (LoadBalancer_BalanceLoadClient, error) -} - -type loadBalancerClient struct { - cc *grpc.ClientConn -} - -func NewLoadBalancerClient(cc *grpc.ClientConn) LoadBalancerClient { - return &loadBalancerClient{cc} -} - -func (c *loadBalancerClient) BalanceLoad(ctx context.Context, opts ...grpc.CallOption) (LoadBalancer_BalanceLoadClient, error) { - stream, err := grpc.NewClientStream(ctx, &_LoadBalancer_serviceDesc.Streams[0], c.cc, "/grpc.lb.v1.LoadBalancer/BalanceLoad", opts...) - if err != nil { - return nil, err - } - x := &loadBalancerBalanceLoadClient{stream} - return x, nil -} - -type LoadBalancer_BalanceLoadClient interface { - Send(*LoadBalanceRequest) error - Recv() (*LoadBalanceResponse, error) - grpc.ClientStream -} - -type loadBalancerBalanceLoadClient struct { - grpc.ClientStream -} - -func (x *loadBalancerBalanceLoadClient) Send(m *LoadBalanceRequest) error { - return x.ClientStream.SendMsg(m) -} - -func (x *loadBalancerBalanceLoadClient) Recv() (*LoadBalanceResponse, error) { - m := new(LoadBalanceResponse) - if err := x.ClientStream.RecvMsg(m); err != nil { - return nil, err - } - return m, nil -} - -// Server API for LoadBalancer service - -type LoadBalancerServer interface { - // Bidirectional rpc to get a list of servers. - BalanceLoad(LoadBalancer_BalanceLoadServer) error -} - -func RegisterLoadBalancerServer(s *grpc.Server, srv LoadBalancerServer) { - s.RegisterService(&_LoadBalancer_serviceDesc, srv) -} - -func _LoadBalancer_BalanceLoad_Handler(srv interface{}, stream grpc.ServerStream) error { - return srv.(LoadBalancerServer).BalanceLoad(&loadBalancerBalanceLoadServer{stream}) -} - -type LoadBalancer_BalanceLoadServer interface { - Send(*LoadBalanceResponse) error - Recv() (*LoadBalanceRequest, error) - grpc.ServerStream -} - -type loadBalancerBalanceLoadServer struct { - grpc.ServerStream -} - -func (x *loadBalancerBalanceLoadServer) Send(m *LoadBalanceResponse) error { - return x.ServerStream.SendMsg(m) -} - -func (x *loadBalancerBalanceLoadServer) Recv() (*LoadBalanceRequest, error) { - m := new(LoadBalanceRequest) - if err := x.ServerStream.RecvMsg(m); err != nil { - return nil, err - } - return m, nil -} - -var _LoadBalancer_serviceDesc = grpc.ServiceDesc{ - ServiceName: "grpc.lb.v1.LoadBalancer", - HandlerType: (*LoadBalancerServer)(nil), - Methods: []grpc.MethodDesc{}, - Streams: []grpc.StreamDesc{ - { - StreamName: "BalanceLoad", - Handler: _LoadBalancer_BalanceLoad_Handler, - ServerStreams: true, - ClientStreams: true, - }, - }, - Metadata: "grpclb.proto", -} - func init() { proto.RegisterFile("grpclb.proto", fileDescriptor0) } var fileDescriptor0 = []byte{ - // 471 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x8c, 0x93, 0x51, 0x6f, 0xd3, 0x3e, - 0x14, 0xc5, 0x9b, 0x7f, 0xb7, 0xfd, 0xb7, 0x9b, 0xc0, 0xc6, 0xdd, 0x54, 0xda, 0x32, 0x8d, 0x2a, - 0x08, 0x54, 0x90, 0x08, 0x2c, 0xbc, 0xf1, 0x84, 0x0a, 0x0f, 0x45, 0xda, 0xd3, 0xf6, 0x86, 0x90, - 0x2c, 0x27, 0xb9, 0x9a, 0x2c, 0x82, 0x6d, 0x6c, 0xaf, 0x1a, 0xdf, 0x07, 0xf1, 0x39, 0x91, 0xe3, - 0x94, 0x64, 0x54, 0x15, 0xbc, 0xc5, 0xbe, 0x3e, 0xf7, 0x1e, 0xff, 0x7c, 0x02, 0xc9, 0xb5, 0xd1, - 0x65, 0x5d, 0x64, 0xda, 0x28, 0xa7, 0x10, 0xfc, 0x2a, 0xab, 0x8b, 0x6c, 0x75, 0x9e, 0xbe, 0x80, - 0xfd, 0x0f, 0x37, 0x86, 0x3b, 0xa1, 0x24, 0x1e, 0xc2, 0xff, 0x96, 0x4a, 0x25, 0x2b, 0x3b, 0x8e, - 0x66, 0xd1, 0x7c, 0x88, 0xf7, 0x60, 0x57, 0x72, 0xa9, 0xec, 0xf8, 0xbf, 0x59, 0x34, 0xdf, 0x4d, - 0x7f, 0x44, 0x80, 0x17, 0x8a, 0x57, 0x0b, 0x5e, 0x73, 0x59, 0xd2, 0x25, 0x7d, 0xbb, 0x21, 0xeb, - 0xf0, 0x1d, 0x1c, 0x0a, 0x29, 0x9c, 0xe0, 0x35, 0x33, 0x61, 0xab, 0x91, 0xc7, 0xf9, 0xd3, 0xac, - 0x1b, 0x94, 0x7d, 0x0c, 0x47, 0x36, 0xf5, 0xcb, 0x01, 0xbe, 0x82, 0xa4, 0xac, 0x05, 0x49, 0xc7, - 0xac, 0xe3, 0x2e, 0x8c, 0x8b, 0xf3, 0x87, 0x7d, 0xf9, 0xfb, 0xa6, 0x7e, 0xe5, 0xcb, 0xcb, 0xc1, - 0xe2, 0x11, 0x4c, 0x6a, 0xc5, 0x2b, 0x56, 0x84, 0x4e, 0xeb, 0xb9, 0xcc, 0x7d, 0xd7, 0x94, 0x3e, - 0x87, 0xc9, 0xd6, 0x61, 0x98, 0xc0, 0x8e, 0xe4, 0x5f, 0xa9, 0x71, 0x78, 0x90, 0x7e, 0x82, 0xb8, - 0xd7, 0x18, 0x47, 0x70, 0xdf, 0x29, 0xd7, 0xdd, 0x63, 0xcd, 0x61, 0x02, 0x0f, 0x5a, 0x7f, 0x46, - 0x97, 0x8c, 0x8c, 0x51, 0x26, 0x98, 0x1c, 0xe2, 0x18, 0x8e, 0x2a, 0xa3, 0xb4, 0xa6, 0xaa, 0x13, - 0x0d, 0x7d, 0x25, 0xfd, 0x19, 0xc1, 0xf1, 0x1d, 0x03, 0x56, 0x2b, 0x69, 0x09, 0x17, 0x70, 0xd4, - 0xe1, 0x0a, 0x7b, 0x2d, 0xaf, 0x67, 0x7f, 0xe3, 0x15, 0x4e, 0x2f, 0x07, 0xf8, 0x12, 0x62, 0x4b, - 0x66, 0x45, 0x86, 0xd5, 0xc2, 0xba, 0x96, 0xd7, 0xa8, 0x2f, 0xbf, 0x6a, 0xca, 0x17, 0xc2, 0xf3, - 0x5d, 0x9c, 0xc2, 0xf4, 0x0f, 0x5c, 0xa1, 0x53, 0xe0, 0x75, 0x0b, 0xd3, 0xed, 0xc3, 0xf0, 0x0c, - 0x46, 0x7d, 0xad, 0x61, 0x15, 0xd5, 0x74, 0xcd, 0x5d, 0x8b, 0x10, 0xdf, 0xc2, 0x69, 0xff, 0xed, - 0x98, 0x21, 0xad, 0x8c, 0x63, 0x42, 0x3a, 0x32, 0x2b, 0x5e, 0x37, 0x30, 0xe2, 0xfc, 0xa4, 0xef, - 0x6d, 0x1d, 0xb8, 0xb4, 0x02, 0xe8, 0x7c, 0xe2, 0x13, 0x1f, 0x3f, 0xbf, 0xf2, 0xd8, 0x87, 0xf3, - 0x38, 0xc7, 0xcd, 0x0b, 0xe1, 0x39, 0x1c, 0xd3, 0xad, 0x16, 0xa1, 0xc1, 0xbf, 0x4d, 0xf9, 0x0c, - 0x7b, 0xad, 0x18, 0x01, 0x84, 0x66, 0xbc, 0xaa, 0x0c, 0xd9, 0xf0, 0xb6, 0x89, 0x0f, 0x84, 0x37, - 0x1c, 0x22, 0x8e, 0x53, 0xc0, 0x3b, 0xa4, 0x9c, 0xfa, 0x42, 0xb2, 0xe9, 0x7e, 0x80, 0x27, 0x90, - 0xf8, 0xa7, 0xfe, 0x1d, 0xf2, 0x9d, 0x59, 0x34, 0xdf, 0xcf, 0x0b, 0x48, 0x7a, 0xd8, 0x0c, 0x5e, - 0x42, 0xdc, 0x7e, 0xfb, 0x6d, 0x3c, 0xeb, 0x5b, 0xda, 0xcc, 0xe3, 0xf4, 0xf1, 0xd6, 0x7a, 0xe0, - 0x3f, 0x8f, 0x5e, 0x47, 0xc5, 0x5e, 0xf3, 0xdf, 0xbe, 0xf9, 0x15, 0x00, 0x00, 0xff, 0xff, 0x01, - 0x8b, 0xc9, 0x26, 0xc7, 0x03, 0x00, 0x00, + // 733 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x55, 0xdd, 0x4e, 0x1b, 0x39, + 0x14, 0x66, 0x36, 0xfc, 0xe5, 0x24, 0x5a, 0x58, 0x93, 0x85, 0xc0, 0xc2, 0x2e, 0x1b, 0xa9, 0x34, + 0xaa, 0x68, 0x68, 0x43, 0x7b, 0xd1, 0x9f, 0x9b, 0x02, 0x45, 0x41, 0xe5, 0xa2, 0x72, 0xa8, 0x7a, + 0x55, 0x59, 0x4e, 0xc6, 0x80, 0xc5, 0xc4, 0x9e, 0xda, 0x4e, 0x68, 0x2f, 0x7b, 0xd9, 0x47, 0xe9, + 0x63, 0x54, 0x7d, 0x86, 0xbe, 0x4f, 0x65, 0x7b, 0x26, 0x33, 0x90, 0x1f, 0xd4, 0xbb, 0xf1, 0xf1, + 0x77, 0xbe, 0xf3, 0xf9, 0xd8, 0xdf, 0x19, 0x28, 0x5f, 0xa8, 0xb8, 0x1b, 0x75, 0x1a, 0xb1, 0x92, + 0x46, 0x22, 0xb0, 0xab, 0x46, 0xd4, 0x69, 0x0c, 0x1e, 0xd7, 0x9e, 0xc3, 0xe2, 0x51, 0x5f, 0x51, + 0xc3, 0xa5, 0x40, 0x55, 0x58, 0xd0, 0xac, 0x2b, 0x45, 0xa8, 0xab, 0xc1, 0x76, 0x50, 0x2f, 0xe0, + 0x74, 0x89, 0x2a, 0x30, 0x27, 0xa8, 0x90, 0xba, 0xfa, 0xc7, 0x76, 0x50, 0x9f, 0xc3, 0x7e, 0x51, + 0x7b, 0x01, 0xc5, 0x33, 0xde, 0x63, 0xda, 0xd0, 0x5e, 0xfc, 0xdb, 0xc9, 0xdf, 0x03, 0x40, 0xa7, + 0x92, 0x86, 0x07, 0x34, 0xa2, 0xa2, 0xcb, 0x30, 0xfb, 0xd8, 0x67, 0xda, 0xa0, 0xb7, 0xb0, 0xc4, + 0x05, 0x37, 0x9c, 0x46, 0x44, 0xf9, 0x90, 0xa3, 0x2b, 0x35, 0xef, 0x35, 0x32, 0xd5, 0x8d, 0x13, + 0x0f, 0x19, 0xcd, 0x6f, 0xcd, 0xe0, 0x3f, 0x93, 0xfc, 0x94, 0xf1, 0x25, 0x94, 0xbb, 0x11, 0x67, + 0xc2, 0x10, 0x6d, 0xa8, 0xf1, 0x2a, 0x4a, 0xcd, 0xb5, 0x3c, 0xdd, 0xa1, 0xdb, 0x6f, 0xdb, 0xed, + 0xd6, 0x0c, 0x2e, 0x75, 0xb3, 0xe5, 0xc1, 0x3f, 0xb0, 0x1e, 0x49, 0x1a, 0x92, 0x8e, 0x2f, 0x93, + 0x8a, 0x22, 0xe6, 0x73, 0xcc, 0x6a, 0x7b, 0xb0, 0x3e, 0x51, 0x09, 0x42, 0x30, 0x2b, 0x68, 0x8f, + 0x39, 0xf9, 0x45, 0xec, 0xbe, 0x6b, 0x5f, 0x67, 0xa1, 0x94, 0x2b, 0x86, 0xf6, 0xa1, 0x68, 0xd2, + 0x0e, 0x26, 0xe7, 0xfc, 0x3b, 0x2f, 0x6c, 0xd8, 0x5e, 0x9c, 0xe1, 0xd0, 0x03, 0xf8, 0x4b, 0xf4, + 0x7b, 0xa4, 0x4b, 0xa3, 0x48, 0xdb, 0x33, 0x29, 0xc3, 0x42, 0x77, 0xaa, 0x02, 0x5e, 0x12, 0xfd, + 0xde, 0xa1, 0x8d, 0xb7, 0x7d, 0x18, 0xed, 0x02, 0xca, 0xb0, 0xe7, 0x5c, 0x70, 0x7d, 0xc9, 0xc2, + 0x6a, 0xc1, 0x81, 0x97, 0x53, 0xf0, 0x71, 0x12, 0x47, 0x04, 0x1a, 0xa3, 0x68, 0x72, 0xcd, 0xcd, + 0x25, 0x09, 0x95, 0x8c, 0xc9, 0xb9, 0x54, 0x44, 0x51, 0xc3, 0x48, 0xc4, 0x7b, 0xdc, 0x70, 0x71, + 0x51, 0x9d, 0x75, 0x4c, 0xf7, 0x6f, 0x33, 0xbd, 0xe7, 0xe6, 0xf2, 0x48, 0xc9, 0xf8, 0x58, 0x2a, + 0x4c, 0x0d, 0x3b, 0x4d, 0xe0, 0x88, 0xc2, 0xde, 0x9d, 0x05, 0x72, 0xed, 0xb6, 0x15, 0xe6, 0x5c, + 0x85, 0xfa, 0x94, 0x0a, 0x59, 0xef, 0x6d, 0x89, 0x0f, 0xf0, 0x70, 0x52, 0x89, 0xe4, 0x19, 0x9c, + 0x53, 0x1e, 0xb1, 0x90, 0x18, 0x49, 0x34, 0x13, 0x61, 0x75, 0xde, 0x15, 0xd8, 0x19, 0x57, 0xc0, + 0x5f, 0xd5, 0xb1, 0xc3, 0x9f, 0xc9, 0x36, 0x13, 0x21, 0x6a, 0xc1, 0xff, 0x63, 0xe8, 0xaf, 0x84, + 0xbc, 0x16, 0x44, 0xb1, 0x2e, 0xe3, 0x03, 0x16, 0x56, 0x17, 0x1c, 0xe5, 0xd6, 0x6d, 0xca, 0x37, + 0x16, 0x85, 0x13, 0x50, 0xed, 0x47, 0x00, 0x2b, 0x37, 0x9e, 0x8d, 0x8e, 0xa5, 0xd0, 0x0c, 0xb5, + 0x61, 0x39, 0x73, 0x80, 0x8f, 0x25, 0x4f, 0x63, 0xe7, 0x2e, 0x0b, 0x78, 0x74, 0x6b, 0x06, 0x2f, + 0x0d, 0x3d, 0x90, 0x90, 0x3e, 0x83, 0x92, 0x66, 0x6a, 0xc0, 0x14, 0x89, 0xb8, 0x36, 0x89, 0x07, + 0x56, 0xf3, 0x7c, 0x6d, 0xb7, 0x7d, 0xca, 0x9d, 0x87, 0x40, 0x0f, 0x57, 0x07, 0x9b, 0xb0, 0x71, + 0xcb, 0x01, 0x9e, 0xd3, 0x5b, 0xe0, 0x5b, 0x00, 0x1b, 0x93, 0xa5, 0xa0, 0x27, 0xb0, 0x9a, 0x4f, + 0x56, 0x24, 0x64, 0x11, 0xbb, 0xa0, 0x26, 0xb5, 0x45, 0x25, 0xca, 0x92, 0xd4, 0x51, 0xb2, 0x87, + 0xde, 0xc1, 0x66, 0xde, 0xb2, 0x44, 0xb1, 0x58, 0x2a, 0x43, 0xb8, 0x30, 0x4c, 0x0d, 0x68, 0x94, + 0xc8, 0xaf, 0xe4, 0xe5, 0xa7, 0x43, 0x0c, 0xaf, 0xe7, 0xdc, 0x8b, 0x5d, 0xde, 0x49, 0x92, 0x56, + 0xfb, 0x12, 0x00, 0x64, 0xc7, 0x44, 0xbb, 0x76, 0x62, 0xd9, 0x95, 0x9d, 0x58, 0x85, 0x7a, 0xa9, + 0x89, 0x46, 0xfb, 0x81, 0x53, 0x08, 0x7a, 0x0d, 0x2b, 0xec, 0x53, 0xcc, 0x7d, 0x95, 0x4c, 0x4a, + 0x61, 0x8a, 0x14, 0x94, 0x25, 0x0c, 0x35, 0xfc, 0x0c, 0x60, 0xde, 0x53, 0xa3, 0x2d, 0x00, 0x1e, + 0x13, 0x1a, 0x86, 0x8a, 0x69, 0x3f, 0x34, 0xcb, 0xb8, 0xc8, 0xe3, 0x57, 0x3e, 0x60, 0xe7, 0x87, + 0x55, 0x9f, 0x4c, 0x4d, 0xf7, 0x6d, 0xed, 0x7c, 0xe3, 0x2e, 0x8c, 0xbc, 0x62, 0xc2, 0x69, 0x28, + 0xe2, 0xe5, 0x5c, 0x2b, 0xcf, 0x6c, 0x1c, 0xed, 0xc3, 0xea, 0x14, 0xdb, 0x2e, 0xe2, 0x95, 0x70, + 0x8c, 0x45, 0x9f, 0xc2, 0xda, 0x34, 0x2b, 0x2e, 0xe2, 0x4a, 0x38, 0xc6, 0x76, 0xcd, 0x0e, 0x94, + 0x73, 0xf7, 0xaf, 0x10, 0x86, 0x52, 0xf2, 0x6d, 0xc3, 0xe8, 0xdf, 0x7c, 0x83, 0x46, 0x87, 0xe5, + 0xc6, 0x7f, 0x13, 0xf7, 0xfd, 0x43, 0xaa, 0x07, 0x8f, 0x82, 0xce, 0xbc, 0xfb, 0x7d, 0xed, 0xff, + 0x0a, 0x00, 0x00, 0xff, 0xff, 0x64, 0xbf, 0xda, 0x5e, 0xce, 0x06, 0x00, 0x00, } diff --git a/grpclb/grpc_lb_v1/grpclb.proto b/grpclb/grpc_lb_v1/grpclb.proto index 13219c6f..a2502fb2 100644 --- a/grpclb/grpc_lb_v1/grpclb.proto +++ b/grpclb/grpc_lb_v1/grpclb.proto @@ -1,3 +1,32 @@ +// Copyright 2016, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + syntax = "proto3"; package grpc.lb.v1; @@ -16,6 +45,20 @@ message Duration { int32 nanos = 2; } +message Timestamp { + + // Represents seconds of UTC time since Unix epoch + // 1970-01-01T00:00:00Z. Must be from 0001-01-01T00:00:00Z to + // 9999-12-31T23:59:59Z inclusive. + int64 seconds = 1; + + // Non-negative fractions of a second at nanosecond resolution. Negative + // second values with fractions must still have non-negative nanos values + // that count forward in time. Must be from 0 to 999,999,999 + // inclusive. + int32 nanos = 2; +} + service LoadBalancer { // Bidirectional rpc to get a list of servers. rpc BalanceLoad(stream LoadBalanceRequest) @@ -34,21 +77,37 @@ message LoadBalanceRequest { } message InitialLoadBalanceRequest { - // Name of load balanced service (IE, service.grpc.gslb.google.com) + // Name of load balanced service (IE, balancer.service.com) + // length should be less than 256 bytes. string name = 1; } // Contains client level statistics that are useful to load balancing. Each -// count should be reset to zero after reporting the stats. +// count except the timestamp should be reset to zero after reporting the stats. message ClientStats { - // The total number of requests sent by the client since the last report. - int64 total_requests = 1; + // The timestamp of generating the report. + Timestamp timestamp = 1; - // The number of client rpc errors since the last report. - int64 client_rpc_errors = 2; + // The total number of RPCs that started. + int64 num_calls_started = 2; - // The number of dropped requests since the last report. - int64 dropped_requests = 3; + // The total number of RPCs that finished. + int64 num_calls_finished = 3; + + // The total number of RPCs that were dropped by the client because of rate + // limiting. + int64 num_calls_finished_with_drop_for_rate_limiting = 4; + + // The total number of RPCs that were dropped by the client because of load + // balancing. + int64 num_calls_finished_with_drop_for_load_balancing = 5; + + // The total number of RPCs that failed to reach a server except dropped RPCs. + int64 num_calls_finished_with_client_failed_to_send = 6; + + // The total number of RPCs that finished and are known to have been received + // by a server. + int64 num_calls_finished_known_received = 7; } message LoadBalanceResponse { @@ -66,13 +125,14 @@ message InitialLoadBalanceResponse { // This is an application layer redirect that indicates the client should use // the specified server for load balancing. When this field is non-empty in // the response, the client should open a separate connection to the - // load_balancer_delegate and call the BalanceLoad method. + // load_balancer_delegate and call the BalanceLoad method. Its length should + // be less than 64 bytes. string load_balancer_delegate = 1; // This interval defines how often the client should send the client stats // to the load balancer. Stats should only be reported when the duration is // positive. - Duration client_stats_report_interval = 3; + Duration client_stats_report_interval = 2; } message ServerList { @@ -89,6 +149,10 @@ message ServerList { Duration expiration_interval = 3; } +// Contains server information. When none of the [drop_for_*] fields are true, +// use the other fields. When drop_for_rate_limiting is true, ignore all other +// fields. Use drop_for_load_balancing only when it is true and +// drop_for_rate_limiting is false. message Server { // A resolved address for the server, serialized in network-byte-order. It may // either be an IPv4 or IPv6 address. @@ -101,9 +165,15 @@ message Server { // frontend requests for that pick must include the token in its initial // metadata. The token is used by the backend to verify the request and to // allow the backend to report load to the gRPC LB system. + // + // Its length is variable but less than 50 bytes. string load_balance_token = 3; // Indicates whether this particular request should be dropped by the client - // when this server is chosen from the list. - bool drop_request = 4; + // for rate limiting. + bool drop_for_rate_limiting = 4; + + // Indicates whether this particular request should be dropped by the client + // for load balancing. + bool drop_for_load_balancing = 5; } diff --git a/grpclb/grpclb_server_generated.go b/grpclb/grpclb_server_generated.go new file mode 100644 index 00000000..65ce6360 --- /dev/null +++ b/grpclb/grpclb_server_generated.go @@ -0,0 +1,54 @@ +// This file contains the generated server side code. +// It's only used for grpclb testing. + +package grpclb + +import ( + "google.golang.org/grpc" + lbpb "google.golang.org/grpc/grpclb/grpc_lb_v1" +) + +// Server API for LoadBalancer service + +type loadBalancerServer interface { + // Bidirectional rpc to get a list of servers. + BalanceLoad(*loadBalancerBalanceLoadServer) error +} + +func registerLoadBalancerServer(s *grpc.Server, srv loadBalancerServer) { + s.RegisterService( + &grpc.ServiceDesc{ + ServiceName: "grpc.lb.v1.LoadBalancer", + HandlerType: (*loadBalancerServer)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "BalanceLoad", + Handler: balanceLoadHandler, + ServerStreams: true, + ClientStreams: true, + }, + }, + Metadata: "grpclb.proto", + }, srv) +} + +func balanceLoadHandler(srv interface{}, stream grpc.ServerStream) error { + return srv.(loadBalancerServer).BalanceLoad(&loadBalancerBalanceLoadServer{stream}) +} + +type loadBalancerBalanceLoadServer struct { + grpc.ServerStream +} + +func (x *loadBalancerBalanceLoadServer) Send(m *lbpb.LoadBalanceResponse) error { + return x.ServerStream.SendMsg(m) +} + +func (x *loadBalancerBalanceLoadServer) Recv() (*lbpb.LoadBalanceRequest, error) { + m := new(lbpb.LoadBalanceRequest) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} diff --git a/grpclb/grpclb_test.go b/grpclb/grpclb_test.go index f6115b28..ee54ead6 100644 --- a/grpclb/grpclb_test.go +++ b/grpclb/grpclb_test.go @@ -114,8 +114,8 @@ func (r *testNameResolver) Resolve(target string) (naming.Watcher, error) { r.w.update <- &naming.Update{ Op: naming.Add, Addr: addr, - Metadata: &Metadata{ - AddrType: GRPCLB, + Metadata: &grpc.AddrMetadataGRPCLB{ + AddrType: grpc.GRPCLB, ServerName: lbsn, }, } @@ -187,7 +187,7 @@ func (b *remoteBalancer) stop() { close(b.done) } -func (b *remoteBalancer) BalanceLoad(stream lbpb.LoadBalancer_BalanceLoadServer) error { +func (b *remoteBalancer) BalanceLoad(stream *loadBalancerBalanceLoadServer) error { req, err := stream.Recv() if err != nil { return err @@ -305,7 +305,7 @@ func newLoadBalancer(numberOfBackends int) (tss *testServers, cleanup func(), er return } ls = newRemoteBalancer(nil, nil) - lbpb.RegisterLoadBalancerServer(lb, ls) + registerLoadBalancerServer(lb, ls) go func() { lb.Serve(lbLis) }() @@ -350,7 +350,7 @@ func TestGRPCLB(t *testing.T) { expected: besn, } ctx, _ := context.WithTimeout(context.Background(), 10*time.Second) - cc, err := grpc.DialContext(ctx, besn, grpc.WithBalancer(Balancer(&testNameResolver{ + cc, err := grpc.DialContext(ctx, besn, grpc.WithBalancer(grpc.NewGRPCLBBalancer(&testNameResolver{ addrs: []string{tss.lbAddr}, })), grpc.WithBlock(), grpc.WithTransportCredentials(&creds)) if err != nil { @@ -371,15 +371,15 @@ func TestDropRequest(t *testing.T) { defer cleanup() tss.ls.sls = []*lbpb.ServerList{{ Servers: []*lbpb.Server{{ - IpAddress: tss.beIPs[0], - Port: int32(tss.bePorts[0]), - LoadBalanceToken: lbToken, - DropRequest: true, + IpAddress: tss.beIPs[0], + Port: int32(tss.bePorts[0]), + LoadBalanceToken: lbToken, + DropForLoadBalancing: true, }, { - IpAddress: tss.beIPs[1], - Port: int32(tss.bePorts[1]), - LoadBalanceToken: lbToken, - DropRequest: false, + IpAddress: tss.beIPs[1], + Port: int32(tss.bePorts[1]), + LoadBalanceToken: lbToken, + DropForLoadBalancing: false, }}, }} tss.ls.intervals = []time.Duration{0} @@ -387,7 +387,7 @@ func TestDropRequest(t *testing.T) { expected: besn, } ctx, _ := context.WithTimeout(context.Background(), 10*time.Second) - cc, err := grpc.DialContext(ctx, besn, grpc.WithBalancer(Balancer(&testNameResolver{ + cc, err := grpc.DialContext(ctx, besn, grpc.WithBalancer(grpc.NewGRPCLBBalancer(&testNameResolver{ addrs: []string{tss.lbAddr}, })), grpc.WithBlock(), grpc.WithTransportCredentials(&creds)) if err != nil { @@ -395,12 +395,12 @@ func TestDropRequest(t *testing.T) { } helloC := hwpb.NewGreeterClient(cc) // The 1st, non-fail-fast RPC should succeed. This ensures both server - // connections are made, because the first one has DropRequest set to true. + // connections are made, because the first one has DropForLoadBalancing set to true. if _, err := helloC.SayHello(context.Background(), &hwpb.HelloRequest{Name: "grpc"}, grpc.FailFast(false)); err != nil { t.Fatalf("%v.SayHello(_, _) = _, %v, want _, ", helloC, err) } for i := 0; i < 3; i++ { - // Odd fail-fast RPCs should fail, because the 1st backend has DropRequest + // Odd fail-fast RPCs should fail, because the 1st backend has DropForLoadBalancing // set to true. if _, err := helloC.SayHello(context.Background(), &hwpb.HelloRequest{Name: "grpc"}); grpc.Code(err) != codes.Unavailable { t.Fatalf("%v.SayHello(_, _) = _, %v, want _, %s", helloC, err, codes.Unavailable) @@ -421,10 +421,10 @@ func TestDropRequestFailedNonFailFast(t *testing.T) { } defer cleanup() be := &lbpb.Server{ - IpAddress: tss.beIPs[0], - Port: int32(tss.bePorts[0]), - LoadBalanceToken: lbToken, - DropRequest: true, + IpAddress: tss.beIPs[0], + Port: int32(tss.bePorts[0]), + LoadBalanceToken: lbToken, + DropForLoadBalancing: true, } var bes []*lbpb.Server bes = append(bes, be) @@ -437,7 +437,7 @@ func TestDropRequestFailedNonFailFast(t *testing.T) { expected: besn, } ctx, _ := context.WithTimeout(context.Background(), 10*time.Second) - cc, err := grpc.DialContext(ctx, besn, grpc.WithBalancer(Balancer(&testNameResolver{ + cc, err := grpc.DialContext(ctx, besn, grpc.WithBalancer(grpc.NewGRPCLBBalancer(&testNameResolver{ addrs: []string{tss.lbAddr}, })), grpc.WithBlock(), grpc.WithTransportCredentials(&creds)) if err != nil { @@ -487,7 +487,7 @@ func TestServerExpiration(t *testing.T) { expected: besn, } ctx, _ := context.WithTimeout(context.Background(), 10*time.Second) - cc, err := grpc.DialContext(ctx, besn, grpc.WithBalancer(Balancer(&testNameResolver{ + cc, err := grpc.DialContext(ctx, besn, grpc.WithBalancer(grpc.NewGRPCLBBalancer(&testNameResolver{ addrs: []string{tss.lbAddr}, })), grpc.WithBlock(), grpc.WithTransportCredentials(&creds)) if err != nil { @@ -547,7 +547,7 @@ func TestBalancerDisconnects(t *testing.T) { resolver := &testNameResolver{ addrs: lbAddrs[:2], } - cc, err := grpc.DialContext(ctx, besn, grpc.WithBalancer(Balancer(resolver)), grpc.WithBlock(), grpc.WithTransportCredentials(&creds)) + cc, err := grpc.DialContext(ctx, besn, grpc.WithBalancer(grpc.NewGRPCLBBalancer(resolver)), grpc.WithBlock(), grpc.WithTransportCredentials(&creds)) if err != nil { t.Fatalf("Failed to dial to the backend %v", err) } @@ -576,8 +576,8 @@ func TestBalancerDisconnects(t *testing.T) { resolver.inject([]*naming.Update{ {Op: naming.Add, Addr: lbAddrs[2], - Metadata: &Metadata{ - AddrType: GRPCLB, + Metadata: &grpc.AddrMetadataGRPCLB{ + AddrType: grpc.GRPCLB, ServerName: lbsn, }, },