opentelemetry In Memory SpanExporter 在测试用例完成后不重置

open telemetry InMemorySpanExporter not reseting after test case is completed

打开遥测 InMemorySpanExporter 测试 class 完成后未重置。这导致 span_list = self.memory_exporter.get_finished_spans() 对于第二次测试 class.

为空
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
from contextlib import contextmanager
import unittest

tracer = trace.get_tracer(__name__)


@contextmanager
def method(name):
    with tracer.start_as_current_span(name) as span:
        try:
            yield span
        except Exception as error:
            span.record_exception(error)
            raise
          
          
class OpenTelemetryBase(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        cls.tracer_provider = TracerProvider()
        cls.memory_exporter = InMemorySpanExporter()
        cls.span_processor = SimpleSpanProcessor(cls.memory_exporter)
        cls.tracer_provider.add_span_processor(cls.span_processor)
        trace.set_tracer_provider(cls.tracer_provider)

    def tearDown(self):
        self.memory_exporter.clear()

class TestTracing(OpenTelemetryBase):
    def test_method_1(self):
        with self.assertRaises(Exception):
            with method("TestTracing1") as span:
                raise Exception("Failed trace 1")
        span_list = self.memory_exporter.get_finished_spans()
        self.assertEqual(len(span_list), 1)
        self.assertEqual(span_list[0].status.description, "Exception: Failed trace 1")
        self.assertEqual(span_list[0].name, "TestTracing1")


class TestTracingB(OpenTelemetryBase):
    def test_method_2(self):
        with self.assertRaises(Exception):
            with method("TestTracingB1") as span:
                raise Exception("Failed traceB 1")
        span_list = self.memory_exporter.get_finished_spans()
        self.assertEqual(len(span_list), 1)
        self.assertEqual(span_list[0].status.description, "Exception: Failed traceB 1")
        self.assertEqual(span_list[0].name, "TestTracingB1")

命令 运行 仅测试 class 1:py.test tracing.py::TestTracing:执行成功。

命令 运行 仅测试 class 2:py.test tracing.py::TestTracingB:执行成功。

但是命令 运行 两者一起测试 class:py.test tracing.py

输出:tracing.py .F第一次成功,第二次失败。

FAILED tracing.py::TestTracingB::test_method_2 - AssertionError: 0 != 1

即使我每次测试都使用了 memory_exporter.clear() 万亿次。

对于 setUpClass 方法,如果我将其更改为设置,则同一个 class 中的多个测试将开始失败,只有第一个通过,其余的将失败。

您的描述不清楚,但我可以分享您为什么 运行 进入 AssertionError。发生这种情况是因为我们不允许在已经设置后设置全局跟踪器提供程序; link to code which does that。只能有一个全球示踪剂供应商。因此,当在第二次测试中调用 trace.set_tracer_provider 时,它会在不执行任何操作的情况下记录警告,因此您第二次尝试设置管道失败,即第二个导出器从未收到跨度。

Srikanth Chekuri 先发制人:这里的基本问题是对 set_tracer_provider 的第二次调用没有做任何事情,您最终使用与第一次设置调用相同的导出器。

不幸的是,跟踪器提供程序是全局的,只配置一次,并且库没有提供一种简单的方法来更换提供程序或跨度处理器以进行此类测试。

就像暴力招致暴力一样,全局变量会导致全局变量。解决此问题的一种(丑陋的)方法是使您的测试导出器也成为全局的,并且仅在初始化测试时将其附加到跟踪提供程序。这意味着 OpenTelemetryBase 测试的所有实例将共享同一个导出器,因此您可以在拆卸时清除已完成的跨度,并且测试可能无法并行进行。

from contextlib import contextmanager
import logging
import unittest

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter


logger = logging.getLogger(__name__)

# Only here for demonstration reasons
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.DEBUG)

# Typically called somewhere else. If this isn't called anywhere we get the
# default ProxyTracerProvider provider, which doesn't support adding span
# processors. This is usually only the case if the OT SDK isn't installed.
trace.set_tracer_provider(TracerProvider())

_TEST_OT_EXPORTER = None
_TEST_OT_PROVIDER_INITIALIZED = False

def get_test_ot_exporter():
    global _TEST_OT_EXPORTER

    if _TEST_OT_EXPORTER is None:
        _TEST_OT_EXPORTER = InMemorySpanExporter()
    return _TEST_OT_EXPORTER

def use_test_ot_exporter():
    global _TEST_OT_PROVIDER_INITIALIZED

    if _TEST_OT_PROVIDER_INITIALIZED:
        logger.info("Skipping repeated call to use_test_ot_exporter")
        return

    provider = trace.get_tracer_provider()
    if not hasattr(provider, 'add_span_processor'):
        logger.warn("OT TracerProvider has no add_span_processor. Is the OT "
                    "SDK installed?")
        return
    provider.add_span_processor(SimpleSpanProcessor(get_test_ot_exporter()))
    _TEST_OT_PROVIDER_INITIALIZED = True


@contextmanager
def method(name):
    tracer = trace.get_tracer(__name__)
    with tracer.start_as_current_span(name) as span:
        try:
            yield span
        except Exception as error:
            span.record_exception(error)
            raise


class OpenTelemetryBase(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        use_test_ot_exporter()
        cls.ot_exporter = get_test_ot_exporter()

    def tearDown(self):
        self.ot_exporter.clear()


class TestTracing(OpenTelemetryBase):
    def test_method_1(self):
        with self.assertRaises(Exception):
            with method("TestTracing1") as span:
                raise Exception("Failed trace 1")

        span_list = self.ot_exporter.get_finished_spans()
        self.assertEqual(len(span_list), 1)
        self.assertEqual(
            span_list[0].status.description, "Exception: Failed trace 1"
        )
        self.assertEqual(span_list[0].name, "TestTracing1")


class TestTracingB(OpenTelemetryBase):
    def test_method_2(self):
        with self.assertRaises(Exception):
            with method("TestTracingB1") as span:
                raise Exception("Failed traceB 1")

        span_list = self.ot_exporter.get_finished_spans()
        self.assertEqual(len(span_list), 1)
        self.assertEqual(
            span_list[0].status.description, "Exception: Failed traceB 1"
        )
        self.assertEqual(span_list[0].name, "TestTracingB1")

直到 OT python 库添加了一种更好的方法来临时替换 tracer 提供程序,或者临时向全局提供程序添加 span 处理器或导出器,这可能是执行此操作的侵入性最小的方法。