Skip to content

Commit cbbe900

Browse files
committed
iproto: support feature push
Adds support for receiving out-of-band messages from a server that uses box.session.push call. Data obtaining is possible for methods: `call`, `eval`, `select`, `insert`, `replace`, `update`, `upsert`, `delete`. To do this, optional arguments `on_push` and `on_push_ctx` are used for this methods. Closes #201
1 parent a94f97c commit cbbe900

File tree

4 files changed

+73
-47
lines changed

4 files changed

+73
-47
lines changed

tarantool/connection.py

+55-30
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import time
88
import errno
99
import socket
10+
import copy
1011
try:
1112
import ssl
1213
is_ssl_supported = True
@@ -55,7 +56,8 @@
5556
REQUEST_TYPE_ERROR,
5657
IPROTO_GREETING_SIZE,
5758
ITERATOR_EQ,
58-
ITERATOR_ALL
59+
ITERATOR_ALL,
60+
IPROTO_CHUNK
5961
)
6062
from tarantool.error import (
6163
Error,
@@ -151,55 +153,56 @@ def connect(self):
151153
raise NotImplementedError
152154

153155
@abc.abstractmethod
154-
def call(self, func_name, *args):
156+
def call(self, func_name, *args, on_push=None, on_push_ctx=None):
155157
"""
156158
Reference implementation: :meth:`~tarantool.Connection.call`.
157159
"""
158160

159161
raise NotImplementedError
160162

161163
@abc.abstractmethod
162-
def eval(self, expr, *args):
164+
def eval(self, expr, *args, on_push=None, on_push_ctx=None):
163165
"""
164166
Reference implementation: :meth:`~tarantool.Connection.eval`.
165167
"""
166168

167169
raise NotImplementedError
168170

169171
@abc.abstractmethod
170-
def replace(self, space_name, values):
172+
def replace(self, space_name, values, on_push=None, on_push_ctx=None):
171173
"""
172174
Reference implementation: :meth:`~tarantool.Connection.replace`.
173175
"""
174176

175177
raise NotImplementedError
176178

177179
@abc.abstractmethod
178-
def insert(self, space_name, values):
180+
def insert(self, space_name, values, on_push=None, on_push_ctx=None):
179181
"""
180182
Reference implementation: :meth:`~tarantool.Connection.insert`.
181183
"""
182184

183185
raise NotImplementedError
184186

185187
@abc.abstractmethod
186-
def delete(self, space_name, key, *, index=None):
188+
def delete(self, space_name, key, *, index=None, on_push=None, on_push_ctx=None):
187189
"""
188190
Reference implementation: :meth:`~tarantool.Connection.delete`.
189191
"""
190192

191193
raise NotImplementedError
192194

193195
@abc.abstractmethod
194-
def upsert(self, space_name, tuple_value, op_list, *, index=None):
196+
def upsert(self, space_name, tuple_value, op_list, *, index=None,
197+
on_push=None, on_push_ctx=None):
195198
"""
196199
Reference implementation: :meth:`~tarantool.Connection.upsert`.
197200
"""
198201

199202
raise NotImplementedError
200203

201204
@abc.abstractmethod
202-
def update(self, space_name, key, op_list, *, index=None):
205+
def update(self, space_name, key, op_list, *, index=None, on_push=None, on_push_ctx=None):
203206
"""
204207
Reference implementation: :meth:`~tarantool.Connection.update`.
205208
"""
@@ -216,7 +219,7 @@ def ping(self, notime):
216219

217220
@abc.abstractmethod
218221
def select(self, space_name, key, *, offset=None, limit=None,
219-
index=None, iterator=None):
222+
index=None, iterator=None, on_push=None, on_push_ctx=None):
220223
"""
221224
Reference implementation: :meth:`~tarantool.Connection.select`.
222225
"""
@@ -748,7 +751,7 @@ def _read_response(self):
748751
# Read the packet
749752
return self._recv(length)
750753

