@@ -7,19 +7,19 @@ const parallel = require('async/parallel')
7
7
const series = require ( 'async/series' )
8
8
const _times = require ( 'lodash.times' )
9
9
10
- const PSG = require ( '../src' )
10
+ const FloodSub = require ( '../src' )
11
11
const utils = require ( './utils' )
12
12
const first = utils . first
13
13
const createNode = utils . createNode
14
14
const expectSet = utils . expectSet
15
15
16
- describe ( 'basics' , ( ) => {
17
- let nodeA
18
- let nodeB
19
- let psA
20
- let psB
21
-
16
+ describe ( 'basics between 2 nodes' , ( ) => {
22
17
describe ( 'fresh nodes' , ( ) => {
18
+ let nodeA
19
+ let nodeB
20
+ let fsA
21
+ let fsB
22
+
23
23
before ( ( done ) => {
24
24
series ( [
25
25
( cb ) => createNode ( '/ip4/127.0.0.1/tcp/0' , cb ) ,
@@ -42,124 +42,143 @@ describe('basics', () => {
42
42
} )
43
43
44
44
it ( 'Mount the pubsub protocol' , ( done ) => {
45
- psA = new PSG ( nodeA )
46
- psB = new PSG ( nodeB )
45
+ fsA = new FloodSub ( nodeA )
46
+ fsB = new FloodSub ( nodeB )
47
47
48
48
setTimeout ( ( ) => {
49
- expect ( psA . peers . size ) . to . be . eql ( 0 )
50
- expect ( psA . subscriptions . size ) . to . eql ( 0 )
51
- expect ( psB . peers . size ) . to . be . eql ( 0 )
52
- expect ( psB . subscriptions . size ) . to . eql ( 0 )
49
+ expect ( fsA . peers . size ) . to . be . eql ( 0 )
50
+ expect ( fsA . subscriptions . size ) . to . eql ( 0 )
51
+ expect ( fsB . peers . size ) . to . be . eql ( 0 )
52
+ expect ( fsB . subscriptions . size ) . to . eql ( 0 )
53
53
done ( )
54
54
} , 50 )
55
55
} )
56
56
57
+ it ( 'start both FloodSubs' , ( done ) => {
58
+ parallel ( [
59
+ ( cb ) => fsA . start ( cb ) ,
60
+ ( cb ) => fsB . start ( cb )
61
+ ] , done )
62
+ } )
63
+
57
64
it ( 'Dial from nodeA to nodeB' , ( done ) => {
58
65
series ( [
59
66
( cb ) => nodeA . dialByPeerInfo ( nodeB . peerInfo , cb ) ,
60
67
( cb ) => setTimeout ( ( ) => {
61
- expect ( psA . peers . size ) . to . equal ( 1 )
62
- expect ( psB . peers . size ) . to . equal ( 1 )
68
+ expect ( fsA . peers . size ) . to . equal ( 1 )
69
+ expect ( fsB . peers . size ) . to . equal ( 1 )
63
70
cb ( )
64
71
} , 250 )
65
72
] , done )
66
73
} )
67
74
68
75
it ( 'Subscribe to a topic:Z in nodeA' , ( done ) => {
69
- psA . subscribe ( 'Z' )
76
+ fsA . subscribe ( 'Z' )
70
77
setTimeout ( ( ) => {
71
- expectSet ( psA . subscriptions , [ 'Z' ] )
72
- expect ( psB . peers . size ) . to . equal ( 1 )
73
- expectSet ( first ( psB . peers ) . topics , [ 'Z' ] )
78
+ expectSet ( fsA . subscriptions , [ 'Z' ] )
79
+ expect ( fsB . peers . size ) . to . equal ( 1 )
80
+ expectSet ( first ( fsB . peers ) . topics , [ 'Z' ] )
74
81
done ( )
75
82
} , 100 )
76
83
} )
77
84
78
85
it ( 'Publish to a topic:Z in nodeA' , ( done ) => {
79
- psB . once ( 'Z' , shouldNotHappen )
86
+ fsB . once ( 'Z' , shouldNotHappen )
80
87
81
88
function shouldNotHappen ( msg ) { expect . fail ( ) }
82
89
83
- psA . once ( 'Z' , ( msg ) => {
90
+ fsA . once ( 'Z' , ( msg ) => {
84
91
expect ( msg . data . toString ( ) ) . to . equal ( 'hey' )
85
- psB . removeListener ( 'Z' , shouldNotHappen )
92
+ fsB . removeListener ( 'Z' , shouldNotHappen )
86
93
done ( )
87
94
} )
88
95
89
- psB . once ( 'Z' , shouldNotHappen )
96
+ fsB . once ( 'Z' , shouldNotHappen )
90
97
91
- psA . publish ( 'Z' , new Buffer ( 'hey' ) )
98
+ fsA . publish ( 'Z' , new Buffer ( 'hey' ) )
92
99
} )
93
100
94
101
it ( 'Publish to a topic:Z in nodeB' , ( done ) => {
95
- psB . once ( 'Z' , shouldNotHappen )
102
+ fsB . once ( 'Z' , shouldNotHappen )
96
103
97
- psA . once ( 'Z' , ( msg ) => {
98
- psA . once ( 'Z' , shouldNotHappen )
104
+ fsA . once ( 'Z' , ( msg ) => {
105
+ fsA . once ( 'Z' , shouldNotHappen )
99
106
expect ( msg . data . toString ( ) ) . to . equal ( 'banana' )
100
107
setTimeout ( ( ) => {
101
- psA . removeListener ( 'Z' , shouldNotHappen )
102
- psB . removeListener ( 'Z' , shouldNotHappen )
108
+ fsA . removeListener ( 'Z' , shouldNotHappen )
109
+ fsB . removeListener ( 'Z' , shouldNotHappen )
103
110
done ( )
104
111
} , 100 )
105
112
} )
106
113
107
- psB . once ( 'Z' , shouldNotHappen )
114
+ fsB . once ( 'Z' , shouldNotHappen )
108
115
109
- psB . publish ( 'Z' , new Buffer ( 'banana' ) )
116
+ fsB . publish ( 'Z' , new Buffer ( 'banana' ) )
110
117
} )
111
118
112
119
it ( 'Publish 10 msg to a topic:Z in nodeB' , ( done ) => {
113
120
let counter = 0
114
121
115
- psB . once ( 'Z' , shouldNotHappen )
122
+ fsB . once ( 'Z' , shouldNotHappen )
116
123
117
- psA . on ( 'Z' , receivedMsg )
124
+ fsA . on ( 'Z' , receivedMsg )
118
125
119
126
function receivedMsg ( msg ) {
120
127
expect ( msg . data . toString ( ) ) . to . equal ( 'banana' )
121
- expect ( msg . from ) . to . be . eql ( psB . libp2p . peerInfo . id . toB58String ( ) )
128
+ expect ( msg . from ) . to . be . eql ( fsB . libp2p . peerInfo . id . toB58String ( ) )
122
129
expect ( Buffer . isBuffer ( msg . seqno ) ) . to . be . true
123
130
expect ( msg . topicCIDs ) . to . be . eql ( [ 'Z' ] )
124
131
125
132
if ( ++ counter === 10 ) {
126
- psA . removeListener ( 'Z' , receivedMsg )
133
+ fsA . removeListener ( 'Z' , receivedMsg )
127
134
done ( )
128
135
}
129
136
}
130
137
131
138
_times ( 10 , ( ) => {
132
- psB . publish ( 'Z' , new Buffer ( 'banana' ) )
139
+ fsB . publish ( 'Z' , new Buffer ( 'banana' ) )
133
140
} )
134
141
} )
135
142
136
143
it ( 'Unsubscribe from topic:Z in nodeA' , ( done ) => {
137
- psA . unsubscribe ( 'Z' )
138
- expect ( psA . subscriptions . size ) . to . equal ( 0 )
144
+ fsA . unsubscribe ( 'Z' )
145
+ expect ( fsA . subscriptions . size ) . to . equal ( 0 )
139
146
140
147
setTimeout ( ( ) => {
141
- expect ( psB . peers . size ) . to . equal ( 1 )
142
- expectSet ( first ( psB . peers ) . topics , [ ] )
148
+ expect ( fsB . peers . size ) . to . equal ( 1 )
149
+ expectSet ( first ( fsB . peers ) . topics , [ ] )
143
150
done ( )
144
151
} , 100 )
145
152
} )
146
153
147
154
it ( 'Publish to a topic:Z in nodeA nodeB' , ( done ) => {
148
- psA . once ( 'Z' , shouldNotHappen )
149
- psB . once ( 'Z' , shouldNotHappen )
155
+ fsA . once ( 'Z' , shouldNotHappen )
156
+ fsB . once ( 'Z' , shouldNotHappen )
150
157
151
158
setTimeout ( ( ) => {
152
- psA . removeListener ( 'Z' , shouldNotHappen )
153
- psB . removeListener ( 'Z' , shouldNotHappen )
159
+ fsA . removeListener ( 'Z' , shouldNotHappen )
160
+ fsB . removeListener ( 'Z' , shouldNotHappen )
154
161
done ( )
155
162
} , 100 )
156
163
157
- psB . publish ( 'Z' , new Buffer ( 'banana' ) )
158
- psA . publish ( 'Z' , new Buffer ( 'banana' ) )
164
+ fsB . publish ( 'Z' , new Buffer ( 'banana' ) )
165
+ fsA . publish ( 'Z' , new Buffer ( 'banana' ) )
166
+ } )
167
+
168
+ it ( 'stop both FloodSubs' , ( done ) => {
169
+ parallel ( [
170
+ ( cb ) => fsA . stop ( cb ) ,
171
+ ( cb ) => fsB . stop ( cb )
172
+ ] , done )
159
173
} )
160
174
} )
161
175
162
176
describe ( 'long running nodes (already have state)' , ( ) => {
177
+ let nodeA
178
+ let nodeB
179
+ let fsA
180
+ let fsB
181
+
163
182
before ( ( done ) => {
164
183
series ( [
165
184
( cb ) => createNode ( '/ip4/127.0.0.1/tcp/0' , cb ) ,
@@ -168,20 +187,24 @@ describe('basics', () => {
168
187
nodeA = nodes [ 0 ]
169
188
nodeB = nodes [ 1 ]
170
189
171
- psA = new PSG ( nodeA )
172
- psB = new PSG ( nodeB )
190
+ fsA = new FloodSub ( nodeA )
191
+ fsB = new FloodSub ( nodeB )
173
192
174
- psA . subscribe ( 'Za' )
175
- psB . subscribe ( 'Zb' )
193
+ parallel ( [
194
+ ( cb ) => fsA . start ( cb ) ,
195
+ ( cb ) => fsB . start ( cb )
196
+ ] , next )
176
197
177
- setTimeout ( ( ) => {
178
- expect ( psA . peers . size ) . to . equal ( 0 )
179
- expectSet ( psA . subscriptions , [ 'Za' ] )
180
- expect ( psB . peers . size ) . to . equal ( 0 )
181
- expectSet ( psB . subscriptions , [ 'Zb' ] )
198
+ function next ( ) {
199
+ fsA . subscribe ( 'Za' )
200
+ fsB . subscribe ( 'Zb' )
182
201
202
+ expect ( fsA . peers . size ) . to . equal ( 0 )
203
+ expectSet ( fsA . subscriptions , [ 'Za' ] )
204
+ expect ( fsB . peers . size ) . to . equal ( 0 )
205
+ expectSet ( fsB . subscriptions , [ 'Zb' ] )
183
206
done ( )
184
- } , 50 )
207
+ }
185
208
} )
186
209
} )
187
210
@@ -196,21 +219,28 @@ describe('basics', () => {
196
219
nodeA . dialByPeerInfo ( nodeB . peerInfo , ( err ) => {
197
220
expect ( err ) . to . not . exist
198
221
setTimeout ( ( ) => {
199
- expect ( psA . peers . size ) . to . equal ( 1 )
200
- expect ( psB . peers . size ) . to . equal ( 1 )
222
+ expect ( fsA . peers . size ) . to . equal ( 1 )
223
+ expect ( fsB . peers . size ) . to . equal ( 1 )
201
224
202
- expectSet ( psA . subscriptions , [ 'Za' ] )
203
- expect ( psB . peers . size ) . to . equal ( 1 )
204
- expectSet ( first ( psB . peers ) . topics , [ 'Za' ] )
225
+ expectSet ( fsA . subscriptions , [ 'Za' ] )
226
+ expect ( fsB . peers . size ) . to . equal ( 1 )
227
+ expectSet ( first ( fsB . peers ) . topics , [ 'Za' ] )
205
228
206
- expectSet ( psB . subscriptions , [ 'Zb' ] )
207
- expect ( psA . peers . size ) . to . equal ( 1 )
208
- expectSet ( first ( psA . peers ) . topics , [ 'Zb' ] )
229
+ expectSet ( fsB . subscriptions , [ 'Zb' ] )
230
+ expect ( fsA . peers . size ) . to . equal ( 1 )
231
+ expectSet ( first ( fsA . peers ) . topics , [ 'Zb' ] )
209
232
210
233
done ( )
211
234
} , 250 )
212
235
} )
213
236
} )
237
+
238
+ it ( 'stop both FloodSubs' , ( done ) => {
239
+ parallel ( [
240
+ ( cb ) => fsA . stop ( cb ) ,
241
+ ( cb ) => fsB . stop ( cb )
242
+ ] , done )
243
+ } )
214
244
} )
215
245
} )
216
246
0 commit comments