Skip to content

Commit 230face

Browse files
authored
Merge 51d01a3 into 9b28941
2 parents 9b28941 + 51d01a3 commit 230face

File tree

12 files changed

+122
-9
lines changed

12 files changed

+122
-9
lines changed

ydb/core/kqp/compile_service/kqp_compile_actor.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,7 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
275275

276276
KqpHost = CreateKqpHost(Gateway, QueryId.Cluster, QueryId.Database, Config, ModuleResolverState->ModuleResolver,
277277
FederatedQuerySetup, UserToken, GUCSettings, ApplicationName, AppData(ctx)->FunctionRegistry,
278-
false, false, std::move(TempTablesState), nullptr, SplitCtx);
278+
false, false, std::move(TempTablesState), nullptr, SplitCtx, QueryServiceConfig);
279279

280280
IKqpHost::TPrepareSettings prepareSettings;
281281
prepareSettings.DocumentApiRestricted = QueryId.Settings.DocumentApiRestricted;

ydb/core/kqp/executer_actor/ut/kqp_executer_ut.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#include <ydb/library/yql/core/services/mounts/yql_mounts.h>
99

1010
#include <library/cpp/protobuf/util/pb_io.h>
11+
#include <ydb/core/protos/config.pb.h>
1112

1213
namespace NKikimr {
1314
namespace NKqp {
@@ -28,7 +29,7 @@ NKqpProto::TKqpPhyTx BuildTxPlan(const TString& sql, TIntrusivePtr<IKqpGateway>
2829
IModuleResolver::TPtr moduleResolver;
2930
UNIT_ASSERT(GetYqlDefaultModuleResolver(moduleCtx, moduleResolver));
3031

31-
auto qp = CreateKqpHost(gateway, cluster, "/Root", config, moduleResolver, NYql::IHTTPGateway::Make(), nullptr, nullptr, Nothing(), nullptr, nullptr, false, false, nullptr, actorSystem);
32+
auto qp = CreateKqpHost(gateway, cluster, "/Root", config, moduleResolver, NYql::IHTTPGateway::Make(), nullptr, nullptr, Nothing(), nullptr, nullptr, false, false, nullptr, actorSystem, nullptr, NKikimrConfig::TQueryServiceConfig());
3233
auto result = qp->SyncPrepareDataQuery(sql, IKqpHost::TPrepareSettings());
3334
result.Issues().PrintTo(Cerr);
3435
UNIT_ASSERT(result.Success());

ydb/core/kqp/host/kqp_host.cpp

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1033,7 +1033,7 @@ class TKqpHost : public IKqpHost {
10331033
std::optional<TKqpFederatedQuerySetup> federatedQuerySetup, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
10341034
const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry, bool keepConfigChanges, bool isInternalCall,
10351035
TKqpTempTablesState::TConstPtr tempTablesState = nullptr, NActors::TActorSystem* actorSystem = nullptr,
1036-
NYql::TExprContext* ctx = nullptr)
1036+
NYql::TExprContext* ctx = nullptr, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig = NKikimrConfig::TQueryServiceConfig())
10371037
: Gateway(gateway)
10381038
, Cluster(cluster)
10391039
, GUCSettings(gUCSettings)
@@ -1051,6 +1051,7 @@ class TKqpHost : public IKqpHost {
10511051
, FakeWorld(ctx ? nullptr : ExprCtx->NewWorld(TPosition()))
10521052
, ExecuteCtx(MakeIntrusive<TExecuteContext>())
10531053
, ActorSystem(actorSystem ? actorSystem : NActors::TActivationContext::ActorSystem())
1054+
, QueryServiceConfig(queryServiceConfig)
10541055
{
10551056
if (funcRegistry) {
10561057
FuncRegistry = funcRegistry;
@@ -1825,10 +1826,15 @@ class TKqpHost : public IKqpHost {
18251826
|| settingName == "FilterPushdownOverJoinOptionalSide"
18261827
|| settingName == "DisableFilterPushdownOverJoinOptionalSide"
18271828
|| settingName == "RotateJoinTree"
1829+
|| settingName == "TimeOrderRecoverDelay"
1830+
|| settingName == "TimeOrderRecoverAhead"
1831+
|| settingName == "TimeOrderRecoverRowLimit"
1832+
|| settingName == "MatchRecognizeStream"
18281833
;
18291834
};
18301835
auto configProvider = CreateConfigProvider(*TypesCtx, gatewaysConfig, {}, allowSettings);
18311836
TypesCtx->AddDataSource(ConfigProviderName, configProvider);
1837+
TypesCtx->MatchRecognize = QueryServiceConfig.GetEnableMatchRecognize();
18321838

18331839
YQL_ENSURE(TypesCtx->Initialize(*ExprCtx));
18341840

@@ -1930,6 +1936,7 @@ class TKqpHost : public IKqpHost {
19301936

19311937
TKqpTempTablesState::TConstPtr TempTablesState;
19321938
NActors::TActorSystem* ActorSystem = nullptr;
1939+
NKikimrConfig::TQueryServiceConfig QueryServiceConfig;
19331940
};
19341941

19351942
} // namespace
@@ -1951,10 +1958,11 @@ TIntrusivePtr<IKqpHost> CreateKqpHost(TIntrusivePtr<IKqpGateway> gateway, const
19511958
const TString& database, TKikimrConfiguration::TPtr config, IModuleResolver::TPtr moduleResolver,
19521959
std::optional<TKqpFederatedQuerySetup> federatedQuerySetup, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, const TGUCSettings::TPtr& gUCSettings,
19531960
const TMaybe<TString>& applicationName, const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry, bool keepConfigChanges,
1954-
bool isInternalCall, TKqpTempTablesState::TConstPtr tempTablesState, NActors::TActorSystem* actorSystem, NYql::TExprContext* ctx)
1961+
bool isInternalCall, TKqpTempTablesState::TConstPtr tempTablesState, NActors::TActorSystem* actorSystem, NYql::TExprContext* ctx,
1962+
const NKikimrConfig::TQueryServiceConfig& queryServiceConfig)
19551963
{
19561964
return MakeIntrusive<TKqpHost>(gateway, cluster, database, gUCSettings, applicationName, config, moduleResolver, federatedQuerySetup, userToken, funcRegistry,
1957-
keepConfigChanges, isInternalCall, std::move(tempTablesState), actorSystem, ctx);
1965+
keepConfigChanges, isInternalCall, std::move(tempTablesState), actorSystem, ctx, queryServiceConfig);
19581966
}
19591967

19601968
} // namespace NKqp

ydb/core/kqp/host/kqp_host.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,8 @@ TIntrusivePtr<IKqpHost> CreateKqpHost(TIntrusivePtr<IKqpGateway> gateway,
123123
const TMaybe<TString>& applicationName = Nothing(), const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry = nullptr,
124124
bool keepConfigChanges = false, bool isInternalCall = false, TKqpTempTablesState::TConstPtr tempTablesState = nullptr,
125125
NActors::TActorSystem* actorSystem = nullptr /*take from TLS by default*/,
126-
NYql::TExprContext* ctx = nullptr);
126+
NYql::TExprContext* ctx = nullptr,
127+
const NKikimrConfig::TQueryServiceConfig& queryServiceConfig = NKikimrConfig::TQueryServiceConfig());
127128

