Skip to content

Commit a491b73

Browse files
authored
get rid of mkql results in scripting (#9997)
1 parent 91f8a44 commit a491b73

24 files changed

+121
-286
lines changed

ydb/core/grpc_services/rpc_execute_data_query.cpp

+1-2
Original file line numberDiff line numberDiff line change
@@ -189,9 +189,8 @@ class TExecuteDataQueryRPC : public TRpcKqpRequestActor<TExecuteDataQueryRPC, TE
189189
// https://protobuf.dev/reference/cpp/arenas/#swap
190190
// Actualy will be copy in case pf remote execution
191191
queryResult->mutable_result_sets()->Swap(record.MutableResponse()->MutableYdbResults());
192-
} else {
193-
NKqp::ConvertKqpQueryResultsToDbResult(kqpResponse, queryResult);
194192
}
193+
195194
ConvertQueryStats(kqpResponse, queryResult);
196195
if (kqpResponse.HasTxMeta()) {
197196
queryResult->mutable_tx_meta()->CopyFrom(kqpResponse.GetTxMeta());

ydb/core/grpc_services/rpc_execute_yql_script.cpp

+5-1
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,11 @@ class TExecuteYqlScriptRPC : public TRpcKqpRequestActor<TExecuteYqlScriptRPC, TE
101101
auto queryResult = TEvExecuteYqlScriptRequest::AllocateResult<TResult>(Request_);
102102

103103
try {
104-
NKqp::ConvertKqpQueryResultsToDbResult(kqpResponse, queryResult);
104+
const auto& results = kqpResponse.GetYdbResults();
105+
for (const auto& result : results) {
106+
queryResult->add_result_sets()->CopyFrom(result);
107+
}
108+
105109
} catch (const std::exception& ex) {
106110
NYql::TIssues issues;
107111
issues.AddIssue(NYql::ExceptionToIssue(ex));

ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ namespace {
5050
{}
5151

5252
NKqp::TEvKqp::TEvDataQueryStreamPart::TPtr Handle;
53-
google::protobuf::RepeatedPtrField<NKikimrMiniKQL::TResult>::const_iterator ResultIterator;
53+
google::protobuf::RepeatedPtrField<Ydb::ResultSet>::const_iterator ResultIterator;
5454
};
5555

5656
enum EStreamRpcWakeupTag : ui64 {
@@ -220,7 +220,7 @@ class TStreamExecuteYqlScriptRPC
220220
auto result = response.mutable_result();
221221

222222
try {
223-
NKqp::ConvertKqpQueryResultToDbResult(kqpResult, result->mutable_result_set());
223+
result->mutable_result_set()->CopyFrom(kqpResult);
224224
} catch (std::exception ex) {
225225
ReplyFinishStream(ex.what());
226226
}

ydb/core/kqp/common/events/query.h

+3-1
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,9 @@ struct TEvQueryRequest: public NActors::TEventLocal<TEvQueryRequest, TKqpEvents:
7070
const TQueryRequestSettings& querySettings = TQueryRequestSettings(),
7171
const TString& poolId = "");
7272

73-
TEvQueryRequest() = default;
73+
TEvQueryRequest() {
74+
Record.MutableRequest()->SetUsePublicResponseDataFormat(true);
75+
}
7476

7577
bool IsSerializable() const override {
7678
return true;

ydb/core/kqp/common/kqp.h

+1-10
Original file line numberDiff line numberDiff line change
@@ -33,18 +33,9 @@
3333

3434
namespace NKikimr::NKqp {
3535

36-
void ConvertKqpQueryResultToDbResult(const NKikimrMiniKQL::TResult& from, Ydb::ResultSet* to);
37-
3836
TString ScriptExecutionRunnerActorIdString(const NActors::TActorId& actorId);
3937
bool ScriptExecutionRunnerActorIdFromString(const TString& executionId, TActorId& actorId);
4038

41-
template<typename TFrom, typename TTo>
42-
inline void ConvertKqpQueryResultsToDbResult(const TFrom& from, TTo* to) {
43-
const auto& results = from.GetResults();
44-
for (const auto& result : results) {
45-
ConvertKqpQueryResultToDbResult(result, to->add_result_sets());
46-
}
47-
}
4839

4940
class TKqpRequestInfo {
5041
public:
@@ -80,7 +71,7 @@ class IQueryReplayBackend : public TNonCopyable {
8071
/// Accepts query text
8172
virtual void Collect(const TString& queryData) = 0;
8273

83-
virtual bool IsNull() { return false; }
74+
virtual bool IsNull() { return false; }
8475

8576
virtual ~IQueryReplayBackend() {};
8677

ydb/core/kqp/common/kqp_event_impl.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ void TEvKqp::TEvQueryRequest::PrepareRemote() const {
9494
Record.MutableRequest()->SetDatabaseId(DatabaseId);
9595
}
9696

97+
Record.MutableRequest()->SetUsePublicResponseDataFormat(true);
9798
Record.MutableRequest()->SetSessionId(SessionId);
9899
Record.MutableRequest()->SetAction(QueryAction);
99100
Record.MutableRequest()->SetType(QueryType);

ydb/core/kqp/gateway/kqp_ic_gateway.cpp

+13-15
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,7 @@ class TKqpScanQueryRequestHandler : public TRequestHandlerBase<
227227
void HandleResponse(typename TResponse::TPtr &ev, const TActorContext &ctx) {
228228
auto& response = *ev->Get()->Record.GetRef().MutableResponse();
229229

230-
NKikimr::ConvertYdbResultToKqpResult(ResultSet,*response.AddResults());
230+
response.AddYdbResults()->CopyFrom(ResultSet);
231231
for (auto& execStats : Executions) {
232232
response.MutableQueryStats()->AddExecutions()->Swap(&execStats);
233233
}
@@ -286,20 +286,18 @@ class TKqpStreamRequestHandler : public TRequestHandlerBase<
286286
virtual void HandleResponse(typename TResponse::TPtr &ev, const TActorContext &ctx) {
287287
auto& record = ev->Get()->Record.GetRef();
288288
if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) {
289-
if (record.MutableResponse()->GetResults().size()) {
289+
if (record.MutableResponse()->GetYdbResults().size()) {
290290
// Send result sets to RPC actor TStreamExecuteYqlScriptRPC
291291
auto evStreamPart = MakeHolder<NKqp::TEvKqp::TEvDataQueryStreamPart>();
292292
ActorIdToProto(this->SelfId(), evStreamPart->Record.MutableGatewayActorId());
293293

294-
for (int i = 0; i < record.MutableResponse()->MutableResults()->size(); ++i) {
294+
for (int i = 0; i < record.MutableResponse()->MutableYdbResults()->size(); ++i) {
295295
// Workaround to avoid errors on Pull execution stage which would expect some results
296-
Ydb::ResultSet resultSet;
297-
NKikimr::ConvertYdbResultToKqpResult(resultSet, *evStreamPart->Record.AddResults());
296+
evStreamPart->Record.AddResults();
298297
}
299298

300-
evStreamPart->Record.MutableResults()->Swap(record.MutableResponse()->MutableResults());
299+
evStreamPart->Record.MutableResults()->Swap(record.MutableResponse()->MutableYdbResults());
301300
this->Send(TargetActorId, evStreamPart.Release());
302-
303301
// Save response without data to send it later
304302
ResponseHandle = ev.Release();
305303
} else {
@@ -405,7 +403,7 @@ class TKqpForwardStreamRequestHandler : public TRequestHandlerBase<
405403
auto& response = *ev->Get()->Record.GetRef().MutableResponse();
406404

407405
Ydb::ResultSet resultSet;
408-
NKikimr::ConvertYdbResultToKqpResult(resultSet, *response.AddResults());
406+
response.AddYdbResults()->CopyFrom(resultSet);
409407
for (auto& execStats : Executions) {
410408
response.MutableQueryStats()->AddExecutions()->Swap(&execStats);
411409
}
@@ -511,7 +509,7 @@ class TKqpGenericQueryRequestHandler: public TRequestHandlerBase<
511509
auto& response = *ev->Get()->Record.GetRef().MutableResponse();
512510

513511
for (auto& resultSet : ResultSets) {
514-
ConvertYdbResultToKqpResult(std::move(resultSet.ResultSet), *response.AddResults());
512+
response.AddYdbResults()->Swap(&resultSet.ResultSet);
515513
}
516514

517515
TBase::HandleResponse(ev, ctx);
@@ -672,8 +670,8 @@ void KqpResponseToQueryResult(const NKikimrKqp::TEvQueryResponse& response, IKqp
672670
queryResult.AddIssue(NYql::IssueFromMessage(issue));
673671
}
674672

675-
for (auto& result : queryResponse.GetResults()) {
676-
auto arenaResult = google::protobuf::Arena::CreateMessage<NKikimrMiniKQL::TResult>(
673+
for (auto& result : queryResponse.GetYdbResults()) {
674+
auto arenaResult = google::protobuf::Arena::CreateMessage<Ydb::ResultSet>(
677675
queryResult.ProtobufArenaPtr.get());
678676

679677
arenaResult->CopyFrom(result);
@@ -1419,11 +1417,11 @@ class TKikimrIcGateway : public IKqpGateway {
14191417
if (!CheckCluster(cluster)) {
14201418
return InvalidCluster<TGenericResult>(cluster);
14211419
}
1422-
1420+
14231421
auto analyzePromise = NewPromise<TGenericResult>();
14241422
IActor* analyzeActor = new TAnalyzeActor(settings.TablePath, settings.Columns, analyzePromise);
14251423
RegisterActor(analyzeActor);
1426-
1424+
14271425
return analyzePromise.GetFuture();
14281426
} catch (yexception& e) {
14291427
return MakeFuture(ResultFromException<TGenericResult>(e));
@@ -1995,7 +1993,7 @@ class TKikimrIcGateway : public IKqpGateway {
19951993
}
19961994

19971995
TFuture<TQueryResult> ExecDataQueryAst(const TString& cluster, const TString& query, TQueryData::TPtr params,
1998-
const TAstQuerySettings& settings, const Ydb::Table::TransactionSettings& txSettings,
1996+
const TAstQuerySettings& settings, const Ydb::Table::TransactionSettings& txSettings,
19991997
const TMaybe<TString>& traceId) override
20001998
{
20011999
YQL_ENSURE(cluster == Cluster);
@@ -2075,7 +2073,7 @@ class TKikimrIcGateway : public IKqpGateway {
20752073
}
20762074

20772075
TFuture<TQueryResult> ExecGenericQuery(const TString& cluster, const TString& query, TQueryData::TPtr params,
2078-
const TAstQuerySettings& settings, const Ydb::Table::TransactionSettings& txSettings,
2076+
const TAstQuerySettings& settings, const Ydb::Table::TransactionSettings& txSettings,
20792077
const TMaybe<TString>& traceId) override
20802078
{
20812079
YQL_ENSURE(cluster == Cluster);

ydb/core/kqp/host/kqp_host.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -288,8 +288,8 @@ class TAsyncExecuteYqlResult : public TKqpAsyncResultBase<IKqpHost::TQueryResult
288288

289289
for (auto& resultStr : ResultProviderConfig.CommittedResults) {
290290
queryResult.Results.emplace_back(
291-
google::protobuf::Arena::CreateMessage<NKikimrMiniKQL::TResult>(queryResult.ProtobufArenaPtr.get()));
292-
NKikimrMiniKQL::TResult* result = queryResult.Results.back();
291+
google::protobuf::Arena::CreateMessage<Ydb::ResultSet>(queryResult.ProtobufArenaPtr.get()));
292+
Ydb::ResultSet* result = queryResult.Results.back();
293293

294294
if (!result->ParseFromArray(resultStr.data(), resultStr.size())) {
295295
queryResult = ResultFromError<TResult>("Failed to parse run result.");

ydb/core/kqp/host/kqp_runner.cpp

+3-13
Original file line numberDiff line numberDiff line change
@@ -117,17 +117,7 @@ class TPrepareQueryAsyncResult : public TKqpAsyncResultBase<IKikimrQueryExecutor
117117
}
118118

119119
void FillResult(TResult& queryResult) const override {
120-
TVector<NKikimrMiniKQL::TResult*> results;
121-
for (auto& phyResult : TransformCtx.PhysicalQueryResults) {
122-
auto result = google::protobuf::Arena::CreateMessage<NKikimrMiniKQL::TResult>(
123-
queryResult.ProtobufArenaPtr.get());
124-
125-
result->CopyFrom(phyResult);
126-
results.push_back(result);
127-
}
128-
129120
queryResult.QueryStats.CopyFrom(TransformCtx.QueryStats);
130-
queryResult.Results = std::move(results);
131121
}
132122

133123
private:
@@ -328,7 +318,7 @@ class TKqpRunner : public IKqpRunner {
328318
Config),
329319
"BuildPhysicalTxs")
330320
.Build(false));
331-
321+
332322
auto physicalBuildQueryTransformer = TTransformationPipeline(typesCtx)
333323
.AddServiceTransformers()
334324
.Add(Log("PhysicalBuildQuery"), "LogPhysicalBuildQuery")
@@ -403,8 +393,8 @@ class TKqpRunner : public IKqpRunner {
403393
TKqpProviderContext Pctx;
404394

405395
TAutoPtr<IGraphTransformer> Transformer;
406-
407-
TActorSystem* ActorSystem;
396+
397+
TActorSystem* ActorSystem;
408398
};
409399

410400
} // namespace

ydb/core/kqp/host/kqp_transform.h

-5
Original file line numberDiff line numberDiff line change
@@ -26,18 +26,13 @@ struct TKqlTransformContext : TThrRefBase {
2626
NKqpProto::TKqpStatsQuery QueryStats;
2727
std::shared_ptr<const NKqpProto::TKqpPhyQuery> PhysicalQuery;
2828

29-
TVector<TSimpleSharedPtr<NKikimrMiniKQL::TResult>> MkqlResults;
30-
TVector<NKikimrMiniKQL::TResult> PhysicalQueryResults;
31-
3229
NYql::TExprNode::TPtr ExplainTransformerInput; // Explain transformer must work after other transformers, but use input before peephole
3330
TMaybe<NYql::NNodes::TKiDataQueryBlocks> DataQueryBlocks;
3431

3532
void Reset() {
3633
ReplyTarget = {};
37-
MkqlResults.clear();
3834
QueryStats = {};
3935
PhysicalQuery = nullptr;
40-
PhysicalQueryResults.clear();
4136
ExplainTransformerInput = nullptr;
4237
DataQueryBlocks = Nothing();
4338
}

ydb/core/kqp/provider/yql_kikimr_exec.cpp

+1-5
Original file line numberDiff line numberDiff line change
@@ -997,16 +997,12 @@ class TKiSourceCallableExecutionTransformer : public TAsyncCallbackTransformer<T
997997

998998
private:
999999
static TExprNode::TPtr GetResOrPullResult(const TExprNode& node, const IDataProvider::TFillSettings& fillSettings,
1000-
const NKikimrMiniKQL::TResult& resultValue, TExprContext& ctx)
1000+
const Ydb::ResultSet& resultValue, TExprContext& ctx)
10011001
{
10021002
TColumnOrder columnHints(NCommon::GetResOrPullColumnHints(node));
10031003

10041004
auto protoValue = &resultValue;
10051005
YQL_ENSURE(resultValue.GetArena());
1006-
if (IsRawKikimrResult(resultValue)) {
1007-
protoValue = KikimrResultToProto(resultValue, columnHints, fillSettings, resultValue.GetArena());
1008-
}
1009-
10101006
YQL_ENSURE(fillSettings.Format == IDataProvider::EResultFormat::Custom);
10111007
YQL_ENSURE(fillSettings.FormatDetails == KikimrMkqlProtoFormat);
10121008

ydb/core/kqp/provider/yql_kikimr_gateway.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -902,7 +902,7 @@ class IKikimrGateway : public TThrRefBase {
902902

903903
struct TQueryResult : public TGenericResult {
904904
TString SessionId;
905-
TVector<NKikimrMiniKQL::TResult*> Results;
905+
TVector<Ydb::ResultSet*> Results;
906906
NKqpProto::TKqpStatsQuery QueryStats;
907907
std::unique_ptr<NKikimrKqp::TPreparedQuery> PreparingQuery;
908908
std::shared_ptr<const NKikimrKqp::TPreparedQuery> PreparedQuery;

ydb/core/kqp/provider/yql_kikimr_results.cpp

-88
Original file line numberDiff line numberDiff line change
@@ -338,94 +338,6 @@ void KikimrResultToYson(const TStringStream& stream, NYson::TYsonWriter& writer,
338338
fillSettings, truncated, true);
339339
}
340340

341-
bool IsRawKikimrResult(const NKikimrMiniKQL::TResult& result) {
342-
auto& type = result.GetType();
343-
if (type.GetKind() != NKikimrMiniKQL::ETypeKind::Struct) {
344-
return true;
345-
}
346-
347-
auto& structType = type.GetStruct();
348-
if (structType.MemberSize() != 2) {
349-
return true;
350-
}
351-
352-
return structType.GetMember(0).GetName() != "Data" || structType.GetMember(1).GetName() != "Truncated";
353-
}
354-
355-
NKikimrMiniKQL::TResult* KikimrResultToProto(const NKikimrMiniKQL::TResult& result, const TColumnOrder& columnHints,
356-
const IDataProvider::TFillSettings& fillSettings, google::protobuf::Arena* arena)
357-
{
358-
NKikimrMiniKQL::TResult* packedResult = google::protobuf::Arena::CreateMessage<NKikimrMiniKQL::TResult>(arena);
359-
auto* packedType = packedResult->MutableType();
360-
packedType->SetKind(NKikimrMiniKQL::ETypeKind::Struct);
361-
auto* dataMember = packedType->MutableStruct()->AddMember();
362-
dataMember->SetName("Data");
363-
auto* truncatedMember = packedType->MutableStruct()->AddMember();
364-
truncatedMember->SetName("Truncated");
365-
truncatedMember->MutableType()->SetKind(NKikimrMiniKQL::ETypeKind::Data);
366-
truncatedMember->MutableType()->MutableData()->SetScheme(NKikimr::NUdf::TDataType<bool>::Id);
367-
368-
auto* packedValue = packedResult->MutableValue();
369-
auto* dataValue = packedValue->AddStruct();
370-
auto* dataType = dataMember->MutableType();
371-
auto* truncatedValue = packedValue->AddStruct();
372-
373-
bool truncated = false;
374-
if (result.GetType().GetKind() == NKikimrMiniKQL::ETypeKind::List) {
375-
const auto& itemType = result.GetType().GetList().GetItem();
376-
377-
TMap<TString, size_t> memberIndices;
378-
if (itemType.GetKind() == NKikimrMiniKQL::ETypeKind::Struct && columnHints.Size() != 0) {
379-
const auto& structType = itemType.GetStruct();
380-
381-
for (size_t i = 0; i < structType.MemberSize(); ++i) {
382-
memberIndices[structType.GetMember(i).GetName()] = i;
383-
}
384-
385-
dataType->SetKind(NKikimrMiniKQL::ETypeKind::List);
386-
auto* newItem = dataType->MutableList()->MutableItem();
387-
newItem->SetKind(NKikimrMiniKQL::ETypeKind::Struct);
388-
auto* newStructType = newItem->MutableStruct();
389-
for (auto& [column, gen_col] : columnHints) {
390-
auto* memberIndex = memberIndices.FindPtr(gen_col);
391-
YQL_ENSURE(memberIndex);
392-
393-
(*newStructType->AddMember() = structType.GetMember(*memberIndex)).SetName(column);
394-
}
395-
} else {
396-
*dataType = result.GetType();
397-
}
398-
399-
ui64 rowsWritten = 0;
400-
ui64 bytesWritten = 0;
401-
for (auto& item : result.GetValue().GetList()) {
402-
if (ResultsOverflow(rowsWritten, bytesWritten, fillSettings)) {
403-
truncated = true;
404-
break;
405-
}
406-
if (!memberIndices.empty()) {
407-
auto* newStruct = dataValue->AddList();
408-
for (auto& [column, gen_column] : columnHints) {
409-
auto* memberIndex = memberIndices.FindPtr(gen_column);
410-
YQL_ENSURE(memberIndex);
411-
412-
*newStruct->AddStruct() = item.GetStruct(*memberIndex);
413-
}
414-
} else {
415-
*dataValue->AddList() = item;
416-
}
417-
418-
bytesWritten += item.ByteSize();
419-
++rowsWritten;
420-
}
421-
} else {
422-
dataType->CopyFrom(result.GetType());
423-
dataValue->CopyFrom(result.GetValue());
424-
}
425-
426-
truncatedValue->SetBool(truncated);
427-
return packedResult;
428-
}
429341

430342
const TTypeAnnotationNode* ParseTypeFromKikimrProto(const NKikimrMiniKQL::TType& type, TExprContext& ctx) {
431343
switch (type.GetKind()) {

ydb/core/kqp/provider/yql_kikimr_results.h

-5
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,6 @@ namespace NYql {
77
void KikimrResultToYson(const TStringStream& stream, NYson::TYsonWriter& writer, const NKikimrMiniKQL::TResult& result,
88
const TColumnOrder& columnHints, const IDataProvider::TFillSettings& fillSettings, bool& truncated);
99

10-
NKikimrMiniKQL::TResult* KikimrResultToProto(const NKikimrMiniKQL::TResult& result, const TColumnOrder& columnHints,
11-
const IDataProvider::TFillSettings& fillSettings, google::protobuf::Arena* arena);
12-
13-
bool IsRawKikimrResult(const NKikimrMiniKQL::TResult& result);
14-
1510
const TTypeAnnotationNode* ParseTypeFromKikimrProto(const NKikimrMiniKQL::TType& type, TExprContext& ctx);
1611
bool ExportTypeToKikimrProto(const TTypeAnnotationNode& type, NKikimrMiniKQL::TType& protoType, TExprContext& ctx);
1712
TExprNode::TPtr ParseKikimrProtoValue(const NKikimrMiniKQL::TType& type, const NKikimrMiniKQL::TValue& value,

0 commit comments

Comments
 (0)