-
Notifications
You must be signed in to change notification settings - Fork 213
/
Copy path_utils.py
220 lines (172 loc) · 6.8 KB
/
_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
# Copyright 2017-2020 Palantir Technologies, Inc.
# Copyright 2021- Python Language Server Contributors.
import functools
import inspect
import logging
import os
import pathlib
import threading
import jedi
# Version string of the installed jedi package, read once at import time.
JEDI_VERSION = jedi.__version__
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
def debounce(interval_s, keyed_by=None):
    """Debounce calls to this function until interval_s seconds have passed.

    Every call (re)starts a ``threading.Timer``. The wrapped function only
    runs, on the timer's thread, once ``interval_s`` seconds elapse without a
    newer call for the same key. The decorated callable itself returns
    ``None``; the wrapped function's return value is discarded.

    Args:
        interval_s (float): Quiet period, in seconds, before the call runs.
        keyed_by (str): Optional name of the parameter whose value separates
            independent debounce groups. When falsy, all calls share one timer.

    Returns:
        A decorator producing the debounced version of a function.
    """
    def wrapper(func):
        timers = {}
        lock = threading.Lock()
        # The signature is fixed, so inspect it once instead of on every call.
        sig = inspect.signature(func)

        @functools.wraps(func)
        def debounced(*args, **kwargs):
            # Binding validates the arguments eagerly, on the caller's thread.
            call_args = sig.bind(*args, **kwargs)
            if keyed_by:
                # Fill in defaults so a keyed argument left at its default
                # value is still found instead of raising KeyError.
                call_args.apply_defaults()
                key = call_args.arguments[keyed_by]
            else:
                key = None

            def run():
                with lock:
                    # Only drop the entry if it is still ours. A newer call may
                    # have replaced it after this timer had already fired but
                    # before it acquired the lock; deleting unconditionally
                    # would remove the newer timer and make it fail later.
                    if timers.get(key) is timer:
                        del timers[key]
                return func(*args, **kwargs)

            with lock:
                old_timer = timers.get(key)
                if old_timer:
                    old_timer.cancel()

                timer = threading.Timer(interval_s, run)
                timers[key] = timer
                timer.start()
        return debounced
    return wrapper
def find_parents(root, path, names):
    """Find files matching the given names relative to the given path.

    Starting at the directory that contains ``path``, each directory is
    checked in turn while walking upwards, ending with ``root`` itself. The
    search stops at the first directory holding at least one of ``names``.

    Args:
        root (str): The directory at which to stop recursing upwards.
        path (str): The file path to start searching up from.
        names (List[str]): The file/directory names to look for.

    Returns:
        List[str]: Paths of every requested name that exists in the first
        matching directory, or an empty list when nothing is found, ``root``
        is empty, or ``path`` is not inside ``root``.

    Note:
        The path MUST be within the root.
    """
    if not root:
        return []

    # os.path.commonprefix compares character by character, so any two
    # absolute paths share at least the leading separator and "/a/b" looks
    # like a parent of "/a/bc". Compare whole path components instead.
    try:
        inside_root = os.path.commonpath((root, path)) == os.path.commonpath((root,))
    except ValueError:
        # Raised when mixing absolute and relative paths, or different drives.
        inside_root = False

    if not inside_root:
        log.warning("Path %s not in %s", path, root)
        return []

    # Split the relative by directory, generate all the parent directories, then check each of them.
    # This avoids running a loop that has different base-cases for unix/windows
    # e.g. /a/b and /a/b/c/d/e.py -> ['/a/b', 'c', 'd']
    dirs = [root] + os.path.relpath(os.path.dirname(path), root).split(os.path.sep)

    # Search each of /a/b/c/d, /a/b/c, /a/b
    while dirs:
        search_dir = os.path.join(*dirs)
        candidates = [os.path.join(search_dir, name) for name in names]
        existing = [candidate for candidate in candidates if os.path.exists(candidate)]
        if existing:
            return existing
        dirs.pop()

    # Otherwise nothing
    return []
def path_to_dot_name(path):
    """Return the dot-separated module name for the module file at ``path``.

    The file's extension is dropped, and every enclosing directory that
    contains an ``__init__.py`` contributes one leading package component.
    """
    parent, filename = os.path.split(path)
    components = [os.path.splitext(filename)[0]]

    # Climb while the current directory is a package.
    while os.path.exists(os.path.join(parent, '__init__.py')):
        parent, package = os.path.split(parent)
        components.insert(0, package)

    return '.'.join(components)
def match_uri_to_workspace(uri, workspaces):
    """Pick the workspace that most specifically contains ``uri``.

    Both the URI and each workspace are split into path components with
    ``pathlib.Path``. A workspace matches only when all of its components
    equal the leading components of the URI; among the matches, the one with
    the most components wins, and the earliest one wins a tie.

    Args:
        uri (str): The document URI to locate, or None.
        workspaces (Iterable[str]): Candidate workspace URIs.

    Returns:
        The chosen entry of ``workspaces``, or None when ``uri`` is None or no
        workspace contains it.
    """
    if uri is None:
        return None

    uri_parts = pathlib.Path(uri).parts
    best_depth, chosen_workspace = -1, None

    for workspace in workspaces:
        workspace_parts = pathlib.Path(workspace).parts
        depth = len(workspace_parts)

        # An empty workspace can never match; a deeper one cannot contain the URI.
        if depth == 0 or depth > len(uri_parts):
            continue

        # Require a genuine prefix. Counting equal components at arbitrary
        # positions would let an unrelated folder that happens to share a
        # directory name tie with, or beat, the real parent.
        if uri_parts[:depth] != workspace_parts:
            continue

        if depth > best_depth:
            best_depth = depth
            chosen_workspace = workspace

    return chosen_workspace
