Add mongo module

This commit is contained in:
Atos1337 2021-11-27 00:52:14 +03:00
parent 3639783b4e
commit 312cd06706
5 changed files with 74 additions and 4 deletions

View File

@ -0,0 +1,13 @@
plugins {
id("ru.mipt.npm.gradle.jvm")
`maven-publish`
}
val kmongoVersion = "4.4.0"
dependencies {
implementation(projects.controlsCore)
implementation(projects.magix.magixApi)
implementation(projects.controlsMagixClient)
implementation("org.litote.kmongo:kmongo-coroutine-serialization:$kmongoVersion")
}

View File

@ -0,0 +1,43 @@
package ru.mipt.npm.controls.mongo
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.Json
import org.litote.kmongo.coroutine.CoroutineClient
import org.litote.kmongo.coroutine.coroutine
import org.litote.kmongo.coroutine.insertOne
import org.litote.kmongo.reactivestreams.KMongo
import ru.mipt.npm.controls.api.DeviceMessage
import ru.mipt.npm.controls.controllers.DeviceManager
import ru.mipt.npm.controls.controllers.hubMessageFlow
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.Factory
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.get
import space.kscience.dataforge.meta.string
public object MongoClientFactory : Factory<CoroutineClient> {
public const val connectionString: String = "mongodb://mongoadmin:secret@localhost:27888"
override fun invoke(meta: Meta, context: Context): CoroutineClient {
return meta["connectionString"]?.string?.let {
KMongo.createClient(it).coroutine
} ?: KMongo.createClient(connectionString).coroutine
}
}
public fun DeviceManager.connectMongo(
client: CoroutineClient,
filterCondition: suspend (DeviceMessage) -> Boolean = { true }
): Job = hubMessageFlow(context).filter(filterCondition).onEach { message ->
context.launch {
client
.getDatabase("deviceServer")
.getCollection<DeviceMessage>()
.insertOne(Json.encodeToString(message))
}
}.launchIn(context)

View File

@ -20,6 +20,7 @@ dependencies {
implementation(projects.magix.magixRsocket) implementation(projects.magix.magixRsocket)
implementation(projects.controlsMagixClient) implementation(projects.controlsMagixClient)
implementation(projects.controlsXodus) implementation(projects.controlsXodus)
implementation(projects.controlsMongo)
implementation("io.ktor:ktor-client-cio:$ktorVersion") implementation("io.ktor:ktor-client-cio:$ktorVersion")
implementation("org.jetbrains.kotlinx:kotlinx-datetime:0.3.1") implementation("org.jetbrains.kotlinx:kotlinx-datetime:0.3.1")
@ -29,6 +30,7 @@ dependencies {
implementation("org.jetbrains.xodus:xodus-entity-store:1.3.232") implementation("org.jetbrains.xodus:xodus-entity-store:1.3.232")
implementation("org.jetbrains.xodus:xodus-environment:1.3.232") implementation("org.jetbrains.xodus:xodus-environment:1.3.232")
implementation("org.jetbrains.xodus:xodus-vfs:1.3.232") implementation("org.jetbrains.xodus:xodus-vfs:1.3.232")
implementation("org.litote.kmongo:kmongo-coroutine-serialization:4.4.0")
} }
tasks.withType<org.jetbrains.kotlin.gradle.tasks.KotlinCompile>().configureEach { tasks.withType<org.jetbrains.kotlin.gradle.tasks.KotlinCompile>().configureEach {

View File

@ -10,16 +10,20 @@ import jetbrains.exodus.entitystore.PersistentEntityStore
import jetbrains.exodus.entitystore.PersistentEntityStores import jetbrains.exodus.entitystore.PersistentEntityStores
import kotlinx.coroutines.Job import kotlinx.coroutines.Job
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import org.litote.kmongo.coroutine.CoroutineClient
import ru.mipt.npm.controls.api.DeviceMessage import ru.mipt.npm.controls.api.DeviceMessage
import ru.mipt.npm.controls.client.connectToMagix import ru.mipt.npm.controls.client.connectToMagix
import ru.mipt.npm.controls.controllers.DeviceManager import ru.mipt.npm.controls.controllers.DeviceManager
import ru.mipt.npm.controls.controllers.install import ru.mipt.npm.controls.controllers.install
import ru.mipt.npm.controls.demo.car.IVirtualCar.Companion.acceleration import ru.mipt.npm.controls.demo.car.IVirtualCar.Companion.acceleration
import ru.mipt.npm.controls.mongo.MongoClientFactory
import ru.mipt.npm.controls.mongo.connectMongo
import ru.mipt.npm.controls.xodus.connectXodus import ru.mipt.npm.controls.xodus.connectXodus
import ru.mipt.npm.magix.api.MagixEndpoint import ru.mipt.npm.magix.api.MagixEndpoint
import ru.mipt.npm.magix.rsocket.rSocketWithTcp import ru.mipt.npm.magix.rsocket.rSocketWithTcp
import ru.mipt.npm.controls.xodus.startMagixServer import ru.mipt.npm.controls.xodus.startMagixServer
import space.kscience.dataforge.context.* import space.kscience.dataforge.context.*
import space.kscience.dataforge.meta.Meta
import tornadofx.* import tornadofx.*
class VirtualCarController : Controller(), ContextAware { class VirtualCarController : Controller(), ContextAware {
@ -29,7 +33,9 @@ class VirtualCarController : Controller(), ContextAware {
var magixServer: ApplicationEngine? = null var magixServer: ApplicationEngine? = null
var deviceEntityStore: PersistentEntityStore? = null var deviceEntityStore: PersistentEntityStore? = null
var magixEntityStore: PersistentEntityStore? = null var magixEntityStore: PersistentEntityStore? = null
var storageJob: Job? =null var mongoClient: CoroutineClient? = null
var xodusStorageJob: Job? = null
var mongoStorageJob: Job? = null
override val context = Context("demoDevice") { override val context = Context("demoDevice") {
plugin(DeviceManager) plugin(DeviceManager)
@ -48,7 +54,10 @@ class VirtualCarController : Controller(), ContextAware {
magixVirtualCar = deviceManager.install("magix-virtual-car", MagixVirtualCar) magixVirtualCar = deviceManager.install("magix-virtual-car", MagixVirtualCar)
deviceEntityStore = PersistentEntityStores.newInstance("/home/marvel1337/2021/SCADA/.messages") deviceEntityStore = PersistentEntityStores.newInstance("/home/marvel1337/2021/SCADA/.messages")
//connect to device entity store //connect to device entity store
storageJob = deviceManager.connectXodus(deviceEntityStore as PersistentEntityStore) xodusStorageJob = deviceManager.connectXodus(deviceEntityStore as PersistentEntityStore)
//Create mongo client and connect to MongoDB
mongoClient = MongoClientFactory.invoke(meta = Meta.EMPTY, context)
mongoStorageJob = deviceManager.connectMongo(mongoClient as CoroutineClient)
//Launch device client and connect it to the server //Launch device client and connect it to the server
val deviceEndpoint = MagixEndpoint.rSocketWithTcp("localhost", DeviceMessage.serializer()) val deviceEndpoint = MagixEndpoint.rSocketWithTcp("localhost", DeviceMessage.serializer())
deviceManager.connectToMagix(deviceEndpoint) deviceManager.connectToMagix(deviceEndpoint)
@ -67,6 +76,8 @@ class VirtualCarController : Controller(), ContextAware {
logger.info { "Device entity store closed" } logger.info { "Device entity store closed" }
magixEntityStore?.close() magixEntityStore?.close()
logger.info { "Magix entity store closed" } logger.info { "Magix entity store closed" }
mongoClient?.close()
logger.info { "MongoClient closed" }
context.close() context.close()
} }
} }

View File

@ -51,5 +51,6 @@ include(
":magix:magix-demo", ":magix:magix-demo",
":controls-magix-client", ":controls-magix-client",
":motors", ":motors",
":controls-xodus" ":controls-xodus",
) ":controls-mongo"
)