@@ -40,6 +40,7 @@ import (
40
40
"github.com/cockroachdb/cockroach/pkg/util/hlc"
41
41
"github.com/cockroachdb/cockroach/pkg/util/log"
42
42
"github.com/cockroachdb/cockroach/pkg/util/retry"
43
+ "github.com/cockroachdb/cockroach/pkg/util/timeutil"
43
44
"github.com/cockroachdb/cockroach/pkg/util/uuid"
44
45
"github.com/cockroachdb/errors"
45
46
"github.com/gogo/protobuf/types"
@@ -218,8 +219,8 @@ func (b *backupResumer) ResumeCompaction(
218
219
219
220
var backupManifest * backuppb.BackupManifest
220
221
updatedDetails := initialDetails
222
+ testingKnobs := execCtx .ExecCfg ().BackupRestoreTestingKnobs
221
223
if initialDetails .URI == "" {
222
- testingKnobs := execCtx .ExecCfg ().BackupRestoreTestingKnobs
223
224
if testingKnobs != nil && testingKnobs .RunBeforeResolvingCompactionDest != nil {
224
225
if err := testingKnobs .RunBeforeResolvingCompactionDest (); err != nil {
225
226
return err
@@ -334,6 +335,10 @@ func (b *backupResumer) ResumeCompaction(
334
335
}
335
336
}
336
337
338
+ if testingKnobs != nil && testingKnobs .AfterLoadingManifestOnResume != nil {
339
+ testingKnobs .AfterLoadingManifestOnResume (backupManifest )
340
+ }
341
+
337
342
// We retry on pretty generic failures -- any rpc error. If a worker node were
338
343
// to restart, it would produce this kind of error, but there may be other
339
344
// errors that are also rpc errors. Don't retry too aggressively.
@@ -342,9 +347,8 @@ func (b *backupResumer) ResumeCompaction(
342
347
MaxRetries : 5 ,
343
348
}
344
349
345
- if execCtx .ExecCfg ().BackupRestoreTestingKnobs != nil &&
346
- execCtx .ExecCfg ().BackupRestoreTestingKnobs .BackupDistSQLRetryPolicy != nil {
347
- retryOpts = * execCtx .ExecCfg ().BackupRestoreTestingKnobs .BackupDistSQLRetryPolicy
350
+ if testingKnobs != nil && testingKnobs .BackupDistSQLRetryPolicy != nil {
351
+ retryOpts = * testingKnobs .BackupDistSQLRetryPolicy
348
352
}
349
353
350
354
if err := execCtx .ExecCfg ().JobRegistry .CheckPausepoint ("backup.before.flow" ); err != nil {
@@ -376,8 +380,6 @@ func (b *backupResumer) ResumeCompaction(
376
380
377
381
// Reload the backup manifest to pick up any spans we may have completed on
378
382
// previous attempts.
379
- // TODO (kev-cao): Compactions currently do not create checkpoints, but this
380
- // can be used to reload the manifest once we add checkpointing.
381
383
var reloadBackupErr error
382
384
mem .Shrink (ctx , memSize )
383
385
backupManifest , memSize , reloadBackupErr = b .readManifestOnResume (ctx , & mem , execCtx .ExecCfg (),
@@ -752,9 +754,13 @@ func concludeBackupCompaction(
752
754
// the associated manifest.
753
755
func processProgress (
754
756
ctx context.Context ,
757
+ execCtx sql.JobExecContext ,
758
+ details jobspb.BackupDetails ,
755
759
manifest * backuppb.BackupManifest ,
756
760
progCh <- chan * execinfrapb.RemoteProducerMetadata_BulkProcessorProgress ,
761
+ kmsEnv cloud.KMSEnv ,
757
762
) error {
763
+ var lastCheckpointTime time.Time
758
764
// When a processor is done exporting a span, it will send a progress update
759
765
// to progCh.
760
766
for progress := range progCh {
@@ -763,15 +769,22 @@ func processProgress(
763
769
log .Errorf (ctx , "unable to unmarshal backup progress details: %+v" , err )
764
770
return err
765
771
}
766
- for _ , file := range progDetails .Files {
767
- manifest .Files = append (manifest .Files , file )
768
- manifest .EntryCounts .Add (file .EntryCounts )
769
- }
772
+ updateManifestWithProgress (progDetails , manifest )
773
+
770
774
// TODO (kev-cao): Add per node progress updates.
775
+
776
+ if wroteCheckpoint , err := maybeWriteBackupCheckpoint (
777
+ ctx , execCtx , details , manifest , lastCheckpointTime , kmsEnv ,
778
+ ); err != nil {
779
+ log .Errorf (ctx , "unable to checkpoint compaction: %+v" , err )
780
+ } else if wroteCheckpoint {
781
+ lastCheckpointTime = timeutil .Now ()
782
+ }
771
783
}
772
784
return nil
773
785
}
774
786
787
+ // compactionJobDescription generates a redacted description of the job.
775
788
func compactionJobDescription (details jobspb.BackupDetails ) (string , error ) {
776
789
fmtCtx := tree .NewFmtCtx (tree .FmtSimple )
777
790
redactedURIs , err := sanitizeURIList (details .Destination .To )
@@ -822,8 +835,7 @@ func doCompaction(
822
835
)
823
836
}
824
837
checkpointLoop := func (ctx context.Context ) error {
825
- // TODO (kev-cao): Add logic for checkpointing during loop.
826
- return processProgress (ctx , manifest , progCh )
838
+ return processProgress (ctx , execCtx , details , manifest , progCh , kmsEnv )
827
839
}
828
840
// TODO (kev-cao): Add trace aggregator loop.
829
841
@@ -838,6 +850,49 @@ func doCompaction(
838
850
)
839
851
}
840
852
853
+ // updateManifestWithProgress takes a progress update from the processors and
854
+ // updates the backup manifest accordingly.
855
+ func updateManifestWithProgress (
856
+ progDetails backuppb.BackupManifest_Progress , manifest * backuppb.BackupManifest ,
857
+ ) {
858
+ for _ , file := range progDetails .Files {
859
+ manifest .Files = append (manifest .Files , file )
860
+ manifest .EntryCounts .Add (file .EntryCounts )
861
+ }
862
+ }
863
+
864
+ // maybeWriteBackupCheckpoint writes a checkpoint for the backup if
865
+ // the time since the last checkpoint exceeds the configured interval. If a
866
+ // checkpoint is written, the function returns true.
867
+ func maybeWriteBackupCheckpoint (
868
+ ctx context.Context ,
869
+ execCtx sql.JobExecContext ,
870
+ details jobspb.BackupDetails ,
871
+ manifest * backuppb.BackupManifest ,
872
+ lastCheckpointTime time.Time ,
873
+ kmsEnv cloud.KMSEnv ,
874
+ ) (bool , error ) {
875
+ if details .URI == "" {
876
+ return false , errors .New ("backup details does not contain a default URI" )
877
+ }
878
+ execCfg := execCtx .ExecCfg ()
879
+ interval := BackupCheckpointInterval .Get (& execCfg .Settings .SV )
880
+ if timeutil .Since (lastCheckpointTime ) < interval {
881
+ return false , nil
882
+ }
883
+ if err := backupinfo .WriteBackupManifestCheckpoint (
884
+ ctx , details .URI , details .EncryptionOptions , kmsEnv ,
885
+ manifest , execCfg , execCtx .User (),
886
+ ); err != nil {
887
+ return false , err
888
+ }
889
+ backupRestoreKnobs := execCfg .BackupRestoreTestingKnobs
890
+ if backupRestoreKnobs != nil && backupRestoreKnobs .AfterCompactBackupsCheckpoint != nil {
891
+ backupRestoreKnobs .AfterCompactBackupsCheckpoint ()
892
+ }
893
+ return true , nil
894
+ }
895
+
841
896
// init assigns this package's StartCompactionJob to the builtins package's
// StartCompactionJob variable when the package is initialized.
// NOTE(review): presumably this indirection lets builtins start a compaction
// job without importing this package directly — confirm.
func init () {
842
897
builtins .StartCompactionJob = StartCompactionJob
843
898
}
0 commit comments