From 427ecbf91a3f3c7d612dba2ab6d91fe0004e60c8 Mon Sep 17 00:00:00 2001 From: Alexander Nozik Date: Tue, 18 Jan 2022 18:25:40 +0300 Subject: [PATCH] Refactor storage --- controls-mongo/build.gradle.kts | 8 +-- ...ousMongoClient.kt => MongoEventStorage.kt} | 27 +++++----- controls-storage/build.gradle.kts | 12 +++-- .../asynchronous/commonApi.kt | 50 ------------------- .../synchronous/SynchronousStorageClient.kt | 20 -------- .../npm/controls/storage/EventStorage.kt} | 8 +-- .../npm/controls/storage/storageCommon.kt} | 24 +++++---- .../controls/storage/asynchronous/jvmApi.kt | 40 --------------- .../{synchronous/jvmApi.kt => storageJvm.kt} | 12 +++-- controls-xodus/build.gradle.kts | 13 +++-- ...ousXodusClient.kt => XodusEventStorage.kt} | 44 ++++++++-------- .../src/test/kotlin/PropertyHistoryTest.kt | 18 ++++--- .../controls/demo/car/VirtualCarController.kt | 14 +++--- 13 files changed, 98 insertions(+), 192 deletions(-) rename controls-mongo/src/main/kotlin/ru/mipt/npm/controls/mongo/{AsynchronousMongoClient.kt => MongoEventStorage.kt} (75%) delete mode 100644 controls-storage/src/commonMain/kotlin/ru.mipt.npm.controls.storage/asynchronous/commonApi.kt delete mode 100644 controls-storage/src/commonMain/kotlin/ru.mipt.npm.controls.storage/synchronous/SynchronousStorageClient.kt rename controls-storage/src/commonMain/kotlin/{ru.mipt.npm.controls.storage/asynchronous/AsynchronousStorageClient.kt => ru/mipt/npm/controls/storage/EventStorage.kt} (64%) rename controls-storage/src/commonMain/kotlin/{ru.mipt.npm.controls.storage/synchronous/commonApi.kt => ru/mipt/npm/controls/storage/storageCommon.kt} (90%) delete mode 100644 controls-storage/src/jvmMain/kotlin/ru/mipt/npm/controls/storage/asynchronous/jvmApi.kt rename controls-storage/src/jvmMain/kotlin/ru/mipt/npm/controls/storage/{synchronous/jvmApi.kt => storageJvm.kt} (86%) rename controls-xodus/src/main/kotlin/ru/mipt/npm/controls/xodus/{SynchronousXodusClient.kt => XodusEventStorage.kt} (52%) diff --git a/controls-mongo/build.gradle.kts b/controls-mongo/build.gradle.kts index d5c81f0..c9e6b57 100644 --- a/controls-mongo/build.gradle.kts +++ b/controls-mongo/build.gradle.kts @@ -6,10 +6,10 @@ plugins { val kmongoVersion = "4.4.0" dependencies { - implementation(projects.controlsCore) - implementation(projects.magix.magixApi) - implementation(projects.controlsMagixClient) - implementation(projects.magix.magixServer) implementation(projects.controlsStorage) implementation("org.litote.kmongo:kmongo-coroutine-serialization:$kmongoVersion") } + +readme{ + maturity = ru.mipt.npm.gradle.Maturity.PROTOTYPE +} diff --git a/controls-mongo/src/main/kotlin/ru/mipt/npm/controls/mongo/AsynchronousMongoClient.kt b/controls-mongo/src/main/kotlin/ru/mipt/npm/controls/mongo/MongoEventStorage.kt similarity index 75% rename from controls-mongo/src/main/kotlin/ru/mipt/npm/controls/mongo/AsynchronousMongoClient.kt rename to controls-mongo/src/main/kotlin/ru/mipt/npm/controls/mongo/MongoEventStorage.kt index f9c4407..f6cb44f 100644 --- a/controls-mongo/src/main/kotlin/ru/mipt/npm/controls/mongo/AsynchronousMongoClient.kt +++ b/controls-mongo/src/main/kotlin/ru/mipt/npm/controls/mongo/MongoEventStorage.kt @@ -1,37 +1,33 @@ package ru.mipt.npm.controls.mongo -import kotlinx.serialization.InternalSerializationApi import kotlinx.serialization.KSerializer import kotlinx.serialization.json.Json -import kotlinx.serialization.serializer import org.litote.kmongo.coroutine.CoroutineClient import org.litote.kmongo.coroutine.coroutine import org.litote.kmongo.coroutine.insertOne import org.litote.kmongo.reactivestreams.KMongo import ru.mipt.npm.controls.api.DeviceMessage import ru.mipt.npm.controls.api.PropertyChangedMessage -import ru.mipt.npm.controls.storage.asynchronous.AsynchronousStorageClient -import ru.mipt.npm.controls.storage.synchronous.StorageKind +import ru.mipt.npm.controls.storage.EventStorage import ru.mipt.npm.magix.server.GenericMagixMessage import space.kscience.dataforge.context.Context +import space.kscience.dataforge.context.Factory import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.get import space.kscience.dataforge.meta.string -import space.kscience.dataforge.context.Factory import space.kscience.dataforge.names.Name -import kotlin.reflect.KClass private const val DEFAULT_DEVICE_MESSAGE_DATABASE_NAME: String = "deviceMessage" private const val DEFAULT_MAGIX_MESSAGE_DATABASE_NAME = "magixMessage" private const val DEFAULT_MONGO_DATABASE_URL = "mongodb://mongoadmin:secret@localhost:27888" -public val MONGO_DEVICE_MESSAGE_DATABASE_NAME_PROPERTY: Name = Name.of("mongo", "deviceMessageDatabaseName") +private val MONGO_DEVICE_MESSAGE_DATABASE_NAME_PROPERTY: Name = Name.of("mongo", "deviceMessageDatabaseName") public val MONGO_MAGIX_MESSAGE_DATABASE_NAME_PROPERTY: Name = Name.of("mongo", "magixMessageDatabaseName") public val MONGO_DATABASE_URL_PROPERTY: Name = Name.of("mongo", "databaseUrl") -internal class AsynchronousMongoClient( +internal class MongoEventStorage( private val client: CoroutineClient, - private val meta: Meta = Meta.EMPTY -) : AsynchronousStorageClient { + private val meta: Meta = Meta.EMPTY, +) : EventStorage { override suspend fun storeValueInDeviceHub(value: T, serializer: KSerializer) { val collection = client .getDatabase( @@ -45,7 +41,8 @@ internal class AsynchronousMongoClient( override suspend fun storeValueInMagixServer(value: T, serializer: KSerializer) { val collection = client - .getDatabase(meta[MONGO_MAGIX_MESSAGE_DATABASE_NAME_PROPERTY]?.string ?: DEFAULT_MAGIX_MESSAGE_DATABASE_NAME) + .getDatabase(meta[MONGO_MAGIX_MESSAGE_DATABASE_NAME_PROPERTY]?.string + ?: DEFAULT_MAGIX_MESSAGE_DATABASE_NAME) .getCollection() collection.insertOne(Json.encodeToString(serializer, value)) @@ -53,7 +50,7 @@ internal class AsynchronousMongoClient( override suspend fun getPropertyHistory( sourceDeviceName: String, - propertyName: String + propertyName: String, ): List { TODO("Not yet implemented: problems with deserialization") } @@ -63,12 +60,12 @@ internal class AsynchronousMongoClient( } } -public object DefaultAsynchronousMongoClientFactory : Factory { - override fun invoke(meta: Meta, context: Context): AsynchronousStorageClient { +public object DefaultAsynchronousMongoClientFactory : Factory { + override fun invoke(meta: Meta, context: Context): EventStorage { val client = meta[MONGO_DATABASE_URL_PROPERTY]?.string?.let { KMongo.createClient(it).coroutine } ?: KMongo.createClient(DEFAULT_MONGO_DATABASE_URL).coroutine - return AsynchronousMongoClient(client, meta) + return MongoEventStorage(client, meta) } } diff --git a/controls-storage/build.gradle.kts b/controls-storage/build.gradle.kts index 13e3d5a..ab25e1d 100644 --- a/controls-storage/build.gradle.kts +++ b/controls-storage/build.gradle.kts @@ -9,16 +9,20 @@ kotlin { sourceSets { commonMain { dependencies { - implementation(projects.controlsCore) + api(projects.controlsCore) } } jvmMain { dependencies { - implementation(projects.magix.magixApi) - implementation(projects.controlsMagixClient) - implementation(projects.magix.magixServer) + api(projects.magix.magixApi) + api(projects.controlsMagixClient) + api(projects.magix.magixServer) } } } } + +readme{ + maturity = ru.mipt.npm.gradle.Maturity.PROTOTYPE +} diff --git a/controls-storage/src/commonMain/kotlin/ru.mipt.npm.controls.storage/asynchronous/commonApi.kt b/controls-storage/src/commonMain/kotlin/ru.mipt.npm.controls.storage/asynchronous/commonApi.kt deleted file mode 100644 index 26f6ddb..0000000 --- a/controls-storage/src/commonMain/kotlin/ru.mipt.npm.controls.storage/asynchronous/commonApi.kt +++ /dev/null @@ -1,50 +0,0 @@ -package ru.mipt.npm.controls.storage.asynchronous - -import io.ktor.utils.io.core.use -import kotlinx.coroutines.InternalCoroutinesApi -import kotlinx.coroutines.Job -import kotlinx.coroutines.flow.filter -import kotlinx.coroutines.flow.launchIn -import kotlinx.coroutines.flow.onEach -import ru.mipt.npm.controls.api.DeviceMessage -import ru.mipt.npm.controls.api.PropertyChangedMessage -import ru.mipt.npm.controls.controllers.DeviceManager -import ru.mipt.npm.controls.controllers.hubMessageFlow -import space.kscience.dataforge.context.Factory -import space.kscience.dataforge.context.debug -import space.kscience.dataforge.context.logger -import space.kscience.dataforge.meta.Meta - -/** - * Asynchronous version of synchronous API, so for more details check relative docs - */ - -@OptIn(InternalCoroutinesApi::class) -public fun DeviceManager.storeMessages( - factory: Factory, - filterCondition: suspend (DeviceMessage) -> Boolean = { true }, -): Job { - val client = factory(meta, context) - logger.debug { "Storage client created" } - - return hubMessageFlow(context).filter(filterCondition).onEach { message -> - client.storeValueInDeviceHub(message) - }.launchIn(context).apply { - invokeOnCompletion(onCancelling = true) { - client.close() - logger.debug { "Storage client closed" } - } - } -} - -public suspend fun getPropertyHistory( - sourceDeviceName: String, - propertyName: String, - factory: Factory, - meta: Meta = Meta.EMPTY -): List { - return factory(meta).use { - it.getPropertyHistory(sourceDeviceName, propertyName) - } -} - diff --git a/controls-storage/src/commonMain/kotlin/ru.mipt.npm.controls.storage/synchronous/SynchronousStorageClient.kt b/controls-storage/src/commonMain/kotlin/ru.mipt.npm.controls.storage/synchronous/SynchronousStorageClient.kt deleted file mode 100644 index 3b6cfdf..0000000 --- a/controls-storage/src/commonMain/kotlin/ru.mipt.npm.controls.storage/synchronous/SynchronousStorageClient.kt +++ /dev/null @@ -1,20 +0,0 @@ -package ru.mipt.npm.controls.storage.synchronous - -import io.ktor.utils.io.core.Closeable -import kotlinx.serialization.KSerializer -import kotlinx.serialization.serializer -import ru.mipt.npm.controls.api.PropertyChangedMessage - -public interface SynchronousStorageClient : Closeable { - public fun storeValueInDeviceHub(value: T, serializer: KSerializer) - - public fun storeValueInMagixServer(value: T, serializer: KSerializer) - - public fun getPropertyHistory(sourceDeviceName: String, propertyName: String): List -} - -public inline fun SynchronousStorageClient.storeValueInDeviceHub(value: T): Unit = - storeValueInDeviceHub(value, serializer()) - -public inline fun SynchronousStorageClient.storeValueInMagixServer(value: T): Unit = - storeValueInMagixServer(value, serializer()) diff --git a/controls-storage/src/commonMain/kotlin/ru.mipt.npm.controls.storage/asynchronous/AsynchronousStorageClient.kt b/controls-storage/src/commonMain/kotlin/ru/mipt/npm/controls/storage/EventStorage.kt similarity index 64% rename from controls-storage/src/commonMain/kotlin/ru.mipt.npm.controls.storage/asynchronous/AsynchronousStorageClient.kt rename to controls-storage/src/commonMain/kotlin/ru/mipt/npm/controls/storage/EventStorage.kt index 71b5558..6f874aa 100644 --- a/controls-storage/src/commonMain/kotlin/ru.mipt.npm.controls.storage/asynchronous/AsynchronousStorageClient.kt +++ b/controls-storage/src/commonMain/kotlin/ru/mipt/npm/controls/storage/EventStorage.kt @@ -1,11 +1,11 @@ -package ru.mipt.npm.controls.storage.asynchronous +package ru.mipt.npm.controls.storage import io.ktor.utils.io.core.Closeable import kotlinx.serialization.KSerializer import kotlinx.serialization.serializer import ru.mipt.npm.controls.api.PropertyChangedMessage -public interface AsynchronousStorageClient : Closeable { +public interface EventStorage : Closeable { public suspend fun storeValueInDeviceHub(value: T, serializer: KSerializer) public suspend fun storeValueInMagixServer(value: T, serializer: KSerializer) @@ -13,8 +13,8 @@ public interface AsynchronousStorageClient : Closeable { public suspend fun getPropertyHistory(sourceDeviceName: String, propertyName: String): List } -public suspend inline fun AsynchronousStorageClient.storeValueInDeviceHub(value: T): Unit = +public suspend inline fun EventStorage.storeValueInDeviceHub(value: T): Unit = storeValueInDeviceHub(value, serializer()) -public suspend inline fun AsynchronousStorageClient.storeValueInMagixServer(value: T): Unit = +public suspend inline fun EventStorage.storeValueInMagixServer(value: T): Unit = storeValueInMagixServer(value, serializer()) diff --git a/controls-storage/src/commonMain/kotlin/ru.mipt.npm.controls.storage/synchronous/commonApi.kt b/controls-storage/src/commonMain/kotlin/ru/mipt/npm/controls/storage/storageCommon.kt similarity index 90% rename from controls-storage/src/commonMain/kotlin/ru.mipt.npm.controls.storage/synchronous/commonApi.kt rename to controls-storage/src/commonMain/kotlin/ru/mipt/npm/controls/storage/storageCommon.kt index 7a074f6..1446438 100644 --- a/controls-storage/src/commonMain/kotlin/ru.mipt.npm.controls.storage/synchronous/commonApi.kt +++ b/controls-storage/src/commonMain/kotlin/ru/mipt/npm/controls/storage/storageCommon.kt @@ -1,4 +1,4 @@ -package ru.mipt.npm.controls.storage.synchronous +package ru.mipt.npm.controls.storage import io.ktor.utils.io.core.use import kotlinx.coroutines.InternalCoroutinesApi @@ -14,11 +14,7 @@ import space.kscience.dataforge.context.Factory import space.kscience.dataforge.context.debug import space.kscience.dataforge.context.logger import space.kscience.dataforge.meta.Meta - -public enum class StorageKind { - DEVICE_HUB, - MAGIX_SERVER -} +import kotlin.jvm.JvmName /** * Begin to store DeviceMessages from this DeviceManager @@ -28,8 +24,9 @@ public enum class StorageKind { * @return Job which responsible for our storage */ @OptIn(InternalCoroutinesApi::class) +@JvmName("storeMessagesAsync") public fun DeviceManager.storeMessages( - factory: Factory, + factory: Factory, filterCondition: suspend (DeviceMessage) -> Boolean = { true }, ): Job { val client = factory(meta, context) @@ -51,13 +48,20 @@ public fun DeviceManager.storeMessages( * @param propertyName a name of property, history of which we want to get * @param factory a factory that produce mongo clients */ -public fun getPropertyHistory( +public suspend fun getPropertyHistory( sourceDeviceName: String, propertyName: String, - factory: Factory, - meta: Meta = Meta.EMPTY + factory: Factory, + meta: Meta = Meta.EMPTY, ): List { return factory(meta).use { it.getPropertyHistory(sourceDeviceName, propertyName) } } + + +public enum class StorageKind { + DEVICE_HUB, + MAGIX_SERVER +} + diff --git a/controls-storage/src/jvmMain/kotlin/ru/mipt/npm/controls/storage/asynchronous/jvmApi.kt b/controls-storage/src/jvmMain/kotlin/ru/mipt/npm/controls/storage/asynchronous/jvmApi.kt deleted file mode 100644 index ccf65ff..0000000 --- a/controls-storage/src/jvmMain/kotlin/ru/mipt/npm/controls/storage/asynchronous/jvmApi.kt +++ /dev/null @@ -1,40 +0,0 @@ -package ru.mipt.npm.controls.storage.asynchronous - -import io.ktor.application.* -import kotlinx.coroutines.InternalCoroutinesApi -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.MutableSharedFlow -import kotlinx.coroutines.flow.filter -import kotlinx.coroutines.flow.onEach -import kotlinx.coroutines.job -import ru.mipt.npm.magix.server.GenericMagixMessage -import space.kscience.dataforge.context.Factory -import space.kscience.dataforge.meta.Meta - -/** - * Asynchronous version of synchronous API, so for more details check relative docs - */ - -internal fun Flow.store( - client: AsynchronousStorageClient, - flowFilter: suspend (GenericMagixMessage) -> Boolean = { true }, -) { - filter(flowFilter).onEach { message -> - client.storeValueInMagixServer(message) - } -} - -@OptIn(InternalCoroutinesApi::class) -public fun Application.store( - flow: MutableSharedFlow, - factory: Factory, - meta: Meta = Meta.EMPTY, - flowFilter: suspend (GenericMagixMessage) -> Boolean = { true }, -) { - val client = factory(meta) - - flow.store(client, flowFilter) - coroutineContext.job.invokeOnCompletion(onCancelling = true) { - client.close() - } -} diff --git a/controls-storage/src/jvmMain/kotlin/ru/mipt/npm/controls/storage/synchronous/jvmApi.kt b/controls-storage/src/jvmMain/kotlin/ru/mipt/npm/controls/storage/storageJvm.kt similarity index 86% rename from controls-storage/src/jvmMain/kotlin/ru/mipt/npm/controls/storage/synchronous/jvmApi.kt rename to controls-storage/src/jvmMain/kotlin/ru/mipt/npm/controls/storage/storageJvm.kt index 0f26915..4e068f3 100644 --- a/controls-storage/src/jvmMain/kotlin/ru/mipt/npm/controls/storage/synchronous/jvmApi.kt +++ b/controls-storage/src/jvmMain/kotlin/ru/mipt/npm/controls/storage/storageJvm.kt @@ -1,6 +1,6 @@ -package ru.mipt.npm.controls.storage.synchronous +package ru.mipt.npm.controls.storage -import io.ktor.application.* +import io.ktor.application.Application import kotlinx.coroutines.InternalCoroutinesApi import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableSharedFlow @@ -11,8 +11,12 @@ import ru.mipt.npm.magix.server.GenericMagixMessage import space.kscience.dataforge.context.Factory import space.kscience.dataforge.meta.Meta +/** + * Asynchronous version of synchronous API, so for more details check relative docs + */ + internal fun Flow.store( - client: SynchronousStorageClient, + client: EventStorage, flowFilter: suspend (GenericMagixMessage) -> Boolean = { true }, ) { filter(flowFilter).onEach { message -> @@ -29,8 +33,8 @@ internal fun Flow.store( @OptIn(InternalCoroutinesApi::class) public fun Application.store( flow: MutableSharedFlow, + factory: Factory, meta: Meta = Meta.EMPTY, - factory: Factory, flowFilter: suspend (GenericMagixMessage) -> Boolean = { true }, ) { val client = factory(meta) diff --git a/controls-xodus/build.gradle.kts b/controls-xodus/build.gradle.kts index b07ed6b..2dd89d0 100644 --- a/controls-xodus/build.gradle.kts +++ b/controls-xodus/build.gradle.kts @@ -6,10 +6,15 @@ plugins { val xodusVersion = "1.3.232" dependencies { - implementation(projects.xodusSerialization) - implementation(projects.controlsStorage) - implementation(projects.controlsCore) + api(projects.xodusSerialization) + api(projects.controlsStorage) implementation("org.jetbrains.xodus:xodus-entity-store:$xodusVersion") implementation("org.jetbrains.xodus:xodus-environment:$xodusVersion") implementation("org.jetbrains.xodus:xodus-vfs:$xodusVersion") -} \ No newline at end of file + + testImplementation(npmlibs.kotlinx.coroutines.test) +} + +readme{ + maturity = ru.mipt.npm.gradle.Maturity.PROTOTYPE +} diff --git a/controls-xodus/src/main/kotlin/ru/mipt/npm/controls/xodus/SynchronousXodusClient.kt b/controls-xodus/src/main/kotlin/ru/mipt/npm/controls/xodus/XodusEventStorage.kt similarity index 52% rename from controls-xodus/src/main/kotlin/ru/mipt/npm/controls/xodus/SynchronousXodusClient.kt rename to controls-xodus/src/main/kotlin/ru/mipt/npm/controls/xodus/XodusEventStorage.kt index 8bca4d2..3072141 100644 --- a/controls-xodus/src/main/kotlin/ru/mipt/npm/controls/xodus/SynchronousXodusClient.kt +++ b/controls-xodus/src/main/kotlin/ru/mipt/npm/controls/xodus/XodusEventStorage.kt @@ -5,8 +5,7 @@ import jetbrains.exodus.entitystore.PersistentEntityStores import kotlinx.datetime.Instant import kotlinx.serialization.KSerializer import ru.mipt.npm.controls.api.PropertyChangedMessage -import ru.mipt.npm.controls.storage.synchronous.StorageKind -import ru.mipt.npm.controls.storage.synchronous.SynchronousStorageClient +import ru.mipt.npm.controls.storage.EventStorage import ru.mipt.npm.xodus.serialization.json.decodeFromEntity import ru.mipt.npm.xodus.serialization.json.encodeToEntity import space.kscience.dataforge.context.Context @@ -15,7 +14,6 @@ import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.get import space.kscience.dataforge.meta.string import space.kscience.dataforge.names.Name -import kotlin.reflect.KClass private const val DEFAULT_XODUS_STORE_PATH = ".storage" public val XODUS_STORE_PROPERTY: Name = Name.of("xodus", "entityStorePath") @@ -23,31 +21,38 @@ public val XODUS_STORE_PROPERTY: Name = Name.of("xodus", "entityStorePath") private const val DEVICE_HUB_ENTITY_TYPE = "DeviceMessage" private const val MAGIX_SERVER_ENTITY_TYPE = "MagixMessage" -internal class SynchronousXodusClient(private val entityStore: PersistentEntityStore) : SynchronousStorageClient { - override fun storeValueInDeviceHub(value: T, serializer: KSerializer) { +public class XodusEventStorage(private val entityStore: PersistentEntityStore) : EventStorage { + override suspend fun storeValueInDeviceHub(value: T, serializer: KSerializer) { entityStore.encodeToEntity(value, DEVICE_HUB_ENTITY_TYPE, serializer) } - override fun storeValueInMagixServer(value: T, serializer: KSerializer) { + override suspend fun storeValueInMagixServer(value: T, serializer: KSerializer) { entityStore.encodeToEntity(value, MAGIX_SERVER_ENTITY_TYPE, serializer) } - override fun getPropertyHistory( + override suspend fun getPropertyHistory( sourceDeviceName: String, - propertyName: String - ): List { - return entityStore.computeInTransaction { txn -> - txn.find(DEVICE_HUB_ENTITY_TYPE, "type", "property.changed").asSequence() - .filter { it?.getProperty("sourceDevice")?.let { it == sourceDeviceName } ?: false && - it?.getProperty("property")?.let { it == propertyName } ?: false - }.sortedByDescending { it?.getProperty("time")?.let { timeStr -> Instant.parse(timeStr as String) } } - .toList().map { txn.decodeFromEntity(it) } - } + propertyName: String, + ): List = entityStore.computeInTransaction { txn -> + txn.find(DEVICE_HUB_ENTITY_TYPE, "type", "property.changed") + .filter { + it?.getProperty("sourceDevice") == sourceDeviceName && it.getProperty("property") == propertyName + } + .sortedByDescending { it?.getProperty("time")?.let { timeStr -> Instant.parse(timeStr as String) } } + .map { txn.decodeFromEntity(it, PropertyChangedMessage.serializer()) } + .toList() } override fun close() { entityStore.close() } + + public companion object : Factory { + override fun invoke(meta: Meta, context: Context): EventStorage { + val entityStore = context.getPersistentEntityStore(meta) + return XodusEventStorage(entityStore) + } + } } private fun Context.getPersistentEntityStore(meta: Meta = Meta.EMPTY): PersistentEntityStore { @@ -57,10 +62,3 @@ private fun Context.getPersistentEntityStore(meta: Meta = Meta.EMPTY): Persisten return PersistentEntityStores.newInstance(storePath) } - -public object DefaultSynchronousXodusClientFactory : Factory { - override fun invoke(meta: Meta, context: Context): SynchronousStorageClient { - val entityStore = context.getPersistentEntityStore(meta) - return SynchronousXodusClient(entityStore) - } -} diff --git a/controls-xodus/src/test/kotlin/PropertyHistoryTest.kt b/controls-xodus/src/test/kotlin/PropertyHistoryTest.kt index 4df90ba..b1d8080 100644 --- a/controls-xodus/src/test/kotlin/PropertyHistoryTest.kt +++ b/controls-xodus/src/test/kotlin/PropertyHistoryTest.kt @@ -1,4 +1,6 @@ import jetbrains.exodus.entitystore.PersistentEntityStores +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.test.runTest import kotlinx.datetime.Instant import org.junit.jupiter.api.AfterAll import org.junit.jupiter.api.Assertions.assertEquals @@ -6,9 +8,9 @@ import org.junit.jupiter.api.BeforeAll import org.junit.jupiter.api.Test import ru.mipt.npm.controls.api.DeviceMessage import ru.mipt.npm.controls.api.PropertyChangedMessage -import ru.mipt.npm.controls.storage.synchronous.getPropertyHistory -import ru.mipt.npm.controls.xodus.DefaultSynchronousXodusClientFactory +import ru.mipt.npm.controls.storage.getPropertyHistory import ru.mipt.npm.controls.xodus.XODUS_STORE_PROPERTY +import ru.mipt.npm.controls.xodus.XodusEventStorage import ru.mipt.npm.xodus.serialization.json.encodeToEntity import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.names.Name @@ -56,11 +58,15 @@ internal class PropertyHistoryTest { } } + @OptIn(ExperimentalCoroutinesApi::class) @Test - fun getPropertyHistoryTest() { - assertEquals(listOf(propertyChangedMessages[0]), getPropertyHistory( - "virtual-car", "speed", DefaultSynchronousXodusClientFactory, Meta { + fun getPropertyHistoryTest() = runTest { + assertEquals( + listOf(propertyChangedMessages[0]), + getPropertyHistory( + "virtual-car", "speed", XodusEventStorage, Meta { XODUS_STORE_PROPERTY put storeName - })) + }) + ) } } \ No newline at end of file diff --git a/demo/car/src/main/kotlin/ru/mipt/npm/controls/demo/car/VirtualCarController.kt b/demo/car/src/main/kotlin/ru/mipt/npm/controls/demo/car/VirtualCarController.kt index d13c8ef..9b9a61e 100644 --- a/demo/car/src/main/kotlin/ru/mipt/npm/controls/demo/car/VirtualCarController.kt +++ b/demo/car/src/main/kotlin/ru/mipt/npm/controls/demo/car/VirtualCarController.kt @@ -14,12 +14,10 @@ import ru.mipt.npm.controls.controllers.DeviceManager import ru.mipt.npm.controls.controllers.install import ru.mipt.npm.controls.demo.car.IVirtualCar.Companion.acceleration import ru.mipt.npm.controls.mongo.DefaultAsynchronousMongoClientFactory -import ru.mipt.npm.controls.storage.asynchronous.store -import ru.mipt.npm.controls.storage.asynchronous.storeMessages -import ru.mipt.npm.controls.storage.synchronous.store -import ru.mipt.npm.controls.storage.synchronous.storeMessages -import ru.mipt.npm.controls.xodus.DefaultSynchronousXodusClientFactory +import ru.mipt.npm.controls.storage.store +import ru.mipt.npm.controls.storage.storeMessages import ru.mipt.npm.controls.xodus.XODUS_STORE_PROPERTY +import ru.mipt.npm.controls.xodus.XodusEventStorage import ru.mipt.npm.magix.api.MagixEndpoint import ru.mipt.npm.magix.rsocket.rSocketWithTcp import ru.mipt.npm.magix.server.startMagixServer @@ -57,14 +55,14 @@ class VirtualCarController : Controller(), ContextAware { //starting magix event loop and connect it to entity store magixServer = startMagixServer(enableRawRSocket = true, enableZmq = true) { flow -> - store( flow, Meta { + store(flow, XodusEventStorage, Meta { XODUS_STORE_PROPERTY put VirtualCarControllerConfig.magixEntityStorePath.toString() - }, DefaultSynchronousXodusClientFactory) + }) store(flow, DefaultAsynchronousMongoClientFactory) } magixVirtualCar = deviceManager.install("magix-virtual-car", MagixVirtualCar) //connect to device entity store - xodusStorageJob = deviceManager.storeMessages(DefaultSynchronousXodusClientFactory) + xodusStorageJob = deviceManager.storeMessages(XodusEventStorage) //Create mongo client and connect to MongoDB mongoStorageJob = deviceManager.storeMessages(DefaultAsynchronousMongoClientFactory) //Launch device client and connect it to the server