From a66e411848663c9c652250aa241d40c4befe19d7 Mon Sep 17 00:00:00 2001 From: Alexander Nozik Date: Tue, 21 May 2024 09:44:45 +0300 Subject: [PATCH] Fix Rsocket endpoint without filter. Add integration test with loop --- CHANGELOG.md | 1 + build.gradle.kts | 2 +- controls-magix/build.gradle.kts | 6 ++ .../kscience/controls/client/MagixLoopTest.kt | 60 +++++++++++++++++++ .../magix/server/RSocketMagixFlowPlugin.kt | 15 +++-- 5 files changed, 77 insertions(+), 7 deletions(-) create mode 100644 controls-magix/src/jvmTest/kotlin/space/kscience/controls/client/MagixLoopTest.kt diff --git a/CHANGELOG.md b/CHANGELOG.md index 2c7ea2f..5a1dad9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ ### Removed ### Fixed +- Fix a problem with rsocket endpoint with no filter. ### Security diff --git a/build.gradle.kts b/build.gradle.kts index 0f00407..6423c3c 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -7,7 +7,7 @@ plugins { allprojects { group = "space.kscience" - version = "0.4.0-dev-2" + version = "0.4.0-dev-3" repositories{ maven("https://maven.pkg.jetbrains.space/spc/p/sci/dev") } diff --git a/controls-magix/build.gradle.kts b/controls-magix/build.gradle.kts index ef4a3af..ea03a7e 100644 --- a/controls-magix/build.gradle.kts +++ b/controls-magix/build.gradle.kts @@ -17,6 +17,7 @@ kscience { useSerialization { json() } + commonMain { api(projects.magix.magixApi) api(projects.controlsCore) @@ -25,6 +26,11 @@ kscience { jvmTest{ implementation(spclibs.logback.classic) + implementation(projects.magix.magixServer) + implementation(projects.magix.magixRsocket) + implementation(spclibs.ktor.server.cio) + implementation(spclibs.ktor.server.websockets) + implementation(spclibs.ktor.client.cio) } } diff --git a/controls-magix/src/jvmTest/kotlin/space/kscience/controls/client/MagixLoopTest.kt b/controls-magix/src/jvmTest/kotlin/space/kscience/controls/client/MagixLoopTest.kt new file mode 100644 index 0000000..764dcad --- /dev/null +++ b/controls-magix/src/jvmTest/kotlin/space/kscience/controls/client/MagixLoopTest.kt @@ -0,0 +1,60 @@ +package space.kscience.controls.client + +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlinx.coroutines.test.runTest +import kotlinx.coroutines.withContext +import space.kscience.controls.client.RemoteDeviceConnect.TestDevice +import space.kscience.controls.manager.DeviceManager +import space.kscience.controls.manager.install +import space.kscience.dataforge.context.Context +import space.kscience.dataforge.context.request +import space.kscience.magix.api.MagixEndpoint +import space.kscience.magix.rsocket.rSocketWithWebSockets +import space.kscience.magix.server.startMagixServer +import kotlin.test.Test +import kotlin.test.assertEquals + +class MagixLoopTest { + + @Test + fun deviceHub() = runTest { + val context = Context { + plugin(DeviceManager) + } + + val server = context.startMagixServer() + + val deviceManager = context.request(DeviceManager) + + val deviceEndpoint = MagixEndpoint.rSocketWithWebSockets("localhost") + +// deviceEndpoint.subscribe().onEach { +// println(it) +// }.launchIn(this) + + deviceManager.launchMagixService(deviceEndpoint, "device") + + launch { + delay(50) + repeat(10) { + deviceManager.install("test[$it]", TestDevice) + } + } + + val clientEndpoint = MagixEndpoint.rSocketWithWebSockets("localhost") + + val remoteHub = clientEndpoint.remoteDeviceHub(context, "client", "device") + + assertEquals(0, remoteHub.devices.size) + + delay(60) + //switch context to use actual delay + withContext(Dispatchers.Default) { + clientEndpoint.requestDeviceUpdate("client", "device") + delay(60) + assertEquals(10, remoteHub.devices.size) + } + } +} \ No newline at end of file diff --git a/magix/magix-server/src/jvmMain/kotlin/space/kscience/magix/server/RSocketMagixFlowPlugin.kt b/magix/magix-server/src/jvmMain/kotlin/space/kscience/magix/server/RSocketMagixFlowPlugin.kt index a00c33f..699a281 100644 --- a/magix/magix-server/src/jvmMain/kotlin/space/kscience/magix/server/RSocketMagixFlowPlugin.kt +++ b/magix/magix-server/src/jvmMain/kotlin/space/kscience/magix/server/RSocketMagixFlowPlugin.kt @@ -59,9 +59,12 @@ public class RSocketMagixFlowPlugin( RSocketRequestHandler(coroutineScope.coroutineContext) { //handler for request/stream requestStream { request: Payload -> - val filter = MagixEndpoint.magixJson.decodeFromString( + val requestText = request.data.readText() + val filter = if(requestText.isBlank()) { + MagixMessageFilter.ALL + } else MagixEndpoint.magixJson.decodeFromString( MagixMessageFilter.serializer(), - request.data.readText() + requestText ) receive.filter(filter).map { message -> @@ -89,12 +92,12 @@ public class RSocketMagixFlowPlugin( ) }.launchIn(this) - val filterText = request.use { it.data.readText() } + val filterText = request.data.readText() - val filter = if (filterText.isNotBlank()) { - MagixEndpoint.magixJson.decodeFromString(MagixMessageFilter.serializer(), filterText) + val filter = if (filterText.isBlank()) { + MagixMessageFilter.ALL } else { - MagixMessageFilter() + MagixEndpoint.magixJson.decodeFromString(MagixMessageFilter.serializer(), filterText) } receive.filter(filter).map { message ->