Skip to content

YQ-2734 added retries for internal queries #1457

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 53 additions & 36 deletions ydb/core/kqp/common/events/script_executions.h
Original file line number Diff line number Diff line change
Expand Up @@ -221,20 +221,28 @@ struct TEvFetchScriptResultsQueryResponse : public NActors::TEventLocal<TEvFetch
};

struct TEvSaveScriptExternalEffectRequest : public NActors::TEventLocal<TEvSaveScriptExternalEffectRequest, TKqpScriptExecutionEvents::EvSaveScriptExternalEffectRequest> {
struct TDescription {
TDescription(const TString& executionId, const TString& database, const TString& customerSuppliedId, const TString& userToken)
: ExecutionId(executionId)
, Database(database)
, CustomerSuppliedId(customerSuppliedId)
, UserToken(userToken)
{}

TString ExecutionId;
TString Database;

TString CustomerSuppliedId;
TString UserToken;
std::vector<NKqpProto::TKqpExternalSink> Sinks;
std::vector<TString> SecretNames;
};

TEvSaveScriptExternalEffectRequest(const TString& executionId, const TString& database, const TString& customerSuppliedId, const TString& userToken)
: ExecutionId(executionId)
, Database(database)
, CustomerSuppliedId(customerSuppliedId)
, UserToken(userToken)
: Description(executionId, database, customerSuppliedId, userToken)
{}

TString ExecutionId;
TString Database;

TString CustomerSuppliedId;
TString UserToken;
std::vector<NKqpProto::TKqpExternalSink> Sinks;
std::vector<TString> SecretNames;
TDescription Description;
};

struct TEvSaveScriptExternalEffectResponse : public NActors::TEventLocal<TEvSaveScriptExternalEffectResponse, TKqpScriptExecutionEvents::EvSaveScriptExternalEffectResponse> {
Expand All @@ -248,31 +256,41 @@ struct TEvSaveScriptExternalEffectResponse : public NActors::TEventLocal<TEvSave
};

struct TEvScriptFinalizeRequest : public NActors::TEventLocal<TEvScriptFinalizeRequest, TKqpScriptExecutionEvents::EvScriptFinalizeRequest> {
struct TDescription {
TDescription(EFinalizationStatus finalizationStatus, const TString& executionId, const TString& database,
Ydb::StatusIds::StatusCode operationStatus, Ydb::Query::ExecStatus execStatus, NYql::TIssues issues, std::optional<NKqpProto::TKqpStatsQuery> queryStats,
std::optional<TString> queryPlan, std::optional<TString> queryAst, std::optional<ui64> leaseGeneration)
: FinalizationStatus(finalizationStatus)
, ExecutionId(executionId)
, Database(database)
, OperationStatus(operationStatus)
, ExecStatus(execStatus)
, Issues(std::move(issues))
, QueryStats(std::move(queryStats))
, QueryPlan(std::move(queryPlan))
, QueryAst(std::move(queryAst))
, LeaseGeneration(leaseGeneration)
{}

EFinalizationStatus FinalizationStatus;
TString ExecutionId;
TString Database;
Ydb::StatusIds::StatusCode OperationStatus;
Ydb::Query::ExecStatus ExecStatus;
NYql::TIssues Issues;
std::optional<NKqpProto::TKqpStatsQuery> QueryStats;
std::optional<TString> QueryPlan;
std::optional<TString> QueryAst;
std::optional<ui64> LeaseGeneration;
};

TEvScriptFinalizeRequest(EFinalizationStatus finalizationStatus, const TString& executionId, const TString& database,
Ydb::StatusIds::StatusCode operationStatus, Ydb::Query::ExecStatus execStatus, NYql::TIssues issues = {}, std::optional<NKqpProto::TKqpStatsQuery> queryStats = std::nullopt,
std::optional<TString> queryPlan = std::nullopt, std::optional<TString> queryAst = std::nullopt, std::optional<ui64> leaseGeneration = std::nullopt)
: FinalizationStatus(finalizationStatus)
, ExecutionId(executionId)
, Database(database)
, OperationStatus(operationStatus)
, ExecStatus(execStatus)
, Issues(std::move(issues))
, QueryStats(std::move(queryStats))
, QueryPlan(std::move(queryPlan))
, QueryAst(std::move(queryAst))
, LeaseGeneration(leaseGeneration)
: Description(finalizationStatus, executionId, database, operationStatus, execStatus, issues, queryStats, queryPlan, queryAst, leaseGeneration)
{}

EFinalizationStatus FinalizationStatus;
TString ExecutionId;
TString Database;
Ydb::StatusIds::StatusCode OperationStatus;
Ydb::Query::ExecStatus ExecStatus;
NYql::TIssues Issues;
std::optional<NKqpProto::TKqpStatsQuery> QueryStats;
std::optional<TString> QueryPlan;
std::optional<TString> QueryAst;
std::optional<ui64> LeaseGeneration;
TDescription Description;
};

struct TEvScriptFinalizeResponse : public NActors::TEventLocal<TEvScriptFinalizeResponse, TKqpScriptExecutionEvents::EvScriptFinalizeResponse> {
Expand All @@ -284,15 +302,14 @@ struct TEvScriptFinalizeResponse : public NActors::TEventLocal<TEvScriptFinalize
};

