@@ -30,11 +30,11 @@ const (
30
30
)
31
31
32
32
var (
33
- globalExcludes []glob.Glob
34
- formatters map [string ]* format.Formatter
35
- pipelines map [ string ] * format. Pipeline
36
- filesCh chan * walk.File
37
- processedCh chan * walk.File
33
+ excludes []glob.Glob
34
+ formatters map [string ]* format.Formatter
35
+
36
+ filesCh chan * walk.File
37
+ processedCh chan * walk.File
38
38
39
39
ErrFailOnChange = errors .New ("unexpected changes detected, --fail-on-change is enabled" )
40
40
)
@@ -73,45 +73,25 @@ func (f *Format) Run() (err error) {
73
73
}
74
74
75
75
// compile global exclude globs
76
- if globalExcludes , err = format .CompileGlobs (cfg .Global .Excludes ); err != nil {
76
+ if excludes , err = format .CompileGlobs (cfg .Global .Excludes ); err != nil {
77
77
return fmt .Errorf ("failed to compile global excludes: %w" , err )
78
78
}
79
79
80
- // initialise pipelines
81
- pipelines = make (map [string ]* format.Pipeline )
80
+ // initialise formatters
82
81
formatters = make (map [string ]* format.Formatter )
83
82
84
- // iterate the formatters in lexicographical order
85
- for _ , name := range cfg .Names {
86
- // init formatter
87
- formatterCfg := cfg .Formatters [name ]
88
- formatter , err := format .NewFormatter (name , Cli .TreeRoot , formatterCfg , globalExcludes )
83
+ for name , formatterCfg := range cfg .Formatters {
84
+ formatter , err := format .NewFormatter (name , Cli .TreeRoot , formatterCfg , excludes )
85
+
89
86
if errors .Is (err , format .ErrCommandNotFound ) && Cli .AllowMissingFormatter {
90
- log .Debugf ("formatter not found: %v" , name )
87
+ log .Debugf ("formatter command not found: %v" , name )
91
88
continue
92
89
} else if err != nil {
93
90
return fmt .Errorf ("%w: failed to initialise formatter: %v" , err , name )
94
91
}
95
92
96
93
// store formatter by name
97
94
formatters [name ] = formatter
98
-
99
- // If no pipeline is configured, we add the formatter to a nominal pipeline of size 1 with the key being the
100
- // formatter's name. If a pipeline is configured, we add the formatter to a pipeline keyed by
101
- // 'p:<pipeline_name>' in which it is sorted by priority.
102
- if formatterCfg .Pipeline == "" {
103
- pipeline := format.Pipeline {}
104
- pipeline .Add (formatter )
105
- pipelines [name ] = & pipeline
106
- } else {
107
- key := fmt .Sprintf ("p:%s" , formatterCfg .Pipeline )
108
- pipeline , ok := pipelines [key ]
109
- if ! ok {
110
- pipeline = & format.Pipeline {}
111
- pipelines [key ] = pipeline
112
- }
113
- pipeline .Add (formatter )
114
- }
115
95
}
116
96
117
97
// open the cache
@@ -304,37 +284,43 @@ func walkFilesystem(ctx context.Context) func() error {
304
284
func applyFormatters (ctx context.Context ) func () error {
305
285
// create our own errgroup for concurrent formatting tasks
306
286
fg , ctx := errgroup .WithContext (ctx )
287
+ // simple optimization to avoid too many concurrent formatting tasks
288
+ // we can queue them up faster than the formatters can process them, so this paces things a bit
289
+ fg .SetLimit (runtime .NumCPU ())
307
290
308
- // pre-initialise batches keyed by pipeline
309
- batches := make (map [string ][]* walk.File )
310
- for key := range pipelines {
311
- batches [key ] = make ([]* walk.File , 0 , BatchSize )
312
- }
313
-
314
- // for a given pipeline key, add the provided file to the current batch and trigger a format if the batch size has
315
- // been reached
316
- tryApply := func (key string , file * walk.File ) {
317
- // append to batch
318
- batches [key ] = append (batches [key ], file )
291
+ // track batches of formatting tasks based on their batch keys, which are determined by the unique sequence of
292
+ // formatters which should be applied to their respective files
293
+ batches := make (map [string ][]* format.Task )
319
294
320
- // check if the batch is full
295
+ apply := func (key string , flush bool ) {
296
+ // lookup the batch and exit early if it's empty
321
297
batch := batches [key ]
322
- if len (batch ) == BatchSize {
323
- // get the pipeline
324
- pipeline := pipelines [key ]
298
+ if len (batch ) == 0 {
299
+ return
300
+ }
301
+
302
+ // process the batch if it's full, or we've been asked to flush partial batches
303
+ if flush || len (batch ) == BatchSize {
325
304
326
- // copy the batch
327
- files := make ([]* walk. File , len (batch ))
328
- copy (files , batch )
305
+ // copy the batch as we re-use it for the next batch
306
+ tasks := make ([]* format. Task , len (batch ))
307
+ copy (tasks , batch )
329
308
330
- // apply to the pipeline
309
+ // asynchronously apply the sequence of formatters to the batch
331
310
fg .Go (func () error {
332
- if err := pipeline .Apply (ctx , files ); err != nil {
333
- return err
311
+ // iterate the formatters, applying them in sequence to the batch of tasks
312
+ // we get the formatters list from the first task since all tasks in the batch have the same formatters list
313
+ for _ , f := range tasks [0 ].Formatters {
314
+ if err := f .Apply (ctx , tasks ); err != nil {
315
+ return err
316
+ }
334
317
}
335
- for _ , path := range files {
336
- processedCh <- path
318
+
319
+ // pass each file to the processed channel
320
+ for _ , task := range tasks {
321
+ processedCh <- task .File
337
322
}
323
+
338
324
return nil
339
325
})
340
326
@@ -343,25 +329,12 @@ func applyFormatters(ctx context.Context) func() error {
343
329
}
344
330
}
345
331
346
- // format any partial batches
347
- flushBatches := func () {
348
- for key , pipeline := range pipelines {
349
-
350
- batch := batches [key ]
351
- pipeline := pipeline // capture for closure
352
-
353
- if len (batch ) > 0 {
354
- fg .Go (func () error {
355
- if err := pipeline .Apply (ctx , batch ); err != nil {
356
- return fmt .Errorf ("%s failure: %w" , key , err )
357
- }
358
- for _ , path := range batch {
359
- processedCh <- path
360
- }
361
- return nil
362
- })
363
- }
364
- }
332
+ tryApply := func (task * format.Task ) {
333
+ // append to batch
334
+ key := task .BatchKey
335
+ batches [key ] = append (batches [key ], task )
336
+ // try to apply
337
+ apply (key , false )
365
338
}
366
339
367
340
return func () error {
@@ -370,35 +343,38 @@ func applyFormatters(ctx context.Context) func() error {
370
343
close (processedCh )
371
344
}()
372
345
373
- // iterate the files channel, checking if any pipeline wants it, and attempting to apply if so.
346
+ // iterate the files channel
374
347
for file := range filesCh {
375
- var matches []string
376
348
377
- for key , pipeline := range pipelines {
378
- if ! pipeline .Wants (file ) {
379
- continue
349
+ // determine the list of formatters that are interested in the file
350
+ var matches []* format.Formatter
351
+ for _ , formatter := range formatters {
352
+ if formatter .Wants (file ) {
353
+ matches = append (matches , formatter )
380
354
}
381
- matches = append (matches , key )
382
- tryApply (key , file )
383
355
}
384
- switch len (matches ) {
385
- case 0 :
386
- log .Debugf ("no match found: %s" , file .Path )
356
+
357
+ if len (matches ) == 0 {
387
358
// no match, so we send it direct to the processed channel
359
+ log .Debugf ("no match found: %s" , file .Path )
388
360
processedCh <- file
389
- case 1 :
361
+ } else {
362
+ // record the match
390
363
stats .Add (stats .Matched , 1 )
391
- default :
392
- return fmt .Errorf ("path '%s' matched multiple formatters/pipelines %v" , file .Path , matches )
364
+ // create a new format task, add it to a batch based on its batch key and try to apply if the batch is full
365
+ task := format .NewTask (file , matches )
366
+ tryApply (& task )
393
367
}
394
368
}
395
369
396
370
// flush any partial batches which remain
397
- flushBatches ()
371
+ for key := range batches {
372
+ apply (key , true )
373
+ }
398
374
399
375
// wait for all outstanding formatting tasks to complete
400
376
if err := fg .Wait (); err != nil {
401
- return fmt .Errorf ("pipeline processing failure: %w" , err )
377
+ return fmt .Errorf ("formatting failure: %w" , err )
402
378
}
403
379
return nil
404
380
}
0 commit comments