From 89bb132e36df4ae92f3e05f258bd0f985fad83a3 Mon Sep 17 00:00:00 2001 From: Atos1337 Date: Fri, 14 Jan 2022 21:47:48 +0300 Subject: [PATCH] Refactor synchronous storage api --- controls-storage/build.gradle.kts | 27 +++++ .../synchronous/SynchronousStorageClient.kt | 14 +++ .../synchronous/commonApi.kt | 61 ++++++++++ .../controls/storage/synchronous/jvmApi.kt | 42 +++++++ controls-xodus/README.md | 26 ---- controls-xodus/build.gradle.kts | 6 +- .../controls/xodus/SynchronousXodusClient.kt | 66 +++++++++++ .../ru/mipt/npm/controls/xodus/connections.kt | 112 ------------------ .../ru/mipt/npm/controls/xodus/converters.kt | 73 ------------ .../mipt/npm/controls/xodus/util/queries.kt | 43 ------- .../mipt/npm/controls/xodus/ConvertersTest.kt | 60 ---------- .../npm/controls/xodus/util/QueriesTest.kt | 62 ---------- demo/car/build.gradle.kts | 1 + .../controls/demo/car/VirtualCarController.kt | 11 +- settings.gradle.kts | 4 +- .../npm/xodus/serialization/json/decoder.kt | 3 +- .../npm/xodus/serialization/json/encoder.kt | 6 + 17 files changed, 229 insertions(+), 388 deletions(-) create mode 100644 controls-storage/build.gradle.kts create mode 100644 controls-storage/src/commonMain/kotlin/ru.mipt.npm.controls.storage/synchronous/SynchronousStorageClient.kt create mode 100644 controls-storage/src/commonMain/kotlin/ru.mipt.npm.controls.storage/synchronous/commonApi.kt create mode 100644 controls-storage/src/jvmMain/kotlin/ru/mipt/npm/controls/storage/synchronous/jvmApi.kt delete mode 100644 controls-xodus/README.md create mode 100644 controls-xodus/src/main/kotlin/ru/mipt/npm/controls/xodus/SynchronousXodusClient.kt delete mode 100644 controls-xodus/src/main/kotlin/ru/mipt/npm/controls/xodus/connections.kt delete mode 100644 controls-xodus/src/main/kotlin/ru/mipt/npm/controls/xodus/converters.kt delete mode 100644 controls-xodus/src/main/kotlin/ru/mipt/npm/controls/xodus/util/queries.kt delete mode 100644 controls-xodus/src/test/kotlin/ru/mipt/npm/controls/xodus/ConvertersTest.kt delete mode 100644 controls-xodus/src/test/kotlin/ru/mipt/npm/controls/xodus/util/QueriesTest.kt diff --git a/controls-storage/build.gradle.kts b/controls-storage/build.gradle.kts new file mode 100644 index 0000000..beec52d --- /dev/null +++ b/controls-storage/build.gradle.kts @@ -0,0 +1,27 @@ +plugins { + id("ru.mipt.npm.gradle.mpp") + `maven-publish` +} + +val dataforgeVersion: String by rootProject.extra +val kotlinx_io_version = "0.1.1" + +kotlin { + sourceSets { + commonMain { + dependencies { + implementation(projects.controlsCore) + implementation("org.jetbrains.kotlinx:kotlinx-io:$kotlinx_io_version") + } + } + + jvmMain { + dependencies { + implementation(projects.magix.magixApi) + implementation(projects.controlsMagixClient) + implementation(projects.magix.magixServer) + implementation("org.jetbrains.kotlinx:kotlinx-io-jvm:$kotlinx_io_version") + } + } + } +} diff --git a/controls-storage/src/commonMain/kotlin/ru.mipt.npm.controls.storage/synchronous/SynchronousStorageClient.kt b/controls-storage/src/commonMain/kotlin/ru.mipt.npm.controls.storage/synchronous/SynchronousStorageClient.kt new file mode 100644 index 0000000..ef4e22f --- /dev/null +++ b/controls-storage/src/commonMain/kotlin/ru.mipt.npm.controls.storage/synchronous/SynchronousStorageClient.kt @@ -0,0 +1,14 @@ +package ru.mipt.npm.controls.storage.synchronous + +import kotlinx.io.core.Closeable +import ru.mipt.npm.controls.api.PropertyChangedMessage +import kotlin.reflect.KClass + +public interface SynchronousStorageClient : Closeable { + public fun storeValue(value: T, storageKind: StorageKind, clazz: KClass) + + public fun getPropertyHistory(sourceDeviceName: String, propertyName: String): List +} + +public inline fun SynchronousStorageClient.storeValue(value: T, storageKind: StorageKind): Unit = + storeValue(value, storageKind, T::class) diff --git a/controls-storage/src/commonMain/kotlin/ru.mipt.npm.controls.storage/synchronous/commonApi.kt b/controls-storage/src/commonMain/kotlin/ru.mipt.npm.controls.storage/synchronous/commonApi.kt new file mode 100644 index 0000000..29155dc --- /dev/null +++ b/controls-storage/src/commonMain/kotlin/ru.mipt.npm.controls.storage/synchronous/commonApi.kt @@ -0,0 +1,61 @@ +package ru.mipt.npm.controls.storage.synchronous + +import kotlinx.coroutines.InternalCoroutinesApi +import kotlinx.coroutines.Job +import kotlinx.coroutines.flow.* +import kotlinx.io.core.use +import ru.mipt.npm.controls.api.DeviceMessage +import ru.mipt.npm.controls.api.PropertyChangedMessage +import ru.mipt.npm.controls.controllers.DeviceManager +import ru.mipt.npm.controls.controllers.hubMessageFlow +import space.kscience.dataforge.context.Factory +import space.kscience.dataforge.context.debug +import space.kscience.dataforge.context.logger +import space.kscience.dataforge.meta.Meta + +public enum class StorageKind { + DEVICE_HUB, + MAGIX_SERVER +} + +/** + * Begin to store DeviceMessages from this DeviceManager + * @param factory factory that will be used for creating persistent entity store instance. DefaultPersistentStoreFactory by default. + * DeviceManager's meta and context will be used for in invoke method. + * @param filterCondition allow you to specify messages which we want to store. Always true by default. + * @return Job which responsible for our storage + */ +@OptIn(InternalCoroutinesApi::class) +public fun DeviceManager.storeMessages( + factory: Factory, + filterCondition: suspend (DeviceMessage) -> Boolean = { true }, +): Job { + val client = factory(meta, context) + logger.debug { "Storage client created" } + + return hubMessageFlow(context).filter(filterCondition).onEach { message -> + client.storeValue(message, StorageKind.DEVICE_HUB) + }.launchIn(context).apply { + invokeOnCompletion(onCancelling = true) { + client.close() + logger.debug { "Storage client closed" } + } + } +} + +/** + * @return the list of deviceMessages that describes changes of specified property of specified device sorted by time + * @param sourceDeviceName a name of device, history of which property we want to get + * @param propertyName a name of property, history of which we want to get + * @param factory a factory that produce mongo clients + */ +public fun getPropertyHistory( + sourceDeviceName: String, + propertyName: String, + factory: Factory, + meta: Meta = Meta.EMPTY +): List { + return factory(meta).use { + it.getPropertyHistory(sourceDeviceName, propertyName) + } +} diff --git a/controls-storage/src/jvmMain/kotlin/ru/mipt/npm/controls/storage/synchronous/jvmApi.kt b/controls-storage/src/jvmMain/kotlin/ru/mipt/npm/controls/storage/synchronous/jvmApi.kt new file mode 100644 index 0000000..6c31500 --- /dev/null +++ b/controls-storage/src/jvmMain/kotlin/ru/mipt/npm/controls/storage/synchronous/jvmApi.kt @@ -0,0 +1,42 @@ +package ru.mipt.npm.controls.storage.synchronous + +import io.ktor.application.* +import kotlinx.coroutines.InternalCoroutinesApi +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.filter +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.job +import ru.mipt.npm.magix.server.GenericMagixMessage +import space.kscience.dataforge.context.Factory +import space.kscience.dataforge.meta.Meta + +internal fun Flow.store( + client: SynchronousStorageClient, + flowFilter: suspend (GenericMagixMessage) -> Boolean = { true }, +) { + filter(flowFilter).onEach { message -> + client.storeValue(message, StorageKind.MAGIX_SERVER) + } +} + +/** Begin to store MagixMessages from certain flow + * @param flow flow of messages which we will store + * @param meta Meta which may have some configuration parameters for our storage and will be used in invoke method of factory + * @param factory factory that will be used for creating persistent entity store instance. DefaultPersistentStoreFactory by default. + * @param flowFilter allow you to specify messages which we want to store. Always true by default. + */ +@OptIn(InternalCoroutinesApi::class) +public fun Application.store( + flow: MutableSharedFlow, + meta: Meta = Meta.EMPTY, + factory: Factory, + flowFilter: suspend (GenericMagixMessage) -> Boolean = { true }, +) { + val client = factory(meta) + + flow.store(client, flowFilter) + coroutineContext.job.invokeOnCompletion(onCancelling = true) { + client.close() + } +} diff --git a/controls-xodus/README.md b/controls-xodus/README.md deleted file mode 100644 index dca332c..0000000 --- a/controls-xodus/README.md +++ /dev/null @@ -1,26 +0,0 @@ -# Description -This module allows you to store [DeviceMessages](/controls-core/src/commonMain/kotlin/ru/mipt/npm/controls/api/DeviceMessage.kt) -from certain [DeviceManager](/controls-core/src/commonMain/kotlin/ru/mipt/npm/controls/controllers/DeviceManager.kt) -or [MagixMessages](magix/magix-api/src/commonMain/kotlin/ru/mipt/npm/magix/api/MagixMessage.kt) -from [magix server](/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/server.kt) -in [xodus database](https://github.com/JetBrains/xodus). - -# Usage - -All usage examples can be found in [VirtualCarController](/demo/car/src/main/kotlin/ru/mipt/npm/controls/demo/car/VirtualCarController.kt). - -## Storage from Device Manager - -Just call connectMongo. For more details, you can see comments in [source code](/controls-xodus/src/main/kotlin/ru/mipt/npm/controls/xodus/connections.kt) - -## Storage from Magix Server - -Just pass such lambda as parameter to startMagixServer: -```kotlin -{ flow -> - // some code - storeInXodus(flow) - // some code -} -``` -For more details, you can see comments in [source code](/controls-xodus/src/main/kotlin/ru/mipt/npm/controls/xodus/connections.kt) diff --git a/controls-xodus/build.gradle.kts b/controls-xodus/build.gradle.kts index 9421f36..b07ed6b 100644 --- a/controls-xodus/build.gradle.kts +++ b/controls-xodus/build.gradle.kts @@ -6,11 +6,9 @@ plugins { val xodusVersion = "1.3.232" dependencies { - implementation(projects.controlsCore) - implementation(projects.magix.magixApi) - implementation(projects.controlsMagixClient) - implementation(projects.magix.magixServer) implementation(projects.xodusSerialization) + implementation(projects.controlsStorage) + implementation(projects.controlsCore) implementation("org.jetbrains.xodus:xodus-entity-store:$xodusVersion") implementation("org.jetbrains.xodus:xodus-environment:$xodusVersion") implementation("org.jetbrains.xodus:xodus-vfs:$xodusVersion") diff --git a/controls-xodus/src/main/kotlin/ru/mipt/npm/controls/xodus/SynchronousXodusClient.kt b/controls-xodus/src/main/kotlin/ru/mipt/npm/controls/xodus/SynchronousXodusClient.kt new file mode 100644 index 0000000..344d34e --- /dev/null +++ b/controls-xodus/src/main/kotlin/ru/mipt/npm/controls/xodus/SynchronousXodusClient.kt @@ -0,0 +1,66 @@ +package ru.mipt.npm.controls.xodus + +import jetbrains.exodus.entitystore.PersistentEntityStore +import jetbrains.exodus.entitystore.PersistentEntityStores +import kotlinx.datetime.Instant +import ru.mipt.npm.controls.api.PropertyChangedMessage +import ru.mipt.npm.controls.storage.synchronous.StorageKind +import ru.mipt.npm.controls.storage.synchronous.SynchronousStorageClient +import ru.mipt.npm.xodus.serialization.json.decodeFromEntity +import ru.mipt.npm.xodus.serialization.json.encodeToEntity +import space.kscience.dataforge.context.Context +import space.kscience.dataforge.context.Factory +import space.kscience.dataforge.meta.Meta +import space.kscience.dataforge.meta.get +import space.kscience.dataforge.meta.string +import space.kscience.dataforge.names.Name +import kotlin.reflect.KClass + +private const val DEFAULT_XODUS_STORE_PATH = ".storage" +public val XODUS_STORE_PROPERTY: Name = Name.of("xodus", "entityStorePath") + +private const val DEVICE_HUB_ENTITY_TYPE = "DeviceMessage" +private const val MAGIX_SERVER_ENTITY_TYPE = "MagixMessage" + +internal class SynchronousXodusClient(private val entityStore: PersistentEntityStore) : SynchronousStorageClient { + override fun storeValue(value: T, storageKind: StorageKind, clazz: KClass) { + val entityType = when (storageKind) { + StorageKind.DEVICE_HUB -> DEVICE_HUB_ENTITY_TYPE + StorageKind.MAGIX_SERVER -> MAGIX_SERVER_ENTITY_TYPE + } + + entityStore.encodeToEntity(value, entityType, clazz) + } + + override fun getPropertyHistory( + sourceDeviceName: String, + propertyName: String + ): List { + return entityStore.computeInTransaction { txn -> + txn.find(DEVICE_HUB_ENTITY_TYPE, "type", "property.changed").asSequence() + .filter { it?.getProperty("sourceDevice")?.let { it == sourceDeviceName } ?: false && + it?.getProperty("property")?.let { it == propertyName } ?: false + }.sortedByDescending { it?.getProperty("time")?.let { timeStr -> Instant.parse(timeStr as String) } } + .toList().map { txn.decodeFromEntity(it) } + } + } + + override fun close() { + entityStore.close() + } +} + +private fun Context.getPersistentEntityStore(meta: Meta = Meta.EMPTY): PersistentEntityStore { + val storePath = meta[XODUS_STORE_PROPERTY]?.string + ?: properties[XODUS_STORE_PROPERTY]?.string + ?: DEFAULT_XODUS_STORE_PATH + + return PersistentEntityStores.newInstance(storePath) +} + +public object DefaultSynchronousXodusClientFactory : Factory { + override fun invoke(meta: Meta, context: Context): SynchronousStorageClient { + val entityStore = context.getPersistentEntityStore(meta) + return SynchronousXodusClient(entityStore) + } +} diff --git a/controls-xodus/src/main/kotlin/ru/mipt/npm/controls/xodus/connections.kt b/controls-xodus/src/main/kotlin/ru/mipt/npm/controls/xodus/connections.kt deleted file mode 100644 index 3fe3ce6..0000000 --- a/controls-xodus/src/main/kotlin/ru/mipt/npm/controls/xodus/connections.kt +++ /dev/null @@ -1,112 +0,0 @@ -package ru.mipt.npm.controls.xodus - -import io.ktor.application.Application -import jetbrains.exodus.entitystore.PersistentEntityStore -import jetbrains.exodus.entitystore.PersistentEntityStores -import kotlinx.coroutines.InternalCoroutinesApi -import kotlinx.coroutines.Job -import kotlinx.coroutines.flow.* -import kotlinx.coroutines.job -import ru.mipt.npm.controls.api.DeviceMessage -import ru.mipt.npm.controls.controllers.DeviceManager -import ru.mipt.npm.controls.controllers.hubMessageFlow -import ru.mipt.npm.magix.server.GenericMagixMessage -import ru.mipt.npm.xodus.serialization.json.encodeToEntity -import space.kscience.dataforge.context.Context -import space.kscience.dataforge.context.Factory -import space.kscience.dataforge.context.debug -import space.kscience.dataforge.context.logger -import space.kscience.dataforge.meta.Meta -import space.kscience.dataforge.meta.get -import space.kscience.dataforge.meta.string -import space.kscience.dataforge.names.Name - -private const val DEFAULT_XODUS_STORE_PATH = ".storage" -public val XODUS_STORE_PROPERTY: Name = Name.of("xodus", "entityStorePath") - -private fun Context.getPersistentEntityStore(meta: Meta = Meta.EMPTY): PersistentEntityStore { - val storePath = meta[XODUS_STORE_PROPERTY]?.string - ?: properties[XODUS_STORE_PROPERTY]?.string - ?: DEFAULT_XODUS_STORE_PATH - - return PersistentEntityStores.newInstance(storePath) -} - -internal val defaultPersistentStoreFactory = object : Factory { - override fun invoke(meta: Meta, context: Context): PersistentEntityStore = context.getPersistentEntityStore(meta) -} - -/** - * Begin to store DeviceMessages from this DeviceManager - * @param factory factory that will be used for creating persistent entity store instance. DefaultPersistentStoreFactory by default. - * DeviceManager's meta and context will be used for in invoke method. - * @param filterCondition allow you to specify messages which we want to store. Always true by default. - * @return Job which responsible for our storage - */ -@OptIn(InternalCoroutinesApi::class) -public fun DeviceManager.storeMessagesInXodus( - factory: Factory = defaultPersistentStoreFactory, - filterCondition: suspend (DeviceMessage) -> Boolean = { true }, -): Job { - val entityStore = factory(meta, context) - logger.debug { "Device entity store opened" } - - return hubMessageFlow(context).filter(filterCondition).onEach { message -> - entityStore.encodeToEntity(message, "DeviceMessage") - }.launchIn(context).apply { - invokeOnCompletion(onCancelling = true) { - entityStore.close() - logger.debug { "Device entity store closed" } - } - } -} - -//public fun CoroutineScope.startMagixServer( -// entityStore: PersistentEntityStore, -// flowFilter: suspend (GenericMagixMessage) -> Boolean = { true }, -// port: Int = MagixEndpoint.DEFAULT_MAGIX_HTTP_PORT, -// buffer: Int = 100, -// enableRawRSocket: Boolean = true, -// enableZmq: Boolean = true, -// applicationConfiguration: Application.(MutableSharedFlow) -> Unit = {}, -//): ApplicationEngine = startMagixServer( -// port, buffer, enableRawRSocket, enableZmq -//) { flow -> -// applicationConfiguration(flow) -// flow.filter(flowFilter).onEach { message -> -// entityStore.executeInTransaction { txn -> -// val entity = txn.newEntity("MagixMessage") -// entity.setProperty("value", message.toString()) -// } -// } -//} - -internal fun Flow.storeInXodus( - entityStore: PersistentEntityStore, - flowFilter: suspend (GenericMagixMessage) -> Boolean = { true }, -) { - filter(flowFilter).onEach { message -> - entityStore.encodeToEntity(message, "MagixMessage") - } -} - -/** Begin to store MagixMessages from certain flow - * @param flow flow of messages which we will store - * @param meta Meta which may have some configuration parameters for our storage and will be used in invoke method of factory - * @param factory factory that will be used for creating persistent entity store instance. DefaultPersistentStoreFactory by default. - * @param flowFilter allow you to specify messages which we want to store. Always true by default. - */ -@OptIn(InternalCoroutinesApi::class) -public fun Application.storeInXodus( - flow: MutableSharedFlow, - meta: Meta = Meta.EMPTY, - factory: Factory = defaultPersistentStoreFactory, - flowFilter: suspend (GenericMagixMessage) -> Boolean = { true }, -) { - val entityStore = factory(meta) - - flow.storeInXodus(entityStore, flowFilter) - coroutineContext.job.invokeOnCompletion(onCancelling = true) { - entityStore.close() - } -} diff --git a/controls-xodus/src/main/kotlin/ru/mipt/npm/controls/xodus/converters.kt b/controls-xodus/src/main/kotlin/ru/mipt/npm/controls/xodus/converters.kt deleted file mode 100644 index f7bbd87..0000000 --- a/controls-xodus/src/main/kotlin/ru/mipt/npm/controls/xodus/converters.kt +++ /dev/null @@ -1,73 +0,0 @@ -package ru.mipt.npm.controls.xodus - -import jetbrains.exodus.entitystore.Entity -import jetbrains.exodus.entitystore.StoreTransaction -import kotlinx.datetime.Instant -import kotlinx.serialization.json.Json -import kotlinx.serialization.json.JsonElement -import ru.mipt.npm.controls.api.PropertyChangedMessage -import ru.mipt.npm.magix.api.MagixMessage -import space.kscience.dataforge.meta.MetaSerializer -import space.kscience.dataforge.meta.isLeaf -import space.kscience.dataforge.names.Name -import space.kscience.dataforge.values.Value -import space.kscience.dataforge.values.ValueType - -internal fun PropertyChangedMessage.toEntity(transaction: StoreTransaction): Entity { - val entity = transaction.newEntity("PropertyChangedMessage") - entity.setProperty("property", property) - entity.setProperty("value", value.toString()) - entity.setProperty("sourceDevice", sourceDevice.toString()) - targetDevice?.let { entity.setProperty("targetDevice", it.toString()) } - comment?.let { entity.setProperty("comment", it) } - time?.let { entity.setProperty("time", it.toEpochMilliseconds()) } - return entity -} - -internal fun Entity.toPropertyChangedMessage(): PropertyChangedMessage? { - if (getProperty("property") == null || getProperty("value") == null || getProperty("sourceDevice") == null) { - return null - } - - return PropertyChangedMessage( - getProperty("property") as String, - Json.decodeFromString(MetaSerializer, getProperty("value") as String), - Name.parse(getProperty("sourceDevice") as String), - getProperty("targetDevice")?.let { Name.parse(it as String) }, - getProperty("comment")?.let { it as String }, - getProperty("time")?.let { Instant.fromEpochMilliseconds(it as Long) } - ) -} - -internal fun MagixMessage.toEntity(transaction: StoreTransaction): Entity { - val entity = transaction.newEntity("MagixMessage") - entity.setProperty("format", format) - entity.setProperty("origin", origin) - if (payload is PropertyChangedMessage) { - val payloadEntity = (payload as PropertyChangedMessage).toEntity(transaction) - entity.setLink("payload", payloadEntity) - } - target?.let { entity.setProperty("target", it) } - id?.let { entity.setProperty("id", it) } - parentId?.let { entity.setProperty("parentId", it) } - user?.let { entity.setProperty("user", it.toString()) } - return entity -} - -internal fun Entity.toMagixMessage(): MagixMessage? { - if (getProperty("format") == null || getProperty("origin") == null) { - return null - } - - return getLink("payload")?.toPropertyChangedMessage()?.let { propertyChangedMessage -> - MagixMessage( - getProperty("format") as String, - getProperty("origin") as String, - propertyChangedMessage, - getProperty("target")?.let { it as String }, - getProperty("id")?.let { it as String }, - getProperty("parentId")?.let { it as String }, - getProperty("user")?.let { Json.decodeFromString(JsonElement.serializer(), it as String) } - ) - } -} diff --git a/controls-xodus/src/main/kotlin/ru/mipt/npm/controls/xodus/util/queries.kt b/controls-xodus/src/main/kotlin/ru/mipt/npm/controls/xodus/util/queries.kt deleted file mode 100644 index 5d7f959..0000000 --- a/controls-xodus/src/main/kotlin/ru/mipt/npm/controls/xodus/util/queries.kt +++ /dev/null @@ -1,43 +0,0 @@ -package ru.mipt.npm.controls.xodus.util - -import jetbrains.exodus.entitystore.PersistentEntityStore -import jetbrains.exodus.entitystore.StoreTransaction -import kotlinx.datetime.Instant -import ru.mipt.npm.controls.api.PropertyChangedMessage -import ru.mipt.npm.controls.xodus.defaultPersistentStoreFactory -import ru.mipt.npm.controls.xodus.toPropertyChangedMessage -import ru.mipt.npm.xodus.serialization.json.decodeFromEntity -import space.kscience.dataforge.context.Factory -import space.kscience.dataforge.meta.Meta - -public fun StoreTransaction.selectPropertyChangedMessagesFromRange( - range: ClosedRange -): List = find( - "PropertyChangedMessage", - "time", - range.start.toEpochMilliseconds(), - range.endInclusive.toEpochMilliseconds() -).mapNotNull { it.toPropertyChangedMessage() } - -/** - * @return the list of deviceMessages that describes changes of specified property of specified device sorted by time - * @param sourceDeviceName a name of device, history of which property we want to get - * @param propertyName a name of property, history of which we want to get - * @param factory a factory that produce mongo clients - */ -public fun getPropertyHistory( - sourceDeviceName: String, - propertyName: String, - factory: Factory = defaultPersistentStoreFactory, - meta: Meta = Meta.EMPTY -): List { - return factory(meta).use { store -> - store.computeInTransaction { txn -> - txn.find("DeviceMessage", "type", "property.changed").asSequence() - .filter { it?.getProperty("sourceDevice")?.let { it == sourceDeviceName } ?: false && - it?.getProperty("property")?.let { it == propertyName } ?: false - }.sortedByDescending { it?.getProperty("time")?.let { timeStr -> Instant.parse(timeStr as String) } } - .toList().map { txn.decodeFromEntity(it) } - } - } -} diff --git a/controls-xodus/src/test/kotlin/ru/mipt/npm/controls/xodus/ConvertersTest.kt b/controls-xodus/src/test/kotlin/ru/mipt/npm/controls/xodus/ConvertersTest.kt deleted file mode 100644 index da7f4d6..0000000 --- a/controls-xodus/src/test/kotlin/ru/mipt/npm/controls/xodus/ConvertersTest.kt +++ /dev/null @@ -1,60 +0,0 @@ -package ru.mipt.npm.controls.xodus - -import jetbrains.exodus.entitystore.PersistentEntityStores -import kotlinx.datetime.Instant -import kotlinx.serialization.json.JsonObject -import kotlinx.serialization.json.JsonPrimitive -import org.junit.jupiter.api.AfterAll -import org.junit.jupiter.api.BeforeAll -import org.junit.jupiter.api.Test -import ru.mipt.npm.controls.api.PropertyChangedMessage -import ru.mipt.npm.magix.api.MagixMessage -import space.kscience.dataforge.meta.Meta -import space.kscience.dataforge.names.Name -import java.io.File -import kotlin.test.assertEquals - -internal class ConvertersTest { - companion object { - private val storeName = ".converters_test" - private val entityStore = PersistentEntityStores.newInstance(storeName) - private val expectedMessage = MagixMessage( - "dataforge", - "dataforge", - PropertyChangedMessage( - "acceleration", - Meta { - "x" put 3.0 - "y" put 9.0 - }, - Name.parse("virtual-car"), - Name.parse("magix-virtual-car"), - time = Instant.fromEpochMilliseconds(1337) - ), - "magix-virtual-car", - user = JsonObject(content = mapOf(Pair("name", JsonPrimitive("SCADA")))) - ) - - @BeforeAll - @JvmStatic - fun createEntities() { - entityStore.executeInTransaction { - expectedMessage.toEntity(it) - } - } - - @AfterAll - @JvmStatic - fun deleteDatabase() { - entityStore.close() - File(storeName).deleteRecursively() - } - } - - @Test - fun testMagixMessageAndPropertyChangedMessageConverters() { - assertEquals(expectedMessage, entityStore.computeInReadonlyTransaction { - it.getAll("MagixMessage").first?.toMagixMessage() - }!!) - } -} diff --git a/controls-xodus/src/test/kotlin/ru/mipt/npm/controls/xodus/util/QueriesTest.kt b/controls-xodus/src/test/kotlin/ru/mipt/npm/controls/xodus/util/QueriesTest.kt deleted file mode 100644 index b09ae2e..0000000 --- a/controls-xodus/src/test/kotlin/ru/mipt/npm/controls/xodus/util/QueriesTest.kt +++ /dev/null @@ -1,62 +0,0 @@ -package ru.mipt.npm.controls.xodus.util - -import jetbrains.exodus.entitystore.PersistentEntityStores -import kotlinx.datetime.Instant -import org.junit.jupiter.api.AfterAll -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.BeforeAll -import org.junit.jupiter.api.Test -import ru.mipt.npm.controls.api.PropertyChangedMessage -import ru.mipt.npm.controls.xodus.toEntity -import space.kscience.dataforge.meta.Meta -import java.io.File - -internal class QueriesTest { - companion object { - private val storeName = ".queries_test" - private val entityStore = PersistentEntityStores.newInstance(storeName) - - private val propertyChangedMessages = listOf( - PropertyChangedMessage( - "", - Meta.EMPTY, - time = Instant.fromEpochMilliseconds(1000) - ), - PropertyChangedMessage( - "", - Meta.EMPTY, - time = Instant.fromEpochMilliseconds(1500) - ), - PropertyChangedMessage( - "", - Meta.EMPTY, - time = Instant.fromEpochMilliseconds(2000) - ) - ) - - @BeforeAll - @JvmStatic - fun createEntities() { - entityStore.executeInTransaction { transaction -> - propertyChangedMessages.forEach { - it.toEntity(transaction) - } - } - } - - @AfterAll - @JvmStatic - fun deleteDatabase() { - entityStore.close() - File(storeName).deleteRecursively() - } - } - - @Test - fun testFromTo() { - assertEquals(propertyChangedMessages.subList(0, 2).toSet(), entityStore.computeInReadonlyTransaction { - it.selectPropertyChangedMessagesFromRange( Instant.fromEpochMilliseconds(1000)..Instant.fromEpochMilliseconds(1500)) - }.toSet()) - } - -} \ No newline at end of file diff --git a/demo/car/build.gradle.kts b/demo/car/build.gradle.kts index ad3dda1..93968e7 100644 --- a/demo/car/build.gradle.kts +++ b/demo/car/build.gradle.kts @@ -21,6 +21,7 @@ dependencies { implementation(projects.controlsMagixClient) implementation(projects.controlsXodus) implementation(projects.controlsMongo) + implementation(projects.controlsStorage) implementation("io.ktor:ktor-client-cio:$ktorVersion") implementation("org.jetbrains.kotlinx:kotlinx-datetime:0.3.1") diff --git a/demo/car/src/main/kotlin/ru/mipt/npm/controls/demo/car/VirtualCarController.kt b/demo/car/src/main/kotlin/ru/mipt/npm/controls/demo/car/VirtualCarController.kt index 0dd6ffe..3796bc7 100644 --- a/demo/car/src/main/kotlin/ru/mipt/npm/controls/demo/car/VirtualCarController.kt +++ b/demo/car/src/main/kotlin/ru/mipt/npm/controls/demo/car/VirtualCarController.kt @@ -16,9 +16,10 @@ import ru.mipt.npm.controls.demo.car.IVirtualCar.Companion.acceleration import ru.mipt.npm.controls.mongo.DefaultMongoClientFactory import ru.mipt.npm.controls.mongo.connectMongo import ru.mipt.npm.controls.mongo.storeInMongo +import ru.mipt.npm.controls.storage.synchronous.store +import ru.mipt.npm.controls.storage.synchronous.storeMessages +import ru.mipt.npm.controls.xodus.DefaultSynchronousXodusClientFactory import ru.mipt.npm.controls.xodus.XODUS_STORE_PROPERTY -import ru.mipt.npm.controls.xodus.storeInXodus -import ru.mipt.npm.controls.xodus.storeMessagesInXodus import ru.mipt.npm.magix.api.MagixEndpoint import ru.mipt.npm.magix.rsocket.rSocketWithTcp import ru.mipt.npm.magix.server.startMagixServer @@ -56,14 +57,14 @@ class VirtualCarController : Controller(), ContextAware { //starting magix event loop and connect it to entity store magixServer = startMagixServer(enableRawRSocket = true, enableZmq = true) { flow -> - storeInXodus( flow, Meta { + store( flow, Meta { XODUS_STORE_PROPERTY put VirtualCarControllerConfig.magixEntityStorePath.toString() - }) + }, DefaultSynchronousXodusClientFactory) storeInMongo(flow) } magixVirtualCar = deviceManager.install("magix-virtual-car", MagixVirtualCar) //connect to device entity store - xodusStorageJob = deviceManager.storeMessagesInXodus() + xodusStorageJob = deviceManager.storeMessages(DefaultSynchronousXodusClientFactory) //Create mongo client and connect to MongoDB mongoStorageJob = deviceManager.connectMongo(DefaultMongoClientFactory) //Launch device client and connect it to the server diff --git a/settings.gradle.kts b/settings.gradle.kts index 896d04d..827b689 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -53,6 +53,6 @@ include( ":motors", ":controls-xodus", ":controls-mongo", - ":xodus-serialization" + ":xodus-serialization", + ":controls-storage" ) -include("xodus-serialization") diff --git a/xodus-serialization/src/main/kotlin/ru/mipt/npm/xodus/serialization/json/decoder.kt b/xodus-serialization/src/main/kotlin/ru/mipt/npm/xodus/serialization/json/decoder.kt index ff29904..eeba26b 100644 --- a/xodus-serialization/src/main/kotlin/ru/mipt/npm/xodus/serialization/json/decoder.kt +++ b/xodus-serialization/src/main/kotlin/ru/mipt/npm/xodus/serialization/json/decoder.kt @@ -38,7 +38,8 @@ internal fun StoreTransaction.decodeFromEntity(entity: Entity): JsonElement = bu public fun StoreTransaction.decodeFromEntity(entity: Entity, deserializer: DeserializationStrategy): T { val jsonElement = decodeFromEntity(entity) - return Json.decodeFromJsonElement(deserializer, jsonElement) + val json = Json { ignoreUnknownKeys = true } + return json.decodeFromJsonElement(deserializer, jsonElement) } public inline fun StoreTransaction.decodeFromEntity(entity: Entity): T = decodeFromEntity(entity, serializer()) diff --git a/xodus-serialization/src/main/kotlin/ru/mipt/npm/xodus/serialization/json/encoder.kt b/xodus-serialization/src/main/kotlin/ru/mipt/npm/xodus/serialization/json/encoder.kt index 3cd5b79..6a160da 100644 --- a/xodus-serialization/src/main/kotlin/ru/mipt/npm/xodus/serialization/json/encoder.kt +++ b/xodus-serialization/src/main/kotlin/ru/mipt/npm/xodus/serialization/json/encoder.kt @@ -4,9 +4,11 @@ import jetbrains.exodus.entitystore.Entity import jetbrains.exodus.entitystore.EntityId import jetbrains.exodus.entitystore.PersistentEntityStore import jetbrains.exodus.entitystore.StoreTransaction +import kotlinx.serialization.InternalSerializationApi import kotlinx.serialization.SerializationStrategy import kotlinx.serialization.json.* import kotlinx.serialization.serializer +import kotlin.reflect.KClass internal fun StoreTransaction.encodeToEntity(jsonElement: JsonElement, entity: Entity) { when (jsonElement) { @@ -69,3 +71,7 @@ public fun PersistentEntityStore.encodeToEntity(serializer: SerializationStr public inline fun PersistentEntityStore.encodeToEntity(value: T, entityType: String): EntityId = encodeToEntity(serializer(), value, entityType) + +@OptIn(InternalSerializationApi::class) +public fun PersistentEntityStore.encodeToEntity(value: T, entityType: String, clazz: KClass): EntityId = encodeToEntity( + clazz.serializer(), value, entityType)