Working prototype

This commit is contained in:
Alexander Nozik 2020-06-06 21:48:38 +03:00
parent 75ee237ac6
commit 3982f02f64
12 changed files with 261 additions and 205 deletions

View File

@ -35,7 +35,7 @@ open class SimpleReadOnlyDeviceProperty(
state.value = null
}
private fun update(item: MetaItem<*>) {
protected fun update(item: MetaItem<*>) {
state.value = item
updateCallback(name, item)
}
@ -102,7 +102,7 @@ fun <D : DeviceBase> D.readingValue(
this,
default?.let { MetaItem.ValueItem(it) },
PropertyDescriptor.invoke(descriptorBuilder),
{ MetaItem.ValueItem(Value.of(getter())) }
getter = { MetaItem.ValueItem(Value.of(getter())) }
)
fun <D : DeviceBase> D.readingNumber(
@ -113,7 +113,7 @@ fun <D : DeviceBase> D.readingNumber(
this,
default?.let { MetaItem.ValueItem(it.asValue()) },
PropertyDescriptor.invoke(descriptorBuilder),
{
getter = {
val number = getter()
MetaItem.ValueItem(number.asValue())
}
@ -127,7 +127,9 @@ fun <D : DeviceBase> D.readingMeta(
this,
default?.let { MetaItem.NodeItem(it) },
PropertyDescriptor.invoke(descriptorBuilder),
{ MetaItem.NodeItem(MetaBuilder().apply { getter() }) }
getter = {
MetaItem.NodeItem(MetaBuilder().apply { getter() })
}
)
@OptIn(ExperimentalCoroutinesApi::class)
@ -138,7 +140,7 @@ class SimpleDeviceProperty(
scope: CoroutineScope,
updateCallback: (name: String, item: MetaItem<*>?) -> Unit,
getter: suspend (MetaItem<*>?) -> MetaItem<*>,
private val setter: suspend (oldValue: MetaItem<*>?, newValue: MetaItem<*>) -> Unit
private val setter: suspend (oldValue: MetaItem<*>?, newValue: MetaItem<*>) -> MetaItem<*>?
) : SimpleReadOnlyDeviceProperty(name, default, descriptor, scope, updateCallback, getter), DeviceProperty {
override var value: MetaItem<*>?
@ -163,7 +165,9 @@ class SimpleDeviceProperty(
//all device operations should be run on device context
withContext(scope.coroutineContext) {
//TODO add error catching
setter(oldValue, item)
setter(oldValue, item)?.let {
update(it)
}
}
}
}
@ -174,7 +178,7 @@ private class DevicePropertyDelegate<D : DeviceBase>(
val default: MetaItem<*>?,
val descriptor: PropertyDescriptor = PropertyDescriptor.empty(),
private val getter: suspend (MetaItem<*>?) -> MetaItem<*>,
private val setter: suspend (oldValue: MetaItem<*>?, newValue: MetaItem<*>) -> Unit
private val setter: suspend (oldValue: MetaItem<*>?, newValue: MetaItem<*>) -> MetaItem<*>?
) : ReadOnlyProperty<D, SimpleDeviceProperty> {
override fun getValue(thisRef: D, property: KProperty<*>): SimpleDeviceProperty {
@ -198,7 +202,7 @@ fun <D : DeviceBase> D.writing(
default: MetaItem<*>? = null,
descriptorBuilder: PropertyDescriptor.() -> Unit = {},
getter: suspend (MetaItem<*>?) -> MetaItem<*>,
setter: suspend (oldValue: MetaItem<*>?, newValue: MetaItem<*>) -> Unit
setter: suspend (oldValue: MetaItem<*>?, newValue: MetaItem<*>) -> MetaItem<*>?
): ReadOnlyProperty<D, SimpleDeviceProperty> = DevicePropertyDelegate(
this,
default,
@ -214,7 +218,7 @@ fun <D : DeviceBase> D.writingVirtual(
default,
descriptorBuilder,
getter = { it ?: default },
setter = { _, _ -> }
setter = { _, newItem -> newItem }
)
fun <D : DeviceBase> D.writingVirtual(
@ -224,20 +228,20 @@ fun <D : DeviceBase> D.writingVirtual(
MetaItem.ValueItem(default),
descriptorBuilder,
getter = { it ?: MetaItem.ValueItem(default) },
setter = { _, _ -> }
setter = { _, newItem -> newItem }
)
fun <D : DeviceBase> D.writingDouble(
descriptorBuilder: PropertyDescriptor.() -> Unit = {},
getter: suspend (Double) -> Double,
setter: suspend (oldValue: Double?, newValue: Double) -> Unit
setter: suspend (oldValue: Double?, newValue: Double) -> Double?
): ReadOnlyProperty<D, SimpleDeviceProperty> {
val innerGetter: suspend (MetaItem<*>?) -> MetaItem<*> = {
MetaItem.ValueItem(getter(it.double ?: Double.NaN).asValue())
}
val innerSetter: suspend (oldValue: MetaItem<*>?, newValue: MetaItem<*>) -> Unit = { oldValue, newValue ->
setter(oldValue.double, newValue.double ?: Double.NaN)
val innerSetter: suspend (oldValue: MetaItem<*>?, newValue: MetaItem<*>) -> MetaItem<*>? = { oldValue, newValue ->
setter(oldValue.double, newValue.double ?: Double.NaN)?.asMetaItem()
}
return DevicePropertyDelegate(

View File

@ -0,0 +1,6 @@
package hep.dataforge.control.base
import hep.dataforge.meta.MetaItem
import hep.dataforge.values.asValue
fun Double.asMetaItem(): MetaItem.ValueItem = MetaItem.ValueItem(asValue())

View File

@ -12,7 +12,6 @@ import hep.dataforge.meta.string
import hep.dataforge.meta.wrap
import kotlinx.io.Binary
interface MessageConsumer {
fun consume(message: Envelope): Unit
}

View File

@ -11,7 +11,7 @@ import kotlinx.coroutines.launch
@ExperimentalCoroutinesApi
suspend fun Device.valueFlow(): Flow<Pair<String, MetaItem<*>>> = callbackFlow {
suspend fun Device.flowValues(): Flow<Pair<String, MetaItem<*>>> = callbackFlow {
val listener = object : PropertyChangeListener {
override fun propertyChanged(propertyName: String, value: MetaItem<*>?) {
if (value != null) {

View File

@ -2,9 +2,13 @@ package hep.dataforge.control.controlers
import hep.dataforge.control.base.DeviceProperty
import hep.dataforge.control.base.ReadOnlyDeviceProperty
import hep.dataforge.meta.MetaItem
import hep.dataforge.meta.double
import hep.dataforge.meta.map
import hep.dataforge.meta.transform
import hep.dataforge.values.asValue
fun ReadOnlyDeviceProperty.double() = map { it.double }
fun DeviceProperty.double() = transform { it.double ?: Double.NaN }
fun DeviceProperty.double() = map(
reader = { it.double ?: Double.NaN },
writer = { MetaItem.ValueItem(it.asValue()) }
)

View File

@ -1,5 +1,6 @@
plugins {
kotlin("jvm") version "1.3.72"
id("org.openjfx.javafxplugin") version "0.0.8"
}
val plotlyVersion: String by rootProject.extra
@ -14,11 +15,17 @@ repositories{
}
dependencies{
implementation(kotlin("stdlib-jdk8"))
implementation(project(":dataforge-control-core"))
implementation("no.tornado:tornadofx:1.7.20")
implementation(kotlin("stdlib-jdk8"))
implementation("scientifik:plotlykt-server:$plotlyVersion")
}
tasks.withType<org.jetbrains.kotlin.gradle.tasks.KotlinCompile>().configureEach {
kotlinOptions.jvmTarget = "11"
}
javafx{
version = "14"
modules("javafx.controls")
}

View File

@ -1,30 +0,0 @@
package hep.dataforge.control.demo
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.collect
import java.util.concurrent.Executors
val producerDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
fun main() {
runBlocking {
val test = MutableStateFlow(0)
launch {
var counter = 0
while (isActive){
delay(500)
counter++
println("produced $counter")
test.value = counter
}
}
launch(producerDispatcher) {
test.collect{
println("collected $it")
}
}
}
}

View File

@ -0,0 +1,117 @@
package hep.dataforge.control.demo
import javafx.scene.Parent
import javafx.scene.control.Slider
import javafx.scene.layout.Priority
import javafx.stage.Stage
import kotlinx.coroutines.*
import org.slf4j.LoggerFactory
import scientifik.plotly.server.PlotlyServer
import tornadofx.*
import java.awt.Desktop
import java.net.URI
import kotlin.coroutines.CoroutineContext
val logger = LoggerFactory.getLogger("Demo")
class DemoController : Controller(), CoroutineScope {
var device: DemoDevice? = null
var server: PlotlyServer? = null
override val coroutineContext: CoroutineContext = GlobalScope.newCoroutineContext(Dispatchers.Default) + Job()
fun init() {
launch {
device = DemoDevice(this)
server = device?.let { servePlots(it) }
}
}
fun shutdown() {
logger.info("Shutting down...")
server?.stop()
logger.info("Visualization server stopped")
device?.close()
logger.info("Device server stopped")
cancel("Application context closed")
}
}
class DemoControllerView : View(title = " Demo controller remote") {
private val controller: DemoController by inject()
private var timeScaleSlider: Slider by singleAssign()
private var xScaleSlider: Slider by singleAssign()
private var yScaleSlider: Slider by singleAssign()
override val root: Parent = vbox {
hbox {
label("Time scale")
pane {
hgrow = Priority.ALWAYS
}
timeScaleSlider = slider(1000..10000, 5000) {
isShowTickLabels = true
isShowTickMarks = true
}
}
hbox {
label("X scale")
pane {
hgrow = Priority.ALWAYS
}
xScaleSlider = slider(0.0..2.0, 1.0) {
isShowTickLabels = true
isShowTickMarks = true
}
}
hbox {
label("Y scale")
pane {
hgrow = Priority.ALWAYS
}
yScaleSlider = slider(0.0..2.0, 1.0) {
isShowTickLabels = true
isShowTickMarks = true
}
}
button("Submit") {
useMaxWidth = true
action {
controller.device?.apply {
timeScaleValue = timeScaleSlider.value
sinScaleValue = xScaleSlider.value
cosScaleValue = yScaleSlider.value
}
}
}
button("Show plots") {
useMaxWidth = true
action {
controller.server?.run {
val uri = URI("http", null, host, port, null, null, null)
Desktop.getDesktop().browse(uri)
}
}
}
}
}
class DemoControllerApp : App(DemoControllerView::class) {
private val controller: DemoController by inject()
override fun start(stage: Stage) {
super.start(stage)
controller.init()
}
override fun stop() {
controller.shutdown()
super.stop()
}
}
fun main() {
launch<DemoControllerApp>()
}

View File

@ -22,34 +22,38 @@ class DemoDevice(parentScope: CoroutineScope = GlobalScope) : DeviceBase() {
parentScope.coroutineContext + executor.asCoroutineDispatcher()
)
val scaleProperty: SimpleDeviceProperty by writingVirtual(5000.0.asValue())
var scale by scaleProperty.double()
val timeScale: SimpleDeviceProperty by writingVirtual(5000.0.asValue())
var timeScaleValue by timeScale.double()
val resetScale: Action by action {
scale = 5000.0
timeScaleValue = 5000.0
}
val sinScale by writingVirtual(1.0.asValue())
var sinScaleValue by sinScale.double()
val sin by readingNumber {
val time = Instant.now()
sin(time.toEpochMilli().toDouble() / scale)
sin(time.toEpochMilli().toDouble() / timeScaleValue)*sinScaleValue
}
val cosScale by writingVirtual(1.0.asValue())
var cosScaleValue by cosScale.double()
val cos by readingNumber {
val time = Instant.now()
cos(time.toEpochMilli().toDouble() / scale)
cos(time.toEpochMilli().toDouble() / timeScaleValue)*cosScaleValue
}
val coordinates by readingMeta {
val time = Instant.now()
"time" put time.toEpochMilli()
"x" put sin(time.toEpochMilli().toDouble() / scale)
"y" put cos(time.toEpochMilli().toDouble() / scale)
"x" put sin(time.toEpochMilli().toDouble() / timeScaleValue)*sinScaleValue
"y" put cos(time.toEpochMilli().toDouble() / timeScaleValue)*cosScaleValue
}
init {
sin.readEvery(0.2.seconds)
cos.readEvery(0.2.seconds)
coordinates.readEvery(0.2.seconds)
coordinates.readEvery(0.3.seconds)
}
override fun close() {

View File

@ -1,119 +0,0 @@
package hep.dataforge.control.demo
import hep.dataforge.meta.double
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.mapNotNull
import kotlinx.coroutines.flow.zip
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import scientifik.plotly.Plotly
import scientifik.plotly.layout
import scientifik.plotly.models.Trace
import scientifik.plotly.server.pushUpdates
import scientifik.plotly.server.serve
import java.util.concurrent.ConcurrentLinkedQueue
fun main() {
runBlocking(Dispatchers.Default) {
val device = DemoDevice()
val sinFlow = device.sin.flow()
val cosFlow = device.cos.flow()
val sinCosFlow = sinFlow.zip(cosFlow) { sin, cos ->
sin.double to cos.double
}
// launch {
// device.valueFlow().collect { (name, item) ->
// if (name == "sin") {
// println("Device produced $item")
// println("Sin value is ${sinFlow.value}")
// }
// }
// }
//
// launch {
// sinFlow.mapNotNull { it.double }.collect {
// println("Device processed $it")
// }
// }
val server = Plotly.serve(this) {
plot(rowNumber = 0, colOrderNumber = 0, size = 6) {
layout {
title = "sin property"
xaxis.title = "point index"
yaxis.title = "sin"
}
val trace = Trace.empty()
data.add(trace)
launch {
val queue = ConcurrentLinkedQueue<Double>()
sinFlow.mapNotNull { it.double }.collect {
queue.add(it)
if (queue.size >= 100) {
queue.poll()
}
trace.y.numbers = queue
}
}
}
plot(rowNumber = 0, colOrderNumber = 1, size = 6) {
layout {
title = "cos property"
xaxis.title = "point index"
yaxis.title = "cos"
}
val trace = Trace.empty()
data.add(trace)
launch {
val queue = ConcurrentLinkedQueue<Double>()
cosFlow.mapNotNull { it.double }.collect {
queue.add(it)
if (queue.size >= 100) {
queue.poll()
}
trace.y.numbers = queue
}
}
}
plot(rowNumber = 1, colOrderNumber = 0, size = 12) {
layout {
title = "cos vs sin"
xaxis.title = "sin"
yaxis.title = "cos"
}
val trace = Trace.empty()
data.add(trace)
launch {
val queue = ConcurrentLinkedQueue<Pair<Double, Double>>()
sinCosFlow.collect { pair ->
val x = pair.first ?: return@collect
val y = pair.second ?: return@collect
queue.add(x to y)
if (queue.size >= 20) {
queue.poll()
}
trace.x.numbers = queue.map { it.first }
trace.y.numbers = queue.map { it.second }
}
}
}
}.pushUpdates()
readLine()
println("Stopping")
server.stop()
device.close()
}
}

View File

@ -1,30 +0,0 @@
package hep.dataforge.control.demo
import hep.dataforge.meta.MetaItem
import hep.dataforge.meta.double
import hep.dataforge.values.Null
import hep.dataforge.values.asValue
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
fun main() {
runBlocking {
val flow: MutableStateFlow<MetaItem<*>> = MutableStateFlow<MetaItem<*>>(MetaItem.ValueItem(Null))
val collector = launch {
flow.map { it.double }.collect {
println(it)
}
}
repeat(10) {
delay(10)
flow.value = MetaItem.ValueItem(it.toDouble().asValue())
}
collector.cancel()
}
}

View File

@ -0,0 +1,94 @@
package hep.dataforge.control.demo
import hep.dataforge.meta.double
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import scientifik.plotly.Plotly
import scientifik.plotly.layout
import scientifik.plotly.models.Trace
import scientifik.plotly.server.PlotlyServer
import scientifik.plotly.server.pushUpdates
import scientifik.plotly.server.serve
import scientifik.plotly.trace
import java.util.concurrent.ConcurrentLinkedQueue
/**
* In-place replacement for absent method from stdlib
*/
fun <T> Flow<T>.windowed(size: Int): Flow<Iterable<T>> {
val queue = ConcurrentLinkedQueue<T>()
return flow {
this@windowed.collect {
queue.add(it)
if (queue.size >= size) {
queue.poll()
}
emit(queue)
}
}
}
suspend fun Trace.updateFrom(axisName: String, flow: Flow<Iterable<Double>>) {
flow.collect {
axis(axisName).numbers = it
}
}
suspend fun Trace.updateXYFrom(flow: Flow<Iterable<Pair<Double, Double>>>) {
flow.collect { pairs ->
x.numbers = pairs.map { it.first }
y.numbers = pairs.map { it.second }
}
}
fun CoroutineScope.servePlots(device: DemoDevice): PlotlyServer {
val sinFlow = device.sin.flow()
val cosFlow = device.cos.flow()
val sinCosFlow = sinFlow.zip(cosFlow) { sin, cos ->
sin.double!! to cos.double!!
}
return Plotly.serve(this) {
plot(rowNumber = 0, colOrderNumber = 0, size = 6) {
layout {
title = "sin property"
xaxis.title = "point index"
yaxis.title = "sin"
}
trace {
this@servePlots.launch {
val flow: Flow<Iterable<Double>> = sinFlow.mapNotNull { it.double }.windowed(100)
updateFrom(Trace.Y_AXIS, flow)
}
}
}
plot(rowNumber = 0, colOrderNumber = 1, size = 6) {
layout {
title = "cos property"
xaxis.title = "point index"
yaxis.title = "cos"
}
trace {
this@servePlots.launch {
val flow: Flow<Iterable<Double>> = cosFlow.mapNotNull { it.double }.windowed(100)
updateFrom(Trace.Y_AXIS, flow)
}
}
}
plot(rowNumber = 1, colOrderNumber = 0, size = 12) {
layout {
title = "cos vs sin"
xaxis.title = "sin"
yaxis.title = "cos"
}
trace {
name = "non-synchronized"
this@servePlots.launch {
val flow: Flow<Iterable<Pair<Double, Double>>> = sinCosFlow.windowed(30)
updateXYFrom(flow)
}
}
}
}.pushUpdates()
}