1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-06-29 01:12:24 +08:00

initial hack at turning ipfs into a daemon, just implemented simple rpc at this point

This commit is contained in:
Jeromy
2014-09-09 05:11:56 +00:00
parent 05e457e197
commit b2a218816d
9 changed files with 226 additions and 263 deletions

View File

@ -2,21 +2,14 @@ package main
import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"github.com/gonuts/flag"
"github.com/jbenet/commander"
core "github.com/jbenet/go-ipfs/core"
importer "github.com/jbenet/go-ipfs/importer"
dag "github.com/jbenet/go-ipfs/merkledag"
daemon "github.com/jbenet/go-ipfs/daemon"
u "github.com/jbenet/go-ipfs/util"
)
// Error indicating the max depth has been exceded.
var ErrDepthLimitExceeded = fmt.Errorf("depth limit exceeded")
var cmdIpfsAdd = &commander.Command{
UsageLine: "add",
Short: "Add an object to ipfs.",
@ -41,92 +34,20 @@ func addCmd(c *commander.Command, inp []string) error {
return nil
}
n, err := localNode(false)
cmd := daemon.NewCommand()
cmd.Command = "add"
fmt.Println(inp)
cmd.Args = inp
cmd.Opts["r"] = c.Flag.Lookup("r").Value.Get()
err := daemon.SendCommand(cmd, "localhost:12345")
if err != nil {
return err
}
recursive := c.Flag.Lookup("r").Value.Get().(bool)
var depth int
if recursive {
depth = -1
} else {
depth = 1
}
for _, fpath := range inp {
_, err := addPath(n, fpath, depth)
// Do locally
n, err := localNode(false)
if err != nil {
if !recursive {
return fmt.Errorf("%s is a directory. Use -r to add recursively", fpath)
}
u.PErr("error adding %s: %v\n", fpath, err)
}
}
return err
}
func addPath(n *core.IpfsNode, fpath string, depth int) (*dag.Node, error) {
if depth == 0 {
return nil, ErrDepthLimitExceeded
}
fi, err := os.Stat(fpath)
if err != nil {
return nil, err
}
if fi.IsDir() {
return addDir(n, fpath, depth)
}
return addFile(n, fpath, depth)
}
func addDir(n *core.IpfsNode, fpath string, depth int) (*dag.Node, error) {
tree := &dag.Node{Data: dag.FolderPBData()}
files, err := ioutil.ReadDir(fpath)
if err != nil {
return nil, err
}
// construct nodes for containing files.
for _, f := range files {
fp := filepath.Join(fpath, f.Name())
nd, err := addPath(n, fp, depth-1)
if err != nil {
return nil, err
return err
}
if err = tree.AddNodeLink(f.Name(), nd); err != nil {
return nil, err
}
daemon.ExecuteCommand(cmd, n, os.Stdout)
}
return tree, addNode(n, tree, fpath)
}
func addFile(n *core.IpfsNode, fpath string, depth int) (*dag.Node, error) {
root, err := importer.NewDagFromFile(fpath)
if err != nil {
return nil, err
}
return root, addNode(n, root, fpath)
}
// addNode adds the node to the graph + local storage
func addNode(n *core.IpfsNode, nd *dag.Node, fpath string) error {
// add the file to the graph + local storage
err := n.DAG.AddRecursive(nd)
if err != nil {
return err
}
u.POut("added %s\n", fpath)
// ensure we keep it. atm no-op
return n.PinDagNode(nd)
return nil
}

View File

