Commit 5601aef

backup: add checkpointing to backup compactions
This patch adds periodic checkpointing to backup compaction jobs.

Epic: None

Release note: None
1 parent 6e53270 commit 5601aef
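The mechanism the patch introduces is a time-gated checkpoint write inside the compaction's progress loop: the in-progress manifest is persisted only when the configured interval has elapsed since the last write. Below is a minimal, self-contained Go sketch of that gating pattern; the names here (maybeCheckpoint, the write callback, the hard-coded interval) are illustrative stand-ins for maybeWriteBackupCheckpoint, backupinfo.WriteBackupManifestCheckpoint, and the bulkio.backup.checkpoint_interval cluster setting, not the actual CockroachDB API.

package main

import (
	"fmt"
	"time"
)

// maybeCheckpoint mirrors the gate in maybeWriteBackupCheckpoint: progress is
// persisted only when at least `interval` has elapsed since the last write.
func maybeCheckpoint(last time.Time, interval time.Duration, write func() error) (bool, error) {
	if time.Since(last) < interval {
		return false, nil // too soon; skip this progress update
	}
	if err := write(); err != nil {
		return false, err
	}
	return true, nil
}

func main() {
	var lastCheckpoint time.Time // zero value: the first update always checkpoints
	const interval = time.Minute
	for i := 0; i < 3; i++ {
		wrote, err := maybeCheckpoint(lastCheckpoint, interval, func() error {
			fmt.Println("checkpoint written")
			return nil
		})
		if err != nil {
			// As in the patch, a failed checkpoint is logged and work continues.
			fmt.Println("checkpoint failed:", err)
		} else if wrote {
			lastCheckpoint = time.Now()
		}
	}
}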

4 files changed (+142, -23 lines)


Diff for: pkg/backup/compaction_job.go (+55, -21)

@@ -40,6 +40,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/retry"
+	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/cockroachdb/cockroach/pkg/util/uuid"
 	"github.com/cockroachdb/errors"
 	"github.com/gogo/protobuf/types"
@@ -231,8 +232,8 @@ func (b *backupResumer) ResumeCompaction(
 
 	var backupManifest *backuppb.BackupManifest
 	updatedDetails := initialDetails
+	testingKnobs := execCtx.ExecCfg().BackupRestoreTestingKnobs
 	if initialDetails.URI == "" {
-		testingKnobs := execCtx.ExecCfg().BackupRestoreTestingKnobs
 		if testingKnobs != nil && testingKnobs.RunBeforeResolvingCompactionDest != nil {
 			if err := testingKnobs.RunBeforeResolvingCompactionDest(); err != nil {
 				return err
@@ -263,21 +264,13 @@ func (b *backupResumer) ResumeCompaction(
 			return err
 		}
 
-		if err := execCtx.ExecCfg().JobRegistry.CheckPausepoint("backup.before.write_first_checkpoint"); err != nil {
-			return err
-		}
-
 		if err := backupinfo.WriteBackupManifestCheckpoint(
 			ctx, updatedDetails.URI, updatedDetails.EncryptionOptions, kmsEnv,
 			backupManifest, execCtx.ExecCfg(), execCtx.User(),
 		); err != nil {
 			return err
 		}
 
-		if err := execCtx.ExecCfg().JobRegistry.CheckPausepoint("backup.after.write_first_checkpoint"); err != nil {
-			return err
-		}
-
 		description := maybeUpdateJobDescription(
 			initialDetails, updatedDetails, b.job.Payload().Description,
 		)
@@ -297,7 +290,7 @@ func (b *backupResumer) ResumeCompaction(
 		return err
 	}
 
-	if err := execCtx.ExecCfg().JobRegistry.CheckPausepoint("backup.after.details_has_checkpoint"); err != nil {
+	if err := execCtx.ExecCfg().JobRegistry.CheckPausepoint("backup_compaction.after.details_has_checkpoint"); err != nil {
 		return err
 	}
 	// TODO (kev-cao): Add telemetry for backup compactions.
@@ -347,6 +340,10 @@ func (b *backupResumer) ResumeCompaction(
 		}
 	}
 
+	if testingKnobs != nil && testingKnobs.AfterLoadingCompactionManifestOnResume != nil {
+		testingKnobs.AfterLoadingCompactionManifestOnResume(backupManifest)
+	}
+
 	// We retry on pretty generic failures -- any rpc error. If a worker node were
 	// to restart, it would produce this kind of error, but there may be other
 	// errors that are also rpc errors. Don't retry too aggressively.
@@ -355,13 +352,8 @@ func (b *backupResumer) ResumeCompaction(
 		MaxRetries: 5,
 	}
 
-	if execCtx.ExecCfg().BackupRestoreTestingKnobs != nil &&
-		execCtx.ExecCfg().BackupRestoreTestingKnobs.BackupDistSQLRetryPolicy != nil {
-		retryOpts = *execCtx.ExecCfg().BackupRestoreTestingKnobs.BackupDistSQLRetryPolicy
-	}
-
-	if err := execCtx.ExecCfg().JobRegistry.CheckPausepoint("backup.before.flow"); err != nil {
-		return err
+	if testingKnobs != nil && testingKnobs.BackupDistSQLRetryPolicy != nil {
+		retryOpts = *testingKnobs.BackupDistSQLRetryPolicy
 	}
 
 	// We want to retry a backup if there are transient failures (i.e. worker nodes
@@ -389,8 +381,6 @@ func (b *backupResumer) ResumeCompaction(
 
 	// Reload the backup manifest to pick up any spans we may have completed on
 	// previous attempts.
-	// TODO (kev-cao): Compactions currently do not create checkpoints, but this
-	// can be used to reload the manifest once we add checkpointing.
 	var reloadBackupErr error
 	mem.Shrink(ctx, memSize)
 	backupManifest, memSize, reloadBackupErr = b.readManifestOnResume(ctx, &mem, execCtx.ExecCfg(),
@@ -765,9 +755,13 @@ func concludeBackupCompaction(
 // the associated manifest.
 func processProgress(
 	ctx context.Context,
+	execCtx sql.JobExecContext,
+	details jobspb.BackupDetails,
 	manifest *backuppb.BackupManifest,
 	progCh <-chan *execinfrapb.RemoteProducerMetadata_BulkProcessorProgress,
+	kmsEnv cloud.KMSEnv,
 ) error {
+	var lastCheckpointTime time.Time
 	// When a processor is done exporting a span, it will send a progress update
 	// to progCh.
 	for progress := range progCh {
@@ -780,11 +774,24 @@ func processProgress(
 			manifest.Files = append(manifest.Files, file)
 			manifest.EntryCounts.Add(file.EntryCounts)
 		}
+
 		// TODO (kev-cao): Add per node progress updates.
+
+		if wroteCheckpoint, err := maybeWriteBackupCheckpoint(
+			ctx, execCtx, details, manifest, lastCheckpointTime, kmsEnv,
+		); err != nil {
+			log.Errorf(ctx, "unable to checkpoint compaction: %+v", err)
+		} else if wroteCheckpoint {
+			lastCheckpointTime = timeutil.Now()
+			if err := execCtx.ExecCfg().JobRegistry.CheckPausepoint("backup_compaction.after.write_checkpoint"); err != nil {
+				return err
+			}
+		}
 	}
 	return nil
 }
 
+// compactionJobDescription generates a redacted description of the job.
 func compactionJobDescription(details jobspb.BackupDetails) (string, error) {
 	fmtCtx := tree.NewFmtCtx(tree.FmtSimple)
 	redactedURIs, err := sanitizeURIList(details.Destination.To)
@@ -835,8 +842,7 @@ func doCompaction(
 		)
 	}
 	checkpointLoop := func(ctx context.Context) error {
-		// TODO (kev-cao): Add logic for checkpointing during loop.
-		return processProgress(ctx, manifest, progCh)
+		return processProgress(ctx, execCtx, details, manifest, progCh, kmsEnv)
 	}
 	// TODO (kev-cao): Add trace aggregator loop.
 
@@ -851,6 +857,34 @@ func doCompaction(
 	)
 }
 
+// maybeWriteBackupCheckpoint writes a checkpoint for the backup if
+// the time since the last checkpoint exceeds the configured interval. If a
+// checkpoint is written, the function returns true.
+func maybeWriteBackupCheckpoint(
+	ctx context.Context,
+	execCtx sql.JobExecContext,
+	details jobspb.BackupDetails,
+	manifest *backuppb.BackupManifest,
+	lastCheckpointTime time.Time,
+	kmsEnv cloud.KMSEnv,
+) (bool, error) {
+	if details.URI == "" {
+		return false, errors.New("backup details does not contain a default URI")
+	}
+	execCfg := execCtx.ExecCfg()
+	interval := BackupCheckpointInterval.Get(&execCfg.Settings.SV)
+	if timeutil.Since(lastCheckpointTime) < interval {
+		return false, nil
+	}
+	if err := backupinfo.WriteBackupManifestCheckpoint(
+		ctx, details.URI, details.EncryptionOptions, kmsEnv,
		manifest, execCfg, execCtx.User(),
+	); err != nil {
+		return false, err
+	}
+	return true, nil
+}
+
 func init() {
 	builtins.StartCompactionJob = StartCompactionJob
 }
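The payoff of these checkpoints shows up in the retry path above: before each attempt, readManifestOnResume reloads whatever the last checkpoint persisted, so spans already compacted by a previous attempt are not redone. What follows is a rough, self-contained Go sketch of that retry-and-reload shape; the state type and the load/run/isTransient callbacks are hypothetical stand-ins for the manifest reload, the DistSQL compaction flow, and the RPC-error check, not the real CockroachDB functions.

package main

import (
	"errors"
	"fmt"
)

// state is a hypothetical stand-in for the reloaded backup manifest: whatever
// progress the last checkpoint persisted.
type state struct {
	completedSpans int
}

// runWithCheckpointedRetries mirrors the shape of the compaction retry loop:
// each attempt reloads checkpointed progress first, so completed work carries
// over across attempts.
func runWithCheckpointedRetries(
	maxRetries int,
	load func() (state, error), // stand-in for readManifestOnResume
	run func(state) error, // stand-in for the DistSQL compaction flow
	isTransient func(error) bool,
) error {
	var err error
	for attempt := 0; attempt <= maxRetries; attempt++ {
		var st state
		if st, err = load(); err != nil {
			return err
		}
		if err = run(st); err == nil {
			return nil
		}
		if !isTransient(err) {
			return err // only transient, RPC-style failures are retried
		}
	}
	return err
}

func main() {
	attempts := 0
	err := runWithCheckpointedRetries(
		5,
		func() (state, error) { return state{completedSpans: attempts * 10}, nil },
		func(st state) error {
			attempts++
			if st.completedSpans < 20 {
				return errors.New("rpc error") // pretend a worker restarted mid-flow
			}
			return nil
		},
		func(error) bool { return true },
	)
	fmt.Println("attempts:", attempts, "err:", err)
}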

Diff for: pkg/backup/compaction_test.go (+81, -2)

@@ -12,10 +12,12 @@ import (
 	"net/url"
 	"strconv"
 	"strings"
+	"sync/atomic"
 	"testing"
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/backup/backupinfo"
+	"github.com/cockroachdb/cockroach/pkg/backup/backuppb"
 	"github.com/cockroachdb/cockroach/pkg/base"
 	"github.com/cockroachdb/cockroach/pkg/jobs"
 	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
@@ -310,7 +312,7 @@ crdb_internal.json_to_pb(
 	db.Exec(t, "INSERT INTO foo VALUES (3, 3)")
 	end := getTime(t)
 	db.Exec(t, fmt.Sprintf(incBackupAostCmd, 7, end))
-	db.Exec(t, "SET CLUSTER SETTING jobs.debug.pausepoints = 'backup.after.details_has_checkpoint'")
+	db.Exec(t, "SET CLUSTER SETTING jobs.debug.pausepoints = 'backup_compaction.after.details_has_checkpoint'")
 
 	var backupPath string
 	db.QueryRow(t, "SHOW BACKUPS IN 'nodelocal://1/backup/7'").Scan(&backupPath)
@@ -324,7 +326,7 @@ crdb_internal.json_to_pb(
 	db.Exec(t, "INSERT INTO foo VALUES (4, 4)")
 	end = getTime(t)
 	db.Exec(t, fmt.Sprintf(incBackupAostCmd, 7, end))
-	db.Exec(t, "SET CLUSTER SETTING jobs.debug.pausepoints = 'backup.after.details_has_checkpoint'")
+	db.Exec(t, "SET CLUSTER SETTING jobs.debug.pausepoints = 'backup_compaction.after.details_has_checkpoint'")
 	jobID = startCompaction(7, backupPath, start, end)
 	jobutils.WaitForJobToPause(t, db, jobID)
 	db.Exec(t, "CANCEL JOB $1", jobID)
@@ -675,6 +677,83 @@ func TestBackupCompactionUnsupportedOptions(t *testing.T) {
 	}
 }
 
+func TestCompactionCheckpointing(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	// The backup needs to be large enough that checkpoints will be created, so
+	// we pick a large number of accounts.
+	const numAccounts = 1000
+	var manifestNumFiles atomic.Int32
+	manifestNumFiles.Store(-1) // -1 means we haven't seen the manifests yet.
+	tc, db, _, cleanup := backupRestoreTestSetupWithParams(
+		t, singleNode, numAccounts, InitManualReplication, base.TestClusterArgs{
+			ServerArgs: base.TestServerArgs{
+				Knobs: base.TestingKnobs{
+					BackupRestore: &sql.BackupRestoreTestingKnobs{
+						AfterLoadingCompactionManifestOnResume: func(m *backuppb.BackupManifest) {
+							manifestNumFiles.Store(int32(len(m.Files)))
+						},
+					},
+				},
+			},
+		},
+	)
+	defer cleanup()
+	writeQueries := func() {
+		db.Exec(t, "UPDATE data.bank SET balance = balance + 1")
+	}
+	db.Exec(t, "SET CLUSTER SETTING bulkio.backup.checkpoint_interval = '10ms'")
+	start := getTime(t)
+	db.Exec(t, fmt.Sprintf("BACKUP INTO 'nodelocal://1/backup' AS OF SYSTEM TIME %d", start))
+	writeQueries()
+	db.Exec(t, "BACKUP INTO LATEST IN 'nodelocal://1/backup'")
+	writeQueries()
+	end := getTime(t)
+	db.Exec(t, fmt.Sprintf("BACKUP INTO LATEST IN 'nodelocal://1/backup' AS OF SYSTEM TIME %d", end))
+
+	var backupPath string
+	db.QueryRow(t, "SHOW BACKUPS IN 'nodelocal://1/backup'").Scan(&backupPath)
+
+	db.Exec(t, "SET CLUSTER SETTING jobs.debug.pausepoints = 'backup_compaction.after.write_checkpoint'")
+	var jobID jobspb.JobID
+	db.QueryRow(
+		t,
+		"SELECT crdb_internal.backup_compaction(ARRAY['nodelocal://1/backup'], $1, ''::BYTES, $2, $3)",
+		backupPath, start, end,
+	).Scan(&jobID)
+	// Ensure that the very first manifest when the job is initially started has
+	// no files.
+	testutils.SucceedsSoon(t, func() error {
+		if manifestNumFiles.Load() < 0 {
+			return fmt.Errorf("waiting for manifest to be loaded")
+		}
+		return nil
+	})
+	require.Equal(t, 0, manifestNumFiles.Load(), "expected no files in manifest")
+
+	// Wait for the job to hit the pausepoint.
+	jobutils.WaitForJobToPause(t, db, jobID)
+	// Don't bother pausing on other checkpoints.
+	db.Exec(t, "SET CLUSTER SETTING jobs.debug.pausepoints = ''")
+	db.Exec(t, "RESUME JOB $1", jobID)
+	jobutils.WaitForJobToRun(t, db, jobID)
+
+	// Now that the job has been paused and resumed after previously hitting a
+	// checkpoint, the initial manifest at the start of the job should have
+	// some files from before the pause.
+	testutils.SucceedsSoon(t, func() error {
+		if manifestNumFiles.Load() <= 0 {
+			return fmt.Errorf("waiting for manifest to be loaded")
+		}
+		return nil
+	})
+	require.Greater(t, manifestNumFiles.Load(), 0, "expected non-zero number of files in manifest")
+
+	waitForSuccessfulJob(t, tc, jobID)
+	validateCompactedBackupForTables(t, db, []string{"bank"}, "'nodelocal://1/backup'", start, end)
+}
+
 // Start and end are unix epoch in nanoseconds.
 func validateCompactedBackupForTables(
 	t *testing.T, db *sqlutils.SQLRunner, tables []string, collectionURIs string, start, end int64,

Diff for: pkg/sql/BUILD.bazel (+1)

@@ -301,6 +301,7 @@ go_library(
     importpath = "github.com/cockroachdb/cockroach/pkg/sql",
     visibility = ["//visibility:public"],
     deps = [
+        "//pkg/backup/backuppb",
         "//pkg/base",
         "//pkg/build",
         "//pkg/cloud",

Diff for: pkg/sql/exec_util.go (+5)

@@ -21,6 +21,7 @@ import (
 	"time"
 
 	apd "github.com/cockroachdb/apd/v3"
+	"github.com/cockroachdb/cockroach/pkg/backup/backuppb"
 	"github.com/cockroachdb/cockroach/pkg/base"
 	"github.com/cockroachdb/cockroach/pkg/cloud/externalconn"
 	"github.com/cockroachdb/cockroach/pkg/clusterversion"
@@ -1873,6 +1874,10 @@ type BackupRestoreTestingKnobs struct {
 	// is written.
 	AfterBackupCheckpoint func()
 
+	// AfterLoadingCompactionManifestOnResume is run once the backup manifest has been
+	// loaded/created on the resumption of a compaction job.
+	AfterLoadingCompactionManifestOnResume func(manifest *backuppb.BackupManifest)
+
 	// CaptureResolvedTableDescSpans allows for intercepting the spans which are
 	// resolved during backup planning, and will eventually be backed up during
 	// execution.
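The new field follows the usual nil-guarded testing-knob pattern: the resumer invokes the hook only when a test has installed one, as the compaction_job.go hunk above does for AfterLoadingCompactionManifestOnResume. A tiny illustrative sketch of that pattern, using hypothetical names in place of sql.BackupRestoreTestingKnobs:

package main

import "fmt"

// testingKnobs is a hypothetical stand-in for sql.BackupRestoreTestingKnobs;
// only tests populate it.
type testingKnobs struct {
	afterLoadManifest func(fileCount int)
}

// resumeCompaction shows the nil-guarded invocation: production runs skip the
// hook entirely, while tests can observe the freshly loaded manifest.
func resumeCompaction(knobs *testingKnobs, manifestFileCount int) {
	// ... load or create the compaction manifest ...
	if knobs != nil && knobs.afterLoadManifest != nil {
		knobs.afterLoadManifest(manifestFileCount)
	}
	// ... start the DistSQL flow ...
}

func main() {
	resumeCompaction(nil, 0) // production path: no knobs installed

	var observed int
	knobs := &testingKnobs{afterLoadManifest: func(n int) { observed = n }}
	resumeCompaction(knobs, 42) // test path: the hook records the file count
	fmt.Println("observed file count:", observed)
}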
