diff --git a/.changeset/neat-badgers-matter.md b/.changeset/neat-badgers-matter.md
new file mode 100644
index 000000000..f60dedf7e
--- /dev/null
+++ b/.changeset/neat-badgers-matter.md
@@ -0,0 +1,7 @@
+---
+"@hyperdx/common-utils": patch
+"@hyperdx/api": patch
+"@hyperdx/app": patch
+---
+
+feat: move rrweb event fetching to the client instead of an api route
diff --git a/packages/api/src/api-app.ts b/packages/api/src/api-app.ts
index fa603b9e6..1f21ed190 100644
--- a/packages/api/src/api-app.ts
+++ b/packages/api/src/api-app.ts
@@ -89,7 +89,6 @@ app.use('/', routers.rootRouter);
 app.use('/alerts', isUserAuthenticated, routers.alertsRouter);
 app.use('/dashboards', isUserAuthenticated, routers.dashboardRouter);
 app.use('/me', isUserAuthenticated, routers.meRouter);
-app.use('/sessions', isUserAuthenticated, routers.sessionsRouter);
 app.use('/team', isUserAuthenticated, routers.teamRouter);
 app.use('/webhooks', isUserAuthenticated, routers.webhooksRouter);
 app.use('/datasources', isUserAuthenticated, routers.datasourceRouter);
diff --git a/packages/api/src/middleware/validation.ts b/packages/api/src/middleware/validation.ts
new file mode 100644
index 000000000..ade29e363
--- /dev/null
+++ b/packages/api/src/middleware/validation.ts
@@ -0,0 +1,17 @@
+import express from 'express';
+import { z } from 'zod';
+
+export function validateRequestHeaders<T extends z.ZodSchema>(schema: T) {
+  return function (
+    req: express.Request,
+    res: express.Response,
+    next: express.NextFunction,
+  ) {
+    const parsed = schema.safeParse(req.headers);
+    if (!parsed.success) {
+      return res.status(400).json({ type: 'Headers', errors: parsed.error });
+    }
+
+    return next();
+  };
+}
diff --git a/packages/api/src/routers/api/clickhouseProxy.ts b/packages/api/src/routers/api/clickhouseProxy.ts
index cf07498ca..3fc718133 100644
--- a/packages/api/src/routers/api/clickhouseProxy.ts
+++ b/packages/api/src/routers/api/clickhouseProxy.ts
@@ -1,10 +1,11 @@
-import express, { Request, Response } from 'express';
+import express, { RequestHandler, Response } from 'express';
 import { createProxyMiddleware } from 'http-proxy-middleware';
 import { z } from 'zod';
 import { validateRequest } from 'zod-express-middleware';

 import { getConnectionById } from '@/controllers/connection';
 import { getNonNullUserWithTeam } from '@/middleware/auth';
+import { validateRequestHeaders } from '@/middleware/validation';
 import { objectIdSchema } from '@/utils/zod';

 const router = express.Router();
@@ -58,17 +59,22 @@ router.post(
   },
 );

