From a093f1921ed737f0eac89a781b7fe9db99b756b5 Mon Sep 17 00:00:00 2001 From: Alexander Nozik Date: Sun, 20 Jun 2021 16:37:03 +0300 Subject: [PATCH] Implement Tango messages --- .../kotlin/ru/mipt/npm/controls/api/Device.kt | 2 +- .../ru/mipt/npm/controls/api/DeviceHub.kt | 20 ++- .../controls/controllers/DeviceController.kt | 4 +- .../npm/controls/controllers/HubController.kt | 4 +- .../mipt/npm/controls/client/TangoPayload.kt | 24 --- .../client/{magixClient.kt => dfMagix.kt} | 20 +-- .../ru/mipt/npm/controls/client/tangoMagix.kt | 143 ++++++++++++++++++ .../npm/controls/server/deviceWebServer.kt | 4 +- .../ru/mipt/npm/magix/api/MagixEndpoint.kt | 3 - .../ru/mipt/npm/magix/api/MagixMessage.kt | 5 +- .../mipt/npm/magix/api/MagixMessageFilter.kt | 4 +- .../ru/mipt/npm/magix/api/converters.kt | 2 + .../ru/mipt/npm/magix/client/MagixClient.java | 2 +- .../npm/magix/client/ControlsMagixClient.kt | 4 +- .../ru/mipt/npm/magix/server/magixModule.kt | 6 +- .../npm/magix/service/RSocketMagixEndpoint.kt | 4 +- 16 files changed, 186 insertions(+), 65 deletions(-) delete mode 100644 controls-magix-client/src/commonMain/kotlin/ru/mipt/npm/controls/client/TangoPayload.kt rename controls-magix-client/src/commonMain/kotlin/ru/mipt/npm/controls/client/{magixClient.kt => dfMagix.kt} (79%) create mode 100644 controls-magix-client/src/commonMain/kotlin/ru/mipt/npm/controls/client/tangoMagix.kt diff --git a/controls-core/src/commonMain/kotlin/ru/mipt/npm/controls/api/Device.kt b/controls-core/src/commonMain/kotlin/ru/mipt/npm/controls/api/Device.kt index 119052b..335e68c 100644 --- a/controls-core/src/commonMain/kotlin/ru/mipt/npm/controls/api/Device.kt +++ b/controls-core/src/commonMain/kotlin/ru/mipt/npm/controls/api/Device.kt @@ -36,7 +36,7 @@ public interface Device : Closeable, ContextAware { * Get the value of the property or throw error if property in not defined. * Suspend if property value is not available */ - public suspend fun getProperty(propertyName: String): MetaItem? + public suspend fun getProperty(propertyName: String): MetaItem /** * Invalidate property and force recalculate diff --git a/controls-core/src/commonMain/kotlin/ru/mipt/npm/controls/api/DeviceHub.kt b/controls-core/src/commonMain/kotlin/ru/mipt/npm/controls/api/DeviceHub.kt index ea59626..a3fba5c 100644 --- a/controls-core/src/commonMain/kotlin/ru/mipt/npm/controls/api/DeviceHub.kt +++ b/controls-core/src/commonMain/kotlin/ru/mipt/npm/controls/api/DeviceHub.kt @@ -46,23 +46,29 @@ public interface DeviceHub : Provider { public operator fun DeviceHub.get(nameToken: NameToken): Device = devices[nameToken] ?: error("Device with name $nameToken not found in $this") -public operator fun DeviceHub.get(name: Name): Device? = when { +public fun DeviceHub.getOrNull(name: Name): Device? = when { name.isEmpty() -> this as? Device name.length == 1 -> get(name.firstOrNull()!!) - else -> (get(name.firstOrNull()!!) as? DeviceHub)?.get(name.cutFirst()) + else -> (get(name.firstOrNull()!!) as? DeviceHub)?.getOrNull(name.cutFirst()) } -public operator fun DeviceHub.get(deviceName: String): Device? = get(deviceName.toName()) +public operator fun DeviceHub.get(name: Name): Device = + getOrNull(name) ?: error("Device with name $name not found in $this") -public suspend fun DeviceHub.getProperty(deviceName: Name, propertyName: String): MetaItem? = - this[deviceName]?.getProperty(propertyName) +public fun DeviceHub.getOrNull(nameString: String): Device? = getOrNull(nameString.toName()) + +public operator fun DeviceHub.get(nameString: String): Device = + getOrNull(nameString) ?: error("Device with name $nameString not found in $this") + +public suspend fun DeviceHub.getProperty(deviceName: Name, propertyName: String): MetaItem = + this[deviceName].getProperty(propertyName) public suspend fun DeviceHub.setProperty(deviceName: Name, propertyName: String, value: MetaItem) { - this[deviceName]?.setProperty(propertyName, value) + this[deviceName].setProperty(propertyName, value) } public suspend fun DeviceHub.execute(deviceName: Name, command: String, argument: MetaItem?): MetaItem? = - this[deviceName]?.execute(command, argument) + this[deviceName].execute(command, argument) //suspend fun DeviceHub.respond(request: Envelope): EnvelopeBuilder { diff --git a/controls-core/src/commonMain/kotlin/ru/mipt/npm/controls/controllers/DeviceController.kt b/controls-core/src/commonMain/kotlin/ru/mipt/npm/controls/controllers/DeviceController.kt index 6a72fdc..5e0cba6 100644 --- a/controls-core/src/commonMain/kotlin/ru/mipt/npm/controls/controllers/DeviceController.kt +++ b/controls-core/src/commonMain/kotlin/ru/mipt/npm/controls/controllers/DeviceController.kt @@ -4,7 +4,7 @@ import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.map import space.kscience.dataforge.control.api.Device import space.kscience.dataforge.control.api.DeviceHub -import space.kscience.dataforge.control.api.get +import space.kscience.dataforge.control.api.getOrNull import space.kscience.dataforge.control.messages.* import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.MetaItem @@ -146,7 +146,7 @@ public class DeviceController( public suspend fun DeviceHub.respondMessage(request: DeviceMessage): DeviceMessage { return try { val targetName = request.targetDevice?.toName() ?: Name.EMPTY - val device = this[targetName] ?: error("The device with name $targetName not found in $this") + val device = this.getOrNull(targetName) ?: error("The device with name $targetName not found in $this") DeviceController.respondMessage(device, targetName.toString(), request) } catch (ex: Exception) { DeviceMessage.error(ex, sourceDevice = deviceName, targetDevice = request.sourceDevice) diff --git a/controls-core/src/commonMain/kotlin/ru/mipt/npm/controls/controllers/HubController.kt b/controls-core/src/commonMain/kotlin/ru/mipt/npm/controls/controllers/HubController.kt index 8157349..09347f6 100644 --- a/controls-core/src/commonMain/kotlin/ru/mipt/npm/controls/controllers/HubController.kt +++ b/controls-core/src/commonMain/kotlin/ru/mipt/npm/controls/controllers/HubController.kt @@ -4,7 +4,7 @@ import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.consumeAsFlow import space.kscience.dataforge.control.api.DeviceHub -import space.kscience.dataforge.control.api.get +import space.kscience.dataforge.control.api.getOrNull import space.kscience.dataforge.control.messages.DeviceMessage import space.kscience.dataforge.misc.DFExperimental import space.kscience.dataforge.names.Name @@ -51,7 +51,7 @@ public class HubController( public suspend fun respondMessage(message: DeviceMessage): DeviceMessage = try { val targetName = message.targetDevice?.toName() ?: Name.EMPTY - val device = hub[targetName] ?: error("The device with name $targetName not found in $hub") + val device = hub.getOrNull(targetName) ?: error("The device with name $targetName not found in $hub") DeviceController.respondMessage(device, targetName.toString(), message) } catch (ex: Exception) { DeviceMessage.error(ex, sourceDevice = hub.deviceName, targetDevice = message.sourceDevice) diff --git a/controls-magix-client/src/commonMain/kotlin/ru/mipt/npm/controls/client/TangoPayload.kt b/controls-magix-client/src/commonMain/kotlin/ru/mipt/npm/controls/client/TangoPayload.kt deleted file mode 100644 index 8bce776..0000000 --- a/controls-magix-client/src/commonMain/kotlin/ru/mipt/npm/controls/client/TangoPayload.kt +++ /dev/null @@ -1,24 +0,0 @@ -package space.kscience.dataforge.control.client - -//public sealed class TangoPayload( -// val host: String, -// val device: String, -// val name: String, -// val timestamp: Long? = null, -// val quality: String = "VALID", -// val event: String? = null, -//// val input: Any? = null, -//// val output: Any? = null, -//// val errors: Iterable?, -//) -// -//public class TangoAttributePayload( -// host: String, -// device: String, -// name: String, -// val value: Any? = null, -// timestamp: Long? = null, -// quality: String = "VALID", -// event: String? = null, -// errors: Iterable?, -//) : TangoPayload(host, device, name, timestamp, quality, event) \ No newline at end of file diff --git a/controls-magix-client/src/commonMain/kotlin/ru/mipt/npm/controls/client/magixClient.kt b/controls-magix-client/src/commonMain/kotlin/ru/mipt/npm/controls/client/dfMagix.kt similarity index 79% rename from controls-magix-client/src/commonMain/kotlin/ru/mipt/npm/controls/client/magixClient.kt rename to controls-magix-client/src/commonMain/kotlin/ru/mipt/npm/controls/client/dfMagix.kt index d18dffd..e64da5e 100644 --- a/controls-magix-client/src/commonMain/kotlin/ru/mipt/npm/controls/client/magixClient.kt +++ b/controls-magix-client/src/commonMain/kotlin/ru/mipt/npm/controls/client/dfMagix.kt @@ -6,17 +6,17 @@ import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch import ru.mipt.npm.magix.api.MagixEndpoint +import ru.mipt.npm.magix.api.MagixMessage import space.kscience.dataforge.context.error import space.kscience.dataforge.context.logger import space.kscience.dataforge.control.controllers.DeviceManager import space.kscience.dataforge.control.controllers.respondMessage import space.kscience.dataforge.control.messages.DeviceMessage -import space.kscience.dataforge.magix.api.MagixMessage public const val DATAFORGE_MAGIX_FORMAT: String = "dataforge" -private fun generateId(request: MagixMessage): String = if (request.id != null) { +internal fun generateId(request: MagixMessage<*>): String = if (request.id != null) { "${request.id}.response" } else { "df[${request.payload.hashCode()}" @@ -25,13 +25,11 @@ private fun generateId(request: MagixMessage): String = if (reque /** * Communicate with server in [Magix format](https://github.com/waltz-controls/rfc/tree/master/1) */ -public fun DeviceManager.launchMagixClient( +public fun DeviceManager.launchDfMagix( endpoint: MagixEndpoint, endpointID: String = DATAFORGE_MAGIX_FORMAT, ): Job = context.launch { endpoint.subscribe().onEach { request -> - //TODO analyze action - val responsePayload = respondMessage(request.payload) val response = MagixMessage( format = DATAFORGE_MAGIX_FORMAT, @@ -46,11 +44,13 @@ public fun DeviceManager.launchMagixClient( }.launchIn(this) controller.messageOutput().onEach { payload -> - MagixMessage( - format = DATAFORGE_MAGIX_FORMAT, - id = "df[${payload.hashCode()}]", - origin = endpointID, - payload = payload + endpoint.broadcast( + MagixMessage( + format = DATAFORGE_MAGIX_FORMAT, + id = "df[${payload.hashCode()}]", + origin = endpointID, + payload = payload + ) ) }.catch { error -> logger.error(error) { "Error while sending a message" } diff --git a/controls-magix-client/src/commonMain/kotlin/ru/mipt/npm/controls/client/tangoMagix.kt b/controls-magix-client/src/commonMain/kotlin/ru/mipt/npm/controls/client/tangoMagix.kt new file mode 100644 index 0000000..6cac86d --- /dev/null +++ b/controls-magix-client/src/commonMain/kotlin/ru/mipt/npm/controls/client/tangoMagix.kt @@ -0,0 +1,143 @@ +package ru.mipt.npm.controls.client + +import kotlinx.coroutines.Job +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.launch +import kotlinx.serialization.Serializable +import ru.mipt.npm.magix.api.MagixEndpoint +import ru.mipt.npm.magix.api.MagixMessage +import space.kscience.dataforge.context.error +import space.kscience.dataforge.context.logger +import space.kscience.dataforge.control.api.get +import space.kscience.dataforge.control.client.generateId +import space.kscience.dataforge.control.controllers.DeviceManager +import space.kscience.dataforge.meta.MetaItem + +public const val TANGO_MAGIX_FORMAT: String = "tango" + +/* + See https://github.com/waltz-controls/rfc/tree/master/4 for details + + "action":"read|write|exec|pipe", + "timestamp": "int", + "host":"tango_host", + "device":"device name", + "name":"attribute, command or pipe's name", + "[value]":"attribute's value", + "[quality]":"VALID|WARNING|ALARM", + "[argin]":"command argin", + "[argout]":"command argout", + "[data]":"pipe's data", + "[errors]":[] + */ + +@Serializable +public enum class TangoAction { + read, + write, + exec, + pipe +} + +public enum class TangoQuality { + VALID, + WARNING, + ALARM +} + +@Serializable +public data class TangoPayload( + val action: TangoAction, + val timestamp: Int, + val host: String, + val device: String, + val name: String, + val value: MetaItem? = null, + val quality: TangoQuality = TangoQuality.VALID, + val argin: MetaItem? = null, + val argout: MetaItem? = null, + val data: MetaItem? = null, + val errors: List? = null +) + +public fun DeviceManager.launchTangoMagix( + endpoint: MagixEndpoint, + endpointID: String = TANGO_MAGIX_FORMAT, +): Job = context.launch { + + suspend inline fun respond(request: MagixMessage, payloadBuilder: (TangoPayload) -> TangoPayload) { + endpoint.broadcast( + request.copy( + id = generateId(request), + parentId = request.id, + origin = endpointID, + payload = payloadBuilder(request.payload) + ) + ) + } + + endpoint.subscribe().onEach { request -> + try { + val device = get(request.payload.device) + when (request.payload.action) { + TangoAction.read -> { + val value = device.getProperty(request.payload.name) + respond(request) { requestPayload -> + requestPayload.copy( + value = value, + quality = TangoQuality.VALID + ) + } + } + TangoAction.write -> { + request.payload.value?.let { value -> + device.setProperty(request.payload.name, value) + } + //wait for value to be written and return final state + val value = device.getProperty(request.payload.name) + respond(request) { requestPayload -> + requestPayload.copy( + value = value, + quality = TangoQuality.VALID + ) + } + } + TangoAction.exec -> { + val result = device.execute(request.payload.name, request.payload.argin) + respond(request) { requestPayload -> + requestPayload.copy( + argout = result, + quality = TangoQuality.VALID + ) + } + } + TangoAction.pipe -> TODO("Pipe not implemented") + } + } catch (ex: Exception) { + logger.error(ex) { "Error while responding to message" } + endpoint.broadcast( + request.copy( + id = generateId(request), + parentId = request.id, + origin = endpointID, + payload = request.payload.copy(quality = TangoQuality.WARNING) + ) + ) + } + }.launchIn(this) + +//TODO implement subscriptions? +// controller.messageOutput().onEach { payload -> +// endpoint.broadcast( +// MagixMessage( +// format = TANGO_MAGIX_FORMAT, +// id = "df[${payload.hashCode()}]", +// origin = endpointID, +// payload = payload +// ) +// ) +// }.catch { error -> +// logger.error(error) { "Error while sending a message" } +// }.launchIn(this) +} \ No newline at end of file diff --git a/controls-server/src/main/kotlin/ru/mipt/npm/controls/server/deviceWebServer.kt b/controls-server/src/main/kotlin/ru/mipt/npm/controls/server/deviceWebServer.kt index d96e4c0..386b394 100644 --- a/controls-server/src/main/kotlin/ru/mipt/npm/controls/server/deviceWebServer.kt +++ b/controls-server/src/main/kotlin/ru/mipt/npm/controls/server/deviceWebServer.kt @@ -29,7 +29,7 @@ import kotlinx.serialization.json.Json import kotlinx.serialization.json.JsonObject import kotlinx.serialization.json.buildJsonArray import kotlinx.serialization.json.put -import space.kscience.dataforge.control.api.get +import space.kscience.dataforge.control.api.getOrNull import space.kscience.dataforge.control.controllers.DeviceManager import space.kscience.dataforge.control.controllers.respondMessage import space.kscience.dataforge.control.messages.DeviceMessage @@ -115,7 +115,7 @@ public fun Application.deviceModule( } deviceNames.forEach { deviceName -> val device = - manager[deviceName] ?: error("The device with name $deviceName not found in $manager") + manager.getOrNull(deviceName) ?: error("The device with name $deviceName not found in $manager") div { id = deviceName h2 { +deviceName } diff --git a/magix/magix-api/src/commonMain/kotlin/ru/mipt/npm/magix/api/MagixEndpoint.kt b/magix/magix-api/src/commonMain/kotlin/ru/mipt/npm/magix/api/MagixEndpoint.kt index b1bdfd4..b22a433 100644 --- a/magix/magix-api/src/commonMain/kotlin/ru/mipt/npm/magix/api/MagixEndpoint.kt +++ b/magix/magix-api/src/commonMain/kotlin/ru/mipt/npm/magix/api/MagixEndpoint.kt @@ -5,9 +5,6 @@ import kotlinx.coroutines.flow.map import kotlinx.serialization.KSerializer import kotlinx.serialization.json.Json import kotlinx.serialization.json.JsonElement -import space.kscience.dataforge.magix.api.MagixMessage -import space.kscience.dataforge.magix.api.MagixMessageFilter -import space.kscience.dataforge.magix.api.replacePayload /** * Inwards API of magix endpoint used to build services diff --git a/magix/magix-api/src/commonMain/kotlin/ru/mipt/npm/magix/api/MagixMessage.kt b/magix/magix-api/src/commonMain/kotlin/ru/mipt/npm/magix/api/MagixMessage.kt index bfc3704..3066e2d 100644 --- a/magix/magix-api/src/commonMain/kotlin/ru/mipt/npm/magix/api/MagixMessage.kt +++ b/magix/magix-api/src/commonMain/kotlin/ru/mipt/npm/magix/api/MagixMessage.kt @@ -1,4 +1,4 @@ -package space.kscience.dataforge.magix.api +package ru.mipt.npm.magix.api import kotlinx.serialization.Serializable import kotlinx.serialization.json.JsonElement @@ -28,7 +28,6 @@ public data class MagixMessage( val id: String? = null, val parentId: String? = null, val user: JsonElement? = null, - val action: String? = null ) /** @@ -36,4 +35,4 @@ public data class MagixMessage( */ @Suppress("UNCHECKED_CAST") public fun MagixMessage.replacePayload(payloadTransform: (T) -> R): MagixMessage = - MagixMessage(format, origin, payloadTransform(payload), target, id, parentId, user, action) \ No newline at end of file + MagixMessage(format, origin, payloadTransform(payload), target, id, parentId, user) \ No newline at end of file diff --git a/magix/magix-api/src/commonMain/kotlin/ru/mipt/npm/magix/api/MagixMessageFilter.kt b/magix/magix-api/src/commonMain/kotlin/ru/mipt/npm/magix/api/MagixMessageFilter.kt index b47e1e6..2f941c0 100644 --- a/magix/magix-api/src/commonMain/kotlin/ru/mipt/npm/magix/api/MagixMessageFilter.kt +++ b/magix/magix-api/src/commonMain/kotlin/ru/mipt/npm/magix/api/MagixMessageFilter.kt @@ -1,4 +1,4 @@ -package space.kscience.dataforge.magix.api +package ru.mipt.npm.magix.api import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.filter @@ -9,7 +9,6 @@ public data class MagixMessageFilter( val format: List? = null, val origin: List? = null, val target: List? = null, - val action: List? = null, ) { public companion object { public val ALL: MagixMessageFilter = MagixMessageFilter() @@ -28,6 +27,5 @@ public fun Flow>.filter(filter: MagixMessageFilter): Flow