Add storeInMongo for magix server

This commit is contained in:
Atos1337 2021-12-16 17:25:35 +03:00
parent dec45b6050
commit 0c4d2fc9e1
3 changed files with 48 additions and 14 deletions

View File

@ -9,5 +9,6 @@ dependencies {
implementation(projects.controlsCore)
implementation(projects.magix.magixApi)
implementation(projects.controlsMagixClient)
implementation(projects.magix.magixServer)
implementation("org.litote.kmongo:kmongo-coroutine-serialization:$kmongoVersion")
}

View File

@ -1,20 +1,22 @@
package ru.mipt.npm.controls.mongo
import io.ktor.application.*
import kotlinx.coroutines.InternalCoroutinesApi
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.job
import kotlinx.coroutines.launch
import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.Json
import org.litote.kmongo.coroutine.CoroutineClient
import org.litote.kmongo.coroutine.CoroutineCollection
import org.litote.kmongo.coroutine.coroutine
import org.litote.kmongo.coroutine.insertOne
import org.litote.kmongo.reactivestreams.KMongo
import ru.mipt.npm.controls.api.DeviceMessage
import ru.mipt.npm.controls.controllers.DeviceManager
import ru.mipt.npm.controls.controllers.hubMessageFlow
import ru.mipt.npm.magix.server.GenericMagixMessage
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.Factory
import space.kscience.dataforge.context.debug
@ -22,17 +24,19 @@ import space.kscience.dataforge.context.logger
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.get
import space.kscience.dataforge.meta.string
import space.kscience.dataforge.names.Name
internal object DefaultMongoConfig {
const val databaseName = "deviceMessage"
}
private const val DEFAULT_MONGO_DATABASE_URL = "mongodb://mongoadmin:secret@localhost:27888"
private const val DEFAULT_DEVICE_MESSAGE_DATABASE_NAME = "deviceMessage"
private const val DEFAULT_MAGIX_MESSAGE_DATABASE_NAME = "magixMessage"
public val MONGO_DATABASE_URL_PROPERTY: Name = Name.of("mongo", "databaseUrl")
public val MONGO_DEVICE_MESSAGE_DATABASE_NAME_PROPERTY: Name = Name.of("mongo", "deviceMessageDatabaseName")
public val MONGO_MAGIX_MESSAGE_DATABASE_NAME_PROPERTY: Name = Name.of("mongo", "magixMessageDatabaseName")
public object MongoClientFactory : Factory<CoroutineClient> {
private const val connectionString: String = "mongodb://mongoadmin:secret@localhost:27888"
override fun invoke(meta: Meta, context: Context): CoroutineClient = meta["mongoConfig"]?.get("connectionString")?.string?.let {
public object DefaultMongoClientFactory : Factory<CoroutineClient> {
override fun invoke(meta: Meta, context: Context): CoroutineClient = meta[MONGO_DATABASE_URL_PROPERTY]?.string?.let {
KMongo.createClient(it).coroutine
} ?: KMongo.createClient(connectionString).coroutine
} ?: KMongo.createClient(DEFAULT_MONGO_DATABASE_URL).coroutine
}
@OptIn(InternalCoroutinesApi::class)
@ -43,7 +47,7 @@ public fun DeviceManager.connectMongo(
val client = factory.invoke(meta, context)
logger.debug { "Mongo client opened" }
val collection = client
.getDatabase(meta["mongoConfig"]?.get("databaseName")?.string ?: DefaultMongoConfig.databaseName)
.getDatabase(meta[MONGO_DEVICE_MESSAGE_DATABASE_NAME_PROPERTY]?.string ?: DEFAULT_DEVICE_MESSAGE_DATABASE_NAME)
.getCollection<DeviceMessage>()
return hubMessageFlow(context).filter(filterCondition).onEach { message ->
context.launch {
@ -56,3 +60,30 @@ public fun DeviceManager.connectMongo(
}
}
}
internal fun Flow<GenericMagixMessage>.storeInMongo(
collection: CoroutineCollection<GenericMagixMessage>,
flowFilter: suspend (GenericMagixMessage) -> Boolean = { true },
) {
filter(flowFilter).onEach { message ->
collection.insertOne(Json.encodeToString(message))
}
}
@OptIn(InternalCoroutinesApi::class)
public fun Application.storeInMongo(
flow: MutableSharedFlow<GenericMagixMessage>,
meta: Meta = Meta.EMPTY,
factory: Factory<CoroutineClient> = DefaultMongoClientFactory,
flowFilter: suspend (GenericMagixMessage) -> Boolean = { true },
) {
val client = factory.invoke(meta)
val collection = client
.getDatabase(meta[MONGO_MAGIX_MESSAGE_DATABASE_NAME_PROPERTY]?.string ?: DEFAULT_MAGIX_MESSAGE_DATABASE_NAME)
.getCollection<GenericMagixMessage>()
flow.storeInMongo(collection, flowFilter)
coroutineContext.job.invokeOnCompletion(onCancelling = true) {
client.close()
}
}

View File

@ -13,8 +13,9 @@ import ru.mipt.npm.controls.client.connectToMagix
import ru.mipt.npm.controls.controllers.DeviceManager
import ru.mipt.npm.controls.controllers.install
import ru.mipt.npm.controls.demo.car.IVirtualCar.Companion.acceleration
import ru.mipt.npm.controls.mongo.MongoClientFactory
import ru.mipt.npm.controls.mongo.DefaultMongoClientFactory
import ru.mipt.npm.controls.mongo.connectMongo
import ru.mipt.npm.controls.mongo.storeInMongo
import ru.mipt.npm.controls.xodus.XODUS_STORE_PROPERTY
import ru.mipt.npm.controls.xodus.storeInXodus
import ru.mipt.npm.controls.xodus.storeMessagesInXodus
@ -58,12 +59,13 @@ class VirtualCarController : Controller(), ContextAware {
storeInXodus( flow, Meta {
XODUS_STORE_PROPERTY put VirtualCarControllerConfig.magixEntityStorePath.toString()
})
storeInMongo(flow)
}
magixVirtualCar = deviceManager.install("magix-virtual-car", MagixVirtualCar)
//connect to device entity store
xodusStorageJob = deviceManager.storeMessagesInXodus()
//Create mongo client and connect to MongoDB
mongoStorageJob = deviceManager.connectMongo(MongoClientFactory)
mongoStorageJob = deviceManager.connectMongo(DefaultMongoClientFactory)
//Launch device client and connect it to the server
val deviceEndpoint = MagixEndpoint.rSocketWithTcp("localhost", DeviceMessage.serializer())
deviceManager.connectToMagix(deviceEndpoint)