ZMQ support(untested)

This commit is contained in:
Alexander Nozik 2021-06-28 20:58:27 +03:00
parent 596e3a0cfc
commit 89190db653
11 changed files with 127 additions and 73 deletions

View File

@ -4,8 +4,7 @@ plugins {
} }
description = """ description = """
A stand-alone device tree web server which also works as magix event dispatcher. A magix event loop server with web server for visualization.
The server is used to work with stand-alone devices without intermediate control system.
""".trimIndent() """.trimIndent()
val dataforgeVersion: String by rootProject.extra val dataforgeVersion: String by rootProject.extra

View File

@ -2,32 +2,30 @@ package ru.mipt.npm.controls.server
import io.ktor.application.ApplicationCall import io.ktor.application.ApplicationCall
import io.ktor.http.ContentType import io.ktor.http.ContentType
import io.ktor.http.cio.websocket.Frame
import io.ktor.response.respondText import io.ktor.response.respondText
import kotlinx.serialization.json.JsonObjectBuilder import kotlinx.serialization.json.JsonObjectBuilder
import kotlinx.serialization.json.buildJsonObject import kotlinx.serialization.json.buildJsonObject
import ru.mipt.npm.controls.api.DeviceMessage import ru.mipt.npm.controls.api.DeviceMessage
import ru.mipt.npm.magix.api.MagixEndpoint import ru.mipt.npm.magix.api.MagixEndpoint
import space.kscience.dataforge.io.*
internal fun Frame.toEnvelope(): Envelope { //internal fun Frame.toEnvelope(): Envelope {
return data.asBinary().readWith(TaggedEnvelopeFormat) // return data.asBinary().readWith(TaggedEnvelopeFormat)
} //}
//
internal fun Envelope.toFrame(): Frame { //internal fun Envelope.toFrame(): Frame {
val data = buildByteArray { // val data = buildByteArray {
writeWith(TaggedEnvelopeFormat, this@toFrame) // writeWith(TaggedEnvelopeFormat, this@toFrame)
} // }
return Frame.Binary(false, data) // return Frame.Binary(false, data)
} //}
internal suspend fun ApplicationCall.respondJson(builder: JsonObjectBuilder.() -> Unit) { internal suspend fun ApplicationCall.respondJson(builder: JsonObjectBuilder.() -> Unit) {
val json = buildJsonObject(builder) val json = buildJsonObject(builder)
respondText(json.toString(), contentType = ContentType.Application.Json) respondText(json.toString(), contentType = ContentType.Application.Json)
} }
public suspend fun ApplicationCall.respondMessage(message: DeviceMessage): Unit = respondText( internal suspend fun ApplicationCall.respondMessage(message: DeviceMessage): Unit = respondText(
MagixEndpoint.magixJson.encodeToString(DeviceMessage.serializer(), message), MagixEndpoint.magixJson.encodeToString(DeviceMessage.serializer(), message),
contentType = ContentType.Application.Json contentType = ContentType.Application.Json
) )

View File

@ -36,5 +36,5 @@ javafx{
} }
application{ application{
mainClass.set("space.kscience.dataforge.control.demo.DemoControllerViewKt") mainClass.set("ru.mipt.npm.controls.demo.DemoControllerViewKt")
} }

View File

