diff --git a/libs/grok/src/main/java/org/elasticsearch/grok/Grok.java b/libs/grok/src/main/java/org/elasticsearch/grok/Grok.java index d383f79b23e2e..f5f5a482bf493 100644 --- a/libs/grok/src/main/java/org/elasticsearch/grok/Grok.java +++ b/libs/grok/src/main/java/org/elasticsearch/grok/Grok.java @@ -73,24 +73,24 @@ public final class Grok { private final Map patternBank; private final boolean namedCaptures; private final Regex compiledExpression; - private final ThreadWatchdog threadWatchdog; + private final MatcherWatchdog matcherWatchdog; public Grok(Map patternBank, String grokPattern) { - this(patternBank, grokPattern, true, ThreadWatchdog.noop()); + this(patternBank, grokPattern, true, MatcherWatchdog.noop()); } - public Grok(Map patternBank, String grokPattern, ThreadWatchdog threadWatchdog) { - this(patternBank, grokPattern, true, threadWatchdog); + public Grok(Map patternBank, String grokPattern, MatcherWatchdog matcherWatchdog) { + this(patternBank, grokPattern, true, matcherWatchdog); } Grok(Map patternBank, String grokPattern, boolean namedCaptures) { - this(patternBank, grokPattern, namedCaptures, ThreadWatchdog.noop()); + this(patternBank, grokPattern, namedCaptures, MatcherWatchdog.noop()); } - private Grok(Map patternBank, String grokPattern, boolean namedCaptures, ThreadWatchdog threadWatchdog) { + private Grok(Map patternBank, String grokPattern, boolean namedCaptures, MatcherWatchdog matcherWatchdog) { this.patternBank = patternBank; this.namedCaptures = namedCaptures; - this.threadWatchdog = threadWatchdog; + this.matcherWatchdog = matcherWatchdog; for (Map.Entry entry : patternBank.entrySet()) { String name = entry.getKey(); @@ -168,14 +168,12 @@ public String toRegex(String grokPattern) { int result; try { - threadWatchdog.register(); - result = matcher.searchInterruptible(0, grokPatternBytes.length, Option.NONE); - } catch (InterruptedException e) { - result = Matcher.INTERRUPTED; + matcherWatchdog.register(matcher); + result = matcher.search(0, grokPatternBytes.length, Option.NONE); } finally { - threadWatchdog.unregister(); + matcherWatchdog.unregister(matcher); } - if (result != -1) { + if (result >= 0) { Region region = matcher.getEagerRegion(); String namedPatternRef = groupMatch(NAME_GROUP, region, grokPattern); String subName = groupMatch(SUBNAME_GROUP, region, grokPattern); @@ -213,18 +211,16 @@ public String toRegex(String grokPattern) { * Checks whether a specific text matches the defined grok expression. * * @param text the string to match - * @return true if grok expression matches text, false otherwise. + * @return true if grok expression matches text or there is a timeout, false otherwise. */ public boolean match(String text) { Matcher matcher = compiledExpression.matcher(text.getBytes(StandardCharsets.UTF_8)); int result; try { - threadWatchdog.register(); - result = matcher.searchInterruptible(0, text.length(), Option.DEFAULT); - } catch (InterruptedException e) { - result = Matcher.INTERRUPTED; + matcherWatchdog.register(matcher); + result = matcher.search(0, text.length(), Option.DEFAULT); } finally { - threadWatchdog.unregister(); + matcherWatchdog.unregister(matcher); } return (result != -1); } @@ -241,16 +237,14 @@ public Map captures(String text) { Matcher matcher = compiledExpression.matcher(textAsBytes); int result; try { - threadWatchdog.register(); - result = matcher.searchInterruptible(0, textAsBytes.length, Option.DEFAULT); - } catch (InterruptedException e) { - result = Matcher.INTERRUPTED; + matcherWatchdog.register(matcher); + result = matcher.search(0, textAsBytes.length, Option.DEFAULT); } finally { - threadWatchdog.unregister(); + matcherWatchdog.unregister(matcher); } if (result == Matcher.INTERRUPTED) { throw new RuntimeException("grok pattern matching was interrupted after [" + - threadWatchdog.maxExecutionTimeInMillis() + "] ms"); + matcherWatchdog.maxExecutionTimeInMillis() + "] ms"); } else if (result == Matcher.FAILED) { // TODO: I think we should throw an error here? return null; diff --git a/libs/grok/src/main/java/org/elasticsearch/grok/ThreadWatchdog.java b/libs/grok/src/main/java/org/elasticsearch/grok/MatcherWatchdog.java similarity index 74% rename from libs/grok/src/main/java/org/elasticsearch/grok/ThreadWatchdog.java rename to libs/grok/src/main/java/org/elasticsearch/grok/MatcherWatchdog.java index 602b3e97635ee..dcac74af1fe94 100644 --- a/libs/grok/src/main/java/org/elasticsearch/grok/ThreadWatchdog.java +++ b/libs/grok/src/main/java/org/elasticsearch/grok/MatcherWatchdog.java @@ -18,6 +18,8 @@ */ package org.elasticsearch.grok; +import org.joni.Matcher; + import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; @@ -27,7 +29,7 @@ /** * Protects against long running operations that happen between the register and unregister invocations. - * Threads that invoke {@link #register()}, but take too long to invoke the {@link #unregister()} method + * Threads that invoke {@link #register(Matcher)}, but take too long to invoke the {@link #unregister(Matcher)} method * will be interrupted. * * This is needed for Joni's {@link org.joni.Matcher#search(int, int, int)} method, because @@ -35,28 +37,32 @@ * that for every 30k iterations it checks if the current thread is interrupted and if so * returns {@link org.joni.Matcher#INTERRUPTED}. */ -public interface ThreadWatchdog { - +public interface MatcherWatchdog { + /** - * Registers the current thread and interrupts the current thread - * if the takes too long for this thread to invoke {@link #unregister()}. + * Registers the current matcher and interrupts the this matcher + * if the takes too long for this thread to invoke {@link #unregister(Matcher)}. + * + * @param matcher The matcher to register */ - void register(); - + void register(Matcher matcher); + /** - * @return The maximum allowed time in milliseconds for a thread to invoke {@link #unregister()} - * after {@link #register()} has been invoked before this ThreadWatchDog starts to interrupting that thread. + * @return The maximum allowed time in milliseconds for a thread to invoke {@link #unregister(Matcher)} + * after {@link #register(Matcher)} has been invoked before this ThreadWatchDog starts to interrupting that thread. */ long maxExecutionTimeInMillis(); - + /** - * Unregisters the current thread and prevents it from being interrupted. + * Unregisters the current matcher and prevents it from being interrupted. + * + * @param matcher The matcher to unregister */ - void unregister(); - + void unregister(Matcher matcher); + /** - * Returns an implementation that checks for each fixed interval if there are threads that have invoked {@link #register()} - * and not {@link #unregister()} and have been in this state for longer than the specified max execution interval and + * Returns an implementation that checks for each fixed interval if there are threads that have invoked {@link #register(Matcher)} + * and not {@link #unregister(Matcher)} and have been in this state for longer than the specified max execution interval and * then interrupts these threads. * * @param interval The fixed interval to check if there are threads to interrupt @@ -64,51 +70,51 @@ public interface ThreadWatchdog { * @param relativeTimeSupplier A supplier that returns relative time * @param scheduler A scheduler that is able to execute a command for each fixed interval */ - static ThreadWatchdog newInstance(long interval, + static MatcherWatchdog newInstance(long interval, long maxExecutionTime, LongSupplier relativeTimeSupplier, BiConsumer scheduler) { return new Default(interval, maxExecutionTime, relativeTimeSupplier, scheduler); } - + /** * @return A noop implementation that does not interrupt threads and is useful for testing and pre-defined grok expressions. */ - static ThreadWatchdog noop() { + static MatcherWatchdog noop() { return Noop.INSTANCE; } - - class Noop implements ThreadWatchdog { - + + class Noop implements MatcherWatchdog { + private static final Noop INSTANCE = new Noop(); - + private Noop() { } - + @Override - public void register() { + public void register(Matcher matcher) { } - + @Override public long maxExecutionTimeInMillis() { return Long.MAX_VALUE; } - + @Override - public void unregister() { + public void unregister(Matcher matcher) { } } - - class Default implements ThreadWatchdog { - + + class Default implements MatcherWatchdog { + private final long interval; private final long maxExecutionTime; private final LongSupplier relativeTimeSupplier; private final BiConsumer scheduler; private final AtomicInteger registered = new AtomicInteger(0); private final AtomicBoolean running = new AtomicBoolean(false); - final ConcurrentHashMap registry = new ConcurrentHashMap<>(); - + final ConcurrentHashMap registry = new ConcurrentHashMap<>(); + private Default(long interval, long maxExecutionTime, LongSupplier relativeTimeSupplier, @@ -118,30 +124,30 @@ private Default(long interval, this.relativeTimeSupplier = relativeTimeSupplier; this.scheduler = scheduler; } - - public void register() { + + public void register(Matcher matcher) { registered.getAndIncrement(); - Long previousValue = registry.put(Thread.currentThread(), relativeTimeSupplier.getAsLong()); + Long previousValue = registry.put(matcher, relativeTimeSupplier.getAsLong()); if (running.compareAndSet(false, true) == true) { scheduler.accept(interval, this::interruptLongRunningExecutions); } assert previousValue == null; } - + @Override public long maxExecutionTimeInMillis() { return maxExecutionTime; } - - public void unregister() { - Long previousValue = registry.remove(Thread.currentThread()); + + public void unregister(Matcher matcher) { + Long previousValue = registry.remove(matcher); registered.decrementAndGet(); assert previousValue != null; } - + private void interruptLongRunningExecutions() { final long currentRelativeTime = relativeTimeSupplier.getAsLong(); - for (Map.Entry entry : registry.entrySet()) { + for (Map.Entry entry : registry.entrySet()) { if ((currentRelativeTime - entry.getValue()) > maxExecutionTime) { entry.getKey().interrupt(); // not removing the entry here, this happens in the unregister() method. @@ -153,7 +159,7 @@ private void interruptLongRunningExecutions() { running.set(false); } } - + } - + } diff --git a/libs/grok/src/test/java/org/elasticsearch/grok/GrokTests.java b/libs/grok/src/test/java/org/elasticsearch/grok/GrokTests.java index 10ed049561899..f25d502b6efec 100644 --- a/libs/grok/src/test/java/org/elasticsearch/grok/GrokTests.java +++ b/libs/grok/src/test/java/org/elasticsearch/grok/GrokTests.java @@ -430,7 +430,7 @@ public void testExponentialExpressions() { }); t.start(); }; - Grok grok = new Grok(basePatterns, grokPattern, ThreadWatchdog.newInstance(10, 200, System::currentTimeMillis, scheduler)); + Grok grok = new Grok(basePatterns, grokPattern, MatcherWatchdog.newInstance(10, 200, System::currentTimeMillis, scheduler)); Exception e = expectThrows(RuntimeException.class, () -> grok.captures(logLine)); run.set(false); assertThat(e.getMessage(), equalTo("grok pattern matching was interrupted after [200] ms")); diff --git a/libs/grok/src/test/java/org/elasticsearch/grok/ThreadWatchdogTests.java b/libs/grok/src/test/java/org/elasticsearch/grok/MatcherWatchdogTests.java similarity index 81% rename from libs/grok/src/test/java/org/elasticsearch/grok/ThreadWatchdogTests.java rename to libs/grok/src/test/java/org/elasticsearch/grok/MatcherWatchdogTests.java index 95a50c1fc5c2a..672a507733b7d 100644 --- a/libs/grok/src/test/java/org/elasticsearch/grok/ThreadWatchdogTests.java +++ b/libs/grok/src/test/java/org/elasticsearch/grok/MatcherWatchdogTests.java @@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.elasticsearch.test.ESTestCase; +import org.joni.Matcher; import org.mockito.Mockito; import static org.hamcrest.Matchers.is; @@ -31,15 +32,16 @@ import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.verifyZeroInteractions; -public class ThreadWatchdogTests extends ESTestCase { +public class MatcherWatchdogTests extends ESTestCase { public void testInterrupt() throws Exception { AtomicBoolean run = new AtomicBoolean(true); // to avoid a lingering thread when test has completed - ThreadWatchdog watchdog = ThreadWatchdog.newInstance(10, 100, System::currentTimeMillis, (delay, command) -> { + MatcherWatchdog watchdog = MatcherWatchdog.newInstance(10, 100, System::currentTimeMillis, (delay, command) -> { try { Thread.sleep(delay); } catch (InterruptedException e) { @@ -53,17 +55,17 @@ public void testInterrupt() throws Exception { thread.start(); }); - Map registry = ((ThreadWatchdog.Default) watchdog).registry; + Map registry = ((MatcherWatchdog.Default) watchdog).registry; assertThat(registry.size(), is(0)); // need to call #register() method on a different thread, assertBusy() fails if current thread gets interrupted AtomicBoolean interrupted = new AtomicBoolean(false); Thread thread = new Thread(() -> { - Thread currentThread = Thread.currentThread(); - watchdog.register(); - while (currentThread.isInterrupted() == false) {} + Matcher matcher = mock(Matcher.class); + watchdog.register(matcher); + verify(matcher, timeout(9999).times(1)).interrupt(); interrupted.set(true); while (run.get()) {} // wait here so that the size of the registry can be asserted - watchdog.unregister(); + watchdog.unregister(matcher); }); thread.start(); assertBusy(() -> { @@ -79,7 +81,7 @@ public void testInterrupt() throws Exception { public void testIdleIfNothingRegistered() throws Exception { long interval = 1L; ScheduledExecutorService threadPool = mock(ScheduledExecutorService.class); - ThreadWatchdog watchdog = ThreadWatchdog.newInstance(interval, Long.MAX_VALUE, System::currentTimeMillis, + MatcherWatchdog watchdog = MatcherWatchdog.newInstance(interval, Long.MAX_VALUE, System::currentTimeMillis, (delay, command) -> threadPool.schedule(command, delay, TimeUnit.MILLISECONDS)); // Periodic action is not scheduled because no thread is registered verifyZeroInteractions(threadPool); @@ -91,16 +93,20 @@ public void testIdleIfNothingRegistered() throws Exception { }).when(threadPool).schedule( any(Runnable.class), eq(interval), eq(TimeUnit.MILLISECONDS) ); - watchdog.register(); + Matcher matcher = mock(Matcher.class); + watchdog.register(matcher); // Registering the first thread should have caused the command to get scheduled again Runnable command = commandFuture.get(1L, TimeUnit.MILLISECONDS); Mockito.reset(threadPool); - watchdog.unregister(); + watchdog.unregister(matcher); command.run(); // Periodic action is not scheduled again because no thread is registered verifyZeroInteractions(threadPool); - watchdog.register(); - Thread otherThread = new Thread(watchdog::register); + watchdog.register(matcher); + Thread otherThread = new Thread(() -> { + Matcher otherMatcher = mock(Matcher.class); + watchdog.register(otherMatcher); + }); try { verify(threadPool).schedule(any(Runnable.class), eq(interval), eq(TimeUnit.MILLISECONDS)); // Registering a second thread does not cause the command to get scheduled twice diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/GrokProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/GrokProcessor.java index 19883053d2a09..c603da766a65e 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/GrokProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/GrokProcessor.java @@ -20,7 +20,7 @@ package org.elasticsearch.ingest.common; import org.elasticsearch.grok.Grok; -import org.elasticsearch.grok.ThreadWatchdog; +import org.elasticsearch.grok.MatcherWatchdog; import org.elasticsearch.ingest.AbstractProcessor; import org.elasticsearch.ingest.ConfigurationUtils; import org.elasticsearch.ingest.IngestDocument; @@ -44,11 +44,11 @@ public final class GrokProcessor extends AbstractProcessor { private final boolean ignoreMissing; GrokProcessor(String tag, Map patternBank, List matchPatterns, String matchField, - boolean traceMatch, boolean ignoreMissing, ThreadWatchdog threadWatchdog) { + boolean traceMatch, boolean ignoreMissing, MatcherWatchdog matcherWatchdog) { super(tag); this.matchField = matchField; this.matchPatterns = matchPatterns; - this.grok = new Grok(patternBank, combinePatterns(matchPatterns, traceMatch), threadWatchdog); + this.grok = new Grok(patternBank, combinePatterns(matchPatterns, traceMatch), matcherWatchdog); this.traceMatch = traceMatch; this.ignoreMissing = ignoreMissing; } @@ -133,11 +133,11 @@ static String combinePatterns(List patterns, boolean traceMatch) { public static final class Factory implements Processor.Factory { private final Map builtinPatterns; - private final ThreadWatchdog threadWatchdog; + private final MatcherWatchdog matcherWatchdog; - public Factory(Map builtinPatterns, ThreadWatchdog threadWatchdog) { + public Factory(Map builtinPatterns, MatcherWatchdog matcherWatchdog) { this.builtinPatterns = builtinPatterns; - this.threadWatchdog = threadWatchdog; + this.matcherWatchdog = matcherWatchdog; } @Override @@ -159,7 +159,7 @@ public GrokProcessor create(Map registry, String proc try { return new GrokProcessor(processorTag, patternBank, matchPatterns, matchField, traceMatch, ignoreMissing, - threadWatchdog); + matcherWatchdog); } catch (Exception e) { throw newConfigurationException(TYPE, processorTag, "patterns", "Invalid regex pattern found in: " + matchPatterns + ". " + e.getMessage()); diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java index 5ff113a2e01ec..4f99c850e5bd3 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java @@ -30,7 +30,7 @@ import org.elasticsearch.common.settings.SettingsFilter; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.grok.Grok; -import org.elasticsearch.grok.ThreadWatchdog; +import org.elasticsearch.grok.MatcherWatchdog; import org.elasticsearch.ingest.DropProcessor; import org.elasticsearch.ingest.PipelineProcessor; import org.elasticsearch.ingest.Processor; @@ -110,10 +110,10 @@ public List> getSettings() { return Arrays.asList(WATCHDOG_INTERVAL, WATCHDOG_MAX_EXECUTION_TIME); } - private static ThreadWatchdog createGrokThreadWatchdog(Processor.Parameters parameters) { + private static MatcherWatchdog createGrokThreadWatchdog(Processor.Parameters parameters) { long intervalMillis = WATCHDOG_INTERVAL.get(parameters.env.settings()).getMillis(); long maxExecutionTimeMillis = WATCHDOG_MAX_EXECUTION_TIME.get(parameters.env.settings()).getMillis(); - return ThreadWatchdog.newInstance(intervalMillis, maxExecutionTimeMillis, + return MatcherWatchdog.newInstance(intervalMillis, maxExecutionTimeMillis, parameters.relativeTimeSupplier, parameters.scheduler::apply); } diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/GrokProcessorFactoryTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/GrokProcessorFactoryTests.java index f35fa34eec46d..c7029012da7d4 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/GrokProcessorFactoryTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/GrokProcessorFactoryTests.java @@ -20,7 +20,7 @@ package org.elasticsearch.ingest.common; import org.elasticsearch.ElasticsearchParseException; -import org.elasticsearch.grok.ThreadWatchdog; +import org.elasticsearch.grok.MatcherWatchdog; import org.elasticsearch.test.ESTestCase; import java.util.Collections; @@ -34,7 +34,7 @@ public class GrokProcessorFactoryTests extends ESTestCase { public void testBuild() throws Exception { - GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap(), ThreadWatchdog.noop()); + GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap(), MatcherWatchdog.noop()); Map config = new HashMap<>(); config.put("field", "_field"); @@ -48,7 +48,7 @@ public void testBuild() throws Exception { } public void testBuildWithIgnoreMissing() throws Exception { - GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap(), ThreadWatchdog.noop()); + GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap(), MatcherWatchdog.noop()); Map config = new HashMap<>(); config.put("field", "_field"); @@ -63,7 +63,7 @@ public void testBuildWithIgnoreMissing() throws Exception { } public void testBuildMissingField() throws Exception { - GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap(), ThreadWatchdog.noop()); + GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap(), MatcherWatchdog.noop()); Map config = new HashMap<>(); config.put("patterns", Collections.singletonList("(?\\w+)")); ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, null, config)); @@ -71,7 +71,7 @@ public void testBuildMissingField() throws Exception { } public void testBuildMissingPatterns() throws Exception { - GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap(), ThreadWatchdog.noop()); + GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap(), MatcherWatchdog.noop()); Map config = new HashMap<>(); config.put("field", "foo"); ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, null, config)); @@ -79,7 +79,7 @@ public void testBuildMissingPatterns() throws Exception { } public void testBuildEmptyPatternsList() throws Exception { - GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap(), ThreadWatchdog.noop()); + GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap(), MatcherWatchdog.noop()); Map config = new HashMap<>(); config.put("field", "foo"); config.put("patterns", Collections.emptyList()); @@ -88,7 +88,7 @@ public void testBuildEmptyPatternsList() throws Exception { } public void testCreateWithCustomPatterns() throws Exception { - GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap(), ThreadWatchdog.noop()); + GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap(), MatcherWatchdog.noop()); Map config = new HashMap<>(); config.put("field", "_field"); @@ -101,7 +101,7 @@ public void testCreateWithCustomPatterns() throws Exception { } public void testCreateWithInvalidPattern() throws Exception { - GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap(), ThreadWatchdog.noop()); + GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap(), MatcherWatchdog.noop()); Map config = new HashMap<>(); config.put("field", "_field"); config.put("patterns", Collections.singletonList("[")); @@ -110,7 +110,7 @@ public void testCreateWithInvalidPattern() throws Exception { } public void testCreateWithInvalidPatternDefinition() throws Exception { - GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap(), ThreadWatchdog.noop()); + GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap(), MatcherWatchdog.noop()); Map config = new HashMap<>(); config.put("field", "_field"); config.put("patterns", Collections.singletonList("%{MY_PATTERN:name}!")); diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/GrokProcessorTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/GrokProcessorTests.java index 68654923ae926..159115f4e38b5 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/GrokProcessorTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/GrokProcessorTests.java @@ -19,7 +19,7 @@ package org.elasticsearch.ingest.common; -import org.elasticsearch.grok.ThreadWatchdog; +import org.elasticsearch.grok.MatcherWatchdog; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.RandomDocumentPicks; import org.elasticsearch.test.ESTestCase; @@ -40,7 +40,7 @@ public void testMatch() throws Exception { IngestDocument doc = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); doc.setFieldValue(fieldName, "1"); GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), Collections.singletonMap("ONE", "1"), - Collections.singletonList("%{ONE:one}"), fieldName, false, false, ThreadWatchdog.noop()); + Collections.singletonList("%{ONE:one}"), fieldName, false, false, MatcherWatchdog.noop()); processor.execute(doc); assertThat(doc.getFieldValue("one", String.class), equalTo("1")); } @@ -50,7 +50,7 @@ public void testNoMatch() { IngestDocument doc = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); doc.setFieldValue(fieldName, "23"); GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), Collections.singletonMap("ONE", "1"), - Collections.singletonList("%{ONE:one}"), fieldName, false, false, ThreadWatchdog.noop()); + Collections.singletonList("%{ONE:one}"), fieldName, false, false, MatcherWatchdog.noop()); Exception e = expectThrows(Exception.class, () -> processor.execute(doc)); assertThat(e.getMessage(), equalTo("Provided Grok expressions do not match field value: [23]")); } @@ -61,7 +61,7 @@ public void testNoMatchingPatternName() { doc.setFieldValue(fieldName, "23"); Exception e = expectThrows(IllegalArgumentException.class, () -> new GrokProcessor(randomAlphaOfLength(10), Collections.singletonMap("ONE", "1"), Collections.singletonList("%{NOTONE:not_one}"), fieldName, - false, false, ThreadWatchdog.noop())); + false, false, MatcherWatchdog.noop())); assertThat(e.getMessage(), equalTo("Unable to find pattern [NOTONE] in Grok's pattern dictionary")); } @@ -71,7 +71,7 @@ public void testMatchWithoutCaptures() throws Exception { originalDoc.setFieldValue(fieldName, fieldName); IngestDocument doc = new IngestDocument(originalDoc); GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), Collections.emptyMap(), - Collections.singletonList(fieldName), fieldName, false, false, ThreadWatchdog.noop()); + Collections.singletonList(fieldName), fieldName, false, false, MatcherWatchdog.noop()); processor.execute(doc); assertThat(doc, equalTo(originalDoc)); } @@ -81,7 +81,7 @@ public void testNullField() { IngestDocument doc = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); doc.setFieldValue(fieldName, null); GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), Collections.singletonMap("ONE", "1"), - Collections.singletonList("%{ONE:one}"), fieldName, false, false, ThreadWatchdog.noop()); + Collections.singletonList("%{ONE:one}"), fieldName, false, false, MatcherWatchdog.noop()); Exception e = expectThrows(Exception.class, () -> processor.execute(doc)); assertThat(e.getMessage(), equalTo("field [" + fieldName + "] is null, cannot process it.")); } @@ -92,7 +92,7 @@ public void testNullFieldWithIgnoreMissing() throws Exception { originalIngestDocument.setFieldValue(fieldName, null); IngestDocument ingestDocument = new IngestDocument(originalIngestDocument); GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), Collections.singletonMap("ONE", "1"), - Collections.singletonList("%{ONE:one}"), fieldName, false, true, ThreadWatchdog.noop()); + Collections.singletonList("%{ONE:one}"), fieldName, false, true, MatcherWatchdog.noop()); processor.execute(ingestDocument); assertIngestDocument(originalIngestDocument, ingestDocument); } @@ -102,7 +102,7 @@ public void testNotStringField() { IngestDocument doc = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); doc.setFieldValue(fieldName, 1); GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), Collections.singletonMap("ONE", "1"), - Collections.singletonList("%{ONE:one}"), fieldName, false, false, ThreadWatchdog.noop()); + Collections.singletonList("%{ONE:one}"), fieldName, false, false, MatcherWatchdog.noop()); Exception e = expectThrows(Exception.class, () -> processor.execute(doc)); assertThat(e.getMessage(), equalTo("field [" + fieldName + "] of type [java.lang.Integer] cannot be cast to [java.lang.String]")); } @@ -112,7 +112,7 @@ public void testNotStringFieldWithIgnoreMissing() { IngestDocument doc = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); doc.setFieldValue(fieldName, 1); GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), Collections.singletonMap("ONE", "1"), - Collections.singletonList("%{ONE:one}"), fieldName, false, true, ThreadWatchdog.noop()); + Collections.singletonList("%{ONE:one}"), fieldName, false, true, MatcherWatchdog.noop()); Exception e = expectThrows(Exception.class, () -> processor.execute(doc)); assertThat(e.getMessage(), equalTo("field [" + fieldName + "] of type [java.lang.Integer] cannot be cast to [java.lang.String]")); } @@ -121,7 +121,7 @@ public void testMissingField() { String fieldName = "foo.bar"; IngestDocument doc = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), Collections.singletonMap("ONE", "1"), - Collections.singletonList("%{ONE:one}"), fieldName, false, false, ThreadWatchdog.noop()); + Collections.singletonList("%{ONE:one}"), fieldName, false, false, MatcherWatchdog.noop()); Exception e = expectThrows(Exception.class, () -> processor.execute(doc)); assertThat(e.getMessage(), equalTo("field [foo] not present as part of path [foo.bar]")); } @@ -131,7 +131,7 @@ public void testMissingFieldWithIgnoreMissing() throws Exception { IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); IngestDocument ingestDocument = new IngestDocument(originalIngestDocument); GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), Collections.singletonMap("ONE", "1"), - Collections.singletonList("%{ONE:one}"), fieldName, false, true, ThreadWatchdog.noop()); + Collections.singletonList("%{ONE:one}"), fieldName, false, true, MatcherWatchdog.noop()); processor.execute(ingestDocument); assertIngestDocument(originalIngestDocument, ingestDocument); } @@ -145,7 +145,7 @@ public void testMultiplePatternsWithMatchReturn() throws Exception { patternBank.put("TWO", "2"); patternBank.put("THREE", "3"); GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), patternBank, - Arrays.asList("%{ONE:one}", "%{TWO:two}", "%{THREE:three}"), fieldName, false, false, ThreadWatchdog.noop()); + Arrays.asList("%{ONE:one}", "%{TWO:two}", "%{THREE:three}"), fieldName, false, false, MatcherWatchdog.noop()); processor.execute(doc); assertThat(doc.hasField("one"), equalTo(false)); assertThat(doc.getFieldValue("two", String.class), equalTo("2")); @@ -161,7 +161,7 @@ public void testSetMetadata() throws Exception { patternBank.put("TWO", "2"); patternBank.put("THREE", "3"); GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), patternBank, - Arrays.asList("%{ONE:one}", "%{TWO:two}", "%{THREE:three}"), fieldName, true, false, ThreadWatchdog.noop()); + Arrays.asList("%{ONE:one}", "%{TWO:two}", "%{THREE:three}"), fieldName, true, false, MatcherWatchdog.noop()); processor.execute(doc); assertThat(doc.hasField("one"), equalTo(false)); assertThat(doc.getFieldValue("two", String.class), equalTo("2")); @@ -176,7 +176,7 @@ public void testTraceWithOnePattern() throws Exception { Map patternBank = new HashMap<>(); patternBank.put("ONE", "1"); GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), patternBank, - Arrays.asList("%{ONE:one}"), fieldName, true, false, ThreadWatchdog.noop()); + Arrays.asList("%{ONE:one}"), fieldName, true, false, MatcherWatchdog.noop()); processor.execute(doc); assertThat(doc.hasField("one"), equalTo(true)); assertThat(doc.getFieldValue("_ingest._grok_match_index", String.class), equalTo("0")); @@ -207,7 +207,7 @@ public void testCombineSamePatternNameAcrossPatterns() throws Exception { patternBank.put("TWO", "2"); patternBank.put("THREE", "3"); GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), patternBank, Arrays.asList("%{ONE:first}-%{TWO:second}", - "%{ONE:first}-%{THREE:second}"), fieldName, randomBoolean(), randomBoolean(), ThreadWatchdog.noop()); + "%{ONE:first}-%{THREE:second}"), fieldName, randomBoolean(), randomBoolean(), MatcherWatchdog.noop()); processor.execute(doc); assertThat(doc.getFieldValue("first", String.class), equalTo("1")); assertThat(doc.getFieldValue("second", String.class), equalTo("3")); @@ -221,7 +221,7 @@ public void testFirstWinNamedCapture() throws Exception { patternBank.put("ONETWO", "1|2"); GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), patternBank, Collections.singletonList("%{ONETWO:first}%{ONETWO:first}"), fieldName, randomBoolean(), randomBoolean(), - ThreadWatchdog.noop()); + MatcherWatchdog.noop()); processor.execute(doc); assertThat(doc.getFieldValue("first", String.class), equalTo("1")); } @@ -235,7 +235,7 @@ public void testUnmatchedNamesNotIncludedInDocument() throws Exception { patternBank.put("THREE", "3"); GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), patternBank, Collections.singletonList("%{ONETWO:first}|%{THREE:second}"), fieldName, randomBoolean(), randomBoolean(), - ThreadWatchdog.noop()); + MatcherWatchdog.noop()); processor.execute(doc); assertFalse(doc.hasField("first")); assertThat(doc.getFieldValue("second", String.class), equalTo("3")); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/TimeoutChecker.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/TimeoutChecker.java index 97eb6146a6654..a943e7b7a7349 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/TimeoutChecker.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/TimeoutChecker.java @@ -6,15 +6,17 @@ package org.elasticsearch.xpack.ml.filestructurefinder; import org.elasticsearch.ElasticsearchTimeoutException; -import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.grok.Grok; -import org.elasticsearch.grok.ThreadWatchdog; +import org.elasticsearch.grok.MatcherWatchdog; +import org.joni.Matcher; import java.io.Closeable; +import java.util.Collection; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -38,7 +40,7 @@ public class TimeoutChecker implements Closeable { private static final TimeoutCheckerWatchdog timeoutCheckerWatchdog = new TimeoutCheckerWatchdog(); - public static final ThreadWatchdog watchdog = timeoutCheckerWatchdog; + public static final MatcherWatchdog watchdog = timeoutCheckerWatchdog; private final String operation; private final TimeValue timeout; @@ -122,51 +124,68 @@ private synchronized void setTimeoutExceeded() { /** * An implementation of the type of watchdog used by the {@link Grok} class to interrupt * matching operations that take too long. Rather than have a timeout per match operation - * like the {@link ThreadWatchdog.Default} implementation, the interruption is governed by + * like the {@link MatcherWatchdog.Default} implementation, the interruption is governed by * a {@link TimeoutChecker} associated with the thread doing the matching. */ - static class TimeoutCheckerWatchdog implements ThreadWatchdog { + static class TimeoutCheckerWatchdog implements MatcherWatchdog { - final ConcurrentHashMap> registry = new ConcurrentHashMap<>(); + final ConcurrentHashMap registry = new ConcurrentHashMap<>(); void add(Thread thread, TimeValue timeout) { - Tuple previousValue = registry.put(thread, new Tuple<>(new AtomicBoolean(false), timeout)); + WatchDogEntry previousValue = registry.put(thread, new WatchDogEntry(timeout)); assert previousValue == null; } @Override - public void register() { - Tuple value = registry.get(Thread.currentThread()); + public void register(Matcher matcher) { + WatchDogEntry value = registry.get(Thread.currentThread()); if (value != null) { - boolean wasFalse = value.v1().compareAndSet(false, true); + boolean wasFalse = value.registered.compareAndSet(false, true); assert wasFalse; + value.matchers.add(matcher); } } @Override public long maxExecutionTimeInMillis() { - Tuple value = registry.get(Thread.currentThread()); - return value != null ? value.v2().getMillis() : Long.MAX_VALUE; + WatchDogEntry value = registry.get(Thread.currentThread()); + return value != null ? value.timeout.getMillis() : Long.MAX_VALUE; } @Override - public void unregister() { - Tuple value = registry.get(Thread.currentThread()); + public void unregister(Matcher matcher) { + WatchDogEntry value = registry.get(Thread.currentThread()); if (value != null) { - boolean wasTrue = value.v1().compareAndSet(true, false); + boolean wasTrue = value.registered.compareAndSet(true, false); assert wasTrue; + value.matchers.remove(matcher); } } void remove(Thread thread) { - Tuple previousValue = registry.remove(thread); + WatchDogEntry previousValue = registry.remove(thread); assert previousValue != null; } void interruptLongRunningThreadIfRegistered(Thread thread) { - Tuple value = registry.get(thread); - if (value.v1().get()) { - thread.interrupt(); + WatchDogEntry value = registry.get(thread); + if (value.registered.get()) { + for (Matcher matcher : value.matchers) { + matcher.interrupt(); + } + } + } + + static class WatchDogEntry { + + final TimeValue timeout; + final AtomicBoolean registered; + final Collection matchers; + + WatchDogEntry(TimeValue timeout) { + this.timeout = timeout; + this.registered = new AtomicBoolean(false); + this.matchers = new CopyOnWriteArrayList<>(); } } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/filestructurefinder/TimeoutCheckerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/filestructurefinder/TimeoutCheckerTests.java index 2770656279cff..63346de64c266 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/filestructurefinder/TimeoutCheckerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/filestructurefinder/TimeoutCheckerTests.java @@ -9,11 +9,16 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.grok.Grok; import org.elasticsearch.threadpool.Scheduler; +import org.joni.Matcher; import org.junit.After; import org.junit.Before; import java.util.concurrent.ScheduledExecutorService; +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + public class TimeoutCheckerTests extends FileStructureTestCase { private ScheduledExecutorService scheduler; @@ -29,15 +34,12 @@ public void shutdownScheduler() { } public void testCheckNoTimeout() { - NOOP_TIMEOUT_CHECKER.check("should never happen"); } public void testCheckTimeoutNotExceeded() throws InterruptedException { - TimeValue timeout = TimeValue.timeValueSeconds(10); try (TimeoutChecker timeoutChecker = new TimeoutChecker("timeout not exceeded test", timeout, scheduler)) { - for (int count = 0; count < 10; ++count) { timeoutChecker.check("should not timeout"); Thread.sleep(randomIntBetween(1, 10)); @@ -46,10 +48,8 @@ public void testCheckTimeoutNotExceeded() throws InterruptedException { } public void testCheckTimeoutExceeded() throws Exception { - TimeValue timeout = TimeValue.timeValueMillis(10); try (TimeoutChecker timeoutChecker = new TimeoutChecker("timeout exceeded test", timeout, scheduler)) { - assertBusy(() -> { ElasticsearchTimeoutException e = expectThrows(ElasticsearchTimeoutException.class, () -> timeoutChecker.check("should timeout")); @@ -59,30 +59,27 @@ public void testCheckTimeoutExceeded() throws Exception { } } - public void testWatchdog() { - - assertFalse(Thread.interrupted()); - - TimeValue timeout = TimeValue.timeValueMillis(100); + public void testWatchdog() throws Exception { + TimeValue timeout = TimeValue.timeValueMillis(500); try (TimeoutChecker timeoutChecker = new TimeoutChecker("watchdog test", timeout, scheduler)) { + TimeoutChecker.TimeoutCheckerWatchdog watchdog = (TimeoutChecker.TimeoutCheckerWatchdog) TimeoutChecker.watchdog; - TimeoutChecker.watchdog.register(); + Matcher matcher = mock(Matcher.class); + TimeoutChecker.watchdog.register(matcher); + assertThat(watchdog.registry.get(Thread.currentThread()).matchers.size(), equalTo(1)); try { - expectThrows(InterruptedException.class, () -> Thread.sleep(10000)); + assertBusy(() -> { + verify(matcher).interrupt(); + }); } finally { - TimeoutChecker.watchdog.unregister(); + TimeoutChecker.watchdog.unregister(matcher); + assertThat(watchdog.registry.get(Thread.currentThread()).matchers.size(), equalTo(0)); } - } finally { - // ensure the interrupted flag is cleared to stop it making subsequent tests fail - Thread.interrupted(); } } public void testGrokCaptures() throws Exception { - - assertFalse(Thread.interrupted()); Grok grok = new Grok(Grok.getBuiltinPatterns(), "{%DATA:data}{%GREEDYDATA:greedydata}", TimeoutChecker.watchdog); - TimeValue timeout = TimeValue.timeValueMillis(1); try (TimeoutChecker timeoutChecker = new TimeoutChecker("grok captures test", timeout, scheduler)) { @@ -92,9 +89,6 @@ public void testGrokCaptures() throws Exception { assertEquals("Aborting grok captures test during [should timeout] as it has taken longer than the timeout of [" + timeout + "]", e.getMessage()); }); - } finally { - // ensure the interrupted flag is cleared to stop it making subsequent tests fail - Thread.interrupted(); } } }