From 89190db6538c12a3526df8ea8aca83688dade78a Mon Sep 17 00:00:00 2001 From: Alexander Nozik Date: Mon, 28 Jun 2021 20:58:27 +0300 Subject: [PATCH] ZMQ support(untested) --- controls-server/build.gradle.kts | 3 +- .../server/{conversions.kt => responses.kt} | 24 +++--- demo/build.gradle.kts | 2 +- .../ru/mipt/npm/magix/api/MagixEndpoint.kt | 11 +++ magix/magix-server/build.gradle.kts | 2 + .../ru/mipt/npm/magix/server/MagixServer.kt | 48 ----------- .../ru/mipt/npm/magix/server/magixModule.kt | 2 +- .../kotlin/ru/mipt/npm/magix/server/server.kt | 85 +++++++++++++++++++ magix/magix-zmq/build.gradle.kts | 3 + .../ru/mipt/npm/magix/zmq/ZmqMagixEndpoint.kt | 18 ++-- settings.gradle.kts | 2 +- 11 files changed, 127 insertions(+), 73 deletions(-) rename controls-server/src/main/kotlin/ru/mipt/npm/controls/server/{conversions.kt => responses.kt} (60%) delete mode 100644 magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/MagixServer.kt create mode 100644 magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/server.kt diff --git a/controls-server/build.gradle.kts b/controls-server/build.gradle.kts index cad4073..3f5597f 100644 --- a/controls-server/build.gradle.kts +++ b/controls-server/build.gradle.kts @@ -4,8 +4,7 @@ plugins { } description = """ - A stand-alone device tree web server which also works as magix event dispatcher. - The server is used to work with stand-alone devices without intermediate control system. + A magix event loop server with web server for visualization. """.trimIndent() val dataforgeVersion: String by rootProject.extra diff --git a/controls-server/src/main/kotlin/ru/mipt/npm/controls/server/conversions.kt b/controls-server/src/main/kotlin/ru/mipt/npm/controls/server/responses.kt similarity index 60% rename from controls-server/src/main/kotlin/ru/mipt/npm/controls/server/conversions.kt rename to controls-server/src/main/kotlin/ru/mipt/npm/controls/server/responses.kt index 592c42f..8041acb 100644 --- a/controls-server/src/main/kotlin/ru/mipt/npm/controls/server/conversions.kt +++ b/controls-server/src/main/kotlin/ru/mipt/npm/controls/server/responses.kt @@ -2,32 +2,30 @@ package ru.mipt.npm.controls.server import io.ktor.application.ApplicationCall import io.ktor.http.ContentType -import io.ktor.http.cio.websocket.Frame import io.ktor.response.respondText import kotlinx.serialization.json.JsonObjectBuilder import kotlinx.serialization.json.buildJsonObject import ru.mipt.npm.controls.api.DeviceMessage import ru.mipt.npm.magix.api.MagixEndpoint -import space.kscience.dataforge.io.* -internal fun Frame.toEnvelope(): Envelope { - return data.asBinary().readWith(TaggedEnvelopeFormat) -} - -internal fun Envelope.toFrame(): Frame { - val data = buildByteArray { - writeWith(TaggedEnvelopeFormat, this@toFrame) - } - return Frame.Binary(false, data) -} +//internal fun Frame.toEnvelope(): Envelope { +// return data.asBinary().readWith(TaggedEnvelopeFormat) +//} +// +//internal fun Envelope.toFrame(): Frame { +// val data = buildByteArray { +// writeWith(TaggedEnvelopeFormat, this@toFrame) +// } +// return Frame.Binary(false, data) +//} internal suspend fun ApplicationCall.respondJson(builder: JsonObjectBuilder.() -> Unit) { val json = buildJsonObject(builder) respondText(json.toString(), contentType = ContentType.Application.Json) } -public suspend fun ApplicationCall.respondMessage(message: DeviceMessage): Unit = respondText( +internal suspend fun ApplicationCall.respondMessage(message: DeviceMessage): Unit = respondText( MagixEndpoint.magixJson.encodeToString(DeviceMessage.serializer(), message), contentType = ContentType.Application.Json ) \ No newline at end of file diff --git a/demo/build.gradle.kts b/demo/build.gradle.kts index cdb9b7f..d197b1f 100644 --- a/demo/build.gradle.kts +++ b/demo/build.gradle.kts @@ -36,5 +36,5 @@ javafx{ } application{ - mainClass.set("space.kscience.dataforge.control.demo.DemoControllerViewKt") + mainClass.set("ru.mipt.npm.controls.demo.DemoControllerViewKt") } \ No newline at end of file diff --git a/magix/magix-api/src/commonMain/kotlin/ru/mipt/npm/magix/api/MagixEndpoint.kt b/magix/magix-api/src/commonMain/kotlin/ru/mipt/npm/magix/api/MagixEndpoint.kt index a266cd1..78ab54d 100644 --- a/magix/magix-api/src/commonMain/kotlin/ru/mipt/npm/magix/api/MagixEndpoint.kt +++ b/magix/magix-api/src/commonMain/kotlin/ru/mipt/npm/magix/api/MagixEndpoint.kt @@ -37,6 +37,17 @@ public interface MagixEndpoint { */ public const val DEFAULT_MAGIX_RAW_PORT: Int = 7778 + /** + * A default PUB port for ZMQ connections + */ + public const val DEFAULT_MAGIX_ZMQ_PUB_PORT: Int = 7781 + + /** + * A default PULL port for ZMQ connections + */ + public const val DEFAULT_MAGIX_ZMQ_PULL_PORT: Int = 7782 + + public val magixJson: Json = Json { ignoreUnknownKeys = true encodeDefaults = false diff --git a/magix/magix-server/build.gradle.kts b/magix/magix-server/build.gradle.kts index e4de48a..63b394f 100644 --- a/magix/magix-server/build.gradle.kts +++ b/magix/magix-server/build.gradle.kts @@ -27,4 +27,6 @@ dependencies{ implementation("io.rsocket.kotlin:rsocket-core:$rsocketVersion") implementation("io.rsocket.kotlin:rsocket-transport-ktor-server:$rsocketVersion") + + implementation("org.zeromq:jeromq:0.5.2") } \ No newline at end of file diff --git a/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/MagixServer.kt b/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/MagixServer.kt deleted file mode 100644 index 0481521..0000000 --- a/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/MagixServer.kt +++ /dev/null @@ -1,48 +0,0 @@ -package ru.mipt.npm.magix.server - -import io.ktor.network.selector.ActorSelectorManager -import io.ktor.network.sockets.aSocket -import io.ktor.server.cio.CIO -import io.ktor.server.engine.ApplicationEngine -import io.ktor.server.engine.embeddedServer -import io.rsocket.kotlin.core.RSocketServer -import io.rsocket.kotlin.transport.ktor.serverTransport -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.Job -import kotlinx.coroutines.flow.MutableSharedFlow -import ru.mipt.npm.magix.api.MagixEndpoint.Companion.DEFAULT_MAGIX_HTTP_PORT -import ru.mipt.npm.magix.api.MagixEndpoint.Companion.DEFAULT_MAGIX_RAW_PORT - -/** - * - */ -public fun CoroutineScope.rawMagixServerSocket( - magixFlow: MutableSharedFlow, - rawSocketPort: Int = DEFAULT_MAGIX_RAW_PORT -): Job { - val tcpTransport = aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().serverTransport(port = rawSocketPort) - val rSocketJob = RSocketServer().bind(tcpTransport, magixAcceptor(magixFlow)) - coroutineContext[Job]?.invokeOnCompletion{ - rSocketJob.cancel() - } - return rSocketJob; -} - -public fun CoroutineScope.startMagixServer( - port: Int = DEFAULT_MAGIX_HTTP_PORT, - rawSocketPort: Int = DEFAULT_MAGIX_RAW_PORT, - buffer: Int = 100, -): ApplicationEngine { - - val magixFlow = MutableSharedFlow( - buffer, - extraBufferCapacity = buffer - ) - - rawMagixServerSocket(magixFlow, rawSocketPort) - - return embeddedServer(CIO, port = port) { - magixModule(magixFlow) - } -} \ No newline at end of file diff --git a/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/magixModule.kt b/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/magixModule.kt index 74f3d8d..fa90d28 100644 --- a/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/magixModule.kt +++ b/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/magixModule.kt @@ -32,7 +32,7 @@ import java.util.* public typealias GenericMagixMessage = MagixMessage -private val genericMessageSerializer: KSerializer> = +internal val genericMessageSerializer: KSerializer> = MagixMessage.serializer(JsonElement.serializer()) diff --git a/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/server.kt b/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/server.kt new file mode 100644 index 0000000..a405493 --- /dev/null +++ b/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/server.kt @@ -0,0 +1,85 @@ +package ru.mipt.npm.magix.server + +import io.ktor.network.selector.ActorSelectorManager +import io.ktor.network.sockets.aSocket +import io.ktor.server.cio.CIO +import io.ktor.server.engine.ApplicationEngine +import io.ktor.server.engine.embeddedServer +import io.rsocket.kotlin.core.RSocketServer +import io.rsocket.kotlin.transport.ktor.serverTransport +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach +import org.zeromq.SocketType +import org.zeromq.ZContext +import ru.mipt.npm.magix.api.MagixEndpoint +import ru.mipt.npm.magix.api.MagixEndpoint.Companion.DEFAULT_MAGIX_HTTP_PORT +import ru.mipt.npm.magix.api.MagixEndpoint.Companion.DEFAULT_MAGIX_RAW_PORT +import ru.mipt.npm.magix.api.MagixEndpoint.Companion.DEFAULT_MAGIX_ZMQ_PUB_PORT +import ru.mipt.npm.magix.api.MagixEndpoint.Companion.DEFAULT_MAGIX_ZMQ_PULL_PORT + +/** + * Raw TCP magix server + */ +public fun CoroutineScope.rawMagixServerSocket( + magixFlow: MutableSharedFlow, + rawSocketPort: Int = DEFAULT_MAGIX_RAW_PORT +): Job { + val tcpTransport = aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().serverTransport(port = rawSocketPort) + val rSocketJob = RSocketServer().bind(tcpTransport, magixAcceptor(magixFlow)) + coroutineContext[Job]?.invokeOnCompletion { + rSocketJob.cancel() + } + return rSocketJob; +} + +public fun CoroutineScope.zmqMagixServerSocket( + magixFlow: MutableSharedFlow, + localHost: String = "tcp://*", + zmqPubSocketPort: Int = DEFAULT_MAGIX_ZMQ_PUB_PORT, + zmqPullSocketPort: Int = DEFAULT_MAGIX_ZMQ_PULL_PORT, +): Job = launch { + ZContext().use { context -> + //launch publishing job + val pubSocket = context.createSocket(SocketType.XPUB) + pubSocket.bind("$localHost:$zmqPubSocketPort") + magixFlow.onEach { message -> + pubSocket.send(MagixEndpoint.magixJson.encodeToString(genericMessageSerializer, message)) + }.launchIn(this) + + //launch pulling job + val pullSocket = context.createSocket(SocketType.PULL) + pubSocket.bind("$localHost:$zmqPullSocketPort") + launch(Dispatchers.IO) { + while (isActive) { + //This is a blocking call. + val string = pullSocket.recvStr() + val message = MagixEndpoint.magixJson.decodeFromString(genericMessageSerializer, string) + magixFlow.emit(message) + } + } + } +} + +/** + * A combined RSocket/TCP server + */ +public fun CoroutineScope.startMagixServer( + port: Int = DEFAULT_MAGIX_HTTP_PORT, + buffer: Int = 100, +): ApplicationEngine { + + val magixFlow = MutableSharedFlow( + buffer, + extraBufferCapacity = buffer + ) + + //start tcpRSocket server + rawMagixServerSocket(magixFlow) + zmqMagixServerSocket(magixFlow) + + return embeddedServer(CIO, port = port) { + magixModule(magixFlow) + } +} \ No newline at end of file diff --git a/magix/magix-zmq/build.gradle.kts b/magix/magix-zmq/build.gradle.kts index d4e991b..cf9e4be 100644 --- a/magix/magix-zmq/build.gradle.kts +++ b/magix/magix-zmq/build.gradle.kts @@ -3,6 +3,9 @@ plugins { `maven-publish` } +description = """ + ZMQ client endpoint for Magix +""".trimIndent() dependencies { api(projects.magix.magixApi) diff --git a/magix/magix-zmq/src/main/kotlin/ru/mipt/npm/magix/zmq/ZmqMagixEndpoint.kt b/magix/magix-zmq/src/main/kotlin/ru/mipt/npm/magix/zmq/ZmqMagixEndpoint.kt index 068d3d1..bccdfe0 100644 --- a/magix/magix-zmq/src/main/kotlin/ru/mipt/npm/magix/zmq/ZmqMagixEndpoint.kt +++ b/magix/magix-zmq/src/main/kotlin/ru/mipt/npm/magix/zmq/ZmqMagixEndpoint.kt @@ -1,6 +1,7 @@ package ru.mipt.npm.magix.zmq import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.channelFlow import kotlinx.coroutines.flow.flowOn @@ -17,17 +18,20 @@ import ru.mipt.npm.magix.api.filter import kotlin.coroutines.CoroutineContext public class ZmqMagixEndpoint( - private val coroutineContext: CoroutineContext, + private val host: String, payloadSerializer: KSerializer, - private val address: String, + private val pubPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PUB_PORT, + private val pullPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PULL_PORT, + private val coroutineContext: CoroutineContext = Dispatchers.IO ) : MagixEndpoint, AutoCloseable { private val zmqContext = ZContext() private val serializer = MagixMessage.serializer(payloadSerializer) + @OptIn(ExperimentalCoroutinesApi::class) override fun subscribe(filter: MagixMessageFilter): Flow> { val socket = zmqContext.createSocket(SocketType.XSUB) - socket.bind(address) + socket.connect("$host:$pubPort") val topic = "magix"//MagixEndpoint.magixJson.encodeToString(filter) socket.subscribe(topic) @@ -53,14 +57,14 @@ public class ZmqMagixEndpoint( } } } - }.filter(filter).flowOn(Dispatchers.IO) //should be flown on IO because of blocking calls + }.filter(filter).flowOn(coroutineContext) //should be flown on IO because of blocking calls } - private val publishSocket = zmqContext.createSocket(SocketType.XPUB).apply { - bind(address) + private val publishSocket = zmqContext.createSocket(SocketType.PUSH).apply { + connect("$host:$pullPort") } - override suspend fun broadcast(message: MagixMessage): Unit = withContext(Dispatchers.IO) { + override suspend fun broadcast(message: MagixMessage): Unit = withContext(coroutineContext) { val string = MagixEndpoint.magixJson.encodeToString(serializer, message) publishSocket.send(string) } diff --git a/settings.gradle.kts b/settings.gradle.kts index 4430f74..1f0cd42 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -1,4 +1,4 @@ -rootProject.name = "controls" +rootProject.name = "controls-kt" enableFeaturePreview("TYPESAFE_PROJECT_ACCESSORS")