Skip to content

Implement reconnection strategy class #109

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Oct 16, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[submodule "test-run"]
path = test-run
url = https://github.com/tarantool/test-run
4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
.PHONY: test
test:
python -m pytest
python setup.py test
cd test && ./test-run.py
coverage:
python -m coverage run -p --source=. setup.py test
cov-html:
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
@@ -39,7 +39,7 @@
# Test runner
# python setup.py test
try:
from tests.setup_command import test
from unit.setup_command import test
cmdclass["test"] = test
except ImportError:
pass
28 changes: 26 additions & 2 deletions tarantool/__init__.py
Original file line number Diff line number Diff line change
@@ -2,6 +2,7 @@
# pylint: disable=C0301,W0105,W0401,W0614

from tarantool.connection import Connection
from tarantool.mesh_connection import MeshConnection
from tarantool.const import (
SOCKET_TIMEOUT,
RECONNECT_MAX_ATTEMPTS,
@@ -50,5 +51,28 @@ def connect(host="localhost", port=33013, user=None, password=None,
encoding=encoding)


__all__ = ['connect', 'Connection', 'Schema', 'Error', 'DatabaseError',
'NetworkError', 'NetworkWarning', 'SchemaError']
def connectmesh(addrs=({'host': 'localhost', 'port': 3301},), user=None,
password=None, encoding=ENCODING_DEFAULT):
'''
Create a connection to the mesh of Tarantool servers.

:param list addrs: A list of maps: {'host':(HOSTNAME|IP_ADDR), 'port':PORT}.

:rtype: :class:`~tarantool.mesh_connection.MeshConnection`

:raise: `NetworkError`
'''

return MeshConnection(addrs=addrs,
user=user,
password=password,
socket_timeout=SOCKET_TIMEOUT,
reconnect_max_attempts=RECONNECT_MAX_ATTEMPTS,
reconnect_delay=RECONNECT_DELAY,
connect_now=True,
encoding=encoding)


__all__ = ['connect', 'Connection', 'connectmesh', 'MeshConnection', 'Schema',
'Error', 'DatabaseError', 'NetworkError', 'NetworkWarning',
'SchemaError']
65 changes: 65 additions & 0 deletions tarantool/mesh_connection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# -*- coding: utf-8 -*-
'''
This module provides MeshConnection class with automatic switch
between tarantool instances and basic Round-Robin strategy.
'''

from tarantool.connection import Connection
from tarantool.error import NetworkError
from tarantool.utils import ENCODING_DEFAULT
from tarantool.const import (
SOCKET_TIMEOUT,
RECONNECT_MAX_ATTEMPTS,
RECONNECT_DELAY
)


class RoundRobinStrategy(object):
def __init__(self, addrs):
self.addrs = addrs
self.pos = 0

def getnext(self):
tmp = self.pos
self.pos = (self.pos + 1) % len(self.addrs)
return self.addrs[tmp]


class MeshConnection(Connection):
def __init__(self, addrs,
user=None,
password=None,
socket_timeout=SOCKET_TIMEOUT,
reconnect_max_attempts=RECONNECT_MAX_ATTEMPTS,
reconnect_delay=RECONNECT_DELAY,
connect_now=True,
encoding=ENCODING_DEFAULT,
strategy_class=RoundRobinStrategy):
self.nattempts = 2 * len(addrs) + 1
self.strategy = strategy_class(addrs)
addr = self.strategy.getnext()
host = addr['host']
port = addr['port']
super(MeshConnection, self).__init__(host=host,
port=port,
user=user,
password=password,
socket_timeout=socket_timeout,
reconnect_max_attempts=reconnect_max_attempts,
reconnect_delay=reconnect_delay,
connect_now=connect_now,
encoding=encoding)

def _opt_reconnect(self):
nattempts = self.nattempts
while nattempts > 0:
try:
super(MeshConnection, self)._opt_reconnect()
break
except NetworkError:
nattempts -= 1
addr = self.strategy.getnext()
self.host = addr['host']
self.port = addr['port']
else:
raise NetworkError
1 change: 1 addition & 0 deletions test-run
Submodule test-run added at b85d7e
15 changes: 15 additions & 0 deletions test/.tarantoolctl
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
-- Options for test-run tarantoolctl

local workdir = os.getenv('TEST_WORKDIR')
default_cfg = {
pid_file = workdir,
wal_dir = workdir,
memtx_dir = workdir,
vinyl_dir = workdir,
log = workdir,
background = false,
}

instance_dir = workdir

-- vim: set ft=lua :
16 changes: 16 additions & 0 deletions test/cluster-py/instance.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#!/usr/bin/env tarantool

local INSTANCE_ID = string.match(arg[0], "%d")
local SOCKET_DIR = require('fio').cwd()

local function instance_uri(instance_id)
return SOCKET_DIR..'/instance'..instance_id..'.sock';
end

require('console').listen(os.getenv('ADMIN'))

box.cfg({
--listen = os.getenv("LISTEN"),
listen = instance_uri(INSTANCE_ID),
memtx_memory = 107374182,
})
1 change: 1 addition & 0 deletions test/cluster-py/instance1.lua
1 change: 1 addition & 0 deletions test/cluster-py/instance2.lua
9 changes: 9 additions & 0 deletions test/cluster-py/master.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#!/usr/bin/env tarantool
os = require('os')
box.cfg({
listen = os.getenv("LISTEN"),
memtx_memory = 107374182,
replication_timeout = 0.1
})

require('console').listen(os.getenv('ADMIN'))
22 changes: 22 additions & 0 deletions test/cluster-py/multi.result
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
box.schema.user.grant('guest', 'read,write,execute', 'universe')
---
...
_ = box.schema.space.create('test')
---
...
_ = box.space.test:create_index('primary')
---
...
box.schema.user.grant('guest', 'read,write,execute', 'universe')
---
...
_ = box.schema.space.create('test')
---
...
_ = box.space.test:create_index('primary')
---
...
- [1, 0]
- [1, 1]
- [1, 0]
NetworkError !
71 changes: 71 additions & 0 deletions test/cluster-py/multi.test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import sys
import os
import time
import yaml
from lib.tarantool_server import TarantoolServer
sys.path.append('../tarantool')
from mesh_connection import MeshConnection
from tarantool.const import (
SOCKET_TIMEOUT,
RECONNECT_DELAY,
)
from tarantool.error import NetworkError
from tarantool.utils import ENCODING_DEFAULT

INSTANCE_N = 2


def check_connection(con):
try:
s = con.space('test')
print s.select()
except NetworkError:
print 'NetworkError !'
except Exception as e:
print e


# Start instances
master = server
cluster = [master]
for i in range(INSTANCE_N):
server = TarantoolServer(server.ini)
server.script = 'cluster-py/instance%d.lua' % (i+1)
server.vardir = os.path.join(server.vardir, 'instance', str(i))
server.deploy()
server.admin("box.schema.user.grant('guest', 'read,write,execute', 'universe')")
server.admin("_ = box.schema.space.create('test')")
server.admin("_ = box.space.test:create_index('primary')")
server.admin("box.space.test:insert{%d, %s}" % (1, i), silent = True)
cluster.append(server)

# Make a list of servers
sources = []
for server in cluster[1:]:
sources.append(yaml.load(server.admin('box.cfg.listen', silent=True))[0])

addrs = []
for addr in sources:
addrs.append({'host': None, 'port': addr})

con = MeshConnection(addrs=addrs,
user=None,
password=None,
socket_timeout=SOCKET_TIMEOUT,
reconnect_max_attempts=0,
reconnect_delay=RECONNECT_DELAY,
connect_now=True,
encoding=ENCODING_DEFAULT)

cluster[0].stop() # stop server - no effect
check_connection(con) # instance#1
cluster[1].stop() # stop instance#1
check_connection(con) # instance#2
cluster[1].start() # start instance#1
cluster[2].stop() # stop instance#2
check_connection(con) # instance#1 again
cluster[1].stop() # stop instance#1
check_connection(con) # both stopped: NetworkError !

master.cleanup()
master.deploy()
5 changes: 5 additions & 0 deletions test/cluster-py/suite.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
[default]
core = tarantool
script = master.lua
description = reconnect
is_parallel = False
1 change: 1 addition & 0 deletions test/test-run.py
File renamed without changes.
2 changes: 1 addition & 1 deletion tests/setup_command.py → unit/setup_command.py
Original file line number Diff line number Diff line change
@@ -22,6 +22,6 @@ def run(self):
Find all tests in test/tarantool/ and run them
'''

tests = unittest.defaultTestLoader.discover('tests')
tests = unittest.defaultTestLoader.discover('unit')
test_runner = unittest.TextTestRunner(verbosity = 2)
test_runner.run(tests)
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
2 changes: 1 addition & 1 deletion tests/suites/test_dml.py → unit/suites/test_dml.py
Original file line number Diff line number Diff line change
@@ -11,7 +11,7 @@ def setUpClass(self):
print(' DML '.center(70, '='))
print('-' * 70)
self.srv = TarantoolServer()
self.srv.script = 'tests/suites/box.lua'
self.srv.script = 'unit/suites/box.lua'
self.srv.start()
self.con = tarantool.Connection('localhost', self.srv.args['primary'])
self.adm = self.srv.admin
File renamed without changes.
2 changes: 1 addition & 1 deletion tests/suites/test_schema.py → unit/suites/test_schema.py
Original file line number Diff line number Diff line change
@@ -10,7 +10,7 @@ def setUpClass(self):
print(' SCHEMA '.center(70, '='))
print('-' * 70)
self.srv = TarantoolServer()
self.srv.script = 'tests/suites/box.lua'
self.srv.script = 'unit/suites/box.lua'
self.srv.start()
self.con = tarantool.Connection('localhost', self.srv.args['primary'])
self.sch = self.con.schema