Skip to content

Commit 41736dd

Browse files
authored
[ML] retry bulk indexing of state docs (#50149) (#50185)
This exchanges the direct use of the `Client` for `ResultsPersisterService`. State doc persistence will now retry. Failures to persist state will still not throw, but will be audited and logged.
1 parent fe3c9e7 commit 41736dd

File tree

6 files changed

+80
-45
lines changed

6 files changed

+80
-45
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

+10-4
Original file line numberDiff line numberDiff line change
@@ -560,11 +560,17 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
560560
environment,
561561
settings,
562562
nativeController,
563-
client,
564-
clusterService);
563+
clusterService,
564+
resultsPersisterService,
565+
anomalyDetectionAuditor);
565566
normalizerProcessFactory = new NativeNormalizerProcessFactory(environment, nativeController, clusterService);
566-
analyticsProcessFactory = new NativeAnalyticsProcessFactory(environment, client, nativeController, clusterService,
567-
xContentRegistry);
567+
analyticsProcessFactory = new NativeAnalyticsProcessFactory(
568+
environment,
569+
nativeController,
570+
clusterService,
571+
xContentRegistry,
572+
resultsPersisterService,
573+
dataFrameAnalyticsAuditor);
568574
memoryEstimationProcessFactory =
569575
new NativeMemoryUsageEstimationProcessFactory(environment, nativeController, clusterService);
570576
} catch (IOException e) {

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java

+13-6
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
import org.apache.logging.log4j.LogManager;
99
import org.apache.logging.log4j.Logger;
10-
import org.elasticsearch.client.Client;
1110
import org.elasticsearch.cluster.service.ClusterService;
1211
import org.elasticsearch.common.Nullable;
1312
import org.elasticsearch.common.bytes.BytesReference;
@@ -20,10 +19,12 @@
2019
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
2120
import org.elasticsearch.xpack.ml.MachineLearning;
2221
import org.elasticsearch.xpack.ml.dataframe.process.results.AnalyticsResult;
22+
import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor;
2323
import org.elasticsearch.xpack.ml.process.IndexingStateProcessor;
2424
import org.elasticsearch.xpack.ml.process.NativeController;
2525
import org.elasticsearch.xpack.ml.process.ProcessPipes;
2626
import org.elasticsearch.xpack.ml.utils.NamedPipeHelper;
27+
import org.elasticsearch.xpack.ml.utils.persistence.ResultsPersisterService;
2728

2829
import java.io.IOException;
2930
import java.nio.file.Path;
@@ -40,18 +41,24 @@ public class NativeAnalyticsProcessFactory implements AnalyticsProcessFactory<An
4041

4142
private static final NamedPipeHelper NAMED_PIPE_HELPER = new NamedPipeHelper();
4243

43-
private final Client client;
4444
private final Environment env;
4545
private final NativeController nativeController;
4646
private final NamedXContentRegistry namedXContentRegistry;
47+
private final ResultsPersisterService resultsPersisterService;
48+
private final DataFrameAnalyticsAuditor auditor;
4749
private volatile Duration processConnectTimeout;
4850

49-
public NativeAnalyticsProcessFactory(Environment env, Client client, NativeController nativeController, ClusterService clusterService,
50-
NamedXContentRegistry namedXContentRegistry) {
51+
public NativeAnalyticsProcessFactory(Environment env,
52+
NativeController nativeController,
53+
ClusterService clusterService,
54+
NamedXContentRegistry namedXContentRegistry,
55+
ResultsPersisterService resultsPersisterService,
56+
DataFrameAnalyticsAuditor auditor) {
5157
this.env = Objects.requireNonNull(env);
52-
this.client = Objects.requireNonNull(client);
5358
this.nativeController = Objects.requireNonNull(nativeController);
5459
this.namedXContentRegistry = Objects.requireNonNull(namedXContentRegistry);
60+
this.auditor = auditor;
61+
this.resultsPersisterService = resultsPersisterService;
5562
setProcessConnectTimeout(MachineLearning.PROCESS_CONNECT_TIMEOUT.get(env.settings()));
5663
clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.PROCESS_CONNECT_TIMEOUT,
5764
this::setProcessConnectTimeout);
@@ -96,7 +103,7 @@ public NativeAnalyticsProcess createAnalyticsProcess(DataFrameAnalyticsConfig co
96103
private void startProcess(DataFrameAnalyticsConfig config, ExecutorService executorService, ProcessPipes processPipes,
97104
NativeAnalyticsProcess process) {
98105
if (config.getAnalysis().persistsState()) {
99-
IndexingStateProcessor stateProcessor = new IndexingStateProcessor(client, config.getId());
106+
IndexingStateProcessor stateProcessor = new IndexingStateProcessor(config.getId(), resultsPersisterService, auditor);
100107
process.start(executorService, stateProcessor, processPipes.getPersistStream().get());
101108
} else {
102109
process.start(executorService);

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java

+13-6
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
import org.apache.logging.log4j.LogManager;
99
import org.apache.logging.log4j.Logger;
10-
import org.elasticsearch.client.Client;
1110
import org.elasticsearch.cluster.service.ClusterService;
1211
import org.elasticsearch.common.settings.Settings;
1312
import org.elasticsearch.common.unit.TimeValue;
@@ -20,11 +19,13 @@
2019
import org.elasticsearch.xpack.ml.MachineLearning;
2120
import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams;
2221
import org.elasticsearch.xpack.ml.job.results.AutodetectResult;
22+
import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
2323
import org.elasticsearch.xpack.ml.process.IndexingStateProcessor;
2424
import org.elasticsearch.xpack.ml.process.NativeController;
2525
import org.elasticsearch.xpack.ml.process.ProcessPipes;
2626
import org.elasticsearch.xpack.ml.process.ProcessResultsParser;
2727
import org.elasticsearch.xpack.ml.utils.NamedPipeHelper;
28+
import org.elasticsearch.xpack.ml.utils.persistence.ResultsPersisterService;
2829

2930
import java.io.IOException;
3031
import java.nio.file.Path;
@@ -40,20 +41,26 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory
4041
private static final Logger LOGGER = LogManager.getLogger(NativeAutodetectProcessFactory.class);
4142
private static final NamedPipeHelper NAMED_PIPE_HELPER = new NamedPipeHelper();
4243

43-
private final Client client;
4444
private final Environment env;
4545
private final Settings settings;
4646
private final NativeController nativeController;
4747
private final ClusterService clusterService;
48+
private final ResultsPersisterService resultsPersisterService;
49+
private final AnomalyDetectionAuditor auditor;
4850
private volatile Duration processConnectTimeout;
4951

50-
public NativeAutodetectProcessFactory(Environment env, Settings settings, NativeController nativeController, Client client,
51-
ClusterService clusterService) {
52+
public NativeAutodetectProcessFactory(Environment env,
53+
Settings settings,
54+
NativeController nativeController,
55+
ClusterService clusterService,
56+
ResultsPersisterService resultsPersisterService,
57+
AnomalyDetectionAuditor auditor) {
5258
this.env = Objects.requireNonNull(env);
5359
this.settings = Objects.requireNonNull(settings);
5460
this.nativeController = Objects.requireNonNull(nativeController);
55-
this.client = client;
5661
this.clusterService = clusterService;
62+
this.resultsPersisterService = resultsPersisterService;
63+
this.auditor = auditor;
5764
setProcessConnectTimeout(MachineLearning.PROCESS_CONNECT_TIMEOUT.get(settings));
5865
clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.PROCESS_CONNECT_TIMEOUT,
5966
this::setProcessConnectTimeout);
@@ -78,7 +85,7 @@ public AutodetectProcess createAutodetectProcess(Job job,
7885
// The extra 1 is the control field
7986
int numberOfFields = job.allInputFields().size() + (includeTokensField ? 1 : 0) + 1;
8087

81-
IndexingStateProcessor stateProcessor = new IndexingStateProcessor(client, job.getId());
88+
IndexingStateProcessor stateProcessor = new IndexingStateProcessor(job.getId(), resultsPersisterService, auditor);
8289
ProcessResultsParser<AutodetectResult> resultsParser = new ProcessResultsParser<>(AutodetectResult.PARSER,
8390
NamedXContentRegistry.EMPTY);
8491
NativeAutodetectProcess autodetect = new NativeAutodetectProcess(

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/IndexingStateProcessor.java

+20-8
Original file line numberDiff line numberDiff line change
@@ -7,21 +7,22 @@
77

88
import org.apache.logging.log4j.LogManager;
99
import org.apache.logging.log4j.Logger;
10+
import org.apache.logging.log4j.message.ParameterizedMessage;
1011
import org.elasticsearch.action.bulk.BulkRequest;
11-
import org.elasticsearch.client.Client;
1212
import org.elasticsearch.common.bytes.BytesArray;
1313
import org.elasticsearch.common.bytes.BytesReference;
1414
import org.elasticsearch.common.bytes.CompositeBytesReference;
15-
import org.elasticsearch.common.util.concurrent.ThreadContext;
1615
import org.elasticsearch.common.xcontent.XContentType;
16+
import org.elasticsearch.xpack.core.common.notifications.AbstractAuditMessage;
17+
import org.elasticsearch.xpack.core.common.notifications.AbstractAuditor;
1718
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
19+
import org.elasticsearch.xpack.ml.utils.persistence.ResultsPersisterService;
1820

1921
import java.io.IOException;
2022
import java.io.InputStream;
2123
import java.util.ArrayList;
2224
import java.util.List;
2325

24-
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
2526

2627
/**
2728
* Reads state documents of a stream, splits them and persists to an index via a bulk request
@@ -32,12 +33,16 @@ public class IndexingStateProcessor implements StateProcessor {
3233

3334
private static final int READ_BUF_SIZE = 8192;
3435

35-
private final Client client;
3636
private final String jobId;
37+
private final AbstractAuditor<? extends AbstractAuditMessage> auditor;
38+
private final ResultsPersisterService resultsPersisterService;
3739

38-
public IndexingStateProcessor(Client client, String jobId) {
39-
this.client = client;
40+
public IndexingStateProcessor(String jobId,
41+
ResultsPersisterService resultsPersisterService,
42+
AbstractAuditor<? extends AbstractAuditMessage> auditor) {
4043
this.jobId = jobId;
44+
this.resultsPersisterService = resultsPersisterService;
45+
this.auditor = auditor;
4146
}
4247

4348
@Override
@@ -98,8 +103,15 @@ void persist(BytesReference bytes) throws IOException {
98103
bulkRequest.add(bytes, AnomalyDetectorsIndex.jobStateIndexWriteAlias(), XContentType.JSON);
99104
if (bulkRequest.numberOfActions() > 0) {
100105
LOGGER.trace("[{}] Persisting job state document", jobId);
101-
try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin(ML_ORIGIN)) {
102-
client.bulk(bulkRequest).actionGet();
106+
try {
107+
resultsPersisterService.bulkIndexWithRetry(bulkRequest,
108+
jobId,
109+
() -> true,
110+
(msg) -> auditor.warning(jobId, "Bulk indexing of state failed " + msg));
111+
} catch (Exception ex) {
112+
String msg = "failed indexing updated state docs";
113+
LOGGER.error(() -> new ParameterizedMessage("[{}] {}", jobId, msg), ex);
114+
auditor.error(jobId, msg + " error: " + ex.getMessage());
103115
}
104116
}
105117
}

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactoryTests.java

+11-4
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
*/
66
package org.elasticsearch.xpack.ml.job.process.autodetect;
77

8-
import org.elasticsearch.client.Client;
98
import org.elasticsearch.cluster.service.ClusterService;
109
import org.elasticsearch.common.settings.ClusterSettings;
1110
import org.elasticsearch.common.settings.Settings;
@@ -17,8 +16,10 @@
1716
import org.elasticsearch.xpack.core.ml.job.config.Job;
1817
import org.elasticsearch.xpack.ml.MachineLearning;
1918
import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams;
19+
import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
2020
import org.elasticsearch.xpack.ml.process.NativeController;
2121
import org.elasticsearch.xpack.ml.process.ProcessPipes;
22+
import org.elasticsearch.xpack.ml.utils.persistence.ResultsPersisterService;
2223

2324
import java.io.IOException;
2425
import java.time.Duration;
@@ -41,7 +42,8 @@ public void testSetProcessConnectTimeout() throws IOException {
4142
.build();
4243
Environment env = TestEnvironment.newEnvironment(settings);
4344
NativeController nativeController = mock(NativeController.class);
44-
Client client = mock(Client.class);
45+
ResultsPersisterService resultsPersisterService = mock(ResultsPersisterService.class);
46+
AnomalyDetectionAuditor anomalyDetectionAuditor = mock(AnomalyDetectionAuditor.class);
4547
ClusterSettings clusterSettings = new ClusterSettings(settings,
4648
Sets.newHashSet(MachineLearning.PROCESS_CONNECT_TIMEOUT, AutodetectBuilder.MAX_ANOMALY_RECORDS_SETTING_DYNAMIC));
4749
ClusterService clusterService = mock(ClusterService.class);
@@ -51,8 +53,13 @@ public void testSetProcessConnectTimeout() throws IOException {
5153
AutodetectParams autodetectParams = mock(AutodetectParams.class);
5254
ProcessPipes processPipes = mock(ProcessPipes.class);
5355

54-
NativeAutodetectProcessFactory nativeAutodetectProcessFactory =
55-
new NativeAutodetectProcessFactory(env, settings, nativeController, client, clusterService);
56+
NativeAutodetectProcessFactory nativeAutodetectProcessFactory = new NativeAutodetectProcessFactory(
57+
env,
58+
settings,
59+
nativeController,
60+
clusterService,
61+
resultsPersisterService,
62+
anomalyDetectionAuditor);
5663
nativeAutodetectProcessFactory.setProcessConnectTimeout(TimeValue.timeValueSeconds(timeoutSeconds));
5764
nativeAutodetectProcessFactory.createNativeProcess(job, autodetectParams, processPipes, Collections.emptyList());
5865

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/IndexingStateProcessorTests.java

+13-17
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,16 @@
66
package org.elasticsearch.xpack.ml.process;
77

88
import com.carrotsearch.randomizedtesting.annotations.Timeout;
9-
import org.elasticsearch.action.ActionFuture;
109
import org.elasticsearch.action.bulk.BulkRequest;
1110
import org.elasticsearch.action.bulk.BulkResponse;
12-
import org.elasticsearch.client.Client;
1311
import org.elasticsearch.common.bytes.BytesReference;
1412
import org.elasticsearch.common.settings.Settings;
1513
import org.elasticsearch.common.util.concurrent.ThreadContext;
1614
import org.elasticsearch.mock.orig.Mockito;
1715
import org.elasticsearch.test.ESTestCase;
1816
import org.elasticsearch.threadpool.ThreadPool;
17+
import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
18+
import org.elasticsearch.xpack.ml.utils.persistence.ResultsPersisterService;
1919
import org.junit.After;
2020
import org.junit.Before;
2121
import org.mockito.ArgumentCaptor;
@@ -54,24 +54,22 @@ public class IndexingStateProcessorTests extends ESTestCase {
5454
private static final int NUM_LARGE_DOCS = 2;
5555
private static final int LARGE_DOC_SIZE = 1000000;
5656

57-
private Client client;
5857
private IndexingStateProcessor stateProcessor;
58+
private ResultsPersisterService resultsPersisterService;
5959

6060
@Before
61-
public void initialize() throws IOException {
62-
client = mock(Client.class);
63-
@SuppressWarnings("unchecked")
64-
ActionFuture<BulkResponse> bulkResponseFuture = mock(ActionFuture.class);
65-
stateProcessor = spy(new IndexingStateProcessor(client, JOB_ID));
66-
when(client.bulk(any(BulkRequest.class))).thenReturn(bulkResponseFuture);
61+
public void initialize() {
62+
resultsPersisterService = mock(ResultsPersisterService.class);
63+
AnomalyDetectionAuditor auditor = mock(AnomalyDetectionAuditor.class);
64+
stateProcessor = spy(new IndexingStateProcessor(JOB_ID, resultsPersisterService, auditor));
65+
when(resultsPersisterService.bulkIndexWithRetry(any(BulkRequest.class), any(), any(), any())).thenReturn(mock(BulkResponse.class));
6766
ThreadPool threadPool = mock(ThreadPool.class);
68-
when(client.threadPool()).thenReturn(threadPool);
6967
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
7068
}
7169

7270
@After
7371
public void verifyNoMoreClientInteractions() {
74-
Mockito.verifyNoMoreInteractions(client);
72+
Mockito.verifyNoMoreInteractions(resultsPersisterService);
7573
}
7674

7775
public void testStateRead() throws IOException {
@@ -85,8 +83,7 @@ public void testStateRead() throws IOException {
8583
assertEquals(threeStates[0], capturedBytes.get(0).utf8ToString());
8684
assertEquals(threeStates[1], capturedBytes.get(1).utf8ToString());
8785
assertEquals(threeStates[2], capturedBytes.get(2).utf8ToString());
88-
verify(client, times(3)).bulk(any(BulkRequest.class));
89-
verify(client, times(3)).threadPool();
86+
verify(resultsPersisterService, times(3)).bulkIndexWithRetry(any(BulkRequest.class), any(), any(), any());
9087
}
9188

9289
public void testStateReadGivenConsecutiveZeroBytes() throws IOException {
@@ -96,7 +93,7 @@ public void testStateReadGivenConsecutiveZeroBytes() throws IOException {
9693
stateProcessor.process(stream);
9794

9895
verify(stateProcessor, never()).persist(any());
99-
Mockito.verifyNoMoreInteractions(client);
96+
Mockito.verifyNoMoreInteractions(resultsPersisterService);
10097
}
10198

10299
public void testStateReadGivenConsecutiveSpacesFollowedByZeroByte() throws IOException {
@@ -106,7 +103,7 @@ public void testStateReadGivenConsecutiveSpacesFollowedByZeroByte() throws IOExc
106103
stateProcessor.process(stream);
107104

108105
verify(stateProcessor, times(1)).persist(any());
109-
Mockito.verifyNoMoreInteractions(client);
106+
Mockito.verifyNoMoreInteractions(resultsPersisterService);
110107
}
111108

112109
/**
@@ -128,7 +125,6 @@ public void testLargeStateRead() throws Exception {
128125
ByteArrayInputStream stream = new ByteArrayInputStream(builder.toString().getBytes(StandardCharsets.UTF_8));
129126
stateProcessor.process(stream);
130127
verify(stateProcessor, times(NUM_LARGE_DOCS)).persist(any());
131-
verify(client, times(NUM_LARGE_DOCS)).bulk(any(BulkRequest.class));
132-
verify(client, times(NUM_LARGE_DOCS)).threadPool();
128+
verify(resultsPersisterService, times(NUM_LARGE_DOCS)).bulkIndexWithRetry(any(BulkRequest.class), any(), any(), any());
133129
}
134130
}

0 commit comments

Comments
 (0)