Skip to content

Commit f7167a8

Browse files
authored
Merge a8416a5 into bc5b8c1
2 parents bc5b8c1 + a8416a5 commit f7167a8

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

67 files changed

+456
-2329
lines changed

ydb/core/client/locks_ut.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
#include <ydb/core/engine/mkql_engine_flat_impl.h>
33
#include <ydb/core/testlib/test_client.h>
44
#include <ydb/core/tx/tx_proxy/proxy.h>
5-
#include <ydb/core/tx/datashard/datashard_locks.h>
5+
#include <ydb/core/tx/locks/locks.h>
66
#include <ydb/public/lib/deprecated/kicli/kicli.h>
77

88
#include <library/cpp/testing/unittest/tests_data.h>

ydb/core/engine/minikql/minikql_engine_host.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
#include <ydb/library/yql/minikql/computation/mkql_custom_list.h>
77
#include <ydb/library/yql/minikql/mkql_string_util.h>
88
#include <ydb/library/yql/parser/pg_wrapper/interface/codec.h>
9-
#include <ydb/core/tx/datashard/sys_tables.h>
9+
#include <ydb/core/tx/locks/sys_tables.h>
1010

1111
#include <library/cpp/containers/stack_vector/stack_vec.h>
1212

ydb/core/kqp/runtime/kqp_stream_lookup_worker.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
#include <ydb/library/yql/minikql/mkql_node.h>
55
#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
66
#include <ydb/core/scheme/scheme_tabledefs.h>
7-
#include <ydb/core/tx/datashard/sys_tables.h>
7+
#include <ydb/core/tx/locks/sys_tables.h>
88
#include <ydb/core/tx/datashard/datashard.h>
99