751-
def _send_request_wo_reconnect(self, request):
754+
def _send_request_wo_reconnect(self, request, on_push=None, on_push_ctx=None):
752755
"""
753756
Send request without trying to reconnect.
754757
Reload schema, if required.
@@ -767,12 +770,30 @@ def _send_request_wo_reconnect(self, request):
767770

768771
assert isinstance(request, Request)
769772

773+
# Flag for detecting the last message as out-of-band.
774+
iproto_chunk_detected = False
775+
770776
response = None
771777
while True:
772778
try:
773-
self._socket.sendall(bytes(request))
779+
if not iproto_chunk_detected:
780+
# If the last received message is out-of-band,
781+
# the request will not be sent.
782+
self._socket.sendall(bytes(request))
774783
response = request.response_class(self, self._read_response())
775-
break
784+
if response._code == IPROTO_CHUNK:
785+
# Сase of receiving an out-of-band message.
786+
if callable(on_push):
787+
# Callback function with data from out-of-band
788+
# message is being called.
789+
# The callback result can be written to on_push_ctx.
790+
on_push(response._data, on_push_ctx)
791+
iproto_chunk_detected = True
792+
# Receiving the next message.
793+
continue
794+
else:
795+
# Сase of receiving main message.
796+
break
776797
except SchemaReloadException as e:
777798
self.update_schema(e.schema_version)
778799
continue
@@ -851,7 +872,7 @@ def check(): # Check that connection is alive
851872
self.wrap_socket_ssl()
852873
self.handshake()
853874

854-
def _send_request(self, request):
875+
def _send_request(self, request, on_push=None, on_push_ctx=None):
855876
"""
856877
Send a request to the server through the socket.
857878
@@ -872,7 +893,7 @@ def _send_request(self, request):
872893

873894
self._opt_reconnect()
874895

875-
return self._send_request_wo_reconnect(request)
896+
return self._send_request_wo_reconnect(request, on_push, on_push_ctx)
876897

877898
def load_schema(self):
878899
"""
@@ -914,7 +935,7 @@ def flush_schema(self):
914935
self.schema.flush()
915936
self.load_schema()
916937

917-
def call(self, func_name, *args):
938+
def call(self, func_name, *args, on_push=None, on_push_ctx=None):
918939
"""
919940
Execute a CALL request: call a stored Lua function.
920941
@@ -930,6 +951,10 @@ def call(self, func_name, *args):
930951
:exc:`~tarantool.error.SchemaError`,
931952
:exc:`~tarantool.error.NetworkError`,
932953
:exc:`~tarantool.error.SslError`
954+
955+
!!!
956+
TODO: write docs
957+
!!!
933958
"""
934959

935960
assert isinstance(func_name, str)
@@ -939,10 +964,10 @@ def call(self, func_name, *args):
939964
args = args[0]
940965

941966
request = RequestCall(self, func_name, args, self.call_16)
942-
response = self._send_request(request)
967+
response = self._send_request(request, on_push, on_push_ctx)
943968
return response
944969

945-
def eval(self, expr, *args):
970+
def eval(self, expr, *args, on_push=None, on_push_ctx=None):
946971
"""
947972
Execute an EVAL request: evaluate a Lua expression.
948973
@@ -968,10 +993,10 @@ def eval(self, expr, *args):
968993
args = args[0]
969994

970995
request = RequestEval(self, expr, args)
971-
response = self._send_request(request)
996+
response = self._send_request(request, on_push, on_push_ctx)
972997
return response
973998

974-
def replace(self, space_name, values):
999+
def replace(self, space_name, values, on_push=None, on_push_ctx=None):
9751000
"""
9761001
Execute a REPLACE request: `replace`_ a tuple in the space.
9771002
Doesn't throw an error if there is no tuple with the specified
@@ -997,7 +1022,7 @@ def replace(self, space_name, values):
9971022
if isinstance(space_name, str):
9981023
space_name = self.schema.get_space(space_name).sid
9991024
request = RequestReplace(self, space_name, values)
1000-
return self._send_request(request)
1025+
return self._send_request(request, on_push, on_push_ctx)
10011026

10021027
def authenticate(self, user, password):
10031028
"""
@@ -1149,7 +1174,7 @@ def subscribe(self, cluster_uuid, server_uuid, vclock=None):
11491174
return
11501175
self.close() # close connection after SUBSCRIBE
11511176

1152-
def insert(self, space_name, values):
1177+
def insert(self, space_name, values, on_push=None, on_push_ctx=None):
11531178
"""
11541179
Execute an INSERT request: `insert`_ a tuple to the space.
11551180
Throws an error if there is already a tuple with the same
@@ -1175,9 +1200,9 @@ def insert(self, space_name, values):
11751200
if isinstance(space_name, str):
11761201
space_name = self.schema.get_space(space_name).sid
11771202
request = RequestInsert(self, space_name, values)
1178-
return self._send_request(request)
1203+
return self._send_request(request, on_push, on_push_ctx)
11791204

1180-
def delete(self, space_name, key, *, index=0):
1205+
def delete(self, space_name, key, *, index=0, on_push=None, on_push_ctx=None):
11811206
"""
11821207
Execute a DELETE request: `delete`_ a tuple in the space.
11831208
@@ -1208,9 +1233,9 @@ def delete(self, space_name, key, *, index=0):
12081233
if isinstance(index, str):
12091234
index = self.schema.get_index(space_name, index).iid
12101235
request = RequestDelete(self, space_name, index, key)
1211-
return self._send_request(request)
1236+
return self._send_request(request, on_push, on_push_ctx)
12121237

