Skip to content

Commit 1d8796c

Browse files
authored
Merge 2036e13 into be94821
2 parents be94821 + 2036e13 commit 1d8796c

File tree

5 files changed

+633
-10
lines changed

5 files changed

+633
-10
lines changed

ydb/core/tx/datashard/create_table_unit.cpp

+1
Original file line number | Diff line number | Diff line change
@@ -86,6 +86,7 @@ EExecutionStatus TCreateTableUnit::Execute(TOperation::TPtr op,
8686
txc.DB.NoMoreReadsForTx();
8787
DataShard.SetPersistState(TShardState::Ready, txc);
8888
DataShard.CheckMvccStateChangeCanStart(ctx); // Recheck
89+
DataShard.SendRegistrationRequestTimeCast(ctx);
8990
}
9091

9192
return EExecutionStatus::DelayCompleteNoMoreRestarts;

ydb/core/tx/datashard/datashard.cpp

+58-10
Original file line number | Diff line number | Diff line change
@@ -398,8 +398,23 @@ void TDataShard::SendRegistrationRequestTimeCast(const TActorContext &ctx) {
398398
if (RegistrationSended)
399399
return;
400400

401-
if (!ProcessingParams)
401+
if (!ProcessingParams) {
402+
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, TabletID()
403+
<< " not sending time cast registration request in state "
404+
<< DatashardStateName(State)
405+
<< ": missing processing params");
402406
return;
407+
}
408+
409+
if (State == TShardState::WaitScheme ||
410+
State == TShardState::SplitDstReceivingSnapshot)
411+
{
412+
// We don't have all the necessary info yet
413+
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, TabletID()
414+
<< " not sending time cast registration request in state "
415+
<< DatashardStateName(State));
416+
return;
417+
}
403418

404419
LOG_INFO_S(ctx, NKikimrServices::TX_DATASHARD, "Send registration request to time cast "
405420
<< DatashardStateName(State) << " tabletId " << TabletID()
@@ -2027,6 +2042,13 @@ TRowVersion TDataShard::GetMvccTxVersion(EMvccTxMode mode, TOperation* op) const
20272042
}
20282043
}
20292044

2045+
LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "GetMvccTxVersion at " << TabletID()
2046+
<< " CompleteEdge# " << SnapshotManager.GetCompleteEdge()
2047+
<< " IncompleteEdge# " << SnapshotManager.GetIncompleteEdge()
2048+
<< " UnprotectedReadEdge# " << SnapshotManager.GetUnprotectedReadEdge()
2049+
<< " ImmediateWriteEdge# " << SnapshotManager.GetImmediateWriteEdge()
2050+
<< " ImmediateWriteEdgeReplied# " << SnapshotManager.GetImmediateWriteEdgeReplied());
2051+
20302052
TRowVersion edge;
20312053
TRowVersion readEdge = Max(
20322054
SnapshotManager.GetCompleteEdge(),
@@ -2141,6 +2163,8 @@ TDataShard::TPromotePostExecuteEdges TDataShard::PromoteImmediatePostExecuteEdge
21412163
// We need to wait for completion until the flag is committed
21422164
res.WaitCompletion = true;
21432165
}
2166+
LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "PromoteImmediatePostExecuteEdges at " << TabletID()
2167+
<< " promoting UnprotectedReadEdge to " << version);
21442168
SnapshotManager.PromoteUnprotectedReadEdge(version);
21452169

