Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

argocd declarative call #6456

Open
wants to merge 9 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 56 additions & 92 deletions client/argocdServer/ArgoClientWrapperService.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,12 @@ import (
bean2 "github.com/devtron-labs/devtron/client/argocdServer/repoCredsK8sClient/bean"
repocreds2 "github.com/devtron-labs/devtron/client/argocdServer/repocreds"
"github.com/devtron-labs/devtron/client/argocdServer/repository"
"github.com/devtron-labs/devtron/internal/constants"
"github.com/devtron-labs/devtron/internal/util"
"github.com/devtron-labs/devtron/pkg/deployment/gitOps/config"
"github.com/devtron-labs/devtron/pkg/deployment/gitOps/git"
"github.com/devtron-labs/devtron/util/retryFunc"
"go.opentelemetry.io/otel"
"github.com/pkg/errors"
"go.uber.org/zap"
"google.golang.org/grpc"
"strconv"
"strings"
"time"
)
Expand All @@ -54,6 +51,8 @@ type ACDConfig struct {
ArgoCDAutoSyncEnabled bool `env:"ARGO_AUTO_SYNC_ENABLED" envDefault:"true"` // will gradually switch this flag to false in enterprise
RegisterRepoMaxRetryCount int `env:"ARGO_REPO_REGISTER_RETRY_COUNT" envDefault:"3"`
RegisterRepoMaxRetryDelay int `env:"ARGO_REPO_REGISTER_RETRY_DELAY" envDefault:"10"`
SyncRetryCount int `env:"ARGO_SYNC_RETRY_COUNT" envDefault:"1"`
SyncDelaySeconds int `env:"ARGO_SYNC_DELAY_SECONDS" envDefault:"2"`
}

func (config *ACDConfig) IsManualSyncEnabled() bool {
Expand Down Expand Up @@ -90,9 +89,7 @@ type ApplicationClientWrapper interface {

DeleteArgoAppWithK8sClient(ctx context.Context, clusterId int, namespace, appName string, cascadeDelete bool) error

// SyncArgoCDApplicationIfNeededAndRefresh - if ARGO_AUTO_SYNC_ENABLED=true, app will be refreshed to initiate refresh at argoCD side or else it will be synced and refreshed
SyncArgoCDApplicationIfNeededAndRefresh(ctx context.Context, argoAppName, targetRevision string) error

SyncArgoCDApplicationAndRefreshWithK8sClient(ctx context.Context, clusterId int, namespace, appName string) error
// UpdateArgoCDSyncModeIfNeeded - if ARGO_AUTO_SYNC_ENABLED=true and app is in manual sync mode or vice versa update app
UpdateArgoCDSyncModeIfNeeded(ctx context.Context, argoApplication *v1alpha1.Application) (err error)

Expand Down Expand Up @@ -242,85 +239,6 @@ func (impl *ArgoClientWrapperServiceImpl) DeleteArgoApp(ctx context.Context, app
return impl.acdApplicationClient.Delete(ctx, grpcConfig, req)
}

func (impl *ArgoClientWrapperServiceImpl) SyncArgoCDApplicationIfNeededAndRefresh(ctx context.Context, argoAppName, targetRevision string) error {

newCtx, span := otel.Tracer("orchestrator").Start(ctx, "ArgoClientWrapperServiceImpl.SyncArgoCDApplicationIfNeededAndRefresh")
defer span.End()
impl.logger.Info("ArgoCd manual sync for app started", "argoAppName", argoAppName)

grpcConfig, err := impl.acdConfigGetter.GetGRPCConfig()
if err != nil {
impl.logger.Errorw("error in getting grpc config", "err", err)
return err
}

if impl.ACDConfig.IsManualSyncEnabled() {

impl.logger.Debugw("syncing ArgoCd app as manual sync is enabled", "argoAppName", argoAppName)
pruneResources := true
_, syncErr := impl.acdApplicationClient.Sync(newCtx, grpcConfig, &application2.ApplicationSyncRequest{Name: &argoAppName,
Revision: &targetRevision,
Prune: &pruneResources,
})
if syncErr != nil {
impl.logger.Errorw("error in syncing argoCD app", "app", argoAppName, "err", syncErr)
statusCode, msg := util.GetClientDetailedError(syncErr)
if statusCode.IsFailedPreconditionCode() && msg == ErrorOperationAlreadyInProgress {
impl.logger.Info("terminating ongoing sync operation and retrying manual sync", "argoAppName", argoAppName)
_, terminationErr := impl.acdApplicationClient.TerminateOperation(newCtx, grpcConfig, &application2.OperationTerminateRequest{
Name: &argoAppName,
})
if terminationErr != nil {
impl.logger.Errorw("error in terminating sync operation")
return fmt.Errorf("error in terminating existing sync, err: %w", terminationErr)
}
_, syncErr = impl.acdApplicationClient.Sync(newCtx, grpcConfig, &application2.ApplicationSyncRequest{Name: &argoAppName,
Revision: &targetRevision,
Prune: &pruneResources,
RetryStrategy: &v1alpha1.RetryStrategy{
Limit: 1,
},
})
if syncErr != nil {
impl.logger.Errorw("error in syncing argoCD app", "app", argoAppName, "err", syncErr)
return syncErr
}
} else {
return syncErr
}
}
impl.logger.Infow("ArgoCd sync completed", "argoAppName", argoAppName)
}

runnableFunc := func() {
// running ArgoCd app refresh in asynchronous mode
refreshErr := impl.GetArgoAppWithNormalRefresh(context.Background(), grpcConfig, argoAppName)
if refreshErr != nil {
impl.logger.Errorw("error in refreshing argo app", "argoAppName", argoAppName, "err", refreshErr)
}
}
impl.asyncRunnable.Execute(runnableFunc)
return nil
}

func (impl *ArgoClientWrapperServiceImpl) GetArgoAppWithNormalRefresh(ctx context.Context, grpcConfig *bean.ArgoGRPCConfig, argoAppName string) error {
newCtx, span := otel.Tracer("orchestrator").Start(ctx, "ArgoClientWrapperServiceImpl.GetArgoAppWithNormalRefresh")
defer span.End()
refreshType := bean.RefreshTypeNormal
impl.logger.Debugw("trying to normal refresh application through get ", "argoAppName", argoAppName)
_, err := impl.acdApplicationClient.Get(newCtx, grpcConfig, &application2.ApplicationQuery{Name: &argoAppName, Refresh: &refreshType})
if err != nil {
internalMsg := fmt.Sprintf("%s, err:- %s", constants.CannotGetAppWithRefreshErrMsg, err.Error())
clientCode, _ := util.GetClientDetailedError(err)
httpStatusCode := clientCode.GetHttpStatusCodeForGivenGrpcCode()
err = &util.ApiError{HttpStatusCode: httpStatusCode, Code: strconv.Itoa(httpStatusCode), InternalMessage: internalMsg, UserMessage: err.Error()}
impl.logger.Errorw("cannot get application with refresh", "app", argoAppName)
return err
}
impl.logger.Debugw("done getting the application with refresh with no error", "argoAppName", argoAppName)
return nil
}

func (impl *ArgoClientWrapperServiceImpl) UpdateArgoCDSyncModeIfNeeded(ctx context.Context, argoApplication *v1alpha1.Application) (err error) {
if isArgoAppSyncModeMigrationNeeded(argoApplication, impl.ACDConfig) {

Expand Down Expand Up @@ -418,12 +336,7 @@ func (impl *ArgoClientWrapperServiceImpl) GetArgoAppByNameWithK8sClient(ctx cont
impl.logger.Errorw("err in getting argo app by name", "app", appName)
return nil, err
}
application, err := GetAppObject(argoApplication)
if err != nil {
impl.logger.Errorw("error in getting app object", "deploymentAppName", appName, "err", err)
return nil, err
}
return application, nil
return argoApplication, nil
}

func (impl *ArgoClientWrapperServiceImpl) DeleteArgoAppWithK8sClient(ctx context.Context, clusterId int, namespace, appName string, cascadeDelete bool) error {
Expand All @@ -440,6 +353,57 @@ func (impl *ArgoClientWrapperServiceImpl) DeleteArgoAppWithK8sClient(ctx context
return nil
}

func (impl *ArgoClientWrapperServiceImpl) SyncArgoCDApplicationAndRefreshWithK8sClient(ctx context.Context, clusterId int, namespace, appName string) error {
k8sConfig, err := impl.acdConfigGetter.GetK8sConfigWithClusterIdAndNamespace(clusterId, namespace)
if err != nil {
impl.logger.Errorw("error in getting k8s config", "err", err)
return err
}
if impl.ACDConfig.IsManualSyncEnabled() {
impl.logger.Debugw("syncing ArgoCd app using k8s as manual sync is enabled", "argoAppName", appName)
err = impl.argoK8sClient.SyncArgoApplication(ctx, k8sConfig, appName)
if err != nil {
if errors.Is(err, ErrAnotherOperationInProgress) {
impl.logger.Infow("Another operation is already in progress, terminating app")
err = impl.argoK8sClient.TerminateApp(ctx, k8sConfig, appName)
if err != nil && !errors.Is(err, ErrNoOperationInProgress) {
impl.logger.Errorw("error in terminating argo application", "appName", appName, "err", err)
return err
}
callback := func(int) error {
err = impl.argoK8sClient.SyncArgoApplication(ctx, k8sConfig, appName)
if err != nil {
return err
}
return nil
}
shouldRetry := func(err error) bool {
return errors.Is(err, ErrAnotherOperationInProgress)
}
err = retryFunc.Retry(callback,
shouldRetry,
impl.ACDConfig.SyncRetryCount,
time.Second*time.Duration(impl.ACDConfig.SyncDelaySeconds),
impl.logger,
)
return err
}
impl.logger.Errorw("err in syncing argo application", "app", appName, "err", err)
return err
}
}
runnableFunc := func() {
impl.logger.Debugw("syncing ArgoCd app using k8s as manual sync is enabled", "argoAppName", appName)
// running ArgoCd app refresh in asynchronous mode
refreshErr := impl.argoK8sClient.RefreshApp(context.Background(), k8sConfig, appName, bean.RefreshTypeNormal)
if refreshErr != nil {
impl.logger.Errorw("error in refreshing argo app", "argoAppName", appName, "err", refreshErr)
}
}
impl.asyncRunnable.Execute(runnableFunc)
return nil
}

func (impl *ArgoClientWrapperServiceImpl) IsArgoAppPatchRequired(argoAppSpec *v1alpha1.ApplicationSource, currentGitRepoUrl, currentTargetRevision, currentChartPath string) bool {
if argoAppSpec == nil {
// if argo app spec is nil, then no need to patch
Expand Down
2 changes: 1 addition & 1 deletion client/argocdServer/ArgoClientWrapperServiceEA.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (impl *ArgoClientWrapperServiceEAImpl) DeleteArgoAppWithK8sClient(ctx conte
return nil
}

func (impl *ArgoClientWrapperServiceEAImpl) SyncArgoCDApplicationIfNeededAndRefresh(ctx context.Context, argoAppName, targetRevision string) error {
func (impl *ArgoClientWrapperServiceEAImpl) SyncArgoCDApplicationAndRefreshWithK8sClient(ctx context.Context, clusterId int, namespace, appName string) error {
impl.logger.Info("not implemented")
return nil
}
Expand Down
11 changes: 6 additions & 5 deletions client/argocdServer/bean/bean.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@ import (
)

const (
RefreshTypeNormal = "normal"
TargetRevisionMaster = "master"
TargetRevisionOriginMaster = "origin/master"
PatchTypeMerge = "merge"
TargetRevisionHead = "head"
RefreshTypeNormal = "normal"
TargetRevisionMaster = "master"
TargetRevisionOriginMaster = "origin/master"
PatchTypeMerge = "merge"
TargetRevisionHead = "head"
AnnotationKeyRefresh string = "argocd.argoproj.io/refresh"
)

type ArgoCdAppPatchReqDto struct {
Expand Down
Loading