1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-09-10 11:52:21 +08:00

fixup datastore interfaces

License: MIT
Signed-off-by: Jeromy <jeromyj@gmail.com>
This commit is contained in:
Jeromy
2015-07-16 11:32:41 -07:00
parent 3ffebd942f
commit 45d4b1a8bc
8 changed files with 26 additions and 40 deletions

4
Godeps/Godeps.json generated
View File

@ -166,7 +166,11 @@
}, },
{ {
"ImportPath": "github.com/jbenet/go-datastore", "ImportPath": "github.com/jbenet/go-datastore",
<<<<<<< HEAD
"Rev": "c835c30f206c1e97172e428f052e225adab9abde" "Rev": "c835c30f206c1e97172e428f052e225adab9abde"
=======
"Rev": "47af23f2ad09237ccc09c586c118048e2b39b358"
>>>>>>> fixup datastore interfaces
}, },
{ {
"ImportPath": "github.com/jbenet/go-detect-race", "ImportPath": "github.com/jbenet/go-detect-race",

View File

@ -25,7 +25,7 @@ var ValueTypeMismatch = errors.New("The retrieved value is not a Block")
var ErrNotFound = errors.New("blockstore: block not found") var ErrNotFound = errors.New("blockstore: block not found")
// Blockstore wraps a ThreadSafeDatastore // Blockstore wraps a Datastore
type Blockstore interface { type Blockstore interface {
DeleteBlock(key.Key) error DeleteBlock(key.Key) error
Has(key.Key) (bool, error) Has(key.Key) (bool, error)
@ -51,7 +51,7 @@ type GCBlockstore interface {
PinLock() func() PinLock() func()
} }
func NewBlockstore(d ds.Datastore) *blockstore { func NewBlockstore(d ds.Batching) *blockstore {
dd := dsns.Wrap(d, BlockPrefix) dd := dsns.Wrap(d, BlockPrefix)
return &blockstore{ return &blockstore{
datastore: dd, datastore: dd,
@ -60,8 +60,6 @@ func NewBlockstore(d ds.Datastore) *blockstore {
type blockstore struct { type blockstore struct {
datastore ds.Batching datastore ds.Batching
// cant be ThreadSafeDatastore cause namespace.Datastore doesnt support it.
// we do check it on `NewBlockstore` though.
lk sync.RWMutex lk sync.RWMutex
} }

View File

@ -17,7 +17,6 @@ import (
"time" "time"
b58 "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-base58" b58 "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-base58"
ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
goprocess "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess" goprocess "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
mamask "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/whyrusleeping/multiaddr-filter" mamask "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/whyrusleeping/multiaddr-filter"
@ -570,14 +569,14 @@ func startListening(ctx context.Context, host p2phost.Host, cfg *config.Config)
return nil return nil
} }
func constructDHTRouting(ctx context.Context, host p2phost.Host, dstore ds.Datastore) (routing.IpfsRouting, error) { func constructDHTRouting(ctx context.Context, host p2phost.Host, dstore repo.Datastore) (routing.IpfsRouting, error) {
dhtRouting := dht.NewDHT(ctx, host, dstore) dhtRouting := dht.NewDHT(ctx, host, dstore)
dhtRouting.Validator[IpnsValidatorTag] = namesys.IpnsRecordValidator dhtRouting.Validator[IpnsValidatorTag] = namesys.IpnsRecordValidator
dhtRouting.Selector[IpnsValidatorTag] = namesys.IpnsSelectorFunc dhtRouting.Selector[IpnsValidatorTag] = namesys.IpnsSelectorFunc
return dhtRouting, nil return dhtRouting, nil
} }
type RoutingOption func(context.Context, p2phost.Host, ds.Datastore) (routing.IpfsRouting, error) type RoutingOption func(context.Context, p2phost.Host, repo.Datastore) (routing.IpfsRouting, error)
type DiscoveryOption func(p2phost.Host) (discovery.Service, error) type DiscoveryOption func(p2phost.Host) (discovery.Service, error)

View File

@ -8,6 +8,7 @@ import (
core "github.com/ipfs/go-ipfs/core" core "github.com/ipfs/go-ipfs/core"
"github.com/ipfs/go-ipfs/p2p/host" "github.com/ipfs/go-ipfs/p2p/host"
"github.com/ipfs/go-ipfs/p2p/peer" "github.com/ipfs/go-ipfs/p2p/peer"
repo "github.com/ipfs/go-ipfs/repo"
routing "github.com/ipfs/go-ipfs/routing" routing "github.com/ipfs/go-ipfs/routing"
supernode "github.com/ipfs/go-ipfs/routing/supernode" supernode "github.com/ipfs/go-ipfs/routing/supernode"
gcproxy "github.com/ipfs/go-ipfs/routing/supernode/proxy" gcproxy "github.com/ipfs/go-ipfs/routing/supernode/proxy"
@ -28,7 +29,7 @@ var (
// routing records to the provided datastore. Only routing records are store in // routing records to the provided datastore. Only routing records are store in
// the datastore. // the datastore.
func SupernodeServer(recordSource ds.ThreadSafeDatastore) core.RoutingOption { func SupernodeServer(recordSource ds.ThreadSafeDatastore) core.RoutingOption {
return func(ctx context.Context, ph host.Host, dstore ds.Datastore) (routing.IpfsRouting, error) { return func(ctx context.Context, ph host.Host, dstore repo.Datastore) (routing.IpfsRouting, error) {
server, err := supernode.NewServer(recordSource, ph.Peerstore(), ph.ID()) server, err := supernode.NewServer(recordSource, ph.Peerstore(), ph.ID())
if err != nil { if err != nil {
return nil, err return nil, err
@ -44,7 +45,7 @@ func SupernodeServer(recordSource ds.ThreadSafeDatastore) core.RoutingOption {
// TODO doc // TODO doc
func SupernodeClient(remotes ...peer.PeerInfo) core.RoutingOption { func SupernodeClient(remotes ...peer.PeerInfo) core.RoutingOption {
return func(ctx context.Context, ph host.Host, dstore ds.Datastore) (routing.IpfsRouting, error) { return func(ctx context.Context, ph host.Host, dstore repo.Datastore) (routing.IpfsRouting, error) {
if len(remotes) < 1 { if len(remotes) < 1 {
return nil, errServersMissing return nil, errServersMissing
} }

View File

@ -8,7 +8,7 @@ import (
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/flatfs" "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/flatfs"
levelds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/leveldb" levelds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/leveldb"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/measure" "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/measure"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/mount" mount "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/syncmount"
ldbopts "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/opt" ldbopts "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/opt"
repo "github.com/ipfs/go-ipfs/repo" repo "github.com/ipfs/go-ipfs/repo"
config "github.com/ipfs/go-ipfs/repo/config" config "github.com/ipfs/go-ipfs/repo/config"
@ -20,22 +20,11 @@ const (
flatfsDirectory = "blocks" flatfsDirectory = "blocks"
) )
type defaultDatastore struct {
repo.Datastore
// tracked separately for use in Close; do not use directly.
leveldbDS repo.Datastore
metricsBlocks repo.Datastore
metricsLevelDB repo.Datastore
}
func openDefaultDatastore(r *FSRepo) (repo.Datastore, error) { func openDefaultDatastore(r *FSRepo) (repo.Datastore, error) {
d := &defaultDatastore{}
leveldbPath := path.Join(r.path, leveldbDirectory) leveldbPath := path.Join(r.path, leveldbDirectory)
var err error
// save leveldb reference so it can be neatly closed afterward // save leveldb reference so it can be neatly closed afterward
d.leveldbDS, err = levelds.NewDatastore(leveldbPath, &levelds.Options{ leveldbDS, err := levelds.NewDatastore(leveldbPath, &levelds.Options{
Compression: ldbopts.NoCompression, Compression: ldbopts.NoCompression,
}) })
if err != nil { if err != nil {
@ -65,26 +54,20 @@ func openDefaultDatastore(r *FSRepo) (repo.Datastore, error) {
id = fmt.Sprintf("uninitialized_%p", r) id = fmt.Sprintf("uninitialized_%p", r)
} }
prefix := "fsrepo." + id + ".datastore." prefix := "fsrepo." + id + ".datastore."
d.metricsBlocks = measure.New(prefix+"blocks", blocksDS) metricsBlocks := measure.New(prefix+"blocks", blocksDS)
d.metricsLevelDB = measure.New(prefix+"leveldb", d.leveldbDS) metricsLevelDB := measure.New(prefix+"leveldb", leveldbDS)
mountDS := mount.New([]mount.Mount{ mountDS := mount.New([]mount.Mount{
{ {
Prefix: ds.NewKey("/blocks"), Prefix: ds.NewKey("/blocks"),
Datastore: d.metricsBlocks, Datastore: metricsBlocks,
}, },
{ {
Prefix: ds.NewKey("/"), Prefix: ds.NewKey("/"),
Datastore: d.metricsLevelDB, Datastore: metricsLevelDB,
}, },
}) })
// Make sure it's ok to claim the virtual datastore from mount as
// threadsafe. There's no clean way to make mount itself provide
// this information without copy-pasting the code into two
// variants. This is the same dilemma as the `[].byte` attempt at
// introducing const types to Go.
d.Datastore = mountDS
return d, nil return mountDS, nil
} }
func initDefaultDatastore(repoPath string, conf *config.Config) error { func initDefaultDatastore(repoPath string, conf *config.Config) error {
@ -101,5 +84,3 @@ func initDefaultDatastore(repoPath string, conf *config.Config) error {
} }
return nil return nil
} }
var _ repo.Datastore = (*defaultDatastore)(nil)

View File

@ -5,7 +5,6 @@ import (
"io" "io"
ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
config "github.com/ipfs/go-ipfs/repo/config" config "github.com/ipfs/go-ipfs/repo/config"
) )
@ -32,6 +31,6 @@ type Repo interface {
// Datastore is the interface required from a datastore to be // Datastore is the interface required from a datastore to be
// acceptable to FSRepo. // acceptable to FSRepo.
type Datastore interface { type Datastore interface {
ds.Datastore // should be threadsafe, just be careful ds.Batching // should be threadsafe, just be careful
io.Closer io.Closer
} }

View File

@ -3,11 +3,11 @@ package nilrouting
import ( import (
"errors" "errors"
ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
key "github.com/ipfs/go-ipfs/blocks/key" key "github.com/ipfs/go-ipfs/blocks/key"
p2phost "github.com/ipfs/go-ipfs/p2p/host" p2phost "github.com/ipfs/go-ipfs/p2p/host"
peer "github.com/ipfs/go-ipfs/p2p/peer" peer "github.com/ipfs/go-ipfs/p2p/peer"
repo "github.com/ipfs/go-ipfs/repo"
routing "github.com/ipfs/go-ipfs/routing" routing "github.com/ipfs/go-ipfs/routing"
logging "github.com/ipfs/go-ipfs/vendor/QmQg1J6vikuXF9oDvm4wpdeAUvvkVEKW1EYDw9HhTMnP2b/go-log" logging "github.com/ipfs/go-ipfs/vendor/QmQg1J6vikuXF9oDvm4wpdeAUvvkVEKW1EYDw9HhTMnP2b/go-log"
) )
@ -47,7 +47,7 @@ func (c *nilclient) Bootstrap(_ context.Context) error {
return nil return nil
} }
func ConstructNilRouting(_ context.Context, _ p2phost.Host, _ ds.Datastore) (routing.IpfsRouting, error) { func ConstructNilRouting(_ context.Context, _ p2phost.Host, _ repo.Datastore) (routing.IpfsRouting, error) {
return &nilclient{}, nil return &nilclient{}, nil
} }

View File

@ -71,4 +71,8 @@ func (ds *S3Datastore) Close() error {
return nil return nil
} }
func (ds *S3Datastore) Batch() (datastore.Batch, error) {
return datastore.NewBasicBatch(ds), nil
}
func (ds *S3Datastore) IsThreadSafe() {} func (ds *S3Datastore) IsThreadSafe() {}