Skip to content
This repository was archived by the owner on Mar 7, 2024. It is now read-only.

Commit 8eedfc8

Browse files
committed
support historical event export
1 parent 0239ddb commit 8eedfc8

File tree

3 files changed

+132
-130
lines changed

3 files changed

+132
-130
lines changed

index.ts

+115-107
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import { Client } from 'pg'
55
type PostgresPlugin = Plugin<{
66
global: {
77
pgClient: Client
8-
buffer: ReturnType<typeof createBuffer>
98
eventsToIgnore: Set<string>
109
sanitizedTableName: string
1110
}
@@ -17,37 +16,54 @@ type PostgresPlugin = Plugin<{
1716
tableName: string
1817
dbUsername: string
1918
dbPassword: string
20-
uploadSeconds: string
21-
uploadMegabytes: string
2219
eventsToIgnore: string
23-
isHeroku: 'Yes' | 'No'
20+
hasSelfSignedCert: 'Yes' | 'No'
2421
}
2522
}>
2623

2724
type PostgresMeta = PluginMeta<PostgresPlugin>
2825

2926
interface ParsedEvent {
30-
uuid: string
27+
uuid?: string
3128
eventName: string
32-
properties: Record<string, any>
33-
elements: Record<string, any>
34-
set: Record<string, any>
35-
set_once: Record<string, any>
29+
properties: string
30+
elements: string
31+
set: string
32+
set_once: string
3633
distinct_id: string
3734
team_id: number
38-
ip: string
35+
ip: string | null
3936
site_url: string
4037
timestamp: string
4138
}
4239

43-
type InsertQueryValue = string | number
44-
4540
interface UploadJobPayload {
4641
batch: ParsedEvent[]
4742
batchId: number
4843
retriesPerformedSoFar: number
4944
}
5045

