Skip to content

Commit f3b158a

Browse files
authored
Process heartbeats (#12333)
Test is flaky
1 parent 7b9edec commit f3b158a

File tree

9 files changed

+187
-2
lines changed

9 files changed

+187
-2
lines changed

ydb/core/protos/counters_replication.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,4 +46,5 @@ enum ETxTypes {
4646
TXTYPE_DESCRIBE_REPLICATION = 13 [(TxTypeOpts) = {Name: "TxDescribeReplication"}];
4747
TXTYPE_WORKER_ERROR = 14 [(TxTypeOpts) = {Name: "TxWorkerError"}];
4848
TXTYPE_ASSIGN_TX_ID = 15 [(TxTypeOpts) = {Name: "TxAssignTxId"}];
49+
TXTYPE_HEARTBEAT = 16 [(TxTypeOpts) = {Name: "TxHeartbeat"}];
4950
}

ydb/core/tx/replication/controller/controller.cpp

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ STFUNC(TController::StateWork) {
8080
HFunc(TEvService::TEvRunWorker, Handle);
8181
HFunc(TEvService::TEvWorkerDataEnd, Handle);
8282
HFunc(TEvService::TEvGetTxId, Handle);
83+
HFunc(TEvService::TEvHeartbeat, Handle);
8384
HFunc(TEvTxAllocatorClient::TEvAllocateResult, Handle);
8485
HFunc(TEvInterconnect::TEvNodeDisconnected, Handle);
8586
default:
@@ -776,6 +777,22 @@ void TController::Handle(TEvService::TEvGetTxId::TPtr& ev, const TActorContext&
776777
RunTxAssignTxId(ctx);
777778
}
778779

780+
void TController::Handle(TEvService::TEvHeartbeat::TPtr& ev, const TActorContext& ctx) {
781+
CLOG_T(ctx, "Handle " << ev->Get()->ToString());
782+
783+
const auto nodeId = ev->Sender.NodeId();
784+
if (!Sessions.contains(nodeId)) {
785+
return;
786+
}
787+
788+
const auto& record = ev->Get()->Record;
789+
const auto id = TWorkerId::Parse(record.GetWorker());
790+
const auto version = TRowVersion::Parse(record.GetVersion());
791+
PendingHeartbeats[id] = version;
792+
793+
RunTxHeartbeat(ctx);
794+
}
795+
779796
void TController::Handle(TEvInterconnect::TEvNodeDisconnected::TPtr& ev, const TActorContext& ctx) {
780797
const ui32 nodeId = ev->Get()->NodeId;
781798

ydb/core/tx/replication/controller/controller_impl.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ class TController
9797
void Handle(TEvService::TEvRunWorker::TPtr& ev, const TActorContext& ctx);
9898
void Handle(TEvService::TEvWorkerDataEnd::TPtr& ev, const TActorContext& ctx);
9999
void Handle(TEvService::TEvGetTxId::TPtr& ev, const TActorContext& ctx);
100+
void Handle(TEvService::TEvHeartbeat::TPtr& ev, const TActorContext& ctx);
100101
void Handle(TEvTxAllocatorClient::TEvAllocateResult::TPtr& ev, const TActorContext& ctx);
101102
void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr& ev, const TActorContext& ctx);
102103

@@ -133,6 +134,7 @@ class TController
133134
class TTxResolveSecretResult;
134135
class TTxWorkerError;
135136
class TTxAssignTxId;
137+
class TTxHeartbeat;
136138

137139
// tx runners
138140
void RunTxInitSchema(const TActorContext& ctx);
@@ -153,6 +155,7 @@ class TController
153155
void RunTxResolveSecretResult(TEvPrivate::TEvResolveSecretResult::TPtr& ev, const TActorContext& ctx);
154156
void RunTxWorkerError(const TWorkerId& id, const TString& error, const TActorContext& ctx);
155157
void RunTxAssignTxId(const TActorContext& ctx);
158+
void RunTxHeartbeat(const TActorContext& ctx);
156159

157160
// other
158161
template <typename T>
@@ -208,6 +211,11 @@ class TController
208211
TMap<TRowVersion, THashSet<ui32>> PendingTxId;
209212
bool AssignTxIdInFlight = false;
210213

214+
THashSet<TWorkerId> WorkersWithHeartbeat;
215+
TMap<TRowVersion, THashSet<TWorkerId>> WorkersByHeartbeat;
216+
THashMap<TWorkerId, TRowVersion> PendingHeartbeats;
217+
bool ProcessHeartbeatsInFlight = false;
218+
211219
}; // TController
212220

213221
}

