From 9c5b6db9d1616c2c7822b3dfd7581b221bc5b35b Mon Sep 17 00:00:00 2001 From: Alexander Nozik Date: Wed, 29 Jul 2020 22:35:21 +0300 Subject: [PATCH] Device plugin refactoring and waltz client. --- dataforge-device-client/build.gradle.kts | 19 ++- .../dataforge/control/client/MagixClient.kt | 84 +++++++++++ .../dataforge/control/client/waltzClient.kt | 72 ---------- .../hep/dataforge/control/api/Device.kt | 21 ++- .../hep/dataforge/control/api/DeviceHub.kt | 55 +++---- ...ssageController.kt => DeviceController.kt} | 16 +-- .../control/controllers/DeviceManager.kt | 17 ++- .../control/controllers/DeviceMessage.kt | 18 ++- .../control/controllers/HubController.kt | 103 ++++++++++++++ .../control/server/deviceWebServer.kt | 134 ++++++++---------- demo/build.gradle.kts | 1 + .../control/demo/demoDeviceServer.kt | 3 +- 12 files changed, 332 insertions(+), 211 deletions(-) create mode 100644 dataforge-device-client/src/commonMain/kotlin/hep/dataforge/control/client/MagixClient.kt delete mode 100644 dataforge-device-client/src/jvmMain/kotlin/hep/dataforge/control/client/waltzClient.kt rename dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/{MessageController.kt => DeviceController.kt} (88%) create mode 100644 dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/HubController.kt diff --git a/dataforge-device-client/build.gradle.kts b/dataforge-device-client/build.gradle.kts index 0453db3..65e7b97 100644 --- a/dataforge-device-client/build.gradle.kts +++ b/dataforge-device-client/build.gradle.kts @@ -7,17 +7,32 @@ val ktorVersion: String by extra("1.3.2") kotlin { + js { + browser { + dceTask { + keep("ktor-ktor-io.\$\$importsForInline\$\$.ktor-ktor-io.io.ktor.utils.io") + } + } + } + sourceSets { - commonMain{ + commonMain { dependencies { implementation(project(":dataforge-device-core")) implementation("io.ktor:ktor-client-core:$ktorVersion") } } - jvmMain{ + jvmMain { dependencies { implementation("io.ktor:ktor-client-cio:$ktorVersion") } } + jsMain { + dependencies { + implementation("io.ktor:ktor-client-js:$ktorVersion") + implementation(npm("text-encoding", "0.7.0")) + implementation(npm("abort-controller", "3.0.0")) + } + } } } \ No newline at end of file diff --git a/dataforge-device-client/src/commonMain/kotlin/hep/dataforge/control/client/MagixClient.kt b/dataforge-device-client/src/commonMain/kotlin/hep/dataforge/control/client/MagixClient.kt new file mode 100644 index 0000000..fc28e60 --- /dev/null +++ b/dataforge-device-client/src/commonMain/kotlin/hep/dataforge/control/client/MagixClient.kt @@ -0,0 +1,84 @@ +package hep.dataforge.control.client + +import hep.dataforge.control.api.respondMessage +import hep.dataforge.control.controllers.DeviceManager +import hep.dataforge.control.controllers.DeviceMessage +import hep.dataforge.meta.toJson +import io.ktor.client.HttpClient +import io.ktor.client.request.post +import io.ktor.http.ContentType +import io.ktor.http.Url +import io.ktor.http.contentType +import kotlinx.coroutines.Job +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.launch +import kotlinx.serialization.json.JsonObject +import kotlinx.serialization.json.json + +/* +{ + "id":"string|number[optional, but desired]", + "parentId": "string|number[optional]", + "target":"string[optional]", + "origin":"string[required]", + "user":"string[optional]", + "action":"string[optional, default='heartbeat']", + "payload":"object[optional]" +} + */ + +/** + * A stateful unique id generator + */ +interface IdGenerator{ + operator fun invoke(message: DeviceMessage): String +} + +object MagixClient { + /** + * Convert a [DeviceMessage] to [Waltz format](https://github.com/waltz-controls/rfc/tree/master/1) + */ + fun DeviceMessage.toWaltz(id: String, parentId: String? = null): JsonObject = json { + "id" to id + if (parentId != null) { + "parentId" to parentId + } + "target" to "magix" + "origin" to "df" + "payload" to config.toJson() + } + + fun buildCallback(url: Url, idGenerator: IdGenerator): suspend (DeviceMessage) -> Unit { + val client = HttpClient() + return { message -> + client.post(url) { + val messageId = idGenerator(message) + val waltzMessage = message.toWaltz(messageId) + this.contentType(ContentType.Application.Json) + body = waltzMessage.toString() + } + } + } + +} + +/** + * Event loop for magix input and output flows + */ +fun DeviceManager.startMagix( + inbox: Flow, // Inbox flow like SSE + outbox: suspend (DeviceMessage) -> Unit // outbox callback +): Job = context.launch { + launch { + controller.messageOutput().collect { message -> + outbox.invoke(message) + } + } + launch { + inbox.collect { message -> + val response = respondMessage(message) + outbox.invoke(response) + } + } +} diff --git a/dataforge-device-client/src/jvmMain/kotlin/hep/dataforge/control/client/waltzClient.kt b/dataforge-device-client/src/jvmMain/kotlin/hep/dataforge/control/client/waltzClient.kt deleted file mode 100644 index 0d49bb3..0000000 --- a/dataforge-device-client/src/jvmMain/kotlin/hep/dataforge/control/client/waltzClient.kt +++ /dev/null @@ -1,72 +0,0 @@ -package hep.dataforge.control.client - -import hep.dataforge.control.api.getDevice -import hep.dataforge.control.controllers.DeviceManager -import hep.dataforge.control.controllers.DeviceMessage -import hep.dataforge.control.controllers.MessageController -import hep.dataforge.meta.Meta -import hep.dataforge.meta.toJson -import hep.dataforge.meta.toMeta -import hep.dataforge.meta.wrap -import io.ktor.client.HttpClient -import io.ktor.client.request.post -import io.ktor.http.ContentType -import io.ktor.http.Url -import io.ktor.http.contentType -import kotlinx.coroutines.Job -import kotlinx.coroutines.flow.* -import kotlinx.coroutines.launch -import kotlinx.serialization.json.JsonObject -import kotlinx.serialization.json.json - -/* -{ - "id":"string|number[optional, but desired]", - "parentId": "string|number[optional]", - "target":"string[optional]", - "origin":"string[required]", - "user":"string[optional]", - "action":"string[optional, default='heartbeat']", - "payload":"object[optional]" -} - */ - -/** - * Convert a [DeviceMessage] to [Waltz format](https://github.com/waltz-controls/rfc/tree/master/1) - */ -fun DeviceMessage.toWaltz(id: String, parentId: String): JsonObject = json { - "id" to id - "parentId" to parentId - "target" to "magix" - "origin" to "df" - "payload" to config.toJson() -} - -fun DeviceMessage.fromWaltz(json: JsonObject): DeviceMessage = - DeviceMessage.wrap(json["payload"]?.jsonObject?.toMeta() ?: Meta.EMPTY) - -fun DeviceManager.startWaltzClient( - waltzUrl: Url, - deviceNames: Collection = devices.keys.map { it.toString() } -): Job { - - val controllers = deviceNames.map { name -> - val device = getDevice(name) - MessageController(device, name, context) - } - - val client = HttpClient() - - val outputFlow = controllers.asFlow().flatMapMerge { - it.output() - }.filter { it.data == null }.map { DeviceMessage.wrap(it.meta) } - - return context.launch { - outputFlow.collect { message -> - client.post(waltzUrl){ - this.contentType(ContentType.Application.Json) - body = message.config.toJson().toString() - } - } - } -} \ No newline at end of file diff --git a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/api/Device.kt b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/api/Device.kt index 45c715c..7648704 100644 --- a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/api/Device.kt +++ b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/api/Device.kt @@ -1,8 +1,12 @@ package hep.dataforge.control.api +import hep.dataforge.control.api.Device.Companion.ACTION_LIST_ACTION import hep.dataforge.control.api.Device.Companion.DEVICE_TARGET +import hep.dataforge.control.api.Device.Companion.EXECUTE_ACTION +import hep.dataforge.control.api.Device.Companion.GET_PROPERTY_ACTION +import hep.dataforge.control.api.Device.Companion.PROPERTY_LIST_ACTION +import hep.dataforge.control.api.Device.Companion.SET_PROPERTY_ACTION import hep.dataforge.control.controllers.DeviceMessage -import hep.dataforge.control.controllers.MessageController import hep.dataforge.control.controllers.MessageData import hep.dataforge.io.Envelope import hep.dataforge.io.Responder @@ -84,6 +88,11 @@ interface Device: Closeable, Responder { companion object{ const val DEVICE_TARGET = "device" + const val GET_PROPERTY_ACTION = "read" + const val SET_PROPERTY_ACTION = "write" + const val EXECUTE_ACTION = "execute" + const val PROPERTY_LIST_ACTION = "propertyList" + const val ACTION_LIST_ACTION = "actionList" } } @@ -91,7 +100,7 @@ suspend fun Device.respondMessage( request: DeviceMessage ): DeviceMessage { val result: List = when (val action = request.type) { - MessageController.GET_PROPERTY_ACTION -> { + GET_PROPERTY_ACTION -> { request.data.map { property -> MessageData { name = property.name @@ -99,7 +108,7 @@ suspend fun Device.respondMessage( } } } - MessageController.SET_PROPERTY_ACTION -> { + SET_PROPERTY_ACTION -> { request.data.map { property -> val propertyName: String = property.name val propertyValue = property.value @@ -114,7 +123,7 @@ suspend fun Device.respondMessage( } } } - MessageController.EXECUTE_ACTION -> { + EXECUTE_ACTION -> { request.data.map { payload -> MessageData { name = payload.name @@ -122,7 +131,7 @@ suspend fun Device.respondMessage( } } } - MessageController.PROPERTY_LIST_ACTION -> { + PROPERTY_LIST_ACTION -> { propertyDescriptors.map { descriptor -> MessageData { name = descriptor.name @@ -131,7 +140,7 @@ suspend fun Device.respondMessage( } } - MessageController.ACTION_LIST_ACTION -> { + ACTION_LIST_ACTION -> { actionDescriptors.map { descriptor -> MessageData { name = descriptor.name diff --git a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/api/DeviceHub.kt b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/api/DeviceHub.kt index d2c7862..7704d9e 100644 --- a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/api/DeviceHub.kt +++ b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/api/DeviceHub.kt @@ -1,9 +1,11 @@ package hep.dataforge.control.api +import hep.dataforge.control.controllers.DeviceMessage +import hep.dataforge.io.Envelope import hep.dataforge.meta.MetaItem +import hep.dataforge.meta.get +import hep.dataforge.meta.string import hep.dataforge.names.Name -import hep.dataforge.names.NameToken -import hep.dataforge.names.asName import hep.dataforge.names.toName import hep.dataforge.provider.Provider @@ -11,13 +13,15 @@ import hep.dataforge.provider.Provider * A hub that could locate multiple devices and redirect actions to them */ interface DeviceHub : Provider { - val devices: Map + val devices: Map override val defaultTarget: String get() = Device.DEVICE_TARGET + override val defaultChainTarget: String get() = Device.DEVICE_TARGET + override fun provideTop(target: String): Map { if (target == Device.DEVICE_TARGET) { - return devices.mapKeys { it.key.asName() } + return devices } else { throw IllegalArgumentException("Target $target is not supported for $this") } @@ -28,31 +32,30 @@ interface DeviceHub : Provider { } } -/** - * Resolve the device by its full name if it is present. Hubs are resolved recursively. - */ -fun DeviceHub.getDevice(name: Name): Device = when (name.length) { - 0 -> (this as? Device) ?: error("The DeviceHub is resolved by name but it is not a Device") - 1 -> { - val token = name.first()!! - devices[token] ?: error("Device with name $token not found in the hub $this") - } - else -> { - val hub = getDevice(name.cutLast()) as? DeviceHub - ?: error("The device with name ${name.cutLast()} does not exist or is not a hub") - hub.getDevice(name.last()!!.asName()) - } +operator fun DeviceHub.get(deviceName: Name) = + devices[deviceName] ?: error("Device with name $deviceName not found in $this") + +operator fun DeviceHub.get(deviceName: String) = get(deviceName.toName()) + +suspend fun DeviceHub.getProperty(deviceName: Name, propertyName: String): MetaItem<*> = + this[deviceName].getProperty(propertyName) + +suspend fun DeviceHub.setProperty(deviceName: Name, propertyName: String, value: MetaItem<*>) { + this[deviceName].setProperty(propertyName, value) } +suspend fun DeviceHub.exec(deviceName: Name, command: String, argument: MetaItem<*>?): MetaItem<*>? = + this[deviceName].exec(command, argument) -fun DeviceHub.getDevice(deviceName: String) = getDevice(deviceName.toName()) +suspend fun DeviceHub.respondMessage(request: DeviceMessage): DeviceMessage { + val device = this[request.target?.toName() ?: Name.EMPTY] -suspend fun DeviceHub.getProperty(deviceName: String, propertyName: String): MetaItem<*> = - getDevice(deviceName).getProperty(propertyName) - -suspend fun DeviceHub.setProperty(deviceName: String, propertyName: String, value: MetaItem<*>) { - getDevice(deviceName).setProperty(propertyName, value) + return device.respondMessage(request) } -suspend fun DeviceHub.exec(deviceName: String, command: String, argument: MetaItem<*>?): MetaItem<*>? = - getDevice(deviceName).exec(command, argument) \ No newline at end of file +suspend fun DeviceHub.respond(request: Envelope): Envelope { + val target = request.meta[DeviceMessage.TARGET_KEY].string + val device = this[target?.toName() ?: Name.EMPTY] + + return device.respond(request) +} \ No newline at end of file diff --git a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/MessageController.kt b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/DeviceController.kt similarity index 88% rename from dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/MessageController.kt rename to dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/DeviceController.kt index 4ea11cc..dfe16c2 100644 --- a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/MessageController.kt +++ b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/DeviceController.kt @@ -4,6 +4,7 @@ import hep.dataforge.control.api.Device import hep.dataforge.control.api.DeviceListener import hep.dataforge.control.api.respondMessage import hep.dataforge.control.controllers.DeviceMessage.Companion.PROPERTY_CHANGED_ACTION +import hep.dataforge.io.Consumer import hep.dataforge.io.Envelope import hep.dataforge.io.Responder import hep.dataforge.io.SimpleEnvelope @@ -14,14 +15,7 @@ import kotlinx.coroutines.flow.consumeAsFlow import kotlinx.coroutines.launch import kotlinx.io.Binary -/** - * A consumer of envelopes - */ -interface Consumer { - fun consume(message: Envelope): Unit -} - -class MessageController( +class DeviceController( val device: Device, val deviceTarget: String, val scope: CoroutineScope = device.scope @@ -95,10 +89,6 @@ class MessageController( companion object { - const val GET_PROPERTY_ACTION = "read" - const val SET_PROPERTY_ACTION = "write" - const val EXECUTE_ACTION = "execute" - const val PROPERTY_LIST_ACTION = "propertyList" - const val ACTION_LIST_ACTION = "actionList" + } } \ No newline at end of file diff --git a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/DeviceManager.kt b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/DeviceManager.kt index 3527689..d0186d9 100644 --- a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/DeviceManager.kt +++ b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/DeviceManager.kt @@ -8,7 +8,6 @@ import hep.dataforge.control.api.Device import hep.dataforge.control.api.DeviceHub import hep.dataforge.meta.Meta import hep.dataforge.names.Name -import hep.dataforge.names.NameToken import kotlin.reflect.KClass class DeviceManager : AbstractPlugin(), DeviceHub { @@ -17,12 +16,15 @@ class DeviceManager : AbstractPlugin(), DeviceHub { /** * Actual list of connected devices */ - private val top = HashMap() - override val devices: Map get() = top + private val top = HashMap() + override val devices: Map get() = top - fun registerDevice(name: String, device: Device, index: String? = null) { - val token = NameToken(name, index) - top[token] = device + val controller by lazy { + HubController(this, context) + } + + fun registerDevice(name: Name, device: Device) { + top[name] = device } override fun provideTop(target: String): Map = super.provideTop(target) @@ -36,4 +38,5 @@ class DeviceManager : AbstractPlugin(), DeviceHub { } -val Context.devices: DeviceManager get() = plugins.fetch(DeviceManager) \ No newline at end of file +val Context.devices: DeviceManager get() = plugins.fetch(DeviceManager) + diff --git a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/DeviceMessage.kt b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/DeviceMessage.kt index d7526e6..e08fdd1 100644 --- a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/DeviceMessage.kt +++ b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/DeviceMessage.kt @@ -1,6 +1,6 @@ package hep.dataforge.control.controllers -import hep.dataforge.control.controllers.DeviceMessage.Companion.DATA_VALUE_KEY +import hep.dataforge.control.api.Device.Companion.GET_PROPERTY_ACTION import hep.dataforge.io.SimpleEnvelope import hep.dataforge.meta.* import hep.dataforge.names.asName @@ -10,9 +10,9 @@ import kotlinx.serialization.KSerializer import kotlinx.serialization.SerialDescriptor class DeviceMessage : Scheme() { - var source by string() - var target by string() - var type by string(default = MessageController.GET_PROPERTY_ACTION, key = MESSAGE_TYPE_KEY) + var source by string(key = SOURCE_KEY) + var target by string(key = TARGET_KEY) + var type by string(default = GET_PROPERTY_ACTION, key = MESSAGE_TYPE_KEY) var comment by string() var status by string(RESPONSE_OK_STATUS) var data: List @@ -28,9 +28,11 @@ class DeviceMessage : Scheme() { spec.invoke(block).also { config.append(MESSAGE_DATA_KEY, it) } companion object : SchemeSpec(::DeviceMessage), KSerializer { - val MESSAGE_TYPE_KEY = "action".asName() + val SOURCE_KEY = "source".asName() + val TARGET_KEY = "target".asName() + val MESSAGE_TYPE_KEY = "type".asName() val MESSAGE_DATA_KEY = "data".asName() - val DATA_VALUE_KEY = "value".asName() + const val RESPONSE_OK_STATUS = "response.OK" const val RESPONSE_FAIL_STATUS = "response.FAIL" const val PROPERTY_CHANGED_ACTION = "event.propertyChange" @@ -67,7 +69,9 @@ class MessageData : Scheme() { var name by string { error("Property name could not be empty") } var value by item(key = DATA_VALUE_KEY) - companion object : SchemeSpec(::MessageData) + companion object : SchemeSpec(::MessageData) { + val DATA_VALUE_KEY = "value".asName() + } } @DFBuilder diff --git a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/HubController.kt b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/HubController.kt new file mode 100644 index 0000000..f44f2ea --- /dev/null +++ b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/HubController.kt @@ -0,0 +1,103 @@ +package hep.dataforge.control.controllers + +import hep.dataforge.control.api.DeviceHub +import hep.dataforge.control.api.DeviceListener +import hep.dataforge.control.api.get +import hep.dataforge.control.api.respondMessage +import hep.dataforge.io.Consumer +import hep.dataforge.io.Envelope +import hep.dataforge.io.Responder +import hep.dataforge.io.SimpleEnvelope +import hep.dataforge.meta.* +import hep.dataforge.names.Name +import hep.dataforge.names.toName +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.flow.consumeAsFlow +import kotlinx.coroutines.isActive +import kotlinx.coroutines.launch + +class HubController( + val hub: DeviceHub, + val scope: CoroutineScope +) : Consumer, Responder { + + private val messageOutbox = Channel(Channel.CONFLATED) + + private val envelopeOutbox = Channel(Channel.CONFLATED) + + fun messageOutput() = messageOutbox.consumeAsFlow() + + fun envelopeOutput() = envelopeOutbox.consumeAsFlow() + + private val packJob = scope.launch { + while (isActive) { + val message = messageOutbox.receive() + envelopeOutbox.send(message.wrap()) + } + } + + private val listeners: Map = hub.devices.mapValues { (name, device) -> + object : DeviceListener { + override fun propertyChanged(propertyName: String, value: MetaItem<*>?) { + if (value == null) return + scope.launch { + val change = DeviceMessage.ok { + source = name.toString() + type = DeviceMessage.PROPERTY_CHANGED_ACTION + data { + this.name = propertyName + this.value = value + } + } + + messageOutbox.send(change) + } + } + }.also { + device.registerListener(it) + } + } + + suspend fun respondMessage(message: DeviceMessage): DeviceMessage { + return try { + val targetName = message.target?.toName() ?: Name.EMPTY + val device = hub[targetName] + device.respondMessage(message).apply { + target = message.source + source = targetName.toString() + } + } catch (ex: Exception) { + DeviceMessage.fail { + comment = ex.message + } + } + } + + override suspend fun respond(request: Envelope): Envelope { + val targetName = request.meta[DeviceMessage.TARGET_KEY].string?.toName() ?: Name.EMPTY + return try { + val device = hub[targetName] + if (request.data == null) { + respondMessage(DeviceMessage.wrap(request.meta)).wrap() + } else { + val response = device.respond(request) + return SimpleEnvelope(response.meta.edit { + DeviceMessage.TARGET_KEY put request.meta[DeviceMessage.SOURCE_KEY].string + DeviceMessage.SOURCE_KEY put targetName.toString() + }, response.data) + } + } catch (ex: Exception) { + DeviceMessage.fail { + comment = ex.message + }.wrap() + } + } + + override fun consume(message: Envelope) { + // Fire the respond procedure and forget about the result + scope.launch { + respond(message) + } + } +} \ No newline at end of file diff --git a/dataforge-device-server/src/main/kotlin/hep/dataforge/control/server/deviceWebServer.kt b/dataforge-device-server/src/main/kotlin/hep/dataforge/control/server/deviceWebServer.kt index a9a4a33..fcd831b 100644 --- a/dataforge-device-server/src/main/kotlin/hep/dataforge/control/server/deviceWebServer.kt +++ b/dataforge-device-server/src/main/kotlin/hep/dataforge/control/server/deviceWebServer.kt @@ -2,12 +2,12 @@ package hep.dataforge.control.server -import hep.dataforge.control.api.getDevice +import hep.dataforge.control.api.Device.Companion.GET_PROPERTY_ACTION +import hep.dataforge.control.api.Device.Companion.SET_PROPERTY_ACTION +import hep.dataforge.control.api.get +import hep.dataforge.control.api.respondMessage import hep.dataforge.control.controllers.DeviceManager import hep.dataforge.control.controllers.DeviceMessage -import hep.dataforge.control.controllers.MessageController -import hep.dataforge.control.controllers.MessageController.Companion.GET_PROPERTY_ACTION -import hep.dataforge.control.controllers.MessageController.Companion.SET_PROPERTY_ACTION import hep.dataforge.control.controllers.data import hep.dataforge.meta.toJson import hep.dataforge.meta.toMeta @@ -32,9 +32,7 @@ import io.ktor.websocket.webSocket import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.FlowPreview -import kotlinx.coroutines.flow.asFlow import kotlinx.coroutines.flow.collect -import kotlinx.coroutines.flow.flatMapMerge import kotlinx.html.* import kotlinx.serialization.UnstableDefault import kotlinx.serialization.json.Json @@ -79,68 +77,22 @@ fun ApplicationEngine.whenStarted(callback: Application.() -> Unit) { const val WEB_SERVER_TARGET = "@webServer" -private suspend fun ApplicationCall.message(target: MessageController) { - val body = receiveText() - val json = Json.parseJson(body) as? JsonObject - ?: throw IllegalArgumentException("The body is not a json object") - val meta = json.toMeta() - - val request = DeviceMessage.wrap(meta) - - val response = target.respondMessage(request) - respondMessage(response) -} - -private suspend fun ApplicationCall.getProperty(target: MessageController) { - val property: String by parameters - val request = DeviceMessage { - type = GET_PROPERTY_ACTION - source = WEB_SERVER_TARGET - this.target = target.deviceTarget - data { - name = property - } - } - - val response = target.respondMessage(request) - respondMessage(response) -} - -private suspend fun ApplicationCall.setProperty(target: MessageController) { - val property: String by parameters - val body = receiveText() - val json = Json.parseJson(body) - - val request = DeviceMessage { - type = SET_PROPERTY_ACTION - source = WEB_SERVER_TARGET - this.target = target.deviceTarget - data { - name = property - value = json.toMetaItem() - } - } - - val response = target.respondMessage(request) - respondMessage(response) -} - @OptIn(KtorExperimentalAPI::class) fun Application.deviceModule( manager: DeviceManager, deviceNames: Collection = manager.devices.keys.map { it.toString() }, route: String = "/" ) { - val controllers = deviceNames.associateWith { name -> - val device = manager.getDevice(name) - MessageController(device, name, manager.context) - } - - fun generateFlow(target: String?) = if (target == null) { - controllers.values.asFlow().flatMapMerge { it.output() } - } else { - controllers[target]?.output() ?: error("The device with target $target not found") - } +// val controllers = deviceNames.associateWith { name -> +// val device = manager.devices[name.toName()] +// DeviceController(device, name, manager.context) +// } +// +// fun generateFlow(target: String?) = if (target == null) { +// controllers.values.asFlow().flatMapMerge { it.output() } +// } else { +// controllers[target]?.output() ?: error("The device with target $target not found") +// } if (featureOrNull(WebSockets) == null) { install(WebSockets) @@ -164,7 +116,7 @@ fun Application.deviceModule( +"Device server dashboard" } deviceNames.forEach { deviceName -> - val device = controllers[deviceName]!!.device + val device = manager[deviceName] div { id = deviceName h2 { +deviceName } @@ -198,9 +150,8 @@ fun Application.deviceModule( get("list") { call.respondJson { - controllers.values.forEach { controller -> - "target" to controller.deviceTarget - val device = controller.device + manager.devices.forEach { (name, device) -> + "target" to name.toString() "properties" to jsonArray { device.propertyDescriptors.forEach { descriptor -> +descriptor.config.toJson() @@ -223,7 +174,7 @@ fun Application.deviceModule( try { application.log.debug("Opened server socket for ${call.request.queryParameters}") - generateFlow(target).collect { + manager.controller.envelopeOutput().collect { outgoing.send(it.toFrame()) } @@ -235,9 +186,16 @@ fun Application.deviceModule( post("message") { val target: String by call.request.queryParameters - val controller = - controllers[target] ?: throw IllegalArgumentException("Target $target not found in $controllers") - call.message(controller) + val device = manager[target] + val body = call.receiveText() + val json = Json.parseJson(body) as? JsonObject + ?: throw IllegalArgumentException("The body is not a json object") + val meta = json.toMeta() + + val request = DeviceMessage.wrap(meta) + + val response = device.respondMessage(request) + call.respondMessage(response) } route("{target}") { @@ -246,18 +204,40 @@ fun Application.deviceModule( route("{property}") { get("get") { val target: String by call.parameters - val controller = controllers[target] - ?: throw IllegalArgumentException("Target $target not found in $controllers") + val device = manager[target] + val property: String by call.parameters + val request = DeviceMessage { + type = GET_PROPERTY_ACTION + source = WEB_SERVER_TARGET + this.target = target + data { + name = property + } + } - call.getProperty(controller) + val response = device.respondMessage(request) + call.respondMessage(response) } post("set") { val target: String by call.parameters - val controller = - controllers[target] - ?: throw IllegalArgumentException("Target $target not found in $controllers") + val device = manager[target] - call.setProperty(controller) + val property: String by call.parameters + val body = call.receiveText() + val json = Json.parseJson(body) + + val request = DeviceMessage { + type = SET_PROPERTY_ACTION + source = WEB_SERVER_TARGET + this.target = target + data { + name = property + value = json.toMetaItem() + } + } + + val response = device.respondMessage(request) + call.respondMessage(response) } } } diff --git a/demo/build.gradle.kts b/demo/build.gradle.kts index c678408..62a250e 100644 --- a/demo/build.gradle.kts +++ b/demo/build.gradle.kts @@ -18,6 +18,7 @@ repositories{ dependencies{ implementation(project(":dataforge-device-core")) implementation(project(":dataforge-device-server")) + implementation(project(":dataforge-device-client")) implementation("no.tornado:tornadofx:1.7.20") implementation(kotlin("stdlib-jdk8")) implementation("scientifik:plotlykt-server:$plotlyVersion") diff --git a/demo/src/main/kotlin/hep/dataforge/control/demo/demoDeviceServer.kt b/demo/src/main/kotlin/hep/dataforge/control/demo/demoDeviceServer.kt index 74e0684..df553e9 100644 --- a/demo/src/main/kotlin/hep/dataforge/control/demo/demoDeviceServer.kt +++ b/demo/src/main/kotlin/hep/dataforge/control/demo/demoDeviceServer.kt @@ -6,6 +6,7 @@ import hep.dataforge.control.server.startDeviceServer import hep.dataforge.control.server.whenStarted import hep.dataforge.meta.double import hep.dataforge.meta.invoke +import hep.dataforge.names.asName import io.ktor.server.engine.ApplicationEngine import kotlinx.coroutines.flow.* import kotlinx.coroutines.launch @@ -49,7 +50,7 @@ suspend fun Trace.updateXYFrom(flow: Flow>>) { fun startDemoDeviceServer(context: Context, device: DemoDevice): ApplicationEngine { - context.devices.registerDevice("demo", device) + context.devices.registerDevice("demo".asName(), device) val server = context.startDeviceServer(context.devices) server.whenStarted { plotlyModule("plots").apply {