Skip to content

Commit 015536d

Browse files
author
Serhii Zakharov
committed
refactoring and minor changes needed by conditional gathering
1 parent 1b45229 commit 015536d

17 files changed

+176
-113
lines changed

Diff for: pkg/gather/gather.go

+9-2
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/openshift/insights-operator/pkg/config/configobserver"
1717
"github.com/openshift/insights-operator/pkg/gatherers"
1818
"github.com/openshift/insights-operator/pkg/gatherers/clusterconfig"
19+
"github.com/openshift/insights-operator/pkg/gatherers/conditional"
1920
"github.com/openshift/insights-operator/pkg/gatherers/workloads"
2021
"github.com/openshift/insights-operator/pkg/record"
2122
"github.com/openshift/insights-operator/pkg/recorder"
@@ -58,8 +59,9 @@ func CreateAllGatherers(
5859
gatherKubeConfig, gatherProtoKubeConfig, metricsGatherKubeConfig, anonymizer,
5960
)
6061
workloadsGatherer := workloads.New(gatherProtoKubeConfig)
62+
conditionalGatherer := conditional.New(gatherProtoKubeConfig, metricsGatherKubeConfig)
6163

62-
return []gatherers.Interface{clusterConfigGatherer, workloadsGatherer}
64+
return []gatherers.Interface{clusterConfigGatherer, workloadsGatherer, conditionalGatherer}
6365
}
6466

