Skip to content

Commit 28859b5

Browse files
authored
24-2: Fix bugs in coordinator state migration (#6461)
1 parent ef7b8bc commit 28859b5

11 files changed

+613
-10
lines changed

ydb/core/tx/coordinator/coordinator__acquire_read_step.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,12 @@ void TTxCoordinator::Handle(TEvTxProxy::TEvAcquireReadStep::TPtr& ev, const TAct
103103
return;
104104
}
105105

106-
if (ReadOnlyLeaseEnabled()) {
106+
// Note: when volatile state is preserved we don't want to update the last
107+
// acquired step, because the new generation might miss that and invariants
108+
// of read step not going back would be violated. Run the code below using
109+
// the normal tx, which will almost certainly fail (the storage is supposed
110+
// to be blocked already), or successfully persist the new read step.
111+
if (ReadOnlyLeaseEnabled() && !VolatileState.Preserved) {
107112
// We acquire read step using a read-only lease from executor
108113
// It is guaranteed that any future generation was not running at
109114
// the time ConfirmReadOnlyLease was called.

ydb/core/tx/coordinator/coordinator__plan_step.cpp

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include "coordinator_impl.h"
2+
#include "coordinator_hooks.h"
23

34
#include <util/generic/hash_set.h>
45

@@ -42,7 +43,29 @@ struct TTxCoordinator::TTxPlanStep : public TTransactionBase<TTxCoordinator> {
4243
}
4344

4445
void Plan(TTransactionContext &txc, const TActorContext &ctx) {
45-
Y_UNUSED(txc);
46+
if (Self->VolatileState.Preserved) {
47+
// A preserved state indicates a newer generation has been started
48+
// already, and this coordinator will stop eventually. Decline
49+
// all pending transactions.
50+
for (auto& slot : Slots) {
51+
for (auto& proposal : slot) {
52+
Self->MonCounters.StepPlannedDeclinedTx->Inc();
53+
ProxyPlanConfirmations.Queue.emplace_back(
54+
proposal.TxId,
55+
proposal.Proxy,
56+
TEvTxProxy::TEvProposeTransactionStatus::EStatus::StatusRestarting,
57+
0);
58+
++DeclinedCounter;
59+
}
60+
}
61+
Self->SendStepConfirmations(ProxyPlanConfirmations, ctx);
62+
return;
63+
}
64+
65+
if (auto* hooks = ICoordinatorHooks::Get(); Y_UNLIKELY(hooks)) {
66+
hooks->BeginPlanStep(Self->TabletID(), Self->Executor()->Generation(), PlanOnStep);
67+
}
68+
4669
NIceDb::TNiceDb db(txc.DB);
4770
ExecStartMoment = ctx.Now();
4871
const bool lowDiskSpace = Self->Executor()->GetStats().IsAnyChannelYellowStop;

ydb/core/tx/coordinator/coordinator__restore_transaction.cpp

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -125,10 +125,12 @@ struct TTxCoordinator::TTxRestoreTransactions : public TTransactionBase<TTxCoord
125125
return true;
126126
}
127127

128-
void RestoreVolatileSteps() {
128+
TStepId RestoreVolatileSteps() {
129+
TStepId maxStep = 0;
129130
for (auto &pr : Self->VolatileTransactions) {
130131
auto txId = pr.first;
131132
auto &tx = pr.second;
133+
maxStep = Max(maxStep, tx.PlanOnStep);
132134
for (auto &prmed : tx.UnconfirmedAffectedSet) {
133135
auto medId = prmed.first;
134136
auto &medTx = GetMediatorTx(medId, tx.PlanOnStep, txId);
@@ -137,6 +139,7 @@ struct TTxCoordinator::TTxRestoreTransactions : public TTransactionBase<TTxCoord
137139
}
138140
}
139141
}
142+
return maxStep;
140143
}
141144

142145
TTxType GetTxType() const override { return TXTYPE_INIT; }
@@ -146,15 +149,24 @@ struct TTxCoordinator::TTxRestoreTransactions : public TTransactionBase<TTxCoord
146149
bool result = Restore(transactions, txc, ctx);
147150
if (!result)
148151
return false;
149-
RestoreVolatileSteps();
152+
TStepId maxVolatileStep = RestoreVolatileSteps();
150153
i64 txCounter = transactions.size() + Self->VolatileTransactions.size();
151154
Self->Transactions.swap(transactions);
152155
*Self->MonCounters.TxInFly += txCounter;
153156
Self->MonCounters.CurrentTxInFly = txCounter;
154157

155-
if (Self->PrevStateActorId) {
156-
NIceDb::TNiceDb db(txc.DB);
158+
NIceDb::TNiceDb db(txc.DB);
157159

160+
// Previous coordinator might have had transactions that were after
161+
// its persistent blocked range, but before LastPlanned was updated.
162+
// Since we pick them up as planned and send to mediators we also need
163+
// to make sure LastPlanned reflects that.
164+
if (Self->VolatileState.LastPlanned < maxVolatileStep) {
165+
Self->VolatileState.LastPlanned = maxVolatileStep;
166+
Schema::SaveState(db, Schema::State::KeyLastPlanned, maxVolatileStep);
167+
}
168+
169+
if (Self->PrevStateActorId) {
158170
ui64 volatileLeaseMs = Self->VolatilePlanLeaseMs;
159171
if (volatileLeaseMs > 0) {
160172
// Make sure we start and persist new state actor before allowing clients to acquire new read steps

ydb/core/tx/coordinator/coordinator_hooks.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,12 @@ namespace NKikimr::NFlatTxCoordinator {
1414
return true;
1515
}
1616

17+
void ICoordinatorHooks::BeginPlanStep(ui64 tabletId, ui64 generation, ui64 planStep) {
18+
Y_UNUSED(tabletId);
19+
Y_UNUSED(generation);
20+
Y_UNUSED(planStep);
21+
}
22+
1723
ICoordinatorHooks* ICoordinatorHooks::Get() {
1824
return CoordinatorHooks.load(std::memory_order_acquire);
1925
}

ydb/core/tx/coordinator/coordinator_hooks.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ namespace NKikimr::NFlatTxCoordinator {
1010

1111
public:
1212
virtual bool PersistConfig(ui64 tabletId, const NKikimrSubDomains::TProcessingParams& config);
13+
virtual void BeginPlanStep(ui64 tabletId, ui64 generation, ui64 planStep);
1314

1415
public:
1516
static ICoordinatorHooks* Get();

ydb/core/tx/coordinator/coordinator_impl.cpp

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,11 @@ ui64 TTxCoordinator::AlignPlanStep(ui64 step) {
328328
void TTxCoordinator::Handle(TEvPrivate::TEvPlanTick::TPtr &ev, const TActorContext &ctx) {
329329
//LOG_DEBUG_S(ctx, NKikimrServices::TX_COORDINATOR, "tablet# " << TabletID() << " HANDLE EvPlanTick LastPlanned " << VolatileState.LastPlanned);
330330

331+
if (VolatileState.Preserved) {
332+
// Avoid planning any new transactions, wait until we are stopped
333+
return;
334+
}
335+
331336
ui64 next = ev->Get()->Step;
332337
while (!PendingPlanTicks.empty() && PendingPlanTicks.front() <= next) {
333338
PendingPlanTicks.pop_front();
@@ -556,8 +561,14 @@ void TTxCoordinator::TryInitMonCounters(const TActorContext &ctx) {
556561
}
557562

558563
void TTxCoordinator::SendMediatorStep(TMediator &mediator, const TActorContext &ctx) {
564+
if (VolatileState.Preserved) {
565+
// We don't want to send new steps when state has been preserved and
566+
// potentially sent to newer generations.
567+
return;
568+
}
569+
559570
if (!mediator.Active) {
560-
// We don't want to update LastSentStep when mediators are not empty
571+
// We don't want to update LastSentStep when mediators are not connected
561572
return;
562573
}
563574

ydb/core/tx/coordinator/coordinator_impl.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -433,6 +433,10 @@ class TTxCoordinator : public TActor<TTxCoordinator>, public TTabletExecutedFlat
433433
TVector<TAcquireReadStepRequest> AcquireReadStepPending;
434434
bool AcquireReadStepFlushing = false;
435435
bool AcquireReadStepStarting = false;
436+
437+
// When true the state has been preserved by the state actor
438+
// Any changes will not be migrated to newer generations
439+
bool Preserved = false;
436440
};
437441

438442
public:

ydb/core/tx/coordinator/coordinator_state.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ void TCoordinatorStateActor::PreserveState() {
8080
Y_ABORT_UNLESS(ok);
8181
}
8282

83+
Owner->VolatileState.Preserved = true;
8384
}
8485

8586
STFUNC(TCoordinatorStateActor::StateWork) {

0 commit comments

Comments
 (0)