From c2367521ced62349173899c3e573430bb3be983e Mon Sep 17 00:00:00 2001 From: yangzhouhan Date: Mon, 3 Aug 2015 14:43:11 -0700 Subject: [PATCH 01/17] add etcd naming resoluation service and tests --- etcdnaming/etcdnaming.go | 73 +++++++++++++++++++++++++++++++++++ etcdnaming/etcdnaming_test.go | 29 ++++++++++++++ 2 files changed, 102 insertions(+) create mode 100644 etcdnaming/etcdnaming.go create mode 100644 etcdnaming/etcdnaming_test.go diff --git a/etcdnaming/etcdnaming.go b/etcdnaming/etcdnaming.go new file mode 100644 index 00000000..10727f90 --- /dev/null +++ b/etcdnaming/etcdnaming.go @@ -0,0 +1,73 @@ +package etcdnaming + +import ( + "io/ioutil" + "net/http" + "strings" +) + +func parser(body string) map[string]string { + if body == "" { + return nil + } + res := make(map[string]string) + str := strings.Replace(body, "nodes", "Nodes", -1) + subStr := strings.Split(str, "node") + for _, entry := range subStr { + i := 0 + var last int + if strings.Contains(entry, "prevNode") { + last = strings.IndexAny(entry, "prevNode") + } else { + last = len(entry) + } + for i < last { + if entry[i] == '{' { + j := i + 1 + subEntry := []byte{} + for j < len(entry) && entry[j] != '{' && entry[j] != '}' { + subEntry = append(subEntry, entry[j]) + j++ + } + if j < len(entry) && entry[j] == '}' { + var ( + key string + value string + ) + for _, val := range strings.Split(string(subEntry), ",") { + if strings.Contains(val, "key") { + key = strings.Replace(strings.SplitN(val, ":", 2)[1], `"`, "", -1) + } + if strings.Contains(val, "value") { + value = strings.Replace(strings.SplitN(val, ":", 2)[1], `"`, "", -1) + } + res[key] = value + + } + } + i = j + } else { + i++ + } + } + } + return res +} + +func Etcdnaming(host string, key string, watch bool) map[string]string { + var comm string + if !watch { + comm = "http://" + host + "/v2/keys/" + key + "/?recursive=true" + } else { + comm = "http://" + host + "/v2/keys/" + key + `?wait=true&recursive=true` + } + resp, err := http.Get(comm) + if err != nil { + panic(err) + } + defer resp.Body.Close() + var body []byte + body, err = ioutil.ReadAll(resp.Body) + res := parser(string(body)) + return res +} diff --git a/etcdnaming/etcdnaming_test.go b/etcdnaming/etcdnaming_test.go new file mode 100644 index 00000000..d3370977 --- /dev/null +++ b/etcdnaming/etcdnaming_test.go @@ -0,0 +1,29 @@ +package etcdnaming + +import ( + "testing" +) + +func TestEtcdParsing(t *testing.T) { + for _, test := range []struct { + // input + body string + // expected + expected map[string]string + }{ + {"", nil}, + {`{"action":"set","node":{"key":"/foo","value":"bar5","modifiedIndex":30,"createdIndex":30}}`, map[string]string{"/foo": "bar5"}}, + {`{"action":"set","node":{"key":"/foo_dir/foo2","value":"bar2","modifiedIndex":59,"createdIndex":59},"prevNode":{"key":"/foo_dir/foo2","value":"bar","modifiedIndex":58,"createdIndex":58}}`, map[string]string{"/foo_dir/foo2": "bar2"}}, + {`{"action":"get","node":{"key":"/foo_dir","dir":true,"nodes":[{"key":"/foo_dir/foo2","value":"bar2","modifiedIndex":33,"createdIndex":33},{"key":"/foo_dir/bar_dir","dir":true,"nodes":[{"key":"/foo_dir/bar_dir/bar1","value":"a","modifiedIndex":35,"createdIndex":35},{"key":"/foo_dir/bar_dir/bar2","value":"b","modifiedIndex":36,"createdIndex":36}],"modifiedIndex":34,"createdIndex":34},{"key":"/foo_dir/bar_dir2","dir":true,"nodes":[{"key":"/foo_dir/bar_dir2/bar3","value":"c","modifiedIndex":39,"createdIndex":39}],"modifiedIndex":38,"createdIndex":38},{"key":"/foo_dir/foo6","value":"newbar2","modifiedIndex":48,"createdIndex":48},{"key":"/foo_dir/foo3","value":"newbar2","modifiedIndex":50,"createdIndex":50},{"key":"/foo_dir/foo","value":"newbar","modifiedIndex":42,"createdIndex":42}],"modifiedIndex":32,"createdIndex":32}}`, map[string]string{"/foo_dir/foo2": "bar2", "/foo_dir/bar_dir/bar1": "a", "/foo_dir/bar_dir/bar2": "b", "/foo_dir/bar_dir2/bar3": "c", "/foo_dir/foo6": "newbar2", "/foo_dir/foo3": "newbar2", "/foo_dir/foo": "newbar"}}, + } { + out := parser(test.body) + if len(test.expected) != len(out) { + t.Fatalf("want result length %v, get %v", len(test.expected), len(out)) + } + for key, _ := range out { + if test.expected[key] != out[key] { + t.Fatalf("expected result error!, want %v get %v", test.expected[key], out[key]) + } + } + } +} From 4623618e6b6b844fec5fbd7f99f6f81ad25a2f3e Mon Sep 17 00:00:00 2001 From: yangzhouhan Date: Wed, 5 Aug 2015 16:04:56 -0700 Subject: [PATCH 02/17] add an etcd client for naming resolution --- etcdnaming/etcdnaming.go | 191 +++++++++++++++++++++++----------- etcdnaming/etcdnaming_test.go | 29 ------ 2 files changed, 131 insertions(+), 89 deletions(-) delete mode 100644 etcdnaming/etcdnaming_test.go diff --git a/etcdnaming/etcdnaming.go b/etcdnaming/etcdnaming.go index 10727f90..76b7da10 100644 --- a/etcdnaming/etcdnaming.go +++ b/etcdnaming/etcdnaming.go @@ -1,73 +1,144 @@ package etcdnaming import ( - "io/ioutil" - "net/http" - "strings" + "fmt" + "golang.org/x/net/context" + "sync" + + etcdcl "github.com/coreos/etcd/client" ) -func parser(body string) map[string]string { - if body == "" { - return nil - } - res := make(map[string]string) - str := strings.Replace(body, "nodes", "Nodes", -1) - subStr := strings.Split(str, "node") - for _, entry := range subStr { - i := 0 - var last int - if strings.Contains(entry, "prevNode") { - last = strings.IndexAny(entry, "prevNode") - } else { - last = len(entry) - } - for i < last { - if entry[i] == '{' { - j := i + 1 - subEntry := []byte{} - for j < len(entry) && entry[j] != '{' && entry[j] != '}' { - subEntry = append(subEntry, entry[j]) - j++ - } - if j < len(entry) && entry[j] == '}' { - var ( - key string - value string - ) - for _, val := range strings.Split(string(subEntry), ",") { - if strings.Contains(val, "key") { - key = strings.Replace(strings.SplitN(val, ":", 2)[1], `"`, "", -1) - } - if strings.Contains(val, "value") { - value = strings.Replace(strings.SplitN(val, ":", 2)[1], `"`, "", -1) - } - res[key] = value - - } - } - i = j - } else { - i++ - } - } - } - return res +type namePair struct { + key, value string } -func Etcdnaming(host string, key string, watch bool) map[string]string { - var comm string - if !watch { - comm = "http://" + host + "/v2/keys/" + key + "/?recursive=true" - } else { - comm = "http://" + host + "/v2/keys/" + key + `?wait=true&recursive=true` +// recvBuffer is an unbounded channel of item. +type recvBuffer struct { + c chan *namePair + mu sync.Mutex + stopping bool + backlog []*namePair +} + +func newRecvBuffer() *recvBuffer { + b := &recvBuffer{ + c: make(chan *namePair, 1), } - resp, err := http.Get(comm) + return b +} + +func (b *recvBuffer) put(r *namePair) { + b.mu.Lock() + defer b.mu.Unlock() + b.backlog = append(b.backlog, r) + if !b.stopping { + select { + case b.c <- b.backlog[0]: + b.backlog = b.backlog[1:] + default: + } + } +} + +func (b *recvBuffer) load() { + b.mu.Lock() + defer b.mu.Unlock() + if len(b.backlog) > 0 && !b.stopping { + select { + case b.c <- b.backlog[0]: + b.backlog = b.backlog[1:] + default: + } + } +} + +func (b *recvBuffer) get() <-chan *namePair { + return b.c +} + +func (b *recvBuffer) stop() { + b.mu.Lock() + b.stopping = true + close(b.c) + b.mu.Unlock() +} + +type etcdNR struct { + cfg etcdcl.Config + recv *recvBuffer + ctx context.Context + cancel context.CancelFunc +} + +func NewETCDNR(cfg etcdcl.Config) *etcdNR { + return &etcdNR{ + cfg: cfg, + recv: newRecvBuffer(), + ctx: context.Background(), + } +} + +func getNode(node *etcdcl.Node, res map[string]string) { + if !node.Dir { + res[node.Key] = node.Value + return + } + for _, val := range node.Nodes { + getNode(val, res) + } +} + +func (nr *etcdNR) Get(target string) map[string]string { + cfg := nr.cfg + c, err := etcdcl.New(cfg) if err != nil { panic(err) } - defer resp.Body.Close() - var body []byte - body, err = ioutil.ReadAll(resp.Body) - res := parser(string(body)) + kAPI := etcdcl.NewKeysAPI(c) + resp, err := kAPI.Get(nr.ctx, target, &etcdcl.GetOptions{Recursive: true, Sort: true}) + if err != nil { + fmt.Printf("non-nil error: %v", err) + } + node := resp.Node + res := make(map[string]string) + getNode(node, res) return res } + +func (nr *etcdNR) Watch(target string) { + cfg := nr.cfg + c, err := etcdcl.New(cfg) + if err != nil { + panic(err) + } + kAPI := etcdcl.NewKeysAPI(c) + watcher := kAPI.Watcher(target, &etcdcl.WatcherOptions{Recursive: true}) + for { + ctx, cancel := context.WithCancel(nr.ctx) + nr.ctx = ctx + nr.cancel = cancel + resp, err := watcher.Next(nr.ctx) + if err != nil { + fmt.Printf("non-nil error: %v", err) + break + } + if resp.Node.Dir { + continue + } + entry := &namePair{key: resp.Node.Key, value: resp.Node.Value} + nr.recv.put(entry) + } +} + +func (nr *etcdNR) GetUpdate() *namePair { + select { + case i := <-nr.recv.get(): + nr.recv.load() + return i + } +} + +func (nr *etcdNR) Stop() { + nr.recv.stop() + nr.cancel() +} diff --git a/etcdnaming/etcdnaming_test.go b/etcdnaming/etcdnaming_test.go deleted file mode 100644 index d3370977..00000000 --- a/etcdnaming/etcdnaming_test.go +++ /dev/null @@ -1,29 +0,0 @@ -package etcdnaming - -import ( - "testing" -) - -func TestEtcdParsing(t *testing.T) { - for _, test := range []struct { - // input - body string - // expected - expected map[string]string - }{ - {"", nil}, - {`{"action":"set","node":{"key":"/foo","value":"bar5","modifiedIndex":30,"createdIndex":30}}`, map[string]string{"/foo": "bar5"}}, - {`{"action":"set","node":{"key":"/foo_dir/foo2","value":"bar2","modifiedIndex":59,"createdIndex":59},"prevNode":{"key":"/foo_dir/foo2","value":"bar","modifiedIndex":58,"createdIndex":58}}`, map[string]string{"/foo_dir/foo2": "bar2"}}, - {`{"action":"get","node":{"key":"/foo_dir","dir":true,"nodes":[{"key":"/foo_dir/foo2","value":"bar2","modifiedIndex":33,"createdIndex":33},{"key":"/foo_dir/bar_dir","dir":true,"nodes":[{"key":"/foo_dir/bar_dir/bar1","value":"a","modifiedIndex":35,"createdIndex":35},{"key":"/foo_dir/bar_dir/bar2","value":"b","modifiedIndex":36,"createdIndex":36}],"modifiedIndex":34,"createdIndex":34},{"key":"/foo_dir/bar_dir2","dir":true,"nodes":[{"key":"/foo_dir/bar_dir2/bar3","value":"c","modifiedIndex":39,"createdIndex":39}],"modifiedIndex":38,"createdIndex":38},{"key":"/foo_dir/foo6","value":"newbar2","modifiedIndex":48,"createdIndex":48},{"key":"/foo_dir/foo3","value":"newbar2","modifiedIndex":50,"createdIndex":50},{"key":"/foo_dir/foo","value":"newbar","modifiedIndex":42,"createdIndex":42}],"modifiedIndex":32,"createdIndex":32}}`, map[string]string{"/foo_dir/foo2": "bar2", "/foo_dir/bar_dir/bar1": "a", "/foo_dir/bar_dir/bar2": "b", "/foo_dir/bar_dir2/bar3": "c", "/foo_dir/foo6": "newbar2", "/foo_dir/foo3": "newbar2", "/foo_dir/foo": "newbar"}}, - } { - out := parser(test.body) - if len(test.expected) != len(out) { - t.Fatalf("want result length %v, get %v", len(test.expected), len(out)) - } - for key, _ := range out { - if test.expected[key] != out[key] { - t.Fatalf("expected result error!, want %v get %v", test.expected[key], out[key]) - } - } - } -} From ab3efa6a5d54d5c79c56e41953de14a34786b4aa Mon Sep 17 00:00:00 2001 From: yangzhouhan Date: Wed, 5 Aug 2015 16:16:18 -0700 Subject: [PATCH 03/17] add nameresolver interface --- etcdnaming/etcdnaming.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/etcdnaming/etcdnaming.go b/etcdnaming/etcdnaming.go index 76b7da10..d6efaeba 100644 --- a/etcdnaming/etcdnaming.go +++ b/etcdnaming/etcdnaming.go @@ -63,6 +63,13 @@ func (b *recvBuffer) stop() { b.mu.Unlock() } +type NameResolver interface { + Get(target string) map[string]string + Watch(target string) + GetUpdate() *namePair + Stop() +} + type etcdNR struct { cfg etcdcl.Config recv *recvBuffer From 6fd3b74078cab7931ac607be3744231cb2dd6ec1 Mon Sep 17 00:00:00 2001 From: yangzhouhan Date: Wed, 5 Aug 2015 16:33:22 -0700 Subject: [PATCH 04/17] move NameResolver interface to clientconn --- clientconn.go | 7 +++++++ etcdnaming/etcdnaming.go | 11 ++--------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/clientconn.go b/clientconn.go index 52d47a57..66392799 100644 --- a/clientconn.go +++ b/clientconn.go @@ -71,6 +71,13 @@ type dialOptions struct { // DialOption configures how we set up the connection. type DialOption func(*dialOptions) +type NameResolver interface { + Get(target string) map[string]string + Watch(target string) + GetUpdate() (key string, val string) + Stop() +} + // WithCodec returns a DialOption which sets a codec for message marshaling and unmarshaling. func WithCodec(c Codec) DialOption { return func(o *dialOptions) { diff --git a/etcdnaming/etcdnaming.go b/etcdnaming/etcdnaming.go index d6efaeba..ea10ed49 100644 --- a/etcdnaming/etcdnaming.go +++ b/etcdnaming/etcdnaming.go @@ -63,13 +63,6 @@ func (b *recvBuffer) stop() { b.mu.Unlock() } -type NameResolver interface { - Get(target string) map[string]string - Watch(target string) - GetUpdate() *namePair - Stop() -} - type etcdNR struct { cfg etcdcl.Config recv *recvBuffer @@ -137,11 +130,11 @@ func (nr *etcdNR) Watch(target string) { } } -func (nr *etcdNR) GetUpdate() *namePair { +func (nr *etcdNR) GetUpdate() (key string, val string) { select { case i := <-nr.recv.get(): nr.recv.load() - return i + return i.key, i.value } } From ea4b8e03106fa8c210e4d896de3e8708661d92d4 Mon Sep 17 00:00:00 2001 From: yangzhouhan Date: Thu, 6 Aug 2015 16:08:24 -0700 Subject: [PATCH 05/17] add comments and revise nameresolver --- clientconn.go | 6 +++- etcdnaming/etcdnaming.go | 63 ++++++++++++++++++++-------------------- 2 files changed, 36 insertions(+), 33 deletions(-) diff --git a/clientconn.go b/clientconn.go index 66392799..1eb26dd5 100644 --- a/clientconn.go +++ b/clientconn.go @@ -72,9 +72,13 @@ type dialOptions struct { type DialOption func(*dialOptions) type NameResolver interface { + // Get returns the map of customized names and the corresponding ip/DNS addresses. Get(target string) map[string]string + // Watch set a watcher on the target directory/key. Watch(target string) - GetUpdate() (key string, val string) + // GetUpdate returns the key and value of the updated namePair of the target directory/key being watched. + GetUpdate() (string, string) + // Stop shut down the watcher. Stop() } diff --git a/etcdnaming/etcdnaming.go b/etcdnaming/etcdnaming.go index ea10ed49..4f4246d9 100644 --- a/etcdnaming/etcdnaming.go +++ b/etcdnaming/etcdnaming.go @@ -12,7 +12,7 @@ type namePair struct { key, value string } -// recvBuffer is an unbounded channel of item. +// recvBuffer is an unbounded channel of *namePair. type recvBuffer struct { c chan *namePair mu sync.Mutex @@ -30,25 +30,27 @@ func newRecvBuffer() *recvBuffer { func (b *recvBuffer) put(r *namePair) { b.mu.Lock() defer b.mu.Unlock() + if b.stopping { + return + } b.backlog = append(b.backlog, r) - if !b.stopping { - select { - case b.c <- b.backlog[0]: - b.backlog = b.backlog[1:] - default: - } + select { + case b.c <- b.backlog[0]: + b.backlog = b.backlog[1:] + default: } } func (b *recvBuffer) load() { b.mu.Lock() defer b.mu.Unlock() - if len(b.backlog) > 0 && !b.stopping { - select { - case b.c <- b.backlog[0]: - b.backlog = b.backlog[1:] - default: - } + if b.stopping || len(b.backlog) == 0 { + return + } + select { + case b.c <- b.backlog[0]: + b.backlog = b.backlog[1:] + default: } } @@ -64,20 +66,28 @@ func (b *recvBuffer) stop() { } type etcdNR struct { - cfg etcdcl.Config + kAPI etcdcl.KeysAPI recv *recvBuffer ctx context.Context cancel context.CancelFunc } func NewETCDNR(cfg etcdcl.Config) *etcdNR { + c, err := etcdcl.New(cfg) + if err != nil { + panic(err) + } + kAPI := etcdcl.NewKeysAPI(c) + ctx, cancel := context.WithCancel(context.Background()) return &etcdNR{ - cfg: cfg, - recv: newRecvBuffer(), - ctx: context.Background(), + kAPI: kAPI, + recv: newRecvBuffer(), + ctx: ctx, + cancel: cancel, } } +// getNode can be called recursively to build the result key-value map func getNode(node *etcdcl.Node, res map[string]string) { if !node.Dir { res[node.Key] = node.Value @@ -89,13 +99,7 @@ func getNode(node *etcdcl.Node, res map[string]string) { } func (nr *etcdNR) Get(target string) map[string]string { - cfg := nr.cfg - c, err := etcdcl.New(cfg) - if err != nil { - panic(err) - } - kAPI := etcdcl.NewKeysAPI(c) - resp, err := kAPI.Get(nr.ctx, target, &etcdcl.GetOptions{Recursive: true, Sort: true}) + resp, err := nr.kAPI.Get(nr.ctx, target, &etcdcl.GetOptions{Recursive: true, Sort: true}) if err != nil { fmt.Printf("non-nil error: %v", err) } @@ -106,13 +110,7 @@ func (nr *etcdNR) Get(target string) map[string]string { } func (nr *etcdNR) Watch(target string) { - cfg := nr.cfg - c, err := etcdcl.New(cfg) - if err != nil { - panic(err) - } - kAPI := etcdcl.NewKeysAPI(c) - watcher := kAPI.Watcher(target, &etcdcl.WatcherOptions{Recursive: true}) + watcher := nr.kAPI.Watcher(target, &etcdcl.WatcherOptions{Recursive: true}) for { ctx, cancel := context.WithCancel(nr.ctx) nr.ctx = ctx @@ -130,10 +128,11 @@ func (nr *etcdNR) Watch(target string) { } } -func (nr *etcdNR) GetUpdate() (key string, val string) { +func (nr *etcdNR) GetUpdate() (string, string) { select { case i := <-nr.recv.get(): nr.recv.load() + // returns key and the corresponding value of the updated namePair return i.key, i.value } } From 5b59d19b70f2df7bf2a3f203b994ea9d69bd335c Mon Sep 17 00:00:00 2001 From: yangzhouhan Date: Thu, 6 Aug 2015 17:39:41 -0700 Subject: [PATCH 06/17] add comment for nameresolver --- clientconn.go | 1 + 1 file changed, 1 insertion(+) diff --git a/clientconn.go b/clientconn.go index 1eb26dd5..058ec48a 100644 --- a/clientconn.go +++ b/clientconn.go @@ -71,6 +71,7 @@ type dialOptions struct { // DialOption configures how we set up the connection. type DialOption func(*dialOptions) +// NameResolver defines the functions needed to resolve a customized names. type NameResolver interface { // Get returns the map of customized names and the corresponding ip/DNS addresses. Get(target string) map[string]string From 83e023520cfa7a01eaeb157ee3350e68e0728162 Mon Sep 17 00:00:00 2001 From: yangzhouhan Date: Thu, 6 Aug 2015 19:10:28 -0700 Subject: [PATCH 07/17] change namepair to kv and update the comments --- clientconn.go | 10 +++++----- etcdnaming/etcdnaming.go | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/clientconn.go b/clientconn.go index 058ec48a..8a45185b 100644 --- a/clientconn.go +++ b/clientconn.go @@ -71,15 +71,15 @@ type dialOptions struct { // DialOption configures how we set up the connection. type DialOption func(*dialOptions) -// NameResolver defines the functions needed to resolve a customized names. +// NameResolver dose name resolution and watches for the resolution changes. type NameResolver interface { - // Get returns the map of customized names and the corresponding ip/DNS addresses. + // Get gets a snapshot of the current name resolution results for target. Get(target string) map[string]string - // Watch set a watcher on the target directory/key. + // Watch watches for the name resolution changes on target. It blocks until Stop() is invoked. The watch results are obtained via GetUpdate(). Watch(target string) - // GetUpdate returns the key and value of the updated namePair of the target directory/key being watched. + // GetUpdate returns a name resolution change when watch is triggered. It blocks until it observes a change. The caller needs to call it again to get the next change. GetUpdate() (string, string) - // Stop shut down the watcher. + // Stop shuts down the NameResolver. Stop() } diff --git a/etcdnaming/etcdnaming.go b/etcdnaming/etcdnaming.go index 4f4246d9..1ea99de4 100644 --- a/etcdnaming/etcdnaming.go +++ b/etcdnaming/etcdnaming.go @@ -2,10 +2,10 @@ package etcdnaming import ( "fmt" - "golang.org/x/net/context" "sync" etcdcl "github.com/coreos/etcd/client" + "golang.org/x/net/context" ) type namePair struct { From 80b71f410f54f049f68cead83126446ac92794c1 Mon Sep 17 00:00:00 2001 From: yangzhouhan Date: Thu, 6 Aug 2015 19:15:11 -0700 Subject: [PATCH 08/17] small fix --- etcdnaming/etcdnaming.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/etcdnaming/etcdnaming.go b/etcdnaming/etcdnaming.go index 1ea99de4..976f774d 100644 --- a/etcdnaming/etcdnaming.go +++ b/etcdnaming/etcdnaming.go @@ -8,26 +8,26 @@ import ( "golang.org/x/net/context" ) -type namePair struct { +type kv struct { key, value string } -// recvBuffer is an unbounded channel of *namePair. +// recvBuffer is an unbounded channel of *kv. type recvBuffer struct { - c chan *namePair + c chan *kv mu sync.Mutex stopping bool - backlog []*namePair + backlog []*kv } func newRecvBuffer() *recvBuffer { b := &recvBuffer{ - c: make(chan *namePair, 1), + c: make(chan *kv, 1), } return b } -func (b *recvBuffer) put(r *namePair) { +func (b *recvBuffer) put(r *kv) { b.mu.Lock() defer b.mu.Unlock() if b.stopping { @@ -54,7 +54,7 @@ func (b *recvBuffer) load() { } } -func (b *recvBuffer) get() <-chan *namePair { +func (b *recvBuffer) get() <-chan *kv { return b.c } @@ -123,7 +123,7 @@ func (nr *etcdNR) Watch(target string) { if resp.Node.Dir { continue } - entry := &namePair{key: resp.Node.Key, value: resp.Node.Value} + entry := &kv{key: resp.Node.Key, value: resp.Node.Value} nr.recv.put(entry) } } @@ -131,8 +131,8 @@ func (nr *etcdNR) Watch(target string) { func (nr *etcdNR) GetUpdate() (string, string) { select { case i := <-nr.recv.get(): - nr.recv.load() - // returns key and the corresponding value of the updated namePair + nr.recv.load()string + // returns key and the corresponding value of the updated kv return i.key, i.value } } From 3ecd7498150ead87ea6af5f924aa26145d3f24d4 Mon Sep 17 00:00:00 2001 From: yangzhouhan Date: Fri, 7 Aug 2015 14:31:31 -0700 Subject: [PATCH 09/17] small modification --- etcdnaming/etcdnaming.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/etcdnaming/etcdnaming.go b/etcdnaming/etcdnaming.go index 976f774d..5ab24a46 100644 --- a/etcdnaming/etcdnaming.go +++ b/etcdnaming/etcdnaming.go @@ -72,6 +72,7 @@ type etcdNR struct { cancel context.CancelFunc } +// NewETCDNR creates a etcd nameResolver func NewETCDNR(cfg etcdcl.Config) *etcdNR { c, err := etcdcl.New(cfg) if err != nil { @@ -87,7 +88,7 @@ func NewETCDNR(cfg etcdcl.Config) *etcdNR { } } -// getNode can be called recursively to build the result key-value map +// getNode builds the resulting key-value map starting from node recursively func getNode(node *etcdcl.Node, res map[string]string) { if !node.Dir { res[node.Key] = node.Value @@ -103,9 +104,8 @@ func (nr *etcdNR) Get(target string) map[string]string { if err != nil { fmt.Printf("non-nil error: %v", err) } - node := resp.Node res := make(map[string]string) - getNode(node, res) + getNode(resp.Node, res) return res } @@ -131,7 +131,7 @@ func (nr *etcdNR) Watch(target string) { func (nr *etcdNR) GetUpdate() (string, string) { select { case i := <-nr.recv.get(): - nr.recv.load()string + nr.recv.load() // returns key and the corresponding value of the updated kv return i.key, i.value } From 813dca6ff726419628f98d262a56bc4a3b1fefb0 Mon Sep 17 00:00:00 2001 From: yangzhouhan Date: Fri, 7 Aug 2015 14:33:01 -0700 Subject: [PATCH 10/17] small modification --- etcdnaming/etcdnaming.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/etcdnaming/etcdnaming.go b/etcdnaming/etcdnaming.go index 5ab24a46..59e01c13 100644 --- a/etcdnaming/etcdnaming.go +++ b/etcdnaming/etcdnaming.go @@ -72,7 +72,7 @@ type etcdNR struct { cancel context.CancelFunc } -// NewETCDNR creates a etcd nameResolver +// NewETCDNR creates an etcd NameResolver func NewETCDNR(cfg etcdcl.Config) *etcdNR { c, err := etcdcl.New(cfg) if err != nil { From 2e3fbc4f8f3789057f412ef7df071d9ee73307fc Mon Sep 17 00:00:00 2001 From: yangzhouhan Date: Fri, 7 Aug 2015 17:12:15 -0700 Subject: [PATCH 11/17] change the name resolver interface --- clientconn.go | 12 ------------ etcdnaming/etcdnaming.go => naming/etcd/etcd.go | 14 ++++++++++++-- naming/naming.go | 14 ++++++++++++++ 3 files changed, 26 insertions(+), 14 deletions(-) rename etcdnaming/etcdnaming.go => naming/etcd/etcd.go (92%) create mode 100644 naming/naming.go diff --git a/clientconn.go b/clientconn.go index 8a45185b..52d47a57 100644 --- a/clientconn.go +++ b/clientconn.go @@ -71,18 +71,6 @@ type dialOptions struct { // DialOption configures how we set up the connection. type DialOption func(*dialOptions) -// NameResolver dose name resolution and watches for the resolution changes. -type NameResolver interface { - // Get gets a snapshot of the current name resolution results for target. - Get(target string) map[string]string - // Watch watches for the name resolution changes on target. It blocks until Stop() is invoked. The watch results are obtained via GetUpdate(). - Watch(target string) - // GetUpdate returns a name resolution change when watch is triggered. It blocks until it observes a change. The caller needs to call it again to get the next change. - GetUpdate() (string, string) - // Stop shuts down the NameResolver. - Stop() -} - // WithCodec returns a DialOption which sets a codec for message marshaling and unmarshaling. func WithCodec(c Codec) DialOption { return func(o *dialOptions) { diff --git a/etcdnaming/etcdnaming.go b/naming/etcd/etcd.go similarity index 92% rename from etcdnaming/etcdnaming.go rename to naming/etcd/etcd.go index 59e01c13..838c2377 100644 --- a/etcdnaming/etcdnaming.go +++ b/naming/etcd/etcd.go @@ -1,4 +1,4 @@ -package etcdnaming +package etcd import ( "fmt" @@ -6,6 +6,7 @@ import ( etcdcl "github.com/coreos/etcd/client" "golang.org/x/net/context" + "google.golang.org/grpc/naming" ) type kv struct { @@ -73,7 +74,7 @@ type etcdNR struct { } // NewETCDNR creates an etcd NameResolver -func NewETCDNR(cfg etcdcl.Config) *etcdNR { +func NewETCDNR(cfg etcdcl.Config) naming.Resolver { c, err := etcdcl.New(cfg) if err != nil { panic(err) @@ -106,6 +107,10 @@ func (nr *etcdNR) Get(target string) map[string]string { } res := make(map[string]string) getNode(resp.Node, res) + + for k,v := range res { + fmt.Println("key is :",k,"value is :",v)} + return res } @@ -132,6 +137,9 @@ func (nr *etcdNR) GetUpdate() (string, string) { select { case i := <-nr.recv.get(): nr.recv.load() + if i == nil { + return "","" + } // returns key and the corresponding value of the updated kv return i.key, i.value } @@ -141,3 +149,5 @@ func (nr *etcdNR) Stop() { nr.recv.stop() nr.cancel() } + + diff --git a/naming/naming.go b/naming/naming.go new file mode 100644 index 00000000..bba0f52a --- /dev/null +++ b/naming/naming.go @@ -0,0 +1,14 @@ +package naming; + +// Resolver dose name resolution and watches for the resolution changes. +type Resolver interface { + // Get gets a snapshot of the current name resolution results for target. + Get(target string) map[string]string + // Watch watches for the name resolution changes on target. It blocks until Stop() is invoked. The watch results are obtained via GetUpdate(). + Watch(target string) + // GetUpdate returns a name resolution change when watch is triggered. It blocks until it observes a change. The caller needs to call it again to get the next change. + GetUpdate() (string, string) + // Stop shuts down the NameResolver. + Stop() +} + From 6970805cc5b338bb1c9c78287408b243b5e478c7 Mon Sep 17 00:00:00 2001 From: yangzhouhan Date: Fri, 7 Aug 2015 17:15:27 -0700 Subject: [PATCH 12/17] gofmt --- naming/etcd/etcd.go | 10 ++-------- naming/naming.go | 3 +-- 2 files changed, 3 insertions(+), 10 deletions(-) diff --git a/naming/etcd/etcd.go b/naming/etcd/etcd.go index 838c2377..65e5c425 100644 --- a/naming/etcd/etcd.go +++ b/naming/etcd/etcd.go @@ -107,10 +107,6 @@ func (nr *etcdNR) Get(target string) map[string]string { } res := make(map[string]string) getNode(resp.Node, res) - - for k,v := range res { - fmt.Println("key is :",k,"value is :",v)} - return res } @@ -138,8 +134,8 @@ func (nr *etcdNR) GetUpdate() (string, string) { case i := <-nr.recv.get(): nr.recv.load() if i == nil { - return "","" - } + return "", "" + } // returns key and the corresponding value of the updated kv return i.key, i.value } @@ -149,5 +145,3 @@ func (nr *etcdNR) Stop() { nr.recv.stop() nr.cancel() } - - diff --git a/naming/naming.go b/naming/naming.go index bba0f52a..a6a319f7 100644 --- a/naming/naming.go +++ b/naming/naming.go @@ -1,4 +1,4 @@ -package naming; +package naming // Resolver dose name resolution and watches for the resolution changes. type Resolver interface { @@ -11,4 +11,3 @@ type Resolver interface { // Stop shuts down the NameResolver. Stop() } - From 2911d760f47e2dee969f7bf28e16445e3da0dbb2 Mon Sep 17 00:00:00 2001 From: yangzhouhan Date: Fri, 7 Aug 2015 18:07:57 -0700 Subject: [PATCH 13/17] minor modification --- naming/etcd/etcd.go | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/naming/etcd/etcd.go b/naming/etcd/etcd.go index 65e5c425..e5f66154 100644 --- a/naming/etcd/etcd.go +++ b/naming/etcd/etcd.go @@ -1,8 +1,8 @@ -package etcd +package etcd import ( - "fmt" "sync" + "log" etcdcl "github.com/coreos/etcd/client" "golang.org/x/net/context" @@ -77,7 +77,8 @@ type etcdNR struct { func NewETCDNR(cfg etcdcl.Config) naming.Resolver { c, err := etcdcl.New(cfg) if err != nil { - panic(err) + log.Fatalf("NewETCDNR() failed: %v",err) + return nil } kAPI := etcdcl.NewKeysAPI(c) ctx, cancel := context.WithCancel(context.Background()) @@ -103,7 +104,8 @@ func getNode(node *etcdcl.Node, res map[string]string) { func (nr *etcdNR) Get(target string) map[string]string { resp, err := nr.kAPI.Get(nr.ctx, target, &etcdcl.GetOptions{Recursive: true, Sort: true}) if err != nil { - fmt.Printf("non-nil error: %v", err) + log.Fatalf("etcdNR.Get(_) failed: %v", err) + return nil } res := make(map[string]string) getNode(resp.Node, res) @@ -113,12 +115,9 @@ func (nr *etcdNR) Get(target string) map[string]string { func (nr *etcdNR) Watch(target string) { watcher := nr.kAPI.Watcher(target, &etcdcl.WatcherOptions{Recursive: true}) for { - ctx, cancel := context.WithCancel(nr.ctx) - nr.ctx = ctx - nr.cancel = cancel resp, err := watcher.Next(nr.ctx) if err != nil { - fmt.Printf("non-nil error: %v", err) + log.Printf("etcdNR.Watch() stopped: %v", err) break } if resp.Node.Dir { @@ -130,15 +129,14 @@ func (nr *etcdNR) Watch(target string) { } func (nr *etcdNR) GetUpdate() (string, string) { - select { - case i := <-nr.recv.get(): + i := <-nr.recv.get() nr.recv.load() if i == nil { return "", "" } // returns key and the corresponding value of the updated kv return i.key, i.value - } + } func (nr *etcdNR) Stop() { From 86b5db2f228fb307722fc4b0751c6627a3d1d92b Mon Sep 17 00:00:00 2001 From: yangzhouhan Date: Mon, 10 Aug 2015 11:01:15 -0700 Subject: [PATCH 14/17] gofmt --- naming/etcd/etcd.go | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/naming/etcd/etcd.go b/naming/etcd/etcd.go index e5f66154..4830ecb3 100644 --- a/naming/etcd/etcd.go +++ b/naming/etcd/etcd.go @@ -1,8 +1,8 @@ -package etcd +package etcd import ( - "sync" "log" + "sync" etcdcl "github.com/coreos/etcd/client" "golang.org/x/net/context" @@ -77,7 +77,7 @@ type etcdNR struct { func NewETCDNR(cfg etcdcl.Config) naming.Resolver { c, err := etcdcl.New(cfg) if err != nil { - log.Fatalf("NewETCDNR() failed: %v",err) + log.Fatalf("NewETCDNR(_) failed: %v", err) return nil } kAPI := etcdcl.NewKeysAPI(c) @@ -104,7 +104,7 @@ func getNode(node *etcdcl.Node, res map[string]string) { func (nr *etcdNR) Get(target string) map[string]string { resp, err := nr.kAPI.Get(nr.ctx, target, &etcdcl.GetOptions{Recursive: true, Sort: true}) if err != nil { - log.Fatalf("etcdNR.Get(_) failed: %v", err) + log.Printf("etcdNR.Get(_) stopped: %v", err) return nil } res := make(map[string]string) @@ -117,7 +117,7 @@ func (nr *etcdNR) Watch(target string) { for { resp, err := watcher.Next(nr.ctx) if err != nil { - log.Printf("etcdNR.Watch() stopped: %v", err) + log.Printf("etcdNR.Watch(_) stopped: %v", err) break } if resp.Node.Dir { @@ -130,12 +130,12 @@ func (nr *etcdNR) Watch(target string) { func (nr *etcdNR) GetUpdate() (string, string) { i := <-nr.recv.get() - nr.recv.load() - if i == nil { - return "", "" - } - // returns key and the corresponding value of the updated kv - return i.key, i.value + nr.recv.load() + if i == nil { + return "", "" + } + // returns key and the corresponding value of the updated kv pair + return i.key, i.value } From 15755ea6929a02b2bbd49a40db5cc41e1d73f91e Mon Sep 17 00:00:00 2001 From: yangzhouhan Date: Mon, 10 Aug 2015 11:03:11 -0700 Subject: [PATCH 15/17] fix the comment --- naming/etcd/etcd.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/naming/etcd/etcd.go b/naming/etcd/etcd.go index 4830ecb3..e9b60107 100644 --- a/naming/etcd/etcd.go +++ b/naming/etcd/etcd.go @@ -13,7 +13,7 @@ type kv struct { key, value string } -// recvBuffer is an unbounded channel of *kv. +// recvBuffer is an unbounded channel of *kv to record all the pending changes from etcd server. type recvBuffer struct { c chan *kv mu sync.Mutex From 22561bcdfca868047c37d4d3924c8f6bce08b8a2 Mon Sep 17 00:00:00 2001 From: yangzhouhan Date: Mon, 10 Aug 2015 11:17:24 -0700 Subject: [PATCH 16/17] return error when create etcdNR fails --- naming/etcd/etcd.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/naming/etcd/etcd.go b/naming/etcd/etcd.go index e9b60107..f9ee2759 100644 --- a/naming/etcd/etcd.go +++ b/naming/etcd/etcd.go @@ -73,12 +73,11 @@ type etcdNR struct { cancel context.CancelFunc } -// NewETCDNR creates an etcd NameResolver -func NewETCDNR(cfg etcdcl.Config) naming.Resolver { +// NewETCDNR creates an etcd NameResolver. +func NewETCDNR(cfg etcdcl.Config) (naming.Resolver, error) { c, err := etcdcl.New(cfg) if err != nil { - log.Fatalf("NewETCDNR(_) failed: %v", err) - return nil + return nil, err } kAPI := etcdcl.NewKeysAPI(c) ctx, cancel := context.WithCancel(context.Background()) @@ -87,10 +86,10 @@ func NewETCDNR(cfg etcdcl.Config) naming.Resolver { recv: newRecvBuffer(), ctx: ctx, cancel: cancel, - } + }, nil } -// getNode builds the resulting key-value map starting from node recursively +// getNode builds the resulting key-value map starting from node recursively. func getNode(node *etcdcl.Node, res map[string]string) { if !node.Dir { res[node.Key] = node.Value From 95389ca819ec2cf05b1b4273e4400d496f07a04a Mon Sep 17 00:00:00 2001 From: yangzhouhan Date: Mon, 10 Aug 2015 11:19:08 -0700 Subject: [PATCH 17/17] add a comment --- naming/etcd/etcd.go | 1 + 1 file changed, 1 insertion(+) diff --git a/naming/etcd/etcd.go b/naming/etcd/etcd.go index f9ee2759..915e2271 100644 --- a/naming/etcd/etcd.go +++ b/naming/etcd/etcd.go @@ -59,6 +59,7 @@ func (b *recvBuffer) get() <-chan *kv { return b.c } +// stop terminates the recvBuffer. After it is called, the recvBuffer is not usable any more. func (b *recvBuffer) stop() { b.mu.Lock() b.stopping = true