diff --git a/etcdnaming/etcdnaming.go b/etcdnaming/etcdnaming.go index 10727f90..76b7da10 100644 --- a/etcdnaming/etcdnaming.go +++ b/etcdnaming/etcdnaming.go @@ -1,73 +1,144 @@ package etcdnaming import ( - "io/ioutil" - "net/http" - "strings" + "fmt" + "golang.org/x/net/context" + "sync" + + etcdcl "github.com/coreos/etcd/client" ) -func parser(body string) map[string]string { - if body == "" { - return nil - } - res := make(map[string]string) - str := strings.Replace(body, "nodes", "Nodes", -1) - subStr := strings.Split(str, "node") - for _, entry := range subStr { - i := 0 - var last int - if strings.Contains(entry, "prevNode") { - last = strings.IndexAny(entry, "prevNode") - } else { - last = len(entry) - } - for i < last { - if entry[i] == '{' { - j := i + 1 - subEntry := []byte{} - for j < len(entry) && entry[j] != '{' && entry[j] != '}' { - subEntry = append(subEntry, entry[j]) - j++ - } - if j < len(entry) && entry[j] == '}' { - var ( - key string - value string - ) - for _, val := range strings.Split(string(subEntry), ",") { - if strings.Contains(val, "key") { - key = strings.Replace(strings.SplitN(val, ":", 2)[1], `"`, "", -1) - } - if strings.Contains(val, "value") { - value = strings.Replace(strings.SplitN(val, ":", 2)[1], `"`, "", -1) - } - res[key] = value - - } - } - i = j - } else { - i++ - } - } - } - return res +type namePair struct { + key, value string } -func Etcdnaming(host string, key string, watch bool) map[string]string { - var comm string - if !watch { - comm = "http://" + host + "/v2/keys/" + key + "/?recursive=true" - } else { - comm = "http://" + host + "/v2/keys/" + key + `?wait=true&recursive=true` +// recvBuffer is an unbounded channel of item. +type recvBuffer struct { + c chan *namePair + mu sync.Mutex + stopping bool + backlog []*namePair +} + +func newRecvBuffer() *recvBuffer { + b := &recvBuffer{ + c: make(chan *namePair, 1), } - resp, err := http.Get(comm) + return b +} + +func (b *recvBuffer) put(r *namePair) { + b.mu.Lock() + defer b.mu.Unlock() + b.backlog = append(b.backlog, r) + if !b.stopping { + select { + case b.c <- b.backlog[0]: + b.backlog = b.backlog[1:] + default: + } + } +} + +func (b *recvBuffer) load() { + b.mu.Lock() + defer b.mu.Unlock() + if len(b.backlog) > 0 && !b.stopping { + select { + case b.c <- b.backlog[0]: + b.backlog = b.backlog[1:] + default: + } + } +} + +func (b *recvBuffer) get() <-chan *namePair { + return b.c +} + +func (b *recvBuffer) stop() { + b.mu.Lock() + b.stopping = true + close(b.c) + b.mu.Unlock() +} + +type etcdNR struct { + cfg etcdcl.Config + recv *recvBuffer + ctx context.Context + cancel context.CancelFunc +} + +func NewETCDNR(cfg etcdcl.Config) *etcdNR { + return &etcdNR{ + cfg: cfg, + recv: newRecvBuffer(), + ctx: context.Background(), + } +} + +func getNode(node *etcdcl.Node, res map[string]string) { + if !node.Dir { + res[node.Key] = node.Value + return + } + for _, val := range node.Nodes { + getNode(val, res) + } +} + +func (nr *etcdNR) Get(target string) map[string]string { + cfg := nr.cfg + c, err := etcdcl.New(cfg) if err != nil { panic(err) } - defer resp.Body.Close() - var body []byte - body, err = ioutil.ReadAll(resp.Body) - res := parser(string(body)) + kAPI := etcdcl.NewKeysAPI(c) + resp, err := kAPI.Get(nr.ctx, target, &etcdcl.GetOptions{Recursive: true, Sort: true}) + if err != nil { + fmt.Printf("non-nil error: %v", err) + } + node := resp.Node + res := make(map[string]string) + getNode(node, res) return res } + +func (nr *etcdNR) Watch(target string) { + cfg := nr.cfg + c, err := etcdcl.New(cfg) + if err != nil { + panic(err) + } + kAPI := etcdcl.NewKeysAPI(c) + watcher := kAPI.Watcher(target, &etcdcl.WatcherOptions{Recursive: true}) + for { + ctx, cancel := context.WithCancel(nr.ctx) + nr.ctx = ctx + nr.cancel = cancel + resp, err := watcher.Next(nr.ctx) + if err != nil { + fmt.Printf("non-nil error: %v", err) + break + } + if resp.Node.Dir { + continue + } + entry := &namePair{key: resp.Node.Key, value: resp.Node.Value} + nr.recv.put(entry) + } +} + +func (nr *etcdNR) GetUpdate() *namePair { + select { + case i := <-nr.recv.get(): + nr.recv.load() + return i + } +} + +func (nr *etcdNR) Stop() { + nr.recv.stop() + nr.cancel() +} diff --git a/etcdnaming/etcdnaming_test.go b/etcdnaming/etcdnaming_test.go deleted file mode 100644 index d3370977..00000000 --- a/etcdnaming/etcdnaming_test.go +++ /dev/null @@ -1,29 +0,0 @@ -package etcdnaming - -import ( - "testing" -) - -func TestEtcdParsing(t *testing.T) { - for _, test := range []struct { - // input - body string - // expected - expected map[string]string - }{ - {"", nil}, - {`{"action":"set","node":{"key":"/foo","value":"bar5","modifiedIndex":30,"createdIndex":30}}`, map[string]string{"/foo": "bar5"}}, - {`{"action":"set","node":{"key":"/foo_dir/foo2","value":"bar2","modifiedIndex":59,"createdIndex":59},"prevNode":{"key":"/foo_dir/foo2","value":"bar","modifiedIndex":58,"createdIndex":58}}`, map[string]string{"/foo_dir/foo2": "bar2"}}, - {`{"action":"get","node":{"key":"/foo_dir","dir":true,"nodes":[{"key":"/foo_dir/foo2","value":"bar2","modifiedIndex":33,"createdIndex":33},{"key":"/foo_dir/bar_dir","dir":true,"nodes":[{"key":"/foo_dir/bar_dir/bar1","value":"a","modifiedIndex":35,"createdIndex":35},{"key":"/foo_dir/bar_dir/bar2","value":"b","modifiedIndex":36,"createdIndex":36}],"modifiedIndex":34,"createdIndex":34},{"key":"/foo_dir/bar_dir2","dir":true,"nodes":[{"key":"/foo_dir/bar_dir2/bar3","value":"c","modifiedIndex":39,"createdIndex":39}],"modifiedIndex":38,"createdIndex":38},{"key":"/foo_dir/foo6","value":"newbar2","modifiedIndex":48,"createdIndex":48},{"key":"/foo_dir/foo3","value":"newbar2","modifiedIndex":50,"createdIndex":50},{"key":"/foo_dir/foo","value":"newbar","modifiedIndex":42,"createdIndex":42}],"modifiedIndex":32,"createdIndex":32}}`, map[string]string{"/foo_dir/foo2": "bar2", "/foo_dir/bar_dir/bar1": "a", "/foo_dir/bar_dir/bar2": "b", "/foo_dir/bar_dir2/bar3": "c", "/foo_dir/foo6": "newbar2", "/foo_dir/foo3": "newbar2", "/foo_dir/foo": "newbar"}}, - } { - out := parser(test.body) - if len(test.expected) != len(out) { - t.Fatalf("want result length %v, get %v", len(test.expected), len(out)) - } - for key, _ := range out { - if test.expected[key] != out[key] { - t.Fatalf("expected result error!, want %v get %v", test.expected[key], out[key]) - } - } - } -}