Refactor local messages add generic respond to Device
This commit is contained in:
parent
ea8ebcf38f
commit
46f8da643d
@ -1,15 +1,23 @@
|
|||||||
package hep.dataforge.control.api
|
package hep.dataforge.control.api
|
||||||
|
|
||||||
|
import hep.dataforge.control.controllers.DeviceMessage
|
||||||
|
import hep.dataforge.control.controllers.MessageController
|
||||||
|
import hep.dataforge.control.controllers.MessageData
|
||||||
|
import hep.dataforge.io.Envelope
|
||||||
|
import hep.dataforge.io.Responder
|
||||||
|
import hep.dataforge.io.SimpleEnvelope
|
||||||
import hep.dataforge.meta.Meta
|
import hep.dataforge.meta.Meta
|
||||||
import hep.dataforge.meta.MetaItem
|
import hep.dataforge.meta.MetaItem
|
||||||
|
import hep.dataforge.meta.wrap
|
||||||
import kotlinx.coroutines.CoroutineScope
|
import kotlinx.coroutines.CoroutineScope
|
||||||
import kotlinx.coroutines.cancel
|
import kotlinx.coroutines.cancel
|
||||||
|
import kotlinx.io.Binary
|
||||||
import kotlinx.io.Closeable
|
import kotlinx.io.Closeable
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* General interface describing a managed Device
|
* General interface describing a managed Device
|
||||||
*/
|
*/
|
||||||
interface Device: Closeable {
|
interface Device: Closeable, Responder {
|
||||||
/**
|
/**
|
||||||
* List of supported property descriptors
|
* List of supported property descriptors
|
||||||
*/
|
*/
|
||||||
@ -61,9 +69,82 @@ interface Device: Closeable {
|
|||||||
*/
|
*/
|
||||||
suspend fun exec(action: String, argument: MetaItem<*>? = null): MetaItem<*>?
|
suspend fun exec(action: String, argument: MetaItem<*>? = null): MetaItem<*>?
|
||||||
|
|
||||||
|
override suspend fun respond(request: Envelope): Envelope {
|
||||||
|
val requestMessage = DeviceMessage.wrap(request.meta)
|
||||||
|
val responseMessage = respondMessage(requestMessage)
|
||||||
|
return SimpleEnvelope(responseMessage.toMeta(), Binary.EMPTY)
|
||||||
|
}
|
||||||
|
|
||||||
override fun close() {
|
override fun close() {
|
||||||
scope.cancel("The device is closed")
|
scope.cancel("The device is closed")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
companion object{
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
suspend fun Device.respondMessage(
|
||||||
|
request: DeviceMessage
|
||||||
|
): DeviceMessage {
|
||||||
|
val result: List<MessageData> = when (val action = request.type) {
|
||||||
|
MessageController.GET_PROPERTY_ACTION -> {
|
||||||
|
request.data.map { property ->
|
||||||
|
MessageData {
|
||||||
|
name = property.name
|
||||||
|
value = getProperty(name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
MessageController.SET_PROPERTY_ACTION -> {
|
||||||
|
request.data.map { property ->
|
||||||
|
val propertyName: String = property.name
|
||||||
|
val propertyValue = property.value
|
||||||
|
if (propertyValue == null) {
|
||||||
|
invalidateProperty(propertyName)
|
||||||
|
} else {
|
||||||
|
setProperty(propertyName, propertyValue)
|
||||||
|
}
|
||||||
|
MessageData {
|
||||||
|
name = propertyName
|
||||||
|
value = getProperty(propertyName)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
MessageController.EXECUTE_ACTION -> {
|
||||||
|
request.data.map { payload ->
|
||||||
|
MessageData {
|
||||||
|
name = payload.name
|
||||||
|
value = exec(payload.name, payload.value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
MessageController.PROPERTY_LIST_ACTION -> {
|
||||||
|
propertyDescriptors.map { descriptor ->
|
||||||
|
MessageData {
|
||||||
|
name = descriptor.name
|
||||||
|
value = MetaItem.NodeItem(descriptor.config)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
MessageController.ACTION_LIST_ACTION -> {
|
||||||
|
actionDescriptors.map { descriptor ->
|
||||||
|
MessageData {
|
||||||
|
name = descriptor.name
|
||||||
|
value = MetaItem.NodeItem(descriptor.config)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
else -> {
|
||||||
|
error("Unrecognized action $action")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return DeviceMessage.ok {
|
||||||
|
target = request.source
|
||||||
|
data = result
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
suspend fun Device.exec(name: String, meta: Meta?) = exec(name, meta?.let { MetaItem.NodeItem(it) })
|
suspend fun Device.exec(name: String, meta: Meta?) = exec(name, meta?.let { MetaItem.NodeItem(it) })
|
@ -1,11 +1,14 @@
|
|||||||
package hep.dataforge.control.controllers
|
package hep.dataforge.control.controllers
|
||||||
|
|
||||||
import hep.dataforge.control.controllers.DeviceMessage.Companion.DATA_VALUE_KEY
|
import hep.dataforge.control.controllers.DeviceMessage.Companion.DATA_VALUE_KEY
|
||||||
|
import hep.dataforge.io.SimpleEnvelope
|
||||||
import hep.dataforge.meta.*
|
import hep.dataforge.meta.*
|
||||||
import hep.dataforge.names.asName
|
import hep.dataforge.names.asName
|
||||||
import kotlinx.serialization.*
|
import kotlinx.serialization.Decoder
|
||||||
|
import kotlinx.serialization.Encoder
|
||||||
|
import kotlinx.serialization.KSerializer
|
||||||
|
import kotlinx.serialization.SerialDescriptor
|
||||||
|
|
||||||
@Serializable
|
|
||||||
class DeviceMessage : Scheme() {
|
class DeviceMessage : Scheme() {
|
||||||
var source by string()
|
var source by string()
|
||||||
var target by string()
|
var target by string()
|
||||||
@ -68,4 +71,6 @@ class MessageData : Scheme() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@DFBuilder
|
@DFBuilder
|
||||||
fun DeviceMessage.property(block: MessageData.() -> Unit): MessageData = append(MessageData, block)
|
fun DeviceMessage.data(block: MessageData.() -> Unit): MessageData = append(MessageData, block)
|
||||||
|
|
||||||
|
fun DeviceMessage.wrap() = SimpleEnvelope(this.config, null)
|
||||||
|
@ -2,12 +2,12 @@ package hep.dataforge.control.controllers
|
|||||||
|
|
||||||
import hep.dataforge.control.api.Device
|
import hep.dataforge.control.api.Device
|
||||||
import hep.dataforge.control.api.DeviceListener
|
import hep.dataforge.control.api.DeviceListener
|
||||||
|
import hep.dataforge.control.api.respondMessage
|
||||||
import hep.dataforge.control.controllers.DeviceMessage.Companion.PROPERTY_CHANGED_ACTION
|
import hep.dataforge.control.controllers.DeviceMessage.Companion.PROPERTY_CHANGED_ACTION
|
||||||
import hep.dataforge.io.Envelope
|
import hep.dataforge.io.Envelope
|
||||||
import hep.dataforge.io.Responder
|
import hep.dataforge.io.Responder
|
||||||
import hep.dataforge.io.SimpleEnvelope
|
import hep.dataforge.io.SimpleEnvelope
|
||||||
import hep.dataforge.meta.MetaItem
|
import hep.dataforge.meta.*
|
||||||
import hep.dataforge.meta.wrap
|
|
||||||
import kotlinx.coroutines.CoroutineScope
|
import kotlinx.coroutines.CoroutineScope
|
||||||
import kotlinx.coroutines.channels.Channel
|
import kotlinx.coroutines.channels.Channel
|
||||||
import kotlinx.coroutines.flow.consumeAsFlow
|
import kotlinx.coroutines.flow.consumeAsFlow
|
||||||
@ -33,78 +33,6 @@ class MessageController(
|
|||||||
|
|
||||||
private val outputChannel = Channel<Envelope>(Channel.CONFLATED)
|
private val outputChannel = Channel<Envelope>(Channel.CONFLATED)
|
||||||
|
|
||||||
suspend fun respondMessage(
|
|
||||||
request: DeviceMessage
|
|
||||||
): DeviceMessage = if (request.target != null && request.target != deviceTarget) {
|
|
||||||
DeviceMessage.fail {
|
|
||||||
comment = "Wrong target name $deviceTarget expected but ${request.target} found"
|
|
||||||
}
|
|
||||||
} else try {
|
|
||||||
val result: List<MessageData> = when (val action = request.type) {
|
|
||||||
GET_PROPERTY_ACTION -> {
|
|
||||||
request.data.map { property ->
|
|
||||||
MessageData {
|
|
||||||
name = property.name
|
|
||||||
value = device.getProperty(name)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
SET_PROPERTY_ACTION -> {
|
|
||||||
request.data.map { property ->
|
|
||||||
val propertyName: String = property.name
|
|
||||||
val propertyValue = property.value
|
|
||||||
if (propertyValue == null) {
|
|
||||||
device.invalidateProperty(propertyName)
|
|
||||||
} else {
|
|
||||||
device.setProperty(propertyName, propertyValue)
|
|
||||||
}
|
|
||||||
MessageData {
|
|
||||||
name = propertyName
|
|
||||||
value = device.getProperty(propertyName)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
EXECUTE_ACTION -> {
|
|
||||||
request.data.map { payload ->
|
|
||||||
MessageData {
|
|
||||||
name = payload.name
|
|
||||||
value = device.exec(payload.name, payload.value)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
PROPERTY_LIST_ACTION -> {
|
|
||||||
device.propertyDescriptors.map { descriptor ->
|
|
||||||
MessageData {
|
|
||||||
name = descriptor.name
|
|
||||||
value = MetaItem.NodeItem(descriptor.config)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
ACTION_LIST_ACTION -> {
|
|
||||||
device.actionDescriptors.map { descriptor ->
|
|
||||||
MessageData {
|
|
||||||
name = descriptor.name
|
|
||||||
value = MetaItem.NodeItem(descriptor.config)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
else -> {
|
|
||||||
error("Unrecognized action $action")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
DeviceMessage.ok {
|
|
||||||
source = deviceTarget
|
|
||||||
target = request.source
|
|
||||||
data = result
|
|
||||||
}
|
|
||||||
} catch (ex: Exception) {
|
|
||||||
DeviceMessage.fail {
|
|
||||||
comment = ex.message
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
override fun consume(message: Envelope) {
|
override fun consume(message: Envelope) {
|
||||||
// Fire the respond procedure and forget about the result
|
// Fire the respond procedure and forget about the result
|
||||||
scope.launch {
|
scope.launch {
|
||||||
@ -112,10 +40,38 @@ class MessageController(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
suspend fun respondMessage(message: DeviceMessage): DeviceMessage {
|
||||||
|
return try {
|
||||||
|
device.respondMessage(message).apply {
|
||||||
|
target = message.source
|
||||||
|
source = deviceTarget
|
||||||
|
}
|
||||||
|
} catch (ex: Exception) {
|
||||||
|
DeviceMessage.fail {
|
||||||
|
comment = ex.message
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
override suspend fun respond(request: Envelope): Envelope {
|
override suspend fun respond(request: Envelope): Envelope {
|
||||||
val requestMessage = DeviceMessage.wrap(request.meta)
|
val target = request.meta["target"].string
|
||||||
val responseMessage = respondMessage(requestMessage)
|
return try {
|
||||||
return SimpleEnvelope(responseMessage.toMeta(), Binary.EMPTY)
|
if (request.data == null) {
|
||||||
|
respondMessage(DeviceMessage.wrap(request.meta)).wrap()
|
||||||
|
}else if(target != null && target != deviceTarget) {
|
||||||
|
error("Wrong target name $deviceTarget expected but ${target} found")
|
||||||
|
} else {
|
||||||
|
val response = device.respond(request)
|
||||||
|
return SimpleEnvelope(response.meta.edit {
|
||||||
|
"target" put request.meta["source"].string
|
||||||
|
"source" put deviceTarget
|
||||||
|
}, response.data)
|
||||||
|
}
|
||||||
|
} catch (ex: Exception) {
|
||||||
|
DeviceMessage.fail {
|
||||||
|
comment = ex.message
|
||||||
|
}.wrap()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun propertyChanged(propertyName: String, value: MetaItem<*>?) {
|
override fun propertyChanged(propertyName: String, value: MetaItem<*>?) {
|
||||||
@ -124,7 +80,7 @@ class MessageController(
|
|||||||
val change = DeviceMessage.ok {
|
val change = DeviceMessage.ok {
|
||||||
this.source = deviceTarget
|
this.source = deviceTarget
|
||||||
type = PROPERTY_CHANGED_ACTION
|
type = PROPERTY_CHANGED_ACTION
|
||||||
property {
|
data {
|
||||||
name = propertyName
|
name = propertyName
|
||||||
this.value = value
|
this.value = value
|
||||||
}
|
}
|
||||||
|
@ -7,7 +7,7 @@ import hep.dataforge.control.controllers.DeviceMessage
|
|||||||
import hep.dataforge.control.controllers.MessageController
|
import hep.dataforge.control.controllers.MessageController
|
||||||
import hep.dataforge.control.controllers.MessageController.Companion.GET_PROPERTY_ACTION
|
import hep.dataforge.control.controllers.MessageController.Companion.GET_PROPERTY_ACTION
|
||||||
import hep.dataforge.control.controllers.MessageController.Companion.SET_PROPERTY_ACTION
|
import hep.dataforge.control.controllers.MessageController.Companion.SET_PROPERTY_ACTION
|
||||||
import hep.dataforge.control.controllers.property
|
import hep.dataforge.control.controllers.data
|
||||||
import hep.dataforge.meta.toJson
|
import hep.dataforge.meta.toJson
|
||||||
import hep.dataforge.meta.toMeta
|
import hep.dataforge.meta.toMeta
|
||||||
import hep.dataforge.meta.toMetaItem
|
import hep.dataforge.meta.toMetaItem
|
||||||
@ -103,7 +103,7 @@ private suspend fun ApplicationCall.getProperty(target: MessageController) {
|
|||||||
type = GET_PROPERTY_ACTION
|
type = GET_PROPERTY_ACTION
|
||||||
source = WEB_SERVER_TARGET
|
source = WEB_SERVER_TARGET
|
||||||
this.target = target.deviceTarget
|
this.target = target.deviceTarget
|
||||||
property {
|
data {
|
||||||
name = property
|
name = property
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -121,7 +121,7 @@ private suspend fun ApplicationCall.setProperty(target: MessageController) {
|
|||||||
type = SET_PROPERTY_ACTION
|
type = SET_PROPERTY_ACTION
|
||||||
source = WEB_SERVER_TARGET
|
source = WEB_SERVER_TARGET
|
||||||
this.target = target.deviceTarget
|
this.target = target.deviceTarget
|
||||||
property {
|
data {
|
||||||
name = property
|
name = property
|
||||||
value = json.toMetaItem()
|
value = json.toMetaItem()
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user