Refactor magix server flow plugins

This commit is contained in:
Alexander Nozik 2023-02-25 17:45:09 +03:00
parent fe24c0ee31
commit 2a386568f9
11 changed files with 180 additions and 176 deletions

View File

@ -36,8 +36,8 @@ import space.kscience.dataforge.meta.toMeta
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.asName
import space.kscience.magix.api.MagixEndpoint
import space.kscience.magix.api.MagixFlowPlugin
import space.kscience.magix.api.MagixMessage
import space.kscience.magix.server.launchMagixServerRawRSocket
import space.kscience.magix.server.magixModule
/**
@ -47,9 +47,7 @@ public fun CoroutineScope.startDeviceServer(
manager: DeviceManager,
port: Int = MagixEndpoint.DEFAULT_MAGIX_HTTP_PORT,
host: String = "localhost",
): ApplicationEngine {
return this.embeddedServer(CIO, port, host) {
): ApplicationEngine = embeddedServer(CIO, port, host) {
install(WebSockets)
// install(CORS) {
// anyHost()
@ -65,8 +63,7 @@ public fun CoroutineScope.startDeviceServer(
call.respondRedirect("/dashboard")
}
}
}.start()
}
}.start()
public fun ApplicationEngine.whenStarted(callback: Application.() -> Unit) {
environment.monitor.subscribe(ApplicationStarted, callback)
@ -77,9 +74,9 @@ public val WEB_SERVER_TARGET: Name = "@webServer".asName()
public fun Application.deviceManagerModule(
manager: DeviceManager,
vararg plugins: MagixFlowPlugin,
deviceNames: Collection<String> = manager.devices.keys.map { it.toString() },
route: String = "/",
rawSocketPort: Int = MagixEndpoint.DEFAULT_MAGIX_RAW_PORT,
buffer: Int = 100,
) {
if (pluginOrNull(WebSockets) == null) {
@ -217,6 +214,8 @@ public fun Application.deviceManagerModule(
extraBufferCapacity = buffer
)
launchMagixServerRawRSocket(magixFlow, rawSocketPort)
plugins.forEach {
it.start(this, magixFlow)
}
magixModule(magixFlow)
}

View File

@ -21,6 +21,8 @@ import space.kscience.dataforge.context.*
import space.kscience.magix.api.MagixEndpoint
import space.kscience.magix.rsocket.rSocketWithTcp
import space.kscience.magix.rsocket.rSocketWithWebSockets
import space.kscience.magix.server.RSocketMagix
import space.kscience.magix.server.ZmqMagix
import space.kscience.magix.server.startMagixServer
import tornadofx.*
import java.awt.Desktop
@ -49,7 +51,7 @@ class DemoController : Controller(), ContextAware {
context.launch {
device = deviceManager.install("demo", DemoDevice)
//starting magix event loop
magixServer = startMagixServer(enableRawRSocket = true, enableZmq = true)
magixServer = startMagixServer(RSocketMagix(), ZmqMagix())
//Launch device client and connect it to the server
val deviceEndpoint = MagixEndpoint.rSocketWithTcp("localhost")
deviceManager.connectToMagix(deviceEndpoint)

View File

@ -18,6 +18,8 @@ import space.kscience.dataforge.context.*
import space.kscience.dataforge.meta.Meta
import space.kscience.magix.api.MagixEndpoint
import space.kscience.magix.rsocket.rSocketWithTcp
import space.kscience.magix.server.RSocketMagix
import space.kscience.magix.server.ZmqMagix
import space.kscience.magix.server.startMagixServer
import space.kscience.magix.storage.xodus.storeInXodus
import tornadofx.*
@ -47,7 +49,7 @@ class VirtualCarController : Controller(), ContextAware {
virtualCar = deviceManager.install("virtual-car", VirtualCar)
//starting magix event loop and connect it to entity store
magixServer = startMagixServer(enableRawRSocket = true, enableZmq = true)
magixServer = startMagixServer(RSocketMagix(), ZmqMagix())
storageEndpoint = MagixEndpoint.rSocketWithTcp("localhost").apply {
storeInXodus(this@launch, magixEntityStorePath)

View File

@ -1,11 +1,12 @@
package space.kscience.controls.demo.echo
import io.ktor.server.application.log
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.serialization.json.JsonObject
import org.slf4j.LoggerFactory
import space.kscience.magix.api.MagixEndpoint
import space.kscience.magix.api.MagixFlowPlugin
import space.kscience.magix.api.MagixMessage
import space.kscience.magix.api.MagixMessageFilter
import space.kscience.magix.rsocket.rSocketStreamWithWebSockets
@ -59,16 +60,17 @@ private suspend fun MagixEndpoint.collectEcho(scope: CoroutineScope, n: Int) {
@OptIn(ExperimentalTime::class)
suspend fun main(): Unit = coroutineScope {
launch(Dispatchers.Default) {
val server = startMagixServer(enableRawRSocket = false, enableZmq = false) { flow ->
val server = startMagixServer(MagixFlowPlugin { _, flow ->
val logger = LoggerFactory.getLogger("echo")
//echo each message
flow.onEach { message ->
if (message.parentId == null) {
val m = message.copy(origin = "loop", parentId = message.id, id = message.id + ".response")
log.info("echo: $m")
logger.info(m.toString())
flow.emit(m)
}
}.launchIn(this)
}
})
val responseTime = measureTime {

View File

@ -9,6 +9,8 @@ import kotlinx.serialization.json.*
import org.slf4j.LoggerFactory
import space.kscience.magix.api.MagixEndpoint
import space.kscience.magix.api.MagixMessage
import space.kscience.magix.server.RSocketMagix
import space.kscience.magix.server.ZmqMagix
import space.kscience.magix.server.startMagixServer
import space.kscince.magix.zmq.ZmqMagixEndpoint
import java.awt.Desktop
@ -22,7 +24,7 @@ suspend fun MagixEndpoint.sendJson(
id: String? = null,
parentId: String? = null,
user: JsonElement? = null,
builder: JsonObjectBuilder.() -> Unit
builder: JsonObjectBuilder.() -> Unit,
): Unit = broadcast(MagixMessage(format, buildJsonObject(builder), origin, target, id, parentId, user))
internal const val numberOfMessages = 100
@ -30,10 +32,7 @@ internal const val numberOfMessages = 100
suspend fun main(): Unit = coroutineScope {
val logger = LoggerFactory.getLogger("magix-demo")
logger.info("Starting magix server")
val server = startMagixServer(
buffer = 10,
enableRawRSocket = false //Disable rsocket to avoid kotlin 1.5 compatibility issue
)
val server = startMagixServer(RSocketMagix(), ZmqMagix(), buffer = 10)
server.apply {
val host = "localhost"//environment.connectors.first().host
@ -44,7 +43,7 @@ suspend fun main(): Unit = coroutineScope {
logger.info("Starting client")
//Create zmq magix endpoint and wait for to finish
ZmqMagixEndpoint("localhost","tcp").use { client ->
ZmqMagixEndpoint("localhost", "tcp").use { client ->
logger.info("Starting subscription")
client.subscribe().onEach {
println(it.payload)

View File

@ -0,0 +1,9 @@
package space.kscience.magix.api
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.MutableSharedFlow
public fun interface MagixFlowPlugin {
public fun start(scope: CoroutineScope, magixFlow: MutableSharedFlow<MagixMessage>): Job
}

View File

@ -0,0 +1,74 @@
package space.kscience.magix.server
import io.rsocket.kotlin.ConnectionAcceptor
import io.rsocket.kotlin.RSocketRequestHandler
import io.rsocket.kotlin.core.RSocketServer
import io.rsocket.kotlin.payload.Payload
import io.rsocket.kotlin.payload.buildPayload
import io.rsocket.kotlin.payload.data
import io.rsocket.kotlin.transport.ktor.tcp.TcpServer
import io.rsocket.kotlin.transport.ktor.tcp.TcpServerTransport
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.*
import kotlinx.serialization.encodeToString
import space.kscience.magix.api.*
import space.kscience.magix.api.MagixEndpoint.Companion.DEFAULT_MAGIX_RAW_PORT
/**
* Raw TCP magix server
*/
public class RSocketMagix(public val port: Int = DEFAULT_MAGIX_RAW_PORT): MagixFlowPlugin {
override fun start(scope: CoroutineScope, magixFlow: MutableSharedFlow<MagixMessage>): Job {
val tcpTransport = TcpServerTransport(port = port)
val rSocketJob: TcpServer = RSocketServer().bindIn(scope, tcpTransport, acceptor(scope, magixFlow))
scope.coroutineContext[Job]?.invokeOnCompletion {
rSocketJob.handlerJob.cancel()
}
return rSocketJob.handlerJob
}
public companion object{
public fun acceptor(
coroutineScope: CoroutineScope,
magixFlow: MutableSharedFlow<MagixMessage>,
): ConnectionAcceptor = ConnectionAcceptor {
RSocketRequestHandler(coroutineScope.coroutineContext) {
//handler for request/stream
requestStream { request: Payload ->
val filter = MagixEndpoint.magixJson.decodeFromString(MagixMessageFilter.serializer(), request.data.readText())
magixFlow.filter(filter).map { message ->
val string = MagixEndpoint.magixJson.encodeToString(MagixMessage.serializer(), message)
buildPayload { data(string) }
}
}
//single send
fireAndForget { request: Payload ->
val message = MagixEndpoint.magixJson.decodeFromString(MagixMessage.serializer(), request.data.readText())
magixFlow.emit(message)
}
// bi-directional connection
requestChannel { request: Payload, input: Flow<Payload> ->
input.onEach {
magixFlow.emit(MagixEndpoint.magixJson.decodeFromString(MagixMessage.serializer(), it.data.readText()))
}.launchIn(this)
val filterText = request.data.readText()
val filter = if(filterText.isNotBlank()){
MagixEndpoint.magixJson.decodeFromString(MagixMessageFilter.serializer(), filterText)
} else {
MagixMessageFilter()
}
magixFlow.filter(filter).map { message ->
val string = MagixEndpoint.magixJson.encodeToString(message)
buildPayload { data(string) }
}
}
}
}
}
}

