diff --git a/controls-mongo/build.gradle.kts b/controls-mongo/build.gradle.kts index e6ce49d..624b71f 100644 --- a/controls-mongo/build.gradle.kts +++ b/controls-mongo/build.gradle.kts @@ -9,5 +9,6 @@ dependencies { implementation(projects.controlsCore) implementation(projects.magix.magixApi) implementation(projects.controlsMagixClient) + implementation(projects.magix.magixServer) implementation("org.litote.kmongo:kmongo-coroutine-serialization:$kmongoVersion") } diff --git a/controls-mongo/src/main/kotlin/ru/mipt/npm/controls/mongo/connections.kt b/controls-mongo/src/main/kotlin/ru/mipt/npm/controls/mongo/connections.kt index fded873..a8a7800 100644 --- a/controls-mongo/src/main/kotlin/ru/mipt/npm/controls/mongo/connections.kt +++ b/controls-mongo/src/main/kotlin/ru/mipt/npm/controls/mongo/connections.kt @@ -1,20 +1,22 @@ package ru.mipt.npm.controls.mongo +import io.ktor.application.* import kotlinx.coroutines.InternalCoroutinesApi import kotlinx.coroutines.Job -import kotlinx.coroutines.flow.filter -import kotlinx.coroutines.flow.launchIn -import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.flow.* +import kotlinx.coroutines.job import kotlinx.coroutines.launch import kotlinx.serialization.encodeToString import kotlinx.serialization.json.Json import org.litote.kmongo.coroutine.CoroutineClient +import org.litote.kmongo.coroutine.CoroutineCollection import org.litote.kmongo.coroutine.coroutine import org.litote.kmongo.coroutine.insertOne import org.litote.kmongo.reactivestreams.KMongo import ru.mipt.npm.controls.api.DeviceMessage import ru.mipt.npm.controls.controllers.DeviceManager import ru.mipt.npm.controls.controllers.hubMessageFlow +import ru.mipt.npm.magix.server.GenericMagixMessage import space.kscience.dataforge.context.Context import space.kscience.dataforge.context.Factory import space.kscience.dataforge.context.debug @@ -22,17 +24,19 @@ import space.kscience.dataforge.context.logger import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.get import space.kscience.dataforge.meta.string +import space.kscience.dataforge.names.Name -internal object DefaultMongoConfig { - const val databaseName = "deviceMessage" -} +private const val DEFAULT_MONGO_DATABASE_URL = "mongodb://mongoadmin:secret@localhost:27888" +private const val DEFAULT_DEVICE_MESSAGE_DATABASE_NAME = "deviceMessage" +private const val DEFAULT_MAGIX_MESSAGE_DATABASE_NAME = "magixMessage" +public val MONGO_DATABASE_URL_PROPERTY: Name = Name.of("mongo", "databaseUrl") +public val MONGO_DEVICE_MESSAGE_DATABASE_NAME_PROPERTY: Name = Name.of("mongo", "deviceMessageDatabaseName") +public val MONGO_MAGIX_MESSAGE_DATABASE_NAME_PROPERTY: Name = Name.of("mongo", "magixMessageDatabaseName") -public object MongoClientFactory : Factory { - private const val connectionString: String = "mongodb://mongoadmin:secret@localhost:27888" - - override fun invoke(meta: Meta, context: Context): CoroutineClient = meta["mongoConfig"]?.get("connectionString")?.string?.let { +public object DefaultMongoClientFactory : Factory { + override fun invoke(meta: Meta, context: Context): CoroutineClient = meta[MONGO_DATABASE_URL_PROPERTY]?.string?.let { KMongo.createClient(it).coroutine - } ?: KMongo.createClient(connectionString).coroutine + } ?: KMongo.createClient(DEFAULT_MONGO_DATABASE_URL).coroutine } @OptIn(InternalCoroutinesApi::class) @@ -43,7 +47,7 @@ public fun DeviceManager.connectMongo( val client = factory.invoke(meta, context) logger.debug { "Mongo client opened" } val collection = client - .getDatabase(meta["mongoConfig"]?.get("databaseName")?.string ?: DefaultMongoConfig.databaseName) + .getDatabase(meta[MONGO_DEVICE_MESSAGE_DATABASE_NAME_PROPERTY]?.string ?: DEFAULT_DEVICE_MESSAGE_DATABASE_NAME) .getCollection() return hubMessageFlow(context).filter(filterCondition).onEach { message -> context.launch { @@ -56,3 +60,30 @@ public fun DeviceManager.connectMongo( } } } + +internal fun Flow.storeInMongo( + collection: CoroutineCollection, + flowFilter: suspend (GenericMagixMessage) -> Boolean = { true }, +) { + filter(flowFilter).onEach { message -> + collection.insertOne(Json.encodeToString(message)) + } +} + +@OptIn(InternalCoroutinesApi::class) +public fun Application.storeInMongo( + flow: MutableSharedFlow, + meta: Meta = Meta.EMPTY, + factory: Factory = DefaultMongoClientFactory, + flowFilter: suspend (GenericMagixMessage) -> Boolean = { true }, +) { + val client = factory.invoke(meta) + val collection = client + .getDatabase(meta[MONGO_MAGIX_MESSAGE_DATABASE_NAME_PROPERTY]?.string ?: DEFAULT_MAGIX_MESSAGE_DATABASE_NAME) + .getCollection() + + flow.storeInMongo(collection, flowFilter) + coroutineContext.job.invokeOnCompletion(onCancelling = true) { + client.close() + } +} diff --git a/demo/car/src/main/kotlin/ru/mipt/npm/controls/demo/car/VirtualCarController.kt b/demo/car/src/main/kotlin/ru/mipt/npm/controls/demo/car/VirtualCarController.kt index 03b62f1..0dd6ffe 100644 --- a/demo/car/src/main/kotlin/ru/mipt/npm/controls/demo/car/VirtualCarController.kt +++ b/demo/car/src/main/kotlin/ru/mipt/npm/controls/demo/car/VirtualCarController.kt @@ -13,8 +13,9 @@ import ru.mipt.npm.controls.client.connectToMagix import ru.mipt.npm.controls.controllers.DeviceManager import ru.mipt.npm.controls.controllers.install import ru.mipt.npm.controls.demo.car.IVirtualCar.Companion.acceleration -import ru.mipt.npm.controls.mongo.MongoClientFactory +import ru.mipt.npm.controls.mongo.DefaultMongoClientFactory import ru.mipt.npm.controls.mongo.connectMongo +import ru.mipt.npm.controls.mongo.storeInMongo import ru.mipt.npm.controls.xodus.XODUS_STORE_PROPERTY import ru.mipt.npm.controls.xodus.storeInXodus import ru.mipt.npm.controls.xodus.storeMessagesInXodus @@ -58,12 +59,13 @@ class VirtualCarController : Controller(), ContextAware { storeInXodus( flow, Meta { XODUS_STORE_PROPERTY put VirtualCarControllerConfig.magixEntityStorePath.toString() }) + storeInMongo(flow) } magixVirtualCar = deviceManager.install("magix-virtual-car", MagixVirtualCar) //connect to device entity store xodusStorageJob = deviceManager.storeMessagesInXodus() //Create mongo client and connect to MongoDB - mongoStorageJob = deviceManager.connectMongo(MongoClientFactory) + mongoStorageJob = deviceManager.connectMongo(DefaultMongoClientFactory) //Launch device client and connect it to the server val deviceEndpoint = MagixEndpoint.rSocketWithTcp("localhost", DeviceMessage.serializer()) deviceManager.connectToMagix(deviceEndpoint)