-
Notifications
You must be signed in to change notification settings - Fork 552
/
Copy pathdeployment.go
335 lines (287 loc) · 12.4 KB
/
deployment.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
package install
import (
"fmt"
"hash/fnv"
log "github.com/sirupsen/logrus"
appsv1 "k8s.io/api/apps/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/rand"
"github.com/operator-framework/api/pkg/operators/v1alpha1"
"github.com/operator-framework/operator-lifecycle-manager/pkg/api/wrappers"
hashutil "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/kubernetes/pkg/util/hash"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/ownerutil"
)
// DeploymentSpecHashLabelKey is the label key under which the hash of a
// deployment's spec is stored on the Deployment object.
const DeploymentSpecHashLabelKey = "olm.deployment-spec-hash"
// StrategyDeploymentInstaller installs, checks and cleans up the deployments
// described by a deployment install strategy on behalf of an owner.
type StrategyDeploymentInstaller struct {
// strategyClient performs the create/update/delete/find calls for deployments.
strategyClient wrappers.InstallStrategyDeploymentInterface
// owner supplies the namespace, owner reference and owner labels applied to
// generated deployments.
owner ownerutil.Owner
// previousStrategy is stored by the constructor; it is not read in this
// part of the file.
previousStrategy Strategy
// templateAnnotations are merged into each deployment's pod template
// annotations and verified when checking installed deployments.
templateAnnotations map[string]string
// initializers are applied to every deployment built from a spec.
initializers DeploymentInitializerFuncChain
// apiServiceDescriptions holds the API service descriptions wrapped as
// certResource values by the constructor.
apiServiceDescriptions []certResource
// webhookDescriptions holds the webhook descriptions wrapped as
// certResource values by the constructor.
webhookDescriptions []certResource
}
// Compile-time checks that the deployment strategy details type satisfies
// Strategy and that the deployment installer satisfies StrategyInstaller.
var _ Strategy = &v1alpha1.StrategyDetailsDeployment{}
var _ StrategyInstaller = &StrategyDeploymentInstaller{}
// DeploymentInitializerFunc takes a deployment object and initializes it
// for install, returning an error if initialization fails.
//
// Before a deployment is created on the cluster, a series of these override
// functions can be run to properly initialize the deployment object.
type DeploymentInitializerFunc func(deployment *appsv1.Deployment) error
// DeploymentInitializerFuncChain is an ordered list of
// DeploymentInitializerFunc values that are run one after another.
type DeploymentInitializerFuncChain []DeploymentInitializerFunc
// Apply runs every initializer in the chain, in order, against the given
// deployment. Nil entries are skipped. The first initializer that fails
// stops the chain and its error is returned; otherwise Apply returns nil.
func (c DeploymentInitializerFuncChain) Apply(deployment *appsv1.Deployment) (err error) {
	for _, fn := range c {
		if fn != nil {
			if err = fn(deployment); err != nil {
				return err
			}
		}
	}
	return nil
}
// DeploymentInitializerBuilderFunc returns a DeploymentInitializerFunc built
// for the given owner.
type DeploymentInitializerBuilderFunc func(owner ownerutil.Owner) DeploymentInitializerFunc
// NewStrategyDeploymentInstaller builds a StrategyInstaller backed by a
// StrategyDeploymentInstaller.
//
// Each API service description and webhook description is wrapped in its
// certResource form with an empty (zero-length, non-nil) CA PEM; all other
// arguments are stored on the installer as given.
func NewStrategyDeploymentInstaller(strategyClient wrappers.InstallStrategyDeploymentInterface, templateAnnotations map[string]string, owner ownerutil.Owner, previousStrategy Strategy, initializers DeploymentInitializerFuncChain, apiServiceDescriptions []v1alpha1.APIServiceDescription, webhookDescriptions []v1alpha1.WebhookDescription) StrategyInstaller {
	installer := &StrategyDeploymentInstaller{
		strategyClient:         strategyClient,
		owner:                  owner,
		previousStrategy:       previousStrategy,
		templateAnnotations:    templateAnnotations,
		initializers:           initializers,
		apiServiceDescriptions: make([]certResource, len(apiServiceDescriptions)),
		webhookDescriptions:    make([]certResource, len(webhookDescriptions)),
	}

	for idx, desc := range apiServiceDescriptions {
		installer.apiServiceDescriptions[idx] = &apiServiceDescriptionsWithCAPEM{desc, []byte{}}
	}
	for idx, desc := range webhookDescriptions {
		installer.webhookDescriptions[idx] = &webhookDescriptionWithCAPEM{desc, []byte{}}
	}

	return installer
}
// installDeployments processes the given deployment specs in order. For each
// spec it builds the deployment object, creates or updates it through the
// strategy client, and then creates or updates the installer's cert
// resources. Processing stops at the first error, which is returned as is.
func (i *StrategyDeploymentInstaller) installDeployments(deps []v1alpha1.StrategyDeploymentSpec) error {
	for idx := range deps {
		built, _, buildErr := i.deploymentForSpec(deps[idx].Name, deps[idx].Spec)
		if buildErr != nil {
			return buildErr
		}

		_, applyErr := i.strategyClient.CreateOrUpdateDeployment(built)
		if applyErr != nil {
			return applyErr
		}

		certErr := i.createOrUpdateCertResourcesForDeployment(built.GetName())
		if certErr != nil {
			return certErr
		}
	}
	return nil
}
// createOrUpdateCertResourcesForDeployment walks every cert resource returned
// by getCertResources and creates or updates the matching cluster object:
// an APIService (followed by removal of legacy APIService resources) or a
// webhook. Any other resource type is rejected with an error.
//
// The deploymentName argument is not read by this function body.
func (i *StrategyDeploymentInstaller) createOrUpdateCertResourcesForDeployment(deploymentName string) error {
	for _, resource := range i.getCertResources() {
		switch typed := resource.(type) {
		case *apiServiceDescriptionsWithCAPEM:
			if err := i.createOrUpdateAPIService(typed.caPEM, typed.apiServiceDescription); err != nil {
				return err
			}
			// Cleanup legacy APIService resources
			if err := i.deleteLegacyAPIServiceResources(*typed); err != nil {
				return err
			}
		case *webhookDescriptionWithCAPEM:
			if err := i.createOrUpdateWebhook(typed.caPEM, typed.webhookDescription); err != nil {
				return err
			}
		default:
			return fmt.Errorf("Unsupported CA Resource")
		}
	}
	return nil
}
// deploymentForSpec builds the Deployment object for the named spec in the
// owner's namespace and returns it together with the hash of its final spec.
//
// The steps run in a fixed order: the installer's template annotations are
// merged with the pod template's own (the template's values win on key
// clashes), owner reference and owner labels are added, the initializer
// chain is applied, the revision history limit is forced to 1, and finally
// the spec is hashed and the hash stored under DeploymentSpecHashLabelKey.
// If an initializer fails, a nil deployment, an empty hash and the error are
// returned.
func (i *StrategyDeploymentInstaller) deploymentForSpec(name string, spec appsv1.DeploymentSpec) (deployment *appsv1.Deployment, hash string, err error) {
	built := &appsv1.Deployment{Spec: spec}
	built.SetName(name)
	built.SetNamespace(i.owner.GetNamespace())

	// Merge annotations (to avoid losing info from pod template)
	merged := make(map[string]string)
	for key, val := range i.templateAnnotations {
		merged[key] = val
	}
	for key, val := range built.Spec.Template.GetAnnotations() {
		merged[key] = val
	}
	built.Spec.Template.SetAnnotations(merged)

	ownerutil.AddNonBlockingOwner(built, i.owner)
	ownerutil.AddOwnerLabelsForKind(built, i.owner, v1alpha1.ClusterServiceVersionKind)

	if err = i.initializers.Apply(built); err != nil {
		return nil, "", err
	}

	// OLM does not support Rollbacks.
	// By default, each deployment created by OLM could spawn up to 10 replicaSets.
	// By setting the deployments revisionHistoryLimit to 1, OLM will only create up
	// to 2 ReplicaSets per deployment it manages, saving memory.
	historyLimit := int32(1)
	built.Spec.RevisionHistoryLimit = &historyLimit

	hash = HashDeploymentSpec(built.Spec)
	// NOTE(review): this direct map write assumes the label map is non-nil by
	// now (presumably AddOwnerLabelsForKind populates it) — confirm.
	built.Labels[DeploymentSpecHashLabelKey] = hash

	return built, hash, nil
}
// cleanupPrevious deletes every deployment that is named in the previous
// strategy but not in the current one.
//
// Deletion is best-effort: every stale deployment is attempted even if an
// earlier delete failed. The first failure encountered is returned (map
// iteration order is unspecified, so "first" is arbitrary); any further
// failures are logged as warnings. Previously each iteration overwrote the
// error, so a failed delete followed by a successful one returned nil.
func (i *StrategyDeploymentInstaller) cleanupPrevious(current *v1alpha1.StrategyDetailsDeployment, previous *v1alpha1.StrategyDetailsDeployment) error {
	stale := make(map[string]struct{}, len(previous.DeploymentSpecs))
	for _, d := range previous.DeploymentSpecs {
		stale[d.Name] = struct{}{}
	}
	for _, d := range current.DeploymentSpecs {
		delete(stale, d.Name)
	}

	log.Debugf("preparing to cleanup: %v", stale)

	// delete deployments in old strategy but not new
	var firstErr error
	for name := range stale {
		err := i.strategyClient.DeleteDeployment(name)
		if err == nil {
			continue
		}
		if firstErr == nil {
			firstErr = err
			continue
		}
		// Only one error can be returned; make sure the rest are not lost.
		log.Warnf("error cleaning up deployment %s: %v", name, err)
	}
	return firstErr
}
// Install applies the given deployment strategy. It calls
// installCertRequirements to obtain the strategy to deploy, creates or
// updates each of its deployments, and then removes deployments owned by the
// CSV that are not part of that strategy.
//
// An error is returned if s is not a *v1alpha1.StrategyDetailsDeployment. A
// Forbidden error from the deployment step is converted into a StrategyError
// with reason StrategyErrInsufficientPermissions; other errors are returned
// unchanged.
func (i *StrategyDeploymentInstaller) Install(s Strategy) error {
	strategy, ok := s.(*v1alpha1.StrategyDetailsDeployment)
	if !ok {
		// strategy is a nil pointer here, so it must not be used to build the
		// message; describe the value that was actually passed in instead.
		return fmt.Errorf("attempted to install %T strategy with deployment installer", s)
	}

	// Install owned APIServices and update strategy with serving cert data
	updatedStrategy, err := i.installCertRequirements(strategy)
	if err != nil {
		return err
	}

	if err := i.installDeployments(updatedStrategy.DeploymentSpecs); err != nil {
		if k8serrors.IsForbidden(err) {
			return StrategyError{Reason: StrategyErrInsufficientPermissions, Message: fmt.Sprintf("install strategy failed: %s", err)}
		}
		return err
	}

	// Clean up orphaned deployments
	return i.cleanupOrphanedDeployments(updatedStrategy.DeploymentSpecs)
}
// CheckInstalled reports whether the deployments of the given strategy are
// installed. It returns (true, nil) when every check passes and (false, err)
// otherwise.
//
// Errors can indicate: some component missing (keep installing), unable to
// query (check again later), or unrecoverable (failed in a way we know we
// can't recover from). A strategy that is not a
// *v1alpha1.StrategyDetailsDeployment yields a StrategyError with reason
// StrategyErrReasonInvalidStrategy.
func (i *StrategyDeploymentInstaller) CheckInstalled(s Strategy) (installed bool, err error) {
	strategy, ok := s.(*v1alpha1.StrategyDetailsDeployment)
	if !ok {
		// strategy is a nil pointer here, so it must not be used to build the
		// message; describe the value that was actually passed in instead.
		return false, StrategyError{Reason: StrategyErrReasonInvalidStrategy, Message: fmt.Sprintf("attempted to check %T strategy with deployment installer", s)}
	}

	// Check deployments
	if err := i.checkForDeployments(strategy.DeploymentSpecs); err != nil {
		return false, err
	}
	return true, nil
}
// checkForDeployments verifies that every deployment in deploymentSpecs
// exists among the deployments selected by the owning CSV's owner labels and
// is in the expected state. It returns nil when all specs pass, or a
// StrategyError describing the first failed check.
//
// For each spec, in order, it checks that:
//   - a deployment with the spec's name exists,
//   - DeploymentStatus reports it ready without error,
//   - every installer template annotation is present on the pod template
//     with the expected value,
//   - the spec-hash label exists and equals the hash freshly computed from
//     the spec.
//
// The owner must be a *v1alpha1.ClusterServiceVersion.
func (i *StrategyDeploymentInstaller) checkForDeployments(deploymentSpecs []v1alpha1.StrategyDeploymentSpec) error {
	// Check the owner is a CSV
	csv, ok := i.owner.(*v1alpha1.ClusterServiceVersion)
	if !ok {
		return StrategyError{Reason: StrategyErrReasonComponentMissing, Message: fmt.Sprintf("owner %s is not a CSV", i.owner.GetName())}
	}

	existingDeployments, err := i.strategyClient.FindAnyDeploymentsMatchingLabels(ownerutil.CSVOwnerSelector(csv))
	if err != nil {
		return StrategyError{Reason: StrategyErrReasonComponentMissing, Message: fmt.Sprintf("error querying existing deployments for CSV %s: %s", csv.GetName(), err)}
	}

	// compare deployments to see if any need to be created/updated
	existingMap := map[string]*appsv1.Deployment{}
	for _, d := range existingDeployments {
		existingMap[d.GetName()] = d
	}

	for _, spec := range deploymentSpecs {
		dep, exists := existingMap[spec.Name]
		if !exists {
			log.Debugf("missing deployment with name=%s", spec.Name)
			return StrategyError{Reason: StrategyErrReasonComponentMissing, Message: fmt.Sprintf("missing deployment with name=%s", spec.Name)}
		}

		reason, ready, err := DeploymentStatus(dep)
		if err != nil {
			log.Debugf("deployment %s not ready before timeout: %s", dep.Name, err.Error())
			return StrategyError{Reason: StrategyErrReasonTimeout, Message: fmt.Sprintf("deployment %s not ready before timeout: %s", dep.Name, err.Error())}
		}
		if !ready {
			return StrategyError{Reason: StrategyErrReasonWaiting, Message: fmt.Sprintf("waiting for deployment %s to become ready: %s", dep.Name, reason)}
		}

		// check annotations
		if len(i.templateAnnotations) > 0 && dep.Spec.Template.Annotations == nil {
			return StrategyError{Reason: StrategyErrReasonAnnotationsMissing, Message: "no annotations found on deployment"}
		}
		for key, value := range i.templateAnnotations {
			actualValue, found := dep.Spec.Template.Annotations[key]
			if !found {
				return StrategyError{Reason: StrategyErrReasonAnnotationsMissing, Message: fmt.Sprintf("annotations on deployment does not contain expected key: %s", key)}
			}
			if actualValue != value {
				return StrategyError{Reason: StrategyErrReasonAnnotationsMissing, Message: fmt.Sprintf("unexpected annotation on deployment. Expected %s:%s, found %s:%s", key, value, key, actualValue)}
			}
		}

		// check that the deployment spec hasn't changed since it was created.
		// A lookup on a nil or empty label map simply reports "not found", so
		// one check covers both the no-labels and the missing-key case.
		existingDeploymentSpecHash, hasHash := dep.GetLabels()[DeploymentSpecHashLabelKey]
		if !hasHash {
			return StrategyError{Reason: StrategyErrDeploymentUpdated, Message: "deployment doesn't have a spec hash, update it"}
		}

		_, calculatedDeploymentHash, err := i.deploymentForSpec(spec.Name, spec.Spec)
		if err != nil {
			return StrategyError{Reason: StrategyErrDeploymentUpdated, Message: fmt.Sprintf("couldn't calculate deployment spec hash: %v", err)}
		}

		if existingDeploymentSpecHash != calculatedDeploymentHash {
			return StrategyError{Reason: StrategyErrDeploymentUpdated, Message: fmt.Sprintf("deployment changed old hash=%s, new hash=%s", existingDeploymentSpecHash, calculatedDeploymentHash)}
		}
	}
	return nil
}
// cleanupOrphanedDeployments deletes deployments that are selected by the
// owning CSV's owner labels and owned by the installer's owner, but whose
// names are absent from deploymentSpecs. It is used after the (re)install of
// deployments.
//
// It returns an error if the owner is not a CSV, if the existing deployments
// cannot be listed, or on the first deployment that fails to delete.
func (i *StrategyDeploymentInstaller) cleanupOrphanedDeployments(deploymentSpecs []v1alpha1.StrategyDeploymentSpec) error {
	// Set of deployment names the strategy still wants
	wanted := make(map[string]struct{}, len(deploymentSpecs))
	for idx := range deploymentSpecs {
		wanted[deploymentSpecs[idx].Name] = struct{}{}
	}

	// Check the owner is a CSV
	csv, isCSV := i.owner.(*v1alpha1.ClusterServiceVersion)
	if !isCSV {
		return fmt.Errorf("owner %s is not a CSV", i.owner.GetName())
	}

	// Get existing deployments in CSV's namespace and owned by CSV
	candidates, err := i.strategyClient.FindAnyDeploymentsMatchingLabels(ownerutil.CSVOwnerSelector(csv))
	if err != nil {
		return err
	}

	// compare existing deployments to deployments in CSV's spec to see if any need to be deleted
	for _, candidate := range candidates {
		if _, keep := wanted[candidate.GetName()]; keep {
			continue
		}
		if !ownerutil.IsOwnedBy(candidate, i.owner) {
			continue
		}

		log.Infof("found an orphaned deployment %s in namespace %s", candidate.GetName(), i.owner.GetNamespace())
		if delErr := i.strategyClient.DeleteDeployment(candidate.GetName()); delErr != nil {
			log.Warnf("error cleaning up deployment %s", candidate.GetName())
			return delErr
		}
	}
	return nil
}
// HashDeploymentSpec calculates a hash of the given deployment spec.
//
// The spec is received by value and fed to DeepHashObject with a 32-bit
// FNV-1a hasher; the decimal form of the resulting sum is passed through
// rand.SafeEncodeString and returned. The spec is hashed as given: nothing
// in this function removes or alters annotations before hashing.
func HashDeploymentSpec(spec appsv1.DeploymentSpec) string {
	specHasher := fnv.New32a()
	hashutil.DeepHashObject(specHasher, &spec)

	sum := specHasher.Sum32()
	return rand.SafeEncodeString(fmt.Sprint(sum))
}