Skip to content

Commit 9a073ec

Browse files
committed
Fix missing locks on read iterator empty result elision. Fixes ydb-platform#2765. (ydb-platform#2825)
1 parent cceacdc commit 9a073ec

File tree

2 files changed

+82
-0
lines changed

2 files changed

+82
-0
lines changed

ydb/core/tx/datashard/datashard__read_iterator.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -665,6 +665,10 @@ class TReader {
665665
}
666666
}
667667

668+
if (record.TxLocksSize() > 0 || record.BrokenTxLocksSize() > 0) {
669+
useful = true;
670+
}
671+
668672
Self->IncCounter(COUNTER_READ_ITERATOR_ROWS_READ, RowsRead);
669673
if (!isKeysRequest) {
670674
Self->IncCounter(COUNTER_ENGINE_HOST_SELECT_RANGE_ROW_SKIPS, DeletedRowSkips);

ydb/core/tx/datashard/datashard_ut_read_iterator.cpp

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
namespace NKikimr {
2222

2323
using namespace NKikimr::NDataShard;
24+
using namespace NKikimr::NDataShard::NKqpHelpers;
2425
using namespace NSchemeShard;
2526
using namespace Tests;
2627

@@ -3689,6 +3690,83 @@ Y_UNIT_TEST_SUITE(DataShardReadIteratorPageFaults) {
36893690
// We should be able to drop table
36903691
WaitTxNotification(server, AsyncDropTable(server, sender, "/Root", "table-1"));
36913692
}
3693+
3694+
Y_UNIT_TEST(LocksNotLostOnPageFault) {
3695+
TPortManager pm;
3696+
NFake::TCaches caches;
3697+
caches.Shared = 1 /* bytes */;
3698+
TServerSettings serverSettings(pm.GetPort(2134));
3699+
serverSettings.SetDomainName("Root")
3700+
.SetUseRealThreads(false)
3701+
.SetCacheParams(caches);
3702+
TServer::TPtr server = new TServer(serverSettings);
3703+
3704+
auto& runtime = *server->GetRuntime();
3705+
auto sender = runtime.AllocateEdgeActor();
3706+
3707+
runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
3708+
3709+
InitRoot(server, sender);
3710+
3711+
TDisableDataShardLogBatching disableDataShardLogBatching;
3712+
3713+
// Use a policy that forces very small page sizes, effectively making each row on its own page
3714+
NLocalDb::TCompactionPolicyPtr policy = NLocalDb::CreateDefaultTablePolicy();
3715+
policy->MinDataPageSize = 1;
3716+
3717+
auto opts = TShardedTableOptions()
3718+
.Columns({{"key", "Int32", true, false},
3719+
{"index", "Int32", true, false},
3720+
{"value", "Int32", false, false}})
3721+
.Policy(policy.Get())
3722+
.ExecutorCacheSize(1 /* byte */);
3723+
auto [shards, tableId] = CreateShardedTable(server, sender, "/Root", "table-1", opts);
3724+
3725+
ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, index, value) VALUES (1, 0, 10), (3, 0, 30), (5, 0, 50), (7, 0, 70), (9, 0, 90);");
3726+
runtime.SimulateSleep(TDuration::Seconds(1));
3727+
3728+
const auto shard1 = shards.at(0);
3729+
CompactTable(runtime, shard1, tableId, false);
3730+
RebootTablet(runtime, shard1, sender);
3731+
runtime.SimulateSleep(TDuration::Seconds(1));
3732+
3733+
// Start a write transaction that has uncommitted write to key (2, 0)
3734+
// This is because read iterator measures "work" in processed/skipped rows, so we have to give it something
3735+
TString writeSessionId, writeTxId;
3736+
UNIT_ASSERT_VALUES_EQUAL(
3737+
KqpSimpleBegin(runtime, writeSessionId, writeTxId, R"(
3738+
UPSERT INTO `/Root/table-1` (key, index, value) VALUES (2, 0, 20), (4, 0, 40);
3739+
3740+
SELECT key, index, value FROM `/Root/table-1`
3741+
WHERE key = 2
3742+
ORDER BY key, index;
3743+
)"),
3744+
"{ items { int32_value: 2 } items { int32_value: 0 } items { int32_value: 20 } }");
3745+
3746+
// Start a read transaction with several range read in a specific order
3747+
// The first two prefixes don't exist (nothing committed yet)
3748+
// The other two prefixes are supposed to page fault
3749+
TString sessionId, txId;
3750+
UNIT_ASSERT_VALUES_EQUAL(
3751+
KqpSimpleBegin(runtime, sessionId, txId, R"(
3752+
SELECT key, index, value FROM `/Root/table-1`
3753+
WHERE key IN (2, 4, 7, 9)
3754+
ORDER BY key, index;
3755+
)"),
3756+
"{ items { int32_value: 7 } items { int32_value: 0 } items { int32_value: 70 } }, "
3757+
"{ items { int32_value: 9 } items { int32_value: 0 } items { int32_value: 90 } }");
3758+
3759+
// Commit the first transaction, it must succeed
3760+
UNIT_ASSERT_VALUES_EQUAL(
3761+
KqpSimpleCommit(runtime, writeSessionId, writeTxId, "SELECT 1;"),
3762+
"{ items { int32_value: 1 } }");
3763+
3764+
// Commit the second transaction with a new upsert, it must not succeed
3765+
UNIT_ASSERT_VALUES_EQUAL(
3766+
KqpSimpleCommit(runtime, sessionId, txId,
3767+
"UPSERT INTO `/Root/table-1` (key, index, value) VALUES (2, 0, 22);"),
3768+
"ERROR: ABORTED");
3769+
}
36923770
}
36933771

36943772
} // namespace NKikimr

0 commit comments

Comments
 (0)