Skip to content

Commit 73264da

Browse files
authored
Merge 2a79013 into df0fa56
2 parents df0fa56 + 2a79013 commit 73264da

File tree

5 files changed

+665
-15
lines changed

5 files changed

+665
-15
lines changed

ydb/core/tx/datashard/create_table_unit.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ EExecutionStatus TCreateTableUnit::Execute(TOperation::TPtr op,
8686
txc.DB.NoMoreReadsForTx();
8787
DataShard.SetPersistState(TShardState::Ready, txc);
8888
DataShard.CheckMvccStateChangeCanStart(ctx); // Recheck
89+
DataShard.SendRegistrationRequestTimeCast(ctx);
8990
}
9091

9192
return EExecutionStatus::DelayCompleteNoMoreRestarts;

ydb/core/tx/datashard/datashard.cpp

+58-10
Original file line numberDiff line numberDiff line change
@@ -399,8 +399,23 @@ void TDataShard::SendRegistrationRequestTimeCast(const TActorContext &ctx) {
399399
if (RegistrationSended)
400400
return;
401401

402-
if (!ProcessingParams)
402+
if (!ProcessingParams) {
403+
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, TabletID()
404+
<< " not sending time cast registration request in state "
405+
<< DatashardStateName(State)
406+
<< ": missing processing params");
403407
return;
408+
}
409+
410+
if (State == TShardState::WaitScheme ||
411+
State == TShardState::SplitDstReceivingSnapshot)
412+
{
413+
// We don't have all the necessary info yet
414+
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, TabletID()
415+
<< " not sending time cast registration request in state "
416+
<< DatashardStateName(State));
417+
return;
418+
}
404419

405420
LOG_INFO_S(ctx, NKikimrServices::TX_DATASHARD, "Send registration request to time cast "
406421
<< DatashardStateName(State) << " tabletId " << TabletID()
@@ -2028,6 +2043,13 @@ TRowVersion TDataShard::GetMvccTxVersion(EMvccTxMode mode, TOperation* op) const
20282043
}
20292044
}
20302045

2046+
LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "GetMvccTxVersion at " << TabletID()
2047+
<< " CompleteEdge# " << SnapshotManager.GetCompleteEdge()
2048+
<< " IncompleteEdge# " << SnapshotManager.GetIncompleteEdge()
2049+
<< " UnprotectedReadEdge# " << SnapshotManager.GetUnprotectedReadEdge()
2050+
<< " ImmediateWriteEdge# " << SnapshotManager.GetImmediateWriteEdge()
2051+
<< " ImmediateWriteEdgeReplied# " << SnapshotManager.GetImmediateWriteEdgeReplied());
2052+
20312053
TRowVersion edge;
20322054
TRowVersion readEdge = Max(
20332055
SnapshotManager.GetCompleteEdge(),
@@ -2142,6 +2164,8 @@ TDataShard::TPromotePostExecuteEdges TDataShard::PromoteImmediatePostExecuteEdge
21422164
// We need to wait for completion until the flag is committed
21432165
res.WaitCompletion = true;
21442166
}
2167+
LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "PromoteImmediatePostExecuteEdges at " << TabletID()
2168+
<< " promoting UnprotectedReadEdge to " << version);
21452169
SnapshotManager.PromoteUnprotectedReadEdge(version);
21462170

