Skip to content

Commit c1a1ea9

Browse files
authored
Remove blocking code in SearchDocument processing.
Original Pull Request #2094 Closes #2025
1 parent 453460f commit c1a1ea9

File tree

4 files changed

+83
-47
lines changed

4 files changed

+83
-47
lines changed

src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java

+12-7
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.Iterator;
2222
import java.util.List;
2323
import java.util.Map;
24+
import java.util.concurrent.CompletableFuture;
2425
import java.util.stream.Collectors;
2526
import java.util.stream.Stream;
2627

@@ -386,7 +387,7 @@ public <T> SearchHits<T> search(Query query, Class<T> clazz, IndexCoordinates in
386387
ReadDocumentCallback<T> documentCallback = new ReadDocumentCallback<>(elasticsearchConverter, clazz, index);
387388
SearchDocumentResponseCallback<SearchHits<T>> callback = new ReadSearchDocumentResponseCallback<>(clazz, index);
388389

389-
return callback.doWith(SearchDocumentResponse.from(response, documentCallback::doWith));
390+
return callback.doWith(SearchDocumentResponse.from(response, getEntityCreator(documentCallback)));
390391
}
391392

392393
protected <T> SearchHits<T> doSearch(MoreLikeThisQuery query, Class<T> clazz, IndexCoordinates index) {
@@ -410,7 +411,7 @@ public <T> SearchScrollHits<T> searchScrollStart(long scrollTimeInMillis, Query
410411
ReadDocumentCallback<T> documentCallback = new ReadDocumentCallback<>(elasticsearchConverter, clazz, index);
411412
SearchDocumentResponseCallback<SearchScrollHits<T>> callback = new ReadSearchScrollDocumentResponseCallback<>(clazz,
412413
index);
413-
return callback.doWith(SearchDocumentResponse.from(response, documentCallback::doWith));
414+
return callback.doWith(SearchDocumentResponse.from(response, getEntityCreator(documentCallback)));
414415
}
415416

416417
@Override
@@ -425,7 +426,7 @@ public <T> SearchScrollHits<T> searchScrollContinue(@Nullable String scrollId, l
425426
ReadDocumentCallback<T> documentCallback = new ReadDocumentCallback<>(elasticsearchConverter, clazz, index);
426427
SearchDocumentResponseCallback<SearchScrollHits<T>> callback = new ReadSearchScrollDocumentResponseCallback<>(clazz,
427428
index);
428-
return callback.doWith(SearchDocumentResponse.from(response, documentCallback::doWith));
429+
return callback.doWith(SearchDocumentResponse.from(response, getEntityCreator(documentCallback)));
429430
}
430431

431432
@Override
@@ -458,7 +459,7 @@ public <T> List<SearchHits<T>> multiSearch(List<? extends Query> queries, Class<
458459
SearchDocumentResponseCallback<SearchHits<T>> callback = new ReadSearchDocumentResponseCallback<>(clazz, index);
459460
List<SearchHits<T>> res = new ArrayList<>(queries.size());
460461
for (int i = 0; i < queries.size(); i++) {
461-
res.add(callback.doWith(SearchDocumentResponse.from(items[i].getResponse(), documentCallback::doWith)));
462+
res.add(callback.doWith(SearchDocumentResponse.from(items[i].getResponse(), getEntityCreator(documentCallback))));
462463
}
463464
return res;
464465
}
@@ -491,7 +492,7 @@ public List<SearchHits<?>> multiSearch(List<? extends Query> queries, List<Class
491492
index);
492493

493494
SearchResponse response = items[i].getResponse();
494-
res.add(callback.doWith(SearchDocumentResponse.from(response, documentCallback::doWith)));
495+
res.add(callback.doWith(SearchDocumentResponse.from(response, getEntityCreator(documentCallback))));
495496
}
496497
return res;
497498
}
@@ -524,7 +525,7 @@ public List<SearchHits<?>> multiSearch(List<? extends Query> queries, List<Class
524525
index);
525526

526527
SearchResponse response = items[i].getResponse();
527-
res.add(callback.doWith(SearchDocumentResponse.from(response, documentCallback::doWith)));
528+
res.add(callback.doWith(SearchDocumentResponse.from(response, getEntityCreator(documentCallback))));
528529
}
529530
return res;
530531
}
@@ -535,8 +536,12 @@ protected MultiSearchResponse.Item[] getMultiSearchResult(MultiSearchRequest req
535536
Assert.isTrue(items.length == request.requests().size(), "Response should has same length with queries");
536537
return items;
537538
}
538-
// endregion
539539

