1
+ /*
2
+ * Licensed to Elasticsearch under one or more contributor
3
+ * license agreements. See the NOTICE file distributed with
4
+ * this work for additional information regarding copyright
5
+ * ownership. Elasticsearch licenses this file to you under
6
+ * the Apache License, Version 2.0 (the "License"); you may
7
+ * not use this file except in compliance with the License.
8
+ * You may obtain a copy of the License at
9
+ *
10
+ * http://www.apache.org/licenses/LICENSE-2.0
11
+ *
12
+ * Unless required by applicable law or agreed to in writing,
13
+ * software distributed under the License is distributed on an
14
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
+ * KIND, either express or implied. See the License for the
16
+ * specific language governing permissions and limitations
17
+ * under the License.
18
+ */
19
+
20
+ package org .elasticsearch .index .seqno ;
21
+
22
+ import org .apache .logging .log4j .LogManager ;
23
+ import org .apache .logging .log4j .Logger ;
24
+ import org .elasticsearch .action .Action ;
25
+ import org .elasticsearch .action .ActionListener ;
26
+ import org .elasticsearch .action .ActionRequestValidationException ;
27
+ import org .elasticsearch .action .ActionResponse ;
28
+ import org .elasticsearch .action .support .ActionFilters ;
29
+ import org .elasticsearch .action .support .single .shard .SingleShardRequest ;
30
+ import org .elasticsearch .action .support .single .shard .TransportSingleShardAction ;
31
+ import org .elasticsearch .cluster .ClusterState ;
32
+ import org .elasticsearch .cluster .metadata .IndexNameExpressionResolver ;
33
+ import org .elasticsearch .cluster .routing .ShardsIterator ;
34
+ import org .elasticsearch .cluster .service .ClusterService ;
35
+ import org .elasticsearch .common .inject .Inject ;
36
+ import org .elasticsearch .common .io .stream .StreamInput ;
37
+ import org .elasticsearch .common .io .stream .StreamOutput ;
38
+ import org .elasticsearch .common .lease .Releasable ;
39
+ import org .elasticsearch .common .util .concurrent .FutureUtils ;
40
+ import org .elasticsearch .index .IndexService ;
41
+ import org .elasticsearch .index .shard .IndexShard ;
42
+ import org .elasticsearch .index .shard .ShardId ;
43
+ import org .elasticsearch .indices .IndicesService ;
44
+ import org .elasticsearch .threadpool .ThreadPool ;
45
+ import org .elasticsearch .transport .TransportService ;
46
+
47
+ import java .io .IOException ;
48
+ import java .util .Objects ;
49
+ import java .util .concurrent .CompletableFuture ;
50
+
51
+ public class RetentionLeaseActions {
52
+
53
+ static abstract class TransportRetentionLeaseAction extends TransportSingleShardAction <Request , Response > {
54
+
55
+ private Logger logger = LogManager .getLogger (getClass ());
56
+
57
+ private final IndicesService indicesService ;
58
+
59
+ @ Inject
60
+ public TransportRetentionLeaseAction (
61
+ final String name ,
62
+ final ThreadPool threadPool ,
63
+ final ClusterService clusterService ,
64
+ final TransportService transportService ,
65
+ final ActionFilters actionFilters ,
66
+ final IndexNameExpressionResolver indexNameExpressionResolver ,
67
+ final IndicesService indicesService ) {
68
+ super (name , threadPool , clusterService , transportService , actionFilters , indexNameExpressionResolver , Request ::new , ThreadPool .Names .MANAGEMENT );
69
+ this .indicesService = Objects .requireNonNull (indicesService );
70
+ }
71
+
72
+ @ Override
73
+ protected ShardsIterator shards (ClusterState state , InternalRequest request ) {
74
+ return state
75
+ .routingTable ()
76
+ .shardRoutingTable (request .concreteIndex (), request .request ().getShardId ().id ())
77
+ .primaryShardIt ();
78
+ }
79
+
80
+ @ Override
81
+ protected Response shardOperation (final Request request , final ShardId shardId ) {
82
+ final IndexService indexService = indicesService .indexServiceSafe (request .getShardId ().getIndex ());
83
+ final IndexShard indexShard = indexService .getShard (request .getShardId ().id ());
84
+
85
+ final CompletableFuture <Releasable > permit = new CompletableFuture <>();
86
+ final ActionListener <Releasable > onAcquired = new ActionListener <Releasable >() {
87
+
88
+ @ Override
89
+ public void onResponse (Releasable releasable ) {
90
+ if (permit .complete (releasable ) == false ) {
91
+ releasable .close ();
92
+ }
93
+ }
94
+
95
+ @ Override
96
+ public void onFailure (Exception e ) {
97
+ permit .completeExceptionally (e );
98
+ }
99
+
100
+ };
101
+
102
+ indexShard .acquirePrimaryOperationPermit (onAcquired , ThreadPool .Names .SAME , request );
103
+
104
+ // block until we have the permit
105
+ try (Releasable ignore = FutureUtils .get (permit )) {
106
+ doRetentionLeaseAction (indexShard , request );
107
+ } finally {
108
+ // just in case we got an exception (likely interrupted) while waiting for the get
109
+ permit .whenComplete ((r , e ) -> {
110
+ if (r != null ) {
111
+ r .close ();
112
+ }
113
+ if (e != null ) {
114
+ logger .trace ("suppressing exception on completion (it was already bubbled up or the operation was aborted)" , e );
115
+ }
116
+ });
117
+ }
118
+
119
+ return new Response ();
120
+ }
121
+
122
+ abstract void doRetentionLeaseAction (IndexShard indexShard , Request request );
123
+
124
+ @ Override
125
+ protected Response newResponse () {
126
+ return new Response ();
127
+ }
128
+
129
+ @ Override
130
+ protected boolean resolveIndex (final Request request ) {
131
+ return false ;
132
+ }
133
+
134
+ }
135
+
136
+ public static class Add extends Action <Response > {
137
+
138
+ public static final Add INSTANCE = new Add ();
139
+ public static final String NAME = "indices:admin/seq_no/add_retention_lease" ;
140
+
141
+ private Add () {
142
+ super (NAME );
143
+ }
144
+
145
+ public static class TransportAction extends TransportRetentionLeaseAction {
146
+
147
+ @ Inject
148
+ public TransportAction (
149
+ final ThreadPool threadPool ,
150
+ final ClusterService clusterService ,
151
+ final TransportService transportService ,
152
+ final ActionFilters actionFilters ,
153
+ final IndexNameExpressionResolver indexNameExpressionResolver ,
154
+ final IndicesService indicesService ) {
155
+ super (NAME , threadPool , clusterService , transportService , actionFilters , indexNameExpressionResolver , indicesService );
156
+ }
157
+
158
+ @ Override
159
+ void doRetentionLeaseAction (final IndexShard indexShard , final Request request ) {
160
+ indexShard .addRetentionLease (request .getId (), request .getRetainingSequenceNumber (), request .getSource (), ActionListener .wrap (() -> {}));
161
+ }
162
+ }
163
+
164
+ @ Override
165
+ public Response newResponse () {
166
+ return new Response ();
167
+ }
168
+
169
+ }
170
+
171
+ public static class Renew extends Action <Response > {
172
+
173
+ public static final Renew INSTANCE = new Renew ();
174
+ public static final String NAME = "indices:admin/seq_no/renew_retention_lease" ;
175
+
176
+ private Renew () {
177
+ super (NAME );
178
+ }
179
+
180
+ public static class TransportAction extends TransportRetentionLeaseAction {
181
+
182
+ @ Inject
183
+ public TransportAction (
184
+ final ThreadPool threadPool ,
185
+ final ClusterService clusterService ,
186
+ final TransportService transportService ,
187
+ final ActionFilters actionFilters ,
188
+ final IndexNameExpressionResolver indexNameExpressionResolver ,
189
+ final IndicesService indicesService ) {
190
+ super (NAME , threadPool , clusterService , transportService , actionFilters , indexNameExpressionResolver , indicesService );
191
+ }
192
+
193
+
194
+ @ Override
195
+ void doRetentionLeaseAction (final IndexShard indexShard , final Request request ) {
196
+ indexShard .renewRetentionLease (request .getId (), request .getRetainingSequenceNumber (), request .getSource ());
197
+ }
198
+ }
199
+
200
+ @ Override
201
+ public Response newResponse () {
202
+ return new Response ();
203
+ }
204
+
205
+ }
206
+
207
+ public static class Request extends SingleShardRequest <Request > {
208
+
209
+ public static long RETAIN_ALL = -1 ;
210
+
211
+ private ShardId shardId ;
212
+
213
+ public ShardId getShardId () {
214
+ return shardId ;
215
+ }
216
+
217
+ private String id ;
218
+
219
+ public String getId () {
220
+ return id ;
221
+ }
222
+
223
+ private long retainingSequenceNumber ;
224
+
225
+ public long getRetainingSequenceNumber () {
226
+ return retainingSequenceNumber ;
227
+ }
228
+
229
+ private String source ;
230
+
231
+ public String getSource () {
232
+ return source ;
233
+ }
234
+
235
+ public Request () {
236
+ }
237
+
238
+ public Request (final ShardId shardId , final String id , final long retainingSequenceNumber , final String source ) {
239
+ super (Objects .requireNonNull (shardId ).getIndexName ());
240
+ this .shardId = shardId ;
241
+ this .id = Objects .requireNonNull (id );
242
+ if (retainingSequenceNumber < 0 && retainingSequenceNumber != RETAIN_ALL ) {
243
+ throw new IllegalArgumentException (
244
+ "retention lease retaining sequence number [" + retainingSequenceNumber + "] out of range" );
245
+ }
246
+ this .retainingSequenceNumber = retainingSequenceNumber ;
247
+ this .source = Objects .requireNonNull (source );
248
+ }
249
+
250
+ @ Override
251
+ public ActionRequestValidationException validate () {
252
+ return null ;
253
+ }
254
+
255
+ @ Override
256
+ public void readFrom (StreamInput in ) throws IOException {
257
+ super .readFrom (in );
258
+ shardId = ShardId .readShardId (in );
259
+ id = in .readString ();
260
+ retainingSequenceNumber = in .readZLong ();
261
+ source = in .readString ();
262
+ }
263
+
264
+ @ Override
265
+ public void writeTo (StreamOutput out ) throws IOException {
266
+ super .writeTo (out );
267
+ shardId .writeTo (out );
268
+ out .writeString (id );
269
+ out .writeZLong (retainingSequenceNumber );
270
+ out .writeString (source );
271
+ }
272
+
273
+ }
274
+
275
+ public static class Response extends ActionResponse {
276
+
277
+ }
278
+
279
+ }
0 commit comments