Update grpclb proto and move grpclb into package grpc (#1186)

This commit is contained in:
Menghan Li
2017-04-25 10:51:43 -07:00
committed by GitHub
parent 38df39bad1
commit c73e0165df
5 changed files with 485 additions and 252 deletions

View File

@ -31,10 +31,7 @@
* *
*/ */
// Package grpclb implements the load balancing protocol defined at package grpc
// https://github.com/grpc/grpc/blob/master/doc/load-balancing.md.
// The implementation is currently EXPERIMENTAL.
package grpclb
import ( import (
"errors" "errors"
@ -45,7 +42,6 @@ import (
"time" "time"
"golang.org/x/net/context" "golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
lbpb "google.golang.org/grpc/grpclb/grpc_lb_v1" lbpb "google.golang.org/grpc/grpclb/grpc_lb_v1"
"google.golang.org/grpc/grpclog" "google.golang.org/grpc/grpclog"
@ -53,6 +49,43 @@ import (
"google.golang.org/grpc/naming" "google.golang.org/grpc/naming"
) )
// Client API for LoadBalancer service.
// Mostly copied from generated pb.go file.
// To avoid circular dependency.
type loadBalancerClient struct {
cc *ClientConn
}
func (c *loadBalancerClient) BalanceLoad(ctx context.Context, opts ...CallOption) (*balanceLoadClientStream, error) {
desc := &StreamDesc{
StreamName: "BalanceLoad",
ServerStreams: true,
ClientStreams: true,
}
stream, err := NewClientStream(ctx, desc, c.cc, "/grpc.lb.v1.LoadBalancer/BalanceLoad", opts...)
if err != nil {
return nil, err
}
x := &balanceLoadClientStream{stream}
return x, nil
}
type balanceLoadClientStream struct {
ClientStream
}
func (x *balanceLoadClientStream) Send(m *lbpb.LoadBalanceRequest) error {
return x.ClientStream.SendMsg(m)
}
func (x *balanceLoadClientStream) Recv() (*lbpb.LoadBalanceResponse, error) {
m := new(lbpb.LoadBalanceResponse)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// AddressType indicates the address type returned by name resolution. // AddressType indicates the address type returned by name resolution.
type AddressType uint8 type AddressType uint8
@ -63,18 +96,18 @@ const (
GRPCLB GRPCLB
) )
// Metadata contains the information the name resolution for grpclb should provide. The // AddrMetadataGRPCLB contains the information the name resolution for grpclb should provide. The
// name resolver used by grpclb balancer is required to provide this type of metadata in // name resolver used by grpclb balancer is required to provide this type of metadata in
// its address updates. // its address updates.
type Metadata struct { type AddrMetadataGRPCLB struct {
// AddrType is the type of server (grpc load balancer or backend). // AddrType is the type of server (grpc load balancer or backend).
AddrType AddressType AddrType AddressType
// ServerName is the name of the grpc load balancer. Used for authentication. // ServerName is the name of the grpc load balancer. Used for authentication.
ServerName string ServerName string
} }
// Balancer creates a grpclb load balancer. // NewGRPCLBBalancer creates a grpclb load balancer.
func Balancer(r naming.Resolver) grpc.Balancer { func NewGRPCLBBalancer(r naming.Resolver) Balancer {
return &balancer{ return &balancer{
r: r, r: r,
} }
@ -86,13 +119,16 @@ type remoteBalancerInfo struct {
name string name string
} }
// addrInfo consists of the information of a backend server. // grpclbAddrInfo consists of the information of a backend server.
type addrInfo struct { type grpclbAddrInfo struct {
addr grpc.Address addr Address
connected bool connected bool
// dropRequest indicates whether a particular RPC which chooses this address // dropForRateLimiting indicates whether this particular request should be
// should be dropped. // dropped by the client for rate limiting.
dropRequest bool dropForRateLimiting bool
// dropForLoadBalancing indicates whether this particular request should be
// dropped by the client for load balancing.
dropForLoadBalancing bool
} }
type balancer struct { type balancer struct {
@ -101,9 +137,9 @@ type balancer struct {
mu sync.Mutex mu sync.Mutex
seq int // a sequence number to make sure addrCh does not get stale addresses. seq int // a sequence number to make sure addrCh does not get stale addresses.
w naming.Watcher w naming.Watcher
addrCh chan []grpc.Address addrCh chan []Address
rbs []remoteBalancerInfo rbs []remoteBalancerInfo
addrs []*addrInfo addrs []*grpclbAddrInfo
next int next int
waitCh chan struct{} waitCh chan struct{}
done bool done bool
@ -119,7 +155,7 @@ func (b *balancer) watchAddrUpdates(w naming.Watcher, ch chan []remoteBalancerIn
b.mu.Lock() b.mu.Lock()
defer b.mu.Unlock() defer b.mu.Unlock()
if b.done { if b.done {
return grpc.ErrClientConnClosing return ErrClientConnClosing
} }
for _, update := range updates { for _, update := range updates {
switch update.Op { switch update.Op {
@ -135,7 +171,7 @@ func (b *balancer) watchAddrUpdates(w naming.Watcher, ch chan []remoteBalancerIn
if exist { if exist {
continue continue
} }
md, ok := update.Metadata.(*Metadata) md, ok := update.Metadata.(*AddrMetadataGRPCLB)
if !ok { if !ok {
// TODO: Revisit the handling here and may introduce some fallback mechanism. // TODO: Revisit the handling here and may introduce some fallback mechanism.
grpclog.Printf("The name resolution contains unexpected metadata %v", update.Metadata) grpclog.Printf("The name resolution contains unexpected metadata %v", update.Metadata)
@ -206,18 +242,19 @@ func (b *balancer) processServerList(l *lbpb.ServerList, seq int) {
servers := l.GetServers() servers := l.GetServers()
expiration := convertDuration(l.GetExpirationInterval()) expiration := convertDuration(l.GetExpirationInterval())
var ( var (
sl []*addrInfo sl []*grpclbAddrInfo
addrs []grpc.Address addrs []Address
) )
for _, s := range servers { for _, s := range servers {
md := metadata.Pairs("lb-token", s.LoadBalanceToken) md := metadata.Pairs("lb-token", s.LoadBalanceToken)
addr := grpc.Address{ addr := Address{
Addr: fmt.Sprintf("%s:%d", net.IP(s.IpAddress), s.Port), Addr: fmt.Sprintf("%s:%d", net.IP(s.IpAddress), s.Port),
Metadata: &md, Metadata: &md,
} }
sl = append(sl, &addrInfo{ sl = append(sl, &grpclbAddrInfo{
addr: addr, addr: addr,
dropRequest: s.DropRequest, dropForRateLimiting: s.DropForRateLimiting,
dropForLoadBalancing: s.DropForLoadBalancing,
}) })
addrs = append(addrs, addr) addrs = append(addrs, addr)
} }
@ -244,7 +281,7 @@ func (b *balancer) processServerList(l *lbpb.ServerList, seq int) {
return return
} }
func (b *balancer) callRemoteBalancer(lbc lbpb.LoadBalancerClient, seq int) (retry bool) { func (b *balancer) callRemoteBalancer(lbc *loadBalancerClient, seq int) (retry bool) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
stream, err := lbc.BalanceLoad(ctx) stream, err := lbc.BalanceLoad(ctx)
@ -306,7 +343,7 @@ func (b *balancer) callRemoteBalancer(lbc lbpb.LoadBalancerClient, seq int) (ret
return true return true
} }
func (b *balancer) Start(target string, config grpc.BalancerConfig) error { func (b *balancer) Start(target string, config BalancerConfig) error {
b.rand = rand.New(rand.NewSource(time.Now().Unix())) b.rand = rand.New(rand.NewSource(time.Now().Unix()))
// TODO: Fall back to the basic direct connection if there is no name resolver. // TODO: Fall back to the basic direct connection if there is no name resolver.
if b.r == nil { if b.r == nil {
@ -316,9 +353,9 @@ func (b *balancer) Start(target string, config grpc.BalancerConfig) error {
b.mu.Lock() b.mu.Lock()
if b.done { if b.done {
b.mu.Unlock() b.mu.Unlock()
return grpc.ErrClientConnClosing return ErrClientConnClosing
} }
b.addrCh = make(chan []grpc.Address) b.addrCh = make(chan []Address)
w, err := b.r.Resolve(target) w, err := b.r.Resolve(target)
if err != nil { if err != nil {
b.mu.Unlock() b.mu.Unlock()
@ -340,7 +377,7 @@ func (b *balancer) Start(target string, config grpc.BalancerConfig) error {
// Spawn a goroutine to talk to the remote load balancer. // Spawn a goroutine to talk to the remote load balancer.
go func() { go func() {
var ( var (
cc *grpc.ClientConn cc *ClientConn
// ccError is closed when there is an error in the current cc. // ccError is closed when there is an error in the current cc.
// A new rb should be picked from rbs and connected. // A new rb should be picked from rbs and connected.
ccError chan struct{} ccError chan struct{}
@ -419,7 +456,7 @@ func (b *balancer) Start(target string, config grpc.BalancerConfig) error {
creds := config.DialCreds creds := config.DialCreds
ccError = make(chan struct{}) ccError = make(chan struct{})
if creds == nil { if creds == nil {
cc, err = grpc.Dial(rb.addr, grpc.WithInsecure()) cc, err = Dial(rb.addr, WithInsecure())
} else { } else {
if rb.name != "" { if rb.name != "" {
if err := creds.OverrideServerName(rb.name); err != nil { if err := creds.OverrideServerName(rb.name); err != nil {
@ -427,7 +464,7 @@ func (b *balancer) Start(target string, config grpc.BalancerConfig) error {
continue continue
} }
} }
cc, err = grpc.Dial(rb.addr, grpc.WithTransportCredentials(creds)) cc, err = Dial(rb.addr, WithTransportCredentials(creds))
} }
if err != nil { if err != nil {
grpclog.Printf("Failed to setup a connection to the remote balancer %v: %v", rb.addr, err) grpclog.Printf("Failed to setup a connection to the remote balancer %v: %v", rb.addr, err)
@ -439,8 +476,8 @@ func (b *balancer) Start(target string, config grpc.BalancerConfig) error {
seq := b.seq seq := b.seq
b.next = 0 b.next = 0
b.mu.Unlock() b.mu.Unlock()
go func(cc *grpc.ClientConn, ccError chan struct{}) { go func(cc *ClientConn, ccError chan struct{}) {
lbc := lbpb.NewLoadBalancerClient(cc) lbc := &loadBalancerClient{cc}
b.callRemoteBalancer(lbc, seq) b.callRemoteBalancer(lbc, seq)
cc.Close() cc.Close()
select { select {
@ -454,7 +491,7 @@ func (b *balancer) Start(target string, config grpc.BalancerConfig) error {
return nil return nil
} }
func (b *balancer) down(addr grpc.Address, err error) { func (b *balancer) down(addr Address, err error) {
b.mu.Lock() b.mu.Lock()
defer b.mu.Unlock() defer b.mu.Unlock()
for _, a := range b.addrs { for _, a := range b.addrs {
@ -465,7 +502,7 @@ func (b *balancer) down(addr grpc.Address, err error) {
} }
} }
func (b *balancer) Up(addr grpc.Address) func(error) { func (b *balancer) Up(addr Address) func(error) {
b.mu.Lock() b.mu.Lock()
defer b.mu.Unlock() defer b.mu.Unlock()
if b.done { if b.done {
@ -479,7 +516,7 @@ func (b *balancer) Up(addr grpc.Address) func(error) {
} }
a.connected = true a.connected = true
} }
if a.connected && !a.dropRequest { if a.connected && !a.dropForRateLimiting && !a.dropForLoadBalancing {
cnt++ cnt++
} }
} }
@ -493,12 +530,12 @@ func (b *balancer) Up(addr grpc.Address) func(error) {
} }
} }
func (b *balancer) Get(ctx context.Context, opts grpc.BalancerGetOptions) (addr grpc.Address, put func(), err error) { func (b *balancer) Get(ctx context.Context, opts BalancerGetOptions) (addr Address, put func(), err error) {
var ch chan struct{} var ch chan struct{}
b.mu.Lock() b.mu.Lock()
if b.done { if b.done {
b.mu.Unlock() b.mu.Unlock()
err = grpc.ErrClientConnClosing err = ErrClientConnClosing
return return
} }
@ -511,7 +548,7 @@ func (b *balancer) Get(ctx context.Context, opts grpc.BalancerGetOptions) (addr
a := b.addrs[next] a := b.addrs[next]
next = (next + 1) % len(b.addrs) next = (next + 1) % len(b.addrs)
if a.connected { if a.connected {
if !a.dropRequest { if !a.dropForRateLimiting && !a.dropForLoadBalancing {
addr = a.addr addr = a.addr
b.next = next b.next = next
b.mu.Unlock() b.mu.Unlock()
@ -520,7 +557,7 @@ func (b *balancer) Get(ctx context.Context, opts grpc.BalancerGetOptions) (addr
if !opts.BlockingWait { if !opts.BlockingWait {
b.next = next b.next = next
b.mu.Unlock() b.mu.Unlock()
err = grpc.Errorf(codes.Unavailable, "%s drops requests", a.addr.Addr) err = Errorf(codes.Unavailable, "%s drops requests", a.addr.Addr)
return return
} }
} }
@ -533,7 +570,7 @@ func (b *balancer) Get(ctx context.Context, opts grpc.BalancerGetOptions) (addr
if !opts.BlockingWait { if !opts.BlockingWait {
if len(b.addrs) == 0 { if len(b.addrs) == 0 {
b.mu.Unlock() b.mu.Unlock()
err = grpc.Errorf(codes.Unavailable, "there is no address available") err = Errorf(codes.Unavailable, "there is no address available")
return return
} }
// Returns the next addr on b.addrs for a failfast RPC. // Returns the next addr on b.addrs for a failfast RPC.
@ -559,7 +596,7 @@ func (b *balancer) Get(ctx context.Context, opts grpc.BalancerGetOptions) (addr
b.mu.Lock() b.mu.Lock()
if b.done { if b.done {
b.mu.Unlock() b.mu.Unlock()
err = grpc.ErrClientConnClosing err = ErrClientConnClosing
return return
} }
@ -572,7 +609,7 @@ func (b *balancer) Get(ctx context.Context, opts grpc.BalancerGetOptions) (addr
a := b.addrs[next] a := b.addrs[next]
next = (next + 1) % len(b.addrs) next = (next + 1) % len(b.addrs)
if a.connected { if a.connected {
if !a.dropRequest { if !a.dropForRateLimiting && !a.dropForLoadBalancing {
addr = a.addr addr = a.addr
b.next = next b.next = next
b.mu.Unlock() b.mu.Unlock()
@ -581,7 +618,7 @@ func (b *balancer) Get(ctx context.Context, opts grpc.BalancerGetOptions) (addr
if !opts.BlockingWait { if !opts.BlockingWait {
b.next = next b.next = next
b.mu.Unlock() b.mu.Unlock()
err = grpc.Errorf(codes.Unavailable, "drop requests for the addreess %s", a.addr.Addr) err = Errorf(codes.Unavailable, "drop requests for the addreess %s", a.addr.Addr)
return return
} }
} }
@ -603,7 +640,7 @@ func (b *balancer) Get(ctx context.Context, opts grpc.BalancerGetOptions) (addr
} }
} }
func (b *balancer) Notify() <-chan []grpc.Address { func (b *balancer) Notify() <-chan []Address {
return b.addrCh return b.addrCh
} }

View File

@ -10,6 +10,7 @@ It is generated from these files:
It has these top-level messages: It has these top-level messages:
Duration Duration
Timestamp
LoadBalanceRequest LoadBalanceRequest
InitialLoadBalanceRequest InitialLoadBalanceRequest
ClientStats ClientStats
@ -24,11 +25,6 @@ import proto "github.com/golang/protobuf/proto"
import fmt "fmt" import fmt "fmt"
import math "math" import math "math"
import (
context "golang.org/x/net/context"
grpc "google.golang.org/grpc"
)
// Reference imports to suppress errors if they are not otherwise used. // Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal var _ = proto.Marshal
var _ = fmt.Errorf var _ = fmt.Errorf
@ -58,6 +54,51 @@ func (m *Duration) String() string { return proto.CompactTextString(m
func (*Duration) ProtoMessage() {} func (*Duration) ProtoMessage() {}
func (*Duration) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } func (*Duration) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
func (m *Duration) GetSeconds() int64 {
if m != nil {
return m.Seconds
}
return 0
}
func (m *Duration) GetNanos() int32 {
if m != nil {
return m.Nanos
}
return 0
}
type Timestamp struct {
// Represents seconds of UTC time since Unix epoch
// 1970-01-01T00:00:00Z. Must be from 0001-01-01T00:00:00Z to
// 9999-12-31T23:59:59Z inclusive.
Seconds int64 `protobuf:"varint,1,opt,name=seconds" json:"seconds,omitempty"`
// Non-negative fractions of a second at nanosecond resolution. Negative
// second values with fractions must still have non-negative nanos values
// that count forward in time. Must be from 0 to 999,999,999
// inclusive.
Nanos int32 `protobuf:"varint,2,opt,name=nanos" json:"nanos,omitempty"`
}
func (m *Timestamp) Reset() { *m = Timestamp{} }
func (m *Timestamp) String() string { return proto.CompactTextString(m) }
func (*Timestamp) ProtoMessage() {}
func (*Timestamp) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} }
func (m *Timestamp) GetSeconds() int64 {
if m != nil {
return m.Seconds
}
return 0
}
func (m *Timestamp) GetNanos() int32 {
if m != nil {
return m.Nanos
}
return 0
}
type LoadBalanceRequest struct { type LoadBalanceRequest struct {
// Types that are valid to be assigned to LoadBalanceRequestType: // Types that are valid to be assigned to LoadBalanceRequestType:
// *LoadBalanceRequest_InitialRequest // *LoadBalanceRequest_InitialRequest
@ -68,17 +109,17 @@ type LoadBalanceRequest struct {
func (m *LoadBalanceRequest) Reset() { *m = LoadBalanceRequest{} } func (m *LoadBalanceRequest) Reset() { *m = LoadBalanceRequest{} }
func (m *LoadBalanceRequest) String() string { return proto.CompactTextString(m) } func (m *LoadBalanceRequest) String() string { return proto.CompactTextString(m) }
func (*LoadBalanceRequest) ProtoMessage() {} func (*LoadBalanceRequest) ProtoMessage() {}
func (*LoadBalanceRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } func (*LoadBalanceRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} }
type isLoadBalanceRequest_LoadBalanceRequestType interface { type isLoadBalanceRequest_LoadBalanceRequestType interface {
isLoadBalanceRequest_LoadBalanceRequestType() isLoadBalanceRequest_LoadBalanceRequestType()
} }
type LoadBalanceRequest_InitialRequest struct { type LoadBalanceRequest_InitialRequest struct {
InitialRequest *InitialLoadBalanceRequest `protobuf:"bytes,1,opt,name=initial_request,oneof"` InitialRequest *InitialLoadBalanceRequest `protobuf:"bytes,1,opt,name=initial_request,json=initialRequest,oneof"`
} }
type LoadBalanceRequest_ClientStats struct { type LoadBalanceRequest_ClientStats struct {
ClientStats *ClientStats `protobuf:"bytes,2,opt,name=client_stats,oneof"` ClientStats *ClientStats `protobuf:"bytes,2,opt,name=client_stats,json=clientStats,oneof"`
} }
func (*LoadBalanceRequest_InitialRequest) isLoadBalanceRequest_LoadBalanceRequestType() {} func (*LoadBalanceRequest_InitialRequest) isLoadBalanceRequest_LoadBalanceRequestType() {}
@ -180,30 +221,98 @@ func _LoadBalanceRequest_OneofSizer(msg proto.Message) (n int) {
} }
type InitialLoadBalanceRequest struct { type InitialLoadBalanceRequest struct {
// Name of load balanced service (IE, service.grpc.gslb.google.com) // Name of load balanced service (IE, balancer.service.com)
// length should be less than 256 bytes.
Name string `protobuf:"bytes,1,opt,name=name" json:"name,omitempty"` Name string `protobuf:"bytes,1,opt,name=name" json:"name,omitempty"`
} }
func (m *InitialLoadBalanceRequest) Reset() { *m = InitialLoadBalanceRequest{} } func (m *InitialLoadBalanceRequest) Reset() { *m = InitialLoadBalanceRequest{} }
func (m *InitialLoadBalanceRequest) String() string { return proto.CompactTextString(m) } func (m *InitialLoadBalanceRequest) String() string { return proto.CompactTextString(m) }
func (*InitialLoadBalanceRequest) ProtoMessage() {} func (*InitialLoadBalanceRequest) ProtoMessage() {}
func (*InitialLoadBalanceRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} } func (*InitialLoadBalanceRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} }
func (m *InitialLoadBalanceRequest) GetName() string {
if m != nil {
return m.Name
}
return ""
}
// Contains client level statistics that are useful to load balancing. Each // Contains client level statistics that are useful to load balancing. Each
// count should be reset to zero after reporting the stats. // count except the timestamp should be reset to zero after reporting the stats.
type ClientStats struct { type ClientStats struct {
// The total number of requests sent by the client since the last report. // The timestamp of generating the report.
TotalRequests int64 `protobuf:"varint,1,opt,name=total_requests" json:"total_requests,omitempty"` Timestamp *Timestamp `protobuf:"bytes,1,opt,name=timestamp" json:"timestamp,omitempty"`
// The number of client rpc errors since the last report. // The total number of RPCs that started.
ClientRpcErrors int64 `protobuf:"varint,2,opt,name=client_rpc_errors" json:"client_rpc_errors,omitempty"` NumCallsStarted int64 `protobuf:"varint,2,opt,name=num_calls_started,json=numCallsStarted" json:"num_calls_started,omitempty"`
// The number of dropped requests since the last report. // The total number of RPCs that finished.
DroppedRequests int64 `protobuf:"varint,3,opt,name=dropped_requests" json:"dropped_requests,omitempty"` NumCallsFinished int64 `protobuf:"varint,3,opt,name=num_calls_finished,json=numCallsFinished" json:"num_calls_finished,omitempty"`
// The total number of RPCs that were dropped by the client because of rate
// limiting.
NumCallsFinishedWithDropForRateLimiting int64 `protobuf:"varint,4,opt,name=num_calls_finished_with_drop_for_rate_limiting,json=numCallsFinishedWithDropForRateLimiting" json:"num_calls_finished_with_drop_for_rate_limiting,omitempty"`
// The total number of RPCs that were dropped by the client because of load
// balancing.
NumCallsFinishedWithDropForLoadBalancing int64 `protobuf:"varint,5,opt,name=num_calls_finished_with_drop_for_load_balancing,json=numCallsFinishedWithDropForLoadBalancing" json:"num_calls_finished_with_drop_for_load_balancing,omitempty"`
// The total number of RPCs that failed to reach a server except dropped RPCs.
NumCallsFinishedWithClientFailedToSend int64 `protobuf:"varint,6,opt,name=num_calls_finished_with_client_failed_to_send,json=numCallsFinishedWithClientFailedToSend" json:"num_calls_finished_with_client_failed_to_send,omitempty"`
// The total number of RPCs that finished and are known to have been received
// by a server.
NumCallsFinishedKnownReceived int64 `protobuf:"varint,7,opt,name=num_calls_finished_known_received,json=numCallsFinishedKnownReceived" json:"num_calls_finished_known_received,omitempty"`
} }
func (m *ClientStats) Reset() { *m = ClientStats{} } func (m *ClientStats) Reset() { *m = ClientStats{} }
func (m *ClientStats) String() string { return proto.CompactTextString(m) } func (m *ClientStats) String() string { return proto.CompactTextString(m) }
func (*ClientStats) ProtoMessage() {} func (*ClientStats) ProtoMessage() {}
func (*ClientStats) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} } func (*ClientStats) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{4} }
func (m *ClientStats) GetTimestamp() *Timestamp {
if m != nil {
return m.Timestamp
}
return nil
}
func (m *ClientStats) GetNumCallsStarted() int64 {
if m != nil {
return m.NumCallsStarted
}
return 0
}
func (m *ClientStats) GetNumCallsFinished() int64 {
if m != nil {
return m.NumCallsFinished
}
return 0
}
func (m *ClientStats) GetNumCallsFinishedWithDropForRateLimiting() int64 {
if m != nil {
return m.NumCallsFinishedWithDropForRateLimiting
}
return 0
}
func (m *ClientStats) GetNumCallsFinishedWithDropForLoadBalancing() int64 {
if m != nil {
return m.NumCallsFinishedWithDropForLoadBalancing
}
return 0
}
func (m *ClientStats) GetNumCallsFinishedWithClientFailedToSend() int64 {
if m != nil {
return m.NumCallsFinishedWithClientFailedToSend
}
return 0
}
func (m *ClientStats) GetNumCallsFinishedKnownReceived() int64 {
if m != nil {
return m.NumCallsFinishedKnownReceived
}
return 0
}
type LoadBalanceResponse struct { type LoadBalanceResponse struct {
// Types that are valid to be assigned to LoadBalanceResponseType: // Types that are valid to be assigned to LoadBalanceResponseType:
@ -215,17 +324,17 @@ type LoadBalanceResponse struct {
func (m *LoadBalanceResponse) Reset() { *m = LoadBalanceResponse{} } func (m *LoadBalanceResponse) Reset() { *m = LoadBalanceResponse{} }
func (m *LoadBalanceResponse) String() string { return proto.CompactTextString(m) } func (m *LoadBalanceResponse) String() string { return proto.CompactTextString(m) }
func (*LoadBalanceResponse) ProtoMessage() {} func (*LoadBalanceResponse) ProtoMessage() {}
func (*LoadBalanceResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{4} } func (*LoadBalanceResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{5} }
type isLoadBalanceResponse_LoadBalanceResponseType interface { type isLoadBalanceResponse_LoadBalanceResponseType interface {
isLoadBalanceResponse_LoadBalanceResponseType() isLoadBalanceResponse_LoadBalanceResponseType()
} }
type LoadBalanceResponse_InitialResponse struct { type LoadBalanceResponse_InitialResponse struct {
InitialResponse *InitialLoadBalanceResponse `protobuf:"bytes,1,opt,name=initial_response,oneof"` InitialResponse *InitialLoadBalanceResponse `protobuf:"bytes,1,opt,name=initial_response,json=initialResponse,oneof"`
} }
type LoadBalanceResponse_ServerList struct { type LoadBalanceResponse_ServerList struct {
ServerList *ServerList `protobuf:"bytes,2,opt,name=server_list,oneof"` ServerList *ServerList `protobuf:"bytes,2,opt,name=server_list,json=serverList,oneof"`
} }
func (*LoadBalanceResponse_InitialResponse) isLoadBalanceResponse_LoadBalanceResponseType() {} func (*LoadBalanceResponse_InitialResponse) isLoadBalanceResponse_LoadBalanceResponseType() {}
@ -330,18 +439,26 @@ type InitialLoadBalanceResponse struct {
// This is an application layer redirect that indicates the client should use // This is an application layer redirect that indicates the client should use
// the specified server for load balancing. When this field is non-empty in // the specified server for load balancing. When this field is non-empty in
// the response, the client should open a separate connection to the // the response, the client should open a separate connection to the
// load_balancer_delegate and call the BalanceLoad method. // load_balancer_delegate and call the BalanceLoad method. Its length should
LoadBalancerDelegate string `protobuf:"bytes,1,opt,name=load_balancer_delegate" json:"load_balancer_delegate,omitempty"` // be less than 64 bytes.
LoadBalancerDelegate string `protobuf:"bytes,1,opt,name=load_balancer_delegate,json=loadBalancerDelegate" json:"load_balancer_delegate,omitempty"`
// This interval defines how often the client should send the client stats // This interval defines how often the client should send the client stats
// to the load balancer. Stats should only be reported when the duration is // to the load balancer. Stats should only be reported when the duration is
// positive. // positive.
ClientStatsReportInterval *Duration `protobuf:"bytes,3,opt,name=client_stats_report_interval" json:"client_stats_report_interval,omitempty"` ClientStatsReportInterval *Duration `protobuf:"bytes,2,opt,name=client_stats_report_interval,json=clientStatsReportInterval" json:"client_stats_report_interval,omitempty"`
} }
func (m *InitialLoadBalanceResponse) Reset() { *m = InitialLoadBalanceResponse{} } func (m *InitialLoadBalanceResponse) Reset() { *m = InitialLoadBalanceResponse{} }
func (m *InitialLoadBalanceResponse) String() string { return proto.CompactTextString(m) } func (m *InitialLoadBalanceResponse) String() string { return proto.CompactTextString(m) }
func (*InitialLoadBalanceResponse) ProtoMessage() {} func (*InitialLoadBalanceResponse) ProtoMessage() {}
func (*InitialLoadBalanceResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{5} } func (*InitialLoadBalanceResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{6} }
func (m *InitialLoadBalanceResponse) GetLoadBalancerDelegate() string {
if m != nil {
return m.LoadBalancerDelegate
}
return ""
}
func (m *InitialLoadBalanceResponse) GetClientStatsReportInterval() *Duration { func (m *InitialLoadBalanceResponse) GetClientStatsReportInterval() *Duration {
if m != nil { if m != nil {
@ -360,13 +477,13 @@ type ServerList struct {
// list as valid. It may be considered stale after waiting this interval of // list as valid. It may be considered stale after waiting this interval of
// time after receiving the list. If the interval is not positive, the // time after receiving the list. If the interval is not positive, the
// client can assume the list is valid until the next list is received. // client can assume the list is valid until the next list is received.
ExpirationInterval *Duration `protobuf:"bytes,3,opt,name=expiration_interval" json:"expiration_interval,omitempty"` ExpirationInterval *Duration `protobuf:"bytes,3,opt,name=expiration_interval,json=expirationInterval" json:"expiration_interval,omitempty"`
} }
func (m *ServerList) Reset() { *m = ServerList{} } func (m *ServerList) Reset() { *m = ServerList{} }
func (m *ServerList) String() string { return proto.CompactTextString(m) } func (m *ServerList) String() string { return proto.CompactTextString(m) }
func (*ServerList) ProtoMessage() {} func (*ServerList) ProtoMessage() {}
func (*ServerList) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{6} } func (*ServerList) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{7} }
func (m *ServerList) GetServers() []*Server { func (m *ServerList) GetServers() []*Server {
if m != nil { if m != nil {
@ -382,29 +499,74 @@ func (m *ServerList) GetExpirationInterval() *Duration {
return nil return nil
} }
// Contains server information. When none of the [drop_for_*] fields are true,
// use the other fields. When drop_for_rate_limiting is true, ignore all other
// fields. Use drop_for_load_balancing only when it is true and
// drop_for_rate_limiting is false.
type Server struct { type Server struct {
// A resolved address for the server, serialized in network-byte-order. It may // A resolved address for the server, serialized in network-byte-order. It may
// either be an IPv4 or IPv6 address. // either be an IPv4 or IPv6 address.
IpAddress []byte `protobuf:"bytes,1,opt,name=ip_address,proto3" json:"ip_address,omitempty"` IpAddress []byte `protobuf:"bytes,1,opt,name=ip_address,json=ipAddress,proto3" json:"ip_address,omitempty"`
// A resolved port number for the server. // A resolved port number for the server.
Port int32 `protobuf:"varint,2,opt,name=port" json:"port,omitempty"` Port int32 `protobuf:"varint,2,opt,name=port" json:"port,omitempty"`
// An opaque but printable token given to the frontend for each pick. All // An opaque but printable token given to the frontend for each pick. All
// frontend requests for that pick must include the token in its initial // frontend requests for that pick must include the token in its initial
// metadata. The token is used by the backend to verify the request and to // metadata. The token is used by the backend to verify the request and to
// allow the backend to report load to the gRPC LB system. // allow the backend to report load to the gRPC LB system.
LoadBalanceToken string `protobuf:"bytes,3,opt,name=load_balance_token" json:"load_balance_token,omitempty"` //
// Its length is variable but less than 50 bytes.
LoadBalanceToken string `protobuf:"bytes,3,opt,name=load_balance_token,json=loadBalanceToken" json:"load_balance_token,omitempty"`
// Indicates whether this particular request should be dropped by the client // Indicates whether this particular request should be dropped by the client
// when this server is chosen from the list. // for rate limiting.
DropRequest bool `protobuf:"varint,4,opt,name=drop_request" json:"drop_request,omitempty"` DropForRateLimiting bool `protobuf:"varint,4,opt,name=drop_for_rate_limiting,json=dropForRateLimiting" json:"drop_for_rate_limiting,omitempty"`
// Indicates whether this particular request should be dropped by the client
// for load balancing.
DropForLoadBalancing bool `protobuf:"varint,5,opt,name=drop_for_load_balancing,json=dropForLoadBalancing" json:"drop_for_load_balancing,omitempty"`
} }
func (m *Server) Reset() { *m = Server{} } func (m *Server) Reset() { *m = Server{} }
func (m *Server) String() string { return proto.CompactTextString(m) } func (m *Server) String() string { return proto.CompactTextString(m) }
func (*Server) ProtoMessage() {} func (*Server) ProtoMessage() {}
func (*Server) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{7} } func (*Server) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{8} }
func (m *Server) GetIpAddress() []byte {
if m != nil {
return m.IpAddress
}
return nil
}
func (m *Server) GetPort() int32 {
if m != nil {
return m.Port
}
return 0
}
func (m *Server) GetLoadBalanceToken() string {
if m != nil {
return m.LoadBalanceToken
}
return ""
}
func (m *Server) GetDropForRateLimiting() bool {
if m != nil {
return m.DropForRateLimiting
}
return false
}
func (m *Server) GetDropForLoadBalancing() bool {
if m != nil {
return m.DropForLoadBalancing
}
return false
}
func init() { func init() {
proto.RegisterType((*Duration)(nil), "grpc.lb.v1.Duration") proto.RegisterType((*Duration)(nil), "grpc.lb.v1.Duration")
proto.RegisterType((*Timestamp)(nil), "grpc.lb.v1.Timestamp")
proto.RegisterType((*LoadBalanceRequest)(nil), "grpc.lb.v1.LoadBalanceRequest") proto.RegisterType((*LoadBalanceRequest)(nil), "grpc.lb.v1.LoadBalanceRequest")
proto.RegisterType((*InitialLoadBalanceRequest)(nil), "grpc.lb.v1.InitialLoadBalanceRequest") proto.RegisterType((*InitialLoadBalanceRequest)(nil), "grpc.lb.v1.InitialLoadBalanceRequest")
proto.RegisterType((*ClientStats)(nil), "grpc.lb.v1.ClientStats") proto.RegisterType((*ClientStats)(nil), "grpc.lb.v1.ClientStats")
@ -414,144 +576,54 @@ func init() {
proto.RegisterType((*Server)(nil), "grpc.lb.v1.Server") proto.RegisterType((*Server)(nil), "grpc.lb.v1.Server")
} }
// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConn
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion4
// Client API for LoadBalancer service
type LoadBalancerClient interface {
// Bidirectional rpc to get a list of servers.
BalanceLoad(ctx context.Context, opts ...grpc.CallOption) (LoadBalancer_BalanceLoadClient, error)
}
type loadBalancerClient struct {
cc *grpc.ClientConn
}
func NewLoadBalancerClient(cc *grpc.ClientConn) LoadBalancerClient {
return &loadBalancerClient{cc}
}
func (c *loadBalancerClient) BalanceLoad(ctx context.Context, opts ...grpc.CallOption) (LoadBalancer_BalanceLoadClient, error) {
stream, err := grpc.NewClientStream(ctx, &_LoadBalancer_serviceDesc.Streams[0], c.cc, "/grpc.lb.v1.LoadBalancer/BalanceLoad", opts...)
if err != nil {
return nil, err
}
x := &loadBalancerBalanceLoadClient{stream}
return x, nil
}
type LoadBalancer_BalanceLoadClient interface {
Send(*LoadBalanceRequest) error
Recv() (*LoadBalanceResponse, error)
grpc.ClientStream
}
type loadBalancerBalanceLoadClient struct {
grpc.ClientStream
}
func (x *loadBalancerBalanceLoadClient) Send(m *LoadBalanceRequest) error {
return x.ClientStream.SendMsg(m)
}
func (x *loadBalancerBalanceLoadClient) Recv() (*LoadBalanceResponse, error) {
m := new(LoadBalanceResponse)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// Server API for LoadBalancer service
type LoadBalancerServer interface {
// Bidirectional rpc to get a list of servers.
BalanceLoad(LoadBalancer_BalanceLoadServer) error
}
func RegisterLoadBalancerServer(s *grpc.Server, srv LoadBalancerServer) {
s.RegisterService(&_LoadBalancer_serviceDesc, srv)
}
func _LoadBalancer_BalanceLoad_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(LoadBalancerServer).BalanceLoad(&loadBalancerBalanceLoadServer{stream})
}
type LoadBalancer_BalanceLoadServer interface {
Send(*LoadBalanceResponse) error
Recv() (*LoadBalanceRequest, error)
grpc.ServerStream
}
type loadBalancerBalanceLoadServer struct {
grpc.ServerStream
}
func (x *loadBalancerBalanceLoadServer) Send(m *LoadBalanceResponse) error {
return x.ServerStream.SendMsg(m)
}
func (x *loadBalancerBalanceLoadServer) Recv() (*LoadBalanceRequest, error) {
m := new(LoadBalanceRequest)
if err := x.ServerStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
var _LoadBalancer_serviceDesc = grpc.ServiceDesc{
ServiceName: "grpc.lb.v1.LoadBalancer",
HandlerType: (*LoadBalancerServer)(nil),
Methods: []grpc.MethodDesc{},
Streams: []grpc.StreamDesc{
{
StreamName: "BalanceLoad",
Handler: _LoadBalancer_BalanceLoad_Handler,
ServerStreams: true,
ClientStreams: true,
},
},
Metadata: "grpclb.proto",
}
func init() { proto.RegisterFile("grpclb.proto", fileDescriptor0) } func init() { proto.RegisterFile("grpclb.proto", fileDescriptor0) }
var fileDescriptor0 = []byte{ var fileDescriptor0 = []byte{
// 471 bytes of a gzipped FileDescriptorProto // 733 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x8c, 0x93, 0x51, 0x6f, 0xd3, 0x3e, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x55, 0xdd, 0x4e, 0x1b, 0x39,
0x14, 0xc5, 0x9b, 0x7f, 0xb7, 0xfd, 0xb7, 0x9b, 0xc0, 0xc6, 0xdd, 0x54, 0xda, 0x32, 0x8d, 0x2a, 0x14, 0x66, 0x36, 0xfc, 0xe5, 0x24, 0x5a, 0x58, 0x93, 0x85, 0xc0, 0xc2, 0x2e, 0x1b, 0xa9, 0x34,
0x08, 0x54, 0x90, 0x08, 0x2c, 0xbc, 0xf1, 0x84, 0x0a, 0x0f, 0x45, 0xda, 0xd3, 0xf6, 0x86, 0x90, 0xaa, 0x68, 0x68, 0x43, 0x7b, 0xd1, 0x9f, 0x9b, 0x02, 0x45, 0x41, 0xe5, 0xa2, 0x72, 0xa8, 0x7a,
0x2c, 0x27, 0xb9, 0x9a, 0x2c, 0x82, 0x6d, 0x6c, 0xaf, 0x1a, 0xdf, 0x07, 0xf1, 0x39, 0x91, 0xe3, 0x55, 0x59, 0x4e, 0xc6, 0x80, 0xc5, 0xc4, 0x9e, 0xda, 0x4e, 0x68, 0x2f, 0x7b, 0xd9, 0x47, 0xe9,
0x94, 0x64, 0x54, 0x15, 0xbc, 0xc5, 0xbe, 0x3e, 0xf7, 0x1e, 0xff, 0x7c, 0x02, 0xc9, 0xb5, 0xd1, 0x63, 0x54, 0x7d, 0x86, 0xbe, 0x4f, 0x65, 0x7b, 0x26, 0x33, 0x90, 0x1f, 0xd4, 0xbb, 0xf1, 0xf1,
0x65, 0x5d, 0x64, 0xda, 0x28, 0xa7, 0x10, 0xfc, 0x2a, 0xab, 0x8b, 0x6c, 0x75, 0x9e, 0xbe, 0x80, 0x77, 0xbe, 0xf3, 0xf9, 0xd8, 0xdf, 0x19, 0x28, 0x5f, 0xa8, 0xb8, 0x1b, 0x75, 0x1a, 0xb1, 0x92,
0xfd, 0x0f, 0x37, 0x86, 0x3b, 0xa1, 0x24, 0x1e, 0xc2, 0xff, 0x96, 0x4a, 0x25, 0x2b, 0x3b, 0x8e, 0x46, 0x22, 0xb0, 0xab, 0x46, 0xd4, 0x69, 0x0c, 0x1e, 0xd7, 0x9e, 0xc3, 0xe2, 0x51, 0x5f, 0x51,
0x66, 0xd1, 0x7c, 0x88, 0xf7, 0x60, 0x57, 0x72, 0xa9, 0xec, 0xf8, 0xbf, 0x59, 0x34, 0xdf, 0x4d, 0xc3, 0xa5, 0x40, 0x55, 0x58, 0xd0, 0xac, 0x2b, 0x45, 0xa8, 0xab, 0xc1, 0x76, 0x50, 0x2f, 0xe0,
0x7f, 0x44, 0x80, 0x17, 0x8a, 0x57, 0x0b, 0x5e, 0x73, 0x59, 0xd2, 0x25, 0x7d, 0xbb, 0x21, 0xeb, 0x74, 0x89, 0x2a, 0x30, 0x27, 0xa8, 0x90, 0xba, 0xfa, 0xc7, 0x76, 0x50, 0x9f, 0xc3, 0x7e, 0x51,
0xf0, 0x1d, 0x1c, 0x0a, 0x29, 0x9c, 0xe0, 0x35, 0x33, 0x61, 0xab, 0x91, 0xc7, 0xf9, 0xd3, 0xac, 0x7b, 0x01, 0xc5, 0x33, 0xde, 0x63, 0xda, 0xd0, 0x5e, 0xfc, 0xdb, 0xc9, 0xdf, 0x03, 0x40, 0xa7,
0x1b, 0x94, 0x7d, 0x0c, 0x47, 0x36, 0xf5, 0xcb, 0x01, 0xbe, 0x82, 0xa4, 0xac, 0x05, 0x49, 0xc7, 0x92, 0x86, 0x07, 0x34, 0xa2, 0xa2, 0xcb, 0x30, 0xfb, 0xd8, 0x67, 0xda, 0xa0, 0xb7, 0xb0, 0xc4,
0xac, 0xe3, 0x2e, 0x8c, 0x8b, 0xf3, 0x87, 0x7d, 0xf9, 0xfb, 0xa6, 0x7e, 0xe5, 0xcb, 0xcb, 0xc1, 0x05, 0x37, 0x9c, 0x46, 0x44, 0xf9, 0x90, 0xa3, 0x2b, 0x35, 0xef, 0x35, 0x32, 0xd5, 0x8d, 0x13,
0xe2, 0x11, 0x4c, 0x6a, 0xc5, 0x2b, 0x56, 0x84, 0x4e, 0xeb, 0xb9, 0xcc, 0x7d, 0xd7, 0x94, 0x3e, 0x0f, 0x19, 0xcd, 0x6f, 0xcd, 0xe0, 0x3f, 0x93, 0xfc, 0x94, 0xf1, 0x25, 0x94, 0xbb, 0x11, 0x67,
0x87, 0xc9, 0xd6, 0x61, 0x98, 0xc0, 0x8e, 0xe4, 0x5f, 0xa9, 0x71, 0x78, 0x90, 0x7e, 0x82, 0xb8, 0xc2, 0x10, 0x6d, 0xa8, 0xf1, 0x2a, 0x4a, 0xcd, 0xb5, 0x3c, 0xdd, 0xa1, 0xdb, 0x6f, 0xdb, 0xed,
0xd7, 0x18, 0x47, 0x70, 0xdf, 0x29, 0xd7, 0xdd, 0x63, 0xcd, 0x61, 0x02, 0x0f, 0x5a, 0x7f, 0x46, 0xd6, 0x0c, 0x2e, 0x75, 0xb3, 0xe5, 0xc1, 0x3f, 0xb0, 0x1e, 0x49, 0x1a, 0x92, 0x8e, 0x2f, 0x93,
0x97, 0x8c, 0x8c, 0x51, 0x26, 0x98, 0x1c, 0xe2, 0x18, 0x8e, 0x2a, 0xa3, 0xb4, 0xa6, 0xaa, 0x13, 0x8a, 0x22, 0xe6, 0x73, 0xcc, 0x6a, 0x7b, 0xb0, 0x3e, 0x51, 0x09, 0x42, 0x30, 0x2b, 0x68, 0x8f,
0x0d, 0x7d, 0x25, 0xfd, 0x19, 0xc1, 0xf1, 0x1d, 0x03, 0x56, 0x2b, 0x69, 0x09, 0x17, 0x70, 0xd4, 0x39, 0xf9, 0x45, 0xec, 0xbe, 0x6b, 0x5f, 0x67, 0xa1, 0x94, 0x2b, 0x86, 0xf6, 0xa1, 0x68, 0xd2,
0xe1, 0x0a, 0x7b, 0x2d, 0xaf, 0x67, 0x7f, 0xe3, 0x15, 0x4e, 0x2f, 0x07, 0xf8, 0x12, 0x62, 0x4b, 0x0e, 0x26, 0xe7, 0xfc, 0x3b, 0x2f, 0x6c, 0xd8, 0x5e, 0x9c, 0xe1, 0xd0, 0x03, 0xf8, 0x4b, 0xf4,
0x66, 0x45, 0x86, 0xd5, 0xc2, 0xba, 0x96, 0xd7, 0xa8, 0x2f, 0xbf, 0x6a, 0xca, 0x17, 0xc2, 0xf3, 0x7b, 0xa4, 0x4b, 0xa3, 0x48, 0xdb, 0x33, 0x29, 0xc3, 0x42, 0x77, 0xaa, 0x02, 0x5e, 0x12, 0xfd,
0x5d, 0x9c, 0xc2, 0xf4, 0x0f, 0x5c, 0xa1, 0x53, 0xe0, 0x75, 0x0b, 0xd3, 0xed, 0xc3, 0xf0, 0x0c, 0xde, 0xa1, 0x8d, 0xb7, 0x7d, 0x18, 0xed, 0x02, 0xca, 0xb0, 0xe7, 0x5c, 0x70, 0x7d, 0xc9, 0xc2,
0x46, 0x7d, 0xad, 0x61, 0x15, 0xd5, 0x74, 0xcd, 0x5d, 0x8b, 0x10, 0xdf, 0xc2, 0x69, 0xff, 0xed, 0x6a, 0xc1, 0x81, 0x97, 0x53, 0xf0, 0x71, 0x12, 0x47, 0x04, 0x1a, 0xa3, 0x68, 0x72, 0xcd, 0xcd,
0x98, 0x21, 0xad, 0x8c, 0x63, 0x42, 0x3a, 0x32, 0x2b, 0x5e, 0x37, 0x30, 0xe2, 0xfc, 0xa4, 0xef, 0x25, 0x09, 0x95, 0x8c, 0xc9, 0xb9, 0x54, 0x44, 0x51, 0xc3, 0x48, 0xc4, 0x7b, 0xdc, 0x70, 0x71,
0x6d, 0x1d, 0xb8, 0xb4, 0x02, 0xe8, 0x7c, 0xe2, 0x13, 0x1f, 0x3f, 0xbf, 0xf2, 0xd8, 0x87, 0xf3, 0x51, 0x9d, 0x75, 0x4c, 0xf7, 0x6f, 0x33, 0xbd, 0xe7, 0xe6, 0xf2, 0x48, 0xc9, 0xf8, 0x58, 0x2a,
0x38, 0xc7, 0xcd, 0x0b, 0xe1, 0x39, 0x1c, 0xd3, 0xad, 0x16, 0xa1, 0xc1, 0xbf, 0x4d, 0xf9, 0x0c, 0x4c, 0x0d, 0x3b, 0x4d, 0xe0, 0x88, 0xc2, 0xde, 0x9d, 0x05, 0x72, 0xed, 0xb6, 0x15, 0xe6, 0x5c,
0x7b, 0xad, 0x18, 0x01, 0x84, 0x66, 0xbc, 0xaa, 0x0c, 0xd9, 0xf0, 0xb6, 0x89, 0x0f, 0x84, 0x37, 0x85, 0xfa, 0x94, 0x0a, 0x59, 0xef, 0x6d, 0x89, 0x0f, 0xf0, 0x70, 0x52, 0x89, 0xe4, 0x19, 0x9c,
0x1c, 0x22, 0x8e, 0x53, 0xc0, 0x3b, 0xa4, 0x9c, 0xfa, 0x42, 0xb2, 0xe9, 0x7e, 0x80, 0x27, 0x90, 0x53, 0x1e, 0xb1, 0x90, 0x18, 0x49, 0x34, 0x13, 0x61, 0x75, 0xde, 0x15, 0xd8, 0x19, 0x57, 0xc0,
0xf8, 0xa7, 0xfe, 0x1d, 0xf2, 0x9d, 0x59, 0x34, 0xdf, 0xcf, 0x0b, 0x48, 0x7a, 0xd8, 0x0c, 0x5e, 0x5f, 0xd5, 0xb1, 0xc3, 0x9f, 0xc9, 0x36, 0x13, 0x21, 0x6a, 0xc1, 0xff, 0x63, 0xe8, 0xaf, 0x84,
0x42, 0xdc, 0x7e, 0xfb, 0x6d, 0x3c, 0xeb, 0x5b, 0xda, 0xcc, 0xe3, 0xf4, 0xf1, 0xd6, 0x7a, 0xe0, 0xbc, 0x16, 0x44, 0xb1, 0x2e, 0xe3, 0x03, 0x16, 0x56, 0x17, 0x1c, 0xe5, 0xd6, 0x6d, 0xca, 0x37,
0x3f, 0x8f, 0x5e, 0x47, 0xc5, 0x5e, 0xf3, 0xdf, 0xbe, 0xf9, 0x15, 0x00, 0x00, 0xff, 0xff, 0x01, 0x16, 0x85, 0x13, 0x50, 0xed, 0x47, 0x00, 0x2b, 0x37, 0x9e, 0x8d, 0x8e, 0xa5, 0xd0, 0x0c, 0xb5,
0x8b, 0xc9, 0x26, 0xc7, 0x03, 0x00, 0x00, 0x61, 0x39, 0x73, 0x80, 0x8f, 0x25, 0x4f, 0x63, 0xe7, 0x2e, 0x0b, 0x78, 0x74, 0x6b, 0x06, 0x2f,
0x0d, 0x3d, 0x90, 0x90, 0x3e, 0x83, 0x92, 0x66, 0x6a, 0xc0, 0x14, 0x89, 0xb8, 0x36, 0x89, 0x07,
0x56, 0xf3, 0x7c, 0x6d, 0xb7, 0x7d, 0xca, 0x9d, 0x87, 0x40, 0x0f, 0x57, 0x07, 0x9b, 0xb0, 0x71,
0xcb, 0x01, 0x9e, 0xd3, 0x5b, 0xe0, 0x5b, 0x00, 0x1b, 0x93, 0xa5, 0xa0, 0x27, 0xb0, 0x9a, 0x4f,
0x56, 0x24, 0x64, 0x11, 0xbb, 0xa0, 0x26, 0xb5, 0x45, 0x25, 0xca, 0x92, 0xd4, 0x51, 0xb2, 0x87,
0xde, 0xc1, 0x66, 0xde, 0xb2, 0x44, 0xb1, 0x58, 0x2a, 0x43, 0xb8, 0x30, 0x4c, 0x0d, 0x68, 0x94,
0xc8, 0xaf, 0xe4, 0xe5, 0xa7, 0x43, 0x0c, 0xaf, 0xe7, 0xdc, 0x8b, 0x5d, 0xde, 0x49, 0x92, 0x56,
0xfb, 0x12, 0x00, 0x64, 0xc7, 0x44, 0xbb, 0x76, 0x62, 0xd9, 0x95, 0x9d, 0x58, 0x85, 0x7a, 0xa9,
0x89, 0x46, 0xfb, 0x81, 0x53, 0x08, 0x7a, 0x0d, 0x2b, 0xec, 0x53, 0xcc, 0x7d, 0x95, 0x4c, 0x4a,
0x61, 0x8a, 0x14, 0x94, 0x25, 0x0c, 0x35, 0xfc, 0x0c, 0x60, 0xde, 0x53, 0xa3, 0x2d, 0x00, 0x1e,
0x13, 0x1a, 0x86, 0x8a, 0x69, 0x3f, 0x34, 0xcb, 0xb8, 0xc8, 0xe3, 0x57, 0x3e, 0x60, 0xe7, 0x87,
0x55, 0x9f, 0x4c, 0x4d, 0xf7, 0x6d, 0xed, 0x7c, 0xe3, 0x2e, 0x8c, 0xbc, 0x62, 0xc2, 0x69, 0x28,
0xe2, 0xe5, 0x5c, 0x2b, 0xcf, 0x6c, 0x1c, 0xed, 0xc3, 0xea, 0x14, 0xdb, 0x2e, 0xe2, 0x95, 0x70,
0x8c, 0x45, 0x9f, 0xc2, 0xda, 0x34, 0x2b, 0x2e, 0xe2, 0x4a, 0x38, 0xc6, 0x76, 0xcd, 0x0e, 0x94,
0x73, 0xf7, 0xaf, 0x10, 0x86, 0x52, 0xf2, 0x6d, 0xc3, 0xe8, 0xdf, 0x7c, 0x83, 0x46, 0x87, 0xe5,
0xc6, 0x7f, 0x13, 0xf7, 0xfd, 0x43, 0xaa, 0x07, 0x8f, 0x82, 0xce, 0xbc, 0xfb, 0x7d, 0xed, 0xff,
0x0a, 0x00, 0x00, 0xff, 0xff, 0x64, 0xbf, 0xda, 0x5e, 0xce, 0x06, 0x00, 0x00,
} }

View File

@ -1,3 +1,32 @@
// Copyright 2016, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
syntax = "proto3"; syntax = "proto3";
package grpc.lb.v1; package grpc.lb.v1;
@ -16,6 +45,20 @@ message Duration {
int32 nanos = 2; int32 nanos = 2;
} }
message Timestamp {
// Represents seconds of UTC time since Unix epoch
// 1970-01-01T00:00:00Z. Must be from 0001-01-01T00:00:00Z to
// 9999-12-31T23:59:59Z inclusive.
int64 seconds = 1;
// Non-negative fractions of a second at nanosecond resolution. Negative
// second values with fractions must still have non-negative nanos values
// that count forward in time. Must be from 0 to 999,999,999
// inclusive.
int32 nanos = 2;
}
service LoadBalancer { service LoadBalancer {
// Bidirectional rpc to get a list of servers. // Bidirectional rpc to get a list of servers.
rpc BalanceLoad(stream LoadBalanceRequest) rpc BalanceLoad(stream LoadBalanceRequest)
@ -34,21 +77,37 @@ message LoadBalanceRequest {
} }
message InitialLoadBalanceRequest { message InitialLoadBalanceRequest {
// Name of load balanced service (IE, service.grpc.gslb.google.com) // Name of load balanced service (IE, balancer.service.com)
// length should be less than 256 bytes.
string name = 1; string name = 1;
} }
// Contains client level statistics that are useful to load balancing. Each // Contains client level statistics that are useful to load balancing. Each
// count should be reset to zero after reporting the stats. // count except the timestamp should be reset to zero after reporting the stats.
message ClientStats { message ClientStats {
// The total number of requests sent by the client since the last report. // The timestamp of generating the report.
int64 total_requests = 1; Timestamp timestamp = 1;
// The number of client rpc errors since the last report. // The total number of RPCs that started.
int64 client_rpc_errors = 2; int64 num_calls_started = 2;
// The number of dropped requests since the last report. // The total number of RPCs that finished.
int64 dropped_requests = 3; int64 num_calls_finished = 3;
// The total number of RPCs that were dropped by the client because of rate
// limiting.
int64 num_calls_finished_with_drop_for_rate_limiting = 4;
// The total number of RPCs that were dropped by the client because of load
// balancing.
int64 num_calls_finished_with_drop_for_load_balancing = 5;
// The total number of RPCs that failed to reach a server except dropped RPCs.
int64 num_calls_finished_with_client_failed_to_send = 6;
// The total number of RPCs that finished and are known to have been received
// by a server.
int64 num_calls_finished_known_received = 7;
} }
message LoadBalanceResponse { message LoadBalanceResponse {
@ -66,13 +125,14 @@ message InitialLoadBalanceResponse {
// This is an application layer redirect that indicates the client should use // This is an application layer redirect that indicates the client should use
// the specified server for load balancing. When this field is non-empty in // the specified server for load balancing. When this field is non-empty in
// the response, the client should open a separate connection to the // the response, the client should open a separate connection to the
// load_balancer_delegate and call the BalanceLoad method. // load_balancer_delegate and call the BalanceLoad method. Its length should
// be less than 64 bytes.
string load_balancer_delegate = 1; string load_balancer_delegate = 1;
// This interval defines how often the client should send the client stats // This interval defines how often the client should send the client stats
// to the load balancer. Stats should only be reported when the duration is // to the load balancer. Stats should only be reported when the duration is
// positive. // positive.
Duration client_stats_report_interval = 3; Duration client_stats_report_interval = 2;
} }
message ServerList { message ServerList {
@ -89,6 +149,10 @@ message ServerList {
Duration expiration_interval = 3; Duration expiration_interval = 3;
} }
// Contains server information. When none of the [drop_for_*] fields are true,
// use the other fields. When drop_for_rate_limiting is true, ignore all other
// fields. Use drop_for_load_balancing only when it is true and
// drop_for_rate_limiting is false.
message Server { message Server {
// A resolved address for the server, serialized in network-byte-order. It may // A resolved address for the server, serialized in network-byte-order. It may
// either be an IPv4 or IPv6 address. // either be an IPv4 or IPv6 address.
@ -101,9 +165,15 @@ message Server {
// frontend requests for that pick must include the token in its initial // frontend requests for that pick must include the token in its initial
// metadata. The token is used by the backend to verify the request and to // metadata. The token is used by the backend to verify the request and to
// allow the backend to report load to the gRPC LB system. // allow the backend to report load to the gRPC LB system.
//
// Its length is variable but less than 50 bytes.
string load_balance_token = 3; string load_balance_token = 3;
// Indicates whether this particular request should be dropped by the client // Indicates whether this particular request should be dropped by the client
// when this server is chosen from the list. // for rate limiting.
bool drop_request = 4; bool drop_for_rate_limiting = 4;
// Indicates whether this particular request should be dropped by the client
// for load balancing.
bool drop_for_load_balancing = 5;
} }

View File

@ -0,0 +1,54 @@
// This file contains the generated server side code.
// It's only used for grpclb testing.
package grpclb
import (
"google.golang.org/grpc"
lbpb "google.golang.org/grpc/grpclb/grpc_lb_v1"
)
// Server API for LoadBalancer service
type loadBalancerServer interface {
// Bidirectional rpc to get a list of servers.
BalanceLoad(*loadBalancerBalanceLoadServer) error
}
func registerLoadBalancerServer(s *grpc.Server, srv loadBalancerServer) {
s.RegisterService(
&grpc.ServiceDesc{
ServiceName: "grpc.lb.v1.LoadBalancer",
HandlerType: (*loadBalancerServer)(nil),
Methods: []grpc.MethodDesc{},
Streams: []grpc.StreamDesc{
{
StreamName: "BalanceLoad",
Handler: balanceLoadHandler,
ServerStreams: true,
ClientStreams: true,
},
},
Metadata: "grpclb.proto",
}, srv)
}
func balanceLoadHandler(srv interface{}, stream grpc.ServerStream) error {
return srv.(loadBalancerServer).BalanceLoad(&loadBalancerBalanceLoadServer{stream})
}
type loadBalancerBalanceLoadServer struct {
grpc.ServerStream
}
func (x *loadBalancerBalanceLoadServer) Send(m *lbpb.LoadBalanceResponse) error {
return x.ServerStream.SendMsg(m)
}
func (x *loadBalancerBalanceLoadServer) Recv() (*lbpb.LoadBalanceRequest, error) {
m := new(lbpb.LoadBalanceRequest)
if err := x.ServerStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}

View File

@ -114,8 +114,8 @@ func (r *testNameResolver) Resolve(target string) (naming.Watcher, error) {
r.w.update <- &naming.Update{ r.w.update <- &naming.Update{
Op: naming.Add, Op: naming.Add,
Addr: addr, Addr: addr,
Metadata: &Metadata{ Metadata: &grpc.AddrMetadataGRPCLB{
AddrType: GRPCLB, AddrType: grpc.GRPCLB,
ServerName: lbsn, ServerName: lbsn,
}, },
} }
@ -187,7 +187,7 @@ func (b *remoteBalancer) stop() {
close(b.done) close(b.done)
} }
func (b *remoteBalancer) BalanceLoad(stream lbpb.LoadBalancer_BalanceLoadServer) error { func (b *remoteBalancer) BalanceLoad(stream *loadBalancerBalanceLoadServer) error {
req, err := stream.Recv() req, err := stream.Recv()
if err != nil { if err != nil {
return err return err
@ -305,7 +305,7 @@ func newLoadBalancer(numberOfBackends int) (tss *testServers, cleanup func(), er
return return
} }
ls = newRemoteBalancer(nil, nil) ls = newRemoteBalancer(nil, nil)
lbpb.RegisterLoadBalancerServer(lb, ls) registerLoadBalancerServer(lb, ls)
go func() { go func() {
lb.Serve(lbLis) lb.Serve(lbLis)
}() }()
@ -350,7 +350,7 @@ func TestGRPCLB(t *testing.T) {
expected: besn, expected: besn,
} }
ctx, _ := context.WithTimeout(context.Background(), 10*time.Second) ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
cc, err := grpc.DialContext(ctx, besn, grpc.WithBalancer(Balancer(&testNameResolver{ cc, err := grpc.DialContext(ctx, besn, grpc.WithBalancer(grpc.NewGRPCLBBalancer(&testNameResolver{
addrs: []string{tss.lbAddr}, addrs: []string{tss.lbAddr},
})), grpc.WithBlock(), grpc.WithTransportCredentials(&creds)) })), grpc.WithBlock(), grpc.WithTransportCredentials(&creds))
if err != nil { if err != nil {
@ -371,15 +371,15 @@ func TestDropRequest(t *testing.T) {
defer cleanup() defer cleanup()
tss.ls.sls = []*lbpb.ServerList{{ tss.ls.sls = []*lbpb.ServerList{{
Servers: []*lbpb.Server{{ Servers: []*lbpb.Server{{
IpAddress: tss.beIPs[0], IpAddress: tss.beIPs[0],
Port: int32(tss.bePorts[0]), Port: int32(tss.bePorts[0]),
LoadBalanceToken: lbToken, LoadBalanceToken: lbToken,
DropRequest: true, DropForLoadBalancing: true,
}, { }, {
IpAddress: tss.beIPs[1], IpAddress: tss.beIPs[1],
Port: int32(tss.bePorts[1]), Port: int32(tss.bePorts[1]),
LoadBalanceToken: lbToken, LoadBalanceToken: lbToken,
DropRequest: false, DropForLoadBalancing: false,
}}, }},
}} }}
tss.ls.intervals = []time.Duration{0} tss.ls.intervals = []time.Duration{0}
@ -387,7 +387,7 @@ func TestDropRequest(t *testing.T) {
expected: besn, expected: besn,
} }
ctx, _ := context.WithTimeout(context.Background(), 10*time.Second) ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
cc, err := grpc.DialContext(ctx, besn, grpc.WithBalancer(Balancer(&testNameResolver{ cc, err := grpc.DialContext(ctx, besn, grpc.WithBalancer(grpc.NewGRPCLBBalancer(&testNameResolver{
addrs: []string{tss.lbAddr}, addrs: []string{tss.lbAddr},
})), grpc.WithBlock(), grpc.WithTransportCredentials(&creds)) })), grpc.WithBlock(), grpc.WithTransportCredentials(&creds))
if err != nil { if err != nil {
@ -395,12 +395,12 @@ func TestDropRequest(t *testing.T) {
} }
helloC := hwpb.NewGreeterClient(cc) helloC := hwpb.NewGreeterClient(cc)
// The 1st, non-fail-fast RPC should succeed. This ensures both server // The 1st, non-fail-fast RPC should succeed. This ensures both server
// connections are made, because the first one has DropRequest set to true. // connections are made, because the first one has DropForLoadBalancing set to true.
if _, err := helloC.SayHello(context.Background(), &hwpb.HelloRequest{Name: "grpc"}, grpc.FailFast(false)); err != nil { if _, err := helloC.SayHello(context.Background(), &hwpb.HelloRequest{Name: "grpc"}, grpc.FailFast(false)); err != nil {
t.Fatalf("%v.SayHello(_, _) = _, %v, want _, <nil>", helloC, err) t.Fatalf("%v.SayHello(_, _) = _, %v, want _, <nil>", helloC, err)
} }
for i := 0; i < 3; i++ { for i := 0; i < 3; i++ {
// Odd fail-fast RPCs should fail, because the 1st backend has DropRequest // Odd fail-fast RPCs should fail, because the 1st backend has DropForLoadBalancing
// set to true. // set to true.
if _, err := helloC.SayHello(context.Background(), &hwpb.HelloRequest{Name: "grpc"}); grpc.Code(err) != codes.Unavailable { if _, err := helloC.SayHello(context.Background(), &hwpb.HelloRequest{Name: "grpc"}); grpc.Code(err) != codes.Unavailable {
t.Fatalf("%v.SayHello(_, _) = _, %v, want _, %s", helloC, err, codes.Unavailable) t.Fatalf("%v.SayHello(_, _) = _, %v, want _, %s", helloC, err, codes.Unavailable)
@ -421,10 +421,10 @@ func TestDropRequestFailedNonFailFast(t *testing.T) {
} }
defer cleanup() defer cleanup()
be := &lbpb.Server{ be := &lbpb.Server{
IpAddress: tss.beIPs[0], IpAddress: tss.beIPs[0],
Port: int32(tss.bePorts[0]), Port: int32(tss.bePorts[0]),
LoadBalanceToken: lbToken, LoadBalanceToken: lbToken,
DropRequest: true, DropForLoadBalancing: true,
} }
var bes []*lbpb.Server var bes []*lbpb.Server
bes = append(bes, be) bes = append(bes, be)
@ -437,7 +437,7 @@ func TestDropRequestFailedNonFailFast(t *testing.T) {
expected: besn, expected: besn,
} }
ctx, _ := context.WithTimeout(context.Background(), 10*time.Second) ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
cc, err := grpc.DialContext(ctx, besn, grpc.WithBalancer(Balancer(&testNameResolver{ cc, err := grpc.DialContext(ctx, besn, grpc.WithBalancer(grpc.NewGRPCLBBalancer(&testNameResolver{
addrs: []string{tss.lbAddr}, addrs: []string{tss.lbAddr},
})), grpc.WithBlock(), grpc.WithTransportCredentials(&creds)) })), grpc.WithBlock(), grpc.WithTransportCredentials(&creds))
if err != nil { if err != nil {
@ -487,7 +487,7 @@ func TestServerExpiration(t *testing.T) {
expected: besn, expected: besn,
} }
ctx, _ := context.WithTimeout(context.Background(), 10*time.Second) ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
cc, err := grpc.DialContext(ctx, besn, grpc.WithBalancer(Balancer(&testNameResolver{ cc, err := grpc.DialContext(ctx, besn, grpc.WithBalancer(grpc.NewGRPCLBBalancer(&testNameResolver{
addrs: []string{tss.lbAddr}, addrs: []string{tss.lbAddr},
})), grpc.WithBlock(), grpc.WithTransportCredentials(&creds)) })), grpc.WithBlock(), grpc.WithTransportCredentials(&creds))
if err != nil { if err != nil {
@ -547,7 +547,7 @@ func TestBalancerDisconnects(t *testing.T) {
resolver := &testNameResolver{ resolver := &testNameResolver{
addrs: lbAddrs[:2], addrs: lbAddrs[:2],
} }
cc, err := grpc.DialContext(ctx, besn, grpc.WithBalancer(Balancer(resolver)), grpc.WithBlock(), grpc.WithTransportCredentials(&creds)) cc, err := grpc.DialContext(ctx, besn, grpc.WithBalancer(grpc.NewGRPCLBBalancer(resolver)), grpc.WithBlock(), grpc.WithTransportCredentials(&creds))
if err != nil { if err != nil {
t.Fatalf("Failed to dial to the backend %v", err) t.Fatalf("Failed to dial to the backend %v", err)
} }
@ -576,8 +576,8 @@ func TestBalancerDisconnects(t *testing.T) {
resolver.inject([]*naming.Update{ resolver.inject([]*naming.Update{
{Op: naming.Add, {Op: naming.Add,
Addr: lbAddrs[2], Addr: lbAddrs[2],
Metadata: &Metadata{ Metadata: &grpc.AddrMetadataGRPCLB{
AddrType: GRPCLB, AddrType: grpc.GRPCLB,
ServerName: lbsn, ServerName: lbsn,
}, },
}, },