1213-
def upsert(self, space_name, tuple_value, op_list, *, index=0):
1238+
def upsert(self, space_name, tuple_value, op_list, *, index=0, on_push=None, on_push_ctx=None):
12141239
"""
12151240
Execute an UPSERT request: `upsert`_ a tuple to the space.
12161241
@@ -1259,9 +1284,9 @@ def upsert(self, space_name, tuple_value, op_list, *, index=0):
12591284
op_list = self._ops_process(space_name, op_list)
12601285
request = RequestUpsert(self, space_name, index, tuple_value,
12611286
op_list)
1262-
return self._send_request(request)
1287+
return self._send_request(request, on_push, on_push_ctx)
12631288

1264-
def update(self, space_name, key, op_list, *, index=0):
1289+
def update(self, space_name, key, op_list, *, index=0, on_push=None, on_push_ctx=None):
12651290
"""
12661291
Execute an UPDATE request: `update`_ a tuple in the space.
12671292
@@ -1338,7 +1363,7 @@ def update(self, space_name, key, op_list, *, index=0):
13381363
index = self.schema.get_index(space_name, index).iid
13391364
op_list = self._ops_process(space_name, op_list)
13401365
request = RequestUpdate(self, space_name, index, key, op_list)
1341-
return self._send_request(request)
1366+
return self._send_request(request, on_push, on_push_ctx)
13421367

13431368
def ping(self, notime=False):
13441369
"""
@@ -1368,7 +1393,7 @@ def ping(self, notime=False):
13681393
return "Success"
13691394
return t1 - t0
13701395

1371-
def select(self, space_name, key=None, *, offset=0, limit=0xffffffff, index=0, iterator=None):
1396+
def select(self, space_name, key=None, *, offset=0, limit=0xffffffff, index=0, iterator=None, on_push=None, on_push_ctx=None):
13721397
"""
13731398
Execute a SELECT request: `select`_ a tuple from the space.
13741399
@@ -1524,7 +1549,7 @@ def select(self, space_name, key=None, *, offset=0, limit=0xffffffff, index=0, i
15241549
index = self.schema.get_index(space_name, index).iid
15251550
request = RequestSelect(self, space_name, index, key, offset,
15261551
limit, iterator)
1527-
response = self._send_request(request)
1552+
response = self._send_request(request, on_push, on_push_ctx)
15281553
return response
15291554

15301555
def space(self, space_name):

tarantool/connection_pool.py

+16-16
Original file line numberDiff line numberDiff line change
@@ -711,7 +711,7 @@ def _send(self, mode, method_name, *args, **kwargs):
711711

712712
return resp
713713

714-
def call(self, func_name, *args, mode=None):
714+
def call(self, func_name, *args, mode=None, on_push=None, on_push_ctx=None):
715715
"""
716716
Execute a CALL request on the pool server: call a stored Lua
717717
function. Refer to :meth:`~tarantool.Connection.call`.
@@ -734,9 +734,9 @@ def call(self, func_name, *args, mode=None):
734734
if mode is None:
735735
raise ValueError("Please, specify 'mode' keyword argument")
736736

737-
return self._send(mode, 'call', func_name, *args)
737+
return self._send(mode, 'call', func_name, *args, on_push=on_push, on_push_ctx=on_push_ctx)
738738

739-
def eval(self, expr, *args, mode=None):
739+
def eval(self, expr, *args, mode=None, on_push=None, on_push_ctx=None):
740740
"""
741741
Execute an EVAL request on the pool server: evaluate a Lua
742742
expression. Refer to :meth:`~tarantool.Connection.eval`.
@@ -759,9 +759,9 @@ def eval(self, expr, *args, mode=None):
759759
if mode is None:
760760
raise ValueError("Please, specify 'mode' keyword argument")
761761

762-
return self._send(mode, 'eval', expr, *args)
762+
return self._send(mode, 'eval', expr, *args, on_push=on_push, on_push_ctx=on_push_ctx)
763763

764-
def replace(self, space_name, values, *, mode=Mode.RW):
764+
def replace(self, space_name, values, *, mode=Mode.RW, on_push=None, on_push_ctx=None):
765765
"""
766766
Execute a REPLACE request on the pool server: `replace`_ a tuple
767767
in the space. Refer to :meth:`~tarantool.Connection.replace`.
@@ -782,9 +782,9 @@ def replace(self, space_name, values, *, mode=Mode.RW):
782782
.. _replace: https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_space/replace/
783783
"""
784784

785-
return self._send(mode, 'replace', space_name, values)
785+
return self._send(mode, 'replace', space_name, values, on_push=on_push, on_push_ctx=on_push_ctx)
786786

787-
def insert(self, space_name, values, *, mode=Mode.RW):
787+
def insert(self, space_name, values, *, mode=Mode.RW, on_push=None, on_push_ctx=None):
788788
"""
789789
Execute an INSERT request on the pool server: `insert`_ a tuple
790790
to the space. Refer to :meth:`~tarantool.Connection.insert`.
@@ -805,9 +805,9 @@ def insert(self, space_name, values, *, mode=Mode.RW):
805805
.. _insert: https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_space/insert/
806806
"""
807807

808-
return self._send(mode, 'insert', space_name, values)
808+
return self._send(mode, 'insert', space_name, values, on_push=on_push, on_push_ctx=on_push_ctx)
809809

810-
def delete(self, space_name, key, *, index=0, mode=Mode.RW):
810+
def delete(self, space_name, key, *, index=0, mode=Mode.RW, on_push=None, on_push_ctx=None):
811811
"""
812812
Execute an DELETE request on the pool server: `delete`_ a tuple
813813
in the space. Refer to :meth:`~tarantool.Connection.delete`.
@@ -831,9 +831,9 @@ def delete(self, space_name, key, *, index=0, mode=Mode.RW):
831831
.. _delete: https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_space/delete/
832832
"""
833833

834-
return self._send(mode, 'delete', space_name, key, index=index)
834+
return self._send(mode, 'delete', space_name, key, index=index, on_push=on_push, on_push_ctx=on_push_ctx)
835835

836-
def upsert(self, space_name, tuple_value, op_list, *, index=0, mode=Mode.RW):
836+
def upsert(self, space_name, tuple_value, op_list, *, index=0, mode=Mode.RW, on_push=None, on_push_ctx=None):
837837
"""
838838
Execute an UPSERT request on the pool server: `upsert`_ a tuple to
839839
the space. Refer to :meth:`~tarantool.Connection.upsert`.
@@ -861,9 +861,9 @@ def upsert(self, space_name, tuple_value, op_list, *, index=0, mode=Mode.RW):
861861
"""
862862

863863
return self._send(mode, 'upsert', space_name, tuple_value,
864-
op_list, index=index)
864+
op_list, index=index, on_push=on_push, on_push_ctx=on_push_ctx)
865865

