Skip to content

Commit ef69ce7

Browse files
committed
Tasks: Only require task permissions
Right now using the `GET /_tasks/<taskid>` API and causing a task to opt in to saving its result after being completed requires permissions on the `.tasks` index. When we built this we thought that that was fine, but we've since moved towards not leaking details like "persisting task results after the task is completed is done by saving them into an index named `.tasks`." A more modern way of doing this would be to save the tasks into the index "under the hood" and to have APIs to manage the saved tasks. This is the first step down that road: it drops the requirement to have permissions to interact with the `.tasks` index when fetching task statuses and when persisting statuses beyond the lifetime of the task. In particular, this moves the concept of the "origin" of an action into a more prominent place in the Elasticsearch server. The origin of an action is ignored by the server, but the security plugin uses the origin to make requests on behalf of a user in such a way that the user need not have permissions to perform these actions. It *can* be made to be fairly precise. More specifically, we can create an internal user just for the tasks API that just has permission to interact with the `.tasks` index. This change doesn't do that, instead, it uses the ubiquitus "xpack" user which has most permissions because it is simpler. Adding the tasks user is something I'd like to get to in a follow up change. Instead, the majority of this change is about moving the "origin" concept from the security portion of x-pack into the server. This should allow any code to use the origin. To keep the change managable I've also opted to deprecate rather than remove the "origin" helpers in the security code. Removing them is almost entirely mechanical and I'd like to that in a follow up as well. Relates to elastic#35573
1 parent d62bbca commit ef69ce7

File tree

20 files changed

+420
-174
lines changed

20 files changed

+420
-174
lines changed

