|
4 | 4 | #include "target_with_stream.h"
|
5 | 5 | #include "util.h"
|
6 | 6 |
|
| 7 | +#include <ydb/core/base/appdata.h> |
7 | 8 | #include <ydb/core/base/path.h>
|
| 9 | +#include <ydb/core/protos/replication.pb.h> |
8 | 10 | #include <ydb/core/tx/replication/ydb_proxy/ydb_proxy.h>
|
9 | 11 | #include <ydb/library/actors/core/actor_bootstrapped.h>
|
10 | 12 | #include <ydb/library/actors/core/hfunc.h>
|
|
16 | 18 | namespace NKikimr::NReplication::NController {
|
17 | 19 |
|
18 | 20 | class TStreamCreator: public TActorBootstrapped<TStreamCreator> {
|
19 |
| - static NYdb::NTable::TChangefeedDescription MakeChangefeed(const TString& name, const NJson::TJsonMap& attrs) { |
| 21 | + static NYdb::NTable::TChangefeedDescription MakeChangefeed( |
| 22 | + const TString& name, const TDuration& retentionPeriod, const NJson::TJsonMap& attrs) |
| 23 | + { |
20 | 24 | using namespace NYdb::NTable;
|
21 | 25 | return TChangefeedDescription(name, EChangefeedMode::Updates, EChangefeedFormat::Json)
|
| 26 | + .WithRetentionPeriod(retentionPeriod) |
22 | 27 | .WithInitialScan()
|
23 | 28 | .AddAttribute("__async_replication", NJson::WriteJson(attrs, false));
|
24 | 29 | }
|
@@ -133,14 +138,15 @@ class TStreamCreator: public TActorBootstrapped<TStreamCreator> {
|
133 | 138 | TReplication::ETargetKind kind,
|
134 | 139 | const TString& srcPath,
|
135 | 140 | const TString& dstPath,
|
136 |
| - const TString& streamName) |
| 141 | + const TString& streamName, |
| 142 | + const TDuration& streamRetentionPeriod) |
137 | 143 | : Parent(parent)
|
138 | 144 | , YdbProxy(proxy)
|
139 | 145 | , ReplicationId(rid)
|
140 | 146 | , TargetId(tid)
|
141 | 147 | , Kind(kind)
|
142 | 148 | , SrcPath(srcPath)
|
143 |
| - , Changefeed(MakeChangefeed(streamName, NJson::TJsonMap{ |
| 149 | + , Changefeed(MakeChangefeed(streamName, streamRetentionPeriod, NJson::TJsonMap{ |
144 | 150 | {"path", dstPath},
|
145 | 151 | {"id", ToString(rid)},
|
146 | 152 | }))
|
@@ -175,13 +181,15 @@ IActor* CreateStreamCreator(TReplication* replication, ui64 targetId, const TAct
|
175 | 181 | Y_ABORT_UNLESS(target);
|
176 | 182 | return CreateStreamCreator(ctx.SelfID, replication->GetYdbProxy(),
|
177 | 183 | replication->GetId(), target->GetId(), target->GetKind(),
|
178 |
| - target->GetSrcPath(), target->GetDstPath(), target->GetStreamName()); |
| 184 | + target->GetSrcPath(), target->GetDstPath(), target->GetStreamName(), |
| 185 | + TDuration::Seconds(AppData()->ReplicationConfig.GetRetentionPeriodSeconds())); |
179 | 186 | }
|
180 | 187 |
|
181 | 188 | IActor* CreateStreamCreator(const TActorId& parent, const TActorId& proxy, ui64 rid, ui64 tid,
|
182 |
| - TReplication::ETargetKind kind, const TString& srcPath, const TString& dstPath, const TString& streamName) |
| 189 | + TReplication::ETargetKind kind, const TString& srcPath, const TString& dstPath, |
| 190 | + const TString& streamName, const TDuration& streamRetentionPeriod) |
183 | 191 | {
|
184 |
| - return new TStreamCreator(parent, proxy, rid, tid, kind, srcPath, dstPath, streamName); |
| 192 | + return new TStreamCreator(parent, proxy, rid, tid, kind, srcPath, dstPath, streamName, streamRetentionPeriod); |
185 | 193 | }
|
186 | 194 |
|
187 | 195 | }
|
0 commit comments