Basic design for external connectors

This commit is contained in:
Alexander Nozik 2020-03-02 19:17:34 +03:00
parent 69074fef9e
commit 9ca10930b8
14 changed files with 270 additions and 65 deletions

View File

@ -10,7 +10,7 @@ kotlin {
sourceSets { sourceSets {
commonMain{ commonMain{
dependencies { dependencies {
api("hep.dataforge:dataforge-context:$dataforgeVersion") api("hep.dataforge:dataforge-io:$dataforgeVersion")
} }
} }
} }

View File

@ -6,9 +6,9 @@ import hep.dataforge.meta.scheme.SchemeSpec
/** /**
* A descriptor for property * A descriptor for property
*/ */
class RequestDescriptor : Scheme() { class ActionDescriptor : Scheme() {
//var name by string { error("Property name is mandatory") } //var name by string { error("Property name is mandatory") }
//var descriptor by spec(ItemDescriptor) //var descriptor by spec(ItemDescriptor)
companion object : SchemeSpec<RequestDescriptor>(::RequestDescriptor) companion object : SchemeSpec<ActionDescriptor>(::ActionDescriptor)
} }

View File

@ -1,5 +1,6 @@
package hep.dataforge.control.api package hep.dataforge.control.api
import hep.dataforge.meta.Meta
import hep.dataforge.meta.MetaItem import hep.dataforge.meta.MetaItem
import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.CoroutineScope
@ -12,20 +13,25 @@ interface Device {
/** /**
* List of supported requests descriptors * List of supported requests descriptors
*/ */
val requestDescriptors: Collection<RequestDescriptor> val actionDescriptors: Collection<ActionDescriptor>
/** /**
* The scope encompassing all operations on a device. When canceled, cancels all running processes * The scope encompassing all operations on a device. When canceled, cancels all running processes
*/ */
val scope: CoroutineScope val scope: CoroutineScope
var controller: PropertyChangeListener? var listener: PropertyChangeListener?
/** /**
* Get the value of the property or throw error if property in not defined. Suspend if property value is not available * Get the value of the property or throw error if property in not defined. Suspend if property value is not available
*/ */
suspend fun getProperty(propertyName: String): MetaItem<*> suspend fun getProperty(propertyName: String): MetaItem<*>
/**
* Invalidate property and force recalculate
*/
suspend fun invalidateProperty(propertyName: String)
/** /**
* Set property [value] for a property with name [propertyName]. * Set property [value] for a property with name [propertyName].
* In rare cases could suspend if the [Device] supports command queue and it is full at the moment. * In rare cases could suspend if the [Device] supports command queue and it is full at the moment.
@ -36,5 +42,10 @@ interface Device {
* Send a request and suspend caller while request is being processed. * Send a request and suspend caller while request is being processed.
* Could return null if request does not return meaningful answer. * Could return null if request does not return meaningful answer.
*/ */
suspend fun request(name: String, argument: MetaItem<*>? = null): MetaItem<*>? suspend fun action(name: String, argument: Meta? = null): Meta?
companion object {
const val GET_PROPERTY_ACTION = "@getProperty"
const val SET_PROPERTY_ACTION = "@setProperty"
}
} }

View File

