-
Notifications
You must be signed in to change notification settings - Fork 23
/
Copy pathstream.ts
201 lines (173 loc) · 6.18 KB
/
stream.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
import { Event, type Options, StreamEvent } from './types'
import { getRandom } from './utils'
import type Poller from './poller'
import type { Emitter } from 'mitt'
/**
 * Read-timeout window in milliseconds. Used both as the period of the stream's inactivity check and
 * as the maximum time without incoming data before the connection is aborted.
 */
const SSE_TIMEOUT_MS = 30 * 1000
/**
 * Keeps a Server-Sent Events connection open over `XMLHttpRequest` and hands every `data:` line,
 * parsed as JSON, to `eventCallback`.
 *
 * On a stream failure the fallback poller is started (when `configurations.pollingEnabled` is set),
 * `ERROR_STREAM`/`ERROR`/`DISCONNECTED` are emitted on the event bus, and a reconnect is scheduled
 * after a delay obtained from `getRandom(1000, 10000)` until `maxRetries` consecutive failures have
 * been counted. The counter is reset as soon as a connection delivers data again.
 */
export class Streamer {
  // Most recent request; undefined until start() has been called.
  private xhr?: XMLHttpRequest
  private closed: boolean = false
  private readTimeoutCheckerId: ReturnType<typeof setInterval> | undefined
  // Pending reconnect timer, kept so close() can cancel it.
  private reconnectTimeoutId: ReturnType<typeof setTimeout> | undefined
  private connectionOpened = false
  private disconnectEventEmitted = false
  private reconnectAttempts = 0
  private retriesExhausted: boolean = false

  constructor(
    private eventBus: Emitter,
    private configurations: Options,
    private url: string,
    private apiKey: string,
    private standardHeaders: Record<string, string>,
    private fallbackPoller: Poller,
    private logDebug: (...data: any[]) => void,
    private logError: (...data: any[]) => void,
    private eventCallback: (e: StreamEvent) => void,
    private maxRetries: number
  ) {}

