|
| 1 | +import os |
| 2 | +import pathlib._abc |
| 3 | +import pathlib.types |
| 4 | +import posixpath |
| 5 | + |
| 6 | +from test.support import os_helper |
| 7 | + |
| 8 | + |
| 9 | +class LocalPathInfo(pathlib.types.PathInfo): |
| 10 | + """ |
| 11 | + Simple implementation of PathInfo for a local path |
| 12 | + """ |
| 13 | + def __init__(self, path): |
| 14 | + self._path = str(path) |
| 15 | + self._exists = None |
| 16 | + self._is_dir = None |
| 17 | + self._is_file = None |
| 18 | + self._is_symlink = None |
| 19 | + |
| 20 | + def exists(self, *, follow_symlinks=True): |
| 21 | + """Whether this path exists.""" |
| 22 | + if not follow_symlinks and self.is_symlink(): |
| 23 | + return True |
| 24 | + if self._exists is None: |
| 25 | + self._exists = os.path.exists(self._path) |
| 26 | + return self._exists |
| 27 | + |
| 28 | + def is_dir(self, *, follow_symlinks=True): |
| 29 | + """Whether this path is a directory.""" |
| 30 | + if not follow_symlinks and self.is_symlink(): |
| 31 | + return False |
| 32 | + if self._is_dir is None: |
| 33 | + self._is_dir = os.path.isdir(self._path) |
| 34 | + return self._is_dir |
| 35 | + |
| 36 | + def is_file(self, *, follow_symlinks=True): |
| 37 | + """Whether this path is a regular file.""" |
| 38 | + if not follow_symlinks and self.is_symlink(): |
| 39 | + return False |
| 40 | + if self._is_file is None: |
| 41 | + self._is_file = os.path.isfile(self._path) |
| 42 | + return self._is_file |
| 43 | + |
| 44 | + def is_symlink(self): |
| 45 | + """Whether this path is a symbolic link.""" |
| 46 | + if self._is_symlink is None: |
| 47 | + self._is_symlink = os.path.islink(self._path) |
| 48 | + return self._is_symlink |
| 49 | + |
| 50 | + |
| 51 | +class ReadableLocalPath(pathlib._abc.ReadablePath): |
| 52 | + """ |
| 53 | + Simple implementation of a ReadablePath class for local filesystem paths. |
| 54 | + """ |
| 55 | + parser = os.path |
| 56 | + |
| 57 | + def __init__(self, *pathsegments): |
| 58 | + self._segments = pathsegments |
| 59 | + self._info = None |
| 60 | + |
| 61 | + def __str__(self): |
| 62 | + if not self._segments: |
| 63 | + return '' |
| 64 | + return self.parser.join(*self._segments) |
| 65 | + |
| 66 | + def __fspath__(self): |
| 67 | + return str(self) |
| 68 | + |
| 69 | + def with_segments(self, *pathsegments): |
| 70 | + return type(self)(*pathsegments) |
| 71 | + |
| 72 | + def __open_rb__(self, buffering=-1): |
| 73 | + return open(self, 'rb') |
| 74 | + |
| 75 | + def iterdir(self): |
| 76 | + return (self / name for name in os.listdir(self)) |
| 77 | + |
| 78 | + def readlink(self): |
| 79 | + return self.with_segments(os.readlink(self)) |
| 80 | + |
| 81 | + @property |
| 82 | + def info(self): |
| 83 | + if self._info is None: |
| 84 | + self._info = LocalPathInfo(self) |
| 85 | + return self._info |
| 86 | + |
| 87 | + |
| 88 | +class WritableLocalPath(pathlib._abc.WritablePath): |
| 89 | + """ |
| 90 | + Simple implementation of a WritablePath class for local filesystem paths. |
| 91 | + """ |
| 92 | + |
| 93 | + __slots__ = ('_segments',) |
| 94 | + parser = posixpath |
| 95 | + |
| 96 | + def __init__(self, *pathsegments): |
| 97 | + self._segments = pathsegments |
| 98 | + |
| 99 | + def __str__(self): |
| 100 | + if not self._segments: |
| 101 | + return '' |
| 102 | + return self.parser.join(*self._segments) |
| 103 | + |
| 104 | + def __fspath__(self): |
| 105 | + return str(self) |
| 106 | + |
| 107 | + def with_segments(self, *pathsegments): |
| 108 | + return type(self)(*pathsegments) |
| 109 | + |
| 110 | + def __open_wb__(self, buffering=-1): |
| 111 | + return open(self, 'wb') |
| 112 | + |
| 113 | + def mkdir(self, mode=0o777): |
| 114 | + os.mkdir(self, mode) |
| 115 | + |
| 116 | + def symlink_to(self, target, target_is_directory=False): |
| 117 | + os.symlink(target, self, target_is_directory) |
| 118 | + |
| 119 | + |
| 120 | +class LocalPathGround: |
| 121 | + can_symlink = os_helper.can_symlink() |
| 122 | + |
| 123 | + def __init__(self, path_cls): |
| 124 | + self.path_cls = path_cls |
| 125 | + |
| 126 | + def setup(self): |
| 127 | + root = self.path_cls(os_helper.TESTFN) |
| 128 | + os.mkdir(root) |
| 129 | + return root |
| 130 | + |
| 131 | + def teardown(self, root): |
| 132 | + os_helper.rmtree(root) |
| 133 | + |
| 134 | + def create_file(self, p, data=b''): |
| 135 | + with open(p, 'wb') as f: |
| 136 | + f.write(data) |
| 137 | + |
| 138 | + def create_hierarchy(self, p): |
| 139 | + os.mkdir(os.path.join(p, 'dirA')) |
| 140 | + os.mkdir(os.path.join(p, 'dirB')) |
| 141 | + os.mkdir(os.path.join(p, 'dirC')) |
| 142 | + os.mkdir(os.path.join(p, 'dirC', 'dirD')) |
| 143 | + with open(os.path.join(p, 'fileA'), 'wb') as f: |
| 144 | + f.write(b"this is file A\n") |
| 145 | + with open(os.path.join(p, 'dirB', 'fileB'), 'wb') as f: |
| 146 | + f.write(b"this is file B\n") |
| 147 | + with open(os.path.join(p, 'dirC', 'fileC'), 'wb') as f: |
| 148 | + f.write(b"this is file C\n") |
| 149 | + with open(os.path.join(p, 'dirC', 'novel.txt'), 'wb') as f: |
| 150 | + f.write(b"this is a novel\n") |
| 151 | + with open(os.path.join(p, 'dirC', 'dirD', 'fileD'), 'wb') as f: |
| 152 | + f.write(b"this is file D\n") |
| 153 | + if self.can_symlink: |
| 154 | + # Relative symlinks. |
| 155 | + os.symlink('fileA', os.path.join(p, 'linkA')) |
| 156 | + os.symlink('non-existing', os.path.join(p, 'brokenLink')) |
| 157 | + os.symlink('dirB', |
| 158 | + os.path.join(p, 'linkB'), |
| 159 | + target_is_directory=True) |
| 160 | + os.symlink(os.path.join('..', 'dirB'), |
| 161 | + os.path.join(p, 'dirA', 'linkC'), |
| 162 | + target_is_directory=True) |
| 163 | + # This one goes upwards, creating a loop. |
| 164 | + os.symlink(os.path.join('..', 'dirB'), |
| 165 | + os.path.join(p, 'dirB', 'linkD'), |
| 166 | + target_is_directory=True) |
| 167 | + # Broken symlink (pointing to itself). |
| 168 | + os.symlink('brokenLinkLoop', os.path.join(p, 'brokenLinkLoop')) |
| 169 | + |
| 170 | + isdir = staticmethod(os.path.isdir) |
| 171 | + isfile = staticmethod(os.path.isfile) |
| 172 | + islink = staticmethod(os.path.islink) |
| 173 | + readlink = staticmethod(os.readlink) |
| 174 | + |
| 175 | + def readtext(self, p): |
| 176 | + with open(p, 'r') as f: |
| 177 | + return f.read() |
| 178 | + |
| 179 | + def readbytes(self, p): |
| 180 | + with open(p, 'rb') as f: |
| 181 | + return f.read() |
0 commit comments