From e83c4905634aa1089cf71226cc03f93498e5d3fd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mauricio=20V=C3=A1squez?= Date: Wed, 29 Jan 2020 11:41:14 -0500 Subject: [PATCH 1/3] sdk: Implement force_flush for span processors https://github.com/open-telemetry/opentelemetry-specification/pull/370 added the requirement to have a "force_flush" method in the span processors. This commit exposes an already existing internal method on the batch span processor that does exactly the same, it also adds it to the span processor interface and as a no-op to the simple span processor. --- .../src/opentelemetry/sdk/trace/__init__.py | 5 +++++ .../src/opentelemetry/sdk/trace/export/__init__.py | 7 +++++-- opentelemetry-sdk/tests/trace/export/test_export.py | 9 ++++++--- 3 files changed, 16 insertions(+), 5 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py index 9829c8b33ba..21e5a50b9bd 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py @@ -70,6 +70,11 @@ def shutdown(self) -> None: """Called when a :class:`opentelemetry.sdk.trace.Tracer` is shutdown. """ + def force_flush(self) -> None: + """Export all ended spans to the configured Exporter that have not + yet been exported. + """ + class MultiSpanProcessor(SpanProcessor): """Implementation of :class:`SpanProcessor` that forwards all received diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py index b70fb010190..5db2c1e957d 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py @@ -83,6 +83,9 @@ def on_end(self, span: Span) -> None: def shutdown(self) -> None: self.span_exporter.shutdown() + def force_flush(self) -> None: + pass + class BatchExportSpanProcessor(SpanProcessor): """Batch span processor implementation. @@ -171,7 +174,7 @@ def worker(self): timeout = self.schedule_delay_millis / 1e3 - duration # be sure that all spans are sent - self._flush() + self.force_flush() def export(self) -> None: """Exports at most max_export_batch_size spans.""" @@ -197,7 +200,7 @@ def export(self) -> None: for index in range(idx): self.spans_list[index] = None - def _flush(self): + def force_flush(self): # export all elements until queue is empty while self.queue: self.export() diff --git a/opentelemetry-sdk/tests/trace/export/test_export.py b/opentelemetry-sdk/tests/trace/export/test_export.py index 54fdee2629b..e65220e364f 100644 --- a/opentelemetry-sdk/tests/trace/export/test_export.py +++ b/opentelemetry-sdk/tests/trace/export/test_export.py @@ -108,9 +108,10 @@ def test_batch_span_processor(self): for name in span_names: _create_start_and_end_span(name, span_processor) - span_processor.shutdown() + span_processor.force_flush() self.assertListEqual(span_names, spans_names_list) + span_processor.shutdown() self.assertTrue(my_exporter.is_shutdown) def test_batch_span_processor_lossless(self): @@ -127,8 +128,9 @@ def test_batch_span_processor_lossless(self): for _ in range(512): _create_start_and_end_span("foo", span_processor) - span_processor.shutdown() + span_processor.force_flush() self.assertEqual(len(spans_names_list), 512) + span_processor.shutdown() def test_batch_span_processor_many_spans(self): """Test that no spans are lost when sending many spans""" @@ -150,8 +152,9 @@ def test_batch_span_processor_many_spans(self): time.sleep(0.05) # give some time for the exporter to upload spans - span_processor.shutdown() + span_processor.force_flush() self.assertEqual(len(spans_names_list), 1024) + span_processor.shutdown() def test_batch_span_processor_scheduled_delay(self): """Test that spans are exported each schedule_delay_millis""" From 4cf3c3c6761b7ca8dbb7f0a0ce91243f1c18bf49 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mauricio=20V=C3=A1squez?= Date: Mon, 3 Feb 2020 11:35:07 -0500 Subject: [PATCH 2/3] Improve testing: Test that: - processor works after call to force_flush() - shutdown() flushes the processor --- .../tests/trace/export/test_export.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/opentelemetry-sdk/tests/trace/export/test_export.py b/opentelemetry-sdk/tests/trace/export/test_export.py index e65220e364f..00266f9e3d3 100644 --- a/opentelemetry-sdk/tests/trace/export/test_export.py +++ b/opentelemetry-sdk/tests/trace/export/test_export.py @@ -103,17 +103,25 @@ def test_batch_span_processor(self): my_exporter = MySpanExporter(destination=spans_names_list) span_processor = export.BatchExportSpanProcessor(my_exporter) - span_names = ["xxx", "bar", "foo"] + span_names0 = ["xxx", "bar", "foo"] + span_names1 = ["yyy", "baz", "fox"] - for name in span_names: + for name in span_names0: _create_start_and_end_span(name, span_processor) span_processor.force_flush() - self.assertListEqual(span_names, spans_names_list) + self.assertListEqual(span_names0, spans_names_list) + + # create some more spans to check that span processor still works + for name in span_names1: + _create_start_and_end_span(name, span_processor) span_processor.shutdown() self.assertTrue(my_exporter.is_shutdown) + # check that processor is flushed after shutdown() + self.assertListEqual(span_names0 + span_names1, spans_names_list) + def test_batch_span_processor_lossless(self): """Test that no spans are lost when sending max_queue_size spans""" spans_names_list = [] From 7612e6e484a274112c9ebc9504a42ac4bbafd832 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mauricio=20V=C3=A1squez?= Date: Tue, 4 Feb 2020 13:30:18 -0500 Subject: [PATCH 3/3] Separate tests for shutdown and force_flush --- .../tests/trace/export/test_export.py | 27 +++++++++++++++---- 1 file changed, 22 insertions(+), 5 deletions(-) diff --git a/opentelemetry-sdk/tests/trace/export/test_export.py b/opentelemetry-sdk/tests/trace/export/test_export.py index 00266f9e3d3..43299ebe6a4 100644 --- a/opentelemetry-sdk/tests/trace/export/test_export.py +++ b/opentelemetry-sdk/tests/trace/export/test_export.py @@ -97,7 +97,25 @@ def _create_start_and_end_span(name, span_processor): class TestBatchExportSpanProcessor(unittest.TestCase): - def test_batch_span_processor(self): + def test_shutdown(self): + spans_names_list = [] + + my_exporter = MySpanExporter(destination=spans_names_list) + span_processor = export.BatchExportSpanProcessor(my_exporter) + + span_names = ["xxx", "bar", "foo"] + + for name in span_names: + _create_start_and_end_span(name, span_processor) + + span_processor.shutdown() + self.assertTrue(my_exporter.is_shutdown) + + # check that spans are exported without an explicitly call to + # force_flush() + self.assertListEqual(span_names, spans_names_list) + + def test_flush(self): spans_names_list = [] my_exporter = MySpanExporter(destination=spans_names_list) @@ -116,12 +134,11 @@ def test_batch_span_processor(self): for name in span_names1: _create_start_and_end_span(name, span_processor) - span_processor.shutdown() - self.assertTrue(my_exporter.is_shutdown) - - # check that processor is flushed after shutdown() + span_processor.force_flush() self.assertListEqual(span_names0 + span_names1, spans_names_list) + span_processor.shutdown() + def test_batch_span_processor_lossless(self): """Test that no spans are lost when sending max_queue_size spans""" spans_names_list = []