Skip to content

Commit 6c56dda

Browse files
committed
Adapt for origin setting client
1 parent f620d6b commit 6c56dda

File tree

2 files changed

+32
-40
lines changed

2 files changed

+32
-40
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,6 @@
4242
import java.util.Objects;
4343
import java.util.concurrent.TimeUnit;
4444

45-
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
46-
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
47-
4845
/**
4946
* Deletes all model snapshots that have expired the configured retention time
5047
* of their respective job with the exception of the currently used snapshot.
@@ -109,19 +106,19 @@ private void latestSnapshotTimeStamp(String jobId, ActionListener<Long> listener
109106
searchRequest.source(searchSourceBuilder);
110107
searchRequest.indicesOptions(MlIndicesUtils.addIgnoreUnavailable(SearchRequest.DEFAULT_INDICES_OPTIONS));
111108

112-
executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, searchRequest,
113-
ActionListener.<SearchResponse>wrap(
114-
response -> {
115-
SearchHit[] hits = response.getHits().getHits();
116-
if (hits.length == 0) {
117-
// no snapshots found
118-
listener.onResponse(null);
119-
} else {
120-
ModelSnapshot snapshot = ModelSnapshot.fromJson(hits[0].getSourceRef());
121-
listener.onResponse(snapshot.getTimestamp().getTime());
122-
}
123-
}, listener::onFailure
124-
), client::search);
109+
client.search(searchRequest, ActionListener.wrap(
110+
response -> {
111+
SearchHit[] hits = response.getHits().getHits();
112+
if (hits.length == 0) {
113+
// no snapshots found
114+
listener.onResponse(null);
115+
} else {
116+
ModelSnapshot snapshot = ModelSnapshot.fromJson(hits[0].getSourceRef());
117+
listener.onResponse(snapshot.getTimestamp().getTime());
118+
}
119+
},
120+
listener::onFailure)
121+
);
125122
}
126123

127124
@Override

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java

Lines changed: 19 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import org.elasticsearch.ElasticsearchParseException;
1212
import org.elasticsearch.action.ActionListener;
1313
import org.elasticsearch.action.search.SearchRequest;
14-
import org.elasticsearch.action.search.SearchResponse;
1514
import org.elasticsearch.action.support.ThreadedActionListener;
1615
import org.elasticsearch.client.OriginSettingClient;
1716
import org.elasticsearch.common.unit.TimeValue;
@@ -54,9 +53,6 @@
5453
import java.util.Objects;
5554
import java.util.concurrent.TimeUnit;
5655

57-
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
58-
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
59-
6056
/**
6157
* Removes all results that have expired the configured retention time
6258
* of their respective job. A result is deleted if its timestamp is earlier
@@ -162,26 +158,25 @@ private void latestBucketTime(String jobId, ActionListener<Long> listener) {
162158
searchRequest.source(searchSourceBuilder);
163159
searchRequest.indicesOptions(MlIndicesUtils.addIgnoreUnavailable(SearchRequest.DEFAULT_INDICES_OPTIONS));
164160

165-
executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, searchRequest,
166-
ActionListener.<SearchResponse>wrap(
167-
response -> {
168-
SearchHit[] hits = response.getHits().getHits();
169-
if (hits.length == 0) {
170-
// no buckets found
171-
listener.onResponse(null);
172-
} else {
173-
174-
try (InputStream stream = hits[0].getSourceRef().streamInput();
175-
XContentParser parser = XContentFactory.xContent(XContentType.JSON)
176-
.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) {
177-
Bucket bucket = Bucket.LENIENT_PARSER.apply(parser, null);
178-
listener.onResponse(bucket.getTimestamp().getTime());
179-
} catch (IOException e) {
180-
listener.onFailure(new ElasticsearchParseException("failed to parse bucket", e));
181-
}
182-
}
183-
}, listener::onFailure
184-
), client::search);
161+
client.search(searchRequest, ActionListener.wrap(
162+
response -> {
163+
SearchHit[] hits = response.getHits().getHits();
164+
if (hits.length == 0) {
165+
// no buckets found
166+
listener.onResponse(null);
167+
} else {
168+
169+
try (InputStream stream = hits[0].getSourceRef().streamInput();
170+
XContentParser parser = XContentFactory.xContent(XContentType.JSON)
171+
.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) {
172+
Bucket bucket = Bucket.LENIENT_PARSER.apply(parser, null);
173+
listener.onResponse(bucket.getTimestamp().getTime());
174+
} catch (IOException e) {
175+
listener.onFailure(new ElasticsearchParseException("failed to parse bucket", e));
176+
}
177+
}
178+
}, listener::onFailure
179+
));
185180
}
186181

187182
private void auditResultsWereDeleted(String jobId, long cutoffEpochMs) {

0 commit comments

Comments
 (0)