mirror of
https://github.com/ipfs/kubo.git
synced 2025-06-29 01:12:24 +08:00
Service + request
This commit is contained in:

committed by
Brian Tiger Chow

parent
ffad3bb6af
commit
06b651c454
8
net/service/Makefile
Normal file
8
net/service/Makefile
Normal file
@ -0,0 +1,8 @@
|
|||||||
|
|
||||||
|
all: request.pb.go
|
||||||
|
|
||||||
|
request.pb.go: request.proto
|
||||||
|
protoc --gogo_out=. --proto_path=../../../../../:/usr/local/opt/protobuf/include:. $<
|
||||||
|
|
||||||
|
clean:
|
||||||
|
rm request.pb.go
|
127
net/service/request.go
Normal file
127
net/service/request.go
Normal file
@ -0,0 +1,127 @@
|
|||||||
|
package service
|
||||||
|
|
||||||
|
import (
|
||||||
|
crand "crypto/rand"
|
||||||
|
|
||||||
|
msg "github.com/jbenet/go-ipfs/net/message"
|
||||||
|
peer "github.com/jbenet/go-ipfs/peer"
|
||||||
|
|
||||||
|
proto "code.google.com/p/goprotobuf/proto"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// IDSize is the size of the ID in bytes.
|
||||||
|
IDSize int = 4
|
||||||
|
)
|
||||||
|
|
||||||
|
// RequestID is a field that identifies request-response flows.
|
||||||
|
type RequestID []byte
|
||||||
|
|
||||||
|
// Request turns a RequestID into a Request (unsetting first bit)
|
||||||
|
func (r RequestID) Request() RequestID {
|
||||||
|
if r == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
r2 := make([]byte, len(r))
|
||||||
|
copy(r2, r)
|
||||||
|
r2[0] = r[0] & 0x7F // unset first bit for request
|
||||||
|
return RequestID(r2)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Response turns a RequestID into a Response (setting first bit)
|
||||||
|
func (r RequestID) Response() RequestID {
|
||||||
|
if r == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
r2 := make([]byte, len(r))
|
||||||
|
copy(r2, r)
|
||||||
|
r2[0] = r[0] | 0x80 // set first bit for response
|
||||||
|
return RequestID(r2)
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsRequest returns whether a RequestID identifies a request
|
||||||
|
func (r RequestID) IsRequest() bool {
|
||||||
|
if r == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return !r.IsResponse()
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsResponse returns whether a RequestID identifies a response
|
||||||
|
func (r RequestID) IsResponse() bool {
|
||||||
|
if r == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return bool(r[0]&0x80 == 0x80)
|
||||||
|
}
|
||||||
|
|
||||||
|
// RandomRequestID creates and returns a new random request ID
|
||||||
|
func RandomRequestID() (RequestID, error) {
|
||||||
|
buf := make([]byte, IDSize)
|
||||||
|
_, err := crand.Read(buf)
|
||||||
|
return RequestID(buf).Request(), err
|
||||||
|
}
|
||||||
|
|
||||||
|
// RequestMap is a map of Requests. the key = (peer.ID concat RequestID).
|
||||||
|
type RequestMap map[string]*Request
|
||||||
|
|
||||||
|
// Request objects are used to multiplex request-response flows.
|
||||||
|
type Request struct {
|
||||||
|
|
||||||
|
// ID is the RequestID identifying this Request-Response Flow.
|
||||||
|
ID RequestID
|
||||||
|
|
||||||
|
// PeerID identifies the peer from whom to expect the response.
|
||||||
|
PeerID peer.ID
|
||||||
|
|
||||||
|
// Response is the channel of incoming responses.
|
||||||
|
Response chan *msg.Message
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewRequest creates a request for given peer.ID
|
||||||
|
func NewRequest(pid peer.ID) (*Request, error) {
|
||||||
|
id, err := RandomRequestID()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &Request{
|
||||||
|
ID: id,
|
||||||
|
PeerID: pid,
|
||||||
|
Response: make(chan *msg.Message, 1),
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Key returns the RequestKey for this request. Use with maps.
|
||||||
|
func (r *Request) Key() string {
|
||||||
|
return RequestKey(r.PeerID, r.ID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// RequestKey is the peer.ID concatenated with the RequestID. Use with maps.
|
||||||
|
func RequestKey(pid peer.ID, rid RequestID) string {
|
||||||
|
return string(pid) + string(rid.Request()[:])
|
||||||
|
}
|
||||||
|
|
||||||
|
func wrapData(data []byte, rid RequestID) ([]byte, error) {
|
||||||
|
// Marshal
|
||||||
|
pbm := new(PBRequest)
|
||||||
|
pbm.Data = data
|
||||||
|
pbm.Tag = rid
|
||||||
|
b, err := proto.Marshal(pbm)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return b, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func unwrapData(data []byte) ([]byte, RequestID, error) {
|
||||||
|
// Unmarshal
|
||||||
|
pbm := new(PBRequest)
|
||||||
|
err := proto.Unmarshal(data, pbm)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return pbm.GetData(), pbm.GetTag(), nil
|
||||||
|
}
|
50
net/service/request.pb.go
Normal file
50
net/service/request.pb.go
Normal file
@ -0,0 +1,50 @@
|
|||||||
|
// Code generated by protoc-gen-gogo.
|
||||||
|
// source: request.proto
|
||||||
|
// DO NOT EDIT!
|
||||||
|
|
||||||
|
/*
|
||||||
|
Package service is a generated protocol buffer package.
|
||||||
|
|
||||||
|
It is generated from these files:
|
||||||
|
request.proto
|
||||||
|
|
||||||
|
It has these top-level messages:
|
||||||
|
PBRequest
|
||||||
|
*/
|
||||||
|
package service
|
||||||
|
|
||||||
|
import proto "code.google.com/p/gogoprotobuf/proto"
|
||||||
|
import json "encoding/json"
|
||||||
|
import math "math"
|
||||||
|
|
||||||
|
// Reference proto, json, and math imports to suppress error if they are not otherwise used.
|
||||||
|
var _ = proto.Marshal
|
||||||
|
var _ = &json.SyntaxError{}
|
||||||
|
var _ = math.Inf
|
||||||
|
|
||||||
|
type PBRequest struct {
|
||||||
|
Data []byte `protobuf:"bytes,1,req" json:"Data,omitempty"`
|
||||||
|
Tag []byte `protobuf:"bytes,3,opt" json:"Tag,omitempty"`
|
||||||
|
XXX_unrecognized []byte `json:"-"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *PBRequest) Reset() { *m = PBRequest{} }
|
||||||
|
func (m *PBRequest) String() string { return proto.CompactTextString(m) }
|
||||||
|
func (*PBRequest) ProtoMessage() {}
|
||||||
|
|
||||||
|
func (m *PBRequest) GetData() []byte {
|
||||||
|
if m != nil {
|
||||||
|
return m.Data
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *PBRequest) GetTag() []byte {
|
||||||
|
if m != nil {
|
||||||
|
return m.Tag
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
}
|
6
net/service/request.proto
Normal file
6
net/service/request.proto
Normal file
@ -0,0 +1,6 @@
|
|||||||
|
package service;
|
||||||
|
|
||||||
|
message PBRequest {
|
||||||
|
required bytes Data = 1;
|
||||||
|
optional bytes Tag = 3;
|
||||||
|
}
|
41
net/service/request_test.go
Normal file
41
net/service/request_test.go
Normal file
@ -0,0 +1,41 @@
|
|||||||
|
package service
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestMarshaling(t *testing.T) {
|
||||||
|
|
||||||
|
test := func(d1 []byte, rid1 RequestID) {
|
||||||
|
d2, err := wrapData(d1, rid1)
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
d3, rid2, err := unwrapData(d2)
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
d4, err := wrapData(d3, rid1)
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !bytes.Equal(rid2, rid1) {
|
||||||
|
t.Error("RequestID fail")
|
||||||
|
}
|
||||||
|
|
||||||
|
if !bytes.Equal(d1, d3) {
|
||||||
|
t.Error("unmarshalled data should be the same")
|
||||||
|
}
|
||||||
|
|
||||||
|
if !bytes.Equal(d2, d4) {
|
||||||
|
t.Error("marshalled data should be the same")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
test([]byte("foo"), []byte{1, 2, 3, 4})
|
||||||
|
test([]byte("bar"), nil)
|
||||||
|
}
|
194
net/service/service.go
Normal file
194
net/service/service.go
Normal file
@ -0,0 +1,194 @@
|
|||||||
|
package service
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
msg "github.com/jbenet/go-ipfs/net/message"
|
||||||
|
u "github.com/jbenet/go-ipfs/util"
|
||||||
|
|
||||||
|
context "code.google.com/p/go.net/context"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Handler is an interface that objects must implement in order to handle
|
||||||
|
// a service's requests.
|
||||||
|
type Handler interface {
|
||||||
|
|
||||||
|
// HandleMessage receives an incoming message, and potentially returns
|
||||||
|
// a response message to send back.
|
||||||
|
HandleMessage(context.Context, *msg.Message) (*msg.Message, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Service is a networking component that protocols can use to multiplex
|
||||||
|
// messages over the same channel, and to issue + handle requests.
|
||||||
|
type Service struct {
|
||||||
|
// Handler is the object registered to handle incoming requests.
|
||||||
|
Handler Handler
|
||||||
|
|
||||||
|
// Requests are all the pending requests on this service.
|
||||||
|
Requests RequestMap
|
||||||
|
RequestsLock sync.RWMutex
|
||||||
|
|
||||||
|
// cancel is the function to stop the Service
|
||||||
|
cancel context.CancelFunc
|
||||||
|
|
||||||
|
// Message Pipe (connected to the outside world)
|
||||||
|
*msg.Pipe
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewService creates a service object with given type ID and Handler
|
||||||
|
func NewService(ctx context.Context, h Handler) *Service {
|
||||||
|
s := &Service{
|
||||||
|
Handler: h,
|
||||||
|
Requests: RequestMap{},
|
||||||
|
Pipe: msg.NewPipe(10),
|
||||||
|
}
|
||||||
|
|
||||||
|
go s.handleIncomingMessages(ctx)
|
||||||
|
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start kicks off the Service goroutines.
|
||||||
|
func (s *Service) Start(ctx context.Context) error {
|
||||||
|
if s.cancel != nil {
|
||||||
|
return errors.New("Service already started.")
|
||||||
|
}
|
||||||
|
|
||||||
|
// make a cancellable context.
|
||||||
|
ctx, s.cancel = context.WithCancel(ctx)
|
||||||
|
|
||||||
|
go s.handleIncomingMessages(ctx)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stop stops Service activity.
|
||||||
|
func (s *Service) Stop() {
|
||||||
|
s.cancel()
|
||||||
|
s.cancel = context.CancelFunc(nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
// SendMessage sends a message out
|
||||||
|
func (s *Service) SendMessage(ctx context.Context, m *msg.Message, rid RequestID) error {
|
||||||
|
|
||||||
|
// serialize ServiceMessage wrapper
|
||||||
|
data, err := wrapData(m.Data, rid)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// send message
|
||||||
|
m2 := &msg.Message{Peer: m.Peer, Data: data}
|
||||||
|
select {
|
||||||
|
case s.Outgoing <- m2:
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// SendRequest sends a request message out and awaits a response.
|
||||||
|
func (s *Service) SendRequest(ctx context.Context, m *msg.Message) (*msg.Message, error) {
|
||||||
|
|
||||||
|
// create a request
|
||||||
|
r, err := NewRequest(m.Peer.ID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// register Request
|
||||||
|
s.RequestsLock.Lock()
|
||||||
|
s.Requests[r.Key()] = r
|
||||||
|
s.RequestsLock.Unlock()
|
||||||
|
|
||||||
|
// defer deleting this request
|
||||||
|
defer func() {
|
||||||
|
s.RequestsLock.Lock()
|
||||||
|
delete(s.Requests, r.Key())
|
||||||
|
s.RequestsLock.Unlock()
|
||||||
|
}()
|
||||||
|
|
||||||
|
// check if we should bail after waiting for mutex
|
||||||
|
select {
|
||||||
|
default:
|
||||||
|
case <-ctx.Done():
|
||||||
|
return nil, ctx.Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send message
|
||||||
|
s.SendMessage(ctx, m, r.ID)
|
||||||
|
|
||||||
|
// wait for response
|
||||||
|
m = nil
|
||||||
|
err = nil
|
||||||
|
select {
|
||||||
|
case m = <-r.Response:
|
||||||
|
case <-ctx.Done():
|
||||||
|
err = ctx.Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
return m, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// handleIncoming consumes the messages on the s.Incoming channel and
|
||||||
|
// routes them appropriately (to requests, or handler).
|
||||||
|
func (s *Service) handleIncomingMessages(ctx context.Context) {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case m := <-s.Incoming:
|
||||||
|
go s.handleIncomingMessage(ctx, m)
|
||||||
|
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) handleIncomingMessage(ctx context.Context, m *msg.Message) {
|
||||||
|
|
||||||
|
// unwrap the incoming message
|
||||||
|
data, rid, err := unwrapData(m.Data)
|
||||||
|
if err != nil {
|
||||||
|
u.PErr("de-serializing error: %v\n", err)
|
||||||
|
}
|
||||||
|
m2 := &msg.Message{Peer: m.Peer, Data: data}
|
||||||
|
|
||||||
|
// if it's a request (or has no RequestID), handle it
|
||||||
|
if rid == nil || rid.IsRequest() {
|
||||||
|
r1, err := s.Handler.HandleMessage(ctx, m2)
|
||||||
|
if err != nil {
|
||||||
|
u.PErr("handled message yielded error %v\n", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// if handler gave us a response, send it back out!
|
||||||
|
if r1 != nil {
|
||||||
|
err := s.SendMessage(ctx, r1, rid.Response())
|
||||||
|
if err != nil {
|
||||||
|
u.PErr("error sending response message: %v\n", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Otherwise, it is a response. handle it.
|
||||||
|
if !rid.IsResponse() {
|
||||||
|
u.PErr("RequestID should identify a response here.\n")
|
||||||
|
}
|
||||||
|
|
||||||
|
key := RequestKey(m.Peer.ID, RequestID(rid))
|
||||||
|
s.RequestsLock.RLock()
|
||||||
|
r, found := s.Requests[key]
|
||||||
|
s.RequestsLock.RUnlock()
|
||||||
|
|
||||||
|
if !found {
|
||||||
|
u.PErr("no request key %v (timeout?)\n", []byte(key))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case r.Response <- m2:
|
||||||
|
case <-ctx.Done():
|
||||||
|
}
|
||||||
|
}
|
125
net/service/service_test.go
Normal file
125
net/service/service_test.go
Normal file
@ -0,0 +1,125 @@
|
|||||||
|
package service
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
msg "github.com/jbenet/go-ipfs/net/message"
|
||||||
|
peer "github.com/jbenet/go-ipfs/peer"
|
||||||
|
|
||||||
|
context "code.google.com/p/go.net/context"
|
||||||
|
mh "github.com/jbenet/go-multihash"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ReverseHandler reverses all Data it receives and sends it back.
|
||||||
|
type ReverseHandler struct{}
|
||||||
|
|
||||||
|
func (t *ReverseHandler) HandleMessage(ctx context.Context, m *msg.Message) (
|
||||||
|
*msg.Message, error) {
|
||||||
|
|
||||||
|
d := m.Data
|
||||||
|
for i, j := 0, len(d)-1; i < j; i, j = i+1, j-1 {
|
||||||
|
d[i], d[j] = d[j], d[i]
|
||||||
|
}
|
||||||
|
|
||||||
|
return &msg.Message{Peer: m.Peer, Data: d}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func newPeer(t *testing.T, id string) *peer.Peer {
|
||||||
|
mh, err := mh.FromHexString(id)
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return &peer.Peer{ID: peer.ID(mh)}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestServiceHandler(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
h := &ReverseHandler{}
|
||||||
|
s := NewService(ctx, h)
|
||||||
|
peer1 := newPeer(t, "11140beec7b5ea3f0fdbc95d0dd47f3c5bc275aaaaaa")
|
||||||
|
|
||||||
|
d, err := wrapData([]byte("beep"), nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
m1 := &msg.Message{Peer: peer1, Data: d}
|
||||||
|
s.Incoming <- m1
|
||||||
|
m2 := <-s.Outgoing
|
||||||
|
|
||||||
|
d, rid, err := unwrapData(m2.Data)
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if rid != nil {
|
||||||
|
t.Error("RequestID should be nil")
|
||||||
|
}
|
||||||
|
|
||||||
|
if !bytes.Equal(d, []byte("peeb")) {
|
||||||
|
t.Errorf("service handler data incorrect: %v != %v", d, "oof")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestServiceRequest(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
s1 := NewService(ctx, &ReverseHandler{})
|
||||||
|
s2 := NewService(ctx, &ReverseHandler{})
|
||||||
|
peer1 := newPeer(t, "11140beec7b5ea3f0fdbc95d0dd47f3c5bc275aaaaaa")
|
||||||
|
|
||||||
|
// patch services together
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case m := <-s1.Outgoing:
|
||||||
|
s2.Incoming <- m
|
||||||
|
case m := <-s2.Outgoing:
|
||||||
|
s1.Incoming <- m
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
m1 := &msg.Message{Peer: peer1, Data: []byte("beep")}
|
||||||
|
m2, err := s1.SendRequest(ctx, m1)
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !bytes.Equal(m2.Data, []byte("peeb")) {
|
||||||
|
t.Errorf("service handler data incorrect: %v != %v", m2.Data, "oof")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestServiceRequestTimeout(t *testing.T) {
|
||||||
|
ctx, _ := context.WithTimeout(context.Background(), time.Millisecond)
|
||||||
|
s1 := NewService(ctx, &ReverseHandler{})
|
||||||
|
s2 := NewService(ctx, &ReverseHandler{})
|
||||||
|
peer1 := newPeer(t, "11140beec7b5ea3f0fdbc95d0dd47f3c5bc275aaaaaa")
|
||||||
|
|
||||||
|
// patch services together
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
<-time.After(time.Millisecond)
|
||||||
|
select {
|
||||||
|
case m := <-s1.Outgoing:
|
||||||
|
s2.Incoming <- m
|
||||||
|
case m := <-s2.Outgoing:
|
||||||
|
s1.Incoming <- m
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
m1 := &msg.Message{Peer: peer1, Data: []byte("beep")}
|
||||||
|
m2, err := s1.SendRequest(ctx, m1)
|
||||||
|
if err == nil || m2 != nil {
|
||||||
|
t.Error("should've timed out")
|
||||||
|
}
|
||||||
|
}
|
Reference in New Issue
Block a user