@@ -184,10 +184,10 @@ func (f *Format) Run() (err error) {
184
184
f .filesCh = make (chan * walk.File , BatchSize * runtime .NumCPU ())
185
185
186
186
// create a channel for files that have been formatted
187
- f .formattedCh = make (chan * walk. File , cap (f .filesCh ))
187
+ f .formattedCh = make (chan * format. Task , cap (f .filesCh ))
188
188
189
189
// create a channel for files that have been processed
190
- f .processedCh = make (chan * walk. File , cap (f .filesCh ))
190
+ f .processedCh = make (chan * format. Task , cap (f .filesCh ))
191
191
192
192
// start concurrent processing tasks in reverse order
193
193
eg .Go (f .updateCache (ctx ))
@@ -317,17 +317,21 @@ func (f *Format) applyFormatters(ctx context.Context) func() error {
317
317
318
318
// asynchronously apply the sequence formatters to the batch
319
319
fg .Go (func () error {
320
- // iterate the formatters, applying them in sequence to the batch of tasks
321
- // we get the formatters list from the first task since they have all the same formatters list
322
- for _ , f := range tasks [0 ].Formatters {
323
- if err := f .Apply (ctx , tasks ); err != nil {
324
- return err
320
+ // Iterate the formatters, applying them in sequence to the batch of tasks.
321
+ // We get the formatter list from the first task since they have all the same formatters list.
322
+ formatters := tasks [0 ].Formatters
323
+
324
+ var formatErrors []error
325
+ for idx := range formatters {
326
+ if err := formatters [idx ].Apply (ctx , tasks ); err != nil {
327
+ formatErrors = append (formatErrors , err )
325
328
}
326
329
}
327
330
328
331
// pass each file to the formatted channel
329
332
for _ , task := range tasks {
330
- f .formattedCh <- task .File
333
+ task .Errors = formatErrors
334
+ f .formattedCh <- task
331
335
}
332
336
333
337
return nil
@@ -359,7 +363,9 @@ func (f *Format) applyFormatters(ctx context.Context) func() error {
359
363
if format .PathMatches (file .RelPath , f .globalExcludes ) {
360
364
log .Debugf ("path matched global excludes: %s" , file .RelPath )
361
365
// mark it as processed and continue to the next
362
- f .formattedCh <- file
366
+ f .formattedCh <- & format.Task {
367
+ File : file ,
368
+ }
363
369
continue
364
370
}
365
371
@@ -378,7 +384,9 @@ func (f *Format) applyFormatters(ctx context.Context) func() error {
378
384
}
379
385
log .Logf (f .OnUnmatched , "no formatter for path: %s" , file .RelPath )
380
386
// mark it as processed and continue to the next
381
- f .formattedCh <- file
387
+ f .formattedCh <- & format.Task {
388
+ File : file ,
389
+ }
382
390
} else {
383
391
// record the match
384
392
stats .Add (stats .Matched , 1 )
@@ -414,14 +422,15 @@ func (f *Format) detectFormatted(ctx context.Context) func() error {
414
422
// detect ctx cancellation
415
423
case <- ctx .Done ():
416
424
return ctx .Err ()
417
- // take the next file that has been processed
418
- case file , ok := <- f .formattedCh :
425
+ // take the next task that has been processed
426
+ case task , ok := <- f .formattedCh :
419
427
if ! ok {
420
428
// channel has been closed, no further files to process
421
429
return nil
422
430
}
423
431
424
432
// check if the file has changed
433
+ file := task .File
425
434
changed , newInfo , err := file .HasChanged ()
426
435
if err != nil {
427
436
return err
@@ -451,7 +460,7 @@ func (f *Format) detectFormatted(ctx context.Context) func() error {
451
460
}
452
461
453
462
// mark as processed
454
- f .processedCh <- file
463
+ f .processedCh <- task
455
464
}
456
465
}
457
466
}
@@ -460,12 +469,16 @@ func (f *Format) detectFormatted(ctx context.Context) func() error {
460
469
func (f * Format ) updateCache (ctx context.Context ) func () error {
461
470
return func () error {
462
471
// used to batch updates for more efficient txs
463
- batch := make ([]* walk. File , 0 , BatchSize )
472
+ batch := make ([]* format. Task , 0 , BatchSize )
464
473
465
474
// apply a batch
466
475
processBatch := func () error {
467
476
// pass the batch to the cache for updating
468
- if err := cache .Update (batch ); err != nil {
477
+ files := make ([]* walk.File , len (batch ))
478
+ for idx := range batch {
479
+ files [idx ] = batch [idx ].File
480
+ }
481
+ if err := cache .Update (files ); err != nil {
469
482
return err
470
483
}
471
484
batch = batch [:0 ]
@@ -486,12 +499,14 @@ func (f *Format) updateCache(ctx context.Context) func() error {
486
499
case <- ctx .Done ():
487
500
return ctx .Err ()
488
501
// respond to formatted files
489
- case file , ok := <- f .processedCh :
502
+ case task , ok := <- f .processedCh :
490
503
if ! ok {
491
504
// channel has been closed, no further files to process
492
505
break LOOP
493
506
}
494
507
508
+ file := task .File
509
+
495
510
if f .Stdin {
496
511
// dump file into stdout
497
512
f , err := os .Open (file .Path )
@@ -508,11 +523,16 @@ func (f *Format) updateCache(ctx context.Context) func() error {
508
523
continue
509
524
}
510
525
511
- // append to batch and process if we have enough
512
- batch = append (batch , file )
513
- if len (batch ) == BatchSize {
514
- if err := processBatch (); err != nil {
515
- return err
526
+ // Append to batch and process if we have enough.
527
+ // We do not cache any files that were part of a pipeline in which one or more formatters failed.
528
+ // This is to ensure those files are re-processed in later invocations after the user has potentially
529
+ // resolved the issue, e.g. fixed a config problem.
530
+ if len (task .Errors ) == 0 {
531
+ batch = append (batch , task )
532
+ if len (batch ) == BatchSize {
533
+ if err := processBatch (); err != nil {
534
+ return err
535
+ }
516
536
}
517
537
}
518
538
}
0 commit comments