Skip to content

Commit 26fd3f5

Browse files
committed
Fix stale read of some acknowledged writes after a table split (ydb-platform#2286)
1 parent 385b62e commit 26fd3f5

File tree

5 files changed

+665
-15
lines changed

5 files changed

+665
-15
lines changed

ydb/core/tx/datashard/create_table_unit.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ EExecutionStatus TCreateTableUnit::Execute(TOperation::TPtr op,
8686
txc.DB.NoMoreReadsForTx();
8787
DataShard.SetPersistState(TShardState::Ready, txc);
8888
DataShard.CheckMvccStateChangeCanStart(ctx); // Recheck
89+
DataShard.SendRegistrationRequestTimeCast(ctx);
8990
}
9091

9192
return EExecutionStatus::DelayCompleteNoMoreRestarts;

ydb/core/tx/datashard/datashard.cpp

Lines changed: 58 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -398,8 +398,23 @@ void TDataShard::SendRegistrationRequestTimeCast(const TActorContext &ctx) {
398398
if (RegistrationSended)
399399
return;
400400

401-
if (!ProcessingParams)
401+
if (!ProcessingParams) {
402+
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, TabletID()
403+
<< " not sending time cast registration request in state "
404+
<< DatashardStateName(State)
405+
<< ": missing processing params");
402406
return;
407+
}
408+
409+
if (State == TShardState::WaitScheme ||
410+
State == TShardState::SplitDstReceivingSnapshot)
411+
{
412+
// We don't have all the necessary info yet
413+
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, TabletID()
414+
<< " not sending time cast registration request in state "
415+
<< DatashardStateName(State));
416+
return;
417+
}
403418

404419
LOG_INFO_S(ctx, NKikimrServices::TX_DATASHARD, "Send registration request to time cast "
405420
<< DatashardStateName(State) << " tabletId " << TabletID()
@@ -1961,6 +1976,13 @@ TRowVersion TDataShard::GetMvccTxVersion(EMvccTxMode mode, TOperation* op) const
19611976
}
19621977
}
19631978

1979+
LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "GetMvccTxVersion at " << TabletID()
1980+
<< " CompleteEdge# " << SnapshotManager.GetCompleteEdge()
1981+
<< " IncompleteEdge# " << SnapshotManager.GetIncompleteEdge()
1982+
<< " UnprotectedReadEdge# " << SnapshotManager.GetUnprotectedReadEdge()
1983+
<< " ImmediateWriteEdge# " << SnapshotManager.GetImmediateWriteEdge()
1984+
<< " ImmediateWriteEdgeReplied# " << SnapshotManager.GetImmediateWriteEdgeReplied());
1985+
19641986
TRowVersion edge;
19651987
TRowVersion readEdge = Max(
19661988
SnapshotManager.GetCompleteEdge(),
@@ -2075,6 +2097,8 @@ TDataShard::TPromotePostExecuteEdges TDataShard::PromoteImmediatePostExecuteEdge
20752097
// We need to wait for completion until the flag is committed
20762098
res.WaitCompletion = true;
20772099
}
2100+
LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "PromoteImmediatePostExecuteEdges at " << TabletID()
2101+
<< " promoting UnprotectedReadEdge to " << version);
20782102
SnapshotManager.PromoteUnprotectedReadEdge(version);
20792103

