|
33 | 33 | import org.elasticsearch.cluster.ack.ClusterStateUpdateRequest;
|
34 | 34 | import org.elasticsearch.cluster.service.ClusterService;
|
35 | 35 | import org.elasticsearch.common.Priority;
|
| 36 | +import org.elasticsearch.common.Strings; |
36 | 37 | import org.elasticsearch.common.settings.Settings;
|
37 | 38 | import org.elasticsearch.common.unit.TimeValue;
|
38 | 39 | import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
39 | 40 | import org.elasticsearch.common.xcontent.ObjectPath;
|
| 41 | +import org.elasticsearch.index.Index; |
40 | 42 | import org.elasticsearch.index.mapper.MapperService;
|
41 | 43 | import org.elasticsearch.index.mapper.MetadataFieldMapper;
|
42 | 44 | import org.elasticsearch.rest.RestStatus;
|
43 | 45 | import org.elasticsearch.threadpool.ThreadPool;
|
44 | 46 |
|
45 | 47 | import java.io.IOException;
|
46 |
| -import java.util.Collections; |
47 |
| -import java.util.HashMap; |
| 48 | +import java.util.List; |
48 | 49 | import java.util.Locale;
|
49 | 50 | import java.util.Map;
|
| 51 | +import java.util.Objects; |
50 | 52 | import java.util.concurrent.atomic.AtomicReference;
|
| 53 | +import java.util.stream.Collectors; |
51 | 54 |
|
52 | 55 | public class MetadataCreateDataStreamService {
|
53 | 56 |
|
@@ -116,54 +119,81 @@ public CreateDataStreamClusterStateUpdateRequest(String name,
|
116 | 119 | static ClusterState createDataStream(MetadataCreateIndexService metadataCreateIndexService,
|
117 | 120 | ClusterState currentState,
|
118 | 121 | CreateDataStreamClusterStateUpdateRequest request) throws Exception {
|
| 122 | + return createDataStream(metadataCreateIndexService, currentState, request.name, org.elasticsearch.common.collect.List.of(), null); |
| 123 | + } |
| 124 | + |
| 125 | + /** |
| 126 | + * Creates a data stream with the specified properties. |
| 127 | + * |
| 128 | + * @param metadataCreateIndexService Used if a new write index must be created |
| 129 | + * @param currentState Cluster state |
| 130 | + * @param dataStreamName Name of the data stream |
| 131 | + * @param backingIndices List of backing indices. May be empty |
| 132 | + * @param writeIndex Write index for the data stream. If null, a new write index will be created. |
| 133 | + * @return Cluster state containing the new data stream |
| 134 | + */ |
| 135 | + static ClusterState createDataStream(MetadataCreateIndexService metadataCreateIndexService, |
| 136 | + ClusterState currentState, |
| 137 | + String dataStreamName, |
| 138 | + List<IndexMetadata> backingIndices, |
| 139 | + IndexMetadata writeIndex) throws Exception |
| 140 | + { |
119 | 141 | if (currentState.nodes().getMinNodeVersion().before(Version.V_7_9_0)) {
|
120 | 142 | throw new IllegalStateException("data streams require minimum node version of " + Version.V_7_9_0);
|
121 | 143 | }
|
122 |
| - |
123 |
| - if (currentState.metadata().dataStreams().containsKey(request.name)) { |
124 |
| - throw new ResourceAlreadyExistsException("data_stream [" + request.name + "] already exists"); |
| 144 | + if (writeIndex == null) { |
| 145 | + Objects.requireNonNull(metadataCreateIndexService); |
| 146 | + } |
| 147 | + Objects.requireNonNull(currentState); |
| 148 | + Objects.requireNonNull(backingIndices); |
| 149 | + if (currentState.metadata().dataStreams().containsKey(dataStreamName)) { |
| 150 | + throw new ResourceAlreadyExistsException("data_stream [" + dataStreamName + "] already exists"); |
125 | 151 | }
|
126 | 152 |
|
127 |
| - MetadataCreateIndexService.validateIndexOrAliasName(request.name, |
| 153 | + MetadataCreateIndexService.validateIndexOrAliasName(dataStreamName, |
128 | 154 | (s1, s2) -> new IllegalArgumentException("data_stream [" + s1 + "] " + s2));
|
129 | 155 |
|
130 |
| - if (request.name.toLowerCase(Locale.ROOT).equals(request.name) == false) { |
131 |
| - throw new IllegalArgumentException("data_stream [" + request.name + "] must be lowercase"); |
| 156 | + if (dataStreamName.toLowerCase(Locale.ROOT).equals(dataStreamName) == false) { |
| 157 | + throw new IllegalArgumentException("data_stream [" + dataStreamName + "] must be lowercase"); |
132 | 158 | }
|
133 |
| - if (request.name.startsWith(DataStream.BACKING_INDEX_PREFIX)) { |
134 |
| - throw new IllegalArgumentException("data_stream [" + request.name + "] must not start with '" |
| 159 | + if (dataStreamName.startsWith(DataStream.BACKING_INDEX_PREFIX)) { |
| 160 | + throw new IllegalArgumentException("data_stream [" + dataStreamName + "] must not start with '" |
135 | 161 | + DataStream.BACKING_INDEX_PREFIX + "'");
|
136 | 162 | }
|
137 | 163 |
|
138 |
| - ComposableIndexTemplate template = lookupTemplateForDataStream(request.name, currentState.metadata()); |
139 |
| - |
140 |
| - String firstBackingIndexName = DataStream.getDefaultBackingIndexName(request.name, 1); |
141 |
| - CreateIndexClusterStateUpdateRequest createIndexRequest = |
142 |
| - new CreateIndexClusterStateUpdateRequest("initialize_data_stream", firstBackingIndexName, firstBackingIndexName) |
143 |
| - .dataStreamName(request.name) |
144 |
| - .settings(Settings.builder().put("index.hidden", true).build()); |
145 |
| - try { |
146 |
| - currentState = metadataCreateIndexService.applyCreateIndexRequest(currentState, createIndexRequest, false); |
147 |
| - } catch (ResourceAlreadyExistsException e) { |
148 |
| - // Rethrow as ElasticsearchStatusException, so that bulk transport action doesn't ignore it during |
149 |
| - // auto index/data stream creation. |
150 |
| - // (otherwise bulk execution fails later, because data stream will also not have been created) |
151 |
| - throw new ElasticsearchStatusException("data stream could not be created because backing index [{}] already exists", |
152 |
| - RestStatus.BAD_REQUEST, e, firstBackingIndexName); |
| 164 | + ComposableIndexTemplate template = lookupTemplateForDataStream(dataStreamName, currentState.metadata()); |
| 165 | + |
| 166 | + if (writeIndex == null) { |
| 167 | + String firstBackingIndexName = DataStream.getDefaultBackingIndexName(dataStreamName, 1); |
| 168 | + CreateIndexClusterStateUpdateRequest createIndexRequest = |
| 169 | + new CreateIndexClusterStateUpdateRequest("initialize_data_stream", firstBackingIndexName, firstBackingIndexName) |
| 170 | + .dataStreamName(dataStreamName) |
| 171 | + .settings(Settings.builder().put("index.hidden", true).build()); |
| 172 | + try { |
| 173 | + currentState = metadataCreateIndexService.applyCreateIndexRequest(currentState, createIndexRequest, false); |
| 174 | + } catch (ResourceAlreadyExistsException e) { |
| 175 | + // Rethrow as ElasticsearchStatusException, so that bulk transport action doesn't ignore it during |
| 176 | + // auto index/data stream creation. |
| 177 | + // (otherwise bulk execution fails later, because data stream will also not have been created) |
| 178 | + throw new ElasticsearchStatusException("data stream could not be created because backing index [{}] already exists", |
| 179 | + RestStatus.BAD_REQUEST, e, firstBackingIndexName); |
| 180 | + } |
| 181 | + writeIndex = currentState.metadata().index(firstBackingIndexName); |
153 | 182 | }
|
154 |
| - IndexMetadata firstBackingIndex = currentState.metadata().index(firstBackingIndexName); |
155 |
| - assert firstBackingIndex != null; |
156 |
| - assert firstBackingIndex.mapping() != null : "no mapping found for backing index [" + firstBackingIndexName + "]"; |
| 183 | + assert writeIndex != null; |
| 184 | + assert writeIndex.mapping() != null : "no mapping found for backing index [" + writeIndex.getIndex().getName() + "]"; |
157 | 185 |
|
158 | 186 | String fieldName = template.getDataStreamTemplate().getTimestampField();
|
159 | 187 | DataStream.TimestampField timestampField = new DataStream.TimestampField(fieldName);
|
| 188 | + List<Index> dsBackingIndices = backingIndices.stream().map(IndexMetadata::getIndex).collect(Collectors.toList()); |
| 189 | + dsBackingIndices.add(writeIndex.getIndex()); |
160 | 190 | boolean hidden = template.getDataStreamTemplate().isHidden();
|
161 |
| - DataStream newDataStream = |
162 |
| - new DataStream(request.name, timestampField, |
163 |
| - Collections.singletonList(firstBackingIndex.getIndex()), 1L, |
164 |
| - template.metadata() != null ? Collections.unmodifiableMap(new HashMap<>(template.metadata())) : null, hidden, false); |
| 191 | + DataStream newDataStream = new DataStream(dataStreamName, timestampField, dsBackingIndices, 1L, |
| 192 | + template.metadata() != null ? org.elasticsearch.common.collect.Map.copyOf(template.metadata()) : null, hidden, false); |
165 | 193 | Metadata.Builder builder = Metadata.builder(currentState.metadata()).put(newDataStream);
|
166 |
| - logger.info("adding data stream [{}]", request.name); |
| 194 | + logger.info("adding data stream [{}] with write index [{}] and backing indices [{}]", dataStreamName, |
| 195 | + writeIndex.getIndex().getName(), |
| 196 | + Strings.arrayToCommaDelimitedString(backingIndices.stream().map(i -> i.getIndex().getName()).toArray())); |
167 | 197 | return ClusterState.builder(currentState).metadata(builder).build();
|
168 | 198 | }
|
169 | 199 |
|
|
0 commit comments