Skip to content

Commit acb72f2

Browse files
stanislav-shchetininKamil Khamitov
authored and
Kamil Khamitov
committed
Import changefeed's consumers from s3 (ydb-platform#14780)
1 parent 36cecdb commit acb72f2

9 files changed

+185
-8
lines changed

ydb/core/tx/schemeshard/schemeshard_import__create.cpp

Lines changed: 65 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -541,6 +541,21 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
541541
Send(Self->SelfId(), CreateChangefeedPropose(Self, txId, item));
542542
}
543543

544+
void CreateConsumers(TImportInfo::TPtr importInfo, ui32 itemIdx, TTxId txId) {
545+
Y_ABORT_UNLESS(itemIdx < importInfo->Items.size());
546+
auto& item = importInfo->Items.at(itemIdx);
547+
item.SubState = ESubState::Proposed;
548+
549+
LOG_I("TImport::TTxProgress: CreateConsumers propose"
550+
<< ": info# " << importInfo->ToString()
551+
<< ", item# " << item.ToString(itemIdx)
552+
<< ", txId# " << txId);
553+
554+
Y_ABORT_UNLESS(item.WaitTxId == InvalidTxId);
555+
556+
Send(Self->SelfId(), CreateConsumersPropose(Self, txId, item));
557+
}
558+
544559
void AllocateTxId(TImportInfo::TPtr importInfo, ui32 itemIdx) {
545560
Y_ABORT_UNLESS(itemIdx < importInfo->Items.size());
546561
auto& item = importInfo->Items.at(itemIdx);
@@ -622,6 +637,26 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
622637
return path->LastTxId;
623638
}
624639

