Skip to content

Commit e6e147e

Browse files
authored
Adjust failure store to work with TSDS (elastic#114307)
In this PR we add tests and fix the issues we encountered when enabling the failure store for TSDS and logsdb. **Logsdb** Logsdb worked out of the box, so we only added a test that indexes a couple of documents with a bulk request and checks how they are ingested. **TSDS** Here it was a bit trickier. We encountered the following issues: - TSDS requires a timestamp to determine the write index of the data stream, meaning the failure happens earlier than we had anticipated so far. We added a special exception to detect this case and handle it accordingly. - The template of a TSDS data stream sets certain settings that we do not want to have in the failure store index. We added an allowlist that gets applied before we add the necessary index settings. Furthermore, we added a test case to capture this.
1 parent 185bf68 commit e6e147e

File tree

12 files changed

+369
-15
lines changed

12 files changed

+369
-15
lines changed

modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/TSDBIndexingIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.elasticsearch.action.admin.indices.template.put.PutComponentTemplateAction;
2121
import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction;
2222
import org.elasticsearch.action.bulk.BulkRequest;
23+
import org.elasticsearch.action.bulk.IndexDocFailureStoreStatus;
2324
import org.elasticsearch.action.get.GetRequest;
2425
import org.elasticsearch.action.index.IndexRequest;
2526
import org.elasticsearch.action.search.SearchRequest;
@@ -170,7 +171,7 @@ public void testTimeRanges() throws Exception {
170171
var indexRequest = new IndexRequest("k8s").opType(DocWriteRequest.OpType.CREATE);
171172
time = randomBoolean() ? endTime : endTime.plusSeconds(randomIntBetween(1, 99));
172173
indexRequest.source(DOC.replace("$time", formatInstant(time)), XContentType.JSON);
173-
expectThrows(IllegalArgumentException.class, () -> client().index(indexRequest).actionGet());
174+
expectThrows(IndexDocFailureStoreStatus.ExceptionWithFailureStoreStatus.class, () -> client().index(indexRequest).actionGet());
174175
}
175176

176177
// Fetch UpdateTimeSeriesRangeService and increment time range of latest backing index:

modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamFeatures.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
public class DataStreamFeatures implements FeatureSpecification {
2727

2828
public static final NodeFeature DATA_STREAM_LIFECYCLE = new NodeFeature("data_stream.lifecycle");
29+
public static final NodeFeature DATA_STREAM_FAILURE_STORE_TSDB_FIX = new NodeFeature("data_stream.failure_store.tsdb_fix");
2930

3031
@Override
3132
public Map<NodeFeature, Version> getHistoricalFeatures() {
@@ -41,4 +42,9 @@ public Set<NodeFeature> getFeatures() {
4142
DataStreamGlobalRetention.GLOBAL_RETENTION // Added in 8.14
4243
);
4344
}
45+
46+
@Override
47+
public Set<NodeFeature> getTestFeatures() {
48+
return Set.of(DATA_STREAM_FAILURE_STORE_TSDB_FIX);
49+
}
4450
}

modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/150_tsdb.yml

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,107 @@ index without timestamp:
182182
body:
183183
- '{"metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2001818691, "rx": 802133794}}}}'
184184

185+
---
186+
TSDB failures go to failure store:
187+
- requires:
188+
cluster_features: ["data_stream.failure_store.tsdb_fix"]
189+
reason: "tests tsdb failure store fixes in 8.16.0 that catch timestamp errors that happen earlier in the process and redirect them to the failure store."
190+
191+
- do:
192+
allowed_warnings:
193+
- "index template [my-template2] has index patterns [fs-k8s*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-template2] will take precedence during new index creation"
194+
indices.put_index_template:
195+
name: my-template2
196+
body:
197+
index_patterns: [ "fs-k8s*" ]
198+
data_stream:
199+
failure_store: true
200+
template:
201+
settings:
202+
index:
203+
mode: time_series
204+
number_of_replicas: 1
205+
number_of_shards: 2
206+
routing_path: [ metricset, time_series_dimension ]
207+
time_series:
208+
start_time: 2021-04-28T00:00:00Z
209+
end_time: 2021-04-29T00:00:00Z
210+
mappings:
211+
properties:
212+
"@timestamp":
213+
type: date
214+
metricset:
215+
type: keyword
216+
time_series_dimension: true
217+
k8s:
218+
properties:
219+
pod:
220+
properties:
221+
uid:
222+
type: keyword
223+
time_series_dimension: true
224+
name:
225+
type: keyword
226+
ip:
227+
type: ip
228+
network:
229+
properties:
230+
tx:
231+
type: long
232+
rx:
233+
type: long
234+
- do:
235+
index:
236+
index: fs-k8s
237+
body:
238+
- '{"metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2001818691, "rx": 802133794}}}}'
239+
- match: { result : "created"}
240+
- match: { failure_store : "used"}
241+
242+
- do:
243+
bulk:
244+
refresh: true
245+
body:
246+
- '{ "create": { "_index": "fs-k8s"} }'
247+
- '{"@timestamp":"2021-04-28T01:00:00ZZ", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2001818691, "rx": 802133794}}}}'
248+
- '{ "create": { "_index": "k8s"} }'
249+
- '{ "@timestamp": "2021-04-28T01:00:00ZZ", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2001818691, "rx": 802133794}}}}'
250+
- '{ "create": { "_index": "fs-k8s"} }'
251+
- '{ "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2001818691, "rx": 802133794}}}}'
252+
- '{ "create": { "_index": "fs-k8s"} }'
253+
- '{ "@timestamp":"2000-04-28T01:00:00ZZ", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2001818691, "rx": 802133794}}}}'
254+
- '{ "create": { "_index": "k8s"} }'
255+
- '{"metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2001818691, "rx": 802133794}}}}'
256+
- '{ "create": { "_index": "k8s"} }'
257+
- '{ "@timestamp":"2000-04-28T01:00:00ZZ", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2001818691, "rx": 802133794}}}}'
258+
- is_true: errors
259+
260+
# Successfully indexed to backing index
261+
- match: { items.0.create._index: '/\.ds-fs-k8s-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
262+
- match: { items.0.create.status: 201 }
263+
- is_false: items.0.create.failure_store
264+
- match: { items.1.create._index: '/\.ds-k8s-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
265+
- match: { items.1.create.status: 201 }
266+
- is_false: items.1.create.failure_store
267+
268+
# Successfully indexed to failure store
269+
- match: { items.2.create._index: '/\.fs-fs-k8s-(\d{4}\.\d{2}\.\d{2}-)?000002/' }
270+
- match: { items.2.create.status: 201 }
271+
- match: { items.2.create.failure_store: used }
272+
- match: { items.3.create._index: '/\.fs-fs-k8s-(\d{4}\.\d{2}\.\d{2}-)?000002/' }
273+
- match: { items.3.create.status: 201 }
274+
- match: { items.3.create.failure_store: used }
275+
276+
# Rejected, eligible to go to failure store, but failure store not enabled
277+
- match: { items.4.create._index: 'k8s' }
278+
- match: { items.4.create.status: 400 }
279+
- match: { items.4.create.error.type: timestamp_error }
280+
- match: { items.4.create.failure_store: not_enabled }
281+
- match: { items.5.create._index: 'k8s' }
282+
- match: { items.5.create.status: 400 }
283+
- match: { items.5.create.error.type: timestamp_error }
284+
- match: { items.5.create.failure_store: not_enabled }
285+
185286
---
186287
index without timestamp with pipeline:
187288
- do:

modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/190_failure_store_redirection.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -879,7 +879,7 @@ teardown:
879879
# Successfully indexed to backing index
880880
- match: { items.0.create._index: '/\.ds-logs-foobar-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
881881
- match: { items.0.create.status: 201 }
882-
- is_false: items.1.create.failure_store
882+
- is_false: items.0.create.failure_store
883883

884884
# Rejected but not eligible to go to failure store
885885
- match: { items.1.create._index: '/\.ds-logs-foobar-(\d{4}\.\d{2}\.\d{2}-)?000001/' }

server/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexClusterStateUpdateRequest.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ public class CreateIndexClusterStateUpdateRequest {
3535
private ResizeType resizeType;
3636
private boolean copySettings;
3737
private SystemDataStreamDescriptor systemDataStreamDescriptor;
38+
private boolean isFailureIndex = false;
3839

3940
private Settings settings = Settings.EMPTY;
4041

@@ -102,6 +103,11 @@ public CreateIndexClusterStateUpdateRequest systemDataStreamDescriptor(SystemDat
102103
return this;
103104
}
104105

106+
public CreateIndexClusterStateUpdateRequest isFailureIndex(boolean isFailureIndex) {
107+
this.isFailureIndex = isFailureIndex;
108+
return this;
109+
}
110+
105111
public String cause() {
106112
return cause;
107113
}
@@ -168,6 +174,10 @@ public String dataStreamName() {
168174
return dataStreamName;
169175
}
170176

177+
public boolean isFailureIndex() {
178+
return isFailureIndex;
179+
}
180+
171181
public CreateIndexClusterStateUpdateRequest dataStreamName(String dataStreamName) {
172182
this.dataStreamName = dataStreamName;
173183
return this;
@@ -228,6 +238,8 @@ public String toString() {
228238
+ systemDataStreamDescriptor
229239
+ ", matchingTemplate="
230240
+ matchingTemplate
241+
+ ", isFailureIndex="
242+
+ isFailureIndex
231243
+ '}';
232244
}
233245
}

server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,12 @@ private Map<ShardId, List<BulkItemRequest>> groupRequestsByShards(
320320
shard -> new ArrayList<>()
321321
);
322322
shardRequests.add(bulkItemRequest);
323+
} catch (DataStream.TimestampError timestampError) {
324+
IndexDocFailureStoreStatus failureStoreStatus = processFailure(bulkItemRequest, clusterState, timestampError);
325+
if (IndexDocFailureStoreStatus.USED.equals(failureStoreStatus) == false) {
326+
String name = ia != null ? ia.getName() : docWriteRequest.index();
327+
addFailureAndDiscardRequest(docWriteRequest, bulkItemRequest.id(), name, timestampError, failureStoreStatus);
328+
}
323329
} catch (ElasticsearchParseException | IllegalArgumentException | RoutingMissingException | ResourceNotFoundException e) {
324330
String name = ia != null ? ia.getName() : docWriteRequest.index();
325331
var failureStoreStatus = isFailureStoreRequest(docWriteRequest)
@@ -545,6 +551,7 @@ private IndexDocFailureStoreStatus processFailure(BulkItemRequest bulkItemReques
545551
boolean added = addDocumentToRedirectRequests(bulkItemRequest, cause, failureStoreCandidate.getName());
546552
if (added) {
547553
failureStoreMetrics.incrementFailureStore(bulkItemRequest.index(), errorType, FailureStoreMetrics.ErrorLocation.SHARD);
554+
return IndexDocFailureStoreStatus.USED;
548555
} else {
549556
failureStoreMetrics.incrementRejected(
550557
bulkItemRequest.index(),

server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1343,7 +1343,7 @@ public Index getWriteIndex(IndexRequest request, Metadata metadata) {
13431343
+ "]"
13441344
)
13451345
.collect(Collectors.joining());
1346-
throw new IllegalArgumentException(
1346+
throw new TimestampError(
13471347
"the document timestamp ["
13481348
+ timestampAsString
13491349
+ "] is outside of ranges of currently writable indices ["
@@ -1405,10 +1405,10 @@ private static Instant getTimeStampFromRaw(Object rawTimestamp) {
14051405
} else if (rawTimestamp instanceof String sTimestamp) {
14061406
return DateFormatters.from(TIMESTAMP_FORMATTER.parse(sTimestamp), TIMESTAMP_FORMATTER.locale()).toInstant();
14071407
} else {
1408-
throw new IllegalArgumentException("timestamp [" + rawTimestamp + "] type [" + rawTimestamp.getClass() + "] error");
1408+
throw new TimestampError("timestamp [" + rawTimestamp + "] type [" + rawTimestamp.getClass() + "] error");
14091409
}
14101410
} catch (Exception e) {
1411-
throw new IllegalArgumentException("Error get data stream timestamp field: " + e.getMessage(), e);
1411+
throw new TimestampError("Error get data stream timestamp field: " + e.getMessage(), e);
14121412
}
14131413
}
14141414

@@ -1432,7 +1432,7 @@ private static Instant getTimestampFromParser(BytesReference source, XContentTyp
14321432
);
14331433
};
14341434
} catch (Exception e) {
1435-
throw new IllegalArgumentException("Error extracting data stream timestamp field: " + e.getMessage(), e);
1435+
throw new TimestampError("Error extracting data stream timestamp field: " + e.getMessage(), e);
14361436
}
14371437
}
14381438

@@ -1741,4 +1741,20 @@ public DataStream build() {
17411741
);
17421742
}
17431743
}
1744+
1745+
/**
1746+
* This is a specialised error signalling that a document does not have a valid timestamp
1747+
* and therefore cannot be indexed. It is mainly applicable to TSDS data streams because they need the timestamp
1748+
* to determine the write index.
1749+
*/
1750+
public static class TimestampError extends IllegalArgumentException {
1751+
1752+
public TimestampError(String message, Exception cause) {
1753+
super(message, cause);
1754+
}
1755+
1756+
public TimestampError(String message) {
1757+
super(message);
1758+
}
1759+
}
17441760
}

server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamFailureStoreDefinition.java

Lines changed: 41 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
package org.elasticsearch.cluster.metadata;
1111

12+
import org.elasticsearch.cluster.routing.allocation.DataTier;
1213
import org.elasticsearch.common.compress.CompressedXContent;
1314
import org.elasticsearch.common.settings.Setting;
1415
import org.elasticsearch.common.settings.Settings;
@@ -19,19 +20,39 @@
1920
import org.elasticsearch.index.mapper.RoutingFieldMapper;
2021

2122
import java.io.IOException;
23+
import java.util.HashSet;
24+
import java.util.Set;
2225

2326
/**
2427
* A utility class that contains the mappings and settings logic for failure store indices that are a part of data streams.
2528
*/
2629
public class DataStreamFailureStoreDefinition {
2730

2831
public static final String FAILURE_STORE_REFRESH_INTERVAL_SETTING_NAME = "data_streams.failure_store.refresh_interval";
32+
public static final String INDEX_FAILURE_STORE_VERSION_SETTING_NAME = "index.failure_store.version";
2933
public static final Settings DATA_STREAM_FAILURE_STORE_SETTINGS;
34+
// Only a subset of user configurable settings is applicable for a failure index. Here we have an
35+
// allowlist that will filter all other settings out.
36+
public static final Set<String> SUPPORTED_USER_SETTINGS = Set.of(
37+
DataTier.TIER_PREFERENCE,
38+
IndexMetadata.SETTING_INDEX_HIDDEN,
39+
INDEX_FAILURE_STORE_VERSION_SETTING_NAME,
40+
IndexMetadata.SETTING_NUMBER_OF_SHARDS,
41+
IndexMetadata.SETTING_NUMBER_OF_REPLICAS,
42+
IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS,
43+
IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(),
44+
IndexMetadata.LIFECYCLE_NAME
45+
);
46+
public static final Set<String> SUPPORTED_USER_SETTINGS_PREFIXES = Set.of(
47+
IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_PREFIX + ".",
48+
IndexMetadata.INDEX_ROUTING_INCLUDE_GROUP_PREFIX + ".",
49+
IndexMetadata.INDEX_ROUTING_EXCLUDE_GROUP_PREFIX + "."
50+
);
3051
public static final CompressedXContent DATA_STREAM_FAILURE_STORE_MAPPING;
3152

3253
public static final int FAILURE_STORE_DEFINITION_VERSION = 1;
3354
public static final Setting<Integer> FAILURE_STORE_DEFINITION_VERSION_SETTING = Setting.intSetting(
34-
"index.failure_store.version",
55+
INDEX_FAILURE_STORE_VERSION_SETTING_NAME,
3556
0,
3657
Setting.Property.IndexScope
3758
);
@@ -40,11 +61,6 @@ public class DataStreamFailureStoreDefinition {
4061
DATA_STREAM_FAILURE_STORE_SETTINGS = Settings.builder()
4162
// Always start with the hidden settings for a backing index.
4263
.put(IndexMetadata.SETTING_INDEX_HIDDEN, true)
43-
// Override any pipeline settings on the failure store to not use any
44-
// specified by the data stream template. Default pipelines are very much
45-
// meant for the backing indices only.
46-
.putNull(IndexSettings.DEFAULT_PIPELINE.getKey())
47-
.putNull(IndexSettings.FINAL_PIPELINE.getKey())
4864
.put(FAILURE_STORE_DEFINITION_VERSION_SETTING.getKey(), FAILURE_STORE_DEFINITION_VERSION)
4965
.build();
5066

@@ -199,4 +215,23 @@ public static Settings.Builder applyFailureStoreSettings(Settings nodeSettings,
199215
}
200216
return builder;
201217
}
218+
219+
/**
220+
* Removes the settings that are not supported by the failure store from the settings provided.
221+
* ATTENTION: This method should be applied BEFORE we set the necessary settings for an index
222+
* @param builder the settings builder that is going to be updated
223+
* @return the original settings builder, with the unsupported settings removed.
224+
*/
225+
public static Settings.Builder filterUserDefinedSettings(Settings.Builder builder) {
226+
if (builder.keys().isEmpty() == false) {
227+
Set<String> existingKeys = new HashSet<>(builder.keys());
228+
for (String setting : existingKeys) {
229+
if (SUPPORTED_USER_SETTINGS.contains(setting) == false
230+
&& SUPPORTED_USER_SETTINGS_PREFIXES.stream().anyMatch(setting::startsWith) == false) {
231+
builder.remove(setting);
232+
}
233+
}
234+
}
235+
return builder;
236+
}
202237
}

server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -425,7 +425,8 @@ public static ClusterState createFailureStoreIndex(
425425
.nameResolvedInstant(nameResolvedInstant)
426426
.performReroute(false)
427427
.setMatchingTemplate(template)
428-
.settings(indexSettings);
428+
.settings(indexSettings)
429+
.isFailureIndex(true);
429430

430431
try {
431432
currentState = metadataCreateIndexService.applyCreateIndexRequest(

server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -983,6 +983,7 @@ static Settings aggregateIndexSettings(
983983
final Settings templateAndRequestSettings = Settings.builder().put(combinedTemplateSettings).put(request.settings()).build();
984984

985985
final IndexMode templateIndexMode = Optional.of(request)
986+
.filter(r -> r.isFailureIndex() == false)
986987
.map(CreateIndexClusterStateUpdateRequest::matchingTemplate)
987988
.map(metadata::retrieveIndexModeFromTemplate)
988989
.orElse(null);
@@ -1038,11 +1039,13 @@ static Settings aggregateIndexSettings(
10381039

10391040
// Finally, we actually add the explicit defaults prior to the template settings and the
10401041
// request settings, so that the precedence goes:
1041-
// Explicit Defaults -> Template -> Request -> Necessary Settings (# of shards, uuid, etc)
1042+
// Explicit Defaults -> Template -> Request -> Filter out failure store settings -> Necessary Settings (# of shards, uuid, etc)
10421043
indexSettingsBuilder.put(additionalIndexSettings.build());
10431044
indexSettingsBuilder.put(templateSettings.build());
10441045
}
1045-
1046+
if (request.isFailureIndex()) {
1047+
DataStreamFailureStoreDefinition.filterUserDefinedSettings(indexSettingsBuilder);
1048+
}
10461049
// now, put the request settings, so they override templates
10471050
indexSettingsBuilder.put(requestSettings.build());
10481051

0 commit comments

Comments
 (0)