Skip to content

[db-sync] Fix it #13049

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 95 additions & 57 deletions components/ee/db-sync/src/replication.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,16 @@ import { TableUpdateProvider } from "./export";
import * as ProgressBar from "progress";
import { query, NamedConnection } from "./database";
import { injectable, inject } from "inversify";
import * as path from 'path';
import * as fs from 'fs';
import { Semaphore } from '@gitpod/gitpod-protocol/lib/util/semaphore'

export type PeriodicReplicatorProvider = (source: Connection, targets: Connection[], syncInterval: number, tableSet: string) => PeriodicReplicator
import * as path from "path";
import * as fs from "fs";
import { Semaphore } from "@gitpod/gitpod-protocol/lib/util/semaphore";

export type PeriodicReplicatorProvider = (
source: Connection,
targets: Connection[],
syncInterval: number,
tableSet: string,
) => PeriodicReplicator;
export const PeriodicReplicatorProvider = Symbol("PeriodicReplicatorProvider");

@injectable()
Expand All @@ -31,7 +36,12 @@ export class PeriodicReplicator {

// This is a weird setup and I'd rather have those fields set in the constructor.
// I have not found a way how to do that using inversify.
public setup(source: NamedConnection, targets: NamedConnection[], syncInterval: number, tableSet: string | undefined) {
public setup(
source: NamedConnection,
targets: NamedConnection[],
syncInterval: number,
tableSet: string | undefined,
) {
this.source = source;
this.targets = targets;
this.syncInterval = syncInterval;
Expand Down Expand Up @@ -63,29 +73,37 @@ export class PeriodicReplicator {
const now = new Date();
let previousRun = await this.getLastExportDate();
console.info(`Replicating ${this.toString()}: last ran on ${previousRun}`);
if(ignoreStartDate) {
if(previousRun && previousRun > now) {
console.warn(`Previous run was in the future (${previousRun} > now=${now}). Possible time sync issue between database and db-sync.`);
if (ignoreStartDate) {
if (previousRun && previousRun > now) {
console.warn(
`Previous run was in the future (${previousRun} > now=${now}). Possible time sync issue between database and db-sync.`,
);
}

console.info("Synchronizing complete database (ignoring previous run)");
previousRun = undefined;
} else if(previousRun && previousRun > now) {
throw new Error(`Previous run was in the future (${previousRun} > now=${now}). Possible time sync issue between database and db-sync.`);
} else if (previousRun && previousRun > now) {
throw new Error(
`Previous run was in the future (${previousRun} > now=${now}). Possible time sync issue between database and db-sync.`,
);
}

const modifications = await this.tableUpdateProvider.getAllStatementsForAllTables(this.source, this.tableSet, previousRun);
const modifications = await this.tableUpdateProvider.getAllStatementsForAllTables(
this.source,
this.tableSet,
previousRun,
);
const deletions = modifications.deletions;
const updates = modifications.updates;
const total = [...deletions, ...updates];
console.debug(`Collected ${total.length} statements`)
console.debug(`Collected ${total.length} statements`);
try {
/* nowait */ this.logStatements(now, total);

await Promise.all([ this.source, ...this.targets ].map(target => this.update(target, deletions)));
await Promise.all(this.targets.map(target => this.update(target, updates)));
await Promise.all([this.source, ...this.targets].map((target) => this.update(target, deletions)));
await Promise.all(this.targets.map((target) => this.update(target, updates)));
await this.markLastExportDate(now);
} catch(err) {
} catch (err) {
console.error("Error during replication", err);
}
}
Expand All @@ -94,7 +112,7 @@ export class PeriodicReplicator {
if (!this.logdir) return;

const logdir = this.logdir;
const dest = path.join(logdir, `${this.tableSet || 'default'}-${now.getTime()}.sql`);
const dest = path.join(logdir, `${this.tableSet || "default"}-${now.getTime()}.sql`);
try {
if (!(await new Promise<boolean>((resolve, reject) => fs.exists(logdir, resolve)))) {
await new Promise<void>((resolve, reject) => {
Expand All @@ -105,20 +123,20 @@ export class PeriodicReplicator {
resolve();
}
});
})
});
}

const logfile = fs.createWriteStream(dest, { flags: 'w' });
const logfile = fs.createWriteStream(dest, { flags: "w" });
const semaphore = new Semaphore(1);
logfile.on('drain', () => semaphore.release());
logfile.on("drain", () => semaphore.release());
for (const row of updates) {
const written = logfile.write(row + "\n");
if (!written) {
await semaphore.acquire();
}
}
console.debug(`Log file ${dest} written`);
} catch(err) {
} catch (err) {
console.warn("Error while writing log file to " + dest, err);
}

Expand All @@ -127,23 +145,27 @@ export class PeriodicReplicator {

protected async deleteOldLogs(logdir: string) {
try {
const files = await new Promise<string[]>((resolve, reject) => fs.readdir(this.logdir!, (err: any, files: string[]) => {
if (err) {
reject(err);
} else {
resolve(files)
}
}));
const files = await new Promise<string[]>((resolve, reject) =>
fs.readdir(this.logdir!, (err: any, files: string[]) => {
if (err) {
reject(err);
} else {
resolve(files);
}
}),
);
for (const file of files) {
// We don't care about errors during deletion: it's racy anyway (see "nowait" above), and will succeed next time
const filePath = path.join(logdir, file);
const ctime = await new Promise<number>((resolve, reject) => fs.stat(filePath, (err, stats) => {
if (!err) {
resolve(stats.ctimeMs);
}
}));
const ctime = await new Promise<number>((resolve, reject) =>
fs.stat(filePath, (err, stats) => {
if (!err) {
resolve(stats.ctimeMs);
}
}),
);
const now = Date.now();
const endTime = ctime + (2 * 24 * 60 * 60);
const endTime = ctime + 2 * 24 * 60 * 60;
if (now > endTime) {
fs.unlink(filePath, (_) => {});
}
Expand All @@ -155,13 +177,16 @@ export class PeriodicReplicator {

protected async getLastExportDate(): Promise<Date | undefined> {
try {
const rows = await query(this.source, "SELECT value FROM gitpod_replication WHERE item = 'lastExport'") as any[];
if(rows.length > 0) {
return new Date(rows[0]['value'] as string);
const rows = (await query(
this.source,
"SELECT value FROM gitpod_replication WHERE item = 'lastExport'",
)) as any[];
if (rows.length > 0) {
return new Date(rows[0]["value"] as string);
}
return undefined;
} catch(err) {
if(err.toString().indexOf("ER_NO_SUCH_TABLE") > -1) {
} catch (err) {
if (err.toString().indexOf("ER_NO_SUCH_TABLE") > -1) {
return undefined;
} else {
throw err;
Expand All @@ -170,32 +195,39 @@ export class PeriodicReplicator {
}

protected async update(target: Connection, updates: string[], batchSize = 100) {
const updateSize = updates.join().length;
const inTransaction = updateSize < (8 * 1024 * 1024) && this.useTransactions;
if(inTransaction) {
await query(target, "START TRANSACTION;");
} else {
console.warn("Update is too big (> 8mb) or transactions are disabled, not running in a transaction! Inconsistency is possible.");
}

const bar = this.showProgressBar && (updates.length > batchSize) ? new ProgressBar('inserting/updating [:bar] :rate/bps :percent :etas', updates.length / batchSize) : { tick: () => {}, terminate: () => {} };
// const updateSize = updates.join().length;
// const inTransaction = updateSize < (8 * 1024 * 1024) && this.useTransactions;
// if(inTransaction) {
// await query(target, "START TRANSACTION;");
// } else {
// console.warn("Update is too big (> 8mb) or transactions are disabled, not running in a transaction! Inconsistency is possible.");
// }
const inTransaction = false;

const bar =
this.showProgressBar && updates.length > batchSize
? new ProgressBar("inserting/updating [:bar] :rate/bps :percent :etas", updates.length / batchSize)
: { tick: () => {}, terminate: () => {} };
try {
for(var i = 0; i < updates.length; i += batchSize) {
for (var i = 0; i < updates.length; i += batchSize) {
const imax = Math.min(i + batchSize, updates.length);
const thisUpdate = updates.slice(i, imax).join("");
await query(target, thisUpdate);
bar.tick();
}
if(inTransaction) {
if (inTransaction) {
console.debug("Modifications were OK. Committing transaction.");
await query(target, "COMMIT;");
}
} catch(err) {
if(inTransaction) {
} catch (err) {
if (inTransaction) {
console.error("Caught an error during modification. Rolling back transaction.", err);
await query(target, "ROLLBACK;");
} else {
console.error("Caught an error during modification. NOT RUNNING IN A TRANSACTION. Data may be inconsistent.", err);
console.error(
"Caught an error during modification. NOT RUNNING IN A TRANSACTION. Data may be inconsistent.",
err,
);
}
throw err;
} finally {
Expand All @@ -204,12 +236,18 @@ export class PeriodicReplicator {
}

async markLastExportDate(date: Date) {
await query(this.source, "CREATE TABLE IF NOT EXISTS gitpod_replication (item VARCHAR(36), value VARCHAR(255), PRIMARY KEY (item))");
await query(this.source, "INSERT INTO gitpod_replication VALUES ('lastExport', ?) ON DUPLICATE KEY UPDATE value=?", { values: [ date.toISOString(), date.toISOString() ] });
await query(
this.source,
"CREATE TABLE IF NOT EXISTS gitpod_replication (item VARCHAR(36), value VARCHAR(255), PRIMARY KEY (item))",
);
await query(
this.source,
"INSERT INTO gitpod_replication VALUES ('lastExport', ?) ON DUPLICATE KEY UPDATE value=?",
{ values: [date.toISOString(), date.toISOString()] },
);
}

public toString(): string {
return `${this.source.name} -> [ ${this.targets.map(t => t.name).join(', ')} ]`;
return `${this.source.name} -> [ ${this.targets.map((t) => t.name).join(", ")} ]`;
}

}
2 changes: 1 addition & 1 deletion components/gitpod-db/src/tables.ts
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ export class GitpodTableDescriptionProvider implements TableDescriptionProvider
},
{
name: "d_b_cost_center",
primaryKeys: ["id"],
primaryKeys: ["id", "creationTime"],
deletionColumn: "deleted",
timeColumn: "_lastModified",
},
Expand Down