From cfaeb964e724375b9ce7adf83c7ea9cc6bfa0832 Mon Sep 17 00:00:00 2001 From: Alexander Nozik Date: Sun, 5 Jun 2022 19:06:50 +0300 Subject: [PATCH] Add benchmark demo. Fix some issues with RSocket --- .../ru/mipt/npm/controls/ports/TcpPort.kt | 1 + demo/echo/build.gradle.kts | 32 +++++++ .../ru/mipt/npm/controls/demo/echo/main.kt | 85 ++++++++++++++++++ {motors => demo/motors}/build.gradle.kts | 0 .../C885T0002-TN-C-885.PIMotionMaster-EN.pdf | Bin .../pimotionmaster/PiMotionMasterApp.kt | 0 .../pimotionmaster/PiMotionMasterDevice.kt | 0 .../PiMotionMasterVirtualDevice.kt | 0 .../pimotionmaster/fxDeviceProperties.kt | 0 .../devices/pimotionmaster/piDebugServer.kt | 0 magix/magix-demo/src/main/kotlin/zmq.kt | 2 +- .../npm/magix/rsocket/RSocketMagixEndpoint.kt | 20 +++-- .../rsocket/RSocketStreamMagixEndpoint.kt | 57 ++++++++++++ .../ru/mipt/npm/magix/rsocket/withTcp.kt | 17 ++++ .../ru/mipt/npm/magix/server/magixModule.kt | 31 ++++--- .../kotlin/ru/mipt/npm/magix/server/server.kt | 8 +- .../magix-storage-mongo}/build.gradle.kts | 0 .../npm/controls/mongo/MongoEventStorage.kt | 0 .../ru/mipt/npm/magix/zmq/ZmqMagixEndpoint.kt | 15 ++-- settings.gradle.kts | 3 +- 20 files changed, 237 insertions(+), 34 deletions(-) create mode 100644 demo/echo/build.gradle.kts create mode 100644 demo/echo/src/main/kotlin/ru/mipt/npm/controls/demo/echo/main.kt rename {motors => demo/motors}/build.gradle.kts (100%) rename {motors => demo/motors}/docs/C885T0002-TN-C-885.PIMotionMaster-EN.pdf (100%) rename {motors => demo/motors}/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/PiMotionMasterApp.kt (100%) rename {motors => demo/motors}/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/PiMotionMasterDevice.kt (100%) rename {motors => demo/motors}/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/PiMotionMasterVirtualDevice.kt (100%) rename {motors => demo/motors}/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/fxDeviceProperties.kt (100%) rename {motors => demo/motors}/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/piDebugServer.kt (100%) create mode 100644 magix/magix-rsocket/src/commonMain/kotlin/ru/mipt/npm/magix/rsocket/RSocketStreamMagixEndpoint.kt rename {controls-mongo => magix/magix-storage/magix-storage-mongo}/build.gradle.kts (100%) rename {controls-mongo => magix/magix-storage/magix-storage-mongo}/src/main/kotlin/ru/mipt/npm/controls/mongo/MongoEventStorage.kt (100%) diff --git a/controls-core/src/jvmMain/kotlin/ru/mipt/npm/controls/ports/TcpPort.kt b/controls-core/src/jvmMain/kotlin/ru/mipt/npm/controls/ports/TcpPort.kt index cfd810b..da41a47 100644 --- a/controls-core/src/jvmMain/kotlin/ru/mipt/npm/controls/ports/TcpPort.kt +++ b/controls-core/src/jvmMain/kotlin/ru/mipt/npm/controls/ports/TcpPort.kt @@ -62,6 +62,7 @@ public class TcpPort private constructor( futureChannel.await().write(ByteBuffer.wrap(data)) } + @OptIn(ExperimentalCoroutinesApi::class) override fun close() { listenerJob.cancel() if(futureChannel.isCompleted){ diff --git a/demo/echo/build.gradle.kts b/demo/echo/build.gradle.kts new file mode 100644 index 0000000..e0cc749 --- /dev/null +++ b/demo/echo/build.gradle.kts @@ -0,0 +1,32 @@ +plugins { + kotlin("jvm") + application +} + +repositories { + mavenCentral() + maven("https://repo.kotlin.link") +} + +val ktorVersion: String by rootProject.extra +val rsocketVersion: String by rootProject.extra + +dependencies { + implementation(projects.magix.magixServer) + implementation(projects.magix.magixRsocket) + implementation(projects.magix.magixZmq) + implementation("io.ktor:ktor-client-cio:$ktorVersion") + + implementation("ch.qos.logback:logback-classic:1.2.11") +} + +tasks.withType().configureEach { + kotlinOptions { + jvmTarget = "11" + freeCompilerArgs = freeCompilerArgs + listOf("-Xjvm-default=all", "-Xopt-in=kotlin.RequiresOptIn") + } +} + +application { + mainClass.set("ru.mipt.npm.controls.demo.echo.MainKt") +} \ No newline at end of file diff --git a/demo/echo/src/main/kotlin/ru/mipt/npm/controls/demo/echo/main.kt b/demo/echo/src/main/kotlin/ru/mipt/npm/controls/demo/echo/main.kt new file mode 100644 index 0000000..31d33d5 --- /dev/null +++ b/demo/echo/src/main/kotlin/ru/mipt/npm/controls/demo/echo/main.kt @@ -0,0 +1,85 @@ +package ru.mipt.npm.controls.demo.echo + +import io.ktor.server.application.log +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach +import kotlinx.serialization.json.JsonObject +import ru.mipt.npm.magix.api.MagixEndpoint +import ru.mipt.npm.magix.api.MagixMessage +import ru.mipt.npm.magix.api.MagixMessageFilter +import ru.mipt.npm.magix.rsocket.rSocketStreamWithTcp +import ru.mipt.npm.magix.server.startMagixServer +import kotlin.time.ExperimentalTime +import kotlin.time.measureTime + +private suspend fun MagixEndpoint.collectEcho(scope: CoroutineScope, n: Int) { + val complete = CompletableDeferred() + + val responseIds = HashSet() + + scope.launch { + subscribe( + MagixMessageFilter( + origin = listOf("loop") + ) + ).collect { message -> + if (message.id?.endsWith(".response") == true) { + responseIds.add(message.parentId!!) + } + val parentId = message.parentId + if (parentId != null && parentId.toInt() >= n - 1) { + println("Losses ${(1 - responseIds.size.toDouble() / n) * 100}%") + complete.complete(true) + cancel() + } + } + } + + scope.launch { + repeat(n) { + if (it % 20 == 0) delay(1) + broadcast( + MagixMessage( + format = "test", + payload = JsonObject(emptyMap()), + origin = "test", + target = "loop", + id = it.toString() + ) + ) + } + } + + complete.await() + println("completed") +} + + +@OptIn(ExperimentalTime::class) +suspend fun main(): Unit = coroutineScope { + launch(Dispatchers.Default) { + val server = startMagixServer(enableRawRSocket = true, enableZmq = true) { flow -> + //echo each message + flow.onEach { message -> + if (message.parentId == null) { + val m = message.copy(origin = "loop", parentId = message.id, id = message.id + ".response") + log.info("echo: $m") + flow.emit(m) + } + }.launchIn(this) + } + + + val responseTime = measureTime { + MagixEndpoint.rSocketStreamWithTcp("localhost").use { + it.collectEcho(this, 5000) + } + } + + println(responseTime) + + server.stop(500, 500) + cancel() + } +} \ No newline at end of file diff --git a/motors/build.gradle.kts b/demo/motors/build.gradle.kts similarity index 100% rename from motors/build.gradle.kts rename to demo/motors/build.gradle.kts diff --git a/motors/docs/C885T0002-TN-C-885.PIMotionMaster-EN.pdf b/demo/motors/docs/C885T0002-TN-C-885.PIMotionMaster-EN.pdf similarity index 100% rename from motors/docs/C885T0002-TN-C-885.PIMotionMaster-EN.pdf rename to demo/motors/docs/C885T0002-TN-C-885.PIMotionMaster-EN.pdf diff --git a/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/PiMotionMasterApp.kt b/demo/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/PiMotionMasterApp.kt similarity index 100% rename from motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/PiMotionMasterApp.kt rename to demo/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/PiMotionMasterApp.kt diff --git a/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/PiMotionMasterDevice.kt b/demo/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/PiMotionMasterDevice.kt similarity index 100% rename from motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/PiMotionMasterDevice.kt rename to demo/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/PiMotionMasterDevice.kt diff --git a/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/PiMotionMasterVirtualDevice.kt b/demo/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/PiMotionMasterVirtualDevice.kt similarity index 100% rename from motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/PiMotionMasterVirtualDevice.kt rename to demo/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/PiMotionMasterVirtualDevice.kt diff --git a/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/fxDeviceProperties.kt b/demo/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/fxDeviceProperties.kt similarity index 100% rename from motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/fxDeviceProperties.kt rename to demo/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/fxDeviceProperties.kt diff --git a/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/piDebugServer.kt b/demo/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/piDebugServer.kt similarity index 100% rename from motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/piDebugServer.kt rename to demo/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/piDebugServer.kt diff --git a/magix/magix-demo/src/main/kotlin/zmq.kt b/magix/magix-demo/src/main/kotlin/zmq.kt index 4672275..c8c4b45 100644 --- a/magix/magix-demo/src/main/kotlin/zmq.kt +++ b/magix/magix-demo/src/main/kotlin/zmq.kt @@ -44,7 +44,7 @@ suspend fun main(): Unit = coroutineScope { logger.info("Starting client") //Create zmq magix endpoint and wait for to finish - ZmqMagixEndpoint("tcp://localhost").use { client -> + ZmqMagixEndpoint("localhost","tcp").use { client -> logger.info("Starting subscription") client.subscribe().onEach { println(it.payload) diff --git a/magix/magix-rsocket/src/commonMain/kotlin/ru/mipt/npm/magix/rsocket/RSocketMagixEndpoint.kt b/magix/magix-rsocket/src/commonMain/kotlin/ru/mipt/npm/magix/rsocket/RSocketMagixEndpoint.kt index 4b05e6f..255b5bb 100644 --- a/magix/magix-rsocket/src/commonMain/kotlin/ru/mipt/npm/magix/rsocket/RSocketMagixEndpoint.kt +++ b/magix/magix-rsocket/src/commonMain/kotlin/ru/mipt/npm/magix/rsocket/RSocketMagixEndpoint.kt @@ -2,6 +2,7 @@ package ru.mipt.npm.magix.rsocket import io.ktor.client.HttpClient import io.ktor.client.plugins.websocket.WebSockets +import io.ktor.utils.io.core.Closeable import io.rsocket.kotlin.RSocket import io.rsocket.kotlin.core.RSocketConnector import io.rsocket.kotlin.core.RSocketConnectorBuilder @@ -9,13 +10,10 @@ import io.rsocket.kotlin.ktor.client.RSocketSupport import io.rsocket.kotlin.ktor.client.rSocket import io.rsocket.kotlin.payload.buildPayload import io.rsocket.kotlin.payload.data -import kotlinx.coroutines.CoroutineDispatcher -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.Job +import kotlinx.coroutines.* import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.flowOn import kotlinx.coroutines.flow.map -import kotlinx.coroutines.withContext import kotlinx.serialization.encodeToString import ru.mipt.npm.magix.api.MagixEndpoint import ru.mipt.npm.magix.api.MagixMessage @@ -27,7 +25,7 @@ import kotlin.coroutines.coroutineContext public class RSocketMagixEndpoint( private val rSocket: RSocket, private val coroutineContext: CoroutineContext, -) : MagixEndpoint { +) : MagixEndpoint, Closeable { override fun subscribe( filter: MagixMessageFilter, @@ -39,11 +37,15 @@ public class RSocketMagixEndpoint( }.filter(filter).flowOn(coroutineContext[CoroutineDispatcher] ?: Dispatchers.Unconfined) } - override suspend fun broadcast(message: MagixMessage) { - withContext(coroutineContext) { - val payload = buildPayload { data(MagixEndpoint.magixJson.encodeToString(MagixMessage.serializer(), message)) } - rSocket.fireAndForget(payload) + override suspend fun broadcast(message: MagixMessage): Unit = withContext(coroutineContext) { + val payload = buildPayload { + data(MagixEndpoint.magixJson.encodeToString(MagixMessage.serializer(), message)) } + rSocket.fireAndForget(payload) + } + + override fun close() { + rSocket.cancel() } public companion object diff --git a/magix/magix-rsocket/src/commonMain/kotlin/ru/mipt/npm/magix/rsocket/RSocketStreamMagixEndpoint.kt b/magix/magix-rsocket/src/commonMain/kotlin/ru/mipt/npm/magix/rsocket/RSocketStreamMagixEndpoint.kt new file mode 100644 index 0000000..4b0745e --- /dev/null +++ b/magix/magix-rsocket/src/commonMain/kotlin/ru/mipt/npm/magix/rsocket/RSocketStreamMagixEndpoint.kt @@ -0,0 +1,57 @@ +package ru.mipt.npm.magix.rsocket + +import io.ktor.utils.io.core.Closeable +import io.rsocket.kotlin.RSocket +import io.rsocket.kotlin.payload.Payload +import io.rsocket.kotlin.payload.buildPayload +import io.rsocket.kotlin.payload.data +import kotlinx.coroutines.CoroutineDispatcher +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.cancel +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.flowOn +import kotlinx.coroutines.flow.map +import ru.mipt.npm.magix.api.MagixEndpoint +import ru.mipt.npm.magix.api.MagixMessage +import ru.mipt.npm.magix.api.MagixMessageFilter +import ru.mipt.npm.magix.api.filter +import kotlin.coroutines.CoroutineContext + +/** + * RSocket endpoint based on established channel + */ +public class RSocketStreamMagixEndpoint( + private val rSocket: RSocket, + private val coroutineContext: CoroutineContext, +) : MagixEndpoint, Closeable { + + private val output: MutableSharedFlow = MutableSharedFlow() + + private val input: Flow by lazy { + rSocket.requestChannel( + Payload.Empty, + output.map { message -> + buildPayload { + data(MagixEndpoint.magixJson.encodeToString(MagixMessage.serializer(), message)) + } + }.flowOn(coroutineContext[CoroutineDispatcher] ?: Dispatchers.Unconfined) + ) + } + + override fun subscribe( + filter: MagixMessageFilter, + ): Flow { + return input.map { + MagixEndpoint.magixJson.decodeFromString(MagixMessage.serializer(), it.data.readText()) + }.filter(filter).flowOn(coroutineContext[CoroutineDispatcher] ?: Dispatchers.Unconfined) + } + + override suspend fun broadcast(message: MagixMessage): Unit { + output.emit(message) + } + + override fun close() { + rSocket.cancel() + } +} \ No newline at end of file diff --git a/magix/magix-rsocket/src/jvmMain/kotlin/ru/mipt/npm/magix/rsocket/withTcp.kt b/magix/magix-rsocket/src/jvmMain/kotlin/ru/mipt/npm/magix/rsocket/withTcp.kt index b42ee4f..5d76532 100644 --- a/magix/magix-rsocket/src/jvmMain/kotlin/ru/mipt/npm/magix/rsocket/withTcp.kt +++ b/magix/magix-rsocket/src/jvmMain/kotlin/ru/mipt/npm/magix/rsocket/withTcp.kt @@ -25,3 +25,20 @@ public suspend fun MagixEndpoint.Companion.rSocketWithTcp( return RSocketMagixEndpoint(rSocket, coroutineContext) } + + +public suspend fun MagixEndpoint.Companion.rSocketStreamWithTcp( + host: String, + port: Int = DEFAULT_MAGIX_RAW_PORT, + tcpConfig: SocketOptions.TCPClientSocketOptions.() -> Unit = {}, + rSocketConfig: RSocketConnectorBuilder.ConnectionConfigBuilder.() -> Unit = {}, +): RSocketStreamMagixEndpoint { + val transport = TcpClientTransport( + hostname = host, + port = port, + configure = tcpConfig + ) + val rSocket = buildConnector(rSocketConfig).connect(transport) + + return RSocketStreamMagixEndpoint(rSocket, coroutineContext) +} \ No newline at end of file diff --git a/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/magixModule.kt b/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/magixModule.kt index 70d683f..6cbf00a 100644 --- a/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/magixModule.kt +++ b/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/magixModule.kt @@ -5,10 +5,7 @@ import io.ktor.server.application.* import io.ktor.server.html.respondHtml import io.ktor.server.plugins.contentnegotiation.ContentNegotiation import io.ktor.server.request.receive -import io.ktor.server.routing.get -import io.ktor.server.routing.post -import io.ktor.server.routing.route -import io.ktor.server.routing.routing +import io.ktor.server.routing.* import io.ktor.server.util.getValue import io.ktor.server.websocket.WebSockets import io.rsocket.kotlin.ConnectionAcceptor @@ -21,7 +18,6 @@ import io.rsocket.kotlin.payload.data import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.flow.* import kotlinx.html.* -import kotlinx.serialization.decodeFromString import kotlinx.serialization.encodeToString import ru.mipt.npm.magix.api.MagixEndpoint.Companion.magixJson import ru.mipt.npm.magix.api.MagixMessage @@ -30,27 +26,36 @@ import ru.mipt.npm.magix.api.filter import java.util.* -internal fun CoroutineScope.magixAcceptor(magixFlow: MutableSharedFlow) = ConnectionAcceptor { - RSocketRequestHandler { +internal fun CoroutineScope.magixAcceptor( + magixFlow: MutableSharedFlow, +): ConnectionAcceptor = ConnectionAcceptor { + RSocketRequestHandler(coroutineContext) { //handler for request/stream requestStream { request: Payload -> val filter = magixJson.decodeFromString(MagixMessageFilter.serializer(), request.data.readText()) magixFlow.filter(filter).map { message -> - val string = magixJson.encodeToString(message) + val string = magixJson.encodeToString(MagixMessage.serializer(), message) buildPayload { data(string) } } } + //single send fireAndForget { request: Payload -> - val message = magixJson.decodeFromString(request.data.readText()) + val message = magixJson.decodeFromString(MagixMessage.serializer(), request.data.readText()) magixFlow.emit(message) } // bi-directional connection requestChannel { request: Payload, input: Flow -> input.onEach { - magixFlow.emit(magixJson.decodeFromString(it.data.readText())) - }.launchIn(this@magixAcceptor) + magixFlow.emit(magixJson.decodeFromString(MagixMessage.serializer(), it.data.readText())) + }.launchIn(this) - val filter = magixJson.decodeFromString(MagixMessageFilter.serializer(), request.data.readText()) + val filterText = request.data.readText() + + val filter = if(filterText.isNotBlank()){ + magixJson.decodeFromString(MagixMessageFilter.serializer(), filterText) + } else { + MagixMessageFilter() + } magixFlow.filter(filter).map { message -> val string = magixJson.encodeToString(message) @@ -144,7 +149,7 @@ public fun Application.magixModule(magixFlow: MutableSharedFlow, r magixFlow.emit(message) } //rSocket server. Filter from Payload - rSocket("rsocket", acceptor = this@magixModule.magixAcceptor(magixFlow)) + rSocket("rsocket", acceptor = application.magixAcceptor(magixFlow)) } } } diff --git a/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/server.kt b/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/server.kt index dd4091b..539f439 100644 --- a/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/server.kt +++ b/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/server.kt @@ -9,6 +9,7 @@ import io.rsocket.kotlin.transport.ktor.tcp.TcpServer import io.rsocket.kotlin.transport.ktor.tcp.TcpServerTransport import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Job +import kotlinx.coroutines.channels.BufferOverflow import kotlinx.coroutines.flow.MutableSharedFlow import org.slf4j.LoggerFactory import ru.mipt.npm.magix.api.MagixEndpoint @@ -39,15 +40,16 @@ public fun CoroutineScope.launchMagixServerRawRSocket( */ public fun CoroutineScope.startMagixServer( port: Int = DEFAULT_MAGIX_HTTP_PORT, - buffer: Int = 100, + buffer: Int = 1000, enableRawRSocket: Boolean = true, enableZmq: Boolean = true, applicationConfiguration: Application.(MutableSharedFlow) -> Unit = {}, ): ApplicationEngine { val logger = LoggerFactory.getLogger("magix-server") val magixFlow = MutableSharedFlow( - buffer, - extraBufferCapacity = buffer + replay = buffer, + extraBufferCapacity = buffer, + onBufferOverflow = BufferOverflow.DROP_OLDEST ) if (enableRawRSocket) { diff --git a/controls-mongo/build.gradle.kts b/magix/magix-storage/magix-storage-mongo/build.gradle.kts similarity index 100% rename from controls-mongo/build.gradle.kts rename to magix/magix-storage/magix-storage-mongo/build.gradle.kts diff --git a/controls-mongo/src/main/kotlin/ru/mipt/npm/controls/mongo/MongoEventStorage.kt b/magix/magix-storage/magix-storage-mongo/src/main/kotlin/ru/mipt/npm/controls/mongo/MongoEventStorage.kt similarity index 100% rename from controls-mongo/src/main/kotlin/ru/mipt/npm/controls/mongo/MongoEventStorage.kt rename to magix/magix-storage/magix-storage-mongo/src/main/kotlin/ru/mipt/npm/controls/mongo/MongoEventStorage.kt diff --git a/magix/magix-zmq/src/main/kotlin/ru/mipt/npm/magix/zmq/ZmqMagixEndpoint.kt b/magix/magix-zmq/src/main/kotlin/ru/mipt/npm/magix/zmq/ZmqMagixEndpoint.kt index 56846e9..b621a69 100644 --- a/magix/magix-zmq/src/main/kotlin/ru/mipt/npm/magix/zmq/ZmqMagixEndpoint.kt +++ b/magix/magix-zmq/src/main/kotlin/ru/mipt/npm/magix/zmq/ZmqMagixEndpoint.kt @@ -4,8 +4,6 @@ import kotlinx.coroutines.* import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.channelFlow import kotlinx.coroutines.flow.flowOn -import kotlinx.serialization.decodeFromString -import kotlinx.serialization.encodeToString import org.zeromq.SocketType import org.zeromq.ZContext import org.zeromq.ZMQ @@ -19,6 +17,7 @@ import kotlin.coroutines.coroutineContext public class ZmqMagixEndpoint( private val host: String, + private val protocol: String, private val pubPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PUB_PORT, private val pullPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PULL_PORT, private val coroutineContext: CoroutineContext = Dispatchers.IO, @@ -28,7 +27,7 @@ public class ZmqMagixEndpoint( @OptIn(ExperimentalCoroutinesApi::class) override fun subscribe(filter: MagixMessageFilter): Flow { val socket = zmqContext.createSocket(SocketType.SUB) - socket.connect("$host:$pubPort") + socket.connect("$protocol://$host:$pubPort") socket.subscribe("") return channelFlow { @@ -40,7 +39,7 @@ public class ZmqMagixEndpoint( //This is a blocking call. val string: String? = socket.recvStr() if (string != null) { - val message = MagixEndpoint.magixJson.decodeFromString(string) + val message = MagixEndpoint.magixJson.decodeFromString(MagixMessage.serializer(), string) send(message) } } catch (t: Throwable) { @@ -58,12 +57,12 @@ public class ZmqMagixEndpoint( private val publishSocket by lazy { zmqContext.createSocket(SocketType.PUSH).apply { - connect("$host:$pullPort") + connect("$protocol://$host:$pullPort") } } override suspend fun broadcast(message: MagixMessage): Unit = withContext(coroutineContext) { - val string = MagixEndpoint.magixJson.encodeToString(message) + val string = MagixEndpoint.magixJson.encodeToString(MagixMessage.serializer(), message) publishSocket.send(string) } @@ -72,12 +71,14 @@ public class ZmqMagixEndpoint( } } -public suspend fun MagixEndpoint.Companion.zmq( +public suspend fun MagixEndpoint.Companion.zmq( host: String, + protocol: String = "tcp", pubPort: Int = DEFAULT_MAGIX_ZMQ_PUB_PORT, pullPort: Int = DEFAULT_MAGIX_ZMQ_PULL_PORT, ): ZmqMagixEndpoint = ZmqMagixEndpoint( host, + protocol, pubPort, pullPort, coroutineContext = coroutineContext diff --git a/settings.gradle.kts b/settings.gradle.kts index 40e069f..e587c96 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -57,7 +57,8 @@ include( // ":magix:magix-storage", ":magix:magix-storage:magix-storage-xodus", ":controls-magix-client", - ":motors", ":demo:all-things", ":demo:car", + ":demo:motors", + ":demo:echo" )