Skip to content

Commit 1f3a001

Browse files
Support connection lost
1 parent 21190e9 commit 1f3a001

File tree

3 files changed

+24
-11
lines changed

3 files changed

+24
-11
lines changed

TODO

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,5 @@
33
* Test transaction support
44
* MySQL 5.6 support: http://dev.mysql.com/doc/internals/en/row-based-replication.html#rows-event
55
* Support binlog_row_image=minimal or binlog_row_image=noblob
6-
* Support connection lost
6+
* Raise exception if too much connection lost
77
* Test log file change

pymysqlreplication/binlogstream.py

+17-7
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@ def __init__(self, connection_settings = {}, resume_stream = False, blocking = F
1717
blocking: Read on stream is blocking
1818
only_events: Array of allowed events
1919
'''
20-
connection_settings['charset'] = 'utf8'
21-
self._stream_connection = pymysql.connect(**connection_settings)
22-
ctl_connection_settings = copy.copy(connection_settings)
20+
self.__connection_settings = connection_settings
21+
self.__connection_settings['charset'] = 'utf8'
22+
ctl_connection_settings = copy.copy(self.__connection_settings)
2323
ctl_connection_settings['db'] = 'information_schema'
2424
ctl_connection_settings['cursorclass'] = pymysql.cursors.DictCursor
2525
self.__ctl_connection = pymysql.connect(**ctl_connection_settings)
@@ -34,10 +34,13 @@ def __init__(self, connection_settings = {}, resume_stream = False, blocking = F
3434
self.table_map = {}
3535

3636
def close(self):
37-
self._stream_connection.close()
37+
if self.__connected:
38+
self._stream_connection.close()
39+
self.__connected = False
3840
self.__ctl_connection.close()
3941

4042
def __connect_to_stream(self):
43+
self._stream_connection = pymysql.connect(**self.__connection_settings)
4144
cur = self._stream_connection.cursor()
4245
cur.execute("SHOW MASTER STATUS")
4346
(log_file, log_pos) = cur.fetchone()[:2]
@@ -68,17 +71,24 @@ def __connect_to_stream(self):
6871
self.__connected = True
6972

7073
def fetchone(self):
71-
if self.__connected == False:
72-
self.__connect_to_stream()
7374
while True:
74-
pkt = self._stream_connection.read_packet()
75+
if self.__connected == False:
76+
self.__connect_to_stream()
77+
pkt = None
78+
try:
79+
pkt = self._stream_connection.read_packet()
80+
except pymysql.OperationalError as (code, message):
81+
if code == 2013: #2013: Connection Lost
82+
self.__connected = False
83+
continue
7584
if not pkt.is_ok_packet():
7685
return None
7786
binlog_event = BinLogPacketWrapper(pkt, self.table_map, self.__ctl_connection)
7887
if binlog_event.event_type == TABLE_MAP_EVENT:
7988
self.table_map[binlog_event.event.table_id] = binlog_event.event
8089
if self.__filter_event(binlog_event.event):
8190
continue
91+
self.__log_pos = binlog_event.log_pos
8292
return binlog_event.event
8393

8494
def __filter_event(self, event):

pymysqlreplication/tests/test_basic.py

+6-3
Original file line numberDiff line numberDiff line change
@@ -26,22 +26,25 @@ def test_connection_lost_event(self):
2626
query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))"
2727
self.execute(query)
2828
query2 = "INSERT INTO test (data) VALUES('a')";
29-
for i in range(0, 1000):
29+
for i in range(0, 10000):
3030
self.execute(query2)
31+
self.execute("COMMIT")
3132

3233
#RotateEvent
3334
self.stream.fetchone()
3435

35-
self.conn_control.kill(self.stream._stream_connection.thread_id())
36+
3637
#FormatDescription
3738
self.stream.fetchone()
3839

3940
event = self.stream.fetchone()
4041
self.assertIsInstance(event, QueryEvent)
4142
self.assertEqual(event.query, query)
43+
44+
self.conn_control.kill(self.stream._stream_connection.thread_id())
4245
for i in range(0, 1000):
4346
event = self.stream.fetchone()
44-
self.assertEqual(event.query, query2)
47+
self.assertIsNotNone(event)
4548

4649
def test_filtering_events(self):
4750
self.stream.close()

0 commit comments

Comments
 (0)