Skip to content

Commit bfbab3a

Browse files
committed
Fix race condition in ConcurrentMultiSpanProcessor.shutdown()
Fixed error during shutdown: 'cannot schedule new futures after shutdow'. Modified shutdown sequence to handle executor already being in shutdown state. Added direct processor shutdown when executor is unavailable. Added regression test for executor shutdown race condition.
1 parent afec2dd commit bfbab3a

File tree

4 files changed

+77
-11
lines changed

4 files changed

+77
-11
lines changed

CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
3232
([#4406](https://github.com/open-telemetry/opentelemetry-python/pull/4406))
3333
- Fix env var error message for TraceLimits/SpanLimits
3434
([#4458](https://github.com/open-telemetry/opentelemetry-python/pull/4458))
35+
- Fix race condition in ConcurrentMultiSpanProcessor shutdown
36+
([#4462](https://github.com/open-telemetry/opentelemetry-python/pull/4462))
3537

3638
## Version 1.30.0/0.51b0 (2025-02-03)
3739

failing_command.py

+19
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
from opentelemetry import trace
2+
from opentelemetry.sdk.trace import (
3+
ConcurrentMultiSpanProcessor,
4+
TracerProvider,
5+
)
6+
from opentelemetry.sdk.trace.export import (
7+
BatchSpanProcessor,
8+
ConsoleSpanExporter,
9+
)
10+
11+
span_proc = ConcurrentMultiSpanProcessor()
12+
span_proc.add_span_processor(BatchSpanProcessor(ConsoleSpanExporter()))
13+
14+
trace.set_tracer_provider(TracerProvider())
15+
trace.get_tracer_provider().add_span_processor(span_proc)
16+
17+
tracer = trace.get_tracer(__name__)
18+
with tracer.start_as_current_span("foo"):
19+
print("Hello world!")

opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py

+29-10
Original file line numberDiff line numberDiff line change
@@ -221,15 +221,18 @@ class ConcurrentMultiSpanProcessor(SpanProcessor):
221221
def __init__(self, num_threads: int = 2):
222222
# use a tuple to avoid race conditions when adding a new span and
223223
# iterating through it on "on_start" and "on_end".
224-
self._span_processors = () # type: Tuple[SpanProcessor, ...]
224+
self._span_processors: tuple[SpanProcessor] = ()
225225
self._lock = threading.Lock()
226226
self._executor = concurrent.futures.ThreadPoolExecutor(
227227
max_workers=num_threads
228228
)
229+
self._is_shutdown: bool = False
229230

230231
def add_span_processor(self, span_processor: SpanProcessor) -> None:
231232
"""Adds a SpanProcessor to the list handled by this instance."""
232233
with self._lock:
234+
if self._is_shutdown:
235+
return
233236
self._span_processors += (span_processor,)
234237

235238
def _submit_and_await(
@@ -239,9 +242,13 @@ def _submit_and_await(
239242
**kwargs: Any,
240243
):
241244
futures = []
242-
for sp in self._span_processors:
243-
future = self._executor.submit(func(sp), *args, **kwargs)
244-
futures.append(future)
245+
with self._lock:
246+
if self._is_shutdown:
247+
return
248+
for sp in self._span_processors:
249+
future = self._executor.submit(func(sp), *args, **kwargs)
250+
futures.append(future)
251+
245252
for future in futures:
246253
future.result()
247254

@@ -258,8 +265,16 @@ def on_end(self, span: "ReadableSpan") -> None:
258265
self._submit_and_await(lambda sp: sp.on_end, span)
259266

260267
def shutdown(self) -> None:
261-
"""Shuts down all underlying span processors in parallel."""
262-
self._submit_and_await(lambda sp: sp.shutdown)
268+
"""Shuts down all underlying span processors and the executor."""
269+
with self._lock:
270+
if self._is_shutdown:
271+
return
272+
self._is_shutdown = True
273+
274+
for sp in self._span_processors:
275+
sp.shutdown()
276+
277+
self._executor.shutdown(wait=True)
263278

264279
def force_flush(self, timeout_millis: int = 30000) -> bool:
265280
"""Calls force_flush on all underlying span processors in parallel.
@@ -272,10 +287,14 @@ def force_flush(self, timeout_millis: int = 30000) -> bool:
272287
True if all span processors flushed their spans within the given
273288
timeout, False otherwise.
274289
"""
275-
futures = []
276-
for sp in self._span_processors: # type: SpanProcessor
277-
future = self._executor.submit(sp.force_flush, timeout_millis)
278-
futures.append(future)
290+
with self._lock:
291+
if self._is_shutdown:
292+
return False
293+
294+
futures = []
295+
for sp in self._span_processors:
296+
future = self._executor.submit(sp.force_flush, timeout_millis)
297+
futures.append(future)
279298

280299
timeout_sec = timeout_millis / 1e3
281300
done_futures, not_done_futures = concurrent.futures.wait(

opentelemetry-sdk/tests/trace/test_span_processor.py

+27-1
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@
1111
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
14-
1514
import abc
15+
import threading
1616
import time
1717
import typing
1818
import unittest
@@ -26,6 +26,10 @@
2626
from opentelemetry import trace as trace_api
2727
from opentelemetry.context import Context
2828
from opentelemetry.sdk import trace
29+
from opentelemetry.sdk.trace.export import (
30+
BatchSpanProcessor,
31+
ConsoleSpanExporter,
32+
)
2933

3034

3135
def span_event_start_fmt(span_processor_name, span_name):
@@ -314,3 +318,25 @@ def test_force_flush_late_by_span_processor(self):
314318
for mock_processor in mocks:
315319
self.assertEqual(1, mock_processor.force_flush.call_count)
316320
multi_processor.shutdown()
321+
322+
def test_executor_shutdown(self):
323+
span_proc = trace.ConcurrentMultiSpanProcessor()
324+
span_proc.add_span_processor(BatchSpanProcessor(ConsoleSpanExporter()))
325+
326+
provider = trace.TracerProvider()
327+
provider.add_span_processor(span_proc)
328+
329+
tracer = provider.get_tracer(__name__)
330+
with tracer.start_as_current_span("foo"):
331+
pass
332+
333+
shutdown_thread = threading.Thread(target=span_proc._executor.shutdown) # pylint: disable=protected-access
334+
shutdown_thread.start()
335+
336+
try:
337+
span_proc.shutdown()
338+
shutdown_thread.join()
339+
except RuntimeError as unexpected_error:
340+
self.fail(
341+
f"Unexpected error raised during shutdown: {unexpected_error}"
342+
)

0 commit comments

Comments
 (0)