Skip to content

iproto: support feature push #247

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
Thus, if ``True``, datetime is computed in a way that `dt.timestamp` will
always be equal to initialization `timestamp`.

- Support iproto feature push (#201).

### Changed
- Bump msgpack requirement to 1.0.4 (PR #223).
The only reason of this bump is various vulnerability fixes,
Expand Down
147 changes: 147 additions & 0 deletions docs/source/quick-start.rst
Original file line number Diff line number Diff line change
Expand Up @@ -200,3 +200,150 @@ read-write and read-only pool instances:
>>> resp = conn.select('demo', 'AAAA', mode=tarantool.Mode.PREFER_RO)
>>> resp
- ['AAAA', 'Alpha']


Receiving out-of-band messages
----------------------------------

Receiving out-of-band messages from a server that uses box.session.push
call is supported for methods: :meth:`~tarantool.Connection.call`,
:meth:`~tarantool.Connection.eval`, :meth:`~tarantool.Connection.select`,
:meth:`~tarantool.Connection.insert`, :meth:`~tarantool.Connection.replace`,
:meth:`~tarantool.Connection.update`, :meth:`~tarantool.Connection.upsert`,
:meth:`~tarantool.Connection.delete`.

To work with out-of-band messages, 2 optional arguments are used in
the methods listed above:

* `on_push` - callback, launched with the received data for each out-of-band message. Two arguments for this callback are expected:

* the first is the received from an out-of-band message data.

* the second is `on_push_ctx`, variable for working with callback context (for example, recording the result or pass data to callback).
* `on_push_ctx` - result of the `on_push` work can be written to this variable, or through this variable you can pass data to `on_push` callback.

Below is an example of the proposed API with method :meth:`~tarantool.Connection.call`
and :meth:`~tarantool.Connection.insert`. In the described example, before the end
of the :meth:`~tarantool.Connection.call` and :meth:`~tarantool.Connection.insert`,
out-of-band messages are processed via specified callback.

In the example below, two shells are used, in the first we will configure the server:

.. code-block:: lua

fiber = require('fiber')
box.cfg({listen = 3301})
box.schema.user.grant(
'guest',
'read,write,execute',
'universe'
)
function server_function()
x = {0,0}
while x[1] < 3 do
x[1] = x[1] + 1
fiber.sleep(1)
box.session.push(x)
end
fiber.sleep(1)
return x
end

In the second shell, we will execute a :meth:`~tarantool.Connection.call`
with receiving out-of-band messages from the server:

.. code-block:: python

import tarantool

def callback(data, on_push_ctx=[]):
print('run callback with data: ', data)
data[0][1] = data[0][1] + 1
on_push_ctx.append(data)

callback_res = []

conn = tarantool.Connection(port=3301)
res = conn.call(
'server_function',
on_push=callback,
on_push_ctx=callback_res
)

# receiving out-of-band messages,
# the conn.call is not finished yet.

>>> run callback with data: [[1, 0]]
>>> run callback with data: [[2, 0]]
>>> run callback with data: [[3, 0]]

# the conn.call is finished now.

print(res)
>>> [3, 0]

print(callback_res)
>>> [[[1, 1]], [[2, 1]], [[3, 1]]]

Let's go back to the first shell with the server and
create a space and a trigger for it:

.. code-block:: lua

box.schema.create_space(
'tester', {
format = {
{name = 'id', type = 'unsigned'},
{name = 'name', type = 'string'},
}
})
box.space.tester:create_index(
'primary_index', {
parts = {
{field = 1, type = 'unsigned'},
}
})
function on_replace_callback()
x = {0,0}
while x[1] < 300 do
x[1] = x[1] + 100
box.session.push(x)
end
return x
end
box.space.tester:on_replace(
on_replace_callback
)

Now, in the second shell, we will execute an :meth:`~tarantool.ConnectionPool.insert`
with out-of-band message processing:

.. code-block:: python

callback_res = []

conn_pool = tarantool.ConnectionPool(
[{'host':'localhost', 'port':3301}],
user='guest')

res = conn_pool.insert(
'tester',
(1, 'Mike'),
on_push=callback,
on_push_ctx=callback_res,
)

# receiving out-of-band messages,
# the conn_pool.insert is not finished yet.

>>> run callback with data: [[100, 0]]
>>> run callback with data: [[200, 0]]
>>> run callback with data: [[300, 0]]

# the conn_pool.insert is finished now.

print(res)
>>> [1, 'Mike']

print(callback_res)
>>> [[[100, 1]], [[200, 1]], [[300, 1]]]
Loading