1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-09-09 23:42:20 +08:00

Replace ctxgroup.ContextGroup -> goprocess.Process

License: MIT
Signed-off-by: rht <rhtbot@gmail.com>
This commit is contained in:
rht
2015-06-17 19:18:52 +07:00
parent 5bfd694ca7
commit 330b213777
19 changed files with 72 additions and 588 deletions

4
Godeps/Godeps.json generated
View File

@ -137,10 +137,6 @@
"ImportPath": "github.com/jbenet/go-base58",
"Rev": "6237cf65f3a6f7111cd8a42be3590df99a66bc7d"
},
{
"ImportPath": "github.com/jbenet/go-ctxgroup",
"Rev": "c14598396fa31465dc558b176c7976606f95a49d"
},
{
"ImportPath": "github.com/jbenet/go-datastore",
"Rev": "245a981af3750d7710db13dca731ba8461aa1095"

View File

@ -1,13 +0,0 @@
{
"ImportPath": "github.com/jbenet/go-ctxgroup",
"GoVersion": "go1.4.2",
"Packages": [
"./..."
],
"Deps": [
{
"ImportPath": "golang.org/x/net/context",
"Rev": "b6fdb7d8a4ccefede406f8fe0f017fb58265054c"
}
]
}

View File

@ -1,5 +0,0 @@
This directory tree is generated automatically by godep.
Please do not edit.
See https://github.com/tools/godep for more information.

View File

@ -1,16 +0,0 @@
all:
# no-op
GODEP=$(which godep)
godep: ${GODEP}
${GODEP}:
echo ${GODEP}
go get github.com/tools/godep
# saves/vendors third-party dependencies to Godeps/_workspace
# -r flag rewrites import paths to use the vendored path
# ./... performs operation on all packages in tree
vendor: godep
godep save -r ./...

View File

@ -1,35 +0,0 @@
# ContextGroup
- Godoc: https://godoc.org/github.com/jbenet/go-ctxgroup
ContextGroup is an interface for services able to be opened and closed.
It has a parent Context, and Children. But ContextGroup is not a proper
"tree" like the Context tree. It is more like a Context-WaitGroup hybrid.
It models a main object with a few children objects -- and, unlike the
context -- concerns itself with the parent-child closing semantics:
- Can define an optional TeardownFunc (func() error) to be run at Close time.
- Children call Children().Add(1) to be waited upon
- Children can select on <-Closing() to know when they should shut down.
- Close() will wait until all children call Children().Done()
- <-Closed() signals when the service is completely closed.
ContextGroup can be embedded into the main object itself. In that case,
the teardownFunc (if a member function) has to be set after the struct
is initialized:
```Go
type service struct {
ContextGroup
net.Conn
}
func (s *service) close() error {
return s.Conn.Close()
}
func newService(ctx context.Context, c net.Conn) *service {
s := &service{Conn: c}
s.ContextGroup = NewContextGroup(ctx, s.close)
return s
}
```

View File

@ -1,257 +0,0 @@
// package ctxgroup provides the ContextGroup, a hybrid between the
// context.Context and sync.WaitGroup, which models process trees.
package ctxgroup
import (
"io"
"sync"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
)
// TeardownFunc is a function used to cleanup state at the end of the
// lifecycle of a process. It is run exactly once, when the ContextGroup
// is closed (see SetTeardown and Close).
type TeardownFunc func() error

// ChildFunc is a function to register as a child. It will be automatically
// tracked.
type ChildFunc func(parent ContextGroup)

// nilTeardownFunc is the no-op teardown substituted when callers pass a
// nil TeardownFunc, so the close path never has to nil-check it.
var nilTeardownFunc = func() error { return nil }
// ContextGroup is an interface for services able to be opened and closed.
// It has a parent Context, and Children. But ContextGroup is not a proper
// "tree" like the Context tree. It is more like a Context-WaitGroup hybrid.
// It models a main object with a few children objects -- and, unlike the
// context -- concerns itself with the parent-child closing semantics:
//
// - Can define an optional TeardownFunc (func() error) to be run at Close time.
// - Children call Children().Add(1) to be waited upon
// - Children can select on <-Closing() to know when they should shut down.
// - Close() will wait until all children call Children().Done()
// - <-Closed() signals when the service is completely closed.
//
// ContextGroup can be embedded into the main object itself. In that case,
// the teardownFunc (if a member function) has to be set after the struct
// is initialized:
//
//	type service struct {
//		ContextGroup
//		net.Conn
//	}
//
//	func (s *service) close() error {
//		return s.Conn.Close()
//	}
//
//	func newService(ctx context.Context, c net.Conn) *service {
//		s := &service{Conn: c}
//		s.ContextGroup = NewContextGroup(ctx, s.close)
//		return s
//	}
type ContextGroup interface {

	// Context is the context of this ContextGroup. It is "sort of" a parent.
	Context() context.Context

	// SetTeardown assigns the teardown function.
	// It is called exactly _once_ when the ContextGroup is Closed.
	SetTeardown(tf TeardownFunc)

	// Children is a sync.WaitGroup for all children goroutines that should
	// shut down completely before this service is said to be "closed".
	// Follows the semantics of WaitGroup:
	//
	//	Children().Add(1) // add one more dependent child
	//	Children().Done() // child signals it is done
	//
	// WARNING: this is deprecated and will go away soon.
	Children() *sync.WaitGroup

	// AddChild gives ownership of a child io.Closer. The child will be closed
	// when the context group is closed.
	AddChild(io.Closer)

	// AddChildFunc registers a dependent ChildFunc. The child will receive
	// its parent ContextGroup, and can wait on its signals. Child references
	// are tracked automatically. It is equivalent to the following:
	//
	//	go func(parent, child ContextGroup) {
	//		<-parent.Closing()       // wait until parent is closing
	//		child.Close()            // signal child to close
	//		parent.Children().Done() // child signals it is done
	//	}(a, b)
	AddChildFunc(c ChildFunc)

	// Close is a method to call when you wish to stop this ContextGroup
	Close() error

	// Closing is a signal to wait upon, like Context.Done().
	// It fires when the object should be closing (but hasn't yet fully closed).
	// The primary use case is for child goroutines who need to know when
	// they should shut down. (equivalent to Context().Done())
	Closing() <-chan struct{}

	// Closed is a method to wait upon, like Context.Done().
	// It fires when the entire object is fully closed.
	// The primary use case is for external listeners who need to know when
	// this object is completely done, and all its children closed.
	Closed() <-chan struct{}
}
// contextGroup is the concrete ContextGroup implementation: a Closer
// with a cancellable context.
type contextGroup struct {
	ctx    context.Context
	cancel context.CancelFunc

	// teardownFunc is called to run the teardown logic. It is never nil:
	// SetTeardown substitutes nilTeardownFunc for a nil argument.
	teardownFunc TeardownFunc

	// closed is released once the close function is done.
	closed chan struct{}

	// children is the wait group for child goroutines.
	children sync.WaitGroup

	// closeOnce is the sync primitive to ensure the close logic is only
	// called once.
	closeOnce sync.Once

	// closeErr is the error to return to clients of Close().
	closeErr error
}
// newContextGroup constructs and returns a ContextGroup. It will call
// cf TeardownFunc before its Done() Wait signals fire.
func newContextGroup(ctx context.Context, cf TeardownFunc) ContextGroup {
	ctx, cancel := context.WithCancel(ctx)
	c := &contextGroup{
		ctx:    ctx,
		cancel: cancel,
		closed: make(chan struct{}),
	}
	c.SetTeardown(cf)   // installs a no-op teardown if cf is nil
	c.Children().Add(1) // initialize with 1. calling Close will decrement it.
	// NOTE: the Add(1) above must happen before this goroutine starts,
	// since closeOnContextDone calls Children().Done() to balance it.
	go c.closeOnContextDone()
	return c
}
// SetTeardown installs cf as the function run once at close time.
// A nil cf is replaced by a no-op, so the close path can always call
// teardownFunc without a nil check.
func (c *contextGroup) SetTeardown(cf TeardownFunc) {
	if cf != nil {
		c.teardownFunc = cf
		return
	}
	c.teardownFunc = nilTeardownFunc
}
// Context returns the cancellable context owned by this group.
func (c *contextGroup) Context() context.Context { return c.ctx }
// Children exposes the WaitGroup tracking dependent child goroutines.
func (c *contextGroup) Children() *sync.WaitGroup { return &c.children }
// AddChild takes ownership of child: once this group begins closing,
// the child is Close()d, and the group's own Close waits for it.
func (c *contextGroup) AddChild(child io.Closer) {
	c.children.Add(1)
	go func() {
		defer c.children.Done() // signal the child is done
		<-c.Closing()           // block until the group starts closing
		child.Close()           // then shut the child down
	}()
}
// AddChildFunc runs child in its own goroutine, handing it this group
// as its parent; the group's Close waits until child returns.
func (c *contextGroup) AddChildFunc(child ChildFunc) {
	c.children.Add(1)
	go func() {
		defer c.children.Done() // signal the child is done
		child(c)
	}()
}
// Close triggers shutdown (idempotently — only the first call runs the
// teardown), blocks until the group is completely closed, and returns
// the teardown's error.
func (c *contextGroup) Close() error {
	c.internalClose()
	<-c.Closed() // block until teardown ran and all children finished
	return c.closeErr
}
// Closing fires when the group has started shutting down (its context
// has been cancelled) but may not yet be fully closed.
func (c *contextGroup) Closing() <-chan struct{} { return c.ctx.Done() }
// Closed fires when teardown has run and every child has finished.
func (c *contextGroup) Closed() <-chan struct{} { return c.closed }
// internalClose kicks off the close sequence asynchronously; closeOnce
// guarantees closeLogic runs at most once across all callers.
func (c *contextGroup) internalClose() {
	go func() {
		c.closeOnce.Do(c.closeLogic)
	}()
}
// closeLogic is the _actual_ close process. It must only be called once
// (hence the sync.Once in internalClose); a second call would panic at
// the bottom, on close(c.closed). The statement order is the contract:
// cancel (Closing fires) -> teardown -> wait for children -> Closed fires.
func (c *contextGroup) closeLogic() {
	c.cancel()                    // signal that we're shutting down (Closing)
	c.closeErr = c.teardownFunc() // actually run the close logic
	c.children.Wait()             // wait till all children are done.
	close(c.closed)               // signal that we're shut down (Closed)
}
// closeOnContextDone goes through the Close motions when the parent
// context is shut down before Close is called explicitly. Hence all the
// sync stuff all over the place... The final Children().Done() balances
// the Children().Add(1) made in newContextGroup, allowing closeLogic's
// children.Wait() to return.
func (c *contextGroup) closeOnContextDone() {
	<-c.Context().Done() // wait until parent (context) is done.
	c.internalClose()
	c.Children().Done()
}
// WithTeardown returns a ContextGroup rooted at context.Background()
// that runs cf when closed. Panics if cf is nil.
func WithTeardown(cf TeardownFunc) ContextGroup {
	if cf == nil {
		panic("nil TeardownFunc")
	}
	return newContextGroup(context.Background(), cf)
}
// WithContext returns a ContextGroup bound to ctx, with no teardown
// function. Panics if ctx is nil.
func WithContext(ctx context.Context) ContextGroup {
	if ctx == nil {
		panic("nil Context")
	}
	return newContextGroup(ctx, nil)
}
// WithContextAndTeardown constructs and returns a ContextGroup bound to
// the given context, with cf as its TeardownFunc. Panics if either
// argument is nil.
func WithContextAndTeardown(ctx context.Context, cf TeardownFunc) ContextGroup {
	if ctx == nil {
		panic("nil Context")
	}
	if cf == nil {
		panic("nil TeardownFunc")
	}
	return newContextGroup(ctx, cf)
}
// WithParent returns a new ContextGroup registered as a child of p:
// it inherits p's context and is closed automatically when p closes.
// Panics if p is nil.
func WithParent(p ContextGroup) ContextGroup {
	if p == nil {
		panic("nil ContextGroup")
	}
	child := newContextGroup(p.Context(), nil)
	p.AddChild(child)
	return child
}
// WithBackground returns a ContextGroup rooted at context.Background(),
// with no teardown function.
func WithBackground() ContextGroup {
	return newContextGroup(context.Background(), nil)
}

View File

@ -1,187 +0,0 @@
package ctxgroup
import (
"testing"
"time"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
)
// tree pairs a ContextGroup with its child trees, mirroring the
// parent/child hierarchy built by setupCGHierarchy.
type tree struct {
	ContextGroup
	// c holds the child subtrees of this node.
	c []tree
}
// setupCGHierarchy builds a three-level ContextGroup hierarchy rooted
// at ctx (one root, two middle nodes, four leaves), defaulting to
// context.Background() when ctx is nil.
func setupCGHierarchy(ctx context.Context) tree {
	node := func(n ContextGroup, children ...tree) tree {
		return tree{n, children}
	}
	if ctx == nil {
		ctx = context.Background()
	}
	root := WithContext(ctx)
	mid1 := WithParent(root)
	mid2 := WithParent(root)
	leaf1 := WithParent(mid1)
	leaf2 := WithParent(mid1)
	leaf3 := WithParent(mid2)
	leaf4 := WithParent(mid2)
	return node(root,
		node(mid1, node(leaf1), node(leaf2)),
		node(mid2, node(leaf3), node(leaf4)))
}
// TestClosingClosed verifies the ordering of shutdown signals: Closing()
// must fire before either Closed() fires or Close() returns. The two
// "closed" events may arrive in either order relative to each other.
//
// Fix: the second and third failure messages were copy-pasted from the
// first ("closing not first") and did not describe what was checked.
func TestClosingClosed(t *testing.T) {
	a := WithBackground()
	Q := make(chan string)
	go func() {
		<-a.Closing()
		Q <- "closing"
	}()
	go func() {
		<-a.Closed()
		Q <- "closed"
	}()
	go func() {
		a.Close()
		Q <- "closed"
	}()
	if q := <-Q; q != "closing" {
		t.Error("order incorrect. closing not first")
	}
	if q := <-Q; q != "closed" {
		t.Error("order incorrect. closed should follow closing")
	}
	if q := <-Q; q != "closed" {
		t.Error("order incorrect. closed should follow closing")
	}
}
// TestChildFunc checks that Close() blocks until a registered ChildFunc
// returns. The child is gated on wait1/wait2/wait3; wait4 marks Close()
// returning. While the child is parked on wait2, nothing may be closed.
func TestChildFunc(t *testing.T) {
	a := WithBackground()
	wait1 := make(chan struct{})
	wait2 := make(chan struct{})
	wait3 := make(chan struct{})
	wait4 := make(chan struct{})
	go func() {
		a.Close()
		wait4 <- struct{}{}
	}()
	a.AddChildFunc(func(parent ContextGroup) {
		wait1 <- struct{}{} // announce the child started
		<-wait2             // park until the test releases us
		wait3 <- struct{}{} // announce the child is returning
	})
	<-wait1
	// child is running but blocked on wait2, so Close must not complete.
	select {
	case <-wait3:
		t.Error("should not be closed yet")
	case <-wait4:
		t.Error("should not be closed yet")
	case <-a.Closed():
		t.Error("should not be closed yet")
	default:
	}
	wait2 <- struct{}{} // release the child; Close may now finish
	select {
	case <-wait3:
	case <-time.After(time.Second):
		t.Error("should be closed now")
	}
	select {
	case <-wait4:
	case <-time.After(time.Second):
		t.Error("should be closed now")
	}
}
// TestTeardownCalledOnce ensures each node's TeardownFunc runs exactly
// once even when Close is invoked repeatedly on various nodes of the
// hierarchy, in mixed order.
func TestTeardownCalledOnce(t *testing.T) {
	a := setupCGHierarchy(nil)

	// onlyOnce builds a teardown that fails the test if called again.
	onlyOnce := func() func() error {
		count := 0
		return func() error {
			count++
			if count > 1 {
				t.Error("called", count, "times")
			}
			return nil
		}
	}

	// Install a fresh counter on every node of the hierarchy.
	nodes := []ContextGroup{
		a, a.c[0], a.c[0].c[0], a.c[0].c[1],
		a.c[1], a.c[1].c[0], a.c[1].c[1],
	}
	for _, n := range nodes {
		n.SetTeardown(onlyOnce())
	}

	// Close a subset of nodes four times each, in this exact order:
	// a leaf, its parent, the root, then the other middle node.
	for _, n := range []ContextGroup{a.c[0].c[0], a.c[0], a, a.c[1]} {
		for i := 0; i < 4; i++ {
			n.Close()
		}
	}
}
// TestOnClosed cancels the root context and verifies the hierarchy
// closes bottom-up: all four leaves report Closed before either middle
// node, and the root reports last. Ordering within a generation is
// unspecified, so each step only checks membership in that generation.
func TestOnClosed(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	a := setupCGHierarchy(ctx)
	Q := make(chan string, 10)
	// onClosed forwards label s to Q once c is fully closed.
	onClosed := func(s string, c ContextGroup) {
		<-c.Closed()
		Q <- s
	}
	go onClosed("0", a.c[0])
	go onClosed("10", a.c[1].c[0])
	go onClosed("", a)
	go onClosed("00", a.c[0].c[0])
	go onClosed("1", a.c[1])
	go onClosed("01", a.c[0].c[1])
	go onClosed("11", a.c[1].c[1])
	// test asserts the next label out of Q is one of ss.
	test := func(ss ...string) {
		s1 := <-Q
		for _, s2 := range ss {
			if s1 == s2 {
				return
			}
		}
		t.Error("context not in group", s1, ss)
	}
	cancel()
	test("00", "01", "10", "11")
	test("00", "01", "10", "11")
	test("00", "01", "10", "11")
	test("00", "01", "10", "11")
	test("0", "1")
	test("0", "1")
	test("")
}

View File

@ -17,9 +17,10 @@ import (
"time"
b58 "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-base58"
ctxgroup "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup"
ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
goprocess "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
goprocessctx "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/context"
mamask "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/whyrusleeping/multiaddr-filter"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
diag "github.com/ipfs/go-ipfs/diagnostics"
@ -105,7 +106,7 @@ type IpfsNode struct {
IpnsFs *ipnsfs.Filesystem
ctxgroup.ContextGroup
goprocess.Process
mode mode
}
@ -121,12 +122,12 @@ type Mounts struct {
type ConfigOption func(ctx context.Context) (*IpfsNode, error)
func NewIPFSNode(parent context.Context, option ConfigOption) (*IpfsNode, error) {
ctxg := ctxgroup.WithContext(parent)
ctx := ctxg.Context()
procctx := goprocessctx.WithContext(parent)
ctx := parent
success := false // flip to true after all sub-system inits succeed
defer func() {
if !success {
ctxg.Close()
procctx.Close()
}
}()
@ -134,7 +135,7 @@ func NewIPFSNode(parent context.Context, option ConfigOption) (*IpfsNode, error)
if err != nil {
return nil, err
}
node.ContextGroup = ctxg
node.Process = procctx
ctxg.SetTeardown(node.teardown)
// Need to make sure it's perfectly clear 1) which variables are expected

View File

@ -1,9 +1,9 @@
package coremock
import (
ctxgroup "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
syncds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
goprocessctx "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/context"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
"github.com/ipfs/go-ipfs/blocks/blockstore"
blockservice "github.com/ipfs/go-ipfs/blockservice"
@ -42,7 +42,7 @@ func NewMockNode() (*core.IpfsNode, error) {
nd.Peerstore = peer.NewPeerstore()
nd.Peerstore.AddPrivKey(p, ident.PrivateKey())
nd.Peerstore.AddPubKey(p, ident.PublicKey())
nd.ContextGroup = ctxgroup.WithContext(ctx)
nd.Process = goprocessctx.WithContext(ctx)
nd.PeerHost, err = mocknet.New(ctx).AddPeer(ident.PrivateKey(), ident.Address()) // effectively offline
if err != nil {

View File

@ -8,7 +8,7 @@ import (
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/bazil.org/fuse"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/bazil.org/fuse/fs"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
)
// mount implements go-ipfs/fuse/mount
@ -18,12 +18,12 @@ type mount struct {
fuseConn *fuse.Conn
// closeErr error
cg ctxgroup.ContextGroup
proc goprocess.Process
}
// Mount mounts a fuse fs.FS at a given location, and returns a Mount instance.
// parent is a ContextGroup to bind the mount's ContextGroup to.
func NewMount(p ctxgroup.ContextGroup, fsys fs.FS, mountpoint string, allow_other bool) (Mount, error) {
func NewMount(p goprocess.Process, fsys fs.FS, mountpoint string, allow_other bool) (Mount, error) {
var conn *fuse.Conn
var err error
@ -41,9 +41,9 @@ func NewMount(p ctxgroup.ContextGroup, fsys fs.FS, mountpoint string, allow_othe
mpoint: mountpoint,
fuseConn: conn,
filesys: fsys,
cg: ctxgroup.WithParent(p), // link it to parent.
proc: goprocess.WithParent(p), // link it to parent.
}
m.cg.SetTeardown(m.unmount)
m.proc.SetTeardown(m.unmount)
// launch the mounting process.
if err := m.mount(); err != nil {
@ -116,8 +116,8 @@ func (m *mount) unmount() error {
return nil
}
func (m *mount) CtxGroup() ctxgroup.ContextGroup {
return m.cg
func (m *mount) Process() goprocess.Process {
return m.proc
}
func (m *mount) MountPoint() string {
@ -125,6 +125,6 @@ func (m *mount) MountPoint() string {
}
func (m *mount) Unmount() error {
// call ContextCloser Close(), which calls unmount() exactly once.
return m.cg.Close()
// call Process Close(), which calls unmount() exactly once.
return m.proc.Close()
}

View File

@ -7,7 +7,7 @@ import (
"runtime"
"time"
ctxgroup "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup"
goprocess "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
u "github.com/ipfs/go-ipfs/util"
)
@ -24,9 +24,9 @@ type Mount interface {
// Unmounts the mount
Unmount() error
// CtxGroup returns the mount's CtxGroup to be able to link it
// Process returns the mount's Process to be able to link it
// to other processes. Unmount upon closing.
CtxGroup() ctxgroup.ContextGroup
Process() goprocess.Process
}
// ForceUnmount attempts to forcibly unmount a given mount.

View File

@ -5,11 +5,12 @@ import (
"io"
"net"
ctxgroup "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup"
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
reuseport "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-reuseport"
tec "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-temp-err-catcher"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
goprocessctx "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/context"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
ic "github.com/ipfs/go-ipfs/p2p/crypto"
@ -31,7 +32,7 @@ type listener struct {
wrapper ConnWrapper
cg ctxgroup.ContextGroup
proc goprocess.Process
}
func (l *listener) teardown() error {
@ -41,7 +42,7 @@ func (l *listener) teardown() error {
func (l *listener) Close() error {
log.Debugf("listener closing: %s %s", l.local, l.Multiaddr())
return l.cg.Close()
return l.proc.Close()
}
func (l *listener) String() string {
@ -157,9 +158,9 @@ func Listen(ctx context.Context, addr ma.Multiaddr, local peer.ID, sk ic.PrivKey
Listener: ml,
local: local,
privk: sk,
cg: ctxgroup.WithContext(ctx),
proc: goprocessctx.WithContext(ctx),
}
l.cg.SetTeardown(l.teardown)
l.proc.SetTeardown(l.teardown)
log.Debugf("Conn Listener on %s", l.Multiaddr())
log.Event(ctx, "swarmListen", l)

View File

@ -6,8 +6,8 @@ import (
conn "github.com/ipfs/go-ipfs/p2p/net/conn"
peer "github.com/ipfs/go-ipfs/p2p/peer"
ctxgroup "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup"
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
)
@ -80,8 +80,8 @@ type Network interface {
// use the known local interfaces.
InterfaceListenAddresses() ([]ma.Multiaddr, error)
// CtxGroup returns the network's contextGroup
CtxGroup() ctxgroup.ContextGroup
// Process returns the network's Process
Process() goprocess.Process
}
// Dialer represents a service that can dial out to peers

View File

@ -13,8 +13,9 @@ import (
p2putil "github.com/ipfs/go-ipfs/p2p/test/util"
testutil "github.com/ipfs/go-ipfs/util/testutil"
ctxgroup "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup"
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
goprocessctx "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/context"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
)
@ -31,7 +32,7 @@ type mocknet struct {
linkDefaults LinkOptions
cg ctxgroup.ContextGroup // for Context closing
proc goprocess.Process // for Context closing
sync.RWMutex
}
@ -40,7 +41,7 @@ func New(ctx context.Context) Mocknet {
nets: map[peer.ID]*peernet{},
hosts: map[peer.ID]*bhost.BasicHost{},
links: map[peer.ID]map[peer.ID]map[*link]struct{}{},
cg: ctxgroup.WithContext(ctx),
proc: goprocessctx.WithContext(ctx),
}
}

View File

@ -9,8 +9,9 @@ import (
inet "github.com/ipfs/go-ipfs/p2p/net"
peer "github.com/ipfs/go-ipfs/p2p/peer"
ctxgroup "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup"
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
goprocessctx "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/context"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
)
@ -34,7 +35,7 @@ type peernet struct {
notifmu sync.RWMutex
notifs map[inet.Notifiee]struct{}
cg ctxgroup.ContextGroup
proc goprocess.Process
sync.RWMutex
}
@ -57,7 +58,7 @@ func newPeernet(ctx context.Context, m *mocknet, k ic.PrivKey,
mocknet: m,
peer: p,
ps: ps,
cg: ctxgroup.WithContext(ctx),
proc: goprocessctx.WithContext(ctx),
connsByPeer: map[peer.ID]map[*conn]struct{}{},
connsByLink: map[*link]map[*conn]struct{}{},
@ -223,9 +224,9 @@ func (pn *peernet) removeConn(c *conn) {
delete(cs, c)
}
// CtxGroup returns the network's ContextGroup
func (pn *peernet) CtxGroup() ctxgroup.ContextGroup {
return pn.cg
// Process returns the network's Process
func (pn *peernet) Process() goprocess.Process {
return pn.proc
}
// LocalPeer the network's LocalPeer

View File

@ -14,11 +14,12 @@ import (
peer "github.com/ipfs/go-ipfs/p2p/peer"
eventlog "github.com/ipfs/go-ipfs/thirdparty/eventlog"
ctxgroup "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup"
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
ps "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream"
pst "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport"
psy "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/yamux"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
goprocessctx "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/context"
prom "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/prometheus/client_golang/prometheus"
mafilter "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/whyrusleeping/multiaddr-filter"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
@ -63,7 +64,7 @@ type Swarm struct {
// filters for addresses that shouldnt be dialed
Filters *filter.Filters
cg ctxgroup.ContextGroup
proc goprocess.Process
bwc metrics.Reporter
}
@ -80,7 +81,7 @@ func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr,
swarm: ps.NewSwarm(PSTransport),
local: local,
peers: peers,
cg: ctxgroup.WithContext(ctx),
proc: goprocessctx.WithContext(ctx),
dialT: DialTimeout,
notifs: make(map[inet.Notifiee]ps.Notifiee),
bwc: bwc,
@ -88,7 +89,7 @@ func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr,
}
// configure Swarm
s.cg.SetTeardown(s.teardown)
s.proc.SetTeardown(s.teardown)
s.SetConnHandler(nil) // make sure to setup our own conn handler.
// setup swarm metrics
@ -111,8 +112,6 @@ func (s *Swarm) AddAddrFilter(f string) error {
s.Filters.AddDialFilter(m)
return nil
}
// CtxGroup returns the Context Group of the swarm
func filterAddrs(listenAddrs []ma.Multiaddr) ([]ma.Multiaddr, error) {
if len(listenAddrs) > 0 {
filtered := addrutil.FilterUsableAddrs(listenAddrs)
@ -124,7 +123,6 @@ func filterAddrs(listenAddrs []ma.Multiaddr) ([]ma.Multiaddr, error) {
return listenAddrs, nil
}
// CtxGroup returns the Context Group of the swarm
func (s *Swarm) Listen(addrs ...ma.Multiaddr) error {
addrs, err := filterAddrs(addrs)
if err != nil {
@ -134,14 +132,14 @@ func (s *Swarm) Listen(addrs ...ma.Multiaddr) error {
return s.listen(addrs)
}
// CtxGroup returns the Context Group of the swarm
func (s *Swarm) CtxGroup() ctxgroup.ContextGroup {
return s.cg
// Process returns the Process of the swarm
func (s *Swarm) Process() goprocess.Process {
return s.proc
}
// Close stops the Swarm.
func (s *Swarm) Close() error {
return s.cg.Close()
return s.proc.Close()
}
// StreamSwarm returns the underlying peerstream.Swarm

View File

@ -8,8 +8,8 @@ import (
metrics "github.com/ipfs/go-ipfs/metrics"
inet "github.com/ipfs/go-ipfs/p2p/net"
ctxgroup "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup"
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
)
@ -43,9 +43,9 @@ func (n *Network) DialPeer(ctx context.Context, p peer.ID) (inet.Conn, error) {
return inet.Conn(sc), nil
}
// CtxGroup returns the network's ContextGroup
func (n *Network) CtxGroup() ctxgroup.ContextGroup {
return n.cg
// Process returns the network's Process
func (n *Network) Process() goprocess.Process {
return n.proc
}
// Swarm returns the network's peerstream.Swarm
@ -100,7 +100,7 @@ func (n *Network) close() error {
// Close calls the ContextCloser func
func (n *Network) Close() error {
return n.Swarm().cg.Close()
return n.Swarm().proc.Close()
}
// Listen tells the network to start listening on given multiaddrs.

View File

@ -23,8 +23,9 @@ import (
u "github.com/ipfs/go-ipfs/util"
proto "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/gogo/protobuf/proto"
ctxgroup "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup"
ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
goprocess "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
goprocessctx "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/context"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
)
@ -57,7 +58,8 @@ type IpfsDHT struct {
Validator record.Validator // record validator funcs
ctxgroup.ContextGroup
Context context.Context
goprocess.Process
}
// NewDHT creates a new DHT object with the given peer as the 'local' host
@ -71,14 +73,17 @@ func NewDHT(ctx context.Context, h host.Host, dstore ds.ThreadSafeDatastore) *Ip
// register for network notifs.
dht.host.Network().Notify((*netNotifiee)(dht))
dht.ContextGroup = ctxgroup.WithContextAndTeardown(ctx, func() error {
procctx = goprocessctx.WithContext(ctx)
procctx.SetTeardown(func() error {
// remove ourselves from network notifs.
dht.host.Network().StopNotify((*netNotifiee)(dht))
return nil
})
dht.Process = procctx
dht.Context = ctx
h.SetStreamHandler(ProtocolDHT, dht.handleNewStream)
dht.providers = NewProviderManager(dht.Context(), dht.self)
dht.providers = NewProviderManager(dht.Context, dht.self)
dht.AddChild(dht.providers)
dht.routingTable = kb.NewRoutingTable(20, kb.ConvertPeerID(dht.self), time.Minute, dht.peerstore)
@ -88,8 +93,7 @@ func NewDHT(ctx context.Context, h host.Host, dstore ds.ThreadSafeDatastore) *Ip
dht.Validator["pk"] = record.PublicKeyValidator
if doPinging {
dht.Children().Add(1)
go dht.PingRoutine(time.Second * 10)
dht.Go(func() { dht.PingRoutine(time.Second * 10) })
}
return dht
}
@ -348,8 +352,6 @@ func (dht *IpfsDHT) ensureConnectedToPeer(ctx context.Context, p peer.ID) error
// PingRoutine periodically pings nearest neighbors.
func (dht *IpfsDHT) PingRoutine(t time.Duration) {
defer dht.Children().Done()
tick := time.Tick(t)
for {
select {
@ -358,7 +360,7 @@ func (dht *IpfsDHT) PingRoutine(t time.Duration) {
rand.Read(id)
peers := dht.routingTable.NearestPeers(kb.ConvertKey(key.Key(id)), 5)
for _, p := range peers {
ctx, cancel := context.WithTimeout(dht.Context(), time.Second*5)
ctx, cancel := context.WithTimeout(dht.Context, time.Second*5)
_, err := dht.Ping(ctx, p)
if err != nil {
log.Debugf("Ping error: %s", err)

View File

@ -3,7 +3,8 @@ package dht
import (
"time"
ctxgroup "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
goprocessctx "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/context"
key "github.com/ipfs/go-ipfs/blocks/key"
peer "github.com/ipfs/go-ipfs/p2p/peer"
@ -21,7 +22,7 @@ type ProviderManager struct {
newprovs chan *addProv
getprovs chan *getProv
period time.Duration
ctxgroup.ContextGroup
goprocess.Process
}
type providerSet struct {
@ -46,17 +47,13 @@ func NewProviderManager(ctx context.Context, local peer.ID) *ProviderManager {
pm.providers = make(map[key.Key]*providerSet)
pm.getlocal = make(chan chan []key.Key)
pm.local = make(map[key.Key]struct{})
pm.ContextGroup = ctxgroup.WithContext(ctx)
pm.Children().Add(1)
go pm.run()
pm.Process = goprocessctx.WithContext(ctx)
pm.Go(pm.run)
return pm
}
func (pm *ProviderManager) run() {
defer pm.Children().Done()
tick := time.NewTicker(time.Hour)
for {
select {