Skip to content

Cache peer score #175

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Dec 15, 2021
85 changes: 85 additions & 0 deletions test/accept-from.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
const {expect} = require('chai')
const sinon = require('sinon')
const Gossipsub = require('../src')

describe('Gossipsub acceptFrom', () => {
let gossipsub
let sandbox
let scoreSpy

beforeEach(async () => {
sandbox = sinon.createSandbox()
sandbox.useFakeTimers(Date.now())
gossipsub = new Gossipsub({}, { emitSelf: false })
// stubbing PeerScore causes some pending issue in firefox browser environment
// we can only spy it
// using scoreSpy.withArgs("peerA").calledOnce causes the pending issue in firefox
// while spy.getCall() is fine
scoreSpy = sandbox.spy(gossipsub.score, "score")
})

afterEach(() => {
sandbox.restore()
})

it('should only white list peer with positive score', () => {
// by default the score is 0
gossipsub._acceptFrom("peerA")
// 1st time, we have to compute score
expect(scoreSpy.getCall(0).args[0]).to.be.equal("peerA")
expect(scoreSpy.getCall(0).returnValue).to.be.equal(0)
expect(scoreSpy.getCall(1)).to.be.undefined
// 2nd time, use a cached score since it's white listed
gossipsub._acceptFrom("peerA")
expect(scoreSpy.getCall(1)).to.be.undefined
})

it('should recompute score after 1s', () => {
// by default the score is 0
gossipsub._acceptFrom("peerA")
// 1st time, we have to compute score
expect(scoreSpy.getCall(0).args[0]).to.be.equal("peerA")
expect(scoreSpy.getCall(1)).to.be.undefined
gossipsub._acceptFrom("peerA")
// score is cached
expect(scoreSpy.getCall(1)).to.be.undefined

// after 1s
sandbox.clock.tick(1001)

gossipsub._acceptFrom("peerA")
expect(scoreSpy.getCall(1).args[0]).to.be.equal("peerA")
expect(scoreSpy.getCall(2)).to.be.undefined
})

it('should recompute score after max messages accepted', () => {
// by default the score is 0
gossipsub._acceptFrom("peerA")
// 1st time, we have to compute score
expect(scoreSpy.getCall(0).args[0]).to.be.equal("peerA")
expect(scoreSpy.getCall(1)).to.be.undefined

for (let i = 0; i < 128; i++) {
gossipsub._acceptFrom("peerA")
}
expect(scoreSpy.getCall(1)).to.be.undefined

// max messages reached
gossipsub._acceptFrom("peerA")
expect(scoreSpy.getCall(1).args[0]).to.be.equal("peerA")
expect(scoreSpy.getCall(2)).to.be.undefined
})

// TODO: run this in a unit test setup
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there's an issue that sinon causes the test to never finish in firefox environment. I think current test set up is like an e2e test where we'll run it in different OSs and browsers, and sinon is not expected to run on it, we need a separate test:unit where we can stub any functions

// this causes the test to not finish in firefox environment
it.skip('should NOT white list peer with negative score', () => {
// peerB is not white listed since score is negative
scoreStub.score.withArgs("peerB").returns(-1)
gossipsub._acceptFrom("peerB")
// 1st time, we have to compute score
expect(scoreStub.score.withArgs("peerB").calledOnce).to.be.true
// 2nd time, still have to compute score since it's NOT white listed
gossipsub._acceptFrom("peerB")
expect(scoreStub.score.withArgs("peerB").calledTwice).to.be.true
})
})
70 changes: 69 additions & 1 deletion test/peer-score.spec.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
const sinon = require('sinon')
const { expect } = require('chai')
const PeerId = require('peer-id')
const delay = require('delay')

const { PeerScore, createPeerScoreParams, createTopicScoreParams } = require('../src/score')
const computeScoreModule = require('../src/score/compute-score')
const { ERR_TOPIC_VALIDATOR_IGNORE, ERR_TOPIC_VALIDATOR_REJECT } = require('../src/constants')
const { makeTestMessage, getMsgId } = require('./utils')

