Skip to content

Commit b257b6b

Browse files
committed
fixup use Semaphore
Tool: gitpod/catfood.gitpod.cloud
1 parent 6dfd754 commit b257b6b

File tree

1 file changed

+68
-85
lines changed

1 file changed

+68
-85
lines changed

Diff for: components/ide/jetbrains/backend-plugin/src/main/kotlin/io/gitpod/jetbrains/remote/AbstractGitpodPortForwardingService.kt

+68-85
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,18 @@ import java.util.*
2626
import java.util.concurrent.CompletableFuture
2727
import java.util.concurrent.ConcurrentHashMap
2828
import java.util.concurrent.atomic.AtomicReference
29-
import kotlinx.coroutines.sync.Mutex
30-
import kotlinx.coroutines.sync.withLock
29+
import kotlinx.coroutines.sync.Semaphore
30+
import kotlinx.coroutines.sync.withPermit
3131

3232
@Suppress("UnstableApiUsage")
3333
abstract class AbstractGitpodPortForwardingService : GitpodPortForwardingService {
3434
companion object {
3535
const val FORWARDED_PORT_LABEL = "ForwardedByGitpod"
3636
const val EXPOSED_PORT_LABEL = "ExposedByGitpod"
37+
private const val MAX_CONCURRENT_OPERATIONS = 10
38+
private const val BATCH_SIZE = 10
39+
private const val BATCH_DELAY = 100L
40+
private const val DEBOUNCE_DELAY = 500L
3741
}
3842

3943
private val perClientPortForwardingManager = service<PerClientPortForwardingManager>()
@@ -52,12 +56,8 @@ abstract class AbstractGitpodPortForwardingService : GitpodPortForwardingService
5256
// Debounce job for port updates
5357
private var debounceJob: Job? = null
5458

55-
// Batch size for port operations
56-
private val batchSize = 10
57-
private val batchDelay = 100L
58-
59-
// Mutex to limit concurrent operations
60-
private val operationMutex = Mutex()
59+
// Semaphore to limit concurrent operations
60+
private val operationSemaphore = Semaphore(MAX_CONCURRENT_OPERATIONS)
6161

6262
init { start() }
6363

@@ -82,7 +82,6 @@ abstract class AbstractGitpodPortForwardingService : GitpodPortForwardingService
8282
is InterruptedException, is CancellationException -> {
8383
cancel("gitpod: Stopped observing ports list due to an expected interruption.")
8484
}
85-
8685
else -> {
8786
thisLogger().warn(
8887
"gitpod: Got an error while trying to get ports list from Supervisor. " +
@@ -98,9 +97,7 @@ abstract class AbstractGitpodPortForwardingService : GitpodPortForwardingService
9897

9998
private fun observePortsList(): CompletableFuture<Void> {
10099
val completableFuture = CompletableFuture<Void>()
101-
102100
val statusServiceStub = StatusServiceGrpc.newStub(GitpodManager.supervisorChannel)
103-
104101
val portsStatusRequest = Status.PortsStatusRequest.newBuilder().setObserve(true).build()
105102

106103
val portsStatusResponseObserver = object :
@@ -110,12 +107,9 @@ abstract class AbstractGitpodPortForwardingService : GitpodPortForwardingService
110107
}
111108

112109
override fun onNext(response: Status.PortsStatusResponse) {
113-
// Cancel previous debounce job if exists
114110
debounceJob?.cancel()
115-
116-
// Create new debounce job
117111
debounceJob = runJob(lifetime) {
118-
delay(500)
112+
delay(DEBOUNCE_DELAY)
119113
try {
120114
if (syncInProgress.compareAndSet(false, true)) {
121115
try {
@@ -142,7 +136,6 @@ abstract class AbstractGitpodPortForwardingService : GitpodPortForwardingService
142136
}
143137

144138
statusServiceStub.portsStatus(portsStatusRequest, portsStatusResponseObserver)
145-
146139
return completableFuture
147140
}
148141

@@ -165,7 +158,6 @@ abstract class AbstractGitpodPortForwardingService : GitpodPortForwardingService
165158
val exposedPorts = servedPorts.filter { it.exposed?.url?.isNotBlank() ?: false }
166159
val portsNumbersFromNonServedPorts = portsList.filter { !it.served }.map { it.localPort }
167160

168-
// Clean up unused port lifetimes
169161
val allPortsToKeep = mutableSetOf<Int>()
170162

171163
val servedPortsToStartForwarding = servedPorts.filter {
@@ -184,52 +176,35 @@ abstract class AbstractGitpodPortForwardingService : GitpodPortForwardingService
184176
.map { it.hostPortNumber }
185177
.filter { portsNumbersFromNonServedPorts.contains(it) || !portsNumbersFromPortsList.contains(it) }
186178

187-
// Process port changes in background with batching
188179
runJob(lifetime) {
189180
try {
190-
forwardedPortsToStopForwarding.chunked(batchSize).forEach { batch ->
191-
batch.forEach { port ->
192-
operationMutex.withLock { stopForwarding(port) }
193-
}
194-
delay(batchDelay)
181+
processPortsInBatches(forwardedPortsToStopForwarding) { port ->
182+
operationSemaphore.withPermit { stopForwarding(port) }
195183
}
196184

197-
exposedPortsToStopExposingOnClient.chunked(batchSize).forEach { batch ->
198-
batch.forEach { port ->
199-
operationMutex.withLock { stopExposingOnClient(port) }
200-
}
201-
delay(batchDelay)
185+
processPortsInBatches(exposedPortsToStopExposingOnClient) { port ->
186+
operationSemaphore.withPermit { stopExposingOnClient(port) }
202187
}
203188

204-
servedPortsToStartForwarding.chunked(batchSize).forEach { batch ->
205-
batch.forEach { port ->
206-
operationMutex.withLock {
207-
startForwarding(port)
208-
allPortsToKeep.add(port.localPort)
209-
}
189+
processPortsInBatches(servedPortsToStartForwarding) { port ->
190+
operationSemaphore.withPermit {
191+
startForwarding(port)
192+
allPortsToKeep.add(port.localPort)
210193
}
211-
delay(batchDelay)
212194
}
213195

214-
exposedPortsToStartExposingOnClient.chunked(batchSize).forEach { batch ->
215-
batch.forEach { port ->
216-
operationMutex.withLock {
217-
startExposingOnClient(port)
218-
allPortsToKeep.add(port.localPort)
219-
}
196+
processPortsInBatches(exposedPortsToStartExposingOnClient) { port ->
197+
operationSemaphore.withPermit {
198+
startExposingOnClient(port)
199+
allPortsToKeep.add(port.localPort)
220200
}
221-
delay(batchDelay)
222201
}
223202

224-
// Update presentation for all ports in batches
225-
portsList.chunked(batchSize).forEach { batch ->
203+
processPortsInBatches(portsList) { port ->
226204
application.invokeLater {
227-
batch.forEach {
228-
updatePortsPresentation(it)
229-
allPortsToKeep.add(it.localPort)
230-
}
205+
updatePortsPresentation(port)
206+
allPortsToKeep.add(port.localPort)
231207
}
232-
delay(batchDelay)
233208
}
234209

235210
cleanupUnusedLifetimes(allPortsToKeep)
@@ -239,9 +214,17 @@ abstract class AbstractGitpodPortForwardingService : GitpodPortForwardingService
239214
}
240215
}
241216

217+
private suspend fun <T> processPortsInBatches(ports: List<T>, action: suspend (T) -> Unit) {
218+
ports.chunked(BATCH_SIZE).forEach { batch ->
219+
batch.forEach { port ->
220+
action(port)
221+
}
222+
delay(BATCH_DELAY)
223+
}
224+
}
225+
242226
private fun cleanupUnusedLifetimes(portsToKeep: Set<Int>) {
243-
val portsToRemove = portLifetimes.keys.filter { !portsToKeep.contains(it) }
244-
portsToRemove.forEach { port ->
227+
portLifetimes.keys.filter { !portsToKeep.contains(it) }.forEach { port ->
245228
portLifetimes[port]?.let { lifetime ->
246229
thisLogger().debug("gitpod: Terminating lifetime for port $port")
247230
lifetime.terminate()
@@ -251,9 +234,7 @@ abstract class AbstractGitpodPortForwardingService : GitpodPortForwardingService
251234
}
252235

253236
private fun startForwarding(portStatus: PortsStatus) {
254-
if (isLocalPortForwardingDisabled()) {
255-
return
256-
}
237+
if (isLocalPortForwardingDisabled()) return
257238

258239
val portLifetime = getOrCreatePortLifetime(portStatus.localPort)
259240

@@ -332,12 +313,11 @@ abstract class AbstractGitpodPortForwardingService : GitpodPortForwardingService
332313
}
333314
}
334315

335-
private fun getOrCreatePortLifetime(port: Int): Lifetime {
336-
return portLifetimes.computeIfAbsent(port) {
316+
private fun getOrCreatePortLifetime(port: Int): Lifetime =
317+
portLifetimes.computeIfAbsent(port) {
337318
thisLogger().debug("gitpod: Creating new lifetime for port $port")
338319
lifetime.createNested()
339320
}
340-
}
341321

342322
private fun terminatePortLifetime(port: Int) {
343323
portLifetimes[port]?.let { portLifetime ->
@@ -349,39 +329,42 @@ abstract class AbstractGitpodPortForwardingService : GitpodPortForwardingService
349329

350330
private fun updatePortsPresentation(portStatus: PortsStatus) {
351331
perClientPortForwardingManager.getPorts(portStatus.localPort).forEach {
352-
if (it.configuration.isForwardedPort()) {
353-
it.presentation.name = portStatus.name
354-
it.presentation.description = portStatus.description
355-
it.presentation.tooltip = "Forwarded"
356-
it.presentation.icon = RowIcon(AllIcons.Actions.Commit)
357-
} else if (it.configuration.isExposedPort()) {
358-
val isPubliclyExposed = (portStatus.exposed.visibility == Status.PortVisibility.public_visibility)
359-
360-
it.presentation.name = portStatus.name
361-
it.presentation.description = portStatus.description
362-
it.presentation.tooltip = "Exposed (${if (isPubliclyExposed) "Public" else "Private"})"
363-
it.presentation.icon = if (isPubliclyExposed) {
364-
RowIcon(AllIcons.Actions.Commit)
365-
} else {
366-
RowIcon(AllIcons.Actions.Commit, AllIcons.Diff.Lock)
332+
when {
333+
it.configuration.isForwardedPort() -> {
334+
it.presentation.name = portStatus.name
335+
it.presentation.description = portStatus.description
336+
it.presentation.tooltip = "Forwarded"
337+
it.presentation.icon = RowIcon(AllIcons.Actions.Commit)
338+
}
339+
it.configuration.isExposedPort() -> {
340+
val isPubliclyExposed = (portStatus.exposed.visibility == Status.PortVisibility.public_visibility)
341+
it.presentation.name = portStatus.name
342+
it.presentation.description = portStatus.description
343+
it.presentation.tooltip = "Exposed (${if (isPubliclyExposed) "Public" else "Private"})"
344+
it.presentation.icon = if (isPubliclyExposed) {
345+
RowIcon(AllIcons.Actions.Commit)
346+
} else {
347+
RowIcon(AllIcons.Actions.Commit, AllIcons.Diff.Lock)
348+
}
367349
}
368350
}
369351
}
370352
}
371353

372-
override fun getLocalHostUriFromHostPort(hostPort: Int): Optional<URI> {
373-
val forwardedPort = perClientPortForwardingManager.getPorts(hostPort).firstOrNull {
374-
it.configuration.isForwardedPort()
375-
} ?: return Optional.empty()
376-
377-
(forwardedPort.configuration as PortConfiguration.PerClientTcpForwarding).clientPortState.let {
378-
return if (it is ClientPortState.Assigned) {
379-
Optional.of(URIBuilder().setScheme("http").setHost(it.clientInterface).setPort(it.clientPort).build())
380-
} else {
381-
Optional.empty()
382-
}
383-
}
384-
}
354+
override fun getLocalHostUriFromHostPort(hostPort: Int): Optional<URI> =
355+
perClientPortForwardingManager.getPorts(hostPort)
356+
.firstOrNull { it.configuration.isForwardedPort() }
357+
?.let { forwardedPort ->
358+
(forwardedPort.configuration as PortConfiguration.PerClientTcpForwarding)
359+
.clientPortState
360+
.let {
361+
if (it is ClientPortState.Assigned) {
362+
Optional.of(URIBuilder().setScheme("http").setHost(it.clientInterface).setPort(it.clientPort).build())
363+
} else {
364+
Optional.empty()
365+
}
366+
}
367+
} ?: Optional.empty()
385368

386369
override fun dispose() {
387370
portLifetimes.values.forEach { it.terminate() }

0 commit comments

Comments
 (0)