From 6e01e28015aa5440e135b0ff5675b1c7d1c7d6a7 Mon Sep 17 00:00:00 2001 From: Alexander Nozik Date: Sat, 3 Jul 2021 14:16:31 +0300 Subject: [PATCH] ZMQ fully functional --- .../npm/controls/server/deviceWebServer.kt | 4 +- magix/magix-demo/build.gradle.kts | 15 +++++ magix/magix-demo/src/main/kotlin/zmq.kt | 59 ++++++++++++++++++ magix/magix-server/build.gradle.kts | 14 ++--- .../ru/mipt/npm/magix/server/magixModule.kt | 4 +- .../kotlin/ru/mipt/npm/magix/server/server.kt | 61 +++++++------------ .../npm/magix/server/zmqMagixServerSocket.kt | 48 +++++++++++++++ .../ru/mipt/npm/magix/zmq/ZmqMagixEndpoint.kt | 22 ++++--- settings.gradle.kts | 3 +- 9 files changed, 168 insertions(+), 62 deletions(-) create mode 100644 magix/magix-demo/build.gradle.kts create mode 100644 magix/magix-demo/src/main/kotlin/zmq.kt create mode 100644 magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/zmqMagixServerSocket.kt diff --git a/controls-server/src/main/kotlin/ru/mipt/npm/controls/server/deviceWebServer.kt b/controls-server/src/main/kotlin/ru/mipt/npm/controls/server/deviceWebServer.kt index 01e8faf..00d27ba 100644 --- a/controls-server/src/main/kotlin/ru/mipt/npm/controls/server/deviceWebServer.kt +++ b/controls-server/src/main/kotlin/ru/mipt/npm/controls/server/deviceWebServer.kt @@ -32,8 +32,8 @@ import ru.mipt.npm.controls.controllers.DeviceManager import ru.mipt.npm.controls.controllers.respondMessage import ru.mipt.npm.magix.api.MagixEndpoint import ru.mipt.npm.magix.server.GenericMagixMessage +import ru.mipt.npm.magix.server.launchMagixServerRawRSocket import ru.mipt.npm.magix.server.magixModule -import ru.mipt.npm.magix.server.rawMagixServerSocket import space.kscience.dataforge.meta.toJson import space.kscience.dataforge.meta.toMetaItem @@ -204,6 +204,6 @@ public fun Application.deviceManagerModule( extraBufferCapacity = buffer ) - rawMagixServerSocket(magixFlow, rawSocketPort) + launchMagixServerRawRSocket(magixFlow, rawSocketPort) magixModule(magixFlow) } \ No newline at end of file diff --git a/magix/magix-demo/build.gradle.kts b/magix/magix-demo/build.gradle.kts new file mode 100644 index 0000000..71f0888 --- /dev/null +++ b/magix/magix-demo/build.gradle.kts @@ -0,0 +1,15 @@ +plugins { + id("ru.mipt.npm.gradle.jvm") +} + + +dependencies{ + implementation(projects.magix.magixServer) + implementation(projects.magix.magixZmq) + implementation(projects.magix.magixRsocket) + implementation("ch.qos.logback:logback-classic:1.2.3") +} + +kotlin{ + explicitApi = null +} \ No newline at end of file diff --git a/magix/magix-demo/src/main/kotlin/zmq.kt b/magix/magix-demo/src/main/kotlin/zmq.kt new file mode 100644 index 0000000..1b76127 --- /dev/null +++ b/magix/magix-demo/src/main/kotlin/zmq.kt @@ -0,0 +1,59 @@ +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.catch +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.isActive +import kotlinx.serialization.json.JsonElement +import kotlinx.serialization.json.JsonObjectBuilder +import kotlinx.serialization.json.buildJsonObject +import kotlinx.serialization.json.put +import org.slf4j.LoggerFactory +import ru.mipt.npm.magix.api.MagixEndpoint +import ru.mipt.npm.magix.api.MagixMessage +import ru.mipt.npm.magix.server.startMagixServer +import ru.mipt.npm.magix.zmq.ZmqMagixEndpoint + + +suspend fun MagixEndpoint.sendJson( + origin: String, + format: String = "json", + target: String? = null, + id: String? = null, + parentId: String? = null, + user: JsonElement? = null, + builder: JsonObjectBuilder.() -> Unit +): Unit = broadcast(MagixMessage(format, origin, buildJsonObject(builder), target, id, parentId, user)) + +suspend fun main(): Unit = coroutineScope { + val logger = LoggerFactory.getLogger("magix-demo") + logger.info("Starting magix server") + val server = startMagixServer(enableRawRSocket = false) + logger.info("Waiting for server to start") + delay(2000) + + logger.info("Starting client") + ZmqMagixEndpoint("tcp://localhost", JsonElement.serializer()).use { client -> + logger.info("Starting subscription") + try { + client.subscribe().onEach { + println(it.payload) + }.catch { it.printStackTrace() }.launchIn(this) + } catch (t: Throwable) { + t.printStackTrace() + throw t + } + + + var counter = 0 + while (isActive) { + delay(500) + logger.info("Sending message number ${counter + 1}") + client.sendJson("magix-demo") { + put("message", "Hello world!") + put("index", counter++) + } + } + + } +} \ No newline at end of file diff --git a/magix/magix-server/build.gradle.kts b/magix/magix-server/build.gradle.kts index 63b394f..7f01ea8 100644 --- a/magix/magix-server/build.gradle.kts +++ b/magix/magix-server/build.gradle.kts @@ -20,13 +20,13 @@ val ktorVersion: String = ru.mipt.npm.gradle.KScienceVersions.ktorVersion dependencies{ api(project(":magix:magix-api")) - implementation("io.ktor:ktor-server-cio:$ktorVersion") - implementation("io.ktor:ktor-websockets:$ktorVersion") - implementation("io.ktor:ktor-serialization:$ktorVersion") - implementation("io.ktor:ktor-html-builder:$ktorVersion") + api("io.ktor:ktor-server-cio:$ktorVersion") + api("io.ktor:ktor-websockets:$ktorVersion") + api("io.ktor:ktor-serialization:$ktorVersion") + api("io.ktor:ktor-html-builder:$ktorVersion") - implementation("io.rsocket.kotlin:rsocket-core:$rsocketVersion") - implementation("io.rsocket.kotlin:rsocket-transport-ktor-server:$rsocketVersion") + api("io.rsocket.kotlin:rsocket-core:$rsocketVersion") + api("io.rsocket.kotlin:rsocket-transport-ktor-server:$rsocketVersion") - implementation("org.zeromq:jeromq:0.5.2") + api("org.zeromq:jeromq:0.5.2") } \ No newline at end of file diff --git a/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/magixModule.kt b/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/magixModule.kt index fa90d28..398b89a 100644 --- a/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/magixModule.kt +++ b/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/magixModule.kt @@ -111,10 +111,10 @@ public fun Application.magixModule(magixFlow: MutableSharedFlow() magixFlow.emit(message) } - get { + get("loop-state") { call.respondHtml { body { - h1 { +"Magix stream statistics" } + h1 { +"Magix loop statistics" } h2 { +"Number of subscribers: ${magixFlow.subscriptionCount.value}" } h3 { +"Replay cache size: ${magixFlow.replayCache.size}" } h3 { +"Replay cache:" } diff --git a/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/server.kt b/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/server.kt index a405493..848b504 100644 --- a/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/server.kt +++ b/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/server.kt @@ -7,22 +7,19 @@ import io.ktor.server.engine.ApplicationEngine import io.ktor.server.engine.embeddedServer import io.rsocket.kotlin.core.RSocketServer import io.rsocket.kotlin.transport.ktor.serverTransport -import kotlinx.coroutines.* +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.Job import kotlinx.coroutines.flow.MutableSharedFlow -import kotlinx.coroutines.flow.launchIn -import kotlinx.coroutines.flow.onEach -import org.zeromq.SocketType -import org.zeromq.ZContext +import org.slf4j.LoggerFactory import ru.mipt.npm.magix.api.MagixEndpoint import ru.mipt.npm.magix.api.MagixEndpoint.Companion.DEFAULT_MAGIX_HTTP_PORT import ru.mipt.npm.magix.api.MagixEndpoint.Companion.DEFAULT_MAGIX_RAW_PORT -import ru.mipt.npm.magix.api.MagixEndpoint.Companion.DEFAULT_MAGIX_ZMQ_PUB_PORT -import ru.mipt.npm.magix.api.MagixEndpoint.Companion.DEFAULT_MAGIX_ZMQ_PULL_PORT /** * Raw TCP magix server */ -public fun CoroutineScope.rawMagixServerSocket( +public fun CoroutineScope.launchMagixServerRawRSocket( magixFlow: MutableSharedFlow, rawSocketPort: Int = DEFAULT_MAGIX_RAW_PORT ): Job { @@ -34,50 +31,34 @@ public fun CoroutineScope.rawMagixServerSocket( return rSocketJob; } -public fun CoroutineScope.zmqMagixServerSocket( - magixFlow: MutableSharedFlow, - localHost: String = "tcp://*", - zmqPubSocketPort: Int = DEFAULT_MAGIX_ZMQ_PUB_PORT, - zmqPullSocketPort: Int = DEFAULT_MAGIX_ZMQ_PULL_PORT, -): Job = launch { - ZContext().use { context -> - //launch publishing job - val pubSocket = context.createSocket(SocketType.XPUB) - pubSocket.bind("$localHost:$zmqPubSocketPort") - magixFlow.onEach { message -> - pubSocket.send(MagixEndpoint.magixJson.encodeToString(genericMessageSerializer, message)) - }.launchIn(this) - - //launch pulling job - val pullSocket = context.createSocket(SocketType.PULL) - pubSocket.bind("$localHost:$zmqPullSocketPort") - launch(Dispatchers.IO) { - while (isActive) { - //This is a blocking call. - val string = pullSocket.recvStr() - val message = MagixEndpoint.magixJson.decodeFromString(genericMessageSerializer, string) - magixFlow.emit(message) - } - } - } -} - /** * A combined RSocket/TCP server */ public fun CoroutineScope.startMagixServer( port: Int = DEFAULT_MAGIX_HTTP_PORT, buffer: Int = 100, + enableRawRSocket: Boolean = true, + enableZmq: Boolean = true ): ApplicationEngine { - + val logger = LoggerFactory.getLogger("magix-server") val magixFlow = MutableSharedFlow( buffer, extraBufferCapacity = buffer ) - //start tcpRSocket server - rawMagixServerSocket(magixFlow) - zmqMagixServerSocket(magixFlow) + if (enableRawRSocket) { + //Start tcpRSocket server + val rawRSocketPort = DEFAULT_MAGIX_RAW_PORT + logger.info("Starting magix raw rsocket server on port $rawRSocketPort") + launchMagixServerRawRSocket(magixFlow, rawRSocketPort) + } + if (enableZmq) { + //Start ZMQ server socket pair + val zmqPubSocketPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PUB_PORT + val zmqPullSocketPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PULL_PORT + logger.info("Starting magix zmq server on pub port $zmqPubSocketPort and pull port $zmqPullSocketPort") + launchMagixServerZmqSocket(magixFlow, zmqPubSocketPort = zmqPubSocketPort, zmqPullSocketPort = zmqPullSocketPort) + } return embeddedServer(CIO, port = port) { magixModule(magixFlow) diff --git a/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/zmqMagixServerSocket.kt b/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/zmqMagixServerSocket.kt new file mode 100644 index 0000000..f845775 --- /dev/null +++ b/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/zmqMagixServerSocket.kt @@ -0,0 +1,48 @@ +package ru.mipt.npm.magix.server + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Job +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.isActive +import kotlinx.coroutines.launch +import org.slf4j.LoggerFactory +import org.zeromq.SocketType +import org.zeromq.ZContext +import ru.mipt.npm.magix.api.MagixEndpoint + +public fun CoroutineScope.launchMagixServerZmqSocket( + magixFlow: MutableSharedFlow, + localHost: String = "tcp://*", + zmqPubSocketPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PUB_PORT, + zmqPullSocketPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PULL_PORT, +): Job = launch { + val logger = LoggerFactory.getLogger("magix-server-zmq") + + ZContext().use { context -> + //launch publishing job + val pubSocket = context.createSocket(SocketType.PUB) + pubSocket.bind("$localHost:$zmqPubSocketPort") + magixFlow.onEach { message -> + val string = MagixEndpoint.magixJson.encodeToString(genericMessageSerializer, message) + pubSocket.send(string) + logger.debug("Published: $string") + }.launchIn(this) + + //launch pulling job + val pullSocket = context.createSocket(SocketType.PULL) + pullSocket.bind("$localHost:$zmqPullSocketPort") + //suspending loop while pulling is active + + while (isActive) { + //This is a blocking call. + val string: String? = pullSocket.recvStr(zmq.ZMQ.ZMQ_DONTWAIT) + if (string != null) { + logger.debug("Received: $string") + val message = MagixEndpoint.magixJson.decodeFromString(genericMessageSerializer, string) + magixFlow.emit(message) + } + } + } +} \ No newline at end of file diff --git a/magix/magix-zmq/src/main/kotlin/ru/mipt/npm/magix/zmq/ZmqMagixEndpoint.kt b/magix/magix-zmq/src/main/kotlin/ru/mipt/npm/magix/zmq/ZmqMagixEndpoint.kt index bccdfe0..4080d95 100644 --- a/magix/magix-zmq/src/main/kotlin/ru/mipt/npm/magix/zmq/ZmqMagixEndpoint.kt +++ b/magix/magix-zmq/src/main/kotlin/ru/mipt/npm/magix/zmq/ZmqMagixEndpoint.kt @@ -24,17 +24,15 @@ public class ZmqMagixEndpoint( private val pullPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PULL_PORT, private val coroutineContext: CoroutineContext = Dispatchers.IO ) : MagixEndpoint, AutoCloseable { - private val zmqContext = ZContext() + private val zmqContext by lazy { ZContext() } private val serializer = MagixMessage.serializer(payloadSerializer) @OptIn(ExperimentalCoroutinesApi::class) override fun subscribe(filter: MagixMessageFilter): Flow> { - val socket = zmqContext.createSocket(SocketType.XSUB) + val socket = zmqContext.createSocket(SocketType.SUB) socket.connect("$host:$pubPort") - - val topic = "magix"//MagixEndpoint.magixJson.encodeToString(filter) - socket.subscribe(topic) + socket.subscribe("") return channelFlow { var activeFlag = true @@ -45,9 +43,11 @@ public class ZmqMagixEndpoint( while (activeFlag) { try { //This is a blocking call. - val string = socket.recvStr() - val message = MagixEndpoint.magixJson.decodeFromString(serializer, string) - send(message) + val string: String? = socket.recvStr(zmq.ZMQ.ZMQ_DONTWAIT) + if (string != null) { + val message = MagixEndpoint.magixJson.decodeFromString(serializer, string) + send(message) + } } catch (t: Throwable) { socket.close() if (t is ZMQException && t.errorCode == ZMQ.Error.ETERM.code) { @@ -60,8 +60,10 @@ public class ZmqMagixEndpoint( }.filter(filter).flowOn(coroutineContext) //should be flown on IO because of blocking calls } - private val publishSocket = zmqContext.createSocket(SocketType.PUSH).apply { - connect("$host:$pullPort") + private val publishSocket by lazy { + zmqContext.createSocket(SocketType.PUSH).apply { + connect("$host:$pullPort") + } } override suspend fun broadcast(message: MagixMessage): Unit = withContext(coroutineContext) { diff --git a/settings.gradle.kts b/settings.gradle.kts index 1f0cd42..73d49aa 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -31,6 +31,7 @@ include( ":magix:magix-rsocket", ":magix:magix-java-client", ":magix:magix-zmq", + ":magix:magix-demo", ":controls-magix-client", ":motors" -) +) \ No newline at end of file