add comments and revise nameresolver

This commit is contained in:
yangzhouhan
2015-08-06 16:08:24 -07:00
parent 6fd3b74078
commit ea4b8e0310
2 changed files with 36 additions and 33 deletions

View File

@ -72,9 +72,13 @@ type dialOptions struct {
type DialOption func(*dialOptions) type DialOption func(*dialOptions)
type NameResolver interface { type NameResolver interface {
// Get returns the map of customized names and the corresponding ip/DNS addresses.
Get(target string) map[string]string Get(target string) map[string]string
// Watch set a watcher on the target directory/key.
Watch(target string) Watch(target string)
GetUpdate() (key string, val string) // GetUpdate returns the key and value of the updated namePair of the target directory/key being watched.
GetUpdate() (string, string)
// Stop shut down the watcher.
Stop() Stop()
} }

View File

@ -12,7 +12,7 @@ type namePair struct {
key, value string key, value string
} }
// recvBuffer is an unbounded channel of item. // recvBuffer is an unbounded channel of *namePair.
type recvBuffer struct { type recvBuffer struct {
c chan *namePair c chan *namePair
mu sync.Mutex mu sync.Mutex
@ -30,25 +30,27 @@ func newRecvBuffer() *recvBuffer {
func (b *recvBuffer) put(r *namePair) { func (b *recvBuffer) put(r *namePair) {
b.mu.Lock() b.mu.Lock()
defer b.mu.Unlock() defer b.mu.Unlock()
if b.stopping {
return
}
b.backlog = append(b.backlog, r) b.backlog = append(b.backlog, r)
if !b.stopping { select {
select { case b.c <- b.backlog[0]:
case b.c <- b.backlog[0]: b.backlog = b.backlog[1:]
b.backlog = b.backlog[1:] default:
default:
}
} }
} }
func (b *recvBuffer) load() { func (b *recvBuffer) load() {
b.mu.Lock() b.mu.Lock()
defer b.mu.Unlock() defer b.mu.Unlock()
if len(b.backlog) > 0 && !b.stopping { if b.stopping || len(b.backlog) == 0 {
select { return
case b.c <- b.backlog[0]: }
b.backlog = b.backlog[1:] select {
default: case b.c <- b.backlog[0]:
} b.backlog = b.backlog[1:]
default:
} }
} }
@ -64,20 +66,28 @@ func (b *recvBuffer) stop() {
} }
type etcdNR struct { type etcdNR struct {
cfg etcdcl.Config kAPI etcdcl.KeysAPI
recv *recvBuffer recv *recvBuffer
ctx context.Context ctx context.Context
cancel context.CancelFunc cancel context.CancelFunc
} }
func NewETCDNR(cfg etcdcl.Config) *etcdNR { func NewETCDNR(cfg etcdcl.Config) *etcdNR {
c, err := etcdcl.New(cfg)
if err != nil {
panic(err)
}
kAPI := etcdcl.NewKeysAPI(c)
ctx, cancel := context.WithCancel(context.Background())
return &etcdNR{ return &etcdNR{
cfg: cfg, kAPI: kAPI,
recv: newRecvBuffer(), recv: newRecvBuffer(),
ctx: context.Background(), ctx: ctx,
cancel: cancel,
} }
} }
// getNode can be called recursively to build the result key-value map
func getNode(node *etcdcl.Node, res map[string]string) { func getNode(node *etcdcl.Node, res map[string]string) {
if !node.Dir { if !node.Dir {
res[node.Key] = node.Value res[node.Key] = node.Value
@ -89,13 +99,7 @@ func getNode(node *etcdcl.Node, res map[string]string) {
} }
func (nr *etcdNR) Get(target string) map[string]string { func (nr *etcdNR) Get(target string) map[string]string {
cfg := nr.cfg resp, err := nr.kAPI.Get(nr.ctx, target, &etcdcl.GetOptions{Recursive: true, Sort: true})
c, err := etcdcl.New(cfg)
if err != nil {
panic(err)
}
kAPI := etcdcl.NewKeysAPI(c)
resp, err := kAPI.Get(nr.ctx, target, &etcdcl.GetOptions{Recursive: true, Sort: true})
if err != nil { if err != nil {
fmt.Printf("non-nil error: %v", err) fmt.Printf("non-nil error: %v", err)
} }
@ -106,13 +110,7 @@ func (nr *etcdNR) Get(target string) map[string]string {
} }
func (nr *etcdNR) Watch(target string) { func (nr *etcdNR) Watch(target string) {
cfg := nr.cfg watcher := nr.kAPI.Watcher(target, &etcdcl.WatcherOptions{Recursive: true})
c, err := etcdcl.New(cfg)
if err != nil {
panic(err)
}
kAPI := etcdcl.NewKeysAPI(c)
watcher := kAPI.Watcher(target, &etcdcl.WatcherOptions{Recursive: true})
for { for {
ctx, cancel := context.WithCancel(nr.ctx) ctx, cancel := context.WithCancel(nr.ctx)
nr.ctx = ctx nr.ctx = ctx
@ -130,10 +128,11 @@ func (nr *etcdNR) Watch(target string) {
} }
} }
func (nr *etcdNR) GetUpdate() (key string, val string) { func (nr *etcdNR) GetUpdate() (string, string) {
select { select {
case i := <-nr.recv.get(): case i := <-nr.recv.get():
nr.recv.load() nr.recv.load()
// returns key and the corresponding value of the updated namePair
return i.key, i.value return i.key, i.value
} }
} }