Skip to content

Commit 7644097

Browse files
authored
Consistency option (core part) (#11970)
1 parent a328b30 commit 7644097

File tree

11 files changed

+340
-46
lines changed

11 files changed

+340
-46
lines changed

ydb/core/protos/replication.proto

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,19 @@ message TReplicationConfig {
6969
TTargetSpecific Specific = 5;
7070
}
7171

72-
optional bool InitialSync = 6;
72+
reserved 6; // InitialSync
73+
74+
message TWeakConsistency {
75+
}
76+
77+
message TStrongConsistency {
78+
optional uint64 CommitIntervalMilliSeconds = 1;
79+
}
80+
81+
oneof Consistency {
82+
TWeakConsistency WeakConsistency = 7;
83+
TStrongConsistency StrongConsistency = 8;
84+
}
7385
}
7486

7587
message TReplicationState {

ydb/core/tx/replication/controller/dst_creator.cpp

Lines changed: 69 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#include <ydb/core/base/tablet_pipecache.h>
88
#include <ydb/core/cms/console/configs_dispatcher.h>
99
#include <ydb/core/protos/console_config.pb.h>
10+
#include <ydb/core/protos/replication.pb.h>
1011
#include <ydb/core/protos/schemeshard/operations.pb.h>
1112
#include <ydb/core/tx/replication/ydb_proxy/ydb_proxy.h>
1213
#include <ydb/core/tx/scheme_board/events.h>
@@ -226,10 +227,8 @@ class TDstCreator: public TActorBootstrapped<TDstCreator> {
226227
AllocateTxId();
227228
}
228229

229-
static void FillReplicationConfig(NKikimrSchemeOp::TTableReplicationConfig& replicationConfig) {
230-
// TODO: support other modes
231-
replicationConfig.SetMode(NKikimrSchemeOp::TTableReplicationConfig::REPLICATION_MODE_READ_ONLY);
232-
replicationConfig.SetConsistency(NKikimrSchemeOp::TTableReplicationConfig::CONSISTENCY_WEAK);
230+
void FillReplicationConfig(NKikimrSchemeOp::TTableReplicationConfig& replicationConfig) const {
231+
NController::FillReplicationConfig(replicationConfig, Mode, Consistency);
233232
}
234233

235234
void AllocateTxId() {
@@ -375,22 +374,7 @@ class TDstCreator: public TActorBootstrapped<TDstCreator> {
375374
return false;
376375
}
377376

378-
const auto& replicationConfig = got.GetReplicationConfig();
379-
380-
switch (replicationConfig.GetMode()) {
381-
case NKikimrSchemeOp::TTableReplicationConfig::REPLICATION_MODE_READ_ONLY:
382-
break;
383-
default:
384-
error = "Unsupported replication mode";
385-
return false;
386-
}
387-
388-
switch (replicationConfig.GetConsistency()) {
389-
case NKikimrSchemeOp::TTableReplicationConfig::CONSISTENCY_WEAK:
390-
break;
391-
default:
392-
error = TStringBuilder() << "Unsupported replication consistency"
393-
<< ": " << static_cast<int>(replicationConfig.GetConsistency());
377+
if (!CheckReplicationConfig(got.GetReplicationConfig(), Mode, Consistency, error)) {
394378
return false;
395379
}
396380

@@ -623,7 +607,9 @@ class TDstCreator: public TActorBootstrapped<TDstCreator> {
623607
ui64 tid,
624608
TReplication::ETargetKind kind,
625609
const TString& srcPath,
626-
const TString& dstPath)
610+
const TString& dstPath,
611+
EReplicationMode mode,
612+
EReplicaConsistency consistency)
627613
: Parent(parent)
628614
, SchemeShardId(schemeShardId)
629615
, YdbProxy(proxy)
@@ -633,6 +619,8 @@ class TDstCreator: public TActorBootstrapped<TDstCreator> {
633619
, Kind(kind)
634620
, SrcPath(srcPath)
635621
, DstPath(dstPath)
622+
, Mode(mode)
623+
, Consistency(consistency)
636624
, LogPrefix("DstCreator", ReplicationId, TargetId)
637625
{
638626
}
@@ -665,6 +653,8 @@ class TDstCreator: public TActorBootstrapped<TDstCreator> {
665653
const TReplication::ETargetKind Kind;
666654
const TString SrcPath;
667655
const TString DstPath;
656+
const EReplicationMode Mode;
657+
const EReplicaConsistency Consistency;
668658
const TActorLogPrefix LogPrefix;
669659

670660
TPathId DomainKey;
@@ -680,17 +670,72 @@ class TDstCreator: public TActorBootstrapped<TDstCreator> {
680670

681671
}; // TDstCreator
682672

673+
static NKikimrSchemeOp::TTableReplicationConfig::EConsistency ConvertConsistency(EReplicaConsistency value) {
674+
switch (value) {
675+
case EReplicaConsistency::Weak:
676+
return NKikimrSchemeOp::TTableReplicationConfig::CONSISTENCY_WEAK;
677+
case EReplicaConsistency::Strong:
678+
return NKikimrSchemeOp::TTableReplicationConfig::CONSISTENCY_STRONG;
679+
}
680+
}
681+
682+
static NKikimrSchemeOp::TTableReplicationConfig::EReplicationMode ConvertMode(EReplicationMode value) {
683+
switch (value) {
684+
case EReplicationMode::ReadOnly:
685+
return NKikimrSchemeOp::TTableReplicationConfig::REPLICATION_MODE_READ_ONLY;
686+
}
687+
}
688+
689+
void FillReplicationConfig(
690+
NKikimrSchemeOp::TTableReplicationConfig& out,
691+
EReplicationMode mode,
692+
EReplicaConsistency consistency
693+
) {
694+
out.SetMode(ConvertMode(mode));
695+
out.SetConsistency(ConvertConsistency(consistency));
696+
}
697+
698+
bool CheckReplicationConfig(
699+
const NKikimrSchemeOp::TTableReplicationConfig& in,
700+
EReplicationMode mode,
701+
EReplicaConsistency consistency,
702+
TString& error
703+
) {
704+
if (in.GetMode() != ConvertMode(mode)) {
705+
error = TStringBuilder() << "Replication mode mismatch"
706+
<< ": expected: " << ConvertMode(mode)
707+
<< ", got: " << static_cast<int>(in.GetMode());
708+
return false;
709+
}
710+
711+
if (in.GetConsistency() != ConvertConsistency(consistency)) {
712+
error = TStringBuilder() << "Replication consistency mismatch"
713+
<< ": expected: " << ConvertConsistency(consistency)
714+
<< ", got: " << static_cast<int>(in.GetConsistency());
715+
return false;
716+
}
717+
718+
return true;
719+
}
720+
683721
IActor* CreateDstCreator(TReplication* replication, ui64 targetId, const TActorContext& ctx) {
684722
const auto* target = replication->FindTarget(targetId);
685723
Y_ABORT_UNLESS(target);
724+
725+
const auto consistency = replication->GetConfig().HasStrongConsistency()
726+
? EReplicaConsistency::Strong
727+
: EReplicaConsistency::Weak;
728+
686729
return CreateDstCreator(ctx.SelfID, replication->GetSchemeShardId(), replication->GetYdbProxy(), replication->GetPathId(),
687-
replication->GetId(), target->GetId(), target->GetKind(), target->GetSrcPath(), target->GetDstPath());
730+
replication->GetId(), target->GetId(), target->GetKind(), target->GetSrcPath(), target->GetDstPath(),
731+
EReplicationMode::ReadOnly, consistency);
688732
}
689733

690734
IActor* CreateDstCreator(const TActorId& parent, ui64 schemeShardId, const TActorId& proxy, const TPathId& pathId,
691-
ui64 rid, ui64 tid, TReplication::ETargetKind kind, const TString& srcPath, const TString& dstPath)
735+
ui64 rid, ui64 tid, TReplication::ETargetKind kind, const TString& srcPath, const TString& dstPath,
736+
EReplicationMode mode, EReplicaConsistency consistency)
692737
{
693-
return new TDstCreator(parent, schemeShardId, proxy, pathId, rid, tid, kind, srcPath, dstPath);
738+
return new TDstCreator(parent, schemeShardId, proxy, pathId, rid, tid, kind, srcPath, dstPath, mode, consistency);
694739
}
695740

696741
}

ydb/core/tx/replication/controller/dst_creator.h

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,35 @@
22

33
#include "replication.h"
44

5+
namespace NKikimrSchemeOp {
6+
class TTableReplicationConfig;
7+
}
8+
59
namespace NKikimr::NReplication::NController {
610

11+
enum class EReplicationMode {
12+
ReadOnly,
13+
};
14+
15+
enum class EReplicaConsistency {
16+
Weak,
17+
Strong,
18+
};
19+
20+
void FillReplicationConfig(
21+
NKikimrSchemeOp::TTableReplicationConfig& out,
22+
EReplicationMode mode,
23+
EReplicaConsistency consistency);
24+
bool CheckReplicationConfig(
25+
const NKikimrSchemeOp::TTableReplicationConfig& in,
26+
EReplicationMode mode,
27+
EReplicaConsistency consistency,
28+
TString& error);
29+
730
IActor* CreateDstCreator(TReplication* replication, ui64 targetId, const TActorContext& ctx);
831
IActor* CreateDstCreator(const TActorId& parent, ui64 schemeShardId, const TActorId& proxy, const TPathId& pathId,
9-
ui64 rid, ui64 tid, TReplication::ETargetKind kind, const TString& srcPath, const TString& dstPath);
32+
ui64 rid, ui64 tid, TReplication::ETargetKind kind, const TString& srcPath, const TString& dstPath,
33+
EReplicationMode mode = EReplicationMode::ReadOnly,
34+
EReplicaConsistency consistency = EReplicaConsistency::Weak);
1035

1136
}

ydb/core/tx/replication/controller/dst_creator_ut.cpp

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,12 @@ namespace NKikimr::NReplication::NController {
1313
Y_UNIT_TEST_SUITE(DstCreator) {
1414
using namespace NTestHelpers;
1515

16-
void CheckTableReplica(const TTestTableDescription& tableDesc, const NKikimrSchemeOp::TTableDescription& replicatedDesc) {
16+
void CheckTableReplica(
17+
const TTestTableDescription& tableDesc,
18+
const NKikimrSchemeOp::TTableDescription& replicatedDesc,
19+
EReplicationMode mode = EReplicationMode::ReadOnly,
20+
EReplicaConsistency consistency = EReplicaConsistency::Weak
21+
) {
1722
UNIT_ASSERT_VALUES_EQUAL(replicatedDesc.KeyColumnNamesSize(), tableDesc.KeyColumns.size());
1823
for (ui32 i = 0; i < replicatedDesc.KeyColumnNamesSize(); ++i) {
1924
UNIT_ASSERT_VALUES_EQUAL(replicatedDesc.GetKeyColumnNames(i), tableDesc.KeyColumns[i]);
@@ -28,12 +33,15 @@ Y_UNIT_TEST_SUITE(DstCreator) {
2833
UNIT_ASSERT(FindIfPtr(tableDesc.Columns, pred));
2934
}
3035

31-
const auto& replCfg = replicatedDesc.GetReplicationConfig();
32-
UNIT_ASSERT_VALUES_EQUAL(replCfg.GetMode(), NKikimrSchemeOp::TTableReplicationConfig::REPLICATION_MODE_READ_ONLY);
33-
UNIT_ASSERT_VALUES_EQUAL(replCfg.GetConsistency(), NKikimrSchemeOp::TTableReplicationConfig::CONSISTENCY_WEAK);
36+
TString error;
37+
UNIT_ASSERT_C(CheckReplicationConfig(replicatedDesc.GetReplicationConfig(), mode, consistency, error), error);
3438
}
3539

36-
void Basic(const TString& replicatedPath) {
40+
void Basic(
41+
const TString& replicatedPath,
42+
EReplicationMode mode = EReplicationMode::ReadOnly,
43+
EReplicaConsistency consistency = EReplicaConsistency::Weak
44+
) {
3745
TEnv env;
3846
env.GetRuntime().SetLogPriority(NKikimrServices::REPLICATION_CONTROLLER, NLog::PRI_TRACE);
3947

@@ -50,7 +58,7 @@ Y_UNIT_TEST_SUITE(DstCreator) {
5058
env.CreateTable("/Root", *MakeTableDescription(tableDesc));
5159
env.GetRuntime().Register(CreateDstCreator(
5260
env.GetSender(), env.GetSchemeshardId("/Root/Table"), env.GetYdbProxy(), env.GetPathId("/Root"),
53-
1 /* rid */, 1 /* tid */, TReplication::ETargetKind::Table, "/Root/Table", replicatedPath
61+
1 /* rid */, 1 /* tid */, TReplication::ETargetKind::Table, "/Root/Table", replicatedPath, mode, consistency
5462
));
5563

5664
auto ev = env.GetRuntime().GrabEdgeEvent<TEvPrivate::TEvCreateDstResult>(env.GetSender());
@@ -59,7 +67,7 @@ Y_UNIT_TEST_SUITE(DstCreator) {
5967
auto desc = env.GetDescription(replicatedPath);
6068
const auto& replicatedDesc = desc.GetPathDescription().GetTable();
6169

62-
CheckTableReplica(tableDesc, replicatedDesc);
70+
CheckTableReplica(tableDesc, replicatedDesc, mode, consistency);
6371
}
6472

6573
Y_UNIT_TEST(Basic) {
@@ -70,6 +78,10 @@ Y_UNIT_TEST_SUITE(DstCreator) {
7078
Basic("/Root/Dir/Replicated");
7179
}
7280

81+
Y_UNIT_TEST(StrongConsistency) {
82+
Basic("/Root/Replicated", EReplicationMode::ReadOnly, EReplicaConsistency::Strong);
83+
}
84+
7385
void WithIndex(const TString& replicatedPath, NKikimrSchemeOp::EIndexType indexType) {
7486
TEnv env(TFeatureFlags().SetEnableChangefeedsOnIndexTables(true));
7587
env.GetRuntime().SetLogPriority(NKikimrServices::REPLICATION_CONTROLLER, NLog::PRI_TRACE);
@@ -370,15 +382,15 @@ Y_UNIT_TEST_SUITE(DstCreator) {
370382
});
371383
}
372384

373-
Y_UNIT_TEST(UnsupportedReplicationMode) {
385+
Y_UNIT_TEST(ReplicationModeMismatch) {
374386
auto changeMode = [](const TTestTableDescription& desc) {
375387
auto copy = desc;
376388
copy.ReplicationConfig->Mode = TTestTableDescription::TReplicationConfig::MODE_NONE;
377389
copy.ReplicationConfig->Consistency = TTestTableDescription::TReplicationConfig::CONSISTENCY_UNKNOWN;
378390
return copy;
379391
};
380392

381-
ExistingDst(NKikimrScheme::StatusSchemeError, "Unsupported replication mode", changeMode, TTestTableDescription{
393+
ExistingDst(NKikimrScheme::StatusSchemeError, "Replication mode mismatch", changeMode, TTestTableDescription{
382394
.Name = "Table",
383395
.KeyColumns = {"key"},
384396
.Columns = {
@@ -388,20 +400,24 @@ Y_UNIT_TEST_SUITE(DstCreator) {
388400
});
389401
}
390402

391-
Y_UNIT_TEST(UnsupportedReplicationConsistency) {
403+
Y_UNIT_TEST(ReplicationConsistencyMismatch) {
392404
auto changeConsistency = [](const TTestTableDescription& desc) {
393405
auto copy = desc;
394406
copy.ReplicationConfig->Consistency = TTestTableDescription::TReplicationConfig::CONSISTENCY_STRONG;
395407
return copy;
396408
};
397409

398-
ExistingDst(NKikimrScheme::StatusSchemeError, "Unsupported replication consistency", changeConsistency, TTestTableDescription{
410+
ExistingDst(NKikimrScheme::StatusSchemeError, "Replication consistency mismatch", changeConsistency, TTestTableDescription{
399411
.Name = "Table",
400412
.KeyColumns = {"key"},
401413
.Columns = {
402414
{.Name = "key", .Type = "Uint32"},
403415
{.Name = "value", .Type = "Utf8"},
404416
},
417+
.ReplicationConfig = TTestTableDescription::TReplicationConfig{
418+
.Mode = TTestTableDescription::TReplicationConfig::MODE_READ_ONLY,
419+
.Consistency = TTestTableDescription::TReplicationConfig::CONSISTENCY_WEAK,
420+
},
405421
});
406422
}
407423
}

ydb/core/tx/replication/controller/stream_creator.cpp

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,25 @@ namespace NKikimr::NReplication::NController {
1919

2020
class TStreamCreator: public TActorBootstrapped<TStreamCreator> {
2121
static NYdb::NTable::TChangefeedDescription MakeChangefeed(
22-
const TString& name, const TDuration& retentionPeriod, const NJson::TJsonMap& attrs)
22+
const TString& name,
23+
const TDuration& retentionPeriod,
24+
const std::optional<TDuration>& resolvedTimestamps,
25+
const NJson::TJsonMap& attrs)
2326
{
2427
using namespace NYdb::NTable;
25-
return TChangefeedDescription(name, EChangefeedMode::Updates, EChangefeedFormat::Json)
28+
29+
auto desc = TChangefeedDescription(name, EChangefeedMode::Updates, EChangefeedFormat::Json)
2630
.WithRetentionPeriod(retentionPeriod)
2731
.WithInitialScan()
2832
.AddAttribute("__async_replication", NJson::WriteJson(attrs, false));
33+
34+
if (resolvedTimestamps) {
35+
desc
36+
.WithVirtualTimestamps()
37+
.WithResolvedTimestamps(*resolvedTimestamps);
38+
}
39+
40+
return desc;
2941
}
3042

3143
void RequestPermission() {
@@ -161,17 +173,19 @@ class TStreamCreator: public TActorBootstrapped<TStreamCreator> {
161173
const TString& srcPath,
162174
const TString& dstPath,
163175
const TString& streamName,
164-
const TDuration& streamRetentionPeriod)
176+
const TDuration& retentionPeriod,
177+
const std::optional<TDuration>& resolvedTimestamps,
178+
bool supportsTopicAutopartitioning)
165179
: Parent(parent)
166180
, YdbProxy(proxy)
167181
, ReplicationId(rid)
168182
, TargetId(tid)
169183
, Kind(kind)
170184
, SrcPath(srcPath)
171-
, Changefeed(MakeChangefeed(streamName, streamRetentionPeriod, NJson::TJsonMap{
185+
, Changefeed(MakeChangefeed(streamName, retentionPeriod, resolvedTimestamps, NJson::TJsonMap{
172186
{"path", dstPath},
173187
{"id", ToString(rid)},
174-
{"supports_topic_autopartitioning", AppData()->FeatureFlags.GetEnableTopicAutopartitioningForReplication()},
188+
{"supports_topic_autopartitioning", supportsTopicAutopartitioning},
175189
}))
176190
, LogPrefix("StreamCreator", ReplicationId, TargetId)
177191
{
@@ -202,17 +216,27 @@ class TStreamCreator: public TActorBootstrapped<TStreamCreator> {
202216
IActor* CreateStreamCreator(TReplication* replication, ui64 targetId, const TActorContext& ctx) {
203217
const auto* target = replication->FindTarget(targetId);
204218
Y_ABORT_UNLESS(target);
219+
220+
const auto& config = replication->GetConfig();
221+
const auto resolvedTimestamps = config.HasStrongConsistency()
222+
? std::make_optional(TDuration::MilliSeconds(config.GetStrongConsistency().GetCommitIntervalMilliSeconds()))
223+
: std::nullopt;
224+
205225
return CreateStreamCreator(ctx.SelfID, replication->GetYdbProxy(),
206226
replication->GetId(), target->GetId(), target->GetKind(),
207227
target->GetSrcPath(), target->GetDstPath(), target->GetStreamName(),
208-
TDuration::Seconds(AppData()->ReplicationConfig.GetRetentionPeriodSeconds()));
228+
TDuration::Seconds(AppData()->ReplicationConfig.GetRetentionPeriodSeconds()), resolvedTimestamps,
229+
AppData()->FeatureFlags.GetEnableTopicAutopartitioningForReplication());
209230
}
210231

211232
IActor* CreateStreamCreator(const TActorId& parent, const TActorId& proxy, ui64 rid, ui64 tid,
212233
TReplication::ETargetKind kind, const TString& srcPath, const TString& dstPath,
213-
const TString& streamName, const TDuration& streamRetentionPeriod)
234+
const TString& streamName, const TDuration& retentionPeriod,
235+
const std::optional<TDuration>& resolvedTimestamps,
236+
bool supportsTopicAutopartitioning)
214237
{
215-
return new TStreamCreator(parent, proxy, rid, tid, kind, srcPath, dstPath, streamName, streamRetentionPeriod);
238+
return new TStreamCreator(parent, proxy, rid, tid, kind, srcPath, dstPath,
239+
streamName, retentionPeriod, resolvedTimestamps, supportsTopicAutopartitioning);
216240
}
217241

218242
}

0 commit comments

Comments
 (0)