diff --git a/controls-core/src/commonMain/kotlin/space/kscience/controls/spec/DeviceBase.kt b/controls-core/src/commonMain/kotlin/space/kscience/controls/spec/DeviceBase.kt index bafe688..0f5436b 100644 --- a/controls-core/src/commonMain/kotlin/space/kscience/controls/spec/DeviceBase.kt +++ b/controls-core/src/commonMain/kotlin/space/kscience/controls/spec/DeviceBase.kt @@ -108,6 +108,9 @@ public abstract class DeviceBase( return meta } + /** + * Read property if it exists and read correctly. Return null otherwise. + */ public suspend fun readPropertyOrNull(propertyName: String): Meta? { val spec = properties[propertyName] ?: return null val meta = spec.readMeta(self) ?: return null diff --git a/controls-core/src/commonMain/kotlin/space/kscience/controls/spec/DevicePropertySpec.kt b/controls-core/src/commonMain/kotlin/space/kscience/controls/spec/DevicePropertySpec.kt index ede312a..4feace7 100644 --- a/controls-core/src/commonMain/kotlin/space/kscience/controls/spec/DevicePropertySpec.kt +++ b/controls-core/src/commonMain/kotlin/space/kscience/controls/spec/DevicePropertySpec.kt @@ -1,5 +1,6 @@ package space.kscience.controls.spec +import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Job import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.filterIsInstance @@ -19,6 +20,9 @@ import space.kscience.dataforge.meta.transformations.MetaConverter @RequiresOptIn("This API should not be called outside of Device internals") public annotation class InternalDeviceAPI +/** + * Specification for a device read-only property + */ public interface DevicePropertySpec { /** * Property descriptor @@ -75,7 +79,7 @@ public interface DeviceActionSpec { public val DeviceActionSpec<*, *, *>.name: String get() = descriptor.name public suspend fun D.read(propertySpec: DevicePropertySpec): T = - propertySpec.converter.metaToObject(readProperty(propertySpec.name)) ?: error("Can't convert result from meta") + propertySpec.converter.metaToObject(readProperty(propertySpec.name)) ?: error("Property read result is not valid") /** * Read typed value and update/push event if needed. @@ -102,9 +106,8 @@ public operator fun D.set(propertySpec: WritableDevicePropertySp write(propertySpec, value) } - /** - * A type safe property change listener + * A type safe property change listener. Uses the device [CoroutineScope]. */ public fun Device.onPropertyChange( spec: DevicePropertySpec, @@ -123,5 +126,8 @@ public suspend fun D.invalidate(propertySpec: DevicePropertySpec D.execute(actionSpec: DeviceActionSpec, input: I? = null): O? = actionSpec.execute(this, input) \ No newline at end of file diff --git a/demo/all-things/src/main/kotlin/space/kscience/controls/demo/DemoControllerView.kt b/demo/all-things/src/main/kotlin/space/kscience/controls/demo/DemoControllerView.kt index 538570d..b464611 100644 --- a/demo/all-things/src/main/kotlin/space/kscience/controls/demo/DemoControllerView.kt +++ b/demo/all-things/src/main/kotlin/space/kscience/controls/demo/DemoControllerView.kt @@ -56,12 +56,12 @@ class DemoController : Controller(), ContextAware { RSocketMagixFlowPlugin(), //TCP rsocket support ZmqMagixFlowPlugin() //ZMQ support ) - //Launch device client and connect it to the server + //Launch a device client and connect it to the server val deviceEndpoint = MagixEndpoint.rSocketWithTcp("localhost") deviceManager.connectToMagix(deviceEndpoint) //connect visualization to a magix endpoint val visualEndpoint = MagixEndpoint.rSocketWithWebSockets("localhost") - visualizer = visualEndpoint.startDemoDeviceServer() + visualizer = startDemoDeviceServer(visualEndpoint) //serve devices as OPC-UA namespace opcUaServer.startup() diff --git a/demo/all-things/src/main/kotlin/space/kscience/controls/demo/DemoDevice.kt b/demo/all-things/src/main/kotlin/space/kscience/controls/demo/DemoDevice.kt index 1a8ad97..b576b9a 100644 --- a/demo/all-things/src/main/kotlin/space/kscience/controls/demo/DemoDevice.kt +++ b/demo/all-things/src/main/kotlin/space/kscience/controls/demo/DemoDevice.kt @@ -41,7 +41,7 @@ class DemoDevice(context: Context, meta: Meta) : DeviceBySpec(DemoDe val cos by doubleProperty { val time = Instant.now() - kotlin.math.cos(time.toEpochMilli().toDouble() / timeScaleState) * sinScaleState + kotlin.math.cos(time.toEpochMilli().toDouble() / timeScaleState) * cosScaleState } val coordinates by metaProperty( diff --git a/demo/all-things/src/main/kotlin/space/kscience/controls/demo/demoDeviceServer.kt b/demo/all-things/src/main/kotlin/space/kscience/controls/demo/demoDeviceServer.kt index 5b6c7cc..9ef69cf 100644 --- a/demo/all-things/src/main/kotlin/space/kscience/controls/demo/demoDeviceServer.kt +++ b/demo/all-things/src/main/kotlin/space/kscience/controls/demo/demoDeviceServer.kt @@ -1,27 +1,24 @@ package space.kscience.controls.demo -import io.ktor.server.application.install -import io.ktor.server.cio.CIO import io.ktor.server.engine.ApplicationEngine -import io.ktor.server.engine.embeddedServer -import io.ktor.server.plugins.cors.routing.CORS -import io.ktor.server.websocket.WebSockets -import io.rsocket.kotlin.ktor.server.RSocketSupport +import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.flow.* import kotlinx.coroutines.launch import kotlinx.html.div import kotlinx.html.link import space.kscience.controls.api.PropertyChangedMessage import space.kscience.controls.client.controlsMagixFormat -import space.kscience.dataforge.meta.Meta +import space.kscience.controls.spec.name import space.kscience.dataforge.meta.double +import space.kscience.dataforge.meta.get import space.kscience.magix.api.MagixEndpoint import space.kscience.magix.api.subscribe +import space.kscience.plotly.Plotly import space.kscience.plotly.layout import space.kscience.plotly.models.Trace import space.kscience.plotly.plot import space.kscience.plotly.server.PlotlyUpdateMode -import space.kscience.plotly.server.plotlyModule +import space.kscience.plotly.server.serve import space.kscience.plotly.trace import java.util.concurrent.ConcurrentLinkedQueue @@ -55,36 +52,26 @@ suspend fun Trace.updateXYFrom(flow: Flow>>) { } -@Suppress("ExtractKtorModule") -suspend fun MagixEndpoint.startDemoDeviceServer(): ApplicationEngine = embeddedServer(CIO, 9091) { - install(WebSockets) - install(RSocketSupport) +fun CoroutineScope.startDemoDeviceServer(magixEndpoint: MagixEndpoint): ApplicationEngine { + //share subscription to a parse message only once + val subscription = magixEndpoint.subscribe(controlsMagixFormat).shareIn(this, SharingStarted.Lazily) - install(CORS) { - anyHost() - } + val sinFlow = subscription.mapNotNull { (_, payload) -> + (payload as? PropertyChangedMessage)?.takeIf { it.property == DemoDevice.sin.name } + }.map { it.value } - val sinFlow = MutableSharedFlow()// = device.sin.flow() - val cosFlow = MutableSharedFlow()// = device.cos.flow() + val cosFlow = subscription.mapNotNull { (_, payload) -> + (payload as? PropertyChangedMessage)?.takeIf { it.property == DemoDevice.cos.name } + }.map { it.value } - launch { - subscribe(controlsMagixFormat).collect { (_, payload) -> - (payload as? PropertyChangedMessage)?.let { message -> - when (message.property) { - "sin" -> sinFlow.emit(message.value) - "cos" -> cosFlow.emit(message.value) - } - } - } - } + val sinCosFlow = subscription.mapNotNull { (_, payload) -> + (payload as? PropertyChangedMessage)?.takeIf { it.property == DemoDevice.coordinates.name } + }.map { it.value } - plotlyModule{ + return Plotly.serve(port = 9091, scope = this) { updateMode = PlotlyUpdateMode.PUSH - updateInterval = 50 + updateInterval = 100 page { container -> - val sinCosFlow = sinFlow.zip(cosFlow) { sin, cos -> - sin.double!! to cos.double!! - } link { rel = "stylesheet" href = "https://stackpath.bootstrapcdn.com/bootstrap/4.5.0/css/bootstrap.min.css" @@ -134,7 +121,9 @@ suspend fun MagixEndpoint.startDemoDeviceServer(): ApplicationEngine = embeddedS trace { name = "non-synchronized" launch { - val flow: Flow>> = sinCosFlow.windowed(30) + val flow: Flow>> = sinCosFlow.mapNotNull { + it["x"].double!! to it["y"].double!! + }.windowed(30) updateXYFrom(flow) } } @@ -144,5 +133,6 @@ suspend fun MagixEndpoint.startDemoDeviceServer(): ApplicationEngine = embeddedS } } -}.apply { start() } + +} diff --git a/magix/magix-api/src/commonMain/kotlin/space/kscience/magix/api/MagixEndpoint.kt b/magix/magix-api/src/commonMain/kotlin/space/kscience/magix/api/MagixEndpoint.kt index 9baafea..0a5abd3 100644 --- a/magix/magix-api/src/commonMain/kotlin/space/kscience/magix/api/MagixEndpoint.kt +++ b/magix/magix-api/src/commonMain/kotlin/space/kscience/magix/api/MagixEndpoint.kt @@ -9,7 +9,7 @@ import kotlinx.serialization.json.Json public interface MagixEndpoint { /** - * Subscribe to a [Flow] of messages + * Subscribe to a [Flow] of messages. Subscription is not guaranteed to be shared. */ public fun subscribe( filter: MagixMessageFilter = MagixMessageFilter.ALL, diff --git a/magix/magix-rsocket/src/commonMain/kotlin/space/kscience/magix/rsocket/RSocketMagixEndpoint.kt b/magix/magix-rsocket/src/commonMain/kotlin/space/kscience/magix/rsocket/RSocketMagixEndpoint.kt index 43c505c..f9690fc 100644 --- a/magix/magix-rsocket/src/commonMain/kotlin/space/kscience/magix/rsocket/RSocketMagixEndpoint.kt +++ b/magix/magix-rsocket/src/commonMain/kotlin/space/kscience/magix/rsocket/RSocketMagixEndpoint.kt @@ -75,7 +75,7 @@ public suspend fun MagixEndpoint.Companion.rSocketWithWebSockets( val rSocket = client.rSocket(host, port, path) - //Ensure client is closed after rSocket if finished + //Ensure the client is closed after rSocket if finished rSocket.coroutineContext[Job]?.invokeOnCompletion { client.close() }