Move ZMQ to a separate module

This commit is contained in:
Alexander Nozik 2023-02-25 18:15:19 +03:00
parent 2a386568f9
commit fca718cfc4
10 changed files with 28 additions and 24 deletions
controls-server/src/main/kotlin/space/kscience/controls/server
demo
all-things/src/main/kotlin/space/kscience/controls/demo
car
build.gradle.kts
src/main/kotlin/space/kscience/controls/demo/car
magix-demo/src/main/kotlin
magix
magix-server
build.gradle.kts
src/main/kotlin/space/kscience/magix/server
magix-zmq
build.gradle.kts
src/main/kotlin/space/kscince/magix/zmq

@ -40,14 +40,9 @@ import space.kscience.magix.api.MagixFlowPlugin
import space.kscience.magix.api.MagixMessage import space.kscience.magix.api.MagixMessage
import space.kscience.magix.server.magixModule import space.kscience.magix.server.magixModule
/**
* Create and start a web server for several devices
*/ private fun Application.deviceServerModule(manager: DeviceManager) {
public fun CoroutineScope.startDeviceServer(
manager: DeviceManager,
port: Int = MagixEndpoint.DEFAULT_MAGIX_HTTP_PORT,
host: String = "localhost",
): ApplicationEngine = embeddedServer(CIO, port, host) {
install(WebSockets) install(WebSockets)
// install(CORS) { // install(CORS) {
// anyHost() // anyHost()
@ -63,7 +58,16 @@ public fun CoroutineScope.startDeviceServer(
call.respondRedirect("/dashboard") call.respondRedirect("/dashboard")
} }
} }
}.start() }
/**
* Create and start a web server for several devices
*/
public fun CoroutineScope.startDeviceServer(
manager: DeviceManager,
port: Int = MagixEndpoint.DEFAULT_MAGIX_HTTP_PORT,
host: String = "localhost",
): ApplicationEngine = embeddedServer(CIO, port, host, module = { deviceServerModule(manager) }).start()
public fun ApplicationEngine.whenStarted(callback: Application.() -> Unit) { public fun ApplicationEngine.whenStarted(callback: Application.() -> Unit) {
environment.monitor.subscribe(ApplicationStarted, callback) environment.monitor.subscribe(ApplicationStarted, callback)

@ -21,9 +21,9 @@ import space.kscience.dataforge.context.*
import space.kscience.magix.api.MagixEndpoint import space.kscience.magix.api.MagixEndpoint
import space.kscience.magix.rsocket.rSocketWithTcp import space.kscience.magix.rsocket.rSocketWithTcp
import space.kscience.magix.rsocket.rSocketWithWebSockets import space.kscience.magix.rsocket.rSocketWithWebSockets
import space.kscience.magix.server.RSocketMagix import space.kscience.magix.server.RSocketMagixFlowPlugin
import space.kscience.magix.server.ZmqMagix
import space.kscience.magix.server.startMagixServer import space.kscience.magix.server.startMagixServer
import space.kscince.magix.zmq.ZmqMagixFlowPlugin
import tornadofx.* import tornadofx.*
import java.awt.Desktop import java.awt.Desktop
import java.net.URI import java.net.URI
@ -51,7 +51,7 @@ class DemoController : Controller(), ContextAware {
context.launch { context.launch {
device = deviceManager.install("demo", DemoDevice) device = deviceManager.install("demo", DemoDevice)
//starting magix event loop //starting magix event loop
magixServer = startMagixServer(RSocketMagix(), ZmqMagix()) magixServer = startMagixServer(RSocketMagixFlowPlugin(), ZmqMagixFlowPlugin())
//Launch device client and connect it to the server //Launch device client and connect it to the server
val deviceEndpoint = MagixEndpoint.rSocketWithTcp("localhost") val deviceEndpoint = MagixEndpoint.rSocketWithTcp("localhost")
deviceManager.connectToMagix(deviceEndpoint) deviceManager.connectToMagix(deviceEndpoint)

@ -18,6 +18,7 @@ dependencies {
implementation(projects.magix.magixApi) implementation(projects.magix.magixApi)
implementation(projects.magix.magixServer) implementation(projects.magix.magixServer)
implementation(projects.magix.magixRsocket) implementation(projects.magix.magixRsocket)
implementation(projects.magix.magixZmq)
implementation(projects.controlsMagixClient) implementation(projects.controlsMagixClient)
implementation(projects.controlsStorage.controlsXodus) implementation(projects.controlsStorage.controlsXodus)
implementation(projects.magix.magixStorage.magixStorageXodus) implementation(projects.magix.magixStorage.magixStorageXodus)

@ -18,10 +18,10 @@ import space.kscience.dataforge.context.*
import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.Meta
import space.kscience.magix.api.MagixEndpoint import space.kscience.magix.api.MagixEndpoint
import space.kscience.magix.rsocket.rSocketWithTcp import space.kscience.magix.rsocket.rSocketWithTcp
import space.kscience.magix.server.RSocketMagix import space.kscience.magix.server.RSocketMagixFlowPlugin
import space.kscience.magix.server.ZmqMagix
import space.kscience.magix.server.startMagixServer import space.kscience.magix.server.startMagixServer
import space.kscience.magix.storage.xodus.storeInXodus import space.kscience.magix.storage.xodus.storeInXodus
import space.kscince.magix.zmq.ZmqMagixFlowPlugin
import tornadofx.* import tornadofx.*
import java.nio.file.Paths import java.nio.file.Paths
@ -49,7 +49,7 @@ class VirtualCarController : Controller(), ContextAware {
virtualCar = deviceManager.install("virtual-car", VirtualCar) virtualCar = deviceManager.install("virtual-car", VirtualCar)
//starting magix event loop and connect it to entity store //starting magix event loop and connect it to entity store
magixServer = startMagixServer(RSocketMagix(), ZmqMagix()) magixServer = startMagixServer(RSocketMagixFlowPlugin(), ZmqMagixFlowPlugin())
storageEndpoint = MagixEndpoint.rSocketWithTcp("localhost").apply { storageEndpoint = MagixEndpoint.rSocketWithTcp("localhost").apply {
storeInXodus(this@launch, magixEntityStorePath) storeInXodus(this@launch, magixEntityStorePath)

@ -9,10 +9,10 @@ import kotlinx.serialization.json.*
import org.slf4j.LoggerFactory import org.slf4j.LoggerFactory
import space.kscience.magix.api.MagixEndpoint import space.kscience.magix.api.MagixEndpoint
import space.kscience.magix.api.MagixMessage import space.kscience.magix.api.MagixMessage
import space.kscience.magix.server.RSocketMagix import space.kscience.magix.server.RSocketMagixFlowPlugin
import space.kscience.magix.server.ZmqMagix
import space.kscience.magix.server.startMagixServer import space.kscience.magix.server.startMagixServer
import space.kscince.magix.zmq.ZmqMagixEndpoint import space.kscince.magix.zmq.ZmqMagixEndpoint
import space.kscince.magix.zmq.ZmqMagixFlowPlugin
import java.awt.Desktop import java.awt.Desktop
import java.net.URI import java.net.URI
@ -32,7 +32,7 @@ internal const val numberOfMessages = 100
suspend fun main(): Unit = coroutineScope { suspend fun main(): Unit = coroutineScope {
val logger = LoggerFactory.getLogger("magix-demo") val logger = LoggerFactory.getLogger("magix-demo")
logger.info("Starting magix server") logger.info("Starting magix server")
val server = startMagixServer(RSocketMagix(), ZmqMagix(), buffer = 10) val server = startMagixServer(RSocketMagixFlowPlugin(), ZmqMagixFlowPlugin(), buffer = 10)
server.apply { server.apply {
val host = "localhost"//environment.connectors.first().host val host = "localhost"//environment.connectors.first().host

@ -28,6 +28,4 @@ dependencies{
api("io.rsocket.kotlin:rsocket-ktor-server:$rsocketVersion") api("io.rsocket.kotlin:rsocket-ktor-server:$rsocketVersion")
api("io.rsocket.kotlin:rsocket-transport-ktor-tcp:$rsocketVersion") api("io.rsocket.kotlin:rsocket-transport-ktor-tcp:$rsocketVersion")
api("org.zeromq:jeromq:0.5.2")
} }

@ -18,7 +18,7 @@ import space.kscience.magix.api.MagixEndpoint.Companion.DEFAULT_MAGIX_RAW_PORT
/** /**
* Raw TCP magix server * Raw TCP magix server
*/ */
public class RSocketMagix(public val port: Int = DEFAULT_MAGIX_RAW_PORT): MagixFlowPlugin { public class RSocketMagixFlowPlugin(public val port: Int = DEFAULT_MAGIX_RAW_PORT): MagixFlowPlugin {
override fun start(scope: CoroutineScope, magixFlow: MutableSharedFlow<MagixMessage>): Job { override fun start(scope: CoroutineScope, magixFlow: MutableSharedFlow<MagixMessage>): Job {
val tcpTransport = TcpServerTransport(port = port) val tcpTransport = TcpServerTransport(port = port)
val rSocketJob: TcpServer = RSocketServer().bindIn(scope, tcpTransport, acceptor(scope, magixFlow)) val rSocketJob: TcpServer = RSocketServer().bindIn(scope, tcpTransport, acceptor(scope, magixFlow))

@ -105,7 +105,7 @@ public fun Application.magixModule(magixFlow: MutableSharedFlow<MagixMessage>, r
magixFlow.emit(message) magixFlow.emit(message)
} }
//rSocket server. Filter from Payload //rSocket server. Filter from Payload
rSocket("rsocket", acceptor = RSocketMagix.acceptor( application, magixFlow)) rSocket("rsocket", acceptor = RSocketMagixFlowPlugin.acceptor( application, magixFlow))
} }
} }
} }

@ -9,5 +9,6 @@ description = """
dependencies { dependencies {
api(projects.magix.magixApi) api(projects.magix.magixApi)
api("org.slf4j:slf4j-api:2.0.6")
implementation("org.zeromq:jeromq:0.5.2") implementation("org.zeromq:jeromq:0.5.2")
} }

@ -1,4 +1,4 @@
package space.kscience.magix.server package space.kscince.magix.zmq
import kotlinx.coroutines.* import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.MutableSharedFlow
@ -14,7 +14,7 @@ import space.kscience.magix.api.MagixFlowPlugin
import space.kscience.magix.api.MagixMessage import space.kscience.magix.api.MagixMessage
public class ZmqMagix( public class ZmqMagixFlowPlugin(
public val localHost: String = "tcp://*", public val localHost: String = "tcp://*",
public val zmqPubSocketPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PUB_PORT, public val zmqPubSocketPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PUB_PORT,
public val zmqPullSocketPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PULL_PORT, public val zmqPullSocketPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PULL_PORT,