Compare commits

...

3 Commits

11 changed files with 146 additions and 73 deletions

View File

@ -22,7 +22,7 @@ public sealed class DeviceMessage {
public abstract val sourceDevice: Name?
public abstract val targetDevice: Name?
public abstract val comment: String?
public abstract val time: Instant?
public abstract val time: Instant
/**
* Update the source device name for composition. If the original name is null, the resulting name is also null.
@ -59,7 +59,7 @@ public data class PropertyChangedMessage(
override val sourceDevice: Name = Name.EMPTY,
override val targetDevice: Name? = null,
override val comment: String? = null,
@EncodeDefault override val time: Instant? = Clock.System.now(),
@EncodeDefault override val time: Instant = Clock.System.now(),
) : DeviceMessage() {
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice))
}
@ -75,7 +75,7 @@ public data class PropertySetMessage(
override val sourceDevice: Name? = null,
override val targetDevice: Name,
override val comment: String? = null,
@EncodeDefault override val time: Instant? = Clock.System.now(),
@EncodeDefault override val time: Instant = Clock.System.now(),
) : DeviceMessage() {
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = sourceDevice?.let(block))
}
@ -91,7 +91,7 @@ public data class PropertyGetMessage(
override val sourceDevice: Name? = null,
override val targetDevice: Name,
override val comment: String? = null,
@EncodeDefault override val time: Instant? = Clock.System.now(),
@EncodeDefault override val time: Instant = Clock.System.now(),
) : DeviceMessage() {
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = sourceDevice?.let(block))
}
@ -105,7 +105,7 @@ public data class GetDescriptionMessage(
override val sourceDevice: Name? = null,
override val targetDevice: Name? = null,
override val comment: String? = null,
@EncodeDefault override val time: Instant? = Clock.System.now(),
@EncodeDefault override val time: Instant = Clock.System.now(),
) : DeviceMessage() {
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = sourceDevice?.let(block))
}
@ -122,7 +122,7 @@ public data class DescriptionMessage(
override val sourceDevice: Name,
override val targetDevice: Name? = null,
override val comment: String? = null,
@EncodeDefault override val time: Instant? = Clock.System.now(),
@EncodeDefault override val time: Instant = Clock.System.now(),
) : DeviceMessage() {
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice))
}
@ -141,7 +141,7 @@ public data class ActionExecuteMessage(
override val sourceDevice: Name? = null,
override val targetDevice: Name,
override val comment: String? = null,
@EncodeDefault override val time: Instant? = Clock.System.now(),
@EncodeDefault override val time: Instant = Clock.System.now(),
) : DeviceMessage() {
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = sourceDevice?.let(block))
}
@ -160,7 +160,7 @@ public data class ActionResultMessage(
override val sourceDevice: Name,
override val targetDevice: Name? = null,
override val comment: String? = null,
@EncodeDefault override val time: Instant? = Clock.System.now(),
@EncodeDefault override val time: Instant = Clock.System.now(),
) : DeviceMessage() {
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice))
}
@ -175,7 +175,7 @@ public data class BinaryNotificationMessage(
override val sourceDevice: Name,
override val targetDevice: Name? = null,
override val comment: String? = null,
@EncodeDefault override val time: Instant? = Clock.System.now(),
@EncodeDefault override val time: Instant = Clock.System.now(),
) : DeviceMessage() {
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice))
}
@ -190,7 +190,7 @@ public data class EmptyDeviceMessage(
override val sourceDevice: Name? = null,
override val targetDevice: Name? = null,
override val comment: String? = null,
@EncodeDefault override val time: Instant? = Clock.System.now(),
@EncodeDefault override val time: Instant = Clock.System.now(),
) : DeviceMessage() {
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = sourceDevice?.let(block))
}
@ -206,7 +206,7 @@ public data class DeviceLogMessage(
override val sourceDevice: Name = Name.EMPTY,
override val targetDevice: Name? = null,
override val comment: String? = null,
@EncodeDefault override val time: Instant? = Clock.System.now(),
@EncodeDefault override val time: Instant = Clock.System.now(),
) : DeviceMessage() {
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice))
}
@ -223,7 +223,7 @@ public data class DeviceErrorMessage(
override val sourceDevice: Name = Name.EMPTY,
override val targetDevice: Name? = null,
override val comment: String? = null,
@EncodeDefault override val time: Instant? = Clock.System.now(),
@EncodeDefault override val time: Instant = Clock.System.now(),
) : DeviceMessage() {
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice))
}
@ -238,7 +238,7 @@ public data class DeviceLifeCycleMessage(
override val sourceDevice: Name = Name.EMPTY,
override val targetDevice: Name? = null,
override val comment: String? = null,
@EncodeDefault override val time: Instant? = Clock.System.now(),
@EncodeDefault override val time: Instant = Clock.System.now(),
) : DeviceMessage() {
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice))
}

View File

@ -0,0 +1,66 @@
package space.kscience.controls.misc
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.*
import kotlinx.datetime.Clock
import kotlinx.datetime.Instant
import space.kscience.controls.api.Device
import space.kscience.controls.api.DeviceMessage
import space.kscience.controls.api.PropertyChangedMessage
import space.kscience.controls.spec.DevicePropertySpec
import space.kscience.controls.spec.name
import space.kscience.dataforge.meta.transformations.MetaConverter
/**
* An interface for device property history.
*/
public interface PropertyHistory<T> {
/**
* Flow property values filtered by a time range. The implementation could flow it as a chunk or provide paging.
* So the resulting flow is allowed to suspend.
*
* If [until] is in the future, the resulting flow is potentially unlimited.
* Theoretically, it could be also unlimited if the event source keeps producing new event with timestamp in a given range.
*/
public fun flowHistory(
from: Instant = Instant.DISTANT_PAST,
until: Instant = Clock.System.now(),
): Flow<ValueWithTime<T>>
}
/**
* An in-memory property values history collector
*/
public class CollectedPropertyHistory<T>(
public val scope: CoroutineScope,
eventFlow: Flow<DeviceMessage>,
public val propertyName: String,
public val converter: MetaConverter<T>,
maxSize: Int = 1000,
) : PropertyHistory<T> {
private val store: SharedFlow<ValueWithTime<T>> = eventFlow
.filterIsInstance<PropertyChangedMessage>()
.filter { it.property == propertyName }
.map { ValueWithTime(converter.metaToObject(it.value), it.time) }
.shareIn(scope, started = SharingStarted.Eagerly, replay = maxSize)
override fun flowHistory(from: Instant, until: Instant): Flow<ValueWithTime<T>> =
store.filter { it.time in from..until }
}
/**
* Collect and store in memory device property changes for a given property
*/
public fun <T> Device.collectPropertyHistory(
scope: CoroutineScope = this,
propertyName: String,
converter: MetaConverter<T>,
maxSize: Int = 1000,
): PropertyHistory<T> = CollectedPropertyHistory(scope, messageFlow, propertyName, converter, maxSize)
public fun <D : Device, T> D.collectPropertyHistory(
scope: CoroutineScope = this,
spec: DevicePropertySpec<D, T>,
maxSize: Int = 1000,
): PropertyHistory<T> = collectPropertyHistory(scope, spec.name, spec.converter, maxSize)

View File

@ -55,7 +55,7 @@ public fun DeviceManager.launchMagixService(
logger.error(error) { "Error while responding to message: ${error.message}" }
}.launchIn(this)
hubMessageFlow(this).onEach { payload ->
hubMessageFlow().onEach { payload ->
endpoint.send(
format = controlsMagixFormat,
payload = payload,

View File

@ -60,7 +60,7 @@ internal class RemoteDeviceConnect {
MagixMessage(
format = DeviceManager.magixFormat.defaultFormat,
payload = MagixEndpoint.magixJson.encodeToJsonElement(DeviceManager.magixFormat.serializer, it),
sourceEndpoint = "test",
sourceEndpoint = "source",
)
}
@ -76,7 +76,7 @@ internal class RemoteDeviceConnect {
}
}
val remoteDevice = virtualMagixEndpoint.remoteDevice(context, "test", Name.EMPTY)
val remoteDevice = virtualMagixEndpoint.remoteDevice(context, "source", "target", Name.EMPTY)
assertContains(0.0..1.0, remoteDevice.read(TestDevice.value))
}

View File

@ -118,7 +118,7 @@ public class DeviceNameSpace(
//Subscribe on properties updates
device.onPropertyChange {
nodes[property]?.let { node ->
val sourceTime = time?.let { DateTime(it.toJavaInstant()) }
val sourceTime = DateTime(time.toJavaInstant())
val newValue = value.toOpc(sourceTime = sourceTime)
if (node.value.value != newValue.value) {
node.value = newValue

View File

@ -4,9 +4,11 @@ import jetbrains.exodus.entitystore.Entity
import jetbrains.exodus.entitystore.PersistentEntityStore
import jetbrains.exodus.entitystore.PersistentEntityStores
import jetbrains.exodus.entitystore.StoreTransaction
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.map
import kotlinx.datetime.Instant
import kotlinx.serialization.ExperimentalSerializationApi
import kotlinx.serialization.descriptors.serialDescriptor
import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.jsonObject
@ -19,7 +21,6 @@ import space.kscience.dataforge.context.request
import space.kscience.dataforge.io.IOPlugin
import space.kscience.dataforge.io.workDirectory
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.get
import space.kscience.dataforge.meta.string
import space.kscience.dataforge.misc.DFExperimental
import space.kscience.dataforge.names.Name
@ -39,9 +40,7 @@ internal fun StoreTransaction.writeMessage(message: DeviceMessage): Unit {
message.targetDevice?.let {
entity.setProperty(DeviceMessage::targetDevice.name, it.toString())
}
message.time?.let {
entity.setProperty(DeviceMessage::targetDevice.name, it.toString())
}
entity.setProperty(DeviceMessage::targetDevice.name, message.time.toString())
entity.setBlobString("json", Json.encodeToString(json))
}
@ -68,12 +67,12 @@ public class XodusDeviceMessageStorage(
}
}
override suspend fun readAll(): List<DeviceMessage> = entityStore.computeInReadonlyTransaction { transaction ->
override fun readAll(): Flow<DeviceMessage> = entityStore.computeInReadonlyTransaction { transaction ->
transaction.sort(
DEVICE_MESSAGE_ENTITY_TYPE,
DeviceMessage::time.name,
true
).map {
).asFlow().map {
Json.decodeFromString(
DeviceMessage.serializer(),
it.getBlobString("json") ?: error("No json content found")
@ -81,17 +80,17 @@ public class XodusDeviceMessageStorage(
}
}
override suspend fun read(
override fun read(
eventType: String,
range: ClosedRange<Instant>?,
sourceDevice: Name?,
targetDevice: Name?,
): List<DeviceMessage> = entityStore.computeInReadonlyTransaction { transaction ->
): Flow<DeviceMessage> = entityStore.computeInReadonlyTransaction { transaction ->
transaction.find(
DEVICE_MESSAGE_ENTITY_TYPE,
"type",
eventType
).asSequence().filter {
).asFlow().filter {
it.timeInRange(range) &&
it.propertyMatchesName(DeviceMessage::sourceDevice.name, sourceDevice) &&
it.propertyMatchesName(DeviceMessage::targetDevice.name, targetDevice)
@ -100,7 +99,7 @@ public class XodusDeviceMessageStorage(
DeviceMessage.serializer(),
it.getBlobString("json") ?: error("No json content found")
)
}.sortedBy { it.time }.toList()
}
}
override fun close() {
@ -123,17 +122,4 @@ public class XodusDeviceMessageStorage(
return XodusDeviceMessageStorage(entityStore)
}
}
}
/**
* Query all messages of given type
*/
@OptIn(ExperimentalSerializationApi::class)
public suspend inline fun <reified T : DeviceMessage> XodusDeviceMessageStorage.query(
range: ClosedRange<Instant>? = null,
sourceDevice: Name? = null,
targetDevice: Name? = null,
): List<T> = read(serialDescriptor<T>().serialName, range, sourceDevice, targetDevice).map {
//Check that all types are correct
it as T
}
}

View File

@ -1,6 +1,10 @@
package space.kscience.controls.storage
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
import kotlinx.datetime.Instant
import kotlinx.serialization.ExperimentalSerializationApi
import kotlinx.serialization.descriptors.serialDescriptor
import space.kscience.controls.api.DeviceMessage
import space.kscience.dataforge.names.Name
@ -10,14 +14,34 @@ import space.kscience.dataforge.names.Name
public interface DeviceMessageStorage {
public suspend fun write(event: DeviceMessage)
public suspend fun readAll(): List<DeviceMessage>
/**
* Return all messages in a storage as a flow
*/
public fun readAll(): Flow<DeviceMessage>
public suspend fun read(
/**
* Flow messages with given [eventType] and filters by [range], [sourceDevice] and [targetDevice].
* Null in filters means that there is not filtering for this field.
*/
public fun read(
eventType: String,
range: ClosedRange<Instant>? = null,
sourceDevice: Name? = null,
targetDevice: Name? = null,
): List<DeviceMessage>
): Flow<DeviceMessage>
public fun close()
}
/**
* Query all messages of given type
*/
@OptIn(ExperimentalSerializationApi::class)
public inline fun <reified T : DeviceMessage> DeviceMessageStorage.read(
range: ClosedRange<Instant>? = null,
sourceDevice: Name? = null,
targetDevice: Name? = null,
): Flow<T> = read(serialDescriptor<T>().serialName, range, sourceDevice, targetDevice).map {
//Check that all types are correct
it as T
}

View File

@ -0,0 +1,20 @@
package space.kscience.controls.storage
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.map
import kotlinx.datetime.Instant
import space.kscience.controls.api.PropertyChangedMessage
import space.kscience.controls.misc.PropertyHistory
import space.kscience.controls.misc.ValueWithTime
import space.kscience.dataforge.meta.transformations.MetaConverter
public fun <T> DeviceMessageStorage.propertyHistory(
propertyName: String,
converter: MetaConverter<T>,
): PropertyHistory<T> = object : PropertyHistory<T> {
override fun flowHistory(from: Instant, until: Instant): Flow<ValueWithTime<T>> =
read<PropertyChangedMessage>(from..until)
.filter { it.property == propertyName }
.map { ValueWithTime(converter.metaToObject(it.value), it.time) }
}

View File

@ -31,7 +31,7 @@ public fun DeviceManager.storeMessages(
val storage = factory.build(context, meta)
logger.debug { "Message storage with meta = $meta created" }
return hubMessageFlow(context).filter(filterCondition).onEach { message ->
return hubMessageFlow().filter(filterCondition).onEach { message ->
storage.write(message)
}.onCompletion {
storage.close()
@ -39,26 +39,4 @@ public fun DeviceManager.storeMessages(
}.launchIn(context)
}
///**
// * @return the list of deviceMessages that describes changes of specified property of specified device sorted by time
// * @param sourceDeviceName a name of device, history of which property we want to get
// * @param propertyName a name of property, history of which we want to get
// * @param factory a factory that produce mongo clients
// */
//public suspend fun getPropertyHistory(
// sourceDeviceName: String,
// propertyName: String,
// factory: Factory<EventStorage>,
// meta: Meta = Meta.EMPTY,
//): List<PropertyChangedMessage> {
// return factory(meta).use {
// it.getPropertyHistory(sourceDeviceName, propertyName)
// }
//}
//
//
//public enum class StorageKind {
// DEVICE_HUB,
// MAGIX_SERVER
//}

View File

@ -104,10 +104,9 @@ public fun Plot.plotDeviceProperty(
coroutineScope: CoroutineScope = device.context,
configuration: Scatter.() -> Unit = {},
): Job = scatter(configuration).run {
val clock = device.context.clock
val data = TimeData()
device.propertyMessageFlow(propertyName).sample(sampling).transform {
data.append(it.time ?: clock.now(), it.value.extractValue())
data.append(it.time, it.value.extractValue())
data.trim(maxAge, maxPoints, minPoints)
emit(data)
}.onEach {

View File

@ -103,8 +103,8 @@ suspend fun main() {
monitorEndpoint.subscribe(DeviceManager.magixFormat).onEach { (magixMessage, payload) ->
mutex.withLock {
val delay = Clock.System.now() - payload.time!!
latest[magixMessage.sourceEndpoint] = Clock.System.now() - payload.time!!
val delay = Clock.System.now() - payload.time
latest[magixMessage.sourceEndpoint] = Clock.System.now() - payload.time
max[magixMessage.sourceEndpoint] =
maxOf(delay, max[magixMessage.sourceEndpoint] ?: ZERO)
}