Skip to content

Fix stale read anomalies detected with Jepsen #2374

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ydb/core/tx/datashard/create_table_unit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ EExecutionStatus TCreateTableUnit::Execute(TOperation::TPtr op,
txc.DB.NoMoreReadsForTx();
DataShard.SetPersistState(TShardState::Ready, txc);
DataShard.CheckMvccStateChangeCanStart(ctx); // Recheck
DataShard.SendRegistrationRequestTimeCast(ctx);
}

return EExecutionStatus::DelayCompleteNoMoreRestarts;
Expand Down
182 changes: 118 additions & 64 deletions ydb/core/tx/datashard/datashard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -398,8 +398,23 @@ void TDataShard::SendRegistrationRequestTimeCast(const TActorContext &ctx) {
if (RegistrationSended)
return;

if (!ProcessingParams)
if (!ProcessingParams) {
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, TabletID()
<< " not sending time cast registration request in state "
<< DatashardStateName(State)
<< ": missing processing params");
return;
}

if (State == TShardState::WaitScheme ||
State == TShardState::SplitDstReceivingSnapshot)
{
// We don't have all the necessary info yet
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, TabletID()
<< " not sending time cast registration request in state "
<< DatashardStateName(State));
return;
}

LOG_INFO_S(ctx, NKikimrServices::TX_DATASHARD, "Send registration request to time cast "
<< DatashardStateName(State) << " tabletId " << TabletID()
Expand Down Expand Up @@ -1961,68 +1976,81 @@ TRowVersion TDataShard::GetMvccTxVersion(EMvccTxMode mode, TOperation* op) const
}
}

TRowVersion edge;
TRowVersion readEdge = Max(
SnapshotManager.GetCompleteEdge(),
SnapshotManager.GetUnprotectedReadEdge());
TRowVersion writeEdge = Max(readEdge, SnapshotManager.GetIncompleteEdge());
switch (mode) {
case EMvccTxMode::ReadOnly:
// With read-only transactions we don't need reads to include
// changes made at the incomplete edge, as that is a point where
// distributed transactions performed some reads, not writes.
// Since incomplete transactions are still inflight, the actual
// version will stick to the first incomplete transaction in queue,
// effectively reading non-repeatable state before that transaction.
edge = readEdge;
break;
case EMvccTxMode::ReadWrite:
// With read-write transactions we must choose a point that is
// greater than both complete and incomplete edges. The reason
// is that incomplete transactions performed some reads at that
// point and these snapshot points must be repeatable.
// Note that as soon as the first write past the IncompleteEdge
// happens it cements all distributed transactions up to that point
// as complete, so all future reads and writes are guaranteed to
// include that point as well.
edge = writeEdge;
break;
}
LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "GetMvccTxVersion at " << TabletID()
<< " CompleteEdge# " << SnapshotManager.GetCompleteEdge()
<< " IncompleteEdge# " << SnapshotManager.GetIncompleteEdge()
<< " UnprotectedReadEdge# " << SnapshotManager.GetUnprotectedReadEdge()
<< " ImmediateWriteEdge# " << SnapshotManager.GetImmediateWriteEdge()
<< " ImmediateWriteEdgeReplied# " << SnapshotManager.GetImmediateWriteEdgeReplied());