1010
namespace NKikimr {

ydb/core/kqp/ut/opt/kqp_ne_ut.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
44
#include <ydb/core/kqp/runtime/kqp_read_actor.h>
5+
#include <ydb/core/tx/datashard/datashard_impl.h>
56

67
namespace NKikimr::NKqp {
78

ydb/core/kqp/ut/scan/kqp_scan_ut.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
22
#include <ydb/core/kqp/counters/kqp_counters.h>
33
#include <ydb/core/tx/datashard/datashard_failpoints.h>
4+
#include <ydb/core/tx/datashard/datashard_impl.h>
45
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
56

67
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h>

ydb/core/kqp/ut/scan/kqp_split_ut.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
22
#include <ydb/core/kqp/counters/kqp_counters.h>
33
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
4+
#include <ydb/core/tx/datashard/datashard_impl.h>
45

56
#include <ydb/core/base/tablet_pipecache.h>
67
#include <ydb/core/kqp/runtime/kqp_read_actor.h>

ydb/core/sys_view/common/schema.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
#include "path.h"
44

55
#include <ydb/core/tablet_flat/flat_cxx_database.h>
6-
#include <ydb/core/tx/datashard/sys_tables.h>
6+
#include <ydb/core/tx/locks/sys_tables.h>
77

88
namespace NKikimr {
99
namespace NSysView {

ydb/core/tx/columnshard/columnshard__init.cpp

+19
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include "columnshard_schema.h"
55
#include "hooks/abstract/abstract.h"
66
#include <ydb/core/tx/columnshard/blobs_action/blob_manager_db.h>
7+
#include <ydb/core/tx/columnshard/transactions/locks_db.h>
78

89
#include <ydb/core/tablet/tablet_exception.h>
910
#include <ydb/core/tx/columnshard/operations/write.h>
@@ -36,6 +37,7 @@ void TTxInit::SetDefaults() {
3637
Self->LastWriteId = TWriteId{0};
3738
Self->LastPlannedStep = 0;
3839
Self->LastPlannedTxId = 0;
40+
Self->LastCompletedTx = NOlap::TSnapshot::Zero();
3941
Self->OwnerPathId = 0;
4042
Self->OwnerPath.clear();
4143
Self->LongTxWrites.clear();
@@ -71,6 +73,14 @@ bool TTxInit::Precharge(TTransactionContext& txc) {
7173
ready = ready && Schema::GetSpecialValue(db, Schema::EValueIds::OwnerPathId, Self->OwnerPathId);
7274
ready = ready && Schema::GetSpecialValue(db, Schema::EValueIds::OwnerPath, Self->OwnerPath);
7375

76+
{
77+
ui64 lastCompletedStep = 0;
78+
ui64 lastCompletedTx = 0;
79+
ready = ready && Schema::GetSpecialValue(db, Schema::EValueIds::LastCompletedStep, lastCompletedStep);
80+
ready = ready && Schema::GetSpecialValue(db, Schema::EValueIds::LastCompletedTxId, lastCompletedTx);
81+
Self->LastCompletedTx = NOlap::TSnapshot(lastCompletedStep, lastCompletedTx);
82+
}
83+
7484
if (!ready) {
7585
return false;
7686
}
@@ -177,6 +187,15 @@ bool TTxInit::ReadEverything(TTransactionContext& txc, const TActorContext& ctx)
177187
}
178188
}
179189
}
190+
{
191+
TMemoryProfileGuard g("TTxInit/LocksDB");
192+
if (txc.DB.GetScheme().GetTableInfo(Schema::Locks::TableId)) {
193+
TColumnShardLocksDb locksDb(*Self, txc);
194+
if (!Self->SysLocks.Load(locksDb)) {
195+
return false;
196+
}
197+
}
198+
}
180199

181200
Self->UpdateInsertTableCounters();
182201
Self->UpdateIndexCounters();

ydb/core/tx/columnshard/columnshard__progress_tx.cpp

+10
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,12 @@ class TColumnShard::TTxProgressTx : public TTransactionBase<TColumnShard> {
3131
if (!!plannedItem) {
3232
ui64 step = plannedItem->PlanStep;
3333
ui64 txId = plannedItem->TxId;
34+
LastCompletedTx = NOlap::TSnapshot(step, txId);
35+
if (LastCompletedTx > Self->LastCompletedTx) {
36+
NIceDb::TNiceDb db(txc.DB);
37+
Schema::SaveSpecialValue(db, Schema::EValueIds::LastCompletedStep, LastCompletedTx->GetPlanStep());
38+
Schema::SaveSpecialValue(db, Schema::EValueIds::LastCompletedTxId, LastCompletedTx->GetTxId());
39+
}
3440

3541
TxOperator = Self->ProgressTxController->GetVerifiedTxOperator(txId);
3642
AFL_VERIFY(TxOperator->Progress(*Self, NOlap::TSnapshot(step, txId), txc));
@@ -50,12 +56,16 @@ class TColumnShard::TTxProgressTx : public TTransactionBase<TColumnShard> {
5056
if (TxOperator) {
5157
TxOperator->Complete(*Self, ctx);
5258
}
59+
if (LastCompletedTx) {
60+
Self->LastCompletedTx = std::max(*LastCompletedTx, Self->LastCompletedTx);
61+
}
5362
Self->SetupIndexation();
5463
}
5564

5665
private:
5766
TTxController::ITransactionOperatior::TPtr TxOperator;
5867
const ui32 TabletTxNo;
68+
std::optional<NOlap::TSnapshot> LastCompletedTx;
5969
};
6070

6171
void TColumnShard::EnqueueProgressTx(const TActorContext& ctx) {

ydb/core/tx/columnshard/columnshard_impl.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ TColumnShard::TColumnShard(TTabletStorageInfo* info, const TActorId& tablet)
9393
, ScanCounters("Scan")
9494
, WritesMonitor(*this)
9595
, NormalizerController(StoragesManager, SubscribeCounters)
96+
, SysLocks(this)
9697
{
9798
TabletCountersPtr.reset(new TProtobufTabletCounters<
9899
ESimpleCounters_descriptor,

ydb/core/tx/columnshard/columnshard_impl.h

+34-4
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include <ydb/core/tx/tiering/manager.h>
2626
#include <ydb/core/tx/time_cast/time_cast.h>
2727
#include <ydb/core/tx/tx_processing.h>
28+
#include <ydb/core/tx/locks/locks.h>
2829
#include <ydb/services/metadata/service.h>
2930

3031
namespace NKikimr::NOlap {
@@ -206,10 +207,6 @@ class TColumnShard
206207
TabletCounters->Cumulative()[counter].Increment(num);
207208
}
208209

209-
void IncCounter(NColumnShard::EPercentileCounters counter, const TDuration& latency) const {
210-
TabletCounters->Percentile()[counter].IncrementFor(latency.MicroSeconds());
211-
}
212-
213210
void ActivateTiering(const ui64 pathId, const TString& useTiering, const bool onTabletInit = false);
214211
void OnTieringModified();
215212
public:
@@ -220,6 +217,37 @@ class TColumnShard
220217
None /* "none" */
221218
};
222219

220+
void IncCounter(NColumnShard::EPercentileCounters counter, const TDuration& latency) const {
221+
TabletCounters->Percentile()[counter].IncrementFor(latency.MicroSeconds());
222+
}
223+
224+
void IncCounter(NDataShard::ESimpleCounters counter, ui64 num = 1) const {
225+
TabletCounters->Simple()[counter].Add(num);
226+
}
227+
228+
// For syslocks
229+
void IncCounter(NDataShard::ECumulativeCounters counter, ui64 num = 1) const {
230+
TabletCounters->Cumulative()[counter].Increment(num);
231+
}
232+
233+
void IncCounter(NDataShard::EPercentileCounters counter, ui64 num) const {
234+
TabletCounters->Percentile()[counter].IncrementFor(num);
235+
}
236+
237+
void IncCounter(NDataShard::EPercentileCounters counter, const TDuration& latency) const {
238+
TabletCounters->Percentile()[counter].IncrementFor(latency.MilliSeconds());
239+
}
240+
241+
inline TRowVersion LastCompleteTxVersion() const {
242+
return TRowVersion(LastCompletedTx.GetPlanStep(), LastCompletedTx.GetTxId());
243+
}
244+
245+
ui32 Generation() const { return Executor()->Generation(); }
246+
247+
bool IsUserTable(const TTableId&) const {
248+
return true;
249+
}
250+
223251
private:
224252
void OverloadWriteFail(const EOverloadStatus overloadReason, const NEvWrite::TWriteData& writeData, std::unique_ptr<NActors::IEventBase>&& event, const TActorContext& ctx);
225253
EOverloadStatus CheckOverloaded(const ui64 tableId) const;
@@ -360,6 +388,7 @@ class TColumnShard
360388
TWriteId LastWriteId = TWriteId{0};
361389
ui64 LastPlannedStep = 0;
362390
ui64 LastPlannedTxId = 0;
391+
NOlap::TSnapshot LastCompletedTx = NOlap::TSnapshot::Zero();
363392
ui64 LastExportNo = 0;
364393

365394
ui64 OwnerPathId = 0;
@@ -412,6 +441,7 @@ class TColumnShard
412441
TLimits Limits;
413442
TCompactionLimits CompactionLimits;
414443
NOlap::TNormalizationController NormalizerController;
444+
NDataShard::TSysLocks SysLocks;
415445

416446
void TryRegisterMediatorTimeCast();
417447
void UnregisterMediatorTimeCast();

ydb/core/tx/columnshard/columnshard_schema.h

+47-1
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,11 @@ struct Schema : NIceDb::Schema {
3838
ColumnsTableId,
3939
CountersTableId,
4040
OperationsTableId,
41-
IndexesTableId
41+
IndexesTableId,
42+
LocksTableId,
43+
LockRangesTableId,
44+
LockConflictsTableId,
45+
LockVolatileDependenciesTableId
4246
};
4347

4448
enum class ETierTables: ui32 {
@@ -60,6 +64,8 @@ struct Schema : NIceDb::Schema {
6064
LastExportNumber = 10,
6165
OwnerPathId = 11,
6266
OwnerPath = 12,
67+
LastCompletedStep = 13,
68+
LastCompletedTxId = 14,
6369
};
6470

6571
enum class EInsertTableIds : ui8 {
@@ -305,6 +311,46 @@ struct Schema : NIceDb::Schema {
305311
using TColumns = TableColumns<PathId, PortionId, IndexId, ChunkIdx, Blob, Offset, Size, RecordsCount, RawBytes>;
306312
};
307313

314+
struct Locks : Table<LocksTableId> {
315+
struct LockId : Column<1, NScheme::NTypeIds::Uint64> {};
316+
struct LockNodeId : Column<2, NScheme::NTypeIds::Uint32> {};
317+
struct Generation : Column<3, NScheme::NTypeIds::Uint32> {};
318+
struct Counter : Column<4, NScheme::NTypeIds::Uint64> {};
319+
struct CreateTimestamp : Column<5, NScheme::NTypeIds::Uint64> {};
320+
struct Flags : Column<6, NScheme::NTypeIds::Uint64> {};
321+
322+
using TKey = TableKey<LockId>;
323+
using TColumns = TableColumns<LockId, LockNodeId, Generation, Counter, CreateTimestamp, Flags>;
324+
};
325+
326+
struct LockRanges : Table<LockRangesTableId> {
327+
struct LockId : Column<1, NScheme::NTypeIds::Uint64> {};
328+
struct RangeId : Column<2, NScheme::NTypeIds::Uint64> {};
329+
struct PathOwnerId : Column<3, NScheme::NTypeIds::Uint64> {};
330+
struct LocalPathId : Column<4, NScheme::NTypeIds::Uint64> {};
331+
struct Flags : Column<5, NScheme::NTypeIds::Uint64> {};
332+
struct Data : Column<6, NScheme::NTypeIds::String> {};
333+
334+
using TKey = TableKey<LockId, RangeId>;
335+
using TColumns = TableColumns<LockId, RangeId, PathOwnerId, LocalPathId, Flags, Data>;
336+
};
337+
338+
struct LockConflicts : Table<LockConflictsTableId> {
339+
struct LockId : Column<1, NScheme::NTypeIds::Uint64> {};
340+
struct ConflictId : Column<2, NScheme::NTypeIds::Uint64> {};
341+
342+
using TKey = TableKey<LockId, ConflictId>;
343+
using TColumns = TableColumns<LockId, ConflictId>;
344+
};
345+
346+
struct LockVolatileDependencies : Table<LockVolatileDependenciesTableId> {
347+
struct LockId : Column<1, NScheme::NTypeIds::Uint64> {};
348+
struct TxId : Column<2, NScheme::NTypeIds::Uint64> {};
349+
350+
using TKey = TableKey<LockId, TxId>;
351+
using TColumns = TableColumns<LockId, TxId>;
352+
};
353+
308354
using TTables = SchemaTables<
309355
Value,
310356
TxInfo,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
#include "locks_db.h"
2+
3+
namespace NKikimr::NColumnShard {
4+
5+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
#pragma once
2+
#include <ydb/core/tx/locks/locks_db.h>
3+
#include <ydb/core/tx/columnshard/columnshard_impl.h>
4+
5+
6+
namespace NKikimr::NColumnShard {
7+
8+
class TColumnShardLocksDb : public NLocks::TShardLocksDb<TColumnShard, NColumnShard::Schema> {
9+
private:
10+
using TBase = NLocks::TShardLocksDb<TColumnShard, NColumnShard::Schema>;
11+
12+
public:
13+
using TBase::TBase;
14+
15+
void PersistRemoveLock(ui64 lockId) override {
16+
NIceDb::TNiceDb db(DB);
17+
db.Table<NColumnShard::Schema::Locks>().Key(lockId).Delete();
18+
HasChanges_ = true;
19+
}
20+
21+
bool MayAddLock(ui64) override {
22+
return true;
23+
}
24+
25+
};
26+
27+
}

ydb/core/tx/columnshard/transactions/ya.make

+1-6
Original file line numberDiff line numberDiff line change
@@ -2,19 +2,14 @@ LIBRARY()
22

33
SRCS(
44
tx_controller.cpp
5+
locks_db.cpp
56
)
67

78
PEERDIR(
89
ydb/core/tablet_flat
910
ydb/core/tx/data_events
1011
)
1112

12-
IF (OS_WINDOWS)
13-
CFLAGS(
14-
-DKIKIMR_DISABLE_S3_OPS
15-
)
16-
ENDIF()
17-
1813
YQL_LAST_ABI_VERSION()
1914

2015
END()

ydb/core/tx/datashard/build_and_wait_dependencies_unit.cpp

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
#include "datashard_impl.h"
2-
#include "datashard_counters.h"
32
#include "datashard_pipeline.h"
43
#include "execution_unit_ctors.h"
54

5+
#include <ydb/core/tx/locks/time_counters.h>
6+
67
namespace NKikimr {
78
namespace NDataShard {
89

ydb/core/tx/datashard/datashard__engine_host.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
#include "datashard_impl.h"
33
#include "datashard_user_db.h"
44
#include "datashard__engine_host.h"
5-
#include "sys_tables.h"
5+
#include <ydb/core/tx/locks/sys_tables.h>
66

77
#include <ydb/core/engine/minikql/minikql_engine_host.h>
88
#include <ydb/core/kqp/rm_service/kqp_rm_service.h>

ydb/core/tx/datashard/datashard_active_transaction.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@
22

33
#include "datashard_active_transaction.h"
44
#include "datashard_kqp.h"
5-
#include "datashard_locks.h"
65
#include "datashard_impl.h"
76
#include "datashard_failpoints.h"
87
#include "key_conflicts.h"
98

9+
#include <ydb/core/tx/locks/locks.h>
1010
#include <ydb/library/actors/util/memory_track.h>
1111

1212
namespace NKikimr {

ydb/core/tx/datashard/datashard_active_transaction.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#pragma once
22

33
#include "datashard.h"
4-
#include "datashard_locks.h"
4+
#include <ydb/core/tx/locks/locks.h>
55
#include "datashard__engine_host.h"
66
#include "operation.h"
77

ydb/core/tx/datashard/datashard_dep_tracker.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
#include "datashard.h"
44
#include "datashard_user_table.h"
55
#include "datashard_active_transaction.h"
6-
#include "range_treap.h"
6+
#include <ydb/core/tx/locks/range_treap.h>
77

88
#include <library/cpp/containers/absl_flat_hash/flat_hash_map.h>
99

0 commit comments

Comments
 (0)