1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-06-29 17:36:38 +08:00

wip with DHT

@whyrusleeping @jbenet this is a WIP with the DHT.

wip

License: MIT
Signed-off-by: Brian Tiger Chow <brian@perfmode.com>

Conflicts:
	epictest/addcat_test.go
	exchange/bitswap/testnet/peernet.go
	exchange/bitswap/testutils.go
	routing/mock/centralized_server.go
	routing/mock/centralized_test.go
	routing/mock/interface.go

fix(routing/mock) fill in function definition
This commit is contained in:
Brian Tiger Chow
2014-12-17 10:02:19 -08:00
parent 14990bb556
commit ca32a83394
15 changed files with 127 additions and 68 deletions

View File

@ -1,6 +1,7 @@
package core
import (
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
syncds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
"github.com/jbenet/go-ipfs/blocks/blockstore"
@ -11,12 +12,13 @@ import (
nsys "github.com/jbenet/go-ipfs/namesys"
path "github.com/jbenet/go-ipfs/path"
peer "github.com/jbenet/go-ipfs/peer"
mdht "github.com/jbenet/go-ipfs/routing/mock"
dht "github.com/jbenet/go-ipfs/routing/dht"
ds2 "github.com/jbenet/go-ipfs/util/datastore2"
)
// NewMockNode constructs an IpfsNode for use in tests.
func NewMockNode() (*IpfsNode, error) {
ctx := context.TODO()
nd := new(IpfsNode)
// Generate Identity
@ -41,8 +43,7 @@ func NewMockNode() (*IpfsNode, error) {
nd.Datastore = ds2.CloserWrap(syncds.MutexWrap(dstore))
// Routing
dht := mdht.NewServer().ClientWithDatastore(peer.PeerInfo{ID: p}, nd.Datastore)
nd.Routing = dht
nd.Routing = dht.NewDHT(ctx, nd.Identity, nd.Network, nd.Datastore)
// Bitswap
bstore := blockstore.NewBlockstore(nd.Datastore)
@ -54,7 +55,7 @@ func NewMockNode() (*IpfsNode, error) {
nd.DAG = mdag.NewDAGService(bserv)
// Namespace resolver
nd.Namesys = nsys.NewNameSystem(dht)
nd.Namesys = nsys.NewNameSystem(nd.Routing)
// Path resolver
nd.Resolver = &path.Resolver{DAG: nd.DAG}

View File

@ -4,6 +4,7 @@ import (
"bytes"
"fmt"
"io"
"math"
"os"
"testing"
"time"
@ -16,12 +17,12 @@ import (
importer "github.com/jbenet/go-ipfs/importer"
chunk "github.com/jbenet/go-ipfs/importer/chunk"
merkledag "github.com/jbenet/go-ipfs/merkledag"
mocknet "github.com/jbenet/go-ipfs/net/mock"
path "github.com/jbenet/go-ipfs/path"
mockrouting "github.com/jbenet/go-ipfs/routing/mock"
uio "github.com/jbenet/go-ipfs/unixfs/io"
util "github.com/jbenet/go-ipfs/util"
errors "github.com/jbenet/go-ipfs/util/debugerror"
delay "github.com/jbenet/go-ipfs/util/delay"
)
const kSeed = 1
@ -87,11 +88,14 @@ func RandomBytes(n int64) []byte {
func AddCatBytes(data []byte, conf Config) error {
ctx := context.Background()
rs := mockrouting.NewServerWithDelay(mockrouting.DelayConfig{
Query: delay.Fixed(conf.RoutingLatency),
ValueVisibility: delay.Fixed(conf.RoutingLatency),
mn := mocknet.New(ctx)
// defer mn.Close() FIXME does mocknet require clean-up
mn.SetLinkDefaults(mocknet.LinkOptions{
Latency: conf.NetworkLatency,
Bandwidth: math.MaxInt32, // TODO add to conf
})
net, err := tn.StreamNetWithDelay(ctx, rs, delay.Fixed(conf.NetworkLatency))
dhtNetwork := mockrouting.NewDHTNetwork(mn)
net, err := tn.StreamNet(ctx, mn, dhtNetwork)
if err != nil {
return errors.Wrap(err)
}
@ -100,6 +104,28 @@ func AddCatBytes(data []byte, conf Config) error {
adder := sessionGenerator.Next()
catter := sessionGenerator.Next()
// catter.Routing.Update(context.TODO(), adder.Peer)
peers := mn.Peers()
if len(peers) != 2 {
return errors.New("peers not in network")
}
for _, i := range peers {
for _, j := range peers {
if i == j {
continue
}
fmt.Println(i, " and ", j)
if _, err := mn.LinkPeers(i, j); err != nil {
return err
}
if err := mn.ConnectPeers(i, j); err != nil {
return err
}
}
}
catter.SetBlockstoreLatency(conf.BlockstoreLatency)
adder.SetBlockstoreLatency(0) // disable blockstore latency during add operation

View File

@ -18,6 +18,7 @@ func benchmarkAddCat(numBytes int64, conf Config, b *testing.B) {
var instant = Config{}.All_Instantaneous()
func BenchmarkInstantaneousAddCat1KB(b *testing.B) { benchmarkAddCat(1*KB, instant, b) }
func BenchmarkInstantaneousAddCat1MB(b *testing.B) { benchmarkAddCat(1*MB, instant, b) }
func BenchmarkInstantaneousAddCat2MB(b *testing.B) { benchmarkAddCat(2*MB, instant, b) }
func BenchmarkInstantaneousAddCat4MB(b *testing.B) { benchmarkAddCat(4*MB, instant, b) }

View File

@ -11,10 +11,10 @@ import (
blocks "github.com/jbenet/go-ipfs/blocks"
blocksutil "github.com/jbenet/go-ipfs/blocks/blocksutil"
tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet"
peer "github.com/jbenet/go-ipfs/peer"
mockrouting "github.com/jbenet/go-ipfs/routing/mock"
u "github.com/jbenet/go-ipfs/util"
delay "github.com/jbenet/go-ipfs/util/delay"
"github.com/jbenet/go-ipfs/util/testutil"
)
// FIXME the tests are really sensitive to the network delay. fix them to work
@ -61,7 +61,7 @@ func TestProviderForKeyButNetworkCannotFind(t *testing.T) { // TODO revisit this
defer g.Close()
block := blocks.NewBlock([]byte("block"))
pinfo := peer.PeerInfo{ID: peer.ID("testing")}
pinfo := testutil.RandPeerOrFatal(t)
rs.Client(pinfo).Provide(context.Background(), block.Key()) // but not on network
solo := g.Next()

View File

@ -1,14 +1,12 @@
package bitswap
import (
"math"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network"
mockpeernet "github.com/jbenet/go-ipfs/net/mock"
peer "github.com/jbenet/go-ipfs/peer"
mockrouting "github.com/jbenet/go-ipfs/routing/mock"
delay "github.com/jbenet/go-ipfs/util/delay"
testutil "github.com/jbenet/go-ipfs/util/testutil"
)
@ -17,16 +15,7 @@ type peernet struct {
routingserver mockrouting.Server
}
func StreamNetWithDelay(
ctx context.Context,
rs mockrouting.Server,
d delay.D) (Network, error) {
net := mockpeernet.New(ctx)
net.SetLinkDefaults(mockpeernet.LinkOptions{
Latency: d.Get(),
Bandwidth: math.MaxInt32, // TODO inject
})
func StreamNet(ctx context.Context, net mockpeernet.Mocknet, rs mockrouting.Server) (Network, error) {
return &peernet{net, rs}, nil
}
@ -39,7 +28,7 @@ func (pn *peernet) Adapter(p testutil.Peer) bsnet.BitSwapNetwork {
for _, other := range peers {
pn.Mocknet.LinkPeers(p.ID(), other)
}
routing := pn.routingserver.Client(peer.PeerInfo{ID: p.ID()})
routing := pn.routingserver.ClientWithDatastore(context.TODO(), p, ds.NewMapDatastore())
return bsnet.NewFromIpfsNetwork(client, routing)
}

View File

@ -33,7 +33,7 @@ func (n *network) Adapter(p testutil.Peer) bsnet.BitSwapNetwork {
client := &networkClient{
local: p.ID(),
network: n,
routing: n.routingserver.Client(peer.PeerInfo{ID: p.ID()}),
routing: n.routingserver.Client(p),
}
n.clients[p.ID()] = client
return client

View File

@ -79,15 +79,15 @@ func (i *Instance) SetBlockstoreLatency(t time.Duration) time.Duration {
// sessions. To safeguard, use the SessionGenerator to generate sessions. It's
// just a much better idea.
func session(ctx context.Context, net tn.Network, p testutil.Peer) Instance {
adapter := net.Adapter(p)
bsdelay := delay.Fixed(0)
const kWriteCacheElems = 100
bstore, err := blockstore.WriteCached(blockstore.NewBlockstore(ds_sync.MutexWrap(datastore2.WithDelay(ds.NewMapDatastore(), bsdelay))), kWriteCacheElems)
adapter := net.Adapter(p)
dstore := ds_sync.MutexWrap(datastore2.WithDelay(ds.NewMapDatastore(), bsdelay))
bstore, err := blockstore.WriteCached(blockstore.NewBlockstore(ds_sync.MutexWrap(dstore)), kWriteCacheElems)
if err != nil {
// FIXME perhaps change signature and return error.
panic(err.Error())
panic(err.Error()) // FIXME perhaps change signature and return error.
}
const alwaysSendToPeer = true

View File

@ -4,18 +4,13 @@ import (
"testing"
ci "github.com/jbenet/go-ipfs/crypto"
peer "github.com/jbenet/go-ipfs/peer"
mockrouting "github.com/jbenet/go-ipfs/routing/mock"
u "github.com/jbenet/go-ipfs/util"
testutil "github.com/jbenet/go-ipfs/util/testutil"
)
func TestRoutingResolve(t *testing.T) {
local, err := testutil.RandPeerID()
if err != nil {
t.Fatal(err)
}
d := mockrouting.NewServer().Client(peer.PeerInfo{ID: local})
d := mockrouting.NewServer().Client(testutil.RandPeerOrFatal(t))
resolver := NewRoutingResolver(d)
publisher := NewRoutingPublisher(d)

View File

@ -76,6 +76,7 @@ func (m *Mux) ReadProtocolHeader(s io.Reader) (string, StreamHandler, error) {
// This operation is threadsafe.
func (m *Mux) SetHandler(p ProtocolID, h StreamHandler) {
m.Lock()
log.Debug("setting protocol ", p)
m.Handlers[p] = h
m.Unlock()
}

View File

@ -1,6 +1,7 @@
package dht
import (
"math"
"sync"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
@ -127,6 +128,15 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key u.Key) error {
return nil
}
// FindProviders searches until the context expires.
func (dht *IpfsDHT) FindProviders(ctx context.Context, key u.Key) ([]peer.PeerInfo, error) {
var providers []peer.PeerInfo
for p := range dht.FindProvidersAsync(ctx, key, math.MaxInt32) {
providers = append(providers, p)
}
return providers, nil
}
// FindProvidersAsync is the same thing as FindProviders, but returns a channel.
// Peers will be returned on the channel as soon as they are found, even before
// the search query completes.

View File

@ -5,9 +5,11 @@ import (
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
peer "github.com/jbenet/go-ipfs/peer"
routing "github.com/jbenet/go-ipfs/routing"
u "github.com/jbenet/go-ipfs/util"
"github.com/jbenet/go-ipfs/util/testutil"
)
var log = u.Logger("mockrouter")
@ -15,7 +17,7 @@ var log = u.Logger("mockrouter")
type client struct {
datastore ds.Datastore
server server
peer peer.PeerInfo
peer testutil.Peer
}
// FIXME(brian): is this method meant to simulate putting a value into the network?
@ -70,7 +72,11 @@ func (c *client) FindProvidersAsync(ctx context.Context, k u.Key, max int) <-cha
// Provide returns once the message is on the network. Value is not necessarily
// visible yet.
func (c *client) Provide(_ context.Context, key u.Key) error {
return c.server.Announce(c.peer, key)
info := peer.PeerInfo{
ID: c.peer.ID(),
Addrs: []ma.Multiaddr{c.peer.Address()},
}
return c.server.Announce(info, key)
}
var _ routing.IpfsRouting = &client{}

View File

@ -5,9 +5,11 @@ import (
"sync"
"time"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"
"github.com/jbenet/go-ipfs/util/testutil"
)
// server is the mockrouting.Client's private interface to the routing server
@ -71,11 +73,11 @@ func (rs *s) Providers(k u.Key) []peer.PeerInfo {
return ret
}
func (rs *s) Client(p peer.PeerInfo) Client {
return rs.ClientWithDatastore(p, ds.NewMapDatastore())
func (rs *s) Client(p testutil.Peer) Client {
return rs.ClientWithDatastore(context.Background(), p, ds.NewMapDatastore())
}
func (rs *s) ClientWithDatastore(p peer.PeerInfo, datastore ds.Datastore) Client {
func (rs *s) ClientWithDatastore(_ context.Context, p testutil.Peer, datastore ds.Datastore) Client {
return &client{
peer: p,
datastore: ds.NewMapDatastore(),

View File

@ -8,11 +8,12 @@ import (
peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"
delay "github.com/jbenet/go-ipfs/util/delay"
"github.com/jbenet/go-ipfs/util/testutil"
)
func TestKeyNotFound(t *testing.T) {
var pi = peer.PeerInfo{ID: peer.ID("the peer id")}
var pi = testutil.RandPeerOrFatal(t)
var key = u.Key("mock key")
var ctx = context.Background()
@ -25,7 +26,7 @@ func TestKeyNotFound(t *testing.T) {
}
func TestClientFindProviders(t *testing.T) {
pi := peer.PeerInfo{ID: peer.ID("42")}
pi := testutil.RandPeerOrFatal(t)
rs := NewServer()
client := rs.Client(pi)
@ -39,20 +40,6 @@ func TestClientFindProviders(t *testing.T) {
time.Sleep(time.Millisecond * 300)
max := 100
providersFromHashTable, err := rs.Client(pi).FindProviders(context.Background(), k)
if err != nil {
t.Fatal(err)
}
isInHT := false
for _, pi := range providersFromHashTable {
if pi.ID == pi.ID {
isInHT = true
}
}
if !isInHT {
t.Fatal("Despite client providing key, peer wasn't in hash table as a provider")
}
providersFromClient := client.FindProvidersAsync(context.Background(), u.Key("hello"), max)
isInClient := false
for pi := range providersFromClient {
@ -70,7 +57,7 @@ func TestClientOverMax(t *testing.T) {
k := u.Key("hello")
numProvidersForHelloKey := 100
for i := 0; i < numProvidersForHelloKey; i++ {
pi := peer.PeerInfo{ID: peer.ID(i)}
pi := testutil.RandPeerOrFatal(t)
err := rs.Client(pi).Provide(context.Background(), k)
if err != nil {
t.Fatal(err)
@ -78,7 +65,7 @@ func TestClientOverMax(t *testing.T) {
}
max := 10
pi := peer.PeerInfo{ID: peer.ID("TODO")}
pi := testutil.RandPeerOrFatal(t)
client := rs.Client(pi)
providersFromClient := client.FindProvidersAsync(context.Background(), k, max)
@ -113,8 +100,11 @@ func TestCanceledContext(t *testing.T) {
default:
}
pi := peer.PeerInfo{ID: peer.ID(i)}
err := rs.Client(pi).Provide(context.Background(), k)
pi, err := testutil.RandPeer()
if err != nil {
t.Error(err)
}
err = rs.Client(pi).Provide(context.Background(), k)
if err != nil {
t.Error(err)
}
@ -122,7 +112,7 @@ func TestCanceledContext(t *testing.T) {
}
}()
local := peer.PeerInfo{ID: peer.ID("peer id doesn't matter")}
local := testutil.RandPeerOrFatal(t)
client := rs.Client(local)
t.Log("warning: max is finite so this test is non-deterministic")
@ -148,7 +138,7 @@ func TestCanceledContext(t *testing.T) {
func TestValidAfter(t *testing.T) {
var pi = peer.PeerInfo{ID: peer.ID("the peer id")}
pi := testutil.RandPeerOrFatal(t)
var key = u.Key("mock key")
var ctx = context.Background()
conf := DelayConfig{

View File

@ -11,18 +11,18 @@ import (
routing "github.com/jbenet/go-ipfs/routing"
u "github.com/jbenet/go-ipfs/util"
delay "github.com/jbenet/go-ipfs/util/delay"
"github.com/jbenet/go-ipfs/util/testutil"
)
// Server provides mockrouting Clients
type Server interface {
Client(p peer.PeerInfo) Client
ClientWithDatastore(peer.PeerInfo, ds.Datastore) Client
Client(p testutil.Peer) Client
ClientWithDatastore(context.Context, testutil.Peer, ds.Datastore) Client
}
// Client implements IpfsRouting
type Client interface {
FindProviders(context.Context, u.Key) ([]peer.PeerInfo, error)
routing.IpfsRouting
}

38
routing/mock/server2.go Normal file
View File

@ -0,0 +1,38 @@
package mockrouting
import (
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
sync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
mocknet "github.com/jbenet/go-ipfs/net/mock"
dht "github.com/jbenet/go-ipfs/routing/dht"
"github.com/jbenet/go-ipfs/util/testutil"
)
type mocknetserver struct {
mn mocknet.Mocknet
}
func NewDHTNetwork(mn mocknet.Mocknet) Server {
return &mocknetserver{
mn: mn,
}
}
func (rs *mocknetserver) Client(p testutil.Peer) Client {
return rs.ClientWithDatastore(context.TODO(), p, ds.NewMapDatastore())
}
func (rs *mocknetserver) ClientWithDatastore(ctx context.Context, p testutil.Peer, ds ds.Datastore) Client {
// FIXME AddPeer doesn't appear to be idempotent
net, err := rs.mn.AddPeer(p.PrivateKey(), p.Address())
if err != nil {
panic("FIXME")
// return nil, debugerror.Wrap(err)
}
return dht.NewDHT(ctx, p.ID(), net, sync.MutexWrap(ds))
}
var _ Server = &mocknetserver{}