import os
import warnings

from urlparse import urljoin

from unidecode import unidecode

from git import Repo

from elasticutils import get_es, Q, F

from elasticgit.storage import StorageManager, RemoteStorageManager
from elasticgit.search import ESManager, S

import logging

log = logging.getLogger(__name__)


class Workspace(object):
    """
    The main API exposing a model interface to both a Git repository
    and an Elasticsearch index.

    :param git.Repo repo:
        A :py:class:`git.Repo` instance.
    :param dict es:
        A dictionary of values one would pass to ``elasticutils.get_es``
        to get an Elasticsearch connection.
    :param str index_prefix:
        The prefix to use when generating index names for Elasticsearch.
    """

    def __init__(self, repo, es, index_prefix):
        self.repo = repo
        self.sm = StorageManager(repo)
        self.es_settings = es
        self.im = ESManager(
            self.sm, get_es(**self.es_settings), index_prefix)

        self.working_dir = self.repo.working_dir
        self.index_prefix = index_prefix

    def setup(self, name, email):
        """
        Set up the Git repository & ES index if they do not yet exist.
        This is safe to call even if they already exist.

        :param str name:
            The name of the committer in this repository.
        :param str email:
            The email address of the committer in this repository.
        """
        if not self.sm.storage_exists():
            self.sm.create_storage()

        self.sm.write_config('user', {
            'name': name,
            'email': email,
        })

        if not self.im.index_exists(self.sm.active_branch()):
            self.im.create_index(self.sm.active_branch())

    def exists(self):
        """
        Check if the Git repository and the ES index exist.
        Returns ``True`` only if both of them exist.

        :returns: bool
        """
        if self.sm.storage_exists():
            return self.im.index_exists(self.sm.active_branch())

        return False

    def destroy(self):
        """
        Remove the ES index and the Git repository completely.
        Use with caution.
        """
        if self.sm.storage_exists():
            if self.im.index_exists(self.sm.active_branch()):
                self.im.destroy_index(self.sm.active_branch())
            self.sm.destroy_storage()

    def save(self, model, message, author=None, committer=None):
        """
        Save a :py:class:`elasticgit.models.Model` instance in Git and add it
        to the Elasticsearch index.

        :param elasticgit.models.Model model:
            The model instance.
        :param str message:
            The commit message to write the model to Git with.
        :param tuple author:
            The author information (name, email address).
            Defaults to the repo default if unspecified.
        :param tuple committer:
            The committer information (name, email address).
            Defaults to the author if unspecified.
        """
        if isinstance(message, unicode):
            message = unidecode(message)
        self.sm.store(model, message, author=author, committer=committer)
        self.im.index(model)
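
    # Illustrative sketch of the author/committer tuple form documented
    # above (``model`` and the names are hypothetical, not part of this
    # module):
    #
    #   workspace.save(model, 'Update the model',
    #                  author=('An Author', 'author@example.org'),
    #                  committer=('A Committer', 'committer@example.org'))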

    def delete(self, model, message, author=None, committer=None):
        """
        Delete a :py:class:`elasticgit.models.Model` instance from Git and
        the Elasticsearch index.

        :param elasticgit.models.Model model:
            The model instance.
        :param str message:
            The commit message to remove the model from Git with.
        :param tuple author:
            The author information (name, email address).
            Defaults to the repo default if unspecified.
        :param tuple committer:
            The committer information (name, email address).
            Defaults to the author if unspecified.
        """
        if isinstance(message, unicode):
            message = unidecode(message)
        self.sm.delete(model, message, author=author, committer=committer)
        self.im.unindex(model)

    def fast_forward(self, branch_name='master', remote_name='origin'):
        warnings.warn('This method is deprecated, use pull() instead',
                      DeprecationWarning)
        return self.pull(branch_name=branch_name, remote_name=remote_name)

    def index_diff(self, diff_index):
        # NOTE: This is probably more complicated than it needs to be.
        #       If we have multiple remotes GitPython gets confused about
        #       deletes. It marks things as deletes because they may not
        #       exist on another remote.
        #
        #       Here we loop over all changes, track the models that've
        #       changed and then reindex fully to make sure we're in sync.
        if len(self.repo.remotes) > 1 and any(diff_index):
            return self.reindex_diff(diff_index)

        # NOTE: There's a very unlikely scenario where we're dealing with
        #       renames. This generally can only happen when a repository
        #       has been manually modified. If that's the case then
        #       reindex everything as well.
        if any(diff_index.iter_change_type('R')):
            return self.reindex_diff(diff_index)

        # unindex deleted blobs
        for diff in diff_index.iter_change_type('D'):
            path_info = self.sm.path_info(diff.a_blob.path)
            if path_info is None:
                continue
            self.im.raw_unindex(*path_info)

        # reindex added blobs
        for diff in diff_index.iter_change_type('A'):
            path_info = self.sm.path_info(diff.b_blob.path)
            if path_info is None:
                continue
            obj = self.sm.get(*path_info)
            self.im.index(obj)

        # reindex modified blobs
        for diff in diff_index.iter_change_type('M'):
            path_info = self.sm.path_info(diff.a_blob.path)
            if path_info is None:
                continue
            obj = self.sm.get(*path_info)
            self.im.index(obj)

    def reindex_diff(self, diff_index):
        changed_model_set = set([])
        for diff in diff_index:
            if diff.new_file:
                path_info = self.sm.path_info(diff.b_blob.path)
                if path_info is not None:
                    changed_model_set.add(path_info[0])
            elif diff.renamed:
                path_info = self.sm.path_info(diff.a_blob.path)
                if path_info is not None:
                    changed_model_set.add(path_info[0])
            else:
                path_info = self.sm.path_info(diff.a_blob.path)
                if path_info is not None:
                    changed_model_set.add(path_info[0])

        for model_class in changed_model_set:
            self.reindex(model_class)

    def pull(self, branch_name='master', remote_name='origin'):
        """
        Fetch & merge in an upstream's commits.

        :param str branch_name:
            The name of the branch to fast forward & merge in.
        :param str remote_name:
            The name of the remote to fetch from.
        """
        changes = self.sm.pull(branch_name=branch_name,
                               remote_name=remote_name)
        return self.index_diff(changes)

    def reindex_iter(self, model_class, refresh_index=True):
        """
        Reindex everything that Git knows about for the given model class,
        yielding each model as it is indexed.

        :param elasticgit.models.Model model_class:
            The model class to reindex.
        :param bool refresh_index:
            Whether or not to refresh the index after everything has
            been indexed. Defaults to ``True``.
        """
        if not self.im.index_exists(self.sm.active_branch()):
            self.im.create_index(self.sm.active_branch())
        iterator = self.sm.iterate(model_class)
        for model in iterator:
            yield self.im.index(model)
        if refresh_index:
            self.refresh_index()

    def reindex(self, model_class, refresh_index=True):
        """
        Same as :py:func:`reindex_iter` but returns a list instead of
        a generator.
        """
        return list(
            self.reindex_iter(model_class, refresh_index=refresh_index))

    def refresh_index(self):
        """
        Manually refresh the Elasticsearch index. In production this is
        not necessary but it is useful when running tests.
        """
        self.im.refresh_indices(self.sm.active_branch())

    def index_ready(self):
        """
        Check if the index is ready.

        :returns: bool
        """
        return self.im.index_ready(self.sm.active_branch())

    def sync(self, model_class, refresh_index=True):
        """
        Resync a workspace. The Git repository is assumed to be the source
        of truth and Elasticsearch is made to match it. This involves two
        passes: first everything that Git knows about is indexed, then
        everything in Elasticsearch that Git does not know about is
        unindexed.

        :param elasticgit.models.Model model_class:
            The model to resync.
        :param bool refresh_index:
            Whether or not to refresh the index after indexing
            everything from Git.
        :returns:
            A tuple of the set of reindexed UUIDs and the set of
            removed UUIDs.
        """
        reindexed_uuids = set([])
        removed_uuids = set([])

        for model_obj in self.reindex_iter(model_class,
                                           refresh_index=refresh_index):
            reindexed_uuids.add(model_obj.uuid)

        for result in self.S(model_class).everything():
            if result.uuid not in reindexed_uuids:
                self.im.raw_unindex(model_class, result.uuid)
                removed_uuids.add(result.uuid)

        return reindexed_uuids, removed_uuids
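
    # Illustrative sketch of a resync (``workspace`` and ``MyModel`` are
    # hypothetical, not part of this module):
    #
    #   reindexed, removed = workspace.sync(MyModel)
    #   log.info('%s reindexed, %s removed', len(reindexed), len(removed))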

    def setup_mapping(self, model_class):
        """
        Add the mapping defined by a model_class to Elasticsearch.

        :param elasticgit.models.Model model_class:
            The model class to set up the mapping for.
        :returns: dict, the decoded dictionary from Elasticsearch
        """
        return self.im.setup_mapping(self.sm.active_branch(), model_class)

    def setup_custom_mapping(self, model_class, mapping):
        """
        Add a custom mapping for a model class instead of accepting
        what the model_class defines.

        :param elasticgit.models.Model model_class:
            The model class to set up the mapping for.
        :param dict mapping:
            The Elasticsearch mapping definition.
        :returns: dict, the decoded dictionary from Elasticsearch
        """
        return self.im.setup_custom_mapping(
            self.sm.active_branch(), model_class, mapping)

    def get_mapping(self, model_class):
        """
        Get a mapping from Elasticsearch for a model_class.

        :param elasticgit.models.Model model_class:
        :returns: dict
        """
        return self.im.get_mapping(self.sm.active_branch(), model_class)

    def S(self, model_class):
        """
        Get a :py:class:`elasticutils.S` object for the given
        model class. Under the hood this dynamically generates a
        :py:class:`elasticutils.MappingType` and
        :py:class:`elasticutils.Indexable` subclass which maps the
        Elasticsearch results to :py:class:`elasticgit.models.Model`
        instances based on their UUIDs.

        :param elasticgit.models.Model model_class:
            The class to provide a search interface for.
        """
        return S(
            self.im.get_mapping_type(model_class)).es(**self.es_settings)
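
# Illustrative sketch of the Workspace API above. ``MyModel`` is a
# hypothetical :py:class:`elasticgit.models.Model` subclass and the path,
# names and field values are made up:
#
#   workspace = EG.workspace('/tmp/example-repo', index_prefix='example')
#   workspace.setup('A. Person', 'a.person@example.org')
#   workspace.save(MyModel({'name': 'foo'}), 'Add a model instance')
#   workspace.refresh_index()
#   results = workspace.S(MyModel).filter(name='foo')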


class RemoteWorkspace(Workspace):
    """
    A workspace that connects to a unicore.distribute server hosted
    somewhere on the network.

    This is a read-only version of the :py:class:`Workspace`.
    """

    def __init__(self, url, es=None, index_prefix=None):
        """
        :param str url:
            The URL of the unicore.distribute server.
        :param dict es:
            The parameters for connecting to Elasticsearch. If not
            specified, the default unicore.distribute ES proxy is used.
            This defaults to ``/esapi`` on the host of the ``url``
            parameter provided.
        :param str index_prefix:
            The prefix to use when generating index names for Elasticsearch.
        """
        self.sm = RemoteStorageManager(url)
        self.index_prefix = index_prefix or self.sm.repo_name
        self.es_settings = es or {'urls': urljoin(url, '/esapi')}
        self.im = ESManager(
            self.sm,
            es=get_es(**self.es_settings),
            index_prefix=self.index_prefix)

    def reindex_changes(self, changes):
        changed_model_set = set([])
        for change in changes:
            if change['type'] == 'A':
                path_info = self.sm.path_info(change['path'])
                if path_info is not None:
                    changed_model_set.add(path_info[0])
            elif change['type'] == 'R':
                path_info = self.sm.path_info(change['rename_to'])
                if path_info is not None:
                    changed_model_set.add(path_info[0])
            else:
                path_info = self.sm.path_info(change.get('path'))
                if path_info is not None:
                    changed_model_set.add(path_info[0])

        for model_class in changed_model_set:
            self.reindex(model_class)

    def pull(self, branch_name='master', remote_name='origin'):
        """
        Pull changes from the unicore.distribute server and update the
        Elasticsearch index to reflect them.

        :param str branch_name:
            The name of the branch to pull.
        :param str remote_name:
            The name of the remote to fetch from.
        """
        changes = self.sm.pull(branch_name=branch_name,
                               remote_name=remote_name)

        def pick_type(change_type):
            return filter(
                lambda change: change['type'] == change_type, changes)

        # NOTE: There's a very unlikely scenario where we're dealing with
        #       renames. This generally can only happen when a repository
        #       has been manually modified. If that's the case then
        #       reindex everything as well.
        if pick_type('R'):
            return self.reindex_changes(changes)

        # unindex deleted blobs
        for diff in pick_type('D'):
            path_info = self.sm.path_info(diff['path'])
            if path_info is None:
                continue
            self.im.raw_unindex(*path_info)

        # reindex added blobs
        for diff in pick_type('A'):
            path_info = self.sm.path_info(diff['path'])
            if path_info is None:
                continue
            obj = self.sm.get(*path_info)
            self.im.index(obj)

        # reindex modified blobs
        for diff in pick_type('M'):
            path_info = self.sm.path_info(diff['path'])
            if path_info is None:
                continue
            obj = self.sm.get(*path_info)
            self.im.index(obj)
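
# Illustrative sketch of a read-only RemoteWorkspace. The URL is a
# hypothetical unicore.distribute endpoint and ``MyModel`` is a hypothetical
# model class:
#
#   rws = RemoteWorkspace('http://localhost:6543/repos/example-repo.json')
#   rws.pull()
#   results = rws.S(MyModel).everything()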


class EG(object):
    """
    A helper class for things in ElasticGit.
    """

    @classmethod
    def workspace(cls, workdir, es={}, index_prefix=None):
        """
        Create a workspace.

        :param str workdir:
            The path to the directory where a git repository can
            be found or needs to be created when
            :py:meth:`.Workspace.setup` is called.
        :param dict es:
            The parameters to pass along to :func:`elasticutils.get_es`.
        :param str index_prefix:
            The index_prefix to use when generating index names for
            Elasticsearch.
        :returns:
            :py:class:`.Workspace`
        """
        index_prefix = index_prefix or os.path.basename(workdir)
        repo = (cls.read_repo(workdir)
                if cls.is_repo(workdir)
                else cls.init_repo(workdir))
        return Workspace(repo, es, index_prefix)

    @classmethod
    def dot_git_path(cls, workdir):
        return os.path.join(workdir, '.git')

    @classmethod
    def is_repo(cls, workdir):
        return cls.is_dir(cls.dot_git_path(workdir))

    @classmethod
    def is_dir(cls, workdir):
        return os.path.isdir(workdir)

    @classmethod
    def read_repo(cls, workdir):
        return Repo(workdir)

    @classmethod
    def init_repo(cls, workdir, bare=False):
        return Repo.init(workdir, bare=bare)

    @classmethod
    def clone_repo(cls, repo_url, workdir):
        return Repo.clone_from(repo_url, workdir)


# Reference the ``Q`` and ``F`` imports from elasticutils so linters do not
# flag them as unused.
Q
F
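
# Illustrative sketch of querying through a workspace using the elasticutils
# ``F`` filter helper referenced above; ``workspace``, ``MyModel`` and the
# ``language`` field are hypothetical:
#
#   results = workspace.S(MyModel).filter(
#       F(language='eng_GB') | F(language='eng_US'))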