21472171
// We want to promote the complete edge when protected reads are
@@ -2304,6 +2328,19 @@ void TDataShard::SendAfterMediatorStepActivate(ui64 mediatorStep, const TActorCo
23042328
for (auto it = MediatorDelayedReplies.begin(); it != MediatorDelayedReplies.end();) {
23052329
const ui64 step = it->first.Step;
23062330

2331+
if (SrcSplitDescription) {
2332+
if (State == TShardState::SplitSrcSendingSnapshot ||
2333+
State == TShardState::SplitSrcWaitForPartitioningChanged ||
2334+
State == TShardState::PreOffline ||
2335+
State == TShardState::Offline)
2336+
{
2337+
// We cannot send replies, since dst shard is now in charge
2338+
// of keeping track of acknowledged writes. So we expect
2339+
// split src logic to reboot this shard later.
2340+
break;
2341+
}
2342+
}
2343+
23072344
if (step <= mediatorStep) {
23082345
SnapshotManager.PromoteImmediateWriteEdgeReplied(it->first);
23092346
Send(it->second.Target, it->second.Event.Release(), 0, it->second.Cookie);
@@ -2371,13 +2408,16 @@ void TDataShard::CheckMediatorStateRestored() {
23712408
// HEAD reads must include that in their results.
23722409
const ui64 waitStep = CoordinatorPrevReadStepMax;
23732410
const ui64 readStep = CoordinatorPrevReadStepMax;
2374-
2375-
LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "CheckMediatorStateRestored: waitStep# " << waitStep << " readStep# " << readStep);
2411+
const ui64 observedStep = GetMaxObservedStep();
2412+
LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "CheckMediatorStateRestored at " << TabletID() << ":"
2413+
<< " waitStep# " << waitStep
2414+
<< " readStep# " << readStep
2415+
<< " observedStep# " << observedStep);
23762416

23772417
// WARNING: we must perform this check BEFORE we update unprotected read edge
23782418
// We may enter this code path multiple times, and we expect that the above
23792419
// read step may be refined while we wait based on pessimistic backup step.
2380-
if (GetMaxObservedStep() < waitStep) {
2420+
if (observedStep < waitStep) {
23812421
// We need to wait until we observe mediator step that is at least
23822422
// as large as the step we found.
23832423
if (MediatorTimeCastWaitingSteps.insert(waitStep).second) {
@@ -2398,7 +2438,10 @@ void TDataShard::CheckMediatorStateRestored() {
23982438
SnapshotManager.GetImmediateWriteEdge().Step > SnapshotManager.GetCompleteEdge().Step
23992439
? SnapshotManager.GetImmediateWriteEdge().Prev()
24002440
: TRowVersion::Min();
2401-
SnapshotManager.PromoteUnprotectedReadEdge(Max(lastReadEdge, preImmediateWriteEdge));
2441+
const TRowVersion edge = Max(lastReadEdge, preImmediateWriteEdge);
2442+
LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "CheckMediatorStateRestored at " << TabletID()
2443+
<< " promoting UnprotectedReadEdge to " << edge);
2444+
SnapshotManager.PromoteUnprotectedReadEdge(edge);
24022445
}
24032446

24042447
// Promote the replied immediate write edge up to the currently observed step
@@ -2407,7 +2450,7 @@ void TDataShard::CheckMediatorStateRestored() {
24072450
// data that is definitely not replied yet.
24082451
if (SnapshotManager.GetImmediateWriteEdgeReplied() < SnapshotManager.GetImmediateWriteEdge()) {
24092452
const ui64 writeStep = SnapshotManager.GetImmediateWriteEdge().Step;
2410-
const TRowVersion edge(GetMaxObservedStep(), Max<ui64>());
2453+
const TRowVersion edge(observedStep, Max<ui64>());
24112454
SnapshotManager.PromoteImmediateWriteEdgeReplied(
24122455
Min(edge, SnapshotManager.GetImmediateWriteEdge()));
24132456
// Try to ensure writes become visible sooner rather than later
@@ -2544,6 +2587,10 @@ bool TDataShard::CheckDataTxReject(const TString& opDescr,
25442587
rejectDescriptions.push_back(TStringBuilder()
25452588
<< "is in process of split opId " << DstSplitOpId
25462589
<< " state " << DatashardStateName(State));
2590+
} else if (State == TShardState::WaitScheme) {
2591+
reject = true;
2592+
rejectReasons |= ERejectReasons::WrongState;
2593+
rejectDescriptions.push_back("is not created yet");
25472594
} else if (State == TShardState::PreOffline || State == TShardState::Offline) {
25482595
reject = true;
25492596
rejectStatus = NKikimrTxDataShard::TEvProposeTransactionResult::ERROR;
@@ -2706,6 +2753,11 @@ void TDataShard::Handle(TEvDataShard::TEvProposeTransaction::TPtr &ev, const TAc
27062753
auto* msg = ev->Get();
27072754
LWTRACK(ProposeTransactionRequest, msg->Orbit);
27082755

2756+
if (CheckDataTxRejectAndReply(ev, ctx)) {
2757+
IncCounter(COUNTER_PREPARE_REQUEST);
2758+
return;
2759+
}
2760+
27092761
// Check if we need to delay an immediate transaction
27102762
if (MediatorStateWaiting &&
27112763
(ev->Get()->GetFlags() & TTxFlags::Immediate) &&
@@ -2738,10 +2790,6 @@ void TDataShard::Handle(TEvDataShard::TEvProposeTransaction::TPtr &ev, const TAc
27382790

27392791
IncCounter(COUNTER_PREPARE_REQUEST);
27402792

2741-
if (CheckDataTxRejectAndReply(ev, ctx)) {
2742-
return;
2743-
}
2744-
27452793
switch (ev->Get()->GetTxKind()) {
27462794
case NKikimrTxDataShard::TX_KIND_DATA:
27472795
case NKikimrTxDataShard::TX_KIND_SCAN:

ydb/core/tx/datashard/datashard_split_dst.cpp

+34-5
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,7 @@ class TDataShard::TTxSplitTransferSnapshot : public NTabletFlatExecutor::TTransa
175175

176176
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " Received snapshot for split/merge TxId " << opId
177177
<< " from tabeltId " << srcTabletId);
178+
LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " Received snapshot: " << record.DebugString());
178179

179180
if (!Self->DstSplitSchemaInitialized) {
180181
LegacyInitSchema(txc);
@@ -291,8 +292,9 @@ class TDataShard::TTxSplitTransferSnapshot : public NTabletFlatExecutor::TTransa
291292
Self->PromoteFollowerReadEdge(txc);
292293
}
293294

294-
Self->State = TShardState::Ready;
295-
Self->PersistSys(db, Schema::Sys_State, Self->State);
295+
// Note: we persist Ready, but keep current state in memory until Complete
296+
Self->SetPersistState(TShardState::Ready, txc);
297+
Self->State = TShardState::SplitDstReceivingSnapshot;
296298
}
297299

298300
return true;
@@ -306,9 +308,36 @@ class TDataShard::TTxSplitTransferSnapshot : public NTabletFlatExecutor::TTransa
306308

307309
ctx.Send(ackTo, new TEvDataShard::TEvSplitTransferSnapshotAck(opId, Self->TabletID()));
308310

309-
if (LastSnapshotReceived) {
310-
// We have received all the data, reload everything from the received system tables
311-
Self->Execute(Self->CreateTxInit(), ctx);
311+
// Note: we skip init in an unlikely event of state resetting between Execute and Complete
312+
if (LastSnapshotReceived && Self->State == TShardState::SplitDstReceivingSnapshot) {
313+
// We have received all the data, finish shard initialization
314+
// Note: previously we used TxInit, however received system tables
315+
// have been empty for years now, and since pipes are still open we
316+
// may receive requests between TxInit loading the Ready state and
317+
// its Complete method initializing everything properly. Instead
318+
// necessary steps are repeated here.
319+
Self->State = TShardState::Ready;
320+
321+
// We are already in StateWork, but we need to repeat many steps now that we are Ready
322+
Self->SwitchToWork(ctx);
323+
324+
// We can send the registration request now that we are ready
325+
Self->SendRegistrationRequestTimeCast(ctx);
326+
327+
// Initialize snapshot expiration queue with current context time
328+
Self->GetSnapshotManager().InitExpireQueue(ctx.Now());
329+
if (Self->GetSnapshotManager().HasExpiringSnapshots()) {
330+
Self->PlanCleanup(ctx);
331+
}
332+
333+
// Initialize change senders
334+
Self->KillChangeSender(ctx);
335+
Self->CreateChangeSender(ctx);
336+
Self->MaybeActivateChangeSender(ctx);
337+
Self->EmitHeartbeats();
338+
339+
// Switch mvcc state if needed
340+
Self->CheckMvccStateChangeCanStart(ctx);
312341
}
313342
}
314343
};

ydb/core/tx/datashard/datashard_split_src.cpp

+9
Original file line numberDiff line numberDiff line change
@@ -529,6 +529,15 @@ class TDataShard::TTxSplitPartitioningChanged : public NTabletFlatExecutor::TTra
529529
}
530530
}
531531

532+
if (!Self->MediatorDelayedReplies.empty()) {
533+
// We have some pending mediator replies, which must not be replied.
534+
// Unfortunately we may linger around for a long time, and clients
535+
// would keep awaiting replies for all that time. We have to make
536+
// sure those clients receive an appropriate disconnection error
537+
// instead.
538+
ctx.Send(Self->SelfId(), new TEvents::TEvPoison);
539+
}
540+
532541
// TODO: properly check if there are no loans
533542
Self->CheckStateChange(ctx);
534543
}

0 commit comments

Comments
 (0)