Fix client updates
This commit is contained in:
parent
81aa5d2fcc
commit
960d17855b
@ -1,20 +1,14 @@
|
||||
package space.kscience.visionforge.examples
|
||||
|
||||
import space.kscience.gdml.GdmlShowCase
|
||||
import space.kscience.visionforge.Colors
|
||||
import space.kscience.visionforge.gdml.gdml
|
||||
import space.kscience.visionforge.solid.Solids
|
||||
import space.kscience.visionforge.solid.ambientLight
|
||||
import space.kscience.visionforge.solid.set
|
||||
import space.kscience.visionforge.solid.solid
|
||||
|
||||
fun main() = makeVisionFile {
|
||||
vision("canvas") {
|
||||
requirePlugin(Solids)
|
||||
solid {
|
||||
ambientLight {
|
||||
color.set(Colors.white)
|
||||
}
|
||||
gdml(GdmlShowCase.babyIaxo(), "D0")
|
||||
}
|
||||
}
|
||||
|
@ -6,6 +6,7 @@ import kotlinx.html.div
|
||||
import kotlinx.html.h1
|
||||
import space.kscience.dataforge.context.Context
|
||||
import space.kscience.dataforge.context.fetch
|
||||
import space.kscience.dataforge.meta.Null
|
||||
import space.kscience.dataforge.misc.DFExperimental
|
||||
import space.kscience.dataforge.names.Name
|
||||
import space.kscience.visionforge.Colors
|
||||
@ -56,7 +57,8 @@ fun main() {
|
||||
val targetVision = sat[target] as Solid
|
||||
targetVision.color.set("red")
|
||||
delay(1000)
|
||||
targetVision.color.clear()
|
||||
//use to ensure that color is cleared
|
||||
targetVision.color.value = Null
|
||||
delay(500)
|
||||
}
|
||||
}
|
||||
|
@ -5,6 +5,8 @@ import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.flow
|
||||
import kotlinx.coroutines.flow.launchIn
|
||||
import kotlinx.coroutines.flow.onEach
|
||||
import kotlinx.coroutines.sync.Mutex
|
||||
import kotlinx.coroutines.sync.withLock
|
||||
import kotlinx.serialization.Serializable
|
||||
import space.kscience.dataforge.meta.*
|
||||
import space.kscience.dataforge.meta.descriptors.MetaDescriptor
|
||||
@ -47,7 +49,7 @@ public object NullVision : Vision {
|
||||
/**
|
||||
* An update for a [Vision]
|
||||
*/
|
||||
public class VisionChangeBuilder(private val manager: VisionManager) : MutableVisionContainer<Vision> {
|
||||
public class VisionChangeBuilder : MutableVisionContainer<Vision> {
|
||||
|
||||
private var vision: Vision? = null
|
||||
private var propertyChange = MutableMeta()
|
||||
@ -57,7 +59,14 @@ public class VisionChangeBuilder(private val manager: VisionManager) : MutableVi
|
||||
|
||||
@Synchronized
|
||||
private fun getOrPutChild(visionName: Name): VisionChangeBuilder =
|
||||
children.getOrPut(visionName) { VisionChangeBuilder(manager) }
|
||||
children.getOrPut(visionName) { VisionChangeBuilder() }
|
||||
|
||||
@Synchronized
|
||||
internal fun reset() {
|
||||
vision = null
|
||||
propertyChange = MutableMeta()
|
||||
children.clear()
|
||||
}
|
||||
|
||||
public fun propertyChanged(visionName: Name, propertyName: Name, item: Meta?) {
|
||||
if (visionName == Name.EMPTY) {
|
||||
@ -82,10 +91,10 @@ public class VisionChangeBuilder(private val manager: VisionManager) : MutableVi
|
||||
/**
|
||||
* Isolate collected changes by creating detached copies of given visions
|
||||
*/
|
||||
public fun deepCopy(): VisionChange = VisionChange(
|
||||
vision?.deepCopy(manager),
|
||||
public fun deepCopy(visionManager: VisionManager): VisionChange = VisionChange(
|
||||
vision?.deepCopy(visionManager),
|
||||
if (propertyChange.isEmpty()) null else propertyChange.seal(),
|
||||
if (children.isEmpty()) null else children.mapValues { it.value.deepCopy() }
|
||||
if (children.isEmpty()) null else children.mapValues { it.value.deepCopy(visionManager) }
|
||||
)
|
||||
}
|
||||
|
||||
@ -102,12 +111,13 @@ public data class VisionChange(
|
||||
)
|
||||
|
||||
public inline fun VisionManager.VisionChange(block: VisionChangeBuilder.() -> Unit): VisionChange =
|
||||
VisionChangeBuilder(this).apply(block).deepCopy()
|
||||
VisionChangeBuilder().apply(block).deepCopy(this)
|
||||
|
||||
|
||||
private fun CoroutineScope.collectChange(
|
||||
name: Name,
|
||||
source: Vision,
|
||||
mutex: Mutex,
|
||||
collector: () -> VisionChangeBuilder,
|
||||
) {
|
||||
|
||||
@ -120,7 +130,7 @@ private fun CoroutineScope.collectChange(
|
||||
val children = source.children
|
||||
//Subscribe for children changes
|
||||
children?.forEach { token, child ->
|
||||
collectChange(name + token, child, collector)
|
||||
collectChange(name + token, child, mutex, collector)
|
||||
}
|
||||
|
||||
//Subscribe for structure change
|
||||
@ -128,9 +138,11 @@ private fun CoroutineScope.collectChange(
|
||||
val after = children[changedName]
|
||||
val fullName = name + changedName
|
||||
if (after != null) {
|
||||
collectChange(fullName, after, collector)
|
||||
collectChange(fullName, after, mutex, collector)
|
||||
}
|
||||
mutex.withLock {
|
||||
collector().setChild(fullName, after)
|
||||
}
|
||||
collector().setChild(fullName, after)
|
||||
}?.launchIn(this)
|
||||
}
|
||||
|
||||
@ -141,24 +153,26 @@ public fun Vision.flowChanges(
|
||||
collectionDuration: Duration,
|
||||
): Flow<VisionChange> = flow {
|
||||
val manager = manager ?: error("Orphan vision could not collect changes")
|
||||
|
||||
var collector = VisionChangeBuilder(manager)
|
||||
coroutineScope {
|
||||
collectChange(Name.EMPTY, this@flowChanges) { collector }
|
||||
val collector = VisionChangeBuilder()
|
||||
val mutex = Mutex()
|
||||
collectChange(Name.EMPTY, this@flowChanges, mutex) { collector }
|
||||
|
||||
//Send initial vision state
|
||||
val initialChange = VisionChange(vision = deepCopy(manager))
|
||||
emit(initialChange)
|
||||
|
||||
while (currentCoroutineContext().isActive) {
|
||||
while (true) {
|
||||
//Wait for changes to accumulate
|
||||
delay(collectionDuration)
|
||||
//Propagate updates only if something is changed
|
||||
if (!collector.isEmpty()) {
|
||||
//emit changes
|
||||
emit(collector.deepCopy())
|
||||
emit(collector.deepCopy(manager))
|
||||
//Reset the collector
|
||||
collector = VisionChangeBuilder(manager)
|
||||
mutex.withLock {
|
||||
collector.reset()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -189,7 +189,9 @@ public abstract class AbstractVisionProperties(
|
||||
}
|
||||
|
||||
override fun setProperty(name: Name, node: Meta?, notify: Boolean) {
|
||||
//TODO check old value?
|
||||
//ignore if the value is the same as existing
|
||||
if (own?.getMeta(name) == node) return
|
||||
|
||||
if (name.isEmpty()) {
|
||||
properties = node?.asMutableMeta()
|
||||
} else if (node == null) {
|
||||
@ -203,7 +205,9 @@ public abstract class AbstractVisionProperties(
|
||||
}
|
||||
|
||||
override fun setValue(name: Name, value: Value?, notify: Boolean) {
|
||||
//TODO check old value?
|
||||
//ignore if the value is the same as existing
|
||||
if (own?.getValue(name) == value) return
|
||||
|
||||
if (value == null) {
|
||||
properties?.getMeta(name)?.value = null
|
||||
} else {
|
||||
|
@ -3,8 +3,8 @@ package space.kscience.visionforge
|
||||
import kotlinx.browser.document
|
||||
import kotlinx.browser.window
|
||||
import kotlinx.coroutines.Job
|
||||
import kotlinx.coroutines.flow.launchIn
|
||||
import kotlinx.coroutines.flow.onEach
|
||||
import kotlinx.coroutines.delay
|
||||
import kotlinx.coroutines.launch
|
||||
import org.w3c.dom.*
|
||||
import org.w3c.dom.url.URL
|
||||
import space.kscience.dataforge.context.*
|
||||
@ -63,6 +63,17 @@ public class VisionClient : AbstractPlugin() {
|
||||
|
||||
private fun Element.getFlag(attribute: String): Boolean = attributes[attribute]?.value != null
|
||||
|
||||
|
||||
private val changeCollector = VisionChangeBuilder()
|
||||
|
||||
public fun visionPropertyChanged(visionName: Name, propertyName: Name, item: Meta?) {
|
||||
changeCollector.propertyChanged(visionName, propertyName, item)
|
||||
}
|
||||
|
||||
public fun visionChanged(name: Name?, child: Vision?) {
|
||||
changeCollector.setChild(name, child)
|
||||
}
|
||||
|
||||
private fun renderVision(name: String, element: Element, vision: Vision?, outputMeta: Meta) {
|
||||
if (vision != null) {
|
||||
vision.setAsRoot(visionManager)
|
||||
@ -115,12 +126,13 @@ public class VisionClient : AbstractPlugin() {
|
||||
val feedbackAggregationTime = meta["aggregationTime"]?.int ?: 300
|
||||
|
||||
onopen = {
|
||||
feedbackJob = vision.flowChanges(
|
||||
feedbackAggregationTime.milliseconds,
|
||||
).onEach { change ->
|
||||
send(visionManager.encodeToString(change))
|
||||
}.launchIn(visionManager.context)
|
||||
|
||||
feedbackJob = visionManager.context.launch {
|
||||
delay(feedbackAggregationTime.milliseconds)
|
||||
if (!changeCollector.isEmpty()) {
|
||||
send(visionManager.encodeToString(changeCollector.deepCopy(visionManager)))
|
||||
changeCollector.reset()
|
||||
}
|
||||
}
|
||||
console.info("WebSocket update channel established for output '$name'")
|
||||
}
|
||||
|
||||
@ -165,6 +177,7 @@ public class VisionClient : AbstractPlugin() {
|
||||
logger.info { "Found embedded vision for output with name $name" }
|
||||
renderVision(name, element, embeddedVision, outputMeta)
|
||||
}
|
||||
|
||||
element.attributes[OUTPUT_FETCH_ATTRIBUTE] != null -> {
|
||||
val attr = element.attributes[OUTPUT_FETCH_ATTRIBUTE]!!
|
||||
|
||||
@ -192,6 +205,7 @@ public class VisionClient : AbstractPlugin() {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
else -> error("No embedded vision data / fetch url for $name")
|
||||
}
|
||||
element.setAttribute(OUTPUT_RENDERED, "true")
|
||||
@ -204,7 +218,7 @@ public class VisionClient : AbstractPlugin() {
|
||||
) else super.content(target)
|
||||
|
||||
public companion object : PluginFactory<VisionClient> {
|
||||
override fun build(context: Context, meta: Meta): VisionClient = VisionClient()
|
||||
override fun build(context: Context, meta: Meta): VisionClient = VisionClient()
|
||||
|
||||
override val tag: PluginTag = PluginTag(name = "vision.client", group = PluginTag.DATAFORGE_GROUP)
|
||||
|
||||
|
@ -122,8 +122,10 @@ public class VisionServer internal constructor(
|
||||
|
||||
launch {
|
||||
incoming.consumeEach {
|
||||
val data = it.data.decodeToString()
|
||||
application.log.debug("Received update: \n$data")
|
||||
val change = visionManager.jsonFormat.decodeFromString(
|
||||
VisionChange.serializer(), it.data.decodeToString()
|
||||
VisionChange.serializer(),data
|
||||
)
|
||||
vision.update(change)
|
||||
}
|
||||
@ -136,6 +138,7 @@ public class VisionServer internal constructor(
|
||||
VisionChange.serializer(),
|
||||
update
|
||||
)
|
||||
application.log.debug("Sending update: \n$json")
|
||||
outgoing.send(Frame.Text(json))
|
||||
}.collect()
|
||||
}
|
||||
|
@ -6,6 +6,8 @@ import space.kscience.visionforge.solid.three.ThreePlugin
|
||||
|
||||
|
||||
@DFExperimental
|
||||
public fun main(): Unit = runVisionClient {
|
||||
plugin(ThreePlugin)
|
||||
public fun main(): Unit {
|
||||
runVisionClient {
|
||||
plugin(ThreePlugin)
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user