Another API refactoring

This commit is contained in:
Alexander Nozik 2020-08-29 15:28:34 +03:00
parent 6500d5a05e
commit 20079e62da
12 changed files with 232 additions and 168 deletions

View File

@ -1,8 +1,8 @@
package hep.dataforge.control.client
import hep.dataforge.control.api.respondMessage
import hep.dataforge.control.controllers.DeviceManager
import hep.dataforge.control.controllers.DeviceMessage
import hep.dataforge.control.controllers.respondMessage
import hep.dataforge.meta.toJson
import hep.dataforge.meta.toMeta
import hep.dataforge.meta.wrap

View File

@ -1,12 +1,10 @@
package hep.dataforge.control.api
import hep.dataforge.control.api.Device.Companion.DEVICE_TARGET
import hep.dataforge.control.controllers.DeviceMessage
import hep.dataforge.control.controllers.MessageData
import hep.dataforge.control.controllers.wrap
import hep.dataforge.io.Envelope
import hep.dataforge.io.EnvelopeBuilder
import hep.dataforge.meta.*
import hep.dataforge.meta.Meta
import hep.dataforge.meta.MetaItem
import hep.dataforge.provider.Type
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.cancel
@ -20,7 +18,7 @@ interface Consumer {
* General interface describing a managed Device
*/
@Type(DEVICE_TARGET)
interface Device: Closeable{
interface Device : Closeable {
/**
* List of supported property descriptors
*/
@ -70,119 +68,23 @@ interface Device: Closeable{
* Send an action request and suspend caller while request is being processed.
* Could return null if request does not return a meaningful answer.
*/
suspend fun execute(action: String, argument: MetaItem<*>? = null): MetaItem<*>?
suspend fun execute(command: String, argument: MetaItem<*>? = null): MetaItem<*>?
/**
*
* A request with binary data or for binary response (or both). This request does not cover basic functionality like
* [setProperty], [getProperty] or [execute] and not defined for a generic device.
*
* TODO implement Responder after DF 0.1.9
*/
suspend fun respondWithData(request: Envelope): EnvelopeBuilder = error("Respond with data not implemented")
suspend fun respond(request: Envelope): EnvelopeBuilder = error("No binary response defined")
override fun close() {
scope.cancel("The device is closed")
}
companion object{
companion object {
const val DEVICE_TARGET = "device"
const val GET_PROPERTY_ACTION = "read"
const val SET_PROPERTY_ACTION = "write"
const val EXECUTE_ACTION = "execute"
const val PROPERTY_LIST_ACTION = "propertyList"
const val ACTION_LIST_ACTION = "actionList"
internal suspend fun respond(device: Device, deviceTarget: String, request: Envelope): Envelope {
val target = request.meta["target"].string
return try {
if (request.data == null) {
respondMessage(device, deviceTarget, DeviceMessage.wrap(request.meta)).wrap()
} else if (target != null && target != deviceTarget) {
error("Wrong target name $deviceTarget expected but $target found")
} else {
val response = device.respondWithData(request).apply {
meta {
"target" put request.meta["source"].string
"source" put deviceTarget
}
}
return response.build()
}
} catch (ex: Exception) {
DeviceMessage.fail {
comment = ex.message
}.wrap()
}
}
internal suspend fun respondMessage(
device: Device,
deviceTarget: String,
request: DeviceMessage
): DeviceMessage {
return try {
val result: List<MessageData> = when (val action = request.type) {
GET_PROPERTY_ACTION -> {
request.data.map { property ->
MessageData {
name = property.name
value = device.getProperty(name)
}
}
}
SET_PROPERTY_ACTION -> {
request.data.map { property ->
val propertyName: String = property.name
val propertyValue = property.value
if (propertyValue == null) {
device.invalidateProperty(propertyName)
} else {
device.setProperty(propertyName, propertyValue)
}
MessageData {
name = propertyName
value = device.getProperty(propertyName)
}
}
}
EXECUTE_ACTION -> {
request.data.map { payload ->
MessageData {
name = payload.name
value = device.execute(payload.name, payload.value)
}
}
}
PROPERTY_LIST_ACTION -> {
device.propertyDescriptors.map { descriptor ->
MessageData {
name = descriptor.name
value = MetaItem.NodeItem(descriptor.config)
}
}
}
ACTION_LIST_ACTION -> {
device.actionDescriptors.map { descriptor ->
MessageData {
name = descriptor.name
value = MetaItem.NodeItem(descriptor.config)
}
}
}
else -> {
error("Unrecognized action $action")
}
}
DeviceMessage.ok {
target = request.source
source = deviceTarget
data = result
}
} catch (ex: Exception) {
DeviceMessage.fail {
comment = ex.message
}
}
}
}
}

View File

@ -1,10 +1,6 @@
package hep.dataforge.control.api
import hep.dataforge.control.controllers.DeviceMessage
import hep.dataforge.io.Envelope
import hep.dataforge.meta.MetaItem
import hep.dataforge.meta.get
import hep.dataforge.meta.string
import hep.dataforge.names.Name
import hep.dataforge.names.toName
import hep.dataforge.provider.Provider
@ -47,21 +43,10 @@ suspend fun DeviceHub.setProperty(deviceName: Name, propertyName: String, value:
suspend fun DeviceHub.execute(deviceName: Name, command: String, argument: MetaItem<*>?): MetaItem<*>? =
this[deviceName].execute(command, argument)
suspend fun DeviceHub.respondMessage(request: DeviceMessage): DeviceMessage {
return try {
val targetName = request.target?.toName() ?: Name.EMPTY
val device = this[targetName]
Device.respondMessage(device, targetName.toString(), request)
} catch (ex: Exception) {
DeviceMessage.fail {
comment = ex.message
}
}
}
suspend fun DeviceHub.respond(request: Envelope): Envelope {
val target = request.meta[DeviceMessage.TARGET_KEY].string ?: defaultTarget
val device = this[target.toName()]
return Device.respond(device, target, request)
}
//suspend fun DeviceHub.respond(request: Envelope): EnvelopeBuilder {
// val target = request.meta[DeviceMessage.TARGET_KEY].string ?: defaultTarget
// val device = this[target.toName()]
//
// return device.respond(device, target, request)
//}

View File

@ -8,7 +8,7 @@ import hep.dataforge.meta.MetaItem
*/
interface DeviceListener {
fun propertyChanged(propertyName: String, value: MetaItem<*>?)
fun actionExecuted(action:String, argument: MetaItem<*>?, result: MetaItem<*>?)
fun actionExecuted(action: String, argument: MetaItem<*>?, result: MetaItem<*>?) {}
//TODO add general message listener method
}

View File

@ -13,50 +13,60 @@ interface Action {
suspend operator fun invoke(arg: MetaItem<*>? = null): MetaItem<*>?
}
class SimpleAction(
private fun DeviceBase.actionExecuted(action: String, argument: MetaItem<*>?, result: MetaItem<*>?){
notifyListeners { actionExecuted(action, argument, result) }
}
/**
* A stand-alone action
*/
class IsolatedAction(
override val name: String,
override val descriptor: ActionDescriptor,
val callback: (action: String, argument: MetaItem<*>?, result: MetaItem<*>?) -> Unit,
val block: suspend (MetaItem<*>?) -> MetaItem<*>?
) : Action {
override suspend fun invoke(arg: MetaItem<*>?): MetaItem<*>? = block(arg)
override suspend fun invoke(arg: MetaItem<*>?): MetaItem<*>? = block(arg).also {
callback(name, arg, it)
}
}
class ActionDelegate<D : DeviceBase>(
val owner: D,
val descriptorBuilder: ActionDescriptor.()->Unit = {},
val descriptorBuilder: ActionDescriptor.() -> Unit = {},
val block: suspend (MetaItem<*>?) -> MetaItem<*>?
) : ReadOnlyProperty<D, Action> {
override fun getValue(thisRef: D, property: KProperty<*>): Action {
val name = property.name
return owner.registerAction(name) {
SimpleAction(name, ActionDescriptor(name).apply(descriptorBuilder), block)
IsolatedAction(name, ActionDescriptor(name).apply(descriptorBuilder), owner::actionExecuted, block)
}
}
}
fun <D : DeviceBase> D.request(
descriptorBuilder: ActionDescriptor.()->Unit = {},
descriptorBuilder: ActionDescriptor.() -> Unit = {},
block: suspend (MetaItem<*>?) -> MetaItem<*>?
): ActionDelegate<D> = ActionDelegate(this, descriptorBuilder, block)
fun <D : DeviceBase> D.requestValue(
descriptorBuilder: ActionDescriptor.()->Unit = {},
descriptorBuilder: ActionDescriptor.() -> Unit = {},
block: suspend (MetaItem<*>?) -> Any?
): ActionDelegate<D> = ActionDelegate(this, descriptorBuilder){
): ActionDelegate<D> = ActionDelegate(this, descriptorBuilder) {
val res = block(it)
MetaItem.ValueItem(Value.of(res))
}
fun <D : DeviceBase> D.requestMeta(
descriptorBuilder: ActionDescriptor.()->Unit = {},
descriptorBuilder: ActionDescriptor.() -> Unit = {},
block: suspend MetaBuilder.(MetaItem<*>?) -> Unit
): ActionDelegate<D> = ActionDelegate(this, descriptorBuilder){
val res = MetaBuilder().apply { block(it)}
): ActionDelegate<D> = ActionDelegate(this, descriptorBuilder) {
val res = MetaBuilder().apply { block(it) }
MetaItem.NodeItem(res)
}
fun <D : DeviceBase> D.action(
descriptorBuilder: ActionDescriptor.()->Unit = {},
descriptorBuilder: ActionDescriptor.() -> Unit = {},
block: suspend (MetaItem<*>?) -> Unit
): ActionDelegate<D> = ActionDelegate(this, descriptorBuilder) {
block(it)

View File

@ -5,6 +5,7 @@ import hep.dataforge.control.api.Device
import hep.dataforge.control.api.DeviceListener
import hep.dataforge.control.api.PropertyDescriptor
import hep.dataforge.meta.MetaItem
import kotlinx.coroutines.launch
/**
* Baseline implementation of [Device] interface
@ -23,8 +24,15 @@ abstract class DeviceBase : Device {
listeners.removeAll { it.first == owner }
}
internal fun propertyChanged(propertyName: String, value: MetaItem<*>?) {
listeners.forEach { it.second.propertyChanged(propertyName, value) }
fun notifyListeners(block: DeviceListener.() -> Unit) {
listeners.forEach { it.second.block() }
}
fun notifyPropertyChanged(propertyName: String) {
scope.launch {
val value = getProperty(propertyName)
notifyListeners { propertyChanged(propertyName, value) }
}
}
override val propertyDescriptors: Collection<PropertyDescriptor>
@ -54,8 +62,8 @@ abstract class DeviceBase : Device {
)
}
override suspend fun execute(action: String, argument: MetaItem<*>?): MetaItem<*>? =
(actions[action] ?: error("Request with name $action not defined")).invoke(argument)
override suspend fun execute(command: String, argument: MetaItem<*>?): MetaItem<*>? =
(actions[command] ?: error("Request with name $command not defined")).invoke(argument)
companion object {

View File

@ -18,6 +18,10 @@ import kotlinx.coroutines.withContext
import kotlin.properties.ReadOnlyProperty
import kotlin.reflect.KProperty
private fun DeviceBase.propertyChanged(name: String, item: MetaItem<*>?){
notifyListeners { propertyChanged(name, item) }
}
/**
* A stand-alone [ReadOnlyDeviceProperty] implementation not directly attached to a device
*/
@ -27,7 +31,7 @@ open class IsolatedReadOnlyDeviceProperty(
default: MetaItem<*>?,
override val descriptor: PropertyDescriptor,
override val scope: CoroutineScope,
private val updateCallback: (name: String, item: MetaItem<*>) -> Unit,
private val callback: (name: String, item: MetaItem<*>) -> Unit,
private val getter: suspend (before: MetaItem<*>?) -> MetaItem<*>
) : ReadOnlyDeviceProperty {
@ -40,7 +44,7 @@ open class IsolatedReadOnlyDeviceProperty(
protected fun update(item: MetaItem<*>) {
state.value = item
updateCallback(name, item)
callback(name, item)
}
override suspend fun read(force: Boolean): MetaItem<*> {
@ -62,6 +66,22 @@ open class IsolatedReadOnlyDeviceProperty(
override fun flow(): StateFlow<MetaItem<*>?> = state
}
fun DeviceBase.readOnlyProperty(
name: String,
default: MetaItem<*>?,
descriptorBuilder: PropertyDescriptor.() -> Unit = {},
getter: suspend (MetaItem<*>?) -> MetaItem<*>
): ReadOnlyDeviceProperty = registerProperty(name) {
IsolatedReadOnlyDeviceProperty(
name,
default,
PropertyDescriptor(name).apply(descriptorBuilder),
scope,
::propertyChanged,
getter
)
}
private class ReadOnlyDevicePropertyDelegate<D : DeviceBase>(
val owner: D,
val default: MetaItem<*>?,
@ -176,6 +196,24 @@ class IsolatedDeviceProperty(
}
}
fun DeviceBase.mutableProperty(
name: String,
default: MetaItem<*>?,
descriptorBuilder: PropertyDescriptor.() -> Unit = {},
getter: suspend (MetaItem<*>?) -> MetaItem<*>,
setter: suspend (oldValue: MetaItem<*>?, newValue: MetaItem<*>) -> MetaItem<*>?
): ReadOnlyDeviceProperty = registerProperty(name) {
IsolatedDeviceProperty(
name,
default,
PropertyDescriptor(name).apply(descriptorBuilder),
scope,
::propertyChanged,
getter,
setter
)
}
private class DevicePropertyDelegate<D : DeviceBase>(
val owner: D,
val default: MetaItem<*>?,

View File

@ -1,13 +1,16 @@
package hep.dataforge.control.controllers
import hep.dataforge.control.api.Consumer
import hep.dataforge.control.api.Device
import hep.dataforge.control.api.DeviceListener
import hep.dataforge.control.api.*
import hep.dataforge.control.controllers.DeviceMessage.Companion.PROPERTY_CHANGED_ACTION
import hep.dataforge.io.Envelope
import hep.dataforge.io.Responder
import hep.dataforge.io.SimpleEnvelope
import hep.dataforge.meta.MetaItem
import hep.dataforge.meta.get
import hep.dataforge.meta.string
import hep.dataforge.meta.wrap
import hep.dataforge.names.Name
import hep.dataforge.names.toName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.consumeAsFlow
@ -26,13 +29,9 @@ class DeviceController(
private val outputChannel = Channel<Envelope>(Channel.CONFLATED)
suspend fun respondMessage(message: DeviceMessage): DeviceMessage {
return Device.respondMessage(device, deviceTarget, message)
}
suspend fun respondMessage(message: DeviceMessage): DeviceMessage = respondMessage(device, deviceTarget, message)
override suspend fun respond(request: Envelope): Envelope {
return Device.respond(device, deviceTarget, request)
}
override suspend fun respond(request: Envelope): Envelope = respond(device, deviceTarget, request)
override fun propertyChanged(propertyName: String, value: MetaItem<*>?) {
if (value == null) return
@ -61,6 +60,116 @@ class DeviceController(
}
companion object {
const val GET_PROPERTY_ACTION = "read"
const val SET_PROPERTY_ACTION = "write"
const val EXECUTE_ACTION = "execute"
const val PROPERTY_LIST_ACTION = "propertyList"
const val ACTION_LIST_ACTION = "actionList"
internal suspend fun respond(device: Device, deviceTarget: String, request: Envelope): Envelope {
val target = request.meta["target"].string
return try {
if (request.data == null) {
respondMessage(device, deviceTarget, DeviceMessage.wrap(request.meta)).wrap()
} else if (target != null && target != deviceTarget) {
error("Wrong target name $deviceTarget expected but $target found")
} else {
val response = device.respond(request).apply {
meta {
"target" put request.meta["source"].string
"source" put deviceTarget
}
}
return response.build()
}
} catch (ex: Exception) {
DeviceMessage.fail {
comment = ex.message
}.wrap()
}
}
internal suspend fun respondMessage(
device: Device,
deviceTarget: String,
request: DeviceMessage
): DeviceMessage {
return try {
val result: List<MessageData> = when (val action = request.type) {
GET_PROPERTY_ACTION -> {
request.data.map { property ->
MessageData {
name = property.name
value = device.getProperty(name)
}
}
}
SET_PROPERTY_ACTION -> {
request.data.map { property ->
val propertyName: String = property.name
val propertyValue = property.value
if (propertyValue == null) {
device.invalidateProperty(propertyName)
} else {
device.setProperty(propertyName, propertyValue)
}
MessageData {
name = propertyName
value = device.getProperty(propertyName)
}
}
}
EXECUTE_ACTION -> {
request.data.map { payload ->
MessageData {
name = payload.name
value = device.execute(payload.name, payload.value)
}
}
}
PROPERTY_LIST_ACTION -> {
device.propertyDescriptors.map { descriptor ->
MessageData {
name = descriptor.name
value = MetaItem.NodeItem(descriptor.config)
}
}
}
ACTION_LIST_ACTION -> {
device.actionDescriptors.map { descriptor ->
MessageData {
name = descriptor.name
value = MetaItem.NodeItem(descriptor.config)
}
}
}
else -> {
error("Unrecognized action $action")
}
}
DeviceMessage.ok {
target = request.source
source = deviceTarget
data = result
}
} catch (ex: Exception) {
DeviceMessage.fail {
comment = ex.message
}
}
}
}
}
suspend fun DeviceHub.respondMessage(request: DeviceMessage): DeviceMessage {
return try {
val targetName = request.target?.toName() ?: Name.EMPTY
val device = this[targetName]
DeviceController.respondMessage(device, targetName.toString(), request)
} catch (ex: Exception) {
DeviceMessage.fail {
comment = ex.message
}
}
}

View File

@ -1,6 +1,6 @@
package hep.dataforge.control.controllers
import hep.dataforge.control.api.Device.Companion.GET_PROPERTY_ACTION
import hep.dataforge.control.controllers.DeviceController.Companion.GET_PROPERTY_ACTION
import hep.dataforge.io.SimpleEnvelope
import hep.dataforge.meta.*
import hep.dataforge.names.asName

View File

@ -1,6 +1,9 @@
package hep.dataforge.control.controllers
import hep.dataforge.control.api.*
import hep.dataforge.control.api.Consumer
import hep.dataforge.control.api.DeviceHub
import hep.dataforge.control.api.DeviceListener
import hep.dataforge.control.api.get
import hep.dataforge.io.Envelope
import hep.dataforge.io.Responder
import hep.dataforge.meta.MetaItem
@ -60,7 +63,7 @@ class HubController(
suspend fun respondMessage(message: DeviceMessage): DeviceMessage = try {
val targetName = message.target?.toName() ?: Name.EMPTY
val device = hub[targetName]
Device.respondMessage(device, targetName.toString(), message)
DeviceController.respondMessage(device, targetName.toString(), message)
} catch (ex: Exception) {
DeviceMessage.fail {
comment = ex.message
@ -71,9 +74,9 @@ class HubController(
val targetName = request.meta[DeviceMessage.TARGET_KEY].string?.toName() ?: Name.EMPTY
val device = hub[targetName]
if (request.data == null) {
Device.respondMessage(device, targetName.toString(), DeviceMessage.wrap(request.meta)).wrap()
DeviceController.respondMessage(device, targetName.toString(), DeviceMessage.wrap(request.meta)).wrap()
} else {
Device.respond(device, targetName.toString(), request)
DeviceController.respond(device, targetName.toString(), request)
}
} catch (ex: Exception) {
DeviceMessage.fail {

View File

@ -2,7 +2,7 @@ package hep.dataforge.control.ports
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.cancel
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.receiveAsFlow
@ -16,7 +16,12 @@ abstract class Port(val scope: CoroutineScope) : Closeable {
private val outgoing = Channel<ByteArray>(100)
private val incoming = Channel<ByteArray>(Channel.CONFLATED)
//val receiveChannel: ReceiveChannel<ByteArray> get() = incoming
init {
scope.coroutineContext[Job]?.invokeOnCompletion {
close()
}
}
/**
* Internal method to synchronously send data
@ -45,6 +50,9 @@ abstract class Port(val scope: CoroutineScope) : Closeable {
}
}
/**
* Send a data packet via the port
*/
suspend fun send(data: ByteArray) {
outgoing.send(data)
}
@ -59,9 +67,9 @@ abstract class Port(val scope: CoroutineScope) : Closeable {
}
override fun close() {
scope.cancel("The port is closed")
outgoing.close()
incoming.close()
sendJob.cancel()
}
}

View File

@ -2,13 +2,14 @@
package hep.dataforge.control.server
import hep.dataforge.control.api.Device.Companion.GET_PROPERTY_ACTION
import hep.dataforge.control.api.Device.Companion.SET_PROPERTY_ACTION
import hep.dataforge.control.api.get
import hep.dataforge.control.api.respondMessage
import hep.dataforge.control.controllers.DeviceController.Companion.GET_PROPERTY_ACTION
import hep.dataforge.control.controllers.DeviceController.Companion.SET_PROPERTY_ACTION
import hep.dataforge.control.controllers.DeviceManager
import hep.dataforge.control.controllers.DeviceMessage
import hep.dataforge.control.controllers.data
import hep.dataforge.control.controllers.respondMessage
import hep.dataforge.meta.toJson
import hep.dataforge.meta.toMeta
import hep.dataforge.meta.toMetaItem