Refactor load test

This commit is contained in:
Alexander Nozik 2024-03-25 15:48:23 +03:00
parent 78dade4b49
commit d91296c47d
6 changed files with 91 additions and 54 deletions

View File

@ -80,7 +80,7 @@ public abstract class AbstractPort(
/**
* Raw flow of incoming data chunks. The chunks are not guaranteed to be complete phrases.
* In order to form phrases, some condition should be used on top of it.
* To form phrases, some condition should be used on top of it.
* For example [stringsDelimitedIncoming] generates phrases with fixed delimiter.
*/
override fun receiving(): Flow<ByteArray> = incoming.receiveAsFlow()

View File

@ -42,6 +42,31 @@ public fun Flow<ByteArray>.withDelimiter(delimiter: ByteArray): Flow<ByteArray>
}
}
private fun Flow<ByteArray>.withFixedMessageSize(messageSize: Int): Flow<ByteArray> {
require(messageSize > 0) { "Message size should be positive" }
val output = Buffer()
onCompletion {
output.close()
}
return transform { chunk ->
val remaining: Int = (messageSize - output.size).toInt()
if (chunk.size >= remaining) {
output.write(chunk, endIndex = remaining)
emit(output.readByteArray())
output.clear()
//write the remaining chunk fragment
if(chunk.size> remaining) {
output.write(chunk, startIndex = remaining)
}
} else {
output.write(chunk)
}
}
}
/**
* Transform byte fragments into utf-8 phrases using utf-8 delimiter
*/

View File

