1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-06-19 18:05:32 +08:00

namesys: select on output

License: MIT
Signed-off-by: Łukasz Magiera <magik6k@gmail.com>
This commit is contained in:
Łukasz Magiera
2018-10-16 17:45:13 +02:00
parent 734615ac98
commit 7fcf56e8e5
4 changed files with 32 additions and 54 deletions

View File

@ -49,6 +49,11 @@ func resolveAsync(ctx context.Context, r resolver, name string, options opts.Res
defer close(outCh)
var subCh <-chan Result
var cancelSub context.CancelFunc
defer func() {
if cancelSub != nil {
cancelSub()
}
}()
for {
select {
@ -59,20 +64,17 @@ func resolveAsync(ctx context.Context, r resolver, name string, options opts.Res
}
if res.err != nil {
outCh <- Result{Err: res.err}
if cancelSub != nil {
cancelSub()
}
emitResult(ctx, outCh, Result{Err: res.err})
return
}
log.Debugf("resolved %s to %s", name, res.value.String())
if !strings.HasPrefix(res.value.String(), ipnsPrefix) {
outCh <- Result{Path: res.value}
emitResult(ctx, outCh, Result{Path: res.value})
break
}
if depth == 1 {
outCh <- Result{Path: res.value, Err: ErrResolveRecursion}
emitResult(ctx, outCh, Result{Path: res.value, Err: ErrResolveRecursion})
break
}
@ -87,6 +89,7 @@ func resolveAsync(ctx context.Context, r resolver, name string, options opts.Res
cancelSub()
}
subCtx, cancelSub = context.WithCancel(ctx)
_ = cancelSub
p := strings.TrimPrefix(res.value.String(), ipnsPrefix)
subCh = resolveAsync(subCtx, r, p, subopts)
@ -96,27 +99,21 @@ func resolveAsync(ctx context.Context, r resolver, name string, options opts.Res
break
}
select {
case outCh <- res:
case <-ctx.Done():
if cancelSub != nil {
cancelSub()
}
return
}
emitResult(ctx, outCh, res)
case <-ctx.Done():
if cancelSub != nil {
cancelSub()
}
return
}
if resCh == nil && subCh == nil {
if cancelSub != nil {
cancelSub()
}
return
}
}
}()
return outCh
}
func emitResult(ctx context.Context, outCh chan<- Result, r Result) {
select {
case outCh <- r:
case <-ctx.Done():
}
}

View File

@ -80,10 +80,7 @@ func (r *DNSResolver) resolveOnceAsync(ctx context.Context, name string, options
}
if subRes.error == nil {
p, err := appendPath(subRes.path)
select {
case out <- onceResult{value: p, err: err}:
case <-ctx.Done():
}
emitOnceResult(ctx, out, onceResult{value: p, err: err})
return
}
case rootRes, ok := <-rootChan:
@ -93,10 +90,7 @@ func (r *DNSResolver) resolveOnceAsync(ctx context.Context, name string, options
}
if rootRes.error == nil {
p, err := appendPath(rootRes.path)
select {
case out <- onceResult{value: p, err: err}:
case <-ctx.Done():
}
emitOnceResult(ctx, out, onceResult{value: p, err: err})
}
case <-ctx.Done():
return

View File

@ -141,19 +141,11 @@ func (ns *mpns) resolveOnceAsync(ctx context.Context, name string, options opts.
if len(segments) > 3 {
p, err := path.FromSegments("", strings.TrimRight(p.String(), "/"), segments[3])
if err != nil {
select {
case out <- onceResult{value: p, err: err}:
case <-ctx.Done():
}
return
emitOnceResult(ctx, out, onceResult{value: p, ttl: res.ttl, err: err})
}
}
select {
case out <- onceResult{value: p, ttl: res.ttl, err: res.err}:
case <-ctx.Done():
return
}
emitOnceResult(ctx, out, onceResult{value: p, ttl: res.ttl, err: res.err})
case <-ctx.Done():
return
}
@ -163,6 +155,13 @@ func (ns *mpns) resolveOnceAsync(ctx context.Context, name string, options opts.
return out
}
func emitOnceResult(ctx context.Context, outCh chan<- onceResult, r onceResult) {
select {
case outCh <- r:
case <-ctx.Done():
}
}
// Publish implements Publisher
func (ns *mpns) Publish(ctx context.Context, name ci.PrivKey, value path.Path) error {
return ns.PublishWithEOL(ctx, name, value, time.Now().Add(DefaultRecordTTL))

View File

@ -112,10 +112,7 @@ func (r *IpnsResolver) resolveOnceAsync(ctx context.Context, name string, option
err = proto.Unmarshal(val, entry)
if err != nil {
log.Debugf("RoutingResolver: could not unmarshal value for name %s: %s", name, err)
select {
case out <- onceResult{err: err}:
case <-ctx.Done():
}
emitOnceResult(ctx, out, onceResult{err: err})
return
}
@ -129,10 +126,7 @@ func (r *IpnsResolver) resolveOnceAsync(ctx context.Context, name string, option
// Not a multihash, probably a new style record
p, err = path.ParsePath(string(entry.GetValue()))
if err != nil {
select {
case out <- onceResult{err: err}:
case <-ctx.Done():
}
emitOnceResult(ctx, out, onceResult{err: err})
return
}
}
@ -154,17 +148,11 @@ func (r *IpnsResolver) resolveOnceAsync(ctx context.Context, name string, option
}
default:
log.Errorf("encountered error when parsing EOL: %s", err)
select {
case out <- onceResult{err: err}:
case <-ctx.Done():
}
emitOnceResult(ctx, out, onceResult{err: err})
return
}
select {
case out <- onceResult{value: p, ttl: ttl}:
case <-ctx.Done():
}
emitOnceResult(ctx, out, onceResult{value: p, ttl: ttl})
case <-ctx.Done():
return
}