diff --git a/magix/magix-demo/src/main/kotlin/zmq.kt b/magix/magix-demo/src/main/kotlin/zmq.kt index 1b76127..2a73640 100644 --- a/magix/magix-demo/src/main/kotlin/zmq.kt +++ b/magix/magix-demo/src/main/kotlin/zmq.kt @@ -1,21 +1,21 @@ +import kotlinx.coroutines.cancel import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay import kotlinx.coroutines.flow.catch import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.isActive -import kotlinx.serialization.json.JsonElement -import kotlinx.serialization.json.JsonObjectBuilder -import kotlinx.serialization.json.buildJsonObject -import kotlinx.serialization.json.put +import kotlinx.serialization.json.* import org.slf4j.LoggerFactory import ru.mipt.npm.magix.api.MagixEndpoint import ru.mipt.npm.magix.api.MagixMessage import ru.mipt.npm.magix.server.startMagixServer import ru.mipt.npm.magix.zmq.ZmqMagixEndpoint +import java.awt.Desktop +import java.net.URI -suspend fun MagixEndpoint.sendJson( +suspend fun MagixEndpoint.sendJson( origin: String, format: String = "json", target: String? = null, @@ -25,33 +25,44 @@ suspend fun MagixEndpoint.sendJson( builder: JsonObjectBuilder.() -> Unit ): Unit = broadcast(MagixMessage(format, origin, buildJsonObject(builder), target, id, parentId, user)) +internal const val numberOfMessages = 100 + suspend fun main(): Unit = coroutineScope { val logger = LoggerFactory.getLogger("magix-demo") logger.info("Starting magix server") - val server = startMagixServer(enableRawRSocket = false) - logger.info("Waiting for server to start") - delay(2000) + val server = startMagixServer( + buffer = 10, + enableRawRSocket = false //Disable rsocket to avoid kotlin 1.5 compatibility issue + ) + + server.apply { + val host = "localhost"//environment.connectors.first().host + val port = environment.connectors.first().port + val uri = URI("http", null, host, port, "/state", null, null) + Desktop.getDesktop().browse(uri) + } logger.info("Starting client") - ZmqMagixEndpoint("tcp://localhost", JsonElement.serializer()).use { client -> + //Create zmq magix endpoint and wait for to finish + ZmqMagixEndpoint("tcp://localhost", JsonObject.serializer()).use { client -> logger.info("Starting subscription") - try { - client.subscribe().onEach { - println(it.payload) - }.catch { it.printStackTrace() }.launchIn(this) - } catch (t: Throwable) { - t.printStackTrace() - throw t - } + client.subscribe().onEach { + println(it.payload) + if (it.payload["index"]?.jsonPrimitive?.int == numberOfMessages) { + logger.info("Index $numberOfMessages reached. Terminating") + cancel() + } + }.catch { it.printStackTrace() }.launchIn(this) var counter = 0 while (isActive) { delay(500) - logger.info("Sending message number ${counter + 1}") - client.sendJson("magix-demo") { + val index = (counter++).toString() + logger.info("Sending message number $index") + client.sendJson("magix-demo", id = index) { put("message", "Hello world!") - put("index", counter++) + put("index", index) } } diff --git a/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/magixModule.kt b/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/magixModule.kt index 398b89a..67d44c0 100644 --- a/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/magixModule.kt +++ b/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/magixModule.kt @@ -84,6 +84,9 @@ private fun ApplicationCall.buildFilter(): MagixMessageFilter { ) } +/** + * Attache magix http/sse and websocket-based rsocket event loop + statistics page to existing [MutableSharedFlow] + */ public fun Application.magixModule(magixFlow: MutableSharedFlow, route: String = "/") { if (featureOrNull(WebSockets) == null) { install(WebSockets) @@ -107,11 +110,7 @@ public fun Application.magixModule(magixFlow: MutableSharedFlow() - magixFlow.emit(message) - } - get("loop-state") { + get("state") { call.respondHtml { body { h1 { +"Magix loop statistics" } @@ -140,12 +139,19 @@ public fun Application.magixModule(magixFlow: MutableSharedFlow() + magixFlow.emit(message) + } //rSocket server. Filter from Payload rSocket("rsocket", acceptor = magixAcceptor(magixFlow)) } } } +/** + * Create a new loop [MutableSharedFlow] with given [buffer] and setup magix module based on it + */ public fun Application.magixModule(route: String = "/", buffer: Int = 100) { val magixFlow = MutableSharedFlow(buffer) magixModule(magixFlow, route) diff --git a/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/server.kt b/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/server.kt index 848b504..cf0d98a 100644 --- a/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/server.kt +++ b/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/server.kt @@ -57,10 +57,16 @@ public fun CoroutineScope.startMagixServer( val zmqPubSocketPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PUB_PORT val zmqPullSocketPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PULL_PORT logger.info("Starting magix zmq server on pub port $zmqPubSocketPort and pull port $zmqPullSocketPort") - launchMagixServerZmqSocket(magixFlow, zmqPubSocketPort = zmqPubSocketPort, zmqPullSocketPort = zmqPullSocketPort) + launchMagixServerZmqSocket( + magixFlow, + zmqPubSocketPort = zmqPubSocketPort, + zmqPullSocketPort = zmqPullSocketPort + ) } - return embeddedServer(CIO, port = port) { + return embeddedServer(CIO, host = "localhost", port = port) { magixModule(magixFlow) + }.apply { + start() } } \ No newline at end of file diff --git a/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/zmqMagixServerSocket.kt b/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/zmqMagixServerSocket.kt index f845775..e62acd7 100644 --- a/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/zmqMagixServerSocket.kt +++ b/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/zmqMagixServerSocket.kt @@ -1,12 +1,9 @@ package ru.mipt.npm.magix.server -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Job +import kotlinx.coroutines.* import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach -import kotlinx.coroutines.isActive -import kotlinx.coroutines.launch import org.slf4j.LoggerFactory import org.zeromq.SocketType import org.zeromq.ZContext @@ -17,7 +14,7 @@ public fun CoroutineScope.launchMagixServerZmqSocket( localHost: String = "tcp://*", zmqPubSocketPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PUB_PORT, zmqPullSocketPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PULL_PORT, -): Job = launch { +): Job = launch(Dispatchers.IO) { val logger = LoggerFactory.getLogger("magix-server-zmq") ZContext().use { context -> @@ -33,11 +30,10 @@ public fun CoroutineScope.launchMagixServerZmqSocket( //launch pulling job val pullSocket = context.createSocket(SocketType.PULL) pullSocket.bind("$localHost:$zmqPullSocketPort") + pullSocket.receiveTimeOut = 500 //suspending loop while pulling is active - while (isActive) { - //This is a blocking call. - val string: String? = pullSocket.recvStr(zmq.ZMQ.ZMQ_DONTWAIT) + val string: String? = pullSocket.recvStr() if (string != null) { logger.debug("Received: $string") val message = MagixEndpoint.magixJson.decodeFromString(genericMessageSerializer, string) @@ -45,4 +41,5 @@ public fun CoroutineScope.launchMagixServerZmqSocket( } } } -} \ No newline at end of file +} + diff --git a/magix/magix-zmq/src/main/kotlin/ru/mipt/npm/magix/zmq/ZmqMagixEndpoint.kt b/magix/magix-zmq/src/main/kotlin/ru/mipt/npm/magix/zmq/ZmqMagixEndpoint.kt index 4080d95..1711a25 100644 --- a/magix/magix-zmq/src/main/kotlin/ru/mipt/npm/magix/zmq/ZmqMagixEndpoint.kt +++ b/magix/magix-zmq/src/main/kotlin/ru/mipt/npm/magix/zmq/ZmqMagixEndpoint.kt @@ -43,7 +43,7 @@ public class ZmqMagixEndpoint( while (activeFlag) { try { //This is a blocking call. - val string: String? = socket.recvStr(zmq.ZMQ.ZMQ_DONTWAIT) + val string: String? = socket.recvStr() if (string != null) { val message = MagixEndpoint.magixJson.decodeFromString(serializer, string) send(message)