|
7 | 7 |
|
8 | 8 | import org.elasticsearch.action.ActionListener;
|
9 | 9 | import org.elasticsearch.action.DocWriteResponse;
|
| 10 | +import org.elasticsearch.action.index.IndexRequest; |
| 11 | +import org.elasticsearch.action.index.IndexResponse; |
10 | 12 | import org.elasticsearch.action.support.ActionFilters;
|
11 | 13 | import org.elasticsearch.action.support.WriteRequest;
|
12 | 14 | import org.elasticsearch.action.update.UpdateRequest;
|
@@ -89,18 +91,29 @@ protected void doExecute(PutWatchRequest request, ActionListener<PutWatchRespons
|
89 | 91 | try (XContentBuilder builder = jsonBuilder()) {
|
90 | 92 | watch.toXContent(builder, DEFAULT_PARAMS);
|
91 | 93 |
|
92 |
| - UpdateRequest updateRequest = new UpdateRequest(Watch.INDEX, Watch.DOC_TYPE, request.getId()); |
93 |
| - updateRequest.docAsUpsert(isUpdate == false); |
94 |
| - updateRequest.version(request.getVersion()); |
95 |
| - updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); |
96 |
| - updateRequest.doc(builder); |
| 94 | + if (isUpdate) { |
| 95 | + UpdateRequest updateRequest = new UpdateRequest(Watch.INDEX, Watch.DOC_TYPE, request.getId()); |
| 96 | + updateRequest.version(request.getVersion()); |
| 97 | + updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); |
| 98 | + updateRequest.doc(builder); |
97 | 99 |
|
98 |
| - executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, updateRequest, |
99 |
| - ActionListener.<UpdateResponse>wrap(response -> { |
| 100 | + executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, updateRequest, |
| 101 | + ActionListener.<UpdateResponse>wrap(response -> { |
| 102 | + boolean created = response.getResult() == DocWriteResponse.Result.CREATED; |
| 103 | + listener.onResponse(new PutWatchResponse(response.getId(), response.getVersion(), created)); |
| 104 | + }, listener::onFailure), |
| 105 | + client::update); |
| 106 | + } else { |
| 107 | + IndexRequest indexRequest = new IndexRequest(Watch.INDEX, Watch.DOC_TYPE, request.getId()); |
| 108 | + indexRequest.source(builder); |
| 109 | + indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); |
| 110 | + executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, indexRequest, |
| 111 | + ActionListener.<IndexResponse>wrap(response -> { |
100 | 112 | boolean created = response.getResult() == DocWriteResponse.Result.CREATED;
|
101 | 113 | listener.onResponse(new PutWatchResponse(response.getId(), response.getVersion(), created));
|
102 | 114 | }, listener::onFailure),
|
103 |
| - client::update); |
| 115 | + client::index); |
| 116 | + } |
104 | 117 | }
|
105 | 118 | } catch (Exception e) {
|
106 | 119 | listener.onFailure(e);
|
|
0 commit comments