diff --git a/README.md b/README.md index cb3a920..6218ae3 100644 --- a/README.md +++ b/README.md @@ -32,7 +32,7 @@ Among other things, you can: ### `dataforge-control-core` module packages - `api` - defines API for device management. The main class here is -[`Device`](dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/api/Device.kt). +[`Device`](dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/api/Device.kt). Generally, a Device has Properties that can be read and written. Also, some Actions can optionally be applied on a device (may or may not affect properties). diff --git a/build.gradle.kts b/build.gradle.kts index 10a5379..23e34aa 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -1,5 +1,5 @@ -val dataforgeVersion by extra("0.1.8-dev-4") -val plotlyVersion by extra("0.2.0-dev-4") +val dataforgeVersion by extra("0.1.8") +val plotlyVersion by extra("0.2.0-dev-12") allprojects { diff --git a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/controllers/delegateMappers.kt b/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/controllers/delegateMappers.kt deleted file mode 100644 index d80ceeb..0000000 --- a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/controllers/delegateMappers.kt +++ /dev/null @@ -1,14 +0,0 @@ -package hep.dataforge.control.controllers - -import hep.dataforge.control.base.DeviceProperty -import hep.dataforge.control.base.ReadOnlyDeviceProperty -import hep.dataforge.meta.MetaItem -import hep.dataforge.meta.double -import hep.dataforge.meta.map -import hep.dataforge.values.asValue - -fun ReadOnlyDeviceProperty.double() = map { it.double } -fun DeviceProperty.double() = map( - reader = { it.double ?: Double.NaN }, - writer = { MetaItem.ValueItem(it.asValue()) } -) diff --git a/dataforge-control-server/src/main/kotlin/hep/dataforge/control/server/envelopes.kt b/dataforge-control-server/src/main/kotlin/hep/dataforge/control/server/envelopes.kt deleted file mode 100644 index 095100b..0000000 --- a/dataforge-control-server/src/main/kotlin/hep/dataforge/control/server/envelopes.kt +++ /dev/null @@ -1,26 +0,0 @@ -package hep.dataforge.control.server - -import hep.dataforge.control.controllers.DeviceMessage -import hep.dataforge.io.Envelope -import io.ktor.application.ApplicationCall -import io.ktor.http.cio.websocket.Frame - -fun Frame.toEnvelope(): Envelope { - TODO() -} - -fun Envelope.toFrame(): Frame { - TODO() -} - -suspend fun ApplicationCall.respondMessage(message: DeviceMessage) { - TODO() -} - -suspend fun ApplicationCall.respondMessage(builder: DeviceMessage.() -> Unit) { - respondMessage(DeviceMessage(builder)) -} - -suspend fun ApplicationCall.respondFail(builder: DeviceMessage.() -> Unit) { - respondMessage(DeviceMessage.fail(null, builder)) -} \ No newline at end of file diff --git a/dataforge-device-client/build.gradle.kts b/dataforge-device-client/build.gradle.kts new file mode 100644 index 0000000..1404ffe --- /dev/null +++ b/dataforge-device-client/build.gradle.kts @@ -0,0 +1,18 @@ +plugins { + id("scientifik.mpp") + id("scientifik.publish") +} + +val ktorVersion: String by extra("1.3.2") + + +kotlin { + sourceSets { + commonMain{ + dependencies { + implementation(project(":dataforge-device-core")) + implementation("io.ktor:ktor-client-cio:$ktorVersion") + } + } + } +} \ No newline at end of file diff --git a/dataforge-control-core/build.gradle.kts b/dataforge-device-core/build.gradle.kts similarity index 86% rename from dataforge-control-core/build.gradle.kts rename to dataforge-device-core/build.gradle.kts index 38e5a8e..a809fd7 100644 --- a/dataforge-control-core/build.gradle.kts +++ b/dataforge-device-core/build.gradle.kts @@ -4,7 +4,6 @@ import scientifik.useSerialization plugins { id("scientifik.mpp") id("scientifik.publish") - id("kotlinx-atomicfu") version "0.14.3" } val dataforgeVersion: String by rootProject.extra @@ -21,8 +20,4 @@ kotlin { } } } -} - -atomicfu { - variant = "VH" } \ No newline at end of file diff --git a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/api/Device.kt b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/api/Device.kt similarity index 87% rename from dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/api/Device.kt rename to dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/api/Device.kt index 448a4fc..cda02fc 100644 --- a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/api/Device.kt +++ b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/api/Device.kt @@ -59,16 +59,11 @@ interface Device: Closeable { * Send an action request and suspend caller while request is being processed. * Could return null if request does not return a meaningful answer. */ - suspend fun call(action: String, argument: MetaItem<*>? = null): MetaItem<*>? + suspend fun exec(action: String, argument: MetaItem<*>? = null): MetaItem<*>? override fun close() { scope.cancel("The device is closed") } - - companion object { - const val GET_PROPERTY_ACTION = "@get" - const val SET_PROPERTY_ACTION = "@set" - } } -suspend fun Device.call(name: String, meta: Meta?) = call(name, meta?.let { MetaItem.NodeItem(it) }) \ No newline at end of file +suspend fun Device.exec(name: String, meta: Meta?) = exec(name, meta?.let { MetaItem.NodeItem(it) }) \ No newline at end of file diff --git a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/api/DeviceHub.kt b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/api/DeviceHub.kt similarity index 89% rename from dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/api/DeviceHub.kt rename to dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/api/DeviceHub.kt index 46958c2..3ad2014 100644 --- a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/api/DeviceHub.kt +++ b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/api/DeviceHub.kt @@ -18,6 +18,6 @@ suspend fun DeviceHub.setProperty(deviceName: String, propertyName: String, valu .setProperty(propertyName, value) } -suspend fun DeviceHub.call(deviceName: String, command: String, argument: MetaItem<*>?): MetaItem<*>? = +suspend fun DeviceHub.exec(deviceName: String, command: String, argument: MetaItem<*>?): MetaItem<*>? = (getDevice(deviceName) ?: error("Device with name $deviceName not found in the hub")) - .call(command, argument) \ No newline at end of file + .exec(command, argument) \ No newline at end of file diff --git a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/api/DeviceListener.kt b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/api/DeviceListener.kt similarity index 100% rename from dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/api/DeviceListener.kt rename to dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/api/DeviceListener.kt diff --git a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/api/descriptors.kt b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/api/descriptors.kt similarity index 100% rename from dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/api/descriptors.kt rename to dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/api/descriptors.kt diff --git a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/base/Action.kt b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/base/Action.kt similarity index 100% rename from dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/base/Action.kt rename to dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/base/Action.kt diff --git a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/base/DeviceBase.kt b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/base/DeviceBase.kt similarity index 97% rename from dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/base/DeviceBase.kt rename to dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/base/DeviceBase.kt index 79c06a5..16a2ff5 100644 --- a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/base/DeviceBase.kt +++ b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/base/DeviceBase.kt @@ -54,7 +54,7 @@ abstract class DeviceBase : Device { ) } - override suspend fun call(action: String, argument: MetaItem<*>?): MetaItem<*>? = + override suspend fun exec(action: String, argument: MetaItem<*>?): MetaItem<*>? = (actions[action] ?: error("Request with name $action not defined")).invoke(argument) diff --git a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/base/DeviceProperty.kt b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/base/DeviceProperty.kt similarity index 74% rename from dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/base/DeviceProperty.kt rename to dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/base/DeviceProperty.kt index 34ce529..d100b9c 100644 --- a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/base/DeviceProperty.kt +++ b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/base/DeviceProperty.kt @@ -4,15 +4,12 @@ import hep.dataforge.control.api.PropertyDescriptor import hep.dataforge.meta.MetaItem import kotlinx.coroutines.* import kotlinx.coroutines.flow.Flow -import kotlin.properties.ReadOnlyProperty -import kotlin.properties.ReadWriteProperty -import kotlin.reflect.KProperty import kotlin.time.Duration /** * Read-only device property */ -interface ReadOnlyDeviceProperty : ReadOnlyProperty?> { +interface ReadOnlyDeviceProperty { /** * Property name, should be unique in device */ @@ -51,10 +48,9 @@ interface ReadOnlyDeviceProperty : ReadOnlyProperty?> { * Produces null when the state is invalidated */ fun flow(): Flow?> - - override fun getValue(thisRef: Any?, property: KProperty<*>): MetaItem<*>? = value } + /** * Launch recurring force re-read job on a property scope with given [duration] between reads. */ @@ -68,18 +64,11 @@ fun ReadOnlyDeviceProperty.readEvery(duration: Duration): Job = scope.launch { /** * A writeable device property with non-suspended write */ -interface DeviceProperty : ReadOnlyDeviceProperty, ReadWriteProperty?> { +interface DeviceProperty : ReadOnlyDeviceProperty { override var value: MetaItem<*>? /** * Write value to physical device. Invalidates logical value, but does not update it automatically */ suspend fun write(item: MetaItem<*>) - - override fun setValue(thisRef: Any?, property: KProperty<*>, value: MetaItem<*>?) { - this.value = value - } - - override fun getValue(thisRef: Any?, property: KProperty<*>): MetaItem<*>? = value -} - +} \ No newline at end of file diff --git a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/base/IsolatedDeviceProperty.kt b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/base/IsolatedDeviceProperty.kt similarity index 100% rename from dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/base/IsolatedDeviceProperty.kt rename to dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/base/IsolatedDeviceProperty.kt diff --git a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/base/misc.kt b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/base/misc.kt similarity index 100% rename from dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/base/misc.kt rename to dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/base/misc.kt diff --git a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/controllers/DeviceMessage.kt b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/DeviceMessage.kt similarity index 56% rename from dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/controllers/DeviceMessage.kt rename to dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/DeviceMessage.kt index f663658..78cccba 100644 --- a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/controllers/DeviceMessage.kt +++ b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/DeviceMessage.kt @@ -3,26 +3,29 @@ package hep.dataforge.control.controllers import hep.dataforge.control.controllers.DeviceMessage.Companion.PAYLOAD_VALUE_KEY import hep.dataforge.meta.* import hep.dataforge.names.asName -import hep.dataforge.names.plus + class DeviceMessage : Scheme() { - var id by item() - var source by string()//TODO consider replacing by item + var id by string { error("The message id must not be empty") } + var parent by string() + var origin by string() var target by string() + var action by string(default = MessageController.GET_PROPERTY_ACTION, key = MESSAGE_ACTION_KEY) var comment by string() - var action by string(key = MESSAGE_ACTION_KEY) var status by string(RESPONSE_OK_STATUS) - var payload by config(key = MESSAGE_PAYLOAD_KEY) - - var value by item(key = (MESSAGE_PAYLOAD_KEY + PAYLOAD_VALUE_KEY)) + var payload: List + get() = config.getIndexed(MESSAGE_PAYLOAD_KEY).values.map { MessagePayload.wrap(it.node!!) } + set(value) { + config[MESSAGE_PAYLOAD_KEY] = value.map { it.config } + } /** - * Set a payload for this message according to the given scheme + * Append a payload to this message according to the given scheme */ - inline fun payload(spec: Specification, block: T.() -> Unit): T = - (payload?.let { spec.wrap(it) } ?: spec.empty().also { payload = it.config }).apply(block) + fun append(spec: Specification, block: T.() -> Unit): T = + spec.invoke(block).also { config.append(MESSAGE_PAYLOAD_KEY, it) } - companion object : SchemeSpec(::DeviceMessage){ + companion object : SchemeSpec(::DeviceMessage) { val MESSAGE_ACTION_KEY = "action".asName() val MESSAGE_PAYLOAD_KEY = "payload".asName() val PAYLOAD_VALUE_KEY = "value".asName() @@ -34,32 +37,25 @@ class DeviceMessage : Scheme() { request: DeviceMessage? = null, block: DeviceMessage.() -> Unit = {} ): DeviceMessage = DeviceMessage { - id = request?.id + parent = request?.id }.apply(block) inline fun fail( request: DeviceMessage? = null, block: DeviceMessage.() -> Unit = {} ): DeviceMessage = DeviceMessage { - id = request?.id + parent = request?.id status = RESPONSE_FAIL_STATUS }.apply(block) } } -class PropertyPayload : Scheme() { +class MessagePayload : Scheme() { var name by string { error("Property name could not be empty") } var value by item(key = PAYLOAD_VALUE_KEY) - companion object : SchemeSpec(::PropertyPayload) + companion object : SchemeSpec(::MessagePayload) } @DFBuilder -inline fun DeviceMessage.property(block: PropertyPayload.() -> Unit): PropertyPayload = payload(PropertyPayload, block) - -var DeviceMessage.property: PropertyPayload? - get() = payload?.let { PropertyPayload.wrap(it) } - set(value) { - payload = value?.config - } - +fun DeviceMessage.property(block: MessagePayload.() -> Unit): MessagePayload = append(MessagePayload, block) diff --git a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/controllers/MessageController.kt b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/MessageController.kt similarity index 54% rename from dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/controllers/MessageController.kt rename to dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/MessageController.kt index ef0eb44..1c61c33 100644 --- a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/controllers/MessageController.kt +++ b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/MessageController.kt @@ -40,48 +40,66 @@ class MessageController( comment = "Wrong target name $deviceTarget expected but ${request.target} found" } } else try { - when (val action = request.action ?: error("Action is not defined in message")) { - Device.GET_PROPERTY_ACTION -> { - val property = request.property ?: error("Payload is not defined or not a property") - val propertyName: String = property.name - val result = device.getProperty(propertyName) + val result: List = when (val action = request.action) { + GET_PROPERTY_ACTION -> { + request.payload.map { property -> + MessagePayload { + name = property.name + value = device.getProperty(name) + } + } + } + SET_PROPERTY_ACTION -> { + request.payload.map { property -> + val propertyName: String = property.name + val propertyValue = property.value + if (propertyValue == null) { + device.invalidateProperty(propertyName) + } else { + device.setProperty(propertyName, propertyValue) + } + MessagePayload { + name = propertyName + value = device.getProperty(propertyName) + } + } + } + EXECUTE_ACTION -> { + request.payload.map { payload -> + MessagePayload { + name = payload.name + value = device.exec(payload.name, payload.value) + } + } + } + PROPERTY_LIST_ACTION -> { + device.propertyDescriptors.map { descriptor -> + MessagePayload { + name = descriptor.name + value = MetaItem.NodeItem(descriptor.config) + } + } + } - DeviceMessage.ok { - this.source = deviceTarget - this.target = request.source - property { - name = propertyName - value = result - } - } - } - Device.SET_PROPERTY_ACTION -> { - val property = request.property ?: error("Payload is not defined or not a property") - val propertyName: String = property.name - val propertyValue = property.value - if (propertyValue == null) { - device.invalidateProperty(propertyName) - } else { - device.setProperty(propertyName, propertyValue) - } - DeviceMessage.ok { - this.source = deviceTarget - this.target = request.source - property { - name = propertyName + ACTION_LIST_ACTION -> { + device.actionDescriptors.map { descriptor -> + MessagePayload { + name = descriptor.name + value = MetaItem.NodeItem(descriptor.config) } } } + else -> { - val value = request.value - val result = device.call(action, value) - DeviceMessage.ok { - this.source = deviceTarget - this.action = action - this.value = result - } + error("Unrecognized action $action") } } + DeviceMessage.ok { + this.parent = request.id + this.origin = deviceTarget + this.target = request.origin + this.payload = result + } } catch (ex: Exception) { DeviceMessage.fail { comment = ex.message @@ -105,7 +123,7 @@ class MessageController( if (value == null) return scope.launch { val change = DeviceMessage.ok { - this.source = deviceTarget + this.origin = deviceTarget action = PROPERTY_CHANGED_ACTION property { name = propertyName @@ -122,5 +140,10 @@ class MessageController( companion object { + const val GET_PROPERTY_ACTION = "read" + const val SET_PROPERTY_ACTION = "write" + const val EXECUTE_ACTION = "execute" + const val PROPERTY_LIST_ACTION = "propertyList" + const val ACTION_LIST_ACTION = "actionList" } } \ No newline at end of file diff --git a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/controllers/MessageFlow.kt b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/MessageFlow.kt similarity index 100% rename from dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/controllers/MessageFlow.kt rename to dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/MessageFlow.kt diff --git a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/controllers/PropertyFlow.kt b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/PropertyFlow.kt similarity index 100% rename from dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/controllers/PropertyFlow.kt rename to dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/PropertyFlow.kt diff --git a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/delegates.kt b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/delegates.kt new file mode 100644 index 0000000..b315926 --- /dev/null +++ b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/delegates.kt @@ -0,0 +1,40 @@ +package hep.dataforge.control.controllers + +import hep.dataforge.control.base.DeviceProperty +import hep.dataforge.control.base.ReadOnlyDeviceProperty +import hep.dataforge.meta.MetaItem +import hep.dataforge.meta.transformations.MetaConverter +import hep.dataforge.values.Null +import kotlin.properties.ReadOnlyProperty +import kotlin.properties.ReadWriteProperty +import kotlin.reflect.KProperty + +operator fun ReadOnlyDeviceProperty.getValue(thisRef: Any?, property: KProperty<*>): MetaItem<*> = + value ?: MetaItem.ValueItem(Null) + +operator fun DeviceProperty.setValue(thisRef: Any?, property: KProperty<*>, value: MetaItem<*>) { + this.value = value +} + +fun ReadOnlyDeviceProperty.convert(metaConverter: MetaConverter): ReadOnlyProperty { + return object : ReadOnlyProperty { + override fun getValue(thisRef: Any?, property: KProperty<*>): T { + return this@convert.getValue(thisRef, property).let { metaConverter.itemToObject(it) } + } + } +} + +fun DeviceProperty.convert(metaConverter: MetaConverter): ReadWriteProperty { + return object : ReadWriteProperty { + override fun getValue(thisRef: Any?, property: KProperty<*>): T { + return this@convert.getValue(thisRef, property).let { metaConverter.itemToObject(it) } + } + + override fun setValue(thisRef: Any?, property: KProperty<*>, value: T) { + this@convert.setValue(thisRef, property, value.let { metaConverter.objectToMetaItem(it) }) + } + } +} + +fun ReadOnlyDeviceProperty.double() = convert(MetaConverter.double) +fun DeviceProperty.double() = convert(MetaConverter.double) diff --git a/dataforge-control-server/build.gradle.kts b/dataforge-device-server/build.gradle.kts similarity index 81% rename from dataforge-control-server/build.gradle.kts rename to dataforge-device-server/build.gradle.kts index 6e0da60..a04075f 100644 --- a/dataforge-control-server/build.gradle.kts +++ b/dataforge-device-server/build.gradle.kts @@ -1,10 +1,8 @@ -import scientifik.useCoroutines import scientifik.useSerialization plugins { id("scientifik.jvm") id("scientifik.publish") - application } useSerialization() @@ -13,7 +11,7 @@ val dataforgeVersion: String by rootProject.extra val ktorVersion: String by extra("1.3.2") dependencies{ - implementation(project(":dataforge-control-core")) + implementation(project(":dataforge-device-core")) implementation("io.ktor:ktor-server-cio:$ktorVersion") implementation("io.ktor:ktor-websockets:$ktorVersion") implementation("io.ktor:ktor-serialization:$ktorVersion") diff --git a/dataforge-device-server/src/main/kotlin/hep/dataforge/control/server/conversions.kt b/dataforge-device-server/src/main/kotlin/hep/dataforge/control/server/conversions.kt new file mode 100644 index 0000000..7bc25eb --- /dev/null +++ b/dataforge-device-server/src/main/kotlin/hep/dataforge/control/server/conversions.kt @@ -0,0 +1,43 @@ +package hep.dataforge.control.server + +import hep.dataforge.control.controllers.DeviceMessage +import hep.dataforge.io.* +import hep.dataforge.meta.MetaSerializer +import io.ktor.application.ApplicationCall +import io.ktor.http.ContentType +import io.ktor.http.cio.websocket.Frame +import io.ktor.response.respondText +import kotlinx.io.asBinary +import kotlinx.serialization.UnstableDefault +import kotlinx.serialization.json.Json +import kotlinx.serialization.json.JsonObjectBuilder +import kotlinx.serialization.json.json + +fun Frame.toEnvelope(): Envelope { + return data.asBinary().readWith(TaggedEnvelopeFormat) +} + +fun Envelope.toFrame(): Frame { + val data = buildByteArray { + writeWith(TaggedEnvelopeFormat,this@toFrame) + } + return Frame.Binary(false, data) +} + +suspend fun ApplicationCall.respondJson(builder: JsonObjectBuilder.() -> Unit) { + val json = json(builder) + respondText(json.toString(), contentType = ContentType.Application.Json) +} + +@OptIn(UnstableDefault::class) +suspend fun ApplicationCall.respondMessage(message: DeviceMessage) { + respondText(Json.stringify(MetaSerializer,message.toMeta()), contentType = ContentType.Application.Json) +} + +suspend fun ApplicationCall.respondMessage(builder: DeviceMessage.() -> Unit) { + respondMessage(DeviceMessage(builder)) +} + +suspend fun ApplicationCall.respondFail(builder: DeviceMessage.() -> Unit) { + respondMessage(DeviceMessage.fail(null, builder)) +} \ No newline at end of file diff --git a/dataforge-control-server/src/main/kotlin/hep/dataforge/control/server/deviceWebServer.kt b/dataforge-device-server/src/main/kotlin/hep/dataforge/control/server/deviceWebServer.kt similarity index 51% rename from dataforge-control-server/src/main/kotlin/hep/dataforge/control/server/deviceWebServer.kt rename to dataforge-device-server/src/main/kotlin/hep/dataforge/control/server/deviceWebServer.kt index 5aa58ea..9f31b24 100644 --- a/dataforge-control-server/src/main/kotlin/hep/dataforge/control/server/deviceWebServer.kt +++ b/dataforge-device-server/src/main/kotlin/hep/dataforge/control/server/deviceWebServer.kt @@ -5,21 +5,22 @@ package hep.dataforge.control.server import hep.dataforge.control.api.Device import hep.dataforge.control.controllers.DeviceMessage import hep.dataforge.control.controllers.MessageController +import hep.dataforge.control.controllers.MessageController.Companion.GET_PROPERTY_ACTION +import hep.dataforge.control.controllers.MessageController.Companion.SET_PROPERTY_ACTION import hep.dataforge.control.controllers.property import hep.dataforge.meta.toJson import hep.dataforge.meta.toMeta import hep.dataforge.meta.toMetaItem import hep.dataforge.meta.wrap import io.ktor.application.* +import io.ktor.features.CORS import io.ktor.features.ContentNegotiation import io.ktor.features.StatusPages import io.ktor.html.respondHtml -import io.ktor.http.ContentType import io.ktor.http.HttpStatusCode import io.ktor.request.receiveText import io.ktor.response.respond import io.ktor.response.respondRedirect -import io.ktor.response.respondText import io.ktor.routing.* import io.ktor.serialization.json import io.ktor.server.cio.CIO @@ -40,7 +41,9 @@ import kotlinx.html.h1 import kotlinx.html.head import kotlinx.html.title import kotlinx.serialization.UnstableDefault -import kotlinx.serialization.json.* +import kotlinx.serialization.json.Json +import kotlinx.serialization.json.JsonObject +import kotlinx.serialization.json.jsonArray /** * Create and start a web server for several devices @@ -48,15 +51,18 @@ import kotlinx.serialization.json.* fun CoroutineScope.startDeviceServer( devices: Map, port: Int = 8111, - host: String = "0.0.0.0" + host: String = "localhost" ): ApplicationEngine { val controllers = devices.mapValues { MessageController(it.value, it.key, this) } - return embeddedServer(CIO, port, host) { + return this.embeddedServer(CIO, port, host) { install(WebSockets) + install(CORS) { + anyHost() + } install(ContentNegotiation) { json() } @@ -65,23 +71,21 @@ fun CoroutineScope.startDeviceServer( call.respond(HttpStatusCode.BadRequest, cause.message ?: "") } } + deviceModule(controllers) routing { - routeDevices(controllers) get("/") { call.respondRedirect("/dashboard") } - } }.start() } -suspend fun ApplicationCall.respondJson(builder: JsonObjectBuilder.() -> Unit) { - val json = json(builder) - respondText(json.toString(), contentType = ContentType.Application.Json) +fun ApplicationEngine.whenStarted(callback: Application.() -> Unit){ + environment.monitor.subscribe(ApplicationStarted, callback) } -const val WEB_SERVER_TARGET = "@webServer" +const val WEB_SERVER_TARGET = "@webServer" private suspend fun ApplicationCall.message(target: MessageController) { val body = receiveText() @@ -98,8 +102,8 @@ private suspend fun ApplicationCall.message(target: MessageController) { private suspend fun ApplicationCall.getProperty(target: MessageController) { val property: String by parameters val request = DeviceMessage { - action = Device.GET_PROPERTY_ACTION - source = WEB_SERVER_TARGET + action = GET_PROPERTY_ACTION + origin = WEB_SERVER_TARGET this.target = target.deviceTarget property { name = property @@ -116,8 +120,8 @@ private suspend fun ApplicationCall.setProperty(target: MessageController) { val json = Json.parseJson(body) val request = DeviceMessage { - action = Device.SET_PROPERTY_ACTION - source = WEB_SERVER_TARGET + action = SET_PROPERTY_ACTION + origin = WEB_SERVER_TARGET this.target = target.deviceTarget property { name = property @@ -129,88 +133,99 @@ private suspend fun ApplicationCall.setProperty(target: MessageController) { respondMessage(response) } -fun Routing.routeDevices(targets: Map) { - this.application.feature(WebSockets) - +@OptIn(KtorExperimentalAPI::class) +fun Application.deviceModule(targets: Map, route: String = "/") { + if(featureOrNull(WebSockets) == null) { + install(WebSockets) + } + if(featureOrNull(CORS)==null){ + install(CORS) { + anyHost() + } + } fun generateFlow(target: String?) = if (target == null) { targets.values.asFlow().flatMapMerge { it.output() } } else { targets[target]?.output() ?: error("The device with target $target not found") } - - get("dashboard") { - call.respondHtml { - head { - title("Device server dashboard") - } - body { - h1 { - +"Under construction" - } - } - } - } - - get("list") { - call.respondJson { - targets.values.forEach { controller -> - "target" to controller.deviceTarget - val device = controller.device - "properties" to jsonArray { - device.propertyDescriptors.forEach { descriptor -> - +descriptor.config.toJson() + routing { + route(route) { + get("dashboard") { + call.respondHtml { + head { + title("Device server dashboard") } - } - "actions" to jsonArray { - device.actionDescriptors.forEach { actionDescriptor -> - +actionDescriptor.config.toJson() + body { + h1 { + +"Under construction" + } } } } - } - } - //Check if application supports websockets and if it does add a push channel - if (this.application.featureOrNull(WebSockets) != null) { - webSocket("ws") { - //subscribe on device - val target: String? by call.request.queryParameters - try { - application.log.debug("Opened server socket for ${call.request.queryParameters}") - - generateFlow(target).collect { - outgoing.send(it.toFrame()) + get("list") { + call.respondJson { + targets.values.forEach { controller -> + "target" to controller.deviceTarget + val device = controller.device + "properties" to jsonArray { + device.propertyDescriptors.forEach { descriptor -> + +descriptor.config.toJson() + } + } + "actions" to jsonArray { + device.actionDescriptors.forEach { actionDescriptor -> + +actionDescriptor.config.toJson() + } + } + } } - - } catch (ex: Exception) { - application.log.debug("Closed server socket for ${call.request.queryParameters}") } - } - } + //Check if application supports websockets and if it does add a push channel + if (this.application.featureOrNull(WebSockets) != null) { + webSocket("ws") { + //subscribe on device + val target: String? by call.request.queryParameters - post("message") { - val target: String by call.request.queryParameters - val controller = targets[target] ?: throw IllegalArgumentException("Target $target not found in $targets") - call.message(controller) - } + try { + application.log.debug("Opened server socket for ${call.request.queryParameters}") - route("{target}") { - //global route for the device + generateFlow(target).collect { + outgoing.send(it.toFrame()) + } - route("{property}") { - get("get") { - val target: String by call.parameters - val controller = targets[target] - ?: throw IllegalArgumentException("Target $target not found in $targets") - - call.getProperty(controller) + } catch (ex: Exception) { + application.log.debug("Closed server socket for ${call.request.queryParameters}") + } + } } - post("set") { - val target: String by call.parameters + + post("message") { + val target: String by call.request.queryParameters val controller = targets[target] ?: throw IllegalArgumentException("Target $target not found in $targets") + call.message(controller) + } - call.setProperty(controller) + route("{target}") { + //global route for the device + + route("{property}") { + get("get") { + val target: String by call.parameters + val controller = targets[target] + ?: throw IllegalArgumentException("Target $target not found in $targets") + + call.getProperty(controller) + } + post("set") { + val target: String by call.parameters + val controller = + targets[target] ?: throw IllegalArgumentException("Target $target not found in $targets") + + call.setProperty(controller) + } + } } } } diff --git a/dataforge-control-server/src/main/kotlin/hep/dataforge/control/server/sse.kt b/dataforge-device-server/src/main/kotlin/hep/dataforge/control/server/sse.kt similarity index 100% rename from dataforge-control-server/src/main/kotlin/hep/dataforge/control/server/sse.kt rename to dataforge-device-server/src/main/kotlin/hep/dataforge/control/server/sse.kt diff --git a/demo/build.gradle.kts b/demo/build.gradle.kts index 1cea9ce..700a287 100644 --- a/demo/build.gradle.kts +++ b/demo/build.gradle.kts @@ -1,7 +1,7 @@ plugins { kotlin("jvm") version "1.3.72" id("org.openjfx.javafxplugin") version "0.0.8" - `application` + application } val plotlyVersion: String by rootProject.extra @@ -16,8 +16,8 @@ repositories{ } dependencies{ - implementation(project(":dataforge-control-core")) - implementation(project(":dataforge-control-server")) + implementation(project(":dataforge-device-core")) + implementation(project(":dataforge-device-server")) implementation("no.tornado:tornadofx:1.7.20") implementation(kotlin("stdlib-jdk8")) implementation("scientifik:plotlykt-server:$plotlyVersion") diff --git a/demo/src/main/kotlin/hep/dataforge/control/demo/DemoControllerView.kt b/demo/src/main/kotlin/hep/dataforge/control/demo/DemoControllerView.kt index cf22fa4..ba3b733 100644 --- a/demo/src/main/kotlin/hep/dataforge/control/demo/DemoControllerView.kt +++ b/demo/src/main/kotlin/hep/dataforge/control/demo/DemoControllerView.kt @@ -1,12 +1,12 @@ package hep.dataforge.control.demo +import io.ktor.server.engine.ApplicationEngine import javafx.scene.Parent import javafx.scene.control.Slider import javafx.scene.layout.Priority import javafx.stage.Stage import kotlinx.coroutines.* import org.slf4j.LoggerFactory -import scientifik.plotly.server.PlotlyServer import tornadofx.* import java.awt.Desktop import java.net.URI @@ -17,19 +17,19 @@ val logger = LoggerFactory.getLogger("Demo") class DemoController : Controller(), CoroutineScope { var device: DemoDevice? = null - var server: PlotlyServer? = null + var server: ApplicationEngine? = null override val coroutineContext: CoroutineContext = GlobalScope.newCoroutineContext(Dispatchers.Default) + Job() fun init() { launch { device = DemoDevice(this) - server = device?.let { servePlots(it) } + server = device?.let { this.startDemoDeviceServer(it) } } } fun shutdown() { logger.info("Shutting down...") - server?.stop() + server?.stop(1000, 5000) logger.info("Visualization server stopped") device?.close() logger.info("Device server stopped") @@ -89,7 +89,9 @@ class DemoControllerView : View(title = " Demo controller remote") { useMaxWidth = true action { controller.server?.run { - val uri = URI("http", null, host, port, null, null, null) + val host = "localhost"//environment.connectors.first().host + val port = environment.connectors.first().port + val uri = URI("http", null, host, port, "/plots", null, null) Desktop.getDesktop().browse(uri) } } diff --git a/demo/src/main/kotlin/hep/dataforge/control/demo/demoDeviceDisplay.kt b/demo/src/main/kotlin/hep/dataforge/control/demo/demoDeviceDisplay.kt deleted file mode 100644 index 668fa4b..0000000 --- a/demo/src/main/kotlin/hep/dataforge/control/demo/demoDeviceDisplay.kt +++ /dev/null @@ -1,94 +0,0 @@ -package hep.dataforge.control.demo - -import hep.dataforge.meta.double -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.flow.* -import kotlinx.coroutines.launch -import scientifik.plotly.Plotly -import scientifik.plotly.layout -import scientifik.plotly.models.Trace -import scientifik.plotly.server.PlotlyServer -import scientifik.plotly.server.pushUpdates -import scientifik.plotly.server.serve -import scientifik.plotly.trace -import java.util.concurrent.ConcurrentLinkedQueue - -/** - * In-place replacement for absent method from stdlib - */ -fun Flow.windowed(size: Int): Flow> { - val queue = ConcurrentLinkedQueue() - return flow { - this@windowed.collect { - queue.add(it) - if (queue.size >= size) { - queue.poll() - } - emit(queue) - } - } -} - -suspend fun Trace.updateFrom(axisName: String, flow: Flow>) { - flow.collect { - axis(axisName).numbers = it - } -} - -suspend fun Trace.updateXYFrom(flow: Flow>>) { - flow.collect { pairs -> - x.numbers = pairs.map { it.first } - y.numbers = pairs.map { it.second } - } -} - -fun CoroutineScope.servePlots(device: DemoDevice): PlotlyServer { - val sinFlow = device.sin.flow() - val cosFlow = device.cos.flow() - val sinCosFlow = sinFlow.zip(cosFlow) { sin, cos -> - sin.double!! to cos.double!! - } - - return Plotly.serve(this) { - plot(rowNumber = 0, colOrderNumber = 0, size = 6) { - layout { - title = "sin property" - xaxis.title = "point index" - yaxis.title = "sin" - } - trace { - this@servePlots.launch { - val flow: Flow> = sinFlow.mapNotNull { it.double }.windowed(100) - updateFrom(Trace.Y_AXIS, flow) - } - } - } - plot(rowNumber = 0, colOrderNumber = 1, size = 6) { - layout { - title = "cos property" - xaxis.title = "point index" - yaxis.title = "cos" - } - trace { - this@servePlots.launch { - val flow: Flow> = cosFlow.mapNotNull { it.double }.windowed(100) - updateFrom(Trace.Y_AXIS, flow) - } - } - } - plot(rowNumber = 1, colOrderNumber = 0, size = 12) { - layout { - title = "cos vs sin" - xaxis.title = "sin" - yaxis.title = "cos" - } - trace { - name = "non-synchronized" - this@servePlots.launch { - val flow: Flow>> = sinCosFlow.windowed(30) - updateXYFrom(flow) - } - } - } - }.pushUpdates() -} diff --git a/demo/src/main/kotlin/hep/dataforge/control/demo/demoDeviceServer.kt b/demo/src/main/kotlin/hep/dataforge/control/demo/demoDeviceServer.kt new file mode 100644 index 0000000..9233644 --- /dev/null +++ b/demo/src/main/kotlin/hep/dataforge/control/demo/demoDeviceServer.kt @@ -0,0 +1,126 @@ +package hep.dataforge.control.demo + +import hep.dataforge.control.server.startDeviceServer +import hep.dataforge.control.server.whenStarted +import hep.dataforge.meta.double +import io.ktor.application.uninstall +import io.ktor.server.engine.ApplicationEngine +import io.ktor.websocket.WebSockets +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.flow.* +import kotlinx.coroutines.launch +import kotlinx.html.div +import kotlinx.html.link +import scientifik.plotly.layout +import scientifik.plotly.models.Trace +import scientifik.plotly.plot +import scientifik.plotly.server.PlotlyServerConfig +import scientifik.plotly.server.PlotlyUpdateMode +import scientifik.plotly.server.plotlyModule +import scientifik.plotly.trace +import java.util.concurrent.ConcurrentLinkedQueue + +/** + * In-place replacement for absent method from stdlib + */ +fun Flow.windowed(size: Int): Flow> { + val queue = ConcurrentLinkedQueue() + return flow { + this@windowed.collect { + queue.add(it) + if (queue.size >= size) { + queue.poll() + } + emit(queue) + } + } +} + +suspend fun Trace.updateFrom(axisName: String, flow: Flow>) { + flow.collect { + axis(axisName).numbers = it + } +} + +suspend fun Trace.updateXYFrom(flow: Flow>>) { + flow.collect { pairs -> + x.numbers = pairs.map { it.first } + y.numbers = pairs.map { it.second } + } +} + + +fun CoroutineScope.startDemoDeviceServer(device: DemoDevice): ApplicationEngine { + val server = startDeviceServer(mapOf("demo" to device)) + server.whenStarted { + uninstall(WebSockets) + plotlyModule( + "plots", + PlotlyServerConfig { updateMode = PlotlyUpdateMode.PUSH; updateInterval = 50 } + ) { container -> + val sinFlow = device.sin.flow() + val cosFlow = device.cos.flow() + val sinCosFlow = sinFlow.zip(cosFlow) { sin, cos -> + sin.double!! to cos.double!! + } + link { + rel = "stylesheet" + href = "https://stackpath.bootstrapcdn.com/bootstrap/4.5.0/css/bootstrap.min.css" + attributes["integrity"] = "sha384-9aIt2nRpC12Uk9gS9baDl411NQApFmC26EwAOH8WgZl5MYYxFfc+NcPb1dKGj7Sk" + attributes["crossorigin"] = "anonymous" + } + div("row") { + div("col-6") { + plot(container = container) { + layout { + title = "sin property" + xaxis.title = "point index" + yaxis.title = "sin" + } + trace { + launch { + val flow: Flow> = sinFlow.mapNotNull { it.double }.windowed(100) + updateFrom(Trace.Y_AXIS, flow) + } + } + } + } + div("col-6") { + plot(container = container) { + layout { + title = "cos property" + xaxis.title = "point index" + yaxis.title = "cos" + } + trace { + launch { + val flow: Flow> = cosFlow.mapNotNull { it.double }.windowed(100) + updateFrom(Trace.Y_AXIS, flow) + } + } + } + } + } + div("row") { + div("col-12") { + plot(container = container) { + layout { + title = "cos vs sin" + xaxis.title = "sin" + yaxis.title = "cos" + } + trace { + name = "non-synchronized" + launch { + val flow: Flow>> = sinCosFlow.windowed(30) + updateXYFrom(flow) + } + } + } + } + } + } + } + return server +} + diff --git a/settings.gradle.kts b/settings.gradle.kts index a7516a4..2cc6c04 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -33,11 +33,12 @@ pluginManagement { } } -rootProject.name = "dataforge-control" +rootProject.name = "dataforge-device" include( - ":dataforge-control-core", - ":dataforge-control-server", + ":dataforge-device-core", + ":dataforge-device-server", + ":dataforge-device-client", ":demo" )