Skip to content

Commit a48b189

Browse files
authored
Replication stats (total & per item): lag, initial scan progress (#6092)
1 parent 3a2566e commit a48b189

File tree

18 files changed

+416
-34
lines changed

18 files changed

+416
-34
lines changed

ydb/core/grpc_services/rpc_replication.cpp

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ class TDescribeReplicationRPC: public TRpcSchemeRequestActor<TDescribeReplicatio
102102

103103
auto ev = std::make_unique<NReplication::TEvController::TEvDescribeReplication>();
104104
PathIdFromPathId(pathId, ev->Record.MutablePathId());
105+
ev->Record.SetIncludeStats(GetProtoRequest()->include_stats());
105106

106107
NTabletPipe::SendData(SelfId(), ControllerPipeClient, ev.release());
107108
Become(&TDescribeReplicationRPC::StateDescribeReplication);
@@ -167,16 +168,26 @@ class TDescribeReplicationRPC: public TRpcSchemeRequestActor<TDescribeReplicatio
167168
if (from.HasSrcStreamName()) {
168169
to.set_source_changefeed_name(from.GetSrcStreamName());
169170
}
171+
if (from.HasLagMilliSeconds()) {
172+
*to.mutable_stats()->mutable_lag() = google::protobuf::util::TimeUtil::MillisecondsToDuration(
173+
from.GetLagMilliSeconds());
174+
}
175+
if (from.HasInitialScanProgress()) {
176+
to.mutable_stats()->set_initial_scan_progress(from.GetInitialScanProgress());
177+
}
170178
}
171179

