mirror of
https://github.com/ipfs/kubo.git
synced 2025-07-02 03:28:25 +08:00
Fixed panic on closer
This commit is contained in:
@ -1,29 +1,71 @@
|
|||||||
package conn
|
package conn
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"sync"
|
||||||
|
|
||||||
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Wait is a readable channel to block on until it receives a signal.
|
|
||||||
type Wait <-chan Signal
|
|
||||||
|
|
||||||
// Signal is an empty channel
|
|
||||||
type Signal struct{}
|
|
||||||
|
|
||||||
// CloseFunc is a function used to close a ContextCloser
|
// CloseFunc is a function used to close a ContextCloser
|
||||||
type CloseFunc func() error
|
type CloseFunc func() error
|
||||||
|
|
||||||
// ContextCloser is an interface for services able to be opened and closed.
|
// ContextCloser is an interface for services able to be opened and closed.
|
||||||
|
// It has a parent Context, and Children. But ContextCloser is not a proper
|
||||||
|
// "tree" like the Context tree. It is more like a Context-WaitGroup hybrid.
|
||||||
|
// It models a main object with a few children objects -- and, unlike the
|
||||||
|
// context -- concerns itself with the parent-child closing semantics:
|
||||||
|
//
|
||||||
|
// - Can define a CloseFunc (func() error) to be run at Close time.
|
||||||
|
// - Children call Children().Add(1) to be waited upon
|
||||||
|
// - Children can select on <-Closing() to know when they should shut down.
|
||||||
|
// - Close() will wait until all children call Children().Done()
|
||||||
|
// - <-Closed() signals when the service is completely closed.
|
||||||
|
//
|
||||||
|
// ContextCloser can be embedded into the main object itself. In that case,
|
||||||
|
// the closeFunc (if a member function) has to be set after the struct
|
||||||
|
// is intialized:
|
||||||
|
//
|
||||||
|
// type service struct {
|
||||||
|
// ContextCloser
|
||||||
|
// net.Conn
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// func (s *service) close() error {
|
||||||
|
// return s.Conn.Close()
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// func newService(ctx context.Context, c net.Conn) *service {
|
||||||
|
// s := &service{c}
|
||||||
|
// s.ContextCloser = NewContextCloser(ctx, s.close)
|
||||||
|
// return s
|
||||||
|
// }
|
||||||
|
//
|
||||||
type ContextCloser interface {
|
type ContextCloser interface {
|
||||||
|
|
||||||
|
// Context is the context of this ContextCloser. It is "sort of" a parent.
|
||||||
Context() context.Context
|
Context() context.Context
|
||||||
|
|
||||||
// Close is a method to call when you with to stop this ContextCloser
|
// Children is a sync.Waitgroup for all children goroutines that should
|
||||||
|
// shut down completely before this service is said to be "closed".
|
||||||
|
// Follows the semantics of WaitGroup:
|
||||||
|
// Children().Add(1) // add one more dependent child
|
||||||
|
// Children().Done() // child signals it is done
|
||||||
|
Children() *sync.WaitGroup
|
||||||
|
|
||||||
|
// Close is a method to call when you wish to stop this ContextCloser
|
||||||
Close() error
|
Close() error
|
||||||
|
|
||||||
// Done is a method to wait upon, like context.Context.Done
|
// Closing is a signal to wait upon, like Context.Done().
|
||||||
Done() Wait
|
// It fires when the object should be closing (but hasn't yet fully closed).
|
||||||
|
// The primary use case is for child goroutines who need to know when
|
||||||
|
// they should shut down. (equivalent to Context().Done())
|
||||||
|
Closing() <-chan struct{}
|
||||||
|
|
||||||
|
// Closed is a method to wait upon, like Context.Done().
|
||||||
|
// It fires when the entire object is fully closed.
|
||||||
|
// The primary use case is for external listeners who need to know when
|
||||||
|
// this object is completly done, and all its children closed.
|
||||||
|
Closed() <-chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// contextCloser is an OpenCloser with a cancellable context
|
// contextCloser is an OpenCloser with a cancellable context
|
||||||
@ -31,11 +73,20 @@ type contextCloser struct {
|
|||||||
ctx context.Context
|
ctx context.Context
|
||||||
cancel context.CancelFunc
|
cancel context.CancelFunc
|
||||||
|
|
||||||
// called to close
|
// called to run the close logic.
|
||||||
closeFunc CloseFunc
|
closeFunc CloseFunc
|
||||||
|
|
||||||
// closed is released once the close function is done.
|
// closed is released once the close function is done.
|
||||||
closed chan Signal
|
closed chan struct{}
|
||||||
|
|
||||||
|
// wait group for child goroutines
|
||||||
|
children sync.WaitGroup
|
||||||
|
|
||||||
|
// sync primitive to ensure the close logic is only called once.
|
||||||
|
closeOnce sync.Once
|
||||||
|
|
||||||
|
// error to return to clients of Close().
|
||||||
|
closeErr error
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewContextCloser constructs and returns a ContextCloser. It will call
|
// NewContextCloser constructs and returns a ContextCloser. It will call
|
||||||
@ -46,7 +97,7 @@ func NewContextCloser(ctx context.Context, cf CloseFunc) ContextCloser {
|
|||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
cancel: cancel,
|
cancel: cancel,
|
||||||
closeFunc: cf,
|
closeFunc: cf,
|
||||||
closed: make(chan Signal),
|
closed: make(chan struct{}),
|
||||||
}
|
}
|
||||||
|
|
||||||
go c.closeOnContextDone()
|
go c.closeOnContextDone()
|
||||||
@ -57,30 +108,47 @@ func (c *contextCloser) Context() context.Context {
|
|||||||
return c.ctx
|
return c.ctx
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *contextCloser) Done() Wait {
|
func (c *contextCloser) Children() *sync.WaitGroup {
|
||||||
|
return &c.children
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close is the external close function. it's a wrapper around internalClose
|
||||||
|
// that waits on Closed()
|
||||||
|
func (c *contextCloser) Close() error {
|
||||||
|
c.internalClose()
|
||||||
|
<-c.Closed() // wait until we're totally done.
|
||||||
|
return c.closeErr
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *contextCloser) Closing() <-chan struct{} {
|
||||||
|
return c.Context().Done()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *contextCloser) Closed() <-chan struct{} {
|
||||||
return c.closed
|
return c.closed
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *contextCloser) Close() error {
|
func (c *contextCloser) internalClose() {
|
||||||
select {
|
go c.closeOnce.Do(c.closeLogic)
|
||||||
case <-c.Done():
|
|
||||||
// panic("closed twice")
|
|
||||||
return errors.New("closed twice")
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
|
|
||||||
err := c.closeFunc() // actually run the close logic
|
|
||||||
close(c.closed) // relase everyone waiting on Done
|
|
||||||
c.cancel() // release anyone waiting on the context
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// the _actual_ close process.
|
||||||
|
func (c *contextCloser) closeLogic() {
|
||||||
|
// this function should only be called once (hence the sync.Once).
|
||||||
|
// and it will panic at the bottom (on close(c.closed)) otherwise.
|
||||||
|
|
||||||
|
c.cancel() // signal that we're shutting down (Closing)
|
||||||
|
c.closeErr = c.closeFunc() // actually run the close logic
|
||||||
|
c.children.Wait() // wait till all children are done.
|
||||||
|
close(c.closed) // signal that we're shut down (Closed)
|
||||||
|
}
|
||||||
|
|
||||||
|
// if parent context is shut down before we call Close explicitly,
|
||||||
|
// we need to go through the Close motions anyway. Hence all the sync
|
||||||
|
// stuff all over the place...
|
||||||
func (c *contextCloser) closeOnContextDone() {
|
func (c *contextCloser) closeOnContextDone() {
|
||||||
<-c.ctx.Done()
|
c.Children().Add(1) // we're a child goroutine, to be waited upon.
|
||||||
select {
|
<-c.Context().Done() // wait until parent (context) is done.
|
||||||
case <-c.Done():
|
c.internalClose()
|
||||||
return // already closed
|
c.Children().Done()
|
||||||
default:
|
|
||||||
}
|
|
||||||
c.Close()
|
|
||||||
}
|
}
|
||||||
|
@ -218,7 +218,7 @@ func (l *listener) close() error {
|
|||||||
|
|
||||||
func (l *listener) isClosed() bool {
|
func (l *listener) isClosed() bool {
|
||||||
select {
|
select {
|
||||||
case <-l.Done():
|
case <-l.Closed():
|
||||||
return true
|
return true
|
||||||
default:
|
default:
|
||||||
return false
|
return false
|
||||||
|
@ -18,9 +18,9 @@ func TestClose(t *testing.T) {
|
|||||||
c1, c2 := setupConn(t, ctx, "/ip4/127.0.0.1/tcp/1234", "/ip4/127.0.0.1/tcp/2345")
|
c1, c2 := setupConn(t, ctx, "/ip4/127.0.0.1/tcp/1234", "/ip4/127.0.0.1/tcp/2345")
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-c1.Done():
|
case <-c1.Closed():
|
||||||
t.Fatal("done before close")
|
t.Fatal("done before close")
|
||||||
case <-c2.Done():
|
case <-c2.Closed():
|
||||||
t.Fatal("done before close")
|
t.Fatal("done before close")
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
@ -28,7 +28,7 @@ func TestClose(t *testing.T) {
|
|||||||
c1.Close()
|
c1.Close()
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-c1.Done():
|
case <-c1.Closed():
|
||||||
default:
|
default:
|
||||||
t.Fatal("not done after cancel")
|
t.Fatal("not done after cancel")
|
||||||
}
|
}
|
||||||
@ -36,7 +36,7 @@ func TestClose(t *testing.T) {
|
|||||||
c2.Close()
|
c2.Close()
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-c2.Done():
|
case <-c2.Closed():
|
||||||
default:
|
default:
|
||||||
t.Fatal("not done after cancel")
|
t.Fatal("not done after cancel")
|
||||||
}
|
}
|
||||||
@ -50,9 +50,9 @@ func TestCancel(t *testing.T) {
|
|||||||
c1, c2 := setupConn(t, ctx, "/ip4/127.0.0.1/tcp/1234", "/ip4/127.0.0.1/tcp/2345")
|
c1, c2 := setupConn(t, ctx, "/ip4/127.0.0.1/tcp/1234", "/ip4/127.0.0.1/tcp/2345")
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-c1.Done():
|
case <-c1.Closed():
|
||||||
t.Fatal("done before close")
|
t.Fatal("done before close")
|
||||||
case <-c2.Done():
|
case <-c2.Closed():
|
||||||
t.Fatal("done before close")
|
t.Fatal("done before close")
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
@ -64,13 +64,13 @@ func TestCancel(t *testing.T) {
|
|||||||
// test that cancel called Close.
|
// test that cancel called Close.
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-c1.Done():
|
case <-c1.Closed():
|
||||||
default:
|
default:
|
||||||
t.Fatal("not done after cancel")
|
t.Fatal("not done after cancel")
|
||||||
}
|
}
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-c2.Done():
|
case <-c2.Closed():
|
||||||
default:
|
default:
|
||||||
t.Fatal("not done after cancel")
|
t.Fatal("not done after cancel")
|
||||||
}
|
}
|
||||||
|
@ -31,7 +31,7 @@ func VersionHandshake(ctx context.Context, c Conn) error {
|
|||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return ctx.Err()
|
return ctx.Err()
|
||||||
|
|
||||||
case <-c.Done():
|
case <-c.Closed():
|
||||||
return errors.New("remote closed connection during version exchange")
|
return errors.New("remote closed connection during version exchange")
|
||||||
|
|
||||||
case data, ok := <-c.In():
|
case data, ok := <-c.In():
|
||||||
|
@ -37,9 +37,9 @@ func TestSecureClose(t *testing.T) {
|
|||||||
c2 = setupSecureConn(t, c2)
|
c2 = setupSecureConn(t, c2)
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-c1.Done():
|
case <-c1.Closed():
|
||||||
t.Fatal("done before close")
|
t.Fatal("done before close")
|
||||||
case <-c2.Done():
|
case <-c2.Closed():
|
||||||
t.Fatal("done before close")
|
t.Fatal("done before close")
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
@ -47,7 +47,7 @@ func TestSecureClose(t *testing.T) {
|
|||||||
c1.Close()
|
c1.Close()
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-c1.Done():
|
case <-c1.Closed():
|
||||||
default:
|
default:
|
||||||
t.Fatal("not done after cancel")
|
t.Fatal("not done after cancel")
|
||||||
}
|
}
|
||||||
@ -55,7 +55,7 @@ func TestSecureClose(t *testing.T) {
|
|||||||
c2.Close()
|
c2.Close()
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-c2.Done():
|
case <-c2.Closed():
|
||||||
default:
|
default:
|
||||||
t.Fatal("not done after cancel")
|
t.Fatal("not done after cancel")
|
||||||
}
|
}
|
||||||
@ -72,9 +72,9 @@ func TestSecureCancel(t *testing.T) {
|
|||||||
c2 = setupSecureConn(t, c2)
|
c2 = setupSecureConn(t, c2)
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-c1.Done():
|
case <-c1.Closed():
|
||||||
t.Fatal("done before close")
|
t.Fatal("done before close")
|
||||||
case <-c2.Done():
|
case <-c2.Closed():
|
||||||
t.Fatal("done before close")
|
t.Fatal("done before close")
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
@ -86,13 +86,13 @@ func TestSecureCancel(t *testing.T) {
|
|||||||
// test that cancel called Close.
|
// test that cancel called Close.
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-c1.Done():
|
case <-c1.Closed():
|
||||||
default:
|
default:
|
||||||
t.Fatal("not done after cancel")
|
t.Fatal("not done after cancel")
|
||||||
}
|
}
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-c2.Done():
|
case <-c2.Closed():
|
||||||
default:
|
default:
|
||||||
t.Fatal("not done after cancel")
|
t.Fatal("not done after cancel")
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user