Skip to content

Commit fbaace1

Browse files
authored
Merge 56bd581 into b7a3970
2 parents b7a3970 + 56bd581 commit fbaace1

File tree

6 files changed

+729
-70
lines changed

6 files changed

+729
-70
lines changed

ydb/core/tx/datashard/create_table_unit.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ EExecutionStatus TCreateTableUnit::Execute(TOperation::TPtr op,
8686
txc.DB.NoMoreReadsForTx();
8787
DataShard.SetPersistState(TShardState::Ready, txc);
8888
DataShard.CheckMvccStateChangeCanStart(ctx); // Recheck
89+
DataShard.SendRegistrationRequestTimeCast(ctx);
8990
}
9091

9192
return EExecutionStatus::DelayCompleteNoMoreRestarts;

ydb/core/tx/datashard/datashard.cpp

Lines changed: 118 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -398,8 +398,23 @@ void TDataShard::SendRegistrationRequestTimeCast(const TActorContext &ctx) {
398398
if (RegistrationSended)
399399
return;
400400

401-
if (!ProcessingParams)
401+
if (!ProcessingParams) {
402+
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, TabletID()
403+
<< " not sending time cast registration request in state "
404+
<< DatashardStateName(State)
405+
<< ": missing processing params");
402406
return;
407+
}
408+
409+
if (State == TShardState::WaitScheme ||
410+
State == TShardState::SplitDstReceivingSnapshot)
411+
{
412+
// We don't have all the necessary info yet
413+
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, TabletID()
414+
<< " not sending time cast registration request in state "
415+
<< DatashardStateName(State));
416+
return;
417+
}
403418

404419
LOG_INFO_S(ctx, NKikimrServices::TX_DATASHARD, "Send registration request to time cast "
405420
<< DatashardStateName(State) << " tabletId " << TabletID()
@@ -1961,68 +1976,81 @@ TRowVersion TDataShard::GetMvccTxVersion(EMvccTxMode mode, TOperation* op) const
19611976
}
19621977
}
19631978

