Commit ede94da

honzakraldi authored and committed

Fix reindex (#3989)

* Make sure data always goes to the new index (Fixes #3746)
* Make celery workers more short-lived
* Fix tests

1 parent 359b75e · commit ede94da
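
Context on the bug: _project_docs() yields bulk actions built with to_dict(include_meta=True), and in elasticsearch-dsl that metadata includes the document's own _index, which points at the live index rather than the one a reindex has just created. A minimal sketch of the failure mode (the values below are hypothetical, not taken from warehouse):

# Sketch of a bulk action as emitted by to_dict(include_meta=True); values
# are hypothetical. A per-action "_index" overrides any default index, so
# during reindex these writes landed in the old index.
action = {
    "_index": "warehouse",  # the live index, not the newly created one
    "_type": "project",
    "_id": "example-project",
    "_source": {"name": "example-project"},
}

# The fix below strips the per-action index so the bulk call's default
# (the newly created index) takes effect instead:
action.pop("_index", None)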

3 files changed: +20 -8

Procfile.cabotage (+1 -1)

@@ -1,5 +1,5 @@
 release: bin/release-cabotage
 web: bin/start-web-cabotage python -m gunicorn.app.wsgiapp -c gunicorn-cabotage.conf warehouse.wsgi:application
 web-uploads: bin/start-web-cabotage python -m gunicorn.app.wsgiapp -c gunicorn-uploads-cabotage.conf warehouse.wsgi:application
-worker: bin/start-worker celery -A warehouse worker -l info --max-tasks-per-child 128
+worker: bin/start-worker celery -A warehouse worker -l info --max-tasks-per-child 32
 worker-beat: bin/start-worker celery -A warehouse beat -S redbeat.RedBeatScheduler -l info
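
The Procfile change lowers Celery's --max-tasks-per-child from 128 to 32, so each worker process is recycled after 32 tasks, keeping workers short-lived as the commit message says. For reference, a minimal sketch of the equivalent in-code setting (Celery 4's worker_max_tasks_per_child; the app name and broker URL are placeholders, not warehouse's configuration):

# Sketch: the in-code equivalent of `celery worker --max-tasks-per-child 32`.
from celery import Celery

app = Celery("warehouse", broker="redis://localhost:6379/0")
app.conf.worker_max_tasks_per_child = 32  # recycle each worker process after 32 tasks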

tests/unit/search/test_tasks.py (+14 -5)

@@ -128,9 +128,10 @@ def project_docs(db):
         class TestException(Exception):
             pass
 
-        def parallel_bulk(client, iterable):
+        def parallel_bulk(client, iterable, index=None):
             assert client is es_client
             assert iterable is docs
+            assert index == "warehouse-cbcbcbcbcb"
             raise TestException
 
         monkeypatch.setattr(
@@ -176,15 +177,19 @@ def project_docs(db):
             lambda *a, **kw: es_client
         )
 
-        parallel_bulk = pretend.call_recorder(lambda client, iterable: [None])
+        parallel_bulk = pretend.call_recorder(
+            lambda client, iterable, index: [None]
+        )
         monkeypatch.setattr(
             warehouse.search.tasks, "parallel_bulk", parallel_bulk)
 
         monkeypatch.setattr(os, "urandom", lambda n: b"\xcb" * n)
 
         reindex(db_request)
 
-        assert parallel_bulk.calls == [pretend.call(es_client, docs)]
+        assert parallel_bulk.calls == [
+            pretend.call(es_client, docs, index="warehouse-cbcbcbcbcb")
+        ]
         assert es_client.indices.create.calls == [
             pretend.call(
                 body={
@@ -247,15 +252,19 @@ def project_docs(db):
             lambda *a, **kw: es_client
         )
 
-        parallel_bulk = pretend.call_recorder(lambda client, iterable: [None])
+        parallel_bulk = pretend.call_recorder(
+            lambda client, iterable, index: [None]
+        )
         monkeypatch.setattr(
             warehouse.search.tasks, "parallel_bulk", parallel_bulk)
 
         monkeypatch.setattr(os, "urandom", lambda n: b"\xcb" * n)
 
         reindex(db_request)
 
-        assert parallel_bulk.calls == [pretend.call(es_client, docs)]
+        assert parallel_bulk.calls == [
+            pretend.call(es_client, docs, index="warehouse-cbcbcbcbcb")
+        ]
         assert es_client.indices.create.calls == [
             pretend.call(
                 body={
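
Why the tests expect index="warehouse-cbcbcbcbcb": monkeypatching os.urandom to return b"\xcb" * n makes the random index-name suffix deterministic. A sketch of the arithmetic, assuming reindex() hex-encodes five random bytes for the suffix (the exact derivation lives in warehouse.search.tasks):

# Sketch: with os.urandom patched to b"\xcb" * n, a hex-encoded 5-byte
# suffix is always "cbcbcbcbcb" (assumes the name is derived this way).
import binascii

suffix = binascii.b2a_hex(b"\xcb" * 5).decode("ascii")
assert suffix == "cbcbcbcbcb"
assert "warehouse-" + suffix == "warehouse-cbcbcbcbcb"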

warehouse/search/tasks.py (+5 -2)

@@ -94,7 +94,9 @@ def _project_docs(db):
     for release in windowed_query(release_data, Release.name, 50000):
         p = ProjectDocType.from_db(release)
         p.full_clean()
-        yield p.to_dict(include_meta=True)
+        doc = p.to_dict(include_meta=True)
+        doc.pop('_index', None)
+        yield doc
 
 
 @tasks.task(ignore_result=True, acks_late=True)
@@ -143,7 +145,8 @@ def reindex(request):
     try:
         request.db.execute("SET statement_timeout = '600s'")
 
-        for _ in parallel_bulk(client, _project_docs(request.db)):
+        for _ in parallel_bulk(client, _project_docs(request.db),
+                               index=new_index_name):
             pass
     except:  # noqa
         new_index.delete()
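
Taken together, the two hunks make the routing explicit: _project_docs() pops the per-action _index, and reindex() passes index=new_index_name to parallel_bulk, whose extra keyword arguments elasticsearch-py forwards to the underlying bulk call as a default index for actions that don't carry their own. A minimal standalone sketch of that routing (the client, index name, and document below are illustrative):

# Sketch of the fixed routing, using elasticsearch-py's bulk helpers.
from elasticsearch import Elasticsearch
from elasticsearch.helpers import parallel_bulk

client = Elasticsearch()
new_index_name = "warehouse-cbcbcbcbcb"  # freshly created, not yet aliased

def docs():
    # No "_index" key: each action falls back to the default index that
    # parallel_bulk() forwards to the bulk API via its **kwargs.
    yield {"_type": "project", "_id": "example", "_source": {"name": "example"}}

for _ in parallel_bulk(client, docs(), index=new_index_name):
    pass  # drain the generator; results are ignored, as in reindex()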
