Skip to content

Commit 6f83431

Browse files
committed
Added retries for internal queries
1 parent 8472b26 commit 6f83431

File tree

6 files changed

+227
-212
lines changed

6 files changed

+227
-212
lines changed

ydb/core/kqp/common/events/script_executions.h

+53-36
Original file line numberDiff line numberDiff line change
@@ -221,20 +221,28 @@ struct TEvFetchScriptResultsQueryResponse : public NActors::TEventLocal<TEvFetch
221221
};
222222

223223
struct TEvSaveScriptExternalEffectRequest : public NActors::TEventLocal<TEvSaveScriptExternalEffectRequest, TKqpScriptExecutionEvents::EvSaveScriptExternalEffectRequest> {
224+
struct TDescription {
225+
TDescription(const TString& executionId, const TString& database, const TString& customerSuppliedId, const TString& userToken)
226+
: ExecutionId(executionId)
227+
, Database(database)
228+
, CustomerSuppliedId(customerSuppliedId)
229+
, UserToken(userToken)
230+
{}
231+
232+
TString ExecutionId;
233+
TString Database;
234+
235+
TString CustomerSuppliedId;
236+
TString UserToken;
237+
std::vector<NKqpProto::TKqpExternalSink> Sinks;
238+
std::vector<TString> SecretNames;
239+
};
240+
224241
TEvSaveScriptExternalEffectRequest(const TString& executionId, const TString& database, const TString& customerSuppliedId, const TString& userToken)
225-
: ExecutionId(executionId)
226-
, Database(database)
227-
, CustomerSuppliedId(customerSuppliedId)
228-
, UserToken(userToken)
242+
: Description(executionId, database, customerSuppliedId, userToken)
229243
{}
230244

231-
TString ExecutionId;
232-
TString Database;
233-
234-
TString CustomerSuppliedId;
235-
TString UserToken;
236-
std::vector<NKqpProto::TKqpExternalSink> Sinks;
237-
std::vector<TString> SecretNames;
245+
TDescription Description;
238246
};
239247

240248
struct TEvSaveScriptExternalEffectResponse : public NActors::TEventLocal<TEvSaveScriptExternalEffectResponse, TKqpScriptExecutionEvents::EvSaveScriptExternalEffectResponse> {
@@ -248,31 +256,41 @@ struct TEvSaveScriptExternalEffectResponse : public NActors::TEventLocal<TEvSave
248256
};
249257

250258
struct TEvScriptFinalizeRequest : public NActors::TEventLocal<TEvScriptFinalizeRequest, TKqpScriptExecutionEvents::EvScriptFinalizeRequest> {
259+
struct TDescription {
260+
TDescription(EFinalizationStatus finalizationStatus, const TString& executionId, const TString& database,
261+
Ydb::StatusIds::StatusCode operationStatus, Ydb::Query::ExecStatus execStatus, NYql::TIssues issues, std::optional<NKqpProto::TKqpStatsQuery> queryStats,
262+
std::optional<TString> queryPlan, std::optional<TString> queryAst, std::optional<ui64> leaseGeneration)
263+
: FinalizationStatus(finalizationStatus)
264+
, ExecutionId(executionId)
265+
, Database(database)
266+
, OperationStatus(operationStatus)
267+
, ExecStatus(execStatus)
268+
, Issues(std::move(issues))
269+
, QueryStats(std::move(queryStats))
270+
, QueryPlan(std::move(queryPlan))
271+
, QueryAst(std::move(queryAst))
272+
, LeaseGeneration(leaseGeneration)
273+
{}
274+
275+
EFinalizationStatus FinalizationStatus;
276+
TString ExecutionId;
277+
TString Database;
278+
Ydb::StatusIds::StatusCode OperationStatus;
279+
Ydb::Query::ExecStatus ExecStatus;
280+
NYql::TIssues Issues;
281+
std::optional<NKqpProto::TKqpStatsQuery> QueryStats;
282+
std::optional<TString> QueryPlan;
283+
std::optional<TString> QueryAst;
284+
std::optional<ui64> LeaseGeneration;
285+
};
286+
251287
TEvScriptFinalizeRequest(EFinalizationStatus finalizationStatus, const TString& executionId, const TString& database,
252288
Ydb::StatusIds::StatusCode operationStatus, Ydb::Query::ExecStatus execStatus, NYql::TIssues issues = {}, std::optional<NKqpProto::TKqpStatsQuery> queryStats = std::nullopt,
253289
std::optional<TString> queryPlan = std::nullopt, std::optional<TString> queryAst = std::nullopt, std::optional<ui64> leaseGeneration = std::nullopt)
254-
: FinalizationStatus(finalizationStatus)
255-
, ExecutionId(executionId)
256-
, Database(database)
257-
, OperationStatus(operationStatus)
258-
, ExecStatus(execStatus)
259-
, Issues(std::move(issues))
260-
, QueryStats(std::move(queryStats))
261-
, QueryPlan(std::move(queryPlan))
262-
, QueryAst(std::move(queryAst))
263-
, LeaseGeneration(leaseGeneration)
290+
: Description(finalizationStatus, executionId, database, operationStatus, execStatus, issues, queryStats, queryPlan, queryAst, leaseGeneration)
264291
{}
265292

266-
EFinalizationStatus FinalizationStatus;
267-
TString ExecutionId;
268-
TString Database;
269-
Ydb::StatusIds::StatusCode OperationStatus;
270-
Ydb::Query::ExecStatus ExecStatus;
271-
NYql::TIssues Issues;
272-
std::optional<NKqpProto::TKqpStatsQuery> QueryStats;
273-
std::optional<TString> QueryPlan;
274-
std::optional<TString> QueryAst;
275-
std::optional<ui64> LeaseGeneration;
293+
TDescription Description;
276294
};
277295