def list_to_string(value):
    """Join a list into one comma-separated string; return anything else as is."""
    if isinstance(value, list):
        return ",".join(value)
    return value
def merge_dicts(dict_a, dict_b):
    """Recursively merge dictionary b on top of dictionary a.

    Neither argument is modified; a new dictionary is returned. For a key
    present in both, two dict values are merged recursively; otherwise the
    value from b wins unless it is None, in which case the value from a is
    kept. Keys that exist only in b are dropped when their value is None.

    Keys appear in a deterministic order: those of a first, in a's order,
    followed by the keys found only in b, in b's order.

    Args:
        dict_a (dict): Base values.
        dict_b (dict): Overriding values; None means "no override".

    Returns:
        dict: The merged dictionary.
    """
    def _merge_dicts_(a, b):
        for key, a_value in a.items():
            if key not in b:
                yield (key, a_value)
                continue

            b_value = b[key]
            if isinstance(a_value, dict) and isinstance(b_value, dict):
                yield (key, dict(_merge_dicts_(a_value, b_value)))
            elif b_value is not None:
                yield (key, b_value)
            else:
                # A None in b never overrides an existing value.
                yield (key, a_value)

        for key, b_value in b.items():
            if key not in a and b_value is not None:
                yield (key, b_value)

    return dict(_merge_dicts_(dict_a, dict_b))
def format_docstring(contents):
    """Python doc strings come in a number of formats, but LSP wants markdown.

    Until we can find a fast enough way of discovering and parsing each format,
    we can do a little better by at least preserving indentation.

    Tabs become four non-breaking spaces and every run of two spaces becomes
    two non-breaking spaces, so indentation keeps its width. Single spaces
    between words are left alone.

    Args:
        contents (str): The raw docstring text.

    Returns:
        str: The text with indentation whitespace made non-breaking.
    """
    nbsp = u'\u00A0'
    contents = contents.replace('\t', nbsp * 4)
    # Match a pair of spaces, not a single one: replacing each lone space
    # with two NBSPs would double every gap between words. The pair is
    # spelled as a product so the literal cannot be collapsed by accident.
    contents = contents.replace(' ' * 2, nbsp * 2)
    return contents
def clip_column(column, lines, line_number):
    """
    Clamp a character offset to the length of its line.

    The LSP allows character positions beyond the end of a line, so the column
    is limited to the line's length with any trailing line break excluded. A
    line number past the end of ``lines`` is treated as an empty line.
    https://microsoft.github.io/language-server-protocol/specification#position
    """
    if line_number >= len(lines):
        return min(column, 0)

    line_length = len(lines[line_number].rstrip('\r\n'))
    return min(column, line_length)
def position_to_jedi_linecolumn(document, position):
    """
    Translate an LSP position ('line', 'character') into Jedi's 'line', 'column'.

    The line number is shifted up by one and the character offset is clipped
    to the length of that line in ``document.lines``. A falsy position yields
    an empty dict.
    https://microsoft.github.io/language-server-protocol/specification#position
    """
    if not position:
        return {}

    lsp_line = position['line']
    return {
        'line': lsp_line + 1,
        'column': clip_column(position['character'], document.lines, lsp_line),
    }
if os.name == 'nt':
    import ctypes

    kernel32 = ctypes.windll.kernel32
    # 0x1000 is the Win32 PROCESS_QUERY_LIMITED_INFORMATION access right
    # (PROCESS_QUERY_INFORMATION would be 0x0400).
    PROCESS_QUERY_LIMITED_INFORMATION = 0x1000
    # Backward-compatible alias for the original, misspelled constant name.
    PROCESS_QUERY_INFROMATION = PROCESS_QUERY_LIMITED_INFORMATION

    def is_process_alive(pid):
        """Check whether the process with the given pid is still alive.

        Running `os.kill()` on Windows always exits the process, so it can't be used to check for an alive process.
        see: https://docs.python.org/3/library/os.html?highlight=os%20kill#os.kill

        Hence ctypes is used to check for the process directly via windows API avoiding any other 3rd-party dependency.

        Args:
            pid (int): process ID

        Returns:
            bool: True if a handle to the process could be opened, False if it
            could not (the process does not exist or access was refused).
        """
        process = kernel32.OpenProcess(PROCESS_QUERY_LIMITED_INFORMATION, 0, pid)
        if process != 0:
            # Only the ability to open the process matters; release the handle.
            kernel32.CloseHandle(process)
            return True
        return False

else:
    import errno

    def is_process_alive(pid):
        """Check whether the process with the given pid is still alive.

        Signal 0 is sent with `os.kill()`, which performs the existence and
        permission checks without delivering a signal.

        Args:
            pid (int): process ID

        Returns:
            bool: False if pid is negative or no such process exists. True if
            the signal could be sent, or if it was refused with EPERM (the
            process exists but belongs to someone else).
        """
        if pid < 0:
            return False
        try:
            os.kill(pid, 0)
        except OSError as e:
            # EPERM means the process is there but we may not signal it.
            return e.errno == errno.EPERM
        return True