diff --git a/controls-mongo/src/main/kotlin/ru/mipt/npm/controls/mongo/connections.kt b/controls-mongo/src/main/kotlin/ru/mipt/npm/controls/mongo/connections.kt index d5157c1..fded873 100644 --- a/controls-mongo/src/main/kotlin/ru/mipt/npm/controls/mongo/connections.kt +++ b/controls-mongo/src/main/kotlin/ru/mipt/npm/controls/mongo/connections.kt @@ -1,5 +1,6 @@ package ru.mipt.npm.controls.mongo +import kotlinx.coroutines.InternalCoroutinesApi import kotlinx.coroutines.Job import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.launchIn @@ -16,30 +17,42 @@ import ru.mipt.npm.controls.controllers.DeviceManager import ru.mipt.npm.controls.controllers.hubMessageFlow import space.kscience.dataforge.context.Context import space.kscience.dataforge.context.Factory +import space.kscience.dataforge.context.debug +import space.kscience.dataforge.context.logger import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.get import space.kscience.dataforge.meta.string -public object MongoClientFactory : Factory { - public const val connectionString: String = "mongodb://mongoadmin:secret@localhost:27888" +internal object DefaultMongoConfig { + const val databaseName = "deviceMessage" +} - override fun invoke(meta: Meta, context: Context): CoroutineClient = meta["connectionString"]?.string?.let { +public object MongoClientFactory : Factory { + private const val connectionString: String = "mongodb://mongoadmin:secret@localhost:27888" + + override fun invoke(meta: Meta, context: Context): CoroutineClient = meta["mongoConfig"]?.get("connectionString")?.string?.let { KMongo.createClient(it).coroutine } ?: KMongo.createClient(connectionString).coroutine } +@OptIn(InternalCoroutinesApi::class) public fun DeviceManager.connectMongo( - client: CoroutineClient, + factory: Factory, filterCondition: suspend (DeviceMessage) -> Boolean = { true } -): Job = hubMessageFlow(context).filter(filterCondition).onEach { message -> - context.launch { - client - .getDatabase("deviceServer") - .getCollection() - .insertOne(Json.encodeToString(message)) - } -}.launchIn(context).apply { - invokeOnCompletion { - client.close() +): Job { + val client = factory.invoke(meta, context) + logger.debug { "Mongo client opened" } + val collection = client + .getDatabase(meta["mongoConfig"]?.get("databaseName")?.string ?: DefaultMongoConfig.databaseName) + .getCollection() + return hubMessageFlow(context).filter(filterCondition).onEach { message -> + context.launch { + collection.insertOne(Json.encodeToString(message)) + } + }.launchIn(context).apply { + invokeOnCompletion(onCancelling = true) { + logger.debug { "Mongo client closed" } + client.close() + } } } diff --git a/controls-xodus/src/main/kotlin/ru/mipt/npm/controls/xodus/connections.kt b/controls-xodus/src/main/kotlin/ru/mipt/npm/controls/xodus/connections.kt index 4aa1be6..b1cbf09 100644 --- a/controls-xodus/src/main/kotlin/ru/mipt/npm/controls/xodus/connections.kt +++ b/controls-xodus/src/main/kotlin/ru/mipt/npm/controls/xodus/connections.kt @@ -1,26 +1,57 @@ package ru.mipt.npm.controls.xodus +import io.ktor.application.* import jetbrains.exodus.entitystore.PersistentEntityStore +import jetbrains.exodus.entitystore.PersistentEntityStores +import kotlinx.coroutines.InternalCoroutinesApi import kotlinx.coroutines.Job -import kotlinx.coroutines.flow.SharedFlow -import kotlinx.coroutines.flow.filter -import kotlinx.coroutines.flow.launchIn -import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.flow.* +import kotlinx.coroutines.job import ru.mipt.npm.controls.api.DeviceMessage import ru.mipt.npm.controls.api.PropertyChangedMessage import ru.mipt.npm.controls.controllers.DeviceManager import ru.mipt.npm.controls.controllers.hubMessageFlow import ru.mipt.npm.magix.server.GenericMagixMessage +import space.kscience.dataforge.context.Context +import space.kscience.dataforge.context.Factory +import space.kscience.dataforge.context.debug +import space.kscience.dataforge.context.logger +import space.kscience.dataforge.meta.Meta +import space.kscience.dataforge.meta.get +import space.kscience.dataforge.meta.string +import java.nio.file.Paths +internal object DefaultXodusConfig { + val entityStorePath = Paths.get(".messages") +} -public fun DeviceManager.connectXodus( - entityStore: PersistentEntityStore, - filterCondition: suspend (DeviceMessage) -> Boolean = { it is PropertyChangedMessage } -): Job = hubMessageFlow(context).filter(filterCondition).onEach { message -> - entityStore.executeInTransaction { - (message as PropertyChangedMessage).toEntity(it) +public object EntityStoreFactory : Factory { + override fun invoke(meta: Meta, context: Context): PersistentEntityStore { + return PersistentEntityStores.newInstance( + meta["xodusConfig"]?.get("entityStorePath")?.string ?: DefaultXodusConfig.entityStorePath.toString() + ) } -}.launchIn(context) +} + +@OptIn(InternalCoroutinesApi::class) +public fun DeviceManager.connectXodus( + factory: Factory, + filterCondition: suspend (DeviceMessage) -> Boolean = { it is PropertyChangedMessage } +): Job { + val entityStore = factory.invoke(meta, context) + logger.debug { "Device entity store opened" } + + return hubMessageFlow(context).filter(filterCondition).onEach { message -> + entityStore.executeInTransaction { + (message as PropertyChangedMessage).toEntity(it) + } + }.launchIn(context).apply { + invokeOnCompletion(onCancelling = true) { + entityStore.close() + logger.debug { "Device entity store closed" } + } + } +} //public fun CoroutineScope.startMagixServer( // entityStore: PersistentEntityStore, @@ -52,4 +83,18 @@ public fun SharedFlow.storeInXodus( entity.setProperty("value", message.toString()) } } -} \ No newline at end of file +} + +@OptIn(InternalCoroutinesApi::class) +public fun Application.storeInXodus( + factory: Factory, + flow: MutableSharedFlow, + meta: Meta = Meta.EMPTY +) { + val entityStore = factory.invoke(meta) + + flow.storeInXodus(entityStore) + coroutineContext.job.invokeOnCompletion(onCancelling = true) { + entityStore.close() + } +} diff --git a/demo/car/src/main/kotlin/ru/mipt/npm/controls/demo/car/VirtualCarController.kt b/demo/car/src/main/kotlin/ru/mipt/npm/controls/demo/car/VirtualCarController.kt index 31f315c..44aae10 100644 --- a/demo/car/src/main/kotlin/ru/mipt/npm/controls/demo/car/VirtualCarController.kt +++ b/demo/car/src/main/kotlin/ru/mipt/npm/controls/demo/car/VirtualCarController.kt @@ -6,11 +6,8 @@ import javafx.scene.Parent import javafx.scene.control.TextField import javafx.scene.layout.Priority import javafx.stage.Stage -import jetbrains.exodus.entitystore.PersistentEntityStore -import jetbrains.exodus.entitystore.PersistentEntityStores import kotlinx.coroutines.Job import kotlinx.coroutines.launch -import org.litote.kmongo.coroutine.CoroutineClient import ru.mipt.npm.controls.api.DeviceMessage import ru.mipt.npm.controls.client.connectToMagix import ru.mipt.npm.controls.controllers.DeviceManager @@ -18,6 +15,7 @@ import ru.mipt.npm.controls.controllers.install import ru.mipt.npm.controls.demo.car.IVirtualCar.Companion.acceleration import ru.mipt.npm.controls.mongo.MongoClientFactory import ru.mipt.npm.controls.mongo.connectMongo +import ru.mipt.npm.controls.xodus.EntityStoreFactory import ru.mipt.npm.controls.xodus.connectXodus import ru.mipt.npm.controls.xodus.storeInXodus import ru.mipt.npm.magix.api.MagixEndpoint @@ -28,14 +26,16 @@ import space.kscience.dataforge.meta.Meta import tornadofx.* import java.nio.file.Paths +internal object VirtualCarControllerConfig { + val deviceEntityStorePath = Paths.get(".messages") + val magixEntityStorePath = Paths.get(".server_messages") +} + class VirtualCarController : Controller(), ContextAware { var virtualCar: VirtualCar? = null var magixVirtualCar: MagixVirtualCar? = null var magixServer: ApplicationEngine? = null - var deviceEntityStore: PersistentEntityStore? = null - var magixEntityStore: PersistentEntityStore? = null - var mongoClient: CoroutineClient? = null var xodusStorageJob: Job? = null var mongoStorageJob: Job? = null @@ -43,25 +43,29 @@ class VirtualCarController : Controller(), ContextAware { plugin(DeviceManager) } - private val deviceManager = context.fetch(DeviceManager) + private val deviceManager = context.fetch(DeviceManager, Meta { + "xodusConfig" put { + "entityStorePath" put VirtualCarControllerConfig.deviceEntityStorePath.toString() + } + }) fun init() { context.launch { virtualCar = deviceManager.install("virtual-car", VirtualCar) //starting magix event loop and connect it to entity store - magixEntityStore = PersistentEntityStores.newInstance(Paths.get(".server_messages").toFile()) magixServer = startMagixServer(enableRawRSocket = true, enableZmq = true) { flow -> - flow.storeInXodus(magixEntityStore as PersistentEntityStore) + storeInXodus(EntityStoreFactory, flow, Meta { + "xodusConfig" put { + "entityStorePath" put VirtualCarControllerConfig.magixEntityStorePath.toString() + } + }) } magixVirtualCar = deviceManager.install("magix-virtual-car", MagixVirtualCar) - deviceEntityStore = PersistentEntityStores.newInstance(Paths.get(".messages").toFile()) //connect to device entity store - xodusStorageJob = deviceManager.connectXodus(deviceEntityStore as PersistentEntityStore) + xodusStorageJob = deviceManager.connectXodus(EntityStoreFactory) //Create mongo client and connect to MongoDB - val mongoClient = MongoClientFactory.invoke(meta = Meta.EMPTY, context) - mongoStorageJob = deviceManager.connectMongo(mongoClient) - this@VirtualCarController.mongoClient = mongoClient + mongoStorageJob = deviceManager.connectMongo(MongoClientFactory) //Launch device client and connect it to the server val deviceEndpoint = MagixEndpoint.rSocketWithTcp("localhost", DeviceMessage.serializer()) deviceManager.connectToMagix(deviceEndpoint) @@ -76,12 +80,6 @@ class VirtualCarController : Controller(), ContextAware { logger.info { "Magix virtual car server stopped" } virtualCar?.close() logger.info { "Virtual car server stopped" } - deviceEntityStore?.close() - logger.info { "Device entity store closed" } - magixEntityStore?.close() - logger.info { "Magix entity store closed" } - mongoClient?.close() - logger.info { "MongoClient closed" } context.close() } }