TRowVersion version = [&]() {
TRowVersion edge;
TRowVersion readEdge = Max(
SnapshotManager.GetCompleteEdge(),
SnapshotManager.GetUnprotectedReadEdge());
TRowVersion writeEdge = Max(readEdge, SnapshotManager.GetIncompleteEdge());
switch (mode) {
case EMvccTxMode::ReadOnly:
// With read-only transactions we don't need reads to include
// changes made at the incomplete edge, as that is a point where
// distributed transactions performed some reads, not writes.
// Since incomplete transactions are still inflight, the actual
// version will stick to the first incomplete transaction in queue,
// effectively reading non-repeatable state before that transaction.
edge = readEdge;
break;
case EMvccTxMode::ReadWrite:
// With read-write transactions we must choose a point that is
// greater than both complete and incomplete edges. The reason
// is that incomplete transactions performed some reads at that
// point and these snapshot points must be repeatable.
// Note that as soon as the first write past the IncompleteEdge
// happens it cements all distributed transactions up to that point
// as complete, so all future reads and writes are guaranteed to
// include that point as well.
edge = writeEdge;
break;
}

// If there's any planned operation that is above our edge, it would be a
// suitable version for a new immediate operation. We effectively try to
// execute "before" that point if possible.
if (auto nextOp = Pipeline.GetNextPlannedOp(edge.Step, edge.TxId))
return TRowVersion(nextOp->GetStep(), nextOp->GetTxId());
// If there's any planned operation that is above our edge, it would be a
// suitable version for a new immediate operation. We effectively try to
// execute "before" that point if possible.
if (auto nextOp = Pipeline.GetNextPlannedOp(edge.Step, edge.TxId))
return TRowVersion(nextOp->GetStep(), nextOp->GetTxId());

// Normally we stick transactions to the end of the last known mediator step
// Note these calculations only happen when we don't have distributed
// transactions left in queue, and we won't have any more transactions
// up to the current mediator time. The mediator time itself may be stale,
// in which case we may have evidence of its higher value via complete and
// incomplete edges above.
const ui64 mediatorStep = Max(MediatorTimeCastEntry ? MediatorTimeCastEntry->Get(TabletID()) : 0, writeEdge.Step);
TRowVersion mediatorEdge(mediatorStep, ::Max<ui64>());

switch (mode) {
case EMvccTxMode::ReadOnly: {
// We read at the end of the current step
return mediatorEdge;
}

case EMvccTxMode::ReadWrite: {
// We write at the end of the current step, or the start of the next step when that's protected
return Max(mediatorEdge, writeEdge.Next());
}
}

// Normally we stick transactions to the end of the last known mediator step
// Note these calculations only happen when we don't have distributed
// transactions left in queue, and we won't have any more transactions
// up to the current mediator time. The mediator time itself may be stale,
// in which case we may have evidence of its higher value via complete and
// incomplete edges above.
const ui64 mediatorStep = Max(MediatorTimeCastEntry ? MediatorTimeCastEntry->Get(TabletID()) : 0, writeEdge.Step);
TRowVersion mediatorEdge(mediatorStep, ::Max<ui64>());
Y_ABORT("unreachable");
}();

switch (mode) {
case EMvccTxMode::ReadOnly: {
// We want to include everything that was potentially confirmed to
// users, but we don't want to include anything that is not replied
// at the start of this read.
// Note it's only possible to have ImmediateWriteEdge > mediatorEdge
// when ImmediateWriteEdge == mediatorEdge + 1
return Max(mediatorEdge, SnapshotManager.GetImmediateWriteEdgeReplied());
// We must read all writes we have replied to already
return Max(version, SnapshotManager.GetImmediateWriteEdgeReplied());
}

case EMvccTxMode::ReadWrite: {
// We must use at least a previously used immediate write edge
// But we must also avoid trampling over any unprotected mvcc
// snapshot reads that have occurred.
// Note it's only possible to go past the last known mediator step
// when we had an unprotected read, which itself happens at the
// last mediator step. So we may only ever have a +1 step, never
// anything more.
return Max(mediatorEdge, writeEdge.Next(), SnapshotManager.GetImmediateWriteEdge());
// We must never go backwards in our single-shard writes
return Max(version, SnapshotManager.GetImmediateWriteEdge());
}
}

Expand Down Expand Up @@ -2075,6 +2103,8 @@ TDataShard::TPromotePostExecuteEdges TDataShard::PromoteImmediatePostExecuteEdge
// We need to wait for completion until the flag is committed
res.WaitCompletion = true;
}
LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "PromoteImmediatePostExecuteEdges at " << TabletID()
<< " promoting UnprotectedReadEdge to " << version);
SnapshotManager.PromoteUnprotectedReadEdge(version);

