-
Notifications
You must be signed in to change notification settings - Fork 83
[FSSDK-10642] Refactor batch event processor #960
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 41 commits
Commits
Show all changes
45 commits
Select commit
Hold shift + click to select a range
83a7b1b
inital commit: queueing event processor
raju-opti 9a139e4
saving
raju-opti be3cbd3
s
raju-opti e1f41ce
saving
raju-opti 9c82bee
save
raju-opti 4a28d26
saving
raju-opti 5fcafc1
saving
raju-opti 6769148
update
raju-opti 7f13bec
update
raju-opti 6bed2a1
update
raju-opti 66b217f
upd
raju-opti bc7fcd7
test
raju-opti 3292886
tests
raju-opti 78908c6
tests
raju-opti 6754a1d
up
raju-opti 5bf1eb8
update
raju-opti 51dd143
up
raju-opti b1cf28f
up
raju-opti 1171325
u
raju-opti 4eb8910
queing processor: wip updates
raju-opti 4cebabc
update
raju-opti a2b7fe8
up
raju-opti 37aba34
update
raju-opti 48c9e1d
upd
raju-opti ee194ae
retry runner tests
raju-opti 227c7b0
rem
raju-opti 20771dc
more test
raju-opti e92f027
test
raju-opti e1df4a3
more test
raju-opti dbd3b59
bak
raju-opti 6a64167
more tests
raju-opti 2c411ca
new test
raju-opti 6032ba0
factory test
raju-opti 5b04476
cleanup
raju-opti 8727c9b
factory test
raju-opti 851da33
test
raju-opti 7bb5c64
up
raju-opti 8d73fad
react naive
raju-opti 59b678b
node test
raju-opti 9d684fe
more test
raju-opti 96400ee
up
raju-opti d40119a
review
raju-opti 5daa4e0
up
raju-opti 372a1bf
up
raju-opti a51ff8d
up
raju-opti File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
155 changes: 155 additions & 0 deletions
155
lib/event_processor/batch_event_processor.react_native.spec.ts
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,155 @@ | ||
import { vi, describe, it, expect, beforeEach } from 'vitest'; | ||
|
||
const mockNetInfo = vi.hoisted(() => { | ||
const netInfo = { | ||
listeners: [], | ||
unsubs: [], | ||
addEventListener(fn: any) { | ||
this.listeners.push(fn); | ||
const unsub = vi.fn(); | ||
this.unsubs.push(unsub); | ||
return unsub; | ||
}, | ||
pushState(state: boolean) { | ||
for (const listener of this.listeners) { | ||
listener({ isInternetReachable: state }); | ||
} | ||
}, | ||
clear() { | ||
this.listeners = []; | ||
this.unsubs = []; | ||
} | ||
}; | ||
return netInfo; | ||
}); | ||
|
||
vi.mock('@react-native-community/netinfo', () => { | ||
return { | ||
addEventListener: mockNetInfo.addEventListener.bind(mockNetInfo), | ||
}; | ||
}); | ||
|
||
import { ReactNativeNetInfoEventProcessor } from './batch_event_processor.react_native'; | ||
import { getMockLogger } from '../tests/mock/mock_logger'; | ||
import { getMockRepeater } from '../tests/mock/mock_repeater'; | ||
import { getMockAsyncCache } from '../tests/mock/mock_cache'; | ||
|
||
import { EventWithId } from './batch_event_processor'; | ||
import { EventDispatcher } from './eventDispatcher'; | ||
import { formatEvents } from './v1/buildEventV1'; | ||
import { createImpressionEvent } from '../tests/mock/create_event'; | ||
import { ProcessableEvent } from './eventProcessor'; | ||
|
||
const getMockDispatcher = () => { | ||
return { | ||
dispatchEvent: vi.fn(), | ||
}; | ||
}; | ||
|
||
const exhaustMicrotasks = async (loop = 100) => { | ||
for(let i = 0; i < loop; i++) { | ||
await Promise.resolve(); | ||
} | ||
} | ||
|
||
|
||
describe('ReactNativeNetInfoEventProcessor', () => { | ||
beforeEach(() => { | ||
mockNetInfo.clear(); | ||
}); | ||
|
||
it('should not retry failed events when reachable state does not change', async () => { | ||
const eventDispatcher = getMockDispatcher(); | ||
const dispatchRepeater = getMockRepeater(); | ||
const failedEventRepeater = getMockRepeater(); | ||
|
||
const cache = getMockAsyncCache<EventWithId>(); | ||
const events: ProcessableEvent[] = []; | ||
|
||
for(let i = 0; i < 5; i++) { | ||
const id = `id-${i}`; | ||
const event = createImpressionEvent(id); | ||
events.push(event); | ||
await cache.set(id, { id, event }); | ||
} | ||
|
||
const processor = new ReactNativeNetInfoEventProcessor({ | ||
eventDispatcher, | ||
dispatchRepeater, | ||
failedEventRepeater, | ||
batchSize: 1000, | ||
eventStore: cache, | ||
}); | ||
|
||
processor.start(); | ||
await processor.onRunning(); | ||
|
||
mockNetInfo.pushState(true); | ||
expect(eventDispatcher.dispatchEvent).not.toHaveBeenCalled(); | ||
|
||
mockNetInfo.pushState(true); | ||
expect(eventDispatcher.dispatchEvent).not.toHaveBeenCalled(); | ||
}); | ||
|
||
it('should retry failed events when network becomes reachable', async () => { | ||
const eventDispatcher = getMockDispatcher(); | ||
const dispatchRepeater = getMockRepeater(); | ||
const failedEventRepeater = getMockRepeater(); | ||
|
||
const cache = getMockAsyncCache<EventWithId>(); | ||
const events: ProcessableEvent[] = []; | ||
|
||
for(let i = 0; i < 5; i++) { | ||
const id = `id-${i}`; | ||
const event = createImpressionEvent(id); | ||
events.push(event); | ||
await cache.set(id, { id, event }); | ||
} | ||
|
||
const processor = new ReactNativeNetInfoEventProcessor({ | ||
eventDispatcher, | ||
dispatchRepeater, | ||
failedEventRepeater, | ||
batchSize: 1000, | ||
eventStore: cache, | ||
}); | ||
|
||
processor.start(); | ||
await processor.onRunning(); | ||
|
||
mockNetInfo.pushState(false); | ||
expect(eventDispatcher.dispatchEvent).not.toHaveBeenCalled(); | ||
|
||
mockNetInfo.pushState(true); | ||
|
||
await exhaustMicrotasks(); | ||
|
||
expect(eventDispatcher.dispatchEvent).toHaveBeenCalledWith(formatEvents(events)); | ||
}); | ||
|
||
it('should unsubscribe from netinfo listener when stopped', async () => { | ||
const eventDispatcher = getMockDispatcher(); | ||
const dispatchRepeater = getMockRepeater(); | ||
const failedEventRepeater = getMockRepeater(); | ||
|
||
const cache = getMockAsyncCache<EventWithId>(); | ||
|
||
const processor = new ReactNativeNetInfoEventProcessor({ | ||
eventDispatcher, | ||
dispatchRepeater, | ||
failedEventRepeater, | ||
batchSize: 1000, | ||
eventStore: cache, | ||
}); | ||
|
||
processor.start(); | ||
await processor.onRunning(); | ||
|
||
mockNetInfo.pushState(false); | ||
|
||
processor.stop(); | ||
await processor.onTerminated(); | ||
|
||
expect(mockNetInfo.unsubs[0]).toHaveBeenCalled(); | ||
}); | ||
}); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
import { | ||
NetInfoState, | ||
addEventListener as addConnectionListener, | ||
} from '@react-native-community/netinfo'; | ||
|
||
import { BatchEventProcessor, BatchEventProcessorConfig } from './batch_event_processor'; | ||
import { Fn } from '../utils/type'; | ||
|
||
export class ReactNativeNetInfoEventProcessor extends BatchEventProcessor { | ||
private isInternetReachable = true; | ||
private unsubscribeNetInfo?: Fn; | ||
|
||
constructor(config: BatchEventProcessorConfig) { | ||
super(config); | ||
} | ||
|
||
private async connectionListener(state: NetInfoState) { | ||
if (this.isInternetReachable && !state.isInternetReachable) { | ||
this.isInternetReachable = false; | ||
return; | ||
} | ||
|
||
if (!this.isInternetReachable && state.isInternetReachable) { | ||
this.isInternetReachable = true; | ||
this.retryFailedEvents(); | ||
} | ||
} | ||
|
||
start(): void { | ||
super.start(); | ||
this.unsubscribeNetInfo = addConnectionListener(this.connectionListener.bind(this)); | ||
} | ||
|
||
stop(): void { | ||
if (this.unsubscribeNetInfo) { | ||
this.unsubscribeNetInfo(); | ||
} | ||
super.stop(); | ||
} | ||
} |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.