1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-06-19 18:05:32 +08:00

fix silent refs failure

This commit is contained in:
Jeromy
2015-05-19 17:58:57 -07:00
parent 01e1e71221
commit 002cf5128e
2 changed files with 48 additions and 52 deletions

View File

@ -2,9 +2,10 @@ package commands
import (
"bytes"
"errors"
"fmt"
"io"
"strings"
"sync"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
cmds "github.com/ipfs/go-ipfs/commands"
@ -91,14 +92,14 @@ Note: list all refs recursively with -r.
return
}
piper, pipew := io.Pipe()
eptr := &ErrPassThroughReader{R: piper}
out := make(chan interface{})
res.SetOutput((<-chan interface{})(out))
go func() {
defer pipew.Close()
defer close(out)
rw := RefWriter{
W: pipew,
out: out,
DAG: n.DAG,
Ctx: ctx,
Unique: unique,
@ -109,14 +110,40 @@ Note: list all refs recursively with -r.
for _, o := range objs {
if _, err := rw.WriteRefs(o); err != nil {
eptr.SetError(err)
out <- &RefWrapper{Err: err.Error()}
return
}
}
}()
res.SetOutput(eptr)
},
Marshalers: cmds.MarshalerMap{
cmds.Text: func(res cmds.Response) (io.Reader, error) {
outChan, ok := res.Output().(<-chan interface{})
if !ok {
return nil, u.ErrCast()
}
marshal := func(v interface{}) (io.Reader, error) {
obj, ok := v.(*RefWrapper)
if !ok {
fmt.Println("%#v", v)
return nil, u.ErrCast()
}
if obj.Err != "" {
return nil, errors.New(obj.Err)
}
return strings.NewReader(obj.Ref), nil
}
return &cmds.ChannelMarshaler{
Channel: outChan,
Marshaler: marshal,
}, nil
},
},
Type: RefWrapper{},
}
var RefsLocalCmd = &cmds.Command{
@ -143,7 +170,6 @@ Displays the hashes of all local objects.
}
piper, pipew := io.Pipe()
eptr := &ErrPassThroughReader{R: piper}
go func() {
defer pipew.Close()
@ -151,13 +177,13 @@ Displays the hashes of all local objects.
for k := range allKeys {
s := k.Pretty() + "\n"
if _, err := pipew.Write([]byte(s)); err != nil {
eptr.SetError(err)
log.Error("pipe write error: ", err)
return
}
}
}()
res.SetOutput(eptr)
res.SetOutput(piper)
},
}
@ -173,46 +199,13 @@ func objectsForPaths(ctx context.Context, n *core.IpfsNode, paths []string) ([]*
return objects, nil
}
// ErrPassThroughReader is a reader that may return an externally set error.
type ErrPassThroughReader struct {
R io.ReadCloser
err error
sync.RWMutex
}
func (r *ErrPassThroughReader) Error() error {
r.RLock()
defer r.RUnlock()
return r.err
}
func (r *ErrPassThroughReader) SetError(err error) {
r.Lock()
r.err = err
r.Unlock()
}
func (r *ErrPassThroughReader) Read(buf []byte) (int, error) {
err := r.Error()
if err != nil {
return 0, err
}
return r.R.Read(buf)
}
func (r *ErrPassThroughReader) Close() error {
err1 := r.R.Close()
err2 := r.Error()
if err2 != nil {
return err2
}
return err1
type RefWrapper struct {
Ref string
Err string
}
type RefWriter struct {
W io.Writer
out chan interface{}
DAG dag.DAGService
Ctx context.Context
@ -335,8 +328,6 @@ func (rw *RefWriter) WriteEdge(from, to u.Key, linkname string) error {
}
s += "\n"
if _, err := rw.W.Write([]byte(s)); err != nil {
return err
}
rw.out <- &RefWrapper{Ref: s}
return nil
}