Skip to content

Commit b555039

Browse files
Fixed an issue where leases could be missed when errors happen during initialization (#44648)
* Update PartitionSynchronizerImpl.java * Update CI pipeline threshold to 120 mins. * Adding more logs * Reduce timeout in cosmos-sdk-client.yml * Update PartitionSynchronizerImpl.java * Adding more logs * Update cosmos-sdk-client.yml * Added an option to verify for missing leases on every Change feed processor restart * Adding implementation for EPK based leases * Adding more test coverage * Update PartitionSynchronizerImpl.java * renaming method to enable lease verification --------- Co-authored-by: Abhijeet Mohanty <[email protected]>
1 parent 59f67d9 commit b555039

File tree

11 files changed

+313
-63
lines changed

11 files changed

+313
-63
lines changed

sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/epkversion/BootstrapperImplTests.java

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,14 @@
44
package com.azure.cosmos.implementation.changefeed.epkversion;
55

66
import com.azure.cosmos.implementation.changefeed.Bootstrapper;
7+
import com.azure.cosmos.implementation.changefeed.Lease;
78
import com.azure.cosmos.implementation.changefeed.LeaseStore;
89
import com.azure.cosmos.implementation.changefeed.LeaseStoreManager;
910
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedMode;
1011
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedStartFromInternal;
1112
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedState;
1213
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedStateV1;
14+
import com.azure.cosmos.implementation.changefeed.common.LeaseVersion;
1315
import com.azure.cosmos.implementation.changefeed.pkversion.ServiceItemLease;
1416
import com.azure.cosmos.implementation.feedranges.FeedRangeContinuation;
1517
import com.azure.cosmos.implementation.feedranges.FeedRangePartitionKeyRangeImpl;
@@ -121,6 +123,19 @@ public Object[][] leaseProvider() {
121123
BASE_CONTINUATION_STRING_FOR_PK_FULL_RANGE),
122124
true
123125
},
126+
{
127+
createEpkRangeBasedLeaseWithContinuation(
128+
true,
129+
ChangeFeedMode.FULL_FIDELITY,
130+
ChangeFeedStartFromInternal.createFromNow(),
131+
"XyJKUI7=",
132+
"NO67Hq=",
133+
"0",
134+
"-FF",
135+
"0"),
136+
null,
137+
false
138+
},
124139
{
125140
null,
126141
null,
@@ -193,6 +208,78 @@ public void tryInitializeStoreFromEpkVersionLeaseStoreWithExistingLeases(
193208
Mockito.verify(leaseStoreMock, times(2)).isInitialized();
194209
}
195210

211+
@Test(groups = {"unit"}, dataProvider = "leaseProvider")
212+
public void tryInitializeStoreFromEpkVersionLeaseStoreWithExistingButMissingLeases(
213+
ServiceItemLeaseV1 epkRangeBasedLease,
214+
ServiceItemLease pkRangeBasedLease,
215+
boolean expectIllegalStateException) {
216+
217+
Duration lockTime = Duration.ofSeconds(5);
218+
Duration expireTIme = Duration.ofSeconds(5);
219+
220+
PartitionSynchronizer partitionSynchronizerMock = Mockito.mock(PartitionSynchronizer.class);
221+
Mockito.when(partitionSynchronizerMock.createMissingLeases()).thenReturn(Mono.empty());
222+
223+
LeaseStore leaseStoreMock = Mockito.mock(LeaseStore.class);
224+
Mockito
225+
.when(leaseStoreMock.isInitialized())
226+
.thenReturn(Mono.just(false))
227+
.thenReturn(Mono.just(true));
228+
Mockito.when(leaseStoreMock.acquireInitializationLock(lockTime)).thenReturn(Mono.just(true));
229+
Mockito.when(leaseStoreMock.markInitialized()).thenReturn(Mono.just(Boolean.TRUE));
230+
Mockito.when(leaseStoreMock.releaseInitializationLock()).thenReturn(Mono.empty());
231+
232+
LeaseStoreManager epkRangeVersionLeaseStoreManagerMock = Mockito.mock(LeaseStoreManager.class);
233+
LeaseStoreManager pkRangeVersionLeaseStoreManagerMock = Mockito.mock(LeaseStoreManager.class);
234+
235+
ChangeFeedProcessorOptions changeFeedProcessorOptions = new ChangeFeedProcessorOptions()
236+
.setLeaseVerificationEnabledOnRestart(true);
237+
238+
if (epkRangeBasedLease != null && expectIllegalStateException) {
239+
Mockito.when(epkRangeVersionLeaseStoreManagerMock.getTopLeases(Mockito.eq(1)))
240+
.thenReturn(Flux.just(epkRangeBasedLease));
241+
} else {
242+
Mockito.when(epkRangeVersionLeaseStoreManagerMock.getTopLeases(Mockito.eq(1)))
243+
.thenReturn(Flux.empty());
244+
}
245+
246+
if (pkRangeBasedLease == null) {
247+
Mockito.when(pkRangeVersionLeaseStoreManagerMock.getTopLeases(Mockito.eq(1)))
248+
.thenReturn(Flux.empty());
249+
} else {
250+
Mockito.when(pkRangeVersionLeaseStoreManagerMock.getTopLeases(Mockito.eq(1)))
251+
.thenReturn(Flux.just(pkRangeBasedLease));
252+
}
253+
254+
Bootstrapper bootstrapper = new BootstrapperImpl(
255+
partitionSynchronizerMock,
256+
leaseStoreMock,
257+
lockTime,
258+
expireTIme,
259+
epkRangeVersionLeaseStoreManagerMock,
260+
pkRangeVersionLeaseStoreManagerMock,
261+
changeFeedProcessorOptions,
262+
ChangeFeedMode.FULL_FIDELITY);
263+
264+
if (expectIllegalStateException) {
265+
Assert.assertThrows(IllegalStateException.class, () -> bootstrapper.initialize().block());
266+
} else {
267+
bootstrapper.initialize().block();
268+
}
269+
270+
Mockito.verify(pkRangeVersionLeaseStoreManagerMock, times(1)).getTopLeases(Mockito.eq(1));
271+
272+
if (pkRangeBasedLease == null) {
273+
Mockito.verify(epkRangeVersionLeaseStoreManagerMock, times(1)).getTopLeases(Mockito.eq(1));
274+
Mockito.verify(partitionSynchronizerMock, times(expectIllegalStateException ? 1 : 2)).createMissingLeases();
275+
} else {
276+
Mockito.verify(partitionSynchronizerMock, times(1)).createMissingLeases();
277+
}
278+
279+
Mockito.verify(leaseStoreMock, times(2)).isInitialized();
280+
Mockito.verify(leaseStoreMock, times(2)).acquireInitializationLock(Mockito.any());
281+
}
282+
196283
private static ServiceItemLeaseV1 createEpkRangeBasedLeaseWithContinuation(
197284
boolean withContinuation,
198285
ChangeFeedMode changeFeedMode,

sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/pkversion/BootstrapperImplTests.java

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedStartFromInternal;
1212
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedState;
1313
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedStateV1;
14+
import com.azure.cosmos.implementation.changefeed.common.LeaseVersion;
1415
import com.azure.cosmos.implementation.changefeed.epkversion.ServiceItemLeaseV1;
1516
import com.azure.cosmos.implementation.feedranges.FeedRangeContinuation;
1617
import com.azure.cosmos.implementation.feedranges.FeedRangePartitionKeyRangeImpl;
@@ -86,6 +87,14 @@ public Object[][] leaseProvider() {
8687
"0"),
8788
true,
8889
},
90+
{
91+
createPkRangeBasedLeaseWithContinuation(false, "XyJKUI7=","NO67Hq=", "0", "0"),
92+
false
93+
},
94+
{
95+
createPkRangeBasedLeaseWithContinuation(true, "XyJKUI7=","NO67Hq=", "0", "0"),
96+
false
97+
},
8998
{
9099
null,
91100
false
@@ -139,6 +148,63 @@ public void tryInitializeStoreFromPkVersionLeaseStoreWithExistingLeases(Lease le
139148
Mockito.verify(leaseStoreMock, times(2)).isInitialized();
140149
}
141150

151+
@Test(groups = {"unit"}, dataProvider = "leaseProvider")
152+
public void tryInitializeStoreFromPkVersionLeaseStoreWithExistingButMissingLeases(Lease lease, boolean expectIllegalStateException) {
153+
Duration lockTime = Duration.ofSeconds(5);
154+
Duration sleepTime = Duration.ofSeconds(5);
155+
156+
PartitionSynchronizer partitionSynchronizerMock = Mockito.mock(PartitionSynchronizer.class);
157+
Mockito.when(partitionSynchronizerMock.createMissingLeases()).thenReturn(Mono.empty());
158+
159+
ChangeFeedProcessorOptions changeFeedProcessorOptions = new ChangeFeedProcessorOptions()
160+
.setLeaseVerificationEnabledOnRestart(true);
161+
162+
LeaseStore leaseStoreMock = Mockito.mock(LeaseStore.class);
163+
Mockito
164+
.when(leaseStoreMock.isInitialized())
165+
.thenReturn(Mono.just(false))
166+
.thenReturn(Mono.just(true));
167+
Mockito.when(leaseStoreMock.acquireInitializationLock(lockTime)).thenReturn(Mono.just(true));
168+
Mockito.when(leaseStoreMock.markInitialized()).thenReturn(Mono.just(Boolean.TRUE));
169+
Mockito.when(leaseStoreMock.releaseInitializationLock()).thenReturn(Mono.empty());
170+
171+
LeaseStoreManager epkRangeVersionLeaseStoreManagerMock = Mockito.mock(LeaseStoreManager.class);
172+
173+
// even return empty Flux when lease != null to simulate the missing leases
174+
if (lease != null) {
175+
Mockito.when(epkRangeVersionLeaseStoreManagerMock.getTopLeases(Mockito.eq(1)))
176+
.thenReturn(
177+
// even return empty Flux when PK-range based lease != null to simulate the missing leases
178+
lease.getVersion() == LeaseVersion.EPK_RANGE_BASED_LEASE ? Flux.just(lease) : Flux.empty()
179+
);
180+
} else {
181+
Mockito.when(epkRangeVersionLeaseStoreManagerMock.getTopLeases(Mockito.eq(1))).thenReturn(Flux.empty());
182+
}
183+
184+
Bootstrapper bootstrapper = new BootstrapperImpl(
185+
partitionSynchronizerMock,
186+
leaseStoreMock,
187+
epkRangeVersionLeaseStoreManagerMock,
188+
changeFeedProcessorOptions,
189+
lockTime,
190+
sleepTime);
191+
192+
if (expectIllegalStateException) {
193+
Assert.assertThrows(IllegalStateException.class, () -> bootstrapper.initialize().block());
194+
} else {
195+
bootstrapper.initialize().block();
196+
}
197+
198+
Mockito.verify(epkRangeVersionLeaseStoreManagerMock, times(1)).getTopLeases(Mockito.eq(1));
199+
if (lease != null && lease.getVersion() == LeaseVersion.EPK_RANGE_BASED_LEASE) {
200+
Mockito.verify(partitionSynchronizerMock, times(1)).createMissingLeases();
201+
} else {
202+
Mockito.verify(partitionSynchronizerMock, times(2)).createMissingLeases();
203+
}
204+
Mockito.verify(leaseStoreMock, times(2)).isInitialized();
205+
Mockito.verify(leaseStoreMock, times(2)).acquireInitializationLock(Mockito.any());
206+
}
207+
142208
private static ServiceItemLeaseV1 createEpkRangeBasedLeaseWithContinuation(
143209
boolean withContinuation,
144210
ChangeFeedMode changeFeedMode,
@@ -176,4 +242,24 @@ private static ServiceItemLeaseV1 createEpkRangeBasedLeaseWithContinuation(
176242

177243
return lease;
178244
}
245+
246+
private static ServiceItemLease createPkRangeBasedLeaseWithContinuation(
247+
boolean withContinuation,
248+
String databaseRid,
249+
String collectionRid,
250+
String leaseToken,
251+
String continuationToken) {
252+
253+
ServiceItemLease lease = new ServiceItemLease();
254+
255+
lease.setId(String.format("%s_%s..%s", databaseRid, collectionRid, leaseToken));
256+
257+
lease = lease.withLeaseToken(leaseToken);
258+
259+
if (withContinuation) {
260+
lease = lease.withContinuationToken(continuationToken);
261+
}
262+
263+
return lease;
264+
}
179265
}

sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/FullFidelityChangeFeedProcessorTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1177,6 +1177,7 @@ public void staledLeaseAcquiring() throws InterruptedException {
11771177
.leaseContainer(createdLeaseCollection)
11781178
.options(new ChangeFeedProcessorOptions()
11791179
.setLeasePrefix(leasePrefix)
1180+
.setLeaseVerificationEnabledOnRestart(true)
11801181
)
11811182
.buildChangeFeedProcessor();
11821183

@@ -1199,6 +1200,7 @@ public void staledLeaseAcquiring() throws InterruptedException {
11991200
.setLeasePrefix(leasePrefix)
12001201
.setMaxItemCount(10)
12011202
.setMaxScaleCount(0) // unlimited
1203+
.setLeaseVerificationEnabledOnRestart(true)
12021204
)
12031205
.buildChangeFeedProcessor();
12041206

@@ -1323,6 +1325,7 @@ public void ownerNullAcquiring() throws InterruptedException {
13231325
.setLeaseAcquireInterval(Duration.ofMillis(5 * CHANGE_FEED_PROCESSOR_TIMEOUT))
13241326
.setLeaseExpirationInterval(Duration.ofMillis(6 * CHANGE_FEED_PROCESSOR_TIMEOUT))
13251327
.setFeedPollDelay(Duration.ofSeconds(5))
1328+
.setLeaseVerificationEnabledOnRestart(true)
13261329
)
13271330
.buildChangeFeedProcessor();
13281331

sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -845,6 +845,7 @@ public void staledLeaseAcquiring() throws InterruptedException {
845845
.leaseContainer(createdLeaseCollection)
846846
.options(new ChangeFeedProcessorOptions()
847847
.setLeasePrefix(leasePrefix)
848+
.setLeaseVerificationEnabledOnRestart(true)
848849
)
849850
.buildChangeFeedProcessor();
850851

@@ -868,6 +869,7 @@ public void staledLeaseAcquiring() throws InterruptedException {
868869
.setMaxItemCount(10)
869870
.setStartFromBeginning(true)
870871
.setMaxScaleCount(0) // unlimited
872+
.setLeaseVerificationEnabledOnRestart(true)
871873
)
872874
.buildChangeFeedProcessor();
873875

@@ -994,6 +996,7 @@ public void ownerNullAcquiring() throws InterruptedException {
994996
.setLeaseAcquireInterval(Duration.ofMillis(5 * CHANGE_FEED_PROCESSOR_TIMEOUT))
995997
.setLeaseExpirationInterval(Duration.ofMillis(6 * CHANGE_FEED_PROCESSOR_TIMEOUT))
996998
.setFeedPollDelay(Duration.ofSeconds(5))
999+
.setLeaseVerificationEnabledOnRestart(true)
9971000
)
9981001
.buildChangeFeedProcessor();
9991002

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/BootstrapperImpl.java

Lines changed: 34 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -82,34 +82,42 @@ public Mono<Void> initialize() {
8282
.flatMap(value -> this.leaseStore.isInitialized())
8383
.flatMap(initialized -> {
8484
this.isInitialized = initialized;
85-
85+
Mono<Void> previousOperation = Mono.empty();
8686
if (initialized) {
87-
return this.validateLeaseCFModeInteroperabilityForEpkRangeBasedLease();
88-
} else {
89-
logger.info("Acquire initialization lock");
90-
return this.leaseStore.acquireInitializationLock(this.lockTime)
91-
.flatMap(lockAcquired -> {
92-
this.isLockAcquired = lockAcquired;
93-
94-
if (!this.isLockAcquired) {
95-
logger.info("Another instance is initializing the store");
96-
return Mono.just(isLockAcquired).delayElement(this.sleepTime, CosmosSchedulers.COSMOS_PARALLEL);
97-
} else {
98-
return this.synchronizer.createMissingLeases()
99-
.then(this.leaseStore.markInitialized());
100-
}
101-
})
102-
.onErrorResume(throwable -> {
103-
logger.warn("Unexpected exception caught while initializing the lock", throwable);
104-
return Mono.just(this.isLockAcquired);
105-
})
106-
.flatMap(lockAcquired -> {
107-
if (this.isLockAcquired) {
108-
return this.leaseStore.releaseInitializationLock();
109-
}
110-
return Mono.just(lockAcquired);
111-
});
87+
if (!this.changeFeedProcessorOptions.isLeaseVerificationEnabledOnRestart()) {
88+
return this.validateLeaseCFModeInteroperabilityForEpkRangeBasedLease();
89+
} else {
90+
previousOperation = this.validateLeaseCFModeInteroperabilityForEpkRangeBasedLease();
91+
}
11292
}
93+
94+
logger.info("Acquire initialization lock");
95+
return previousOperation.then(
96+
this.leaseStore.acquireInitializationLock(this.lockTime)
97+
.flatMap(lockAcquired -> {
98+
this.isLockAcquired = lockAcquired;
99+
100+
if (!this.isLockAcquired) {
101+
logger.info("Another instance is initializing the store");
102+
return Mono.just(isLockAcquired).delayElement(this.sleepTime, CosmosSchedulers.COSMOS_PARALLEL);
103+
} else {
104+
return this.synchronizer.createMissingLeases()
105+
.then(!isInitialized
106+
? this.leaseStore.markInitialized().flatMap((initSucceeded) -> Mono.just(lockAcquired))
107+
: Mono.just(lockAcquired));
108+
}
109+
})
110+
.onErrorResume(throwable -> {
111+
logger.warn("Unexpected exception caught while initializing the lock", throwable);
112+
return Mono.just(this.isLockAcquired);
113+
})
114+
.flatMap(lockAcquired -> {
115+
if (this.isLockAcquired) {
116+
return this.leaseStore.releaseInitializationLock();
117+
}
118+
return Mono.just(lockAcquired);
119+
})
120+
);
113121
})
114122
.repeat(() -> !this.isInitialized)
115123
.then();

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/LeaseStoreManagerImpl.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,7 @@ public Mono<Lease> createLeaseIfNotExist(FeedRangeEpkImpl feedRange, String cont
191191
}
192192
}
193193

194+
logger.error("Failed to create lease document for {}.", leaseToken, ex);
194195
return Mono.error(ex);
195196
})
196197
.map(documentResourceResponse -> {

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionSynchronizerImpl.java

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,26 @@ public Mono<Void> createMissingLeases() {
8383
@Override
8484
public Mono<Void> createMissingLeases(List<Lease> pkRangeIdVersionLeases) {
8585
return this.documentClient.getOverlappingRanges(PartitionKeyInternalHelper.FullRange, true)
86-
.flatMap(pkRangeList -> this.createLeases(pkRangeList, pkRangeIdVersionLeases).then())
87-
.doOnError(throwable -> logger.error("Create missing leases from pkRangeIdVersion leases failed", throwable));
86+
.flatMap(pkRangeList -> {
87+
if (logger.isInfoEnabled()) {
88+
StringBuilder sb = new StringBuilder();
89+
for (PartitionKeyRange pkr : pkRangeList) {
90+
if (sb.length() > 0) {
91+
sb.append(", ");
92+
}
93+
94+
sb.append(pkr.getId() + ":" + pkr.getMinInclusive() + "-" + pkr.getMaxExclusive());
95+
}
96+
logger.info(
97+
"Checking whether leases for any partition is missing - partitions - {}",
98+
sb);
99+
}
100+
return this.createLeases(pkRangeList, pkRangeIdVersionLeases).then();
101+
})
102+
.onErrorResume( throwable -> {
103+
logger.error("Create missing leases from pkRangeIdVersion leases failed", throwable);
104+
return Mono.error(throwable);
105+
});
88106
}
89107

90108
@Override
@@ -158,8 +176,12 @@ private Flux<Lease> createLeases(List<PartitionKeyRange> partitionKeyRanges) {
158176
return Mono.empty();
159177
}).flatMap(pkRange -> {
160178
FeedRangeEpkImpl feedRangeEpk = new FeedRangeEpkImpl(pkRange.toRange());
179+
logger.debug("Adding a new lease document for feed range {}", feedRangeEpk);
161180
// We are creating the lease for the whole pkRange.
162-
return leaseManager.createLeaseIfNotExist(feedRangeEpk, null);
181+
return leaseManager.createLeaseIfNotExist(feedRangeEpk, null)
182+
.doOnSuccess((lease) -> {
183+
logger.info("Added new lease document for feed range {}", lease.getLeaseToken());
184+
});
163185
}, this.degreeOfParallelism);
164186
});
165187
}

0 commit comments

Comments
 (0)