Skip to content

Commit b87553f

Browse files
authored
Stable cs to analytics (#9484)
1 parent 79bc3c2 commit b87553f

File tree

587 files changed

+19209
-6264
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

587 files changed

+19209
-6264
lines changed

.github/config/muted_ya.txt

+2-6
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,6 @@ ydb/core/kqp/provider/ut KikimrIcGateway.TestLoadBasicSecretValueFromExternalDat
1111
ydb/core/kqp/ut/join KqpJoinOrder.Chain65Nodes
1212
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.*
1313
ydb/core/kqp/ut/olap KqpOlapStatistics.StatsUsageWithTTL
14-
ydb/core/kqp/ut/olap KqpOlapAggregations.Aggregation_ResultCountAll_FilterL
15-
ydb/core/kqp/ut/olap KqpOlapWrite.WriteDeleteCleanGC
1614
ydb/core/kqp/ut/pg KqpPg.CreateIndex
1715
ydb/core/kqp/ut/tx KqpLocksTricky.TestNoLocksIssueInteractiveTx+withSink
1816
ydb/core/kqp/ut/tx KqpLocksTricky.TestNoLocksIssue+withSink
@@ -33,10 +31,6 @@ ydb/core/kqp/ut/federated_query/s3 KqpFederatedQuery.CreateTableAsSelectFromExte
3331
ydb/core/kqp/ut/federated_query/s3 KqpFederatedQuery.CreateTableAsSelectFromExternalTable
3432
ydb/core/kqp/ut/scan KqpRequestContext.TraceIdInErrorMessage
3533
ydb/core/kqp/ut/scheme KqpOlapScheme.TenThousandColumns
36-
ydb/core/kqp/ut/scheme KqpOlap.OlapRead_GenericQuerys
37-
ydb/core/kqp/ut/scheme KqpOlap.OlapRead_StreamGenericQuery
38-
ydb/core/kqp/ut/scheme KqpOlap.OlapRead_UsesGenericQueryOnJoinWithDataShardTable
39-
ydb/core/kqp/ut/scheme KqpOlapScheme.DropTable
4034
ydb/core/kqp/ut/scheme KqpScheme.AlterAsyncReplication
4135
ydb/core/kqp/ut/scheme KqpScheme.QueryWithAlter
4236
ydb/core/kqp/ut/scheme [14/50]*
@@ -122,4 +116,6 @@ ydb/tests/functional/tenants test_storage_config.py.TestStorageConfig.*
122116
ydb/tests/functional/tenants test_tenants.py.*
123117
ydb/tests/functional/ydb_cli test_ydb_impex.py.TestImpex.test_big_dataset*
124118
ydb/tests/tools/pq_read/test test_timeout.py.TestTimeout.test_timeout
119+
ydb/core/kqp/ut/olap KqpDecimalColumnShard.TestFilterCompare
120+
ydb/core/kqp/ut/olap KqpDecimalColumnShard.TestAggregation
125121
ydb/tests/functional/rename [test_rename.py */10] chunk chunk

ydb/core/base/events.h

+2
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,8 @@ struct TKikimrEvents : TEvents {
179179
ES_TX_BACKGROUND = 4256,
180180
ES_SS_BG_TASKS = 4257,
181181
ES_LIMITER = 4258,
182+
//ES_MEMORY = 4259, NB. exists in main
183+
ES_GROUPED_ALLOCATIONS_MANAGER = 4260,
182184
};
183185
};
184186

ydb/core/driver_lib/run/kikimr_services_initializers.cpp

+23
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,9 @@
186186
#include <ydb/core/tx/limiter/usage/config.h>
187187
#include <ydb/core/tx/limiter/usage/service.h>
188188

189+
#include <ydb/core/tx/limiter/grouped_memory/usage/config.h>
190+
#include <ydb/core/tx/limiter/grouped_memory/usage/service.h>
191+
189192
#include <ydb/core/backup/controller/tablet.h>
190193

191194
#include <ydb/services/ext_index/common/config.h>
@@ -2179,6 +2182,26 @@ void TKqpServiceInitializer::InitializeServices(NActors::TActorSystemSetup* setu
21792182
}
21802183
}
21812184

