Skip to content

Commit 782ac23

Browse files
authored
Fix lost coordinator steps during mediator reconnect race (#2037) (#2056)
1 parent cfdbc77 commit 782ac23

File tree

2 files changed

+101
-1
lines changed

2 files changed

+101
-1
lines changed

ydb/core/tx/coordinator/coordinator_volatile_ut.cpp

+99
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include <ydb/core/tx/coordinator/public/events.h>
2+
#include <ydb/core/tx/coordinator/coordinator_impl.h>
23
#include <ydb/core/tx/tx.h>
34
#include <ydb/core/tx/tx_processing.h>
45
#include <ydb/core/tablet_flat/tablet_flat_executed.h>
@@ -258,6 +259,104 @@ namespace NKikimr::NFlatTxCoordinator::NTest {
258259
observedSteps.clear();
259260
}
260261

262+
// Regression test for coordinator steps being lost during a mediator
// reconnect race.
//
// Scenario:
//   1. Propose a volatile transaction and hold back the resulting
//      TEvMediatorQueueStep before the mediator queue actor sees it.
//   2. Reboot the mediator; when the queue actor is about to receive
//      TEvClientDestroyed, re-inject the held step so that it is handled
//      after the disconnect.
//   3. The target tablet must still observe exactly one TEvPlanStep.
Y_UNIT_TEST(MediatorReconnectPlanRace) {
    TPortManager pm;
    TServerSettings serverSettings(pm.GetPort(2134));
    serverSettings.SetDomainName("Root")
        .SetNodeCount(1)
        .SetUseRealThreads(false);

    Tests::TServer::TPtr server = new TServer(serverSettings);

    auto &runtime = *server->GetRuntime();
    runtime.SetLogPriority(NKikimrServices::TX_COORDINATOR, NActors::NLog::PRI_DEBUG);
    runtime.SetLogPriority(NKikimrServices::BOOTSTRAPPER, NActors::NLog::PRI_DEBUG);

    auto sender = runtime.AllocateEdgeActor();
    ui64 coordinatorId = ChangeStateStorage(Coordinator, server->GetSettings().Domain);
    ui64 mediatorId = ChangeStateStorage(Mediator, server->GetSettings().Domain);
    ui64 tabletId = ChangeStateStorage(TTestTxConfig::TxTablet0, server->GetSettings().Domain);

    // Boot a dummy tablet that acts as the plan target for the transaction
    CreateTestBootstrapper(runtime,
        CreateTestTabletInfo(tabletId, TTabletTypes::Dummy),
        [](const TActorId& tablet, TTabletStorageInfo* info) {
            return new TPlanTargetTablet(tablet, info);
        });

    {
        TDispatchOptions options;
        options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvBoot, 1));
        runtime.DispatchEvents(options);
    }

    // Capture every TEvMediatorQueueStep (taking ownership of the event
    // handle) and remember which actor it was addressed to.
    TActorId mediatorQueue;
    std::vector<std::unique_ptr<IEventHandle>> mediatorQueueSteps;
    auto blockMediatorQueueSteps = runtime.AddObserver<TEvMediatorQueueStep>([&](TEvMediatorQueueStep::TPtr& ev) {
        mediatorQueue = ev->GetRecipientRewrite();
        mediatorQueueSteps.emplace_back(ev.Release());
        Cerr << "... blocked TEvMediatorQueueStep for " << mediatorQueue << Endl;
    });

    // Record the step number of every TEvPlanStep that passes through the runtime
    std::vector<ui64> observedSteps;
    auto stepsObserver = runtime.AddObserver<TEvTxProcessing::TEvPlanStep>([&](TEvTxProcessing::TEvPlanStep::TPtr& ev) {
        auto* msg = ev->Get();
        observedSteps.push_back(msg->Record.GetStep());
    });

    // Dispatches events (at most 5 rounds) until condition() holds and
    // fails the test if it still does not hold afterwards.
    auto waitFor = [&](const auto& condition, const TString& description) {
        for (int i = 0; i < 5 && !condition(); ++i) {
            Cerr << "... waiting for " << description << Endl;
            TDispatchOptions options;
            options.CustomFinalCondition = [&]() {
                return condition();
            };
            runtime.DispatchEvents(options);
        }
        UNIT_ASSERT_C(condition(), "... failed to wait for " << description);
    };

    ui64 txId = 12345678;
    {
        // Plain scope: the proposal is only needed until it is sent
        auto propose = std::make_unique<TEvTxProxy::TEvProposeTransaction>(coordinatorId, txId, 0, Min<ui64>(), Max<ui64>());
        auto* tx = propose->Record.MutableTransaction();
        // Not necessary, but we test volatile transactions here
        tx->SetFlags(TEvTxProxy::TEvProposeTransaction::FlagVolatile);
        auto* affected = tx->AddAffectedSet();
        affected->SetTabletId(tabletId);
        affected->SetFlags(TEvTxProxy::TEvProposeTransaction::AffectedWrite);

        runtime.SendToPipe(coordinatorId, sender, propose.release());
    }

    waitFor([&]{ return mediatorQueueSteps.size() > 0; }, "TEvMediatorQueueStep");
    UNIT_ASSERT_VALUES_EQUAL(mediatorQueueSteps.size(), 1u);

    // We shouldn't see any steps yet
    UNIT_ASSERT_VALUES_EQUAL(observedSteps.size(), 0u);

    auto injectMediatorQueueStep = runtime.AddObserver<TEvTabletPipe::TEvClientDestroyed>([&](TEvTabletPipe::TEvClientDestroyed::TPtr& ev) {
        if (ev->GetRecipientRewrite() == mediatorQueue) {
            Cerr << "... found pipe disconnect at " << mediatorQueue << Endl;
            // Stop blocking mediator queue steps
            // This seems to be safe, since we remove a different observer
            // (not the one currently running) from the std::list
            blockMediatorQueueSteps.Remove();
            // Inject blocked mediator steps into queue mailbox, they will be handled after the disconnect
            for (auto& step : mediatorQueueSteps) {
                runtime.Send(step.release(), 0, true);
            }
            mediatorQueueSteps.clear();
        }
    });

    Cerr << "... rebooting mediator" << Endl;
    RebootTablet(runtime, mediatorId, sender);

    waitFor([&]{ return mediatorQueueSteps.empty(); }, "injected mediator steps");

    // We must observe the plan step soon
    runtime.SimulateSleep(TDuration::Seconds(2));
    UNIT_ASSERT_VALUES_EQUAL(observedSteps.size(), 1u);
}
359+
261360
} // Y_UNIT_TEST_SUITE(CoordinatorVolatile)
262361