ydb/core/tx/replication/controller/schema.h

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,12 +70,24 @@ struct TControllerSchema: NIceDb::Schema {
7070
using TColumns = TableColumns<VersionStep, VersionTxId, WriteTxId>;
7171
};
7272

73+
struct Workers: Table<6> {
74+
struct ReplicationId: Column<1, NScheme::NTypeIds::Uint64> {};
75+
struct TargetId: Column<2, NScheme::NTypeIds::Uint64> {};
76+
struct WorkerId: Column<3, NScheme::NTypeIds::Uint64> {};
77+
struct HeartbeatVersionStep: Column<4, NScheme::NTypeIds::Uint64> {};
78+
struct HeartbeatVersionTxId: Column<5, NScheme::NTypeIds::Uint64> {};
79+
80+
using TKey = TableKey<ReplicationId, TargetId, WorkerId>;
81+
using TColumns = TableColumns<ReplicationId, TargetId, WorkerId, HeartbeatVersionStep, HeartbeatVersionTxId>;
82+
};
83+
7384
using TTables = SchemaTables<
7485
SysParams,
7586
Replications,
7687
Targets,
7788
SrcStreams,
78-
TxIds
89+
TxIds,
90+
Workers
7991
>;
8092

8193
}; // TControllerSchema

ydb/core/tx/replication/controller/session_info.cpp

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,4 +83,17 @@ void TWorkerInfo::SetDataEnded(bool value) {
8383
DataEnded = value;
8484
}
8585

86+
void TWorkerInfo::SetHeartbeat(const TRowVersion& value) {
87+
Heartbeat = value;
88+
}
89+
90+
bool TWorkerInfo::HasHeartbeat() const {
91+
return Heartbeat.Defined();
92+
}
93+
94+
const TRowVersion& TWorkerInfo::GetHeartbeat() const {
95+
Y_ABORT_UNLESS(Heartbeat.Defined());
96+
return *Heartbeat;
97+
}
98+
8699
}

ydb/core/tx/replication/controller/session_info.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#pragma once
22

3+
#include <ydb/core/base/row_version.h>
34
#include <ydb/core/tx/replication/common/worker_id.h>
45

56
#include <util/generic/hash_set.h>
@@ -45,10 +46,15 @@ class TWorkerInfo {
4546
bool IsDataEnded() const;
4647
void SetDataEnded(bool value);
4748

49+
void SetHeartbeat(const TRowVersion& value);
50+
bool HasHeartbeat() const;
51+
const TRowVersion& GetHeartbeat() const;
52+
4853
private:
4954
THolder<NKikimrReplication::TRunWorkerCommand> Command;
5055
TMaybe<ui32> Session;
5156
bool DataEnded = false;
57+
TMaybe<TRowVersion> Heartbeat;
5258
};
5359

