Skip to content

Commit 7ec9d1b

Browse files
committed
Allow dropping documents with auto-generated ID (#46773)
When using auto-generated IDs + the ingest drop processor (which looks to be used by filebeat as well) + coordinating nodes that do not have the ingest processor functionality, this can lead to a NullPointerException. The issue is that markCurrentItemAsDropped() is creating an UpdateResponse with no id when the request contains auto-generated IDs. The response serialization is lenient for our REST/XContent format (i.e. we will send "id" : null) but the internal transport format (used for communication between nodes) assumes for this field to be non-null, which means that it can't be serialized between nodes. Bulk requests with ingest functionality are processed on the coordinating node if the node has the ingest capability, and only otherwise sent to a different node. This means that, in order to reproduce this, one needs two nodes, with the coordinating node not having the ingest functionality. Closes #46678
1 parent d0831a2 commit 7ec9d1b

File tree

16 files changed

+105
-36
lines changed

16 files changed

+105
-36
lines changed

modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -822,7 +822,7 @@ RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>>
822822
new IndexResponse(
823823
shardId,
824824
index.type(),
825-
index.id(),
825+
index.id() == null ? "dummy_id" : index.id(),
826826
randomInt(20),
827827
randomIntBetween(1, 16),
828828
randomIntBetween(0, Integer.MAX_VALUE),

server/src/main/java/org/elasticsearch/action/DocWriteResponse.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import java.io.UnsupportedEncodingException;
4242
import java.net.URLEncoder;
4343
import java.util.Locale;
44+
import java.util.Objects;
4445

4546
import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
4647
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
@@ -122,13 +123,13 @@ public void writeTo(StreamOutput out) throws IOException {
122123
protected Result result;
123124

124125
public DocWriteResponse(ShardId shardId, String type, String id, long seqNo, long primaryTerm, long version, Result result) {
125-
this.shardId = shardId;
126-
this.type = type;
127-
this.id = id;
126+
this.shardId = Objects.requireNonNull(shardId);
127+
this.type = Objects.requireNonNull(type);
128+
this.id = Objects.requireNonNull(id);
128129
this.seqNo = seqNo;
129130
this.primaryTerm = primaryTerm;
130131
this.version = version;
131-
this.result = result;
132+
this.result = Objects.requireNonNull(result);
132133
}
133134

134135
// needed for deserialization

server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
9898
private final TransportCreateIndexAction createIndexAction;
9999
private final LongSupplier relativeTimeProvider;
100100
private final IngestActionForwarder ingestForwarder;
101+
private static final String DROPPED_ITEM_WITH_AUTO_GENERATED_ID = "auto-generated";
101102

102103
@Inject
103104
public TransportBulkAction(Settings settings, ThreadPool threadPool, TransportService transportService,
@@ -229,8 +230,8 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<Bulk
229230
final Set<String> indices = bulkRequest.requests.stream()
230231
// delete requests should not attempt to create the index (if the index does not
231232
// exists), unless an external versioning is used
232-
.filter(request -> request.opType() != DocWriteRequest.OpType.DELETE
233-
|| request.versionType() == VersionType.EXTERNAL
233+
.filter(request -> request.opType() != DocWriteRequest.OpType.DELETE
234+
|| request.versionType() == VersionType.EXTERNAL
234235
|| request.versionType() == VersionType.EXTERNAL_GTE)
235236
.map(DocWriteRequest::index)
236237
.collect(Collectors.toSet());
@@ -673,11 +674,12 @@ ActionListener<BulkResponse> wrapActionListenerIfNeeded(long ingestTookInMillis,
673674
void markCurrentItemAsDropped() {
674675
IndexRequest indexRequest = getIndexWriteRequest(bulkRequest.requests().get(currentSlot));
675676
failedSlots.set(currentSlot);
677+
final String id = indexRequest.id() == null ? DROPPED_ITEM_WITH_AUTO_GENERATED_ID : indexRequest.id();
676678
itemResponses.add(
677679
new BulkItemResponse(currentSlot, indexRequest.opType(),
678680
new UpdateResponse(
679681
new ShardId(indexRequest.index(), IndexMetaData.INDEX_UUID_NA_VALUE, 0),
680-
indexRequest.type(), indexRequest.id(), indexRequest.version(), DocWriteResponse.Result.NOOP
682+
indexRequest.type(), id, indexRequest.version(), DocWriteResponse.Result.NOOP
681683
)
682684
)
683685
);

server/src/test/java/org/elasticsearch/action/bulk/RetryTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.elasticsearch.action.update.UpdateRequest;
2727
import org.elasticsearch.common.unit.TimeValue;
2828
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
29+
import org.elasticsearch.index.shard.ShardId;
2930
import org.elasticsearch.test.ESTestCase;
3031
import org.elasticsearch.test.client.NoOpClient;
3132
import org.junit.After;
@@ -226,7 +227,8 @@ public void bulk(BulkRequest request, ActionListener<BulkResponse> listener) {
226227
}
227228

228229
private BulkItemResponse successfulResponse() {
229-
return new BulkItemResponse(1, OpType.DELETE, new DeleteResponse());
230+
return new BulkItemResponse(1, OpType.DELETE, new DeleteResponse(
231+
new ShardId("test", "test", 0), "_doc", "test", 0, 0, 0, false));
230232
}
231233

232234
private BulkItemResponse failedResponse() {

server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,7 @@ public void testExecuteBulkIndexRequestWithMappingUpdates() throws Exception {
275275
Engine.IndexResult success = new FakeIndexResult(1, 1, 13, true, resultLocation);
276276

277277
IndexShard shard = mock(IndexShard.class);
278+
when(shard.shardId()).thenReturn(shardId);
278279
when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean()))
279280
.thenReturn(mappingUpdate);
280281

@@ -606,6 +607,7 @@ public void testUpdateRequestWithSuccess() throws Exception {
606607
when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean()))
607608
.thenReturn(indexResult);
608609
when(shard.indexSettings()).thenReturn(indexSettings);
610+
when(shard.shardId()).thenReturn(shardId);
609611

610612
UpdateHelper updateHelper = mock(UpdateHelper.class);
611613
when(updateHelper.prepare(any(), eq(shard), eq(true), any())).thenReturn(
@@ -652,6 +654,7 @@ public void testUpdateWithDelete() throws Exception {
652654
IndexShard shard = mock(IndexShard.class);
653655
when(shard.applyDeleteOperationOnPrimary(anyLong(), any(), any(), any(), anyLong(), anyLong())).thenReturn(deleteResult);
654656
when(shard.indexSettings()).thenReturn(indexSettings);
657+
when(shard.shardId()).thenReturn(shardId);
655658

656659
UpdateHelper updateHelper = mock(UpdateHelper.class);
657660
when(updateHelper.prepare(any(), eq(shard), eq(true), any())).thenReturn(
@@ -806,6 +809,7 @@ public void testRetries() throws Exception {
806809
}
807810
});
808811
when(shard.indexSettings()).thenReturn(indexSettings);
812+
when(shard.shardId()).thenReturn(shardId);
809813

810814
UpdateHelper updateHelper = mock(UpdateHelper.class);
811815
when(updateHelper.prepare(any(), eq(shard), eq(true), any())).thenReturn(
@@ -891,7 +895,7 @@ public void testRejectCASUsingSeqNo() throws Exception {
891895
private void randomlySetIgnoredPrimaryResponse(BulkItemRequest primaryRequest) {
892896
if (randomBoolean()) {
893897
// add a response to the request and thereby check that it is ignored for the primary.
894-
primaryRequest.setPrimaryResponse(new BulkItemResponse(0, DocWriteRequest.OpType.INDEX, new IndexResponse(null, "_doc",
898+
primaryRequest.setPrimaryResponse(new BulkItemResponse(0, DocWriteRequest.OpType.INDEX, new IndexResponse(shardId, "_doc",
895899
"ignore-primary-response-on-primary", 42, 42, 42, false)));
896900
}
897901
}

server/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -103,8 +103,7 @@ public void testExecuteItem() throws Exception {
103103

104104
public void testExecuteVerboseItemExceptionWithoutOnFailure() throws Exception {
105105
TestProcessor processor1 = new TestProcessor("processor_0", "mock", ingestDocument -> {});
106-
TestProcessor processor2 = new TestProcessor("processor_1", "mock",
107-
ingestDocument -> { throw new RuntimeException("processor failed"); });
106+
TestProcessor processor2 = new TestProcessor("processor_1", "mock", new RuntimeException("processor failed"));
108107
TestProcessor processor3 = new TestProcessor("processor_2", "mock", ingestDocument -> {});
109108
Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor1, processor2, processor3));
110109
SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true);
@@ -128,8 +127,7 @@ public void testExecuteVerboseItemExceptionWithoutOnFailure() throws Exception {
128127
}
129128

130129
public void testExecuteVerboseItemWithOnFailure() throws Exception {
131-
TestProcessor processor1 = new TestProcessor("processor_0", "mock",
132-
ingestDocument -> { throw new RuntimeException("processor failed"); });
130+
TestProcessor processor1 = new TestProcessor("processor_0", "mock", new RuntimeException("processor failed"));
133131
TestProcessor processor2 = new TestProcessor("processor_1", "mock", ingestDocument -> {});
134132
TestProcessor processor3 = new TestProcessor("processor_2", "mock", ingestDocument -> {});
135133
Pipeline pipeline = new Pipeline("_id", "_description", version,
@@ -167,7 +165,7 @@ public void testExecuteVerboseItemWithOnFailure() throws Exception {
167165

168166
public void testExecuteVerboseItemExceptionWithIgnoreFailure() throws Exception {
169167
RuntimeException exception = new RuntimeException("processor failed");
170-
TestProcessor testProcessor = new TestProcessor("processor_0", "mock", ingestDocument -> { throw exception; });
168+
TestProcessor testProcessor = new TestProcessor("processor_0", "mock", exception);
171169
CompoundProcessor processor = new CompoundProcessor(true, Collections.singletonList(testProcessor), Collections.emptyList());
172170
Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor));
173171
SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true);

server/src/test/java/org/elasticsearch/index/mapper/DateFieldMapperTests.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,23 @@ public void testDefaults() throws Exception {
9696
assertFalse(dvField.fieldType().stored());
9797
}
9898

99+
public void testSimple() throws Exception {
100+
String mapping = Strings.toString(XContentFactory.jsonBuilder().startObject().startObject("type")
101+
.startObject("properties").startObject("field").field("type", "date").endObject().endObject()
102+
.endObject().endObject());
103+
104+
DocumentMapper mapper = parser.parse("type", new CompressedXContent(mapping));
105+
106+
assertEquals(mapping, mapper.mappingSource().toString());
107+
108+
ParsedDocument doc = mapper.parse(SourceToParse.source("test", "type", "1", BytesReference
109+
.bytes(XContentFactory.jsonBuilder()
110+
.startObject()
111+
.field("field", "2016-03")
112+
.endObject()),
113+
XContentType.JSON));
114+
}
115+
99116
public void testNotIndexed() throws Exception {
100117
String mapping = Strings.toString(XContentFactory.jsonBuilder().startObject().startObject("type")
101118
.startObject("properties").startObject("field").field("type", "date").field("index", false).endObject().endObject()

server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.util.HashMap;
2929
import java.util.Map;
3030
import java.util.concurrent.TimeUnit;
31+
import java.util.function.Consumer;
3132
import java.util.function.LongSupplier;
3233

3334
import static org.hamcrest.CoreMatchers.equalTo;
@@ -74,7 +75,7 @@ public void testSingleProcessor() throws Exception {
7475
}
7576

7677
public void testSingleProcessorWithException() throws Exception {
77-
TestProcessor processor = new TestProcessor(ingestDocument -> {throw new RuntimeException("error");});
78+
TestProcessor processor = new TestProcessor(new RuntimeException("error"));
7879
LongSupplier relativeTimeProvider = mock(LongSupplier.class);
7980
when(relativeTimeProvider.getAsLong()).thenReturn(0L);
8081
CompoundProcessor compoundProcessor = new CompoundProcessor(relativeTimeProvider, processor);
@@ -93,7 +94,7 @@ public void testSingleProcessorWithException() throws Exception {
9394
}
9495

9596
public void testIgnoreFailure() throws Exception {
96-
TestProcessor processor1 = new TestProcessor(ingestDocument -> {throw new RuntimeException("error");});
97+
TestProcessor processor1 = new TestProcessor(new RuntimeException("error"));
9798
TestProcessor processor2 = new TestProcessor(ingestDocument -> {ingestDocument.setFieldValue("field", "value");});
9899
LongSupplier relativeTimeProvider = mock(LongSupplier.class);
99100
when(relativeTimeProvider.getAsLong()).thenReturn(0L);
@@ -108,7 +109,7 @@ public void testIgnoreFailure() throws Exception {
108109
}
109110

110111
public void testSingleProcessorWithOnFailureProcessor() throws Exception {
111-
TestProcessor processor1 = new TestProcessor("id", "first", ingestDocument -> {throw new RuntimeException("error");});
112+
TestProcessor processor1 = new TestProcessor("id", "first", new RuntimeException("error"));
112113
TestProcessor processor2 = new TestProcessor(ingestDocument -> {
113114
Map<String, Object> ingestMetadata = ingestDocument.getIngestMetadata();
114115
assertThat(ingestMetadata.size(), equalTo(3));
@@ -130,7 +131,7 @@ public void testSingleProcessorWithOnFailureProcessor() throws Exception {
130131
}
131132

132133
public void testSingleProcessorWithOnFailureDropProcessor() throws Exception {
133-
TestProcessor processor1 = new TestProcessor("id", "first", ingestDocument -> {throw new RuntimeException("error");});
134+
TestProcessor processor1 = new TestProcessor("id", "first", new RuntimeException("error"));
134135
Processor processor2 = new Processor() {
135136
@Override
136137
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
@@ -159,8 +160,8 @@ public String getTag() {
159160
}
160161

161162
public void testSingleProcessorWithNestedFailures() throws Exception {
162-
TestProcessor processor = new TestProcessor("id", "first", ingestDocument -> {throw new RuntimeException("error");});
163-
TestProcessor processorToFail = new TestProcessor("id2", "second", ingestDocument -> {
163+
TestProcessor processor = new TestProcessor("id", "first", new RuntimeException("error"));
164+
TestProcessor processorToFail = new TestProcessor("id2", "second", (Consumer<IngestDocument>) ingestDocument -> {
164165
Map<String, Object> ingestMetadata = ingestDocument.getIngestMetadata();
165166
assertThat(ingestMetadata.size(), equalTo(3));
166167
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("error"));
@@ -189,7 +190,7 @@ public void testSingleProcessorWithNestedFailures() throws Exception {
189190
}
190191

191192
public void testCompoundProcessorExceptionFailWithoutOnFailure() throws Exception {
192-
TestProcessor firstProcessor = new TestProcessor("id1", "first", ingestDocument -> {throw new RuntimeException("error");});
193+
TestProcessor firstProcessor = new TestProcessor("id1", "first", new RuntimeException("error"));
193194
TestProcessor secondProcessor = new TestProcessor("id3", "second", ingestDocument -> {
194195
Map<String, Object> ingestMetadata = ingestDocument.getIngestMetadata();
195196
assertThat(ingestMetadata.entrySet(), hasSize(3));
@@ -212,9 +213,9 @@ public void testCompoundProcessorExceptionFailWithoutOnFailure() throws Exceptio
212213
}
213214

214215
public void testCompoundProcessorExceptionFail() throws Exception {
215-
TestProcessor firstProcessor = new TestProcessor("id1", "first", ingestDocument -> {throw new RuntimeException("error");});
216+
TestProcessor firstProcessor = new TestProcessor("id1", "first", new RuntimeException("error"));
216217
TestProcessor failProcessor =
217-
new TestProcessor("tag_fail", "fail", ingestDocument -> {throw new RuntimeException("custom error message");});
218+
new TestProcessor("tag_fail", "fail", new RuntimeException("custom error message"));
218219
TestProcessor secondProcessor = new TestProcessor("id3", "second", ingestDocument -> {
219220
Map<String, Object> ingestMetadata = ingestDocument.getIngestMetadata();
220221
assertThat(ingestMetadata.entrySet(), hasSize(3));
@@ -238,9 +239,9 @@ public void testCompoundProcessorExceptionFail() throws Exception {
238239
}
239240

240241
public void testCompoundProcessorExceptionFailInOnFailure() throws Exception {
241-
TestProcessor firstProcessor = new TestProcessor("id1", "first", ingestDocument -> {throw new RuntimeException("error");});
242+
TestProcessor firstProcessor = new TestProcessor("id1", "first", new RuntimeException("error"));
242243
TestProcessor failProcessor =
243-
new TestProcessor("tag_fail", "fail", ingestDocument -> {throw new RuntimeException("custom error message");});
244+
new TestProcessor("tag_fail", "fail", new RuntimeException("custom error message"));
244245
TestProcessor secondProcessor = new TestProcessor("id3", "second", ingestDocument -> {
245246
Map<String, Object> ingestMetadata = ingestDocument.getIngestMetadata();
246247
assertThat(ingestMetadata.entrySet(), hasSize(3));
@@ -264,8 +265,8 @@ public void testCompoundProcessorExceptionFailInOnFailure() throws Exception {
264265
}
265266

266267
public void testBreakOnFailure() throws Exception {
267-
TestProcessor firstProcessor = new TestProcessor("id1", "first", ingestDocument -> {throw new RuntimeException("error1");});
268-
TestProcessor secondProcessor = new TestProcessor("id2", "second", ingestDocument -> {throw new RuntimeException("error2");});
268+
TestProcessor firstProcessor = new TestProcessor("id1", "first", new RuntimeException("error1"));
269+
TestProcessor secondProcessor = new TestProcessor("id2", "second", new RuntimeException("error2"));
269270
TestProcessor onFailureProcessor = new TestProcessor("id2", "on_failure", ingestDocument -> {});
270271
LongSupplier relativeTimeProvider = mock(LongSupplier.class);
271272
when(relativeTimeProvider.getAsLong()).thenReturn(0L);

server/src/test/java/org/elasticsearch/ingest/IngestClientIT.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,4 +273,25 @@ public void testPutWithPipelineFactoryError() throws Exception {
273273
GetPipelineResponse response = client().admin().cluster().prepareGetPipeline("_id2").get();
274274
assertFalse(response.isFound());
275275
}
276+
277+
public void testWithDedicatedMaster() throws Exception {
278+
String masterOnlyNode = internalCluster().startMasterOnlyNode();
279+
BytesReference source = BytesReference.bytes(jsonBuilder().startObject()
280+
.field("description", "my_pipeline")
281+
.startArray("processors")
282+
.startObject()
283+
.startObject("test")
284+
.endObject()
285+
.endObject()
286+
.endArray()
287+
.endObject());
288+
PutPipelineRequest putPipelineRequest = new PutPipelineRequest("_id", source, XContentType.JSON);
289+
client().admin().cluster().putPipeline(putPipelineRequest).get();
290+
291+
BulkItemResponse item = client(masterOnlyNode).prepareBulk().add(
292+
client().prepareIndex("test", "type").setSource("field", "value2", "drop", true).setPipeline("_id")).get()
293+
.getItems()[0];
294+
assertFalse(item.isFailed());
295+
assertEquals("auto-generated", item.getResponse().getId());
296+
}
276297
}

server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ public void testActualCompoundProcessorWithoutOnFailure() throws Exception {
101101

102102
public void testActualCompoundProcessorWithOnFailure() throws Exception {
103103
RuntimeException exception = new RuntimeException("fail");
104-
TestProcessor failProcessor = new TestProcessor("fail", "test", ingestDocument -> { throw exception; });
104+
TestProcessor failProcessor = new TestProcessor("fail", "test", exception);
105105
TestProcessor onFailureProcessor = new TestProcessor("success", "test", ingestDocument -> {});
106106
CompoundProcessor actualProcessor = new CompoundProcessor(false,
107107
Arrays.asList(new CompoundProcessor(false,

test/framework/src/main/java/org/elasticsearch/ingest/IngestTestPlugin.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,10 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
3737
if (doc.hasField("fail") && doc.getFieldValue("fail", Boolean.class)) {
3838
throw new IllegalArgumentException("test processor failed");
3939
}
40+
if (doc.hasField("drop") && doc.getFieldValue("drop", Boolean.class)) {
41+
return null;
42+
}
43+
return doc;
4044
}));
4145
}
4246
}

0 commit comments

Comments
 (0)