@@ -11,6 +11,8 @@ const config = require('./config')
11
11
const multicodec = config . multicodec
12
12
const ensureArray = utils . ensureArray
13
13
const setImmediate = require ( 'async/setImmediate' )
14
+ const asyncMap = require ( 'async/map' )
15
+ const noop = ( ) => { }
14
16
15
17
/**
16
18
* FloodSub (aka dumbsub is an implementation of pubsub focused on
@@ -158,11 +160,13 @@ class FloodSub extends BaseProtocol {
158
160
* @override
159
161
* @param {Array<string>|string } topics
160
162
* @param {Array<any>|any } messages
163
+ * @param {function(Error) } callback
161
164
* @returns {undefined }
162
165
*
163
166
*/
164
- publish ( topics , messages ) {
167
+ publish ( topics , messages , callback ) {
165
168
assert ( this . started , 'FloodSub is not started' )
169
+ callback = callback || noop
166
170
167
171
this . log ( 'publish' , topics , messages )
168
172
@@ -171,25 +175,29 @@ class FloodSub extends BaseProtocol {
171
175
172
176
const from = this . libp2p . peerInfo . id . toB58String ( )
173
177
174
- const buildMessage = ( msg ) => {
178
+ const buildMessage = ( msg , cb ) => {
175
179
const seqno = utils . randomSeqno ( )
176
180
this . seenCache . put ( utils . msgId ( from , seqno ) )
177
181
178
- return {
182
+ this . _buildMessage ( {
179
183
from : from ,
180
184
data : msg ,
181
185
seqno : seqno ,
182
186
topicIDs : topics
183
- }
187
+ } , cb )
184
188
}
185
189
186
- const msgObjects = messages . map ( buildMessage )
190
+ asyncMap ( messages , buildMessage , ( err , msgObjects ) => {
191
+ if ( err ) return callback ( err )
192
+
193
+ // Emit to self if I'm interested
194
+ this . _emitMessages ( topics , msgObjects )
187
195
188
- // Emit to self if I'm interested
189
- this . _emitMessages ( topics , msgObjects )
196
+ // send to all the other peers
197
+ this . _forwardMessages ( topics , msgObjects )
190
198
191
- // send to all the other peers
192
- this . _forwardMessages ( topics , msgObjects )
199
+ callback ( null )
200
+ } )
193
201
}
194
202
195
203
/**
0 commit comments