Skip to content

YQ-3225 detached script execution results forgetting #5598

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ydb/core/grpc_services/rpc_forget_operation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ class TForgetOperationRPC: public TRpcOperationRequestActor<TForgetOperationRPC,
}

void SendForgetScriptExecutionOperation() {
Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), new NKqp::TEvForgetScriptExecutionOperation(DatabaseName, OperationId, Request->GetDeadline()));
Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), new NKqp::TEvForgetScriptExecutionOperation(DatabaseName, OperationId));
}

public:
Expand Down
11 changes: 4 additions & 7 deletions ydb/core/kqp/common/events/script_executions.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,13 @@ enum EFinalizationStatus : i32 {
};

struct TEvForgetScriptExecutionOperation : public NActors::TEventLocal<TEvForgetScriptExecutionOperation, TKqpScriptExecutionEvents::EvForgetScriptExecutionOperation> {
explicit TEvForgetScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id, TInstant deadline)
TEvForgetScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id)
: Database(database)
, OperationId(id)
, Deadline(deadline)
{
}
{}

TString Database;
NOperationId::TOperationId OperationId;
TInstant Deadline;
const TString Database;
const NOperationId::TOperationId OperationId;
};

struct TEvForgetScriptExecutionOperationResponse : public NActors::TEventLocal<TEvForgetScriptExecutionOperationResponse, TKqpScriptExecutionEvents::EvForgetScriptExecutionOperationResponse> {
Expand Down
78 changes: 43 additions & 35 deletions ydb/core/kqp/proxy_service/kqp_script_executions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -823,11 +823,10 @@ class TForgetScriptExecutionOperationQueryActor : public TQueryBase {
static constexpr i32 MAX_NUMBER_ROWS_IN_BATCH = 100000;

public:
TForgetScriptExecutionOperationQueryActor(const TString& executionId, const TString& database, TInstant operationDeadline)
TForgetScriptExecutionOperationQueryActor(const TString& executionId, const TString& database)
: TQueryBase(__func__, executionId)
, ExecutionId(executionId)
, Database(database)
, Deadline(operationDeadline)
{}

void OnRunQuery() override {
Expand All @@ -840,14 +839,36 @@ class TForgetScriptExecutionOperationQueryActor : public TQueryBase {
FROM `.metadata/script_executions`
WHERE database = $database AND execution_id = $execution_id;

DELETE
FROM `.metadata/script_execution_leases`
WHERE database = $database AND execution_id = $execution_id;
)";

NYdb::TParamsBuilder params;
params
.AddParam("$database")
.Utf8(Database)
.Build()
.AddParam("$execution_id")
.Utf8(ExecutionId)
.Build();

RunDataQuery(sql, &params);
SetQueryResultHandler(&TForgetScriptExecutionOperationQueryActor::OnOperationDeleted, "Forget script execution operation");
}

void OnOperationDeleted() {
SendResponse(Ydb::StatusIds::SUCCESS, {});

TString sql = R"(
-- TForgetScriptExecutionOperationQueryActor::OnOperationDeleted
DECLARE $database AS Text;
DECLARE $execution_id AS Text;

SELECT MAX(result_set_id) AS max_result_set_id, MAX(row_id) AS max_row_id
FROM `.metadata/result_sets`
WHERE database = $database AND execution_id = $execution_id AND
(expire_at > CurrentUtcTimestamp() OR expire_at IS NULL);

DELETE
FROM `.metadata/script_execution_leases`
WHERE database = $database AND execution_id = $execution_id;
)";

NYdb::TParamsBuilder params;
Expand All @@ -860,7 +881,7 @@ class TForgetScriptExecutionOperationQueryActor : public TQueryBase {
.Build();

RunDataQuery(sql, &params);
SetQueryResultHandler(&TForgetScriptExecutionOperationQueryActor::OnGetResultsInfo, "Forget script execution operation");
SetQueryResultHandler(&TForgetScriptExecutionOperationQueryActor::OnGetResultsInfo, "Get results info");
}

void OnGetResultsInfo() {
Expand Down Expand Up @@ -891,7 +912,6 @@ class TForgetScriptExecutionOperationQueryActor : public TQueryBase {
}
MaxRowId = *maxRowId;

ClearTimeInfo();
DeleteScriptResults();
}

Expand Down Expand Up @@ -937,34 +957,34 @@ class TForgetScriptExecutionOperationQueryActor : public TQueryBase {
return;
}

if (TInstant::Now() + 2 * GetAverageTime() >= Deadline) {
Finish(Ydb::StatusIds::TIMEOUT, ForgetOperationTimeoutIssues());
return;
}

DeleteScriptResults();
}

void OnFinish(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override {
Send(Owner, new TEvForgetScriptExecutionOperationResponse(status, std::move(issues)));
SendResponse(status, std::move(issues));
}

static NYql::TIssues ForgetOperationTimeoutIssues() {
return { NYql::TIssue("Forget script execution operation timeout") };
private:
// Delivers a TEvForgetScriptExecutionOperationResponse to Owner at most once.
// Any call made after the first response has been sent is a no-op, which lets
// several code paths request a reply without the owner receiving duplicates.
//
// status - status code placed into the response event.
// issues - issues attached to the response; taken by value and moved into the event.
void SendResponse(Ydb::StatusIds::StatusCode status, NYql::TIssues issues) {
    if (!ResponseSent) {
        // Raise the flag before sending so the reply can never be repeated.
        ResponseSent = true;
        Send(Owner, new TEvForgetScriptExecutionOperationResponse(status, std::move(issues)));
    }
}

private:
TString ExecutionId;
TString Database;
TInstant Deadline;
const TString ExecutionId;
const TString Database;
i64 NumberRowsInBatch = 0;
i64 MaxRowId = 0;
bool ResponseSent = false;
};

class TForgetScriptExecutionOperationActor : public TActorBootstrapped<TForgetScriptExecutionOperationActor> {
public:
using TForgetOperationRetryActor = TQueryRetryActor<TForgetScriptExecutionOperationQueryActor, TEvForgetScriptExecutionOperationResponse, TString, TString, TInstant>;
using TForgetOperationRetryActor = TQueryRetryActor<TForgetScriptExecutionOperationQueryActor, TEvForgetScriptExecutionOperationResponse, TString, TString>;

public:
explicit TForgetScriptExecutionOperationActor(TEvForgetScriptExecutionOperation::TPtr ev)
: Request(std::move(ev))
{}
Expand Down Expand Up @@ -1002,19 +1022,7 @@ class TForgetScriptExecutionOperationActor : public TActorBootstrapped<TForgetSc
}

KQP_PROXY_LOG_D("[TForgetScriptExecutionOperationActor] ExecutionId: " << ExecutionId << ", lease check success. Start TForgetOperationRetryActor");

TDuration minDelay = TDuration::MilliSeconds(10);
TDuration maxTime = Request->Get()->Deadline - TInstant::Now();
if (maxTime <= minDelay) {
Reply(Ydb::StatusIds::TIMEOUT, TForgetScriptExecutionOperationQueryActor::ForgetOperationTimeoutIssues());
return;
}

Register(new TForgetOperationRetryActor(
SelfId(),
TForgetOperationRetryActor::IRetryPolicy::GetExponentialBackoffPolicy(TForgetOperationRetryActor::Retryable, minDelay, TDuration::MilliSeconds(200), TDuration::Seconds(1), std::numeric_limits<size_t>::max(), maxTime),
ExecutionId, Request->Get()->Database, TInstant::Now() + maxTime
));
Register(new TForgetOperationRetryActor(SelfId(), ExecutionId, Request->Get()->Database));
}

void Handle(TEvForgetScriptExecutionOperationResponse::TPtr& ev) {
Expand Down Expand Up @@ -1042,7 +1050,7 @@ class TForgetScriptExecutionOperationActor : public TActorBootstrapped<TForgetSc
}

private:
TEvForgetScriptExecutionOperation::TPtr Request;
const TEvForgetScriptExecutionOperation::TPtr Request;
TString ExecutionId;
bool ExecutionEntryExists = true;
};
Expand Down
33 changes: 22 additions & 11 deletions ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1752,22 +1752,33 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
UNIT_ASSERT_VALUES_EQUAL(rowsFetched, numberRows);

