add an etcd client for naming resolution
This commit is contained in:
@ -1,73 +1,144 @@
|
|||||||
package etcdnaming
|
package etcdnaming
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"io/ioutil"
|
"fmt"
|
||||||
"net/http"
|
"golang.org/x/net/context"
|
||||||
"strings"
|
"sync"
|
||||||
|
|
||||||
|
etcdcl "github.com/coreos/etcd/client"
|
||||||
)
|
)
|
||||||
|
|
||||||
func parser(body string) map[string]string {
|
type namePair struct {
|
||||||
if body == "" {
|
key, value string
|
||||||
return nil
|
|
||||||
}
|
|
||||||
res := make(map[string]string)
|
|
||||||
str := strings.Replace(body, "nodes", "Nodes", -1)
|
|
||||||
subStr := strings.Split(str, "node")
|
|
||||||
for _, entry := range subStr {
|
|
||||||
i := 0
|
|
||||||
var last int
|
|
||||||
if strings.Contains(entry, "prevNode") {
|
|
||||||
last = strings.IndexAny(entry, "prevNode")
|
|
||||||
} else {
|
|
||||||
last = len(entry)
|
|
||||||
}
|
|
||||||
for i < last {
|
|
||||||
if entry[i] == '{' {
|
|
||||||
j := i + 1
|
|
||||||
subEntry := []byte{}
|
|
||||||
for j < len(entry) && entry[j] != '{' && entry[j] != '}' {
|
|
||||||
subEntry = append(subEntry, entry[j])
|
|
||||||
j++
|
|
||||||
}
|
|
||||||
if j < len(entry) && entry[j] == '}' {
|
|
||||||
var (
|
|
||||||
key string
|
|
||||||
value string
|
|
||||||
)
|
|
||||||
for _, val := range strings.Split(string(subEntry), ",") {
|
|
||||||
if strings.Contains(val, "key") {
|
|
||||||
key = strings.Replace(strings.SplitN(val, ":", 2)[1], `"`, "", -1)
|
|
||||||
}
|
|
||||||
if strings.Contains(val, "value") {
|
|
||||||
value = strings.Replace(strings.SplitN(val, ":", 2)[1], `"`, "", -1)
|
|
||||||
}
|
|
||||||
res[key] = value
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
i = j
|
|
||||||
} else {
|
|
||||||
i++
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return res
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func Etcdnaming(host string, key string, watch bool) map[string]string {
|
// recvBuffer is an unbounded channel of item.
|
||||||
var comm string
|
type recvBuffer struct {
|
||||||
if !watch {
|
c chan *namePair
|
||||||
comm = "http://" + host + "/v2/keys/" + key + "/?recursive=true"
|
mu sync.Mutex
|
||||||
} else {
|
stopping bool
|
||||||
comm = "http://" + host + "/v2/keys/" + key + `?wait=true&recursive=true`
|
backlog []*namePair
|
||||||
|
}
|
||||||
|
|
||||||
|
func newRecvBuffer() *recvBuffer {
|
||||||
|
b := &recvBuffer{
|
||||||
|
c: make(chan *namePair, 1),
|
||||||
}
|
}
|
||||||
resp, err := http.Get(comm)
|
return b
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *recvBuffer) put(r *namePair) {
|
||||||
|
b.mu.Lock()
|
||||||
|
defer b.mu.Unlock()
|
||||||
|
b.backlog = append(b.backlog, r)
|
||||||
|
if !b.stopping {
|
||||||
|
select {
|
||||||
|
case b.c <- b.backlog[0]:
|
||||||
|
b.backlog = b.backlog[1:]
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *recvBuffer) load() {
|
||||||
|
b.mu.Lock()
|
||||||
|
defer b.mu.Unlock()
|
||||||
|
if len(b.backlog) > 0 && !b.stopping {
|
||||||
|
select {
|
||||||
|
case b.c <- b.backlog[0]:
|
||||||
|
b.backlog = b.backlog[1:]
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *recvBuffer) get() <-chan *namePair {
|
||||||
|
return b.c
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *recvBuffer) stop() {
|
||||||
|
b.mu.Lock()
|
||||||
|
b.stopping = true
|
||||||
|
close(b.c)
|
||||||
|
b.mu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
type etcdNR struct {
|
||||||
|
cfg etcdcl.Config
|
||||||
|
recv *recvBuffer
|
||||||
|
ctx context.Context
|
||||||
|
cancel context.CancelFunc
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewETCDNR(cfg etcdcl.Config) *etcdNR {
|
||||||
|
return &etcdNR{
|
||||||
|
cfg: cfg,
|
||||||
|
recv: newRecvBuffer(),
|
||||||
|
ctx: context.Background(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func getNode(node *etcdcl.Node, res map[string]string) {
|
||||||
|
if !node.Dir {
|
||||||
|
res[node.Key] = node.Value
|
||||||
|
return
|
||||||
|
}
|
||||||
|
for _, val := range node.Nodes {
|
||||||
|
getNode(val, res)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (nr *etcdNR) Get(target string) map[string]string {
|
||||||
|
cfg := nr.cfg
|
||||||
|
c, err := etcdcl.New(cfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
kAPI := etcdcl.NewKeysAPI(c)
|
||||||
var body []byte
|
resp, err := kAPI.Get(nr.ctx, target, &etcdcl.GetOptions{Recursive: true, Sort: true})
|
||||||
body, err = ioutil.ReadAll(resp.Body)
|
if err != nil {
|
||||||
res := parser(string(body))
|
fmt.Printf("non-nil error: %v", err)
|
||||||
|
}
|
||||||
|
node := resp.Node
|
||||||
|
res := make(map[string]string)
|
||||||
|
getNode(node, res)
|
||||||
return res
|
return res
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (nr *etcdNR) Watch(target string) {
|
||||||
|
cfg := nr.cfg
|
||||||
|
c, err := etcdcl.New(cfg)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
kAPI := etcdcl.NewKeysAPI(c)
|
||||||
|
watcher := kAPI.Watcher(target, &etcdcl.WatcherOptions{Recursive: true})
|
||||||
|
for {
|
||||||
|
ctx, cancel := context.WithCancel(nr.ctx)
|
||||||
|
nr.ctx = ctx
|
||||||
|
nr.cancel = cancel
|
||||||
|
resp, err := watcher.Next(nr.ctx)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Printf("non-nil error: %v", err)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if resp.Node.Dir {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
entry := &namePair{key: resp.Node.Key, value: resp.Node.Value}
|
||||||
|
nr.recv.put(entry)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (nr *etcdNR) GetUpdate() *namePair {
|
||||||
|
select {
|
||||||
|
case i := <-nr.recv.get():
|
||||||
|
nr.recv.load()
|
||||||
|
return i
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (nr *etcdNR) Stop() {
|
||||||
|
nr.recv.stop()
|
||||||
|
nr.cancel()
|
||||||
|
}
|
||||||
|
@ -1,29 +0,0 @@
|
|||||||
package etcdnaming
|
|
||||||
|
|
||||||
import (
|
|
||||||
"testing"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestEtcdParsing(t *testing.T) {
|
|
||||||
for _, test := range []struct {
|
|
||||||
// input
|
|
||||||
body string
|
|
||||||
// expected
|
|
||||||
expected map[string]string
|
|
||||||
}{
|
|
||||||
{"", nil},
|
|
||||||
{`{"action":"set","node":{"key":"/foo","value":"bar5","modifiedIndex":30,"createdIndex":30}}`, map[string]string{"/foo": "bar5"}},
|
|
||||||
{`{"action":"set","node":{"key":"/foo_dir/foo2","value":"bar2","modifiedIndex":59,"createdIndex":59},"prevNode":{"key":"/foo_dir/foo2","value":"bar","modifiedIndex":58,"createdIndex":58}}`, map[string]string{"/foo_dir/foo2": "bar2"}},
|
|
||||||
{`{"action":"get","node":{"key":"/foo_dir","dir":true,"nodes":[{"key":"/foo_dir/foo2","value":"bar2","modifiedIndex":33,"createdIndex":33},{"key":"/foo_dir/bar_dir","dir":true,"nodes":[{"key":"/foo_dir/bar_dir/bar1","value":"a","modifiedIndex":35,"createdIndex":35},{"key":"/foo_dir/bar_dir/bar2","value":"b","modifiedIndex":36,"createdIndex":36}],"modifiedIndex":34,"createdIndex":34},{"key":"/foo_dir/bar_dir2","dir":true,"nodes":[{"key":"/foo_dir/bar_dir2/bar3","value":"c","modifiedIndex":39,"createdIndex":39}],"modifiedIndex":38,"createdIndex":38},{"key":"/foo_dir/foo6","value":"newbar2","modifiedIndex":48,"createdIndex":48},{"key":"/foo_dir/foo3","value":"newbar2","modifiedIndex":50,"createdIndex":50},{"key":"/foo_dir/foo","value":"newbar","modifiedIndex":42,"createdIndex":42}],"modifiedIndex":32,"createdIndex":32}}`, map[string]string{"/foo_dir/foo2": "bar2", "/foo_dir/bar_dir/bar1": "a", "/foo_dir/bar_dir/bar2": "b", "/foo_dir/bar_dir2/bar3": "c", "/foo_dir/foo6": "newbar2", "/foo_dir/foo3": "newbar2", "/foo_dir/foo": "newbar"}},
|
|
||||||
} {
|
|
||||||
out := parser(test.body)
|
|
||||||
if len(test.expected) != len(out) {
|
|
||||||
t.Fatalf("want result length %v, get %v", len(test.expected), len(out))
|
|
||||||
}
|
|
||||||
for key, _ := range out {
|
|
||||||
if test.expected[key] != out[key] {
|
|
||||||
t.Fatalf("expected result error!, want %v get %v", test.expected[key], out[key])
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
Reference in New Issue
Block a user