From e64e58127b415ac5d4f359a0b4090de6e287a094 Mon Sep 17 00:00:00 2001 From: Anton Nashatyrev Date: Wed, 24 Mar 2021 20:11:32 +0300 Subject: [PATCH 01/12] Cache PeerHandler.getIP() --- .../io/libp2p/pubsub/gossip/GossipRouter.kt | 4 ---- .../io/libp2p/pubsub/gossip/GossipScore.kt | 16 ++++++++++++++-- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt b/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt index 3067b9ee2..5c17a5bb6 100644 --- a/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt +++ b/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt @@ -2,7 +2,6 @@ package io.libp2p.pubsub.gossip import io.libp2p.core.InternalErrorException import io.libp2p.core.PeerId -import io.libp2p.core.multiformats.Protocol import io.libp2p.core.pubsub.ValidationResult import io.libp2p.etc.types.anyComplete import io.libp2p.etc.types.copy @@ -35,9 +34,6 @@ const val MaxIAskedEntries = 256 const val MaxPeerIHaveEntries = 256 const val MaxIWantRequestsEntries = 10 * 1024 -fun P2PService.PeerHandler.getIP(): String? = - streamHandler.stream.connection.remoteAddress().getStringComponent(Protocol.IP4) - fun P2PService.PeerHandler.isOutbound() = streamHandler.stream.connection.isInitiator fun P2PService.PeerHandler.getPeerProtocol(): PubsubProtocol { diff --git a/src/main/kotlin/io/libp2p/pubsub/gossip/GossipScore.kt b/src/main/kotlin/io/libp2p/pubsub/gossip/GossipScore.kt index 0e546e65a..84b4ac63e 100644 --- a/src/main/kotlin/io/libp2p/pubsub/gossip/GossipScore.kt +++ b/src/main/kotlin/io/libp2p/pubsub/gossip/GossipScore.kt @@ -1,6 +1,7 @@ package io.libp2p.pubsub.gossip import io.libp2p.core.PeerId +import io.libp2p.core.multiformats.Protocol import io.libp2p.core.pubsub.ValidationResult import io.libp2p.etc.types.cappedDouble import io.libp2p.etc.types.createLRUMap @@ -16,6 +17,9 @@ import kotlin.math.max import kotlin.math.min import kotlin.math.pow +fun P2PService.PeerHandler.getIP(): String? = + streamHandler.stream.connection.remoteAddress().getStringComponent(Protocol.IP4) + class GossipScore( val params: GossipScoreParams = GossipScoreParams(), val executor: ScheduledExecutorService, @@ -101,6 +105,7 @@ class GossipScore( private val validationTime: MutableMap = createLRUMap(1024) val peerScores = mutableMapOf() + private val peerIpCache = mutableMapOf() val refreshTask: ScheduledFuture<*> @@ -112,6 +117,8 @@ class GossipScore( private fun getPeerScores(peer: P2PService.PeerHandler) = peerScores.computeIfAbsent(peer.peerId) { PeerScores() } + private fun getPeerIp(peer: P2PService.PeerHandler): String? = peerIpCache[peer.peerId] + private fun getTopicScores(peer: P2PService.PeerHandler, topic: Topic) = getPeerScores(peer).topicScores.computeIfAbsent(topic) { TopicScores(it) } @@ -133,7 +140,7 @@ class GossipScore( ) val appScore = peerParams.appSpecificScore(peer.peerId) * peerParams.appSpecificWeight - val peersInIp: Int = peer.getIP()?.let { thisIp -> + val peersInIp: Int = getPeerIp(peer)?.let { thisIp -> if (peerParams.ipWhitelisted(thisIp)) 0 else peerScores.values.count { thisIp in it.ips } } ?: 0 @@ -164,12 +171,17 @@ class GossipScore( } getPeerScores(peer).disconnectedTimeMillis = curTimeMillis() + peerIpCache -= peer.peerId } fun notifyConnected(peer: P2PService.PeerHandler) { + peer.getIP()?.also { peerIp -> + peerIpCache[peer.peerId] = peerIp + } + getPeerScores(peer).apply { connectedTimeMillis = curTimeMillis() - peer.getIP()?.also { ips += it } + getPeerIp(peer)?.also { ips += it } } } From 4e784e9ec28a83d9301783fbe29cf6559ea21c1a Mon Sep 17 00:00:00 2001 From: Anton Nashatyrev Date: Thu, 25 Mar 2021 15:10:11 +0300 Subject: [PATCH 02/12] Cache topic score unless any of input values changed --- .../kotlin/io/libp2p/etc/types/Delegates.kt | 23 ++++++++--- .../io/libp2p/pubsub/gossip/GossipScore.kt | 41 ++++++++++++++++--- .../io/libp2p/etc/types/DelegatesTest.kt | 3 +- 3 files changed, 54 insertions(+), 13 deletions(-) diff --git a/src/main/kotlin/io/libp2p/etc/types/Delegates.kt b/src/main/kotlin/io/libp2p/etc/types/Delegates.kt index e44335659..10f7f44b5 100644 --- a/src/main/kotlin/io/libp2p/etc/types/Delegates.kt +++ b/src/main/kotlin/io/libp2p/etc/types/Delegates.kt @@ -22,16 +22,20 @@ fun > cappedVar(value: T, lowerBound: T, upperBound: T) = /** * Creates a Double delegate which may drop value to [0.0] when the new value is less than [decayToZero] */ -fun cappedDouble(value: Double, decayToZero: Double = Double.MIN_VALUE): CappedValueDelegate { - return cappedDouble(value, decayToZero) { Double.MAX_VALUE } +fun cappedDouble(value: Double, decayToZero: Double = Double.MIN_VALUE, updateListener: (Double) -> Unit = { }): CappedValueDelegate { + return cappedDouble(value, decayToZero, { Double.MAX_VALUE }, updateListener) } /** * Creates a Double delegate which may cap upper bound (set [upperBound] when the new value is greater) * and may drop value to [0.0] when the new value is less than [decayToZero] */ -fun cappedDouble(value: Double, decayToZero: Double = Double.MIN_VALUE, upperBound: () -> Double) = - CappedValueDelegate(value, { decayToZero }, { 0.0 }, upperBound, upperBound) +fun cappedDouble( + value: Double, + decayToZero: Double = Double.MIN_VALUE, + upperBound: () -> Double, + updateListener: (Double) -> Unit = { } +) = CappedValueDelegate(value, { decayToZero }, { 0.0 }, upperBound, upperBound, updateListener) // thanks to https://stackoverflow.com/a/47948047/9630725 class LazyMutable(val initializer: () -> T, val rejectSetAfterGet: Boolean = false) : ReadWriteProperty { @@ -64,16 +68,25 @@ data class CappedValueDelegate>( private val lowerBound: () -> C, private val lowerBoundVal: () -> C = lowerBound, private val upperBound: () -> C, - private val upperBoundVal: () -> C = upperBound + private val upperBoundVal: () -> C = upperBound, + private val updateListener: (C) -> Unit = { } ) : ReadWriteProperty { override fun getValue(thisRef: Any?, property: KProperty<*>): C { + val oldValue = value val v1 = if (value > upperBound()) upperBoundVal() else value value = if (value < lowerBound()) lowerBoundVal() else v1 + if (oldValue != value) { + updateListener(value) + } return value } override fun setValue(thisRef: Any?, property: KProperty<*>, value: C) { + val oldValue = value this.value = value + if (oldValue != value) { + updateListener(value) + } } } diff --git a/src/main/kotlin/io/libp2p/pubsub/gossip/GossipScore.kt b/src/main/kotlin/io/libp2p/pubsub/gossip/GossipScore.kt index 84b4ac63e..042207af9 100644 --- a/src/main/kotlin/io/libp2p/pubsub/gossip/GossipScore.kt +++ b/src/main/kotlin/io/libp2p/pubsub/gossip/GossipScore.kt @@ -6,6 +6,7 @@ import io.libp2p.core.pubsub.ValidationResult import io.libp2p.etc.types.cappedDouble import io.libp2p.etc.types.createLRUMap import io.libp2p.etc.types.millis +import io.libp2p.etc.types.seconds import io.libp2p.etc.util.P2PService import io.libp2p.pubsub.PubsubMessage import io.libp2p.pubsub.Topic @@ -27,22 +28,43 @@ class GossipScore( ) { inner class TopicScores(val topic: Topic) { + private val recalcMaxDuration = 1.seconds private val params: GossipTopicScoreParams get() = topicParams[topic] + private var cachedScore: Double = 0.0 + private var cacheValid: Boolean = false + private var prevParams = params + private var prevTime = curTimeMillis() var joinedMeshTimeMillis: Long = 0 + set(value) { + field = value + cacheValid = false + } + var firstMessageDeliveries: Double by cappedDouble( 0.0, this@GossipScore.peerParams.decayToZero, - { params.firstMessageDeliveriesCap } + { params.firstMessageDeliveriesCap }, + { cacheValid = false } ) var meshMessageDeliveries: Double by cappedDouble( 0.0, this@GossipScore.peerParams.decayToZero, - { params.meshMessageDeliveriesCap } + { params.meshMessageDeliveriesCap }, + { cacheValid = false } + ) + var meshFailurePenalty: Double by cappedDouble( + 0.0, + this@GossipScore.peerParams.decayToZero, + { _ -> cacheValid = false } + ) + + var invalidMessages: Double by cappedDouble( + 0.0, + this@GossipScore.peerParams.decayToZero, + { _ -> cacheValid = false } ) - var meshFailurePenalty: Double by cappedDouble(0.0, this@GossipScore.peerParams.decayToZero) - var invalidMessages: Double by cappedDouble(0.0, this@GossipScore.peerParams.decayToZero) fun inMesh() = joinedMeshTimeMillis > 0 @@ -62,19 +84,26 @@ class GossipScore( fun meshMessageDeliveriesDeficitSqr() = meshMessageDeliveriesDeficit().pow(2) fun calcTopicScore(): Double { + val curTime = curTimeMillis(); + if (cacheValid && prevParams === params && curTime - prevTime < recalcMaxDuration.toMillis()) { + return cachedScore + } + prevParams = params + prevTime = curTime val p1 = meshTimeNorm() val p2 = firstMessageDeliveries val p3 = meshMessageDeliveriesDeficitSqr() val p3b = meshFailurePenalty val p4 = invalidMessages.pow(2) - val ret = params.topicWeight * ( + cachedScore = params.topicWeight * ( p1 * params.timeInMeshWeight + p2 * params.firstMessageDeliveriesWeight + p3 * params.meshMessageDeliveriesWeight + p3b * params.meshFailurePenaltyWeight + p4 * params.invalidMessageDeliveriesWeight ) - return ret + cacheValid = true + return cachedScore } fun decayScores() { diff --git a/src/test/kotlin/io/libp2p/etc/types/DelegatesTest.kt b/src/test/kotlin/io/libp2p/etc/types/DelegatesTest.kt index ab1551fa6..c7ea3dbd1 100644 --- a/src/test/kotlin/io/libp2p/etc/types/DelegatesTest.kt +++ b/src/test/kotlin/io/libp2p/etc/types/DelegatesTest.kt @@ -14,8 +14,7 @@ class DelegatesTest { var cappedValueDelegate: Double by CappedValueDelegate(0.0, { min.get() }, { minVal.get() }, { max.get() }, { maxVal.get() }) var cappedInt: Int by cappedVar(10, 5, 20) - var cappedDouble: Double by cappedDouble(0.0, 1.0) { max.get() } - + var cappedDouble: Double by cappedDouble(0.0, 1.0, { -> max.get() }) @Test fun cappedVarTest() { Assertions.assertEquals(10, cappedInt) From 7bb4ab4395cf12572b825250e878c7fcdc97f9e5 Mon Sep 17 00:00:00 2001 From: Anton Nashatyrev Date: Thu, 25 Mar 2021 15:30:37 +0300 Subject: [PATCH 03/12] Maintain acceptRequests whitelist with expiration for peers with non-negative score to avoid score recalculation for every inbound message --- .../io/libp2p/pubsub/gossip/GossipRouter.kt | 20 ++++++++++++++++++- .../io/libp2p/pubsub/gossip/GossipScore.kt | 2 +- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt b/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt index 5c17a5bb6..954f8041a 100644 --- a/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt +++ b/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt @@ -55,6 +55,8 @@ open class GossipRouter @JvmOverloads constructor( subscriptionTopicSubscriptionFilter: TopicSubscriptionFilter = TopicSubscriptionFilter.AllowAllTopicSubscriptionFilter() ) : AbstractRouter(subscriptionTopicSubscriptionFilter, params.maxGossipMessageSize) { + val acceptRequestsWhitelistThreshold = 0 + val acceptRequestsWhitelistDuration = 1.seconds val score by lazy { GossipScore(scoreParams, executor, curTimeMillis) } val fanout: MutableMap> = linkedMapOf() val mesh: MutableMap> = linkedMapOf() @@ -74,6 +76,7 @@ open class GossipRouter @JvmOverloads constructor( TimeUnit.MILLISECONDS ) } + private val acceptRequestsWhitelist = mutableMapOf() override val seenMessages: SeenCache> by lazy { TTLSeenCache(SimpleSeenCache(), params.seenTTL, curTimeMillis) @@ -169,7 +172,22 @@ open class GossipRouter @JvmOverloads constructor( } override fun acceptRequestsFrom(peer: PeerHandler): Boolean { - return isDirect(peer) || score.score(peer) >= score.params.graylistThreshold + if (isDirect(peer)) { + return true + } + + val curTime = curTimeMillis() + val whitelistedTill = acceptRequestsWhitelist[peer] ?: 0 + if (curTime <= whitelistedTill) { + return true + } + + val peerScore = score.score(peer) + if (peerScore >= acceptRequestsWhitelistThreshold) { + acceptRequestsWhitelist[peer] = curTime + acceptRequestsWhitelistDuration.toMillis() + } + + return peerScore >= score.params.graylistThreshold } override fun validateMessageListLimits(msg: Rpc.RPC): Boolean { diff --git a/src/main/kotlin/io/libp2p/pubsub/gossip/GossipScore.kt b/src/main/kotlin/io/libp2p/pubsub/gossip/GossipScore.kt index 042207af9..338883e15 100644 --- a/src/main/kotlin/io/libp2p/pubsub/gossip/GossipScore.kt +++ b/src/main/kotlin/io/libp2p/pubsub/gossip/GossipScore.kt @@ -84,7 +84,7 @@ class GossipScore( fun meshMessageDeliveriesDeficitSqr() = meshMessageDeliveriesDeficit().pow(2) fun calcTopicScore(): Double { - val curTime = curTimeMillis(); + val curTime = curTimeMillis() if (cacheValid && prevParams === params && curTime - prevTime < recalcMaxDuration.toMillis()) { return cachedScore } From 7e6aa51a6d221875bf7987e6610ba44f4b06268b Mon Sep 17 00:00:00 2001 From: Anton Nashatyrev Date: Thu, 25 Mar 2021 15:40:51 +0300 Subject: [PATCH 04/12] Fix updateListener --- src/main/kotlin/io/libp2p/etc/types/Delegates.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/kotlin/io/libp2p/etc/types/Delegates.kt b/src/main/kotlin/io/libp2p/etc/types/Delegates.kt index 10f7f44b5..9c9e3a77a 100644 --- a/src/main/kotlin/io/libp2p/etc/types/Delegates.kt +++ b/src/main/kotlin/io/libp2p/etc/types/Delegates.kt @@ -73,7 +73,7 @@ data class CappedValueDelegate>( ) : ReadWriteProperty { override fun getValue(thisRef: Any?, property: KProperty<*>): C { - val oldValue = value + val oldValue = this.value val v1 = if (value > upperBound()) upperBoundVal() else value value = if (value < lowerBound()) lowerBoundVal() else v1 if (oldValue != value) { @@ -83,7 +83,7 @@ data class CappedValueDelegate>( } override fun setValue(thisRef: Any?, property: KProperty<*>, value: C) { - val oldValue = value + val oldValue = this.value this.value = value if (oldValue != value) { updateListener(value) From 4548e71ff647ef08d96fa65f0d5c8344ac1539ca Mon Sep 17 00:00:00 2001 From: Anton Nashatyrev Date: Thu, 25 Mar 2021 16:05:43 +0300 Subject: [PATCH 05/12] Use `timeInMeshQuantum` for TopicScores.recalcMaxDuration --- src/main/kotlin/io/libp2p/pubsub/gossip/GossipScore.kt | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/main/kotlin/io/libp2p/pubsub/gossip/GossipScore.kt b/src/main/kotlin/io/libp2p/pubsub/gossip/GossipScore.kt index 338883e15..cb3fd4a11 100644 --- a/src/main/kotlin/io/libp2p/pubsub/gossip/GossipScore.kt +++ b/src/main/kotlin/io/libp2p/pubsub/gossip/GossipScore.kt @@ -6,7 +6,6 @@ import io.libp2p.core.pubsub.ValidationResult import io.libp2p.etc.types.cappedDouble import io.libp2p.etc.types.createLRUMap import io.libp2p.etc.types.millis -import io.libp2p.etc.types.seconds import io.libp2p.etc.util.P2PService import io.libp2p.pubsub.PubsubMessage import io.libp2p.pubsub.Topic @@ -28,9 +27,9 @@ class GossipScore( ) { inner class TopicScores(val topic: Topic) { - private val recalcMaxDuration = 1.seconds private val params: GossipTopicScoreParams get() = topicParams[topic] + private val recalcMaxDuration = params.timeInMeshQuantum private var cachedScore: Double = 0.0 private var cacheValid: Boolean = false private var prevParams = params From 405a66b2f3b301b7a6eb810bc652bb3000d1ec63 Mon Sep 17 00:00:00 2001 From: Anton Nashatyrev Date: Thu, 25 Mar 2021 16:06:13 +0300 Subject: [PATCH 06/12] Clean up acceptRequestsWhitelist on peer disconnect --- src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt b/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt index 954f8041a..1a3b6edf0 100644 --- a/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt +++ b/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt @@ -103,6 +103,7 @@ open class GossipRouter @JvmOverloads constructor( score.notifyDisconnected(peer) mesh.values.forEach { it.remove(peer) } fanout.values.forEach { it.remove(peer) } + acceptRequestsWhitelist -= peer collectPeerMessage(peer) // discard them super.onPeerDisconnected(peer) } From 6728534c2dd2628dda5a1e1d98ca116aba19e758 Mon Sep 17 00:00:00 2001 From: Anton Nashatyrev Date: Thu, 25 Mar 2021 16:26:11 +0300 Subject: [PATCH 07/12] Bound acceptRequest whitelisting by number of inbound messages as well as by time --- .../io/libp2p/pubsub/gossip/GossipRouter.kt | 23 ++++++++++++++----- 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt b/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt index 1a3b6edf0..99de8b148 100644 --- a/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt +++ b/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt @@ -55,7 +55,8 @@ open class GossipRouter @JvmOverloads constructor( subscriptionTopicSubscriptionFilter: TopicSubscriptionFilter = TopicSubscriptionFilter.AllowAllTopicSubscriptionFilter() ) : AbstractRouter(subscriptionTopicSubscriptionFilter, params.maxGossipMessageSize) { - val acceptRequestsWhitelistThreshold = 0 + val acceptRequestsWhitelistThresholdScore = 0 + val acceptRequestsWhitelistMaxMessages = 128 val acceptRequestsWhitelistDuration = 1.seconds val score by lazy { GossipScore(scoreParams, executor, curTimeMillis) } val fanout: MutableMap> = linkedMapOf() @@ -76,7 +77,7 @@ open class GossipRouter @JvmOverloads constructor( TimeUnit.MILLISECONDS ) } - private val acceptRequestsWhitelist = mutableMapOf() + private val acceptRequestsWhitelist = mutableMapOf() override val seenMessages: SeenCache> by lazy { TTLSeenCache(SimpleSeenCache(), params.seenTTL, curTimeMillis) @@ -178,14 +179,20 @@ open class GossipRouter @JvmOverloads constructor( } val curTime = curTimeMillis() - val whitelistedTill = acceptRequestsWhitelist[peer] ?: 0 - if (curTime <= whitelistedTill) { + val whitelistEntry = acceptRequestsWhitelist[peer] + if (whitelistEntry != null && + curTime <= whitelistEntry.whitelistedTill && + whitelistEntry.messagesAccepted < acceptRequestsWhitelistMaxMessages + ) { + + acceptRequestsWhitelist[peer] = whitelistEntry.incrementMessageCount() return true } val peerScore = score.score(peer) - if (peerScore >= acceptRequestsWhitelistThreshold) { - acceptRequestsWhitelist[peer] = curTime + acceptRequestsWhitelistDuration.toMillis() + if (peerScore >= acceptRequestsWhitelistThresholdScore) { + acceptRequestsWhitelist[peer] = + AcceptRequestsWhitelistEntry(curTime + acceptRequestsWhitelistDuration.toMillis()) } return peerScore >= score.params.graylistThreshold @@ -565,4 +572,8 @@ open class GossipRouter @JvmOverloads constructor( ).build() ) } + + data class AcceptRequestsWhitelistEntry(val whitelistedTill: Long, val messagesAccepted: Int = 0) { + fun incrementMessageCount() = AcceptRequestsWhitelistEntry(whitelistedTill, messagesAccepted + 1) + } } From 04faead6f17e7d846d2f2c396229c4acc9b876db Mon Sep 17 00:00:00 2001 From: Anton Nashatyrev Date: Thu, 25 Mar 2021 19:50:43 +0300 Subject: [PATCH 08/12] Add test for acceptRequests whitelist --- .../libp2p/pubsub/gossip/GossipV1_1Tests.kt | 54 +++++++++++++++++++ 1 file changed, 54 insertions(+) diff --git a/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_1Tests.kt b/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_1Tests.kt index 6d2fed860..4e77dbe41 100644 --- a/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_1Tests.kt +++ b/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_1Tests.kt @@ -250,6 +250,60 @@ class GossipV1_1Tests { test.mockRouter.inboundMessages.clear() } + @Test + fun `test that acceptRequests whitelist is refreshed on timeout`() { + val appScore = AtomicDouble() + val peerScoreParams = GossipPeerScoreParams( + appSpecificScore = { appScore.get() }, + appSpecificWeight = 1.0 + ) + val scoreParams = GossipScoreParams( + peerScoreParams = peerScoreParams, + graylistThreshold = -100.0 + ) + val test = TwoRoutersTest(scoreParams = scoreParams) + + // with this score the peer should be whitelisted for some period + appScore.set(test.gossipRouter.acceptRequestsWhitelistThresholdScore.toDouble()) + + test.mockRouter.subscribe("topic1") + test.gossipRouter.subscribe("topic1") + + // 2 heartbeats - the topic should be GRAFTed + test.fuzz.timeController.addTime(2.seconds) + test.mockRouter.waitForMessage { it.hasControl() && it.control.graftCount > 0 } + test.mockRouter.inboundMessages.clear() + + val msg1 = Rpc.RPC.newBuilder() + .addPublish(newProtoMessage("topic1", 0L, "Hello-1".toByteArray())) + .build() + test.mockRouter.sendToSingle(msg1) + // at this point peer is whitelisted for a period + + appScore.set(-101.0) + + val graftMsg = Rpc.RPC.newBuilder().setControl( + Rpc.ControlMessage.newBuilder().addGraft( + Rpc.ControlGraft.newBuilder().setTopicID("topic1") + ) + ).build() + for (i in 0..2) { + test.fuzz.timeController.addTime(50.millis) + + // even having the score below gralist threshold the peer should be answered because + // it is still in acceptRequests whitelist + test.mockRouter.sendToSingle(graftMsg) + test.mockRouter.waitForMessage { it.hasControl() && it.control.pruneCount > 0 } + } + + test.fuzz.timeController.addTime(test.gossipRouter.acceptRequestsWhitelistDuration) + // at this point whitelist should be invalidated and score recalculated + + test.mockRouter.sendToSingle(graftMsg) + // the last message should be ignored + assertEquals(0, test.mockRouter.inboundMessages.size) + } + @Test fun testGraftFloodPenalty() { val test = TwoRoutersTest() From aaca0d0ce4dded7772c02540232e25da1fc6751a Mon Sep 17 00:00:00 2001 From: Anton Nashatyrev Date: Fri, 26 Mar 2021 10:35:24 +0300 Subject: [PATCH 09/12] Add capped Double updateListener unit tests --- .../io/libp2p/etc/types/DelegatesTest.kt | 56 ++++++++++++++++++- 1 file changed, 54 insertions(+), 2 deletions(-) diff --git a/src/test/kotlin/io/libp2p/etc/types/DelegatesTest.kt b/src/test/kotlin/io/libp2p/etc/types/DelegatesTest.kt index c7ea3dbd1..1779976f7 100644 --- a/src/test/kotlin/io/libp2p/etc/types/DelegatesTest.kt +++ b/src/test/kotlin/io/libp2p/etc/types/DelegatesTest.kt @@ -12,9 +12,21 @@ class DelegatesTest { val min = AtomicDouble(5.0) val minVal = AtomicDouble(0.0) - var cappedValueDelegate: Double by CappedValueDelegate(0.0, { min.get() }, { minVal.get() }, { max.get() }, { maxVal.get() }) + val cappedValueDelegateUpdates = mutableListOf() + val cappedDoubleUpdates = mutableListOf() + + var cappedValueDelegate: Double by CappedValueDelegate( + 0.0, + { min.get() }, + { minVal.get() }, + { max.get() }, + { maxVal.get() }, + { cappedValueDelegateUpdates += it }) + var cappedInt: Int by cappedVar(10, 5, 20) - var cappedDouble: Double by cappedDouble(0.0, 1.0, { -> max.get() }) + var cappedDouble: Double by cappedDouble(0.0, 1.0, { -> max.get() }, { cappedDoubleUpdates += it } ) + var blackhole: Double = 0.0 + @Test fun cappedVarTest() { Assertions.assertEquals(10, cappedInt) @@ -89,4 +101,44 @@ class DelegatesTest { min.set(7.0) assertThat(cappedValueDelegate).isEqualTo(0.0) } + + @Test + fun `test cappedDouble update callback`() { + cappedDouble = 5.0 + assertThat(cappedDoubleUpdates).containsExactly(5.0) + + cappedDouble = 5.0 + assertThat(cappedDoubleUpdates).containsExactly(5.0) + + cappedDouble = 4.0 + assertThat(cappedDoubleUpdates).containsExactly(5.0, 4.0) + + max.set(3.0) + blackhole = cappedDouble + assertThat(cappedDoubleUpdates).containsExactly(5.0, 4.0, 3.0) + + max.set(5.0) + blackhole = cappedDouble + assertThat(cappedDoubleUpdates).containsExactly(5.0, 4.0, 3.0) + } + + @Test + fun `test cappedValueDelegate update callback`() { + cappedValueDelegate = 8.0 + assertThat(cappedValueDelegateUpdates).containsExactly(8.0) + + cappedValueDelegate = 8.0 + assertThat(cappedValueDelegateUpdates).containsExactly(8.0) + + cappedValueDelegate = 7.0 + assertThat(cappedValueDelegateUpdates).containsExactly(8.0, 7.0) + + max.set(6.0) + blackhole = cappedValueDelegate + assertThat(cappedValueDelegateUpdates).containsExactly(8.0, 7.0, 15.0) + + max.set(7.0) + blackhole = cappedValueDelegate + assertThat(cappedValueDelegateUpdates).containsExactly(8.0, 7.0, 15.0) + } } From 5069593b78ec5a289afa9ec0619af6118026b09b Mon Sep 17 00:00:00 2001 From: Anton Nashatyrev Date: Fri, 26 Mar 2021 10:46:06 +0300 Subject: [PATCH 10/12] Comment default acceptRequestsWhitelist values reason --- src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt b/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt index 99de8b148..b0dfaf9c8 100644 --- a/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt +++ b/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt @@ -55,9 +55,15 @@ open class GossipRouter @JvmOverloads constructor( subscriptionTopicSubscriptionFilter: TopicSubscriptionFilter = TopicSubscriptionFilter.AllowAllTopicSubscriptionFilter() ) : AbstractRouter(subscriptionTopicSubscriptionFilter, params.maxGossipMessageSize) { + // The idea behind choosing these specific default values for acceptRequestsWhitelist was + // - from one side are pretty small and safe: peer unlikely be able to drop its score to `graylist` + // with 128 messages. But even if so then it's not critical to accept some extra messages before + // blocking - not too much space for DoS here + // - from the other side param values are pretty high to yield good performance gain val acceptRequestsWhitelistThresholdScore = 0 val acceptRequestsWhitelistMaxMessages = 128 val acceptRequestsWhitelistDuration = 1.seconds + val score by lazy { GossipScore(scoreParams, executor, curTimeMillis) } val fanout: MutableMap> = linkedMapOf() val mesh: MutableMap> = linkedMapOf() From 4c154685ecc6afd2cf04f7021e3215a6212979a9 Mon Sep 17 00:00:00 2001 From: Anton Nashatyrev Date: Fri, 26 Mar 2021 10:50:37 +0300 Subject: [PATCH 11/12] Remove acceptRequestsWhitelist entry (if any) if not adding a new one --- src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt b/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt index b0dfaf9c8..e857d8edf 100644 --- a/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt +++ b/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt @@ -199,6 +199,8 @@ open class GossipRouter @JvmOverloads constructor( if (peerScore >= acceptRequestsWhitelistThresholdScore) { acceptRequestsWhitelist[peer] = AcceptRequestsWhitelistEntry(curTime + acceptRequestsWhitelistDuration.toMillis()) + } else { + acceptRequestsWhitelist -= peer } return peerScore >= score.params.graylistThreshold From c545f198abf04689a2614b40e196eed282d1e1e8 Mon Sep 17 00:00:00 2001 From: Anton Nashatyrev Date: Fri, 26 Mar 2021 10:52:49 +0300 Subject: [PATCH 12/12] Spotless --- src/test/kotlin/io/libp2p/etc/types/DelegatesTest.kt | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/test/kotlin/io/libp2p/etc/types/DelegatesTest.kt b/src/test/kotlin/io/libp2p/etc/types/DelegatesTest.kt index 1779976f7..f1573f8c7 100644 --- a/src/test/kotlin/io/libp2p/etc/types/DelegatesTest.kt +++ b/src/test/kotlin/io/libp2p/etc/types/DelegatesTest.kt @@ -21,10 +21,11 @@ class DelegatesTest { { minVal.get() }, { max.get() }, { maxVal.get() }, - { cappedValueDelegateUpdates += it }) + { cappedValueDelegateUpdates += it } + ) var cappedInt: Int by cappedVar(10, 5, 20) - var cappedDouble: Double by cappedDouble(0.0, 1.0, { -> max.get() }, { cappedDoubleUpdates += it } ) + var cappedDouble: Double by cappedDouble(0.0, 1.0, { -> max.get() }, { cappedDoubleUpdates += it }) var blackhole: Double = 0.0 @Test