21462170
// We want to promote the complete edge when protected reads are
@@ -2303,6 +2327,19 @@ void TDataShard::SendAfterMediatorStepActivate(ui64 mediatorStep, const TActorCo
23032327
for (auto it = MediatorDelayedReplies.begin(); it != MediatorDelayedReplies.end();) {
23042328
const ui64 step = it->first.Step;
23052329

2330+
if (SrcSplitDescription) {
2331+
if (State == TShardState::SplitSrcSendingSnapshot ||
2332+
State == TShardState::SplitSrcWaitForPartitioningChanged ||
2333+
State == TShardState::PreOffline ||
2334+
State == TShardState::Offline)
2335+
{
2336+
// We cannot send replies, since dst shard is now in charge
2337+
// of keeping track of acknowledged writes. So we expect
2338+
// split src logic to reboot this shard later.
2339+
break;
2340+
}
2341+
}
2342+
23062343
if (step <= mediatorStep) {
23072344
SnapshotManager.PromoteImmediateWriteEdgeReplied(it->first);
23082345
Send(it->second.Target, it->second.Event.Release(), 0, it->second.Cookie);
@@ -2370,13 +2407,16 @@ void TDataShard::CheckMediatorStateRestored() {
23702407
// HEAD reads must include that in their results.
23712408
const ui64 waitStep = CoordinatorPrevReadStepMax;
23722409
const ui64 readStep = CoordinatorPrevReadStepMax;
2373-
2374-
LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "CheckMediatorStateRestored: waitStep# " << waitStep << " readStep# " << readStep);
2410+
const ui64 observedStep = GetMaxObservedStep();
2411+
LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "CheckMediatorStateRestored at " << TabletID() << ":"
2412+
<< " waitStep# " << waitStep
2413+
<< " readStep# " << readStep
2414+
<< " observedStep# " << observedStep);
23752415

23762416
// WARNING: we must perform this check BEFORE we update unprotected read edge
23772417
// We may enter this code path multiple times, and we expect that the above
23782418
// read step may be refined while we wait based on pessimistic backup step.
2379-
if (GetMaxObservedStep() < waitStep) {
2419+
if (observedStep < waitStep) {
23802420
// We need to wait until we observe mediator step that is at least
23812421
// as large as the step we found.
23822422
if (MediatorTimeCastWaitingSteps.insert(waitStep).second) {
@@ -2397,7 +2437,10 @@ void TDataShard::CheckMediatorStateRestored() {
23972437
SnapshotManager.GetImmediateWriteEdge().Step > SnapshotManager.GetCompleteEdge().Step
23982438
? SnapshotManager.GetImmediateWriteEdge().Prev()
23992439
: TRowVersion::Min();
2400-
SnapshotManager.PromoteUnprotectedReadEdge(Max(lastReadEdge, preImmediateWriteEdge));
2440+
const TRowVersion edge = Max(lastReadEdge, preImmediateWriteEdge);
2441+
LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "CheckMediatorStateRestored at " << TabletID()
2442+
<< " promoting UnprotectedReadEdge to " << edge);
2443+
SnapshotManager.PromoteUnprotectedReadEdge(edge);
24012444
}
24022445

24032446
// Promote the replied immediate write edge up to the currently observed step
@@ -2406,7 +2449,7 @@ void TDataShard::CheckMediatorStateRestored() {
24062449
// data that is definitely not replied yet.
24072450
if (SnapshotManager.GetImmediateWriteEdgeReplied() < SnapshotManager.GetImmediateWriteEdge()) {
24082451
const ui64 writeStep = SnapshotManager.GetImmediateWriteEdge().Step;
2409-
const TRowVersion edge(GetMaxObservedStep(), Max<ui64>());
2452+
const TRowVersion edge(observedStep, Max<ui64>());
24102453
SnapshotManager.PromoteImmediateWriteEdgeReplied(
24112454
Min(edge, SnapshotManager.GetImmediateWriteEdge()));
24122455
// Try to ensure writes become visible sooner rather than later
@@ -2543,6 +2586,10 @@ bool TDataShard::CheckDataTxReject(const TString& opDescr,
25432586
rejectDescriptions.push_back(TStringBuilder()
25442587
<< "is in process of split opId " << DstSplitOpId
25452588
<< " state " << DatashardStateName(State));
2589+
} else if (State == TShardState::WaitScheme) {
2590+
reject = true;
2591+
rejectReasons |= ERejectReasons::WrongState;
2592+
rejectDescriptions.push_back("is not created yet");
25462593
} else if (State == TShardState::PreOffline || State == TShardState::Offline) {
25472594
reject = true;
25482595
rejectStatus = NKikimrTxDataShard::TEvProposeTransactionResult::ERROR;
@@ -2705,6 +2752,11 @@ void TDataShard::Handle(TEvDataShard::TEvProposeTransaction::TPtr &ev, const TAc
27052752
auto* msg = ev->Get();
27062753
LWTRACK(ProposeTransactionRequest, msg->Orbit);
27072754

2755+
if (CheckDataTxRejectAndReply(ev, ctx)) {
2756+
IncCounter(COUNTER_PREPARE_REQUEST);
2757+
return;
2758+
}
2759+
27082760
// Check if we need to delay an immediate transaction
27092761
if (MediatorStateWaiting &&
27102762
(ev->Get()->GetFlags() & TTxFlags::Immediate) &&
@@ -2737,10 +2789,6 @@ void TDataShard::Handle(TEvDataShard::TEvProposeTransaction::TPtr &ev, const TAc
27372789

27382790
IncCounter(COUNTER_PREPARE_REQUEST);
27392791

2740-
if (CheckDataTxRejectAndReply(ev, ctx)) {
2741-
return;
2742-
}
2743-
27442792
switch (ev->Get()->GetTxKind()) {
27452793
case NKikimrTxDataShard::TX_KIND_DATA:
27462794
case NKikimrTxDataShard::TX_KIND_SCAN:

ydb/core/tx/datashard/datashard_split_dst.cpp

+2
Original file line number | Diff line number | Diff line change
@@ -175,6 +175,7 @@ class TDataShard::TTxSplitTransferSnapshot : public NTabletFlatExecutor::TTransa
175175

176176
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " Received snapshot for split/merge TxId " << opId
177177
<< " from tabeltId " << srcTabletId);
178+
LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " Received snapshot: " << record.DebugString());
178179

179180
if (!Self->DstSplitSchemaInitialized) {
180181
LegacyInitSchema(txc);
@@ -293,6 +294,7 @@ class TDataShard::TTxSplitTransferSnapshot : public NTabletFlatExecutor::TTransa
293294

294295
Self->State = TShardState::Ready;
295296
Self->PersistSys(db, Schema::Sys_State, Self->State);
297+
Self->SendRegistrationRequestTimeCast(ctx);
296298
}
297299

298300
return true;

ydb/core/tx/datashard/datashard_split_src.cpp

+9
Original file line number | Diff line number | Diff line change
@@ -529,6 +529,15 @@ class TDataShard::TTxSplitPartitioningChanged : public NTabletFlatExecutor::TTra
529529
}
530530
}
531531

532+
if (!Self->MediatorDelayedReplies.empty()) {
533+
// We have some pending mediator replies, which must not be replied.
534+
// Unfortunately we may linger around for a long time, and clients
535+
// would keep awaiting replies for all that time. We have to make
536+
// sure those clients receive an appropriate disconnection error
537+
// instead.
538+
ctx.Send(Self->SelfId(), new TEvents::TEvPoison);
539+
}
540+
532541
// TODO: properly check if there are no loans
533542
Self->CheckStateChange(ctx);
534543
}

0 commit comments

Comments
 (0)