@ -37,6 +37,17 @@ public interface MagixEndpoint<T> {
*/ */
public const val DEFAULT_MAGIX_RAW_PORT: Int = 7778 public const val DEFAULT_MAGIX_RAW_PORT: Int = 7778
/**
* A default PUB port for ZMQ connections
*/
public const val DEFAULT_MAGIX_ZMQ_PUB_PORT: Int = 7781
/**
* A default PULL port for ZMQ connections
*/
public const val DEFAULT_MAGIX_ZMQ_PULL_PORT: Int = 7782
public val magixJson: Json = Json { public val magixJson: Json = Json {
ignoreUnknownKeys = true ignoreUnknownKeys = true
encodeDefaults = false encodeDefaults = false

View File

@ -27,4 +27,6 @@ dependencies{
implementation("io.rsocket.kotlin:rsocket-core:$rsocketVersion") implementation("io.rsocket.kotlin:rsocket-core:$rsocketVersion")
implementation("io.rsocket.kotlin:rsocket-transport-ktor-server:$rsocketVersion") implementation("io.rsocket.kotlin:rsocket-transport-ktor-server:$rsocketVersion")
implementation("org.zeromq:jeromq:0.5.2")
} }

View File

@ -1,48 +0,0 @@
package ru.mipt.npm.magix.server
import io.ktor.network.selector.ActorSelectorManager
import io.ktor.network.sockets.aSocket
import io.ktor.server.cio.CIO
import io.ktor.server.engine.ApplicationEngine
import io.ktor.server.engine.embeddedServer
import io.rsocket.kotlin.core.RSocketServer
import io.rsocket.kotlin.transport.ktor.serverTransport
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.MutableSharedFlow
import ru.mipt.npm.magix.api.MagixEndpoint.Companion.DEFAULT_MAGIX_HTTP_PORT
import ru.mipt.npm.magix.api.MagixEndpoint.Companion.DEFAULT_MAGIX_RAW_PORT
/**
*
*/
public fun CoroutineScope.rawMagixServerSocket(
magixFlow: MutableSharedFlow<GenericMagixMessage>,
rawSocketPort: Int = DEFAULT_MAGIX_RAW_PORT
): Job {
val tcpTransport = aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().serverTransport(port = rawSocketPort)
val rSocketJob = RSocketServer().bind(tcpTransport, magixAcceptor(magixFlow))
coroutineContext[Job]?.invokeOnCompletion{
rSocketJob.cancel()
}
return rSocketJob;
}
public fun CoroutineScope.startMagixServer(
port: Int = DEFAULT_MAGIX_HTTP_PORT,
rawSocketPort: Int = DEFAULT_MAGIX_RAW_PORT,
buffer: Int = 100,
): ApplicationEngine {
val magixFlow = MutableSharedFlow<GenericMagixMessage>(
buffer,
extraBufferCapacity = buffer
)
rawMagixServerSocket(magixFlow, rawSocketPort)
return embeddedServer(CIO, port = port) {
magixModule(magixFlow)
}
}

View File

@ -32,7 +32,7 @@ import java.util.*
public typealias GenericMagixMessage = MagixMessage<JsonElement> public typealias GenericMagixMessage = MagixMessage<JsonElement>
private val genericMessageSerializer: KSerializer<MagixMessage<JsonElement>> = internal val genericMessageSerializer: KSerializer<MagixMessage<JsonElement>> =
MagixMessage.serializer(JsonElement.serializer()) MagixMessage.serializer(JsonElement.serializer())

View File

@ -0,0 +1,85 @@
package ru.mipt.npm.magix.server
import io.ktor.network.selector.ActorSelectorManager
import io.ktor.network.sockets.aSocket
import io.ktor.server.cio.CIO
import io.ktor.server.engine.ApplicationEngine
import io.ktor.server.engine.embeddedServer
import io.rsocket.kotlin.core.RSocketServer
import io.rsocket.kotlin.transport.ktor.serverTransport
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import org.zeromq.SocketType
import org.zeromq.ZContext
import ru.mipt.npm.magix.api.MagixEndpoint
import ru.mipt.npm.magix.api.MagixEndpoint.Companion.DEFAULT_MAGIX_HTTP_PORT
import ru.mipt.npm.magix.api.MagixEndpoint.Companion.DEFAULT_MAGIX_RAW_PORT
import ru.mipt.npm.magix.api.MagixEndpoint.Companion.DEFAULT_MAGIX_ZMQ_PUB_PORT
import ru.mipt.npm.magix.api.MagixEndpoint.Companion.DEFAULT_MAGIX_ZMQ_PULL_PORT
/**
* Raw TCP magix server
*/
public fun CoroutineScope.rawMagixServerSocket(
magixFlow: MutableSharedFlow<GenericMagixMessage>,
rawSocketPort: Int = DEFAULT_MAGIX_RAW_PORT
): Job {
val tcpTransport = aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().serverTransport(port = rawSocketPort)
val rSocketJob = RSocketServer().bind(tcpTransport, magixAcceptor(magixFlow))
coroutineContext[Job]?.invokeOnCompletion {
rSocketJob.cancel()
}
return rSocketJob;
}
public fun CoroutineScope.zmqMagixServerSocket(
magixFlow: MutableSharedFlow<GenericMagixMessage>,
localHost: String = "tcp://*",
zmqPubSocketPort: Int = DEFAULT_MAGIX_ZMQ_PUB_PORT,
zmqPullSocketPort: Int = DEFAULT_MAGIX_ZMQ_PULL_PORT,
): Job = launch {
ZContext().use { context ->
//launch publishing job
val pubSocket = context.createSocket(SocketType.XPUB)
pubSocket.bind("$localHost:$zmqPubSocketPort")
magixFlow.onEach { message ->
pubSocket.send(MagixEndpoint.magixJson.encodeToString(genericMessageSerializer, message))
}.launchIn(this)
//launch pulling job
val pullSocket = context.createSocket(SocketType.PULL)
pubSocket.bind("$localHost:$zmqPullSocketPort")
launch(Dispatchers.IO) {
while (isActive) {
//This is a blocking call.
val string = pullSocket.recvStr()
val message = MagixEndpoint.magixJson.decodeFromString(genericMessageSerializer, string)
magixFlow.emit(message)
}
}
}
}
/**
* A combined RSocket/TCP server
*/
public fun CoroutineScope.startMagixServer(
port: Int = DEFAULT_MAGIX_HTTP_PORT,
buffer: Int = 100,
): ApplicationEngine {
val magixFlow = MutableSharedFlow<GenericMagixMessage>(
buffer,
extraBufferCapacity = buffer
)
//start tcpRSocket server
rawMagixServerSocket(magixFlow)
zmqMagixServerSocket(magixFlow)
return embeddedServer(CIO, port = port) {
magixModule(magixFlow)
}
}

View File

@ -3,6 +3,9 @@ plugins {
`maven-publish` `maven-publish`
} }
description = """
ZMQ client endpoint for Magix
""".trimIndent()
dependencies { dependencies {
api(projects.magix.magixApi) api(projects.magix.magixApi)

View File

@ -1,6 +1,7 @@
package ru.mipt.npm.magix.zmq package ru.mipt.npm.magix.zmq
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.flowOn import kotlinx.coroutines.flow.flowOn
@ -17,17 +18,20 @@ import ru.mipt.npm.magix.api.filter
import kotlin.coroutines.CoroutineContext import kotlin.coroutines.CoroutineContext
public class ZmqMagixEndpoint<T>( public class ZmqMagixEndpoint<T>(
private val coroutineContext: CoroutineContext, private val host: String,
payloadSerializer: KSerializer<T>, payloadSerializer: KSerializer<T>,
private val address: String, private val pubPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PUB_PORT,
private val pullPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PULL_PORT,
private val coroutineContext: CoroutineContext = Dispatchers.IO
) : MagixEndpoint<T>, AutoCloseable { ) : MagixEndpoint<T>, AutoCloseable {
private val zmqContext = ZContext() private val zmqContext = ZContext()
private val serializer = MagixMessage.serializer(payloadSerializer) private val serializer = MagixMessage.serializer(payloadSerializer)
@OptIn(ExperimentalCoroutinesApi::class)
override fun subscribe(filter: MagixMessageFilter): Flow<MagixMessage<T>> { override fun subscribe(filter: MagixMessageFilter): Flow<MagixMessage<T>> {
val socket = zmqContext.createSocket(SocketType.XSUB) val socket = zmqContext.createSocket(SocketType.XSUB)
socket.bind(address) socket.connect("$host:$pubPort")
val topic = "magix"//MagixEndpoint.magixJson.encodeToString(filter) val topic = "magix"//MagixEndpoint.magixJson.encodeToString(filter)
socket.subscribe(topic) socket.subscribe(topic)
@ -53,14 +57,14 @@ public class ZmqMagixEndpoint<T>(
} }
} }
} }
}.filter(filter).flowOn(Dispatchers.IO) //should be flown on IO because of blocking calls }.filter(filter).flowOn(coroutineContext) //should be flown on IO because of blocking calls
} }
private val publishSocket = zmqContext.createSocket(SocketType.XPUB).apply { private val publishSocket = zmqContext.createSocket(SocketType.PUSH).apply {
bind(address) connect("$host:$pullPort")
} }
override suspend fun broadcast(message: MagixMessage<T>): Unit = withContext(Dispatchers.IO) { override suspend fun broadcast(message: MagixMessage<T>): Unit = withContext(coroutineContext) {
val string = MagixEndpoint.magixJson.encodeToString(serializer, message) val string = MagixEndpoint.magixJson.encodeToString(serializer, message)
publishSocket.send(string) publishSocket.send(string)
} }

View File

@ -1,4 +1,4 @@
rootProject.name = "controls" rootProject.name = "controls-kt"
enableFeaturePreview("TYPESAFE_PROJECT_ACCESSORS") enableFeaturePreview("TYPESAFE_PROJECT_ACCESSORS")