128129
} // namespace NKqp
129130
} // namespace NKikimr

ydb/core/kqp/opt/logical/kqp_opt_log.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#include <ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h>
77
#include <ydb/core/kqp/provider/yql_kikimr_provider_impl.h>
88

9+
#include <ydb/library/yql/core/yql_opt_match_recognize.h>
910
#include <ydb/library/yql/core/yql_opt_utils.h>
1011
#include <ydb/library/yql/dq/opt/dq_opt_join.h>
1112
#include <ydb/library/yql/dq/opt/dq_opt_log.h>
@@ -60,6 +61,7 @@ class TKqpLogicalOptTransformer : public TOptimizeTransformerBase {
6061
AddHandler(0, &TCoNarrowFlatMap::Match, HNDL(DqReadWideWrapFieldSubset));
6162
AddHandler(0, &TCoNarrowMultiMap::Match, HNDL(DqReadWideWrapFieldSubset));
6263
AddHandler(0, &TCoWideMap::Match, HNDL(DqReadWideWrapFieldSubset));
64+
AddHandler(0, &TCoMatchRecognize::Match, HNDL(MatchRecognize));
6365

6466
AddHandler(1, &TCoTop::Match, HNDL(RewriteTopSortOverIndexRead));
6567
AddHandler(1, &TCoTopSort::Match, HNDL(RewriteTopSortOverIndexRead));
@@ -288,6 +290,14 @@ class TKqpLogicalOptTransformer : public TOptimizeTransformerBase {
288290
return output;
289291
}
290292

293+
TMaybeNode<TExprBase> MatchRecognize(TExprBase node, TExprContext& ctx) {
294+
auto output = ExpandMatchRecognize(node.Ptr(), ctx, TypesCtx);
295+
if (output) {
296+
DumpAppliedRule("MatchRecognize", node.Ptr(), output, ctx);
297+
}
298+
return output;
299+
}
300+
291301
TMaybeNode<TExprBase> DqReadWrapByProvider(TExprBase node, TExprContext& ctx) {
292302
auto output = NDq::DqReadWrapByProvider(node, ctx, TypesCtx);
293303
if (output) {

ydb/core/kqp/session_actor/kqp_worker_actor.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ class TKqpWorkerActor : public TActorBootstrapped<TKqpWorkerActor> {
188188
Config->FeatureFlags = AppData(ctx)->FeatureFlags;
189189

190190
KqpHost = CreateKqpHost(Gateway, Settings.Cluster, Settings.Database, Config, ModuleResolverState->ModuleResolver, FederatedQuerySetup,
191-
QueryState->RequestEv->GetUserToken(), GUCSettings, Settings.ApplicationName, AppData(ctx)->FunctionRegistry, !Settings.LongSession, false);
191+
QueryState->RequestEv->GetUserToken(), GUCSettings, Settings.ApplicationName, AppData(ctx)->FunctionRegistry, !Settings.LongSession, false, nullptr, nullptr, nullptr, QueryServiceConfig);
192192

193193
auto& queryRequest = QueryState->RequestEv;
194194
QueryState->ProxyRequestId = proxyRequestId;

ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ TIntrusivePtr<IKqpHost> CreateKikimrQueryProcessor(TIntrusivePtr<IKqpGateway> ga
5555

5656
auto federatedQuerySetup = std::make_optional<TKqpFederatedQuerySetup>({NYql::IHTTPGateway::Make(), nullptr, nullptr, nullptr, {}, {}, {}, nullptr, nullptr});
5757
return NKqp::CreateKqpHost(gateway, cluster, "/Root", kikimrConfig, moduleResolver,
58-
federatedQuerySetup, nullptr, nullptr, {}, funcRegistry, funcRegistry, keepConfigChanges, nullptr, actorSystem);
58+
federatedQuerySetup, nullptr, nullptr, {}, funcRegistry, funcRegistry, keepConfigChanges, nullptr, actorSystem, nullptr, NKikimrConfig::TQueryServiceConfig());
5959
}
6060

6161
NYql::NNodes::TExprBase GetExpr(const TString& ast, NYql::TExprContext& ctx, NYql::IModuleResolver* moduleResolver) {

ydb/core/kqp/ut/yql/kqp_pragma_ut.cpp

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,95 @@ Y_UNIT_TEST_SUITE(KqpPragma) {
8484
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
8585
UNIT_ASSERT_C(result.GetIssues().Empty(), result.GetIssues().ToString());
8686
}
87+
88+
Y_UNIT_TEST(MatchRecognizeWithTimeOrderRecoverer) {
89+
TKikimrSettings settings;
90+
NKikimrConfig::TAppConfig appConfig;
91+
appConfig.MutableQueryServiceConfig()->SetEnableMatchRecognize(true);
92+
settings.SetAppConfig(appConfig);
93+
94+
TKikimrRunner kikimr(settings);
95+
NYdb::NScripting::TScriptingClient client(kikimr.GetDriver());
96+
97+
auto result = client.ExecuteYqlScript(R"(
98+
PRAGMA FeatureR010="prototype";
99+
100+
CREATE TABLE `/Root/NewTable` (
101+
dt Uint64,
102+
value String,
103+
PRIMARY KEY (dt)
104+
);
105+
COMMIT;
106+
107+
INSERT INTO `/Root/NewTable` (dt, value) VALUES
108+
(1, 'value1'), (2, 'value2'), (3, 'value3'), (4, 'value4');
109+
COMMIT;
110+
111+
SELECT * FROM (SELECT dt, value FROM `/Root/NewTable`)
112+
MATCH_RECOGNIZE(
113+
ORDER BY CAST(dt as Timestamp)
114+
MEASURES
115+
LAST(V1.dt) as v1,
116+
LAST(V4.dt) as v4
117+
ONE ROW PER MATCH
118+
PATTERN (V1 V* V4)
119+
DEFINE
120+
V1 as V1.value = "value1",
121+
V as True,
122+
V4 as V4.value = "value4"
123+
);
124+
)").GetValueSync();
125+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
126+
CompareYson(R"([
127+
[[1u];[4u]];
128+
])", FormatResultSetYson(result.GetResultSet(0)));
129+
}
130+
131+
// Not implemented.
132+
133+
// Y_UNIT_TEST(MatchRecognizeWithoutTimeOrderRecoverer) {
134+
// TKikimrSettings settings;
135+
// NKikimrConfig::TAppConfig appConfig;
136+
// appConfig.MutableQueryServiceConfig()->SetEnableMatchRecognize(true);
137+
// settings.SetAppConfig(appConfig);
138+
139+
// TKikimrRunner kikimr(settings);
140+
// NYdb::NScripting::TScriptingClient client(kikimr.GetDriver());
141+
142+
// auto result = client.ExecuteYqlScript(R"(
143+
// PRAGMA FeatureR010="prototype";
144+
// PRAGMA config.flags("MatchRecognizeStream", "disable");
145+
146+
// CREATE TABLE `/Root/NewTable` (
147+
// dt Uint64,
148+
// value String,
149+
// PRIMARY KEY (dt)
150+
// );
151+
// COMMIT;
152+
153+
// INSERT INTO `/Root/NewTable` (dt, value) VALUES
154+
// (1, 'value1'), (2, 'value2'), (3, 'value3'), (4, 'value4');
155+
// COMMIT;
156+
157+
158+
// SELECT * FROM (SELECT dt, value FROM `/Root/NewTable`)
159+
// MATCH_RECOGNIZE(
160+
// MEASURES
161+
// LAST(V1.dt) as v1,
162+
// LAST(V4.dt) as v4
163+
// ONE ROW PER MATCH
164+
// PATTERN (V1 V* V4)
165+
// DEFINE
166+
// V1 as V1.value = "value1",
167+
// V as True,
168+
// V4 as V4.value = "value4"
169+
// );
170+
// )").GetValueSync();
171+
// UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
172+
// CompareYson(R"([
173+
// [[1u];[4u]];
174+
// ])", FormatResultSetYson(result.GetResultSet(0)));
175+
// }
87176
}
88177

89178
} // namspace NKqp

