From 13c489eca1ac8565350b9566a1b194fb4ebda17e Mon Sep 17 00:00:00 2001 From: Jeromy Date: Mon, 23 Mar 2015 14:01:42 -0700 Subject: [PATCH] fix context respect through fuse reading --- core/commands/refs.go | 2 +- fuse/ipns/ipns_unix.go | 35 ++++++++++++++--------------------- ipnsfs/file.go | 7 +++++++ merkledag/merkledag.go | 6 ++++-- pin/pin.go | 2 +- unixfs/io/dagreader.go | 16 +++++++++++----- unixfs/mod/dagmodifier.go | 31 ++++++++++++++++++++++++++----- unixfs/tar/reader.go | 2 +- 8 files changed, 65 insertions(+), 36 deletions(-) diff --git a/core/commands/refs.go b/core/commands/refs.go index 6a5b8b6ca..dfede0e7a 100644 --- a/core/commands/refs.go +++ b/core/commands/refs.go @@ -254,7 +254,7 @@ func (rw *RefWriter) writeRefsRecursive(n *dag.Node) (int, error) { return count, err } - nd, err := ng.Get() + nd, err := ng.Get(rw.Ctx) if err != nil { return count, err } diff --git a/fuse/ipns/ipns_unix.go b/fuse/ipns/ipns_unix.go index cf9add9ad..0ce96ec87 100644 --- a/fuse/ipns/ipns_unix.go +++ b/fuse/ipns/ipns_unix.go @@ -6,7 +6,6 @@ package ipns import ( "errors" - "io" "os" fuse "github.com/jbenet/go-ipfs/Godeps/_workspace/src/bazil.org/fuse" @@ -274,32 +273,26 @@ func (dir *Directory) ReadDirAll(ctx context.Context) ([]fuse.Dirent, error) { } func (fi *File) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error { - errs := make(chan error, 1) - go func() { - _, err := fi.fi.Seek(req.Offset, os.SEEK_SET) - if err != nil { - errs <- err - return - } + _, err := fi.fi.Seek(req.Offset, os.SEEK_SET) + if err != nil { + return err + } - fisize, err := fi.fi.Size() - if err != nil { - errs <- err - return - } - - readsize := min(req.Size, int(fisize-req.Offset)) - n, err := io.ReadFull(fi.fi, resp.Data[:readsize]) - resp.Data = resp.Data[:n] - errs <- err - }() + fisize, err := fi.fi.Size() + if err != nil { + return err + } select { case <-ctx.Done(): return ctx.Err() - case err := <-errs: - return err + default: } + + readsize := min(req.Size, int(fisize-req.Offset)) + n, err := fi.fi.CtxReadFull(ctx, resp.Data[:readsize]) + resp.Data = resp.Data[:n] + return err } func (fi *File) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.WriteResponse) error { diff --git a/ipnsfs/file.go b/ipnsfs/file.go index 7d7edebd3..73560d351 100644 --- a/ipnsfs/file.go +++ b/ipnsfs/file.go @@ -51,6 +51,13 @@ func (fi *File) Read(b []byte) (int, error) { return fi.mod.Read(b) } +// Read reads into the given buffer from the current offset +func (fi *File) CtxReadFull(ctx context.Context, b []byte) (int, error) { + fi.Lock() + defer fi.Unlock() + return fi.mod.CtxReadFull(ctx, b) +} + // Close flushes, then propogates the modified dag node up the directory structure // and signals a republish to occur func (fi *File) Close() error { diff --git a/merkledag/merkledag.go b/merkledag/merkledag.go index 923a3d715..2084c200e 100644 --- a/merkledag/merkledag.go +++ b/merkledag/merkledag.go @@ -257,10 +257,10 @@ type nodePromise struct { // from its internal channels, subsequent calls will return the // cached node. type NodeGetter interface { - Get() (*Node, error) + Get(context.Context) (*Node, error) } -func (np *nodePromise) Get() (*Node, error) { +func (np *nodePromise) Get(ctx context.Context) (*Node, error) { if np.cache != nil { return np.cache, nil } @@ -270,6 +270,8 @@ func (np *nodePromise) Get() (*Node, error) { np.cache = blk case <-np.ctx.Done(): return nil, np.ctx.Err() + case <-ctx.Done(): + return nil, ctx.Err() } return np.cache, nil } diff --git a/pin/pin.go b/pin/pin.go index 5f726a457..6ec299388 100644 --- a/pin/pin.go +++ b/pin/pin.go @@ -177,7 +177,7 @@ func (p *pinner) pinLinks(node *mdag.Node) error { defer cancel() for _, ng := range p.dserv.GetDAG(ctx, node) { - subnode, err := ng.Get() + subnode, err := ng.Get(ctx) if err != nil { // TODO: Maybe just log and continue? return err diff --git a/unixfs/io/dagreader.go b/unixfs/io/dagreader.go index 64dfff127..6bb9eb406 100644 --- a/unixfs/io/dagreader.go +++ b/unixfs/io/dagreader.go @@ -100,12 +100,13 @@ func newDataFileReader(ctx context.Context, n *mdag.Node, pb *ftpb.Data, serv md // precalcNextBuf follows the next link in line and loads it from the DAGService, // setting the next buffer to read from -func (dr *DagReader) precalcNextBuf() error { +func (dr *DagReader) precalcNextBuf(ctx context.Context) error { dr.buf.Close() // Just to make sure if dr.linkPosition >= len(dr.promises) { return io.EOF } - nxt, err := dr.promises[dr.linkPosition].Get() + + nxt, err := dr.promises[dr.linkPosition].Get(ctx) if err != nil { return err } @@ -141,6 +142,11 @@ func (dr *DagReader) Size() int64 { // Read reads data from the DAG structured file func (dr *DagReader) Read(b []byte) (int, error) { + return dr.CtxReadFull(dr.ctx, b) +} + +// CtxReadFull reads data from the DAG structured file +func (dr *DagReader) CtxReadFull(ctx context.Context, b []byte) (int, error) { // If no cached buffer, load one total := 0 for { @@ -161,7 +167,7 @@ func (dr *DagReader) Read(b []byte) (int, error) { } // Otherwise, load up the next block - err = dr.precalcNextBuf() + err = dr.precalcNextBuf(ctx) if err != nil { return total, err } @@ -183,7 +189,7 @@ func (dr *DagReader) WriteTo(w io.Writer) (int64, error) { } // Otherwise, load up the next block - err = dr.precalcNextBuf() + err = dr.precalcNextBuf(dr.ctx) if err != nil { if err == io.EOF { return total, nil @@ -239,7 +245,7 @@ func (dr *DagReader) Seek(offset int64, whence int) (int64, error) { } // start sub-block request - err := dr.precalcNextBuf() + err := dr.precalcNextBuf(dr.ctx) if err != nil { return 0, err } diff --git a/unixfs/mod/dagmodifier.go b/unixfs/mod/dagmodifier.go index cd207ed91..133af2227 100644 --- a/unixfs/mod/dagmodifier.go +++ b/unixfs/mod/dagmodifier.go @@ -315,32 +315,53 @@ func (dm *DagModifier) appendData(node *mdag.Node, blks <-chan []byte) (*mdag.No // Read data from this dag starting at the current offset func (dm *DagModifier) Read(b []byte) (int, error) { - err := dm.Sync() + err := dm.readPrep() if err != nil { return 0, err } + n, err := dm.read.Read(b) + dm.curWrOff += uint64(n) + return n, err +} + +func (dm *DagModifier) readPrep() error { + err := dm.Sync() + if err != nil { + return err + } + if dm.read == nil { ctx, cancel := context.WithCancel(dm.ctx) dr, err := uio.NewDagReader(ctx, dm.curNode, dm.dagserv) if err != nil { - return 0, err + return err } i, err := dr.Seek(int64(dm.curWrOff), os.SEEK_SET) if err != nil { - return 0, err + return err } if i != int64(dm.curWrOff) { - return 0, ErrSeekFail + return ErrSeekFail } dm.readCancel = cancel dm.read = dr } - n, err := dm.read.Read(b) + return nil +} + +// Read data from this dag starting at the current offset +func (dm *DagModifier) CtxReadFull(ctx context.Context, b []byte) (int, error) { + err := dm.readPrep() + if err != nil { + return 0, err + } + + n, err := dm.read.CtxReadFull(ctx, b) dm.curWrOff += uint64(n) return n, err } diff --git a/unixfs/tar/reader.go b/unixfs/tar/reader.go index aa15c823a..26aa772ce 100644 --- a/unixfs/tar/reader.go +++ b/unixfs/tar/reader.go @@ -90,7 +90,7 @@ func (r *Reader) writeToBuf(dagnode *mdag.Node, path string, depth int) { defer cancel() for i, ng := range r.dag.GetDAG(ctx, dagnode) { - childNode, err := ng.Get() + childNode, err := ng.Get(ctx) if err != nil { r.emitError(err) return