From 8d7cb9253d459a204faa8232ff02ddf077e4acf9 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Wed, 7 Oct 2015 16:40:10 -0700 Subject: [PATCH] refactor naming package --- naming/etcd/etcd.go | 178 ++++++++++++++++++++++---------------------- naming/naming.go | 41 +++++----- 2 files changed, 108 insertions(+), 111 deletions(-) diff --git a/naming/etcd/etcd.go b/naming/etcd/etcd.go index 362649da..8a877ede 100644 --- a/naming/etcd/etcd.go +++ b/naming/etcd/etcd.go @@ -34,115 +34,49 @@ package etcd import ( + "sync" + etcdcl "github.com/coreos/etcd/client" "golang.org/x/net/context" "google.golang.org/grpc/naming" ) -// update defines an etcd key-value update. -type update struct { - key, val string -} -// getNode reports the set of changes starting from node recursively. -func getNode(node *etcdcl.Node) (updates []*update) { - for _, v := range node.Nodes { - updates = append(updates, getNode(v)...) - } +// getNode builds the key-value map starting from node recursively. It returns the +// max etcdcl.Node.ModifiedIndex starting from that node. +func getNode(node *etcdcl.Node, kv map[string]string) uint64 { if !node.Dir { - u := &update{ - key: node.Key, - val: node.Value, - } - updates = []*update{u} + kv[node.Key] = node.Value + return node.ModifiedIndex } - return -} - -type watcher struct { - wr etcdcl.Watcher - ctx context.Context - cancel context.CancelFunc - kv map[string]string -} - -func (w *watcher) Next() (nu []*naming.Update, _ error) { - for { - resp, err := w.wr.Next(w.ctx) - if err != nil { - return nil, err - } - updates := getNode(resp.Node) - for _, u := range updates { - switch resp.Action { - case "set": - if resp.PrevNode == nil { - w.kv[u.key] = u.val - nu = append(nu, &naming.Update{ - Op: naming.Add, - Addr: u.val, - }) - } else { - nu = append(nu, &naming.Update{ - Op: naming.Delete, - Addr: w.kv[u.key], - }) - nu = append(nu, &naming.Update{ - Op: naming.Add, - Addr: u.val, - }) - w.kv[u.key] = u.val - } - case "delete": - nu = append(nu, &naming.Update{ - Op: naming.Delete, - Addr: w.kv[u.key], - }) - delete(w.kv, u.key) - } - } - if len(nu) > 0 { - break + var max uint64 + for _, v := range node.Nodes { + i := getNode(v, kv) + if max < i { + max = i } } - return nu, nil -} - -func (w *watcher) Stop() { - w.cancel() + return max } type resolver struct { kapi etcdcl.KeysAPI - kv map[string]string } -func (r *resolver) NewWatcher(target string) naming.Watcher { - ctx, cancel := context.WithCancel(context.Background()) - w := &watcher{ - wr: r.kapi.Watcher(target, &etcdcl.WatcherOptions{Recursive: true}), - ctx: ctx, - cancel: cancel, - } - for k, v := range r.kv { - w.kv[k] = v - } - return w -} - -func (r *resolver) Resolve(target string) (nu []*naming.Update, _ error) { +func (r *resolver) Resolve(target string) (naming.Watcher, error) { resp, err := r.kapi.Get(context.Background(), target, &etcdcl.GetOptions{Recursive: true}) if err != nil { return nil, err } - updates := getNode(resp.Node) - for _, u := range updates { - r.kv[u.key] = u.val - nu = append(nu, &naming.Update{ - Op: naming.Add, - Addr: u.val, - }) - } - return nu, nil + kv := make(map[string]string) + // Record the index in order to avoid missing updates between Get returning and + // watch starting. + index := getNode(resp.Node, kv) + return &watcher{ + wr: r.kapi.Watcher(target, &etcdcl.WatcherOptions{ + AfterIndex: index, + Recursive: true}), + kv: kv, + }, nil } // NewResolver creates an etcd-based naming.Resolver. @@ -153,6 +87,68 @@ func NewResolver(cfg etcdcl.Config) (naming.Resolver, error) { } return &resolver{ kapi: etcdcl.NewKeysAPI(c), - kv: make(map[string]string), }, nil } + +type watcher struct { + wr etcdcl.Watcher + kv map[string]string +} + +var once sync.Once + +func (w *watcher) Next(ctx context.Context) (nu []*naming.Update, err error) { + once.Do(func() { + select { + case <-ctx.Done(): + err = ctx.Err() + default: + for _, v := range w.kv { + nu = append(nu, &naming.Update{ + Op: naming.Add, + Addr: v, + }) + } + } + }) + if len(nu) > 0 || err != nil { + // once.Do ran. Return directly. + return + } + for { + resp, err := w.wr.Next(ctx) + if err != nil { + return nil, err + } + if resp.Node.Dir { + continue + } + switch resp.Action { + case "set": + if resp.PrevNode == nil { + nu = append(nu, &naming.Update{ + Op: naming.Add, + Addr: resp.Node.Value, + }) + w.kv[resp.Node.Key] = resp.Node.Value + } else { + nu = append(nu, &naming.Update{ + Op: naming.Delete, + Addr: w.kv[resp.Node.Key], + }) + nu = append(nu, &naming.Update{ + Op: naming.Add, + Addr: resp.Node.Value, + }) + w.kv[resp.Node.Key] = resp.Node.Value + } + case "delete": + nu = append(nu, &naming.Update{ + Op: naming.Delete, + Addr: resp.Node.Value, + }) + delete(w.kv, resp.Node.Key) + } + return nu, nil + } +} diff --git a/naming/naming.go b/naming/naming.go index 610eb811..d0a4f46f 100644 --- a/naming/naming.go +++ b/naming/naming.go @@ -35,40 +35,41 @@ // The interface is EXPERIMENTAL and may be suject to change. package naming -// OP defines the corresponding operations for a name resolution change. -type OP uint8 +import ( + "golang.org/x/net/context" +) + +// Operation defines the corresponding operations for a name resolution change. +type Operation uint8 const ( // Add indicates a new address is added. - Add = iota + Add Operation = iota // Delete indicates an exisiting address is deleted. Delete ) -type ServiceConfig interface{} - -// Update defines a name resolution change. +// Update defines a name resolution update. Notice that it is not valid having both +// empty string Addr and nil Metadata in an Update. type Update struct { // Op indicates the operation of the update. - Op OP - Addr string - Config ServiceConfig + Op Operation + // Addr is the updated address. It is empty string if there is no address update. + Addr string + // Metadata is the updated metadata. It is nil if there is no metadata update. + // Metadata is not required for a custom naming implementation. + Metadata interface{} } -// Resolver does one-shot name resolution and creates a Watcher to -// watch the future updates. +// Resolver creates a Watcher for a target to track its resolution changes. type Resolver interface { - // Resolve returns the name resolution results. - Resolve(target string) ([]*Update, error) - // NewWatcher creates a Watcher to watch the changes on target. - NewWatcher(target string) Watcher + // Resolve creates a Watcher for target. + Resolve(target string) (Watcher, error) } -// Watcher watches the updates for a particular target. +// Watcher watches for the updates on the specified target. type Watcher interface { // Next blocks until an update or error happens. It may return one or more - // updates. - Next() ([]*Update, error) - // Stop stops the Watcher. - Stop() + // updates. The first call should get the full set of the results. + Next(ctx context.Context) ([]*Update, error) }