Skip to content

Commit a805964

Browse files
authored
streamlookup fixes (ydb-platform#8258)
1 parent db03225 commit a805964

File tree

1 file changed

+26
-7
lines changed

1 file changed

+26
-7
lines changed

ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -168,9 +168,8 @@ class TInputTransformStreamLookupBase
168168
const auto maxKeysInRequest = LookupSource.first->GetMaxSupportedKeysInRequest();
169169
IDqAsyncLookupSource::TUnboxedValueMap keysForLookup{maxKeysInRequest, KeyTypeHelper->GetValueHash(), KeyTypeHelper->GetValueEqual()};
170170
while (
171-
((InputFlowFetchStatus = FetchWideInputValue(inputRowItems)) == NUdf::EFetchStatus::Ok) &&
172-
(keysForLookup.size() < maxKeysInRequest)
173-
) {
171+
(keysForLookup.size() < maxKeysInRequest) &&
172+
((InputFlowFetchStatus = FetchWideInputValue(inputRowItems)) == NUdf::EFetchStatus::Ok)) {
174173
NUdf::TUnboxedValue* keyItems;
175174
NUdf::TUnboxedValue key = HolderFactory.CreateDirectArrayHolder(InputJoinColumns.size(), keyItems);
176175
for (size_t i = 0; i != InputJoinColumns.size(); ++i) {
@@ -384,6 +383,25 @@ THashMap<TStringBuf, size_t> GetNameToIndex(const ::google::protobuf::RepeatedPt
384383
return result;
385384
}
386385

386+
THashMap<TStringBuf, size_t> GetNameToIndex(const NMiniKQL::TStructType* type) {
387+
THashMap<TStringBuf, size_t> result;
388+
for (ui32 i = 0; i != type->GetMembersCount(); ++i) {
389+
result[type->GetMemberName(i)] = i;
390+
}
391+
return result;
392+
}
393+
394+
TVector<size_t> GetJoinColumnIndexes(const ::google::protobuf::RepeatedPtrField<TProtoStringType>& names, const THashMap<TStringBuf, size_t>& joinColumns) {
395+
TVector<size_t> result;
396+
result.reserve(joinColumns.size());
397+
for (int i = 0; i != names.size(); ++i) {
398+
if (auto p = joinColumns.FindPtr(names[i])) {
399+
result.push_back(*p);
400+
}
401+
}
402+
return result;
403+
}
404+
387405
TVector<size_t> GetJoinColumnIndexes(const NMiniKQL::TStructType* type, const THashMap<TStringBuf, size_t>& joinColumns) {
388406
TVector<size_t> result;
389407
result.reserve(joinColumns.size());
@@ -411,14 +429,15 @@ std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateInputTransformStre
411429

412430
const auto rightRowType = DeserializeStructType(settings.GetRightSource().GetSerializedRowType(), args.TypeEnv);
413431

414-
auto leftJoinColumns = GetNameToIndex(settings.GetLeftJoinKeyNames());
432+
auto inputColumns = GetNameToIndex(narrowInputRowType);
415433
auto rightJoinColumns = GetNameToIndex(settings.GetRightJoinKeyNames());
416-
Y_ABORT_UNLESS(leftJoinColumns.size() == rightJoinColumns.size());
417434

418-
auto leftJoinColumnIndexes = GetJoinColumnIndexes(narrowInputRowType, leftJoinColumns);
419-
Y_ABORT_UNLESS(leftJoinColumnIndexes.size() == leftJoinColumns.size());
435+
auto leftJoinColumnIndexes = GetJoinColumnIndexes(
436+
settings.GetLeftJoinKeyNames(),
437+
inputColumns);
420438
auto rightJoinColumnIndexes = GetJoinColumnIndexes(rightRowType, rightJoinColumns);
421439
Y_ABORT_UNLESS(rightJoinColumnIndexes.size() == rightJoinColumns.size());
440+
Y_ABORT_UNLESS(leftJoinColumnIndexes.size() == rightJoinColumnIndexes.size());
422441

423442
const auto& [lookupKeyType, lookupPayloadType] = SplitLookupTableColumns(rightRowType, rightJoinColumns, args.TypeEnv);
424443
const auto& outputColumnsOrder = CategorizeOutputRowItems(

0 commit comments

Comments
 (0)