Skip to content

Commit 7e40ab5

Browse files
nshestakovblinkov
authored andcommitted
Alter transfer from a topic to a table (#15024)
1 parent ba93767 commit 7e40ab5

15 files changed

+205
-41
lines changed

ydb/core/tx/replication/controller/dst_alterer.cpp

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,13 @@ class TDstAlterer: public TActorBootstrapped<TDstAlterer> {
4343
switch (Kind) {
4444
case TReplication::ETargetKind::Table:
4545
case TReplication::ETargetKind::IndexTable:
46-
case TReplication::ETargetKind::Transfer:
4746
tx.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterTable);
4847
DstPathId.ToProto(tx.MutableAlterTable()->MutablePathId());
4948
tx.MutableAlterTable()->MutableReplicationConfig()->SetMode(
5049
NKikimrSchemeOp::TTableReplicationConfig::REPLICATION_MODE_NONE);
5150
break;
51+
case TReplication::ETargetKind::Transfer:
52+
break;
5253
}
5354

5455
Send(PipeCache, new TEvPipeCache::TEvForward(ev.Release(), SchemeShardId, true));
@@ -153,7 +154,13 @@ class TDstAlterer: public TActorBootstrapped<TDstAlterer> {
153154
if (!DstPathId) {
154155
Success();
155156
} else {
156-
AllocateTxId();
157+
switch (Kind) {
158+
case TReplication::ETargetKind::Table:
159+
case TReplication::ETargetKind::IndexTable:
160+
return AllocateTxId();
161+
case TReplication::ETargetKind::Transfer:
162+
return Success();
163+
}
157164
}
158165
}
159166

ydb/core/tx/replication/controller/event_util.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ THolder<TEvService::TEvRunWorker> MakeRunWorkerEv(
5151
break;
5252
}
5353
case TReplication::ETargetKind::Transfer: {
54-
auto p = std::dynamic_pointer_cast<TTargetTransfer::TTransferConfig>(config);
54+
auto p = std::dynamic_pointer_cast<const TTargetTransfer::TTransferConfig>(config);
5555
auto& writerSettings = *record.MutableCommand()->MutableTransferWriter();
5656
dstPathId.ToProto(writerSettings.MutablePathId());
5757
writerSettings.SetTransformLambda(p->GetTransformLambda());

ydb/core/tx/replication/controller/replication.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,7 @@ class TReplication::TImpl: public TLagProvider {
228228
NKikimrReplication::TReplicationConfig Config;
229229
EState State = EState::Ready;
230230
TString Issue;
231+
EState DesiredState = EState::Ready;
231232
ui64 NextTargetId = 1;
232233
THashMap<ui64, TTarget> Targets;
233234
THashSet<ui64> PendingAlterTargets;
@@ -329,6 +330,14 @@ const TString& TReplication::GetIssue() const {
329330
return Impl->Issue;
330331
}
331332

333+
TReplication::EState TReplication::GetDesiredState() const {
334+
return Impl->DesiredState;
335+
}
336+
337+
void TReplication::SetDesiredState(EState state) {
338+
Impl->DesiredState = state;
339+
}
340+
332341
void TReplication::SetNextTargetId(ui64 value) {
333342
Impl->NextTargetId = value;
334343
}

ydb/core/tx/replication/controller/replication.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ class TReplication: public TSimpleRefCount<TReplication> {
5656
class ITarget {
5757
public:
5858
struct IConfig {
59-
using TPtr = std::shared_ptr<IConfig>;
59+
using TPtr = std::shared_ptr<const IConfig>;
6060

6161
virtual ~IConfig() = default;
6262

@@ -98,6 +98,8 @@ class TReplication: public TSimpleRefCount<TReplication> {
9898
virtual void Progress(const TActorContext& ctx) = 0;
9999
virtual void Shutdown(const TActorContext& ctx) = 0;
100100

101+
virtual void UpdateConfig(const NKikimrReplication::TReplicationConfig&) = 0;
102+
101103
protected:
102104
virtual IActor* CreateWorkerRegistar(const TActorContext& ctx) const = 0;
103105
};
@@ -135,6 +137,8 @@ class TReplication: public TSimpleRefCount<TReplication> {
135137
const NKikimrReplication::TReplicationConfig& GetConfig() const;
136138
void SetState(EState state, TString issue = {});
137139
EState GetState() const;
140+
EState GetDesiredState() const;
141+
void SetDesiredState(EState state);
138142
const TString& GetIssue() const;
139143
const TMaybe<TDuration> GetLag() const;
140144

ydb/core/tx/replication/controller/schema.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,10 @@ struct TControllerSchema: NIceDb::Schema {
2525
struct State: Column<5, NScheme::NTypeIds::Uint8> { using Type = TReplication::EState; };
2626
struct Issue: Column<6, NScheme::NTypeIds::Utf8> {};
2727
struct NextTargetId: Column<7, NScheme::NTypeIds::Uint64> { static constexpr Type Default = 1; };
28+
struct DesiredState: Column<8, NScheme::NTypeIds::Uint8> { using Type = TReplication::EState; };
2829

2930
using TKey = TableKey<Id>;
30-
using TColumns = TableColumns<Id, PathOwnerId, PathLocalId, Config, State, Issue, NextTargetId>;
31+
using TColumns = TableColumns<Id, PathOwnerId, PathLocalId, Config, State, Issue, NextTargetId, DesiredState>;
3132
};
3233

3334
struct Targets: Table<3> {

ydb/core/tx/replication/controller/target_base.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,9 @@ void TTargetBase::SetDstState(const EDstState value) {
7373
return Replication->AddPendingAlterTarget(Id);
7474
case EDstState::Done:
7575
return Replication->RemovePendingAlterTarget(Id);
76+
case EDstState::Ready:
77+
PendingRemoveWorkers = false;
78+
break;
7679
default:
7780
break;
7881
}
@@ -196,4 +199,7 @@ void TTargetBase::Shutdown(const TActorContext& ctx) {
196199
}
197200
}
198201

202+
void TTargetBase::UpdateConfig(const NKikimrReplication::TReplicationConfig&) {
203+
}
204+
199205
}

ydb/core/tx/replication/controller/target_base.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,11 +71,12 @@ class TTargetBase
7171
void Progress(const TActorContext& ctx) override;
7272
void Shutdown(const TActorContext& ctx) override;
7373

74+
void UpdateConfig(const NKikimrReplication::TReplicationConfig&) override;
75+
7476
private:
7577
TReplication* const Replication;
7678
const ui64 Id;
7779
const ETargetKind Kind;
78-
const IConfig::TPtr Config;
7980

8081
EDstState DstState = EDstState::Creating;
8182
TPathId DstPathId;
@@ -90,6 +91,9 @@ class TTargetBase
9091
THashMap<ui64, TWorker> Workers;
9192
bool PendingRemoveWorkers = false;
9293

94+
protected:
95+
IConfig::TPtr Config;
96+
9397
}; // TTargetBase
9498

9599
}

ydb/core/tx/replication/controller/target_discoverer_ut.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ Y_UNIT_TEST_SUITE(TargetDiscoverer) {
124124
UNIT_ASSERT_VALUES_EQUAL(toAdd.at(0).Config->GetSrcPath(), "/Root/Topic");
125125
UNIT_ASSERT_VALUES_EQUAL(toAdd.at(0).Config->GetDstPath(), "/Root/Replicated/Table");
126126
UNIT_ASSERT_VALUES_EQUAL(toAdd.at(0).Kind, TReplication::ETargetKind::Transfer);
127-
auto p = std::dynamic_pointer_cast<TTargetTransfer::TTransferConfig>(toAdd.at(0).Config);
127+
auto p = std::dynamic_pointer_cast<const TTargetTransfer::TTransferConfig>(toAdd.at(0).Config);
128128
UNIT_ASSERT(p);
129129
UNIT_ASSERT_VALUES_EQUAL(p->GetTransformLambda(), "lambda body");
130130
}

ydb/core/tx/replication/controller/target_table.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,14 @@ TTargetTransfer::TTargetTransfer(TReplication* replication, ui64 id, const IConf
141141
{
142142
}
143143

144+
void TTargetTransfer::UpdateConfig(const NKikimrReplication::TReplicationConfig& cfg) {
145+
auto& t = cfg.GetTransferSpecific().GetTargets(0);
146+
Config = std::make_shared<TTargetTransfer::TTransferConfig>(
147+
GetConfig()->GetSrcPath(),
148+
GetConfig()->GetDstPath(),
149+
t.GetTransformLambda());
150+
}
151+
144152
TString TTargetTransfer::BuildStreamPath() const {
145153
return CanonizePath(GetSrcPath());
146154
}

ydb/core/tx/replication/controller/target_table.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ class TTargetTransfer: public TTargetTableBase {
6666
explicit TTargetTransfer(TReplication* replication,
6767
ui64 id, const IConfig::TPtr& config);
6868

69+
void UpdateConfig(const NKikimrReplication::TReplicationConfig&) override;
6970

7071
protected:
7172
TString BuildStreamPath() const override;

ydb/core/tx/replication/controller/tx_alter_dst_result.cpp

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ namespace NKikimr::NReplication::NController {
44

55
class TController::TTxAlterDstResult: public TTxBase {
66
TEvPrivate::TEvAlterDstResult::TPtr Ev;
7+
TReplication::TPtr Replication;
78

89
public:
910
explicit TTxAlterDstResult(TController* self, TEvPrivate::TEvAlterDstResult::TPtr& ev)
@@ -22,14 +23,14 @@ class TController::TTxAlterDstResult: public TTxBase {
2223
const auto rid = Ev->Get()->ReplicationId;
2324
const auto tid = Ev->Get()->TargetId;
2425

25-
auto replication = Self->Find(rid);
26-
if (!replication) {
26+
Replication = Self->Find(rid);
27+
if (!Replication) {
2728
CLOG_W(ctx, "Unknown replication"
2829
<< ": rid# " << rid);
2930
return true;
3031
}
3132

32-
auto* target = replication->FindTarget(tid);
33+
auto* target = Replication->FindTarget(tid);
3334
if (!target) {
3435
CLOG_W(ctx, "Unknown target"
3536
<< ": rid# " << rid
@@ -46,24 +47,28 @@ class TController::TTxAlterDstResult: public TTxBase {
4647
}
4748

4849
if (Ev->Get()->IsSuccess()) {
49-
target->SetDstState(TReplication::EDstState::Done);
50+
target->SetDstState(NextState(Replication->GetDesiredState()));
51+
target->UpdateConfig(Replication->GetConfig());
5052

5153
CLOG_N(ctx, "Target dst altered"
5254
<< ": rid# " << rid
5355
<< ", tid# " << tid);
5456

55-
if (replication->CheckAlterDone()) {
57+
if (Replication->CheckAlterDone()) {
5658
CLOG_N(ctx, "Replication altered"
5759
<< ": rid# " << rid);
58-
replication->SetState(TReplication::EState::Done);
60+
Replication->SetState(Replication->GetDesiredState());
61+
if (Replication->GetState() != TReplication::EState::Ready) {
62+
Replication.Reset();
63+
}
5964
}
6065
} else {
6166
target->SetDstState(TReplication::EDstState::Error);
6267
target->SetIssue(TStringBuilder() << "Alter dst error"
6368
<< ": " << NKikimrScheme::EStatus_Name(Ev->Get()->Status)
6469
<< ", " << Ev->Get()->Error);
6570

66-
replication->SetState(TReplication::EState::Error, TStringBuilder() << "Error in target #" << target->GetId()
71+
Replication->SetState(TReplication::EState::Error, TStringBuilder() << "Error in target #" << target->GetId()
6772
<< ": " << target->GetIssue());
6873

6974
CLOG_E(ctx, "Alter dst error"
@@ -75,8 +80,8 @@ class TController::TTxAlterDstResult: public TTxBase {
7580

7681
NIceDb::TNiceDb db(txc.DB);
7782
db.Table<Schema::Replications>().Key(rid).Update(
78-
NIceDb::TUpdate<Schema::Replications::State>(replication->GetState()),
79-
NIceDb::TUpdate<Schema::Replications::Issue>(replication->GetIssue())
83+
NIceDb::TUpdate<Schema::Replications::State>(Replication->GetState()),
84+
NIceDb::TUpdate<Schema::Replications::Issue>(Replication->GetIssue())
8085
);
8186
db.Table<Schema::Targets>().Key(rid, tid).Update(
8287
NIceDb::TUpdate<Schema::Targets::DstState>(target->GetDstState()),
@@ -86,8 +91,25 @@ class TController::TTxAlterDstResult: public TTxBase {
8691
return true;
8792
}
8893

94+
TReplication::EDstState NextState(TReplication::EState state) {
95+
switch (state) {
96+
case TReplication::EState::Done:
97+
return TReplication::EDstState::Done;
98+
case TReplication::EState::Ready:
99+
return TReplication::EDstState::Ready;
100+
case TReplication::EState::Error:
101+
return TReplication::EDstState::Error;
102+
case TReplication::EState::Removing:
103+
return TReplication::EDstState::Removing;
104+
}
105+
}
106+
89107
void Complete(const TActorContext& ctx) override {
90108
CLOG_D(ctx, "Complete");
109+
110+
if (Replication) {
111+
Replication->Progress(ctx);
112+
}
91113
}
92114

93115
}; // TTxAlterDstResult

ydb/core/tx/replication/controller/tx_alter_replication.cpp

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -37,27 +37,48 @@ class TController::TTxAlterReplication: public TTxBase {
3737
return true;
3838
}
3939

40+
bool alter = false;
41+
42+
const auto& oldConfig = Replication->GetConfig();
43+
const auto& newConfig = record.GetConfig();
44+
45+
if (oldConfig.HasTransferSpecific()) {
46+
auto& oldLambda = oldConfig.GetTransferSpecific().GetTargets(0).GetTransformLambda();
47+
auto& newLambda = newConfig.GetTransferSpecific().GetTargets(0).GetTransformLambda();
48+
49+
alter = oldLambda != newLambda;
50+
}
51+
52+
auto desiredState = Replication->GetState();
53+
if (record.HasSwitchState()) {
54+
switch (record.GetSwitchState().GetStateCase()) {
55+
case NKikimrReplication::TReplicationState::kDone:
56+
desiredState = TReplication::EState::Done;
57+
alter = true;
58+
break;
59+
default:
60+
Y_ABORT("Invalid state");
61+
}
62+
}
63+
64+
if (alter) {
65+
Replication->SetDesiredState(desiredState);
66+
}
67+
4068
Replication->SetConfig(std::move(*record.MutableConfig()));
4169
NIceDb::TNiceDb db(txc.DB);
4270
db.Table<Schema::Replications>().Key(Replication->GetId()).Update(
43-
NIceDb::TUpdate<Schema::Replications::Config>(record.GetConfig().SerializeAsString())
71+
NIceDb::TUpdate<Schema::Replications::Config>(record.GetConfig().SerializeAsString()),
72+
NIceDb::TUpdate<Schema::Replications::DesiredState>(desiredState)
4473
);
4574

46-
if (!record.HasSwitchState()) {
75+
if (!alter) {
4776
Result->Record.SetStatus(NKikimrReplication::TEvAlterReplicationResult::SUCCESS);
4877
return true;
4978
}
5079

51-
switch (record.GetSwitchState().GetStateCase()) {
52-
case NKikimrReplication::TReplicationState::kDone:
53-
break;
54-
default:
55-
Y_ABORT("Invalid state");
56-
}
57-
5880
Result->Record.SetStatus(NKikimrReplication::TEvAlterReplicationResult::SUCCESS);
5981

60-
bool alter = false;
6182
for (ui64 tid = 0; tid < Replication->GetNextTargetId(); ++tid) {
6283
auto* target = Replication->FindTarget(tid);
6384
if (!target) {

ydb/core/tx/replication/controller/tx_discovery_targets_result.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ class TController::TTxDiscoveryTargetsResult: public TTxBase {
4747
const auto tid = Replication->AddTarget(target.Kind, target.Config);
4848

4949
TString transformLambda;
50-
if (auto p = std::dynamic_pointer_cast<TTargetTransfer::TTransferConfig>(target.Config)) {
50+
if (auto p = std::dynamic_pointer_cast<const TTargetTransfer::TTransferConfig>(target.Config)) {
5151
transformLambda = p->GetTransformLambda();
5252
}
5353

ydb/core/tx/replication/controller/tx_init.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,12 @@ class TController::TTxInit: public TTxBase {
5353
const auto state = rowset.GetValue<Schema::Replications::State>();
5454
const auto issue = rowset.GetValue<Schema::Replications::Issue>();
5555
const auto nextTid = rowset.GetValue<Schema::Replications::NextTargetId>();
56+
const auto desiredState = rowset.GetValue<Schema::Replications::DesiredState>();
5657

5758
auto replication = Self->Add(rid, pathId, config);
5859
replication->SetState(state, issue);
5960
replication->SetNextTargetId(nextTid);
61+
replication->SetDesiredState(desiredState);
6062

6163
if (!rowset.Next()) {
6264
return false;

0 commit comments

Comments
 (0)