Skip to content

Commit d9c6826

Browse files
committed
refactor!: simplify projection Sink to be an operator
Instead of {sink,before,after}, a Sink is now just an rxjs operator. It gives more control to the sink implementation - it can now decide the order of granular sinks. It is needed for stakePoolMetadata, which requires stakePool to be run before it. Also extract the bootstrapping part from projectIntoSink, which is now a separate export `Bootstrap.fromCardanoNode`. It simplifies projectIntoSink function by decoupling it form ObservableCardanoNode. This refactor wasn't strictly required right now, but projectIntoSink was previously too large and difficult to work with. Lastly, remove StabilityWindowBuffer.handleEvents and only keep tip$ and tail$, leaving the mechanism of updating the buffer up to concrete implementations. It is no longer quired since handleEvents is no longer called by projectIntoSink.
1 parent ae85933 commit d9c6826

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+709
-619
lines changed

packages/e2e/test/projection/offline-fork.test.ts

+24-15
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,14 @@
11
/* eslint-disable promise/always-return */
22
import * as Postgres from '@cardano-sdk/projection-typeorm';
3+
import {
4+
Bootstrap,
5+
InMemory,
6+
Projections,
7+
Sink,
8+
StabilityWindowBuffer,
9+
WithBlock,
10+
projectIntoSink
11+
} from '@cardano-sdk/projection';
312
import {
413
Cardano,
514
ChainSyncEvent,
@@ -10,7 +19,6 @@ import {
1019
} from '@cardano-sdk/core';
1120
import { ChainSyncDataSet, chainSyncData, logger } from '@cardano-sdk/util-dev';
1221
import { ConnectionConfig } from '@cardano-ogmios/client';
13-
import { InMemory, Projections, SinksFactory, WithBlock, projectIntoSink } from '@cardano-sdk/projection';
1422
import { Observable, filter, firstValueFrom, lastValueFrom, of, share, take } from 'rxjs';
1523
import { OgmiosObservableCardanoNode } from '@cardano-sdk/ogmios';
1624
import { createDatabase } from 'typeorm-extension';
@@ -108,35 +116,36 @@ describe('resuming projection when intersection is not local tip', () => {
108116
ogmiosCardanoNode = new OgmiosObservableCardanoNode({ connectionConfig$: of(ogmiosConnectionConfig) }, { logger });
109117
});
110118

111-
const project = (cardanoNode: ObservableCardanoNode, sinksFactory: SinksFactory<typeof projections>) =>
119+
const project = (cardanoNode: ObservableCardanoNode, buffer: StabilityWindowBuffer, sink: Sink<typeof projections>) =>
112120
projectIntoSink({
113-
cardanoNode,
114121
logger,
115122
projections,
116-
sinksFactory
123+
sink,
124+
source$: Bootstrap.fromCardanoNode({ buffer, cardanoNode, logger })
117125
});
118126

119127
const testRollbackAndContinue = (
120-
sinksFactory: SinksFactory<typeof projections>,
128+
buffer: StabilityWindowBuffer,
129+
sink: Sink<typeof projections>,
121130
getNumberOfLocalStakeKeys: () => Promise<number>
122131
) => {
123132
it('rolls back local data to intersection and resumes projection from there', async () => {
124133
// Project some events until we find at least 1 stake key registration
125134
const firstEventWithKeyRegistrations = await firstValueFrom(
126-
project(ogmiosCardanoNode, sinksFactory).pipe(filter((evt) => evt.stakeKeys.insert.length > 0))
135+
project(ogmiosCardanoNode, buffer, sink).pipe(filter((evt) => evt.stakeKeys.insert.length > 0))
127136
);
128137
const lastEventFromOriginalSync = firstEventWithKeyRegistrations;
129138
const numStakeKeysBeforeFork = await getNumberOfLocalStakeKeys();
130139
expect(numStakeKeysBeforeFork).toBe(firstEventWithKeyRegistrations.stakeKeys.insert.length); // sanity check
131140

132141
// Simulate a fork by adding some blocks that are not on the ogmios chain
133142
const stubForkCardanoNode = createForkProjectionSource(ogmiosCardanoNode, lastEventFromOriginalSync);
134-
await lastValueFrom(project(stubForkCardanoNode, sinksFactory).pipe(take(4)));
143+
await lastValueFrom(project(stubForkCardanoNode, buffer, sink).pipe(take(4)));
135144
const numStakeKeysAfterFork = await getNumberOfLocalStakeKeys();
136145
expect(numStakeKeysAfterFork).toBeGreaterThan(numStakeKeysBeforeFork);
137146

138147
// Continue projection from ogmios
139-
const continue$ = project(ogmiosCardanoNode, sinksFactory).pipe(share());
148+
const continue$ = project(ogmiosCardanoNode, buffer, sink).pipe(share());
140149
const rollForward$ = continue$.pipe(filter((evt) => evt.eventType === ChainSyncEventType.RollForward));
141150
const rolledBackKeyRegistrations$ = continue$.pipe(
142151
filter(
@@ -178,14 +187,13 @@ describe('resuming projection when intersection is not local tip', () => {
178187

179188
describe('InMemory', () => {
180189
const store = InMemory.createStore();
181-
const sinks = InMemory.createSinks(store);
182-
testRollbackAndContinue(
183-
() => sinks,
184-
async () => store.stakeKeys.size
185-
);
190+
const buffer = new InMemory.InMemoryStabilityWindowBuffer();
191+
const sink = InMemory.createSink(store, buffer);
192+
testRollbackAndContinue(buffer, sink, async () => store.stakeKeys.size);
186193
});
187194

188195
describe('typeorm', () => {
196+
const buffer = new Postgres.TypeormStabilityWindowBuffer({ logger });
189197
const dataSource = Postgres.createDataSource({
190198
connectionConfig: pgConnectionConfig,
191199
devOptions: {
@@ -198,7 +206,8 @@ describe('resuming projection when intersection is not local tip', () => {
198206
},
199207
projections
200208
});
201-
const sinksFactory = Postgres.createSinksFactory({
209+
const sink = Postgres.createSink({
210+
buffer,
202211
dataSource$: of(dataSource),
203212
logger
204213
});
@@ -219,6 +228,6 @@ describe('resuming projection when intersection is not local tip', () => {
219228
});
220229
afterAll(() => dataSource.destroy());
221230

222-
testRollbackAndContinue(sinksFactory, getNumberOfLocalStakeKeys);
231+
testRollbackAndContinue(buffer, sink, getNumberOfLocalStakeKeys);
223232
});
224233
});

packages/projection-typeorm/src/TypeormStabilityWindowBuffer.ts

+32-20
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ const blockDataSelect: FindOptionsSelect<BlockDataEntity> = {
2323
data: true as any
2424
};
2525

26-
export interface TypeormStabilityWindowBufferProps {
26+
export interface TypeormStabilityWindowBufferProps extends WithLogger {
2727
/**
2828
* 100 by default
2929
*/
@@ -34,32 +34,32 @@ export interface TypeormStabilityWindowBufferProps {
3434
allowNonSequentialBlockHeights?: boolean;
3535
}
3636

37-
export class TypeormStabilityWindowBuffer
38-
implements StabilityWindowBuffer<Operators.WithNetworkInfo & WithTypeormContext>
39-
{
37+
export class TypeormStabilityWindowBuffer implements StabilityWindowBuffer {
4038
#tail: Cardano.Block | 'origin';
39+
#tip$: ReplaySubject<Cardano.Block | 'origin'>;
40+
#tail$: ReplaySubject<Cardano.Block | 'origin'>;
41+
tip$: Observable<Cardano.Block | 'origin'>;
42+
tail$: Observable<Cardano.Block | 'origin'>;
4143
readonly #logger: Logger;
4244
readonly #compactEvery: number;
4345
readonly #allowNonSequentialBlockHeights?: boolean;
44-
readonly #tip$ = new ReplaySubject<Cardano.Block | 'origin'>(1);
45-
readonly #tail$ = new ReplaySubject<Cardano.Block | 'origin'>(1);
46-
readonly tip$: Observable<Cardano.Block | 'origin'>;
47-
readonly tail$: Observable<Cardano.Block | 'origin'>;
48-
readonly handleEvents: UnifiedProjectorOperator<
49-
Operators.WithNetworkInfo & WithTypeormContext,
50-
Operators.WithNetworkInfo & WithTypeormContext
51-
>;
5246

53-
constructor(
54-
{ allowNonSequentialBlockHeights, compactBufferEveryNBlocks = 100 }: TypeormStabilityWindowBufferProps,
55-
dependencies: WithLogger
56-
) {
57-
this.tip$ = this.#tip$.asObservable();
58-
this.tail$ = this.#tail$.asObservable();
47+
constructor({
48+
allowNonSequentialBlockHeights,
49+
compactBufferEveryNBlocks = 100,
50+
logger
51+
}: TypeormStabilityWindowBufferProps) {
52+
this.start();
5953
this.#compactEvery = compactBufferEveryNBlocks;
6054
this.#allowNonSequentialBlockHeights = allowNonSequentialBlockHeights;
61-
this.#logger = contextLogger(dependencies.logger, 'PgStabilityWindowBuffer');
62-
this.handleEvents = (evt$) =>
55+
this.#logger = contextLogger(logger, 'TypeormStabilityWindowBuffer');
56+
}
57+
58+
handleEvents(): UnifiedProjectorOperator<
59+
Operators.WithNetworkInfo & WithTypeormContext,
60+
Operators.WithNetworkInfo & WithTypeormContext
61+
> {
62+
return (evt$) =>
6363
evt$.pipe(
6464
concatMap((evt) =>
6565
from(
@@ -86,6 +86,18 @@ export class TypeormStabilityWindowBuffer
8686
this.#setTail(tail[0]?.data || 'origin');
8787
}
8888

89+
/**
90+
* (Re)create tip$ and tail$ observables.
91+
* Automatically called in constructor.
92+
* Can be used to reuse the same buffer object after shutdown().
93+
*/
94+
start() {
95+
this.#tip$ = new ReplaySubject(1);
96+
this.#tail$ = new ReplaySubject(1);
97+
this.tip$ = this.#tip$.asObservable();
98+
this.tail$ = this.#tail$.asObservable();
99+
}
100+
89101
shutdown(): void {
90102
this.#tip$.complete();
91103
this.#tail$.complete();

packages/projection-typeorm/src/createDataSource.ts

+2-3
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
import 'reflect-metadata';
2-
import * as supportedSinks from './sinks';
32
import { BlockDataEntity } from './entity';
43
import { BlockEntity } from './entity/Block.entity';
54
import { DataSource, DataSourceOptions, DefaultNamingStrategy, NamingStrategyInterface } from 'typeorm';
65
import { Logger } from 'ts-log';
7-
import { WithTypeormSinkMetadata } from './types';
6+
import { supportedSinks } from './util';
87
import { typeormLogger } from './logger';
98
import snakeCase from 'lodash/snakeCase';
109
import uniq from 'lodash/uniq';
@@ -93,7 +92,7 @@ export const createDataSource = <P extends object>({
9392
projections,
9493
logger
9594
}: CreateDataSourceProps<P>) => {
96-
const requestedProjectionEntities = Object.entries<WithTypeormSinkMetadata>(supportedSinks)
95+
const requestedProjectionEntities = Object.entries(supportedSinks)
9796
.filter(([projectionName]) => projectionName in projections)
9897
.flatMap(([_, sink]) => sink.entities);
9998
const entities: Function[] = uniq([BlockEntity, BlockDataEntity, ...requestedProjectionEntities]);

0 commit comments

Comments
 (0)