Skip to content

fix(apps/price_pusher) handle price feed removal more gracefully #2730

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jun 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/price_pusher/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@pythnetwork/price-pusher",
"version": "9.3.3",
"version": "9.3.4",
"description": "Pyth Price Pusher",
"homepage": "https://pyth.network",
"main": "lib/index.js",
Expand Down
1 change: 1 addition & 0 deletions apps/price_pusher/src/aptos/aptos.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ export class AptosPricePusher implements IPricePusher {
async getPriceFeedsUpdateData(priceIds: string[]): Promise<number[][]> {
const response = await this.hermesClient.getLatestPriceUpdates(priceIds, {
encoding: "base64",
ignoreInvalidPriceIds: true,
});
return response.binary.data.map((data) =>
Array.from(Buffer.from(data, "base64")),
Expand Down
1 change: 1 addition & 0 deletions apps/price_pusher/src/fuel/fuel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ export class FuelPricePusher implements IPricePusher {
try {
const response = await this.hermesClient.getLatestPriceUpdates(priceIds, {
encoding: "base64",
ignoreInvalidPriceIds: true,
});
priceFeedUpdateData = response.binary.data;
} catch (err: any) {
Expand Down
1 change: 1 addition & 0 deletions apps/price_pusher/src/injective/injective.ts
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ export class InjectivePricePusher implements IPricePusher {
try {
const response = await this.hermesClient.getLatestPriceUpdates(priceIds, {
encoding: "base64",
ignoreInvalidPriceIds: true,
});
const vaas = response.binary.data;

Expand Down
1 change: 1 addition & 0 deletions apps/price_pusher/src/near/near.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ export class NearPricePusher implements IPricePusher {
): Promise<string[]> {
const response = await this.hermesClient.getLatestPriceUpdates(priceIds, {
encoding: "base64",
ignoreInvalidPriceIds: true,
});
return response.binary.data;
}
Expand Down
9 changes: 6 additions & 3 deletions apps/price_pusher/src/price-config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,12 @@ export function shouldUpdate(
): UpdateCondition {
const priceId = priceConfig.id;

// There is no price to update the target with.
// There is no price to update the target with. So we should not update it.
if (sourceLatestPrice === undefined) {
return UpdateCondition.YES;
logger.info(
`${priceConfig.alias} (${priceId}) is not available on the source network. Ignoring it.`,
);
return UpdateCondition.NO;
}

// It means that price never existed there. So we should push the latest price feed.
Expand Down Expand Up @@ -140,7 +143,7 @@ export function shouldUpdate(
}%? / early: < ${priceConfig.earlyUpdatePriceDeviation}%?) OR ` +
`Confidence ratio: ${confidenceRatioPct.toFixed(5)}% (< ${
priceConfig.confidenceRatio
}%? / early: < ${priceConfig.earlyUpdatePriceDeviation}%?)`,
}%? / early: < ${priceConfig.earlyUpdateConfidenceRatio}%?)`,
);

if (
Expand Down
33 changes: 22 additions & 11 deletions apps/price_pusher/src/pyth-price-listener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
} from "@pythnetwork/hermes-client";
import { PriceInfo, IPriceListener, PriceItem } from "./interface";
import { Logger } from "pino";
import { sleep } from "./utils";

type TimestampInMs = number & { readonly _: unique symbol };

Expand Down Expand Up @@ -34,6 +35,24 @@ export class PythPriceListener implements IPriceListener {
// This method should be awaited on and once it finishes it has the latest value
// for the given price feeds (if they exist).
async start() {
this.startListening();

// Store health check interval reference
this.healthCheckInterval = setInterval(() => {
if (
this.lastUpdated === undefined ||
this.lastUpdated < Date.now() - 30 * 1000
) {
throw new Error("Hermes Price feeds are not updating.");
}
}, 5000);
}

async startListening() {
this.logger.info(
`Starting to listen for price updates from Hermes for ${this.priceIds.length} price feeds.`,
);

const eventSource = await this.hermesClient.getPriceUpdatesStream(
this.priceIds,
{
Expand Down Expand Up @@ -71,20 +90,12 @@ export class PythPriceListener implements IPriceListener {
});
};

eventSource.onerror = (error: Event) => {
eventSource.onerror = async (error: Event) => {
console.error("Error receiving updates from Hermes:", error);
eventSource.close();
await sleep(5000); // Wait a bit before trying to reconnect
this.startListening(); // Attempt to restart the listener
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please note that we have another loop checking for latest price update that crashes if there's no update within last 30 seconds.

};

// Store health check interval reference
this.healthCheckInterval = setInterval(() => {
if (
this.lastUpdated === undefined ||
this.lastUpdated < Date.now() - 30 * 1000
) {
throw new Error("Hermes Price feeds are not updating.");
}
}, 5000);
}

getLatestPriceInfo(priceId: HexString): PriceInfo | undefined {
Expand Down
1 change: 1 addition & 0 deletions apps/price_pusher/src/solana/solana.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ export class SolanaPricePusher implements IPricePusher {
shuffledPriceIds,
{
encoding: "base64",
ignoreInvalidPriceIds: true,
},
);
priceFeedUpdateData = response.binary.data;
Expand Down
1 change: 1 addition & 0 deletions apps/price_pusher/src/sui/sui.ts
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ export class SuiPricePusher implements IPricePusher {
priceIdChunk,
{
encoding: "base64",
ignoreInvalidPriceIds: true,
},
);
if (response.binary.data.length !== 1) {
Expand Down
1 change: 1 addition & 0 deletions apps/price_pusher/src/ton/ton.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ export class TonPricePusher implements IPricePusher {
try {
const response = await this.hermesClient.getLatestPriceUpdates(priceIds, {
encoding: "base64",
ignoreInvalidPriceIds: true,
});
priceFeedUpdateData = response.binary.data;
} catch (err: any) {
Expand Down
Loading