540+
private <T> SearchDocumentResponse.EntityCreator<T> getEntityCreator(ReadDocumentCallback<T> documentCallback) {
541+
return searchDocument -> CompletableFuture.completedFuture(documentCallback.doWith(searchDocument));
542+
}
543+
544+
// endregion
540545
// region ClientCallback
541546
/**
542547
* Callback interface to be used with {@link #execute(ClientCallback)} for operating directly on

src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java

+7-6
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import java.util.HashMap;
2424
import java.util.List;
2525
import java.util.Map;
26-
import java.util.function.Function;
2726
import java.util.stream.Collectors;
2827

2928
import org.apache.commons.logging.Log;
@@ -820,15 +819,17 @@ private Flux<SearchDocument> doFind(Query query, Class<?> clazz, IndexCoordinate
820819
});
821820
}
822821

823-
private Mono<SearchDocumentResponse> doFindForResponse(Query query, Class<?> clazz, IndexCoordinates index) {
822+
private <T> Mono<SearchDocumentResponse> doFindForResponse(Query query, Class<?> clazz, IndexCoordinates index) {
824823

825824
return Mono.defer(() -> {
826825
SearchRequest request = requestFactory.searchRequest(query, clazz, index);
827826
request = prepareSearchRequest(request, false);
828827

829828
SearchDocumentCallback<?> documentCallback = new ReadSearchDocumentCallback<>(clazz, index);
830-
Function<SearchDocument, Object> entityCreator = searchDocument -> documentCallback.toEntity(searchDocument)
831-
.block();
829+
// noinspection unchecked
830+
SearchDocumentResponse.EntityCreator<T> entityCreator = searchDocument -> ((Mono<T>) documentCallback
831+
.toEntity(searchDocument)).toFuture();
832+
832833
return doFindForResponse(request, entityCreator);
833834
});
834835
}
@@ -949,8 +950,8 @@ protected Flux<SearchDocument> doFind(SearchRequest request) {
949950
* @param entityCreator
950951
* @return a {@link Mono} emitting the result of the operation converted to s {@link SearchDocumentResponse}.
951952
*/
952-
protected Mono<SearchDocumentResponse> doFindForResponse(SearchRequest request,
953-
Function<SearchDocument, ? extends Object> entityCreator) {
953+
protected <T> Mono<SearchDocumentResponse> doFindForResponse(SearchRequest request,
954+
SearchDocumentResponse.EntityCreator<T> entityCreator) {
954955

955956
if (QUERY_LOGGER.isDebugEnabled()) {
956957
QUERY_LOGGER.debug(String.format("Executing doFindForResponse: %s", request));

src/main/java/org/springframework/data/elasticsearch/core/document/SearchDocumentResponse.java

+32-6
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,11 @@
1717

1818
import java.util.ArrayList;
1919
import java.util.List;
20+
import java.util.concurrent.CompletableFuture;
2021
import java.util.function.Function;
2122

23+
import org.apache.commons.logging.Log;
24+
import org.apache.commons.logging.LogFactory;
2225
import org.apache.lucene.search.TotalHits;
2326
import org.elasticsearch.action.search.SearchResponse;
2427
import org.elasticsearch.common.text.Text;
@@ -38,13 +41,15 @@
3841

3942
/**
4043
* This represents the complete search response from Elasticsearch, including the returned documents. Instances must be
41-
* created with the {@link #from(SearchResponse,Function)} method.
44+
* created with the {@link #from(SearchResponse, EntityCreator)} method.
4245
*
4346
* @author Peter-Josef Meisch
4447
* @since 4.0
4548
*/
4649
public class SearchDocumentResponse {
4750

51+
private static final Log LOGGER = LogFactory.getLog(SearchDocumentResponse.class);
52+
4853
private final long totalHits;
4954
private final String totalHitsRelation;
5055
private final float maxScore;
@@ -102,8 +107,7 @@ public Suggest getSuggest() {
102107
* @param <T> entity type
103108
* @return the SearchDocumentResponse
104109
*/
105-
public static <T> SearchDocumentResponse from(SearchResponse searchResponse,
106-
Function<SearchDocument, T> entityCreator) {
110+
public static <T> SearchDocumentResponse from(SearchResponse searchResponse, EntityCreator<T> entityCreator) {
107111

108112
Assert.notNull(searchResponse, "searchResponse must not be null");
109113

@@ -129,7 +133,7 @@ public static <T> SearchDocumentResponse from(SearchResponse searchResponse,
129133
*/
130134
public static <T> SearchDocumentResponse from(SearchHits searchHits, @Nullable String scrollId,
131135
@Nullable Aggregations aggregations, @Nullable org.elasticsearch.search.suggest.Suggest suggestES,
132-
Function<SearchDocument, T> entityCreator) {
136+
EntityCreator<T> entityCreator) {
133137

134138
TotalHits responseTotalHits = searchHits.getTotalHits();
135139

@@ -160,7 +164,7 @@ public static <T> SearchDocumentResponse from(SearchHits searchHits, @Nullable S
160164

161165
@Nullable
162166
private static <T> Suggest suggestFrom(@Nullable org.elasticsearch.search.suggest.Suggest suggestES,
163-
Function<SearchDocument, T> entityCreator) {
167+
EntityCreator<T> entityCreator) {
164168

165169
if (suggestES == null) {
166170
return null;
@@ -219,7 +223,19 @@ private static <T> Suggest suggestFrom(@Nullable org.elasticsearch.search.sugges
219223
List<CompletionSuggestion.Entry.Option<T>> options = new ArrayList<>();
220224
for (org.elasticsearch.search.suggest.completion.CompletionSuggestion.Entry.Option optionES : entryES) {
221225
SearchDocument searchDocument = optionES.getHit() != null ? DocumentAdapters.from(optionES.getHit()) : null;
222-
T hitEntity = searchDocument != null ? entityCreator.apply(searchDocument) : null;
226+
227+
T hitEntity = null;
228+
229+
if (searchDocument != null) {
230+
try {
231+
hitEntity = entityCreator.apply(searchDocument).get();
232+
} catch (Exception e) {
233+
if (LOGGER.isWarnEnabled()) {
234+
LOGGER.warn("Error creating entity from SearchDocument");
235+
}
236+
}
237+
}
238+
223239
options.add(new CompletionSuggestion.Entry.Option<T>(textToString(optionES.getText()),
224240
textToString(optionES.getHighlighted()), optionES.getScore(), optionES.collateMatch(),
225241
optionES.getContexts(), scoreDocFrom(optionES.getDoc()), searchDocument, hitEntity));
@@ -254,4 +270,14 @@ private static ScoreDoc scoreDocFrom(@Nullable org.apache.lucene.search.ScoreDoc
254270
private static String textToString(@Nullable Text text) {
255271
return text != null ? text.string() : "";
256272
}
273+
274+
/**
275+
* A function to convert a {@link SearchDocument} async into an entity. Asynchronous so that it can be used from the
276+
* imperative and the reactive code.
277+
*
278+
* @param <T> the entity type
279+
*/
280+
@FunctionalInterface
281+
public interface EntityCreator<T> extends Function<SearchDocument, CompletableFuture<T>> {}
282+
257283
}

src/test/java/org/springframework/data/elasticsearch/core/suggest/SuggestReactiveTemplateIntegrationTests.java

+32-28
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,11 @@
3737
import org.springframework.data.annotation.Id;
3838
import org.springframework.data.elasticsearch.annotations.CompletionField;
3939
import org.springframework.data.elasticsearch.annotations.Document;
40-
import org.springframework.data.elasticsearch.core.query.NativeSearchQuery;
41-
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
4240
import org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations;
4341
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
4442
import org.springframework.data.elasticsearch.core.query.IndexQuery;
43+
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
44+
import org.springframework.data.elasticsearch.core.query.Query;
4545
import org.springframework.data.elasticsearch.core.suggest.response.CompletionSuggestion;
4646
import org.springframework.data.elasticsearch.core.suggest.response.Suggest;
4747
import org.springframework.data.elasticsearch.junit.jupiter.ReactiveElasticsearchRestTemplateConfiguration;
@@ -88,34 +88,38 @@ void shouldDoSomeTest() {
8888
@DisplayName("should find suggestions for given prefix completion")
8989
void shouldFindSuggestionsForGivenPrefixCompletion() {
9090

91-
loadCompletionObjectEntities().map(unused -> {
92-
93-
NativeSearchQuery query = new NativeSearchQueryBuilder().withSuggestBuilder(new SuggestBuilder()
94-
.addSuggestion("test-suggest", SuggestBuilders.completionSuggestion("suggest").prefix("m", Fuzziness.AUTO)))
95-
.build();
96-
97-
operations.suggest(query, CompletionEntity.class) //
98-
.as(StepVerifier::create) //
99-
.assertNext(suggest -> {
100-
Suggest.Suggestion<? extends Suggest.Suggestion.Entry<? extends Suggest.Suggestion.Entry.Option>> suggestion = suggest
101-
.getSuggestion("test-suggest");
102-
assertThat(suggestion).isNotNull();
103-
assertThat(suggestion).isInstanceOf(CompletionSuggestion.class);
104-
// noinspection unchecked
105-
List<CompletionSuggestion.Entry.Option<CompletionIntegrationTests.AnnotatedCompletionEntity>> options = ((CompletionSuggestion<CompletionIntegrationTests.AnnotatedCompletionEntity>) suggestion)
106-
.getEntries().get(0).getOptions();
107-
assertThat(options).hasSize(2);
108-
assertThat(options.get(0).getText()).isIn("Marchand", "Mohsin");
109-
assertThat(options.get(1).getText()).isIn("Marchand", "Mohsin");
110-
111-
}) //
112-
.verifyComplete();
113-
return Mono.empty();
114-
});
91+
loadCompletionObjectEntities() //
92+
.flatMap(unused -> {
93+
Query query = getSuggestQuery("test-suggest", "suggest", "m");
94+
return operations.suggest(query, CompletionEntity.class);
95+
}) //
96+
.as(StepVerifier::create) //
97+
.assertNext(suggest -> {
98+
Suggest.Suggestion<? extends Suggest.Suggestion.Entry<? extends Suggest.Suggestion.Entry.Option>> suggestion = suggest
99+
.getSuggestion("test-suggest");
100+
assertThat(suggestion).isNotNull();
101+
assertThat(suggestion).isInstanceOf(CompletionSuggestion.class);
102+
// noinspection unchecked
103+
List<CompletionSuggestion.Entry.Option<CompletionIntegrationTests.AnnotatedCompletionEntity>> options = ((CompletionSuggestion<CompletionIntegrationTests.AnnotatedCompletionEntity>) suggestion)
104+
.getEntries().get(0).getOptions();
105+
assertThat(options).hasSize(2);
106+
assertThat(options.get(0).getText()).isIn("Marchand", "Mohsin");
107+
assertThat(options.get(1).getText()).isIn("Marchand", "Mohsin");
108+
}) //
109+
.verifyComplete();
110+
}
111+
112+
protected Query getSuggestQuery(String suggestionName, String fieldName, String prefix) {
113+
return new NativeSearchQueryBuilder() //
114+
.withSuggestBuilder(new SuggestBuilder() //
115+
.addSuggestion(suggestionName, //
116+
SuggestBuilders.completionSuggestion(fieldName) //
117+
.prefix(prefix, Fuzziness.AUTO))) //
118+
.build(); //
115119
}
116120

117121
// region helper functions
118-
private Mono<Void> loadCompletionObjectEntities() {
122+
private Mono<CompletionEntity> loadCompletionObjectEntities() {
119123

120124
CompletionEntity rizwan_idrees = new CompletionEntityBuilder("1").name("Rizwan Idrees")
121125
.suggest(new String[] { "Rizwan Idrees" }).build();
@@ -128,7 +132,7 @@ private Mono<Void> loadCompletionObjectEntities() {
128132
List<CompletionEntity> entities = new ArrayList<>(
129133
Arrays.asList(rizwan_idrees, franck_marchand, mohsin_husen, artur_konczak));
130134
IndexCoordinates index = IndexCoordinates.of(indexNameProvider.indexName());
131-
return operations.saveAll(entities, index).then();
135+
return operations.saveAll(entities, index).last();
132136
}
133137
// endregion
134138

0 commit comments

Comments
 (0)