@@ -9,11 +9,16 @@ import { TableUpdateProvider } from "./export";
9
9
import * as ProgressBar from "progress" ;
10
10
import { query , NamedConnection } from "./database" ;
11
11
import { injectable , inject } from "inversify" ;
12
- import * as path from 'path' ;
13
- import * as fs from 'fs' ;
14
- import { Semaphore } from '@gitpod/gitpod-protocol/lib/util/semaphore'
15
-
16
- export type PeriodicReplicatorProvider = ( source : Connection , targets : Connection [ ] , syncInterval : number , tableSet : string ) => PeriodicReplicator
12
+ import * as path from "path" ;
13
+ import * as fs from "fs" ;
14
+ import { Semaphore } from "@gitpod/gitpod-protocol/lib/util/semaphore" ;
15
+
16
+ export type PeriodicReplicatorProvider = (
17
+ source : Connection ,
18
+ targets : Connection [ ] ,
19
+ syncInterval : number ,
20
+ tableSet : string ,
21
+ ) => PeriodicReplicator ;
17
22
export const PeriodicReplicatorProvider = Symbol ( "PeriodicReplicatorProvider" ) ;
18
23
19
24
@injectable ( )
@@ -31,7 +36,12 @@ export class PeriodicReplicator {
31
36
32
37
// This is a weird setup and I'd rather have those fields set in the constructor.
33
38
// I have not found a way how to do that using inversify.
34
- public setup ( source : NamedConnection , targets : NamedConnection [ ] , syncInterval : number , tableSet : string | undefined ) {
39
+ public setup (
40
+ source : NamedConnection ,
41
+ targets : NamedConnection [ ] ,
42
+ syncInterval : number ,
43
+ tableSet : string | undefined ,
44
+ ) {
35
45
this . source = source ;
36
46
this . targets = targets ;
37
47
this . syncInterval = syncInterval ;
@@ -63,29 +73,37 @@ export class PeriodicReplicator {
63
73
const now = new Date ( ) ;
64
74
let previousRun = await this . getLastExportDate ( ) ;
65
75
console . info ( `Replicating ${ this . toString ( ) } : last ran on ${ previousRun } ` ) ;
66
- if ( ignoreStartDate ) {
67
- if ( previousRun && previousRun > now ) {
68
- console . warn ( `Previous run was in the future (${ previousRun } > now=${ now } ). Possible time sync issue between database and db-sync.` ) ;
76
+ if ( ignoreStartDate ) {
77
+ if ( previousRun && previousRun > now ) {
78
+ console . warn (
79
+ `Previous run was in the future (${ previousRun } > now=${ now } ). Possible time sync issue between database and db-sync.` ,
80
+ ) ;
69
81
}
70
82
71
83
console . info ( "Synchronizing complete database (ignoring previous run)" ) ;
72
84
previousRun = undefined ;
73
- } else if ( previousRun && previousRun > now ) {
74
- throw new Error ( `Previous run was in the future (${ previousRun } > now=${ now } ). Possible time sync issue between database and db-sync.` ) ;
85
+ } else if ( previousRun && previousRun > now ) {
86
+ throw new Error (
87
+ `Previous run was in the future (${ previousRun } > now=${ now } ). Possible time sync issue between database and db-sync.` ,
88
+ ) ;
75
89
}
76
90
77
- const modifications = await this . tableUpdateProvider . getAllStatementsForAllTables ( this . source , this . tableSet , previousRun ) ;
91
+ const modifications = await this . tableUpdateProvider . getAllStatementsForAllTables (
92
+ this . source ,
93
+ this . tableSet ,
94
+ previousRun ,
95
+ ) ;
78
96
const deletions = modifications . deletions ;
79
97
const updates = modifications . updates ;
80
98
const total = [ ...deletions , ...updates ] ;
81
- console . debug ( `Collected ${ total . length } statements` )
99
+ console . debug ( `Collected ${ total . length } statements` ) ;
82
100
try {
83
101
/* nowait */ this . logStatements ( now , total ) ;
84
102
85
- await Promise . all ( [ this . source , ...this . targets ] . map ( target => this . update ( target , deletions ) ) ) ;
86
- await Promise . all ( this . targets . map ( target => this . update ( target , updates ) ) ) ;
103
+ await Promise . all ( [ this . source , ...this . targets ] . map ( ( target ) => this . update ( target , deletions ) ) ) ;
104
+ await Promise . all ( this . targets . map ( ( target ) => this . update ( target , updates ) ) ) ;
87
105
await this . markLastExportDate ( now ) ;
88
- } catch ( err ) {
106
+ } catch ( err ) {
89
107
console . error ( "Error during replication" , err ) ;
90
108
}
91
109
}
@@ -94,7 +112,7 @@ export class PeriodicReplicator {
94
112
if ( ! this . logdir ) return ;
95
113
96
114
const logdir = this . logdir ;
97
- const dest = path . join ( logdir , `${ this . tableSet || ' default' } -${ now . getTime ( ) } .sql` ) ;
115
+ const dest = path . join ( logdir , `${ this . tableSet || " default" } -${ now . getTime ( ) } .sql` ) ;
98
116
try {
99
117
if ( ! ( await new Promise < boolean > ( ( resolve , reject ) => fs . exists ( logdir , resolve ) ) ) ) {
100
118
await new Promise < void > ( ( resolve , reject ) => {
@@ -105,20 +123,20 @@ export class PeriodicReplicator {
105
123
resolve ( ) ;
106
124
}
107
125
} ) ;
108
- } )
126
+ } ) ;
109
127
}
110
128
111
- const logfile = fs . createWriteStream ( dest , { flags : 'w' } ) ;
129
+ const logfile = fs . createWriteStream ( dest , { flags : "w" } ) ;
112
130
const semaphore = new Semaphore ( 1 ) ;
113
- logfile . on ( ' drain' , ( ) => semaphore . release ( ) ) ;
131
+ logfile . on ( " drain" , ( ) => semaphore . release ( ) ) ;
114
132
for ( const row of updates ) {
115
133
const written = logfile . write ( row + "\n" ) ;
116
134
if ( ! written ) {
117
135
await semaphore . acquire ( ) ;
118
136
}
119
137
}
120
138
console . debug ( `Log file ${ dest } written` ) ;
121
- } catch ( err ) {
139
+ } catch ( err ) {
122
140
console . warn ( "Error while writing log file to " + dest , err ) ;
123
141
}
124
142
@@ -127,23 +145,27 @@ export class PeriodicReplicator {
127
145
128
146
protected async deleteOldLogs ( logdir : string ) {
129
147
try {
130
- const files = await new Promise < string [ ] > ( ( resolve , reject ) => fs . readdir ( this . logdir ! , ( err : any , files : string [ ] ) => {
131
- if ( err ) {
132
- reject ( err ) ;
133
- } else {
134
- resolve ( files )
135
- }
136
- } ) ) ;
148
+ const files = await new Promise < string [ ] > ( ( resolve , reject ) =>
149
+ fs . readdir ( this . logdir ! , ( err : any , files : string [ ] ) => {
150
+ if ( err ) {
151
+ reject ( err ) ;
152
+ } else {
153
+ resolve ( files ) ;
154
+ }
155
+ } ) ,
156
+ ) ;
137
157
for ( const file of files ) {
138
158
// We don't care about errors during deletion: it's racy anyway (see "nowait" above), and will succeed next time
139
159
const filePath = path . join ( logdir , file ) ;
140
- const ctime = await new Promise < number > ( ( resolve , reject ) => fs . stat ( filePath , ( err , stats ) => {
141
- if ( ! err ) {
142
- resolve ( stats . ctimeMs ) ;
143
- }
144
- } ) ) ;
160
+ const ctime = await new Promise < number > ( ( resolve , reject ) =>
161
+ fs . stat ( filePath , ( err , stats ) => {
162
+ if ( ! err ) {
163
+ resolve ( stats . ctimeMs ) ;
164
+ }
165
+ } ) ,
166
+ ) ;
145
167
const now = Date . now ( ) ;
146
- const endTime = ctime + ( 2 * 24 * 60 * 60 ) ;
168
+ const endTime = ctime + 2 * 24 * 60 * 60 ;
147
169
if ( now > endTime ) {
148
170
fs . unlink ( filePath , ( _ ) => { } ) ;
149
171
}
@@ -155,13 +177,16 @@ export class PeriodicReplicator {
155
177
156
178
protected async getLastExportDate ( ) : Promise < Date | undefined > {
157
179
try {
158
- const rows = await query ( this . source , "SELECT value FROM gitpod_replication WHERE item = 'lastExport'" ) as any [ ] ;
159
- if ( rows . length > 0 ) {
160
- return new Date ( rows [ 0 ] [ 'value' ] as string ) ;
180
+ const rows = ( await query (
181
+ this . source ,
182
+ "SELECT value FROM gitpod_replication WHERE item = 'lastExport'" ,
183
+ ) ) as any [ ] ;
184
+ if ( rows . length > 0 ) {
185
+ return new Date ( rows [ 0 ] [ "value" ] as string ) ;
161
186
}
162
187
return undefined ;
163
- } catch ( err ) {
164
- if ( err . toString ( ) . indexOf ( "ER_NO_SUCH_TABLE" ) > - 1 ) {
188
+ } catch ( err ) {
189
+ if ( err . toString ( ) . indexOf ( "ER_NO_SUCH_TABLE" ) > - 1 ) {
165
190
return undefined ;
166
191
} else {
167
192
throw err ;
@@ -170,32 +195,39 @@ export class PeriodicReplicator {
170
195
}
171
196
172
197
protected async update ( target : Connection , updates : string [ ] , batchSize = 100 ) {
173
- const updateSize = updates . join ( ) . length ;
174
- const inTransaction = updateSize < ( 8 * 1024 * 1024 ) && this . useTransactions ;
175
- if ( inTransaction ) {
176
- await query ( target , "START TRANSACTION;" ) ;
177
- } else {
178
- console . warn ( "Update is too big (> 8mb) or transactions are disabled, not running in a transaction! Inconsistency is possible." ) ;
179
- }
180
-
181
- const bar = this . showProgressBar && ( updates . length > batchSize ) ? new ProgressBar ( 'inserting/updating [:bar] :rate/bps :percent :etas' , updates . length / batchSize ) : { tick : ( ) => { } , terminate : ( ) => { } } ;
198
+ // const updateSize = updates.join().length;
199
+ // const inTransaction = updateSize < (8 * 1024 * 1024) && this.useTransactions;
200
+ // if(inTransaction) {
201
+ // await query(target, "START TRANSACTION;");
202
+ // } else {
203
+ // console.warn("Update is too big (> 8mb) or transactions are disabled, not running in a transaction! Inconsistency is possible.");
204
+ // }
205
+ const inTransaction = false ;
206
+
207
+ const bar =
208
+ this . showProgressBar && updates . length > batchSize
209
+ ? new ProgressBar ( "inserting/updating [:bar] :rate/bps :percent :etas" , updates . length / batchSize )
210
+ : { tick : ( ) => { } , terminate : ( ) => { } } ;
182
211
try {
183
- for ( var i = 0 ; i < updates . length ; i += batchSize ) {
212
+ for ( var i = 0 ; i < updates . length ; i += batchSize ) {
184
213
const imax = Math . min ( i + batchSize , updates . length ) ;
185
214
const thisUpdate = updates . slice ( i , imax ) . join ( "" ) ;
186
215
await query ( target , thisUpdate ) ;
187
216
bar . tick ( ) ;
188
217
}
189
- if ( inTransaction ) {
218
+ if ( inTransaction ) {
190
219
console . debug ( "Modifications were OK. Committing transaction." ) ;
191
220
await query ( target , "COMMIT;" ) ;
192
221
}
193
- } catch ( err ) {
194
- if ( inTransaction ) {
222
+ } catch ( err ) {
223
+ if ( inTransaction ) {
195
224
console . error ( "Caught an error during modification. Rolling back transaction." , err ) ;
196
225
await query ( target , "ROLLBACK;" ) ;
197
226
} else {
198
- console . error ( "Caught an error during modification. NOT RUNNING IN A TRANSACTION. Data may be inconsistent." , err ) ;
227
+ console . error (
228
+ "Caught an error during modification. NOT RUNNING IN A TRANSACTION. Data may be inconsistent." ,
229
+ err ,
230
+ ) ;
199
231
}
200
232
throw err ;
201
233
} finally {
@@ -204,12 +236,18 @@ export class PeriodicReplicator {
204
236
}
205
237
206
238
async markLastExportDate ( date : Date ) {
207
- await query ( this . source , "CREATE TABLE IF NOT EXISTS gitpod_replication (item VARCHAR(36), value VARCHAR(255), PRIMARY KEY (item))" ) ;
208
- await query ( this . source , "INSERT INTO gitpod_replication VALUES ('lastExport', ?) ON DUPLICATE KEY UPDATE value=?" , { values : [ date . toISOString ( ) , date . toISOString ( ) ] } ) ;
239
+ await query (
240
+ this . source ,
241
+ "CREATE TABLE IF NOT EXISTS gitpod_replication (item VARCHAR(36), value VARCHAR(255), PRIMARY KEY (item))" ,
242
+ ) ;
243
+ await query (
244
+ this . source ,
245
+ "INSERT INTO gitpod_replication VALUES ('lastExport', ?) ON DUPLICATE KEY UPDATE value=?" ,
246
+ { values : [ date . toISOString ( ) , date . toISOString ( ) ] } ,
247
+ ) ;
209
248
}
210
249
211
250
public toString ( ) : string {
212
- return `${ this . source . name } -> [ ${ this . targets . map ( t => t . name ) . join ( ', ' ) } ]` ;
251
+ return `${ this . source . name } -> [ ${ this . targets . map ( ( t ) => t . name ) . join ( ", " ) } ]` ;
213
252
}
214
-
215
253
}
0 commit comments