From 71a783ddc5f90e265d970d1112fcc888a7ac9570 Mon Sep 17 00:00:00 2001 From: Ilnaz Nizametdinov Date: Tue, 18 Jun 2024 20:21:40 +0300 Subject: [PATCH 1/5] Fix ApplyReplicationChanges (#5683) --- .../tx/datashard/datashard_repl_apply.cpp | 9 +++++++- .../tx/datashard/datashard_ut_replication.cpp | 23 +++++++++++++++++++ 2 files changed, 31 insertions(+), 1 deletion(-) diff --git a/ydb/core/tx/datashard/datashard_repl_apply.cpp b/ydb/core/tx/datashard/datashard_repl_apply.cpp index 48aa0db04e9b..92533d7befde 100644 --- a/ydb/core/tx/datashard/datashard_repl_apply.cpp +++ b/ydb/core/tx/datashard/datashard_repl_apply.cpp @@ -24,13 +24,20 @@ class TDataShard::TTxApplyReplicationChanges : public TTransactionBaseState != TShardState::Ready && !Self->IsReplicated()) { + if (Self->State != TShardState::Ready) { Result = MakeHolder( NKikimrTxDataShard::TEvApplyReplicationChangesResult::STATUS_REJECTED, NKikimrTxDataShard::TEvApplyReplicationChangesResult::REASON_WRONG_STATE); return true; } + if (!Self->IsReplicated()) { + Result = MakeHolder( + NKikimrTxDataShard::TEvApplyReplicationChangesResult::STATUS_REJECTED, + NKikimrTxDataShard::TEvApplyReplicationChangesResult::REASON_BAD_REQUEST); + return true; + } + const auto& msg = Ev->Get()->Record; const auto& tableId = msg.GetTableId(); diff --git a/ydb/core/tx/datashard/datashard_ut_replication.cpp b/ydb/core/tx/datashard/datashard_ut_replication.cpp index e72f6203b329..b0395077e915 100644 --- a/ydb/core/tx/datashard/datashard_ut_replication.cpp +++ b/ydb/core/tx/datashard/datashard_ut_replication.cpp @@ -281,6 +281,29 @@ Y_UNIT_TEST_SUITE(DataShardReplication) { ); } + Y_UNIT_TEST(ApplyChangesToCommonTable) { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + + InitRoot(server, sender); + CreateShardedTable(server, sender, "/Root", "table-1", TShardedTableOptions()); + + auto shards = GetTableShards(server, sender, "/Root/table-1"); + auto tableId = ResolveTableId(server, sender, "/Root/table-1"); + + ApplyChanges(server, shards.at(0), tableId, "my-source", { + TChange{ .Offset = 0, .WriteTxId = 0, .Key = 1, .Value = 11 }, + }, NKikimrTxDataShard::TEvApplyReplicationChangesResult::STATUS_REJECTED); + } + } } // namespace NKikimr From b50ff5630adbf4809713423f7a5e1f2b4328d40f Mon Sep 17 00:00:00 2001 From: Ilnaz Nizametdinov Date: Tue, 18 Jun 2024 21:06:57 +0300 Subject: [PATCH 2/5] Subscribe to pipe state (#5685) --- ydb/core/tx/replication/service/table_writer.cpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/ydb/core/tx/replication/service/table_writer.cpp b/ydb/core/tx/replication/service/table_writer.cpp index 345c9f19c632..5a2c8607d7c7 100644 --- a/ydb/core/tx/replication/service/table_writer.cpp +++ b/ydb/core/tx/replication/service/table_writer.cpp @@ -93,7 +93,7 @@ class TTablePartitionWriter: public TActorBootstrapped { event->Record.SetSource(source); } - Send(LeaderPipeCache, new TEvPipeCache::TEvForward(event.Release(), TabletId, false)); + Send(LeaderPipeCache, new TEvPipeCache::TEvForward(event.Release(), TabletId, true, ++SubscribeCookie)); Become(&TThis::StateWaitingStatus); } @@ -133,7 +133,7 @@ class TTablePartitionWriter: public TActorBootstrapped { } void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) { - if (TabletId == ev->Get()->TabletId) { + if (TabletId == ev->Get()->TabletId && ev->Cookie == SubscribeCookie) { Leave(); } } @@ -188,6 +188,7 @@ class TTablePartitionWriter: public TActorBootstrapped { mutable TMaybe LogPrefix; TActorId LeaderPipeCache; + ui64 SubscribeCookie = 0; TMemoryPool MemoryPool; }; // TTablePartitionWriter From 7ed2702a96eddc7b16d7d4ace8a805d8eadaeea6 Mon Sep 17 00:00:00 2001 From: Ilnaz Nizametdinov Date: Wed, 19 Jun 2024 15:18:53 +0300 Subject: [PATCH 3/5] Delay restart in case of retriable errors (#5708) --- ydb/core/tx/replication/service/table_writer.cpp | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/ydb/core/tx/replication/service/table_writer.cpp b/ydb/core/tx/replication/service/table_writer.cpp index 5a2c8607d7c7..e80919c2c31c 100644 --- a/ydb/core/tx/replication/service/table_writer.cpp +++ b/ydb/core/tx/replication/service/table_writer.cpp @@ -117,7 +117,11 @@ class TTablePartitionWriter: public TActorBootstrapped { << ": status# " << static_cast(record.GetStatus()) << ", reason# " << static_cast(record.GetReason()) << ", error# " << record.GetErrorDescription()); - return Leave(IsHardError(record.GetReason())); + if (IsHardError(record.GetReason())) { + return Leave(true); + } else { + return DelayedLeave(); + } } } @@ -134,10 +138,15 @@ class TTablePartitionWriter: public TActorBootstrapped { void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) { if (TabletId == ev->Get()->TabletId && ev->Cookie == SubscribeCookie) { - Leave(); + DelayedLeave(); } } + void DelayedLeave() { + static constexpr TDuration delay = TDuration::MilliSeconds(50); + this->Schedule(delay, new TEvents::TEvWakeup()); + } + void Leave(bool hardError = false) { LOG_I("Leave" << ": hard error# " << hardError); @@ -177,6 +186,7 @@ class TTablePartitionWriter: public TActorBootstrapped { STATEFN(StateBase) { switch (ev->GetTypeRewrite()) { hFunc(TEvPipeCache::TEvDeliveryProblem, Handle); + sFunc(TEvents::TEvWakeup, Leave); sFunc(TEvents::TEvPoison, PassAway); } } From a78dc02237e9515d9a6fc9352f0eb2d405950ec9 Mon Sep 17 00:00:00 2001 From: Ilnaz Nizametdinov Date: Wed, 19 Jun 2024 16:10:39 +0300 Subject: [PATCH 4/5] Create replica with same partitions as original table (#5712) --- .../tx/replication/controller/dst_creator.cpp | 3 +- .../replication/controller/dst_creator_ut.cpp | 33 +++++++++++++++++++ .../tx/replication/ut_helpers/test_table.cpp | 4 +++ .../tx/replication/ut_helpers/test_table.h | 1 + 4 files changed, 40 insertions(+), 1 deletion(-) diff --git a/ydb/core/tx/replication/controller/dst_creator.cpp b/ydb/core/tx/replication/controller/dst_creator.cpp index 7fa0132512d2..fe7ab716806a 100644 --- a/ydb/core/tx/replication/controller/dst_creator.cpp +++ b/ydb/core/tx/replication/controller/dst_creator.cpp @@ -112,7 +112,8 @@ class TDstCreator: public TActorBootstrapped { if (bootstrap) { GetTableProfiles(); } else { - Send(YdbProxy, new TEvYdbProxy::TEvDescribeTableRequest(SrcPath, {})); + Send(YdbProxy, new TEvYdbProxy::TEvDescribeTableRequest(SrcPath, NYdb::NTable::TDescribeTableSettings() + .WithKeyShardBoundary(true))); } break; } diff --git a/ydb/core/tx/replication/controller/dst_creator_ut.cpp b/ydb/core/tx/replication/controller/dst_creator_ut.cpp index c54e59393ac7..f750e9c7cc08 100644 --- a/ydb/core/tx/replication/controller/dst_creator_ut.cpp +++ b/ydb/core/tx/replication/controller/dst_creator_ut.cpp @@ -80,6 +80,7 @@ Y_UNIT_TEST_SUITE(DstCreator) { }, .ReplicationConfig = Nothing(), })); + env.GetRuntime().Register(CreateDstCreator( env.GetSender(), env.GetSchemeshardId("/Root/Table"), env.GetYdbProxy(), env.GetPathId("/Root"), 1 /* rid */, 1 /* tid */, TReplication::ETargetKind::Table, "/Root/Table", "/Root/Replicated" @@ -93,6 +94,38 @@ Y_UNIT_TEST_SUITE(DstCreator) { UNIT_ASSERT_VALUES_EQUAL(replicatedSelf.GetOwner(), "user@builtin"); } + Y_UNIT_TEST(SamePartitionCount) { + TEnv env; + env.GetRuntime().SetLogPriority(NKikimrServices::REPLICATION_CONTROLLER, NLog::PRI_TRACE); + + env.CreateTable("/Root", *MakeTableDescription({ + .Name = "Table", + .KeyColumns = {"key"}, + .Columns = { + {.Name = "key", .Type = "Uint32"}, + {.Name = "value", .Type = "Utf8"}, + }, + .ReplicationConfig = Nothing(), + .UniformPartitions = 2, + })); + + env.GetRuntime().Register(CreateDstCreator( + env.GetSender(), env.GetSchemeshardId("/Root/Table"), env.GetYdbProxy(), env.GetPathId("/Root"), + 1 /* rid */, 1 /* tid */, TReplication::ETargetKind::Table, "/Root/Table", "/Root/Replicated" + )); + + auto ev = env.GetRuntime().GrabEdgeEvent(env.GetSender()); + UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Status, NKikimrScheme::StatusSuccess); + + auto originalDesc = env.GetDescription("/Root/Table"); + const auto& originalTable = originalDesc.GetPathDescription(); + UNIT_ASSERT_VALUES_EQUAL(originalTable.TablePartitionsSize(), 2); + + auto replicatedDesc = env.GetDescription("/Root/Replicated"); + const auto& replicatedTable = replicatedDesc.GetPathDescription(); + UNIT_ASSERT_VALUES_EQUAL(originalTable.TablePartitionsSize(), replicatedTable.TablePartitionsSize()); + } + Y_UNIT_TEST(NonExistentSrc) { TEnv env; env.GetRuntime().SetLogPriority(NKikimrServices::REPLICATION_CONTROLLER, NLog::PRI_TRACE); diff --git a/ydb/core/tx/replication/ut_helpers/test_table.cpp b/ydb/core/tx/replication/ut_helpers/test_table.cpp index 8460f97181bd..e77b6b5a4a16 100644 --- a/ydb/core/tx/replication/ut_helpers/test_table.cpp +++ b/ydb/core/tx/replication/ut_helpers/test_table.cpp @@ -57,6 +57,10 @@ void TTestTableDescription::SerializeTo(NKikimrSchemeOp::TTableDescription& prot if (ReplicationConfig) { ReplicationConfig->SerializeTo(*proto.MutableReplicationConfig()); } + + if (UniformPartitions) { + proto.SetUniformPartitionsCount(*UniformPartitions); + } } THolder MakeTableDescription(const TTestTableDescription& desc) { diff --git a/ydb/core/tx/replication/ut_helpers/test_table.h b/ydb/core/tx/replication/ut_helpers/test_table.h index ea7083aec960..1df27de901d1 100644 --- a/ydb/core/tx/replication/ut_helpers/test_table.h +++ b/ydb/core/tx/replication/ut_helpers/test_table.h @@ -44,6 +44,7 @@ struct TTestTableDescription { TVector KeyColumns; TVector Columns; TMaybe ReplicationConfig = TReplicationConfig::Default(); + TMaybe UniformPartitions = Nothing(); void SerializeTo(NKikimrSchemeOp::TTableDescription& proto) const; }; From 7aade9f8b8df99940ddcae8bcc7181210a425920 Mon Sep 17 00:00:00 2001 From: Ilnaz Nizametdinov Date: Wed, 19 Jun 2024 18:54:02 +0300 Subject: [PATCH 5/5] Fixed SqlParsingOnly.AlterTableAddIndexWithIsNotSupported --- ydb/library/yql/sql/v1/sql_ut.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/library/yql/sql/v1/sql_ut.cpp b/ydb/library/yql/sql/v1/sql_ut.cpp index 88a519410dcf..82387a20781d 100644 --- a/ydb/library/yql/sql/v1/sql_ut.cpp +++ b/ydb/library/yql/sql/v1/sql_ut.cpp @@ -2395,7 +2395,7 @@ Y_UNIT_TEST_SUITE(SqlParsingOnly) { Y_UNIT_TEST(AlterTableAddIndexWithIsNotSupported) { ExpectFailWithError("USE plato; ALTER TABLE table ADD INDEX idx LOCAL WITH (a=b, c=d, e=f) ON (col)", - "
:1:40: Error: local: alternative is not implemented yet: 714:7: local_index\n"); + "
:1:40: Error: local: alternative is not implemented yet: 715:7: local_index\n"); } Y_UNIT_TEST(OptionalAliases) {