diff --git a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/base/SimpleDeviceProperty.kt b/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/base/SimpleDeviceProperty.kt index 23730b0..7c15450 100644 --- a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/base/SimpleDeviceProperty.kt +++ b/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/base/SimpleDeviceProperty.kt @@ -35,7 +35,7 @@ open class SimpleReadOnlyDeviceProperty( state.value = null } - private fun update(item: MetaItem<*>) { + protected fun update(item: MetaItem<*>) { state.value = item updateCallback(name, item) } @@ -102,7 +102,7 @@ fun D.readingValue( this, default?.let { MetaItem.ValueItem(it) }, PropertyDescriptor.invoke(descriptorBuilder), - { MetaItem.ValueItem(Value.of(getter())) } + getter = { MetaItem.ValueItem(Value.of(getter())) } ) fun D.readingNumber( @@ -113,7 +113,7 @@ fun D.readingNumber( this, default?.let { MetaItem.ValueItem(it.asValue()) }, PropertyDescriptor.invoke(descriptorBuilder), - { + getter = { val number = getter() MetaItem.ValueItem(number.asValue()) } @@ -127,7 +127,9 @@ fun D.readingMeta( this, default?.let { MetaItem.NodeItem(it) }, PropertyDescriptor.invoke(descriptorBuilder), - { MetaItem.NodeItem(MetaBuilder().apply { getter() }) } + getter = { + MetaItem.NodeItem(MetaBuilder().apply { getter() }) + } ) @OptIn(ExperimentalCoroutinesApi::class) @@ -138,7 +140,7 @@ class SimpleDeviceProperty( scope: CoroutineScope, updateCallback: (name: String, item: MetaItem<*>?) -> Unit, getter: suspend (MetaItem<*>?) -> MetaItem<*>, - private val setter: suspend (oldValue: MetaItem<*>?, newValue: MetaItem<*>) -> Unit + private val setter: suspend (oldValue: MetaItem<*>?, newValue: MetaItem<*>) -> MetaItem<*>? ) : SimpleReadOnlyDeviceProperty(name, default, descriptor, scope, updateCallback, getter), DeviceProperty { override var value: MetaItem<*>? @@ -163,7 +165,9 @@ class SimpleDeviceProperty( //all device operations should be run on device context withContext(scope.coroutineContext) { //TODO add error catching - setter(oldValue, item) + setter(oldValue, item)?.let { + update(it) + } } } } @@ -174,7 +178,7 @@ private class DevicePropertyDelegate( val default: MetaItem<*>?, val descriptor: PropertyDescriptor = PropertyDescriptor.empty(), private val getter: suspend (MetaItem<*>?) -> MetaItem<*>, - private val setter: suspend (oldValue: MetaItem<*>?, newValue: MetaItem<*>) -> Unit + private val setter: suspend (oldValue: MetaItem<*>?, newValue: MetaItem<*>) -> MetaItem<*>? ) : ReadOnlyProperty { override fun getValue(thisRef: D, property: KProperty<*>): SimpleDeviceProperty { @@ -198,7 +202,7 @@ fun D.writing( default: MetaItem<*>? = null, descriptorBuilder: PropertyDescriptor.() -> Unit = {}, getter: suspend (MetaItem<*>?) -> MetaItem<*>, - setter: suspend (oldValue: MetaItem<*>?, newValue: MetaItem<*>) -> Unit + setter: suspend (oldValue: MetaItem<*>?, newValue: MetaItem<*>) -> MetaItem<*>? ): ReadOnlyProperty = DevicePropertyDelegate( this, default, @@ -214,7 +218,7 @@ fun D.writingVirtual( default, descriptorBuilder, getter = { it ?: default }, - setter = { _, _ -> } + setter = { _, newItem -> newItem } ) fun D.writingVirtual( @@ -224,20 +228,20 @@ fun D.writingVirtual( MetaItem.ValueItem(default), descriptorBuilder, getter = { it ?: MetaItem.ValueItem(default) }, - setter = { _, _ -> } + setter = { _, newItem -> newItem } ) fun D.writingDouble( descriptorBuilder: PropertyDescriptor.() -> Unit = {}, getter: suspend (Double) -> Double, - setter: suspend (oldValue: Double?, newValue: Double) -> Unit + setter: suspend (oldValue: Double?, newValue: Double) -> Double? ): ReadOnlyProperty { val innerGetter: suspend (MetaItem<*>?) -> MetaItem<*> = { MetaItem.ValueItem(getter(it.double ?: Double.NaN).asValue()) } - val innerSetter: suspend (oldValue: MetaItem<*>?, newValue: MetaItem<*>) -> Unit = { oldValue, newValue -> - setter(oldValue.double, newValue.double ?: Double.NaN) + val innerSetter: suspend (oldValue: MetaItem<*>?, newValue: MetaItem<*>) -> MetaItem<*>? = { oldValue, newValue -> + setter(oldValue.double, newValue.double ?: Double.NaN)?.asMetaItem() } return DevicePropertyDelegate( diff --git a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/base/misc.kt b/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/base/misc.kt new file mode 100644 index 0000000..e627c18 --- /dev/null +++ b/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/base/misc.kt @@ -0,0 +1,6 @@ +package hep.dataforge.control.base + +import hep.dataforge.meta.MetaItem +import hep.dataforge.values.asValue + +fun Double.asMetaItem(): MetaItem.ValueItem = MetaItem.ValueItem(asValue()) \ No newline at end of file diff --git a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/controlers/MessageController.kt b/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/controlers/MessageController.kt index 3b89b5f..778e337 100644 --- a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/controlers/MessageController.kt +++ b/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/controlers/MessageController.kt @@ -12,7 +12,6 @@ import hep.dataforge.meta.string import hep.dataforge.meta.wrap import kotlinx.io.Binary - interface MessageConsumer { fun consume(message: Envelope): Unit } diff --git a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/controlers/PropertyFlow.kt b/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/controlers/PropertyFlow.kt index e23ab47..2861a06 100644 --- a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/controlers/PropertyFlow.kt +++ b/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/controlers/PropertyFlow.kt @@ -11,7 +11,7 @@ import kotlinx.coroutines.launch @ExperimentalCoroutinesApi -suspend fun Device.valueFlow(): Flow>> = callbackFlow { +suspend fun Device.flowValues(): Flow>> = callbackFlow { val listener = object : PropertyChangeListener { override fun propertyChanged(propertyName: String, value: MetaItem<*>?) { if (value != null) { diff --git a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/controlers/delegateMappers.kt b/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/controlers/delegateMappers.kt index 1e58386..1827d94 100644 --- a/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/controlers/delegateMappers.kt +++ b/dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/controlers/delegateMappers.kt @@ -2,9 +2,13 @@ package hep.dataforge.control.controlers import hep.dataforge.control.base.DeviceProperty import hep.dataforge.control.base.ReadOnlyDeviceProperty +import hep.dataforge.meta.MetaItem import hep.dataforge.meta.double import hep.dataforge.meta.map -import hep.dataforge.meta.transform +import hep.dataforge.values.asValue fun ReadOnlyDeviceProperty.double() = map { it.double } -fun DeviceProperty.double() = transform { it.double ?: Double.NaN } +fun DeviceProperty.double() = map( + reader = { it.double ?: Double.NaN }, + writer = { MetaItem.ValueItem(it.asValue()) } +) diff --git a/demo/build.gradle.kts b/demo/build.gradle.kts index 88dd48f..cb86a7d 100644 --- a/demo/build.gradle.kts +++ b/demo/build.gradle.kts @@ -1,5 +1,6 @@ plugins { kotlin("jvm") version "1.3.72" + id("org.openjfx.javafxplugin") version "0.0.8" } val plotlyVersion: String by rootProject.extra @@ -14,11 +15,17 @@ repositories{ } dependencies{ - implementation(kotlin("stdlib-jdk8")) implementation(project(":dataforge-control-core")) + implementation("no.tornado:tornadofx:1.7.20") + implementation(kotlin("stdlib-jdk8")) implementation("scientifik:plotlykt-server:$plotlyVersion") } tasks.withType().configureEach { kotlinOptions.jvmTarget = "11" +} + +javafx{ + version = "14" + modules("javafx.controls") } \ No newline at end of file diff --git a/demo/src/main/kotlin/hep/dataforge/control/demo/ComplexStateFlowTest.kt b/demo/src/main/kotlin/hep/dataforge/control/demo/ComplexStateFlowTest.kt deleted file mode 100644 index 9fcb921..0000000 --- a/demo/src/main/kotlin/hep/dataforge/control/demo/ComplexStateFlowTest.kt +++ /dev/null @@ -1,30 +0,0 @@ -package hep.dataforge.control.demo - -import kotlinx.coroutines.* -import kotlinx.coroutines.flow.MutableStateFlow -import kotlinx.coroutines.flow.collect -import java.util.concurrent.Executors - -val producerDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher() - -fun main() { - runBlocking { - val test = MutableStateFlow(0) - - launch { - var counter = 0 - while (isActive){ - delay(500) - counter++ - println("produced $counter") - test.value = counter - } - } - - launch(producerDispatcher) { - test.collect{ - println("collected $it") - } - } - } -} \ No newline at end of file diff --git a/demo/src/main/kotlin/hep/dataforge/control/demo/DemoControllerView.kt b/demo/src/main/kotlin/hep/dataforge/control/demo/DemoControllerView.kt new file mode 100644 index 0000000..cf22fa4 --- /dev/null +++ b/demo/src/main/kotlin/hep/dataforge/control/demo/DemoControllerView.kt @@ -0,0 +1,117 @@ +package hep.dataforge.control.demo + +import javafx.scene.Parent +import javafx.scene.control.Slider +import javafx.scene.layout.Priority +import javafx.stage.Stage +import kotlinx.coroutines.* +import org.slf4j.LoggerFactory +import scientifik.plotly.server.PlotlyServer +import tornadofx.* +import java.awt.Desktop +import java.net.URI +import kotlin.coroutines.CoroutineContext + +val logger = LoggerFactory.getLogger("Demo") + +class DemoController : Controller(), CoroutineScope { + + var device: DemoDevice? = null + var server: PlotlyServer? = null + override val coroutineContext: CoroutineContext = GlobalScope.newCoroutineContext(Dispatchers.Default) + Job() + + fun init() { + launch { + device = DemoDevice(this) + server = device?.let { servePlots(it) } + } + } + + fun shutdown() { + logger.info("Shutting down...") + server?.stop() + logger.info("Visualization server stopped") + device?.close() + logger.info("Device server stopped") + cancel("Application context closed") + } +} + + +class DemoControllerView : View(title = " Demo controller remote") { + private val controller: DemoController by inject() + private var timeScaleSlider: Slider by singleAssign() + private var xScaleSlider: Slider by singleAssign() + private var yScaleSlider: Slider by singleAssign() + + override val root: Parent = vbox { + hbox { + label("Time scale") + pane { + hgrow = Priority.ALWAYS + } + timeScaleSlider = slider(1000..10000, 5000) { + isShowTickLabels = true + isShowTickMarks = true + } + } + hbox { + label("X scale") + pane { + hgrow = Priority.ALWAYS + } + xScaleSlider = slider(0.0..2.0, 1.0) { + isShowTickLabels = true + isShowTickMarks = true + } + } + hbox { + label("Y scale") + pane { + hgrow = Priority.ALWAYS + } + yScaleSlider = slider(0.0..2.0, 1.0) { + isShowTickLabels = true + isShowTickMarks = true + } + } + button("Submit") { + useMaxWidth = true + action { + controller.device?.apply { + timeScaleValue = timeScaleSlider.value + sinScaleValue = xScaleSlider.value + cosScaleValue = yScaleSlider.value + } + } + } + button("Show plots") { + useMaxWidth = true + action { + controller.server?.run { + val uri = URI("http", null, host, port, null, null, null) + Desktop.getDesktop().browse(uri) + } + } + } + } +} + +class DemoControllerApp : App(DemoControllerView::class) { + private val controller: DemoController by inject() + + override fun start(stage: Stage) { + super.start(stage) + controller.init() + } + + override fun stop() { + controller.shutdown() + super.stop() + } +} + + +fun main() { + launch() +} \ No newline at end of file diff --git a/demo/src/main/kotlin/hep/dataforge/control/demo/DemoDevice.kt b/demo/src/main/kotlin/hep/dataforge/control/demo/DemoDevice.kt index 52fd895..9c7c1c5 100644 --- a/demo/src/main/kotlin/hep/dataforge/control/demo/DemoDevice.kt +++ b/demo/src/main/kotlin/hep/dataforge/control/demo/DemoDevice.kt @@ -22,34 +22,38 @@ class DemoDevice(parentScope: CoroutineScope = GlobalScope) : DeviceBase() { parentScope.coroutineContext + executor.asCoroutineDispatcher() ) - val scaleProperty: SimpleDeviceProperty by writingVirtual(5000.0.asValue()) - var scale by scaleProperty.double() + val timeScale: SimpleDeviceProperty by writingVirtual(5000.0.asValue()) + var timeScaleValue by timeScale.double() val resetScale: Action by action { - scale = 5000.0 + timeScaleValue = 5000.0 } + val sinScale by writingVirtual(1.0.asValue()) + var sinScaleValue by sinScale.double() val sin by readingNumber { val time = Instant.now() - sin(time.toEpochMilli().toDouble() / scale) + sin(time.toEpochMilli().toDouble() / timeScaleValue)*sinScaleValue } + val cosScale by writingVirtual(1.0.asValue()) + var cosScaleValue by cosScale.double() val cos by readingNumber { val time = Instant.now() - cos(time.toEpochMilli().toDouble() / scale) + cos(time.toEpochMilli().toDouble() / timeScaleValue)*cosScaleValue } val coordinates by readingMeta { val time = Instant.now() "time" put time.toEpochMilli() - "x" put sin(time.toEpochMilli().toDouble() / scale) - "y" put cos(time.toEpochMilli().toDouble() / scale) + "x" put sin(time.toEpochMilli().toDouble() / timeScaleValue)*sinScaleValue + "y" put cos(time.toEpochMilli().toDouble() / timeScaleValue)*cosScaleValue } init { sin.readEvery(0.2.seconds) cos.readEvery(0.2.seconds) - coordinates.readEvery(0.2.seconds) + coordinates.readEvery(0.3.seconds) } override fun close() { diff --git a/demo/src/main/kotlin/hep/dataforge/control/demo/DemoMain.kt b/demo/src/main/kotlin/hep/dataforge/control/demo/DemoMain.kt deleted file mode 100644 index 8b0a2bb..0000000 --- a/demo/src/main/kotlin/hep/dataforge/control/demo/DemoMain.kt +++ /dev/null @@ -1,119 +0,0 @@ -package hep.dataforge.control.demo - -import hep.dataforge.meta.double -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.flow.collect -import kotlinx.coroutines.flow.mapNotNull -import kotlinx.coroutines.flow.zip -import kotlinx.coroutines.launch -import kotlinx.coroutines.runBlocking -import scientifik.plotly.Plotly -import scientifik.plotly.layout -import scientifik.plotly.models.Trace -import scientifik.plotly.server.pushUpdates -import scientifik.plotly.server.serve -import java.util.concurrent.ConcurrentLinkedQueue - -fun main() { - runBlocking(Dispatchers.Default) { - val device = DemoDevice() - - val sinFlow = device.sin.flow() - val cosFlow = device.cos.flow() - val sinCosFlow = sinFlow.zip(cosFlow) { sin, cos -> - sin.double to cos.double - } - -// launch { -// device.valueFlow().collect { (name, item) -> -// if (name == "sin") { -// println("Device produced $item") -// println("Sin value is ${sinFlow.value}") -// } -// } -// } -// -// launch { -// sinFlow.mapNotNull { it.double }.collect { -// println("Device processed $it") -// } -// } - - val server = Plotly.serve(this) { - plot(rowNumber = 0, colOrderNumber = 0, size = 6) { - layout { - title = "sin property" - xaxis.title = "point index" - yaxis.title = "sin" - } - val trace = Trace.empty() - data.add(trace) - launch { - val queue = ConcurrentLinkedQueue() - - sinFlow.mapNotNull { it.double }.collect { - queue.add(it) - if (queue.size >= 100) { - queue.poll() - } - trace.y.numbers = queue - } - } - } - plot(rowNumber = 0, colOrderNumber = 1, size = 6) { - layout { - title = "cos property" - xaxis.title = "point index" - yaxis.title = "cos" - } - val trace = Trace.empty() - data.add(trace) - launch { - val queue = ConcurrentLinkedQueue() - - cosFlow.mapNotNull { it.double }.collect { - queue.add(it) - if (queue.size >= 100) { - queue.poll() - } - trace.y.numbers = queue - } - } - } - plot(rowNumber = 1, colOrderNumber = 0, size = 12) { - layout { - title = "cos vs sin" - xaxis.title = "sin" - yaxis.title = "cos" - } - val trace = Trace.empty() - data.add(trace) - launch { - val queue = ConcurrentLinkedQueue>() - - sinCosFlow.collect { pair -> - val x = pair.first ?: return@collect - val y = pair.second ?: return@collect - queue.add(x to y) - if (queue.size >= 20) { - queue.poll() - } - trace.x.numbers = queue.map { it.first } - trace.y.numbers = queue.map { it.second } - } - } - } - - - - }.pushUpdates() - - - - readLine() - - println("Stopping") - server.stop() - device.close() - } -} \ No newline at end of file diff --git a/demo/src/main/kotlin/hep/dataforge/control/demo/SimpleStateFlowTest.kt b/demo/src/main/kotlin/hep/dataforge/control/demo/SimpleStateFlowTest.kt deleted file mode 100644 index 7865700..0000000 --- a/demo/src/main/kotlin/hep/dataforge/control/demo/SimpleStateFlowTest.kt +++ /dev/null @@ -1,30 +0,0 @@ -package hep.dataforge.control.demo - -import hep.dataforge.meta.MetaItem -import hep.dataforge.meta.double -import hep.dataforge.values.Null -import hep.dataforge.values.asValue -import kotlinx.coroutines.delay -import kotlinx.coroutines.flow.MutableStateFlow -import kotlinx.coroutines.flow.collect -import kotlinx.coroutines.flow.map -import kotlinx.coroutines.launch -import kotlinx.coroutines.runBlocking - -fun main() { - runBlocking { - val flow: MutableStateFlow> = MutableStateFlow>(MetaItem.ValueItem(Null)) - - val collector = launch { - flow.map { it.double }.collect { - println(it) - } - } - - repeat(10) { - delay(10) - flow.value = MetaItem.ValueItem(it.toDouble().asValue()) - } - collector.cancel() - } -} \ No newline at end of file diff --git a/demo/src/main/kotlin/hep/dataforge/control/demo/demoDeviceDisplay.kt b/demo/src/main/kotlin/hep/dataforge/control/demo/demoDeviceDisplay.kt new file mode 100644 index 0000000..668fa4b --- /dev/null +++ b/demo/src/main/kotlin/hep/dataforge/control/demo/demoDeviceDisplay.kt @@ -0,0 +1,94 @@ +package hep.dataforge.control.demo + +import hep.dataforge.meta.double +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.flow.* +import kotlinx.coroutines.launch +import scientifik.plotly.Plotly +import scientifik.plotly.layout +import scientifik.plotly.models.Trace +import scientifik.plotly.server.PlotlyServer +import scientifik.plotly.server.pushUpdates +import scientifik.plotly.server.serve +import scientifik.plotly.trace +import java.util.concurrent.ConcurrentLinkedQueue + +/** + * In-place replacement for absent method from stdlib + */ +fun Flow.windowed(size: Int): Flow> { + val queue = ConcurrentLinkedQueue() + return flow { + this@windowed.collect { + queue.add(it) + if (queue.size >= size) { + queue.poll() + } + emit(queue) + } + } +} + +suspend fun Trace.updateFrom(axisName: String, flow: Flow>) { + flow.collect { + axis(axisName).numbers = it + } +} + +suspend fun Trace.updateXYFrom(flow: Flow>>) { + flow.collect { pairs -> + x.numbers = pairs.map { it.first } + y.numbers = pairs.map { it.second } + } +} + +fun CoroutineScope.servePlots(device: DemoDevice): PlotlyServer { + val sinFlow = device.sin.flow() + val cosFlow = device.cos.flow() + val sinCosFlow = sinFlow.zip(cosFlow) { sin, cos -> + sin.double!! to cos.double!! + } + + return Plotly.serve(this) { + plot(rowNumber = 0, colOrderNumber = 0, size = 6) { + layout { + title = "sin property" + xaxis.title = "point index" + yaxis.title = "sin" + } + trace { + this@servePlots.launch { + val flow: Flow> = sinFlow.mapNotNull { it.double }.windowed(100) + updateFrom(Trace.Y_AXIS, flow) + } + } + } + plot(rowNumber = 0, colOrderNumber = 1, size = 6) { + layout { + title = "cos property" + xaxis.title = "point index" + yaxis.title = "cos" + } + trace { + this@servePlots.launch { + val flow: Flow> = cosFlow.mapNotNull { it.double }.windowed(100) + updateFrom(Trace.Y_AXIS, flow) + } + } + } + plot(rowNumber = 1, colOrderNumber = 0, size = 12) { + layout { + title = "cos vs sin" + xaxis.title = "sin" + yaxis.title = "cos" + } + trace { + name = "non-synchronized" + this@servePlots.launch { + val flow: Flow>> = sinCosFlow.windowed(30) + updateXYFrom(flow) + } + } + } + }.pushUpdates() +}