278296
struct TEvScriptFinalizeResponse : public NActors::TEventLocal<TEvScriptFinalizeResponse, TKqpScriptExecutionEvents::EvScriptFinalizeResponse> {
@@ -284,15 +302,14 @@ struct TEvScriptFinalizeResponse : public NActors::TEventLocal<TEvScriptFinalize
284302
};
285303

286304
struct TEvSaveScriptFinalStatusResponse : public NActors::TEventLocal<TEvSaveScriptFinalStatusResponse, TKqpScriptExecutionEvents::EvSaveScriptFinalStatusResponse> {
287-
TEvSaveScriptFinalStatusResponse(const TString& customerSuppliedId, const TString& userToken)
288-
: CustomerSuppliedId(customerSuppliedId)
289-
, UserToken(userToken)
290-
{}
291-
305+
bool ApplicateScriptExternalEffectRequired = false;
306+
bool OperationAlreadyFinalized = false;
292307
TString CustomerSuppliedId;
293308
TString UserToken;
294309
std::vector<NKqpProto::TKqpExternalSink> Sinks;
295310
std::vector<TString> SecretNames;
311+
Ydb::StatusIds::StatusCode Status;
312+
NYql::TIssues Issues;
296313
};
297314

298315
struct TEvDescribeSecretsResponse : public NActors::TEventLocal<TEvDescribeSecretsResponse, TKqpScriptExecutionEvents::EvDescribeSecretsResponse> {

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -1627,13 +1627,13 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
16271627
for (const auto& sink : stage.GetSinks()) {
16281628
if (sink.GetTypeCase() == NKqpProto::TKqpSink::kExternalSink) {
16291629
SaveScriptExternalEffectRequired = true;
1630-
scriptExternalEffect->Sinks.push_back(sink.GetExternalSink());
1630+
scriptExternalEffect->Description.Sinks.push_back(sink.GetExternalSink());
16311631
}
16321632
}
16331633
}
16341634
}
16351635
}
1636-
scriptExternalEffect->SecretNames = SecretNames;
1636+
scriptExternalEffect->Description.SecretNames = SecretNames;
16371637

