Skip to content

Commit c1d5478

Browse files
committed
Logs Endpoint
1 parent ea35df8 commit c1d5478

File tree

9 files changed

+512
-2
lines changed

9 files changed

+512
-2
lines changed

server/src/main/java/org/elasticsearch/action/ActionModule.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -402,6 +402,7 @@
402402
import org.elasticsearch.rest.action.ingest.RestGetPipelineAction;
403403
import org.elasticsearch.rest.action.ingest.RestPutPipelineAction;
404404
import org.elasticsearch.rest.action.ingest.RestSimulatePipelineAction;
405+
import org.elasticsearch.rest.action.logs.RestLogsAction;
405406
import org.elasticsearch.rest.action.search.RestClearScrollAction;
406407
import org.elasticsearch.rest.action.search.RestCountAction;
407408
import org.elasticsearch.rest.action.search.RestExplainAction;
@@ -870,6 +871,9 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
870871
registerHandler.accept(new RestUpdateDesiredNodesAction());
871872
registerHandler.accept(new RestDeleteDesiredNodesAction());
872873

874+
// Logs API
875+
registerHandler.accept(new RestLogsAction());
876+
873877
for (ActionPlugin plugin : actionPlugins) {
874878
for (RestHandler handler : plugin.getRestHandlers(
875879
settings,

server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ public BulkRequest add(DocWriteRequest<?> request) {
129129
/**
130130
* Adds a list of requests to be executed. Either index or delete requests.
131131
*/
132-
public BulkRequest add(Iterable<DocWriteRequest<?>> requests) {
132+
public BulkRequest add(Iterable<? extends DocWriteRequest<?>> requests) {
133133
for (DocWriteRequest<?> request : requests) {
134134
add(request);
135135
}
Lines changed: 235 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,235 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.rest.action.logs;
10+
11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
import org.elasticsearch.ElasticsearchException;
14+
import org.elasticsearch.action.ActionListener;
15+
import org.elasticsearch.action.DocWriteRequest;
16+
import org.elasticsearch.action.bulk.BulkItemResponse;
17+
import org.elasticsearch.action.bulk.BulkRequest;
18+
import org.elasticsearch.action.bulk.BulkResponse;
19+
import org.elasticsearch.action.index.IndexRequest;
20+
import org.elasticsearch.client.internal.Requests;
21+
import org.elasticsearch.client.internal.node.NodeClient;
22+
import org.elasticsearch.rest.BaseRestHandler;
23+
import org.elasticsearch.rest.RestRequest;
24+
import org.elasticsearch.rest.RestResponse;
25+
import org.elasticsearch.rest.RestStatus;
26+
import org.elasticsearch.xcontent.XContentBuilder;
27+
import org.elasticsearch.xcontent.XContentParser;
28+
import org.elasticsearch.xcontent.XContentParserConfiguration;
29+
import org.elasticsearch.xcontent.XContentType;
30+
31+
import java.io.BufferedReader;
32+
import java.io.IOException;
33+
import java.io.InputStreamReader;
34+
import java.time.Instant;
35+
import java.util.ArrayList;
36+
import java.util.Arrays;
37+
import java.util.HashMap;
38+
import java.util.List;
39+
import java.util.Map;
40+
41+
import static java.util.function.Predicate.not;
42+
import static org.elasticsearch.rest.RestRequest.Method.POST;
43+
44+
public class RestLogsAction extends BaseRestHandler {
45+
46+
private static final Logger logger = LogManager.getLogger(RestLogsAction.class);
47+
48+
@Override
49+
public String getName() {
50+
return "logs_action";
51+
}
52+
53+
@Override
54+
public List<Route> routes() {
55+
return List.of(
56+
new Route(POST, "/_logs"),
57+
new Route(POST, "/_logs/{data_stream.dataset}"),
58+
new Route(POST, "/_logs/{data_stream.dataset}/{data_stream.namespace}")
59+
);
60+
}
61+
62+
@Override
63+
public boolean mediaTypesValid(RestRequest request) {
64+
return true;
65+
}
66+
67+
@Override
68+
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
69+
Map<String, Object> globalMetadata = new HashMap<>();
70+
Map<String, String> params = request.params();
71+
params.entrySet()
72+
.stream()
73+
.filter(not(e -> e.getKey().startsWith("_")))
74+
.forEach(e -> addPath(globalMetadata, e.getKey(), request.param(e.getKey())));
75+
76+
List<IndexRequest> indexRequests = new ArrayList<>();
77+
try (BufferedReader reader = new BufferedReader(new InputStreamReader(request.content().streamInput()))) {
78+
Map<String, Object> localMetadata = Map.of();
79+
int i = 0;
80+
for (String line = reader.readLine(); line != null; line = reader.readLine(), i++) {
81+
if (line.isBlank()) {
82+
continue;
83+
}
84+
Map<String, Object> event = null;
85+
if (line.startsWith("{")) {
86+
event = parseJson(line);
87+
}
88+
if (event == null) {
89+
event = Map.of("message", line);
90+
}
91+
if (event.containsKey("_metadata")) {
92+
Map<String, Object> metadata = getMetadata(event);
93+
expandDots(metadata);
94+
if (i == 0) {
95+
globalMetadata.putAll(metadata);
96+
} else {
97+
localMetadata = metadata;
98+
}
99+
} else {
100+
HashMap<String, Object> doc = new HashMap<>(globalMetadata);
101+
// TODO try re-using org.elasticsearch.ingest.common.JsonProcessor.recursiveMerge
102+
doc.putAll(localMetadata);
103+
doc.putAll(event);
104+
expandDots(doc);
105+
if (doc.containsKey("@timestamp") == false) {
106+
String now = Instant.now().toString();
107+
doc.put("@timestamp", now);
108+
}
109+
doc.putIfAbsent("data_stream", new HashMap<>());
110+
@SuppressWarnings("unchecked")
111+
Map<String, String> dataStream = (Map<String, String>) doc.get("data_stream");
112+
dataStream.put("type", "logs");
113+
dataStream.putIfAbsent("dataset", "generic");
114+
dataStream.putIfAbsent("namespace", "default");
115+
indexRequests.add(Requests.indexRequest("logs-" + dataStream.get("dataset") + "-" + dataStream.get("namespace"))
116+
.opType(DocWriteRequest.OpType.CREATE)
117+
.source(doc)
118+
);
119+
}
120+
}
121+
}
122+
123+
return channel -> {
124+
// always accept request and process it asynchronously
125+
try (XContentBuilder builder = channel.newBuilder()) {
126+
builder.startObject();
127+
builder.endObject();
128+
channel.sendResponse(new RestResponse(RestStatus.ACCEPTED, builder));
129+
}
130+
131+
client.bulk(Requests.bulkRequest().add(indexRequests), new ActionListener<BulkResponse>() {
132+
@Override
133+
public void onResponse(BulkResponse bulkItemResponses) {
134+
if (bulkItemResponses.hasFailures()) {
135+
BulkRequest retryBulk = Requests.bulkRequest();
136+
Arrays.stream(bulkItemResponses.getItems())
137+
.filter(BulkItemResponse::isFailed)
138+
.map(failedRequest -> {
139+
Map<String, Object> doc = indexRequests.get(failedRequest.getItemId()).sourceAsMap();
140+
Exception cause = failedRequest.getFailure().getCause();
141+
addPath(doc, "error.type", ElasticsearchException.getExceptionName(cause));
142+
addPath(doc, "error.message", cause.getMessage());
143+
addPath(doc, "data_stream.dataset", "generic");
144+
addPath(doc, "data_stream.namespace", "default");
145+
return doc;
146+
})
147+
.map(doc -> Requests.indexRequest("logs-generic-default")
148+
.opType(DocWriteRequest.OpType.CREATE)
149+
.source(doc))
150+
.forEach(retryBulk::add);
151+
client.bulk(retryBulk, new ActionListener<BulkResponse>() {
152+
@Override
153+
public void onResponse(BulkResponse bulkItemResponses) {
154+
if (bulkItemResponses.hasFailures()) {
155+
logger.error(
156+
"Failed to ingest logs: re-try batch has failures. First failure: {}",
157+
Arrays.stream(bulkItemResponses.getItems())
158+
.filter(BulkItemResponse::isFailed)
159+
.findFirst()
160+
.map(BulkItemResponse::getFailureMessage)
161+
.orElse(null)
162+
);
163+
}
164+
}
165+
166+
@Override
167+
public void onFailure(Exception e) {
168+
logger.error("Failed to ingest logs", e);
169+
}
170+
});
171+
}
172+
}
173+
174+
@Override
175+
public void onFailure(Exception e) {
176+
logger.error("Failed to ingest logs", e);
177+
}
178+
});
179+
};
180+
181+
}
182+
183+
private Map<String, Object> parseJson(String json) {
184+
try (XContentParser parser = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, json)) {
185+
parser.allowDuplicateKeys(true);
186+
return parser.map();
187+
} catch (Exception e) {
188+
return null;
189+
}
190+
}
191+
192+
private Map<String, Object> getMetadata(Map<String, ?> event) {
193+
Object metadata = event.get("_metadata");
194+
if (metadata instanceof Map<?,?>) {
195+
@SuppressWarnings("unchecked")
196+
Map<String, Object> metadataMap = (Map<String, Object>) metadata;
197+
return metadataMap;
198+
}
199+
return Map.of();
200+
}
201+
202+
public static void expandDots(Map<String, Object> doc) {
203+
for (String key : new ArrayList<>(doc.keySet())) {
204+
if (key.contains(".")) {
205+
Object value = doc.remove(key);
206+
addPath(doc, key, value);
207+
}
208+
}
209+
}
210+
211+
@SuppressWarnings("unchecked")
212+
private static Map<String, Object> addPath(Map<String, Object> doc, String path, Object value) {
213+
Map<String, Object> parent = doc;
214+
String[] pathElements = path.split("\\.");
215+
for (int i = 0; i < pathElements.length; i++) {
216+
String pathElement = pathElements[i];
217+
if (i == pathElements.length -1) {
218+
parent.put(pathElement, value);
219+
} else {
220+
if (parent.containsKey(pathElement) == false) {
221+
parent.put(pathElement, new HashMap<>());
222+
}
223+
Object potentialParent = parent.get(pathElement);
224+
if (potentialParent instanceof Map) {
225+
parent = (Map<String, Object>) potentialParent;
226+
} else {
227+
// conflict, put the dotted key back in
228+
doc.put(path, value);
229+
break;
230+
}
231+
}
232+
}
233+
return doc;
234+
}
235+
}

0 commit comments

Comments
 (0)