Compare commits

...

3 Commits

11 changed files with 146 additions and 73 deletions

View File

@ -22,7 +22,7 @@ public sealed class DeviceMessage {
public abstract val sourceDevice: Name? public abstract val sourceDevice: Name?
public abstract val targetDevice: Name? public abstract val targetDevice: Name?
public abstract val comment: String? public abstract val comment: String?
public abstract val time: Instant? public abstract val time: Instant
/** /**
* Update the source device name for composition. If the original name is null, the resulting name is also null. * Update the source device name for composition. If the original name is null, the resulting name is also null.
@ -59,7 +59,7 @@ public data class PropertyChangedMessage(
override val sourceDevice: Name = Name.EMPTY, override val sourceDevice: Name = Name.EMPTY,
override val targetDevice: Name? = null, override val targetDevice: Name? = null,
override val comment: String? = null, override val comment: String? = null,
@EncodeDefault override val time: Instant? = Clock.System.now(), @EncodeDefault override val time: Instant = Clock.System.now(),
) : DeviceMessage() { ) : DeviceMessage() {
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice)) override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice))
} }
@ -75,7 +75,7 @@ public data class PropertySetMessage(
override val sourceDevice: Name? = null, override val sourceDevice: Name? = null,
override val targetDevice: Name, override val targetDevice: Name,
override val comment: String? = null, override val comment: String? = null,
@EncodeDefault override val time: Instant? = Clock.System.now(), @EncodeDefault override val time: Instant = Clock.System.now(),
) : DeviceMessage() { ) : DeviceMessage() {
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = sourceDevice?.let(block)) override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = sourceDevice?.let(block))
} }
@ -91,7 +91,7 @@ public data class PropertyGetMessage(
override val sourceDevice: Name? = null, override val sourceDevice: Name? = null,
override val targetDevice: Name, override val targetDevice: Name,
override val comment: String? = null, override val comment: String? = null,
@EncodeDefault override val time: Instant? = Clock.System.now(), @EncodeDefault override val time: Instant = Clock.System.now(),
) : DeviceMessage() { ) : DeviceMessage() {
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = sourceDevice?.let(block)) override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = sourceDevice?.let(block))
} }
@ -105,7 +105,7 @@ public data class GetDescriptionMessage(
override val sourceDevice: Name? = null, override val sourceDevice: Name? = null,
override val targetDevice: Name? = null, override val targetDevice: Name? = null,
override val comment: String? = null, override val comment: String? = null,
@EncodeDefault override val time: Instant? = Clock.System.now(), @EncodeDefault override val time: Instant = Clock.System.now(),
) : DeviceMessage() { ) : DeviceMessage() {
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = sourceDevice?.let(block)) override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = sourceDevice?.let(block))
} }
@ -122,7 +122,7 @@ public data class DescriptionMessage(
override val sourceDevice: Name, override val sourceDevice: Name,
override val targetDevice: Name? = null, override val targetDevice: Name? = null,
override val comment: String? = null, override val comment: String? = null,
@EncodeDefault override val time: Instant? = Clock.System.now(), @EncodeDefault override val time: Instant = Clock.System.now(),
) : DeviceMessage() { ) : DeviceMessage() {
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice)) override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice))
} }
@ -141,7 +141,7 @@ public data class ActionExecuteMessage(
override val sourceDevice: Name? = null, override val sourceDevice: Name? = null,
override val targetDevice: Name, override val targetDevice: Name,
override val comment: String? = null, override val comment: String? = null,
@EncodeDefault override val time: Instant? = Clock.System.now(), @EncodeDefault override val time: Instant = Clock.System.now(),
) : DeviceMessage() { ) : DeviceMessage() {
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = sourceDevice?.let(block)) override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = sourceDevice?.let(block))
} }
@ -160,7 +160,7 @@ public data class ActionResultMessage(
override val sourceDevice: Name, override val sourceDevice: Name,
override val targetDevice: Name? = null, override val targetDevice: Name? = null,
override val comment: String? = null, override val comment: String? = null,
@EncodeDefault override val time: Instant? = Clock.System.now(), @EncodeDefault override val time: Instant = Clock.System.now(),
) : DeviceMessage() { ) : DeviceMessage() {
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice)) override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice))
} }
@ -175,7 +175,7 @@ public data class BinaryNotificationMessage(
override val sourceDevice: Name, override val sourceDevice: Name,
override val targetDevice: Name? = null, override val targetDevice: Name? = null,
override val comment: String? = null, override val comment: String? = null,
@EncodeDefault override val time: Instant? = Clock.System.now(), @EncodeDefault override val time: Instant = Clock.System.now(),
) : DeviceMessage() { ) : DeviceMessage() {
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice)) override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice))
} }
@ -190,7 +190,7 @@ public data class EmptyDeviceMessage(
override val sourceDevice: Name? = null, override val sourceDevice: Name? = null,
override val targetDevice: Name? = null, override val targetDevice: Name? = null,
override val comment: String? = null, override val comment: String? = null,
@EncodeDefault override val time: Instant? = Clock.System.now(), @EncodeDefault override val time: Instant = Clock.System.now(),
) : DeviceMessage() { ) : DeviceMessage() {
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = sourceDevice?.let(block)) override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = sourceDevice?.let(block))
} }
@ -206,7 +206,7 @@ public data class DeviceLogMessage(
override val sourceDevice: Name = Name.EMPTY, override val sourceDevice: Name = Name.EMPTY,
override val targetDevice: Name? = null, override val targetDevice: Name? = null,
override val comment: String? = null, override val comment: String? = null,
@EncodeDefault override val time: Instant? = Clock.System.now(), @EncodeDefault override val time: Instant = Clock.System.now(),
) : DeviceMessage() { ) : DeviceMessage() {
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice)) override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice))
} }
@ -223,7 +223,7 @@ public data class DeviceErrorMessage(
override val sourceDevice: Name = Name.EMPTY, override val sourceDevice: Name = Name.EMPTY,
override val targetDevice: Name? = null, override val targetDevice: Name? = null,
override val comment: String? = null, override val comment: String? = null,
@EncodeDefault override val time: Instant? = Clock.System.now(), @EncodeDefault override val time: Instant = Clock.System.now(),
) : DeviceMessage() { ) : DeviceMessage() {
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice)) override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice))
} }
@ -238,7 +238,7 @@ public data class DeviceLifeCycleMessage(
override val sourceDevice: Name = Name.EMPTY, override val sourceDevice: Name = Name.EMPTY,
override val targetDevice: Name? = null, override val targetDevice: Name? = null,
override val comment: String? = null, override val comment: String? = null,
@EncodeDefault override val time: Instant? = Clock.System.now(), @EncodeDefault override val time: Instant = Clock.System.now(),
) : DeviceMessage() { ) : DeviceMessage() {
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice)) override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice))
} }

View File

@ -0,0 +1,66 @@
package space.kscience.controls.misc
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.*
import kotlinx.datetime.Clock
import kotlinx.datetime.Instant
import space.kscience.controls.api.Device
import space.kscience.controls.api.DeviceMessage
import space.kscience.controls.api.PropertyChangedMessage
import space.kscience.controls.spec.DevicePropertySpec
import space.kscience.controls.spec.name
import space.kscience.dataforge.meta.transformations.MetaConverter
/**
* An interface for device property history.
*/
public interface PropertyHistory<T> {
/**
* Flow property values filtered by a time range. The implementation could flow it as a chunk or provide paging.
* So the resulting flow is allowed to suspend.
*
* If [until] is in the future, the resulting flow is potentially unlimited.
* Theoretically, it could be also unlimited if the event source keeps producing new event with timestamp in a given range.
*/
public fun flowHistory(
from: Instant = Instant.DISTANT_PAST,
until: Instant = Clock.System.now(),
): Flow<ValueWithTime<T>>
}
/**
* An in-memory property values history collector
*/
public class CollectedPropertyHistory<T>(
public val scope: CoroutineScope,
eventFlow: Flow<DeviceMessage>,
public val propertyName: String,
public val converter: MetaConverter<T>,
maxSize: Int = 1000,
) : PropertyHistory<T> {
private val store: SharedFlow<ValueWithTime<T>> = eventFlow
.filterIsInstance<PropertyChangedMessage>()
.filter { it.property == propertyName }
.map { ValueWithTime(converter.metaToObject(it.value), it.time) }
.shareIn(scope, started = SharingStarted.Eagerly, replay = maxSize)
override fun flowHistory(from: Instant, until: Instant): Flow<ValueWithTime<T>> =
store.filter { it.time in from..until }
}
/**
* Collect and store in memory device property changes for a given property
*/
public fun <T> Device.collectPropertyHistory(
scope: CoroutineScope = this,
propertyName: String,
converter: MetaConverter<T>,
maxSize: Int = 1000,
): PropertyHistory<T> = CollectedPropertyHistory(scope, messageFlow, propertyName, converter, maxSize)
public fun <D : Device, T> D.collectPropertyHistory(
scope: CoroutineScope = this,
spec: DevicePropertySpec<D, T>,
maxSize: Int = 1000,
): PropertyHistory<T> = collectPropertyHistory(scope, spec.name, spec.converter, maxSize)

View File

@ -55,7 +55,7 @@ public fun DeviceManager.launchMagixService(
logger.error(error) { "Error while responding to message: ${error.message}" } logger.error(error) { "Error while responding to message: ${error.message}" }
}.launchIn(this) }.launchIn(this)
hubMessageFlow(this).onEach { payload -> hubMessageFlow().onEach { payload ->
endpoint.send( endpoint.send(
format = controlsMagixFormat, format = controlsMagixFormat,
payload = payload, payload = payload,

View File

@ -60,7 +60,7 @@ internal class RemoteDeviceConnect {
MagixMessage( MagixMessage(
format = DeviceManager.magixFormat.defaultFormat, format = DeviceManager.magixFormat.defaultFormat,
payload = MagixEndpoint.magixJson.encodeToJsonElement(DeviceManager.magixFormat.serializer, it), payload = MagixEndpoint.magixJson.encodeToJsonElement(DeviceManager.magixFormat.serializer, it),
sourceEndpoint = "test", sourceEndpoint = "source",
) )
} }
@ -76,7 +76,7 @@ internal class RemoteDeviceConnect {
} }
} }
val remoteDevice = virtualMagixEndpoint.remoteDevice(context, "test", Name.EMPTY) val remoteDevice = virtualMagixEndpoint.remoteDevice(context, "source", "target", Name.EMPTY)
assertContains(0.0..1.0, remoteDevice.read(TestDevice.value)) assertContains(0.0..1.0, remoteDevice.read(TestDevice.value))
} }

