Skip to content

Commit d1df4fd

Browse files
Add logstash system index APIs (elastic#53350)
We want Logstash indices to be system indices, but the logstash service will still need to be able to manage its indices. This PR adds special system index APIs to the logstash plugin so that logstash can manage its pipelines without direct access to the underlying indices. * Add logstash module with dedicated logstash APIs * merge with x-pack plugin * add system index access allowance * Break out serialization tests into distinct classes * Log failures for partial multiget failure * Move LogstashSystemIndexIT to javaRestTest task Co-authored-by: William Brafford <[email protected]>
1 parent 9332a9c commit d1df4fd

27 files changed

+1508
-2
lines changed

x-pack/plugin/logstash/src/main/java/org/elasticsearch/xpack/logstash/Logstash.java

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,29 +7,48 @@
77

88
import org.apache.logging.log4j.LogManager;
99
import org.elasticsearch.Version;
10+
import org.elasticsearch.action.ActionRequest;
11+
import org.elasticsearch.action.ActionResponse;
12+
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
1013
import org.elasticsearch.cluster.metadata.IndexTemplateMetadata;
14+
import org.elasticsearch.cluster.node.DiscoveryNodes;
15+
import org.elasticsearch.common.settings.ClusterSettings;
16+
import org.elasticsearch.common.settings.IndexScopedSettings;
1117
import org.elasticsearch.common.inject.Module;
1218
import org.elasticsearch.common.settings.Settings;
19+
import org.elasticsearch.common.settings.SettingsFilter;
1320
import org.elasticsearch.index.mapper.MapperService;
1421
import org.elasticsearch.indices.SystemIndexDescriptor;
1522
import org.elasticsearch.plugins.Plugin;
1623
import org.elasticsearch.plugins.SystemIndexPlugin;
1724
import org.elasticsearch.xpack.core.XPackPlugin;
25+
import org.elasticsearch.rest.RestController;
26+
import org.elasticsearch.rest.RestHandler;
1827
import org.elasticsearch.xpack.core.template.TemplateUtils;
28+
import org.elasticsearch.xpack.logstash.action.DeletePipelineAction;
29+
import org.elasticsearch.xpack.logstash.action.GetPipelineAction;
30+
import org.elasticsearch.xpack.logstash.action.PutPipelineAction;
31+
import org.elasticsearch.xpack.logstash.action.TransportDeletePipelineAction;
32+
import org.elasticsearch.xpack.logstash.action.TransportGetPipelineAction;
33+
import org.elasticsearch.xpack.logstash.action.TransportPutPipelineAction;
34+
import org.elasticsearch.xpack.logstash.rest.RestDeletePipelineAction;
35+
import org.elasticsearch.xpack.logstash.rest.RestGetPipelineAction;
36+
import org.elasticsearch.xpack.logstash.rest.RestPutPipelineAction;
1937

2038
import java.util.ArrayList;
2139
import java.util.Collection;
2240
import java.util.Collections;
2341
import java.util.List;
2442
import java.util.Map;
43+
import java.util.function.Supplier;
2544
import java.util.function.UnaryOperator;
2645

2746
/**
2847
* This class activates/deactivates the logstash modules depending if we're running a node client or transport client
2948
*/
3049
public class Logstash extends Plugin implements SystemIndexPlugin {
3150

32-
private static final String LOGSTASH_CONCRETE_INDEX_NAME = ".logstash";
51+
public static final String LOGSTASH_CONCRETE_INDEX_NAME = ".logstash";
3352
private static final String LOGSTASH_TEMPLATE_FILE_NAME = "logstash-management";
3453
private static final String LOGSTASH_INDEX_TEMPLATE_NAME = ".logstash-management";
3554
private static final String OLD_LOGSTASH_INDEX_NAME = "logstash-index-template";
@@ -43,6 +62,32 @@ public Collection<Module> createGuiceModules() {
4362
return modules;
4463
}
4564

65+
@Override
66+
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
67+
return org.elasticsearch.common.collect.List.of(
68+
new ActionHandler<>(PutPipelineAction.INSTANCE, TransportPutPipelineAction.class),
69+
new ActionHandler<>(GetPipelineAction.INSTANCE, TransportGetPipelineAction.class),
70+
new ActionHandler<>(DeletePipelineAction.INSTANCE, TransportDeletePipelineAction.class)
71+
);
72+
}
73+
74+
@Override
75+
public List<RestHandler> getRestHandlers(
76+
Settings settings,
77+
RestController restController,
78+
ClusterSettings clusterSettings,
79+
IndexScopedSettings indexScopedSettings,
80+
SettingsFilter settingsFilter,
81+
IndexNameExpressionResolver indexNameExpressionResolver,
82+
Supplier<DiscoveryNodes> nodesInCluster
83+
) {
84+
return org.elasticsearch.common.collect.List.of(
85+
new RestPutPipelineAction(),
86+
new RestGetPipelineAction(),
87+
new RestDeletePipelineAction()
88+
);
89+
}
90+
4691
public UnaryOperator<Map<String, IndexTemplateMetadata>> getIndexTemplateMetadataUpgrader() {
4792
return templates -> {
4893
templates.keySet().removeIf(OLD_LOGSTASH_INDEX_NAME::equals);
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.logstash;
8+
9+
import org.elasticsearch.common.ParseField;
10+
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
11+
import org.elasticsearch.common.xcontent.ObjectParser.ValueType;
12+
13+
import java.time.Instant;
14+
import java.util.Arrays;
15+
import java.util.Iterator;
16+
import java.util.Map;
17+
18+
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
19+
20+
public class Pipeline {
21+
22+
@SuppressWarnings("unchecked")
23+
public static final ConstructingObjectParser<Pipeline, String> PARSER = new ConstructingObjectParser<>(
24+
"pipeline",
25+
true,
26+
(objects, id) -> {
27+
Iterator<Object> iterator = Arrays.asList(objects).iterator();
28+
return new Pipeline(
29+
id,
30+
(Instant) iterator.next(),
31+
(Map<String, Object>) iterator.next(),
32+
(String) iterator.next(),
33+
(String) iterator.next(),
34+
(Map<String, Object>) iterator.next()
35+
);
36+
}
37+
);
38+
39+
public static final ParseField LAST_MODIFIED = new ParseField("last_modified");
40+
public static final ParseField PIPELINE_METADATA = new ParseField("pipeline_metadata");
41+
public static final ParseField USERNAME = new ParseField("username");
42+
public static final ParseField PIPELINE = new ParseField("pipeline");
43+
public static final ParseField PIPELINE_SETTINGS = new ParseField("pipeline_settings");
44+
45+
static {
46+
PARSER.declareField(constructorArg(), (parser, s) -> {
47+
final String instantISOString = parser.text();
48+
return Instant.parse(instantISOString);
49+
}, LAST_MODIFIED, ValueType.STRING);
50+
PARSER.declareObject(constructorArg(), (parser, s) -> parser.map(), PIPELINE_METADATA);
51+
PARSER.declareString(constructorArg(), USERNAME);
52+
PARSER.declareString(constructorArg(), PIPELINE);
53+
PARSER.declareObject(constructorArg(), (parser, s) -> parser.map(), PIPELINE_SETTINGS);
54+
}
55+
56+
private final String id;
57+
private final Instant lastModified;
58+
private final Map<String, Object> pipelineMetadata;
59+
private final String username;
60+
private final String pipeline;
61+
private final Map<String, Object> pipelineSettings;
62+
63+
public Pipeline(
64+
String id,
65+
Instant lastModified,
66+
Map<String, Object> pipelineMetadata,
67+
String username,
68+
String pipeline,
69+
Map<String, Object> pipelineSettings
70+
) {
71+
this.id = id;
72+
this.lastModified = lastModified;
73+
this.pipelineMetadata = pipelineMetadata;
74+
this.username = username;
75+
this.pipeline = pipeline;
76+
this.pipelineSettings = pipelineSettings;
77+
}
78+
79+
public String getId() {
80+
return id;
81+
}
82+
83+
public Instant getLastModified() {
84+
return lastModified;
85+
}
86+
87+
public Map<String, Object> getPipelineMetadata() {
88+
return pipelineMetadata;
89+
}
90+
91+
public String getUsername() {
92+
return username;
93+
}
94+
95+
public String getPipeline() {
96+
return pipeline;
97+
}
98+
99+
public Map<String, Object> getPipelineSettings() {
100+
return pipelineSettings;
101+
}
102+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.logstash.action;
8+
9+
import org.elasticsearch.action.ActionType;
10+
11+
public class DeletePipelineAction extends ActionType<DeletePipelineResponse> {
12+
13+
public static final String NAME = "cluster:admin/logstash/pipeline/delete";
14+
public static final DeletePipelineAction INSTANCE = new DeletePipelineAction();
15+
16+
private DeletePipelineAction() {
17+
super(NAME, DeletePipelineResponse::new);
18+
}
19+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.logstash.action;
8+
9+
import org.elasticsearch.action.ActionRequest;
10+
import org.elasticsearch.action.ActionRequestValidationException;
11+
import org.elasticsearch.common.io.stream.StreamInput;
12+
import org.elasticsearch.common.io.stream.StreamOutput;
13+
14+
import java.io.IOException;
15+
import java.util.Objects;
16+
17+
public class DeletePipelineRequest extends ActionRequest {
18+
19+
private final String id;
20+
21+
public DeletePipelineRequest(String id) {
22+
this.id = Objects.requireNonNull(id);
23+
}
24+
25+
public DeletePipelineRequest(StreamInput in) throws IOException {
26+
super(in);
27+
this.id = in.readString();
28+
}
29+
30+
public String id() {
31+
return id;
32+
}
33+
34+
@Override
35+
public ActionRequestValidationException validate() {
36+
return null;
37+
}
38+
39+
@Override
40+
public void writeTo(StreamOutput out) throws IOException {
41+
super.writeTo(out);
42+
out.writeString(id);
43+
}
44+
45+
@Override
46+
public boolean equals(Object o) {
47+
if (this == o) return true;
48+
if (o == null || getClass() != o.getClass()) return false;
49+
DeletePipelineRequest that = (DeletePipelineRequest) o;
50+
return Objects.equals(id, that.id);
51+
}
52+
53+
@Override
54+
public int hashCode() {
55+
return Objects.hash(id);
56+
}
57+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.logstash.action;
8+
9+
import org.elasticsearch.action.ActionResponse;
10+
import org.elasticsearch.common.io.stream.StreamInput;
11+
import org.elasticsearch.common.io.stream.StreamOutput;
12+
13+
import java.io.IOException;
14+
import java.util.Objects;
15+
16+
public class DeletePipelineResponse extends ActionResponse {
17+
18+
private final boolean deleted;
19+
20+
public DeletePipelineResponse(boolean deleted) {
21+
this.deleted = deleted;
22+
}
23+
24+
public DeletePipelineResponse(StreamInput in) throws IOException {
25+
super(in);
26+
this.deleted = in.readBoolean();
27+
}
28+
29+
public boolean isDeleted() {
30+
return deleted;
31+
}
32+
33+
@Override
34+
public void writeTo(StreamOutput out) throws IOException {
35+
out.writeBoolean(deleted);
36+
}
37+
38+
@Override
39+
public boolean equals(Object o) {
40+
if (this == o) return true;
41+
if (o == null || getClass() != o.getClass()) return false;
42+
DeletePipelineResponse that = (DeletePipelineResponse) o;
43+
return deleted == that.deleted;
44+
}
45+
46+
@Override
47+
public int hashCode() {
48+
return Objects.hash(deleted);
49+
}
50+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.logstash.action;
8+
9+
import org.elasticsearch.action.ActionType;
10+
11+
public class GetPipelineAction extends ActionType<GetPipelineResponse> {
12+
13+
public static final String NAME = "cluster:admin/logstash/pipeline/get";
14+
public static final GetPipelineAction INSTANCE = new GetPipelineAction();
15+
16+
private GetPipelineAction() {
17+
super(NAME, GetPipelineResponse::new);
18+
}
19+
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.logstash.action;
8+
9+
import org.elasticsearch.action.ActionRequest;
10+
import org.elasticsearch.action.ActionRequestValidationException;
11+
import org.elasticsearch.common.io.stream.StreamInput;
12+
import org.elasticsearch.common.io.stream.StreamOutput;
13+
14+
import java.io.IOException;
15+
import java.util.List;
16+
import java.util.Objects;
17+
18+
public class GetPipelineRequest extends ActionRequest {
19+
20+
private final List<String> ids;
21+
22+
public GetPipelineRequest(List<String> ids) {
23+
this.ids = Objects.requireNonNull(ids);
24+
}
25+
26+
public GetPipelineRequest(StreamInput in) throws IOException {
27+
super(in);
28+
ids = in.readStringList();
29+
}
30+
31+
public List<String> ids() {
32+
return ids;
33+
}
34+
35+
@Override
36+
public void writeTo(StreamOutput out) throws IOException {
37+
super.writeTo(out);
38+
out.writeStringCollection(ids);
39+
}
40+
41+
@Override
42+
public ActionRequestValidationException validate() {
43+
return null;
44+
}
45+
46+
@Override
47+
public boolean equals(Object o) {
48+
if (this == o) return true;
49+
if (o == null || getClass() != o.getClass()) return false;
50+
GetPipelineRequest that = (GetPipelineRequest) o;
51+
return Objects.equals(ids, that.ids);
52+
}
53+
54+
@Override
55+
public int hashCode() {
56+
return Objects.hash(ids);
57+
}
58+
}

0 commit comments

Comments
 (0)