5460
}
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
#include "controller_impl.h"
2+
3+
namespace NKikimr::NReplication::NController {
4+
5+
class TController::TTxHeartbeat: public TTxBase {
6+
public:
7+
explicit TTxHeartbeat(TController* self)
8+
: TTxBase("TxHeartbeat", self)
9+
{
10+
}
11+
12+
TTxType GetTxType() const override {
13+
return TXTYPE_HEARTBEAT;
14+
}
15+
16+
bool Execute(TTransactionContext& txc, const TActorContext& ctx) override {
17+
CLOG_D(ctx, "Execute"
18+
<< ": pending# " << Self->PendingHeartbeats.size());
19+
20+
if (Self->Workers.empty()) {
21+
CLOG_W(ctx, "There are no workers");
22+
return true;
23+
}
24+
25+
const auto prevMinVersion = !Self->WorkersByHeartbeat.empty()
26+
? std::make_optional<TRowVersion>(Self->WorkersByHeartbeat.begin()->first)
27+
: std::nullopt;
28+
29+
NIceDb::TNiceDb db(txc.DB);
30+
31+
for (const auto& [id, version] : Self->PendingHeartbeats) {
32+
if (!Self->Workers.contains(id)) {
33+
continue;
34+
}
35+
36+
auto& worker = Self->Workers[id];
37+
if (worker.HasHeartbeat()) {
38+
auto it = Self->WorkersByHeartbeat.find(worker.GetHeartbeat());
39+
if (it != Self->WorkersByHeartbeat.end()) {
40+
it->second.erase(id);
41+
if (it->second.empty()) {
42+
Self->WorkersByHeartbeat.erase(it);
43+
}
44+
}
45+
}
46+
47+
worker.SetHeartbeat(version);
48+
Self->WorkersWithHeartbeat.insert(id);
49+
Self->WorkersByHeartbeat[version].insert(id);
50+
51+
db.Table<Schema::Workers>().Key(id.ReplicationId(), id.TargetId(), id.WorkerId()).Update(
52+
NIceDb::TUpdate<Schema::Workers::HeartbeatVersionStep>(version.Step),
53+
NIceDb::TUpdate<Schema::Workers::HeartbeatVersionTxId>(version.TxId)
54+
);
55+
}
56+
57+
if (Self->Workers.size() != Self->WorkersWithHeartbeat.size()) {
58+
return true;
59+
}
60+
61+
Y_ABORT_UNLESS(!Self->WorkersByHeartbeat.empty());
62+
const auto newMinVersion = Self->WorkersByHeartbeat.begin()->first;
63+
64+
if (newMinVersion <= prevMinVersion.value_or(TRowVersion::Min())) {
65+
return true;
66+
}
67+
68+
CLOG_N(ctx, "Min version has been changed"
69+
<< ": prev# " << prevMinVersion.value_or(TRowVersion::Min())
70+
<< ", new# " << newMinVersion);
71+
72+
// TODO: run commit
73+
return true;
74+
}
75+
76+
void Complete(const TActorContext& ctx) override {
77+
CLOG_D(ctx, "Complete"
78+
<< ": pending# " << Self->PendingHeartbeats.size());
79+
80+
if (Self->PendingHeartbeats) {
81+
Self->Execute(new TTxHeartbeat(Self), ctx);
82+
} else {
83+
Self->ProcessHeartbeatsInFlight = false;
84+
}
85+
}
86+
87+
}; // TTxHeartbeat
88+
89+
void TController::RunTxHeartbeat(const TActorContext& ctx) {
90+
if (!ProcessHeartbeatsInFlight) {
91+
ProcessHeartbeatsInFlight = true;
92+
Execute(new TTxHeartbeat(this), ctx);
93+
}
94+
}
95+
96+
}

ydb/core/tx/replication/controller/tx_init.cpp

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,13 +157,44 @@ class TController::TTxInit: public TTxBase {
157157
return true;
158158
}
159159

160+
bool LoadWorkers(NIceDb::TNiceDb& db) {
161+
auto rowset = db.Table<Schema::Workers>().Select();
162+
if (!rowset.IsReady()) {
163+
return false;
164+
}
165+
166+
while (!rowset.EndOfSet()) {
167+
const auto id = TWorkerId(
168+
rowset.GetValue<Schema::Workers::ReplicationId>(),
169+
rowset.GetValue<Schema::Workers::TargetId>(),
170+
rowset.GetValue<Schema::Workers::WorkerId>()
171+
);
172+
const auto version = TRowVersion(
173+
rowset.GetValue<Schema::Workers::HeartbeatVersionStep>(),
174+
rowset.GetValue<Schema::Workers::HeartbeatVersionTxId>()
175+
);
176+
177+
auto* worker = Self->GetOrCreateWorker(id);
178+
worker->SetHeartbeat(version);
179+
Self->WorkersWithHeartbeat.insert(id);
180+
Self->WorkersByHeartbeat[version].insert(id);
181+
182+
if (!rowset.Next()) {
183+
return false;
184+
}
185+
}
186+
187+
return true;
188+
}
189+
160190
inline bool Load(NIceDb::TNiceDb& db) {
161191
Self->Reset();
162192
return LoadSysParams(db)
163193
&& LoadReplications(db)
164194
&& LoadTargets(db)
165195
&& LoadSrcStreams(db)
166-
&& LoadTxIds(db);
196+
&& LoadTxIds(db)
197+
&& LoadWorkers(db);
167198
}
168199

169200
inline bool Load(NTable::TDatabase& toughDb) {

ydb/core/tx/replication/controller/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ SRCS(
5050
tx_drop_dst_result.cpp
5151
tx_drop_replication.cpp
5252
tx_drop_stream_result.cpp
53+
tx_heartbeat.cpp
5354
tx_init.cpp
5455
tx_init_schema.cpp
5556
tx_resolve_secret_result.cpp

0 commit comments

Comments
 (0)