640+
TTxId GetActiveCreateConsumerTxId(TImportInfo::TPtr importInfo, ui32 itemIdx) {
641+
Y_ABORT_UNLESS(itemIdx < importInfo->Items.size());
642+
const auto& item = importInfo->Items.at(itemIdx);
643+
644+
Y_ABORT_UNLESS(item.State == EState::CreateChangefeed);
645+
Y_ABORT_UNLESS(item.ChangefeedState == TImportInfo::TItem::EChangefeedState::CreateConsumers);
646+
Y_ABORT_UNLESS(item.StreamImplPathId);
647+
648+
if (!Self->PathsById.contains(item.StreamImplPathId)) {
649+
return InvalidTxId;
650+
}
651+
652+
auto path = Self->PathsById.at(item.StreamImplPathId);
653+
if (path->PathState != NKikimrSchemeOp::EPathStateAlter) {
654+
return InvalidTxId;
655+
}
656+
657+
return path->LastTxId;
658+
}
659+
625660
static TString MakeIndexBuildUid(TImportInfo::TPtr importInfo, ui32 itemIdx) {
626661
Y_ABORT_UNLESS(itemIdx < importInfo->Items.size());
627662
const auto& item = importInfo->Items.at(itemIdx);
@@ -816,10 +851,6 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
816851
TTxId txId = InvalidTxId;
817852

818853
switch (item.State) {
819-
case EState::CreateChangefeed:
820-
txId = GetActiveCreateChangefeedTxId(importInfo, itemIdx);
821-
break;
822-
823854
case EState::Transferring:
824855
if (!CancelTransferring(importInfo, itemIdx)) {
825856
txId = GetActiveRestoreTxId(importInfo, itemIdx);
@@ -1045,7 +1076,11 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
10451076
break;
10461077

10471078
case EState::CreateChangefeed:
1048-
CreateChangefeed(importInfo, i, txId);
1079+
if (item.ChangefeedState == TImportInfo::TItem::EChangefeedState::CreateChangefeed) {
1080+
CreateChangefeed(importInfo, i, txId);
1081+
} else {
1082+
CreateConsumers(importInfo, i, txId);
1083+
}
10491084
itemIdx = i;
10501085
break;
10511086

@@ -1109,11 +1144,30 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
11091144
} else if (item.State == EState::Transferring) {
11101145
txId = GetActiveRestoreTxId(importInfo, itemIdx);
11111146
} else if (item.State == EState::CreateChangefeed) {
1112-
txId = GetActiveCreateChangefeedTxId(importInfo, itemIdx);
1147+
if (item.ChangefeedState == TImportInfo::TItem::EChangefeedState::CreateChangefeed) {
1148+
txId = GetActiveCreateChangefeedTxId(importInfo, itemIdx);
1149+
} else {
1150+
txId = GetActiveCreateConsumerTxId(importInfo, itemIdx);
1151+
}
1152+
11131153
}
11141154
}
11151155

11161156
if (txId == InvalidTxId) {
1157+
1158+
if (record.GetStatus() == NKikimrScheme::StatusAlreadyExists && item.State == EState::CreateChangefeed) {
1159+
if (item.ChangefeedState == TImportInfo::TItem::EChangefeedState::CreateChangefeed) {
1160+
item.ChangefeedState = TImportInfo::TItem::EChangefeedState::CreateConsumers;
1161+
AllocateTxId(importInfo, itemIdx);
1162+
} else if (++item.NextChangefeedIdx < item.Changefeeds.GetChangefeeds().size()) {
1163+
item.ChangefeedState = TImportInfo::TItem::EChangefeedState::CreateChangefeed;
1164+
AllocateTxId(importInfo, itemIdx);
1165+
} else {
1166+
item.State = EState::Done;
1167+
}
1168+
return;
1169+
}
1170+
11171171
return CancelAndPersist(db, importInfo, itemIdx, record.GetReason(), "unhappy propose");
11181172
}
11191173

@@ -1290,7 +1344,11 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
12901344
break;
12911345

12921346
case EState::CreateChangefeed:
1293-
if (++item.NextChangefeedIdx < item.Changefeeds.GetChangefeeds().size()) {
1347+
if (item.ChangefeedState == TImportInfo::TItem::EChangefeedState::CreateChangefeed) {
1348+
item.ChangefeedState = TImportInfo::TItem::EChangefeedState::CreateConsumers;
1349+
AllocateTxId(importInfo, itemIdx);
1350+
} else if (++item.NextChangefeedIdx < item.Changefeeds.GetChangefeeds().size()) {
1351+
item.ChangefeedState = TImportInfo::TItem::EChangefeedState::CreateChangefeed;
12941352
AllocateTxId(importInfo, itemIdx);
12951353
} else {
12961354
item.State = EState::Done;

ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,5 +294,58 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateChangefeedPropose(
294294
return propose;
295295
}
296296

297+
THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateConsumersPropose(
298+
TSchemeShard* ss,
299+
TTxId txId,
300+
TImportInfo::TItem& item
301+
) {
302+
Y_ABORT_UNLESS(item.NextChangefeedIdx < item.Changefeeds.GetChangefeeds().size());
303+
304+
const auto& importChangefeedTopic = item.Changefeeds.GetChangefeeds()[item.NextChangefeedIdx];
305+
const auto& topic = importChangefeedTopic.GetTopic();
306+
307+
auto propose = MakeHolder<TEvSchemeShard::TEvModifySchemeTransaction>(ui64(txId), ss->TabletID());
308+
auto& record = propose->Record;
309+
auto& modifyScheme = *record.AddTransaction();
310+
modifyScheme.SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpAlterPersQueueGroup);
311+
auto& pqGroup = *modifyScheme.MutableAlterPersQueueGroup();
312+
313+
const TPath dstPath = TPath::Init(item.DstPathId, ss);
314+
const TString changefeedPath = dstPath.PathString() + "/" + importChangefeedTopic.GetChangefeed().name();
315+
modifyScheme.SetWorkingDir(changefeedPath);
316+
modifyScheme.SetInternal(true);
317+
318+
pqGroup.SetName("streamImpl");
319+
320+
NKikimrSchemeOp::TDescribeOptions opts;
321+
opts.SetReturnPartitioningInfo(false);
322+
opts.SetReturnPartitionConfig(true);
323+
opts.SetReturnBoundaries(true);
324+
opts.SetReturnIndexTableBoundaries(true);
325+
opts.SetShowPrivateTable(true);
326+
auto describeSchemeResult = DescribePath(ss, TlsActivationContext->AsActorContext(),changefeedPath + "/streamImpl", opts);
327+
328+
const auto& response = describeSchemeResult->GetRecord().GetPathDescription();
329+
item.StreamImplPathId = {response.GetSelf().GetSchemeshardId(), response.GetSelf().GetPathId()};
330+
pqGroup.CopyFrom(response.GetPersQueueGroup());
331+
332+
pqGroup.ClearTotalGroupCount();
333+
pqGroup.MutablePQTabletConfig()->ClearPartitionKeySchema();
334+
335+
auto* tabletConfig = pqGroup.MutablePQTabletConfig();
336+
const auto& pqConfig = AppData()->PQConfig;
337+
338+
for (const auto& consumer : topic.consumers()) {
339+
auto& addedConsumer = *tabletConfig->AddConsumers();
340+
auto consumerName = NPersQueue::ConvertNewConsumerName(consumer.name(), pqConfig);
341+
addedConsumer.SetName(consumerName);
342+
if (consumer.important()) {
343+
addedConsumer.SetImportant(true);
344+
}
345+
}
346+
347+
return propose;
348+
}
349+
297350
} // NSchemeShard
298351
} // NKikimr

ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,5 +52,11 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateChangefeedPropose(
5252
const TImportInfo::TItem& item
5353
);
5454

