Use blocking access to solve ZMQ performance issues.

This commit is contained in:
Alexander Nozik 2021-07-04 14:44:53 +03:00
parent 6e01e28015
commit d0f22eec93
5 changed files with 57 additions and 37 deletions

View File

@ -1,21 +1,21 @@
import kotlinx.coroutines.cancel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.isActive
import kotlinx.serialization.json.JsonElement
import kotlinx.serialization.json.JsonObjectBuilder
import kotlinx.serialization.json.buildJsonObject
import kotlinx.serialization.json.put
import kotlinx.serialization.json.*
import org.slf4j.LoggerFactory
import ru.mipt.npm.magix.api.MagixEndpoint
import ru.mipt.npm.magix.api.MagixMessage
import ru.mipt.npm.magix.server.startMagixServer
import ru.mipt.npm.magix.zmq.ZmqMagixEndpoint
import java.awt.Desktop
import java.net.URI
suspend fun MagixEndpoint<JsonElement>.sendJson(
suspend fun MagixEndpoint<JsonObject>.sendJson(
origin: String,
format: String = "json",
target: String? = null,
@ -25,33 +25,44 @@ suspend fun MagixEndpoint<JsonElement>.sendJson(
builder: JsonObjectBuilder.() -> Unit
): Unit = broadcast(MagixMessage(format, origin, buildJsonObject(builder), target, id, parentId, user))
internal const val numberOfMessages = 100
suspend fun main(): Unit = coroutineScope {
val logger = LoggerFactory.getLogger("magix-demo")
logger.info("Starting magix server")
val server = startMagixServer(enableRawRSocket = false)
logger.info("Waiting for server to start")
delay(2000)
val server = startMagixServer(
buffer = 10,
enableRawRSocket = false //Disable rsocket to avoid kotlin 1.5 compatibility issue
)
server.apply {
val host = "localhost"//environment.connectors.first().host
val port = environment.connectors.first().port
val uri = URI("http", null, host, port, "/state", null, null)
Desktop.getDesktop().browse(uri)
}
logger.info("Starting client")
ZmqMagixEndpoint("tcp://localhost", JsonElement.serializer()).use { client ->
//Create zmq magix endpoint and wait for to finish
ZmqMagixEndpoint("tcp://localhost", JsonObject.serializer()).use { client ->
logger.info("Starting subscription")
try {
client.subscribe().onEach {
println(it.payload)
}.catch { it.printStackTrace() }.launchIn(this)
} catch (t: Throwable) {
t.printStackTrace()
throw t
}
client.subscribe().onEach {
println(it.payload)
if (it.payload["index"]?.jsonPrimitive?.int == numberOfMessages) {
logger.info("Index $numberOfMessages reached. Terminating")
cancel()
}
}.catch { it.printStackTrace() }.launchIn(this)
var counter = 0
while (isActive) {
delay(500)
logger.info("Sending message number ${counter + 1}")
client.sendJson("magix-demo") {
val index = (counter++).toString()
logger.info("Sending message number $index")
client.sendJson("magix-demo", id = index) {
put("message", "Hello world!")
put("index", counter++)
put("index", index)
}
}

View File

@ -84,6 +84,9 @@ private fun ApplicationCall.buildFilter(): MagixMessageFilter {
)
}
/**
* Attache magix http/sse and websocket-based rsocket event loop + statistics page to existing [MutableSharedFlow]
*/
public fun Application.magixModule(magixFlow: MutableSharedFlow<GenericMagixMessage>, route: String = "/") {
if (featureOrNull(WebSockets) == null) {
install(WebSockets)
@ -107,11 +110,7 @@ public fun Application.magixModule(magixFlow: MutableSharedFlow<GenericMagixMess
routing {
route(route) {
post {
val message = call.receive<GenericMagixMessage>()
magixFlow.emit(message)
}
get("loop-state") {
get("state") {
call.respondHtml {
body {
h1 { +"Magix loop statistics" }
@ -140,12 +139,19 @@ public fun Application.magixModule(magixFlow: MutableSharedFlow<GenericMagixMess
}
call.respondSse(sseFlow)
}
post("broadcast") {
val message = call.receive<GenericMagixMessage>()
magixFlow.emit(message)
}
//rSocket server. Filter from Payload
rSocket("rsocket", acceptor = magixAcceptor(magixFlow))
}
}
}
/**
* Create a new loop [MutableSharedFlow] with given [buffer] and setup magix module based on it
*/
public fun Application.magixModule(route: String = "/", buffer: Int = 100) {
val magixFlow = MutableSharedFlow<GenericMagixMessage>(buffer)
magixModule(magixFlow, route)

View File

@ -57,10 +57,16 @@ public fun CoroutineScope.startMagixServer(
val zmqPubSocketPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PUB_PORT
val zmqPullSocketPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PULL_PORT
logger.info("Starting magix zmq server on pub port $zmqPubSocketPort and pull port $zmqPullSocketPort")
launchMagixServerZmqSocket(magixFlow, zmqPubSocketPort = zmqPubSocketPort, zmqPullSocketPort = zmqPullSocketPort)
launchMagixServerZmqSocket(
magixFlow,
zmqPubSocketPort = zmqPubSocketPort,
zmqPullSocketPort = zmqPullSocketPort
)
}
return embeddedServer(CIO, port = port) {
return embeddedServer(CIO, host = "localhost", port = port) {
magixModule(magixFlow)
}.apply {
start()
}
}

View File

@ -1,12 +1,9 @@
package ru.mipt.npm.magix.server
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import org.slf4j.LoggerFactory
import org.zeromq.SocketType
import org.zeromq.ZContext
@ -17,7 +14,7 @@ public fun CoroutineScope.launchMagixServerZmqSocket(
localHost: String = "tcp://*",
zmqPubSocketPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PUB_PORT,
zmqPullSocketPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PULL_PORT,
): Job = launch {
): Job = launch(Dispatchers.IO) {
val logger = LoggerFactory.getLogger("magix-server-zmq")
ZContext().use { context ->
@ -33,11 +30,10 @@ public fun CoroutineScope.launchMagixServerZmqSocket(
//launch pulling job
val pullSocket = context.createSocket(SocketType.PULL)
pullSocket.bind("$localHost:$zmqPullSocketPort")
pullSocket.receiveTimeOut = 500
//suspending loop while pulling is active
while (isActive) {
//This is a blocking call.
val string: String? = pullSocket.recvStr(zmq.ZMQ.ZMQ_DONTWAIT)
val string: String? = pullSocket.recvStr()
if (string != null) {
logger.debug("Received: $string")
val message = MagixEndpoint.magixJson.decodeFromString(genericMessageSerializer, string)
@ -45,4 +41,5 @@ public fun CoroutineScope.launchMagixServerZmqSocket(
}
}
}
}
}

View File

@ -43,7 +43,7 @@ public class ZmqMagixEndpoint<T>(
while (activeFlag) {
try {
//This is a blocking call.
val string: String? = socket.recvStr(zmq.ZMQ.ZMQ_DONTWAIT)
val string: String? = socket.recvStr()
if (string != null) {
val message = MagixEndpoint.magixJson.decodeFromString(serializer, string)
send(message)