stickiness: add stickiness support (#1969)

This commit is contained in:
Menghan Li
2018-04-24 10:37:52 -07:00
committed by GitHub
parent 4166ea7dad
commit e8a6e2844b
6 changed files with 553 additions and 3 deletions

View File

@ -21,12 +21,16 @@ package grpc
import (
"io"
"sync"
"sync/atomic"
"unsafe"
"golang.org/x/net/context"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/channelz"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/status"
"google.golang.org/grpc/transport"
)
@ -42,10 +46,16 @@ type pickerWrapper struct {
// The latest connection happened.
connErrMu sync.Mutex
connErr error
stickinessMDKey unsafe.Pointer
stickiness *stickyStore
}
func newPickerWrapper() *pickerWrapper {
bp := &pickerWrapper{blockingCh: make(chan struct{})}
bp := &pickerWrapper{
blockingCh: make(chan struct{}),
stickiness: newStickyStore(),
}
return bp
}
@ -62,6 +72,28 @@ func (bp *pickerWrapper) connectionError() error {
return err
}
func (bp *pickerWrapper) updateStickinessMDKey(newKey string) {
oldKeyUnsafePtr := atomic.SwapPointer(&bp.stickinessMDKey, unsafe.Pointer(&newKey))
if ptr := (*string)(oldKeyUnsafePtr); ptr == nil || *ptr != newKey {
bp.stickiness.reset(newKey)
}
}
func (bp *pickerWrapper) getStickinessMDKey() string {
mdKeyPtr := (*string)(atomic.LoadPointer(&bp.stickinessMDKey))
if mdKeyPtr != nil {
return *mdKeyPtr
}
return ""
}
func (bp *pickerWrapper) clearStickinessState() {
if oldKey := bp.getStickinessMDKey(); oldKey != "" {
// There's no need to reset store if mdKey was "".
bp.stickiness.reset(oldKey)
}
}
// updatePicker is called by UpdateBalancerState. It unblocks all blocked pick.
func (bp *pickerWrapper) updatePicker(p balancer.Picker) {
bp.mu.Lock()
@ -101,6 +133,27 @@ func doneChannelzWrapper(acw *acBalancerWrapper, done func(balancer.DoneInfo)) f
// - the subConn returned by the current picker is not READY
// When one of these situations happens, pick blocks until the picker gets updated.
func (bp *pickerWrapper) pick(ctx context.Context, failfast bool, opts balancer.PickOptions) (transport.ClientTransport, func(balancer.DoneInfo), error) {
mdKey := bp.getStickinessMDKey()
stickyKey, isSticky := stickyKeyFromContext(ctx, mdKey)
// Potential race here: if stickinessMDKey is updated after the above two
// lines, and this pick is a sticky pick, the following put could add an
// entry to sticky store with an outdated sticky key.
//
// The solution: keep the current md key in sticky store, and at the
// beginning of each get/put, check the mdkey against store.curMDKey.
// - Cons: one more string comparing for each get/put.
// - Pros: the string matching happens inside get/put, so the overhead for
// non-sticky RPCs will be minimal.
if isSticky {
if t, ok := bp.stickiness.get(mdKey, stickyKey); ok {
// Done function returned is always nil.
return t, nil, nil
}
}
var (
p balancer.Picker
ch chan struct{}
@ -156,6 +209,9 @@ func (bp *pickerWrapper) pick(ctx context.Context, failfast bool, opts balancer.
continue
}
if t, ok := acw.getAddrConn().getReadyTransport(); ok {
if isSticky {
bp.stickiness.put(mdKey, stickyKey, acw)
}
if channelz.IsOn() {
return t, doneChannelzWrapper(acw, done), nil
}
@ -178,3 +234,100 @@ func (bp *pickerWrapper) close() {
bp.done = true
close(bp.blockingCh)
}
type stickyStoreEntry struct {
acw *acBalancerWrapper
addr resolver.Address
}
type stickyStore struct {
mu sync.Mutex
// curMDKey is check before every get/put to avoid races. The operation will
// abort immediately when the given mdKey is different from the curMDKey.
curMDKey string
store map[string]*stickyStoreEntry
}
func newStickyStore() *stickyStore {
return &stickyStore{
store: make(map[string]*stickyStoreEntry),
}
}
// reset clears the map in stickyStore, and set the currentMDKey to newMDKey.
func (ss *stickyStore) reset(newMDKey string) {
ss.mu.Lock()
ss.curMDKey = newMDKey
ss.store = make(map[string]*stickyStoreEntry)
ss.mu.Unlock()
}
// stickyKey is the key to look up in store. mdKey will be checked against
// curMDKey to avoid races.
func (ss *stickyStore) put(mdKey, stickyKey string, acw *acBalancerWrapper) {
ss.mu.Lock()
defer ss.mu.Unlock()
if mdKey != ss.curMDKey {
return
}
// TODO(stickiness): limit the total number of entries.
ss.store[stickyKey] = &stickyStoreEntry{
acw: acw,
addr: acw.getAddrConn().getCurAddr(),
}
}
// stickyKey is the key to look up in store. mdKey will be checked against
// curMDKey to avoid races.
func (ss *stickyStore) get(mdKey, stickyKey string) (transport.ClientTransport, bool) {
ss.mu.Lock()
defer ss.mu.Unlock()
if mdKey != ss.curMDKey {
return nil, false
}
entry, ok := ss.store[stickyKey]
if !ok {
return nil, false
}
ac := entry.acw.getAddrConn()
if ac.getCurAddr() != entry.addr {
delete(ss.store, stickyKey)
return nil, false
}
t, ok := ac.getReadyTransport()
if !ok {
delete(ss.store, stickyKey)
return nil, false
}
return t, true
}
// Get one value from metadata in ctx with key stickinessMDKey.
//
// It returns "", false if stickinessMDKey is an empty string.
func stickyKeyFromContext(ctx context.Context, stickinessMDKey string) (string, bool) {
if stickinessMDKey == "" {
return "", false
}
md, added, ok := metadata.FromOutgoingContextRaw(ctx)
if !ok {
return "", false
}
if vv, ok := md[stickinessMDKey]; ok {
if len(vv) > 0 {
return vv[0], true
}
}
for _, ss := range added {
for i := 0; i < len(ss)-1; i += 2 {
if ss[i] == stickinessMDKey {
return ss[i+1], true
}
}
}
return "", false
}