1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-06-25 23:21:54 +08:00

remove supernode routing

It was never fully implemented and isn't used.

fixes #3950

(not removing routing/mock because that *is* in use).

License: MIT
Signed-off-by: Steven Allen <steven@stebalien.com>
This commit is contained in:
Steven Allen
2017-10-12 19:39:29 -07:00
parent e7acb96c65
commit 93f3117d7d
10 changed files with 2 additions and 1133 deletions

View File

@ -16,12 +16,10 @@ import (
commands "github.com/ipfs/go-ipfs/core/commands"
corehttp "github.com/ipfs/go-ipfs/core/corehttp"
corerepo "github.com/ipfs/go-ipfs/core/corerepo"
"github.com/ipfs/go-ipfs/core/corerouting"
nodeMount "github.com/ipfs/go-ipfs/fuse/node"
fsrepo "github.com/ipfs/go-ipfs/repo/fsrepo"
migrate "github.com/ipfs/go-ipfs/repo/fsrepo/migrations"
pstore "gx/ipfs/QmPgDWmTmuzvP7QE5zwo1TmjbJme9pmZHNujB2453jkCTr/go-libp2p-peerstore"
mprome "gx/ipfs/QmSk46nSD78YiuNojYMS8NW6hSCjH95JajqqzzoychZgef/go-metrics-prometheus"
"gx/ipfs/QmX3QZ5jHEPidwUrymXV1iSCSUhdGxj15sm2gP4jKMef7B/client_golang/prometheus"
"gx/ipfs/QmX3U3YXCQ6UYBxq2LVWF8dARS1hPUTEYLrSx654Qyxyw6/go-multiaddr-net"
@ -304,21 +302,8 @@ func daemonFunc(req cmds.Request, res cmds.Response) {
}
switch routingOption {
case routingOptionSupernodeKwd:
servers, err := cfg.SupernodeRouting.ServerIPFSAddrs()
if err != nil {
res.SetError(err, cmds.ErrNormal)
repo.Close() // because ownership hasn't been transferred to the node
return
}
var infos []pstore.PeerInfo
for _, addr := range servers {
infos = append(infos, pstore.PeerInfo{
ID: addr.ID(),
Addrs: []ma.Multiaddr{addr.Transport()},
})
}
ncfg.Routing = corerouting.SupernodeClient(infos...)
res.SetError(errors.New("supernode routing was never fully implemented and has been removed"), cmds.ErrNormal)
return
case routingOptionDHTClientKwd:
ncfg.Routing = core.DHTClientOption
case routingOptionDHTKwd:

View File

@ -1,52 +0,0 @@
package corerouting
import (
"errors"
context "context"
core "github.com/ipfs/go-ipfs/core"
repo "github.com/ipfs/go-ipfs/repo"
supernode "github.com/ipfs/go-ipfs/routing/supernode"
gcproxy "github.com/ipfs/go-ipfs/routing/supernode/proxy"
routing "gx/ipfs/QmPR2JzfKd9poHx9XBhzoFeBBC31ZM3W5iUPKJZWyaoZZm/go-libp2p-routing"
pstore "gx/ipfs/QmPgDWmTmuzvP7QE5zwo1TmjbJme9pmZHNujB2453jkCTr/go-libp2p-peerstore"
ds "gx/ipfs/QmVSase1JP7cq9QkPT46oNwdp9pT6kBkG3oqS14y3QcZjG/go-datastore"
"gx/ipfs/QmaSxYRuMq4pkpBBG2CYaRrPx2z7NmMVEs34b9g61biQA6/go-libp2p-host"
)
// NB: DHT option is included in the core to avoid 1) because it's a sane
// default and 2) to avoid a circular dependency (it needs to be referenced in
// the core if it's going to be the default)
var errServersMissing = errors.New("supernode routing client requires at least 1 server peer")
// SupernodeServer returns a configuration for a routing server that stores
// routing records to the provided datastore. Only routing records are store in
// the datastore.
func SupernodeServer(recordSource ds.Datastore) core.RoutingOption {
return func(ctx context.Context, ph host.Host, dstore repo.Datastore) (routing.IpfsRouting, error) {
server, err := supernode.NewServer(recordSource, ph.Peerstore(), ph.ID())
if err != nil {
return nil, err
}
proxy := &gcproxy.Loopback{
Handler: server,
Local: ph.ID(),
}
ph.SetStreamHandler(gcproxy.ProtocolSNR, proxy.HandleStream)
return supernode.NewClient(proxy, ph, ph.Peerstore(), ph.ID())
}
}
// TODO doc
func SupernodeClient(remotes ...pstore.PeerInfo) core.RoutingOption {
return func(ctx context.Context, ph host.Host, dstore repo.Datastore) (routing.IpfsRouting, error) {
if len(remotes) < 1 {
return nil, errServersMissing
}
proxy := gcproxy.Standard(ph, remotes)
ph.SetStreamHandler(gcproxy.ProtocolSNR, proxy.HandleStream)
return supernode.NewClient(proxy, ph, ph.Peerstore(), ph.ID())
}
}

View File

@ -1,164 +0,0 @@
package supernode
import (
"bytes"
"context"
"errors"
"time"
proxy "github.com/ipfs/go-ipfs/routing/supernode/proxy"
cid "gx/ipfs/QmNp85zy9RLrQ5oQD4hPyS39ezrrXpcaa7R4Y9kxdWQLLQ/go-cid"
routing "gx/ipfs/QmPR2JzfKd9poHx9XBhzoFeBBC31ZM3W5iUPKJZWyaoZZm/go-libp2p-routing"
pstore "gx/ipfs/QmPgDWmTmuzvP7QE5zwo1TmjbJme9pmZHNujB2453jkCTr/go-libp2p-peerstore"
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
loggables "gx/ipfs/QmT4PgCNdv73hnFAqzHqwW44q7M9PWpykSswHDxndquZbc/go-libp2p-loggables"
dhtpb "gx/ipfs/QmT7PnPxYkeKPCG8pAnucfcjrXc15Q7FgvFv7YC24EPrw8/go-libp2p-kad-dht/pb"
peer "gx/ipfs/QmXYjuNuxVzXKJCfWasQk1RqkhVLDM9jtUKhqc2WPQmFSB/go-libp2p-peer"
proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto"
"gx/ipfs/QmaSxYRuMq4pkpBBG2CYaRrPx2z7NmMVEs34b9g61biQA6/go-libp2p-host"
pb "gx/ipfs/QmbxkgUceEcuSZ4ZdBA3x74VUDSSYjHYmmeEqkjxbtZ6Jg/go-libp2p-record/pb"
)
var log = logging.Logger("supernode")
type Client struct {
peerhost host.Host
peerstore pstore.Peerstore
proxy proxy.Proxy
local peer.ID
}
// TODO take in datastore/cache
func NewClient(px proxy.Proxy, h host.Host, ps pstore.Peerstore, local peer.ID) (*Client, error) {
return &Client{
proxy: px,
local: local,
peerstore: ps,
peerhost: h,
}, nil
}
func (c *Client) FindProvidersAsync(ctx context.Context, k *cid.Cid, max int) <-chan pstore.PeerInfo {
logging.ContextWithLoggable(ctx, loggables.Uuid("findProviders"))
defer log.EventBegin(ctx, "findProviders", k).Done()
ch := make(chan pstore.PeerInfo)
go func() {
defer close(ch)
request := dhtpb.NewMessage(dhtpb.Message_GET_PROVIDERS, k.KeyString(), 0)
response, err := c.proxy.SendRequest(ctx, request)
if err != nil {
log.Debug(err)
return
}
for _, p := range dhtpb.PBPeersToPeerInfos(response.GetProviderPeers()) {
select {
case <-ctx.Done():
log.Debug(ctx.Err())
return
case ch <- *p:
}
}
}()
return ch
}
func (c *Client) PutValue(ctx context.Context, k string, v []byte) error {
defer log.EventBegin(ctx, "putValue").Done()
r, err := makeRecord(c.peerstore, c.local, k, v)
if err != nil {
return err
}
pmes := dhtpb.NewMessage(dhtpb.Message_PUT_VALUE, string(k), 0)
pmes.Record = r
return c.proxy.SendMessage(ctx, pmes) // wrap to hide the remote
}
func (c *Client) GetValue(ctx context.Context, k string) ([]byte, error) {
defer log.EventBegin(ctx, "getValue").Done()
msg := dhtpb.NewMessage(dhtpb.Message_GET_VALUE, string(k), 0)
response, err := c.proxy.SendRequest(ctx, msg) // TODO wrap to hide the remote
if err != nil {
return nil, err
}
return response.Record.GetValue(), nil
}
func (c *Client) GetValues(ctx context.Context, k string, _ int) ([]routing.RecvdVal, error) {
defer log.EventBegin(ctx, "getValue").Done()
msg := dhtpb.NewMessage(dhtpb.Message_GET_VALUE, string(k), 0)
response, err := c.proxy.SendRequest(ctx, msg) // TODO wrap to hide the remote
if err != nil {
return nil, err
}
return []routing.RecvdVal{
{
Val: response.Record.GetValue(),
From: c.local,
},
}, nil
}
// Provide adds the given key 'k' to the content routing system. If 'brd' is
// true, it announces that content to the network. For the supernode client,
// setting 'brd' to false makes this call a no-op
func (c *Client) Provide(ctx context.Context, k *cid.Cid, brd bool) error {
if !brd {
return nil
}
defer log.EventBegin(ctx, "provide", k).Done()
msg := dhtpb.NewMessage(dhtpb.Message_ADD_PROVIDER, k.KeyString(), 0)
// FIXME how is connectedness defined for the local node
pri := []dhtpb.PeerRoutingInfo{
{
PeerInfo: pstore.PeerInfo{
ID: c.local,
Addrs: c.peerhost.Addrs(),
},
},
}
msg.ProviderPeers = dhtpb.PeerRoutingInfosToPBPeers(pri)
return c.proxy.SendMessage(ctx, msg) // TODO wrap to hide remote
}
func (c *Client) FindPeer(ctx context.Context, id peer.ID) (pstore.PeerInfo, error) {
defer log.EventBegin(ctx, "findPeer", id).Done()
request := dhtpb.NewMessage(dhtpb.Message_FIND_NODE, string(id), 0)
response, err := c.proxy.SendRequest(ctx, request) // hide remote
if err != nil {
return pstore.PeerInfo{}, err
}
for _, p := range dhtpb.PBPeersToPeerInfos(response.GetCloserPeers()) {
if p.ID == id {
return *p, nil
}
}
return pstore.PeerInfo{}, errors.New("could not find peer")
}
// creates and signs a record for the given key/value pair
func makeRecord(ps pstore.Peerstore, p peer.ID, k string, v []byte) (*pb.Record, error) {
blob := bytes.Join([][]byte{[]byte(k), v, []byte(p)}, []byte{})
sig, err := ps.PrivKey(p).Sign(blob)
if err != nil {
return nil, err
}
return &pb.Record{
Key: proto.String(string(k)),
Value: v,
Author: proto.String(string(p)),
Signature: sig,
}, nil
}
func (c *Client) Ping(ctx context.Context, id peer.ID) (time.Duration, error) {
defer log.EventBegin(ctx, "ping", id).Done()
return time.Nanosecond, errors.New("supernode routing does not support the ping method")
}
func (c *Client) Bootstrap(ctx context.Context) error {
return c.proxy.Bootstrap(ctx)
}
var _ routing.IpfsRouting = &Client{}

View File

@ -1,59 +0,0 @@
package proxy
import (
context "context"
inet "gx/ipfs/QmNa31VPzC561NWwRsJLE7nGYZYuuD2QfpK2b1q9BK54J1/go-libp2p-net"
dhtpb "gx/ipfs/QmT7PnPxYkeKPCG8pAnucfcjrXc15Q7FgvFv7YC24EPrw8/go-libp2p-kad-dht/pb"
peer "gx/ipfs/QmXYjuNuxVzXKJCfWasQk1RqkhVLDM9jtUKhqc2WPQmFSB/go-libp2p-peer"
ggio "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/io"
)
// RequestHandler handles routing requests locally
type RequestHandler interface {
HandleRequest(ctx context.Context, p peer.ID, m *dhtpb.Message) *dhtpb.Message
}
// Loopback forwards requests to a local handler
type Loopback struct {
Handler RequestHandler
Local peer.ID
}
func (_ *Loopback) Bootstrap(ctx context.Context) error {
return nil
}
// SendMessage intercepts local requests, forwarding them to a local handler
func (lb *Loopback) SendMessage(ctx context.Context, m *dhtpb.Message) error {
response := lb.Handler.HandleRequest(ctx, lb.Local, m)
if response != nil {
log.Warning("loopback handler returned unexpected message")
}
return nil
}
// SendRequest intercepts local requests, forwarding them to a local handler
func (lb *Loopback) SendRequest(ctx context.Context, m *dhtpb.Message) (*dhtpb.Message, error) {
return lb.Handler.HandleRequest(ctx, lb.Local, m), nil
}
func (lb *Loopback) HandleStream(s inet.Stream) {
defer s.Close()
pbr := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
var incoming dhtpb.Message
if err := pbr.ReadMsg(&incoming); err != nil {
s.Reset()
log.Debug(err)
return
}
ctx := context.TODO()
outgoing := lb.Handler.HandleRequest(ctx, s.Conn().RemotePeer(), &incoming)
pbw := ggio.NewDelimitedWriter(s)
if err := pbw.WriteMsg(outgoing); err != nil {
s.Reset()
log.Debug(err)
return
}
}

View File

@ -1,174 +0,0 @@
package proxy
import (
"context"
"errors"
inet "gx/ipfs/QmNa31VPzC561NWwRsJLE7nGYZYuuD2QfpK2b1q9BK54J1/go-libp2p-net"
pstore "gx/ipfs/QmPgDWmTmuzvP7QE5zwo1TmjbJme9pmZHNujB2453jkCTr/go-libp2p-peerstore"
kbucket "gx/ipfs/QmSAFA8v42u4gpJNy1tb7vW3JiiXiaYDC2b845c2RnNSJL/go-libp2p-kbucket"
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
loggables "gx/ipfs/QmT4PgCNdv73hnFAqzHqwW44q7M9PWpykSswHDxndquZbc/go-libp2p-loggables"
dhtpb "gx/ipfs/QmT7PnPxYkeKPCG8pAnucfcjrXc15Q7FgvFv7YC24EPrw8/go-libp2p-kad-dht/pb"
peer "gx/ipfs/QmXYjuNuxVzXKJCfWasQk1RqkhVLDM9jtUKhqc2WPQmFSB/go-libp2p-peer"
ggio "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/io"
host "gx/ipfs/QmaSxYRuMq4pkpBBG2CYaRrPx2z7NmMVEs34b9g61biQA6/go-libp2p-host"
)
const ProtocolSNR = "/ipfs/supernoderouting"
var log = logging.Logger("supernode/proxy")
type Proxy interface {
Bootstrap(context.Context) error
HandleStream(inet.Stream)
SendMessage(ctx context.Context, m *dhtpb.Message) error
SendRequest(ctx context.Context, m *dhtpb.Message) (*dhtpb.Message, error)
}
type standard struct {
Host host.Host
remoteInfos []pstore.PeerInfo // addr required for bootstrapping
remoteIDs []peer.ID // []ID is required for each req. here, cached for performance.
}
func Standard(h host.Host, remotes []pstore.PeerInfo) Proxy {
var ids []peer.ID
for _, remote := range remotes {
ids = append(ids, remote.ID)
}
return &standard{h, remotes, ids}
}
func (px *standard) Bootstrap(ctx context.Context) error {
var cxns []pstore.PeerInfo
for _, info := range px.remoteInfos {
if err := px.Host.Connect(ctx, info); err != nil {
continue
}
cxns = append(cxns, info)
}
if len(cxns) == 0 {
log.Error("unable to bootstrap to any supernode routers")
} else {
log.Infof("bootstrapped to %d supernode routers: %s", len(cxns), cxns)
}
return nil
}
func (p *standard) HandleStream(s inet.Stream) {
// TODO(brian): Should clients be able to satisfy requests?
log.Error("supernode client received (dropped) a routing message from", s.Conn().RemotePeer())
s.Reset()
}
const replicationFactor = 2
// SendMessage sends message to each remote sequentially (randomized order),
// stopping after the first successful response. If all fail, returns the last
// error.
func (px *standard) SendMessage(ctx context.Context, m *dhtpb.Message) error {
var err error
var numSuccesses int
for _, remote := range sortedByKey(px.remoteIDs, m.GetKey()) {
if err = px.sendMessage(ctx, m, remote); err != nil { // careful don't re-declare err!
continue
}
numSuccesses++
switch m.GetType() {
case dhtpb.Message_ADD_PROVIDER, dhtpb.Message_PUT_VALUE:
if numSuccesses < replicationFactor {
continue
}
}
return nil // success
}
return err // NB: returns the last error
}
func (px *standard) sendMessage(ctx context.Context, m *dhtpb.Message, remote peer.ID) (err error) {
e := log.EventBegin(ctx, "sendRoutingMessage", px.Host.ID(), remote, m)
defer func() {
if err != nil {
e.SetError(err)
}
e.Done()
}()
if err = px.Host.Connect(ctx, pstore.PeerInfo{ID: remote}); err != nil {
return err
}
s, err := px.Host.NewStream(ctx, remote, ProtocolSNR)
if err != nil {
return err
}
pbw := ggio.NewDelimitedWriter(s)
err = pbw.WriteMsg(m)
if err == nil {
s.Close()
} else {
s.Reset()
}
return err
}
// SendRequest sends the request to each remote sequentially (randomized order),
// stopping after the first successful response. If all fail, returns the last
// error.
func (px *standard) SendRequest(ctx context.Context, m *dhtpb.Message) (*dhtpb.Message, error) {
var err error
for _, remote := range sortedByKey(px.remoteIDs, m.GetKey()) {
var reply *dhtpb.Message
reply, err = px.sendRequest(ctx, m, remote) // careful don't redeclare err!
if err != nil {
continue
}
return reply, nil // success
}
return nil, err // NB: returns the last error
}
func (px *standard) sendRequest(ctx context.Context, m *dhtpb.Message, remote peer.ID) (*dhtpb.Message, error) {
e := log.EventBegin(ctx, "sendRoutingRequest", px.Host.ID(), remote, logging.Pair("request", m))
defer e.Done()
if err := px.Host.Connect(ctx, pstore.PeerInfo{ID: remote}); err != nil {
e.SetError(err)
return nil, err
}
s, err := px.Host.NewStream(ctx, remote, ProtocolSNR)
if err != nil {
e.SetError(err)
return nil, err
}
defer s.Close()
r := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
w := ggio.NewDelimitedWriter(s)
if err = w.WriteMsg(m); err != nil {
s.Reset()
e.SetError(err)
return nil, err
}
response := &dhtpb.Message{}
if err = r.ReadMsg(response); err != nil {
s.Reset()
e.SetError(err)
return nil, err
}
// need ctx expiration?
if response == nil {
s.Reset()
err := errors.New("no response to request")
e.SetError(err)
return nil, err
}
e.Append(logging.Pair("response", response))
e.Append(logging.Pair("uuid", loggables.Uuid("foo")))
return response, nil
}
func sortedByKey(peers []peer.ID, skey string) []peer.ID {
target := kbucket.ConvertKey(skey)
return kbucket.SortClosestPeers(peers, target)
}

View File

@ -1,202 +0,0 @@
package supernode
import (
"context"
"errors"
"fmt"
proxy "github.com/ipfs/go-ipfs/routing/supernode/proxy"
dshelp "github.com/ipfs/go-ipfs/thirdparty/ds-help"
pstore "gx/ipfs/QmPgDWmTmuzvP7QE5zwo1TmjbJme9pmZHNujB2453jkCTr/go-libp2p-peerstore"
dhtpb "gx/ipfs/QmT7PnPxYkeKPCG8pAnucfcjrXc15Q7FgvFv7YC24EPrw8/go-libp2p-kad-dht/pb"
datastore "gx/ipfs/QmVSase1JP7cq9QkPT46oNwdp9pT6kBkG3oqS14y3QcZjG/go-datastore"
peer "gx/ipfs/QmXYjuNuxVzXKJCfWasQk1RqkhVLDM9jtUKhqc2WPQmFSB/go-libp2p-peer"
proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto"
pb "gx/ipfs/QmbxkgUceEcuSZ4ZdBA3x74VUDSSYjHYmmeEqkjxbtZ6Jg/go-libp2p-record/pb"
)
// Server handles routing queries using a database backend
type Server struct {
local peer.ID
routingBackend datastore.Datastore
peerstore pstore.Peerstore
*proxy.Loopback // so server can be injected into client
}
// NewServer creates a new Supernode routing Server
func NewServer(ds datastore.Datastore, ps pstore.Peerstore, local peer.ID) (*Server, error) {
s := &Server{local, ds, ps, nil}
s.Loopback = &proxy.Loopback{
Handler: s,
Local: local,
}
return s, nil
}
func (_ *Server) Bootstrap(ctx context.Context) error {
return nil
}
// HandleLocalRequest implements the proxy.RequestHandler interface. This is
// where requests are received from the outside world.
func (s *Server) HandleRequest(ctx context.Context, p peer.ID, req *dhtpb.Message) *dhtpb.Message {
_, response := s.handleMessage(ctx, p, req) // ignore response peer. it's local.
return response
}
func (s *Server) handleMessage(
ctx context.Context, p peer.ID, req *dhtpb.Message) (peer.ID, *dhtpb.Message) {
defer log.EventBegin(ctx, "routingMessageReceived", req, p).Done()
var response = dhtpb.NewMessage(req.GetType(), req.GetKey(), req.GetClusterLevel())
switch req.GetType() {
case dhtpb.Message_GET_VALUE:
rawRecord, err := getRoutingRecord(s.routingBackend, req.GetKey())
if err != nil {
return "", nil
}
response.Record = rawRecord
return p, response
case dhtpb.Message_PUT_VALUE:
// FIXME: verify complains that the peer's ID is not present in the
// peerstore. Mocknet problem?
// if err := verify(s.peerstore, req.GetRecord()); err != nil {
// log.Event(ctx, "validationFailed", req, p)
// return "", nil
// }
putRoutingRecord(s.routingBackend, req.GetKey(), req.GetRecord())
return p, req
case dhtpb.Message_FIND_NODE:
p := s.peerstore.PeerInfo(peer.ID(req.GetKey()))
pri := []dhtpb.PeerRoutingInfo{
{
PeerInfo: p,
// Connectedness: TODO
},
}
response.CloserPeers = dhtpb.PeerRoutingInfosToPBPeers(pri)
return p.ID, response
case dhtpb.Message_ADD_PROVIDER:
for _, provider := range req.GetProviderPeers() {
providerID := peer.ID(provider.GetId())
if providerID == p {
store := []*dhtpb.Message_Peer{provider}
storeProvidersToPeerstore(s.peerstore, p, store)
if err := putRoutingProviders(s.routingBackend, req.GetKey(), store); err != nil {
return "", nil
}
} else {
log.Event(ctx, "addProviderBadRequest", p, req)
}
}
return "", nil
case dhtpb.Message_GET_PROVIDERS:
providers, err := getRoutingProviders(s.routingBackend, req.GetKey())
if err != nil {
return "", nil
}
response.ProviderPeers = providers
return p, response
case dhtpb.Message_PING:
return p, req
default:
}
return "", nil
}
var _ proxy.RequestHandler = &Server{}
var _ proxy.Proxy = &Server{}
func getRoutingRecord(ds datastore.Datastore, k string) (*pb.Record, error) {
dskey := dshelp.NewKeyFromBinary([]byte(k))
val, err := ds.Get(dskey)
if err != nil {
return nil, err
}
recordBytes, ok := val.([]byte)
if !ok {
return nil, fmt.Errorf("datastore had non byte-slice value for %v", dskey)
}
var record pb.Record
if err := proto.Unmarshal(recordBytes, &record); err != nil {
return nil, errors.New("failed to unmarshal dht record from datastore")
}
return &record, nil
}
func putRoutingRecord(ds datastore.Datastore, k string, value *pb.Record) error {
data, err := proto.Marshal(value)
if err != nil {
return err
}
dskey := dshelp.NewKeyFromBinary([]byte(k))
// TODO namespace
return ds.Put(dskey, data)
}
func putRoutingProviders(ds datastore.Datastore, k string, newRecords []*dhtpb.Message_Peer) error {
log.Event(context.Background(), "putRoutingProviders")
oldRecords, err := getRoutingProviders(ds, k)
if err != nil {
return err
}
mergedRecords := make(map[string]*dhtpb.Message_Peer)
for _, provider := range oldRecords {
mergedRecords[provider.GetId()] = provider // add original records
}
for _, provider := range newRecords {
mergedRecords[provider.GetId()] = provider // overwrite old record if new exists
}
var protomsg dhtpb.Message
protomsg.ProviderPeers = make([]*dhtpb.Message_Peer, 0, len(mergedRecords))
for _, provider := range mergedRecords {
protomsg.ProviderPeers = append(protomsg.ProviderPeers, provider)
}
data, err := proto.Marshal(&protomsg)
if err != nil {
return err
}
return ds.Put(providerKey(k), data)
}
func storeProvidersToPeerstore(ps pstore.Peerstore, p peer.ID, providers []*dhtpb.Message_Peer) {
for _, provider := range providers {
providerID := peer.ID(provider.GetId())
if providerID != p {
log.Errorf("provider message came from third-party %s", p)
continue
}
for _, maddr := range provider.Addresses() {
// as a router, we want to store addresses for peers who have provided
ps.AddAddr(p, maddr, pstore.AddressTTL)
}
}
}
func getRoutingProviders(ds datastore.Datastore, k string) ([]*dhtpb.Message_Peer, error) {
e := log.EventBegin(context.Background(), "getProviders")
defer e.Done()
var providers []*dhtpb.Message_Peer
if v, err := ds.Get(providerKey(k)); err == nil {
if data, ok := v.([]byte); ok {
var msg dhtpb.Message
if err := proto.Unmarshal(data, &msg); err != nil {
return nil, err
}
providers = append(providers, msg.GetProviderPeers()...)
}
}
return providers, nil
}
func providerKey(k string) datastore.Key {
return datastore.KeyWithNamespaces([]string{"routing", "providers", k})
}

View File

@ -1,39 +0,0 @@
package supernode
import (
"testing"
dhtpb "gx/ipfs/QmT7PnPxYkeKPCG8pAnucfcjrXc15Q7FgvFv7YC24EPrw8/go-libp2p-kad-dht/pb"
datastore "gx/ipfs/QmVSase1JP7cq9QkPT46oNwdp9pT6kBkG3oqS14y3QcZjG/go-datastore"
)
func TestPutProviderDoesntResultInDuplicates(t *testing.T) {
routingBackend := datastore.NewMapDatastore()
k := "foo"
put := []*dhtpb.Message_Peer{
convPeer("bob", "127.0.0.1/tcp/4001"),
convPeer("alice", "10.0.0.10/tcp/4001"),
}
if err := putRoutingProviders(routingBackend, k, put); err != nil {
t.Fatal(err)
}
if err := putRoutingProviders(routingBackend, k, put); err != nil {
t.Fatal(err)
}
got, err := getRoutingProviders(routingBackend, k)
if err != nil {
t.Fatal(err)
}
if len(got) != 2 {
t.Fatal("should be 2 values, but there are", len(got))
}
}
func convPeer(name string, addrs ...string) *dhtpb.Message_Peer {
var rawAddrs [][]byte
for _, addr := range addrs {
rawAddrs = append(rawAddrs, []byte(addr))
}
return &dhtpb.Message_Peer{Id: &name, Addrs: rawAddrs}
}

View File

@ -1,180 +0,0 @@
package integrationtest
import (
"bytes"
"errors"
"fmt"
"io"
"math"
"testing"
context "context"
core "github.com/ipfs/go-ipfs/core"
"github.com/ipfs/go-ipfs/core/corerouting"
"github.com/ipfs/go-ipfs/core/coreunix"
mock "github.com/ipfs/go-ipfs/core/mock"
ds2 "github.com/ipfs/go-ipfs/thirdparty/datastore2"
"github.com/ipfs/go-ipfs/thirdparty/unit"
pstore "gx/ipfs/QmPgDWmTmuzvP7QE5zwo1TmjbJme9pmZHNujB2453jkCTr/go-libp2p-peerstore"
testutil "gx/ipfs/QmQgLZP9haZheimMHqqAjJh2LhRmNfEoZDfbtkpeMhi9xK/go-testutil"
mocknet "gx/ipfs/QmRQ76P5dgvxTujhfPsCRAG83rC15jgb1G9bKLuomuC6dQ/go-libp2p/p2p/net/mock"
)
func TestSupernodeBootstrappedAddCat(t *testing.T) {
// create 8 supernode-routing bootstrap nodes
// create 2 supernode-routing clients both bootstrapped to the bootstrap nodes
// let the bootstrap nodes share a single datastore
// add a large file on one node then cat the file from the other
conf := testutil.LatencyConfig{
NetworkLatency: 0,
RoutingLatency: 0,
BlockstoreLatency: 0,
}
if err := RunSupernodeBootstrappedAddCat(RandomBytes(100*unit.MB), conf); err != nil {
t.Fatal(err)
}
}
func RunSupernodeBootstrappedAddCat(data []byte, conf testutil.LatencyConfig) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
servers, clients, err := InitializeSupernodeNetwork(ctx, 8, 2, conf)
if err != nil {
return err
}
for _, n := range append(servers, clients...) {
defer n.Close()
}
adder := clients[0]
catter := clients[1]
log.Info("adder is", adder.Identity)
log.Info("catter is", catter.Identity)
keyAdded, err := coreunix.Add(adder, bytes.NewReader(data))
if err != nil {
return err
}
readerCatted, err := coreunix.Cat(ctx, catter, keyAdded)
if err != nil {
return err
}
// verify
bufout := new(bytes.Buffer)
io.Copy(bufout, readerCatted)
if 0 != bytes.Compare(bufout.Bytes(), data) {
return errors.New("catted data does not match added data")
}
cancel()
return nil
}
func InitializeSupernodeNetwork(
ctx context.Context,
numServers, numClients int,
conf testutil.LatencyConfig) ([]*core.IpfsNode, []*core.IpfsNode, error) {
// create network
mn := mocknet.New(ctx)
mn.SetLinkDefaults(mocknet.LinkOptions{
Latency: conf.NetworkLatency,
Bandwidth: math.MaxInt32,
})
routingDatastore := ds2.ThreadSafeCloserMapDatastore()
var servers []*core.IpfsNode
for i := 0; i < numServers; i++ {
bootstrap, err := core.NewNode(ctx, &core.BuildCfg{
Online: true,
Host: mock.MockHostOption(mn),
Routing: corerouting.SupernodeServer(routingDatastore),
})
if err != nil {
return nil, nil, err
}
servers = append(servers, bootstrap)
}
var bootstrapInfos []pstore.PeerInfo
for _, n := range servers {
info := n.Peerstore.PeerInfo(n.PeerHost.ID())
bootstrapInfos = append(bootstrapInfos, info)
}
var clients []*core.IpfsNode
for i := 0; i < numClients; i++ {
n, err := core.NewNode(ctx, &core.BuildCfg{
Online: true,
Host: mock.MockHostOption(mn),
Routing: corerouting.SupernodeClient(bootstrapInfos...),
})
if err != nil {
return nil, nil, err
}
clients = append(clients, n)
}
mn.LinkAll()
bcfg := core.BootstrapConfigWithPeers(bootstrapInfos)
for _, n := range clients {
if err := n.Bootstrap(bcfg); err != nil {
return nil, nil, err
}
}
return servers, clients, nil
}
func TestSupernodePutRecordGetRecord(t *testing.T) {
// create 8 supernode-routing bootstrap nodes
// create 2 supernode-routing clients both bootstrapped to the bootstrap nodes
// let the bootstrap nodes share a single datastore
// add a large file on one node then cat the file from the other
conf := testutil.LatencyConfig{
NetworkLatency: 0,
RoutingLatency: 0,
BlockstoreLatency: 0,
}
if err := RunSupernodePutRecordGetRecord(conf); err != nil {
t.Fatal(err)
}
}
func RunSupernodePutRecordGetRecord(conf testutil.LatencyConfig) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
servers, clients, err := InitializeSupernodeNetwork(ctx, 2, 2, conf)
if err != nil {
return err
}
for _, n := range append(servers, clients...) {
defer n.Close()
}
putter := clients[0]
getter := clients[1]
k := "key"
note := []byte("a note from putter")
if err := putter.Routing.PutValue(ctx, k, note); err != nil {
return fmt.Errorf("failed to put value: %s", err)
}
received, err := getter.Routing.GetValue(ctx, k)
if err != nil {
return fmt.Errorf("failed to get value: %s", err)
}
if 0 != bytes.Compare(note, received) {
return errors.New("record doesn't match")
}
cancel()
return nil
}

