Skip to content

Commit 7aae212

Browse files
author
Hendrik Muhs
authored
[Transform] Fix possible audit logging disappearance after rolling upgrade (#49731) (#49767)
ensure audit index template is available during a rolling upgrade before a transform task can write to it. fixes #49730
1 parent a3f8859 commit 7aae212

File tree

5 files changed

+177
-13
lines changed

5 files changed

+177
-13
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/persistence/TransformInternalIndexConstants.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@ public final class TransformInternalIndexConstants {
3232
public static final String INDEX_NAME_PATTERN_DEPRECATED = ".data-frame-internal-*";
3333

3434
// audit index
35-
public static final String AUDIT_TEMPLATE_VERSION = "000001";
35+
// gh #49730: upped version of audit index to 000002
36+
public static final String AUDIT_TEMPLATE_VERSION = "000002";
3637
public static final String AUDIT_INDEX_PREFIX = ".transform-notifications-";
3738
public static final String AUDIT_INDEX_PATTERN = AUDIT_INDEX_PREFIX + "*";
3839
public static final String AUDIT_INDEX_DEPRECATED = ".data-frame-notifications-1";

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformInternalIndex.java

+56-7
Original file line numberDiff line numberDiff line change
@@ -317,21 +317,35 @@ private static XContentBuilder addMetaInformation(XContentBuilder builder) throw
317317
return builder.startObject("_meta").field("version", Version.CURRENT).endObject();
318318
}
319319

320-
public static boolean haveLatestVersionedIndexTemplate(ClusterState state) {
321-
return state.getMetaData().getTemplates().containsKey(TransformInternalIndexConstants.LATEST_INDEX_VERSIONED_NAME);
322-
}
323-
324320
/**
325321
* This method should be called before any document is indexed that relies on the
326-
* existence of the latest index template to create the internal index. The
327-
* reason is that the standard template upgrader only runs when the master node
322+
* existence of the latest index templates to create the internal and audit index.
323+
* The reason is that the standard template upgrader only runs when the master node
328324
* is upgraded to the newer version. If data nodes are upgraded before master
329325
* nodes and transforms get assigned to those data nodes then without this check
330326
* the data nodes will index documents into the internal index before the necessary
331327
* index template is present and this will result in an index with completely
332328
* dynamic mappings being created (which is very bad).
333329
*/
334-
public static void installLatestVersionedIndexTemplateIfRequired(
330+
public static void installLatestIndexTemplatesIfRequired(ClusterService clusterService, Client client, ActionListener<Void> listener) {
331+
332+
installLatestVersionedIndexTemplateIfRequired(
333+
clusterService,
334+
client,
335+
ActionListener.wrap(r -> { installLatestAuditIndexTemplateIfRequired(clusterService, client, listener); }, listener::onFailure)
336+
);
337+
338+
}
339+
340+
protected static boolean haveLatestVersionedIndexTemplate(ClusterState state) {
341+
return state.getMetaData().getTemplates().containsKey(TransformInternalIndexConstants.LATEST_INDEX_VERSIONED_NAME);
342+
}
343+
344+
protected static boolean haveLatestAuditIndexTemplate(ClusterState state) {
345+
return state.getMetaData().getTemplates().containsKey(TransformInternalIndexConstants.AUDIT_INDEX);
346+
}
347+
348+
protected static void installLatestVersionedIndexTemplateIfRequired(
335349
ClusterService clusterService,
336350
Client client,
337351
ActionListener<Void> listener
@@ -367,5 +381,40 @@ public static void installLatestVersionedIndexTemplateIfRequired(
367381
}
368382
}
369383

384+
protected static void installLatestAuditIndexTemplateIfRequired(
385+
ClusterService clusterService,
386+
Client client,
387+
ActionListener<Void> listener
388+
) {
389+
390+
// The check for existence of the template is against local cluster state, so very cheap
391+
if (haveLatestAuditIndexTemplate(clusterService.state())) {
392+
listener.onResponse(null);
393+
return;
394+
}
395+
396+
// Installing the template involves communication with the master node, so it's more expensive but much rarer
397+
try {
398+
IndexTemplateMetaData indexTemplateMetaData = getAuditIndexTemplateMetaData();
399+
BytesReference jsonMappings = new BytesArray(indexTemplateMetaData.mappings().get(SINGLE_MAPPING_NAME).uncompressed());
400+
PutIndexTemplateRequest request = new PutIndexTemplateRequest(TransformInternalIndexConstants.AUDIT_INDEX).patterns(
401+
indexTemplateMetaData.patterns()
402+
)
403+
.version(indexTemplateMetaData.version())
404+
.settings(indexTemplateMetaData.settings())
405+
.mapping(SINGLE_MAPPING_NAME, XContentHelper.convertToMap(jsonMappings, true, XContentType.JSON).v2());
406+
ActionListener<AcknowledgedResponse> innerListener = ActionListener.wrap(r -> listener.onResponse(null), listener::onFailure);
407+
executeAsyncWithOrigin(
408+
client.threadPool().getThreadContext(),
409+
TRANSFORM_ORIGIN,
410+
request,
411+
innerListener,
412+
client.admin().indices()::putTemplate
413+
);
414+
} catch (IOException e) {
415+
listener.onFailure(e);
416+
}
417+
}
418+
370419
private TransformInternalIndex() {}
371420
}

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -283,8 +283,8 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable TransformTa
283283
}
284284
);
285285

