From ea4b8e03106fa8c210e4d896de3e8708661d92d4 Mon Sep 17 00:00:00 2001 From: yangzhouhan Date: Thu, 6 Aug 2015 16:08:24 -0700 Subject: [PATCH] add comments and revise nameresolver --- clientconn.go | 6 +++- etcdnaming/etcdnaming.go | 63 ++++++++++++++++++++-------------------- 2 files changed, 36 insertions(+), 33 deletions(-) diff --git a/clientconn.go b/clientconn.go index 66392799..1eb26dd5 100644 --- a/clientconn.go +++ b/clientconn.go @@ -72,9 +72,13 @@ type dialOptions struct { type DialOption func(*dialOptions) type NameResolver interface { + // Get returns the map of customized names and the corresponding ip/DNS addresses. Get(target string) map[string]string + // Watch set a watcher on the target directory/key. Watch(target string) - GetUpdate() (key string, val string) + // GetUpdate returns the key and value of the updated namePair of the target directory/key being watched. + GetUpdate() (string, string) + // Stop shut down the watcher. Stop() } diff --git a/etcdnaming/etcdnaming.go b/etcdnaming/etcdnaming.go index ea10ed49..4f4246d9 100644 --- a/etcdnaming/etcdnaming.go +++ b/etcdnaming/etcdnaming.go @@ -12,7 +12,7 @@ type namePair struct { key, value string } -// recvBuffer is an unbounded channel of item. +// recvBuffer is an unbounded channel of *namePair. type recvBuffer struct { c chan *namePair mu sync.Mutex @@ -30,25 +30,27 @@ func newRecvBuffer() *recvBuffer { func (b *recvBuffer) put(r *namePair) { b.mu.Lock() defer b.mu.Unlock() + if b.stopping { + return + } b.backlog = append(b.backlog, r) - if !b.stopping { - select { - case b.c <- b.backlog[0]: - b.backlog = b.backlog[1:] - default: - } + select { + case b.c <- b.backlog[0]: + b.backlog = b.backlog[1:] + default: } } func (b *recvBuffer) load() { b.mu.Lock() defer b.mu.Unlock() - if len(b.backlog) > 0 && !b.stopping { - select { - case b.c <- b.backlog[0]: - b.backlog = b.backlog[1:] - default: - } + if b.stopping || len(b.backlog) == 0 { + return + } + select { + case b.c <- b.backlog[0]: + b.backlog = b.backlog[1:] + default: } } @@ -64,20 +66,28 @@ func (b *recvBuffer) stop() { } type etcdNR struct { - cfg etcdcl.Config + kAPI etcdcl.KeysAPI recv *recvBuffer ctx context.Context cancel context.CancelFunc } func NewETCDNR(cfg etcdcl.Config) *etcdNR { + c, err := etcdcl.New(cfg) + if err != nil { + panic(err) + } + kAPI := etcdcl.NewKeysAPI(c) + ctx, cancel := context.WithCancel(context.Background()) return &etcdNR{ - cfg: cfg, - recv: newRecvBuffer(), - ctx: context.Background(), + kAPI: kAPI, + recv: newRecvBuffer(), + ctx: ctx, + cancel: cancel, } } +// getNode can be called recursively to build the result key-value map func getNode(node *etcdcl.Node, res map[string]string) { if !node.Dir { res[node.Key] = node.Value @@ -89,13 +99,7 @@ func getNode(node *etcdcl.Node, res map[string]string) { } func (nr *etcdNR) Get(target string) map[string]string { - cfg := nr.cfg - c, err := etcdcl.New(cfg) - if err != nil { - panic(err) - } - kAPI := etcdcl.NewKeysAPI(c) - resp, err := kAPI.Get(nr.ctx, target, &etcdcl.GetOptions{Recursive: true, Sort: true}) + resp, err := nr.kAPI.Get(nr.ctx, target, &etcdcl.GetOptions{Recursive: true, Sort: true}) if err != nil { fmt.Printf("non-nil error: %v", err) } @@ -106,13 +110,7 @@ func (nr *etcdNR) Get(target string) map[string]string { } func (nr *etcdNR) Watch(target string) { - cfg := nr.cfg - c, err := etcdcl.New(cfg) - if err != nil { - panic(err) - } - kAPI := etcdcl.NewKeysAPI(c) - watcher := kAPI.Watcher(target, &etcdcl.WatcherOptions{Recursive: true}) + watcher := nr.kAPI.Watcher(target, &etcdcl.WatcherOptions{Recursive: true}) for { ctx, cancel := context.WithCancel(nr.ctx) nr.ctx = ctx @@ -130,10 +128,11 @@ func (nr *etcdNR) Watch(target string) { } } -func (nr *etcdNR) GetUpdate() (key string, val string) { +func (nr *etcdNR) GetUpdate() (string, string) { select { case i := <-nr.recv.get(): nr.recv.load() + // returns key and the corresponding value of the updated namePair return i.key, i.value } }