@@ -97,20 +97,26 @@ void setConfiguration(Configuration configuration) {
97
97
*
98
98
* @param shardId The shard id object of the document being processed
99
99
* @param operation The index operation
100
- * @return The index operation
100
+ * @param result The result of the operation
101
101
*/
102
102
@ Override
103
- public Engine .Index preIndex (ShardId shardId , Engine .Index operation ) {
103
+ public void postIndex (ShardId shardId , Engine .Index operation , Engine .IndexResult result ) {
104
+
104
105
if (isWatchDocument (shardId .getIndexName ())) {
106
+ if (result .getResultType () == Engine .Result .Type .FAILURE ) {
107
+ postIndex (shardId , operation , result .getFailure ());
108
+ return ;
109
+ }
110
+
105
111
ZonedDateTime now = Instant .ofEpochMilli (clock .millis ()).atZone (ZoneOffset .UTC );
106
112
try {
107
113
Watch watch = parser .parseWithSecrets (operation .id (), true , operation .source (), now , XContentType .JSON ,
108
114
operation .getIfSeqNo (), operation .getIfPrimaryTerm ());
109
115
ShardAllocationConfiguration shardAllocationConfiguration = configuration .localShards .get (shardId );
110
116
if (shardAllocationConfiguration == null ) {
111
117
logger .debug ("no distributed watch execution info found for watch [{}] on shard [{}], got configuration for {}" ,
112
- watch .id (), shardId , configuration .localShards .keySet ());
113
- return operation ;
118
+ watch .id (), shardId , configuration .localShards .keySet ());
119
+ return ;
114
120
}
115
121
116
122
boolean shouldBeTriggered = shardAllocationConfiguration .shouldBeTriggered (watch .id ());
@@ -128,21 +134,6 @@ public Engine.Index preIndex(ShardId shardId, Engine.Index operation) {
128
134
} catch (IOException e ) {
129
135
throw new ElasticsearchParseException ("Could not parse watch with id [{}]" , e , operation .id ());
130
136
}
131
-
132
- }
133
-
134
- return operation ;
135
- }
136
-
137
- /**
138
- * In case of a document related failure (for example version conflict), then clean up resources for a watch
139
- * in the trigger service.
140
- */
141
- @ Override
142
- public void postIndex (ShardId shardId , Engine .Index index , Engine .IndexResult result ) {
143
- if (result .getResultType () == Engine .Result .Type .FAILURE ) {
144
- assert result .getFailure () != null ;
145
- postIndex (shardId , index , result .getFailure ());
146
137
}
147
138
}
148
139
@@ -162,8 +153,7 @@ public void postIndex(ShardId shardId, Engine.Index index, Engine.IndexResult re
162
153
@ Override
163
154
public void postIndex (ShardId shardId , Engine .Index index , Exception ex ) {
164
155
if (isWatchDocument (shardId .getIndexName ())) {
165
- logger .debug (() -> new ParameterizedMessage ("removing watch [{}] from trigger" , index .id ()), ex );
166
- triggerService .remove (index .id ());
156
+ logger .debug (() -> new ParameterizedMessage ("failed to add watch [{}] to trigger service" , index .id ()), ex );
167
157
}
168
158
}
169
159
0 commit comments