Skip to content

Commit 2dbe2ec

Browse files
committed
Detach result deliting for forget operation
1 parent eeb0728 commit 2dbe2ec

File tree

6 files changed

+41
-52
lines changed

6 files changed

+41
-52
lines changed

ydb/core/grpc_services/rpc_forget_operation.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ class TForgetOperationRPC: public TRpcOperationRequestActor<TForgetOperationRPC,
9797
}
9898

9999
void SendForgetScriptExecutionOperation() {
100-
Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), new NKqp::TEvForgetScriptExecutionOperation(DatabaseName, OperationId, Request->GetDeadline()));
100+
Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), new NKqp::TEvForgetScriptExecutionOperation(DatabaseName, OperationId));
101101
}
102102

103103
public:

ydb/core/kqp/common/events/script_executions.h

+4-7
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,13 @@ enum EFinalizationStatus : i32 {
2323
};
2424

2525
struct TEvForgetScriptExecutionOperation : public NActors::TEventLocal<TEvForgetScriptExecutionOperation, TKqpScriptExecutionEvents::EvForgetScriptExecutionOperation> {
26-
explicit TEvForgetScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id, TInstant deadline)
26+
TEvForgetScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id)
2727
: Database(database)
2828
, OperationId(id)
29-
, Deadline(deadline)
30-
{
31-
}
29+
{}
3230

33-
TString Database;
34-
NOperationId::TOperationId OperationId;
35-
TInstant Deadline;
31+
const TString Database;
32+
const NOperationId::TOperationId OperationId;
3633
};
3734

3835
struct TEvForgetScriptExecutionOperationResponse : public NActors::TEventLocal<TEvForgetScriptExecutionOperationResponse, TKqpScriptExecutionEvents::EvForgetScriptExecutionOperationResponse> {

ydb/core/kqp/proxy_service/kqp_script_executions.cpp

+26-30
Original file line numberDiff line numberDiff line change
@@ -823,11 +823,10 @@ class TForgetScriptExecutionOperationQueryActor : public TQueryBase {
823823
static constexpr i32 MAX_NUMBER_ROWS_IN_BATCH = 100000;
824824

825825
public:
826-
TForgetScriptExecutionOperationQueryActor(const TString& executionId, const TString& database, TInstant operationDeadline)
826+
TForgetScriptExecutionOperationQueryActor(const TString& executionId, const TString& database)
827827
: TQueryBase(__func__, executionId)
828828
, ExecutionId(executionId)
829829
, Database(database)
830-
, Deadline(operationDeadline)
831830
{}
832831

833832
void OnRunQuery() override {
@@ -891,7 +890,16 @@ class TForgetScriptExecutionOperationQueryActor : public TQueryBase {
891890
}
892891
MaxRowId = *maxRowId;
893892

894-
ClearTimeInfo();
893+
if (MaxRowId >= NumberRowsInBatch) {
894+
TStringBuilder message = TStringBuilder() << "Query result rows count is " << MaxRowId + 1;
895+
if (*maxResultSetId > 0) {
896+
message << " in " << *maxResultSetId + 1 << " result sets";
897+
}
898+
NYql::TIssue issue(message << ", that is larger than allowed limit " << MAX_NUMBER_ROWS_IN_BATCH << " rows for one time forget, results will be forgotten in the background process");
899+
issue.SetCode(NYql::DEFAULT_ERROR, NYql::TSeverityIds::S_INFO);
900+
SendResponse(Ydb::StatusIds::SUCCESS, {issue});
901+
}
902+
895903
DeleteScriptResults();
896904
}
897905

@@ -937,34 +945,34 @@ class TForgetScriptExecutionOperationQueryActor : public TQueryBase {
937945
return;
938946
}
939947

940-
if (TInstant::Now() + 2 * GetAverageTime() >= Deadline) {
941-
Finish(Ydb::StatusIds::TIMEOUT, ForgetOperationTimeoutIssues());
942-
return;
943-
}
944-
945948
DeleteScriptResults();
946949
}
947950

948951
void OnFinish(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override {
949-
Send(Owner, new TEvForgetScriptExecutionOperationResponse(status, std::move(issues)));
952+
SendResponse(status, std::move(issues));
950953
}
951954

952-
static NYql::TIssues ForgetOperationTimeoutIssues() {
953-
return { NYql::TIssue("Forget script execution operation timeout") };
955+
private:
956+
void SendResponse(Ydb::StatusIds::StatusCode status, NYql::TIssues issues) {
957+
if (ResponseSent) {
958+
return;
959+
}
960+
ResponseSent = true;
961+
Send(Owner, new TEvForgetScriptExecutionOperationResponse(status, std::move(issues)));
954962
}
955963

956964
private:
957-
TString ExecutionId;
958-
TString Database;
959-
TInstant Deadline;
965+
const TString ExecutionId;
966+
const TString Database;
960967
i64 NumberRowsInBatch = 0;
961968
i64 MaxRowId = 0;
969+
bool ResponseSent = false;
962970
};
963971

964972
class TForgetScriptExecutionOperationActor : public TActorBootstrapped<TForgetScriptExecutionOperationActor> {
965-
public:
966-
using TForgetOperationRetryActor = TQueryRetryActor<TForgetScriptExecutionOperationQueryActor, TEvForgetScriptExecutionOperationResponse, TString, TString, TInstant>;
973+
using TForgetOperationRetryActor = TQueryRetryActor<TForgetScriptExecutionOperationQueryActor, TEvForgetScriptExecutionOperationResponse, TString, TString>;
967974

975+
public:
968976
explicit TForgetScriptExecutionOperationActor(TEvForgetScriptExecutionOperation::TPtr ev)
969977
: Request(std::move(ev))
970978
{}
@@ -1002,19 +1010,7 @@ class TForgetScriptExecutionOperationActor : public TActorBootstrapped<TForgetSc
10021010
}
10031011

10041012
KQP_PROXY_LOG_D("[TForgetScriptExecutionOperationActor] ExecutionId: " << ExecutionId << ", lease check success. Start TForgetOperationRetryActor");
1005-
1006-
TDuration minDelay = TDuration::MilliSeconds(10);
1007-
TDuration maxTime = Request->Get()->Deadline - TInstant::Now();
1008-
if (maxTime <= minDelay) {
1009-
Reply(Ydb::StatusIds::TIMEOUT, TForgetScriptExecutionOperationQueryActor::ForgetOperationTimeoutIssues());
1010-
return;
1011-
}
1012-
1013-
Register(new TForgetOperationRetryActor(
1014-
SelfId(),
1015-
TForgetOperationRetryActor::IRetryPolicy::GetExponentialBackoffPolicy(TForgetOperationRetryActor::Retryable, minDelay, TDuration::MilliSeconds(200), TDuration::Seconds(1), std::numeric_limits<size_t>::max(), maxTime),
1016-
ExecutionId, Request->Get()->Database, TInstant::Now() + maxTime
1017-
));
1013+
Register(new TForgetOperationRetryActor(SelfId(), ExecutionId, Request->Get()->Database));
10181014
}
10191015

10201016
void Handle(TEvForgetScriptExecutionOperationResponse::TPtr& ev) {
@@ -1042,7 +1038,7 @@ class TForgetScriptExecutionOperationActor : public TActorBootstrapped<TForgetSc
10421038
}
10431039

10441040
private:
1045-
TEvForgetScriptExecutionOperation::TPtr Request;
1041+
const TEvForgetScriptExecutionOperation::TPtr Request;
10461042
TString ExecutionId;
10471043
bool ExecutionEntryExists = true;
10481044
};

ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp

+5-13
Original file line numberDiff line numberDiff line change
@@ -1752,22 +1752,14 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
17521752
UNIT_ASSERT_VALUES_EQUAL(rowsFetched, numberRows);
17531753

17541754
// Test forget operation
1755-
TInstant forgetOperationTimeout = TInstant::Now() + NSan::PlainOrUnderSanitizer(TDuration::Minutes(5), TDuration::Minutes(20));
17561755
NYdb::NOperation::TOperationClient operationClient(kikimr->GetDriver());
1757-
while (TInstant::Now() < forgetOperationTimeout) {
1758-
auto status = operationClient.Forget(scriptExecutionOperation.Id()).ExtractValueSync();
1759-
if (status.GetStatus() == NYdb::EStatus::SUCCESS || status.GetStatus() == NYdb::EStatus::NOT_FOUND) {
1760-
return;
1761-
}
1762-
1763-
UNIT_ASSERT_C(status.GetStatus() == NYdb::EStatus::ABORTED || status.GetStatus() == NYdb::EStatus::TIMEOUT || status.GetStatus() == NYdb::EStatus::CLIENT_DEADLINE_EXCEEDED, status.GetIssues().ToString());
1756+
auto status = operationClient.Forget(scriptExecutionOperation.Id()).ExtractValueSync();
1757+
UNIT_ASSERT_C(status.IsSuccess(), status.GetIssues().ToOneLineString());
17641758

1765-
if (status.GetStatus() == NYdb::EStatus::CLIENT_DEADLINE_EXCEEDED) {
1766-
// Wait until last forget is not finished
1767-
Sleep(TDuration::Seconds(30));
1768-
}
1759+
const size_t forgetRowsLimit = 100000;
1760+
if (numberRows > forgetRowsLimit) {
1761+
UNIT_ASSERT_STRING_CONTAINS(status.GetIssues().ToString(), TStringBuilder() << "Info: Query result rows count is " << numberRows << ", that is larger than allowed limit " << forgetRowsLimit << " rows for one time forget, results will be forgotten in the background process");
17691762
}
1770-
UNIT_ASSERT_C(false, "Forget operation timeout");
17711763
}
17721764

17731765
Y_UNIT_TEST(ExecuteScriptWithLargeStrings) {

ydb/tests/tools/kqprun/src/kqp_runner.cpp

+4
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,10 @@ class TKqpRunner::TImpl {
199199
return false;
200200
}
201201

202+
if (!status.Issues.Empty()) {
203+
Cerr << CerrColors_.Red() << "Forget operation finished with issues:" << CerrColors_.Default() << Endl << status.Issues.ToString() << Endl;
204+
}
205+
202206
return true;
203207
}
204208

ydb/tests/tools/kqprun/src/ydb_setup.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,7 @@ class TYdbSetup::TImpl {
256256

257257
NKikimr::NKqp::TEvForgetScriptExecutionOperationResponse::TPtr ForgetScriptExecutionOperationRequest(const TString& operation) const {
258258
NKikimr::NOperationId::TOperationId operationId(operation);
259-
auto event = MakeHolder<NKikimr::NKqp::TEvForgetScriptExecutionOperation>(Settings_.DomainName, operationId, TInstant::Max());
259+
auto event = MakeHolder<NKikimr::NKqp::TEvForgetScriptExecutionOperation>(Settings_.DomainName, operationId);
260260

261261
return RunKqpProxyRequest<NKikimr::NKqp::TEvForgetScriptExecutionOperation, NKikimr::NKqp::TEvForgetScriptExecutionOperationResponse>(std::move(event));
262262
}

0 commit comments

Comments
 (0)