Implement Tango messages

This commit is contained in:
Alexander Nozik 2021-06-20 16:37:03 +03:00
parent 6e4bf51a6f
commit a093f1921e
16 changed files with 186 additions and 65 deletions

View File

@ -36,7 +36,7 @@ public interface Device : Closeable, ContextAware {
* Get the value of the property or throw error if property in not defined.
* Suspend if property value is not available
*/
public suspend fun getProperty(propertyName: String): MetaItem?
public suspend fun getProperty(propertyName: String): MetaItem
/**
* Invalidate property and force recalculate

View File

@ -46,23 +46,29 @@ public interface DeviceHub : Provider {
public operator fun DeviceHub.get(nameToken: NameToken): Device =
devices[nameToken] ?: error("Device with name $nameToken not found in $this")
public operator fun DeviceHub.get(name: Name): Device? = when {
public fun DeviceHub.getOrNull(name: Name): Device? = when {
name.isEmpty() -> this as? Device
name.length == 1 -> get(name.firstOrNull()!!)
else -> (get(name.firstOrNull()!!) as? DeviceHub)?.get(name.cutFirst())
else -> (get(name.firstOrNull()!!) as? DeviceHub)?.getOrNull(name.cutFirst())
}
public operator fun DeviceHub.get(deviceName: String): Device? = get(deviceName.toName())
public operator fun DeviceHub.get(name: Name): Device =
getOrNull(name) ?: error("Device with name $name not found in $this")
public suspend fun DeviceHub.getProperty(deviceName: Name, propertyName: String): MetaItem? =
this[deviceName]?.getProperty(propertyName)
public fun DeviceHub.getOrNull(nameString: String): Device? = getOrNull(nameString.toName())
public operator fun DeviceHub.get(nameString: String): Device =
getOrNull(nameString) ?: error("Device with name $nameString not found in $this")
public suspend fun DeviceHub.getProperty(deviceName: Name, propertyName: String): MetaItem =
this[deviceName].getProperty(propertyName)
public suspend fun DeviceHub.setProperty(deviceName: Name, propertyName: String, value: MetaItem) {
this[deviceName]?.setProperty(propertyName, value)
this[deviceName].setProperty(propertyName, value)
}
public suspend fun DeviceHub.execute(deviceName: Name, command: String, argument: MetaItem?): MetaItem? =
this[deviceName]?.execute(command, argument)
this[deviceName].execute(command, argument)
//suspend fun DeviceHub.respond(request: Envelope): EnvelopeBuilder {

View File

@ -4,7 +4,7 @@ import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
import space.kscience.dataforge.control.api.Device
import space.kscience.dataforge.control.api.DeviceHub
import space.kscience.dataforge.control.api.get
import space.kscience.dataforge.control.api.getOrNull
import space.kscience.dataforge.control.messages.*
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.MetaItem
@ -146,7 +146,7 @@ public class DeviceController(
public suspend fun DeviceHub.respondMessage(request: DeviceMessage): DeviceMessage {
return try {
val targetName = request.targetDevice?.toName() ?: Name.EMPTY
val device = this[targetName] ?: error("The device with name $targetName not found in $this")
val device = this.getOrNull(targetName) ?: error("The device with name $targetName not found in $this")
DeviceController.respondMessage(device, targetName.toString(), request)
} catch (ex: Exception) {
DeviceMessage.error(ex, sourceDevice = deviceName, targetDevice = request.sourceDevice)

View File

@ -4,7 +4,7 @@ import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.consumeAsFlow
import space.kscience.dataforge.control.api.DeviceHub
import space.kscience.dataforge.control.api.get
import space.kscience.dataforge.control.api.getOrNull
import space.kscience.dataforge.control.messages.DeviceMessage
import space.kscience.dataforge.misc.DFExperimental
import space.kscience.dataforge.names.Name
@ -51,7 +51,7 @@ public class HubController(
public suspend fun respondMessage(message: DeviceMessage): DeviceMessage = try {
val targetName = message.targetDevice?.toName() ?: Name.EMPTY
val device = hub[targetName] ?: error("The device with name $targetName not found in $hub")
val device = hub.getOrNull(targetName) ?: error("The device with name $targetName not found in $hub")
DeviceController.respondMessage(device, targetName.toString(), message)
} catch (ex: Exception) {
DeviceMessage.error(ex, sourceDevice = hub.deviceName, targetDevice = message.sourceDevice)

View File

@ -1,24 +0,0 @@
package space.kscience.dataforge.control.client
//public sealed class TangoPayload(
// val host: String,
// val device: String,
// val name: String,
// val timestamp: Long? = null,
// val quality: String = "VALID",
// val event: String? = null,
//// val input: Any? = null,
//// val output: Any? = null,
//// val errors: Iterable<Any?>?,
//)
//
//public class TangoAttributePayload(
// host: String,
// device: String,
// name: String,
// val value: Any? = null,
// timestamp: Long? = null,
// quality: String = "VALID",
// event: String? = null,
// errors: Iterable<Any?>?,
//) : TangoPayload(host, device, name, timestamp, quality, event)

View File

@ -6,17 +6,17 @@ import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import ru.mipt.npm.magix.api.MagixEndpoint
import ru.mipt.npm.magix.api.MagixMessage
import space.kscience.dataforge.context.error
import space.kscience.dataforge.context.logger
import space.kscience.dataforge.control.controllers.DeviceManager
import space.kscience.dataforge.control.controllers.respondMessage
import space.kscience.dataforge.control.messages.DeviceMessage
import space.kscience.dataforge.magix.api.MagixMessage
public const val DATAFORGE_MAGIX_FORMAT: String = "dataforge"
private fun generateId(request: MagixMessage<DeviceMessage>): String = if (request.id != null) {
internal fun generateId(request: MagixMessage<*>): String = if (request.id != null) {
"${request.id}.response"
} else {
"df[${request.payload.hashCode()}"
@ -25,13 +25,11 @@ private fun generateId(request: MagixMessage<DeviceMessage>): String = if (reque
/**
* Communicate with server in [Magix format](https://github.com/waltz-controls/rfc/tree/master/1)
*/
public fun DeviceManager.launchMagixClient(
public fun DeviceManager.launchDfMagix(
endpoint: MagixEndpoint<DeviceMessage>,
endpointID: String = DATAFORGE_MAGIX_FORMAT,
): Job = context.launch {
endpoint.subscribe().onEach { request ->
//TODO analyze action
val responsePayload = respondMessage(request.payload)
val response = MagixMessage(
format = DATAFORGE_MAGIX_FORMAT,
@ -46,11 +44,13 @@ public fun DeviceManager.launchMagixClient(
}.launchIn(this)
controller.messageOutput().onEach { payload ->
MagixMessage(
format = DATAFORGE_MAGIX_FORMAT,
id = "df[${payload.hashCode()}]",
origin = endpointID,
payload = payload
endpoint.broadcast(
MagixMessage(
format = DATAFORGE_MAGIX_FORMAT,
id = "df[${payload.hashCode()}]",
origin = endpointID,
payload = payload
)
)
}.catch { error ->
logger.error(error) { "Error while sending a message" }

View File

@ -0,0 +1,143 @@
package ru.mipt.npm.controls.client
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.serialization.Serializable
import ru.mipt.npm.magix.api.MagixEndpoint
import ru.mipt.npm.magix.api.MagixMessage
import space.kscience.dataforge.context.error
import space.kscience.dataforge.context.logger
import space.kscience.dataforge.control.api.get
import space.kscience.dataforge.control.client.generateId
import space.kscience.dataforge.control.controllers.DeviceManager
import space.kscience.dataforge.meta.MetaItem
public const val TANGO_MAGIX_FORMAT: String = "tango"
/*
See https://github.com/waltz-controls/rfc/tree/master/4 for details
"action":"read|write|exec|pipe",
"timestamp": "int",
"host":"tango_host",
"device":"device name",
"name":"attribute, command or pipe's name",
"[value]":"attribute's value",
"[quality]":"VALID|WARNING|ALARM",
"[argin]":"command argin",
"[argout]":"command argout",
"[data]":"pipe's data",
"[errors]":[]
*/
@Serializable
public enum class TangoAction {
read,
write,
exec,
pipe
}
public enum class TangoQuality {
VALID,
WARNING,
ALARM
}
@Serializable
public data class TangoPayload(
val action: TangoAction,
val timestamp: Int,
val host: String,
val device: String,
val name: String,
val value: MetaItem? = null,
val quality: TangoQuality = TangoQuality.VALID,
val argin: MetaItem? = null,
val argout: MetaItem? = null,
val data: MetaItem? = null,
val errors: List<String>? = null
)
public fun DeviceManager.launchTangoMagix(
endpoint: MagixEndpoint<TangoPayload>,
endpointID: String = TANGO_MAGIX_FORMAT,
): Job = context.launch {
suspend inline fun respond(request: MagixMessage<TangoPayload>, payloadBuilder: (TangoPayload) -> TangoPayload) {
endpoint.broadcast(
request.copy(
id = generateId(request),
parentId = request.id,
origin = endpointID,
payload = payloadBuilder(request.payload)
)
)
}
endpoint.subscribe().onEach { request ->
try {
val device = get(request.payload.device)
when (request.payload.action) {
TangoAction.read -> {
val value = device.getProperty(request.payload.name)
respond(request) { requestPayload ->
requestPayload.copy(
value = value,
quality = TangoQuality.VALID
)
}
}
TangoAction.write -> {
request.payload.value?.let { value ->
device.setProperty(request.payload.name, value)
}
//wait for value to be written and return final state
val value = device.getProperty(request.payload.name)
respond(request) { requestPayload ->
requestPayload.copy(
value = value,
quality = TangoQuality.VALID
)
}
}
TangoAction.exec -> {
val result = device.execute(request.payload.name, request.payload.argin)
respond(request) { requestPayload ->
requestPayload.copy(
argout = result,
quality = TangoQuality.VALID
)
}
}
TangoAction.pipe -> TODO("Pipe not implemented")
}
} catch (ex: Exception) {
logger.error(ex) { "Error while responding to message" }
endpoint.broadcast(
request.copy(
id = generateId(request),
parentId = request.id,
origin = endpointID,
payload = request.payload.copy(quality = TangoQuality.WARNING)
)
)
}
}.launchIn(this)
//TODO implement subscriptions?
// controller.messageOutput().onEach { payload ->
// endpoint.broadcast(
// MagixMessage(
// format = TANGO_MAGIX_FORMAT,
// id = "df[${payload.hashCode()}]",
// origin = endpointID,
// payload = payload
// )
// )
// }.catch { error ->
// logger.error(error) { "Error while sending a message" }
// }.launchIn(this)
}

View File

@ -29,7 +29,7 @@ import kotlinx.serialization.json.Json
import kotlinx.serialization.json.JsonObject
import kotlinx.serialization.json.buildJsonArray
import kotlinx.serialization.json.put
import space.kscience.dataforge.control.api.get
import space.kscience.dataforge.control.api.getOrNull
import space.kscience.dataforge.control.controllers.DeviceManager
import space.kscience.dataforge.control.controllers.respondMessage
import space.kscience.dataforge.control.messages.DeviceMessage
@ -115,7 +115,7 @@ public fun Application.deviceModule(
}
deviceNames.forEach { deviceName ->
val device =
manager[deviceName] ?: error("The device with name $deviceName not found in $manager")
manager.getOrNull(deviceName) ?: error("The device with name $deviceName not found in $manager")
div {
id = deviceName
h2 { +deviceName }

View File

@ -5,9 +5,6 @@ import kotlinx.coroutines.flow.map
import kotlinx.serialization.KSerializer
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.JsonElement
import space.kscience.dataforge.magix.api.MagixMessage
import space.kscience.dataforge.magix.api.MagixMessageFilter
import space.kscience.dataforge.magix.api.replacePayload
/**
* Inwards API of magix endpoint used to build services

View File

@ -1,4 +1,4 @@
package space.kscience.dataforge.magix.api
package ru.mipt.npm.magix.api
import kotlinx.serialization.Serializable
import kotlinx.serialization.json.JsonElement
@ -28,7 +28,6 @@ public data class MagixMessage<T>(
val id: String? = null,
val parentId: String? = null,
val user: JsonElement? = null,
val action: String? = null
)
/**
@ -36,4 +35,4 @@ public data class MagixMessage<T>(
*/
@Suppress("UNCHECKED_CAST")
public fun <T, R> MagixMessage<T>.replacePayload(payloadTransform: (T) -> R): MagixMessage<R> =
MagixMessage(format, origin, payloadTransform(payload), target, id, parentId, user, action)
MagixMessage(format, origin, payloadTransform(payload), target, id, parentId, user)

View File

@ -1,4 +1,4 @@
package space.kscience.dataforge.magix.api
package ru.mipt.npm.magix.api
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.filter
@ -9,7 +9,6 @@ public data class MagixMessageFilter(
val format: List<String?>? = null,
val origin: List<String?>? = null,
val target: List<String?>? = null,
val action: List<String?>? = null,
) {
public companion object {
public val ALL: MagixMessageFilter = MagixMessageFilter()
@ -28,6 +27,5 @@ public fun <T> Flow<MagixMessage<T>>.filter(filter: MagixMessageFilter): Flow<Ma
&& filter.origin?.contains(message.origin) ?: true
&& filter.origin?.contains(message.origin) ?: true
&& filter.target?.contains(message.target) ?: true
&& filter.action?.contains(message.action) ?: true
}
}

View File

@ -5,6 +5,8 @@ import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import ru.mipt.npm.magix.api.MagixEndpoint
import ru.mipt.npm.magix.api.MagixMessage
import ru.mipt.npm.magix.api.MagixMessageFilter
/**
* Launch magix message converter service

View File

@ -1,7 +1,7 @@
package ru.mipt.npm.magix.client;
import kotlinx.serialization.json.JsonElement;
import space.kscience.dataforge.magix.api.MagixMessage;
import ru.mipt.npm.magix.api.MagixMessage;
import java.io.IOException;
import java.util.concurrent.Flow;

View File

@ -4,8 +4,8 @@ import kotlinx.coroutines.jdk9.asPublisher
import kotlinx.coroutines.runBlocking
import kotlinx.serialization.KSerializer
import ru.mipt.npm.magix.api.MagixEndpoint
import space.kscience.dataforge.magix.api.MagixMessage
import space.kscience.dataforge.magix.api.MagixMessageFilter
import ru.mipt.npm.magix.api.MagixMessage
import ru.mipt.npm.magix.api.MagixMessageFilter
import space.kscience.dataforge.magix.service.RSocketMagixEndpoint
import space.kscience.dataforge.magix.service.withTcp
import java.util.concurrent.Flow

View File

@ -26,9 +26,9 @@ import kotlinx.html.*
import kotlinx.serialization.KSerializer
import kotlinx.serialization.json.JsonElement
import ru.mipt.npm.magix.api.MagixEndpoint.Companion.magixJson
import space.kscience.dataforge.magix.api.MagixMessage
import space.kscience.dataforge.magix.api.MagixMessageFilter
import space.kscience.dataforge.magix.api.filter
import ru.mipt.npm.magix.api.MagixMessage
import ru.mipt.npm.magix.api.MagixMessageFilter
import ru.mipt.npm.magix.api.filter
import space.kscience.dataforge.magix.server.SseEvent
import space.kscience.dataforge.magix.server.respondSse
import java.util.*

View File

@ -15,8 +15,8 @@ import kotlinx.coroutines.withContext
import kotlinx.serialization.KSerializer
import kotlinx.serialization.encodeToString
import ru.mipt.npm.magix.api.MagixEndpoint
import space.kscience.dataforge.magix.api.MagixMessage
import space.kscience.dataforge.magix.api.MagixMessageFilter
import ru.mipt.npm.magix.api.MagixMessage
import ru.mipt.npm.magix.api.MagixMessageFilter
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext