Skip to content

fix!: remove @libp2p/components #360

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions .github/.dependabot.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
version: 2
updates:
- package-ecosystem: "npm"
allow:
# Allow both direct and indirect updates for all packages
- dependency-type: "production"
commit-message:
prefix: "chore: "
- package-ecosystem: npm
directory: "/"
schedule:
interval: daily
time: "10:00"
open-pull-requests-limit: 10
commit-message:
prefix: "deps"
prefix-development: "deps(dev)"
999 changes: 702 additions & 297 deletions package-lock.json

Large diffs are not rendered by default.

21 changes: 11 additions & 10 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -65,38 +65,39 @@
},
"homepage": "https://github.com/ChainSafe/js-libp2p-gossipsub#readme",
"dependencies": {
"@libp2p/components": "^2.0.3",
"@libp2p/crypto": "^1.0.3",
"@libp2p/interface-connection": "^3.0.1",
"@libp2p/interface-connection-manager": "^1.3.0",
"@libp2p/interface-keys": "^1.0.3",
"@libp2p/interface-peer-id": "^1.0.4",
"@libp2p/interface-pubsub": "^2.0.1",
"@libp2p/interface-peer-store": "^1.2.2",
"@libp2p/interface-pubsub": "^3.0.0",
"@libp2p/interface-registrar": "^2.0.3",
"@libp2p/interfaces": "^3.0.3",
"@libp2p/logger": "^2.0.0",
"@libp2p/peer-id": "^1.1.15",
"@libp2p/peer-record": "^4.0.1",
"@libp2p/pubsub": "^3.1.2",
"@libp2p/pubsub": "^5.0.0",
"@libp2p/topology": "^3.0.0",
"abortable-iterator": "^4.0.2",
"denque": "^1.5.0",
"err-code": "^3.0.1",
"it-length-prefixed": "^8.0.2",
"it-pipe": "^2.0.4",
"it-pushable": "^3.1.0",
"multiformats": "^9.6.4",
"multiformats": "^10.0.0",
"protobufjs": "^6.11.2",
"uint8arraylist": "^2.3.2",
"uint8arrays": "^3.0.0"
"uint8arrays": "^4.0.2"
},
"devDependencies": {
"@chainsafe/as-sha256": "^0.2.4",
"@dapplion/benchmark": "^0.2.2",
"@libp2p/floodsub": "^3.0.5",
"@libp2p/interface-mocks": "^4.0.1",
"@libp2p/interface-pubsub-compliance-tests": "^2.0.2",
"@libp2p/floodsub": "^5.0.0",
"@libp2p/interface-mocks": "^7.0.1",
"@libp2p/interface-pubsub-compliance-tests": "^4.0.0",
"@libp2p/peer-id-factory": "^1.0.18",
"@libp2p/peer-store": "^3.1.2",
"@libp2p/peer-store": "^5.0.0",
"@multiformats/multiaddr": "^11.0.0",
"@types/node": "^17.0.21",
"@typescript-eslint/eslint-plugin": "^3.0.2",
Expand All @@ -115,6 +116,7 @@
"eslint-plugin-standard": "^4.0.1",
"it-pair": "^2.0.2",
"lodash": "^4.17.15",
"mkdirp": "^1.0.4",
"os": "^0.1.1",
"p-defer": "^4.0.0",
"p-event": "^5.0.1",
Expand All @@ -127,7 +129,6 @@
"time-cache": "^0.3.0",
"ts-node": "^10.7.0",
"ts-sinon": "^2.0.2",
"mkdirp": "^1.0.4",
"util": "^0.12.3"
},
"engines": {
Expand Down
85 changes: 49 additions & 36 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import {
ToSendGroupCount
} from './metrics.js'
import {
MessageAcceptance,
MsgIdFn,
PublishConfig,
TopicStr,
Expand All @@ -51,7 +50,6 @@ import {
FastMsgIdFn,
AddrInfo,
DataTransform,
TopicValidatorFn,
rejectReasonFromAcceptance,
MsgIdToStrFn,
MessageId
Expand All @@ -61,7 +59,6 @@ import { msgIdFnStrictNoSign, msgIdFnStrictSign } from './utils/msgIdFn.js'
import { computeAllPeersScoreWeights } from './score/scoreMetrics.js'
import { getPublishConfigFromPeerId } from './utils/publishConfig.js'
import type { GossipsubOptsSpec } from './config.js'
import { Components, Initializable } from '@libp2p/components'
import {
Message,
PublishResult,
Expand All @@ -70,14 +67,18 @@ import {
PubSubInit,
StrictNoSign,
StrictSign,
SubscriptionChangeData
SubscriptionChangeData,
TopicValidatorFn,
TopicValidatorResult
} from '@libp2p/interface-pubsub'
import type { IncomingStreamData } from '@libp2p/interface-registrar'
import type { IncomingStreamData, Registrar } from '@libp2p/interface-registrar'
import { removeFirstNItemsFromSet, removeItemsFromSet } from './utils/set.js'
import { pushable } from 'it-pushable'
import { InboundStream, OutboundStream } from './stream.js'
import { Uint8ArrayList } from 'uint8arraylist'
import { decodeRpc, DecodeRPCLimits, defaultDecodeRpcLimits } from './message/decodeRpc.js'
import { ConnectionManager } from '@libp2p/interface-connection-manager'
import { PeerStore } from '@libp2p/interface-peer-store'

type ConnectionDirection = 'inbound' | 'outbound'

Expand Down Expand Up @@ -209,7 +210,14 @@ interface AcceptFromWhitelistEntry {
acceptUntil: number
}

export class GossipSub extends EventEmitter<GossipsubEvents> implements Initializable, PubSub<GossipsubEvents> {
export interface GossipSubComponents {
peerId: PeerId
peerStore: PeerStore
registrar: Registrar
connectionManager: ConnectionManager
}

export class GossipSub extends EventEmitter<GossipsubEvents> implements PubSub<GossipsubEvents> {
/**
* The signature policy to follow by default
*/
Expand Down Expand Up @@ -325,6 +333,12 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
/** Peer score tracking */
public readonly score: PeerScore

/**
* Custom validator function per topic.
* Must return or resolve quickly (< 100ms) to prevent causing penalties for late messages.
* If you need to apply validation that may require longer times use `asyncValidation` option and callback the
* validation result through `Gossipsub.reportValidationResult`
*/
public readonly topicValidators = new Map<TopicStr, TopicValidatorFn>()

/**
Expand All @@ -338,7 +352,7 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
*/
readonly gossipTracer: IWantTracer

private components = new Components()
private readonly components: GossipSubComponents

private directPeerInitial: ReturnType<typeof setTimeout> | null = null
private readonly log: Logger
Expand All @@ -361,7 +375,7 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
cancel: () => void
} | null = null

constructor(options: Partial<GossipsubOpts> = {}) {
constructor(components: GossipSubComponents, options: Partial<GossipsubOpts> = {}) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a breaking change?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, as noted in the PR title & description

super()

const opts = {
Expand Down Expand Up @@ -392,6 +406,7 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
scoreThresholds: createPeerScoreThresholds(options.scoreThresholds)
}

this.components = components
this.decodeRpcLimits = opts.decodeRpcLimits ?? defaultDecodeRpcLimits

this.globalSignaturePolicy = opts.globalSignaturePolicy ?? StrictSign
Expand Down Expand Up @@ -473,7 +488,7 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
/**
* libp2p
*/
this.score = new PeerScore(this.opts.scoreParams, this.metrics, {
this.score = new PeerScore(components, this.opts.scoreParams, this.metrics, {
scoreCacheValidityMs: opts.heartbeatInterval
})

Expand All @@ -493,14 +508,6 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali

// LIFECYCLE METHODS

/**
* Pass libp2p components to interested system components
*/
async init(components: Components): Promise<void> {
this.components = components
this.score.init(components)
}

/**
* Mounts the gossipsub protocol onto the libp2p node and sends our
* our subscriptions to every peer connected
Expand All @@ -513,7 +520,7 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali

this.log('starting')

this.publishConfig = await getPublishConfigFromPeerId(this.globalSignaturePolicy, this.components.getPeerId())
this.publishConfig = await getPublishConfigFromPeerId(this.globalSignaturePolicy, this.components.peerId)

// Create the outbound inflight queue
// This ensures that outbound stream creation happens sequentially
Expand All @@ -527,11 +534,11 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
// set direct peer addresses in the address book
await Promise.all(
this.opts.directPeers.map(async (p) => {
await this.components.getPeerStore().addressBook.add(p.id, p.addrs)
await this.components.peerStore.addressBook.add(p.id, p.addrs)
})
)

const registrar = this.components.getRegistrar()
const registrar = this.components.registrar
// Incoming streams
// Called after a peer dials us
await Promise.all(
Expand Down Expand Up @@ -611,7 +618,7 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
this.status = { code: GossipStatusCode.stopped }

// unregister protocol and handlers
const registrar = this.components.getRegistrar()
const registrar = this.components.registrar
registrarTopologyIds.forEach((id) => registrar.unregister(id))

this.outboundInflightQueue.end()
Expand Down Expand Up @@ -1059,7 +1066,7 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali

// Dispatch the message to the user if we are subscribed to the topic
if (this.subscriptions.has(rpcMsg.topic)) {
const isFromSelf = this.components.getPeerId().equals(from)
const isFromSelf = this.components.peerId.equals(from)

if (!isFromSelf || this.opts.emitSelf) {
super.dispatchEvent(
Expand Down Expand Up @@ -1146,18 +1153,18 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
// to not penalize peers for long validation times.
const topicValidator = this.topicValidators.get(rpcMsg.topic)
if (topicValidator != null) {
let acceptance: MessageAcceptance
let acceptance: TopicValidatorResult
// Use try {} catch {} in case topicValidator() is synchronous
try {
acceptance = await topicValidator(msg.topic, msg, propagationSource)
acceptance = await topicValidator(propagationSource, msg)
} catch (e) {
const errCode = (e as { code: string }).code
if (errCode === constants.ERR_TOPIC_VALIDATOR_IGNORE) acceptance = MessageAcceptance.Ignore
if (errCode === constants.ERR_TOPIC_VALIDATOR_REJECT) acceptance = MessageAcceptance.Reject
else acceptance = MessageAcceptance.Ignore
if (errCode === constants.ERR_TOPIC_VALIDATOR_IGNORE) acceptance = TopicValidatorResult.Ignore
if (errCode === constants.ERR_TOPIC_VALIDATOR_REJECT) acceptance = TopicValidatorResult.Reject
else acceptance = TopicValidatorResult.Ignore
}

if (acceptance !== MessageAcceptance.Accept) {
if (acceptance !== TopicValidatorResult.Accept) {
return { code: MessageStatus.invalid, reason: rejectReasonFromAcceptance(acceptance), msgIdStr }
}
}
Expand Down Expand Up @@ -1619,7 +1626,7 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
this.log("bogus peer record obtained through px: peer ID %p doesn't match expected peer %p", eid, p)
return
}
if (!(await this.components.getPeerStore().addressBook.consumePeerRecord(envelope))) {
if (!(await this.components.peerStore.addressBook.consumePeerRecord(envelope))) {
this.log('bogus peer record obtained through px: could not add peer record to address book')
return
}
Expand All @@ -1643,9 +1650,9 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
private async connect(id: PeerIdStr): Promise<void> {
this.log('Initiating connection with %s', id)
const peerId = peerIdFromString(id)
const connection = await this.components.getConnectionManager().openConnection(peerId)
const connection = await this.components.connectionManager.openConnection(peerId)
for (const multicodec of this.multicodecs) {
for (const topology of this.components.getRegistrar().getTopologies(multicodec)) {
for (const topology of this.components.registrar.getTopologies(multicodec)) {
topology.onConnect(peerId, connection)
}
}
Expand Down Expand Up @@ -2006,12 +2013,12 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali

// Dispatch the message to the user if we are subscribed to the topic
if (willSendToSelf) {
tosend.add(this.components.getPeerId().toString())
tosend.add(this.components.peerId.toString())

super.dispatchEvent(
new CustomEvent<GossipsubMessage>('gossipsub:message', {
detail: {
propagationSource: this.components.getPeerId(),
propagationSource: this.components.peerId,
msgId: msgIdStr,
msg
}
Expand Down Expand Up @@ -2047,8 +2054,8 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
*
* This should only be called once per message.
*/
reportMessageValidationResult(msgId: MsgIdStr, propagationSource: PeerId, acceptance: MessageAcceptance): void {
if (acceptance === MessageAcceptance.Accept) {
reportMessageValidationResult(msgId: MsgIdStr, propagationSource: PeerId, acceptance: TopicValidatorResult): void {
if (acceptance === TopicValidatorResult.Accept) {
const cacheEntry = this.mcache.validate(msgId)
this.metrics?.onReportValidationMcacheHit(cacheEntry !== null)

Expand Down Expand Up @@ -2340,7 +2347,7 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali

return {
peerID: id.toBytes(),
signedPeerRecord: await this.components.getPeerStore().addressBook.getRawEnvelope(id)
signedPeerRecord: await this.components.peerStore.addressBook.getRawEnvelope(id)
}
})
)
Expand Down Expand Up @@ -2821,3 +2828,9 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
metrics.registerScoreWeights(sw)
}
}

export function gossipsub(
init: Partial<GossipsubOpts> = {}
): (components: GossipSubComponents) => PubSub<GossipsubEvents> {
return (components: GossipSubComponents) => new GossipSub(components, init)
}
15 changes: 4 additions & 11 deletions src/metrics.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,7 @@
import { TopicValidatorResult } from '@libp2p/interface-pubsub'
import type { IRPC } from './message/rpc.js'
import type { PeerScoreThresholds } from './score/peer-score-thresholds.js'
import {
MessageAcceptance,
MessageStatus,
PeerIdStr,
RejectReason,
RejectReasonObj,
TopicStr,
ValidateError
} from './types.js'
import { MessageStatus, PeerIdStr, RejectReason, RejectReasonObj, TopicStr, ValidateError } from './types.js'

/** Topic label as provided in `topicStrToLabel` */
export type TopicLabel = string
Expand Down Expand Up @@ -241,7 +234,7 @@ export function getMetrics(
/** Message validation results for each topic.
* Invalid == Reject?
* = rust-libp2p `invalid_messages`, `accepted_messages`, `ignored_messages`, `rejected_messages` */
asyncValidationResult: register.gauge<{ topic: TopicLabel; acceptance: MessageAcceptance }>({
asyncValidationResult: register.gauge<{ topic: TopicLabel; acceptance: TopicValidatorResult }>({
name: 'gossipsub_async_validation_result_total',
help: 'Message validation result for each topic',
labelNames: ['topic', 'acceptance']
Expand Down Expand Up @@ -543,7 +536,7 @@ export function getMetrics(
this.asyncValidationMcacheHit.inc({ hit: hit ? 'hit' : 'miss' })
},

onReportValidation(topicStr: TopicStr, acceptance: MessageAcceptance): void {
onReportValidation(topicStr: TopicStr, acceptance: TopicValidatorResult): void {
const topic = this.toTopic(topicStr)
this.asyncValidationResult.inc({ topic: topic, acceptance })
},
Expand Down
Loading