diff --git a/controls-storage/src/jvmMain/kotlin/ru/mipt/npm/controls/storage/storageJvm.kt b/controls-storage/src/jvmMain/kotlin/ru/mipt/npm/controls/storage/storageJvm.kt index 28135dc..3960ccd 100644 --- a/controls-storage/src/jvmMain/kotlin/ru/mipt/npm/controls/storage/storageJvm.kt +++ b/controls-storage/src/jvmMain/kotlin/ru/mipt/npm/controls/storage/storageJvm.kt @@ -1,46 +1,46 @@ -package ru.mipt.npm.controls.storage - -import io.ktor.server.application.Application -import kotlinx.coroutines.InternalCoroutinesApi -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.MutableSharedFlow -import kotlinx.coroutines.flow.filter -import kotlinx.coroutines.flow.onEach -import kotlinx.coroutines.job -import ru.mipt.npm.magix.server.GenericMagixMessage -import space.kscience.dataforge.context.Factory -import space.kscience.dataforge.meta.Meta - -/** - * Asynchronous version of synchronous API, so for more details check relative docs - */ - -internal fun Flow.store( - client: EventStorage, - flowFilter: suspend (GenericMagixMessage) -> Boolean = { true }, -) { - filter(flowFilter).onEach { message -> - client.storeMagixMessage(message) - } -} - -/** Begin to store MagixMessages from certain flow - * @param flow flow of messages which we will store - * @param meta Meta which may have some configuration parameters for our storage and will be used in invoke method of factory - * @param factory factory that will be used for creating persistent entity store instance. DefaultPersistentStoreFactory by default. - * @param flowFilter allow you to specify messages which we want to store. Always true by default. - */ -@OptIn(InternalCoroutinesApi::class) -public fun Application.store( - flow: MutableSharedFlow, - factory: Factory, - meta: Meta = Meta.EMPTY, - flowFilter: suspend (GenericMagixMessage) -> Boolean = { true }, -) { - val client = factory(meta) - - flow.store(client, flowFilter) - coroutineContext.job.invokeOnCompletion(onCancelling = true) { - client.close() - } -} +//package ru.mipt.npm.controls.storage +// +//import io.ktor.server.application.Application +//import kotlinx.coroutines.InternalCoroutinesApi +//import kotlinx.coroutines.flow.Flow +//import kotlinx.coroutines.flow.MutableSharedFlow +//import kotlinx.coroutines.flow.filter +//import kotlinx.coroutines.flow.onEach +//import kotlinx.coroutines.job +//import ru.mipt.npm.magix.server.GenericMagixMessage +//import space.kscience.dataforge.context.Factory +//import space.kscience.dataforge.meta.Meta +// +///** +// * Asynchronous version of synchronous API, so for more details check relative docs +// */ +// +//internal fun Flow.store( +// client: EventStorage, +// flowFilter: suspend (GenericMagixMessage) -> Boolean = { true }, +//) { +// filter(flowFilter).onEach { message -> +// client.storeMagixMessage(message) +// } +//} +// +///** Begin to store MagixMessages from certain flow +// * @param flow flow of messages which we will store +// * @param meta Meta which may have some configuration parameters for our storage and will be used in invoke method of factory +// * @param factory factory that will be used for creating persistent entity store instance. DefaultPersistentStoreFactory by default. +// * @param flowFilter allow you to specify messages which we want to store. Always true by default. +// */ +//@OptIn(InternalCoroutinesApi::class) +//public fun Application.store( +// flow: MutableSharedFlow, +// factory: Factory, +// meta: Meta = Meta.EMPTY, +// flowFilter: suspend (GenericMagixMessage) -> Boolean = { true }, +//) { +// val client = factory(meta) +// +// flow.store(client, flowFilter) +// coroutineContext.job.invokeOnCompletion(onCancelling = true) { +// client.close() +// } +//} diff --git a/controls-xodus/src/main/kotlin/ru/mipt/npm/controls/xodus/XodusDeviceMessageStorage.kt b/controls-xodus/src/main/kotlin/ru/mipt/npm/controls/xodus/XodusDeviceMessageStorage.kt index ea96f36..fd2f79b 100644 --- a/controls-xodus/src/main/kotlin/ru/mipt/npm/controls/xodus/XodusDeviceMessageStorage.kt +++ b/controls-xodus/src/main/kotlin/ru/mipt/npm/controls/xodus/XodusDeviceMessageStorage.kt @@ -27,7 +27,7 @@ import space.kscience.dataforge.names.matches import space.kscience.dataforge.names.parseAsName -internal fun StoreTransaction.writeMessage(message: DeviceMessage): Entity { +internal fun StoreTransaction.writeMessage(message: DeviceMessage): Unit { val entity: Entity = newEntity(XodusDeviceMessageStorage.DEVICE_MESSAGE_ENTITY_TYPE) val json = Json.encodeToJsonElement(DeviceMessage.serializer(), message).jsonObject val type = json["type"]?.jsonPrimitive?.content ?: error("Message json representation must have type.") @@ -43,8 +43,6 @@ internal fun StoreTransaction.writeMessage(message: DeviceMessage): Entity { entity.setProperty(DeviceMessage::targetDevice.name, it.toString()) } entity.setBlobString("json", Json.encodeToString(json)) - - return entity } @@ -65,15 +63,16 @@ public class XodusDeviceMessageStorage( ) : DeviceMessageStorage, AutoCloseable { override suspend fun write(event: DeviceMessage) { - //entityStore.encodeToEntity(event, DEVICE_MESSAGE_ENTITY_TYPE, DeviceMessage.serializer()) - entityStore.computeInTransaction { txn -> + entityStore.executeInTransaction { txn -> txn.writeMessage(event) } } - override suspend fun readAll(): List = entityStore.computeInTransaction { transaction -> - transaction.getAll( + override suspend fun readAll(): List = entityStore.computeInReadonlyTransaction { transaction -> + transaction.sort( DEVICE_MESSAGE_ENTITY_TYPE, + DeviceMessage::time.name, + true ).map { Json.decodeFromString( DeviceMessage.serializer(), @@ -87,22 +86,21 @@ public class XodusDeviceMessageStorage( range: ClosedRange?, sourceDevice: Name?, targetDevice: Name?, - ): List = entityStore.computeInTransaction { transaction -> + ): List = entityStore.computeInReadonlyTransaction { transaction -> transaction.find( DEVICE_MESSAGE_ENTITY_TYPE, "type", eventType - ).mapNotNull { - if (it.timeInRange(range) && - it.propertyMatchesName(DeviceMessage::sourceDevice.name, sourceDevice) && - it.propertyMatchesName(DeviceMessage::targetDevice.name, targetDevice) - ) { - Json.decodeFromString( - DeviceMessage.serializer(), - it.getBlobString("json") ?: error("No json content found") - ) - } else null - } + ).asSequence().filter { + it.timeInRange(range) && + it.propertyMatchesName(DeviceMessage::sourceDevice.name, sourceDevice) && + it.propertyMatchesName(DeviceMessage::targetDevice.name, targetDevice) + }.map { + Json.decodeFromString( + DeviceMessage.serializer(), + it.getBlobString("json") ?: error("No json content found") + ) + }.sortedBy { it.time }.toList() } override fun close() { @@ -110,7 +108,7 @@ public class XodusDeviceMessageStorage( } public companion object : Factory { - internal const val DEVICE_MESSAGE_ENTITY_TYPE = "DeviceMessage" + internal const val DEVICE_MESSAGE_ENTITY_TYPE = "controls-kt.message" public val XODUS_STORE_PROPERTY: Name = Name.of("xodus", "storagePath") diff --git a/controls-xodus/src/test/kotlin/PropertyHistoryTest.kt b/controls-xodus/src/test/kotlin/PropertyHistoryTest.kt index b1d8080..618569f 100644 --- a/controls-xodus/src/test/kotlin/PropertyHistoryTest.kt +++ b/controls-xodus/src/test/kotlin/PropertyHistoryTest.kt @@ -6,20 +6,19 @@ import org.junit.jupiter.api.AfterAll import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeAll import org.junit.jupiter.api.Test -import ru.mipt.npm.controls.api.DeviceMessage import ru.mipt.npm.controls.api.PropertyChangedMessage -import ru.mipt.npm.controls.storage.getPropertyHistory -import ru.mipt.npm.controls.xodus.XODUS_STORE_PROPERTY -import ru.mipt.npm.controls.xodus.XodusEventStorage -import ru.mipt.npm.xodus.serialization.json.encodeToEntity +import ru.mipt.npm.controls.xodus.XodusDeviceMessageStorage +import ru.mipt.npm.controls.xodus.query +import ru.mipt.npm.controls.xodus.writeMessage import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.names.Name -import java.io.File +import space.kscience.dataforge.names.asName +import java.nio.file.Files internal class PropertyHistoryTest { companion object { - private val storeName = ".property_history_test" - private val entityStore = PersistentEntityStores.newInstance(storeName) + val storeFile = Files.createTempDirectory("controls-xodus").toFile() + private val propertyChangedMessages = listOf( PropertyChangedMessage( @@ -45,28 +44,34 @@ internal class PropertyHistoryTest { @BeforeAll @JvmStatic fun createEntities() { - propertyChangedMessages.forEach { - entityStore.encodeToEntity(it, "DeviceMessage") + PersistentEntityStores.newInstance(storeFile).use { + it.executeInTransaction { transaction -> + propertyChangedMessages.forEach { message -> + transaction.writeMessage(message) + } + } } - entityStore.close() } @AfterAll @JvmStatic fun deleteDatabase() { - File(storeName).deleteRecursively() + storeFile.deleteRecursively() } } @OptIn(ExperimentalCoroutinesApi::class) @Test fun getPropertyHistoryTest() = runTest { - assertEquals( - listOf(propertyChangedMessages[0]), - getPropertyHistory( - "virtual-car", "speed", XodusEventStorage, Meta { - XODUS_STORE_PROPERTY put storeName - }) - ) + PersistentEntityStores.newInstance(storeFile).use { entityStore -> + XodusDeviceMessageStorage(entityStore).use { storage -> + assertEquals( + propertyChangedMessages[0], + storage.query( + sourceDevice = "virtual-car".asName() + ).first { it.property == "speed" } + ) + } + } } } \ No newline at end of file diff --git a/magix/magix-storage/magix-storage-xodus/src/main/kotlin/ru/mipt/npm/magix/storage/xodus/XodusMagixStorage.kt b/magix/magix-storage/magix-storage-xodus/src/main/kotlin/ru/mipt/npm/magix/storage/xodus/XodusMagixStorage.kt index 007d3b1..6c76ee7 100644 --- a/magix/magix-storage/magix-storage-xodus/src/main/kotlin/ru/mipt/npm/magix/storage/xodus/XodusMagixStorage.kt +++ b/magix/magix-storage/magix-storage-xodus/src/main/kotlin/ru/mipt/npm/magix/storage/xodus/XodusMagixStorage.kt @@ -1,25 +1,52 @@ package ru.mipt.npm.magix.storage.xodus +import jetbrains.exodus.entitystore.PersistentEntityStore import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach +import kotlinx.serialization.encodeToString import kotlinx.serialization.json.JsonElement import ru.mipt.npm.magix.api.MagixEndpoint +import ru.mipt.npm.magix.api.MagixMessage import ru.mipt.npm.magix.api.MagixMessageFilter -import java.nio.file.Path public class XodusMagixStorage( - private val scope: CoroutineScope, - private val path: Path, - private val endpoint: MagixEndpoint, - private val filter: MagixMessageFilter = MagixMessageFilter(), + scope: CoroutineScope, + private val store: PersistentEntityStore, + endpoint: MagixEndpoint, + filter: MagixMessageFilter = MagixMessageFilter(), ) : AutoCloseable { - private val subscriptionJob = endpoint.subscribe(filter).onEach { - TODO() + //TODO consider message buffering + private val subscriptionJob = endpoint.subscribe(filter).onEach { message -> + store.executeInTransaction { transaction -> + transaction.newEntity(MAGIC_MESSAGE_ENTITY_TYPE).apply { + setProperty(MagixMessage<*>::origin.name, message.origin) + setProperty(MagixMessage<*>::format.name, message.format) + + setBlobString(MagixMessage<*>::payload.name, MagixEndpoint.magixJson.encodeToString(message.payload)) + + message.target?.let { + setProperty(MagixMessage<*>::target.name, it) + } + message.id?.let { + setProperty(MagixMessage<*>::id.name, it) + } + message.parentId?.let { + setProperty(MagixMessage<*>::parentId.name, it) + } + message.user?.let { + setBlobString(MagixMessage<*>::user.name, MagixEndpoint.magixJson.encodeToString(it)) + } + } + } }.launchIn(scope) override fun close() { subscriptionJob.cancel() } + + public companion object { + public const val MAGIC_MESSAGE_ENTITY_TYPE: String = "magix.message" + } } \ No newline at end of file