diff --git a/core/commands/get.go b/core/commands/get.go index 70ab9ec62..42b52bb6a 100644 --- a/core/commands/get.go +++ b/core/commands/get.go @@ -1,7 +1,6 @@ package commands import ( - "bufio" "compress/gzip" "errors" "fmt" @@ -11,13 +10,11 @@ import ( "strings" "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/cheggaaa/pb" - context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" cmds "github.com/ipfs/go-ipfs/commands" core "github.com/ipfs/go-ipfs/core" path "github.com/ipfs/go-ipfs/path" tar "github.com/ipfs/go-ipfs/thirdparty/tar" - uio "github.com/ipfs/go-ipfs/unixfs/io" utar "github.com/ipfs/go-ipfs/unixfs/tar" ) @@ -64,15 +61,16 @@ may also specify the level of compression by specifying '-l=<1-9>'. res.SetError(err, cmds.ErrNormal) return } - p := path.Path(req.Arguments()[0]) - var reader io.Reader - if archive, _, _ := req.Option("archive").Bool(); !archive && cmplvl != gzip.NoCompression { - // only use this when the flag is '-C' without '-a' - reader, err = getZip(req.Context(), node, p, cmplvl) - } else { - reader, err = get(req.Context(), node, p, cmplvl) + ctx := req.Context() + dn, err := core.Resolve(ctx, node, p) + if err != nil { + res.SetError(err, cmds.ErrNormal) + return } + + archive, _, _ := req.Option("archive").Bool() + reader, err := utar.DagArchive(ctx, dn, p.String(), node.DAG, archive, cmplvl) if err != nil { res.SetError(err, cmds.ErrNormal) return @@ -192,42 +190,3 @@ func getCompressOptions(req cmds.Request) (int, error) { } return gzip.NoCompression, nil } - -func get(ctx context.Context, node *core.IpfsNode, p path.Path, compression int) (io.Reader, error) { - dn, err := core.Resolve(ctx, node, p) - if err != nil { - return nil, err - } - - return utar.DagArchive(ctx, dn, p.String(), node.DAG, compression) -} - -// getZip is equivalent to `ipfs getdag $hash | gzip` -func getZip(ctx context.Context, node *core.IpfsNode, p path.Path, compression int) (io.Reader, error) { - dagnode, err := core.Resolve(ctx, node, p) - if err != nil { - return nil, err - } - - reader, err := uio.NewDagReader(ctx, dagnode, node.DAG) - if err != nil { - return nil, err - } - - pr, pw := io.Pipe() - gw, err := gzip.NewWriterLevel(pw, compression) - if err != nil { - return nil, err - } - bufin := bufio.NewReader(reader) - go func() { - _, err := bufin.WriteTo(gw) - if err != nil { - log.Error("Fail to compress the stream") - } - gw.Close() - pw.Close() - }() - - return pr, nil -} diff --git a/fuse/readonly/readonly_unix.go b/fuse/readonly/readonly_unix.go index fe8321a8a..290618791 100644 --- a/fuse/readonly/readonly_unix.go +++ b/fuse/readonly/readonly_unix.go @@ -180,7 +180,7 @@ func (s *Node) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadR return err } - buf := resp.Data[:min(req.Size, int(r.Size()-req.Offset))] + buf := resp.Data[:min(req.Size, int(int64(r.Size())-req.Offset))] n, err := io.ReadFull(r, buf) if err != nil && err != io.EOF { return err diff --git a/unixfs/io/dagreader.go b/unixfs/io/dagreader.go index def8c1501..1426f10cc 100644 --- a/unixfs/io/dagreader.go +++ b/unixfs/io/dagreader.go @@ -58,8 +58,7 @@ type ReadSeekCloser interface { // node, using the passed in DAGService for data retreival func NewDagReader(ctx context.Context, n *mdag.Node, serv mdag.DAGService) (*DagReader, error) { pb := new(ftpb.Data) - err := proto.Unmarshal(n.Data, pb) - if err != nil { + if err := proto.Unmarshal(n.Data, pb); err != nil { return nil, err } @@ -70,7 +69,7 @@ func NewDagReader(ctx context.Context, n *mdag.Node, serv mdag.DAGService) (*Dag case ftpb.Data_Raw: fallthrough case ftpb.Data_File: - return newDataFileReader(ctx, n, pb, serv), nil + return NewDataFileReader(ctx, n, pb, serv), nil case ftpb.Data_Metadata: if len(n.Links) == 0 { return nil, errors.New("incorrectly formatted metadata object") @@ -85,7 +84,7 @@ func NewDagReader(ctx context.Context, n *mdag.Node, serv mdag.DAGService) (*Dag } } -func newDataFileReader(ctx context.Context, n *mdag.Node, pb *ftpb.Data, serv mdag.DAGService) *DagReader { +func NewDataFileReader(ctx context.Context, n *mdag.Node, pb *ftpb.Data, serv mdag.DAGService) *DagReader { fctx, cancel := context.WithCancel(ctx) promises := serv.GetDAG(fctx, n) return &DagReader{ @@ -124,7 +123,7 @@ func (dr *DagReader) precalcNextBuf(ctx context.Context) error { // A directory should not exist within a file return ft.ErrInvalidDirLocation case ftpb.Data_File: - dr.buf = newDataFileReader(dr.ctx, nxt, pb, dr.serv) + dr.buf = NewDataFileReader(dr.ctx, nxt, pb, dr.serv) return nil case ftpb.Data_Raw: dr.buf = NewRSNCFromBytes(pb.GetData()) @@ -137,8 +136,8 @@ func (dr *DagReader) precalcNextBuf(ctx context.Context) error { } // Size return the total length of the data from the DAG structured file. -func (dr *DagReader) Size() int64 { - return int64(dr.pbdata.GetFilesize()) +func (dr *DagReader) Size() uint64 { + return dr.pbdata.GetFilesize() } // Read reads data from the DAG structured file diff --git a/unixfs/tar/writer.go b/unixfs/tar/writer.go index 125beed96..9e519b368 100644 --- a/unixfs/tar/writer.go +++ b/unixfs/tar/writer.go @@ -4,7 +4,6 @@ import ( "archive/tar" "bufio" "compress/gzip" - "fmt" "io" "path" "time" @@ -13,6 +12,7 @@ import ( cxt "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" mdag "github.com/ipfs/go-ipfs/merkledag" + ft "github.com/ipfs/go-ipfs/unixfs" uio "github.com/ipfs/go-ipfs/unixfs/io" upb "github.com/ipfs/go-ipfs/unixfs/pb" ) @@ -21,7 +21,8 @@ import ( // TODO: does this need to be configurable? var DefaultBufSize = 1048576 -func DagArchive(ctx cxt.Context, nd *mdag.Node, name string, dag mdag.DAGService, compression int) (io.Reader, error) { +// DagArchive is equivalent to `ipfs getdag $hash | maybe_tar | maybe_gzip` +func DagArchive(ctx cxt.Context, nd *mdag.Node, name string, dag mdag.DAGService, archive bool, compression int) (io.Reader, error) { _, filename := path.Split(name) @@ -31,17 +32,44 @@ func DagArchive(ctx cxt.Context, nd *mdag.Node, name string, dag mdag.DAGService // use a buffered writer to parallelize task bufw := bufio.NewWriterSize(pipew, DefaultBufSize) + // compression determines whether to use gzip compression. + var maybeGzw io.Writer + if compression != gzip.NoCompression { + var err error + maybeGzw, err = gzip.NewWriterLevel(bufw, compression) + if err != nil { + return nil, err + } + } else { + maybeGzw = bufw + } + // construct the tar writer - w, err := NewWriter(bufw, dag, compression) + w, err := NewWriter(ctx, dag, archive, compression, maybeGzw) if err != nil { return nil, err } // write all the nodes recursively go func() { - if err := w.WriteNode(ctx, nd, filename); err != nil { - pipew.CloseWithError(err) - return + if !archive && compression != gzip.NoCompression { + // the case when the node is a file + dagr, err := uio.NewDagReader(w.ctx, nd, w.Dag) + if err != nil { + pipew.CloseWithError(err) + return + } + + if _, err := dagr.WriteTo(maybeGzw); err != nil { + pipew.CloseWithError(err) + return + } + } else { + // the case for 1. archive, and 2. not archived and not compressed, in which tar is used anyway as a transport format + if err := w.WriteNode(nd, filename); err != nil { + pipew.CloseWithError(err) + return + } } if err := bufw.Flush(); err != nil { @@ -49,6 +77,7 @@ func DagArchive(ctx cxt.Context, nd *mdag.Node, name string, dag mdag.DAGService return } + w.Close() pipew.Close() // everything seems to be ok. }() @@ -61,39 +90,32 @@ func DagArchive(ctx cxt.Context, nd *mdag.Node, name string, dag mdag.DAGService type Writer struct { Dag mdag.DAGService TarW *tar.Writer + + ctx cxt.Context } // NewWriter wraps given io.Writer. -// compression determines whether to use gzip compression. -func NewWriter(w io.Writer, dag mdag.DAGService, compression int) (*Writer, error) { - - if compression != gzip.NoCompression { - var err error - w, err = gzip.NewWriterLevel(w, compression) - if err != nil { - return nil, err - } - } - +func NewWriter(ctx cxt.Context, dag mdag.DAGService, archive bool, compression int, w io.Writer) (*Writer, error) { return &Writer{ Dag: dag, TarW: tar.NewWriter(w), + ctx: ctx, }, nil } -func (w *Writer) WriteDir(ctx cxt.Context, nd *mdag.Node, fpath string) error { +func (w *Writer) writeDir(nd *mdag.Node, fpath string) error { if err := writeDirHeader(w.TarW, fpath); err != nil { return err } - for i, ng := range w.Dag.GetDAG(ctx, nd) { - child, err := ng.Get(ctx) + for i, ng := range w.Dag.GetDAG(w.ctx, nd) { + child, err := ng.Get(w.ctx) if err != nil { return err } npath := path.Join(fpath, nd.Links[i].Name) - if err := w.WriteNode(ctx, child, npath); err != nil { + if err := w.WriteNode(child, npath); err != nil { return err } } @@ -101,46 +123,33 @@ func (w *Writer) WriteDir(ctx cxt.Context, nd *mdag.Node, fpath string) error { return nil } -func (w *Writer) WriteFile(ctx cxt.Context, nd *mdag.Node, fpath string) error { - pb := new(upb.Data) - if err := proto.Unmarshal(nd.Data, pb); err != nil { - return err - } - - return w.writeFile(ctx, nd, pb, fpath) -} - -func (w *Writer) writeFile(ctx cxt.Context, nd *mdag.Node, pb *upb.Data, fpath string) error { +func (w *Writer) writeFile(nd *mdag.Node, pb *upb.Data, fpath string) error { if err := writeFileHeader(w.TarW, fpath, pb.GetFilesize()); err != nil { return err } - dagr, err := uio.NewDagReader(ctx, nd, w.Dag) - if err != nil { - return err - } - - _, err = io.Copy(w.TarW, dagr) - if err != nil && err != io.EOF { - return err - } - - return nil + dagr := uio.NewDataFileReader(w.ctx, nd, pb, w.Dag) + _, err := dagr.WriteTo(w.TarW) + return err } -func (w *Writer) WriteNode(ctx cxt.Context, nd *mdag.Node, fpath string) error { +func (w *Writer) WriteNode(nd *mdag.Node, fpath string) error { pb := new(upb.Data) if err := proto.Unmarshal(nd.Data, pb); err != nil { return err } switch pb.GetType() { + case upb.Data_Metadata: + fallthrough case upb.Data_Directory: - return w.WriteDir(ctx, nd, fpath) + return w.writeDir(nd, fpath) + case upb.Data_Raw: + fallthrough case upb.Data_File: - return w.writeFile(ctx, nd, pb, fpath) + return w.writeFile(nd, pb, fpath) default: - return fmt.Errorf("unixfs type not supported: %s", pb.GetType()) + return ft.ErrUnrecognizedType } }