Skip to content

Commit 359cf0a

Browse files
committed
support json type
1 parent 2e6331e commit 359cf0a

File tree

12 files changed

+297
-124
lines changed

12 files changed

+297
-124
lines changed

proton_driver/block.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from .reader import read_varint, read_binary_uint8, read_binary_int32
22
from .varint import write_varint
33
from .writer import write_binary_uint8, write_binary_int32
4-
from .columns import nestedcolumn
4+
from .columns.util import get_inner_spec, get_inner_columns_with_types
55

66

77
class BlockInfo(object):
@@ -172,7 +172,8 @@ def _pure_mutate_dicts_to_rows(
172172
for name, type_ in columns_with_types:
173173
cwt = None
174174
if type_.startswith('nested'):
175-
cwt = nestedcolumn.get_columns_with_types(type_)
175+
inner_spec = get_inner_spec('nested', type_)
176+
cwt = get_inner_columns_with_types(inner_spec)
176177
columns_with_cwt.append((name, cwt))
177178

178179
for i, row in enumerate(data):

proton_driver/client.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,11 @@ class Client(object):
4949
* ``quota_key`` -- A string to differentiate quotas when the user has
5050
keyed quotas configured on server.
5151
New in version *0.2.3*.
52+
* ``namedtuple_as_json`` -- Controls named tuple and nested types
53+
deserialization. To interpret these columns as
54+
Python tuples, set ``namedtuple_as_json``
55+
to ``False``. Default: False.
56+
New in version *0.2.12*.
5257
"""
5358

5459
available_client_settings = (
@@ -58,7 +63,8 @@ class Client(object):
5863
'use_numpy',
5964
'opentelemetry_traceparent',
6065
'opentelemetry_tracestate',
61-
'quota_key'
66+
'quota_key',
67+
'namedtuple_as_json'
6268
)
6369

6470
def __init__(self, *args, **kwargs):
@@ -85,6 +91,9 @@ def __init__(self, *args, **kwargs):
8591
),
8692
'quota_key': self.settings.pop(
8793
'quota_key', ''
94+
),
95+
'namedtuple_as_json': self.settings.pop(
96+
'namedtuple_as_json', False
8897
)
8998
}
9099

proton_driver/columns/arraycolumn.py

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,25 +28,31 @@ class ArrayColumn(Column):
2828
py_types = (list, tuple)
2929

3030
def __init__(self, nested_column, **kwargs):
31-
self.size_column = UInt64Column()
31+
self.init_kwargs = kwargs
32+
self.size_column = UInt64Column(**kwargs)
3233
self.nested_column = nested_column
3334
self._write_depth_0_size = True
3435
super(ArrayColumn, self).__init__(**kwargs)
36+
self.null_value = []
3537

3638
def write_data(self, data, buf):
3739
# Column of Array(T) is stored in "compact" format and passed to server
3840
# wrapped into another Array without size of wrapper array.
39-
self.nested_column = ArrayColumn(self.nested_column)
41+
self.nested_column = ArrayColumn(
42+
self.nested_column, **self.init_kwargs
43+
)
4044
self.nested_column.nullable = self.nullable
4145
self.nullable = False
4246
self._write_depth_0_size = False
4347
self._write(data, buf)
4448

45-
def read_data(self, rows, buf):
46-
self.nested_column = ArrayColumn(self.nested_column)
49+
def read_data(self, n_rows, buf):
50+
self.nested_column = ArrayColumn(
51+
self.nested_column, **self.init_kwargs
52+
)
4753
self.nested_column.nullable = self.nullable
4854
self.nullable = False
49-
return self._read(rows, buf)[0]
55+
return self._read(n_rows, buf)[0]
5056

5157
def _write_sizes(self, value, buf):
5258
nulls_map = []
@@ -99,14 +105,19 @@ def _write_nulls_data(self, value, buf):
99105
self.nested_column._write_nulls_map(value, buf)
100106

101107
def _write(self, value, buf):
108+
value = self.prepare_items(value)
102109
self._write_sizes(value, buf)
103110
self._write_nulls_data(value, buf)
104111
self._write_data(value, buf)
105112

106113
def read_state_prefix(self, buf):
107-
return self.nested_column.read_state_prefix(buf)
114+
super(ArrayColumn, self).read_state_prefix(buf)
115+
116+
self.nested_column.read_state_prefix(buf)
108117

109118
def write_state_prefix(self, buf):
119+
super(ArrayColumn, self).write_state_prefix(buf)
120+
110121
self.nested_column.write_state_prefix(buf)
111122

112123
def _read(self, size, buf):
@@ -145,6 +156,6 @@ def _read(self, size, buf):
145156
return tuple(data)
146157

147158

148-
def create_array_column(spec, column_by_spec_getter):
159+
def create_array_column(spec, column_by_spec_getter, column_options):
149160
inner = spec[6:-1]
150-
return ArrayColumn(column_by_spec_getter(inner))
161+
return ArrayColumn(column_by_spec_getter(inner), **column_options)

proton_driver/columns/jsoncolumn.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
from .base import Column
2+
from .stringcolumn import String
3+
from ..reader import read_binary_uint8, read_binary_str
4+
from ..util.compat import json
5+
from ..writer import write_binary_uint8
6+
7+
8+
class JsonColumn(Column):
9+
py_types = (dict, )
10+
11+
# No NULL value actually
12+
null_value = {}
13+
14+
def __init__(self, column_by_spec_getter, **kwargs):
15+
self.column_by_spec_getter = column_by_spec_getter
16+
self.string_column = String(**kwargs)
17+
super(JsonColumn, self).__init__(**kwargs)
18+
19+
def write_state_prefix(self, buf):
20+
# Read in binary format.
21+
# Write in text format.
22+
write_binary_uint8(1, buf)
23+
24+
def read_items(self, n_items, buf):
25+
read_binary_uint8(buf)
26+
spec = read_binary_str(buf)
27+
col = self.column_by_spec_getter(
28+
spec, dict(namedtuple_as_json=True)
29+
)
30+
col.read_state_prefix(buf)
31+
return col.read_data(n_items, buf)
32+
33+
def write_items(self, items, buf):
34+
items = [x if isinstance(x, str) else json.dumps(x) for x in items]
35+
self.string_column.write_items(items, buf)
36+
37+
38+
def create_json_column(spec, column_by_spec_getter, column_options):
39+
return JsonColumn(column_by_spec_getter, **column_options)

proton_driver/columns/nestedcolumn.py

Lines changed: 4 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -1,73 +1,10 @@
11

22
from .arraycolumn import create_array_column
3+
from .util import get_inner_spec
34

45

5-
def create_nested_column(spec, column_by_spec_getter):
6+
def create_nested_column(spec, column_by_spec_getter, column_options):
67
return create_array_column(
7-
'array(tuple({}))'.format(','.join(get_nested_columns(spec))),
8-
column_by_spec_getter=column_by_spec_getter
8+
'array(tuple({}))'.format(get_inner_spec('nested', spec)),
9+
column_by_spec_getter, column_options
910
)
10-
11-
12-
def get_nested_columns(spec):
13-
brackets = 0
14-
column_begin = 0
15-
16-
inner_spec = get_inner_spec(spec)
17-
nested_columns = []
18-
for i, x in enumerate(inner_spec + ','):
19-
if x == ',':
20-
if brackets == 0:
21-
nested_columns.append(inner_spec[column_begin:i])
22-
column_begin = i + 1
23-
elif x == '(':
24-
brackets += 1
25-
elif x == ')':
26-
brackets -= 1
27-
elif x == ' ':
28-
if brackets == 0:
29-
column_begin = i + 1
30-
return nested_columns
31-
32-
33-
def get_columns_with_types(spec):
34-
brackets = 0
35-
prev_comma = 0
36-
prev_space = 0
37-
38-
inner_spec = get_inner_spec(spec)
39-
columns_with_types = []
40-
41-
for i, x in enumerate(inner_spec + ','):
42-
if x == ',':
43-
if brackets == 0:
44-
columns_with_types.append((
45-
inner_spec[prev_comma:prev_space].strip(),
46-
inner_spec[prev_space:i]
47-
))
48-
prev_comma = i + 1
49-
elif x == '(':
50-
brackets += 1
51-
elif x == ')':
52-
brackets -= 1
53-
elif x == ' ':
54-
if brackets == 0:
55-
prev_space = i + 1
56-
return columns_with_types
57-
58-
59-
def get_inner_spec(spec):
60-
brackets = 0
61-
offset = len('nested')
62-
i = offset
63-
for i, ch in enumerate(spec[offset:], offset):
64-
if ch == '(':
65-
brackets += 1
66-
67-
elif ch == ')':
68-
brackets -= 1
69-
70-
if brackets == 0:
71-
break
72-
73-
return spec[offset + 1:i]

proton_driver/columns/service.py

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
IntervalSecondColumn
3333
)
3434
from .ipcolumn import IPv4Column, IPv6Column
35+
from .jsoncolumn import create_json_column
3536

3637

3738
column_by_type = {c.ch_type: c for c in [
@@ -64,7 +65,11 @@ def get_column_by_spec(spec, column_options, use_numpy=None):
6465
logger.warning('NumPy support is not implemented for %s. '
6566
'Using generic column', spec)
6667

67-
def create_column_with_options(x):
68+
def create_column_with_options(x, settings=None):
69+
if settings:
70+
client_settings = column_options['context'].client_settings
71+
client_settings.update(settings)
72+
column_options['context'].client_settings = client_settings
6873
return get_column_by_spec(x, column_options, use_numpy=use_numpy)
6974

7075
if spec == 'string' or spec.startswith('fixed_string'):
@@ -80,13 +85,23 @@ def create_column_with_options(x):
8085
return create_decimal_column(spec, column_options)
8186

8287
elif spec.startswith('array'):
83-
return create_array_column(spec, create_column_with_options)
88+
return create_array_column(
89+
spec, create_column_with_options, column_options
90+
)
8491

8592
elif spec.startswith('tuple'):
86-
return create_tuple_column(spec, create_column_with_options)
93+
return create_tuple_column(
94+
spec, create_column_with_options, column_options
95+
)
96+
elif spec.startswith('json'):
97+
return create_json_column(
98+
spec, create_column_with_options, column_options
99+
)
87100

88101
elif spec.startswith('nested'):
89-
return create_nested_column(spec, create_column_with_options)
102+
return create_nested_column(
103+
spec, create_column_with_options, column_options
104+
)
90105

91106
elif spec.startswith('nullable'):
92107
return create_nullable_column(spec, create_column_with_options)

proton_driver/columns/tuplecolumn.py

Lines changed: 30 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,24 @@
11

22
from .base import Column
3+
from .util import get_inner_spec, get_inner_columns_with_types
34

45

56
class TupleColumn(Column):
67
py_types = (list, tuple)
78

8-
def __init__(self, nested_columns, **kwargs):
9+
def __init__(self, names, nested_columns, **kwargs):
10+
self.names = names
911
self.nested_columns = nested_columns
12+
client_settings = kwargs['context'].client_settings
13+
self.namedtuple_as_json = client_settings.get(
14+
'namedtuple_as_json', False
15+
)
16+
1017
super(TupleColumn, self).__init__(**kwargs)
18+
self.null_value = tuple(x.null_value for x in nested_columns)
1119

1220
def write_data(self, items, buf):
21+
items = self.prepare_items(items)
1322
items = list(zip(*items))
1423

1524
for i, x in enumerate(self.nested_columns):
@@ -20,46 +29,33 @@ def write_items(self, items, buf):
2029

2130
def read_data(self, n_items, buf):
2231
rv = [x.read_data(n_items, buf) for x in self.nested_columns]
23-
return list(zip(*rv))
32+
rv = list(zip(*rv))
33+
34+
if self.names[0] and self.namedtuple_as_json:
35+
return [dict(zip(self.names, x)) for x in rv]
36+
else:
37+
return rv
2438

2539
def read_items(self, n_items, buf):
2640
return self.read_data(n_items, buf)
2741

42+
def read_state_prefix(self, buf):
43+
super(TupleColumn, self).read_state_prefix(buf)
2844

29-
def create_tuple_column(spec, column_by_spec_getter):
30-
brackets = 0
31-
column_begin = 0
32-
33-
inner_spec = get_inner_spec(spec)
34-
nested_columns = []
35-
for i, x in enumerate(inner_spec + ','):
36-
if x == ',':
37-
if brackets == 0:
38-
nested_columns.append(inner_spec[column_begin:i])
39-
column_begin = i + 1
40-
elif x == '(':
41-
brackets += 1
42-
elif x == ')':
43-
brackets -= 1
44-
elif x == ' ':
45-
if brackets == 0:
46-
column_begin = i + 1
47-
48-
return TupleColumn([column_by_spec_getter(x) for x in nested_columns])
45+
for x in self.nested_columns:
46+
x.read_state_prefix(buf)
4947

48+
def write_state_prefix(self, buf):
49+
super(TupleColumn, self).write_state_prefix(buf)
5050

51-
def get_inner_spec(spec):
52-
brackets = 0
53-
offset = len('tuple')
54-
i = offset
55-
for i, ch in enumerate(spec[offset:], offset):
56-
if ch == '(':
57-
brackets += 1
51+
for x in self.nested_columns:
52+
x.write_state_prefix(buf)
5853

59-
elif ch == ')':
60-
brackets -= 1
6154

62-
if brackets == 0:
63-
break
55+
def create_tuple_column(spec, column_by_spec_getter, column_options):
56+
inner_spec = get_inner_spec('tuple', spec)
57+
columns_with_types = get_inner_columns_with_types(inner_spec)
58+
names, types = zip(*columns_with_types)
6459

65-
return spec[offset + 1:i]
60+
return TupleColumn(names, [column_by_spec_getter(x) for x in types],
61+
**column_options)

0 commit comments

Comments
 (0)