Skip to content

Commit aecc074

Browse files
Merge pull request #16766 from pravisankar/replace-eventqueue
Automatic merge from submit-queue (batch tested with PRs 18075, 17725, 16766, 18070, 18113). Replaced event queue based openshift sdn resource watches with shared informers - Use widely used and relatively stable shared informers instead of custom sdn event queue which has some issues: openshift/origin: Issue #16080 openshift/origin: Issue #13879 - We do network object watching in multiple places for the same resource (HostSubnet/NetNamespace/EgressNetworkPolicy) and each watch consumes a go routine. Shared informer reduces bandwidth/cpu/memory footprint by running only one go routine per resource watch and allows multiple subscribers.
2 parents 09042cd + 99cb946 commit aecc074

21 files changed

+480
-1365
lines changed

pkg/cmd/server/kubernetes/network/network_config.go

+11-11
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"github.com/openshift/origin/pkg/cmd/server/kubernetes/network/transport"
2222
"github.com/openshift/origin/pkg/dns"
2323
"github.com/openshift/origin/pkg/network"
24+
networkinformers "github.com/openshift/origin/pkg/network/generated/informers/internalversion"
2425
networkclient "github.com/openshift/origin/pkg/network/generated/internalclientset"
2526
)
2627

@@ -33,6 +34,8 @@ type NetworkConfig struct {
3334
ExternalKubeClientset kclientsetexternal.Interface
3435
// Internal kubernetes shared informer factory.
3536
InternalKubeInformers kinternalinformers.SharedInformerFactory
37+
// Internal network shared informer factory.
38+
InternalNetworkInformers networkinformers.SharedInformerFactory
3639

3740
// ProxyConfig is the configuration for the kube-proxy, fully initialized
3841
ProxyConfig *kubeproxyconfig.KubeProxyConfiguration
@@ -94,25 +97,22 @@ func New(options configapi.NodeConfig, clusterDomain string, proxyConfig *kubepr
9497

9598
internalKubeInformers := kinternalinformers.NewSharedInformerFactory(internalKubeClient, proxyConfig.ConfigSyncPeriod.Duration)
9699

97-
var sdnNode network.NodeInterface
98-
var sdnProxy network.ProxyInterface
99-
if network.IsOpenShiftNetworkPlugin(options.NetworkConfig.NetworkPluginName) {
100-
sdnNode, sdnProxy, err = NewSDNInterfaces(options, networkClient, kubeClient, internalKubeClient, internalKubeInformers, proxyConfig)
101-
if err != nil {
102-
return nil, fmt.Errorf("SDN initialization failed: %v", err)
103-
}
104-
}
105-
106100
config := &NetworkConfig{
107101
KubeClientset: kubeClient,
108102
ExternalKubeClientset: externalKubeClient,
109103
InternalKubeInformers: internalKubeInformers,
110104

111105
ProxyConfig: proxyConfig,
112106
EnableUnidling: options.EnableUnidling,
107+
}
108+
109+
if network.IsOpenShiftNetworkPlugin(options.NetworkConfig.NetworkPluginName) {
110+
config.InternalNetworkInformers = networkinformers.NewSharedInformerFactory(networkClient, network.DefaultInformerResyncPeriod)
113111

114-
SDNNode: sdnNode,
115-
SDNProxy: sdnProxy,
112+
config.SDNNode, config.SDNProxy, err = NewSDNInterfaces(options, networkClient, kubeClient, internalKubeClient, internalKubeInformers, config.InternalNetworkInformers, proxyConfig)
113+
if err != nil {
114+
return nil, fmt.Errorf("SDN initialization failed: %v", err)
115+
}
116116
}
117117

118118
if enableDNS {

pkg/cmd/server/kubernetes/network/sdn_linux.go

+9-2
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,18 @@ import (
1414

1515
configapi "github.com/openshift/origin/pkg/cmd/server/api"
1616
"github.com/openshift/origin/pkg/network"
17+
networkinformers "github.com/openshift/origin/pkg/network/generated/informers/internalversion"
1718
networkclient "github.com/openshift/origin/pkg/network/generated/internalclientset"
1819
sdnnode "github.com/openshift/origin/pkg/network/node"
1920
sdnproxy "github.com/openshift/origin/pkg/network/proxy"
2021
)
2122

22-
func NewSDNInterfaces(options configapi.NodeConfig, networkClient networkclient.Interface, kubeClientset kclientset.Interface, kubeClient kinternalclientset.Interface, internalKubeInformers kinternalinformers.SharedInformerFactory, proxyconfig *kubeproxyconfig.KubeProxyConfiguration) (network.NodeInterface, network.ProxyInterface, error) {
23+
func NewSDNInterfaces(options configapi.NodeConfig, networkClient networkclient.Interface,
24+
kubeClientset kclientset.Interface, kubeClient kinternalclientset.Interface,
25+
internalKubeInformers kinternalinformers.SharedInformerFactory,
26+
internalNetworkInformers networkinformers.SharedInformerFactory,
27+
proxyconfig *kubeproxyconfig.KubeProxyConfiguration) (network.NodeInterface, network.ProxyInterface, error) {
28+
2329
runtimeEndpoint := options.DockerConfig.DockerShimSocket
2430
runtime, ok := options.KubeletArguments["container-runtime"]
2531
if ok && len(runtime) == 1 && runtime[0] == "remote" {
@@ -47,6 +53,7 @@ func NewSDNInterfaces(options configapi.NodeConfig, networkClient networkclient.
4753
NetworkClient: networkClient,
4854
KClient: kubeClient,
4955
KubeInformers: internalKubeInformers,
56+
NetworkInformers: internalNetworkInformers,
5057
IPTablesSyncPeriod: proxyconfig.IPTables.SyncPeriod.Duration,
5158
ProxyMode: proxyconfig.Mode,
5259
EnableHostports: enableHostports,
@@ -56,7 +63,7 @@ func NewSDNInterfaces(options configapi.NodeConfig, networkClient networkclient.
5663
return nil, nil, err
5764
}
5865

59-
proxy, err := sdnproxy.New(options.NetworkConfig.NetworkPluginName, networkClient, kubeClient)
66+
proxy, err := sdnproxy.New(options.NetworkConfig.NetworkPluginName, networkClient, kubeClient, internalNetworkInformers)
6067
if err != nil {
6168
return nil, nil, err
6269
}

pkg/cmd/server/kubernetes/network/sdn_unsupported.go

+7-1
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,15 @@ import (
1212

1313
configapi "github.com/openshift/origin/pkg/cmd/server/api"
1414
"github.com/openshift/origin/pkg/network"
15+
networkinformers "github.com/openshift/origin/pkg/network/generated/informers/internalversion"
1516
networkclient "github.com/openshift/origin/pkg/network/generated/internalclientset"
1617
)
1718

18-
func NewSDNInterfaces(options configapi.NodeConfig, networkClient networkclient.Interface, kubeClientset kclientset.Interface, kubeClient kinternalclientset.Interface, internalKubeInformers kinternalinformers.SharedInformerFactory, proxyconfig *kubeproxyconfig.KubeProxyConfiguration) (network.NodeInterface, network.ProxyInterface, error) {
19+
func NewSDNInterfaces(options configapi.NodeConfig, networkClient networkclient.Interface,
20+
kubeClientset kclientset.Interface, kubeClient kinternalclientset.Interface,
21+
internalKubeInformers kinternalinformers.SharedInformerFactory,
22+
internalNetworkInformers networkinformers.SharedInformerFactory,
23+
proxyconfig *kubeproxyconfig.KubeProxyConfiguration) (network.NodeInterface, network.ProxyInterface, error) {
24+
1925
return nil, nil, fmt.Errorf("SDN not supported on this platform")
2026
}

pkg/cmd/server/origin/controller/network_sdn.go

+11-1
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,12 @@ package controller
33
import (
44
"fmt"
55

6+
utilwait "k8s.io/apimachinery/pkg/util/wait"
7+
68
configapi "github.com/openshift/origin/pkg/cmd/server/api"
79
"github.com/openshift/origin/pkg/cmd/server/bootstrappolicy"
810
"github.com/openshift/origin/pkg/network"
11+
networkinformers "github.com/openshift/origin/pkg/network/generated/informers/internalversion"
912
sdnmaster "github.com/openshift/origin/pkg/network/master"
1013
)
1114

@@ -18,13 +21,20 @@ func (c *SDNControllerConfig) RunController(ctx ControllerContext) (bool, error)
1821
return false, nil
1922
}
2023

24+
networkClient := ctx.ClientBuilder.OpenshiftInternalNetworkClientOrDie(bootstrappolicy.InfraSDNControllerServiceAccountName)
25+
networkInformers := networkinformers.NewSharedInformerFactory(networkClient, network.DefaultInformerResyncPeriod)
26+
2127
if err := sdnmaster.Start(
2228
c.NetworkConfig,
23-
ctx.ClientBuilder.OpenshiftInternalNetworkClientOrDie(bootstrappolicy.InfraSDNControllerServiceAccountName),
29+
networkClient,
2430
ctx.ClientBuilder.KubeInternalClientOrDie(bootstrappolicy.InfraSDNControllerServiceAccountName),
2531
ctx.InternalKubeInformers,
32+
networkInformers,
2633
); err != nil {
2734
return false, fmt.Errorf("failed to start SDN plugin controller: %v", err)
2835
}
36+
37+
networkInformers.Start(utilwait.NeverStop)
38+
2939
return true, nil
3040
}

pkg/cmd/server/start/start_node.go

+3
Original file line numberDiff line numberDiff line change
@@ -495,6 +495,9 @@ func StartNode(nodeConfig configapi.NodeConfig, components *utilflags.ComponentF
495495
}
496496

497497
networkConfig.InternalKubeInformers.Start(wait.NeverStop)
498+
if networkConfig.InternalNetworkInformers != nil {
499+
networkConfig.InternalNetworkInformers.Start(wait.NeverStop)
500+
}
498501

499502
return nil
500503
}

pkg/network/common/common.go

+5-123
Original file line numberDiff line numberDiff line change
@@ -3,26 +3,17 @@ package common
33
import (
44
"fmt"
55
"net"
6-
"reflect"
7-
"strings"
8-
"time"
96

107
"github.com/golang/glog"
118

9+
"k8s.io/apimachinery/pkg/apis/meta/v1"
10+
kerrors "k8s.io/apimachinery/pkg/util/errors"
11+
kapi "k8s.io/kubernetes/pkg/apis/core"
12+
1213
configapi "github.com/openshift/origin/pkg/cmd/server/api"
1314
networkapi "github.com/openshift/origin/pkg/network/apis/network"
1415
networkclient "github.com/openshift/origin/pkg/network/generated/internalclientset"
1516
"github.com/openshift/origin/pkg/util/netutils"
16-
17-
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
18-
"k8s.io/apimachinery/pkg/fields"
19-
kerrors "k8s.io/apimachinery/pkg/util/errors"
20-
"k8s.io/apimachinery/pkg/util/wait"
21-
"k8s.io/apimachinery/pkg/watch"
22-
kcache "k8s.io/client-go/tools/cache"
23-
kapi "k8s.io/kubernetes/pkg/apis/core"
24-
"k8s.io/kubernetes/pkg/apis/networking"
25-
kinternalinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion"
2617
)
2718

2819
func HostSubnetToString(subnet *networkapi.HostSubnet) string {
@@ -161,119 +152,10 @@ func (ni *NetworkInfo) CheckClusterObjects(subnets []networkapi.HostSubnet, pods
161152
}
162153

163154
func GetNetworkInfo(networkClient networkclient.Interface) (*NetworkInfo, error) {
164-
cn, err := networkClient.Network().ClusterNetworks().Get(networkapi.ClusterNetworkDefault, metav1.GetOptions{})
155+
cn, err := networkClient.Network().ClusterNetworks().Get(networkapi.ClusterNetworkDefault, v1.GetOptions{})
165156
if err != nil {
166157
return nil, err
167158
}
168159

169160
return ParseNetworkInfo(cn.ClusterNetworks, cn.ServiceNetwork)
170161
}
171-
172-
type ResourceName string
173-
174-
const (
175-
Nodes ResourceName = "Nodes"
176-
Namespaces ResourceName = "Namespaces"
177-
NetNamespaces ResourceName = "NetNamespaces"
178-
Services ResourceName = "Services"
179-
HostSubnets ResourceName = "HostSubnets"
180-
Pods ResourceName = "Pods"
181-
EgressNetworkPolicies ResourceName = "EgressNetworkPolicies"
182-
NetworkPolicies ResourceName = "NetworkPolicies"
183-
)
184-
185-
func newEventQueue(client kcache.Getter, resourceName ResourceName, expectedType interface{}, namespace string) *EventQueue {
186-
rn := strings.ToLower(string(resourceName))
187-
lw := kcache.NewListWatchFromClient(client, rn, namespace, fields.Everything())
188-
eventQueue := NewEventQueue(DeletionHandlingMetaNamespaceKeyFunc)
189-
// Repopulate event queue every 30 mins
190-
// Existing items in the event queue will have watch.Modified event type
191-
go kcache.NewReflector(lw, expectedType, eventQueue, 30*time.Minute).Run(wait.NeverStop)
192-
return eventQueue
193-
}
194-
195-
// Run event queue for the given resource. The 'process' function is called
196-
// repeatedly with each available cache.Delta that describes state changes
197-
// to an object. If the process function returns an error queued changes
198-
// for that object are dropped but processing continues with the next available
199-
// object's cache.Deltas. The error is logged with call stack information.
200-
//
201-
// NOTE: this function will handle DeletedFinalStateUnknown delta objects
202-
// automatically, which may not always be what you want since the now-deleted
203-
// object may be stale.
204-
func RunEventQueue(client kcache.Getter, resourceName ResourceName, process ProcessEventFunc) {
205-
var expectedType interface{}
206-
207-
switch resourceName {
208-
case HostSubnets:
209-
expectedType = &networkapi.HostSubnet{}
210-
case NetNamespaces:
211-
expectedType = &networkapi.NetNamespace{}
212-
case EgressNetworkPolicies:
213-
expectedType = &networkapi.EgressNetworkPolicy{}
214-
case NetworkPolicies:
215-
expectedType = &networking.NetworkPolicy{}
216-
default:
217-
glog.Fatalf("Unknown resource %s during initialization of event queue", resourceName)
218-
}
219-
220-
eventQueue := newEventQueue(client, resourceName, expectedType, metav1.NamespaceAll)
221-
for {
222-
eventQueue.Pop(process, expectedType)
223-
}
224-
}
225-
226-
// RegisterSharedInformerEventHandlers registers addOrUpdateFunc and delFunc event handlers with
227-
// kubernetes shared informers for the given resource name.
228-
func RegisterSharedInformerEventHandlers(kubeInformers kinternalinformers.SharedInformerFactory,
229-
addOrUpdateFunc func(interface{}, interface{}, watch.EventType),
230-
delFunc func(interface{}), resourceName ResourceName) {
231-
232-
var expectedObjType interface{}
233-
var informer kcache.SharedIndexInformer
234-
235-
internalVersion := kubeInformers.Core().InternalVersion()
236-
237-
switch resourceName {
238-
case Nodes:
239-
informer = internalVersion.Nodes().Informer()
240-
expectedObjType = &kapi.Node{}
241-
case Namespaces:
242-
informer = internalVersion.Namespaces().Informer()
243-
expectedObjType = &kapi.Namespace{}
244-
case Services:
245-
informer = internalVersion.Services().Informer()
246-
expectedObjType = &kapi.Service{}
247-
case Pods:
248-
informer = internalVersion.Pods().Informer()
249-
expectedObjType = &kapi.Pod{}
250-
default:
251-
glog.Errorf("Unknown resource name: %s", resourceName)
252-
return
253-
}
254-
255-
informer.AddEventHandler(kcache.ResourceEventHandlerFuncs{
256-
AddFunc: func(obj interface{}) {
257-
addOrUpdateFunc(obj, nil, watch.Added)
258-
},
259-
UpdateFunc: func(old, cur interface{}) {
260-
addOrUpdateFunc(cur, old, watch.Modified)
261-
},
262-
DeleteFunc: func(obj interface{}) {
263-
if reflect.TypeOf(expectedObjType) != reflect.TypeOf(obj) {
264-
tombstone, ok := obj.(kcache.DeletedFinalStateUnknown)
265-
if !ok {
266-
glog.Errorf("Couldn't get object from tombstone: %+v", obj)
267-
return
268-
}
269-
270-
obj = tombstone.Obj
271-
if reflect.TypeOf(expectedObjType) != reflect.TypeOf(obj) {
272-
glog.Errorf("Tombstone contained object that is not a %s: %+v", resourceName, obj)
273-
return
274-
}
275-
}
276-
delFunc(obj)
277-
},
278-
})
279-
}

0 commit comments

Comments
 (0)