struct TEvSaveScriptFinalStatusResponse : public NActors::TEventLocal<TEvSaveScriptFinalStatusResponse, TKqpScriptExecutionEvents::EvSaveScriptFinalStatusResponse> {
TEvSaveScriptFinalStatusResponse(const TString& customerSuppliedId, const TString& userToken)
: CustomerSuppliedId(customerSuppliedId)
, UserToken(userToken)
{}

bool ApplicateScriptExternalEffectRequired = false;
bool OperationAlreadyFinalized = false;
TString CustomerSuppliedId;
TString UserToken;
std::vector<NKqpProto::TKqpExternalSink> Sinks;
std::vector<TString> SecretNames;
Ydb::StatusIds::StatusCode Status;
NYql::TIssues Issues;
};

struct TEvDescribeSecretsResponse : public NActors::TEventLocal<TEvDescribeSecretsResponse, TKqpScriptExecutionEvents::EvDescribeSecretsResponse> {
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1629,13 +1629,13 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
for (const auto& sink : stage.GetSinks()) {
if (sink.GetTypeCase() == NKqpProto::TKqpSink::kExternalSink) {
SaveScriptExternalEffectRequired = true;
scriptExternalEffect->Sinks.push_back(sink.GetExternalSink());
scriptExternalEffect->Description.Sinks.push_back(sink.GetExternalSink());
}
}
}
}
}
scriptExternalEffect->SecretNames = SecretNames;
scriptExternalEffect->Description.SecretNames = SecretNames;

if (!WaitRequired()) {
return Execute();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,26 +22,30 @@ class TScriptFinalizerActor : public TActorBootstrapped<TScriptFinalizerActor> {
const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig,
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup)
: ReplyActor_(request->Sender)
, ExecutionId_(request->Get()->ExecutionId)
, Database_(request->Get()->Database)
, FinalizationStatus_(request->Get()->FinalizationStatus)
, ExecutionId_(request->Get()->Description.ExecutionId)
, Database_(request->Get()->Description.Database)
, FinalizationStatus_(request->Get()->Description.FinalizationStatus)
, Request_(std::move(request))
, FinalizationTimeout_(TDuration::Seconds(finalizeScriptServiceConfig.GetScriptFinalizationTimeoutSeconds()))
, MaximalSecretsSnapshotWaitTime_(2 * TDuration::Seconds(metadataProviderConfig.GetRefreshPeriodSeconds()))
, FederatedQuerySetup_(federatedQuerySetup)
{}

void Bootstrap() {
Register(CreateSaveScriptFinalStatusActor(std::move(Request_)));
Register(CreateSaveScriptFinalStatusActor(SelfId(), std::move(Request_)));
Become(&TScriptFinalizerActor::FetchState);
}

STRICT_STFUNC(FetchState,
hFunc(TEvSaveScriptFinalStatusResponse, Handle);
hFunc(TEvScriptExecutionFinished, Handle);
)

void Handle(TEvSaveScriptFinalStatusResponse::TPtr& ev) {
if (!ev->Get()->ApplicateScriptExternalEffectRequired || ev->Get()->Status != Ydb::StatusIds::SUCCESS) {
Reply(ev->Get()->OperationAlreadyFinalized, ev->Get()->Status, std::move(ev->Get()->Issues));
return;
}

Schedule(FinalizationTimeout_, new TEvents::TEvWakeup());
Become(&TScriptFinalizerActor::PrepareState);

Expand Down Expand Up @@ -168,7 +172,7 @@ class TScriptFinalizerActor : public TActorBootstrapped<TScriptFinalizerActor> {
)

void FinishScriptFinalization(std::optional<Ydb::StatusIds::StatusCode> status, NYql::TIssues issues) {
Register(CreateScriptFinalizationFinisherActor(ExecutionId_, Database_, status, std::move(issues)));
Register(CreateScriptFinalizationFinisherActor(SelfId(), ExecutionId_, Database_, status, std::move(issues)));
Become(&TScriptFinalizerActor::FinishState);
}

Expand All @@ -181,7 +185,11 @@ class TScriptFinalizerActor : public TActorBootstrapped<TScriptFinalizerActor> {
}

void Handle(TEvScriptExecutionFinished::TPtr& ev) {
Send(ReplyActor_, ev->Release());
Reply(ev->Get()->OperationAlreadyFinalized, ev->Get()->Status, std::move(ev->Get()->Issues));
}

void Reply(bool operationAlreadyFinalized, Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) {
Send(ReplyActor_, new TEvScriptExecutionFinished(operationAlreadyFinalized, status, std::move(issues)));
Send(MakeKqpFinalizeScriptServiceId(SelfId().NodeId()), new TEvScriptFinalizeResponse(ExecutionId_));

PassAway();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,18 @@ class TKqpFinalizeScriptService : public TActorBootstrapped<TKqpFinalizeScriptSe
}

void Handle(TEvSaveScriptExternalEffectRequest::TPtr& ev) {
ev->Get()->Sinks = FilterExternalSinks(ev->Get()->Sinks);
auto& description = ev->Get()->Description;
description.Sinks = FilterExternalSinks(description.Sinks);

if (!ev->Get()->Sinks.empty()) {
if (!description.Sinks.empty()) {
Register(CreateSaveScriptExternalEffectActor(std::move(ev)));
} else {
Send(ev->Sender, new TEvSaveScriptExternalEffectResponse(Ydb::StatusIds::SUCCESS, {}));
}
}

void Handle(TEvScriptFinalizeRequest::TPtr& ev) {
TString executionId = ev->Get()->ExecutionId;
TString executionId = ev->Get()->Description.ExecutionId;

if (!FinalizationRequestsQueue_.contains(executionId)) {
WaitingFinalizationExecutions_.push(executionId);
Expand Down
Loading