-router.get(
-  '/*',
-  validateRequest({
-    query: z.object({
-      hyperdx_connection_id: objectIdSchema,
-    }),
+const hasConnectionId = validateRequestHeaders(
+  z.object({
+    'x-hyperdx-connection-id': objectIdSchema,
   }),
+);
+
+const getConnection: RequestHandler =
+  // prettier-ignore-next-line
   async (req, res, next) => {
     try {
       const { teamId } = getNonNullUserWithTeam(req);
-      const { hyperdx_connection_id } = req.query;
+      const connection_id = req.headers['x-hyperdx-connection-id']!; // ! because zod already validated
+      delete req.headers['x-hyperdx-connection-id'];
+      const hyperdx_connection_id = Array.isArray(connection_id)
+        ? connection_id.join('')
+        : connection_id;

       const connection = await getConnectionById(
         teamId.toString(),
@@ -93,13 +99,15 @@ router.get(
       console.error('Error fetching connection info:', e);
       next(e);
     }
-  },
+  };
+
+const proxyMiddleware: RequestHandler =
+  // prettier-ignore-next-line
   createProxyMiddleware({
     target: '', // doesn't matter. it should be overridden by the router
     changeOrigin: true,
     pathFilter: (path, _req) => {
-      // TODO: allow other methods
-      return _req.method === 'GET';
+      return _req.method === 'GET' || _req.method === 'POST';
     },
     pathRewrite: {
       '^/clickhouse-proxy': '',
@@ -113,8 +121,8 @@ router.get(
     on: {
       proxyReq: (proxyReq, _req) => {
         const newPath = _req.params[0];
+        // @ts-expect-error _req.query is type ParamQs, which doesn't play nicely with URLSearchParams. TODO: Replace with getting query params from _req.url eventually
         const qparams = new URLSearchParams(_req.query);
-        qparams.delete('hyperdx_connection_id');
         if (_req._hdx_connection?.username && _req._hdx_connection?.password) {
           proxyReq.setHeader(
             'X-ClickHouse-User',
@@ -122,7 +130,11 @@ router.get(
           );
           proxyReq.setHeader('X-ClickHouse-Key', _req._hdx_connection.password);
         }
-        proxyReq.path = `/${newPath}?${qparams.toString()}`;
+        if (_req.method === 'POST') {
+          // TODO: Use fixRequestBody after this issue is resolved: https://github.com/chimurai/http-proxy-middleware/issues/1102
+          proxyReq.write(_req.body);
+        }
+        proxyReq.path = `/${newPath}?${qparams}`;
       },
       proxyRes: (proxyRes, _req, res) => {
         // since clickhouse v24, the cors headers * will be attached to the response by default
@@ -158,7 +170,9 @@ router.get(
     // ...(config.IS_DEV && {
     //   logger: console,
     // }),
-  }),
-);
+  });
+
+router.get('/*', hasConnectionId, getConnection, proxyMiddleware);
+router.post('/*', hasConnectionId, getConnection, proxyMiddleware);

 export default router;
diff --git a/packages/api/src/routers/api/index.ts b/packages/api/src/routers/api/index.ts
index 11cde8a60..d439fbbe1 100644
--- a/packages/api/src/routers/api/index.ts
+++ b/packages/api/src/routers/api/index.ts
@@ -3,7 +3,6 @@ import dashboardRouter from './dashboards';
 import datasourceRouter from './datasources';
 import meRouter from './me';
 import rootRouter from './root';
-import sessionsRouter from './sessions';
 import teamRouter from './team';
 import webhooksRouter from './webhooks';

@@ -13,7 +12,6 @@ export default {
   dashboardRouter,
   meRouter,
   rootRouter,
-  sessionsRouter,
   teamRouter,
   webhooksRouter,
 };
diff --git a/packages/api/src/routers/api/sessions.ts b/packages/api/src/routers/api/sessions.ts
deleted file mode 100644
index c5bcdfd46..000000000
--- a/packages/api/src/routers/api/sessions.ts
+++ /dev/null
@@ -1,152 +0,0 @@
-import type { Row } from '@clickhouse/client';
-import { ClickhouseClient } from '@hyperdx/common-utils/dist/clickhouse';
-import { getMetadata } from '@hyperdx/common-utils/dist/metadata';
-import { renderChartConfig } from '@hyperdx/common-utils/dist/renderChartConfig';
-import opentelemetry, { SpanStatusCode } from '@opentelemetry/api';
-import express from 'express';
-import { parseInt } from 'lodash';
-import { serializeError } from 'serialize-error';
-import { z } from 'zod';
-import { validateRequest } from 'zod-express-middleware';
-
-import { getConnectionById } from '@/controllers/connection';
-import { getNonNullUserWithTeam } from '@/middleware/auth';
-import { Source } from '@/models/source';
-import logger from '@/utils/logger';
-import { objectIdSchema } from '@/utils/zod';
-
-const router = express.Router();
-
-router.get(
-  '/:sessionId/rrweb',
-  validateRequest({
-    params: z.object({
-      sessionId: z.string(),
-    }),
-    query: z.object({
-      endTime: z.string().regex(/^\d+$/, 'Must be an integer string'),
-      limit: z.string().regex(/^\d+$/, 'Must be an integer string'),
-      offset: z.string().regex(/^\d+$/, 'Must be an integer string'),
-      serviceName: z.string(),
-      sourceId: objectIdSchema,
-      startTime: z.string().regex(/^\d+$/, 'Must be an integer string'),
-    }),
-  }),
-  async (req, res, next) => {
-    try {
-      const { sessionId } = req.params;
-      const { endTime, limit, offset, serviceName, sourceId, startTime } =
-        req.query;
-
-      const { teamId } = getNonNullUserWithTeam(req);
-
-      const source = await Source.findById(sourceId);
-
-      if (!source) {
-        res.status(404).send('Source not found');
-        return;
-      }
-
-      const connection = await getConnectionById(
-        teamId.toString(),
-        source.connection.toString(),
-        true,
-      );
-
-      if (!connection) {
-        res.status(404).send('Connection not found');
-        return;
-      }
-
-      const MAX_LIMIT = 1e6;
-
-      res.setHeader('Cache-Control', 'no-cache');
-      res.setHeader('Content-Type', 'text/event-stream');
-      res.setHeader('Connection', 'keep-alive');
-      res.flushHeaders(); // flush the headers to establish SSE with client
-
-      const clickhouseClient = new ClickhouseClient({
-        host: connection.host,
-        username: connection.username,
-        password: connection.password,
-      });
-
-      const metadata = getMetadata(clickhouseClient);
-      const query = await renderChartConfig(
-        {
-          // FIXME: add mappings to session source
-          select: [
-            {
-              valueExpression: `${source.implicitColumnExpression}`,
-              alias: 'b',
-            },
-            {
-              valueExpression: `simpleJSONExtractInt(${source.implicitColumnExpression}, 'type')`,
-              alias: 't',
-            },
-            {
-              valueExpression: `${source.eventAttributesExpression}['rr-web.chunk']`,
-              alias: 'ck',
-            },
-            {
-              valueExpression: `${source.eventAttributesExpression}['rr-web.total-chunks']`,
-              alias: 'tcks',
-            },
-          ],
-          dateRange: [
-            new Date(parseInt(startTime)),
-            new Date(parseInt(endTime)),
-          ],
-          from: source.from,
-          whereLanguage: 'lucene',
-          where: `ServiceName:"${serviceName}" AND ${source.resourceAttributesExpression}.rum.sessionId:"${sessionId}"`,
-          timestampValueExpression: source.timestampValueExpression,
-          implicitColumnExpression: source.implicitColumnExpression,
-          connection: connection.id,
-          orderBy: `${source.timestampValueExpression} ASC`,
-          limit: {
-            limit: Math.min(MAX_LIMIT, parseInt(limit)),
-            offset: parseInt(offset),
-          },
-        },
-        metadata,
-      );
-
-      const resultSet = await clickhouseClient.query({
-        query: query.sql,
-        query_params: query.params,
-        format: 'JSONEachRow',
-        clickhouse_settings: {
-          wait_end_of_query: 0,
-          send_progress_in_http_headers: 0,
-        },
-      });
-      const stream = resultSet.stream();
-
-      stream.on('data', (rows: Row[]) => {
-        res.write(`${rows.map(row => `data: ${row.text}`).join('\n')}\n\n`);
-        res.flush();
-      });
-      stream.on('end', () => {
-        logger.info('Stream ended');
-
-        res.write('event: end\ndata:\n\n');
-        res.end();
-      });
-    } catch (e) {
-      const span = opentelemetry.trace.getActiveSpan();
-      span?.recordException(e as Error);
-      span?.setStatus({ code: SpanStatusCode.ERROR });
-      // WARNING: no need to call next(e) here, as the stream will be closed
-      logger.error({
-        message: 'Error while streaming rrweb events',
-        error: serializeError(e),
-        teamId: req.user?.team,
-        query: req.query,
-      });
-      res.end();
-    }
-  },
-);
-
-export default router;
diff --git a/packages/app/pages/api/[...all].ts b/packages/app/pages/api/[...all].ts
index 2009ac5ae..3e005bfdb 100644
--- a/packages/app/pages/api/[...all].ts
+++ b/packages/app/pages/api/[...all].ts
@@ -6,7 +6,7 @@ const DEFAULT_SERVER_URL = `http://127.0.0.1:${process.env.HYPERDX_API_PORT}`;
 export const config = {
   api: {
     externalResolver: true,
-    bodyParser: true,
+    bodyParser: false,
   },
 };

@@ -17,12 +17,6 @@ export default (req: NextApiRequest, res: NextApiResponse) => {
     pathRewrite: { '^/api': '' },
     target: process.env.NEXT_PUBLIC_SERVER_URL || DEFAULT_SERVER_URL,
     autoRewrite: true,
-    /**
-     * Fix bodyParser
-     **/
-    on: {
-      proxyReq: fixRequestBody,
-    },
     // ...(IS_DEV && {
     //   logger: console,
     // }),
diff --git a/packages/app/src/sessions.ts b/packages/app/src/sessions.ts
index f349c1a3b..5e4c90dd5 100644
--- a/packages/app/src/sessions.ts
+++ b/packages/app/src/sessions.ts
@@ -1,5 +1,7 @@
 import { useCallback, useEffect, useRef, useState } from 'react';
+import produce from 'immer';
 import type { ResponseJSON } from '@clickhouse/client';
+import { createClient } from '@clickhouse/client-web';
 import { chSql } from '@hyperdx/common-utils/dist/clickhouse';
 import { renderChartConfig } from '@hyperdx/common-utils/dist/renderChartConfig';
 import {
@@ -8,13 +10,15 @@ import {
   SearchConditionLanguage,
   TSource,
 } from '@hyperdx/common-utils/dist/types';
-import { fetchEventSource } from '@microsoft/fetch-event-source';
 import { useQuery, UseQueryOptions } from '@tanstack/react-query';

 import { getMetadata } from '@/metadata';
 import { usePrevious } from '@/utils';

 import { getClickhouseClient } from './clickhouse';
+import { IS_LOCAL_MODE } from './config';
+import { getLocalConnections } from './connection';
+import { useSource } from './source';

 export type Session = {
   errorCount: string;
@@ -236,6 +240,27 @@ class FatalError extends Error {}
 class TimeoutError extends Error {}

 const EventStreamContentType = 'text/event-stream';
+async function* streamToAsyncIterator<T>(
+  stream: ReadableStream<T>,
+): AsyncIterableIterator<T> {
+  const reader = stream.getReader();
+  try {
+    while (true) {
+      const { done, value } = await reader.read();
+      if (done) return;
+      yield value;
+    }
+  } finally {
+    reader.releaseLock();
+  }
+}
+
+// OPTIMIZATION STRATEGY
+//
+// 1. Write a clickhouse query to divide a session into different chunks, where each chunk has a start time. Maybe each chunk contains 100 events.
+// 2. When slider advances, use the timestamp to determine which chunk you are in
+// 3. Fetch data associated with that chunk
+// 4. Probably do some prefetching for future times
 export function useRRWebEventStream(
   {
     serviceName,
@@ -278,6 +303,8 @@
   const [fetchStatus, setFetchStatus] = useState<'fetching' | 'idle'>('idle');
   const lastFetchStatusRef = useRef<'fetching' | 'idle' | undefined>();

+  const { data: source } = useSource({ id: sourceId });
+
   const fetchResults = useCallback(
     async ({
       pageParam = 0,
     }: {
       pageParam: number;
       limit?: number;
     }) => {
+      if (!source) return;
       const resBuffer: any[] = [];
       let linesFetched = 0;

       const startTime = startDate.getTime().toString();
       const endTime = endDate.getTime().toString();
-
-      const searchParams = new URLSearchParams([
-        ['endTime', endTime],
-        ['limit', (limitOverride ?? limit).toString()],
-        ['offset', pageParam.toString()],
-        ['serviceName', serviceName],
-        ['sourceId', sourceId],
-        ['startTime', startTime],
-      ]);
+      const queryLimit = (limitOverride ?? limit).toString();
+      const offset = pageParam.toString();

       const ctrl = new AbortController();
       lastAbortController.current = ctrl;
@@ -308,90 +329,137 @@
       setFetchStatus('fetching');
       lastFetchStatusRef.current = 'fetching';

-      const fetchPromise = fetchEventSource(
-        `/api/sessions/${sessionId}/rrweb?${searchParams.toString()}`,
+      const MAX_LIMIT = 1e6;
+
+      const metadata = getMetadata();
+      const query = await renderChartConfig(
         {
-          method: 'GET',
-          signal: ctrl.signal,
-          credentials: 'include',
-          async onopen(response) {
-            if (
-              response.ok &&
-              response.headers.get('content-type') === EventStreamContentType
-            ) {
-              return; // everything's good
-            } else if (
-              response.status >= 400 &&
-              response.status < 500 &&
-              response.status !== 429
-            ) {
-              // client-side errors are usually non-retriable:
-              // TODO: handle these???
-              throw new FatalError();
-            } else {
-              throw new RetriableError();
-            }
+          // FIXME: add mappings to session source
+          select: [
+            {
+              valueExpression: `${source.implicitColumnExpression}`,
+              alias: 'b',
+            },
+            {
+              valueExpression: `simpleJSONExtractInt(${source.implicitColumnExpression}, 'type')`,
+              alias: 't',
+            },
+            {
+              valueExpression: `${source.eventAttributesExpression}['rr-web.chunk']`,
+              alias: 'ck',
+            },
+            {
+              valueExpression: `${source.eventAttributesExpression}['rr-web.total-chunks']`,
+              alias: 'tcks',
+            },
+          ],
+          dateRange: [
+            new Date(parseInt(startTime)),
+            new Date(parseInt(endTime)),
+          ],
+          from: source.from,
+          whereLanguage: 'lucene',
+          where: `ServiceName:"${serviceName}" AND ${source.resourceAttributesExpression}.rum.sessionId:"${sessionId}"`,
+          timestampValueExpression: source.timestampValueExpression,
+          implicitColumnExpression: source.implicitColumnExpression,
+          connection: source.connection,
+          orderBy: `${source.timestampValueExpression} ASC`,
+          limit: {
+            limit: Math.min(MAX_LIMIT, parseInt(queryLimit)),
+            offset: parseInt(offset),
           },
-          onmessage(event) {
-            if (event.event === '') {
-              const parsedRows = event.data
-                .split('\n')
-                .map((row: string) => {
-                  try {
-                    const parsed = JSON.parse(row);
-                    linesFetched++;
-                    return parsed;
-                  } catch (e) {
-                    return null;
-                  }
-                })
-                .filter((v: any) => v !== null);
-
-              if (onEvent != null) {
-                parsedRows.forEach(onEvent);
-              } else if (keepPreviousData) {
-                resBuffer.push(...parsedRows);
-              } else {
-                setResults(prevResults => ({
-                  key: resultsKey ?? prevResults.key ?? 'DEFAULT_KEY',
-                  data: [...prevResults.data, ...parsedRows],
-                }));
-              }
-            } else if (event.event === 'end') {
-              onEnd?.();
-
-              if (keepPreviousData) {
-                setResults({
-                  key: resultsKey ?? 'DEFAULT_KEY',
-                  data: resBuffer,
-                });
-              }
-
-              if (linesFetched === 0 || linesFetched < limit) {
-                setHasNextPage(false);
-              }
-            }
-          },
-          onclose() {
-            ctrl.abort();
-
-            setIsFetching(false);
-            setFetchStatus('idle');
-            lastFetchStatusRef.current = 'idle';
-            // if the server closes the connection unexpectedly, retry:
-            // throw new RetriableError();
-          },
-          // onerror(err) {
-          //   if (err instanceof FatalError) {
-          //     throw err; // rethrow to stop the operation
-          //   } else {
-          //     // do nothing to automatically retry. You can also
-          //     // return a specific retry interval here.
-          //   }
-          // },
         },
+        metadata,
       );
+      // TODO: Change ClickhouseClient class to use this under the hood,
+      // and refactor this to use ClickhouseClient.query. Also change pathname
+      // in createClient to PROXY_CLICKHOUSE_HOST instead
+      const format = 'JSONEachRow';
+      const queryFn = async () => {
+        if (IS_LOCAL_MODE) {
+          const localConnections = getLocalConnections();
+          const localModeUrl = new URL(localConnections[0].host);
+          localModeUrl.username = localConnections[0].username;
+          localModeUrl.password = localConnections[0].password;
+
+          const clickhouseClient = getClickhouseClient();
+          return clickhouseClient.query({
+            query: query.sql,
+            query_params: query.params,
+            format,
+          });
+        } else {
+          const clickhouseClient = createClient({
+            clickhouse_settings: {
+              add_http_cors_header: IS_LOCAL_MODE ? 1 : 0,
+              cancel_http_readonly_queries_on_client_close: 1,
+              date_time_output_format: 'iso',
+              wait_end_of_query: 0,
+            },
+            http_headers: { 'x-hyperdx-connection-id': source.connection },
+            keep_alive: {
+              enabled: true,
+            },
+            url: window.location.origin,
+            pathname: '/api/clickhouse-proxy',
+            compression: {
+              response: true,
+            },
+          });
+
+          return clickhouseClient.query({
+            query: query.sql,
+            query_params: query.params,
+            format,
+          });
+        }
+      };
+
+      const fetchPromise = (async () => {
+        const resultSet = await queryFn();
+
+        let forFunc: (data: any) => void;
+        if (onEvent) {
+          forFunc = onEvent;
+        } else if (keepPreviousData) {
+          forFunc = (data: any) => resBuffer.push(data);
+        } else {
+          forFunc = (data: any) =>
+            setResults(prevResults =>
+              produce(prevResults, draft => {
+                draft.key = resultsKey ?? draft.key ?? 'DEFAULT_KEY';
+                draft.data.push(data);
+              }),
+            );
+        }
+        const stream = resultSet.stream();
+        for await (const chunk of streamToAsyncIterator(stream)) {
+          for (const row of chunk) {
+            try {
+              const parsed = row.json();
+              linesFetched++;
+              forFunc(parsed);
+            } catch {
+              // do nothing
+            }
+          }
+        }
+
+        onEnd?.();
+
+        if (keepPreviousData) {
+          setResults({
+            key: resultsKey ?? 'DEFAULT_KEY',
+            data: resBuffer,
+          });
+        }
+
+        if (linesFetched === 0 || linesFetched < limit) {
+          setHasNextPage(false);
+        }
+      })();
+
       try {
         await Promise.race([
           fetchPromise,
@@ -413,8 +481,15 @@
           console.error(e);
         }
       }
+
+      ctrl.abort();
+      setIsFetching(false);
+      setFetchStatus('idle');
+      lastFetchStatusRef.current = 'idle';
     },
     [
+      source,
+      serviceName,
       sessionId,
       startDate,
       endDate,
diff --git a/packages/common-utils/src/clickhouse.ts b/packages/common-utils/src/clickhouse.ts
index 4f2c5749f..7333c94bd 100644
--- a/packages/common-utils/src/clickhouse.ts
+++ b/packages/common-utils/src/clickhouse.ts
@@ -361,15 +361,13 @@ export class ClickhouseClient {
     clickhouse_settings?: Record;
     connectionId?: string;
     queryId?: string;
-  }): Promise> {
+  }): Promise> {
     const isLocalMode = this.username != null && this.password != null;
     const includeCredentials = !isLocalMode;
     const includeCorsHeader = isLocalMode;
-    const _connectionId = isLocalMode ? undefined : connectionId;

     const searchParams = new URLSearchParams([
       ...(includeCorsHeader ? [['add_http_cors_header', '1']] : []),
-      ...(_connectionId ? [['hyperdx_connection_id', _connectionId]] : []),
       ['query', query],
       ['default_format', format],
       ['date_time_output_format', 'iso'],
@@ -405,11 +403,17 @@ export class ClickhouseClient {
     if (isBrowser) {
       // TODO: check if we can use the client-web directly
       const { ResultSet } = await import('@clickhouse/client-web');
+
+      const headers = {};
+      if (!isLocalMode && connectionId) {
+        headers['x-hyperdx-connection-id'] = connectionId;
+      }
       // https://github.com/ClickHouse/clickhouse-js/blob/1ebdd39203730bb99fad4c88eac35d9a5e96b34a/packages/client-web/src/connection/web_connection.ts#L200C7-L200C23
       const response = await fetch(`${this.host}/?${searchParams.toString()}`, {
         ...(includeCredentials ? { credentials: 'include' } : {}),
         signal: abort_signal,
         method: 'GET',
+        headers,
       });

       // TODO: Send command to CH to cancel query on abort_signal