Skip to content

Commit 1143523

Browse files
authored
get rid of mkql results in scripting (#9997) (#10094)
1 parent a2be43b commit 1143523

27 files changed

+182
-231
lines changed

ydb/core/client/server/msgbus_server_pq_metacache.cpp

+18-11
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,7 @@ class TPersQueueMetaCacheActor : public TActorBootstrapped<TPersQueueMetaCacheAc
218218
req->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DML);
219219
req->Record.MutableRequest()->SetKeepSession(false);
220220
req->Record.MutableRequest()->SetDatabase(NKikimr::NPQ::GetDatabaseFromConfig(AppData(ctx)->PQConfig));
221+
req->Record.MutableRequest()->SetUsePublicResponseDataFormat(true);
221222

222223
req->Record.MutableRequest()->MutableQueryCachePolicy()->set_keep_in_cache(true);
223224
req->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write();
@@ -274,9 +275,14 @@ class TPersQueueMetaCacheActor : public TActorBootstrapped<TPersQueueMetaCacheAc
274275

275276
const auto& record = ev->Get()->Record.GetRef();
276277

277-
Y_ABORT_UNLESS(record.GetResponse().GetResults().size() == 1);
278-
const auto& rr = record.GetResponse().GetResults(0).GetValue().GetStruct(0);
279-
ui64 newVersion = rr.ListSize() == 0 ? 0 : rr.GetList(0).GetStruct(0).GetOptional().GetInt64();
278+
Y_VERIFY(record.GetResponse().YdbResultsSize() == 1);
279+
NYdb::TResultSetParser parser(record.GetResponse().GetYdbResults(0));
280+
281+
ui64 newVersion = 0;
282+
if (parser.RowsCount() != 0) {
283+
parser.TryNextRow();
284+
newVersion = *parser.ColumnParser(0).GetOptionalInt64();
285+
}
280286