55+
THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateConsumersPropose(
56+
TSchemeShard* ss,
57+
TTxId txId,
58+
TImportInfo::TItem& item
59+
);
60+
5561
} // NSchemeShard
5662
} // NKikimr

ydb/core/tx/schemeshard/schemeshard_info_types.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2846,6 +2846,11 @@ struct TImportInfo: public TSimpleRefCount<TImportInfo> {
28462846
Subscribed,
28472847
};
28482848

2849+
enum class EChangefeedState: ui8 {
2850+
CreateChangefeed = 0,
2851+
CreateConsumers,
2852+
};
2853+
28492854
TString DstPathName;
28502855
TPathId DstPathId;
28512856
Ydb::Table::CreateTableRequest Scheme;
@@ -2857,13 +2862,15 @@ struct TImportInfo: public TSimpleRefCount<TImportInfo> {
28572862

28582863
EState State = EState::GetScheme;
28592864
ESubState SubState = ESubState::AllocateTxId;
2865+
EChangefeedState ChangefeedState = EChangefeedState::CreateChangefeed;
28602866
TTxId WaitTxId = InvalidTxId;
28612867
TActorId SchemeGetter;
28622868
TActorId SchemeQueryExecutor;
28632869
int NextIndexIdx = 0;
28642870
int NextChangefeedIdx = 0;
28652871
TString Issue;
28662872
int ViewCreationRetries = 0;
2873+
TPathId StreamImplPathId;
28672874

28682875
TItem() = default;
28692876

ydb/core/tx/schemeshard/schemeshard_path_describer.cpp

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1308,6 +1308,29 @@ THolder<TEvSchemeShard::TEvDescribeSchemeResultBuilder> DescribePath(
13081308
return DescribePath(self, ctx, pathId, options);
13091309
}
13101310

1311+
THolder<TEvSchemeShard::TEvDescribeSchemeResultBuilder> DescribePath(
1312+
TSchemeShard* self,
1313+
const TActorContext& ctx,
1314+
const TString& path,
1315+
const NKikimrSchemeOp::TDescribeOptions& opts
1316+
) {
1317+
NKikimrSchemeOp::TDescribePath params;
1318+
params.SetPath(path);
1319+
params.MutableOptions()->CopyFrom(opts);
1320+
1321+
return TPathDescriber(self, std::move(params)).Describe(ctx);
1322+
}
1323+
1324+
THolder<TEvSchemeShard::TEvDescribeSchemeResultBuilder> DescribePath(
1325+
TSchemeShard* self,
1326+
const TActorContext& ctx,
1327+
const TString& path
1328+
) {
1329+
NKikimrSchemeOp::TDescribeOptions options;
1330+
options.SetShowPrivateTable(true);
1331+
return DescribePath(self, ctx, path, options);
1332+
}
1333+
13111334
void TSchemeShard::DescribeTable(
13121335
const TTableInfo& tableInfo,
13131336
const NScheme::TTypeRegistry* typeRegistry,

ydb/core/tx/schemeshard/schemeshard_path_describer.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,5 +83,18 @@ THolder<TEvSchemeShard::TEvDescribeSchemeResultBuilder> DescribePath(
8383
TPathId pathId
8484
);
8585

86+
THolder<TEvSchemeShard::TEvDescribeSchemeResultBuilder> DescribePath(
87+
TSchemeShard* self,
88+
const TActorContext& ctx,
89+
const TString& path,
90+
const NKikimrSchemeOp::TDescribeOptions& opts
91+
);
92+
93+
THolder<TEvSchemeShard::TEvDescribeSchemeResultBuilder> DescribePath(
94+
TSchemeShard* self,
95+
const TActorContext& ctx,
96+
const TString& path
97+
);
98+
8699
} // NSchemeShard
87100
} // NKikimr

ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -968,6 +968,19 @@ TCheckFunc RetentionPeriod(const TDuration& value) {
968968
};
969969
}
970970