  /**
   * Opens a new SSE request and wires up its handlers. Also invoked internally to reconnect after a
   * failure.
   */
  start(): void {
    // A (re)start supersedes any reconnect that is still scheduled and any watchdog still running,
    // so neither can leak or open a second connection.
    clearTimeout(this.reconnectTimeoutId)
    this.reconnectTimeoutId = undefined
    clearInterval(this.readTimeoutCheckerId)

    // Text received after the last line terminator of this connection. Progress events may split
    // the response anywhere, so an incomplete line is held back until the rest of it arrives.
    let pendingLine = ''

    // Handles one line of the stream. Returns false only for a `data:` line whose payload is not
    // valid JSON; every other line (comments, other SSE fields, blanks) is ignored and reported as handled.
    const processLine = (line: string): boolean => {
      if (!line.startsWith('data:')) {
        return true
      }

      let event: StreamEvent
      try {
        event = JSON.parse(line.substring(5))
      } catch {
        return false
      }

      this.logDebugMessage('Received event from stream: ', event)
      this.eventCallback(event)
      return true
    }

    const processData = (chunk: string): void => {
      const lines = (pendingLine + chunk).split(/\r?\n/)
      // Whatever follows the last newline is not terminated yet.
      const tail = lines.pop() ?? ''

      for (const line of lines) {
        if (!processLine(line)) {
          // A terminated line can no longer be completed by later data: report it and keep going
          // instead of throwing out of the progress handler.
          this.logErrorMessage('Ignoring stream event with malformed JSON payload', line)
        }
      }

      // An unterminated tail that already parses is dispatched immediately; anything else is
      // kept and retried once more data has been appended to it.
      pendingLine = tail.startsWith('data:') && processLine(tail) ? '' : tail
    }

    const onConnected = () => {
      this.logDebugMessage('Stream connected')
      this.eventBus.emit(Event.CONNECTED)
      this.reconnectAttempts = 0
    }

    const onDisconnect = () => {
      clearInterval(this.readTimeoutCheckerId)
      const reconnectDelayMs = getRandom(1000, 10000)
      this.reconnectAttempts++
      this.logDebugMessage('Stream disconnected, will reconnect in ' + reconnectDelayMs + 'ms')

      // DISCONNECTED is emitted once per outage, not once per failed attempt.
      if (!this.disconnectEventEmitted) {
        this.eventBus.emit(Event.DISCONNECTED)
        this.disconnectEventEmitted = true
      }

      if (this.reconnectAttempts >= 5 && this.reconnectAttempts % 5 === 0) {
        this.logErrorMessage(
          `Reconnection failed after ${this.reconnectAttempts} attempts; attempting further reconnections.`
        )
      }

      if (this.reconnectAttempts >= this.maxRetries) {
        this.retriesExhausted = true
        if (this.configurations.pollingEnabled) {
          this.logErrorMessage('Max streaming retries reached. Staying in polling mode.')
        } else {
          this.logErrorMessage('Max streaming retries reached.')
        }
        return
      }

      this.reconnectTimeoutId = setTimeout(() => {
        this.reconnectTimeoutId = undefined
        this.start()
      }, reconnectDelayMs)
    }

    const onFailed = (msg: string | null) => {
      if (this.retriesExhausted) {
        return
      }

      if (msg) {
        this.logDebugMessage('Stream has issue', msg)
      }

      // Fallback to polling while we have a stream failure
      this.fallBackToPolling()
      this.eventBus.emit(Event.ERROR_STREAM, msg)
      this.eventBus.emit(Event.ERROR, msg)
      onDisconnect()
    }

    // standardHeaders is spread last, so a header supplied there replaces the default of the same name.
    const sseHeaders: Record<string, string> = {
      'Cache-Control': 'no-cache',
      Accept: 'text/event-stream',
      'API-Key': this.apiKey,
      ...this.standardHeaders
    }

    this.logDebugMessage('SSE HTTP start request', this.url)

    // Handlers close over this local so they always act on the request they were registered for.
    const xhr = new XMLHttpRequest()
    this.xhr = xhr
    xhr.open('GET', this.url)
    for (const [header, value] of Object.entries(sseHeaders)) {
      xhr.setRequestHeader(header, value)
    }
    xhr.timeout = 24 * 60 * 60 * 1000 // Force SSE to reconnect after 24hrs

    xhr.onerror = () => {
      this.connectionOpened = false
      onFailed('XMLHttpRequest error on SSE stream')
    }

    xhr.onabort = () => {
      this.connectionOpened = false
      this.logDebugMessage('SSE aborted')
      // An abort triggered by close() is intentional and must not start a reconnect cycle.
      if (!this.closed) {
        onFailed(null)
      }
    }

    xhr.ontimeout = () => {
      this.connectionOpened = false
      onFailed('SSE timeout')
    }

    // XMLHttpRequest fires `onload` when a request completes successfully, meaning the entire content has been downloaded.
    // For SSE, if it fires it indicates an invalid state and we should reconnect
    xhr.onload = () => {
      // The connection is gone; without this reset the next connection would never report CONNECTED.
      this.connectionOpened = false
      onFailed(`Received XMLHttpRequest onLoad event: ${xhr.status}`)
    }

    let offset = 0
    let lastActivity = Date.now()

    xhr.onprogress = () => {
      // XMLHttpRequest doesn't fire an `onload` event when used to open an SSE connection, so we fire the
      // CONNECTED event here if we haven't already done so per unique connection event.
      if (!this.connectionOpened) {
        onConnected()
        this.connectionOpened = true
        this.disconnectEventEmitted = false
      }

      // if we are in polling mode due to a recovered streaming error, then stop polling
      this.stopFallBackPolling()
      lastActivity = Date.now()

      // responseText accumulates the whole response, so only the part past `offset` is new.
      const data = xhr.responseText.slice(offset)
      offset += data.length
      this.logDebugMessage('SSE GOT: ' + data)
      processData(data)
    }

    this.readTimeoutCheckerId = setInterval(() => {
      // this task will kill and restart the SSE connection if no data or heartbeat has arrived in a while
      if (lastActivity < Date.now() - SSE_TIMEOUT_MS) {
        this.logDebugMessage('SSE read timeout')
        xhr.abort()
      }
    }, SSE_TIMEOUT_MS)

    xhr.send()
  }

  /**
   * Stops streaming: aborts the current request, cancels the read-timeout checker and any scheduled
   * reconnect, emits `STOPPED` and stops fallback polling if it is running.
   */
  close(): void {
    this.connectionOpened = false
    this.closed = true

    // Without this a reconnect scheduled before close() would reopen the stream afterwards.
    clearTimeout(this.reconnectTimeoutId)
    this.reconnectTimeoutId = undefined

    this.xhr?.abort()

    // Stop the task that listens for heartbeats
    clearInterval(this.readTimeoutCheckerId)
    this.eventBus.emit(Event.STOPPED)

    // if we are still in polling mode when close is called, then stop polling
    this.stopFallBackPolling()
  }

  /** Starts the fallback poller unless it is already polling or polling is disabled. */
  private fallBackToPolling() {
    if (!this.fallbackPoller.isPolling() && this.configurations.pollingEnabled) {
      this.logDebugMessage('Falling back to polling mode while stream recovers')
      this.fallbackPoller.start()
    }
  }

  /** Stops the fallback poller if it is currently polling. */
  private stopFallBackPolling() {
    if (this.fallbackPoller.isPolling()) {
      this.logDebugMessage('Stopping fallback polling mode')
      this.fallbackPoller.stop()
    }
  }

  /** Forwards a message to `logDebug`, prefixed with `Streaming: `, only when `configurations.debug` is set. */
  private logDebugMessage(message: string, ...args: unknown[]): void {
    if (this.configurations.debug) {
      this.logDebug(`Streaming: ${message}`, ...args)
    }
  }

  /** Forwards a message to `logError`, prefixed with `Streaming: `. */
  private logErrorMessage(message: string, ...args: unknown[]): void {
    this.logError(`Streaming: ${message}`, ...args)
  }
}