281287
LastVersionUpdate = ctx.Now();
282288
if (newVersion > CurrentTopicsVersion || CurrentTopicsVersion == 0 || SkipVersionCheck) {
@@ -293,17 +299,18 @@ class TPersQueueMetaCacheActor : public TActorBootstrapped<TPersQueueMetaCacheAc
293299

294300
const auto& record = ev->Get()->Record.GetRef();
295301

296-
Y_ABORT_UNLESS(record.GetResponse().GetResults().size() == 1);
302+
Y_VERIFY(record.GetResponse().YdbResultsSize() == 1);
297303
TString path, dc;
298-
const auto& rr = record.GetResponse().GetResults(0).GetValue().GetStruct(0);
299-
for (const auto& row : rr.GetList()) {
300-
301-
path = row.GetStruct(0).GetOptional().GetText();
302-
dc = row.GetStruct(1).GetOptional().GetText();
304+
NYdb::TResultSetParser parser(record.GetResponse().GetYdbResults(0));
305+
const ui32 rowCount = parser.RowsCount();
306+
while (parser.TryNextRow()) {
307+
path = *parser.ColumnParser(0).GetOptionalUtf8();
308+
dc = *parser.ColumnParser(1).GetOptionalUtf8();
303309

304310
NewTopics.emplace_back(decltype(NewTopics)::value_type{path, dc});
305311
}
306-
if (rr.ListSize() > 0) {
312+
313+
if (rowCount > 0) {
307314
LastTopicKey = {path, dc};
308315
return RunQuery(EQueryType::EGetTopics, ctx);
309316
} else {
@@ -710,7 +717,7 @@ class TPersQueueMetaCacheActor : public TActorBootstrapped<TPersQueueMetaCacheAc
710717
void ProcessNodesInfoWaitersQueue(bool status, const TActorContext& ctx) {
711718
if (DynamicNodesMapping == nullptr) {
712719
Y_ABORT_UNLESS(!status);
713-
DynamicNodesMapping.reset(new THashMap<ui32, ui32>);
720+
DynamicNodesMapping.reset(new THashMap<ui32, ui32>);
714721
}
715722
while(!NodesMappingWaiters.empty()) {
716723
ctx.Send(NodesMappingWaiters.front(),

ydb/core/grpc_services/rpc_execute_data_query.cpp

+1-2
Original file line numberDiff line numberDiff line change
@@ -185,9 +185,8 @@ class TExecuteDataQueryRPC : public TRpcKqpRequestActor<TExecuteDataQueryRPC, TE
185185
// https://protobuf.dev/reference/cpp/arenas/#swap
186186
// Actualy will be copy in case pf remote execution
187187
queryResult->mutable_result_sets()->Swap(record.MutableResponse()->MutableYdbResults());
188-
} else {
189-
NKqp::ConvertKqpQueryResultsToDbResult(kqpResponse, queryResult);
190188
}
189+
191190
ConvertQueryStats(kqpResponse, queryResult);
192191
if (kqpResponse.HasTxMeta()) {
193192
queryResult->mutable_tx_meta()->CopyFrom(kqpResponse.GetTxMeta());

ydb/core/grpc_services/rpc_execute_yql_script.cpp

+5-1
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,11 @@ class TExecuteYqlScriptRPC : public TRpcKqpRequestActor<TExecuteYqlScriptRPC, TE
9797
auto queryResult = TEvExecuteYqlScriptRequest::AllocateResult<TResult>(Request_);
9898

9999
try {
100-
NKqp::ConvertKqpQueryResultsToDbResult(kqpResponse, queryResult);
100+
const auto& results = kqpResponse.GetYdbResults();
101+
for (const auto& result : results) {
102+
queryResult->add_result_sets()->CopyFrom(result);
103+
}
104+
101105
} catch (const std::exception& ex) {
102106
NYql::TIssues issues;
103107
issues.AddIssue(NYql::ExceptionToIssue(ex));

ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ namespace {
4949
{}
5050

5151
NKqp::TEvKqp::TEvDataQueryStreamPart::TPtr Handle;
52-
google::protobuf::RepeatedPtrField<NKikimrMiniKQL::TResult>::const_iterator ResultIterator;
52+
google::protobuf::RepeatedPtrField<Ydb::ResultSet>::const_iterator ResultIterator;
5353
};
5454

5555
enum EStreamRpcWakeupTag : ui64 {
@@ -218,7 +218,7 @@ class TStreamExecuteYqlScriptRPC
218218
auto result = response.mutable_result();
219219

220220
try {
221-
NKqp::ConvertKqpQueryResultToDbResult(kqpResult, result->mutable_result_set());
221+
result->mutable_result_set()->CopyFrom(kqpResult);
222222
} catch (std::exception ex) {
223223
ReplyFinishStream(ex.what());
224224
}

ydb/core/kqp/common/events/query.h

+3-1
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,9 @@ struct TEvQueryRequest: public NActors::TEventLocal<TEvQueryRequest, TKqpEvents:
7070
const TQueryRequestSettings& querySettings = TQueryRequestSettings(),
7171
const TString& poolId = "");
7272

73-
TEvQueryRequest() = default;
73+
TEvQueryRequest() {
74+
Record.MutableRequest()->SetUsePublicResponseDataFormat(true);
75+
}
7476

7577
bool IsSerializable() const override {
7678
return true;

ydb/core/kqp/common/kqp.h

+1-10
Original file line numberDiff line numberDiff line change
@@ -33,18 +33,9 @@
3333

3434
namespace NKikimr::NKqp {
3535

36-
void ConvertKqpQueryResultToDbResult(const NKikimrMiniKQL::TResult& from, Ydb::ResultSet* to);
37-
3836
TString ScriptExecutionRunnerActorIdString(const NActors::TActorId& actorId);
3937
bool ScriptExecutionRunnerActorIdFromString(const TString& executionId, TActorId& actorId);
4038

41-
template<typename TFrom, typename TTo>
42-
inline void ConvertKqpQueryResultsToDbResult(const TFrom& from, TTo* to) {
43-
const auto& results = from.GetResults();
44-
for (const auto& result : results) {
45-
ConvertKqpQueryResultToDbResult(result, to->add_result_sets());
46-
}
47-
}
4839

4940
class TKqpRequestInfo {
5041
public:
@@ -80,7 +71,7 @@ class IQueryReplayBackend : public TNonCopyable {
8071
/// Accepts query text
8172
virtual void Collect(const TString& queryData) = 0;
8273

83-
virtual bool IsNull() { return false; }
74+
virtual bool IsNull() { return false; }
8475

8576
virtual ~IQueryReplayBackend() {};
8677

ydb/core/kqp/common/kqp_event_impl.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ void TEvKqp::TEvQueryRequest::PrepareRemote() const {
9090
Record.MutableRequest()->SetPoolId(PoolId);
9191
}
9292

93+
Record.MutableRequest()->SetUsePublicResponseDataFormat(true);
9394
Record.MutableRequest()->SetSessionId(SessionId);
9495
Record.MutableRequest()->SetAction(QueryAction);
9596
Record.MutableRequest()->SetType(QueryType);

ydb/core/kqp/gateway/kqp_ic_gateway.cpp

+9-11
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ class TKqpScanQueryRequestHandler : public TRequestHandlerBase<
226226
void HandleResponse(typename TResponse::TPtr &ev, const TActorContext &ctx) {
227227
auto& response = *ev->Get()->Record.GetRef().MutableResponse();
228228

229-
NKikimr::ConvertYdbResultToKqpResult(ResultSet,*response.AddResults());
229+
response.AddYdbResults()->CopyFrom(ResultSet);
230230
for (auto& execStats : Executions) {
231231
response.MutableQueryStats()->AddExecutions()->Swap(&execStats);
232232
}
@@ -285,20 +285,18 @@ class TKqpStreamRequestHandler : public TRequestHandlerBase<
285285
virtual void HandleResponse(typename TResponse::TPtr &ev, const TActorContext &ctx) {
286286
auto& record = ev->Get()->Record.GetRef();
287287
if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) {
288-
if (record.MutableResponse()->GetResults().size()) {
288+
if (record.MutableResponse()->GetYdbResults().size()) {
289289
// Send result sets to RPC actor TStreamExecuteYqlScriptRPC
290290
auto evStreamPart = MakeHolder<NKqp::TEvKqp::TEvDataQueryStreamPart>();
291291
ActorIdToProto(this->SelfId(), evStreamPart->Record.MutableGatewayActorId());
292292

293-
for (int i = 0; i < record.MutableResponse()->MutableResults()->size(); ++i) {
293+
for (int i = 0; i < record.MutableResponse()->MutableYdbResults()->size(); ++i) {
294294
// Workaround to avoid errors on Pull execution stage which would expect some results
295-
Ydb::ResultSet resultSet;
296-
NKikimr::ConvertYdbResultToKqpResult(resultSet, *evStreamPart->Record.AddResults());
295+
evStreamPart->Record.AddResults();
297296
}
298297

299-
evStreamPart->Record.MutableResults()->Swap(record.MutableResponse()->MutableResults());
298+
evStreamPart->Record.MutableResults()->Swap(record.MutableResponse()->MutableYdbResults());
300299
this->Send(TargetActorId, evStreamPart.Release());
301-
302300
// Save response without data to send it later
303301
ResponseHandle = ev.Release();
304302
} else {
@@ -404,7 +402,7 @@ class TKqpForwardStreamRequestHandler : public TRequestHandlerBase<
404402
auto& response = *ev->Get()->Record.GetRef().MutableResponse();
405403

406404
Ydb::ResultSet resultSet;
407-
NKikimr::ConvertYdbResultToKqpResult(resultSet, *response.AddResults());
405+
response.AddYdbResults()->CopyFrom(resultSet);
408406
for (auto& execStats : Executions) {
409407
response.MutableQueryStats()->AddExecutions()->Swap(&execStats);
410408
}
@@ -510,7 +508,7 @@ class TKqpGenericQueryRequestHandler: public TRequestHandlerBase<
510508
auto& response = *ev->Get()->Record.GetRef().MutableResponse();
511509

512510
for (auto& resultSet : ResultSets) {
513-
ConvertYdbResultToKqpResult(std::move(resultSet.ResultSet), *response.AddResults());
511+
response.AddYdbResults()->Swap(&resultSet.ResultSet);
514512
}
515513

516514
TBase::HandleResponse(ev, ctx);
@@ -671,8 +669,8 @@ void KqpResponseToQueryResult(const NKikimrKqp::TEvQueryResponse& response, IKqp
671669
queryResult.AddIssue(NYql::IssueFromMessage(issue));
672670
}
673671

674-
for (auto& result : queryResponse.GetResults()) {
675-
auto arenaResult = google::protobuf::Arena::CreateMessage<NKikimrMiniKQL::TResult>(
672+
for (auto& result : queryResponse.GetYdbResults()) {
673+
auto arenaResult = google::protobuf::Arena::CreateMessage<Ydb::ResultSet>(
676674
queryResult.ProtobufArenaPtr.get());
677675

678676
arenaResult->CopyFrom(result);

ydb/core/kqp/host/kqp_host.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -288,8 +288,8 @@ class TAsyncExecuteYqlResult : public TKqpAsyncResultBase<IKqpHost::TQueryResult
288288

289289
for (auto& resultStr : ResultProviderConfig.CommittedResults) {
290290
queryResult.Results.emplace_back(
291-
google::protobuf::Arena::CreateMessage<NKikimrMiniKQL::TResult>(queryResult.ProtobufArenaPtr.get()));
292-
NKikimrMiniKQL::TResult* result = queryResult.Results.back();
291+
google::protobuf::Arena::CreateMessage<Ydb::ResultSet>(queryResult.ProtobufArenaPtr.get()));
292+
Ydb::ResultSet* result = queryResult.Results.back();
293293

294294
if (!result->ParseFromArray(resultStr.data(), resultStr.size())) {
295295
queryResult = ResultFromError<TResult>("Failed to parse run result.");

ydb/core/kqp/host/kqp_runner.cpp

-10
Original file line numberDiff line numberDiff line change
@@ -116,17 +116,7 @@ class TPrepareQueryAsyncResult : public TKqpAsyncResultBase<IKikimrQueryExecutor
116116
}
117117

118118
void FillResult(TResult& queryResult) const override {
119-
TVector<NKikimrMiniKQL::TResult*> results;
120-
for (auto& phyResult : TransformCtx.PhysicalQueryResults) {
121-
auto result = google::protobuf::Arena::CreateMessage<NKikimrMiniKQL::TResult>(
122-
queryResult.ProtobufArenaPtr.get());
123-
124-
result->CopyFrom(phyResult);
125-
results.push_back(result);
126-
}
127-
128119
queryResult.QueryStats.CopyFrom(TransformCtx.QueryStats);
129-
queryResult.Results = std::move(results);
130120
}
131121

132122
private:

ydb/core/kqp/host/kqp_transform.h

-5
Original file line numberDiff line numberDiff line change
@@ -26,18 +26,13 @@ struct TKqlTransformContext : TThrRefBase {
2626
NKqpProto::TKqpStatsQuery QueryStats;
2727
std::shared_ptr<const NKqpProto::TKqpPhyQuery> PhysicalQuery;
2828

29-
TVector<TSimpleSharedPtr<NKikimrMiniKQL::TResult>> MkqlResults;
30-
TVector<NKikimrMiniKQL::TResult> PhysicalQueryResults;
31-
3229
NYql::TExprNode::TPtr ExplainTransformerInput; // Explain transformer must work after other transformers, but use input before peephole
3330
TMaybe<NYql::NNodes::TKiDataQueryBlocks> DataQueryBlocks;
3431

3532
void Reset() {
3633
ReplyTarget = {};
37-
MkqlResults.clear();
3834
QueryStats = {};
3935
PhysicalQuery = nullptr;
40-
PhysicalQueryResults.clear();
4136
ExplainTransformerInput = nullptr;
4237
DataQueryBlocks = Nothing();
4338
}

ydb/core/kqp/provider/yql_kikimr_exec.cpp

+1-5
Original file line numberDiff line numberDiff line change
@@ -980,16 +980,12 @@ class TKiSourceCallableExecutionTransformer : public TAsyncCallbackTransformer<T
980980

981981
private:
982982
static TExprNode::TPtr GetResOrPullResult(const TExprNode& node, const IDataProvider::TFillSettings& fillSettings,
983-
const NKikimrMiniKQL::TResult& resultValue, TExprContext& ctx)
983+
const Ydb::ResultSet& resultValue, TExprContext& ctx)
984984
{
985985
TVector<TString> columnHints(NCommon::GetResOrPullColumnHints(node));
986986

987987
auto protoValue = &resultValue;
988988
YQL_ENSURE(resultValue.GetArena());
989-
if (IsRawKikimrResult(resultValue)) {
990-
protoValue = KikimrResultToProto(resultValue, columnHints, fillSettings, resultValue.GetArena());
991-
}
992-
993989
YQL_ENSURE(fillSettings.Format == IDataProvider::EResultFormat::Custom);
994990
YQL_ENSURE(fillSettings.FormatDetails == KikimrMkqlProtoFormat);
995991

ydb/core/kqp/provider/yql_kikimr_gateway.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -856,7 +856,7 @@ class IKikimrGateway : public TThrRefBase {
856856

857857
struct TQueryResult : public TGenericResult {
858858
TString SessionId;
859-
TVector<NKikimrMiniKQL::TResult*> Results;
859+
TVector<Ydb::ResultSet*> Results;
860860
TMaybe<NKikimrKqp::TQueryProfile> Profile; // TODO: Deprecate.
861861
NKqpProto::TKqpStatsQuery QueryStats;
862862
std::unique_ptr<NKikimrKqp::TPreparedQuery> PreparingQuery;

ydb/core/kqp/session_actor/kqp_response.cpp

-30
Original file line numberDiff line numberDiff line change
@@ -53,36 +53,6 @@ bool HasSchemeOrFatalIssues(const TIssue& issue) {
5353

5454
} // namespace
5555

56-
void ConvertKqpQueryResultToDbResult(const NKikimrMiniKQL::TResult& from, Ydb::ResultSet* to) {
57-
const auto& type = from.GetType();
58-
TStackVec<NKikimrMiniKQL::TType> columnTypes;
59-
Y_ENSURE(type.GetKind() == NKikimrMiniKQL::ETypeKind::Struct);
60-
for (const auto& member : type.GetStruct().GetMember()) {
61-
if (member.GetType().GetKind() == NKikimrMiniKQL::ETypeKind::List) {
62-
for (const auto& column : member.GetType().GetList().GetItem().GetStruct().GetMember()) {
63-
auto columnMeta = to->add_columns();
64-
columnMeta->set_name(column.GetName());
65-
columnTypes.push_back(column.GetType());
66-
ConvertMiniKQLTypeToYdbType(column.GetType(), *columnMeta->mutable_type());
67-
}
68-
}
69-
}
70-
for (const auto& responseStruct : from.GetValue().GetStruct()) {
71-
for (const auto& row : responseStruct.GetList()) {
72-
auto newRow = to->add_rows();
73-
ui32 columnCount = static_cast<ui32>(row.StructSize());
74-
Y_ENSURE(columnCount == columnTypes.size());
75-
for (ui32 i = 0; i < columnCount; i++) {
76-
const auto& column = row.GetStruct(i);
77-
ConvertMiniKQLValueToYdbValue(columnTypes[i], column, *newRow->add_items());
78-
}
79-
}
80-
if (responseStruct.Getvalue_valueCase() == NKikimrMiniKQL::TValue::kBool) {
81-
to->set_truncated(responseStruct.GetBool());
82-
}
83-
}
84-
}
85-
8656
TMaybe<Ydb::StatusIds::StatusCode> GetYdbStatus(const TIssue& issue) {
8757
if (issue.GetSeverity() == TSeverityIds::S_FATAL) {
8858
return Ydb::StatusIds::INTERNAL_ERROR;

ydb/core/kqp/session_actor/kqp_session_actor.cpp

+5-22
Original file line numberDiff line numberDiff line change
@@ -1771,7 +1771,6 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
17711771
// Result for scan query is sent directly to target actor.
17721772
Y_ABORT_UNLESS(response->GetArena());
17731773
if (QueryState->PreparedQuery) {
1774-
bool useYdbResponseFormat = QueryState->GetUsePublicResponseDataFormat();
17751774
auto& phyQuery = QueryState->PreparedQuery->GetPhysicalQuery();
17761775
size_t trailingResultsCount = 0;
17771776
for (size_t i = 0; i < phyQuery.ResultBindingsSize(); ++i) {
@@ -1788,28 +1787,12 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
17881787
continue;
17891788
}
17901789

1791-
if (useYdbResponseFormat) {
1792-
TMaybe<ui64> effectiveRowsLimit = FillSettings.RowsLimitPerWrite;
1793-
if (QueryState->PreparedQuery->GetResults(i).GetRowsLimit()) {
1794-
effectiveRowsLimit = QueryState->PreparedQuery->GetResults(i).GetRowsLimit();
1795-
}
1796-
auto* ydbResult = QueryState->QueryData->GetYdbTxResult(phyQuery.GetResultBindings(i), response->GetArena(), effectiveRowsLimit);
1797-
response->AddYdbResults()->Swap(ydbResult);
1798-
} else {
1799-
auto* protoRes = QueryState->QueryData->GetMkqlTxResult(phyQuery.GetResultBindings(i), response->GetArena());
1800-
std::optional<IDataProvider::TFillSettings> fillSettings;
1801-
if (QueryState->PreparedQuery->ResultsSize()) {
1802-
YQL_ENSURE(phyQuery.ResultBindingsSize() == QueryState->PreparedQuery->ResultsSize(), ""
1803-
<< phyQuery.ResultBindingsSize() << " != " << QueryState->PreparedQuery->ResultsSize());
1804-
const auto& result = QueryState->PreparedQuery->GetResults(i);
1805-
if (result.GetRowsLimit()) {
1806-
fillSettings = FillSettings;
1807-
fillSettings->RowsLimitPerWrite = result.GetRowsLimit();
1808-
}
1809-
}
1810-
auto* finalResult = KikimrResultToProto(*protoRes, {}, fillSettings.value_or(FillSettings), response->GetArena());
1811-
response->AddResults()->Swap(finalResult);
1790+
TMaybe<ui64> effectiveRowsLimit = FillSettings.RowsLimitPerWrite;
1791+
if (QueryState->PreparedQuery->GetResults(i).GetRowsLimit()) {
1792+
effectiveRowsLimit = QueryState->PreparedQuery->GetResults(i).GetRowsLimit();
18121793
}
1794+
auto* ydbResult = QueryState->QueryData->GetYdbTxResult(phyQuery.GetResultBindings(i), response->GetArena(), effectiveRowsLimit);
1795+
response->AddYdbResults()->Swap(ydbResult);
18131796
}
18141797
}
18151798

ydb/core/kqp/session_actor/kqp_worker_actor.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -960,7 +960,7 @@ class TKqpWorkerActor : public TActorBootstrapped<TKqpWorkerActor> {
960960
// If we have result it must be allocated on protobuf arena
961961
Y_ASSERT(result->GetArena());
962962
Y_ASSERT(resp->GetArena() == result->GetArena());
963-
resp->AddResults()->Swap(result);
963+
resp->AddYdbResults()->Swap(result);
964964
}
965965
} else {
966966
auto resp = ev.MutableResponse();

ydb/core/kqp/session_actor/kqp_worker_common.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ void SlowLogQuery(const TActorContext &ctx, const TKikimrConfiguration* config,
119119
<< 'b';
120120

121121
ui64 resultsSize = 0;
122-
for (auto& result : record->GetResponse().GetResults()) {
122+
for (auto& result : record->GetResponse().GetYdbResults()) {
123123
resultsSize += result.ByteSize();
124124
}
125125

0 commit comments

Comments
 (0)