Skip to content

Commit 4946d33

Browse files
Authored by Hendrik Muhs
[ML-Dataframe] Use AsyncTwoPhaseIndexer (#33504)
Replace mocked indexer and use AsyncTwoPhaseIndexer (introduced in #32743) instead.
1 parent b2195fb commit 4946d33

File tree

7 files changed: 312 additions and 174 deletions

x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/FeatureIndexBuilder.java

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -36,7 +36,7 @@
3636
import org.elasticsearch.xpack.ml.featureindexbuilder.action.TransportPutFeatureIndexBuilderJobAction;
3737
import org.elasticsearch.xpack.ml.featureindexbuilder.action.TransportStartFeatureIndexBuilderJobAction;
3838
import org.elasticsearch.xpack.ml.featureindexbuilder.job.FeatureIndexBuilderJob;
39-
import org.elasticsearch.xpack.ml.featureindexbuilder.job.FeatureIndexBuilderJobTask;
39+
import org.elasticsearch.xpack.ml.featureindexbuilder.job.FeatureIndexBuilderJobPersistentTasksExecutor;
4040
import org.elasticsearch.xpack.ml.featureindexbuilder.rest.action.RestPutFeatureIndexBuilderJobAction;
4141
import org.elasticsearch.xpack.ml.featureindexbuilder.rest.action.RestStartFeatureIndexBuilderJobAction;
4242

@@ -127,7 +127,7 @@ public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(ClusterServic
127127
}
128128

129129
SchedulerEngine schedulerEngine = new SchedulerEngine(settings, Clock.systemUTC());
130-
return Collections.singletonList(new FeatureIndexBuilderJobTask.FeatureIndexBuilderJobPersistentTasksExecutor(settings, client,
130+
return Collections.singletonList(new FeatureIndexBuilderJobPersistentTasksExecutor(settings, client,
131131
schedulerEngine, threadPool));
132132
}
133133
@Override

x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/TransportPutFeatureIndexBuilderJobAction.java

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,19 @@
77
package org.elasticsearch.xpack.ml.featureindexbuilder.action;
88

99
import org.elasticsearch.action.ActionListener;
10+
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
1011
import org.elasticsearch.action.support.ActionFilters;
1112
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
1213
import org.elasticsearch.client.Client;
14+
import org.elasticsearch.client.IndicesAdminClient;
1315
import org.elasticsearch.cluster.ClusterState;
1416
import org.elasticsearch.cluster.block.ClusterBlockException;
1517
import org.elasticsearch.cluster.block.ClusterBlockLevel;
1618
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
1719
import org.elasticsearch.cluster.service.ClusterService;
1820
import org.elasticsearch.common.inject.Inject;
1921
import org.elasticsearch.common.settings.Settings;
22+
import org.elasticsearch.common.xcontent.XContentType;
2023
import org.elasticsearch.license.LicenseUtils;
2124
import org.elasticsearch.license.XPackLicenseState;
2225
import org.elasticsearch.persistent.PersistentTasksService;
@@ -29,8 +32,14 @@
2932
import org.elasticsearch.xpack.ml.featureindexbuilder.job.FeatureIndexBuilderJob;
3033
import org.elasticsearch.xpack.ml.featureindexbuilder.job.FeatureIndexBuilderJobConfig;
3134

35+
import static org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings.DOC_TYPE;
36+
3237
public class TransportPutFeatureIndexBuilderJobAction
3338
extends TransportMasterNodeAction<PutFeatureIndexBuilderJobAction.Request, PutFeatureIndexBuilderJobAction.Response> {
39+
40+
// TODO: hack, to be replaced
41+
private static final String PIVOT_INDEX = "pivot-reviews";
42+
3443
private final XPackLicenseState licenseState;
3544
private final PersistentTasksService persistentTasksService;
3645
private final Client client;
@@ -67,7 +76,7 @@ protected void masterOperation(Request request, ClusterState clusterState, Actio
6776
XPackPlugin.checkReadyForXPackCustomMetadata(clusterState);
6877

6978
FeatureIndexBuilderJob job = createFeatureIndexBuilderJob(request.getConfig(), threadPool);
70-
79+
createIndex(client, job.getConfig().getId());
7180
startPersistentTask(job, listener, persistentTasksService);
7281
}
7382

@@ -90,4 +99,34 @@ static void startPersistentTask(FeatureIndexBuilderJob job, ActionListener<PutFe
9099
protected ClusterBlockException checkBlock(PutFeatureIndexBuilderJobAction.Request request, ClusterState state) {
91100
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
92101
}
102+
103+
/*
104+
* Mocked demo case
105+
*
106+
* TODO: everything below will be replaced with proper implementation read from job configuration
107+
*/
108+
private static void createIndex(Client client, String suffix) {
109+
110+
String indexName = PIVOT_INDEX + "_" + suffix;
111+
CreateIndexRequest request = new CreateIndexRequest(indexName);
112+
113+
request.settings(Settings.builder() // <1>
114+
.put("index.number_of_shards", 1).put("index.number_of_replicas", 0));
115+
request.mapping(DOC_TYPE, // <1>
116+
"{\n" +
117+
" \"" + DOC_TYPE + "\": {\n" +
118+
" \"properties\": {\n" +
119+
" \"reviewerId\": {\n" +
120+
" \"type\": \"keyword\"\n" +
121+
" },\n" +
122+
" \"avg_rating\": {\n" +
123+
" \"type\": \"integer\"\n" +
124+
" }\n" +
125+
" }\n" +
126+
" }\n" +
127+
"}", // <2>
128+
XContentType.JSON);
129+
IndicesAdminClient adminClient = client.admin().indices();
130+
adminClient.create(request).actionGet();
131+
}
93132
}

x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderIndexer.java

Lines changed: 58 additions & 121 deletions
Original file line numberDiff line numberDiff line change
@@ -7,188 +7,125 @@
77
package org.elasticsearch.xpack.ml.featureindexbuilder.job;
88

99
import org.apache.log4j.Logger;
10-
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
11-
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
12-
import org.elasticsearch.action.bulk.BulkRequest;
1310
import org.elasticsearch.action.index.IndexRequest;
1411
import org.elasticsearch.action.search.SearchRequest;
1512
import org.elasticsearch.action.search.SearchResponse;
16-
import org.elasticsearch.client.Client;
17-
import org.elasticsearch.client.IndicesAdminClient;
18-
import org.elasticsearch.common.settings.Settings;
1913
import org.elasticsearch.common.xcontent.XContentBuilder;
20-
import org.elasticsearch.common.xcontent.XContentType;
21-
import org.elasticsearch.index.IndexNotFoundException;
2214
import org.elasticsearch.index.query.MatchAllQueryBuilder;
2315
import org.elasticsearch.index.query.QueryBuilder;
2416
import org.elasticsearch.search.aggregations.AggregationBuilders;
2517
import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation;
26-
import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation.Bucket;
2718
import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregationBuilder;
2819
import org.elasticsearch.search.aggregations.bucket.composite.CompositeValuesSourceBuilder;
2920
import org.elasticsearch.search.aggregations.bucket.composite.TermsValuesSourceBuilder;
3021
import org.elasticsearch.search.aggregations.metrics.avg.InternalAvg;
3122
import org.elasticsearch.search.builder.SearchSourceBuilder;
23+
import org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer;
24+
import org.elasticsearch.xpack.core.indexing.IndexerState;
25+
import org.elasticsearch.xpack.core.indexing.IterationResult;
3226

3327
import java.io.IOException;
28+
import java.io.UncheckedIOException;
3429
import java.util.ArrayList;
3530
import java.util.List;
3631
import java.util.Map;
37-
import java.util.concurrent.ExecutionException;
32+
import java.util.concurrent.Executor;
33+
import java.util.concurrent.atomic.AtomicReference;
34+
import java.util.stream.Collectors;
3835

39-
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
4036
import static org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings.DOC_TYPE;
37+
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
4138

42-
public class FeatureIndexBuilderIndexer {
39+
public abstract class FeatureIndexBuilderIndexer extends AsyncTwoPhaseIndexer<Map<String, Object>, FeatureIndexBuilderJobStats> {
4340
private static final String PIVOT_INDEX = "pivot-reviews";
4441
private static final String SOURCE_INDEX = "anonreviews";
4542

4643
private static final Logger logger = Logger.getLogger(FeatureIndexBuilderIndexer.class.getName());
4744
private FeatureIndexBuilderJob job;
48-
private Client client;
4945

50-
public FeatureIndexBuilderIndexer(FeatureIndexBuilderJob job, Client client) {
46+
public FeatureIndexBuilderIndexer(Executor executor, FeatureIndexBuilderJob job, AtomicReference<IndexerState> initialState,
47+
Map<String, Object> initialPosition) {
48+
super(executor, initialState, initialPosition, new FeatureIndexBuilderJobStats());
5149

5250
this.job = job;
53-
this.client = client;
54-
logger.info("delete pivot-reviews");
55-
5651
}
5752

58-
public synchronized void start() {
59-
deleteIndex(client);
60-
61-
createIndex(client);
62-
63-
int runs = 0;
64-
65-
Map<String, Object> after = null;
66-
logger.info("start feature indexing");
67-
SearchResponse response;
68-
69-
try {
70-
response = runQuery(client, after);
71-
72-
CompositeAggregation compositeAggregation = response.getAggregations().get("feature");
73-
after = compositeAggregation.afterKey();
74-
75-
while (after != null) {
76-
indexBuckets(compositeAggregation);
77-
78-
++runs;
79-
response = runQuery(client, after);
80-
81-
compositeAggregation = response.getAggregations().get("feature");
82-
after = compositeAggregation.afterKey();
83-
84-
//after = null;
85-
}
86-
87-
indexBuckets(compositeAggregation);
88-
} catch (InterruptedException | ExecutionException e) {
89-
logger.error("Failed to build feature index", e);
90-
}
91-
92-
logger.info("Finished feature indexing");
53+
@Override
54+
protected String getJobId() {
55+
return job.getConfig().getId();
9356
}
9457

95-
private void indexBuckets(CompositeAggregation compositeAggregation) {
96-
BulkRequest bulkIndexRequest = new BulkRequest();
97-
try {
98-
for (Bucket b : compositeAggregation.getBuckets()) {
58+
@Override
59+
protected void onStartJob(long now) {
60+
}
9961

100-
InternalAvg avgAgg = b.getAggregations().get("avg_rating");
62+
@Override
63+
protected IterationResult<Map<String, Object>> doProcess(SearchResponse searchResponse) {
64+
final CompositeAggregation agg = searchResponse.getAggregations().get("feature");
65+
return new IterationResult<>(processBuckets(agg), agg.afterKey(), agg.getBuckets().isEmpty());
66+
}
10167

102-
XContentBuilder builder;
68+
/*
69+
* Mocked demo case
70+
*
71+
* TODO: replace with proper implementation
72+
*/
73+
private List<IndexRequest> processBuckets(CompositeAggregation agg) {
74+
return agg.getBuckets().stream().map(b -> {
75+
InternalAvg avgAgg = b.getAggregations().get("avg_rating");
76+
XContentBuilder builder;
77+
try {
10378
builder = jsonBuilder();
79+
10480
builder.startObject();
10581
builder.field("reviewerId", b.getKey().get("reviewerId"));
10682
builder.field("avg_rating", avgAgg.getValue());
10783
builder.endObject();
108-
bulkIndexRequest.add(new IndexRequest(PIVOT_INDEX, DOC_TYPE).source(builder));
109-
84+
} catch (IOException e) {
85+
throw new UncheckedIOException(e);
11086
}
111-
client.bulk(bulkIndexRequest);
112-
} catch (IOException e) {
113-
logger.error("Failed to index", e);
114-
}
87+
88+
String indexName = PIVOT_INDEX + "_" + job.getConfig().getId();
89+
IndexRequest request = new IndexRequest(indexName, DOC_TYPE).source(builder);
90+
return request;
91+
}).collect(Collectors.toList());
92+
}
93+
94+
@Override
95+
protected SearchRequest buildSearchRequest() {
96+
97+
final Map<String, Object> position = getPosition();
98+
SearchRequest request = buildFeatureQuery(position);
99+
return request;
115100
}
116-
101+
117102
/*
118-
* Hardcoded demo case for pivoting
103+
* Mocked demo case
104+
*
105+
* TODO: everything below will be replaced with proper implementation read from job configuration
119106
*/
120-
121-
private static void deleteIndex(Client client) {
122-
DeleteIndexRequest deleteIndex = new DeleteIndexRequest(PIVOT_INDEX);
123-
124-
IndicesAdminClient adminClient = client.admin().indices();
125-
try {
126-
adminClient.delete(deleteIndex).actionGet();
127-
} catch (IndexNotFoundException e) {
128-
}
129-
}
130-
131-
private static void createIndex(Client client) {
132-
133-
CreateIndexRequest request = new CreateIndexRequest(PIVOT_INDEX);
134-
request.settings(Settings.builder() // <1>
135-
.put("index.number_of_shards", 1)
136-
.put("index.number_of_replicas", 0)
137-
);
138-
request.mapping(DOC_TYPE, // <1>
139-
"{\n" +
140-
" \"" + DOC_TYPE + "\": {\n" +
141-
" \"properties\": {\n" +
142-
" \"reviewerId\": {\n" +
143-
" \"type\": \"keyword\"\n" +
144-
" },\n" +
145-
" \"avg_rating\": {\n" +
146-
" \"type\": \"integer\"\n" +
147-
" }\n" +
148-
" }\n" +
149-
" }\n" +
150-
"}", // <2>
151-
XContentType.JSON);
152-
IndicesAdminClient adminClient = client.admin().indices();
153-
adminClient.create(request).actionGet();
154-
}
155-
156107
private static SearchRequest buildFeatureQuery(Map<String, Object> after) {
157108
QueryBuilder queryBuilder = new MatchAllQueryBuilder();
158109
SearchRequest searchRequest = new SearchRequest(SOURCE_INDEX);
159-
110+
160111
List<CompositeValuesSourceBuilder<?>> sources = new ArrayList<>();
161112
sources.add(new TermsValuesSourceBuilder("reviewerId").field("reviewerId"));
162-
113+
163114
CompositeAggregationBuilder compositeAggregation = new CompositeAggregationBuilder("feature", sources);
164115
compositeAggregation.size(1000);
165-
116+
166117
if (after != null) {
167118
compositeAggregation.aggregateAfter(after);
168119
}
169-
120+
170121
compositeAggregation.subAggregation(AggregationBuilders.avg("avg_rating").field("rating"));
171122
compositeAggregation.subAggregation(AggregationBuilders.cardinality("dc_vendors").field("vendorId"));
172123
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
173124
sourceBuilder.aggregation(compositeAggregation);
174125
sourceBuilder.size(0);
175126
sourceBuilder.query(queryBuilder);
176127
searchRequest.source(sourceBuilder);
177-
128+
178129
return searchRequest;
179-
}
180-
181-
private static SearchResponse runQuery(Client client, Map<String, Object> after) throws InterruptedException, ExecutionException {
182-
183-
SearchRequest request = buildFeatureQuery(after);
184-
SearchResponse response = client.search(request).get();
185-
186-
return response;
187-
}
188-
189-
private static void indexResult() {
190-
191-
192-
193130
}
194131
}

x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJob.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
import org.elasticsearch.xpack.core.XPackPlugin;
1717

1818
import java.io.IOException;
19+
import java.util.Collections;
20+
import java.util.Map;
1921
import java.util.Objects;
2022

2123
public class FeatureIndexBuilderJob implements XPackPlugin.XPackPersistentTaskParams {
@@ -92,4 +94,8 @@ public boolean equals(Object other) {
9294
public int hashCode() {
9395
return Objects.hash(config);
9496
}
97+
98+
public Map<String, String> getHeaders() {
99+
return Collections.emptyMap();
100+
}
95101
}

0 commit comments

Comments
 (0)