Skip to content

Commit ae701e7

Browse files
committed
feat: do not write to stability window buffer til volatile
It's implemented by splitting StabilityWindowBuffer into 2 pieces: - Tip observable, which is used to determine local tip - StabilityWindowBuffer is now simplified to just 'getBlock' method BREAKING CHANGE: simplify StabilityWindowBuffer interface to just 'getBlock' - Bootstrap.fromCardanoNode now requires Tip observable parameter
1 parent e9b9e33 commit ae701e7

38 files changed

+1109
-715
lines changed

packages/cardano-services/src/Program/programs/projector.ts

+2-10
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import { Bootstrap } from '@cardano-sdk/projection';
21
import { Cardano } from '@cardano-sdk/core';
32
import { CommonProgramOptions, OgmiosProgramOptions, PosgresProgramOptions } from '../options';
43
import { DnsResolver, createDnsResolver } from '../utils';
@@ -12,8 +11,8 @@ import { Logger } from 'ts-log';
1211
import { MissingProgramOption, UnknownServiceName } from '../errors';
1312
import { ProjectionHttpService, ProjectionName, createTypeormProjection, storeOperators } from '../../Projection';
1413
import { SrvRecord } from 'dns';
15-
import { TypeormStabilityWindowBuffer, createStorePoolMetricsUpdateJob } from '@cardano-sdk/projection-typeorm';
1614
import { createLogger } from 'bunyan';
15+
import { createStorePoolMetricsUpdateJob } from '@cardano-sdk/projection-typeorm';
1716
import { getConnectionConfig, getOgmiosObservableCardanoNode } from '../services';
1817

