@@ -38,7 +38,6 @@ import {
38
38
ToSendGroupCount
39
39
} from './metrics.js'
40
40
import {
41
- MessageAcceptance ,
42
41
MsgIdFn ,
43
42
PublishConfig ,
44
43
TopicStr ,
@@ -51,7 +50,6 @@ import {
51
50
FastMsgIdFn ,
52
51
AddrInfo ,
53
52
DataTransform ,
54
- TopicValidatorFn ,
55
53
rejectReasonFromAcceptance ,
56
54
MsgIdToStrFn ,
57
55
MessageId
@@ -61,7 +59,6 @@ import { msgIdFnStrictNoSign, msgIdFnStrictSign } from './utils/msgIdFn.js'
61
59
import { computeAllPeersScoreWeights } from './score/scoreMetrics.js'
62
60
import { getPublishConfigFromPeerId } from './utils/publishConfig.js'
63
61
import type { GossipsubOptsSpec } from './config.js'
64
- import { Components , Initializable } from '@libp2p/components'
65
62
import {
66
63
Message ,
67
64
PublishResult ,
@@ -70,14 +67,18 @@ import {
70
67
PubSubInit ,
71
68
StrictNoSign ,
72
69
StrictSign ,
73
- SubscriptionChangeData
70
+ SubscriptionChangeData ,
71
+ TopicValidatorFn ,
72
+ TopicValidatorResult
74
73
} from '@libp2p/interface-pubsub'
75
- import type { IncomingStreamData } from '@libp2p/interface-registrar'
74
+ import type { IncomingStreamData , Registrar } from '@libp2p/interface-registrar'
76
75
import { removeFirstNItemsFromSet , removeItemsFromSet } from './utils/set.js'
77
76
import { pushable } from 'it-pushable'
78
77
import { InboundStream , OutboundStream } from './stream.js'
79
78
import { Uint8ArrayList } from 'uint8arraylist'
80
79
import { decodeRpc , DecodeRPCLimits , defaultDecodeRpcLimits } from './message/decodeRpc.js'
80
+ import { ConnectionManager } from '@libp2p/interface-connection-manager'
81
+ import { PeerStore } from '@libp2p/interface-peer-store'
81
82
82
83
type ConnectionDirection = 'inbound' | 'outbound'
83
84
@@ -209,7 +210,14 @@ interface AcceptFromWhitelistEntry {
209
210
acceptUntil : number
210
211
}
211
212
212
- export class GossipSub extends EventEmitter < GossipsubEvents > implements Initializable , PubSub < GossipsubEvents > {
213
+ export interface GossipSubComponents {
214
+ peerId : PeerId
215
+ peerStore : PeerStore
216
+ registrar : Registrar
217
+ connectionManager : ConnectionManager
218
+ }
219
+
220
+ export class GossipSub extends EventEmitter < GossipsubEvents > implements PubSub < GossipsubEvents > {
213
221
/**
214
222
* The signature policy to follow by default
215
223
*/
@@ -325,6 +333,12 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
325
333
/** Peer score tracking */
326
334
public readonly score : PeerScore
327
335
336
+ /**
337
+ * Custom validator function per topic.
338
+ * Must return or resolve quickly (< 100ms) to prevent causing penalties for late messages.
339
+ * If you need to apply validation that may require longer times use `asyncValidation` option and callback the
340
+ * validation result through `Gossipsub.reportValidationResult`
341
+ */
328
342
public readonly topicValidators = new Map < TopicStr , TopicValidatorFn > ( )
329
343
330
344
/**
@@ -338,7 +352,7 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
338
352
*/
339
353
readonly gossipTracer : IWantTracer
340
354
341
- private components = new Components ( )
355
+ private readonly components : GossipSubComponents
342
356
343
357
private directPeerInitial : ReturnType < typeof setTimeout > | null = null
344
358
private readonly log : Logger
@@ -361,7 +375,7 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
361
375
cancel : ( ) => void
362
376
} | null = null
363
377
364
- constructor ( options : Partial < GossipsubOpts > = { } ) {
378
+ constructor ( components : GossipSubComponents , options : Partial < GossipsubOpts > = { } ) {
365
379
super ( )
366
380
367
381
const opts = {
@@ -392,6 +406,7 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
392
406
scoreThresholds : createPeerScoreThresholds ( options . scoreThresholds )
393
407
}
394
408
409
+ this . components = components
395
410
this . decodeRpcLimits = opts . decodeRpcLimits ?? defaultDecodeRpcLimits
396
411
397
412
this . globalSignaturePolicy = opts . globalSignaturePolicy ?? StrictSign
@@ -473,7 +488,7 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
473
488
/**
474
489
* libp2p
475
490
*/
476
- this . score = new PeerScore ( this . opts . scoreParams , this . metrics , {
491
+ this . score = new PeerScore ( components , this . opts . scoreParams , this . metrics , {
477
492
scoreCacheValidityMs : opts . heartbeatInterval
478
493
} )
479
494
@@ -493,14 +508,6 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
493
508
494
509
// LIFECYCLE METHODS
495
510
496
- /**
497
- * Pass libp2p components to interested system components
498
- */
499
- async init ( components : Components ) : Promise < void > {
500
- this . components = components
501
- this . score . init ( components )
502
- }
503
-
504
511
/**
505
512
* Mounts the gossipsub protocol onto the libp2p node and sends our
506
513
* our subscriptions to every peer connected
@@ -513,7 +520,7 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
513
520
514
521
this . log ( 'starting' )
515
522
516
- this . publishConfig = await getPublishConfigFromPeerId ( this . globalSignaturePolicy , this . components . getPeerId ( ) )
523
+ this . publishConfig = await getPublishConfigFromPeerId ( this . globalSignaturePolicy , this . components . peerId )
517
524
518
525
// Create the outbound inflight queue
519
526
// This ensures that outbound stream creation happens sequentially
@@ -527,11 +534,11 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
527
534
// set direct peer addresses in the address book
528
535
await Promise . all (
529
536
this . opts . directPeers . map ( async ( p ) => {
530
- await this . components . getPeerStore ( ) . addressBook . add ( p . id , p . addrs )
537
+ await this . components . peerStore . addressBook . add ( p . id , p . addrs )
531
538
} )
532
539
)
533
540
534
- const registrar = this . components . getRegistrar ( )
541
+ const registrar = this . components . registrar
535
542
// Incoming streams
536
543
// Called after a peer dials us
537
544
await Promise . all (
@@ -611,7 +618,7 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
611
618
this . status = { code : GossipStatusCode . stopped }
612
619
613
620
// unregister protocol and handlers
614
- const registrar = this . components . getRegistrar ( )
621
+ const registrar = this . components . registrar
615
622
registrarTopologyIds . forEach ( ( id ) => registrar . unregister ( id ) )
616
623
617
624
this . outboundInflightQueue . end ( )
@@ -1059,7 +1066,7 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
1059
1066
1060
1067
// Dispatch the message to the user if we are subscribed to the topic
1061
1068
if ( this . subscriptions . has ( rpcMsg . topic ) ) {
1062
- const isFromSelf = this . components . getPeerId ( ) . equals ( from )
1069
+ const isFromSelf = this . components . peerId . equals ( from )
1063
1070
1064
1071
if ( ! isFromSelf || this . opts . emitSelf ) {
1065
1072
super . dispatchEvent (
@@ -1146,18 +1153,18 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
1146
1153
// to not penalize peers for long validation times.
1147
1154
const topicValidator = this . topicValidators . get ( rpcMsg . topic )
1148
1155
if ( topicValidator != null ) {
1149
- let acceptance : MessageAcceptance
1156
+ let acceptance : TopicValidatorResult
1150
1157
// Use try {} catch {} in case topicValidator() is synchronous
1151
1158
try {
1152
- acceptance = await topicValidator ( msg . topic , msg , propagationSource )
1159
+ acceptance = await topicValidator ( propagationSource , msg )
1153
1160
} catch ( e ) {
1154
1161
const errCode = ( e as { code : string } ) . code
1155
- if ( errCode === constants . ERR_TOPIC_VALIDATOR_IGNORE ) acceptance = MessageAcceptance . Ignore
1156
- if ( errCode === constants . ERR_TOPIC_VALIDATOR_REJECT ) acceptance = MessageAcceptance . Reject
1157
- else acceptance = MessageAcceptance . Ignore
1162
+ if ( errCode === constants . ERR_TOPIC_VALIDATOR_IGNORE ) acceptance = TopicValidatorResult . Ignore
1163
+ if ( errCode === constants . ERR_TOPIC_VALIDATOR_REJECT ) acceptance = TopicValidatorResult . Reject
1164
+ else acceptance = TopicValidatorResult . Ignore
1158
1165
}
1159
1166
1160
- if ( acceptance !== MessageAcceptance . Accept ) {
1167
+ if ( acceptance !== TopicValidatorResult . Accept ) {
1161
1168
return { code : MessageStatus . invalid , reason : rejectReasonFromAcceptance ( acceptance ) , msgIdStr }
1162
1169
}
1163
1170
}
@@ -1619,7 +1626,7 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
1619
1626
this . log ( "bogus peer record obtained through px: peer ID %p doesn't match expected peer %p" , eid , p )
1620
1627
return
1621
1628
}
1622
- if ( ! ( await this . components . getPeerStore ( ) . addressBook . consumePeerRecord ( envelope ) ) ) {
1629
+ if ( ! ( await this . components . peerStore . addressBook . consumePeerRecord ( envelope ) ) ) {
1623
1630
this . log ( 'bogus peer record obtained through px: could not add peer record to address book' )
1624
1631
return
1625
1632
}
@@ -1643,9 +1650,9 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
1643
1650
private async connect ( id : PeerIdStr ) : Promise < void > {
1644
1651
this . log ( 'Initiating connection with %s' , id )
1645
1652
const peerId = peerIdFromString ( id )
1646
- const connection = await this . components . getConnectionManager ( ) . openConnection ( peerId )
1653
+ const connection = await this . components . connectionManager . openConnection ( peerId )
1647
1654
for ( const multicodec of this . multicodecs ) {
1648
- for ( const topology of this . components . getRegistrar ( ) . getTopologies ( multicodec ) ) {
1655
+ for ( const topology of this . components . registrar . getTopologies ( multicodec ) ) {
1649
1656
topology . onConnect ( peerId , connection )
1650
1657
}
1651
1658
}
@@ -2006,12 +2013,12 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
2006
2013
2007
2014
// Dispatch the message to the user if we are subscribed to the topic
2008
2015
if ( willSendToSelf ) {
2009
- tosend . add ( this . components . getPeerId ( ) . toString ( ) )
2016
+ tosend . add ( this . components . peerId . toString ( ) )
2010
2017
2011
2018
super . dispatchEvent (
2012
2019
new CustomEvent < GossipsubMessage > ( 'gossipsub:message' , {
2013
2020
detail : {
2014
- propagationSource : this . components . getPeerId ( ) ,
2021
+ propagationSource : this . components . peerId ,
2015
2022
msgId : msgIdStr ,
2016
2023
msg
2017
2024
}
@@ -2047,8 +2054,8 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
2047
2054
*
2048
2055
* This should only be called once per message.
2049
2056
*/
2050
- reportMessageValidationResult ( msgId : MsgIdStr , propagationSource : PeerId , acceptance : MessageAcceptance ) : void {
2051
- if ( acceptance === MessageAcceptance . Accept ) {
2057
+ reportMessageValidationResult ( msgId : MsgIdStr , propagationSource : PeerId , acceptance : TopicValidatorResult ) : void {
2058
+ if ( acceptance === TopicValidatorResult . Accept ) {
2052
2059
const cacheEntry = this . mcache . validate ( msgId )
2053
2060
this . metrics ?. onReportValidationMcacheHit ( cacheEntry !== null )
2054
2061
@@ -2340,7 +2347,7 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
2340
2347
2341
2348
return {
2342
2349
peerID : id . toBytes ( ) ,
2343
- signedPeerRecord : await this . components . getPeerStore ( ) . addressBook . getRawEnvelope ( id )
2350
+ signedPeerRecord : await this . components . peerStore . addressBook . getRawEnvelope ( id )
2344
2351
}
2345
2352
} )
2346
2353
)
@@ -2821,3 +2828,9 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
2821
2828
metrics . registerScoreWeights ( sw )
2822
2829
}
2823
2830
}
2831
+
2832
+ export function gossipsub (
2833
+ init : Partial < GossipsubOpts > = { }
2834
+ ) : ( components : GossipSubComponents ) => PubSub < GossipsubEvents > {
2835
+ return ( components : GossipSubComponents ) => new GossipSub ( components , init )
2836
+ }
0 commit comments