diff --git a/core/commands/get.go b/core/commands/get.go
index 70ab9ec62..ea859c9a0 100644
--- a/core/commands/get.go
+++ b/core/commands/get.go
@@ -1,7 +1,6 @@
 package commands
 
 import (
-	"bufio"
 	"compress/gzip"
 	"errors"
 	"fmt"
@@ -11,14 +10,12 @@ import (
 	"strings"
 
 	"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/cheggaaa/pb"
-	context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
 	cmds "github.com/ipfs/go-ipfs/commands"
 	core "github.com/ipfs/go-ipfs/core"
 	path "github.com/ipfs/go-ipfs/path"
 	tar "github.com/ipfs/go-ipfs/thirdparty/tar"
-	uio "github.com/ipfs/go-ipfs/unixfs/io"
-	utar "github.com/ipfs/go-ipfs/unixfs/tar"
+	uarchive "github.com/ipfs/go-ipfs/unixfs/archive"
 )
 
 var ErrInvalidCompressionLevel = errors.New("Compression level must be between 1 and 9")
 
@@ -64,15 +61,17 @@ may also specify the level of compression by specifying '-l=<1-9>'.
 			res.SetError(err, cmds.ErrNormal)
 			return
 		}
 		p := path.Path(req.Arguments()[0])
-		var reader io.Reader
-		if archive, _, _ := req.Option("archive").Bool(); !archive && cmplvl != gzip.NoCompression {
-			// only use this when the flag is '-C' without '-a'
-			reader, err = getZip(req.Context(), node, p, cmplvl)
-		} else {
-			reader, err = get(req.Context(), node, p, cmplvl)
+		ctx := req.Context()
+		dn, err := core.Resolve(ctx, node, p)
+		if err != nil {
+			res.SetError(err, cmds.ErrNormal)
+			return
 		}
+
+		archive, _, _ := req.Option("archive").Bool()
+		reader, err := uarchive.DagArchive(ctx, dn, p.String(), node.DAG, archive, cmplvl)
 		if err != nil {
 			res.SetError(err, cmds.ErrNormal)
 			return
 		}
@@ -192,42 +190,3 @@ func getCompressOptions(req cmds.Request) (int, error) {
 	}
 	return gzip.NoCompression, nil
 }
-
-func get(ctx context.Context, node *core.IpfsNode, p path.Path, compression int) (io.Reader, error) {
-	dn, err := core.Resolve(ctx, node, p)
-	if err != nil {
-		return nil, err
-	}
-
-	return utar.DagArchive(ctx, dn, p.String(), node.DAG, compression)
-}
-
-// getZip is equivalent to `ipfs getdag $hash | gzip`
-func getZip(ctx context.Context, node *core.IpfsNode, p path.Path, compression int) (io.Reader, error) {
-	dagnode, err := core.Resolve(ctx, node, p)
-	if err != nil {
-		return nil, err
-	}
-
-	reader, err := uio.NewDagReader(ctx, dagnode, node.DAG)
-	if err != nil {
-		return nil, err
-	}
-
-	pr, pw := io.Pipe()
-	gw, err := gzip.NewWriterLevel(pw, compression)
-	if err != nil {
-		return nil, err
-	}
-	bufin := bufio.NewReader(reader)
-	go func() {
-		_, err := bufin.WriteTo(gw)
-		if err != nil {
-			log.Error("Fail to compress the stream")
-		}
-		gw.Close()
-		pw.Close()
-	}()
-
-	return pr, nil
-}
diff --git a/fuse/readonly/readonly_unix.go b/fuse/readonly/readonly_unix.go
index fe8321a8a..290618791 100644
--- a/fuse/readonly/readonly_unix.go
+++ b/fuse/readonly/readonly_unix.go
@@ -180,7 +180,7 @@ func (s *Node) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadR
 		return err
 	}
 
-	buf := resp.Data[:min(req.Size, int(r.Size()-req.Offset))]
+	buf := resp.Data[:min(req.Size, int(int64(r.Size())-req.Offset))]
 	n, err := io.ReadFull(r, buf)
 	if err != nil && err != io.EOF {
 		return err
diff --git a/unixfs/archive/archive.go b/unixfs/archive/archive.go
new file mode 100644
index 000000000..d530461e7
--- /dev/null
+++ b/unixfs/archive/archive.go
@@ -0,0 +1,83 @@
+package archive
+
+import (
+	"bufio"
+	"compress/gzip"
+	"io"
+	"path"
+
+	cxt "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
+
+	mdag "github.com/ipfs/go-ipfs/merkledag"
+	tar "github.com/ipfs/go-ipfs/unixfs/archive/tar"
+	uio "github.com/ipfs/go-ipfs/unixfs/io"
"github.com/ipfs/go-ipfs/unixfs/io" +) + +// DefaultBufSize is the buffer size for gets. for now, 1MB, which is ~4 blocks. +// TODO: does this need to be configurable? +var DefaultBufSize = 1048576 + +// DagArchive is equivalent to `ipfs getdag $hash | maybe_tar | maybe_gzip` +func DagArchive(ctx cxt.Context, nd *mdag.Node, name string, dag mdag.DAGService, archive bool, compression int) (io.Reader, error) { + + _, filename := path.Split(name) + + // need to connect a writer to a reader + piper, pipew := io.Pipe() + + // use a buffered writer to parallelize task + bufw := bufio.NewWriterSize(pipew, DefaultBufSize) + + // compression determines whether to use gzip compression. + var maybeGzw io.Writer + if compression != gzip.NoCompression { + var err error + maybeGzw, err = gzip.NewWriterLevel(bufw, compression) + if err != nil { + return nil, err + } + } else { + maybeGzw = bufw + } + + if !archive && compression != gzip.NoCompression { + // the case when the node is a file + dagr, err := uio.NewDagReader(ctx, nd, dag) + if err != nil { + pipew.CloseWithError(err) + return nil, err + } + + go func() { + if _, err := dagr.WriteTo(maybeGzw); err != nil { + pipew.CloseWithError(err) + return + } + pipew.Close() // everything seems to be ok. + }() + } else { + // the case for 1. archive, and 2. not archived and not compressed, in which tar is used anyway as a transport format + + // construct the tar writer + w, err := tar.NewWriter(ctx, dag, archive, compression, maybeGzw) + if err != nil { + return nil, err + } + + go func() { + // write all the nodes recursively + if err := w.WriteNode(nd, filename); err != nil { + pipew.CloseWithError(err) + return + } + if err := bufw.Flush(); err != nil { + pipew.CloseWithError(err) + return + } + w.Close() + pipew.Close() // everything seems to be ok. + }() + } + + return piper, nil +} diff --git a/unixfs/archive/tar/writer.go b/unixfs/archive/tar/writer.go new file mode 100644 index 000000000..73aeafa4b --- /dev/null +++ b/unixfs/archive/tar/writer.go @@ -0,0 +1,110 @@ +package tar + +import ( + "archive/tar" + "io" + "path" + "time" + + proto "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/gogo/protobuf/proto" + cxt "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" + + mdag "github.com/ipfs/go-ipfs/merkledag" + ft "github.com/ipfs/go-ipfs/unixfs" + uio "github.com/ipfs/go-ipfs/unixfs/io" + upb "github.com/ipfs/go-ipfs/unixfs/pb" +) + +// Writer is a utility structure that helps to write +// unixfs merkledag nodes as a tar archive format. +// It wraps any io.Writer. +type Writer struct { + Dag mdag.DAGService + TarW *tar.Writer + + ctx cxt.Context +} + +// NewWriter wraps given io.Writer. 
+func NewWriter(ctx cxt.Context, dag mdag.DAGService, archive bool, compression int, w io.Writer) (*Writer, error) {
+	return &Writer{
+		Dag:  dag,
+		TarW: tar.NewWriter(w),
+		ctx:  ctx,
+	}, nil
+}
+
+func (w *Writer) writeDir(nd *mdag.Node, fpath string) error {
+	if err := writeDirHeader(w.TarW, fpath); err != nil {
+		return err
+	}
+
+	for i, ng := range w.Dag.GetDAG(w.ctx, nd) {
+		child, err := ng.Get(w.ctx)
+		if err != nil {
+			return err
+		}
+
+		npath := path.Join(fpath, nd.Links[i].Name)
+		if err := w.WriteNode(child, npath); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (w *Writer) writeFile(nd *mdag.Node, pb *upb.Data, fpath string) error {
+	if err := writeFileHeader(w.TarW, fpath, pb.GetFilesize()); err != nil {
+		return err
+	}
+
+	dagr := uio.NewDataFileReader(w.ctx, nd, pb, w.Dag)
+	_, err := dagr.WriteTo(w.TarW)
+	return err
+}
+
+func (w *Writer) WriteNode(nd *mdag.Node, fpath string) error {
+	pb := new(upb.Data)
+	if err := proto.Unmarshal(nd.Data, pb); err != nil {
+		return err
+	}
+
+	switch pb.GetType() {
+	case upb.Data_Metadata:
+		fallthrough
+	case upb.Data_Directory:
+		return w.writeDir(nd, fpath)
+	case upb.Data_Raw:
+		fallthrough
+	case upb.Data_File:
+		return w.writeFile(nd, pb, fpath)
+	default:
+		return ft.ErrUnrecognizedType
+	}
+}
+
+func (w *Writer) Close() error {
+	return w.TarW.Close()
+}
+
+func writeDirHeader(w *tar.Writer, fpath string) error {
+	return w.WriteHeader(&tar.Header{
+		Name:     fpath,
+		Typeflag: tar.TypeDir,
+		Mode:     0777,
+		ModTime:  time.Now(),
+		// TODO: set mode, dates, etc. when added to unixFS
+	})
+}
+
+func writeFileHeader(w *tar.Writer, fpath string, size uint64) error {
+	return w.WriteHeader(&tar.Header{
+		Name:     fpath,
+		Size:     int64(size),
+		Typeflag: tar.TypeReg,
+		Mode:     0644,
+		ModTime:  time.Now(),
+		// TODO: set mode, dates, etc. when added to unixFS
+	})
+}
diff --git a/unixfs/io/dagreader.go b/unixfs/io/dagreader.go
index def8c1501..1426f10cc 100644
--- a/unixfs/io/dagreader.go
+++ b/unixfs/io/dagreader.go
@@ -58,8 +58,7 @@ type ReadSeekCloser interface {
 // node, using the passed in DAGService for data retreival
 func NewDagReader(ctx context.Context, n *mdag.Node, serv mdag.DAGService) (*DagReader, error) {
 	pb := new(ftpb.Data)
-	err := proto.Unmarshal(n.Data, pb)
-	if err != nil {
+	if err := proto.Unmarshal(n.Data, pb); err != nil {
 		return nil, err
 	}
 
@@ -70,7 +69,7 @@ func NewDagReader(ctx context.Context, n *mdag.Node, serv mdag.DAGService) (*Dag
 	case ftpb.Data_Raw:
 		fallthrough
 	case ftpb.Data_File:
-		return newDataFileReader(ctx, n, pb, serv), nil
+		return NewDataFileReader(ctx, n, pb, serv), nil
 	case ftpb.Data_Metadata:
 		if len(n.Links) == 0 {
 			return nil, errors.New("incorrectly formatted metadata object")
@@ -85,7 +84,7 @@ func NewDagReader(ctx context.Context, n *mdag.Node, serv mdag.DAGService) (*Dag
 	}
 }
 
-func newDataFileReader(ctx context.Context, n *mdag.Node, pb *ftpb.Data, serv mdag.DAGService) *DagReader {
+func NewDataFileReader(ctx context.Context, n *mdag.Node, pb *ftpb.Data, serv mdag.DAGService) *DagReader {
 	fctx, cancel := context.WithCancel(ctx)
 	promises := serv.GetDAG(fctx, n)
 	return &DagReader{
@@ -124,7 +123,7 @@ func (dr *DagReader) precalcNextBuf(ctx context.Context) error {
 		// A directory should not exist within a file
 		return ft.ErrInvalidDirLocation
 	case ftpb.Data_File:
-		dr.buf = newDataFileReader(dr.ctx, nxt, pb, dr.serv)
+		dr.buf = NewDataFileReader(dr.ctx, nxt, pb, dr.serv)
 		return nil
 	case ftpb.Data_Raw:
 		dr.buf = NewRSNCFromBytes(pb.GetData())
@@ -137,8 +136,8 @@ func (dr *DagReader) precalcNextBuf(ctx context.Context) error {
 }
 
 // Size return the total length of the data from the DAG structured file.
-func (dr *DagReader) Size() int64 {
-	return int64(dr.pbdata.GetFilesize())
+func (dr *DagReader) Size() uint64 {
+	return dr.pbdata.GetFilesize()
 }
 
 // Read reads data from the DAG structured file
diff --git a/unixfs/tar/writer.go b/unixfs/tar/writer.go
deleted file mode 100644
index 125beed96..000000000
--- a/unixfs/tar/writer.go
+++ /dev/null
@@ -1,170 +0,0 @@
-package tar
-
-import (
-	"archive/tar"
-	"bufio"
-	"compress/gzip"
-	"fmt"
-	"io"
-	"path"
-	"time"
-
-	proto "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/gogo/protobuf/proto"
-	cxt "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
-
-	mdag "github.com/ipfs/go-ipfs/merkledag"
-	uio "github.com/ipfs/go-ipfs/unixfs/io"
-	upb "github.com/ipfs/go-ipfs/unixfs/pb"
-)
-
-// DefaultBufSize is the buffer size for gets. for now, 1MB, which is ~4 blocks.
-// TODO: does this need to be configurable?
-var DefaultBufSize = 1048576
-
-func DagArchive(ctx cxt.Context, nd *mdag.Node, name string, dag mdag.DAGService, compression int) (io.Reader, error) {
-
-	_, filename := path.Split(name)
-
-	// need to connect a writer to a reader
-	piper, pipew := io.Pipe()
-
-	// use a buffered writer to parallelize task
-	bufw := bufio.NewWriterSize(pipew, DefaultBufSize)
-
-	// construct the tar writer
-	w, err := NewWriter(bufw, dag, compression)
-	if err != nil {
-		return nil, err
-	}
-
-	// write all the nodes recursively
-	go func() {
-		if err := w.WriteNode(ctx, nd, filename); err != nil {
-			pipew.CloseWithError(err)
-			return
-		}
-
-		if err := bufw.Flush(); err != nil {
-			pipew.CloseWithError(err)
-			return
-		}
-
-		pipew.Close() // everything seems to be ok.
-	}()
-
-	return piper, nil
-}
-
-// Writer is a utility structure that helps to write
-// unixfs merkledag nodes as a tar archive format.
-// It wraps any io.Writer.
-type Writer struct {
-	Dag  mdag.DAGService
-	TarW *tar.Writer
-}
-
-// NewWriter wraps given io.Writer.
-// compression determines whether to use gzip compression.
-func NewWriter(w io.Writer, dag mdag.DAGService, compression int) (*Writer, error) {
-
-	if compression != gzip.NoCompression {
-		var err error
-		w, err = gzip.NewWriterLevel(w, compression)
-		if err != nil {
-			return nil, err
-		}
-	}
-
-	return &Writer{
-		Dag:  dag,
-		TarW: tar.NewWriter(w),
-	}, nil
-}
-
-func (w *Writer) WriteDir(ctx cxt.Context, nd *mdag.Node, fpath string) error {
-	if err := writeDirHeader(w.TarW, fpath); err != nil {
-		return err
-	}
-
-	for i, ng := range w.Dag.GetDAG(ctx, nd) {
-		child, err := ng.Get(ctx)
-		if err != nil {
-			return err
-		}
-
-		npath := path.Join(fpath, nd.Links[i].Name)
-		if err := w.WriteNode(ctx, child, npath); err != nil {
-			return err
-		}
-	}
-
-	return nil
-}
-
-func (w *Writer) WriteFile(ctx cxt.Context, nd *mdag.Node, fpath string) error {
-	pb := new(upb.Data)
-	if err := proto.Unmarshal(nd.Data, pb); err != nil {
-		return err
-	}
-
-	return w.writeFile(ctx, nd, pb, fpath)
-}
-
-func (w *Writer) writeFile(ctx cxt.Context, nd *mdag.Node, pb *upb.Data, fpath string) error {
-	if err := writeFileHeader(w.TarW, fpath, pb.GetFilesize()); err != nil {
-		return err
-	}
-
-	dagr, err := uio.NewDagReader(ctx, nd, w.Dag)
-	if err != nil {
-		return err
-	}
-
-	_, err = io.Copy(w.TarW, dagr)
-	if err != nil && err != io.EOF {
-		return err
-	}
-
-	return nil
-}
-
-func (w *Writer) WriteNode(ctx cxt.Context, nd *mdag.Node, fpath string) error {
-	pb := new(upb.Data)
-	if err := proto.Unmarshal(nd.Data, pb); err != nil {
-		return err
-	}
-
-	switch pb.GetType() {
-	case upb.Data_Directory:
-		return w.WriteDir(ctx, nd, fpath)
-	case upb.Data_File:
-		return w.writeFile(ctx, nd, pb, fpath)
-	default:
-		return fmt.Errorf("unixfs type not supported: %s", pb.GetType())
-	}
-}
-
-func (w *Writer) Close() error {
-	return w.TarW.Close()
-}
-
-func writeDirHeader(w *tar.Writer, fpath string) error {
-	return w.WriteHeader(&tar.Header{
-		Name:     fpath,
-		Typeflag: tar.TypeDir,
-		Mode:     0777,
-		ModTime:  time.Now(),
-		// TODO: set mode, dates, etc. when added to unixFS
-	})
-}
-
-func writeFileHeader(w *tar.Writer, fpath string, size uint64) error {
-	return w.WriteHeader(&tar.Header{
-		Name:     fpath,
-		Size:     int64(size),
-		Typeflag: tar.TypeReg,
-		Mode:     0644,
-		ModTime:  time.Now(),
-		// TODO: set mode, dates, etc. when added to unixFS
-	})
-}
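
Note (not part of the patch): the rewritten get.go collapses the old get/getZip pair into one call, and the two knobs select the framing inside DagArchive: archive=false with compression enabled streams the file bytes through gzip only, while every other combination goes through the tar writer, gzipped when a level is set. A minimal sketch of the resulting call shape, mirroring the Run handler above; writeArchive and out are illustrative names, not code from this commit:

package example

import (
	"io"

	cxt "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"

	core "github.com/ipfs/go-ipfs/core"
	path "github.com/ipfs/go-ipfs/path"
	uarchive "github.com/ipfs/go-ipfs/unixfs/archive"
)

// writeArchive is a hypothetical helper showing the new call shape:
// resolve the path once, then let DagArchive pick tar/gzip framing.
func writeArchive(ctx cxt.Context, node *core.IpfsNode, p path.Path, archive bool, cmplvl int, out io.Writer) error {
	dn, err := core.Resolve(ctx, node, p)
	if err != nil {
		return err
	}
	r, err := uarchive.DagArchive(ctx, dn, p.String(), node.DAG, archive, cmplvl)
	if err != nil {
		return err
	}
	// The returned reader is fed by DagArchive's goroutine through an
	// io.Pipe, so copying here drives the whole pipeline.
	_, err = io.Copy(out, r)
	return err
}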
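A second note for reviewers: in the new DagArchive, when compression is enabled maybeGzw is a *gzip.Writer that no branch ever closes, so the gzip footer is never emitted, and the gzip-only branch also closes the pipe without flushing bufw; in the tar branch, bufw.Flush() runs before w.Close(), so the trailer blocks that tar's Close writes can be left stranded in the buffer. A minimal sketch of a teardown that avoids both, assuming it replaces the tail of each goroutine; closeAndFlush is a hypothetical helper, not code from this commit:

package example

import (
	"bufio"
	"compress/gzip"
	"io"
)

// closeAndFlush closes writers innermost-first so every layer's trailer
// bytes pass through the layers beneath it before the pipe is closed.
// tarw may be nil in the gzip-only branch.
func closeAndFlush(tarw io.Closer, maybeGzw io.Writer, bufw *bufio.Writer, pipew *io.PipeWriter) {
	if tarw != nil {
		if err := tarw.Close(); err != nil { // writes tar's trailing blocks
			pipew.CloseWithError(err)
			return
		}
	}
	if gzw, ok := maybeGzw.(*gzip.Writer); ok {
		if err := gzw.Close(); err != nil { // emits the gzip footer
			pipew.CloseWithError(err)
			return
		}
	}
	if err := bufw.Flush(); err != nil { // drains the buffer into the pipe
		pipew.CloseWithError(err)
		return
	}
	pipew.Close() // everything really is ok now
}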