Skip to content

Commit ec02f9b

Browse files
authored
YQ-3859 Check semaphore generation in row dispatcher (#11572)
1 parent 09771d9 commit ec02f9b

File tree

5 files changed

+31
-18
lines changed

5 files changed

+31
-18
lines changed

ydb/core/fq/libs/row_dispatcher/events/data_plane.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,12 @@ struct TEvRowDispatcher {
3333
};
3434

3535
struct TEvCoordinatorChanged : NActors::TEventLocal<TEvCoordinatorChanged, EEv::EvCoordinatorChanged> {
36-
TEvCoordinatorChanged(NActors::TActorId coordinatorActorId)
37-
: CoordinatorActorId(coordinatorActorId) {
36+
TEvCoordinatorChanged(NActors::TActorId coordinatorActorId, ui64 generation)
37+
: CoordinatorActorId(coordinatorActorId)
38+
, Generation(generation) {
3839
}
3940
NActors::TActorId CoordinatorActorId;
41+
ui64 Generation = 0;
4042
};
4143

4244
struct TEvCoordinatorChangesSubscribe : public NActors::TEventLocal<TEvCoordinatorChangesSubscribe, EEv::EvCoordinatorChangesSubscribe> {};

ydb/core/fq/libs/row_dispatcher/leader_election.cpp

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,7 @@ void TLeaderElection::Bootstrap() {
223223
LogPrefix = "TLeaderElection " + SelfId().ToString() + " ";
224224
LOG_ROW_DISPATCHER_DEBUG("Successfully bootstrapped, local coordinator id " << CoordinatorId.ToString());
225225
if (Config.GetLocalMode()) {
226-
TActivationContext::ActorSystem()->Send(ParentId, new NFq::TEvRowDispatcher::TEvCoordinatorChanged(CoordinatorId));
226+
TActivationContext::ActorSystem()->Send(ParentId, new NFq::TEvRowDispatcher::TEvCoordinatorChanged(CoordinatorId, 0));
227227
return;
228228
}
229229
ProcessState();
@@ -445,17 +445,19 @@ void TLeaderElection::Handle(TEvPrivate::TEvDescribeSemaphoreResult::TPtr& ev) {
445445
// Wait OnChanged.
446446
return;
447447
}
448-
TString data = description.GetOwners()[0].GetData();
448+
const auto& session = description.GetOwners()[0];
449+
TString data = session.GetData();
450+
auto generation = session.GetOrderId();
449451
NActorsProto::TActorId protoId;
450452
if (!protoId.ParseFromString(data)) {
451453
Y_ABORT("ParseFromString");
452454
}
453455

454456
NActors::TActorId id = ActorIdFromProto(protoId);
455-
LOG_ROW_DISPATCHER_DEBUG("Semaphore successfully described: coordinator id " << id);
457+
LOG_ROW_DISPATCHER_DEBUG("Semaphore successfully described: coordinator id " << id << " generation " << generation);
456458
if (!LeaderActorId || (*LeaderActorId != id)) {
457459
LOG_ROW_DISPATCHER_INFO("Send TEvCoordinatorChanged to " << ParentId);
458-
TActivationContext::ActorSystem()->Send(ParentId, new NFq::TEvRowDispatcher::TEvCoordinatorChanged(id));
460+
TActivationContext::ActorSystem()->Send(ParentId, new NFq::TEvRowDispatcher::TEvCoordinatorChanged(id, generation));
459461
Metrics.LeaderChangedCount->Inc();
460462
}
461463
LeaderActorId = id;

ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,7 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
218218
IPureCalcProgramFactory::TPtr PureCalcProgramFactory;
219219
TYqSharedResources::TPtr YqSharedResources;
220220
TMaybe<TActorId> CoordinatorActorId;
221+
ui64 CoordinatorGeneration = 0;
221222
TSet<TActorId> CoordinatorChangedSubscribers;
222223
NYql::ISecuredServiceAccountCredentialsFactory::TPtr CredentialsFactory;
223224
const TString LogPrefix;
@@ -395,14 +396,18 @@ void TRowDispatcher::Bootstrap() {
395396
}
396397

397398
void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvCoordinatorChanged::TPtr& ev) {
398-
LOG_ROW_DISPATCHER_DEBUG("Coordinator changed, old leader " << CoordinatorActorId << ", new " << ev->Get()->CoordinatorActorId);
399-
399+
LOG_ROW_DISPATCHER_DEBUG("Coordinator changed, old leader " << CoordinatorActorId << ", new " << ev->Get()->CoordinatorActorId << " generation " << ev->Get()->Generation);
400+
if (ev->Get()->Generation < CoordinatorGeneration) {
401+
LOG_ROW_DISPATCHER_ERROR("New generation (" << ev->Get()->Generation << ") is less previous (" << CoordinatorGeneration << "), ignore updates");
402+
return;
403+
}
400404
CoordinatorActorId = ev->Get()->CoordinatorActorId;
405+
CoordinatorGeneration = ev->Get()->Generation;
401406
Send(*CoordinatorActorId, new NActors::TEvents::TEvPing(), IEventHandle::FlagTrackDelivery);
402407
for (auto actorId : CoordinatorChangedSubscribers) {
403408
Send(
404409
actorId,
405-
new NFq::TEvRowDispatcher::TEvCoordinatorChanged(ev->Get()->CoordinatorActorId),
410+
new NFq::TEvRowDispatcher::TEvCoordinatorChanged(*CoordinatorActorId, CoordinatorGeneration),
406411
IEventHandle::FlagTrackDelivery);
407412
}
408413
}
@@ -450,7 +455,7 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvCoordinatorChangesSubscrib
450455
if (!CoordinatorActorId) {
451456
return;
452457
}
453-
Send(ev->Sender, new NFq::TEvRowDispatcher::TEvCoordinatorChanged(*CoordinatorActorId), IEventHandle::FlagTrackDelivery);
458+
Send(ev->Sender, new NFq::TEvRowDispatcher::TEvCoordinatorChanged(*CoordinatorActorId, CoordinatorGeneration), IEventHandle::FlagTrackDelivery);
454459
}
455460

456461
void TRowDispatcher::UpdateMetrics() {

ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,8 @@ class TFixture : public NUnitTest::TBaseFixture {
7070
auto yqSharedResources = NFq::TYqSharedResources::Cast(NFq::CreateYqSharedResourcesImpl({}, credFactory, MakeIntrusive<NMonitoring::TDynamicCounters>()));
7171

7272
NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory;
73-
Coordinator = Runtime.AllocateEdgeActor();
73+
Coordinator1 = Runtime.AllocateEdgeActor();
74+
Coordinator2 = Runtime.AllocateEdgeActor();
7475
EdgeActor = Runtime.AllocateEdgeActor();
7576
ReadActorId1 = Runtime.AllocateEdgeActor();
7677
ReadActorId2 = Runtime.AllocateEdgeActor();
@@ -221,7 +222,8 @@ class TFixture : public NUnitTest::TBaseFixture {
221222
TActorSystemStub actorSystemStub;
222223
NActors::TTestActorRuntime Runtime;
223224
NActors::TActorId RowDispatcher;
224-
NActors::TActorId Coordinator;
225+
NActors::TActorId Coordinator1;
226+
NActors::TActorId Coordinator2;
225227
NActors::TActorId EdgeActor;
226228
NActors::TActorId ReadActorId1;
227229
NActors::TActorId ReadActorId2;
@@ -278,27 +280,29 @@ Y_UNIT_TEST_SUITE(RowDispatcherTests) {
278280
}
279281

280282
Y_UNIT_TEST_F(CoordinatorSubscribe, TFixture) {
281-
Runtime.Send(new IEventHandle(RowDispatcher, EdgeActor, new NFq::TEvRowDispatcher::TEvCoordinatorChanged(Coordinator)));
283+
Runtime.Send(new IEventHandle(RowDispatcher, EdgeActor, new NFq::TEvRowDispatcher::TEvCoordinatorChanged(Coordinator1, 10)));
284+
Runtime.Send(new IEventHandle(RowDispatcher, EdgeActor, new NFq::TEvRowDispatcher::TEvCoordinatorChanged(Coordinator2, 9))); // ignore
285+
282286
Runtime.Send(new IEventHandle(RowDispatcher, ReadActorId1, new NFq::TEvRowDispatcher::TEvCoordinatorChangesSubscribe));
283287

284288
auto eventHolder = Runtime.GrabEdgeEvent<NFq::TEvRowDispatcher::TEvCoordinatorChanged>(ReadActorId1);
285289
UNIT_ASSERT(eventHolder.Get() != nullptr);
286-
UNIT_ASSERT(eventHolder->Get()->CoordinatorActorId == Coordinator);
290+
UNIT_ASSERT(eventHolder->Get()->CoordinatorActorId == Coordinator1);
287291
}
288292

289293
Y_UNIT_TEST_F(CoordinatorSubscribeBeforeCoordinatorChanged, TFixture) {
290294
Runtime.Send(new IEventHandle(RowDispatcher, ReadActorId1, new NFq::TEvRowDispatcher::TEvCoordinatorChangesSubscribe));
291295
Runtime.Send(new IEventHandle(RowDispatcher, ReadActorId2, new NFq::TEvRowDispatcher::TEvCoordinatorChangesSubscribe));
292296

293-
Runtime.Send(new IEventHandle(RowDispatcher, EdgeActor, new NFq::TEvRowDispatcher::TEvCoordinatorChanged(Coordinator)));
297+
Runtime.Send(new IEventHandle(RowDispatcher, EdgeActor, new NFq::TEvRowDispatcher::TEvCoordinatorChanged(Coordinator1, 0)));
294298

295299
auto eventHolder = Runtime.GrabEdgeEvent<NFq::TEvRowDispatcher::TEvCoordinatorChanged>(ReadActorId1);
296300
UNIT_ASSERT(eventHolder.Get() != nullptr);
297-
UNIT_ASSERT(eventHolder->Get()->CoordinatorActorId == Coordinator);
301+
UNIT_ASSERT(eventHolder->Get()->CoordinatorActorId == Coordinator1);
298302

299303
eventHolder = Runtime.GrabEdgeEvent<NFq::TEvRowDispatcher::TEvCoordinatorChanged>(ReadActorId2);
300304
UNIT_ASSERT(eventHolder.Get() != nullptr);
301-
UNIT_ASSERT(eventHolder->Get()->CoordinatorActorId == Coordinator);
305+
UNIT_ASSERT(eventHolder->Get()->CoordinatorActorId == Coordinator1);
302306
}
303307

304308
Y_UNIT_TEST_F(TwoClients4Sessions, TFixture) {

ydb/tests/fq/pq_async_io/ut/dq_pq_rd_read_actor_ut.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ struct TFixture : public TPqIoTestFixture {
9090

9191
void MockCoordinatorChanged(NActors::TActorId coordinatorId) {
9292
CaSetup->Execute([&](TFakeActor& actor) {
93-
auto event = new NFq::TEvRowDispatcher::TEvCoordinatorChanged(coordinatorId);
93+
auto event = new NFq::TEvRowDispatcher::TEvCoordinatorChanged(coordinatorId, 0);
9494
CaSetup->Runtime->Send(new NActors::IEventHandle(*actor.DqAsyncInputActorId, LocalRowDispatcherId, event));
9595
});
9696
}

0 commit comments

Comments
 (0)