Update connection logic

This commit is contained in:
Alexander Nozik 2022-11-21 13:28:39 +03:00
parent 279b848039
commit 4ceffef67a
No known key found for this signature in database
GPG Key ID: F7FCF2DD25C71357
3 changed files with 102 additions and 81 deletions

View File

@ -4,10 +4,10 @@
"cell_type": "code", "cell_type": "code",
"execution_count": null, "execution_count": null,
"metadata": { "metadata": {
"tags": [],
"pycharm": { "pycharm": {
"is_executing": true "is_executing": true
}, }
"tags": []
}, },
"outputs": [], "outputs": [],
"source": [ "source": [

View File

@ -148,9 +148,12 @@ private fun CoroutineScope.collectChange(
/** /**
* Generate a flow of changes of this vision and its children * Generate a flow of changes of this vision and its children
*
* @param sendInitial if true, send the initial vision state as first change
*/ */
public fun Vision.flowChanges( public fun Vision.flowChanges(
collectionDuration: Duration, collectionDuration: Duration,
sendInitial: Boolean = false
): Flow<VisionChange> = flow { ): Flow<VisionChange> = flow {
val manager = manager ?: error("Orphan vision could not collect changes") val manager = manager ?: error("Orphan vision could not collect changes")
coroutineScope { coroutineScope {
@ -158,9 +161,11 @@ public fun Vision.flowChanges(
val mutex = Mutex() val mutex = Mutex()
collectChange(Name.EMPTY, this@flowChanges, mutex, collector) collectChange(Name.EMPTY, this@flowChanges, mutex, collector)
//Send initial vision state if(sendInitial) {
val initialChange = VisionChange(vision = deepCopy(manager)) //Send initial vision state
emit(initialChange) val initialChange = VisionChange(vision = deepCopy(manager))
emit(initialChange)
}
while (true) { while (true) {
//Wait for changes to accumulate //Wait for changes to accumulate

View File

@ -45,7 +45,7 @@ public class VisionClient : AbstractPlugin() {
return attribute?.value return attribute?.value
} }
private val renderers by lazy { context.gather<ElementVisionRenderer>(ElementVisionRenderer.TYPE).values } internal val renderers by lazy { context.gather<ElementVisionRenderer>(ElementVisionRenderer.TYPE).values }
private fun findRendererFor(vision: Vision): ElementVisionRenderer? = renderers.mapNotNull { private fun findRendererFor(vision: Vision): ElementVisionRenderer? = renderers.mapNotNull {
val rating = it.rateVision(vision) val rating = it.rateVision(vision)
@ -71,76 +71,77 @@ public class VisionClient : AbstractPlugin() {
changeCollector.setChild(name, child) changeCollector.setChild(name, child)
} }
private fun renderVision(name: String, element: Element, vision: Vision?, outputMeta: Meta) { private fun renderVision(element: Element, vision: Vision, outputMeta: Meta) {
if (vision != null) { vision.setAsRoot(visionManager)
vision.setAsRoot(visionManager) val renderer = findRendererFor(vision) ?: error("Could not find renderer for ${vision::class}")
val renderer = findRendererFor(vision) renderer.render(element, vision, outputMeta)
?: error("Could not find renderer for ${vision::class}") }
renderer.render(element, vision, outputMeta)
element.attributes[OUTPUT_CONNECT_ATTRIBUTE]?.let { attr -> private fun updateVision(name: String, element: Element, vision: Vision?, outputMeta: Meta) {
val wsUrl = if (attr.value.isBlank() || attr.value == VisionTagConsumer.AUTO_DATA_ATTRIBUTE) { element.attributes[OUTPUT_CONNECT_ATTRIBUTE]?.let { attr ->
val endpoint = resolveEndpoint(element) val wsUrl = if (attr.value.isBlank() || attr.value == VisionTagConsumer.AUTO_DATA_ATTRIBUTE) {
logger.info { "Vision server is resolved to $endpoint" } val endpoint = resolveEndpoint(element)
URL(endpoint).apply { logger.info { "Vision server is resolved to $endpoint" }
pathname += "/ws" URL(endpoint).apply {
pathname += "/ws"
}
} else {
URL(attr.value)
}.apply {
protocol = "ws"
searchParams.append("name", name)
}
logger.info { "Updating vision data from $wsUrl" }
//Individual websocket for this element
WebSocket(wsUrl.toString()).apply {
onmessage = { messageEvent ->
val stringData: String? = messageEvent.data as? String
if (stringData != null) {
val change: VisionChange = visionManager.jsonFormat.decodeFromString(
VisionChange.serializer(),
stringData
)
// If change contains root vision replacement, do it
change.vision?.let { vision ->
renderVision(element, vision, outputMeta)
}
logger.debug { "Got update $change for output with name $name" }
if (vision == null) error("Can't update vision because it is not loaded.")
vision.update(change)
} else {
logger.error { "WebSocket message data is not a string" }
} }
} else {
URL(attr.value)
}.apply {
protocol = "ws"
searchParams.append("name", name)
} }
logger.info { "Updating vision data from $wsUrl" }
//Individual websocket for this element //Backward change propagation
WebSocket(wsUrl.toString()).apply { var feedbackJob: Job? = null
onmessage = { messageEvent ->
val stringData: String? = messageEvent.data as? String
if (stringData != null) {
val change: VisionChange = visionManager.jsonFormat.decodeFromString(
VisionChange.serializer(),
stringData
)
if (change.vision != null) { //Feedback changes aggregation time in milliseconds
renderer.render(element, vision, outputMeta) val feedbackAggregationTime = meta["aggregationTime"]?.int ?: 300
}
logger.debug { "Got update $change for output with name $name" } onopen = {
vision.update(change) feedbackJob = visionManager.context.launch {
} else { delay(feedbackAggregationTime.milliseconds)
console.error("WebSocket message data is not a string") if (!changeCollector.isEmpty()) {
send(visionManager.encodeToString(changeCollector.deepCopy(visionManager)))
changeCollector.reset()
} }
} }
logger.info { "WebSocket update channel established for output '$name'" }
}
onclose = {
//Backward change propagation feedbackJob?.cancel()
var feedbackJob: Job? = null logger.info { "WebSocket update channel closed for output '$name'" }
}
//Feedback changes aggregation time in milliseconds onerror = {
val feedbackAggregationTime = meta["aggregationTime"]?.int ?: 300 feedbackJob?.cancel()
logger.error { "WebSocket update channel error for output '$name'" }
onopen = {
feedbackJob = visionManager.context.launch {
delay(feedbackAggregationTime.milliseconds)
if (!changeCollector.isEmpty()) {
send(visionManager.encodeToString(changeCollector.deepCopy(visionManager)))
changeCollector.reset()
}
}
console.info("WebSocket update channel established for output '$name'")
}
onclose = {
feedbackJob?.cancel()
console.info("WebSocket update channel closed for output '$name'")
}
onerror = {
feedbackJob?.cancel()
console.error("WebSocket update channel error for output '$name'")
}
} }
} }
} }
@ -164,17 +165,8 @@ public class VisionClient : AbstractPlugin() {
VisionManager.defaultJson.decodeFromString(MetaSerializer, it) VisionManager.defaultJson.decodeFromString(MetaSerializer, it)
} ?: Meta.EMPTY } ?: Meta.EMPTY
//Trying to render embedded vision
val embeddedVision = element.getEmbeddedData(VisionTagConsumer.OUTPUT_DATA_CLASS)?.let {
visionManager.decodeFromString(it)
}
when { when {
embeddedVision != null -> { // fetch data if path is provided
logger.info { "Found embedded vision for output with name $name" }
renderVision(name, element, embeddedVision, outputMeta)
}
element.attributes[OUTPUT_FETCH_ATTRIBUTE] != null -> { element.attributes[OUTPUT_FETCH_ATTRIBUTE] != null -> {
val attr = element.attributes[OUTPUT_FETCH_ATTRIBUTE]!! val attr = element.attributes[OUTPUT_FETCH_ATTRIBUTE]!!
@ -195,7 +187,8 @@ public class VisionClient : AbstractPlugin() {
if (response.ok) { if (response.ok) {
response.text().then { text -> response.text().then { text ->
val vision = visionManager.decodeFromString(text) val vision = visionManager.decodeFromString(text)
renderVision(name, element, vision, outputMeta) renderVision(element, vision, outputMeta)
updateVision(name, element, vision, outputMeta)
} }
} else { } else {
logger.error { "Failed to fetch initial vision state from $fetchUrl" } logger.error { "Failed to fetch initial vision state from $fetchUrl" }
@ -203,6 +196,22 @@ public class VisionClient : AbstractPlugin() {
} }
} }
// use embedded data if it is available
element.getElementsByClassName(VisionTagConsumer.OUTPUT_DATA_CLASS).length > 0 -> {
//Getting embedded vision data
val embeddedVision = element.getEmbeddedData(VisionTagConsumer.OUTPUT_DATA_CLASS)!!.let {
visionManager.decodeFromString(it)
}
logger.info { "Found embedded vision for output with name $name" }
renderVision(element, embeddedVision, outputMeta)
updateVision(name, element, embeddedVision, outputMeta)
}
//Try to load vision via websocket
element.attributes[OUTPUT_CONNECT_ATTRIBUTE] != null -> {
updateVision(name, element, null, outputMeta)
}
else -> error("No embedded vision data / fetch url for $name") else -> error("No embedded vision data / fetch url for $name")
} }
element.setAttribute(OUTPUT_RENDERED, "true") element.setAttribute(OUTPUT_RENDERED, "true")
@ -237,7 +246,7 @@ private fun whenDocumentLoaded(block: Document.() -> Unit): Unit {
*/ */
public fun VisionClient.renderAllVisionsIn(element: Element) { public fun VisionClient.renderAllVisionsIn(element: Element) {
val elements = element.getElementsByClassName(VisionTagConsumer.OUTPUT_CLASS) val elements = element.getElementsByClassName(VisionTagConsumer.OUTPUT_CLASS)
console.info("Finished search for outputs. Found ${elements.length} items") logger.info { "Finished search for outputs. Found ${elements.length} items" }
elements.asList().forEach { child -> elements.asList().forEach { child ->
renderVisionIn(child) renderVisionIn(child)
} }
@ -251,7 +260,7 @@ public fun VisionClient.renderAllVisionsById(id: String): Unit = whenDocumentLoa
if (element != null) { if (element != null) {
renderAllVisionsIn(element) renderAllVisionsIn(element)
} else { } else {
console.warn("Element with id $id not found") logger.warn { "Element with id $id not found" }
} }
} }
@ -268,7 +277,14 @@ public class VisionClientApplication(public val context: Context) : Application
private val client = context.fetch(VisionClient) private val client = context.fetch(VisionClient)
override fun start(document: Document, state: Map<String, Any>) { override fun start(document: Document, state: Map<String, Any>) {
console.info("Starting Vision Client") context.logger.info {
"Starting VisionClient with renderers: ${
client.renderers.joinToString(
prefix = "\n\t",
separator = "\n\t"
) { it.name.toString() }
}"
}
val element = document.body ?: error("Document does not have a body") val element = document.body ?: error("Document does not have a body")
client.renderAllVisionsIn(element) client.renderAllVisionsIn(element)
} }
@ -279,7 +295,7 @@ public class VisionClientApplication(public val context: Context) : Application
* Create a vision client context and render all visions on the page. * Create a vision client context and render all visions on the page.
*/ */
public fun runVisionClient(contextBuilder: ContextBuilder.() -> Unit) { public fun runVisionClient(contextBuilder: ContextBuilder.() -> Unit) {
console.info("Starting VisionForge context") Global.logger.info { "Starting VisionForge context" }
val context = Context("VisionForge") { val context = Context("VisionForge") {
plugin(VisionClient) plugin(VisionClient)