From 4a10c3c44345b115d33612a4b3c6f44fcaa0223d Mon Sep 17 00:00:00 2001 From: Alexander Nozik Date: Sun, 19 May 2024 18:50:56 +0300 Subject: [PATCH] Add test for remote hub --- .../space/kscience/controls/api/DeviceHub.kt | 5 + .../kscience/controls/client/DeviceClient.kt | 55 ++++++++-- .../controls/client/RemoteDeviceConnect.kt | 100 ++++++++++++------ 3 files changed, 122 insertions(+), 38 deletions(-) diff --git a/controls-core/src/commonMain/kotlin/space/kscience/controls/api/DeviceHub.kt b/controls-core/src/commonMain/kotlin/space/kscience/controls/api/DeviceHub.kt index 077585b..e2f6331 100644 --- a/controls-core/src/commonMain/kotlin/space/kscience/controls/api/DeviceHub.kt +++ b/controls-core/src/commonMain/kotlin/space/kscience/controls/api/DeviceHub.kt @@ -26,6 +26,11 @@ public interface DeviceHub : Provider { public companion object } +public fun DeviceHub(deviceMap: Map): DeviceHub = object : DeviceHub { + override val devices: Map + get() = deviceMap +} + /** * List all devices, including sub-devices */ diff --git a/controls-magix/src/commonMain/kotlin/space/kscience/controls/client/DeviceClient.kt b/controls-magix/src/commonMain/kotlin/space/kscience/controls/client/DeviceClient.kt index 9418791..859bce2 100644 --- a/controls-magix/src/commonMain/kotlin/space/kscience/controls/client/DeviceClient.kt +++ b/controls-magix/src/commonMain/kotlin/space/kscience/controls/client/DeviceClient.kt @@ -163,23 +163,64 @@ private class MapBasedDeviceHub(val deviceMap: Map, val prefix: Na } -public fun MagixEndpoint.remoteDeviceHub( +/** + * Create a dynamic [DeviceHub] from incoming messages + */ +public suspend fun MagixEndpoint.remoteDeviceHub( context: Context, thisEndpoint: String, deviceEndpoint: String, ): DeviceHub { val devices = mutableMapOf() val subscription = subscribe(DeviceManager.magixFormat, originFilter = listOf(deviceEndpoint)).map { it.second } - subscription.filterIsInstance().onEach { - + subscription.filterIsInstance().onEach { descriptionMessage -> + devices.getOrPut(descriptionMessage.sourceDevice) { + DeviceClient( + context = context, + deviceName = descriptionMessage.sourceDevice, + propertyDescriptors = descriptionMessage.properties, + actionDescriptors = descriptionMessage.actions, + incomingFlow = subscription + ) { + send( + format = DeviceManager.magixFormat, + payload = it, + source = thisEndpoint, + target = deviceEndpoint, + id = stringUID() + ) + } + }.run { + propertyDescriptors = descriptionMessage.properties + } }.launchIn(context) - return object : DeviceHub { - override val devices: Map - get() = TODO("Not yet implemented") + send( + format = DeviceManager.magixFormat, + payload = GetDescriptionMessage(targetDevice = null), + source = thisEndpoint, + target = deviceEndpoint, + id = stringUID() + ) - } + return DeviceHub(devices) +} + +/** + * Request a description update for all devices on an endpoint + */ +public suspend fun MagixEndpoint.requestDeviceUpdate( + thisEndpoint: String, + deviceEndpoint: String, +) { + send( + format = DeviceManager.magixFormat, + payload = GetDescriptionMessage(), + source = thisEndpoint, + target = deviceEndpoint, + id = stringUID() + ) } /** diff --git a/controls-magix/src/commonTest/kotlin/space/kscience/controls/client/RemoteDeviceConnect.kt b/controls-magix/src/commonTest/kotlin/space/kscience/controls/client/RemoteDeviceConnect.kt index db80848..3c707b2 100644 --- a/controls-magix/src/commonTest/kotlin/space/kscience/controls/client/RemoteDeviceConnect.kt +++ b/controls-magix/src/commonTest/kotlin/space/kscience/controls/client/RemoteDeviceConnect.kt @@ -1,15 +1,21 @@ package space.kscience.controls.client +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.merge +import kotlinx.coroutines.launch import kotlinx.coroutines.test.runTest +import kotlinx.coroutines.withContext import kotlinx.serialization.json.Json +import space.kscience.controls.api.DeviceHub import space.kscience.controls.api.DeviceMessage import space.kscience.controls.manager.DeviceManager +import space.kscience.controls.manager.hubMessageFlow import space.kscience.controls.manager.install -import space.kscience.controls.manager.respondMessage +import space.kscience.controls.manager.respondHubMessage import space.kscience.controls.spec.* import space.kscience.dataforge.context.Context import space.kscience.dataforge.context.Factory @@ -17,15 +23,43 @@ import space.kscience.dataforge.context.request import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.get import space.kscience.dataforge.meta.int -import space.kscience.dataforge.names.Name +import space.kscience.dataforge.names.asName import space.kscience.magix.api.MagixEndpoint import space.kscience.magix.api.MagixMessage import space.kscience.magix.api.MagixMessageFilter import kotlin.random.Random import kotlin.test.Test import kotlin.test.assertContains +import kotlin.test.assertEquals import kotlin.time.Duration.Companion.milliseconds +class VirtualMagixEndpoint(val hub: DeviceHub) : MagixEndpoint { + + private val additionalMessages = MutableSharedFlow(1) + + override fun subscribe( + filter: MagixMessageFilter, + ): Flow = merge(hub.hubMessageFlow(), additionalMessages).map { + MagixMessage( + format = DeviceManager.magixFormat.defaultFormat, + payload = MagixEndpoint.magixJson.encodeToJsonElement(DeviceManager.magixFormat.serializer, it), + sourceEndpoint = "device", + ) + } + + override suspend fun broadcast(message: MagixMessage) { + hub.respondHubMessage( + Json.decodeFromJsonElement(DeviceManager.magixFormat.serializer, message.payload) + ).forEach { + additionalMessages.emit(it) + } + } + + override fun close() { + // + } +} + internal class RemoteDeviceConnect { @@ -53,40 +87,44 @@ internal class RemoteDeviceConnect { val context = Context { plugin(DeviceManager) } + val deviceManager = context.request(DeviceManager) - val device = context.request(DeviceManager).install("test", TestDevice) + deviceManager.install("test", TestDevice) - val virtualMagixEndpoint = object : MagixEndpoint { + val virtualMagixEndpoint = VirtualMagixEndpoint(deviceManager) - - private val additionalMessages = MutableSharedFlow(1) - - override fun subscribe( - filter: MagixMessageFilter, - ): Flow = merge(device.messageFlow, additionalMessages).map { - MagixMessage( - format = DeviceManager.magixFormat.defaultFormat, - payload = MagixEndpoint.magixJson.encodeToJsonElement(DeviceManager.magixFormat.serializer, it), - sourceEndpoint = "device", - ) - } - - override suspend fun broadcast(message: MagixMessage) { - device.respondMessage( - Name.EMPTY, - Json.decodeFromJsonElement(DeviceManager.magixFormat.serializer, message.payload) - )?.let { - additionalMessages.emit(it) - } - } - - override fun close() { - // - } - } - val remoteDevice = virtualMagixEndpoint.remoteDevice(context, "client", "device", Name.EMPTY) + val remoteDevice = virtualMagixEndpoint.remoteDevice(context, "client", "device", "test".asName()) assertContains(0.0..1.0, remoteDevice.read(TestDevice.value)) } + + @Test + fun deviceHub() = runTest { + val context = Context { + plugin(DeviceManager) + } + val deviceManager = context.request(DeviceManager) + + launch { + delay(50) + repeat(10) { + deviceManager.install("test[$it]", TestDevice) + } + } + + val virtualMagixEndpoint = VirtualMagixEndpoint(deviceManager) + + val remoteHub = virtualMagixEndpoint.remoteDeviceHub(context, "client", "device") + + assertEquals(0, remoteHub.devices.size) + + delay(60) + //switch context to use actual delay + withContext(Dispatchers.Default) { + virtualMagixEndpoint.requestDeviceUpdate("client", "device") + delay(30) + assertEquals(10, remoteHub.devices.size) + } + } } \ No newline at end of file