1
+ #include < ydb/library/yql/minikql/mkql_alloc.h>
2
+ #include < ydb/library/yql/minikql/mkql_node.h>
3
+ #include < ydb/library/yql/minikql/mkql_node_builder.h>
4
+ #include < ydb/library/yql/public/udf/udf_value.h>
5
+ #include < ydb/library/yql/minikql/mkql_type_builder.h>
6
+
7
+ #include < ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.h>
8
+
9
+ #include < ydb/library/actors/testlib/test_runtime.h>
10
+ #include < ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.h>
11
+ #include < ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/test_creds.h>
12
+ #include < ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.h>
13
+ #include < library/cpp/testing/unittest/registar.h>
14
+
15
+ #include < ydb/library/yql/utils/log/proto/logger_config.pb.h>
16
+ #include < ydb/library/yql/utils/log/log.h>
17
+
18
+ using namespace NYql ::NConnector;
19
+ using namespace NYql ::NConnector::NTest;
20
+ using namespace NYql ;
21
+ using namespace NActors ;
22
+
23
+ Y_UNIT_TEST_SUITE (GenericProviderLookupActor) {
24
+
25
+ // Simple actor to call IDqAsyncLookupSource::AsyncLookup from an actor system's thread
26
+ class TCallLookupActor : public TActorBootstrapped <TCallLookupActor> {
27
+ public:
28
+ TCallLookupActor (
29
+ std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc,
30
+ NYql::NDq::IDqAsyncLookupSource* lookupSource,
31
+ NKikimr::NMiniKQL::TUnboxedValueVector&& keysToLookUp)
32
+ : Alloc(alloc)
33
+ , LookupSource(lookupSource)
34
+ , KeysToLookUp(std::move(keysToLookUp))
35
+ {
36
+ }
37
+
38
+ void Bootstrap () {
39
+ LookupSource->AsyncLookup (std::move (KeysToLookUp));
40
+ auto guard = Guard (*Alloc);
41
+ KeysToLookUp.clear ();
42
+ KeysToLookUp.shrink_to_fit ();
43
+ }
44
+
45
+ private:
46
+ static constexpr char ActorName[] = " TEST" ;
47
+
48
+ private:
49
+ std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
50
+ NYql::NDq::IDqAsyncLookupSource* LookupSource;
51
+ NKikimr::NMiniKQL::TUnboxedValueVector KeysToLookUp;
52
+ };
53
+
54
+ Y_UNIT_TEST (Lookup) {
55
+ auto alloc = std::make_shared<NKikimr::NMiniKQL::TScopedAlloc>(__LOCATION__, NKikimr::TAlignedPagePoolCounters (), true , false );
56
+ NKikimr::NMiniKQL::TMemoryUsageInfo memUsage (" TestMemUsage" );
57
+ NKikimr::NMiniKQL::THolderFactory holderFactory (alloc->Ref (), memUsage);
58
+ NKikimr::NMiniKQL::TTypeEnvironment typeEnv (*alloc);
59
+ NKikimr::NMiniKQL::TTypeBuilder typeBuilder (typeEnv);
60
+
61
+ auto loggerConfig = NYql::NProto::TLoggingConfig ();
62
+ loggerConfig.set_allcomponentslevel (::NYql::NProto::TLoggingConfig_ELevel::TLoggingConfig_ELevel_TRACE);
63
+ NYql::NLog::InitLogger (loggerConfig, false );
64
+
65
+ TTestActorRuntimeBase runtime;
66
+ runtime.Initialize ();
67
+ auto edge = runtime.AllocateEdgeActor ();
68
+
69
+ NYql::NConnector::NApi::TDataSourceInstance dsi;
70
+ dsi.Setkind (NYql::NConnector::NApi::EDataSourceKind::YDB);
71
+ dsi.mutable_endpoint ()->Sethost (" some_host" );
72
+ dsi.mutable_endpoint ()->Setport (2135 );
73
+ dsi.Setdatabase (" some_db" );
74
+ dsi.Setuse_tls (true );
75
+ dsi.set_protocol (::NYql::NConnector::NApi::EProtocol::NATIVE);
76
+ auto token = dsi.mutable_credentials () -> mutable_token ();
77
+ token->Settype (" IAM" );
78
+ token->Setvalue (" TEST_TOKEN" );
79
+
80
+ auto connectorMock = std::make_shared<NYql::NConnector::NTest::TConnectorClientMock>();
81
+
82
+ // clang-format off
83
+ // step 1: ListSplits
84
+ connectorMock->ExpectListSplits ()
85
+ .Select ()
86
+ .DataSourceInstance (dsi)
87
+ .What ()
88
+ .Column (" id" , Ydb::Type::UINT64)
89
+ .NullableColumn (" optional_id" , Ydb::Type::UINT64)
90
+ .NullableColumn (" string_value" , Ydb::Type::STRING)
91
+ .Done ()
92
+ .Table (" lookup_test" )
93
+ .Where ()
94
+ .Filter ()
95
+ .Disjunction ()
96
+ .Operand ()
97
+ .Conjunction ()
98
+ .Operand ().Equal ().Column (" id" ).Value <ui64>(0 ).Done ().Done ()
99
+ .Operand ().Equal ().Column (" optional_id" ).OptionalValue <ui64>(100 ).Done ().Done ()
100
+ .Done ()
101
+ .Done ()
102
+ .Operand ()
103
+ .Conjunction ()
104
+ .Operand ().Equal ().Column (" id" ).Value <ui64>(1 ).Done ().Done ()
105
+ .Operand ().Equal ().Column (" optional_id" ).OptionalValue <ui64>(101 ).Done ().Done ()
106
+ .Done ()
107
+ .Done ()
108
+ .Operand ()
109
+ .Conjunction ()
110
+ .Operand ().Equal ().Column (" id" ).Value <ui64>(2 ).Done ().Done ()
111
+ .Operand ().Equal ().Column (" optional_id" ).OptionalValue <ui64>(102 ).Done ().Done ()
112
+ .Done ()
113
+ .Done ()
114
+ .Done ()
115
+ .Done ()
116
+ .Done ()
117
+ .Done ()
118
+ .MaxSplitCount (1 )
119
+ .Result ()
120
+ .AddResponse (NewSuccess ())
121
+ .Description (" Actual split info is not important" )
122
+ ;
123
+
124
+ connectorMock->ExpectReadSplits ()
125
+ .DataSourceInstance (dsi)
126
+ .Split ()
127
+ .Description (" Actual split info is not important" )
128
+ .Done ()
129
+ .Result ()
130
+ .AddResponse (
131
+ MakeRecordBatch (
132
+ MakeArray<arrow::UInt64Builder, ui64>(" id" , {0 , 1 , 2 }, arrow::uint64 ()),
133
+ MakeArray<arrow::UInt64Builder, ui64>(" optional_id" , {100 , 101 , 103 }, arrow::uint64 ()), // the last value is intentially wrong
134
+ MakeArray<arrow::StringBuilder, std::string>(" string_value" , {" a" , " b" , " c" }, arrow::utf8 ())
135
+ ),
136
+ NewSuccess ()
137
+ )
138
+ ;
139
+ // clang-format on
140
+
141
+ NYql::Generic::TLookupSource lookupSourceSettings;
142
+ *lookupSourceSettings.mutable_data_source_instance () = dsi;
143
+ lookupSourceSettings.Settable (" lookup_test" );
144
+ lookupSourceSettings.SetServiceAccountId (" testsaid" );
145
+ lookupSourceSettings.SetServiceAccountIdSignature (" fake_signature" );
146
+
147
+ google::protobuf::Any packedLookupSource;
148
+ Y_ABORT_UNLESS (packedLookupSource.PackFrom (lookupSourceSettings));
149
+
150
+ NKikimr::NMiniKQL::TStructTypeBuilder keyTypeBuilder{typeEnv};
151
+ keyTypeBuilder.Add (" id" , typeBuilder.NewDataType (NUdf::EDataSlot::Uint64, false ));
152
+ keyTypeBuilder.Add (" optional_id" , typeBuilder.NewDataType (NUdf::EDataSlot::Uint64, true ));
153
+ NKikimr::NMiniKQL::TStructTypeBuilder outputypeBuilder{typeEnv};
154
+ outputypeBuilder.Add (" string_value" , typeBuilder.NewDataType (NUdf::EDataSlot::String, true ));
155
+
156
+ auto guard = Guard (*alloc.get ());
157
+
158
+ auto [lookupSource, actor] = NYql::NDq::CreateGenericLookupActor (
159
+ connectorMock,
160
+ std::make_shared<NTestCreds::TSecuredServiceAccountCredentialsFactory>(),
161
+ edge,
162
+ alloc,
163
+ std::move (lookupSourceSettings),
164
+ keyTypeBuilder.Build (),
165
+ outputypeBuilder.Build (),
166
+ typeEnv,
167
+ holderFactory,
168
+ 1'000'000 );
169
+ runtime.Register (actor);
170
+
171
+ NKikimr::NMiniKQL::TUnboxedValueVector keys;
172
+ for (size_t i = 0 ; i != 3 ; ++i) {
173
+ NUdf::TUnboxedValue* keyItems;
174
+ auto key = holderFactory.CreateDirectArrayHolder (2 , keyItems);
175
+ keyItems[0 ] = NUdf::TUnboxedValuePod (ui64 (i));
176
+ keyItems[1 ] = NUdf::TUnboxedValuePod (ui64 (100 + i));
177
+ keys.push_back (std::move (key));
178
+ }
179
+
180
+ guard.Release (); // let actors use alloc
181
+
182
+ auto callLookupActor = new TCallLookupActor (alloc, lookupSource, std::move (keys));
183
+ runtime.Register (callLookupActor);
184
+
185
+ auto ev = runtime.GrabEdgeEventRethrow <NYql::NDq::IDqAsyncLookupSource::TEvLookupResult>(edge);
186
+ auto guard2 = Guard (*alloc.get ());
187
+ NKikimr::NMiniKQL::TKeyPayloadPairVector lookupResult = std::move (ev->Get ()->Data );
188
+
189
+ UNIT_ASSERT_EQUAL (3 , lookupResult.size ());
190
+ {
191
+ auto & [k, v] = lookupResult[0 ];
192
+ UNIT_ASSERT_EQUAL (0 , k.GetElement (0 ).Get <ui64>());
193
+ UNIT_ASSERT_EQUAL (100 , k.GetElement (1 ).Get <ui64>());
194
+ NUdf::TUnboxedValue val = v.GetElement (0 );
195
+ UNIT_ASSERT (val.AsStringRef () == TStringBuf (" a" ));
196
+ }
197
+ {
198
+ auto & [k, v] = lookupResult[1 ];
199
+ UNIT_ASSERT_EQUAL (1 , k.GetElement (0 ).Get <ui64>());
200
+ UNIT_ASSERT_EQUAL (101 , k.GetElement (1 ).Get <ui64>());
201
+ NUdf::TUnboxedValue val = v.GetElement (0 );
202
+ UNIT_ASSERT (val.AsStringRef () == TStringBuf (" b" ));
203
+ }
204
+ {
205
+ auto & [k, v] = lookupResult[2 ];
206
+ UNIT_ASSERT_EQUAL (2 , k.GetElement (0 ).Get <ui64>());
207
+ UNIT_ASSERT_EQUAL (102 , k.GetElement (1 ).Get <ui64>());
208
+ // this key was not found and reported as empty
209
+ UNIT_ASSERT (!v);
210
+ }
211
+ }
212
+
213
+ } // Y_UNIT_TEST_SUITE(GenericProviderLookupActor)
0 commit comments