From 4ceffef67a02f23f9739cb3d45c6dd818e5cd373 Mon Sep 17 00:00:00 2001 From: Alexander Nozik Date: Mon, 21 Nov 2022 13:28:39 +0300 Subject: [PATCH] Update connection logic --- demo/playground/notebooks/dynamic-demo.ipynb | 4 +- .../kscience/visionforge/VisionChange.kt | 11 +- .../kscience/visionforge/VisionClient.kt | 168 ++++++++++-------- 3 files changed, 102 insertions(+), 81 deletions(-) diff --git a/demo/playground/notebooks/dynamic-demo.ipynb b/demo/playground/notebooks/dynamic-demo.ipynb index ff7ca7ea..7200d15b 100644 --- a/demo/playground/notebooks/dynamic-demo.ipynb +++ b/demo/playground/notebooks/dynamic-demo.ipynb @@ -4,10 +4,10 @@ "cell_type": "code", "execution_count": null, "metadata": { + "tags": [], "pycharm": { "is_executing": true - }, - "tags": [] + } }, "outputs": [], "source": [ diff --git a/visionforge-core/src/commonMain/kotlin/space/kscience/visionforge/VisionChange.kt b/visionforge-core/src/commonMain/kotlin/space/kscience/visionforge/VisionChange.kt index e95dbd1b..1fbc33bc 100644 --- a/visionforge-core/src/commonMain/kotlin/space/kscience/visionforge/VisionChange.kt +++ b/visionforge-core/src/commonMain/kotlin/space/kscience/visionforge/VisionChange.kt @@ -148,9 +148,12 @@ private fun CoroutineScope.collectChange( /** * Generate a flow of changes of this vision and its children + * + * @param sendInitial if true, send the initial vision state as first change */ public fun Vision.flowChanges( collectionDuration: Duration, + sendInitial: Boolean = false ): Flow = flow { val manager = manager ?: error("Orphan vision could not collect changes") coroutineScope { @@ -158,9 +161,11 @@ public fun Vision.flowChanges( val mutex = Mutex() collectChange(Name.EMPTY, this@flowChanges, mutex, collector) - //Send initial vision state - val initialChange = VisionChange(vision = deepCopy(manager)) - emit(initialChange) + if(sendInitial) { + //Send initial vision state + val initialChange = VisionChange(vision = deepCopy(manager)) + emit(initialChange) + } while (true) { //Wait for changes to accumulate diff --git a/visionforge-core/src/jsMain/kotlin/space/kscience/visionforge/VisionClient.kt b/visionforge-core/src/jsMain/kotlin/space/kscience/visionforge/VisionClient.kt index a84a5a42..81655335 100644 --- a/visionforge-core/src/jsMain/kotlin/space/kscience/visionforge/VisionClient.kt +++ b/visionforge-core/src/jsMain/kotlin/space/kscience/visionforge/VisionClient.kt @@ -45,7 +45,7 @@ public class VisionClient : AbstractPlugin() { return attribute?.value } - private val renderers by lazy { context.gather(ElementVisionRenderer.TYPE).values } + internal val renderers by lazy { context.gather(ElementVisionRenderer.TYPE).values } private fun findRendererFor(vision: Vision): ElementVisionRenderer? = renderers.mapNotNull { val rating = it.rateVision(vision) @@ -71,76 +71,77 @@ public class VisionClient : AbstractPlugin() { changeCollector.setChild(name, child) } - private fun renderVision(name: String, element: Element, vision: Vision?, outputMeta: Meta) { - if (vision != null) { - vision.setAsRoot(visionManager) - val renderer = findRendererFor(vision) - ?: error("Could not find renderer for ${vision::class}") - renderer.render(element, vision, outputMeta) + private fun renderVision(element: Element, vision: Vision, outputMeta: Meta) { + vision.setAsRoot(visionManager) + val renderer = findRendererFor(vision) ?: error("Could not find renderer for ${vision::class}") + renderer.render(element, vision, outputMeta) + } - element.attributes[OUTPUT_CONNECT_ATTRIBUTE]?.let { attr -> - val wsUrl = if (attr.value.isBlank() || attr.value == VisionTagConsumer.AUTO_DATA_ATTRIBUTE) { - val endpoint = resolveEndpoint(element) - logger.info { "Vision server is resolved to $endpoint" } - URL(endpoint).apply { - pathname += "/ws" + private fun updateVision(name: String, element: Element, vision: Vision?, outputMeta: Meta) { + element.attributes[OUTPUT_CONNECT_ATTRIBUTE]?.let { attr -> + val wsUrl = if (attr.value.isBlank() || attr.value == VisionTagConsumer.AUTO_DATA_ATTRIBUTE) { + val endpoint = resolveEndpoint(element) + logger.info { "Vision server is resolved to $endpoint" } + URL(endpoint).apply { + pathname += "/ws" + } + } else { + URL(attr.value) + }.apply { + protocol = "ws" + searchParams.append("name", name) + } + + logger.info { "Updating vision data from $wsUrl" } + + //Individual websocket for this element + WebSocket(wsUrl.toString()).apply { + onmessage = { messageEvent -> + val stringData: String? = messageEvent.data as? String + if (stringData != null) { + val change: VisionChange = visionManager.jsonFormat.decodeFromString( + VisionChange.serializer(), + stringData + ) + + // If change contains root vision replacement, do it + change.vision?.let { vision -> + renderVision(element, vision, outputMeta) + } + + logger.debug { "Got update $change for output with name $name" } + if (vision == null) error("Can't update vision because it is not loaded.") + vision.update(change) + } else { + logger.error { "WebSocket message data is not a string" } } - } else { - URL(attr.value) - }.apply { - protocol = "ws" - searchParams.append("name", name) } - logger.info { "Updating vision data from $wsUrl" } - //Individual websocket for this element - WebSocket(wsUrl.toString()).apply { - onmessage = { messageEvent -> - val stringData: String? = messageEvent.data as? String - if (stringData != null) { - val change: VisionChange = visionManager.jsonFormat.decodeFromString( - VisionChange.serializer(), - stringData - ) + //Backward change propagation + var feedbackJob: Job? = null - if (change.vision != null) { - renderer.render(element, vision, outputMeta) - } + //Feedback changes aggregation time in milliseconds + val feedbackAggregationTime = meta["aggregationTime"]?.int ?: 300 - logger.debug { "Got update $change for output with name $name" } - vision.update(change) - } else { - console.error("WebSocket message data is not a string") + onopen = { + feedbackJob = visionManager.context.launch { + delay(feedbackAggregationTime.milliseconds) + if (!changeCollector.isEmpty()) { + send(visionManager.encodeToString(changeCollector.deepCopy(visionManager))) + changeCollector.reset() } } + logger.info { "WebSocket update channel established for output '$name'" } + } - - //Backward change propagation - var feedbackJob: Job? = null - - //Feedback changes aggregation time in milliseconds - val feedbackAggregationTime = meta["aggregationTime"]?.int ?: 300 - - onopen = { - feedbackJob = visionManager.context.launch { - delay(feedbackAggregationTime.milliseconds) - if (!changeCollector.isEmpty()) { - send(visionManager.encodeToString(changeCollector.deepCopy(visionManager))) - changeCollector.reset() - } - } - console.info("WebSocket update channel established for output '$name'") - } - - onclose = { - feedbackJob?.cancel() - console.info("WebSocket update channel closed for output '$name'") - } - onerror = { - feedbackJob?.cancel() - console.error("WebSocket update channel error for output '$name'") - } + onclose = { + feedbackJob?.cancel() + logger.info { "WebSocket update channel closed for output '$name'" } + } + onerror = { + feedbackJob?.cancel() + logger.error { "WebSocket update channel error for output '$name'" } } } } @@ -164,17 +165,8 @@ public class VisionClient : AbstractPlugin() { VisionManager.defaultJson.decodeFromString(MetaSerializer, it) } ?: Meta.EMPTY - //Trying to render embedded vision - val embeddedVision = element.getEmbeddedData(VisionTagConsumer.OUTPUT_DATA_CLASS)?.let { - visionManager.decodeFromString(it) - } - when { - embeddedVision != null -> { - logger.info { "Found embedded vision for output with name $name" } - renderVision(name, element, embeddedVision, outputMeta) - } - + // fetch data if path is provided element.attributes[OUTPUT_FETCH_ATTRIBUTE] != null -> { val attr = element.attributes[OUTPUT_FETCH_ATTRIBUTE]!! @@ -195,7 +187,8 @@ public class VisionClient : AbstractPlugin() { if (response.ok) { response.text().then { text -> val vision = visionManager.decodeFromString(text) - renderVision(name, element, vision, outputMeta) + renderVision(element, vision, outputMeta) + updateVision(name, element, vision, outputMeta) } } else { logger.error { "Failed to fetch initial vision state from $fetchUrl" } @@ -203,6 +196,22 @@ public class VisionClient : AbstractPlugin() { } } + // use embedded data if it is available + element.getElementsByClassName(VisionTagConsumer.OUTPUT_DATA_CLASS).length > 0 -> { + //Getting embedded vision data + val embeddedVision = element.getEmbeddedData(VisionTagConsumer.OUTPUT_DATA_CLASS)!!.let { + visionManager.decodeFromString(it) + } + logger.info { "Found embedded vision for output with name $name" } + renderVision(element, embeddedVision, outputMeta) + updateVision(name, element, embeddedVision, outputMeta) + } + + //Try to load vision via websocket + element.attributes[OUTPUT_CONNECT_ATTRIBUTE] != null -> { + updateVision(name, element, null, outputMeta) + } + else -> error("No embedded vision data / fetch url for $name") } element.setAttribute(OUTPUT_RENDERED, "true") @@ -237,7 +246,7 @@ private fun whenDocumentLoaded(block: Document.() -> Unit): Unit { */ public fun VisionClient.renderAllVisionsIn(element: Element) { val elements = element.getElementsByClassName(VisionTagConsumer.OUTPUT_CLASS) - console.info("Finished search for outputs. Found ${elements.length} items") + logger.info { "Finished search for outputs. Found ${elements.length} items" } elements.asList().forEach { child -> renderVisionIn(child) } @@ -251,7 +260,7 @@ public fun VisionClient.renderAllVisionsById(id: String): Unit = whenDocumentLoa if (element != null) { renderAllVisionsIn(element) } else { - console.warn("Element with id $id not found") + logger.warn { "Element with id $id not found" } } } @@ -268,7 +277,14 @@ public class VisionClientApplication(public val context: Context) : Application private val client = context.fetch(VisionClient) override fun start(document: Document, state: Map) { - console.info("Starting Vision Client") + context.logger.info { + "Starting VisionClient with renderers: ${ + client.renderers.joinToString( + prefix = "\n\t", + separator = "\n\t" + ) { it.name.toString() } + }" + } val element = document.body ?: error("Document does not have a body") client.renderAllVisionsIn(element) } @@ -279,7 +295,7 @@ public class VisionClientApplication(public val context: Context) : Application * Create a vision client context and render all visions on the page. */ public fun runVisionClient(contextBuilder: ContextBuilder.() -> Unit) { - console.info("Starting VisionForge context") + Global.logger.info { "Starting VisionForge context" } val context = Context("VisionForge") { plugin(VisionClient)