@ -1,13 +1,11 @@
package main
import (
"fmt"
"io"
"os"
"github.com/gonuts/flag"
"github.com/jbenet/commander"
dag "github.com/jbenet/go-ipfs/merkledag"
"github.com/jbenet/go-ipfs/daemon"
u "github.com/jbenet/go-ipfs/util"
)
@ -29,28 +27,18 @@ func catCmd(c *commander.Command, inp []string) error {
return nil
}
n, err := localNode(false)
if err != nil {
return err
}
com := daemon.NewCommand()
com.Command = "cat"
com.Args = inp
for _, fn := range inp {
nd, err := n.Resolver.ResolvePath(fn)
err := daemon.SendCommand(com, "localhost:12345")
if err != nil {
n, err := localNode(false)
if err != nil {
return err
}
read, err := dag.NewDagReader(nd, n.DAG)
if err != nil {
fmt.Println(err)
continue
}
_, err = io.Copy(os.Stdout, read)
if err != nil {
fmt.Println(err)
continue
}
daemon.ExecuteCommand(com, n, os.Stdout)
}
return nil
}

View File

@ -1,8 +1,12 @@
package main
import (
"fmt"
"os"
"github.com/gonuts/flag"
"github.com/jbenet/commander"
"github.com/jbenet/go-ipfs/daemon"
u "github.com/jbenet/go-ipfs/util"
)
@ -27,20 +31,20 @@ func lsCmd(c *commander.Command, inp []string) error {
return nil
}
n, err := localNode(false)
fmt.Println("hello")
com := daemon.NewCommand()
com.Command = "ls"
com.Args = inp
err := daemon.SendCommand(com, "localhost:12345")
if err != nil {
return err
}
for _, fn := range inp {
nd, err := n.Resolver.ResolvePath(fn)
fmt.Println(err)
n, err := localNode(false)
if err != nil {
return err
}
for _, link := range nd.Links {
u.POut("%s %d %s\n", link.Hash.B58String(), link.Size, link.Name)
}
daemon.ExecuteCommand(com, n, os.Stdout)
}
return nil
}

View File

@ -7,6 +7,7 @@ import (
"github.com/gonuts/flag"
"github.com/jbenet/commander"
"github.com/jbenet/go-ipfs/daemon"
rofs "github.com/jbenet/go-ipfs/fuse/readonly"
u "github.com/jbenet/go-ipfs/util"
)
@ -26,16 +27,26 @@ var cmdIpfsMount = &commander.Command{
}
func mountCmd(c *commander.Command, inp []string) error {
u.Debug = true
if len(inp) < 1 || len(inp[0]) == 0 {
u.POut(c.Long)
return nil
}
fmt.Println("wtf.")
n, err := localNode(true)
if err != nil {
return err
}
fmt.Println("starting new daemon listener...")
dl, err := daemon.NewDaemonListener(n, "localhost:12345")
if err != nil {
return err
}
go dl.Listen()
defer dl.Close()
mp := inp[0]
fmt.Printf("Mounting at %s\n", mp)

80
core/commands/add.go Normal file
View File

@ -0,0 +1,80 @@
package commands
import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"github.com/jbenet/go-ipfs/core"
"github.com/jbenet/go-ipfs/importer"
dag "github.com/jbenet/go-ipfs/merkledag"
u "github.com/jbenet/go-ipfs/util"
)
// Error indicating the max depth has been exceded.
var ErrDepthLimitExceeded = fmt.Errorf("depth limit exceeded")
func AddPath(n *core.IpfsNode, fpath string, depth int) (*dag.Node, error) {
if depth == 0 {
return nil, ErrDepthLimitExceeded
}
fi, err := os.Stat(fpath)
if err != nil {
return nil, err
}
if fi.IsDir() {
return addDir(n, fpath, depth)
}
return addFile(n, fpath, depth)
}
func addDir(n *core.IpfsNode, fpath string, depth int) (*dag.Node, error) {
tree := &dag.Node{Data: dag.FolderPBData()}
files, err := ioutil.ReadDir(fpath)
if err != nil {
return nil, err
}
// construct nodes for containing files.
for _, f := range files {
fp := filepath.Join(fpath, f.Name())
nd, err := AddPath(n, fp, depth-1)
if err != nil {
return nil, err
}
if err = tree.AddNodeLink(f.Name(), nd); err != nil {
return nil, err
}
}
return tree, addNode(n, tree, fpath)
}
func addFile(n *core.IpfsNode, fpath string, depth int) (*dag.Node, error) {
root, err := importer.NewDagFromFile(fpath)
if err != nil {
return nil, err
}
return root, addNode(n, root, fpath)
}
// addNode adds the node to the graph + local storage
func addNode(n *core.IpfsNode, nd *dag.Node, fpath string) error {
// add the file to the graph + local storage
err := n.DAG.AddRecursive(nd)
if err != nil {
return err
}
u.POut("added %s\n", fpath)
// ensure we keep it. atm no-op
return n.PinDagNode(nd)
}

