62
62
import org .elasticsearch .common .util .concurrent .AtomicArray ;
63
63
import org .elasticsearch .index .Index ;
64
64
import org .elasticsearch .index .IndexNotFoundException ;
65
- import org .elasticsearch .index .VersionType ;
66
65
import org .elasticsearch .index .IndexingPressure ;
66
+ import org .elasticsearch .index .VersionType ;
67
67
import org .elasticsearch .index .seqno .SequenceNumbers ;
68
68
import org .elasticsearch .index .shard .ShardId ;
69
69
import org .elasticsearch .indices .IndexClosedException ;
70
+ import org .elasticsearch .indices .SystemIndices ;
70
71
import org .elasticsearch .ingest .IngestService ;
71
72
import org .elasticsearch .node .NodeClosedException ;
72
73
import org .elasticsearch .tasks .Task ;
73
74
import org .elasticsearch .threadpool .ThreadPool ;
75
+ import org .elasticsearch .threadpool .ThreadPool .Names ;
74
76
import org .elasticsearch .transport .TransportService ;
75
77
76
78
import java .util .ArrayList ;
82
84
import java .util .Map ;
83
85
import java .util .Objects ;
84
86
import java .util .Set ;
87
+ import java .util .SortedMap ;
85
88
import java .util .concurrent .TimeUnit ;
86
89
import java .util .concurrent .atomic .AtomicInteger ;
87
90
import java .util .concurrent .atomic .AtomicIntegerArray ;
@@ -112,22 +115,24 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
112
115
private final IndexNameExpressionResolver indexNameExpressionResolver ;
113
116
private static final String DROPPED_ITEM_WITH_AUTO_GENERATED_ID = "auto-generated" ;
114
117
private final IndexingPressure indexingPressure ;
118
+ private final SystemIndices systemIndices ;
115
119
116
120
@ Inject
117
121
public TransportBulkAction (ThreadPool threadPool , TransportService transportService ,
118
122
ClusterService clusterService , IngestService ingestService ,
119
- TransportShardBulkAction shardBulkAction , NodeClient client ,
120
- ActionFilters actionFilters , IndexNameExpressionResolver indexNameExpressionResolver ,
121
- AutoCreateIndex autoCreateIndex , IndexingPressure indexingPressure ) {
123
+ TransportShardBulkAction shardBulkAction , NodeClient client , ActionFilters actionFilters ,
124
+ IndexNameExpressionResolver indexNameExpressionResolver ,
125
+ AutoCreateIndex autoCreateIndex , IndexingPressure indexingPressure , SystemIndices systemIndices ) {
122
126
this (threadPool , transportService , clusterService , ingestService , shardBulkAction , client , actionFilters ,
123
- indexNameExpressionResolver , autoCreateIndex , indexingPressure , System ::nanoTime );
127
+ indexNameExpressionResolver , autoCreateIndex , indexingPressure , systemIndices , System ::nanoTime );
124
128
}
125
129
126
130
public TransportBulkAction (ThreadPool threadPool , TransportService transportService ,
127
131
ClusterService clusterService , IngestService ingestService ,
128
132
TransportShardBulkAction shardBulkAction , NodeClient client ,
129
133
ActionFilters actionFilters , IndexNameExpressionResolver indexNameExpressionResolver ,
130
- AutoCreateIndex autoCreateIndex , IndexingPressure indexingPressure , LongSupplier relativeTimeProvider ) {
134
+ AutoCreateIndex autoCreateIndex , IndexingPressure indexingPressure , SystemIndices systemIndices ,
135
+ LongSupplier relativeTimeProvider ) {
131
136
super (BulkAction .NAME , transportService , actionFilters , BulkRequest ::new , ThreadPool .Names .SAME );
132
137
Objects .requireNonNull (relativeTimeProvider );
133
138
this .threadPool = threadPool ;
@@ -140,6 +145,7 @@ public TransportBulkAction(ThreadPool threadPool, TransportService transportServ
140
145
this .client = client ;
141
146
this .indexNameExpressionResolver = indexNameExpressionResolver ;
142
147
this .indexingPressure = indexingPressure ;
148
+ this .systemIndices = systemIndices ;
143
149
clusterService .addStateApplier (this .ingestForwarder );
144
150
}
145
151
@@ -163,17 +169,19 @@ public static IndexRequest getIndexWriteRequest(DocWriteRequest<?> docWriteReque
163
169
164
170
@ Override
165
171
protected void doExecute (Task task , BulkRequest bulkRequest , ActionListener <BulkResponse > listener ) {
166
- long indexingBytes = bulkRequest .ramBytesUsed ();
167
- final Releasable releasable = indexingPressure .markCoordinatingOperationStarted (indexingBytes );
172
+ final long indexingBytes = bulkRequest .ramBytesUsed ();
173
+ final boolean isOnlySystem = isOnlySystem (bulkRequest , clusterService .state ().metadata ().getIndicesLookup (), systemIndices );
174
+ final Releasable releasable = indexingPressure .markCoordinatingOperationStarted (indexingBytes , isOnlySystem );
168
175
final ActionListener <BulkResponse > releasingListener = ActionListener .runBefore (listener , releasable ::close );
176
+ final String executorName = isOnlySystem ? Names .SYSTEM_WRITE : Names .WRITE ;
169
177
try {
170
- doInternalExecute (task , bulkRequest , releasingListener );
178
+ doInternalExecute (task , bulkRequest , executorName , releasingListener );
171
179
} catch (Exception e ) {
172
180
releasingListener .onFailure (e );
173
181
}
174
182
}
175
183
176
- protected void doInternalExecute (Task task , BulkRequest bulkRequest , ActionListener <BulkResponse > listener ) {
184
+ protected void doInternalExecute (Task task , BulkRequest bulkRequest , String executorName , ActionListener <BulkResponse > listener ) {
177
185
final long startTime = relativeTime ();
178
186
final AtomicArray <BulkItemResponse > responses = new AtomicArray <>(bulkRequest .requests .size ());
179
187
@@ -211,7 +219,7 @@ protected void doInternalExecute(Task task, BulkRequest bulkRequest, ActionListe
211
219
assert arePipelinesResolved : bulkRequest ;
212
220
}
213
221
if (clusterService .localNode ().isIngestNode ()) {
214
- processBulkIndexIngestRequest (task , bulkRequest , listener );
222
+ processBulkIndexIngestRequest (task , bulkRequest , executorName , listener );
215
223
} else {
216
224
ingestForwarder .forwardIngestRequest (BulkAction .INSTANCE , bulkRequest , listener );
217
225
}
@@ -261,7 +269,7 @@ protected void doInternalExecute(Task task, BulkRequest bulkRequest, ActionListe
261
269
@ Override
262
270
public void onResponse (CreateIndexResponse result ) {
263
271
if (counter .decrementAndGet () == 0 ) {
264
- threadPool .executor (ThreadPool . Names . WRITE ).execute (
272
+ threadPool .executor (executorName ).execute (
265
273
() -> executeBulk (task , bulkRequest , startTime , listener , responses , indicesThatCannotBeCreated ));
266
274
}
267
275
}
@@ -278,10 +286,11 @@ public void onFailure(Exception e) {
278
286
}
279
287
}
280
288
if (counter .decrementAndGet () == 0 ) {
281
- executeBulk (task , bulkRequest , startTime , ActionListener .wrap (listener ::onResponse , inner -> {
289
+ threadPool .executor (executorName ).execute (() -> executeBulk (task , bulkRequest , startTime ,
290
+ ActionListener .wrap (listener ::onResponse , inner -> {
282
291
inner .addSuppressed (e );
283
292
listener .onFailure (inner );
284
- }), responses , indicesThatCannotBeCreated );
293
+ }), responses , indicesThatCannotBeCreated )) ;
285
294
}
286
295
}
287
296
});
@@ -342,6 +351,18 @@ static void prohibitCustomRoutingOnDataStream(DocWriteRequest<?> writeRequest, M
342
351
}
343
352
}
344
353
354
+ boolean isOnlySystem (BulkRequest request , SortedMap <String , IndexAbstraction > indicesLookup , SystemIndices systemIndices ) {
355
+ final boolean onlySystem = request .getIndices ().stream ().allMatch (indexName -> {
356
+ final IndexAbstraction abstraction = indicesLookup .get (indexName );
357
+ if (abstraction != null ) {
358
+ return abstraction .isSystem ();
359
+ } else {
360
+ return systemIndices .isSystemIndex (indexName );
361
+ }
362
+ });
363
+ return onlySystem ;
364
+ }
365
+
345
366
boolean needToCheck () {
346
367
return autoCreateIndex .needToCheck ();
347
368
}
@@ -662,7 +683,8 @@ private long relativeTime() {
662
683
return relativeTimeProvider .getAsLong ();
663
684
}
664
685
665
- private void processBulkIndexIngestRequest (Task task , BulkRequest original , ActionListener <BulkResponse > listener ) {
686
+ private void processBulkIndexIngestRequest (Task task , BulkRequest original , String executorName ,
687
+ ActionListener <BulkResponse > listener ) {
666
688
final long ingestStartTimeInNanos = System .nanoTime ();
667
689
final BulkRequestModifier bulkRequestModifier = new BulkRequestModifier (original );
668
690
ingestService .executeBulkRequest (
@@ -687,18 +709,18 @@ private void processBulkIndexIngestRequest(Task task, BulkRequest original, Acti
687
709
// If a processor went async and returned a response on a different thread then
688
710
// before we continue the bulk request we should fork back on a write thread:
689
711
if (originalThread == Thread .currentThread ()) {
690
- assert Thread .currentThread ().getName ().contains (ThreadPool . Names . WRITE );
691
- doInternalExecute (task , bulkRequest , actionListener );
712
+ assert Thread .currentThread ().getName ().contains (executorName );
713
+ doInternalExecute (task , bulkRequest , executorName , actionListener );
692
714
} else {
693
- threadPool .executor (ThreadPool . Names . WRITE ).execute (new AbstractRunnable () {
715
+ threadPool .executor (executorName ).execute (new AbstractRunnable () {
694
716
@ Override
695
717
public void onFailure (Exception e ) {
696
718
listener .onFailure (e );
697
719
}
698
720
699
721
@ Override
700
722
protected void doRun () throws Exception {
701
- doInternalExecute (task , bulkRequest , actionListener );
723
+ doInternalExecute (task , bulkRequest , executorName , actionListener );
702
724
}
703
725
704
726
@ Override
@@ -714,7 +736,8 @@ public boolean isForceExecution() {
714
736
}
715
737
}
716
738
},
717
- bulkRequestModifier ::markItemAsDropped
739
+ bulkRequestModifier ::markItemAsDropped ,
740
+ executorName
718
741
);
719
742
}
720
743
0 commit comments