// Test forget operation
TInstant forgetOperationTimeout = TInstant::Now() + NSan::PlainOrUnderSanitizer(TDuration::Minutes(5), TDuration::Minutes(20));
NYdb::NOperation::TOperationClient operationClient(kikimr->GetDriver());
while (TInstant::Now() < forgetOperationTimeout) {
auto status = operationClient.Forget(scriptExecutionOperation.Id()).ExtractValueSync();
if (status.GetStatus() == NYdb::EStatus::SUCCESS || status.GetStatus() == NYdb::EStatus::NOT_FOUND) {
return;
}
auto status = operationClient.Forget(scriptExecutionOperation.Id()).ExtractValueSync();
UNIT_ASSERT_C(status.IsSuccess(), status.GetIssues().ToOneLineString());

const TString countResultsQuery = fmt::format(R"(
SELECT COUNT(*)
FROM `.metadata/result_sets`
WHERE execution_id = "{execution_id}" AND expire_at > CurrentUtcTimestamp();
)", "execution_id"_a=readyOp.Metadata().ExecutionId);

TInstant forgetChecksStart = TInstant::Now();
while (TInstant::Now() - forgetChecksStart <= TDuration::Minutes(5)) {
NYdb::NTable::TDataQueryResult result = session.ExecuteDataQuery(countResultsQuery, NYdb::NTable::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());

UNIT_ASSERT_C(status.GetStatus() == NYdb::EStatus::ABORTED || status.GetStatus() == NYdb::EStatus::TIMEOUT || status.GetStatus() == NYdb::EStatus::CLIENT_DEADLINE_EXCEEDED, status.GetIssues().ToString());
auto resultSet = result.GetResultSetParser(0);
resultSet.TryNextRow();

if (status.GetStatus() == NYdb::EStatus::CLIENT_DEADLINE_EXCEEDED) {
// Wait until last forget is not finished
Sleep(TDuration::Seconds(30));
ui64 numberRows = resultSet.ColumnParser(0).GetUint64();
if (!numberRows) {
return;
}

Cerr << "Rows remains: " << numberRows << ", elapsed time: " << TInstant::Now() - forgetChecksStart << "\n";
Sleep(TDuration::Seconds(1));
}
UNIT_ASSERT_C(false, "Forget operation timeout");
UNIT_ASSERT_C(false, "Results removing timeout");
}

Y_UNIT_TEST(ExecuteScriptWithLargeStrings) {
Expand Down
4 changes: 4 additions & 0 deletions ydb/tests/tools/kqprun/src/kqp_runner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,10 @@ class TKqpRunner::TImpl {
return false;
}

if (!status.Issues.Empty()) {
Cerr << CerrColors_.Red() << "Forget operation finished with issues:" << CerrColors_.Default() << Endl << status.Issues.ToString() << Endl;
}

return true;
}

Expand Down
2 changes: 1 addition & 1 deletion ydb/tests/tools/kqprun/src/ydb_setup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ class TYdbSetup::TImpl {

NKikimr::NKqp::TEvForgetScriptExecutionOperationResponse::TPtr ForgetScriptExecutionOperationRequest(const TString& operation) const {
NKikimr::NOperationId::TOperationId operationId(operation);
auto event = MakeHolder<NKikimr::NKqp::TEvForgetScriptExecutionOperation>(Settings_.DomainName, operationId, TInstant::Max());
auto event = MakeHolder<NKikimr::NKqp::TEvForgetScriptExecutionOperation>(Settings_.DomainName, operationId);

return RunKqpProxyRequest<NKikimr::NKqp::TEvForgetScriptExecutionOperation, NKikimr::NKqp::TEvForgetScriptExecutionOperationResponse>(std::move(event));
}
Expand Down
Loading