Skip to content

Commit 5d87743

Browse files
committed
get rid of mkql results in scripting
1 parent 785e5b7 commit 5d87743

18 files changed

+87
-130
lines changed

ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ namespace {
5050
{}
5151

5252
NKqp::TEvKqp::TEvDataQueryStreamPart::TPtr Handle;
53-
google::protobuf::RepeatedPtrField<NKikimrMiniKQL::TResult>::const_iterator ResultIterator;
53+
google::protobuf::RepeatedPtrField<Ydb::ResultSet>::const_iterator ResultIterator;
5454
};
5555

5656
enum EStreamRpcWakeupTag : ui64 {
@@ -220,7 +220,7 @@ class TStreamExecuteYqlScriptRPC
220220
auto result = response.mutable_result();
221221

222222
try {
223-
NKqp::ConvertKqpQueryResultToDbResult(kqpResult, result->mutable_result_set());
223+
result->mutable_result_set()->CopyFrom(kqpResult);
224224
} catch (std::exception ex) {
225225
ReplyFinishStream(ex.what());
226226
}

ydb/core/kqp/common/events/query.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,9 @@ struct TEvQueryRequest: public NActors::TEventLocal<TEvQueryRequest, TKqpEvents:
7070
const TQueryRequestSettings& querySettings = TQueryRequestSettings(),
7171
const TString& poolId = "");
7272

73-
TEvQueryRequest() = default;
73+
TEvQueryRequest() {
74+
Record.MutableRequest()->SetUsePublicResponseDataFormat(true);
75+
}
7476

