Skip to content

Release 0.8.1 #185

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Mar 31, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ Hosting of artefacts is graciously provided by [Cloudsmith](https://cloudsmith.c
maven { url "https://dl.cloudsmith.io/public/libp2p/jvm-libp2p/maven/" }
}

implementation 'io.libp2p:jvm-libp2p-minimal:0.8.0-RELEASE'
implementation 'io.libp2p:jvm-libp2p-minimal:0.8.1-RELEASE'
```
### Using Maven
Add the repository to the `dependencyManagement` section of the pom file:
Expand All @@ -96,7 +96,7 @@ And then add jvm-libp2p as a dependency:
<dependency>
<groupId>io.libp2p</groupId>
<artifactId>jvm-libp2p-minimal</artifactId>
<version>0.8.0-RELEASE</version>
<version>0.8.1-RELEASE</version>
<type>pom</type>
</dependency>
```
Expand Down
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import java.nio.file.Paths
// ./gradlew publish -PcloudsmithUser=<user> -PcloudsmithApiKey=<api-key>

group = "io.libp2p"
version = "0.8.0-RELEASE"
version = "0.8.1-RELEASE"
description = "a minimal implementation of libp2p for the jvm"

plugins {
Expand Down
23 changes: 18 additions & 5 deletions src/main/kotlin/io/libp2p/etc/types/Delegates.kt
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,20 @@ fun <T : Comparable<T>> cappedVar(value: T, lowerBound: T, upperBound: T) =
/**
* Creates a Double delegate which may drop value to [0.0] when the new value is less than [decayToZero]
*/
fun cappedDouble(value: Double, decayToZero: Double = Double.MIN_VALUE): CappedValueDelegate<Double> {
return cappedDouble(value, decayToZero) { Double.MAX_VALUE }
fun cappedDouble(value: Double, decayToZero: Double = Double.MIN_VALUE, updateListener: (Double) -> Unit = { }): CappedValueDelegate<Double> {
return cappedDouble(value, decayToZero, { Double.MAX_VALUE }, updateListener)
}

/**
* Creates a Double delegate which may cap upper bound (set [upperBound] when the new value is greater)
* and may drop value to [0.0] when the new value is less than [decayToZero]
*/
fun cappedDouble(value: Double, decayToZero: Double = Double.MIN_VALUE, upperBound: () -> Double) =
CappedValueDelegate(value, { decayToZero }, { 0.0 }, upperBound, upperBound)
fun cappedDouble(
value: Double,
decayToZero: Double = Double.MIN_VALUE,
upperBound: () -> Double,
updateListener: (Double) -> Unit = { }
) = CappedValueDelegate(value, { decayToZero }, { 0.0 }, upperBound, upperBound, updateListener)

// thanks to https://stackoverflow.com/a/47948047/9630725
class LazyMutable<T>(val initializer: () -> T, val rejectSetAfterGet: Boolean = false) : ReadWriteProperty<Any?, T> {
Expand Down Expand Up @@ -64,16 +68,25 @@ data class CappedValueDelegate<C : Comparable<C>>(
private val lowerBound: () -> C,
private val lowerBoundVal: () -> C = lowerBound,
private val upperBound: () -> C,
private val upperBoundVal: () -> C = upperBound
private val upperBoundVal: () -> C = upperBound,
private val updateListener: (C) -> Unit = { }
) : ReadWriteProperty<Any?, C> {

override fun getValue(thisRef: Any?, property: KProperty<*>): C {
val oldValue = this.value
val v1 = if (value > upperBound()) upperBoundVal() else value
value = if (value < lowerBound()) lowerBoundVal() else v1
if (oldValue != value) {
updateListener(value)
}
return value
}

override fun setValue(thisRef: Any?, property: KProperty<*>, value: C) {
val oldValue = this.value
this.value = value
if (oldValue != value) {
updateListener(value)
}
}
}
44 changes: 39 additions & 5 deletions src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package io.libp2p.pubsub.gossip

import io.libp2p.core.InternalErrorException
import io.libp2p.core.PeerId
import io.libp2p.core.multiformats.Protocol
import io.libp2p.core.pubsub.ValidationResult
import io.libp2p.etc.types.anyComplete
import io.libp2p.etc.types.copy
Expand Down Expand Up @@ -35,9 +34,6 @@ const val MaxIAskedEntries = 256
const val MaxPeerIHaveEntries = 256
const val MaxIWantRequestsEntries = 10 * 1024

fun P2PService.PeerHandler.getIP(): String? =
streamHandler.stream.connection.remoteAddress().getStringComponent(Protocol.IP4)

fun P2PService.PeerHandler.isOutbound() = streamHandler.stream.connection.isInitiator

fun P2PService.PeerHandler.getPeerProtocol(): PubsubProtocol {
Expand All @@ -59,6 +55,15 @@ open class GossipRouter @JvmOverloads constructor(
subscriptionTopicSubscriptionFilter: TopicSubscriptionFilter = TopicSubscriptionFilter.AllowAllTopicSubscriptionFilter()
) : AbstractRouter(subscriptionTopicSubscriptionFilter, params.maxGossipMessageSize) {

// The idea behind choosing these specific default values for acceptRequestsWhitelist was
// - from one side are pretty small and safe: peer unlikely be able to drop its score to `graylist`
// with 128 messages. But even if so then it's not critical to accept some extra messages before
// blocking - not too much space for DoS here
// - from the other side param values are pretty high to yield good performance gain
val acceptRequestsWhitelistThresholdScore = 0
val acceptRequestsWhitelistMaxMessages = 128
val acceptRequestsWhitelistDuration = 1.seconds

val score by lazy { GossipScore(scoreParams, executor, curTimeMillis) }
val fanout: MutableMap<Topic, MutableSet<PeerHandler>> = linkedMapOf()
val mesh: MutableMap<Topic, MutableSet<PeerHandler>> = linkedMapOf()
Expand All @@ -78,6 +83,7 @@ open class GossipRouter @JvmOverloads constructor(
TimeUnit.MILLISECONDS
)
}
private val acceptRequestsWhitelist = mutableMapOf<PeerHandler, AcceptRequestsWhitelistEntry>()

override val seenMessages: SeenCache<Optional<ValidationResult>> by lazy {
TTLSeenCache(SimpleSeenCache(), params.seenTTL, curTimeMillis)
Expand All @@ -104,6 +110,7 @@ open class GossipRouter @JvmOverloads constructor(
score.notifyDisconnected(peer)
mesh.values.forEach { it.remove(peer) }
fanout.values.forEach { it.remove(peer) }
acceptRequestsWhitelist -= peer
collectPeerMessage(peer) // discard them
super.onPeerDisconnected(peer)
}
Expand Down Expand Up @@ -173,7 +180,30 @@ open class GossipRouter @JvmOverloads constructor(
}

override fun acceptRequestsFrom(peer: PeerHandler): Boolean {
return isDirect(peer) || score.score(peer) >= score.params.graylistThreshold
if (isDirect(peer)) {
return true
}

val curTime = curTimeMillis()
val whitelistEntry = acceptRequestsWhitelist[peer]
if (whitelistEntry != null &&
curTime <= whitelistEntry.whitelistedTill &&
whitelistEntry.messagesAccepted < acceptRequestsWhitelistMaxMessages
) {

acceptRequestsWhitelist[peer] = whitelistEntry.incrementMessageCount()
return true
}

val peerScore = score.score(peer)
if (peerScore >= acceptRequestsWhitelistThresholdScore) {
acceptRequestsWhitelist[peer] =
AcceptRequestsWhitelistEntry(curTime + acceptRequestsWhitelistDuration.toMillis())
} else {
acceptRequestsWhitelist -= peer
}

return peerScore >= score.params.graylistThreshold
}

override fun validateMessageListLimits(msg: Rpc.RPC): Boolean {
Expand Down Expand Up @@ -550,4 +580,8 @@ open class GossipRouter @JvmOverloads constructor(
).build()
)
}

data class AcceptRequestsWhitelistEntry(val whitelistedTill: Long, val messagesAccepted: Int = 0) {
fun incrementMessageCount() = AcceptRequestsWhitelistEntry(whitelistedTill, messagesAccepted + 1)
}
}
56 changes: 48 additions & 8 deletions src/main/kotlin/io/libp2p/pubsub/gossip/GossipScore.kt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.libp2p.pubsub.gossip

import io.libp2p.core.PeerId
import io.libp2p.core.multiformats.Protocol
import io.libp2p.core.pubsub.ValidationResult
import io.libp2p.etc.types.cappedDouble
import io.libp2p.etc.types.createLRUMap
Expand All @@ -16,6 +17,9 @@ import kotlin.math.max
import kotlin.math.min
import kotlin.math.pow

fun P2PService.PeerHandler.getIP(): String? =
streamHandler.stream.connection.remoteAddress().getStringComponent(Protocol.IP4)

class GossipScore(
val params: GossipScoreParams = GossipScoreParams(),
val executor: ScheduledExecutorService,
Expand All @@ -25,20 +29,41 @@ class GossipScore(
inner class TopicScores(val topic: Topic) {
private val params: GossipTopicScoreParams
get() = topicParams[topic]
private val recalcMaxDuration = params.timeInMeshQuantum
private var cachedScore: Double = 0.0
private var cacheValid: Boolean = false
private var prevParams = params
private var prevTime = curTimeMillis()

var joinedMeshTimeMillis: Long = 0
set(value) {
field = value
cacheValid = false
}

var firstMessageDeliveries: Double by cappedDouble(
0.0,
[email protected],
{ params.firstMessageDeliveriesCap }
{ params.firstMessageDeliveriesCap },
{ cacheValid = false }
)
var meshMessageDeliveries: Double by cappedDouble(
0.0,
[email protected],
{ params.meshMessageDeliveriesCap }
{ params.meshMessageDeliveriesCap },
{ cacheValid = false }
)
var meshFailurePenalty: Double by cappedDouble(
0.0,
[email protected],
{ _ -> cacheValid = false }
)

var invalidMessages: Double by cappedDouble(
0.0,
[email protected],
{ _ -> cacheValid = false }
)
var meshFailurePenalty: Double by cappedDouble(0.0, [email protected])
var invalidMessages: Double by cappedDouble(0.0, [email protected])

fun inMesh() = joinedMeshTimeMillis > 0

Expand All @@ -58,19 +83,26 @@ class GossipScore(
fun meshMessageDeliveriesDeficitSqr() = meshMessageDeliveriesDeficit().pow(2)

fun calcTopicScore(): Double {
val curTime = curTimeMillis()
if (cacheValid && prevParams === params && curTime - prevTime < recalcMaxDuration.toMillis()) {
return cachedScore
}
prevParams = params
prevTime = curTime
val p1 = meshTimeNorm()
val p2 = firstMessageDeliveries
val p3 = meshMessageDeliveriesDeficitSqr()
val p3b = meshFailurePenalty
val p4 = invalidMessages.pow(2)
val ret = params.topicWeight * (
cachedScore = params.topicWeight * (
p1 * params.timeInMeshWeight +
p2 * params.firstMessageDeliveriesWeight +
p3 * params.meshMessageDeliveriesWeight +
p3b * params.meshFailurePenaltyWeight +
p4 * params.invalidMessageDeliveriesWeight
)
return ret
cacheValid = true
return cachedScore
}

fun decayScores() {
Expand Down Expand Up @@ -101,6 +133,7 @@ class GossipScore(

private val validationTime: MutableMap<PubsubMessage, Long> = createLRUMap(1024)
val peerScores = mutableMapOf<PeerId, PeerScores>()
private val peerIpCache = mutableMapOf<PeerId, String>()

val refreshTask: ScheduledFuture<*>

Expand All @@ -112,6 +145,8 @@ class GossipScore(
private fun getPeerScores(peer: P2PService.PeerHandler) =
peerScores.computeIfAbsent(peer.peerId) { PeerScores() }

private fun getPeerIp(peer: P2PService.PeerHandler): String? = peerIpCache[peer.peerId]

private fun getTopicScores(peer: P2PService.PeerHandler, topic: Topic) =
getPeerScores(peer).topicScores.computeIfAbsent(topic) { TopicScores(it) }

Expand All @@ -133,7 +168,7 @@ class GossipScore(
)
val appScore = peerParams.appSpecificScore(peer.peerId) * peerParams.appSpecificWeight

val peersInIp: Int = peer.getIP()?.let { thisIp ->
val peersInIp: Int = getPeerIp(peer)?.let { thisIp ->
if (peerParams.ipWhitelisted(thisIp)) 0 else
peerScores.values.count { thisIp in it.ips }
} ?: 0
Expand Down Expand Up @@ -164,12 +199,17 @@ class GossipScore(
}

getPeerScores(peer).disconnectedTimeMillis = curTimeMillis()
peerIpCache -= peer.peerId
}

fun notifyConnected(peer: P2PService.PeerHandler) {
peer.getIP()?.also { peerIp ->
peerIpCache[peer.peerId] = peerIp
}

getPeerScores(peer).apply {
connectedTimeMillis = curTimeMillis()
peer.getIP()?.also { ips += it }
getPeerIp(peer)?.also { ips += it }
}
}

Expand Down
56 changes: 54 additions & 2 deletions src/test/kotlin/io/libp2p/etc/types/DelegatesTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,21 @@ class DelegatesTest {
val min = AtomicDouble(5.0)
val minVal = AtomicDouble(0.0)

var cappedValueDelegate: Double by CappedValueDelegate(0.0, { min.get() }, { minVal.get() }, { max.get() }, { maxVal.get() })
val cappedValueDelegateUpdates = mutableListOf<Double>()
val cappedDoubleUpdates = mutableListOf<Double>()

var cappedValueDelegate: Double by CappedValueDelegate(
0.0,
{ min.get() },
{ minVal.get() },
{ max.get() },
{ maxVal.get() },
{ cappedValueDelegateUpdates += it }
)

var cappedInt: Int by cappedVar(10, 5, 20)
var cappedDouble: Double by cappedDouble(0.0, 1.0) { max.get() }
var cappedDouble: Double by cappedDouble(0.0, 1.0, { -> max.get() }, { cappedDoubleUpdates += it })
var blackhole: Double = 0.0

@Test
fun cappedVarTest() {
Expand Down Expand Up @@ -90,4 +102,44 @@ class DelegatesTest {
min.set(7.0)
assertThat(cappedValueDelegate).isEqualTo(0.0)
}

@Test
fun `test cappedDouble update callback`() {
cappedDouble = 5.0
assertThat(cappedDoubleUpdates).containsExactly(5.0)

cappedDouble = 5.0
assertThat(cappedDoubleUpdates).containsExactly(5.0)

cappedDouble = 4.0
assertThat(cappedDoubleUpdates).containsExactly(5.0, 4.0)

max.set(3.0)
blackhole = cappedDouble
assertThat(cappedDoubleUpdates).containsExactly(5.0, 4.0, 3.0)

max.set(5.0)
blackhole = cappedDouble
assertThat(cappedDoubleUpdates).containsExactly(5.0, 4.0, 3.0)
}

@Test
fun `test cappedValueDelegate update callback`() {
cappedValueDelegate = 8.0
assertThat(cappedValueDelegateUpdates).containsExactly(8.0)

cappedValueDelegate = 8.0
assertThat(cappedValueDelegateUpdates).containsExactly(8.0)

cappedValueDelegate = 7.0
assertThat(cappedValueDelegateUpdates).containsExactly(8.0, 7.0)

max.set(6.0)
blackhole = cappedValueDelegate
assertThat(cappedValueDelegateUpdates).containsExactly(8.0, 7.0, 15.0)

max.set(7.0)
blackhole = cappedValueDelegate
assertThat(cappedValueDelegateUpdates).containsExactly(8.0, 7.0, 15.0)
}
}
Loading