From 060eba3db3fa6379d04b561a77fca8522b9792bf Mon Sep 17 00:00:00 2001 From: Gero Posmyk-Leinemann Date: Fri, 16 Sep 2022 17:08:08 +0000 Subject: [PATCH] [db-sync] Fix it --- components/ee/db-sync/src/replication.ts | 152 ++++++++++++++--------- components/gitpod-db/src/tables.ts | 2 +- 2 files changed, 96 insertions(+), 58 deletions(-) diff --git a/components/ee/db-sync/src/replication.ts b/components/ee/db-sync/src/replication.ts index a4b140f7171297..935ce138f08a84 100644 --- a/components/ee/db-sync/src/replication.ts +++ b/components/ee/db-sync/src/replication.ts @@ -9,11 +9,16 @@ import { TableUpdateProvider } from "./export"; import * as ProgressBar from "progress"; import { query, NamedConnection } from "./database"; import { injectable, inject } from "inversify"; -import * as path from 'path'; -import * as fs from 'fs'; -import { Semaphore } from '@gitpod/gitpod-protocol/lib/util/semaphore' - -export type PeriodicReplicatorProvider = (source: Connection, targets: Connection[], syncInterval: number, tableSet: string) => PeriodicReplicator +import * as path from "path"; +import * as fs from "fs"; +import { Semaphore } from "@gitpod/gitpod-protocol/lib/util/semaphore"; + +export type PeriodicReplicatorProvider = ( + source: Connection, + targets: Connection[], + syncInterval: number, + tableSet: string, +) => PeriodicReplicator; export const PeriodicReplicatorProvider = Symbol("PeriodicReplicatorProvider"); @injectable() @@ -31,7 +36,12 @@ export class PeriodicReplicator { // This is a weird setup and I'd rather have those fields set in the constructor. // I have not found a way how to do that using inversify. - public setup(source: NamedConnection, targets: NamedConnection[], syncInterval: number, tableSet: string | undefined) { + public setup( + source: NamedConnection, + targets: NamedConnection[], + syncInterval: number, + tableSet: string | undefined, + ) { this.source = source; this.targets = targets; this.syncInterval = syncInterval; @@ -63,29 +73,37 @@ export class PeriodicReplicator { const now = new Date(); let previousRun = await this.getLastExportDate(); console.info(`Replicating ${this.toString()}: last ran on ${previousRun}`); - if(ignoreStartDate) { - if(previousRun && previousRun > now) { - console.warn(`Previous run was in the future (${previousRun} > now=${now}). Possible time sync issue between database and db-sync.`); + if (ignoreStartDate) { + if (previousRun && previousRun > now) { + console.warn( + `Previous run was in the future (${previousRun} > now=${now}). Possible time sync issue between database and db-sync.`, + ); } console.info("Synchronizing complete database (ignoring previous run)"); previousRun = undefined; - } else if(previousRun && previousRun > now) { - throw new Error(`Previous run was in the future (${previousRun} > now=${now}). Possible time sync issue between database and db-sync.`); + } else if (previousRun && previousRun > now) { + throw new Error( + `Previous run was in the future (${previousRun} > now=${now}). Possible time sync issue between database and db-sync.`, + ); } - const modifications = await this.tableUpdateProvider.getAllStatementsForAllTables(this.source, this.tableSet, previousRun); + const modifications = await this.tableUpdateProvider.getAllStatementsForAllTables( + this.source, + this.tableSet, + previousRun, + ); const deletions = modifications.deletions; const updates = modifications.updates; const total = [...deletions, ...updates]; - console.debug(`Collected ${total.length} statements`) + console.debug(`Collected ${total.length} statements`); try { /* nowait */ this.logStatements(now, total); - await Promise.all([ this.source, ...this.targets ].map(target => this.update(target, deletions))); - await Promise.all(this.targets.map(target => this.update(target, updates))); + await Promise.all([this.source, ...this.targets].map((target) => this.update(target, deletions))); + await Promise.all(this.targets.map((target) => this.update(target, updates))); await this.markLastExportDate(now); - } catch(err) { + } catch (err) { console.error("Error during replication", err); } } @@ -94,7 +112,7 @@ export class PeriodicReplicator { if (!this.logdir) return; const logdir = this.logdir; - const dest = path.join(logdir, `${this.tableSet || 'default'}-${now.getTime()}.sql`); + const dest = path.join(logdir, `${this.tableSet || "default"}-${now.getTime()}.sql`); try { if (!(await new Promise((resolve, reject) => fs.exists(logdir, resolve)))) { await new Promise((resolve, reject) => { @@ -105,12 +123,12 @@ export class PeriodicReplicator { resolve(); } }); - }) + }); } - const logfile = fs.createWriteStream(dest, { flags: 'w' }); + const logfile = fs.createWriteStream(dest, { flags: "w" }); const semaphore = new Semaphore(1); - logfile.on('drain', () => semaphore.release()); + logfile.on("drain", () => semaphore.release()); for (const row of updates) { const written = logfile.write(row + "\n"); if (!written) { @@ -118,7 +136,7 @@ export class PeriodicReplicator { } } console.debug(`Log file ${dest} written`); - } catch(err) { + } catch (err) { console.warn("Error while writing log file to " + dest, err); } @@ -127,23 +145,27 @@ export class PeriodicReplicator { protected async deleteOldLogs(logdir: string) { try { - const files = await new Promise((resolve, reject) => fs.readdir(this.logdir!, (err: any, files: string[]) => { - if (err) { - reject(err); - } else { - resolve(files) - } - })); + const files = await new Promise((resolve, reject) => + fs.readdir(this.logdir!, (err: any, files: string[]) => { + if (err) { + reject(err); + } else { + resolve(files); + } + }), + ); for (const file of files) { // We don't care about errors during deletion: it's racy anyway (see "nowait" above), and will succeed next time const filePath = path.join(logdir, file); - const ctime = await new Promise((resolve, reject) => fs.stat(filePath, (err, stats) => { - if (!err) { - resolve(stats.ctimeMs); - } - })); + const ctime = await new Promise((resolve, reject) => + fs.stat(filePath, (err, stats) => { + if (!err) { + resolve(stats.ctimeMs); + } + }), + ); const now = Date.now(); - const endTime = ctime + (2 * 24 * 60 * 60); + const endTime = ctime + 2 * 24 * 60 * 60; if (now > endTime) { fs.unlink(filePath, (_) => {}); } @@ -155,13 +177,16 @@ export class PeriodicReplicator { protected async getLastExportDate(): Promise { try { - const rows = await query(this.source, "SELECT value FROM gitpod_replication WHERE item = 'lastExport'") as any[]; - if(rows.length > 0) { - return new Date(rows[0]['value'] as string); + const rows = (await query( + this.source, + "SELECT value FROM gitpod_replication WHERE item = 'lastExport'", + )) as any[]; + if (rows.length > 0) { + return new Date(rows[0]["value"] as string); } return undefined; - } catch(err) { - if(err.toString().indexOf("ER_NO_SUCH_TABLE") > -1) { + } catch (err) { + if (err.toString().indexOf("ER_NO_SUCH_TABLE") > -1) { return undefined; } else { throw err; @@ -170,32 +195,39 @@ export class PeriodicReplicator { } protected async update(target: Connection, updates: string[], batchSize = 100) { - const updateSize = updates.join().length; - const inTransaction = updateSize < (8 * 1024 * 1024) && this.useTransactions; - if(inTransaction) { - await query(target, "START TRANSACTION;"); - } else { - console.warn("Update is too big (> 8mb) or transactions are disabled, not running in a transaction! Inconsistency is possible."); - } - - const bar = this.showProgressBar && (updates.length > batchSize) ? new ProgressBar('inserting/updating [:bar] :rate/bps :percent :etas', updates.length / batchSize) : { tick: () => {}, terminate: () => {} }; + // const updateSize = updates.join().length; + // const inTransaction = updateSize < (8 * 1024 * 1024) && this.useTransactions; + // if(inTransaction) { + // await query(target, "START TRANSACTION;"); + // } else { + // console.warn("Update is too big (> 8mb) or transactions are disabled, not running in a transaction! Inconsistency is possible."); + // } + const inTransaction = false; + + const bar = + this.showProgressBar && updates.length > batchSize + ? new ProgressBar("inserting/updating [:bar] :rate/bps :percent :etas", updates.length / batchSize) + : { tick: () => {}, terminate: () => {} }; try { - for(var i = 0; i < updates.length; i += batchSize) { + for (var i = 0; i < updates.length; i += batchSize) { const imax = Math.min(i + batchSize, updates.length); const thisUpdate = updates.slice(i, imax).join(""); await query(target, thisUpdate); bar.tick(); } - if(inTransaction) { + if (inTransaction) { console.debug("Modifications were OK. Committing transaction."); await query(target, "COMMIT;"); } - } catch(err) { - if(inTransaction) { + } catch (err) { + if (inTransaction) { console.error("Caught an error during modification. Rolling back transaction.", err); await query(target, "ROLLBACK;"); } else { - console.error("Caught an error during modification. NOT RUNNING IN A TRANSACTION. Data may be inconsistent.", err); + console.error( + "Caught an error during modification. NOT RUNNING IN A TRANSACTION. Data may be inconsistent.", + err, + ); } throw err; } finally { @@ -204,12 +236,18 @@ export class PeriodicReplicator { } async markLastExportDate(date: Date) { - await query(this.source, "CREATE TABLE IF NOT EXISTS gitpod_replication (item VARCHAR(36), value VARCHAR(255), PRIMARY KEY (item))"); - await query(this.source, "INSERT INTO gitpod_replication VALUES ('lastExport', ?) ON DUPLICATE KEY UPDATE value=?", { values: [ date.toISOString(), date.toISOString() ] }); + await query( + this.source, + "CREATE TABLE IF NOT EXISTS gitpod_replication (item VARCHAR(36), value VARCHAR(255), PRIMARY KEY (item))", + ); + await query( + this.source, + "INSERT INTO gitpod_replication VALUES ('lastExport', ?) ON DUPLICATE KEY UPDATE value=?", + { values: [date.toISOString(), date.toISOString()] }, + ); } public toString(): string { - return `${this.source.name} -> [ ${this.targets.map(t => t.name).join(', ')} ]`; + return `${this.source.name} -> [ ${this.targets.map((t) => t.name).join(", ")} ]`; } - } diff --git a/components/gitpod-db/src/tables.ts b/components/gitpod-db/src/tables.ts index 63a29786653571..c7b1fec27ed87a 100644 --- a/components/gitpod-db/src/tables.ts +++ b/components/gitpod-db/src/tables.ts @@ -282,7 +282,7 @@ export class GitpodTableDescriptionProvider implements TableDescriptionProvider }, { name: "d_b_cost_center", - primaryKeys: ["id"], + primaryKeys: ["id", "creationTime"], deletionColumn: "deleted", timeColumn: "_lastModified", },