View File

@ -1 +0,0 @@
.ipfs/

View File

@ -1,245 +0,0 @@
package main
import (
"bytes"
"flag"
"fmt"
"io"
"io/ioutil"
"log"
"math"
"os"
gopath "path"
"time"
random "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-random"
commands "github.com/ipfs/go-ipfs/commands"
core "github.com/ipfs/go-ipfs/core"
corehttp "github.com/ipfs/go-ipfs/core/corehttp"
corerouting "github.com/ipfs/go-ipfs/core/corerouting"
"github.com/ipfs/go-ipfs/core/coreunix"
"github.com/ipfs/go-ipfs/repo"
config "github.com/ipfs/go-ipfs/repo/config"
fsrepo "github.com/ipfs/go-ipfs/repo/fsrepo"
ds2 "github.com/ipfs/go-ipfs/thirdparty/datastore2"
"github.com/ipfs/go-ipfs/thirdparty/ipfsaddr"
unit "github.com/ipfs/go-ipfs/thirdparty/unit"
context "context"
pstore "gx/ipfs/QmPgDWmTmuzvP7QE5zwo1TmjbJme9pmZHNujB2453jkCTr/go-libp2p-peerstore"
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
ma "gx/ipfs/QmXY77cVe7rVRQXZZQRioukUM7aRW3BTcAgJe12MCtb3Ji/go-multiaddr"
)
var elog = logging.Logger("gc-client")
var (
cat = flag.Bool("cat", false, "else add")
seed = flag.Int64("seed", 1, "")
nBitsForKeypair = flag.Int("b", 1024, "number of bits for keypair (if repo is uninitialized)")
)
func main() {
flag.Parse()
if err := run(); err != nil {
fmt.Fprintf(os.Stderr, "error: %s\n", err)
os.Exit(1)
}
}
func run() error {
servers := config.DefaultSNRServers
fmt.Println("using gcr remotes:")
for _, p := range servers {
fmt.Println("\t", p)
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cwd, err := os.Getwd()
if err != nil {
return err
}
repoPath := gopath.Join(cwd, config.DefaultPathName)
_ = ensureRepoInitialized(repoPath)
repo, err := fsrepo.Open(repoPath)
if err != nil { // owned by node
return err
}
cfg, err := repo.Config()
if err != nil {
return err
}
cfg.Bootstrap = servers
if err := repo.SetConfig(cfg); err != nil {
return err
}
var addrs []ipfsaddr.IPFSAddr
for _, info := range servers {
addr, err := ipfsaddr.ParseString(info)
if err != nil {
return err
}
addrs = append(addrs, addr)
}
var infos []pstore.PeerInfo
for _, addr := range addrs {
infos = append(infos, pstore.PeerInfo{
ID: addr.ID(),
Addrs: []ma.Multiaddr{addr.Transport()},
})
}
node, err := core.NewNode(ctx, &core.BuildCfg{
Online: true,
Repo: repo,
Routing: corerouting.SupernodeClient(infos...),
})
if err != nil {
return err
}
defer node.Close()
opts := []corehttp.ServeOption{
corehttp.CommandsOption(cmdCtx(node, repoPath)),
corehttp.GatewayOption(false),
}
if *cat {
if err := runFileCattingWorker(ctx, node); err != nil {
return err
}
} else {
if err := runFileAddingWorker(node); err != nil {
return err
}
}
return corehttp.ListenAndServe(node, cfg.Addresses.API, opts...)
}
func ensureRepoInitialized(path string) error {
if !fsrepo.IsInitialized(path) {
conf, err := config.Init(ioutil.Discard, *nBitsForKeypair)
if err != nil {
return err
}
if err := fsrepo.Init(path, conf); err != nil {
return err
}
}
return nil
}
func sizeOfIthFile(i int64) int64 {
return (1 << uint64(i)) * unit.KB
}
func runFileAddingWorker(n *core.IpfsNode) error {
errs := make(chan error)
go func() {
var i int64
for i = 1; i < math.MaxInt32; i++ {
piper, pipew := io.Pipe()
go func() {
defer pipew.Close()
if err := random.WritePseudoRandomBytes(sizeOfIthFile(i), pipew, *seed); err != nil {
errs <- err
}
}()
k, err := coreunix.Add(n, piper)
if err != nil {
errs <- err
}
log.Println("added file", "seed", *seed, "#", i, "key", k, "size", unit.Information(sizeOfIthFile(i)))
time.Sleep(1 * time.Second)
}
}()
var i int64
for i = 0; i < math.MaxInt32; i++ {
err := <-errs
if err != nil {
log.Fatal(err)
}
}
return nil
}
func runFileCattingWorker(ctx context.Context, n *core.IpfsNode) error {
conf, err := config.Init(ioutil.Discard, *nBitsForKeypair)
if err != nil {
return err
}
r := &repo.Mock{
D: ds2.ThreadSafeCloserMapDatastore(),
C: *conf,
}
dummy, err := core.NewNode(ctx, &core.BuildCfg{
Repo: r,
})
if err != nil {
return err
}
errs := make(chan error)
go func() {
defer dummy.Close()
var i int64 = 1
for {
buf := new(bytes.Buffer)
if err := random.WritePseudoRandomBytes(sizeOfIthFile(i), buf, *seed); err != nil {
errs <- err
}
// add to a dummy node to discover the key
k, err := coreunix.Add(dummy, bytes.NewReader(buf.Bytes()))
if err != nil {
errs <- err
}
e := elog.EventBegin(ctx, "cat", logging.LoggableF(func() map[string]interface{} {
return map[string]interface{}{
"key": k,
"localPeer": n.Identity,
}
}))
if r, err := coreunix.Cat(ctx, n, k); err != nil {
e.Done()
log.Printf("failed to cat file. seed: %d #%d key: %s err: %s", *seed, i, k, err)
} else {
log.Println("found file", "seed", *seed, "#", i, "key", k, "size", unit.Information(sizeOfIthFile(i)))
io.Copy(ioutil.Discard, r)
e.Done()
log.Println("catted file", "seed", *seed, "#", i, "key", k, "size", unit.Information(sizeOfIthFile(i)))
i++
}
time.Sleep(time.Second)
}
}()
err = <-errs
if err != nil {
log.Fatal(err)
}
return nil
}
func cmdCtx(node *core.IpfsNode, repoPath string) commands.Context {
return commands.Context{
Online: true,
ConfigRoot: repoPath,
LoadConfig: func(path string) (*config.Config, error) {
return node.Repo.Config()
},
ConstructNode: func() (*core.IpfsNode, error) {
return node, nil
},
}
}