
Refactor ipfs get

License: MIT
Signed-off-by: rht <rhtbot@gmail.com>
Author: rht <rhtbot@gmail.com>
Date:   2015-08-10 04:05:41 +07:00
Parent: 3dfe02aa95
Commit: dfa0351df9

4 changed files with 70 additions and 103 deletions
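
In short: the command-level `get` and `getZip` helpers are folded into a single `utar.DagArchive` entry point that takes an `archive` flag alongside the gzip level, and the gzip wrapping moves out of `NewWriter` into `DagArchive` itself. The sketch below is illustrative only (not part of the commit); it summarizes how the two `ipfs get` flags select the output stream after this change:

package main

import (
	"compress/gzip"
	"fmt"
)

// outputKind mirrors the branching DagArchive does after this commit.
func outputKind(archive bool, compression int) string {
	switch {
	case !archive && compression != gzip.NoCompression:
		return "gzip(file bytes)" // `ipfs get -C` without `-a`: bare gzip, no tar
	case compression != gzip.NoCompression:
		return "gzip(tar)" // `ipfs get -a -C`
	default:
		return "tar" // `ipfs get` and `ipfs get -a`
	}
}

func main() {
	fmt.Println(outputKind(false, gzip.BestSpeed))     // gzip(file bytes)
	fmt.Println(outputKind(true, gzip.BestSpeed))      // gzip(tar)
	fmt.Println(outputKind(false, gzip.NoCompression)) // tar
}

Plain `ipfs get` still emits a tar stream that the client unpacks, which is why tar remains the transport format even without `-a`.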

@@ -1,7 +1,6 @@
 package commands

 import (
-	"bufio"
 	"compress/gzip"
 	"errors"
 	"fmt"
@@ -11,13 +10,11 @@ import (
 	"strings"

 	"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/cheggaaa/pb"
-	context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
 	cmds "github.com/ipfs/go-ipfs/commands"
 	core "github.com/ipfs/go-ipfs/core"
 	path "github.com/ipfs/go-ipfs/path"
 	tar "github.com/ipfs/go-ipfs/thirdparty/tar"
-	uio "github.com/ipfs/go-ipfs/unixfs/io"
 	utar "github.com/ipfs/go-ipfs/unixfs/tar"
 )
@@ -64,15 +61,16 @@ may also specify the level of compression by specifying '-l=<1-9>'.
 			res.SetError(err, cmds.ErrNormal)
 			return
 		}

 		p := path.Path(req.Arguments()[0])
-		var reader io.Reader
-		if archive, _, _ := req.Option("archive").Bool(); !archive && cmplvl != gzip.NoCompression {
-			// only use this when the flag is '-C' without '-a'
-			reader, err = getZip(req.Context(), node, p, cmplvl)
-		} else {
-			reader, err = get(req.Context(), node, p, cmplvl)
+		ctx := req.Context()
+		dn, err := core.Resolve(ctx, node, p)
+		if err != nil {
+			res.SetError(err, cmds.ErrNormal)
+			return
 		}

+		archive, _, _ := req.Option("archive").Bool()
+		reader, err := utar.DagArchive(ctx, dn, p.String(), node.DAG, archive, cmplvl)
 		if err != nil {
 			res.SetError(err, cmds.ErrNormal)
 			return
@@ -192,42 +190,3 @@ func getCompressOptions(req cmds.Request) (int, error) {
 	}
 	return gzip.NoCompression, nil
 }
-
-func get(ctx context.Context, node *core.IpfsNode, p path.Path, compression int) (io.Reader, error) {
-	dn, err := core.Resolve(ctx, node, p)
-	if err != nil {
-		return nil, err
-	}
-
-	return utar.DagArchive(ctx, dn, p.String(), node.DAG, compression)
-}
-
-// getZip is equivalent to `ipfs getdag $hash | gzip`
-func getZip(ctx context.Context, node *core.IpfsNode, p path.Path, compression int) (io.Reader, error) {
-	dagnode, err := core.Resolve(ctx, node, p)
-	if err != nil {
-		return nil, err
-	}
-
-	reader, err := uio.NewDagReader(ctx, dagnode, node.DAG)
-	if err != nil {
-		return nil, err
-	}
-
-	pr, pw := io.Pipe()
-	gw, err := gzip.NewWriterLevel(pw, compression)
-	if err != nil {
-		return nil, err
-	}
-
-	bufin := bufio.NewReader(reader)
-	go func() {
-		_, err := bufin.WriteTo(gw)
-		if err != nil {
-			log.Error("Fail to compress the stream")
-		}
-		gw.Close()
-		pw.Close()
-	}()
-	return pr, nil
-}
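
The deleted `getZip` hand-rolled an io.Pipe plus a gzip goroutine; after this commit that plumbing lives in one place, inside `DagArchive` (last file below). For reference, a generic sketch of the underlying pattern, with `src` as a placeholder producer:

package main

import (
	"io"
	"os"
	"strings"
)

// pipeFrom returns a reader that streams src lazily: the copy runs in a
// goroutine, and CloseWithError propagates any failure to the read side
// (a nil error behaves like a clean Close, yielding io.EOF).
func pipeFrom(src io.Reader) io.Reader {
	pr, pw := io.Pipe()
	go func() {
		_, err := io.Copy(pw, src)
		pw.CloseWithError(err)
	}()
	return pr
}

func main() {
	io.Copy(os.Stdout, pipeFrom(strings.NewReader("streamed\n")))
}

Returning the read half immediately is what lets `ipfs get` start emitting output before the whole archive has been written.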

@@ -180,7 +180,7 @@ func (s *Node) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error {
 		return err
 	}

-	buf := resp.Data[:min(req.Size, int(r.Size()-req.Offset))]
+	buf := resp.Data[:min(req.Size, int(int64(r.Size())-req.Offset))]
 	n, err := io.ReadFull(r, buf)
 	if err != nil && err != io.EOF {
 		return err
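
The added int64 conversion follows from `DagReader.Size()` now returning uint64 (next file): subtracting the signed `req.Offset` from a uint64 would not compile, and forcing both operands to uint64 would wrap around on a read past EOF instead of going negative. A small self-contained illustration:

package main

import "fmt"

func main() {
	var size uint64 = 100  // what DagReader.Size() now returns
	var offset int64 = 120 // a read offset past EOF

	// size - offset is a compile error (mismatched types), and the
	// all-unsigned version wraps instead of going negative:
	fmt.Println(size - uint64(offset)) // 18446744073709551596

	// the conversion used in the diff keeps the arithmetic signed:
	fmt.Println(int64(size) - offset) // -20
}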

@@ -58,8 +58,7 @@ type ReadSeekCloser interface {
 // node, using the passed in DAGService for data retreival
 func NewDagReader(ctx context.Context, n *mdag.Node, serv mdag.DAGService) (*DagReader, error) {
 	pb := new(ftpb.Data)
-	err := proto.Unmarshal(n.Data, pb)
-	if err != nil {
+	if err := proto.Unmarshal(n.Data, pb); err != nil {
 		return nil, err
 	}
@@ -70,7 +69,7 @@ func NewDagReader(ctx context.Context, n *mdag.Node, serv mdag.DAGService) (*DagReader, error) {
 	case ftpb.Data_Raw:
 		fallthrough
 	case ftpb.Data_File:
-		return newDataFileReader(ctx, n, pb, serv), nil
+		return NewDataFileReader(ctx, n, pb, serv), nil
 	case ftpb.Data_Metadata:
 		if len(n.Links) == 0 {
 			return nil, errors.New("incorrectly formatted metadata object")
@@ -85,7 +84,7 @@ func NewDagReader(ctx context.Context, n *mdag.Node, serv mdag.DAGService) (*DagReader, error) {
 	}
 }

-func newDataFileReader(ctx context.Context, n *mdag.Node, pb *ftpb.Data, serv mdag.DAGService) *DagReader {
+func NewDataFileReader(ctx context.Context, n *mdag.Node, pb *ftpb.Data, serv mdag.DAGService) *DagReader {
 	fctx, cancel := context.WithCancel(ctx)
 	promises := serv.GetDAG(fctx, n)
 	return &DagReader{
@@ -124,7 +123,7 @@ func (dr *DagReader) precalcNextBuf(ctx context.Context) error {
 		// A directory should not exist within a file
 		return ft.ErrInvalidDirLocation
 	case ftpb.Data_File:
-		dr.buf = newDataFileReader(dr.ctx, nxt, pb, dr.serv)
+		dr.buf = NewDataFileReader(dr.ctx, nxt, pb, dr.serv)
 		return nil
 	case ftpb.Data_Raw:
 		dr.buf = NewRSNCFromBytes(pb.GetData())
@@ -137,8 +136,8 @@ func (dr *DagReader) precalcNextBuf(ctx context.Context) error {
 }

 // Size return the total length of the data from the DAG structured file.
-func (dr *DagReader) Size() int64 {
-	return int64(dr.pbdata.GetFilesize())
+func (dr *DagReader) Size() uint64 {
+	return dr.pbdata.GetFilesize()
 }

 // Read reads data from the DAG structured file
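
Exporting `newDataFileReader` as `NewDataFileReader` lets a caller that already holds the unmarshalled unixfs protobuf, as the tar writer in the next file now does, skip the redundant decode and type switch inside `NewDagReader`. A hedged sketch of such a caller, reusing the import names from this diff (`streamFile` itself is hypothetical):

func streamFile(ctx cxt.Context, nd *mdag.Node, dagserv mdag.DAGService, dst io.Writer) error {
	pb := new(ftpb.Data)
	if err := proto.Unmarshal(nd.Data, pb); err != nil {
		return err
	}
	// NewDataFileReader has no error return: the proto is already decoded,
	// and the caller is responsible for knowing the node is a file.
	dagr := uio.NewDataFileReader(ctx, nd, pb, dagserv)
	_, err := dagr.WriteTo(dst) // WriteTo streams the file bytes, as used below
	return err
}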

@@ -4,7 +4,6 @@ import (
 	"archive/tar"
 	"bufio"
 	"compress/gzip"
-	"fmt"
 	"io"
 	"path"
 	"time"
@@ -13,6 +12,7 @@ import (
 	cxt "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"

 	mdag "github.com/ipfs/go-ipfs/merkledag"
+	ft "github.com/ipfs/go-ipfs/unixfs"
 	uio "github.com/ipfs/go-ipfs/unixfs/io"
 	upb "github.com/ipfs/go-ipfs/unixfs/pb"
 )
@@ -21,7 +21,8 @@ import (
 // TODO: does this need to be configurable?
 var DefaultBufSize = 1048576

-func DagArchive(ctx cxt.Context, nd *mdag.Node, name string, dag mdag.DAGService, compression int) (io.Reader, error) {
+// DagArchive is equivalent to `ipfs getdag $hash | maybe_tar | maybe_gzip`
+func DagArchive(ctx cxt.Context, nd *mdag.Node, name string, dag mdag.DAGService, archive bool, compression int) (io.Reader, error) {

 	_, filename := path.Split(name)
@@ -31,17 +32,44 @@ func DagArchive(ctx cxt.Context, nd *mdag.Node, name string, dag mdag.DAGService, compression int) (io.Reader, error) {
 	// use a buffered writer to parallelize task
 	bufw := bufio.NewWriterSize(pipew, DefaultBufSize)

+	// compression determines whether to use gzip compression.
+	var maybeGzw io.Writer
+	if compression != gzip.NoCompression {
+		var err error
+		maybeGzw, err = gzip.NewWriterLevel(bufw, compression)
+		if err != nil {
+			return nil, err
+		}
+	} else {
+		maybeGzw = bufw
+	}
+
 	// construct the tar writer
-	w, err := NewWriter(bufw, dag, compression)
+	w, err := NewWriter(ctx, dag, archive, compression, maybeGzw)
 	if err != nil {
 		return nil, err
 	}

 	// write all the nodes recursively
 	go func() {
-		if err := w.WriteNode(ctx, nd, filename); err != nil {
-			pipew.CloseWithError(err)
-			return
+		if !archive && compression != gzip.NoCompression {
+			// the case when the node is a file
+			dagr, err := uio.NewDagReader(w.ctx, nd, w.Dag)
+			if err != nil {
+				pipew.CloseWithError(err)
+				return
+			}
+			if _, err := dagr.WriteTo(maybeGzw); err != nil {
+				pipew.CloseWithError(err)
+				return
+			}
+		} else {
+			// the case for 1. archive, and 2. not archived and not compressed, in which tar is used anyway as a transport format
+			if err := w.WriteNode(nd, filename); err != nil {
+				pipew.CloseWithError(err)
+				return
+			}
 		}

 		if err := bufw.Flush(); err != nil {
@@ -49,6 +77,7 @@ func DagArchive(ctx cxt.Context, nd *mdag.Node, name string, dag mdag.DAGService, compression int) (io.Reader, error) {
 			return
 		}

+		w.Close()
 		pipew.Close() // everything seems to be ok.
 	}()
@@ -61,39 +90,32 @@ func DagArchive(ctx cxt.Context, nd *mdag.Node, name string, dag mdag.DAGService, compression int) (io.Reader, error) {
 type Writer struct {
 	Dag  mdag.DAGService
 	TarW *tar.Writer
+	ctx  cxt.Context
 }

 // NewWriter wraps given io.Writer.
-// compression determines whether to use gzip compression.
-func NewWriter(w io.Writer, dag mdag.DAGService, compression int) (*Writer, error) {
-	if compression != gzip.NoCompression {
-		var err error
-		w, err = gzip.NewWriterLevel(w, compression)
-		if err != nil {
-			return nil, err
-		}
-	}
-
+func NewWriter(ctx cxt.Context, dag mdag.DAGService, archive bool, compression int, w io.Writer) (*Writer, error) {
 	return &Writer{
 		Dag:  dag,
 		TarW: tar.NewWriter(w),
+		ctx:  ctx,
 	}, nil
 }

-func (w *Writer) WriteDir(ctx cxt.Context, nd *mdag.Node, fpath string) error {
+func (w *Writer) writeDir(nd *mdag.Node, fpath string) error {
 	if err := writeDirHeader(w.TarW, fpath); err != nil {
 		return err
 	}

-	for i, ng := range w.Dag.GetDAG(ctx, nd) {
-		child, err := ng.Get(ctx)
+	for i, ng := range w.Dag.GetDAG(w.ctx, nd) {
+		child, err := ng.Get(w.ctx)
 		if err != nil {
 			return err
 		}

 		npath := path.Join(fpath, nd.Links[i].Name)
-		if err := w.WriteNode(ctx, child, npath); err != nil {
+		if err := w.WriteNode(child, npath); err != nil {
 			return err
 		}
 	}
@@ -101,46 +123,33 @@ func (w *Writer) WriteDir(ctx cxt.Context, nd *mdag.Node, fpath string) error {
 	return nil
 }

-func (w *Writer) WriteFile(ctx cxt.Context, nd *mdag.Node, fpath string) error {
-	pb := new(upb.Data)
-	if err := proto.Unmarshal(nd.Data, pb); err != nil {
-		return err
-	}
-
-	return w.writeFile(ctx, nd, pb, fpath)
-}
-
-func (w *Writer) writeFile(ctx cxt.Context, nd *mdag.Node, pb *upb.Data, fpath string) error {
+func (w *Writer) writeFile(nd *mdag.Node, pb *upb.Data, fpath string) error {
 	if err := writeFileHeader(w.TarW, fpath, pb.GetFilesize()); err != nil {
 		return err
 	}

-	dagr, err := uio.NewDagReader(ctx, nd, w.Dag)
-	if err != nil {
-		return err
-	}
-
-	_, err = io.Copy(w.TarW, dagr)
-	if err != nil && err != io.EOF {
-		return err
-	}
-
-	return nil
+	dagr := uio.NewDataFileReader(w.ctx, nd, pb, w.Dag)
+	_, err := dagr.WriteTo(w.TarW)
+	return err
 }

-func (w *Writer) WriteNode(ctx cxt.Context, nd *mdag.Node, fpath string) error {
+func (w *Writer) WriteNode(nd *mdag.Node, fpath string) error {
 	pb := new(upb.Data)
 	if err := proto.Unmarshal(nd.Data, pb); err != nil {
 		return err
 	}

 	switch pb.GetType() {
+	case upb.Data_Metadata:
+		fallthrough
 	case upb.Data_Directory:
-		return w.WriteDir(ctx, nd, fpath)
+		return w.writeDir(nd, fpath)
+	case upb.Data_Raw:
+		fallthrough
 	case upb.Data_File:
-		return w.writeFile(ctx, nd, pb, fpath)
+		return w.writeFile(nd, pb, fpath)
 	default:
-		return fmt.Errorf("unixfs type not supported: %s", pb.GetType())
+		return ft.ErrUnrecognizedType
 	}
 }
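
Finally, a minimal usage sketch of the new `DagArchive` signature under the APIs shown in this commit; the resolved node `nd`, the DAG service `dagserv`, and the output filename are placeholders (`saveTarGz` itself is hypothetical):

// Produce what `ipfs get -a -C` would: a gzipped tar of the dag under nd.
func saveTarGz(ctx cxt.Context, nd *mdag.Node, dagserv mdag.DAGService) error {
	r, err := utar.DagArchive(ctx, nd, "myfile", dagserv, true, gzip.BestCompression)
	if err != nil {
		return err
	}
	f, err := os.Create("myfile.tar.gz")
	if err != nil {
		return err
	}
	defer f.Close()
	_, err = io.Copy(f, r)
	return err
}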