Skip to content

Commit 73a405a

Browse files
authored
Merge f9f3237 into 86d2b5b
2 parents 86d2b5b + f9f3237 commit 73a405a

File tree

6 files changed

+58
-57
lines changed

6 files changed

+58
-57
lines changed

ydb/core/grpc_services/rpc_forget_operation.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ class TForgetOperationRPC: public TRpcOperationRequestActor<TForgetOperationRPC,
9797
}
9898

9999
void SendForgetScriptExecutionOperation() {
100-
Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), new NKqp::TEvForgetScriptExecutionOperation(DatabaseName, OperationId, Request->GetDeadline()));
100+
Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), new NKqp::TEvForgetScriptExecutionOperation(DatabaseName, OperationId));
101101
}
102102

103103
public:

ydb/core/kqp/common/events/script_executions.h

+4-7
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,13 @@ enum EFinalizationStatus : i32 {
2323
};
2424

2525
struct TEvForgetScriptExecutionOperation : public NActors::TEventLocal<TEvForgetScriptExecutionOperation, TKqpScriptExecutionEvents::EvForgetScriptExecutionOperation> {
26-
explicit TEvForgetScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id, TInstant deadline)
26+
TEvForgetScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id)
2727
: Database(database)
2828
, OperationId(id)
29-
, Deadline(deadline)
30-
{
31-
}
29+
{}
3230

33-
TString Database;
34-
NOperationId::TOperationId OperationId;
35-
TInstant Deadline;
31+
const TString Database;
32+
const NOperationId::TOperationId OperationId;
3633
};
3734