172180
static void ConvertState(NKikimrReplication::TReplicationState& from, Ydb::Replication::DescribeReplicationResult& to) {
173181
switch (from.GetStateCase()) {
174182
case NKikimrReplication::TReplicationState::kStandBy:
175183
to.mutable_running();
176184
if (from.GetStandBy().HasLagMilliSeconds()) {
177-
*to.mutable_running()->mutable_lag() = google::protobuf::util::TimeUtil::MillisecondsToDuration(
185+
*to.mutable_running()->mutable_stats()->mutable_lag() = google::protobuf::util::TimeUtil::MillisecondsToDuration(
178186
from.GetStandBy().GetLagMilliSeconds());
179187
}
188+
if (from.GetStandBy().HasInitialScanProgress()) {
189+
to.mutable_running()->mutable_stats()->set_initial_scan_progress(from.GetStandBy().GetInitialScanProgress());
190+
}
180191
break;
181192
case NKikimrReplication::TReplicationState::kError:
182193
*to.mutable_error()->mutable_issues() = std::move(*from.MutableError()->MutableIssues());

ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5485,7 +5485,10 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
54855485
}
54865486

54875487
Y_UNIT_TEST(CreateAsyncReplicationWithSecret) {
5488+
using namespace NReplication;
5489+
54885490
TKikimrRunner kikimr("root@builtin");
5491+
auto repl = TReplicationClient(kikimr.GetDriver(), TCommonClientSettings().Database("/Root"));
54895492
auto db = kikimr.GetTableClient();
54905493
auto session = db.CreateSession().GetValueSync().GetSession();
54915494

@@ -5529,6 +5532,34 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
55295532

55305533
Sleep(TDuration::Seconds(1));
55315534
}
5535+
5536+
while (true) {
5537+
auto settings = TDescribeReplicationSettings().IncludeStats(true);
5538+
const auto result = repl.DescribeReplication("/Root/replication", settings).ExtractValueSync();
5539+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
5540+
5541+
const auto& desc = result.GetReplicationDescription();
5542+
UNIT_ASSERT_VALUES_EQUAL(desc.GetState(), TReplicationDescription::EState::Running);
5543+
5544+
const auto& total = desc.GetRunningState().GetStats();
5545+
if (!total.GetInitialScanProgress() || *total.GetInitialScanProgress() < 100) {
5546+
Sleep(TDuration::Seconds(1));
5547+
continue;
5548+
}
5549+
5550+
UNIT_ASSERT(total.GetInitialScanProgress());
5551+
UNIT_ASSERT_DOUBLES_EQUAL(*total.GetInitialScanProgress(), 100.0, 0.01);
5552+
5553+
const auto& items = desc.GetItems();
5554+
UNIT_ASSERT_VALUES_EQUAL(items.size(), 1);
5555+
const auto& item = items.at(0).Stats;
5556+
5557+
UNIT_ASSERT(item.GetInitialScanProgress());
5558+
UNIT_ASSERT_DOUBLES_EQUAL(*item.GetInitialScanProgress(), *total.GetInitialScanProgress(), 0.01);
5559+
5560+
// TODO: check lag too
5561+
break;
5562+
}
55325563
}
55335564

55345565
Y_UNIT_TEST(AlterAsyncReplication) {

ydb/core/protos/replication.proto

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,14 @@ message TReplicationConfig {
3333

3434
message TTargetSpecific {
3535
// One replication target. Fields under "in/out" are marked as both input and
// output; fields under "out" are marked as output only.
message TTarget {
36+
// in/out
3637
optional string SrcPath = 1;
3738
optional string DstPath = 2;
3839
optional string SrcStreamName = 3;
40+
// out
3941
optional uint64 Id = 4;
42+
optional uint32 LagMilliSeconds = 5;
43+
optional float InitialScanProgress = 6; // percentage
4044
}
4145

4246
repeated TTarget Targets = 1;
@@ -59,6 +63,7 @@ message TReplicationConfig {
5963
message TReplicationState {
6064
// Stats attached to the StandBy replication state.
message TStandBy {
6165
optional uint32 LagMilliSeconds = 1;
66+
optional float InitialScanProgress = 2; // percentage
6267
}
6368

6469
message TPaused {
@@ -147,6 +152,7 @@ message TEvDropReplicationResult {
147152

148153
// Describe-replication request record.
message TEvDescribeReplication {
149154
optional NKikimrProto.TPathID PathId = 1;
155+
// Filled by the gRPC describe handler from the public request's include_stats flag.
optional bool IncludeStats = 2;
150156
}
151157

152158
message TEvDescribeReplicationResult {

ydb/core/tx/replication/controller/controller.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ STFUNC(TController::StateWork) {
5959
HFunc(TEvPrivate::TEvUpdateTenantNodes, Handle);
6060
HFunc(TEvPrivate::TEvProcessQueues, Handle);
6161
HFunc(TEvPrivate::TEvRemoveWorker, Handle);
62+
HFunc(TEvPrivate::TEvDescribeTargetsResult, Handle);
6263
HFunc(TEvDiscovery::TEvDiscoveryData, Handle);
6364
HFunc(TEvDiscovery::TEvError, Handle);
6465
HFunc(TEvService::TEvStatus, Handle);
@@ -132,6 +133,11 @@ void TController::Handle(TEvController::TEvDescribeReplication::TPtr& ev, const
132133
RunTxDescribeReplication(ev, ctx);
133134
}
134135

136+
// Handler for TEvPrivate::TEvDescribeTargetsResult: logs the event's string
// form via CLOG_T and forwards it to the RunTxDescribeReplication overload
// that accepts this event type.
void TController::Handle(TEvPrivate::TEvDescribeTargetsResult::TPtr& ev, const TActorContext& ctx) {
137+
CLOG_T(ctx, "Handle " << ev->Get()->ToString());
138+
RunTxDescribeReplication(ev, ctx);
139+
}
140+
135141
void TController::Handle(TEvPrivate::TEvDiscoveryTargetsResult::TPtr& ev, const TActorContext& ctx) {
136142
CLOG_T(ctx, "Handle " << ev->Get()->ToString());
137143
RunTxDiscoveryTargetsResult(ev, ctx);

ydb/core/tx/replication/controller/controller_impl.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ class TController
8282
void Handle(TEvPrivate::TEvUpdateTenantNodes::TPtr& ev, const TActorContext& ctx);
8383
void Handle(TEvPrivate::TEvProcessQueues::TPtr& ev, const TActorContext& ctx);
8484
void Handle(TEvPrivate::TEvRemoveWorker::TPtr& ev, const TActorContext& ctx);
85+
void Handle(TEvPrivate::TEvDescribeTargetsResult::TPtr& ev, const TActorContext& ctx);
8586
void Handle(TEvDiscovery::TEvDiscoveryData::TPtr& ev, const TActorContext& ctx);
8687
void Handle(TEvDiscovery::TEvError::TPtr& ev, const TActorContext& ctx);
8788
void Handle(TEvService::TEvStatus::TPtr& ev, const TActorContext& ctx);
@@ -128,6 +129,7 @@ class TController
128129
void RunTxDropReplication(TEvController::TEvDropReplication::TPtr& ev, const TActorContext& ctx);
129130
void RunTxDropReplication(TEvPrivate::TEvDropReplication::TPtr& ev, const TActorContext& ctx);
130131
void RunTxDescribeReplication(TEvController::TEvDescribeReplication::TPtr& ev, const TActorContext& ctx);
132+
void RunTxDescribeReplication(TEvPrivate::TEvDescribeTargetsResult::TPtr& ev, const TActorContext& ctx);
131133
void RunTxDiscoveryTargetsResult(TEvPrivate::TEvDiscoveryTargetsResult::TPtr& ev, const TActorContext& ctx);
132134
void RunTxAssignStreamName(TEvPrivate::TEvAssignStreamName::TPtr& ev, const TActorContext& ctx);
133135
void RunTxCreateStreamResult(TEvPrivate::TEvCreateStreamResult::TPtr& ev, const TActorContext& ctx);

ydb/core/tx/replication/controller/private_events.cpp

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,19 @@ TString TEvPrivate::TEvRemoveWorker::ToString() const {
163163
<< " }";
164164
}
165165

166+
// Builds the event: copies `sender` and `rid` into Sender / ReplicationId and
// moves `result` into Result (the caller's map is left in a moved-from state).
TEvPrivate::TEvDescribeTargetsResult::TEvDescribeTargetsResult(const TActorId& sender, ui64 rid, TResult&& result)
167+
: Sender(sender)
168+
, ReplicationId(rid)
169+
, Result(std::move(result))
170+
{
171+
}
172+
173+
// Renders the event as "<header> { ReplicationId: <id> }".
// Only ReplicationId is printed; Sender and Result are not included.
TString TEvPrivate::TEvDescribeTargetsResult::ToString() const {
174+
return TStringBuilder() << ToStringHeader() << " {"
175+
<< " ReplicationId: " << ReplicationId
176+
<< " }";
177+
}
178+
166179
}
167180

168181
Y_DECLARE_OUT_SPEC(, NKikimr::NReplication::NController::TEvPrivate::TEvDiscoveryTargetsResult::TAddEntry, stream, value) {

ydb/core/tx/replication/controller/private_events.h

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,18 @@
11
#pragma once
22

33
#include <ydb/public/sdk/cpp/client/ydb_scheme/scheme.h>
4+
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
45

56
#include <ydb/core/base/defs.h>
67
#include <ydb/core/base/events.h>
78
#include <ydb/core/scheme/scheme_pathid.h>
89
#include <ydb/core/protos/flat_tx_scheme.pb.h>
910
#include <ydb/core/tx/replication/common/worker_id.h>
1011

12+
#include <util/generic/hash.h>
13+
14+
#include <optional>
15+
1116
namespace NKikimr::NReplication::NController {
1217

1318
struct TEvPrivate {
@@ -25,6 +30,7 @@ struct TEvPrivate {
2530
EvResolveSecretResult,
2631
EvAlterDstResult,
2732
EvRemoveWorker,
33+
EvDescribeTargetsResult,
2834

2935
EvEnd,
3036
};
@@ -191,6 +197,17 @@ struct TEvPrivate {
191197
TString ToString() const override;
192198
};
193199

200+
// Local event (type id EvDescribeTargetsResult) carrying a map of optional
// table-describe results for one replication.
struct TEvDescribeTargetsResult: public TEventLocal<TEvDescribeTargetsResult, EvDescribeTargetsResult> {
201+
// An empty optional means no describe result is stored for that key.
// NOTE(review): the ui64 key is presumably the target id — confirm against the producer.
using TResult = THashMap<ui64, std::optional<NYdb::NTable::TDescribeTableResult>>;
202+
203+
// NOTE(review): presumably the actor that issued the original describe request — confirm.
const TActorId Sender;
204+
const ui64 ReplicationId;
205+
TResult Result;
206+
207+
// `result` is taken by rvalue reference and moved into Result.
explicit TEvDescribeTargetsResult(const TActorId& sender, ui64 rid, TResult&& result);
208+
TString ToString() const override;
209+
};
210+
194211
}; // TEvPrivate
195212

196213
}

ydb/core/tx/replication/controller/replication.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ class TReplication: public TSimpleRefCount<TReplication> {
7979
virtual void AddWorker(ui64 id) = 0;
8080
virtual void RemoveWorker(ui64 id) = 0;
8181
virtual void UpdateLag(ui64 workerId, TDuration lag) = 0;
82+
virtual const TMaybe<TDuration> GetLag() const = 0;
8283

8384
virtual void Progress(const TActorContext& ctx) = 0;
8485
virtual void Shutdown(const TActorContext& ctx) = 0;

ydb/core/tx/replication/controller/target_base.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,10 +117,14 @@ void TTargetBase::UpdateLag(ui64 workerId, TDuration lag) {
117117
}
118118

119119
if (TLagProvider::UpdateLag(it->second, workerId, lag)) {
120-
Replication->UpdateLag(GetId(), GetLag().GetRef());
120+
Replication->UpdateLag(GetId(), TLagProvider::GetLag().GetRef());
121121
}
122122
}
123123

124+
// Returns the value of TLagProvider::GetLag() unchanged.
// The call is explicitly qualified so it resolves to the TLagProvider
// implementation rather than recursing into this method.
// NOTE(review): the top-level `const` on the by-value return type adds nothing
// for callers and prevents moving from the result; dropping it would require
// changing the matching declarations as well.
const TMaybe<TDuration> TTargetBase::GetLag() const {
125+
return TLagProvider::GetLag();
126+
}
127+
124128
void TTargetBase::Progress(const TActorContext& ctx) {
125129
switch (DstState) {
126130
case EDstState::Creating:

ydb/core/tx/replication/controller/target_base.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ class TTargetBase
5050
void AddWorker(ui64 id) override;
5151
void RemoveWorker(ui64 id) override;
5252
void UpdateLag(ui64 workerId, TDuration lag) override;
53+
const TMaybe<TDuration> GetLag() const override;
5354

5455
void Progress(const TActorContext& ctx) override;
5556
void Shutdown(const TActorContext& ctx) override;

0 commit comments

Comments
 (0)