Commit 1c9ee51

[8.x] Add LogsDB option to route on sort fields (#116687) (#119113)
* Add LogsDB option to route on sort fields (#116687)
* Add LogsDB option to route on sort fields
* fix encoding
* Update docs/changelog/116687.yaml
* tests
* tests
* tests
* fix mode
* tests
* tests
* tests
* add test
* fix test
* sync
* updates from review
* test fixes
* test fixes
* test fixes
* Move logic to SyntheticSourceIndexSettingsProvider
* fix test
* sync
* merge, no fallback
* comments
* fix test
* address comments
* address comments
* address comments
* Update x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsdbIndexModeSettingsProvider.java
  Co-authored-by: Martijn van Groningen <[email protected]>
* [CI] Auto commit changes from spotless
* update tests
* [CI] Auto commit changes from spotless
* update tests
* fix rest compat tests

---------

Co-authored-by: Martijn van Groningen <[email protected]>
Co-authored-by: elasticsearchmachine <[email protected]>

(cherry picked from commit d80cbdd)

# Conflicts:
# rest-api-spec/build.gradle
# server/src/main/java/org/elasticsearch/common/TimeBasedKOrderedUUIDGenerator.java

* Update LogsIndexingIT.java
1 parent 7af5f28 commit 1c9ee51

24 files changed: +673 -58 lines changed

docs/changelog/116687.yaml

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,5 @@
1+
pr: 116687
2+
summary: Add LogsDB option to route on sort fields
3+
area: Logs
4+
type: enhancement
5+
issues: []

rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/logsdb/10_settings.yml

Lines changed: 42 additions & 0 deletions
@@ -514,6 +514,48 @@ routing path not allowed in logs mode:
514514
- match: { error.type: "illegal_argument_exception" }
515515
- match: { error.reason: "[index.routing_path] requires [index.mode=time_series]" }
516516

517+
---
518+
routing path allowed in logs mode with routing on sort fields:
519+
- requires:
520+
cluster_features: [ "routing.logsb_route_on_sort_fields" ]
521+
reason: introduction of route on index sorting fields
522+
523+
- do:
524+
indices.create:
525+
index: test
526+
body:
527+
settings:
528+
index:
529+
mode: logsdb
530+
number_of_replicas: 0
531+
number_of_shards: 2
532+
routing_path: [ host.name, agent_id ]
533+
logsdb:
534+
route_on_sort_fields: true
535+
mappings:
536+
properties:
537+
"@timestamp":
538+
type: date
539+
host.name:
540+
type: keyword
541+
agent_id:
542+
type: keyword
543+
process_id:
544+
type: integer
545+
http_method:
546+
type: keyword
547+
message:
548+
type: text
549+
550+
- do:
551+
indices.get_settings:
552+
index: test
553+
554+
- is_true: test
555+
- match: { test.settings.index.mode: logsdb }
556+
- match: { test.settings.index.logsdb.route_on_sort_fields: "true" }
557+
- match: { test.settings.index.routing_path: [ host.name, agent_id ] }
558+
517559
---
518560
start time not allowed in logs mode:
519561
- requires:

rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/tsdb/20_mapping.yml

Lines changed: 1 addition & 1 deletion
@@ -127,7 +127,7 @@ exact match object type:
127127
reason: routing_path error message updated in 8.14.0
128128

129129
- do:
130-
catch: '/All fields that match routing_path must be configured with \[time_series_dimension: true\] or flattened fields with a list of dimensions in \[time_series_dimensions\] and without the \[script\] parameter. \[dim\] was \[object\]./'
130+
catch: '/All fields that match routing_path must be .*flattened fields.* \[dim\] was \[object\]./'
131131
indices.create:
132132
index: tsdb_index
133133
body:

rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/tsdb/25_id_generation.yml

Lines changed: 1 addition & 1 deletion
@@ -427,7 +427,7 @@ delete over _bulk:
427427
- match: {items.0.delete.result: deleted}
428428
- match: {items.1.delete.result: deleted}
429429
- match: {items.2.delete.status: 404}
430-
- match: {items.2.delete.error.reason: "invalid id [not found ++ not found] for index [id_generation_test] in time series mode"}
430+
- match: {items.2.delete.error.reason: '/invalid\ id\ \[not\ found\ \+\+\ not\ found\]\ for\ index\ \[id_generation_test\]\ in\ time.series\ mode/'}
431431

432432
---
433433
routing_path matches deep object:

rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/tsdb/80_index_resize.yml

Lines changed: 1 addition & 1 deletion
@@ -95,7 +95,7 @@ split:
9595
reason: tsdb indexing changed in 8.2.0
9696

9797
- do:
98-
catch: /index-split is not supported because the destination index \[test\] is in time series mode/
98+
catch: /index-split is not supported because the destination index \[test\] is in time.series mode/
9999
indices.split:
100100
index: test
101101
target: test_split

rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/tsdb/90_unsupported_operations.yml

Lines changed: 6 additions & 6 deletions
@@ -75,7 +75,7 @@ index with routing:
7575
reason: tsdb indexing changed in 8.2.0
7676

7777
- do:
78-
catch: /specifying routing is not supported because the destination index \[test\] is in time series mode/
78+
catch: /specifying routing is not supported because the destination index \[test\] is in time.series mode/
7979
index:
8080
index: test
8181
routing: foo
@@ -104,7 +104,7 @@ index with routing over _bulk:
104104
body:
105105
- '{"index": {"routing": "foo"}}'
106106
- '{"@timestamp": "2021-04-28T18:50:04.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2001818691, "rx": 802133794}}}}'
107-
- match: {items.0.index.error.reason: "specifying routing is not supported because the destination index [test] is in time series mode"}
107+
- match: {items.0.index.error.reason: '/specifying\ routing\ is\ not\ supported\ because\ the\ destination\ index\ \[test\]\ is\ in\ time.series\ mode/'}
108108

109109
---
110110
noop update:
@@ -120,7 +120,7 @@ noop update:
120120
- length: {hits.hits: 1}
121121

122122
- do:
123-
catch: /update is not supported because the destination index \[test\] is in time series mode/
123+
catch: /update is not supported because the destination index \[test\] is in time.series mode/
124124
update:
125125
index: test
126126
id: "1"
@@ -136,7 +136,7 @@ regular update:
136136

137137
# We fail even though the document isn't found.
138138
- do:
139-
catch: /update is not supported because the destination index \[test\] is in time series mode/
139+
catch: /update is not supported because the destination index \[test\] is in time.series mode/
140140
update:
141141
index: test
142142
id: "1"
@@ -165,7 +165,7 @@ update over _bulk:
165165
body:
166166
- '{"update": {"_id": 1}}'
167167
- '{"doc":{"@timestamp": "2021-04-28T18:03:24.467Z", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.3", "network": {"tx": 1434595272, "rx": 530605511}}}}}'
168-
- match: {items.0.update.error.reason: "update is not supported because the destination index [test] is in time series mode"}
168+
- match: {items.0.update.error.reason: '/update\ is\ not\ supported\ because\ the\ destination\ index\ \[test\]\ is\ in\ time.series\ mode/'}
169169

170170
---
171171
search with routing:
@@ -175,7 +175,7 @@ search with routing:
175175

176176
# We fail even though the document isn't found.
177177
- do:
178-
catch: /searching with a specified routing is not supported because the destination index \[test\] is in time series mode/
178+
catch: /searching with a specified routing is not supported because the destination index \[test\] is in time.series mode/
179179
search:
180180
index: test
181181
routing: rrrr
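
The test patterns above replace the literal `time series` with `time.series`. This presumably accommodates error messages that now take the mode name from `indexMode.getName()`, which for time series mode would likely render with an underscore. A minimal sketch of why the single `.` is enough; the class and method names are hypothetical, and using `find()` to mimic how the test runner applies `catch:` patterns is an assumption:

```java
import java.util.regex.Pattern;

public class ModeMessageRegex {
    // Same pattern text as the updated `catch:` expression for update requests.
    static final Pattern UPDATED = Pattern.compile(
        "update is not supported because the destination index \\[test\\] is in time.series mode"
    );

    // Assumption: the pattern may match anywhere in the message, hence find().
    static boolean matches(String message) {
        return UPDATED.matcher(message).find();
    }

    public static void main(String[] args) {
        String prefix = "update is not supported because the destination index [test] is in ";
        // '.' matches any single character, so both spellings are accepted.
        System.out.println(matches(prefix + "time series mode"));  // old wording
        System.out.println(matches(prefix + "time_series mode"));  // wording built from a mode name
        System.out.println(matches(prefix + "logsdb mode"));       // a different mode does not match
    }
}
```

The loosened pattern keeps the tests valid against both the old and the new message text, which matters for mixed-version and REST compatibility runs.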

server/src/main/java/org/elasticsearch/action/index/IndexRequest.java

Lines changed: 11 additions & 2 deletions
@@ -51,6 +51,7 @@
5151
import java.util.Locale;
5252
import java.util.Map;
5353
import java.util.Objects;
54+
import java.util.OptionalInt;
5455
import java.util.function.Supplier;
5556

5657
import static org.elasticsearch.action.ValidateActions.addValidationError;
@@ -78,7 +79,6 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
7879
private static final TransportVersion PIPELINES_HAVE_RUN_FIELD_ADDED = TransportVersions.V_8_10_X;
7980

8081
private static final Supplier<String> ID_GENERATOR = UUIDs::base64UUID;
81-
private static final Supplier<String> K_SORTED_TIME_BASED_ID_GENERATOR = UUIDs::base64TimeBasedKOrderedUUID;
8282

8383
/**
8484
* Max length of the source document to include into string()
@@ -705,9 +705,18 @@ public void autoGenerateId() {
705705
}
706706

707707
public void autoGenerateTimeBasedId() {
708+
autoGenerateTimeBasedId(OptionalInt.empty());
709+
}
710+
711+
/**
712+
* Set the {@code #id()} to an automatically generated one, optimized for storage (compression) efficiency.
713+
* If a routing hash is passed, it is included in the generated id starting at 9 bytes before the end.
714+
* @param hash optional routing hash value, used to route requests by id to the right shard.
715+
*/
716+
public void autoGenerateTimeBasedId(OptionalInt hash) {
708717
assertBeforeGeneratingId();
709718
autoGenerateTimestamp();
710-
id(K_SORTED_TIME_BASED_ID_GENERATOR.get());
719+
id(UUIDs.base64TimeBasedKOrderedUUIDWithHash(hash));
711720
}
712721

713722
private void autoGenerateTimestamp() {

server/src/main/java/org/elasticsearch/cluster/routing/IndexRouting.java

Lines changed: 22 additions & 6 deletions
@@ -39,6 +39,7 @@
3939
import java.util.Collections;
4040
import java.util.List;
4141
import java.util.Map;
42+
import java.util.OptionalInt;
4243
import java.util.Set;
4344
import java.util.function.IntConsumer;
4445
import java.util.function.IntSupplier;
@@ -54,6 +55,7 @@ public abstract class IndexRouting {
5455

5556
static final NodeFeature BOOLEAN_ROUTING_PATH = new NodeFeature("routing.boolean_routing_path");
5657
static final NodeFeature MULTI_VALUE_ROUTING_PATH = new NodeFeature("routing.multi_value_routing_path");
58+
static final NodeFeature LOGSB_ROUTE_ON_SORT_FIELDS = new NodeFeature("routing.logsb_route_on_sort_fields");
5759

5860
/**
5961
* Build the routing from {@link IndexMetadata}.
@@ -164,7 +166,8 @@ private abstract static class IdAndRoutingOnly extends IndexRouting {
164166

165167
@Override
166168
public void preProcess(IndexRequest indexRequest) {
167-
// generate id if not already provided
169+
// Generate id if not already provided.
170+
// This is needed for routing, so it has to happen in pre-processing.
168171
final String id = indexRequest.id();
169172
if (id == null) {
170173
if (creationVersion.onOrAfter(IndexVersions.TIME_BASED_K_ORDERED_DOC_ID_BACKPORT) && indexMode == IndexMode.LOGSDB) {
@@ -262,15 +265,20 @@ public void collectSearchShards(String routing, IntConsumer consumer) {
262265
public static class ExtractFromSource extends IndexRouting {
263266
private final Predicate<String> isRoutingPath;
264267
private final XContentParserConfiguration parserConfig;
268+
private final IndexMode indexMode;
265269
private final boolean trackTimeSeriesRoutingHash;
270+
private final boolean addIdWithRoutingHash;
266271
private int hash = Integer.MAX_VALUE;
267272

268273
ExtractFromSource(IndexMetadata metadata) {
269274
super(metadata);
270275
if (metadata.isRoutingPartitionedIndex()) {
271276
throw new IllegalArgumentException("routing_partition_size is incompatible with routing_path");
272277
}
273-
trackTimeSeriesRoutingHash = metadata.getCreationVersion().onOrAfter(IndexVersions.TIME_SERIES_ROUTING_HASH_IN_ID);
278+
indexMode = metadata.getIndexMode();
279+
trackTimeSeriesRoutingHash = indexMode == IndexMode.TIME_SERIES
280+
&& metadata.getCreationVersion().onOrAfter(IndexVersions.TIME_SERIES_ROUTING_HASH_IN_ID);
281+
addIdWithRoutingHash = indexMode == IndexMode.LOGSDB;
274282
List<String> routingPaths = metadata.getRoutingPaths();
275283
isRoutingPath = Regex.simpleMatcher(routingPaths.toArray(String[]::new));
276284
this.parserConfig = XContentParserConfiguration.EMPTY.withFiltering(null, Set.copyOf(routingPaths), null, true);
@@ -282,8 +290,13 @@ public boolean matchesField(String fieldName) {
282290

283291
@Override
284292
public void postProcess(IndexRequest indexRequest) {
293+
// Update the request with the routing hash, if needed.
294+
// This needs to happen in post-processing, after the routing hash is calculated.
285295
if (trackTimeSeriesRoutingHash) {
286296
indexRequest.routing(TimeSeriesRoutingHashFieldMapper.encode(hash));
297+
} else if (addIdWithRoutingHash) {
298+
assert hash != Integer.MAX_VALUE;
299+
indexRequest.autoGenerateTimeBasedId(OptionalInt.of(hash));
287300
}
288301
}
289302

@@ -451,12 +464,15 @@ private int idToHash(String id) {
451464
try {
452465
idBytes = Base64.getUrlDecoder().decode(id);
453466
} catch (IllegalArgumentException e) {
454-
throw new ResourceNotFoundException("invalid id [{}] for index [{}] in time series mode", id, indexName);
467+
throw new ResourceNotFoundException("invalid id [{}] for index [{}] in " + indexMode.getName() + " mode", id, indexName);
455468
}
456469
if (idBytes.length < 4) {
457-
throw new ResourceNotFoundException("invalid id [{}] for index [{}] in time series mode", id, indexName);
470+
throw new ResourceNotFoundException("invalid id [{}] for index [{}] in " + indexMode.getName() + " mode", id, indexName);
458471
}
459-
return hashToShardId(ByteUtils.readIntLE(idBytes, 0));
472+
// For TSDB, the hash is stored as the id prefix.
473+
// For LogsDB with routing on sort fields, the routing hash is stored in the range[id.length - 9, id.length - 5] of the id,
474+
// see IndexRequest#autoGenerateTimeBasedId.
475+
return hashToShardId(ByteUtils.readIntLE(idBytes, addIdWithRoutingHash ? idBytes.length - 9 : 0));
460476
}
461477

462478
@Override
@@ -470,7 +486,7 @@ public void collectSearchShards(String routing, IntConsumer consumer) {
470486
}
471487

472488
private String error(String operation) {
473-
return operation + " is not supported because the destination index [" + indexName + "] is in time series mode";
489+
return operation + " is not supported because the destination index [" + indexName + "] is in " + indexMode.getName() + " mode";
474490
}
475491
}
476492
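
The `idToHash` change above reads the routing hash from a different offset depending on the index mode: offset 0 for TSDB, and `idBytes.length - 9` for LogsDB with routing on sort fields. The following is a rough, standalone sketch of that lookup using only the JDK. `RoutingHashFromId`, `readHash`, and `fakeId` are hypothetical names, the filler bytes are placeholders, and `ByteBuffer` in little-endian order stands in for `ByteUtils.readIntLE`:

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Base64;

public class RoutingHashFromId {
    /**
     * Reads a little-endian int from a base64url-encoded id.
     * hashNearEnd = true mirrors the LogsDB branch (offset length - 9);
     * hashNearEnd = false mirrors the TSDB branch (offset 0).
     */
    static int readHash(String id, boolean hashNearEnd) {
        byte[] idBytes = Base64.getUrlDecoder().decode(id);
        int offset = hashNearEnd ? idBytes.length - 9 : 0;
        if (idBytes.length < 4 || offset < 0) {
            throw new IllegalArgumentException("id too short to contain a routing hash");
        }
        return ByteBuffer.wrap(idBytes).order(ByteOrder.LITTLE_ENDIAN).getInt(offset);
    }

    /** Builds a placeholder id: prefixLen zero bytes, the hash, then suffixLen zero bytes. */
    static String fakeId(int prefixLen, int hash, int suffixLen) {
        ByteBuffer buf = ByteBuffer.allocate(prefixLen + 4 + suffixLen).order(ByteOrder.LITTLE_ENDIAN);
        buf.position(prefixLen);
        buf.putInt(hash);
        return Base64.getUrlEncoder().withoutPadding().encodeToString(buf.array());
    }

    public static void main(String[] args) {
        // LogsDB-style id: 19 bytes, with the hash occupying bytes [10, 14).
        System.out.println(readHash(fakeId(10, 0xCAFEBABE, 5), true) == 0xCAFEBABE);
        // TSDB-style id: the hash is the first four bytes.
        System.out.println(readHash(fakeId(0, 42, 11), false) == 42);
    }
}
```

Reading relative to the end of the id rather than at a fixed index means the lookup depends only on how many bytes follow the hash, not on the length of whatever precedes it.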

server/src/main/java/org/elasticsearch/cluster/routing/RoutingFeatures.java

Lines changed: 5 additions & 0 deletions
@@ -20,4 +20,9 @@ public class RoutingFeatures implements FeatureSpecification {
2020
public Set<NodeFeature> getFeatures() {
2121
return Set.of(IndexRouting.BOOLEAN_ROUTING_PATH, IndexRouting.MULTI_VALUE_ROUTING_PATH);
2222
}
23+
24+
@Override
25+
public Set<NodeFeature> getTestFeatures() {
26+
return Set.of(IndexRouting.LOGSB_ROUTE_ON_SORT_FIELDS);
27+
}
2328
}

server/src/main/java/org/elasticsearch/common/TimeBasedKOrderedUUIDGenerator.java

Lines changed: 16 additions & 1 deletion
@@ -9,8 +9,11 @@
99

1010
package org.elasticsearch.common;
1111

12+
import org.elasticsearch.common.util.ByteUtils;
13+
1214
import java.nio.ByteBuffer;
1315
import java.util.Base64;
16+
import java.util.OptionalInt;
1417
import java.util.function.Supplier;
1518

1619
/**
@@ -29,6 +32,7 @@
2932
* The result is a compact base64-encoded string, optimized for efficient compression of the _id field in an inverted index.
3033
*/
3134
public class TimeBasedKOrderedUUIDGenerator extends TimeBasedUUIDGenerator {
35+
static final int SIZE_IN_BYTES = 15;
3236

3337
private static final Base64.Encoder BASE_64_NO_PADDING_URL_ENCODER = Base64.getUrlEncoder().withoutPadding();
3438

@@ -42,6 +46,10 @@ public TimeBasedKOrderedUUIDGenerator(
4246

4347
@Override
4448
public String getBase64UUID() {
49+
return getBase64UUID(OptionalInt.empty());
50+
}
51+
52+
public String getBase64UUID(OptionalInt hash) {
4553
final int sequenceId = sequenceNumber.incrementAndGet() & 0x00FF_FFFF;
4654

4755
// Calculate timestamp to ensure ordering and avoid backward movement in case of time shifts.
@@ -53,7 +61,7 @@ public String getBase64UUID() {
5361
sequenceId == 0 ? (lastTimestamp, currentTimeMillis) -> Math.max(lastTimestamp, currentTimeMillis) + 1 : Math::max
5462
);
5563

56-
final byte[] uuidBytes = new byte[15];
64+
final byte[] uuidBytes = new byte[SIZE_IN_BYTES + (hash.isPresent() ? 4 : 0)];
5765
final ByteBuffer buffer = ByteBuffer.wrap(uuidBytes);
5866

5967
buffer.put((byte) (timestamp >>> 40)); // changes every 35 years
@@ -67,6 +75,13 @@ public String getBase64UUID() {
6775
assert macAddress.length == 6;
6876
buffer.put(macAddress, 0, macAddress.length);
6977

78+
// Copy the hash value if provided
79+
if (hash.isPresent()) {
80+
byte[] hashBytes = new byte[4];
81+
ByteUtils.writeIntLE(hash.getAsInt(), hashBytes, 0);
82+
buffer.put(hashBytes, 0, hashBytes.length);
83+
}
84+
7085
buffer.put((byte) (sequenceId >>> 16));
7186

7287
// From hereinafter everything is almost like random and does not compress well
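
To illustrate the size change in this hunk, here is a hedged sketch of the byte layout with and without the optional hash. It is not the generator itself: the 10-byte head and 5-byte tail are inferred from `SIZE_IN_BYTES = 15` together with the `idBytes.length - 9` offset used in `IndexRouting`, the filler values are placeholders, and `KOrderedIdLayoutSketch`, `intToLittleEndian`, `idBytes`, and `encode` are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Base64;
import java.util.OptionalInt;

public class KOrderedIdLayoutSketch {
    static final int SIZE_IN_BYTES = 15; // value taken from the diff
    static final int HEAD_BYTES = 10;    // assumption: bytes written before the hash (19 - 9)

    /** Stand-in for ByteUtils.writeIntLE: least significant byte first. */
    static byte[] intToLittleEndian(int value) {
        return new byte[] { (byte) value, (byte) (value >>> 8), (byte) (value >>> 16), (byte) (value >>> 24) };
    }

    /** Assembles placeholder id bytes, inserting the hash after the head when present. */
    static byte[] idBytes(OptionalInt hash) {
        byte[] bytes = new byte[SIZE_IN_BYTES + (hash.isPresent() ? 4 : 0)];
        ByteBuffer buffer = ByteBuffer.wrap(bytes);
        byte[] head = new byte[HEAD_BYTES];
        Arrays.fill(head, (byte) 0x11); // placeholder for the leading id bytes
        buffer.put(head);
        if (hash.isPresent()) {
            buffer.put(intToLittleEndian(hash.getAsInt()));
        }
        byte[] tail = new byte[SIZE_IN_BYTES - HEAD_BYTES];
        Arrays.fill(tail, (byte) 0x22); // placeholder for the trailing id bytes
        buffer.put(tail);
        return bytes;
    }

    static String encode(byte[] bytes) {
        return Base64.getUrlEncoder().withoutPadding().encodeToString(bytes);
    }

    public static void main(String[] args) {
        System.out.println(encode(idBytes(OptionalInt.empty())).length()); // 15 bytes -> 20 characters
        System.out.println(encode(idBytes(OptionalInt.of(42))).length());  // 19 bytes -> 26 characters
    }
}
```

Under these assumptions, ids carrying a hash grow by four bytes, or six base64 characters, while ids generated without a hash keep their existing length.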

server/src/main/java/org/elasticsearch/common/UUIDs.java

Lines changed: 6 additions & 9 deletions
@@ -11,6 +11,7 @@
1111

1212
import org.elasticsearch.common.settings.SecureString;
1313

14+
import java.util.OptionalInt;
1415
import java.util.Random;
1516
import java.util.concurrent.atomic.AtomicInteger;
1617
import java.util.function.Supplier;
@@ -23,14 +24,14 @@ public class UUIDs {
2324
public static final Supplier<Long> DEFAULT_TIMESTAMP_SUPPLIER = System::currentTimeMillis;
2425
public static final Supplier<Integer> DEFAULT_SEQUENCE_ID_SUPPLIER = sequenceNumber::incrementAndGet;
2526
public static final Supplier<byte[]> DEFAULT_MAC_ADDRESS_SUPPLIER = MacAddressProvider::getSecureMungedAddress;
26-
private static final UUIDGenerator RANDOM_UUID_GENERATOR = new RandomBasedUUIDGenerator();
27-
private static final UUIDGenerator TIME_BASED_K_ORDERED_GENERATOR = new TimeBasedKOrderedUUIDGenerator(
27+
private static final RandomBasedUUIDGenerator RANDOM_UUID_GENERATOR = new RandomBasedUUIDGenerator();
28+
private static final TimeBasedKOrderedUUIDGenerator TIME_BASED_K_ORDERED_GENERATOR = new TimeBasedKOrderedUUIDGenerator(
2829
DEFAULT_TIMESTAMP_SUPPLIER,
2930
DEFAULT_SEQUENCE_ID_SUPPLIER,
3031
DEFAULT_MAC_ADDRESS_SUPPLIER
3132
);
3233

33-
private static final UUIDGenerator TIME_UUID_GENERATOR = new TimeBasedUUIDGenerator(
34+
private static final TimeBasedUUIDGenerator TIME_UUID_GENERATOR = new TimeBasedUUIDGenerator(
3435
DEFAULT_TIMESTAMP_SUPPLIER,
3536
DEFAULT_SEQUENCE_ID_SUPPLIER,
3637
DEFAULT_MAC_ADDRESS_SUPPLIER
@@ -51,12 +52,8 @@ public static String base64UUID() {
5152
return TIME_UUID_GENERATOR.getBase64UUID();
5253
}
5354

54-
public static String base64TimeBasedKOrderedUUID() {
55-
return TIME_BASED_K_ORDERED_GENERATOR.getBase64UUID();
56-
}
57-
58-
public static String base64TimeBasedUUID() {
59-
return TIME_UUID_GENERATOR.getBase64UUID();
55+
public static String base64TimeBasedKOrderedUUIDWithHash(OptionalInt hash) {
56+
return TIME_BASED_K_ORDERED_GENERATOR.getBase64UUID(hash);
6057
}
6158

6259
/**

server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java

Lines changed: 1 addition & 0 deletions
@@ -183,6 +183,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
183183
IndexSettings.LIFECYCLE_ORIGINATION_DATE_SETTING,
184184
IndexSettings.LIFECYCLE_PARSE_ORIGINATION_DATE_SETTING,
185185
IndexSettings.TIME_SERIES_ES87TSDB_CODEC_ENABLED_SETTING,
186+
IndexSettings.LOGSDB_ROUTE_ON_SORT_FIELDS,
186187
IndexSettings.PREFER_ILM_SETTING,
187188
DataStreamFailureStoreDefinition.FAILURE_STORE_DEFINITION_VERSION_SETTING,
188189
FieldMapper.SYNTHETIC_SOURCE_KEEP_INDEX_SETTING,
