@@ -823,11 +823,10 @@ class TForgetScriptExecutionOperationQueryActor : public TQueryBase {
823
823
static constexpr i32 MAX_NUMBER_ROWS_IN_BATCH = 100000 ;
824
824
825
825
public:
826
- TForgetScriptExecutionOperationQueryActor (const TString& executionId, const TString& database, TInstant operationDeadline )
826
+ TForgetScriptExecutionOperationQueryActor (const TString& executionId, const TString& database)
827
827
: TQueryBase(__func__, executionId)
828
828
, ExecutionId(executionId)
829
829
, Database(database)
830
- , Deadline(operationDeadline)
831
830
{}
832
831
833
832
void OnRunQuery () override {
@@ -840,14 +839,36 @@ class TForgetScriptExecutionOperationQueryActor : public TQueryBase {
840
839
FROM `.metadata/script_executions`
841
840
WHERE database = $database AND execution_id = $execution_id;
842
841
842
+ DELETE
843
+ FROM `.metadata/script_execution_leases`
844
+ WHERE database = $database AND execution_id = $execution_id;
845
+ )" ;
846
+
847
+ NYdb::TParamsBuilder params;
848
+ params
849
+ .AddParam (" $database" )
850
+ .Utf8 (Database)
851
+ .Build ()
852
+ .AddParam (" $execution_id" )
853
+ .Utf8 (ExecutionId)
854
+ .Build ();
855
+
856
+ RunDataQuery (sql, ¶ms);
857
+ SetQueryResultHandler (&TForgetScriptExecutionOperationQueryActor::OnOperationDeleted, " Forget script execution operation" );
858
+ }
859
+
860
+ void OnOperationDeleted () {
861
+ SendResponse (Ydb::StatusIds::SUCCESS, {});
862
+
863
+ TString sql = R"(
864
+ -- TForgetScriptExecutionOperationQueryActor::OnOperationDeleted
865
+ DECLARE $database AS Text;
866
+ DECLARE $execution_id AS Text;
867
+
843
868
SELECT MAX(result_set_id) AS max_result_set_id, MAX(row_id) AS max_row_id
844
869
FROM `.metadata/result_sets`
845
870
WHERE database = $database AND execution_id = $execution_id AND
846
871
(expire_at > CurrentUtcTimestamp() OR expire_at IS NULL);
847
-
848
- DELETE
849
- FROM `.metadata/script_execution_leases`
850
- WHERE database = $database AND execution_id = $execution_id;
851
872
)" ;
852
873
853
874
NYdb::TParamsBuilder params;
@@ -860,7 +881,7 @@ class TForgetScriptExecutionOperationQueryActor : public TQueryBase {
860
881
.Build ();
861
882
862
883
RunDataQuery (sql, ¶ms);
863
- SetQueryResultHandler (&TForgetScriptExecutionOperationQueryActor::OnGetResultsInfo, " Forget script execution operation " );
884
+ SetQueryResultHandler (&TForgetScriptExecutionOperationQueryActor::OnGetResultsInfo, " Get results info " );
864
885
}
865
886
866
887
void OnGetResultsInfo () {
@@ -891,7 +912,6 @@ class TForgetScriptExecutionOperationQueryActor : public TQueryBase {
891
912
}
892
913
MaxRowId = *maxRowId;
893
914
894
- ClearTimeInfo ();
895
915
DeleteScriptResults ();
896
916
}
897
917
@@ -937,34 +957,34 @@ class TForgetScriptExecutionOperationQueryActor : public TQueryBase {
937
957
return ;
938
958
}
939
959
940
- if (TInstant::Now () + 2 * GetAverageTime () >= Deadline) {
941
- Finish (Ydb::StatusIds::TIMEOUT, ForgetOperationTimeoutIssues ());
942
- return ;
943
- }
944
-
945
960
DeleteScriptResults ();
946
961
}
947
962
948
963
void OnFinish (Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override {
949
- Send (Owner, new TEvForgetScriptExecutionOperationResponse ( status, std::move (issues) ));
964
+ SendResponse ( status, std::move (issues));
950
965
}
951
966
952
- static NYql::TIssues ForgetOperationTimeoutIssues () {
953
- return { NYql::TIssue (" Forget script execution operation timeout" ) };
967
+ private:
968
+ void SendResponse (Ydb::StatusIds::StatusCode status, NYql::TIssues issues) {
969
+ if (ResponseSent) {
970
+ return ;
971
+ }
972
+ ResponseSent = true ;
973
+ Send (Owner, new TEvForgetScriptExecutionOperationResponse (status, std::move (issues)));
954
974
}
955
975
956
976
private:
957
- TString ExecutionId;
958
- TString Database;
959
- TInstant Deadline;
977
+ const TString ExecutionId;
978
+ const TString Database;
960
979
i64 NumberRowsInBatch = 0 ;
961
980
i64 MaxRowId = 0 ;
981
+ bool ResponseSent = false ;
962
982
};
963
983
964
984
class TForgetScriptExecutionOperationActor : public TActorBootstrapped <TForgetScriptExecutionOperationActor> {
965
- public:
966
- using TForgetOperationRetryActor = TQueryRetryActor<TForgetScriptExecutionOperationQueryActor, TEvForgetScriptExecutionOperationResponse, TString, TString, TInstant>;
985
+ using TForgetOperationRetryActor = TQueryRetryActor<TForgetScriptExecutionOperationQueryActor, TEvForgetScriptExecutionOperationResponse, TString, TString>;
967
986
987
+ public:
968
988
explicit TForgetScriptExecutionOperationActor (TEvForgetScriptExecutionOperation::TPtr ev)
969
989
: Request(std::move(ev))
970
990
{}
@@ -1002,19 +1022,7 @@ class TForgetScriptExecutionOperationActor : public TActorBootstrapped<TForgetSc
1002
1022
}
1003
1023
1004
1024
KQP_PROXY_LOG_D (" [TForgetScriptExecutionOperationActor] ExecutionId: " << ExecutionId << " , lease check success. Start TForgetOperationRetryActor" );
1005
-
1006
- TDuration minDelay = TDuration::MilliSeconds (10 );
1007
- TDuration maxTime = Request->Get ()->Deadline - TInstant::Now ();
1008
- if (maxTime <= minDelay) {
1009
- Reply (Ydb::StatusIds::TIMEOUT, TForgetScriptExecutionOperationQueryActor::ForgetOperationTimeoutIssues ());
1010
- return ;
1011
- }
1012
-
1013
- Register (new TForgetOperationRetryActor (
1014
- SelfId (),
1015
- TForgetOperationRetryActor::IRetryPolicy::GetExponentialBackoffPolicy (TForgetOperationRetryActor::Retryable, minDelay, TDuration::MilliSeconds (200 ), TDuration::Seconds (1 ), std::numeric_limits<size_t >::max (), maxTime),
1016
- ExecutionId, Request->Get ()->Database , TInstant::Now () + maxTime
1017
- ));
1025
+ Register (new TForgetOperationRetryActor (SelfId (), ExecutionId, Request->Get ()->Database ));
1018
1026
}
1019
1027
1020
1028
void Handle (TEvForgetScriptExecutionOperationResponse::TPtr& ev) {
@@ -1042,7 +1050,7 @@ class TForgetScriptExecutionOperationActor : public TActorBootstrapped<TForgetSc
1042
1050
}
1043
1051
1044
1052
private:
1045
- TEvForgetScriptExecutionOperation::TPtr Request;
1053
+ const TEvForgetScriptExecutionOperation::TPtr Request;
1046
1054
TString ExecutionId;
1047
1055
bool ExecutionEntryExists = true ;
1048
1056
};
0 commit comments