grpc: restore changes after sync

Change-Id: I97f0c77f95086301202d0fe4ca477ae6e22dd0b5
This commit is contained in:
Sameer Ajmani
2015-09-23 15:18:41 -04:00
parent b5f8a855c8
commit 980b4c6d05
2 changed files with 86 additions and 8 deletions

View File

@ -42,6 +42,7 @@ import (
"time" "time"
"golang.org/x/net/context" "golang.org/x/net/context"
"golang.org/x/net/trace"
"google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog" "google.golang.org/grpc/grpclog"
"google.golang.org/grpc/transport" "google.golang.org/grpc/transport"
@ -148,6 +149,9 @@ func Dial(target string, opts ...DialOption) (*ClientConn, error) {
target: target, target: target,
shutdownChan: make(chan struct{}), shutdownChan: make(chan struct{}),
} }
if EnableTracing {
cc.events = trace.NewEventLog("grpc.ClientConn", target)
}
for _, opt := range opts { for _, opt := range opts {
opt(&cc.dopts) opt(&cc.dopts)
} }
@ -181,6 +185,9 @@ func Dial(target string, opts ...DialOption) (*ClientConn, error) {
cc.stateCV = sync.NewCond(&cc.mu) cc.stateCV = sync.NewCond(&cc.mu)
if cc.dopts.block { if cc.dopts.block {
if err := cc.resetTransport(false); err != nil { if err := cc.resetTransport(false); err != nil {
cc.mu.Lock()
cc.errorf("dial failed: %v", err)
cc.mu.Unlock()
cc.Close() cc.Close()
return nil, err return nil, err
} }
@ -190,6 +197,9 @@ func Dial(target string, opts ...DialOption) (*ClientConn, error) {
// Start a goroutine connecting to the server asynchronously. // Start a goroutine connecting to the server asynchronously.
go func() { go func() {
if err := cc.resetTransport(false); err != nil { if err := cc.resetTransport(false); err != nil {
cc.mu.Lock()
cc.errorf("dial failed: %v", err)
cc.mu.Unlock()
grpclog.Printf("Failed to dial %s: %v; please retry.", target, err) grpclog.Printf("Failed to dial %s: %v; please retry.", target, err)
cc.Close() cc.Close()
return return
@ -200,6 +210,22 @@ func Dial(target string, opts ...DialOption) (*ClientConn, error) {
return cc, nil return cc, nil
} }
// printf records an event in cc's event log, unless cc has been closed.
// REQUIRES cc.mu is held.
func (cc *ClientConn) printf(format string, a ...interface{}) {
if cc.events != nil {
cc.events.Printf(format, a...)
}
}
// errorf records an error in cc's event log, unless cc has been closed.
// REQUIRES cc.mu is held.
func (cc *ClientConn) errorf(format string, a ...interface{}) {
if cc.events != nil {
cc.events.Errorf(format, a...)
}
}
// ConnectivityState indicates the state of a client connection. // ConnectivityState indicates the state of a client connection.
type ConnectivityState int type ConnectivityState int
@ -239,14 +265,15 @@ type ClientConn struct {
authority string authority string
dopts dialOptions dopts dialOptions
shutdownChan chan struct{} shutdownChan chan struct{}
events trace.EventLog
mu sync.Mutex mu sync.Mutex
state ConnectivityState state ConnectivityState
stateCV *sync.Cond stateCV *sync.Cond
// ready is closed and becomes nil when a new transport is up or failed // ready is closed and becomes nil when a new transport is up or failed
// due to timeout. // due to timeout.
ready chan struct{} ready chan struct{}
transport transport.ClientTransport transport transport.ClientTransport
} }
// State returns the connectivity state of the ClientConn // State returns the connectivity state of the ClientConn
@ -295,6 +322,7 @@ func (cc *ClientConn) resetTransport(closeTransport bool) error {
start := time.Now() start := time.Now()
for { for {
cc.mu.Lock() cc.mu.Lock()
cc.printf("connecting")
if cc.state == Shutdown { if cc.state == Shutdown {
cc.mu.Unlock() cc.mu.Unlock()
return ErrClientConnClosing return ErrClientConnClosing
@ -330,6 +358,7 @@ func (cc *ClientConn) resetTransport(closeTransport bool) error {
newTransport, err := transport.NewClientTransport(cc.target, &copts) newTransport, err := transport.NewClientTransport(cc.target, &copts)
if err != nil { if err != nil {
cc.mu.Lock() cc.mu.Lock()
cc.errorf("transient failure: %v", err)
cc.state = TransientFailure cc.state = TransientFailure
cc.stateCV.Broadcast() cc.stateCV.Broadcast()
cc.mu.Unlock() cc.mu.Unlock()
@ -339,6 +368,9 @@ func (cc *ClientConn) resetTransport(closeTransport bool) error {
} }
// Fail early before falling into sleep. // Fail early before falling into sleep.
if cc.dopts.copts.Timeout > 0 && cc.dopts.copts.Timeout < sleepTime+time.Since(start) { if cc.dopts.copts.Timeout > 0 && cc.dopts.copts.Timeout < sleepTime+time.Since(start) {
cc.mu.Lock()
cc.errorf("connection timeout")
cc.mu.Unlock()
cc.Close() cc.Close()
return ErrClientConnTimeout return ErrClientConnTimeout
} }
@ -349,6 +381,7 @@ func (cc *ClientConn) resetTransport(closeTransport bool) error {
continue continue
} }
cc.mu.Lock() cc.mu.Lock()
cc.printf("ready")
if cc.state == Shutdown { if cc.state == Shutdown {
// cc.Close() has been invoked. // cc.Close() has been invoked.
cc.mu.Unlock() cc.mu.Unlock()
@ -383,6 +416,9 @@ func (cc *ClientConn) transportMonitor() {
cc.mu.Unlock() cc.mu.Unlock()
if err := cc.resetTransport(true); err != nil { if err := cc.resetTransport(true); err != nil {
// The ClientConn is closing. // The ClientConn is closing.
cc.mu.Lock()
cc.printf("transport exiting: %v", err)
cc.mu.Unlock()
grpclog.Printf("grpc: ClientConn.transportMonitor exits due to: %v", err) grpclog.Printf("grpc: ClientConn.transportMonitor exits due to: %v", err)
return return
} }
@ -433,6 +469,10 @@ func (cc *ClientConn) Close() error {
} }
cc.state = Shutdown cc.state = Shutdown
cc.stateCV.Broadcast() cc.stateCV.Broadcast()
if cc.events != nil {
cc.events.Finish()
cc.events = nil
}
if cc.ready != nil { if cc.ready != nil {
close(cc.ready) close(cc.ready)
cc.ready = nil cc.ready = nil

View File

@ -39,6 +39,7 @@ import (
"io" "io"
"net" "net"
"reflect" "reflect"
"runtime"
"strings" "strings"
"sync" "sync"
@ -79,11 +80,12 @@ type service struct {
// Server is a gRPC server to serve RPC requests. // Server is a gRPC server to serve RPC requests.
type Server struct { type Server struct {
opts options opts options
mu sync.Mutex mu sync.Mutex
lis map[net.Listener]bool lis map[net.Listener]bool
conns map[transport.ServerTransport]bool conns map[transport.ServerTransport]bool
m map[string]*service // service name -> service info m map[string]*service // service name -> service info
events trace.EventLog
} }
type options struct { type options struct {
@ -128,12 +130,33 @@ func NewServer(opt ...ServerOption) *Server {
// Set the default codec. // Set the default codec.
opts.codec = protoCodec{} opts.codec = protoCodec{}
} }
return &Server{ s := &Server{
lis: make(map[net.Listener]bool), lis: make(map[net.Listener]bool),
opts: opts, opts: opts,
conns: make(map[transport.ServerTransport]bool), conns: make(map[transport.ServerTransport]bool),
m: make(map[string]*service), m: make(map[string]*service),
} }
if EnableTracing {
_, file, line, _ := runtime.Caller(1)
s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
}
return s
}
// printf records an event in s's event log, unless s has been stopped.
// REQUIRES s.mu is held.
func (s *Server) printf(format string, a ...interface{}) {
if s.events != nil {
s.events.Printf(format, a...)
}
}
// errorf records an error in s's event log, unless s has been stopped.
// REQUIRES s.mu is held.
func (s *Server) errorf(format string, a ...interface{}) {
if s.events != nil {
s.events.Errorf(format, a...)
}
} }
// RegisterService register a service and its implementation to the gRPC // RegisterService register a service and its implementation to the gRPC
@ -151,6 +174,7 @@ func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
func (s *Server) register(sd *ServiceDesc, ss interface{}) { func (s *Server) register(sd *ServiceDesc, ss interface{}) {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
s.printf("RegisterService(%q)", sd.ServiceName)
if _, ok := s.m[sd.ServiceName]; ok { if _, ok := s.m[sd.ServiceName]; ok {
grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName) grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
} }
@ -182,6 +206,7 @@ var (
// Service returns when lis.Accept fails. // Service returns when lis.Accept fails.
func (s *Server) Serve(lis net.Listener) error { func (s *Server) Serve(lis net.Listener) error {
s.mu.Lock() s.mu.Lock()
s.printf("serving")
if s.lis == nil { if s.lis == nil {
s.mu.Unlock() s.mu.Unlock()
return ErrServerStopped return ErrServerStopped
@ -197,12 +222,18 @@ func (s *Server) Serve(lis net.Listener) error {
for { for {
c, err := lis.Accept() c, err := lis.Accept()
if err != nil { if err != nil {
s.mu.Lock()
s.printf("done serving; Accept = %v", err)
s.mu.Unlock()
return err return err
} }
var authInfo credentials.AuthInfo var authInfo credentials.AuthInfo
if creds, ok := s.opts.creds.(credentials.TransportAuthenticator); ok { if creds, ok := s.opts.creds.(credentials.TransportAuthenticator); ok {
c, authInfo, err = creds.ServerHandshake(c) c, authInfo, err = creds.ServerHandshake(c)
if err != nil { if err != nil {
s.mu.Lock()
s.errorf("ServerHandshake(%q) failed: %v", c.RemoteAddr(), err)
s.mu.Unlock()
grpclog.Println("grpc: Server.Serve failed to complete security handshake.") grpclog.Println("grpc: Server.Serve failed to complete security handshake.")
continue continue
} }
@ -215,6 +246,7 @@ func (s *Server) Serve(lis net.Listener) error {
} }
st, err := transport.NewServerTransport("http2", c, s.opts.maxConcurrentStreams, authInfo) st, err := transport.NewServerTransport("http2", c, s.opts.maxConcurrentStreams, authInfo)
if err != nil { if err != nil {
s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
s.mu.Unlock() s.mu.Unlock()
c.Close() c.Close()
grpclog.Println("grpc: Server.Serve failed to create ServerTransport: ", err) grpclog.Println("grpc: Server.Serve failed to create ServerTransport: ", err)
@ -418,6 +450,12 @@ func (s *Server) Stop() {
for c := range cs { for c := range cs {
c.Close() c.Close()
} }
s.mu.Lock()
if s.events != nil {
s.events.Finish()
s.events = nil
}
s.mu.Unlock()
} }
// TestingCloseConns closes all exiting transports but keeps s.lis accepting new // TestingCloseConns closes all exiting transports but keeps s.lis accepting new