Skip to content

Commit 6441a49

Browse files
authored
Consistently use the project's defaultDatabase for run caching metadata. (#1031)
1 parent 661d69f commit 6441a49

File tree

5 files changed

+32
-13
lines changed

5 files changed

+32
-13
lines changed

api/commands/build.ts

+6-1
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,12 @@ export async function build(
5656
return new Builder(
5757
prunedGraph,
5858
runConfig,
59-
await state(dbadapter, Array.from(allInvolvedTargets), runConfig.useRunCache),
59+
await state(
60+
compiledGraph.projectConfig.defaultDatabase,
61+
dbadapter,
62+
Array.from(allInvolvedTargets),
63+
runConfig.useRunCache
64+
),
6065
transitiveInputsByTarget
6166
).build();
6267
}

api/commands/run.ts

+1
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@ export class Runner {
169169
if (this.graph.runConfig && this.graph.runConfig.useRunCache) {
170170
await Promise.all(this.metadataReadPromises);
171171
await this.dbadapter.persistStateMetadata(
172+
this.graph.projectConfig.defaultDatabase,
172173
new StringifiedMap<
173174
dataform.ITarget,
174175
dataform.PersistedTableMetadata.ITransitiveInputMetadata

api/commands/state.ts

+2-1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { IDbAdapter } from "df/api/dbadapters";
22
import { dataform } from "df/protos/ts";
33

44
export async function state(
5+
defaultDatabase: string,
56
dbadapter: IDbAdapter,
67
targets: dataform.ITarget[],
78
fetchPersistedMetadata: boolean
@@ -17,7 +18,7 @@ export async function state(
1718

1819
if (fetchPersistedMetadata) {
1920
try {
20-
cachedStates = await dbadapter.persistedStateMetadata();
21+
cachedStates = await dbadapter.persistedStateMetadata(defaultDatabase);
2122
} catch (err) {
2223
// If the table doesn't exist or for some network error
2324
// cache state is not fetchable, then return empty array

api/dbadapters/bigquery.ts

+21-10
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,13 @@ import { PromisePoolExecutor } from "promise-pool-executor";
33

44
import { BigQuery, TableField, TableMetadata } from "@google-cloud/bigquery";
55
import { Credentials } from "df/api/commands/credentials";
6-
import { IDbAdapter, IDbClient, IExecutionResult, OnCancel } from "df/api/dbadapters/index";
6+
import {
7+
CACHED_STATE_TABLE_TARGET,
8+
IDbAdapter,
9+
IDbClient,
10+
IExecutionResult,
11+
OnCancel
12+
} from "df/api/dbadapters/index";
713
import { parseBigqueryEvalError } from "df/api/utils/error_parsing";
814
import { LimitedResultSet } from "df/api/utils/results";
915
import {
@@ -18,8 +24,6 @@ import { StringifiedMap } from "df/common/strings/stringifier";
1824
import { collectEvaluationQueries, QueryOrAction } from "df/core/adapters";
1925
import { dataform } from "df/protos/ts";
2026

21-
const CACHED_STATE_TABLE_NAME = "dataform_meta.cache_state";
22-
2327
const EXTRA_GOOGLE_SCOPES = ["https://www.googleapis.com/auth/drive"];
2428

2529
const BIGQUERY_DATE_RELATED_FIELDS = [
@@ -254,17 +258,23 @@ export class BigQueryDbAdapter implements IDbAdapter {
254258
// Unimplemented.
255259
}
256260

257-
public async persistedStateMetadata(): Promise<dataform.IPersistedTableMetadata[]> {
258-
const { rows } = await this.execute(`SELECT * FROM ${CACHED_STATE_TABLE_NAME}`, {
259-
rowLimit: 5000 // TODO: Add pagination for 5000+ rows
260-
});
261+
public async persistedStateMetadata(
262+
database: string
263+
): Promise<dataform.IPersistedTableMetadata[]> {
264+
const { rows } = await this.execute(
265+
`SELECT * FROM \`${database}.${CACHED_STATE_TABLE_TARGET.schema}.${CACHED_STATE_TABLE_TARGET.name}\``,
266+
{
267+
rowLimit: 5000 // TODO: Add pagination for 5000+ rows
268+
}
269+
);
261270
const persistedMetadata = rows.map((row: IMetadataRow) =>
262271
decodePersistedTableMetadata(row.metadata_proto)
263272
);
264273
return persistedMetadata;
265274
}
266275

267276
public async persistStateMetadata(
277+
database: string,
268278
transitiveInputMetadataByTarget: StringifiedMap<
269279
dataform.ITarget,
270280
dataform.PersistedTableMetadata.ITransitiveInputMetadata
@@ -278,11 +288,12 @@ export class BigQueryDbAdapter implements IDbAdapter {
278288
if (allActions.length === 0) {
279289
return;
280290
}
291+
const cachedStateTableName = `${database}.${CACHED_STATE_TABLE_TARGET.schema}.${CACHED_STATE_TABLE_TARGET.name}`;
281292
try {
282293
// Create the cache table, if needed.
283294
await this.execute(
284295
`
285-
CREATE TABLE IF NOT EXISTS \`${CACHED_STATE_TABLE_NAME}\` (
296+
CREATE TABLE IF NOT EXISTS \`${cachedStateTableName}\` (
286297
target STRING,
287298
metadata_proto STRING
288299
)`,
@@ -291,7 +302,7 @@ CREATE TABLE IF NOT EXISTS \`${CACHED_STATE_TABLE_NAME}\` (
291302
// Before saving any new data, delete all entries for 'allActions'.
292303
await this.execute(
293304
`
294-
DELETE \`${CACHED_STATE_TABLE_NAME}\` WHERE target IN (${allActions
305+
DELETE \`${cachedStateTableName}\` WHERE target IN (${allActions
295306
.map(({ target }) => `'${toRowKey(target)}'`)
296307
.join(",")})`,
297308
options
@@ -322,7 +333,7 @@ DELETE \`${CACHED_STATE_TABLE_NAME}\` WHERE target IN (${allActions
322333
);
323334
// We have to split up the INSERT queries to get around BigQuery's query length limit.
324335
while (valuesTuples.length > 0) {
325-
let insertStatement = `INSERT INTO \`${CACHED_STATE_TABLE_NAME}\` (target, metadata_proto) VALUES ${valuesTuples.pop()}`;
336+
let insertStatement = `INSERT INTO \`${cachedStateTableName}\` (target, metadata_proto) VALUES ${valuesTuples.pop()}`;
326337
let nextInsertStatement = `${insertStatement}, ${valuesTuples[valuesTuples.length - 1]}`;
327338
while (valuesTuples.length > 0 && nextInsertStatement.length < MAX_QUERY_LENGTH) {
328339
insertStatement = nextInsertStatement;

api/dbadapters/index.ts

+2-1
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ export interface IDbAdapter extends IDbClient {
5454
setMetadata(action: dataform.IExecutionAction): Promise<void>;
5555

5656
persistStateMetadata(
57+
database: string,
5758
transitiveInputMetadataByTarget: StringifiedMap<
5859
dataform.ITarget,
5960
dataform.PersistedTableMetadata.ITransitiveInputMetadata
@@ -64,7 +65,7 @@ export interface IDbAdapter extends IDbClient {
6465
onCancel: OnCancel;
6566
}
6667
): Promise<void>;
67-
persistedStateMetadata(): Promise<dataform.IPersistedTableMetadata[]>;
68+
persistedStateMetadata(database: string): Promise<dataform.IPersistedTableMetadata[]>;
6869

6970
close(): Promise<void>;
7071
}

0 commit comments

Comments
 (0)