From 75ee237ac6d42225adce79186d3fbe44713ef4f6 Mon Sep 17 00:00:00 2001 From: Alexander Nozik Date: Fri, 5 Jun 2020 21:07:23 +0300 Subject: [PATCH] Working prototype --- .gitignore | 2 +- build.gradle.kts | 6 +- dataforge-control-core/build.gradle.kts | 11 +- .../dataforge/control/api/ActionDescriptor.kt | 4 +- .../hep/dataforge/control/api/Device.kt | 27 +- .../hep/dataforge/control/api/DeviceHub.kt | 2 +- .../control/api/PropertyChangeListener.kt | 2 +- .../control/api/PropertyDescriptor.kt | 4 +- .../hep/dataforge/control/base/Action.kt | 56 +++- .../hep/dataforge/control/base/DeviceBase.kt | 79 ++---- .../dataforge/control/base/DeviceProperty.kt | 85 ++++++ .../dataforge/control/base/GenericProperty.kt | 86 ------ .../hep/dataforge/control/base/Property.kt | 54 ---- .../dataforge/control/base/PropertyBuilder.kt | 56 ---- .../control/base/SimpleDeviceProperty.kt | 250 ++++++++++++++++++ .../control/controlers/DeviceMessage.kt | 49 ++-- .../control/controlers/FlowController.kt | 60 ----- .../control/controlers/MessageController.kt | 104 ++++++++ .../control/controlers/MessageFlow.kt | 57 ++++ .../control/controlers/PropertyFlow.kt | 28 ++ .../control/controlers/delegateMappers.kt | 10 + .../control/controlers/deviceResponse.kt | 61 ----- .../dataforge/control/demo/VirtualDevice.kt | 33 --- demo/build.gradle.kts | 24 ++ .../control/demo/ComplexStateFlowTest.kt | 30 +++ .../hep/dataforge/control/demo/DemoDevice.kt | 59 +++++ .../hep/dataforge/control/demo/DemoMain.kt | 119 +++++++++ .../control/demo/SimpleStateFlowTest.kt | 30 +++ settings.gradle.kts | 11 +- 29 files changed, 948 insertions(+), 451 deletions(-) create mode 100644 dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/base/DeviceProperty.kt delete mode 100644 dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/base/GenericProperty.kt delete mode 100644 dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/base/Property.kt delete mode 100644 dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/base/PropertyBuilder.kt create mode 100644 dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/base/SimpleDeviceProperty.kt delete mode 100644 dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/controlers/FlowController.kt create mode 100644 dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/controlers/MessageController.kt create mode 100644 dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/controlers/MessageFlow.kt create mode 100644 dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/controlers/PropertyFlow.kt create mode 100644 dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/controlers/delegateMappers.kt delete mode 100644 dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/controlers/deviceResponse.kt delete mode 100644 dataforge-control-core/src/jvmMain/kotlin/hep/dataforge/control/demo/VirtualDevice.kt create mode 100644 demo/build.gradle.kts create mode 100644 demo/src/main/kotlin/hep/dataforge/control/demo/ComplexStateFlowTest.kt create mode 100644 demo/src/main/kotlin/hep/dataforge/control/demo/DemoDevice.kt create mode 100644 demo/src/main/kotlin/hep/dataforge/control/demo/DemoMain.kt create mode 100644 demo/src/main/kotlin/hep/dataforge/control/demo/SimpleStateFlowTest.kt diff --git a/.gitignore b/.gitignore index faa82df..ea7eb83 100644 --- a/.gitignore +++ b/.gitignore @@ -3,5 +3,5 @@ .gradle *.iws out/ -/build/ +build/ !gradle-wrapper.jar \ No newline at end of file diff --git a/build.gradle.kts b/build.gradle.kts index a219533..10a5379 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -1,4 +1,6 @@ -val dataforgeVersion by extra("0.1.5-dev-9") +val dataforgeVersion by extra("0.1.8-dev-4") +val plotlyVersion by extra("0.2.0-dev-4") + allprojects { repositories { @@ -9,7 +11,7 @@ allprojects { } group = "hep.dataforge" - version = "0.1.0-dev" + version = "0.0.1" } val githubProject by extra("dataforge-control") diff --git a/dataforge-control-core/build.gradle.kts b/dataforge-control-core/build.gradle.kts index a701c36..15f1f19 100644 --- a/dataforge-control-core/build.gradle.kts +++ b/dataforge-control-core/build.gradle.kts @@ -1,17 +1,26 @@ +import scientifik.useCoroutines + plugins { id("scientifik.mpp") id("scientifik.publish") + id("kotlinx-atomicfu") version "0.14.3" } val dataforgeVersion: String by rootProject.extra -kotlin { +useCoroutines(version = "1.3.7") +kotlin { sourceSets { commonMain{ dependencies { api("hep.dataforge:dataforge-io:$dataforgeVersion") + //implementation("org.jetbrains.kotlinx:atomicfu-common:0.14.3") } } } +} + +atomicfu { + variant = "VH" } \ No newline at end of file diff --git a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/api/ActionDescriptor.kt b/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/api/ActionDescriptor.kt index c44ae90..17d817c 100644 --- a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/api/ActionDescriptor.kt +++ b/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/api/ActionDescriptor.kt @@ -1,7 +1,7 @@ package hep.dataforge.control.api -import hep.dataforge.meta.scheme.Scheme -import hep.dataforge.meta.scheme.SchemeSpec +import hep.dataforge.meta.Scheme +import hep.dataforge.meta.SchemeSpec /** * A descriptor for property diff --git a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/api/Device.kt b/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/api/Device.kt index 8ea39ff..dd37b36 100644 --- a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/api/Device.kt +++ b/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/api/Device.kt @@ -3,8 +3,10 @@ package hep.dataforge.control.api import hep.dataforge.meta.Meta import hep.dataforge.meta.MetaItem import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.cancel +import kotlinx.io.Closeable -interface Device { +interface Device: Closeable { /** * List of supported property descriptors */ @@ -20,7 +22,17 @@ interface Device { */ val scope: CoroutineScope - var listener: PropertyChangeListener? + /** + * Register a new property change listener for this device. + * [owner] is provided optionally in order for listener to be + * easily removable + */ + fun registerListener(listener: PropertyChangeListener, owner: Any? = listener) + + /** + * Remove all listeners belonging to specified owner + */ + fun removeListener(owner: Any?) /** * Get the value of the property or throw error if property in not defined. Suspend if property value is not available @@ -42,10 +54,17 @@ interface Device { * Send a request and suspend caller while request is being processed. * Could return null if request does not return meaningful answer. */ - suspend fun action(name: String, argument: Meta? = null): Meta? + suspend fun call(action: String, argument: MetaItem<*>? = null): MetaItem<*>? + + override fun close() { + scope.cancel("The device is closed") + } companion object { const val GET_PROPERTY_ACTION = "@getProperty" const val SET_PROPERTY_ACTION = "@setProperty" + const val CALL_ACTION ="@call" } -} \ No newline at end of file +} + +suspend fun Device.call(name: String, meta: Meta?) = call(name, meta?.let { MetaItem.NodeItem(it) }) \ No newline at end of file diff --git a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/api/DeviceHub.kt b/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/api/DeviceHub.kt index 7235489..9e285b6 100644 --- a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/api/DeviceHub.kt +++ b/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/api/DeviceHub.kt @@ -20,4 +20,4 @@ suspend fun DeviceHub.setProperty(deviceName: String, propertyName: String, valu suspend fun DeviceHub.request(deviceName: String, command: String, argument: MetaItem<*>?): MetaItem<*>? = (getDevice(deviceName) ?: error("Device with name $deviceName not found in the hub")) - .action(command, argument) \ No newline at end of file + .call(command, argument) \ No newline at end of file diff --git a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/api/PropertyChangeListener.kt b/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/api/PropertyChangeListener.kt index bc00083..25f3301 100644 --- a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/api/PropertyChangeListener.kt +++ b/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/api/PropertyChangeListener.kt @@ -3,5 +3,5 @@ package hep.dataforge.control.api import hep.dataforge.meta.MetaItem interface PropertyChangeListener { - fun propertyChanged(propertyName: String, value: MetaItem<*>) + fun propertyChanged(propertyName: String, value: MetaItem<*>?) } \ No newline at end of file diff --git a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/api/PropertyDescriptor.kt b/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/api/PropertyDescriptor.kt index e8c149c..2b7efd2 100644 --- a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/api/PropertyDescriptor.kt +++ b/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/api/PropertyDescriptor.kt @@ -1,7 +1,7 @@ package hep.dataforge.control.api -import hep.dataforge.meta.scheme.Scheme -import hep.dataforge.meta.scheme.SchemeSpec +import hep.dataforge.meta.Scheme +import hep.dataforge.meta.SchemeSpec /** * A descriptor for property diff --git a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/base/Action.kt b/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/base/Action.kt index 755df94..ebd1515 100644 --- a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/base/Action.kt +++ b/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/base/Action.kt @@ -1,18 +1,64 @@ package hep.dataforge.control.base import hep.dataforge.control.api.ActionDescriptor -import hep.dataforge.meta.Meta +import hep.dataforge.meta.MetaBuilder +import hep.dataforge.meta.MetaItem +import hep.dataforge.values.Value +import kotlin.properties.ReadOnlyProperty +import kotlin.reflect.KProperty interface Action { val name: String val descriptor: ActionDescriptor - suspend operator fun invoke(arg: Meta?): Meta? + suspend operator fun invoke(arg: MetaItem<*>? = null): MetaItem<*>? } class SimpleAction( override val name: String, override val descriptor: ActionDescriptor, - val block: suspend (Meta?)->Meta? -): Action{ - override suspend fun invoke(arg: Meta?): Meta? = block(arg) + val block: suspend (MetaItem<*>?) -> MetaItem<*>? +) : Action { + override suspend fun invoke(arg: MetaItem<*>?): MetaItem<*>? = block(arg) +} + +class ActionDelegate( + val owner: D, + val descriptor: ActionDescriptor = ActionDescriptor.empty(), + val block: suspend (MetaItem<*>?) -> MetaItem<*>? +) : ReadOnlyProperty { + override fun getValue(thisRef: D, property: KProperty<*>): Action { + val name = property.name + return owner.resolveAction(name) { + SimpleAction(name, descriptor, block) + } + } +} + +fun D.request( + descriptor: ActionDescriptor = ActionDescriptor.empty(), + block: suspend (MetaItem<*>?) -> MetaItem<*>? +): ActionDelegate = ActionDelegate(this, descriptor, block) + +fun D.requestValue( + descriptor: ActionDescriptor = ActionDescriptor.empty(), + block: suspend (MetaItem<*>?) -> Any? +): ActionDelegate = ActionDelegate(this, descriptor){ + val res = block(it) + MetaItem.ValueItem(Value.of(res)) +} + +fun D.requestMeta( + descriptor: ActionDescriptor = ActionDescriptor.empty(), + block: suspend MetaBuilder.(MetaItem<*>?) -> Unit +): ActionDelegate = ActionDelegate(this, descriptor){ + val res = MetaBuilder().apply { block(it)} + MetaItem.NodeItem(res) +} + +fun D.action( + descriptor: ActionDescriptor = ActionDescriptor.empty(), + block: suspend (MetaItem<*>?) -> Unit +): ActionDelegate = ActionDelegate(this, descriptor) { + block(it) + null } \ No newline at end of file diff --git a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/base/DeviceBase.kt b/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/base/DeviceBase.kt index 884a0da..85f4fa4 100644 --- a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/base/DeviceBase.kt +++ b/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/base/DeviceBase.kt @@ -4,19 +4,24 @@ import hep.dataforge.control.api.ActionDescriptor import hep.dataforge.control.api.Device import hep.dataforge.control.api.PropertyChangeListener import hep.dataforge.control.api.PropertyDescriptor -import hep.dataforge.meta.Meta import hep.dataforge.meta.MetaItem -import kotlin.jvm.JvmStatic -import kotlin.reflect.KProperty -abstract class DeviceBase : Device, PropertyChangeListener { - private val properties = HashMap() +abstract class DeviceBase : Device { + private val properties = HashMap() private val actions = HashMap() - override var listener: PropertyChangeListener? = null + private val listeners = ArrayList>(4) - override fun propertyChanged(propertyName: String, value: MetaItem<*>) { - listener?.propertyChanged(propertyName, value) + override fun registerListener(listener: PropertyChangeListener, owner: Any?) { + listeners.add(owner to listener) + } + + override fun removeListener(owner: Any?) { + listeners.removeAll { it.first == owner } + } + + internal fun propertyChanged(propertyName: String, value: MetaItem<*>?) { + listeners.forEach { it.second.propertyChanged(propertyName, value) } } override val propertyDescriptors: Collection @@ -25,23 +30,12 @@ abstract class DeviceBase : Device, PropertyChangeListener { override val actionDescriptors: Collection get() = actions.values.map { it.descriptor } - fun

initProperty(prop: P): P { - properties[prop.name] = prop - return prop + internal fun resolveProperty(name: String, builder: () -> ReadOnlyDeviceProperty): ReadOnlyDeviceProperty { + return properties.getOrPut(name, builder) } - fun initRequest(Action: Action): Action { - actions[Action.name] = Action - return Action - } - - protected fun initRequest( - name: String, - descriptor: ActionDescriptor = ActionDescriptor.empty(), - block: suspend (MetaItem<*>?) -> MetaItem<*>? - ): Action { - val request = SimpleAction(name, descriptor, block) - return initRequest(request) + internal fun resolveAction(name: String, builder: () -> Action): Action { + return actions.getOrPut(name, builder) } override suspend fun getProperty(propertyName: String): MetaItem<*> = @@ -52,48 +46,19 @@ abstract class DeviceBase : Device, PropertyChangeListener { } override suspend fun setProperty(propertyName: String, value: MetaItem<*>) { - (properties[propertyName] as? Property ?: error("Property with name $propertyName not defined")).write(value) + (properties[propertyName] as? DeviceProperty ?: error("Property with name $propertyName not defined")).write( + value + ) } - override suspend fun action(name: String, argument: Meta?): Meta? = - (actions[name] ?: error("Request with name $name not defined")).invoke(argument) + override suspend fun call(action: String, argument: MetaItem<*>?): MetaItem<*>? = + (actions[action] ?: error("Request with name $action not defined")).invoke(argument) companion object { - @JvmStatic - protected fun D.initProperty( - name: String, - builder: PropertyBuilder.() -> P - ): P { - val property = PropertyBuilder(name, this).run(builder) - initProperty(property) - return property - } } } -class PropertyDelegateProvider>( - val owner: D, - val builder: PropertyBuilder.() -> P -) { - operator fun provideDelegate(thisRef: D, property: KProperty<*>): P { - val name = property.name - return owner.initProperty(PropertyBuilder(name, owner).run(builder)) - } -} - -fun D.property( - builder: PropertyBuilder.() -> GenericReadOnlyProperty -): PropertyDelegateProvider> { - return PropertyDelegateProvider(this, builder) -} - -//TODO try to use 'property' with new inference -fun D.mutableProperty( - builder: PropertyBuilder.() -> GenericProperty -): PropertyDelegateProvider> { - return PropertyDelegateProvider(this, builder) -} diff --git a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/base/DeviceProperty.kt b/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/base/DeviceProperty.kt new file mode 100644 index 0000000..34ce529 --- /dev/null +++ b/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/base/DeviceProperty.kt @@ -0,0 +1,85 @@ +package hep.dataforge.control.base + +import hep.dataforge.control.api.PropertyDescriptor +import hep.dataforge.meta.MetaItem +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.Flow +import kotlin.properties.ReadOnlyProperty +import kotlin.properties.ReadWriteProperty +import kotlin.reflect.KProperty +import kotlin.time.Duration + +/** + * Read-only device property + */ +interface ReadOnlyDeviceProperty : ReadOnlyProperty?> { + /** + * Property name, should be unique in device + */ + val name: String + + /** + * Property descriptor + */ + val descriptor: PropertyDescriptor + + val scope: CoroutineScope + + /** + * Erase logical value and force re-read from device on next [read] + */ + suspend fun invalidate() + +// /** +// * Update property logical value and notify listener without writing it to device +// */ +// suspend fun update(item: MetaItem<*>) +// + /** + * Get cached value and return null if value is invalid or not initialized + */ + val value: MetaItem<*>? + + /** + * Read value either from cache if cache is valid or directly from physical device. + * If [force], reread + */ + suspend fun read(force: Boolean = false): MetaItem<*> + + /** + * The [Flow] representing future logical states of the property. + * Produces null when the state is invalidated + */ + fun flow(): Flow?> + + override fun getValue(thisRef: Any?, property: KProperty<*>): MetaItem<*>? = value +} + +/** + * Launch recurring force re-read job on a property scope with given [duration] between reads. + */ +fun ReadOnlyDeviceProperty.readEvery(duration: Duration): Job = scope.launch { + while (isActive) { + read(true) + delay(duration) + } +} + +/** + * A writeable device property with non-suspended write + */ +interface DeviceProperty : ReadOnlyDeviceProperty, ReadWriteProperty?> { + override var value: MetaItem<*>? + + /** + * Write value to physical device. Invalidates logical value, but does not update it automatically + */ + suspend fun write(item: MetaItem<*>) + + override fun setValue(thisRef: Any?, property: KProperty<*>, value: MetaItem<*>?) { + this.value = value + } + + override fun getValue(thisRef: Any?, property: KProperty<*>): MetaItem<*>? = value +} + diff --git a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/base/GenericProperty.kt b/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/base/GenericProperty.kt deleted file mode 100644 index 831a9e1..0000000 --- a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/base/GenericProperty.kt +++ /dev/null @@ -1,86 +0,0 @@ -package hep.dataforge.control.base - -import hep.dataforge.control.api.PropertyDescriptor -import hep.dataforge.meta.MetaItem -import hep.dataforge.meta.transformations.MetaCaster -import kotlinx.coroutines.launch -import kotlinx.coroutines.sync.Mutex -import kotlinx.coroutines.sync.withLock -import kotlinx.coroutines.withContext -import kotlin.properties.ReadWriteProperty -import kotlin.reflect.KProperty - -open class GenericReadOnlyProperty( - override val name: String, - override val descriptor: PropertyDescriptor, - override val owner: D, - internal val converter: MetaCaster, - internal val getter: suspend D.() -> T -) : ReadOnlyProperty, kotlin.properties.ReadOnlyProperty { - - protected val mutex = Mutex() - protected var value: T? = null - - suspend fun updateValue(value: T) { - mutex.withLock { this.value = value } - owner.propertyChanged(name, converter.objectToMetaItem(value)) - } - - override suspend fun invalidate() { - mutex.withLock { value = null } - } - - suspend fun readValue(force: Boolean = false): T { - if (force) invalidate() - return value ?: withContext(owner.scope.coroutineContext) { - //all device operations should be run on device context - owner.getter().also { updateValue(it) } - } - } - - fun peekValue(): T? = value - - override suspend fun update(item: MetaItem<*>) { - updateValue(converter.itemToObject(item)) - } - - override suspend fun read(force: Boolean): MetaItem<*> = converter.objectToMetaItem(readValue(force)) - - override fun peek(): MetaItem<*>? = value?.let { converter.objectToMetaItem(it) } - - override fun getValue(thisRef: Any?, property: KProperty<*>): T? = peekValue() -} - -class GenericProperty( - name: String, - descriptor: PropertyDescriptor, - owner: D, - converter: MetaCaster, - getter: suspend D.() -> T, - private val setter: suspend D.(oldValue: T?, newValue: T) -> Unit -) : Property, ReadWriteProperty, GenericReadOnlyProperty(name, descriptor, owner, converter, getter) { - - suspend fun writeValue(newValue: T) { - val oldValue = value - withContext(owner.scope.coroutineContext) { - //all device operations should be run on device context - invalidate() - owner.setter(oldValue, newValue) - } - } - - override suspend fun write(item: MetaItem<*>) { - writeValue(converter.itemToObject(item)) - } - - override fun setValue(thisRef: Any?, property: KProperty<*>, value: T?) { - owner.scope.launch { - if (value == null) { - invalidate() - } else { - writeValue(value) - } - } - } - -} \ No newline at end of file diff --git a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/base/Property.kt b/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/base/Property.kt deleted file mode 100644 index 98b6f0f..0000000 --- a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/base/Property.kt +++ /dev/null @@ -1,54 +0,0 @@ -package hep.dataforge.control.base - -import hep.dataforge.control.api.Device -import hep.dataforge.control.api.PropertyDescriptor -import hep.dataforge.meta.MetaItem - -/** - * Read-only device property - */ -interface ReadOnlyProperty { - /** - * Property name, should be unique in device - */ - val name: String - - val owner: Device - - /** - * Property descriptor - */ - val descriptor: PropertyDescriptor - - /** - * Erase logical value and force re-read from device on next [read] - */ - suspend fun invalidate() - - /** - * Update property logical value and notify listener without writing it to device - */ - suspend fun update(item: MetaItem<*>) - - /** - * Get cached value and return null if value is invalid - */ - fun peek(): MetaItem<*>? - - /** - * Read value either from cache if cache is valid or directly from physical device - */ - suspend fun read(force: Boolean = false): MetaItem<*> -} - -/** - * A single writeable property handler - */ -interface Property : ReadOnlyProperty { - - /** - * Write value to physical device. Invalidates logical value, but does not update it automatically - */ - suspend fun write(item: MetaItem<*>) -} - diff --git a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/base/PropertyBuilder.kt b/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/base/PropertyBuilder.kt deleted file mode 100644 index 359f6b1..0000000 --- a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/base/PropertyBuilder.kt +++ /dev/null @@ -1,56 +0,0 @@ -package hep.dataforge.control.base - -import hep.dataforge.control.api.PropertyDescriptor -import hep.dataforge.meta.transformations.MetaCaster -import hep.dataforge.values.Value - -class PropertyBuilder(val name: String, val owner: D) { - var descriptor: PropertyDescriptor = PropertyDescriptor.empty() - - inline fun descriptor(block: PropertyDescriptor.() -> Unit) { - descriptor.apply(block) - } - - fun get(converter: MetaCaster, getter: (suspend D.() -> T)): GenericReadOnlyProperty = - GenericReadOnlyProperty(name, descriptor, owner, converter, getter) - - fun getDouble(getter: (suspend D.() -> Double)): GenericReadOnlyProperty = - GenericReadOnlyProperty(name, descriptor, owner, MetaCaster.double) { getter() } - - fun getString(getter: suspend D.() -> String): GenericReadOnlyProperty = - GenericReadOnlyProperty(name, descriptor, owner, MetaCaster.string) { getter() } - - fun getBoolean(getter: suspend D.() -> Boolean): GenericReadOnlyProperty = - GenericReadOnlyProperty(name, descriptor, owner, MetaCaster.boolean) { getter() } - - fun getValue(getter: suspend D.() -> Any?): GenericReadOnlyProperty = - GenericReadOnlyProperty(name, descriptor, owner, MetaCaster.value) { Value.of(getter()) } - - /** - * Convert this read-only property to read-write property - */ - infix fun GenericReadOnlyProperty.set(setter: (suspend D.(oldValue: T?, newValue: T) -> Unit)): GenericProperty { - return GenericProperty(name, descriptor, owner, converter, getter, setter) - } - - /** - * Create read-write property with synchronized setter which updates value after set - */ - fun GenericReadOnlyProperty.set(synchronousSetter: (suspend D.(oldValue: T?, newValue: T) -> T)): GenericProperty { - val setter: suspend D.(oldValue: T?, newValue: T) -> Unit = { oldValue, newValue -> - val result = synchronousSetter(oldValue, newValue) - updateValue(result) - } - return GenericProperty(name, descriptor, owner, converter, getter, setter) - } - - /** - * Define a setter that does nothing for virtual property - */ - fun GenericReadOnlyProperty.virtualSet(): GenericProperty { - val setter: suspend D.(oldValue: T?, newValue: T) -> Unit = { oldValue, newValue -> - updateValue(newValue) - } - return GenericProperty(name, descriptor, owner, converter, getter, setter) - } -} \ No newline at end of file diff --git a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/base/SimpleDeviceProperty.kt b/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/base/SimpleDeviceProperty.kt new file mode 100644 index 0000000..23730b0 --- /dev/null +++ b/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/base/SimpleDeviceProperty.kt @@ -0,0 +1,250 @@ +package hep.dataforge.control.base + +import hep.dataforge.control.api.PropertyDescriptor +import hep.dataforge.meta.Meta +import hep.dataforge.meta.MetaBuilder +import hep.dataforge.meta.MetaItem +import hep.dataforge.meta.double +import hep.dataforge.values.Value +import hep.dataforge.values.asValue +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.launch +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock +import kotlinx.coroutines.withContext +import kotlin.properties.ReadOnlyProperty +import kotlin.reflect.KProperty + +@OptIn(ExperimentalCoroutinesApi::class) +open class SimpleReadOnlyDeviceProperty( + override val name: String, + default: MetaItem<*>?, + override val descriptor: PropertyDescriptor, + override val scope: CoroutineScope, + private val updateCallback: (name: String, item: MetaItem<*>) -> Unit, + private val getter: suspend (before: MetaItem<*>?) -> MetaItem<*> +) : ReadOnlyDeviceProperty { + + private val state: MutableStateFlow?> = MutableStateFlow(default) + override val value: MetaItem<*>? get() = state.value + + override suspend fun invalidate() { + state.value = null + } + + private fun update(item: MetaItem<*>) { + state.value = item + updateCallback(name, item) + } + + override suspend fun read(force: Boolean): MetaItem<*> { + //backup current value + val currentValue = value + return if (force || currentValue == null) { + val res = withContext(scope.coroutineContext) { + //all device operations should be run on device context + //TODO add error catching + getter(currentValue) + } + update(res) + res + } else { + currentValue + } + } + + override fun flow(): StateFlow?> = state +} + +private class ReadOnlyDevicePropertyDelegate( + val owner: D, + val default: MetaItem<*>?, + val descriptor: PropertyDescriptor = PropertyDescriptor.empty(), + private val getter: suspend (MetaItem<*>?) -> MetaItem<*> +) : ReadOnlyProperty { + + override fun getValue(thisRef: D, property: KProperty<*>): SimpleReadOnlyDeviceProperty { + val name = property.name + + return owner.resolveProperty(name) { + @OptIn(ExperimentalCoroutinesApi::class) + SimpleReadOnlyDeviceProperty( + name, + default, + descriptor, + owner.scope, + owner::propertyChanged, + getter + ) + } as SimpleReadOnlyDeviceProperty + } +} + +fun D.reading( + default: MetaItem<*>? = null, + descriptorBuilder: PropertyDescriptor.() -> Unit = {}, + getter: suspend (MetaItem<*>?) -> MetaItem<*> +): ReadOnlyProperty = ReadOnlyDevicePropertyDelegate( + this, + default, + PropertyDescriptor.invoke(descriptorBuilder), + getter +) + +fun D.readingValue( + default: Value? = null, + descriptorBuilder: PropertyDescriptor.() -> Unit = {}, + getter: suspend () -> Any +): ReadOnlyProperty = ReadOnlyDevicePropertyDelegate( + this, + default?.let { MetaItem.ValueItem(it) }, + PropertyDescriptor.invoke(descriptorBuilder), + { MetaItem.ValueItem(Value.of(getter())) } +) + +fun D.readingNumber( + default: Number? = null, + descriptorBuilder: PropertyDescriptor.() -> Unit = {}, + getter: suspend () -> Number +): ReadOnlyProperty = ReadOnlyDevicePropertyDelegate( + this, + default?.let { MetaItem.ValueItem(it.asValue()) }, + PropertyDescriptor.invoke(descriptorBuilder), + { + val number = getter() + MetaItem.ValueItem(number.asValue()) + } +) + +fun D.readingMeta( + default: Meta? = null, + descriptorBuilder: PropertyDescriptor.() -> Unit = {}, + getter: suspend MetaBuilder.() -> Unit +): ReadOnlyProperty = ReadOnlyDevicePropertyDelegate( + this, + default?.let { MetaItem.NodeItem(it) }, + PropertyDescriptor.invoke(descriptorBuilder), + { MetaItem.NodeItem(MetaBuilder().apply { getter() }) } +) + +@OptIn(ExperimentalCoroutinesApi::class) +class SimpleDeviceProperty( + name: String, + default: MetaItem<*>?, + descriptor: PropertyDescriptor, + scope: CoroutineScope, + updateCallback: (name: String, item: MetaItem<*>?) -> Unit, + getter: suspend (MetaItem<*>?) -> MetaItem<*>, + private val setter: suspend (oldValue: MetaItem<*>?, newValue: MetaItem<*>) -> Unit +) : SimpleReadOnlyDeviceProperty(name, default, descriptor, scope, updateCallback, getter), DeviceProperty { + + override var value: MetaItem<*>? + get() = super.value + set(value) { + scope.launch { + if (value == null) { + invalidate() + } else { + write(value) + } + } + } + + private val writeLock = Mutex() + + override suspend fun write(item: MetaItem<*>) { + writeLock.withLock { + //fast return if value is not changed + if (item == value) return@withLock + val oldValue = value + //all device operations should be run on device context + withContext(scope.coroutineContext) { + //TODO add error catching + setter(oldValue, item) + } + } + } +} + +private class DevicePropertyDelegate( + val owner: D, + val default: MetaItem<*>?, + val descriptor: PropertyDescriptor = PropertyDescriptor.empty(), + private val getter: suspend (MetaItem<*>?) -> MetaItem<*>, + private val setter: suspend (oldValue: MetaItem<*>?, newValue: MetaItem<*>) -> Unit +) : ReadOnlyProperty { + + override fun getValue(thisRef: D, property: KProperty<*>): SimpleDeviceProperty { + val name = property.name + return owner.resolveProperty(name) { + @OptIn(ExperimentalCoroutinesApi::class) + SimpleDeviceProperty( + name, + default, + descriptor, + owner.scope, + owner::propertyChanged, + getter, + setter + ) + } as SimpleDeviceProperty + } +} + +fun D.writing( + default: MetaItem<*>? = null, + descriptorBuilder: PropertyDescriptor.() -> Unit = {}, + getter: suspend (MetaItem<*>?) -> MetaItem<*>, + setter: suspend (oldValue: MetaItem<*>?, newValue: MetaItem<*>) -> Unit +): ReadOnlyProperty = DevicePropertyDelegate( + this, + default, + PropertyDescriptor.invoke(descriptorBuilder), + getter, + setter +) + +fun D.writingVirtual( + default: MetaItem<*>, + descriptorBuilder: PropertyDescriptor.() -> Unit = {} +): ReadOnlyProperty = writing( + default, + descriptorBuilder, + getter = { it ?: default }, + setter = { _, _ -> } +) + +fun D.writingVirtual( + default: Value, + descriptorBuilder: PropertyDescriptor.() -> Unit = {} +): ReadOnlyProperty = writing( + MetaItem.ValueItem(default), + descriptorBuilder, + getter = { it ?: MetaItem.ValueItem(default) }, + setter = { _, _ -> } +) + +fun D.writingDouble( + descriptorBuilder: PropertyDescriptor.() -> Unit = {}, + getter: suspend (Double) -> Double, + setter: suspend (oldValue: Double?, newValue: Double) -> Unit +): ReadOnlyProperty { + val innerGetter: suspend (MetaItem<*>?) -> MetaItem<*> = { + MetaItem.ValueItem(getter(it.double ?: Double.NaN).asValue()) + } + + val innerSetter: suspend (oldValue: MetaItem<*>?, newValue: MetaItem<*>) -> Unit = { oldValue, newValue -> + setter(oldValue.double, newValue.double ?: Double.NaN) + } + + return DevicePropertyDelegate( + this, + MetaItem.ValueItem(Double.NaN.asValue()), + PropertyDescriptor.invoke(descriptorBuilder), + innerGetter, + innerSetter + ) +} diff --git a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/controlers/DeviceMessage.kt b/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/controlers/DeviceMessage.kt index 379a64d..53e7c74 100644 --- a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/controlers/DeviceMessage.kt +++ b/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/controlers/DeviceMessage.kt @@ -1,34 +1,34 @@ package hep.dataforge.control.controlers -import hep.dataforge.meta.Meta -import hep.dataforge.meta.scheme.* - -class PropertyValue : Scheme() { - var name by string { error("Name property not defined") } - var value by item() - - companion object : SchemeSpec(::PropertyValue) -} +import hep.dataforge.meta.* +import hep.dataforge.names.asName open class DeviceMessage : Scheme() { var id by item() var source by string()//TODO consider replacing by item var target by string() - var action by string() - var status by string() + var comment by string() + var action by string(key = MESSAGE_ACTION_KEY) + var status by string(RESPONSE_OK_STATUS) + var value by item(key = MESSAGE_VALUE_KEY) companion object : SchemeSpec(::DeviceMessage) { - const val MESSAGE_ACTION_KEY = "action" - const val MESSAGE_PROPERTY_NAME_KEY = "propertyName" - const val MESSAGE_VALUE_KEY = "value" + val MESSAGE_ACTION_KEY = "action".asName() + val MESSAGE_VALUE_KEY = "value".asName() const val RESPONSE_OK_STATUS = "response.OK" - const val EVENT_STATUS = "event.propertyChange" + const val RESPONSE_FAIL_STATUS = "response.FAIL" - fun ok(request: DeviceMessage? = null, block: DeviceMessage.() -> Unit): Meta { + fun ok(request: DeviceMessage? = null, block: DeviceMessage.() -> Unit = {}): DeviceMessage { return DeviceMessage { id = request?.id - status = RESPONSE_OK_STATUS - }.apply(block).toMeta() + }.apply(block) + } + + fun fail(request: DeviceMessage? = null,block: DeviceMessage.() -> Unit = {}): DeviceMessage { + return DeviceMessage { + id = request?.id + status = RESPONSE_FAIL_STATUS + }.apply(block) } } } @@ -41,15 +41,22 @@ class DevicePropertyMessage : DeviceMessage() { this.property = PropertyValue.invoke(builder) } + class PropertyValue : Scheme() { + var name by string { error("Property name not defined") } + var value by item() + + companion object : SchemeSpec(::PropertyValue) + } + companion object : SchemeSpec(::DevicePropertyMessage) { - fun ok(request: DeviceMessage? = null, block: DevicePropertyMessage.() -> Unit): Meta { + const val PROPERTY_CHANGED_ACTION = "event.propertyChange" + fun ok(request: DeviceMessage? = null, block: DevicePropertyMessage.() -> Unit = {}): DeviceMessage { return DevicePropertyMessage { id = request?.id property { name } - status = RESPONSE_OK_STATUS - }.apply(block).toMeta() + }.apply(block) } } } \ No newline at end of file diff --git a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/controlers/FlowController.kt b/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/controlers/FlowController.kt deleted file mode 100644 index b25bd93..0000000 --- a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/controlers/FlowController.kt +++ /dev/null @@ -1,60 +0,0 @@ -package hep.dataforge.control.controlers - -import hep.dataforge.control.api.Device -import hep.dataforge.control.api.PropertyChangeListener -import hep.dataforge.control.controlers.DeviceMessage.Companion.EVENT_STATUS -import hep.dataforge.io.Envelope -import hep.dataforge.io.SimpleEnvelope -import hep.dataforge.meta.MetaItem -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.channels.Channel.Factory.CONFLATED -import kotlinx.coroutines.channels.SendChannel -import kotlinx.coroutines.flow.consumeAsFlow -import kotlinx.coroutines.launch -import kotlinx.io.Closeable -import kotlinx.io.EmptyBinary - -class FlowController(val device: D, val target: String, val scope: CoroutineScope) : PropertyChangeListener, - Closeable { - - init { - if (device.listener != null) error("Can't attach controller to $device, the controller is already attached") - device.listener = this - } - - private val outputChannel = Channel(CONFLATED) - private val inputChannel = Channel(CONFLATED) - - val input: SendChannel get() = inputChannel - val output = outputChannel.consumeAsFlow() - - init { - scope.launch { - while (!inputChannel.isClosedForSend) { - val request = inputChannel.receive() - val response = device.respond(target, request) - outputChannel.send(response) - } - } - } - - override fun propertyChanged(propertyName: String, value: MetaItem<*>) { - scope.launch { - val changeMeta = DevicePropertyMessage.ok { - this.source = target - status = EVENT_STATUS - property { - name = propertyName - this.value = value - } - } - outputChannel.send(SimpleEnvelope(changeMeta, EmptyBinary)) - } - } - - override fun close() { - outputChannel.cancel() - } - -} \ No newline at end of file diff --git a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/controlers/MessageController.kt b/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/controlers/MessageController.kt new file mode 100644 index 0000000..3b89b5f --- /dev/null +++ b/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/controlers/MessageController.kt @@ -0,0 +1,104 @@ +package hep.dataforge.control.controlers + +import hep.dataforge.control.api.Device +import hep.dataforge.control.api.PropertyChangeListener +import hep.dataforge.control.controlers.DevicePropertyMessage.Companion.PROPERTY_CHANGED_ACTION +import hep.dataforge.io.Envelope +import hep.dataforge.io.Responder +import hep.dataforge.io.SimpleEnvelope +import hep.dataforge.meta.MetaItem +import hep.dataforge.meta.get +import hep.dataforge.meta.string +import hep.dataforge.meta.wrap +import kotlinx.io.Binary + + +interface MessageConsumer { + fun consume(message: Envelope): Unit +} + +class MessageController( + val device: Device, + val deviceTarget: String +) : Responder, PropertyChangeListener { + + init { + device.registerListener(this, this) + } + + var messageListener: MessageConsumer? = null + + override suspend fun respond(request: Envelope): Envelope { + val responseMessage: DeviceMessage = try { + when (val action = request.meta[DeviceMessage.MESSAGE_ACTION_KEY].string ?: error("Action not defined")) { + Device.GET_PROPERTY_ACTION -> { + val message = DevicePropertyMessage.wrap(request.meta) + val property = message.property ?: error("Property item not defined") + val propertyName: String = property.name + val result = device.getProperty(propertyName) + + DevicePropertyMessage.ok { + this.source = deviceTarget + this.target = message.source + property { + name = propertyName + value = result + } + } + } + Device.SET_PROPERTY_ACTION -> { + val message = DevicePropertyMessage.wrap(request.meta) + val property = message.property ?: error("Property item not defined") + val propertyName: String = property.name + val propertyValue = property.value + if (propertyValue == null) { + device.invalidateProperty(propertyName) + } else { + device.setProperty(propertyName, propertyValue) + } + DevicePropertyMessage.ok { + this.source = deviceTarget + this.target = message.source + property { + name = propertyName + } + } + } + else -> { + val value = request.meta[DeviceMessage.MESSAGE_VALUE_KEY] + val result = device.call(action, value) + DeviceMessage.ok { + this.source = deviceTarget + this.action = action + this.value = result + } + } + } + } catch (ex: Exception) { + DeviceMessage.fail { + comment = ex.message + } + } + + return SimpleEnvelope(responseMessage.toMeta(), Binary.EMPTY) + } + + override fun propertyChanged(propertyName: String, value: MetaItem<*>?) { + if (value == null) return + messageListener?.let { listener -> + val change = DevicePropertyMessage.ok { + this.source = deviceTarget + action = PROPERTY_CHANGED_ACTION + property { + name = propertyName + this.value = value + } + } + val envelope = SimpleEnvelope(change.toMeta(), Binary.EMPTY) + listener.consume(envelope) + } + } + + companion object { + } +} \ No newline at end of file diff --git a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/controlers/MessageFlow.kt b/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/controlers/MessageFlow.kt new file mode 100644 index 0000000..aec564d --- /dev/null +++ b/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/controlers/MessageFlow.kt @@ -0,0 +1,57 @@ +package hep.dataforge.control.controlers + +import hep.dataforge.io.Envelope +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.channels.Channel.Factory.CONFLATED +import kotlinx.coroutines.channels.SendChannel +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.consumeAsFlow +import kotlinx.coroutines.launch +import kotlinx.io.Closeable + +@ExperimentalCoroutinesApi +class MessageFlow( + val controller: MessageController, + val scope: CoroutineScope +) : Closeable, MessageConsumer { + + init { + if (controller.messageListener != null) error("Can't attach controller to $controller, the controller is already attached") + controller.messageListener = this + } + + private val outputChannel = Channel(CONFLATED) + private val inputChannel = Channel(CONFLATED) + + val input: SendChannel get() = inputChannel + val output: Flow = outputChannel.consumeAsFlow() + + init { + scope.launch { + while (!inputChannel.isClosedForSend) { + val request = inputChannel.receive() + val response = controller.respond(request) + outputChannel.send(response) + } + } + } + + override fun consume(message: Envelope) { + scope.launch { + outputChannel.send(message) + } + } + + override fun close() { + outputChannel.cancel() + } +} + +@ExperimentalCoroutinesApi +fun MessageController.flow(scope: CoroutineScope = device.scope): MessageFlow { + return MessageFlow(this, scope).also { + this@flow.messageListener = it + } +} \ No newline at end of file diff --git a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/controlers/PropertyFlow.kt b/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/controlers/PropertyFlow.kt new file mode 100644 index 0000000..e23ab47 --- /dev/null +++ b/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/controlers/PropertyFlow.kt @@ -0,0 +1,28 @@ +package hep.dataforge.control.controlers + +import hep.dataforge.control.api.Device +import hep.dataforge.control.api.PropertyChangeListener +import hep.dataforge.meta.MetaItem +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.channels.awaitClose +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.callbackFlow +import kotlinx.coroutines.launch + + +@ExperimentalCoroutinesApi +suspend fun Device.valueFlow(): Flow>> = callbackFlow { + val listener = object : PropertyChangeListener { + override fun propertyChanged(propertyName: String, value: MetaItem<*>?) { + if (value != null) { + launch { + send(propertyName to value) + } + } + } + } + registerListener(listener, listener) + awaitClose { + removeListener(listener) + } +} \ No newline at end of file diff --git a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/controlers/delegateMappers.kt b/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/controlers/delegateMappers.kt new file mode 100644 index 0000000..1e58386 --- /dev/null +++ b/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/controlers/delegateMappers.kt @@ -0,0 +1,10 @@ +package hep.dataforge.control.controlers + +import hep.dataforge.control.base.DeviceProperty +import hep.dataforge.control.base.ReadOnlyDeviceProperty +import hep.dataforge.meta.double +import hep.dataforge.meta.map +import hep.dataforge.meta.transform + +fun ReadOnlyDeviceProperty.double() = map { it.double } +fun DeviceProperty.double() = transform { it.double ?: Double.NaN } diff --git a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/controlers/deviceResponse.kt b/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/controlers/deviceResponse.kt deleted file mode 100644 index 91148ed..0000000 --- a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/controlers/deviceResponse.kt +++ /dev/null @@ -1,61 +0,0 @@ -package hep.dataforge.control.controlers - -import hep.dataforge.control.api.Device -import hep.dataforge.io.Envelope -import hep.dataforge.io.SimpleEnvelope -import hep.dataforge.meta.Meta -import hep.dataforge.meta.set -import kotlinx.io.EmptyBinary - -suspend fun Device.respond(target: String, request: Envelope, action: String): Envelope { - val metaResult = when (action) { - Device.GET_PROPERTY_ACTION -> { - val message = DevicePropertyMessage.wrap(request.meta) - val property = message.property ?: error("Property item not defined") - val propertyName: String = property.name - val result = getProperty(propertyName) - - DevicePropertyMessage.ok { - this.source = target - this.target = message.source - property { - name = propertyName - value = result - } - } - } - Device.SET_PROPERTY_ACTION -> { - val message = DevicePropertyMessage.wrap(request.meta) - val property = message.property ?: error("Property item not defined") - val propertyName: String = property.name - val propertyValue = property.value - if (propertyValue == null) { - invalidateProperty(propertyName) - } else { - setProperty(propertyName, propertyValue) - } - DevicePropertyMessage.ok { - this.source = target - this.target = message.source - property { - name = propertyName - } - } - } - else -> { - val data: Meta? = request.meta[DeviceMessage.MESSAGE_VALUE_KEY].node - val result = action(action, data) - DeviceMessage.ok { - this.source = target - config[DeviceMessage.MESSAGE_ACTION_KEY] = action - config[DeviceMessage.MESSAGE_VALUE_KEY] = result - } - } - } - return SimpleEnvelope(metaResult, EmptyBinary) -} - -suspend fun Device.respond(target: String, request: Envelope): Envelope { - val action: String = request.meta[DeviceMessage.MESSAGE_ACTION_KEY].string ?: error("Action not defined") - return respond(target, request, action) -} \ No newline at end of file diff --git a/dataforge-control-core/src/jvmMain/kotlin/hep/dataforge/control/demo/VirtualDevice.kt b/dataforge-control-core/src/jvmMain/kotlin/hep/dataforge/control/demo/VirtualDevice.kt deleted file mode 100644 index b0b7dc3..0000000 --- a/dataforge-control-core/src/jvmMain/kotlin/hep/dataforge/control/demo/VirtualDevice.kt +++ /dev/null @@ -1,33 +0,0 @@ -package hep.dataforge.control.demo - -import hep.dataforge.control.base.DeviceBase -import hep.dataforge.control.base.mutableProperty -import hep.dataforge.control.base.property -import hep.dataforge.meta.Meta -import kotlinx.coroutines.CoroutineScope -import java.time.Instant -import kotlin.math.cos -import kotlin.math.sin - -class VirtualDevice(val meta: Meta, override val scope: CoroutineScope) : DeviceBase() { - - var scale by mutableProperty { - getDouble { - 200.0 - }.virtualSet() - } - - val sin by property { - getDouble { - val time = Instant.now() - sin(time.toEpochMilli().toDouble() / (scale ?: 1000.0)) - } - } - - val cos by property { - getDouble { - val time = Instant.now() - cos(time.toEpochMilli().toDouble() / (scale ?: 1000.0)) - } - } -} \ No newline at end of file diff --git a/demo/build.gradle.kts b/demo/build.gradle.kts new file mode 100644 index 0000000..88dd48f --- /dev/null +++ b/demo/build.gradle.kts @@ -0,0 +1,24 @@ +plugins { + kotlin("jvm") version "1.3.72" +} + +val plotlyVersion: String by rootProject.extra + +repositories{ + jcenter() + maven("https://kotlin.bintray.com/kotlinx") + maven("https://dl.bintray.com/kotlin/kotlin-eap") + maven("https://dl.bintray.com/mipt-npm/dataforge") + maven("https://dl.bintray.com/mipt-npm/scientifik") + maven("https://dl.bintray.com/mipt-npm/dev") +} + +dependencies{ + implementation(kotlin("stdlib-jdk8")) + implementation(project(":dataforge-control-core")) + implementation("scientifik:plotlykt-server:$plotlyVersion") +} + +tasks.withType().configureEach { + kotlinOptions.jvmTarget = "11" +} \ No newline at end of file diff --git a/demo/src/main/kotlin/hep/dataforge/control/demo/ComplexStateFlowTest.kt b/demo/src/main/kotlin/hep/dataforge/control/demo/ComplexStateFlowTest.kt new file mode 100644 index 0000000..9fcb921 --- /dev/null +++ b/demo/src/main/kotlin/hep/dataforge/control/demo/ComplexStateFlowTest.kt @@ -0,0 +1,30 @@ +package hep.dataforge.control.demo + +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.collect +import java.util.concurrent.Executors + +val producerDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher() + +fun main() { + runBlocking { + val test = MutableStateFlow(0) + + launch { + var counter = 0 + while (isActive){ + delay(500) + counter++ + println("produced $counter") + test.value = counter + } + } + + launch(producerDispatcher) { + test.collect{ + println("collected $it") + } + } + } +} \ No newline at end of file diff --git a/demo/src/main/kotlin/hep/dataforge/control/demo/DemoDevice.kt b/demo/src/main/kotlin/hep/dataforge/control/demo/DemoDevice.kt new file mode 100644 index 0000000..52fd895 --- /dev/null +++ b/demo/src/main/kotlin/hep/dataforge/control/demo/DemoDevice.kt @@ -0,0 +1,59 @@ +package hep.dataforge.control.demo + +import hep.dataforge.control.base.* +import hep.dataforge.control.controlers.double +import hep.dataforge.values.asValue +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.asCoroutineDispatcher +import java.time.Instant +import java.util.concurrent.Executors +import kotlin.math.cos +import kotlin.math.sin +import kotlin.time.ExperimentalTime +import kotlin.time.seconds + +@OptIn(ExperimentalTime::class) +class DemoDevice(parentScope: CoroutineScope = GlobalScope) : DeviceBase() { + + private val executor = Executors.newSingleThreadExecutor() + + override val scope: CoroutineScope = CoroutineScope( + parentScope.coroutineContext + executor.asCoroutineDispatcher() + ) + + val scaleProperty: SimpleDeviceProperty by writingVirtual(5000.0.asValue()) + var scale by scaleProperty.double() + + val resetScale: Action by action { + scale = 5000.0 + } + + val sin by readingNumber { + val time = Instant.now() + sin(time.toEpochMilli().toDouble() / scale) + } + + val cos by readingNumber { + val time = Instant.now() + cos(time.toEpochMilli().toDouble() / scale) + } + + val coordinates by readingMeta { + val time = Instant.now() + "time" put time.toEpochMilli() + "x" put sin(time.toEpochMilli().toDouble() / scale) + "y" put cos(time.toEpochMilli().toDouble() / scale) + } + + init { + sin.readEvery(0.2.seconds) + cos.readEvery(0.2.seconds) + coordinates.readEvery(0.2.seconds) + } + + override fun close() { + super.close() + executor.shutdown() + } +} \ No newline at end of file diff --git a/demo/src/main/kotlin/hep/dataforge/control/demo/DemoMain.kt b/demo/src/main/kotlin/hep/dataforge/control/demo/DemoMain.kt new file mode 100644 index 0000000..8b0a2bb --- /dev/null +++ b/demo/src/main/kotlin/hep/dataforge/control/demo/DemoMain.kt @@ -0,0 +1,119 @@ +package hep.dataforge.control.demo + +import hep.dataforge.meta.double +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.mapNotNull +import kotlinx.coroutines.flow.zip +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking +import scientifik.plotly.Plotly +import scientifik.plotly.layout +import scientifik.plotly.models.Trace +import scientifik.plotly.server.pushUpdates +import scientifik.plotly.server.serve +import java.util.concurrent.ConcurrentLinkedQueue + +fun main() { + runBlocking(Dispatchers.Default) { + val device = DemoDevice() + + val sinFlow = device.sin.flow() + val cosFlow = device.cos.flow() + val sinCosFlow = sinFlow.zip(cosFlow) { sin, cos -> + sin.double to cos.double + } + +// launch { +// device.valueFlow().collect { (name, item) -> +// if (name == "sin") { +// println("Device produced $item") +// println("Sin value is ${sinFlow.value}") +// } +// } +// } +// +// launch { +// sinFlow.mapNotNull { it.double }.collect { +// println("Device processed $it") +// } +// } + + val server = Plotly.serve(this) { + plot(rowNumber = 0, colOrderNumber = 0, size = 6) { + layout { + title = "sin property" + xaxis.title = "point index" + yaxis.title = "sin" + } + val trace = Trace.empty() + data.add(trace) + launch { + val queue = ConcurrentLinkedQueue() + + sinFlow.mapNotNull { it.double }.collect { + queue.add(it) + if (queue.size >= 100) { + queue.poll() + } + trace.y.numbers = queue + } + } + } + plot(rowNumber = 0, colOrderNumber = 1, size = 6) { + layout { + title = "cos property" + xaxis.title = "point index" + yaxis.title = "cos" + } + val trace = Trace.empty() + data.add(trace) + launch { + val queue = ConcurrentLinkedQueue() + + cosFlow.mapNotNull { it.double }.collect { + queue.add(it) + if (queue.size >= 100) { + queue.poll() + } + trace.y.numbers = queue + } + } + } + plot(rowNumber = 1, colOrderNumber = 0, size = 12) { + layout { + title = "cos vs sin" + xaxis.title = "sin" + yaxis.title = "cos" + } + val trace = Trace.empty() + data.add(trace) + launch { + val queue = ConcurrentLinkedQueue>() + + sinCosFlow.collect { pair -> + val x = pair.first ?: return@collect + val y = pair.second ?: return@collect + queue.add(x to y) + if (queue.size >= 20) { + queue.poll() + } + trace.x.numbers = queue.map { it.first } + trace.y.numbers = queue.map { it.second } + } + } + } + + + + }.pushUpdates() + + + + readLine() + + println("Stopping") + server.stop() + device.close() + } +} \ No newline at end of file diff --git a/demo/src/main/kotlin/hep/dataforge/control/demo/SimpleStateFlowTest.kt b/demo/src/main/kotlin/hep/dataforge/control/demo/SimpleStateFlowTest.kt new file mode 100644 index 0000000..7865700 --- /dev/null +++ b/demo/src/main/kotlin/hep/dataforge/control/demo/SimpleStateFlowTest.kt @@ -0,0 +1,30 @@ +package hep.dataforge.control.demo + +import hep.dataforge.meta.MetaItem +import hep.dataforge.meta.double +import hep.dataforge.values.Null +import hep.dataforge.values.asValue +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking + +fun main() { + runBlocking { + val flow: MutableStateFlow> = MutableStateFlow>(MetaItem.ValueItem(Null)) + + val collector = launch { + flow.map { it.double }.collect { + println(it) + } + } + + repeat(10) { + delay(10) + flow.value = MetaItem.ValueItem(it.toDouble().asValue()) + } + collector.cancel() + } +} \ No newline at end of file diff --git a/settings.gradle.kts b/settings.gradle.kts index fa35c3c..4d7e789 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -1,6 +1,6 @@ pluginManagement { - val kotlinVersion = "1.3.61" - val toolsVersion = "0.3.2" + val kotlinVersion = "1.3.72" + val toolsVersion = "0.5.0" repositories { mavenLocal() @@ -27,6 +27,7 @@ pluginManagement { eachPlugin { when (requested.id.id) { "scientifik.publish", "scientifik.mpp", "scientifik.jvm", "scientifik.js" -> useModule("scientifik:gradle-tools:${toolsVersion}") + "kotlinx-atomicfu" -> useModule("org.jetbrains.kotlinx:atomicfu-gradle-plugin:${requested.version}") } } } @@ -35,7 +36,9 @@ pluginManagement { rootProject.name = "dataforge-control" include( - ":dataforge-control-core" + ":dataforge-control-core", + ":demo" ) -includeBuild("../dataforge-core") \ No newline at end of file +//includeBuild("../dataforge-core") +//includeBuild("../plotly.kt") \ No newline at end of file