1964-
TRowVersion edge;
1965-
TRowVersion readEdge = Max(
1966-
SnapshotManager.GetCompleteEdge(),
1967-
SnapshotManager.GetUnprotectedReadEdge());
1968-
TRowVersion writeEdge = Max(readEdge, SnapshotManager.GetIncompleteEdge());
1969-
switch (mode) {
1970-
case EMvccTxMode::ReadOnly:
1971-
// With read-only transactions we don't need reads to include
1972-
// changes made at the incomplete edge, as that is a point where
1973-
// distributed transactions performed some reads, not writes.
1974-
// Since incomplete transactions are still inflight, the actual
1975-
// version will stick to the first incomplete transaction is queue,
1976-
// effectively reading non-repeatable state before that transaction.
1977-
edge = readEdge;
1978-
break;
1979-
case EMvccTxMode::ReadWrite:
1980-
// With read-write transactions we must choose a point that is
1981-
// greater than both complete and incomplete edges. The reason
1982-
// is that incomplete transactions performed some reads at that
1983-
// point and these snapshot points must be repeatable.
1984-
// Note that as soon as the first write past the IncompleteEdge
1985-
// happens it cements all distributed transactions up to that point
1986-
// as complete, so all future reads and writes are guaranteed to
1987-
// include that point as well.
1988-
edge = writeEdge;
1989-
break;
1990-
}
1979+
LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "GetMvccTxVersion at " << TabletID()
1980+
<< " CompleteEdge# " << SnapshotManager.GetCompleteEdge()
1981+
<< " IncompleteEdge# " << SnapshotManager.GetIncompleteEdge()
1982+
<< " UnprotectedReadEdge# " << SnapshotManager.GetUnprotectedReadEdge()
1983+
<< " ImmediateWriteEdge# " << SnapshotManager.GetImmediateWriteEdge()
1984+
<< " ImmediateWriteEdgeReplied# " << SnapshotManager.GetImmediateWriteEdgeReplied());
1985+
1986+
TRowVersion version = [&]() {
1987+
TRowVersion edge;
1988+
TRowVersion readEdge = Max(
1989+
SnapshotManager.GetCompleteEdge(),
1990+
SnapshotManager.GetUnprotectedReadEdge());
1991+
TRowVersion writeEdge = Max(readEdge, SnapshotManager.GetIncompleteEdge());
1992+
switch (mode) {
1993+
case EMvccTxMode::ReadOnly:
1994+
// With read-only transactions we don't need reads to include
1995+
// changes made at the incomplete edge, as that is a point where
1996+
// distributed transactions performed some reads, not writes.
1997+
// Since incomplete transactions are still inflight, the actual
1998+
// version will stick to the first incomplete transaction is queue,
1999+
// effectively reading non-repeatable state before that transaction.
2000+
edge = readEdge;
2001+
break;
2002+
case EMvccTxMode::ReadWrite:
2003+
// With read-write transactions we must choose a point that is
2004+
// greater than both complete and incomplete edges. The reason
2005+
// is that incomplete transactions performed some reads at that
2006+
// point and these snapshot points must be repeatable.
2007+
// Note that as soon as the first write past the IncompleteEdge
2008+
// happens it cements all distributed transactions up to that point
2009+
// as complete, so all future reads and writes are guaranteed to
2010+
// include that point as well.
2011+
edge = writeEdge;
2012+
break;
2013+
}
19912014

1992-
// If there's any planned operation that is above our edge, it would be a
1993-
// suitable version for a new immediate operation. We effectively try to
1994-
// execute "before" that point if possible.
1995-
if (auto nextOp = Pipeline.GetNextPlannedOp(edge.Step, edge.TxId))
1996-
return TRowVersion(nextOp->GetStep(), nextOp->GetTxId());
2015+
// If there's any planned operation that is above our edge, it would be a
2016+
// suitable version for a new immediate operation. We effectively try to
2017+
// execute "before" that point if possible.
2018+
if (auto nextOp = Pipeline.GetNextPlannedOp(edge.Step, edge.TxId))
2019+
return TRowVersion(nextOp->GetStep(), nextOp->GetTxId());
2020+
2021+
// Normally we stick transactions to the end of the last known mediator step
2022+
// Note this calculations only happen when we don't have distributed
2023+
// transactions left in queue, and we won't have any more transactions
2024+
// up to the current mediator time. The mediator time itself may be stale,
2025+
// in which case we may have evidence of its higher value via complete and
2026+
// incomplete edges above.
2027+
const ui64 mediatorStep = Max(MediatorTimeCastEntry ? MediatorTimeCastEntry->Get(TabletID()) : 0, writeEdge.Step);
2028+
TRowVersion mediatorEdge(mediatorStep, ::Max<ui64>());
2029+
2030+
switch (mode) {
2031+
case EMvccTxMode::ReadOnly: {
2032+
// We read at the end of the current step
2033+
return mediatorEdge;
2034+
}
2035+
2036+
case EMvccTxMode::ReadWrite: {
2037+
// We write at the end of the current step, or the start of the next step when that's protected
2038+
return Max(mediatorEdge, writeEdge.Next());
2039+
}
2040+
}
19972041

1998-
// Normally we stick transactions to the end of the last known mediator step
1999-
// Note this calculations only happen when we don't have distributed
2000-
// transactions left in queue, and we won't have any more transactions
2001-
// up to the current mediator time. The mediator time itself may be stale,
2002-
// in which case we may have evidence of its higher value via complete and
2003-
// incomplete edges above.
2004-
const ui64 mediatorStep = Max(MediatorTimeCastEntry ? MediatorTimeCastEntry->Get(TabletID()) : 0, writeEdge.Step);
2005-
TRowVersion mediatorEdge(mediatorStep, ::Max<ui64>());
2042+
Y_ABORT("unreachable");
2043+
}();
20062044