View File

@ -0,0 +1,53 @@
package space.kscience.magix.server
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.serialization.decodeFromString
import kotlinx.serialization.encodeToString
import org.slf4j.LoggerFactory
import org.zeromq.SocketType
import org.zeromq.ZContext
import space.kscience.magix.api.MagixEndpoint
import space.kscience.magix.api.MagixFlowPlugin
import space.kscience.magix.api.MagixMessage
public class ZmqMagix(
public val localHost: String = "tcp://*",
public val zmqPubSocketPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PUB_PORT,
public val zmqPullSocketPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PULL_PORT,
) : MagixFlowPlugin {
override fun start(scope: CoroutineScope, magixFlow: MutableSharedFlow<MagixMessage>): Job =
scope.launch(Dispatchers.IO) {
val logger = LoggerFactory.getLogger("magix-server-zmq")
ZContext().use { context ->
//launch publishing job
val pubSocket = context.createSocket(SocketType.PUB)
pubSocket.bind("$localHost:$zmqPubSocketPort")
magixFlow.onEach { message ->
val string = MagixEndpoint.magixJson.encodeToString(message)
pubSocket.send(string)
logger.debug("Published: $string")
}.launchIn(this)
//launch pulling job
val pullSocket = context.createSocket(SocketType.PULL)
pullSocket.bind("$localHost:$zmqPullSocketPort")
pullSocket.receiveTimeOut = 500
//suspending loop while pulling is active
while (isActive) {
val string: String? = pullSocket.recvStr()
if (string != null) {
logger.debug("Received: $string")
val message = MagixEndpoint.magixJson.decodeFromString<MagixMessage>(string)
magixFlow.emit(message)
}
}
}
}
}

