Replace event controls-storage with async api

This commit is contained in:
Alexander Nozik 2024-03-04 12:47:40 +03:00
parent 28ec2bc8b8
commit dbacdbc7cf
4 changed files with 58 additions and 47 deletions

View File

@ -4,9 +4,11 @@ import jetbrains.exodus.entitystore.Entity
import jetbrains.exodus.entitystore.PersistentEntityStore import jetbrains.exodus.entitystore.PersistentEntityStore
import jetbrains.exodus.entitystore.PersistentEntityStores import jetbrains.exodus.entitystore.PersistentEntityStores
import jetbrains.exodus.entitystore.StoreTransaction import jetbrains.exodus.entitystore.StoreTransaction
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.map
import kotlinx.datetime.Instant import kotlinx.datetime.Instant
import kotlinx.serialization.ExperimentalSerializationApi
import kotlinx.serialization.descriptors.serialDescriptor
import kotlinx.serialization.encodeToString import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.Json import kotlinx.serialization.json.Json
import kotlinx.serialization.json.jsonObject import kotlinx.serialization.json.jsonObject
@ -65,12 +67,12 @@ public class XodusDeviceMessageStorage(
} }
} }
override suspend fun readAll(): List<DeviceMessage> = entityStore.computeInReadonlyTransaction { transaction -> override fun readAll(): Flow<DeviceMessage> = entityStore.computeInReadonlyTransaction { transaction ->
transaction.sort( transaction.sort(
DEVICE_MESSAGE_ENTITY_TYPE, DEVICE_MESSAGE_ENTITY_TYPE,
DeviceMessage::time.name, DeviceMessage::time.name,
true true
).map { ).asFlow().map {
Json.decodeFromString( Json.decodeFromString(
DeviceMessage.serializer(), DeviceMessage.serializer(),
it.getBlobString("json") ?: error("No json content found") it.getBlobString("json") ?: error("No json content found")
@ -78,17 +80,17 @@ public class XodusDeviceMessageStorage(
} }
} }
override suspend fun read( override fun read(
eventType: String, eventType: String,
range: ClosedRange<Instant>?, range: ClosedRange<Instant>?,
sourceDevice: Name?, sourceDevice: Name?,
targetDevice: Name?, targetDevice: Name?,
): List<DeviceMessage> = entityStore.computeInReadonlyTransaction { transaction -> ): Flow<DeviceMessage> = entityStore.computeInReadonlyTransaction { transaction ->
transaction.find( transaction.find(
DEVICE_MESSAGE_ENTITY_TYPE, DEVICE_MESSAGE_ENTITY_TYPE,
"type", "type",
eventType eventType
).asSequence().filter { ).asFlow().filter {
it.timeInRange(range) && it.timeInRange(range) &&
it.propertyMatchesName(DeviceMessage::sourceDevice.name, sourceDevice) && it.propertyMatchesName(DeviceMessage::sourceDevice.name, sourceDevice) &&
it.propertyMatchesName(DeviceMessage::targetDevice.name, targetDevice) it.propertyMatchesName(DeviceMessage::targetDevice.name, targetDevice)
@ -97,7 +99,7 @@ public class XodusDeviceMessageStorage(
DeviceMessage.serializer(), DeviceMessage.serializer(),
it.getBlobString("json") ?: error("No json content found") it.getBlobString("json") ?: error("No json content found")
) )
}.sortedBy { it.time }.toList() }
} }
override fun close() { override fun close() {
@ -120,17 +122,4 @@ public class XodusDeviceMessageStorage(
return XodusDeviceMessageStorage(entityStore) return XodusDeviceMessageStorage(entityStore)
} }
} }
} }
/**
* Query all messages of given type
*/
@OptIn(ExperimentalSerializationApi::class)
public suspend inline fun <reified T : DeviceMessage> XodusDeviceMessageStorage.query(
range: ClosedRange<Instant>? = null,
sourceDevice: Name? = null,
targetDevice: Name? = null,
): List<T> = read(serialDescriptor<T>().serialName, range, sourceDevice, targetDevice).map {
//Check that all types are correct
it as T
}

View File

@ -1,6 +1,10 @@
package space.kscience.controls.storage package space.kscience.controls.storage
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
import kotlinx.datetime.Instant import kotlinx.datetime.Instant
import kotlinx.serialization.ExperimentalSerializationApi
import kotlinx.serialization.descriptors.serialDescriptor
import space.kscience.controls.api.DeviceMessage import space.kscience.controls.api.DeviceMessage
import space.kscience.dataforge.names.Name import space.kscience.dataforge.names.Name
@ -10,14 +14,34 @@ import space.kscience.dataforge.names.Name
public interface DeviceMessageStorage { public interface DeviceMessageStorage {
public suspend fun write(event: DeviceMessage) public suspend fun write(event: DeviceMessage)
public suspend fun readAll(): List<DeviceMessage> /**
* Return all messages in a storage as a flow
*/
public fun readAll(): Flow<DeviceMessage>
public suspend fun read( /**
* Flow messages with given [eventType] and filters by [range], [sourceDevice] and [targetDevice].
* Null in filters means that there is not filtering for this field.
*/
public fun read(
eventType: String, eventType: String,
range: ClosedRange<Instant>? = null, range: ClosedRange<Instant>? = null,
sourceDevice: Name? = null, sourceDevice: Name? = null,
targetDevice: Name? = null, targetDevice: Name? = null,
): List<DeviceMessage> ): Flow<DeviceMessage>
public fun close() public fun close()
}
/**
* Query all messages of given type
*/
@OptIn(ExperimentalSerializationApi::class)
public inline fun <reified T : DeviceMessage> DeviceMessageStorage.read(
range: ClosedRange<Instant>? = null,
sourceDevice: Name? = null,
targetDevice: Name? = null,
): Flow<T> = read(serialDescriptor<T>().serialName, range, sourceDevice, targetDevice).map {
//Check that all types are correct
it as T
} }

View File

@ -0,0 +1,20 @@
package space.kscience.controls.storage
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.map
import kotlinx.datetime.Instant
import space.kscience.controls.api.PropertyChangedMessage
import space.kscience.controls.misc.PropertyHistory
import space.kscience.controls.misc.ValueWithTime
import space.kscience.dataforge.meta.transformations.MetaConverter
public fun <T> DeviceMessageStorage.propertyHistory(
propertyName: String,
converter: MetaConverter<T>,
): PropertyHistory<T> = object : PropertyHistory<T> {
override fun flowHistory(from: Instant, until: Instant): Flow<ValueWithTime<T>> =
read<PropertyChangedMessage>(from..until)
.filter { it.property == propertyName }
.map { ValueWithTime(converter.metaToObject(it.value), it.time) }
}

View File

@ -39,26 +39,4 @@ public fun DeviceManager.storeMessages(
}.launchIn(context) }.launchIn(context)
} }
///**
// * @return the list of deviceMessages that describes changes of specified property of specified device sorted by time
// * @param sourceDeviceName a name of device, history of which property we want to get
// * @param propertyName a name of property, history of which we want to get
// * @param factory a factory that produce mongo clients
// */
//public suspend fun getPropertyHistory(
// sourceDeviceName: String,
// propertyName: String,
// factory: Factory<EventStorage>,
// meta: Meta = Meta.EMPTY,
//): List<PropertyChangedMessage> {
// return factory(meta).use {
// it.getPropertyHistory(sourceDeviceName, propertyName)
// }
//}
//
//
//public enum class StorageKind {
// DEVICE_HUB,
// MAGIX_SERVER
//}