build(deps): bump github.com/uber/jaeger-client-go

Bumps [github.com/uber/jaeger-client-go](https://github.com/uber/jaeger-client-go) from 2.20.1+incompatible to 2.22.1+incompatible.
- [Release notes](https://github.com/uber/jaeger-client-go/releases)
- [Changelog](https://github.com/jaegertracing/jaeger-client-go/blob/master/CHANGELOG.md)
- [Commits](https://github.com/uber/jaeger-client-go/compare/v2.20.1...v2.22.1)

Signed-off-by: dependabot-preview[bot] <support@dependabot.com>
Signed-off-by: Matthew Heon <mheon@redhat.com>
Authored by dependabot-preview[bot] on 2020-01-23 08:40:18 +00:00
Committed by Matthew Heon
parent ac3a6b80b0, commit fd36a33dc3
21 changed files with 527 additions and 38 deletions

go.mod

@ -61,7 +61,7 @@ require (
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.4.0
github.com/syndtr/gocapability v0.0.0-20180916011248-d98352740cb2
github.com/uber/jaeger-client-go v2.20.1+incompatible
github.com/uber/jaeger-client-go v2.22.1+incompatible
github.com/uber/jaeger-lib v0.0.0-20190122222657-d036253de8f5 // indirect
github.com/varlink/go v0.0.0-20190502142041-0f1d566d194b
github.com/vishvananda/netlink v1.0.0

go.sum

@ -488,6 +488,8 @@ github.com/theckman/go-flock v0.7.1/go.mod h1:kjuth3y9VJ2aNlkNEO99G/8lp9fMIKaGyB
github.com/u-root/u-root v5.0.0+incompatible/go.mod h1:RYkpo8pTHrNjW08opNd/U6p/RJE7K0D8fXO0d47+3YY=
github.com/uber/jaeger-client-go v2.20.1+incompatible h1:HgqpYBng0n7tLJIlyT4kPCIv5XgCsF+kai1NnnrJzEU=
github.com/uber/jaeger-client-go v2.20.1+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
github.com/uber/jaeger-client-go v2.22.1+incompatible h1:NHcubEkVbahf9t3p75TOCR83gdUHXjRJvjoBh1yACsM=
github.com/uber/jaeger-client-go v2.22.1+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
github.com/uber/jaeger-lib v0.0.0-20190122222657-d036253de8f5 h1:CwmGyzHTzCqCdZJkWR0A7ucZXgrCY7spRcpvm7ci//s=
github.com/uber/jaeger-lib v0.0.0-20190122222657-d036253de8f5/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U=
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=

vendor/github.com/uber/jaeger-client-go/CHANGELOG.md

@ -1,6 +1,34 @@
Changes by Version
==================
2.22.1 (2020-01-16)
-------------------
- Increase UDP batch overhead to account for data loss metrics ([#488](https://github.com/jaegertracing/jaeger-client-go/pull/488)) -- Yuri Shkuro
2.22.0 (2020-01-15)
-------------------
- Report data loss stats to Jaeger backend ([#482](https://github.com/jaegertracing/jaeger-client-go/pull/482)) -- Yuri Shkuro
- Add limit on log records per span ([#483](https://github.com/jaegertracing/jaeger-client-go/pull/483)) -- Sokolov Yura
2.21.1 (2019-12-20)
-------------------
- Update version correctly.
2.21.0 (2019-12-20)
-------------------
- Clarify reporting error logs ([#469](https://github.com/jaegertracing/jaeger-client-go/pull/469)) -- Yuri Shkuro
- Do not strip leading zeros from trace IDs ([#472](https://github.com/jaegertracing/jaeger-client-go/pull/472)) -- Yuri Shkuro
- Chore (docs): fixed a couple of typos ([#475](https://github.com/jaegertracing/jaeger-client-go/pull/475)) -- Marc Bramaud
- Support custom HTTP headers when reporting spans over HTTP ([#479](https://github.com/jaegertracing/jaeger-client-go/pull/479)) -- Albert Teoh
2.20.1 (2019-11-08)
-------------------

vendor/github.com/uber/jaeger-client-go/Makefile

@ -83,8 +83,12 @@ cover-html: cover
test-examples:
make -C examples
.PHONY: thrift
thrift: idl-submodule thrift-compile
# TODO at the moment we're not generating tchan_*.go files
thrift: idl-submodule thrift-image
.PHONY: thrift-compile
thrift-compile: thrift-image
$(THRIFT) -o /data --gen go:$(THRIFT_GO_ARGS) --out /data/$(THRIFT_GEN_DIR) /data/idl/thrift/agent.thrift
$(THRIFT) -o /data --gen go:$(THRIFT_GO_ARGS) --out /data/$(THRIFT_GEN_DIR) /data/idl/thrift/sampling.thrift
$(THRIFT) -o /data --gen go:$(THRIFT_GO_ARGS) --out /data/$(THRIFT_GEN_DIR) /data/idl/thrift/jaeger.thrift
@ -99,10 +103,12 @@ thrift: idl-submodule thrift-image
rm -rf crossdock/thrift/*/*-remote
rm -rf thrift-gen/jaeger/collector.go
.PHONY: idl-submodule
idl-submodule:
git submodule init
git submodule update
.PHONY: thrift-image
thrift-image:
$(THRIFT) -version

vendor/github.com/uber/jaeger-client-go/README.md

@ -45,7 +45,7 @@ and [config/example_test.go](./config/example_test.go).
### Environment variables
The tracer can be initialized with values coming from environment variables. None of the env vars are required
and all of them can be overriden via direct setting of the property on the configuration object.
and all of them can be overridden via direct setting of the property on the configuration object.
Property| Description
--- | ---

vendor/github.com/uber/jaeger-client-go/config/config.go

@ -134,6 +134,10 @@ type ReporterConfig struct {
// Password instructs reporter to include a password for basic http authentication when sending spans to
// jaeger-collector. Can be set by exporting an environment variable named JAEGER_PASSWORD
Password string `yaml:"password"`
// HTTPHeaders instructs the reporter to add these headers to the http request when reporting spans.
// This field takes effect only when using HTTPTransport by setting the CollectorEndpoint.
HTTPHeaders map[string]string `yaml:"http_headers"`
}
// BaggageRestrictionsConfig configures the baggage restrictions manager which can be used to whitelist
@ -397,11 +401,12 @@ func (rc *ReporterConfig) NewReporter(
func (rc *ReporterConfig) newTransport() (jaeger.Transport, error) {
switch {
case rc.CollectorEndpoint != "" && rc.User != "" && rc.Password != "":
return transport.NewHTTPTransport(rc.CollectorEndpoint, transport.HTTPBatchSize(1),
transport.HTTPBasicAuth(rc.User, rc.Password)), nil
case rc.CollectorEndpoint != "":
return transport.NewHTTPTransport(rc.CollectorEndpoint, transport.HTTPBatchSize(1)), nil
httpOptions := []transport.HTTPOption{transport.HTTPBatchSize(1), transport.HTTPHeaders(rc.HTTPHeaders)}
if rc.User != "" && rc.Password != "" {
httpOptions = append(httpOptions, transport.HTTPBasicAuth(rc.User, rc.Password))
}
return transport.NewHTTPTransport(rc.CollectorEndpoint, httpOptions...), nil
default:
return jaeger.NewUDPTransport(rc.LocalAgentHostPort, 0)
}
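
The hunk above threads the new HTTPHeaders field into the HTTP transport; it only takes effect when CollectorEndpoint is set, since the UDP path ignores it. A minimal sketch of how a consumer could opt in (service name, endpoint, and header values are placeholders, not taken from this commit):

```go
package tracing

import (
	"io"

	"github.com/opentracing/opentracing-go"
	jaegercfg "github.com/uber/jaeger-client-go/config"
)

// newTracer builds a tracer that reports over HTTP and attaches a custom
// header to every batch request (placeholder values throughout).
func newTracer() (opentracing.Tracer, io.Closer, error) {
	cfg := jaegercfg.Configuration{
		ServiceName: "podman",
		Sampler:     &jaegercfg.SamplerConfig{Type: "const", Param: 1},
		Reporter: &jaegercfg.ReporterConfig{
			CollectorEndpoint: "http://localhost:14268/api/traces",
			HTTPHeaders:       map[string]string{"X-Tenant": "example"},
		},
	}
	return cfg.NewTracer()
}
```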

vendor/github.com/uber/jaeger-client-go/constants.go

@ -22,7 +22,7 @@ import (
const (
// JaegerClientVersion is the version of the client library reported as Span tag.
JaegerClientVersion = "Go-2.20.1"
JaegerClientVersion = "Go-2.22.1"
// JaegerClientVersionTagKey is the name of the tag used to report client version.
JaegerClientVersionTagKey = "jaeger.version"

vendor/github.com/uber/jaeger-client-go/internal/reporterstats/stats.go

@ -0,0 +1,25 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package reporterstats
// ReporterStats exposes some metrics from the RemoteReporter.
type ReporterStats interface {
SpansDroppedFromQueue() int64
}
// Receiver can be implemented by a Transport to be given ReporterStats.
type Receiver interface {
SetReporterStats(ReporterStats)
}
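
This is a new internal package, so only transports inside jaeger-client-go can implement Receiver; in this release the UDP sender is the sole consumer. Purely to illustrate the contract (hypothetical type, not code from this commit):

```go
package jaeger

import "github.com/uber/jaeger-client-go/internal/reporterstats"

// statsAwareSender is a hypothetical Transport inside the library.
// NewRemoteReporter type-asserts its sender to reporterstats.Receiver and,
// when that succeeds, injects the live stats once before starting its queue.
type statsAwareSender struct {
	reporterStats reporterstats.ReporterStats
}

// SetReporterStats implements reporterstats.Receiver.
func (s *statsAwareSender) SetReporterStats(rs reporterstats.ReporterStats) {
	s.reporterStats = rs
}

func (s *statsAwareSender) droppedFromQueue() int64 {
	if s.reporterStats == nil {
		return 0 // reporter has not injected stats (or sender is used standalone)
	}
	return s.reporterStats.SpansDroppedFromQueue()
}
```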

vendor/github.com/uber/jaeger-client-go/jaeger_thrift_span.go

@ -24,6 +24,7 @@ import (
)
// BuildJaegerThrift builds jaeger span based on internal span.
// TODO: (breaking change) move to internal package.
func BuildJaegerThrift(span *Span) *j.Span {
span.Lock()
defer span.Unlock()
@ -46,6 +47,7 @@ func BuildJaegerThrift(span *Span) *j.Span {
}
// BuildJaegerProcessThrift creates a thrift Process type.
// TODO: (breaking change) move to internal package.
func BuildJaegerProcessThrift(span *Span) *j.Process {
span.Lock()
defer span.Unlock()

vendor/github.com/uber/jaeger-client-go/reporter.go

@ -22,6 +22,7 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/uber/jaeger-client-go/internal/reporterstats"
"github.com/uber/jaeger-client-go/log"
)
@ -176,16 +177,31 @@ type reporterQueueItem struct {
close *sync.WaitGroup
}
// reporterStats implements reporterstats.ReporterStats.
type reporterStats struct {
droppedCount int64 // provided to Transports to report data loss to the backend
}
// SpansDroppedFromQueue implements reporterstats.ReporterStats.
func (r *reporterStats) SpansDroppedFromQueue() int64 {
return atomic.LoadInt64(&r.droppedCount)
}
func (r *reporterStats) incDroppedCount() {
atomic.AddInt64(&r.droppedCount, 1)
}
type remoteReporter struct {
// These fields must be first in the struct because `sync/atomic` expects 64-bit alignment.
// Cf. https://github.com/uber/jaeger-client-go/issues/155, https://goo.gl/zW7dgq
queueLength int64
queueLength int64 // used to update metrics.Gauge
closed int64 // 0 - not closed, 1 - closed
reporterOptions
sender Transport
queue chan reporterQueueItem
sender Transport
queue chan reporterQueueItem
reporterStats *reporterStats
}
// NewRemoteReporter creates a new reporter that sends spans out of process by means of Sender.
@ -213,6 +229,10 @@ func NewRemoteReporter(sender Transport, opts ...ReporterOption) Reporter {
reporterOptions: options,
sender: sender,
queue: make(chan reporterQueueItem, options.queueSize),
reporterStats: new(reporterStats),
}
if receiver, ok := sender.(reporterstats.Receiver); ok {
receiver.SetReporterStats(reporter.reporterStats)
}
go reporter.processQueue()
return reporter
@ -231,6 +251,7 @@ func (r *remoteReporter) Report(span *Span) {
atomic.AddInt64(&r.queueLength, 1)
default:
r.metrics.ReporterDropped.Inc(1)
r.reporterStats.incDroppedCount()
}
}
@ -241,7 +262,7 @@ func (r *remoteReporter) Close() {
return
}
r.sendCloseEvent()
r.sender.Close()
_ = r.sender.Close()
}
func (r *remoteReporter) sendCloseEvent() {
@ -263,7 +284,7 @@ func (r *remoteReporter) processQueue() {
flush := func() {
if flushed, err := r.sender.Flush(); err != nil {
r.metrics.ReporterFailure.Inc(int64(flushed))
r.logger.Error(fmt.Sprintf("error when flushing the buffer: %s", err.Error()))
r.logger.Error(fmt.Sprintf("failed to flush Jaeger spans to server: %s", err.Error()))
} else if flushed > 0 {
r.metrics.ReporterSuccess.Inc(int64(flushed))
}
@ -281,7 +302,7 @@ func (r *remoteReporter) processQueue() {
span := item.span
if flushed, err := r.sender.Append(span); err != nil {
r.metrics.ReporterFailure.Inc(int64(flushed))
r.logger.Error(fmt.Sprintf("error reporting span %q: %s", span.OperationName(), err.Error()))
r.logger.Error(fmt.Sprintf("error reporting Jaeger span %q: %s", span.OperationName(), err.Error()))
} else if flushed > 0 {
r.metrics.ReporterSuccess.Inc(int64(flushed))
// to reduce the number of gauge stats, we only emit queue length on flush
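
Callers do not need any new wiring: NewRemoteReporter hands its stats to any sender that implements reporterstats.Receiver, which the bundled UDP transport now does. A sketch of constructing the reporter by hand, with an explicit queue size since SpansDroppedFromQueue only grows when that queue overflows (host/port and sizes are placeholder values):

```go
package tracing

import (
	"time"

	"github.com/uber/jaeger-client-go"
)

// newReporter is a sketch: the UDP sender implements reporterstats.Receiver,
// so queue-drop counts reach the agent with no extra wiring.
func newReporter() (jaeger.Reporter, error) {
	sender, err := jaeger.NewUDPTransport("127.0.0.1:6831", 0) // 0 = default max packet size
	if err != nil {
		return nil, err
	}
	return jaeger.NewRemoteReporter(
		sender,
		jaeger.ReporterOptions.QueueSize(1000), // SpansDroppedFromQueue grows only when this overflows
		jaeger.ReporterOptions.BufferFlushInterval(time.Second),
	), nil
}
```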

vendor/github.com/uber/jaeger-client-go/span.go

@ -59,6 +59,9 @@ type Span struct {
// The span's "micro-log"
logs []opentracing.LogRecord
// The number of logs dropped because of MaxLogsPerSpan.
numDroppedLogs int
// references for this span
references []Reference
@ -152,7 +155,12 @@ func (s *Span) Logs() []opentracing.LogRecord {
s.Lock()
defer s.Unlock()
return append([]opentracing.LogRecord(nil), s.logs...)
logs := append([]opentracing.LogRecord(nil), s.logs...)
if s.numDroppedLogs != 0 {
fixLogs(logs, s.numDroppedLogs)
}
return logs
}
// References returns references for this span
@ -234,8 +242,65 @@ func (s *Span) Log(ld opentracing.LogData) {
// this function should only be called while holding a Write lock
func (s *Span) appendLogNoLocking(lr opentracing.LogRecord) {
// TODO add logic to limit number of logs per span (issue #46)
s.logs = append(s.logs, lr)
maxLogs := s.tracer.options.maxLogsPerSpan
if maxLogs == 0 || len(s.logs) < maxLogs {
s.logs = append(s.logs, lr)
return
}
// We have too many logs. We don't touch the first numOld logs; we treat the
// rest as a circular buffer and overwrite the oldest log among those.
numOld := (maxLogs - 1) / 2
numNew := maxLogs - numOld
s.logs[numOld+s.numDroppedLogs%numNew] = lr
s.numDroppedLogs++
}
// rotateLogBuffer rotates the records in the buffer: records 0 to pos-1 move at
// the end (i.e. pos circular left shifts).
func rotateLogBuffer(buf []opentracing.LogRecord, pos int) {
// This algorithm is described in:
// http://www.cplusplus.com/reference/algorithm/rotate
for first, middle, next := 0, pos, pos; first != middle; {
buf[first], buf[next] = buf[next], buf[first]
first++
next++
if next == len(buf) {
next = middle
} else if first == middle {
middle = next
}
}
}
func fixLogs(logs []opentracing.LogRecord, numDroppedLogs int) {
// We dropped some log events, which means that we used part of Logs as a
// circular buffer (see appendLog). De-circularize it.
numOld := (len(logs) - 1) / 2
numNew := len(logs) - numOld
rotateLogBuffer(logs[numOld:], numDroppedLogs%numNew)
// Replace the log in the middle (the oldest "new" log) with information
// about the dropped logs. This means that we are effectively dropping one
// more "new" log.
numDropped := numDroppedLogs + 1
logs[numOld] = opentracing.LogRecord{
// Keep the timestamp of the last dropped event.
Timestamp: logs[numOld].Timestamp,
Fields: []log.Field{
log.String("event", "dropped Span logs"),
log.Int("dropped_log_count", numDropped),
log.String("component", "jaeger-client"),
},
}
}
func (s *Span) fixLogsIfDropped() {
if s.numDroppedLogs == 0 {
return
}
fixLogs(s.logs, s.numDroppedLogs)
s.numDroppedLogs = 0
}
// SetBaggageItem implements SetBaggageItem() of opentracing.SpanContext
@ -274,8 +339,9 @@ func (s *Span) FinishWithOptions(options opentracing.FinishOptions) {
s.applySamplingDecision(decision, true)
}
if s.context.IsSampled() {
s.Lock()
s.fixLogsIfDropped()
if len(options.LogRecords) > 0 || len(options.BulkLogData) > 0 {
s.Lock()
// Note: bulk logs are not subject to maxLogsPerSpan limit
if options.LogRecords != nil {
s.logs = append(s.logs, options.LogRecords...)
@ -283,8 +349,8 @@ func (s *Span) FinishWithOptions(options opentracing.FinishOptions) {
for _, ld := range options.BulkLogData {
s.logs = append(s.logs, ld.ToLogRecord())
}
s.Unlock()
}
s.Unlock()
}
// call reportSpan even for non-sampled traces, to return span to the pool
// and update metrics counter
@ -344,6 +410,7 @@ func (s *Span) reset() {
// Note: To reuse memory we can save the pointers on the heap
s.tags = s.tags[:0]
s.logs = s.logs[:0]
s.numDroppedLogs = 0
s.references = s.references[:0]
}
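
A quick worked example of the new cap, assuming MaxLogsPerSpan(5): numOld = (5-1)/2 = 2 records stay pinned at the front, the remaining numNew = 3 slots act as a circular buffer, and on Finish the oldest surviving "new" record is replaced by a synthetic "dropped Span logs" entry. The sketch below uses the in-memory reporter only to stay self-contained; all values are arbitrary:

```go
package main

import (
	"fmt"

	"github.com/uber/jaeger-client-go"
)

func main() {
	reporter := jaeger.NewInMemoryReporter()
	tracer, closer := jaeger.NewTracer("example",
		jaeger.NewConstSampler(true),
		reporter,
		jaeger.TracerOptions.MaxLogsPerSpan(5),
	)
	defer closer.Close()

	span := tracer.StartSpan("op")
	for i := 0; i < 8; i++ {
		span.LogKV("event", i) // once full, slots 2-4 are overwritten circularly
	}
	span.Finish()

	// Expect 5 records: events 0 and 1, a "dropped Span logs" record with
	// dropped_log_count=4, then events 6 and 7.
	logs := reporter.GetSpans()[0].(*jaeger.Span).Logs()
	fmt.Println(len(logs))
}
```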

vendor/github.com/uber/jaeger-client-go/span_context.go

@ -213,9 +213,9 @@ func (c SpanContext) SetFirehose() {
func (c SpanContext) String() string {
if c.traceID.High == 0 {
return fmt.Sprintf("%x:%x:%x:%x", c.traceID.Low, uint64(c.spanID), uint64(c.parentID), c.samplingState.stateFlags.Load())
return fmt.Sprintf("%016x:%016x:%016x:%x", c.traceID.Low, uint64(c.spanID), uint64(c.parentID), c.samplingState.stateFlags.Load())
}
return fmt.Sprintf("%x%016x:%x:%x:%x", c.traceID.High, c.traceID.Low, uint64(c.spanID), uint64(c.parentID), c.samplingState.stateFlags.Load())
return fmt.Sprintf("%016x%016x:%016x:%016x:%x", c.traceID.High, c.traceID.Low, uint64(c.spanID), uint64(c.parentID), c.samplingState.stateFlags.Load())
}
// ContextFromString reconstructs the Context encoded in a string
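
The change above is purely cosmetic but matters for propagation: trace, span, and parent IDs are now rendered as fixed-width, zero-padded hex, so IDs no longer lose leading zeros when serialized into headers. A tiny illustration with made-up values:

```go
package main

import "fmt"

func main() {
	traceIDLow, spanID, parentID, flags := uint64(0x42), uint64(0x1), uint64(0), uint64(1)

	// old format: 42:1:0:1
	fmt.Printf("%x:%x:%x:%x\n", traceIDLow, spanID, parentID, flags)
	// new format: 0000000000000042:0000000000000001:0000000000000000:1
	fmt.Printf("%016x:%016x:%016x:%x\n", traceIDLow, spanID, parentID, flags)
}
```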

vendor/github.com/uber/jaeger-client-go/thrift-gen/jaeger/ttypes.go

@ -1576,12 +1576,193 @@ func (p *Process) String() string {
return fmt.Sprintf("Process(%+v)", *p)
}
// Attributes:
// - FullQueueDroppedSpans
// - TooLargeDroppedSpans
// - FailedToEmitSpans
type ClientStats struct {
FullQueueDroppedSpans int64 `thrift:"fullQueueDroppedSpans,1,required" json:"fullQueueDroppedSpans"`
TooLargeDroppedSpans int64 `thrift:"tooLargeDroppedSpans,2,required" json:"tooLargeDroppedSpans"`
FailedToEmitSpans int64 `thrift:"failedToEmitSpans,3,required" json:"failedToEmitSpans"`
}
func NewClientStats() *ClientStats {
return &ClientStats{}
}
func (p *ClientStats) GetFullQueueDroppedSpans() int64 {
return p.FullQueueDroppedSpans
}
func (p *ClientStats) GetTooLargeDroppedSpans() int64 {
return p.TooLargeDroppedSpans
}
func (p *ClientStats) GetFailedToEmitSpans() int64 {
return p.FailedToEmitSpans
}
func (p *ClientStats) Read(iprot thrift.TProtocol) error {
if _, err := iprot.ReadStructBegin(); err != nil {
return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err)
}
var issetFullQueueDroppedSpans bool = false
var issetTooLargeDroppedSpans bool = false
var issetFailedToEmitSpans bool = false
for {
_, fieldTypeId, fieldId, err := iprot.ReadFieldBegin()
if err != nil {
return thrift.PrependError(fmt.Sprintf("%T field %d read error: ", p, fieldId), err)
}
if fieldTypeId == thrift.STOP {
break
}
switch fieldId {
case 1:
if err := p.readField1(iprot); err != nil {
return err
}
issetFullQueueDroppedSpans = true
case 2:
if err := p.readField2(iprot); err != nil {
return err
}
issetTooLargeDroppedSpans = true
case 3:
if err := p.readField3(iprot); err != nil {
return err
}
issetFailedToEmitSpans = true
default:
if err := iprot.Skip(fieldTypeId); err != nil {
return err
}
}
if err := iprot.ReadFieldEnd(); err != nil {
return err
}
}
if err := iprot.ReadStructEnd(); err != nil {
return thrift.PrependError(fmt.Sprintf("%T read struct end error: ", p), err)
}
if !issetFullQueueDroppedSpans {
return thrift.NewTProtocolExceptionWithType(thrift.INVALID_DATA, fmt.Errorf("Required field FullQueueDroppedSpans is not set"))
}
if !issetTooLargeDroppedSpans {
return thrift.NewTProtocolExceptionWithType(thrift.INVALID_DATA, fmt.Errorf("Required field TooLargeDroppedSpans is not set"))
}
if !issetFailedToEmitSpans {
return thrift.NewTProtocolExceptionWithType(thrift.INVALID_DATA, fmt.Errorf("Required field FailedToEmitSpans is not set"))
}
return nil
}
func (p *ClientStats) readField1(iprot thrift.TProtocol) error {
if v, err := iprot.ReadI64(); err != nil {
return thrift.PrependError("error reading field 1: ", err)
} else {
p.FullQueueDroppedSpans = v
}
return nil
}
func (p *ClientStats) readField2(iprot thrift.TProtocol) error {
if v, err := iprot.ReadI64(); err != nil {
return thrift.PrependError("error reading field 2: ", err)
} else {
p.TooLargeDroppedSpans = v
}
return nil
}
func (p *ClientStats) readField3(iprot thrift.TProtocol) error {
if v, err := iprot.ReadI64(); err != nil {
return thrift.PrependError("error reading field 3: ", err)
} else {
p.FailedToEmitSpans = v
}
return nil
}
func (p *ClientStats) Write(oprot thrift.TProtocol) error {
if err := oprot.WriteStructBegin("ClientStats"); err != nil {
return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err)
}
if err := p.writeField1(oprot); err != nil {
return err
}
if err := p.writeField2(oprot); err != nil {
return err
}
if err := p.writeField3(oprot); err != nil {
return err
}
if err := oprot.WriteFieldStop(); err != nil {
return thrift.PrependError("write field stop error: ", err)
}
if err := oprot.WriteStructEnd(); err != nil {
return thrift.PrependError("write struct stop error: ", err)
}
return nil
}
func (p *ClientStats) writeField1(oprot thrift.TProtocol) (err error) {
if err := oprot.WriteFieldBegin("fullQueueDroppedSpans", thrift.I64, 1); err != nil {
return thrift.PrependError(fmt.Sprintf("%T write field begin error 1:fullQueueDroppedSpans: ", p), err)
}
if err := oprot.WriteI64(int64(p.FullQueueDroppedSpans)); err != nil {
return thrift.PrependError(fmt.Sprintf("%T.fullQueueDroppedSpans (1) field write error: ", p), err)
}
if err := oprot.WriteFieldEnd(); err != nil {
return thrift.PrependError(fmt.Sprintf("%T write field end error 1:fullQueueDroppedSpans: ", p), err)
}
return err
}
func (p *ClientStats) writeField2(oprot thrift.TProtocol) (err error) {
if err := oprot.WriteFieldBegin("tooLargeDroppedSpans", thrift.I64, 2); err != nil {
return thrift.PrependError(fmt.Sprintf("%T write field begin error 2:tooLargeDroppedSpans: ", p), err)
}
if err := oprot.WriteI64(int64(p.TooLargeDroppedSpans)); err != nil {
return thrift.PrependError(fmt.Sprintf("%T.tooLargeDroppedSpans (2) field write error: ", p), err)
}
if err := oprot.WriteFieldEnd(); err != nil {
return thrift.PrependError(fmt.Sprintf("%T write field end error 2:tooLargeDroppedSpans: ", p), err)
}
return err
}
func (p *ClientStats) writeField3(oprot thrift.TProtocol) (err error) {
if err := oprot.WriteFieldBegin("failedToEmitSpans", thrift.I64, 3); err != nil {
return thrift.PrependError(fmt.Sprintf("%T write field begin error 3:failedToEmitSpans: ", p), err)
}
if err := oprot.WriteI64(int64(p.FailedToEmitSpans)); err != nil {
return thrift.PrependError(fmt.Sprintf("%T.failedToEmitSpans (3) field write error: ", p), err)
}
if err := oprot.WriteFieldEnd(); err != nil {
return thrift.PrependError(fmt.Sprintf("%T write field end error 3:failedToEmitSpans: ", p), err)
}
return err
}
func (p *ClientStats) String() string {
if p == nil {
return "<nil>"
}
return fmt.Sprintf("ClientStats(%+v)", *p)
}
// Attributes:
// - Process
// - Spans
// - SeqNo
// - Stats
type Batch struct {
Process *Process `thrift:"process,1,required" json:"process"`
Spans []*Span `thrift:"spans,2,required" json:"spans"`
Process *Process `thrift:"process,1,required" json:"process"`
Spans []*Span `thrift:"spans,2,required" json:"spans"`
SeqNo *int64 `thrift:"seqNo,3" json:"seqNo,omitempty"`
Stats *ClientStats `thrift:"stats,4" json:"stats,omitempty"`
}
func NewBatch() *Batch {
@ -1600,10 +1781,36 @@ func (p *Batch) GetProcess() *Process {
func (p *Batch) GetSpans() []*Span {
return p.Spans
}
var Batch_SeqNo_DEFAULT int64
func (p *Batch) GetSeqNo() int64 {
if !p.IsSetSeqNo() {
return Batch_SeqNo_DEFAULT
}
return *p.SeqNo
}
var Batch_Stats_DEFAULT *ClientStats
func (p *Batch) GetStats() *ClientStats {
if !p.IsSetStats() {
return Batch_Stats_DEFAULT
}
return p.Stats
}
func (p *Batch) IsSetProcess() bool {
return p.Process != nil
}
func (p *Batch) IsSetSeqNo() bool {
return p.SeqNo != nil
}
func (p *Batch) IsSetStats() bool {
return p.Stats != nil
}
func (p *Batch) Read(iprot thrift.TProtocol) error {
if _, err := iprot.ReadStructBegin(); err != nil {
return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err)
@ -1631,6 +1838,14 @@ func (p *Batch) Read(iprot thrift.TProtocol) error {
return err
}
issetSpans = true
case 3:
if err := p.readField3(iprot); err != nil {
return err
}
case 4:
if err := p.readField4(iprot); err != nil {
return err
}
default:
if err := iprot.Skip(fieldTypeId); err != nil {
return err
@ -1680,6 +1895,23 @@ func (p *Batch) readField2(iprot thrift.TProtocol) error {
return nil
}
func (p *Batch) readField3(iprot thrift.TProtocol) error {
if v, err := iprot.ReadI64(); err != nil {
return thrift.PrependError("error reading field 3: ", err)
} else {
p.SeqNo = &v
}
return nil
}
func (p *Batch) readField4(iprot thrift.TProtocol) error {
p.Stats = &ClientStats{}
if err := p.Stats.Read(iprot); err != nil {
return thrift.PrependError(fmt.Sprintf("%T error reading struct: ", p.Stats), err)
}
return nil
}
func (p *Batch) Write(oprot thrift.TProtocol) error {
if err := oprot.WriteStructBegin("Batch"); err != nil {
return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err)
@ -1690,6 +1922,12 @@ func (p *Batch) Write(oprot thrift.TProtocol) error {
if err := p.writeField2(oprot); err != nil {
return err
}
if err := p.writeField3(oprot); err != nil {
return err
}
if err := p.writeField4(oprot); err != nil {
return err
}
if err := oprot.WriteFieldStop(); err != nil {
return thrift.PrependError("write field stop error: ", err)
}
@ -1733,6 +1971,36 @@ func (p *Batch) writeField2(oprot thrift.TProtocol) (err error) {
return err
}
func (p *Batch) writeField3(oprot thrift.TProtocol) (err error) {
if p.IsSetSeqNo() {
if err := oprot.WriteFieldBegin("seqNo", thrift.I64, 3); err != nil {
return thrift.PrependError(fmt.Sprintf("%T write field begin error 3:seqNo: ", p), err)
}
if err := oprot.WriteI64(int64(*p.SeqNo)); err != nil {
return thrift.PrependError(fmt.Sprintf("%T.seqNo (3) field write error: ", p), err)
}
if err := oprot.WriteFieldEnd(); err != nil {
return thrift.PrependError(fmt.Sprintf("%T write field end error 3:seqNo: ", p), err)
}
}
return err
}
func (p *Batch) writeField4(oprot thrift.TProtocol) (err error) {
if p.IsSetStats() {
if err := oprot.WriteFieldBegin("stats", thrift.STRUCT, 4); err != nil {
return thrift.PrependError(fmt.Sprintf("%T write field begin error 4:stats: ", p), err)
}
if err := p.Stats.Write(oprot); err != nil {
return thrift.PrependError(fmt.Sprintf("%T error writing struct: ", p.Stats), err)
}
if err := oprot.WriteFieldEnd(); err != nil {
return thrift.PrependError(fmt.Sprintf("%T write field end error 4:stats: ", p), err)
}
}
return err
}
func (p *Batch) String() string {
if p == nil {
return "<nil>"
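
The generated Batch gains two optional fields, so older agents and collectors that do not know about them simply skip them. A consumer-side sketch (illustrative only, not part of this commit) of reading them back:

```go
package main

import (
	"fmt"

	j "github.com/uber/jaeger-client-go/thrift-gen/jaeger"
)

// clientDroppedSpans sums the new ClientStats counters when present.
func clientDroppedSpans(b *j.Batch) int64 {
	if !b.IsSetStats() {
		return 0 // batch came from an older client
	}
	s := b.GetStats()
	return s.GetFullQueueDroppedSpans() + s.GetTooLargeDroppedSpans() + s.GetFailedToEmitSpans()
}

func main() {
	seq := int64(7)
	b := &j.Batch{SeqNo: &seq, Stats: &j.ClientStats{TooLargeDroppedSpans: 2}}
	fmt.Println(b.GetSeqNo(), clientDroppedSpans(b)) // 7 2
}
```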

vendor/github.com/uber/jaeger-client-go/thrift-gen/zipkincore/ttypes.go

@ -729,7 +729,7 @@ func (p *BinaryAnnotation) String() string {
// precise value possible. For example, gettimeofday or syncing nanoTime
// against a tick of currentTimeMillis.
//
// For compatibilty with instrumentation that precede this field, collectors
// For compatibility with instrumentation that precede this field, collectors
// or span stores can derive this via Annotation.timestamp.
// For example, SERVER_RECV.timestamp or CLIENT_SEND.timestamp.
//
@ -741,7 +741,7 @@ func (p *BinaryAnnotation) String() string {
// precise measurement decoupled from problems of clocks, such as skew or NTP
// updates causing time to move backwards.
//
// For compatibilty with instrumentation that precede this field, collectors
// For compatibility with instrumentation that precede this field, collectors
// or span stores can derive this by subtracting Annotation.timestamp.
// For example, SERVER_SEND.timestamp - SERVER_RECV.timestamp.
//

vendor/github.com/uber/jaeger-client-go/tracer.go

@ -52,6 +52,7 @@ type Tracer struct {
highTraceIDGenerator func() uint64 // custom high trace ID generator
maxTagValueLength int
noDebugFlagOnForcedSampling bool
maxLogsPerSpan int
// more options to come
}
// allocator of Span objects

vendor/github.com/uber/jaeger-client-go/tracer_options.go

@ -144,6 +144,18 @@ func (tracerOptions) MaxTagValueLength(maxTagValueLength int) TracerOption {
}
}
// MaxLogsPerSpan limits the number of Logs in a span (if set to a nonzero
// value). If a span has more logs than this value, logs are dropped as
// necessary (and replaced with a log describing how many were dropped).
//
// About half of the MaxLogsPerSpan logs kept are the oldest logs, and about
// half are the newest logs.
func (tracerOptions) MaxLogsPerSpan(maxLogsPerSpan int) TracerOption {
return func(tracer *Tracer) {
tracer.options.maxLogsPerSpan = maxLogsPerSpan
}
}
func (tracerOptions) ZipkinSharedRPCSpan(zipkinSharedRPCSpan bool) TracerOption {
return func(tracer *Tracer) {
tracer.options.zipkinSharedRPCSpan = zipkinSharedRPCSpan
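
The same limit can be applied when constructing the tracer directly rather than through the config package; a sketch with an arbitrary service name and limit:

```go
package tracing

import (
	"io"

	"github.com/opentracing/opentracing-go"
	"github.com/uber/jaeger-client-go"
)

// newCappedTracer keeps roughly the 25 oldest and 25 newest log records per span.
func newCappedTracer() (opentracing.Tracer, io.Closer) {
	return jaeger.NewTracer(
		"podman",
		jaeger.NewConstSampler(true),
		jaeger.NewNullReporter(),
		jaeger.TracerOptions.MaxLogsPerSpan(50),
	)
}
```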

vendor/github.com/uber/jaeger-client-go/transport/http.go

@ -39,6 +39,7 @@ type HTTPTransport struct {
spans []*j.Span
process *j.Process
httpCredentials *HTTPBasicAuthCredentials
headers map[string]string
}
// HTTPBasicAuthCredentials stores credentials for HTTP basic auth.
@ -76,6 +77,13 @@ func HTTPRoundTripper(transport http.RoundTripper) HTTPOption {
}
}
// HTTPHeaders defines the HTTP headers that will be attached to the jaeger client's HTTP request
func HTTPHeaders(headers map[string]string) HTTPOption {
return func(c *HTTPTransport) {
c.headers = headers
}
}
// NewHTTPTransport returns a new HTTP-backend transport. url should be an http
// url of the collector to handle POST request, typically something like:
// http://hostname:14268/api/traces?format=jaeger.thrift
@ -136,6 +144,9 @@ func (c *HTTPTransport) send(spans []*j.Span) error {
return err
}
req.Header.Set("Content-Type", "application/x-thrift")
for k, v := range c.headers {
req.Header.Set(k, v)
}
if c.httpCredentials != nil {
req.SetBasicAuth(c.httpCredentials.username, c.httpCredentials.password)
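
The headers can also be attached when building the HTTP sender by hand, which is effectively what config.newTransport now does. Sketch with a placeholder URL and header value:

```go
package tracing

import (
	"github.com/uber/jaeger-client-go"
	"github.com/uber/jaeger-client-go/transport"
)

// newHTTPReporter posts spans to the collector with an extra header on every request.
func newHTTPReporter() jaeger.Reporter {
	sender := transport.NewHTTPTransport(
		"http://jaeger-collector:14268/api/traces",
		transport.HTTPBatchSize(1),
		transport.HTTPHeaders(map[string]string{"X-Tenant": "example"}),
	)
	return jaeger.NewRemoteReporter(sender)
}
```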

vendor/github.com/uber/jaeger-client-go/transport_udp.go

@ -18,8 +18,8 @@ import (
"errors"
"fmt"
"github.com/uber/jaeger-client-go/internal/reporterstats"
"github.com/uber/jaeger-client-go/thrift"
j "github.com/uber/jaeger-client-go/thrift-gen/jaeger"
"github.com/uber/jaeger-client-go/utils"
)
@ -27,12 +27,14 @@ import (
// Empirically obtained constant for how many bytes in the message are used for envelope.
// The total datagram size is:
// sizeof(Span) * numSpans + processByteSize + emitBatchOverhead <= maxPacketSize
// There is a unit test `TestEmitBatchOverhead` that validates this number.
//
// Note that due to the use of Compact Thrift protocol, overhead grows with the number of spans
// in the batch, because the length of the list is encoded as varint32, as well as SeqId.
const emitBatchOverhead = 30
//
// There is a unit test `TestEmitBatchOverhead` that validates this number, it fails at <68.
const emitBatchOverhead = 70
var errSpanTooLarge = errors.New("Span is too large")
var errSpanTooLarge = errors.New("span is too large")
type udpSender struct {
client *utils.AgentClientUDP
@ -44,9 +46,19 @@ type udpSender struct {
thriftProtocol thrift.TProtocol
process *j.Process
processByteSize int
// reporterStats provides access to stats that are only known to Reporter
reporterStats reporterstats.ReporterStats
// The following counters are always non-negative, but we need to send them in signed i64 Thrift fields,
// so we keep them as signed. At 10k QPS, overflow happens in about 300 million years.
batchSeqNo int64
tooLargeDroppedSpans int64
failedToEmitSpans int64
}
// NewUDPTransport creates a reporter that submits spans to jaeger-agent
// NewUDPTransport creates a reporter that submits spans to jaeger-agent.
// TODO: (breaking change) move to transport/ package.
func NewUDPTransport(hostPort string, maxPacketSize int) (Transport, error) {
if len(hostPort) == 0 {
hostPort = fmt.Sprintf("%s:%d", DefaultUDPSpanServerHost, DefaultUDPSpanServerPort)
@ -66,17 +78,22 @@ func NewUDPTransport(hostPort string, maxPacketSize int) (Transport, error) {
return nil, err
}
sender := &udpSender{
return &udpSender{
client: client,
maxSpanBytes: maxPacketSize - emitBatchOverhead,
thriftBuffer: thriftBuffer,
thriftProtocol: thriftProtocol}
return sender, nil
thriftProtocol: thriftProtocol,
}, nil
}
// SetReporterStats implements reporterstats.Receiver.
func (s *udpSender) SetReporterStats(rs reporterstats.ReporterStats) {
s.reporterStats = rs
}
func (s *udpSender) calcSizeOfSerializedThrift(thriftStruct thrift.TStruct) int {
s.thriftBuffer.Reset()
thriftStruct.Write(s.thriftProtocol)
_ = thriftStruct.Write(s.thriftProtocol)
return s.thriftBuffer.Len()
}
@ -89,6 +106,7 @@ func (s *udpSender) Append(span *Span) (int, error) {
jSpan := BuildJaegerThrift(span)
spanSize := s.calcSizeOfSerializedThrift(jSpan)
if spanSize > s.maxSpanBytes {
s.tooLargeDroppedSpans++
return 1, errSpanTooLarge
}
@ -112,9 +130,18 @@ func (s *udpSender) Flush() (int, error) {
if n == 0 {
return 0, nil
}
err := s.client.EmitBatch(&j.Batch{Process: s.process, Spans: s.spanBuffer})
s.batchSeqNo++
batchSeqNo := int64(s.batchSeqNo)
err := s.client.EmitBatch(&j.Batch{
Process: s.process,
Spans: s.spanBuffer,
SeqNo: &batchSeqNo,
Stats: s.makeStats(),
})
s.resetBuffers()
if err != nil {
s.failedToEmitSpans += int64(n)
}
return n, err
}
@ -129,3 +156,15 @@ func (s *udpSender) resetBuffers() {
s.spanBuffer = s.spanBuffer[:0]
s.byteBufferSize = s.processByteSize
}
func (s *udpSender) makeStats() *j.ClientStats {
var dropped int64
if s.reporterStats != nil {
dropped = s.reporterStats.SpansDroppedFromQueue()
}
return &j.ClientStats{
FullQueueDroppedSpans: dropped,
TooLargeDroppedSpans: s.tooLargeDroppedSpans,
FailedToEmitSpans: s.failedToEmitSpans,
}
}
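
Two practical consequences of this hunk: every batch now carries a sequence number and ClientStats, and the per-span size budget shrinks slightly because emitBatchOverhead grew from 30 to 70 bytes. Back-of-the-envelope arithmetic, assuming the library's default 65000-byte UDP packet limit (the utils.UDPPacketMaxLength constant; an assumption, not shown in this diff):

```go
package main

import "fmt"

func main() {
	const udpPacketMaxLength = 65000 // assumed default from the utils package
	const oldOverhead, newOverhead = 30, 70

	// 64970 -> 64930: spans above the new limit are rejected with errSpanTooLarge
	// and counted in tooLargeDroppedSpans, which is now reported to the agent.
	fmt.Println(udpPacketMaxLength-oldOverhead, "->", udpPacketMaxLength-newOverhead)
}
```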

vendor/github.com/uber/jaeger-client-go/utils/udp_client.go

@ -85,7 +85,7 @@ func (a *AgentClientUDP) EmitBatch(batch *jaeger.Batch) error {
return err
}
if a.thriftBuffer.Len() > a.maxPacketSize {
return fmt.Errorf("Data does not fit within one UDP packet; size %d, max %d, spans %d",
return fmt.Errorf("data does not fit within one UDP packet; size %d, max %d, spans %d",
a.thriftBuffer.Len(), a.maxPacketSize, len(batch.Spans))
}
_, err := a.connUDP.Write(a.thriftBuffer.Bytes())

vendor/github.com/uber/jaeger-client-go/zipkin_thrift_span.go

@ -40,6 +40,7 @@ var specialTagHandlers = map[string]func(*zipkinSpan, interface{}){
}
// BuildZipkinThrift builds thrift span based on internal span.
// TODO: (breaking change) move to transport/zipkin and make private.
func BuildZipkinThrift(s *Span) *z.Span {
span := &zipkinSpan{Span: s}
span.handleSpecialTags()

vendor/modules.txt

@ -478,11 +478,12 @@ github.com/stretchr/testify/require
github.com/syndtr/gocapability/capability
# github.com/tchap/go-patricia v2.3.0+incompatible
github.com/tchap/go-patricia/patricia
# github.com/uber/jaeger-client-go v2.20.1+incompatible
# github.com/uber/jaeger-client-go v2.22.1+incompatible
github.com/uber/jaeger-client-go
github.com/uber/jaeger-client-go/config
github.com/uber/jaeger-client-go/internal/baggage
github.com/uber/jaeger-client-go/internal/baggage/remote
github.com/uber/jaeger-client-go/internal/reporterstats
github.com/uber/jaeger-client-go/internal/spanlog
github.com/uber/jaeger-client-go/internal/throttler
github.com/uber/jaeger-client-go/internal/throttler/remote