Skip to content

Commit 3400179

Browse files
committed
Allows the size of the bulk request to be defined in bytes
Fixes #199
1 parent 928fc00 commit 3400179

File tree

2 files changed

+44
-19
lines changed

2 files changed

+44
-19
lines changed

elasticsearch/helpers/__init__.py

Lines changed: 29 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
from __future__ import unicode_literals
22

33
import logging
4-
from itertools import islice
54
from operator import methodcaller
65

76
from ..exceptions import ElasticsearchException, TransportError
@@ -40,22 +39,35 @@ def expand_action(data):
4039

4140
return action, data.get('_source', data)
4241

43-
def _chunk_actions(actions, chunk_size, serializer):
44-
while True:
45-
bulk_actions = []
46-
for action, data in islice(actions, chunk_size):
47-
bulk_actions.append(serializer.dumps(action))
48-
if data is not None:
49-
bulk_actions.append(serializer.dumps(data))
50-
51-
if not bulk_actions:
52-
return
53-
42+
def _chunk_actions(actions, chunk_size, max_chunk_bytes, serializer):
43+
bulk_actions = []
44+
size, action_count = 0, 0
45+
for action, data in actions:
46+
action = serializer.dumps(action)
47+
cur_size = len(action) + 1
48+
49+
if data is not None:
50+
data = serializer.dumps(data)
51+
cur_size += len(data) + 1
52+
53+
# full chunk, send it and start a new one
54+
if bulk_actions and (size + cur_size > max_chunk_bytes or action_count == chunk_size):
55+
yield bulk_actions
56+
bulk_actions = []
57+
size, action_count = 0, 0
58+
59+
bulk_actions.append(action)
60+
if data is not None:
61+
bulk_actions.append(data)
62+
size += cur_size
63+
action_count += 1
64+
65+
if bulk_actions:
5466
yield bulk_actions
5567

56-
def streaming_bulk(client, actions, chunk_size=500, raise_on_error=True,
57-
expand_action_callback=expand_action, raise_on_exception=True,
58-
**kwargs):
68+
def streaming_bulk(client, actions, chunk_size=500, max_chunk_bytes=100 * 1024 * 1024,
69+
raise_on_error=True, expand_action_callback=expand_action,
70+
raise_on_exception=True, **kwargs):
5971
"""
6072
Streaming bulk consumes actions from the iterable passed in and yields
6173
results per action. For non-streaming usecases use
@@ -101,6 +113,7 @@ def streaming_bulk(client, actions, chunk_size=500, raise_on_error=True,
101113
:arg client: instance of :class:`~elasticsearch.Elasticsearch` to use
102114
:arg actions: iterable containing the actions to be executed
103115
:arg chunk_size: number of docs in one chunk sent to es (default: 500)
116+
:arg max_chunk_bytes: the maximum size of the request in bytes (default: 100MB)
104117
:arg raise_on_error: raise ``BulkIndexError`` containing errors (as `.errors`)
105118
from the execution of the last chunk when some occur. By default we raise.
106119
:arg raise_on_exception: if ``False`` then don't propagate exceptions from
@@ -115,7 +128,7 @@ def streaming_bulk(client, actions, chunk_size=500, raise_on_error=True,
115128
# if raise on error is set, we need to collect errors per chunk before raising them
116129
errors = []
117130

118-
for bulk_actions in _chunk_actions(actions, chunk_size, serializer):
131+
for bulk_actions in _chunk_actions(actions, chunk_size, max_chunk_bytes, serializer):
119132
try:
120133
# send the actual request
121134
resp = client.bulk('\n'.join(bulk_actions) + '\n', **kwargs)

test_elasticsearch/test_server/test_helpers.py

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,21 @@
1-
from datetime import datetime
2-
31
from elasticsearch import helpers, TransportError
2+
from elasticsearch.serializer import JSONSerializer
43

54
from . import ElasticsearchTestCase
6-
from ..test_cases import SkipTest
5+
from ..test_cases import SkipTest, TestCase
6+
7+
8+
class TestChunkActions(TestCase):
    """Unit tests for the count- and byte-based chunking in ``_chunk_actions``."""

    def setUp(self):
        super(TestChunkActions, self).setUp()
        # 100 small index actions, each with a distinct payload
        self.actions = [({'index': {}}, {'some': 'data', 'i': i}) for i in range(100)]

    def test_chunks_are_chopped_by_byte_size(self):
        # max_chunk_bytes=1 forces one action per chunk
        self.assertEqual(100, len(list(helpers._chunk_actions(self.actions, 100000, 1, JSONSerializer()))))

    def test_chunks_are_chopped_by_chunk_size(self):
        # chunk_size=10 over 100 actions yields 10 chunks
        self.assertEqual(10, len(list(helpers._chunk_actions(self.actions, 10, 99999999, JSONSerializer()))))
18+
719

820
class FailingBulkClient(object):
921
def __init__(self, client, fail_at=1):

0 commit comments

Comments
 (0)