Skip to content

Commit 1502893

Browse files
committed
Tasks: Only require task permissions (#35667)
Right now using the `GET /_tasks/<taskid>` API and causing a task to opt in to saving its result after being completed requires permissions on the `.tasks` index. When we built this we thought that that was fine, but we've since moved towards not leaking details like "persisting task results after the task is completed is done by saving them into an index named `.tasks`." A more modern way of doing this would be to save the tasks into the index "under the hood" and to have APIs to manage the saved tasks. This is the first step down that road: it drops the requirement to have permissions to interact with the `.tasks` index when fetching task statuses and when persisting statuses beyond the lifetime of the task. In particular, this moves the concept of the "origin" of an action into a more prominent place in the Elasticsearch server. The origin of an action is ignored by the server, but the security plugin uses the origin to make requests on behalf of a user in such a way that the user need not have permissions to perform these actions. It *can* be made to be fairly precise. More specifically, we can create an internal user just for the tasks API that just has permission to interact with the `.tasks` index. This change doesn't do that, instead, it uses the ubiquitus "xpack" user which has most permissions because it is simpler. Adding the tasks user is something I'd like to get to in a follow up change. Instead, the majority of this change is about moving the "origin" concept from the security portion of x-pack into the server. This should allow any code to use the origin. To keep the change managable I've also opted to deprecate rather than remove the "origin" helpers in the security code. Removing them is almost entirely mechanical and I'd like to that in a follow up as well. Relates to #35573
1 parent 1842b1e commit 1502893

File tree

20 files changed

+437
-176
lines changed

20 files changed

+437
-176
lines changed

server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/GetTaskAction.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
* Action for retrieving a list of currently running tasks
2727
*/
2828
public class GetTaskAction extends Action<GetTaskRequest, GetTaskResponse, GetTaskRequestBuilder> {
29+
public static final String TASKS_ORIGIN = "tasks";
2930

3031
public static final GetTaskAction INSTANCE = new GetTaskAction();
3132
public static final String NAME = "cluster:monitor/task/get";

server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.elasticsearch.action.support.HandledTransportAction;
3030
import org.elasticsearch.client.Client;
3131
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
32+
import org.elasticsearch.client.OriginSettingClient;
3233
import org.elasticsearch.cluster.node.DiscoveryNode;
3334
import org.elasticsearch.cluster.service.ClusterService;
3435
import org.elasticsearch.common.inject.Inject;
@@ -53,6 +54,7 @@
5354

5455
import java.io.IOException;
5556

57+
import static org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskAction.TASKS_ORIGIN;
5658
import static org.elasticsearch.action.admin.cluster.node.tasks.list.TransportListTasksAction.waitForCompletionTimeout;
5759

5860
/**
@@ -78,7 +80,7 @@ public TransportGetTaskAction(Settings settings, ThreadPool threadPool, Transpor
7880
super(settings, GetTaskAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, GetTaskRequest::new);
7981
this.clusterService = clusterService;
8082
this.transportService = transportService;
81-
this.client = client;
83+
this.client = new OriginSettingClient(client, TASKS_ORIGIN);
8284
this.xContentRegistry = xContentRegistry;
8385
}
8486

@@ -216,6 +218,7 @@ void getFinishedTaskFromIndex(Task thisTask, GetTaskRequest request, ActionListe
216218
GetRequest get = new GetRequest(TaskResultsService.TASK_INDEX, TaskResultsService.TASK_TYPE,
217219
request.getTaskId().toString());
218220
get.setParentTask(clusterService.localNode().getId(), thisTask.getId());
221+
219222
client.get(get, new ActionListener<GetResponse>() {
220223
@Override
221224
public void onResponse(GetResponse getResponse) {
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.client;
21+
22+
import org.elasticsearch.action.Action;
23+
import org.elasticsearch.action.ActionListener;
24+
import org.elasticsearch.action.ActionRequest;
25+
import org.elasticsearch.action.ActionRequestBuilder;
26+
import org.elasticsearch.action.ActionResponse;
27+
import org.elasticsearch.action.support.ContextPreservingActionListener;
28+
import org.elasticsearch.common.util.concurrent.ThreadContext;
29+
30+
import java.util.function.Supplier;
31+
32+
/**
33+
* A {@linkplain Client} that sends requests with the
34+
* {@link ThreadContext#stashWithOrigin origin} set to a particular
35+
* value and calls its {@linkplain ActionListener} in its original
36+
* {@link ThreadContext}.
37+
*/
38+
public final class OriginSettingClient extends FilterClient {
39+
40+
private final String origin;
41+
42+
public OriginSettingClient(Client in, String origin) {
43+
super(in);
44+
this.origin = origin;
45+
}
46+
47+
@Override
48+
protected <
49+
Request extends ActionRequest,
50+
Response extends ActionResponse,
51+
RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>>
52+
void doExecute(
53+
Action<Request, Response, RequestBuilder> action,
54+
Request request,
55+
ActionListener<Response> listener) {
56+
final Supplier<ThreadContext.StoredContext> supplier = in().threadPool().getThreadContext().newRestorableContext(false);
57+
try (ThreadContext.StoredContext ignore = in().threadPool().getThreadContext().stashWithOrigin(origin)) {
58+
super.doExecute(action, request, new ContextPreservingActionListener<>(supplier, listener));
59+
}
60+
}
61+
}

server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import org.apache.logging.log4j.LogManager;
2323
import org.apache.lucene.util.CloseableThreadLocal;
2424
import org.elasticsearch.ExceptionsHelper;
25+
import org.elasticsearch.action.support.ContextPreservingActionListener;
26+
import org.elasticsearch.client.OriginSettingClient;
2527
import org.elasticsearch.common.io.stream.StreamInput;
2628
import org.elasticsearch.common.io.stream.StreamOutput;
2729
import org.elasticsearch.common.io.stream.Writeable;
@@ -85,6 +87,12 @@ public final class ThreadContext implements Closeable, Writeable {
8587

8688
public static final String PREFIX = "request.headers";
8789
public static final Setting<Settings> DEFAULT_HEADERS_SETTING = Setting.groupSetting(PREFIX + ".", Property.NodeScope);
90+
91+
/**
92+
* Name for the {@link #stashWithOrigin origin} attribute.
93+
*/
94+
public static final String ACTION_ORIGIN_TRANSIENT_NAME = "action.origin";
95+
8896
private static final Logger logger = LogManager.getLogger(ThreadContext.class);
8997
private static final ThreadContextStruct DEFAULT_CONTEXT = new ThreadContextStruct();
9098
private final Map<String, String> defaultHeader;
@@ -119,14 +127,39 @@ public void close() throws IOException {
119127

120128
/**
121129
* Removes the current context and resets a default context. The removed context can be
122-
* restored when closing the returned {@link StoredContext}
130+
* restored by closing the returned {@link StoredContext}.
123131
*/
124132
public StoredContext stashContext() {
125133
final ThreadContextStruct context = threadLocal.get();
126134
threadLocal.set(null);
127135
return () -> threadLocal.set(context);
128136
}
129137

138+
/**
139+
* Removes the current context and resets a default context marked with as
140+
* originating from the supplied string. The removed context can be
141+
* restored by closing the returned {@link StoredContext}. Callers should
142+
* be careful to save the current context before calling this method and
143+
* restore it any listeners, likely with
144+
* {@link ContextPreservingActionListener}. Use {@link OriginSettingClient}
145+
* which can be used to do this automatically.
146+
* <p>
147+
* Without security the origin is ignored, but security uses it to authorize
148+
* actions that are made up of many sub-actions. These actions call
149+
* {@link #stashWithOrigin} before performing on behalf of a user that
150+
* should be allowed even if the user doesn't have permission to perform
151+
* those actions on their own.
152+
* <p>
153+
* For example, a user might not have permission to GET from the tasks index
154+
* but the tasks API will perform a get on their behalf using this method
155+
* if it can't find the task in memory.
156+
*/
157+
public StoredContext stashWithOrigin(String origin) {
158+
final ThreadContext.StoredContext storedContext = stashContext();
159+
putTransient(ACTION_ORIGIN_TRANSIENT_NAME, origin);
160+
return storedContext;
161+
}
162+
130163
/**
131164
* Removes the current context and resets a new context that contains a merge of the current headers and the given headers.
132165
* The removed context can be restored when closing the returned {@link StoredContext}. The merge strategy is that headers

server/src/main/java/org/elasticsearch/persistent/PersistentTasksService.java

Lines changed: 5 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -26,21 +26,19 @@
2626
import org.elasticsearch.action.ActionRequestBuilder;
2727
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
2828
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
29-
import org.elasticsearch.action.support.ContextPreservingActionListener;
3029
import org.elasticsearch.client.Client;
30+
import org.elasticsearch.client.OriginSettingClient;
3131
import org.elasticsearch.cluster.ClusterState;
3232
import org.elasticsearch.cluster.ClusterStateObserver;
3333
import org.elasticsearch.cluster.service.ClusterService;
3434
import org.elasticsearch.common.Nullable;
3535
import org.elasticsearch.common.unit.TimeValue;
36-
import org.elasticsearch.common.util.concurrent.ThreadContext;
3736
import org.elasticsearch.node.NodeClosedException;
3837
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
3938
import org.elasticsearch.tasks.TaskId;
4039
import org.elasticsearch.threadpool.ThreadPool;
4140

4241
import java.util.function.Predicate;
43-
import java.util.function.Supplier;
4442

4543
/**
4644
* This service is used by persistent tasks and allocated persistent tasks to communicate changes
@@ -51,15 +49,14 @@ public class PersistentTasksService {
5149

5250
private static final Logger logger = LogManager.getLogger(PersistentTasksService.class);
5351

54-
private static final String ACTION_ORIGIN_TRANSIENT_NAME = "action.origin";
5552
private static final String PERSISTENT_TASK_ORIGIN = "persistent_tasks";
5653

5754
private final Client client;
5855
private final ClusterService clusterService;
5956
private final ThreadPool threadPool;
6057

6158
public PersistentTasksService(ClusterService clusterService, ThreadPool threadPool, Client client) {
62-
this.client = client;
59+
this.client = new OriginSettingClient(client, PERSISTENT_TASK_ORIGIN);
6360
this.clusterService = clusterService;
6461
this.threadPool = threadPool;
6562
}
@@ -99,12 +96,7 @@ void sendCancelRequest(final long taskId, final String reason, final ActionListe
9996
request.setTaskId(new TaskId(clusterService.localNode().getId(), taskId));
10097
request.setReason(reason);
10198
try {
102-
final ThreadContext threadContext = client.threadPool().getThreadContext();
103-
final Supplier<ThreadContext.StoredContext> supplier = threadContext.newRestorableContext(false);
104-
105-
try (ThreadContext.StoredContext ignore = stashWithOrigin(threadContext, PERSISTENT_TASK_ORIGIN)) {
106-
client.admin().cluster().cancelTasks(request, new ContextPreservingActionListener<>(supplier, listener));
107-
}
99+
client.admin().cluster().cancelTasks(request, listener);
108100
} catch (Exception e) {
109101
listener.onFailure(e);
110102
}
@@ -141,14 +133,8 @@ public void sendRemoveRequest(final String taskId, final ActionListener<Persiste
141133
private <Req extends ActionRequest, Resp extends PersistentTaskResponse, Builder extends ActionRequestBuilder<Req, Resp, Builder>>
142134
void execute(final Req request, final Action<Req, Resp, Builder> action, final ActionListener<PersistentTask<?>> listener) {
143135
try {
144-
final ThreadContext threadContext = client.threadPool().getThreadContext();
145-
final Supplier<ThreadContext.StoredContext> supplier = threadContext.newRestorableContext(false);
146-
147-
try (ThreadContext.StoredContext ignore = stashWithOrigin(threadContext, PERSISTENT_TASK_ORIGIN)) {
148-
client.execute(action, request,
149-
new ContextPreservingActionListener<>(supplier,
150-
ActionListener.wrap(r -> listener.onResponse(r.getTask()), listener::onFailure)));
151-
}
136+
client.execute(action, request,
137+
ActionListener.wrap(r -> listener.onResponse(r.getTask()), listener::onFailure));
152138
} catch (Exception e) {
153139
listener.onFailure(e);
154140
}
@@ -234,10 +220,4 @@ default void onTimeout(TimeValue timeout) {
234220
onFailure(new IllegalStateException("Timed out when waiting for persistent task after " + timeout));
235221
}
236222
}
237-
238-
public static ThreadContext.StoredContext stashWithOrigin(ThreadContext threadContext, String origin) {
239-
final ThreadContext.StoredContext storedContext = threadContext.stashContext();
240-
threadContext.putTransient(ACTION_ORIGIN_TRANSIENT_NAME, origin);
241-
return storedContext;
242-
}
243223
}

server/src/main/java/org/elasticsearch/tasks/TaskResultsService.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.elasticsearch.action.index.IndexResponse;
3333
import org.elasticsearch.action.support.master.AcknowledgedResponse;
3434
import org.elasticsearch.client.Client;
35+
import org.elasticsearch.client.OriginSettingClient;
3536
import org.elasticsearch.client.Requests;
3637
import org.elasticsearch.cluster.ClusterState;
3738
import org.elasticsearch.cluster.metadata.IndexMetaData;
@@ -51,6 +52,8 @@
5152
import java.nio.charset.StandardCharsets;
5253
import java.util.Map;
5354

55+
import static org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskAction.TASKS_ORIGIN;
56+
5457
/**
5558
* Service that can store task results.
5659
*/
@@ -77,7 +80,7 @@ public class TaskResultsService {
7780
@Inject
7881
public TaskResultsService(Client client, ClusterService clusterService,
7982
TransportCreateIndexAction createIndexAction) {
80-
this.client = client;
83+
this.client = new OriginSettingClient(client, TASKS_ORIGIN);
8184
this.clusterService = clusterService;
8285
this.createIndexAction = createIndexAction;
8386
}
@@ -93,7 +96,7 @@ public void storeResult(TaskResult taskResult, ActionListener<Void> listener) {
9396
createIndexRequest.mapping(TASK_TYPE, taskResultIndexMapping(), XContentType.JSON);
9497
createIndexRequest.cause("auto(task api)");
9598

96-
createIndexAction.execute(null, createIndexRequest, new ActionListener<CreateIndexResponse>() {
99+
client.admin().indices().create(createIndexRequest, new ActionListener<CreateIndexResponse>() {
97100
@Override
98101
public void onResponse(CreateIndexResponse result) {
99102
doStoreResult(taskResult, listener);

server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java

Lines changed: 25 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,14 @@
2727
import org.elasticsearch.action.TaskOperationFailure;
2828
import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
2929
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
30+
import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskRequest;
3031
import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskResponse;
3132
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction;
3233
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
3334
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
3435
import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeAction;
3536
import org.elasticsearch.action.admin.indices.validate.query.ValidateQueryAction;
3637
import org.elasticsearch.action.bulk.BulkAction;
37-
import org.elasticsearch.action.get.GetResponse;
3838
import org.elasticsearch.action.index.IndexAction;
3939
import org.elasticsearch.action.index.IndexResponse;
4040
import org.elasticsearch.action.search.SearchAction;
@@ -85,7 +85,6 @@
8585
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
8686
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
8787
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_HEADER_SIZE;
88-
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
8988
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
9089
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse;
9190
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows;
@@ -722,12 +721,6 @@ public void testTasksWaitForAllTask() throws Exception {
722721
}
723722

724723
public void testTaskStoringSuccesfulResult() throws Exception {
725-
// Randomly create an empty index to make sure the type is created automatically
726-
if (randomBoolean()) {
727-
logger.info("creating an empty results index with custom settings");
728-
assertAcked(client().admin().indices().prepareCreate(TaskResultsService.TASK_INDEX));
729-
}
730-
731724
registerTaskManageListeners(TestTaskPlugin.TestTaskAction.NAME); // we need this to get task id of the process
732725

733726
// Start non-blocking test task
@@ -739,23 +732,20 @@ public void testTaskStoringSuccesfulResult() throws Exception {
739732
TaskInfo taskInfo = events.get(0);
740733
TaskId taskId = taskInfo.getTaskId();
741734

742-
GetResponse resultDoc = client()
743-
.prepareGet(TaskResultsService.TASK_INDEX, TaskResultsService.TASK_TYPE, taskId.toString()).get();
744-
assertTrue(resultDoc.isExists());
745-
746-
Map<String, Object> source = resultDoc.getSource();
747-
@SuppressWarnings("unchecked")
748-
Map<String, Object> task = (Map<String, Object>) source.get("task");
749-
assertEquals(taskInfo.getTaskId().getNodeId(), task.get("node"));
750-
assertEquals(taskInfo.getAction(), task.get("action"));
751-
assertEquals(Long.toString(taskInfo.getId()), task.get("id").toString());
752-
753-
@SuppressWarnings("unchecked")
754-
Map<String, Object> result = (Map<String, Object>) source.get("response");
735+
TaskResult taskResult = client().admin().cluster()
736+
.getTask(new GetTaskRequest().setTaskId(taskId)).get().getTask();
737+
assertTrue(taskResult.isCompleted());
738+
assertNull(taskResult.getError());
739+
740+
assertEquals(taskInfo.getTaskId(), taskResult.getTask().getTaskId());
741+
assertEquals(taskInfo.getType(), taskResult.getTask().getType());
742+
assertEquals(taskInfo.getAction(), taskResult.getTask().getAction());
743+
assertEquals(taskInfo.getDescription(), taskResult.getTask().getDescription());
744+
assertEquals(taskInfo.getStartTime(), taskResult.getTask().getStartTime());
745+
assertEquals(taskInfo.getHeaders(), taskResult.getTask().getHeaders());
746+
Map<?, ?> result = taskResult.getResponseAsMap();
755747
assertEquals("0", result.get("failure_count").toString());
756748

757-
assertNull(source.get("failure"));
758-
759749
assertNoFailures(client().admin().indices().prepareRefresh(TaskResultsService.TASK_INDEX).get());
760750

761751
SearchResponse searchResponse = client().prepareSearch(TaskResultsService.TASK_INDEX)
@@ -793,25 +783,21 @@ public void testTaskStoringFailureResult() throws Exception {
793783
TaskInfo failedTaskInfo = events.get(0);
794784
TaskId failedTaskId = failedTaskInfo.getTaskId();
795785

796-
GetResponse failedResultDoc = client()
797-
.prepareGet(TaskResultsService.TASK_INDEX, TaskResultsService.TASK_TYPE, failedTaskId.toString())
798-
.get();
799-
assertTrue(failedResultDoc.isExists());
800-
801-
Map<String, Object> source = failedResultDoc.getSource();
802-
@SuppressWarnings("unchecked")
803-
Map<String, Object> task = (Map<String, Object>) source.get("task");
804-
assertEquals(failedTaskInfo.getTaskId().getNodeId(), task.get("node"));
805-
assertEquals(failedTaskInfo.getAction(), task.get("action"));
806-
assertEquals(Long.toString(failedTaskInfo.getId()), task.get("id").toString());
807-
808-
@SuppressWarnings("unchecked")
809-
Map<String, Object> error = (Map<String, Object>) source.get("error");
786+
TaskResult taskResult = client().admin().cluster()
787+
.getTask(new GetTaskRequest().setTaskId(failedTaskId)).get().getTask();
788+
assertTrue(taskResult.isCompleted());
789+
assertNull(taskResult.getResponse());
790+
791+
assertEquals(failedTaskInfo.getTaskId(), taskResult.getTask().getTaskId());
792+
assertEquals(failedTaskInfo.getType(), taskResult.getTask().getType());
793+
assertEquals(failedTaskInfo.getAction(), taskResult.getTask().getAction());
794+
assertEquals(failedTaskInfo.getDescription(), taskResult.getTask().getDescription());
795+
assertEquals(failedTaskInfo.getStartTime(), taskResult.getTask().getStartTime());
796+
assertEquals(failedTaskInfo.getHeaders(), taskResult.getTask().getHeaders());
797+
Map<?, ?> error = (Map<?, ?>) taskResult.getErrorAsMap();
810798
assertEquals("Simulating operation failure", error.get("reason"));
811799
assertEquals("illegal_state_exception", error.get("type"));
812800

813-
assertNull(source.get("result"));
814-
815801
GetTaskResponse getResponse = expectFinishedTask(failedTaskId);
816802
assertNull(getResponse.getTask().getResponse());
817803
assertEquals(error, getResponse.getTask().getErrorAsMap());

0 commit comments

Comments
 (0)