Skip to content

Commit 20be255

Browse files
committed
Fix schema not respected
1 parent f0331ef commit 20be255

File tree

2 files changed

+51
-15
lines changed

2 files changed

+51
-15
lines changed

src/table_view.cc

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include <pulsar/TableViewConfiguration.h>
2323
#include <pybind11/stl.h>
2424
#include <pybind11/functional.h>
25+
#include <functional>
2526
#include <utility>
2627
#include "utils.h"
2728

@@ -51,8 +52,22 @@ void export_table_view(py::module_& m) {
5152
}
5253
})
5354
.def("size", &TableView::size, py::call_guard<py::gil_scoped_release>())
54-
.def("for_each", &TableView::forEach, py::call_guard<py::gil_scoped_release>())
55-
.def("for_each_and_listen", &TableView::forEachAndListen, py::call_guard<py::gil_scoped_release>())
55+
.def("for_each",
56+
[](TableView& view, std::function<void(std::string, py::bytes)> callback) {
57+
py::gil_scoped_release release;
58+
view.forEach([callback](const std::string& key, const std::string& value) {
59+
py::gil_scoped_acquire acquire;
60+
callback(key, py::bytes(value));
61+
});
62+
})
63+
.def("for_each_and_listen",
64+
[](TableView& view, std::function<void(std::string, py::bytes)> callback) {
65+
py::gil_scoped_release release;
66+
view.forEachAndListen([callback](const std::string& key, const std::string& value) {
67+
py::gil_scoped_acquire acquire;
68+
callback(key, py::bytes(value));
69+
});
70+
})
5671
.def("close", [](TableView& view) {
5772
waitForAsyncResult([&view](ResultCallback callback) { view.closeAsync(callback); });
5873
});

tests/table_view_test.py

Lines changed: 34 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import time
2424

2525
from pulsar import Client
26+
from pulsar.schema.schema import StringSchema
2627

2728
class TableViewTest(TestCase):
2829

@@ -38,14 +39,14 @@ def test_get(self):
3839
self.assertEqual(len(table_view), 0)
3940

4041
producer = self._client.create_producer(topic)
41-
producer.send('value-0'.encode(), partition_key='key-0')
42+
producer.send(b'value-0', partition_key='key-0')
4243
producer.send(b'\xba\xd0\xba\xd0', partition_key='key-1') # an invalid UTF-8 bytes
4344

4445
self._wait_for_assertion(lambda: self.assertEqual(len(table_view), 2))
4546
self.assertEqual(table_view.get('key-0'), b'value-0')
4647
self.assertEqual(table_view.get('key-1'), b'\xba\xd0\xba\xd0')
4748

48-
producer.send('value-1'.encode(), partition_key='key-0')
49+
producer.send(b'value-1', partition_key='key-0')
4950
self._wait_for_assertion(lambda: self.assertEqual(table_view.get('key-0'), b'value-1'))
5051

5152
producer.close()
@@ -55,15 +56,15 @@ def test_for_each(self):
5556
topic = f'table_view_test_for_each-{time.time()}'
5657
table_view = self._client.create_table_view(topic)
5758
producer = self._client.create_producer(topic)
58-
producer.send('value-0'.encode(), partition_key='key-0')
59-
producer.send('value-1'.encode(), partition_key='key-1')
59+
producer.send(b'value-0', partition_key='key-0')
60+
producer.send(b'value-1', partition_key='key-1')
6061
self._wait_for_assertion(lambda: self.assertEqual(len(table_view), 2))
6162

6263
d = dict()
6364
table_view.for_each(lambda key, value: d.__setitem__(key, value))
6465
self.assertEqual(d, {
65-
'key-0': 'value-0',
66-
'key-1': 'value-1'
66+
'key-0': b'value-0',
67+
'key-1': b'value-1'
6768
})
6869

6970
def listener(key: str, value: str):
@@ -75,20 +76,40 @@ def listener(key: str, value: str):
7576
d.clear()
7677
table_view.for_each_and_listen(listener)
7778
self.assertEqual(d, {
78-
'key-0': 'value-0',
79-
'key-1': 'value-1'
79+
'key-0': b'value-0',
80+
'key-1': b'value-1'
8081
})
8182

82-
producer.send('value-0-new'.encode(), partition_key='key-0')
83-
producer.send(''.encode(), partition_key='key-1')
84-
producer.send('value-2'.encode(), partition_key='key-2')
83+
producer.send(b'value-0-new', partition_key='key-0')
84+
producer.send(b'', partition_key='key-1')
85+
producer.send(b'value-2', partition_key='key-2')
8586
def assert_latest_values():
8687
self.assertEqual(d, {
87-
'key-0': 'value-0-new',
88-
'key-2': 'value-2'
88+
'key-0': b'value-0-new',
89+
'key-2': b'value-2'
8990
})
9091
self._wait_for_assertion(assert_latest_values)
9192

93+
def test_schema(self):
94+
topic = f'table_view_test_schema-{time.time()}'
95+
table_view = self._client.create_table_view(topic, schema=StringSchema())
96+
producer = self._client.create_producer(topic, schema=StringSchema())
97+
producer.send('value', partition_key='key')
98+
99+
self._wait_for_assertion(lambda: self.assertEqual(table_view.get('key'), 'value'))
100+
self.assertEqual(table_view.get('missed-key'), None)
101+
102+
entries = dict()
103+
table_view.for_each(lambda key, value: entries.__setitem__(key, value))
104+
self.assertEqual(entries, {'key': 'value'})
105+
106+
entries.clear()
107+
table_view.for_each_and_listen(lambda key, value: entries.__setitem__(key, value))
108+
self.assertEqual(entries, {'key': 'value'})
109+
110+
producer.send('new-value', partition_key='key')
111+
self._wait_for_assertion(lambda: self.assertEqual(table_view.get('key'), 'new-value'))
112+
92113
def _wait_for_assertion(self, assertion: Callable, timeout=5) -> None:
93114
start_time = time.time()
94115
while time.time() - start_time < timeout:

0 commit comments

Comments
 (0)