View File

@ -8,15 +8,10 @@ import io.ktor.server.request.receive
import io.ktor.server.routing.*
import io.ktor.server.util.getValue
import io.ktor.server.websocket.WebSockets
import io.rsocket.kotlin.ConnectionAcceptor
import io.rsocket.kotlin.RSocketRequestHandler
import io.rsocket.kotlin.ktor.server.RSocketSupport
import io.rsocket.kotlin.ktor.server.rSocket
import io.rsocket.kotlin.payload.Payload
import io.rsocket.kotlin.payload.buildPayload
import io.rsocket.kotlin.payload.data
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.map
import kotlinx.html.*
import kotlinx.serialization.encodeToString
import space.kscience.magix.api.MagixEndpoint.Companion.magixJson
@ -26,45 +21,6 @@ import space.kscience.magix.api.filter
import java.util.*
internal fun CoroutineScope.magixAcceptor(
magixFlow: MutableSharedFlow<MagixMessage>,
): ConnectionAcceptor = ConnectionAcceptor {
RSocketRequestHandler(coroutineContext) {
//handler for request/stream
requestStream { request: Payload ->
val filter = magixJson.decodeFromString(MagixMessageFilter.serializer(), request.data.readText())
magixFlow.filter(filter).map { message ->
val string = magixJson.encodeToString(MagixMessage.serializer(), message)
buildPayload { data(string) }
}
}
//single send
fireAndForget { request: Payload ->
val message = magixJson.decodeFromString(MagixMessage.serializer(), request.data.readText())
magixFlow.emit(message)
}
// bi-directional connection
requestChannel { request: Payload, input: Flow<Payload> ->
input.onEach {
magixFlow.emit(magixJson.decodeFromString(MagixMessage.serializer(), it.data.readText()))
}.launchIn(this)
val filterText = request.data.readText()
val filter = if(filterText.isNotBlank()){
magixJson.decodeFromString(MagixMessageFilter.serializer(), filterText)
} else {
MagixMessageFilter()
}
magixFlow.filter(filter).map { message ->
val string = magixJson.encodeToString(message)
buildPayload { data(string) }
}
}
}
}
/**
* Create a message filter from call parameters
*/
@ -84,7 +40,7 @@ private fun ApplicationCall.buildFilter(): MagixMessageFilter {
}
/**
* Attache magix http/sse and websocket-based rsocket event loop + statistics page to existing [MutableSharedFlow]
* Attach magix http/sse and websocket-based rsocket event loop + statistics page to existing [MutableSharedFlow]
*/
public fun Application.magixModule(magixFlow: MutableSharedFlow<MagixMessage>, route: String = "/") {
if (pluginOrNull(WebSockets) == null) {
@ -149,7 +105,7 @@ public fun Application.magixModule(magixFlow: MutableSharedFlow<MagixMessage>, r
magixFlow.emit(message)
}
//rSocket server. Filter from Payload
rSocket("rsocket", acceptor = application.magixAcceptor(magixFlow))
rSocket("rsocket", acceptor = RSocketMagix.acceptor( application, magixFlow))
}
}
}