// We want to promote the complete edge when protected reads are
Expand Down Expand Up @@ -2237,6 +2267,19 @@ void TDataShard::SendAfterMediatorStepActivate(ui64 mediatorStep, const TActorCo
for (auto it = MediatorDelayedReplies.begin(); it != MediatorDelayedReplies.end();) {
const ui64 step = it->first.Step;

if (SrcSplitDescription) {
if (State == TShardState::SplitSrcSendingSnapshot ||
State == TShardState::SplitSrcWaitForPartitioningChanged ||
State == TShardState::PreOffline ||
State == TShardState::Offline)
{
// We cannot send replies, since dst shard is now in charge
// of keeping track of acknowledged writes. So we expect
// split src logic to reboot this shard later.
break;
}
}

if (step <= mediatorStep) {
SnapshotManager.PromoteImmediateWriteEdgeReplied(it->first);
Send(it->second.Target, it->second.Event.Release(), 0, it->second.Cookie);
Expand Down Expand Up @@ -2304,13 +2347,16 @@ void TDataShard::CheckMediatorStateRestored() {
// HEAD reads must include that in their results.
const ui64 waitStep = CoordinatorPrevReadStepMax;
const ui64 readStep = CoordinatorPrevReadStepMax;

LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "CheckMediatorStateRestored: waitStep# " << waitStep << " readStep# " << readStep);
const ui64 observedStep = GetMaxObservedStep();
LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "CheckMediatorStateRestored at " << TabletID() << ":"
<< " waitStep# " << waitStep
<< " readStep# " << readStep
<< " observedStep# " << observedStep);

// WARNING: we must perform this check BEFORE we update unprotected read edge
// We may enter this code path multiple times, and we expect that the above
// read step may be refined while we wait based on pessimistic backup step.
if (GetMaxObservedStep() < waitStep) {
if (observedStep < waitStep) {
// We need to wait until we observe mediator step that is at least
// as large as the step we found.
if (MediatorTimeCastWaitingSteps.insert(waitStep).second) {
Expand All @@ -2331,7 +2377,10 @@ void TDataShard::CheckMediatorStateRestored() {
SnapshotManager.GetImmediateWriteEdge().Step > SnapshotManager.GetCompleteEdge().Step
? SnapshotManager.GetImmediateWriteEdge().Prev()
: TRowVersion::Min();
SnapshotManager.PromoteUnprotectedReadEdge(Max(lastReadEdge, preImmediateWriteEdge));
const TRowVersion edge = Max(lastReadEdge, preImmediateWriteEdge);
LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "CheckMediatorStateRestored at " << TabletID()
<< " promoting UnprotectedReadEdge to " << edge);
SnapshotManager.PromoteUnprotectedReadEdge(edge);
}

// Promote the replied immediate write edge up to the currently observed step
Expand All @@ -2340,7 +2389,7 @@ void TDataShard::CheckMediatorStateRestored() {
// data that is definitely not replied yet.
if (SnapshotManager.GetImmediateWriteEdgeReplied() < SnapshotManager.GetImmediateWriteEdge()) {
const ui64 writeStep = SnapshotManager.GetImmediateWriteEdge().Step;
const TRowVersion edge(GetMaxObservedStep(), Max<ui64>());
const TRowVersion edge(observedStep, Max<ui64>());
SnapshotManager.PromoteImmediateWriteEdgeReplied(
Min(edge, SnapshotManager.GetImmediateWriteEdge()));
// Try to ensure writes become visible sooner rather than later
Expand Down Expand Up @@ -2477,6 +2526,10 @@ bool TDataShard::CheckDataTxReject(const TString& opDescr,
rejectDescriptions.push_back(TStringBuilder()
<< "is in process of split opId " << DstSplitOpId
<< " state " << DatashardStateName(State));
} else if (State == TShardState::WaitScheme) {
reject = true;
rejectReasons |= ERejectReasons::WrongState;
rejectDescriptions.push_back("is not created yet");
} else if (State == TShardState::PreOffline || State == TShardState::Offline) {
reject = true;
rejectStatus = NKikimrTxDataShard::TEvProposeTransactionResult::ERROR;
Expand Down Expand Up @@ -2639,6 +2692,11 @@ void TDataShard::Handle(TEvDataShard::TEvProposeTransaction::TPtr &ev, const TAc
auto* msg = ev->Get();
LWTRACK(ProposeTransactionRequest, msg->Orbit);

if (CheckDataTxRejectAndReply(ev, ctx)) {
IncCounter(COUNTER_PREPARE_REQUEST);
return;
}

// Check if we need to delay an immediate transaction
if (MediatorStateWaiting &&
(ev->Get()->GetFlags() & TTxFlags::Immediate) &&
Expand Down Expand Up @@ -2671,10 +2729,6 @@ void TDataShard::Handle(TEvDataShard::TEvProposeTransaction::TPtr &ev, const TAc

IncCounter(COUNTER_PREPARE_REQUEST);

if (CheckDataTxRejectAndReply(ev, ctx)) {
return;
}

switch (ev->Get()->GetTxKind()) {
case NKikimrTxDataShard::TX_KIND_DATA:
case NKikimrTxDataShard::TX_KIND_SCAN:
Expand Down
39 changes: 34 additions & 5 deletions ydb/core/tx/datashard/datashard_split_dst.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ class TDataShard::TTxSplitTransferSnapshot : public NTabletFlatExecutor::TTransa

LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " Received snapshot for split/merge TxId " << opId
<< " from tabeltId " << srcTabletId);
LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " Received snapshot: " << record.DebugString());

if (!Self->DstSplitSchemaInitialized) {
LegacyInitSchema(txc);
Expand Down Expand Up @@ -291,8 +292,9 @@ class TDataShard::TTxSplitTransferSnapshot : public NTabletFlatExecutor::TTransa
Self->PromoteFollowerReadEdge(txc);
}

Self->State = TShardState::Ready;
Self->PersistSys(db, Schema::Sys_State, Self->State);
// Note: we persist Ready, but keep current state in memory until Complete
Self->SetPersistState(TShardState::Ready, txc);
Self->State = TShardState::SplitDstReceivingSnapshot;
}

return true;
Expand All @@ -306,9 +308,36 @@ class TDataShard::TTxSplitTransferSnapshot : public NTabletFlatExecutor::TTransa

ctx.Send(ackTo, new TEvDataShard::TEvSplitTransferSnapshotAck(opId, Self->TabletID()));

if (LastSnapshotReceived) {
// We have received all the data, reload everything from the received system tables
Self->Execute(Self->CreateTxInit(), ctx);
// Note: we skip init in an unlikely event of state resetting between Execute and Complete
if (LastSnapshotReceived && Self->State == TShardState::SplitDstReceivingSnapshot) {
// We have received all the data, finish shard initialization
// Note: previously we used TxInit, however received system tables
// have been empty for years now, and since pipes are still open we
// may receive requests between TxInit loading the Ready state and
// its Complete method initializing everything properly. Instead
// necessary steps are repeated here.
Self->State = TShardState::Ready;

// We are already in StateWork, but we need to repeat many steps now that we are Ready
Self->SwitchToWork(ctx);

// We can send the registration request now that we are ready
Self->SendRegistrationRequestTimeCast(ctx);

// Initialize snapshot expiration queue with current context time
Self->GetSnapshotManager().InitExpireQueue(ctx.Now());
if (Self->GetSnapshotManager().HasExpiringSnapshots()) {
Self->PlanCleanup(ctx);
}

// Initialize change senders
Self->KillChangeSender(ctx);
Self->CreateChangeSender(ctx);
Self->MaybeActivateChangeSender(ctx);
Self->EmitHeartbeats();

// Switch mvcc state if needed
Self->CheckMvccStateChangeCanStart(ctx);
}
}
};
Expand Down
9 changes: 9 additions & 0 deletions ydb/core/tx/datashard/datashard_split_src.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,15 @@ class TDataShard::TTxSplitPartitioningChanged : public NTabletFlatExecutor::TTra
}
}

if (!Self->MediatorDelayedReplies.empty()) {
// We have some pending mediator replies, which must not be sent.
// Unfortunately we may linger around for a long time, and clients
// would keep awaiting replies for all that time. We have to make
// sure those clients receive an appropriate disconnection error
// instead.
ctx.Send(Self->SelfId(), new TEvents::TEvPoison);
}

// TODO: properly check if there are no loans
Self->CheckStateChange(ctx);
}
Expand Down
Loading