diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 0fb6cb109..08444fe3e 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -55,8 +55,8 @@ }, { "ImportPath": "github.com/jbenet/go-multiaddr", - "Comment": "0.1.0-1-g99196c0", - "Rev": "99196c0d231f83eea7f6e47cf59cbb5a0b86b358" + "Comment": "0.1.2", + "Rev": "b90678896b52c3e5a4c8176805c6facc3fe3eb82" }, { "ImportPath": "github.com/jbenet/go-multihash", diff --git a/Godeps/_workspace/src/github.com/jbenet/go-multiaddr/codec.go b/Godeps/_workspace/src/github.com/jbenet/go-multiaddr/codec.go index ca0400a99..527f0cb58 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-multiaddr/codec.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-multiaddr/codec.go @@ -8,10 +8,14 @@ import ( "strings" ) -func StringToBytes(s string) ([]byte, error) { +func stringToBytes(s string) ([]byte, error) { b := []byte{} sp := strings.Split(s, "/") + if sp[0] != "" { + return nil, fmt.Errorf("invalid multiaddr, must begin with /") + } + // consume first empty elem sp = sp[1:] @@ -22,7 +26,7 @@ func StringToBytes(s string) ([]byte, error) { } b = append(b, byte(p.Code)) - a := AddressStringToBytes(p, sp[1]) + a := addressStringToBytes(p, sp[1]) b = append(b, a...) sp = sp[2:] @@ -30,7 +34,7 @@ func StringToBytes(s string) ([]byte, error) { return b, nil } -func BytesToString(b []byte) (ret string, err error) { +func bytesToString(b []byte) (ret string, err error) { // panic handler, in case we try accessing bytes incorrectly. defer func() { if e := recover(); e != nil { @@ -49,7 +53,7 @@ func BytesToString(b []byte) (ret string, err error) { s = strings.Join([]string{s, "/", p.Name}, "") b = b[1:] - a := AddressBytesToString(p, b[:(p.Size/8)]) + a := addressBytesToString(p, b[:(p.Size/8)]) if len(a) > 0 { s = strings.Join([]string{s, "/", a}, "") } @@ -59,7 +63,7 @@ func BytesToString(b []byte) (ret string, err error) { return s, nil } -func AddressStringToBytes(p *Protocol, s string) []byte { +func addressStringToBytes(p *Protocol, s string) []byte { switch p.Code { // ipv4,6 @@ -79,7 +83,7 @@ func AddressStringToBytes(p *Protocol, s string) []byte { return []byte{} } -func AddressBytesToString(p *Protocol, b []byte) string { +func addressBytesToString(p *Protocol, b []byte) string { switch p.Code { // ipv4,6 diff --git a/Godeps/_workspace/src/github.com/jbenet/go-multiaddr/index.go b/Godeps/_workspace/src/github.com/jbenet/go-multiaddr/index.go index 413a971f5..df22012e9 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-multiaddr/index.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-multiaddr/index.go @@ -5,22 +5,26 @@ import ( "strings" ) +// Multiaddr is the data structure representing a multiaddr type Multiaddr struct { Bytes []byte } +// NewMultiaddr parses and validates an input string, returning a *Multiaddr func NewMultiaddr(s string) (*Multiaddr, error) { - b, err := StringToBytes(s) + b, err := stringToBytes(s) if err != nil { return nil, err } return &Multiaddr{Bytes: b}, nil } +// String returns the string representation of a Multiaddr func (m *Multiaddr) String() (string, error) { - return BytesToString(m.Bytes) + return bytesToString(m.Bytes) } +// Protocols returns the list of protocols this Multiaddr has. func (m *Multiaddr) Protocols() (ret []*Protocol, err error) { // panic handler, in case we try accessing bytes incorrectly. @@ -44,12 +48,14 @@ func (m *Multiaddr) Protocols() (ret []*Protocol, err error) { return ps, nil } +// Encapsulate wraps a given Multiaddr, returning the resulting joined Multiaddr func (m *Multiaddr) Encapsulate(o *Multiaddr) *Multiaddr { b := make([]byte, len(m.Bytes)+len(o.Bytes)) b = append(m.Bytes, o.Bytes...) return &Multiaddr{Bytes: b} } +// Decapsulate unwraps Multiaddr up until the given Multiaddr is found. func (m *Multiaddr) Decapsulate(o *Multiaddr) (*Multiaddr, error) { s1, err := m.String() if err != nil { @@ -68,9 +74,10 @@ func (m *Multiaddr) Decapsulate(o *Multiaddr) (*Multiaddr, error) { return NewMultiaddr(s1[:i]) } +// DialArgs is a convenience function returning arguments for use in net.Dial func (m *Multiaddr) DialArgs() (string, string, error) { if !m.IsThinWaist() { - return "", "", fmt.Errorf("%s is not a 'thin waist' address.", m) + return "", "", fmt.Errorf("%s is not a 'thin waist' address", m) } str, err := m.String() @@ -84,6 +91,8 @@ func (m *Multiaddr) DialArgs() (string, string, error) { return network, host, nil } +// IsThinWaist returns whether this multiaddr includes "Thin Waist" Protocols. +// This means: /{IP4, IP6}/{TCP, UDP} func (m *Multiaddr) IsThinWaist() bool { p, err := m.Protocols() if err != nil { diff --git a/Godeps/_workspace/src/github.com/jbenet/go-multiaddr/multiaddr_test.go b/Godeps/_workspace/src/github.com/jbenet/go-multiaddr/multiaddr_test.go index 976e09556..65cb97219 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-multiaddr/multiaddr_test.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-multiaddr/multiaddr_test.go @@ -14,7 +14,7 @@ func TestStringToBytes(t *testing.T) { t.Error("failed to decode hex", h) } - b2, err := StringToBytes(s) + b2, err := stringToBytes(s) if err != nil { t.Error("failed to convert", s) } @@ -35,7 +35,7 @@ func TestBytesToString(t *testing.T) { t.Error("failed to decode hex", h) } - s2, err := BytesToString(b) + s2, err := bytesToString(b) if err != nil { t.Error("failed to convert", b) } diff --git a/Godeps/_workspace/src/github.com/jbenet/go-multiaddr/protocols.go b/Godeps/_workspace/src/github.com/jbenet/go-multiaddr/protocols.go index e08d01f07..ed4d29f13 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-multiaddr/protocols.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-multiaddr/protocols.go @@ -1,5 +1,6 @@ package multiaddr +// Protocol is a Multiaddr protocol description structure. type Protocol struct { Code int Size int @@ -10,7 +11,6 @@ type Protocol struct { // 1. avoid parsing the csv // 2. ensuring errors in the csv don't screw up code. // 3. changing a number has to happen in two places. - const ( P_IP4 = 4 P_TCP = 6 @@ -20,6 +20,7 @@ const ( P_SCTP = 132 ) +// Protocols is the list of multiaddr protocols supported by this module. var Protocols = []*Protocol{ &Protocol{P_IP4, 32, "ip4"}, &Protocol{P_TCP, 16, "tcp"}, @@ -32,6 +33,7 @@ var Protocols = []*Protocol{ // {443, 0, "https"}, } +// ProtocolWithName returns the Protocol description with given string name. func ProtocolWithName(s string) *Protocol { for _, p := range Protocols { if p.Name == s { @@ -41,6 +43,7 @@ func ProtocolWithName(s string) *Protocol { return nil } +// ProtocolWithCode returns the Protocol description with given protocol code. func ProtocolWithCode(c int) *Protocol { for _, p := range Protocols { if p.Code == c { diff --git a/bitswap/bitswap.go b/bitswap/bitswap.go index 22f97514c..bcb16b747 100644 --- a/bitswap/bitswap.go +++ b/bitswap/bitswap.go @@ -1,17 +1,17 @@ package bitswap import ( - "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto" + "time" + + proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto" + ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go" + blocks "github.com/jbenet/go-ipfs/blocks" peer "github.com/jbenet/go-ipfs/peer" routing "github.com/jbenet/go-ipfs/routing" dht "github.com/jbenet/go-ipfs/routing/dht" swarm "github.com/jbenet/go-ipfs/swarm" u "github.com/jbenet/go-ipfs/util" - - ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go" - - "time" ) // PartnerWantListMax is the bound for the number of keys we'll store per @@ -44,7 +44,7 @@ type BitSwap struct { // The Ledger has the peer.ID, and the peer connection works through net. // Ledgers of known relationships (active or inactive) stored in datastore. // Changes to the Ledger should be committed to the datastore. - partners map[u.Key]*Ledger + partners LedgerMap // haveList is the set of keys we have values for. a map for fast lookups. // haveList KeySet -- not needed. all values in datastore? @@ -115,13 +115,12 @@ func (bs *BitSwap) GetBlock(k u.Key, timeout time.Duration) ( func (bs *BitSwap) getBlock(k u.Key, p *peer.Peer, timeout time.Duration) ([]byte, error) { u.DOut("[%s] getBlock '%s' from [%s]\n", bs.peer.ID.Pretty(), k.Pretty(), p.ID.Pretty()) - pmes := new(PBMessage) - pmes.Wantlist = []string{string(k)} + message := newMessage() + message.AppendWanted(k) after := time.After(timeout) resp := bs.listener.Listen(string(k), 1, timeout) - smes := swarm.NewMessage(p, pmes) - bs.meschan.Outgoing <- smes + bs.meschan.Outgoing <- message.ToSwarm(p) select { case resp_mes := <-resp: @@ -137,7 +136,7 @@ func (bs *BitSwap) getBlock(k u.Key, p *peer.Peer, timeout time.Duration) ([]byt func (bs *BitSwap) HaveBlock(blk *blocks.Block) error { go func() { for _, ledger := range bs.partners { - if _, ok := ledger.WantList[blk.Key()]; ok { + if ledger.WantListContains(blk.Key()) { //send block to node if ledger.ShouldSend() { bs.SendBlock(ledger.Partner, blk) @@ -149,11 +148,9 @@ func (bs *BitSwap) HaveBlock(blk *blocks.Block) error { } func (bs *BitSwap) SendBlock(p *peer.Peer, b *blocks.Block) { - pmes := new(PBMessage) - pmes.Blocks = [][]byte{b.Data} - - swarm_mes := swarm.NewMessage(p, pmes) - bs.meschan.Outgoing <- swarm_mes + message := newMessage() + message.AppendBlock(b) + bs.meschan.Outgoing <- message.ToSwarm(p) } func (bs *BitSwap) handleMessages() { @@ -192,14 +189,13 @@ func (bs *BitSwap) handleMessages() { // and then if we do, check the ledger for whether or not we should send it. func (bs *BitSwap) peerWantsBlock(p *peer.Peer, want string) { u.DOut("peer [%s] wants block [%s]\n", p.ID.Pretty(), u.Key(want).Pretty()) - ledg := bs.GetLedger(p) + ledger := bs.getLedger(p) dsk := ds.NewKey(want) blk_i, err := bs.datastore.Get(dsk) if err != nil { if err == ds.ErrNotFound { - // TODO: this needs to be different. We need timeouts. - ledg.WantList[u.Key(want)] = struct{}{} + ledger.Wants(u.Key(want)) } u.PErr("datastore get error: %v\n", err) return @@ -211,7 +207,7 @@ func (bs *BitSwap) peerWantsBlock(p *peer.Peer, want string) { return } - if ledg.ShouldSend() { + if ledger.ShouldSend() { u.DOut("Sending block to peer.\n") bblk, err := blocks.NewBlock(blk) if err != nil { @@ -219,7 +215,7 @@ func (bs *BitSwap) peerWantsBlock(p *peer.Peer, want string) { return } bs.SendBlock(p, bblk) - ledg.SentBytes(len(blk)) + ledger.SentBytes(len(blk)) } else { u.DOut("Decided not to send block.") } @@ -239,11 +235,11 @@ func (bs *BitSwap) blockReceive(p *peer.Peer, blk *blocks.Block) { } bs.listener.Respond(string(blk.Key()), mes) - ledger := bs.GetLedger(p) + ledger := bs.getLedger(p) ledger.ReceivedBytes(len(blk.Data)) } -func (bs *BitSwap) GetLedger(p *peer.Peer) *Ledger { +func (bs *BitSwap) getLedger(p *peer.Peer) *Ledger { l, ok := bs.partners[p.Key()] if ok { return l @@ -257,14 +253,14 @@ func (bs *BitSwap) GetLedger(p *peer.Peer) *Ledger { } func (bs *BitSwap) SendWantList(wl KeySet) error { - pmes := new(PBMessage) + message := newMessage() for k, _ := range wl { - pmes.Wantlist = append(pmes.Wantlist, string(k)) + message.AppendWanted(k) } // Lets just ping everybody all at once for _, ledger := range bs.partners { - bs.meschan.Outgoing <- swarm.NewMessage(ledger.Partner, pmes) + bs.meschan.Outgoing <- message.ToSwarm(ledger.Partner) } return nil @@ -276,7 +272,7 @@ func (bs *BitSwap) Halt() { func (bs *BitSwap) SetStrategy(sf StrategyFunc) { bs.strategy = sf - for _, ledg := range bs.partners { - ledg.Strategy = sf + for _, ledger := range bs.partners { + ledger.Strategy = sf } } diff --git a/bitswap/ledger.go b/bitswap/ledger.go index a0f23b8d4..6ddc0a711 100644 --- a/bitswap/ledger.go +++ b/bitswap/ledger.go @@ -1,14 +1,16 @@ package bitswap import ( + "sync" + "time" + peer "github.com/jbenet/go-ipfs/peer" u "github.com/jbenet/go-ipfs/util" - - "time" ) // Ledger stores the data exchange relationship between two peers. type Ledger struct { + lock sync.RWMutex // Partner is the remote Peer. Partner *peer.Peer @@ -16,17 +18,17 @@ type Ledger struct { // Accounting tracks bytes sent and recieved. Accounting debtRatio - // FirstExchnage is the time of the first data exchange. - FirstExchange time.Time + // firstExchnage is the time of the first data exchange. + firstExchange time.Time - // LastExchange is the time of the last data exchange. - LastExchange time.Time + // lastExchange is the time of the last data exchange. + lastExchange time.Time - // Number of exchanges with this peer - ExchangeCount uint64 + // exchangeCount is the number of exchanges with this peer + exchangeCount uint64 - // WantList is a (bounded, small) set of keys that Partner desires. - WantList KeySet + // wantList is a (bounded, small) set of keys that Partner desires. + wantList KeySet Strategy StrategyFunc } @@ -35,17 +37,48 @@ type Ledger struct { type LedgerMap map[u.Key]*Ledger func (l *Ledger) ShouldSend() bool { + l.lock.Lock() + defer l.lock.Unlock() + return l.Strategy(l) } func (l *Ledger) SentBytes(n int) { - l.ExchangeCount++ - l.LastExchange = time.Now() + l.lock.Lock() + defer l.lock.Unlock() + + l.exchangeCount++ + l.lastExchange = time.Now() l.Accounting.BytesSent += uint64(n) } func (l *Ledger) ReceivedBytes(n int) { - l.ExchangeCount++ - l.LastExchange = time.Now() + l.lock.Lock() + defer l.lock.Unlock() + + l.exchangeCount++ + l.lastExchange = time.Now() l.Accounting.BytesRecv += uint64(n) } + +// TODO: this needs to be different. We need timeouts. +func (l *Ledger) Wants(k u.Key) { + l.lock.Lock() + defer l.lock.Unlock() + + l.wantList[k] = struct{}{} +} + +func (l *Ledger) WantListContains(k u.Key) bool { + l.lock.RLock() + defer l.lock.RUnlock() + + _, ok := l.wantList[k] + return ok +} + +func (l *Ledger) ExchangeCount() uint64 { + l.lock.RLock() + defer l.lock.RUnlock() + return l.exchangeCount +} diff --git a/bitswap/ledger_test.go b/bitswap/ledger_test.go new file mode 100644 index 000000000..d651d485f --- /dev/null +++ b/bitswap/ledger_test.go @@ -0,0 +1,23 @@ +package bitswap + +import ( + "sync" + "testing" +) + +func TestRaceConditions(t *testing.T) { + const numberOfExpectedExchanges = 10000 + l := new(Ledger) + var wg sync.WaitGroup + for i := 0; i < numberOfExpectedExchanges; i++ { + wg.Add(1) + go func() { + defer wg.Done() + l.ReceivedBytes(1) + }() + } + wg.Wait() + if l.ExchangeCount() != numberOfExpectedExchanges { + t.Fail() + } +} diff --git a/bitswap/message.go b/bitswap/message.go new file mode 100644 index 000000000..94bb82ef8 --- /dev/null +++ b/bitswap/message.go @@ -0,0 +1,38 @@ +package bitswap + +import ( + blocks "github.com/jbenet/go-ipfs/blocks" + peer "github.com/jbenet/go-ipfs/peer" + swarm "github.com/jbenet/go-ipfs/swarm" + u "github.com/jbenet/go-ipfs/util" +) + +// message wraps a proto message for convenience +type message struct { + pb PBMessage +} + +func newMessageFromProto(pb PBMessage) *message { + return &message{pb: pb} +} + +func newMessage() *message { + return new(message) +} + +func (m *message) AppendWanted(k u.Key) { + m.pb.Wantlist = append(m.pb.Wantlist, string(k)) +} + +func (m *message) AppendBlock(b *blocks.Block) { + m.pb.Blocks = append(m.pb.Blocks, b.Data) +} + +func (m *message) ToProto() *PBMessage { + cp := m.pb + return &cp +} + +func (m *message) ToSwarm(p *peer.Peer) *swarm.Message { + return swarm.NewMessage(p, m.ToProto()) +} diff --git a/bitswap/message_test.go b/bitswap/message_test.go new file mode 100644 index 000000000..bc52b5aa9 --- /dev/null +++ b/bitswap/message_test.go @@ -0,0 +1,75 @@ +package bitswap + +import ( + "bytes" + "testing" + + blocks "github.com/jbenet/go-ipfs/blocks" + u "github.com/jbenet/go-ipfs/util" +) + +func TestAppendWanted(t *testing.T) { + const str = "foo" + m := newMessage() + m.AppendWanted(u.Key(str)) + + if !contains(m.ToProto().GetWantlist(), str) { + t.Fail() + } +} + +func TestNewMessageFromProto(t *testing.T) { + const str = "a_key" + protoMessage := new(PBMessage) + protoMessage.Wantlist = []string{string(str)} + if !contains(protoMessage.Wantlist, str) { + t.Fail() + } + m := newMessageFromProto(*protoMessage) + if !contains(m.ToProto().GetWantlist(), str) { + t.Fail() + } +} + +func TestAppendBlock(t *testing.T) { + + strs := make([]string, 2) + strs = append(strs, "Celeritas") + strs = append(strs, "Incendia") + + m := newMessage() + for _, str := range strs { + block, err := blocks.NewBlock([]byte(str)) + if err != nil { + t.Fail() + } + m.AppendBlock(block) + } + + // assert strings are in proto message + for _, blockbytes := range m.ToProto().GetBlocks() { + s := bytes.NewBuffer(blockbytes).String() + if !contains(strs, s) { + t.Fail() + } + } +} + +func TestCopyProtoByValue(t *testing.T) { + const str = "foo" + m := newMessage() + protoBeforeAppend := m.ToProto() + m.AppendWanted(u.Key(str)) + if contains(protoBeforeAppend.GetWantlist(), str) { + t.Fail() + } +} + +func contains(s []string, x string) bool { + for _, a := range s { + if a == x { + return true + } + } + return false +} diff --git a/cmd/ipfs/cat.go b/cmd/ipfs/cat.go index 8d3d74f10..2860c380b 100644 --- a/cmd/ipfs/cat.go +++ b/cmd/ipfs/cat.go @@ -35,9 +35,9 @@ func catCmd(c *commander.Command, inp []string) error { com := daemon.NewCommand() com.Command = "cat" - com.Args = inp + com.Args = expanded - err := daemon.SendCommand(com, "localhost:12345") + err = daemon.SendCommand(com, "localhost:12345") if err != nil { n, err := localNode(false) if err != nil { diff --git a/cmd/ipfs/init.go b/cmd/ipfs/init.go index 77f3fb9f0..a70e0eb81 100644 --- a/cmd/ipfs/init.go +++ b/cmd/ipfs/init.go @@ -32,14 +32,18 @@ func init() { } func initCmd(c *commander.Command, inp []string) error { - _, err := os.Lstat(config.DefaultConfigFilePath) + filename, err := config.Filename(config.DefaultConfigFilePath) + if err != nil { + return errors.New("Couldn't get home directory path") + } + fi, err := os.Lstat(filename) force := c.Flag.Lookup("f").Value.Get().(bool) - if err != nil && !force { + if fi != nil || (err != nil && !os.IsNotExist(err)) && !force { return errors.New("ipfs configuration file already exists!\nReinitializing would overwrite your keys.\n(use -f to force overwrite)") } cfg := new(config.Config) - cfg.Datastore = new(config.Datastore) + cfg.Datastore = config.Datastore{} dspath, err := u.TildeExpansion("~/.go-ipfs/datastore") if err != nil { return err @@ -68,7 +72,7 @@ func initCmd(c *commander.Command, inp []string) error { } cfg.Identity.PrivKey = base64.StdEncoding.EncodeToString(skbytes) - id, err := identify.IdFromPubKey(pk) + id, err := identify.IDFromPubKey(pk) if err != nil { return err } diff --git a/config/config.go b/config/config.go index b80ff1a2c..25f40ec9e 100644 --- a/config/config.go +++ b/config/config.go @@ -30,16 +30,17 @@ type SavedPeer struct { // Config is used to load IPFS config files. type Config struct { Identity *Identity - Datastore *Datastore + Datastore Datastore Peers []*SavedPeer } -var DefaultConfigFilePath = "~/.go-ipfs/config" -var DefaultConfigFile = `{ +const DefaultPathRoot = "~/.go-ipfs" +const DefaultConfigFilePath = DefaultPathRoot + "/config" +const DefaultConfigFile = `{ "identity": {}, "datastore": { "type": "leveldb", - "path": "~/.go-ipfs/datastore" + "path": "` + DefaultPathRoot + `/datastore" } } ` diff --git a/config/config_test.go b/config/config_test.go index ffc7ef7af..c891d6c51 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -1,17 +1,24 @@ package config import ( - "fmt" "testing" ) func TestConfig(t *testing.T) { - - cfg, err := Load(".ipfsconfig") + const filename = ".ipfsconfig" + const dsPath = "/path/to/datastore" + cfgWritten := new(Config) + cfgWritten.Datastore.Path = dsPath + err := WriteConfigFile(filename, cfgWritten) + if err != nil { + t.Error(err) + } + cfgRead, err := Load(filename) if err != nil { t.Error(err) return } - - fmt.Printf(cfg.Datastore.Path) + if cfgWritten.Datastore.Path != cfgRead.Datastore.Path { + t.Fail() + } } diff --git a/config/serialize.go b/config/serialize.go index bf495a260..647e19e33 100644 --- a/config/serialize.go +++ b/config/serialize.go @@ -22,6 +22,11 @@ func ReadConfigFile(filename string, cfg interface{}) error { // WriteConfigFile writes the config from `cfg` into `filename`. func WriteConfigFile(filename string, cfg interface{}) error { + err := os.MkdirAll(filepath.Dir(filename), 0775) + if err != nil { + return err + } + f, err := os.Create(filename) if err != nil { return err diff --git a/core/core.go b/core/core.go index 296da57f1..0a9db055a 100644 --- a/core/core.go +++ b/core/core.go @@ -69,13 +69,32 @@ func NewIpfsNode(cfg *config.Config, online bool) (*IpfsNode, error) { return nil, err } - var swap *bitswap.BitSwap + local, err := initIdentity(cfg) + if err != nil { + return nil, err + } + + var ( + net *swarm.Swarm + // TODO: refactor so we can use IpfsRouting interface instead of being DHT-specific + route* dht.IpfsDHT + swap *bitswap.BitSwap + ) + if online { - swap, err = loadBitswap(cfg, d) + net = swarm.NewSwarm(local) + err = net.Listen() if err != nil { return nil, err } + + route = dht.NewDHT(local, net, d) + route.Start() + + swap = bitswap.NewBitSwap(local, net, d, route) swap.SetStrategy(bitswap.YesManStrategy) + + go initConnections(cfg, route) } bs, err := bserv.NewBlockService(d, swap) @@ -85,22 +104,37 @@ func NewIpfsNode(cfg *config.Config, online bool) (*IpfsNode, error) { dag := &merkledag.DAGService{Blocks: bs} - n := &IpfsNode{ + return &IpfsNode{ Config: cfg, PeerMap: &peer.Map{}, Datastore: d, Blocks: bs, DAG: dag, Resolver: &path.Resolver{DAG: dag}, - } - - return n, nil + BitSwap: swap, + Identity: local, + Routing: route, + }, nil } -func loadBitswap(cfg *config.Config, d ds.Datastore) (*bitswap.BitSwap, error) { - maddr, err := ma.NewMultiaddr(cfg.Identity.Address) - if err != nil { - return nil, err +func initIdentity(cfg *config.Config) (*peer.Peer, error) { + if cfg.Identity == nil { + return nil, errors.New("Identity was not set in config (was ipfs init run?)") + } + + if len(cfg.Identity.PeerID) == 0 { + return nil, errors.New("No peer ID in config! (was ipfs init run?)") + } + + // address is optional + var addresses []*ma.Multiaddr + if len(cfg.Identity.Address) > 0 { + maddr, err := ma.NewMultiaddr(cfg.Identity.Address) + if err != nil { + return nil, err + } + + addresses = []*ma.Multiaddr{ maddr } } skb, err := base64.StdEncoding.DecodeString(cfg.Identity.PrivKey) @@ -113,45 +147,27 @@ func loadBitswap(cfg *config.Config, d ds.Datastore) (*bitswap.BitSwap, error) { return nil, err } - local := &peer.Peer{ + return &peer.Peer{ ID: peer.ID(b58.Decode(cfg.Identity.PeerID)), - Addresses: []*ma.Multiaddr{maddr}, + Addresses: addresses, PrivKey: sk, PubKey: sk.GetPublic(), - } + }, nil +} - if len(local.ID) == 0 { - return nil, errors.New("No peer ID in config! (was ipfs init run?)") - } - - net := swarm.NewSwarm(local) - err = net.Listen() - if err != nil { - return nil, err - } - - route := dht.NewDHT(local, net, d) - route.Start() - - go func() { - u.DOut("setup: connecting to peers.\n") - for _, p := range cfg.Peers { - maddr, err := ma.NewMultiaddr(p.Address) - if err != nil { - u.PErr("error: %v\n", err) - continue - } - - u.DOut("setup: connect.\n") - _, err = route.Connect(maddr) - if err != nil { - u.PErr("Bootstrapping error: %v\n", err) - } +func initConnections(cfg *config.Config, route *dht.IpfsDHT) { + for _, p := range cfg.Peers { + maddr, err := ma.NewMultiaddr(p.Address) + if err != nil { + u.PErr("error: %v\n", err) + continue } - }() - u.DOut("setup: return new bitswap\n") - return bitswap.NewBitSwap(local, net, d, route), nil + _, err = route.Connect(maddr) + if err != nil { + u.PErr("Bootstrapping error: %v\n", err) + } + } } func (n *IpfsNode) PinDagNode(nd *merkledag.Node) error { diff --git a/core/core_test.go b/core/core_test.go index f2de5f016..c6695eb6b 100644 --- a/core/core_test.go +++ b/core/core_test.go @@ -6,17 +6,35 @@ import ( config "github.com/jbenet/go-ipfs/config" ) -func TestDatastores(t *testing.T) { +func TestInitialization(t *testing.T) { + id := &config.Identity{ + PeerID: "QmNgdzLieYi8tgfo2WfTUzNVH5hQK9oAYGVf6dxN12NrHt", + Address: "/ip4/127.0.0.1/tcp/8000", + PrivKey: "CAASrRIwggkpAgEAAoICAQCwt67GTUQ8nlJhks6CgbLKOx7F5tl1r9zF4m3TUrG3Pe8h64vi+ILDRFd7QJxaJ/n8ux9RUDoxLjzftL4uTdtv5UXl2vaufCc/C0bhCRvDhuWPhVsD75/DZPbwLsepxocwVWTyq7/ZHsCfuWdoh/KNczfy+Gn33gVQbHCnip/uhTVxT7ARTiv8Qa3d7qmmxsR+1zdL/IRO0mic/iojcb3Oc/PRnYBTiAZFbZdUEit/99tnfSjMDg02wRayZaT5ikxa6gBTMZ16Yvienq7RwSELzMQq2jFA4i/TdiGhS9uKywltiN2LrNDBcQJSN02pK12DKoiIy+wuOCRgs2NTQEhU2sXCk091v7giTTOpFX2ij9ghmiRfoSiBFPJA5RGwiH6ansCHtWKY1K8BS5UORM0o3dYk87mTnKbCsdz4bYnGtOWafujYwzueGx8r+IWiys80IPQKDeehnLW6RgoyjszKgL/2XTyP54xMLSW+Qb3BPgDcPaPO0hmop1hW9upStxKsefW2A2d46Ds4HEpJEry7PkS5M4gKL/zCKHuxuXVk14+fZQ1rstMuvKjrekpAC2aVIKMI9VRA3awtnje8HImQMdj+r+bPmv0N8rTTr3eS4J8Yl7k12i95LLfK+fWnmUh22oTNzkRlaiERQrUDyE4XNCtJc0xs1oe1yXGqazCIAQIDAQABAoICAQCk1N/ftahlRmOfAXk//8wNl7FvdJD3le6+YSKBj0uWmN1ZbUSQk64chr12iGCOM2WY180xYjy1LOS44PTXaeW5bEiTSnb3b3SH+HPHaWCNM2EiSogHltYVQjKW+3tfH39vlOdQ9uQ+l9Gh6iTLOqsCRyszpYPqIBwi1NMLY2Ej8PpVU7ftnFWouHZ9YKS7nAEiMoowhTu/7cCIVwZlAy3AySTuKxPMVj9LORqC32PVvBHZaMPJ+X1Xyijqg6aq39WyoztkXg3+Xxx5j5eOrK6vO/Lp6ZUxaQilHDXoJkKEJjgIBDZpluss08UPfOgiWAGkW+L4fgUxY0qDLDAEMhyEBAn6KOKVL1JhGTX6GjhWziI94bddSpHKYOEIDzUy4H8BXnKhtnyQV6ELS65C2hj9D0IMBTj7edCF1poJy0QfdK0cuXgMvxHLeUO5uc2YWfbNosvKxqygB9rToy4b22YvNwsZUXsTY6Jt+p9V2OgXSKfB5VPeRbjTJL6xqvvUJpQytmII/C9JmSDUtCbYceHj6X9jgigLk20VV6nWHqCTj3utXD6NPAjoycVpLKDlnWEgfVELDIk0gobxUqqSm3jTPEKRPJgxkgPxbwxYumtw++1UY2y35w3WRDc2xYPaWKBCQeZy+mL6ByXp9bWlNvxS3Knb6oZp36/ovGnf2pGvdQKCAQEAyKpipz2lIUySDyE0avVWAmQb2tWGKXALPohzj7AwkcfEg2GuwoC6GyVE2sTJD1HRazIjOKn3yQORg2uOPeG7sx7EKHxSxCKDrbPawkvLCq8JYSy9TLvhqKUVVGYPqMBzu2POSLEA81QXas+aYjKOFWA2Zrjq26zV9ey3+6Lc6WULePgRQybU8+RHJc6fdjUCCfUxgOrUO2IQOuTJ+FsDpVnrMUGlokmWn23OjL4qTL9wGDnWGUs2pjSzNbj3qA0d8iqaiMUyHX/D/VS0wpeT1osNBSm8suvSibYBn+7wbIApbwXUxZaxMv2OHGz3empae4ckvNZs7r8wsI9UwFt8mwKCAQEA4XK6gZkv9t+3YCcSPw2ensLvL/xU7i2bkC9tfTGdjnQfzZXIf5KNdVuj/SerOl2S1s45NMs3ysJbADwRb4ahElD/V71nGzV8fpFTitC20ro9fuX4J0+twmBolHqeH9pmeGTjAeL1rvt6vxs4FkeG/yNft7GdXpXTtEGaObn8Mt0tPY+aB3UnKrnCQoQAlPyGHFrVRX0UEcp6wyyNGhJCNKeNOvqCHTFObhbhO+KWpWSN0MkVHnqaIBnIn1Te8FtvP/iTwXGnKc0YXJUG6+LM6LmOguW6tg8ZqiQeYyyR+e9eCFH4csLzkrTl1GxCxwEsoSLIMm7UDcjttW6tYEghkwKCAQEAmeCO5lCPYImnN5Lu71ZTLmI2OgmjaANTnBBnDbi+hgv61gUCToUIMejSdDCTPfwv61P3TmyIZs0luPGxkiKYHTNqmOE9Vspgz8Mr7fLRMNApESuNvloVIY32XVImj/GEzh4rAfM6F15U1sN8T/EUo6+0B/Glp+9R49QzAfRSE2g48/rGwgf1JVHYfVWFUtAzUA+GdqWdOixo5cCsYJbqpNHfWVZN/bUQnBFIYwUwysnC29D+LUdQEQQ4qOm+gFAOtrWU62zMkXJ4iLt8Ify6kbrvsRXgbhQIzzGS7WH9XDarj0eZciuslr15TLMC1Azadf+cXHLR9gMHA13mT9vYIQKCAQA/DjGv8cKCkAvf7s2hqROGYAs6Jp8yhrsN1tYOwAPLRhtnCs+rLrg17M2vDptLlcRuI/vIElamdTmylRpjUQpX7yObzLO73nfVhpwRJVMdGU394iBIDncQ+JoHfUwgqJskbUM40dvZdyjbrqc/Q/4z+hbZb+oN/GXb8sVKBATPzSDMKQ/xqgisYIw+wmDPStnPsHAaIWOtni47zIgilJzD0WEk78/YjmPbUrboYvWziK5JiRRJFA1rkQqV1c0M+OXixIm+/yS8AksgCeaHr0WUieGcJtjT9uE8vyFop5ykhRiNxy9wGaq6i7IEecsrkd6DqxDHWkwhFuO1bSE83q/VAoIBAEA+RX1i/SUi08p71ggUi9WFMqXmzELp1L3hiEjOc2AklHk2rPxsaTh9+G95BvjhP7fRa/Yga+yDtYuyjO99nedStdNNSg03aPXILl9gs3r2dPiQKUEXZJ3FrH6tkils/8BlpOIRfbkszrdZIKTO9GCdLWQ30dQITDACs8zV/1GFGrHFrqnnMe/NpIFHWNZJ0/WZMi8wgWO6Ik8jHEpQtVXRiXLqy7U6hk170pa4GHOzvftfPElOZZjy9qn7KjdAQqy6spIrAE94OEL+fBgbHQZGLpuTlj6w6YGbMtPU8uo7sXKoc6WOCb68JWft3tejGLDa1946HAWqVM9B/UcneNc=", + } good := []*config.Config{ - &config.Config{Datastore: &config.Datastore{Type: "memory"}}, - &config.Config{Datastore: &config.Datastore{Type: "leveldb", Path: ".testdb"}}, + &config.Config{ + Identity: id, + Datastore: config.Datastore{ + Type: "memory", + }, + }, + + &config.Config{ + Identity: id, + Datastore: config.Datastore{ + Type: "leveldb", + Path: ".testdb", + }, + }, } bad := []*config.Config{ - &config.Config{Datastore: &config.Datastore{}}, - &config.Config{Datastore: &config.Datastore{Type: "badtype"}}, + &config.Config{Identity: id, Datastore: config.Datastore{}}, + &config.Config{Identity: id, Datastore: config.Datastore{Type: "badtype"}}, &config.Config{}, + &config.Config{Datastore: config.Datastore{Type: "memory"}}, nil, } diff --git a/core/datastore.go b/core/datastore.go index 5395e74a4..9105adaab 100644 --- a/core/datastore.go +++ b/core/datastore.go @@ -2,13 +2,14 @@ package core import ( "fmt" + ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go" lds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go/leveldb" config "github.com/jbenet/go-ipfs/config" ) -func makeDatastore(cfg *config.Datastore) (ds.Datastore, error) { - if cfg == nil || len(cfg.Type) == 0 { +func makeDatastore(cfg config.Datastore) (ds.Datastore, error) { + if len(cfg.Type) == 0 { return nil, fmt.Errorf("config datastore.type required") } @@ -22,7 +23,7 @@ func makeDatastore(cfg *config.Datastore) (ds.Datastore, error) { return nil, fmt.Errorf("Unknown datastore type: %s", cfg.Type) } -func makeLevelDBDatastore(cfg *config.Datastore) (ds.Datastore, error) { +func makeLevelDBDatastore(cfg config.Datastore) (ds.Datastore, error) { if len(cfg.Path) == 0 { return nil, fmt.Errorf("config datastore.path required for leveldb") } diff --git a/identify/identify.go b/identify/identify.go index 72555144c..b6b896780 100644 --- a/identify/identify.go +++ b/identify/identify.go @@ -1,4 +1,4 @@ -// The identify package handles how peers identify with eachother upon +// Package identify handles how peers identify with eachother upon // connection to the network package identify @@ -31,13 +31,16 @@ var SupportedHashes = "SHA256,SHA512,SHA1" // ErrUnsupportedKeyType is returned when a private key cast/type switch fails. var ErrUnsupportedKeyType = errors.New("unsupported key type") -// Perform initial communication with this peer to share node ID's and +// Performs initial communication with this peer to share node ID's and // initiate communication. (secureIn, secureOut, error) -func Handshake(self, remote *peer.Peer, in, out chan []byte) (chan []byte, chan []byte, error) { +func Handshake(self, remote *peer.Peer, in <-chan []byte, out chan<- []byte) (<-chan []byte, chan<- []byte, error) { // Generate and send Hello packet. // Hello = (rand, PublicKey, Supported) nonce := make([]byte, 16) - rand.Read(nonce) + _, err := rand.Read(nonce) + if err != nil { + return nil, nil, err + } hello := new(Hello) @@ -74,7 +77,7 @@ func Handshake(self, remote *peer.Peer, in, out chan []byte) (chan []byte, chan return nil, nil, err } - remote.ID, err = IdFromPubKey(remote.PubKey) + remote.ID, err = IDFromPubKey(remote.PubKey) if err != nil { return nil, nil, err } @@ -95,6 +98,9 @@ func Handshake(self, remote *peer.Peer, in, out chan []byte) (chan []byte, chan } epubkey, done, err := ci.GenerateEKeyPair(exchange) // Generate EphemeralPubKey + if err != nil { + return nil, nil, err + } var handshake bytes.Buffer // Gather corpus to sign. handshake.Write(encoded) @@ -110,6 +116,9 @@ func Handshake(self, remote *peer.Peer, in, out chan []byte) (chan []byte, chan } exEncoded, err := proto.Marshal(exPacket) + if err != nil { + return nil, nil, err + } out <- exEncoded @@ -124,9 +133,18 @@ func Handshake(self, remote *peer.Peer, in, out chan []byte) (chan []byte, chan } var theirHandshake bytes.Buffer - theirHandshake.Write(resp) - theirHandshake.Write(encoded) - theirHandshake.Write(exchangeResp.GetEpubkey()) + _, err = theirHandshake.Write(resp) + if err != nil { + return nil, nil, err + } + _, err = theirHandshake.Write(encoded) + if err != nil { + return nil, nil, err + } + _, err = theirHandshake.Write(exchangeResp.GetEpubkey()) + if err != nil { + return nil, nil, err + } ok, err := remote.PubKey.Verify(theirHandshake.Bytes(), exchangeResp.GetSignature()) if err != nil { @@ -176,7 +194,7 @@ func makeMac(hashType string, key []byte) (hash.Hash, int) { } } -func secureInProxy(in, secureIn chan []byte, hashType string, tIV, tCKey, tMKey []byte) { +func secureInProxy(in <-chan []byte, secureIn chan<- []byte, hashType string, tIV, tCKey, tMKey []byte) { theirBlock, _ := aes.NewCipher(tCKey) theirCipher := cipher.NewCTR(theirBlock, tIV) @@ -185,6 +203,7 @@ func secureInProxy(in, secureIn chan []byte, hashType string, tIV, tCKey, tMKey for { data, ok := <-in if !ok { + close(secureIn) return } @@ -211,7 +230,7 @@ func secureInProxy(in, secureIn chan []byte, hashType string, tIV, tCKey, tMKey } } -func secureOutProxy(out, secureOut chan []byte, hashType string, mIV, mCKey, mMKey []byte) { +func secureOutProxy(out chan<- []byte, secureOut <-chan []byte, hashType string, mIV, mCKey, mMKey []byte) { myBlock, _ := aes.NewCipher(mCKey) myCipher := cipher.NewCTR(myBlock, mIV) @@ -220,6 +239,7 @@ func secureOutProxy(out, secureOut chan []byte, hashType string, mIV, mCKey, mMK for { data, ok := <-secureOut if !ok { + close(out) return } @@ -239,7 +259,8 @@ func secureOutProxy(out, secureOut chan []byte, hashType string, mIV, mCKey, mMK } } -func IdFromPubKey(pk ci.PubKey) (peer.ID, error) { +// IDFromPubKey returns Nodes ID given its public key +func IDFromPubKey(pk ci.PubKey) (peer.ID, error) { b, err := pk.Bytes() if err != nil { return nil, err diff --git a/identify/identify_test.go b/identify/identify_test.go index 6ac33039f..3d529f3e4 100644 --- a/identify/identify_test.go +++ b/identify/identify_test.go @@ -20,7 +20,7 @@ func TestHandshake(t *testing.T) { cha := make(chan []byte, 5) chb := make(chan []byte, 5) - ida, err := IdFromPubKey(pka) + ida, err := IDFromPubKey(pka) if err != nil { t.Fatal(err) } @@ -30,7 +30,7 @@ func TestHandshake(t *testing.T) { PrivKey: ska, } - idb, err := IdFromPubKey(pkb) + idb, err := IDFromPubKey(pkb) if err != nil { t.Fatal(err) } diff --git a/importer/splitting.go b/importer/splitting.go index 1832743d8..d2690c784 100644 --- a/importer/splitting.go +++ b/importer/splitting.go @@ -34,12 +34,13 @@ func SplitterBySize(n int) BlockSplitter { } // TODO: this should take a reader, not a byte array. what if we're splitting a 3TB file? +//Rabin Fingerprinting for file chunking func Rabin(b []byte) [][]byte { var out [][]byte windowsize := uint64(48) - chunk_max := 1024 * 16 - min_blk_size := 2048 - blk_beg_i := 0 + chunkMax := 1024 * 16 + minBlkSize := 2048 + blkBegI := 0 prime := uint64(61) var poly uint64 @@ -63,21 +64,21 @@ func Rabin(b []byte) [][]byte { poly = (poly * prime) + cur curchecksum -= (uint64(b[i-1]) * prime) - if i-blk_beg_i >= chunk_max { + if i-blkBegI >= chunkMax { // push block - out = append(out, b[blk_beg_i:i]) - blk_beg_i = i + out = append(out, b[blkBegI:i]) + blkBegI = i } // first 13 bits of polynomial are 0 - if poly%8192 == 0 && i-blk_beg_i >= min_blk_size { + if poly%8192 == 0 && i-blkBegI >= minBlkSize { // push block - out = append(out, b[blk_beg_i:i]) - blk_beg_i = i + out = append(out, b[blkBegI:i]) + blkBegI = i } } - if i > blk_beg_i { - out = append(out, b[blk_beg_i:]) + if i > blkBegI { + out = append(out, b[blkBegI:]) } return out } diff --git a/merkledag/dagreader.go b/merkledag/dagreader.go index 0aa0d2606..967ec63a4 100644 --- a/merkledag/dagreader.go +++ b/merkledag/dagreader.go @@ -9,7 +9,7 @@ import ( u "github.com/jbenet/go-ipfs/util" ) -var ErrIsDir = errors.New("this dag node is a directory.") +var ErrIsDir = errors.New("this dag node is a directory") // DagReader provides a way to easily read the data contained in a dag. type DagReader struct { diff --git a/peer/peer.go b/peer/peer.go index 1d270450d..870170c4b 100644 --- a/peer/peer.go +++ b/peer/peer.go @@ -16,7 +16,7 @@ import ( // ID is a byte slice representing the identity of a peer. type ID mh.Multihash -// Utililty function for comparing two peer ID's +// Equal is utililty function for comparing two peer ID's func (id ID) Equal(other ID) bool { return bytes.Equal(id, other) } diff --git a/routing/dht/dht_test.go b/routing/dht/dht_test.go index 9e14987d8..3b8f7f7d1 100644 --- a/routing/dht/dht_test.go +++ b/routing/dht/dht_test.go @@ -35,7 +35,7 @@ func setupDHTS(n int, t *testing.T) ([]*ma.Multiaddr, []*peer.Peer, []*IpfsDHT) } p.PubKey = pk p.PrivKey = sk - id, err := identify.IdFromPubKey(pk) + id, err := identify.IDFromPubKey(pk) if err != nil { panic(err) } @@ -67,7 +67,7 @@ func makePeer(addr *ma.Multiaddr) *peer.Peer { } p.PrivKey = sk p.PubKey = pk - id, err := identify.IdFromPubKey(pk) + id, err := identify.IDFromPubKey(pk) if err != nil { panic(err) } diff --git a/swarm/conn.go b/swarm/conn.go index 1f2df1658..468e86cd2 100644 --- a/swarm/conn.go +++ b/swarm/conn.go @@ -25,8 +25,8 @@ type Conn struct { Closed chan bool Outgoing *msgio.Chan Incoming *msgio.Chan - secIn chan []byte - secOut chan []byte + secIn <-chan []byte + secOut chan<- []byte } // ConnMap maps Keys (Peer.IDs) to Connections.