Expand Down Expand Up @@ -642,7 +644,6 @@ describe('PeerScore', () => {
const ps = new PeerScore(params, connectionManager, getMsgId)
ps.addPeer(peerA)
ps.graft(peerA, mytopic)

// score should equal -1000 (app-specific score)
const expected = -1000
ps._refreshScores()
Expand All @@ -665,3 +666,70 @@ describe('PeerScore', () => {
expect(aScore).to.equal(0)
})
})

describe('PeerScore score cache', function () {
const peerA = '16Uiu2HAmMkH6ZLen2tbhiuNCTZLLvrZaDgufNdT5MPjtC9Hr9YNG'
let sandbox
let computeStoreStub
const params = createPeerScoreParams({
appSpecificScore: () => -1000,
appSpecificWeight: 1,
retainScore: 800,
decayInterval: 1000,
topics: {a: {topicWeight: 10}}
})
const ps2 = new PeerScore(params, connectionManager, getMsgId)

beforeEach(() => {
sandbox = sinon.createSandbox()
const now = Date.now()
sandbox.useFakeTimers(now)
computeStoreStub = sandbox.stub(computeScoreModule, 'computeScore')
})

afterEach(() => {
sandbox.restore()
})

it('should compute first time', function () {
computeStoreStub.returns(10)
ps2.addPeer(peerA)
expect(computeStoreStub.calledOnce).to.be.false
ps2.score(peerA)
expect(computeStoreStub.calledOnce).to.be.true
// this time peerA score is cached
ps2.score(peerA)
expect(computeStoreStub.calledOnce).to.be.true
})

const testCases = [
{name: 'decayInterval timeout', fun: () => sandbox.clock.tick(params.decayInterval)},
{name: '_refreshScores', fun: () => ps2._refreshScores()},
{name: 'addPenalty', fun: () => ps2.addPenalty(peerA, 10)},
{name: 'graft', fun: () => ps2.graft(peerA, 'a')},
{name: 'prune', fun: () => ps2.prune(peerA, 'a')},
{name: '_markInvalidMessageDelivery', fun: () => ps2._markInvalidMessageDelivery(peerA, {topicIDs: ['a']})},
{name: '_markFirstMessageDelivery', fun: () => ps2._markFirstMessageDelivery(peerA, {topicIDs: ['a']})},
{name: '_markDuplicateMessageDelivery', fun: () => ps2._markDuplicateMessageDelivery(peerA, {topicIDs: ['a']})},
{name: '_setIPs', fun: () => ps2._setIPs(peerA, [], ['127.0.0.1'])},
{name: '_removeIPs', fun: () => ps2._removeIPs(peerA, ['127.0.0.1'])},
{name: '_updateIPs', fun: () => ps2._updateIPs()},
]

for (const {name, fun} of testCases) {
it(`should invalidate the cache after ${name}`, function () {
computeStoreStub.returns(10)
ps2.addPeer(peerA)
ps2.score(peerA)
expect(computeStoreStub.calledOnce).to.be.true
// the score is cached
ps2.score(peerA)
expect(computeStoreStub.calledOnce).to.be.true
// invalidate the cache
fun()
// should not use the cache
ps2.score(peerA)
expect(computeStoreStub.calledTwice).to.be.true
})
}
})
18 changes: 18 additions & 0 deletions ts/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -220,3 +220,21 @@ export const TimeCacheDuration = 120 * 1000

export const ERR_TOPIC_VALIDATOR_REJECT = 'ERR_TOPIC_VALIDATOR_REJECT'
export const ERR_TOPIC_VALIDATOR_IGNORE = 'ERR_TOPIC_VALIDATOR_IGNORE'

/**
* If peer score is better than this, we accept messages from this peer
* within ACCEPT_FROM_WHITELIST_DURATION_MS from the last time computing score.
**/
export const ACCEPT_FROM_WHITELIST_THRESHOLD_SCORE = 0

