Skip to content

Commit 9de06e5

Browse files
mauriciovasquezbernaltoumorokoshi
authored andcommitted
sdk: Implement force_flush for span processors (open-telemetry#389)
open-telemetry/opentelemetry-specification#370 added the requirement to have a "force_flush" method in the span processors. This commit exposes an already existing internal method on the batch span processor that does exactly the same, it also adds it to the span processor interface and as a no-op to the simple span processor.
1 parent a012e99 commit 9de06e5

File tree

3 files changed

+42
-6
lines changed

3 files changed

+42
-6
lines changed

opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py

+5
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,11 @@ def shutdown(self) -> None:
7070
"""Called when a :class:`opentelemetry.sdk.trace.Tracer` is shutdown.
7171
"""
7272

73+
def force_flush(self) -> None:
74+
"""Export all ended spans to the configured Exporter that have not
75+
yet been exported.
76+
"""
77+
7378

7479
class MultiSpanProcessor(SpanProcessor):
7580
"""Implementation of :class:`SpanProcessor` that forwards all received

opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py

+5-2
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,9 @@ def on_end(self, span: Span) -> None:
8383
def shutdown(self) -> None:
8484
self.span_exporter.shutdown()
8585

86+
def force_flush(self) -> None:
87+
pass
88+
8689

8790
class BatchExportSpanProcessor(SpanProcessor):
8891
"""Batch span processor implementation.
@@ -171,7 +174,7 @@ def worker(self):
171174
timeout = self.schedule_delay_millis / 1e3 - duration
172175

173176
# be sure that all spans are sent
174-
self._flush()
177+
self.force_flush()
175178

176179
def export(self) -> None:
177180
"""Exports at most max_export_batch_size spans."""
@@ -197,7 +200,7 @@ def export(self) -> None:
197200
for index in range(idx):
198201
self.spans_list[index] = None
199202

200-
def _flush(self):
203+
def force_flush(self):
201204
# export all elements until queue is empty
202205
while self.queue:
203206
self.export()

opentelemetry-sdk/tests/trace/export/test_export.py

+32-4
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ def _create_start_and_end_span(name, span_processor):
9797

9898

9999
class TestBatchExportSpanProcessor(unittest.TestCase):
100-
def test_batch_span_processor(self):
100+
def test_shutdown(self):
101101
spans_names_list = []
102102

103103
my_exporter = MySpanExporter(destination=spans_names_list)
@@ -109,9 +109,35 @@ def test_batch_span_processor(self):
109109
_create_start_and_end_span(name, span_processor)
110110

111111
span_processor.shutdown()
112+
self.assertTrue(my_exporter.is_shutdown)
113+
114+
# check that spans are exported without an explicitly call to
115+
# force_flush()
112116
self.assertListEqual(span_names, spans_names_list)
113117

114-
self.assertTrue(my_exporter.is_shutdown)
118+
def test_flush(self):
119+
spans_names_list = []
120+
121+
my_exporter = MySpanExporter(destination=spans_names_list)
122+
span_processor = export.BatchExportSpanProcessor(my_exporter)
123+
124+
span_names0 = ["xxx", "bar", "foo"]
125+
span_names1 = ["yyy", "baz", "fox"]
126+
127+
for name in span_names0:
128+
_create_start_and_end_span(name, span_processor)
129+
130+
span_processor.force_flush()
131+
self.assertListEqual(span_names0, spans_names_list)
132+
133+
# create some more spans to check that span processor still works
134+
for name in span_names1:
135+
_create_start_and_end_span(name, span_processor)
136+
137+
span_processor.force_flush()
138+
self.assertListEqual(span_names0 + span_names1, spans_names_list)
139+
140+
span_processor.shutdown()
115141

116142
def test_batch_span_processor_lossless(self):
117143
"""Test that no spans are lost when sending max_queue_size spans"""
@@ -127,8 +153,9 @@ def test_batch_span_processor_lossless(self):
127153
for _ in range(512):
128154
_create_start_and_end_span("foo", span_processor)
129155

130-
span_processor.shutdown()
156+
span_processor.force_flush()
131157
self.assertEqual(len(spans_names_list), 512)
158+
span_processor.shutdown()
132159

133160
def test_batch_span_processor_many_spans(self):
134161
"""Test that no spans are lost when sending many spans"""
@@ -150,8 +177,9 @@ def test_batch_span_processor_many_spans(self):
150177

151178
time.sleep(0.05) # give some time for the exporter to upload spans
152179

153-
span_processor.shutdown()
180+
span_processor.force_flush()
154181
self.assertEqual(len(spans_names_list), 1024)
182+
span_processor.shutdown()
155183

156184
def test_batch_span_processor_scheduled_delay(self):
157185
"""Test that spans are exported each schedule_delay_millis"""

0 commit comments

Comments
 (0)