From de000c05ae5d1a123fae7f4d2b3a304e6e15c89d Mon Sep 17 00:00:00 2001 From: Yibo-Chen13 Date: Wed, 27 Nov 2024 11:42:16 +0800 Subject: [PATCH] support json type --- proton_driver/block.py | 4 +- proton_driver/client.py | 11 +++- proton_driver/columns/arraycolumn.py | 27 ++++++--- proton_driver/columns/jsoncolumn.py | 39 +++++++++++++ proton_driver/columns/nestedcolumn.py | 71 ++-------------------- proton_driver/columns/service.py | 23 ++++++-- proton_driver/columns/tuplecolumn.py | 63 +++++++++----------- proton_driver/columns/util.py | 63 ++++++++++++++++++++ proton_driver/util/compat.py | 2 + tests/columns/test_json.py | 84 +++++++++++++++++++++++++++ tests/columns/test_nested.py | 20 ++++--- tests/docker-compose.yml | 2 + 12 files changed, 285 insertions(+), 124 deletions(-) create mode 100644 proton_driver/columns/jsoncolumn.py create mode 100644 proton_driver/columns/util.py create mode 100644 tests/columns/test_json.py diff --git a/proton_driver/block.py b/proton_driver/block.py index 2a1295c..8e3a995 100644 --- a/proton_driver/block.py +++ b/proton_driver/block.py @@ -1,7 +1,7 @@ from .reader import read_varint, read_binary_uint8, read_binary_int32 from .varint import write_varint from .writer import write_binary_uint8, write_binary_int32 -from .columns import nestedcolumn +from .columns.util import get_inner_columns_with_types class BlockInfo(object): @@ -172,7 +172,7 @@ def _pure_mutate_dicts_to_rows( for name, type_ in columns_with_types: cwt = None if type_.startswith('nested'): - cwt = nestedcolumn.get_columns_with_types(type_) + cwt = get_inner_columns_with_types('nested', type_) columns_with_cwt.append((name, cwt)) for i, row in enumerate(data): diff --git a/proton_driver/client.py b/proton_driver/client.py index c8c6f58..b79fbe6 100644 --- a/proton_driver/client.py +++ b/proton_driver/client.py @@ -49,6 +49,11 @@ class Client(object): * ``quota_key`` -- A string to differentiate quotas when the user have keyed quotas configured on server. New in version *0.2.3*. + * ``namedtuple_as_json`` -- Controls named tuple and nested types + deserialization. To interpret these column as + Python tuple set ``namedtuple_as_json`` + to ``False``. Default: False. + New in version *0.2.12*. """ available_client_settings = ( @@ -58,7 +63,8 @@ class Client(object): 'use_numpy', 'opentelemetry_traceparent', 'opentelemetry_tracestate', - 'quota_key' + 'quota_key', + 'namedtuple_as_json' ) def __init__(self, *args, **kwargs): @@ -85,6 +91,9 @@ def __init__(self, *args, **kwargs): ), 'quota_key': self.settings.pop( 'quota_key', '' + ), + 'namedtuple_as_json': self.settings.pop( + 'namedtuple_as_json', False ) } diff --git a/proton_driver/columns/arraycolumn.py b/proton_driver/columns/arraycolumn.py index abd866c..0ce336e 100644 --- a/proton_driver/columns/arraycolumn.py +++ b/proton_driver/columns/arraycolumn.py @@ -28,25 +28,31 @@ class ArrayColumn(Column): py_types = (list, tuple) def __init__(self, nested_column, **kwargs): - self.size_column = UInt64Column() + self.init_kwargs = kwargs + self.size_column = UInt64Column(**kwargs) self.nested_column = nested_column self._write_depth_0_size = True super(ArrayColumn, self).__init__(**kwargs) + self.null_value = [] def write_data(self, data, buf): # Column of Array(T) is stored in "compact" format and passed to server # wrapped into another Array without size of wrapper array. - self.nested_column = ArrayColumn(self.nested_column) + self.nested_column = ArrayColumn( + self.nested_column, **self.init_kwargs + ) self.nested_column.nullable = self.nullable self.nullable = False self._write_depth_0_size = False self._write(data, buf) - def read_data(self, rows, buf): - self.nested_column = ArrayColumn(self.nested_column) + def read_data(self, n_rows, buf): + self.nested_column = ArrayColumn( + self.nested_column, **self.init_kwargs + ) self.nested_column.nullable = self.nullable self.nullable = False - return self._read(rows, buf)[0] + return self._read(n_rows, buf)[0] def _write_sizes(self, value, buf): nulls_map = [] @@ -99,14 +105,19 @@ def _write_nulls_data(self, value, buf): self.nested_column._write_nulls_map(value, buf) def _write(self, value, buf): + value = self.prepare_items(value) self._write_sizes(value, buf) self._write_nulls_data(value, buf) self._write_data(value, buf) def read_state_prefix(self, buf): - return self.nested_column.read_state_prefix(buf) + super(ArrayColumn, self).read_state_prefix(buf) + + self.nested_column.read_state_prefix(buf) def write_state_prefix(self, buf): + super(ArrayColumn, self).write_state_prefix(buf) + self.nested_column.write_state_prefix(buf) def _read(self, size, buf): @@ -145,6 +156,6 @@ def _read(self, size, buf): return tuple(data) -def create_array_column(spec, column_by_spec_getter): +def create_array_column(spec, column_by_spec_getter, column_options): inner = spec[6:-1] - return ArrayColumn(column_by_spec_getter(inner)) + return ArrayColumn(column_by_spec_getter(inner), **column_options) diff --git a/proton_driver/columns/jsoncolumn.py b/proton_driver/columns/jsoncolumn.py new file mode 100644 index 0000000..f6bf810 --- /dev/null +++ b/proton_driver/columns/jsoncolumn.py @@ -0,0 +1,39 @@ +from .base import Column +from .stringcolumn import String +from ..reader import read_binary_uint8, read_binary_str +from ..util.compat import json +from ..writer import write_binary_uint8 + + +class JsonColumn(Column): + py_types = (dict, ) + + # No NULL value actually + null_value = {} + + def __init__(self, column_by_spec_getter, **kwargs): + self.column_by_spec_getter = column_by_spec_getter + self.string_column = String(**kwargs) + super(JsonColumn, self).__init__(**kwargs) + + def write_state_prefix(self, buf): + # Read in binary format. + # Write in text format. + write_binary_uint8(1, buf) + + def read_items(self, n_items, buf): + read_binary_uint8(buf) + spec = read_binary_str(buf) + col = self.column_by_spec_getter( + spec, dict(namedtuple_as_json=True) + ) + col.read_state_prefix(buf) + return col.read_data(n_items, buf) + + def write_items(self, items, buf): + items = [x if isinstance(x, str) else json.dumps(x) for x in items] + self.string_column.write_items(items, buf) + + +def create_json_column(spec, column_by_spec_getter, column_options): + return JsonColumn(column_by_spec_getter, **column_options) diff --git a/proton_driver/columns/nestedcolumn.py b/proton_driver/columns/nestedcolumn.py index 7d9b90e..827e57a 100644 --- a/proton_driver/columns/nestedcolumn.py +++ b/proton_driver/columns/nestedcolumn.py @@ -1,73 +1,10 @@ from .arraycolumn import create_array_column +from .util import get_inner_spec -def create_nested_column(spec, column_by_spec_getter): +def create_nested_column(spec, column_by_spec_getter, column_options): return create_array_column( - 'array(tuple({}))'.format(','.join(get_nested_columns(spec))), - column_by_spec_getter=column_by_spec_getter + 'array(tuple({}))'.format(get_inner_spec('nested', spec)), + column_by_spec_getter, column_options ) - - -def get_nested_columns(spec): - brackets = 0 - column_begin = 0 - - inner_spec = get_inner_spec(spec) - nested_columns = [] - for i, x in enumerate(inner_spec + ','): - if x == ',': - if brackets == 0: - nested_columns.append(inner_spec[column_begin:i]) - column_begin = i + 1 - elif x == '(': - brackets += 1 - elif x == ')': - brackets -= 1 - elif x == ' ': - if brackets == 0: - column_begin = i + 1 - return nested_columns - - -def get_columns_with_types(spec): - brackets = 0 - prev_comma = 0 - prev_space = 0 - - inner_spec = get_inner_spec(spec) - columns_with_types = [] - - for i, x in enumerate(inner_spec + ','): - if x == ',': - if brackets == 0: - columns_with_types.append(( - inner_spec[prev_comma:prev_space].strip(), - inner_spec[prev_space:i] - )) - prev_comma = i + 1 - elif x == '(': - brackets += 1 - elif x == ')': - brackets -= 1 - elif x == ' ': - if brackets == 0: - prev_space = i + 1 - return columns_with_types - - -def get_inner_spec(spec): - brackets = 0 - offset = len('nested') - i = offset - for i, ch in enumerate(spec[offset:], offset): - if ch == '(': - brackets += 1 - - elif ch == ')': - brackets -= 1 - - if brackets == 0: - break - - return spec[offset + 1:i] diff --git a/proton_driver/columns/service.py b/proton_driver/columns/service.py index 9b5088a..985d0b5 100644 --- a/proton_driver/columns/service.py +++ b/proton_driver/columns/service.py @@ -32,6 +32,7 @@ IntervalSecondColumn ) from .ipcolumn import IPv4Column, IPv6Column +from .jsoncolumn import create_json_column column_by_type = {c.ch_type: c for c in [ @@ -64,7 +65,11 @@ def get_column_by_spec(spec, column_options, use_numpy=None): logger.warning('NumPy support is not implemented for %s. ' 'Using generic column', spec) - def create_column_with_options(x): + def create_column_with_options(x, settings=None): + if settings: + client_settings = column_options['context'].client_settings + client_settings.update(settings) + column_options['context'].client_settings = client_settings return get_column_by_spec(x, column_options, use_numpy=use_numpy) if spec == 'string' or spec.startswith('fixed_string'): @@ -80,13 +85,23 @@ def create_column_with_options(x): return create_decimal_column(spec, column_options) elif spec.startswith('array'): - return create_array_column(spec, create_column_with_options) + return create_array_column( + spec, create_column_with_options, column_options + ) elif spec.startswith('tuple'): - return create_tuple_column(spec, create_column_with_options) + return create_tuple_column( + spec, create_column_with_options, column_options + ) + elif spec.startswith('json'): + return create_json_column( + spec, create_column_with_options, column_options + ) elif spec.startswith('nested'): - return create_nested_column(spec, create_column_with_options) + return create_nested_column( + spec, create_column_with_options, column_options + ) elif spec.startswith('nullable'): return create_nullable_column(spec, create_column_with_options) diff --git a/proton_driver/columns/tuplecolumn.py b/proton_driver/columns/tuplecolumn.py index 87ffc2e..2b5c4b8 100644 --- a/proton_driver/columns/tuplecolumn.py +++ b/proton_driver/columns/tuplecolumn.py @@ -1,15 +1,24 @@ from .base import Column +from .util import get_inner_columns_with_types class TupleColumn(Column): py_types = (list, tuple) - def __init__(self, nested_columns, **kwargs): + def __init__(self, names, nested_columns, **kwargs): + self.names = names self.nested_columns = nested_columns + client_settings = kwargs['context'].client_settings + self.namedtuple_as_json = client_settings.get( + 'namedtuple_as_json', False + ) + super(TupleColumn, self).__init__(**kwargs) + self.null_value = tuple(x.null_value for x in nested_columns) def write_data(self, items, buf): + items = self.prepare_items(items) items = list(zip(*items)) for i, x in enumerate(self.nested_columns): @@ -20,46 +29,32 @@ def write_items(self, items, buf): def read_data(self, n_items, buf): rv = [x.read_data(n_items, buf) for x in self.nested_columns] - return list(zip(*rv)) + rv = list(zip(*rv)) + + if self.names[0] and self.namedtuple_as_json: + return [dict(zip(self.names, x)) for x in rv] + else: + return rv def read_items(self, n_items, buf): return self.read_data(n_items, buf) + def read_state_prefix(self, buf): + super(TupleColumn, self).read_state_prefix(buf) -def create_tuple_column(spec, column_by_spec_getter): - brackets = 0 - column_begin = 0 - - inner_spec = get_inner_spec(spec) - nested_columns = [] - for i, x in enumerate(inner_spec + ','): - if x == ',': - if brackets == 0: - nested_columns.append(inner_spec[column_begin:i]) - column_begin = i + 1 - elif x == '(': - brackets += 1 - elif x == ')': - brackets -= 1 - elif x == ' ': - if brackets == 0: - column_begin = i + 1 - - return TupleColumn([column_by_spec_getter(x) for x in nested_columns]) + for x in self.nested_columns: + x.read_state_prefix(buf) + def write_state_prefix(self, buf): + super(TupleColumn, self).write_state_prefix(buf) -def get_inner_spec(spec): - brackets = 0 - offset = len('tuple') - i = offset - for i, ch in enumerate(spec[offset:], offset): - if ch == '(': - brackets += 1 + for x in self.nested_columns: + x.write_state_prefix(buf) - elif ch == ')': - brackets -= 1 - if brackets == 0: - break +def create_tuple_column(spec, column_by_spec_getter, column_options): + columns_with_types = get_inner_columns_with_types('tuple', spec) + names, types = zip(*columns_with_types) - return spec[offset + 1:i] + return TupleColumn(names, [column_by_spec_getter(x) for x in types], + **column_options) diff --git a/proton_driver/columns/util.py b/proton_driver/columns/util.py new file mode 100644 index 0000000..400bdd8 --- /dev/null +++ b/proton_driver/columns/util.py @@ -0,0 +1,63 @@ + +def get_inner_spec(column_name, spec): + brackets = 0 + offset = len(column_name) + + for i, ch in enumerate(spec[offset:], offset): + if ch == '(': + brackets += 1 + + elif ch == ')': + brackets -= 1 + + if brackets == 0: + break + + return spec[offset + 1:i] + + +def get_inner_columns(column_name, spec): + inner_spec = get_inner_spec(column_name, spec) + brackets = 0 + column_begin = 0 + + columns = [] + for i, x in enumerate(inner_spec + ','): + if x == ',': + if brackets == 0: + columns.append(inner_spec[column_begin:i]) + column_begin = i + 1 + elif x == '(': + brackets += 1 + elif x == ')': + brackets -= 1 + elif x == ' ': + if brackets == 0: + column_begin = i + 1 + return columns + + +def get_inner_columns_with_types(column_name, spec): + inner_spec = get_inner_spec(column_name, spec) + inner_spec = inner_spec.strip() + brackets = 0 + prev_comma = 0 + prev_space = 0 + + columns = [] + for i, x in enumerate(inner_spec.strip() + ','): + if x == ',': + if brackets == 0: + columns.append(( + inner_spec[prev_comma:prev_space].strip(), + inner_spec[prev_space:i] + )) + prev_comma = i + 1 + elif x == '(': + brackets += 1 + elif x == ')': + brackets -= 1 + elif x == ' ': + if brackets == 0: + prev_space = i + 1 + return columns diff --git a/proton_driver/util/compat.py b/proton_driver/util/compat.py index bdcf3d7..6b3e342 100644 --- a/proton_driver/util/compat.py +++ b/proton_driver/util/compat.py @@ -5,6 +5,8 @@ except ImportError: import dummy_threading as threading # noqa: F401 +import json # noqa: F401 + try: # since tzlocal 4.0+ # this will avoid warning for get_localzone().key diff --git a/tests/columns/test_json.py b/tests/columns/test_json.py new file mode 100644 index 0000000..2eea756 --- /dev/null +++ b/tests/columns/test_json.py @@ -0,0 +1,84 @@ +import json +from time import sleep +from tests.testcase import BaseTestCase + + +class JSONTestCase(BaseTestCase): + def test_simple(self): + rv = self.client.execute("SELECT '{\"bb\": {\"cc\": [255, 1]}}'::json") + self.assertEqual(rv, [({'bb': {'cc': [255, 1]}},)]) + + def test_from_table(self): + self.emit_cli('CREATE STREAM test (a json)') + data = [ + ({},), + ({'key1': 1}, ), + ({'key1': 2.1, 'key2': {'nested': 'key'}}, ), + ({'key1': 3, 'key3': ['test'], 'key4': [10, 20]}, ) + ] + self.client.execute('INSERT INTO test (a) VALUES', data) + sleep(3) + query = 'SELECT a FROM table(test)' + inserted = self.client.execute(query) + self.assertEqual( + inserted, + [ + ((0.0, ('',), [], []),), + ((1.0, ('',), [], []),), + ((2.1, ('key',), [], []),), + ((3.0, ('',), ['test'], [10, 20]),) + ] + ) + inserted = self.client.execute( + query, settings=dict(namedtuple_as_json=True) + ) + data_with_all_keys = [ + ({'key1': 0, 'key2': {'nested': ''}, 'key3': [], 'key4': []},), + ({'key1': 1, 'key2': {'nested': ''}, 'key3': [], 'key4': []},), + ({'key1': 2.1, 'key2': {'nested': 'key'}, 'key3': [], + 'key4': []},), + ({'key1': 3, 'key2': {'nested': ''}, 'key3': ['test'], + 'key4': [10, 20]},) + ] + self.assertEqual(inserted, data_with_all_keys) + self.emit_cli('DROP STREAM test') + + def test_insert_json_strings(self): + self.emit_cli('CREATE STREAM test (a json)') + data = [ + (json.dumps({'i-am': 'dumped json'}),), + ] + self.client.execute('INSERT INTO test (a) VALUES', data) + sleep(3) + query = 'SELECT a FROM table(test)' + inserted = self.client.execute(query) + self.assertEqual( + inserted, + [(('dumped json',),)] + ) + inserted = self.client.execute( + query, settings=dict(namedtuple_as_json=True) + ) + data_with_all_keys = [ + ({'`i-am`': 'dumped json'},) + ] + self.assertEqual(inserted, data_with_all_keys) + self.emit_cli('DROP STREAM test') + + def test_json_as_named_tuple(self): + settings = {'namedtuple_as_json': True} + query = 'SELECT a FROM table(test)' + + self.emit_cli('CREATE STREAM test (a json)') + data = [ + ({'key': 'value'}, ), + ] + self.client.execute('INSERT INTO test (a) VALUES', data) + sleep(3) + inserted = self.client.execute(query) + self.assertEqual(inserted, [(('value',),)]) + + with self.created_client(settings=settings) as client: + inserted = client.execute(query) + self.assertEqual(inserted, data) + self.emit_cli('DROP STREAM test') diff --git a/tests/columns/test_nested.py b/tests/columns/test_nested.py index 0025652..6ce0b74 100644 --- a/tests/columns/test_nested.py +++ b/tests/columns/test_nested.py @@ -1,5 +1,9 @@ from tests.testcase import BaseTestCase -from proton_driver.columns import nestedcolumn +from proton_driver.columns.util import ( + get_inner_spec, + get_inner_columns, + get_inner_columns_with_types +) class NestedTestCase(BaseTestCase): @@ -85,24 +89,24 @@ def test_dict(self): ) def test_get_nested_columns(self): + spec = 'nested(a tuple(array(int8)),\n b nullable(string))' + columns = get_inner_columns('nested', spec) self.assertEqual( - nestedcolumn.get_nested_columns( - 'nested(a tuple(array(int8)),\n b nullable(string))', - ), + columns, ['tuple(array(int8))', 'nullable(string)'] ) def test_get_columns_with_types(self): + spec = 'nested(a tuple(array(int8)),\n b nullable(string))' + columns = get_inner_columns_with_types('nested', spec) self.assertEqual( - nestedcolumn.get_columns_with_types( - 'nested(a tuple(array(int8)),\n b nullable(string))', - ), + columns, [('a', 'tuple(array(int8))'), ('b', 'nullable(string)')] ) def test_get_inner_spec(self): inner = 'a tuple(array(int8), array(int64)), b nullable(string)' self.assertEqual( - nestedcolumn.get_inner_spec('nested({}) dummy '.format(inner)), + get_inner_spec('nested', 'nested({}) dummy '.format(inner)), inner ) diff --git a/tests/docker-compose.yml b/tests/docker-compose.yml index 7f0971f..458289b 100644 --- a/tests/docker-compose.yml +++ b/tests/docker-compose.yml @@ -10,6 +10,8 @@ services: - "127.0.0.1:8463:8463" command: > /bin/bash -c "echo sleeping; sleep 2; /entrypoint.sh" + volumes: + - /mnt/timeplusd:/var/lib/timeplusd proton-client: image: "timeplus/timeplusd:latest"