866-
def update(self, space_name, key, op_list, *, index=0, mode=Mode.RW):
866+
def update(self, space_name, key, op_list, *, index=0, mode=Mode.RW, on_push=None, on_push_ctx=None):
867867
"""
868868
Execute an UPDATE request on the pool server: `update`_ a tuple
869869
in the space. Refer to :meth:`~tarantool.Connection.update`.
@@ -891,7 +891,7 @@ def update(self, space_name, key, op_list, *, index=0, mode=Mode.RW):
891891
"""
892892

893893
return self._send(mode, 'update', space_name, key,
894-
op_list, index=index)
894+
op_list, index=index, on_push=on_push, on_push_ctx=on_push_ctx)
895895

896896
def ping(self, notime=False, *, mode=None):
897897
"""
@@ -917,7 +917,7 @@ def ping(self, notime=False, *, mode=None):
917917
return self._send(mode, 'ping', notime)
918918

919919
def select(self, space_name, key, *, offset=0, limit=0xffffffff,
920-
index=0, iterator=None, mode=Mode.ANY):
920+
index=0, iterator=None, mode=Mode.ANY, on_push=None, on_push_ctx=None):
921921
"""
922922
Execute a SELECT request on the pool server: `update`_ a tuple
923923
from the space. Refer to :meth:`~tarantool.Connection.select`.
@@ -951,7 +951,7 @@ def select(self, space_name, key, *, offset=0, limit=0xffffffff,
951951
"""
952952

953953
return self._send(mode, 'select', space_name, key, offset=offset, limit=limit,
954-
index=index, iterator=iterator)
954+
index=index, iterator=iterator, on_push=on_push, on_push_ctx=on_push_ctx)
955955

956956
def execute(self, query, params=None, *, mode=None):
957957
"""

tarantool/const.py

+1
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838

3939
IPROTO_GREETING_SIZE = 128
4040
IPROTO_BODY_MAX_LEN = 2147483648
41+
IPROTO_CHUNK=0x80
4142

4243
REQUEST_TYPE_OK = 0
4344
REQUEST_TYPE_SELECT = 1

0 commit comments

Comments
 (0)