Skip to content

FFM-11788 Add maxStreamRetries config option #126

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Jul 31, 2024
26 changes: 26 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,32 @@ const client = initialize(
)
```

Max Stream Retries
You can configure the maximum number of streaming retries before the SDK stops attempting to reconnect or falls back to polling (if enabled). The maxRetries option can be set to any positive number or Infinity for unlimited retries (which is the default).

```typescript
const options = {
maxRetries: 5, // Set the maximum number of retries for streaming. Default is Infinity.
streamEnabled: true,
pollingEnabled: true,
pollingInterval: 60000,
}

const client = initialize(
'YOUR_SDK_KEY',
{
identifier: 'Harness1',
attributes: {
lastUpdated: Date(),
host: location.href
}
},
options
)

```
If maxRetries is reached and pollingEnabled is true, the SDK will stay in polling mode. If pollingEnabled is false, the SDK will not poll, and evaluations will not be updated until the SDK Client is initialized again, for example if the app or page is restarted.

## Listening to events from the `client` instance.

```typescript
Expand Down
247 changes: 247 additions & 0 deletions src/__tests__/stream.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,247 @@
import { Streamer } from '../stream'
import type { Options } from '../types'
import { Event } from '../types'
import { getRandom } from '../utils'
import type { Emitter } from 'mitt'
import type Poller from "../poller";

jest.useFakeTimers()

jest.mock('../utils.ts', () => ({
getRandom: jest.fn()
}))

const mockEventBus: Emitter = {
emit: jest.fn(),
on: jest.fn(),
off: jest.fn(),
all: new Map()
}

const mockXHR = {
open: jest.fn(),
setRequestHeader: jest.fn(),
send: jest.fn(),
abort: jest.fn(),
status: 0,
responseText: '',
onload: null,
onerror: null,
onprogress: null,
onabort: null,
ontimeout: null
}

global.XMLHttpRequest = jest.fn(() => mockXHR) as unknown as jest.MockedClass<typeof XMLHttpRequest>

const logError = jest.fn()
const logDebug = jest.fn()

const getStreamer = (overrides: Partial<Options> = {}, maxRetries: number = Infinity): Streamer => {
const options: Options = {
baseUrl: 'http://test',
eventUrl: 'http://event',
pollingInterval: 60000,
debug: true,
pollingEnabled: true,
streamEnabled: true,
...overrides
}

return new Streamer(
mockEventBus,
options,
`${options.baseUrl}/stream`,
'test-api-key',
{ 'Test-Header': 'value' },
{ start: jest.fn(), stop: jest.fn(), isPolling: jest.fn() } as unknown as Poller,
logDebug,
logError,
jest.fn(),
maxRetries
)
}