16381638
if (!WaitRequired()) {
16391639
return Execute();

ydb/core/kqp/finalize_script_service/kqp_finalize_script_actor.cpp

+15-7
Original file line numberDiff line numberDiff line change
@@ -22,26 +22,30 @@ class TScriptFinalizerActor : public TActorBootstrapped<TScriptFinalizerActor> {
2222
const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig,
2323
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup)
2424
: ReplyActor_(request->Sender)
25-
, ExecutionId_(request->Get()->ExecutionId)
26-
, Database_(request->Get()->Database)
27-
, FinalizationStatus_(request->Get()->FinalizationStatus)
25+
, ExecutionId_(request->Get()->Description.ExecutionId)
26+
, Database_(request->Get()->Description.Database)
27+
, FinalizationStatus_(request->Get()->Description.FinalizationStatus)
2828
, Request_(std::move(request))
2929
, FinalizationTimeout_(TDuration::Seconds(finalizeScriptServiceConfig.GetScriptFinalizationTimeoutSeconds()))
3030
, MaximalSecretsSnapshotWaitTime_(2 * TDuration::Seconds(metadataProviderConfig.GetRefreshPeriodSeconds()))
3131
, FederatedQuerySetup_(federatedQuerySetup)
3232
{}
3333

3434
void Bootstrap() {
35-
Register(CreateSaveScriptFinalStatusActor(std::move(Request_)));
35+
Register(CreateSaveScriptFinalStatusActor(SelfId(), std::move(Request_)));
3636
Become(&TScriptFinalizerActor::FetchState);
3737
}
3838

3939
STRICT_STFUNC(FetchState,
4040
hFunc(TEvSaveScriptFinalStatusResponse, Handle);
41-
hFunc(TEvScriptExecutionFinished, Handle);
4241
)
4342

4443
void Handle(TEvSaveScriptFinalStatusResponse::TPtr& ev) {
44+
if (!ev->Get()->ApplicateScriptExternalEffectRequired || ev->Get()->Status != Ydb::StatusIds::SUCCESS) {
45+
Reply(ev->Get()->OperationAlreadyFinalized, ev->Get()->Status, std::move(ev->Get()->Issues));
46+
return;
47+
}
48+
4549
Schedule(FinalizationTimeout_, new TEvents::TEvWakeup());
4650
Become(&TScriptFinalizerActor::PrepareState);
4751

@@ -168,7 +172,7 @@ class TScriptFinalizerActor : public TActorBootstrapped<TScriptFinalizerActor> {
168172
)
169173

170174
void FinishScriptFinalization(std::optional<Ydb::StatusIds::StatusCode> status, NYql::TIssues issues) {
171-
Register(CreateScriptFinalizationFinisherActor(ExecutionId_, Database_, status, std::move(issues)));
175+
Register(CreateScriptFinalizationFinisherActor(SelfId(), ExecutionId_, Database_, status, std::move(issues)));
172176
Become(&TScriptFinalizerActor::FinishState);
173177
}
174178

@@ -181,7 +185,11 @@ class TScriptFinalizerActor : public TActorBootstrapped<TScriptFinalizerActor> {
181185
}
182186

183187
void Handle(TEvScriptExecutionFinished::TPtr& ev) {
184-
Send(ReplyActor_, ev->Release());
188+
Reply(ev->Get()->OperationAlreadyFinalized, ev->Get()->Status, std::move(ev->Get()->Issues));
189+
}
190+
191+
void Reply(bool operationAlreadyFinalized, Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) {
192+
Send(ReplyActor_, new TEvScriptExecutionFinished(operationAlreadyFinalized, status, std::move(issues)));
185193
Send(MakeKqpFinalizeScriptServiceId(SelfId().NodeId()), new TEvScriptFinalizeResponse(ExecutionId_));
186194

187195
PassAway();

ydb/core/kqp/finalize_script_service/kqp_finalize_script_service.cpp

+4-3
Original file line numberDiff line numberDiff line change
@@ -27,17 +27,18 @@ class TKqpFinalizeScriptService : public TActorBootstrapped<TKqpFinalizeScriptSe
2727
}
2828

2929
void Handle(TEvSaveScriptExternalEffectRequest::TPtr& ev) {
30-
ev->Get()->Sinks = FilterExternalSinks(ev->Get()->Sinks);
30+
auto& description = ev->Get()->Description;
31+
description.Sinks = FilterExternalSinks(description.Sinks);
3132

32-
if (!ev->Get()->Sinks.empty()) {
33+
if (!description.Sinks.empty()) {
3334
Register(CreateSaveScriptExternalEffectActor(std::move(ev)));
3435
} else {
3536
Send(ev->Sender, new TEvSaveScriptExternalEffectResponse(Ydb::StatusIds::SUCCESS, {}));
3637
}
3738
}
3839

3940
void Handle(TEvScriptFinalizeRequest::TPtr& ev) {
40-
TString executionId = ev->Get()->ExecutionId;
41+
TString executionId = ev->Get()->Description.ExecutionId;
4142

4243
if (!FinalizationRequestsQueue_.contains(executionId)) {
4344
WaitingFinalizationExecutions_.push(executionId);

0 commit comments

Comments
 (0)