6567
// CollectAndRecord gathers enabled functions of the provided gatherer and records the results to the recorder
@@ -173,7 +175,12 @@ func startGatheringConcurrently(
173175

174176
var tasks []Task
175177

176-
for functionName, gatheringClosure := range gatherer.GetGatheringFunctions() {
178+
gatheringFunctions, err := gatherer.GetGatheringFunctions(ctx)
179+
if err != nil {
180+
return nil, err
181+
}
182+
183+
for functionName, gatheringClosure := range gatheringFunctions {
177184
if !gatherAllFunctions && !utils.StringInSlice(functionName, gatherFunctionsList) {
178185
continue
179186
}

Diff for: pkg/gather/mock_gatherers.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ type MockGatherer struct {
1717

1818
func (*MockGatherer) GetName() string { return "mock_gatherer" }
1919

20-
func (g *MockGatherer) GetGatheringFunctions() map[string]gatherers.GatheringClosure {
20+
func (g *MockGatherer) GetGatheringFunctions(context.Context) (map[string]gatherers.GatheringClosure, error) {
2121
return map[string]gatherers.GatheringClosure{
2222
"name": {
2323
Run: func(ctx context.Context) ([]record.Record, []error) {
@@ -49,7 +49,7 @@ func (g *MockGatherer) GetGatheringFunctions() map[string]gatherers.GatheringClo
4949
},
5050
CanFail: g.CanFail,
5151
},
52-
}
52+
}, nil
5353
}
5454

5555
func (g *MockGatherer) GatherName(context.Context) ([]record.Record, []error) {
@@ -107,15 +107,15 @@ type MockCustomPeriodGatherer struct {
107107

108108
func (*MockCustomPeriodGatherer) GetName() string { return "mock_custom_period_gatherer" }
109109

110-
func (g *MockCustomPeriodGatherer) GetGatheringFunctions() map[string]gatherers.GatheringClosure {
110+
func (g *MockCustomPeriodGatherer) GetGatheringFunctions(context.Context) (map[string]gatherers.GatheringClosure, error) {
111111
return map[string]gatherers.GatheringClosure{
112112
"period": {
113113
Run: func(ctx context.Context) ([]record.Record, []error) {
114114
return g.GatherPeriod(ctx)
115115
},
116116
CanFail: false,
117117
},
118-
}
118+
}, nil
119119
}
120120

121121
func (g *MockCustomPeriodGatherer) ShouldBeProcessedNow() bool {
@@ -149,15 +149,15 @@ func (*MockCustomPeriodGathererNoPeriod) GetName() string {
149149
return "mock_custom_period_gatherer_no_period"
150150
}
151151

152-
func (g *MockCustomPeriodGathererNoPeriod) GetGatheringFunctions() map[string]gatherers.GatheringClosure {
152+
func (g *MockCustomPeriodGathererNoPeriod) GetGatheringFunctions(context.Context) (map[string]gatherers.GatheringClosure, error) {
153153
return map[string]gatherers.GatheringClosure{
154154
"should_be_processed": {
155155
Run: func(ctx context.Context) ([]record.Record, []error) {
156156
return g.GatherShouldBeProcessed(ctx)
157157
},
158158
CanFail: false,
159159
},
160-
}
160+
}, nil
161161
}
162162

163163
func (g *MockCustomPeriodGathererNoPeriod) ShouldBeProcessedNow() bool {

Diff for: pkg/gatherers/clusterconfig/clusterconfig_gatherer.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ func failableFunc(function gathererFuncPtr) gatheringFunction {
4444
}
4545

4646
var gatheringFunctions = map[string]gatheringFunction{
47-
"pdbs": importantFunc((*Gatherer).GatherPodDisruptionBudgets),
47+
"pdbs": failableFunc((*Gatherer).GatherPodDisruptionBudgets),
4848
"metrics": failableFunc((*Gatherer).GatherMostRecentMetrics),
4949
"operators": importantFunc((*Gatherer).GatherClusterOperators),
5050
"operators_pods_and_events": importantFunc((*Gatherer).GatherClusterOperatorPodsAndEvents),
@@ -99,7 +99,7 @@ func (g *Gatherer) GetName() string {
9999
return "clusterconfig"
100100
}
101101

102-
func (g *Gatherer) GetGatheringFunctions() map[string]gatherers.GatheringClosure {
102+
func (g *Gatherer) GetGatheringFunctions(context.Context) (map[string]gatherers.GatheringClosure, error) {
103103
result := make(map[string]gatherers.GatheringClosure)
104104

105105
for funcName, function := range gatheringFunctions {
@@ -113,5 +113,5 @@ func (g *Gatherer) GetGatheringFunctions() map[string]gatherers.GatheringClosure
113113
}
114114
}
115115

116-
return result
116+
return result, nil
117117
}

Diff for: pkg/gatherers/clusterconfig/clusterconfig_gatherer_test.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package clusterconfig_test
22

33
import (
4+
"context"
45
"testing"
56

67
"github.com/stretchr/testify/assert"
@@ -12,7 +13,8 @@ import (
1213
func Test_Gatherer_Basic(t *testing.T) {
1314
gatherer := clusterconfig.New(nil, nil, nil, nil)
1415
assert.Equal(t, "clusterconfig", gatherer.GetName())
15-
gatheringFunctions := gatherer.GetGatheringFunctions()
16+
gatheringFunctions, err := gatherer.GetGatheringFunctions(context.TODO())
17+
assert.NoError(t, err)
1618
assert.Greater(t, len(gatheringFunctions), 0)
1719

1820
assert.Implements(t, (*gatherers.Interface)(nil), gatherer)

Diff for: pkg/gatherers/clusterconfig/openshift_apiserver_operator_logs.go

+12-10
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55

66
"k8s.io/client-go/kubernetes"
77

8+
"github.com/openshift/insights-operator/pkg/gatherers/common"
89
"github.com/openshift/insights-operator/pkg/record"
910
)
1011

@@ -19,18 +20,19 @@ import (
1920
//
2021
// * Location in archive: config/pod/{namespace-name}/logs/{pod-name}/errors.log
2122
func (g *Gatherer) GatherOpenShiftAPIServerOperatorLogs(ctx context.Context) ([]record.Record, []error) {
22-
containersFilter := logContainersFilter{
23-
namespace: "openshift-apiserver-operator",
24-
labelSelector: "app=openshift-apiserver-operator",
23+
24+
containersFilter := common.LogContainersFilter{
25+
Namespace: "openshift-apiserver-operator",
26+
LabelSelector: "app=openshift-apiserver-operator",
2527
}
26-
messagesFilter := logMessagesFilter{
27-
messagesToSearch: []string{
28+
messagesFilter := common.LogMessagesFilter{
29+
MessagesToSearch: []string{
2830
"the server has received too many requests and has asked us",
2931
"because serving request timed out and response had been started",
3032
},
31-
isRegexSearch: false,
32-
sinceSeconds: 86400, // last day
33-
limitBytes: 1024 * 64, // maximum 64 kb of logs
33+
IsRegexSearch: false,
34+
SinceSeconds: 86400, // last day
35+
LimitBytes: 1024 * 64, // maximum 64 kb of logs
3436
}
3537

3638
gatherKubeClient, err := kubernetes.NewForConfig(g.gatherProtoKubeConfig)
@@ -40,12 +42,12 @@ func (g *Gatherer) GatherOpenShiftAPIServerOperatorLogs(ctx context.Context) ([]
4042

4143
coreClient := gatherKubeClient.CoreV1()
4244

43-
records, err := gatherLogsFromContainers(
45+
records, err := common.GatherLogsFromContainers(
4446
ctx,
4547
coreClient,
4648
containersFilter,
4749
messagesFilter,
48-
"errors",
50+
nil,
4951
)
5052
if err != nil {
5153
return nil, []error{err}

Diff for: pkg/gatherers/clusterconfig/openshift_authentication_logs.go

+11-10
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55

66
"k8s.io/client-go/kubernetes"
77

8+
"github.com/openshift/insights-operator/pkg/gatherers/common"
89
"github.com/openshift/insights-operator/pkg/record"
910
)
1011

@@ -20,17 +21,17 @@ import (
2021
// * Since versions:
2122
// * 4.7+
2223
func (g *Gatherer) GatherOpenshiftAuthenticationLogs(ctx context.Context) ([]record.Record, []error) {
23-
containersFilter := logContainersFilter{
24-
namespace: "openshift-authentication",
25-
labelSelector: "app=oauth-openshift",
24+
containersFilter := common.LogContainersFilter{
25+
Namespace: "openshift-authentication",
26+
LabelSelector: "app=oauth-openshift",
2627
}
27-
messagesFilter := logMessagesFilter{
28-
messagesToSearch: []string{
28+
messagesFilter := common.LogMessagesFilter{
29+
MessagesToSearch: []string{
2930
"AuthenticationError: invalid resource name",
3031
},
31-
isRegexSearch: false,
32-
sinceSeconds: 86400, // last day
33-
limitBytes: 1024 * 64, // maximum 64 kb of logs
32+
IsRegexSearch: false,
33+
SinceSeconds: 86400, // last day
34+
LimitBytes: 1024 * 64, // maximum 64 kb of logs
3435
}
3536

3637
gatherKubeClient, err := kubernetes.NewForConfig(g.gatherProtoKubeConfig)
@@ -40,12 +41,12 @@ func (g *Gatherer) GatherOpenshiftAuthenticationLogs(ctx context.Context) ([]rec
4041

4142
coreClient := gatherKubeClient.CoreV1()
4243

43-
records, err := gatherLogsFromContainers(
44+
records, err := common.GatherLogsFromContainers(
4445
ctx,
4546
coreClient,
4647
containersFilter,
4748
messagesFilter,
48-
"errors",
49+
nil,
4950
)
5051
if err != nil {
5152
return nil, []error{err}

Diff for: pkg/gatherers/clusterconfig/openshift_sdn_controller_logs.go

+11-10
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66

77
"k8s.io/client-go/kubernetes"
88

9+
"github.com/openshift/insights-operator/pkg/gatherers/common"
910
"github.com/openshift/insights-operator/pkg/record"
1011
)
1112

@@ -31,20 +32,20 @@ import (
3132
// * 4.6.21+
3233
// * 4.7+
3334
func (g *Gatherer) GatherOpenshiftSDNControllerLogs(ctx context.Context) ([]record.Record, []error) {
34-
containersFilter := logContainersFilter{
35-
namespace: "openshift-sdn",
36-
labelSelector: "app=sdn-controller",
35+
containersFilter := common.LogContainersFilter{
36+
Namespace: "openshift-sdn",
37+
LabelSelector: "app=sdn-controller",
3738
}
38-
messagesFilter := logMessagesFilter{
39-
messagesToSearch: []string{
39+
messagesFilter := common.LogMessagesFilter{
40+
MessagesToSearch: []string{
4041
"Node.+is not Ready",
4142
"Node.+may be offline\\.\\.\\. retrying",
4243
"Node.+is offline",
4344
"Node.+is back online",
4445
},
45-
isRegexSearch: true,
46-
sinceSeconds: 86400, // last day
47-
limitBytes: 1024 * 64, // maximum 64 kb of logs
46+
IsRegexSearch: true,
47+
SinceSeconds: 86400, // last day
48+
LimitBytes: 1024 * 64, // maximum 64 kb of logs
4849
}
4950

5051
gatherKubeClient, err := kubernetes.NewForConfig(g.gatherProtoKubeConfig)
@@ -54,12 +55,12 @@ func (g *Gatherer) GatherOpenshiftSDNControllerLogs(ctx context.Context) ([]reco
5455

5556
coreClient := gatherKubeClient.CoreV1()
5657

57-
records, err := gatherLogsFromContainers(
58+
records, err := common.GatherLogsFromContainers(
5859
ctx,
5960
coreClient,
6061
containersFilter,
6162
messagesFilter,
62-
"errors",
63+
nil,
6364
)
6465
if err != nil {
6566
return nil, []error{err}

Diff for: pkg/gatherers/clusterconfig/openshift_sdn_logs.go

+11-10
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66

77
"k8s.io/client-go/kubernetes"
88

9+
"github.com/openshift/insights-operator/pkg/gatherers/common"
910
"github.com/openshift/insights-operator/pkg/record"
1011
)
1112

@@ -25,20 +26,20 @@ import (
2526
// * 4.6.19+
2627
// * 4.7+
2728
func (g *Gatherer) GatherOpenshiftSDNLogs(ctx context.Context) ([]record.Record, []error) {
28-
containersFilter := logContainersFilter{
29-
namespace: "openshift-sdn",
30-
labelSelector: "app=sdn",
29+
containersFilter := common.LogContainersFilter{
30+
Namespace: "openshift-sdn",
31+
LabelSelector: "app=sdn",
3132
}
32-
messagesFilter := logMessagesFilter{
33-
messagesToSearch: []string{
33+
messagesFilter := common.LogMessagesFilter{
34+
MessagesToSearch: []string{
3435
"Got OnEndpointsUpdate for unknown Endpoints",
3536
"Got OnEndpointsDelete for unknown Endpoints",
3637
"Unable to update proxy firewall for policy",
3738
"Failed to update proxy firewall for policy",
3839
},
39-
isRegexSearch: false,
40-
sinceSeconds: 86400,
41-
limitBytes: 1024 * 64,
40+
IsRegexSearch: false,
41+
SinceSeconds: 86400,
42+
LimitBytes: 1024 * 64,
4243
}
4344

4445
gatherKubeClient, err := kubernetes.NewForConfig(g.gatherProtoKubeConfig)
@@ -48,12 +49,12 @@ func (g *Gatherer) GatherOpenshiftSDNLogs(ctx context.Context) ([]record.Record,
4849

4950
coreClient := gatherKubeClient.CoreV1()
5051

51-
records, err := gatherLogsFromContainers(
52+
records, err := common.GatherLogsFromContainers(
5253
ctx,
5354
coreClient,
5455
containersFilter,
5556
messagesFilter,
56-
"errors",
57+
nil,
5758
)
5859
if err != nil {
5960
return nil, []error{err}

Diff for: pkg/gatherers/clusterconfig/sap_vsystem_iptables_logs.go

+11-10
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
1212
"k8s.io/klog/v2"
1313

14+
"github.com/openshift/insights-operator/pkg/gatherers/common"
1415
"github.com/openshift/insights-operator/pkg/record"
1516
)
1617

@@ -71,25 +72,25 @@ func gatherSAPLicenseManagementLogs(
7172
var errs []error
7273

7374
for _, item := range datahubs {
74-
containersFilter := logContainersFilter{
75-
namespace: item.GetNamespace(),
76-
containerNameRegexFilter: "^vsystem-iptables$",
75+
containersFilter := common.LogContainersFilter{
76+
Namespace: item.GetNamespace(),
77+
ContainerNameRegexFilter: "^vsystem-iptables$",
7778
}
78-
messagesFilter := logMessagesFilter{
79-
messagesToSearch: []string{
79+
messagesFilter := common.LogMessagesFilter{
80+
MessagesToSearch: []string{
8081
"can't initialize iptables table",
8182
},
82-
isRegexSearch: false,
83-
sinceSeconds: 86400,
84-
limitBytes: 1024 * 64,
83+
IsRegexSearch: false,
84+
SinceSeconds: 86400,
85+
LimitBytes: 1024 * 64,
8586
}
8687

87-
namespaceRecords, err := gatherLogsFromContainers(
88+
namespaceRecords, err := common.GatherLogsFromContainers(
8889
ctx,
8990
coreClient,
9091
containersFilter,
9192
messagesFilter,
92-
"errors",
93+
nil,
9394
)
9495
if err != nil {
9596
errs = append(errs, err)

Diff for: pkg/gatherers/common/constants.go

+6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package common
2+
3+
const (
4+
ImageConfigQPS = 10
5+
ImageConfigBurst = 10
6+
)

0 commit comments

Comments
 (0)