@@ -13,55 +13,81 @@ class RandomAggregatorPool:
13
13
aggregators by periodically calling `SHOW AGGREGATORS`.
14
14
"""
15
15
16
- def __init__ (self ):
16
+ def __init__ (self , host , port , user = 'root' , password = '' , database = 'information_schema' ):
17
+ """ Initialize the RandomAggregatorPool with connection
18
+ information for an aggregator in a MemSQL Distributed System.
19
+
20
+ All aggregator connections will share the same user/password/database.
21
+ """
17
22
self .logger = logging .getLogger ('memsql.random_aggregator_pool' )
18
23
self ._pool = ConnectionPool ()
19
- self ._aggregators = []
20
- self ._aggregator = None
21
24
self ._refresh_aggregator_list = memoize (30 )(self ._update_aggregator_list )
22
25
self ._lock = threading .Lock ()
23
26
24
- def connect (self , host , port , user , password , database ):
25
- conn = self ._connect (host , port , user , password , database )
27
+ self ._primary_aggregator = (host , port )
28
+ self ._user = user
29
+ self ._password = password
30
+ self ._database = database
31
+ self ._aggregators = []
32
+ self ._aggregator = None
33
+
34
+ def connect (self ):
35
+ """ Returns an aggregator connection, and periodically updates the aggregator list. """
36
+ conn = self ._connect ()
26
37
self ._refresh_aggregator_list (conn )
27
38
return conn
28
39
29
- def _connect (self , host , port , user , password , database ):
40
+ def _pool_connect (self , agg ):
41
+ """ `agg` should be (host, port)
42
+ Returns a live connection from the connection pool
43
+ """
44
+ return self ._pool .connect (agg [0 ], agg [1 ], self ._user , self ._password , self ._database )
45
+
46
+ def _connect (self ):
47
+ """ Returns an aggregator connection. """
30
48
if self ._aggregator :
31
49
try :
32
- return self ._pool . connect (self ._aggregator [ 0 ], self . _aggregator [ 1 ], user , password , database )
50
+ return self ._pool_connect (self ._aggregator )
33
51
except PoolConnectionException :
34
52
self ._aggregator = None
35
- pass
36
53
37
- if not self ._aggregators :
38
- with self ._pool . connect ( host , port , user , password , database ) as conn :
54
+ if not len ( self ._aggregators ) :
55
+ with self ._pool_connect ( self . _primary_aggregator ) as conn :
39
56
self ._update_aggregator_list (conn )
40
57
conn .expire ()
41
58
42
- assert self ._aggregators , "Failed to retrieve a list of aggregators"
59
+ with self ._lock :
60
+ random .shuffle (self ._aggregators )
43
61
44
- # we will run a few attempts of connecting to an
45
- # aggregator
46
62
last_exception = None
47
- for i in range (10 ):
48
- with self ._lock :
49
- self ._aggregator = random .choice (self ._aggregators )
50
-
51
- self .logger .debug ('Connecting to %s:%s' % (self ._aggregator [0 ], self ._aggregator [1 ]))
63
+ for aggregator in self ._aggregators :
64
+ self .logger .debug ('Attempting connection with %s:%s' % (aggregator [0 ], aggregator [1 ]))
52
65
53
66
try :
54
- return self ._pool .connect (self ._aggregator [0 ], self ._aggregator [1 ], user , password , database )
67
+ conn = self ._pool_connect (aggregator )
68
+ # connection successful!
69
+ self ._aggregator = aggregator
70
+ return conn
55
71
except PoolConnectionException as e :
72
+ # connection error
56
73
last_exception = e
57
74
else :
58
75
with self ._lock :
76
+ # bad news bears... try again later
59
77
self ._aggregator = None
60
78
self ._aggregators = []
61
79
62
80
raise last_exception
63
81
64
82
def _update_aggregator_list (self , conn ):
83
+ rows = conn .query ('SHOW AGGREGATORS' )
65
84
with self ._lock :
66
- self ._aggregators = [(r ['Host' ], r ['Port' ]) for r in conn .query ('SHOW AGGREGATORS' )]
67
- self .logger .debug ('Aggregator list is updated to %s. Current aggregator is %s.' % (self ._aggregators , self ._aggregator ))
85
+ self ._aggregators = []
86
+ for row in rows :
87
+ if row .Host == '127.0.0.1' :
88
+ # this is the aggregator we are connecting to
89
+ row ['Host' ] = conn .connection_info ()[0 ]
90
+ self ._aggregators .append ((row .Host , row .Port ))
91
+
92
+ assert self ._aggregators , "Failed to retrieve a list of aggregators"
93
+ self .logger .debug ('Aggregator list is updated to %s. Current aggregator is %s.' % (self ._aggregators , self ._aggregator ))
0 commit comments