@@ -78,6 +78,54 @@ class SentinelManagedSSLConnection(SentinelManagedConnection, SSLConnection):
78
78
pass
79
79
80
80
81
+ class SentinelConnectionPoolProxy :
82
+ def __init__ (
83
+ self ,
84
+ connection_pool ,
85
+ is_master ,
86
+ check_connection ,
87
+ service_name ,
88
+ sentinel_manager ,
89
+ ):
90
+ self .connection_pool_ref = weakref .ref (connection_pool )
91
+ self .is_master = is_master
92
+ self .check_connection = check_connection
93
+ self .service_name = service_name
94
+ self .sentinel_manager = sentinel_manager
95
+ self .reset ()
96
+
97
+ def reset (self ):
98
+ self .master_address = None
99
+ self .slave_rr_counter = None
100
+
101
+ def get_master_address (self ):
102
+ master_address = self .sentinel_manager .discover_master (self .service_name )
103
+ if self .is_master and self .master_address != master_address :
104
+ self .master_address = master_address
105
+ # disconnect any idle connections so that they reconnect
106
+ # to the new master the next time that they are used.
107
+ connection_pool = self .connection_pool_ref ()
108
+ if connection_pool is not None :
109
+ connection_pool .disconnect (inuse_connections = False )
110
+ return master_address
111
+
112
+ def rotate_slaves (self ):
113
+ slaves = self .sentinel_manager .discover_slaves (self .service_name )
114
+ if slaves :
115
+ if self .slave_rr_counter is None :
116
+ self .slave_rr_counter = random .randint (0 , len (slaves ) - 1 )
117
+ for _ in range (len (slaves )):
118
+ self .slave_rr_counter = (self .slave_rr_counter + 1 ) % len (slaves )
119
+ slave = slaves [self .slave_rr_counter ]
120
+ yield slave
121
+ # Fallback to the master connection
122
+ try :
123
+ yield self .get_master_address ()
124
+ except MasterNotFoundError :
125
+ pass
126
+ raise SlaveNotFoundError (f"No slave found for { self .service_name !r} " )
127
+
128
+
81
129
class SentinelConnectionPool (ConnectionPool ):
82
130
"""
83
131
Sentinel backed connection pool.
@@ -95,8 +143,15 @@ def __init__(self, service_name, sentinel_manager, **kwargs):
95
143
)
96
144
self .is_master = kwargs .pop ("is_master" , True )
97
145
self .check_connection = kwargs .pop ("check_connection" , False )
146
+ self .proxy = SentinelConnectionPoolProxy (
147
+ connection_pool = self ,
148
+ is_master = self .is_master ,
149
+ check_connection = self .check_connection ,
150
+ service_name = service_name ,
151
+ sentinel_manager = sentinel_manager ,
152
+ )
98
153
super ().__init__ (** kwargs )
99
- self .connection_kwargs ["connection_pool" ] = weakref .proxy ( self )
154
+ self .connection_kwargs ["connection_pool" ] = self .proxy
100
155
self .service_name = service_name
101
156
self .sentinel_manager = sentinel_manager
102
157
@@ -106,8 +161,11 @@ def __repr__(self):
106
161
107
162
def reset (self ):
108
163
super ().reset ()
109
- self .master_address = None
110
- self .slave_rr_counter = None
164
+ self .proxy .reset ()
165
+
166
+ @property
167
+ def master_address (self ):
168
+ return self .proxy .master_address
111
169
112
170
def owns_connection (self , connection ):
113
171
check = not self .is_master or (
@@ -117,31 +175,11 @@ def owns_connection(self, connection):
117
175
return check and parent .owns_connection (connection )
118
176
119
177
def get_master_address (self ):
120
- master_address = self .sentinel_manager .discover_master (self .service_name )
121
- if self .is_master :
122
- if self .master_address != master_address :
123
- self .master_address = master_address
124
- # disconnect any idle connections so that they reconnect
125
- # to the new master the next time that they are used.
126
- self .disconnect (inuse_connections = False )
127
- return master_address
178
+ return self .proxy .get_master_address ()
128
179
129
180
def rotate_slaves (self ):
130
181
"Round-robin slave balancer"
131
- slaves = self .sentinel_manager .discover_slaves (self .service_name )
132
- if slaves :
133
- if self .slave_rr_counter is None :
134
- self .slave_rr_counter = random .randint (0 , len (slaves ) - 1 )
135
- for _ in range (len (slaves )):
136
- self .slave_rr_counter = (self .slave_rr_counter + 1 ) % len (slaves )
137
- slave = slaves [self .slave_rr_counter ]
138
- yield slave
139
- # Fallback to the master connection
140
- try :
141
- yield self .get_master_address ()
142
- except MasterNotFoundError :
143
- pass
144
- raise SlaveNotFoundError (f"No slave found for { self .service_name !r} " )
182
+ return self .proxy .rotate_slaves ()
145
183
146
184
147
185
class Sentinel (SentinelCommands ):
0 commit comments