diff --git a/core/commands/get.go b/core/commands/get.go
index 48cc034ee..70ab9ec62 100644
--- a/core/commands/get.go
+++ b/core/commands/get.go
@@ -98,54 +98,87 @@ may also specify the level of compression by specifying '-l=<1-9>'.
 			return
 		}
 
-		if archive, _, _ := req.Option("archive").Bool(); archive || cmplvl != gzip.NoCompression {
-			if archive && !strings.HasSuffix(outPath, ".tar") {
-				outPath += ".tar"
-			}
-			if cmplvl != gzip.NoCompression {
-				outPath += ".gz"
-			}
-			fmt.Printf("Saving archive to %s\n", outPath)
+		archive, _, _ := req.Option("archive").Bool()
 
-			file, err := os.Create(outPath)
-			if err != nil {
-				res.SetError(err, cmds.ErrNormal)
-				return
-			}
-			defer file.Close()
-
-			bar := pb.New(0).SetUnits(pb.U_BYTES)
-			bar.Output = os.Stderr
-			pbReader := bar.NewProxyReader(outReader)
-			bar.Start()
-			defer bar.Finish()
-
-			if _, err := io.Copy(file, pbReader); err != nil {
-				res.SetError(err, cmds.ErrNormal)
-				return
-			}
+		gw := getWriter{
+			Out:         os.Stdout,
+			Err:         os.Stderr,
+			Archive:     archive,
+			Compression: cmplvl,
+		}
+		if err := gw.Write(outReader, outPath); err != nil {
+			res.SetError(err, cmds.ErrNormal)
+			return
 		}
-
-		fmt.Printf("Saving file(s) to %s\n", outPath)
-
-		// TODO: get total length of files
-		bar := pb.New(0).SetUnits(pb.U_BYTES)
-		bar.Output = os.Stderr
-
-		// wrap the reader with the progress bar proxy reader
-		reader := bar.NewProxyReader(outReader)
-
-		bar.Start()
-		defer bar.Finish()
-		extractor := &tar.Extractor{outPath}
-		if err := extractor.Extract(reader); err != nil {
-			res.SetError(err, cmds.ErrNormal)
-		}
 	},
 }
 
+func progressBarForReader(out io.Writer, r io.Reader) (*pb.ProgressBar, *pb.Reader) {
+	// setup bar reader
+	// TODO: get total length of files
+	bar := pb.New(0).SetUnits(pb.U_BYTES)
+	bar.Output = out
+	barR := bar.NewProxyReader(r)
+	return bar, barR
+}
+
+type getWriter struct {
+	Out io.Writer // for output to user
+	Err io.Writer // for progress bar output
+
+	Archive     bool
+	Compression int
+}
+
+func (gw *getWriter) Write(r io.Reader, fpath string) error {
+	if gw.Archive || gw.Compression != gzip.NoCompression {
+		return gw.writeArchive(r, fpath)
+	}
+	return gw.writeExtracted(r, fpath)
+}
+
+func (gw *getWriter) writeArchive(r io.Reader, fpath string) error {
+	// adjust file name if tar
+	if gw.Archive {
+		if !strings.HasSuffix(fpath, ".tar") && !strings.HasSuffix(fpath, ".tar.gz") {
+			fpath += ".tar"
+		}
+	}
+
+	// adjust file name if gz
+	if gw.Compression != gzip.NoCompression {
+		if !strings.HasSuffix(fpath, ".gz") {
+			fpath += ".gz"
+		}
+	}
+
+	// create file
+	file, err := os.Create(fpath)
+	if err != nil {
+		return err
+	}
+	defer file.Close()
+
+	fmt.Fprintf(gw.Out, "Saving archive to %s\n", fpath)
+	bar, barR := progressBarForReader(gw.Err, r)
+	bar.Start()
+	defer bar.Finish()
+
+	_, err = io.Copy(file, barR)
+	return err
+}
+
+func (gw *getWriter) writeExtracted(r io.Reader, fpath string) error {
+	fmt.Fprintf(gw.Out, "Saving file(s) to %s\n", fpath)
+	bar, barR := progressBarForReader(gw.Err, r)
+	bar.Start()
+	defer bar.Finish()
+
+	extractor := &tar.Extractor{fpath}
+	return extractor.Extract(barR)
+}
+
 func getCompressOptions(req cmds.Request) (int, error) {
 	cmprs, _, _ := req.Option("compress").Bool()
 	cmplvl, cmplvlFound, _ := req.Option("compression-level").Int()
@@ -161,12 +194,12 @@ func getCompressOptions(req cmds.Request) (int, error) {
 }
 
 func get(ctx context.Context, node *core.IpfsNode, p path.Path, compression int) (io.Reader, error) {
-	dagnode, err := core.Resolve(ctx, node, p)
+	dn, err := core.Resolve(ctx, node, p)
 	if err != nil {
 		return nil, err
 	}
 
-	return utar.NewReader(ctx, p, node.DAG, dagnode, compression)
+	return utar.DagArchive(ctx, dn, p.String(), node.DAG, compression)
 }
 
 // getZip is equivalent to `ipfs getdag $hash | gzip`
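Reviewer note: with the command handler reduced to wiring options into getWriter, the save path can be exercised without a running node. A minimal, hypothetical sketch (it would live in the same package as get.go, since getWriter is unexported; the tar entry name and contents are made up):

package commands

import (
	"archive/tar"
	"bytes"
	"compress/gzip"
	"os"
)

// sketchGetWriter is illustrative only; it is not part of this change.
func sketchGetWriter() error {
	// build a tiny in-memory tar, standing in for the stream that
	// `ipfs get` receives from the node
	var buf bytes.Buffer
	tw := tar.NewWriter(&buf)
	body := []byte("hello\n")
	if err := tw.WriteHeader(&tar.Header{
		Name:     "root/file.txt",
		Mode:     0644,
		Size:     int64(len(body)),
		Typeflag: tar.TypeReg,
	}); err != nil {
		return err
	}
	if _, err := tw.Write(body); err != nil {
		return err
	}
	if err := tw.Close(); err != nil {
		return err
	}

	gw := getWriter{
		Out:         os.Stdout,
		Err:         os.Stderr,
		Archive:     true,               // keep the tar instead of extracting
		Compression: gzip.NoCompression, // so no ".gz" suffix is appended
	}
	// prints "Saving archive to out.tar" and writes ./out.tar
	return gw.Write(&buf, "out")
}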
diff --git a/test/sharness/t0090-get.sh b/test/sharness/t0090-get.sh
index 5c9358454..f00bf7e89 100755
--- a/test/sharness/t0090-get.sh
+++ b/test/sharness/t0090-get.sh
@@ -37,9 +37,8 @@ test_get_cmd() {
 		test_cmp "$HASH" data
 	'
 
-	# this started failing after this change. fixed in later commit
-	test_expect_failure "ipfs get errors when trying to overwrite a file" '
-		test_must_fail ipfs get "$HASH" >actual &&
+	test_expect_success "ipfs get DOES NOT error when trying to overwrite a file" '
+		ipfs get "$HASH" >actual &&
 		rm "$HASH"
 	'
 
diff --git a/thirdparty/tar/extractor.go b/thirdparty/tar/extractor.go
index f21224817..7a526f508 100644
--- a/thirdparty/tar/extractor.go
+++ b/thirdparty/tar/extractor.go
@@ -4,6 +4,7 @@ import (
 	"archive/tar"
 	"io"
 	"os"
+	gopath "path"
 	fp "path/filepath"
 	"strings"
 )
@@ -39,7 +40,7 @@ func (te *Extractor) Extract(reader io.Reader) error {
 		}
 
 		if header.Typeflag == tar.TypeDir {
-			if err := te.extractDir(header, i, rootExists); err != nil {
+			if err := te.extractDir(header, i); err != nil {
 				return err
 			}
 			continue
@@ -52,13 +53,19 @@ func (te *Extractor) Extract(reader io.Reader) error {
 	return nil
 }
 
-func (te *Extractor) extractDir(h *tar.Header, depth int, rootExists bool) error {
-	pathElements := strings.Split(h.Name, "/")
-	if !rootExists {
-		pathElements = pathElements[1:]
-	}
-	path := fp.Join(pathElements...)
-	path = fp.Join(te.Path, path)
+// outputPath returns the path at which to place tarPath
+func (te *Extractor) outputPath(tarPath string) string {
+	elems := strings.Split(tarPath, "/") // break into elems
+	elems = elems[1:]                    // remove original root
+
+	path := fp.Join(elems...)     // join elems
+	path = fp.Join(te.Path, path) // rebase on extractor root
+	return path
+}
+
+func (te *Extractor) extractDir(h *tar.Header, depth int) error {
+	path := te.outputPath(h.Name)
+
 	if depth == 0 {
 		// if this is the root directory, use it as the output path for remaining files
 		te.Path = path
@@ -73,18 +80,18 @@ func (te *Extractor) extractDir(h *tar.Header, depth int, rootExists bool) error
 }
 
 func (te *Extractor) extractFile(h *tar.Header, r *tar.Reader, depth int, rootExists bool, rootIsDir bool) error {
-	path := te.Path
-	if depth == 0 && rootExists {
-		// if depth is 0, this is the only file (we aren't 'ipfs get'ing a directory)
-		if rootIsDir { // putting file inside of a root dir.
-			path = fp.Join(te.Path, h.Name)
-		}
-		// else if the file exists, just overwrite it.
-	} else {
-		// we are outputting a directory, this file is inside of it
-		pathElements := strings.Split(h.Name, "/")[1:]
-		path = fp.Join(pathElements...)
-		path = fp.Join(te.Path, path)
+	path := te.outputPath(h.Name)
+
+	if depth == 0 { // if depth is 0, this is the only file (we aren't 'ipfs get'ing a directory)
+		if rootExists && rootIsDir {
+			// putting file inside of a root dir.
+			fnameo := gopath.Base(h.Name)
+			fnamen := fp.Base(path)
+			// add back original name if lost.
+			if fnameo != fnamen {
+				path = fp.Join(path, fnameo)
+			}
+		} // else if old file exists, just overwrite it.
 	}
 
 	file, err := os.Create(path)
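Reviewer note: the rebasing done by the new outputPath is the crux of the extractor change, so here is a standalone sketch of the same logic. The root name and paths are made up; only the Split/Join steps mirror the code above:

package main

import (
	"fmt"
	fp "path/filepath"
	"strings"
)

// same logic as the new Extractor.outputPath, inlined with an explicit root
func outputPath(root, tarPath string) string {
	elems := strings.Split(tarPath, "/")[1:] // drop the archive's own root element
	return fp.Join(root, fp.Join(elems...))  // rebase the rest onto the output root
}

func main() {
	// "QmExample" stands in for whatever root name the daemon wrote into the tar
	fmt.Println(outputPath("/tmp/out", "QmExample"))         // /tmp/out
	fmt.Println(outputPath("/tmp/out", "QmExample/a/b.txt")) // /tmp/out/a/b.txt
}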
diff --git a/unixfs/tar/reader.go b/unixfs/tar/reader.go
deleted file mode 100644
index 2c55b1bbc..000000000
--- a/unixfs/tar/reader.go
+++ /dev/null
@@ -1,221 +0,0 @@
-package tar
-
-import (
-	"archive/tar"
-	"bytes"
-	"compress/gzip"
-	"fmt"
-	"io"
-	gopath "path"
-	"time"
-
-	proto "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/gogo/protobuf/proto"
-	cxt "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
-
-	mdag "github.com/ipfs/go-ipfs/merkledag"
-	path "github.com/ipfs/go-ipfs/path"
-	uio "github.com/ipfs/go-ipfs/unixfs/io"
-	upb "github.com/ipfs/go-ipfs/unixfs/pb"
-)
-
-type Reader struct {
-	buf        bytes.Buffer
-	closed     bool
-	signalChan chan struct{}
-	dag        mdag.DAGService
-	resolver   *path.Resolver
-	writer     *tar.Writer
-	gzipWriter *gzip.Writer
-	err        error
-}
-
-func NewReader(ctx cxt.Context, path path.Path, dag mdag.DAGService, dagnode *mdag.Node, compression int) (*Reader, error) {
-
-	reader := &Reader{
-		signalChan: make(chan struct{}),
-		dag:        dag,
-	}
-
-	var err error
-	if compression != gzip.NoCompression {
-		reader.gzipWriter, err = gzip.NewWriterLevel(&reader.buf, compression)
-		if err != nil {
-			return nil, err
-		}
-		reader.writer = tar.NewWriter(reader.gzipWriter)
-	} else {
-		reader.writer = tar.NewWriter(&reader.buf)
-	}
-
-	// writeToBuf will write the data to the buffer, and will signal when there
-	// is new data to read
-	_, filename := gopath.Split(path.String())
-	go func() {
-		if err := reader.writeNodeToBuf(ctx, dagnode, filename, 0); err != nil {
-			reader.emitError(err)
-		}
-	}()
-	return reader, nil
-}
-
-func (r *Reader) writeDirToBuf(ctx cxt.Context, nd *mdag.Node, path string, depth int) error {
-	if err := writeDirHeader(r.writer, path); err != nil {
-		return err
-	}
-	r.flush()
-
-	for i, ng := range r.dag.GetDAG(ctx, nd) {
-		child, err := ng.Get(ctx)
-		if err != nil {
-			return err
-		}
-
-		npath := gopath.Join(path, nd.Links[i].Name)
-		if err := r.writeNodeToBuf(ctx, child, npath, depth+1); err != nil {
-			return err
-		}
-	}
-
-	return nil
-}
-
-func (r *Reader) writeFileToBuf(ctx cxt.Context, nd *mdag.Node, pb *upb.Data, path string, depth int) error {
-	if err := writeFileHeader(r.writer, path, pb.GetFilesize()); err != nil {
-		return err
-	}
-	r.flush()
-
-	reader, err := uio.NewDagReader(ctx, nd, r.dag)
-	if err != nil {
-		return err
-	}
-
-	if err := r.syncCopy(reader); err != nil {
-		return err
-	}
-
-	return nil
-}
-
-func (r *Reader) writeNodeToBuf(ctx cxt.Context, nd *mdag.Node, path string, depth int) error {
-	pb := new(upb.Data)
-	if err := proto.Unmarshal(nd.Data, pb); err != nil {
-		return err
-	}
-
-	if depth == 0 {
-		defer r.close()
-	}
-
-	switch pb.GetType() {
-	case upb.Data_Directory:
-		return r.writeDirToBuf(ctx, nd, path, depth)
-	case upb.Data_File:
-		return r.writeFileToBuf(ctx, nd, pb, path, depth)
-	default:
-		return fmt.Errorf("unixfs type not supported: %s", pb.GetType())
-	}
-}
-
-func (r *Reader) Read(p []byte) (int, error) {
-	// wait for the goroutine that is writing data to the buffer to tell us
-	// there is something to read
-	if !r.closed {
-		<-r.signalChan
-	}
-
-	if r.err != nil {
-		return 0, r.err
-	}
-
-	if !r.closed {
-		defer r.signal()
-	}
-
-	if r.buf.Len() == 0 {
-		if r.closed {
-			return 0, io.EOF
-		}
-		return 0, nil
-	}
-
-	n, err := r.buf.Read(p)
-	if err == io.EOF && !r.closed || r.buf.Len() > 0 {
-		return n, nil
-	}
-
-	return n, err
-}
-
-func (r *Reader) signal() {
-	r.signalChan <- struct{}{}
-}
-
-func (r *Reader) flush() {
-	r.signal()
-	<-r.signalChan
-}
-
-func (r *Reader) emitError(err error) {
-	r.err = err
-	r.signal()
-}
-
-func (r *Reader) close() {
-	r.closed = true
-	defer r.signal()
-	err := r.writer.Close()
-	if err != nil {
-		r.emitError(err)
-		return
-	}
-	if r.gzipWriter != nil {
-		err = r.gzipWriter.Close()
-		if err != nil {
-			r.emitError(err)
-			return
-		}
-	}
-}
-
-func (r *Reader) syncCopy(reader io.Reader) error {
-	buf := make([]byte, 32*1024)
-	for {
-		nr, err := reader.Read(buf)
-		if nr > 0 {
-			_, err := r.writer.Write(buf[:nr])
-			if err != nil {
-				return err
-			}
-			r.flush()
-		}
-		if err == io.EOF {
-			break
-		}
-		if err != nil {
-			return err
-		}
-	}
-	return nil
-}
-
-func writeDirHeader(w *tar.Writer, path string) error {
-	return w.WriteHeader(&tar.Header{
-		Name:     path,
-		Typeflag: tar.TypeDir,
-		Mode:     0777,
-		ModTime:  time.Now(),
-		// TODO: set mode, dates, etc. when added to unixFS
-	})
-}
-
-func writeFileHeader(w *tar.Writer, path string, size uint64) error {
-	return w.WriteHeader(&tar.Header{
-		Name:     path,
-		Size:     int64(size),
-		Typeflag: tar.TypeReg,
-		Mode:     0644,
-		ModTime:  time.Now(),
-		// TODO: set mode, dates, etc. when added to unixFS
-	})
-}
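Reviewer note: the deleted Reader above coordinated its producer goroutine with Read callers by hand, via buf, signalChan and flush. Its replacement (unixfs/tar/writer.go, below) gets the same producer/consumer hand-off from io.Pipe. A self-contained sketch of that pattern, with a made-up payload:

package main

import (
	"bufio"
	"fmt"
	"io"
	"io/ioutil"
)

func main() {
	// io.Pipe gives the blocking hand-off that signalChan implemented
	// manually: Flush blocks until the reader consumes, CloseWithError
	// propagates producer errors, and Close delivers io.EOF.
	piper, pipew := io.Pipe()
	bufw := bufio.NewWriterSize(pipew, 1<<20) // absorb producer bursts, as DagArchive does

	go func() {
		if _, err := bufw.WriteString("written from the producer goroutine\n"); err != nil {
			pipew.CloseWithError(err)
			return
		}
		if err := bufw.Flush(); err != nil {
			pipew.CloseWithError(err)
			return
		}
		pipew.Close() // reader now sees io.EOF
	}()

	out, err := ioutil.ReadAll(piper)
	if err != nil {
		fmt.Println("read error:", err)
		return
	}
	fmt.Print(string(out))
}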
diff --git a/unixfs/tar/writer.go b/unixfs/tar/writer.go
new file mode 100644
index 000000000..125beed96
--- /dev/null
+++ b/unixfs/tar/writer.go
@@ -0,0 +1,170 @@
+package tar
+
+import (
+	"archive/tar"
+	"bufio"
+	"compress/gzip"
+	"fmt"
+	"io"
+	"path"
+	"time"
+
+	proto "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/gogo/protobuf/proto"
+	cxt "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
+
+	mdag "github.com/ipfs/go-ipfs/merkledag"
+	uio "github.com/ipfs/go-ipfs/unixfs/io"
+	upb "github.com/ipfs/go-ipfs/unixfs/pb"
+)
+
+// DefaultBufSize is the buffer size for gets. For now, 1MB, which is ~4 blocks.
+// TODO: does this need to be configurable?
+var DefaultBufSize = 1048576
+
+func DagArchive(ctx cxt.Context, nd *mdag.Node, name string, dag mdag.DAGService, compression int) (io.Reader, error) {
+
+	_, filename := path.Split(name)
+
+	// need to connect a writer to a reader
+	piper, pipew := io.Pipe()
+
+	// use a buffered writer to parallelize task
+	bufw := bufio.NewWriterSize(pipew, DefaultBufSize)
+
+	// construct the tar writer
+	w, err := NewWriter(bufw, dag, compression)
+	if err != nil {
+		return nil, err
+	}
+
+	// write all the nodes recursively
+	go func() {
+		if err := w.WriteNode(ctx, nd, filename); err != nil {
+			pipew.CloseWithError(err)
+			return
+		}
+
+		if err := bufw.Flush(); err != nil {
+			pipew.CloseWithError(err)
+			return
+		}
+
+		pipew.Close() // everything seems to be ok.
+	}()
+
+	return piper, nil
+}
+
+// Writer is a utility structure that helps to write
+// unixfs merkledag nodes in tar archive format.
+// It wraps any io.Writer.
+type Writer struct {
+	Dag  mdag.DAGService
+	TarW *tar.Writer
+}
+
+// NewWriter wraps the given io.Writer;
+// compression determines whether to use gzip compression.
+func NewWriter(w io.Writer, dag mdag.DAGService, compression int) (*Writer, error) {
+
+	if compression != gzip.NoCompression {
+		var err error
+		w, err = gzip.NewWriterLevel(w, compression)
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	return &Writer{
+		Dag:  dag,
+		TarW: tar.NewWriter(w),
+	}, nil
+}
+
+func (w *Writer) WriteDir(ctx cxt.Context, nd *mdag.Node, fpath string) error {
+	if err := writeDirHeader(w.TarW, fpath); err != nil {
+		return err
+	}
+
+	for i, ng := range w.Dag.GetDAG(ctx, nd) {
+		child, err := ng.Get(ctx)
+		if err != nil {
+			return err
+		}
+
+		npath := path.Join(fpath, nd.Links[i].Name)
+		if err := w.WriteNode(ctx, child, npath); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (w *Writer) WriteFile(ctx cxt.Context, nd *mdag.Node, fpath string) error {
+	pb := new(upb.Data)
+	if err := proto.Unmarshal(nd.Data, pb); err != nil {
+		return err
+	}
+
+	return w.writeFile(ctx, nd, pb, fpath)
+}
+
+func (w *Writer) writeFile(ctx cxt.Context, nd *mdag.Node, pb *upb.Data, fpath string) error {
+	if err := writeFileHeader(w.TarW, fpath, pb.GetFilesize()); err != nil {
+		return err
+	}
+
+	dagr, err := uio.NewDagReader(ctx, nd, w.Dag)
+	if err != nil {
+		return err
+	}
+
+	_, err = io.Copy(w.TarW, dagr)
+	if err != nil && err != io.EOF {
+		return err
+	}
+
+	return nil
+}
+
+func (w *Writer) WriteNode(ctx cxt.Context, nd *mdag.Node, fpath string) error {
+	pb := new(upb.Data)
+	if err := proto.Unmarshal(nd.Data, pb); err != nil {
+		return err
+	}
+
+	switch pb.GetType() {
+	case upb.Data_Directory:
+		return w.WriteDir(ctx, nd, fpath)
+	case upb.Data_File:
+		return w.writeFile(ctx, nd, pb, fpath)
+	default:
+		return fmt.Errorf("unixfs type not supported: %s", pb.GetType())
+	}
+}
+
+func (w *Writer) Close() error {
+	return w.TarW.Close()
+}
+
+func writeDirHeader(w *tar.Writer, fpath string) error {
+	return w.WriteHeader(&tar.Header{
+		Name:     fpath,
+		Typeflag: tar.TypeDir,
+		Mode:     0777,
+		ModTime:  time.Now(),
+		// TODO: set mode, dates, etc. when added to unixFS
+	})
+}
+
+func writeFileHeader(w *tar.Writer, fpath string, size uint64) error {
+	return w.WriteHeader(&tar.Header{
+		Name:     fpath,
+		Size:     int64(size),
+		Typeflag: tar.TypeReg,
+		Mode:     0644,
+		ModTime:  time.Now(),
+		// TODO: set mode, dates, etc. when added to unixFS
+	})
+}
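Reviewer note: a hedged usage sketch of the new entry point, mirroring what core/commands/get.go now does. saveArchive is a hypothetical helper, not part of this change, and the import paths are shortened (the repo vendors them under Godeps/_workspace):

package example

import (
	"compress/gzip"
	"io"
	"os"

	cxt "golang.org/x/net/context"

	mdag "github.com/ipfs/go-ipfs/merkledag"
	utar "github.com/ipfs/go-ipfs/unixfs/tar"
)

// saveArchive streams an already-resolved DAG node into a local tarball.
func saveArchive(ctx cxt.Context, dn *mdag.Node, name, out string, dag mdag.DAGService) error {
	r, err := utar.DagArchive(ctx, dn, name, dag, gzip.NoCompression)
	if err != nil {
		return err
	}

	f, err := os.Create(out)
	if err != nil {
		return err
	}
	defer f.Close()

	// DagArchive's pipe produces tar bytes while the DAG walk runs, so
	// this copy overlaps block fetches with disk writes
	_, err = io.Copy(f, r)
	return err
}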