ZMQ fully functional

This commit is contained in:
Alexander Nozik 2021-07-03 14:16:31 +03:00
parent 89190db653
commit 6e01e28015
9 changed files with 168 additions and 62 deletions

View File

@ -32,8 +32,8 @@ import ru.mipt.npm.controls.controllers.DeviceManager
import ru.mipt.npm.controls.controllers.respondMessage import ru.mipt.npm.controls.controllers.respondMessage
import ru.mipt.npm.magix.api.MagixEndpoint import ru.mipt.npm.magix.api.MagixEndpoint
import ru.mipt.npm.magix.server.GenericMagixMessage import ru.mipt.npm.magix.server.GenericMagixMessage
import ru.mipt.npm.magix.server.launchMagixServerRawRSocket
import ru.mipt.npm.magix.server.magixModule import ru.mipt.npm.magix.server.magixModule
import ru.mipt.npm.magix.server.rawMagixServerSocket
import space.kscience.dataforge.meta.toJson import space.kscience.dataforge.meta.toJson
import space.kscience.dataforge.meta.toMetaItem import space.kscience.dataforge.meta.toMetaItem
@ -204,6 +204,6 @@ public fun Application.deviceManagerModule(
extraBufferCapacity = buffer extraBufferCapacity = buffer
) )
rawMagixServerSocket(magixFlow, rawSocketPort) launchMagixServerRawRSocket(magixFlow, rawSocketPort)
magixModule(magixFlow) magixModule(magixFlow)
} }

View File

@ -0,0 +1,15 @@
plugins {
id("ru.mipt.npm.gradle.jvm")
}
dependencies{
implementation(projects.magix.magixServer)
implementation(projects.magix.magixZmq)
implementation(projects.magix.magixRsocket)
implementation("ch.qos.logback:logback-classic:1.2.3")
}
kotlin{
explicitApi = null
}

View File

@ -0,0 +1,59 @@
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.isActive
import kotlinx.serialization.json.JsonElement
import kotlinx.serialization.json.JsonObjectBuilder
import kotlinx.serialization.json.buildJsonObject
import kotlinx.serialization.json.put
import org.slf4j.LoggerFactory
import ru.mipt.npm.magix.api.MagixEndpoint
import ru.mipt.npm.magix.api.MagixMessage
import ru.mipt.npm.magix.server.startMagixServer
import ru.mipt.npm.magix.zmq.ZmqMagixEndpoint
suspend fun MagixEndpoint<JsonElement>.sendJson(
origin: String,
format: String = "json",
target: String? = null,
id: String? = null,
parentId: String? = null,
user: JsonElement? = null,
builder: JsonObjectBuilder.() -> Unit
): Unit = broadcast(MagixMessage(format, origin, buildJsonObject(builder), target, id, parentId, user))
suspend fun main(): Unit = coroutineScope {
val logger = LoggerFactory.getLogger("magix-demo")
logger.info("Starting magix server")
val server = startMagixServer(enableRawRSocket = false)
logger.info("Waiting for server to start")
delay(2000)
logger.info("Starting client")
ZmqMagixEndpoint("tcp://localhost", JsonElement.serializer()).use { client ->
logger.info("Starting subscription")
try {
client.subscribe().onEach {
println(it.payload)
}.catch { it.printStackTrace() }.launchIn(this)
} catch (t: Throwable) {
t.printStackTrace()
throw t
}
var counter = 0
while (isActive) {
delay(500)
logger.info("Sending message number ${counter + 1}")
client.sendJson("magix-demo") {
put("message", "Hello world!")
put("index", counter++)
}
}
}
}

View File

