Working prototype
This commit is contained in:
parent
9ca10930b8
commit
75ee237ac6
2
.gitignore
vendored
2
.gitignore
vendored
@ -3,5 +3,5 @@
|
||||
.gradle
|
||||
*.iws
|
||||
out/
|
||||
/build/
|
||||
build/
|
||||
!gradle-wrapper.jar
|
@ -1,4 +1,6 @@
|
||||
val dataforgeVersion by extra("0.1.5-dev-9")
|
||||
val dataforgeVersion by extra("0.1.8-dev-4")
|
||||
val plotlyVersion by extra("0.2.0-dev-4")
|
||||
|
||||
|
||||
allprojects {
|
||||
repositories {
|
||||
@ -9,7 +11,7 @@ allprojects {
|
||||
}
|
||||
|
||||
group = "hep.dataforge"
|
||||
version = "0.1.0-dev"
|
||||
version = "0.0.1"
|
||||
}
|
||||
|
||||
val githubProject by extra("dataforge-control")
|
||||
|
@ -1,17 +1,26 @@
|
||||
import scientifik.useCoroutines
|
||||
|
||||
plugins {
|
||||
id("scientifik.mpp")
|
||||
id("scientifik.publish")
|
||||
id("kotlinx-atomicfu") version "0.14.3"
|
||||
}
|
||||
|
||||
val dataforgeVersion: String by rootProject.extra
|
||||
|
||||
kotlin {
|
||||
useCoroutines(version = "1.3.7")
|
||||
|
||||
kotlin {
|
||||
sourceSets {
|
||||
commonMain{
|
||||
dependencies {
|
||||
api("hep.dataforge:dataforge-io:$dataforgeVersion")
|
||||
//implementation("org.jetbrains.kotlinx:atomicfu-common:0.14.3")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
atomicfu {
|
||||
variant = "VH"
|
||||
}
|
@ -1,7 +1,7 @@
|
||||
package hep.dataforge.control.api
|
||||
|
||||
import hep.dataforge.meta.scheme.Scheme
|
||||
import hep.dataforge.meta.scheme.SchemeSpec
|
||||
import hep.dataforge.meta.Scheme
|
||||
import hep.dataforge.meta.SchemeSpec
|
||||
|
||||
/**
|
||||
* A descriptor for property
|
||||
|
@ -3,8 +3,10 @@ package hep.dataforge.control.api
|
||||
import hep.dataforge.meta.Meta
|
||||
import hep.dataforge.meta.MetaItem
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.cancel
|
||||
import kotlinx.io.Closeable
|
||||
|
||||
interface Device {
|
||||
interface Device: Closeable {
|
||||
/**
|
||||
* List of supported property descriptors
|
||||
*/
|
||||
@ -20,7 +22,17 @@ interface Device {
|
||||
*/
|
||||
val scope: CoroutineScope
|
||||
|
||||
var listener: PropertyChangeListener?
|
||||
/**
|
||||
* Register a new property change listener for this device.
|
||||
* [owner] is provided optionally in order for listener to be
|
||||
* easily removable
|
||||
*/
|
||||
fun registerListener(listener: PropertyChangeListener, owner: Any? = listener)
|
||||
|
||||
/**
|
||||
* Remove all listeners belonging to specified owner
|
||||
*/
|
||||
fun removeListener(owner: Any?)
|
||||
|
||||
/**
|
||||
* Get the value of the property or throw error if property in not defined. Suspend if property value is not available
|
||||
@ -42,10 +54,17 @@ interface Device {
|
||||
* Send a request and suspend caller while request is being processed.
|
||||
* Could return null if request does not return meaningful answer.
|
||||
*/
|
||||
suspend fun action(name: String, argument: Meta? = null): Meta?
|
||||
suspend fun call(action: String, argument: MetaItem<*>? = null): MetaItem<*>?
|
||||
|
||||
override fun close() {
|
||||
scope.cancel("The device is closed")
|
||||
}
|
||||
|
||||
companion object {
|
||||
const val GET_PROPERTY_ACTION = "@getProperty"
|
||||
const val SET_PROPERTY_ACTION = "@setProperty"
|
||||
const val CALL_ACTION ="@call"
|
||||
}
|
||||
}
|
||||
|
||||
suspend fun Device.call(name: String, meta: Meta?) = call(name, meta?.let { MetaItem.NodeItem(it) })
|
@ -20,4 +20,4 @@ suspend fun DeviceHub.setProperty(deviceName: String, propertyName: String, valu
|
||||
|
||||
suspend fun DeviceHub.request(deviceName: String, command: String, argument: MetaItem<*>?): MetaItem<*>? =
|
||||
(getDevice(deviceName) ?: error("Device with name $deviceName not found in the hub"))
|
||||
.action(command, argument)
|
||||
.call(command, argument)
|
@ -3,5 +3,5 @@ package hep.dataforge.control.api
|
||||
import hep.dataforge.meta.MetaItem
|
||||
|
||||
interface PropertyChangeListener {
|
||||
fun propertyChanged(propertyName: String, value: MetaItem<*>)
|
||||
fun propertyChanged(propertyName: String, value: MetaItem<*>?)
|
||||
}
|
@ -1,7 +1,7 @@
|
||||
package hep.dataforge.control.api
|
||||
|
||||
import hep.dataforge.meta.scheme.Scheme
|
||||
import hep.dataforge.meta.scheme.SchemeSpec
|
||||
import hep.dataforge.meta.Scheme
|
||||
import hep.dataforge.meta.SchemeSpec
|
||||
|
||||
/**
|
||||
* A descriptor for property
|
||||
|
@ -1,18 +1,64 @@
|
||||
package hep.dataforge.control.base
|
||||
|
||||
import hep.dataforge.control.api.ActionDescriptor
|
||||
import hep.dataforge.meta.Meta
|
||||
import hep.dataforge.meta.MetaBuilder
|
||||
import hep.dataforge.meta.MetaItem
|
||||
import hep.dataforge.values.Value
|
||||
import kotlin.properties.ReadOnlyProperty
|
||||
import kotlin.reflect.KProperty
|
||||
|
||||
interface Action {
|
||||
val name: String
|
||||
val descriptor: ActionDescriptor
|
||||
suspend operator fun invoke(arg: Meta?): Meta?
|
||||
suspend operator fun invoke(arg: MetaItem<*>? = null): MetaItem<*>?
|
||||
}
|
||||
|
||||
class SimpleAction(
|
||||
override val name: String,
|
||||
override val descriptor: ActionDescriptor,
|
||||
val block: suspend (Meta?)->Meta?
|
||||
val block: suspend (MetaItem<*>?) -> MetaItem<*>?
|
||||
) : Action {
|
||||
override suspend fun invoke(arg: Meta?): Meta? = block(arg)
|
||||
override suspend fun invoke(arg: MetaItem<*>?): MetaItem<*>? = block(arg)
|
||||
}
|
||||
|
||||
class ActionDelegate<D : DeviceBase>(
|
||||
val owner: D,
|
||||
val descriptor: ActionDescriptor = ActionDescriptor.empty(),
|
||||
val block: suspend (MetaItem<*>?) -> MetaItem<*>?
|
||||
) : ReadOnlyProperty<D, Action> {
|
||||
override fun getValue(thisRef: D, property: KProperty<*>): Action {
|
||||
val name = property.name
|
||||
return owner.resolveAction(name) {
|
||||
SimpleAction(name, descriptor, block)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fun <D : DeviceBase> D.request(
|
||||
descriptor: ActionDescriptor = ActionDescriptor.empty(),
|
||||
block: suspend (MetaItem<*>?) -> MetaItem<*>?
|
||||
): ActionDelegate<D> = ActionDelegate(this, descriptor, block)
|
||||
|
||||
fun <D : DeviceBase> D.requestValue(
|
||||
descriptor: ActionDescriptor = ActionDescriptor.empty(),
|
||||
block: suspend (MetaItem<*>?) -> Any?
|
||||
): ActionDelegate<D> = ActionDelegate(this, descriptor){
|
||||
val res = block(it)
|
||||
MetaItem.ValueItem(Value.of(res))
|
||||
}
|
||||
|
||||
fun <D : DeviceBase> D.requestMeta(
|
||||
descriptor: ActionDescriptor = ActionDescriptor.empty(),
|
||||
block: suspend MetaBuilder.(MetaItem<*>?) -> Unit
|
||||
): ActionDelegate<D> = ActionDelegate(this, descriptor){
|
||||
val res = MetaBuilder().apply { block(it)}
|
||||
MetaItem.NodeItem(res)
|
||||
}
|
||||
|
||||
fun <D : DeviceBase> D.action(
|
||||
descriptor: ActionDescriptor = ActionDescriptor.empty(),
|
||||
block: suspend (MetaItem<*>?) -> Unit
|
||||
): ActionDelegate<D> = ActionDelegate(this, descriptor) {
|
||||
block(it)
|
||||
null
|
||||
}
|
@ -4,19 +4,24 @@ import hep.dataforge.control.api.ActionDescriptor
|
||||
import hep.dataforge.control.api.Device
|
||||
import hep.dataforge.control.api.PropertyChangeListener
|
||||
import hep.dataforge.control.api.PropertyDescriptor
|
||||
import hep.dataforge.meta.Meta
|
||||
import hep.dataforge.meta.MetaItem
|
||||
import kotlin.jvm.JvmStatic
|
||||
import kotlin.reflect.KProperty
|
||||
|
||||
abstract class DeviceBase : Device, PropertyChangeListener {
|
||||
private val properties = HashMap<String, ReadOnlyProperty>()
|
||||
abstract class DeviceBase : Device {
|
||||
private val properties = HashMap<String, ReadOnlyDeviceProperty>()
|
||||
private val actions = HashMap<String, Action>()
|
||||
|
||||
override var listener: PropertyChangeListener? = null
|
||||
private val listeners = ArrayList<Pair<Any?, PropertyChangeListener>>(4)
|
||||
|
||||
override fun propertyChanged(propertyName: String, value: MetaItem<*>) {
|
||||
listener?.propertyChanged(propertyName, value)
|
||||
override fun registerListener(listener: PropertyChangeListener, owner: Any?) {
|
||||
listeners.add(owner to listener)
|
||||
}
|
||||
|
||||
override fun removeListener(owner: Any?) {
|
||||
listeners.removeAll { it.first == owner }
|
||||
}
|
||||
|
||||
internal fun propertyChanged(propertyName: String, value: MetaItem<*>?) {
|
||||
listeners.forEach { it.second.propertyChanged(propertyName, value) }
|
||||
}
|
||||
|
||||
override val propertyDescriptors: Collection<PropertyDescriptor>
|
||||
@ -25,23 +30,12 @@ abstract class DeviceBase : Device, PropertyChangeListener {
|
||||
override val actionDescriptors: Collection<ActionDescriptor>
|
||||
get() = actions.values.map { it.descriptor }
|
||||
|
||||
fun <P : ReadOnlyProperty> initProperty(prop: P): P {
|
||||
properties[prop.name] = prop
|
||||
return prop
|
||||
internal fun resolveProperty(name: String, builder: () -> ReadOnlyDeviceProperty): ReadOnlyDeviceProperty {
|
||||
return properties.getOrPut(name, builder)
|
||||
}
|
||||
|
||||
fun initRequest(Action: Action): Action {
|
||||
actions[Action.name] = Action
|
||||
return Action
|
||||
}
|
||||
|
||||
protected fun initRequest(
|
||||
name: String,
|
||||
descriptor: ActionDescriptor = ActionDescriptor.empty(),
|
||||
block: suspend (MetaItem<*>?) -> MetaItem<*>?
|
||||
): Action {
|
||||
val request = SimpleAction(name, descriptor, block)
|
||||
return initRequest(request)
|
||||
internal fun resolveAction(name: String, builder: () -> Action): Action {
|
||||
return actions.getOrPut(name, builder)
|
||||
}
|
||||
|
||||
override suspend fun getProperty(propertyName: String): MetaItem<*> =
|
||||
@ -52,48 +46,19 @@ abstract class DeviceBase : Device, PropertyChangeListener {
|
||||
}
|
||||
|
||||
override suspend fun setProperty(propertyName: String, value: MetaItem<*>) {
|
||||
(properties[propertyName] as? Property ?: error("Property with name $propertyName not defined")).write(value)
|
||||
(properties[propertyName] as? DeviceProperty ?: error("Property with name $propertyName not defined")).write(
|
||||
value
|
||||
)
|
||||
}
|
||||
|
||||
override suspend fun action(name: String, argument: Meta?): Meta? =
|
||||
(actions[name] ?: error("Request with name $name not defined")).invoke(argument)
|
||||
override suspend fun call(action: String, argument: MetaItem<*>?): MetaItem<*>? =
|
||||
(actions[action] ?: error("Request with name $action not defined")).invoke(argument)
|
||||
|
||||
|
||||
companion object {
|
||||
|
||||
@JvmStatic
|
||||
protected fun <D : DeviceBase, P : ReadOnlyProperty> D.initProperty(
|
||||
name: String,
|
||||
builder: PropertyBuilder<D>.() -> P
|
||||
): P {
|
||||
val property = PropertyBuilder(name, this).run(builder)
|
||||
initProperty(property)
|
||||
return property
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class PropertyDelegateProvider<D : DeviceBase, T : Any, P : GenericReadOnlyProperty<D, T>>(
|
||||
val owner: D,
|
||||
val builder: PropertyBuilder<D>.() -> P
|
||||
) {
|
||||
operator fun provideDelegate(thisRef: D, property: KProperty<*>): P {
|
||||
val name = property.name
|
||||
return owner.initProperty(PropertyBuilder(name, owner).run(builder))
|
||||
}
|
||||
}
|
||||
|
||||
fun <D : DeviceBase, T : Any> D.property(
|
||||
builder: PropertyBuilder<D>.() -> GenericReadOnlyProperty<D, T>
|
||||
): PropertyDelegateProvider<D, T, GenericReadOnlyProperty<D, T>> {
|
||||
return PropertyDelegateProvider(this, builder)
|
||||
}
|
||||
|
||||
//TODO try to use 'property' with new inference
|
||||
fun <D : DeviceBase, T : Any> D.mutableProperty(
|
||||
builder: PropertyBuilder<D>.() -> GenericProperty<D, T>
|
||||
): PropertyDelegateProvider<D, T, GenericProperty<D, T>> {
|
||||
return PropertyDelegateProvider(this, builder)
|
||||
}
|
||||
|
||||
|
||||
|
@ -0,0 +1,85 @@
|
||||
package hep.dataforge.control.base
|
||||
|
||||
import hep.dataforge.control.api.PropertyDescriptor
|
||||
import hep.dataforge.meta.MetaItem
|
||||
import kotlinx.coroutines.*
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlin.properties.ReadOnlyProperty
|
||||
import kotlin.properties.ReadWriteProperty
|
||||
import kotlin.reflect.KProperty
|
||||
import kotlin.time.Duration
|
||||
|
||||
/**
|
||||
* Read-only device property
|
||||
*/
|
||||
interface ReadOnlyDeviceProperty : ReadOnlyProperty<Any?, MetaItem<*>?> {
|
||||
/**
|
||||
* Property name, should be unique in device
|
||||
*/
|
||||
val name: String
|
||||
|
||||
/**
|
||||
* Property descriptor
|
||||
*/
|
||||
val descriptor: PropertyDescriptor
|
||||
|
||||
val scope: CoroutineScope
|
||||
|
||||
/**
|
||||
* Erase logical value and force re-read from device on next [read]
|
||||
*/
|
||||
suspend fun invalidate()
|
||||
|
||||
// /**
|
||||
// * Update property logical value and notify listener without writing it to device
|
||||
// */
|
||||
// suspend fun update(item: MetaItem<*>)
|
||||
//
|
||||
/**
|
||||
* Get cached value and return null if value is invalid or not initialized
|
||||
*/
|
||||
val value: MetaItem<*>?
|
||||
|
||||
/**
|
||||
* Read value either from cache if cache is valid or directly from physical device.
|
||||
* If [force], reread
|
||||
*/
|
||||
suspend fun read(force: Boolean = false): MetaItem<*>
|
||||
|
||||
/**
|
||||
* The [Flow] representing future logical states of the property.
|
||||
* Produces null when the state is invalidated
|
||||
*/
|
||||
fun flow(): Flow<MetaItem<*>?>
|
||||
|
||||
override fun getValue(thisRef: Any?, property: KProperty<*>): MetaItem<*>? = value
|
||||
}
|
||||
|
||||
/**
|
||||
* Launch recurring force re-read job on a property scope with given [duration] between reads.
|
||||
*/
|
||||
fun ReadOnlyDeviceProperty.readEvery(duration: Duration): Job = scope.launch {
|
||||
while (isActive) {
|
||||
read(true)
|
||||
delay(duration)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A writeable device property with non-suspended write
|
||||
*/
|
||||
interface DeviceProperty : ReadOnlyDeviceProperty, ReadWriteProperty<Any?, MetaItem<*>?> {
|
||||
override var value: MetaItem<*>?
|
||||
|
||||
/**
|
||||
* Write value to physical device. Invalidates logical value, but does not update it automatically
|
||||
*/
|
||||
suspend fun write(item: MetaItem<*>)
|
||||
|
||||
override fun setValue(thisRef: Any?, property: KProperty<*>, value: MetaItem<*>?) {
|
||||
this.value = value
|
||||
}
|
||||
|
||||
override fun getValue(thisRef: Any?, property: KProperty<*>): MetaItem<*>? = value
|
||||
}
|
||||
|
@ -1,86 +0,0 @@
|
||||
package hep.dataforge.control.base
|
||||
|
||||
import hep.dataforge.control.api.PropertyDescriptor
|
||||
import hep.dataforge.meta.MetaItem
|
||||
import hep.dataforge.meta.transformations.MetaCaster
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.coroutines.sync.Mutex
|
||||
import kotlinx.coroutines.sync.withLock
|
||||
import kotlinx.coroutines.withContext
|
||||
import kotlin.properties.ReadWriteProperty
|
||||
import kotlin.reflect.KProperty
|
||||
|
||||
open class GenericReadOnlyProperty<D : DeviceBase, T : Any>(
|
||||
override val name: String,
|
||||
override val descriptor: PropertyDescriptor,
|
||||
override val owner: D,
|
||||
internal val converter: MetaCaster<T>,
|
||||
internal val getter: suspend D.() -> T
|
||||
) : ReadOnlyProperty, kotlin.properties.ReadOnlyProperty<Any?, T?> {
|
||||
|
||||
protected val mutex = Mutex()
|
||||
protected var value: T? = null
|
||||
|
||||
suspend fun updateValue(value: T) {
|
||||
mutex.withLock { this.value = value }
|
||||
owner.propertyChanged(name, converter.objectToMetaItem(value))
|
||||
}
|
||||
|
||||
override suspend fun invalidate() {
|
||||
mutex.withLock { value = null }
|
||||
}
|
||||
|
||||
suspend fun readValue(force: Boolean = false): T {
|
||||
if (force) invalidate()
|
||||
return value ?: withContext(owner.scope.coroutineContext) {
|
||||
//all device operations should be run on device context
|
||||
owner.getter().also { updateValue(it) }
|
||||
}
|
||||
}
|
||||
|
||||
fun peekValue(): T? = value
|
||||
|
||||
override suspend fun update(item: MetaItem<*>) {
|
||||
updateValue(converter.itemToObject(item))
|
||||
}
|
||||
|
||||
override suspend fun read(force: Boolean): MetaItem<*> = converter.objectToMetaItem(readValue(force))
|
||||
|
||||
override fun peek(): MetaItem<*>? = value?.let { converter.objectToMetaItem(it) }
|
||||
|
||||
override fun getValue(thisRef: Any?, property: KProperty<*>): T? = peekValue()
|
||||
}
|
||||
|
||||
class GenericProperty<D : DeviceBase, T : Any>(
|
||||
name: String,
|
||||
descriptor: PropertyDescriptor,
|
||||
owner: D,
|
||||
converter: MetaCaster<T>,
|
||||
getter: suspend D.() -> T,
|
||||
private val setter: suspend D.(oldValue: T?, newValue: T) -> Unit
|
||||
) : Property, ReadWriteProperty<Any?, T?>, GenericReadOnlyProperty<D, T>(name, descriptor, owner, converter, getter) {
|
||||
|
||||
suspend fun writeValue(newValue: T) {
|
||||
val oldValue = value
|
||||
withContext(owner.scope.coroutineContext) {
|
||||
//all device operations should be run on device context
|
||||
invalidate()
|
||||
owner.setter(oldValue, newValue)
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun write(item: MetaItem<*>) {
|
||||
writeValue(converter.itemToObject(item))
|
||||
}
|
||||
|
||||
override fun setValue(thisRef: Any?, property: KProperty<*>, value: T?) {
|
||||
owner.scope.launch {
|
||||
if (value == null) {
|
||||
invalidate()
|
||||
} else {
|
||||
writeValue(value)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -1,54 +0,0 @@
|
||||
package hep.dataforge.control.base
|
||||
|
||||
import hep.dataforge.control.api.Device
|
||||
import hep.dataforge.control.api.PropertyDescriptor
|
||||
import hep.dataforge.meta.MetaItem
|
||||
|
||||
/**
|
||||
* Read-only device property
|
||||
*/
|
||||
interface ReadOnlyProperty {
|
||||
/**
|
||||
* Property name, should be unique in device
|
||||
*/
|
||||
val name: String
|
||||
|
||||
val owner: Device
|
||||
|
||||
/**
|
||||
* Property descriptor
|
||||
*/
|
||||
val descriptor: PropertyDescriptor
|
||||
|
||||
/**
|
||||
* Erase logical value and force re-read from device on next [read]
|
||||
*/
|
||||
suspend fun invalidate()
|
||||
|
||||
/**
|
||||
* Update property logical value and notify listener without writing it to device
|
||||
*/
|
||||
suspend fun update(item: MetaItem<*>)
|
||||
|
||||
/**
|
||||
* Get cached value and return null if value is invalid
|
||||
*/
|
||||
fun peek(): MetaItem<*>?
|
||||
|
||||
/**
|
||||
* Read value either from cache if cache is valid or directly from physical device
|
||||
*/
|
||||
suspend fun read(force: Boolean = false): MetaItem<*>
|
||||
}
|
||||
|
||||
/**
|
||||
* A single writeable property handler
|
||||
*/
|
||||
interface Property : ReadOnlyProperty {
|
||||
|
||||
/**
|
||||
* Write value to physical device. Invalidates logical value, but does not update it automatically
|
||||
*/
|
||||
suspend fun write(item: MetaItem<*>)
|
||||
}
|
||||
|
@ -1,56 +0,0 @@
|
||||
package hep.dataforge.control.base
|
||||
|
||||
import hep.dataforge.control.api.PropertyDescriptor
|
||||
import hep.dataforge.meta.transformations.MetaCaster
|
||||
import hep.dataforge.values.Value
|
||||
|
||||
class PropertyBuilder<D : DeviceBase>(val name: String, val owner: D) {
|
||||
var descriptor: PropertyDescriptor = PropertyDescriptor.empty()
|
||||
|
||||
inline fun descriptor(block: PropertyDescriptor.() -> Unit) {
|
||||
descriptor.apply(block)
|
||||
}
|
||||
|
||||
fun <T : Any> get(converter: MetaCaster<T>, getter: (suspend D.() -> T)): GenericReadOnlyProperty<D, T> =
|
||||
GenericReadOnlyProperty(name, descriptor, owner, converter, getter)
|
||||
|
||||
fun getDouble(getter: (suspend D.() -> Double)): GenericReadOnlyProperty<D, Double> =
|
||||
GenericReadOnlyProperty(name, descriptor, owner, MetaCaster.double) { getter() }
|
||||
|
||||
fun getString(getter: suspend D.() -> String): GenericReadOnlyProperty<D, String> =
|
||||
GenericReadOnlyProperty(name, descriptor, owner, MetaCaster.string) { getter() }
|
||||
|
||||
fun getBoolean(getter: suspend D.() -> Boolean): GenericReadOnlyProperty<D, Boolean> =
|
||||
GenericReadOnlyProperty(name, descriptor, owner, MetaCaster.boolean) { getter() }
|
||||
|
||||
fun getValue(getter: suspend D.() -> Any?): GenericReadOnlyProperty<D, Value> =
|
||||
GenericReadOnlyProperty(name, descriptor, owner, MetaCaster.value) { Value.of(getter()) }
|
||||
|
||||
/**
|
||||
* Convert this read-only property to read-write property
|
||||
*/
|
||||
infix fun <T : Any> GenericReadOnlyProperty<D, T>.set(setter: (suspend D.(oldValue: T?, newValue: T) -> Unit)): GenericProperty<D, T> {
|
||||
return GenericProperty(name, descriptor, owner, converter, getter, setter)
|
||||
}
|
||||
|
||||
/**
|
||||
* Create read-write property with synchronized setter which updates value after set
|
||||
*/
|
||||
fun <T : Any> GenericReadOnlyProperty<D, T>.set(synchronousSetter: (suspend D.(oldValue: T?, newValue: T) -> T)): GenericProperty<D, T> {
|
||||
val setter: suspend D.(oldValue: T?, newValue: T) -> Unit = { oldValue, newValue ->
|
||||
val result = synchronousSetter(oldValue, newValue)
|
||||
updateValue(result)
|
||||
}
|
||||
return GenericProperty(name, descriptor, owner, converter, getter, setter)
|
||||
}
|
||||
|
||||
/**
|
||||
* Define a setter that does nothing for virtual property
|
||||
*/
|
||||
fun <T : Any> GenericReadOnlyProperty<D, T>.virtualSet(): GenericProperty<D, T> {
|
||||
val setter: suspend D.(oldValue: T?, newValue: T) -> Unit = { oldValue, newValue ->
|
||||
updateValue(newValue)
|
||||
}
|
||||
return GenericProperty(name, descriptor, owner, converter, getter, setter)
|
||||
}
|
||||
}
|
@ -0,0 +1,250 @@
|
||||
package hep.dataforge.control.base
|
||||
|
||||
import hep.dataforge.control.api.PropertyDescriptor
|
||||
import hep.dataforge.meta.Meta
|
||||
import hep.dataforge.meta.MetaBuilder
|
||||
import hep.dataforge.meta.MetaItem
|
||||
import hep.dataforge.meta.double
|
||||
import hep.dataforge.values.Value
|
||||
import hep.dataforge.values.asValue
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.ExperimentalCoroutinesApi
|
||||
import kotlinx.coroutines.flow.MutableStateFlow
|
||||
import kotlinx.coroutines.flow.StateFlow
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.coroutines.sync.Mutex
|
||||
import kotlinx.coroutines.sync.withLock
|
||||
import kotlinx.coroutines.withContext
|
||||
import kotlin.properties.ReadOnlyProperty
|
||||
import kotlin.reflect.KProperty
|
||||
|
||||
@OptIn(ExperimentalCoroutinesApi::class)
|
||||
open class SimpleReadOnlyDeviceProperty(
|
||||
override val name: String,
|
||||
default: MetaItem<*>?,
|
||||
override val descriptor: PropertyDescriptor,
|
||||
override val scope: CoroutineScope,
|
||||
private val updateCallback: (name: String, item: MetaItem<*>) -> Unit,
|
||||
private val getter: suspend (before: MetaItem<*>?) -> MetaItem<*>
|
||||
) : ReadOnlyDeviceProperty {
|
||||
|
||||
private val state: MutableStateFlow<MetaItem<*>?> = MutableStateFlow(default)
|
||||
override val value: MetaItem<*>? get() = state.value
|
||||
|
||||
override suspend fun invalidate() {
|
||||
state.value = null
|
||||
}
|
||||
|
||||
private fun update(item: MetaItem<*>) {
|
||||
state.value = item
|
||||
updateCallback(name, item)
|
||||
}
|
||||
|
||||
override suspend fun read(force: Boolean): MetaItem<*> {
|
||||
//backup current value
|
||||
val currentValue = value
|
||||
return if (force || currentValue == null) {
|
||||
val res = withContext(scope.coroutineContext) {
|
||||
//all device operations should be run on device context
|
||||
//TODO add error catching
|
||||
getter(currentValue)
|
||||
}
|
||||
update(res)
|
||||
res
|
||||
} else {
|
||||
currentValue
|
||||
}
|
||||
}
|
||||
|
||||
override fun flow(): StateFlow<MetaItem<*>?> = state
|
||||
}
|
||||
|
||||
private class ReadOnlyDevicePropertyDelegate<D : DeviceBase>(
|
||||
val owner: D,
|
||||
val default: MetaItem<*>?,
|
||||
val descriptor: PropertyDescriptor = PropertyDescriptor.empty(),
|
||||
private val getter: suspend (MetaItem<*>?) -> MetaItem<*>
|
||||
) : ReadOnlyProperty<D, SimpleReadOnlyDeviceProperty> {
|
||||
|
||||
override fun getValue(thisRef: D, property: KProperty<*>): SimpleReadOnlyDeviceProperty {
|
||||
val name = property.name
|
||||
|
||||
return owner.resolveProperty(name) {
|
||||
@OptIn(ExperimentalCoroutinesApi::class)
|
||||
SimpleReadOnlyDeviceProperty(
|
||||
name,
|
||||
default,
|
||||
descriptor,
|
||||
owner.scope,
|
||||
owner::propertyChanged,
|
||||
getter
|
||||
)
|
||||
} as SimpleReadOnlyDeviceProperty
|
||||
}
|
||||
}
|
||||
|
||||
fun <D : DeviceBase> D.reading(
|
||||
default: MetaItem<*>? = null,
|
||||
descriptorBuilder: PropertyDescriptor.() -> Unit = {},
|
||||
getter: suspend (MetaItem<*>?) -> MetaItem<*>
|
||||
): ReadOnlyProperty<D, SimpleReadOnlyDeviceProperty> = ReadOnlyDevicePropertyDelegate(
|
||||
this,
|
||||
default,
|
||||
PropertyDescriptor.invoke(descriptorBuilder),
|
||||
getter
|
||||
)
|
||||
|
||||
fun <D : DeviceBase> D.readingValue(
|
||||
default: Value? = null,
|
||||
descriptorBuilder: PropertyDescriptor.() -> Unit = {},
|
||||
getter: suspend () -> Any
|
||||
): ReadOnlyProperty<D, SimpleReadOnlyDeviceProperty> = ReadOnlyDevicePropertyDelegate(
|
||||
this,
|
||||
default?.let { MetaItem.ValueItem(it) },
|
||||
PropertyDescriptor.invoke(descriptorBuilder),
|
||||
{ MetaItem.ValueItem(Value.of(getter())) }
|
||||
)
|
||||
|
||||
fun <D : DeviceBase> D.readingNumber(
|
||||
default: Number? = null,
|
||||
descriptorBuilder: PropertyDescriptor.() -> Unit = {},
|
||||
getter: suspend () -> Number
|
||||
): ReadOnlyProperty<D, SimpleReadOnlyDeviceProperty> = ReadOnlyDevicePropertyDelegate(
|
||||
this,
|
||||
default?.let { MetaItem.ValueItem(it.asValue()) },
|
||||
PropertyDescriptor.invoke(descriptorBuilder),
|
||||
{
|
||||
val number = getter()
|
||||
MetaItem.ValueItem(number.asValue())
|
||||
}
|
||||
)
|
||||
|
||||
fun <D : DeviceBase> D.readingMeta(
|
||||
default: Meta? = null,
|
||||
descriptorBuilder: PropertyDescriptor.() -> Unit = {},
|
||||
getter: suspend MetaBuilder.() -> Unit
|
||||
): ReadOnlyProperty<D, SimpleReadOnlyDeviceProperty> = ReadOnlyDevicePropertyDelegate(
|
||||
this,
|
||||
default?.let { MetaItem.NodeItem(it) },
|
||||
PropertyDescriptor.invoke(descriptorBuilder),
|
||||
{ MetaItem.NodeItem(MetaBuilder().apply { getter() }) }
|
||||
)
|
||||
|
||||
@OptIn(ExperimentalCoroutinesApi::class)
|
||||
class SimpleDeviceProperty(
|
||||
name: String,
|
||||
default: MetaItem<*>?,
|
||||
descriptor: PropertyDescriptor,
|
||||
scope: CoroutineScope,
|
||||
updateCallback: (name: String, item: MetaItem<*>?) -> Unit,
|
||||
getter: suspend (MetaItem<*>?) -> MetaItem<*>,
|
||||
private val setter: suspend (oldValue: MetaItem<*>?, newValue: MetaItem<*>) -> Unit
|
||||
) : SimpleReadOnlyDeviceProperty(name, default, descriptor, scope, updateCallback, getter), DeviceProperty {
|
||||
|
||||
override var value: MetaItem<*>?
|
||||
get() = super.value
|
||||
set(value) {
|
||||
scope.launch {
|
||||
if (value == null) {
|
||||
invalidate()
|
||||
} else {
|
||||
write(value)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private val writeLock = Mutex()
|
||||
|
||||
override suspend fun write(item: MetaItem<*>) {
|
||||
writeLock.withLock {
|
||||
//fast return if value is not changed
|
||||
if (item == value) return@withLock
|
||||
val oldValue = value
|
||||
//all device operations should be run on device context
|
||||
withContext(scope.coroutineContext) {
|
||||
//TODO add error catching
|
||||
setter(oldValue, item)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private class DevicePropertyDelegate<D : DeviceBase>(
|
||||
val owner: D,
|
||||
val default: MetaItem<*>?,
|
||||
val descriptor: PropertyDescriptor = PropertyDescriptor.empty(),
|
||||
private val getter: suspend (MetaItem<*>?) -> MetaItem<*>,
|
||||
private val setter: suspend (oldValue: MetaItem<*>?, newValue: MetaItem<*>) -> Unit
|
||||
) : ReadOnlyProperty<D, SimpleDeviceProperty> {
|
||||
|
||||
override fun getValue(thisRef: D, property: KProperty<*>): SimpleDeviceProperty {
|
||||
val name = property.name
|
||||
return owner.resolveProperty(name) {
|
||||
@OptIn(ExperimentalCoroutinesApi::class)
|
||||
SimpleDeviceProperty(
|
||||
name,
|
||||
default,
|
||||
descriptor,
|
||||
owner.scope,
|
||||
owner::propertyChanged,
|
||||
getter,
|
||||
setter
|
||||
)
|
||||
} as SimpleDeviceProperty
|
||||
}
|
||||
}
|
||||
|
||||
fun <D : DeviceBase> D.writing(
|
||||
default: MetaItem<*>? = null,
|
||||
descriptorBuilder: PropertyDescriptor.() -> Unit = {},
|
||||
getter: suspend (MetaItem<*>?) -> MetaItem<*>,
|
||||
setter: suspend (oldValue: MetaItem<*>?, newValue: MetaItem<*>) -> Unit
|
||||
): ReadOnlyProperty<D, SimpleDeviceProperty> = DevicePropertyDelegate(
|
||||
this,
|
||||
default,
|
||||
PropertyDescriptor.invoke(descriptorBuilder),
|
||||
getter,
|
||||
setter
|
||||
)
|
||||
|
||||
fun <D : DeviceBase> D.writingVirtual(
|
||||
default: MetaItem<*>,
|
||||
descriptorBuilder: PropertyDescriptor.() -> Unit = {}
|
||||
): ReadOnlyProperty<D, SimpleDeviceProperty> = writing(
|
||||
default,
|
||||
descriptorBuilder,
|
||||
getter = { it ?: default },
|
||||
setter = { _, _ -> }
|
||||
)
|
||||
|
||||
fun <D : DeviceBase> D.writingVirtual(
|
||||
default: Value,
|
||||
descriptorBuilder: PropertyDescriptor.() -> Unit = {}
|
||||
): ReadOnlyProperty<D, SimpleDeviceProperty> = writing(
|
||||
MetaItem.ValueItem(default),
|
||||
descriptorBuilder,
|
||||
getter = { it ?: MetaItem.ValueItem(default) },
|
||||
setter = { _, _ -> }
|
||||
)
|
||||
|
||||
fun <D : DeviceBase> D.writingDouble(
|
||||
descriptorBuilder: PropertyDescriptor.() -> Unit = {},
|
||||
getter: suspend (Double) -> Double,
|
||||
setter: suspend (oldValue: Double?, newValue: Double) -> Unit
|
||||
): ReadOnlyProperty<D, SimpleDeviceProperty> {
|
||||
val innerGetter: suspend (MetaItem<*>?) -> MetaItem<*> = {
|
||||
MetaItem.ValueItem(getter(it.double ?: Double.NaN).asValue())
|
||||
}
|
||||
|
||||
val innerSetter: suspend (oldValue: MetaItem<*>?, newValue: MetaItem<*>) -> Unit = { oldValue, newValue ->
|
||||
setter(oldValue.double, newValue.double ?: Double.NaN)
|
||||
}
|
||||
|
||||
return DevicePropertyDelegate(
|
||||
this,
|
||||
MetaItem.ValueItem(Double.NaN.asValue()),
|
||||
PropertyDescriptor.invoke(descriptorBuilder),
|
||||
innerGetter,
|
||||
innerSetter
|
||||
)
|
||||
}
|
@ -1,34 +1,34 @@
|
||||
package hep.dataforge.control.controlers
|
||||
|
||||
import hep.dataforge.meta.Meta
|
||||
import hep.dataforge.meta.scheme.*
|
||||
|
||||
class PropertyValue : Scheme() {
|
||||
var name by string { error("Name property not defined") }
|
||||
var value by item()
|
||||
|
||||
companion object : SchemeSpec<PropertyValue>(::PropertyValue)
|
||||
}
|
||||
import hep.dataforge.meta.*
|
||||
import hep.dataforge.names.asName
|
||||
|
||||
open class DeviceMessage : Scheme() {
|
||||
var id by item()
|
||||
var source by string()//TODO consider replacing by item
|
||||
var target by string()
|
||||
var action by string()
|
||||
var status by string()
|
||||
var comment by string()
|
||||
var action by string(key = MESSAGE_ACTION_KEY)
|
||||
var status by string(RESPONSE_OK_STATUS)
|
||||
var value by item(key = MESSAGE_VALUE_KEY)
|
||||
|
||||
companion object : SchemeSpec<DeviceMessage>(::DeviceMessage) {
|
||||
const val MESSAGE_ACTION_KEY = "action"
|
||||
const val MESSAGE_PROPERTY_NAME_KEY = "propertyName"
|
||||
const val MESSAGE_VALUE_KEY = "value"
|
||||
val MESSAGE_ACTION_KEY = "action".asName()
|
||||
val MESSAGE_VALUE_KEY = "value".asName()
|
||||
const val RESPONSE_OK_STATUS = "response.OK"
|
||||
const val EVENT_STATUS = "event.propertyChange"
|
||||
const val RESPONSE_FAIL_STATUS = "response.FAIL"
|
||||
|
||||
fun ok(request: DeviceMessage? = null, block: DeviceMessage.() -> Unit): Meta {
|
||||
fun ok(request: DeviceMessage? = null, block: DeviceMessage.() -> Unit = {}): DeviceMessage {
|
||||
return DeviceMessage {
|
||||
id = request?.id
|
||||
status = RESPONSE_OK_STATUS
|
||||
}.apply(block).toMeta()
|
||||
}.apply(block)
|
||||
}
|
||||
|
||||
fun fail(request: DeviceMessage? = null,block: DeviceMessage.() -> Unit = {}): DeviceMessage {
|
||||
return DeviceMessage {
|
||||
id = request?.id
|
||||
status = RESPONSE_FAIL_STATUS
|
||||
}.apply(block)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -41,15 +41,22 @@ class DevicePropertyMessage : DeviceMessage() {
|
||||
this.property = PropertyValue.invoke(builder)
|
||||
}
|
||||
|
||||
class PropertyValue : Scheme() {
|
||||
var name by string { error("Property name not defined") }
|
||||
var value by item()
|
||||
|
||||
companion object : SchemeSpec<PropertyValue>(::PropertyValue)
|
||||
}
|
||||
|
||||
companion object : SchemeSpec<DevicePropertyMessage>(::DevicePropertyMessage) {
|
||||
fun ok(request: DeviceMessage? = null, block: DevicePropertyMessage.() -> Unit): Meta {
|
||||
const val PROPERTY_CHANGED_ACTION = "event.propertyChange"
|
||||
fun ok(request: DeviceMessage? = null, block: DevicePropertyMessage.() -> Unit = {}): DeviceMessage {
|
||||
return DevicePropertyMessage {
|
||||
id = request?.id
|
||||
property {
|
||||
name
|
||||
}
|
||||
status = RESPONSE_OK_STATUS
|
||||
}.apply(block).toMeta()
|
||||
}.apply(block)
|
||||
}
|
||||
}
|
||||
}
|
@ -1,60 +0,0 @@
|
||||
package hep.dataforge.control.controlers
|
||||
|
||||
import hep.dataforge.control.api.Device
|
||||
import hep.dataforge.control.api.PropertyChangeListener
|
||||
import hep.dataforge.control.controlers.DeviceMessage.Companion.EVENT_STATUS
|
||||
import hep.dataforge.io.Envelope
|
||||
import hep.dataforge.io.SimpleEnvelope
|
||||
import hep.dataforge.meta.MetaItem
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.channels.Channel
|
||||
import kotlinx.coroutines.channels.Channel.Factory.CONFLATED
|
||||
import kotlinx.coroutines.channels.SendChannel
|
||||
import kotlinx.coroutines.flow.consumeAsFlow
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.io.Closeable
|
||||
import kotlinx.io.EmptyBinary
|
||||
|
||||
class FlowController<D : Device>(val device: D, val target: String, val scope: CoroutineScope) : PropertyChangeListener,
|
||||
Closeable {
|
||||
|
||||
init {
|
||||
if (device.listener != null) error("Can't attach controller to $device, the controller is already attached")
|
||||
device.listener = this
|
||||
}
|
||||
|
||||
private val outputChannel = Channel<Envelope>(CONFLATED)
|
||||
private val inputChannel = Channel<Envelope>(CONFLATED)
|
||||
|
||||
val input: SendChannel<Envelope> get() = inputChannel
|
||||
val output = outputChannel.consumeAsFlow()
|
||||
|
||||
init {
|
||||
scope.launch {
|
||||
while (!inputChannel.isClosedForSend) {
|
||||
val request = inputChannel.receive()
|
||||
val response = device.respond(target, request)
|
||||
outputChannel.send(response)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun propertyChanged(propertyName: String, value: MetaItem<*>) {
|
||||
scope.launch {
|
||||
val changeMeta = DevicePropertyMessage.ok {
|
||||
this.source = target
|
||||
status = EVENT_STATUS
|
||||
property {
|
||||
name = propertyName
|
||||
this.value = value
|
||||
}
|
||||
}
|
||||
outputChannel.send(SimpleEnvelope(changeMeta, EmptyBinary))
|
||||
}
|
||||
}
|
||||
|
||||
override fun close() {
|
||||
outputChannel.cancel()
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,104 @@
|
||||
package hep.dataforge.control.controlers
|
||||
|
||||
import hep.dataforge.control.api.Device
|
||||
import hep.dataforge.control.api.PropertyChangeListener
|
||||
import hep.dataforge.control.controlers.DevicePropertyMessage.Companion.PROPERTY_CHANGED_ACTION
|
||||
import hep.dataforge.io.Envelope
|
||||
import hep.dataforge.io.Responder
|
||||
import hep.dataforge.io.SimpleEnvelope
|
||||
import hep.dataforge.meta.MetaItem
|
||||
import hep.dataforge.meta.get
|
||||
import hep.dataforge.meta.string
|
||||
import hep.dataforge.meta.wrap
|
||||
import kotlinx.io.Binary
|
||||
|
||||
|
||||
interface MessageConsumer {
|
||||
fun consume(message: Envelope): Unit
|
||||
}
|
||||
|
||||
class MessageController(
|
||||
val device: Device,
|
||||
val deviceTarget: String
|
||||
) : Responder, PropertyChangeListener {
|
||||
|
||||
init {
|
||||
device.registerListener(this, this)
|
||||
}
|
||||
|
||||
var messageListener: MessageConsumer? = null
|
||||
|
||||
override suspend fun respond(request: Envelope): Envelope {
|
||||
val responseMessage: DeviceMessage = try {
|
||||
when (val action = request.meta[DeviceMessage.MESSAGE_ACTION_KEY].string ?: error("Action not defined")) {
|
||||
Device.GET_PROPERTY_ACTION -> {
|
||||
val message = DevicePropertyMessage.wrap(request.meta)
|
||||
val property = message.property ?: error("Property item not defined")
|
||||
val propertyName: String = property.name
|
||||
val result = device.getProperty(propertyName)
|
||||
|
||||
DevicePropertyMessage.ok {
|
||||
this.source = deviceTarget
|
||||
this.target = message.source
|
||||
property {
|
||||
name = propertyName
|
||||
value = result
|
||||
}
|
||||
}
|
||||
}
|
||||
Device.SET_PROPERTY_ACTION -> {
|
||||
val message = DevicePropertyMessage.wrap(request.meta)
|
||||
val property = message.property ?: error("Property item not defined")
|
||||
val propertyName: String = property.name
|
||||
val propertyValue = property.value
|
||||
if (propertyValue == null) {
|
||||
device.invalidateProperty(propertyName)
|
||||
} else {
|
||||
device.setProperty(propertyName, propertyValue)
|
||||
}
|
||||
DevicePropertyMessage.ok {
|
||||
this.source = deviceTarget
|
||||
this.target = message.source
|
||||
property {
|
||||
name = propertyName
|
||||
}
|
||||
}
|
||||
}
|
||||
else -> {
|
||||
val value = request.meta[DeviceMessage.MESSAGE_VALUE_KEY]
|
||||
val result = device.call(action, value)
|
||||
DeviceMessage.ok {
|
||||
this.source = deviceTarget
|
||||
this.action = action
|
||||
this.value = result
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (ex: Exception) {
|
||||
DeviceMessage.fail {
|
||||
comment = ex.message
|
||||
}
|
||||
}
|
||||
|
||||
return SimpleEnvelope(responseMessage.toMeta(), Binary.EMPTY)
|
||||
}
|
||||
|
||||
override fun propertyChanged(propertyName: String, value: MetaItem<*>?) {
|
||||
if (value == null) return
|
||||
messageListener?.let { listener ->
|
||||
val change = DevicePropertyMessage.ok {
|
||||
this.source = deviceTarget
|
||||
action = PROPERTY_CHANGED_ACTION
|
||||
property {
|
||||
name = propertyName
|
||||
this.value = value
|
||||
}
|
||||
}
|
||||
val envelope = SimpleEnvelope(change.toMeta(), Binary.EMPTY)
|
||||
listener.consume(envelope)
|
||||
}
|
||||
}
|
||||
|
||||
companion object {
|
||||
}
|
||||
}
|
@ -0,0 +1,57 @@
|
||||
package hep.dataforge.control.controlers
|
||||
|
||||
import hep.dataforge.io.Envelope
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.ExperimentalCoroutinesApi
|
||||
import kotlinx.coroutines.channels.Channel
|
||||
import kotlinx.coroutines.channels.Channel.Factory.CONFLATED
|
||||
import kotlinx.coroutines.channels.SendChannel
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.consumeAsFlow
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.io.Closeable
|
||||
|
||||
@ExperimentalCoroutinesApi
|
||||
class MessageFlow(
|
||||
val controller: MessageController,
|
||||
val scope: CoroutineScope
|
||||
) : Closeable, MessageConsumer {
|
||||
|
||||
init {
|
||||
if (controller.messageListener != null) error("Can't attach controller to $controller, the controller is already attached")
|
||||
controller.messageListener = this
|
||||
}
|
||||
|
||||
private val outputChannel = Channel<Envelope>(CONFLATED)
|
||||
private val inputChannel = Channel<Envelope>(CONFLATED)
|
||||
|
||||
val input: SendChannel<Envelope> get() = inputChannel
|
||||
val output: Flow<Envelope> = outputChannel.consumeAsFlow()
|
||||
|
||||
init {
|
||||
scope.launch {
|
||||
while (!inputChannel.isClosedForSend) {
|
||||
val request = inputChannel.receive()
|
||||
val response = controller.respond(request)
|
||||
outputChannel.send(response)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun consume(message: Envelope) {
|
||||
scope.launch {
|
||||
outputChannel.send(message)
|
||||
}
|
||||
}
|
||||
|
||||
override fun close() {
|
||||
outputChannel.cancel()
|
||||
}
|
||||
}
|
||||
|
||||
@ExperimentalCoroutinesApi
|
||||
fun MessageController.flow(scope: CoroutineScope = device.scope): MessageFlow {
|
||||
return MessageFlow(this, scope).also {
|
||||
this@flow.messageListener = it
|
||||
}
|
||||
}
|
@ -0,0 +1,28 @@
|
||||
package hep.dataforge.control.controlers
|
||||
|
||||
import hep.dataforge.control.api.Device
|
||||
import hep.dataforge.control.api.PropertyChangeListener
|
||||
import hep.dataforge.meta.MetaItem
|
||||
import kotlinx.coroutines.ExperimentalCoroutinesApi
|
||||
import kotlinx.coroutines.channels.awaitClose
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.callbackFlow
|
||||
import kotlinx.coroutines.launch
|
||||
|
||||
|
||||
@ExperimentalCoroutinesApi
|
||||
suspend fun Device.valueFlow(): Flow<Pair<String, MetaItem<*>>> = callbackFlow {
|
||||
val listener = object : PropertyChangeListener {
|
||||
override fun propertyChanged(propertyName: String, value: MetaItem<*>?) {
|
||||
if (value != null) {
|
||||
launch {
|
||||
send(propertyName to value)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
registerListener(listener, listener)
|
||||
awaitClose {
|
||||
removeListener(listener)
|
||||
}
|
||||
}
|
@ -0,0 +1,10 @@
|
||||
package hep.dataforge.control.controlers
|
||||
|
||||
import hep.dataforge.control.base.DeviceProperty
|
||||
import hep.dataforge.control.base.ReadOnlyDeviceProperty
|
||||
import hep.dataforge.meta.double
|
||||
import hep.dataforge.meta.map
|
||||
import hep.dataforge.meta.transform
|
||||
|
||||
fun ReadOnlyDeviceProperty.double() = map { it.double }
|
||||
fun DeviceProperty.double() = transform { it.double ?: Double.NaN }
|
@ -1,61 +0,0 @@
|
||||
package hep.dataforge.control.controlers
|
||||
|
||||
import hep.dataforge.control.api.Device
|
||||
import hep.dataforge.io.Envelope
|
||||
import hep.dataforge.io.SimpleEnvelope
|
||||
import hep.dataforge.meta.Meta
|
||||
import hep.dataforge.meta.set
|
||||
import kotlinx.io.EmptyBinary
|
||||
|
||||
suspend fun Device.respond(target: String, request: Envelope, action: String): Envelope {
|
||||
val metaResult = when (action) {
|
||||
Device.GET_PROPERTY_ACTION -> {
|
||||
val message = DevicePropertyMessage.wrap(request.meta)
|
||||
val property = message.property ?: error("Property item not defined")
|
||||
val propertyName: String = property.name
|
||||
val result = getProperty(propertyName)
|
||||
|
||||
DevicePropertyMessage.ok {
|
||||
this.source = target
|
||||
this.target = message.source
|
||||
property {
|
||||
name = propertyName
|
||||
value = result
|
||||
}
|
||||
}
|
||||
}
|
||||
Device.SET_PROPERTY_ACTION -> {
|
||||
val message = DevicePropertyMessage.wrap(request.meta)
|
||||
val property = message.property ?: error("Property item not defined")
|
||||
val propertyName: String = property.name
|
||||
val propertyValue = property.value
|
||||
if (propertyValue == null) {
|
||||
invalidateProperty(propertyName)
|
||||
} else {
|
||||
setProperty(propertyName, propertyValue)
|
||||
}
|
||||
DevicePropertyMessage.ok {
|
||||
this.source = target
|
||||
this.target = message.source
|
||||
property {
|
||||
name = propertyName
|
||||
}
|
||||
}
|
||||
}
|
||||
else -> {
|
||||
val data: Meta? = request.meta[DeviceMessage.MESSAGE_VALUE_KEY].node
|
||||
val result = action(action, data)
|
||||
DeviceMessage.ok {
|
||||
this.source = target
|
||||
config[DeviceMessage.MESSAGE_ACTION_KEY] = action
|
||||
config[DeviceMessage.MESSAGE_VALUE_KEY] = result
|
||||
}
|
||||
}
|
||||
}
|
||||
return SimpleEnvelope(metaResult, EmptyBinary)
|
||||
}
|
||||
|
||||
suspend fun Device.respond(target: String, request: Envelope): Envelope {
|
||||
val action: String = request.meta[DeviceMessage.MESSAGE_ACTION_KEY].string ?: error("Action not defined")
|
||||
return respond(target, request, action)
|
||||
}
|
@ -1,33 +0,0 @@
|
||||
package hep.dataforge.control.demo
|
||||
|
||||
import hep.dataforge.control.base.DeviceBase
|
||||
import hep.dataforge.control.base.mutableProperty
|
||||
import hep.dataforge.control.base.property
|
||||
import hep.dataforge.meta.Meta
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import java.time.Instant
|
||||
import kotlin.math.cos
|
||||
import kotlin.math.sin
|
||||
|
||||
class VirtualDevice(val meta: Meta, override val scope: CoroutineScope) : DeviceBase() {
|
||||
|
||||
var scale by mutableProperty {
|
||||
getDouble {
|
||||
200.0
|
||||
}.virtualSet()
|
||||
}
|
||||
|
||||
val sin by property {
|
||||
getDouble {
|
||||
val time = Instant.now()
|
||||
sin(time.toEpochMilli().toDouble() / (scale ?: 1000.0))
|
||||
}
|
||||
}
|
||||
|
||||
val cos by property {
|
||||
getDouble {
|
||||
val time = Instant.now()
|
||||
cos(time.toEpochMilli().toDouble() / (scale ?: 1000.0))
|
||||
}
|
||||
}
|
||||
}
|
24
demo/build.gradle.kts
Normal file
24
demo/build.gradle.kts
Normal file
@ -0,0 +1,24 @@
|
||||
plugins {
|
||||
kotlin("jvm") version "1.3.72"
|
||||
}
|
||||
|
||||
val plotlyVersion: String by rootProject.extra
|
||||
|
||||
repositories{
|
||||
jcenter()
|
||||
maven("https://kotlin.bintray.com/kotlinx")
|
||||
maven("https://dl.bintray.com/kotlin/kotlin-eap")
|
||||
maven("https://dl.bintray.com/mipt-npm/dataforge")
|
||||
maven("https://dl.bintray.com/mipt-npm/scientifik")
|
||||
maven("https://dl.bintray.com/mipt-npm/dev")
|
||||
}
|
||||
|
||||
dependencies{
|
||||
implementation(kotlin("stdlib-jdk8"))
|
||||
implementation(project(":dataforge-control-core"))
|
||||
implementation("scientifik:plotlykt-server:$plotlyVersion")
|
||||
}
|
||||
|
||||
tasks.withType<org.jetbrains.kotlin.gradle.tasks.KotlinCompile>().configureEach {
|
||||
kotlinOptions.jvmTarget = "11"
|
||||
}
|
@ -0,0 +1,30 @@
|
||||
package hep.dataforge.control.demo
|
||||
|
||||
import kotlinx.coroutines.*
|
||||
import kotlinx.coroutines.flow.MutableStateFlow
|
||||
import kotlinx.coroutines.flow.collect
|
||||
import java.util.concurrent.Executors
|
||||
|
||||
val producerDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
|
||||
|
||||
fun main() {
|
||||
runBlocking {
|
||||
val test = MutableStateFlow(0)
|
||||
|
||||
launch {
|
||||
var counter = 0
|
||||
while (isActive){
|
||||
delay(500)
|
||||
counter++
|
||||
println("produced $counter")
|
||||
test.value = counter
|
||||
}
|
||||
}
|
||||
|
||||
launch(producerDispatcher) {
|
||||
test.collect{
|
||||
println("collected $it")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,59 @@
|
||||
package hep.dataforge.control.demo
|
||||
|
||||
import hep.dataforge.control.base.*
|
||||
import hep.dataforge.control.controlers.double
|
||||
import hep.dataforge.values.asValue
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.GlobalScope
|
||||
import kotlinx.coroutines.asCoroutineDispatcher
|
||||
import java.time.Instant
|
||||
import java.util.concurrent.Executors
|
||||
import kotlin.math.cos
|
||||
import kotlin.math.sin
|
||||
import kotlin.time.ExperimentalTime
|
||||
import kotlin.time.seconds
|
||||
|
||||
@OptIn(ExperimentalTime::class)
|
||||
class DemoDevice(parentScope: CoroutineScope = GlobalScope) : DeviceBase() {
|
||||
|
||||
private val executor = Executors.newSingleThreadExecutor()
|
||||
|
||||
override val scope: CoroutineScope = CoroutineScope(
|
||||
parentScope.coroutineContext + executor.asCoroutineDispatcher()
|
||||
)
|
||||
|
||||
val scaleProperty: SimpleDeviceProperty by writingVirtual(5000.0.asValue())
|
||||
var scale by scaleProperty.double()
|
||||
|
||||
val resetScale: Action by action {
|
||||
scale = 5000.0
|
||||
}
|
||||
|
||||
val sin by readingNumber {
|
||||
val time = Instant.now()
|
||||
sin(time.toEpochMilli().toDouble() / scale)
|
||||
}
|
||||
|
||||
val cos by readingNumber {
|
||||
val time = Instant.now()
|
||||
cos(time.toEpochMilli().toDouble() / scale)
|
||||
}
|
||||
|
||||
val coordinates by readingMeta {
|
||||
val time = Instant.now()
|
||||
"time" put time.toEpochMilli()
|
||||
"x" put sin(time.toEpochMilli().toDouble() / scale)
|
||||
"y" put cos(time.toEpochMilli().toDouble() / scale)
|
||||
}
|
||||
|
||||
init {
|
||||
sin.readEvery(0.2.seconds)
|
||||
cos.readEvery(0.2.seconds)
|
||||
coordinates.readEvery(0.2.seconds)
|
||||
}
|
||||
|
||||
override fun close() {
|
||||
super.close()
|
||||
executor.shutdown()
|
||||
}
|
||||
}
|
119
demo/src/main/kotlin/hep/dataforge/control/demo/DemoMain.kt
Normal file
119
demo/src/main/kotlin/hep/dataforge/control/demo/DemoMain.kt
Normal file
@ -0,0 +1,119 @@
|
||||
package hep.dataforge.control.demo
|
||||
|
||||
import hep.dataforge.meta.double
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.flow.collect
|
||||
import kotlinx.coroutines.flow.mapNotNull
|
||||
import kotlinx.coroutines.flow.zip
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import scientifik.plotly.Plotly
|
||||
import scientifik.plotly.layout
|
||||
import scientifik.plotly.models.Trace
|
||||
import scientifik.plotly.server.pushUpdates
|
||||
import scientifik.plotly.server.serve
|
||||
import java.util.concurrent.ConcurrentLinkedQueue
|
||||
|
||||
fun main() {
|
||||
runBlocking(Dispatchers.Default) {
|
||||
val device = DemoDevice()
|
||||
|
||||
val sinFlow = device.sin.flow()
|
||||
val cosFlow = device.cos.flow()
|
||||
val sinCosFlow = sinFlow.zip(cosFlow) { sin, cos ->
|
||||
sin.double to cos.double
|
||||
}
|
||||
|
||||
// launch {
|
||||
// device.valueFlow().collect { (name, item) ->
|
||||
// if (name == "sin") {
|
||||
// println("Device produced $item")
|
||||
// println("Sin value is ${sinFlow.value}")
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// launch {
|
||||
// sinFlow.mapNotNull { it.double }.collect {
|
||||
// println("Device processed $it")
|
||||
// }
|
||||
// }
|
||||
|
||||
val server = Plotly.serve(this) {
|
||||
plot(rowNumber = 0, colOrderNumber = 0, size = 6) {
|
||||
layout {
|
||||
title = "sin property"
|
||||
xaxis.title = "point index"
|
||||
yaxis.title = "sin"
|
||||
}
|
||||
val trace = Trace.empty()
|
||||
data.add(trace)
|
||||
launch {
|
||||
val queue = ConcurrentLinkedQueue<Double>()
|
||||
|
||||
sinFlow.mapNotNull { it.double }.collect {
|
||||
queue.add(it)
|
||||
if (queue.size >= 100) {
|
||||
queue.poll()
|
||||
}
|
||||
trace.y.numbers = queue
|
||||
}
|
||||
}
|
||||
}
|
||||
plot(rowNumber = 0, colOrderNumber = 1, size = 6) {
|
||||
layout {
|
||||
title = "cos property"
|
||||
xaxis.title = "point index"
|
||||
yaxis.title = "cos"
|
||||
}
|
||||
val trace = Trace.empty()
|
||||
data.add(trace)
|
||||
launch {
|
||||
val queue = ConcurrentLinkedQueue<Double>()
|
||||
|
||||
cosFlow.mapNotNull { it.double }.collect {
|
||||
queue.add(it)
|
||||
if (queue.size >= 100) {
|
||||
queue.poll()
|
||||
}
|
||||
trace.y.numbers = queue
|
||||
}
|
||||
}
|
||||
}
|
||||
plot(rowNumber = 1, colOrderNumber = 0, size = 12) {
|
||||
layout {
|
||||
title = "cos vs sin"
|
||||
xaxis.title = "sin"
|
||||
yaxis.title = "cos"
|
||||
}
|
||||
val trace = Trace.empty()
|
||||
data.add(trace)
|
||||
launch {
|
||||
val queue = ConcurrentLinkedQueue<Pair<Double, Double>>()
|
||||
|
||||
sinCosFlow.collect { pair ->
|
||||
val x = pair.first ?: return@collect
|
||||
val y = pair.second ?: return@collect
|
||||
queue.add(x to y)
|
||||
if (queue.size >= 20) {
|
||||
queue.poll()
|
||||
}
|
||||
trace.x.numbers = queue.map { it.first }
|
||||
trace.y.numbers = queue.map { it.second }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
}.pushUpdates()
|
||||
|
||||
|
||||
|
||||
readLine()
|
||||
|
||||
println("Stopping")
|
||||
server.stop()
|
||||
device.close()
|
||||
}
|
||||
}
|
@ -0,0 +1,30 @@
|
||||
package hep.dataforge.control.demo
|
||||
|
||||
import hep.dataforge.meta.MetaItem
|
||||
import hep.dataforge.meta.double
|
||||
import hep.dataforge.values.Null
|
||||
import hep.dataforge.values.asValue
|
||||
import kotlinx.coroutines.delay
|
||||
import kotlinx.coroutines.flow.MutableStateFlow
|
||||
import kotlinx.coroutines.flow.collect
|
||||
import kotlinx.coroutines.flow.map
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.coroutines.runBlocking
|
||||
|
||||
fun main() {
|
||||
runBlocking {
|
||||
val flow: MutableStateFlow<MetaItem<*>> = MutableStateFlow<MetaItem<*>>(MetaItem.ValueItem(Null))
|
||||
|
||||
val collector = launch {
|
||||
flow.map { it.double }.collect {
|
||||
println(it)
|
||||
}
|
||||
}
|
||||
|
||||
repeat(10) {
|
||||
delay(10)
|
||||
flow.value = MetaItem.ValueItem(it.toDouble().asValue())
|
||||
}
|
||||
collector.cancel()
|
||||
}
|
||||
}
|
@ -1,6 +1,6 @@
|
||||
pluginManagement {
|
||||
val kotlinVersion = "1.3.61"
|
||||
val toolsVersion = "0.3.2"
|
||||
val kotlinVersion = "1.3.72"
|
||||
val toolsVersion = "0.5.0"
|
||||
|
||||
repositories {
|
||||
mavenLocal()
|
||||
@ -27,6 +27,7 @@ pluginManagement {
|
||||
eachPlugin {
|
||||
when (requested.id.id) {
|
||||
"scientifik.publish", "scientifik.mpp", "scientifik.jvm", "scientifik.js" -> useModule("scientifik:gradle-tools:${toolsVersion}")
|
||||
"kotlinx-atomicfu" -> useModule("org.jetbrains.kotlinx:atomicfu-gradle-plugin:${requested.version}")
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -35,7 +36,9 @@ pluginManagement {
|
||||
rootProject.name = "dataforge-control"
|
||||
|
||||
include(
|
||||
":dataforge-control-core"
|
||||
":dataforge-control-core",
|
||||
":demo"
|
||||
)
|
||||
|
||||
includeBuild("../dataforge-core")
|
||||
//includeBuild("../dataforge-core")
|
||||
//includeBuild("../plotly.kt")
|
Loading…
Reference in New Issue
Block a user