From 8db593402116e7295df16b7ec633ade50322161d Mon Sep 17 00:00:00 2001 From: Alexander Nozik Date: Tue, 30 Jun 2020 22:03:56 +0300 Subject: [PATCH] Device web-server (untested) --- dataforge-control-core/build.gradle.kts | 2 + .../dataforge/control/api/ActionDescriptor.kt | 14 -- .../hep/dataforge/control/api/Device.kt | 7 +- ...rtyChangeListener.kt => DeviceListener.kt} | 3 +- .../control/api/PropertyDescriptor.kt | 14 -- .../hep/dataforge/control/api/descriptors.kt | 20 ++ .../hep/dataforge/control/base/Action.kt | 20 +- .../hep/dataforge/control/base/DeviceBase.kt | 6 +- .../control/base/IsolatedDeviceProperty.kt | 20 +- .../control/controlers/DeviceMessage.kt | 83 +++---- .../control/controlers/MessageController.kt | 135 ++++++----- .../control/controlers/MessageFlow.kt | 57 ----- .../control/controlers/PropertyFlow.kt | 4 +- dataforge-control-server/build.gradle.kts | 21 ++ .../control/server/deviceWebServer.kt | 217 ++++++++++++++++++ .../hep/dataforge/control/server/envelopes.kt | 27 +++ .../hep/dataforge/control/server/sse.kt | 41 ++++ demo/build.gradle.kts | 1 + .../hep/dataforge/control/demo/DemoDevice.kt | 3 +- settings.gradle.kts | 1 + 20 files changed, 485 insertions(+), 211 deletions(-) delete mode 100644 dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/api/ActionDescriptor.kt rename dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/api/{PropertyChangeListener.kt => DeviceListener.kt} (64%) delete mode 100644 dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/api/PropertyDescriptor.kt create mode 100644 dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/api/descriptors.kt delete mode 100644 dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/controlers/MessageFlow.kt create mode 100644 dataforge-control-server/build.gradle.kts create mode 100644 dataforge-control-server/src/main/kotlin/hep/dataforge/control/server/deviceWebServer.kt create mode 100644 dataforge-control-server/src/main/kotlin/hep/dataforge/control/server/envelopes.kt create mode 100644 dataforge-control-server/src/main/kotlin/hep/dataforge/control/server/sse.kt diff --git a/dataforge-control-core/build.gradle.kts b/dataforge-control-core/build.gradle.kts index 15f1f19..38e5a8e 100644 --- a/dataforge-control-core/build.gradle.kts +++ b/dataforge-control-core/build.gradle.kts @@ -1,4 +1,5 @@ import scientifik.useCoroutines +import scientifik.useSerialization plugins { id("scientifik.mpp") @@ -9,6 +10,7 @@ plugins { val dataforgeVersion: String by rootProject.extra useCoroutines(version = "1.3.7") +useSerialization() kotlin { sourceSets { diff --git a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/api/ActionDescriptor.kt b/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/api/ActionDescriptor.kt deleted file mode 100644 index 17d817c..0000000 --- a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/api/ActionDescriptor.kt +++ /dev/null @@ -1,14 +0,0 @@ -package hep.dataforge.control.api - -import hep.dataforge.meta.Scheme -import hep.dataforge.meta.SchemeSpec - -/** - * A descriptor for property - */ -class ActionDescriptor : Scheme() { - //var name by string { error("Property name is mandatory") } - //var descriptor by spec(ItemDescriptor) - - companion object : SchemeSpec(::ActionDescriptor) -} \ No newline at end of file diff --git a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/api/Device.kt b/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/api/Device.kt index dd37b36..0c5e2e8 100644 --- a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/api/Device.kt +++ b/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/api/Device.kt @@ -27,7 +27,7 @@ interface Device: Closeable { * [owner] is provided optionally in order for listener to be * easily removable */ - fun registerListener(listener: PropertyChangeListener, owner: Any? = listener) + fun registerListener(listener: DeviceListener, owner: Any? = listener) /** * Remove all listeners belonging to specified owner @@ -61,9 +61,8 @@ interface Device: Closeable { } companion object { - const val GET_PROPERTY_ACTION = "@getProperty" - const val SET_PROPERTY_ACTION = "@setProperty" - const val CALL_ACTION ="@call" + const val GET_PROPERTY_ACTION = "@get" + const val SET_PROPERTY_ACTION = "@set" } } diff --git a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/api/PropertyChangeListener.kt b/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/api/DeviceListener.kt similarity index 64% rename from dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/api/PropertyChangeListener.kt rename to dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/api/DeviceListener.kt index 25f3301..e0a0f59 100644 --- a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/api/PropertyChangeListener.kt +++ b/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/api/DeviceListener.kt @@ -2,6 +2,7 @@ package hep.dataforge.control.api import hep.dataforge.meta.MetaItem -interface PropertyChangeListener { +interface DeviceListener { fun propertyChanged(propertyName: String, value: MetaItem<*>?) + //TODO add general message listener method } \ No newline at end of file diff --git a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/api/PropertyDescriptor.kt b/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/api/PropertyDescriptor.kt deleted file mode 100644 index 2b7efd2..0000000 --- a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/api/PropertyDescriptor.kt +++ /dev/null @@ -1,14 +0,0 @@ -package hep.dataforge.control.api - -import hep.dataforge.meta.Scheme -import hep.dataforge.meta.SchemeSpec - -/** - * A descriptor for property - */ -class PropertyDescriptor : Scheme() { - //var name by string { error("Property name is mandatory") } - //var descriptor by spec(ItemDescriptor) - - companion object : SchemeSpec(::PropertyDescriptor) -} \ No newline at end of file diff --git a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/api/descriptors.kt b/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/api/descriptors.kt new file mode 100644 index 0000000..f64d61c --- /dev/null +++ b/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/api/descriptors.kt @@ -0,0 +1,20 @@ +package hep.dataforge.control.api + +import hep.dataforge.meta.Scheme +import hep.dataforge.meta.string + +/** + * A descriptor for property + */ +class PropertyDescriptor(name: String) : Scheme() { + val name by string(name) +} + +/** + * A descriptor for property + */ +class ActionDescriptor(name: String) : Scheme() { + val name by string(name) + //var descriptor by spec(ItemDescriptor) +} + diff --git a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/base/Action.kt b/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/base/Action.kt index ebd1515..7b6099b 100644 --- a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/base/Action.kt +++ b/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/base/Action.kt @@ -23,42 +23,42 @@ class SimpleAction( class ActionDelegate( val owner: D, - val descriptor: ActionDescriptor = ActionDescriptor.empty(), + val descriptorBuilder: ActionDescriptor.()->Unit = {}, val block: suspend (MetaItem<*>?) -> MetaItem<*>? ) : ReadOnlyProperty { override fun getValue(thisRef: D, property: KProperty<*>): Action { val name = property.name return owner.resolveAction(name) { - SimpleAction(name, descriptor, block) + SimpleAction(name, ActionDescriptor(name).apply(descriptorBuilder), block) } } } fun D.request( - descriptor: ActionDescriptor = ActionDescriptor.empty(), + descriptorBuilder: ActionDescriptor.()->Unit = {}, block: suspend (MetaItem<*>?) -> MetaItem<*>? -): ActionDelegate = ActionDelegate(this, descriptor, block) +): ActionDelegate = ActionDelegate(this, descriptorBuilder, block) fun D.requestValue( - descriptor: ActionDescriptor = ActionDescriptor.empty(), + descriptorBuilder: ActionDescriptor.()->Unit = {}, block: suspend (MetaItem<*>?) -> Any? -): ActionDelegate = ActionDelegate(this, descriptor){ +): ActionDelegate = ActionDelegate(this, descriptorBuilder){ val res = block(it) MetaItem.ValueItem(Value.of(res)) } fun D.requestMeta( - descriptor: ActionDescriptor = ActionDescriptor.empty(), + descriptorBuilder: ActionDescriptor.()->Unit = {}, block: suspend MetaBuilder.(MetaItem<*>?) -> Unit -): ActionDelegate = ActionDelegate(this, descriptor){ +): ActionDelegate = ActionDelegate(this, descriptorBuilder){ val res = MetaBuilder().apply { block(it)} MetaItem.NodeItem(res) } fun D.action( - descriptor: ActionDescriptor = ActionDescriptor.empty(), + descriptorBuilder: ActionDescriptor.()->Unit = {}, block: suspend (MetaItem<*>?) -> Unit -): ActionDelegate = ActionDelegate(this, descriptor) { +): ActionDelegate = ActionDelegate(this, descriptorBuilder) { block(it) null } \ No newline at end of file diff --git a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/base/DeviceBase.kt b/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/base/DeviceBase.kt index 85f4fa4..c2fdc94 100644 --- a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/base/DeviceBase.kt +++ b/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/base/DeviceBase.kt @@ -2,7 +2,7 @@ package hep.dataforge.control.base import hep.dataforge.control.api.ActionDescriptor import hep.dataforge.control.api.Device -import hep.dataforge.control.api.PropertyChangeListener +import hep.dataforge.control.api.DeviceListener import hep.dataforge.control.api.PropertyDescriptor import hep.dataforge.meta.MetaItem @@ -10,9 +10,9 @@ abstract class DeviceBase : Device { private val properties = HashMap() private val actions = HashMap() - private val listeners = ArrayList>(4) + private val listeners = ArrayList>(4) - override fun registerListener(listener: PropertyChangeListener, owner: Any?) { + override fun registerListener(listener: DeviceListener, owner: Any?) { listeners.add(owner to listener) } diff --git a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/base/IsolatedDeviceProperty.kt b/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/base/IsolatedDeviceProperty.kt index 38c0359..e24ac17 100644 --- a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/base/IsolatedDeviceProperty.kt +++ b/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/base/IsolatedDeviceProperty.kt @@ -65,7 +65,7 @@ open class IsolatedReadOnlyDeviceProperty( private class ReadOnlyDevicePropertyDelegate( val owner: D, val default: MetaItem<*>?, - val descriptor: PropertyDescriptor = PropertyDescriptor.empty(), + val descriptorBuilder: PropertyDescriptor.() -> Unit = {}, private val getter: suspend (MetaItem<*>?) -> MetaItem<*> ) : ReadOnlyProperty { @@ -77,7 +77,7 @@ private class ReadOnlyDevicePropertyDelegate( IsolatedReadOnlyDeviceProperty( name, default, - descriptor, + PropertyDescriptor(name).apply(descriptorBuilder), owner.scope, owner::propertyChanged, getter @@ -93,7 +93,7 @@ fun D.reading( ): ReadOnlyProperty = ReadOnlyDevicePropertyDelegate( this, default, - PropertyDescriptor.invoke(descriptorBuilder), + descriptorBuilder, getter ) @@ -104,7 +104,7 @@ fun D.readingValue( ): ReadOnlyProperty = ReadOnlyDevicePropertyDelegate( this, default?.let { MetaItem.ValueItem(it) }, - PropertyDescriptor.invoke(descriptorBuilder), + descriptorBuilder, getter = { MetaItem.ValueItem(Value.of(getter())) } ) @@ -115,7 +115,7 @@ fun D.readingNumber( ): ReadOnlyProperty = ReadOnlyDevicePropertyDelegate( this, default?.let { MetaItem.ValueItem(it.asValue()) }, - PropertyDescriptor.invoke(descriptorBuilder), + descriptorBuilder, getter = { val number = getter() MetaItem.ValueItem(number.asValue()) @@ -129,7 +129,7 @@ fun D.readingMeta( ): ReadOnlyProperty = ReadOnlyDevicePropertyDelegate( this, default?.let { MetaItem.NodeItem(it) }, - PropertyDescriptor.invoke(descriptorBuilder), + descriptorBuilder, getter = { MetaItem.NodeItem(MetaBuilder().apply { getter() }) } @@ -179,7 +179,7 @@ class IsolatedDeviceProperty( private class DevicePropertyDelegate( val owner: D, val default: MetaItem<*>?, - val descriptor: PropertyDescriptor = PropertyDescriptor.empty(), + val descriptorBuilder: PropertyDescriptor.() -> Unit = {}, private val getter: suspend (MetaItem<*>?) -> MetaItem<*>, private val setter: suspend (oldValue: MetaItem<*>?, newValue: MetaItem<*>) -> MetaItem<*>? ) : ReadOnlyProperty { @@ -191,7 +191,7 @@ private class DevicePropertyDelegate( IsolatedDeviceProperty( name, default, - descriptor, + PropertyDescriptor(name).apply(descriptorBuilder), owner.scope, owner::propertyChanged, getter, @@ -209,7 +209,7 @@ fun D.writing( ): ReadOnlyProperty = DevicePropertyDelegate( this, default, - PropertyDescriptor.invoke(descriptorBuilder), + descriptorBuilder, getter, setter ) @@ -250,7 +250,7 @@ fun D.writingDouble( return DevicePropertyDelegate( this, MetaItem.ValueItem(Double.NaN.asValue()), - PropertyDescriptor.invoke(descriptorBuilder), + descriptorBuilder, innerGetter, innerSetter ) diff --git a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/controlers/DeviceMessage.kt b/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/controlers/DeviceMessage.kt index 53e7c74..0adf5ab 100644 --- a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/controlers/DeviceMessage.kt +++ b/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/controlers/DeviceMessage.kt @@ -1,62 +1,67 @@ package hep.dataforge.control.controlers +import hep.dataforge.control.controlers.DeviceMessage.Companion.PAYLOAD_VALUE_KEY import hep.dataforge.meta.* import hep.dataforge.names.asName +import hep.dataforge.names.plus +import kotlinx.serialization.KSerializer +import kotlinx.serialization.Serializable -open class DeviceMessage : Scheme() { +class DeviceMessage : Scheme() { var id by item() var source by string()//TODO consider replacing by item var target by string() var comment by string() var action by string(key = MESSAGE_ACTION_KEY) var status by string(RESPONSE_OK_STATUS) - var value by item(key = MESSAGE_VALUE_KEY) + var payload by config(key = MESSAGE_PAYLOAD_KEY) - companion object : SchemeSpec(::DeviceMessage) { + var value by item(key = (MESSAGE_PAYLOAD_KEY + PAYLOAD_VALUE_KEY)) + + /** + * Set a payload for this message according to the given scheme + */ + inline fun payload(spec: Specification, block: T.() -> Unit): T = + (payload?.let { spec.wrap(it) } ?: spec.empty().also { payload = it.config }).apply(block) + + companion object : SchemeSpec(::DeviceMessage){ val MESSAGE_ACTION_KEY = "action".asName() - val MESSAGE_VALUE_KEY = "value".asName() + val MESSAGE_PAYLOAD_KEY = "payload".asName() + val PAYLOAD_VALUE_KEY = "value".asName() const val RESPONSE_OK_STATUS = "response.OK" const val RESPONSE_FAIL_STATUS = "response.FAIL" + const val PROPERTY_CHANGED_ACTION = "event.propertyChange" - fun ok(request: DeviceMessage? = null, block: DeviceMessage.() -> Unit = {}): DeviceMessage { - return DeviceMessage { - id = request?.id - }.apply(block) - } + inline fun ok( + request: DeviceMessage? = null, + block: DeviceMessage.() -> Unit = {} + ): DeviceMessage = DeviceMessage { + id = request?.id + }.apply(block) - fun fail(request: DeviceMessage? = null,block: DeviceMessage.() -> Unit = {}): DeviceMessage { - return DeviceMessage { - id = request?.id - status = RESPONSE_FAIL_STATUS - }.apply(block) - } + inline fun fail( + request: DeviceMessage? = null, + block: DeviceMessage.() -> Unit = {} + ): DeviceMessage = DeviceMessage { + id = request?.id + status = RESPONSE_FAIL_STATUS + }.apply(block) } } -class DevicePropertyMessage : DeviceMessage() { - //TODO add multiple properties in the same message - var property by spec(PropertyValue) +class PropertyPayload : Scheme() { + var name by string { error("Property name could not be empty") } + var value by item(key = PAYLOAD_VALUE_KEY) - fun property(builder: PropertyValue.() -> Unit) { - this.property = PropertyValue.invoke(builder) + companion object : SchemeSpec(::PropertyPayload) +} + +@DFBuilder +inline fun DeviceMessage.property(block: PropertyPayload.() -> Unit): PropertyPayload = payload(PropertyPayload, block) + +var DeviceMessage.property: PropertyPayload? + get() = payload?.let { PropertyPayload.wrap(it) } + set(value) { + payload = value?.config } - class PropertyValue : Scheme() { - var name by string { error("Property name not defined") } - var value by item() - - companion object : SchemeSpec(::PropertyValue) - } - - companion object : SchemeSpec(::DevicePropertyMessage) { - const val PROPERTY_CHANGED_ACTION = "event.propertyChange" - fun ok(request: DeviceMessage? = null, block: DevicePropertyMessage.() -> Unit = {}): DeviceMessage { - return DevicePropertyMessage { - id = request?.id - property { - name - } - }.apply(block) - } - } -} \ No newline at end of file diff --git a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/controlers/MessageController.kt b/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/controlers/MessageController.kt index 778e337..aa4421a 100644 --- a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/controlers/MessageController.kt +++ b/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/controlers/MessageController.kt @@ -1,91 +1,110 @@ package hep.dataforge.control.controlers import hep.dataforge.control.api.Device -import hep.dataforge.control.api.PropertyChangeListener -import hep.dataforge.control.controlers.DevicePropertyMessage.Companion.PROPERTY_CHANGED_ACTION +import hep.dataforge.control.api.DeviceListener +import hep.dataforge.control.controlers.DeviceMessage.Companion.PROPERTY_CHANGED_ACTION import hep.dataforge.io.Envelope import hep.dataforge.io.Responder import hep.dataforge.io.SimpleEnvelope import hep.dataforge.meta.MetaItem -import hep.dataforge.meta.get -import hep.dataforge.meta.string import hep.dataforge.meta.wrap +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.flow.consumeAsFlow +import kotlinx.coroutines.launch import kotlinx.io.Binary -interface MessageConsumer { +/** + * A consumer of envelopes + */ +interface Consumer { fun consume(message: Envelope): Unit } class MessageController( val device: Device, - val deviceTarget: String -) : Responder, PropertyChangeListener { + val deviceTarget: String, + val scope: CoroutineScope = device.scope +) : Consumer, Responder, DeviceListener { init { device.registerListener(this, this) } - var messageListener: MessageConsumer? = null + private val outputChannel = Channel(Channel.CONFLATED) - override suspend fun respond(request: Envelope): Envelope { - val responseMessage: DeviceMessage = try { - when (val action = request.meta[DeviceMessage.MESSAGE_ACTION_KEY].string ?: error("Action not defined")) { - Device.GET_PROPERTY_ACTION -> { - val message = DevicePropertyMessage.wrap(request.meta) - val property = message.property ?: error("Property item not defined") - val propertyName: String = property.name - val result = device.getProperty(propertyName) + suspend fun respondMessage( + request: DeviceMessage + ): DeviceMessage = if (request.target != null && request.target != deviceTarget) { + DeviceMessage.fail { + comment = "Wrong target name $deviceTarget expected but ${request.target} found" + } + } else try { + when (val action = request.action ?: error("Action is not defined in message")) { + Device.GET_PROPERTY_ACTION -> { + val property = request.property ?: error("Payload is not defined or not a property") + val propertyName: String = property.name + val result = device.getProperty(propertyName) - DevicePropertyMessage.ok { - this.source = deviceTarget - this.target = message.source - property { - name = propertyName - value = result - } - } - } - Device.SET_PROPERTY_ACTION -> { - val message = DevicePropertyMessage.wrap(request.meta) - val property = message.property ?: error("Property item not defined") - val propertyName: String = property.name - val propertyValue = property.value - if (propertyValue == null) { - device.invalidateProperty(propertyName) - } else { - device.setProperty(propertyName, propertyValue) - } - DevicePropertyMessage.ok { - this.source = deviceTarget - this.target = message.source - property { - name = propertyName - } - } - } - else -> { - val value = request.meta[DeviceMessage.MESSAGE_VALUE_KEY] - val result = device.call(action, value) - DeviceMessage.ok { - this.source = deviceTarget - this.action = action - this.value = result + DeviceMessage.ok { + this.source = deviceTarget + this.target = request.source + property { + name = propertyName + value = result } } } - } catch (ex: Exception) { - DeviceMessage.fail { - comment = ex.message + Device.SET_PROPERTY_ACTION -> { + val property = request.property ?: error("Payload is not defined or not a property") + val propertyName: String = property.name + val propertyValue = property.value + if (propertyValue == null) { + device.invalidateProperty(propertyName) + } else { + device.setProperty(propertyName, propertyValue) + } + DeviceMessage.ok { + this.source = deviceTarget + this.target = request.source + property { + name = propertyName + } + } + } + else -> { + val value = request.value + val result = device.call(action, value) + DeviceMessage.ok { + this.source = deviceTarget + this.action = action + this.value = result + } } } + } catch (ex: Exception) { + DeviceMessage.fail { + comment = ex.message + } + } + override fun consume(message: Envelope) { + // Fire the respond procedure and forget about the result + scope.launch { + respond(message) + } + } + + override suspend fun respond(request: Envelope): Envelope { + val requestMessage = DeviceMessage.wrap(request.meta) + val responseMessage = respondMessage(requestMessage) return SimpleEnvelope(responseMessage.toMeta(), Binary.EMPTY) } override fun propertyChanged(propertyName: String, value: MetaItem<*>?) { if (value == null) return - messageListener?.let { listener -> - val change = DevicePropertyMessage.ok { + scope.launch { + val change = DeviceMessage.ok { this.source = deviceTarget action = PROPERTY_CHANGED_ACTION property { @@ -94,10 +113,14 @@ class MessageController( } } val envelope = SimpleEnvelope(change.toMeta(), Binary.EMPTY) - listener.consume(envelope) + + outputChannel.send(envelope) } } + fun output() = outputChannel.consumeAsFlow() + + companion object { } } \ No newline at end of file diff --git a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/controlers/MessageFlow.kt b/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/controlers/MessageFlow.kt deleted file mode 100644 index aec564d..0000000 --- a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/controlers/MessageFlow.kt +++ /dev/null @@ -1,57 +0,0 @@ -package hep.dataforge.control.controlers - -import hep.dataforge.io.Envelope -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.channels.Channel.Factory.CONFLATED -import kotlinx.coroutines.channels.SendChannel -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.consumeAsFlow -import kotlinx.coroutines.launch -import kotlinx.io.Closeable - -@ExperimentalCoroutinesApi -class MessageFlow( - val controller: MessageController, - val scope: CoroutineScope -) : Closeable, MessageConsumer { - - init { - if (controller.messageListener != null) error("Can't attach controller to $controller, the controller is already attached") - controller.messageListener = this - } - - private val outputChannel = Channel(CONFLATED) - private val inputChannel = Channel(CONFLATED) - - val input: SendChannel get() = inputChannel - val output: Flow = outputChannel.consumeAsFlow() - - init { - scope.launch { - while (!inputChannel.isClosedForSend) { - val request = inputChannel.receive() - val response = controller.respond(request) - outputChannel.send(response) - } - } - } - - override fun consume(message: Envelope) { - scope.launch { - outputChannel.send(message) - } - } - - override fun close() { - outputChannel.cancel() - } -} - -@ExperimentalCoroutinesApi -fun MessageController.flow(scope: CoroutineScope = device.scope): MessageFlow { - return MessageFlow(this, scope).also { - this@flow.messageListener = it - } -} \ No newline at end of file diff --git a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/controlers/PropertyFlow.kt b/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/controlers/PropertyFlow.kt index 2861a06..8a3d82e 100644 --- a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/controlers/PropertyFlow.kt +++ b/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/controlers/PropertyFlow.kt @@ -1,7 +1,7 @@ package hep.dataforge.control.controlers import hep.dataforge.control.api.Device -import hep.dataforge.control.api.PropertyChangeListener +import hep.dataforge.control.api.DeviceListener import hep.dataforge.meta.MetaItem import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.channels.awaitClose @@ -12,7 +12,7 @@ import kotlinx.coroutines.launch @ExperimentalCoroutinesApi suspend fun Device.flowValues(): Flow>> = callbackFlow { - val listener = object : PropertyChangeListener { + val listener = object : DeviceListener { override fun propertyChanged(propertyName: String, value: MetaItem<*>?) { if (value != null) { launch { diff --git a/dataforge-control-server/build.gradle.kts b/dataforge-control-server/build.gradle.kts new file mode 100644 index 0000000..6e0da60 --- /dev/null +++ b/dataforge-control-server/build.gradle.kts @@ -0,0 +1,21 @@ +import scientifik.useCoroutines +import scientifik.useSerialization + +plugins { + id("scientifik.jvm") + id("scientifik.publish") + application +} + +useSerialization() + +val dataforgeVersion: String by rootProject.extra +val ktorVersion: String by extra("1.3.2") + +dependencies{ + implementation(project(":dataforge-control-core")) + implementation("io.ktor:ktor-server-cio:$ktorVersion") + implementation("io.ktor:ktor-websockets:$ktorVersion") + implementation("io.ktor:ktor-serialization:$ktorVersion") + implementation("io.ktor:ktor-html-builder:$ktorVersion") +} \ No newline at end of file diff --git a/dataforge-control-server/src/main/kotlin/hep/dataforge/control/server/deviceWebServer.kt b/dataforge-control-server/src/main/kotlin/hep/dataforge/control/server/deviceWebServer.kt new file mode 100644 index 0000000..d04baf1 --- /dev/null +++ b/dataforge-control-server/src/main/kotlin/hep/dataforge/control/server/deviceWebServer.kt @@ -0,0 +1,217 @@ +@file:OptIn(ExperimentalCoroutinesApi::class, KtorExperimentalAPI::class, FlowPreview::class, UnstableDefault::class) + +package hep.dataforge.control.server + +import hep.dataforge.control.api.Device +import hep.dataforge.control.controlers.DeviceMessage +import hep.dataforge.control.controlers.MessageController +import hep.dataforge.control.controlers.property +import hep.dataforge.meta.toJson +import hep.dataforge.meta.toMeta +import hep.dataforge.meta.toMetaItem +import hep.dataforge.meta.wrap +import io.ktor.application.* +import io.ktor.features.ContentNegotiation +import io.ktor.features.StatusPages +import io.ktor.html.respondHtml +import io.ktor.http.ContentType +import io.ktor.http.HttpStatusCode +import io.ktor.request.receiveText +import io.ktor.response.respond +import io.ktor.response.respondRedirect +import io.ktor.response.respondText +import io.ktor.routing.* +import io.ktor.serialization.json +import io.ktor.server.cio.CIO +import io.ktor.server.engine.ApplicationEngine +import io.ktor.server.engine.embeddedServer +import io.ktor.util.KtorExperimentalAPI +import io.ktor.util.getValue +import io.ktor.websocket.WebSockets +import io.ktor.websocket.webSocket +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.flow.asFlow +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.flatMapMerge +import kotlinx.html.body +import kotlinx.html.h1 +import kotlinx.html.head +import kotlinx.html.title +import kotlinx.serialization.UnstableDefault +import kotlinx.serialization.json.* + +/** + * Create and start a web server for several devices + */ +fun CoroutineScope.startDeviceServer( + devices: Map, + port: Int = 8111, + host: String = "0.0.0.0" +): ApplicationEngine { + + val controllers = devices.mapValues { + MessageController(it.value, it.key, this) + } + + return embeddedServer(CIO, port, host) { + install(WebSockets) + install(ContentNegotiation) { + json() + } + install(StatusPages) { + exception { cause -> + call.respond(HttpStatusCode.BadRequest, cause.message ?: "") + } + } + routing { + routeDevices(controllers) + get("/") { + call.respondRedirect("/dashboard") + } + + } + }.start() +} + +suspend fun ApplicationCall.respondJson(builder: JsonObjectBuilder.() -> Unit) { + val json = json(builder) + respondText(json.toString(), contentType = ContentType.Application.Json) +} + +const val WEB_SERVER_TARGET = "@webServer" + + +private suspend fun ApplicationCall.message(target: MessageController) { + val body = receiveText() + val json = Json.parseJson(body) as? JsonObject + ?: throw IllegalArgumentException("The body is not a json object") + val meta = json.toMeta() + + val request = DeviceMessage.wrap(meta) + + val response = target.respondMessage(request) + respond(response.toMeta()) +} + +private suspend fun ApplicationCall.getProperty(target: MessageController) { + val property: String by parameters + val request = DeviceMessage { + action = Device.GET_PROPERTY_ACTION + source = WEB_SERVER_TARGET + this.target = target.deviceTarget + property { + name = property + } + } + + val response = target.respondMessage(request) + respond(response.toMeta()) +} + +private suspend fun ApplicationCall.setProperty(target: MessageController) { + val property: String by parameters + val body = receiveText() + val json = Json.parseJson(body) + + val request = DeviceMessage { + action = Device.SET_PROPERTY_ACTION + source = WEB_SERVER_TARGET + this.target = target.deviceTarget + property { + name = property + value = json.toMetaItem() + } + } + + val response = target.respondMessage(request) + respondMessage(response) +} + +fun Routing.routeDevices(targets: Map) { + this.application.feature(WebSockets) + + fun generateFlow(target: String?) = if (target == null) { + targets.values.asFlow().flatMapMerge { it.output() } + } else { + targets[target]?.output() ?: error("The device with target $target not found") + } + + get("dashboard") { + call.respondHtml { + head { + title("Device server dashboard") + } + body { + h1 { + +"Under construction" + } + } + } + } + + get("list") { + call.respondJson { + targets.values.forEach { controller -> + "target" to controller.deviceTarget + val device = controller.device + "properties" to jsonArray { + device.propertyDescriptors.forEach { descriptor -> + +descriptor.config.toJson() + } + } + "actions" to jsonArray { + device.actionDescriptors.forEach { actionDescriptor -> + +actionDescriptor.config.toJson() + } + } + } + } + } + //Check if application supports websockets and if it does add a push channel + if (this.application.featureOrNull(WebSockets) != null) { + webSocket("ws") { + //subscribe on device + val target: String? by call.request.queryParameters + + try { + application.log.debug("Opened server socket for ${call.request.queryParameters}") + + generateFlow(target).collect { + outgoing.send(it.toFrame()) + } + + } catch (ex: Exception) { + application.log.debug("Closed server socket for ${call.request.queryParameters}") + } + } + } + + post("message") { + val target: String by call.request.queryParameters + val controller = targets[target] ?: throw IllegalArgumentException("Target $target not found in $targets") + call.message(controller) + } + + route("{target}") { + //global route for the device + + route("{property}") { + get("get") { + val target: String by call.parameters + val controller = targets[target] + ?: throw IllegalArgumentException("Target $target not found in $targets") + + call.getProperty(controller) + } + post("set") { + val target: String by call.parameters + val controller = + targets[target] ?: throw IllegalArgumentException("Target $target not found in $targets") + + call.setProperty(controller) + } + } + } +} \ No newline at end of file diff --git a/dataforge-control-server/src/main/kotlin/hep/dataforge/control/server/envelopes.kt b/dataforge-control-server/src/main/kotlin/hep/dataforge/control/server/envelopes.kt new file mode 100644 index 0000000..60c0f1e --- /dev/null +++ b/dataforge-control-server/src/main/kotlin/hep/dataforge/control/server/envelopes.kt @@ -0,0 +1,27 @@ +package hep.dataforge.control.server + +import hep.dataforge.control.controlers.DeviceMessage +import hep.dataforge.io.Envelope +import io.ktor.application.ApplicationCall +import io.ktor.http.cio.websocket.Frame +import io.ktor.response.ApplicationResponse + +fun Frame.toEnvelope(): Envelope { + TODO() +} + +fun Envelope.toFrame(): Frame { + TODO() +} + +suspend fun ApplicationCall.respondMessage(message: DeviceMessage) { + TODO() +} + +suspend fun ApplicationCall.respondMessage(builder: DeviceMessage.() -> Unit) { + respondMessage(DeviceMessage(builder)) +} + +suspend fun ApplicationCall.respondFail(builder: DeviceMessage.() -> Unit) { + respondMessage(DeviceMessage.fail(null, builder)) +} \ No newline at end of file diff --git a/dataforge-control-server/src/main/kotlin/hep/dataforge/control/server/sse.kt b/dataforge-control-server/src/main/kotlin/hep/dataforge/control/server/sse.kt new file mode 100644 index 0000000..34ce2c1 --- /dev/null +++ b/dataforge-control-server/src/main/kotlin/hep/dataforge/control/server/sse.kt @@ -0,0 +1,41 @@ +package hep.dataforge.control.server + +import io.ktor.application.ApplicationCall +import io.ktor.http.CacheControl +import io.ktor.http.ContentType +import io.ktor.response.cacheControl +import io.ktor.response.respondTextWriter +import kotlinx.coroutines.channels.ReceiveChannel +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.collect + +/** + * The data class representing a SSE Event that will be sent to the client. + */ +data class SseEvent(val data: String, val event: String? = null, val id: String? = null) + +/** + * Method that responds an [ApplicationCall] by reading all the [SseEvent]s from the specified [events] [ReceiveChannel] + * and serializing them in a way that is compatible with the Server-Sent Events specification. + * + * You can read more about it here: https://www.html5rocks.com/en/tutorials/eventsource/basics/ + */ +@Suppress("BlockingMethodInNonBlockingContext") +suspend fun ApplicationCall.respondSse(events: Flow) { + response.cacheControl(CacheControl.NoCache(null)) + respondTextWriter(contentType = ContentType.Text.EventStream) { + events.collect { event-> + if (event.id != null) { + write("id: ${event.id}\n") + } + if (event.event != null) { + write("event: ${event.event}\n") + } + for (dataLine in event.data.lines()) { + write("data: $dataLine\n") + } + write("\n") + flush() + } + } +} \ No newline at end of file diff --git a/demo/build.gradle.kts b/demo/build.gradle.kts index ae448df..6c5eeaa 100644 --- a/demo/build.gradle.kts +++ b/demo/build.gradle.kts @@ -16,6 +16,7 @@ repositories{ dependencies{ implementation(project(":dataforge-control-core")) + implementation(project(":dataforge-control-server")) implementation("no.tornado:tornadofx:1.7.20") implementation(kotlin("stdlib-jdk8")) implementation("scientifik:plotlykt-server:$plotlyVersion") diff --git a/demo/src/main/kotlin/hep/dataforge/control/demo/DemoDevice.kt b/demo/src/main/kotlin/hep/dataforge/control/demo/DemoDevice.kt index ca2e300..3d871e0 100644 --- a/demo/src/main/kotlin/hep/dataforge/control/demo/DemoDevice.kt +++ b/demo/src/main/kotlin/hep/dataforge/control/demo/DemoDevice.kt @@ -5,6 +5,7 @@ import hep.dataforge.control.controlers.double import hep.dataforge.values.asValue import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.Job import kotlinx.coroutines.asCoroutineDispatcher import java.time.Instant import java.util.concurrent.Executors @@ -19,7 +20,7 @@ class DemoDevice(parentScope: CoroutineScope = GlobalScope) : DeviceBase() { private val executor = Executors.newSingleThreadExecutor() override val scope: CoroutineScope = CoroutineScope( - parentScope.coroutineContext + executor.asCoroutineDispatcher() + parentScope.coroutineContext + executor.asCoroutineDispatcher() + Job(parentScope.coroutineContext[Job]) ) val timeScale: IsolatedDeviceProperty by writingVirtual(5000.0.asValue()) diff --git a/settings.gradle.kts b/settings.gradle.kts index 4d7e789..a7516a4 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -37,6 +37,7 @@ rootProject.name = "dataforge-control" include( ":dataforge-control-core", + ":dataforge-control-server", ":demo" )