mirror of
https://github.com/ipfs/kubo.git
synced 2025-06-29 09:34:03 +08:00
add writerAt for fuse writes
This commit is contained in:
@ -1,14 +1,13 @@
|
|||||||
package ipns
|
package ipns
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"bytes"
|
|
||||||
|
|
||||||
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/bazil.org/fuse"
|
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/bazil.org/fuse"
|
||||||
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/bazil.org/fuse/fs"
|
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/bazil.org/fuse/fs"
|
||||||
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
|
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
|
||||||
@ -205,7 +204,7 @@ type Node struct {
|
|||||||
cached *mdag.PBData
|
cached *mdag.PBData
|
||||||
|
|
||||||
// For writing
|
// For writing
|
||||||
dataBuf *bytes.Buffer
|
writerBuf WriteAtBuf
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Node) loadData() error {
|
func (s *Node) loadData() error {
|
||||||
@ -290,21 +289,25 @@ func (s *Node) ReadAll(intr fs.Intr) ([]byte, fuse.Error) {
|
|||||||
}
|
}
|
||||||
// this is a terrible function... 'ReadAll'?
|
// this is a terrible function... 'ReadAll'?
|
||||||
// what if i have a 6TB file? GG RAM.
|
// what if i have a 6TB file? GG RAM.
|
||||||
return ioutil.ReadAll(r)
|
b, err := ioutil.ReadAll(r)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("[%s] Readall error: %s", s.name, err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if len(b) > 4 {
|
||||||
|
log.Debug("ReadAll trailing bytes: %v", b[len(b)-4:])
|
||||||
|
}
|
||||||
|
return b, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *Node) Write(req *fuse.WriteRequest, resp *fuse.WriteResponse, intr fs.Intr) fuse.Error {
|
func (n *Node) Write(req *fuse.WriteRequest, resp *fuse.WriteResponse, intr fs.Intr) fuse.Error {
|
||||||
log.Debug("ipns: Node Write [%s]: flags = %s, offset = %d, size = %d", n.name, req.Flags.String(), req.Offset, len(req.Data))
|
log.Debug("ipns: Node Write [%s]: flags = %s, offset = %d, size = %d", n.name, req.Flags.String(), req.Offset, len(req.Data))
|
||||||
if n.dataBuf == nil {
|
if n.writerBuf == nil {
|
||||||
n.dataBuf = new(bytes.Buffer)
|
n.writerBuf = NewWriterAtFromBytes(nil)
|
||||||
}
|
}
|
||||||
if req.Offset == 0 {
|
_, err := n.writerBuf.WriteAt(req.Data, req.Offset)
|
||||||
n.dataBuf.Reset()
|
if err != nil {
|
||||||
n.dataBuf.Write(req.Data)
|
return err
|
||||||
resp.Size = len(req.Data)
|
|
||||||
} else {
|
|
||||||
log.Error("Unhandled write to offset!")
|
|
||||||
n.dataBuf = nil
|
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -312,7 +315,7 @@ func (n *Node) Write(req *fuse.WriteRequest, resp *fuse.WriteResponse, intr fs.I
|
|||||||
func (n *Node) Flush(req *fuse.FlushRequest, intr fs.Intr) fuse.Error {
|
func (n *Node) Flush(req *fuse.FlushRequest, intr fs.Intr) fuse.Error {
|
||||||
log.Debug("Got flush request [%s]!", n.name)
|
log.Debug("Got flush request [%s]!", n.name)
|
||||||
|
|
||||||
if n.dataBuf != nil {
|
if n.writerBuf != nil {
|
||||||
//TODO:
|
//TODO:
|
||||||
// This operation holds everything in memory,
|
// This operation holds everything in memory,
|
||||||
// should be changed to stream the block creation/storage
|
// should be changed to stream the block creation/storage
|
||||||
@ -321,9 +324,10 @@ func (n *Node) Flush(req *fuse.FlushRequest, intr fs.Intr) fuse.Error {
|
|||||||
//NOTE:
|
//NOTE:
|
||||||
// This should only occur on a file object, if this were to be a
|
// This should only occur on a file object, if this were to be a
|
||||||
// folder, bad things would happen.
|
// folder, bad things would happen.
|
||||||
newNode, err := imp.NewDagFromReader(n.dataBuf)
|
buf := bytes.NewReader(n.writerBuf.Bytes())
|
||||||
|
newNode, err := imp.NewDagFromReader(buf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Critical("error creating dag from dataBuf: %s", err)
|
log.Critical("error creating dag from writerBuf: %s", err)
|
||||||
return fuse.ENODATA
|
return fuse.ENODATA
|
||||||
}
|
}
|
||||||
if n.parent != nil {
|
if n.parent != nil {
|
||||||
@ -350,7 +354,7 @@ func (n *Node) Flush(req *fuse.FlushRequest, intr fs.Intr) fuse.Error {
|
|||||||
fmt.Println(string(b))
|
fmt.Println(string(b))
|
||||||
//
|
//
|
||||||
|
|
||||||
n.dataBuf = nil
|
n.writerBuf = nil
|
||||||
|
|
||||||
n.wasChanged()
|
n.wasChanged()
|
||||||
}
|
}
|
||||||
@ -382,7 +386,7 @@ func (n *Node) republishRoot() error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
n.dataBuf = nil
|
n.writerBuf = nil
|
||||||
|
|
||||||
ndkey, err := root.Nd.Key()
|
ndkey, err := root.Nd.Key()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
29
fuse/ipns/writerat.go
Normal file
29
fuse/ipns/writerat.go
Normal file
@ -0,0 +1,29 @@
|
|||||||
|
package ipns
|
||||||
|
|
||||||
|
import "io"
|
||||||
|
|
||||||
|
type WriteAtBuf interface {
|
||||||
|
io.WriterAt
|
||||||
|
Bytes() []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
type writerAt struct {
|
||||||
|
buf []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewWriterAtFromBytes(b []byte) WriteAtBuf {
|
||||||
|
return &writerAt{b}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: make this better in the future, this is just a quick hack for now
|
||||||
|
func (wa *writerAt) WriteAt(p []byte, off int64) (int, error) {
|
||||||
|
if off+int64(len(p)) > int64(len(wa.buf)) {
|
||||||
|
wa.buf = append(wa.buf, make([]byte, (int(off)+len(p))-len(wa.buf))...)
|
||||||
|
}
|
||||||
|
copy(wa.buf[off:], p)
|
||||||
|
return len(p), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (wa *writerAt) Bytes() []byte {
|
||||||
|
return wa.buf
|
||||||
|
}
|
Reference in New Issue
Block a user