@@ -23,7 +23,7 @@ func (b *BinlogSyncer) StartBackup(backupDir string, p Position, timeout time.Du
23
23
return os .OpenFile (path .Join (backupDir , filename ), os .O_CREATE | os .O_WRONLY , 0644 )
24
24
})
25
25
} else {
26
- return b .StartSynchronousBackup (p )
26
+ return b .StartSynchronousBackup (p , timeout )
27
27
}
28
28
}
29
29
@@ -89,7 +89,7 @@ func (b *BinlogSyncer) StartBackupWithHandler(p Position, timeout time.Duration,
89
89
}
90
90
91
91
// StartSynchronousBackup starts the backup process using the SynchronousEventHandler in the BinlogSyncerConfig.
92
- func (b * BinlogSyncer ) StartSynchronousBackup (p Position ) error {
92
+ func (b * BinlogSyncer ) StartSynchronousBackup (p Position , timeout time. Duration ) error {
93
93
if b .cfg .SynchronousEventHandler == nil {
94
94
return errors .New ("SynchronousEventHandler must be set in BinlogSyncerConfig to use StartSynchronousBackup" )
95
95
}
@@ -99,10 +99,25 @@ func (b *BinlogSyncer) StartSynchronousBackup(p Position) error {
99
99
return errors .Trace (err )
100
100
}
101
101
102
+ var ctx context.Context
103
+ var cancel context.CancelFunc
104
+
105
+ if timeout > 0 {
106
+ ctx , cancel = context .WithTimeout (context .Background (), timeout )
107
+ defer cancel ()
108
+ } else {
109
+ ctx = context .Background ()
110
+ }
111
+
102
112
select {
113
+ case <- ctx .Done ():
114
+ // The timeout has been reached
115
+ return nil
103
116
case <- b .ctx .Done ():
117
+ // The BinlogSyncer has been closed
104
118
return nil
105
119
case err := <- s .ech :
120
+ // An error occurred during streaming
106
121
return errors .Trace (err )
107
122
}
108
123
}
0 commit comments