29
29
import org .elasticsearch .cluster .metadata .IndexNameExpressionResolver ;
30
30
import org .elasticsearch .cluster .service .ClusterService ;
31
31
import org .elasticsearch .common .UUIDs ;
32
+ import org .elasticsearch .common .collect .Tuple ;
32
33
import org .elasticsearch .common .inject .Inject ;
33
34
import org .elasticsearch .common .settings .Settings ;
35
+ import org .elasticsearch .common .util .concurrent .ThreadContext ;
34
36
import org .elasticsearch .common .xcontent .NamedXContentRegistry ;
35
37
import org .elasticsearch .persistent .PersistentTasksCustomMetaData ;
36
38
import org .elasticsearch .persistent .PersistentTasksService ;
39
41
import org .elasticsearch .threadpool .ThreadPool ;
40
42
import org .elasticsearch .transport .TransportService ;
41
43
44
+ import java .util .List ;
45
+ import java .util .Map ;
42
46
import java .util .function .Predicate ;
47
+ import java .util .stream .Collectors ;
43
48
44
49
public class TransportStartReindexTaskAction
45
50
extends HandledTransportAction <StartReindexTaskAction .Request , StartReindexTaskAction .Response > {
46
51
52
+ private final List <String > headersToInclude ;
47
53
private final ThreadPool threadPool ;
48
54
private final PersistentTasksService persistentTasksService ;
49
55
private final ReindexValidator reindexValidator ;
@@ -55,6 +61,7 @@ public TransportStartReindexTaskAction(Settings settings, Client client, Transpo
55
61
ClusterService clusterService , PersistentTasksService persistentTasksService ,
56
62
AutoCreateIndex autoCreateIndex , NamedXContentRegistry xContentRegistry ) {
57
63
super (StartReindexTaskAction .NAME , transportService , actionFilters , StartReindexTaskAction .Request ::new );
64
+ this .headersToInclude = ReindexHeaders .REINDEX_INCLUDED_HEADERS .get (settings );
58
65
this .threadPool = threadPool ;
59
66
this .reindexValidator = new ReindexValidator (settings , clusterService , indexNameExpressionResolver , autoCreateIndex );
60
67
this .persistentTasksService = persistentTasksService ;
@@ -72,9 +79,15 @@ protected void doExecute(Task task, StartReindexTaskAction.Request request, Acti
72
79
73
80
String generatedId = UUIDs .randomBase64UUID ();
74
81
82
+ ThreadContext threadContext = threadPool .getThreadContext ();
83
+ Map <String , String > included = headersToInclude .stream ()
84
+ .map (header -> new Tuple <>(header , threadContext .getHeader (header )))
85
+ .filter (t -> t .v2 () != null )
86
+ .collect (Collectors .toMap (Tuple ::v1 , Tuple ::v2 ));
87
+
75
88
// In the current implementation, we only need to store task results if we do not wait for completion
76
89
boolean storeTaskResult = request .getWaitForCompletion () == false ;
77
- ReindexTaskParams job = new ReindexTaskParams (storeTaskResult , threadPool . getThreadContext (). getHeaders () );
90
+ ReindexTaskParams job = new ReindexTaskParams (storeTaskResult , included );
78
91
79
92
ReindexTaskStateDoc reindexState = new ReindexTaskStateDoc (request .getReindexRequest ());
80
93
reindexIndexClient .createReindexTaskDoc (generatedId , reindexState , new ActionListener <>() {
0 commit comments