7577
bool IsSerializable() const override {
7678
return true;

ydb/core/kqp/common/kqp.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,9 @@ bool ScriptExecutionRunnerActorIdFromString(const TString& executionId, TActorId
4040

4141
template<typename TFrom, typename TTo>
4242
inline void ConvertKqpQueryResultsToDbResult(const TFrom& from, TTo* to) {
43-
const auto& results = from.GetResults();
43+
const auto& results = from.GetYdbResults();
4444
for (const auto& result : results) {
45-
ConvertKqpQueryResultToDbResult(result, to->add_result_sets());
45+
to->add_result_sets()->CopyFrom(result);
4646
}
4747
}
4848

@@ -80,7 +80,7 @@ class IQueryReplayBackend : public TNonCopyable {
8080
/// Accepts query text
8181
virtual void Collect(const TString& queryData) = 0;
8282

83-
virtual bool IsNull() { return false; }
83+
virtual bool IsNull() { return false; }
8484

8585
virtual ~IQueryReplayBackend() {};
8686

ydb/core/kqp/common/kqp_event_impl.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ void TEvKqp::TEvQueryRequest::PrepareRemote() const {
9494
Record.MutableRequest()->SetDatabaseId(DatabaseId);
9595
}
9696

97+
Record.MutableRequest()->SetUsePublicResponseDataFormat(true);
9798
Record.MutableRequest()->SetSessionId(SessionId);
9899
Record.MutableRequest()->SetAction(QueryAction);
99100
Record.MutableRequest()->SetType(QueryType);

ydb/core/kqp/gateway/kqp_ic_gateway.cpp

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,7 @@ class TKqpScanQueryRequestHandler : public TRequestHandlerBase<
227227
void HandleResponse(typename TResponse::TPtr &ev, const TActorContext &ctx) {
228228
auto& response = *ev->Get()->Record.GetRef().MutableResponse();
229229

230-
NKikimr::ConvertYdbResultToKqpResult(ResultSet,*response.AddResults());
230+
response.AddYdbResults()->CopyFrom(ResultSet);
231231
for (auto& execStats : Executions) {
232232
response.MutableQueryStats()->AddExecutions()->Swap(&execStats);
233233
}
@@ -286,20 +286,18 @@ class TKqpStreamRequestHandler : public TRequestHandlerBase<
286286
virtual void HandleResponse(typename TResponse::TPtr &ev, const TActorContext &ctx) {
287287
auto& record = ev->Get()->Record.GetRef();
288288
if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) {
289-
if (record.MutableResponse()->GetResults().size()) {
289+
if (record.MutableResponse()->GetYdbResults().size()) {
290290
// Send result sets to RPC actor TStreamExecuteYqlScriptRPC
291291
auto evStreamPart = MakeHolder<NKqp::TEvKqp::TEvDataQueryStreamPart>();
292292
ActorIdToProto(this->SelfId(), evStreamPart->Record.MutableGatewayActorId());
293293

294-
for (int i = 0; i < record.MutableResponse()->MutableResults()->size(); ++i) {
294+
for (int i = 0; i < record.MutableResponse()->MutableYdbResults()->size(); ++i) {
295295
// Workaround to avoid errors on Pull execution stage which would expect some results
296-
Ydb::ResultSet resultSet;
297-
NKikimr::ConvertYdbResultToKqpResult(resultSet, *evStreamPart->Record.AddResults());
296+
evStreamPart->Record.AddResults();
298297
}
299298

300-
evStreamPart->Record.MutableResults()->Swap(record.MutableResponse()->MutableResults());
299+
evStreamPart->Record.MutableResults()->Swap(record.MutableResponse()->MutableYdbResults());
301300
this->Send(TargetActorId, evStreamPart.Release());
302-
303301
// Save response without data to send it later
304302
ResponseHandle = ev.Release();
305303
} else {
@@ -405,7 +403,7 @@ class TKqpForwardStreamRequestHandler : public TRequestHandlerBase<
405403
auto& response = *ev->Get()->Record.GetRef().MutableResponse();
406404

407405
Ydb::ResultSet resultSet;
408-
NKikimr::ConvertYdbResultToKqpResult(resultSet, *response.AddResults());
406+
response.AddYdbResults()->CopyFrom(resultSet);
409407
for (auto& execStats : Executions) {
410408
response.MutableQueryStats()->AddExecutions()->Swap(&execStats);
411409
}
@@ -511,7 +509,7 @@ class TKqpGenericQueryRequestHandler: public TRequestHandlerBase<
511509
auto& response = *ev->Get()->Record.GetRef().MutableResponse();
512510

513511
for (auto& resultSet : ResultSets) {
514-
ConvertYdbResultToKqpResult(std::move(resultSet.ResultSet), *response.AddResults());
512+
response.AddYdbResults()->Swap(&resultSet.ResultSet);
515513
}
516514

517515
TBase::HandleResponse(ev, ctx);
@@ -672,8 +670,8 @@ void KqpResponseToQueryResult(const NKikimrKqp::TEvQueryResponse& response, IKqp
672670
queryResult.AddIssue(NYql::IssueFromMessage(issue));
673671
}
674672

675-
for (auto& result : queryResponse.GetResults()) {
676-
auto arenaResult = google::protobuf::Arena::CreateMessage<NKikimrMiniKQL::TResult>(
673+
for (auto& result : queryResponse.GetYdbResults()) {
674+
auto arenaResult = google::protobuf::Arena::CreateMessage<Ydb::ResultSet>(
677675
queryResult.ProtobufArenaPtr.get());
678676

679677
arenaResult->CopyFrom(result);
@@ -1419,11 +1417,11 @@ class TKikimrIcGateway : public IKqpGateway {
14191417
if (!CheckCluster(cluster)) {
14201418
return InvalidCluster<TGenericResult>(cluster);
14211419
}
1422-
1420+
14231421
auto analyzePromise = NewPromise<TGenericResult>();
14241422
IActor* analyzeActor = new TAnalyzeActor(settings.TablePath, settings.Columns, analyzePromise);
14251423
RegisterActor(analyzeActor);
1426-
1424+
14271425
return analyzePromise.GetFuture();
14281426
} catch (yexception& e) {
14291427
return MakeFuture(ResultFromException<TGenericResult>(e));
@@ -1995,7 +1993,7 @@ class TKikimrIcGateway : public IKqpGateway {
19951993
}
19961994

19971995
TFuture<TQueryResult> ExecDataQueryAst(const TString& cluster, const TString& query, TQueryData::TPtr params,
1998-
const TAstQuerySettings& settings, const Ydb::Table::TransactionSettings& txSettings,
1996+
const TAstQuerySettings& settings, const Ydb::Table::TransactionSettings& txSettings,
19991997
const TMaybe<TString>& traceId) override
20001998
{
20011999
YQL_ENSURE(cluster == Cluster);
@@ -2075,7 +2073,7 @@ class TKikimrIcGateway : public IKqpGateway {
20752073
}
20762074

20772075
TFuture<TQueryResult> ExecGenericQuery(const TString& cluster, const TString& query, TQueryData::TPtr params,
2078-
const TAstQuerySettings& settings, const Ydb::Table::TransactionSettings& txSettings,
2076+
const TAstQuerySettings& settings, const Ydb::Table::TransactionSettings& txSettings,
20792077
const TMaybe<TString>& traceId) override
20802078
{
20812079
YQL_ENSURE(cluster == Cluster);

ydb/core/kqp/host/kqp_host.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -288,8 +288,8 @@ class TAsyncExecuteYqlResult : public TKqpAsyncResultBase<IKqpHost::TQueryResult
288288

289289
for (auto& resultStr : ResultProviderConfig.CommittedResults) {
290290
queryResult.Results.emplace_back(
291-
google::protobuf::Arena::CreateMessage<NKikimrMiniKQL::TResult>(queryResult.ProtobufArenaPtr.get()));
292-
NKikimrMiniKQL::TResult* result = queryResult.Results.back();
291+
google::protobuf::Arena::CreateMessage<Ydb::ResultSet>(queryResult.ProtobufArenaPtr.get()));
292+
Ydb::ResultSet* result = queryResult.Results.back();
293293

294294
if (!result->ParseFromArray(resultStr.data(), resultStr.size())) {
295295
queryResult = ResultFromError<TResult>("Failed to parse run result.");

ydb/core/kqp/host/kqp_runner.cpp

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -117,17 +117,7 @@ class TPrepareQueryAsyncResult : public TKqpAsyncResultBase<IKikimrQueryExecutor
117117
}
118118

119119
void FillResult(TResult& queryResult) const override {
120-
TVector<NKikimrMiniKQL::TResult*> results;
121-
for (auto& phyResult : TransformCtx.PhysicalQueryResults) {
122-
auto result = google::protobuf::Arena::CreateMessage<NKikimrMiniKQL::TResult>(
123-
queryResult.ProtobufArenaPtr.get());
124-
125-
result->CopyFrom(phyResult);
126-
results.push_back(result);
127-
}
128-
129120
queryResult.QueryStats.CopyFrom(TransformCtx.QueryStats);
130-
queryResult.Results = std::move(results);
131121
}
132122

133123
private:
@@ -328,7 +318,7 @@ class TKqpRunner : public IKqpRunner {
328318
Config),
329319
"BuildPhysicalTxs")
330320
.Build(false));
331-
321+
332322
auto physicalBuildQueryTransformer = TTransformationPipeline(typesCtx)
333323
.AddServiceTransformers()
334324
.Add(Log("PhysicalBuildQuery"), "LogPhysicalBuildQuery")
@@ -403,8 +393,8 @@ class TKqpRunner : public IKqpRunner {
403393
TKqpProviderContext Pctx;
404394

405395
TAutoPtr<IGraphTransformer> Transformer;
406-
407-
TActorSystem* ActorSystem;
396+
397+
TActorSystem* ActorSystem;
408398
};
409399

410400
} // namespace

ydb/core/kqp/host/kqp_transform.h

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,18 +26,13 @@ struct TKqlTransformContext : TThrRefBase {
2626
NKqpProto::TKqpStatsQuery QueryStats;
2727
std::shared_ptr<const NKqpProto::TKqpPhyQuery> PhysicalQuery;
2828

29-
TVector<TSimpleSharedPtr<NKikimrMiniKQL::TResult>> MkqlResults;
30-
TVector<NKikimrMiniKQL::TResult> PhysicalQueryResults;
31-
3229
NYql::TExprNode::TPtr ExplainTransformerInput; // Explain transformer must work after other transformers, but use input before peephole
3330
TMaybe<NYql::NNodes::TKiDataQueryBlocks> DataQueryBlocks;
3431

3532
void Reset() {
3633
ReplyTarget = {};
37-
MkqlResults.clear();
3834
QueryStats = {};
3935
PhysicalQuery = nullptr;
40-
PhysicalQueryResults.clear();
4136
ExplainTransformerInput = nullptr;
4237
DataQueryBlocks = Nothing();
4338
}

ydb/core/kqp/provider/yql_kikimr_exec.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -997,15 +997,15 @@ class TKiSourceCallableExecutionTransformer : public TAsyncCallbackTransformer<T
997997

998998
private:
999999
static TExprNode::TPtr GetResOrPullResult(const TExprNode& node, const IDataProvider::TFillSettings& fillSettings,
1000-
const NKikimrMiniKQL::TResult& resultValue, TExprContext& ctx)
1000+
const Ydb::ResultSet& resultValue, TExprContext& ctx)
10011001
{
10021002
TColumnOrder columnHints(NCommon::GetResOrPullColumnHints(node));
10031003

10041004
auto protoValue = &resultValue;
10051005
YQL_ENSURE(resultValue.GetArena());
1006-
if (IsRawKikimrResult(resultValue)) {
1007-
protoValue = KikimrResultToProto(resultValue, columnHints, fillSettings, resultValue.GetArena());
1008-
}
1006+
//if (IsRawKikimrResult(resultValue)) {
1007+
// protoValue = KikimrResultToProto(resultValue, columnHints, fillSettings, resultValue.GetArena());
1008+
//}
10091009

10101010
YQL_ENSURE(fillSettings.Format == IDataProvider::EResultFormat::Custom);
10111011
YQL_ENSURE(fillSettings.FormatDetails == KikimrMkqlProtoFormat);

ydb/core/kqp/provider/yql_kikimr_gateway.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -902,7 +902,7 @@ class IKikimrGateway : public TThrRefBase {
902902

903903
struct TQueryResult : public TGenericResult {
904904
TString SessionId;
905-
TVector<NKikimrMiniKQL::TResult*> Results;
905+
TVector<Ydb::ResultSet*> Results;
906906
NKqpProto::TKqpStatsQuery QueryStats;
907907
std::unique_ptr<NKikimrKqp::TPreparedQuery> PreparingQuery;
908908
std::shared_ptr<const NKikimrKqp::TPreparedQuery> PreparedQuery;

ydb/core/kqp/session_actor/kqp_session_actor.cpp

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1788,7 +1788,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
17881788
// Result for scan query is sent directly to target actor.
17891789
Y_ABORT_UNLESS(response->GetArena());
17901790
if (QueryState->PreparedQuery) {
1791-
bool useYdbResponseFormat = QueryState->GetUsePublicResponseDataFormat();
1791+
// bool useYdbResponseFormat = QueryState->GetUsePublicResponseDataFormat();
17921792
auto& phyQuery = QueryState->PreparedQuery->GetPhysicalQuery();
17931793
size_t trailingResultsCount = 0;
17941794
for (size_t i = 0; i < phyQuery.ResultBindingsSize(); ++i) {
@@ -1805,14 +1805,15 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
18051805
continue;
18061806
}
18071807

1808-
if (useYdbResponseFormat) {
1808+
//if (useYdbResponseFormat) {
18091809
TMaybe<ui64> effectiveRowsLimit = FillSettings.RowsLimitPerWrite;
18101810
if (QueryState->PreparedQuery->GetResults(i).GetRowsLimit()) {
18111811
effectiveRowsLimit = QueryState->PreparedQuery->GetResults(i).GetRowsLimit();
18121812
}
18131813
auto* ydbResult = QueryState->QueryData->GetYdbTxResult(phyQuery.GetResultBindings(i), response->GetArena(), effectiveRowsLimit);
18141814
response->AddYdbResults()->Swap(ydbResult);
1815-
} else {
1815+
//}
1816+
/*else {
18161817
auto* protoRes = QueryState->QueryData->GetMkqlTxResult(phyQuery.GetResultBindings(i), response->GetArena());
18171818
std::optional<IDataProvider::TFillSettings> fillSettings;
18181819
if (QueryState->PreparedQuery->ResultsSize()) {
@@ -1826,7 +1827,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
18261827
}
18271828
auto* finalResult = KikimrResultToProto(*protoRes, {}, fillSettings.value_or(FillSettings), response->GetArena());
18281829
response->AddResults()->Swap(finalResult);
1829-
}
1830+
}*/
18301831
}
18311832
}
18321833

@@ -1892,10 +1893,10 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
18921893
AddTrailingInfo(response->Record.GetRef());
18931894

18941895
NDataIntegrity::LogIntegrityTrails(
1895-
request->Get()->GetTraceId(),
1896-
request->Get()->GetAction(),
1897-
request->Get()->GetType(),
1898-
response,
1896+
request->Get()->GetTraceId(),
1897+
request->Get()->GetAction(),
1898+
request->Get()->GetType(),
1899+
response,
18991900
TlsActivationContext->AsActorContext()
19001901
);
19011902

@@ -1955,7 +1956,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
19551956
QueryState->UserRequestContext->TraceId,
19561957
QueryState->GetAction(),
19571958
QueryState->GetType(),
1958-
QueryResponse,
1959+
QueryResponse,
19591960
TlsActivationContext->AsActorContext()
19601961
);
19611962

ydb/core/kqp/session_actor/kqp_worker_actor.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ class TKqpWorkerActor : public TActorBootstrapped<TKqpWorkerActor> {
194194
Config->FeatureFlags = AppData(ctx)->FeatureFlags;
195195

196196
KqpHost = CreateKqpHost(Gateway, Settings.Cluster, Settings.Database, Config, ModuleResolverState->ModuleResolver, FederatedQuerySetup,
197-
QueryState->RequestEv->GetUserToken(), GUCSettings, QueryServiceConfig, Settings.ApplicationName, AppData(ctx)->FunctionRegistry,
197+
QueryState->RequestEv->GetUserToken(), GUCSettings, QueryServiceConfig, Settings.ApplicationName, AppData(ctx)->FunctionRegistry,
198198
!Settings.LongSession, false, nullptr, nullptr, nullptr, QueryState->RequestEv->GetUserRequestContext());
199199

200200
auto& queryRequest = QueryState->RequestEv;
@@ -959,7 +959,7 @@ class TKqpWorkerActor : public TActorBootstrapped<TKqpWorkerActor> {
959959
// If we have result it must be allocated on protobuf arena
960960
Y_ASSERT(result->GetArena());
961961
Y_ASSERT(resp->GetArena() == result->GetArena());
962-
resp->AddResults()->Swap(result);
962+
resp->AddYdbResults()->Swap(result);
963963
}
964964
} else {
965965
auto resp = ev.MutableResponse();

ydb/core/kqp/session_actor/kqp_worker_common.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ void SlowLogQuery(const TActorContext &ctx, const TKikimrConfiguration* config,
119119
<< 'b';
120120

121121
ui64 resultsSize = 0;
122-
for (auto& result : record->GetResponse().GetResults()) {
122+
for (auto& result : record->GetResponse().GetYdbResults()) {
123123
resultsSize += result.ByteSize();
124124
}
125125

0 commit comments

Comments
 (0)