View File

@ -133,19 +133,24 @@ func loadBitswap(cfg *config.Config, d ds.Datastore) (*bitswap.BitSwap, error) {
route := dht.NewDHT(local, net, d)
route.Start()
for _, p := range cfg.Peers {
maddr, err := ma.NewMultiaddr(p.Address)
if err != nil {
u.PErr("error: %v\n", err)
continue
}
go func() {
u.DOut("setup: connecting to peers.\n")
for _, p := range cfg.Peers {
maddr, err := ma.NewMultiaddr(p.Address)
if err != nil {
u.PErr("error: %v\n", err)
continue
}
_, err = route.Connect(maddr)
if err != nil {
u.PErr("Bootstrapping error: %v\n", err)
u.DOut("setup: connect.\n")
_, err = route.Connect(maddr)
if err != nil {
u.PErr("Bootstrapping error: %v\n", err)
}
}
}
}()
u.DOut("setup: return new bitswap\n")
return bitswap.NewBitSwap(local, net, d, route), nil
}

View File

@ -3,41 +3,54 @@ package daemon
import (
"encoding/json"
"errors"
"fmt"
"io"
"net"
"strings"
core "github.com/jbenet/go-ipfs/core"
commands "github.com/jbenet/go-ipfs/core/commands"
dag "github.com/jbenet/go-ipfs/merkledag"
u "github.com/jbenet/go-ipfs/util"
)
var ErrInvalidCommand = errors.New("invalid command")
type DaemonListener struct {
list net.Listener
CommChan chan *Command
closed bool
node *core.IpfsNode
list net.Listener
closed bool
}
func NewDaemonListener(addr string) (*DaemonListener, error) {
func NewDaemonListener(node *core.IpfsNode, addr string) (*DaemonListener, error) {
list, err := net.Listen("tcp", addr)
if err != nil {
return nil, err
}
fmt.Println("new daemon listener.")
return &DaemonListener{
list: list,
CommChan: make(chan *Command),
node: node,
list: list,
}, nil
}
type Command struct {
command string
args []string
resp chan string
Command string
Args []string
Opts map[string]interface{}
}
func NewCommand() *Command {
return &Command{
Opts: make(map[string]interface{}),
}
}
func (dl *DaemonListener) Listen() {
fmt.Println("listen.")
for {
c, err := dl.list.Accept()
fmt.Println("Loop!")
if err != nil {
if !dl.closed {
u.PErr("DaemonListener Accept: %v\n", err)
@ -49,59 +62,74 @@ func (dl *DaemonListener) Listen() {
}
func (dl *DaemonListener) handleConnection(c net.Conn) {
defer c.Close()
dec := json.NewDecoder(c)
enc := json.NewEncoder(c)
var com string
var com Command
err := dec.Decode(&com)
if err != nil {
err := enc.Encode(err.Error())
if err != nil {
u.PErr("DaemonListener decode: %v\n", err)
}
fmt.Fprintln(c, err)
return
}
u.DOut("Got command: %v\n", com)
cmd, err := parseCommand(com)
if err != nil {
err := enc.Encode(err.Error())
if err != nil {
u.PErr("DaemonListener parse: %v\n", err)
}
return
}
select {
case dl.CommChan <- cmd:
default:
u.PErr("Recieved command after closing...")
return
}
resp := <-cmd.resp
err = enc.Encode(resp)
if err != nil {
u.PErr("handleConnection: %v\n", err)
}
ExecuteCommand(&com, dl.node, c)
}
func parseCommand(cmdi string) (*Command, error) {
params := strings.Split(cmdi, " ")
if len(params) == 0 {
return nil, ErrInvalidCommand
func ExecuteCommand(com *Command, n *core.IpfsNode, out io.Writer) {
u.DOut("executing command: %s\n", com.Command)
switch com.Command {
case "add":
depth := 1
if r, ok := com.Opts["r"].(bool); r && ok {
depth = -1
}
for _, path := range com.Args {
_, err := commands.AddPath(n, path, depth)
if err != nil {
fmt.Fprintf(out, "addFile error: %v\n", err)
continue
}
}
case "cat":
for _, fn := range com.Args {
nd, err := n.Resolver.ResolvePath(fn)
if err != nil {
fmt.Fprintf(out, "catFile error: %v\n", err)
return
}
read, err := dag.NewDagReader(nd, n.DAG)
if err != nil {
fmt.Fprintln(out, err)
continue
}
_, err = io.Copy(out, read)
if err != nil {
fmt.Fprintln(out, err)
continue
}
}
case "ls":
for _, fn := range com.Args {
nd, err := n.Resolver.ResolvePath(fn)
if err != nil {
fmt.Fprintf(out, "ls: %v\n", err)
return
}
for _, link := range nd.Links {
fmt.Fprintf(out, "%s %d %s\n", link.Hash.B58String(), link.Size, link.Name)
}
}
default:
fmt.Fprintf(out, "Invalid Command: '%s'\n", com.Command)
}
//TODO: some sort of validation here
return &Command{
command: params[0],
args: params[1:],
resp: make(chan string),
}, nil
}
func (dl *DaemonListener) Close() error {
dl.closed = true
close(dl.CommChan)
return dl.list.Close()
}

View File

@ -2,28 +2,24 @@ package daemon
import (
"encoding/json"
"io"
"net"
"os"
)
func SendCommand(command, server string) (string, error) {
func SendCommand(com *Command, server string) error {
con, err := net.Dial("tcp", server)
if err != nil {
return "", err
return err
}
enc := json.NewEncoder(con)
err = enc.Encode(command)
err = enc.Encode(com)
if err != nil {
return "", err
return err
}
dec := json.NewDecoder(con)
io.Copy(os.Stdout, con)
var resp string
err = dec.Decode(&resp)
if err != nil {
return "", err
}
return resp, nil
return nil
}

View File

@ -1,70 +0,0 @@
package daemon
import (
"fmt"
"testing"
)
func TestCommandCall(t *testing.T) {
dl, err := NewDaemonListener("localhost:12345")
if err != nil {
t.Fatal(err)
}
go dl.Listen()
defer dl.Close()
go func() {
_, err := SendCommand("test command for fun", "localhost:12345")
if err != nil {
t.Fatal(err)
}
}()
cmd := <-dl.CommChan
if cmd.command != "test" {
t.Fatal("command parsing failed.")
}
if cmd.args[0] != "command" ||
cmd.args[1] != "for" ||
cmd.args[2] != "fun" {
t.Fatal("Args parsed incorrectly.")
}
}
func TestFailures(t *testing.T) {
dl, err := NewDaemonListener("localhost:12345")
if err != nil {
t.Fatal(err)
}
go dl.Listen()
defer dl.Close()
go func() {
_, err := SendCommand("test", "localhost:12345")
if err != nil {
t.Fatal(err)
}
}()
cmd := <-dl.CommChan
if cmd.command != "test" || len(cmd.args) > 0 {
t.Fatal("Parsing Failed.")
}
go func() {
_, err := SendCommand("", "localhost:12345")
if err != nil {
t.Fatal(err)
}
}()
cmd = <-dl.CommChan
if cmd.command != "" || len(cmd.args) > 0 {
fmt.Println(cmd)
t.Fatal("Parsing Failed.")
}
}