@@ -31,6 +31,8 @@ class TJsonQuery : public TViewerPipeClient {
31
31
TString TransactionMode;
32
32
bool Direct = false ;
33
33
bool IsBase64Encode = true ;
34
+ int LimitRows = 10000 ;
35
+ int TotalRows = 0 ;
34
36
35
37
enum ESchemaType {
36
38
Classic,
@@ -89,6 +91,9 @@ class TJsonQuery : public TViewerPipeClient {
89
91
if (params.Has (" base64" )) {
90
92
IsBase64Encode = FromStringWithDefault<bool >(params.Get (" base64" ), true );
91
93
}
94
+ if (params.Has (" limit_rows" )) {
95
+ LimitRows = std::clamp<int >(FromStringWithDefault<int >(params.Get (" limit_rows" ), 10000 ), 1 , 100000 );
96
+ }
92
97
Direct = FromStringWithDefault<bool >(params.Get (" direct" ), Direct);
93
98
}
94
99
@@ -124,6 +129,9 @@ class TJsonQuery : public TViewerPipeClient {
124
129
if (requestData.Has (" base64" )) {
125
130
IsBase64Encode = requestData[" base64" ].GetBooleanRobust ();
126
131
}
132
+ if (requestData.Has (" limit_rows" )) {
133
+ LimitRows = std::clamp<int >(requestData[" limit_rows" ].GetIntegerRobust (), 1 , 100000 );
134
+ }
127
135
}
128
136
return success;
129
137
}
@@ -307,7 +315,13 @@ class TJsonQuery : public TViewerPipeClient {
307
315
request.SetAction (NKikimrKqp::QUERY_ACTION_EXPLAIN);
308
316
request.SetType (NKikimrKqp::QUERY_TYPE_SQL_SCRIPT);
309
317
}
310
- if (Stats == " profile" ) {
318
+ if (Stats == " none" ) {
319
+ request.SetStatsMode (NYql::NDqProto::DQ_STATS_MODE_NONE);
320
+ request.SetCollectStats (Ydb::Table::QueryStatsCollection::STATS_COLLECTION_NONE);
321
+ } else if (Stats == " basic" ) {
322
+ request.SetStatsMode (NYql::NDqProto::DQ_STATS_MODE_BASIC);
323
+ request.SetCollectStats (Ydb::Table::QueryStatsCollection::STATS_COLLECTION_BASIC);
324
+ } else if (Stats == " profile" ) {
311
325
request.SetStatsMode (NYql::NDqProto::DQ_STATS_MODE_PROFILE);
312
326
request.SetCollectStats (Ydb::Table::QueryStatsCollection::STATS_COLLECTION_PROFILE);
313
327
} else if (Stats == " full" ) {
@@ -479,13 +493,23 @@ class TJsonQuery : public TViewerPipeClient {
479
493
}
480
494
481
495
void HandleReply (NKqp::TEvKqpExecuter::TEvStreamData::TPtr& ev) {
482
- const NKikimrKqp::TEvExecuterStreamData& data (ev->Get ()->Record );
496
+ NKikimrKqp::TEvExecuterStreamData& data (ev->Get ()->Record );
483
497
484
- ResultSets.emplace_back ();
485
- ResultSets.back () = std::move (data.GetResultSet ());
498
+ if (TotalRows < LimitRows) {
499
+ int rowsAvailable = LimitRows - TotalRows;
500
+ if (data.GetResultSet ().rows_size () > rowsAvailable) {
501
+ data.MutableResultSet ()->mutable_rows ()->Truncate (rowsAvailable);
502
+ data.MutableResultSet ()->set_truncated (true );
503
+ }
504
+ TotalRows += data.GetResultSet ().rows_size ();
505
+ ResultSets.emplace_back () = std::move (*data.MutableResultSet ());
506
+ }
486
507
487
508
THolder<NKqp::TEvKqpExecuter::TEvStreamDataAck> ack = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
488
509
ack->Record .SetSeqNo (ev->Get ()->Record .GetSeqNo ());
510
+ if (TotalRows >= LimitRows) {
511
+ ack->Record .SetEnough (true );
512
+ }
489
513
Send (ev->Sender , ack.Release ());
490
514
}
491
515
@@ -618,6 +642,9 @@ class TJsonQuery : public TViewerPipeClient {
618
642
jsonColumn = ColumnValueToJsonValue (rsParser.ColumnParser (columnNum));
619
643
}
620
644
}
645
+ if (resultSet.Truncated ()) {
646
+ jsonResult[" truncated" ] = true ;
647
+ }
621
648
}
622
649
}
623
650
0 commit comments