server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/GetTaskAction.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
* Action for retrieving a list of currently running tasks
2626
*/
2727
public class GetTaskAction extends Action<GetTaskResponse> {
28+
public static final String TASKS_ORIGIN = "tasks";
2829

2930
public static final GetTaskAction INSTANCE = new GetTaskAction();
3031
public static final String NAME = "cluster:monitor/task/get";

server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.elasticsearch.action.support.ActionFilters;
2929
import org.elasticsearch.action.support.HandledTransportAction;
3030
import org.elasticsearch.client.Client;
31+
import org.elasticsearch.client.OriginSettingClient;
3132
import org.elasticsearch.cluster.node.DiscoveryNode;
3233
import org.elasticsearch.cluster.service.ClusterService;
3334
import org.elasticsearch.common.inject.Inject;
@@ -51,6 +52,7 @@
5152

5253
import java.io.IOException;
5354

55+
import static org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskAction.TASKS_ORIGIN;
5456
import static org.elasticsearch.action.admin.cluster.node.tasks.list.TransportListTasksAction.waitForCompletionTimeout;
5557

5658
/**
@@ -210,7 +212,7 @@ void getFinishedTaskFromIndex(Task thisTask, GetTaskRequest request, ActionListe
210212
GetRequest get = new GetRequest(TaskResultsService.TASK_INDEX, TaskResultsService.TASK_TYPE,
211213
request.getTaskId().toString());
212214
get.setParentTask(clusterService.localNode().getId(), thisTask.getId());
213-
client.get(get, new ActionListener<GetResponse>() {
215+
new OriginSettingClient(client, TASKS_ORIGIN).get(get, new ActionListener<GetResponse>() {
214216
@Override
215217
public void onResponse(GetResponse getResponse) {
216218
try {
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.client;
21+
22+
import org.elasticsearch.action.Action;
23+
import org.elasticsearch.action.ActionListener;
24+
import org.elasticsearch.action.ActionRequest;
25+
import org.elasticsearch.action.ActionResponse;
26+
import org.elasticsearch.action.support.ContextPreservingActionListener;
27+
import org.elasticsearch.common.util.concurrent.ThreadContext;
28+
29+
import java.util.function.Supplier;
30+
31+
/**
32+
* A {@linkplain Client} that sends requests with the
33+
* {@link ThreadContext#stashWithOrigin origin} set to a particular
34+
* value and calls its {@linkplain ActionListener} in its original
35+
* {@link ThreadContext}.
36+
*/
37+
public final class OriginSettingClient extends FilterClient {
38+
39+
private final String origin;
40+
41+
public OriginSettingClient(Client in, String origin) {
42+
super(in);
43+
this.origin = origin;
44+
}
45+
46+
@Override
47+
protected <Request extends ActionRequest, Response extends ActionResponse>
48+
void doExecute(Action<Response> action, Request request, ActionListener<Response> listener) {
49+
final Supplier<ThreadContext.StoredContext> supplier = in().threadPool().getThreadContext().newRestorableContext(false);
50+
try (ThreadContext.StoredContext ignore = in().threadPool().getThreadContext().stashWithOrigin(origin)) {
51+
super.doExecute(action, request, new ContextPreservingActionListener<>(supplier, listener));
52+
}
53+
}
54+
}

server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import org.apache.logging.log4j.LogManager;
2323
import org.apache.lucene.util.CloseableThreadLocal;
2424
import org.elasticsearch.ExceptionsHelper;
25+
import org.elasticsearch.action.support.ContextPreservingActionListener;
26+
import org.elasticsearch.client.OriginSettingClient;
2527
import org.elasticsearch.common.io.stream.StreamInput;
2628
import org.elasticsearch.common.io.stream.StreamOutput;
2729
import org.elasticsearch.common.io.stream.Writeable;
@@ -85,6 +87,12 @@ public final class ThreadContext implements Closeable, Writeable {
8587

8688
public static final String PREFIX = "request.headers";
8789
public static final Setting<Settings> DEFAULT_HEADERS_SETTING = Setting.groupSetting(PREFIX + ".", Property.NodeScope);
90+
91+
/**
92+
* Name for the {@link #stashWithOrigin origin} attribute.
93+
*/
94+
public static final String ACTION_ORIGIN_TRANSIENT_NAME = "action.origin";
95+
8896
private static final Logger logger = LogManager.getLogger(ThreadContext.class);
8997
private static final ThreadContextStruct DEFAULT_CONTEXT = new ThreadContextStruct();
9098
private final Map<String, String> defaultHeader;
@@ -119,14 +127,39 @@ public void close() throws IOException {
119127

120128
/**
121129
* Removes the current context and resets a default context. The removed context can be
122-
* restored when closing the returned {@link StoredContext}
130+
* restored by closing the returned {@link StoredContext}.
123131
*/
124132
public StoredContext stashContext() {
125133
final ThreadContextStruct context = threadLocal.get();
126134
threadLocal.set(null);
127135
return () -> threadLocal.set(context);
128136
}
129137

138+
/**
139+
* Removes the current context and resets a default context marked with as
140+
* originating from the supplied string. The removed context can be
141+
* restored by closing the returned {@link StoredContext}. Callers should
142+
* be careful to save the current context before calling this method and
143+
* restore it any listeners, likely with
144+
* {@link ContextPreservingActionListener}. Use {@link OriginSettingClient}
145+
* which can be used to do this automatically.
146+
* <p>
147+
* Without security the origin is ignored, but security uses it to authorize
148+
* actions that are made up of many sub-actions. These actions call
149+
* {@link #stashWithOrigin} before performing on behalf of a user that
150+
* should be allowed even if the user doesn't have permission to perform
151+
* those actions on their own.
152+
* <p>
153+
* For example, a user might not have permission to GET from the tasks index
154+
* but the tasks API will perform a get on their behalf using this method
155+
* if it can't find the task in memory.
156+
*/
157+
public StoredContext stashWithOrigin(String origin) {
158+
final ThreadContext.StoredContext storedContext = stashContext();
159+
putTransient(ACTION_ORIGIN_TRANSIENT_NAME, origin);
160+
return storedContext;
161+
}
162+
130163
/**
131164
* Removes the current context and resets a new context that contains a merge of the current headers and the given headers.
132165
* The removed context can be restored when closing the returned {@link StoredContext}. The merge strategy is that headers

server/src/main/java/org/elasticsearch/persistent/PersistentTasksService.java

Lines changed: 5 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -23,22 +23,20 @@
2323
import org.elasticsearch.action.ActionRequest;
2424
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
2525
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
26-
import org.elasticsearch.action.support.ContextPreservingActionListener;
2726
import org.elasticsearch.client.Client;
27+
import org.elasticsearch.client.OriginSettingClient;
2828
import org.elasticsearch.cluster.ClusterState;
2929
import org.elasticsearch.cluster.ClusterStateObserver;
3030
import org.elasticsearch.cluster.service.ClusterService;
3131
import org.elasticsearch.common.Nullable;
3232
import org.elasticsearch.common.component.AbstractComponent;
3333
import org.elasticsearch.common.unit.TimeValue;
34-
import org.elasticsearch.common.util.concurrent.ThreadContext;
3534
import org.elasticsearch.node.NodeClosedException;
3635
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
3736
import org.elasticsearch.tasks.TaskId;
3837
import org.elasticsearch.threadpool.ThreadPool;
3938

4039
import java.util.function.Predicate;
41-
import java.util.function.Supplier;
4240

4341
/**
4442
* This service is used by persistent tasks and allocated persistent tasks to communicate changes
@@ -47,15 +45,14 @@
4745
*/
4846
public class PersistentTasksService extends AbstractComponent {
4947

50-
private static final String ACTION_ORIGIN_TRANSIENT_NAME = "action.origin";
5148
private static final String PERSISTENT_TASK_ORIGIN = "persistent_tasks";
5249

5350
private final Client client;
5451
private final ClusterService clusterService;
5552
private final ThreadPool threadPool;
5653

5754
public PersistentTasksService(ClusterService clusterService, ThreadPool threadPool, Client client) {
58-
this.client = client;
55+
this.client = new OriginSettingClient(client, PERSISTENT_TASK_ORIGIN);
5956
this.clusterService = clusterService;
6057
this.threadPool = threadPool;
6158
}
@@ -95,12 +92,7 @@ void sendCancelRequest(final long taskId, final String reason, final ActionListe
9592
request.setTaskId(new TaskId(clusterService.localNode().getId(), taskId));
9693
request.setReason(reason);
9794
try {
98-
final ThreadContext threadContext = client.threadPool().getThreadContext();
99-
final Supplier<ThreadContext.StoredContext> supplier = threadContext.newRestorableContext(false);
100-
101-
try (ThreadContext.StoredContext ignore = stashWithOrigin(threadContext, PERSISTENT_TASK_ORIGIN)) {
102-
client.admin().cluster().cancelTasks(request, new ContextPreservingActionListener<>(supplier, listener));
103-
}
95+
client.admin().cluster().cancelTasks(request, listener);
10496
} catch (Exception e) {
10597
listener.onFailure(e);
10698
}
@@ -137,14 +129,8 @@ public void sendRemoveRequest(final String taskId, final ActionListener<Persiste
137129
private <Req extends ActionRequest, Resp extends PersistentTaskResponse>
138130
void execute(final Req request, final Action<Resp> action, final ActionListener<PersistentTask<?>> listener) {
139131
try {
140-
final ThreadContext threadContext = client.threadPool().getThreadContext();
141-
final Supplier<ThreadContext.StoredContext> supplier = threadContext.newRestorableContext(false);
142-
143-
try (ThreadContext.StoredContext ignore = stashWithOrigin(threadContext, PERSISTENT_TASK_ORIGIN)) {
144-
client.execute(action, request,
145-
new ContextPreservingActionListener<>(supplier,
146-
ActionListener.wrap(r -> listener.onResponse(r.getTask()), listener::onFailure)));
147-
}
132+
client.execute(action, request,
133+
ActionListener.wrap(r -> listener.onResponse(r.getTask()), listener::onFailure));
148134
} catch (Exception e) {
149135
listener.onFailure(e);
150136
}
@@ -230,10 +216,4 @@ default void onTimeout(TimeValue timeout) {
230216
onFailure(new IllegalStateException("Timed out when waiting for persistent task after " + timeout));
231217
}
232218
}
233-
234-
public static ThreadContext.StoredContext stashWithOrigin(ThreadContext threadContext, String origin) {
235-
final ThreadContext.StoredContext storedContext = threadContext.stashContext();
236-
threadContext.putTransient(ACTION_ORIGIN_TRANSIENT_NAME, origin);
237-
return storedContext;
238-
}
239219
}

server/src/main/java/org/elasticsearch/tasks/TaskResultsService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.elasticsearch.action.index.IndexResponse;
3030
import org.elasticsearch.action.support.master.AcknowledgedResponse;
3131
import org.elasticsearch.client.Client;
32+
import org.elasticsearch.client.OriginSettingClient;
3233
import org.elasticsearch.client.Requests;
3334
import org.elasticsearch.cluster.ClusterState;
3435
import org.elasticsearch.cluster.metadata.IndexMetaData;
@@ -70,7 +71,7 @@ public class TaskResultsService extends AbstractComponent {
7071

7172
@Inject
7273
public TaskResultsService(Client client, ClusterService clusterService) {
73-
this.client = client;
74+
this.client = new OriginSettingClient(client, "tasks");
7475
this.clusterService = clusterService;
7576
}
7677

server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java

Lines changed: 25 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,14 @@
2727
import org.elasticsearch.action.TaskOperationFailure;
2828
import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
2929
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
30+
import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskRequest;
3031
import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskResponse;
3132
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction;
3233
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
3334
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
3435
import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeAction;
3536
import org.elasticsearch.action.admin.indices.validate.query.ValidateQueryAction;
3637
import org.elasticsearch.action.bulk.BulkAction;
37-
import org.elasticsearch.action.get.GetResponse;
3838
import org.elasticsearch.action.index.IndexAction;
3939
import org.elasticsearch.action.index.IndexResponse;
4040
import org.elasticsearch.action.search.SearchAction;
@@ -85,7 +85,6 @@
8585
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
8686
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
8787
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_HEADER_SIZE;
88-
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
8988
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
9089
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse;
9190
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows;
@@ -725,12 +724,6 @@ public void testTasksWaitForAllTask() throws Exception {
725724
}
726725

727726
public void testTaskStoringSuccesfulResult() throws Exception {
728-
// Randomly create an empty index to make sure the type is created automatically
729-
if (randomBoolean()) {
730-
logger.info("creating an empty results index with custom settings");
731-
assertAcked(client().admin().indices().prepareCreate(TaskResultsService.TASK_INDEX));
732-
}
733-
734727
registerTaskManageListeners(TestTaskPlugin.TestTaskAction.NAME); // we need this to get task id of the process
735728

736729
// Start non-blocking test task
@@ -743,23 +736,20 @@ public void testTaskStoringSuccesfulResult() throws Exception {
743736
TaskInfo taskInfo = events.get(0);
744737
TaskId taskId = taskInfo.getTaskId();
745738

746-
GetResponse resultDoc = client()
747-
.prepareGet(TaskResultsService.TASK_INDEX, TaskResultsService.TASK_TYPE, taskId.toString()).get();
748-
assertTrue(resultDoc.isExists());
749-
750-
Map<String, Object> source = resultDoc.getSource();
751-
@SuppressWarnings("unchecked")
752-
Map<String, Object> task = (Map<String, Object>) source.get("task");
753-
assertEquals(taskInfo.getTaskId().getNodeId(), task.get("node"));
754-
assertEquals(taskInfo.getAction(), task.get("action"));
755-
assertEquals(Long.toString(taskInfo.getId()), task.get("id").toString());
756-
757-
@SuppressWarnings("unchecked")
758-
Map<String, Object> result = (Map<String, Object>) source.get("response");
739+
TaskResult taskResult = client().admin().cluster()
740+
.getTask(new GetTaskRequest().setTaskId(taskId)).get().getTask();
741+
assertTrue(taskResult.isCompleted());
742+
assertNull(taskResult.getError());
743+
744+
assertEquals(taskInfo.getTaskId(), taskResult.getTask().getTaskId());
745+
assertEquals(taskInfo.getType(), taskResult.getTask().getType());
746+
assertEquals(taskInfo.getAction(), taskResult.getTask().getAction());
747+
assertEquals(taskInfo.getDescription(), taskResult.getTask().getDescription());
748+
assertEquals(taskInfo.getStartTime(), taskResult.getTask().getStartTime());
749+
assertEquals(taskInfo.getHeaders(), taskResult.getTask().getHeaders());
750+
Map<?, ?> result = taskResult.getResponseAsMap();
759751
assertEquals("0", result.get("failure_count").toString());
760752

761-
assertNull(source.get("failure"));
762-
763753
assertNoFailures(client().admin().indices().prepareRefresh(TaskResultsService.TASK_INDEX).get());
764754

765755
SearchResponse searchResponse = client().prepareSearch(TaskResultsService.TASK_INDEX)
@@ -797,25 +787,21 @@ public void testTaskStoringFailureResult() throws Exception {
797787
TaskInfo failedTaskInfo = events.get(0);
798788
TaskId failedTaskId = failedTaskInfo.getTaskId();
799789

800-
GetResponse failedResultDoc = client()
801-
.prepareGet(TaskResultsService.TASK_INDEX, TaskResultsService.TASK_TYPE, failedTaskId.toString())
802-
.get();
803-
assertTrue(failedResultDoc.isExists());
804-
805-
Map<String, Object> source = failedResultDoc.getSource();
806-
@SuppressWarnings("unchecked")
807-
Map<String, Object> task = (Map<String, Object>) source.get("task");
808-
assertEquals(failedTaskInfo.getTaskId().getNodeId(), task.get("node"));
809-
assertEquals(failedTaskInfo.getAction(), task.get("action"));
810-
assertEquals(Long.toString(failedTaskInfo.getId()), task.get("id").toString());
811-
812-
@SuppressWarnings("unchecked")
813-
Map<String, Object> error = (Map<String, Object>) source.get("error");
790+
TaskResult taskResult = client().admin().cluster()
791+
.getTask(new GetTaskRequest().setTaskId(failedTaskId)).get().getTask();
792+
assertTrue(taskResult.isCompleted());
793+
assertNull(taskResult.getResponse());
794+
795+
assertEquals(failedTaskInfo.getTaskId(), taskResult.getTask().getTaskId());
796+
assertEquals(failedTaskInfo.getType(), taskResult.getTask().getType());
797+
assertEquals(failedTaskInfo.getAction(), taskResult.getTask().getAction());
798+
assertEquals(failedTaskInfo.getDescription(), taskResult.getTask().getDescription());
799+
assertEquals(failedTaskInfo.getStartTime(), taskResult.getTask().getStartTime());
800+
assertEquals(failedTaskInfo.getHeaders(), taskResult.getTask().getHeaders());
801+
Map<?, ?> error = (Map<?, ?>) taskResult.getErrorAsMap();
814802
assertEquals("Simulating operation failure", error.get("reason"));
815803
assertEquals("illegal_state_exception", error.get("type"));
816804

817-
assertNull(source.get("result"));
818-
819805
GetTaskResponse getResponse = expectFinishedTask(failedTaskId);
820806
assertNull(getResponse.getTask().getResponse());
821807
assertEquals(error, getResponse.getTask().getErrorAsMap());

0 commit comments

Comments
 (0)