20072045
switch (mode) {
20082046
case EMvccTxMode::ReadOnly: {
2009-
// We want to include everything that was potentially confirmed to
2010-
// users, but we don't want to include anything that is not replied
2011-
// at the start of this read.
2012-
// Note it's only possible to have ImmediateWriteEdge > mediatorEdge
2013-
// when ImmediateWriteEdge == mediatorEdge + 1
2014-
return Max(mediatorEdge, SnapshotManager.GetImmediateWriteEdgeReplied());
2047+
// We must read all writes we have replied to already
2048+
return Max(version, SnapshotManager.GetImmediateWriteEdgeReplied());
20152049
}
20162050

20172051
case EMvccTxMode::ReadWrite: {
2018-
// We must use at least a previously used immediate write edge
2019-
// But we must also avoid trumpling over any unprotected mvcc
2020-
// snapshot reads that have occurred.
2021-
// Note it's only possible to go past the last known mediator step
2022-
// when we had an unprotected read, which itself happens at the
2023-
// last mediator step. So we may only ever have a +1 step, never
2024-
// anything more.
2025-
return Max(mediatorEdge, writeEdge.Next(), SnapshotManager.GetImmediateWriteEdge());
2052+
// We must never go backwards in our single-shard writes
2053+
return Max(version, SnapshotManager.GetImmediateWriteEdge());
20262054
}
20272055
}
20282056

@@ -2075,6 +2103,8 @@ TDataShard::TPromotePostExecuteEdges TDataShard::PromoteImmediatePostExecuteEdge
20752103
// We need to wait for completion until the flag is committed
20762104
res.WaitCompletion = true;
20772105
}
2106+
LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "PromoteImmediatePostExecuteEdges at " << TabletID()
2107+
<< " promoting UnprotectedReadEdge to " << version);
20782108
SnapshotManager.PromoteUnprotectedReadEdge(version);
20792109

