i) revise picker API and channel state API; ii) add unicastNamingPicker which supports custom name resolving.
This commit is contained in:
@ -81,10 +81,11 @@ func (h *testStreamHandler) handleStream(t *testing.T, s *transport.Stream) {
|
|||||||
break
|
break
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Failed to receive the message from the client.")
|
return
|
||||||
}
|
}
|
||||||
if pf != compressionNone {
|
if pf != compressionNone {
|
||||||
t.Fatalf("Received the mistaken message format %d, want %d", pf, compressionNone)
|
t.Errorf("Received the mistaken message format %d, want %d", pf, compressionNone)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
var v string
|
var v string
|
||||||
codec := testCodec{}
|
codec := testCodec{}
|
||||||
|
105
clientconn.go
105
clientconn.go
@ -89,6 +89,12 @@ func WithCodec(c Codec) DialOption {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func WithPicker(p Picker) DialOption {
|
||||||
|
return func(o *dialOptions) {
|
||||||
|
o.picker = p
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// WithBlock returns a DialOption which makes caller of Dial blocks until the underlying
|
// WithBlock returns a DialOption which makes caller of Dial blocks until the underlying
|
||||||
// connection is up. Without this, Dial returns immediately and connecting the server
|
// connection is up. Without this, Dial returns immediately and connecting the server
|
||||||
// happens in background.
|
// happens in background.
|
||||||
@ -154,7 +160,9 @@ func Dial(target string, opts ...DialOption) (*ClientConn, error) {
|
|||||||
cc.dopts.codec = protoCodec{}
|
cc.dopts.codec = protoCodec{}
|
||||||
}
|
}
|
||||||
if cc.dopts.picker == nil {
|
if cc.dopts.picker == nil {
|
||||||
cc.dopts.picker = &unicastPicker{}
|
cc.dopts.picker = &unicastPicker{
|
||||||
|
target: target,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if err := cc.dopts.picker.Init(cc); err != nil {
|
if err := cc.dopts.picker.Init(cc); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -209,15 +217,15 @@ type ClientConn struct {
|
|||||||
|
|
||||||
// State returns the connectivity state of cc.
|
// State returns the connectivity state of cc.
|
||||||
// This is EXPERIMENTAL API.
|
// This is EXPERIMENTAL API.
|
||||||
func (cc *ClientConn) State() ConnectivityState {
|
func (cc *ClientConn) State() (ConnectivityState, error) {
|
||||||
return cc.dopts.picker.State()
|
return cc.dopts.picker.State()
|
||||||
}
|
}
|
||||||
|
|
||||||
// WaitForStateChange blocks until the state changes to something other than the sourceState
|
// WaitForStateChange blocks until the state changes to something other than the sourceState.
|
||||||
// or timeout fires on cc. It returns false if timeout fires, and true otherwise.
|
// It returns the new state or error.
|
||||||
// This is EXPERIMENTAL API.
|
// This is EXPERIMENTAL API.
|
||||||
func (cc *ClientConn) WaitForStateChange(timeout time.Duration, sourceState ConnectivityState) bool {
|
func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState ConnectivityState) (ConnectivityState, error) {
|
||||||
return cc.dopts.picker.WaitForStateChange(timeout, sourceState)
|
return cc.dopts.picker.WaitForStateChange(ctx, sourceState)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close starts to tear down the ClientConn.
|
// Close starts to tear down the ClientConn.
|
||||||
@ -229,6 +237,7 @@ func (cc *ClientConn) Close() error {
|
|||||||
type Conn struct {
|
type Conn struct {
|
||||||
target string
|
target string
|
||||||
dopts dialOptions
|
dopts dialOptions
|
||||||
|
resetChan chan int
|
||||||
shutdownChan chan struct{}
|
shutdownChan chan struct{}
|
||||||
events trace.EventLog
|
events trace.EventLog
|
||||||
|
|
||||||
@ -249,6 +258,7 @@ func NewConn(cc *ClientConn) (*Conn, error) {
|
|||||||
c := &Conn{
|
c := &Conn{
|
||||||
target: cc.target,
|
target: cc.target,
|
||||||
dopts: cc.dopts,
|
dopts: cc.dopts,
|
||||||
|
resetChan: make(chan int, 1),
|
||||||
shutdownChan: make(chan struct{}),
|
shutdownChan: make(chan struct{}),
|
||||||
}
|
}
|
||||||
if EnableTracing {
|
if EnableTracing {
|
||||||
@ -317,26 +327,20 @@ func (cc *Conn) State() ConnectivityState {
|
|||||||
return cc.state
|
return cc.state
|
||||||
}
|
}
|
||||||
|
|
||||||
// WaitForStateChange blocks until the state changes to something other than the sourceState
|
// WaitForStateChange blocks until the state changes to something other than the sourceState.
|
||||||
// or timeout fires. It returns false if timeout fires and true otherwise.
|
func (cc *Conn) WaitForStateChange(ctx context.Context, sourceState ConnectivityState) (ConnectivityState, error) {
|
||||||
// TODO(zhaoq): Rewrite for complex Picker.
|
|
||||||
func (cc *Conn) WaitForStateChange(timeout time.Duration, sourceState ConnectivityState) bool {
|
|
||||||
start := time.Now()
|
|
||||||
cc.mu.Lock()
|
cc.mu.Lock()
|
||||||
defer cc.mu.Unlock()
|
defer cc.mu.Unlock()
|
||||||
if sourceState != cc.state {
|
if sourceState != cc.state {
|
||||||
return true
|
return cc.state, nil
|
||||||
}
|
|
||||||
expired := timeout <= time.Since(start)
|
|
||||||
if expired {
|
|
||||||
return false
|
|
||||||
}
|
}
|
||||||
done := make(chan struct{})
|
done := make(chan struct{})
|
||||||
|
var err error
|
||||||
go func() {
|
go func() {
|
||||||
select {
|
select {
|
||||||
case <-time.After(timeout - time.Since(start)):
|
case <-ctx.Done():
|
||||||
cc.mu.Lock()
|
cc.mu.Lock()
|
||||||
expired = true
|
err = ctx.Err()
|
||||||
cc.stateCV.Broadcast()
|
cc.stateCV.Broadcast()
|
||||||
cc.mu.Unlock()
|
cc.mu.Unlock()
|
||||||
case <-done:
|
case <-done:
|
||||||
@ -345,11 +349,18 @@ func (cc *Conn) WaitForStateChange(timeout time.Duration, sourceState Connectivi
|
|||||||
defer close(done)
|
defer close(done)
|
||||||
for sourceState == cc.state {
|
for sourceState == cc.state {
|
||||||
cc.stateCV.Wait()
|
cc.stateCV.Wait()
|
||||||
if expired {
|
if err != nil {
|
||||||
return false
|
return cc.state, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return true
|
return cc.state, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cc *Conn) NotifyReset() {
|
||||||
|
select {
|
||||||
|
case cc.resetChan <- 0:
|
||||||
|
default:
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cc *Conn) resetTransport(closeTransport bool) error {
|
func (cc *Conn) resetTransport(closeTransport bool) error {
|
||||||
@ -391,7 +402,11 @@ func (cc *Conn) resetTransport(closeTransport bool) error {
|
|||||||
copts.Timeout = timeout
|
copts.Timeout = timeout
|
||||||
}
|
}
|
||||||
connectTime := time.Now()
|
connectTime := time.Now()
|
||||||
newTransport, err := transport.NewClientTransport(cc.target, &copts)
|
addr, err := cc.dopts.picker.PickAddr()
|
||||||
|
var newTransport transport.ClientTransport
|
||||||
|
if err == nil {
|
||||||
|
newTransport, err = transport.NewClientTransport(addr, &copts)
|
||||||
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cc.mu.Lock()
|
cc.mu.Lock()
|
||||||
if cc.state == Shutdown {
|
if cc.state == Shutdown {
|
||||||
@ -422,7 +437,7 @@ func (cc *Conn) resetTransport(closeTransport bool) error {
|
|||||||
closeTransport = false
|
closeTransport = false
|
||||||
time.Sleep(sleepTime)
|
time.Sleep(sleepTime)
|
||||||
retries++
|
retries++
|
||||||
grpclog.Printf("grpc: ClientConn.resetTransport failed to create client transport: %v; Reconnecting to %q", err, cc.target)
|
grpclog.Printf("grpc: Conn.resetTransport failed to create client transport: %v; Reconnecting to %q", err, cc.target)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
cc.mu.Lock()
|
cc.mu.Lock()
|
||||||
@ -445,6 +460,27 @@ func (cc *Conn) resetTransport(closeTransport bool) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (cc *Conn) reconnect() bool {
|
||||||
|
cc.mu.Lock()
|
||||||
|
if cc.state == Shutdown {
|
||||||
|
// cc.Close() has been invoked.
|
||||||
|
cc.mu.Unlock()
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
cc.state = TransientFailure
|
||||||
|
cc.stateCV.Broadcast()
|
||||||
|
cc.mu.Unlock()
|
||||||
|
if err := cc.resetTransport(true); err != nil {
|
||||||
|
// The ClientConn is closing.
|
||||||
|
cc.mu.Lock()
|
||||||
|
cc.printf("transport exiting: %v", err)
|
||||||
|
cc.mu.Unlock()
|
||||||
|
grpclog.Printf("grpc: Conn.transportMonitor exits due to: %v", err)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
// Run in a goroutine to track the error in transport and create the
|
// Run in a goroutine to track the error in transport and create the
|
||||||
// new transport if an error happens. It returns when the channel is closing.
|
// new transport if an error happens. It returns when the channel is closing.
|
||||||
func (cc *Conn) transportMonitor() {
|
func (cc *Conn) transportMonitor() {
|
||||||
@ -454,25 +490,18 @@ func (cc *Conn) transportMonitor() {
|
|||||||
// the ClientConn is idle (i.e., no RPC in flight).
|
// the ClientConn is idle (i.e., no RPC in flight).
|
||||||
case <-cc.shutdownChan:
|
case <-cc.shutdownChan:
|
||||||
return
|
return
|
||||||
|
case <-cc.resetChan:
|
||||||
|
if !cc.reconnect() {
|
||||||
|
return
|
||||||
|
}
|
||||||
case <-cc.transport.Error():
|
case <-cc.transport.Error():
|
||||||
cc.mu.Lock()
|
if !cc.reconnect() {
|
||||||
if cc.state == Shutdown {
|
|
||||||
// cc.Close() has been invoked.
|
|
||||||
cc.mu.Unlock()
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
cc.state = TransientFailure
|
select {
|
||||||
cc.stateCV.Broadcast()
|
case <-cc.resetChan:
|
||||||
cc.mu.Unlock()
|
default:
|
||||||
if err := cc.resetTransport(true); err != nil {
|
|
||||||
// The ClientConn is closing.
|
|
||||||
cc.mu.Lock()
|
|
||||||
cc.printf("transport exiting: %v", err)
|
|
||||||
cc.mu.Unlock()
|
|
||||||
grpclog.Printf("grpc: ClientConn.transportMonitor exits due to: %v", err)
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
continue
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
170
picker.go
170
picker.go
@ -34,9 +34,13 @@
|
|||||||
package grpc
|
package grpc
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"time"
|
"container/list"
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
|
||||||
"golang.org/x/net/context"
|
"golang.org/x/net/context"
|
||||||
|
"google.golang.org/grpc/grpclog"
|
||||||
|
"google.golang.org/grpc/naming"
|
||||||
"google.golang.org/grpc/transport"
|
"google.golang.org/grpc/transport"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -48,12 +52,14 @@ type Picker interface {
|
|||||||
// Pick blocks until either a transport.ClientTransport is ready for the upcoming RPC
|
// Pick blocks until either a transport.ClientTransport is ready for the upcoming RPC
|
||||||
// or some error happens.
|
// or some error happens.
|
||||||
Pick(ctx context.Context) (transport.ClientTransport, error)
|
Pick(ctx context.Context) (transport.ClientTransport, error)
|
||||||
|
// PickAddr picks a peer address for connecting. This will be called repeated for
|
||||||
|
// connecting/reconnecting.
|
||||||
|
PickAddr() (string, error)
|
||||||
// State returns the connectivity state of the underlying connections.
|
// State returns the connectivity state of the underlying connections.
|
||||||
State() ConnectivityState
|
State() (ConnectivityState, error)
|
||||||
// WaitForStateChange blocks until the state changes to something other than
|
// WaitForStateChange blocks until the state changes to something other than
|
||||||
// the sourceState or timeout fires on cc. It returns false if timeout fires,
|
// the sourceState. It returns the new state or error.
|
||||||
// and true otherwise.
|
WaitForStateChange(ctx context.Context, sourceState ConnectivityState) (ConnectivityState, error)
|
||||||
WaitForStateChange(timeout time.Duration, sourceState ConnectivityState) bool
|
|
||||||
// Close closes all the Conn's owned by this Picker.
|
// Close closes all the Conn's owned by this Picker.
|
||||||
Close() error
|
Close() error
|
||||||
}
|
}
|
||||||
@ -61,7 +67,8 @@ type Picker interface {
|
|||||||
// unicastPicker is the default Picker which is used when there is no custom Picker
|
// unicastPicker is the default Picker which is used when there is no custom Picker
|
||||||
// specified by users. It always picks the same Conn.
|
// specified by users. It always picks the same Conn.
|
||||||
type unicastPicker struct {
|
type unicastPicker struct {
|
||||||
conn *Conn
|
target string
|
||||||
|
conn *Conn
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *unicastPicker) Init(cc *ClientConn) error {
|
func (p *unicastPicker) Init(cc *ClientConn) error {
|
||||||
@ -77,12 +84,16 @@ func (p *unicastPicker) Pick(ctx context.Context) (transport.ClientTransport, er
|
|||||||
return p.conn.Wait(ctx)
|
return p.conn.Wait(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *unicastPicker) State() ConnectivityState {
|
func (p *unicastPicker) PickAddr() (string, error) {
|
||||||
return p.conn.State()
|
return p.target, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *unicastPicker) WaitForStateChange(timeout time.Duration, sourceState ConnectivityState) bool {
|
func (p *unicastPicker) State() (ConnectivityState, error) {
|
||||||
return p.conn.WaitForStateChange(timeout, sourceState)
|
return p.conn.State(), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *unicastPicker) WaitForStateChange(ctx context.Context, sourceState ConnectivityState) (ConnectivityState, error) {
|
||||||
|
return p.conn.WaitForStateChange(ctx, sourceState)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *unicastPicker) Close() error {
|
func (p *unicastPicker) Close() error {
|
||||||
@ -91,3 +102,142 @@ func (p *unicastPicker) Close() error {
|
|||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// unicastNamingPicker picks an address from a name resolver to set up the connection.
|
||||||
|
type unicastNamingPicker struct {
|
||||||
|
cc *ClientConn
|
||||||
|
resolver naming.Resolver
|
||||||
|
watcher naming.Watcher
|
||||||
|
mu sync.Mutex
|
||||||
|
// The list of the addresses are obtained from watcher.
|
||||||
|
addrs *list.List
|
||||||
|
// It tracks the current picked addr by PickAddr(). The next PickAddr may
|
||||||
|
// push it forward on addrs.
|
||||||
|
pickedAddr *list.Element
|
||||||
|
conn *Conn
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewUnicastNamingPicker creates a Picker to pick addresses from a name resolver
|
||||||
|
// to connect.
|
||||||
|
func NewUnicastNamingPicker(r naming.Resolver) Picker {
|
||||||
|
return &unicastNamingPicker{
|
||||||
|
resolver: r,
|
||||||
|
addrs: list.New(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type addrInfo struct {
|
||||||
|
addr string
|
||||||
|
// Set to true if this addrInfo needs to be deleted in the next PickAddrr() call.
|
||||||
|
deleting bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// processUpdates calls Watcher.Next() once and processes the obtained updates.
|
||||||
|
func (p *unicastNamingPicker) processUpdates() error {
|
||||||
|
updates, err := p.watcher.Next()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
for _, update := range updates {
|
||||||
|
switch update.Op {
|
||||||
|
case naming.Add:
|
||||||
|
p.mu.Lock()
|
||||||
|
p.addrs.PushBack(&addrInfo{
|
||||||
|
addr: update.Addr,
|
||||||
|
})
|
||||||
|
p.mu.Unlock()
|
||||||
|
// Initial connection setup
|
||||||
|
if p.conn == nil {
|
||||||
|
conn, err := NewConn(p.cc)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
p.conn = conn
|
||||||
|
}
|
||||||
|
case naming.Delete:
|
||||||
|
p.mu.Lock()
|
||||||
|
for e := p.addrs.Front(); e != nil; e = e.Next() {
|
||||||
|
if update.Addr == e.Value.(*addrInfo).addr {
|
||||||
|
if e == p.pickedAddr {
|
||||||
|
// Do not remove the element now if it is the current picked
|
||||||
|
// one. We leave the deletion to the next PickAddr() call.
|
||||||
|
e.Value.(*addrInfo).deleting = true
|
||||||
|
// Notify Conn to close it. All the live RPCs on this connection
|
||||||
|
// will be aborted.
|
||||||
|
p.conn.NotifyReset()
|
||||||
|
} else {
|
||||||
|
p.addrs.Remove(e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
p.mu.Unlock()
|
||||||
|
default:
|
||||||
|
grpclog.Println("Unknown update.Op %d", update.Op)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// monitor runs in a standalone goroutine to keep watching name resolution updates until the watcher
|
||||||
|
// is closed.
|
||||||
|
func (p *unicastNamingPicker) monitor() {
|
||||||
|
for {
|
||||||
|
if err := p.processUpdates(); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *unicastNamingPicker) Init(cc *ClientConn) error {
|
||||||
|
w, err := p.resolver.Resolve(cc.target)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
p.watcher = w
|
||||||
|
p.cc = cc
|
||||||
|
// Get the initial name resolution.
|
||||||
|
if err := p.processUpdates(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
go p.monitor()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *unicastNamingPicker) Pick(ctx context.Context) (transport.ClientTransport, error) {
|
||||||
|
return p.conn.Wait(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *unicastNamingPicker) PickAddr() (string, error) {
|
||||||
|
p.mu.Lock()
|
||||||
|
defer p.mu.Unlock()
|
||||||
|
if p.pickedAddr == nil {
|
||||||
|
p.pickedAddr = p.addrs.Front()
|
||||||
|
} else {
|
||||||
|
pa := p.pickedAddr
|
||||||
|
p.pickedAddr = pa.Next()
|
||||||
|
if pa.Value.(*addrInfo).deleting {
|
||||||
|
p.addrs.Remove(pa)
|
||||||
|
}
|
||||||
|
if p.pickedAddr == nil {
|
||||||
|
p.pickedAddr = p.addrs.Front()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if p.pickedAddr == nil {
|
||||||
|
return "", fmt.Errorf("there is no address available to pick")
|
||||||
|
}
|
||||||
|
return p.pickedAddr.Value.(*addrInfo).addr, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *unicastNamingPicker) State() (ConnectivityState, error) {
|
||||||
|
return 0, fmt.Errorf("State() is not supported for unicastNamingPicker")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *unicastNamingPicker) WaitForStateChange(ctx context.Context, sourceState ConnectivityState) (ConnectivityState, error) {
|
||||||
|
return 0, fmt.Errorf("WaitForStateChange is not supported for unicastNamingPciker")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *unicastNamingPicker) Close() error {
|
||||||
|
p.watcher.Close()
|
||||||
|
p.conn.Close()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
182
picker_test.go
Normal file
182
picker_test.go
Normal file
@ -0,0 +1,182 @@
|
|||||||
|
/*
|
||||||
|
*
|
||||||
|
* Copyright 2014, Google Inc.
|
||||||
|
* All rights reserved.
|
||||||
|
*
|
||||||
|
* Redistribution and use in source and binary forms, with or without
|
||||||
|
* modification, are permitted provided that the following conditions are
|
||||||
|
* met:
|
||||||
|
*
|
||||||
|
* * Redistributions of source code must retain the above copyright
|
||||||
|
* notice, this list of conditions and the following disclaimer.
|
||||||
|
* * Redistributions in binary form must reproduce the above
|
||||||
|
* copyright notice, this list of conditions and the following disclaimer
|
||||||
|
* in the documentation and/or other materials provided with the
|
||||||
|
* distribution.
|
||||||
|
* * Neither the name of Google Inc. nor the names of its
|
||||||
|
* contributors may be used to endorse or promote products derived from
|
||||||
|
* this software without specific prior written permission.
|
||||||
|
*
|
||||||
|
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
|
||||||
|
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
|
||||||
|
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
|
||||||
|
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
|
||||||
|
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
||||||
|
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
||||||
|
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
|
||||||
|
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
|
||||||
|
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
||||||
|
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
||||||
|
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
|
||||||
|
package grpc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"math"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"golang.org/x/net/context"
|
||||||
|
"google.golang.org/grpc/naming"
|
||||||
|
)
|
||||||
|
|
||||||
|
type testWatcher struct {
|
||||||
|
// the channel to receives name resolution updates
|
||||||
|
update chan *naming.Update
|
||||||
|
// the side channel to get to know how many updates in a batch
|
||||||
|
side chan int
|
||||||
|
// the channel to notifiy update injector that the update reading is done
|
||||||
|
readDone chan int
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *testWatcher) Next() (updates []*naming.Update, err error) {
|
||||||
|
n := <-w.side
|
||||||
|
if n == 0 {
|
||||||
|
return nil, fmt.Errorf("w.side is closed")
|
||||||
|
}
|
||||||
|
for i := 0; i < n; i++ {
|
||||||
|
u := <-w.update
|
||||||
|
if u != nil {
|
||||||
|
updates = append(updates, u)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
w.readDone <- 0
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *testWatcher) Close() {
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *testWatcher) inject(updates []*naming.Update) {
|
||||||
|
w.side <- len(updates)
|
||||||
|
for _, u := range updates {
|
||||||
|
w.update <- u
|
||||||
|
}
|
||||||
|
<-w.readDone
|
||||||
|
}
|
||||||
|
|
||||||
|
type testNameResolver struct {
|
||||||
|
w *testWatcher
|
||||||
|
addr string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *testNameResolver) Resolve(target string) (naming.Watcher, error) {
|
||||||
|
r.w = &testWatcher{
|
||||||
|
update: make(chan *naming.Update, 1),
|
||||||
|
side: make(chan int, 1),
|
||||||
|
readDone: make(chan int),
|
||||||
|
}
|
||||||
|
r.w.side <- 1
|
||||||
|
r.w.update <- &naming.Update{
|
||||||
|
Op: naming.Add,
|
||||||
|
Addr: r.addr,
|
||||||
|
}
|
||||||
|
go func() {
|
||||||
|
<-r.w.readDone
|
||||||
|
}()
|
||||||
|
return r.w, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func startServers(t *testing.T, numServers, port int, maxStreams uint32) ([]*server, *testNameResolver) {
|
||||||
|
var servers []*server
|
||||||
|
for i := 0; i < numServers; i++ {
|
||||||
|
s := &server{readyChan: make(chan bool)}
|
||||||
|
servers = append(servers, s)
|
||||||
|
go s.start(t, port, maxStreams)
|
||||||
|
s.wait(t, 2*time.Second)
|
||||||
|
}
|
||||||
|
// Point to server1
|
||||||
|
addr := "127.0.0.1:" + servers[0].port
|
||||||
|
return servers, &testNameResolver{addr: addr}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNameDiscovery(t *testing.T) {
|
||||||
|
// Start 3 servers on 3 ports.
|
||||||
|
servers, r := startServers(t, 3, 0, math.MaxUint32)
|
||||||
|
cc, err := Dial("foo.bar.com", WithPicker(NewUnicastNamingPicker(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to create ClientConn: %v", err)
|
||||||
|
}
|
||||||
|
var reply string
|
||||||
|
if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc); err != nil || reply != expectedResponse {
|
||||||
|
t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want <nil>", err)
|
||||||
|
}
|
||||||
|
// Inject name resolution change to point to the second server now.
|
||||||
|
var updates []*naming.Update
|
||||||
|
updates = append(updates, &naming.Update{
|
||||||
|
Op: naming.Delete,
|
||||||
|
Addr: "127.0.0.1:" + servers[0].port,
|
||||||
|
})
|
||||||
|
updates = append(updates, &naming.Update{
|
||||||
|
Op: naming.Add,
|
||||||
|
Addr: "127.0.0.1:" + servers[1].port,
|
||||||
|
})
|
||||||
|
r.w.inject(updates)
|
||||||
|
servers[0].stop()
|
||||||
|
if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc); err != nil || reply != expectedResponse {
|
||||||
|
t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want <nil>", err)
|
||||||
|
}
|
||||||
|
// Add another server address (server#3) to name resolution
|
||||||
|
updates = nil
|
||||||
|
updates = append(updates, &naming.Update{
|
||||||
|
Op: naming.Add,
|
||||||
|
Addr: "127.0.0.1:" + servers[2].port,
|
||||||
|
})
|
||||||
|
r.w.inject(updates)
|
||||||
|
// Stop server#2. The library should direct to server#3 automatically.
|
||||||
|
servers[1].stop()
|
||||||
|
if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc); err != nil || reply != expectedResponse {
|
||||||
|
t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want <nil>", err)
|
||||||
|
}
|
||||||
|
cc.Close()
|
||||||
|
servers[2].stop()
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestEmptyAddrs(t *testing.T) {
|
||||||
|
servers, r := startServers(t, 1, 0, math.MaxUint32)
|
||||||
|
cc, err := Dial("foo.bar.com", WithPicker(NewUnicastNamingPicker(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to create ClientConn: %v", err)
|
||||||
|
}
|
||||||
|
var reply string
|
||||||
|
if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc); err != nil || reply != expectedResponse {
|
||||||
|
t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want <nil>", err)
|
||||||
|
}
|
||||||
|
// Inject name resolution change to remove the server address so that there is no address
|
||||||
|
// available after that.
|
||||||
|
var updates []*naming.Update
|
||||||
|
updates = append(updates, &naming.Update{
|
||||||
|
Op: naming.Delete,
|
||||||
|
Addr: "127.0.0.1:" + servers[0].port,
|
||||||
|
})
|
||||||
|
r.w.inject(updates)
|
||||||
|
ctx, _ := context.WithTimeout(context.Background(), 10*time.Millisecond)
|
||||||
|
if err := Invoke(ctx, "/foo/bar", &expectedRequest, &reply, cc); err == nil {
|
||||||
|
t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want non-<nil>", err)
|
||||||
|
}
|
||||||
|
cc.Close()
|
||||||
|
servers[0].stop()
|
||||||
|
}
|
@ -388,32 +388,35 @@ func TestTimeoutOnDeadServer(t *testing.T) {
|
|||||||
func testTimeoutOnDeadServer(t *testing.T, e env) {
|
func testTimeoutOnDeadServer(t *testing.T, e env) {
|
||||||
s, cc := setUp(t, nil, math.MaxUint32, "", e)
|
s, cc := setUp(t, nil, math.MaxUint32, "", e)
|
||||||
tc := testpb.NewTestServiceClient(cc)
|
tc := testpb.NewTestServiceClient(cc)
|
||||||
if ok := cc.WaitForStateChange(time.Second, grpc.Idle); !ok {
|
ctx, _ := context.WithTimeout(context.Background(), time.Second)
|
||||||
t.Fatalf("cc.WaitForStateChange(_, %s) = %t, want true", grpc.Idle, ok)
|
if _, err := cc.WaitForStateChange(ctx, grpc.Idle); err != nil {
|
||||||
|
t.Fatalf("cc.WaitForStateChange(_, %s) = _, %v, want _, <nil>", grpc.Idle, err)
|
||||||
}
|
}
|
||||||
if ok := cc.WaitForStateChange(time.Second, grpc.Connecting); !ok {
|
ctx, _ = context.WithTimeout(context.Background(), time.Second)
|
||||||
t.Fatalf("cc.WaitForStateChange(_, %s) = %t, want true", grpc.Connecting, ok)
|
if _, err := cc.WaitForStateChange(ctx, grpc.Connecting); err != nil {
|
||||||
|
t.Fatalf("cc.WaitForStateChange(_, %s) = _, %v, want _, <nil>", grpc.Connecting, err)
|
||||||
}
|
}
|
||||||
if cc.State() != grpc.Ready {
|
if state, err := cc.State(); err != nil || state != grpc.Ready {
|
||||||
t.Fatalf("cc.State() = %s, want %s", cc.State(), grpc.Ready)
|
t.Fatalf("cc.State() = %s, %v, want %s, <nil>", state, err, grpc.Ready)
|
||||||
}
|
}
|
||||||
if ok := cc.WaitForStateChange(time.Millisecond, grpc.Ready); ok {
|
ctx, _ = context.WithTimeout(context.Background(), time.Second)
|
||||||
t.Fatalf("cc.WaitForStateChange(_, %s) = %t, want false", grpc.Ready, ok)
|
if _, err := cc.WaitForStateChange(ctx, grpc.Ready); err != context.DeadlineExceeded {
|
||||||
|
t.Fatalf("cc.WaitForStateChange(_, %s) = _, %v, want _, %v", grpc.Ready, err, context.DeadlineExceeded)
|
||||||
}
|
}
|
||||||
s.Stop()
|
s.Stop()
|
||||||
// Set -1 as the timeout to make sure if transportMonitor gets error
|
// Set -1 as the timeout to make sure if transportMonitor gets error
|
||||||
// notification in time the failure path of the 1st invoke of
|
// notification in time the failure path of the 1st invoke of
|
||||||
// ClientConn.wait hits the deadline exceeded error.
|
// ClientConn.wait hits the deadline exceeded error.
|
||||||
ctx, _ := context.WithTimeout(context.Background(), -1)
|
ctx, _ = context.WithTimeout(context.Background(), -1)
|
||||||
if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); grpc.Code(err) != codes.DeadlineExceeded {
|
if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); grpc.Code(err) != codes.DeadlineExceeded {
|
||||||
t.Fatalf("TestService/EmptyCall(%v, _) = _, error %v, want _, error code: %d", ctx, err, codes.DeadlineExceeded)
|
t.Fatalf("TestService/EmptyCall(%v, _) = _, error %v, want _, error code: %d", ctx, err, codes.DeadlineExceeded)
|
||||||
}
|
}
|
||||||
if ok := cc.WaitForStateChange(time.Second, grpc.Ready); !ok {
|
ctx, _ = context.WithTimeout(context.Background(), time.Second)
|
||||||
t.Fatalf("cc.WaitForStateChange(_, %s) = %t, want true", grpc.Ready, ok)
|
if _, err := cc.WaitForStateChange(ctx, grpc.Ready); err != nil {
|
||||||
|
t.Fatalf("cc.WaitForStateChange(_, %s) = _, %v, want _, <nil>", grpc.Ready, err)
|
||||||
}
|
}
|
||||||
state := cc.State()
|
if state, err := cc.State(); err != nil || (state != grpc.Connecting && state != grpc.TransientFailure) {
|
||||||
if state != grpc.Connecting && state != grpc.TransientFailure {
|
t.Fatalf("cc.State() = %s, %v, want %s or %s, <nil>", state, err, grpc.Connecting, grpc.TransientFailure)
|
||||||
t.Fatalf("cc.State() = %s, want %s or %s", state, grpc.Connecting, grpc.TransientFailure)
|
|
||||||
}
|
}
|
||||||
cc.Close()
|
cc.Close()
|
||||||
}
|
}
|
||||||
@ -521,17 +524,20 @@ func TestEmptyUnaryWithUserAgent(t *testing.T) {
|
|||||||
func testEmptyUnaryWithUserAgent(t *testing.T, e env) {
|
func testEmptyUnaryWithUserAgent(t *testing.T, e env) {
|
||||||
s, cc := setUp(t, nil, math.MaxUint32, testAppUA, e)
|
s, cc := setUp(t, nil, math.MaxUint32, testAppUA, e)
|
||||||
// Wait until cc is connected.
|
// Wait until cc is connected.
|
||||||
if ok := cc.WaitForStateChange(time.Second, grpc.Idle); !ok {
|
ctx, _ := context.WithTimeout(context.Background(), time.Second)
|
||||||
t.Fatalf("cc.WaitForStateChange(_, %s) = %t, want true", grpc.Idle, ok)
|
if _, err := cc.WaitForStateChange(ctx, grpc.Idle); err != nil {
|
||||||
|
t.Fatalf("cc.WaitForStateChange(_, %s) = _, %v, want _, <nil>", grpc.Idle, err)
|
||||||
}
|
}
|
||||||
if ok := cc.WaitForStateChange(10*time.Second, grpc.Connecting); !ok {
|
ctx, _ = context.WithTimeout(context.Background(), time.Second)
|
||||||
t.Fatalf("cc.WaitForStateChange(_, %s) = %t, want true", grpc.Connecting, ok)
|
if _, err := cc.WaitForStateChange(ctx, grpc.Connecting); err != nil {
|
||||||
|
t.Fatalf("cc.WaitForStateChange(_, %s) = _, %v, want _, <nil>", grpc.Connecting, err)
|
||||||
}
|
}
|
||||||
if cc.State() != grpc.Ready {
|
if state, err := cc.State(); err != nil || state != grpc.Ready {
|
||||||
t.Fatalf("cc.State() = %s, want %s", cc.State(), grpc.Ready)
|
t.Fatalf("cc.State() = %s, %v, want %s, <nil>", state, err, grpc.Ready)
|
||||||
}
|
}
|
||||||
if ok := cc.WaitForStateChange(time.Second, grpc.Ready); ok {
|
ctx, _ = context.WithTimeout(context.Background(), time.Second)
|
||||||
t.Fatalf("cc.WaitForStateChange(_, %s) = %t, want false", grpc.Ready, ok)
|
if _, err := cc.WaitForStateChange(ctx, grpc.Ready); err == nil {
|
||||||
|
t.Fatalf("cc.WaitForStateChange(_, %s) = _, %v, want _, %v", grpc.Ready, context.DeadlineExceeded)
|
||||||
}
|
}
|
||||||
tc := testpb.NewTestServiceClient(cc)
|
tc := testpb.NewTestServiceClient(cc)
|
||||||
var header metadata.MD
|
var header metadata.MD
|
||||||
@ -543,11 +549,12 @@ func testEmptyUnaryWithUserAgent(t *testing.T, e env) {
|
|||||||
t.Fatalf("header[\"ua\"] = %q, %t, want %q, true", v, ok, testAppUA)
|
t.Fatalf("header[\"ua\"] = %q, %t, want %q, true", v, ok, testAppUA)
|
||||||
}
|
}
|
||||||
tearDown(s, cc)
|
tearDown(s, cc)
|
||||||
if ok := cc.WaitForStateChange(5*time.Second, grpc.Ready); !ok {
|
ctx, _ = context.WithTimeout(context.Background(), 5 * time.Second)
|
||||||
t.Fatalf("cc.WaitForStateChange(_, %s) = %t, want true", grpc.Ready, ok)
|
if _, err := cc.WaitForStateChange(ctx, grpc.Ready); err != nil {
|
||||||
|
t.Fatalf("cc.WaitForStateChange(_, %s) = _, %v, want _, <nil>", grpc.Ready, err)
|
||||||
}
|
}
|
||||||
if cc.State() != grpc.Shutdown {
|
if state, err := cc.State(); err != nil || state != grpc.Shutdown {
|
||||||
t.Fatalf("cc.State() = %s, want %s", cc.State(), grpc.Shutdown)
|
t.Fatalf("cc.State() = %s, %v, want %s, <nil>", state, err, grpc.Shutdown)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user