Skip to content

Commit 928fc00

Browse files
committed
Serialize the data in bulk helper
1 parent 9e4d0dd commit 928fc00

File tree

2 files changed

+14
-8
lines changed

2 files changed

+14
-8
lines changed

elasticsearch/helpers/__init__.py

Lines changed: 13 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -1,3 +1,5 @@
1+
from __future__ import unicode_literals
2+
13
import logging
24
from itertools import islice
35
from operator import methodcaller
@@ -38,13 +40,13 @@ def expand_action(data):
3840

3941
return action, data.get('_source', data)
4042

41-
def _chunk_actions(actions, chunk_size):
43+
def _chunk_actions(actions, chunk_size, serializer):
4244
while True:
4345
bulk_actions = []
4446
for action, data in islice(actions, chunk_size):
45-
bulk_actions.append(action)
47+
bulk_actions.append(serializer.dumps(action))
4648
if data is not None:
47-
bulk_actions.append(data)
49+
bulk_actions.append(serializer.dumps(data))
4850

4951
if not bulk_actions:
5052
return
@@ -107,17 +109,16 @@ def streaming_bulk(client, actions, chunk_size=500, raise_on_error=True,
107109
should return a tuple containing the action line and the data line
108110
(`None` if data line should be omitted).
109111
"""
112+
serializer = client.transport.serializer
110113
actions = map(expand_action_callback, actions)
111114

112115
# if raise on error is set, we need to collect errors per chunk before raising them
113116
errors = []
114117

115-
for bulk_actions in _chunk_actions(actions, chunk_size):
116-
117-
118+
for bulk_actions in _chunk_actions(actions, chunk_size, serializer):
118119
try:
119120
# send the actual request
120-
resp = client.bulk(bulk_actions, **kwargs)
121+
resp = client.bulk('\n'.join(bulk_actions) + '\n', **kwargs)
121122
except TransportError as e:
122123
# default behavior - just propagate exception
123124
if raise_on_exception:
@@ -126,7 +127,11 @@ def streaming_bulk(client, actions, chunk_size=500, raise_on_error=True,
126127
# if we are not propagating, mark all actions in current chunk as failed
127128
err_message = str(e)
128129
exc_errors = []
129-
bulk_data = iter(bulk_actions)
130+
131+
# deserialize the data back, this is expensive but only run on
132+
# errors if raise_on_exception is false, so shouldn't be a real
133+
# issue
134+
bulk_data = iter(map(serializer.loads, bulk_actions))
130135
while True:
131136
try:
132137
# collect all the information about failed actions

test_elasticsearch/test_server/test_helpers.py

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -10,6 +10,7 @@ def __init__(self, client, fail_at=1):
1010
self.client = client
1111
self._called = -1
1212
self._fail_at = fail_at
13+
self.transport = client.transport
1314

1415
def bulk(self, *args, **kwargs):
1516
self._called += 1

0 commit comments

Comments
 (0)