@@ -59,20 +59,20 @@ class TPqDqIntegration: public TDqIntegrationBase {
59
59
return 0 ;
60
60
}
61
61
62
- ui64 Partition (const TDqSettings&, size_t maxPartitions, const TExprNode& node, TVector<TString>& partitions, TString*, TExprContext&, bool ) override {
62
+ ui64 Partition (const TExprNode& node, TVector<TString>& partitions, TString*, TExprContext&, const TPartitionSettings& settings ) override {
63
63
if (auto maybePqRead = TMaybeNode<TPqReadTopic>(&node)) {
64
- return PartitionTopicRead (maybePqRead.Cast ().Topic (), maxPartitions , partitions);
64
+ return PartitionTopicRead (maybePqRead.Cast ().Topic (), settings. MaxPartitions , partitions);
65
65
}
66
66
if (auto maybeDqSource = TMaybeNode<TDqSource>(&node)) {
67
- auto settings = maybeDqSource.Cast ().Settings ();
68
- if (auto topicSource = TMaybeNode<TDqPqTopicSource>(settings .Raw ())) {
69
- return PartitionTopicRead (topicSource.Cast ().Topic (), maxPartitions , partitions);
67
+ auto srcSettings = maybeDqSource.Cast ().Settings ();
68
+ if (auto topicSource = TMaybeNode<TDqPqTopicSource>(srcSettings .Raw ())) {
69
+ return PartitionTopicRead (topicSource.Cast ().Topic (), settings. MaxPartitions , partitions);
70
70
}
71
71
}
72
72
return 0 ;
73
73
}
74
74
75
- TExprNode::TPtr WrapRead (const TDqSettings& dqSettings, const TExprNode::TPtr& read, TExprContext& ctx) override {
75
+ TExprNode::TPtr WrapRead (const TExprNode::TPtr& read, TExprContext& ctx, const TWrapReadSettings& wrSettings ) override {
76
76
if (const auto & maybePqReadTopic = TMaybeNode<TPqReadTopic>(read )) {
77
77
const auto & pqReadTopic = maybePqReadTopic.Cast ();
78
78
YQL_ENSURE (pqReadTopic.Ref ().GetTypeAnn (), " No type annotation for node " << pqReadTopic.Ref ().Content ());
@@ -127,7 +127,7 @@ class TPqDqIntegration: public TDqIntegrationBase {
127
127
128
128
const auto & typeItems = pqReadTopic.Topic ().RowSpec ().Ref ().GetTypeAnn ()->Cast <TTypeExprType>()->GetType ()->Cast <TStructExprType>()->GetItems ();
129
129
const auto pos = read ->Pos ();
130
-
130
+
131
131
TExprNode::TListType colNames;
132
132
colNames.reserve (typeItems.size ());
133
133
std::transform (typeItems.cbegin (), typeItems.cend (), std::back_inserter (colNames),
@@ -146,7 +146,7 @@ class TPqDqIntegration: public TDqIntegrationBase {
146
146
.World (pqReadTopic.World ())
147
147
.Topic (pqReadTopic.Topic ())
148
148
.Columns (std::move (columnNames))
149
- .Settings (BuildTopicReadSettings (clusterName, dqSettings , read ->Pos (), format, ctx))
149
+ .Settings (BuildTopicReadSettings (clusterName, wrSettings , read ->Pos (), format, ctx))
150
150
.Token <TCoSecureParam>()
151
151
.Name ().Build (token)
152
152
.Build ()
@@ -325,7 +325,7 @@ class TPqDqIntegration: public TDqIntegrationBase {
325
325
326
326
NNodes::TCoNameValueTupleList BuildTopicReadSettings (
327
327
const TString& cluster,
328
- const TDqSettings& dqSettings ,
328
+ const IDqIntegration::TWrapReadSettings& wrSettings ,
329
329
TPositionHandle pos,
330
330
std::string_view format,
331
331
TExprContext& ctx) const
@@ -349,7 +349,7 @@ class TPqDqIntegration: public TDqIntegrationBase {
349
349
Add (props, ReconnectPeriod, ToString (clusterConfiguration->ReconnectPeriod ), pos, ctx);
350
350
Add (props, Format, format, pos, ctx);
351
351
352
-
352
+
353
353
if (clusterConfiguration->UseSsl ) {
354
354
Add (props, UseSslSetting, " 1" , pos, ctx);
355
355
}
@@ -358,23 +358,21 @@ class TPqDqIntegration: public TDqIntegrationBase {
358
358
Add (props, AddBearerToTokenSetting, " 1" , pos, ctx);
359
359
}
360
360
361
- if (dqSettings .WatermarksMode . Get () .GetOrElse (" " ) == " default" ) {
361
+ if (wrSettings .WatermarksMode .GetOrElse (" " ) == " default" ) {
362
362
Add (props, WatermarksEnableSetting, ToString (true ), pos, ctx);
363
363
364
- const auto granularity = TDuration::MilliSeconds (dqSettings
364
+ const auto granularity = TDuration::MilliSeconds (wrSettings
365
365
.WatermarksGranularityMs
366
- .Get ()
367
366
.GetOrElse (TDqSettings::TDefault::WatermarksGranularityMs));
368
367
Add (props, WatermarksGranularityUsSetting, ToString (granularity.MicroSeconds ()), pos, ctx);
369
368
370
- const auto lateArrivalDelay = TDuration::MilliSeconds (dqSettings
369
+ const auto lateArrivalDelay = TDuration::MilliSeconds (wrSettings
371
370
.WatermarksLateArrivalDelayMs
372
- .Get ()
373
371
.GetOrElse (TDqSettings::TDefault::WatermarksLateArrivalDelayMs));
374
372
Add (props, WatermarksLateArrivalDelayUsSetting, ToString (lateArrivalDelay.MicroSeconds ()), pos, ctx);
375
373
}
376
374
377
- if (dqSettings .WatermarksEnableIdlePartitions . Get () .GetOrElse (false )) {
375
+ if (wrSettings .WatermarksEnableIdlePartitions .GetOrElse (false )) {
378
376
Add (props, WatermarksIdlePartitionsSetting, ToString (true ), pos, ctx);
379
377
}
380
378
0 commit comments