Skip to content

Commit 8e415f0

Browse files
authored
[yt provider] Column group hint (#5727)
1 parent 5bff5d6 commit 8e415f0

File tree

68 files changed

+876
-96
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

68 files changed

+876
-96
lines changed

ydb/library/yql/providers/yt/gateway/file/yql_yt_file.cpp

+8-2
Original file line numberDiff line numberDiff line change
@@ -933,8 +933,14 @@ class TYtFileGateway : public IYtGateway {
933933
const auto nativeYtTypeCompatibility = options.Config()->NativeYtTypeCompatibility.Get(cluster).GetOrElse(NTCF_LEGACY);
934934
const bool rowSpecCompactForm = options.Config()->UseYqlRowSpecCompactForm.Get().GetOrElse(DEFAULT_ROW_SPEC_COMPACT_FORM);
935935
dstRowSpec->FillAttrNode(attrs[YqlRowSpecAttribute], nativeYtTypeCompatibility, rowSpecCompactForm);
936-
if (!append || !attrs.HasKey("schema")) {
937-
attrs["schema"] = RowSpecToYTSchema(spec[YqlRowSpecAttribute], nativeYtTypeCompatibility).ToNode();
936+
NYT::TNode columnGroupsSpec;
937+
if (options.Config()->OptimizeFor.Get(cluster).GetOrElse(NYT::OF_LOOKUP_ATTR) != NYT::OF_LOOKUP_ATTR) {
938+
if (auto setting = NYql::GetSetting(publish.Settings().Ref(), EYtSettingType::ColumnGroups)) {
939+
columnGroupsSpec = NYT::NodeFromYsonString(setting->Tail().Content());
940+
}
941+
}
942+
if (!append || !attrs.HasKey("schema") || !columnGroupsSpec.IsUndefined()) {
943+
attrs["schema"] = RowSpecToYTSchema(spec[YqlRowSpecAttribute], nativeYtTypeCompatibility, columnGroupsSpec).ToNode();
938944
}
939945
TOFStream ofAttr(destFilePath + ".attr");
940946
ofAttr.Write(NYT::NodeToYsonString(attrs, NYson::EYsonFormat::Pretty));

ydb/library/yql/providers/yt/gateway/native/yql_yt_native.cpp

+21-7
Original file line numberDiff line numberDiff line change
@@ -794,7 +794,6 @@ class TYtNativeGateway : public IYtGateway {
794794
mode = FromString<EYtWriteMode>(modeSetting->Child(1)->Content());
795795
}
796796
const bool initial = NYql::HasSetting(publish.Settings().Ref(), EYtSettingType::Initial);
797-
const bool monotonicKeys = NYql::HasSetting(publish.Settings().Ref(), EYtSettingType::MonotonicKeys);
798797

799798
std::unordered_map<EYtSettingType, TString> strOpts;
800799
for (const auto& setting : publish.Settings().Ref().Children()) {
@@ -815,9 +814,15 @@ class TYtNativeGateway : public IYtGateway {
815814
TVector<TString> src;
816815
ui64 chunksCount = 0;
817816
ui64 dataSize = 0;
817+
std::unordered_set<TString> columnGroups;
818818
for (auto out: publish.Input()) {
819819
auto outTable = GetOutTable(out).Cast<TYtOutTable>();
820820
src.emplace_back(outTable.Name().Value());
821+
if (auto columnGroupSetting = NYql::GetSetting(outTable.Settings().Ref(), EYtSettingType::ColumnGroups)) {
822+
columnGroups.emplace(columnGroupSetting->Tail().Content());
823+
} else {
824+
columnGroups.emplace();
825+
}
821826
auto stat = TYtTableStatInfo(outTable.Stat());
822827
chunksCount += stat.ChunkCount;
823828
dataSize += stat.DataSize;
@@ -828,6 +833,7 @@ class TYtNativeGateway : public IYtGateway {
828833
if (src.size() > 10) {
829834
YQL_CLOG(INFO, ProviderYt) << "...total input tables=" << src.size();
830835
}
836+
TString srcColumnGroups = columnGroups.size() == 1 ? *columnGroups.cbegin() : TString();
831837

832838
bool combineChunks = false;
833839
if (auto minChunkSize = options.Config()->MinPublishedAvgChunkSize.Get()) {
@@ -858,9 +864,9 @@ class TYtNativeGateway : public IYtGateway {
858864
const ui32 dstEpoch = TEpochInfo::Parse(publish.Publish().Epoch().Ref()).GetOrElse(0);
859865
auto execCtx = MakeExecCtx(std::move(options), session, cluster, node.Get(), &ctx);
860866

861-
return session->Queue_->Async([execCtx, src = std::move(src), dst, dstEpoch, isAnonymous, mode, initial, monotonicKeys, combineChunks, strOpts = std::move(strOpts)] () {
867+
return session->Queue_->Async([execCtx, src = std::move(src), dst, dstEpoch, isAnonymous, mode, initial, srcColumnGroups, combineChunks, strOpts = std::move(strOpts)] () {
862868
YQL_LOG_CTX_ROOT_SESSION_SCOPE(execCtx->LogCtx_);
863-
return ExecPublish(execCtx, src, dst, dstEpoch, isAnonymous, mode, initial, monotonicKeys, combineChunks, strOpts);
869+
return ExecPublish(execCtx, src, dst, dstEpoch, isAnonymous, mode, initial, srcColumnGroups, combineChunks, strOpts);
864870
})
865871
.Apply([nodePos] (const TFuture<void>& f) {
866872
try {
@@ -2008,7 +2014,7 @@ class TYtNativeGateway : public IYtGateway {
20082014
const bool isAnonymous,
20092015
EYtWriteMode mode,
20102016
const bool initial,
2011-
const bool monotonicKeys,
2017+
const TString& srcColumnGroups,
20122018
const bool combineChunks,
20132019
const std::unordered_map<EYtSettingType, TString>& strOpts)
20142020
{
@@ -2060,7 +2066,7 @@ class TYtNativeGateway : public IYtGateway {
20602066
TYqlRowSpecInfo::TPtr rowSpec = execCtx->Options_.DestinationRowSpec();
20612067

20622068
bool appendToSorted = false;
2063-
if (EYtWriteMode::Append == mode && !monotonicKeys) {
2069+
if (EYtWriteMode::Append == mode && !strOpts.contains(EYtSettingType::MonotonicKeys)) {
20642070
NYT::TNode attrs = entry->Tx->Get(dstPath + "/@", TGetOptions()
20652071
.AttributeFilter(TAttributeFilter()
20662072
.AddAttribute(TString("sorted_by"))
@@ -2187,6 +2193,14 @@ class TYtNativeGateway : public IYtGateway {
21872193

21882194
#undef DEFINE_OPT
21892195

2196+
NYT::TNode columnGroupsSpec;
2197+
if (const auto it = strOpts.find(EYtSettingType::ColumnGroups); it != strOpts.cend() && execCtx->Options_.Config()->OptimizeFor.Get(cluster).GetOrElse(NYT::OF_LOOKUP_ATTR) != NYT::OF_LOOKUP_ATTR) {
2198+
columnGroupsSpec = NYT::NodeFromYsonString(it->second);
2199+
if (it->second != srcColumnGroups) {
2200+
forceMerge = forceTransform = true;
2201+
}
2202+
}
2203+
21902204
TFuture<void> res;
21912205
if (EYtWriteMode::Flush == mode || EYtWriteMode::Append == mode || srcPaths.size() > 1 || forceMerge) {
21922206
TFuture<bool> cacheCheck = MakeFuture<bool>(false);
@@ -2198,7 +2212,7 @@ class TYtNativeGateway : public IYtGateway {
21982212
appendToSorted, initial, entry, dstPath, dstEpoch, yqlAttrs, combineChunks,
21992213
dstCompressionCodec, dstErasureCodec, dstReplicationFactor, dstMedia, dstPrimaryMedium,
22002214
nativeYtTypeCompatibility, publishTx, cluster,
2201-
commitCheckpoint] (const auto& f) mutable
2215+
commitCheckpoint, columnGroupsSpec = std::move(columnGroupsSpec)] (const auto& f) mutable
22022216
{
22032217
if (f.GetValue()) {
22042218
execCtx->QueryCacheItem.Destroy();
@@ -2237,7 +2251,7 @@ class TYtNativeGateway : public IYtGateway {
22372251
} else {
22382252
NYT::TNode fullSpecYson;
22392253
rowSpec->FillCodecNode(fullSpecYson);
2240-
const auto schema = RowSpecToYTSchema(fullSpecYson, nativeYtTypeCompatibility);
2254+
const auto schema = RowSpecToYTSchema(fullSpecYson, nativeYtTypeCompatibility, columnGroupsSpec);
22412255
ytDst.Schema(schema);
22422256

22432257
if (EYtWriteMode::Append != mode && EYtWriteMode::RenewKeepMeta != mode) {

ydb/library/yql/providers/yt/provider/yql_yt_datasink.cpp

+12-1
Original file line numberDiff line numberDiff line change
@@ -263,11 +263,22 @@ class TYtDataSink : public TDataProviderBase {
263263
ctx);
264264
res = ctx.ChangeChild(*res, TYtWriteTable::idx_Settings, std::move(settings));
265265
}
266+
if (auto columnGroup = NYql::GetSetting(*res->Child(TYtWriteTable::idx_Settings), EYtSettingType::ColumnGroups)) {
267+
const TString normalized = NormalizeColumnGroupSpec(columnGroup->Tail().Content());
268+
res = ctx.ChangeChild(*res, TYtWriteTable::idx_Settings,
269+
NYql::UpdateSettingValue(*res->Child(TYtWriteTable::idx_Settings),
270+
EYtSettingType::ColumnGroups,
271+
ctx.NewAtom(res->Child(TYtWriteTable::idx_Settings)->Pos(), normalized, TNodeFlags::MultilineContent),
272+
ctx)
273+
);
274+
}
266275
auto mutationId = ++NextMutationId_;
267276
res = ctx.ChangeChild(*res, TYtWriteTable::idx_Settings,
268277
NYql::AddSetting(*res->Child(TYtWriteTable::idx_Settings),
269278
EYtSettingType::MutationId,
270-
ctx.NewAtom(res->Child(TYtWriteTable::idx_Settings)->Pos(), ToString(mutationId)), ctx));
279+
ctx.NewAtom(res->Child(TYtWriteTable::idx_Settings)->Pos(), mutationId),
280+
ctx)
281+
);
271282
if (State_->Configuration->UseSystemColumns.Get().GetOrElse(DEFAULT_USE_SYS_COLUMNS)) {
272283
res = ctx.ChangeChild(*res, TYtWriteTable::idx_Content,
273284
ctx.Builder(node->Pos())

ydb/library/yql/providers/yt/provider/yql_yt_datasink_type_ann.cpp

+37-17
Original file line numberDiff line numberDiff line change
@@ -385,13 +385,27 @@ class TYtDataSinkTypeAnnotationTransformer : public TVisitorTransformerBase {
385385
}
386386

387387
TStatus ValidateTableWrite(const TPosition& pos, const TExprNode::TPtr& table, TExprNode::TPtr& content, const TTypeAnnotationNode* itemType,
388-
const TVector<TYqlRowSpecInfo::TPtr>& contentRowSpecs, const TString& cluster, const EYtWriteMode mode, const bool initialWrite, const bool monotonicKeys, TExprContext& ctx) const
388+
const TVector<TYqlRowSpecInfo::TPtr>& contentRowSpecs, const TString& cluster, const TExprNode& settings, TExprContext& ctx) const
389389
{
390390
YQL_ENSURE(itemType);
391391
if (content && !EnsurePersistableType(content->Pos(), *itemType, ctx)) {
392392
return TStatus::Error;
393393
}
394394

395+
EYtWriteMode mode = EYtWriteMode::Renew;
396+
if (auto modeSetting = NYql::GetSetting(settings, EYtSettingType::Mode)) {
397+
mode = FromString<EYtWriteMode>(modeSetting->Child(1)->Content());
398+
}
399+
const bool initialWrite = NYql::HasSetting(settings, EYtSettingType::Initial);
400+
const bool monotonicKeys = NYql::HasSetting(settings, EYtSettingType::MonotonicKeys);
401+
TString columnGroups;
402+
if (auto setting = NYql::GetSetting(settings, EYtSettingType::ColumnGroups)) {
403+
if (!ValidateColumnGroups(*setting, *itemType->Cast<TStructExprType>(), ctx)) {
404+
return TStatus::Error;
405+
}
406+
columnGroups.assign(setting->Tail().Content());
407+
}
408+
395409
if (!initialWrite && mode != EYtWriteMode::Append) {
396410
ctx.AddError(TIssue(pos, TStringBuilder() <<
397411
"Replacing " << TString{table->Child(TYtTable::idx_Name)->Content()}.Quote() << " table content after another table modifications in the same transaction"));
@@ -427,6 +441,14 @@ class TYtDataSinkTypeAnnotationTransformer : public TVisitorTransformerBase {
427441
return TStatus::Error;
428442
}
429443

444+
if (initialWrite && !replaceMeta && columnGroups) {
445+
ctx.AddError(TIssue(pos, TStringBuilder()
446+
<< "Insert with "
447+
<< ToString(EYtSettingType::ColumnGroups).Quote()
448+
<< " to existing table is not allowed"));
449+
return TStatus::Error;
450+
}
451+
430452
if (auto commitEpoch = outTableInfo.CommitEpoch.GetOrElse(0)) {
431453
// Check type compatibility with previous epoch
432454
if (auto nextDescription = State_->TablesData->FindTable(cluster, outTableInfo.Name, commitEpoch)) {
@@ -569,6 +591,16 @@ class TYtDataSinkTypeAnnotationTransformer : public TVisitorTransformerBase {
569591
}
570592
}
571593

594+
if (initialWrite) {
595+
nextDescription.ColumnGroupSpec = columnGroups;
596+
} else if (columnGroups != nextDescription.ColumnGroupSpec) {
597+
ctx.AddError(TIssue(pos, TStringBuilder()
598+
<< "All appends within the same commit should have the equal "
599+
<< ToString(EYtSettingType::ColumnGroups).Quote()
600+
<< " value"));
601+
return TStatus::Error;
602+
}
603+
572604
YQL_ENSURE(nextDescription.RowSpec);
573605
if (contentRowSpecs) {
574606
size_t from = 0;
@@ -1441,6 +1473,7 @@ class TYtDataSinkTypeAnnotationTransformer : public TVisitorTransformerBase {
14411473
| EYtSettingType::Expiration
14421474
| EYtSettingType::MonotonicKeys
14431475
| EYtSettingType::MutationId
1476+
| EYtSettingType::ColumnGroups
14441477
, ctx))
14451478
{
14461479
return TStatus::Error;
@@ -1455,13 +1488,6 @@ class TYtDataSinkTypeAnnotationTransformer : public TVisitorTransformerBase {
14551488
return status.Combine(TStatus::Repeat);
14561489
}
14571490

1458-
EYtWriteMode mode = EYtWriteMode::Renew;
1459-
if (auto modeSetting = NYql::GetSetting(*settings, EYtSettingType::Mode)) {
1460-
mode = FromString<EYtWriteMode>(modeSetting->Child(1)->Content());
1461-
}
1462-
const bool initialWrite = NYql::HasSetting(*settings, EYtSettingType::Initial);
1463-
const bool monotonicKeys = NYql::HasSetting(*settings, EYtSettingType::MonotonicKeys);
1464-
14651491
auto writeTable = TYtWriteTable(input);
14661492
auto cluster = writeTable.DataSink().Cluster().StringValue();
14671493

@@ -1471,7 +1497,7 @@ class TYtDataSinkTypeAnnotationTransformer : public TVisitorTransformerBase {
14711497
}
14721498

14731499
auto content = writeTable.Content().Ptr();
1474-
status = ValidateTableWrite(ctx.GetPosition(input->Pos()), table, content, itemType, {}, cluster, mode, initialWrite, monotonicKeys, ctx);
1500+
status = ValidateTableWrite(ctx.GetPosition(input->Pos()), table, content, itemType, {}, cluster, *settings, ctx);
14751501
if (TStatus::Error == status.Level) {
14761502
return status;
14771503
}
@@ -1719,6 +1745,7 @@ class TYtDataSinkTypeAnnotationTransformer : public TVisitorTransformerBase {
17191745
| EYtSettingType::Expiration
17201746
| EYtSettingType::MonotonicKeys
17211747
| EYtSettingType::MutationId
1748+
| EYtSettingType::ColumnGroups
17221749
, ctx))
17231750
{
17241751
return TStatus::Error;
@@ -1727,13 +1754,6 @@ class TYtDataSinkTypeAnnotationTransformer : public TVisitorTransformerBase {
17271754
if (!NYql::HasSetting(*table->Child(TYtTable::idx_Settings), EYtSettingType::Anonymous)
17281755
|| !table->Child(TYtTable::idx_Name)->Content().StartsWith("tmp/"))
17291756
{
1730-
EYtWriteMode mode = EYtWriteMode::Renew;
1731-
if (auto modeSetting = NYql::GetSetting(*settings, EYtSettingType::Mode)) {
1732-
mode = FromString<EYtWriteMode>(modeSetting->Child(1)->Content());
1733-
}
1734-
const bool initialWrite = NYql::HasSetting(*settings, EYtSettingType::Initial);
1735-
const bool monotonicKeys = NYql::HasSetting(*settings, EYtSettingType::MonotonicKeys);
1736-
17371757
auto publish = TYtPublish(input);
17381758

17391759
TVector<TYqlRowSpecInfo::TPtr> contentRowSpecs;
@@ -1744,7 +1764,7 @@ class TYtDataSinkTypeAnnotationTransformer : public TVisitorTransformerBase {
17441764
}
17451765
}
17461766
TExprNode::TPtr content; // Don't try to convert content
1747-
auto status = ValidateTableWrite(ctx.GetPosition(input->Pos()), table, content, itemType, contentRowSpecs, TString{publish.DataSink().Cluster().Value()}, mode, initialWrite, monotonicKeys, ctx);
1767+
auto status = ValidateTableWrite(ctx.GetPosition(input->Pos()), table, content, itemType, contentRowSpecs, publish.DataSink().Cluster().StringValue(), *settings, ctx);
17481768
if (TStatus::Ok != status.Level) {
17491769
return status;
17501770
}

ydb/library/yql/providers/yt/provider/yql_yt_op_settings.cpp

+42-7
Original file line numberDiff line numberDiff line change
@@ -836,32 +836,32 @@ bool ValidateSettings(const TExprNode& settingsNode, EYtSettingTypes accepted, T
836836
if (it->second.IsEntity()) {
837837
if (hasDef) {
838838
ctx.AddError(TIssue(ctx.GetPosition(setting->Tail().Pos()), TStringBuilder()
839-
<< "Not more than one map key should have # value: "
839+
<< "Not more than one group should have # value: "
840840
<< it->first.Quote()));
841841
return false;
842842
}
843843
hasDef = true;
844844
} else if (!it->second.IsList()) {
845845
ctx.AddError(TIssue(ctx.GetPosition(setting->Tail().Pos()), TStringBuilder()
846-
<< "Expected Yson map key having list value: "
846+
<< "Expected list value, group: "
847847
<< it->first.Quote()));
848848
return false;
849-
} else if (it->second.AsList().empty()) {
849+
} else if (it->second.AsList().size() < 2) {
850850
ctx.AddError(TIssue(ctx.GetPosition(setting->Tail().Pos()), TStringBuilder()
851-
<< "Expected Yson map key having non empty list value: "
851+
<< "Expected list with at least two columns, group: "
852852
<< it->first.Quote()));
853853
return false;
854854
} else {
855855
for (const auto& item: it->second.AsList()) {
856856
if (!item.IsString()) {
857857
ctx.AddError(TIssue(ctx.GetPosition(setting->Tail().Pos()), TStringBuilder()
858858
<< "Expected string value in list, found "
859-
<< item.GetType() << ", key: " << it->first.Quote()));
859+
<< item.GetType() << ", group: " << it->first.Quote()));
860860
return false;
861861
}
862862
if (!uniqColumns.insert(item.AsString()).second) {
863863
ctx.AddError(TIssue(ctx.GetPosition(setting->Tail().Pos()), TStringBuilder()
864-
<< "Duplicate column value " << item.AsString().Quote()));
864+
<< "Duplicate column " << item.AsString().Quote()));
865865
return false;
866866
}
867867
}
@@ -907,7 +907,7 @@ bool ValidateSettings(const TExprNode& settingsNode, EYtSettingTypes accepted, T
907907
}
908908
}
909909
} else {
910-
for (auto type: {EYtSettingType::Expiration, EYtSettingType::Media, EYtSettingType::PrimaryMedium, EYtSettingType::KeepMeta, EYtSettingType::ColumnGroups}) {
910+
for (auto type: {EYtSettingType::Expiration, EYtSettingType::Media, EYtSettingType::PrimaryMedium, EYtSettingType::KeepMeta}) {
911911
if (used.HasFlags(type)) {
912912
ctx.AddError(TIssue(ctx.GetPosition(settingsNode.Pos()), TStringBuilder()
913913
<< ToString(type).Quote()
@@ -920,6 +920,41 @@ bool ValidateSettings(const TExprNode& settingsNode, EYtSettingTypes accepted, T
920920
return true;
921921
}
922922

923+
bool ValidateColumnGroups(const TExprNode& setting, const TStructExprType& rowType, TExprContext& ctx) {
924+
const auto columnGroups = NYT::NodeFromYsonString(setting.Tail().Content());
925+
TIssueScopeGuard issueScope(ctx.IssueManager, [&]() {
926+
return MakeIntrusive<TIssue>(ctx.GetPosition(setting.Pos()), TStringBuilder() << "Setting " << setting.Head().Content());
927+
});
928+
929+
for (const auto& grp: columnGroups.AsMap()) {
930+
if (!grp.second.IsEntity()) {
931+
for (const auto& col: grp.second.AsList()) {
932+
if (!rowType.FindItem(col.AsString())) {
933+
ctx.AddError(TIssue(ctx.GetPosition(setting.Pos()), TStringBuilder()
934+
<< "Column group " << grp.first.Quote() << " refers to unknown column " << col.AsString().Quote()));
935+
return false;
936+
}
937+
}
938+
}
939+
}
940+
return true;
941+
}
942+
943+
TString NormalizeColumnGroupSpec(const TStringBuf spec) {
944+
try {
945+
auto columnGroups = NYT::NodeFromYsonString(spec);
946+
for (auto& grp: columnGroups.AsMap()) {
947+
if (!grp.second.IsEntity()) {
948+
std::stable_sort(grp.second.AsList().begin(), grp.second.AsList().end(), [](const auto& l, const auto& r) { return l.AsString() < r.AsString(); });
949+
}
950+
}
951+
return NYT::NodeToCanonicalYsonString(columnGroups);
952+
} catch (...) {
953+
// Keep as is. Type annotation will add user friendly error later
954+
return TString{spec};
955+
}
956+
}
957+
923958
TExprNode::TPtr GetSetting(const TExprNode& settings, EYtSettingType type) {
924959
for (auto& setting : settings.Children()) {
925960
if (setting->ChildrenSize() != 0 && FromString<EYtSettingType>(setting->Child(0)->Content()) == type) {

0 commit comments

Comments
 (0)