From 3639783b4ed59a0becf32c8e2354ca04844ecaa6 Mon Sep 17 00:00:00 2001 From: Atos1337 Date: Fri, 26 Nov 2021 17:26:41 +0300 Subject: [PATCH] Add connection to xodus from magix server --- controls-xodus/build.gradle.kts | 1 + .../ru/mipt/npm/controls/xodus/connections.kt | 48 +++++++++++++++++++ .../ru/mipt/npm/controls/xodus/device.kt | 21 -------- .../controls/demo/car/VirtualCarController.kt | 23 +++++---- 4 files changed, 63 insertions(+), 30 deletions(-) create mode 100644 controls-xodus/src/main/kotlin/ru/mipt/npm/controls/xodus/connections.kt delete mode 100644 controls-xodus/src/main/kotlin/ru/mipt/npm/controls/xodus/device.kt diff --git a/controls-xodus/build.gradle.kts b/controls-xodus/build.gradle.kts index f6d7c73..cf06c4c 100644 --- a/controls-xodus/build.gradle.kts +++ b/controls-xodus/build.gradle.kts @@ -9,6 +9,7 @@ dependencies { implementation(projects.controlsCore) implementation(projects.magix.magixApi) implementation(projects.controlsMagixClient) + implementation(projects.magix.magixServer) implementation("org.jetbrains.xodus:xodus-entity-store:$xodusVersion") implementation("org.jetbrains.xodus:xodus-environment:$xodusVersion") implementation("org.jetbrains.xodus:xodus-vfs:$xodusVersion") diff --git a/controls-xodus/src/main/kotlin/ru/mipt/npm/controls/xodus/connections.kt b/controls-xodus/src/main/kotlin/ru/mipt/npm/controls/xodus/connections.kt new file mode 100644 index 0000000..4a35bd1 --- /dev/null +++ b/controls-xodus/src/main/kotlin/ru/mipt/npm/controls/xodus/connections.kt @@ -0,0 +1,48 @@ +package ru.mipt.npm.controls.xodus + +import io.ktor.application.* +import io.ktor.server.engine.* +import jetbrains.exodus.entitystore.PersistentEntityStore +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Job +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.filter +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach +import ru.mipt.npm.controls.api.DeviceMessage +import ru.mipt.npm.controls.api.PropertyChangedMessage +import ru.mipt.npm.controls.controllers.DeviceManager +import ru.mipt.npm.controls.controllers.hubMessageFlow +import ru.mipt.npm.magix.api.MagixEndpoint +import ru.mipt.npm.magix.server.GenericMagixMessage +import ru.mipt.npm.magix.server.startMagixServer + + +public fun DeviceManager.connectXodus( + entityStore: PersistentEntityStore, + filterCondition: suspend (DeviceMessage) -> Boolean = { it is PropertyChangedMessage } +): Job = hubMessageFlow(context).filter(filterCondition).onEach { message -> + entityStore.executeInTransaction { + (message as PropertyChangedMessage).toEntity(it) + } +}.launchIn(context) + +public fun CoroutineScope.startMagixServer( + entityStore: PersistentEntityStore, + flowFilter: suspend (GenericMagixMessage) -> Boolean = { true }, + port: Int = MagixEndpoint.DEFAULT_MAGIX_HTTP_PORT, + buffer: Int = 100, + enableRawRSocket: Boolean = true, + enableZmq: Boolean = true, + applicationConfiguration: Application.(MutableSharedFlow) -> Unit = {}, +): ApplicationEngine = startMagixServer( + port, buffer, enableRawRSocket, enableZmq +) { flow -> + applicationConfiguration(flow) + flow.filter(flowFilter).onEach { message -> + entityStore.executeInTransaction { txn -> + val entity = txn.newEntity("MagixMessage") + entity.setProperty("value", message.toString()) + } + } +} diff --git a/controls-xodus/src/main/kotlin/ru/mipt/npm/controls/xodus/device.kt b/controls-xodus/src/main/kotlin/ru/mipt/npm/controls/xodus/device.kt deleted file mode 100644 index ea85fed..0000000 --- a/controls-xodus/src/main/kotlin/ru/mipt/npm/controls/xodus/device.kt +++ /dev/null @@ -1,21 +0,0 @@ -package ru.mipt.npm.controls.xodus - -import jetbrains.exodus.entitystore.PersistentEntityStore -import kotlinx.coroutines.Job -import kotlinx.coroutines.flow.filter -import kotlinx.coroutines.flow.launchIn -import kotlinx.coroutines.flow.onEach -import ru.mipt.npm.controls.api.DeviceMessage -import ru.mipt.npm.controls.api.PropertyChangedMessage -import ru.mipt.npm.controls.controllers.DeviceManager -import ru.mipt.npm.controls.controllers.hubMessageFlow - - -public fun DeviceManager.connectXodus( - entityStore: PersistentEntityStore, - filterCondition: suspend (DeviceMessage) -> Boolean = { it is PropertyChangedMessage } -): Job = hubMessageFlow(context).filter(filterCondition).onEach { message -> - entityStore.executeInTransaction { - (message as PropertyChangedMessage).toEntity(it) - } -}.launchIn(context) \ No newline at end of file diff --git a/demo/car/src/main/kotlin/ru/mipt/npm/controls/demo/car/VirtualCarController.kt b/demo/car/src/main/kotlin/ru/mipt/npm/controls/demo/car/VirtualCarController.kt index 90489c3..ed701ab 100644 --- a/demo/car/src/main/kotlin/ru/mipt/npm/controls/demo/car/VirtualCarController.kt +++ b/demo/car/src/main/kotlin/ru/mipt/npm/controls/demo/car/VirtualCarController.kt @@ -18,7 +18,7 @@ import ru.mipt.npm.controls.demo.car.IVirtualCar.Companion.acceleration import ru.mipt.npm.controls.xodus.connectXodus import ru.mipt.npm.magix.api.MagixEndpoint import ru.mipt.npm.magix.rsocket.rSocketWithTcp -import ru.mipt.npm.magix.server.startMagixServer +import ru.mipt.npm.controls.xodus.startMagixServer import space.kscience.dataforge.context.* import tornadofx.* @@ -27,7 +27,8 @@ class VirtualCarController : Controller(), ContextAware { var virtualCar: VirtualCar? = null var magixVirtualCar: MagixVirtualCar? = null var magixServer: ApplicationEngine? = null - var entityStore: PersistentEntityStore? = null + var deviceEntityStore: PersistentEntityStore? = null + var magixEntityStore: PersistentEntityStore? = null var storageJob: Job? =null override val context = Context("demoDevice") { @@ -40,12 +41,14 @@ class VirtualCarController : Controller(), ContextAware { context.launch { virtualCar = deviceManager.install("virtual-car", VirtualCar) - //starting magix event loop - magixServer = startMagixServer(enableRawRSocket = true, enableZmq = true) + //starting magix event loop and connect it to entity store + magixEntityStore = PersistentEntityStores.newInstance("/home/marvel1337/2021/SCADA/.server_messages") + magixServer = startMagixServer( + entityStore = magixEntityStore as PersistentEntityStore, enableRawRSocket = true, enableZmq = true) magixVirtualCar = deviceManager.install("magix-virtual-car", MagixVirtualCar) - entityStore = PersistentEntityStores.newInstance("/home/marvel1337/2021/SCADA/.messages") - //connect to entity store - storageJob = deviceManager.connectXodus(entityStore as PersistentEntityStore) + deviceEntityStore = PersistentEntityStores.newInstance("/home/marvel1337/2021/SCADA/.messages") + //connect to device entity store + storageJob = deviceManager.connectXodus(deviceEntityStore as PersistentEntityStore) //Launch device client and connect it to the server val deviceEndpoint = MagixEndpoint.rSocketWithTcp("localhost", DeviceMessage.serializer()) deviceManager.connectToMagix(deviceEndpoint) @@ -60,8 +63,10 @@ class VirtualCarController : Controller(), ContextAware { logger.info { "Magix virtual car server stopped" } virtualCar?.close() logger.info { "Virtual car server stopped" } - entityStore?.close() - logger.info { "Entity store closed" } + deviceEntityStore?.close() + logger.info { "Device entity store closed" } + magixEntityStore?.close() + logger.info { "Magix entity store closed" } context.close() } }