From 64f2b0d0d97bf1bf33ada99f3c77c6ae7ae6c361 Mon Sep 17 00:00:00 2001 From: Ilya Grishnov Date: Wed, 19 Oct 2022 15:46:28 +0300 Subject: [PATCH] iproto: support feature push Adds support for receiving out-of-band messages from server that uses box.session.push call. Data obtaining is implemented for methods: `call`, `eval`, `select`, `insert`, `replace`, `update`, `upsert`, `delete`. To do this, optional arguments `on_push` and `on_push_ctx` are used for these methods. Argument `on_push` sets the callback to call when an out-of-band message is received, and the `on_push_ctx` argument allows to save the result of `on_push` work or pass data to it. So the API is similar to the implementation of LUA version at the moment. Closes #201 --- CHANGELOG.md | 2 + docs/source/quick-start.rst | 147 ++++++++++++++++++++ tarantool/connection.py | 143 +++++++++++++++---- tarantool/connection_pool.py | 80 ++++++++--- tarantool/const.py | 1 + tarantool/mesh_connection.py | 4 +- test/suites/__init__.py | 3 +- test/suites/test_push.py | 257 +++++++++++++++++++++++++++++++++++ 8 files changed, 591 insertions(+), 46 deletions(-) create mode 100644 test/suites/test_push.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 6d44e5de..f9f98237 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -184,6 +184,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 Thus, if ``True``, datetime is computed in a way that `dt.timestamp` will always be equal to initialization `timestamp`. +- Support iproto feature push (#201). + ### Changed - Bump msgpack requirement to 1.0.4 (PR #223). The only reason of this bump is various vulnerability fixes, diff --git a/docs/source/quick-start.rst b/docs/source/quick-start.rst index 0f22a7bf..5fe64ff8 100644 --- a/docs/source/quick-start.rst +++ b/docs/source/quick-start.rst @@ -200,3 +200,150 @@ read-write and read-only pool instances: >>> resp = conn.select('demo', 'AAAA', mode=tarantool.Mode.PREFER_RO) >>> resp - ['AAAA', 'Alpha'] + + +Receiving out-of-band messages +---------------------------------- + +Receiving out-of-band messages from a server that uses box.session.push +call is supported for methods: :meth:`~tarantool.Connection.call`, +:meth:`~tarantool.Connection.eval`, :meth:`~tarantool.Connection.select`, +:meth:`~tarantool.Connection.insert`, :meth:`~tarantool.Connection.replace`, +:meth:`~tarantool.Connection.update`, :meth:`~tarantool.Connection.upsert`, +:meth:`~tarantool.Connection.delete`. + +To work with out-of-band messages, 2 optional arguments are used in +the methods listed above: + + * `on_push` - callback, launched with the received data for each out-of-band message. Two arguments for this callback are expected: + + * the first is the received from an out-of-band message data. + + * the second is `on_push_ctx`, variable for working with callback context (for example, recording the result or pass data to callback). + * `on_push_ctx` - result of the `on_push` work can be written to this variable, or through this variable you can pass data to `on_push` callback. + +Below is an example of the proposed API with method :meth:`~tarantool.Connection.call` +and :meth:`~tarantool.Connection.insert`. In the described example, before the end +of the :meth:`~tarantool.Connection.call` and :meth:`~tarantool.Connection.insert`, +out-of-band messages are processed via specified callback. + +In the example below, two shells are used, in the first we will configure the server: + +.. code-block:: lua + + fiber = require('fiber') + box.cfg({listen = 3301}) + box.schema.user.grant( + 'guest', + 'read,write,execute', + 'universe' + ) + function server_function() + x = {0,0} + while x[1] < 3 do + x[1] = x[1] + 1 + fiber.sleep(1) + box.session.push(x) + end + fiber.sleep(1) + return x + end + +In the second shell, we will execute a :meth:`~tarantool.Connection.call` +with receiving out-of-band messages from the server: + +.. code-block:: python + + import tarantool + + def callback(data, on_push_ctx=[]): + print('run callback with data: ', data) + data[0][1] = data[0][1] + 1 + on_push_ctx.append(data) + + callback_res = [] + + conn = tarantool.Connection(port=3301) + res = conn.call( + 'server_function', + on_push=callback, + on_push_ctx=callback_res + ) + + # receiving out-of-band messages, + # the conn.call is not finished yet. + + >>> run callback with data: [[1, 0]] + >>> run callback with data: [[2, 0]] + >>> run callback with data: [[3, 0]] + + # the conn.call is finished now. + + print(res) + >>> [3, 0] + + print(callback_res) + >>> [[[1, 1]], [[2, 1]], [[3, 1]]] + +Let's go back to the first shell with the server and +create a space and a trigger for it: + +.. code-block:: lua + + box.schema.create_space( + 'tester', { + format = { + {name = 'id', type = 'unsigned'}, + {name = 'name', type = 'string'}, + } + }) + box.space.tester:create_index( + 'primary_index', { + parts = { + {field = 1, type = 'unsigned'}, + } + }) + function on_replace_callback() + x = {0,0} + while x[1] < 300 do + x[1] = x[1] + 100 + box.session.push(x) + end + return x + end + box.space.tester:on_replace( + on_replace_callback + ) + +Now, in the second shell, we will execute an :meth:`~tarantool.ConnectionPool.insert` +with out-of-band message processing: + +.. code-block:: python + + callback_res = [] + + conn_pool = tarantool.ConnectionPool( + [{'host':'localhost', 'port':3301}], + user='guest') + + res = conn_pool.insert( + 'tester', + (1, 'Mike'), + on_push=callback, + on_push_ctx=callback_res, + ) + + # receiving out-of-band messages, + # the conn_pool.insert is not finished yet. + + >>> run callback with data: [[100, 0]] + >>> run callback with data: [[200, 0]] + >>> run callback with data: [[300, 0]] + + # the conn_pool.insert is finished now. + + print(res) + >>> [1, 'Mike'] + + print(callback_res) + >>> [[[100, 1]], [[200, 1]], [[300, 1]]] diff --git a/tarantool/connection.py b/tarantool/connection.py index 3df82a9e..5b4fe608 100644 --- a/tarantool/connection.py +++ b/tarantool/connection.py @@ -64,6 +64,7 @@ IPROTO_FEATURE_ERROR_EXTENSION, IPROTO_FEATURE_WATCHERS, IPROTO_FEATURE_GRACEFUL_SHUTDOWN, + IPROTO_CHUNK, ) from tarantool.error import ( Error, @@ -159,7 +160,7 @@ def connect(self): raise NotImplementedError @abc.abstractmethod - def call(self, func_name, *args): + def call(self, func_name, *args, on_push=None, on_push_ctx=None): """ Reference implementation: :meth:`~tarantool.Connection.call`. """ @@ -167,7 +168,7 @@ def call(self, func_name, *args): raise NotImplementedError @abc.abstractmethod - def eval(self, expr, *args): + def eval(self, expr, *args, on_push=None, on_push_ctx=None): """ Reference implementation: :meth:`~tarantool.Connection.eval`. """ @@ -175,7 +176,7 @@ def eval(self, expr, *args): raise NotImplementedError @abc.abstractmethod - def replace(self, space_name, values): + def replace(self, space_name, values, on_push=None, on_push_ctx=None): """ Reference implementation: :meth:`~tarantool.Connection.replace`. """ @@ -183,7 +184,7 @@ def replace(self, space_name, values): raise NotImplementedError @abc.abstractmethod - def insert(self, space_name, values): + def insert(self, space_name, values, on_push=None, on_push_ctx=None): """ Reference implementation: :meth:`~tarantool.Connection.insert`. """ @@ -191,7 +192,7 @@ def insert(self, space_name, values): raise NotImplementedError @abc.abstractmethod - def delete(self, space_name, key, *, index=None): + def delete(self, space_name, key, *, index=None, on_push=None, on_push_ctx=None): """ Reference implementation: :meth:`~tarantool.Connection.delete`. """ @@ -199,7 +200,8 @@ def delete(self, space_name, key, *, index=None): raise NotImplementedError @abc.abstractmethod - def upsert(self, space_name, tuple_value, op_list, *, index=None): + def upsert(self, space_name, tuple_value, op_list, *, index=None, + on_push=None, on_push_ctx=None): """ Reference implementation: :meth:`~tarantool.Connection.upsert`. """ @@ -207,7 +209,7 @@ def upsert(self, space_name, tuple_value, op_list, *, index=None): raise NotImplementedError @abc.abstractmethod - def update(self, space_name, key, op_list, *, index=None): + def update(self, space_name, key, op_list, *, index=None, on_push=None, on_push_ctx=None): """ Reference implementation: :meth:`~tarantool.Connection.update`. """ @@ -224,7 +226,7 @@ def ping(self, notime): @abc.abstractmethod def select(self, space_name, key, *, offset=None, limit=None, - index=None, iterator=None): + index=None, iterator=None, on_push=None, on_push_ctx=None): """ Reference implementation: :meth:`~tarantool.Connection.select`. """ @@ -767,7 +769,7 @@ def _read_response(self): # Read the packet return self._recv(length) - def _send_request_wo_reconnect(self, request): + def _send_request_wo_reconnect(self, request, on_push=None, on_push_ctx=None): """ Send request without trying to reconnect. Reload schema, if required. @@ -775,6 +777,12 @@ def _send_request_wo_reconnect(self, request): :param request: Request to send. :type request: :class:`~tarantool.request.Request` + :param on_push: Сallback for processing out-of-band messages. + :type on_push: :obj:`function`, optional + + :param on_push_ctx: Сontext for working with on_push callback. + :type on_push_ctx: optional + :rtype: :class:`~tarantool.response.Response` :raise: :exc:`~AssertionError`, @@ -796,6 +804,11 @@ def _send_request_wo_reconnect(self, request): self.update_schema(e.schema_version) continue + while response._code == IPROTO_CHUNK: + if on_push is not None: + on_push(response._data, on_push_ctx) + response = request.response_class(self, self._read_response()) + return response def _opt_reconnect(self): @@ -870,13 +883,19 @@ def check(): # Check that connection is alive self.wrap_socket_ssl() self.handshake() - def _send_request(self, request): + def _send_request(self, request, on_push=None, on_push_ctx=None): """ Send a request to the server through the socket. :param request: Request to send. :type request: :class:`~tarantool.request.Request` + :param on_push: Сallback for processing out-of-band messages. + :type on_push: :obj:`function`, optional + + :param on_push_ctx: Сontext for working with on_push callback. + :type on_push_ctx: optional + :rtype: :class:`~tarantool.response.Response` :raise: :exc:`~AssertionError`, @@ -891,7 +910,7 @@ def _send_request(self, request): self._opt_reconnect() - return self._send_request_wo_reconnect(request) + return self._send_request_wo_reconnect(request, on_push, on_push_ctx) def load_schema(self): """ @@ -933,7 +952,7 @@ def flush_schema(self): self.schema.flush() self.load_schema() - def call(self, func_name, *args): + def call(self, func_name, *args, on_push=None, on_push_ctx=None): """ Execute a CALL request: call a stored Lua function. @@ -943,6 +962,12 @@ def call(self, func_name, *args): :param args: Stored Lua function arguments. :type args: :obj:`tuple` + :param on_push: Сallback for processing out-of-band messages. + :type on_push: :obj:`function`, optional + + :param on_push_ctx: Сontext for working with on_push callback. + :type on_push_ctx: optional + :rtype: :class:`~tarantool.response.Response` :raise: :exc:`~AssertionError`, @@ -952,16 +977,18 @@ def call(self, func_name, *args): """ assert isinstance(func_name, str) + if on_push is not None and not callable(on_push): + raise TypeError('The on_push callback must be callable') # This allows to use a tuple or list as an argument if len(args) == 1 and isinstance(args[0], (list, tuple)): args = args[0] request = RequestCall(self, func_name, args, self.call_16) - response = self._send_request(request) + response = self._send_request(request, on_push, on_push_ctx) return response - def eval(self, expr, *args): + def eval(self, expr, *args, on_push=None, on_push_ctx=None): """ Execute an EVAL request: evaluate a Lua expression. @@ -971,6 +998,12 @@ def eval(self, expr, *args): :param args: Lua expression arguments. :type args: :obj:`tuple` + :param on_push: Сallback for processing out-of-band messages. + :type on_push: :obj:`function`, optional + + :param on_push_ctx: Сontext for working with on_push callback. + :type on_push_ctx: optional + :rtype: :class:`~tarantool.response.Response` :raise: :exc:`~AssertionError`, @@ -981,16 +1014,18 @@ def eval(self, expr, *args): """ assert isinstance(expr, str) + if on_push is not None and not callable(on_push): + raise TypeError('The on_push callback must be callable') # This allows to use a tuple or list as an argument if len(args) == 1 and isinstance(args[0], (list, tuple)): args = args[0] request = RequestEval(self, expr, args) - response = self._send_request(request) + response = self._send_request(request, on_push, on_push_ctx) return response - def replace(self, space_name, values): + def replace(self, space_name, values, on_push=None, on_push_ctx=None): """ Execute a REPLACE request: `replace`_ a tuple in the space. Doesn't throw an error if there is no tuple with the specified @@ -1002,6 +1037,12 @@ def replace(self, space_name, values): :param values: Tuple to be replaced. :type values: :obj:`tuple` or :obj:`list` + :param on_push: Сallback for processing out-of-band messages. + :type on_push: :obj:`function`, optional + + :param on_push_ctx: Сontext for working with on_push callback. + :type on_push_ctx: optional + :rtype: :class:`~tarantool.response.Response` :raise: :exc:`~AssertionError`, @@ -1015,8 +1056,11 @@ def replace(self, space_name, values): if isinstance(space_name, str): space_name = self.schema.get_space(space_name).sid + if on_push is not None and not callable(on_push): + raise TypeError('The on_push callback must be callable') + request = RequestReplace(self, space_name, values) - return self._send_request(request) + return self._send_request(request, on_push, on_push_ctx) def authenticate(self, user, password): """ @@ -1168,7 +1212,7 @@ def subscribe(self, cluster_uuid, server_uuid, vclock=None): return self.close() # close connection after SUBSCRIBE - def insert(self, space_name, values): + def insert(self, space_name, values, on_push=None, on_push_ctx=None): """ Execute an INSERT request: `insert`_ a tuple to the space. Throws an error if there is already a tuple with the same @@ -1180,6 +1224,12 @@ def insert(self, space_name, values): :param values: Record to be inserted. :type values: :obj:`tuple` or :obj:`list` + :param on_push: Сallback for processing out-of-band messages. + :type on_push: :obj:`function`, optional + + :param on_push_ctx: Сontext for working with on_push callback. + :type on_push_ctx: optional + :rtype: :class:`~tarantool.response.Response` :raise: :exc:`~AssertionError`, @@ -1193,10 +1243,13 @@ def insert(self, space_name, values): if isinstance(space_name, str): space_name = self.schema.get_space(space_name).sid + if on_push is not None and not callable(on_push): + raise TypeError('The on_push callback must be callable') + request = RequestInsert(self, space_name, values) - return self._send_request(request) + return self._send_request(request, on_push, on_push_ctx) - def delete(self, space_name, key, *, index=0): + def delete(self, space_name, key, *, index=0, on_push=None, on_push_ctx=None): """ Execute a DELETE request: `delete`_ a tuple in the space. @@ -1210,6 +1263,12 @@ def delete(self, space_name, key, *, index=0): index. :type index: :obj:`str` or :obj:`int`, optional + :param on_push: Сallback for processing out-of-band messages. + :type on_push: :obj:`function`, optional + + :param on_push_ctx: Сontext for working with on_push callback. + :type on_push_ctx: optional + :rtype: :class:`~tarantool.response.Response` :raise: :exc:`~AssertionError`, @@ -1226,10 +1285,13 @@ def delete(self, space_name, key, *, index=0): space_name = self.schema.get_space(space_name).sid if isinstance(index, str): index = self.schema.get_index(space_name, index).iid + if on_push is not None and not callable(on_push): + raise TypeError('The on_push callback must be callable') + request = RequestDelete(self, space_name, index, key) - return self._send_request(request) + return self._send_request(request, on_push, on_push_ctx) - def upsert(self, space_name, tuple_value, op_list, *, index=0): + def upsert(self, space_name, tuple_value, op_list, *, index=0, on_push=None, on_push_ctx=None): """ Execute an UPSERT request: `upsert`_ a tuple to the space. @@ -1260,6 +1322,12 @@ def upsert(self, space_name, tuple_value, op_list, *, index=0): index. :type index: :obj:`str` or :obj:`int`, optional + :param on_push: Сallback for processing out-of-band messages. + :type on_push: :obj:`function`, optional + + :param on_push_ctx: Сontext for working with on_push callback. + :type on_push_ctx: optional + :rtype: :class:`~tarantool.response.Response` :raise: :exc:`~AssertionError`, @@ -1275,12 +1343,15 @@ def upsert(self, space_name, tuple_value, op_list, *, index=0): space_name = self.schema.get_space(space_name).sid if isinstance(index, str): index = self.schema.get_index(space_name, index).iid + if on_push is not None and not callable(on_push): + raise TypeError('The on_push callback must be callable') + op_list = self._ops_process(space_name, op_list) request = RequestUpsert(self, space_name, index, tuple_value, op_list) - return self._send_request(request) + return self._send_request(request, on_push, on_push_ctx) - def update(self, space_name, key, op_list, *, index=0): + def update(self, space_name, key, op_list, *, index=0, on_push=None, on_push_ctx=None): """ Execute an UPDATE request: `update`_ a tuple in the space. @@ -1339,6 +1410,12 @@ def update(self, space_name, key, op_list, *, index=0): index. :type index: :obj:`str` or :obj:`int`, optional + :param on_push: Сallback for processing out-of-band messages. + :type on_push: :obj:`function`, optional + + :param on_push_ctx: Сontext for working with on_push callback. + :type on_push_ctx: optional + :rtype: :class:`~tarantool.response.Response` :raise: :exc:`~AssertionError`, @@ -1355,9 +1432,12 @@ def update(self, space_name, key, op_list, *, index=0): space_name = self.schema.get_space(space_name).sid if isinstance(index, str): index = self.schema.get_index(space_name, index).iid + if on_push is not None and not callable(on_push): + raise TypeError('The on_push callback must be callable') + op_list = self._ops_process(space_name, op_list) request = RequestUpdate(self, space_name, index, key, op_list) - return self._send_request(request) + return self._send_request(request, on_push, on_push_ctx) def ping(self, notime=False): """ @@ -1387,7 +1467,7 @@ def ping(self, notime=False): return "Success" return t1 - t0 - def select(self, space_name, key=None, *, offset=0, limit=0xffffffff, index=0, iterator=None): + def select(self, space_name, key=None, *, offset=0, limit=0xffffffff, index=0, iterator=None, on_push=None, on_push_ctx=None): """ Execute a SELECT request: `select`_ a tuple from the space. @@ -1516,6 +1596,12 @@ def select(self, space_name, key=None, *, offset=0, limit=0xffffffff, index=0, i | | | the space. | +----------------------------+-----------+----------------------------------------------+ + :param on_push: Сallback for processing out-of-band messages. + :type on_push: :obj:`function`, optional + + :param on_push_ctx: Сontext for working with on_push callback. + :type on_push_ctx: optional + :rtype: :class:`~tarantool.response.Response` :raise: :exc:`~AssertionError`, @@ -1541,9 +1627,12 @@ def select(self, space_name, key=None, *, offset=0, limit=0xffffffff, index=0, i space_name = self.schema.get_space(space_name).sid if isinstance(index, str): index = self.schema.get_index(space_name, index).iid + if on_push is not None and not callable(on_push): + raise TypeError('The on_push callback must be callable') + request = RequestSelect(self, space_name, index, key, offset, limit, iterator) - response = self._send_request(request) + response = self._send_request(request, on_push, on_push_ctx) return response def space(self, space_name): diff --git a/tarantool/connection_pool.py b/tarantool/connection_pool.py index de4a869d..10523c84 100644 --- a/tarantool/connection_pool.py +++ b/tarantool/connection_pool.py @@ -711,7 +711,7 @@ def _send(self, mode, method_name, *args, **kwargs): return resp - def call(self, func_name, *args, mode=None): + def call(self, func_name, *args, mode=None, on_push=None, on_push_ctx=None): """ Execute a CALL request on the pool server: call a stored Lua function. Refer to :meth:`~tarantool.Connection.call`. @@ -725,6 +725,12 @@ def call(self, func_name, *args, mode=None): :param mode: Request mode. :type mode: :class:`~tarantool.Mode` + :param on_push: Refer to + :paramref:`~tarantool.Connection.call.params.on_push`. + + :param on_push_ctx: Refer to + :paramref:`~tarantool.Connection.call.params.on_push_ctx`. + :rtype: :class:`~tarantool.response.Response` :raise: :exc:`~ValueError`, @@ -734,9 +740,9 @@ def call(self, func_name, *args, mode=None): if mode is None: raise ValueError("Please, specify 'mode' keyword argument") - return self._send(mode, 'call', func_name, *args) + return self._send(mode, 'call', func_name, *args, on_push=on_push, on_push_ctx=on_push_ctx) - def eval(self, expr, *args, mode=None): + def eval(self, expr, *args, mode=None, on_push=None, on_push_ctx=None): """ Execute an EVAL request on the pool server: evaluate a Lua expression. Refer to :meth:`~tarantool.Connection.eval`. @@ -750,6 +756,12 @@ def eval(self, expr, *args, mode=None): :param mode: Request mode. :type mode: :class:`~tarantool.Mode` + :param on_push: Refer to + :paramref:`~tarantool.Connection.eval.params.on_push`. + + :param on_push_ctx: Refer to + :paramref:`~tarantool.Connection.eval.params.on_push_ctx`. + :rtype: :class:`~tarantool.response.Response` :raise: :exc:`~ValueError`, @@ -759,9 +771,9 @@ def eval(self, expr, *args, mode=None): if mode is None: raise ValueError("Please, specify 'mode' keyword argument") - return self._send(mode, 'eval', expr, *args) + return self._send(mode, 'eval', expr, *args, on_push=on_push, on_push_ctx=on_push_ctx) - def replace(self, space_name, values, *, mode=Mode.RW): + def replace(self, space_name, values, *, mode=Mode.RW, on_push=None, on_push_ctx=None): """ Execute a REPLACE request on the pool server: `replace`_ a tuple in the space. Refer to :meth:`~tarantool.Connection.replace`. @@ -775,6 +787,12 @@ def replace(self, space_name, values, *, mode=Mode.RW): :param mode: Request mode. :type mode: :class:`~tarantool.Mode`, optional + :param on_push: Refer to + :paramref:`~tarantool.Connection.replace.params.on_push`. + + :param on_push_ctx: Refer to + :paramref:`~tarantool.Connection.replace.params.on_push_ctx`. + :rtype: :class:`~tarantool.response.Response` :raise: :meth:`~tarantool.Connection.replace` exceptions @@ -782,9 +800,9 @@ def replace(self, space_name, values, *, mode=Mode.RW): .. _replace: https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_space/replace/ """ - return self._send(mode, 'replace', space_name, values) + return self._send(mode, 'replace', space_name, values, on_push=on_push, on_push_ctx=on_push_ctx) - def insert(self, space_name, values, *, mode=Mode.RW): + def insert(self, space_name, values, *, mode=Mode.RW, on_push=None, on_push_ctx=None): """ Execute an INSERT request on the pool server: `insert`_ a tuple to the space. Refer to :meth:`~tarantool.Connection.insert`. @@ -798,6 +816,12 @@ def insert(self, space_name, values, *, mode=Mode.RW): :param mode: Request mode. :type mode: :class:`~tarantool.Mode`, optional + :param on_push: Refer to + :paramref:`~tarantool.Connection.insert.params.on_push`. + + :param on_push_ctx: Refer to + :paramref:`~tarantool.Connection.insert.params.on_push_ctx`. + :rtype: :class:`~tarantool.response.Response` :raise: :meth:`~tarantool.Connection.insert` exceptions @@ -805,9 +829,9 @@ def insert(self, space_name, values, *, mode=Mode.RW): .. _insert: https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_space/insert/ """ - return self._send(mode, 'insert', space_name, values) + return self._send(mode, 'insert', space_name, values, on_push=on_push, on_push_ctx=on_push_ctx) - def delete(self, space_name, key, *, index=0, mode=Mode.RW): + def delete(self, space_name, key, *, index=0, mode=Mode.RW, on_push=None, on_push_ctx=None): """ Execute an DELETE request on the pool server: `delete`_ a tuple in the space. Refer to :meth:`~tarantool.Connection.delete`. @@ -824,6 +848,12 @@ def delete(self, space_name, key, *, index=0, mode=Mode.RW): :param mode: Request mode. :type mode: :class:`~tarantool.Mode`, optional + :param on_push: Refer to + :paramref:`~tarantool.Connection.delete.params.on_push`. + + :param on_push_ctx: Refer to + :paramref:`~tarantool.Connection.delete.params.on_push_ctx`. + :rtype: :class:`~tarantool.response.Response` :raise: :meth:`~tarantool.Connection.delete` exceptions @@ -831,9 +861,9 @@ def delete(self, space_name, key, *, index=0, mode=Mode.RW): .. _delete: https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_space/delete/ """ - return self._send(mode, 'delete', space_name, key, index=index) + return self._send(mode, 'delete', space_name, key, index=index, on_push=on_push, on_push_ctx=on_push_ctx) - def upsert(self, space_name, tuple_value, op_list, *, index=0, mode=Mode.RW): + def upsert(self, space_name, tuple_value, op_list, *, index=0, mode=Mode.RW, on_push=None, on_push_ctx=None): """ Execute an UPSERT request on the pool server: `upsert`_ a tuple to the space. Refer to :meth:`~tarantool.Connection.upsert`. @@ -853,6 +883,12 @@ def upsert(self, space_name, tuple_value, op_list, *, index=0, mode=Mode.RW): :param mode: Request mode. :type mode: :class:`~tarantool.Mode`, optional + :param on_push: Refer to + :paramref:`~tarantool.Connection.upsert.params.on_push`. + + :param on_push_ctx: Refer to + :paramref:`~tarantool.Connection.upsert.params.on_push_ctx`. + :rtype: :class:`~tarantool.response.Response` :raise: :meth:`~tarantool.Connection.upsert` exceptions @@ -861,9 +897,9 @@ def upsert(self, space_name, tuple_value, op_list, *, index=0, mode=Mode.RW): """ return self._send(mode, 'upsert', space_name, tuple_value, - op_list, index=index) + op_list, index=index, on_push=on_push, on_push_ctx=on_push_ctx) - def update(self, space_name, key, op_list, *, index=0, mode=Mode.RW): + def update(self, space_name, key, op_list, *, index=0, mode=Mode.RW, on_push=None, on_push_ctx=None): """ Execute an UPDATE request on the pool server: `update`_ a tuple in the space. Refer to :meth:`~tarantool.Connection.update`. @@ -883,6 +919,12 @@ def update(self, space_name, key, op_list, *, index=0, mode=Mode.RW): :param mode: Request mode. :type mode: :class:`~tarantool.Mode`, optional + :param on_push: Refer to + :paramref:`~tarantool.Connection.update.params.on_push`. + + :param on_push_ctx: Refer to + :paramref:`~tarantool.Connection.update.params.on_push_ctx`. + :rtype: :class:`~tarantool.response.Response` :raise: :meth:`~tarantool.Connection.upsert` exceptions @@ -891,7 +933,7 @@ def update(self, space_name, key, op_list, *, index=0, mode=Mode.RW): """ return self._send(mode, 'update', space_name, key, - op_list, index=index) + op_list, index=index, on_push=on_push, on_push_ctx=on_push_ctx) def ping(self, notime=False, *, mode=None): """ @@ -917,7 +959,7 @@ def ping(self, notime=False, *, mode=None): return self._send(mode, 'ping', notime) def select(self, space_name, key, *, offset=0, limit=0xffffffff, - index=0, iterator=None, mode=Mode.ANY): + index=0, iterator=None, mode=Mode.ANY, on_push=None, on_push_ctx=None): """ Execute a SELECT request on the pool server: `update`_ a tuple from the space. Refer to :meth:`~tarantool.Connection.select`. @@ -943,6 +985,12 @@ def select(self, space_name, key, *, offset=0, limit=0xffffffff, :param mode: Request mode. :type mode: :class:`~tarantool.Mode`, optional + :param on_push: Refer to + :paramref:`~tarantool.Connection.select.params.on_push`. + + :param on_push_ctx: Refer to + :paramref:`~tarantool.Connection.select.params.on_push_ctx`. + :rtype: :class:`~tarantool.response.Response` :raise: :meth:`~tarantool.Connection.select` exceptions @@ -951,7 +999,7 @@ def select(self, space_name, key, *, offset=0, limit=0xffffffff, """ return self._send(mode, 'select', space_name, key, offset=offset, limit=limit, - index=index, iterator=iterator) + index=index, iterator=iterator, on_push=on_push, on_push_ctx=on_push_ctx) def execute(self, query, params=None, *, mode=None): """ diff --git a/tarantool/const.py b/tarantool/const.py index 8f9f6ce0..4843c387 100644 --- a/tarantool/const.py +++ b/tarantool/const.py @@ -40,6 +40,7 @@ # IPROTO_VERSION = 0x54 IPROTO_FEATURES = 0x55 +IPROTO_CHUNK = 0x80 IPROTO_GREETING_SIZE = 128 IPROTO_BODY_MAX_LEN = 2147483648 diff --git a/tarantool/mesh_connection.py b/tarantool/mesh_connection.py index 7ce89570..e8d31b54 100644 --- a/tarantool/mesh_connection.py +++ b/tarantool/mesh_connection.py @@ -581,7 +581,7 @@ def _opt_refresh_instances(self): update_connection(self, addr) self._opt_reconnect() - def _send_request(self, request): + def _send_request(self, request, on_push=None, on_push_ctx=None): """ Send a request to a Tarantool server. If required, refresh addresses list before sending a request. @@ -596,4 +596,4 @@ def _send_request(self, request): """ self._opt_refresh_instances() - return super(MeshConnection, self)._send_request(request) + return super(MeshConnection, self)._send_request(request, on_push, on_push_ctx) diff --git a/test/suites/__init__.py b/test/suites/__init__.py index aae5fe23..ee951038 100644 --- a/test/suites/__init__.py +++ b/test/suites/__init__.py @@ -21,6 +21,7 @@ from .test_interval import TestSuite_Interval from .test_package import TestSuite_Package from .test_error_ext import TestSuite_ErrorExt +from .test_push import TestSuite_Push test_cases = (TestSuite_Schema_UnicodeConnection, TestSuite_Schema_BinaryConnection, @@ -28,7 +29,7 @@ TestSuite_Mesh, TestSuite_Execute, TestSuite_DBAPI, TestSuite_Encoding, TestSuite_Pool, TestSuite_Ssl, TestSuite_Decimal, TestSuite_UUID, TestSuite_Datetime, - TestSuite_Interval, TestSuite_ErrorExt,) + TestSuite_Interval, TestSuite_ErrorExt, TestSuite_Push,) def load_tests(loader, tests, pattern): suite = unittest.TestSuite() diff --git a/test/suites/test_push.py b/test/suites/test_push.py new file mode 100644 index 00000000..618a56f5 --- /dev/null +++ b/test/suites/test_push.py @@ -0,0 +1,257 @@ +import sys +import unittest +import tarantool +from .lib.tarantool_server import TarantoolServer + + +def create_server(): + srv = TarantoolServer() + srv.script = 'test/suites/box.lua' + srv.start() + srv.admin("box.schema.user.create('test', {password = 'test', " + + "if_not_exists = true})") + srv.admin("box.schema.user.grant('test', 'read,write,execute', 'universe')") + + # Create server_function (for testing purposes). + srv.admin(""" + function server_function() + x = {0,0} + while x[1] < 3 do + x[1] = x[1] + 1 + box.session.push(x) + end + return x + end + """) + + # Create tester space and on_replace trigger (for testing purposes). + srv.admin(""" + box.schema.create_space( + 'tester', { + format = { + {name = 'id', type = 'unsigned'}, + {name = 'name', type = 'string'}, + } + }) + """) + srv.admin(""" + box.space.tester:create_index( + 'primary_index', { + parts = { + {field = 1, type = 'unsigned'}, + } + }) + """) + srv.admin(""" + box.space.tester:create_index( + 'primary_index', { + parts = { + {field = 1, type = 'unsigned'}, + } + }) + """) + srv.admin(""" + function on_replace_callback() + x = {0,0} + while x[1] < 300 do + x[1] = x[1] + 100 + box.session.push(x) + end + end + """) + srv.admin(""" + box.space.tester:on_replace( + on_replace_callback + ) + """) + + return srv + + +# Callback for on_push arg (for testing purposes). +def push_callback(data, on_push_ctx=[]): + data[0][1] = data[0][1] + 1 + on_push_ctx.append(data) + + +class TestSuite_Push(unittest.TestCase): + + @classmethod + def setUpClass(self): + print(' PUSH '.center(70, '='), file=sys.stderr) + print('-' * 70, file=sys.stderr) + # Create server and extract helpful fields for tests. + self.srv = create_server() + self.host = self.srv.host + self.port = self.srv.args['primary'] + + def setUp(self): + # Open connection, connection pool and mesh connection to instance. + self.conn = tarantool.Connection(host=self.host, port=self.port, + user='test', password='test') + self.conn_pool = tarantool.ConnectionPool([{'host':self.host, 'port':self.port}], + user='test', password='test') + self.mesh_conn = tarantool.MeshConnection(host=self.host, port=self.port, + user='test', password='test') + + push_test_cases = { + 'call': { + 'input': { + 'args': ['server_function'], + 'kwargs': { + 'on_push': push_callback, + # on_push_ctx must be set manually when running the test. + 'on_push_ctx': None, + } + }, + 'output': { + 'callback_res': [[[1, 1]], [[2, 1]], [[3, 1]]], + 'resp': [3, 0], + }, + }, + 'eval': { + 'input': { + 'args': ['return server_function()'], + 'kwargs': { + 'on_push': push_callback, + # on_push_ctx must be set manually when running the test. + 'on_push_ctx': None, + } + }, + 'output': { + 'callback_res': [[[1, 1]], [[2, 1]], [[3, 1]]], + 'resp': [3, 0], + }, + }, + 'insert': { + 'input': { + 'args': ['tester', (1, 'Mike')], + 'kwargs': { + 'on_push': push_callback, + # on_push_ctx must be set manually when running the test. + 'on_push_ctx': None, + } + }, + 'output': { + 'callback_res': [[[100, 1]], [[200, 1]], [[300, 1]]], + 'resp': [1, 'Mike'], + }, + }, + 'replace': { + 'input': { + 'args': ['tester', (1, 'Bill')], + 'kwargs': { + 'on_push': push_callback, + # on_push_ctx must be set manually when running the test. + 'on_push_ctx': None, + } + }, + 'output': { + 'callback_res': [[[100, 1]], [[200, 1]], [[300, 1]]], + 'resp': [1, 'Bill'], + }, + }, + 'update': { + 'input': { + 'args': ['tester', 1], + 'kwargs': { + 'op_list': [], + 'on_push': push_callback, + # on_push_ctx must be set manually when running the test. + 'on_push_ctx': None, + } + }, + 'output': { + 'callback_res': [[[100, 1]], [[200, 1]], [[300, 1]]], + 'resp': [1, 'Bill'], + }, + }, + 'upsert': { + 'input': { + 'args': ['tester', (1, 'Bill')], + 'kwargs': { + 'op_list': [], + 'on_push': push_callback, + # on_push_ctx must be set manually when running the test. + 'on_push_ctx': None, + } + }, + 'output': { + 'callback_res': [[[100, 1]], [[200, 1]], [[300, 1]]], + # resp not used in the test output. + 'resp': None, + }, + }, + 'delete': { + 'input': { + 'args': ['tester', 1], + 'kwargs': { + 'on_push': push_callback, + # on_push_ctx must be set manually when running the test. + 'on_push_ctx': None, + } + }, + 'output': { + 'callback_res': [[[100, 1]], [[200, 1]], [[300, 1]]], + 'resp': [1, 'Bill'], + }, + }, + } + + def test_00_00_push_via_connection(self): + for case_name in self.push_test_cases.keys(): + with self.subTest(name=case_name): + callback_res = [] + case = self.push_test_cases[case_name] + testing_function = getattr(self.conn, case_name) + case['input']['kwargs']['on_push_ctx'] = callback_res + resp = testing_function( + *case['input']['args'], + **case['input']['kwargs'] + ) + self.assertEqual(callback_res, case['output']['callback_res']) + if case['output']['resp'] is not None: + self.assertEqual(resp.data[0], case['output']['resp']) + + def test_00_01_push_via_mesh_connection(self): + for case_name in self.push_test_cases.keys(): + with self.subTest(name=case_name): + callback_res = [] + case = self.push_test_cases[case_name] + testing_function = getattr(self.mesh_conn, case_name) + case['input']['kwargs']['on_push_ctx'] = callback_res + resp = testing_function( + *case['input']['args'], + **case['input']['kwargs'] + ) + self.assertEqual(callback_res, case['output']['callback_res']) + if case['output']['resp'] is not None: + self.assertEqual(resp.data[0], case['output']['resp']) + + def test_00_02_push_via_connection_pool(self): + for case_name in self.push_test_cases.keys(): + with self.subTest(name=case_name): + callback_res = [] + case = self.push_test_cases[case_name] + testing_function = getattr(self.conn_pool, case_name) + case['input']['kwargs']['on_push_ctx'] = callback_res + resp = testing_function( + *case['input']['args'], + **case['input']['kwargs'], + mode=tarantool.Mode.RW + ) + self.assertEqual(callback_res, case['output']['callback_res']) + if case['output']['resp'] is not None: + self.assertEqual(resp.data[0], case['output']['resp']) + + def tearDown(self): + # Close connection, connection pool and mesh connection to instance. + self.conn.close() + self.conn_pool.close() + self.mesh_conn.close() + + @classmethod + def tearDownClass(self): + # Stop instance. + self.srv.stop() + self.srv.clean()