2185+
TGroupedMemoryLimiterInitializer::TGroupedMemoryLimiterInitializer(const TKikimrRunConfig& runConfig)
2186+
: IKikimrServicesInitializer(runConfig) {
2187+
}
2188+
2189+
void TGroupedMemoryLimiterInitializer::InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) {
2190+
NOlap::NGroupedMemoryManager::TConfig serviceConfig;
2191+
Y_ABORT_UNLESS(serviceConfig.DeserializeFromProto(Config.GetGroupedMemoryLimiterConfig()));
2192+
2193+
if (serviceConfig.IsEnabled()) {
2194+
TIntrusivePtr<::NMonitoring::TDynamicCounters> tabletGroup = GetServiceCounters(appData->Counters, "tablets");
2195+
TIntrusivePtr<::NMonitoring::TDynamicCounters> countersGroup = tabletGroup->GetSubgroup("type", "TX_GROUPED_MEMORY_LIMITER");
2196+
2197+
auto service = NOlap::NGroupedMemoryManager::TScanMemoryLimiterOperator::CreateService(serviceConfig, countersGroup);
2198+
2199+
setup->LocalServices.push_back(std::make_pair(
2200+
NOlap::NGroupedMemoryManager::TScanMemoryLimiterOperator::MakeServiceId(NodeId),
2201+
TActorSetupCmd(service, TMailboxType::HTSwap, appData->UserPoolId)));
2202+
}
2203+
}
2204+
21822205
TCompDiskLimiterInitializer::TCompDiskLimiterInitializer(const TKikimrRunConfig& runConfig)
21832206
: IKikimrServicesInitializer(runConfig) {
21842207
}

ydb/core/driver_lib/run/kikimr_services_initializers.h

+6
Original file line numberDiff line numberDiff line change
@@ -397,6 +397,12 @@ class TCompDiskLimiterInitializer: public IKikimrServicesInitializer {
397397
void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override;
398398
};
399399

400+
class TGroupedMemoryLimiterInitializer: public IKikimrServicesInitializer {
401+
public:
402+
TGroupedMemoryLimiterInitializer(const TKikimrRunConfig& runConfig);
403+
void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override;
404+
};
405+
400406
class TCompConveyorInitializer: public IKikimrServicesInitializer {
401407
public:
402408
TCompConveyorInitializer(const TKikimrRunConfig& runConfig);

ydb/core/driver_lib/run/run.cpp

+4
Original file line numberDiff line numberDiff line change
@@ -1554,6 +1554,10 @@ TIntrusivePtr<TServiceInitializersList> TKikimrRunner::CreateServiceInitializers
15541554
sil->AddServiceInitializer(new TCompDiskLimiterInitializer(runConfig));
15551555
}
15561556

1557+
if (serviceMask.EnableGroupedMemoryLimiter) {
1558+
sil->AddServiceInitializer(new TGroupedMemoryLimiterInitializer(runConfig));
1559+
}
1560+
15571561
if (serviceMask.EnableScanConveyor) {
15581562
sil->AddServiceInitializer(new TScanConveyorInitializer(runConfig));
15591563
}

ydb/core/driver_lib/run/service_mask.h

+1
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ union TBasicKikimrServicesMask {
7878
bool EnableDatabaseMetadataCache:1;
7979
bool EnableGraphService:1;
8080
bool EnableCompDiskLimiter:1;
81+
bool EnableGroupedMemoryLimiter:1;
8182
};
8283