@ -20,4 +20,4 @@ suspend fun DeviceHub.setProperty(deviceName: String, propertyName: String, valu
suspend fun DeviceHub.request(deviceName: String, command: String, argument: MetaItem<*>?): MetaItem<*>? = suspend fun DeviceHub.request(deviceName: String, command: String, argument: MetaItem<*>?): MetaItem<*>? =
(getDevice(deviceName) ?: error("Device with name $deviceName not found in the hub")) (getDevice(deviceName) ?: error("Device with name $deviceName not found in the hub"))
.request(command, argument) .action(command, argument)

View File

@ -0,0 +1,18 @@
package hep.dataforge.control.base
import hep.dataforge.control.api.ActionDescriptor
import hep.dataforge.meta.Meta
interface Action {
val name: String
val descriptor: ActionDescriptor
suspend operator fun invoke(arg: Meta?): Meta?
}
class SimpleAction(
override val name: String,
override val descriptor: ActionDescriptor,
val block: suspend (Meta?)->Meta?
): Action{
override suspend fun invoke(arg: Meta?): Meta? = block(arg)
}

View File

@ -1,59 +1,66 @@
package hep.dataforge.control.base package hep.dataforge.control.base
import hep.dataforge.control.api.ActionDescriptor
import hep.dataforge.control.api.Device import hep.dataforge.control.api.Device
import hep.dataforge.control.api.PropertyChangeListener import hep.dataforge.control.api.PropertyChangeListener
import hep.dataforge.control.api.PropertyDescriptor import hep.dataforge.control.api.PropertyDescriptor
import hep.dataforge.control.api.RequestDescriptor import hep.dataforge.meta.Meta
import hep.dataforge.meta.MetaItem import hep.dataforge.meta.MetaItem
import kotlin.jvm.JvmStatic import kotlin.jvm.JvmStatic
import kotlin.reflect.KProperty import kotlin.reflect.KProperty
abstract class DeviceBase : Device, PropertyChangeListener { abstract class DeviceBase : Device, PropertyChangeListener {
private val properties = HashMap<String, ReadOnlyProperty>() private val properties = HashMap<String, ReadOnlyProperty>()
private val requests = HashMap<String, Request>() private val actions = HashMap<String, Action>()
override var controller: PropertyChangeListener? = null override var listener: PropertyChangeListener? = null
override fun propertyChanged(propertyName: String, value: MetaItem<*>) { override fun propertyChanged(propertyName: String, value: MetaItem<*>) {
controller?.propertyChanged(propertyName, value) listener?.propertyChanged(propertyName, value)
} }
override val propertyDescriptors: Collection<PropertyDescriptor> override val propertyDescriptors: Collection<PropertyDescriptor>
get() = properties.values.map { it.descriptor } get() = properties.values.map { it.descriptor }
override val requestDescriptors: Collection<RequestDescriptor> override val actionDescriptors: Collection<ActionDescriptor>
get() = requests.values.map { it.descriptor } get() = actions.values.map { it.descriptor }
fun <P : ReadOnlyProperty> initProperty(prop: P): P { fun <P : ReadOnlyProperty> initProperty(prop: P): P {
properties[prop.name] = prop properties[prop.name] = prop
return prop return prop
} }
fun initRequest(request: Request): Request { fun initRequest(Action: Action): Action {
requests[request.name] = request actions[Action.name] = Action
return request return Action
} }
protected fun initRequest( protected fun initRequest(
name: String, name: String,
descriptor: RequestDescriptor = RequestDescriptor.empty(), descriptor: ActionDescriptor = ActionDescriptor.empty(),
block: suspend (MetaItem<*>?) -> MetaItem<*>? block: suspend (MetaItem<*>?) -> MetaItem<*>?
): Request { ): Action {
val request = SimpleRequest(name, descriptor, block) val request = SimpleAction(name, descriptor, block)
return initRequest(request) return initRequest(request)
} }
override suspend fun getProperty(propertyName: String): MetaItem<*> = override suspend fun getProperty(propertyName: String): MetaItem<*> =
(properties[propertyName] ?: error("Property with name $propertyName not defined")).read() (properties[propertyName] ?: error("Property with name $propertyName not defined")).read()
override suspend fun invalidateProperty(propertyName: String) {
(properties[propertyName] ?: error("Property with name $propertyName not defined")).invalidate()
}
override suspend fun setProperty(propertyName: String, value: MetaItem<*>) { override suspend fun setProperty(propertyName: String, value: MetaItem<*>) {
(properties[propertyName] as? Property ?: error("Property with name $propertyName not defined")).write(value) (properties[propertyName] as? Property ?: error("Property with name $propertyName not defined")).write(value)
} }
override suspend fun request(name: String, argument: MetaItem<*>?): MetaItem<*>? = override suspend fun action(name: String, argument: Meta?): Meta? =
(requests[name] ?: error("Request with name $name not defined")).invoke(argument) (actions[name] ?: error("Request with name $name not defined")).invoke(argument)
companion object { companion object {
@JvmStatic @JvmStatic
protected fun <D : DeviceBase, P : ReadOnlyProperty> D.initProperty( protected fun <D : DeviceBase, P : ReadOnlyProperty> D.initProperty(
name: String, name: String,
@ -82,6 +89,7 @@ fun <D : DeviceBase, T : Any> D.property(
return PropertyDelegateProvider(this, builder) return PropertyDelegateProvider(this, builder)
} }
//TODO try to use 'property' with new inference
fun <D : DeviceBase, T : Any> D.mutableProperty( fun <D : DeviceBase, T : Any> D.mutableProperty(
builder: PropertyBuilder<D>.() -> GenericProperty<D, T> builder: PropertyBuilder<D>.() -> GenericProperty<D, T>
): PropertyDelegateProvider<D, T, GenericProperty<D, T>> { ): PropertyDelegateProvider<D, T, GenericProperty<D, T>> {

View File

@ -10,7 +10,7 @@ import kotlinx.coroutines.withContext
import kotlin.properties.ReadWriteProperty import kotlin.properties.ReadWriteProperty
import kotlin.reflect.KProperty import kotlin.reflect.KProperty
open class GenericReadOnlyProperty<D: DeviceBase, T : Any>( open class GenericReadOnlyProperty<D : DeviceBase, T : Any>(
override val name: String, override val name: String,
override val descriptor: PropertyDescriptor, override val descriptor: PropertyDescriptor,
override val owner: D, override val owner: D,
@ -26,26 +26,32 @@ open class GenericReadOnlyProperty<D: DeviceBase, T : Any>(
owner.propertyChanged(name, converter.objectToMetaItem(value)) owner.propertyChanged(name, converter.objectToMetaItem(value))
} }
suspend fun readValue(): T = override suspend fun invalidate() {
value ?: withContext(owner.scope.coroutineContext) { mutex.withLock { value = null }
}
suspend fun readValue(force: Boolean = false): T {
if (force) invalidate()
return value ?: withContext(owner.scope.coroutineContext) {
//all device operations should be run on device context //all device operations should be run on device context
owner.getter().also { updateValue(it) } owner.getter().also { updateValue(it) }
} }
}
fun peekValue(): T? = value fun peekValue(): T? = value
suspend fun update(item: MetaItem<*>) { override suspend fun update(item: MetaItem<*>) {
updateValue(converter.itemToObject(item)) updateValue(converter.itemToObject(item))
} }
override suspend fun read(): MetaItem<*> = converter.objectToMetaItem(readValue()) override suspend fun read(force: Boolean): MetaItem<*> = converter.objectToMetaItem(readValue(force))
override fun peek(): MetaItem<*>? = value?.let { converter.objectToMetaItem(it) } override fun peek(): MetaItem<*>? = value?.let { converter.objectToMetaItem(it) }
override fun getValue(thisRef: Any?, property: KProperty<*>): T? = peekValue() override fun getValue(thisRef: Any?, property: KProperty<*>): T? = peekValue()
} }
class GenericProperty<D: DeviceBase, T : Any>( class GenericProperty<D : DeviceBase, T : Any>(
name: String, name: String,
descriptor: PropertyDescriptor, descriptor: PropertyDescriptor,
owner: D, owner: D,
@ -63,10 +69,6 @@ class GenericProperty<D: DeviceBase, T : Any>(
} }
} }
override suspend fun invalidate() {
mutex.withLock { value = null }
}
override suspend fun write(item: MetaItem<*>) { override suspend fun write(item: MetaItem<*>) {
writeValue(converter.itemToObject(item)) writeValue(converter.itemToObject(item))
} }

View File

@ -20,6 +20,16 @@ interface ReadOnlyProperty {
*/ */
val descriptor: PropertyDescriptor val descriptor: PropertyDescriptor
/**
* Erase logical value and force re-read from device on next [read]
*/
suspend fun invalidate()
/**
* Update property logical value and notify listener without writing it to device
*/
suspend fun update(item: MetaItem<*>)
/** /**
* Get cached value and return null if value is invalid * Get cached value and return null if value is invalid
*/ */
@ -28,7 +38,7 @@ interface ReadOnlyProperty {
/** /**
* Read value either from cache if cache is valid or directly from physical device * Read value either from cache if cache is valid or directly from physical device
*/ */
suspend fun read(): MetaItem<*> suspend fun read(force: Boolean = false): MetaItem<*>
} }
/** /**
@ -36,16 +46,6 @@ interface ReadOnlyProperty {
*/ */
interface Property : ReadOnlyProperty { interface Property : ReadOnlyProperty {
/**
* Update property logical value and notify listener without writing it to device
*/
suspend fun update(item: MetaItem<*>)
/**
* Erase logical value and force re-read from device on next [read]
*/
suspend fun invalidate()
/** /**
* Write value to physical device. Invalidates logical value, but does not update it automatically * Write value to physical device. Invalidates logical value, but does not update it automatically
*/ */

View File

@ -29,18 +29,28 @@ class PropertyBuilder<D : DeviceBase>(val name: String, val owner: D) {
/** /**
* Convert this read-only property to read-write property * Convert this read-only property to read-write property
*/ */
infix fun <T: Any> GenericReadOnlyProperty<D, T>.set(setter: (suspend D.(oldValue: T?, newValue: T) -> Unit)): GenericProperty<D,T> { infix fun <T : Any> GenericReadOnlyProperty<D, T>.set(setter: (suspend D.(oldValue: T?, newValue: T) -> Unit)): GenericProperty<D, T> {
return GenericProperty(name, descriptor, owner, converter, getter, setter) return GenericProperty(name, descriptor, owner, converter, getter, setter)
} }
/** /**
* Create read-write property with synchronized setter which updates value after set * Create read-write property with synchronized setter which updates value after set
*/ */
fun <T: Any> GenericReadOnlyProperty<D, T>.set(synchronousSetter: (suspend D.(oldValue: T?, newValue: T) -> T)): GenericProperty<D,T> { fun <T : Any> GenericReadOnlyProperty<D, T>.set(synchronousSetter: (suspend D.(oldValue: T?, newValue: T) -> T)): GenericProperty<D, T> {
val setter: suspend D.(oldValue: T?, newValue: T) -> Unit = { oldValue, newValue -> val setter: suspend D.(oldValue: T?, newValue: T) -> Unit = { oldValue, newValue ->
val result = synchronousSetter(oldValue, newValue) val result = synchronousSetter(oldValue, newValue)
updateValue(result) updateValue(result)
} }
return GenericProperty(name, descriptor, owner, converter, getter, setter) return GenericProperty(name, descriptor, owner, converter, getter, setter)
} }
/**
* Define a setter that does nothing for virtual property
*/
fun <T : Any> GenericReadOnlyProperty<D, T>.virtualSet(): GenericProperty<D, T> {
val setter: suspend D.(oldValue: T?, newValue: T) -> Unit = { oldValue, newValue ->
updateValue(newValue)
}
return GenericProperty(name, descriptor, owner, converter, getter, setter)
}
} }

View File

@ -1,18 +0,0 @@
package hep.dataforge.control.base
import hep.dataforge.control.api.RequestDescriptor
import hep.dataforge.meta.MetaItem
interface Request {
val name: String
val descriptor: RequestDescriptor
suspend operator fun invoke(arg: MetaItem<*>?): MetaItem<*>?
}
class SimpleRequest(
override val name: String,
override val descriptor: RequestDescriptor,
val block: suspend (MetaItem<*>?)->MetaItem<*>?
): Request{
override suspend fun invoke(arg: MetaItem<*>?): MetaItem<*>? = block(arg)
}

View File

@ -0,0 +1,55 @@
package hep.dataforge.control.controlers
import hep.dataforge.meta.Meta
import hep.dataforge.meta.scheme.*
class PropertyValue : Scheme() {
var name by string { error("Name property not defined") }
var value by item()
companion object : SchemeSpec<PropertyValue>(::PropertyValue)
}
open class DeviceMessage : Scheme() {
var id by item()
var source by string()//TODO consider replacing by item
var target by string()
var action by string()
var status by string()
companion object : SchemeSpec<DeviceMessage>(::DeviceMessage) {
const val MESSAGE_ACTION_KEY = "action"
const val MESSAGE_PROPERTY_NAME_KEY = "propertyName"
const val MESSAGE_VALUE_KEY = "value"
const val RESPONSE_OK_STATUS = "response.OK"
const val EVENT_STATUS = "event.propertyChange"
fun ok(request: DeviceMessage? = null, block: DeviceMessage.() -> Unit): Meta {
return DeviceMessage {
id = request?.id
status = RESPONSE_OK_STATUS
}.apply(block).toMeta()
}
}
}
class DevicePropertyMessage : DeviceMessage() {
//TODO add multiple properties in the same message
var property by spec(PropertyValue)
fun property(builder: PropertyValue.() -> Unit) {
this.property = PropertyValue.invoke(builder)
}
companion object : SchemeSpec<DevicePropertyMessage>(::DevicePropertyMessage) {
fun ok(request: DeviceMessage? = null, block: DevicePropertyMessage.() -> Unit): Meta {
return DevicePropertyMessage {
id = request?.id
property {
name
}
status = RESPONSE_OK_STATUS
}.apply(block).toMeta()
}
}
}

View File

@ -0,0 +1,60 @@
package hep.dataforge.control.controlers
import hep.dataforge.control.api.Device
import hep.dataforge.control.api.PropertyChangeListener
import hep.dataforge.control.controlers.DeviceMessage.Companion.EVENT_STATUS
import hep.dataforge.io.Envelope
import hep.dataforge.io.SimpleEnvelope
import hep.dataforge.meta.MetaItem
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.Channel.Factory.CONFLATED
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.launch
import kotlinx.io.Closeable
import kotlinx.io.EmptyBinary
class FlowController<D : Device>(val device: D, val target: String, val scope: CoroutineScope) : PropertyChangeListener,
Closeable {
init {
if (device.listener != null) error("Can't attach controller to $device, the controller is already attached")
device.listener = this
}
private val outputChannel = Channel<Envelope>(CONFLATED)
private val inputChannel = Channel<Envelope>(CONFLATED)
val input: SendChannel<Envelope> get() = inputChannel
val output = outputChannel.consumeAsFlow()
init {
scope.launch {
while (!inputChannel.isClosedForSend) {
val request = inputChannel.receive()
val response = device.respond(target, request)
outputChannel.send(response)
}
}
}
override fun propertyChanged(propertyName: String, value: MetaItem<*>) {
scope.launch {
val changeMeta = DevicePropertyMessage.ok {
this.source = target
status = EVENT_STATUS
property {
name = propertyName
this.value = value
}
}
outputChannel.send(SimpleEnvelope(changeMeta, EmptyBinary))
}
}
override fun close() {
outputChannel.cancel()
}
}

View File

@ -0,0 +1,61 @@
package hep.dataforge.control.controlers
import hep.dataforge.control.api.Device
import hep.dataforge.io.Envelope
import hep.dataforge.io.SimpleEnvelope
import hep.dataforge.meta.Meta
import hep.dataforge.meta.set
import kotlinx.io.EmptyBinary
suspend fun Device.respond(target: String, request: Envelope, action: String): Envelope {
val metaResult = when (action) {
Device.GET_PROPERTY_ACTION -> {
val message = DevicePropertyMessage.wrap(request.meta)
val property = message.property ?: error("Property item not defined")
val propertyName: String = property.name
val result = getProperty(propertyName)
DevicePropertyMessage.ok {
this.source = target
this.target = message.source
property {
name = propertyName
value = result
}
}
}
Device.SET_PROPERTY_ACTION -> {
val message = DevicePropertyMessage.wrap(request.meta)
val property = message.property ?: error("Property item not defined")
val propertyName: String = property.name
val propertyValue = property.value
if (propertyValue == null) {
invalidateProperty(propertyName)
} else {
setProperty(propertyName, propertyValue)
}
DevicePropertyMessage.ok {
this.source = target
this.target = message.source
property {
name = propertyName
}
}
}
else -> {
val data: Meta? = request.meta[DeviceMessage.MESSAGE_VALUE_KEY].node
val result = action(action, data)
DeviceMessage.ok {
this.source = target
config[DeviceMessage.MESSAGE_ACTION_KEY] = action
config[DeviceMessage.MESSAGE_VALUE_KEY] = result
}
}
}
return SimpleEnvelope(metaResult, EmptyBinary)
}
suspend fun Device.respond(target: String, request: Envelope): Envelope {
val action: String = request.meta[DeviceMessage.MESSAGE_ACTION_KEY].string ?: error("Action not defined")
return respond(target, request, action)
}

View File

@ -14,9 +14,7 @@ class VirtualDevice(val meta: Meta, override val scope: CoroutineScope) : Device
var scale by mutableProperty { var scale by mutableProperty {
getDouble { getDouble {
200.0 200.0
} set { _, _ -> }.virtualSet()
}
} }
val sin by property { val sin by property {