From e9bde6867466d4fcea57b20a11c46d25856abd60 Mon Sep 17 00:00:00 2001 From: Alexander Nozik Date: Sun, 9 Jun 2024 15:09:43 +0300 Subject: [PATCH] [WIP] remote communication for CollectiveDevice --- .../kscience/controls/peer/PeerConnection.kt | 7 +- .../controls/server/deviceWebServer.kt | 2 + demo/device-collective/build.gradle.kts | 2 + .../src/jvmMain/kotlin/CollectiveDevice.kt | 33 ++- .../jvmMain/kotlin/DeviceCollectiveModel.kt | 86 ++++++- .../src/jvmMain/kotlin/SampleDeviceState.kt | 19 +- .../src/jvmMain/kotlin/debugModel.kt | 13 +- .../src/jvmMain/kotlin/main.kt | 218 +++++++++++------- .../kscience/controls/demo/MassDevice.kt | 3 +- .../kscience/magix/api/MagixFlowPlugin.kt | 13 +- .../kscience/magix/server/magixModule.kt | 57 +++-- .../space/kscience/magix/server/server.kt | 2 +- 12 files changed, 312 insertions(+), 143 deletions(-) diff --git a/controls-core/src/commonMain/kotlin/space/kscience/controls/peer/PeerConnection.kt b/controls-core/src/commonMain/kotlin/space/kscience/controls/peer/PeerConnection.kt index 14b49ef..a831207 100644 --- a/controls-core/src/commonMain/kotlin/space/kscience/controls/peer/PeerConnection.kt +++ b/controls-core/src/commonMain/kotlin/space/kscience/controls/peer/PeerConnection.kt @@ -2,14 +2,13 @@ package space.kscience.controls.peer import space.kscience.dataforge.io.Envelope import space.kscience.dataforge.meta.Meta -import space.kscience.dataforge.names.Name /** * A manager that allows direct synchronous sending and receiving binary data */ public interface PeerConnection { /** - * Receive an [Envelope] from a device with name [deviceName] on a given [address] with given [contentId]. + * Receive an [Envelope] from a device on a given [address] with given [contentId]. * * The address depends on the specifics of given [PeerConnection]. For example, it could be a TCP/IP port or * magix endpoint name. @@ -20,13 +19,12 @@ public interface PeerConnection { */ public suspend fun receive( address: String, - deviceName: Name, contentId: String, requestMeta: Meta = Meta.EMPTY, ): Envelope /** - * Send an [envelope] to a device with name [deviceName] on a given [address] + * Send an [envelope] to a device on a given [address] * * The address depends on the specifics of given [PeerConnection]. For example, it could be a TCP/IP port or * magix endpoint name. @@ -35,7 +33,6 @@ public interface PeerConnection { */ public suspend fun send( address: String, - deviceName: Name, envelope: Envelope, requestMeta: Meta = Meta.EMPTY, ) diff --git a/controls-server/src/jvmMain/kotlin/space/kscience/controls/server/deviceWebServer.kt b/controls-server/src/jvmMain/kotlin/space/kscience/controls/server/deviceWebServer.kt index 4f37322..8fd104e 100644 --- a/controls-server/src/jvmMain/kotlin/space/kscience/controls/server/deviceWebServer.kt +++ b/controls-server/src/jvmMain/kotlin/space/kscience/controls/server/deviceWebServer.kt @@ -38,6 +38,7 @@ import space.kscience.dataforge.names.get import space.kscience.magix.api.MagixEndpoint import space.kscience.magix.api.MagixFlowPlugin import space.kscience.magix.api.MagixMessage +import space.kscience.magix.api.start import space.kscience.magix.server.magixModule @@ -215,5 +216,6 @@ public fun Application.deviceManagerModule( plugins.forEach { it.start(this, magixFlow) } + magixModule(magixFlow) } \ No newline at end of file diff --git a/demo/device-collective/build.gradle.kts b/demo/device-collective/build.gradle.kts index 8bb0597..3290a0d 100644 --- a/demo/device-collective/build.gradle.kts +++ b/demo/device-collective/build.gradle.kts @@ -13,7 +13,9 @@ kscience { commonMain { implementation(projects.controlsVisualisationCompose) implementation(projects.controlsConstructor) + implementation(projects.magix.magixServer) implementation(projects.magix.magixRsocket) + implementation(projects.controlsMagix) } jvmMain { // implementation("io.ktor:ktor-server-cio") diff --git a/demo/device-collective/src/jvmMain/kotlin/CollectiveDevice.kt b/demo/device-collective/src/jvmMain/kotlin/CollectiveDevice.kt index 050794c..9fce04d 100644 --- a/demo/device-collective/src/jvmMain/kotlin/CollectiveDevice.kt +++ b/demo/device-collective/src/jvmMain/kotlin/CollectiveDevice.kt @@ -6,24 +6,35 @@ import space.kscience.controls.api.Device import space.kscience.controls.constructor.DeviceConstructor import space.kscience.controls.constructor.MutableDeviceState import space.kscience.controls.constructor.registerAsProperty +import space.kscience.controls.peer.PeerConnection import space.kscience.controls.spec.DeviceSpec +import space.kscience.controls.spec.unit import space.kscience.dataforge.context.Context import space.kscience.dataforge.meta.MetaConverter import space.kscience.dataforge.meta.Scheme +import space.kscience.dataforge.meta.int import space.kscience.dataforge.meta.string import space.kscience.dataforge.misc.DFExperimental import space.kscience.maps.coordinates.Gmc +import space.kscience.maps.coordinates.GmcCurve import kotlin.time.Duration.Companion.milliseconds -class CollectiveDeviceConfiguration(deviceId: DeviceId) : Scheme() { +typealias CollectiveDeviceId = String + +class CollectiveDeviceConfiguration(deviceId: CollectiveDeviceId) : Scheme() { var deviceId by string(deviceId) var description by string() + var reportInterval by int(500) + var radioFrequency by string(default = "169 MHz") } +typealias CollectiveDeviceRoster = Map interface CollectiveDevice : Device { - public val id: DeviceId + public val id: CollectiveDeviceId + + public val peerConnection: PeerConnection suspend fun getPosition(): Gmc @@ -31,8 +42,7 @@ interface CollectiveDevice : Device { suspend fun setVelocity(value: GmcVelocity) - suspend fun listVisible(): Collection - + suspend fun listVisible(): Collection companion object : DeviceSpec() { val position by property( @@ -45,6 +55,10 @@ interface CollectiveDevice : Device { read = { getVelocity() }, write = { _, value -> setVelocity(value) } ) + + val listVisible by action(MetaConverter.unit, MetaConverter.valueList { it.string }) { + listVisible().toList() + } } } @@ -54,13 +68,14 @@ class CollectiveDeviceConstructor( val configuration: CollectiveDeviceConfiguration, position: MutableDeviceState, velocity: MutableDeviceState, - private val listVisible: suspend () -> Collection, + override val peerConnection: PeerConnection, + private val observation: suspend () -> Map, ) : DeviceConstructor(context, configuration.meta), CollectiveDevice { - override val id: DeviceId get() = configuration.deviceId + override val id: CollectiveDeviceId get() = configuration.deviceId - val position = registerAsProperty(CollectiveDevice.position, position.sample(500.milliseconds)) - val velocity = registerAsProperty(CollectiveDevice.velocity, velocity) + val position = registerAsProperty(CollectiveDevice.position, position.sample(configuration.reportInterval.milliseconds)) + val velocity = registerAsProperty(CollectiveDevice.velocity, velocity.sample(configuration.reportInterval.milliseconds)) override suspend fun getPosition(): Gmc = position.value @@ -70,5 +85,5 @@ class CollectiveDeviceConstructor( velocity.value = value } - override suspend fun listVisible(): Collection = listVisible.invoke() + override suspend fun listVisible(): Collection = observation.invoke().keys } \ No newline at end of file diff --git a/demo/device-collective/src/jvmMain/kotlin/DeviceCollectiveModel.kt b/demo/device-collective/src/jvmMain/kotlin/DeviceCollectiveModel.kt index 4abedea..2c325b4 100644 --- a/demo/device-collective/src/jvmMain/kotlin/DeviceCollectiveModel.kt +++ b/demo/device-collective/src/jvmMain/kotlin/DeviceCollectiveModel.kt @@ -1,27 +1,41 @@ package space.kscience.controls.demo.collective +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.Job +import kotlinx.coroutines.launch +import space.kscience.controls.client.launchMagixService import space.kscience.controls.constructor.ModelConstructor import space.kscience.controls.constructor.MutableDeviceState import space.kscience.controls.constructor.onTimer +import space.kscience.controls.manager.DeviceManager +import space.kscience.controls.manager.install +import space.kscience.controls.peer.PeerConnection import space.kscience.dataforge.context.Context +import space.kscience.dataforge.context.request +import space.kscience.dataforge.io.Envelope +import space.kscience.dataforge.meta.Meta +import space.kscience.dataforge.names.parseAsName +import space.kscience.magix.api.MagixEndpoint +import space.kscience.magix.rsocket.rSocketWithWebSockets +import space.kscience.magix.server.startMagixServer import space.kscience.maps.coordinates.* +import kotlin.time.Duration +import kotlin.time.Duration.Companion.milliseconds -typealias DeviceId = String - - -internal data class VirtualDeviceState( - val id: DeviceId, +internal data class CollectiveDeviceState( + val id: CollectiveDeviceId, val configuration: CollectiveDeviceConfiguration, val position: MutableDeviceState, val velocity: MutableDeviceState, ) internal fun VirtualDeviceState( - id: DeviceId, + id: CollectiveDeviceId, position: Gmc, configuration: CollectiveDeviceConfiguration.() -> Unit = {}, -) = VirtualDeviceState( +) = CollectiveDeviceState( id, CollectiveDeviceConfiguration(id).apply(configuration), MutableDeviceState(position), @@ -31,9 +45,11 @@ internal fun VirtualDeviceState( internal class DeviceCollectiveModel( context: Context, - val deviceStates: Collection, - val visibilityRange: Distance, -) : ModelConstructor(context) { + val deviceStates: Collection, + val visibilityRange: Distance = 0.4.kilometers, + val radioRange: Distance = 5.kilometers, + val reportInterval: Duration = 1000.milliseconds +) : ModelConstructor(context), PeerConnection { /** * Propagate movement @@ -45,7 +61,7 @@ internal class DeviceCollectiveModel( } } - suspend fun locateVisible(id: DeviceId): Map { + private fun locateVisible(id: CollectiveDeviceId): Map { val coordinatesSnapshot = deviceStates.associate { it.id to it.position.value } val selected = coordinatesSnapshot[id] ?: error("Can't find device with id $id") @@ -56,4 +72,52 @@ internal class DeviceCollectiveModel( return allCurves.filterValues { it.distance in 0.kilometers..visibilityRange } } + + val devices = deviceStates.associate { + val device = CollectiveDeviceConstructor( + context = context, + configuration = it.configuration, + position = it.position, + velocity = it.velocity, + peerConnection = this, + ) { + locateVisible(it.id) + } + //start movement program + device.moveInCircles() + it.id to device + } + + val roster = deviceStates.associate { it.id to it.configuration } + + override suspend fun receive(address: String, contentId: String, requestMeta: Meta): Envelope { + TODO("Not yet implemented") + } + + override suspend fun send(address: String, envelope: Envelope, requestMeta: Meta) { +// devices.values.filter { it.configuration.radioFrequency == address }.forEach { device -> +// ``` +// } + } +} + +internal fun CoroutineScope.launchCollectiveMagixServer( + collectiveModel: DeviceCollectiveModel, +): Job = launch(Dispatchers.IO) { + val server = startMagixServer( +// RSocketMagixFlowPlugin() + ) + + collectiveModel.devices.forEach { (id, device) -> + val deviceContext = collectiveModel.context.buildContext(id.parseAsName()) { + coroutineContext(coroutineContext) + plugin(DeviceManager) + } + + deviceContext.install(id, device) + + val deviceEndpoint = MagixEndpoint.rSocketWithWebSockets("localhost") + + deviceContext.request(DeviceManager).launchMagixService(deviceEndpoint, id) + } } \ No newline at end of file diff --git a/demo/device-collective/src/jvmMain/kotlin/SampleDeviceState.kt b/demo/device-collective/src/jvmMain/kotlin/SampleDeviceState.kt index a2ef06c..499c903 100644 --- a/demo/device-collective/src/jvmMain/kotlin/SampleDeviceState.kt +++ b/demo/device-collective/src/jvmMain/kotlin/SampleDeviceState.kt @@ -4,6 +4,7 @@ import kotlinx.coroutines.FlowPreview import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.sample import space.kscience.controls.constructor.DeviceState +import space.kscience.controls.constructor.MutableDeviceState import kotlin.time.Duration @OptIn(FlowPreview::class) @@ -11,10 +12,24 @@ class SampleDeviceState( val origin: DeviceState, val interval: Duration, ) : DeviceState { - override val value: T get() = origin.value + override val value: T by origin::value override val valueFlow: Flow get() = origin.valueFlow.sample(interval) override fun toString(): String = "DebounceDeviceState($value, interval=$interval)" } -fun DeviceState.sample(interval: Duration) = SampleDeviceState(this, interval) \ No newline at end of file + +fun DeviceState.sample(interval: Duration) = SampleDeviceState(this, interval) + +@OptIn(FlowPreview::class) +class MutableSampleDeviceState( + val origin: MutableDeviceState, + val interval: Duration, +) : MutableDeviceState { + override var value: T by origin::value + override val valueFlow: Flow get() = origin.valueFlow.sample(interval) + + override fun toString(): String = "DebounceDeviceState($value, interval=$interval)" +} + +fun MutableDeviceState.sample(interval: Duration) = MutableSampleDeviceState(this, interval) \ No newline at end of file diff --git a/demo/device-collective/src/jvmMain/kotlin/debugModel.kt b/demo/device-collective/src/jvmMain/kotlin/debugModel.kt index 96c9ca8..d0ec384 100644 --- a/demo/device-collective/src/jvmMain/kotlin/debugModel.kt +++ b/demo/device-collective/src/jvmMain/kotlin/debugModel.kt @@ -12,6 +12,8 @@ import space.kscience.maps.coordinates.Gmc import space.kscience.maps.coordinates.kilometers import kotlin.math.PI import kotlin.random.Random +import kotlin.time.Duration +import kotlin.time.Duration.Companion.milliseconds private val deviceVelocity = 0.1.kilometers @@ -19,8 +21,13 @@ private val center = Gmc.ofDegrees(55.925, 37.514) private val radius = 0.01.degrees -internal fun generateModel(context: Context): DeviceCollectiveModel { - val devices: List = List(100) { index -> +internal fun generateModel( + context: Context, + size: Int = 50, + reportInterval: Duration = 500.milliseconds, + additionalConfiguration: CollectiveDeviceConfiguration.() -> Unit = {}, +): DeviceCollectiveModel { + val devices: List = List(size) { index -> val id = "device[$index]" VirtualDeviceState( @@ -32,6 +39,8 @@ internal fun generateModel(context: Context): DeviceCollectiveModel { ) { deviceId = id description = "Virtual remote device $id" + this.reportInterval = reportInterval.inWholeMilliseconds.toInt() + additionalConfiguration() } } diff --git a/demo/device-collective/src/jvmMain/kotlin/main.kt b/demo/device-collective/src/jvmMain/kotlin/main.kt index 1db8621..9aff323 100644 --- a/demo/device-collective/src/jvmMain/kotlin/main.kt +++ b/demo/device-collective/src/jvmMain/kotlin/main.kt @@ -24,74 +24,95 @@ import androidx.compose.ui.window.Window import androidx.compose.ui.window.application import io.ktor.client.HttpClient import io.ktor.client.engine.cio.CIO +import kotlinx.coroutines.* import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach import org.jetbrains.compose.splitpane.ExperimentalSplitPaneApi import org.jetbrains.compose.splitpane.HorizontalSplitPane import org.jetbrains.compose.splitpane.rememberSplitPaneState +import space.kscience.controls.api.PropertyChangedMessage +import space.kscience.controls.client.* import space.kscience.controls.compose.conditional import space.kscience.controls.manager.DeviceManager -import space.kscience.controls.spec.useProperty import space.kscience.dataforge.context.Context -import space.kscience.dataforge.context.request +import space.kscience.dataforge.context.ContextBuilder +import space.kscience.dataforge.meta.MetaConverter +import space.kscience.dataforge.names.parseAsName +import space.kscience.magix.api.MagixEndpoint +import space.kscience.magix.api.subscribe +import space.kscience.magix.rsocket.rSocketWithWebSockets import space.kscience.maps.compose.MapView import space.kscience.maps.compose.OpenStreetMapTileProvider +import space.kscience.maps.coordinates.Gmc +import space.kscience.maps.coordinates.meters import space.kscience.maps.features.* import java.nio.file.Path +import kotlin.time.Duration.Companion.seconds @Composable -fun rememberDeviceManager(): DeviceManager = remember { - val context = Context { - plugin(DeviceManager) - } - - context.request(DeviceManager) +fun rememberContext(name: String, contextBuilder: ContextBuilder.() -> Unit = {}): Context = remember { + Context(name, contextBuilder) } @Composable fun App() { val scope = rememberCoroutineScope() - - val deviceManager = rememberDeviceManager() - - - val collectiveModel = remember { - generateModel(deviceManager.context) + val parentContext = rememberContext("Parent") { + plugin(DeviceManager) } - val devices: Map = remember { - collectiveModel.deviceStates.associate { - val device = CollectiveDeviceConstructor( - context = deviceManager.context, - configuration = it.configuration, - position = it.position, - velocity = it.velocity - ) { - collectiveModel.locateVisible(it.id).keys + val collectiveModel = remember { + generateModel(parentContext, 60) + } + + val roster = remember { + collectiveModel.roster + } + + val client = remember { CompletableDeferred() } + + val devices = remember { mutableStateMapOf() } + + + LaunchedEffect(collectiveModel) { + launchCollectiveMagixServer(collectiveModel) + withContext(Dispatchers.IO) { + val magixClient = MagixEndpoint.rSocketWithWebSockets("localhost") + + client.complete(magixClient) + + collectiveModel.roster.forEach { (id, config) -> + devices[id] = magixClient.remoteDevice(parentContext, "listener", id, id.parseAsName()) } - device.moveInCircles() - it.id to device + } + + } + + var selectedDeviceId by remember { mutableStateOf(null) } + + var currentPosition by remember { mutableStateOf(null) } + + LaunchedEffect(selectedDeviceId, devices) { + selectedDeviceId?.let { devices[it] }?.propertyFlow(CollectiveDevice.position)?.collect { + currentPosition = it } } - var selectedDeviceId by remember { mutableStateOf(null) } var showOnlyVisible by remember { mutableStateOf(false) } - val mapTileProvider = remember { - OpenStreetMapTileProvider( - client = HttpClient(CIO), - cacheDirectory = Path.of("mapCache") - ) - } - HorizontalSplitPane( splitPaneState = rememberSplitPaneState(0.9f) ) { first(400.dp) { MapView( - mapTileProvider = mapTileProvider, + mapTileProvider = remember { + OpenStreetMapTileProvider( + client = HttpClient(CIO), + cacheDirectory = Path.of("mapCache") + ) + }, config = ViewConfig() ) { collectiveModel.deviceStates.forEach { device -> @@ -103,65 +124,61 @@ fun App() { }.launchIn(scope) } - devices.forEach { (id, device) -> - device.useProperty(CollectiveDevice.position, scope = scope) { position -> + scope.launch { - val activeDevice = selectedDeviceId?.let { devices[it] } + client.await().subscribe(DeviceManager.magixFormat).onEach { (magixMessage, deviceMessage) -> + if (deviceMessage is PropertyChangedMessage && deviceMessage.property == "position") { + val id = magixMessage.sourceEndpoint + val position = MetaConverter.serializable().read(deviceMessage.value) + val activeDevice = selectedDeviceId?.let { devices[it] } - if (activeDevice == null || id == selectedDeviceId || !showOnlyVisible || id in activeDevice.listVisible()) { - rectangle( - position, - id = id, - size = if (selectedDeviceId == id) DpSize(10.dp, 10.dp) else DpSize(5.dp, 5.dp) - ).color(if (selectedDeviceId == id) Color.Magenta else Color.Blue) - .modifyAttribute(AlphaAttribute, if (selectedDeviceId == id) 1f else 0.5f) - .onClick { selectedDeviceId = id } - } else { - removeFeature(id) + suspend fun DeviceClient.idIsVisible() = try { + withTimeout(1.seconds) { + id in execute(CollectiveDevice.listVisible) + } + } catch (ex: Exception) { + ex.printStackTrace() + true + } + + if ( + activeDevice == null || + id == selectedDeviceId || + !showOnlyVisible || + activeDevice.idIsVisible() + ) { + rectangle( + position, + id = id, + size = if (selectedDeviceId == id) DpSize(10.dp, 10.dp) else DpSize(5.dp, 5.dp) + ).color(if (selectedDeviceId == id) Color.Magenta else Color.Blue) + .modifyAttribute(AlphaAttribute, if (selectedDeviceId == id) 1f else 0.5f) + .onClick { selectedDeviceId = id } + } else { + removeFeature(id) + } } + }.launchIn(scope) - } } } } second(200.dp) { - Column { - selectedDeviceId?.let { id -> - Column( - modifier = Modifier - .padding(8.dp) - .border(2.dp, Color.DarkGray) + + Column( + modifier = Modifier.verticalScroll(rememberScrollState()) + ) { + devices.forEach { (id, _) -> + Card( + elevation = 16.dp, + modifier = Modifier.padding(8.dp).onClick { + selectedDeviceId = id + }.conditional(id == selectedDeviceId) { + border(2.dp, Color.Blue) + } ) { - Card( - elevation = 16.dp, - ) { - Text( - text = "Выбран: $id", - fontSize = 16.sp, - fontWeight = FontWeight.Bold, - modifier = Modifier.padding(10.dp).fillMaxWidth() - ) - } - devices[id]?.let { - Text(it.meta.toString(), Modifier.padding(10.dp)) - } - Row(verticalAlignment = Alignment.CenterVertically, modifier = Modifier.padding(10.dp).fillMaxWidth()) { - Text("Показать только видимые") - Checkbox(showOnlyVisible, { showOnlyVisible = it }) - } - } - } - Column( - modifier = Modifier.verticalScroll(rememberScrollState()) - ) { - devices.forEach { (id, device) -> - Card( - elevation = 16.dp, - modifier = Modifier.padding(8.dp).onClick { - selectedDeviceId = id - }.conditional(id == selectedDeviceId) { - border(2.dp, Color.Blue) - } + Column( + modifier = Modifier.padding(8.dp) ) { Text( text = id, @@ -169,16 +186,49 @@ fun App() { fontWeight = FontWeight.Bold, modifier = Modifier.padding(10.dp).fillMaxWidth() ) + if (id == selectedDeviceId) { + roster[id]?.let { + Text("Meta:", color = Color.Blue, fontWeight = FontWeight.Bold) + Card(elevation = 16.dp, modifier = Modifier.fillMaxWidth().padding(8.dp)) { + Text(it.toString()) + } + } + + currentPosition?.let { currentPosition -> + Text("Широта: ${String.format("%.3f", currentPosition.latitude.toDegrees().value)}") + Text( + "Долгота: ${ + String.format( + "%.3f", + currentPosition.longitude.toDegrees().value + ) + }" + ) + currentPosition.elevation?.let { + Text("Высота: ${String.format("%.1f", it.meters)} м") + } + } + + Row( + verticalAlignment = Alignment.CenterVertically, + modifier = Modifier.fillMaxWidth() + ) { + Text("Показать только видимые") + Checkbox(showOnlyVisible, { showOnlyVisible = it }) + } + } } } } } + } } } fun main() = application { +// System.setProperty(IO_PARALLELISM_PROPERTY_NAME, 300.toString()) Window(onCloseRequest = ::exitApplication, title = "Maps-kt demo", icon = painterResource("SPC-logo.png")) { MaterialTheme { App() diff --git a/demo/many-devices/src/main/kotlin/space/kscience/controls/demo/MassDevice.kt b/demo/many-devices/src/main/kotlin/space/kscience/controls/demo/MassDevice.kt index 5eafabf..53c4c7f 100644 --- a/demo/many-devices/src/main/kotlin/space/kscience/controls/demo/MassDevice.kt +++ b/demo/many-devices/src/main/kotlin/space/kscience/controls/demo/MassDevice.kt @@ -83,8 +83,7 @@ suspend fun main() { val endpointId = "device$it" val deviceEndpoint = MagixEndpoint.rSocketStreamWithWebSockets("localhost") - deviceManager.launchMagixService(deviceEndpoint, endpointId, Dispatchers.IO) - + deviceManager.launchMagixService(deviceEndpoint, endpointId) } val trace = Bar { diff --git a/magix/magix-api/src/commonMain/kotlin/space/kscience/magix/api/MagixFlowPlugin.kt b/magix/magix-api/src/commonMain/kotlin/space/kscience/magix/api/MagixFlowPlugin.kt index 83c95cc..1a3dd75 100644 --- a/magix/magix-api/src/commonMain/kotlin/space/kscience/magix/api/MagixFlowPlugin.kt +++ b/magix/magix-api/src/commonMain/kotlin/space/kscience/magix/api/MagixFlowPlugin.kt @@ -21,9 +21,10 @@ public fun interface MagixFlowPlugin { sendMessage: suspend (MagixMessage) -> Unit, ): Job - /** - * Use the same [MutableSharedFlow] to send and receive messages. Could be a bottleneck in case of many plugins. - */ - public fun start(scope: CoroutineScope, magixFlow: MutableSharedFlow): Job = - start(scope, magixFlow) { magixFlow.emit(it) } -} \ No newline at end of file +} + +/** + * Use the same [MutableSharedFlow] to send and receive messages. Could be a bottleneck in case of many plugins. + */ +public fun MagixFlowPlugin.start(scope: CoroutineScope, magixFlow: MutableSharedFlow): Job = + start(scope, magixFlow) { magixFlow.emit(it) } \ No newline at end of file diff --git a/magix/magix-server/src/jvmMain/kotlin/space/kscience/magix/server/magixModule.kt b/magix/magix-server/src/jvmMain/kotlin/space/kscience/magix/server/magixModule.kt index dc197ad..a1edec5 100644 --- a/magix/magix-server/src/jvmMain/kotlin/space/kscience/magix/server/magixModule.kt +++ b/magix/magix-server/src/jvmMain/kotlin/space/kscience/magix/server/magixModule.kt @@ -10,7 +10,9 @@ import io.ktor.server.util.getValue import io.ktor.server.websocket.WebSockets import io.rsocket.kotlin.ktor.server.RSocketSupport import io.rsocket.kotlin.ktor.server.rSocket +import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.SharedFlow import kotlinx.coroutines.flow.map import kotlinx.html.* import kotlinx.serialization.encodeToString @@ -42,7 +44,11 @@ private fun ApplicationCall.buildFilter(): MagixMessageFilter { /** * Attach magix http/sse and websocket-based rsocket event loop + statistics page to existing [MutableSharedFlow] */ -public fun Application.magixModule(magixFlow: MutableSharedFlow, route: String = "/") { +public fun Application.magixModule( + magixFlow: Flow, + send: suspend (MagixMessage) -> Unit, + route: String = "/", +) { if (pluginOrNull(WebSockets) == null) { install(WebSockets) } @@ -62,27 +68,31 @@ public fun Application.magixModule(magixFlow: MutableSharedFlow, r routing { route(route) { - install(ContentNegotiation){ + install(ContentNegotiation) { json() } - get("state") { - call.respondHtml { - head { - meta { - httpEquiv = "refresh" - content = "2" + if (magixFlow is SharedFlow) { + get("state") { + call.respondHtml { + head { + meta { + httpEquiv = "refresh" + content = "2" + } } - } - body { - h1 { +"Magix loop statistics" } - h2 { +"Number of subscribers: ${magixFlow.subscriptionCount.value}" } - h3 { +"Replay cache size: ${magixFlow.replayCache.size}" } - h3 { +"Replay cache:" } - ol { - magixFlow.replayCache.forEach { message -> - li { - code { - +magixJson.encodeToString(message) + body { + h1 { +"Magix loop statistics" } + if (magixFlow is MutableSharedFlow) { + h2 { +"Number of subscribers: ${magixFlow.subscriptionCount.value}" } + } + h3 { +"Replay cache size: ${magixFlow.replayCache.size}" } + h3 { +"Replay cache:" } + ol { + magixFlow.replayCache.forEach { message -> + li { + code { + +magixJson.encodeToString(message) + } } } } @@ -102,17 +112,22 @@ public fun Application.magixModule(magixFlow: MutableSharedFlow, r } post("broadcast") { val message = call.receive() - magixFlow.emit(message) + send(message) } //rSocket WS server. Filter from Payload rSocket( "rsocket", - acceptor = RSocketMagixFlowPlugin.acceptor(application, magixFlow) { magixFlow.emit(it) } + acceptor = RSocketMagixFlowPlugin.acceptor(application, magixFlow) { send(it) } ) } } } +public fun Application.magixModule( + magixFlow: MutableSharedFlow, + route: String = "/", +): Unit = magixModule(magixFlow, { magixFlow.emit(it) }, route) + /** * Create a new loop [MutableSharedFlow] with given [buffer] and setup magix module based on it */ diff --git a/magix/magix-server/src/jvmMain/kotlin/space/kscience/magix/server/server.kt b/magix/magix-server/src/jvmMain/kotlin/space/kscience/magix/server/server.kt index 2396e25..aa26bee 100644 --- a/magix/magix-server/src/jvmMain/kotlin/space/kscience/magix/server/server.kt +++ b/magix/magix-server/src/jvmMain/kotlin/space/kscience/magix/server/server.kt @@ -9,6 +9,7 @@ import kotlinx.coroutines.flow.MutableSharedFlow import space.kscience.magix.api.MagixEndpoint.Companion.DEFAULT_MAGIX_HTTP_PORT import space.kscience.magix.api.MagixFlowPlugin import space.kscience.magix.api.MagixMessage +import space.kscience.magix.api.start /** @@ -22,7 +23,6 @@ public fun CoroutineScope.startMagixServer( val magixFlow = MutableSharedFlow( replay = buffer, - extraBufferCapacity = buffer, onBufferOverflow = BufferOverflow.DROP_OLDEST )