971+
TCheckFunc ConsumerExist(const TString& name) {
972+
return [=] (const NKikimrScheme::TEvDescribeSchemeResult& record) {
973+
bool isExist = false;
974+
for (const auto& consumer : record.GetPathDescription().GetPersQueueGroup().GetPQTabletConfig().GetConsumers()) {
975+
if (consumer.GetName() == name) {
976+
isExist = true;
977+
break;
978+
}
979+
}
980+
UNIT_ASSERT(isExist);
981+
};
982+
}
983+
971984
void NoChildren(const NKikimrScheme::TEvDescribeSchemeResult& record) {
972985
ChildrenCount(0)(record);
973986
}

ydb/core/tx/schemeshard/ut_helpers/ls_checks.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@ namespace NLs {
169169
TCheckFunc StreamAwsRegion(const TString& value);
170170
TCheckFunc StreamInitialScanProgress(ui32 total, ui32 completed);
171171
TCheckFunc RetentionPeriod(const TDuration& value);
172+
TCheckFunc ConsumerExist(const TString& name);
172173

173174
TCheckFunc HasBackupInFly(ui64 txId);
174175
void NoBackupInFly(const NKikimrScheme::TEvDescribeSchemeResult& record);

ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5067,7 +5067,10 @@ Y_UNIT_TEST_SUITE(TImportTests) {
50675067
{changefeedPath, GenerateTestData({EPathTypeCdcStream, changefeedDesc, std::move(attr)})},
50685068
[changefeedPath = TString(changefeedPath)](TTestBasicRuntime& runtime) {
50695069
TestDescribeResult(DescribePath(runtime, "/MyRoot/Table" + changefeedPath, false, false, true), {
5070-
NLs::PathExist
5070+
NLs::PathExist,
5071+
});
5072+
TestDescribeResult(DescribePath(runtime, "/MyRoot/Table" + changefeedPath + "/streamImpl", false, false, true), {
5073+
NLs::ConsumerExist("my_consumer")
50715074
});
50725075
}
50735076
};

0 commit comments

Comments
 (0)