Skip to content

Commit ff0131b

Browse files
authored
feat(query-core): refetchMode “replace” for streamedQuery (#9092)
* refetchmode replace * feat: refetchMode replace for streamedQuery * test: should allow arrays
1 parent a9fe406 commit ff0131b

File tree

3 files changed

+161
-29
lines changed

3 files changed

+161
-29
lines changed

docs/reference/streamedQuery.md

+7-5
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,10 @@ const query = queryOptions({
2626
- **Required**
2727
- The function that returns a Promise of an AsyncIterable of data to stream in.
2828
- Receives a [QueryFunctionContext](../guides/query-functions.md#queryfunctioncontext)
29-
- `refetchMode?: 'append' | 'reset'`
30-
- optional
31-
- when set to `'reset'`, the query will erase all data and go back into `pending` state when a refetch occurs.
32-
- when set to `'append'`, data will be appended on a refetch.
33-
- defaults to `'reset'`
29+
- `refetchMode?: 'append' | 'reset' | 'replace`
30+
- Optional
31+
- Defines how refetches are handled.
32+
- Defaults to `'reset'`
33+
- When set to `'reset'`, the query will erase all data and go back into `pending` state.
34+
- When set to `'append'`, data will be appended to existing data.
35+
- When set to `'replace'`, data will be written to the cache at the end of the stream.

packages/query-core/src/__tests__/streamedQuery.test.tsx

+116-3
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,11 @@ describe('streamedQuery', () => {
1818
vi.useRealTimers()
1919
})
2020

21-
function createAsyncNumberGenerator(amount: number) {
21+
function createAsyncNumberGenerator(amount: number, start = 0) {
2222
return {
2323
async *[Symbol.asyncIterator]() {
24-
let num = 0
25-
while (num < amount) {
24+
let num = start
25+
while (num < amount + start) {
2626
await sleep(50)
2727
yield num++
2828
}
@@ -74,6 +74,61 @@ describe('streamedQuery', () => {
7474
unsubscribe()
7575
})
7676

77+
test('should allow Arrays to be returned from the stream', async () => {
78+
const key = queryKey()
79+
const observer = new QueryObserver(queryClient, {
80+
queryKey: key,
81+
queryFn: streamedQuery({
82+
queryFn: async function* () {
83+
for await (const num of createAsyncNumberGenerator(3)) {
84+
yield [num, num] as const
85+
}
86+
},
87+
}),
88+
})
89+
90+
const unsubscribe = observer.subscribe(vi.fn())
91+
92+
expect(observer.getCurrentResult()).toMatchObject({
93+
status: 'pending',
94+
fetchStatus: 'fetching',
95+
data: undefined,
96+
})
97+
98+
await vi.advanceTimersByTimeAsync(50)
99+
100+
expect(observer.getCurrentResult()).toMatchObject({
101+
status: 'success',
102+
fetchStatus: 'fetching',
103+
data: [[0, 0]],
104+
})
105+
106+
await vi.advanceTimersByTimeAsync(50)
107+
108+
expect(observer.getCurrentResult()).toMatchObject({
109+
status: 'success',
110+
fetchStatus: 'fetching',
111+
data: [
112+
[0, 0],
113+
[1, 1],
114+
],
115+
})
116+
117+
await vi.advanceTimersByTimeAsync(50)
118+
119+
expect(observer.getCurrentResult()).toMatchObject({
120+
status: 'success',
121+
fetchStatus: 'idle',
122+
data: [
123+
[0, 0],
124+
[1, 1],
125+
[2, 2],
126+
],
127+
})
128+
129+
unsubscribe()
130+
})
131+
77132
test('should replace on refetch', async () => {
78133
const key = queryKey()
79134
const observer = new QueryObserver(queryClient, {
@@ -183,6 +238,64 @@ describe('streamedQuery', () => {
183238
unsubscribe()
184239
})
185240

241+
test('should support refetchMode replace', async () => {
242+
const key = queryKey()
243+
let offset = 0
244+
const observer = new QueryObserver(queryClient, {
245+
queryKey: key,
246+
queryFn: streamedQuery({
247+
queryFn: () => createAsyncNumberGenerator(2, offset),
248+
refetchMode: 'replace',
249+
}),
250+
})
251+
252+
const unsubscribe = observer.subscribe(vi.fn())
253+
254+
expect(observer.getCurrentResult()).toMatchObject({
255+
status: 'pending',
256+
fetchStatus: 'fetching',
257+
data: undefined,
258+
})
259+
260+
await vi.advanceTimersByTimeAsync(100)
261+
262+
expect(observer.getCurrentResult()).toMatchObject({
263+
status: 'success',
264+
fetchStatus: 'idle',
265+
data: [0, 1],
266+
})
267+
268+
offset = 100
269+
270+
void observer.refetch()
271+
272+
await vi.advanceTimersByTimeAsync(10)
273+
274+
expect(observer.getCurrentResult()).toMatchObject({
275+
status: 'success',
276+
fetchStatus: 'fetching',
277+
data: [0, 1],
278+
})
279+
280+
await vi.advanceTimersByTimeAsync(40)
281+
282+
expect(observer.getCurrentResult()).toMatchObject({
283+
status: 'success',
284+
fetchStatus: 'fetching',
285+
data: [0, 1],
286+
})
287+
288+
await vi.advanceTimersByTimeAsync(50)
289+
290+
expect(observer.getCurrentResult()).toMatchObject({
291+
status: 'success',
292+
fetchStatus: 'idle',
293+
data: [100, 101],
294+
})
295+
296+
unsubscribe()
297+
})
298+
186299
test('should abort ongoing stream when refetch happens', async () => {
187300
const key = queryKey()
188301
const observer = new QueryObserver(queryClient, {

packages/query-core/src/streamedQuery.ts

+38-21
Original file line numberDiff line numberDiff line change
@@ -6,46 +6,63 @@ import type { QueryFunction, QueryFunctionContext, QueryKey } from './types'
66
* The query will be in a 'pending' state until the first chunk of data is received, but will go to 'success' after that.
77
* The query will stay in fetchStatus 'fetching' until the stream ends.
88
* @param queryFn - The function that returns an AsyncIterable to stream data from.
9-
* @param refetchMode - Defaults to 'reset', which replaces data when a refetch happens. Set to 'append' to append new data to the existing data.
9+
* @param refetchMode - Defines how re-fetches are handled.
10+
* Defaults to `'reset'`, erases all data and puts the query back into `pending` state.
11+
* Set to `'append'` to append new data to the existing data.
12+
* Set to `'replace'` to write the data to the cache at the end of the stream.
1013
*/
1114
export function streamedQuery<
1215
TQueryFnData = unknown,
1316
TQueryKey extends QueryKey = QueryKey,
1417
>({
1518
queryFn,
16-
refetchMode,
19+
refetchMode = 'reset',
1720
}: {
1821
queryFn: (
1922
context: QueryFunctionContext<TQueryKey>,
2023
) => AsyncIterable<TQueryFnData> | Promise<AsyncIterable<TQueryFnData>>
21-
refetchMode?: 'append' | 'reset'
24+
refetchMode?: 'append' | 'reset' | 'replace'
2225
}): QueryFunction<Array<TQueryFnData>, TQueryKey> {
2326
return async (context) => {
24-
if (refetchMode !== 'append') {
25-
const query = context.client
26-
.getQueryCache()
27-
.find({ queryKey: context.queryKey, exact: true })
28-
if (query && query.state.data !== undefined) {
29-
query.setState({
30-
status: 'pending',
31-
data: undefined,
32-
error: null,
33-
fetchStatus: 'fetching',
34-
})
35-
}
27+
const query = context.client
28+
.getQueryCache()
29+
.find({ queryKey: context.queryKey, exact: true })
30+
const isRefetch = !!query && query.state.data !== undefined
31+
32+
if (isRefetch && refetchMode === 'reset') {
33+
query.setState({
34+
status: 'pending',
35+
data: undefined,
36+
error: null,
37+
fetchStatus: 'fetching',
38+
})
3639
}
40+
41+
const result: Array<TQueryFnData> = []
3742
const stream = await queryFn(context)
43+
3844
for await (const chunk of stream) {
3945
if (context.signal.aborted) {
4046
break
4147
}
42-
context.client.setQueryData<Array<TQueryFnData>>(
43-
context.queryKey,
44-
(prev = []) => {
45-
return prev.concat(chunk)
46-
},
47-
)
48+
49+
// don't append to the cache directly when replace-refetching
50+
if (!isRefetch || refetchMode !== 'replace') {
51+
context.client.setQueryData<Array<TQueryFnData>>(
52+
context.queryKey,
53+
(prev = []) => {
54+
return prev.concat([chunk])
55+
},
56+
)
57+
}
58+
result.push(chunk)
59+
}
60+
61+
// finalize result: replace-refetching needs to write to the cache
62+
if (isRefetch && refetchMode === 'replace' && !context.signal.aborted) {
63+
context.client.setQueryData<Array<TQueryFnData>>(context.queryKey, result)
4864
}
65+
4966
return context.client.getQueryData(context.queryKey)!
5067
}
5168
}

0 commit comments

Comments
 (0)