Skip to content

Fix missing locks on read iterator empty result elision. Fixes #2765. #2825

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions ydb/core/tx/datashard/datashard__read_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,10 @@ class TReader {
}
}

if (record.TxLocksSize() > 0 || record.BrokenTxLocksSize() > 0) {
useful = true;
}

Self->IncCounter(COUNTER_READ_ITERATOR_ROWS_READ, RowsRead);
if (!isKeysRequest) {
Self->IncCounter(COUNTER_ENGINE_HOST_SELECT_RANGE_ROW_SKIPS, DeletedRowSkips);
Expand Down
78 changes: 78 additions & 0 deletions ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
namespace NKikimr {

using namespace NKikimr::NDataShard;
using namespace NKikimr::NDataShard::NKqpHelpers;
using namespace NSchemeShard;
using namespace Tests;

Expand Down Expand Up @@ -4167,6 +4168,83 @@ Y_UNIT_TEST_SUITE(DataShardReadIteratorPageFaults) {
// We should be able to drop table
WaitTxNotification(server, AsyncDropTable(server, sender, "/Root", "table-1"));
}

Y_UNIT_TEST(LocksNotLostOnPageFault) {
TPortManager pm;
NFake::TCaches caches;
caches.Shared = 1 /* bytes */;
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
.SetUseRealThreads(false)
.SetCacheParams(caches);
TServer::TPtr server = new TServer(serverSettings);

auto& runtime = *server->GetRuntime();
auto sender = runtime.AllocateEdgeActor();

runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);

InitRoot(server, sender);

TDisableDataShardLogBatching disableDataShardLogBatching;

// Use a policy that forces very small page sizes, effectively making each row on its own page
NLocalDb::TCompactionPolicyPtr policy = NLocalDb::CreateDefaultTablePolicy();
policy->MinDataPageSize = 1;

auto opts = TShardedTableOptions()
.Columns({{"key", "Int32", true, false},
{"index", "Int32", true, false},
{"value", "Int32", false, false}})
.Policy(policy.Get())
.ExecutorCacheSize(1 /* byte */);
auto [shards, tableId] = CreateShardedTable(server, sender, "/Root", "table-1", opts);

ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, index, value) VALUES (1, 0, 10), (3, 0, 30), (5, 0, 50), (7, 0, 70), (9, 0, 90);");
runtime.SimulateSleep(TDuration::Seconds(1));

const auto shard1 = shards.at(0);
CompactTable(runtime, shard1, tableId, false);
RebootTablet(runtime, shard1, sender);
runtime.SimulateSleep(TDuration::Seconds(1));

// Start a write transaction that has uncommitted write to key (2, 0)
// This is because read iterator measures "work" in processed/skipped rows, so we have to give it something
TString writeSessionId, writeTxId;
UNIT_ASSERT_VALUES_EQUAL(
KqpSimpleBegin(runtime, writeSessionId, writeTxId, R"(
UPSERT INTO `/Root/table-1` (key, index, value) VALUES (2, 0, 20), (4, 0, 40);

SELECT key, index, value FROM `/Root/table-1`
WHERE key = 2
ORDER BY key, index;
)"),
"{ items { int32_value: 2 } items { int32_value: 0 } items { int32_value: 20 } }");

// Start a read transaction with several range read in a specific order
// The first two prefixes don't exist (nothing committed yet)
// The other two prefixes are supposed to page fault
TString sessionId, txId;
UNIT_ASSERT_VALUES_EQUAL(
KqpSimpleBegin(runtime, sessionId, txId, R"(
SELECT key, index, value FROM `/Root/table-1`
WHERE key IN (2, 4, 7, 9)
ORDER BY key, index;
)"),
"{ items { int32_value: 7 } items { int32_value: 0 } items { int32_value: 70 } }, "
"{ items { int32_value: 9 } items { int32_value: 0 } items { int32_value: 90 } }");

// Commit the first transaction, it must succeed
UNIT_ASSERT_VALUES_EQUAL(
KqpSimpleCommit(runtime, writeSessionId, writeTxId, "SELECT 1;"),
"{ items { int32_value: 1 } }");

// Commit the second transaction with a new upsert, it must not succeed
UNIT_ASSERT_VALUES_EQUAL(
KqpSimpleCommit(runtime, sessionId, txId,
"UPSERT INTO `/Root/table-1` (key, index, value) VALUES (2, 0, 22);"),
"ERROR: ABORTED");
}
}

} // namespace NKikimr