Fix Rsocket endpoint without filter. Add integration test with loop
This commit is contained in:
parent
4a10c3c443
commit
a66e411848
@ -19,6 +19,7 @@
|
|||||||
### Removed
|
### Removed
|
||||||
|
|
||||||
### Fixed
|
### Fixed
|
||||||
|
- Fix a problem with rsocket endpoint with no filter.
|
||||||
|
|
||||||
### Security
|
### Security
|
||||||
|
|
||||||
|
@ -7,7 +7,7 @@ plugins {
|
|||||||
|
|
||||||
allprojects {
|
allprojects {
|
||||||
group = "space.kscience"
|
group = "space.kscience"
|
||||||
version = "0.4.0-dev-2"
|
version = "0.4.0-dev-3"
|
||||||
repositories{
|
repositories{
|
||||||
maven("https://maven.pkg.jetbrains.space/spc/p/sci/dev")
|
maven("https://maven.pkg.jetbrains.space/spc/p/sci/dev")
|
||||||
}
|
}
|
||||||
|
@ -17,6 +17,7 @@ kscience {
|
|||||||
useSerialization {
|
useSerialization {
|
||||||
json()
|
json()
|
||||||
}
|
}
|
||||||
|
|
||||||
commonMain {
|
commonMain {
|
||||||
api(projects.magix.magixApi)
|
api(projects.magix.magixApi)
|
||||||
api(projects.controlsCore)
|
api(projects.controlsCore)
|
||||||
@ -25,6 +26,11 @@ kscience {
|
|||||||
|
|
||||||
jvmTest{
|
jvmTest{
|
||||||
implementation(spclibs.logback.classic)
|
implementation(spclibs.logback.classic)
|
||||||
|
implementation(projects.magix.magixServer)
|
||||||
|
implementation(projects.magix.magixRsocket)
|
||||||
|
implementation(spclibs.ktor.server.cio)
|
||||||
|
implementation(spclibs.ktor.server.websockets)
|
||||||
|
implementation(spclibs.ktor.client.cio)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -0,0 +1,60 @@
|
|||||||
|
package space.kscience.controls.client
|
||||||
|
|
||||||
|
import kotlinx.coroutines.Dispatchers
|
||||||
|
import kotlinx.coroutines.delay
|
||||||
|
import kotlinx.coroutines.launch
|
||||||
|
import kotlinx.coroutines.test.runTest
|
||||||
|
import kotlinx.coroutines.withContext
|
||||||
|
import space.kscience.controls.client.RemoteDeviceConnect.TestDevice
|
||||||
|
import space.kscience.controls.manager.DeviceManager
|
||||||
|
import space.kscience.controls.manager.install
|
||||||
|
import space.kscience.dataforge.context.Context
|
||||||
|
import space.kscience.dataforge.context.request
|
||||||
|
import space.kscience.magix.api.MagixEndpoint
|
||||||
|
import space.kscience.magix.rsocket.rSocketWithWebSockets
|
||||||
|
import space.kscience.magix.server.startMagixServer
|
||||||
|
import kotlin.test.Test
|
||||||
|
import kotlin.test.assertEquals
|
||||||
|
|
||||||
|
class MagixLoopTest {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun deviceHub() = runTest {
|
||||||
|
val context = Context {
|
||||||
|
plugin(DeviceManager)
|
||||||
|
}
|
||||||
|
|
||||||
|
val server = context.startMagixServer()
|
||||||
|
|
||||||
|
val deviceManager = context.request(DeviceManager)
|
||||||
|
|
||||||
|
val deviceEndpoint = MagixEndpoint.rSocketWithWebSockets("localhost")
|
||||||
|
|
||||||
|
// deviceEndpoint.subscribe().onEach {
|
||||||
|
// println(it)
|
||||||
|
// }.launchIn(this)
|
||||||
|
|
||||||
|
deviceManager.launchMagixService(deviceEndpoint, "device")
|
||||||
|
|
||||||
|
launch {
|
||||||
|
delay(50)
|
||||||
|
repeat(10) {
|
||||||
|
deviceManager.install("test[$it]", TestDevice)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
val clientEndpoint = MagixEndpoint.rSocketWithWebSockets("localhost")
|
||||||
|
|
||||||
|
val remoteHub = clientEndpoint.remoteDeviceHub(context, "client", "device")
|
||||||
|
|
||||||
|
assertEquals(0, remoteHub.devices.size)
|
||||||
|
|
||||||
|
delay(60)
|
||||||
|
//switch context to use actual delay
|
||||||
|
withContext(Dispatchers.Default) {
|
||||||
|
clientEndpoint.requestDeviceUpdate("client", "device")
|
||||||
|
delay(60)
|
||||||
|
assertEquals(10, remoteHub.devices.size)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -59,9 +59,12 @@ public class RSocketMagixFlowPlugin(
|
|||||||
RSocketRequestHandler(coroutineScope.coroutineContext) {
|
RSocketRequestHandler(coroutineScope.coroutineContext) {
|
||||||
//handler for request/stream
|
//handler for request/stream
|
||||||
requestStream { request: Payload ->
|
requestStream { request: Payload ->
|
||||||
val filter = MagixEndpoint.magixJson.decodeFromString(
|
val requestText = request.data.readText()
|
||||||
|
val filter = if(requestText.isBlank()) {
|
||||||
|
MagixMessageFilter.ALL
|
||||||
|
} else MagixEndpoint.magixJson.decodeFromString(
|
||||||
MagixMessageFilter.serializer(),
|
MagixMessageFilter.serializer(),
|
||||||
request.data.readText()
|
requestText
|
||||||
)
|
)
|
||||||
|
|
||||||
receive.filter(filter).map { message ->
|
receive.filter(filter).map { message ->
|
||||||
@ -89,12 +92,12 @@ public class RSocketMagixFlowPlugin(
|
|||||||
)
|
)
|
||||||
}.launchIn(this)
|
}.launchIn(this)
|
||||||
|
|
||||||
val filterText = request.use { it.data.readText() }
|
val filterText = request.data.readText()
|
||||||
|
|
||||||
val filter = if (filterText.isNotBlank()) {
|
val filter = if (filterText.isBlank()) {
|
||||||
MagixEndpoint.magixJson.decodeFromString(MagixMessageFilter.serializer(), filterText)
|
MagixMessageFilter.ALL
|
||||||
} else {
|
} else {
|
||||||
MagixMessageFilter()
|
MagixEndpoint.magixJson.decodeFromString(MagixMessageFilter.serializer(), filterText)
|
||||||
}
|
}
|
||||||
|
|
||||||
receive.filter(filter).map { message ->
|
receive.filter(filter).map { message ->
|
||||||
|
Loading…
Reference in New Issue
Block a user