From 8358c8d0410c9da896ade974c9d08212b1f2b39a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Sat, 10 Mar 2018 19:02:57 +0100 Subject: [PATCH] coreapi: implement swarm api MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Ɓukasz Magiera --- core/coreapi/coreapi.go | 5 + core/coreapi/interface/coreapi.go | 3 + core/coreapi/interface/swarm.go | 6 +- core/coreapi/swarm.go | 147 ++++++++++++++++++++++++++++++ 4 files changed, 158 insertions(+), 3 deletions(-) create mode 100644 core/coreapi/swarm.go diff --git a/core/coreapi/coreapi.go b/core/coreapi/coreapi.go index bb7afd61a..8a0410a2d 100644 --- a/core/coreapi/coreapi.go +++ b/core/coreapi/coreapi.go @@ -67,3 +67,8 @@ func (api *CoreAPI) Pin() coreiface.PinAPI { func (api *CoreAPI) Dht() coreiface.DhtAPI { return (*DhtAPI)(api) } + +// Swarm returns the SwarmAPI interface implementation backed by the go-ipfs node +func (api *CoreAPI) Swarm() coreiface.SwarmAPI { + return &SwarmAPI{api} +} diff --git a/core/coreapi/interface/coreapi.go b/core/coreapi/interface/coreapi.go index 0053d472e..0b153b6f9 100644 --- a/core/coreapi/interface/coreapi.go +++ b/core/coreapi/interface/coreapi.go @@ -34,6 +34,9 @@ type CoreAPI interface { // Dht returns an implementation of Dht API Dht() DhtAPI + // Swarm returns an implementation of Swarm API + Swarm() SwarmAPI + // ResolvePath resolves the path using Unixfs resolver ResolvePath(context.Context, Path) (ResolvedPath, error) diff --git a/core/coreapi/interface/swarm.go b/core/coreapi/interface/swarm.go index 1ec260e07..1f0b1216f 100644 --- a/core/coreapi/interface/swarm.go +++ b/core/coreapi/interface/swarm.go @@ -1,9 +1,9 @@ package iface import ( + "context" "time" - "context" ma "gx/ipfs/QmWWQ2Txc2c6tqjsBpzg5Ar652cHPGNsQQp2SejkNmkUMb/go-multiaddr" peer "gx/ipfs/QmZoWKhxUmZ2seW4BzX6fJkNR8hh9PsGModr7q171yq2SS/go-libp2p-peer" ) @@ -17,11 +17,11 @@ type PeerInfo interface { Address() ma.Multiaddr // Latency returns last known round trip time to the peer - Latency() time.Duration + Latency(context.Context) (time.Duration, error) // Streams returns list of streams established with the peer // TODO: should this return multicodecs? - Streams() []string + Streams(context.Context) ([]string, error) } // SwarmAPI specifies the interface to libp2p swarm diff --git a/core/coreapi/swarm.go b/core/coreapi/swarm.go new file mode 100644 index 000000000..4af0ce61a --- /dev/null +++ b/core/coreapi/swarm.go @@ -0,0 +1,147 @@ +package coreapi + +import ( + "context" + "errors" + "fmt" + "time" + + coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface" + + iaddr "gx/ipfs/QmQViVWBHbU6HmYjXcdNq7tVASCNgdg64ZGcauuDkLCivW/go-ipfs-addr" + swarm "gx/ipfs/QmSwZMWwFZSUpe5muU2xgTUwppH24KfMwdPXiwbEp2c6G5/go-libp2p-swarm" + ma "gx/ipfs/QmWWQ2Txc2c6tqjsBpzg5Ar652cHPGNsQQp2SejkNmkUMb/go-multiaddr" + pstore "gx/ipfs/QmXauCuJzmzapetmC6W4TuDJLL1yFFrVzSHoWv8YdbmnxH/go-libp2p-peerstore" + net "gx/ipfs/QmXfkENeeBvh3zYA51MaSdGUdBjhQ99cP5WQe8zgr6wchG/go-libp2p-net" + peer "gx/ipfs/QmZoWKhxUmZ2seW4BzX6fJkNR8hh9PsGModr7q171yq2SS/go-libp2p-peer" +) + +type SwarmAPI struct { + *CoreAPI +} + +type connInfo struct { + api *CoreAPI + conn net.Conn + + addr ma.Multiaddr + peer peer.ID + muxer string +} + +func (api *SwarmAPI) Connect(ctx context.Context, addr ma.Multiaddr) error { + if api.node.PeerHost == nil { + return coreiface.ErrOffline + } + + snet, ok := api.node.PeerHost.Network().(*swarm.Network) + if !ok { + return fmt.Errorf("peerhost network was not swarm") + } + + swrm := snet.Swarm() + + ia, err := iaddr.ParseMultiaddr(ma.Multiaddr(addr)) + if err != nil { + return err + } + + pi := pstore.PeerInfo{ + ID: ia.ID(), + Addrs: []ma.Multiaddr{ia.Transport()}, + } + + swrm.Backoff().Clear(pi.ID) + + return api.node.PeerHost.Connect(ctx, pi) +} + +func (api *SwarmAPI) Disconnect(ctx context.Context, addr ma.Multiaddr) error { + if api.node.PeerHost == nil { + return coreiface.ErrOffline + } + + ia, err := iaddr.ParseMultiaddr(ma.Multiaddr(addr)) + if err != nil { + return err + } + + taddr := ia.Transport() + + found := false + conns := api.node.PeerHost.Network().ConnsToPeer(ia.ID()) + for _, conn := range conns { + if !conn.RemoteMultiaddr().Equal(taddr) { + continue + } + + if err := conn.Close(); err != nil { + return err + } + found = true + break + } + + if !found { + return errors.New("conn not found") + } + + return nil +} + +func (api *SwarmAPI) Peers(context.Context) ([]coreiface.PeerInfo, error) { + if api.node.PeerHost == nil { + return nil, coreiface.ErrOffline + } + + conns := api.node.PeerHost.Network().Conns() + + var out []coreiface.PeerInfo + for _, c := range conns { + pid := c.RemotePeer() + addr := c.RemoteMultiaddr() + + ci := &connInfo{ + api: api.CoreAPI, + conn: c, + + addr: addr, + peer: pid, + } + + swcon, ok := c.(*swarm.Conn) + if ok { + ci.muxer = fmt.Sprintf("%T", swcon.StreamConn().Conn()) + } + + out = append(out, ci) + } + + return out, nil +} + +func (ci *connInfo) ID() peer.ID { + return ci.ID() +} + +func (ci *connInfo) Address() ma.Multiaddr { + return ci.addr +} + +func (ci *connInfo) Latency(context.Context) (time.Duration, error) { + return ci.api.node.Peerstore.LatencyEWMA(peer.ID(ci.ID())), nil +} + +func (ci *connInfo) Streams(context.Context) ([]string, error) { + streams, err := ci.conn.GetStreams() + if err != nil { + return nil, err + } + + out := make([]string, len(streams)) + for i, s := range streams { + out[i] = string(s.Protocol()) + } + + return out, nil +}