Compare commits
3 Commits
9edf3b13ef
...
dbacdbc7cf
Author | SHA1 | Date | |
---|---|---|---|
dbacdbc7cf | |||
28ec2bc8b8 | |||
cfd9eb053c |
@ -22,7 +22,7 @@ public sealed class DeviceMessage {
|
|||||||
public abstract val sourceDevice: Name?
|
public abstract val sourceDevice: Name?
|
||||||
public abstract val targetDevice: Name?
|
public abstract val targetDevice: Name?
|
||||||
public abstract val comment: String?
|
public abstract val comment: String?
|
||||||
public abstract val time: Instant?
|
public abstract val time: Instant
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Update the source device name for composition. If the original name is null, the resulting name is also null.
|
* Update the source device name for composition. If the original name is null, the resulting name is also null.
|
||||||
@ -59,7 +59,7 @@ public data class PropertyChangedMessage(
|
|||||||
override val sourceDevice: Name = Name.EMPTY,
|
override val sourceDevice: Name = Name.EMPTY,
|
||||||
override val targetDevice: Name? = null,
|
override val targetDevice: Name? = null,
|
||||||
override val comment: String? = null,
|
override val comment: String? = null,
|
||||||
@EncodeDefault override val time: Instant? = Clock.System.now(),
|
@EncodeDefault override val time: Instant = Clock.System.now(),
|
||||||
) : DeviceMessage() {
|
) : DeviceMessage() {
|
||||||
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice))
|
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice))
|
||||||
}
|
}
|
||||||
@ -75,7 +75,7 @@ public data class PropertySetMessage(
|
|||||||
override val sourceDevice: Name? = null,
|
override val sourceDevice: Name? = null,
|
||||||
override val targetDevice: Name,
|
override val targetDevice: Name,
|
||||||
override val comment: String? = null,
|
override val comment: String? = null,
|
||||||
@EncodeDefault override val time: Instant? = Clock.System.now(),
|
@EncodeDefault override val time: Instant = Clock.System.now(),
|
||||||
) : DeviceMessage() {
|
) : DeviceMessage() {
|
||||||
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = sourceDevice?.let(block))
|
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = sourceDevice?.let(block))
|
||||||
}
|
}
|
||||||
@ -91,7 +91,7 @@ public data class PropertyGetMessage(
|
|||||||
override val sourceDevice: Name? = null,
|
override val sourceDevice: Name? = null,
|
||||||
override val targetDevice: Name,
|
override val targetDevice: Name,
|
||||||
override val comment: String? = null,
|
override val comment: String? = null,
|
||||||
@EncodeDefault override val time: Instant? = Clock.System.now(),
|
@EncodeDefault override val time: Instant = Clock.System.now(),
|
||||||
) : DeviceMessage() {
|
) : DeviceMessage() {
|
||||||
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = sourceDevice?.let(block))
|
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = sourceDevice?.let(block))
|
||||||
}
|
}
|
||||||
@ -105,7 +105,7 @@ public data class GetDescriptionMessage(
|
|||||||
override val sourceDevice: Name? = null,
|
override val sourceDevice: Name? = null,
|
||||||
override val targetDevice: Name? = null,
|
override val targetDevice: Name? = null,
|
||||||
override val comment: String? = null,
|
override val comment: String? = null,
|
||||||
@EncodeDefault override val time: Instant? = Clock.System.now(),
|
@EncodeDefault override val time: Instant = Clock.System.now(),
|
||||||
) : DeviceMessage() {
|
) : DeviceMessage() {
|
||||||
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = sourceDevice?.let(block))
|
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = sourceDevice?.let(block))
|
||||||
}
|
}
|
||||||
@ -122,7 +122,7 @@ public data class DescriptionMessage(
|
|||||||
override val sourceDevice: Name,
|
override val sourceDevice: Name,
|
||||||
override val targetDevice: Name? = null,
|
override val targetDevice: Name? = null,
|
||||||
override val comment: String? = null,
|
override val comment: String? = null,
|
||||||
@EncodeDefault override val time: Instant? = Clock.System.now(),
|
@EncodeDefault override val time: Instant = Clock.System.now(),
|
||||||
) : DeviceMessage() {
|
) : DeviceMessage() {
|
||||||
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice))
|
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice))
|
||||||
}
|
}
|
||||||
@ -141,7 +141,7 @@ public data class ActionExecuteMessage(
|
|||||||
override val sourceDevice: Name? = null,
|
override val sourceDevice: Name? = null,
|
||||||
override val targetDevice: Name,
|
override val targetDevice: Name,
|
||||||
override val comment: String? = null,
|
override val comment: String? = null,
|
||||||
@EncodeDefault override val time: Instant? = Clock.System.now(),
|
@EncodeDefault override val time: Instant = Clock.System.now(),
|
||||||
) : DeviceMessage() {
|
) : DeviceMessage() {
|
||||||
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = sourceDevice?.let(block))
|
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = sourceDevice?.let(block))
|
||||||
}
|
}
|
||||||
@ -160,7 +160,7 @@ public data class ActionResultMessage(
|
|||||||
override val sourceDevice: Name,
|
override val sourceDevice: Name,
|
||||||
override val targetDevice: Name? = null,
|
override val targetDevice: Name? = null,
|
||||||
override val comment: String? = null,
|
override val comment: String? = null,
|
||||||
@EncodeDefault override val time: Instant? = Clock.System.now(),
|
@EncodeDefault override val time: Instant = Clock.System.now(),
|
||||||
) : DeviceMessage() {
|
) : DeviceMessage() {
|
||||||
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice))
|
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice))
|
||||||
}
|
}
|
||||||
@ -175,7 +175,7 @@ public data class BinaryNotificationMessage(
|
|||||||
override val sourceDevice: Name,
|
override val sourceDevice: Name,
|
||||||
override val targetDevice: Name? = null,
|
override val targetDevice: Name? = null,
|
||||||
override val comment: String? = null,
|
override val comment: String? = null,
|
||||||
@EncodeDefault override val time: Instant? = Clock.System.now(),
|
@EncodeDefault override val time: Instant = Clock.System.now(),
|
||||||
) : DeviceMessage() {
|
) : DeviceMessage() {
|
||||||
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice))
|
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice))
|
||||||
}
|
}
|
||||||
@ -190,7 +190,7 @@ public data class EmptyDeviceMessage(
|
|||||||
override val sourceDevice: Name? = null,
|
override val sourceDevice: Name? = null,
|
||||||
override val targetDevice: Name? = null,
|
override val targetDevice: Name? = null,
|
||||||
override val comment: String? = null,
|
override val comment: String? = null,
|
||||||
@EncodeDefault override val time: Instant? = Clock.System.now(),
|
@EncodeDefault override val time: Instant = Clock.System.now(),
|
||||||
) : DeviceMessage() {
|
) : DeviceMessage() {
|
||||||
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = sourceDevice?.let(block))
|
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = sourceDevice?.let(block))
|
||||||
}
|
}
|
||||||
@ -206,7 +206,7 @@ public data class DeviceLogMessage(
|
|||||||
override val sourceDevice: Name = Name.EMPTY,
|
override val sourceDevice: Name = Name.EMPTY,
|
||||||
override val targetDevice: Name? = null,
|
override val targetDevice: Name? = null,
|
||||||
override val comment: String? = null,
|
override val comment: String? = null,
|
||||||
@EncodeDefault override val time: Instant? = Clock.System.now(),
|
@EncodeDefault override val time: Instant = Clock.System.now(),
|
||||||
) : DeviceMessage() {
|
) : DeviceMessage() {
|
||||||
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice))
|
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice))
|
||||||
}
|
}
|
||||||
@ -223,7 +223,7 @@ public data class DeviceErrorMessage(
|
|||||||
override val sourceDevice: Name = Name.EMPTY,
|
override val sourceDevice: Name = Name.EMPTY,
|
||||||
override val targetDevice: Name? = null,
|
override val targetDevice: Name? = null,
|
||||||
override val comment: String? = null,
|
override val comment: String? = null,
|
||||||
@EncodeDefault override val time: Instant? = Clock.System.now(),
|
@EncodeDefault override val time: Instant = Clock.System.now(),
|
||||||
) : DeviceMessage() {
|
) : DeviceMessage() {
|
||||||
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice))
|
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice))
|
||||||
}
|
}
|
||||||
@ -238,7 +238,7 @@ public data class DeviceLifeCycleMessage(
|
|||||||
override val sourceDevice: Name = Name.EMPTY,
|
override val sourceDevice: Name = Name.EMPTY,
|
||||||
override val targetDevice: Name? = null,
|
override val targetDevice: Name? = null,
|
||||||
override val comment: String? = null,
|
override val comment: String? = null,
|
||||||
@EncodeDefault override val time: Instant? = Clock.System.now(),
|
@EncodeDefault override val time: Instant = Clock.System.now(),
|
||||||
) : DeviceMessage() {
|
) : DeviceMessage() {
|
||||||
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice))
|
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice))
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,66 @@
|
|||||||
|
package space.kscience.controls.misc
|
||||||
|
|
||||||
|
import kotlinx.coroutines.CoroutineScope
|
||||||
|
import kotlinx.coroutines.flow.*
|
||||||
|
import kotlinx.datetime.Clock
|
||||||
|
import kotlinx.datetime.Instant
|
||||||
|
import space.kscience.controls.api.Device
|
||||||
|
import space.kscience.controls.api.DeviceMessage
|
||||||
|
import space.kscience.controls.api.PropertyChangedMessage
|
||||||
|
import space.kscience.controls.spec.DevicePropertySpec
|
||||||
|
import space.kscience.controls.spec.name
|
||||||
|
import space.kscience.dataforge.meta.transformations.MetaConverter
|
||||||
|
|
||||||
|
/**
|
||||||
|
* An interface for device property history.
|
||||||
|
*/
|
||||||
|
public interface PropertyHistory<T> {
|
||||||
|
/**
|
||||||
|
* Flow property values filtered by a time range. The implementation could flow it as a chunk or provide paging.
|
||||||
|
* So the resulting flow is allowed to suspend.
|
||||||
|
*
|
||||||
|
* If [until] is in the future, the resulting flow is potentially unlimited.
|
||||||
|
* Theoretically, it could be also unlimited if the event source keeps producing new event with timestamp in a given range.
|
||||||
|
*/
|
||||||
|
public fun flowHistory(
|
||||||
|
from: Instant = Instant.DISTANT_PAST,
|
||||||
|
until: Instant = Clock.System.now(),
|
||||||
|
): Flow<ValueWithTime<T>>
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* An in-memory property values history collector
|
||||||
|
*/
|
||||||
|
public class CollectedPropertyHistory<T>(
|
||||||
|
public val scope: CoroutineScope,
|
||||||
|
eventFlow: Flow<DeviceMessage>,
|
||||||
|
public val propertyName: String,
|
||||||
|
public val converter: MetaConverter<T>,
|
||||||
|
maxSize: Int = 1000,
|
||||||
|
) : PropertyHistory<T> {
|
||||||
|
|
||||||
|
private val store: SharedFlow<ValueWithTime<T>> = eventFlow
|
||||||
|
.filterIsInstance<PropertyChangedMessage>()
|
||||||
|
.filter { it.property == propertyName }
|
||||||
|
.map { ValueWithTime(converter.metaToObject(it.value), it.time) }
|
||||||
|
.shareIn(scope, started = SharingStarted.Eagerly, replay = maxSize)
|
||||||
|
|
||||||
|
override fun flowHistory(from: Instant, until: Instant): Flow<ValueWithTime<T>> =
|
||||||
|
store.filter { it.time in from..until }
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Collect and store in memory device property changes for a given property
|
||||||
|
*/
|
||||||
|
public fun <T> Device.collectPropertyHistory(
|
||||||
|
scope: CoroutineScope = this,
|
||||||
|
propertyName: String,
|
||||||
|
converter: MetaConverter<T>,
|
||||||
|
maxSize: Int = 1000,
|
||||||
|
): PropertyHistory<T> = CollectedPropertyHistory(scope, messageFlow, propertyName, converter, maxSize)
|
||||||
|
|
||||||
|
public fun <D : Device, T> D.collectPropertyHistory(
|
||||||
|
scope: CoroutineScope = this,
|
||||||
|
spec: DevicePropertySpec<D, T>,
|
||||||
|
maxSize: Int = 1000,
|
||||||
|
): PropertyHistory<T> = collectPropertyHistory(scope, spec.name, spec.converter, maxSize)
|
@ -55,7 +55,7 @@ public fun DeviceManager.launchMagixService(
|
|||||||
logger.error(error) { "Error while responding to message: ${error.message}" }
|
logger.error(error) { "Error while responding to message: ${error.message}" }
|
||||||
}.launchIn(this)
|
}.launchIn(this)
|
||||||
|
|
||||||
hubMessageFlow(this).onEach { payload ->
|
hubMessageFlow().onEach { payload ->
|
||||||
endpoint.send(
|
endpoint.send(
|
||||||
format = controlsMagixFormat,
|
format = controlsMagixFormat,
|
||||||
payload = payload,
|
payload = payload,
|
||||||
|
@ -60,7 +60,7 @@ internal class RemoteDeviceConnect {
|
|||||||
MagixMessage(
|
MagixMessage(
|
||||||
format = DeviceManager.magixFormat.defaultFormat,
|
format = DeviceManager.magixFormat.defaultFormat,
|
||||||
payload = MagixEndpoint.magixJson.encodeToJsonElement(DeviceManager.magixFormat.serializer, it),
|
payload = MagixEndpoint.magixJson.encodeToJsonElement(DeviceManager.magixFormat.serializer, it),
|
||||||
sourceEndpoint = "test",
|
sourceEndpoint = "source",
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -76,7 +76,7 @@ internal class RemoteDeviceConnect {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
val remoteDevice = virtualMagixEndpoint.remoteDevice(context, "test", Name.EMPTY)
|
val remoteDevice = virtualMagixEndpoint.remoteDevice(context, "source", "target", Name.EMPTY)
|
||||||
|
|
||||||
assertContains(0.0..1.0, remoteDevice.read(TestDevice.value))
|
assertContains(0.0..1.0, remoteDevice.read(TestDevice.value))
|
||||||
}
|
}
|
||||||
|
@ -118,7 +118,7 @@ public class DeviceNameSpace(
|
|||||||
//Subscribe on properties updates
|
//Subscribe on properties updates
|
||||||
device.onPropertyChange {
|
device.onPropertyChange {
|
||||||
nodes[property]?.let { node ->
|
nodes[property]?.let { node ->
|
||||||
val sourceTime = time?.let { DateTime(it.toJavaInstant()) }
|
val sourceTime = DateTime(time.toJavaInstant())
|
||||||
val newValue = value.toOpc(sourceTime = sourceTime)
|
val newValue = value.toOpc(sourceTime = sourceTime)
|
||||||
if (node.value.value != newValue.value) {
|
if (node.value.value != newValue.value) {
|
||||||
node.value = newValue
|
node.value = newValue
|
||||||
|
@ -4,9 +4,11 @@ import jetbrains.exodus.entitystore.Entity
|
|||||||
import jetbrains.exodus.entitystore.PersistentEntityStore
|
import jetbrains.exodus.entitystore.PersistentEntityStore
|
||||||
import jetbrains.exodus.entitystore.PersistentEntityStores
|
import jetbrains.exodus.entitystore.PersistentEntityStores
|
||||||
import jetbrains.exodus.entitystore.StoreTransaction
|
import jetbrains.exodus.entitystore.StoreTransaction
|
||||||
|
import kotlinx.coroutines.flow.Flow
|
||||||
|
import kotlinx.coroutines.flow.asFlow
|
||||||
|
import kotlinx.coroutines.flow.filter
|
||||||
|
import kotlinx.coroutines.flow.map
|
||||||
import kotlinx.datetime.Instant
|
import kotlinx.datetime.Instant
|
||||||
import kotlinx.serialization.ExperimentalSerializationApi
|
|
||||||
import kotlinx.serialization.descriptors.serialDescriptor
|
|
||||||
import kotlinx.serialization.encodeToString
|
import kotlinx.serialization.encodeToString
|
||||||
import kotlinx.serialization.json.Json
|
import kotlinx.serialization.json.Json
|
||||||
import kotlinx.serialization.json.jsonObject
|
import kotlinx.serialization.json.jsonObject
|
||||||
@ -19,7 +21,6 @@ import space.kscience.dataforge.context.request
|
|||||||
import space.kscience.dataforge.io.IOPlugin
|
import space.kscience.dataforge.io.IOPlugin
|
||||||
import space.kscience.dataforge.io.workDirectory
|
import space.kscience.dataforge.io.workDirectory
|
||||||
import space.kscience.dataforge.meta.Meta
|
import space.kscience.dataforge.meta.Meta
|
||||||
import space.kscience.dataforge.meta.get
|
|
||||||
import space.kscience.dataforge.meta.string
|
import space.kscience.dataforge.meta.string
|
||||||
import space.kscience.dataforge.misc.DFExperimental
|
import space.kscience.dataforge.misc.DFExperimental
|
||||||
import space.kscience.dataforge.names.Name
|
import space.kscience.dataforge.names.Name
|
||||||
@ -39,9 +40,7 @@ internal fun StoreTransaction.writeMessage(message: DeviceMessage): Unit {
|
|||||||
message.targetDevice?.let {
|
message.targetDevice?.let {
|
||||||
entity.setProperty(DeviceMessage::targetDevice.name, it.toString())
|
entity.setProperty(DeviceMessage::targetDevice.name, it.toString())
|
||||||
}
|
}
|
||||||
message.time?.let {
|
entity.setProperty(DeviceMessage::targetDevice.name, message.time.toString())
|
||||||
entity.setProperty(DeviceMessage::targetDevice.name, it.toString())
|
|
||||||
}
|
|
||||||
entity.setBlobString("json", Json.encodeToString(json))
|
entity.setBlobString("json", Json.encodeToString(json))
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -68,12 +67,12 @@ public class XodusDeviceMessageStorage(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
override suspend fun readAll(): List<DeviceMessage> = entityStore.computeInReadonlyTransaction { transaction ->
|
override fun readAll(): Flow<DeviceMessage> = entityStore.computeInReadonlyTransaction { transaction ->
|
||||||
transaction.sort(
|
transaction.sort(
|
||||||
DEVICE_MESSAGE_ENTITY_TYPE,
|
DEVICE_MESSAGE_ENTITY_TYPE,
|
||||||
DeviceMessage::time.name,
|
DeviceMessage::time.name,
|
||||||
true
|
true
|
||||||
).map {
|
).asFlow().map {
|
||||||
Json.decodeFromString(
|
Json.decodeFromString(
|
||||||
DeviceMessage.serializer(),
|
DeviceMessage.serializer(),
|
||||||
it.getBlobString("json") ?: error("No json content found")
|
it.getBlobString("json") ?: error("No json content found")
|
||||||
@ -81,17 +80,17 @@ public class XodusDeviceMessageStorage(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
override suspend fun read(
|
override fun read(
|
||||||
eventType: String,
|
eventType: String,
|
||||||
range: ClosedRange<Instant>?,
|
range: ClosedRange<Instant>?,
|
||||||
sourceDevice: Name?,
|
sourceDevice: Name?,
|
||||||
targetDevice: Name?,
|
targetDevice: Name?,
|
||||||
): List<DeviceMessage> = entityStore.computeInReadonlyTransaction { transaction ->
|
): Flow<DeviceMessage> = entityStore.computeInReadonlyTransaction { transaction ->
|
||||||
transaction.find(
|
transaction.find(
|
||||||
DEVICE_MESSAGE_ENTITY_TYPE,
|
DEVICE_MESSAGE_ENTITY_TYPE,
|
||||||
"type",
|
"type",
|
||||||
eventType
|
eventType
|
||||||
).asSequence().filter {
|
).asFlow().filter {
|
||||||
it.timeInRange(range) &&
|
it.timeInRange(range) &&
|
||||||
it.propertyMatchesName(DeviceMessage::sourceDevice.name, sourceDevice) &&
|
it.propertyMatchesName(DeviceMessage::sourceDevice.name, sourceDevice) &&
|
||||||
it.propertyMatchesName(DeviceMessage::targetDevice.name, targetDevice)
|
it.propertyMatchesName(DeviceMessage::targetDevice.name, targetDevice)
|
||||||
@ -100,7 +99,7 @@ public class XodusDeviceMessageStorage(
|
|||||||
DeviceMessage.serializer(),
|
DeviceMessage.serializer(),
|
||||||
it.getBlobString("json") ?: error("No json content found")
|
it.getBlobString("json") ?: error("No json content found")
|
||||||
)
|
)
|
||||||
}.sortedBy { it.time }.toList()
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun close() {
|
override fun close() {
|
||||||
@ -124,16 +123,3 @@ public class XodusDeviceMessageStorage(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Query all messages of given type
|
|
||||||
*/
|
|
||||||
@OptIn(ExperimentalSerializationApi::class)
|
|
||||||
public suspend inline fun <reified T : DeviceMessage> XodusDeviceMessageStorage.query(
|
|
||||||
range: ClosedRange<Instant>? = null,
|
|
||||||
sourceDevice: Name? = null,
|
|
||||||
targetDevice: Name? = null,
|
|
||||||
): List<T> = read(serialDescriptor<T>().serialName, range, sourceDevice, targetDevice).map {
|
|
||||||
//Check that all types are correct
|
|
||||||
it as T
|
|
||||||
}
|
|
||||||
|
@ -1,6 +1,10 @@
|
|||||||
package space.kscience.controls.storage
|
package space.kscience.controls.storage
|
||||||
|
|
||||||
|
import kotlinx.coroutines.flow.Flow
|
||||||
|
import kotlinx.coroutines.flow.map
|
||||||
import kotlinx.datetime.Instant
|
import kotlinx.datetime.Instant
|
||||||
|
import kotlinx.serialization.ExperimentalSerializationApi
|
||||||
|
import kotlinx.serialization.descriptors.serialDescriptor
|
||||||
import space.kscience.controls.api.DeviceMessage
|
import space.kscience.controls.api.DeviceMessage
|
||||||
import space.kscience.dataforge.names.Name
|
import space.kscience.dataforge.names.Name
|
||||||
|
|
||||||
@ -10,14 +14,34 @@ import space.kscience.dataforge.names.Name
|
|||||||
public interface DeviceMessageStorage {
|
public interface DeviceMessageStorage {
|
||||||
public suspend fun write(event: DeviceMessage)
|
public suspend fun write(event: DeviceMessage)
|
||||||
|
|
||||||
public suspend fun readAll(): List<DeviceMessage>
|
/**
|
||||||
|
* Return all messages in a storage as a flow
|
||||||
|
*/
|
||||||
|
public fun readAll(): Flow<DeviceMessage>
|
||||||
|
|
||||||
public suspend fun read(
|
/**
|
||||||
|
* Flow messages with given [eventType] and filters by [range], [sourceDevice] and [targetDevice].
|
||||||
|
* Null in filters means that there is not filtering for this field.
|
||||||
|
*/
|
||||||
|
public fun read(
|
||||||
eventType: String,
|
eventType: String,
|
||||||
range: ClosedRange<Instant>? = null,
|
range: ClosedRange<Instant>? = null,
|
||||||
sourceDevice: Name? = null,
|
sourceDevice: Name? = null,
|
||||||
targetDevice: Name? = null,
|
targetDevice: Name? = null,
|
||||||
): List<DeviceMessage>
|
): Flow<DeviceMessage>
|
||||||
|
|
||||||
public fun close()
|
public fun close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Query all messages of given type
|
||||||
|
*/
|
||||||
|
@OptIn(ExperimentalSerializationApi::class)
|
||||||
|
public inline fun <reified T : DeviceMessage> DeviceMessageStorage.read(
|
||||||
|
range: ClosedRange<Instant>? = null,
|
||||||
|
sourceDevice: Name? = null,
|
||||||
|
targetDevice: Name? = null,
|
||||||
|
): Flow<T> = read(serialDescriptor<T>().serialName, range, sourceDevice, targetDevice).map {
|
||||||
|
//Check that all types are correct
|
||||||
|
it as T
|
||||||
|
}
|
@ -0,0 +1,20 @@
|
|||||||
|
package space.kscience.controls.storage
|
||||||
|
|
||||||
|
import kotlinx.coroutines.flow.Flow
|
||||||
|
import kotlinx.coroutines.flow.filter
|
||||||
|
import kotlinx.coroutines.flow.map
|
||||||
|
import kotlinx.datetime.Instant
|
||||||
|
import space.kscience.controls.api.PropertyChangedMessage
|
||||||
|
import space.kscience.controls.misc.PropertyHistory
|
||||||
|
import space.kscience.controls.misc.ValueWithTime
|
||||||
|
import space.kscience.dataforge.meta.transformations.MetaConverter
|
||||||
|
|
||||||
|
public fun <T> DeviceMessageStorage.propertyHistory(
|
||||||
|
propertyName: String,
|
||||||
|
converter: MetaConverter<T>,
|
||||||
|
): PropertyHistory<T> = object : PropertyHistory<T> {
|
||||||
|
override fun flowHistory(from: Instant, until: Instant): Flow<ValueWithTime<T>> =
|
||||||
|
read<PropertyChangedMessage>(from..until)
|
||||||
|
.filter { it.property == propertyName }
|
||||||
|
.map { ValueWithTime(converter.metaToObject(it.value), it.time) }
|
||||||
|
}
|
@ -31,7 +31,7 @@ public fun DeviceManager.storeMessages(
|
|||||||
val storage = factory.build(context, meta)
|
val storage = factory.build(context, meta)
|
||||||
logger.debug { "Message storage with meta = $meta created" }
|
logger.debug { "Message storage with meta = $meta created" }
|
||||||
|
|
||||||
return hubMessageFlow(context).filter(filterCondition).onEach { message ->
|
return hubMessageFlow().filter(filterCondition).onEach { message ->
|
||||||
storage.write(message)
|
storage.write(message)
|
||||||
}.onCompletion {
|
}.onCompletion {
|
||||||
storage.close()
|
storage.close()
|
||||||
@ -39,26 +39,4 @@ public fun DeviceManager.storeMessages(
|
|||||||
}.launchIn(context)
|
}.launchIn(context)
|
||||||
}
|
}
|
||||||
|
|
||||||
///**
|
|
||||||
// * @return the list of deviceMessages that describes changes of specified property of specified device sorted by time
|
|
||||||
// * @param sourceDeviceName a name of device, history of which property we want to get
|
|
||||||
// * @param propertyName a name of property, history of which we want to get
|
|
||||||
// * @param factory a factory that produce mongo clients
|
|
||||||
// */
|
|
||||||
//public suspend fun getPropertyHistory(
|
|
||||||
// sourceDeviceName: String,
|
|
||||||
// propertyName: String,
|
|
||||||
// factory: Factory<EventStorage>,
|
|
||||||
// meta: Meta = Meta.EMPTY,
|
|
||||||
//): List<PropertyChangedMessage> {
|
|
||||||
// return factory(meta).use {
|
|
||||||
// it.getPropertyHistory(sourceDeviceName, propertyName)
|
|
||||||
// }
|
|
||||||
//}
|
|
||||||
//
|
|
||||||
//
|
|
||||||
//public enum class StorageKind {
|
|
||||||
// DEVICE_HUB,
|
|
||||||
// MAGIX_SERVER
|
|
||||||
//}
|
|
||||||
|
|
||||||
|
@ -104,10 +104,9 @@ public fun Plot.plotDeviceProperty(
|
|||||||
coroutineScope: CoroutineScope = device.context,
|
coroutineScope: CoroutineScope = device.context,
|
||||||
configuration: Scatter.() -> Unit = {},
|
configuration: Scatter.() -> Unit = {},
|
||||||
): Job = scatter(configuration).run {
|
): Job = scatter(configuration).run {
|
||||||
val clock = device.context.clock
|
|
||||||
val data = TimeData()
|
val data = TimeData()
|
||||||
device.propertyMessageFlow(propertyName).sample(sampling).transform {
|
device.propertyMessageFlow(propertyName).sample(sampling).transform {
|
||||||
data.append(it.time ?: clock.now(), it.value.extractValue())
|
data.append(it.time, it.value.extractValue())
|
||||||
data.trim(maxAge, maxPoints, minPoints)
|
data.trim(maxAge, maxPoints, minPoints)
|
||||||
emit(data)
|
emit(data)
|
||||||
}.onEach {
|
}.onEach {
|
||||||
|
@ -103,8 +103,8 @@ suspend fun main() {
|
|||||||
|
|
||||||
monitorEndpoint.subscribe(DeviceManager.magixFormat).onEach { (magixMessage, payload) ->
|
monitorEndpoint.subscribe(DeviceManager.magixFormat).onEach { (magixMessage, payload) ->
|
||||||
mutex.withLock {
|
mutex.withLock {
|
||||||
val delay = Clock.System.now() - payload.time!!
|
val delay = Clock.System.now() - payload.time
|
||||||
latest[magixMessage.sourceEndpoint] = Clock.System.now() - payload.time!!
|
latest[magixMessage.sourceEndpoint] = Clock.System.now() - payload.time
|
||||||
max[magixMessage.sourceEndpoint] =
|
max[magixMessage.sourceEndpoint] =
|
||||||
maxOf(delay, max[magixMessage.sourceEndpoint] ?: ZERO)
|
maxOf(delay, max[magixMessage.sourceEndpoint] ?: ZERO)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user