@ -20,13 +20,13 @@ val ktorVersion: String = ru.mipt.npm.gradle.KScienceVersions.ktorVersion
dependencies{ dependencies{
api(project(":magix:magix-api")) api(project(":magix:magix-api"))
implementation("io.ktor:ktor-server-cio:$ktorVersion") api("io.ktor:ktor-server-cio:$ktorVersion")
implementation("io.ktor:ktor-websockets:$ktorVersion") api("io.ktor:ktor-websockets:$ktorVersion")
implementation("io.ktor:ktor-serialization:$ktorVersion") api("io.ktor:ktor-serialization:$ktorVersion")
implementation("io.ktor:ktor-html-builder:$ktorVersion") api("io.ktor:ktor-html-builder:$ktorVersion")
implementation("io.rsocket.kotlin:rsocket-core:$rsocketVersion") api("io.rsocket.kotlin:rsocket-core:$rsocketVersion")
implementation("io.rsocket.kotlin:rsocket-transport-ktor-server:$rsocketVersion") api("io.rsocket.kotlin:rsocket-transport-ktor-server:$rsocketVersion")
implementation("org.zeromq:jeromq:0.5.2") api("org.zeromq:jeromq:0.5.2")
} }

View File

@ -111,10 +111,10 @@ public fun Application.magixModule(magixFlow: MutableSharedFlow<GenericMagixMess
val message = call.receive<GenericMagixMessage>() val message = call.receive<GenericMagixMessage>()
magixFlow.emit(message) magixFlow.emit(message)
} }
get { get("loop-state") {
call.respondHtml { call.respondHtml {
body { body {
h1 { +"Magix stream statistics" } h1 { +"Magix loop statistics" }
h2 { +"Number of subscribers: ${magixFlow.subscriptionCount.value}" } h2 { +"Number of subscribers: ${magixFlow.subscriptionCount.value}" }
h3 { +"Replay cache size: ${magixFlow.replayCache.size}" } h3 { +"Replay cache size: ${magixFlow.replayCache.size}" }
h3 { +"Replay cache:" } h3 { +"Replay cache:" }

View File

@ -7,22 +7,19 @@ import io.ktor.server.engine.ApplicationEngine
import io.ktor.server.engine.embeddedServer import io.ktor.server.engine.embeddedServer
import io.rsocket.kotlin.core.RSocketServer import io.rsocket.kotlin.core.RSocketServer
import io.rsocket.kotlin.transport.ktor.serverTransport import io.rsocket.kotlin.transport.ktor.serverTransport
import kotlinx.coroutines.* import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.launchIn import org.slf4j.LoggerFactory
import kotlinx.coroutines.flow.onEach
import org.zeromq.SocketType
import org.zeromq.ZContext
import ru.mipt.npm.magix.api.MagixEndpoint import ru.mipt.npm.magix.api.MagixEndpoint
import ru.mipt.npm.magix.api.MagixEndpoint.Companion.DEFAULT_MAGIX_HTTP_PORT import ru.mipt.npm.magix.api.MagixEndpoint.Companion.DEFAULT_MAGIX_HTTP_PORT
import ru.mipt.npm.magix.api.MagixEndpoint.Companion.DEFAULT_MAGIX_RAW_PORT import ru.mipt.npm.magix.api.MagixEndpoint.Companion.DEFAULT_MAGIX_RAW_PORT
import ru.mipt.npm.magix.api.MagixEndpoint.Companion.DEFAULT_MAGIX_ZMQ_PUB_PORT
import ru.mipt.npm.magix.api.MagixEndpoint.Companion.DEFAULT_MAGIX_ZMQ_PULL_PORT
/** /**
* Raw TCP magix server * Raw TCP magix server
*/ */
public fun CoroutineScope.rawMagixServerSocket( public fun CoroutineScope.launchMagixServerRawRSocket(
magixFlow: MutableSharedFlow<GenericMagixMessage>, magixFlow: MutableSharedFlow<GenericMagixMessage>,
rawSocketPort: Int = DEFAULT_MAGIX_RAW_PORT rawSocketPort: Int = DEFAULT_MAGIX_RAW_PORT
): Job { ): Job {
@ -34,50 +31,34 @@ public fun CoroutineScope.rawMagixServerSocket(
return rSocketJob; return rSocketJob;
} }
public fun CoroutineScope.zmqMagixServerSocket(
magixFlow: MutableSharedFlow<GenericMagixMessage>,
localHost: String = "tcp://*",
zmqPubSocketPort: Int = DEFAULT_MAGIX_ZMQ_PUB_PORT,
zmqPullSocketPort: Int = DEFAULT_MAGIX_ZMQ_PULL_PORT,
): Job = launch {
ZContext().use { context ->
//launch publishing job
val pubSocket = context.createSocket(SocketType.XPUB)
pubSocket.bind("$localHost:$zmqPubSocketPort")
magixFlow.onEach { message ->
pubSocket.send(MagixEndpoint.magixJson.encodeToString(genericMessageSerializer, message))
}.launchIn(this)
//launch pulling job
val pullSocket = context.createSocket(SocketType.PULL)
pubSocket.bind("$localHost:$zmqPullSocketPort")
launch(Dispatchers.IO) {
while (isActive) {
//This is a blocking call.
val string = pullSocket.recvStr()
val message = MagixEndpoint.magixJson.decodeFromString(genericMessageSerializer, string)
magixFlow.emit(message)
}
}
}
}
/** /**
* A combined RSocket/TCP server * A combined RSocket/TCP server
*/ */
public fun CoroutineScope.startMagixServer( public fun CoroutineScope.startMagixServer(
port: Int = DEFAULT_MAGIX_HTTP_PORT, port: Int = DEFAULT_MAGIX_HTTP_PORT,
buffer: Int = 100, buffer: Int = 100,
enableRawRSocket: Boolean = true,
enableZmq: Boolean = true
): ApplicationEngine { ): ApplicationEngine {
val logger = LoggerFactory.getLogger("magix-server")
val magixFlow = MutableSharedFlow<GenericMagixMessage>( val magixFlow = MutableSharedFlow<GenericMagixMessage>(
buffer, buffer,
extraBufferCapacity = buffer extraBufferCapacity = buffer
) )
//start tcpRSocket server if (enableRawRSocket) {
rawMagixServerSocket(magixFlow) //Start tcpRSocket server
zmqMagixServerSocket(magixFlow) val rawRSocketPort = DEFAULT_MAGIX_RAW_PORT
logger.info("Starting magix raw rsocket server on port $rawRSocketPort")
launchMagixServerRawRSocket(magixFlow, rawRSocketPort)
}
if (enableZmq) {
//Start ZMQ server socket pair
val zmqPubSocketPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PUB_PORT
val zmqPullSocketPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PULL_PORT
logger.info("Starting magix zmq server on pub port $zmqPubSocketPort and pull port $zmqPullSocketPort")
launchMagixServerZmqSocket(magixFlow, zmqPubSocketPort = zmqPubSocketPort, zmqPullSocketPort = zmqPullSocketPort)
}
return embeddedServer(CIO, port = port) { return embeddedServer(CIO, port = port) {
magixModule(magixFlow) magixModule(magixFlow)

View File

@ -0,0 +1,48 @@
package ru.mipt.npm.magix.server
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import org.slf4j.LoggerFactory
import org.zeromq.SocketType
import org.zeromq.ZContext
import ru.mipt.npm.magix.api.MagixEndpoint
public fun CoroutineScope.launchMagixServerZmqSocket(
magixFlow: MutableSharedFlow<GenericMagixMessage>,
localHost: String = "tcp://*",
zmqPubSocketPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PUB_PORT,
zmqPullSocketPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PULL_PORT,
): Job = launch {
val logger = LoggerFactory.getLogger("magix-server-zmq")
ZContext().use { context ->
//launch publishing job
val pubSocket = context.createSocket(SocketType.PUB)
pubSocket.bind("$localHost:$zmqPubSocketPort")
magixFlow.onEach { message ->
val string = MagixEndpoint.magixJson.encodeToString(genericMessageSerializer, message)
pubSocket.send(string)
logger.debug("Published: $string")
}.launchIn(this)
//launch pulling job
val pullSocket = context.createSocket(SocketType.PULL)
pullSocket.bind("$localHost:$zmqPullSocketPort")
//suspending loop while pulling is active
while (isActive) {
//This is a blocking call.
val string: String? = pullSocket.recvStr(zmq.ZMQ.ZMQ_DONTWAIT)
if (string != null) {
logger.debug("Received: $string")
val message = MagixEndpoint.magixJson.decodeFromString(genericMessageSerializer, string)
magixFlow.emit(message)
}
}
}
}

View File

@ -24,17 +24,15 @@ public class ZmqMagixEndpoint<T>(
private val pullPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PULL_PORT, private val pullPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PULL_PORT,
private val coroutineContext: CoroutineContext = Dispatchers.IO private val coroutineContext: CoroutineContext = Dispatchers.IO
) : MagixEndpoint<T>, AutoCloseable { ) : MagixEndpoint<T>, AutoCloseable {
private val zmqContext = ZContext() private val zmqContext by lazy { ZContext() }
private val serializer = MagixMessage.serializer(payloadSerializer) private val serializer = MagixMessage.serializer(payloadSerializer)
@OptIn(ExperimentalCoroutinesApi::class) @OptIn(ExperimentalCoroutinesApi::class)
override fun subscribe(filter: MagixMessageFilter): Flow<MagixMessage<T>> { override fun subscribe(filter: MagixMessageFilter): Flow<MagixMessage<T>> {
val socket = zmqContext.createSocket(SocketType.XSUB) val socket = zmqContext.createSocket(SocketType.SUB)
socket.connect("$host:$pubPort") socket.connect("$host:$pubPort")
socket.subscribe("")
val topic = "magix"//MagixEndpoint.magixJson.encodeToString(filter)
socket.subscribe(topic)
return channelFlow { return channelFlow {
var activeFlag = true var activeFlag = true
@ -45,9 +43,11 @@ public class ZmqMagixEndpoint<T>(
while (activeFlag) { while (activeFlag) {
try { try {
//This is a blocking call. //This is a blocking call.
val string = socket.recvStr() val string: String? = socket.recvStr(zmq.ZMQ.ZMQ_DONTWAIT)
if (string != null) {
val message = MagixEndpoint.magixJson.decodeFromString(serializer, string) val message = MagixEndpoint.magixJson.decodeFromString(serializer, string)
send(message) send(message)
}
} catch (t: Throwable) { } catch (t: Throwable) {
socket.close() socket.close()
if (t is ZMQException && t.errorCode == ZMQ.Error.ETERM.code) { if (t is ZMQException && t.errorCode == ZMQ.Error.ETERM.code) {
@ -60,9 +60,11 @@ public class ZmqMagixEndpoint<T>(
}.filter(filter).flowOn(coroutineContext) //should be flown on IO because of blocking calls }.filter(filter).flowOn(coroutineContext) //should be flown on IO because of blocking calls
} }
private val publishSocket = zmqContext.createSocket(SocketType.PUSH).apply { private val publishSocket by lazy {
zmqContext.createSocket(SocketType.PUSH).apply {
connect("$host:$pullPort") connect("$host:$pullPort")
} }
}
override suspend fun broadcast(message: MagixMessage<T>): Unit = withContext(coroutineContext) { override suspend fun broadcast(message: MagixMessage<T>): Unit = withContext(coroutineContext) {
val string = MagixEndpoint.magixJson.encodeToString(serializer, message) val string = MagixEndpoint.magixJson.encodeToString(serializer, message)

View File

@ -31,6 +31,7 @@ include(
":magix:magix-rsocket", ":magix:magix-rsocket",
":magix:magix-java-client", ":magix:magix-java-client",
":magix:magix-zmq", ":magix:magix-zmq",
":magix:magix-demo",
":controls-magix-client", ":controls-magix-client",
":motors" ":motors"
) )