
Commit b4ca550

Backport #16482 to 7.17: Bugfix for BufferedTokenizer to completely consume lines in case of lines bigger than sizeLimit (#16577)

* Bugfix for BufferedTokenizer to completely consume lines in case of lines bigger than sizeLimit (#16482)

  Fixes the behaviour of the tokenizer so that it works properly when buffer-full conditions are met. Updates BufferedTokenizerExt so that it can accumulate token fragments coming from different data segments. When a "buffer full" condition is matched, it records this state in a local field so that on the next data segment it can consume all the token fragments up to the next token delimiter. Updates the accumulation variable from a RubyArray containing strings to a StringBuilder that holds the head token, while the remaining token fragments are stored in the input array. Furthermore, it translates the `buftok_spec` tests into JUnit tests.

* Fixed compilation error due to library incompatibilities
  - usage of `data.convertToString().split(context, delimiter, MINUS_ONE);` instead of `data.convertToString().split(delimiter, -1);`
  - avoid extending the BufferedTokenizer test cases from `org.logstash.RubyTestBase`, which was introduced in #13159
  - JDK 8 compatibility:
    - `Arrays.asList` vs `List.of`
    - `assertThrows` method from JUnit 5 not available in JUnit 4, so reimplemented in the test
1 parent e742b76 commit b4ca550
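The accumulation strategy described in the commit message can be reduced to a short plain-Java sketch (illustrative only and not part of this commit; the name SketchTokenizer is invented here, and the real change below works on JRuby types and additionally tracks the buffer-full state):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Minimal sketch of the headToken accumulation idea, assuming a "\n" delimiter.
final class SketchTokenizer {
    private final StringBuilder headToken = new StringBuilder(); // pending fragment of the current token

    List<String> extract(String data) {
        // -1 keeps the trailing empty string, mirroring split(context, delimiter, MINUS_ONE)
        List<String> parts = new ArrayList<>(Arrays.asList(data.split("\n", -1)));
        if (parts.size() < 2) {
            headToken.append(parts.get(0)); // no delimiter seen yet: keep accumulating
            return Collections.emptyList();
        }
        parts.set(0, headToken.toString() + parts.get(0)); // complete the pending head token
        headToken.setLength(0);
        headToken.append(parts.remove(parts.size() - 1)); // keep the trailing fragment for later
        return parts;
    }

    String flush() {
        String rest = headToken.toString();
        headToken.setLength(0);
        return rest;
    }
}

With this shape, extract("foo") returns nothing and a later extract("bar\n") yields "foobar", which matches the shouldMergeMultipleToken case exercised by the new JUnit tests below.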

File tree: 4 files changed, +331 −13 lines changed


logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java

Lines changed: 56 additions & 13 deletions
@@ -22,6 +22,7 @@
 
 import org.jruby.Ruby;
 import org.jruby.RubyArray;
+import org.jruby.RubyBoolean;
 import org.jruby.RubyClass;
 import org.jruby.RubyObject;
 import org.jruby.RubyString;
@@ -39,10 +40,12 @@ public class BufferedTokenizerExt extends RubyObject {
     private static final IRubyObject MINUS_ONE = RubyUtil.RUBY.newFixnum(-1);
 
     private @SuppressWarnings("rawtypes") RubyArray input = RubyUtil.RUBY.newArray();
+    private StringBuilder headToken = new StringBuilder();
     private IRubyObject delimiter = RubyUtil.RUBY.newString("\n");
     private int sizeLimit;
     private boolean hasSizeLimit;
     private int inputSize;
+    private boolean bufferFullErrorNotified = false;
 
     public BufferedTokenizerExt(final Ruby runtime, final RubyClass metaClass) {
         super(runtime, metaClass);
@@ -65,7 +68,6 @@ public IRubyObject init(final ThreadContext context, IRubyObject[] args) {
      * Extract takes an arbitrary string of input data and returns an array of
      * tokenized entities, provided there were any available to extract. This
      * makes for easy processing of datagrams using a pattern like:
-     *
      * {@code tokenizer.extract(data).map { |entity| Decode(entity) }.each do}
      *
      * @param context ThreadContext
@@ -75,23 +77,64 @@ public IRubyObject init(final ThreadContext context, IRubyObject[] args) {
     @JRubyMethod
     @SuppressWarnings("rawtypes")
     public RubyArray extract(final ThreadContext context, IRubyObject data) {
-        final RubyArray entities = ((RubyString) data).split(context, delimiter, MINUS_ONE);
+        final RubyArray entities = data.convertToString().split(context, delimiter, MINUS_ONE);
+        if (!bufferFullErrorNotified) {
+            input.clear();
+            input.addAll(entities);
+        } else {
+            // after a full buffer signal
+            if (input.isEmpty()) {
+                // after a buffer full error, the remaining part of the line, till next delimiter,
+                // has to be consumed, unless the input buffer doesn't still contain fragments of
+                // subsequent tokens.
+                entities.shift(context);
+                input.addAll(entities);
+            } else {
+                // merge last of the input with first of incoming data segment
+                if (!entities.isEmpty()) {
+                    RubyString last = ((RubyString) input.pop(context));
+                    RubyString nextFirst = ((RubyString) entities.shift(context));
+                    entities.unshift(last.concat(nextFirst));
+                    input.addAll(entities);
+                }
+            }
+        }
+
         if (hasSizeLimit) {
-            final int entitiesSize = ((RubyString) entities.first()).size();
+            if (bufferFullErrorNotified) {
+                bufferFullErrorNotified = false;
+                if (input.isEmpty()) {
+                    return RubyUtil.RUBY.newArray();
+                }
+            }
+            final int entitiesSize = ((RubyString) input.first()).size();
             if (inputSize + entitiesSize > sizeLimit) {
+                bufferFullErrorNotified = true;
+                headToken = new StringBuilder();
+                inputSize = 0;
+                input.shift(context); // consume the token fragment that generates the buffer full
                 throw new IllegalStateException("input buffer full");
             }
             this.inputSize = inputSize + entitiesSize;
         }
-        input.append(entities.shift(context));
-        if (entities.isEmpty()) {
+
+        if (input.getLength() < 2) {
+            // this is a specialization case which avoid adding and removing from input accumulator
+            // when it contains just one element
+            headToken.append(input.shift(context)); // remove head
             return RubyUtil.RUBY.newArray();
         }
-        entities.unshift(input.join(context));
-        input.clear();
-        input.append(entities.pop(context));
-        inputSize = ((RubyString) input.first()).size();
-        return entities;
+
+        if (headToken.length() > 0) {
+            // if there is a pending token part, merge it with the first token segment present
+            // in the accumulator, and clean the pending token part.
+            headToken.append(input.shift(context)); // append buffer to first element and
+            input.unshift(RubyUtil.toRubyObject(headToken.toString())); // reinsert it into the array
+            headToken = new StringBuilder();
+        }
+        headToken.append(input.pop(context)); // put the leftovers in headToken for later
+        inputSize = headToken.length();
+        return input;
     }
 
     /**
@@ -103,14 +146,14 @@ public RubyArray extract(final ThreadContext context, IRubyObject data) {
      */
     @JRubyMethod
     public IRubyObject flush(final ThreadContext context) {
-        final IRubyObject buffer = input.join(context);
-        input.clear();
+        final IRubyObject buffer = RubyUtil.toRubyObject(headToken.toString());
+        headToken = new StringBuilder();
         return buffer;
     }
 
     @JRubyMethod(name = "empty?")
     public IRubyObject isEmpty(final ThreadContext context) {
-        return input.empty_p();
+        return RubyBoolean.newBoolean(context.runtime, headToken.toString().isEmpty());
     }
 
 }
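As a rough illustration of the new buffer-full flow, here is a hedged JUnit-style sketch (not one of the tests added by this commit). It assumes init accepts an optional second size-limit argument, as the sizeLimit/hasSizeLimit fields suggest, and uses the same imports as the test classes below plus org.junit.Assert.fail:

@Test
public void shouldResumeTokenizationAfterBufferFullCondition() {
    BufferedTokenizerExt sut = new BufferedTokenizerExt(RubyUtil.RUBY, RubyUtil.BUFFERED_TOKENIZER);
    ThreadContext context = RUBY.getCurrentContext();
    // hypothetical init args: {delimiter, sizeLimit}
    sut.init(context, new IRubyObject[]{RUBY.newString("\n"), RUBY.newFixnum(10)});

    // an oversized fragment trips the limit: extract raises "input buffer full"
    // and records the condition in bufferFullErrorNotified
    try {
        sut.extract(context, RUBY.newString("aaaaaaaaaaaaaaa"));
        fail("expected IllegalStateException");
    } catch (IllegalStateException e) {
        // expected
    }

    // on the next segment, the tail of the oversized line ("bbbb") is consumed and
    // discarded up to the next delimiter, and tokenization resumes with the rest
    RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, RUBY.newString("bbbb\nccc\n"));
    assertEquals(Arrays.asList("ccc"), tokens);
}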
logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtTest.java

Lines changed: 90 additions & 0 deletions
@@ -0,0 +1,90 @@
1+
/*
2+
* Licensed to Elasticsearch B.V. under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch B.V. licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.logstash.common;
21+
22+
import org.jruby.RubyArray;
23+
import org.jruby.RubyString;
24+
import org.jruby.runtime.ThreadContext;
25+
import org.jruby.runtime.builtin.IRubyObject;
26+
import org.junit.Before;
27+
import org.junit.Test;
28+
import org.logstash.RubyUtil;
29+
30+
import java.util.Arrays;
31+
32+
import static org.junit.Assert.assertEquals;
33+
import static org.junit.Assert.assertTrue;
34+
import static org.logstash.RubyUtil.RUBY;
35+
36+
@SuppressWarnings("unchecked")
37+
public final class BufferedTokenizerExtTest {
38+
39+
private BufferedTokenizerExt sut;
40+
private ThreadContext context;
41+
42+
@Before
43+
public void setUp() {
44+
sut = new BufferedTokenizerExt(RubyUtil.RUBY, RubyUtil.BUFFERED_TOKENIZER);
45+
context = RUBY.getCurrentContext();
46+
IRubyObject[] args = {};
47+
sut.init(context, args);
48+
}
49+
50+
@Test
51+
public void shouldTokenizeASingleToken() {
52+
RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("foo\n"));
53+
54+
assertEquals(Arrays.asList("foo"), tokens);
55+
}
56+
57+
@Test
58+
public void shouldMergeMultipleToken() {
59+
RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("foo"));
60+
assertTrue(tokens.isEmpty());
61+
62+
tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("bar\n"));
63+
assertEquals(Arrays.asList("foobar"), tokens);
64+
}
65+
66+
@Test
67+
public void shouldTokenizeMultipleToken() {
68+
RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("foo\nbar\n"));
69+
70+
assertEquals(Arrays.asList("foo", "bar"), tokens);
71+
}
72+
73+
@Test
74+
public void shouldIgnoreEmptyPayload() {
75+
RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString(""));
76+
assertTrue(tokens.isEmpty());
77+
78+
tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("foo\nbar"));
79+
assertEquals(Arrays.asList("foo"), tokens);
80+
}
81+
82+
@Test
83+
public void shouldTokenizeEmptyPayloadWithNewline() {
84+
RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("\n"));
85+
assertEquals(Arrays.asList(""), tokens);
86+
87+
tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("\n\n\n"));
88+
assertEquals(Arrays.asList("", "", ""), tokens);
89+
}
90+
}
logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtWithDelimiterTest.java

Lines changed: 65 additions & 0 deletions
@@ -0,0 +1,65 @@
1+
/*
2+
* Licensed to Elasticsearch B.V. under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch B.V. licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.logstash.common;
21+
22+
import org.jruby.RubyArray;
23+
import org.jruby.RubyString;
24+
import org.jruby.runtime.ThreadContext;
25+
import org.jruby.runtime.builtin.IRubyObject;
26+
import org.junit.Before;
27+
import org.junit.Test;
28+
import org.logstash.RubyUtil;
29+
30+
import java.util.Arrays;
31+
32+
import static org.junit.Assert.assertEquals;
33+
import static org.junit.Assert.assertTrue;
34+
import static org.logstash.RubyUtil.RUBY;
35+
36+
@SuppressWarnings("unchecked")
37+
public final class BufferedTokenizerExtWithDelimiterTest {
38+
39+
private BufferedTokenizerExt sut;
40+
private ThreadContext context;
41+
42+
@Before
43+
public void setUp() {
44+
sut = new BufferedTokenizerExt(RubyUtil.RUBY, RubyUtil.BUFFERED_TOKENIZER);
45+
context = RUBY.getCurrentContext();
46+
IRubyObject[] args = {RubyUtil.RUBY.newString("||")};
47+
sut.init(context, args);
48+
}
49+
50+
@Test
51+
public void shouldTokenizeMultipleToken() {
52+
RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("foo||b|r||"));
53+
54+
assertEquals(Arrays.asList("foo", "b|r"), tokens);
55+
}
56+
57+
@Test
58+
public void shouldIgnoreEmptyPayload() {
59+
RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString(""));
60+
assertTrue(tokens.isEmpty());
61+
62+
tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("foo||bar"));
63+
assertEquals(Arrays.asList("foo"), tokens);
64+
}
65+
}
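For completeness, a hedged sketch (again, not one of the committed tests) of how flush and empty? behave after this change, reusing the sut/context setup style of the tests above: a trailing fragment that has not yet seen a delimiter is held in headToken until flush is called.

// Illustrative fragment only; assumes the same setUp as the test classes above.
RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("partial"));
assertTrue(tokens.isEmpty());                       // no delimiter yet, nothing is emitted
assertEquals("partial", sut.flush(context).toString()); // flush returns the pending fragment...
assertTrue(sut.isEmpty(context).isTrue());          // ...and the accumulator is empty afterwards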
