/* * * Copyright 2014, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are * met: * * * Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * Redistributions in binary form must reproduce the above * copyright notice, this list of conditions and the following disclaimer * in the documentation and/or other materials provided with the * distribution. * * Neither the name of Google Inc. nor the names of its * contributors may be used to endorse or promote products derived from * this software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * */ package grpc import ( "fmt" "math" "sync" "testing" "time" "golang.org/x/net/context" "google.golang.org/grpc/naming" ) type testWatcher struct { // the channel to receives name resolution updates update chan *naming.Update // the side channel to get to know how many updates in a batch side chan int // the channel to notifiy update injector that the update reading is done readDone chan int wg *sync.WaitGroup } func (w *testWatcher) Next() (updates []*naming.Update, err error) { n := <-w.side if n == 0 { return nil, fmt.Errorf("w.side is closed") } for i := 0; i < n; i++ { u := <-w.update if u != nil { updates = append(updates, u) } } w.readDone <- 0 return } func (w *testWatcher) Close() { } func (w *testWatcher) inject(updates []*naming.Update) { w.side <- len(updates) for _, u := range updates { w.update <- u } <-w.readDone } type testNameResolver struct { w *testWatcher addr string wg *sync.WaitGroup } func (r *testNameResolver) Resolve(target string) (naming.Watcher, error) { r.w = &testWatcher{ update: make(chan *naming.Update, 1), side: make(chan int, 1), readDone: make(chan int), wg: r.wg, } r.w.side <- 1 r.w.update <- &naming.Update{ Op: naming.Add, Addr: r.addr, } go func() { <-r.w.readDone if r.w.wg != nil { r.w.wg.Done() } }() return r.w, nil } func startServers(t *testing.T, numServers, port int, maxStreams uint32, wg *sync.WaitGroup) ([]*server, *testNameResolver) { var servers []*server for i := 0; i < numServers; i++ { s := &server{readyChan: make(chan bool)} servers = append(servers, s) go s.start(t, port, maxStreams) s.wait(t, 2*time.Second) } // Point to server1 addr := "127.0.0.1:" + servers[0].port return servers, &testNameResolver{ addr: addr, wg: wg, } } func TestNameDiscovery(t *testing.T) { // Start 3 servers on 3 ports. servers, r := startServers(t, 3, 0, math.MaxUint32, nil) cc, err := Dial("foo.bar.com", WithPicker(NewUnicastNamingPicker(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{})) if err != nil { t.Fatalf("Failed to create ClientConn: %v", err) } var reply string if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc); err != nil || reply != expectedResponse { t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want ", err) } // Inject name resolution change to point to the second server now. var updates []*naming.Update updates = append(updates, &naming.Update{ Op: naming.Delete, Addr: "127.0.0.1:" + servers[0].port, }) updates = append(updates, &naming.Update{ Op: naming.Add, Addr: "127.0.0.1:" + servers[1].port, }) r.w.inject(updates) servers[0].stop() if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc); err != nil || reply != expectedResponse { t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want ", err) } // Add another server address (server#3) to name resolution updates = nil updates = append(updates, &naming.Update{ Op: naming.Add, Addr: "127.0.0.1:" + servers[2].port, }) r.w.inject(updates) // Stop server#2. The library should direct to server#3 automatically. servers[1].stop() if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc); err != nil || reply != expectedResponse { t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want ", err) } cc.Close() servers[2].stop() } func TestEmptyAddrs(t *testing.T) { var wg sync.WaitGroup servers, r := startServers(t, 1, 0, math.MaxUint32, &wg) wg.Add(1) cc, err := Dial("foo.bar.com", WithPicker(NewUnicastNamingPicker(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{})) if err != nil { t.Fatalf("Failed to create ClientConn: %v", err) } var reply string if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc); err != nil || reply != expectedResponse { t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want ", err) } // Inject name resolution change to remove the server address so that there is no address // available after that. var updates []*naming.Update updates = append(updates, &naming.Update{ Op: naming.Delete, Addr: "127.0.0.1:" + servers[0].port, }) // Wait until the first reading is done. wg.Wait() r.w.inject(updates) ctx, _ := context.WithTimeout(context.Background(), 10*time.Millisecond) if err := Invoke(ctx, "/foo/bar", &expectedRequest, &reply, cc); err == nil { t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want non-", err) } cc.Close() servers[0].stop() }