From dbacdbc7cff379b573d538c279aa422cec5f235c Mon Sep 17 00:00:00 2001 From: Alexander Nozik Date: Mon, 4 Mar 2024 12:47:40 +0300 Subject: [PATCH] Replace event controls-storage with async api --- .../xodus/XodusDeviceMessageStorage.kt | 33 +++++++------------ .../controls/storage/DeviceMessageStorage.kt | 30 +++++++++++++++-- .../controls/storage/propertyHistory.kt | 20 +++++++++++ .../controls/storage/storageCommon.kt | 22 ------------- 4 files changed, 58 insertions(+), 47 deletions(-) create mode 100644 controls-storage/src/commonMain/kotlin/space/kscience/controls/storage/propertyHistory.kt diff --git a/controls-storage/controls-xodus/src/main/kotlin/space/kscience/controls/xodus/XodusDeviceMessageStorage.kt b/controls-storage/controls-xodus/src/main/kotlin/space/kscience/controls/xodus/XodusDeviceMessageStorage.kt index e1b5a8d..35c9b89 100644 --- a/controls-storage/controls-xodus/src/main/kotlin/space/kscience/controls/xodus/XodusDeviceMessageStorage.kt +++ b/controls-storage/controls-xodus/src/main/kotlin/space/kscience/controls/xodus/XodusDeviceMessageStorage.kt @@ -4,9 +4,11 @@ import jetbrains.exodus.entitystore.Entity import jetbrains.exodus.entitystore.PersistentEntityStore import jetbrains.exodus.entitystore.PersistentEntityStores import jetbrains.exodus.entitystore.StoreTransaction +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.asFlow +import kotlinx.coroutines.flow.filter +import kotlinx.coroutines.flow.map import kotlinx.datetime.Instant -import kotlinx.serialization.ExperimentalSerializationApi -import kotlinx.serialization.descriptors.serialDescriptor import kotlinx.serialization.encodeToString import kotlinx.serialization.json.Json import kotlinx.serialization.json.jsonObject @@ -65,12 +67,12 @@ public class XodusDeviceMessageStorage( } } - override suspend fun readAll(): List = entityStore.computeInReadonlyTransaction { transaction -> + override fun readAll(): Flow = entityStore.computeInReadonlyTransaction { transaction -> transaction.sort( DEVICE_MESSAGE_ENTITY_TYPE, DeviceMessage::time.name, true - ).map { + ).asFlow().map { Json.decodeFromString( DeviceMessage.serializer(), it.getBlobString("json") ?: error("No json content found") @@ -78,17 +80,17 @@ public class XodusDeviceMessageStorage( } } - override suspend fun read( + override fun read( eventType: String, range: ClosedRange?, sourceDevice: Name?, targetDevice: Name?, - ): List = entityStore.computeInReadonlyTransaction { transaction -> + ): Flow = entityStore.computeInReadonlyTransaction { transaction -> transaction.find( DEVICE_MESSAGE_ENTITY_TYPE, "type", eventType - ).asSequence().filter { + ).asFlow().filter { it.timeInRange(range) && it.propertyMatchesName(DeviceMessage::sourceDevice.name, sourceDevice) && it.propertyMatchesName(DeviceMessage::targetDevice.name, targetDevice) @@ -97,7 +99,7 @@ public class XodusDeviceMessageStorage( DeviceMessage.serializer(), it.getBlobString("json") ?: error("No json content found") ) - }.sortedBy { it.time }.toList() + } } override fun close() { @@ -120,17 +122,4 @@ public class XodusDeviceMessageStorage( return XodusDeviceMessageStorage(entityStore) } } -} - -/** - * Query all messages of given type - */ -@OptIn(ExperimentalSerializationApi::class) -public suspend inline fun XodusDeviceMessageStorage.query( - range: ClosedRange? = null, - sourceDevice: Name? = null, - targetDevice: Name? = null, -): List = read(serialDescriptor().serialName, range, sourceDevice, targetDevice).map { - //Check that all types are correct - it as T -} +} \ No newline at end of file diff --git a/controls-storage/src/commonMain/kotlin/space/kscience/controls/storage/DeviceMessageStorage.kt b/controls-storage/src/commonMain/kotlin/space/kscience/controls/storage/DeviceMessageStorage.kt index 87f4b74..b9cc84f 100644 --- a/controls-storage/src/commonMain/kotlin/space/kscience/controls/storage/DeviceMessageStorage.kt +++ b/controls-storage/src/commonMain/kotlin/space/kscience/controls/storage/DeviceMessageStorage.kt @@ -1,6 +1,10 @@ package space.kscience.controls.storage +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.map import kotlinx.datetime.Instant +import kotlinx.serialization.ExperimentalSerializationApi +import kotlinx.serialization.descriptors.serialDescriptor import space.kscience.controls.api.DeviceMessage import space.kscience.dataforge.names.Name @@ -10,14 +14,34 @@ import space.kscience.dataforge.names.Name public interface DeviceMessageStorage { public suspend fun write(event: DeviceMessage) - public suspend fun readAll(): List + /** + * Return all messages in a storage as a flow + */ + public fun readAll(): Flow - public suspend fun read( + /** + * Flow messages with given [eventType] and filters by [range], [sourceDevice] and [targetDevice]. + * Null in filters means that there is not filtering for this field. + */ + public fun read( eventType: String, range: ClosedRange? = null, sourceDevice: Name? = null, targetDevice: Name? = null, - ): List + ): Flow public fun close() +} + +/** + * Query all messages of given type + */ +@OptIn(ExperimentalSerializationApi::class) +public inline fun DeviceMessageStorage.read( + range: ClosedRange? = null, + sourceDevice: Name? = null, + targetDevice: Name? = null, +): Flow = read(serialDescriptor().serialName, range, sourceDevice, targetDevice).map { + //Check that all types are correct + it as T } \ No newline at end of file diff --git a/controls-storage/src/commonMain/kotlin/space/kscience/controls/storage/propertyHistory.kt b/controls-storage/src/commonMain/kotlin/space/kscience/controls/storage/propertyHistory.kt new file mode 100644 index 0000000..0ca30cc --- /dev/null +++ b/controls-storage/src/commonMain/kotlin/space/kscience/controls/storage/propertyHistory.kt @@ -0,0 +1,20 @@ +package space.kscience.controls.storage + +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.filter +import kotlinx.coroutines.flow.map +import kotlinx.datetime.Instant +import space.kscience.controls.api.PropertyChangedMessage +import space.kscience.controls.misc.PropertyHistory +import space.kscience.controls.misc.ValueWithTime +import space.kscience.dataforge.meta.transformations.MetaConverter + +public fun DeviceMessageStorage.propertyHistory( + propertyName: String, + converter: MetaConverter, +): PropertyHistory = object : PropertyHistory { + override fun flowHistory(from: Instant, until: Instant): Flow> = + read(from..until) + .filter { it.property == propertyName } + .map { ValueWithTime(converter.metaToObject(it.value), it.time) } +} \ No newline at end of file diff --git a/controls-storage/src/commonMain/kotlin/space/kscience/controls/storage/storageCommon.kt b/controls-storage/src/commonMain/kotlin/space/kscience/controls/storage/storageCommon.kt index 04b633a..ed96efc 100644 --- a/controls-storage/src/commonMain/kotlin/space/kscience/controls/storage/storageCommon.kt +++ b/controls-storage/src/commonMain/kotlin/space/kscience/controls/storage/storageCommon.kt @@ -39,26 +39,4 @@ public fun DeviceManager.storeMessages( }.launchIn(context) } -///** -// * @return the list of deviceMessages that describes changes of specified property of specified device sorted by time -// * @param sourceDeviceName a name of device, history of which property we want to get -// * @param propertyName a name of property, history of which we want to get -// * @param factory a factory that produce mongo clients -// */ -//public suspend fun getPropertyHistory( -// sourceDeviceName: String, -// propertyName: String, -// factory: Factory, -// meta: Meta = Meta.EMPTY, -//): List { -// return factory(meta).use { -// it.getPropertyHistory(sourceDeviceName, propertyName) -// } -//} -// -// -//public enum class StorageKind { -// DEVICE_HUB, -// MAGIX_SERVER -//}