mirror of
https://github.com/ipfs/kubo.git
synced 2025-06-30 09:59:13 +08:00
net/p2p + secio: parallelize crypto handshake
We had a very nasty problem: handshakes were serial so incoming dials would wait for each other to finish handshaking. this was particularly problematic when handshakes hung-- nodes would not recover quickly. This led to gateways not bootstrapping peers fast enough. The approach taken here is to do what crypto/tls does: defer the handshake until Read/Write[1]. There are a number of reasons why this is _the right thing to do_: - it delays handshaking until it is known to be necessary (doing io) - it "accepts" before the handshake, getting the handshake out of the critical path entirely. - it defers to the user's parallelization of conn handling. users must implement this in some way already so use that, instead of picking constants surely to be wrong (how many handshakes to run in parallel?) [0] http://golang.org/src/crypto/tls/conn.go#L886
This commit is contained in:
@ -17,28 +17,13 @@ type SessionGenerator struct {
|
||||
PrivateKey ci.PrivKey
|
||||
}
|
||||
|
||||
// NewSession takes an insecure io.ReadWriter, performs a TLS-like
|
||||
// NewSession takes an insecure io.ReadWriter, sets up a TLS-like
|
||||
// handshake with the other side, and returns a secure session.
|
||||
// The handshake isn't run until the connection is read or written to.
|
||||
// See the source for the protocol details and security implementation.
|
||||
// The provided Context is only needed for the duration of this function.
|
||||
func (sg *SessionGenerator) NewSession(ctx context.Context,
|
||||
insecure io.ReadWriter) (Session, error) {
|
||||
|
||||
ss, err := newSecureSession(sg.LocalID, sg.PrivateKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if ctx == nil {
|
||||
ctx = context.Background()
|
||||
}
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
if err := ss.handshake(ctx, insecure); err != nil {
|
||||
cancel()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return ss, nil
|
||||
func (sg *SessionGenerator) NewSession(ctx context.Context, insecure io.ReadWriteCloser) (Session, error) {
|
||||
return newSecureSession(ctx, sg.LocalID, sg.PrivateKey, insecure)
|
||||
}
|
||||
|
||||
type Session interface {
|
||||
@ -64,6 +49,9 @@ type Session interface {
|
||||
|
||||
// SecureReadWriter returns the encrypted communication channel
|
||||
func (s *secureSession) ReadWriter() msgio.ReadWriteCloser {
|
||||
if err := s.Handshake(); err != nil {
|
||||
return &closedRW{err}
|
||||
}
|
||||
return s.secure
|
||||
}
|
||||
|
||||
@ -79,15 +67,60 @@ func (s *secureSession) LocalPrivateKey() ci.PrivKey {
|
||||
|
||||
// RemotePeer retrieves the remote peer.
|
||||
func (s *secureSession) RemotePeer() peer.ID {
|
||||
if err := s.Handshake(); err != nil {
|
||||
return ""
|
||||
}
|
||||
return s.remotePeer
|
||||
}
|
||||
|
||||
// RemotePeer retrieves the remote peer.
|
||||
func (s *secureSession) RemotePublicKey() ci.PubKey {
|
||||
if err := s.Handshake(); err != nil {
|
||||
return nil
|
||||
}
|
||||
return s.remote.permanentPubKey
|
||||
}
|
||||
|
||||
// Close closes the secure session
|
||||
func (s *secureSession) Close() error {
|
||||
s.cancel()
|
||||
s.handshakeMu.Lock()
|
||||
defer s.handshakeMu.Unlock()
|
||||
if s.secure == nil {
|
||||
return s.insecure.Close() // hadn't secured yet.
|
||||
}
|
||||
return s.secure.Close()
|
||||
}
|
||||
|
||||
// closedRW implements a stub msgio interface that's already
|
||||
// closed and errored.
|
||||
type closedRW struct {
|
||||
err error
|
||||
}
|
||||
|
||||
func (c *closedRW) Read(buf []byte) (int, error) {
|
||||
return 0, c.err
|
||||
}
|
||||
|
||||
func (c *closedRW) Write(buf []byte) (int, error) {
|
||||
return 0, c.err
|
||||
}
|
||||
|
||||
func (c *closedRW) NextMsgLen() (int, error) {
|
||||
return 0, c.err
|
||||
}
|
||||
|
||||
func (c *closedRW) ReadMsg() ([]byte, error) {
|
||||
return nil, c.err
|
||||
}
|
||||
|
||||
func (c *closedRW) WriteMsg(buf []byte) error {
|
||||
return c.err
|
||||
}
|
||||
|
||||
func (c *closedRW) Close() error {
|
||||
return c.err
|
||||
}
|
||||
|
||||
func (c *closedRW) ReleaseMsg(m []byte) {
|
||||
}
|
||||
|
@ -6,6 +6,8 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
msgio "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-msgio"
|
||||
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
|
||||
@ -27,15 +29,23 @@ var ErrClosed = errors.New("connection closed")
|
||||
// ErrEcho is returned when we're attempting to handshake with the same keys and nonces.
|
||||
var ErrEcho = errors.New("same keys and nonces. one side talking to self.")
|
||||
|
||||
// HandshakeTimeout governs how long the handshake will be allowed to take place for.
|
||||
// Making this number large means there could be many bogus connections waiting to
|
||||
// timeout in flight. Typical handshakes take ~3RTTs, so it should be completed within
|
||||
// seconds across a typical planet in the solar system.
|
||||
var HandshakeTimeout = time.Second * 30
|
||||
|
||||
// nonceSize is the size of our nonces (in bytes)
|
||||
const nonceSize = 16
|
||||
|
||||
// secureSession encapsulates all the parameters needed for encrypting
|
||||
// and decrypting traffic from an insecure channel.
|
||||
type secureSession struct {
|
||||
secure msgio.ReadWriteCloser
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
|
||||
insecure io.ReadWriter
|
||||
secure msgio.ReadWriteCloser
|
||||
insecure io.ReadWriteCloser
|
||||
insecureM msgio.ReadWriter
|
||||
|
||||
localKey ci.PrivKey
|
||||
@ -46,6 +56,10 @@ type secureSession struct {
|
||||
remote encParams
|
||||
|
||||
sharedSecret []byte
|
||||
|
||||
handshakeMu sync.Mutex // guards handshakeDone + handshakeErr
|
||||
handshakeDone bool
|
||||
handshakeErr error
|
||||
}
|
||||
|
||||
func (s *secureSession) Loggable() map[string]interface{} {
|
||||
@ -56,8 +70,9 @@ func (s *secureSession) Loggable() map[string]interface{} {
|
||||
return m
|
||||
}
|
||||
|
||||
func newSecureSession(local peer.ID, key ci.PrivKey) (*secureSession, error) {
|
||||
func newSecureSession(ctx context.Context, local peer.ID, key ci.PrivKey, insecure io.ReadWriteCloser) (*secureSession, error) {
|
||||
s := &secureSession{localPeer: local, localKey: key}
|
||||
s.ctx, s.cancel = context.WithCancel(ctx)
|
||||
|
||||
switch {
|
||||
case s.localPeer == "":
|
||||
@ -66,18 +81,37 @@ func newSecureSession(local peer.ID, key ci.PrivKey) (*secureSession, error) {
|
||||
return nil, errors.New("no local private key provided")
|
||||
case !s.localPeer.MatchesPrivateKey(s.localKey):
|
||||
return nil, fmt.Errorf("peer.ID does not match PrivateKey")
|
||||
case insecure == nil:
|
||||
return nil, fmt.Errorf("insecure ReadWriter is nil")
|
||||
}
|
||||
|
||||
s.ctx = ctx
|
||||
s.insecure = insecure
|
||||
s.insecureM = msgio.NewReadWriter(insecure)
|
||||
return s, nil
|
||||
}
|
||||
|
||||
// handsahke performs initial communication over insecure channel to share
|
||||
func (s *secureSession) Handshake() error {
|
||||
s.handshakeMu.Lock()
|
||||
defer s.handshakeMu.Unlock()
|
||||
|
||||
if s.handshakeErr != nil {
|
||||
return s.handshakeErr
|
||||
}
|
||||
|
||||
if !s.handshakeDone {
|
||||
s.handshakeErr = s.runHandshake()
|
||||
s.handshakeDone = true
|
||||
}
|
||||
return s.handshakeErr
|
||||
}
|
||||
|
||||
// runHandshake performs initial communication over insecure channel to share
|
||||
// keys, IDs, and initiate communication, assigning all necessary params.
|
||||
// requires the duplex channel to be a msgio.ReadWriter (for framed messaging)
|
||||
func (s *secureSession) handshake(ctx context.Context, insecure io.ReadWriter) error {
|
||||
|
||||
s.insecure = insecure
|
||||
s.insecureM = msgio.NewReadWriter(insecure)
|
||||
func (s *secureSession) runHandshake() error {
|
||||
ctx, cancel := context.WithTimeout(s.ctx, HandshakeTimeout) // remove
|
||||
defer cancel()
|
||||
|
||||
// =============================================================================
|
||||
// step 1. Propose -- propose cipher suite + send pubkeys + nonce
|
||||
|
@ -75,18 +75,35 @@ func setupConn(t *testing.T, ctx context.Context, secure bool) (a, b Conn, p1, p
|
||||
|
||||
done := make(chan error)
|
||||
go func() {
|
||||
defer close(done)
|
||||
|
||||
var err error
|
||||
c2, err = d2.Dial(ctx, p1.Addr, p1.ID)
|
||||
if err != nil {
|
||||
done <- err
|
||||
return
|
||||
}
|
||||
|
||||
// if secure, need to read + write, as that's what triggers the handshake.
|
||||
if secure {
|
||||
if err := sayHello(c2); err != nil {
|
||||
done <- err
|
||||
}
|
||||
}
|
||||
close(done)
|
||||
}()
|
||||
|
||||
c1, err := l1.Accept()
|
||||
if err != nil {
|
||||
t.Fatal("failed to accept", err)
|
||||
}
|
||||
|
||||
// if secure, need to read + write, as that's what triggers the handshake.
|
||||
if secure {
|
||||
if err := sayHello(c1); err != nil {
|
||||
done <- err
|
||||
}
|
||||
}
|
||||
|
||||
if err := <-done; err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -94,6 +111,20 @@ func setupConn(t *testing.T, ctx context.Context, secure bool) (a, b Conn, p1, p
|
||||
return c1.(Conn), c2, p1, p2
|
||||
}
|
||||
|
||||
func sayHello(c net.Conn) error {
|
||||
h := []byte("hello")
|
||||
if _, err := c.Write(h); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := c.Read(h); err != nil {
|
||||
return err
|
||||
}
|
||||
if string(h) != "hello" {
|
||||
return fmt.Errorf("did not get hello")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func testDialer(t *testing.T, secure bool) {
|
||||
// t.Skip("Skipping in favor of another test")
|
||||
|
||||
@ -203,7 +234,7 @@ func testDialerCloseEarly(t *testing.T, secure bool) {
|
||||
go func() {
|
||||
defer func() { done <- struct{}{} }()
|
||||
|
||||
_, err := l1.Accept()
|
||||
c, err := l1.Accept()
|
||||
if err != nil {
|
||||
if strings.Contains(err.Error(), "closed") {
|
||||
gotclosed <- struct{}{}
|
||||
@ -211,7 +242,13 @@ func testDialerCloseEarly(t *testing.T, secure bool) {
|
||||
}
|
||||
errs <- err
|
||||
}
|
||||
errs <- fmt.Errorf("got conn")
|
||||
|
||||
if _, err := c.Write([]byte("hello")); err != nil {
|
||||
gotclosed <- struct{}{}
|
||||
return
|
||||
}
|
||||
|
||||
errs <- fmt.Errorf("wrote to conn")
|
||||
}()
|
||||
|
||||
c, err := d2.Dial(ctx, p1.Addr, p1.ID)
|
||||
|
@ -5,7 +5,6 @@ import (
|
||||
"net"
|
||||
"time"
|
||||
|
||||
msgio "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-msgio"
|
||||
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
|
||||
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
|
||||
|
||||
@ -16,15 +15,8 @@ import (
|
||||
|
||||
// secureConn wraps another Conn object with an encrypted channel.
|
||||
type secureConn struct {
|
||||
|
||||
// the wrapped conn
|
||||
insecure Conn
|
||||
|
||||
// secure io (wrapping insecure)
|
||||
secure msgio.ReadWriteCloser
|
||||
|
||||
// secure Session
|
||||
session secio.Session
|
||||
insecure Conn // the wrapped conn
|
||||
secure secio.Session // secure Session
|
||||
}
|
||||
|
||||
// newConn constructs a new connection
|
||||
@ -37,23 +29,20 @@ func newSecureConn(ctx context.Context, sk ic.PrivKey, insecure Conn) (Conn, err
|
||||
return nil, errors.New("insecure.LocalPeer() is nil")
|
||||
}
|
||||
if sk == nil {
|
||||
panic("way")
|
||||
return nil, errors.New("private key is nil")
|
||||
}
|
||||
|
||||
// NewSession performs the secure handshake, which takes multiple RTT
|
||||
sessgen := secio.SessionGenerator{LocalID: insecure.LocalPeer(), PrivateKey: sk}
|
||||
session, err := sessgen.NewSession(ctx, insecure)
|
||||
secure, err := sessgen.NewSession(ctx, insecure)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
conn := &secureConn{
|
||||
insecure: insecure,
|
||||
session: session,
|
||||
secure: session.ReadWriter(),
|
||||
secure: secure,
|
||||
}
|
||||
log.Debugf("newSecureConn: %v to %v handshake success!", conn.LocalPeer(), conn.RemotePeer())
|
||||
return conn, nil
|
||||
}
|
||||
|
||||
@ -102,49 +91,49 @@ func (c *secureConn) RemoteMultiaddr() ma.Multiaddr {
|
||||
|
||||
// LocalPeer is the Peer on this side
|
||||
func (c *secureConn) LocalPeer() peer.ID {
|
||||
return c.session.LocalPeer()
|
||||
return c.secure.LocalPeer()
|
||||
}
|
||||
|
||||
// RemotePeer is the Peer on the remote side
|
||||
func (c *secureConn) RemotePeer() peer.ID {
|
||||
return c.session.RemotePeer()
|
||||
return c.secure.RemotePeer()
|
||||
}
|
||||
|
||||
// LocalPrivateKey is the public key of the peer on this side
|
||||
func (c *secureConn) LocalPrivateKey() ic.PrivKey {
|
||||
return c.session.LocalPrivateKey()
|
||||
return c.secure.LocalPrivateKey()
|
||||
}
|
||||
|
||||
// RemotePubKey is the public key of the peer on the remote side
|
||||
func (c *secureConn) RemotePublicKey() ic.PubKey {
|
||||
return c.session.RemotePublicKey()
|
||||
return c.secure.RemotePublicKey()
|
||||
}
|
||||
|
||||
// Read reads data, net.Conn style
|
||||
func (c *secureConn) Read(buf []byte) (int, error) {
|
||||
return c.secure.Read(buf)
|
||||
return c.secure.ReadWriter().Read(buf)
|
||||
}
|
||||
|
||||
// Write writes data, net.Conn style
|
||||
func (c *secureConn) Write(buf []byte) (int, error) {
|
||||
return c.secure.Write(buf)
|
||||
return c.secure.ReadWriter().Write(buf)
|
||||
}
|
||||
|
||||
func (c *secureConn) NextMsgLen() (int, error) {
|
||||
return c.secure.NextMsgLen()
|
||||
return c.secure.ReadWriter().NextMsgLen()
|
||||
}
|
||||
|
||||
// ReadMsg reads data, net.Conn style
|
||||
func (c *secureConn) ReadMsg() ([]byte, error) {
|
||||
return c.secure.ReadMsg()
|
||||
return c.secure.ReadWriter().ReadMsg()
|
||||
}
|
||||
|
||||
// WriteMsg writes data, net.Conn style
|
||||
func (c *secureConn) WriteMsg(buf []byte) error {
|
||||
return c.secure.WriteMsg(buf)
|
||||
return c.secure.ReadWriter().WriteMsg(buf)
|
||||
}
|
||||
|
||||
// ReleaseMsg releases a buffer
|
||||
func (c *secureConn) ReleaseMsg(m []byte) {
|
||||
c.secure.ReleaseMsg(m)
|
||||
c.secure.ReadWriter().ReleaseMsg(m)
|
||||
}
|
||||
|
@ -23,6 +23,15 @@ func upgradeToSecureConn(t *testing.T, ctx context.Context, sk ic.PrivKey, c Con
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// need to read + write, as that's what triggers the handshake.
|
||||
h := []byte("hello")
|
||||
if _, err := s.Write(h); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if _, err := s.Read(h); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return s, nil
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user