From cf49baa6798d53bbca6a64f12bcb05576ccc10fd Mon Sep 17 00:00:00 2001 From: Aleksei Borzenkov Date: Fri, 15 Mar 2024 13:54:37 +0000 Subject: [PATCH] Fix missing locks on read iterator empty result elision. Fixes #2765. --- .../tx/datashard/datashard__read_iterator.cpp | 4 + .../datashard/datashard_ut_read_iterator.cpp | 78 +++++++++++++++++++ 2 files changed, 82 insertions(+) diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp index bfdc952e1ace..1fd915755040 100644 --- a/ydb/core/tx/datashard/datashard__read_iterator.cpp +++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp @@ -665,6 +665,10 @@ class TReader { } } + if (record.TxLocksSize() > 0 || record.BrokenTxLocksSize() > 0) { + useful = true; + } + Self->IncCounter(COUNTER_READ_ITERATOR_ROWS_READ, RowsRead); if (!isKeysRequest) { Self->IncCounter(COUNTER_ENGINE_HOST_SELECT_RANGE_ROW_SKIPS, DeletedRowSkips); diff --git a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp index d4237d6e6680..95e02823a185 100644 --- a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp +++ b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp @@ -23,6 +23,7 @@ namespace NKikimr { using namespace NKikimr::NDataShard; +using namespace NKikimr::NDataShard::NKqpHelpers; using namespace NSchemeShard; using namespace Tests; @@ -4167,6 +4168,83 @@ Y_UNIT_TEST_SUITE(DataShardReadIteratorPageFaults) { // We should be able to drop table WaitTxNotification(server, AsyncDropTable(server, sender, "/Root", "table-1")); } + + Y_UNIT_TEST(LocksNotLostOnPageFault) { + TPortManager pm; + NFake::TCaches caches; + caches.Shared = 1 /* bytes */; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false) + .SetCacheParams(caches); + TServer::TPtr server = new TServer(serverSettings); + + auto& runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + + InitRoot(server, sender); + + TDisableDataShardLogBatching disableDataShardLogBatching; + + // Use a policy that forces very small page sizes, effectively making each row on its own page + NLocalDb::TCompactionPolicyPtr policy = NLocalDb::CreateDefaultTablePolicy(); + policy->MinDataPageSize = 1; + + auto opts = TShardedTableOptions() + .Columns({{"key", "Int32", true, false}, + {"index", "Int32", true, false}, + {"value", "Int32", false, false}}) + .Policy(policy.Get()) + .ExecutorCacheSize(1 /* byte */); + auto [shards, tableId] = CreateShardedTable(server, sender, "/Root", "table-1", opts); + + ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, index, value) VALUES (1, 0, 10), (3, 0, 30), (5, 0, 50), (7, 0, 70), (9, 0, 90);"); + runtime.SimulateSleep(TDuration::Seconds(1)); + + const auto shard1 = shards.at(0); + CompactTable(runtime, shard1, tableId, false); + RebootTablet(runtime, shard1, sender); + runtime.SimulateSleep(TDuration::Seconds(1)); + + // Start a write transaction that has uncommitted write to key (2, 0) + // This is because read iterator measures "work" in processed/skipped rows, so we have to give it something + TString writeSessionId, writeTxId; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleBegin(runtime, writeSessionId, writeTxId, R"( + UPSERT INTO `/Root/table-1` (key, index, value) VALUES (2, 0, 20), (4, 0, 40); + + SELECT key, index, value FROM `/Root/table-1` + WHERE key = 2 + ORDER BY key, index; + )"), + "{ items { int32_value: 2 } items { int32_value: 0 } items { int32_value: 20 } }"); + + // Start a read transaction with several range read in a specific order + // The first two prefixes don't exist (nothing committed yet) + // The other two prefixes are supposed to page fault + TString sessionId, txId; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleBegin(runtime, sessionId, txId, R"( + SELECT key, index, value FROM `/Root/table-1` + WHERE key IN (2, 4, 7, 9) + ORDER BY key, index; + )"), + "{ items { int32_value: 7 } items { int32_value: 0 } items { int32_value: 70 } }, " + "{ items { int32_value: 9 } items { int32_value: 0 } items { int32_value: 90 } }"); + + // Commit the first transaction, it must succeed + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleCommit(runtime, writeSessionId, writeTxId, "SELECT 1;"), + "{ items { int32_value: 1 } }"); + + // Commit the second transaction with a new upsert, it must not succeed + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleCommit(runtime, sessionId, txId, + "UPSERT INTO `/Root/table-1` (key, index, value) VALUES (2, 0, 22);"), + "ERROR: ABORTED"); + } } } // namespace NKikimr