46+
const randomBytes = (): string => {
47+
return (((1 + Math.random()) * 0x10000) | 0).toString(16).substring(1)
48+
}
49+
50+
const generateUuid = (): string => {
51+
return (
52+
randomBytes() +
53+
randomBytes() +
54+
'-' +
55+
randomBytes() +
56+
'-3' +
57+
randomBytes().substr(0, 2) +
58+
'-' +
59+
randomBytes() +
60+
'-' +
61+
randomBytes() +
62+
randomBytes() +
63+
randomBytes()
64+
).toLowerCase()
65+
}
66+
5167
export const jobs: PostgresPlugin['jobs'] = {
5268
uploadBatchToPostgres: async (payload: UploadJobPayload, meta: PostgresMeta) => {
5369
await insertBatchIntoPostgres(payload, meta)
@@ -66,9 +82,6 @@ export const setupPlugin: PostgresPlugin['setupPlugin'] = async (meta) => {
6682
}
6783
}
6884

69-
const uploadMegabytes = Math.max(1, Math.min(parseInt(config.uploadMegabytes) || 1, 10))
70-
const uploadSeconds = Math.max(1, Math.min(parseInt(config.uploadSeconds) || 1, 600))
71-
7285
global.sanitizedTableName = sanitizeSqlIdentifier(config.tableName)
7386

7487
const queryError = await executeQuery(
@@ -93,103 +106,102 @@ export const setupPlugin: PostgresPlugin['setupPlugin'] = async (meta) => {
93106
throw new Error(`Unable to connect to PostgreSQL instance and create table with error: ${queryError.message}`)
94107
}
95108

96-
global.buffer = createBuffer({
97-
limit: uploadMegabytes * 1024 * 1024,
98-
timeoutSeconds: uploadSeconds,
99-
onFlush: async (batch) => {
100-
await insertBatchIntoPostgres(
101-
{ batch, batchId: Math.floor(Math.random() * 1000000), retriesPerformedSoFar: 0 },
102-
meta
103-
)
104-
},
105-
})
106-
107109
global.eventsToIgnore = new Set(
108110
config.eventsToIgnore ? config.eventsToIgnore.split(',').map((event) => event.trim()) : null
109111
)
110112
}
111113

112-
export async function onEvent(event: PluginEvent, { global }: PostgresMeta) {
113-
const {
114-
event: eventName,
115-
properties,
116-
$set,
117-
$set_once,
118-
distinct_id,
119-
team_id,
120-
site_url,
121-
now,
122-
sent_at,
123-
uuid,
124-
..._discard
125-
} = event
126-
127-
const ip = properties?.['$ip'] || event.ip
128-
const timestamp = event.timestamp || properties?.timestamp || now || sent_at
129-
let ingestedProperties = properties
130-
let elements = []
131-
132-
// only move prop to elements for the $autocapture action
133-
if (eventName === '$autocapture' && properties && '$elements' in properties) {
134-
const { $elements, ...props } = properties
135-
ingestedProperties = props
136-
elements = $elements
137-
}
114+
export async function exportEvents(events: PluginEvent[], { global, jobs }: PostgresMeta) {
115+
const batch: ParsedEvent[] = []
116+
for (const event of events) {
117+
const {
118+
event: eventName,
119+
properties,
120+
$set,
121+
$set_once,
122+
distinct_id,
123+
team_id,
124+
site_url,
125+
now,
126+
sent_at,
127+
uuid,
128+
..._discard
129+
} = event
130+
131+
if (global.eventsToIgnore.has(eventName)) {
132+
continue
133+
}
134+
135+
const ip = properties?.['$ip'] || event.ip
136+
const timestamp = event.timestamp || properties?.timestamp || now || sent_at
137+
let ingestedProperties = properties
138+
let elements = []
139+
140+
// only move prop to elements for the $autocapture action
141+
if (eventName === '$autocapture' && properties && '$elements' in properties) {
142+
const { $elements, ...props } = properties
143+
ingestedProperties = props
144+
elements = $elements
145+
}
138146

139-
const parsedEvent = {
140-
uuid,
141-
eventName,
142-
properties: JSON.stringify(ingestedProperties || {}),
143-
elements: JSON.stringify(elements || {}),
144-
set: JSON.stringify($set || {}),
145-
set_once: JSON.stringify($set_once || {}),
146-
distinct_id,
147-
team_id,
148-
ip,
149-
site_url,
150-
timestamp: new Date(timestamp).toISOString(),
147+
const parsedEvent: ParsedEvent = {
148+
uuid,
149+
eventName,
150+
properties: JSON.stringify(ingestedProperties || {}),
151+
elements: JSON.stringify(elements || {}),
152+
set: JSON.stringify($set || {}),
153+
set_once: JSON.stringify($set_once || {}),
154+
distinct_id,
155+
team_id,
156+
ip,
157+
site_url,
158+
timestamp: new Date(timestamp).toISOString(),
159+
}
160+
161+
batch.push(parsedEvent)
151162
}
152163

153-
if (!global.eventsToIgnore.has(eventName)) {
154-
global.buffer.add(parsedEvent)
164+
if (batch.length > 0) {
165+
await jobs
166+
.uploadBatchToPostgres({ batch, batchId: Math.floor(Math.random() * 1000000), retriesPerformedSoFar: 0 })
167+
.runNow()
155168
}
156169
}
157170

158171
export const insertBatchIntoPostgres = async (payload: UploadJobPayload, { global, jobs, config }: PostgresMeta) => {
159-
let values: InsertQueryValue[] = []
172+
let values: any[] = []
160173
let valuesString = ''
161174

162175
for (let i = 0; i < payload.batch.length; ++i) {
163176
const { uuid, eventName, properties, elements, set, set_once, distinct_id, team_id, ip, site_url, timestamp } =
164177
payload.batch[i]
165178

179+
166180
// Creates format: ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11), ($12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22)
167181
valuesString += ' ('
168182
for (let j = 1; j <= 11; ++j) {
169183
valuesString += `$${11 * i + j}${j === 11 ? '' : ', '}`
170184
}
171185
valuesString += `)${i === payload.batch.length - 1 ? '' : ','}`
172-
173-
values = [
174-
...values,
175-
...[
176-
uuid,
177-
eventName,
178-
properties,
179-
elements,
180-
set,
181-
set_once,
182-
distinct_id,
183-
team_id,
184-
ip,
185-
site_url,
186-
timestamp,
187-
],
188-
]
186+
187+
values = values.concat([
188+
uuid || generateUuid(),
189+
eventName,
190+
properties,
191+
elements,
192+
set,
193+
set_once,
194+
distinct_id,
195+
team_id,
196+
ip,
197+
site_url,
198+
timestamp,
199+
])
189200
}
190201

191202
console.log(
192-
`(Batch Id: ${payload.batchId}) Flushing ${payload.batch.length} event${payload.batch.length > 1 ? 's' : ''
203+
`(Batch Id: ${payload.batchId}) Flushing ${payload.batch.length} event${
204+
payload.batch.length > 1 ? 's' : ''
193205
} to Postgres instance`
194206
)
195207

@@ -217,42 +229,38 @@ export const insertBatchIntoPostgres = async (payload: UploadJobPayload, { globa
217229
}
218230

219231
const executeQuery = async (query: string, values: any[], config: PostgresMeta['config']): Promise<Error | null> => {
220-
const basicConnectionOptions = config.databaseUrl ? {
221-
connectionString: config.databaseUrl
222-
} : {
223-
user: config.dbUsername,
224-
password: config.dbPassword,
225-
host: config.host,
226-
database: config.dbName,
227-
port: parseInt(config.port),
228-
}
229-
const pgClient = new Client(
230-
{
231-
...basicConnectionOptions,
232-
ssl: {
233-
rejectUnauthorized: config.isHeroku === "No"
234-
}
235-
}
236-
)
232+
const basicConnectionOptions = config.databaseUrl
233+
? {
234+
connectionString: config.databaseUrl,
235+
}
236+
: {
237+
user: config.dbUsername,
238+
password: config.dbPassword,
239+
host: config.host,
240+
database: config.dbName,
241+
port: parseInt(config.port),
242+
}
243+
const pgClient = new Client({
244+
...basicConnectionOptions,
245+
ssl: {
246+
rejectUnauthorized: config.hasSelfSignedCert === 'No',
247+
},
248+
})
237249

238250
await pgClient.connect()
239251

240252
let error: Error | null = null
241253
try {
242254
await pgClient.query(query, values)
243255
} catch (err) {
244-
error = err
256+
error = err as Error
245257
}
246258

247259
await pgClient.end()
248260

249261
return error
250262
}
251263

252-
export const teardownPlugin: PostgresPlugin['teardownPlugin'] = ({ global }) => {
253-
global.buffer.flush()
254-
}
255-
256264
const sanitizeSqlIdentifier = (unquotedIdentifier: string): string => {
257265
return unquotedIdentifier.replace(/[^\w\d_]+/g, '')
258266
}

package.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"name": "@posthog/postgres-plugin",
33
"private": true,
4-
"version": "0.0.1",
4+
"version": "0.1.0",
55
"description": "Export PostHog events to a PostgreSQL instance on ingestion.",
66
"devDependencies": {
77
"@posthog/plugin-contrib": "^0.0.3",

0 commit comments

Comments (0)