Commit 96b9623

backup: add checkpointing to backup compactions
This patch adds periodic checkpointing to backup compaction jobs.

Epic: None
Release note: None
1 parent 6e53270 commit 96b9623
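
The change follows a simple pattern: as the compaction's DistSQL flow reports completed spans on a progress channel, each update is folded into the in-memory backup manifest, and the manifest is persisted as a checkpoint at most once per configured interval (via backupinfo.WriteBackupManifestCheckpoint in the diff below), with checkpoint failures logged rather than failing the job. A minimal, self-contained Go sketch of that interval-gated pattern, using only the standard library; the progress, manifest, and writeCheckpoint names here are illustrative stand-ins, not the CockroachDB APIs:

package main

import (
  "fmt"
  "log"
  "time"
)

// progress is a stand-in for a processor's completed-span update.
type progress struct{ files []string }

// manifest is a stand-in for the in-memory backup manifest being built up.
type manifest struct{ files []string }

// writeCheckpoint is a stand-in for persisting the manifest to the backup
// destination (the real patch calls backupinfo.WriteBackupManifestCheckpoint).
func writeCheckpoint(m *manifest) error {
  fmt.Printf("checkpointed %d files\n", len(m.files))
  return nil
}

// processProgress folds progress updates into the manifest and writes a
// checkpoint at most once per interval. A failed checkpoint is logged and
// skipped so that a flaky write does not fail the whole compaction.
func processProgress(progCh <-chan progress, m *manifest, interval time.Duration) {
  var lastCheckpoint time.Time
  for p := range progCh {
    m.files = append(m.files, p.files...)
    if time.Since(lastCheckpoint) < interval {
      continue
    }
    if err := writeCheckpoint(m); err != nil {
      log.Printf("unable to checkpoint: %v", err)
      continue
    }
    lastCheckpoint = time.Now()
  }
}

func main() {
  progCh := make(chan progress)
  go func() {
    defer close(progCh)
    for i := 0; i < 5; i++ {
      progCh <- progress{files: []string{fmt.Sprintf("sst-%d", i)}}
      time.Sleep(20 * time.Millisecond)
    }
  }()
  processProgress(progCh, &manifest{}, 50*time.Millisecond)
}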

File tree: 4 files changed (+151, -14 lines)

Diff for: pkg/backup/compaction_job.go (+58, -12)
@@ -40,6 +40,7 @@ import (
   "github.com/cockroachdb/cockroach/pkg/util/hlc"
   "github.com/cockroachdb/cockroach/pkg/util/log"
   "github.com/cockroachdb/cockroach/pkg/util/retry"
+  "github.com/cockroachdb/cockroach/pkg/util/timeutil"
   "github.com/cockroachdb/cockroach/pkg/util/uuid"
   "github.com/cockroachdb/errors"
   "github.com/gogo/protobuf/types"
@@ -231,8 +232,8 @@ func (b *backupResumer) ResumeCompaction(

   var backupManifest *backuppb.BackupManifest
   updatedDetails := initialDetails
+  testingKnobs := execCtx.ExecCfg().BackupRestoreTestingKnobs
   if initialDetails.URI == "" {
-    testingKnobs := execCtx.ExecCfg().BackupRestoreTestingKnobs
     if testingKnobs != nil && testingKnobs.RunBeforeResolvingCompactionDest != nil {
       if err := testingKnobs.RunBeforeResolvingCompactionDest(); err != nil {
         return err
@@ -263,7 +264,7 @@
       return err
     }

-    if err := execCtx.ExecCfg().JobRegistry.CheckPausepoint("backup.before.write_first_checkpoint"); err != nil {
+    if err := execCtx.ExecCfg().JobRegistry.CheckPausepoint("backup_compaction.before.write_first_checkpoint"); err != nil {
       return err
     }

@@ -274,7 +275,7 @@
       return err
     }

-    if err := execCtx.ExecCfg().JobRegistry.CheckPausepoint("backup.after.write_first_checkpoint"); err != nil {
+    if err := execCtx.ExecCfg().JobRegistry.CheckPausepoint("backup_compaction.after.write_first_checkpoint"); err != nil {
       return err
     }

@@ -297,7 +298,7 @@
       return err
     }

-    if err := execCtx.ExecCfg().JobRegistry.CheckPausepoint("backup.after.details_has_checkpoint"); err != nil {
+    if err := execCtx.ExecCfg().JobRegistry.CheckPausepoint("backup_compaction.after.details_has_checkpoint"); err != nil {
       return err
     }
     // TODO (kev-cao): Add telemetry for backup compactions.
@@ -347,6 +348,10 @@ func (b *backupResumer) ResumeCompaction(
     }
   }

+  if testingKnobs != nil && testingKnobs.AfterLoadingCompactionManifestOnResume != nil {
+    testingKnobs.AfterLoadingCompactionManifestOnResume(backupManifest)
+  }
+
   // We retry on pretty generic failures -- any rpc error. If a worker node were
   // to restart, it would produce this kind of error, but there may be other
   // errors that are also rpc errors. Don't retry too aggressively.
@@ -355,12 +360,11 @@
     MaxRetries: 5,
   }

-  if execCtx.ExecCfg().BackupRestoreTestingKnobs != nil &&
-    execCtx.ExecCfg().BackupRestoreTestingKnobs.BackupDistSQLRetryPolicy != nil {
-    retryOpts = *execCtx.ExecCfg().BackupRestoreTestingKnobs.BackupDistSQLRetryPolicy
+  if testingKnobs != nil && testingKnobs.BackupDistSQLRetryPolicy != nil {
+    retryOpts = *testingKnobs.BackupDistSQLRetryPolicy
   }

-  if err := execCtx.ExecCfg().JobRegistry.CheckPausepoint("backup.before.flow"); err != nil {
+  if err := execCtx.ExecCfg().JobRegistry.CheckPausepoint("backup_compaction.before.flow"); err != nil {
     return err
   }

@@ -389,8 +393,6 @@

   // Reload the backup manifest to pick up any spans we may have completed on
   // previous attempts.
-  // TODO (kev-cao): Compactions currently do not create checkpoints, but this
-  // can be used to reload the manifest once we add checkpointing.
   var reloadBackupErr error
   mem.Shrink(ctx, memSize)
   backupManifest, memSize, reloadBackupErr = b.readManifestOnResume(ctx, &mem, execCtx.ExecCfg(),
@@ -765,9 +767,13 @@ concludeBackupCompaction(
 // the associated manifest.
 func processProgress(
   ctx context.Context,
+  execCtx sql.JobExecContext,
+  details jobspb.BackupDetails,
   manifest *backuppb.BackupManifest,
   progCh <-chan *execinfrapb.RemoteProducerMetadata_BulkProcessorProgress,
+  kmsEnv cloud.KMSEnv,
 ) error {
+  var lastCheckpointTime time.Time
   // When a processor is done exporting a span, it will send a progress update
   // to progCh.
   for progress := range progCh {
@@ -780,11 +786,24 @@ func processProgress(
       manifest.Files = append(manifest.Files, file)
       manifest.EntryCounts.Add(file.EntryCounts)
     }
+
     // TODO (kev-cao): Add per node progress updates.
+
+    if wroteCheckpoint, err := maybeWriteBackupCheckpoint(
+      ctx, execCtx, details, manifest, lastCheckpointTime, kmsEnv,
+    ); err != nil {
+      log.Errorf(ctx, "unable to checkpoint compaction: %+v", err)
+    } else if wroteCheckpoint {
+      lastCheckpointTime = timeutil.Now()
+      if err := execCtx.ExecCfg().JobRegistry.CheckPausepoint("backup_compaction.after.write_checkpoint"); err != nil {
+        return err
+      }
+    }
   }
   return nil
 }

+// compactionJobDescription generates a redacted description of the job.
 func compactionJobDescription(details jobspb.BackupDetails) (string, error) {
   fmtCtx := tree.NewFmtCtx(tree.FmtSimple)
   redactedURIs, err := sanitizeURIList(details.Destination.To)
@@ -835,8 +854,7 @@ func doCompaction(
     )
   }
   checkpointLoop := func(ctx context.Context) error {
-    // TODO (kev-cao): Add logic for checkpointing during loop.
-    return processProgress(ctx, manifest, progCh)
+    return processProgress(ctx, execCtx, details, manifest, progCh, kmsEnv)
   }
   // TODO (kev-cao): Add trace aggregator loop.

@@ -851,6 +869,34 @@
   )
 }

+// maybeWriteBackupCheckpoint writes a checkpoint for the backup if
+// the time since the last checkpoint exceeds the configured interval. If a
+// checkpoint is written, the function returns true.
+func maybeWriteBackupCheckpoint(
+  ctx context.Context,
+  execCtx sql.JobExecContext,
+  details jobspb.BackupDetails,
+  manifest *backuppb.BackupManifest,
+  lastCheckpointTime time.Time,
+  kmsEnv cloud.KMSEnv,
+) (bool, error) {
+  if details.URI == "" {
+    return false, errors.New("backup details does not contain a default URI")
+  }
+  execCfg := execCtx.ExecCfg()
+  interval := BackupCheckpointInterval.Get(&execCfg.Settings.SV)
+  if timeutil.Since(lastCheckpointTime) < interval {
+    return false, nil
+  }
+  if err := backupinfo.WriteBackupManifestCheckpoint(
+    ctx, details.URI, details.EncryptionOptions, kmsEnv,
+    manifest, execCfg, execCtx.User(),
+  ); err != nil {
+    return false, err
+  }
+  return true, nil
+}
+
 func init() {
   builtins.StartCompactionJob = StartCompactionJob
 }

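The TODO removed in the hunk above ("Compactions currently do not create checkpoints...") is what these writes pay for: on resumption, readManifestOnResume reloads the checkpointed manifest so that work completed on previous attempts is not redone. Purely as an illustration of that resume semantics, and not the actual CockroachDB resume path, here is a hypothetical helper that subtracts checkpointed work from the remaining spans (the span, checkpoint, and remainingWork names are invented for this sketch):

package main

import "fmt"

// span is a stand-in for a key range that still needs to be compacted.
type span struct{ start, end string }

// checkpoint is a stand-in for the reloaded checkpoint manifest: the spans
// whose output was already written before the job was paused or restarted.
type checkpoint struct{ completed []span }

// remainingWork drops any span that the checkpoint already covers, so a
// resumed job only re-exports what was not finished on earlier attempts.
func remainingWork(all []span, cp checkpoint) []span {
  done := make(map[span]bool, len(cp.completed))
  for _, s := range cp.completed {
    done[s] = true
  }
  var todo []span
  for _, s := range all {
    if !done[s] {
      todo = append(todo, s)
    }
  }
  return todo
}

func main() {
  all := []span{{"a", "b"}, {"b", "c"}, {"c", "d"}}
  cp := checkpoint{completed: []span{{"a", "b"}}}
  fmt.Println(remainingWork(all, cp)) // [{b c} {c d}]
}
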
Diff for: pkg/backup/compaction_test.go (+87, -2)
@@ -12,10 +12,12 @@ import (
   "net/url"
   "strconv"
   "strings"
+  "sync"
   "testing"
   "time"

   "github.com/cockroachdb/cockroach/pkg/backup/backupinfo"
+  "github.com/cockroachdb/cockroach/pkg/backup/backuppb"
   "github.com/cockroachdb/cockroach/pkg/base"
   "github.com/cockroachdb/cockroach/pkg/jobs"
   "github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
@@ -310,7 +312,7 @@ crdb_internal.json_to_pb(
   db.Exec(t, "INSERT INTO foo VALUES (3, 3)")
   end := getTime(t)
   db.Exec(t, fmt.Sprintf(incBackupAostCmd, 7, end))
-  db.Exec(t, "SET CLUSTER SETTING jobs.debug.pausepoints = 'backup.after.details_has_checkpoint'")
+  db.Exec(t, "SET CLUSTER SETTING jobs.debug.pausepoints = 'backup_compaction.after.details_has_checkpoint'")

   var backupPath string
   db.QueryRow(t, "SHOW BACKUPS IN 'nodelocal://1/backup/7'").Scan(&backupPath)
@@ -324,7 +326,7 @@
   db.Exec(t, "INSERT INTO foo VALUES (4, 4)")
   end = getTime(t)
   db.Exec(t, fmt.Sprintf(incBackupAostCmd, 7, end))
-  db.Exec(t, "SET CLUSTER SETTING jobs.debug.pausepoints = 'backup.after.details_has_checkpoint'")
+  db.Exec(t, "SET CLUSTER SETTING jobs.debug.pausepoints = 'backup_compaction.after.details_has_checkpoint'")
   jobID = startCompaction(7, backupPath, start, end)
   jobutils.WaitForJobToPause(t, db, jobID)
   db.Exec(t, "CANCEL JOB $1", jobID)
@@ -675,6 +677,89 @@ func TestBackupCompactionUnsupportedOptions(t *testing.T) {
   }
 }

+func TestCompactionCheckpointing(t *testing.T) {
+  defer leaktest.AfterTest(t)()
+  defer log.Scope(t).Close(t)
+
+  // Need a large enough backup so that it doesn't finish in one iteration
+  const numAccounts = 10000
+  var mu sync.Mutex
+  manifestNumFiles := -1
+  tc, db, _, cleanup := backupRestoreTestSetupWithParams(
+    t, singleNode, numAccounts, InitManualReplication, base.TestClusterArgs{
+      ServerArgs: base.TestServerArgs{
+        Knobs: base.TestingKnobs{
+          BackupRestore: &sql.BackupRestoreTestingKnobs{
+            AfterLoadingCompactionManifestOnResume: func(m *backuppb.BackupManifest) {
+              mu.Lock()
+              defer mu.Unlock()
+              manifestNumFiles = len(m.Files)
+            },
+          },
+        },
+      },
+    },
+  )
+  defer cleanup()
+  writeQueries := func() {
+    db.Exec(t, "UPDATE data.bank SET balance = balance + 1")
+  }
+  db.Exec(t, "SET CLUSTER SETTING bulkio.backup.checkpoint_interval = '10ms'")
+  start := getTime(t)
+  db.Exec(t, fmt.Sprintf("BACKUP INTO 'nodelocal://1/backup' AS OF SYSTEM TIME %d", start))
+  writeQueries()
+  db.Exec(t, "BACKUP INTO LATEST IN 'nodelocal://1/backup'")
+  writeQueries()
+  end := getTime(t)
+  db.Exec(t, fmt.Sprintf("BACKUP INTO LATEST IN 'nodelocal://1/backup' AS OF SYSTEM TIME %d", end))
+
+  var backupPath string
+  db.QueryRow(t, "SHOW BACKUPS IN 'nodelocal://1/backup'").Scan(&backupPath)
+
+  db.Exec(t, "SET CLUSTER SETTING jobs.debug.pausepoints = 'backup_compaction.after.write_checkpoint'")
+  var jobID jobspb.JobID
+  db.QueryRow(
+    t,
+    "SELECT crdb_internal.backup_compaction(ARRAY['nodelocal://1/backup'], $1, ''::BYTES, $2, $3)",
+    backupPath, start, end,
+  ).Scan(&jobID)
+  // Ensure that the very first manifest when the job is initially started has
+  // no files.
+  testutils.SucceedsSoon(t, func() error {
+    mu.Lock()
+    defer mu.Unlock()
+    if manifestNumFiles < 0 {
+      return fmt.Errorf("waiting for manifest to be loaded")
+    }
+    return nil
+  })
+  require.Equal(t, 0, manifestNumFiles, "expected no files in manifest")
+  manifestNumFiles = -1
+
+  // Wait for the job to hit the pausepoint.
+  jobutils.WaitForJobToPause(t, db, jobID)
+  // Don't bother pausing on other checkpoints.
+  db.Exec(t, "SET CLUSTER SETTING jobs.debug.pausepoints = ''")
+  db.Exec(t, "RESUME JOB $1", jobID)
+  jobutils.WaitForJobToRun(t, db, jobID)
+
+  // Now that the job has been paused and resumed after previously hitting a
+  // checkpoint, the initial manifest at the start of the job should have
+  // some files from before the pause.
+  testutils.SucceedsSoon(t, func() error {
+    mu.Lock()
+    defer mu.Unlock()
+    if manifestNumFiles < 0 {
+      return fmt.Errorf("waiting for manifest to be loaded")
+    }
+    return nil
+  })
+  require.Greater(t, manifestNumFiles, 0, "expected non-zero number of files in manifest")
+
+  waitForSuccessfulJob(t, tc, jobID)
+  validateCompactedBackupForTables(t, db, []string{"bank"}, "'nodelocal://1/backup'", start, end)
+}
+
 // Start and end are unix epoch in nanoseconds.
 func validateCompactedBackupForTables(
   t *testing.T, db *sqlutils.SQLRunner, tables []string, collectionURIs string, start, end int64,

Diff for: pkg/sql/BUILD.bazel (+1)
@@ -301,6 +301,7 @@ go_library(
     importpath = "github.com/cockroachdb/cockroach/pkg/sql",
     visibility = ["//visibility:public"],
     deps = [
+        "//pkg/backup/backuppb",
         "//pkg/base",
         "//pkg/build",
         "//pkg/cloud",

Diff for: pkg/sql/exec_util.go (+5)
@@ -21,6 +21,7 @@ import (
   "time"

   apd "github.com/cockroachdb/apd/v3"
+  "github.com/cockroachdb/cockroach/pkg/backup/backuppb"
   "github.com/cockroachdb/cockroach/pkg/base"
   "github.com/cockroachdb/cockroach/pkg/cloud/externalconn"
   "github.com/cockroachdb/cockroach/pkg/clusterversion"
@@ -1873,6 +1874,10 @@ type BackupRestoreTestingKnobs struct {
   // is written.
   AfterBackupCheckpoint func()

+  // AfterLoadingCompactionManifestOnResume is run once the backup manifest has been
+  // loaded/created on the resumption of a compaction job.
+  AfterLoadingCompactionManifestOnResume func(manifest *backuppb.BackupManifest)
+
   // CaptureResolvedTableDescSpans allows for intercepting the spans which are
   // resolved during backup planning, and will eventually be backed up during
   // execution.
