From d91296c47d73358b3d5e0ebcb018240902168cf7 Mon Sep 17 00:00:00 2001 From: Alexander Nozik Date: Mon, 25 Mar 2024 15:48:23 +0300 Subject: [PATCH] Refactor load test --- .../space/kscience/controls/ports/Port.kt | 2 +- .../space/kscience/controls/ports/phrases.kt | 25 ++++ .../kscience/controls/client/controlsMagix.kt | 5 +- demo/many-devices/build.gradle.kts | 2 +- .../kscience/controls/demo/MassDevice.kt | 109 ++++++++++-------- gradle/wrapper/gradle-wrapper.properties | 2 +- 6 files changed, 91 insertions(+), 54 deletions(-) diff --git a/controls-core/src/commonMain/kotlin/space/kscience/controls/ports/Port.kt b/controls-core/src/commonMain/kotlin/space/kscience/controls/ports/Port.kt index b17de92..9366604 100644 --- a/controls-core/src/commonMain/kotlin/space/kscience/controls/ports/Port.kt +++ b/controls-core/src/commonMain/kotlin/space/kscience/controls/ports/Port.kt @@ -80,7 +80,7 @@ public abstract class AbstractPort( /** * Raw flow of incoming data chunks. The chunks are not guaranteed to be complete phrases. - * In order to form phrases, some condition should be used on top of it. + * To form phrases, some condition should be used on top of it. * For example [stringsDelimitedIncoming] generates phrases with fixed delimiter. */ override fun receiving(): Flow = incoming.receiveAsFlow() diff --git a/controls-core/src/commonMain/kotlin/space/kscience/controls/ports/phrases.kt b/controls-core/src/commonMain/kotlin/space/kscience/controls/ports/phrases.kt index b86591f..03d21f5 100644 --- a/controls-core/src/commonMain/kotlin/space/kscience/controls/ports/phrases.kt +++ b/controls-core/src/commonMain/kotlin/space/kscience/controls/ports/phrases.kt @@ -42,6 +42,31 @@ public fun Flow.withDelimiter(delimiter: ByteArray): Flow } } +private fun Flow.withFixedMessageSize(messageSize: Int): Flow { + require(messageSize > 0) { "Message size should be positive" } + + val output = Buffer() + + onCompletion { + output.close() + } + + return transform { chunk -> + val remaining: Int = (messageSize - output.size).toInt() + if (chunk.size >= remaining) { + output.write(chunk, endIndex = remaining) + emit(output.readByteArray()) + output.clear() + //write the remaining chunk fragment + if(chunk.size> remaining) { + output.write(chunk, startIndex = remaining) + } + } else { + output.write(chunk) + } + } +} + /** * Transform byte fragments into utf-8 phrases using utf-8 delimiter */ diff --git a/controls-magix/src/commonMain/kotlin/space/kscience/controls/client/controlsMagix.kt b/controls-magix/src/commonMain/kotlin/space/kscience/controls/client/controlsMagix.kt index 8aa4383..915d0a0 100644 --- a/controls-magix/src/commonMain/kotlin/space/kscience/controls/client/controlsMagix.kt +++ b/controls-magix/src/commonMain/kotlin/space/kscience/controls/client/controlsMagix.kt @@ -12,6 +12,8 @@ import space.kscience.controls.manager.respondHubMessage import space.kscience.dataforge.context.error import space.kscience.dataforge.context.logger import space.kscience.magix.api.* +import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.EmptyCoroutineContext internal val controlsMagixFormat: MagixFormat = MagixFormat( @@ -38,7 +40,8 @@ internal fun generateId(request: MagixMessage): String = if (request.id != null) public fun DeviceManager.launchMagixService( endpoint: MagixEndpoint, endpointID: String = controlsMagixFormat.defaultFormat, -): Job = context.launch { + coroutineContext: CoroutineContext = EmptyCoroutineContext, +): Job = context.launch(coroutineContext) { endpoint.subscribe(controlsMagixFormat, targetFilter = listOf(endpointID, null)).onEach { (request, payload) -> val responsePayload = respondHubMessage(payload) responsePayload.forEach { diff --git a/demo/many-devices/build.gradle.kts b/demo/many-devices/build.gradle.kts index 7248e42..f46b71f 100644 --- a/demo/many-devices/build.gradle.kts +++ b/demo/many-devices/build.gradle.kts @@ -19,7 +19,7 @@ dependencies { implementation(projects.magix.magixZmq) implementation("io.ktor:ktor-client-cio:$ktorVersion") - implementation("space.kscience:plotlykt-server:0.6.0") + implementation("space.kscience:plotlykt-server:0.7.1") implementation(spclibs.logback.classic) } diff --git a/demo/many-devices/src/main/kotlin/space/kscience/controls/demo/MassDevice.kt b/demo/many-devices/src/main/kotlin/space/kscience/controls/demo/MassDevice.kt index 22d7e1c..5eafabf 100644 --- a/demo/many-devices/src/main/kotlin/space/kscience/controls/demo/MassDevice.kt +++ b/demo/many-devices/src/main/kotlin/space/kscience/controls/demo/MassDevice.kt @@ -1,8 +1,11 @@ package space.kscience.controls.demo -import kotlinx.coroutines.* +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.delay import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.isActive +import kotlinx.coroutines.launch import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock import kotlinx.datetime.Clock @@ -19,15 +22,19 @@ import space.kscience.dataforge.meta.get import space.kscience.dataforge.meta.int import space.kscience.magix.api.MagixEndpoint import space.kscience.magix.api.subscribe -import space.kscience.magix.rsocket.rSocketWithTcp -import space.kscience.magix.rsocket.rSocketWithWebSockets +import space.kscience.magix.rsocket.rSocketStreamWithWebSockets import space.kscience.magix.server.RSocketMagixFlowPlugin import space.kscience.magix.server.startMagixServer -import space.kscience.plotly.* +import space.kscience.plotly.Plotly +import space.kscience.plotly.PlotlyConfig +import space.kscience.plotly.layout +import space.kscience.plotly.models.Bar +import space.kscience.plotly.plot import space.kscience.plotly.server.PlotlyUpdateMode import space.kscience.plotly.server.serve import space.kscience.plotly.server.show import space.kscince.magix.zmq.ZmqMagixFlowPlugin +import space.kscince.magix.zmq.zmq import kotlin.random.Random import kotlin.time.Duration import kotlin.time.Duration.Companion.ZERO @@ -46,14 +53,13 @@ class MassDevice(context: Context, meta: Meta) : DeviceBySpec(MassDe val value by doubleProperty { randomValue } override suspend fun MassDevice.onOpen() { - doRecurring((meta["delay"].int ?: 10).milliseconds) { + doRecurring((meta["delay"].int ?: 5).milliseconds) { read(value) } } } } -@OptIn(DelicateCoroutinesApi::class) suspend fun main() { val context = Context("Mass") @@ -62,28 +68,60 @@ suspend fun main() { ZmqMagixFlowPlugin() ) - val numDevices = 100 + val numDevices = 50 + repeat(numDevices) { - context.launch(newFixedThreadPoolContext(2, "Device${it}")) { - delay(1) - val deviceContext = Context("Device${it}") { - plugin(DeviceManager) + delay(1) + val deviceContext = Context("Device${it}") { + plugin(DeviceManager) + } + + val deviceManager = deviceContext.request(DeviceManager) + + deviceManager.install("device$it", MassDevice) + + val endpointId = "device$it" + val deviceEndpoint = MagixEndpoint.rSocketStreamWithWebSockets("localhost") + deviceManager.launchMagixService(deviceEndpoint, endpointId, Dispatchers.IO) + + } + + val trace = Bar { + context.launch(Dispatchers.IO) { + val monitorEndpoint = MagixEndpoint.zmq("localhost") + + val mutex = Mutex() + + val latest = HashMap() + val max = HashMap() + + monitorEndpoint.subscribe(DeviceManager.magixFormat).onEach { (magixMessage, payload) -> + mutex.withLock { + val delay = Clock.System.now() - payload.time + latest[magixMessage.sourceEndpoint] = Clock.System.now() - payload.time + max[magixMessage.sourceEndpoint] = + maxOf(delay, max[magixMessage.sourceEndpoint] ?: ZERO) + } + }.launchIn(this) + + while (isActive) { + delay(200) + mutex.withLock { + val sorted = max.mapKeys { it.key.substring(6).toInt() }.toSortedMap() + latest.clear() + max.clear() + x.numbers = sorted.keys + y.numbers = sorted.values.map { it.inWholeMicroseconds / 1000.0 + 0.0001 } + } } - - val deviceManager = deviceContext.request(DeviceManager) - - deviceManager.install("device$it", MassDevice) - - val endpointId = "device$it" - val deviceEndpoint = MagixEndpoint.rSocketWithTcp("localhost") - deviceManager.launchMagixService(deviceEndpoint, endpointId) } } - val application = Plotly.serve(port = 9091, scope = context) { + val application = Plotly.serve(port = 9091) { updateMode = PlotlyUpdateMode.PUSH updateInterval = 1000 + page { container -> plot(renderer = container, config = PlotlyConfig { saveAsSvg() }) { layout { @@ -92,36 +130,7 @@ suspend fun main() { xaxis.title = "Device number" yaxis.title = "Maximum latency in ms" } - bar { - launch(Dispatchers.IO) { - val monitorEndpoint = MagixEndpoint.rSocketWithWebSockets("localhost") - - val mutex = Mutex() - - val latest = HashMap() - val max = HashMap() - - monitorEndpoint.subscribe(DeviceManager.magixFormat).onEach { (magixMessage, payload) -> - mutex.withLock { - val delay = Clock.System.now() - payload.time - latest[magixMessage.sourceEndpoint] = Clock.System.now() - payload.time - max[magixMessage.sourceEndpoint] = - maxOf(delay, max[magixMessage.sourceEndpoint] ?: ZERO) - } - }.launchIn(this) - - while (isActive) { - delay(200) - mutex.withLock { - val sorted = max.mapKeys { it.key.substring(6).toInt() }.toSortedMap() - latest.clear() - max.clear() - x.numbers = sorted.keys - y.numbers = sorted.values.map { it.inWholeMicroseconds / 1000.0 + 0.0001 } - } - } - } - } + traces(trace) } } } diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index e411586..48c0a02 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,5 +1,5 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-8.4-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-8.7-bin.zip zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists