Skip to content

Commit 2b4c9f9

Browse files
authored
YQ-3736 Shared reading: fix unsupported predicates (#10256)
1 parent 8321c66 commit 2b4c9f9

File tree

6 files changed

+30
-6
lines changed

6 files changed

+30
-6
lines changed

ydb/core/fq/libs/row_dispatcher/coordinator.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,6 @@ void TActorCoordinator::AddRowDispatcher(NActors::TActorId actorId, bool isLocal
168168
void TActorCoordinator::Handle(NActors::TEvents::TEvPing::TPtr& ev) {
169169
LOG_ROW_DISPATCHER_TRACE("TEvPing received, " << ev->Sender);
170170
AddRowDispatcher(ev->Sender, false);
171-
PrintInternalState();
172171
LOG_ROW_DISPATCHER_TRACE("Send TEvPong to " << ev->Sender);
173172
Send(ev->Sender, new NActors::TEvents::TEvPong(), IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession);
174173
}

ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -299,8 +299,8 @@ void TRowDispatcher::Handle(TEvPrivate::TEvCoordinatorPing::TPtr&) {
299299
if (!CoordinatorActorId) {
300300
return;
301301
}
302-
LOG_ROW_DISPATCHER_DEBUG("Send ping to " << *CoordinatorActorId);
303-
Send(*CoordinatorActorId, new NActors::TEvents::TEvPing(), IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession);
302+
LOG_ROW_DISPATCHER_TRACE("Send ping to " << *CoordinatorActorId);
303+
Send(*CoordinatorActorId, new NActors::TEvents::TEvPing());
304304
}
305305

306306
void TRowDispatcher::Handle(NActors::TEvents::TEvPong::TPtr&) {

ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -598,7 +598,7 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvMessageBatch::TPtr& ev)
598598
return;
599599
}
600600

601-
Metrics.InFlyGetNextBatch->Dec();
601+
Metrics.InFlyGetNextBatch->Set(0);
602602
auto& sessionInfo = it->second;
603603
if (!sessionInfo.EventsQueue.OnEventReceived(ev)) {
604604
SRC_LOG_W("Wrong seq num ignore message, seqNo " << meta.GetSeqNo());

ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,8 @@ class TPqDqIntegration: public TDqIntegrationBase {
263263
if (auto predicate = topicSource.FilterPredicate(); !NYql::IsEmptyFilterPredicate(predicate)) {
264264
TStringBuilder err;
265265
if (!NYql::SerializeFilterPredicate(predicate, &predicateProto, err)) {
266-
ythrow yexception() << "Failed to serialize filter predicate for source: " << err;
266+
ctx.AddWarning(TIssue(ctx.GetPosition(node.Pos()), "Failed to serialize filter predicate for source: " + err));
267+
predicateProto.Clear();
267268
}
268269
}
269270

ydb/library/yql/providers/pq/provider/yql_pq_logical_opt.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -257,7 +257,6 @@ class TPqLogicalOptProposalTransformer : public TOptimizeTransformerBase {
257257

258258
auto newFilterLambda = MakePushdownPredicate(flatmap.Lambda(), ctx, node.Pos(), TPushdownSettings());
259259
if (!newFilterLambda) {
260-
ctx.AddWarning(TIssue(ctx.GetPosition(node.Pos()), "No predicate to pushdown"));
261260
return node;
262261
}
263262
YQL_CLOG(INFO, ProviderPq) << "Build new TCoFlatMap with predicate";

ydb/tests/fq/yds/test_row_dispatcher.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,31 @@ def test_filter(self, kikimr, client):
243243
issues = str(client.describe_query(query_id).result.query.transient_issue)
244244
assert "Row dispatcher will use the predicate: WHERE (`time` > 101" in issues, "Incorrect Issues: " + issues
245245

246+
@yq_v1
247+
def test_filter_use_unsupported_predicate(self, kikimr, client):
248+
client.create_yds_connection(
249+
YDS_CONNECTION, os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"), shared_reading=True
250+
)
251+
self.init_topics("test_filter_use_unsupported_predicate")
252+
253+
sql = Rf'''
254+
INSERT INTO {YDS_CONNECTION}.`{self.output_topic}`
255+
SELECT Cast(time as String) FROM {YDS_CONNECTION}.`{self.input_topic}`
256+
WITH (format=json_each_row, SCHEMA (time UInt64 NOT NULL, data String NOT NULL, event String NOT NULL))
257+
WHERE event LIKE 'event2%';'''
258+
259+
query_id = start_yds_query(kikimr, client, sql)
260+
wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 1)
261+
262+
data = [
263+
'{"time": 102, "data": "hello2", "event": "event2"}',
264+
]
265+
266+
self.write_stream(data)
267+
assert self.read_stream(1, topic_path=self.output_topic) == ['102']
268+
wait_actor_count(kikimr, "DQ_PQ_READ_ACTOR", 1)
269+
stop_yds_query(client, query_id)
270+
246271
@yq_v1
247272
def test_filter_with_mr(self, kikimr, client):
248273
client.create_yds_connection(

0 commit comments

Comments
 (0)