This repository was archived by the owner on Sep 30, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1.3k
/
Copy pathsyncer.go
750 lines (631 loc) · 24.4 KB
/
syncer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
package repos
import (
"context"
"fmt"
"strconv"
"strings"
"time"
"github.com/sourcegraph/log"
"go.opentelemetry.io/otel/attribute"
"golang.org/x/sync/singleflight"
"golang.org/x/time/rate"
"github.com/sourcegraph/sourcegraph/internal/actor"
"github.com/sourcegraph/sourcegraph/internal/api"
"github.com/sourcegraph/sourcegraph/internal/conf"
"github.com/sourcegraph/sourcegraph/internal/database"
"github.com/sourcegraph/sourcegraph/internal/dotcom"
"github.com/sourcegraph/sourcegraph/internal/errcode"
"github.com/sourcegraph/sourcegraph/internal/goroutine"
"github.com/sourcegraph/sourcegraph/internal/licensing"
"github.com/sourcegraph/sourcegraph/internal/metrics"
"github.com/sourcegraph/sourcegraph/internal/observation"
"github.com/sourcegraph/sourcegraph/internal/trace"
"github.com/sourcegraph/sourcegraph/internal/types"
"github.com/sourcegraph/sourcegraph/lib/errors"
)
// A Syncer periodically synchronizes available repositories from all its given Sources
// with the stored Repositories in Sourcegraph.
type Syncer struct {
	// Sourcer produces the Source used to list repos for an external service
	// (see SyncExternalService).
	Sourcer Sourcer
	// Store persists repos and sync jobs.
	Store Store
	// Synced is sent a collection of Repos that were synced by Sync (only if Synced is non-nil)
	Synced chan types.RepoSyncDiff
	// ObsvCtx carries the logger (and observation machinery) used by all sync operations.
	ObsvCtx *observation.Context
	// Now is time.Now. Can be set by tests to get deterministic output.
	Now func() time.Time
	// Ensure that we only run one sync per repo at a time
	syncGroup singleflight.Group
}
// NewSyncer returns a Syncer wired up with the given store and sourcer.
// The returned Syncer publishes diffs on its Synced channel and logs under a
// "syncer"-scoped logger derived from observationCtx.
func NewSyncer(observationCtx *observation.Context, store Store, sourcer Sourcer) *Syncer {
	syncer := &Syncer{
		Sourcer: sourcer,
		Store:   store,
		Synced:  make(chan types.RepoSyncDiff),
		ObsvCtx: observation.ContextWithLogger(observationCtx.Logger.Scoped("syncer"), observationCtx),
	}
	// UTC keeps the last/next-sync bookkeeping timezone-independent.
	syncer.Now = func() time.Time { return time.Now().UTC() }
	return syncer
}
// RunOptions contains options customizing Run behaviour.
type RunOptions struct {
	// EnqueueInterval is how often the scheduler enqueues new sync jobs.
	// Defaults to 1 minute.
	EnqueueInterval func() time.Duration
	// MinSyncInterval is the lower bound on the interval until an external
	// service's next sync (see calcSyncInterval). Defaults to 1 minute.
	MinSyncInterval func() time.Duration
	// DequeueInterval is how often the sync worker polls for queued jobs.
	// Defaults to 10 seconds.
	DequeueInterval time.Duration
}
// Routines returns the goroutines that run the Sync at the specified interval.
// It wires up the sync worker (plus its resetter and janitor), which processes
// queued sync jobs, and a scheduler that periodically enqueues new jobs.
func (s *Syncer) Routines(ctx context.Context, store Store, opts RunOptions) []goroutine.BackgroundRoutine {
	// Fill in defaults for any options the caller left unset.
	if opts.EnqueueInterval == nil {
		opts.EnqueueInterval = func() time.Duration { return time.Minute }
	}
	if opts.MinSyncInterval == nil {
		opts.MinSyncInterval = func() time.Duration { return time.Minute }
	}
	if opts.DequeueInterval == 0 {
		opts.DequeueInterval = 10 * time.Second
	}

	handler := &syncHandler{
		syncer:          s,
		store:           store,
		minSyncInterval: opts.MinSyncInterval,
	}
	workerObsvCtx := observation.ContextWithLogger(s.ObsvCtx.Logger.Scoped("syncWorker"), s.ObsvCtx)
	worker, resetter, janitor := NewSyncWorker(ctx, workerObsvCtx, store.Handle(), handler, SyncWorkerOptions{
		WorkerInterval: opts.DequeueInterval,
		NumHandlers:    conf.RepoConcurrentExternalServiceSyncers(),
		CleanupOldJobs: true,
	})

	// enqueue pushes due external services onto the sync job queue, unless
	// automatic code host syncs are disabled site-wide.
	enqueue := func(ctx context.Context) error {
		if conf.Get().DisableAutoCodeHostSyncs {
			return nil
		}
		if err := store.EnqueueSyncJobs(ctx); err != nil {
			return errors.Wrap(err, "enqueueing sync jobs")
		}
		return nil
	}
	scheduler := goroutine.NewPeriodicGoroutine(
		actor.WithInternalActor(ctx),
		goroutine.HandlerFunc(enqueue),
		goroutine.WithName("repo-updater.repo-sync-scheduler"),
		goroutine.WithDescription("enqueues sync jobs for external service sync jobs"),
		goroutine.WithIntervalFunc(opts.EnqueueInterval),
	)

	return []goroutine.BackgroundRoutine{worker, resetter, janitor, scheduler}
}
// syncHandler is the sync worker's job handler: each dequeued SyncJob is
// executed via Syncer.SyncExternalService.
type syncHandler struct {
	syncer *Syncer
	store  Store
	// minSyncInterval bounds how soon the external service may be scheduled
	// for its next sync.
	minSyncInterval func() time.Duration
}
// Handle runs a single external service sync job. Running progress counters
// are persisted on the job row, throttled to roughly one write per second so
// the database isn't hammered; the final counts are always written.
func (s *syncHandler) Handle(ctx context.Context, _ log.Logger, sj *SyncJob) (err error) {
	// Limit calls to progressRecorder as it will most likely hit the database
	limiter := rate.NewLimiter(rate.Limit(1.0), 1)
	recordProgress := func(ctx context.Context, progress SyncProgress, final bool) error {
		// Drop intermediate updates when over the rate limit; the final
		// update always goes through.
		if !final && !limiter.Allow() {
			return nil
		}
		job := types.ExternalServiceSyncJob{
			ID:              int64(sj.ID),
			ReposSynced:     progress.Synced,
			RepoSyncErrors:  progress.Errors,
			ReposAdded:      progress.Added,
			ReposDeleted:    progress.Deleted,
			ReposModified:   progress.Modified,
			ReposUnmodified: progress.Unmodified,
		}
		return s.store.ExternalServiceStore().UpdateSyncJobCounters(ctx, &job)
	}
	return s.syncer.SyncExternalService(ctx, sj.ExternalServiceID, s.minSyncInterval(), recordProgress)
}
// TriggerExternalServiceSync will enqueue a sync job for the supplied external
// service. The job is processed asynchronously by the sync worker started in
// Routines.
func (s *Syncer) TriggerExternalServiceSync(ctx context.Context, id int64) error {
	return s.Store.EnqueueSingleSyncJob(ctx, id)
}
// Owner labels attached to sync metrics (see observeSync).
const (
	// ownerUndefined is used when the external service could not be loaded.
	ownerUndefined = ""
	ownerSite      = "site"
)
// ErrUnauthorized indicates the code host rejected our credentials.
type ErrUnauthorized struct{}

// Error implements the error interface.
func (ErrUnauthorized) Error() string { return "bad credentials" }

// Unauthorized marks this error as an authorization failure.
func (ErrUnauthorized) Unauthorized() bool { return true }
// ErrForbidden indicates the code host denied access to a resource.
type ErrForbidden struct{}

// Error implements the error interface.
func (ErrForbidden) Error() string { return "forbidden" }

// Forbidden marks this error as an access-denied failure.
func (ErrForbidden) Forbidden() bool { return true }
// ErrAccountSuspended indicates the code host account has been suspended.
type ErrAccountSuspended struct{}

// Error implements the error interface.
func (ErrAccountSuspended) Error() string { return "account suspended" }

// AccountSuspended marks this error as an account-suspension failure.
func (ErrAccountSuspended) AccountSuspended() bool { return true }
// notifyDeleted records the given deletions in metrics and, when the Synced
// channel is set, publishes a diff containing only the deleted repo IDs.
func (s *Syncer) notifyDeleted(ctx context.Context, deleted ...api.RepoID) {
	var repos types.Repos
	for _, id := range deleted {
		repos = append(repos, &types.Repo{ID: id})
	}
	diff := types.RepoSyncDiff{Deleted: repos}
	observeDiff(diff)

	if s.Synced == nil || diff.Len() == 0 {
		return
	}
	// Don't block forever if nobody is receiving; give up on cancellation.
	select {
	case <-ctx.Done():
	case s.Synced <- diff:
	}
}
// SyncProgress represents running counts for an external service sync.
type SyncProgress struct {
	// Synced is the number of distinct repos seen from the code host so far.
	Synced int32 `json:"synced,omitempty"`
	// Errors counts results from the code host that carried an error.
	Errors int32 `json:"errors,omitempty"`

	// Diff stats
	Added      int32 `json:"added,omitempty"`
	Removed    int32 `json:"removed,omitempty"`
	Modified   int32 `json:"modified,omitempty"`
	Unmodified int32 `json:"unmodified,omitempty"`
	// Deleted counts repos removed at the end of a sync because they were no
	// longer sourced.
	Deleted int32 `json:"deleted,omitempty"`
}
// LicenseError wraps an error caused by a license limit being hit (e.g. the
// maximum number of private repositories). During a sync, errors of this type
// do not abort the deletion of unsourced repos.
type LicenseError struct {
	error
}
// progressRecorderFunc is a function that implements persisting sync progress.
// The final param represents whether this is the final call. This allows the
// function to decide whether to drop some intermediate calls. Only the final
// call is guaranteed to be made.
type progressRecorderFunc func(ctx context.Context, progress SyncProgress, final bool) error
// SyncExternalService syncs repos using the supplied external service in a streaming fashion, rather than batch.
// This allows very large sync jobs (i.e. that source potentially millions of repos) to incrementally persist changes.
// Deletes of repositories that were not sourced are done at the end.
//
// minSyncInterval is the lower bound used when scheduling the service's next
// sync; progressRecorder is called repeatedly with running counters and once
// more (final=true) when the sync ends.
func (s *Syncer) SyncExternalService(
	ctx context.Context,
	externalServiceID int64,
	minSyncInterval time.Duration,
	progressRecorder progressRecorderFunc,
) (err error) {
	logger := s.ObsvCtx.Logger.Scoped("SyncExternalService").With(log.Int64("externalServiceID", externalServiceID))
	logger.Info("syncing external service")

	// Ensure the job field is recorded when monitoring external API calls
	ctx = metrics.ContextWithTask(ctx, "SyncExternalService")

	var svc *types.ExternalService
	ctx, save := s.observeSync(ctx, "Syncer.SyncExternalService")
	// save records duration/error metrics; svc may still be nil on early error.
	defer func() { save(svc, err) }()

	// We don't use tx here as the sourcing process below can be slow and we don't
	// want to hold a lock on the external_services table for too long.
	svc, err = s.Store.ExternalServiceStore().GetByID(ctx, externalServiceID)
	if err != nil {
		return errors.Wrap(err, "fetching external services")
	}

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// From this point we always want to make a best effort attempt to update the
	// service timestamps
	var modified bool
	defer func() {
		now := s.Now()
		interval := calcSyncInterval(now, svc.LastSyncAt, minSyncInterval, modified, err)
		nextSyncAt := now.Add(interval)
		lastSyncAt := now
		// We call Update here instead of Upsert, because upsert stores all fields of the external
		// service, and syncs take a while so changes to the external service made while this sync
		// was running would be overwritten again.
		if err := s.Store.ExternalServiceStore().Update(ctx, nil, svc.ID, &database.ExternalServiceUpdate{
			LastSyncAt: &lastSyncAt,
			NextSyncAt: &nextSyncAt,
		}); err != nil {
			// We only want to log this error, not return it
			logger.Error("upserting external service", log.Error(err))
		}
		logger.Debug("synced external service", log.Duration("backoff duration", interval))
	}()

	src, err := s.Sourcer(ctx, svc)
	if err != nil {
		return err
	}

	// A failed connection check is only a warning; listing may still work.
	if err := src.CheckConnection(ctx); err != nil {
		logger.Warn("connection check failed. syncing repositories might still succeed.", log.Error(err))
	}

	// Stream repos from the code host; the channel is closed when listing ends.
	results := make(chan SourceResult)
	go func() {
		src.ListRepos(ctx, results)
		close(results)
	}()

	seen := make(map[api.RepoID]struct{})
	var errs error
	// fatal reports whether a code-host error should stop the sync entirely.
	fatal := func(err error) bool {
		// If the error is just a warning, then it is not fatal.
		if errors.IsWarning(err) && !errcode.IsAccountSuspended(err) {
			return false
		}
		return errcode.IsUnauthorized(err) ||
			errcode.IsForbidden(err) ||
			errcode.IsAccountSuspended(err)
	}

	logger = s.ObsvCtx.Logger.With(log.Object("svc", log.String("name", svc.DisplayName), log.Int64("id", svc.ID)))

	var syncProgress SyncProgress
	// Record the final progress state
	defer func() {
		// Use a different context here so that we make sure to record progress
		// even if context has been canceled
		if err := progressRecorder(context.Background(), syncProgress, true); err != nil {
			logger.Error("recording final sync progress", log.Error(err))
		}
	}()

	// Insert or update repos as they are sourced. Keep track of what was seen so we
	// can remove anything else at the end.
	for res := range results {
		logger.Debug("received result", log.String("repo", fmt.Sprintf("%v", res)))

		// Best-effort intermediate progress update (rate limited by the recorder).
		if err := progressRecorder(ctx, syncProgress, false); err != nil {
			logger.Warn("recording sync progress", log.Error(err))
		}

		if err := res.Err; err != nil {
			syncProgress.Errors++
			logger.Error("error from codehost", log.Int("seen", len(seen)), log.Error(err))
			errs = errors.Append(errs, errors.Wrapf(err, "fetching from code host %s", svc.DisplayName))
			if fatal(err) {
				// Delete all external service repos of this external service
				seen = map[api.RepoID]struct{}{}
				logger.Error("stopping external service sync due to fatal error from codehost", log.Error(err))
				break
			}
			continue
		}

		sourced := res.Repo

		// Private repos are rejected outright on dotcom.
		if dotcom.SourcegraphDotComMode() && sourced.Private {
			err := errors.Newf("%s is private, but dotcom does not support private repositories.", sourced.Name)
			syncProgress.Errors++
			logger.Error("failed to sync private repo", log.String("repo", string(sourced.Name)), log.Error(err))
			errs = errors.Append(errs, err)
			continue
		}

		var diff types.RepoSyncDiff
		if diff, err = s.sync(ctx, svc, sourced); err != nil {
			syncProgress.Errors++
			logger.Error("failed to sync, skipping", log.String("repo", string(sourced.Name)), log.Error(err))
			errs = errors.Append(errs, err)
			continue
		}

		syncProgress.Added += int32(diff.Added.Len())
		syncProgress.Removed += int32(diff.Deleted.Len())
		syncProgress.Modified += int32(diff.Modified.Repos().Len())
		syncProgress.Unmodified += int32(diff.Unmodified.Len())

		for _, r := range diff.Repos() {
			seen[r.ID] = struct{}{}
		}
		syncProgress.Synced = int32(len(seen))

		modified = modified || len(diff.Modified)+len(diff.Added) > 0
	}

	// This information isn't necessarily an error in and of itself, but it's possible
	// for the code host to misbehave in a way that returns a legitimate-looking response
	// (e.g. a non-fatal HTTP status code, with a well-formed response body) that is
	// nonetheless incorrect (e.g. temporarily returning empty set of repositories
	// instead of an error).
	//
	// Because of this, we have to be able to log every response from the code host
	// so that we can audit them later so that we can rule in/out the above scenario.
	// So, we choose the WARN level instead of INFO so that this info is logged by default.
	logger.Warn("finished listing repositories from external service",
		log.Object("syncProgress",
			log.Int32("synced", syncProgress.Synced),
			log.Int32("errors", syncProgress.Errors),
			log.Int32("added", syncProgress.Added),
			log.Int32("removed", syncProgress.Removed),
			log.Int32("modified", syncProgress.Modified),
			log.Int32("unmodified", syncProgress.Unmodified),
		),
		log.Int("seen", len(seen)),
		log.Bool("modified", modified),
		log.Error(errs),
	)

	// We don't delete any repos of site-level external services if there were any
	// non-warning errors during a sync.
	//
	// Only user or organization external services will delete
	// repos in a sync run with fatal errors.
	//
	// Site-level external services can own lots of repos and are managed by site admins.
	// It's preferable to have them fix any invalidated token manually rather than deleting the repos automatically.
	deleted := 0

	// If all of our errors are warnings and either Forbidden or Unauthorized,
	// we want to proceed with the deletion. This is to be able to properly sync
	// repos (by removing ones if code-host permissions have changed).
	abortDeletion := false
	if errs != nil {
		logger.Error("received errors during sync", log.Error(errs))
		var ref errors.MultiError
		if errors.As(errs, &ref) {
			for _, e := range ref.Errors() {
				if errors.IsWarning(e) {
					// Warnings abort deletion only when they are NOT
					// forbidden/unauthorized errors.
					baseError := errors.Unwrap(e)
					if !errcode.IsForbidden(baseError) && !errcode.IsUnauthorized(baseError) {
						abortDeletion = true
						logger.Info("aborting deletion due to fatal error", log.Error(e))
						break
					}
					continue
				}
				// License-limit errors never abort deletion.
				if errors.As(e, &LicenseError{}) {
					continue
				}
				abortDeletion = true
				logger.Info("aborting deletion due to fatal error", log.Error(e))
				break
			}
		}
	}

	if !abortDeletion {
		// Remove associations and any repos that are no longer associated with any
		// external service.
		//
		// We don't want to delete all repos that weren't seen if we had a lot of
		// spurious errors since that could cause lots of repos to be deleted, only to be
		// added the next sync. We delete only if we had no errors,
		// or all of our errors are warnings and either Forbidden or Unauthorized,
		// or we had one of the fatal errors and the service is not site owned.
		var deletedErr error
		deleted, deletedErr = s.delete(ctx, svc, seen)
		if deletedErr != nil {
			logger.Warn("failed to delete some repos",
				log.Int("seen", len(seen)),
				log.Int("deleted", deleted),
				log.Error(deletedErr),
			)

			errs = errors.Append(errs, errors.Wrap(deletedErr, "some repos couldn't be deleted"))
		}

		if deleted > 0 {
			syncProgress.Deleted += int32(deleted)
			// NOTE(review): this logs err (the function's named return), not
			// deletedErr — deletedErr looks like the intended value; confirm.
			logger.Warn("deleted not seen repos",
				log.Int("seen", len(seen)),
				log.Int("deleted", deleted),
				log.Error(err),
			)
		}
	}

	modified = modified || deleted > 0

	return errs
}
// sync upserts a single sourced repo of a given external service inside a
// transaction, returning a diff that contains that one repo (as added,
// modified, or unmodified). After commit, the diff is published on s.Synced.
func (s *Syncer) sync(ctx context.Context, svc *types.ExternalService, sourced *types.Repo) (d types.RepoSyncDiff, err error) {
	tx, err := s.Store.Transact(ctx)
	if err != nil {
		return types.RepoSyncDiff{}, errors.Wrap(err, "syncer: opening transaction")
	}

	defer func() {
		observeDiff(d)
		// We must commit the transaction before publishing to s.Synced
		// so that gitserver finds the repo in the database.
		s.ObsvCtx.Logger.Debug("committing transaction")
		err = tx.Done(err)
		if err != nil {
			s.ObsvCtx.Logger.Warn("failed to commit transaction", log.Error(err))
			return
		}
		if s.Synced != nil && d.Len() > 0 {
			select {
			case <-ctx.Done():
				s.ObsvCtx.Logger.Debug("sync context done")
			case s.Synced <- d:
				s.ObsvCtx.Logger.Debug("diff synced")
			}
		}
	}()

	// Fetch any stored repo matching either the name or the external repo spec
	// (including blocked and soft-deleted rows) so all conflict cases surface.
	stored, err := tx.RepoStore().List(ctx, database.ReposListOptions{
		Names:          []string{string(sourced.Name)},
		ExternalRepos:  []api.ExternalRepoSpec{sourced.ExternalRepo},
		IncludeBlocked: true,
		IncludeDeleted: true,
		UseOr:          true,
	})
	if err != nil {
		return types.RepoSyncDiff{}, errors.Wrap(err, "syncer: getting repo from the database")
	}

	switch len(stored) {
	case 2: // Existing repo with a naming conflict
		// Scenario where this can happen:
		// 1. Repo `owner/repo1` with external_id 1 exists
		// 2. Repo `owner/repo2` with external_id 2 exists
		// 3. The owner deletes repo1, and renames repo2 to repo1
		// 4. We sync and we receive `owner/repo1` with external_id 2
		//
		// Then the above query will return two results: one matching the name owner/repo1, and one matching the external_service_id 2
		// The original owner/repo1 should be deleted, and then owner/repo2 with the matching external_service_id should be updated
		s.ObsvCtx.Logger.Debug("naming conflict")

		// Pick this sourced repo to own the name by deleting the other repo. If it still exists, it'll have a different
		// name when we source it from the same code host, and it will be re-created.
		var conflicting, existing *types.Repo
		for _, r := range stored {
			if r.ExternalRepo.Equal(&sourced.ExternalRepo) {
				existing = r
			} else {
				conflicting = r
			}
		}

		// invariant: conflicting can't be nil due to our database constraints
		if err = tx.RepoStore().Delete(ctx, conflicting.ID); err != nil {
			return types.RepoSyncDiff{}, errors.Wrap(err, "syncer: failed to delete conflicting repo")
		}

		// We fallthrough to the next case after removing the conflicting repo in order to update
		// the winner (i.e. existing). This works because we mutate stored to contain it, which the case expects.
		stored = types.Repos{existing}
		s.ObsvCtx.Logger.Debug("retrieved stored repo, falling through", log.String("stored", fmt.Sprintf("%v", stored)))
		fallthrough
	case 1: // Existing repo, update.
		// A soft-deleted repo that reappears counts as Added, not Modified.
		wasDeleted := !stored[0].DeletedAt.IsZero()
		s.ObsvCtx.Logger.Debug("existing repo")
		if err := UpdateRepoLicenseHook(ctx, tx, stored[0], sourced); err != nil {
			return types.RepoSyncDiff{}, LicenseError{errors.Wrapf(err, "syncer: failed to update repo %s", sourced.Name)}
		}
		modified := stored[0].Update(sourced)
		if modified == types.RepoUnmodified {
			d.Unmodified = append(d.Unmodified, stored[0])
			break
		}

		if err = tx.UpdateExternalServiceRepo(ctx, svc, stored[0]); err != nil {
			return types.RepoSyncDiff{}, errors.Wrap(err, "syncer: failed to update external service repo")
		}

		// NOTE(review): mirrors the merged/stored state back into the caller's
		// sourced repo — presumably so callers observe the persisted values.
		*sourced = *stored[0]
		if wasDeleted {
			d.Added = append(d.Added, stored[0])
			s.ObsvCtx.Logger.Debug("revived soft-deleted repo")
		} else {
			d.Modified = append(d.Modified, types.RepoModified{Repo: stored[0], Modified: modified})
			s.ObsvCtx.Logger.Debug("appended to modified repos")
		}
	case 0: // New repo, create.
		s.ObsvCtx.Logger.Debug("new repo")

		if err := CreateRepoLicenseHook(ctx, tx, sourced); err != nil {
			return types.RepoSyncDiff{}, LicenseError{errors.Wrapf(err, "syncer: failed to update repo %s", sourced.Name)}
		}

		if err = tx.CreateExternalServiceRepo(ctx, svc, sourced); err != nil {
			return types.RepoSyncDiff{}, errors.Wrapf(err, "syncer: failed to create external service repo: %s", sourced.Name)
		}

		d.Added = append(d.Added, sourced)
		s.ObsvCtx.Logger.Debug("appended to added repos")
	default: // Impossible since we have two separate unique constraints on name and external repo spec
		panic("unreachable")
	}

	s.ObsvCtx.Logger.Debug("completed")

	return d, nil
}
// CreateRepoLicenseHook checks if there is still room for private repositories
// available in the applied license before creating a new private repository.
func CreateRepoLicenseHook(ctx context.Context, s Store, repo *types.Repo) error {
	// Public repositories are never limited by the license.
	if !repo.Private {
		return nil
	}

	prFeature := &licensing.FeaturePrivateRepositories{}
	if licensing.Check(prFeature) != nil {
		return licensing.NewFeatureNotActivatedError("The private repositories feature is not activated for this license. Please upgrade your license to use this feature.")
	}
	if prFeature.Unrestricted {
		return nil
	}

	// Count existing private repos and reject the creation once the licensed
	// maximum has been reached.
	numPrivateRepos, err := s.RepoStore().Count(ctx, database.ReposListOptions{OnlyPrivate: true})
	if err != nil {
		return err
	}
	if numPrivateRepos >= prFeature.MaxNumPrivateRepos {
		return errors.Newf("maximum number of private repositories included in license (%d) reached", prFeature.MaxNumPrivateRepos)
	}
	return nil
}
// UpdateRepoLicenseHook checks if there is still room for private repositories
// available in the applied license before updating a repository from public to
// private, or undeleting a private repository.
func UpdateRepoLicenseHook(ctx context.Context, s Store, existingRepo *types.Repo, newRepo *types.Repo) error {
	// Updating to a public repository, or deleting — nothing to check.
	if !newRepo.Private || !newRepo.DeletedAt.IsZero() {
		return nil
	}

	prFeature := &licensing.FeaturePrivateRepositories{}
	if licensing.Check(prFeature) != nil {
		return licensing.NewFeatureNotActivatedError("The private repositories feature is not activated for this license. Please upgrade your license to use this feature.")
	}
	if prFeature.Unrestricted {
		return nil
	}

	numPrivateRepos, err := s.RepoStore().Count(ctx, database.ReposListOptions{OnlyPrivate: true})
	if err != nil {
		return err
	}
	switch {
	case numPrivateRepos > prFeature.MaxNumPrivateRepos:
		return errors.Newf("maximum number of private repositories included in license (%d) reached", prFeature.MaxNumPrivateRepos)
	case numPrivateRepos == prFeature.MaxNumPrivateRepos:
		// Exactly at the cap: only reject updates that would add a NEW private
		// repo, i.e. restoring a soft-deleted repo or flipping public -> private.
		newPrivateRepo := (!existingRepo.DeletedAt.IsZero() || !existingRepo.Private) && newRepo.Private
		if newPrivateRepo {
			return errors.Newf("maximum number of private repositories included in license (%d) reached", prFeature.MaxNumPrivateRepos)
		}
	}
	return nil
}
// delete removes this external service's associations to all repos NOT in
// seen (deleting repos left with no associated external service, per the
// caller's contract), returning how many were deleted. Deletions are pushed
// to s.Synced via notifyDeleted.
func (s *Syncer) delete(ctx context.Context, svc *types.ExternalService, seen map[api.RepoID]struct{}) (int, error) {
	// We do deletion in a best effort manner, returning any errors for individual repos that failed to be deleted.
	deleted, err := s.Store.DeleteExternalServiceReposNotIn(ctx, svc, seen)

	// Notify even on partial failure: deleted contains what was removed.
	s.notifyDeleted(ctx, deleted...)

	return len(deleted), err
}
// observeDiff increments the per-state synced-repos counter metric for every
// repo in the diff.
func observeDiff(diff types.RepoSyncDiff) {
	groups := []struct {
		state string
		repos types.Repos
	}{
		{"added", diff.Added},
		{"modified", diff.Modified.Repos()},
		{"deleted", diff.Deleted},
		{"unmodified", diff.Unmodified},
	}
	for _, g := range groups {
		syncedTotal.WithLabelValues(g.state).Add(float64(len(g.repos)))
	}
}
func calcSyncInterval(
now time.Time,
lastSync time.Time,
minSyncInterval time.Duration,
modified bool,
err error,
) time.Duration {
const maxSyncInterval = 8 * time.Hour
// Special case, we've never synced
if err == nil && (lastSync.IsZero() || modified) {
return minSyncInterval
}
// No change or there were errors, back off
interval := now.Sub(lastSync) * 2
if interval < minSyncInterval {
return minSyncInterval
}
if interval > maxSyncInterval {
return maxSyncInterval
}
return interval
}
// observeSync starts a trace span for the named sync operation and returns
// the derived context plus a completion callback. The callback records
// start-count, last-sync-time, duration, and error metrics (labelled by
// operation name and service owner) and ends the span.
func (s *Syncer) observeSync(
	ctx context.Context,
	name string,
	attributes ...attribute.KeyValue,
) (context.Context, func(*types.ExternalService, error)) {
	began := s.Now()
	tr, ctx := trace.New(ctx, name, attributes...)

	return ctx, func(svc *types.ExternalService, err error) {
		// svc is nil when the external service could not be loaded, so there
		// is no owner to attribute the metrics to.
		var owner string
		if svc == nil {
			owner = ownerUndefined
		} else {
			owner = ownerSite
		}

		syncStarted.WithLabelValues(name, owner).Inc()

		now := s.Now()
		took := now.Sub(began).Seconds()

		lastSync.WithLabelValues(name).Set(float64(now.Unix()))

		success := err == nil
		syncDuration.WithLabelValues(strconv.FormatBool(success), name).Observe(took)

		if !success {
			tr.SetError(err)
			syncErrors.WithLabelValues(name, owner, syncErrorReason(err)).Inc()
		}

		tr.End()
	}
}
// syncErrorReason classifies a sync error into a low-cardinality label for
// the sync-errors metric. The first matching category wins; unrecognized
// errors are labelled "unknown".
func syncErrorReason(err error) string {
	if err == nil {
		return ""
	}
	if errcode.IsNotFound(err) {
		return "not_found"
	}
	if errcode.IsUnauthorized(err) {
		return "unauthorized"
	}
	if errcode.IsForbidden(err) {
		return "forbidden"
	}
	if errcode.IsTemporary(err) {
		return "temporary"
	}

	msg := err.Error()
	if strings.Contains(msg, "expected path in npm/(scope/)?name") {
		// This is a known issue which we can filter out for now
		return "invalid_npm_path"
	}
	if strings.Contains(msg, "internal rate limit exceeded") {
		// We want to identify these as it's not an issue communicating with the code
		// host and is most likely caused by temporary traffic spikes.
		return "internal_rate_limit"
	}
	return "unknown"
}