From 2a386568f98c067d67931eb6b350e7258cbcc317 Mon Sep 17 00:00:00 2001 From: Alexander Nozik Date: Sat, 25 Feb 2023 17:45:09 +0300 Subject: [PATCH] Refactor magix server flow plugins --- .../controls/server/deviceWebServer.kt | 35 +++++---- .../controls/demo/DemoControllerView.kt | 4 +- .../controls/demo/car/VirtualCarController.kt | 4 +- .../space/kscience/controls/demo/echo/main.kt | 10 ++- demo/magix-demo/src/main/kotlin/zmq.kt | 11 ++- .../kscience/magix/api/MagixFlowPlugin.kt | 9 +++ .../kscience/magix/server/RSocketMagix.kt | 74 +++++++++++++++++++ .../space/kscience/magix/server/ZmqMagix.kt | 53 +++++++++++++ .../kscience/magix/server/magixModule.kt | 52 +------------ .../space/kscience/magix/server/server.kt | 56 ++------------ .../magix/server/zmqMagixServerSocket.kt | 48 ------------ 11 files changed, 180 insertions(+), 176 deletions(-) create mode 100644 magix/magix-api/src/commonMain/kotlin/space/kscience/magix/api/MagixFlowPlugin.kt create mode 100644 magix/magix-server/src/main/kotlin/space/kscience/magix/server/RSocketMagix.kt create mode 100644 magix/magix-server/src/main/kotlin/space/kscience/magix/server/ZmqMagix.kt delete mode 100644 magix/magix-server/src/main/kotlin/space/kscience/magix/server/zmqMagixServerSocket.kt diff --git a/controls-server/src/main/kotlin/space/kscience/controls/server/deviceWebServer.kt b/controls-server/src/main/kotlin/space/kscience/controls/server/deviceWebServer.kt index 434aaa0..596758c 100644 --- a/controls-server/src/main/kotlin/space/kscience/controls/server/deviceWebServer.kt +++ b/controls-server/src/main/kotlin/space/kscience/controls/server/deviceWebServer.kt @@ -36,8 +36,8 @@ import space.kscience.dataforge.meta.toMeta import space.kscience.dataforge.names.Name import space.kscience.dataforge.names.asName import space.kscience.magix.api.MagixEndpoint +import space.kscience.magix.api.MagixFlowPlugin import space.kscience.magix.api.MagixMessage -import space.kscience.magix.server.launchMagixServerRawRSocket import space.kscience.magix.server.magixModule /** @@ -47,26 +47,23 @@ public fun CoroutineScope.startDeviceServer( manager: DeviceManager, port: Int = MagixEndpoint.DEFAULT_MAGIX_HTTP_PORT, host: String = "localhost", -): ApplicationEngine { - - return this.embeddedServer(CIO, port, host) { - install(WebSockets) +): ApplicationEngine = embeddedServer(CIO, port, host) { + install(WebSockets) // install(CORS) { // anyHost() // } - install(StatusPages) { - exception { call, cause -> - call.respond(HttpStatusCode.BadRequest, cause.message ?: "") - } + install(StatusPages) { + exception { call, cause -> + call.respond(HttpStatusCode.BadRequest, cause.message ?: "") } - deviceManagerModule(manager) - routing { - get("/") { - call.respondRedirect("/dashboard") - } + } + deviceManagerModule(manager) + routing { + get("/") { + call.respondRedirect("/dashboard") } - }.start() -} + } +}.start() public fun ApplicationEngine.whenStarted(callback: Application.() -> Unit) { environment.monitor.subscribe(ApplicationStarted, callback) @@ -77,9 +74,9 @@ public val WEB_SERVER_TARGET: Name = "@webServer".asName() public fun Application.deviceManagerModule( manager: DeviceManager, + vararg plugins: MagixFlowPlugin, deviceNames: Collection = manager.devices.keys.map { it.toString() }, route: String = "/", - rawSocketPort: Int = MagixEndpoint.DEFAULT_MAGIX_RAW_PORT, buffer: Int = 100, ) { if (pluginOrNull(WebSockets) == null) { @@ -217,6 +214,8 @@ public fun Application.deviceManagerModule( extraBufferCapacity = buffer ) - launchMagixServerRawRSocket(magixFlow, rawSocketPort) + plugins.forEach { + it.start(this, magixFlow) + } magixModule(magixFlow) } \ No newline at end of file diff --git a/demo/all-things/src/main/kotlin/space/kscience/controls/demo/DemoControllerView.kt b/demo/all-things/src/main/kotlin/space/kscience/controls/demo/DemoControllerView.kt index cefe834..d2cd259 100644 --- a/demo/all-things/src/main/kotlin/space/kscience/controls/demo/DemoControllerView.kt +++ b/demo/all-things/src/main/kotlin/space/kscience/controls/demo/DemoControllerView.kt @@ -21,6 +21,8 @@ import space.kscience.dataforge.context.* import space.kscience.magix.api.MagixEndpoint import space.kscience.magix.rsocket.rSocketWithTcp import space.kscience.magix.rsocket.rSocketWithWebSockets +import space.kscience.magix.server.RSocketMagix +import space.kscience.magix.server.ZmqMagix import space.kscience.magix.server.startMagixServer import tornadofx.* import java.awt.Desktop @@ -49,7 +51,7 @@ class DemoController : Controller(), ContextAware { context.launch { device = deviceManager.install("demo", DemoDevice) //starting magix event loop - magixServer = startMagixServer(enableRawRSocket = true, enableZmq = true) + magixServer = startMagixServer(RSocketMagix(), ZmqMagix()) //Launch device client and connect it to the server val deviceEndpoint = MagixEndpoint.rSocketWithTcp("localhost") deviceManager.connectToMagix(deviceEndpoint) diff --git a/demo/car/src/main/kotlin/space/kscience/controls/demo/car/VirtualCarController.kt b/demo/car/src/main/kotlin/space/kscience/controls/demo/car/VirtualCarController.kt index 5e6651e..fc78bee 100644 --- a/demo/car/src/main/kotlin/space/kscience/controls/demo/car/VirtualCarController.kt +++ b/demo/car/src/main/kotlin/space/kscience/controls/demo/car/VirtualCarController.kt @@ -18,6 +18,8 @@ import space.kscience.dataforge.context.* import space.kscience.dataforge.meta.Meta import space.kscience.magix.api.MagixEndpoint import space.kscience.magix.rsocket.rSocketWithTcp +import space.kscience.magix.server.RSocketMagix +import space.kscience.magix.server.ZmqMagix import space.kscience.magix.server.startMagixServer import space.kscience.magix.storage.xodus.storeInXodus import tornadofx.* @@ -47,7 +49,7 @@ class VirtualCarController : Controller(), ContextAware { virtualCar = deviceManager.install("virtual-car", VirtualCar) //starting magix event loop and connect it to entity store - magixServer = startMagixServer(enableRawRSocket = true, enableZmq = true) + magixServer = startMagixServer(RSocketMagix(), ZmqMagix()) storageEndpoint = MagixEndpoint.rSocketWithTcp("localhost").apply { storeInXodus(this@launch, magixEntityStorePath) diff --git a/demo/echo/src/main/kotlin/space/kscience/controls/demo/echo/main.kt b/demo/echo/src/main/kotlin/space/kscience/controls/demo/echo/main.kt index a02c044..3dbb43d 100644 --- a/demo/echo/src/main/kotlin/space/kscience/controls/demo/echo/main.kt +++ b/demo/echo/src/main/kotlin/space/kscience/controls/demo/echo/main.kt @@ -1,11 +1,12 @@ package space.kscience.controls.demo.echo -import io.ktor.server.application.log import kotlinx.coroutines.* import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach import kotlinx.serialization.json.JsonObject +import org.slf4j.LoggerFactory import space.kscience.magix.api.MagixEndpoint +import space.kscience.magix.api.MagixFlowPlugin import space.kscience.magix.api.MagixMessage import space.kscience.magix.api.MagixMessageFilter import space.kscience.magix.rsocket.rSocketStreamWithWebSockets @@ -59,16 +60,17 @@ private suspend fun MagixEndpoint.collectEcho(scope: CoroutineScope, n: Int) { @OptIn(ExperimentalTime::class) suspend fun main(): Unit = coroutineScope { launch(Dispatchers.Default) { - val server = startMagixServer(enableRawRSocket = false, enableZmq = false) { flow -> + val server = startMagixServer(MagixFlowPlugin { _, flow -> + val logger = LoggerFactory.getLogger("echo") //echo each message flow.onEach { message -> if (message.parentId == null) { val m = message.copy(origin = "loop", parentId = message.id, id = message.id + ".response") - log.info("echo: $m") + logger.info(m.toString()) flow.emit(m) } }.launchIn(this) - } + }) val responseTime = measureTime { diff --git a/demo/magix-demo/src/main/kotlin/zmq.kt b/demo/magix-demo/src/main/kotlin/zmq.kt index 6a7e01d..fbb2ad6 100644 --- a/demo/magix-demo/src/main/kotlin/zmq.kt +++ b/demo/magix-demo/src/main/kotlin/zmq.kt @@ -9,6 +9,8 @@ import kotlinx.serialization.json.* import org.slf4j.LoggerFactory import space.kscience.magix.api.MagixEndpoint import space.kscience.magix.api.MagixMessage +import space.kscience.magix.server.RSocketMagix +import space.kscience.magix.server.ZmqMagix import space.kscience.magix.server.startMagixServer import space.kscince.magix.zmq.ZmqMagixEndpoint import java.awt.Desktop @@ -22,7 +24,7 @@ suspend fun MagixEndpoint.sendJson( id: String? = null, parentId: String? = null, user: JsonElement? = null, - builder: JsonObjectBuilder.() -> Unit + builder: JsonObjectBuilder.() -> Unit, ): Unit = broadcast(MagixMessage(format, buildJsonObject(builder), origin, target, id, parentId, user)) internal const val numberOfMessages = 100 @@ -30,10 +32,7 @@ internal const val numberOfMessages = 100 suspend fun main(): Unit = coroutineScope { val logger = LoggerFactory.getLogger("magix-demo") logger.info("Starting magix server") - val server = startMagixServer( - buffer = 10, - enableRawRSocket = false //Disable rsocket to avoid kotlin 1.5 compatibility issue - ) + val server = startMagixServer(RSocketMagix(), ZmqMagix(), buffer = 10) server.apply { val host = "localhost"//environment.connectors.first().host @@ -44,7 +43,7 @@ suspend fun main(): Unit = coroutineScope { logger.info("Starting client") //Create zmq magix endpoint and wait for to finish - ZmqMagixEndpoint("localhost","tcp").use { client -> + ZmqMagixEndpoint("localhost", "tcp").use { client -> logger.info("Starting subscription") client.subscribe().onEach { println(it.payload) diff --git a/magix/magix-api/src/commonMain/kotlin/space/kscience/magix/api/MagixFlowPlugin.kt b/magix/magix-api/src/commonMain/kotlin/space/kscience/magix/api/MagixFlowPlugin.kt new file mode 100644 index 0000000..8cf9ccd --- /dev/null +++ b/magix/magix-api/src/commonMain/kotlin/space/kscience/magix/api/MagixFlowPlugin.kt @@ -0,0 +1,9 @@ +package space.kscience.magix.api + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Job +import kotlinx.coroutines.flow.MutableSharedFlow + +public fun interface MagixFlowPlugin { + public fun start(scope: CoroutineScope, magixFlow: MutableSharedFlow): Job +} \ No newline at end of file diff --git a/magix/magix-server/src/main/kotlin/space/kscience/magix/server/RSocketMagix.kt b/magix/magix-server/src/main/kotlin/space/kscience/magix/server/RSocketMagix.kt new file mode 100644 index 0000000..8deafdf --- /dev/null +++ b/magix/magix-server/src/main/kotlin/space/kscience/magix/server/RSocketMagix.kt @@ -0,0 +1,74 @@ +package space.kscience.magix.server + +import io.rsocket.kotlin.ConnectionAcceptor +import io.rsocket.kotlin.RSocketRequestHandler +import io.rsocket.kotlin.core.RSocketServer +import io.rsocket.kotlin.payload.Payload +import io.rsocket.kotlin.payload.buildPayload +import io.rsocket.kotlin.payload.data +import io.rsocket.kotlin.transport.ktor.tcp.TcpServer +import io.rsocket.kotlin.transport.ktor.tcp.TcpServerTransport +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Job +import kotlinx.coroutines.flow.* +import kotlinx.serialization.encodeToString +import space.kscience.magix.api.* +import space.kscience.magix.api.MagixEndpoint.Companion.DEFAULT_MAGIX_RAW_PORT + +/** + * Raw TCP magix server + */ +public class RSocketMagix(public val port: Int = DEFAULT_MAGIX_RAW_PORT): MagixFlowPlugin { + override fun start(scope: CoroutineScope, magixFlow: MutableSharedFlow): Job { + val tcpTransport = TcpServerTransport(port = port) + val rSocketJob: TcpServer = RSocketServer().bindIn(scope, tcpTransport, acceptor(scope, magixFlow)) + + scope.coroutineContext[Job]?.invokeOnCompletion { + rSocketJob.handlerJob.cancel() + } + + return rSocketJob.handlerJob + } + + public companion object{ + public fun acceptor( + coroutineScope: CoroutineScope, + magixFlow: MutableSharedFlow, + ): ConnectionAcceptor = ConnectionAcceptor { + RSocketRequestHandler(coroutineScope.coroutineContext) { + //handler for request/stream + requestStream { request: Payload -> + val filter = MagixEndpoint.magixJson.decodeFromString(MagixMessageFilter.serializer(), request.data.readText()) + magixFlow.filter(filter).map { message -> + val string = MagixEndpoint.magixJson.encodeToString(MagixMessage.serializer(), message) + buildPayload { data(string) } + } + } + //single send + fireAndForget { request: Payload -> + val message = MagixEndpoint.magixJson.decodeFromString(MagixMessage.serializer(), request.data.readText()) + magixFlow.emit(message) + } + // bi-directional connection + requestChannel { request: Payload, input: Flow -> + input.onEach { + magixFlow.emit(MagixEndpoint.magixJson.decodeFromString(MagixMessage.serializer(), it.data.readText())) + }.launchIn(this) + + val filterText = request.data.readText() + + val filter = if(filterText.isNotBlank()){ + MagixEndpoint.magixJson.decodeFromString(MagixMessageFilter.serializer(), filterText) + } else { + MagixMessageFilter() + } + + magixFlow.filter(filter).map { message -> + val string = MagixEndpoint.magixJson.encodeToString(message) + buildPayload { data(string) } + } + } + } + } + } +} \ No newline at end of file diff --git a/magix/magix-server/src/main/kotlin/space/kscience/magix/server/ZmqMagix.kt b/magix/magix-server/src/main/kotlin/space/kscience/magix/server/ZmqMagix.kt new file mode 100644 index 0000000..c2e54a0 --- /dev/null +++ b/magix/magix-server/src/main/kotlin/space/kscience/magix/server/ZmqMagix.kt @@ -0,0 +1,53 @@ +package space.kscience.magix.server + +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach +import kotlinx.serialization.decodeFromString +import kotlinx.serialization.encodeToString +import org.slf4j.LoggerFactory +import org.zeromq.SocketType +import org.zeromq.ZContext +import space.kscience.magix.api.MagixEndpoint +import space.kscience.magix.api.MagixFlowPlugin +import space.kscience.magix.api.MagixMessage + + +public class ZmqMagix( + public val localHost: String = "tcp://*", + public val zmqPubSocketPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PUB_PORT, + public val zmqPullSocketPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PULL_PORT, +) : MagixFlowPlugin { + override fun start(scope: CoroutineScope, magixFlow: MutableSharedFlow): Job = + scope.launch(Dispatchers.IO) { + val logger = LoggerFactory.getLogger("magix-server-zmq") + + ZContext().use { context -> + //launch publishing job + val pubSocket = context.createSocket(SocketType.PUB) + pubSocket.bind("$localHost:$zmqPubSocketPort") + magixFlow.onEach { message -> + val string = MagixEndpoint.magixJson.encodeToString(message) + pubSocket.send(string) + logger.debug("Published: $string") + }.launchIn(this) + + //launch pulling job + val pullSocket = context.createSocket(SocketType.PULL) + pullSocket.bind("$localHost:$zmqPullSocketPort") + pullSocket.receiveTimeOut = 500 + //suspending loop while pulling is active + while (isActive) { + val string: String? = pullSocket.recvStr() + if (string != null) { + logger.debug("Received: $string") + val message = MagixEndpoint.magixJson.decodeFromString(string) + magixFlow.emit(message) + } + } + } + } + + +} \ No newline at end of file diff --git a/magix/magix-server/src/main/kotlin/space/kscience/magix/server/magixModule.kt b/magix/magix-server/src/main/kotlin/space/kscience/magix/server/magixModule.kt index 50c5921..e9bedd3 100644 --- a/magix/magix-server/src/main/kotlin/space/kscience/magix/server/magixModule.kt +++ b/magix/magix-server/src/main/kotlin/space/kscience/magix/server/magixModule.kt @@ -8,15 +8,10 @@ import io.ktor.server.request.receive import io.ktor.server.routing.* import io.ktor.server.util.getValue import io.ktor.server.websocket.WebSockets -import io.rsocket.kotlin.ConnectionAcceptor -import io.rsocket.kotlin.RSocketRequestHandler import io.rsocket.kotlin.ktor.server.RSocketSupport import io.rsocket.kotlin.ktor.server.rSocket -import io.rsocket.kotlin.payload.Payload -import io.rsocket.kotlin.payload.buildPayload -import io.rsocket.kotlin.payload.data -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.flow.* +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.map import kotlinx.html.* import kotlinx.serialization.encodeToString import space.kscience.magix.api.MagixEndpoint.Companion.magixJson @@ -26,45 +21,6 @@ import space.kscience.magix.api.filter import java.util.* -internal fun CoroutineScope.magixAcceptor( - magixFlow: MutableSharedFlow, -): ConnectionAcceptor = ConnectionAcceptor { - RSocketRequestHandler(coroutineContext) { - //handler for request/stream - requestStream { request: Payload -> - val filter = magixJson.decodeFromString(MagixMessageFilter.serializer(), request.data.readText()) - magixFlow.filter(filter).map { message -> - val string = magixJson.encodeToString(MagixMessage.serializer(), message) - buildPayload { data(string) } - } - } - //single send - fireAndForget { request: Payload -> - val message = magixJson.decodeFromString(MagixMessage.serializer(), request.data.readText()) - magixFlow.emit(message) - } - // bi-directional connection - requestChannel { request: Payload, input: Flow -> - input.onEach { - magixFlow.emit(magixJson.decodeFromString(MagixMessage.serializer(), it.data.readText())) - }.launchIn(this) - - val filterText = request.data.readText() - - val filter = if(filterText.isNotBlank()){ - magixJson.decodeFromString(MagixMessageFilter.serializer(), filterText) - } else { - MagixMessageFilter() - } - - magixFlow.filter(filter).map { message -> - val string = magixJson.encodeToString(message) - buildPayload { data(string) } - } - } - } -} - /** * Create a message filter from call parameters */ @@ -84,7 +40,7 @@ private fun ApplicationCall.buildFilter(): MagixMessageFilter { } /** - * Attache magix http/sse and websocket-based rsocket event loop + statistics page to existing [MutableSharedFlow] + * Attach magix http/sse and websocket-based rsocket event loop + statistics page to existing [MutableSharedFlow] */ public fun Application.magixModule(magixFlow: MutableSharedFlow, route: String = "/") { if (pluginOrNull(WebSockets) == null) { @@ -149,7 +105,7 @@ public fun Application.magixModule(magixFlow: MutableSharedFlow, r magixFlow.emit(message) } //rSocket server. Filter from Payload - rSocket("rsocket", acceptor = application.magixAcceptor(magixFlow)) + rSocket("rsocket", acceptor = RSocketMagix.acceptor( application, magixFlow)) } } } diff --git a/magix/magix-server/src/main/kotlin/space/kscience/magix/server/server.kt b/magix/magix-server/src/main/kotlin/space/kscience/magix/server/server.kt index b37d5ef..2396e25 100644 --- a/magix/magix-server/src/main/kotlin/space/kscience/magix/server/server.kt +++ b/magix/magix-server/src/main/kotlin/space/kscience/magix/server/server.kt @@ -1,80 +1,36 @@ package space.kscience.magix.server -import io.ktor.server.application.Application import io.ktor.server.cio.CIO import io.ktor.server.engine.ApplicationEngine import io.ktor.server.engine.embeddedServer -import io.rsocket.kotlin.core.RSocketServer -import io.rsocket.kotlin.transport.ktor.tcp.TcpServer -import io.rsocket.kotlin.transport.ktor.tcp.TcpServerTransport import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Job import kotlinx.coroutines.channels.BufferOverflow import kotlinx.coroutines.flow.MutableSharedFlow -import org.slf4j.LoggerFactory -import space.kscience.magix.api.MagixEndpoint import space.kscience.magix.api.MagixEndpoint.Companion.DEFAULT_MAGIX_HTTP_PORT -import space.kscience.magix.api.MagixEndpoint.Companion.DEFAULT_MAGIX_RAW_PORT +import space.kscience.magix.api.MagixFlowPlugin import space.kscience.magix.api.MagixMessage -/** - * Raw TCP magix server - */ -public fun CoroutineScope.launchMagixServerRawRSocket( - magixFlow: MutableSharedFlow, - rawSocketPort: Int = DEFAULT_MAGIX_RAW_PORT, -): TcpServer { - val tcpTransport = TcpServerTransport(port = rawSocketPort) - val rSocketJob: TcpServer = RSocketServer().bindIn(this, tcpTransport, magixAcceptor(magixFlow)) - - coroutineContext[Job]?.invokeOnCompletion { - rSocketJob.handlerJob.cancel() - } - - return rSocketJob; -} /** * A combined RSocket/TCP/ZMQ server - * @param applicationConfiguration optional additional configuration for magix loop server */ public fun CoroutineScope.startMagixServer( + vararg plugins: MagixFlowPlugin, port: Int = DEFAULT_MAGIX_HTTP_PORT, buffer: Int = 1000, - enableRawRSocket: Boolean = true, - rawRSocketPort: Int = DEFAULT_MAGIX_RAW_PORT, - enableZmq: Boolean = true, - zmqPubSocketPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PUB_PORT, - zmqPullSocketPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PULL_PORT, - applicationConfiguration: Application.(MutableSharedFlow) -> Unit = {}, ): ApplicationEngine { - val logger = LoggerFactory.getLogger("magix-server") + val magixFlow = MutableSharedFlow( replay = buffer, extraBufferCapacity = buffer, onBufferOverflow = BufferOverflow.DROP_OLDEST ) - if (enableRawRSocket) { - //Start tcpRSocket server - logger.info("Starting magix raw rsocket server on port $rawRSocketPort") - launchMagixServerRawRSocket(magixFlow, rawRSocketPort) - } - if (enableZmq) { - //Start ZMQ server socket pair - logger.info("Starting magix zmq server on pub port $zmqPubSocketPort and pull port $zmqPullSocketPort") - launchMagixServerZmqSocket( - magixFlow, - zmqPubSocketPort = zmqPubSocketPort, - zmqPullSocketPort = zmqPullSocketPort - ) + plugins.forEach { + it.start(this, magixFlow) } - @Suppress("ExtractKtorModule") - return embeddedServer(CIO, host = "localhost", port = port) { - magixModule(magixFlow) - applicationConfiguration(magixFlow) - }.apply { + return embeddedServer(CIO, host = "localhost", port = port, module = { magixModule(magixFlow) }).apply { start() } } \ No newline at end of file diff --git a/magix/magix-server/src/main/kotlin/space/kscience/magix/server/zmqMagixServerSocket.kt b/magix/magix-server/src/main/kotlin/space/kscience/magix/server/zmqMagixServerSocket.kt deleted file mode 100644 index 18717e4..0000000 --- a/magix/magix-server/src/main/kotlin/space/kscience/magix/server/zmqMagixServerSocket.kt +++ /dev/null @@ -1,48 +0,0 @@ -package space.kscience.magix.server - -import kotlinx.coroutines.* -import kotlinx.coroutines.flow.MutableSharedFlow -import kotlinx.coroutines.flow.launchIn -import kotlinx.coroutines.flow.onEach -import kotlinx.serialization.decodeFromString -import kotlinx.serialization.encodeToString -import org.slf4j.LoggerFactory -import org.zeromq.SocketType -import org.zeromq.ZContext -import space.kscience.magix.api.MagixEndpoint -import space.kscience.magix.api.MagixMessage - -public fun CoroutineScope.launchMagixServerZmqSocket( - magixFlow: MutableSharedFlow, - localHost: String = "tcp://*", - zmqPubSocketPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PUB_PORT, - zmqPullSocketPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PULL_PORT, -): Job = launch(Dispatchers.IO) { - val logger = LoggerFactory.getLogger("magix-server-zmq") - - ZContext().use { context -> - //launch publishing job - val pubSocket = context.createSocket(SocketType.PUB) - pubSocket.bind("$localHost:$zmqPubSocketPort") - magixFlow.onEach { message -> - val string = MagixEndpoint.magixJson.encodeToString(message) - pubSocket.send(string) - logger.debug("Published: $string") - }.launchIn(this) - - //launch pulling job - val pullSocket = context.createSocket(SocketType.PULL) - pullSocket.bind("$localHost:$zmqPullSocketPort") - pullSocket.receiveTimeOut = 500 - //suspending loop while pulling is active - while (isActive) { - val string: String? = pullSocket.recvStr() - if (string != null) { - logger.debug("Received: $string") - val message = MagixEndpoint.magixJson.decodeFromString(string) - magixFlow.emit(message) - } - } - } -} -