From 15ce2bff63348bff78808e3d01437aacfbd0d432 Mon Sep 17 00:00:00 2001 From: Richard Si Date: Wed, 26 Feb 2025 23:15:28 -0500 Subject: [PATCH] =?UTF-8?q?Parallelize=20bytecode=20compilation=20?= =?UTF-8?q?=E2=9C=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Bytecode compilation is slow. It's often one of the biggest contributors to the install step's sluggishness. For better or worse, we can't really enable --no-compile by default as it has the potential to render certain workflows permanently slower in a subtle way.[^1] To improve the situation, bytecode compilation can be parallelized across a pool of processes (or sub-interpreters on Python 3.14). I've observed a 1.1x to 3x improvement in install step times locally.[^2] This patch has been written to be relatively comprehensible, but for posterity, these are the high-level implementation notes: - We can't use compileall.compile_dir() because it spins up a new worker pool on every invocation. If it's used as a "drop-in" replacement for compileall.compile_file(), then the pool creation overhead will be paid for every package installed. This is bad and kills most of the gains. Redesigning the installation logic to compile everything at the end was rejected for being too invasive (a key goal was to avoid affecting the package installation order). - A bytecode compiler is created right before package installation starts and reused for all packages. Depending on platform and workload, either a serial (in-process) compiler or parallel compiler will be used. They both have the same interface, accepting a batch of Python filepaths to compile. - This patch was designed to as low-risk as reasonably possible. pip does not contain any parallelized code, thus introducing any sort of parallelization poses a nontrivial risk. To minimize this risk, the only code parallelized is the bytecode compilation code itself (~10 LOC). In addition, the package install order is unaffected and pip will fall back to serial compilation if parallelization is unsupported. The criteria for parallelization are: 1. There are at least 2 CPUs available. The process CPU count is used if available, otherwise the system CPU count. If there is only one CPU, serial compilation will always be used because even a parallel compiler with one worker will add extra overhead. 2. The maximum amount of workers is at least 2. This is controlled by the --install-jobs option.[^3] It defaults to "auto" which uses the process/system CPU count.[^4] 3. There is "enough" code for parallelization to be "worth it". This criterion exists so pip won't waste (say) 100ms on spinning up a parallel compiler when compiling serially would only take 20ms.[^5] The limit is set to 1 MB of Python code. This is admittedly rather crude, but it seems to work well enough having tested on a variety of systems. [^1]: Basically, if the Python files are installed to a read-only directory, then importing those files will be permanently slower as the .pyc files will never be cached. This is quite subtle, enough so that we can't really expect newbies to recognise and know how to address this (there is the PYTHONPYCACHEPREFIX envvar, but if you're advanced enough to use it, then you are also advanced enough to know when to use uv or pip's --no-compile). [^2]: The 1.1x was on a painfully slow dual-core/HDD-equipped Windows install installing simply setuptools. The 3x was observed on my main 8-core Ryzen 5800HS Windows machine while installing pip's own test dependencies. [^3]: Yes, this is probably not the best name, but adding an option for just bytecode compilation seems silly. Anyway, this will give us room if we ever parallelize more parts of the install step. [^4]: Up to a hard-coded limit of 8 to avoid resource exhaustion. This number was chosen arbitrarily, but is definitely high enough to net a major improvement. [^5]: This is important because I don't want to slow down tiny installs (e.g., pip install six ... or our own test suite). Creating a new process is prohibitively expensive on Windows (and to a lesser degree on macOS) for various reasons, so parallelization can't be simply used all of time. --- news/13247.feature.rst | 4 + src/pip/_internal/cli/cmdoptions.py | 33 ++++ src/pip/_internal/commands/install.py | 7 + src/pip/_internal/operations/install/wheel.py | 38 ++-- src/pip/_internal/req/__init__.py | 41 ++++- src/pip/_internal/req/req_install.py | 5 +- src/pip/_internal/utils/pyc_compile.py | 170 +++++++++++++++++ tests/unit/test_utils_pyc_compile.py | 172 ++++++++++++++++++ 8 files changed, 439 insertions(+), 31 deletions(-) create mode 100644 news/13247.feature.rst create mode 100644 src/pip/_internal/utils/pyc_compile.py create mode 100644 tests/unit/test_utils_pyc_compile.py diff --git a/news/13247.feature.rst b/news/13247.feature.rst new file mode 100644 index 00000000000..770f84c240d --- /dev/null +++ b/news/13247.feature.rst @@ -0,0 +1,4 @@ +Bytecode compilation is parallelized to significantly speed up installation of +large/many packages. By default, the number of workers matches the available CPUs +(up to a hard-coded limit), but can be adjusted using the ``--install-jobs`` +option. To disable parallelization, pass ``--install-jobs 1``. diff --git a/src/pip/_internal/cli/cmdoptions.py b/src/pip/_internal/cli/cmdoptions.py index 81fed6e940d..1ed4a5359fb 100644 --- a/src/pip/_internal/cli/cmdoptions.py +++ b/src/pip/_internal/cli/cmdoptions.py @@ -1070,6 +1070,39 @@ def check_list_path_option(options: Values) -> None: ) +def _handle_jobs( + option: Option, opt_str: str, value: str, parser: OptionParser +) -> None: + if value == "auto": + setattr(parser.values, option.dest, "auto") + return + + try: + if (count := int(value)) > 0: + setattr(parser.values, option.dest, count) + return + except ValueError: + pass + + msg = "should be a positive integer or 'auto'" + raise_option_error(parser, option=option, msg=msg) + + +install_jobs: Callable[..., Option] = partial( + Option, + "--install-jobs", + dest="install_jobs", + default="auto", + type=str, + action="callback", + callback=_handle_jobs, + help=( + "Maximum number of workers to use while installing packages. " + "To disable parallelization, pass 1. (default: %default)" + ), +) + + ########## # groups # ########## diff --git a/src/pip/_internal/commands/install.py b/src/pip/_internal/commands/install.py index 5239d010421..9f5f4812345 100644 --- a/src/pip/_internal/commands/install.py +++ b/src/pip/_internal/commands/install.py @@ -270,6 +270,8 @@ def add_options(self) -> None: ), ) + self.cmd_opts.add_option(cmdoptions.install_jobs()) + @with_cleanup def run(self, options: Values, args: List[str]) -> int: if options.use_user_site and options.target_dir is not None: @@ -416,6 +418,10 @@ def run(self, options: Values, args: List[str]) -> int: # we're not modifying it. modifying_pip = pip_req.satisfied_by is None protect_pip_from_modification_on_windows(modifying_pip=modifying_pip) + if modifying_pip: + # Parallelization will re-import pip when starting new workers + # during installation which is unsafe if pip is being modified. + options.install_jobs = 1 reqs_to_build = [ r @@ -465,6 +471,7 @@ def run(self, options: Values, args: List[str]) -> int: use_user_site=options.use_user_site, pycompile=options.compile, progress_bar=options.progress_bar, + workers=options.install_jobs, ) lib_locations = get_lib_location_guesses( diff --git a/src/pip/_internal/operations/install/wheel.py b/src/pip/_internal/operations/install/wheel.py index 73e4bfc7c00..416e8da358c 100644 --- a/src/pip/_internal/operations/install/wheel.py +++ b/src/pip/_internal/operations/install/wheel.py @@ -1,19 +1,15 @@ """Support for installing and building the "wheel" binary package format.""" import collections -import compileall import contextlib import csv -import importlib import logging import os.path import re import shutil import sys -import warnings from base64 import urlsafe_b64encode from email.message import Message -from io import StringIO from itertools import chain, filterfalse, starmap from typing import ( IO, @@ -51,6 +47,7 @@ from pip._internal.models.scheme import SCHEME_KEYS, Scheme from pip._internal.utils.filesystem import adjacent_tmp_file, replace from pip._internal.utils.misc import ensure_dir, hash_file, partition +from pip._internal.utils.pyc_compile import BytecodeCompiler from pip._internal.utils.unpacking import ( current_umask, is_within_directory, @@ -417,12 +414,12 @@ def make( return super().make(specification, options) -def _install_wheel( # noqa: C901, PLR0915 function is too long +def _install_wheel( # noqa: C901 function is too long name: str, wheel_zip: ZipFile, wheel_path: str, scheme: Scheme, - pycompile: bool = True, + pycompiler: Optional[BytecodeCompiler], warn_script_location: bool = True, direct_url: Optional[DirectUrl] = None, requested: bool = False, @@ -601,25 +598,14 @@ def pyc_source_file_paths() -> Generator[str, None, None]: continue yield full_installed_path - def pyc_output_path(path: str) -> str: - """Return the path the pyc file would have been written to.""" - return importlib.util.cache_from_source(path) - # Compile all of the pyc files for the installed files - if pycompile: - with contextlib.redirect_stdout(StringIO()) as stdout: - with warnings.catch_warnings(): - warnings.filterwarnings("ignore") - for path in pyc_source_file_paths(): - success = compileall.compile_file(path, force=True, quiet=True) - if success: - pyc_path = pyc_output_path(path) - assert os.path.exists(pyc_path) - pyc_record_path = cast( - "RecordPath", pyc_path.replace(os.path.sep, "/") - ) - record_installed(pyc_record_path, pyc_path) - logger.debug(stdout.getvalue()) + if pycompiler is not None: + for module in pycompiler(pyc_source_file_paths()): + if module.is_success: + pyc_record_path = module.pyc_path.replace(os.path.sep, "/") + record_installed(RecordPath(pyc_record_path), module.pyc_path) + if output := module.compile_output: + logger.debug(output) maker = PipScriptMaker(None, scheme.scripts) @@ -718,7 +704,7 @@ def install_wheel( wheel_path: str, scheme: Scheme, req_description: str, - pycompile: bool = True, + pycompiler: Optional[BytecodeCompiler] = None, warn_script_location: bool = True, direct_url: Optional[DirectUrl] = None, requested: bool = False, @@ -730,7 +716,7 @@ def install_wheel( wheel_zip=z, wheel_path=wheel_path, scheme=scheme, - pycompile=pycompile, + pycompiler=pycompiler, warn_script_location=warn_script_location, direct_url=direct_url, requested=requested, diff --git a/src/pip/_internal/req/__init__.py b/src/pip/_internal/req/__init__.py index bf282dab8bc..ea1a79a0ef2 100644 --- a/src/pip/_internal/req/__init__.py +++ b/src/pip/_internal/req/__init__.py @@ -1,10 +1,14 @@ import collections import logging +from contextlib import nullcontext from dataclasses import dataclass -from typing import Generator, List, Optional, Sequence, Tuple +from functools import partial +from typing import Generator, Iterable, List, Optional, Sequence, Tuple +from zipfile import ZipFile from pip._internal.cli.progress_bars import get_install_progress_renderer from pip._internal.utils.logging import indent_log +from pip._internal.utils.pyc_compile import WorkerSetting, create_bytecode_compiler from .req_file import parse_requirements from .req_install import InstallRequirement @@ -33,6 +37,28 @@ def _validate_requirements( yield req.name, req +def _does_python_size_surpass_threshold( + requirements: Iterable[InstallRequirement], threshold: int +) -> bool: + """Inspect wheels to check whether there is enough .py code to + enable bytecode parallelization. + """ + py_size = 0 + for req in requirements: + if not req.local_file_path or not req.is_wheel: + # No wheel to inspect as this is a legacy editable. + continue + + with ZipFile(req.local_file_path, allowZip64=True) as wheel_file: + for entry in wheel_file.infolist(): + if entry.filename.endswith(".py"): + py_size += entry.file_size + if py_size > threshold: + return True + + return False + + def install_given_reqs( requirements: List[InstallRequirement], global_options: Sequence[str], @@ -43,6 +69,7 @@ def install_given_reqs( use_user_site: bool, pycompile: bool, progress_bar: str, + workers: WorkerSetting, ) -> List[InstallationResult]: """ Install everything in the given list. @@ -68,7 +95,15 @@ def install_given_reqs( ) items = renderer(items) - with indent_log(): + if pycompile: + code_size_check = partial( + _does_python_size_surpass_threshold, to_install.values() + ) + pycompiler = create_bytecode_compiler(workers, code_size_check) + else: + pycompiler = None + + with indent_log(), pycompiler or nullcontext(): for requirement in items: req_name = requirement.name assert req_name is not None @@ -87,7 +122,7 @@ def install_given_reqs( prefix=prefix, warn_script_location=warn_script_location, use_user_site=use_user_site, - pycompile=pycompile, + pycompiler=pycompiler, ) except Exception: # if install did not succeed, rollback previous uninstall diff --git a/src/pip/_internal/req/req_install.py b/src/pip/_internal/req/req_install.py index 3262d82658e..3dd1798c39a 100644 --- a/src/pip/_internal/req/req_install.py +++ b/src/pip/_internal/req/req_install.py @@ -53,6 +53,7 @@ redact_auth_from_url, ) from pip._internal.utils.packaging import get_requirement +from pip._internal.utils.pyc_compile import BytecodeCompiler from pip._internal.utils.subprocess import runner_with_spinner_message from pip._internal.utils.temp_dir import TempDirectory, tempdir_kinds from pip._internal.utils.unpacking import unpack_file @@ -812,7 +813,7 @@ def install( prefix: Optional[str] = None, warn_script_location: bool = True, use_user_site: bool = False, - pycompile: bool = True, + pycompiler: Optional[BytecodeCompiler] = None, ) -> None: assert self.req is not None scheme = get_scheme( @@ -869,7 +870,7 @@ def install( self.local_file_path, scheme=scheme, req_description=str(self.req), - pycompile=pycompile, + pycompiler=pycompiler, warn_script_location=warn_script_location, direct_url=self.download_info if self.is_direct else None, requested=self.user_supplied, diff --git a/src/pip/_internal/utils/pyc_compile.py b/src/pip/_internal/utils/pyc_compile.py new file mode 100644 index 00000000000..472ab19e0d4 --- /dev/null +++ b/src/pip/_internal/utils/pyc_compile.py @@ -0,0 +1,170 @@ +# -------------------------------------------------------------------------- # +# NOTE: Importing from pip's internals or vendored modules should be AVOIDED +# so this module remains fast to import, minimizing the overhead of +# spawning a new bytecode compiler worker. +# -------------------------------------------------------------------------- # + +import compileall +import importlib +import os +import sys +import warnings +from collections.abc import Callable, Iterable, Iterator +from contextlib import contextmanager, redirect_stdout +from io import StringIO +from pathlib import Path +from typing import TYPE_CHECKING, Literal, NamedTuple, Optional, Protocol, Union + +if TYPE_CHECKING: + from pip._vendor.typing_extensions import Self + +WorkerSetting = Union[int, Literal["auto"]] + +CODE_SIZE_THRESHOLD = 1000 * 1000 # 1 MB of .py code +WORKER_LIMIT = 8 + + +@contextmanager +def _patch_main_module_hack() -> Iterator[None]: + """Temporarily replace __main__ to reduce the worker startup overhead. + + concurrent.futures imports the main module while initializing new workers + so any global state is retained in the workers. Unfortunately, when pip + is run from a console script wrapper, the wrapper unconditionally imports + pip._internal.cli.main and everything else it requires. This is *slow*. + + The compilation code does not depend on any global state, thus the costly + re-import of pip can be avoided by replacing __main__ with any random + module that does nothing. + """ + original_main = sys.modules["__main__"] + sys.modules["__main__"] = sys.modules["pip"] + try: + yield + finally: + sys.modules["__main__"] = original_main + + +class CompileResult(NamedTuple): + py_path: str + pyc_path: str + is_success: bool + compile_output: str + + +def _compile_single(py_path: Union[str, Path]) -> CompileResult: + # compile_file() returns True silently even if the source file is nonexistent. + if not os.path.exists(py_path): + raise FileNotFoundError(f"Python file '{py_path!s}' does not exist") + + with warnings.catch_warnings(), redirect_stdout(StringIO()) as stdout: + warnings.filterwarnings("ignore") + success = compileall.compile_file(py_path, force=True, quiet=True) + pyc_path = importlib.util.cache_from_source(py_path) # type: ignore[arg-type] + return CompileResult( + str(py_path), pyc_path, success, stdout.getvalue() # type: ignore[arg-type] + ) + + +class BytecodeCompiler(Protocol): + """Abstraction for compiling Python modules into bytecode in bulk.""" + + def __call__(self, paths: Iterable[str]) -> Iterable[CompileResult]: ... + + def __enter__(self) -> "Self": + return self + + def __exit__(self, *args: object) -> None: + return + + +class SerialCompiler(BytecodeCompiler): + """Compile a set of Python modules one by one in-process.""" + + def __call__(self, paths: Iterable[Union[str, Path]]) -> Iterable[CompileResult]: + for p in paths: + yield _compile_single(p) + + +class ParallelCompiler(BytecodeCompiler): + """Compile a set of Python modules using a pool of workers.""" + + def __init__(self, workers: int) -> None: + from concurrent import futures + + if sys.version_info >= (3, 14): + # Sub-interpreters have less overhead than OS processes. + self.pool = futures.InterpreterPoolExecutor(workers) + else: + self.pool = futures.ProcessPoolExecutor(workers) + self.workers = workers + + def __call__(self, paths: Iterable[Union[str, Path]]) -> Iterable[CompileResult]: + # New workers can be started at any time, so patch until fully done. + with _patch_main_module_hack(): + yield from self.pool.map(_compile_single, paths) + + def __exit__(self, *args: object) -> None: + # It's pointless to block on pool finalization, let it occur in background. + self.pool.shutdown(wait=False) + + +def create_bytecode_compiler( + max_workers: WorkerSetting = "auto", + code_size_check: Optional[Callable[[int], bool]] = None, +) -> BytecodeCompiler: + """Return a bytecode compiler appropriate for the workload and platform. + + Parallelization will only be used if: + - There are 2 or more CPUs available + - The maximum # of workers permitted is at least 2 + - There is "enough" code to be compiled to offset the worker startup overhead + (if it can be determined in advance via code_size_check) + + A maximum worker count of "auto" will use the number of CPUs available to the + process or system, up to a hard-coded limit (to avoid resource exhaustion). + + code_size_check is a callable that receives the code size threshold (in # of + bytes) for parallelization and returns whether it will be surpassed or not. + """ + import logging + + try: + # New in Python 3.13. + cpus: Optional[int] = os.process_cpu_count() # type: ignore + except AttributeError: + # Poor man's fallback. We won't respect PYTHON_CPU_COUNT, but the envvar + # was only added in Python 3.13 anyway. + try: + cpus = len(os.sched_getaffinity(0)) # exists on unix (usually) + except AttributeError: + cpus = os.cpu_count() + + logger = logging.getLogger(__name__) + logger.debug("Detected CPU count: %s", cpus) + logger.debug("Configured worker count: %s", max_workers) + + # Case 1: Parallelization is disabled or pointless (there's only one CPU). + if max_workers == 1 or cpus == 1 or cpus is None: + logger.debug("Bytecode will be compiled serially") + return SerialCompiler() + + # Case 2: There isn't enough code for parallelization to be worth it. + if code_size_check is not None and not code_size_check(CODE_SIZE_THRESHOLD): + logger.debug("Bytecode will be compiled serially (not enough .py code)") + return SerialCompiler() + + # Case 3: Attempt to initialize a parallelized compiler. + # The concurrent executors will spin up new workers on a "on-demand basis", + # which helps to avoid wasting time on starting new workers that won't be + # used. (** This isn't true for the fork start method, but forking is + # fast enough that it doesn't really matter.) + workers = min(cpus, WORKER_LIMIT) if max_workers == "auto" else max_workers + try: + compiler = ParallelCompiler(workers) + logger.debug("Bytecode will be compiled using at most %s workers", workers) + return compiler + except (ImportError, NotImplementedError, OSError) as e: + # Case 4: multiprocessing is broken, fall back to serial compilation. + logger.debug("Err! Falling back to serial bytecode compilation", exc_info=e) + return SerialCompiler() diff --git a/tests/unit/test_utils_pyc_compile.py b/tests/unit/test_utils_pyc_compile.py new file mode 100644 index 00000000000..d7d8a1e41ef --- /dev/null +++ b/tests/unit/test_utils_pyc_compile.py @@ -0,0 +1,172 @@ +from contextlib import contextmanager +from functools import partial +from pathlib import Path +from typing import Iterator, Optional, Type +from unittest.mock import Mock, patch + +import pytest +from pytest import param # noqa: PT013 + +from pip._internal.utils import pyc_compile +from pip._internal.utils.pyc_compile import ( + BytecodeCompiler, + ParallelCompiler, + SerialCompiler, + _compile_single, + create_bytecode_compiler, +) + +try: + import concurrent.futures + import multiprocessing +except (OSError, NotImplementedError, ImportError): + parallel_supported = False +else: + parallel_supported = True + +needs_parallel_compiler = pytest.mark.skipif( + not parallel_supported, reason="ParallelCompiler is unavailable" +) + + +@contextmanager +def patch_cpu_count(n: Optional[int]) -> Iterator[None]: + with patch("os.process_cpu_count", new=lambda: n, create=True): + yield + + +@pytest.fixture(autouse=True) +def force_spawn_method() -> Iterator[None]: + """Force the use of the spawn method to suppress thread-safety warnings.""" + if parallel_supported: + ctx = multiprocessing.get_context("spawn") + wrapped = partial(concurrent.futures.ProcessPoolExecutor, mp_context=ctx) + with patch.object(concurrent.futures, "ProcessPoolExecutor", wrapped): + yield + + +class TestCompileSingle: + def test_basic(self, tmp_path: Path) -> None: + source_file = tmp_path / "code.py" + source_file.write_text("print('hello, world')") + + result = _compile_single(source_file) + assert result.is_success + assert not result.compile_output.strip(), "nothing should be logged!" + assert "__pycache__" in result.pyc_path + assert Path(result.pyc_path).exists() + + def test_syntax_error( + self, tmp_path: Path, capsys: pytest.CaptureFixture[str] + ) -> None: + source_file = tmp_path / "code.py" + source_file.write_text("import ") + + result = _compile_single(source_file) + assert not result.is_success + assert result.compile_output.strip() + assert "SyntaxError" in result.compile_output + + stdout, stderr = capsys.readouterr() + assert not stdout, "output should be captured" + assert not stderr, "compileall does not use sys.stderr" + + def test_nonexistent_file(self, tmp_path: Path) -> None: + with pytest.raises(FileNotFoundError): + _compile_single(tmp_path / "aaa.py") + + +@pytest.mark.parametrize( + "compiler_kind", ["serial", param("parallel", marks=needs_parallel_compiler)] +) +def test_bulk_compilation(tmp_path: Path, compiler_kind: str) -> None: + files = [tmp_path / f"source{n}.py" for n in range(1, 11)] + for f in files: + f.write_text("pass") + + remaining = list(files) + compiler = SerialCompiler() if compiler_kind == "serial" else ParallelCompiler(2) + for result in compiler(files): + assert result.is_success + assert Path(result.pyc_path).exists() + remaining.remove(Path(result.py_path)) + + assert not remaining, "every file should've been compiled" + + +@pytest.mark.parametrize( + "compiler_kind", ["serial", param("parallel", marks=needs_parallel_compiler)] +) +def test_bulk_compilation_with_error(tmp_path: Path, compiler_kind: str) -> None: + good_files, bad_files = set(), set() + for n in range(1, 11): + source_file = tmp_path / f"source{n}.py" + if not n % 2: + source_file.write_text("pass") + good_files.add(source_file) + else: + source_file.write_text("import ") + bad_files.add(source_file) + + compiler = SerialCompiler() if compiler_kind == "serial" else ParallelCompiler(2) + + files = good_files | bad_files + remaining = files.copy() + reported_as_success, reported_as_fail = set(), set() + for result in compiler(files): + py_path = Path(result.py_path) + remaining.remove(py_path) + if result.is_success: + reported_as_success.add(py_path) + else: + reported_as_fail.add(py_path) + + assert not remaining, "every file should've been processed" + assert files - reported_as_success == bad_files + assert files - reported_as_fail == good_files + + +@needs_parallel_compiler +class TestCompilerSelection: + @pytest.mark.parametrize( + "cpus, expected_type", + [(None, SerialCompiler), (1, SerialCompiler), (2, ParallelCompiler)], + ) + def test_cpu_count( + self, cpus: Optional[int], expected_type: Type[BytecodeCompiler] + ) -> None: + with patch_cpu_count(cpus): + compiler = create_bytecode_compiler() + assert isinstance(compiler, expected_type) + if isinstance(compiler, ParallelCompiler): + assert compiler.workers == cpus + + def test_cpu_count_exceeds_limit(self) -> None: + with patch_cpu_count(10): + compiler = create_bytecode_compiler() + assert isinstance(compiler, ParallelCompiler) + assert compiler.workers == 8 + + def test_broken_multiprocessing(self) -> None: + fake_module = Mock() + fake_module.ProcessPoolExecutor = Mock(side_effect=NotImplementedError) + fake_module.InterpreterPoolExecutor = Mock(side_effect=NotImplementedError) + with ( + patch("concurrent.futures", fake_module), + patch.object( + pyc_compile, "ParallelCompiler", wraps=ParallelCompiler + ) as parallel_mock, + ): + compiler = create_bytecode_compiler(max_workers=2) + assert isinstance(compiler, SerialCompiler) + parallel_mock.assert_called_once() + + def test_only_one_worker(self) -> None: + with patch_cpu_count(2): + compiler = create_bytecode_compiler(max_workers=1) + assert isinstance(compiler, SerialCompiler) + + def test_not_enough_code(self) -> None: + with patch_cpu_count(2): + compiler = create_bytecode_compiler(code_size_check=lambda threshold: False) + assert isinstance(compiler, SerialCompiler)