From d055f1c627579a5a5e2efac7864169fbf2a7dea7 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Fri, 20 Mar 2020 11:22:18 +0100 Subject: [PATCH 1/3] Initial data stream commit (#53666) This commits adds a data stream feature flag, initial definition of a data stream and the stubs for the data stream create, delete and get APIs. Also simple serialization tests are added and a rest test to thest the data stream API stubs. This is a large amount of code and mainly mechanical, but this commit should be straightforward to review, because there isn't any real logic. The data stream transport and rest action are behind the data stream feature flag and are only intialized if the feature flag is enabled. The feature flag is enabled if elasticsearch is build as snapshot or a release build and the 'es.datastreams_feature_flag_registered' is enabled. The integ-test-zip sets the feature flag if building a release build, otherwise rest tests would fail. Relates to #53100 --- build.gradle | 1 + .../client/RestHighLevelClientTests.java | 5 +- .../api/indices.create_data_stream.json | 31 ++++ .../api/indices.delete_data_stream.json | 26 +++ .../api/indices.get_data_streams.json | 33 ++++ .../test/indices.data_stream/10_basic.yml | 26 +++ .../elasticsearch/action/ActionModule.java | 38 ++++ .../datastream/CreateDataStreamAction.java | 128 +++++++++++++ .../datastream/DeleteDataStreamAction.java | 120 +++++++++++++ .../datastream/GetDataStreamsAction.java | 170 ++++++++++++++++++ .../client/IndicesAdminClient.java | 32 ++++ .../client/support/AbstractClient.java | 33 ++++ .../cluster/metadata/DataStream.java | 116 ++++++++++++ .../indices/RestCreateDataStreamAction.java | 58 ++++++ .../indices/RestDeleteDataStreamAction.java | 47 +++++ .../indices/RestGetDataStreamsAction.java | 52 ++++++ .../CreateDataStreamRequestTests.java | 38 ++++ .../DeleteDataStreamRequestTests.java | 36 ++++ .../GetDataStreamsRequestTests.java | 36 ++++ .../GetDataStreamsResponseTests.java | 60 +++++++ .../cluster/metadata/DataStreamTests.java | 50 ++++++ .../privilege/ClusterPrivilegeResolver.java | 7 +- 22 files changed, 1140 insertions(+), 3 deletions(-) create mode 100644 rest-api-spec/src/main/resources/rest-api-spec/api/indices.create_data_stream.json create mode 100644 rest-api-spec/src/main/resources/rest-api-spec/api/indices.delete_data_stream.json create mode 100644 rest-api-spec/src/main/resources/rest-api-spec/api/indices.get_data_streams.json create mode 100644 rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml create mode 100644 server/src/main/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamAction.java create mode 100644 server/src/main/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamAction.java create mode 100644 server/src/main/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsAction.java create mode 100644 server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java create mode 100644 server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestCreateDataStreamAction.java create mode 100644 server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestDeleteDataStreamAction.java create mode 100644 server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestGetDataStreamsAction.java create mode 100644 server/src/test/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamRequestTests.java create mode 100644 server/src/test/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamRequestTests.java create mode 100644 server/src/test/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsRequestTests.java create mode 100644 server/src/test/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsResponseTests.java create mode 100644 server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java diff --git a/build.gradle b/build.gradle index 53749c2a21c46..edc37d47b9f74 100644 --- a/build.gradle +++ b/build.gradle @@ -554,6 +554,7 @@ subprojects { testClusters.all { if (org.elasticsearch.gradle.info.BuildParams.isSnapshotBuild() == false) { systemProperty 'es.itv2_feature_flag_registered', 'true' + systemProperty 'es.datastreams_feature_flag_registered', 'true' } } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java index 2dd4a9d02128e..440b979d3509a 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java @@ -798,7 +798,10 @@ public void testApiNamingConventions() throws Exception { "scripts_painless_execute", "cluster.put_component_template", "cluster.get_component_template", - "cluster.delete_component_template" + "cluster.delete_component_template", + "indices.create_data_stream", + "indices.get_data_streams", + "indices.delete_data_stream" }; //These API are not required for high-level client feature completeness String[] notRequiredApi = new String[] { diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.create_data_stream.json b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.create_data_stream.json new file mode 100644 index 0000000000000..ef8615a69b1ca --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.create_data_stream.json @@ -0,0 +1,31 @@ +{ + "indices.create_data_stream":{ + "documentation":{ + "url":"https://www.elastic.co/guide/en/elasticsearch/reference/master/data-streams.html", + "description":"Creates or updates a data stream" + }, + "stability":"experimental", + "url":{ + "paths":[ + { + "path":"/_data_stream/{name}", + "methods":[ + "PUT" + ], + "parts":{ + "name":{ + "type":"string", + "description":"The name of the data stream" + } + } + } + ] + }, + "params":{ + }, + "body":{ + "description":"The data stream definition", + "required":true + } + } +} diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.delete_data_stream.json b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.delete_data_stream.json new file mode 100644 index 0000000000000..71ed5808caefc --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.delete_data_stream.json @@ -0,0 +1,26 @@ +{ + "indices.delete_data_stream":{ + "documentation":{ + "url":"https://www.elastic.co/guide/en/elasticsearch/reference/master/data-streams.html", + "description":"Deletes a data stream." + }, + "stability":"experimental", + "url":{ + "paths":[ + { + "path":"/_data_stream/{name}", + "methods":[ + "DELETE" + ], + "parts":{ + "name":{ + "type":"string", + "description":"The name of the data stream" + } + } + } + ] + }, + "params":{} + } +} diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.get_data_streams.json b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.get_data_streams.json new file mode 100644 index 0000000000000..42415068d4a5d --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.get_data_streams.json @@ -0,0 +1,33 @@ +{ + "indices.get_data_streams":{ + "documentation":{ + "url":"https://www.elastic.co/guide/en/elasticsearch/reference/master/data-streams.html", + "description":"Returns data streams." + }, + "stability":"experimental", + "url":{ + "paths":[ + { + "path":"/_data_streams", + "methods":[ + "GET" + ] + }, + { + "path":"/_data_streams/{name}", + "methods":[ + "GET" + ], + "parts":{ + "name":{ + "type":"list", + "description":"The comma separated names of data streams" + } + } + } + ] + }, + "params":{ + } + } +} diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml new file mode 100644 index 0000000000000..49754005b6db5 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml @@ -0,0 +1,26 @@ +--- +"Test stubs": + - skip: + version: " - 7.99.99" + reason: not backported yet + + - do: + indices.create_data_stream: + name: data-stream2 + body: + timestamp_field: "@timestamp" + - is_true: acknowledged + + - do: + indices.get_data_streams: {} + - match: { 0.name: my_data_stream1 } + - match: { 0.timestamp_field: '@timestamp' } + - match: { 0.indices: ['my_data_stream1-000000'] } + - match: { 1.name: my_data_stream2 } + - match: { 1.timestamp_field: '@timestamp' } + - match: { 1.indices: [] } + + - do: + indices.delete_data_stream: + name: data-stream2 + - is_true: acknowledged diff --git a/server/src/main/java/org/elasticsearch/action/ActionModule.java b/server/src/main/java/org/elasticsearch/action/ActionModule.java index 430271a76c9eb..d3ba8b6eb4611 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/server/src/main/java/org/elasticsearch/action/ActionModule.java @@ -28,6 +28,9 @@ import org.elasticsearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsAction; import org.elasticsearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction; import org.elasticsearch.action.admin.cluster.configuration.TransportClearVotingConfigExclusionsAction; +import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamAction; +import org.elasticsearch.action.admin.indices.datastream.GetDataStreamsAction; +import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction; import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction; import org.elasticsearch.action.admin.cluster.health.TransportClusterHealthAction; import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsAction; @@ -251,9 +254,11 @@ import org.elasticsearch.rest.action.admin.cluster.RestClusterStatsAction; import org.elasticsearch.rest.action.admin.cluster.RestClusterUpdateSettingsAction; import org.elasticsearch.rest.action.admin.cluster.RestCreateSnapshotAction; +import org.elasticsearch.rest.action.admin.indices.RestDeleteDataStreamAction; import org.elasticsearch.rest.action.admin.cluster.RestDeleteRepositoryAction; import org.elasticsearch.rest.action.admin.cluster.RestDeleteSnapshotAction; import org.elasticsearch.rest.action.admin.cluster.RestDeleteStoredScriptAction; +import org.elasticsearch.rest.action.admin.indices.RestGetDataStreamsAction; import org.elasticsearch.rest.action.admin.cluster.RestGetRepositoriesAction; import org.elasticsearch.rest.action.admin.cluster.RestGetScriptContextAction; import org.elasticsearch.rest.action.admin.cluster.RestGetScriptLanguageAction; @@ -266,6 +271,7 @@ import org.elasticsearch.rest.action.admin.cluster.RestNodesStatsAction; import org.elasticsearch.rest.action.admin.cluster.RestNodesUsageAction; import org.elasticsearch.rest.action.admin.cluster.RestPendingClusterTasksAction; +import org.elasticsearch.rest.action.admin.indices.RestCreateDataStreamAction; import org.elasticsearch.rest.action.admin.cluster.RestPutRepositoryAction; import org.elasticsearch.rest.action.admin.cluster.RestPutStoredScriptAction; import org.elasticsearch.rest.action.admin.cluster.RestReloadSecureSettingsAction; @@ -388,6 +394,24 @@ public class ActionModule extends AbstractModule { } } + private static final boolean DATASTREAMS_FEATURE_FLAG_REGISTERED; + + static { + final String property = System.getProperty("es.datastreams_feature_flag_registered"); + if (Build.CURRENT.isSnapshot() && property != null) { + throw new IllegalArgumentException("es.datastreams_feature_flag_registered is only supported in non-snapshot builds"); + } + if (Build.CURRENT.isSnapshot() || "true".equals(property)) { + DATASTREAMS_FEATURE_FLAG_REGISTERED = true; + } else if ("false".equals(property) || property == null) { + DATASTREAMS_FEATURE_FLAG_REGISTERED = false; + } else { + throw new IllegalArgumentException( + "expected es.datastreams_feature_flag_registered to be unset or [true|false] but was [" + property + "]" + ); + } + } + private final Settings settings; private final IndexNameExpressionResolver indexNameExpressionResolver; private final IndexScopedSettings indexScopedSettings; @@ -576,6 +600,13 @@ public void reg actionPlugins.stream().flatMap(p -> p.getActions().stream()).forEach(actions::register); + // Data streams: + if (DATASTREAMS_FEATURE_FLAG_REGISTERED) { + actions.register(CreateDataStreamAction.INSTANCE, CreateDataStreamAction.TransportAction.class); + actions.register(DeleteDataStreamAction.INSTANCE, DeleteDataStreamAction.TransportAction.class); + actions.register(GetDataStreamsAction.INSTANCE, GetDataStreamsAction.TransportAction.class); + } + // Persistent tasks: actions.register(StartPersistentTaskAction.INSTANCE, StartPersistentTaskAction.TransportAction.class); actions.register(UpdatePersistentTaskStatusAction.INSTANCE, UpdatePersistentTaskStatusAction.TransportAction.class); @@ -718,6 +749,13 @@ public void initRestHandlers(Supplier nodesInCluster) { registerHandler.accept(new RestDeletePipelineAction()); registerHandler.accept(new RestSimulatePipelineAction()); + // Data Stream API + if (DATASTREAMS_FEATURE_FLAG_REGISTERED) { + registerHandler.accept(new RestCreateDataStreamAction()); + registerHandler.accept(new RestDeleteDataStreamAction()); + registerHandler.accept(new RestGetDataStreamsAction()); + } + // CAT API registerHandler.accept(new RestAllocationAction()); registerHandler.accept(new RestShardsAction()); diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamAction.java new file mode 100644 index 0000000000000..df6e829a28af4 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamAction.java @@ -0,0 +1,128 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.indices.datastream; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.support.master.MasterNodeRequest; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; +import java.util.Objects; + +public class CreateDataStreamAction extends ActionType { + + public static final CreateDataStreamAction INSTANCE = new CreateDataStreamAction(); + public static final String NAME = "indices:admin/data_stream/create"; + + private CreateDataStreamAction() { + super(NAME, AcknowledgedResponse::new); + } + + public static class Request extends MasterNodeRequest { + + private final String name; + private String timestampFieldName; + + public Request(String name) { + this.name = name; + } + + public void setTimestampFieldName(String timestampFieldName) { + this.timestampFieldName = timestampFieldName; + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + public Request(StreamInput in) throws IOException { + super(in); + this.name = in.readString(); + this.timestampFieldName = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(name); + out.writeString(timestampFieldName); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Request request = (Request) o; + return name.equals(request.name) && + timestampFieldName.equals(request.timestampFieldName); + } + + @Override + public int hashCode() { + return Objects.hash(name, timestampFieldName); + } + } + + public static class TransportAction extends TransportMasterNodeAction { + + @Inject + public TransportAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool, + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { + super(NAME, transportService, clusterService, threadPool, actionFilters, Request::new, indexNameExpressionResolver); + } + + @Override + protected String executor() { + return ThreadPool.Names.SAME; + } + + @Override + protected AcknowledgedResponse read(StreamInput in) throws IOException { + return new AcknowledgedResponse(in); + } + + @Override + protected void masterOperation(Task task, Request request, ClusterState state, + ActionListener listener) throws Exception { + listener.onResponse(new AcknowledgedResponse(true)); + } + + @Override + protected ClusterBlockException checkBlock(Request request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } + } + +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamAction.java new file mode 100644 index 0000000000000..20a2ba4aa2cd6 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamAction.java @@ -0,0 +1,120 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.indices.datastream; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.support.master.MasterNodeRequest; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; +import java.util.Objects; + +public class DeleteDataStreamAction extends ActionType { + + public static final DeleteDataStreamAction INSTANCE = new DeleteDataStreamAction(); + public static final String NAME = "indices:admin/data_stream/delete"; + + private DeleteDataStreamAction() { + super(NAME, AcknowledgedResponse::new); + } + + public static class Request extends MasterNodeRequest { + + private final String name; + + public Request(String name) { + this.name = Objects.requireNonNull(name); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + public Request(StreamInput in) throws IOException { + super(in); + this.name = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(name); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Request request = (Request) o; + return name.equals(request.name); + } + + @Override + public int hashCode() { + return Objects.hash(name); + } + } + + public static class TransportAction extends TransportMasterNodeAction { + + @Inject + public TransportAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool, + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { + super(NAME, transportService, clusterService, threadPool, actionFilters, Request::new, indexNameExpressionResolver); + } + + @Override + protected String executor() { + return ThreadPool.Names.SAME; + } + + @Override + protected AcknowledgedResponse read(StreamInput in) throws IOException { + return new AcknowledgedResponse(in); + } + + @Override + protected void masterOperation(Task task, Request request, ClusterState state, + ActionListener listener) throws Exception { + listener.onResponse(new AcknowledgedResponse(true)); + } + + @Override + protected ClusterBlockException checkBlock(Request request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } + } + +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsAction.java new file mode 100644 index 0000000000000..1549f056e811f --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsAction.java @@ -0,0 +1,170 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.indices.datastream; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.MasterNodeReadRequest; +import org.elasticsearch.action.support.master.TransportMasterNodeReadAction; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; + +public class GetDataStreamsAction extends ActionType { + + public static final GetDataStreamsAction INSTANCE = new GetDataStreamsAction(); + public static final String NAME = "indices:admin/data_stream/get"; + + private GetDataStreamsAction() { + super(NAME, Response::new); + } + + public static class Request extends MasterNodeReadRequest { + + private final String[] names; + + public Request(String[] names) { + this.names = Objects.requireNonNull(names); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + public Request(StreamInput in) throws IOException { + super(in); + this.names = in.readStringArray(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeStringArray(names); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Request request = (Request) o; + return Arrays.equals(names, request.names); + } + + @Override + public int hashCode() { + return Arrays.hashCode(names); + } + } + + public static class Response extends ActionResponse implements ToXContentObject { + + private final List dataStreams; + + public Response(List dataStreams) { + this.dataStreams = dataStreams; + } + + public Response(StreamInput in) throws IOException { + this(in.readList(DataStream::new)); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeList(dataStreams); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startArray(); + for (DataStream dataStream : dataStreams) { + dataStream.toXContent(builder, params); + } + builder.endArray(); + return builder; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Response response = (Response) o; + return dataStreams.equals(response.dataStreams); + } + + @Override + public int hashCode() { + return Objects.hash(dataStreams); + } + } + + public static class TransportAction extends TransportMasterNodeReadAction { + + @Inject + public TransportAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool, + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { + super(NAME, transportService, clusterService, threadPool, actionFilters, Request::new, indexNameExpressionResolver); + } + + @Override + protected String executor() { + return ThreadPool.Names.SAME; + } + + @Override + protected Response read(StreamInput in) throws IOException { + return new Response(in); + } + + @Override + protected void masterOperation(Task task, Request request, ClusterState state, + ActionListener listener) throws Exception { + List dataStreams = List.of( + new DataStream("my_data_stream1", "@timestamp", List.of("my_data_stream1-000000")), + new DataStream("my_data_stream2", "@timestamp", List.of()) + ); + listener.onResponse(new Response(dataStreams)); + } + + @Override + protected ClusterBlockException checkBlock(Request request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } + } + +} diff --git a/server/src/main/java/org/elasticsearch/client/IndicesAdminClient.java b/server/src/main/java/org/elasticsearch/client/IndicesAdminClient.java index 3324d0abcfa3f..c234cf29deeeb 100644 --- a/server/src/main/java/org/elasticsearch/client/IndicesAdminClient.java +++ b/server/src/main/java/org/elasticsearch/client/IndicesAdminClient.java @@ -21,6 +21,9 @@ import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamAction; +import org.elasticsearch.action.admin.indices.datastream.GetDataStreamsAction; +import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder; import org.elasticsearch.action.admin.indices.alias.exists.AliasesExistRequestBuilder; @@ -819,4 +822,33 @@ public interface IndicesAdminClient extends ElasticsearchClient { */ void rolloverIndex(RolloverRequest request, ActionListener listener); + /** + * Store a data stream + */ + void createDataStream(CreateDataStreamAction.Request request, ActionListener listener); + + /** + * Store a data stream + */ + ActionFuture createDataStream(CreateDataStreamAction.Request request); + + /** + * Delete a data stream + */ + void deleteDataStream(DeleteDataStreamAction.Request request, ActionListener listener); + + /** + * Delete a data stream + */ + ActionFuture deleteDataStream(DeleteDataStreamAction.Request request); + + /** + * Get data streams + */ + void getDataStreams(GetDataStreamsAction.Request request, ActionListener listener); + + /** + * Get data streams + */ + ActionFuture getDataStreams(GetDataStreamsAction.Request request); } diff --git a/server/src/main/java/org/elasticsearch/client/support/AbstractClient.java b/server/src/main/java/org/elasticsearch/client/support/AbstractClient.java index 82e3ace2ee706..e53895a55c26e 100644 --- a/server/src/main/java/org/elasticsearch/client/support/AbstractClient.java +++ b/server/src/main/java/org/elasticsearch/client/support/AbstractClient.java @@ -30,6 +30,9 @@ import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainRequest; import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainRequestBuilder; import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainResponse; +import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamAction; +import org.elasticsearch.action.admin.indices.datastream.GetDataStreamsAction; +import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction; import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequestBuilder; @@ -1742,6 +1745,36 @@ public ActionFuture getSettings(GetSettingsRequest request) public void getSettings(GetSettingsRequest request, ActionListener listener) { execute(GetSettingsAction.INSTANCE, request, listener); } + + @Override + public void createDataStream(CreateDataStreamAction.Request request, ActionListener listener) { + execute(CreateDataStreamAction.INSTANCE, request, listener); + } + + @Override + public ActionFuture createDataStream(CreateDataStreamAction.Request request) { + return execute(CreateDataStreamAction.INSTANCE, request); + } + + @Override + public void deleteDataStream(DeleteDataStreamAction.Request request, ActionListener listener) { + execute(DeleteDataStreamAction.INSTANCE, request, listener); + } + + @Override + public ActionFuture deleteDataStream(DeleteDataStreamAction.Request request) { + return execute(DeleteDataStreamAction.INSTANCE, request); + } + + @Override + public void getDataStreams(GetDataStreamsAction.Request request, ActionListener listener) { + execute(GetDataStreamsAction.INSTANCE, request, listener); + } + + @Override + public ActionFuture getDataStreams(GetDataStreamsAction.Request request) { + return execute(GetDataStreamsAction.INSTANCE, request); + } } @Override diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java new file mode 100644 index 0000000000000..f6bba191a173b --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java @@ -0,0 +1,116 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster.metadata; + +import org.elasticsearch.cluster.AbstractDiffable; +import org.elasticsearch.cluster.Diff; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; + +import java.io.IOException; +import java.util.List; +import java.util.Objects; + +public final class DataStream extends AbstractDiffable implements ToXContentObject { + + private final String name; + private final String timeStampField; + private final List indices; + + public DataStream(String name, String timeStampField, List indices) { + this.name = name; + this.timeStampField = timeStampField; + this.indices = indices; + } + + public String getName() { + return name; + } + + public String getTimeStampField() { + return timeStampField; + } + + public List getIndices() { + return indices; + } + + public DataStream(StreamInput in) throws IOException { + this(in.readString(), in.readString(), in.readStringList()); + } + + public static Diff readDiffFrom(StreamInput in) throws IOException { + return readDiffFrom(DataStream::new, in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(name); + out.writeString(timeStampField); + out.writeStringCollection(indices); + } + + public static final ParseField NAME_FIELD = new ParseField("name"); + public static final ParseField TIMESTAMP_FIELD_FIELD = new ParseField("timestamp_field"); + public static final ParseField INDICES_FIELD = new ParseField("indices"); + + @SuppressWarnings("unchecked") + private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>("data_stream", + args -> new DataStream((String) args[0], (String) args[1], (List) args[2])); + + static { + PARSER.declareString(ConstructingObjectParser.constructorArg(), NAME_FIELD); + PARSER.declareString(ConstructingObjectParser.constructorArg(), TIMESTAMP_FIELD_FIELD); + PARSER.declareStringArray(ConstructingObjectParser.constructorArg(), INDICES_FIELD); + } + + public static DataStream fromXContent(XContentParser parser) throws IOException { + return PARSER.parse(parser, null); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(NAME_FIELD.getPreferredName(), name); + builder.field(TIMESTAMP_FIELD_FIELD.getPreferredName(), timeStampField); + builder.field(INDICES_FIELD.getPreferredName(), indices); + builder.endObject(); + return builder; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + DataStream that = (DataStream) o; + return name.equals(that.name) && + timeStampField.equals(that.timeStampField) && + indices.equals(that.indices); + } + + @Override + public int hashCode() { + return Objects.hash(name, timeStampField, indices); + } +} diff --git a/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestCreateDataStreamAction.java b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestCreateDataStreamAction.java new file mode 100644 index 0000000000000..67b67677195dc --- /dev/null +++ b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestCreateDataStreamAction.java @@ -0,0 +1,58 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.rest.action.admin.indices; + +import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction; +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestToXContentListener; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +public class RestCreateDataStreamAction extends BaseRestHandler { + + @Override + public String getName() { + return "create_data_stream_action"; + } + + @Override + public List routes() { + return List.of( + new Route(RestRequest.Method.PUT, "/_data_stream/{name}") + ); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + CreateDataStreamAction.Request putDataStreamRequest = new CreateDataStreamAction.Request(request.param("name")); + request.withContentOrSourceParamParserOrNull(parser -> { + Map body = parser.map(); + String timeStampFieldName = (String) body.get(DataStream.TIMESTAMP_FIELD_FIELD.getPreferredName()); + if (timeStampFieldName != null) { + putDataStreamRequest.setTimestampFieldName(timeStampFieldName); + } + }); + return channel -> client.admin().indices().createDataStream(putDataStreamRequest, new RestToXContentListener<>(channel)); + } +} diff --git a/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestDeleteDataStreamAction.java b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestDeleteDataStreamAction.java new file mode 100644 index 0000000000000..b69ee981d12b3 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestDeleteDataStreamAction.java @@ -0,0 +1,47 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.rest.action.admin.indices; + +import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamAction; +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestToXContentListener; + +import java.io.IOException; +import java.util.List; + +public class RestDeleteDataStreamAction extends BaseRestHandler { + + @Override + public String getName() { + return "delete_data_stream_action"; + } + + @Override + public List routes() { + return List.of(new Route(RestRequest.Method.DELETE, "/_data_stream/{name}")); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + DeleteDataStreamAction.Request deleteDataStreamRequest = new DeleteDataStreamAction.Request(request.param("name")); + return channel -> client.admin().indices().deleteDataStream(deleteDataStreamRequest, new RestToXContentListener<>(channel)); + } +} diff --git a/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestGetDataStreamsAction.java b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestGetDataStreamsAction.java new file mode 100644 index 0000000000000..fdbe63829882e --- /dev/null +++ b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestGetDataStreamsAction.java @@ -0,0 +1,52 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.rest.action.admin.indices; + +import org.elasticsearch.action.admin.indices.datastream.GetDataStreamsAction; +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.common.Strings; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestToXContentListener; + +import java.io.IOException; +import java.util.List; + +public class RestGetDataStreamsAction extends BaseRestHandler { + + @Override + public String getName() { + return "get_data_streams_action"; + } + + @Override + public List routes() { + return List.of( + new Route(RestRequest.Method.GET, "/_data_streams"), + new Route(RestRequest.Method.GET, "/_data_streams/{name}") + ); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + String[] names = Strings.splitStringByCommaToArray(request.param("name")); + GetDataStreamsAction.Request getDataStreamsRequest = new GetDataStreamsAction.Request(names); + return channel -> client.admin().indices().getDataStreams(getDataStreamsRequest, new RestToXContentListener<>(channel)); + } +} diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamRequestTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamRequestTests.java new file mode 100644 index 0000000000000..d6a846c205fb3 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamRequestTests.java @@ -0,0 +1,38 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.indices.datastream; + +import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction.Request; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.test.AbstractWireSerializingTestCase; + +public class CreateDataStreamRequestTests extends AbstractWireSerializingTestCase { + + @Override + protected Writeable.Reader instanceReader() { + return Request::new; + } + + @Override + protected Request createTestInstance() { + Request request = new Request(randomAlphaOfLength(8)); + request.setTimestampFieldName(randomAlphaOfLength(8)); + return request; + } +} diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamRequestTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamRequestTests.java new file mode 100644 index 0000000000000..f460065699795 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamRequestTests.java @@ -0,0 +1,36 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.indices.datastream; + +import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamAction.Request; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.test.AbstractWireSerializingTestCase; + +public class DeleteDataStreamRequestTests extends AbstractWireSerializingTestCase { + + @Override + protected Writeable.Reader instanceReader() { + return Request::new; + } + + @Override + protected Request createTestInstance() { + return new Request(randomAlphaOfLength(8)); + } +} diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsRequestTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsRequestTests.java new file mode 100644 index 0000000000000..062bdef629cc9 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsRequestTests.java @@ -0,0 +1,36 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.indices.datastream; + +import org.elasticsearch.action.admin.indices.datastream.GetDataStreamsAction.Request; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.test.AbstractWireSerializingTestCase; + +public class GetDataStreamsRequestTests extends AbstractWireSerializingTestCase { + + @Override + protected Writeable.Reader instanceReader() { + return Request::new; + } + + @Override + protected Request createTestInstance() { + return new Request(generateRandomStringArray(8, 8, false)); + } +} diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsResponseTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsResponseTests.java new file mode 100644 index 0000000000000..5c74c515634cc --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsResponseTests.java @@ -0,0 +1,60 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.indices.datastream; + +import org.elasticsearch.action.admin.indices.datastream.GetDataStreamsAction.Response; +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentParser.Token; +import org.elasticsearch.test.AbstractSerializingTestCase; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +public class GetDataStreamsResponseTests extends AbstractSerializingTestCase { + + @Override + protected Writeable.Reader instanceReader() { + return Response::new; + } + + @Override + protected Response doParseInstance(XContentParser parser) throws IOException { + List dataStreams = new ArrayList<>(); + for (Token token = parser.nextToken(); token != Token.END_ARRAY; token = parser.nextToken()) { + if (token == Token.START_OBJECT) { + dataStreams.add(DataStream.fromXContent(parser)); + } + } + return new Response(dataStreams); + } + + @Override + protected Response createTestInstance() { + int numDataStreams = randomIntBetween(0, 8); + List dataStreams = new ArrayList<>(); + for (int i = 0; i < numDataStreams; i++) { + dataStreams.add(new DataStream(randomAlphaOfLength(4), randomAlphaOfLength(4), + List.of(generateRandomStringArray(8, 4, false)))); + } + return new Response(dataStreams); + } +} diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java new file mode 100644 index 0000000000000..072165ab098ef --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java @@ -0,0 +1,50 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster.metadata; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractSerializingTestCase; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +public class DataStreamTests extends AbstractSerializingTestCase { + + @Override + protected DataStream doParseInstance(XContentParser parser) throws IOException { + return DataStream.fromXContent(parser); + } + + @Override + protected Writeable.Reader instanceReader() { + return DataStream::new; + } + + @Override + protected DataStream createTestInstance() { + int numIndices = randomIntBetween(0, 128); + List indices = new ArrayList<>(numIndices); + for (int i = 0; i < numIndices; i++) { + indices.add(randomAlphaOfLength(10)); + } + return new DataStream(randomAlphaOfLength(10), randomAlphaOfLength(10), indices); + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/ClusterPrivilegeResolver.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/ClusterPrivilegeResolver.java index 2c40e784cbbbb..5b3741463394f 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/ClusterPrivilegeResolver.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/ClusterPrivilegeResolver.java @@ -56,7 +56,7 @@ public class ClusterPrivilegeResolver { private static final Set MONITOR_WATCHER_PATTERN = Collections.singleton("cluster:monitor/xpack/watcher/*"); private static final Set MONITOR_ROLLUP_PATTERN = Collections.singleton("cluster:monitor/xpack/rollup/*"); private static final Set ALL_CLUSTER_PATTERN = Collections.unmodifiableSet( - Sets.newHashSet("cluster:*", "indices:admin/template/*")); + Sets.newHashSet("cluster:*", "indices:admin/template/*", "indices:admin/data_stream/*")); private static final Set MANAGE_ML_PATTERN = Collections.unmodifiableSet( Sets.newHashSet("cluster:admin/xpack/ml/*", "cluster:monitor/xpack/ml/*")); private static final Set MANAGE_TRANSFORM_PATTERN = Collections.unmodifiableSet( @@ -205,7 +205,10 @@ public static Set names() { } public static boolean isClusterAction(String actionName) { - return actionName.startsWith("cluster:") || actionName.startsWith("indices:admin/template/"); + return actionName.startsWith("cluster:") || + actionName.startsWith("indices:admin/template/") || + // todo: hack until we implement security of data_streams + actionName.startsWith("indices:admin/data_stream/"); } private static String actionToPattern(String text) { From d6eed292f5cf4bdfbc859633078ad5cc60aff08d Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 23 Mar 2020 10:40:54 +0100 Subject: [PATCH 2/3] fixed compile errors after cherry-picking from master --- .../indices/datastream/CreateDataStreamAction.java | 3 +-- .../indices/datastream/DeleteDataStreamAction.java | 3 +-- .../admin/indices/datastream/GetDataStreamsAction.java | 10 +++++----- .../admin/indices/RestCreateDataStreamAction.java | 3 ++- .../admin/indices/RestDeleteDataStreamAction.java | 3 ++- .../action/admin/indices/RestGetDataStreamsAction.java | 6 +++--- .../datastream/GetDataStreamsResponseTests.java | 3 ++- 7 files changed, 16 insertions(+), 15 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamAction.java index df6e829a28af4..08e7978f39f7f 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamAction.java @@ -33,7 +33,6 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -114,7 +113,7 @@ protected AcknowledgedResponse read(StreamInput in) throws IOException { } @Override - protected void masterOperation(Task task, Request request, ClusterState state, + protected void masterOperation(Request request, ClusterState state, ActionListener listener) throws Exception { listener.onResponse(new AcknowledgedResponse(true)); } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamAction.java index 20a2ba4aa2cd6..3443199381d38 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamAction.java @@ -33,7 +33,6 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -106,7 +105,7 @@ protected AcknowledgedResponse read(StreamInput in) throws IOException { } @Override - protected void masterOperation(Task task, Request request, ClusterState state, + protected void masterOperation(Request request, ClusterState state, ActionListener listener) throws Exception { listener.onResponse(new AcknowledgedResponse(true)); } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsAction.java index 1549f056e811f..8c9be4442a9ee 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsAction.java @@ -36,12 +36,12 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import java.io.IOException; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Objects; @@ -152,11 +152,11 @@ protected Response read(StreamInput in) throws IOException { } @Override - protected void masterOperation(Task task, Request request, ClusterState state, + protected void masterOperation(Request request, ClusterState state, ActionListener listener) throws Exception { - List dataStreams = List.of( - new DataStream("my_data_stream1", "@timestamp", List.of("my_data_stream1-000000")), - new DataStream("my_data_stream2", "@timestamp", List.of()) + List dataStreams = Arrays.asList( + new DataStream("my_data_stream1", "@timestamp", Collections.singletonList("my_data_stream1-000000")), + new DataStream("my_data_stream2", "@timestamp", Collections.emptyList()) ); listener.onResponse(new Response(dataStreams)); } diff --git a/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestCreateDataStreamAction.java b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestCreateDataStreamAction.java index 67b67677195dc..7e80c227b55ca 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestCreateDataStreamAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestCreateDataStreamAction.java @@ -26,6 +26,7 @@ import org.elasticsearch.rest.action.RestToXContentListener; import java.io.IOException; +import java.util.Collections; import java.util.List; import java.util.Map; @@ -38,7 +39,7 @@ public String getName() { @Override public List routes() { - return List.of( + return Collections.singletonList( new Route(RestRequest.Method.PUT, "/_data_stream/{name}") ); } diff --git a/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestDeleteDataStreamAction.java b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestDeleteDataStreamAction.java index b69ee981d12b3..bbbf731984441 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestDeleteDataStreamAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestDeleteDataStreamAction.java @@ -25,6 +25,7 @@ import org.elasticsearch.rest.action.RestToXContentListener; import java.io.IOException; +import java.util.Collections; import java.util.List; public class RestDeleteDataStreamAction extends BaseRestHandler { @@ -36,7 +37,7 @@ public String getName() { @Override public List routes() { - return List.of(new Route(RestRequest.Method.DELETE, "/_data_stream/{name}")); + return Collections.singletonList(new Route(RestRequest.Method.DELETE, "/_data_stream/{name}")); } @Override diff --git a/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestGetDataStreamsAction.java b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestGetDataStreamsAction.java index fdbe63829882e..3dd1f67060c1f 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestGetDataStreamsAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestGetDataStreamsAction.java @@ -26,6 +26,7 @@ import org.elasticsearch.rest.action.RestToXContentListener; import java.io.IOException; +import java.util.Arrays; import java.util.List; public class RestGetDataStreamsAction extends BaseRestHandler { @@ -37,10 +38,9 @@ public String getName() { @Override public List routes() { - return List.of( + return Arrays.asList( new Route(RestRequest.Method.GET, "/_data_streams"), - new Route(RestRequest.Method.GET, "/_data_streams/{name}") - ); + new Route(RestRequest.Method.GET, "/_data_streams/{name}")); } @Override diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsResponseTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsResponseTests.java index 5c74c515634cc..c110def6d806b 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsResponseTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsResponseTests.java @@ -27,6 +27,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; public class GetDataStreamsResponseTests extends AbstractSerializingTestCase { @@ -53,7 +54,7 @@ protected Response createTestInstance() { List dataStreams = new ArrayList<>(); for (int i = 0; i < numDataStreams; i++) { dataStreams.add(new DataStream(randomAlphaOfLength(4), randomAlphaOfLength(4), - List.of(generateRandomStringArray(8, 4, false)))); + Arrays.asList(generateRandomStringArray(8, 4, false)))); } return new Response(dataStreams); } From 40de3e05919f1d90e0ef714f5c7d188139c3c107 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 23 Mar 2020 11:29:17 +0100 Subject: [PATCH 3/3] adjusted skip version --- .../rest-api-spec/test/indices.data_stream/10_basic.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml index 49754005b6db5..035c20c1600bf 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml @@ -1,8 +1,8 @@ --- "Test stubs": - skip: - version: " - 7.99.99" - reason: not backported yet + version: " - 7.6.99" + reason: only available in 7.7+ - do: indices.create_data_stream: