Skip to content

Commit 31afe80

Browse files
authored
Cluster transfer OCM controller (#571)
* Implement the new cluster transfer controller * Update the logic to get only accepted cluster transfers * Minor fixes after code review
1 parent 4870124 commit 31afe80

20 files changed

+734
-18
lines changed

config/pod.yaml

+2
Original file line numberDiff line numberDiff line change
@@ -15,5 +15,7 @@ pull_report:
1515
ocm:
1616
scaEndpoint: https://api.openshift.com/api/accounts_mgmt/v1/certificates
1717
scaInterval: "8h"
18+
clusterTransferEndpoint: https://api.openshift.com/api/accounts_mgmt/v1/cluster_transfers/
19+
clusterTransferInterval: "24h"
1820
gather:
1921
- ALL

go.mod

+1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ go 1.16
44

55
require (
66
github.com/blang/semver/v4 v4.0.0
7+
github.com/evanphx/json-patch v4.12.0+incompatible
78
github.com/go-logr/logr v1.2.2 // indirect
89
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da
910
github.com/google/go-cmp v0.5.6 // indirect

manifests/03-clusterrole.yaml

+1
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,7 @@ rules:
310310
- support
311311
verbs:
312312
- get
313+
- update
313314
---
314315
apiVersion: rbac.authorization.k8s.io/v1
315316
kind: RoleBinding

pkg/cmd/start/start.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,10 @@ func NewOperator() *cobra.Command {
3434
ReportMinRetryTime: 10 * time.Second,
3535
ReportPullingTimeout: 30 * time.Minute,
3636
OCMConfig: config.OCMConfig{
37-
SCAInterval: 8 * time.Hour,
38-
SCAEndpoint: "https://api.openshift.com/api/accounts_mgmt/v1/certificates",
37+
SCAInterval: 8 * time.Hour,
38+
SCAEndpoint: "https://api.openshift.com/api/accounts_mgmt/v1/certificates",
39+
ClusterTransferEndpoint: "https://api.openshift.com/api/accounts_mgmt/v1/cluster_transfers",
40+
ClusterTransferInterval: 24 * time.Hour,
3941
},
4042
},
4143
}

pkg/config/config.go

+28-8
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,11 @@ type Serialized struct {
2626
Gather []string `json:"gather"`
2727
EnableGlobalObfuscation bool `json:"enableGlobalObfuscation"`
2828
OCM struct {
29-
SCAEndpoint string `json:"scaEndpoint"`
30-
SCAInterval string `json:"scaInterval"`
31-
SCADisabled bool `json:"scaDisabled"`
29+
SCAEndpoint string `json:"scaEndpoint"`
30+
SCAInterval string `json:"scaInterval"`
31+
SCADisabled bool `json:"scaDisabled"`
32+
ClusterTransferEndpoint string `json:"clusterTransferEndpoint"`
33+
ClusterTransferInterval string `json:"clusterTransferInterval"`
3234
}
3335
}
3436

@@ -75,9 +77,11 @@ type HTTPConfig struct {
7577

7678
// OCMConfig configures the interval and endpoint for retrieving the data from OCM API
7779
type OCMConfig struct {
78-
SCAInterval time.Duration
79-
SCAEndpoint string
80-
SCADisabled bool
80+
SCAInterval time.Duration
81+
SCAEndpoint string
82+
SCADisabled bool
83+
ClusterTransferEndpoint string
84+
ClusterTransferInterval time.Duration
8185
}
8286

8387
type Converter func(s *Serialized, cfg *Controller) (*Controller, error)
@@ -157,6 +161,13 @@ func (c *Controller) mergeOCM(cfg *Controller) {
157161
c.OCMConfig.SCAInterval = cfg.OCMConfig.SCAInterval
158162
}
159163
c.OCMConfig.SCADisabled = cfg.OCMConfig.SCADisabled
164+
165+
if len(cfg.OCMConfig.ClusterTransferEndpoint) > 0 {
166+
c.OCMConfig.ClusterTransferEndpoint = cfg.OCMConfig.ClusterTransferEndpoint
167+
}
168+
if cfg.OCMConfig.ClusterTransferInterval > 0 {
169+
c.OCMConfig.ClusterTransferInterval = cfg.OCMConfig.ClusterTransferInterval
170+
}
160171
}
161172

162173
func (c *Controller) mergeHTTP(cfg *Controller) {
@@ -171,11 +182,10 @@ func (c *Controller) mergeInterval(cfg *Controller) {
171182

172183
// ToController creates/updates a config Controller according to the Serialized config.
173184
// Makes sure that the config is correct.
174-
func ToController(s *Serialized, cfg *Controller) (*Controller, error) { // nolint: gocyclo
185+
func ToController(s *Serialized, cfg *Controller) (*Controller, error) { // nolint: gocyclo, funlen
175186
if cfg == nil {
176187
cfg = &Controller{}
177188
}
178-
179189
cfg.Report = s.Report
180190
cfg.StoragePath = s.StoragePath
181191
cfg.Endpoint = s.Endpoint
@@ -252,6 +262,16 @@ func ToController(s *Serialized, cfg *Controller) (*Controller, error) { // noli
252262
}
253263
cfg.OCMConfig.SCAInterval = i
254264
}
265+
// Override the cluster transfer endpoint only when the serialized config
// actually provides one. The guard must test ClusterTransferEndpoint itself
// (not SCAEndpoint), otherwise a config that sets only the SCA endpoint
// would overwrite this value with an empty string.
if len(s.OCM.ClusterTransferEndpoint) > 0 {
	cfg.OCMConfig.ClusterTransferEndpoint = s.OCM.ClusterTransferEndpoint
}
268+
if len(s.OCM.ClusterTransferInterval) > 0 {
269+
i, err := time.ParseDuration(s.OCM.ClusterTransferInterval)
270+
if err != nil {
271+
return nil, fmt.Errorf("OCM Cluster transfer interval must be a valid duration: %v", err)
272+
}
273+
cfg.OCMConfig.ClusterTransferInterval = i
274+
}
255275
return cfg, nil
256276
}
257277

pkg/config/configobserver/config.go

+14
Original file line numberDiff line numberDiff line change
@@ -146,4 +146,18 @@ func (c *Config) loadOCM(data map[string][]byte) {
146146
if scaDisabled, ok := data["scaPullDisabled"]; ok {
147147
c.OCMConfig.SCADisabled = strings.EqualFold(string(scaDisabled), "true")
148148
}
149+
150+
if clusterTransferEndpoint, ok := data["clusterTransferEndpoint"]; ok {
151+
c.OCMConfig.ClusterTransferEndpoint = string(clusterTransferEndpoint)
152+
}
153+
if clusterTransferInterval, ok := data["clusterTransferInterval"]; ok {
154+
if newInterval, err := time.ParseDuration(string(clusterTransferInterval)); err == nil {
155+
c.OCMConfig.ClusterTransferInterval = newInterval
156+
} else {
157+
klog.Warningf(
158+
"secret contains an invalid value (%s) for clusterTransferInterval. Using previous value",
159+
clusterTransferInterval,
160+
)
161+
}
162+
}
149163
}

pkg/controller/operator.go

+6
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"github.com/openshift/insights-operator/pkg/insights/insightsclient"
2828
"github.com/openshift/insights-operator/pkg/insights/insightsreport"
2929
"github.com/openshift/insights-operator/pkg/insights/insightsuploader"
30+
"github.com/openshift/insights-operator/pkg/ocm/clustertransfer"
3031
"github.com/openshift/insights-operator/pkg/ocm/sca"
3132
"github.com/openshift/insights-operator/pkg/recorder"
3233
"github.com/openshift/insights-operator/pkg/recorder/diskrecorder"
@@ -149,6 +150,11 @@ func (s *Operator) Run(ctx context.Context, controller *controllercmd.Controller
149150
statusReporter.AddSources(scaController)
150151
go scaController.Run()
151152
}
153+
154+
clusterTransferController := clustertransfer.New(ctx, kubeClient.CoreV1(), configObserver, insightsClient)
155+
statusReporter.AddSources(clusterTransferController)
156+
go clusterTransferController.Run()
157+
152158
klog.Warning("started")
153159

154160
<-ctx.Done()

pkg/controller/status/conditions.go

+3
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@ const (
1414
InsightsDownloadDegraded configv1.ClusterStatusConditionType = "InsightsDownloadDegraded"
1515
// SCANotAvailable is a condition type providing info about unsuccessful SCA pull attempt from the OCM API
1616
SCANotAvailable configv1.ClusterStatusConditionType = "SCANotAvailable"
17+
// ClusterTransferFailed is a condition type providing info about unsuccessful pull attempt of the ClusterTransfer from the OCM API
18+
// or unsuccessful pull-secret update
19+
ClusterTransferFailed configv1.ClusterStatusConditionType = "ClusterTransferFailed"
1720
)
1821

1922
type conditionsMap map[configv1.ClusterStatusConditionType]configv1.ClusterOperatorStatusCondition

pkg/controller/status/controller.go

+16-1
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ func (c *Controller) merge(clusterOperator *configv1.ClusterOperator) *configv1.
166166
}
167167

168168
// calculate the current controller status based on its given sources
169-
func (c *Controller) currentControllerStatus() (allReady bool, lastTransition time.Time) {
169+
func (c *Controller) currentControllerStatus() (allReady bool, lastTransition time.Time) { //nolint: gocyclo
170170
var errorReason string
171171
var errs []string
172172

@@ -210,6 +210,14 @@ func (c *Controller) currentControllerStatus() (allReady bool, lastTransition ti
210210
degradingFailure = true
211211
}
212212
c.ctrlStatus.setStatus(SCAPullStatus, summary.Reason, summary.Message)
213+
} else if summary.Operation.Name == controllerstatus.PullingClusterTransfer.Name {
214+
// mark as degraded only in case of HTTP 500 and higher
215+
if summary.Operation.HTTPStatusCode >= 500 {
216+
klog.V(4).Infof("Failed to pull the cluster transfer object within the threshold %d with exponential backoff. Marking as degraded.",
217+
OCMAPIFailureCountThreshold)
218+
degradingFailure = true
219+
}
220+
c.ctrlStatus.setStatus(ClusterTransferStatus, summary.Reason, summary.Message)
213221
}
214222

215223
if degradingFailure {
@@ -356,6 +364,13 @@ func updateControllerConditions(cs *conditions, ctrlStatus *controllerStatus,
356364
} else {
357365
cs.removeCondition(SCANotAvailable)
358366
}
367+
368+
// handler when ClusterTransfer pull from the OCM fails
369+
if ss := ctrlStatus.getStatus(ClusterTransferStatus); ss != nil {
370+
cs.setCondition(ClusterTransferFailed, configv1.ConditionTrue, ss.reason, ss.message, metav1.Time{Time: lastTransition})
371+
} else {
372+
cs.removeCondition(ClusterTransferFailed)
373+
}
359374
}
360375

361376
// update the current controller state by it status

pkg/controller/status/status.go

+6-5
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
package status
22

33
const (
4-
DisabledStatus = "disabled"
5-
UploadStatus = "upload"
6-
DownloadStatus = "download"
7-
ErrorStatus = "error"
8-
SCAPullStatus = "scaPullStatus"
4+
DisabledStatus = "disabled"
5+
UploadStatus = "upload"
6+
DownloadStatus = "download"
7+
ErrorStatus = "error"
8+
SCAPullStatus = "scaPullStatus"
9+
ClusterTransferStatus = "clusterTransferStatus"
910
)
1011

1112
type controllerStatus struct {

pkg/controllerstatus/controllerstatus.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,12 @@ type Interface interface {
1212
}
1313

1414
type Operation struct {
15-
Name string
15+
Name OperationName
1616
HTTPStatusCode int
1717
}
1818

19+
type OperationName string
20+
1921
var (
2022
// DownloadingReport specific flag for Smart Proxy report downloading process.
2123
DownloadingReport = Operation{Name: "DownloadingReport"}
@@ -25,6 +27,8 @@ var (
2527
GatheringReport = Operation{Name: "GatheringReport"}
2628
// PullingSCACerts is specific operation for pulling the SCA certs data from the OCM API
2729
PullingSCACerts = Operation{Name: "PullingSCACerts"}
30+
// PullingClusterTransfer is a specific operation for pulling the ClusterTransfer object from the OCM API endpoint
31+
PullingClusterTransfer = Operation{Name: "PullingClusterTransfer"}
2832
)
2933

3034
// Summary represents the status summary of an Operation

pkg/insights/insightsclient/requests.go

+46-1
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,12 @@ import (
55
"context"
66
"fmt"
77
"io"
8-
"k8s.io/klog/v2"
98
"mime/multipart"
109
"net/http"
1110
"strconv"
1211

12+
"k8s.io/klog/v2"
13+
1314
"github.com/openshift/insights-operator/pkg/authorizer"
1415
)
1516

@@ -245,3 +246,47 @@ func (c *Client) RecvGatheringRules(ctx context.Context, endpoint string) ([]byt
245246

246247
return io.ReadAll(resp.Body)
247248
}
249+
250+
// RecvClusterTransfer performs a request to the OCM cluster transfer API.
251+
// It is a HTTP GET request with the `search` query parameter limiting the result
252+
// only for the one cluster and only for the `accepted` cluster transfers.
253+
func (c *Client) RecvClusterTransfer(endpoint string) ([]byte, error) {
254+
cv, err := c.getClusterVersion()
255+
if err != nil {
256+
return nil, err
257+
}
258+
if cv == nil {
259+
return nil, ErrWaitingForVersion
260+
}
261+
token, err := c.authorizer.Token()
262+
if err != nil {
263+
return nil, err
264+
}
265+
req, err := http.NewRequest(http.MethodGet, endpoint, http.NoBody)
266+
if err != nil {
267+
return nil, err
268+
}
269+
q := req.URL.Query()
270+
searchQuery := fmt.Sprintf("cluster_uuid is '%s' and status is 'accepted'", cv.Spec.ClusterID)
271+
q.Add("search", searchQuery)
272+
req.URL.RawQuery = q.Encode()
273+
req.Header.Set("Content-Type", "application/json")
274+
c.client.Transport = clientTransport(c.authorizer)
275+
authHeader := fmt.Sprintf("AccessToken %s:%s", cv.Spec.ClusterID, token)
276+
req.Header.Set("Authorization", authHeader)
277+
278+
resp, err := c.client.Do(req)
279+
if err != nil {
280+
return nil, fmt.Errorf("unable to retrieve cluster transfer data from %s: %v", endpoint, err)
281+
}
282+
283+
if resp.StatusCode > 399 || resp.StatusCode < 200 {
284+
return nil, ocmErrorMessage(resp)
285+
}
286+
defer func() {
287+
if err := resp.Body.Close(); err != nil {
288+
klog.Warningf("Failed to close response body: %v", err)
289+
}
290+
}()
291+
return io.ReadAll(resp.Body)
292+
}

0 commit comments

Comments
 (0)