From a1b60d7b43696187fcd83baf55d3a1ffe7676304 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Thu, 12 May 2016 17:01:58 -0700 Subject: [PATCH] Add the tests for balancer and resolver --- balancer_test.go | 222 +++++++++++++++++++++++++++++++++++++++++++++++ call_test.go | 13 ++- clientconn.go | 3 +- 3 files changed, 235 insertions(+), 3 deletions(-) create mode 100644 balancer_test.go diff --git a/balancer_test.go b/balancer_test.go new file mode 100644 index 00000000..9fa76b4b --- /dev/null +++ b/balancer_test.go @@ -0,0 +1,222 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +package grpc + +import ( + "fmt" + "math" + "testing" + "time" + + "golang.org/x/net/context" + "google.golang.org/grpc/naming" +) + +type testWatcher struct { + // the channel to receives name resolution updates + update chan *naming.Update + // the side channel to get to know how many updates in a batch + side chan int + // the channel to notifiy update injector that the update reading is done + readDone chan int +} + +func (w *testWatcher) Next() (updates []*naming.Update, err error) { + n := <-w.side + if n == 0 { + return nil, fmt.Errorf("w.side is closed") + } + for i := 0; i < n; i++ { + u := <-w.update + if u != nil { + updates = append(updates, u) + } + } + w.readDone <- 0 + return +} + +func (w *testWatcher) Close() { +} + +func (w *testWatcher) inject(updates []*naming.Update) { + w.side <- len(updates) + for _, u := range updates { + w.update <- u + } + <-w.readDone +} + +type testNameResolver struct { + w *testWatcher + addr string +} + +func (r *testNameResolver) Resolve(target string) (naming.Watcher, error) { + r.w = &testWatcher{ + update: make(chan *naming.Update, 1), + side: make(chan int, 1), + readDone: make(chan int), + } + r.w.side <- 1 + r.w.update <- &naming.Update{ + Op: naming.Add, + Addr: r.addr, + } + go func() { + <-r.w.readDone + }() + return r.w, nil +} + +func startServers(t *testing.T, numServers, port int, maxStreams uint32) ([]*server, *testNameResolver) { + var servers []*server + for i := 0; i < numServers; i++ { + s := newTestServer() + servers = append(servers, s) + go s.start(t, port, maxStreams) + s.wait(t, 2*time.Second) + } + // Point to server[0] + addr := "127.0.0.1:" + servers[0].port + return servers, &testNameResolver{ + addr: addr, + } +} + +func TestNameDiscovery(t *testing.T) { + // Start 2 servers on 2 ports. + numServers := 2 + servers, r := startServers(t, numServers, 0, math.MaxUint32) + cc, err := Dial("foo.bar.com", WithNameResolver(r), WithBlock(), WithInsecure(), WithCodec(testCodec{})) + if err != nil { + t.Fatalf("Failed to create ClientConn: %v", err) + } + req := "port" + var reply string + if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[0].port { + t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want %s", err, servers[0].port) + } + // Inject name resolution change to point to the second server now. + var updates []*naming.Update + updates = append(updates, &naming.Update{ + Op: naming.Delete, + Addr: "127.0.0.1:" + servers[0].port, + }) + updates = append(updates, &naming.Update{ + Op: naming.Add, + Addr: "127.0.0.1:" + servers[1].port, + }) + r.w.inject(updates) + for { + if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[1].port { + break + } + time.Sleep(10 * time.Millisecond) + } + cc.Close() + for i := 0; i < numServers; i++ { + servers[i].stop() + } +} + +func TestEmptyAddrs(t *testing.T) { + servers, r := startServers(t, 1, 0, math.MaxUint32) + cc, err := Dial("foo.bar.com", WithNameResolver(r), WithBlock(), WithInsecure(), WithCodec(testCodec{})) + if err != nil { + t.Fatalf("Failed to create ClientConn: %v", err) + } + var reply string + if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc); err != nil || reply != expectedResponse { + t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want ", err) + } + // Inject name resolution change to remove the server address so that there is no address + // available after that. + var updates []*naming.Update + updates = append(updates, &naming.Update{ + Op: naming.Delete, + Addr: "127.0.0.1:" + servers[0].port, + }) + r.w.inject(updates) + // Loop until the above updates apply. + for { + time.Sleep(10 * time.Millisecond) + ctx, _ := context.WithTimeout(context.Background(), 10*time.Millisecond) + if err := Invoke(ctx, "/foo/bar", &expectedRequest, &reply, cc); err != nil { + break + } + } + cc.Close() + servers[0].stop() +} + +func TestRoundRobin(t *testing.T) { + // Start 3 servers on 3 ports. + numServers := 3 + servers, r := startServers(t, numServers, 0, math.MaxUint32) + cc, err := Dial("foo.bar.com", WithNameResolver(r), WithBlock(), WithInsecure(), WithCodec(testCodec{})) + if err != nil { + t.Fatalf("Failed to create ClientConn: %v", err) + } + // Add the other 2 servers as the name updates. + var updates []*naming.Update + updates = append(updates, &naming.Update{ + Op: naming.Add, + Addr: "127.0.0.1:" + servers[1].port, + }) + updates = append(updates, &naming.Update{ + Op: naming.Add, + Addr: "127.0.0.1:" + servers[2].port, + }) + r.w.inject(updates) + req := "port" + var reply string + // Loop until an RPC is completed by servers[2]. + for { + if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[2].port { + break + } + time.Sleep(10 * time.Millisecond) + } + // Check it works in round-robin manner. + for i := 0; i < 10; i++ { + if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[i%numServers].port { + t.Fatalf("Invoke(_, _, _, _, _) = %v, want %s", err, servers[i%numServers].port) + } + } + cc.Close() + for i := 0; i < numServers; i++ { + servers[i].stop() + } +} diff --git a/call_test.go b/call_test.go index 7d01f457..20d4adb4 100644 --- a/call_test.go +++ b/call_test.go @@ -74,7 +74,8 @@ func (testCodec) String() string { } type testStreamHandler struct { - t transport.ServerTransport + port string + t transport.ServerTransport } func (h *testStreamHandler) handleStream(t *testing.T, s *transport.Stream) { @@ -106,6 +107,11 @@ func (h *testStreamHandler) handleStream(t *testing.T, s *transport.Stream) { h.t.WriteStatus(s, codes.Internal, "") return } + if v == "port" { + h.t.WriteStatus(s, codes.Internal, h.port) + return + } + if v != expectedRequest { h.t.WriteStatus(s, codes.Internal, strings.Repeat("A", sizeLargeErr)) return @@ -170,7 +176,10 @@ func (s *server) start(t *testing.T, port int, maxStreams uint32) { } s.conns[st] = true s.mu.Unlock() - h := &testStreamHandler{st} + h := &testStreamHandler{ + port: s.port, + t: st, + } go st.HandleStreams(func(s *transport.Stream) { go h.handleStream(t, s) }) diff --git a/clientconn.go b/clientconn.go index 75e59d07..8059e6f6 100644 --- a/clientconn.go +++ b/clientconn.go @@ -114,7 +114,8 @@ func WithDecompressor(dc Decompressor) DialOption { } } -func WithResolver(r naming.Resolver) DialOption { +// WithNameResolver returns a DialOption which sets a name resolver for service discovery. +func WithNameResolver(r naming.Resolver) DialOption { return func(o *dialOptions) { o.resolver = r }