Stress test demo

This commit is contained in:
Alexander Nozik 2023-05-08 15:39:34 +03:00
parent 53e506893f
commit 7103786ec9
15 changed files with 201 additions and 32 deletions

View File

@ -1,10 +1,12 @@
package space.kscience.controls.spec
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import space.kscience.controls.api.Device
import kotlin.time.Duration
/**
@ -14,19 +16,23 @@ import kotlin.time.Duration
*
* The flow is canceled when the device scope is canceled
*/
public fun <D : DeviceBase<D>, R> D.readRecurring(interval: Duration, reader: suspend D.() -> R): Flow<R> = flow {
public fun <D : Device, R> D.readRecurring(interval: Duration, reader: suspend D.() -> R): Flow<R> = flow {
while (isActive) {
kotlinx.coroutines.delay(interval)
emit(reader())
delay(interval)
launch {
emit(reader())
}
}
}
/**
* Do a recurring (with a fixed delay) task on a device.
*/
public fun <D : DeviceBase<D>> D.doRecurring(interval: Duration, task: suspend D.() -> Unit): Job = launch {
public fun <D : Device> D.doRecurring(interval: Duration, task: suspend D.() -> Unit): Job = launch {
while (isActive) {
kotlinx.coroutines.delay(interval)
task()
delay(interval)
launch {
task()
}
}
}

View File

@ -14,9 +14,9 @@ kscience {
json()
}
dependencies {
implementation(projects.magix.magixApi)
implementation(projects.controlsCore)
implementation("com.benasher44:uuid:0.7.0")
api(projects.magix.magixApi)
api(projects.controlsCore)
api("com.benasher44:uuid:0.7.0")
}
}

View File

@ -26,6 +26,7 @@ public class DeviceClient(
private val send: suspend (DeviceMessage) -> Unit,
) : Device {
@OptIn(kotlinx.coroutines.ExperimentalCoroutinesApi::class)
override val coroutineContext: CoroutineContext = newCoroutineContext(context.coroutineContext)
private val mutex = Mutex()

View File

@ -32,7 +32,7 @@ public fun DeviceManager.connectToMagix(
endpoint: MagixEndpoint,
endpointID: String = controlsMagixFormat.defaultFormat,
): Job = context.launch {
endpoint.subscribe(controlsMagixFormat).onEach { (request, payload) ->
endpoint.subscribe(controlsMagixFormat, targetFilter = listOf(endpointID)).onEach { (request, payload) ->
val responsePayload = respondHubMessage(payload)
if (responsePayload != null) {
endpoint.broadcast(
@ -44,7 +44,7 @@ public fun DeviceManager.connectToMagix(
)
}
}.catch { error ->
logger.error(error) { "Error while responding to message" }
logger.error(error) { "Error while responding to message: ${error.message}" }
}.launchIn(this)
hubMessageFlow(this).onEach { payload ->
@ -55,7 +55,7 @@ public fun DeviceManager.connectToMagix(
id = "df[${payload.hashCode()}]"
)
}.catch { error ->
logger.error(error) { "Error while sending a message" }
logger.error(error) { "Error while sending a message: ${error.message}" }
}.launchIn(this)
}

View File

@ -66,7 +66,9 @@ internal val tangoMagixFormat = MagixFormat(
setOf("tango")
)
/**
* Controls-kt device binding for Tango-flavored magix loop
*/
public fun DeviceManager.launchTangoMagix(
endpoint: MagixEndpoint,
endpointID: String = TANGO_MAGIX_FORMAT,

View File

@ -0,0 +1,4 @@
# Module all-things

View File

@ -0,0 +1,39 @@
plugins {
kotlin("jvm")
application
}
repositories {
mavenCentral()
maven("https://repo.kotlin.link")
}
val ktorVersion: String by rootProject.extra
val rsocketVersion: String by rootProject.extra
dependencies {
implementation(projects.magix.magixServer)
implementation(projects.controlsMagixClient)
implementation(projects.magix.magixRsocket)
implementation("io.ktor:ktor-client-cio:$ktorVersion")
implementation("space.kscience:plotlykt-server:0.5.3")
implementation(spclibs.logback.classic)
}
kotlin{
jvmToolchain(11)
}
tasks.withType<org.jetbrains.kotlin.gradle.tasks.KotlinCompile>().configureEach {
kotlinOptions {
freeCompilerArgs = freeCompilerArgs + listOf("-Xjvm-default=all", "-Xopt-in=kotlin.RequiresOptIn")
}
}
application {
mainClass.set("space.kscience.controls.demo.DemoControllerViewKt")
}

View File

@ -0,0 +1,118 @@
package space.kscience.controls.demo
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.datetime.Clock
import kotlinx.datetime.Instant
import space.kscience.controls.client.connectToMagix
import space.kscience.controls.client.controlsMagixFormat
import space.kscience.controls.manager.DeviceManager
import space.kscience.controls.manager.install
import space.kscience.controls.spec.*
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.Factory
import space.kscience.dataforge.context.request
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.get
import space.kscience.dataforge.meta.int
import space.kscience.magix.api.MagixEndpoint
import space.kscience.magix.api.subscribe
import space.kscience.magix.rsocket.rSocketWithTcp
import space.kscience.magix.rsocket.rSocketWithWebSockets
import space.kscience.magix.server.RSocketMagixFlowPlugin
import space.kscience.magix.server.startMagixServer
import space.kscience.plotly.Plotly
import space.kscience.plotly.layout
import space.kscience.plotly.plot
import space.kscience.plotly.scatter
import space.kscience.plotly.server.PlotlyUpdateMode
import space.kscience.plotly.server.serve
import space.kscience.plotly.server.show
import java.util.concurrent.ConcurrentHashMap
import kotlin.random.Random
import kotlin.time.Duration.Companion.milliseconds
class MassDevice(context: Context, meta: Meta) : DeviceBySpec<MassDevice>(MassDevice, context, meta) {
private val rng = Random(meta["seed"].int ?: 0)
private val randomValue get() = rng.nextDouble()
companion object : DeviceSpec<MassDevice>(), Factory<MassDevice> {
override fun build(context: Context, meta: Meta): MassDevice = MassDevice(context, meta)
val value by doubleProperty { randomValue }
override suspend fun MassDevice.onOpen() {
doRecurring(200.milliseconds) {
read(value)
}
}
}
}
fun main() {
val context = Context("Mass")
context.startMagixServer(
RSocketMagixFlowPlugin()
)
val numDevices = 1000
repeat(numDevices) {
val deviceContext = Context("Device${it}") {
plugin(DeviceManager)
}
val deviceManager = deviceContext.request(DeviceManager)
deviceManager.install("device$it", MassDevice)
deviceContext.launch(Dispatchers.Default) {
val deviceEndpoint = MagixEndpoint.rSocketWithWebSockets("localhost")
deviceManager.connectToMagix(deviceEndpoint, "device$it")
}
}
val application = Plotly.serve(port = 9091, scope = context) {
updateMode = PlotlyUpdateMode.PUSH
updateInterval = 1000
page { container ->
plot(renderer = container) {
layout {
title = "Latest event"
}
scatter {
launch(Dispatchers.Default) {
val monitorEndpoint = MagixEndpoint.rSocketWithTcp("localhost")
val latest = ConcurrentHashMap<String, Instant>()
monitorEndpoint.subscribe(controlsMagixFormat).onEach { (magixMessage, payload) ->
latest[magixMessage.origin] = payload.time ?: Clock.System.now()
}.launchIn(this)
while (isActive) {
delay(1000)
val now = Clock.System.now()
x.strings = latest.keys
y.numbers = latest.values.map { now.minus(it).inWholeMilliseconds / 1000.0 }
}
}
}
}
}
}
application.show()
while (readlnOrNull().isNullOrBlank()) {
}
}

View File

@ -19,9 +19,7 @@ public interface MagixEndpoint {
/**
* Send an event
*/
public suspend fun broadcast(
message: MagixMessage,
)
public suspend fun broadcast(message: MagixMessage)
/**
* Close the endpoint and the associated connection if it exists

View File

@ -10,6 +10,12 @@ public data class MagixMessageFilter(
val origin: Collection<String?>? = null,
val target: Collection<String?>? = null,
) {
public fun accepts(message: MagixMessage): Boolean =
format?.contains(message.format) ?: true
&& origin?.contains(message.origin) ?: true
&& target?.contains(message.target) ?: true
public companion object {
public val ALL: MagixMessageFilter = MagixMessageFilter()
}
@ -22,9 +28,5 @@ public fun Flow<MagixMessage>.filter(filter: MagixMessageFilter): Flow<MagixMess
if (filter == MagixMessageFilter.ALL) {
return this
}
return filter { message ->
filter.format?.contains(message.format) ?: true
&& filter.origin?.contains(message.origin) ?: true
&& filter.target?.contains(message.target) ?: true
}
return filter(filter::accepts)
}

View File

@ -18,22 +18,20 @@ import space.kscience.magix.api.MagixEndpoint
import space.kscience.magix.api.MagixMessage
import space.kscience.magix.api.MagixMessageFilter
import space.kscience.magix.api.filter
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext
public class RSocketMagixEndpoint(
private val rSocket: RSocket,
private val coroutineContext: CoroutineContext,
) : MagixEndpoint, Closeable {
public class RSocketMagixEndpoint(private val rSocket: RSocket) : MagixEndpoint, Closeable {
override fun subscribe(
filter: MagixMessageFilter,
): Flow<MagixMessage> {
val payload = buildPayload { data(MagixEndpoint.magixJson.encodeToString(MagixMessageFilter.serializer(), filter)) }
val payload = buildPayload {
data(MagixEndpoint.magixJson.encodeToString(MagixMessageFilter.serializer(), filter))
}
val flow = rSocket.requestStream(payload)
return flow.map {
MagixEndpoint.magixJson.decodeFromString(MagixMessage.serializer(), it.data.readText())
}.filter(filter).flowOn(coroutineContext[CoroutineDispatcher] ?: Dispatchers.Unconfined)
}.filter(filter).flowOn(rSocket.coroutineContext[CoroutineDispatcher] ?: Dispatchers.Unconfined)
}
override suspend fun broadcast(message: MagixMessage): Unit = withContext(coroutineContext) {
@ -80,5 +78,5 @@ public suspend fun MagixEndpoint.Companion.rSocketWithWebSockets(
client.close()
}
return RSocketMagixEndpoint(rSocket, coroutineContext)
return RSocketMagixEndpoint(rSocket)
}

View File

@ -26,7 +26,7 @@ import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext
/**
* RSocket endpoint based on established channel. This way it works a bit faster than [RSocketMagixEndpoint]
* RSocket endpoint based on an established channel. This way it works a bit faster than [RSocketMagixEndpoint]
* for sending and receiving, but less flexible in terms of filters. One general [streamFilter] could be set
* in constructor and applied on the loop side. Filters in [subscribe] are applied on the endpoint side on top
* of received data.

View File

@ -23,7 +23,7 @@ public suspend fun MagixEndpoint.Companion.rSocketWithTcp(
)
val rSocket = buildConnector(rSocketConfig).connect(transport)
return RSocketMagixEndpoint(rSocket, coroutineContext)
return RSocketMagixEndpoint(rSocket)
}

View File

@ -49,7 +49,7 @@ public class RSocketMagixFlowPlugin(public val port: Int = DEFAULT_MAGIX_RAW_POR
val message = MagixEndpoint.magixJson.decodeFromString(MagixMessage.serializer(), request.data.readText())
magixFlow.emit(message)
}
// bi-directional connection
// bidirectional connection, not covered by a standard
requestChannel { request: Payload, input: Flow<Payload> ->
input.onEach {
magixFlow.emit(MagixEndpoint.magixJson.decodeFromString(MagixMessage.serializer(), it.data.readText()))

View File

@ -62,6 +62,7 @@ include(
":magix:magix-storage:magix-storage-xodus",
":controls-magix-client",
":demo:all-things",
":demo:many-devices",
":demo:magix-demo",
":demo:car",
":demo:motors",