From 5da155988297e6fb4d114f376a9c5f628aa790f7 Mon Sep 17 00:00:00 2001 From: Alexander Nozik Date: Sat, 27 Nov 2021 13:37:39 +0300 Subject: [PATCH] Architecture rework --- .../ru/mipt/npm/controls/mongo/connections.kt | 14 +++--- .../ru/mipt/npm/controls/xodus/connections.kt | 43 +++++++++++-------- .../mipt/npm/controls/xodus/util/queries.kt | 19 ++++---- .../npm/controls/xodus/util/QueriesTest.kt | 2 +- .../controls/demo/car/VirtualCarController.kt | 13 +++--- motors/build.gradle.kts | 1 + .../pimotionmaster/PiMotionMasterApp.kt | 4 +- 7 files changed, 56 insertions(+), 40 deletions(-) diff --git a/controls-mongo/src/main/kotlin/ru/mipt/npm/controls/mongo/connections.kt b/controls-mongo/src/main/kotlin/ru/mipt/npm/controls/mongo/connections.kt index 950dd31..d5157c1 100644 --- a/controls-mongo/src/main/kotlin/ru/mipt/npm/controls/mongo/connections.kt +++ b/controls-mongo/src/main/kotlin/ru/mipt/npm/controls/mongo/connections.kt @@ -23,11 +23,9 @@ import space.kscience.dataforge.meta.string public object MongoClientFactory : Factory { public const val connectionString: String = "mongodb://mongoadmin:secret@localhost:27888" - override fun invoke(meta: Meta, context: Context): CoroutineClient { - return meta["connectionString"]?.string?.let { - KMongo.createClient(it).coroutine - } ?: KMongo.createClient(connectionString).coroutine - } + override fun invoke(meta: Meta, context: Context): CoroutineClient = meta["connectionString"]?.string?.let { + KMongo.createClient(it).coroutine + } ?: KMongo.createClient(connectionString).coroutine } public fun DeviceManager.connectMongo( @@ -40,4 +38,8 @@ public fun DeviceManager.connectMongo( .getCollection() .insertOne(Json.encodeToString(message)) } -}.launchIn(context) +}.launchIn(context).apply { + invokeOnCompletion { + client.close() + } +} diff --git a/controls-xodus/src/main/kotlin/ru/mipt/npm/controls/xodus/connections.kt b/controls-xodus/src/main/kotlin/ru/mipt/npm/controls/xodus/connections.kt index 4a35bd1..4aa1be6 100644 --- a/controls-xodus/src/main/kotlin/ru/mipt/npm/controls/xodus/connections.kt +++ b/controls-xodus/src/main/kotlin/ru/mipt/npm/controls/xodus/connections.kt @@ -1,11 +1,8 @@ package ru.mipt.npm.controls.xodus -import io.ktor.application.* -import io.ktor.server.engine.* import jetbrains.exodus.entitystore.PersistentEntityStore -import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Job -import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.SharedFlow import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach @@ -13,9 +10,7 @@ import ru.mipt.npm.controls.api.DeviceMessage import ru.mipt.npm.controls.api.PropertyChangedMessage import ru.mipt.npm.controls.controllers.DeviceManager import ru.mipt.npm.controls.controllers.hubMessageFlow -import ru.mipt.npm.magix.api.MagixEndpoint import ru.mipt.npm.magix.server.GenericMagixMessage -import ru.mipt.npm.magix.server.startMagixServer public fun DeviceManager.connectXodus( @@ -27,22 +22,34 @@ public fun DeviceManager.connectXodus( } }.launchIn(context) -public fun CoroutineScope.startMagixServer( +//public fun CoroutineScope.startMagixServer( +// entityStore: PersistentEntityStore, +// flowFilter: suspend (GenericMagixMessage) -> Boolean = { true }, +// port: Int = MagixEndpoint.DEFAULT_MAGIX_HTTP_PORT, +// buffer: Int = 100, +// enableRawRSocket: Boolean = true, +// enableZmq: Boolean = true, +// applicationConfiguration: Application.(MutableSharedFlow) -> Unit = {}, +//): ApplicationEngine = startMagixServer( +// port, buffer, enableRawRSocket, enableZmq +//) { flow -> +// applicationConfiguration(flow) +// flow.filter(flowFilter).onEach { message -> +// entityStore.executeInTransaction { txn -> +// val entity = txn.newEntity("MagixMessage") +// entity.setProperty("value", message.toString()) +// } +// } +//} + +public fun SharedFlow.storeInXodus( entityStore: PersistentEntityStore, flowFilter: suspend (GenericMagixMessage) -> Boolean = { true }, - port: Int = MagixEndpoint.DEFAULT_MAGIX_HTTP_PORT, - buffer: Int = 100, - enableRawRSocket: Boolean = true, - enableZmq: Boolean = true, - applicationConfiguration: Application.(MutableSharedFlow) -> Unit = {}, -): ApplicationEngine = startMagixServer( - port, buffer, enableRawRSocket, enableZmq -) { flow -> - applicationConfiguration(flow) - flow.filter(flowFilter).onEach { message -> +){ + filter(flowFilter).onEach { message -> entityStore.executeInTransaction { txn -> val entity = txn.newEntity("MagixMessage") entity.setProperty("value", message.toString()) } } -} +} \ No newline at end of file diff --git a/controls-xodus/src/main/kotlin/ru/mipt/npm/controls/xodus/util/queries.kt b/controls-xodus/src/main/kotlin/ru/mipt/npm/controls/xodus/util/queries.kt index 7771f35..f547e05 100644 --- a/controls-xodus/src/main/kotlin/ru/mipt/npm/controls/xodus/util/queries.kt +++ b/controls-xodus/src/main/kotlin/ru/mipt/npm/controls/xodus/util/queries.kt @@ -5,11 +5,14 @@ import kotlinx.datetime.Instant import ru.mipt.npm.controls.api.PropertyChangedMessage import ru.mipt.npm.controls.xodus.toPropertyChangedMessage -public fun StoreTransaction.fromTo(from: Instant, to: Instant): List { - return find( - "PropertyChangedMessage", - "time", - from.toEpochMilliseconds(), - to.toEpochMilliseconds() - ).map { it -> it.toPropertyChangedMessage() }.filterNotNull() -} +//selectDeviceMessagesInRange +public fun StoreTransaction.fromTo( + range: ClosedRange, +// from: Instant, +// to: Instant, +): List = find( + "PropertyChangedMessage", + "time", + range.start.toEpochMilliseconds(), + range.endInclusive.toEpochMilliseconds() +).mapNotNull { it.toPropertyChangedMessage() } diff --git a/controls-xodus/src/test/kotlin/ru/mipt/npm/controls/xodus/util/QueriesTest.kt b/controls-xodus/src/test/kotlin/ru/mipt/npm/controls/xodus/util/QueriesTest.kt index 7da8bba..73d915b 100644 --- a/controls-xodus/src/test/kotlin/ru/mipt/npm/controls/xodus/util/QueriesTest.kt +++ b/controls-xodus/src/test/kotlin/ru/mipt/npm/controls/xodus/util/QueriesTest.kt @@ -55,7 +55,7 @@ internal class QueriesTest { @Test fun testFromTo() { assertEquals(propertyChangedMessages.subList(0, 2).toSet(), entityStore.computeInReadonlyTransaction { - it.fromTo(Instant.fromEpochMilliseconds(1000), Instant.fromEpochMilliseconds(1500)) + it.fromTo( Instant.fromEpochMilliseconds(1000)..Instant.fromEpochMilliseconds(1500)) }.toSet()) } diff --git a/demo/car/src/main/kotlin/ru/mipt/npm/controls/demo/car/VirtualCarController.kt b/demo/car/src/main/kotlin/ru/mipt/npm/controls/demo/car/VirtualCarController.kt index f9950fb..8a652bb 100644 --- a/demo/car/src/main/kotlin/ru/mipt/npm/controls/demo/car/VirtualCarController.kt +++ b/demo/car/src/main/kotlin/ru/mipt/npm/controls/demo/car/VirtualCarController.kt @@ -19,9 +19,10 @@ import ru.mipt.npm.controls.demo.car.IVirtualCar.Companion.acceleration import ru.mipt.npm.controls.mongo.MongoClientFactory import ru.mipt.npm.controls.mongo.connectMongo import ru.mipt.npm.controls.xodus.connectXodus +import ru.mipt.npm.controls.xodus.storeInXodus import ru.mipt.npm.magix.api.MagixEndpoint import ru.mipt.npm.magix.rsocket.rSocketWithTcp -import ru.mipt.npm.controls.xodus.startMagixServer +import ru.mipt.npm.magix.server.startMagixServer import space.kscience.dataforge.context.* import space.kscience.dataforge.meta.Meta import tornadofx.* @@ -49,15 +50,17 @@ class VirtualCarController : Controller(), ContextAware { //starting magix event loop and connect it to entity store magixEntityStore = PersistentEntityStores.newInstance("/home/marvel1337/2021/SCADA/.server_messages") - magixServer = startMagixServer( - entityStore = magixEntityStore as PersistentEntityStore, enableRawRSocket = true, enableZmq = true) + magixServer = startMagixServer(enableRawRSocket = true, enableZmq = true) { flow -> + flow.storeInXodus(magixEntityStore as PersistentEntityStore) + } magixVirtualCar = deviceManager.install("magix-virtual-car", MagixVirtualCar) deviceEntityStore = PersistentEntityStores.newInstance("/home/marvel1337/2021/SCADA/.messages") //connect to device entity store xodusStorageJob = deviceManager.connectXodus(deviceEntityStore as PersistentEntityStore) //Create mongo client and connect to MongoDB - mongoClient = MongoClientFactory.invoke(meta = Meta.EMPTY, context) - mongoStorageJob = deviceManager.connectMongo(mongoClient as CoroutineClient) + val mongoClient = MongoClientFactory.invoke(meta = Meta.EMPTY, context) + mongoStorageJob = deviceManager.connectMongo(mongoClient) + this@VirtualCarController.mongoClient = mongoClient //Launch device client and connect it to the server val deviceEndpoint = MagixEndpoint.rSocketWithTcp("localhost", DeviceMessage.serializer()) deviceManager.connectToMagix(deviceEndpoint) diff --git a/motors/build.gradle.kts b/motors/build.gradle.kts index a2bd21d..503ab5d 100644 --- a/motors/build.gradle.kts +++ b/motors/build.gradle.kts @@ -19,6 +19,7 @@ kscience{ } val ktorVersion: String by rootProject.extra +val dataforgeVersion: String by extra dependencies { implementation(project(":controls-tcp")) diff --git a/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/PiMotionMasterApp.kt b/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/PiMotionMasterApp.kt index cf66c82..76fc7d2 100644 --- a/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/PiMotionMasterApp.kt +++ b/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/PiMotionMasterApp.kt @@ -16,7 +16,7 @@ import ru.mipt.npm.controls.controllers.installing import ru.mipt.npm.devices.pimotionmaster.PiMotionMasterDevice.Axis.Companion.maxPosition import ru.mipt.npm.devices.pimotionmaster.PiMotionMasterDevice.Axis.Companion.minPosition import ru.mipt.npm.devices.pimotionmaster.PiMotionMasterDevice.Axis.Companion.position -import space.kscience.dataforge.context.Global +import space.kscience.dataforge.context.Context import space.kscience.dataforge.context.fetch import tornadofx.* @@ -24,7 +24,7 @@ class PiMotionMasterApp : App(PiMotionMasterView::class) class PiMotionMasterController : Controller() { //initialize context - val context = Global.buildContext("piMotionMaster"){ + val context = Context("piMotionMaster"){ plugin(DeviceManager) }