
get: fix bug + improvements

Up until now there has been a very annoying bug with get: we would
see halting behavior. I'm not 100% sure this commit fixes it,
but it should. It certainly fixes other bugs found in the process of
digging into the get / tar extractor code. (I wish we could reproduce
the bug reliably enough to make a test case.)

This is a much cleaner tar writer. The ad-hoc, error-prone
synchronization for the tar reader is gone (which I believe was
incorrect). It is replaced with a simple pipe and bufio. The tar
logic now lives in tar.Writer, which writes unixfs dag nodes into a
tar archive (no synchronization needed there), and get's reader is
constructed with DagArchive, which sets up the pipe + bufio.
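
A rough sketch of that pipe + bufio shape, for illustration only
(the function name, the hard-coded single entry, and the buffer size
below are made up; this is not the actual unixfs/tar code):

    package main

    import (
        "archive/tar"
        "bufio"
        "io"
        "log"
        "time"
    )

    // archiveToReader streams a tar archive through an io.Pipe: a writer
    // goroutine produces the archive on one end while the caller consumes
    // the buffered read end.
    func archiveToReader(name string, data []byte) io.Reader {
        piper, pipew := io.Pipe()
        bufr := bufio.NewReaderSize(piper, 32*1024) // buffer the read side

        go func() {
            tw := tar.NewWriter(pipew)
            hdr := &tar.Header{Name: name, Mode: 0644, Size: int64(len(data)), ModTime: time.Now()}
            if err := tw.WriteHeader(hdr); err != nil {
                pipew.CloseWithError(err)
                return
            }
            if _, err := tw.Write(data); err != nil {
                pipew.CloseWithError(err)
                return
            }
            // Close the tar writer first (flushes the archive footer),
            // then close the pipe so the reader sees EOF.
            pipew.CloseWithError(tw.Close())
        }()

        return bufr
    }

    func main() {
        r := archiveToReader("hello.txt", []byte("hello, tar\n"))
        if _, err := io.Copy(io.Discard, r); err != nil {
            log.Fatal(err)
        }
    }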

NOTE: this commit also changes the behavior of `get`:
Previously, when retrieving a single file, get would fail if the
file already existed. This emulated the default behavior of wget,
which (without options) does not overwrite a file that is already
there. This change makes get overwrite the file even if it is
already present locally. This seems more intuitive to me and closer
to what one expects from a unix tool -- though perhaps it should be
discussed more before adopting.
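
For context on the overwrite: the new archive path in this diff
creates its output with os.Create, which truncates an existing file
rather than erroring. A minimal sketch of that semantic (the path
below is made up for illustration):

    package main

    import (
        "fmt"
        "os"
    )

    func main() {
        // os.Create truncates an existing file instead of returning an
        // error, so writing the fetched data this way replaces whatever
        // was there before.
        f, err := os.Create("/tmp/ipfs-get-demo.txt")
        if err != nil {
            fmt.Fprintln(os.Stderr, "create failed:", err)
            os.Exit(1)
        }
        defer f.Close()
        fmt.Fprintln(f, "fetched content overwrites any previous file")
    }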

Everything seems to work fine, and I have not been able to reproduce
the get halt bug.

License: MIT
Signed-off-by: Juan Batiz-Benet <juan@benet.ai>

commit f105ce439f
parent 741cf7e793
Author: Juan Batiz-Benet
Date:   2015-08-04 12:14:58 +02:00

5 changed files with 275 additions and 287 deletions:
  core/commands
  test/sharness
  thirdparty/tar
  unixfs/tar

@@ -98,54 +98,87 @@ may also specify the level of compression by specifying '-l=<1-9>'.
 			return
 		}
-		if archive, _, _ := req.Option("archive").Bool(); archive || cmplvl != gzip.NoCompression {
-			if archive && !strings.HasSuffix(outPath, ".tar") {
-				outPath += ".tar"
-			}
-			if cmplvl != gzip.NoCompression {
-				outPath += ".gz"
-			}
-			fmt.Printf("Saving archive to %s\n", outPath)
+		archive, _, _ := req.Option("archive").Bool()
-			file, err := os.Create(outPath)
-			if err != nil {
-				res.SetError(err, cmds.ErrNormal)
-				return
-			}
-			defer file.Close()
-			bar := pb.New(0).SetUnits(pb.U_BYTES)
-			bar.Output = os.Stderr
-			pbReader := bar.NewProxyReader(outReader)
-			bar.Start()
-			defer bar.Finish()
-			if _, err := io.Copy(file, pbReader); err != nil {
-				res.SetError(err, cmds.ErrNormal)
-				return
-			}
+		gw := getWriter{
+			Out:         os.Stdout,
+			Err:         os.Stderr,
+			Archive:     archive,
+			Compression: cmplvl,
+		}
+
+		if err := gw.Write(outReader, outPath); err != nil {
+			res.SetError(err, cmds.ErrNormal)
+			return
+		}
-		fmt.Printf("Saving file(s) to %s\n", outPath)
-		// TODO: get total length of files
-		bar := pb.New(0).SetUnits(pb.U_BYTES)
-		bar.Output = os.Stderr
-		// wrap the reader with the progress bar proxy reader
-		reader := bar.NewProxyReader(outReader)
-		bar.Start()
-		defer bar.Finish()
-		extractor := &tar.Extractor{outPath}
-		if err := extractor.Extract(reader); err != nil {
-			res.SetError(err, cmds.ErrNormal)
-		}
 	},
 }
 
+func progressBarForReader(out io.Writer, r io.Reader) (*pb.ProgressBar, *pb.Reader) {
+	// setup bar reader
+	// TODO: get total length of files
+	bar := pb.New(0).SetUnits(pb.U_BYTES)
+	bar.Output = out
+	barR := bar.NewProxyReader(r)
+	return bar, barR
+}
+
+type getWriter struct {
+	Out io.Writer // for output to user
+	Err io.Writer // for progress bar output
+
+	Archive     bool
+	Compression int
+}
+
+func (gw *getWriter) Write(r io.Reader, fpath string) error {
+	if gw.Archive || gw.Compression != gzip.NoCompression {
+		return gw.writeArchive(r, fpath)
+	}
+	return gw.writeExtracted(r, fpath)
+}
+
+func (gw *getWriter) writeArchive(r io.Reader, fpath string) error {
+	// adjust file name if tar
+	if gw.Archive {
+		if !strings.HasSuffix(fpath, ".tar") && !strings.HasSuffix(fpath, ".tar.gz") {
+			fpath += ".tar"
+		}
+	}
+
+	// adjust file name if gz
+	if gw.Compression != gzip.NoCompression {
+		if !strings.HasSuffix(fpath, ".gz") {
+			fpath += ".gz"
+		}
+	}
+
+	// create file
+	file, err := os.Create(fpath)
+	if err != nil {
+		return err
+	}
+	defer file.Close()
+
+	fmt.Fprintf(gw.Out, "Saving archive to %s\n", fpath)
+
+	bar, barR := progressBarForReader(gw.Err, r)
+	bar.Start()
+	defer bar.Finish()
+
+	_, err = io.Copy(file, barR)
+	return err
+}
+
+func (gw *getWriter) writeExtracted(r io.Reader, fpath string) error {
+	fmt.Fprintf(gw.Out, "Saving file(s) to %s\n", fpath)
+
+	bar, barR := progressBarForReader(gw.Err, r)
+	bar.Start()
+	defer bar.Finish()
+
+	extractor := &tar.Extractor{fpath}
+	return extractor.Extract(barR)
+}
+
 func getCompressOptions(req cmds.Request) (int, error) {
 	cmprs, _, _ := req.Option("compress").Bool()
 	cmplvl, cmplvlFound, _ := req.Option("compression-level").Int()
@@ -161,12 +194,12 @@ func getCompressOptions(req cmds.Request) (int, error) {
 }
 
 func get(ctx context.Context, node *core.IpfsNode, p path.Path, compression int) (io.Reader, error) {
-	dagnode, err := core.Resolve(ctx, node, p)
+	dn, err := core.Resolve(ctx, node, p)
 	if err != nil {
 		return nil, err
 	}
 
-	return utar.NewReader(ctx, p, node.DAG, dagnode, compression)
+	return utar.DagArchive(ctx, dn, p.String(), node.DAG, compression)
 }
 
 // getZip is equivalent to `ipfs getdag $hash | gzip`