3835
struct TEvForgetScriptExecutionOperationResponse : public NActors::TEventLocal<TEvForgetScriptExecutionOperationResponse, TKqpScriptExecutionEvents::EvForgetScriptExecutionOperationResponse> {

ydb/core/kqp/proxy_service/kqp_script_executions.cpp

+43-35
Original file line numberDiff line numberDiff line change
@@ -823,11 +823,10 @@ class TForgetScriptExecutionOperationQueryActor : public TQueryBase {
823823
static constexpr i32 MAX_NUMBER_ROWS_IN_BATCH = 100000;
824824

825825
public:
826-
TForgetScriptExecutionOperationQueryActor(const TString& executionId, const TString& database, TInstant operationDeadline)
826+
TForgetScriptExecutionOperationQueryActor(const TString& executionId, const TString& database)
827827
: TQueryBase(__func__, executionId)
828828
, ExecutionId(executionId)
829829
, Database(database)
830-
, Deadline(operationDeadline)
831830
{}
832831

833832
void OnRunQuery() override {
@@ -840,14 +839,36 @@ class TForgetScriptExecutionOperationQueryActor : public TQueryBase {
840839
FROM `.metadata/script_executions`
841840
WHERE database = $database AND execution_id = $execution_id;
842841
842+
DELETE
843+
FROM `.metadata/script_execution_leases`
844+
WHERE database = $database AND execution_id = $execution_id;
845+
)";
846+
847+
NYdb::TParamsBuilder params;
848+
params
849+
.AddParam("$database")
850+
.Utf8(Database)
851+
.Build()
852+
.AddParam("$execution_id")
853+
.Utf8(ExecutionId)
854+
.Build();
855+
856+
RunDataQuery(sql, &params);
857+
SetQueryResultHandler(&TForgetScriptExecutionOperationQueryActor::OnOperationDeleted, "Forget script execution operation");
858+
}
859+
860+
void OnOperationDeleted() {
861+
SendResponse(Ydb::StatusIds::SUCCESS, {});
862+
863+
TString sql = R"(
864+
-- TForgetScriptExecutionOperationQueryActor::OnOperationDeleted
865+
DECLARE $database AS Text;
866+
DECLARE $execution_id AS Text;
867+
843868
SELECT MAX(result_set_id) AS max_result_set_id, MAX(row_id) AS max_row_id
844869
FROM `.metadata/result_sets`
845870
WHERE database = $database AND execution_id = $execution_id AND
846871
(expire_at > CurrentUtcTimestamp() OR expire_at IS NULL);
847-
848-
DELETE
849-
FROM `.metadata/script_execution_leases`
850-
WHERE database = $database AND execution_id = $execution_id;
851872
)";
852873

853874
NYdb::TParamsBuilder params;
@@ -860,7 +881,7 @@ class TForgetScriptExecutionOperationQueryActor : public TQueryBase {
860881
.Build();
861882

862883
RunDataQuery(sql, &params);
863-
SetQueryResultHandler(&TForgetScriptExecutionOperationQueryActor::OnGetResultsInfo, "Forget script execution operation");
884+
SetQueryResultHandler(&TForgetScriptExecutionOperationQueryActor::OnGetResultsInfo, "Get results info");
864885
}
865886

866887
void OnGetResultsInfo() {
@@ -891,7 +912,6 @@ class TForgetScriptExecutionOperationQueryActor : public TQueryBase {
891912
}
892913
MaxRowId = *maxRowId;
893914

894-
ClearTimeInfo();
895915
DeleteScriptResults();
896916
}
897917

@@ -937,34 +957,34 @@ class TForgetScriptExecutionOperationQueryActor : public TQueryBase {
937957
return;
938958
}
939959

940-
if (TInstant::Now() + 2 * GetAverageTime() >= Deadline) {
941-
Finish(Ydb::StatusIds::TIMEOUT, ForgetOperationTimeoutIssues());
942-
return;
943-
}
944-
945960
DeleteScriptResults();
946961
}
947962

948963
void OnFinish(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override {
949-
Send(Owner, new TEvForgetScriptExecutionOperationResponse(status, std::move(issues)));
964+
SendResponse(status, std::move(issues));
950965
}
951966

952-
static NYql::TIssues ForgetOperationTimeoutIssues() {
953-
return { NYql::TIssue("Forget script execution operation timeout") };
967+
private:
968+
void SendResponse(Ydb::StatusIds::StatusCode status, NYql::TIssues issues) {
969+
if (ResponseSent) {
970+
return;
971+
}
972+
ResponseSent = true;
973+
Send(Owner, new TEvForgetScriptExecutionOperationResponse(status, std::move(issues)));
954974
}
955975

956976
private:
957-
TString ExecutionId;
958-
TString Database;
959-
TInstant Deadline;
977+
const TString ExecutionId;
978+
const TString Database;
960979
i64 NumberRowsInBatch = 0;
961980
i64 MaxRowId = 0;
981+
bool ResponseSent = false;
962982
};
963983

964984
class TForgetScriptExecutionOperationActor : public TActorBootstrapped<TForgetScriptExecutionOperationActor> {
965-
public:
966-
using TForgetOperationRetryActor = TQueryRetryActor<TForgetScriptExecutionOperationQueryActor, TEvForgetScriptExecutionOperationResponse, TString, TString, TInstant>;
985+
using TForgetOperationRetryActor = TQueryRetryActor<TForgetScriptExecutionOperationQueryActor, TEvForgetScriptExecutionOperationResponse, TString, TString>;
967986

987+
public:
968988
explicit TForgetScriptExecutionOperationActor(TEvForgetScriptExecutionOperation::TPtr ev)
969989
: Request(std::move(ev))
970990
{}
@@ -1002,19 +1022,7 @@ class TForgetScriptExecutionOperationActor : public TActorBootstrapped<TForgetSc
10021022
}
10031023

10041024
KQP_PROXY_LOG_D("[TForgetScriptExecutionOperationActor] ExecutionId: " << ExecutionId << ", lease check success. Start TForgetOperationRetryActor");
1005-
1006-
TDuration minDelay = TDuration::MilliSeconds(10);
1007-
TDuration maxTime = Request->Get()->Deadline - TInstant::Now();
1008-
if (maxTime <= minDelay) {
1009-
Reply(Ydb::StatusIds::TIMEOUT, TForgetScriptExecutionOperationQueryActor::ForgetOperationTimeoutIssues());
1010-
return;
1011-
}
1012-
1013-
Register(new TForgetOperationRetryActor(
1014-
SelfId(),
1015-
TForgetOperationRetryActor::IRetryPolicy::GetExponentialBackoffPolicy(TForgetOperationRetryActor::Retryable, minDelay, TDuration::MilliSeconds(200), TDuration::Seconds(1), std::numeric_limits<size_t>::max(), maxTime),
1016-
ExecutionId, Request->Get()->Database, TInstant::Now() + maxTime
1017-
));
1025+
Register(new TForgetOperationRetryActor(SelfId(), ExecutionId, Request->Get()->Database));
10181026
}
10191027

10201028
void Handle(TEvForgetScriptExecutionOperationResponse::TPtr& ev) {
@@ -1042,7 +1050,7 @@ class TForgetScriptExecutionOperationActor : public TActorBootstrapped<TForgetSc
10421050
}
10431051

10441052
private:
1045-
TEvForgetScriptExecutionOperation::TPtr Request;
1053+
const TEvForgetScriptExecutionOperation::TPtr Request;
10461054
TString ExecutionId;
10471055
bool ExecutionEntryExists = true;
10481056
};

ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp

+5-13
Original file line numberDiff line numberDiff line change
@@ -1752,22 +1752,14 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
17521752
UNIT_ASSERT_VALUES_EQUAL(rowsFetched, numberRows);
17531753

17541754
// Test forget operation
1755-
TInstant forgetOperationTimeout = TInstant::Now() + NSan::PlainOrUnderSanitizer(TDuration::Minutes(5), TDuration::Minutes(20));
17561755
NYdb::NOperation::TOperationClient operationClient(kikimr->GetDriver());
1757-
while (TInstant::Now() < forgetOperationTimeout) {
1758-
auto status = operationClient.Forget(scriptExecutionOperation.Id()).ExtractValueSync();
1759-
if (status.GetStatus() == NYdb::EStatus::SUCCESS || status.GetStatus() == NYdb::EStatus::NOT_FOUND) {
1760-
return;
1761-
}
1762-
1763-
UNIT_ASSERT_C(status.GetStatus() == NYdb::EStatus::ABORTED || status.GetStatus() == NYdb::EStatus::TIMEOUT || status.GetStatus() == NYdb::EStatus::CLIENT_DEADLINE_EXCEEDED, status.GetIssues().ToString());
1756+
auto status = operationClient.Forget(scriptExecutionOperation.Id()).ExtractValueSync();
1757+
UNIT_ASSERT_C(status.IsSuccess(), status.GetIssues().ToOneLineString());
17641758

1765-
if (status.GetStatus() == NYdb::EStatus::CLIENT_DEADLINE_EXCEEDED) {
1766-
// Wait until last forget is not finished
1767-
Sleep(TDuration::Seconds(30));
1768-
}
1759+
const size_t forgetRowsLimit = 100000;
1760+
if (numberRows > forgetRowsLimit) {
1761+
UNIT_ASSERT_STRING_CONTAINS(status.GetIssues().ToString(), TStringBuilder() << "Info: Query result rows count is " << numberRows << ", that is larger than allowed limit " << forgetRowsLimit << " rows for one time forget, results will be forgotten in the background process");
17691762
}
1770-
UNIT_ASSERT_C(false, "Forget operation timeout");
17711763
}
17721764

17731765
Y_UNIT_TEST(ExecuteScriptWithLargeStrings) {

ydb/tests/tools/kqprun/src/kqp_runner.cpp

+4
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,10 @@ class TKqpRunner::TImpl {
199199
return false;
200200
}
201201

202+
if (!status.Issues.Empty()) {
203+
Cerr << CerrColors_.Red() << "Forget operation finished with issues:" << CerrColors_.Default() << Endl << status.Issues.ToString() << Endl;
204+
}
205+
202206
return true;
203207
}
204208

ydb/tests/tools/kqprun/src/ydb_setup.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,7 @@ class TYdbSetup::TImpl {
256256

257257
NKikimr::NKqp::TEvForgetScriptExecutionOperationResponse::TPtr ForgetScriptExecutionOperationRequest(const TString& operation) const {
258258
NKikimr::NOperationId::TOperationId operationId(operation);
259-
auto event = MakeHolder<NKikimr::NKqp::TEvForgetScriptExecutionOperation>(Settings_.DomainName, operationId, TInstant::Max());
259+
auto event = MakeHolder<NKikimr::NKqp::TEvForgetScriptExecutionOperation>(Settings_.DomainName, operationId);
260260

261261
return RunKqpProxyRequest<NKikimr::NKqp::TEvForgetScriptExecutionOperation, NKikimr::NKqp::TEvForgetScriptExecutionOperationResponse>(std::move(event));
262262
}

0 commit comments

Comments
 (0)