Skip to content

Commit 21190e9

Browse files
Implement a test for connection lost
1 parent 31da057 commit 21190e9

File tree

3 files changed

+38
-9
lines changed

3 files changed

+38
-9
lines changed

TODO

+1
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,4 @@
44
* MySQL 5.6 support: http://dev.mysql.com/doc/internals/en/row-based-replication.html#rows-event
55
* Support binlog_row_image=minimal or binlog_row_image=noblob
66
* Support connection lost
7+
* Test log file change

pymysqlreplication/binlogstream.py

+13-9
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ def __init__(self, connection_settings = {}, resume_stream = False, blocking = F
1818
only_events: Array of allowed events
1919
'''
2020
connection_settings['charset'] = 'utf8'
21-
self.__stream_connection = pymysql.connect(**connection_settings)
21+
self._stream_connection = pymysql.connect(**connection_settings)
2222
ctl_connection_settings = copy.copy(connection_settings)
2323
ctl_connection_settings['db'] = 'information_schema'
2424
ctl_connection_settings['cursorclass'] = pymysql.cursors.DictCursor
@@ -28,16 +28,17 @@ def __init__(self, connection_settings = {}, resume_stream = False, blocking = F
2828
self.__blocking = blocking
2929
self.__only_events = only_events
3030
self.__server_id = server_id
31+
self.__log_pos = None
3132

3233
#Store table meta informations
3334
self.table_map = {}
3435

3536
def close(self):
36-
self.__stream_connection.close()
37+
self._stream_connection.close()
3738
self.__ctl_connection.close()
3839

3940
def __connect_to_stream(self):
40-
cur = self.__stream_connection.cursor()
41+
cur = self._stream_connection.cursor()
4142
cur.execute("SHOW MASTER STATUS")
4243
(log_file, log_pos) = cur.fetchone()[:2]
4344
cur.close()
@@ -50,24 +51,27 @@ def __connect_to_stream(self):
5051
command = COM_BINLOG_DUMP
5152
prelude = struct.pack('<i', len(log_file) + 11) \
5253
+ int2byte(command)
53-
if self.__resume_stream:
54-
prelude += struct.pack('<I', log_pos)
54+
if self.__log_pos is None:
55+
if self.__resume_stream:
56+
prelude += struct.pack('<I', log_pos)
57+
else:
58+
prelude += struct.pack('<I', 4)
5559
else:
56-
prelude += struct.pack('<I', 4)
60+
prelude += struct.pack('<I', self.__log_pos)
5761
if self.__blocking:
5862
prelude += struct.pack('<h', 0)
5963
else:
6064
prelude += struct.pack('<h', 1)
6165
prelude += struct.pack('<I', self.__server_id)
62-
self.__stream_connection.wfile.write(prelude + log_file.encode())
63-
self.__stream_connection.wfile.flush()
66+
self._stream_connection.wfile.write(prelude + log_file.encode())
67+
self._stream_connection.wfile.flush()
6468
self.__connected = True
6569

6670
def fetchone(self):
6771
if self.__connected == False:
6872
self.__connect_to_stream()
6973
while True:
70-
pkt = self.__stream_connection.read_packet()
74+
pkt = self._stream_connection.read_packet()
7175
if not pkt.is_ok_packet():
7276
return None
7377
binlog_event = BinLogPacketWrapper(pkt, self.table_map, self.__ctl_connection)

pymysqlreplication/tests/test_basic.py

+24
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,30 @@ def test_read_query_event(self):
1919
self.assertIsInstance(event, QueryEvent)
2020
self.assertEqual(event.query, query)
2121

22+
def test_connection_lost_event(self):
23+
self.stream.close()
24+
self.stream = BinLogStreamReader(connection_settings = self.database, blocking = True)
25+
26+
query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))"
27+
self.execute(query)
28+
query2 = "INSERT INTO test (data) VALUES('a')";
29+
for i in range(0, 1000):
30+
self.execute(query2)
31+
32+
#RotateEvent
33+
self.stream.fetchone()
34+
35+
self.conn_control.kill(self.stream._stream_connection.thread_id())
36+
#FormatDescription
37+
self.stream.fetchone()
38+
39+
event = self.stream.fetchone()
40+
self.assertIsInstance(event, QueryEvent)
41+
self.assertEqual(event.query, query)
42+
for i in range(0, 1000):
43+
event = self.stream.fetchone()
44+
self.assertEqual(event.query, query2)
45+
2246
def test_filtering_events(self):
2347
self.stream.close()
2448
self.stream = BinLogStreamReader(connection_settings = self.database, only_events = [QueryEvent])

0 commit comments

Comments
 (0)