From cd5721c02fac6005d1cb0774f1d0e0418148a6d5 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Tue, 16 Aug 2016 11:44:41 -0700 Subject: [PATCH] Add TestOneConnectionRemoval --- balancer_test.go | 66 ++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 64 insertions(+), 2 deletions(-) diff --git a/balancer_test.go b/balancer_test.go index a101fe15..48f8b27d 100644 --- a/balancer_test.go +++ b/balancer_test.go @@ -321,7 +321,7 @@ func TestGetOnWaitChannel(t *testing.T) { servers[0].stop() } -func TestOneConnectionDown(t *testing.T) { +func TestOneServerDown(t *testing.T) { // Start 2 servers. numServers := 2 servers, r := startServers(t, numServers, math.MaxUint32) @@ -363,7 +363,7 @@ func TestOneConnectionDown(t *testing.T) { go func() { time.Sleep(sleepDuration) // After sleepDuration, invoke RPC. - // server[0] is killed around the same time to make it racey between balancer and gRPC internals. + // server[0] is killed around the same time to make it racy between balancer and gRPC internals. Invoke(context.Background(), "/foo/bar", &req, &reply, cc, FailFast(false)) wg.Done() }() @@ -374,3 +374,65 @@ func TestOneConnectionDown(t *testing.T) { servers[i].stop() } } + +func TestOneAddressRemoval(t *testing.T) { + // Start 2 servers. + numServers := 2 + servers, r := startServers(t, numServers, math.MaxUint32) + cc, err := Dial("foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{})) + if err != nil { + t.Fatalf("Failed to create ClientConn: %v", err) + } + // Add servers[1] to the service discovery. + var updates []*naming.Update + updates = append(updates, &naming.Update{ + Op: naming.Add, + Addr: "127.0.0.1:" + servers[1].port, + }) + r.w.inject(updates) + req := "port" + var reply string + // Loop until servers[1] is up + for { + if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[1].port { + break + } + time.Sleep(10 * time.Millisecond) + } + + var wg sync.WaitGroup + numRPC := 100 + sleepDuration := 10 * time.Millisecond + wg.Add(1) + go func() { + time.Sleep(sleepDuration) + // After sleepDuration, delete server[0]. + var updates []*naming.Update + updates = append(updates, &naming.Update{ + Op: naming.Delete, + Addr: "127.0.0.1:" + servers[0].port, + }) + r.w.inject(updates) + wg.Done() + }() + + // All non-failfast RPCs should not fail because there's at least one connection available. + for i := 0; i < numRPC; i++ { + wg.Add(1) + go func() { + var reply string + time.Sleep(sleepDuration) + // After sleepDuration, invoke RPC. + // server[0] is removed around the same time to make it racy between balancer and gRPC internals. + if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc, FailFast(false)); err != nil { + t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err) + } + wg.Done() + }() + } + wg.Wait() + cc.Close() + for i := 0; i < numServers; i++ { + servers[i].stop() + } +}