@@ -50,8 +50,8 @@ type FakeSourceSyncer struct {
50
50
History map [registry.CatalogKey ][]connectivity.State
51
51
52
52
sync.Mutex
53
- expectedEvents int
54
- done chan struct {}
53
+ expectedReadies int
54
+ done chan struct {}
55
55
}
56
56
57
57
func (f * FakeSourceSyncer ) sync (state SourceState ) {
@@ -60,18 +60,20 @@ func (f *FakeSourceSyncer) sync(state SourceState) {
60
60
f .History [state .Key ] = []connectivity.State {}
61
61
}
62
62
f .History [state .Key ] = append (f .History [state .Key ], state .State )
63
- f .expectedEvents --
64
- if f .expectedEvents == 0 {
63
+ if state .State == connectivity .Ready {
64
+ f .expectedReadies --
65
+ }
66
+ if f .expectedReadies == 0 {
65
67
f .done <- struct {}{}
66
68
}
67
69
f .Unlock ()
68
70
}
69
71
70
- func NewFakeSourceSyncer (expectedEvents int ) * FakeSourceSyncer {
72
+ func NewFakeSourceSyncer (expectedReadies int ) * FakeSourceSyncer {
71
73
return & FakeSourceSyncer {
72
- History : map [registry.CatalogKey ][]connectivity.State {},
73
- expectedEvents : expectedEvents ,
74
- done : make (chan struct {}),
74
+ History : map [registry.CatalogKey ][]connectivity.State {},
75
+ expectedReadies : expectedReadies ,
76
+ done : make (chan struct {}),
75
77
}
76
78
}
77
79
@@ -84,21 +86,19 @@ func TestConnectionEvents(t *testing.T) {
84
86
test := func (tt testcase ) func (t * testing.T ) {
85
87
return func (t * testing.T ) {
86
88
// start server for each catalog
87
- totalEvents := 0
88
89
addresses := map [registry.CatalogKey ]string {}
89
90
90
- for catalog , events := range tt .expectedHistory {
91
- totalEvents += len (events )
91
+ for catalog := range tt .expectedHistory {
92
92
serve , address , stop := server (& fakes.FakeQuery {})
93
93
addresses [catalog ] = address
94
94
go serve ()
95
95
defer stop ()
96
96
}
97
97
98
98
// start source manager
99
- syncer := NewFakeSourceSyncer (totalEvents )
99
+ syncer := NewFakeSourceSyncer (len ( tt . expectedHistory ) )
100
100
sources := NewSourceStore (logrus .New (), 1 * time .Second , 5 * time .Second , syncer .sync )
101
- ctx , cancel := context .WithTimeout (context .Background (), 10 * time .Second )
101
+ ctx , cancel := context .WithTimeout (context .Background (), 20 * time .Second )
102
102
defer cancel ()
103
103
sources .Start (ctx )
104
104
@@ -115,7 +115,13 @@ func TestConnectionEvents(t *testing.T) {
115
115
for catalog , events := range tt .expectedHistory {
116
116
recordedEvents := syncer .History [catalog ]
117
117
for i := 0 ; i < len (recordedEvents ); i ++ {
118
- require .Equal (t , (events [i ]).String (), (recordedEvents [i ]).String ())
118
+ found := false
119
+ for _ , event := range events {
120
+ if event .String () == recordedEvents [i ].String () {
121
+ found = true
122
+ }
123
+ }
124
+ require .True (t , found )
119
125
}
120
126
}
121
127
}
0 commit comments