Skip to content

Commit a177375

Browse files
committed
Don't keep the data twice in bulk helper
1 parent 0ad6b94 commit a177375

File tree

1 file changed

+13
-9
lines changed

1 file changed

+13
-9
lines changed

elasticsearch/helpers/__init__.py

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -105,10 +105,6 @@ def streaming_bulk(client, actions, chunk_size=500, raise_on_error=True,
105105

106106
for chunk in _chunk_actions(actions, chunk_size):
107107

108-
# raise on exception means we might need to iterate on chunk twice
109-
if not raise_on_exception:
110-
chunk = list(chunk)
111-
112108
bulk_actions = []
113109
for action, data in chunk:
114110
bulk_actions.append(action)
@@ -129,11 +125,19 @@ def streaming_bulk(client, actions, chunk_size=500, raise_on_error=True,
129125
# if we are not propagating, mark all actions in current chunk as failed
130126
err_message = str(e)
131127
exc_errors = []
132-
for action, data in chunk:
133-
info = {"error": err_message, "status": e.status_code, "exception": e, "data": data}
134-
op_type, action = action.popitem()
135-
info.update(action)
136-
exc_errors.append({op_type: info})
128+
bulk_data = iter(bulk_actions)
129+
while True:
130+
try:
131+
# collect all the information about failed actions
132+
action = next(bulk_data)
133+
op_type, action = action.popitem()
134+
info = {"error": err_message, "status": e.status_code, "exception": e}
135+
if op_type != 'delete':
136+
info['data'] = next(bulk_data)
137+
info.update(action)
138+
exc_errors.append({op_type: info})
139+
except StopIteration:
140+
break
137141

138142
# emulate standard behavior for failed actions
139143
if raise_on_error:

0 commit comments

Comments (0)