From 9dc72d1df0066e130be84a211b37b007c41370fd Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Tue, 26 Nov 2019 16:07:43 -0800 Subject: [PATCH] Revert "dns: stop polling for updates; use UpdateState API" (#3213) This reverts commit e5e980f2766388fc243cc4cf20c59b9c13affa8c. --- internal/resolver/dns/dns_resolver.go | 139 ++++++++--- internal/resolver/dns/dns_resolver_test.go | 275 ++++++++++++--------- 2 files changed, 259 insertions(+), 155 deletions(-) diff --git a/internal/resolver/dns/dns_resolver.go b/internal/resolver/dns/dns_resolver.go index bc89eee5..65f231c1 100644 --- a/internal/resolver/dns/dns_resolver.go +++ b/internal/resolver/dns/dns_resolver.go @@ -32,10 +32,11 @@ import ( "sync" "time" + "google.golang.org/grpc/backoff" "google.golang.org/grpc/grpclog" + internalbackoff "google.golang.org/grpc/internal/backoff" "google.golang.org/grpc/internal/grpcrand" "google.golang.org/grpc/resolver" - "google.golang.org/grpc/serviceconfig" ) // EnableSRVLookups controls whether the DNS resolver attempts to fetch gRPCLB @@ -48,6 +49,7 @@ func init() { const ( defaultPort = "443" + defaultFreq = time.Minute * 30 defaultDNSSvrPort = "53" golang = "GO" // txtPrefix is the prefix string to be prepended to the host name for txt record lookup. @@ -97,10 +99,13 @@ var customAuthorityResolver = func(authority string) (netResolver, error) { // NewBuilder creates a dnsBuilder which is used to factory DNS resolvers. func NewBuilder() resolver.Builder { - return &dnsBuilder{} + return &dnsBuilder{minFreq: defaultFreq} } -type dnsBuilder struct{} +type dnsBuilder struct { + // minimum frequency of polling the DNS server. + minFreq time.Duration +} // Build creates and starts a DNS resolver that watches the name resolution of the target. func (b *dnsBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { @@ -110,20 +115,33 @@ func (b *dnsBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts } // IP address. - if ipAddr, ok := formatIP(host); ok { - addr := []resolver.Address{{Addr: ipAddr + ":" + port}} - cc.UpdateState(resolver.State{Addresses: addr}) - return deadResolver{}, nil + if net.ParseIP(host) != nil { + host, _ = formatIP(host) + addr := []resolver.Address{{Addr: host + ":" + port}} + i := &ipResolver{ + cc: cc, + ip: addr, + rn: make(chan struct{}, 1), + q: make(chan struct{}), + } + cc.NewAddress(addr) + go i.watcher() + return i, nil } // DNS address (non-IP). ctx, cancel := context.WithCancel(context.Background()) + bc := backoff.DefaultConfig + bc.MaxDelay = b.minFreq d := &dnsResolver{ + freq: b.minFreq, + backoff: internalbackoff.Exponential{Config: bc}, host: host, port: port, ctx: ctx, cancel: cancel, cc: cc, + t: time.NewTimer(0), rn: make(chan struct{}, 1), disableServiceConfig: opts.DisableServiceConfig, } @@ -139,7 +157,6 @@ func (b *dnsBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts d.wg.Add(1) go d.watcher() - d.ResolveNow(resolver.ResolveNowOptions{}) return d, nil } @@ -154,23 +171,53 @@ type netResolver interface { LookupTXT(ctx context.Context, name string) (txts []string, err error) } -// deadResolver is a resolver that does nothing. -type deadResolver struct{} +// ipResolver watches for the name resolution update for an IP address. +type ipResolver struct { + cc resolver.ClientConn + ip []resolver.Address + // rn channel is used by ResolveNow() to force an immediate resolution of the target. + rn chan struct{} + q chan struct{} +} -func (deadResolver) ResolveNow(resolver.ResolveNowOptions) {} +// ResolveNow resend the address it stores, no resolution is needed. +func (i *ipResolver) ResolveNow(opt resolver.ResolveNowOptions) { + select { + case i.rn <- struct{}{}: + default: + } +} -func (deadResolver) Close() {} +// Close closes the ipResolver. +func (i *ipResolver) Close() { + close(i.q) +} + +func (i *ipResolver) watcher() { + for { + select { + case <-i.rn: + i.cc.NewAddress(i.ip) + case <-i.q: + return + } + } +} // dnsResolver watches for the name resolution update for a non-IP target. type dnsResolver struct { - host string - port string - resolver netResolver - ctx context.Context - cancel context.CancelFunc - cc resolver.ClientConn + freq time.Duration + backoff internalbackoff.Exponential + retryCount int + host string + port string + resolver netResolver + ctx context.Context + cancel context.CancelFunc + cc resolver.ClientConn // rn channel is used by ResolveNow() to force an immediate resolution of the target. rn chan struct{} + t *time.Timer // wg is used to enforce Close() to return after the watcher() goroutine has finished. // Otherwise, data race will be possible. [Race Example] in dns_resolver_test we // replace the real lookup functions with mocked ones to facilitate testing. @@ -182,7 +229,7 @@ type dnsResolver struct { } // ResolveNow invoke an immediate resolution of the target that this dnsResolver watches. -func (d *dnsResolver) ResolveNow(resolver.ResolveNowOptions) { +func (d *dnsResolver) ResolveNow(opt resolver.ResolveNowOptions) { select { case d.rn <- struct{}{}: default: @@ -193,6 +240,7 @@ func (d *dnsResolver) ResolveNow(resolver.ResolveNowOptions) { func (d *dnsResolver) Close() { d.cancel() d.wg.Wait() + d.t.Stop() } func (d *dnsResolver) watcher() { @@ -201,11 +249,29 @@ func (d *dnsResolver) watcher() { select { case <-d.ctx.Done(): return + case <-d.t.C: case <-d.rn: + if !d.t.Stop() { + // Before resetting a timer, it should be stopped to prevent racing with + // reads on it's channel. + <-d.t.C + } } - state := d.lookup() - d.cc.UpdateState(*state) + result, sc := d.lookup() + // Next lookup should happen within an interval defined by d.freq. It may be + // more often due to exponential retry on empty address list. + if len(result) == 0 { + d.retryCount++ + d.t.Reset(d.backoff.Backoff(d.retryCount)) + } else { + d.retryCount = 0 + d.t.Reset(d.freq) + } + if sc != "" { // We get empty string when disabled or the TXT lookup failed. + d.cc.NewServiceConfig(sc) + } + d.cc.NewAddress(result) // Sleep to prevent excessive re-resolutions. Incoming resolution requests // will be queued in d.rn. @@ -248,12 +314,11 @@ func (d *dnsResolver) lookupSRV() []resolver.Address { return newAddrs } -func (d *dnsResolver) lookupTXT() *serviceconfig.ParseResult { +func (d *dnsResolver) lookupTXT() string { ss, err := d.resolver.LookupTXT(d.ctx, txtPrefix+d.host) if err != nil { - err = fmt.Errorf("error from DNS TXT record lookup: %v", err) - grpclog.Infoln("grpc:", err) - return &serviceconfig.ParseResult{Err: err} + grpclog.Infof("grpc: failed dns TXT record lookup due to %v.\n", err) + return "" } var res string for _, s := range ss { @@ -262,12 +327,10 @@ func (d *dnsResolver) lookupTXT() *serviceconfig.ParseResult { // TXT record must have "grpc_config=" attribute in order to be used as service config. if !strings.HasPrefix(res, txtAttribute) { - grpclog.Warningf("grpc: DNS TXT record %v missing %v attribute", res, txtAttribute) - // This is not an error; it is the equivalent of not having a service config. - return nil + grpclog.Warningf("grpc: TXT record %v missing %v attribute", res, txtAttribute) + return "" } - sc := canaryingSC(strings.TrimPrefix(res, txtAttribute)) - return d.cc.ParseServiceConfig(sc) + return strings.TrimPrefix(res, txtAttribute) } func (d *dnsResolver) lookupHost() []resolver.Address { @@ -289,15 +352,15 @@ func (d *dnsResolver) lookupHost() []resolver.Address { return newAddrs } -func (d *dnsResolver) lookup() *resolver.State { - srv := d.lookupSRV() - state := &resolver.State{ - Addresses: append(d.lookupHost(), srv...), +func (d *dnsResolver) lookup() ([]resolver.Address, string) { + newAddrs := d.lookupSRV() + // Support fallback to non-balancer address. + newAddrs = append(newAddrs, d.lookupHost()...) + if d.disableServiceConfig { + return newAddrs, "" } - if !d.disableServiceConfig { - state.ServiceConfig = d.lookupTXT() - } - return state + sc := d.lookupTXT() + return newAddrs, canaryingSC(sc) } // formatIP returns ok = false if addr is not a valid textual representation of an IP address. diff --git a/internal/resolver/dns/dns_resolver_test.go b/internal/resolver/dns/dns_resolver_test.go index be3ae2aa..f8083b20 100644 --- a/internal/resolver/dns/dns_resolver_test.go +++ b/internal/resolver/dns/dns_resolver_test.go @@ -35,11 +35,14 @@ import ( ) func TestMain(m *testing.M) { - // Set a non-zero duration only for tests which are actually testing that - // feature. - replaceDNSResRate(time.Duration(0)) // No nead to clean up since we os.Exit - replaceNetFunc(nil) // No nead to clean up since we os.Exit + // Set a valid duration for the re-resolution rate only for tests which are + // actually testing that feature. + dc := replaceDNSResRate(time.Duration(0)) + defer dc() + + cleanup := replaceNetFunc(nil) code := m.Run() + cleanup() os.Exit(code) } @@ -48,43 +51,47 @@ const ( ) type testClientConn struct { - resolver.ClientConn // For unimplemented functions - target string - m1 sync.Mutex - state resolver.State - updateStateCalls int + target string + m1 sync.Mutex + addrs []resolver.Address + a int // how many times NewAddress() has been called + m2 sync.Mutex + sc string + s int } func (t *testClientConn) UpdateState(s resolver.State) { + panic("unused") +} + +func (t *testClientConn) NewAddress(addresses []resolver.Address) { t.m1.Lock() defer t.m1.Unlock() - t.state = s - t.updateStateCalls++ + t.addrs = addresses + t.a++ } -func (t *testClientConn) getState() (resolver.State, int) { +func (t *testClientConn) getAddress() ([]resolver.Address, int) { t.m1.Lock() defer t.m1.Unlock() - return t.state, t.updateStateCalls + return t.addrs, t.a } -func scFromState(s resolver.State) string { - if s.ServiceConfig != nil { - if s.ServiceConfig.Err != nil { - return "" - } - return s.ServiceConfig.Config.(unparsedServiceConfig).config - } - return "" +func (t *testClientConn) NewServiceConfig(serviceConfig string) { + t.m2.Lock() + defer t.m2.Unlock() + t.sc = serviceConfig + t.s++ } -type unparsedServiceConfig struct { - serviceconfig.Config - config string +func (t *testClientConn) getSc() (string, int) { + t.m2.Lock() + defer t.m2.Unlock() + return t.sc, t.s } -func (t *testClientConn) ParseServiceConfig(s string) *serviceconfig.ParseResult { - return &serviceconfig.ParseResult{Config: unparsedServiceConfig{config: s}} +func (t *testClientConn) ParseServiceConfig(string) *serviceconfig.ParseResult { + panic("not implemented") } func (t *testClientConn) ReportError(error) { @@ -691,23 +698,33 @@ func testDNSResolver(t *testing.T) { if err != nil { t.Fatalf("%v\n", err) } - var state resolver.State + var addrs []resolver.Address var cnt int - for i := 0; i < 2000; i++ { - state, cnt = cc.getState() + for { + addrs, cnt = cc.getAddress() if cnt > 0 { break } time.Sleep(time.Millisecond) } - if cnt == 0 { - t.Fatalf("UpdateState not called after 2s; aborting") + var sc string + if a.scWant != "" { + for { + sc, cnt = cc.getSc() + if cnt > 0 { + break + } + time.Sleep(time.Millisecond) + } + } else { + // A new service config should never be produced; call getSc once + // just in case. + sc, _ = cc.getSc() } - if !reflect.DeepEqual(a.addrWant, state.Addresses) { - t.Errorf("Resolved addresses of target: %q = %+v, want %+v\n", a.target, state.Addresses, a.addrWant) + if !reflect.DeepEqual(a.addrWant, addrs) { + t.Errorf("Resolved addresses of target: %q = %+v, want %+v\n", a.target, addrs, a.addrWant) } - sc := scFromState(state) - if a.scWant != sc { + if !reflect.DeepEqual(a.scWant, sc) { t.Errorf("Resolved service config of target: %q = %+v, want %+v\n", a.target, sc, a.scWant) } r.Close() @@ -737,7 +754,7 @@ func testDNSResolverWithSRV(t *testing.T) { }, { "srv.ipv4.single.fake", - []resolver.Address{{Addr: "2.4.6.8" + colonDefaultPort}, {Addr: "1.2.3.4:1234", Type: resolver.GRPCLB, ServerName: "ipv4.single.fake"}}, + []resolver.Address{{Addr: "1.2.3.4:1234", Type: resolver.GRPCLB, ServerName: "ipv4.single.fake"}, {Addr: "2.4.6.8" + colonDefaultPort}}, generateSC("srv.ipv4.single.fake"), }, { @@ -772,26 +789,36 @@ func testDNSResolverWithSRV(t *testing.T) { if err != nil { t.Fatalf("%v\n", err) } - defer r.Close() - var state resolver.State + var addrs []resolver.Address var cnt int - for i := 0; i < 2000; i++ { - state, cnt = cc.getState() + for { + addrs, cnt = cc.getAddress() if cnt > 0 { break } time.Sleep(time.Millisecond) } - if cnt == 0 { - t.Fatalf("UpdateState not called after 2s; aborting") + var sc string + if a.scWant != "" { + for { + sc, cnt = cc.getSc() + if cnt > 0 { + break + } + time.Sleep(time.Millisecond) + } + } else { + // A new service config should never be produced; call getSc once + // just in case. + sc, _ = cc.getSc() } - if !reflect.DeepEqual(a.addrWant, state.Addresses) { - t.Errorf("Resolved addresses of target: %q = %+v, want %+v\n", a.target, state.Addresses, a.addrWant) + if !reflect.DeepEqual(a.addrWant, addrs) { + t.Errorf("Resolved addresses of target: %q = %+v, want %+v\n", a.target, addrs, a.addrWant) } - sc := scFromState(state) - if a.scWant != sc { + if !reflect.DeepEqual(a.scWant, sc) { t.Errorf("Resolved service config of target: %q = %+v, want %+v\n", a.target, sc, a.scWant) } + r.Close() } } @@ -840,47 +867,55 @@ func testDNSResolveNow(t *testing.T) { if err != nil { t.Fatalf("%v\n", err) } - defer r.Close() - var state resolver.State + var addrs []resolver.Address var cnt int - for i := 0; i < 2000; i++ { - state, cnt = cc.getState() + for { + addrs, cnt = cc.getAddress() if cnt > 0 { break } time.Sleep(time.Millisecond) } - if cnt == 0 { - t.Fatalf("UpdateState not called after 2s; aborting. state=%v", state) - } - if !reflect.DeepEqual(a.addrWant, state.Addresses) { - t.Errorf("Resolved addresses of target: %q = %+v, want %+v\n", a.target, state.Addresses, a.addrWant) - } - sc := scFromState(state) - if a.scWant != sc { - t.Errorf("Resolved service config of target: %q = %+v, want %+v\n", a.target, sc, a.scWant) - } - - revertTbl := mutateTbl(a.target) - r.ResolveNow(resolver.ResolveNowOptions{}) - for i := 0; i < 2000; i++ { - state, cnt = cc.getState() - if cnt == 2 { + var sc string + for { + sc, cnt = cc.getSc() + if cnt > 0 { break } time.Sleep(time.Millisecond) } - if cnt != 2 { - t.Fatalf("UpdateState not called after 2s; aborting. state=%v", state) + if !reflect.DeepEqual(a.addrWant, addrs) { + t.Errorf("Resolved addresses of target: %q = %+v, want %+v\n", a.target, addrs, a.addrWant) } - sc = scFromState(state) - if !reflect.DeepEqual(a.addrNext, state.Addresses) { - t.Errorf("Resolved addresses of target: %q = %+v, want %+v\n", a.target, state.Addresses, a.addrNext) + if !reflect.DeepEqual(a.scWant, sc) { + t.Errorf("Resolved service config of target: %q = %+v, want %+v\n", a.target, sc, a.scWant) } - if a.scNext != sc { + revertTbl := mutateTbl(a.target) + r.ResolveNow(resolver.ResolveNowOptions{}) + for i := 0; i < 1000; i++ { + addrs, cnt = cc.getAddress() + // Break if the address list changes or enough redundant updates happen. + if !reflect.DeepEqual(addrs, a.addrWant) || cnt > 10 { + break + } + time.Sleep(time.Millisecond) + } + for i := 0; i < 1000; i++ { + sc, cnt = cc.getSc() + // Break if the service config changes or enough redundant updates happen. + if !reflect.DeepEqual(sc, a.scWant) || cnt > 10 { + break + } + time.Sleep(time.Millisecond) + } + if !reflect.DeepEqual(a.addrNext, addrs) { + t.Errorf("Resolved addresses of target: %q = %+v, want %+v\n", a.target, addrs, a.addrNext) + } + if !reflect.DeepEqual(a.scNext, sc) { t.Errorf("Resolved service config of target: %q = %+v, want %+v\n", a.target, sc, a.scNext) } revertTbl() + r.Close() } } @@ -911,26 +946,29 @@ func testIPResolver(t *testing.T) { if err != nil { t.Fatalf("%v\n", err) } - var state resolver.State + var addrs []resolver.Address var cnt int for { - state, cnt = cc.getState() + addrs, cnt = cc.getAddress() if cnt > 0 { break } time.Sleep(time.Millisecond) } - if !reflect.DeepEqual(v.want, state.Addresses) { - t.Errorf("Resolved addresses of target: %q = %+v, want %+v\n", v.target, state.Addresses, v.want) + if !reflect.DeepEqual(v.want, addrs) { + t.Errorf("Resolved addresses of target: %q = %+v, want %+v\n", v.target, addrs, v.want) } r.ResolveNow(resolver.ResolveNowOptions{}) - for i := 0; i < 50; i++ { - state, cnt = cc.getState() - if cnt > 1 { - t.Fatalf("Unexpected second call by resolver to UpdateState. state: %v", state) + for { + addrs, cnt = cc.getAddress() + if cnt == 2 { + break } time.Sleep(time.Millisecond) } + if !reflect.DeepEqual(v.want, addrs) { + t.Errorf("Resolved addresses of target: %q = %+v, want %+v\n", v.target, addrs, v.want) + } r.Close() } } @@ -968,7 +1006,7 @@ func TestResolveFunc(t *testing.T) { r.Close() } if !reflect.DeepEqual(err, v.want) { - t.Errorf("Build(%q, cc, _) = %v, want %v", v.addr, err, v.want) + t.Errorf("Build(%q, cc, resolver.BuildOptions{}) = %v, want %v", v.addr, err, v.want) } } } @@ -999,23 +1037,26 @@ func TestDisableServiceConfig(t *testing.T) { if err != nil { t.Fatalf("%v\n", err) } - defer r.Close() var cnt int - var state resolver.State - for i := 0; i < 2000; i++ { - state, cnt = cc.getState() + var sc string + // First wait for addresses. We know service configs are reported + // first, so once addresses have been reported, we can then check to + // see whether any configs have been reported.. + for i := 0; i < 1000; i++ { + _, cnt = cc.getAddress() if cnt > 0 { break } time.Sleep(time.Millisecond) } - if cnt == 0 { - t.Fatalf("UpdateState not called after 2s; aborting") + sc, cnt = cc.getSc() + if a.disableServiceConfig && cnt > 0 { + t.Errorf("Resolver reported a service config even though lookups are disabled: sc=%v, cnt=%v", sc, cnt) } - sc := scFromState(state) - if a.scWant != sc { + if !reflect.DeepEqual(a.scWant, sc) { t.Errorf("Resolved service config of target: %q = %+v, want %+v\n", a.target, sc, a.scWant) } + r.Close() } } @@ -1027,49 +1068,49 @@ func TestDNSResolverRetry(t *testing.T) { if err != nil { t.Fatalf("%v\n", err) } - defer r.Close() - var state resolver.State - for i := 0; i < 2000; i++ { - state, _ = cc.getState() - if len(state.Addresses) == 1 { + var addrs []resolver.Address + for { + addrs, _ = cc.getAddress() + if len(addrs) == 1 { break } time.Sleep(time.Millisecond) } - if len(state.Addresses) != 1 { - t.Fatalf("UpdateState not called with 1 address after 2s; aborting. state=%v", state) - } want := []resolver.Address{{Addr: "1.2.3.4" + colonDefaultPort}} - if !reflect.DeepEqual(want, state.Addresses) { - t.Errorf("Resolved addresses of target: %q = %+v, want %+v\n", target, state.Addresses, want) + if !reflect.DeepEqual(want, addrs) { + t.Errorf("Resolved addresses of target: %q = %+v, want %+v\n", target, addrs, want) } // mutate the host lookup table so the target has 0 address returned. revertTbl := mutateTbl(target) // trigger a resolve that will get empty address list r.ResolveNow(resolver.ResolveNowOptions{}) - for i := 0; i < 2000; i++ { - state, _ = cc.getState() - if len(state.Addresses) == 0 { + for { + addrs, _ = cc.getAddress() + if len(addrs) == 0 { break } time.Sleep(time.Millisecond) } - if len(state.Addresses) != 0 { - t.Fatalf("UpdateState not called with 0 address after 2s; aborting. state=%v", state) - } revertTbl() // wait for the retry to happen in two seconds. - r.ResolveNow(resolver.ResolveNowOptions{}) - for i := 0; i < 2000; i++ { - state, _ = cc.getState() - if len(state.Addresses) == 1 { - break + timer := time.NewTimer(2 * time.Second) +loop: + for { + select { + case <-timer.C: + break loop + default: + addrs, _ = cc.getAddress() + if len(addrs) != 0 { + break loop + } + time.Sleep(time.Millisecond) } - time.Sleep(time.Millisecond) } - if !reflect.DeepEqual(want, state.Addresses) { - t.Errorf("Resolved addresses of target: %q = %+v, want %+v\n", target, state.Addresses, want) + if !reflect.DeepEqual(want, addrs) { + t.Errorf("Resolved addresses of target: %q = %+v, want %+v\n", target, addrs, want) } + r.Close() } func TestCustomAuthority(t *testing.T) { @@ -1256,16 +1297,16 @@ func TestRateLimitedResolve(t *testing.T) { } wantAddrs := []resolver.Address{{Addr: "1.2.3.4" + colonDefaultPort}, {Addr: "5.6.7.8" + colonDefaultPort}} - var state resolver.State + var gotAddrs []resolver.Address for { var cnt int - state, cnt = cc.getState() + gotAddrs, cnt = cc.getAddress() if cnt > 0 { break } time.Sleep(time.Millisecond) } - if !reflect.DeepEqual(state.Addresses, wantAddrs) { - t.Errorf("Resolved addresses of target: %q = %+v, want %+v\n", target, state.Addresses, wantAddrs) + if !reflect.DeepEqual(gotAddrs, wantAddrs) { + t.Errorf("Resolved addresses of target: %q = %+v, want %+v\n", target, gotAddrs, wantAddrs) } }