@@ -112,7 +112,7 @@ public static boolean isFailureStoreFeatureFlagEnabled() {
112
112
private final IndexMode indexMode ;
113
113
@ Nullable
114
114
private final DataStreamLifecycle lifecycle ;
115
- private final boolean failureStoreEnabled ;
115
+ private final DataStreamOptions dataStreamOptions ;
116
116
117
117
private final DataStreamIndices backingIndices ;
118
118
private final DataStreamIndices failureIndices ;
@@ -128,7 +128,7 @@ public DataStream(
128
128
boolean allowCustomRouting ,
129
129
IndexMode indexMode ,
130
130
DataStreamLifecycle lifecycle ,
131
- boolean failureStoreEnabled ,
131
+ @ Nullable DataStreamOptions dataStreamOptions ,
132
132
List <Index > failureIndices ,
133
133
boolean rolloverOnWrite ,
134
134
@ Nullable DataStreamAutoShardingEvent autoShardingEvent
@@ -144,7 +144,7 @@ public DataStream(
144
144
allowCustomRouting ,
145
145
indexMode ,
146
146
lifecycle ,
147
- failureStoreEnabled ,
147
+ dataStreamOptions ,
148
148
new DataStreamIndices (BACKING_INDEX_PREFIX , List .copyOf (indices ), rolloverOnWrite , autoShardingEvent ),
149
149
new DataStreamIndices (FAILURE_STORE_PREFIX , List .copyOf (failureIndices ), false , null )
150
150
);
@@ -162,7 +162,7 @@ public DataStream(
162
162
boolean allowCustomRouting ,
163
163
IndexMode indexMode ,
164
164
DataStreamLifecycle lifecycle ,
165
- boolean failureStoreEnabled ,
165
+ DataStreamOptions dataStreamOptions ,
166
166
DataStreamIndices backingIndices ,
167
167
DataStreamIndices failureIndices
168
168
) {
@@ -177,7 +177,7 @@ public DataStream(
177
177
this .allowCustomRouting = allowCustomRouting ;
178
178
this .indexMode = indexMode ;
179
179
this .lifecycle = lifecycle ;
180
- this .failureStoreEnabled = failureStoreEnabled ;
180
+ this .dataStreamOptions = dataStreamOptions == null ? DataStreamOptions . EMPTY : dataStreamOptions ;
181
181
assert backingIndices .indices .isEmpty () == false ;
182
182
assert replicated == false || (backingIndices .rolloverOnWrite == false && failureIndices .rolloverOnWrite == false )
183
183
: "replicated data streams cannot be marked for lazy rollover" ;
@@ -198,9 +198,11 @@ public static DataStream read(StreamInput in) throws IOException {
198
198
var lifecycle = in .getTransportVersion ().onOrAfter (TransportVersions .V_8_9_X )
199
199
? in .readOptionalWriteable (DataStreamLifecycle ::new )
200
200
: null ;
201
- var failureStoreEnabled = in .getTransportVersion ().onOrAfter (DataStream .ADDED_FAILURE_STORE_TRANSPORT_VERSION )
202
- ? in .readBoolean ()
203
- : false ;
201
+ // This boolean flag has been moved in data stream options
202
+ var failureStoreEnabled = in .getTransportVersion ()
203
+ .between (DataStream .ADDED_FAILURE_STORE_TRANSPORT_VERSION , TransportVersions .ADD_DATA_STREAM_OPTIONS )
204
+ ? in .readBoolean ()
205
+ : false ;
204
206
var failureIndices = in .getTransportVersion ().onOrAfter (DataStream .ADDED_FAILURE_STORE_TRANSPORT_VERSION )
205
207
? readIndices (in )
206
208
: List .<Index >of ();
@@ -213,6 +215,14 @@ public static DataStream read(StreamInput in) throws IOException {
213
215
failureIndicesBuilder .setRolloverOnWrite (in .readBoolean ())
214
216
.setAutoShardingEvent (in .readOptionalWriteable (DataStreamAutoShardingEvent ::new ));
215
217
}
218
+ DataStreamOptions dataStreamOptions ;
219
+ if (in .getTransportVersion ().onOrAfter (TransportVersions .ADD_DATA_STREAM_OPTIONS )) {
220
+ dataStreamOptions = in .readOptionalWriteable (DataStreamOptions ::read );
221
+ } else {
222
+ // We cannot distinguish if failure store was explicitly disabled or not. Given that failure store
223
+ // is still behind a feature flag in previous version we use the default value instead of explicitly disabling it.
224
+ dataStreamOptions = failureStoreEnabled ? DataStreamOptions .FAILURE_STORE_ENABLED : null ;
225
+ }
216
226
return new DataStream (
217
227
name ,
218
228
generation ,
@@ -224,7 +234,7 @@ public static DataStream read(StreamInput in) throws IOException {
224
234
allowCustomRouting ,
225
235
indexMode ,
226
236
lifecycle ,
227
- failureStoreEnabled ,
237
+ dataStreamOptions ,
228
238
backingIndicesBuilder .build (),
229
239
failureIndicesBuilder .build ()
230
240
);
@@ -274,6 +284,10 @@ public boolean isFailureStoreIndex(String indexName) {
274
284
return failureIndices .containsIndex (indexName );
275
285
}
276
286
287
+ public DataStreamOptions getDataStreamOptions () {
288
+ return dataStreamOptions ;
289
+ }
290
+
277
291
public boolean rolloverOnWrite () {
278
292
return backingIndices .rolloverOnWrite ;
279
293
}
@@ -406,13 +420,12 @@ public boolean isAllowCustomRouting() {
406
420
}
407
421
408
422
/**
409
- * Determines if this data stream should persist ingest pipeline and mapping failures from bulk requests to a locally
410
- * configured failure store.
411
- *
412
- * @return Whether this data stream should store ingestion failures.
423
+ * Determines if this data stream has its failure store enabled or not. Currently, the failure store
424
+ * is enabled only when a user has explicitly requested it.
425
+ * @return true, if the user has explicitly enabled the failure store.
413
426
*/
414
427
public boolean isFailureStoreEnabled () {
415
- return failureStoreEnabled ;
428
+ return dataStreamOptions . failureStore () != null && dataStreamOptions . failureStore (). isExplicitlyEnabled () ;
416
429
}
417
430
418
431
@ Nullable
@@ -1063,8 +1076,11 @@ public void writeTo(StreamOutput out) throws IOException {
1063
1076
if (out .getTransportVersion ().onOrAfter (TransportVersions .V_8_9_X )) {
1064
1077
out .writeOptionalWriteable (lifecycle );
1065
1078
}
1079
+ if (out .getTransportVersion ()
1080
+ .between (DataStream .ADDED_FAILURE_STORE_TRANSPORT_VERSION , TransportVersions .ADD_DATA_STREAM_OPTIONS )) {
1081
+ out .writeBoolean (isFailureStoreEnabled ());
1082
+ }
1066
1083
if (out .getTransportVersion ().onOrAfter (DataStream .ADDED_FAILURE_STORE_TRANSPORT_VERSION )) {
1067
- out .writeBoolean (failureStoreEnabled );
1068
1084
out .writeCollection (failureIndices .indices );
1069
1085
}
1070
1086
if (out .getTransportVersion ().onOrAfter (TransportVersions .V_8_13_0 )) {
@@ -1077,6 +1093,9 @@ public void writeTo(StreamOutput out) throws IOException {
1077
1093
out .writeBoolean (failureIndices .rolloverOnWrite );
1078
1094
out .writeOptionalWriteable (failureIndices .autoShardingEvent );
1079
1095
}
1096
+ if (out .getTransportVersion ().onOrAfter (TransportVersions .ADD_DATA_STREAM_OPTIONS )) {
1097
+ out .writeOptionalWriteable (dataStreamOptions .isEmpty () ? null : dataStreamOptions );
1098
+ }
1080
1099
}
1081
1100
1082
1101
public static final ParseField NAME_FIELD = new ParseField ("name" );
@@ -1096,6 +1115,7 @@ public void writeTo(StreamOutput out) throws IOException {
1096
1115
public static final ParseField AUTO_SHARDING_FIELD = new ParseField ("auto_sharding" );
1097
1116
public static final ParseField FAILURE_ROLLOVER_ON_WRITE_FIELD = new ParseField ("failure_rollover_on_write" );
1098
1117
public static final ParseField FAILURE_AUTO_SHARDING_FIELD = new ParseField ("failure_auto_sharding" );
1118
+ public static final ParseField DATA_STREAM_OPTIONS_FIELD = new ParseField ("options" );
1099
1119
1100
1120
@ SuppressWarnings ("unchecked" )
1101
1121
private static final ConstructingObjectParser <DataStream , Void > PARSER = new ConstructingObjectParser <>("data_stream" , args -> {
@@ -1110,6 +1130,16 @@ public void writeTo(StreamOutput out) throws IOException {
1110
1130
(DataStreamAutoShardingEvent ) args [15 ]
1111
1131
)
1112
1132
: new DataStreamIndices (FAILURE_STORE_PREFIX , List .of (), false , null );
1133
+ // We cannot distinguish if failure store was explicitly disabled or not. Given that failure store
1134
+ // is still behind a feature flag in previous version we use the default value instead of explicitly disabling it.
1135
+ DataStreamOptions dataStreamOptions = DataStreamOptions .EMPTY ;
1136
+ if (DataStream .isFailureStoreFeatureFlagEnabled ()) {
1137
+ if (args [16 ] != null ) {
1138
+ dataStreamOptions = (DataStreamOptions ) args [16 ];
1139
+ } else if (failureStoreEnabled ) {
1140
+ dataStreamOptions = DataStreamOptions .FAILURE_STORE_ENABLED ;
1141
+ }
1142
+ }
1113
1143
return new DataStream (
1114
1144
(String ) args [0 ],
1115
1145
(Long ) args [2 ],
@@ -1121,7 +1151,7 @@ public void writeTo(StreamOutput out) throws IOException {
1121
1151
args [7 ] != null && (boolean ) args [7 ],
1122
1152
args [8 ] != null ? IndexMode .fromString ((String ) args [8 ]) : null ,
1123
1153
(DataStreamLifecycle ) args [9 ],
1124
- failureStoreEnabled ,
1154
+ dataStreamOptions ,
1125
1155
new DataStreamIndices (
1126
1156
BACKING_INDEX_PREFIX ,
1127
1157
(List <Index >) args [1 ],
@@ -1171,6 +1201,11 @@ public void writeTo(StreamOutput out) throws IOException {
1171
1201
(p , c ) -> DataStreamAutoShardingEvent .fromXContent (p ),
1172
1202
FAILURE_AUTO_SHARDING_FIELD
1173
1203
);
1204
+ PARSER .declareObject (
1205
+ ConstructingObjectParser .optionalConstructorArg (),
1206
+ (p , c ) -> DataStreamOptions .fromXContent (p ),
1207
+ DATA_STREAM_OPTIONS_FIELD
1208
+ );
1174
1209
}
1175
1210
}
1176
1211
@@ -1208,7 +1243,6 @@ public XContentBuilder toXContent(
1208
1243
builder .field (SYSTEM_FIELD .getPreferredName (), system );
1209
1244
builder .field (ALLOW_CUSTOM_ROUTING .getPreferredName (), allowCustomRouting );
1210
1245
if (DataStream .isFailureStoreFeatureFlagEnabled ()) {
1211
- builder .field (FAILURE_STORE_FIELD .getPreferredName (), failureStoreEnabled );
1212
1246
if (failureIndices .indices .isEmpty () == false ) {
1213
1247
builder .xContentList (FAILURE_INDICES_FIELD .getPreferredName (), failureIndices .indices );
1214
1248
}
@@ -1218,6 +1252,10 @@ public XContentBuilder toXContent(
1218
1252
failureIndices .autoShardingEvent .toXContent (builder , params );
1219
1253
builder .endObject ();
1220
1254
}
1255
+ if (dataStreamOptions .isEmpty () == false ) {
1256
+ builder .field (DATA_STREAM_OPTIONS_FIELD .getPreferredName ());
1257
+ dataStreamOptions .toXContent (builder , params );
1258
+ }
1221
1259
}
1222
1260
if (indexMode != null ) {
1223
1261
builder .field (INDEX_MODE .getPreferredName (), indexMode );
@@ -1250,7 +1288,7 @@ public boolean equals(Object o) {
1250
1288
&& allowCustomRouting == that .allowCustomRouting
1251
1289
&& indexMode == that .indexMode
1252
1290
&& Objects .equals (lifecycle , that .lifecycle )
1253
- && failureStoreEnabled == that .failureStoreEnabled
1291
+ && Objects . equals ( dataStreamOptions , that .dataStreamOptions )
1254
1292
&& Objects .equals (backingIndices , that .backingIndices )
1255
1293
&& Objects .equals (failureIndices , that .failureIndices );
1256
1294
}
@@ -1267,7 +1305,7 @@ public int hashCode() {
1267
1305
allowCustomRouting ,
1268
1306
indexMode ,
1269
1307
lifecycle ,
1270
- failureStoreEnabled ,
1308
+ dataStreamOptions ,
1271
1309
backingIndices ,
1272
1310
failureIndices
1273
1311
);
@@ -1580,7 +1618,7 @@ public static class Builder {
1580
1618
private IndexMode indexMode = null ;
1581
1619
@ Nullable
1582
1620
private DataStreamLifecycle lifecycle = null ;
1583
- private boolean failureStoreEnabled = false ;
1621
+ private DataStreamOptions dataStreamOptions = DataStreamOptions . EMPTY ;
1584
1622
private DataStreamIndices backingIndices ;
1585
1623
private DataStreamIndices failureIndices = DataStreamIndices .failureIndicesBuilder (List .of ()).build ();
1586
1624
@@ -1605,7 +1643,7 @@ private Builder(DataStream dataStream) {
1605
1643
allowCustomRouting = dataStream .allowCustomRouting ;
1606
1644
indexMode = dataStream .indexMode ;
1607
1645
lifecycle = dataStream .lifecycle ;
1608
- failureStoreEnabled = dataStream .failureStoreEnabled ;
1646
+ dataStreamOptions = dataStream .dataStreamOptions ;
1609
1647
backingIndices = dataStream .backingIndices ;
1610
1648
failureIndices = dataStream .failureIndices ;
1611
1649
}
@@ -1660,8 +1698,8 @@ public Builder setLifecycle(DataStreamLifecycle lifecycle) {
1660
1698
return this ;
1661
1699
}
1662
1700
1663
- public Builder setFailureStoreEnabled ( boolean failureStoreEnabled ) {
1664
- this .failureStoreEnabled = failureStoreEnabled ;
1701
+ public Builder setDataStreamOptions ( DataStreamOptions dataStreamOptions ) {
1702
+ this .dataStreamOptions = dataStreamOptions ;
1665
1703
return this ;
1666
1704
}
1667
1705
@@ -1697,7 +1735,7 @@ public DataStream build() {
1697
1735
allowCustomRouting ,
1698
1736
indexMode ,
1699
1737
lifecycle ,
1700
- failureStoreEnabled ,
1738
+ dataStreamOptions ,
1701
1739
backingIndices ,
1702
1740
failureIndices
1703
1741
);
0 commit comments