Add test for remote hub

This commit is contained in:
Alexander Nozik 2024-05-19 18:50:56 +03:00
parent 207064cd45
commit 4a10c3c443
3 changed files with 122 additions and 38 deletions

View File

@ -26,6 +26,11 @@ public interface DeviceHub : Provider {
public companion object
}
public fun DeviceHub(deviceMap: Map<Name, Device>): DeviceHub = object : DeviceHub {
override val devices: Map<Name, Device>
get() = deviceMap
}
/**
* List all devices, including sub-devices
*/

View File

@ -163,23 +163,64 @@ private class MapBasedDeviceHub(val deviceMap: Map<Name, Device>, val prefix: Na
}
public fun MagixEndpoint.remoteDeviceHub(
/**
* Create a dynamic [DeviceHub] from incoming messages
*/
public suspend fun MagixEndpoint.remoteDeviceHub(
context: Context,
thisEndpoint: String,
deviceEndpoint: String,
): DeviceHub {
val devices = mutableMapOf<Name, DeviceClient>()
val subscription = subscribe(DeviceManager.magixFormat, originFilter = listOf(deviceEndpoint)).map { it.second }
subscription.filterIsInstance<DescriptionMessage>().onEach {
subscription.filterIsInstance<DescriptionMessage>().onEach { descriptionMessage ->
devices.getOrPut(descriptionMessage.sourceDevice) {
DeviceClient(
context = context,
deviceName = descriptionMessage.sourceDevice,
propertyDescriptors = descriptionMessage.properties,
actionDescriptors = descriptionMessage.actions,
incomingFlow = subscription
) {
send(
format = DeviceManager.magixFormat,
payload = it,
source = thisEndpoint,
target = deviceEndpoint,
id = stringUID()
)
}
}.run {
propertyDescriptors = descriptionMessage.properties
}
}.launchIn(context)
return object : DeviceHub {
override val devices: Map<Name, Device>
get() = TODO("Not yet implemented")
send(
format = DeviceManager.magixFormat,
payload = GetDescriptionMessage(targetDevice = null),
source = thisEndpoint,
target = deviceEndpoint,
id = stringUID()
)
}
return DeviceHub(devices)
}
/**
* Request a description update for all devices on an endpoint
*/
public suspend fun MagixEndpoint.requestDeviceUpdate(
thisEndpoint: String,
deviceEndpoint: String,
) {
send(
format = DeviceManager.magixFormat,
payload = GetDescriptionMessage(),
source = thisEndpoint,
target = deviceEndpoint,
id = stringUID()
)
}
/**

View File

@ -1,15 +1,21 @@
package space.kscience.controls.client
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.runTest
import kotlinx.coroutines.withContext
import kotlinx.serialization.json.Json
import space.kscience.controls.api.DeviceHub
import space.kscience.controls.api.DeviceMessage
import space.kscience.controls.manager.DeviceManager
import space.kscience.controls.manager.hubMessageFlow
import space.kscience.controls.manager.install
import space.kscience.controls.manager.respondMessage
import space.kscience.controls.manager.respondHubMessage
import space.kscience.controls.spec.*
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.Factory
@ -17,15 +23,43 @@ import space.kscience.dataforge.context.request
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.get
import space.kscience.dataforge.meta.int
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.asName
import space.kscience.magix.api.MagixEndpoint
import space.kscience.magix.api.MagixMessage
import space.kscience.magix.api.MagixMessageFilter
import kotlin.random.Random
import kotlin.test.Test
import kotlin.test.assertContains
import kotlin.test.assertEquals
import kotlin.time.Duration.Companion.milliseconds
class VirtualMagixEndpoint(val hub: DeviceHub) : MagixEndpoint {
private val additionalMessages = MutableSharedFlow<DeviceMessage>(1)
override fun subscribe(
filter: MagixMessageFilter,
): Flow<MagixMessage> = merge(hub.hubMessageFlow(), additionalMessages).map {
MagixMessage(
format = DeviceManager.magixFormat.defaultFormat,
payload = MagixEndpoint.magixJson.encodeToJsonElement(DeviceManager.magixFormat.serializer, it),
sourceEndpoint = "device",
)
}
override suspend fun broadcast(message: MagixMessage) {
hub.respondHubMessage(
Json.decodeFromJsonElement(DeviceManager.magixFormat.serializer, message.payload)
).forEach {
additionalMessages.emit(it)
}
}
override fun close() {
//
}
}
internal class RemoteDeviceConnect {
@ -53,40 +87,44 @@ internal class RemoteDeviceConnect {
val context = Context {
plugin(DeviceManager)
}
val deviceManager = context.request(DeviceManager)
val device = context.request(DeviceManager).install("test", TestDevice)
deviceManager.install("test", TestDevice)
val virtualMagixEndpoint = object : MagixEndpoint {
val virtualMagixEndpoint = VirtualMagixEndpoint(deviceManager)
private val additionalMessages = MutableSharedFlow<DeviceMessage>(1)
override fun subscribe(
filter: MagixMessageFilter,
): Flow<MagixMessage> = merge(device.messageFlow, additionalMessages).map {
MagixMessage(
format = DeviceManager.magixFormat.defaultFormat,
payload = MagixEndpoint.magixJson.encodeToJsonElement(DeviceManager.magixFormat.serializer, it),
sourceEndpoint = "device",
)
}
override suspend fun broadcast(message: MagixMessage) {
device.respondMessage(
Name.EMPTY,
Json.decodeFromJsonElement(DeviceManager.magixFormat.serializer, message.payload)
)?.let {
additionalMessages.emit(it)
}
}
override fun close() {
//
}
}
val remoteDevice = virtualMagixEndpoint.remoteDevice(context, "client", "device", Name.EMPTY)
val remoteDevice = virtualMagixEndpoint.remoteDevice(context, "client", "device", "test".asName())
assertContains(0.0..1.0, remoteDevice.read(TestDevice.value))
}
@Test
fun deviceHub() = runTest {
val context = Context {
plugin(DeviceManager)
}
val deviceManager = context.request(DeviceManager)
launch {
delay(50)
repeat(10) {
deviceManager.install("test[$it]", TestDevice)
}
}
val virtualMagixEndpoint = VirtualMagixEndpoint(deviceManager)
val remoteHub = virtualMagixEndpoint.remoteDeviceHub(context, "client", "device")
assertEquals(0, remoteHub.devices.size)
delay(60)
//switch context to use actual delay
withContext(Dispatchers.Default) {
virtualMagixEndpoint.requestDeviceUpdate("client", "device")
delay(30)
assertEquals(10, remoteHub.devices.size)
}
}
}