From 06b651c4545ae67c10ca0b26c30fd329e6b60e2d Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Thu, 11 Sep 2014 03:47:37 -0700 Subject: [PATCH] Service + request --- net/service/Makefile | 8 ++ net/service/request.go | 127 +++++++++++++++++++++++ net/service/request.pb.go | 50 ++++++++++ net/service/request.proto | 6 ++ net/service/request_test.go | 41 ++++++++ net/service/service.go | 194 ++++++++++++++++++++++++++++++++++++ net/service/service_test.go | 125 +++++++++++++++++++++++ 7 files changed, 551 insertions(+) create mode 100644 net/service/Makefile create mode 100644 net/service/request.go create mode 100644 net/service/request.pb.go create mode 100644 net/service/request.proto create mode 100644 net/service/request_test.go create mode 100644 net/service/service.go create mode 100644 net/service/service_test.go diff --git a/net/service/Makefile b/net/service/Makefile new file mode 100644 index 000000000..990c6ade7 --- /dev/null +++ b/net/service/Makefile @@ -0,0 +1,8 @@ + +all: request.pb.go + +request.pb.go: request.proto + protoc --gogo_out=. --proto_path=../../../../../:/usr/local/opt/protobuf/include:. $< + +clean: + rm request.pb.go diff --git a/net/service/request.go b/net/service/request.go new file mode 100644 index 000000000..b29be79a3 --- /dev/null +++ b/net/service/request.go @@ -0,0 +1,127 @@ +package service + +import ( + crand "crypto/rand" + + msg "github.com/jbenet/go-ipfs/net/message" + peer "github.com/jbenet/go-ipfs/peer" + + proto "code.google.com/p/goprotobuf/proto" +) + +const ( + // IDSize is the size of the ID in bytes. + IDSize int = 4 +) + +// RequestID is a field that identifies request-response flows. +type RequestID []byte + +// Request turns a RequestID into a Request (unsetting first bit) +func (r RequestID) Request() RequestID { + if r == nil { + return nil + } + r2 := make([]byte, len(r)) + copy(r2, r) + r2[0] = r[0] & 0x7F // unset first bit for request + return RequestID(r2) +} + +// Response turns a RequestID into a Response (setting first bit) +func (r RequestID) Response() RequestID { + if r == nil { + return nil + } + r2 := make([]byte, len(r)) + copy(r2, r) + r2[0] = r[0] | 0x80 // set first bit for response + return RequestID(r2) +} + +// IsRequest returns whether a RequestID identifies a request +func (r RequestID) IsRequest() bool { + if r == nil { + return false + } + return !r.IsResponse() +} + +// IsResponse returns whether a RequestID identifies a response +func (r RequestID) IsResponse() bool { + if r == nil { + return false + } + return bool(r[0]&0x80 == 0x80) +} + +// RandomRequestID creates and returns a new random request ID +func RandomRequestID() (RequestID, error) { + buf := make([]byte, IDSize) + _, err := crand.Read(buf) + return RequestID(buf).Request(), err +} + +// RequestMap is a map of Requests. the key = (peer.ID concat RequestID). +type RequestMap map[string]*Request + +// Request objects are used to multiplex request-response flows. +type Request struct { + + // ID is the RequestID identifying this Request-Response Flow. + ID RequestID + + // PeerID identifies the peer from whom to expect the response. + PeerID peer.ID + + // Response is the channel of incoming responses. + Response chan *msg.Message +} + +// NewRequest creates a request for given peer.ID +func NewRequest(pid peer.ID) (*Request, error) { + id, err := RandomRequestID() + if err != nil { + return nil, err + } + + return &Request{ + ID: id, + PeerID: pid, + Response: make(chan *msg.Message, 1), + }, nil +} + +// Key returns the RequestKey for this request. Use with maps. +func (r *Request) Key() string { + return RequestKey(r.PeerID, r.ID) +} + +// RequestKey is the peer.ID concatenated with the RequestID. Use with maps. +func RequestKey(pid peer.ID, rid RequestID) string { + return string(pid) + string(rid.Request()[:]) +} + +func wrapData(data []byte, rid RequestID) ([]byte, error) { + // Marshal + pbm := new(PBRequest) + pbm.Data = data + pbm.Tag = rid + b, err := proto.Marshal(pbm) + if err != nil { + return nil, err + } + + return b, nil +} + +func unwrapData(data []byte) ([]byte, RequestID, error) { + // Unmarshal + pbm := new(PBRequest) + err := proto.Unmarshal(data, pbm) + if err != nil { + return nil, nil, err + } + + return pbm.GetData(), pbm.GetTag(), nil +} diff --git a/net/service/request.pb.go b/net/service/request.pb.go new file mode 100644 index 000000000..c4cec4d1d --- /dev/null +++ b/net/service/request.pb.go @@ -0,0 +1,50 @@ +// Code generated by protoc-gen-gogo. +// source: request.proto +// DO NOT EDIT! + +/* +Package service is a generated protocol buffer package. + +It is generated from these files: + request.proto + +It has these top-level messages: + PBRequest +*/ +package service + +import proto "code.google.com/p/gogoprotobuf/proto" +import json "encoding/json" +import math "math" + +// Reference proto, json, and math imports to suppress error if they are not otherwise used. +var _ = proto.Marshal +var _ = &json.SyntaxError{} +var _ = math.Inf + +type PBRequest struct { + Data []byte `protobuf:"bytes,1,req" json:"Data,omitempty"` + Tag []byte `protobuf:"bytes,3,opt" json:"Tag,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *PBRequest) Reset() { *m = PBRequest{} } +func (m *PBRequest) String() string { return proto.CompactTextString(m) } +func (*PBRequest) ProtoMessage() {} + +func (m *PBRequest) GetData() []byte { + if m != nil { + return m.Data + } + return nil +} + +func (m *PBRequest) GetTag() []byte { + if m != nil { + return m.Tag + } + return nil +} + +func init() { +} diff --git a/net/service/request.proto b/net/service/request.proto new file mode 100644 index 000000000..695308f50 --- /dev/null +++ b/net/service/request.proto @@ -0,0 +1,6 @@ +package service; + +message PBRequest { + required bytes Data = 1; + optional bytes Tag = 3; +} diff --git a/net/service/request_test.go b/net/service/request_test.go new file mode 100644 index 000000000..1931f8f63 --- /dev/null +++ b/net/service/request_test.go @@ -0,0 +1,41 @@ +package service + +import ( + "bytes" + "testing" +) + +func TestMarshaling(t *testing.T) { + + test := func(d1 []byte, rid1 RequestID) { + d2, err := wrapData(d1, rid1) + if err != nil { + t.Error(err) + } + + d3, rid2, err := unwrapData(d2) + if err != nil { + t.Error(err) + } + + d4, err := wrapData(d3, rid1) + if err != nil { + t.Error(err) + } + + if !bytes.Equal(rid2, rid1) { + t.Error("RequestID fail") + } + + if !bytes.Equal(d1, d3) { + t.Error("unmarshalled data should be the same") + } + + if !bytes.Equal(d2, d4) { + t.Error("marshalled data should be the same") + } + } + + test([]byte("foo"), []byte{1, 2, 3, 4}) + test([]byte("bar"), nil) +} diff --git a/net/service/service.go b/net/service/service.go new file mode 100644 index 000000000..4e67ca9dc --- /dev/null +++ b/net/service/service.go @@ -0,0 +1,194 @@ +package service + +import ( + "errors" + "sync" + + msg "github.com/jbenet/go-ipfs/net/message" + u "github.com/jbenet/go-ipfs/util" + + context "code.google.com/p/go.net/context" +) + +// Handler is an interface that objects must implement in order to handle +// a service's requests. +type Handler interface { + + // HandleMessage receives an incoming message, and potentially returns + // a response message to send back. + HandleMessage(context.Context, *msg.Message) (*msg.Message, error) +} + +// Service is a networking component that protocols can use to multiplex +// messages over the same channel, and to issue + handle requests. +type Service struct { + // Handler is the object registered to handle incoming requests. + Handler Handler + + // Requests are all the pending requests on this service. + Requests RequestMap + RequestsLock sync.RWMutex + + // cancel is the function to stop the Service + cancel context.CancelFunc + + // Message Pipe (connected to the outside world) + *msg.Pipe +} + +// NewService creates a service object with given type ID and Handler +func NewService(ctx context.Context, h Handler) *Service { + s := &Service{ + Handler: h, + Requests: RequestMap{}, + Pipe: msg.NewPipe(10), + } + + go s.handleIncomingMessages(ctx) + + return s +} + +// Start kicks off the Service goroutines. +func (s *Service) Start(ctx context.Context) error { + if s.cancel != nil { + return errors.New("Service already started.") + } + + // make a cancellable context. + ctx, s.cancel = context.WithCancel(ctx) + + go s.handleIncomingMessages(ctx) + return nil +} + +// Stop stops Service activity. +func (s *Service) Stop() { + s.cancel() + s.cancel = context.CancelFunc(nil) +} + +// SendMessage sends a message out +func (s *Service) SendMessage(ctx context.Context, m *msg.Message, rid RequestID) error { + + // serialize ServiceMessage wrapper + data, err := wrapData(m.Data, rid) + if err != nil { + return err + } + + // send message + m2 := &msg.Message{Peer: m.Peer, Data: data} + select { + case s.Outgoing <- m2: + case <-ctx.Done(): + return ctx.Err() + } + + return nil +} + +// SendRequest sends a request message out and awaits a response. +func (s *Service) SendRequest(ctx context.Context, m *msg.Message) (*msg.Message, error) { + + // create a request + r, err := NewRequest(m.Peer.ID) + if err != nil { + return nil, err + } + + // register Request + s.RequestsLock.Lock() + s.Requests[r.Key()] = r + s.RequestsLock.Unlock() + + // defer deleting this request + defer func() { + s.RequestsLock.Lock() + delete(s.Requests, r.Key()) + s.RequestsLock.Unlock() + }() + + // check if we should bail after waiting for mutex + select { + default: + case <-ctx.Done(): + return nil, ctx.Err() + } + + // Send message + s.SendMessage(ctx, m, r.ID) + + // wait for response + m = nil + err = nil + select { + case m = <-r.Response: + case <-ctx.Done(): + err = ctx.Err() + } + + return m, err +} + +// handleIncoming consumes the messages on the s.Incoming channel and +// routes them appropriately (to requests, or handler). +func (s *Service) handleIncomingMessages(ctx context.Context) { + for { + select { + case m := <-s.Incoming: + go s.handleIncomingMessage(ctx, m) + + case <-ctx.Done(): + return + } + } +} + +func (s *Service) handleIncomingMessage(ctx context.Context, m *msg.Message) { + + // unwrap the incoming message + data, rid, err := unwrapData(m.Data) + if err != nil { + u.PErr("de-serializing error: %v\n", err) + } + m2 := &msg.Message{Peer: m.Peer, Data: data} + + // if it's a request (or has no RequestID), handle it + if rid == nil || rid.IsRequest() { + r1, err := s.Handler.HandleMessage(ctx, m2) + if err != nil { + u.PErr("handled message yielded error %v\n", err) + return + } + + // if handler gave us a response, send it back out! + if r1 != nil { + err := s.SendMessage(ctx, r1, rid.Response()) + if err != nil { + u.PErr("error sending response message: %v\n", err) + } + } + return + } + + // Otherwise, it is a response. handle it. + if !rid.IsResponse() { + u.PErr("RequestID should identify a response here.\n") + } + + key := RequestKey(m.Peer.ID, RequestID(rid)) + s.RequestsLock.RLock() + r, found := s.Requests[key] + s.RequestsLock.RUnlock() + + if !found { + u.PErr("no request key %v (timeout?)\n", []byte(key)) + return + } + + select { + case r.Response <- m2: + case <-ctx.Done(): + } +} diff --git a/net/service/service_test.go b/net/service/service_test.go new file mode 100644 index 000000000..138e61763 --- /dev/null +++ b/net/service/service_test.go @@ -0,0 +1,125 @@ +package service + +import ( + "bytes" + "testing" + "time" + + msg "github.com/jbenet/go-ipfs/net/message" + peer "github.com/jbenet/go-ipfs/peer" + + context "code.google.com/p/go.net/context" + mh "github.com/jbenet/go-multihash" +) + +// ReverseHandler reverses all Data it receives and sends it back. +type ReverseHandler struct{} + +func (t *ReverseHandler) HandleMessage(ctx context.Context, m *msg.Message) ( + *msg.Message, error) { + + d := m.Data + for i, j := 0, len(d)-1; i < j; i, j = i+1, j-1 { + d[i], d[j] = d[j], d[i] + } + + return &msg.Message{Peer: m.Peer, Data: d}, nil +} + +func newPeer(t *testing.T, id string) *peer.Peer { + mh, err := mh.FromHexString(id) + if err != nil { + t.Error(err) + return nil + } + + return &peer.Peer{ID: peer.ID(mh)} +} + +func TestServiceHandler(t *testing.T) { + ctx := context.Background() + h := &ReverseHandler{} + s := NewService(ctx, h) + peer1 := newPeer(t, "11140beec7b5ea3f0fdbc95d0dd47f3c5bc275aaaaaa") + + d, err := wrapData([]byte("beep"), nil) + if err != nil { + t.Error(err) + } + + m1 := &msg.Message{Peer: peer1, Data: d} + s.Incoming <- m1 + m2 := <-s.Outgoing + + d, rid, err := unwrapData(m2.Data) + if err != nil { + t.Error(err) + } + + if rid != nil { + t.Error("RequestID should be nil") + } + + if !bytes.Equal(d, []byte("peeb")) { + t.Errorf("service handler data incorrect: %v != %v", d, "oof") + } +} + +func TestServiceRequest(t *testing.T) { + ctx := context.Background() + s1 := NewService(ctx, &ReverseHandler{}) + s2 := NewService(ctx, &ReverseHandler{}) + peer1 := newPeer(t, "11140beec7b5ea3f0fdbc95d0dd47f3c5bc275aaaaaa") + + // patch services together + go func() { + for { + select { + case m := <-s1.Outgoing: + s2.Incoming <- m + case m := <-s2.Outgoing: + s1.Incoming <- m + case <-ctx.Done(): + return + } + } + }() + + m1 := &msg.Message{Peer: peer1, Data: []byte("beep")} + m2, err := s1.SendRequest(ctx, m1) + if err != nil { + t.Error(err) + } + + if !bytes.Equal(m2.Data, []byte("peeb")) { + t.Errorf("service handler data incorrect: %v != %v", m2.Data, "oof") + } +} + +func TestServiceRequestTimeout(t *testing.T) { + ctx, _ := context.WithTimeout(context.Background(), time.Millisecond) + s1 := NewService(ctx, &ReverseHandler{}) + s2 := NewService(ctx, &ReverseHandler{}) + peer1 := newPeer(t, "11140beec7b5ea3f0fdbc95d0dd47f3c5bc275aaaaaa") + + // patch services together + go func() { + for { + <-time.After(time.Millisecond) + select { + case m := <-s1.Outgoing: + s2.Incoming <- m + case m := <-s2.Outgoing: + s1.Incoming <- m + case <-ctx.Done(): + return + } + } + }() + + m1 := &msg.Message{Peer: peer1, Data: []byte("beep")} + m2, err := s1.SendRequest(ctx, m1) + if err == nil || m2 != nil { + t.Error("should've timed out") + } +}