Skip to content

Commit 0b72228

Browse files
committed
YQL-17542 move SaveState LoadState (ydb-platform#1474)
1 parent 382f31d commit 0b72228

File tree

2 files changed

+27
-25
lines changed

2 files changed

+27
-25
lines changed

ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h

Lines changed: 1 addition & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -741,22 +741,6 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
741741
return true;
742742
}
743743

744-
void SaveState(const NDqProto::TCheckpoint& checkpoint, NDqProto::TComputeActorState& state) const override {
745-
CA_LOG_D("Save state");
746-
NDqProto::TMiniKqlProgramState& mkqlProgramState = *state.MutableMiniKqlProgram();
747-
mkqlProgramState.SetRuntimeVersion(NDqProto::RUNTIME_VERSION_YQL_1_0);
748-
NDqProto::TStateData::TData& data = *mkqlProgramState.MutableData()->MutableStateData();
749-
data.SetVersion(TDqComputeActorCheckpoints::ComputeActorCurrentStateVersion);
750-
data.SetBlob(TaskRunner->Save());
751-
752-
for (auto& [inputIndex, source] : SourcesMap) {
753-
YQL_ENSURE(source.AsyncInput, "Source[" << inputIndex << "] is not created");
754-
NDqProto::TSourceState& sourceState = *state.AddSources();
755-
source.AsyncInput->SaveState(checkpoint, sourceState);
756-
sourceState.SetInputIndex(inputIndex);
757-
}
758-
}
759-
760744
void CommitState(const NDqProto::TCheckpoint& checkpoint) override {
761745
CA_LOG_D("Commit state");
762746
for (auto& [inputIndex, source] : SourcesMap) {
@@ -810,15 +794,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
810794
}
811795
}
812796

813-
virtual void DoLoadRunnerState(TString&& blob) {
814-
TMaybe<TString> error = Nothing();
815-
try {
816-
TaskRunner->Load(blob);
817-
} catch (const std::exception& e) {
818-
error = e.what();
819-
}
820-
Checkpoints->AfterStateLoading(error);
821-
}
797+
virtual void DoLoadRunnerState(TString&& blob) = 0;
822798

823799
void LoadState(NDqProto::TComputeActorState&& state) override {
824800
CA_LOG_D("Load state");

ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,32 @@ class TDqSyncComputeActorBase: public TDqComputeActorBase<TDerived, TComputeActo
4444
return inputTransformInfo.Buffer.Get();
4545
}
4646
protected:
47+
void SaveState(const NDqProto::TCheckpoint& checkpoint, NDqProto::TComputeActorState& state) const override {
48+
CA_LOG_D("Save state");
49+
NDqProto::TMiniKqlProgramState& mkqlProgramState = *state.MutableMiniKqlProgram();
50+
mkqlProgramState.SetRuntimeVersion(NDqProto::RUNTIME_VERSION_YQL_1_0);
51+
NDqProto::TStateData::TData& data = *mkqlProgramState.MutableData()->MutableStateData();
52+
data.SetVersion(TDqComputeActorCheckpoints::ComputeActorCurrentStateVersion);
53+
data.SetBlob(this->TaskRunner->Save());
54+
55+
for (auto& [inputIndex, source] : this->SourcesMap) {
56+
YQL_ENSURE(source.AsyncInput, "Source[" << inputIndex << "] is not created");
57+
NDqProto::TSourceState& sourceState = *state.AddSources();
58+
source.AsyncInput->SaveState(checkpoint, sourceState);
59+
sourceState.SetInputIndex(inputIndex);
60+
}
61+
}
62+
63+
void DoLoadRunnerState(TString&& blob) override {
64+
TMaybe<TString> error = Nothing();
65+
try {
66+
this->TaskRunner->Load(blob);
67+
} catch (const std::exception& e) {
68+
error = e.what();
69+
}
70+
this->Checkpoints->AfterStateLoading(error);
71+
}
72+
4773
void SetTaskRunner(const TIntrusivePtr<IDqTaskRunner>& taskRunner) {
4874
this->TaskRunner = taskRunner;
4975
}

0 commit comments

Comments
 (0)