describe('Streamer', () => {
beforeEach(() => {
jest.clearAllMocks()
})

it('should connect and emit CONNECTED event', () => {
const streamer = getStreamer({}, 3)

streamer.start()
expect(mockXHR.open).toHaveBeenCalledWith('GET', 'http://test/stream')
expect(mockXHR.send).toHaveBeenCalled()

mockXHR.onprogress({} as ProgressEvent)
expect(mockEventBus.emit).toHaveBeenCalledWith(Event.CONNECTED)
})

it('should reconnect successfully after multiple failures', () => {
const streamer = getStreamer({}, 5)

streamer.start()
expect(mockXHR.send).toHaveBeenCalled()

for (let i = 0; i < 3; i++) {
mockXHR.onerror({} as ProgressEvent)
jest.advanceTimersByTime(getRandom(1000, 10000))
}

// Simulate a successful connection on the next attempt
mockXHR.onprogress({} as ProgressEvent)

expect(mockEventBus.emit).toHaveBeenCalledWith(Event.CONNECTED)
expect(mockXHR.send).toHaveBeenCalledTimes(4) // Should attempt to reconnect 3 times before succeeding
})

it('should retry connecting on error and eventually fallback to polling', () => {
const streamer = getStreamer()

streamer.start()
expect(mockXHR.send).toHaveBeenCalled()

for (let i = 0; i < 3; i++) {
mockXHR.onerror({} as ProgressEvent)
jest.advanceTimersByTime(getRandom(1000, 10000))
}

expect(mockEventBus.emit).toHaveBeenCalledWith(Event.DISCONNECTED)
})

it('should not retry after max retries are exhausted', () => {
const streamer = getStreamer({}, 3)

streamer.start()
expect(mockXHR.send).toHaveBeenCalled()

for (let i = 0; i < 3; i++) {
mockXHR.onerror({} as ProgressEvent)
jest.advanceTimersByTime(getRandom(1000, 10000))
}

mockXHR.onerror({} as ProgressEvent)
expect(logError).toHaveBeenCalledWith('Streaming: Max streaming retries reached. Staying in polling mode.')
expect(mockEventBus.emit).toHaveBeenCalledWith(Event.DISCONNECTED)
expect(mockXHR.send).toHaveBeenCalledTimes(3) // Should not send after max retries
})

it('should fallback to polling on stream failure', () => {
const poller = { start: jest.fn(), stop: jest.fn(), isPolling: jest.fn() } as unknown as Poller
const streamer = new Streamer(
mockEventBus,
{ baseUrl: 'http://test', eventUrl: 'http://event', pollingEnabled: true, streamEnabled: true, debug: true },
'http://test/stream',
'test-api-key',
{ 'Test-Header': 'value' },
poller,
logDebug,
logError,
jest.fn(),
Infinity
)

streamer.start()
expect(mockXHR.send).toHaveBeenCalled()

mockXHR.onerror({} as ProgressEvent)
jest.advanceTimersByTime(getRandom(1000, 10000))

expect(poller.start).toHaveBeenCalled()
expect(logDebug).toHaveBeenCalledWith('Streaming: Falling back to polling mode while stream recovers')
})

it('should stop polling when close is called if in fallback polling mode', () => {
const poller = { start: jest.fn(), stop: jest.fn(), isPolling: jest.fn() } as unknown as Poller
;(poller.isPolling as jest.Mock)
.mockImplementationOnce(() => false)
.mockImplementationOnce(() => true)

const streamer = new Streamer(
mockEventBus,
{ baseUrl: 'http://test', eventUrl: 'http://event', pollingEnabled: true, streamEnabled: true, debug: true },
'http://test/stream',
'test-api-key',
{ 'Test-Header': 'value' },
poller,
logDebug,
logError,
jest.fn(),
3
)

streamer.start()
expect(mockXHR.send).toHaveBeenCalled()

// Simulate stream failure and fallback to polling
mockXHR.onerror({} as ProgressEvent)
jest.advanceTimersByTime(getRandom(1000, 10000))

// Ensure polling has started
expect(poller.start).toHaveBeenCalled()

// Now close the streamer
streamer.close()

expect(mockXHR.abort).toHaveBeenCalled()
expect(poller.stop).toHaveBeenCalled()
expect(mockEventBus.emit).toHaveBeenCalledWith(Event.STOPPED)
})

it('should stop streaming but not call poller.stop if not in fallback polling mode when close is called', () => {
const poller = { start: jest.fn(), stop: jest.fn(), isPolling: jest.fn().mockReturnValue(false) } as unknown as Poller
const streamer = new Streamer(
mockEventBus,
{ baseUrl: 'http://test', eventUrl: 'http://event', pollingEnabled: true, streamEnabled: true, debug: true },
'http://test/stream',
'test-api-key',
{ 'Test-Header': 'value' },
poller,
logDebug,
logError,
jest.fn(),
3
)

streamer.start()
streamer.close()

expect(mockXHR.abort).toHaveBeenCalled()
expect(poller.stop).not.toHaveBeenCalled()
expect(mockEventBus.emit).toHaveBeenCalledWith(Event.STOPPED)
})

it('should retry indefinitely if maxRetries is set to Infinity', () => {
const streamer = getStreamer()

streamer.start()
expect(mockXHR.send).toHaveBeenCalled()

for (let i = 0; i < 100; i++) {
mockXHR.onerror({} as ProgressEvent)
jest.advanceTimersByTime(getRandom(1000, 10000))
}

expect(logError).not.toHaveBeenCalledWith('Streaming: Max streaming retries reached. Staying in polling mode.')
expect(mockXHR.send).toHaveBeenCalledTimes(101)
})

it('should reconnect successfully after multiple failures', () => {
const streamer = getStreamer({}, 5)

streamer.start()
expect(mockXHR.send).toHaveBeenCalled()

for (let i = 0; i < 3; i++) {
mockXHR.onerror({} as ProgressEvent)
jest.advanceTimersByTime(getRandom(1000, 10000))
}

// Simulate a successful connection on the next attempt
mockXHR.onprogress({} as ProgressEvent)

expect(mockEventBus.emit).toHaveBeenCalledWith(Event.CONNECTED)
expect(mockXHR.send).toHaveBeenCalledTimes(4) // Should attempt to reconnect 3 times before succeeding
})
})
3 changes: 2 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,8 @@ const initialize = (apiKey: string, target: Target, options?: Options): Result =
} else if (event.domain === 'target-segment') {
handleSegmentEvent(event)
}
}
},
configurations.maxStreamRetries
)
eventSource.start()
}
Expand Down
20 changes: 18 additions & 2 deletions src/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ export class Streamer {
private readTimeoutCheckerId: any
private connectionOpened = false
private disconnectEventEmitted = false
private reconnectAttempts = 0
private reconnectAttempts = 0
private retriesExhausted: boolean = false

constructor(
private eventBus: Emitter,
Expand All @@ -22,7 +23,8 @@ export class Streamer {
private fallbackPoller: Poller,
private logDebug: (...data: any[]) => void,
private logError: (...data: any[]) => void,
private eventCallback: (e: StreamEvent) => void
private eventCallback: (e: StreamEvent) => void,
private maxRetries: number
) {}

start() {
Expand Down Expand Up @@ -60,10 +62,24 @@ export class Streamer {
)
}

if (this.reconnectAttempts >= this.maxRetries) {
this.retriesExhausted = true
if (this.configurations.pollingEnabled) {
this.logErrorMessage('Max streaming retries reached. Staying in polling mode.')
} else {
this.logErrorMessage('Max streaming retries reached.')
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wondering would it be a good idea to indicate that polling will also be stopped in this else statement log so that users would know they're gonna stop receiving updates?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks - good catch.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated

}
return
}

setTimeout(() => this.start(), reconnectDelayMs)
}

const onFailed = (msg: string) => {
if (this.retriesExhausted) {
return
}

if (!!msg) {
this.logDebugMessage('Stream has issue', msg)
}
Expand Down
6 changes: 6 additions & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,12 @@ export interface Options {
* @default console
*/
logger?: Logger

/**
* By default, the stream will attempt to reconnect indefinitely if it disconnects. Use this option to limit
* the number of attempts it will make.
*/
maxStreamRetries?: number
}

export interface MetricsInfo {
Expand Down
3 changes: 2 additions & 1 deletion src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ export const defaultOptions: Options = {
eventsSyncInterval: MIN_EVENTS_SYNC_INTERVAL,
pollingInterval: MIN_POLLING_INTERVAL,
streamEnabled: true,
cache: false
cache: false,
maxStreamRetries: Infinity
}

export const getConfiguration = (options: Options): Options => {
Expand Down