Some refactoring
This commit is contained in:
parent
f880b2d637
commit
76846a50cd
@ -2,7 +2,7 @@ plugins {
|
|||||||
id("ru.mipt.npm.gradle.project")
|
id("ru.mipt.npm.gradle.project")
|
||||||
}
|
}
|
||||||
|
|
||||||
val dataforgeVersion: String by extra("0.5.1")
|
val dataforgeVersion: String by extra("0.5.2")
|
||||||
val ktorVersion: String by extra(ru.mipt.npm.gradle.KScienceVersions.ktorVersion)
|
val ktorVersion: String by extra(ru.mipt.npm.gradle.KScienceVersions.ktorVersion)
|
||||||
val rsocketVersion by extra("0.13.1")
|
val rsocketVersion by extra("0.13.1")
|
||||||
|
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
package ru.mipt.npm.controls.xodus
|
package ru.mipt.npm.controls.xodus
|
||||||
|
|
||||||
import io.ktor.application.*
|
import io.ktor.application.Application
|
||||||
import jetbrains.exodus.entitystore.PersistentEntityStore
|
import jetbrains.exodus.entitystore.PersistentEntityStore
|
||||||
import jetbrains.exodus.entitystore.PersistentEntityStores
|
import jetbrains.exodus.entitystore.PersistentEntityStores
|
||||||
import kotlinx.coroutines.InternalCoroutinesApi
|
import kotlinx.coroutines.InternalCoroutinesApi
|
||||||
@ -8,7 +8,6 @@ import kotlinx.coroutines.Job
|
|||||||
import kotlinx.coroutines.flow.*
|
import kotlinx.coroutines.flow.*
|
||||||
import kotlinx.coroutines.job
|
import kotlinx.coroutines.job
|
||||||
import ru.mipt.npm.controls.api.DeviceMessage
|
import ru.mipt.npm.controls.api.DeviceMessage
|
||||||
import ru.mipt.npm.controls.api.PropertyChangedMessage
|
|
||||||
import ru.mipt.npm.controls.controllers.DeviceManager
|
import ru.mipt.npm.controls.controllers.DeviceManager
|
||||||
import ru.mipt.npm.controls.controllers.hubMessageFlow
|
import ru.mipt.npm.controls.controllers.hubMessageFlow
|
||||||
import ru.mipt.npm.magix.server.GenericMagixMessage
|
import ru.mipt.npm.magix.server.GenericMagixMessage
|
||||||
@ -20,26 +19,29 @@ import space.kscience.dataforge.context.logger
|
|||||||
import space.kscience.dataforge.meta.Meta
|
import space.kscience.dataforge.meta.Meta
|
||||||
import space.kscience.dataforge.meta.get
|
import space.kscience.dataforge.meta.get
|
||||||
import space.kscience.dataforge.meta.string
|
import space.kscience.dataforge.meta.string
|
||||||
import java.nio.file.Paths
|
import space.kscience.dataforge.names.Name
|
||||||
|
|
||||||
internal object DefaultXodusConfig {
|
private const val DEFAULT_XODUS_STORE_PATH = ".storage"
|
||||||
val entityStorePath = Paths.get(".messages")
|
private val XODUS_STORE_PROPERTY = Name.of("xodus", "entityStorePath")
|
||||||
|
|
||||||
|
private fun Context.getPersistentEntityStore(meta: Meta = Meta.EMPTY): PersistentEntityStore {
|
||||||
|
val storePath = meta[XODUS_STORE_PROPERTY]?.string
|
||||||
|
?: properties[XODUS_STORE_PROPERTY]?.string
|
||||||
|
?: DEFAULT_XODUS_STORE_PATH
|
||||||
|
|
||||||
|
return PersistentEntityStores.newInstance(storePath)
|
||||||
}
|
}
|
||||||
|
|
||||||
public object EntityStoreFactory : Factory<PersistentEntityStore> {
|
internal val defaultPersistentStoreFactory = object : Factory<PersistentEntityStore> {
|
||||||
override fun invoke(meta: Meta, context: Context): PersistentEntityStore {
|
override fun invoke(meta: Meta, context: Context): PersistentEntityStore = context.getPersistentEntityStore(meta)
|
||||||
return PersistentEntityStores.newInstance(
|
|
||||||
meta["xodusConfig"]?.get("entityStorePath")?.string ?: DefaultXodusConfig.entityStorePath.toString()
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@OptIn(InternalCoroutinesApi::class)
|
@OptIn(InternalCoroutinesApi::class)
|
||||||
public fun DeviceManager.connectXodus(
|
public fun DeviceManager.storeMessagesInXodus(
|
||||||
factory: Factory<PersistentEntityStore>,
|
factory: Factory<PersistentEntityStore> = defaultPersistentStoreFactory,
|
||||||
filterCondition: suspend (DeviceMessage) -> Boolean = { true }
|
filterCondition: suspend (DeviceMessage) -> Boolean = { true },
|
||||||
): Job {
|
): Job {
|
||||||
val entityStore = factory.invoke(meta, context)
|
val entityStore = factory(Meta.EMPTY, context)
|
||||||
logger.debug { "Device entity store opened" }
|
logger.debug { "Device entity store opened" }
|
||||||
|
|
||||||
return hubMessageFlow(context).filter(filterCondition).onEach { message ->
|
return hubMessageFlow(context).filter(filterCondition).onEach { message ->
|
||||||
@ -72,9 +74,9 @@ public fun DeviceManager.connectXodus(
|
|||||||
// }
|
// }
|
||||||
//}
|
//}
|
||||||
|
|
||||||
public fun SharedFlow<GenericMagixMessage>.storeInXodus(
|
internal fun Flow<GenericMagixMessage>.storeInXodus(
|
||||||
entityStore: PersistentEntityStore,
|
entityStore: PersistentEntityStore,
|
||||||
flowFilter: suspend (GenericMagixMessage) -> Boolean = { true },
|
flowFilter: (GenericMagixMessage) -> Boolean = { true },
|
||||||
) {
|
) {
|
||||||
filter(flowFilter).onEach { message ->
|
filter(flowFilter).onEach { message ->
|
||||||
entityStore.encodeToEntity(message, "MagixMessage")
|
entityStore.encodeToEntity(message, "MagixMessage")
|
||||||
@ -83,13 +85,14 @@ public fun SharedFlow<GenericMagixMessage>.storeInXodus(
|
|||||||
|
|
||||||
@OptIn(InternalCoroutinesApi::class)
|
@OptIn(InternalCoroutinesApi::class)
|
||||||
public fun Application.storeInXodus(
|
public fun Application.storeInXodus(
|
||||||
factory: Factory<PersistentEntityStore>,
|
|
||||||
flow: MutableSharedFlow<GenericMagixMessage>,
|
flow: MutableSharedFlow<GenericMagixMessage>,
|
||||||
meta: Meta = Meta.EMPTY
|
meta: Meta = Meta.EMPTY,
|
||||||
|
factory: Factory<PersistentEntityStore> = defaultPersistentStoreFactory,
|
||||||
|
flowFilter: (GenericMagixMessage) -> Boolean = { true },
|
||||||
) {
|
) {
|
||||||
val entityStore = factory.invoke(meta)
|
val entityStore = factory(meta)
|
||||||
|
|
||||||
flow.storeInXodus(entityStore)
|
flow.storeInXodus(entityStore, flowFilter)
|
||||||
coroutineContext.job.invokeOnCompletion(onCancelling = true) {
|
coroutineContext.job.invokeOnCompletion(onCancelling = true) {
|
||||||
entityStore.close()
|
entityStore.close()
|
||||||
}
|
}
|
||||||
|
@ -15,9 +15,8 @@ import ru.mipt.npm.controls.controllers.install
|
|||||||
import ru.mipt.npm.controls.demo.car.IVirtualCar.Companion.acceleration
|
import ru.mipt.npm.controls.demo.car.IVirtualCar.Companion.acceleration
|
||||||
import ru.mipt.npm.controls.mongo.MongoClientFactory
|
import ru.mipt.npm.controls.mongo.MongoClientFactory
|
||||||
import ru.mipt.npm.controls.mongo.connectMongo
|
import ru.mipt.npm.controls.mongo.connectMongo
|
||||||
import ru.mipt.npm.controls.xodus.EntityStoreFactory
|
|
||||||
import ru.mipt.npm.controls.xodus.connectXodus
|
|
||||||
import ru.mipt.npm.controls.xodus.storeInXodus
|
import ru.mipt.npm.controls.xodus.storeInXodus
|
||||||
|
import ru.mipt.npm.controls.xodus.storeMessagesInXodus
|
||||||
import ru.mipt.npm.magix.api.MagixEndpoint
|
import ru.mipt.npm.magix.api.MagixEndpoint
|
||||||
import ru.mipt.npm.magix.rsocket.rSocketWithTcp
|
import ru.mipt.npm.magix.rsocket.rSocketWithTcp
|
||||||
import ru.mipt.npm.magix.server.startMagixServer
|
import ru.mipt.npm.magix.server.startMagixServer
|
||||||
@ -55,7 +54,7 @@ class VirtualCarController : Controller(), ContextAware {
|
|||||||
|
|
||||||
//starting magix event loop and connect it to entity store
|
//starting magix event loop and connect it to entity store
|
||||||
magixServer = startMagixServer(enableRawRSocket = true, enableZmq = true) { flow ->
|
magixServer = startMagixServer(enableRawRSocket = true, enableZmq = true) { flow ->
|
||||||
storeInXodus(EntityStoreFactory, flow, Meta {
|
storeInXodus( flow, Meta {
|
||||||
"xodusConfig" put {
|
"xodusConfig" put {
|
||||||
"entityStorePath" put VirtualCarControllerConfig.magixEntityStorePath.toString()
|
"entityStorePath" put VirtualCarControllerConfig.magixEntityStorePath.toString()
|
||||||
}
|
}
|
||||||
@ -63,7 +62,7 @@ class VirtualCarController : Controller(), ContextAware {
|
|||||||
}
|
}
|
||||||
magixVirtualCar = deviceManager.install("magix-virtual-car", MagixVirtualCar)
|
magixVirtualCar = deviceManager.install("magix-virtual-car", MagixVirtualCar)
|
||||||
//connect to device entity store
|
//connect to device entity store
|
||||||
xodusStorageJob = deviceManager.connectXodus(EntityStoreFactory)
|
xodusStorageJob = deviceManager.storeMessagesInXodus()
|
||||||
//Create mongo client and connect to MongoDB
|
//Create mongo client and connect to MongoDB
|
||||||
mongoStorageJob = deviceManager.connectMongo(MongoClientFactory)
|
mongoStorageJob = deviceManager.connectMongo(MongoClientFactory)
|
||||||
//Launch device client and connect it to the server
|
//Launch device client and connect it to the server
|
||||||
|
2
gradle/wrapper/gradle-wrapper.properties
vendored
2
gradle/wrapper/gradle-wrapper.properties
vendored
@ -1,5 +1,5 @@
|
|||||||
distributionBase=GRADLE_USER_HOME
|
distributionBase=GRADLE_USER_HOME
|
||||||
distributionPath=wrapper/dists
|
distributionPath=wrapper/dists
|
||||||
distributionUrl=https\://services.gradle.org/distributions/gradle-7.2-bin.zip
|
distributionUrl=https\://services.gradle.org/distributions/gradle-7.3-bin.zip
|
||||||
zipStoreBase=GRADLE_USER_HOME
|
zipStoreBase=GRADLE_USER_HOME
|
||||||
zipStorePath=wrapper/dists
|
zipStorePath=wrapper/dists
|
||||||
|
@ -5,7 +5,7 @@ enableFeaturePreview("VERSION_CATALOGS")
|
|||||||
|
|
||||||
pluginManagement {
|
pluginManagement {
|
||||||
|
|
||||||
val toolsVersion = "0.10.5"
|
val toolsVersion = "0.10.7"
|
||||||
|
|
||||||
repositories {
|
repositories {
|
||||||
maven("https://repo.kotlin.link")
|
maven("https://repo.kotlin.link")
|
||||||
@ -29,7 +29,7 @@ dependencyResolutionManagement {
|
|||||||
|
|
||||||
versionCatalogs {
|
versionCatalogs {
|
||||||
create("npm") {
|
create("npm") {
|
||||||
from("ru.mipt.npm:version-catalog:0.10.5")
|
from("ru.mipt.npm:version-catalog:0.10.7")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -5,6 +5,8 @@ plugins {
|
|||||||
|
|
||||||
val xodusVersion = "1.3.232"
|
val xodusVersion = "1.3.232"
|
||||||
|
|
||||||
|
//TODO to be moved to DataForge
|
||||||
|
|
||||||
kscience {
|
kscience {
|
||||||
useSerialization {
|
useSerialization {
|
||||||
json()
|
json()
|
||||||
|
Loading…
Reference in New Issue
Block a user