ydb/core/protos/config.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1015,6 +1015,7 @@ message TQueryServiceConfig {
10151015
optional TFinalizeScriptServiceConfig FinalizeScriptServiceConfig = 12;
10161016
optional uint64 ProgressStatsPeriodMs = 14 [default = 0]; // 0 = disabled
10171017
optional uint32 QueryTimeoutDefaultSeconds = 19 [default = 1800];
1018+
optional bool EnableMatchRecognize = 20 [default = false];
10181019
}
10191020

10201021
// Config describes immediate controls and allows

ydb/library/yql/tools/dqrun/examples/gateways.conf

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,9 @@ YqlCore {
124124
Flags {
125125
Name: "_EnableStreamLookupJoin"
126126
}
127+
Flags {
128+
Name: "_EnableMatchRecognize"
129+
}
127130
}
128131

129132
SqlCore {

ydb/tests/fq/yt/kqp_yt_file.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
from yql_utils import KSV_ATTR, get_files, get_http_files, get_tables, is_xfail, yql_binary_path, yql_source_path
99

1010
EXCLUDED_SUITES = [
11-
'match_recognize', # MATCH_RECOGNIZE is disabled in KQP
1211
]
1312

1413
EXCLUDED_TESTS = [

ydb/tests/tools/kqprun/configuration/app_config.conf

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ QueryServiceConfig {
2929
QueryArtifactsCompressionMethod: "zstd_6"
3030
ScriptResultRowsLimit: 0
3131
ScriptResultSizeLimit: 10485760
32+
EnableMatchRecognize: true
3233

3334
FileStorage {
3435
MaxFiles: 1000

0 commit comments

Comments
 (0)