Skip to content

Commit df17542

Browse files
committed
iproto: support feature push
Adds support for receiving out-of-band messages from a server that uses box.session.push call. Data obtaining is possible for methods: `call`, `eval`, `select`, `insert`, `replace`, `update`, `upsert`, `delete`. To do this, an optional argument `on_push_ctx` is used in the form of a python list, where the received data from out-of-band messages will be added. Closes #201
1 parent a94f97c commit df17542

File tree

3 files changed

+110
-23
lines changed

3 files changed

+110
-23
lines changed

tarantool/connection.py

+108-22
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import time
88
import errno
99
import socket
10+
import copy
1011
try:
1112
import ssl
1213
is_ssl_supported = True
@@ -55,7 +56,8 @@
5556
REQUEST_TYPE_ERROR,
5657
IPROTO_GREETING_SIZE,
5758
ITERATOR_EQ,
58-
ITERATOR_ALL
59+
ITERATOR_ALL,
60+
IPROTO_CHUNK
5961
)
6062
from tarantool.error import (
6163
Error,
@@ -748,7 +750,7 @@ def _read_response(self):
748750
# Read the packet
749751
return self._recv(length)
750752

751-
def _send_request_wo_reconnect(self, request):
753+
def _send_request_wo_reconnect(self, request, pushed_data=[], on_push=None, on_push_res=[]):
752754
"""
753755
Send request without trying to reconnect.
754756
Reload schema, if required.
@@ -767,12 +769,30 @@ def _send_request_wo_reconnect(self, request):
767769

768770
assert isinstance(request, Request)
769771

772+
# Flag for detecting the last message as out-of-band.
773+
iproto_chunk_detected = False
774+
770775
response = None
771776
while True:
772777
try:
773-
self._socket.sendall(bytes(request))
778+
if not iproto_chunk_detected:
779+
# If the last received message is out-of-band,
780+
# the request will not be sent.
781+
self._socket.sendall(bytes(request))
774782
response = request.response_class(self, self._read_response())
775-
break
783+
if response._code == IPROTO_CHUNK:
784+
# Сase of receiving an out-of-band message.
785+
pushed_data.append(copy.deepcopy(response._data))
786+
if callable(on_push):
787+
# Callback function with data from out-of-band
788+
# message is being called.
789+
on_push_res.append(on_push(copy.deepcopy(response._data)))
790+
iproto_chunk_detected = True
791+
# Receiving the next message.
792+
continue
793+
else:
794+
# Сase of receiving main message.
795+
break
776796
except SchemaReloadException as e:
777797
self.update_schema(e.schema_version)
778798
continue
@@ -851,7 +871,7 @@ def check(): # Check that connection is alive
851871
self.wrap_socket_ssl()
852872
self.handshake()
853873

854-
def _send_request(self, request):
874+
def _send_request(self, request, pushed_data=[], on_push=None, on_push_res=[]):
855875
"""
856876
Send a request to the server through the socket.
857877
@@ -872,7 +892,7 @@ def _send_request(self, request):
872892

873893
self._opt_reconnect()
874894

875-
return self._send_request_wo_reconnect(request)
895+
return self._send_request_wo_reconnect(request, pushed_data, on_push, on_push_res)
876896

877897
def load_schema(self):
878898
"""
@@ -914,7 +934,7 @@ def flush_schema(self):
914934
self.schema.flush()
915935
self.load_schema()
916936

917-
def call(self, func_name, *args):
937+
def call(self, func_name, *args, **kwargs):
918938
"""
919939
Execute a CALL request: call a stored Lua function.
920940
@@ -930,19 +950,30 @@ def call(self, func_name, *args):
930950
:exc:`~tarantool.error.SchemaError`,
931951
:exc:`~tarantool.error.NetworkError`,
932952
:exc:`~tarantool.error.SslError`
953+
954+
!!!
955+
TODO: write docs
956+
!!!
933957
"""
934958

935959
assert isinstance(func_name, str)
936960

937961
# This allows to use a tuple or list as an argument
938962
if len(args) == 1 and isinstance(args[0], (list, tuple)):
939963
args = args[0]
964+
# Case for absence of optional arg for accepting out-of-band msg data
965+
if not 'pushed_data' in kwargs:
966+
kwargs['pushed_data'] = []
967+
if not 'on_push' in kwargs:
968+
kwargs['on_push'] = None
969+
if not 'on_push_res' in kwargs:
970+
kwargs['on_push_res'] = []
940971

941972
request = RequestCall(self, func_name, args, self.call_16)
942-
response = self._send_request(request)
973+
response = self._send_request(request, kwargs['pushed_data'], kwargs['on_push'], kwargs['on_push_res'])
943974
return response
944975

945-
def eval(self, expr, *args):
976+
def eval(self, expr, *args, **kwargs):
946977
"""
947978
Execute an EVAL request: evaluate a Lua expression.
948979
@@ -966,12 +997,19 @@ def eval(self, expr, *args):
966997
# This allows to use a tuple or list as an argument
967998
if len(args) == 1 and isinstance(args[0], (list, tuple)):
968999
args = args[0]
1000+
# Case for absence of optional arg for accepting out-of-band msg data
1001+
if not 'pushed_data' in kwargs:
1002+
kwargs['pushed_data'] = []
1003+
if not 'on_push' in kwargs:
1004+
kwargs['on_push'] = None
1005+
if not 'on_push_res' in kwargs:
1006+
kwargs['on_push_res'] = []
9691007

9701008
request = RequestEval(self, expr, args)
971-
response = self._send_request(request)
1009+
response = self._send_request(request, kwargs['pushed_data'], kwargs['on_push'], kwargs['on_push_res'])
9721010
return response
9731011

974-
def replace(self, space_name, values):
1012+
def replace(self, space_name, values, **kwargs):
9751013
"""
9761014
Execute a REPLACE request: `replace`_ a tuple in the space.
9771015
Doesn't throw an error if there is no tuple with the specified
@@ -994,10 +1032,18 @@ def replace(self, space_name, values):
9941032
.. _replace: https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_space/replace/
9951033
"""
9961034

1035+
# Case for absence of optional arg for accepting out-of-band msg data
1036+
if not 'pushed_data' in kwargs:
1037+
kwargs['pushed_data'] = []
1038+
if not 'on_push' in kwargs:
1039+
kwargs['on_push'] = None
1040+
if not 'on_push_res' in kwargs:
1041+
kwargs['on_push_res'] = []
1042+
9971043
if isinstance(space_name, str):
9981044
space_name = self.schema.get_space(space_name).sid
9991045
request = RequestReplace(self, space_name, values)
1000-
return self._send_request(request)
1046+
return self._send_request(request, kwargs['pushed_data'], kwargs['on_push'], kwargs['on_push_res'])
10011047

10021048
def authenticate(self, user, password):
10031049
"""
@@ -1149,7 +1195,7 @@ def subscribe(self, cluster_uuid, server_uuid, vclock=None):
11491195
return
11501196
self.close() # close connection after SUBSCRIBE
11511197

1152-
def insert(self, space_name, values):
1198+
def insert(self, space_name, values, **kwargs):
11531199
"""
11541200
Execute an INSERT request: `insert`_ a tuple to the space.
11551201
Throws an error if there is already a tuple with the same
@@ -1172,12 +1218,20 @@ def insert(self, space_name, values):
11721218
.. _insert: https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_space/insert/
11731219
"""
11741220

1221+
# Case for absence of optional arg for accepting out-of-band msg data
1222+
if not 'pushed_data' in kwargs:
1223+
kwargs['pushed_data'] = []
1224+
if not 'on_push' in kwargs:
1225+
kwargs['on_push'] = None
1226+
if not 'on_push_res' in kwargs:
1227+
kwargs['on_push_res'] = []
1228+
11751229
if isinstance(space_name, str):
11761230
space_name = self.schema.get_space(space_name).sid
11771231
request = RequestInsert(self, space_name, values)
1178-
return self._send_request(request)
1232+
return self._send_request(request, kwargs['pushed_data'], kwargs['on_push'], kwargs['on_push_res'])
11791233

1180-
def delete(self, space_name, key, *, index=0):
1234+
def delete(self, space_name, key, *, index=0, **kwargs):
11811235
"""
11821236
Execute a DELETE request: `delete`_ a tuple in the space.
11831237
@@ -1202,15 +1256,23 @@ def delete(self, space_name, key, *, index=0):
12021256
.. _delete: https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_space/delete/
12031257
"""
12041258

1259+
# Case for absence of optional arg for accepting out-of-band msg data
1260+
if not 'pushed_data' in kwargs:
1261+
kwargs['pushed_data'] = []
1262+
if not 'on_push' in kwargs:
1263+
kwargs['on_push'] = None
1264+
if not 'on_push_res' in kwargs:
1265+
kwargs['on_push_res'] = []
1266+
12051267
key = check_key(key)
12061268
if isinstance(space_name, str):
12071269
space_name = self.schema.get_space(space_name).sid
12081270
if isinstance(index, str):
12091271
index = self.schema.get_index(space_name, index).iid
12101272
request = RequestDelete(self, space_name, index, key)
1211-
return self._send_request(request)
1273+
return self._send_request(request, kwargs['pushed_data'], kwargs['on_push'], kwargs['on_push_res'])
12121274

1213-
def upsert(self, space_name, tuple_value, op_list, *, index=0):
1275+
def upsert(self, space_name, tuple_value, op_list, *, index=0, **kwargs):
12141276
"""
12151277
Execute an UPSERT request: `upsert`_ a tuple to the space.
12161278
@@ -1252,16 +1314,24 @@ def upsert(self, space_name, tuple_value, op_list, *, index=0):
12521314
.. _upsert: https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_space/upsert/
12531315
"""
12541316

1317+
# Case for absence of optional arg for accepting out-of-band msg data
1318+
if not 'pushed_data' in kwargs:
1319+
kwargs['pushed_data'] = []
1320+
if not 'on_push' in kwargs:
1321+
kwargs['on_push'] = None
1322+
if not 'on_push_res' in kwargs:
1323+
kwargs['on_push_res'] = []
1324+
12551325
if isinstance(space_name, str):
12561326
space_name = self.schema.get_space(space_name).sid
12571327
if isinstance(index, str):
12581328
index = self.schema.get_index(space_name, index).iid
12591329
op_list = self._ops_process(space_name, op_list)
12601330
request = RequestUpsert(self, space_name, index, tuple_value,
12611331
op_list)
1262-
return self._send_request(request)
1332+
return self._send_request(request, kwargs['pushed_data'], kwargs['on_push'], kwargs['on_push_res'])
12631333

1264-
def update(self, space_name, key, op_list, *, index=0):
1334+
def update(self, space_name, key, op_list, *, index=0, **kwargs):
12651335
"""
12661336
Execute an UPDATE request: `update`_ a tuple in the space.
12671337
@@ -1331,14 +1401,22 @@ def update(self, space_name, key, op_list, *, index=0):
13311401
.. _update: https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_space/update/
13321402
"""
13331403

1404+
# Case for absence of optional arg for accepting out-of-band msg data
1405+
if not 'pushed_data' in kwargs:
1406+
kwargs['pushed_data'] = []
1407+
if not 'on_push' in kwargs:
1408+
kwargs['on_push'] = None
1409+
if not 'on_push_res' in kwargs:
1410+
kwargs['on_push_res'] = []
1411+
13341412
key = check_key(key)
13351413
if isinstance(space_name, str):
13361414
space_name = self.schema.get_space(space_name).sid
13371415
if isinstance(index, str):
13381416
index = self.schema.get_index(space_name, index).iid
13391417
op_list = self._ops_process(space_name, op_list)
13401418
request = RequestUpdate(self, space_name, index, key, op_list)
1341-
return self._send_request(request)
1419+
return self._send_request(request, kwargs['pushed_data'], kwargs['on_push'], kwargs['on_push_res'])
13421420

13431421
def ping(self, notime=False):
13441422
"""
@@ -1368,7 +1446,7 @@ def ping(self, notime=False):
13681446
return "Success"
13691447
return t1 - t0
13701448

1371-
def select(self, space_name, key=None, *, offset=0, limit=0xffffffff, index=0, iterator=None):
1449+
def select(self, space_name, key=None, *, offset=0, limit=0xffffffff, index=0, iterator=None, **kwargs):
13721450
"""
13731451
Execute a SELECT request: `select`_ a tuple from the space.
13741452
@@ -1518,13 +1596,21 @@ def select(self, space_name, key=None, *, offset=0, limit=0xffffffff, index=0, i
15181596
# tuples)
15191597
key = check_key(key, select=True)
15201598

1599+
# Case for absence of optional arg for accepting out-of-band msg data
1600+
if not 'pushed_data' in kwargs:
1601+
kwargs['pushed_data'] = []
1602+
if not 'on_push' in kwargs:
1603+
kwargs['on_push'] = None
1604+
if not 'on_push_res' in kwargs:
1605+
kwargs['on_push_res'] = []
1606+
15211607
if isinstance(space_name, str):
15221608
space_name = self.schema.get_space(space_name).sid
15231609
if isinstance(index, str):
15241610
index = self.schema.get_index(space_name, index).iid
15251611
request = RequestSelect(self, space_name, index, key, offset,
15261612
limit, iterator)
1527-
response = self._send_request(request)
1613+
response = self._send_request(request, kwargs['pushed_data'], kwargs['on_push'], kwargs['on_push_res'])
15281614
return response
15291615

15301616
def space(self, space_name):

tarantool/const.py

+1
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838

3939
IPROTO_GREETING_SIZE = 128
4040
IPROTO_BODY_MAX_LEN = 2147483648
41+
IPROTO_CHUNK=0x80
4142

4243
REQUEST_TYPE_OK = 0
4344
REQUEST_TYPE_SELECT = 1

tarantool/mesh_connection.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -581,7 +581,7 @@ def _opt_refresh_instances(self):
581581
update_connection(self, addr)
582582
self._opt_reconnect()
583583

584-
def _send_request(self, request):
584+
def _send_request(self, request, pushed_data=[], on_push=None, on_push_res=[]):
585585
"""
586586
Send a request to a Tarantool server. If required, refresh
587587
addresses list before sending a request.

0 commit comments

Comments
 (0)