From 7103786ec9384dbde75596a4e632b0b876a2757a Mon Sep 17 00:00:00 2001 From: Alexander Nozik Date: Mon, 8 May 2023 15:39:34 +0300 Subject: [PATCH] Stress test demo --- .../controls/spec/deviceExtensions.kt | 18 ++- controls-magix-client/build.gradle.kts | 6 +- .../kscience/controls/client/DeviceClient.kt | 1 + .../kscience/controls/client/controlsMagix.kt | 6 +- .../kscience/controls/client/tangoMagix.kt | 4 +- demo/many-devices/README.md | 4 + demo/many-devices/build.gradle.kts | 39 ++++++ .../kscience/controls/demo/MassDevice.kt | 118 ++++++++++++++++++ .../space/kscience/magix/api/MagixEndpoint.kt | 4 +- .../kscience/magix/api/MagixMessageFilter.kt | 12 +- .../magix/rsocket/RSocketMagixEndpoint.kt | 14 +-- .../rsocket/RSocketStreamMagixEndpoint.kt | 2 +- .../space/kscience/magix/rsocket/withTcp.kt | 2 +- .../magix/server/RSocketMagixFlowPlugin.kt | 2 +- settings.gradle.kts | 1 + 15 files changed, 201 insertions(+), 32 deletions(-) create mode 100644 demo/many-devices/README.md create mode 100644 demo/many-devices/build.gradle.kts create mode 100644 demo/many-devices/src/main/kotlin/space/kscience/controls/demo/MassDevice.kt diff --git a/controls-core/src/commonMain/kotlin/space/kscience/controls/spec/deviceExtensions.kt b/controls-core/src/commonMain/kotlin/space/kscience/controls/spec/deviceExtensions.kt index 55fc216..af42f60 100644 --- a/controls-core/src/commonMain/kotlin/space/kscience/controls/spec/deviceExtensions.kt +++ b/controls-core/src/commonMain/kotlin/space/kscience/controls/spec/deviceExtensions.kt @@ -1,10 +1,12 @@ package space.kscience.controls.spec import kotlinx.coroutines.Job +import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.flow import kotlinx.coroutines.isActive import kotlinx.coroutines.launch +import space.kscience.controls.api.Device import kotlin.time.Duration /** @@ -14,19 +16,23 @@ import kotlin.time.Duration * * The flow is canceled when the device scope is canceled */ -public fun , R> D.readRecurring(interval: Duration, reader: suspend D.() -> R): Flow = flow { +public fun D.readRecurring(interval: Duration, reader: suspend D.() -> R): Flow = flow { while (isActive) { - kotlinx.coroutines.delay(interval) - emit(reader()) + delay(interval) + launch { + emit(reader()) + } } } /** * Do a recurring (with a fixed delay) task on a device. */ -public fun > D.doRecurring(interval: Duration, task: suspend D.() -> Unit): Job = launch { +public fun D.doRecurring(interval: Duration, task: suspend D.() -> Unit): Job = launch { while (isActive) { - kotlinx.coroutines.delay(interval) - task() + delay(interval) + launch { + task() + } } } \ No newline at end of file diff --git a/controls-magix-client/build.gradle.kts b/controls-magix-client/build.gradle.kts index 0f108f4..cbdefcc 100644 --- a/controls-magix-client/build.gradle.kts +++ b/controls-magix-client/build.gradle.kts @@ -14,9 +14,9 @@ kscience { json() } dependencies { - implementation(projects.magix.magixApi) - implementation(projects.controlsCore) - implementation("com.benasher44:uuid:0.7.0") + api(projects.magix.magixApi) + api(projects.controlsCore) + api("com.benasher44:uuid:0.7.0") } } diff --git a/controls-magix-client/src/commonMain/kotlin/space/kscience/controls/client/DeviceClient.kt b/controls-magix-client/src/commonMain/kotlin/space/kscience/controls/client/DeviceClient.kt index 16fd4b0..53a7704 100644 --- a/controls-magix-client/src/commonMain/kotlin/space/kscience/controls/client/DeviceClient.kt +++ b/controls-magix-client/src/commonMain/kotlin/space/kscience/controls/client/DeviceClient.kt @@ -26,6 +26,7 @@ public class DeviceClient( private val send: suspend (DeviceMessage) -> Unit, ) : Device { + @OptIn(kotlinx.coroutines.ExperimentalCoroutinesApi::class) override val coroutineContext: CoroutineContext = newCoroutineContext(context.coroutineContext) private val mutex = Mutex() diff --git a/controls-magix-client/src/commonMain/kotlin/space/kscience/controls/client/controlsMagix.kt b/controls-magix-client/src/commonMain/kotlin/space/kscience/controls/client/controlsMagix.kt index be77a4a..f64c8e0 100644 --- a/controls-magix-client/src/commonMain/kotlin/space/kscience/controls/client/controlsMagix.kt +++ b/controls-magix-client/src/commonMain/kotlin/space/kscience/controls/client/controlsMagix.kt @@ -32,7 +32,7 @@ public fun DeviceManager.connectToMagix( endpoint: MagixEndpoint, endpointID: String = controlsMagixFormat.defaultFormat, ): Job = context.launch { - endpoint.subscribe(controlsMagixFormat).onEach { (request, payload) -> + endpoint.subscribe(controlsMagixFormat, targetFilter = listOf(endpointID)).onEach { (request, payload) -> val responsePayload = respondHubMessage(payload) if (responsePayload != null) { endpoint.broadcast( @@ -44,7 +44,7 @@ public fun DeviceManager.connectToMagix( ) } }.catch { error -> - logger.error(error) { "Error while responding to message" } + logger.error(error) { "Error while responding to message: ${error.message}" } }.launchIn(this) hubMessageFlow(this).onEach { payload -> @@ -55,7 +55,7 @@ public fun DeviceManager.connectToMagix( id = "df[${payload.hashCode()}]" ) }.catch { error -> - logger.error(error) { "Error while sending a message" } + logger.error(error) { "Error while sending a message: ${error.message}" } }.launchIn(this) } diff --git a/controls-magix-client/src/commonMain/kotlin/space/kscience/controls/client/tangoMagix.kt b/controls-magix-client/src/commonMain/kotlin/space/kscience/controls/client/tangoMagix.kt index ddec8d6..48ae3c5 100644 --- a/controls-magix-client/src/commonMain/kotlin/space/kscience/controls/client/tangoMagix.kt +++ b/controls-magix-client/src/commonMain/kotlin/space/kscience/controls/client/tangoMagix.kt @@ -66,7 +66,9 @@ internal val tangoMagixFormat = MagixFormat( setOf("tango") ) - +/** + * Controls-kt device binding for Tango-flavored magix loop + */ public fun DeviceManager.launchTangoMagix( endpoint: MagixEndpoint, endpointID: String = TANGO_MAGIX_FORMAT, diff --git a/demo/many-devices/README.md b/demo/many-devices/README.md new file mode 100644 index 0000000..5d9c12f --- /dev/null +++ b/demo/many-devices/README.md @@ -0,0 +1,4 @@ +# Module all-things + + + diff --git a/demo/many-devices/build.gradle.kts b/demo/many-devices/build.gradle.kts new file mode 100644 index 0000000..d360ffa --- /dev/null +++ b/demo/many-devices/build.gradle.kts @@ -0,0 +1,39 @@ +plugins { + kotlin("jvm") + application +} + + +repositories { + mavenCentral() + maven("https://repo.kotlin.link") +} + +val ktorVersion: String by rootProject.extra +val rsocketVersion: String by rootProject.extra + +dependencies { + implementation(projects.magix.magixServer) + implementation(projects.controlsMagixClient) + implementation(projects.magix.magixRsocket) + + implementation("io.ktor:ktor-client-cio:$ktorVersion") + implementation("space.kscience:plotlykt-server:0.5.3") + implementation(spclibs.logback.classic) +} + +kotlin{ + jvmToolchain(11) +} + + +tasks.withType().configureEach { + kotlinOptions { + freeCompilerArgs = freeCompilerArgs + listOf("-Xjvm-default=all", "-Xopt-in=kotlin.RequiresOptIn") + } +} + + +application { + mainClass.set("space.kscience.controls.demo.DemoControllerViewKt") +} \ No newline at end of file diff --git a/demo/many-devices/src/main/kotlin/space/kscience/controls/demo/MassDevice.kt b/demo/many-devices/src/main/kotlin/space/kscience/controls/demo/MassDevice.kt new file mode 100644 index 0000000..a62b928 --- /dev/null +++ b/demo/many-devices/src/main/kotlin/space/kscience/controls/demo/MassDevice.kt @@ -0,0 +1,118 @@ +package space.kscience.controls.demo + +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.isActive +import kotlinx.coroutines.launch +import kotlinx.datetime.Clock +import kotlinx.datetime.Instant +import space.kscience.controls.client.connectToMagix +import space.kscience.controls.client.controlsMagixFormat +import space.kscience.controls.manager.DeviceManager +import space.kscience.controls.manager.install +import space.kscience.controls.spec.* +import space.kscience.dataforge.context.Context +import space.kscience.dataforge.context.Factory +import space.kscience.dataforge.context.request +import space.kscience.dataforge.meta.Meta +import space.kscience.dataforge.meta.get +import space.kscience.dataforge.meta.int +import space.kscience.magix.api.MagixEndpoint +import space.kscience.magix.api.subscribe +import space.kscience.magix.rsocket.rSocketWithTcp +import space.kscience.magix.rsocket.rSocketWithWebSockets +import space.kscience.magix.server.RSocketMagixFlowPlugin +import space.kscience.magix.server.startMagixServer +import space.kscience.plotly.Plotly +import space.kscience.plotly.layout +import space.kscience.plotly.plot +import space.kscience.plotly.scatter +import space.kscience.plotly.server.PlotlyUpdateMode +import space.kscience.plotly.server.serve +import space.kscience.plotly.server.show +import java.util.concurrent.ConcurrentHashMap +import kotlin.random.Random +import kotlin.time.Duration.Companion.milliseconds + + +class MassDevice(context: Context, meta: Meta) : DeviceBySpec(MassDevice, context, meta) { + private val rng = Random(meta["seed"].int ?: 0) + + private val randomValue get() = rng.nextDouble() + + companion object : DeviceSpec(), Factory { + + override fun build(context: Context, meta: Meta): MassDevice = MassDevice(context, meta) + + val value by doubleProperty { randomValue } + + override suspend fun MassDevice.onOpen() { + doRecurring(200.milliseconds) { + read(value) + } + } + } +} + +fun main() { + val context = Context("Mass") + + context.startMagixServer( + RSocketMagixFlowPlugin() + ) + + val numDevices = 1000 + + repeat(numDevices) { + val deviceContext = Context("Device${it}") { + plugin(DeviceManager) + } + + val deviceManager = deviceContext.request(DeviceManager) + + deviceManager.install("device$it", MassDevice) + + deviceContext.launch(Dispatchers.Default) { + val deviceEndpoint = MagixEndpoint.rSocketWithWebSockets("localhost") + deviceManager.connectToMagix(deviceEndpoint, "device$it") + } + } + + val application = Plotly.serve(port = 9091, scope = context) { + updateMode = PlotlyUpdateMode.PUSH + updateInterval = 1000 + page { container -> + plot(renderer = container) { + layout { + title = "Latest event" + } + scatter { + launch(Dispatchers.Default) { + val monitorEndpoint = MagixEndpoint.rSocketWithTcp("localhost") + + val latest = ConcurrentHashMap() + + monitorEndpoint.subscribe(controlsMagixFormat).onEach { (magixMessage, payload) -> + latest[magixMessage.origin] = payload.time ?: Clock.System.now() + }.launchIn(this) + + while (isActive) { + delay(1000) + val now = Clock.System.now() + x.strings = latest.keys + y.numbers = latest.values.map { now.minus(it).inWholeMilliseconds / 1000.0 } + } + } + } + } + } + } + + application.show() + + while (readlnOrNull().isNullOrBlank()) { + + } +} diff --git a/magix/magix-api/src/commonMain/kotlin/space/kscience/magix/api/MagixEndpoint.kt b/magix/magix-api/src/commonMain/kotlin/space/kscience/magix/api/MagixEndpoint.kt index 0a5abd3..35cb8e5 100644 --- a/magix/magix-api/src/commonMain/kotlin/space/kscience/magix/api/MagixEndpoint.kt +++ b/magix/magix-api/src/commonMain/kotlin/space/kscience/magix/api/MagixEndpoint.kt @@ -19,9 +19,7 @@ public interface MagixEndpoint { /** * Send an event */ - public suspend fun broadcast( - message: MagixMessage, - ) + public suspend fun broadcast(message: MagixMessage) /** * Close the endpoint and the associated connection if it exists diff --git a/magix/magix-api/src/commonMain/kotlin/space/kscience/magix/api/MagixMessageFilter.kt b/magix/magix-api/src/commonMain/kotlin/space/kscience/magix/api/MagixMessageFilter.kt index f497b97..4ce1995 100644 --- a/magix/magix-api/src/commonMain/kotlin/space/kscience/magix/api/MagixMessageFilter.kt +++ b/magix/magix-api/src/commonMain/kotlin/space/kscience/magix/api/MagixMessageFilter.kt @@ -10,6 +10,12 @@ public data class MagixMessageFilter( val origin: Collection? = null, val target: Collection? = null, ) { + + public fun accepts(message: MagixMessage): Boolean = + format?.contains(message.format) ?: true + && origin?.contains(message.origin) ?: true + && target?.contains(message.target) ?: true + public companion object { public val ALL: MagixMessageFilter = MagixMessageFilter() } @@ -22,9 +28,5 @@ public fun Flow.filter(filter: MagixMessageFilter): Flow - filter.format?.contains(message.format) ?: true - && filter.origin?.contains(message.origin) ?: true - && filter.target?.contains(message.target) ?: true - } + return filter(filter::accepts) } \ No newline at end of file diff --git a/magix/magix-rsocket/src/commonMain/kotlin/space/kscience/magix/rsocket/RSocketMagixEndpoint.kt b/magix/magix-rsocket/src/commonMain/kotlin/space/kscience/magix/rsocket/RSocketMagixEndpoint.kt index f9690fc..19cc8f8 100644 --- a/magix/magix-rsocket/src/commonMain/kotlin/space/kscience/magix/rsocket/RSocketMagixEndpoint.kt +++ b/magix/magix-rsocket/src/commonMain/kotlin/space/kscience/magix/rsocket/RSocketMagixEndpoint.kt @@ -18,22 +18,20 @@ import space.kscience.magix.api.MagixEndpoint import space.kscience.magix.api.MagixMessage import space.kscience.magix.api.MagixMessageFilter import space.kscience.magix.api.filter -import kotlin.coroutines.CoroutineContext import kotlin.coroutines.coroutineContext -public class RSocketMagixEndpoint( - private val rSocket: RSocket, - private val coroutineContext: CoroutineContext, -) : MagixEndpoint, Closeable { +public class RSocketMagixEndpoint(private val rSocket: RSocket) : MagixEndpoint, Closeable { override fun subscribe( filter: MagixMessageFilter, ): Flow { - val payload = buildPayload { data(MagixEndpoint.magixJson.encodeToString(MagixMessageFilter.serializer(), filter)) } + val payload = buildPayload { + data(MagixEndpoint.magixJson.encodeToString(MagixMessageFilter.serializer(), filter)) + } val flow = rSocket.requestStream(payload) return flow.map { MagixEndpoint.magixJson.decodeFromString(MagixMessage.serializer(), it.data.readText()) - }.filter(filter).flowOn(coroutineContext[CoroutineDispatcher] ?: Dispatchers.Unconfined) + }.filter(filter).flowOn(rSocket.coroutineContext[CoroutineDispatcher] ?: Dispatchers.Unconfined) } override suspend fun broadcast(message: MagixMessage): Unit = withContext(coroutineContext) { @@ -80,5 +78,5 @@ public suspend fun MagixEndpoint.Companion.rSocketWithWebSockets( client.close() } - return RSocketMagixEndpoint(rSocket, coroutineContext) + return RSocketMagixEndpoint(rSocket) } \ No newline at end of file diff --git a/magix/magix-rsocket/src/commonMain/kotlin/space/kscience/magix/rsocket/RSocketStreamMagixEndpoint.kt b/magix/magix-rsocket/src/commonMain/kotlin/space/kscience/magix/rsocket/RSocketStreamMagixEndpoint.kt index 1875602..b6fd428 100644 --- a/magix/magix-rsocket/src/commonMain/kotlin/space/kscience/magix/rsocket/RSocketStreamMagixEndpoint.kt +++ b/magix/magix-rsocket/src/commonMain/kotlin/space/kscience/magix/rsocket/RSocketStreamMagixEndpoint.kt @@ -26,7 +26,7 @@ import kotlin.coroutines.CoroutineContext import kotlin.coroutines.coroutineContext /** - * RSocket endpoint based on established channel. This way it works a bit faster than [RSocketMagixEndpoint] + * RSocket endpoint based on an established channel. This way it works a bit faster than [RSocketMagixEndpoint] * for sending and receiving, but less flexible in terms of filters. One general [streamFilter] could be set * in constructor and applied on the loop side. Filters in [subscribe] are applied on the endpoint side on top * of received data. diff --git a/magix/magix-rsocket/src/jvmMain/kotlin/space/kscience/magix/rsocket/withTcp.kt b/magix/magix-rsocket/src/jvmMain/kotlin/space/kscience/magix/rsocket/withTcp.kt index 23fe0be..211ce68 100644 --- a/magix/magix-rsocket/src/jvmMain/kotlin/space/kscience/magix/rsocket/withTcp.kt +++ b/magix/magix-rsocket/src/jvmMain/kotlin/space/kscience/magix/rsocket/withTcp.kt @@ -23,7 +23,7 @@ public suspend fun MagixEndpoint.Companion.rSocketWithTcp( ) val rSocket = buildConnector(rSocketConfig).connect(transport) - return RSocketMagixEndpoint(rSocket, coroutineContext) + return RSocketMagixEndpoint(rSocket) } diff --git a/magix/magix-server/src/main/kotlin/space/kscience/magix/server/RSocketMagixFlowPlugin.kt b/magix/magix-server/src/main/kotlin/space/kscience/magix/server/RSocketMagixFlowPlugin.kt index a2f875d..7d5a34e 100644 --- a/magix/magix-server/src/main/kotlin/space/kscience/magix/server/RSocketMagixFlowPlugin.kt +++ b/magix/magix-server/src/main/kotlin/space/kscience/magix/server/RSocketMagixFlowPlugin.kt @@ -49,7 +49,7 @@ public class RSocketMagixFlowPlugin(public val port: Int = DEFAULT_MAGIX_RAW_POR val message = MagixEndpoint.magixJson.decodeFromString(MagixMessage.serializer(), request.data.readText()) magixFlow.emit(message) } - // bi-directional connection + // bidirectional connection, not covered by a standard requestChannel { request: Payload, input: Flow -> input.onEach { magixFlow.emit(MagixEndpoint.magixJson.decodeFromString(MagixMessage.serializer(), it.data.readText())) diff --git a/settings.gradle.kts b/settings.gradle.kts index a255044..21b2467 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -62,6 +62,7 @@ include( ":magix:magix-storage:magix-storage-xodus", ":controls-magix-client", ":demo:all-things", + ":demo:many-devices", ":demo:magix-demo", ":demo:car", ":demo:motors",