mirror of
https://github.com/open-telemetry/opentelemetry-python-contrib.git
synced 2025-07-29 21:23:55 +08:00
Bugfix/set default context in pika (#719)
* Add a default context when one does not exist * Split the if to fit C0325: Unnecessary parens after 'not' keyword * Split the context retrieving according to the function
This commit is contained in:
@ -41,15 +41,20 @@ def _decorate_callback(
|
|||||||
) -> Any:
|
) -> Any:
|
||||||
if not properties:
|
if not properties:
|
||||||
properties = BasicProperties()
|
properties = BasicProperties()
|
||||||
|
if properties.headers is None:
|
||||||
|
properties.headers = {}
|
||||||
|
ctx = propagate.extract(properties.headers, getter=_pika_getter)
|
||||||
|
if not ctx:
|
||||||
|
ctx = context.get_current()
|
||||||
span = _get_span(
|
span = _get_span(
|
||||||
tracer,
|
tracer,
|
||||||
channel,
|
channel,
|
||||||
properties,
|
properties,
|
||||||
task_name=task_name,
|
task_name=task_name,
|
||||||
|
ctx=ctx,
|
||||||
operation=MessagingOperationValues.RECEIVE,
|
operation=MessagingOperationValues.RECEIVE,
|
||||||
)
|
)
|
||||||
with trace.use_span(span, end_on_exit=True):
|
with trace.use_span(span, end_on_exit=True):
|
||||||
propagate.inject(properties.headers)
|
|
||||||
retval = callback(channel, method, properties, body)
|
retval = callback(channel, method, properties, body)
|
||||||
return retval
|
return retval
|
||||||
|
|
||||||
@ -70,11 +75,13 @@ def _decorate_basic_publish(
|
|||||||
) -> Any:
|
) -> Any:
|
||||||
if not properties:
|
if not properties:
|
||||||
properties = BasicProperties()
|
properties = BasicProperties()
|
||||||
|
ctx = context.get_current()
|
||||||
span = _get_span(
|
span = _get_span(
|
||||||
tracer,
|
tracer,
|
||||||
channel,
|
channel,
|
||||||
properties,
|
properties,
|
||||||
task_name="(temporary)",
|
task_name="(temporary)",
|
||||||
|
ctx=ctx,
|
||||||
operation=None,
|
operation=None,
|
||||||
)
|
)
|
||||||
if not span:
|
if not span:
|
||||||
@ -97,11 +104,9 @@ def _get_span(
|
|||||||
channel: Channel,
|
channel: Channel,
|
||||||
properties: BasicProperties,
|
properties: BasicProperties,
|
||||||
task_name: str,
|
task_name: str,
|
||||||
|
ctx: context.Context,
|
||||||
operation: Optional[MessagingOperationValues] = None,
|
operation: Optional[MessagingOperationValues] = None,
|
||||||
) -> Optional[Span]:
|
) -> Optional[Span]:
|
||||||
if properties.headers is None:
|
|
||||||
properties.headers = {}
|
|
||||||
ctx = propagate.extract(properties.headers, getter=_pika_getter)
|
|
||||||
if context.get_value("suppress_instrumentation") or context.get_value(
|
if context.get_value("suppress_instrumentation") or context.get_value(
|
||||||
_SUPPRESS_INSTRUMENTATION_KEY
|
_SUPPRESS_INSTRUMENTATION_KEY
|
||||||
):
|
):
|
||||||
|
@ -23,9 +23,7 @@ class TestUtils(TestCase):
|
|||||||
@mock.patch("opentelemetry.context.get_value")
|
@mock.patch("opentelemetry.context.get_value")
|
||||||
@mock.patch("opentelemetry.instrumentation.pika.utils._generate_span_name")
|
@mock.patch("opentelemetry.instrumentation.pika.utils._generate_span_name")
|
||||||
@mock.patch("opentelemetry.instrumentation.pika.utils._enrich_span")
|
@mock.patch("opentelemetry.instrumentation.pika.utils._enrich_span")
|
||||||
@mock.patch("opentelemetry.propagate.extract")
|
|
||||||
def test_get_span(
|
def test_get_span(
|
||||||
extract: mock.MagicMock,
|
|
||||||
enrich_span: mock.MagicMock,
|
enrich_span: mock.MagicMock,
|
||||||
generate_span_name: mock.MagicMock,
|
generate_span_name: mock.MagicMock,
|
||||||
get_value: mock.MagicMock,
|
get_value: mock.MagicMock,
|
||||||
@ -35,21 +33,19 @@ class TestUtils(TestCase):
|
|||||||
properties = mock.MagicMock()
|
properties = mock.MagicMock()
|
||||||
task_name = "test.test"
|
task_name = "test.test"
|
||||||
get_value.return_value = None
|
get_value.return_value = None
|
||||||
_ = utils._get_span(tracer, channel, properties, task_name)
|
ctx = mock.MagicMock()
|
||||||
extract.assert_called_once()
|
_ = utils._get_span(tracer, channel, properties, task_name, ctx)
|
||||||
generate_span_name.assert_called_once()
|
generate_span_name.assert_called_once()
|
||||||
tracer.start_span.assert_called_once_with(
|
tracer.start_span.assert_called_once_with(
|
||||||
context=extract.return_value, name=generate_span_name.return_value
|
context=ctx, name=generate_span_name.return_value
|
||||||
)
|
)
|
||||||
enrich_span.assert_called_once()
|
enrich_span.assert_called_once()
|
||||||
|
|
||||||
@mock.patch("opentelemetry.context.get_value")
|
@mock.patch("opentelemetry.context.get_value")
|
||||||
@mock.patch("opentelemetry.instrumentation.pika.utils._generate_span_name")
|
@mock.patch("opentelemetry.instrumentation.pika.utils._generate_span_name")
|
||||||
@mock.patch("opentelemetry.instrumentation.pika.utils._enrich_span")
|
@mock.patch("opentelemetry.instrumentation.pika.utils._enrich_span")
|
||||||
@mock.patch("opentelemetry.propagate.extract")
|
|
||||||
def test_get_span_suppressed(
|
def test_get_span_suppressed(
|
||||||
self,
|
self,
|
||||||
extract: mock.MagicMock,
|
|
||||||
enrich_span: mock.MagicMock,
|
enrich_span: mock.MagicMock,
|
||||||
generate_span_name: mock.MagicMock,
|
generate_span_name: mock.MagicMock,
|
||||||
get_value: mock.MagicMock,
|
get_value: mock.MagicMock,
|
||||||
@ -59,10 +55,11 @@ class TestUtils(TestCase):
|
|||||||
properties = mock.MagicMock()
|
properties = mock.MagicMock()
|
||||||
task_name = "test.test"
|
task_name = "test.test"
|
||||||
get_value.return_value = True
|
get_value.return_value = True
|
||||||
span = utils._get_span(tracer, channel, properties, task_name)
|
ctx = mock.MagicMock()
|
||||||
|
span = utils._get_span(tracer, channel, properties, task_name, ctx)
|
||||||
self.assertEqual(span, None)
|
self.assertEqual(span, None)
|
||||||
extract.assert_called_once()
|
|
||||||
generate_span_name.assert_not_called()
|
generate_span_name.assert_not_called()
|
||||||
|
enrich_span.assert_not_called()
|
||||||
|
|
||||||
def test_generate_span_name_no_operation(self) -> None:
|
def test_generate_span_name_no_operation(self) -> None:
|
||||||
task_name = "test.test"
|
task_name = "test.test"
|
||||||
|
Reference in New Issue
Block a user