1918
export const BLOCKS_BUFFER_LENGTH_DEFAULT = 10;
@@ -50,24 +49,17 @@ const createProjectionHttpService = async (options: ProjectionMapFactoryOptions)
5049
ogmiosUrl: args.ogmiosUrl
5150
});
5251
const connectionConfig$ = getConnectionConfig(dnsResolver, 'projector', '', args);
53-
const buffer = new TypeormStabilityWindowBuffer({ logger });
5452
const { blocksBufferLength, dropSchema, dryRun, exitAtBlockNo, handlePolicyIds, projectionNames, synchronize } = args;
5553
const projection$ = createTypeormProjection({
5654
blocksBufferLength,
57-
buffer,
55+
cardanoNode,
5856
connectionConfig$,
5957
devOptions: { dropSchema, synchronize },
6058
exitAtBlockNo,
6159
logger,
6260
projectionOptions: {
6361
handlePolicyIds
6462
},
65-
projectionSource$: Bootstrap.fromCardanoNode({
66-
blocksBufferLength,
67-
buffer,
68-
cardanoNode,
69-
logger
70-
}),
7163
projections: projectionNames
7264
});
7365
return new ProjectionHttpService({ dryRun, projection$, projectionNames }, { logger });
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,17 @@
11
/* eslint-disable @typescript-eslint/no-explicit-any */
22
/* eslint-disable prefer-spread */
3-
import { Cardano } from '@cardano-sdk/core';
3+
import { Bootstrap, ProjectionEvent, logProjectionProgress, requestNext } from '@cardano-sdk/projection';
4+
import { Cardano, ObservableCardanoNode } from '@cardano-sdk/core';
45
import { Logger } from 'ts-log';
5-
import { Observable, takeWhile } from 'rxjs';
6+
import { Observable, concat, defer, take, takeWhile } from 'rxjs';
67
import {
78
PgConnectionConfig,
89
TypeormDevOptions,
10+
TypeormOptions,
911
TypeormStabilityWindowBuffer,
1012
WithTypeormContext,
1113
createObservableConnection,
14+
createTypeormTipTracker,
1215
isRecoverableTypeormError,
1316
typeormTransactionCommit,
1417
withTypeormTransaction
@@ -19,19 +22,22 @@ import {
1922
ProjectionOptions,
2023
prepareTypeormProjection
2124
} from './prepareTypeormProjection';
22-
import { ProjectionEvent, logProjectionProgress, requestNext } from '@cardano-sdk/projection';
25+
import { ReconnectionConfig, passthrough, shareRetryBackoff, toEmpty } from '@cardano-sdk/util-rxjs';
2326
import { migrations } from './migrations';
24-
import { passthrough, shareRetryBackoff } from '@cardano-sdk/util-rxjs';
27+
28+
const reconnectionConfig: ReconnectionConfig = {
29+
initialInterval: 50,
30+
maxInterval: 5000
31+
};
2532

2633
export interface CreateTypeormProjectionProps {
2734
projections: ProjectionName[];
2835
blocksBufferLength: number;
29-
buffer?: TypeormStabilityWindowBuffer;
30-
projectionSource$: Observable<ProjectionEvent>;
3136
connectionConfig$: Observable<PgConnectionConfig>;
3237
devOptions?: TypeormDevOptions;
3338
exitAtBlockNo?: Cardano.BlockNo;
3439
logger: Logger;
40+
cardanoNode: ObservableCardanoNode;
3541
projectionOptions?: ProjectionOptions;
3642
}
3743

@@ -54,12 +60,11 @@ const applyStores =
5460
export const createTypeormProjection = ({
5561
blocksBufferLength,
5662
projections,
57-
projectionSource$,
5863
connectionConfig$,
5964
logger,
60-
devOptions,
65+
devOptions: requestedDevOptions,
66+
cardanoNode,
6167
exitAtBlockNo,
62-
buffer,
6368
projectionOptions
6469
}: CreateTypeormProjectionProps) => {
6570
const { handlePolicyIds } = { handlePolicyIds: [], ...projectionOptions };
@@ -69,32 +74,65 @@ export const createTypeormProjection = ({
6974

7075
const { mappers, entities, stores, extensions } = prepareTypeormProjection(
7176
{
72-
buffer,
7377
options: projectionOptions,
7478
projections
7579
},
7680
{ logger }
7781
);
78-
const connection$ = createObservableConnection({
79-
connectionConfig$,
80-
devOptions,
81-
entities,
82-
extensions,
82+
const connect = (options?: TypeormOptions, devOptions?: TypeormDevOptions) =>
83+
createObservableConnection({
84+
connectionConfig$,
85+
devOptions,
86+
entities,
87+
extensions,
88+
logger,
89+
options
90+
});
91+
92+
const tipTracker = createTypeormTipTracker({
93+
connection$: connect(),
94+
reconnectionConfig
95+
});
96+
const buffer = new TypeormStabilityWindowBuffer({
97+
connection$: connect(),
98+
logger,
99+
reconnectionConfig
100+
});
101+
const projectionSource$ = Bootstrap.fromCardanoNode({
102+
blocksBufferLength,
103+
buffer,
104+
cardanoNode,
83105
logger,
84-
options: {
85-
installExtensions: true,
86-
migrations: migrations.filter(({ entity }) => entities.includes(entity as any)),
87-
migrationsRun: !devOptions?.synchronize
88-
}
106+
projectedTip$: tipTracker.tip$
89107
});
90-
return projectionSource$.pipe(
91-
applyMappers(mappers),
92-
shareRetryBackoff(
93-
(evt$) => evt$.pipe(withTypeormTransaction({ connection$ }), applyStores(stores), typeormTransactionCommit()),
94-
{ shouldRetry: isRecoverableTypeormError }
95-
),
96-
requestNext(),
97-
logProjectionProgress(logger),
98-
exitAtBlockNo ? takeWhile((event) => event.block.header.blockNo < exitAtBlockNo) : passthrough()
108+
return concat(
109+
// initialize database before starting the projector
110+
connect(
111+
{
112+
installExtensions: true,
113+
migrations: migrations.filter(({ entity }) => entities.includes(entity as any)),
114+
migrationsRun: !requestedDevOptions?.synchronize
115+
},
116+
requestedDevOptions
117+
).pipe(take(1), toEmpty),
118+
defer(() =>
119+
projectionSource$.pipe(
120+
applyMappers(mappers),
121+
shareRetryBackoff(
122+
(evt$) =>
123+
evt$.pipe(
124+
withTypeormTransaction({ connection$: connect() }),
125+
applyStores(stores),
126+
buffer.storeBlockData(),
127+
typeormTransactionCommit()
128+
),
129+
{ shouldRetry: isRecoverableTypeormError }
130+
),
131+
tipTracker.trackProjectedTip(),
132+
requestNext(),
133+
logProjectionProgress(logger),
134+
exitAtBlockNo ? takeWhile((event) => event.block.header.blockNo < exitAtBlockNo) : passthrough()
135+
)
136+
)
99137
);
100138
};

packages/cardano-services/src/Projection/prepareTypeormProjection.ts

+2-8
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ import {
1515
StakeKeyRegistrationEntity,
1616
StakePoolEntity,
1717
TokensEntity,
18-
TypeormStabilityWindowBuffer,
1918
createStorePoolMetricsUpdateJob,
2019
storeAddresses,
2120
storeAssets,
@@ -139,7 +138,7 @@ type Entity = Entities[EntityName];
139138
const storeEntities: Partial<Record<StoreName, EntityName[]>> = {
140139
storeAddresses: ['address'],
141140
storeAssets: ['asset'],
142-
storeBlock: ['block'],
141+
storeBlock: ['block', 'blockData'],
143142
storeHandleMetadata: ['handleMetadata', 'output'],
144143
storeHandles: ['handle', 'asset', 'tokens', 'output'],
145144
storeNftMetadata: ['asset'],
@@ -303,7 +302,6 @@ const keyOf = <T extends {}>(obj: T, value: unknown): keyof T | null => {
303302

304303
export interface PrepareTypeormProjectionProps {
305304
projections: ProjectionName[];
306-
buffer?: TypeormStabilityWindowBuffer;
307305
options?: ProjectionOptions;
308306
}
309307

@@ -312,7 +310,7 @@ export interface PrepareTypeormProjectionProps {
312310
* based on 'projections' and presence of 'buffer':
313311
*/
314312
export const prepareTypeormProjection = (
315-
{ projections, buffer, options = {} }: PrepareTypeormProjectionProps,
313+
{ projections, options = {} }: PrepareTypeormProjectionProps,
316314
dependencies: WithLogger
317315
) => {
318316
const mapperSorter = new Sorter<MapperOperator>();
@@ -329,10 +327,6 @@ export const prepareTypeormProjection = (
329327
const selectedEntities = entitySorter.nodes;
330328
const selectedMappers = mapperSorter.nodes;
331329
const selectedStores = storeSorter.nodes;
332-
if (buffer) {
333-
selectedEntities.push(BlockDataEntity);
334-
selectedStores.push(buffer.storeBlockData());
335-
}
336330
const extensions = requiredExtensions(projections);
337331
return {
338332
__debug: {

packages/cardano-services/test/Projection/createTypeormProjection.test.ts

+16-30
Original file line numberDiff line numberDiff line change
@@ -1,56 +1,42 @@
1-
import {
2-
AssetEntity,
3-
OutputEntity,
4-
TokensEntity,
5-
TypeormStabilityWindowBuffer,
6-
createDataSource
7-
} from '@cardano-sdk/projection-typeorm';
8-
import { Bootstrap } from '@cardano-sdk/projection';
1+
import { AssetEntity, OutputEntity, TokensEntity, createDataSource } from '@cardano-sdk/projection-typeorm';
92
import { ChainSyncDataSet, chainSyncData, logger } from '@cardano-sdk/util-dev';
103
import { ProjectionName, createTypeormProjection, prepareTypeormProjection } from '../../src';
114
import { lastValueFrom } from 'rxjs';
125
import { projectorConnectionConfig, projectorConnectionConfig$ } from '../util';
136

147
describe('createTypeormProjection', () => {
158
it('creates a projection to PostgreSQL based on requested projection names', async () => {
16-
// Setup
9+
// Setup projector
1710
const projections = [ProjectionName.UTXO];
18-
const buffer = new TypeormStabilityWindowBuffer({ allowNonSequentialBlockHeights: true, logger });
19-
const { entities } = prepareTypeormProjection({ buffer, projections }, { logger });
20-
const dataSource = createDataSource({
21-
connectionConfig: projectorConnectionConfig,
22-
devOptions: { dropSchema: true, synchronize: true },
23-
entities,
24-
logger
25-
});
26-
await dataSource.initialize();
27-
const queryRunner = dataSource.createQueryRunner();
28-
await queryRunner.connect();
2911
const data = chainSyncData(ChainSyncDataSet.WithMint);
30-
await buffer.initialize(queryRunner);
31-
3212
const projection$ = createTypeormProjection({
3313
blocksBufferLength: 10,
34-
buffer,
14+
cardanoNode: data.cardanoNode,
3515
connectionConfig$: projectorConnectionConfig$,
36-
devOptions: { dropSchema: true, synchronize: true },
16+
devOptions: { dropSchema: true },
3717
logger,
38-
projectionSource$: Bootstrap.fromCardanoNode({
39-
blocksBufferLength: 10,
40-
buffer,
41-
cardanoNode: data.cardanoNode,
42-
logger
43-
}),
4418
projections
4519
});
4620

4721
// Project
4822
await lastValueFrom(projection$);
4923

24+
// Setup query runner for assertions
25+
const { entities } = prepareTypeormProjection({ projections }, { logger });
26+
const dataSource = createDataSource({
27+
connectionConfig: projectorConnectionConfig,
28+
entities,
29+
logger
30+
});
31+
await dataSource.initialize();
32+
const queryRunner = dataSource.createQueryRunner();
33+
await queryRunner.connect();
34+
5035
// Check data in the database
5136
expect(await queryRunner.manager.count(AssetEntity)).toBeGreaterThan(0);
5237
expect(await queryRunner.manager.count(TokensEntity)).toBeGreaterThan(0);
5338
expect(await queryRunner.manager.count(OutputEntity)).toBeGreaterThan(0);
39+
5440
await queryRunner.release();
5541
await dataSource.destroy();
5642
});

packages/cardano-services/test/Projection/prepareTypeormProjection.test.ts

+13-23
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,30 @@
11
import { ProjectionName, prepareTypeormProjection } from '../../src';
2-
import { TypeormStabilityWindowBuffer } from '@cardano-sdk/projection-typeorm';
32
import { dummyLogger } from 'ts-log';
43

5-
const prepare = (projections: ProjectionName[], useBuffer?: boolean) => {
6-
const { __debug } = prepareTypeormProjection(
7-
{
8-
buffer: useBuffer ? new TypeormStabilityWindowBuffer({ logger: dummyLogger }) : undefined,
9-
projections
10-
},
11-
{ logger: dummyLogger }
12-
);
13-
return __debug;
14-
};
4+
const prepare = (projections: ProjectionName[]) =>
5+
prepareTypeormProjection({ projections }, { logger: dummyLogger }).__debug;
156

167
describe('prepareTypeormProjection', () => {
178
describe('computes required entities, mappers and stores based on selected projections and presence of a buffer', () => {
18-
test('utxo (without buffer)', () => {
9+
test('utxo', () => {
1910
const { entities, mappers, stores } = prepare([ProjectionName.UTXO]);
20-
expect(new Set(entities)).toEqual(new Set(['tokens', 'block', 'asset', 'nftMetadata', 'output']));
21-
expect(mappers).toEqual(['withMint', 'withUtxo']);
22-
expect(stores).toEqual(['storeBlock', 'storeAssets', 'storeUtxo']);
23-
});
24-
25-
test('utxo (with buffer)', () => {
26-
const { entities, mappers, stores } = prepare([ProjectionName.UTXO], true);
2711
expect(new Set(entities)).toEqual(new Set(['tokens', 'block', 'asset', 'nftMetadata', 'output', 'blockData']));
2812
expect(mappers).toEqual(['withMint', 'withUtxo']);
29-
// 'null' is expected here because buffer.storeBlockData is not a common operator,
30-
// but is a method of the buffer. As a result it's not part of the predefined operators object.
31-
expect(stores).toEqual(['storeBlock', 'storeAssets', 'storeUtxo', null]);
13+
expect(stores).toEqual(['storeBlock', 'storeAssets', 'storeUtxo']);
3214
});
3315

3416
test('stake-pool,stake-pool-metadata', () => {
3517
const { entities, mappers, stores } = prepare([ProjectionName.StakePool, ProjectionName.StakePoolMetadataJob]);
3618
expect(new Set(entities)).toEqual(
37-
new Set(['block', 'stakePool', 'poolRegistration', 'poolRetirement', 'poolMetadata', 'currentPoolMetrics'])
19+
new Set([
20+
'block',
21+
'blockData',
22+
'stakePool',
23+
'poolRegistration',
24+
'poolRetirement',
25+
'poolMetadata',
26+
'currentPoolMetrics'
27+
])
3828
);
3929
expect(mappers).toEqual(['withCertificates', 'withStakePools']);
4030
expect(stores).toEqual(['storeBlock', 'storeStakePools', 'storeStakePoolMetadataJob']);

0 commit comments

Comments
 (0)