7
7
import json
8
8
import os
9
9
from pathlib import Path
10
+ import tempfile
10
11
from typing import Dict
11
12
from typing import final
12
13
from typing import Generator
13
14
from typing import Iterable
14
15
from typing import List
15
16
from typing import Optional
16
17
from typing import Set
18
+ from typing import Tuple
17
19
from typing import Union
18
20
19
21
from .pathlib import resolve_from_str
20
22
from .pathlib import rm_rf
21
23
from .reports import CollectReport
22
24
from _pytest import nodes
23
25
from _pytest ._io import TerminalWriter
26
+ from _pytest .compat import assert_never
24
27
from _pytest .config import Config
25
28
from _pytest .config import ExitCode
26
29
from _pytest .config import hookimpl
@@ -123,6 +126,10 @@ def warn(self, fmt: str, *, _ispytest: bool = False, **args: object) -> None:
123
126
stacklevel = 3 ,
124
127
)
125
128
129
+ def _mkdir (self , path : Path ) -> None :
130
+ self ._ensure_cache_dir_and_supporting_files ()
131
+ path .mkdir (exist_ok = True , parents = True )
132
+
126
133
def mkdir (self , name : str ) -> Path :
127
134
"""Return a directory path object with the given name.
128
135
@@ -141,7 +148,7 @@ def mkdir(self, name: str) -> Path:
141
148
if len (path .parts ) > 1 :
142
149
raise ValueError ("name is not allowed to contain path separators" )
143
150
res = self ._cachedir .joinpath (self ._CACHE_PREFIX_DIRS , path )
144
- res . mkdir ( exist_ok = True , parents = True )
151
+ self . _mkdir ( res )
145
152
return res
146
153
147
154
def _getvaluepath (self , key : str ) -> Path :
@@ -178,19 +185,13 @@ def set(self, key: str, value: object) -> None:
178
185
"""
179
186
path = self ._getvaluepath (key )
180
187
try :
181
- if path .parent .is_dir ():
182
- cache_dir_exists_already = True
183
- else :
184
- cache_dir_exists_already = self ._cachedir .exists ()
185
- path .parent .mkdir (exist_ok = True , parents = True )
188
+ self ._mkdir (path .parent )
186
189
except OSError as exc :
187
190
self .warn (
188
191
f"could not create cache path { path } : { exc } " ,
189
192
_ispytest = True ,
190
193
)
191
194
return
192
- if not cache_dir_exists_already :
193
- self ._ensure_supporting_files ()
194
195
data = json .dumps (value , ensure_ascii = False , indent = 2 )
195
196
try :
196
197
f = path .open ("w" , encoding = "UTF-8" )
@@ -203,17 +204,47 @@ def set(self, key: str, value: object) -> None:
203
204
with f :
204
205
f .write (data )
205
206
206
- def _ensure_supporting_files (self ) -> None :
207
- """Create supporting files in the cache dir that are not really part of the cache ."""
208
- readme_path = self ._cachedir / "README.md"
209
- readme_path . write_text ( README_CONTENT , encoding = "UTF-8" )
207
+ def _ensure_cache_dir_and_supporting_files (self ) -> None :
208
+ """Create the cache dir and its supporting files ."""
209
+ if self ._cachedir . is_dir ():
210
+ return
210
211
211
- gitignore_path = self ._cachedir .joinpath (".gitignore" )
212
- msg = "# Created by pytest automatically.\n *\n "
213
- gitignore_path .write_text (msg , encoding = "UTF-8" )
212
+ files : Iterable [Tuple [str , Union [str , bytes ]]] = (
213
+ ("README.md" , README_CONTENT ),
214
+ (".gitignore" , "# Created by pytest automatically.\n *\n " ),
215
+ ("CACHEDIR.TAG" , CACHEDIR_TAG_CONTENT ),
216
+ )
214
217
215
- cachedir_tag_path = self ._cachedir .joinpath ("CACHEDIR.TAG" )
216
- cachedir_tag_path .write_bytes (CACHEDIR_TAG_CONTENT )
218
+ with tempfile .TemporaryDirectory () as d :
219
+ for file , content in files :
220
+ file = os .path .join (d , file )
221
+ if isinstance (content , str ):
222
+ with open (file , "xt" , encoding = "UTF-8" ) as f :
223
+ f .write (content )
224
+ elif isinstance (content , bytes ):
225
+ with open (file , "xb" ) as f :
226
+ f .write (content )
227
+ else :
228
+ assert_never (content )
229
+ try :
230
+ os .renames (d , self ._cachedir )
231
+ except OSError as exc :
232
+ # This can happen if the temporary directory is on a different filesystem from the
233
+ # destination.
234
+ self .warn (
235
+ f"could not rename cache directory into place: { exc } " ,
236
+ _ispytest = True ,
237
+ )
238
+ self ._cachedir .mkdir (parents = True , exist_ok = True )
239
+ for file , _ in files :
240
+ os .rename (os .path .join (d , file ), self ._cachedir .joinpath (file ))
241
+
242
+ # Create a directory in place of the one we just moved so that `TemporaryDirectory`'s
243
+ # cleanup doesn't complain.
244
+ #
245
+ # TODO: pass ignore_cleanup_errors=True when we no longer support python < 3.10. See
246
+ # https://github.com/python/cpython/issues/74168.
247
+ os .mkdir (d )
217
248
218
249
219
250
class LFPluginCollWrapper :
0 commit comments