32
32
import java .util .Collection ;
33
33
import java .util .Collections ;
34
34
import java .util .List ;
35
+ import java .util .OptionalLong ;
35
36
import java .util .concurrent .atomic .AtomicLong ;
36
37
import java .util .function .Supplier ;
37
38
@@ -98,7 +99,9 @@ public void testSoftDeletesRetentionLock() {
98
99
.min ()
99
100
.orElse (Long .MAX_VALUE );
100
101
long retainedSeqNo =
101
- Math .min (safeCommitCheckpoint , Math .min (minimumRetainingSequenceNumber , globalCheckpoint .get () - retainedOps )) + 1 ;
102
+ Math .min (
103
+ 1 + safeCommitCheckpoint ,
104
+ Math .min (minimumRetainingSequenceNumber , 1 + globalCheckpoint .get () - retainedOps ));
102
105
minRetainedSeqNo = Math .max (minRetainedSeqNo , retainedSeqNo );
103
106
}
104
107
assertThat (retentionQuery .getNumDims (), equalTo (1 ));
@@ -113,7 +116,7 @@ public void testSoftDeletesRetentionLock() {
113
116
.min ()
114
117
.orElse (Long .MAX_VALUE );
115
118
long retainedSeqNo =
116
- Math .min (safeCommitCheckpoint , Math .min (minimumRetainingSequenceNumber , globalCheckpoint .get () - retainedOps )) + 1 ;
119
+ Math .min (1 + safeCommitCheckpoint , Math .min (minimumRetainingSequenceNumber , 1 + globalCheckpoint .get () - retainedOps ));
117
120
minRetainedSeqNo = Math .max (minRetainedSeqNo , retainedSeqNo );
118
121
assertThat (policy .getMinRetainedSeqNo (), equalTo (minRetainedSeqNo ));
119
122
}
@@ -141,4 +144,87 @@ public void testAlwaysFetchLatestRetentionLeases() {
141
144
assertThat (policy .getRetentionPolicy ().v2 ().leases (), contains (leases .toArray (new RetentionLease [0 ])));
142
145
}
143
146
}
147
+
148
+ public void testWhenGlobalCheckpointDictatesThePolicy () {
149
+ final int retentionOperations = randomIntBetween (0 , 1024 );
150
+ final AtomicLong globalCheckpoint = new AtomicLong (randomLongBetween (0 , Long .MAX_VALUE - 2 ));
151
+ final Collection <RetentionLease > leases = new ArrayList <>();
152
+ final int numberOfLeases = randomIntBetween (0 , 16 );
153
+ for (int i = 0 ; i < numberOfLeases ; i ++) {
154
+ // setup leases where the minimum retained sequence number is more than the policy dictated by the global checkpoint
155
+ leases .add (new RetentionLease (
156
+ Integer .toString (i ),
157
+ randomLongBetween (1 + globalCheckpoint .get () - retentionOperations + 1 , Long .MAX_VALUE ),
158
+ randomNonNegativeLong (), "test" ));
159
+ }
160
+ final long primaryTerm = randomNonNegativeLong ();
161
+ final long version = randomNonNegativeLong ();
162
+ final Supplier <RetentionLeases > leasesSupplier =
163
+ () -> new RetentionLeases (
164
+ primaryTerm ,
165
+ version ,
166
+ Collections .unmodifiableCollection (new ArrayList <>(leases )));
167
+ final SoftDeletesPolicy policy =
168
+ new SoftDeletesPolicy (globalCheckpoint ::get , 0 , retentionOperations , leasesSupplier );
169
+ // set the local checkpoint of the safe commit to more than the policy dicated by the global checkpoint
170
+ final long localCheckpointOfSafeCommit = randomLongBetween (1 + globalCheckpoint .get () - retentionOperations + 1 , Long .MAX_VALUE );
171
+ policy .setLocalCheckpointOfSafeCommit (localCheckpointOfSafeCommit );
172
+ assertThat (policy .getMinRetainedSeqNo (), equalTo (1 + globalCheckpoint .get () - retentionOperations ));
173
+ }
174
+
175
+ public void testWhenLocalCheckpointOfSafeCommitDictatesThePolicy () {
176
+ final int retentionOperations = randomIntBetween (0 , 1024 );
177
+ final long localCheckpointOfSafeCommit = randomLongBetween (-1 , Long .MAX_VALUE - retentionOperations - 1 );
178
+ final AtomicLong globalCheckpoint =
179
+ new AtomicLong (randomLongBetween (Math .max (0 , localCheckpointOfSafeCommit + retentionOperations ), Long .MAX_VALUE - 1 ));
180
+ final Collection <RetentionLease > leases = new ArrayList <>();
181
+ final int numberOfLeases = randomIntBetween (0 , 16 );
182
+ for (int i = 0 ; i < numberOfLeases ; i ++) {
183
+ leases .add (new RetentionLease (
184
+ Integer .toString (i ),
185
+ randomLongBetween (1 + localCheckpointOfSafeCommit + 1 , Long .MAX_VALUE ), // leases are for more than the local checkpoint
186
+ randomNonNegativeLong (), "test" ));
187
+ }
188
+ final long primaryTerm = randomNonNegativeLong ();
189
+ final long version = randomNonNegativeLong ();
190
+ final Supplier <RetentionLeases > leasesSupplier =
191
+ () -> new RetentionLeases (
192
+ primaryTerm ,
193
+ version ,
194
+ Collections .unmodifiableCollection (new ArrayList <>(leases )));
195
+
196
+ final SoftDeletesPolicy policy =
197
+ new SoftDeletesPolicy (globalCheckpoint ::get , 0 , retentionOperations , leasesSupplier );
198
+ policy .setLocalCheckpointOfSafeCommit (localCheckpointOfSafeCommit );
199
+ assertThat (policy .getMinRetainedSeqNo (), equalTo (1 + localCheckpointOfSafeCommit ));
200
+ }
201
+
202
+ public void testWhenRetentionLeasesDictateThePolicy () {
203
+ final int retentionOperations = randomIntBetween (0 , 1024 );
204
+ final Collection <RetentionLease > leases = new ArrayList <>();
205
+ final int numberOfLeases = randomIntBetween (1 , 16 );
206
+ for (int i = 0 ; i < numberOfLeases ; i ++) {
207
+ leases .add (new RetentionLease (
208
+ Integer .toString (i ),
209
+ randomLongBetween (0 , Long .MAX_VALUE - retentionOperations - 1 ),
210
+ randomNonNegativeLong (), "test" ));
211
+ }
212
+ final OptionalLong minimumRetainingSequenceNumber = leases .stream ().mapToLong (RetentionLease ::retainingSequenceNumber ).min ();
213
+ assert minimumRetainingSequenceNumber .isPresent () : leases ;
214
+ final long localCheckpointOfSafeCommit = randomLongBetween (minimumRetainingSequenceNumber .getAsLong (), Long .MAX_VALUE - 1 );
215
+ final AtomicLong globalCheckpoint =
216
+ new AtomicLong (randomLongBetween (minimumRetainingSequenceNumber .getAsLong () + retentionOperations , Long .MAX_VALUE - 1 ));
217
+ final long primaryTerm = randomNonNegativeLong ();
218
+ final long version = randomNonNegativeLong ();
219
+ final Supplier <RetentionLeases > leasesSupplier =
220
+ () -> new RetentionLeases (
221
+ primaryTerm ,
222
+ version ,
223
+ Collections .unmodifiableCollection (new ArrayList <>(leases )));
224
+ final SoftDeletesPolicy policy =
225
+ new SoftDeletesPolicy (globalCheckpoint ::get , 0 , retentionOperations , leasesSupplier );
226
+ policy .setLocalCheckpointOfSafeCommit (localCheckpointOfSafeCommit );
227
+ assertThat (policy .getMinRetainedSeqNo (), equalTo (minimumRetainingSequenceNumber .getAsLong ()));
228
+ }
229
+
144
230
}
0 commit comments