@@ -18,7 +18,7 @@ import SKSupport
18
18
/// Task metadata for `SyntacticTestIndexer.indexingQueue`
19
19
fileprivate enum TaskMetadata : DependencyTracker , Equatable {
20
20
case read
21
- case index( DocumentURI )
21
+ case index( Set < DocumentURI > )
22
22
23
23
/// Reads can be concurrent and files can be indexed concurrently. But we need to wait for all files to finish
24
24
/// indexing before reading the index.
@@ -38,12 +38,12 @@ fileprivate enum TaskMetadata: DependencyTracker, Equatable {
38
38
// This ensures that the index has been updated at least to the state of file at which the read was scheduled.
39
39
// Adding the dependency also elevates the index task's priorities.
40
40
return true
41
- case ( . index( let lhsUri ) , . index( let rhsUri ) ) :
42
- // Technically, we should be able to allow simultaneous indexing of the same file. But conceptually the code
43
- // becomes simpler if we don't need to think racing indexing tasks for the same file and it shouldn't make a
44
- // performance impact because if the same file state is indexed twice, the second one will realize that the mtime
45
- // hasn't changed and thus be a no-op .
46
- return lhsUri == rhsUri
41
+ case ( . index( let lhsUris ) , . index( let rhsUris ) ) :
42
+ // Technically, we should be able to allow simultaneous indexing of the same file. When a file gets re-scheduled
43
+ // for indexing, all previous index invocations should get cancelled. But conceptually the code becomes simpler
44
+ // if we don't need to think about racing indexing tasks for the same file and it shouldn't make a performance impact
45
+ // in practice because of the cancellation described before .
46
+ return !lhsUris . intersection ( rhsUris ) . isEmpty
47
47
}
48
48
}
49
49
}
@@ -86,6 +86,13 @@ actor SyntacticTestIndex {
86
86
/// The tests discovered by the index.
87
87
private var indexedTests : [ DocumentURI : IndexedTests ] = [ : ]
88
88
89
+ /// Files that have been removed using `removeFileForIndex`.
90
+ ///
91
+ /// We need to keep track of these files because when the files get removed, there might be an in-progress indexing
92
+ /// operation running for that file. We need to ensure that this indexing operation doesn't add the removed file
93
+ /// back to `indexedTests`.
94
+ private var removedFiles : Set < DocumentURI > = [ ]
95
+
89
96
/// The queue on which the index is being updated and queried.
90
97
///
91
98
/// Tracking dependencies between tasks within this queue allows us to start indexing tasks in parallel with low
@@ -96,14 +103,7 @@ actor SyntacticTestIndex {
96
103
/// Creates an index that does not contain any tests yet.
init ( ) { }
97
104
98
105
private func removeFilesFromIndex( _ removedFiles: Set < DocumentURI > ) {
99
- // Cancel any tasks for the removed files to ensure any pending indexing tasks don't re-add index data for the
100
- // removed files.
101
- self . indexingQueue. cancelTasks ( where: { taskMetadata in
102
- guard case . index( let uri) = taskMetadata else {
103
- return false
104
- }
105
- return removedFiles. contains ( uri)
106
- } )
106
+ self . removedFiles. formUnion ( removedFiles)
107
107
for removedFile in removedFiles {
108
108
self . indexedTests [ removedFile] = nil
109
109
}
@@ -112,14 +112,11 @@ actor SyntacticTestIndex {
112
112
/// Notifies the index that the set of files which may contain tests has changed.
///
/// Files that are currently indexed but no longer part of `testFiles` are dropped from the index. Every file in
/// `testFiles` is scheduled to be re-scanned.
func listOfTestFilesDidChange(_ testFiles: [DocumentURI]) {
  // Everything we have indexed so far that the new list no longer mentions is stale.
  var staleFiles = Set(indexedTests.keys)
  staleFiles.subtract(testFiles)
  removeFilesFromIndex(staleFiles)

  rescanFiles(testFiles)
}
124
121
125
122
func filesDidChange( _ events: [ FileEvent ] ) {
@@ -130,7 +127,7 @@ actor SyntacticTestIndex {
130
127
// `listOfTestFilesDidChange`
131
128
break
132
129
case . changed:
133
- rescanFile ( fileEvent. uri)
130
+ rescanFiles ( [ fileEvent. uri] )
134
131
case . deleted:
135
132
removeFilesFromIndex ( [ fileEvent. uri] )
136
133
default :
@@ -139,42 +136,61 @@ actor SyntacticTestIndex {
139
136
}
140
137
}
141
138
142
/// Called when a list of files was updated. Re-scans those files.
///
/// - Parameter uris: The files whose tests should be (re-)discovered.
private func rescanFiles(_ uris: [DocumentURI]) {
  // A file that gets scheduled for scanning is of interest again, even if it was removed earlier (e.g. because it was
  // deleted and then re-created). Forget the removal, otherwise `rescanFileAssumingOnQueue` would skip the file for
  // the remaining lifetime of the index.
  removedFiles.subtract(uris)

  // Divide the files into multiple batches. This is more efficient than spawning a new task for every file, mostly
  // because it keeps the number of pending items in `indexingQueue` low and adding a new task to `indexingQueue` is
  // in O(number of pending tasks), since we need to scan for dependency edges to add. Scheduling one task per file
  // would therefore be quadratic in the number of files.
  // Over-subscribe the processor count in case one batch finishes more quickly than another.
  let batches = uris.partition(intoNumberOfBatches: ProcessInfo.processInfo.processorCount * 4)
  for batch in batches {
    self.indexingQueue.async(priority: .low, metadata: .index(Set(batch))) {
      for uri in batch {
        await self.rescanFileAssumingOnQueue(uri)
      }
    }
  }
}

/// Re-scans a single file and stores the tests found in it in `indexedTests`.
///
/// The file is skipped if it is not a file URL, if the surrounding task has been cancelled, if it has been removed
/// from the index, if its modification date cannot be determined, or if the index entry is already at least as new
/// as the file on disk.
///
/// - Important: This method must be called in a task that is executing on `indexingQueue`.
private func rescanFileAssumingOnQueue(_ uri: DocumentURI) async {
  guard let url = uri.fileURL else {
    logger.log("Not indexing \(uri.forLogging) for swift-testing tests because it is not a file URL")
    return
  }
  if Task.isCancelled {
    return
  }
  guard !removedFiles.contains(uri) else {
    return
  }
  guard
    let fileModificationDate = try? FileManager.default.attributesOfItem(atPath: url.path)[.modificationDate]
      as? Date
  else {
    logger.fault("Not indexing \(uri.forLogging) for tests because the modification date could not be determined")
    return
  }
  if let indexModificationDate = self.indexedTests[uri]?.sourceFileModificationDate,
    indexModificationDate >= fileModificationDate
  {
    // Index already up to date.
    return
  }
  if Task.isCancelled {
    return
  }
  let testItems = await testItems(in: url)

  guard !removedFiles.contains(uri) else {
    // Check whether the file got removed while we were scanning it for tests. If so, don't add it back to
    // `indexedTests`.
    return
  }
  self.indexedTests[uri] = IndexedTests(tests: testItems, sourceFileModificationDate: fileModificationDate)
}
179
195
180
196
/// Gets all the tests in the syntactic index.
@@ -187,3 +203,27 @@ actor SyntacticTestIndex {
187
203
return await readTask. value
188
204
}
189
205
}
206
+
207
fileprivate extension Collection {
  /// Partition the elements of the collection into at most `numberOfBatches` roughly equally sized batches.
  ///
  /// Elements are assigned to the batches round-robin. This ensures that elements that are close to each other in the
  /// original collection end up in different batches. This is important because e.g. test files will live close to
  /// each other in the file system and test scanning wants to scan them in different batches so we don't end up with
  /// one batch only containing source files and the other only containing test files.
  ///
  /// - Parameter numberOfBatches: The maximum number of batches to create. Must be greater than zero.
  /// - Returns: The non-empty batches. If the collection has fewer elements than `numberOfBatches`, one batch per
  ///   element is returned. An empty collection yields no batches.
  func partition(intoNumberOfBatches numberOfBatches: Int) -> [[Element]] {
    // Fail with a descriptive message instead of an opaque division-by-zero / negative-count trap further down.
    precondition(numberOfBatches > 0, "Cannot partition a collection into fewer than one batch")

    let elementCount = self.count
    guard elementCount > 0 else {
      return []
    }

    // Never create more batches than there are elements, so every batch receives at least one element and no empty
    // batches need to be filtered out afterwards.
    let batchCount = Swift.min(numberOfBatches, elementCount)
    // Round up so that the largest batch fits into the reserved capacity.
    let batchCapacity = (elementCount + batchCount - 1) / batchCount

    var batches: [[Element]] = (0..<batchCount).map { _ in
      var batch: [Element] = []
      batch.reserveCapacity(batchCapacity)
      return batch
    }

    for (index, element) in self.enumerated() {
      batches[index % batchCount].append(element)
    }
    return batches
  }
}
0 commit comments