mirror of
https://github.com/ipfs/kubo.git
synced 2025-06-30 18:13:54 +08:00
p2p/net/conn/listener: ignore certain errors
This should handle early breakages, where a failing connection would take out the listener entirely. There are probably other errors we should be handling here, like secure connection failures.
This commit is contained in:
@ -1,8 +1,10 @@
|
|||||||
package conn
|
package conn
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"net"
|
"net"
|
||||||
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -103,6 +105,9 @@ func testDialer(t *testing.T, secure bool) {
|
|||||||
if !secure {
|
if !secure {
|
||||||
key1 = nil
|
key1 = nil
|
||||||
key2 = nil
|
key2 = nil
|
||||||
|
t.Log("testing insecurely")
|
||||||
|
} else {
|
||||||
|
t.Log("testing securely")
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
@ -164,3 +169,89 @@ func TestDialerSecure(t *testing.T) {
|
|||||||
// t.Skip("Skipping in favor of another test")
|
// t.Skip("Skipping in favor of another test")
|
||||||
testDialer(t, true)
|
testDialer(t, true)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func testDialerCloseEarly(t *testing.T, secure bool) {
|
||||||
|
// t.Skip("Skipping in favor of another test")
|
||||||
|
|
||||||
|
p1 := tu.RandPeerNetParamsOrFatal(t)
|
||||||
|
p2 := tu.RandPeerNetParamsOrFatal(t)
|
||||||
|
|
||||||
|
key1 := p1.PrivKey
|
||||||
|
if !secure {
|
||||||
|
key1 = nil
|
||||||
|
t.Log("testing insecurely")
|
||||||
|
} else {
|
||||||
|
t.Log("testing securely")
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
l1, err := Listen(ctx, p1.Addr, p1.ID, key1)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
p1.Addr = l1.Multiaddr() // Addr has been determined by kernel.
|
||||||
|
|
||||||
|
// lol nesting
|
||||||
|
d2 := &Dialer{
|
||||||
|
LocalPeer: p2.ID,
|
||||||
|
// PrivateKey: key2, -- dont give it key. we'll just close the conn.
|
||||||
|
}
|
||||||
|
|
||||||
|
errs := make(chan error, 100)
|
||||||
|
done := make(chan struct{}, 1)
|
||||||
|
gotclosed := make(chan struct{}, 1)
|
||||||
|
go func() {
|
||||||
|
defer func() { done <- struct{}{} }()
|
||||||
|
|
||||||
|
_, err := l1.Accept()
|
||||||
|
if err != nil {
|
||||||
|
if strings.Contains(err.Error(), "closed") {
|
||||||
|
gotclosed <- struct{}{}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
errs <- err
|
||||||
|
}
|
||||||
|
errs <- fmt.Errorf("got conn")
|
||||||
|
}()
|
||||||
|
|
||||||
|
c, err := d2.Dial(ctx, p1.Addr, p1.ID)
|
||||||
|
if err != nil {
|
||||||
|
errs <- err
|
||||||
|
}
|
||||||
|
c.Close() // close it early.
|
||||||
|
|
||||||
|
readerrs := func() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case e := <-errs:
|
||||||
|
t.Error(e)
|
||||||
|
default:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
readerrs()
|
||||||
|
|
||||||
|
l1.Close()
|
||||||
|
<-done
|
||||||
|
cancel()
|
||||||
|
readerrs()
|
||||||
|
close(errs)
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-gotclosed:
|
||||||
|
default:
|
||||||
|
t.Error("did not get closed")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// we dont do a handshake with singleConn, so cant "close early."
|
||||||
|
// func TestDialerCloseEarlyInsecure(t *testing.T) {
|
||||||
|
// // t.Skip("Skipping in favor of another test")
|
||||||
|
// testDialerCloseEarly(t, false)
|
||||||
|
// }
|
||||||
|
|
||||||
|
func TestDialerCloseEarlySecure(t *testing.T) {
|
||||||
|
// t.Skip("Skipping in favor of another test")
|
||||||
|
testDialerCloseEarly(t, true)
|
||||||
|
}
|
||||||
|
@ -2,13 +2,14 @@ package conn
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io"
|
||||||
"net"
|
"net"
|
||||||
|
|
||||||
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
||||||
ctxgroup "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup"
|
ctxgroup "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup"
|
||||||
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
|
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
|
||||||
manet "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
|
manet "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
|
||||||
|
tec "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-temp-err-catcher"
|
||||||
ic "github.com/jbenet/go-ipfs/p2p/crypto"
|
ic "github.com/jbenet/go-ipfs/p2p/crypto"
|
||||||
peer "github.com/jbenet/go-ipfs/p2p/peer"
|
peer "github.com/jbenet/go-ipfs/p2p/peer"
|
||||||
)
|
)
|
||||||
@ -46,25 +47,53 @@ func (l *listener) Accept() (net.Conn, error) {
|
|||||||
// Contexts and io don't mix.
|
// Contexts and io don't mix.
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
maconn, err := l.Listener.Accept()
|
var catcher tec.TempErrCatcher
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
catcher.IsTemp = func(e error) bool {
|
||||||
|
// ignore connection breakages up to this point. but log them
|
||||||
|
if e == io.EOF {
|
||||||
|
log.Debugf("listener ignoring conn with EOF: %s", e)
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
te, ok := e.(tec.Temporary)
|
||||||
|
if ok {
|
||||||
|
log.Debugf("listener ignoring conn with temporary err: %s", e)
|
||||||
|
return te.Temporary()
|
||||||
|
}
|
||||||
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
c, err := newSingleConn(ctx, l.local, "", maconn)
|
for {
|
||||||
if err != nil {
|
maconn, err := l.Listener.Accept()
|
||||||
return nil, fmt.Errorf("Error accepting connection: %v", err)
|
if err != nil {
|
||||||
}
|
if catcher.IsTemporary(err) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
if l.privk == nil {
|
c, err := newSingleConn(ctx, l.local, "", maconn)
|
||||||
log.Warning("listener %s listening INSECURELY!", l)
|
if err != nil {
|
||||||
return c, nil
|
if catcher.IsTemporary(err) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if l.privk == nil {
|
||||||
|
log.Warning("listener %s listening INSECURELY!", l)
|
||||||
|
return c, nil
|
||||||
|
}
|
||||||
|
sc, err := newSecureConn(ctx, l.privk, c)
|
||||||
|
if err != nil {
|
||||||
|
if catcher.IsTemporary(err) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return sc, nil
|
||||||
}
|
}
|
||||||
sc, err := newSecureConn(ctx, l.privk, c)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("Error securing connection: %v", err)
|
|
||||||
}
|
|
||||||
return sc, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *listener) Addr() net.Addr {
|
func (l *listener) Addr() net.Addr {
|
||||||
|
Reference in New Issue
Block a user