8384
struct {

ydb/core/driver_lib/run/ya.make

+1
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ PEERDIR(
111111
ydb/core/tx/coordinator
112112
ydb/core/tx/conveyor/service
113113
ydb/core/tx/limiter/service
114+
ydb/core/tx/limiter/grouped_memory/usage
114115
ydb/core/tx/datashard
115116
ydb/core/tx/long_tx_service
116117
ydb/core/tx/long_tx_service/public
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
#include "accessor.h"
2+
3+
#include <ydb/core/formats/arrow/arrow_helpers.h>
4+
#include <ydb/core/formats/arrow/permutations.h>
5+
#include <ydb/core/formats/arrow/save_load/saver.h>
6+
#include <ydb/core/formats/arrow/size_calcer.h>
7+
#include <ydb/core/formats/arrow/splitter/simple.h>
8+
#include <ydb/core/formats/arrow/switch/compare.h>
9+
#include <ydb/core/formats/arrow/switch/switch_type.h>
10+
11+
#include <ydb/library/actors/core/log.h>
12+
13+
namespace NKikimr::NArrow::NAccessor {
14+
15+
void IChunkedArray::TReader::AppendPositionTo(arrow::ArrayBuilder& builder, const ui64 position, ui64* recordSize) const {
16+
auto address = GetReadChunk(position);
17+
AFL_VERIFY(NArrow::Append(builder, *address.GetArray(), address.GetPosition(), recordSize));
18+
}
19+
20+
std::shared_ptr<arrow::Array> IChunkedArray::TReader::CopyRecord(const ui64 recordIndex) const {
21+
auto address = GetReadChunk(recordIndex);
22+
return NArrow::CopyRecords(address.GetArray(), { address.GetPosition() });
23+
}
24+
25+
std::shared_ptr<arrow::ChunkedArray> IChunkedArray::Slice(const ui32 offset, const ui32 count) const {
26+
AFL_VERIFY(offset + count <= (ui64)GetRecordsCount())("offset", offset)("count", count)("length", GetRecordsCount());
27+
ui32 currentOffset = offset;
28+
ui32 countLeast = count;
29+
std::vector<std::shared_ptr<arrow::Array>> chunks;
30+
auto address = GetChunkSlow(offset);
31+
while (countLeast) {
32+
address = GetChunk(address.GetAddress(), currentOffset);
33+
const ui64 internalPos = address.GetAddress().GetLocalIndex(currentOffset);
34+
if (internalPos + countLeast <= (ui64)address.GetArray()->length()) {
35+
chunks.emplace_back(address.GetArray()->Slice(internalPos, countLeast));
36+
break;
37+
} else {
38+
const ui32 deltaCount = address.GetArray()->length() - internalPos;
39+
chunks.emplace_back(address.GetArray()->Slice(internalPos, deltaCount));
40+
AFL_VERIFY(countLeast >= deltaCount);
41+
countLeast -= deltaCount;
42+
currentOffset += deltaCount;
43+
}
44+
}
45+
return std::make_shared<arrow::ChunkedArray>(chunks, DataType);
46+
}
47+
48+
NKikimr::NArrow::NAccessor::IChunkedArray::TFullDataAddress IChunkedArray::GetChunk(
49+
const std::optional<TAddressChain>& chunkCurrent, const ui64 position) const {
50+
AFL_VERIFY(position < GetRecordsCount());
51+
std::optional<TCommonChunkAddress> address;
52+
53+
if (IsDataOwner()) {
54+
if (chunkCurrent) {
55+
AFL_VERIFY(chunkCurrent->GetSize() == 1)("size", chunkCurrent->GetSize());
56+
}
57+
auto localAddress = GetLocalData(address, position);
58+
TAddressChain addressChain;
59+
addressChain.Add(localAddress.GetAddress());
60+
AFL_VERIFY(addressChain.Contains(position));
61+
return TFullDataAddress(localAddress.GetArray(), std::move(addressChain));
62+
} else {
63+
auto chunkedArrayAddress = GetArray(chunkCurrent, position, nullptr);
64+
if (chunkCurrent) {
65+
AFL_VERIFY(chunkCurrent->GetSize() == 1 + chunkedArrayAddress.GetAddress().GetSize())("current", chunkCurrent->GetSize())(
66+
"chunked", chunkedArrayAddress.GetAddress().GetSize());
67+
}
68+
auto localAddress = chunkedArrayAddress.GetArray()->GetLocalData(address, chunkedArrayAddress.GetAddress().GetLocalIndex(position));
69+
auto fullAddress = std::move(chunkedArrayAddress.MutableAddress());
70+
fullAddress.Add(localAddress.GetAddress());
71+
AFL_VERIFY(fullAddress.Contains(position));
72+
return TFullDataAddress(localAddress.GetArray(), std::move(fullAddress));
73+
}
74+
}
75+
76+
IChunkedArray::TFullChunkedArrayAddress IChunkedArray::GetArray(
77+
const std::optional<TAddressChain>& chunkCurrent, const ui64 position, const std::shared_ptr<IChunkedArray>& selfPtr) const {
78+
AFL_VERIFY(position < GetRecordsCount());
79+
if (IsDataOwner()) {
80+
AFL_VERIFY(selfPtr);
81+
TAddressChain chain;
82+
chain.Add(TCommonChunkAddress(0, GetRecordsCount(), 0));
83+
return IChunkedArray::TFullChunkedArrayAddress(selfPtr, std::move(chain));
84+
}
85+
TAddressChain addressChain;
86+
87+
auto* currentLevel = this;
88+
ui32 currentPosition = position;
89+
ui32 idx = 0;
90+
std::vector<std::shared_ptr<IChunkedArray>> chainForTemporarySave;
91+
while (!currentLevel->IsDataOwner()) {
92+
std::optional<TCommonChunkAddress> currentAddress;
93+
if (chunkCurrent) {
94+
currentAddress = chunkCurrent->GetAddress(idx);
95+
}
96+
auto nextChunkedArray = currentLevel->GetLocalChunkedArray(currentAddress, currentPosition);
97+
chainForTemporarySave.emplace_back(nextChunkedArray.GetArray());
98+
currentLevel = chainForTemporarySave.back().get();
99+
addressChain.Add(nextChunkedArray.GetAddress());
100+
AFL_VERIFY(nextChunkedArray.GetAddress().GetStartPosition() <= currentPosition);
101+
currentPosition -= nextChunkedArray.GetAddress().GetStartPosition();
102+
++idx;
103+
}
104+
AFL_VERIFY(!chunkCurrent || chunkCurrent->GetSize() - idx <= 1)("idx", idx)("size", chunkCurrent->GetSize());
105+
return TFullChunkedArrayAddress(chainForTemporarySave.back(), std::move(addressChain));
106+
}
107+
108+
TString IChunkedArray::TReader::DebugString(const ui32 position) const {
109+
auto address = GetReadChunk(position);
110+
return NArrow::DebugString(address.GetArray(), address.GetPosition());
111+
}
112+
113+
std::partial_ordering IChunkedArray::TReader::CompareColumns(
114+
const std::vector<TReader>& l, const ui64 lPosition, const std::vector<TReader>& r, const ui64 rPosition) {
115+
AFL_VERIFY(l.size() == r.size());
116+
for (ui32 i = 0; i < l.size(); ++i) {
117+
const TAddress lAddress = l[i].GetReadChunk(lPosition);
118+
const TAddress rAddress = r[i].GetReadChunk(rPosition);
119+
auto cmp = lAddress.Compare(rAddress);
120+
if (std::is_neq(cmp)) {
121+
return cmp;
122+
}
123+
}
124+
return std::partial_ordering::equivalent;
125+
}
126+
127+
IChunkedArray::TAddress IChunkedArray::TReader::GetReadChunk(const ui64 position) const {
128+
AFL_VERIFY(position < ChunkedArray->GetRecordsCount());
129+
if (CurrentChunkAddress && CurrentChunkAddress->GetAddress().Contains(position)) {
130+
} else {
131+
CurrentChunkAddress = ChunkedArray->GetChunk(CurrentChunkAddress, position);
132+
}
133+
return IChunkedArray::TAddress(CurrentChunkAddress->GetArray(), CurrentChunkAddress->GetAddress().GetLocalIndex(position));
134+
}
135+
136+
const std::partial_ordering IChunkedArray::TAddress::Compare(const TAddress& item) const {
137+
return TComparator::TypedCompare<true>(*Array, Position, *item.Array, item.Position);
138+
}
139+
140+
TChunkedArraySerialized::TChunkedArraySerialized(const std::shared_ptr<IChunkedArray>& array, const TString& serializedData)
141+
: Array(array)
142+
, SerializedData(serializedData) {
143+
AFL_VERIFY(serializedData);
144+
AFL_VERIFY(Array);
145+
AFL_VERIFY(Array->GetRecordsCount());
146+
}
147+
148+
std::partial_ordering IChunkedArray::TFullDataAddress::Compare(
149+
const ui64 position, const TFullDataAddress& item, const ui64 itemPosition) const {
150+
AFL_VERIFY(Address.Contains(position))("pos", position)("start", Address.DebugString());
151+
AFL_VERIFY(item.Address.Contains(itemPosition))("pos", itemPosition)("start", item.Address.DebugString());
152+
return TComparator::TypedCompare<true>(*Array, Address.GetLocalIndex(position), *item.Array, item.Address.GetLocalIndex(itemPosition));
153+
}
154+
155+
std::shared_ptr<arrow::Array> IChunkedArray::TFullDataAddress::CopyRecord(const ui64 recordIndex) const {
156+
return NArrow::CopyRecords(Array, { Address.GetLocalIndex(recordIndex) });
157+
}
158+
159+
TString IChunkedArray::TFullDataAddress::DebugString(const ui64 position) const {
160+
return NArrow::DebugString(Array, Address.GetLocalIndex(position));
161+
}
162+
163+
} // namespace NKikimr::NArrow::NAccessor

0 commit comments

Comments
 (0)