Merge pull request #476 from iamqizhao/master

deflake a test and interop test format fix
This commit is contained in:
Qi Zhao
2015-12-18 18:15:55 -08:00
3 changed files with 12 additions and 21 deletions

View File

@ -73,7 +73,7 @@ var (
cancel_after_first_response: cancellation after receiving 1st message from the server.`) cancel_after_first_response: cancellation after receiving 1st message from the server.`)
// The test CA root cert file // The test CA root cert file
testCAFile = "testdata/ca.pem" testCAFile = "testdata/ca.pem"
) )
func main() { func main() {

View File

@ -242,7 +242,7 @@ func DoEmptyStream(tc testpb.TestServiceClient) {
grpclog.Println("Emptystream done") grpclog.Println("Emptystream done")
} }
// DoTimeoutSleepingServer performs an RPC on a sleep server which causes RPC timeout. // DoTimeoutSleepingServer performs an RPC on a sleep server which causes RPC timeout.
func DoTimeoutOnSleepingServer(tc testpb.TestServiceClient) { func DoTimeoutOnSleepingServer(tc testpb.TestServiceClient) {
ctx, _ := context.WithTimeout(context.Background(), 1*time.Millisecond) ctx, _ := context.WithTimeout(context.Background(), 1*time.Millisecond)
stream, err := tc.FullDuplexCall(ctx) stream, err := tc.FullDuplexCall(ctx)
@ -600,4 +600,3 @@ func (s *TestServer) HalfDuplexCall(stream testpb.TestService_HalfDuplexCallServ
} }
return nil return nil
} }

View File

@ -36,7 +36,6 @@ package grpc
import ( import (
"fmt" "fmt"
"math" "math"
"sync"
"testing" "testing"
"time" "time"
@ -51,7 +50,6 @@ type testWatcher struct {
side chan int side chan int
// the channel to notifiy update injector that the update reading is done // the channel to notifiy update injector that the update reading is done
readDone chan int readDone chan int
wg *sync.WaitGroup
} }
func (w *testWatcher) Next() (updates []*naming.Update, err error) { func (w *testWatcher) Next() (updates []*naming.Update, err error) {
@ -83,7 +81,6 @@ func (w *testWatcher) inject(updates []*naming.Update) {
type testNameResolver struct { type testNameResolver struct {
w *testWatcher w *testWatcher
addr string addr string
wg *sync.WaitGroup
} }
func (r *testNameResolver) Resolve(target string) (naming.Watcher, error) { func (r *testNameResolver) Resolve(target string) (naming.Watcher, error) {
@ -91,7 +88,6 @@ func (r *testNameResolver) Resolve(target string) (naming.Watcher, error) {
update: make(chan *naming.Update, 1), update: make(chan *naming.Update, 1),
side: make(chan int, 1), side: make(chan int, 1),
readDone: make(chan int), readDone: make(chan int),
wg: r.wg,
} }
r.w.side <- 1 r.w.side <- 1
r.w.update <- &naming.Update{ r.w.update <- &naming.Update{
@ -100,14 +96,11 @@ func (r *testNameResolver) Resolve(target string) (naming.Watcher, error) {
} }
go func() { go func() {
<-r.w.readDone <-r.w.readDone
if r.w.wg != nil {
r.w.wg.Done()
}
}() }()
return r.w, nil return r.w, nil
} }
func startServers(t *testing.T, numServers, port int, maxStreams uint32, wg *sync.WaitGroup) ([]*server, *testNameResolver) { func startServers(t *testing.T, numServers, port int, maxStreams uint32) ([]*server, *testNameResolver) {
var servers []*server var servers []*server
for i := 0; i < numServers; i++ { for i := 0; i < numServers; i++ {
s := &server{readyChan: make(chan bool)} s := &server{readyChan: make(chan bool)}
@ -119,13 +112,12 @@ func startServers(t *testing.T, numServers, port int, maxStreams uint32, wg *syn
addr := "127.0.0.1:" + servers[0].port addr := "127.0.0.1:" + servers[0].port
return servers, &testNameResolver{ return servers, &testNameResolver{
addr: addr, addr: addr,
wg: wg,
} }
} }
func TestNameDiscovery(t *testing.T) { func TestNameDiscovery(t *testing.T) {
// Start 3 servers on 3 ports. // Start 3 servers on 3 ports.
servers, r := startServers(t, 3, 0, math.MaxUint32, nil) servers, r := startServers(t, 3, 0, math.MaxUint32)
cc, err := Dial("foo.bar.com", WithPicker(NewUnicastNamingPicker(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{})) cc, err := Dial("foo.bar.com", WithPicker(NewUnicastNamingPicker(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
if err != nil { if err != nil {
t.Fatalf("Failed to create ClientConn: %v", err) t.Fatalf("Failed to create ClientConn: %v", err)
@ -166,9 +158,7 @@ func TestNameDiscovery(t *testing.T) {
} }
func TestEmptyAddrs(t *testing.T) { func TestEmptyAddrs(t *testing.T) {
var wg sync.WaitGroup servers, r := startServers(t, 1, 0, math.MaxUint32)
servers, r := startServers(t, 1, 0, math.MaxUint32, &wg)
wg.Add(1)
cc, err := Dial("foo.bar.com", WithPicker(NewUnicastNamingPicker(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{})) cc, err := Dial("foo.bar.com", WithPicker(NewUnicastNamingPicker(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
if err != nil { if err != nil {
t.Fatalf("Failed to create ClientConn: %v", err) t.Fatalf("Failed to create ClientConn: %v", err)
@ -184,12 +174,14 @@ func TestEmptyAddrs(t *testing.T) {
Op: naming.Delete, Op: naming.Delete,
Addr: "127.0.0.1:" + servers[0].port, Addr: "127.0.0.1:" + servers[0].port,
}) })
// Wait until the first reading is done.
wg.Wait()
r.w.inject(updates) r.w.inject(updates)
ctx, _ := context.WithTimeout(context.Background(), 10*time.Millisecond) // Loop until the above updates apply.
if err := Invoke(ctx, "/foo/bar", &expectedRequest, &reply, cc); err == nil { for {
t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want non-<nil>", err) time.Sleep(10 * time.Millisecond)
ctx, _ := context.WithTimeout(context.Background(), 10*time.Millisecond)
if err := Invoke(ctx, "/foo/bar", &expectedRequest, &reply, cc); err != nil {
break
}
} }
cc.Close() cc.Close()
servers[0].stop() servers[0].stop()