20802110
// We want to promote the complete edge when protected reads are
@@ -2237,6 +2267,19 @@ void TDataShard::SendAfterMediatorStepActivate(ui64 mediatorStep, const TActorCo
22372267
for (auto it = MediatorDelayedReplies.begin(); it != MediatorDelayedReplies.end();) {
22382268
const ui64 step = it->first.Step;
22392269

2270+
if (SrcSplitDescription) {
2271+
if (State == TShardState::SplitSrcSendingSnapshot ||
2272+
State == TShardState::SplitSrcWaitForPartitioningChanged ||
2273+
State == TShardState::PreOffline ||
2274+
State == TShardState::Offline)
2275+
{
2276+
// We cannot send replies, since dst shard is now in charge
2277+
// of keeping track of acknowledged writes. So we expect
2278+
// split src logic to reboot this shard later.
2279+
break;
2280+
}
2281+
}
2282+
22402283
if (step <= mediatorStep) {
22412284
SnapshotManager.PromoteImmediateWriteEdgeReplied(it->first);
22422285
Send(it->second.Target, it->second.Event.Release(), 0, it->second.Cookie);
@@ -2304,13 +2347,16 @@ void TDataShard::CheckMediatorStateRestored() {
23042347
// HEAD reads must include that in their results.
23052348
const ui64 waitStep = CoordinatorPrevReadStepMax;
23062349
const ui64 readStep = CoordinatorPrevReadStepMax;
2307-
2308-
LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "CheckMediatorStateRestored: waitStep# " << waitStep << " readStep# " << readStep);
2350+
const ui64 observedStep = GetMaxObservedStep();
2351+
LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "CheckMediatorStateRestored at " << TabletID() << ":"
2352+
<< " waitStep# " << waitStep
2353+
<< " readStep# " << readStep
2354+
<< " observedStep# " << observedStep);
23092355

23102356
// WARNING: we must perform this check BEFORE we update unprotected read edge
23112357
// We may enter this code path multiple times, and we expect that the above
23122358
// read step may be refined while we wait based on pessimistic backup step.
2313-
if (GetMaxObservedStep() < waitStep) {
2359+
if (observedStep < waitStep) {
23142360
// We need to wait until we observe mediator step that is at least
23152361
// as large as the step we found.
23162362
if (MediatorTimeCastWaitingSteps.insert(waitStep).second) {
@@ -2331,7 +2377,10 @@ void TDataShard::CheckMediatorStateRestored() {
23312377
SnapshotManager.GetImmediateWriteEdge().Step > SnapshotManager.GetCompleteEdge().Step
23322378
? SnapshotManager.GetImmediateWriteEdge().Prev()
23332379
: TRowVersion::Min();
2334-
SnapshotManager.PromoteUnprotectedReadEdge(Max(lastReadEdge, preImmediateWriteEdge));
2380+
const TRowVersion edge = Max(lastReadEdge, preImmediateWriteEdge);
2381+
LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "CheckMediatorStateRestored at " << TabletID()
2382+
<< " promoting UnprotectedReadEdge to " << edge);
2383+
SnapshotManager.PromoteUnprotectedReadEdge(edge);
23352384
}
23362385

23372386
// Promote the replied immediate write edge up to the currently observed step
@@ -2340,7 +2389,7 @@ void TDataShard::CheckMediatorStateRestored() {
23402389
// data that is definitely not replied yet.
23412390
if (SnapshotManager.GetImmediateWriteEdgeReplied() < SnapshotManager.GetImmediateWriteEdge()) {
23422391
const ui64 writeStep = SnapshotManager.GetImmediateWriteEdge().Step;
2343-
const TRowVersion edge(GetMaxObservedStep(), Max<ui64>());
2392+
const TRowVersion edge(observedStep, Max<ui64>());
23442393
SnapshotManager.PromoteImmediateWriteEdgeReplied(
23452394
Min(edge, SnapshotManager.GetImmediateWriteEdge()));
23462395
// Try to ensure writes become visible sooner rather than later
@@ -2477,6 +2526,10 @@ bool TDataShard::CheckDataTxReject(const TString& opDescr,
24772526
rejectDescriptions.push_back(TStringBuilder()
24782527
<< "is in process of split opId " << DstSplitOpId
24792528
<< " state " << DatashardStateName(State));
2529+
} else if (State == TShardState::WaitScheme) {
2530+
reject = true;
2531+
rejectReasons |= ERejectReasons::WrongState;
2532+
rejectDescriptions.push_back("is not created yet");
24802533
} else if (State == TShardState::PreOffline || State == TShardState::Offline) {
24812534
reject = true;
24822535
rejectStatus = NKikimrTxDataShard::TEvProposeTransactionResult::ERROR;
@@ -2639,6 +2692,11 @@ void TDataShard::Handle(TEvDataShard::TEvProposeTransaction::TPtr &ev, const TAc
26392692
auto* msg = ev->Get();
26402693
LWTRACK(ProposeTransactionRequest, msg->Orbit);
26412694

2695+
if (CheckDataTxRejectAndReply(ev, ctx)) {
2696+
IncCounter(COUNTER_PREPARE_REQUEST);
2697+
return;
2698+
}
2699+
26422700
// Check if we need to delay an immediate transaction
26432701
if (MediatorStateWaiting &&
26442702
(ev->Get()->GetFlags() & TTxFlags::Immediate) &&
@@ -2671,10 +2729,6 @@ void TDataShard::Handle(TEvDataShard::TEvProposeTransaction::TPtr &ev, const TAc
26712729

26722730
IncCounter(COUNTER_PREPARE_REQUEST);
26732731

2674-
if (CheckDataTxRejectAndReply(ev, ctx)) {
2675-
return;
2676-
}
2677-
26782732
switch (ev->Get()->GetTxKind()) {
26792733
case NKikimrTxDataShard::TX_KIND_DATA:
26802734
case NKikimrTxDataShard::TX_KIND_SCAN:

ydb/core/tx/datashard/datashard_split_dst.cpp

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,7 @@ class TDataShard::TTxSplitTransferSnapshot : public NTabletFlatExecutor::TTransa
175175

176176
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " Received snapshot for split/merge TxId " << opId
177177
<< " from tabeltId " << srcTabletId);
178+
LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " Received snapshot: " << record.DebugString());
178179

179180
if (!Self->DstSplitSchemaInitialized) {
180181
LegacyInitSchema(txc);
@@ -291,8 +292,9 @@ class TDataShard::TTxSplitTransferSnapshot : public NTabletFlatExecutor::TTransa
291292
Self->PromoteFollowerReadEdge(txc);
292293
}
293294

294-
Self->State = TShardState::Ready;
295-
Self->PersistSys(db, Schema::Sys_State, Self->State);
295+
// Note: we persist Ready, but keep current state in memory until Complete
296+
Self->SetPersistState(TShardState::Ready, txc);
297+
Self->State = TShardState::SplitDstReceivingSnapshot;
296298
}
297299

298300
return true;
@@ -306,9 +308,36 @@ class TDataShard::TTxSplitTransferSnapshot : public NTabletFlatExecutor::TTransa
306308

307309
ctx.Send(ackTo, new TEvDataShard::TEvSplitTransferSnapshotAck(opId, Self->TabletID()));
308310

309-
if (LastSnapshotReceived) {
310-
// We have received all the data, reload everything from the received system tables
311-
Self->Execute(Self->CreateTxInit(), ctx);
311+
// Note: we skip init in an unlikely event of state resetting between Execute and Complete
312+
if (LastSnapshotReceived && Self->State == TShardState::SplitDstReceivingSnapshot) {
313+
// We have received all the data, finish shard initialization
314+
// Note: previously we used TxInit, however received system tables
315+
// have been empty for years now, and since pipes are still open we
316+
// may receive requests between TxInit loading the Ready state and
317+
// its Complete method initializing everything properly. Instead
318+
// necessary steps are repeated here.
319+
Self->State = TShardState::Ready;
320+
321+
// We are already in StateWork, but we need to repeat many steps now that we are Ready
322+
Self->SwitchToWork(ctx);
323+
324+
// We can send the registration request now that we are ready
325+
Self->SendRegistrationRequestTimeCast(ctx);
326+
327+
// Initialize snapshot expiration queue with current context time
328+
Self->GetSnapshotManager().InitExpireQueue(ctx.Now());
329+
if (Self->GetSnapshotManager().HasExpiringSnapshots()) {
330+
Self->PlanCleanup(ctx);
331+
}
332+
333+
// Initialize change senders
334+
Self->KillChangeSender(ctx);
335+
Self->CreateChangeSender(ctx);
336+
Self->MaybeActivateChangeSender(ctx);
337+
Self->EmitHeartbeats();
338+
339+
// Switch mvcc state if needed
340+
Self->CheckMvccStateChangeCanStart(ctx);
312341
}
313342
}
314343
};

ydb/core/tx/datashard/datashard_split_src.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -529,6 +529,15 @@ class TDataShard::TTxSplitPartitioningChanged : public NTabletFlatExecutor::TTra
529529
}
530530
}
531531

532+
if (!Self->MediatorDelayedReplies.empty()) {
533+
// We have some pending mediator replies, which must not be replied.
534+
// Unfortunately we may linger around for a long time, and clients
535+
// would keep awaiting replies for all that time. We have to make
536+
// sure those clients receive an appropriate disconnection error
537+
// instead.
538+
ctx.Send(Self->SelfId(), new TEvents::TEvPoison);
539+
}
540+
532541
// TODO: properly check if there are no loans
533542
Self->CheckStateChange(ctx);
534543
}

0 commit comments

Comments
 (0)