View File

@ -118,7 +118,7 @@ public class DeviceNameSpace(
//Subscribe on properties updates //Subscribe on properties updates
device.onPropertyChange { device.onPropertyChange {
nodes[property]?.let { node -> nodes[property]?.let { node ->
val sourceTime = time?.let { DateTime(it.toJavaInstant()) } val sourceTime = DateTime(time.toJavaInstant())
val newValue = value.toOpc(sourceTime = sourceTime) val newValue = value.toOpc(sourceTime = sourceTime)
if (node.value.value != newValue.value) { if (node.value.value != newValue.value) {
node.value = newValue node.value = newValue

View File

@ -4,9 +4,11 @@ import jetbrains.exodus.entitystore.Entity
import jetbrains.exodus.entitystore.PersistentEntityStore import jetbrains.exodus.entitystore.PersistentEntityStore
import jetbrains.exodus.entitystore.PersistentEntityStores import jetbrains.exodus.entitystore.PersistentEntityStores
import jetbrains.exodus.entitystore.StoreTransaction import jetbrains.exodus.entitystore.StoreTransaction
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.map
import kotlinx.datetime.Instant import kotlinx.datetime.Instant
import kotlinx.serialization.ExperimentalSerializationApi
import kotlinx.serialization.descriptors.serialDescriptor
import kotlinx.serialization.encodeToString import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.Json import kotlinx.serialization.json.Json
import kotlinx.serialization.json.jsonObject import kotlinx.serialization.json.jsonObject
@ -19,7 +21,6 @@ import space.kscience.dataforge.context.request
import space.kscience.dataforge.io.IOPlugin import space.kscience.dataforge.io.IOPlugin
import space.kscience.dataforge.io.workDirectory import space.kscience.dataforge.io.workDirectory
import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.get
import space.kscience.dataforge.meta.string import space.kscience.dataforge.meta.string
import space.kscience.dataforge.misc.DFExperimental import space.kscience.dataforge.misc.DFExperimental
import space.kscience.dataforge.names.Name import space.kscience.dataforge.names.Name
@ -39,9 +40,7 @@ internal fun StoreTransaction.writeMessage(message: DeviceMessage): Unit {
message.targetDevice?.let { message.targetDevice?.let {
entity.setProperty(DeviceMessage::targetDevice.name, it.toString()) entity.setProperty(DeviceMessage::targetDevice.name, it.toString())
} }
message.time?.let { entity.setProperty(DeviceMessage::targetDevice.name, message.time.toString())
entity.setProperty(DeviceMessage::targetDevice.name, it.toString())
}
entity.setBlobString("json", Json.encodeToString(json)) entity.setBlobString("json", Json.encodeToString(json))
} }
@ -68,12 +67,12 @@ public class XodusDeviceMessageStorage(
} }
} }
override suspend fun readAll(): List<DeviceMessage> = entityStore.computeInReadonlyTransaction { transaction -> override fun readAll(): Flow<DeviceMessage> = entityStore.computeInReadonlyTransaction { transaction ->
transaction.sort( transaction.sort(
DEVICE_MESSAGE_ENTITY_TYPE, DEVICE_MESSAGE_ENTITY_TYPE,
DeviceMessage::time.name, DeviceMessage::time.name,
true true
).map { ).asFlow().map {
Json.decodeFromString( Json.decodeFromString(
DeviceMessage.serializer(), DeviceMessage.serializer(),
it.getBlobString("json") ?: error("No json content found") it.getBlobString("json") ?: error("No json content found")
@ -81,17 +80,17 @@ public class XodusDeviceMessageStorage(
} }
} }
override suspend fun read( override fun read(
eventType: String, eventType: String,
range: ClosedRange<Instant>?, range: ClosedRange<Instant>?,
sourceDevice: Name?, sourceDevice: Name?,
targetDevice: Name?, targetDevice: Name?,
): List<DeviceMessage> = entityStore.computeInReadonlyTransaction { transaction -> ): Flow<DeviceMessage> = entityStore.computeInReadonlyTransaction { transaction ->
transaction.find( transaction.find(
DEVICE_MESSAGE_ENTITY_TYPE, DEVICE_MESSAGE_ENTITY_TYPE,
"type", "type",
eventType eventType
).asSequence().filter { ).asFlow().filter {
it.timeInRange(range) && it.timeInRange(range) &&
it.propertyMatchesName(DeviceMessage::sourceDevice.name, sourceDevice) && it.propertyMatchesName(DeviceMessage::sourceDevice.name, sourceDevice) &&
it.propertyMatchesName(DeviceMessage::targetDevice.name, targetDevice) it.propertyMatchesName(DeviceMessage::targetDevice.name, targetDevice)
@ -100,7 +99,7 @@ public class XodusDeviceMessageStorage(
DeviceMessage.serializer(), DeviceMessage.serializer(),
it.getBlobString("json") ?: error("No json content found") it.getBlobString("json") ?: error("No json content found")
) )
}.sortedBy { it.time }.toList() }
} }
override fun close() { override fun close() {
@ -123,17 +122,4 @@ public class XodusDeviceMessageStorage(
return XodusDeviceMessageStorage(entityStore) return XodusDeviceMessageStorage(entityStore)
} }
} }
} }
/**
* Query all messages of given type
*/
@OptIn(ExperimentalSerializationApi::class)
public suspend inline fun <reified T : DeviceMessage> XodusDeviceMessageStorage.query(
range: ClosedRange<Instant>? = null,
sourceDevice: Name? = null,
targetDevice: Name? = null,
): List<T> = read(serialDescriptor<T>().serialName, range, sourceDevice, targetDevice).map {
//Check that all types are correct
it as T
}

View File

@ -1,6 +1,10 @@
package space.kscience.controls.storage package space.kscience.controls.storage
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
import kotlinx.datetime.Instant import kotlinx.datetime.Instant
import kotlinx.serialization.ExperimentalSerializationApi
import kotlinx.serialization.descriptors.serialDescriptor
import space.kscience.controls.api.DeviceMessage import space.kscience.controls.api.DeviceMessage
import space.kscience.dataforge.names.Name import space.kscience.dataforge.names.Name
@ -10,14 +14,34 @@ import space.kscience.dataforge.names.Name
public interface DeviceMessageStorage { public interface DeviceMessageStorage {
public suspend fun write(event: DeviceMessage) public suspend fun write(event: DeviceMessage)
public suspend fun readAll(): List<DeviceMessage> /**
* Return all messages in a storage as a flow
*/
public fun readAll(): Flow<DeviceMessage>
public suspend fun read( /**
* Flow messages with given [eventType] and filters by [range], [sourceDevice] and [targetDevice].
* Null in filters means that there is not filtering for this field.
*/
public fun read(
eventType: String, eventType: String,
range: ClosedRange<Instant>? = null, range: ClosedRange<Instant>? = null,
sourceDevice: Name? = null, sourceDevice: Name? = null,
targetDevice: Name? = null, targetDevice: Name? = null,
): List<DeviceMessage> ): Flow<DeviceMessage>
public fun close() public fun close()
}
/**
* Query all messages of given type
*/
@OptIn(ExperimentalSerializationApi::class)
public inline fun <reified T : DeviceMessage> DeviceMessageStorage.read(
range: ClosedRange<Instant>? = null,
sourceDevice: Name? = null,
targetDevice: Name? = null,
): Flow<T> = read(serialDescriptor<T>().serialName, range, sourceDevice, targetDevice).map {
//Check that all types are correct
it as T
} }

View File

@ -0,0 +1,20 @@
package space.kscience.controls.storage
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.map
import kotlinx.datetime.Instant
import space.kscience.controls.api.PropertyChangedMessage
import space.kscience.controls.misc.PropertyHistory
import space.kscience.controls.misc.ValueWithTime
import space.kscience.dataforge.meta.transformations.MetaConverter
public fun <T> DeviceMessageStorage.propertyHistory(
propertyName: String,
converter: MetaConverter<T>,
): PropertyHistory<T> = object : PropertyHistory<T> {
override fun flowHistory(from: Instant, until: Instant): Flow<ValueWithTime<T>> =
read<PropertyChangedMessage>(from..until)
.filter { it.property == propertyName }
.map { ValueWithTime(converter.metaToObject(it.value), it.time) }
}

View File

@ -31,7 +31,7 @@ public fun DeviceManager.storeMessages(
val storage = factory.build(context, meta) val storage = factory.build(context, meta)
logger.debug { "Message storage with meta = $meta created" } logger.debug { "Message storage with meta = $meta created" }
return hubMessageFlow(context).filter(filterCondition).onEach { message -> return hubMessageFlow().filter(filterCondition).onEach { message ->
storage.write(message) storage.write(message)
}.onCompletion { }.onCompletion {
storage.close() storage.close()
@ -39,26 +39,4 @@ public fun DeviceManager.storeMessages(
}.launchIn(context) }.launchIn(context)
} }
///**
// * @return the list of deviceMessages that describes changes of specified property of specified device sorted by time
// * @param sourceDeviceName a name of device, history of which property we want to get
// * @param propertyName a name of property, history of which we want to get
// * @param factory a factory that produce mongo clients
// */
//public suspend fun getPropertyHistory(
// sourceDeviceName: String,
// propertyName: String,
// factory: Factory<EventStorage>,
// meta: Meta = Meta.EMPTY,
//): List<PropertyChangedMessage> {
// return factory(meta).use {
// it.getPropertyHistory(sourceDeviceName, propertyName)
// }
//}
//
//
//public enum class StorageKind {
// DEVICE_HUB,
// MAGIX_SERVER
//}

View File

@ -104,10 +104,9 @@ public fun Plot.plotDeviceProperty(
coroutineScope: CoroutineScope = device.context, coroutineScope: CoroutineScope = device.context,
configuration: Scatter.() -> Unit = {}, configuration: Scatter.() -> Unit = {},
): Job = scatter(configuration).run { ): Job = scatter(configuration).run {
val clock = device.context.clock
val data = TimeData() val data = TimeData()
device.propertyMessageFlow(propertyName).sample(sampling).transform { device.propertyMessageFlow(propertyName).sample(sampling).transform {
data.append(it.time ?: clock.now(), it.value.extractValue()) data.append(it.time, it.value.extractValue())
data.trim(maxAge, maxPoints, minPoints) data.trim(maxAge, maxPoints, minPoints)
emit(data) emit(data)
}.onEach { }.onEach {

View File

@ -103,8 +103,8 @@ suspend fun main() {
monitorEndpoint.subscribe(DeviceManager.magixFormat).onEach { (magixMessage, payload) -> monitorEndpoint.subscribe(DeviceManager.magixFormat).onEach { (magixMessage, payload) ->
mutex.withLock { mutex.withLock {
val delay = Clock.System.now() - payload.time!! val delay = Clock.System.now() - payload.time
latest[magixMessage.sourceEndpoint] = Clock.System.now() - payload.time!! latest[magixMessage.sourceEndpoint] = Clock.System.now() - payload.time
max[magixMessage.sourceEndpoint] = max[magixMessage.sourceEndpoint] =
maxOf(delay, max[magixMessage.sourceEndpoint] ?: ZERO) maxOf(delay, max[magixMessage.sourceEndpoint] ?: ZERO)
} }