Add entityStoreFactory and refactor mongo/xodus connections.kt

This commit is contained in:
Atos1337 2021-12-04 02:10:10 +03:00
parent b79ad40a9b
commit e07780e7da
3 changed files with 102 additions and 46 deletions

View File

@ -1,5 +1,6 @@
package ru.mipt.npm.controls.mongo
import kotlinx.coroutines.InternalCoroutinesApi
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.launchIn
@ -16,30 +17,42 @@ import ru.mipt.npm.controls.controllers.DeviceManager
import ru.mipt.npm.controls.controllers.hubMessageFlow
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.Factory
import space.kscience.dataforge.context.debug
import space.kscience.dataforge.context.logger
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.get
import space.kscience.dataforge.meta.string
public object MongoClientFactory : Factory<CoroutineClient> {
public const val connectionString: String = "mongodb://mongoadmin:secret@localhost:27888"
internal object DefaultMongoConfig {
const val databaseName = "deviceMessage"
}
override fun invoke(meta: Meta, context: Context): CoroutineClient = meta["connectionString"]?.string?.let {
public object MongoClientFactory : Factory<CoroutineClient> {
private const val connectionString: String = "mongodb://mongoadmin:secret@localhost:27888"
override fun invoke(meta: Meta, context: Context): CoroutineClient = meta["mongoConfig"]?.get("connectionString")?.string?.let {
KMongo.createClient(it).coroutine
} ?: KMongo.createClient(connectionString).coroutine
}
@OptIn(InternalCoroutinesApi::class)
public fun DeviceManager.connectMongo(
client: CoroutineClient,
factory: Factory<CoroutineClient>,
filterCondition: suspend (DeviceMessage) -> Boolean = { true }
): Job = hubMessageFlow(context).filter(filterCondition).onEach { message ->
context.launch {
client
.getDatabase("deviceServer")
.getCollection<DeviceMessage>()
.insertOne(Json.encodeToString(message))
}
}.launchIn(context).apply {
invokeOnCompletion {
client.close()
): Job {
val client = factory.invoke(meta, context)
logger.debug { "Mongo client opened" }
val collection = client
.getDatabase(meta["mongoConfig"]?.get("databaseName")?.string ?: DefaultMongoConfig.databaseName)
.getCollection<DeviceMessage>()
return hubMessageFlow(context).filter(filterCondition).onEach { message ->
context.launch {
collection.insertOne(Json.encodeToString(message))
}
}.launchIn(context).apply {
invokeOnCompletion(onCancelling = true) {
logger.debug { "Mongo client closed" }
client.close()
}
}
}

View File

@ -1,26 +1,57 @@
package ru.mipt.npm.controls.xodus
import io.ktor.application.*
import jetbrains.exodus.entitystore.PersistentEntityStore
import jetbrains.exodus.entitystore.PersistentEntityStores
import kotlinx.coroutines.InternalCoroutinesApi
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.job
import ru.mipt.npm.controls.api.DeviceMessage
import ru.mipt.npm.controls.api.PropertyChangedMessage
import ru.mipt.npm.controls.controllers.DeviceManager
import ru.mipt.npm.controls.controllers.hubMessageFlow
import ru.mipt.npm.magix.server.GenericMagixMessage
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.Factory
import space.kscience.dataforge.context.debug
import space.kscience.dataforge.context.logger
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.get
import space.kscience.dataforge.meta.string
import java.nio.file.Paths
internal object DefaultXodusConfig {
val entityStorePath = Paths.get(".messages")
}
public fun DeviceManager.connectXodus(
entityStore: PersistentEntityStore,
filterCondition: suspend (DeviceMessage) -> Boolean = { it is PropertyChangedMessage }
): Job = hubMessageFlow(context).filter(filterCondition).onEach { message ->
entityStore.executeInTransaction {
(message as PropertyChangedMessage).toEntity(it)
public object EntityStoreFactory : Factory<PersistentEntityStore> {
override fun invoke(meta: Meta, context: Context): PersistentEntityStore {
return PersistentEntityStores.newInstance(
meta["xodusConfig"]?.get("entityStorePath")?.string ?: DefaultXodusConfig.entityStorePath.toString()
)
}
}.launchIn(context)
}
@OptIn(InternalCoroutinesApi::class)
public fun DeviceManager.connectXodus(
factory: Factory<PersistentEntityStore>,
filterCondition: suspend (DeviceMessage) -> Boolean = { it is PropertyChangedMessage }
): Job {
val entityStore = factory.invoke(meta, context)
logger.debug { "Device entity store opened" }
return hubMessageFlow(context).filter(filterCondition).onEach { message ->
entityStore.executeInTransaction {
(message as PropertyChangedMessage).toEntity(it)
}
}.launchIn(context).apply {
invokeOnCompletion(onCancelling = true) {
entityStore.close()
logger.debug { "Device entity store closed" }
}
}
}
//public fun CoroutineScope.startMagixServer(
// entityStore: PersistentEntityStore,
@ -53,3 +84,17 @@ public fun SharedFlow<GenericMagixMessage>.storeInXodus(
}
}
}
@OptIn(InternalCoroutinesApi::class)
public fun Application.storeInXodus(
factory: Factory<PersistentEntityStore>,
flow: MutableSharedFlow<GenericMagixMessage>,
meta: Meta = Meta.EMPTY
) {
val entityStore = factory.invoke(meta)
flow.storeInXodus(entityStore)
coroutineContext.job.invokeOnCompletion(onCancelling = true) {
entityStore.close()
}
}

View File

@ -6,11 +6,8 @@ import javafx.scene.Parent
import javafx.scene.control.TextField
import javafx.scene.layout.Priority
import javafx.stage.Stage
import jetbrains.exodus.entitystore.PersistentEntityStore
import jetbrains.exodus.entitystore.PersistentEntityStores
import kotlinx.coroutines.Job
import kotlinx.coroutines.launch
import org.litote.kmongo.coroutine.CoroutineClient
import ru.mipt.npm.controls.api.DeviceMessage
import ru.mipt.npm.controls.client.connectToMagix
import ru.mipt.npm.controls.controllers.DeviceManager
@ -18,6 +15,7 @@ import ru.mipt.npm.controls.controllers.install
import ru.mipt.npm.controls.demo.car.IVirtualCar.Companion.acceleration
import ru.mipt.npm.controls.mongo.MongoClientFactory
import ru.mipt.npm.controls.mongo.connectMongo
import ru.mipt.npm.controls.xodus.EntityStoreFactory
import ru.mipt.npm.controls.xodus.connectXodus
import ru.mipt.npm.controls.xodus.storeInXodus
import ru.mipt.npm.magix.api.MagixEndpoint
@ -28,14 +26,16 @@ import space.kscience.dataforge.meta.Meta
import tornadofx.*
import java.nio.file.Paths
internal object VirtualCarControllerConfig {
val deviceEntityStorePath = Paths.get(".messages")
val magixEntityStorePath = Paths.get(".server_messages")
}
class VirtualCarController : Controller(), ContextAware {
var virtualCar: VirtualCar? = null
var magixVirtualCar: MagixVirtualCar? = null
var magixServer: ApplicationEngine? = null
var deviceEntityStore: PersistentEntityStore? = null
var magixEntityStore: PersistentEntityStore? = null
var mongoClient: CoroutineClient? = null
var xodusStorageJob: Job? = null
var mongoStorageJob: Job? = null
@ -43,25 +43,29 @@ class VirtualCarController : Controller(), ContextAware {
plugin(DeviceManager)
}
private val deviceManager = context.fetch(DeviceManager)
private val deviceManager = context.fetch(DeviceManager, Meta {
"xodusConfig" put {
"entityStorePath" put VirtualCarControllerConfig.deviceEntityStorePath.toString()
}
})
fun init() {
context.launch {
virtualCar = deviceManager.install("virtual-car", VirtualCar)
//starting magix event loop and connect it to entity store
magixEntityStore = PersistentEntityStores.newInstance(Paths.get(".server_messages").toFile())
magixServer = startMagixServer(enableRawRSocket = true, enableZmq = true) { flow ->
flow.storeInXodus(magixEntityStore as PersistentEntityStore)
storeInXodus(EntityStoreFactory, flow, Meta {
"xodusConfig" put {
"entityStorePath" put VirtualCarControllerConfig.magixEntityStorePath.toString()
}
})
}
magixVirtualCar = deviceManager.install("magix-virtual-car", MagixVirtualCar)
deviceEntityStore = PersistentEntityStores.newInstance(Paths.get(".messages").toFile())
//connect to device entity store
xodusStorageJob = deviceManager.connectXodus(deviceEntityStore as PersistentEntityStore)
xodusStorageJob = deviceManager.connectXodus(EntityStoreFactory)
//Create mongo client and connect to MongoDB
val mongoClient = MongoClientFactory.invoke(meta = Meta.EMPTY, context)
mongoStorageJob = deviceManager.connectMongo(mongoClient)
this@VirtualCarController.mongoClient = mongoClient
mongoStorageJob = deviceManager.connectMongo(MongoClientFactory)
//Launch device client and connect it to the server
val deviceEndpoint = MagixEndpoint.rSocketWithTcp("localhost", DeviceMessage.serializer())
deviceManager.connectToMagix(deviceEndpoint)
@ -76,12 +80,6 @@ class VirtualCarController : Controller(), ContextAware {
logger.info { "Magix virtual car server stopped" }
virtualCar?.close()
logger.info { "Virtual car server stopped" }
deviceEntityStore?.close()
logger.info { "Device entity store closed" }
magixEntityStore?.close()
logger.info { "Magix entity store closed" }
mongoClient?.close()
logger.info { "MongoClient closed" }
context.close()
}
}