Skip to content

Commit c797692

Browse files
nshestakovblinkov
authored andcommitted
Test for describe a transfer with errors (#15404)
1 parent df4eb3e commit c797692

File tree

2 files changed

+97
-30
lines changed

2 files changed

+97
-30
lines changed

ydb/core/tx/replication/service/transfer_writer.cpp

Lines changed: 19 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -402,7 +402,7 @@ class TTransferWriter
402402
switch (ev->GetTypeRewrite()) {
403403
hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
404404

405-
hFunc(TEvWorker::TEvHandshake, HoldHandle);
405+
hFunc(TEvWorker::TEvHandshake, Handle);
406406
hFunc(TEvWorker::TEvData, HoldHandle);
407407
sFunc(TEvents::TEvWakeup, GetTableScheme);
408408
sFunc(TEvents::TEvPoison, PassAway);
@@ -501,7 +501,7 @@ class TTransferWriter
501501
switch (ev->GetTypeRewrite()) {
502502
hFunc(NFq::TEvRowDispatcher::TEvPurecalcCompileResponse, Handle);
503503

504-
hFunc(TEvWorker::TEvHandshake, HoldHandle);
504+
hFunc(TEvWorker::TEvHandshake, Handle);
505505
hFunc(TEvWorker::TEvData, HoldHandle);
506506
//sFunc(TEvents::TEvWakeup, SendS3Request);
507507
sFunc(TEvents::TEvPoison, PassAway);
@@ -545,11 +545,6 @@ class TTransferWriter
545545
void StartWork() {
546546
Become(&TThis::StateWork);
547547

548-
if (PendingWorker) {
549-
ProcessWorker(*PendingWorker);
550-
PendingWorker.reset();
551-
}
552-
553548
if (PendingRecords) {
554549
ProcessData(PendingPartitionId, *PendingRecords);
555550
PendingRecords.reset();
@@ -565,21 +560,16 @@ class TTransferWriter
565560
}
566561
}
567562

568-
void HoldHandle(TEvWorker::TEvHandshake::TPtr& ev) {
569-
Y_ABORT_UNLESS(!PendingWorker);
570-
PendingWorker = ev->Sender;
571-
}
572-
573563
void Handle(TEvWorker::TEvHandshake::TPtr& ev) {
574-
ProcessWorker(ev->Sender);
575-
}
576-
577-
void ProcessWorker(const TActorId& worker) {
578-
Worker = worker;
564+
Worker = ev->Sender;
579565
LOG_D("Handshake"
580566
<< ": worker# " << Worker);
581567

582-
Send(Worker, new TEvWorker::TEvHandshake());
568+
if (ProcessingError) {
569+
Leave(ProcessingErrorStatus, *ProcessingError);
570+
} else {
571+
Send(Worker, new TEvWorker::TEvHandshake());
572+
}
583573
}
584574

585575
void HoldHandle(TEvWorker::TEvData::TPtr& ev) {
@@ -632,7 +622,7 @@ class TTransferWriter
632622
STFUNC(StateWrite) {
633623
switch (ev->GetTypeRewrite()) {
634624
hFunc(TEvents::TEvCompleted, Handle);
635-
hFunc(TEvWorker::TEvHandshake, HoldHandle);
625+
hFunc(TEvWorker::TEvHandshake, Handle);
636626
hFunc(TEvWorker::TEvData, HoldHandle);
637627

638628
sFunc(TEvents::TEvPoison, PassAway);
@@ -688,12 +678,16 @@ class TTransferWriter
688678
this->Schedule(Delay + random, new TEvents::TEvWakeup());
689679
}
690680

691-
template <typename... Args>
692-
void Leave(Args&&... args) {
681+
void Leave(TEvWorker::TEvGone::EStatus status, const TString& message) {
693682
LOG_I("Leave");
694683

695-
Send(Worker, new TEvWorker::TEvGone(std::forward<Args>(args)...));
696-
PassAway();
684+
if (Worker) {
685+
Send(Worker, new TEvWorker::TEvGone(status, message));
686+
PassAway();
687+
} else {
688+
ProcessingErrorStatus = status;
689+
ProcessingError = message;
690+
}
697691
}
698692

699693
void PassAway() override {
@@ -726,9 +720,10 @@ class TTransferWriter
726720
TProgramHolder::TPtr ProgramHolder;
727721

728722
mutable TMaybe<TString> LogPrefix;
723+
724+
mutable TEvWorker::TEvGone::EStatus ProcessingErrorStatus;
729725
mutable TMaybe<TString> ProcessingError;
730726

731-
std::optional<TActorId> PendingWorker;
732727
ui32 PendingPartitionId = 0;
733728
std::optional<TVector<TTopicMessage>> PendingRecords;
734729

ydb/tests/functional/transfer/main.cpp

Lines changed: 78 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -147,13 +147,13 @@ struct MainTestCase {
147147
UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
148148
}
149149

150-
void CreateTopic() {
150+
void CreateTopic(size_t partitionCount = 10) {
151151
auto res = Session.ExecuteQuery(Sprintf(R"(
152152
CREATE TOPIC `%s`
153153
WITH (
154-
min_active_partitions = 10
154+
min_active_partitions = %d
155155
);
156-
)", TopicName.data()), TTxControl::NoTx()).GetValueSync();
156+
)", TopicName.data(), partitionCount), TTxControl::NoTx()).GetValueSync();
157157
UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
158158
}
159159

@@ -194,7 +194,7 @@ struct MainTestCase {
194194
TDescribeReplicationSettings settings;
195195
settings.IncludeStats(true);
196196

197-
return client.DescribeReplication(TString("/") + GetEnv("YDB_DATABASE") + "/" + TransferName, settings);
197+
return client.DescribeReplication(TString("/") + GetEnv("YDB_DATABASE") + "/" + TransferName, settings).ExtractValueSync();
198198
}
199199

200200
auto DescribeTopic() {
@@ -828,14 +828,14 @@ Y_UNIT_TEST_SUITE(Transfer)
828828
});
829829

830830
{
831-
auto result = testCase.DescribeTransfer().ExtractValueSync();
831+
auto result = testCase.DescribeTransfer();
832832
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToOneLineString());
833833
}
834834

835835
testCase.DropTransfer();
836836

837837
{
838-
auto result = testCase.DescribeTransfer().ExtractValueSync();
838+
auto result = testCase.DescribeTransfer();
839839
UNIT_ASSERT_C(!result.IsSuccess(), result.GetIssues().ToOneLineString());
840840
UNIT_ASSERT_VALUES_EQUAL(EStatus::SCHEME_ERROR, result.GetStatus());
841841
}
@@ -897,5 +897,77 @@ Y_UNIT_TEST_SUITE(Transfer)
897897
}
898898
}
899899

900+
Y_UNIT_TEST(DescribeError_OnLambdaCompilation)
901+
{
902+
MainTestCase testCase;
903+
testCase.CreateTable(R"(
904+
CREATE TABLE `%s` (
905+
Key Uint64 NOT NULL,
906+
Message Utf8 NOT NULL,
907+
PRIMARY KEY (Key)
908+
) WITH (
909+
STORE = COLUMN
910+
);
911+
)");
912+
913+
testCase.CreateTopic(1);
914+
testCase.CreateTransfer(R"(
915+
$l = ($x) -> {
916+
return $x._unknown_field_for_lambda_compilation_error;
917+
};
918+
)");
919+
920+
for (size_t i = 20; i--;) {
921+
auto result = testCase.DescribeTransfer().GetReplicationDescription();
922+
if (TReplicationDescription::EState::Error == result.GetState()) {
923+
Cerr << ">>>>> " << result.GetErrorState().GetIssues().ToOneLineString() << Endl << Flush;
924+
UNIT_ASSERT(result.GetErrorState().GetIssues().ToOneLineString().contains("_unknown_field_for_lambda_compilation_error"));
925+
break;
926+
}
927+
928+
UNIT_ASSERT_C(i, "Unable to wait transfer error");
929+
Sleep(TDuration::Seconds(1));
930+
}
931+
}
932+
933+
Y_UNIT_TEST(DescribeError_OnWriteToShard)
934+
{
935+
MainTestCase testCase;
936+
testCase.CreateTable(R"(
937+
CREATE TABLE `%s` (
938+
Key Uint64 NOT NULL,
939+
Message Utf8,
940+
PRIMARY KEY (Key)
941+
) WITH (
942+
STORE = COLUMN
943+
);
944+
)");
945+
946+
testCase.CreateTopic(1);
947+
testCase.CreateTransfer(R"(
948+
$l = ($x) -> {
949+
return [
950+
<|
951+
Key:null,
952+
Message:CAST($x._data AS Utf8)
953+
|>
954+
];
955+
};
956+
)");
957+
958+
testCase.Write({"message-1"});
959+
960+
for (size_t i = 20; i--;) {
961+
auto result = testCase.DescribeTransfer().GetReplicationDescription();
962+
if (TReplicationDescription::EState::Error == result.GetState()) {
963+
Cerr << ">>>>> " << result.GetErrorState().GetIssues().ToOneLineString() << Endl << Flush;
964+
UNIT_ASSERT(result.GetErrorState().GetIssues().ToOneLineString().contains("Cannot write data into shard"));
965+
break;
966+
}
967+
968+
UNIT_ASSERT_C(i, "Unable to wait transfer error");
969+
Sleep(TDuration::Seconds(1));
970+
}
971+
}
900972
}
901973

0 commit comments

Comments
 (0)