diff --git a/pkg/backup/compaction_job.go b/pkg/backup/compaction_job.go
index efaa851c1edf..ba2c68e8fbfd 100644
--- a/pkg/backup/compaction_job.go
+++ b/pkg/backup/compaction_job.go
@@ -40,6 +40,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/retry"
+	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/cockroachdb/cockroach/pkg/util/uuid"
 	"github.com/cockroachdb/errors"
 	"github.com/gogo/protobuf/types"
@@ -231,8 +232,8 @@ func (b *backupResumer) ResumeCompaction(
 	var backupManifest *backuppb.BackupManifest
 	updatedDetails := initialDetails
 
+	testingKnobs := execCtx.ExecCfg().BackupRestoreTestingKnobs
 	if initialDetails.URI == "" {
-		testingKnobs := execCtx.ExecCfg().BackupRestoreTestingKnobs
 		if testingKnobs != nil && testingKnobs.RunBeforeResolvingCompactionDest != nil {
 			if err := testingKnobs.RunBeforeResolvingCompactionDest(); err != nil {
 				return err
@@ -263,10 +264,6 @@ func (b *backupResumer) ResumeCompaction(
 			return err
 		}
 
-		if err := execCtx.ExecCfg().JobRegistry.CheckPausepoint("backup.before.write_first_checkpoint"); err != nil {
-			return err
-		}
-
 		if err := backupinfo.WriteBackupManifestCheckpoint(
 			ctx, updatedDetails.URI, updatedDetails.EncryptionOptions, kmsEnv,
 			backupManifest, execCtx.ExecCfg(), execCtx.User(),
@@ -274,10 +271,6 @@ func (b *backupResumer) ResumeCompaction(
 			return err
 		}
 
-		if err := execCtx.ExecCfg().JobRegistry.CheckPausepoint("backup.after.write_first_checkpoint"); err != nil {
-			return err
-		}
-
 		description := maybeUpdateJobDescription(
 			initialDetails, updatedDetails, b.job.Payload().Description,
 		)
@@ -297,7 +290,7 @@ func (b *backupResumer) ResumeCompaction(
 		return err
 	}
 
-	if err := execCtx.ExecCfg().JobRegistry.CheckPausepoint("backup.after.details_has_checkpoint"); err != nil {
+	if err := execCtx.ExecCfg().JobRegistry.CheckPausepoint("backup_compaction.after.details_has_checkpoint"); err != nil {
 		return err
 	}
 	// TODO (kev-cao): Add telemetry for backup compactions.
@@ -347,6 +340,10 @@ func (b *backupResumer) ResumeCompaction(
 		}
 	}
 
+	if testingKnobs != nil && testingKnobs.AfterLoadingCompactionManifestOnResume != nil {
+		testingKnobs.AfterLoadingCompactionManifestOnResume(backupManifest)
+	}
+
 	// We retry on pretty generic failures -- any rpc error. If a worker node were
 	// to restart, it would produce this kind of error, but there may be other
 	// errors that are also rpc errors. Don't retry too aggressively.
@@ -355,13 +352,8 @@ func (b *backupResumer) ResumeCompaction(
 		MaxRetries: 5,
 	}
 
-	if execCtx.ExecCfg().BackupRestoreTestingKnobs != nil &&
-		execCtx.ExecCfg().BackupRestoreTestingKnobs.BackupDistSQLRetryPolicy != nil {
-		retryOpts = *execCtx.ExecCfg().BackupRestoreTestingKnobs.BackupDistSQLRetryPolicy
-	}
-
-	if err := execCtx.ExecCfg().JobRegistry.CheckPausepoint("backup.before.flow"); err != nil {
-		return err
+	if testingKnobs != nil && testingKnobs.BackupDistSQLRetryPolicy != nil {
+		retryOpts = *testingKnobs.BackupDistSQLRetryPolicy
 	}
 
 	// We want to retry a backup if there are transient failures (i.e. worker nodes
@@ -389,8 +381,6 @@ func (b *backupResumer) ResumeCompaction(
 
 			// Reload the backup manifest to pick up any spans we may have completed on
 			// previous attempts.
-			// TODO (kev-cao): Compactions currently do not create checkpoints, but this
-			// can be used to reload the manifest once we add checkpointing.
 			var reloadBackupErr error
 			mem.Shrink(ctx, memSize)
 			backupManifest, memSize, reloadBackupErr = b.readManifestOnResume(ctx, &mem, execCtx.ExecCfg(),
@@ -765,9 +755,13 @@ func concludeBackupCompaction(
 // the associated manifest.
 func processProgress(
 	ctx context.Context,
+	execCtx sql.JobExecContext,
+	details jobspb.BackupDetails,
 	manifest *backuppb.BackupManifest,
 	progCh <-chan *execinfrapb.RemoteProducerMetadata_BulkProcessorProgress,
+	kmsEnv cloud.KMSEnv,
 ) error {
+	var lastCheckpointTime time.Time
 	// When a processor is done exporting a span, it will send a progress update
 	// to progCh.
 	for progress := range progCh {
@@ -780,11 +774,24 @@ func processProgress(
 			manifest.Files = append(manifest.Files, file)
 			manifest.EntryCounts.Add(file.EntryCounts)
 		}
+		// TODO (kev-cao): Add per node progress updates.
+
+		if wroteCheckpoint, err := maybeWriteBackupCheckpoint(
+			ctx, execCtx, details, manifest, lastCheckpointTime, kmsEnv,
+		); err != nil {
+			log.Errorf(ctx, "unable to checkpoint compaction: %+v", err)
+		} else if wroteCheckpoint {
+			lastCheckpointTime = timeutil.Now()
+			if err := execCtx.ExecCfg().JobRegistry.CheckPausepoint("backup_compaction.after.write_checkpoint"); err != nil {
+				return err
+			}
+		}
 	}
 	return nil
 }
 
+// compactionJobDescription generates a redacted description of the job.
 func compactionJobDescription(details jobspb.BackupDetails) (string, error) {
 	fmtCtx := tree.NewFmtCtx(tree.FmtSimple)
 	redactedURIs, err := sanitizeURIList(details.Destination.To)
@@ -835,8 +842,7 @@ func doCompaction(
 		)
 	}
 	checkpointLoop := func(ctx context.Context) error {
-		// TODO (kev-cao): Add logic for checkpointing during loop.
-		return processProgress(ctx, manifest, progCh)
+		return processProgress(ctx, execCtx, details, manifest, progCh, kmsEnv)
 	}
 	// TODO (kev-cao): Add trace aggregator loop.
 
@@ -851,6 +857,34 @@ func doCompaction(
 	)
 }
 
+// maybeWriteBackupCheckpoint writes a checkpoint for the backup if
+// the time since the last checkpoint exceeds the configured interval. If a
+// checkpoint is written, the function returns true.
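+// A failed checkpoint write is not fatal to the compaction: processProgress
+// logs the error and keeps consuming progress updates.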
+func maybeWriteBackupCheckpoint(
+	ctx context.Context,
+	execCtx sql.JobExecContext,
+	details jobspb.BackupDetails,
+	manifest *backuppb.BackupManifest,
+	lastCheckpointTime time.Time,
+	kmsEnv cloud.KMSEnv,
+) (bool, error) {
+	if details.URI == "" {
+		return false, errors.New("backup details does not contain a default URI")
+	}
+	execCfg := execCtx.ExecCfg()
+	interval := BackupCheckpointInterval.Get(&execCfg.Settings.SV)
+	if timeutil.Since(lastCheckpointTime) < interval {
+		return false, nil
+	}
+	if err := backupinfo.WriteBackupManifestCheckpoint(
+		ctx, details.URI, details.EncryptionOptions, kmsEnv,
+		manifest, execCfg, execCtx.User(),
+	); err != nil {
+		return false, err
+	}
+	return true, nil
+}
+
 func init() {
 	builtins.StartCompactionJob = StartCompactionJob
 }
diff --git a/pkg/backup/compaction_test.go b/pkg/backup/compaction_test.go
index 53020d8885e6..7dc53009ef9d 100644
--- a/pkg/backup/compaction_test.go
+++ b/pkg/backup/compaction_test.go
@@ -12,10 +12,12 @@ import (
 	"net/url"
 	"strconv"
 	"strings"
+	"sync/atomic"
 	"testing"
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/backup/backupinfo"
+	"github.com/cockroachdb/cockroach/pkg/backup/backuppb"
 	"github.com/cockroachdb/cockroach/pkg/base"
 	"github.com/cockroachdb/cockroach/pkg/jobs"
 	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
@@ -310,7 +312,7 @@ crdb_internal.json_to_pb(
 	db.Exec(t, "INSERT INTO foo VALUES (3, 3)")
 	end := getTime(t)
 	db.Exec(t, fmt.Sprintf(incBackupAostCmd, 7, end))
-	db.Exec(t, "SET CLUSTER SETTING jobs.debug.pausepoints = 'backup.after.details_has_checkpoint'")
+	db.Exec(t, "SET CLUSTER SETTING jobs.debug.pausepoints = 'backup_compaction.after.details_has_checkpoint'")
 
 	var backupPath string
 	db.QueryRow(t, "SHOW BACKUPS IN 'nodelocal://1/backup/7'").Scan(&backupPath)
@@ -324,7 +326,7 @@ crdb_internal.json_to_pb(
 	db.Exec(t, "INSERT INTO foo VALUES (4, 4)")
 	end = getTime(t)
 	db.Exec(t, fmt.Sprintf(incBackupAostCmd, 7, end))
-	db.Exec(t, "SET CLUSTER SETTING jobs.debug.pausepoints = 'backup.after.details_has_checkpoint'")
+	db.Exec(t, "SET CLUSTER SETTING jobs.debug.pausepoints = 'backup_compaction.after.details_has_checkpoint'")
 	jobID = startCompaction(7, backupPath, start, end)
 	jobutils.WaitForJobToPause(t, db, jobID)
 	db.Exec(t, "CANCEL JOB $1", jobID)
@@ -675,6 +677,83 @@ func TestBackupCompactionUnsupportedOptions(t *testing.T) {
 	}
 }
 
+func TestCompactionCheckpointing(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	// The backup needs to be large enough that checkpoints will be created, so
+	// we pick a large number of accounts.
+	const numAccounts = 1000
+	var manifestNumFiles atomic.Int32
+	manifestNumFiles.Store(-1) // -1 means we haven't seen the manifests yet.
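+	// The testing knob below records the manifest's file count each time the
+	// compaction job loads its manifest, letting the test distinguish a fresh
+	// start (zero files) from a resume that picked up checkpointed progress.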
+	tc, db, _, cleanup := backupRestoreTestSetupWithParams(
+		t, singleNode, numAccounts, InitManualReplication, base.TestClusterArgs{
+			ServerArgs: base.TestServerArgs{
+				Knobs: base.TestingKnobs{
+					BackupRestore: &sql.BackupRestoreTestingKnobs{
+						AfterLoadingCompactionManifestOnResume: func(m *backuppb.BackupManifest) {
+							manifestNumFiles.Store(int32(len(m.Files)))
+						},
+					},
+				},
+			},
+		},
+	)
+	defer cleanup()
+	writeQueries := func() {
+		db.Exec(t, "UPDATE data.bank SET balance = balance + 1")
+	}
+	db.Exec(t, "SET CLUSTER SETTING bulkio.backup.checkpoint_interval = '10ms'")
+	start := getTime(t)
+	db.Exec(t, fmt.Sprintf("BACKUP INTO 'nodelocal://1/backup' AS OF SYSTEM TIME %d", start))
+	writeQueries()
+	db.Exec(t, "BACKUP INTO LATEST IN 'nodelocal://1/backup'")
+	writeQueries()
+	end := getTime(t)
+	db.Exec(t, fmt.Sprintf("BACKUP INTO LATEST IN 'nodelocal://1/backup' AS OF SYSTEM TIME %d", end))
+
+	var backupPath string
+	db.QueryRow(t, "SHOW BACKUPS IN 'nodelocal://1/backup'").Scan(&backupPath)
+
+	db.Exec(t, "SET CLUSTER SETTING jobs.debug.pausepoints = 'backup_compaction.after.write_checkpoint'")
+	var jobID jobspb.JobID
+	db.QueryRow(
+		t,
+		"SELECT crdb_internal.backup_compaction(ARRAY['nodelocal://1/backup'], $1, ''::BYTES, $2, $3)",
+		backupPath, start, end,
+	).Scan(&jobID)
+	// Ensure that the very first manifest when the job is initially started has
+	// no files.
+	testutils.SucceedsSoon(t, func() error {
+		if manifestNumFiles.Load() < 0 {
+			return fmt.Errorf("waiting for manifest to be loaded")
+		}
+		return nil
+	})
+	require.Equal(t, 0, int(manifestNumFiles.Load()), "expected no files in manifest")
+
+	// Wait for the job to hit the pausepoint.
+	jobutils.WaitForJobToPause(t, db, jobID)
+	// Don't bother pausing on other checkpoints.
+	db.Exec(t, "SET CLUSTER SETTING jobs.debug.pausepoints = ''")
+	db.Exec(t, "RESUME JOB $1", jobID)
+	jobutils.WaitForJobToRun(t, db, jobID)
+
+	// Now that the job has been paused and resumed after previously hitting a
+	// checkpoint, the initial manifest at the start of the job should have
+	// some files from before the pause.
+	testutils.SucceedsSoon(t, func() error {
+		if manifestNumFiles.Load() <= 0 {
+			return fmt.Errorf("waiting for manifest to be loaded")
+		}
+		return nil
+	})
+	require.Greater(t, int(manifestNumFiles.Load()), 0, "expected non-zero number of files in manifest")
+
+	waitForSuccessfulJob(t, tc, jobID)
+	validateCompactedBackupForTables(t, db, []string{"bank"}, "'nodelocal://1/backup'", start, end)
+}
+
 // Start and end are unix epoch in nanoseconds.
 func validateCompactedBackupForTables(
 	t *testing.T, db *sqlutils.SQLRunner, tables []string, collectionURIs string, start, end int64,
diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel
index 5dcc133aaac9..2cdbe1debd53 100644
--- a/pkg/sql/BUILD.bazel
+++ b/pkg/sql/BUILD.bazel
@@ -301,6 +301,7 @@ go_library(
     importpath = "github.com/cockroachdb/cockroach/pkg/sql",
     visibility = ["//visibility:public"],
     deps = [
+        "//pkg/backup/backuppb",
         "//pkg/base",
         "//pkg/build",
        "//pkg/cloud",
diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go
index 3d5e8417968d..77f78fe3fac0 100644
--- a/pkg/sql/exec_util.go
+++ b/pkg/sql/exec_util.go
@@ -21,6 +21,7 @@ import (
 	"time"
 
 	apd "github.com/cockroachdb/apd/v3"
+	"github.com/cockroachdb/cockroach/pkg/backup/backuppb"
 	"github.com/cockroachdb/cockroach/pkg/base"
 	"github.com/cockroachdb/cockroach/pkg/cloud/externalconn"
 	"github.com/cockroachdb/cockroach/pkg/clusterversion"
@@ -1873,6 +1874,10 @@ type BackupRestoreTestingKnobs struct {
 	// is written.
 	AfterBackupCheckpoint func()
 
+	// AfterLoadingCompactionManifestOnResume is run once the backup manifest has been
+	// loaded/created on the resumption of a compaction job.
+	AfterLoadingCompactionManifestOnResume func(manifest *backuppb.BackupManifest)
+
 	// CaptureResolvedTableDescSpans allows for intercepting the spans which are
 	// resolved during backup planning, and will eventually be backed up during
 	// execution.