Skip to content

Commit 0588dad

Browse files
authored
Tasks: Only require task permissions (#35667)
Right now using the `GET /_tasks/<taskid>` API and causing a task to opt in to saving its result after being completed requires permissions on the `.tasks` index. When we built this we thought that that was fine, but we've since moved towards not leaking details like "persisting task results after the task is completed is done by saving them into an index named `.tasks`." A more modern way of doing this would be to save the tasks into the index "under the hood" and to have APIs to manage the saved tasks. This is the first step down that road: it drops the requirement to have permissions to interact with the `.tasks` index when fetching task statuses and when persisting statuses beyond the lifetime of the task. In particular, this moves the concept of the "origin" of an action into a more prominent place in the Elasticsearch server. The origin of an action is ignored by the server, but the security plugin uses the origin to make requests on behalf of a user in such a way that the user need not have permissions to perform these actions. It *can* be made to be fairly precise. More specifically, we can create an internal user just for the tasks API that just has permission to interact with the `.tasks` index. This change doesn't do that, instead, it uses the ubiquitus "xpack" user which has most permissions because it is simpler. Adding the tasks user is something I'd like to get to in a follow up change. Instead, the majority of this change is about moving the "origin" concept from the security portion of x-pack into the server. This should allow any code to use the origin. To keep the change managable I've also opted to deprecate rather than remove the "origin" helpers in the security code. Removing them is almost entirely mechanical and I'd like to that in a follow up as well. Relates to #35573
1 parent 51a7dc5 commit 0588dad

File tree

20 files changed

+422
-174
lines changed

20 files changed

+422
-174
lines changed

server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/GetTaskAction.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
* Action for retrieving a list of currently running tasks
2626
*/
2727
public class GetTaskAction extends Action<GetTaskResponse> {
28+
public static final String TASKS_ORIGIN = "tasks";
2829

2930
public static final GetTaskAction INSTANCE = new GetTaskAction();
3031
public static final String NAME = "cluster:monitor/task/get";

server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.elasticsearch.action.support.ActionFilters;
2929
import org.elasticsearch.action.support.HandledTransportAction;
3030
import org.elasticsearch.client.Client;
31+
import org.elasticsearch.client.OriginSettingClient;
3132
import org.elasticsearch.cluster.node.DiscoveryNode;
3233
import org.elasticsearch.cluster.service.ClusterService;
3334
import org.elasticsearch.common.inject.Inject;
@@ -51,6 +52,7 @@
5152

5253
import java.io.IOException;
5354

55+
import static org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskAction.TASKS_ORIGIN;
5456
import static org.elasticsearch.action.admin.cluster.node.tasks.list.TransportListTasksAction.waitForCompletionTimeout;
5557

5658
/**
@@ -77,7 +79,7 @@ public TransportGetTaskAction(ThreadPool threadPool, TransportService transportS
7779
this.threadPool = threadPool;
7880
this.clusterService = clusterService;
7981
this.transportService = transportService;
80-
this.client = client;
82+
this.client = new OriginSettingClient(client, TASKS_ORIGIN);
8183
this.xContentRegistry = xContentRegistry;
8284
}
8385

@@ -210,6 +212,7 @@ void getFinishedTaskFromIndex(Task thisTask, GetTaskRequest request, ActionListe
210212
GetRequest get = new GetRequest(TaskResultsService.TASK_INDEX, TaskResultsService.TASK_TYPE,
211213
request.getTaskId().toString());
212214
get.setParentTask(clusterService.localNode().getId(), thisTask.getId());
215+
213216
client.get(get, new ActionListener<GetResponse>() {
214217
@Override
215218
public void onResponse(GetResponse getResponse) {
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.client;
21+
22+
import org.elasticsearch.action.Action;
23+
import org.elasticsearch.action.ActionListener;
24+
import org.elasticsearch.action.ActionRequest;
25+
import org.elasticsearch.action.ActionResponse;
26+
import org.elasticsearch.action.support.ContextPreservingActionListener;
27+
import org.elasticsearch.common.util.concurrent.ThreadContext;
28+
29+
import java.util.function.Supplier;
30+
31+
/**
32+
* A {@linkplain Client} that sends requests with the
33+
* {@link ThreadContext#stashWithOrigin origin} set to a particular
34+
* value and calls its {@linkplain ActionListener} in its original
35+
* {@link ThreadContext}.
36+
*/
37+
public final class OriginSettingClient extends FilterClient {
38+
39+
private final String origin;
40+
41+
public OriginSettingClient(Client in, String origin) {
42+
super(in);
43+
this.origin = origin;
44+
}
45+
46+
@Override
47+
protected <Request extends ActionRequest, Response extends ActionResponse>
48+
void doExecute(Action<Response> action, Request request, ActionListener<Response> listener) {
49+
final Supplier<ThreadContext.StoredContext> supplier = in().threadPool().getThreadContext().newRestorableContext(false);
50+
try (ThreadContext.StoredContext ignore = in().threadPool().getThreadContext().stashWithOrigin(origin)) {
51+
super.doExecute(action, request, new ContextPreservingActionListener<>(supplier, listener));
52+
}
53+
}
54+
}

server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import org.apache.logging.log4j.LogManager;
2323
import org.apache.lucene.util.CloseableThreadLocal;
2424
import org.elasticsearch.ExceptionsHelper;
25+
import org.elasticsearch.action.support.ContextPreservingActionListener;
26+
import org.elasticsearch.client.OriginSettingClient;
2527
import org.elasticsearch.common.io.stream.StreamInput;
2628
import org.elasticsearch.common.io.stream.StreamOutput;
2729
import org.elasticsearch.common.io.stream.Writeable;
@@ -85,6 +87,12 @@ public final class ThreadContext implements Closeable, Writeable {
8587

8688
public static final String PREFIX = "request.headers";
8789
public static final Setting<Settings> DEFAULT_HEADERS_SETTING = Setting.groupSetting(PREFIX + ".", Property.NodeScope);
90+
91+
/**
92+
* Name for the {@link #stashWithOrigin origin} attribute.
93+
*/
94+
public static final String ACTION_ORIGIN_TRANSIENT_NAME = "action.origin";
95+
8896
private static final Logger logger = LogManager.getLogger(ThreadContext.class);
8997
private static final ThreadContextStruct DEFAULT_CONTEXT = new ThreadContextStruct();
9098
private final Map<String, String> defaultHeader;
@@ -119,14 +127,39 @@ public void close() throws IOException {
119127

120128
/**
121129
* Removes the current context and resets a default context. The removed context can be
122-
* restored when closing the returned {@link StoredContext}
130+
* restored by closing the returned {@link StoredContext}.
123131
*/
124132
public StoredContext stashContext() {
125133
final ThreadContextStruct context = threadLocal.get();
126134
threadLocal.set(null);
127135
return () -> threadLocal.set(context);
128136
}
129137

138+
/**
139+
* Removes the current context and resets a default context marked with as
140+
* originating from the supplied string. The removed context can be
141+
* restored by closing the returned {@link StoredContext}. Callers should
142+
* be careful to save the current context before calling this method and
143+
* restore it any listeners, likely with
144+
* {@link ContextPreservingActionListener}. Use {@link OriginSettingClient}
145+
* which can be used to do this automatically.
146+
* <p>
147+
* Without security the origin is ignored, but security uses it to authorize
148+
* actions that are made up of many sub-actions. These actions call
149+
* {@link #stashWithOrigin} before performing on behalf of a user that
150+
* should be allowed even if the user doesn't have permission to perform
151+
* those actions on their own.
152+
* <p>
153+
* For example, a user might not have permission to GET from the tasks index
154+
* but the tasks API will perform a get on their behalf using this method
155+
* if it can't find the task in memory.
156+
*/
157+
public StoredContext stashWithOrigin(String origin) {
158+
final ThreadContext.StoredContext storedContext = stashContext();
159+
putTransient(ACTION_ORIGIN_TRANSIENT_NAME, origin);
160+
return storedContext;
161+
}
162+
130163
/**
131164
* Removes the current context and resets a new context that contains a merge of the current headers and the given headers.
132165
* The removed context can be restored when closing the returned {@link StoredContext}. The merge strategy is that headers

server/src/main/java/org/elasticsearch/persistent/PersistentTasksService.java

Lines changed: 5 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -25,21 +25,19 @@
2525
import org.elasticsearch.action.ActionRequest;
2626
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
2727
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
28-
import org.elasticsearch.action.support.ContextPreservingActionListener;
2928
import org.elasticsearch.client.Client;
29+
import org.elasticsearch.client.OriginSettingClient;
3030
import org.elasticsearch.cluster.ClusterState;
3131
import org.elasticsearch.cluster.ClusterStateObserver;
3232
import org.elasticsearch.cluster.service.ClusterService;
3333
import org.elasticsearch.common.Nullable;
3434
import org.elasticsearch.common.unit.TimeValue;
35-
import org.elasticsearch.common.util.concurrent.ThreadContext;
3635
import org.elasticsearch.node.NodeClosedException;
3736
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
3837
import org.elasticsearch.tasks.TaskId;
3938
import org.elasticsearch.threadpool.ThreadPool;
4039

4140
import java.util.function.Predicate;
42-
import java.util.function.Supplier;
4341

4442
/**
4543
* This service is used by persistent tasks and allocated persistent tasks to communicate changes
@@ -50,15 +48,14 @@ public class PersistentTasksService {
5048

5149
private static final Logger logger = LogManager.getLogger(PersistentTasksService.class);
5250

53-
private static final String ACTION_ORIGIN_TRANSIENT_NAME = "action.origin";
5451
private static final String PERSISTENT_TASK_ORIGIN = "persistent_tasks";
5552

5653
private final Client client;
5754
private final ClusterService clusterService;
5855
private final ThreadPool threadPool;
5956

6057
public PersistentTasksService(ClusterService clusterService, ThreadPool threadPool, Client client) {
61-
this.client = client;
58+
this.client = new OriginSettingClient(client, PERSISTENT_TASK_ORIGIN);
6259
this.clusterService = clusterService;
6360
this.threadPool = threadPool;
6461
}
@@ -98,12 +95,7 @@ void sendCancelRequest(final long taskId, final String reason, final ActionListe
9895
request.setTaskId(new TaskId(clusterService.localNode().getId(), taskId));
9996
request.setReason(reason);
10097
try {
101-
final ThreadContext threadContext = client.threadPool().getThreadContext();
102-
final Supplier<ThreadContext.StoredContext> supplier = threadContext.newRestorableContext(false);
103-
104-
try (ThreadContext.StoredContext ignore = stashWithOrigin(threadContext, PERSISTENT_TASK_ORIGIN)) {
105-
client.admin().cluster().cancelTasks(request, new ContextPreservingActionListener<>(supplier, listener));
106-
}
98+
client.admin().cluster().cancelTasks(request, listener);
10799
} catch (Exception e) {
108100
listener.onFailure(e);
109101
}
@@ -140,14 +132,8 @@ public void sendRemoveRequest(final String taskId, final ActionListener<Persiste
140132
private <Req extends ActionRequest, Resp extends PersistentTaskResponse>
141133
void execute(final Req request, final Action<Resp> action, final ActionListener<PersistentTask<?>> listener) {
142134
try {
143-
final ThreadContext threadContext = client.threadPool().getThreadContext();
144-
final Supplier<ThreadContext.StoredContext> supplier = threadContext.newRestorableContext(false);
145-
146-
try (ThreadContext.StoredContext ignore = stashWithOrigin(threadContext, PERSISTENT_TASK_ORIGIN)) {
147-
client.execute(action, request,
148-
new ContextPreservingActionListener<>(supplier,
149-
ActionListener.wrap(r -> listener.onResponse(r.getTask()), listener::onFailure)));
150-
}
135+
client.execute(action, request,
136+
ActionListener.wrap(r -> listener.onResponse(r.getTask()), listener::onFailure));
151137
} catch (Exception e) {
152138
listener.onFailure(e);
153139
}
@@ -233,10 +219,4 @@ default void onTimeout(TimeValue timeout) {
233219
onFailure(new IllegalStateException("Timed out when waiting for persistent task after " + timeout));
234220
}
235221
}
236-
237-
public static ThreadContext.StoredContext stashWithOrigin(ThreadContext threadContext, String origin) {
238-
final ThreadContext.StoredContext storedContext = threadContext.stashContext();
239-
threadContext.putTransient(ACTION_ORIGIN_TRANSIENT_NAME, origin);
240-
return storedContext;
241-
}
242222
}

server/src/main/java/org/elasticsearch/tasks/TaskResultsService.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.elasticsearch.action.index.IndexResponse;
3232
import org.elasticsearch.action.support.master.AcknowledgedResponse;
3333
import org.elasticsearch.client.Client;
34+
import org.elasticsearch.client.OriginSettingClient;
3435
import org.elasticsearch.client.Requests;
3536
import org.elasticsearch.cluster.ClusterState;
3637
import org.elasticsearch.cluster.metadata.IndexMetaData;
@@ -50,6 +51,8 @@
5051
import java.nio.charset.StandardCharsets;
5152
import java.util.Map;
5253

54+
import static org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskAction.TASKS_ORIGIN;
55+
5356
/**
5457
* Service that can store task results.
5558
*/
@@ -73,7 +76,7 @@ public class TaskResultsService {
7376

7477
@Inject
7578
public TaskResultsService(Client client, ClusterService clusterService) {
76-
this.client = client;
79+
this.client = new OriginSettingClient(client, TASKS_ORIGIN);
7780
this.clusterService = clusterService;
7881
}
7982

server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java

Lines changed: 25 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,14 @@
2727
import org.elasticsearch.action.TaskOperationFailure;
2828
import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
2929
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
30+
import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskRequest;
3031
import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskResponse;
3132
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction;
3233
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
3334
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
3435
import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeAction;
3536
import org.elasticsearch.action.admin.indices.validate.query.ValidateQueryAction;
3637
import org.elasticsearch.action.bulk.BulkAction;
37-
import org.elasticsearch.action.get.GetResponse;
3838
import org.elasticsearch.action.index.IndexAction;
3939
import org.elasticsearch.action.index.IndexResponse;
4040
import org.elasticsearch.action.search.SearchAction;
@@ -85,7 +85,6 @@
8585
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
8686
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
8787
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_HEADER_SIZE;
88-
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
8988
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
9089
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse;
9190
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows;
@@ -725,12 +724,6 @@ public void testTasksWaitForAllTask() throws Exception {
725724
}
726725

727726
public void testTaskStoringSuccesfulResult() throws Exception {
728-
// Randomly create an empty index to make sure the type is created automatically
729-
if (randomBoolean()) {
730-
logger.info("creating an empty results index with custom settings");
731-
assertAcked(client().admin().indices().prepareCreate(TaskResultsService.TASK_INDEX));
732-
}
733-
734727
registerTaskManageListeners(TestTaskPlugin.TestTaskAction.NAME); // we need this to get task id of the process
735728

736729
// Start non-blocking test task
@@ -743,23 +736,20 @@ public void testTaskStoringSuccesfulResult() throws Exception {
743736
TaskInfo taskInfo = events.get(0);
744737
TaskId taskId = taskInfo.getTaskId();
745738

746-
GetResponse resultDoc = client()
747-
.prepareGet(TaskResultsService.TASK_INDEX, TaskResultsService.TASK_TYPE, taskId.toString()).get();
748-
assertTrue(resultDoc.isExists());
749-
750-
Map<String, Object> source = resultDoc.getSource();
751-
@SuppressWarnings("unchecked")
752-
Map<String, Object> task = (Map<String, Object>) source.get("task");
753-
assertEquals(taskInfo.getTaskId().getNodeId(), task.get("node"));
754-
assertEquals(taskInfo.getAction(), task.get("action"));
755-
assertEquals(Long.toString(taskInfo.getId()), task.get("id").toString());
756-
757-
@SuppressWarnings("unchecked")
758-
Map<String, Object> result = (Map<String, Object>) source.get("response");
739+
TaskResult taskResult = client().admin().cluster()
740+
.getTask(new GetTaskRequest().setTaskId(taskId)).get().getTask();
741+
assertTrue(taskResult.isCompleted());
742+
assertNull(taskResult.getError());
743+
744+
assertEquals(taskInfo.getTaskId(), taskResult.getTask().getTaskId());
745+
assertEquals(taskInfo.getType(), taskResult.getTask().getType());
746+
assertEquals(taskInfo.getAction(), taskResult.getTask().getAction());
747+
assertEquals(taskInfo.getDescription(), taskResult.getTask().getDescription());
748+
assertEquals(taskInfo.getStartTime(), taskResult.getTask().getStartTime());
749+
assertEquals(taskInfo.getHeaders(), taskResult.getTask().getHeaders());
750+
Map<?, ?> result = taskResult.getResponseAsMap();
759751
assertEquals("0", result.get("failure_count").toString());
760752

761-
assertNull(source.get("failure"));
762-
763753
assertNoFailures(client().admin().indices().prepareRefresh(TaskResultsService.TASK_INDEX).get());
764754

765755
SearchResponse searchResponse = client().prepareSearch(TaskResultsService.TASK_INDEX)
@@ -797,25 +787,21 @@ public void testTaskStoringFailureResult() throws Exception {
797787
TaskInfo failedTaskInfo = events.get(0);
798788
TaskId failedTaskId = failedTaskInfo.getTaskId();
799789

800-
GetResponse failedResultDoc = client()
801-
.prepareGet(TaskResultsService.TASK_INDEX, TaskResultsService.TASK_TYPE, failedTaskId.toString())
802-
.get();
803-
assertTrue(failedResultDoc.isExists());
804-
805-
Map<String, Object> source = failedResultDoc.getSource();
806-
@SuppressWarnings("unchecked")
807-
Map<String, Object> task = (Map<String, Object>) source.get("task");
808-
assertEquals(failedTaskInfo.getTaskId().getNodeId(), task.get("node"));
809-
assertEquals(failedTaskInfo.getAction(), task.get("action"));
810-
assertEquals(Long.toString(failedTaskInfo.getId()), task.get("id").toString());
811-
812-
@SuppressWarnings("unchecked")
813-
Map<String, Object> error = (Map<String, Object>) source.get("error");
790+
TaskResult taskResult = client().admin().cluster()
791+
.getTask(new GetTaskRequest().setTaskId(failedTaskId)).get().getTask();
792+
assertTrue(taskResult.isCompleted());
793+
assertNull(taskResult.getResponse());
794+
795+
assertEquals(failedTaskInfo.getTaskId(), taskResult.getTask().getTaskId());
796+
assertEquals(failedTaskInfo.getType(), taskResult.getTask().getType());
797+
assertEquals(failedTaskInfo.getAction(), taskResult.getTask().getAction());
798+
assertEquals(failedTaskInfo.getDescription(), taskResult.getTask().getDescription());
799+
assertEquals(failedTaskInfo.getStartTime(), taskResult.getTask().getStartTime());
800+
assertEquals(failedTaskInfo.getHeaders(), taskResult.getTask().getHeaders());
801+
Map<?, ?> error = (Map<?, ?>) taskResult.getErrorAsMap();
814802
assertEquals("Simulating operation failure", error.get("reason"));
815803
assertEquals("illegal_state_exception", error.get("type"));
816804

817-
assertNull(source.get("result"));
818-
819805
GetTaskResponse getResponse = expectFinishedTask(failedTaskId);
820806
assertNull(getResponse.getTask().getResponse());
821807
assertEquals(error, getResponse.getTask().getErrorAsMap());

0 commit comments

Comments
 (0)