This commit is contained in:
Alexander Nozik 2023-05-07 18:03:46 +03:00
parent b66e23cca6
commit 4c33c16c94
7 changed files with 41 additions and 42 deletions

View File

@ -108,6 +108,9 @@ public abstract class DeviceBase<D : Device>(
return meta
}
/**
* Read property if it exists and read correctly. Return null otherwise.
*/
public suspend fun readPropertyOrNull(propertyName: String): Meta? {
val spec = properties[propertyName] ?: return null
val meta = spec.readMeta(self) ?: return null

View File

@ -1,5 +1,6 @@
package space.kscience.controls.spec
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.filterIsInstance
@ -19,6 +20,9 @@ import space.kscience.dataforge.meta.transformations.MetaConverter
@RequiresOptIn("This API should not be called outside of Device internals")
public annotation class InternalDeviceAPI
/**
* Specification for a device read-only property
*/
public interface DevicePropertySpec<in D : Device, T> {
/**
* Property descriptor
@ -75,7 +79,7 @@ public interface DeviceActionSpec<in D : Device, I, O> {
public val DeviceActionSpec<*, *, *>.name: String get() = descriptor.name
public suspend fun <T, D : Device> D.read(propertySpec: DevicePropertySpec<D, T>): T =
propertySpec.converter.metaToObject(readProperty(propertySpec.name)) ?: error("Can't convert result from meta")
propertySpec.converter.metaToObject(readProperty(propertySpec.name)) ?: error("Property read result is not valid")
/**
* Read typed value and update/push event if needed.
@ -102,9 +106,8 @@ public operator fun <T, D : Device> D.set(propertySpec: WritableDevicePropertySp
write(propertySpec, value)
}
/**
* A type safe property change listener
* A type safe property change listener. Uses the device [CoroutineScope].
*/
public fun <D : Device, T> Device.onPropertyChange(
spec: DevicePropertySpec<D, T>,
@ -123,5 +126,8 @@ public suspend fun <D : Device> D.invalidate(propertySpec: DevicePropertySpec<D,
invalidate(propertySpec.name)
}
/**
* Execute the action with name according to [actionSpec]
*/
public suspend fun <I, O, D : Device> D.execute(actionSpec: DeviceActionSpec<D, I, O>, input: I? = null): O? =
actionSpec.execute(this, input)

View File

@ -56,12 +56,12 @@ class DemoController : Controller(), ContextAware {
RSocketMagixFlowPlugin(), //TCP rsocket support
ZmqMagixFlowPlugin() //ZMQ support
)
//Launch device client and connect it to the server
//Launch a device client and connect it to the server
val deviceEndpoint = MagixEndpoint.rSocketWithTcp("localhost")
deviceManager.connectToMagix(deviceEndpoint)
//connect visualization to a magix endpoint
val visualEndpoint = MagixEndpoint.rSocketWithWebSockets("localhost")
visualizer = visualEndpoint.startDemoDeviceServer()
visualizer = startDemoDeviceServer(visualEndpoint)
//serve devices as OPC-UA namespace
opcUaServer.startup()

View File

@ -41,7 +41,7 @@ class DemoDevice(context: Context, meta: Meta) : DeviceBySpec<DemoDevice>(DemoDe
val cos by doubleProperty {
val time = Instant.now()
kotlin.math.cos(time.toEpochMilli().toDouble() / timeScaleState) * sinScaleState
kotlin.math.cos(time.toEpochMilli().toDouble() / timeScaleState) * cosScaleState
}
val coordinates by metaProperty(

View File

@ -1,27 +1,24 @@
package space.kscience.controls.demo
import io.ktor.server.application.install
import io.ktor.server.cio.CIO
import io.ktor.server.engine.ApplicationEngine
import io.ktor.server.engine.embeddedServer
import io.ktor.server.plugins.cors.routing.CORS
import io.ktor.server.websocket.WebSockets
import io.rsocket.kotlin.ktor.server.RSocketSupport
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import kotlinx.html.div
import kotlinx.html.link
import space.kscience.controls.api.PropertyChangedMessage
import space.kscience.controls.client.controlsMagixFormat
import space.kscience.dataforge.meta.Meta
import space.kscience.controls.spec.name
import space.kscience.dataforge.meta.double
import space.kscience.dataforge.meta.get
import space.kscience.magix.api.MagixEndpoint
import space.kscience.magix.api.subscribe
import space.kscience.plotly.Plotly
import space.kscience.plotly.layout
import space.kscience.plotly.models.Trace
import space.kscience.plotly.plot
import space.kscience.plotly.server.PlotlyUpdateMode
import space.kscience.plotly.server.plotlyModule
import space.kscience.plotly.server.serve
import space.kscience.plotly.trace
import java.util.concurrent.ConcurrentLinkedQueue
@ -55,36 +52,26 @@ suspend fun Trace.updateXYFrom(flow: Flow<Iterable<Pair<Double, Double>>>) {
}
@Suppress("ExtractKtorModule")
suspend fun MagixEndpoint.startDemoDeviceServer(): ApplicationEngine = embeddedServer(CIO, 9091) {
install(WebSockets)
install(RSocketSupport)
fun CoroutineScope.startDemoDeviceServer(magixEndpoint: MagixEndpoint): ApplicationEngine {
//share subscription to a parse message only once
val subscription = magixEndpoint.subscribe(controlsMagixFormat).shareIn(this, SharingStarted.Lazily)
install(CORS) {
anyHost()
}
val sinFlow = subscription.mapNotNull { (_, payload) ->
(payload as? PropertyChangedMessage)?.takeIf { it.property == DemoDevice.sin.name }
}.map { it.value }
val sinFlow = MutableSharedFlow<Meta?>()// = device.sin.flow()
val cosFlow = MutableSharedFlow<Meta?>()// = device.cos.flow()
val cosFlow = subscription.mapNotNull { (_, payload) ->
(payload as? PropertyChangedMessage)?.takeIf { it.property == DemoDevice.cos.name }
}.map { it.value }
launch {
subscribe(controlsMagixFormat).collect { (_, payload) ->
(payload as? PropertyChangedMessage)?.let { message ->
when (message.property) {
"sin" -> sinFlow.emit(message.value)
"cos" -> cosFlow.emit(message.value)
}
}
}
}
val sinCosFlow = subscription.mapNotNull { (_, payload) ->
(payload as? PropertyChangedMessage)?.takeIf { it.property == DemoDevice.coordinates.name }
}.map { it.value }
plotlyModule{
return Plotly.serve(port = 9091, scope = this) {
updateMode = PlotlyUpdateMode.PUSH
updateInterval = 50
updateInterval = 100
page { container ->
val sinCosFlow = sinFlow.zip(cosFlow) { sin, cos ->
sin.double!! to cos.double!!
}
link {
rel = "stylesheet"
href = "https://stackpath.bootstrapcdn.com/bootstrap/4.5.0/css/bootstrap.min.css"
@ -134,7 +121,9 @@ suspend fun MagixEndpoint.startDemoDeviceServer(): ApplicationEngine = embeddedS
trace {
name = "non-synchronized"
launch {
val flow: Flow<Iterable<Pair<Double, Double>>> = sinCosFlow.windowed(30)
val flow: Flow<Iterable<Pair<Double, Double>>> = sinCosFlow.mapNotNull {
it["x"].double!! to it["y"].double!!
}.windowed(30)
updateXYFrom(flow)
}
}
@ -144,5 +133,6 @@ suspend fun MagixEndpoint.startDemoDeviceServer(): ApplicationEngine = embeddedS
}
}
}.apply { start() }
}

View File

@ -9,7 +9,7 @@ import kotlinx.serialization.json.Json
public interface MagixEndpoint {
/**
* Subscribe to a [Flow] of messages
* Subscribe to a [Flow] of messages. Subscription is not guaranteed to be shared.
*/
public fun subscribe(
filter: MagixMessageFilter = MagixMessageFilter.ALL,

View File

@ -75,7 +75,7 @@ public suspend fun MagixEndpoint.Companion.rSocketWithWebSockets(
val rSocket = client.rSocket(host, port, path)
//Ensure client is closed after rSocket if finished
//Ensure the client is closed after rSocket if finished
rSocket.coroutineContext[Job]?.invokeOnCompletion {
client.close()
}