/**
* If peer score >= ACCEPT_FROM_WHITELIST_THRESHOLD_SCORE, accept up to this
* number of messages from that peer.
*/
export const ACCEPT_FROM_WHITELIST_MAX_MESSAGES = 128

/**
* If peer score >= ACCEPT_FROM_WHITELIST_THRESHOLD_SCORE, accept messages from
* this peer up to this time duration.
*/
export const ACCEPT_FROM_WHITELIST_DURATION_MS = 1000
46 changes: 45 additions & 1 deletion ts/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import PeerId = require('peer-id')
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
import Envelope = require('libp2p/src/record/envelope')
import { ACCEPT_FROM_WHITELIST_DURATION_MS, ACCEPT_FROM_WHITELIST_MAX_MESSAGES, ACCEPT_FROM_WHITELIST_THRESHOLD_SCORE } from './constants'

interface GossipInputOptions {
emitSelf: boolean
Expand Down Expand Up @@ -84,10 +85,18 @@ interface GossipOptions extends GossipInputOptions {
scoreThresholds: PeerScoreThresholds
}

interface AcceptFromWhitelistEntry {
/** number of messages accepted since recomputing the peer's score */
messagesAccepted: number
/** have to recompute score after this time */
acceptUntil: number
}

class Gossipsub extends Pubsub {
peers: Map<string, PeerStreams>
direct: Set<string>
seenCache: SimpleTimeCache
acceptFromWhitelist: Map<string, AcceptFromWhitelistEntry>
topics: Map<string, Set<string>>
mesh: Map<string, Set<string>>
fanout: Map<string, Set<string>>
Expand Down Expand Up @@ -182,6 +191,13 @@ class Gossipsub extends Pubsub {
*/
this.direct = new Set(opts.directPeers.map(p => p.id.toB58String()))

/**
* Map of peer id and AcceptRequestWhileListEntry
*
* @type {Map<string, AcceptFromWhitelistEntry}
*/
this.acceptFromWhitelist = new Map()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need a place where these entries get routinely pruned so they don't build up.
Maybe you can prune old entries on every N heartbeats

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also remove the peer's entry in _removePeer

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wemeetagain so I removed it in _removePeer but regarding this

I think we need a place where these entries get routinely pruned so they don't build up.
Maybe you can prune old entries on every N heartbeats

I don't think it's necessary as we only do the prune in _removePeer() function for other map as well (unless there's already a leak somewhere, we'll have to do the prune regularly for all of the map). In acceptFrom() we also do the prune if the score is negative


// set direct peer addresses in the address book
opts.directPeers.forEach(p => {
libp2p.peerStore.addressBook.add(p.id, p.addrs)
Expand Down Expand Up @@ -374,6 +390,8 @@ class Gossipsub extends Pubsub {
// Remove from peer scoring
this.score.removePeer(id)

this.acceptFromWhitelist.delete(id)

return peerStreams
}

Expand Down Expand Up @@ -449,7 +467,33 @@ class Gossipsub extends Pubsub {
* @returns {boolean}
*/
_acceptFrom (id: string): boolean {
return this.direct.has(id) || this.score.score(id) >= this._options.scoreThresholds.graylistThreshold
if (this.direct.has(id)) {
return true
}

const now = Date.now()
const entry = this.acceptFromWhitelist.get(id)

if (entry &&
entry.messagesAccepted < ACCEPT_FROM_WHITELIST_MAX_MESSAGES &&
entry.acceptUntil >= now) {
entry.messagesAccepted += 1
return true
}

const score = this.score.score(id)
if (score >= ACCEPT_FROM_WHITELIST_THRESHOLD_SCORE) {
// peer is unlikely to be able to drop its score to `graylistThreshold`
// after 128 messages or 1s
this.acceptFromWhitelist.set(id, {
messagesAccepted: 0,
acceptUntil: now + ACCEPT_FROM_WHITELIST_DURATION_MS
})
} else {
this.acceptFromWhitelist.delete(id)
}

return score >= this._options.scoreThresholds.graylistThreshold
}

/**
Expand Down
Loading