Skip to content

Commit 5b6b6bb

Browse files
Correct bulk size in bulk param source
With this commit we correct the bulk size so that it does *not* take the action-and-metadata line into account in the bulk size calculation. This problem was introduced in 863d532. Relates to elastic#45
1 parent 59112a9 commit 5b6b6bb

File tree

2 files changed

+60
-2
lines changed

2 files changed

+60
-2
lines changed

eventdata/parameter_sources/elasticlogs_bulk_source.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,13 @@ def __init__(self, track, params, **kwargs):
8181
self.orig_args = [track, params, kwargs]
8282
self._indices = track.indices
8383
self._params = params
84-
self._randomevent = RandomEvent(params)
84+
# we could also do `kwargs.get("random_event", RandomEvent(params))` but that would call the constructor eagerly
85+
# which we want to avoid because this can cause significant overhead.
86+
if "random_event" in kwargs:
87+
self._randomevent = kwargs["random_event"]
88+
else:
89+
self._randomevent = RandomEvent(params)
90+
8591

8692
self._bulk_size = 1000
8793
if 'bulk-size' in params.keys():
@@ -192,7 +198,8 @@ def params(self):
192198
response = {
193199
"body": "\n".join(bulk_array),
194200
"action-metadata-present": True,
195-
"bulk-size": len(bulk_array)
201+
# the bulk array contains the action-and-metadata line and the actual document
202+
"bulk-size": len(bulk_array) // 2
196203
}
197204

198205
if "pipeline" in self._params.keys():
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
from eventdata.parameter_sources.elasticlogs_bulk_source import ElasticlogsBulkSource
2+
3+
4+
class StaticEventGenerator:
    """Test double that hands out the same event on every call.

    It is passed to ``ElasticlogsBulkSource`` via the ``random_event`` keyword
    argument so that tests do not need to build a real ``RandomEvent``.

    Args:
        index: Index name returned with every event.
        type: Document type returned with every event.
        doc: The (already serialized) document returned with every event.
        at_most: Number of events to hand out before raising ``StopIteration``.
            The default of ``-1`` never reaches zero by decrementing, so the
            generator is effectively unlimited.
    """

    def __init__(self, index, type, doc, at_most=-1):
        self.index = index
        self.type = type
        self.doc = doc
        self.at_most = at_most

    def generate_event(self):
        """Return ``(doc, index, type)``.

        Raises:
            StopIteration: once the ``at_most`` budget has been used up.
        """
        if self.at_most == 0:
            raise StopIteration()
        # Count down on every call; a negative budget simply keeps decreasing.
        self.at_most -= 1
        return self.doc, self.index, self.type
16+
17+
18+
class StaticTrack:
    """Minimal stand-in for a track object.

    It only exposes ``indices`` (an empty list), which is the attribute the
    tests in this module need when passing it as ``track`` to
    ``ElasticlogsBulkSource``.
    """

    def __init__(self):
        # A fresh list per instance so tests cannot leak state into each other.
        self.indices = []
21+
22+
23+
def test_generates_a_complete_bulk():
    """A full bulk reports its size in documents, not in body lines.

    Every document contributes two lines to the body (the action-and-metadata
    line and the document itself), so the body has twice as many lines as the
    reported ``bulk-size``.
    """
    expected_bulk_size = 10

    generator = StaticEventGenerator(
        index="elasticlogs",
        type="_doc",
        doc='{"location": [-0.1485188,51.5250666]}',
    )
    param_source = ElasticlogsBulkSource(
        track=StaticTrack(),
        params={
            "index": "elasticlogs",
            "bulk-size": expected_bulk_size,
        },
        random_event=generator,
    )
    client_param_source = param_source.partition(partition_index=0, total_partitions=1)

    generated_params = client_param_source.params()
    assert len(generated_params["body"].split("\n")) == 2 * expected_bulk_size
    assert generated_params["action-metadata-present"] is True
    assert generated_params["bulk-size"] == expected_bulk_size
37+
38+
39+
def test_generates_a_bulk_that_ends_prematurely():
    """A bulk cut short by the event generator reports the documents it holds.

    The generator stops after 5 events although a bulk size of 10 was
    requested; the body must then contain 10 lines (two per document) and the
    reported ``bulk-size`` must be 5.
    """
    generator = StaticEventGenerator(
        index="elasticlogs",
        type="_doc",
        doc='{"loc": [-0.14851,51.5250]}',
        at_most=5,
    )
    param_source = ElasticlogsBulkSource(
        track=StaticTrack(),
        params={
            "index": "elasticlogs",
            "bulk-size": 10,
        },
        random_event=generator,
    )
    client_param_source = param_source.partition(partition_index=0, total_partitions=1)

    generated_params = client_param_source.params()
    # the requested bulk size does not matter here; what matters is that the
    # generator stopped prematurely after 5 items
    assert len(generated_params["body"].split("\n")) == 10
    assert generated_params["action-metadata-present"] is True
    assert generated_params["bulk-size"] == 5

0 commit comments

Comments
 (0)