Compare commits
No commits in common. "dbacdbc7cff379b573d538c279aa422cec5f235c" and "9edf3b13efaf238a60bdc54de31a3d2acd63ac49" have entirely different histories.
dbacdbc7cf
...
9edf3b13ef
@ -22,7 +22,7 @@ public sealed class DeviceMessage {
|
||||
public abstract val sourceDevice: Name?
|
||||
public abstract val targetDevice: Name?
|
||||
public abstract val comment: String?
|
||||
public abstract val time: Instant
|
||||
public abstract val time: Instant?
|
||||
|
||||
/**
|
||||
* Update the source device name for composition. If the original name is null, the resulting name is also null.
|
||||
@ -59,7 +59,7 @@ public data class PropertyChangedMessage(
|
||||
override val sourceDevice: Name = Name.EMPTY,
|
||||
override val targetDevice: Name? = null,
|
||||
override val comment: String? = null,
|
||||
@EncodeDefault override val time: Instant = Clock.System.now(),
|
||||
@EncodeDefault override val time: Instant? = Clock.System.now(),
|
||||
) : DeviceMessage() {
|
||||
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice))
|
||||
}
|
||||
@ -75,7 +75,7 @@ public data class PropertySetMessage(
|
||||
override val sourceDevice: Name? = null,
|
||||
override val targetDevice: Name,
|
||||
override val comment: String? = null,
|
||||
@EncodeDefault override val time: Instant = Clock.System.now(),
|
||||
@EncodeDefault override val time: Instant? = Clock.System.now(),
|
||||
) : DeviceMessage() {
|
||||
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = sourceDevice?.let(block))
|
||||
}
|
||||
@ -91,7 +91,7 @@ public data class PropertyGetMessage(
|
||||
override val sourceDevice: Name? = null,
|
||||
override val targetDevice: Name,
|
||||
override val comment: String? = null,
|
||||
@EncodeDefault override val time: Instant = Clock.System.now(),
|
||||
@EncodeDefault override val time: Instant? = Clock.System.now(),
|
||||
) : DeviceMessage() {
|
||||
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = sourceDevice?.let(block))
|
||||
}
|
||||
@ -105,7 +105,7 @@ public data class GetDescriptionMessage(
|
||||
override val sourceDevice: Name? = null,
|
||||
override val targetDevice: Name? = null,
|
||||
override val comment: String? = null,
|
||||
@EncodeDefault override val time: Instant = Clock.System.now(),
|
||||
@EncodeDefault override val time: Instant? = Clock.System.now(),
|
||||
) : DeviceMessage() {
|
||||
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = sourceDevice?.let(block))
|
||||
}
|
||||
@ -122,7 +122,7 @@ public data class DescriptionMessage(
|
||||
override val sourceDevice: Name,
|
||||
override val targetDevice: Name? = null,
|
||||
override val comment: String? = null,
|
||||
@EncodeDefault override val time: Instant = Clock.System.now(),
|
||||
@EncodeDefault override val time: Instant? = Clock.System.now(),
|
||||
) : DeviceMessage() {
|
||||
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice))
|
||||
}
|
||||
@ -141,7 +141,7 @@ public data class ActionExecuteMessage(
|
||||
override val sourceDevice: Name? = null,
|
||||
override val targetDevice: Name,
|
||||
override val comment: String? = null,
|
||||
@EncodeDefault override val time: Instant = Clock.System.now(),
|
||||
@EncodeDefault override val time: Instant? = Clock.System.now(),
|
||||
) : DeviceMessage() {
|
||||
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = sourceDevice?.let(block))
|
||||
}
|
||||
@ -160,7 +160,7 @@ public data class ActionResultMessage(
|
||||
override val sourceDevice: Name,
|
||||
override val targetDevice: Name? = null,
|
||||
override val comment: String? = null,
|
||||
@EncodeDefault override val time: Instant = Clock.System.now(),
|
||||
@EncodeDefault override val time: Instant? = Clock.System.now(),
|
||||
) : DeviceMessage() {
|
||||
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice))
|
||||
}
|
||||
@ -175,7 +175,7 @@ public data class BinaryNotificationMessage(
|
||||
override val sourceDevice: Name,
|
||||
override val targetDevice: Name? = null,
|
||||
override val comment: String? = null,
|
||||
@EncodeDefault override val time: Instant = Clock.System.now(),
|
||||
@EncodeDefault override val time: Instant? = Clock.System.now(),
|
||||
) : DeviceMessage() {
|
||||
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice))
|
||||
}
|
||||
@ -190,7 +190,7 @@ public data class EmptyDeviceMessage(
|
||||
override val sourceDevice: Name? = null,
|
||||
override val targetDevice: Name? = null,
|
||||
override val comment: String? = null,
|
||||
@EncodeDefault override val time: Instant = Clock.System.now(),
|
||||
@EncodeDefault override val time: Instant? = Clock.System.now(),
|
||||
) : DeviceMessage() {
|
||||
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = sourceDevice?.let(block))
|
||||
}
|
||||
@ -206,7 +206,7 @@ public data class DeviceLogMessage(
|
||||
override val sourceDevice: Name = Name.EMPTY,
|
||||
override val targetDevice: Name? = null,
|
||||
override val comment: String? = null,
|
||||
@EncodeDefault override val time: Instant = Clock.System.now(),
|
||||
@EncodeDefault override val time: Instant? = Clock.System.now(),
|
||||
) : DeviceMessage() {
|
||||
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice))
|
||||
}
|
||||
@ -223,7 +223,7 @@ public data class DeviceErrorMessage(
|
||||
override val sourceDevice: Name = Name.EMPTY,
|
||||
override val targetDevice: Name? = null,
|
||||
override val comment: String? = null,
|
||||
@EncodeDefault override val time: Instant = Clock.System.now(),
|
||||
@EncodeDefault override val time: Instant? = Clock.System.now(),
|
||||
) : DeviceMessage() {
|
||||
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice))
|
||||
}
|
||||
@ -238,7 +238,7 @@ public data class DeviceLifeCycleMessage(
|
||||
override val sourceDevice: Name = Name.EMPTY,
|
||||
override val targetDevice: Name? = null,
|
||||
override val comment: String? = null,
|
||||
@EncodeDefault override val time: Instant = Clock.System.now(),
|
||||
@EncodeDefault override val time: Instant? = Clock.System.now(),
|
||||
) : DeviceMessage() {
|
||||
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice))
|
||||
}
|
||||
|
@ -1,66 +0,0 @@
|
||||
package space.kscience.controls.misc
|
||||
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.flow.*
|
||||
import kotlinx.datetime.Clock
|
||||
import kotlinx.datetime.Instant
|
||||
import space.kscience.controls.api.Device
|
||||
import space.kscience.controls.api.DeviceMessage
|
||||
import space.kscience.controls.api.PropertyChangedMessage
|
||||
import space.kscience.controls.spec.DevicePropertySpec
|
||||
import space.kscience.controls.spec.name
|
||||
import space.kscience.dataforge.meta.transformations.MetaConverter
|
||||
|
||||
/**
|
||||
* An interface for device property history.
|
||||
*/
|
||||
public interface PropertyHistory<T> {
|
||||
/**
|
||||
* Flow property values filtered by a time range. The implementation could flow it as a chunk or provide paging.
|
||||
* So the resulting flow is allowed to suspend.
|
||||
*
|
||||
* If [until] is in the future, the resulting flow is potentially unlimited.
|
||||
* Theoretically, it could be also unlimited if the event source keeps producing new event with timestamp in a given range.
|
||||
*/
|
||||
public fun flowHistory(
|
||||
from: Instant = Instant.DISTANT_PAST,
|
||||
until: Instant = Clock.System.now(),
|
||||
): Flow<ValueWithTime<T>>
|
||||
}
|
||||
|
||||
/**
|
||||
* An in-memory property values history collector
|
||||
*/
|
||||
public class CollectedPropertyHistory<T>(
|
||||
public val scope: CoroutineScope,
|
||||
eventFlow: Flow<DeviceMessage>,
|
||||
public val propertyName: String,
|
||||
public val converter: MetaConverter<T>,
|
||||
maxSize: Int = 1000,
|
||||
) : PropertyHistory<T> {
|
||||
|
||||
private val store: SharedFlow<ValueWithTime<T>> = eventFlow
|
||||
.filterIsInstance<PropertyChangedMessage>()
|
||||
.filter { it.property == propertyName }
|
||||
.map { ValueWithTime(converter.metaToObject(it.value), it.time) }
|
||||
.shareIn(scope, started = SharingStarted.Eagerly, replay = maxSize)
|
||||
|
||||
override fun flowHistory(from: Instant, until: Instant): Flow<ValueWithTime<T>> =
|
||||
store.filter { it.time in from..until }
|
||||
}
|
||||
|
||||
/**
|
||||
* Collect and store in memory device property changes for a given property
|
||||
*/
|
||||
public fun <T> Device.collectPropertyHistory(
|
||||
scope: CoroutineScope = this,
|
||||
propertyName: String,
|
||||
converter: MetaConverter<T>,
|
||||
maxSize: Int = 1000,
|
||||
): PropertyHistory<T> = CollectedPropertyHistory(scope, messageFlow, propertyName, converter, maxSize)
|
||||
|
||||
public fun <D : Device, T> D.collectPropertyHistory(
|
||||
scope: CoroutineScope = this,
|
||||
spec: DevicePropertySpec<D, T>,
|
||||
maxSize: Int = 1000,
|
||||
): PropertyHistory<T> = collectPropertyHistory(scope, spec.name, spec.converter, maxSize)
|
@ -55,7 +55,7 @@ public fun DeviceManager.launchMagixService(
|
||||
logger.error(error) { "Error while responding to message: ${error.message}" }
|
||||
}.launchIn(this)
|
||||
|
||||
hubMessageFlow().onEach { payload ->
|
||||
hubMessageFlow(this).onEach { payload ->
|
||||
endpoint.send(
|
||||
format = controlsMagixFormat,
|
||||
payload = payload,
|
||||
|
@ -60,7 +60,7 @@ internal class RemoteDeviceConnect {
|
||||
MagixMessage(
|
||||
format = DeviceManager.magixFormat.defaultFormat,
|
||||
payload = MagixEndpoint.magixJson.encodeToJsonElement(DeviceManager.magixFormat.serializer, it),
|
||||
sourceEndpoint = "source",
|
||||
sourceEndpoint = "test",
|
||||
)
|
||||
}
|
||||
|
||||
@ -76,7 +76,7 @@ internal class RemoteDeviceConnect {
|
||||
}
|
||||
}
|
||||
|
||||
val remoteDevice = virtualMagixEndpoint.remoteDevice(context, "source", "target", Name.EMPTY)
|
||||
val remoteDevice = virtualMagixEndpoint.remoteDevice(context, "test", Name.EMPTY)
|
||||
|
||||
assertContains(0.0..1.0, remoteDevice.read(TestDevice.value))
|
||||
}
|
||||
|
@ -118,7 +118,7 @@ public class DeviceNameSpace(
|
||||
//Subscribe on properties updates
|
||||
device.onPropertyChange {
|
||||
nodes[property]?.let { node ->
|
||||
val sourceTime = DateTime(time.toJavaInstant())
|
||||
val sourceTime = time?.let { DateTime(it.toJavaInstant()) }
|
||||
val newValue = value.toOpc(sourceTime = sourceTime)
|
||||
if (node.value.value != newValue.value) {
|
||||
node.value = newValue
|
||||
|
@ -4,11 +4,9 @@ import jetbrains.exodus.entitystore.Entity
|
||||
import jetbrains.exodus.entitystore.PersistentEntityStore
|
||||
import jetbrains.exodus.entitystore.PersistentEntityStores
|
||||
import jetbrains.exodus.entitystore.StoreTransaction
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.asFlow
|
||||
import kotlinx.coroutines.flow.filter
|
||||
import kotlinx.coroutines.flow.map
|
||||
import kotlinx.datetime.Instant
|
||||
import kotlinx.serialization.ExperimentalSerializationApi
|
||||
import kotlinx.serialization.descriptors.serialDescriptor
|
||||
import kotlinx.serialization.encodeToString
|
||||
import kotlinx.serialization.json.Json
|
||||
import kotlinx.serialization.json.jsonObject
|
||||
@ -21,6 +19,7 @@ import space.kscience.dataforge.context.request
|
||||
import space.kscience.dataforge.io.IOPlugin
|
||||
import space.kscience.dataforge.io.workDirectory
|
||||
import space.kscience.dataforge.meta.Meta
|
||||
import space.kscience.dataforge.meta.get
|
||||
import space.kscience.dataforge.meta.string
|
||||
import space.kscience.dataforge.misc.DFExperimental
|
||||
import space.kscience.dataforge.names.Name
|
||||
@ -40,7 +39,9 @@ internal fun StoreTransaction.writeMessage(message: DeviceMessage): Unit {
|
||||
message.targetDevice?.let {
|
||||
entity.setProperty(DeviceMessage::targetDevice.name, it.toString())
|
||||
}
|
||||
entity.setProperty(DeviceMessage::targetDevice.name, message.time.toString())
|
||||
message.time?.let {
|
||||
entity.setProperty(DeviceMessage::targetDevice.name, it.toString())
|
||||
}
|
||||
entity.setBlobString("json", Json.encodeToString(json))
|
||||
}
|
||||
|
||||
@ -67,12 +68,12 @@ public class XodusDeviceMessageStorage(
|
||||
}
|
||||
}
|
||||
|
||||
override fun readAll(): Flow<DeviceMessage> = entityStore.computeInReadonlyTransaction { transaction ->
|
||||
override suspend fun readAll(): List<DeviceMessage> = entityStore.computeInReadonlyTransaction { transaction ->
|
||||
transaction.sort(
|
||||
DEVICE_MESSAGE_ENTITY_TYPE,
|
||||
DeviceMessage::time.name,
|
||||
true
|
||||
).asFlow().map {
|
||||
).map {
|
||||
Json.decodeFromString(
|
||||
DeviceMessage.serializer(),
|
||||
it.getBlobString("json") ?: error("No json content found")
|
||||
@ -80,17 +81,17 @@ public class XodusDeviceMessageStorage(
|
||||
}
|
||||
}
|
||||
|
||||
override fun read(
|
||||
override suspend fun read(
|
||||
eventType: String,
|
||||
range: ClosedRange<Instant>?,
|
||||
sourceDevice: Name?,
|
||||
targetDevice: Name?,
|
||||
): Flow<DeviceMessage> = entityStore.computeInReadonlyTransaction { transaction ->
|
||||
): List<DeviceMessage> = entityStore.computeInReadonlyTransaction { transaction ->
|
||||
transaction.find(
|
||||
DEVICE_MESSAGE_ENTITY_TYPE,
|
||||
"type",
|
||||
eventType
|
||||
).asFlow().filter {
|
||||
).asSequence().filter {
|
||||
it.timeInRange(range) &&
|
||||
it.propertyMatchesName(DeviceMessage::sourceDevice.name, sourceDevice) &&
|
||||
it.propertyMatchesName(DeviceMessage::targetDevice.name, targetDevice)
|
||||
@ -99,7 +100,7 @@ public class XodusDeviceMessageStorage(
|
||||
DeviceMessage.serializer(),
|
||||
it.getBlobString("json") ?: error("No json content found")
|
||||
)
|
||||
}
|
||||
}.sortedBy { it.time }.toList()
|
||||
}
|
||||
|
||||
override fun close() {
|
||||
@ -123,3 +124,16 @@ public class XodusDeviceMessageStorage(
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Query all messages of given type
|
||||
*/
|
||||
@OptIn(ExperimentalSerializationApi::class)
|
||||
public suspend inline fun <reified T : DeviceMessage> XodusDeviceMessageStorage.query(
|
||||
range: ClosedRange<Instant>? = null,
|
||||
sourceDevice: Name? = null,
|
||||
targetDevice: Name? = null,
|
||||
): List<T> = read(serialDescriptor<T>().serialName, range, sourceDevice, targetDevice).map {
|
||||
//Check that all types are correct
|
||||
it as T
|
||||
}
|
||||
|
@ -1,10 +1,6 @@
|
||||
package space.kscience.controls.storage
|
||||
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.map
|
||||
import kotlinx.datetime.Instant
|
||||
import kotlinx.serialization.ExperimentalSerializationApi
|
||||
import kotlinx.serialization.descriptors.serialDescriptor
|
||||
import space.kscience.controls.api.DeviceMessage
|
||||
import space.kscience.dataforge.names.Name
|
||||
|
||||
@ -14,34 +10,14 @@ import space.kscience.dataforge.names.Name
|
||||
public interface DeviceMessageStorage {
|
||||
public suspend fun write(event: DeviceMessage)
|
||||
|
||||
/**
|
||||
* Return all messages in a storage as a flow
|
||||
*/
|
||||
public fun readAll(): Flow<DeviceMessage>
|
||||
public suspend fun readAll(): List<DeviceMessage>
|
||||
|
||||
/**
|
||||
* Flow messages with given [eventType] and filters by [range], [sourceDevice] and [targetDevice].
|
||||
* Null in filters means that there is not filtering for this field.
|
||||
*/
|
||||
public fun read(
|
||||
public suspend fun read(
|
||||
eventType: String,
|
||||
range: ClosedRange<Instant>? = null,
|
||||
sourceDevice: Name? = null,
|
||||
targetDevice: Name? = null,
|
||||
): Flow<DeviceMessage>
|
||||
): List<DeviceMessage>
|
||||
|
||||
public fun close()
|
||||
}
|
||||
|
||||
/**
|
||||
* Query all messages of given type
|
||||
*/
|
||||
@OptIn(ExperimentalSerializationApi::class)
|
||||
public inline fun <reified T : DeviceMessage> DeviceMessageStorage.read(
|
||||
range: ClosedRange<Instant>? = null,
|
||||
sourceDevice: Name? = null,
|
||||
targetDevice: Name? = null,
|
||||
): Flow<T> = read(serialDescriptor<T>().serialName, range, sourceDevice, targetDevice).map {
|
||||
//Check that all types are correct
|
||||
it as T
|
||||
}
|
@ -1,20 +0,0 @@
|
||||
package space.kscience.controls.storage
|
||||
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.filter
|
||||
import kotlinx.coroutines.flow.map
|
||||
import kotlinx.datetime.Instant
|
||||
import space.kscience.controls.api.PropertyChangedMessage
|
||||
import space.kscience.controls.misc.PropertyHistory
|
||||
import space.kscience.controls.misc.ValueWithTime
|
||||
import space.kscience.dataforge.meta.transformations.MetaConverter
|
||||
|
||||
public fun <T> DeviceMessageStorage.propertyHistory(
|
||||
propertyName: String,
|
||||
converter: MetaConverter<T>,
|
||||
): PropertyHistory<T> = object : PropertyHistory<T> {
|
||||
override fun flowHistory(from: Instant, until: Instant): Flow<ValueWithTime<T>> =
|
||||
read<PropertyChangedMessage>(from..until)
|
||||
.filter { it.property == propertyName }
|
||||
.map { ValueWithTime(converter.metaToObject(it.value), it.time) }
|
||||
}
|
@ -31,7 +31,7 @@ public fun DeviceManager.storeMessages(
|
||||
val storage = factory.build(context, meta)
|
||||
logger.debug { "Message storage with meta = $meta created" }
|
||||
|
||||
return hubMessageFlow().filter(filterCondition).onEach { message ->
|
||||
return hubMessageFlow(context).filter(filterCondition).onEach { message ->
|
||||
storage.write(message)
|
||||
}.onCompletion {
|
||||
storage.close()
|
||||
@ -39,4 +39,26 @@ public fun DeviceManager.storeMessages(
|
||||
}.launchIn(context)
|
||||
}
|
||||
|
||||
///**
|
||||
// * @return the list of deviceMessages that describes changes of specified property of specified device sorted by time
|
||||
// * @param sourceDeviceName a name of device, history of which property we want to get
|
||||
// * @param propertyName a name of property, history of which we want to get
|
||||
// * @param factory a factory that produce mongo clients
|
||||
// */
|
||||
//public suspend fun getPropertyHistory(
|
||||
// sourceDeviceName: String,
|
||||
// propertyName: String,
|
||||
// factory: Factory<EventStorage>,
|
||||
// meta: Meta = Meta.EMPTY,
|
||||
//): List<PropertyChangedMessage> {
|
||||
// return factory(meta).use {
|
||||
// it.getPropertyHistory(sourceDeviceName, propertyName)
|
||||
// }
|
||||
//}
|
||||
//
|
||||
//
|
||||
//public enum class StorageKind {
|
||||
// DEVICE_HUB,
|
||||
// MAGIX_SERVER
|
||||
//}
|
||||
|
||||
|
@ -104,9 +104,10 @@ public fun Plot.plotDeviceProperty(
|
||||
coroutineScope: CoroutineScope = device.context,
|
||||
configuration: Scatter.() -> Unit = {},
|
||||
): Job = scatter(configuration).run {
|
||||
val clock = device.context.clock
|
||||
val data = TimeData()
|
||||
device.propertyMessageFlow(propertyName).sample(sampling).transform {
|
||||
data.append(it.time, it.value.extractValue())
|
||||
data.append(it.time ?: clock.now(), it.value.extractValue())
|
||||
data.trim(maxAge, maxPoints, minPoints)
|
||||
emit(data)
|
||||
}.onEach {
|
||||
|
@ -103,8 +103,8 @@ suspend fun main() {
|
||||
|
||||
monitorEndpoint.subscribe(DeviceManager.magixFormat).onEach { (magixMessage, payload) ->
|
||||
mutex.withLock {
|
||||
val delay = Clock.System.now() - payload.time
|
||||
latest[magixMessage.sourceEndpoint] = Clock.System.now() - payload.time
|
||||
val delay = Clock.System.now() - payload.time!!
|
||||
latest[magixMessage.sourceEndpoint] = Clock.System.now() - payload.time!!
|
||||
max[magixMessage.sourceEndpoint] =
|
||||
maxOf(delay, max[magixMessage.sourceEndpoint] ?: ZERO)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user