Skip to content

Commit 723085a

Browse files
authored
Merge 943f055 into 38747d7
2 parents 38747d7 + 943f055 commit 723085a

File tree

2 files changed

+30
-2
lines changed

2 files changed

+30
-2
lines changed

ydb/core/fq/libs/row_dispatcher/json_filter.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include <ydb/library/yql/minikql/mkql_alloc.h>
55
#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
66
#include <ydb/library/yql/minikql/mkql_terminator.h>
7+
#include <ydb/library/yql/minikql/mkql_string_util.h>
78

89
#include <ydb/core/fq/libs/row_dispatcher/json_filter.h>
910
#include <ydb/core/fq/libs/actors/logging/log.h>
@@ -111,8 +112,7 @@ class TFilterInputConsumer : public NYql::NPureCalc::IConsumer<std::pair<ui64, c
111112

112113
size_t fieldId = 0;
113114
for (const auto& column : values.second) {
114-
NYql::NUdf::TStringValue str(column[rowId]);
115-
items[FieldsPositions[fieldId++]] = NYql::NUdf::TUnboxedValuePod(std::move(str));
115+
items[FieldsPositions[fieldId++]] = NKikimr::NMiniKQL::MakeString(column[rowId]);
116116
}
117117

118118
Worker->Push(std::move(result));

ydb/tests/fq/yds/test_row_dispatcher.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,34 @@ def test_filter(self, kikimr, client):
285285
issues = str(client.describe_query(query_id).result.query.transient_issue)
286286
assert "Row dispatcher will use the predicate: WHERE (`time` > 101" in issues, "Incorrect Issues: " + issues
287287

288+
@yq_v1
289+
def test_filter_missing_fields(self, kikimr, client):
290+
client.create_yds_connection(
291+
YDS_CONNECTION, os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"), shared_reading=True
292+
)
293+
self.init_topics("test_filter")
294+
295+
sql = Rf'''
296+
INSERT INTO {YDS_CONNECTION}.`{self.output_topic}`
297+
SELECT Cast(time as String) FROM {YDS_CONNECTION}.`{self.input_topic}`
298+
WITH (format=json_each_row, SCHEMA (time UInt64 NOT NULL, data String, event String NOT NULL))
299+
WHERE data = "";'''
300+
301+
query_id = start_yds_query(kikimr, client, sql)
302+
wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 1)
303+
304+
data = [
305+
'{"time": 101, "event": "event1"}',
306+
'{"time": 102, "data": null, "event": "event2"}',
307+
]
308+
309+
self.write_stream(data)
310+
expected = ['101', '102']
311+
assert self.read_stream(len(expected), topic_path=self.output_topic) == expected
312+
313+
wait_actor_count(kikimr, "DQ_PQ_READ_ACTOR", 1)
314+
stop_yds_query(client, query_id)
315+
288316
@yq_v1
289317
def test_filter_use_unsupported_predicate(self, kikimr, client):
290318
client.create_yds_connection(

0 commit comments

Comments
 (0)