@@ -11,7 +11,7 @@ const { pipe } = require('it-pipe')
11
11
const MulticodecTopology = require ( '../topology/multicodec-topology' )
12
12
const { codes } = require ( './errors' )
13
13
14
- const message = require ( './message' )
14
+ const { RPC } = require ( './message/rpc ' )
15
15
const PeerStreams = require ( './peer-streams' )
16
16
const { SignaturePolicy } = require ( './signature-policy' )
17
17
const utils = require ( './utils' )
@@ -27,10 +27,10 @@ const {
27
27
* @typedef {import('bl') } BufferList
28
28
* @typedef {import('../stream-muxer/types').MuxedStream } MuxedStream
29
29
* @typedef {import('../connection/connection') } Connection
30
- * @typedef {import('./message/types').RPC } RPC
31
- * @typedef {import('./message/types').SubOpts } RPCSubOpts
32
- * @typedef {import('./message/types').Message } RPCMessage
33
30
* @typedef {import('./signature-policy').SignaturePolicyType } SignaturePolicyType
31
+ * @typedef {import('./message/rpc').IRPC } IRPC
32
+ * @typedef {import('./message/rpc').RPC.SubOpts } RPCSubOpts
33
+ * @typedef {import('./message/rpc').RPC.Message } RPCMessage
34
34
*/
35
35
36
36
/**
@@ -382,7 +382,7 @@ class PubsubBaseProtocol extends EventEmitter {
382
382
383
383
if ( subs . length ) {
384
384
// update peer subscriptions
385
- subs . forEach ( ( /** @type { RPCSubOpts } */ subOpt ) => {
385
+ subs . forEach ( ( subOpt ) => {
386
386
this . _processRpcSubOpt ( idB58Str , subOpt )
387
387
} )
388
388
this . emit ( 'pubsub:subscription-change' , peerStreams . id , subs )
@@ -396,7 +396,7 @@ class PubsubBaseProtocol extends EventEmitter {
396
396
if ( msgs . length ) {
397
397
// @ts -ignore RPC message is modified
398
398
msgs . forEach ( ( message ) => {
399
- if ( ! ( this . canRelayMessage || message . topicIDs . some ( ( /** @type { string } */ topic ) => this . subscriptions . has ( topic ) ) ) ) {
399
+ if ( ! ( this . canRelayMessage || ( message . topicIDs && message . topicIDs . some ( ( topic ) => this . subscriptions . has ( topic ) ) ) ) ) {
400
400
this . log ( 'received message we didn\'t subscribe to. Dropping.' )
401
401
return
402
402
}
@@ -411,11 +411,15 @@ class PubsubBaseProtocol extends EventEmitter {
411
411
* Handles a subscription change from a peer
412
412
*
413
413
* @param {string } id
414
- * @param {RPCSubOpts } subOpt
414
+ * @param {RPC.ISubOpts } subOpt
415
415
*/
416
416
_processRpcSubOpt ( id , subOpt ) {
417
417
const t = subOpt . topicID
418
418
419
+ if ( ! t ) {
420
+ return
421
+ }
422
+
419
423
let topicSet = this . topics . get ( t )
420
424
if ( ! topicSet ) {
421
425
topicSet = new Set ( )
@@ -473,13 +477,14 @@ class PubsubBaseProtocol extends EventEmitter {
473
477
* The default msgID implementation
474
478
* Child class can override this.
475
479
*
476
- * @param {RPCMessage } msg - the message object
480
+ * @param {InMessage } msg - the message object
477
481
* @returns {Uint8Array } message id as bytes
478
482
*/
479
483
getMsgId ( msg ) {
480
484
const signaturePolicy = this . globalSignaturePolicy
481
485
switch ( signaturePolicy ) {
482
486
case SignaturePolicy . StrictSign :
487
+ // @ts -ignore seqno is optional in protobuf definition but it will exist
483
488
return utils . msgId ( msg . from , msg . seqno )
484
489
case SignaturePolicy . StrictNoSign :
485
490
return utils . noSignMsgId ( msg . data )
@@ -508,25 +513,25 @@ class PubsubBaseProtocol extends EventEmitter {
508
513
* @returns {RPC }
509
514
*/
510
515
_decodeRpc ( bytes ) {
511
- return message . rpc . RPC . decode ( bytes )
516
+ return RPC . decode ( bytes )
512
517
}
513
518
514
519
/**
515
520
* Encode RPC object into a Uint8Array.
516
521
* This can be override to use a custom router protobuf.
517
522
*
518
- * @param {RPC } rpc
523
+ * @param {IRPC } rpc
519
524
* @returns {Uint8Array }
520
525
*/
521
526
_encodeRpc ( rpc ) {
522
- return message . rpc . RPC . encode ( rpc )
527
+ return RPC . encode ( rpc ) . finish ( )
523
528
}
524
529
525
530
/**
526
531
* Send an rpc object to a peer
527
532
*
528
533
* @param {string } id - peer id
529
- * @param {RPC } rpc
534
+ * @param {IRPC } rpc
530
535
* @returns {void }
531
536
*/
532
537
_sendRpc ( id , rpc ) {
@@ -592,12 +597,12 @@ class PubsubBaseProtocol extends EventEmitter {
592
597
default :
593
598
throw errcode ( new Error ( 'Cannot validate message: unhandled signature policy: ' + signaturePolicy ) , codes . ERR_UNHANDLED_SIGNATURE_POLICY )
594
599
}
600
+
595
601
for ( const topic of message . topicIDs ) {
596
602
const validatorFn = this . topicValidators . get ( topic )
597
- if ( ! validatorFn ) {
598
- continue // eslint-disable-line
603
+ if ( validatorFn ) {
604
+ await validatorFn ( topic , message )
599
605
}
600
- await validatorFn ( topic , message )
601
606
}
602
607
}
603
608
@@ -606,8 +611,8 @@ class PubsubBaseProtocol extends EventEmitter {
606
611
* Should be used by the routers to create the message to send.
607
612
*
608
613
* @protected
609
- * @param {RPCMessage } message
610
- * @returns {Promise<RPCMessage > }
614
+ * @param {InMessage } message
615
+ * @returns {Promise<InMessage > }
611
616
*/
612
617
_buildMessage ( message ) {
613
618
const signaturePolicy = this . globalSignaturePolicy
@@ -617,7 +622,7 @@ class PubsubBaseProtocol extends EventEmitter {
617
622
message . seqno = utils . randomSeqno ( )
618
623
return signMessage ( this . peerId , utils . normalizeOutRpcMessage ( message ) )
619
624
case SignaturePolicy . StrictNoSign :
620
- return message
625
+ return Promise . resolve ( message )
621
626
default :
622
627
throw errcode ( new Error ( 'Cannot build message: unhandled signature policy: ' + signaturePolicy ) , codes . ERR_UNHANDLED_SIGNATURE_POLICY )
623
628
}
@@ -663,29 +668,30 @@ class PubsubBaseProtocol extends EventEmitter {
663
668
this . log ( 'publish' , topic , message )
664
669
665
670
const from = this . peerId . toB58String ( )
666
- let msgObject = {
671
+ const msgObject = {
667
672
receivedFrom : from ,
668
673
data : message ,
669
674
topicIDs : [ topic ]
670
675
}
671
676
672
677
// ensure that the message follows the signature policy
673
678
const outMsg = await this . _buildMessage ( msgObject )
674
- msgObject = utils . normalizeInRpcMessage ( outMsg )
679
+ // @ts -ignore different type as from is converted
680
+ const msg = utils . normalizeInRpcMessage ( outMsg )
675
681
676
682
// Emit to self if I'm interested and emitSelf enabled
677
- this . emitSelf && this . _emitMessage ( msgObject )
683
+ this . emitSelf && this . _emitMessage ( msg )
678
684
679
685
// send to all the other peers
680
- await this . _publish ( msgObject )
686
+ await this . _publish ( msg )
681
687
}
682
688
683
689
/**
684
690
* Overriding the implementation of publish should handle the appropriate algorithms for the publish/subscriber implementation.
685
691
* For example, a Floodsub implementation might simply publish each message to each topic for every peer
686
692
*
687
693
* @abstract
688
- * @param {InMessage } message
694
+ * @param {InMessage|RPCMessage } message
689
695
* @returns {Promise<void> }
690
696
*
691
697
*/
@@ -744,7 +750,6 @@ class PubsubBaseProtocol extends EventEmitter {
744
750
}
745
751
}
746
752
747
- PubsubBaseProtocol . message = message
748
753
PubsubBaseProtocol . utils = utils
749
754
PubsubBaseProtocol . SignaturePolicy = SignaturePolicy
750
755
0 commit comments