263362
} // namespace NKikimr::NFlatTxCoordinator::NTest

ydb/core/tx/coordinator/mediator_queue.cpp

+2-1
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,7 @@ class TTxCoordinatorMediatorQueue : public TActorBootstrapped<TTxCoordinatorMedi
284284

285285
STFUNC(StateSync) {
286286
switch (ev->GetTypeRewrite()) {
287+
HFunc(TEvMediatorQueueStep, Handle);
287288
HFunc(TEvTxProcessing::TEvPlanStepAck, Handle);
288289
HFunc(TEvTxCoordinator::TEvCoordinatorSyncResult, Handle);
289290
HFunc(TEvTabletPipe::TEvClientConnected, Handle);
@@ -294,8 +295,8 @@ class TTxCoordinatorMediatorQueue : public TActorBootstrapped<TTxCoordinatorMedi
294295

295296
STFUNC(StateWork) {
296297
switch (ev->GetTypeRewrite()) {
297-
HFunc(TEvTxProcessing::TEvPlanStepAck, Handle);
298298
HFunc(TEvMediatorQueueStep, Handle);
299+
HFunc(TEvTxProcessing::TEvPlanStepAck, Handle);
299300
HFunc(TEvTabletPipe::TEvClientDestroyed, Handle);
300301
CFunc(TEvents::TSystem::PoisonPill, Die)
301302
}

0 commit comments

Comments
 (0)