View File

@ -1,80 +1,36 @@
package space.kscience.magix.server
import io.ktor.server.application.Application
import io.ktor.server.cio.CIO
import io.ktor.server.engine.ApplicationEngine
import io.ktor.server.engine.embeddedServer
import io.rsocket.kotlin.core.RSocketServer
import io.rsocket.kotlin.transport.ktor.tcp.TcpServer
import io.rsocket.kotlin.transport.ktor.tcp.TcpServerTransport
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.MutableSharedFlow
import org.slf4j.LoggerFactory
import space.kscience.magix.api.MagixEndpoint
import space.kscience.magix.api.MagixEndpoint.Companion.DEFAULT_MAGIX_HTTP_PORT
import space.kscience.magix.api.MagixEndpoint.Companion.DEFAULT_MAGIX_RAW_PORT
import space.kscience.magix.api.MagixFlowPlugin
import space.kscience.magix.api.MagixMessage
/**
* Raw TCP magix server
*/
public fun CoroutineScope.launchMagixServerRawRSocket(
magixFlow: MutableSharedFlow<MagixMessage>,
rawSocketPort: Int = DEFAULT_MAGIX_RAW_PORT,
): TcpServer {
val tcpTransport = TcpServerTransport(port = rawSocketPort)
val rSocketJob: TcpServer = RSocketServer().bindIn(this, tcpTransport, magixAcceptor(magixFlow))
coroutineContext[Job]?.invokeOnCompletion {
rSocketJob.handlerJob.cancel()
}
return rSocketJob;
}
/**
* A combined RSocket/TCP/ZMQ server
* @param applicationConfiguration optional additional configuration for magix loop server
*/
public fun CoroutineScope.startMagixServer(
vararg plugins: MagixFlowPlugin,
port: Int = DEFAULT_MAGIX_HTTP_PORT,
buffer: Int = 1000,
enableRawRSocket: Boolean = true,
rawRSocketPort: Int = DEFAULT_MAGIX_RAW_PORT,
enableZmq: Boolean = true,
zmqPubSocketPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PUB_PORT,
zmqPullSocketPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PULL_PORT,
applicationConfiguration: Application.(MutableSharedFlow<MagixMessage>) -> Unit = {},
): ApplicationEngine {
val logger = LoggerFactory.getLogger("magix-server")
val magixFlow = MutableSharedFlow<MagixMessage>(
replay = buffer,
extraBufferCapacity = buffer,
onBufferOverflow = BufferOverflow.DROP_OLDEST
)
if (enableRawRSocket) {
//Start tcpRSocket server
logger.info("Starting magix raw rsocket server on port $rawRSocketPort")
launchMagixServerRawRSocket(magixFlow, rawRSocketPort)
}
if (enableZmq) {
//Start ZMQ server socket pair
logger.info("Starting magix zmq server on pub port $zmqPubSocketPort and pull port $zmqPullSocketPort")
launchMagixServerZmqSocket(
magixFlow,
zmqPubSocketPort = zmqPubSocketPort,
zmqPullSocketPort = zmqPullSocketPort
)
plugins.forEach {
it.start(this, magixFlow)
}
@Suppress("ExtractKtorModule")
return embeddedServer(CIO, host = "localhost", port = port) {
magixModule(magixFlow)
applicationConfiguration(magixFlow)
}.apply {
return embeddedServer(CIO, host = "localhost", port = port, module = { magixModule(magixFlow) }).apply {
start()
}
}

View File

@ -1,48 +0,0 @@
package space.kscience.magix.server
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.serialization.decodeFromString
import kotlinx.serialization.encodeToString
import org.slf4j.LoggerFactory
import org.zeromq.SocketType
import org.zeromq.ZContext
import space.kscience.magix.api.MagixEndpoint
import space.kscience.magix.api.MagixMessage
public fun CoroutineScope.launchMagixServerZmqSocket(
magixFlow: MutableSharedFlow<MagixMessage>,
localHost: String = "tcp://*",
zmqPubSocketPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PUB_PORT,
zmqPullSocketPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PULL_PORT,
): Job = launch(Dispatchers.IO) {
val logger = LoggerFactory.getLogger("magix-server-zmq")
ZContext().use { context ->
//launch publishing job
val pubSocket = context.createSocket(SocketType.PUB)
pubSocket.bind("$localHost:$zmqPubSocketPort")
magixFlow.onEach { message ->
val string = MagixEndpoint.magixJson.encodeToString(message)
pubSocket.send(string)
logger.debug("Published: $string")
}.launchIn(this)
//launch pulling job
val pullSocket = context.createSocket(SocketType.PULL)
pullSocket.bind("$localHost:$zmqPullSocketPort")
pullSocket.receiveTimeOut = 500
//suspending loop while pulling is active
while (isActive) {
val string: String? = pullSocket.recvStr()
if (string != null) {
logger.debug("Received: $string")
val message = MagixEndpoint.magixJson.decodeFromString<MagixMessage>(string)
magixFlow.emit(message)
}
}
}
}