@@ -10,10 +10,8 @@ const { pipe } = require('it-pipe')
10
10
11
11
const MulticodecTopology = require ( '../topology/multicodec-topology' )
12
12
const { codes } = require ( './errors' )
13
- /**
14
- * @type {typeof import('./message') }
15
- */
16
- const message = require ( './message' )
13
+
14
+ const { rpc, RPC , Message, SubOpts } = require ( './message' ) // eslint-disable-line no-unused-vars
17
15
const PeerStreams = require ( './peer-streams' )
18
16
const { SignaturePolicy } = require ( './signature-policy' )
19
17
const utils = require ( './utils' )
@@ -29,9 +27,9 @@ const {
29
27
* @typedef {import('bl') } BufferList
30
28
* @typedef {import('../stream-muxer/types').MuxedStream } MuxedStream
31
29
* @typedef {import('../connection/connection') } Connection
32
- * @typedef {import('./message'). RPC } RPC
33
- * @typedef {import('./message'). SubOpts } RPCSubOpts
34
- * @typedef {import('./message'). Message } RPCMessage
30
+ * @typedef {RPC } RPCM
31
+ * @typedef {SubOpts } RPCSubOpts
32
+ * @typedef {Message } RPCMessage
35
33
* @typedef {import('./signature-policy').SignaturePolicyType } SignaturePolicyType
36
34
*/
37
35
@@ -44,6 +42,16 @@ const {
44
42
* @property {Uint8Array } data
45
43
* @property {Uint8Array } [signature]
46
44
* @property {Uint8Array } [key]
45
+ *
46
+ * @typedef {Object } PubsubProperties
47
+ * @property {string } debugName - log namespace
48
+ * @property {Array<string>|string } multicodecs - protocol identificers to connect
49
+ * @property {Libp2p } libp2p
50
+ *
51
+ * @typedef {Object } PubsubOptions
52
+ * @property {SignaturePolicyType } [globalSignaturePolicy = SignaturePolicy.StrictSign] - defines how signatures should be handled
53
+ * @property {boolean } [canRelayMessage = false] - if can relay messages not subscribed
54
+ * @property {boolean } [emitSelf = false] - if publish should emit to self, if subscribed
47
55
*/
48
56
49
57
/**
@@ -52,13 +60,7 @@ const {
52
60
*/
53
61
class PubsubBaseProtocol extends EventEmitter {
54
62
/**
55
- * @param {Object } props
56
- * @param {string } props.debugName - log namespace
57
- * @param {Array<string>|string } props.multicodecs - protocol identificers to connect
58
- * @param {Libp2p } props.libp2p
59
- * @param {SignaturePolicyType } [props.globalSignaturePolicy = SignaturePolicy.StrictSign] - defines how signatures should be handled
60
- * @param {boolean } [props.canRelayMessage = false] - if can relay messages not subscribed
61
- * @param {boolean } [props.emitSelf = false] - if publish should emit to self, if subscribed
63
+ * @param {PubsubProperties & PubsubOptions } props
62
64
* @abstract
63
65
*/
64
66
constructor ( {
@@ -83,8 +85,9 @@ class PubsubBaseProtocol extends EventEmitter {
83
85
84
86
super ( )
85
87
86
- this . log = debug ( debugName )
87
- this . log . err = debug ( `${ debugName } :error` )
88
+ this . log = Object . assign ( debug ( debugName ) , {
89
+ err : debug ( `${ debugName } :error` )
90
+ } )
88
91
89
92
/**
90
93
* @type {Array<string> }
@@ -122,7 +125,7 @@ class PubsubBaseProtocol extends EventEmitter {
122
125
123
126
// validate signature policy
124
127
if ( ! SignaturePolicy [ globalSignaturePolicy ] ) {
125
- throw errcode ( new Error ( 'Invalid global signature policy' ) , codes . ERR_INVALID_SIGUATURE_POLICY )
128
+ throw errcode ( new Error ( 'Invalid global signature policy' ) , codes . ERR_INVALID_SIGNATURE_POLICY )
126
129
}
127
130
128
131
/**
@@ -369,7 +372,7 @@ class PubsubBaseProtocol extends EventEmitter {
369
372
*
370
373
* @param {string } idB58Str
371
374
* @param {PeerStreams } peerStreams
372
- * @param {RPC } rpc
375
+ * @param {RPCM } rpc
373
376
* @returns {boolean }
374
377
*/
375
378
_processRpc ( idB58Str , peerStreams , rpc ) {
@@ -379,7 +382,9 @@ class PubsubBaseProtocol extends EventEmitter {
379
382
380
383
if ( subs . length ) {
381
384
// update peer subscriptions
382
- subs . forEach ( ( subOpt ) => this . _processRpcSubOpt ( idB58Str , subOpt ) )
385
+ subs . forEach ( ( /** @type {RPCSubOpts } */ subOpt ) => {
386
+ this . _processRpcSubOpt ( idB58Str , subOpt )
387
+ } )
383
388
this . emit ( 'pubsub:subscription-change' , peerStreams . id , subs )
384
389
}
385
390
@@ -389,8 +394,9 @@ class PubsubBaseProtocol extends EventEmitter {
389
394
}
390
395
391
396
if ( msgs . length ) {
392
- msgs . forEach ( message => {
393
- if ( ! ( this . canRelayMessage || message . topicIDs . some ( ( topic ) => this . subscriptions . has ( topic ) ) ) ) {
397
+ // @ts -ignore RPC message
398
+ msgs . forEach ( ( message ) => {
399
+ if ( ! ( this . canRelayMessage || message . topicIDs . some ( ( /** @type {string } */ topic ) => this . subscriptions . has ( topic ) ) ) ) {
394
400
this . log ( 'received message we didn\'t subscribe to. Dropping.' )
395
401
return
396
402
}
@@ -499,28 +505,28 @@ class PubsubBaseProtocol extends EventEmitter {
499
505
* This can be override to use a custom router protobuf.
500
506
*
501
507
* @param {Uint8Array } bytes
502
- * @returns {RPC }
508
+ * @returns {RPCM }
503
509
*/
504
510
_decodeRpc ( bytes ) {
505
- return message . rpc . RPC . decode ( bytes )
511
+ return rpc . RPC . decode ( bytes )
506
512
}
507
513
508
514
/**
509
515
* Encode RPC object into a Uint8Array.
510
516
* This can be override to use a custom router protobuf.
511
517
*
512
- * @param {RPC } rpc
518
+ * @param {RPCM } rpc
513
519
* @returns {Uint8Array }
514
520
*/
515
521
_encodeRpc ( rpc ) {
516
- return message . rpc . RPC . encode ( rpc )
522
+ return rpc . RPC . encode ( rpc )
517
523
}
518
524
519
525
/**
520
526
* Send an rpc object to a peer
521
527
*
522
528
* @param {string } id - peer id
523
- * @param {RPC } rpc
529
+ * @param {RPCM } rpc
524
530
* @returns {void }
525
531
*/
526
532
_sendRpc ( id , rpc ) {
@@ -589,7 +595,7 @@ class PubsubBaseProtocol extends EventEmitter {
589
595
for ( const topic of message . topicIDs ) {
590
596
const validatorFn = this . topicValidators . get ( topic )
591
597
if ( ! validatorFn ) {
592
- continue
598
+ continue // eslint-disable-line
593
599
}
594
600
await validatorFn ( topic , message )
595
601
}
@@ -738,7 +744,7 @@ class PubsubBaseProtocol extends EventEmitter {
738
744
}
739
745
}
740
746
741
- PubsubBaseProtocol . message = message
747
+ PubsubBaseProtocol . message = { rpc }
742
748
PubsubBaseProtocol . utils = utils
743
749
PubsubBaseProtocol . SignaturePolicy = SignaturePolicy
744
750
0 commit comments