@ -12,6 +12,8 @@ import space.kscience.controls.manager.respondHubMessage
import space.kscience.dataforge.context.error
import space.kscience.dataforge.context.logger
import space.kscience.magix.api.*
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
internal val controlsMagixFormat: MagixFormat<DeviceMessage> = MagixFormat(
@ -38,7 +40,8 @@ internal fun generateId(request: MagixMessage): String = if (request.id != null)
public fun DeviceManager.launchMagixService(
endpoint: MagixEndpoint,
endpointID: String = controlsMagixFormat.defaultFormat,
): Job = context.launch {
coroutineContext: CoroutineContext = EmptyCoroutineContext,
): Job = context.launch(coroutineContext) {
endpoint.subscribe(controlsMagixFormat, targetFilter = listOf(endpointID, null)).onEach { (request, payload) ->
val responsePayload = respondHubMessage(payload)
responsePayload.forEach {

View File

@ -19,7 +19,7 @@ dependencies {
implementation(projects.magix.magixZmq)
implementation("io.ktor:ktor-client-cio:$ktorVersion")
implementation("space.kscience:plotlykt-server:0.6.0")
implementation("space.kscience:plotlykt-server:0.7.1")
implementation(spclibs.logback.classic)
}

View File

@ -1,8 +1,11 @@
package space.kscience.controls.demo
import kotlinx.coroutines.*
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.datetime.Clock
@ -19,15 +22,19 @@ import space.kscience.dataforge.meta.get
import space.kscience.dataforge.meta.int
import space.kscience.magix.api.MagixEndpoint
import space.kscience.magix.api.subscribe
import space.kscience.magix.rsocket.rSocketWithTcp
import space.kscience.magix.rsocket.rSocketWithWebSockets
import space.kscience.magix.rsocket.rSocketStreamWithWebSockets
import space.kscience.magix.server.RSocketMagixFlowPlugin
import space.kscience.magix.server.startMagixServer
import space.kscience.plotly.*
import space.kscience.plotly.Plotly
import space.kscience.plotly.PlotlyConfig
import space.kscience.plotly.layout
import space.kscience.plotly.models.Bar
import space.kscience.plotly.plot
import space.kscience.plotly.server.PlotlyUpdateMode
import space.kscience.plotly.server.serve
import space.kscience.plotly.server.show
import space.kscince.magix.zmq.ZmqMagixFlowPlugin
import space.kscince.magix.zmq.zmq
import kotlin.random.Random
import kotlin.time.Duration
import kotlin.time.Duration.Companion.ZERO
@ -46,14 +53,13 @@ class MassDevice(context: Context, meta: Meta) : DeviceBySpec<MassDevice>(MassDe
val value by doubleProperty { randomValue }
override suspend fun MassDevice.onOpen() {
doRecurring((meta["delay"].int ?: 10).milliseconds) {
doRecurring((meta["delay"].int ?: 5).milliseconds) {
read(value)
}
}
}
}
@OptIn(DelicateCoroutinesApi::class)
suspend fun main() {
val context = Context("Mass")
@ -62,28 +68,60 @@ suspend fun main() {
ZmqMagixFlowPlugin()
)
val numDevices = 100
val numDevices = 50
repeat(numDevices) {
context.launch(newFixedThreadPoolContext(2, "Device${it}")) {
delay(1)
val deviceContext = Context("Device${it}") {
plugin(DeviceManager)
delay(1)
val deviceContext = Context("Device${it}") {
plugin(DeviceManager)
}
val deviceManager = deviceContext.request(DeviceManager)
deviceManager.install("device$it", MassDevice)
val endpointId = "device$it"
val deviceEndpoint = MagixEndpoint.rSocketStreamWithWebSockets("localhost")
deviceManager.launchMagixService(deviceEndpoint, endpointId, Dispatchers.IO)
}
val trace = Bar {
context.launch(Dispatchers.IO) {
val monitorEndpoint = MagixEndpoint.zmq("localhost")
val mutex = Mutex()
val latest = HashMap<String, Duration>()
val max = HashMap<String, Duration>()
monitorEndpoint.subscribe(DeviceManager.magixFormat).onEach { (magixMessage, payload) ->
mutex.withLock {
val delay = Clock.System.now() - payload.time
latest[magixMessage.sourceEndpoint] = Clock.System.now() - payload.time
max[magixMessage.sourceEndpoint] =
maxOf(delay, max[magixMessage.sourceEndpoint] ?: ZERO)
}
}.launchIn(this)
while (isActive) {
delay(200)
mutex.withLock {
val sorted = max.mapKeys { it.key.substring(6).toInt() }.toSortedMap()
latest.clear()
max.clear()
x.numbers = sorted.keys
y.numbers = sorted.values.map { it.inWholeMicroseconds / 1000.0 + 0.0001 }
}
}
val deviceManager = deviceContext.request(DeviceManager)
deviceManager.install("device$it", MassDevice)
val endpointId = "device$it"
val deviceEndpoint = MagixEndpoint.rSocketWithTcp("localhost")
deviceManager.launchMagixService(deviceEndpoint, endpointId)
}
}
val application = Plotly.serve(port = 9091, scope = context) {
val application = Plotly.serve(port = 9091) {
updateMode = PlotlyUpdateMode.PUSH
updateInterval = 1000
page { container ->
plot(renderer = container, config = PlotlyConfig { saveAsSvg() }) {
layout {
@ -92,36 +130,7 @@ suspend fun main() {
xaxis.title = "Device number"
yaxis.title = "Maximum latency in ms"
}
bar {
launch(Dispatchers.IO) {
val monitorEndpoint = MagixEndpoint.rSocketWithWebSockets("localhost")
val mutex = Mutex()
val latest = HashMap<String, Duration>()
val max = HashMap<String, Duration>()
monitorEndpoint.subscribe(DeviceManager.magixFormat).onEach { (magixMessage, payload) ->
mutex.withLock {
val delay = Clock.System.now() - payload.time
latest[magixMessage.sourceEndpoint] = Clock.System.now() - payload.time
max[magixMessage.sourceEndpoint] =
maxOf(delay, max[magixMessage.sourceEndpoint] ?: ZERO)
}
}.launchIn(this)
while (isActive) {
delay(200)
mutex.withLock {
val sorted = max.mapKeys { it.key.substring(6).toInt() }.toSortedMap()
latest.clear()
max.clear()
x.numbers = sorted.keys
y.numbers = sorted.values.map { it.inWholeMicroseconds / 1000.0 + 0.0001 }
}
}
}
}
traces(trace)
}
}
}

View File

@ -1,5 +1,5 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-8.4-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-8.7-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists