Skip to content

Commit d894a3e

Browse files
committed
move eventQueue to the only package using it
1 parent 94a8339 commit d894a3e

File tree

3 files changed

+46
-47
lines changed

3 files changed

+46
-47
lines changed

pkg/client/cache/eventqueue.go pkg/router/controller/factory/eventqueue.go

+27-27
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package cache
1+
package factory
22

33
import (
44
"fmt"
@@ -35,7 +35,7 @@ import (
3535
// events. This is necessary because the Store API does not receive the deleted state on a
3636
// watch.Deleted event (though this state is delivered by the watch API itself, it is not passed on
3737
// to the reflector Store).
38-
type EventQueue struct {
38+
type eventQueue struct {
3939
lock sync.RWMutex
4040
cond sync.Cond
4141
store kcache.Store
@@ -56,7 +56,7 @@ type EventQueue struct {
5656
}
5757

5858
// EventQueue implements kcache.Store
59-
var _ kcache.Store = &EventQueue{}
59+
var _ kcache.Store = &eventQueue{}
6060

6161
// Describes the effect of processing a watch event on the event queue's state.
6262
type watchEventEffect string
@@ -108,7 +108,7 @@ func (es EventQueueStopped) Error() string {
108108

109109
// handleEvent is called by Add, Update, and Delete to determine the effect
110110
// of an event of the queue, realize that effect, and update the underlying store.
111-
func (eq *EventQueue) handleEvent(obj interface{}, newEventType watch.EventType) error {
111+
func (eq *eventQueue) handleEvent(obj interface{}, newEventType watch.EventType) error {
112112
key, err := eq.keyFn(obj)
113113
if err != nil {
114114
return err
@@ -162,13 +162,13 @@ func (eq *EventQueue) handleEvent(obj interface{}, newEventType watch.EventType)
162162
}
163163

164164
// Cancel function to force Pop function to unblock
165-
func (eq *EventQueue) Cancel() {
165+
func (eq *eventQueue) Cancel() {
166166
eq.cond.Broadcast()
167167
}
168168

169169
// updateStore updates the stored value for the given key. Note that deletions are not handled
170170
// here; they are performed in Pop in order to provide the deleted value on watch.Deleted events.
171-
func (eq *EventQueue) updateStore(key string, obj interface{}, eventType watch.EventType) error {
171+
func (eq *eventQueue) updateStore(key string, obj interface{}, eventType watch.EventType) error {
172172
if eventType == watch.Deleted {
173173
return nil
174174
}
@@ -183,7 +183,7 @@ func (eq *EventQueue) updateStore(key string, obj interface{}, eventType watch.E
183183
}
184184

185185
// queueWithout returns the internal queue minus the given key.
186-
func (eq *EventQueue) queueWithout(key string) []string {
186+
func (eq *eventQueue) queueWithout(key string) []string {
187187
rq := make([]string, 0)
188188
for _, qkey := range eq.queue {
189189
if qkey == key {
@@ -197,22 +197,22 @@ func (eq *EventQueue) queueWithout(key string) []string {
197197
}
198198

199199
// Add enqueues a watch.Added event for the given state.
200-
func (eq *EventQueue) Add(obj interface{}) error {
200+
func (eq *eventQueue) Add(obj interface{}) error {
201201
return eq.handleEvent(obj, watch.Added)
202202
}
203203

204204
// Update enqueues a watch.Modified event for the given state.
205-
func (eq *EventQueue) Update(obj interface{}) error {
205+
func (eq *eventQueue) Update(obj interface{}) error {
206206
return eq.handleEvent(obj, watch.Modified)
207207
}
208208

209209
// Delete enqueues a watch.Delete event for the given object.
210-
func (eq *EventQueue) Delete(obj interface{}) error {
210+
func (eq *eventQueue) Delete(obj interface{}) error {
211211
return eq.handleEvent(obj, watch.Deleted)
212212
}
213213

214214
// List returns a list of all enqueued items.
215-
func (eq *EventQueue) List() []interface{} {
215+
func (eq *eventQueue) List() []interface{} {
216216
eq.lock.RLock()
217217
defer eq.lock.RUnlock()
218218

@@ -232,7 +232,7 @@ func (eq *EventQueue) List() []interface{} {
232232
}
233233

234234
// ListKeys returns all enqueued keys.
235-
func (eq *EventQueue) ListKeys() []string {
235+
func (eq *eventQueue) ListKeys() []string {
236236
eq.lock.RLock()
237237
defer eq.lock.RUnlock()
238238

@@ -244,7 +244,7 @@ func (eq *EventQueue) ListKeys() []string {
244244
// ContainedIDs returns a sets.String containing all IDs of the enqueued items.
245245
// This is a snapshot of a moment in time, and one should keep in mind that
246246
// other go routines can add or remove items after you call this.
247-
func (eq *EventQueue) ContainedIDs() sets.String {
247+
func (eq *eventQueue) ContainedIDs() sets.String {
248248
eq.lock.RLock()
249249
defer eq.lock.RUnlock()
250250

@@ -257,7 +257,7 @@ func (eq *EventQueue) ContainedIDs() sets.String {
257257
}
258258

259259
// Get returns the requested item, or sets exists=false.
260-
func (eq *EventQueue) Get(obj interface{}) (item interface{}, exists bool, err error) {
260+
func (eq *eventQueue) Get(obj interface{}) (item interface{}, exists bool, err error) {
261261
key, err := eq.keyFn(obj)
262262
if err != nil {
263263
return nil, false, err
@@ -266,7 +266,7 @@ func (eq *EventQueue) Get(obj interface{}) (item interface{}, exists bool, err e
266266
}
267267

268268
// GetByKey returns the requested item, or sets exists=false.
269-
func (eq *EventQueue) GetByKey(key string) (item interface{}, exists bool, err error) {
269+
func (eq *eventQueue) GetByKey(key string) (item interface{}, exists bool, err error) {
270270
eq.lock.RLock()
271271
defer eq.lock.RUnlock()
272272

@@ -280,7 +280,7 @@ func (eq *EventQueue) GetByKey(key string) (item interface{}, exists bool, err e
280280

281281
// Pop gets the event and object at the head of the queue. If the event
282282
// is a delete event, Pop deletes the key from the underlying cache.
283-
func (eq *EventQueue) Pop() (watch.EventType, interface{}, error) {
283+
func (eq *eventQueue) Pop() (watch.EventType, interface{}, error) {
284284
eq.lock.Lock()
285285
defer eq.lock.Unlock()
286286

@@ -327,7 +327,7 @@ func (eq *EventQueue) Pop() (watch.EventType, interface{}, error) {
327327
// populates the queue with a watch.Modified event for each of the replaced
328328
// objects. The backing store takes ownership of keyToObjs; you should not
329329
// reference the map again after calling this function.
330-
func (eq *EventQueue) Replace(objects []interface{}, resourceVersion string) error {
330+
func (eq *eventQueue) Replace(objects []interface{}, resourceVersion string) error {
331331
eq.lock.Lock()
332332
defer eq.lock.Unlock()
333333

@@ -360,15 +360,15 @@ func (eq *EventQueue) Replace(objects []interface{}, resourceVersion string) err
360360

361361
// ListSuccessfulAtLeastOnce indicates whether a List operation was
362362
// successfully completed regardless of whether any items were queued.
363-
func (eq *EventQueue) ListSuccessfulAtLeastOnce() bool {
363+
func (eq *eventQueue) ListSuccessfulAtLeastOnce() bool {
364364
eq.lock.Lock()
365365
defer eq.lock.Unlock()
366366

367367
return eq.replaceCalled
368368
}
369369

370370
// ListCount returns how many objects were queued by the most recent List operation.
371-
func (eq *EventQueue) ListCount() int {
371+
func (eq *eventQueue) ListCount() int {
372372
eq.lock.Lock()
373373
defer eq.lock.Unlock()
374374

@@ -377,15 +377,15 @@ func (eq *EventQueue) ListCount() int {
377377

378378
// ListConsumed indicates whether the items queued by a List/Relist
379379
// operation have been consumed.
380-
func (eq *EventQueue) ListConsumed() bool {
380+
func (eq *eventQueue) ListConsumed() bool {
381381
eq.lock.Lock()
382382
defer eq.lock.Unlock()
383383

384384
return eq.lastReplaceKey == ""
385385
}
386386

387387
// Resync will touch all objects to put them into the processing queue
388-
func (eq *EventQueue) Resync() error {
388+
func (eq *eventQueue) Resync() error {
389389
eq.lock.Lock()
390390
defer eq.lock.Unlock()
391391

@@ -409,9 +409,9 @@ func (eq *EventQueue) Resync() error {
409409
return nil
410410
}
411411

412-
// NewEventQueue returns a new EventQueue.
413-
func NewEventQueue(keyFn kcache.KeyFunc) *EventQueue {
414-
q := &EventQueue{
412+
// newEventQueue returns a new EventQueue.
413+
func newEventQueue(keyFn kcache.KeyFunc) *eventQueue {
414+
q := &eventQueue{
415415
store: kcache.NewStore(keyFn),
416416
events: map[string]watch.EventType{},
417417
queue: []string{},
@@ -421,9 +421,9 @@ func NewEventQueue(keyFn kcache.KeyFunc) *EventQueue {
421421
return q
422422
}
423423

424-
// NewEventQueueForStore returns a new EventQueue that uses the provided store.
425-
func NewEventQueueForStore(keyFn kcache.KeyFunc, store kcache.Store) *EventQueue {
426-
q := &EventQueue{
424+
// newEventQueueForStore returns a new EventQueue that uses the provided store.
425+
func newEventQueueForStore(keyFn kcache.KeyFunc, store kcache.Store) *eventQueue {
426+
q := &eventQueue{
427427
store: store,
428428
events: map[string]watch.EventType{},
429429
queue: []string{},

pkg/client/cache/eventqueue_test.go pkg/router/controller/factory/eventqueue_test.go

+12-12
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package cache
1+
package factory
22

33
import (
44
"testing"
@@ -16,7 +16,7 @@ func keyFunc(obj interface{}) (string, error) {
1616
}
1717

1818
func TestEventQueue_basic(t *testing.T) {
19-
q := NewEventQueue(keyFunc)
19+
q := newEventQueue(keyFunc)
2020

2121
const amount = 500
2222
go func() {
@@ -54,7 +54,7 @@ func TestEventQueue_basic(t *testing.T) {
5454
}
5555

5656
func TestEventQueue_initialEventIsDelete(t *testing.T) {
57-
q := NewEventQueue(keyFunc)
57+
q := newEventQueue(keyFunc)
5858

5959
q.Replace([]interface{}{
6060
cacheable{"foo", 2},
@@ -75,7 +75,7 @@ func TestEventQueue_initialEventIsDelete(t *testing.T) {
7575
}
7676

7777
func TestEventQueue_compressAddDelete(t *testing.T) {
78-
q := NewEventQueue(keyFunc)
78+
q := newEventQueue(keyFunc)
7979

8080
q.Add(cacheable{"foo", 10})
8181
q.Delete(cacheable{key: "foo"})
@@ -94,7 +94,7 @@ func TestEventQueue_compressAddDelete(t *testing.T) {
9494
}
9595

9696
func TestEventQueue_compressAddUpdate(t *testing.T) {
97-
q := NewEventQueue(keyFunc)
97+
q := newEventQueue(keyFunc)
9898

9999
q.Add(cacheable{"foo", 10})
100100
q.Update(cacheable{"foo", 11})
@@ -111,7 +111,7 @@ func TestEventQueue_compressAddUpdate(t *testing.T) {
111111
}
112112

113113
func TestEventQueue_compressTwoUpdates(t *testing.T) {
114-
q := NewEventQueue(keyFunc)
114+
q := newEventQueue(keyFunc)
115115

116116
q.Replace([]interface{}{
117117
cacheable{"foo", 2},
@@ -132,7 +132,7 @@ func TestEventQueue_compressTwoUpdates(t *testing.T) {
132132
}
133133

134134
func TestEventQueue_compressUpdateDelete(t *testing.T) {
135-
q := NewEventQueue(keyFunc)
135+
q := newEventQueue(keyFunc)
136136

137137
q.Replace([]interface{}{
138138
cacheable{"foo", 2},
@@ -153,7 +153,7 @@ func TestEventQueue_compressUpdateDelete(t *testing.T) {
153153
}
154154

155155
func TestEventQueue_modifyEventsFromReplace(t *testing.T) {
156-
q := NewEventQueue(keyFunc)
156+
q := newEventQueue(keyFunc)
157157

158158
q.Replace([]interface{}{
159159
cacheable{"foo", 2},
@@ -173,7 +173,7 @@ func TestEventQueue_modifyEventsFromReplace(t *testing.T) {
173173
}
174174

175175
func TestEventQueue_ListConsumed(t *testing.T) {
176-
q := NewEventQueue(keyFunc)
176+
q := newEventQueue(keyFunc)
177177
if !q.ListConsumed() {
178178
t.Fatalf("expected ListConsumed to be true after queue creation")
179179
}
@@ -207,7 +207,7 @@ func TestEventQueue_ListConsumed(t *testing.T) {
207207
}
208208

209209
func TestEventQueue_PopAfterResyncShouldBeTypeModified(t *testing.T) {
210-
q := NewEventQueue(keyFunc)
210+
q := newEventQueue(keyFunc)
211211
q.Add(cacheable{"foo", 10})
212212
q.Pop()
213213
q.Resync()
@@ -218,7 +218,7 @@ func TestEventQueue_PopAfterResyncShouldBeTypeModified(t *testing.T) {
218218
}
219219

220220
func TestEventQueue_ResyncsShouldNotCausePanics(t *testing.T) {
221-
q := NewEventQueue(keyFunc)
221+
q := newEventQueue(keyFunc)
222222
q.Add(cacheable{"foo", 10})
223223
q.Pop()
224224

@@ -236,7 +236,7 @@ func TestEventQueue_ResyncsShouldNotCausePanics(t *testing.T) {
236236
}
237237

238238
func TestEventQueue_ResyncsShouldNotRestoreDeletedItems(t *testing.T) {
239-
q := NewEventQueue(keyFunc)
239+
q := newEventQueue(keyFunc)
240240
q.Add(cacheable{"foo", 10})
241241
q.Delete(cacheable{key: "foo"})
242242

pkg/router/controller/factory/factory.go

+7-8
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ import (
1919
kcoreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
2020
kextensionsclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/extensions/internalversion"
2121

22-
oscache "github.com/openshift/origin/pkg/client/cache"
2322
routeapi "github.com/openshift/origin/pkg/route/apis/route"
2423
routeclient "github.com/openshift/origin/pkg/route/generated/internalclientset/typed/route/internalversion"
2524
"github.com/openshift/origin/pkg/router"
@@ -78,7 +77,7 @@ func routerKeyFn(obj interface{}) (string, error) {
7877
// Create begins listing and watching against the API server for the desired route and endpoint
7978
// resources. It spawns child goroutines that cannot be terminated.
8079
func (factory *RouterControllerFactory) Create(plugin router.Plugin, watchNodes, enableIngress bool) *routercontroller.RouterController {
81-
routeEventQueue := oscache.NewEventQueue(routerKeyFn)
80+
routeEventQueue := newEventQueue(routerKeyFn)
8281
rLW := &routeLW{
8382
client: factory.RouteClient,
8483
namespace: factory.Namespace,
@@ -87,14 +86,14 @@ func (factory *RouterControllerFactory) Create(plugin router.Plugin, watchNodes,
8786
}
8887
cache.NewReflector(&cache.ListWatch{ListFunc: rLW.List, WatchFunc: rLW.Watch}, &routeapi.Route{}, routeEventQueue, factory.ResyncInterval).Run()
8988

90-
endpointsEventQueue := oscache.NewEventQueue(routerKeyFn)
89+
endpointsEventQueue := newEventQueue(routerKeyFn)
9190
cache.NewReflector(&endpointsLW{
9291
client: factory.KClient,
9392
namespace: factory.Namespace,
9493
// we do not scope endpoints by labels or fields because the route labels != endpoints labels
9594
}, &kapi.Endpoints{}, endpointsEventQueue, factory.ResyncInterval).Run()
9695

97-
nodeEventQueue := oscache.NewEventQueue(routerKeyFn)
96+
nodeEventQueue := newEventQueue(routerKeyFn)
9897
if watchNodes {
9998
cache.NewReflector(&nodeLW{
10099
client: factory.NodeClient,
@@ -103,8 +102,8 @@ func (factory *RouterControllerFactory) Create(plugin router.Plugin, watchNodes,
103102
}, &kapi.Node{}, nodeEventQueue, factory.ResyncInterval).Run()
104103
}
105104

106-
ingressEventQueue := oscache.NewEventQueue(routerKeyFn)
107-
secretEventQueue := oscache.NewEventQueue(routerKeyFn)
105+
ingressEventQueue := newEventQueue(routerKeyFn)
106+
secretEventQueue := newEventQueue(routerKeyFn)
108107
var ingressTranslator *routercontroller.IngressTranslator
109108
if enableIngress {
110109
ingressTranslator = routercontroller.NewIngressTranslator(factory.SecretClient)
@@ -216,7 +215,7 @@ func (factory *RouterControllerFactory) Create(plugin router.Plugin, watchNodes,
216215
func (factory *RouterControllerFactory) CreateNotifier(changed func()) RoutesByHost {
217216
keyFn := routerKeyFn
218217
routeStore := cache.NewIndexer(keyFn, cache.Indexers{"host": hostIndexFunc})
219-
routeEventQueue := oscache.NewEventQueueForStore(keyFn, routeStore)
218+
routeEventQueue := newEventQueueForStore(keyFn, routeStore)
220219
rLW := &routeLW{
221220
client: factory.RouteClient,
222221
namespace: factory.Namespace,
@@ -226,7 +225,7 @@ func (factory *RouterControllerFactory) CreateNotifier(changed func()) RoutesByH
226225
cache.NewReflector(&cache.ListWatch{ListFunc: rLW.List, WatchFunc: rLW.Watch}, &routeapi.Route{}, routeEventQueue, factory.ResyncInterval).Run()
227226

228227
endpointStore := cache.NewStore(keyFn)
229-
endpointsEventQueue := oscache.NewEventQueueForStore(keyFn, endpointStore)
228+
endpointsEventQueue := newEventQueueForStore(keyFn, endpointStore)
230229
cache.NewReflector(&endpointsLW{
231230
client: factory.KClient,
232231
namespace: factory.Namespace,

0 commit comments

Comments
 (0)