286-
// <1> Check the internal index template is installed
287-
TransformInternalIndex.installLatestVersionedIndexTemplateIfRequired(clusterService, client, templateCheckListener);
286+
// <1> Check the index templates are installed
287+
TransformInternalIndex.installLatestIndexTemplatesIfRequired(clusterService, client, templateCheckListener);
288288
}
289289

290290
private static IndexerState currentIndexerState(TransformState previousState) {

x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/persistence/TransformInternalIndexTests.java

+110-2
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
public class TransformInternalIndexTests extends ESTestCase {
3939

4040
public static ClusterState STATE_WITH_LATEST_VERSIONED_INDEX_TEMPLATE;
41+
public static ClusterState STATE_WITH_LATEST_AUDIT_INDEX_TEMPLATE;
4142

4243
static {
4344
ImmutableOpenMap.Builder<String, IndexTemplateMetaData> mapBuilder = ImmutableOpenMap.builder();
@@ -51,6 +52,18 @@ public class TransformInternalIndexTests extends ESTestCase {
5152
ClusterState.Builder csBuilder = ClusterState.builder(ClusterName.DEFAULT);
5253
csBuilder.metaData(metaBuilder.build());
5354
STATE_WITH_LATEST_VERSIONED_INDEX_TEMPLATE = csBuilder.build();
55+
56+
mapBuilder = ImmutableOpenMap.builder();
57+
try {
58+
mapBuilder.put(TransformInternalIndexConstants.AUDIT_INDEX, TransformInternalIndex.getAuditIndexTemplateMetaData());
59+
} catch (IOException e) {
60+
throw new UncheckedIOException(e);
61+
}
62+
metaBuilder = MetaData.builder();
63+
metaBuilder.templates(mapBuilder.build());
64+
csBuilder = ClusterState.builder(ClusterName.DEFAULT);
65+
csBuilder.metaData(metaBuilder.build());
66+
STATE_WITH_LATEST_AUDIT_INDEX_TEMPLATE = csBuilder.build();
5467
}
5568

5669
public void testHaveLatestVersionedIndexTemplate() {
@@ -81,8 +94,7 @@ public void testInstallLatestVersionedIndexTemplateIfRequired_GivenRequired() {
8194
when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE);
8295

8396
IndicesAdminClient indicesClient = mock(IndicesAdminClient.class);
84-
doAnswer(
85-
invocationOnMock -> {
97+
doAnswer(invocationOnMock -> {
8698
@SuppressWarnings("unchecked")
8799
ActionListener<AcknowledgedResponse> listener = (ActionListener<AcknowledgedResponse>) invocationOnMock.getArguments()[1];
88100
listener.onResponse(new AcknowledgedResponse(true));
@@ -112,4 +124,100 @@ public void testInstallLatestVersionedIndexTemplateIfRequired_GivenRequired() {
112124
verify(indicesClient, times(1)).putTemplate(any(), any());
113125
verifyNoMoreInteractions(indicesClient);
114126
}
127+
128+
public void testHaveLatestAuditIndexTemplate() {
129+
130+
assertTrue(TransformInternalIndex.haveLatestAuditIndexTemplate(STATE_WITH_LATEST_AUDIT_INDEX_TEMPLATE));
131+
assertFalse(TransformInternalIndex.haveLatestAuditIndexTemplate(ClusterState.EMPTY_STATE));
132+
}
133+
134+
public void testInstallLatestAuditIndexTemplateIfRequired_GivenNotRequired() {
135+
136+
ClusterService clusterService = mock(ClusterService.class);
137+
when(clusterService.state()).thenReturn(TransformInternalIndexTests.STATE_WITH_LATEST_AUDIT_INDEX_TEMPLATE);
138+
139+
Client client = mock(Client.class);
140+
141+
AtomicBoolean gotResponse = new AtomicBoolean(false);
142+
ActionListener<Void> testListener = ActionListener.wrap(aVoid -> gotResponse.set(true), e -> fail(e.getMessage()));
143+
144+
TransformInternalIndex.installLatestAuditIndexTemplateIfRequired(clusterService, client, testListener);
145+
146+
assertTrue(gotResponse.get());
147+
verifyNoMoreInteractions(client);
148+
}
149+
150+
public void testInstallLatestAuditIndexTemplateIfRequired_GivenRequired() {
151+
152+
ClusterService clusterService = mock(ClusterService.class);
153+
when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE);
154+
155+
IndicesAdminClient indicesClient = mock(IndicesAdminClient.class);
156+
doAnswer(invocationOnMock -> {
157+
@SuppressWarnings("unchecked")
158+
ActionListener<AcknowledgedResponse> listener = (ActionListener<AcknowledgedResponse>) invocationOnMock.getArguments()[1];
159+
listener.onResponse(new AcknowledgedResponse(true));
160+
return null;
161+
}).when(indicesClient).putTemplate(any(), any());
162+
163+
AdminClient adminClient = mock(AdminClient.class);
164+
when(adminClient.indices()).thenReturn(indicesClient);
165+
Client client = mock(Client.class);
166+
when(client.admin()).thenReturn(adminClient);
167+
168+
ThreadPool threadPool = mock(ThreadPool.class);
169+
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
170+
when(client.threadPool()).thenReturn(threadPool);
171+
172+
AtomicBoolean gotResponse = new AtomicBoolean(false);
173+
ActionListener<Void> testListener = ActionListener.wrap(aVoid -> gotResponse.set(true), e -> fail(e.getMessage()));
174+
175+
TransformInternalIndex.installLatestAuditIndexTemplateIfRequired(clusterService, client, testListener);
176+
177+
assertTrue(gotResponse.get());
178+
verify(client, times(1)).threadPool();
179+
verify(client, times(1)).admin();
180+
verifyNoMoreInteractions(client);
181+
verify(adminClient, times(1)).indices();
182+
verifyNoMoreInteractions(adminClient);
183+
verify(indicesClient, times(1)).putTemplate(any(), any());
184+
verifyNoMoreInteractions(indicesClient);
185+
}
186+
187+
public void testInstallLatestIndexTemplateIfRequired_GivenRequired() {
188+
189+
ClusterService clusterService = mock(ClusterService.class);
190+
when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE);
191+
192+
IndicesAdminClient indicesClient = mock(IndicesAdminClient.class);
193+
doAnswer(invocationOnMock -> {
194+
@SuppressWarnings("unchecked")
195+
ActionListener<AcknowledgedResponse> listener = (ActionListener<AcknowledgedResponse>) invocationOnMock.getArguments()[1];
196+
listener.onResponse(new AcknowledgedResponse(true));
197+
return null;
198+
}).when(indicesClient).putTemplate(any(), any());
199+
200+
AdminClient adminClient = mock(AdminClient.class);
201+
when(adminClient.indices()).thenReturn(indicesClient);
202+
Client client = mock(Client.class);
203+
when(client.admin()).thenReturn(adminClient);
204+
205+
ThreadPool threadPool = mock(ThreadPool.class);
206+
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
207+
when(client.threadPool()).thenReturn(threadPool);
208+
209+
AtomicBoolean gotResponse = new AtomicBoolean(false);
210+
ActionListener<Void> testListener = ActionListener.wrap(aVoid -> gotResponse.set(true), e -> fail(e.getMessage()));
211+
212+
TransformInternalIndex.installLatestIndexTemplatesIfRequired(clusterService, client, testListener);
213+
214+
assertTrue(gotResponse.get());
215+
verify(client, times(2)).threadPool();
216+
verify(client, times(2)).admin();
217+
verifyNoMoreInteractions(client);
218+
verify(adminClient, times(2)).indices();
219+
verifyNoMoreInteractions(adminClient);
220+
verify(indicesClient, times(2)).putTemplate(any(), any());
221+
verifyNoMoreInteractions(indicesClient);
222+
}
115223
}

x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/80_transform_jobs_crud.yml

+7-1
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ setup:
258258
transform.delete_transform:
259259
transform_id: "mixed-simple-continuous-transform"
260260
---
261-
"Test index mappings for latest internal index":
261+
"Test index mappings for latest internal index and audit index":
262262
- do:
263263
transform.put_transform:
264264
transform_id: "upgraded-simple-transform"
@@ -279,3 +279,9 @@ setup:
279279
index: .transform-internal-004
280280
- match: { \.transform-internal-004.mappings.dynamic: "false" }
281281
- match: { \.transform-internal-004.mappings.properties.id.type: "keyword" }
282+
- do:
283+
indices.get_mapping:
284+
index: .transform-notifications-000002
285+
- match: { \.transform-notifications-000002.mappings.dynamic: "false" }
286+
- match: { \.transform-notifications-000002.mappings.properties.transform_id.type: "keyword" }
287+
- match: { \.transform-notifications-000002.mappings.properties.timestamp.type: "date" }

0 commit comments

Comments
 (0)