From 24d1dfeff8df8b65dfa9f52f21731ca98772ad83 Mon Sep 17 00:00:00 2001 From: Gigoriy Pisarenko Date: Wed, 31 Jan 2024 09:52:55 +0000 Subject: [PATCH 1/2] Added retries for internal queries --- .../kqp/common/events/script_executions.h | 89 +++-- .../kqp/executer_actor/kqp_data_executer.cpp | 4 +- .../kqp_finalize_script_actor.cpp | 22 +- .../kqp_finalize_script_service.cpp | 7 +- .../proxy_service/kqp_script_executions.cpp | 313 +++++++++--------- .../kqp/proxy_service/kqp_script_executions.h | 4 +- 6 files changed, 227 insertions(+), 212 deletions(-) diff --git a/ydb/core/kqp/common/events/script_executions.h b/ydb/core/kqp/common/events/script_executions.h index 6b2b331e368e..f5157a1a10b2 100644 --- a/ydb/core/kqp/common/events/script_executions.h +++ b/ydb/core/kqp/common/events/script_executions.h @@ -221,20 +221,28 @@ struct TEvFetchScriptResultsQueryResponse : public NActors::TEventLocal { + struct TDescription { + TDescription(const TString& executionId, const TString& database, const TString& customerSuppliedId, const TString& userToken) + : ExecutionId(executionId) + , Database(database) + , CustomerSuppliedId(customerSuppliedId) + , UserToken(userToken) + {} + + TString ExecutionId; + TString Database; + + TString CustomerSuppliedId; + TString UserToken; + std::vector Sinks; + std::vector SecretNames; + }; + TEvSaveScriptExternalEffectRequest(const TString& executionId, const TString& database, const TString& customerSuppliedId, const TString& userToken) - : ExecutionId(executionId) - , Database(database) - , CustomerSuppliedId(customerSuppliedId) - , UserToken(userToken) + : Description(executionId, database, customerSuppliedId, userToken) {} - TString ExecutionId; - TString Database; - - TString CustomerSuppliedId; - TString UserToken; - std::vector Sinks; - std::vector SecretNames; + TDescription Description; }; struct TEvSaveScriptExternalEffectResponse : public NActors::TEventLocal { @@ -248,31 +256,41 @@ struct TEvSaveScriptExternalEffectResponse : public NActors::TEventLocal { + struct TDescription { + TDescription(EFinalizationStatus finalizationStatus, const TString& executionId, const TString& database, + Ydb::StatusIds::StatusCode operationStatus, Ydb::Query::ExecStatus execStatus, NYql::TIssues issues, std::optional queryStats, + std::optional queryPlan, std::optional queryAst, std::optional leaseGeneration) + : FinalizationStatus(finalizationStatus) + , ExecutionId(executionId) + , Database(database) + , OperationStatus(operationStatus) + , ExecStatus(execStatus) + , Issues(std::move(issues)) + , QueryStats(std::move(queryStats)) + , QueryPlan(std::move(queryPlan)) + , QueryAst(std::move(queryAst)) + , LeaseGeneration(leaseGeneration) + {} + + EFinalizationStatus FinalizationStatus; + TString ExecutionId; + TString Database; + Ydb::StatusIds::StatusCode OperationStatus; + Ydb::Query::ExecStatus ExecStatus; + NYql::TIssues Issues; + std::optional QueryStats; + std::optional QueryPlan; + std::optional QueryAst; + std::optional LeaseGeneration; + }; + TEvScriptFinalizeRequest(EFinalizationStatus finalizationStatus, const TString& executionId, const TString& database, Ydb::StatusIds::StatusCode operationStatus, Ydb::Query::ExecStatus execStatus, NYql::TIssues issues = {}, std::optional queryStats = std::nullopt, std::optional queryPlan = std::nullopt, std::optional queryAst = std::nullopt, std::optional leaseGeneration = std::nullopt) - : FinalizationStatus(finalizationStatus) - , ExecutionId(executionId) - , Database(database) - , OperationStatus(operationStatus) - , ExecStatus(execStatus) - , Issues(std::move(issues)) - , QueryStats(std::move(queryStats)) - , QueryPlan(std::move(queryPlan)) - , QueryAst(std::move(queryAst)) - , LeaseGeneration(leaseGeneration) + : Description(finalizationStatus, executionId, database, operationStatus, execStatus, issues, queryStats, queryPlan, queryAst, leaseGeneration) {} - EFinalizationStatus FinalizationStatus; - TString ExecutionId; - TString Database; - Ydb::StatusIds::StatusCode OperationStatus; - Ydb::Query::ExecStatus ExecStatus; - NYql::TIssues Issues; - std::optional QueryStats; - std::optional QueryPlan; - std::optional QueryAst; - std::optional LeaseGeneration; + TDescription Description; }; struct TEvScriptFinalizeResponse : public NActors::TEventLocal { @@ -284,15 +302,14 @@ struct TEvScriptFinalizeResponse : public NActors::TEventLocal { - TEvSaveScriptFinalStatusResponse(const TString& customerSuppliedId, const TString& userToken) - : CustomerSuppliedId(customerSuppliedId) - , UserToken(userToken) - {} - + bool ApplicateScriptExternalEffectRequired = false; + bool OperationAlreadyFinalized = false; TString CustomerSuppliedId; TString UserToken; std::vector Sinks; std::vector SecretNames; + Ydb::StatusIds::StatusCode Status; + NYql::TIssues Issues; }; struct TEvDescribeSecretsResponse : public NActors::TEventLocal { diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 219396b5cdcc..f3a492821f0c 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -1629,13 +1629,13 @@ class TKqpDataExecuter : public TKqpExecuterBaseSinks.push_back(sink.GetExternalSink()); + scriptExternalEffect->Description.Sinks.push_back(sink.GetExternalSink()); } } } } } - scriptExternalEffect->SecretNames = SecretNames; + scriptExternalEffect->Description.SecretNames = SecretNames; if (!WaitRequired()) { return Execute(); diff --git a/ydb/core/kqp/finalize_script_service/kqp_finalize_script_actor.cpp b/ydb/core/kqp/finalize_script_service/kqp_finalize_script_actor.cpp index 03d2a475bbe3..6ffc4b58b3d2 100644 --- a/ydb/core/kqp/finalize_script_service/kqp_finalize_script_actor.cpp +++ b/ydb/core/kqp/finalize_script_service/kqp_finalize_script_actor.cpp @@ -22,9 +22,9 @@ class TScriptFinalizerActor : public TActorBootstrapped { const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig, const std::optional& federatedQuerySetup) : ReplyActor_(request->Sender) - , ExecutionId_(request->Get()->ExecutionId) - , Database_(request->Get()->Database) - , FinalizationStatus_(request->Get()->FinalizationStatus) + , ExecutionId_(request->Get()->Description.ExecutionId) + , Database_(request->Get()->Description.Database) + , FinalizationStatus_(request->Get()->Description.FinalizationStatus) , Request_(std::move(request)) , FinalizationTimeout_(TDuration::Seconds(finalizeScriptServiceConfig.GetScriptFinalizationTimeoutSeconds())) , MaximalSecretsSnapshotWaitTime_(2 * TDuration::Seconds(metadataProviderConfig.GetRefreshPeriodSeconds())) @@ -32,16 +32,20 @@ class TScriptFinalizerActor : public TActorBootstrapped { {} void Bootstrap() { - Register(CreateSaveScriptFinalStatusActor(std::move(Request_))); + Register(CreateSaveScriptFinalStatusActor(SelfId(), std::move(Request_))); Become(&TScriptFinalizerActor::FetchState); } STRICT_STFUNC(FetchState, hFunc(TEvSaveScriptFinalStatusResponse, Handle); - hFunc(TEvScriptExecutionFinished, Handle); ) void Handle(TEvSaveScriptFinalStatusResponse::TPtr& ev) { + if (!ev->Get()->ApplicateScriptExternalEffectRequired || ev->Get()->Status != Ydb::StatusIds::SUCCESS) { + Reply(ev->Get()->OperationAlreadyFinalized, ev->Get()->Status, std::move(ev->Get()->Issues)); + return; + } + Schedule(FinalizationTimeout_, new TEvents::TEvWakeup()); Become(&TScriptFinalizerActor::PrepareState); @@ -168,7 +172,7 @@ class TScriptFinalizerActor : public TActorBootstrapped { ) void FinishScriptFinalization(std::optional status, NYql::TIssues issues) { - Register(CreateScriptFinalizationFinisherActor(ExecutionId_, Database_, status, std::move(issues))); + Register(CreateScriptFinalizationFinisherActor(SelfId(), ExecutionId_, Database_, status, std::move(issues))); Become(&TScriptFinalizerActor::FinishState); } @@ -181,7 +185,11 @@ class TScriptFinalizerActor : public TActorBootstrapped { } void Handle(TEvScriptExecutionFinished::TPtr& ev) { - Send(ReplyActor_, ev->Release()); + Reply(ev->Get()->OperationAlreadyFinalized, ev->Get()->Status, std::move(ev->Get()->Issues)); + } + + void Reply(bool operationAlreadyFinalized, Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) { + Send(ReplyActor_, new TEvScriptExecutionFinished(operationAlreadyFinalized, status, std::move(issues))); Send(MakeKqpFinalizeScriptServiceId(SelfId().NodeId()), new TEvScriptFinalizeResponse(ExecutionId_)); PassAway(); diff --git a/ydb/core/kqp/finalize_script_service/kqp_finalize_script_service.cpp b/ydb/core/kqp/finalize_script_service/kqp_finalize_script_service.cpp index cf6c66d5b597..6c0868e4c42b 100644 --- a/ydb/core/kqp/finalize_script_service/kqp_finalize_script_service.cpp +++ b/ydb/core/kqp/finalize_script_service/kqp_finalize_script_service.cpp @@ -27,9 +27,10 @@ class TKqpFinalizeScriptService : public TActorBootstrappedGet()->Sinks = FilterExternalSinks(ev->Get()->Sinks); + auto& description = ev->Get()->Description; + description.Sinks = FilterExternalSinks(description.Sinks); - if (!ev->Get()->Sinks.empty()) { + if (!description.Sinks.empty()) { Register(CreateSaveScriptExternalEffectActor(std::move(ev))); } else { Send(ev->Sender, new TEvSaveScriptExternalEffectResponse(Ydb::StatusIds::SUCCESS, {})); @@ -37,7 +38,7 @@ class TKqpFinalizeScriptService : public TActorBootstrappedGet()->ExecutionId; + TString executionId = ev->Get()->Description.ExecutionId; if (!FinalizationRequestsQueue_.contains(executionId)) { WaitingFinalizationExecutions_.push(executionId); diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp index 56a9dae3b58a..728f6d6c20ea 100644 --- a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp +++ b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp @@ -77,6 +77,96 @@ class TQueryBase : public NKikimr::TQueryBase { {} }; +template +class TQueryRetryActor : public TActorBootstrapped> { +public: + using TBase = TActorBootstrapped>; + using IRetryPolicy = IRetryPolicy; + + explicit TQueryRetryActor(const TActorId& replyActorId, const TArgs&... args, TDuration maxRetryTime = TDuration::Seconds(30)) + : ReplyActorId(replyActorId) + , CreateQueryActor([=]() { + return new TQueryActor(args...); + }) + , MaxRetryTime(maxRetryTime) + {} + + void StartQueryActor() const { + TBase::Register(CreateQueryActor()); + } + + void Bootstrap() { + TBase::Become(&TQueryRetryActor::StateFunc); + StartQueryActor(); + } + + STRICT_STFUNC(StateFunc, + hFunc(TEvents::TEvWakeup, Wakeup); + hFunc(TResponse, Handle); + ) + + void Wakeup(TEvents::TEvWakeup::TPtr&) { + StartQueryActor(); + } + + void Handle(const typename TResponse::TPtr& ev) { + const Ydb::StatusIds::StatusCode status = ev->Get()->Status; + if (Retryable(status) == ERetryErrorClass::NoRetry) { + Reply(ev); + return; + } + + if (RetryState == nullptr) { + CreateRetryState(); + } + + if (auto delay = RetryState->GetNextRetryDelay(status)) { + TBase::Schedule(*delay, new TEvents::TEvWakeup()); + } else { + Reply(ev); + } + } + + void Reply(const typename TResponse::TPtr& ev) { + TBase::Send(ev->Forward(ReplyActorId)); + TBase::PassAway(); + } + + static ERetryErrorClass Retryable(Ydb::StatusIds::StatusCode status) { + if (status == Ydb::StatusIds::SUCCESS) { + return ERetryErrorClass::NoRetry; + } + + if (status == Ydb::StatusIds::INTERNAL_ERROR + || status == Ydb::StatusIds::UNAVAILABLE + || status == Ydb::StatusIds::BAD_SESSION + || status == Ydb::StatusIds::SESSION_EXPIRED + || status == Ydb::StatusIds::SESSION_BUSY + || status == Ydb::StatusIds::TIMEOUT + || status == Ydb::StatusIds::ABORTED) { + return ERetryErrorClass::ShortRetry; + } + + if (status == Ydb::StatusIds::OVERLOADED + || status == Ydb::StatusIds::UNDETERMINED) { + return ERetryErrorClass::LongRetry; + } + + return ERetryErrorClass::NoRetry; + } + + void CreateRetryState() { + IRetryPolicy::TPtr policy = IRetryPolicy::GetExponentialBackoffPolicy(Retryable, TDuration::MilliSeconds(10), TDuration::MilliSeconds(200), TDuration::Seconds(1), std::numeric_limits::max(), MaxRetryTime); + RetryState = policy->CreateRetryState(); + } + +private: + const TActorId ReplyActorId; + const std::function CreateQueryActor; + const TDuration MaxRetryTime; + IRetryPolicy::IRetryState::TPtr RetryState = nullptr; +}; + class TScriptExecutionsTablesCreator : public TActorBootstrapped { public: @@ -478,8 +568,6 @@ class TScriptLeaseUpdater : public TQueryBase { class TScriptLeaseUpdateActor : public TActorBootstrapped { public: - using IRetryPolicy = IRetryPolicy; - TScriptLeaseUpdateActor(const TActorId& runScriptActorId, const TString& database, const TString& executionId, TDuration leaseDuration, TIntrusivePtr counters) : RunScriptActorId(runScriptActorId) , Database(database) @@ -489,44 +577,16 @@ class TScriptLeaseUpdateActor : public TActorBootstrapped(SelfId(), Database, ExecutionId, LeaseDuration, LeaseDuration / 2)); Become(&TScriptLeaseUpdateActor::StateFunc); } STRICT_STFUNC(StateFunc, hFunc(TEvScriptLeaseUpdateResponse, Handle); - hFunc(NActors::TEvents::TEvWakeup, Wakeup); ) - void Wakeup(NActors::TEvents::TEvWakeup::TPtr&) { - CreateScriptLeaseUpdater(); - } - void Handle(TEvScriptLeaseUpdateResponse::TPtr& ev) { - auto queryStatus = ev->Get()->Status; - if (!ev->Get()->ExecutionEntryExists && queryStatus == Ydb::StatusIds::BAD_REQUEST || queryStatus == Ydb::StatusIds::SUCCESS) { - Reply(std::move(ev)); - return; - } - - if (RetryState == nullptr) { - CreateRetryState(); - } - - const TMaybe delay = RetryState->GetNextRetryDelay(queryStatus); - if (delay) { - Schedule(*delay, new NActors::TEvents::TEvWakeup()); - } else { - Reply(std::move(ev)); - } - } - - void Reply(TEvScriptLeaseUpdateResponse::TPtr&& ev) { if (Counters) { Counters->ReportLeaseUpdateLatency(TInstant::Now() - LeaseUpdateStartTime); } @@ -534,33 +594,6 @@ class TScriptLeaseUpdateActor : public TActorBootstrapped::max(), LeaseDuration / 2); - RetryState = policy->CreateRetryState(); - } - private: TActorId RunScriptActorId; TString Database; @@ -568,7 +601,6 @@ class TScriptLeaseUpdateActor : public TActorBootstrapped Counters; TInstant LeaseUpdateStartTime; - IRetryPolicy::IRetryState::TPtr RetryState = nullptr; }; class TCheckLeaseStatusActorBase : public TActorBootstrapped { @@ -646,9 +678,9 @@ class TCheckLeaseStatusActorBase : public TActorBootstrappedOperationStatus; - FinalExecStatus = ScriptFinalizeRequest->ExecStatus; - FinalIssues = ScriptFinalizeRequest->Issues; + FinalOperationStatus = ScriptFinalizeRequest->Description.OperationStatus; + FinalExecStatus = ScriptFinalizeRequest->Description.ExecStatus; + FinalIssues = ScriptFinalizeRequest->Description.Issues; Send(MakeKqpFinalizeScriptServiceId(SelfId().NodeId()), ScriptFinalizeRequest.release()); } @@ -1756,43 +1788,9 @@ class TSaveScriptExecutionResultMetaQuery : public TQueryBase { const TString SerializedMetas; }; -class TSaveScriptExecutionResultMetaActor : public TActorBootstrapped { -public: - TSaveScriptExecutionResultMetaActor(const NActors::TActorId& replyActorId, const TString& database, const TString& executionId, const TString& serializedMetas) - : ReplyActorId(replyActorId), Database(database), ExecutionId(executionId), SerializedMetas(serializedMetas) - { - } - - void Bootstrap() { - Register(new TSaveScriptExecutionResultMetaQuery(Database, ExecutionId, SerializedMetas)); - - Become(&TSaveScriptExecutionResultMetaActor::StateFunc); - } - - STRICT_STFUNC(StateFunc, - hFunc(TEvSaveScriptResultMetaFinished, Handle); - ) - - void Handle(TEvSaveScriptResultMetaFinished::TPtr& ev) { - if (ev->Get()->Status == Ydb::StatusIds::ABORTED) { - Register(new TSaveScriptExecutionResultMetaQuery(Database, ExecutionId, SerializedMetas)); - return; - } - - Send(ev->Forward(ReplyActorId)); - PassAway(); - } - -private: - const NActors::TActorId ReplyActorId; - const TString Database; - const TString ExecutionId; - const TString SerializedMetas; -}; - class TSaveScriptExecutionResultQuery : public TQueryBase { public: - TSaveScriptExecutionResultQuery(const TString& database, const TString& executionId, i32 resultSetId, TMaybe expireAt, i64 firstRow, Ydb::ResultSet&& resultSet) + TSaveScriptExecutionResultQuery(const TString& database, const TString& executionId, i32 resultSetId, TMaybe expireAt, i64 firstRow, Ydb::ResultSet resultSet) : Database(database), ExecutionId(executionId), ResultSetId(resultSetId), ExpireAt(expireAt), FirstRow(firstRow), ResultSet(std::move(resultSet)) { } @@ -1895,7 +1893,7 @@ class TSaveScriptExecutionResultActor : public TActorBootstrapped, i64, Ydb::ResultSet>(SelfId(), Database, ExecutionId, ResultSetId, ExpireAt, FirstRow, ResultSets.back())); FirstRow += numberRows; ResultSets.pop_back(); @@ -2203,8 +2201,8 @@ class TGetScriptExecutionResultActor : public TActorBootstrappedGet()->Database) + .Utf8(Request.Database) .Build() .AddParam("$execution_id") - .Utf8(Request->Get()->ExecutionId) + .Utf8(Request.ExecutionId) .Build() .AddParam("$customer_supplied_id") - .Utf8(Request->Get()->CustomerSuppliedId) + .Utf8(Request.CustomerSuppliedId) .Build() .AddParam("$user_token") - .Utf8(Request->Get()->UserToken) + .Utf8(Request.UserToken) .Build() .AddParam("$script_sinks") - .JsonDocument(SerializeSinks(Request->Get()->Sinks)) + .JsonDocument(SerializeSinks(Request.Sinks)) .Build() .AddParam("$script_secret_names") - .JsonDocument(SerializeSecretNames(Request->Get()->SecretNames)) + .JsonDocument(SerializeSecretNames(Request.SecretNames)) .Build(); RunDataQuery(sql, ¶ms); @@ -2255,7 +2253,7 @@ class TSaveScriptExternalEffectActor : public TQueryBase { } void OnFinish(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override { - Send(Request->Sender, new TEvSaveScriptExternalEffectResponse(status, std::move(issues))); + Send(Owner, new TEvSaveScriptExternalEffectResponse(status, std::move(issues))); } private: @@ -2292,14 +2290,16 @@ class TSaveScriptExternalEffectActor : public TQueryBase { } private: - TEvSaveScriptExternalEffectRequest::TPtr Request; + TEvSaveScriptExternalEffectRequest::TDescription Request; }; class TSaveScriptFinalStatusActor : public TQueryBase { public: - explicit TSaveScriptFinalStatusActor(TEvScriptFinalizeRequest::TPtr ev) - : Request(ev) - {} + explicit TSaveScriptFinalStatusActor(const TEvScriptFinalizeRequest::TDescription& request) + : Request(request) + { + Response = std::make_unique(); + } void OnRunQuery() override { TString sql = R"( @@ -2328,10 +2328,10 @@ class TSaveScriptFinalStatusActor : public TQueryBase { NYdb::TParamsBuilder params; params .AddParam("$database") - .Utf8(Request->Get()->Database) + .Utf8(Request.Database) .Build() .AddParam("$execution_id") - .Utf8(Request->Get()->ExecutionId) + .Utf8(Request.ExecutionId) .Build(); RunDataQuery(sql, ¶ms, TTxControl::BeginTx()); @@ -2354,16 +2354,16 @@ class TSaveScriptFinalStatusActor : public TQueryBase { TMaybe finalizationStatus = result.ColumnParser("finalization_status").GetOptionalInt32(); if (finalizationStatus) { - if (Request->Get()->FinalizationStatus != *finalizationStatus) { + if (Request.FinalizationStatus != *finalizationStatus) { Finish(Ydb::StatusIds::PRECONDITION_FAILED, "Execution already have different finalization status"); return; } - ApplicateScriptExternalEffectRequired = true; + Response->ApplicateScriptExternalEffectRequired = true; } TMaybe operationStatus = result.ColumnParser("operation_status").GetOptionalInt32(); - if (Request->Get()->LeaseGeneration && !operationStatus) { + if (Request.LeaseGeneration && !operationStatus) { NYdb::TResultSetParser leaseResult(ResultSets[1]); if (leaseResult.RowsCount() == 0) { Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unexpected operation state"); @@ -2378,7 +2378,7 @@ class TSaveScriptFinalStatusActor : public TQueryBase { return; } - if (*Request->Get()->LeaseGeneration != static_cast(*leaseGenerationInDatabase)) { + if (*Request.LeaseGeneration != static_cast(*leaseGenerationInDatabase)) { Finish(Ydb::StatusIds::PRECONDITION_FAILED, "Lease was lost"); return; } @@ -2386,12 +2386,12 @@ class TSaveScriptFinalStatusActor : public TQueryBase { TMaybe customerSuppliedId = result.ColumnParser("customer_supplied_id").GetOptionalUtf8(); if (customerSuppliedId) { - CustomerSuppliedId = *customerSuppliedId; + Response->CustomerSuppliedId = *customerSuppliedId; } TMaybe userToken = result.ColumnParser("user_token").GetOptionalUtf8(); if (userToken) { - UserToken = *userToken; + Response->UserToken = *userToken; } SerializedSinks = result.ColumnParser("script_sinks").GetOptionalJsonDocument(); @@ -2408,7 +2408,7 @@ class TSaveScriptFinalStatusActor : public TQueryBase { NKqpProto::TKqpExternalSink sink; NProtobufJson::Json2Proto(*serializedSink, sink); - Sinks.push_back(sink); + Response->Sinks.push_back(sink); } } @@ -2424,7 +2424,7 @@ class TSaveScriptFinalStatusActor : public TQueryBase { const NJson::TJsonValue* serializedSecretName; value.GetValuePointer(i, &serializedSecretName); - SecretNames.push_back(serializedSecretName->GetString()); + Response->SecretNames.push_back(serializedSecretName->GetString()); } } @@ -2443,12 +2443,12 @@ class TSaveScriptFinalStatusActor : public TQueryBase { if (operationStatus) { FinalStatusAlreadySaved = true; - OperationAlreadyFinalized = !finalizationStatus; + Response->OperationAlreadyFinalized = !finalizationStatus; CommitTransaction(); return; } - ApplicateScriptExternalEffectRequired = ApplicateScriptExternalEffectRequired || HasExternalEffect(); + Response->ApplicateScriptExternalEffectRequired = Response->ApplicateScriptExternalEffectRequired || HasExternalEffect(); FinishScriptExecution(); } @@ -2493,10 +2493,10 @@ class TSaveScriptFinalStatusActor : public TQueryBase { )"; TString serializedStats = "{}"; - if (Request->Get()->QueryStats) { + if (Request.QueryStats) { NJson::TJsonValue statsJson; Ydb::TableStats::QueryStats queryStats; - NGRpcService::FillQueryStats(queryStats, *Request->Get()->QueryStats); + NGRpcService::FillQueryStats(queryStats, *Request.QueryStats); NProtobufJson::Proto2Json(queryStats, statsJson, NProtobufJson::TProto2JsonConfig()); serializedStats = NJson::WriteJson(statsJson); } @@ -2504,40 +2504,40 @@ class TSaveScriptFinalStatusActor : public TQueryBase { NYdb::TParamsBuilder params; params .AddParam("$database") - .Utf8(Request->Get()->Database) + .Utf8(Request.Database) .Build() .AddParam("$execution_id") - .Utf8(Request->Get()->ExecutionId) + .Utf8(Request.ExecutionId) .Build() .AddParam("$operation_status") - .Int32(Request->Get()->OperationStatus) + .Int32(Request.OperationStatus) .Build() .AddParam("$execution_status") - .Int32(Request->Get()->ExecStatus) + .Int32(Request.ExecStatus) .Build() .AddParam("$finalization_status") - .Int32(Request->Get()->FinalizationStatus) + .Int32(Request.FinalizationStatus) .Build() .AddParam("$issues") - .JsonDocument(SerializeIssues(Request->Get()->Issues)) + .JsonDocument(SerializeIssues(Request.Issues)) .Build() .AddParam("$plan") - .JsonDocument(Request->Get()->QueryPlan.value_or("{}")) + .JsonDocument(Request.QueryPlan.value_or("{}")) .Build() .AddParam("$stats") .JsonDocument(serializedStats) .Build() .AddParam("$ast") - .Utf8(Request->Get()->QueryAst.value_or("")) + .Utf8(Request.QueryAst.value_or("")) .Build() .AddParam("$operation_ttl") .Interval(static_cast(OperationTtl.MicroSeconds())) .Build() .AddParam("$customer_supplied_id") - .Utf8(CustomerSuppliedId) + .Utf8(Response->CustomerSuppliedId) .Build() .AddParam("$user_token") - .Utf8(UserToken) + .Utf8(Response->UserToken) .Build() .AddParam("$script_sinks") .OptionalJsonDocument(SerializedSinks) @@ -2546,7 +2546,7 @@ class TSaveScriptFinalStatusActor : public TQueryBase { .OptionalJsonDocument(SerializedSecretNames) .Build() .AddParam("$applicate_script_external_effect_required") - .Bool(ApplicateScriptExternalEffectRequired) + .Bool(Response->ApplicateScriptExternalEffectRequired) .Build(); RunDataQuery(sql, ¶ms, TTxControl::ContinueAndCommitTx()); @@ -2559,42 +2559,31 @@ class TSaveScriptFinalStatusActor : public TQueryBase { void OnFinish(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override { if (!FinalStatusAlreadySaved) { - KQP_PROXY_LOG_D("Finish script execution operation. ExecutionId: " << Request->Get()->ExecutionId - << ". " << Ydb::StatusIds::StatusCode_Name(Request->Get()->OperationStatus) - << ". Issues: " << Request->Get()->Issues.ToOneLineString() << ". Plan: " << Request->Get()->QueryPlan.value_or("")); - } - - if (!ApplicateScriptExternalEffectRequired || status != Ydb::StatusIds::SUCCESS) { - Send(Owner, new TEvScriptExecutionFinished(OperationAlreadyFinalized, status, issues)); - return; + KQP_PROXY_LOG_D("Finish script execution operation. ExecutionId: " << Request.ExecutionId + << ". " << Ydb::StatusIds::StatusCode_Name(Request.OperationStatus) + << ". Issues: " << Request.Issues.ToOneLineString() << ". Plan: " << Request.QueryPlan.value_or("")); } - auto response = std::make_unique(CustomerSuppliedId, UserToken); - response->Sinks = std::move(Sinks); - response->SecretNames = std::move(SecretNames); + Response->Status = status; + Response->Issues = std::move(issues); - Send(Owner, response.release()); + Send(Owner, Response.release()); } private: bool HasExternalEffect() const { - return !Sinks.empty(); + return !Response->Sinks.empty(); } private: - TEvScriptFinalizeRequest::TPtr Request; + TEvScriptFinalizeRequest::TDescription Request; + std::unique_ptr Response; - bool OperationAlreadyFinalized = false; bool FinalStatusAlreadySaved = false; - bool ApplicateScriptExternalEffectRequired = false; TDuration OperationTtl; - TString CustomerSuppliedId; - TString UserToken; TMaybe SerializedSinks; - std::vector Sinks; TMaybe SerializedSecretNames; - std::vector SecretNames; }; class TScriptFinalizationFinisherActor : public TQueryBase { @@ -2830,7 +2819,7 @@ NActors::IActor* CreateScriptLeaseUpdateActor(const TActorId& runScriptActorId, } NActors::IActor* CreateSaveScriptExecutionResultMetaActor(const NActors::TActorId& runScriptActorId, const TString& database, const TString& executionId, const TString& serializedMeta) { - return new TSaveScriptExecutionResultMetaActor(runScriptActorId, database, executionId, serializedMeta); + return new TQueryRetryActor(runScriptActorId, database, executionId, serializedMeta); } NActors::IActor* CreateSaveScriptExecutionResultActor(const NActors::TActorId& runScriptActorId, const TString& database, const TString& executionId, i32 resultSetId, TMaybe expireAt, i64 firstRow, Ydb::ResultSet&& resultSet) { @@ -2842,15 +2831,15 @@ NActors::IActor* CreateGetScriptExecutionResultActor(const NActors::TActorId& re } NActors::IActor* CreateSaveScriptExternalEffectActor(TEvSaveScriptExternalEffectRequest::TPtr ev) { - return new TSaveScriptExternalEffectActor(std::move(ev)); + return new TQueryRetryActor(ev->Sender, ev->Get()->Description); } -NActors::IActor* CreateSaveScriptFinalStatusActor(TEvScriptFinalizeRequest::TPtr ev) { - return new TSaveScriptFinalStatusActor(std::move(ev)); +NActors::IActor* CreateSaveScriptFinalStatusActor(const NActors::TActorId& finalizationActorId, TEvScriptFinalizeRequest::TPtr ev) { + return new TQueryRetryActor(finalizationActorId, ev->Get()->Description); } -NActors::IActor* CreateScriptFinalizationFinisherActor(const TString& executionId, const TString& database, std::optional operationStatus, NYql::TIssues operationIssues) { - return new TScriptFinalizationFinisherActor(executionId, database, operationStatus, std::move(operationIssues)); +NActors::IActor* CreateScriptFinalizationFinisherActor(const NActors::TActorId& finalizationActorId, const TString& executionId, const TString& database, std::optional operationStatus, NYql::TIssues operationIssues) { + return new TQueryRetryActor, NYql::TIssues>(finalizationActorId, executionId, database, operationStatus, operationIssues); } NActors::IActor* CreateScriptProgressActor(const TString& executionId, const TString& database, const TString& queryPlan, const TString& queryStats) { diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions.h b/ydb/core/kqp/proxy_service/kqp_script_executions.h index 5781046a1df7..ea4fae00e842 100644 --- a/ydb/core/kqp/proxy_service/kqp_script_executions.h +++ b/ydb/core/kqp/proxy_service/kqp_script_executions.h @@ -33,8 +33,8 @@ NActors::IActor* CreateGetScriptExecutionResultActor(const NActors::TActorId& re // Compute external effects and updates status in database NActors::IActor* CreateSaveScriptExternalEffectActor(TEvSaveScriptExternalEffectRequest::TPtr ev); -NActors::IActor* CreateSaveScriptFinalStatusActor(TEvScriptFinalizeRequest::TPtr ev); -NActors::IActor* CreateScriptFinalizationFinisherActor(const TString& executionId, const TString& database, std::optional operationStatus, NYql::TIssues operationIssues); +NActors::IActor* CreateSaveScriptFinalStatusActor(const NActors::TActorId& finalizationActorId, TEvScriptFinalizeRequest::TPtr ev); +NActors::IActor* CreateScriptFinalizationFinisherActor(const NActors::TActorId& finalizationActorId, const TString& executionId, const TString& database, std::optional operationStatus, NYql::TIssues operationIssues); NActors::IActor* CreateScriptProgressActor(const TString& executionId, const TString& database, const TString& queryPlan, const TString& queryStats); } // namespace NKikimr::NKqp From 7b1e87a5906684c110376a963bff59897de8f8c1 Mon Sep 17 00:00:00 2001 From: Gigoriy Pisarenko Date: Thu, 1 Feb 2024 08:12:51 +0000 Subject: [PATCH 2/2] Moved logic to query_actor.h --- .../proxy_service/kqp_script_executions.cpp | 90 ---------------- ydb/library/query_actor/query_actor.h | 100 +++++++++++++++++- 2 files changed, 99 insertions(+), 91 deletions(-) diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp index 728f6d6c20ea..30491e1bb186 100644 --- a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp +++ b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp @@ -77,96 +77,6 @@ class TQueryBase : public NKikimr::TQueryBase { {} }; -template -class TQueryRetryActor : public TActorBootstrapped> { -public: - using TBase = TActorBootstrapped>; - using IRetryPolicy = IRetryPolicy; - - explicit TQueryRetryActor(const TActorId& replyActorId, const TArgs&... args, TDuration maxRetryTime = TDuration::Seconds(30)) - : ReplyActorId(replyActorId) - , CreateQueryActor([=]() { - return new TQueryActor(args...); - }) - , MaxRetryTime(maxRetryTime) - {} - - void StartQueryActor() const { - TBase::Register(CreateQueryActor()); - } - - void Bootstrap() { - TBase::Become(&TQueryRetryActor::StateFunc); - StartQueryActor(); - } - - STRICT_STFUNC(StateFunc, - hFunc(TEvents::TEvWakeup, Wakeup); - hFunc(TResponse, Handle); - ) - - void Wakeup(TEvents::TEvWakeup::TPtr&) { - StartQueryActor(); - } - - void Handle(const typename TResponse::TPtr& ev) { - const Ydb::StatusIds::StatusCode status = ev->Get()->Status; - if (Retryable(status) == ERetryErrorClass::NoRetry) { - Reply(ev); - return; - } - - if (RetryState == nullptr) { - CreateRetryState(); - } - - if (auto delay = RetryState->GetNextRetryDelay(status)) { - TBase::Schedule(*delay, new TEvents::TEvWakeup()); - } else { - Reply(ev); - } - } - - void Reply(const typename TResponse::TPtr& ev) { - TBase::Send(ev->Forward(ReplyActorId)); - TBase::PassAway(); - } - - static ERetryErrorClass Retryable(Ydb::StatusIds::StatusCode status) { - if (status == Ydb::StatusIds::SUCCESS) { - return ERetryErrorClass::NoRetry; - } - - if (status == Ydb::StatusIds::INTERNAL_ERROR - || status == Ydb::StatusIds::UNAVAILABLE - || status == Ydb::StatusIds::BAD_SESSION - || status == Ydb::StatusIds::SESSION_EXPIRED - || status == Ydb::StatusIds::SESSION_BUSY - || status == Ydb::StatusIds::TIMEOUT - || status == Ydb::StatusIds::ABORTED) { - return ERetryErrorClass::ShortRetry; - } - - if (status == Ydb::StatusIds::OVERLOADED - || status == Ydb::StatusIds::UNDETERMINED) { - return ERetryErrorClass::LongRetry; - } - - return ERetryErrorClass::NoRetry; - } - - void CreateRetryState() { - IRetryPolicy::TPtr policy = IRetryPolicy::GetExponentialBackoffPolicy(Retryable, TDuration::MilliSeconds(10), TDuration::MilliSeconds(200), TDuration::Seconds(1), std::numeric_limits::max(), MaxRetryTime); - RetryState = policy->CreateRetryState(); - } - -private: - const TActorId ReplyActorId; - const std::function CreateQueryActor; - const TDuration MaxRetryTime; - IRetryPolicy::IRetryState::TPtr RetryState = nullptr; -}; - class TScriptExecutionsTablesCreator : public TActorBootstrapped { public: diff --git a/ydb/library/query_actor/query_actor.h b/ydb/library/query_actor/query_actor.h index ef47d2300a0a..5d21f2f840e8 100644 --- a/ydb/library/query_actor/query_actor.h +++ b/ydb/library/query_actor/query_actor.h @@ -12,11 +12,12 @@ #include #include #include +#include +#include #include namespace NKikimr { -// TODO: add retry logic class TQueryBase : public NActors::TActorBootstrapped { protected: struct TTxControl { @@ -168,4 +169,101 @@ class TQueryBase : public NActors::TActorBootstrapped { std::vector ResultSets; }; +template +class TQueryRetryActor : public NActors::TActorBootstrapped> { +public: + using TBase = NActors::TActorBootstrapped>; + using IRetryPolicy = IRetryPolicy; + + explicit TQueryRetryActor(const NActors::TActorId& replyActorId, const TArgs&... args, TDuration maxRetryTime = TDuration::Seconds(1)) + : ReplyActorId(replyActorId) + , RetryPolicy(IRetryPolicy::GetExponentialBackoffPolicy( + Retryable, TDuration::MilliSeconds(10), + TDuration::MilliSeconds(200), TDuration::Seconds(1), + std::numeric_limits::max(), maxRetryTime + )) + , CreateQueryActor([=]() { + return new TQueryActor(args...); + }) + {} + + TQueryRetryActor(const NActors::TActorId& replyActorId, IRetryPolicy::TPtr retryPolicy, const TArgs&... args) + : ReplyActorId(replyActorId) + , RetryPolicy(retryPolicy) + , CreateQueryActor([=]() { + return new TQueryActor(args...); + }) + {} + + void StartQueryActor() const { + TBase::Register(CreateQueryActor()); + } + + void Bootstrap() { + TBase::Become(&TQueryRetryActor::StateFunc); + StartQueryActor(); + } + + STRICT_STFUNC(StateFunc, + hFunc(NActors::TEvents::TEvWakeup, Wakeup); + hFunc(TResponse, Handle); + ) + + void Wakeup(NActors::TEvents::TEvWakeup::TPtr&) { + StartQueryActor(); + } + + void Handle(const typename TResponse::TPtr& ev) { + const Ydb::StatusIds::StatusCode status = ev->Get()->Status; + if (Retryable(status) == ERetryErrorClass::NoRetry) { + Reply(ev); + return; + } + + if (RetryState == nullptr) { + RetryState = RetryPolicy->CreateRetryState(); + } + + if (auto delay = RetryState->GetNextRetryDelay(status)) { + TBase::Schedule(*delay, new NActors::TEvents::TEvWakeup()); + } else { + Reply(ev); + } + } + + void Reply(const typename TResponse::TPtr& ev) { + TBase::Send(ev->Forward(ReplyActorId)); + TBase::PassAway(); + } + + static ERetryErrorClass Retryable(Ydb::StatusIds::StatusCode status) { + if (status == Ydb::StatusIds::SUCCESS) { + return ERetryErrorClass::NoRetry; + } + + if (status == Ydb::StatusIds::INTERNAL_ERROR + || status == Ydb::StatusIds::UNAVAILABLE + || status == Ydb::StatusIds::BAD_SESSION + || status == Ydb::StatusIds::SESSION_EXPIRED + || status == Ydb::StatusIds::SESSION_BUSY + || status == Ydb::StatusIds::TIMEOUT + || status == Ydb::StatusIds::ABORTED) { + return ERetryErrorClass::ShortRetry; + } + + if (status == Ydb::StatusIds::OVERLOADED + || status == Ydb::StatusIds::UNDETERMINED) { + return ERetryErrorClass::LongRetry; + } + + return ERetryErrorClass::NoRetry; + } + +private: + const NActors::TActorId ReplyActorId; + const IRetryPolicy::TPtr RetryPolicy; + const std::function CreateQueryActor; + IRetryPolicy::IRetryState::TPtr RetryState = nullptr; +}; + } // namespace NKikimr