@@ -1120,6 +1120,52 @@ using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TBlockExpandChunkedW
1120
1120
const size_t WideFieldsIndex_;
1121
1121
};
1122
1122
1123
+ class TBlockExpandChunkedStreamWrapper : public TMutableComputationNode <TBlockExpandChunkedStreamWrapper> {
1124
+ using TBaseComputation = TMutableComputationNode<TBlockExpandChunkedStreamWrapper>;
1125
+ class TExpanderState : public TComputationValue <TExpanderState> {
1126
+ using TBase = TComputationValue<TExpanderState>;
1127
+ public:
1128
+ TExpanderState (TMemoryUsageInfo* memInfo, TComputationContext& ctx, NUdf::TUnboxedValue&& stream, size_t width)
1129
+ : TBase(memInfo), HolderFactory_(ctx.HolderFactory), State_(ctx.HolderFactory.Create<TBlockState>(width)), Stream_(stream) {}
1130
+
1131
+ NUdf::EFetchStatus WideFetch (NUdf::TUnboxedValue* output, ui32 width) {
1132
+ auto & s = *static_cast <TBlockState*>(State_.AsBoxed ().Get ());
1133
+ if (!s.Count ) {
1134
+ s.ClearValues ();
1135
+ auto result = Stream_.WideFetch (s.Values .data (), width);
1136
+ if (NUdf::EFetchStatus::Ok != result) {
1137
+ return result;
1138
+ }
1139
+ s.FillArrays ();
1140
+ }
1141
+
1142
+ const auto sliceSize = s.Slice ();
1143
+ for (size_t i = 0 ; i < width; ++i) {
1144
+ output[i] = s.Get (sliceSize, HolderFactory_, i);
1145
+ }
1146
+ return NUdf::EFetchStatus::Ok;
1147
+ }
1148
+
1149
+ private:
1150
+ const THolderFactory& HolderFactory_;
1151
+ NUdf::TUnboxedValue State_;
1152
+ NUdf::TUnboxedValue Stream_;
1153
+ };
1154
+ public:
1155
+ TBlockExpandChunkedStreamWrapper (TComputationMutables& mutables, IComputationNode* stream, size_t width)
1156
+ : TBaseComputation(mutables, EValueRepresentation::Boxed)
1157
+ , Stream_(stream)
1158
+ , Width_(width) {}
1159
+
1160
+ NUdf::TUnboxedValuePod DoCalculate (TComputationContext& ctx) const {
1161
+ return ctx.HolderFactory .Create <TExpanderState>(ctx, std::move (Stream_->GetValue (ctx)), Width_);
1162
+ }
1163
+ void RegisterDependencies () const override {}
1164
+ private:
1165
+ IComputationNode* const Stream_;
1166
+ const size_t Width_;
1167
+ };
1168
+
1123
1169
} // namespace
1124
1170
1125
1171
IComputationNode* WrapToBlocks (TCallable& callable, const TComputationNodeFactoryContext& ctx) {
@@ -1184,13 +1230,21 @@ IComputationNode* WrapReplicateScalar(TCallable& callable, const TComputationNod
1184
1230
1185
1231
IComputationNode* WrapBlockExpandChunked (TCallable& callable, const TComputationNodeFactoryContext& ctx) {
1186
1232
MKQL_ENSURE (callable.GetInputsCount () == 1 , " Expected 1 args, got " << callable.GetInputsCount ());
1187
-
1188
- const auto flowType = AS_TYPE (TFlowType, callable.GetInput (0 ).GetStaticType ());
1189
- const auto wideComponents = GetWideComponents (flowType);
1190
-
1191
- const auto wideFlow = dynamic_cast <IComputationWideFlowNode*>(LocateNode (ctx.NodeLocator , callable, 0 ));
1192
- MKQL_ENSURE (wideFlow != nullptr , " Expected wide flow node" );
1193
- return new TBlockExpandChunkedWrapper (ctx.Mutables , wideFlow, wideComponents.size ());
1233
+ if (callable.GetInput (0 ).GetStaticType ()->IsStream ()) {
1234
+ const auto streamType = AS_TYPE (TStreamType, callable.GetInput (0 ).GetStaticType ());
1235
+ const auto wideComponents = GetWideComponents (streamType);
1236
+ const auto computation = dynamic_cast <IComputationNode*>(LocateNode (ctx.NodeLocator , callable, 0 ));
1237
+
1238
+ MKQL_ENSURE (computation != nullptr , " Expected computation node" );
1239
+ return new TBlockExpandChunkedStreamWrapper (ctx.Mutables , computation, wideComponents.size ());
1240
+ } else {
1241
+ const auto flowType = AS_TYPE (TFlowType, callable.GetInput (0 ).GetStaticType ());
1242
+ const auto wideComponents = GetWideComponents (flowType);
1243
+
1244
+ const auto wideFlow = dynamic_cast <IComputationWideFlowNode*>(LocateNode (ctx.NodeLocator , callable, 0 ));
1245
+ MKQL_ENSURE (wideFlow != nullptr , " Expected wide flow node" );
1246
+ return new TBlockExpandChunkedWrapper (ctx.Mutables , wideFlow, wideComponents.size ());
1247
+ }
1194
1248
}
1195
1249
1196
1250
}
0 commit comments