backup: add checkpointing to backup compactions #143849

Merged · 1 commit · Apr 8, 2025
76 changes: 55 additions & 21 deletions pkg/backup/compaction_job.go
@@ -40,6 +40,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/gogo/protobuf/types"
@@ -231,8 +232,8 @@ func (b *backupResumer) ResumeCompaction(

var backupManifest *backuppb.BackupManifest
updatedDetails := initialDetails
testingKnobs := execCtx.ExecCfg().BackupRestoreTestingKnobs
if initialDetails.URI == "" {
testingKnobs := execCtx.ExecCfg().BackupRestoreTestingKnobs
if testingKnobs != nil && testingKnobs.RunBeforeResolvingCompactionDest != nil {
if err := testingKnobs.RunBeforeResolvingCompactionDest(); err != nil {
return err
@@ -263,21 +264,13 @@
return err
}

if err := execCtx.ExecCfg().JobRegistry.CheckPausepoint("backup.before.write_first_checkpoint"); err != nil {
return err
}

if err := backupinfo.WriteBackupManifestCheckpoint(
ctx, updatedDetails.URI, updatedDetails.EncryptionOptions, kmsEnv,
backupManifest, execCtx.ExecCfg(), execCtx.User(),
); err != nil {
return err
}

if err := execCtx.ExecCfg().JobRegistry.CheckPausepoint("backup.after.write_first_checkpoint"); err != nil {
return err
}

description := maybeUpdateJobDescription(
initialDetails, updatedDetails, b.job.Payload().Description,
)
@@ -297,7 +290,7 @@
return err
}

if err := execCtx.ExecCfg().JobRegistry.CheckPausepoint("backup.after.details_has_checkpoint"); err != nil {
if err := execCtx.ExecCfg().JobRegistry.CheckPausepoint("backup_compaction.after.details_has_checkpoint"); err != nil {
return err
}
// TODO (kev-cao): Add telemetry for backup compactions.
@@ -347,6 +340,10 @@
}
}

if testingKnobs != nil && testingKnobs.AfterLoadingCompactionManifestOnResume != nil {
testingKnobs.AfterLoadingCompactionManifestOnResume(backupManifest)
}

// We retry on pretty generic failures -- any rpc error. If a worker node were
// to restart, it would produce this kind of error, but there may be other
// errors that are also rpc errors. Don't retry too aggressively.
@@ -355,13 +352,8 @@
MaxRetries: 5,
}

if execCtx.ExecCfg().BackupRestoreTestingKnobs != nil &&
execCtx.ExecCfg().BackupRestoreTestingKnobs.BackupDistSQLRetryPolicy != nil {
retryOpts = *execCtx.ExecCfg().BackupRestoreTestingKnobs.BackupDistSQLRetryPolicy
}

if err := execCtx.ExecCfg().JobRegistry.CheckPausepoint("backup.before.flow"); err != nil {
return err
if testingKnobs != nil && testingKnobs.BackupDistSQLRetryPolicy != nil {
retryOpts = *testingKnobs.BackupDistSQLRetryPolicy
}

// We want to retry a backup if there are transient failures (i.e. worker nodes
@@ -389,8 +381,6 @@

// Reload the backup manifest to pick up any spans we may have completed on
// previous attempts.
// TODO (kev-cao): Compactions currently do not create checkpoints, but this
// can be used to reload the manifest once we add checkpointing.
var reloadBackupErr error
mem.Shrink(ctx, memSize)
backupManifest, memSize, reloadBackupErr = b.readManifestOnResume(ctx, &mem, execCtx.ExecCfg(),
@@ -765,9 +755,13 @@ func concludeBackupCompaction(
// the associated manifest.
func processProgress(
ctx context.Context,
execCtx sql.JobExecContext,
details jobspb.BackupDetails,
manifest *backuppb.BackupManifest,
progCh <-chan *execinfrapb.RemoteProducerMetadata_BulkProcessorProgress,
kmsEnv cloud.KMSEnv,
) error {
var lastCheckpointTime time.Time
// When a processor is done exporting a span, it will send a progress update
// to progCh.
for progress := range progCh {
@@ -780,11 +774,24 @@
manifest.Files = append(manifest.Files, file)
manifest.EntryCounts.Add(file.EntryCounts)
}

// TODO (kev-cao): Add per node progress updates.

if wroteCheckpoint, err := maybeWriteBackupCheckpoint(
ctx, execCtx, details, manifest, lastCheckpointTime, kmsEnv,
); err != nil {
log.Errorf(ctx, "unable to checkpoint compaction: %+v", err)
} else if wroteCheckpoint {
lastCheckpointTime = timeutil.Now()
if err := execCtx.ExecCfg().JobRegistry.CheckPausepoint("backup_compaction.after.write_checkpoint"); err != nil {
return err
}
}
}
return nil
}

// compactionJobDescription generates a redacted description of the job.
func compactionJobDescription(details jobspb.BackupDetails) (string, error) {
fmtCtx := tree.NewFmtCtx(tree.FmtSimple)
redactedURIs, err := sanitizeURIList(details.Destination.To)
Expand Down Expand Up @@ -835,8 +842,7 @@ func doCompaction(
)
}
checkpointLoop := func(ctx context.Context) error {
// TODO (kev-cao): Add logic for checkpointing during loop.
return processProgress(ctx, manifest, progCh)
return processProgress(ctx, execCtx, details, manifest, progCh, kmsEnv)
}
// TODO (kev-cao): Add trace aggregator loop.

@@ -851,6 +857,34 @@
)
}

// maybeWriteBackupCheckpoint writes a checkpoint for the backup if
// the time since the last checkpoint exceeds the configured interval. If a
// checkpoint is written, the function returns true.
func maybeWriteBackupCheckpoint(
ctx context.Context,
execCtx sql.JobExecContext,
details jobspb.BackupDetails,
manifest *backuppb.BackupManifest,
lastCheckpointTime time.Time,
kmsEnv cloud.KMSEnv,
) (bool, error) {
if details.URI == "" {
return false, errors.New("backup details does not contain a default URI")
}
execCfg := execCtx.ExecCfg()
interval := BackupCheckpointInterval.Get(&execCfg.Settings.SV)
if timeutil.Since(lastCheckpointTime) < interval {
return false, nil
}
if err := backupinfo.WriteBackupManifestCheckpoint(
ctx, details.URI, details.EncryptionOptions, kmsEnv,
manifest, execCfg, execCtx.User(),
); err != nil {
return false, err
}
return true, nil
}

func init() {
builtins.StartCompactionJob = StartCompactionJob
}
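
Aside (not part of the diff): the sketch below illustrates the interval-gated checkpointing pattern that maybeWriteBackupCheckpoint applies to the compaction progress loop — fold each progress update into in-memory state, persist a checkpoint at most once per configured interval, and tolerate a failed checkpoint write without failing the job. The progressUpdate and checkpointer types and the persist callback are hypothetical stand-ins, not CockroachDB APIs.

```go
package main

import (
	"fmt"
	"time"
)

// progressUpdate is a hypothetical stand-in for the per-span progress
// messages that processors send on progCh.
type progressUpdate struct {
	files int
}

// checkpointer persists a snapshot of accumulated progress at most once
// per interval, mirroring the time gate in maybeWriteBackupCheckpoint.
type checkpointer struct {
	interval       time.Duration
	lastCheckpoint time.Time
	persist        func(filesSoFar int) error // hypothetical sink, e.g. a manifest write
}

// maybeCheckpoint writes a checkpoint only if enough time has elapsed
// since the last successful write. It reports whether it wrote one.
func (c *checkpointer) maybeCheckpoint(filesSoFar int) (bool, error) {
	if time.Since(c.lastCheckpoint) < c.interval {
		return false, nil
	}
	if err := c.persist(filesSoFar); err != nil {
		return false, err
	}
	c.lastCheckpoint = time.Now()
	return true, nil
}

func main() {
	progCh := make(chan progressUpdate)
	go func() {
		for i := 0; i < 5; i++ {
			progCh <- progressUpdate{files: 1}
			time.Sleep(20 * time.Millisecond)
		}
		close(progCh)
	}()

	cp := checkpointer{
		interval: 50 * time.Millisecond,
		persist: func(filesSoFar int) error {
			fmt.Printf("checkpoint: %d files so far\n", filesSoFar)
			return nil
		},
	}

	total := 0
	for p := range progCh {
		total += p.files // fold the update into in-memory state
		if wrote, err := cp.maybeCheckpoint(total); err != nil {
			// A failed checkpoint is logged and tolerated; progress keeps flowing.
			fmt.Println("checkpoint failed:", err)
		} else if wrote {
			fmt.Println("wrote checkpoint")
		}
	}
}
```

Because lastCheckpoint starts at its zero value, the first progress update checkpoints immediately; the zero-valued lastCheckpointTime in processProgress behaves the same way.
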
83 changes: 81 additions & 2 deletions pkg/backup/compaction_test.go
@@ -12,10 +12,12 @@ import (
"net/url"
"strconv"
"strings"
"sync/atomic"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/backup/backupinfo"
"github.com/cockroachdb/cockroach/pkg/backup/backuppb"
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
@@ -310,7 +312,7 @@
db.Exec(t, "INSERT INTO foo VALUES (3, 3)")
end := getTime(t)
db.Exec(t, fmt.Sprintf(incBackupAostCmd, 7, end))
db.Exec(t, "SET CLUSTER SETTING jobs.debug.pausepoints = 'backup.after.details_has_checkpoint'")
db.Exec(t, "SET CLUSTER SETTING jobs.debug.pausepoints = 'backup_compaction.after.details_has_checkpoint'")

var backupPath string
db.QueryRow(t, "SHOW BACKUPS IN 'nodelocal://1/backup/7'").Scan(&backupPath)
@@ -324,7 +326,7 @@
db.Exec(t, "INSERT INTO foo VALUES (4, 4)")
end = getTime(t)
db.Exec(t, fmt.Sprintf(incBackupAostCmd, 7, end))
db.Exec(t, "SET CLUSTER SETTING jobs.debug.pausepoints = 'backup.after.details_has_checkpoint'")
db.Exec(t, "SET CLUSTER SETTING jobs.debug.pausepoints = 'backup_compaction.after.details_has_checkpoint'")
jobID = startCompaction(7, backupPath, start, end)
jobutils.WaitForJobToPause(t, db, jobID)
db.Exec(t, "CANCEL JOB $1", jobID)
@@ -675,6 +677,83 @@ func TestBackupCompactionUnsupportedOptions(t *testing.T) {
}
}

func TestCompactionCheckpointing(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// The backup needs to be large enough that checkpoints will be created, so
// we pick a large number of accounts.
const numAccounts = 1000
var manifestNumFiles atomic.Int32
manifestNumFiles.Store(-1) // -1 means the manifest has not been loaded yet.
tc, db, _, cleanup := backupRestoreTestSetupWithParams(
t, singleNode, numAccounts, InitManualReplication, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
BackupRestore: &sql.BackupRestoreTestingKnobs{
AfterLoadingCompactionManifestOnResume: func(m *backuppb.BackupManifest) {
manifestNumFiles.Store(int32(len(m.Files)))
},
},
},
},
},
)
defer cleanup()
writeQueries := func() {
db.Exec(t, "UPDATE data.bank SET balance = balance + 1")
}
db.Exec(t, "SET CLUSTER SETTING bulkio.backup.checkpoint_interval = '10ms'")
start := getTime(t)
db.Exec(t, fmt.Sprintf("BACKUP INTO 'nodelocal://1/backup' AS OF SYSTEM TIME %d", start))
writeQueries()
db.Exec(t, "BACKUP INTO LATEST IN 'nodelocal://1/backup'")
writeQueries()
end := getTime(t)
db.Exec(t, fmt.Sprintf("BACKUP INTO LATEST IN 'nodelocal://1/backup' AS OF SYSTEM TIME %d", end))

var backupPath string
db.QueryRow(t, "SHOW BACKUPS IN 'nodelocal://1/backup'").Scan(&backupPath)

db.Exec(t, "SET CLUSTER SETTING jobs.debug.pausepoints = 'backup_compaction.after.write_checkpoint'")
var jobID jobspb.JobID
db.QueryRow(
t,
"SELECT crdb_internal.backup_compaction(ARRAY['nodelocal://1/backup'], $1, ''::BYTES, $2, $3)",
backupPath, start, end,
).Scan(&jobID)
// Ensure that the very first manifest when the job is initially started has
// no files.
testutils.SucceedsSoon(t, func() error {
if manifestNumFiles.Load() < 0 {
return fmt.Errorf("waiting for manifest to be loaded")
}
return nil
})
require.Equal(t, 0, int(manifestNumFiles.Load()), "expected no files in manifest")

// Wait for the job to hit the pausepoint.
jobutils.WaitForJobToPause(t, db, jobID)
// Don't bother pausing on other checkpoints.
db.Exec(t, "SET CLUSTER SETTING jobs.debug.pausepoints = ''")
db.Exec(t, "RESUME JOB $1", jobID)
jobutils.WaitForJobToRun(t, db, jobID)

// Now that the job has been paused and resumed after previously hitting a
// checkpoint, the initial manifest at the start of the job should have
// some files from before the pause.
testutils.SucceedsSoon(t, func() error {
if manifestNumFiles.Load() <= 0 {
return fmt.Errorf("waiting for manifest to be loaded")
}
return nil
})
require.Greater(t, int(manifestNumFiles.Load()), 0, "expected non-zero number of files in manifest")

waitForSuccessfulJob(t, tc, jobID)
validateCompactedBackupForTables(t, db, []string{"bank"}, "'nodelocal://1/backup'", start, end)
}

// Start and end are unix epoch in nanoseconds.
func validateCompactedBackupForTables(
t *testing.T, db *sqlutils.SQLRunner, tables []string, collectionURIs string, start, end int64,
1 change: 1 addition & 0 deletions pkg/sql/BUILD.bazel
@@ -301,6 +301,7 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/sql",
visibility = ["//visibility:public"],
deps = [
"//pkg/backup/backuppb",
"//pkg/base",
"//pkg/build",
"//pkg/cloud",
5 changes: 5 additions & 0 deletions pkg/sql/exec_util.go
@@ -21,6 +21,7 @@ import (
"time"

apd "github.com/cockroachdb/apd/v3"
"github.com/cockroachdb/cockroach/pkg/backup/backuppb"
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/cloud/externalconn"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
@@ -1873,6 +1874,10 @@ type BackupRestoreTestingKnobs struct {
// is written.
AfterBackupCheckpoint func()

// AfterLoadingCompactionManifestOnResume is run once the backup manifest has been
// loaded/created on the resumption of a compaction job.
AfterLoadingCompactionManifestOnResume func(manifest *backuppb.BackupManifest)

// CaptureResolvedTableDescSpans allows for intercepting the spans which are
// resolved during backup planning, and will eventually be backed up during
// execution.