20802104
// We want to promote the complete edge when protected reads are
@@ -2237,6 +2261,19 @@ void TDataShard::SendAfterMediatorStepActivate(ui64 mediatorStep, const TActorCo
22372261
for (auto it = MediatorDelayedReplies.begin(); it != MediatorDelayedReplies.end();) {
22382262
const ui64 step = it->first.Step;
22392263

2264+
if (SrcSplitDescription) {
2265+
if (State == TShardState::SplitSrcSendingSnapshot ||
2266+
State == TShardState::SplitSrcWaitForPartitioningChanged ||
2267+
State == TShardState::PreOffline ||
2268+
State == TShardState::Offline)
2269+
{
2270+
// We cannot send replies, since dst shard is now in charge
2271+
// of keeping track of acknowledged writes. So we expect
2272+
// split src logic to reboot this shard later.
2273+
break;
2274+
}
2275+
}
2276+
22402277
if (step <= mediatorStep) {
22412278
SnapshotManager.PromoteImmediateWriteEdgeReplied(it->first);
22422279
Send(it->second.Target, it->second.Event.Release(), 0, it->second.Cookie);
@@ -2304,13 +2341,16 @@ void TDataShard::CheckMediatorStateRestored() {
23042341
// HEAD reads must include that in their results.
23052342
const ui64 waitStep = CoordinatorPrevReadStepMax;
23062343
const ui64 readStep = CoordinatorPrevReadStepMax;
2307-
2308-
LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "CheckMediatorStateRestored: waitStep# " << waitStep << " readStep# " << readStep);
2344+
const ui64 observedStep = GetMaxObservedStep();
2345+
LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "CheckMediatorStateRestored at " << TabletID() << ":"
2346+
<< " waitStep# " << waitStep
2347+
<< " readStep# " << readStep
2348+
<< " observedStep# " << observedStep);
23092349

23102350
// WARNING: we must perform this check BEFORE we update unprotected read edge
23112351
// We may enter this code path multiple times, and we expect that the above
23122352
// read step may be refined while we wait based on pessimistic backup step.
2313-
if (GetMaxObservedStep() < waitStep) {
2353+
if (observedStep < waitStep) {
23142354
// We need to wait until we observe mediator step that is at least
23152355
// as large as the step we found.
23162356
if (MediatorTimeCastWaitingSteps.insert(waitStep).second) {
@@ -2331,7 +2371,10 @@ void TDataShard::CheckMediatorStateRestored() {
23312371
SnapshotManager.GetImmediateWriteEdge().Step > SnapshotManager.GetCompleteEdge().Step
23322372
? SnapshotManager.GetImmediateWriteEdge().Prev()
23332373
: TRowVersion::Min();
2334-
SnapshotManager.PromoteUnprotectedReadEdge(Max(lastReadEdge, preImmediateWriteEdge));
2374+
const TRowVersion edge = Max(lastReadEdge, preImmediateWriteEdge);
2375+
LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "CheckMediatorStateRestored at " << TabletID()
2376+
<< " promoting UnprotectedReadEdge to " << edge);
2377+
SnapshotManager.PromoteUnprotectedReadEdge(edge);
23352378
}
23362379

23372380
// Promote the replied immediate write edge up to the currently observed step
@@ -2340,7 +2383,7 @@ void TDataShard::CheckMediatorStateRestored() {
23402383
// data that is definitely not replied yet.
23412384
if (SnapshotManager.GetImmediateWriteEdgeReplied() < SnapshotManager.GetImmediateWriteEdge()) {
23422385
const ui64 writeStep = SnapshotManager.GetImmediateWriteEdge().Step;
2343-
const TRowVersion edge(GetMaxObservedStep(), Max<ui64>());
2386+
const TRowVersion edge(observedStep, Max<ui64>());
23442387
SnapshotManager.PromoteImmediateWriteEdgeReplied(
23452388
Min(edge, SnapshotManager.GetImmediateWriteEdge()));
23462389
// Try to ensure writes become visible sooner rather than later
@@ -2477,6 +2520,10 @@ bool TDataShard::CheckDataTxReject(const TString& opDescr,
24772520
rejectDescriptions.push_back(TStringBuilder()
24782521
<< "is in process of split opId " << DstSplitOpId
24792522
<< " state " << DatashardStateName(State));
2523+
} else if (State == TShardState::WaitScheme) {
2524+
reject = true;
2525+
rejectReasons |= ERejectReasons::WrongState;
2526+
rejectDescriptions.push_back("is not created yet");
24802527
} else if (State == TShardState::PreOffline || State == TShardState::Offline) {
24812528
reject = true;
24822529
rejectStatus = NKikimrTxDataShard::TEvProposeTransactionResult::ERROR;
@@ -2639,6 +2686,11 @@ void TDataShard::Handle(TEvDataShard::TEvProposeTransaction::TPtr &ev, const TAc
26392686
auto* msg = ev->Get();
26402687
LWTRACK(ProposeTransactionRequest, msg->Orbit);
26412688

2689+
if (CheckDataTxRejectAndReply(ev, ctx)) {
2690+
IncCounter(COUNTER_PREPARE_REQUEST);
2691+
return;
2692+
}
2693+
26422694
// Check if we need to delay an immediate transaction
26432695
if (MediatorStateWaiting &&
26442696
(ev->Get()->GetFlags() & TTxFlags::Immediate) &&
@@ -2671,10 +2723,6 @@ void TDataShard::Handle(TEvDataShard::TEvProposeTransaction::TPtr &ev, const TAc
26712723

26722724
IncCounter(COUNTER_PREPARE_REQUEST);
26732725

2674-
if (CheckDataTxRejectAndReply(ev, ctx)) {
2675-
return;
2676-
}
2677-
26782726
switch (ev->Get()->GetTxKind()) {
26792727
case NKikimrTxDataShard::TX_KIND_DATA:
26802728
case NKikimrTxDataShard::TX_KIND_SCAN:

ydb/core/tx/datashard/datashard_split_dst.cpp

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,7 @@ class TDataShard::TTxSplitTransferSnapshot : public NTabletFlatExecutor::TTransa
175175

176176
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " Received snapshot for split/merge TxId " << opId
177177
<< " from tabeltId " << srcTabletId);
178+
LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " Received snapshot: " << record.DebugString());
178179

179180
if (!Self->DstSplitSchemaInitialized) {
180181
LegacyInitSchema(txc);
@@ -291,8 +292,9 @@ class TDataShard::TTxSplitTransferSnapshot : public NTabletFlatExecutor::TTransa
291292
Self->PromoteFollowerReadEdge(txc);
292293
}
293294

294-
Self->State = TShardState::Ready;
295-
Self->PersistSys(db, Schema::Sys_State, Self->State);
295+
// Note: we persist Ready, but keep current state in memory until Complete
296+
Self->SetPersistState(TShardState::Ready, txc);
297+
Self->State = TShardState::SplitDstReceivingSnapshot;
296298
}
297299

298300
return true;
@@ -306,9 +308,36 @@ class TDataShard::TTxSplitTransferSnapshot : public NTabletFlatExecutor::TTransa
306308

307309
ctx.Send(ackTo, new TEvDataShard::TEvSplitTransferSnapshotAck(opId, Self->TabletID()));
308310

309-
if (LastSnapshotReceived) {
310-
// We have received all the data, reload everything from the received system tables
311-
Self->Execute(Self->CreateTxInit(), ctx);
311+
// Note: we skip init in an unlikely event of state resetting between Execute and Complete
312+
if (LastSnapshotReceived && Self->State == TShardState::SplitDstReceivingSnapshot) {
313+
// We have received all the data, finish shard initialization
314+
// Note: previously we used TxInit, however received system tables
315+
// have been empty for years now, and since pipes are still open we
316+
// may receive requests between TxInit loading the Ready state and
317+
// its Complete method initializing everything properly. Instead
318+
// necessary steps are repeated here.
319+
Self->State = TShardState::Ready;
320+
321+
// We are already in StateWork, but we need to repeat many steps now that we are Ready
322+
Self->SwitchToWork(ctx);
323+
324+
// We can send the registration request now that we are ready
325+
Self->SendRegistrationRequestTimeCast(ctx);
326+
327+
// Initialize snapshot expiration queue with current context time
328+
Self->GetSnapshotManager().InitExpireQueue(ctx.Now());
329+
if (Self->GetSnapshotManager().HasExpiringSnapshots()) {
330+
Self->PlanCleanup(ctx);
331+
}
332+
333+
// Initialize change senders
334+
Self->KillChangeSender(ctx);
335+
Self->CreateChangeSender(ctx);
336+
Self->MaybeActivateChangeSender(ctx);
337+
Self->EmitHeartbeats();
338+
339+
// Switch mvcc state if needed
340+
Self->CheckMvccStateChangeCanStart(ctx);
312341
}
313342
}
314343
};

ydb/core/tx/datashard/datashard_split_src.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -529,6 +529,15 @@ class TDataShard::TTxSplitPartitioningChanged : public NTabletFlatExecutor::TTra
529529
}
530530
}
531531

532+
if (!Self->MediatorDelayedReplies.empty()) {
533+
// We have some pending mediator replies, which must not be replied.
534+
// Unfortunately we may linger around for a long time, and clients
535+
// would keep awaiting replies for all that time. We have to make
536+
// sure those clients receive an appropriate disconnection error
537+
// instead.
538+
ctx.Send(Self->SelfId(), new TEvents::TEvPoison);
539+
}
540+
532541
// TODO: properly check if there are no loans
533542
Self->CheckStateChange(ctx);
534543
}

0 commit comments

Comments
 (0)