mirror of
https://github.com/ipfs/kubo.git
synced 2025-06-03 02:56:09 +08:00
159 lines
4.6 KiB
Go
159 lines
4.6 KiB
Go
package supernode
|
|
|
|
import (
|
|
"bytes"
|
|
"errors"
|
|
"time"
|
|
|
|
key "gx/ipfs/Qmce4Y4zg3sYr7xKM5UueS67vhNni6EeWgCRnb7MbLJMew/go-key"
|
|
|
|
routing "github.com/ipfs/go-ipfs/routing"
|
|
pb "github.com/ipfs/go-ipfs/routing/dht/pb"
|
|
proxy "github.com/ipfs/go-ipfs/routing/supernode/proxy"
|
|
loggables "gx/ipfs/QmYrv4LgCC8FhG2Ab4bwuq5DqBdwMtx3hMb3KKJDZcr2d7/go-libp2p-loggables"
|
|
|
|
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
|
|
peer "gx/ipfs/QmWXjJo15p4pzT7cayEwZi2sWgJqLnGDof6ZGMh9xBgU1p/go-libp2p-peer"
|
|
proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto"
|
|
context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
|
|
"gx/ipfs/QmcpZpCmnfjRunzeYtXZdtcy16P2mC65CThjb7aA8sPqNY/go-libp2p/p2p/host"
|
|
pstore "gx/ipfs/QmdMfSLMDBDYhtc4oF3NYGCZr5dy4wQb6Ji26N4D4mdxa2/go-libp2p-peerstore"
|
|
)
|
|
|
|
var log = logging.Logger("supernode")
|
|
|
|
type Client struct {
|
|
peerhost host.Host
|
|
peerstore pstore.Peerstore
|
|
proxy proxy.Proxy
|
|
local peer.ID
|
|
}
|
|
|
|
// TODO take in datastore/cache
|
|
func NewClient(px proxy.Proxy, h host.Host, ps pstore.Peerstore, local peer.ID) (*Client, error) {
|
|
return &Client{
|
|
proxy: px,
|
|
local: local,
|
|
peerstore: ps,
|
|
peerhost: h,
|
|
}, nil
|
|
}
|
|
|
|
func (c *Client) FindProvidersAsync(ctx context.Context, k key.Key, max int) <-chan pstore.PeerInfo {
|
|
logging.ContextWithLoggable(ctx, loggables.Uuid("findProviders"))
|
|
defer log.EventBegin(ctx, "findProviders", &k).Done()
|
|
ch := make(chan pstore.PeerInfo)
|
|
go func() {
|
|
defer close(ch)
|
|
request := pb.NewMessage(pb.Message_GET_PROVIDERS, string(k), 0)
|
|
response, err := c.proxy.SendRequest(ctx, request)
|
|
if err != nil {
|
|
log.Debug(err)
|
|
return
|
|
}
|
|
for _, p := range pb.PBPeersToPeerInfos(response.GetProviderPeers()) {
|
|
select {
|
|
case <-ctx.Done():
|
|
log.Debug(ctx.Err())
|
|
return
|
|
case ch <- p:
|
|
}
|
|
}
|
|
}()
|
|
return ch
|
|
}
|
|
|
|
func (c *Client) PutValue(ctx context.Context, k key.Key, v []byte) error {
|
|
defer log.EventBegin(ctx, "putValue", &k).Done()
|
|
r, err := makeRecord(c.peerstore, c.local, k, v)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
pmes := pb.NewMessage(pb.Message_PUT_VALUE, string(k), 0)
|
|
pmes.Record = r
|
|
return c.proxy.SendMessage(ctx, pmes) // wrap to hide the remote
|
|
}
|
|
|
|
func (c *Client) GetValue(ctx context.Context, k key.Key) ([]byte, error) {
|
|
defer log.EventBegin(ctx, "getValue", &k).Done()
|
|
msg := pb.NewMessage(pb.Message_GET_VALUE, string(k), 0)
|
|
response, err := c.proxy.SendRequest(ctx, msg) // TODO wrap to hide the remote
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return response.Record.GetValue(), nil
|
|
}
|
|
|
|
func (c *Client) GetValues(ctx context.Context, k key.Key, _ int) ([]routing.RecvdVal, error) {
|
|
defer log.EventBegin(ctx, "getValue", &k).Done()
|
|
msg := pb.NewMessage(pb.Message_GET_VALUE, string(k), 0)
|
|
response, err := c.proxy.SendRequest(ctx, msg) // TODO wrap to hide the remote
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return []routing.RecvdVal{
|
|
{
|
|
Val: response.Record.GetValue(),
|
|
From: c.local,
|
|
},
|
|
}, nil
|
|
}
|
|
|
|
func (c *Client) Provide(ctx context.Context, k key.Key) error {
|
|
defer log.EventBegin(ctx, "provide", &k).Done()
|
|
msg := pb.NewMessage(pb.Message_ADD_PROVIDER, string(k), 0)
|
|
// FIXME how is connectedness defined for the local node
|
|
pri := []pb.PeerRoutingInfo{
|
|
{
|
|
PeerInfo: pstore.PeerInfo{
|
|
ID: c.local,
|
|
Addrs: c.peerhost.Addrs(),
|
|
},
|
|
},
|
|
}
|
|
msg.ProviderPeers = pb.PeerRoutingInfosToPBPeers(pri)
|
|
return c.proxy.SendMessage(ctx, msg) // TODO wrap to hide remote
|
|
}
|
|
|
|
func (c *Client) FindPeer(ctx context.Context, id peer.ID) (pstore.PeerInfo, error) {
|
|
defer log.EventBegin(ctx, "findPeer", id).Done()
|
|
request := pb.NewMessage(pb.Message_FIND_NODE, string(id), 0)
|
|
response, err := c.proxy.SendRequest(ctx, request) // hide remote
|
|
if err != nil {
|
|
return pstore.PeerInfo{}, err
|
|
}
|
|
for _, p := range pb.PBPeersToPeerInfos(response.GetCloserPeers()) {
|
|
if p.ID == id {
|
|
return p, nil
|
|
}
|
|
}
|
|
return pstore.PeerInfo{}, errors.New("could not find peer")
|
|
}
|
|
|
|
// creates and signs a record for the given key/value pair
|
|
func makeRecord(ps pstore.Peerstore, p peer.ID, k key.Key, v []byte) (*pb.Record, error) {
|
|
blob := bytes.Join([][]byte{[]byte(k), v, []byte(p)}, []byte{})
|
|
sig, err := ps.PrivKey(p).Sign(blob)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &pb.Record{
|
|
Key: proto.String(string(k)),
|
|
Value: v,
|
|
Author: proto.String(string(p)),
|
|
Signature: sig,
|
|
}, nil
|
|
}
|
|
|
|
func (c *Client) Ping(ctx context.Context, id peer.ID) (time.Duration, error) {
|
|
defer log.EventBegin(ctx, "ping", id).Done()
|
|
return time.Nanosecond, errors.New("supernode routing does not support the ping method")
|
|
}
|
|
|
|
func (c *Client) Bootstrap(ctx context.Context) error {
|
|
return c.proxy.Bootstrap(ctx)
|
|
}
|
|
|
|
var _ routing.IpfsRouting = &Client{}
|