Skip to content

Commit ab051d7

Browse files
Correct index construction (#6500)
1 parent 85d4495 commit ab051d7

File tree

128 files changed

+1435
-2106
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

128 files changed

+1435
-2106
lines changed
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
#include "serialization.h"
2+
#include <ydb/core/formats/arrow/switch/switch_type.h>
3+
#include <ydb/library/actors/core/log.h>
4+
5+
namespace NKikimr::NArrow::NScalar {
6+
7+
// Serializes the value of a fixed-width (C-typed) scalar into a raw byte string.
//
// The result contains exactly sizeof(c_type) bytes copied from the scalar's value storage.
// No type tag is written, so the reader has to supply the data type again
// (see DeserializeFromStringWithPayload).
//
// @param scalar  scalar to serialize; must not be null.
// @return the payload bytes, or a failed conclusion when the scalar's type has no C representation
//         (arrow::has_c_type is false) or is not handled by NArrow::SwitchType.
//
// NOTE(review): scalar->is_valid is not encoded; a null-valued scalar is written as whatever bytes
// its value storage holds — confirm that callers never pass null-valued scalars.
TConclusion<TString> TSerializer::SerializePayloadToString(const std::shared_ptr<arrow::Scalar>& scalar) {
    // Same contract as the deserialization path, which verifies its pointer argument:
    // stop with a diagnosable verification failure instead of dereferencing a null pointer below.
    AFL_VERIFY(scalar);
    TString resultString;
    const bool resultFlag = NArrow::SwitchType(scalar->type->id(), [&](const auto& type) {
        using TWrap = std::decay_t<decltype(type)>;
        if constexpr (arrow::has_c_type<typename TWrap::T>()) {
            using CType = typename TWrap::T::c_type;
            using ScalarType = typename arrow::TypeTraits<typename TWrap::T>::ScalarType;
            const ScalarType* scalarTyped = static_cast<const ScalarType*>(scalar.get());
            // Pre-size the buffer, then copy the raw value bytes into it.
            resultString = TString(sizeof(CType), '\0');
            memcpy(&resultString[0], scalarTyped->data(), sizeof(CType));
            return true;
        }
        return false;
    });
    if (!resultFlag) {
        return TConclusionStatus::Fail("incorrect scalar type for payload serialization: " + scalar->type->ToString());
    }
    return resultString;
}
26+
27+
// Restores a fixed-width (C-typed) scalar from the raw bytes produced by SerializePayloadToString.
//
// @param data      payload; must contain exactly sizeof(c_type) bytes for the requested type.
// @param dataType  arrow type of the scalar to build; must not be null.
// @return the reconstructed scalar, or a failed conclusion when
//         - the payload size does not match the size of the type's C representation, or
//         - the type has no C representation / is not handled by NArrow::SwitchType.
TConclusion<std::shared_ptr<arrow::Scalar>> TSerializer::DeserializeFromStringWithPayload(const TString& data, const std::shared_ptr<arrow::DataType>& dataType) {
    AFL_VERIFY(dataType);
    std::shared_ptr<arrow::Scalar> result;
    // Set by the visitor when the type is supported but the payload has an unexpected length.
    bool sizeMismatch = false;
    const bool resultFlag = NArrow::SwitchType(dataType->id(), [&](const auto& type) {
        using TWrap = std::decay_t<decltype(type)>;
        if constexpr (arrow::has_c_type<typename TWrap::T>()) {
            using CType = typename TWrap::T::c_type;
            // The payload is external data: report a malformed size through the conclusion
            // instead of aborting the process.
            if (data.size() != sizeof(CType)) {
                sizeMismatch = true;
                return false;
            }
            using ScalarType = typename arrow::TypeTraits<typename TWrap::T>::ScalarType;
            // Copy the bytes into a properly aligned local value; reading the string buffer through
            // a casted CType pointer would be a misaligned / aliasing-violating access.
            CType value{};
            memcpy(&value, data.data(), sizeof(CType));
            result = std::make_shared<ScalarType>(value, dataType);
            return true;
        }
        return false;
    });
    if (sizeMismatch) {
        return TConclusionStatus::Fail("incorrect payload size for scalar deserialization: " + dataType->ToString());
    }
    if (!resultFlag) {
        return TConclusionStatus::Fail("incorrect scalar type for payload deserialization: " + dataType->ToString());
    }
    return result;
}
46+
47+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
#pragma once
2+
#include <ydb/library/conclusion/result.h>
3+
4+
#include <contrib/libs/apache/arrow/cpp/src/arrow/scalar.h>
5+
#include <contrib/libs/apache/arrow/cpp/src/arrow/type.h>
6+
7+
#include <util/generic/string.h>
8+
9+
namespace NKikimr::NArrow::NScalar {
10+
// Static helpers that convert fixed-width arrow scalars to and from a raw byte payload.
// The payload carries only the value bytes; the data type is supplied separately on deserialization.
class TSerializer {
11+
public:
12+
// Copies the scalar's value bytes into a string; returns a failed conclusion for scalar types
// that have no fixed-width C representation.
static TConclusion<TString> SerializePayloadToString(const std::shared_ptr<arrow::Scalar>& scalar);
13+
// Builds a scalar of the given data type from a payload produced by SerializePayloadToString;
// returns a failed conclusion for data types that have no fixed-width C representation.
static TConclusion<std::shared_ptr<arrow::Scalar>> DeserializeFromStringWithPayload(const TString& data, const std::shared_ptr<arrow::DataType>& dataType);
14+
};
15+
}

ydb/core/formats/arrow/scalar/ya.make

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
LIBRARY()
2+
3+
PEERDIR(
4+
contrib/libs/apache/arrow
5+
ydb/library/conclusion
6+
ydb/core/formats/arrow/switch
7+
ydb/library/actors/core
8+
)
9+
10+
SRCS(
11+
serialization.cpp
12+
)
13+
14+
END()

ydb/core/formats/arrow/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ PEERDIR(
1212
ydb/core/formats/arrow/dictionary
1313
ydb/core/formats/arrow/transformer
1414
ydb/core/formats/arrow/reader
15+
ydb/core/formats/arrow/scalar
1516
ydb/core/formats/arrow/hash
1617
ydb/library/actors/core
1718
ydb/library/arrow_kernels

ydb/core/kqp/gateway/behaviour/tablestore/operations/alter_sharding.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include "alter_sharding.h"
2+
#include <ydb/library/actors/core/log.h>
23
#include <util/string/type.h>
34

45
namespace NKikimr::NKqp {
@@ -26,4 +27,8 @@ void TAlterShardingOperation::DoSerializeScheme(NKikimrSchemeOp::TModifyScheme&
2627
scheme.MutableAlterColumnTable()->MutableReshardColumnTable()->SetIncrease(*Increase);
2728
}
2829

30+
// Overload taking TAlterColumnTableSchema: it ignores its argument and unconditionally trips
// AFL_VERIFY(false). The resharding request itself is written by the TModifyScheme overload
// of DoSerializeScheme.
// NOTE(review): presumably this overload is never expected to be called for this operation — confirm against callers.
void TAlterShardingOperation::DoSerializeScheme(NKikimrSchemeOp::TAlterColumnTableSchema& /*scheme*/) const {
31+
AFL_VERIFY(false);
32+
}
33+
2934
}

ydb/core/kqp/gateway/behaviour/tablestore/operations/alter_sharding.h

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
#include "abstract.h"
2-
#include <ydb/core/tx/columnshard/engines/scheme/statistics/abstract/constructor.h>
32

43
namespace NKikimr::NKqp {
54

@@ -12,9 +11,7 @@ class TAlterShardingOperation: public ITableStoreOperation {
1211
static inline const auto Registrator = TFactory::TRegistrator<TAlterShardingOperation>(GetTypeName());
1312
private:
1413
std::optional<bool> Increase;
15-
virtual void DoSerializeScheme(NKikimrSchemeOp::TAlterColumnTableSchema& /*scheme*/) const override {
16-
AFL_VERIFY(false);
17-
}
14+
virtual void DoSerializeScheme(NKikimrSchemeOp::TAlterColumnTableSchema& /*scheme*/) const override;
1815
virtual void DoSerializeScheme(NKikimrSchemeOp::TModifyScheme& scheme, const bool isStandalone) const override;
1916

2017
public:

ydb/core/kqp/gateway/behaviour/tablestore/operations/drop_stat.cpp

Lines changed: 0 additions & 21 deletions
This file was deleted.

ydb/core/kqp/gateway/behaviour/tablestore/operations/drop_stat.h

Lines changed: 0 additions & 19 deletions
This file was deleted.

ydb/core/kqp/gateway/behaviour/tablestore/operations/upsert_index.cpp

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,6 @@ TConclusionStatus TUpsertIndexOperation::DoDeserialize(NYql::TObjectSettingsImpl
1212
}
1313
IndexName = *fValue;
1414
}
15-
StorageId = features.Extract("STORAGE_ID");
16-
if (StorageId && !*StorageId) {
17-
return TConclusionStatus::Fail("STORAGE_ID cannot be empty string");
18-
}
1915
TString indexType;
2016
{
2117
auto fValue = features.Extract("TYPE");
@@ -46,9 +42,6 @@ TConclusionStatus TUpsertIndexOperation::DoDeserialize(NYql::TObjectSettingsImpl
4642

4743
void TUpsertIndexOperation::DoSerializeScheme(NKikimrSchemeOp::TAlterColumnTableSchema& schemaData) const {
4844
auto* indexProto = schemaData.AddUpsertIndexes();
49-
if (StorageId) {
50-
indexProto->SetStorageId(*StorageId);
51-
}
5245
indexProto->SetName(IndexName);
5346
IndexMetaConstructor.SerializeToProto(*indexProto);
5447
}

ydb/core/kqp/gateway/behaviour/tablestore/operations/upsert_index.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ class TUpsertIndexOperation : public ITableStoreOperation {
1212
static inline auto Registrator = TFactory::TRegistrator<TUpsertIndexOperation>(GetTypeName());
1313
private:
1414
TString IndexName;
15-
std::optional<TString> StorageId;
1615
NBackgroundTasks::TInterfaceProtoContainer<NOlap::NIndexes::IIndexMetaConstructor> IndexMetaConstructor;
1716
public:
1817
TConclusionStatus DoDeserialize(NYql::TObjectSettingsImpl::TFeaturesExtractor& features) override;

ydb/core/kqp/gateway/behaviour/tablestore/operations/upsert_stat.cpp

Lines changed: 0 additions & 49 deletions
This file was deleted.

ydb/core/kqp/gateway/behaviour/tablestore/operations/upsert_stat.h

Lines changed: 0 additions & 23 deletions
This file was deleted.

ydb/core/kqp/gateway/behaviour/tablestore/operations/ya.make

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,13 @@ SRCS(
77
GLOBAL drop_column.cpp
88
GLOBAL upsert_index.cpp
99
GLOBAL drop_index.cpp
10-
GLOBAL upsert_stat.cpp
11-
GLOBAL drop_stat.cpp
1210
GLOBAL upsert_opt.cpp
1311
GLOBAL alter_sharding.cpp
1412
)
1513

1614
PEERDIR(
1715
ydb/services/metadata/manager
1816
ydb/core/formats/arrow/serializer
19-
ydb/core/tx/columnshard/engines/scheme/statistics/abstract
2017
ydb/core/tx/columnshard/engines/storage/optimizer/abstract
2118
ydb/core/kqp/gateway/utils
2219
ydb/core/protos

ydb/core/kqp/ut/olap/helpers/typed_local.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -143,16 +143,16 @@ void TTypedLocalHelper::FillPKOnly(const double pkKff /*= 0*/, const ui32 numRow
143143
TBase::SendDataViaActorSystem(TablePath, batch);
144144
}
145145

146-
void TTypedLocalHelper::GetStats(std::vector<NKikimrColumnShardStatisticsProto::TPortionStorage>& stats, const bool verbose /*= false*/) {
146+
void TTypedLocalHelper::GetStats(std::vector<NJson::TJsonValue>& stats, const bool verbose /*= false*/) {
147147
TString selectQuery = "SELECT * FROM `" + TablePath + "/.sys/primary_index_portion_stats` WHERE Activity = true";
148148
auto tableClient = KikimrRunner.GetTableClient();
149149
auto rows = ExecuteScanQuery(tableClient, selectQuery, verbose);
150150
for (auto&& r : rows) {
151151
for (auto&& c : r) {
152152
if (c.first == "Stats") {
153-
NKikimrColumnShardStatisticsProto::TPortionStorage store;
154-
AFL_VERIFY(google::protobuf::TextFormat::ParseFromString(GetUtf8(c.second), &store));
155-
stats.emplace_back(store);
153+
NJson::TJsonValue jsonStore;
154+
AFL_VERIFY(NJson::ReadJsonFastTree(GetUtf8(c.second), &jsonStore));
155+
stats.emplace_back(jsonStore);
156156
}
157157
}
158158
}

ydb/core/kqp/ut/olap/helpers/typed_local.h

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
#pragma once
22
#include <ydb/core/testlib/cs_helper.h>
33
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
4-
#include <ydb/public/sdk/cpp/client/ydb_types/status_codes.h>
54
#include <ydb/core/formats/arrow/simple_builder/array.h>
65
#include <ydb/core/formats/arrow/simple_builder/batch.h>
76
#include <ydb/core/formats/arrow/simple_builder/filler.h>
87

8+
#include <ydb/public/sdk/cpp/client/ydb_types/status_codes.h>
9+
10+
#include <library/cpp/json/writer/json_value.h>
11+
912
namespace NKikimr::NKqp {
1013

1114
class TTypedLocalHelper: public Tests::NCS::THelper {
@@ -19,7 +22,7 @@ class TTypedLocalHelper: public Tests::NCS::THelper {
1922
protected:
2023
virtual TString GetTestTableSchema() const override;
2124
virtual std::vector<TString> GetShardingColumns() const override {
22-
return {"pk_int"};
25+
return { "pk_int" };
2326
}
2427
public:
2528
TTypedLocalHelper(const TString& typeName, TKikimrRunner& kikimrRunner, const TString& tableName = "olapTable", const TString& storeName = "olapStore")
@@ -66,7 +69,7 @@ class TTypedLocalHelper: public Tests::NCS::THelper {
6669

6770
void GetVolumes(ui64& rawBytes, ui64& bytes, const bool verbose = false, const std::vector<TString> columnNames = {});
6871

69-
void GetStats(std::vector<NKikimrColumnShardStatisticsProto::TPortionStorage>& stats, const bool verbose = false);
72+
void GetStats(std::vector<NJson::TJsonValue>& stats, const bool verbose = false);
7073

7174
void GetCount(ui64& count);
7275

0 commit comments

Comments
 (0)